Avoid double for loops in notification init (#8691)

master
Harshavardhana 5 years ago committed by GitHub
parent 54431b3953
commit 99ad445260
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      buildscripts/gateway-tests.sh
  2. 59
      cmd/config-encrypted.go
  3. 27
      cmd/format-fs.go
  4. 58
      cmd/iam.go
  5. 16
      cmd/notification.go

@ -46,7 +46,10 @@ function main()
gw_pid="$(start_minio_gateway_s3)"
SERVER_ENDPOINT=127.0.0.1:24240 ENABLE_HTTPS=0 ACCESS_KEY=minio \
SECRET_KEY=minio123 MINT_MODE="full" /mint/entrypoint.sh
SECRET_KEY=minio123 MINT_MODE="full" /mint/entrypoint.sh \
aws-sdk-go aws-sdk-java aws-sdk-php aws-sdk-ruby awscli \
healthcheck mc minio-dotnet minio-go minio-java minio-py \
s3cmd security
rv=$?
kill "$sr_pid"

@ -20,6 +20,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"os"
"strings"
"unicode/utf8"
@ -39,7 +40,6 @@ func handleEncryptedConfigBackend(objAPI ObjectLayer, server bool) error {
// If its server mode or nas gateway, migrate the backend.
doneCh := make(chan struct{})
defer close(doneCh)
var encrypted bool
var err error
@ -48,17 +48,27 @@ func handleEncryptedConfigBackend(objAPI ObjectLayer, server bool) error {
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
for range newRetryTimerSimple(doneCh) {
if encrypted, err = checkBackendEncrypted(objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) {
logger.Info("Waiting for config backend to be encrypted..")
continue
retryTimerCh := newRetryTimerSimple(doneCh)
var stop bool
for !stop {
select {
case <-retryTimerCh:
if encrypted, err = checkBackendEncrypted(objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) {
logger.Info("Waiting for config backend to be encrypted..")
continue
}
close(doneCh)
return err
}
return err
stop = true
case <-globalOSSignalCh:
close(doneCh)
return fmt.Errorf("Config encryption process stopped gracefully")
}
break
}
close(doneCh)
if encrypted {
// backend is encrypted, but credentials are not specified
@ -83,24 +93,33 @@ func handleEncryptedConfigBackend(objAPI ObjectLayer, server bool) error {
return err
}
doneCh = make(chan struct{})
defer close(doneCh)
retryTimerCh = newRetryTimerSimple(doneCh)
// Migrating Config backend needs a retry mechanism for
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
for range newRetryTimerSimple(doneCh) {
// Migrate IAM configuration
if err = migrateConfigPrefixToEncrypted(objAPI, activeCredOld, encrypted); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for config backend to be encrypted..")
continue
for {
select {
case <-retryTimerCh:
// Migrate IAM configuration
if err = migrateConfigPrefixToEncrypted(objAPI, activeCredOld, encrypted); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for config backend to be encrypted..")
continue
}
return err
}
return err
return nil
case <-globalOSSignalCh:
return fmt.Errorf("Config encryption process stopped gracefully")
}
break
}
return nil
}
const (

@ -343,17 +343,24 @@ func formatFSFixDeploymentID(fsFormatPath string) error {
defer close(doneCh)
var wlk *lock.LockedFile
for range newRetryTimerSimple(doneCh) {
wlk, err = lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0)
if err == lock.ErrAlreadyLocked {
// Lock already present, sleep and attempt again
logger.Info("Another minio process(es) might be holding a lock to the file %s. Please kill that minio process(es) (elapsed %s)\n", fsFormatPath, getElapsedTime())
continue
retryCh := newRetryTimerSimple(doneCh)
var stop bool
for !stop {
select {
case <-retryCh:
wlk, err = lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0)
if err == lock.ErrAlreadyLocked {
// Lock already present, sleep and attempt again
logger.Info("Another minio process(es) might be holding a lock to the file %s. Please kill that minio process(es) (elapsed %s)\n", fsFormatPath, getElapsedTime())
continue
}
if err != nil {
return err
}
stop = true
case <-globalOSSignalCh:
return fmt.Errorf("Initializing FS format stopped gracefully")
}
break
}
if err != nil {
return err
}
defer wlk.Close()

@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"sync"
@ -365,46 +366,41 @@ func (sys *IAMSys) Init(objAPI ObjectLayer) error {
doneCh := make(chan struct{})
defer close(doneCh)
// Migrating IAM needs a retry mechanism for
// Migrating IAM amd Loading IAM needs a retry mechanism for
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
for range newRetryTimerSimple(doneCh) {
// Migrate IAM configuration
if err := sys.doIAMConfigMigration(objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for IAM subsystem to be initialized..")
continue
retryCh := newRetryTimerSimple(doneCh)
for {
select {
case <-retryCh:
// Migrate IAM configuration
if err := sys.doIAMConfigMigration(objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for IAM subsystem to be initialized..")
continue
}
return err
}
return err
}
break
}
sys.store.watch(sys)
sys.store.watch(sys)
// Initializing IAM needs a retry mechanism for
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
for range newRetryTimerSimple(doneCh) {
// Load IAMSys once during boot. Need to pass in
// objAPI as server has not yet initialized.
if err := sys.store.loadAll(sys, objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for IAM subsystem to be initialized..")
continue
if err := sys.store.loadAll(sys, objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for IAM subsystem to be initialized..")
continue
}
return err
}
return err
return nil
case <-globalOSSignalCh:
return fmt.Errorf("Initializing IAM sub-system gracefully stopped")
}
break
}
return nil
}
// DeletePolicy - deletes a canned policy from backend or etcd.

@ -718,8 +718,7 @@ func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error
// - Read quorum is lost just after the initialization
// of the object layer.
retryTimerCh := newRetryTimerSimple(doneCh)
stop := false
for !stop {
for {
select {
case <-retryTimerCh:
if err := sys.load(buckets, objAPI); err != nil {
@ -731,17 +730,8 @@ func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error
}
return err
}
stop = true
case <-globalOSSignalCh:
return fmt.Errorf("Initializing Notification sub-system gracefully stopped")
}
}
// Initializing bucket retention config needs a retry mechanism if
// read quorum is lost just after the initialization of the object layer.
for {
select {
case <-retryTimerCh:
// Initializing bucket retention config needs a retry mechanism if
// read quorum is lost just after the initialization of the object layer.
if err := sys.initBucketObjectLockConfig(objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||

Loading…
Cancel
Save