diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go
index d87142ca7..0c58625fd 100644
--- a/cmd/admin-heal-ops.go
+++ b/cmd/admin-heal-ops.go
@@ -715,7 +715,6 @@ func (h *healSequence) healItemsFromSourceCh() error {
 
 func (h *healSequence) healFromSourceCh() {
 	h.healItemsFromSourceCh()
-	close(h.traverseAndHealDoneCh)
 }
 
 func (h *healSequence) healItems(bucketsOnly bool) error {
diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go
index 499820e59..5284e5311 100644
--- a/cmd/bucket-handlers.go
+++ b/cmd/bucket-handlers.go
@@ -293,10 +293,12 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R
 
 		for i := range bucketsInfo {
 			meta, err := loadBucketMetadata(ctx, objectAPI, bucketsInfo[i].Name)
-			logger.LogIf(ctx, err)
 			if err == nil {
 				bucketsInfo[i].Created = meta.Created
 			}
+			if err != errMetaDataConverted {
+				logger.LogIf(ctx, err)
+			}
 		}
 	}
 
@@ -1103,7 +1105,7 @@ func (api objectAPIHandlers) GetBucketObjectLockConfigHandler(w http.ResponseWri
 	}
 
 	meta, err := loadBucketMetadata(ctx, objectAPI, bucket)
-	if err != nil {
+	if err != nil && err != errMetaDataConverted {
 		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
 		return
 	}
diff --git a/cmd/bucket-meta.go b/cmd/bucket-meta.go
index a8aff7ccc..c830cd26e 100644
--- a/cmd/bucket-meta.go
+++ b/cmd/bucket-meta.go
@@ -115,13 +115,15 @@ func (b *bucketMetadata) convertLegacyLockconfig(ctx context.Context, objectAPI
 			return err
 		}
 
-		logger.LogIf(ctx, deleteConfig(ctx, objectAPI, configFile))
+		if err := deleteConfig(ctx, objectAPI, configFile); err != nil && !errors.Is(err, errConfigNotFound) {
+			logger.LogIf(ctx, err)
+		}
 		return nil
 	}
 
 	configData, err := readConfig(ctx, objectAPI, configFile)
 	if err != nil {
-		if err != errConfigNotFound {
+		if !errors.Is(err, errConfigNotFound) {
 			return err
 		}
 
diff --git a/cmd/bucket-quota.go b/cmd/bucket-quota.go
index 6f7103573..9a9eea476 100644
--- a/cmd/bucket-quota.go
+++ b/cmd/bucket-quota.go
@@ -77,7 +77,7 @@ func (sys *BucketQuotaSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error
 		return errServerNotInitialized
 	}
 
-	// In gateway mode, we always fetch the bucket lifecycle configuration from the gateway backend.
+	// In gateway mode, we do not support bucket quota.
 	// So, this is a no-op for gateway servers.
 	if globalIsGateway {
 		return nil
diff --git a/cmd/config-encrypted.go b/cmd/config-encrypted.go
index 4cac84aa1..170365381 100644
--- a/cmd/config-encrypted.go
+++ b/cmd/config-encrypted.go
@@ -36,42 +36,10 @@ func handleEncryptedConfigBackend(objAPI ObjectLayer, server bool) error {
 		return nil
 	}
 
-	// If its server mode or nas gateway, migrate the backend.
-	doneCh := make(chan struct{})
-
-	var encrypted bool
-	var err error
-
-	// Migrating Config backend needs a retry mechanism for
-	// the following reasons:
-	//  - Read quorum is lost just after the initialization
-	//    of the object layer.
-	retryTimerCh := newRetryTimerSimple(doneCh)
-	var stop bool
-
-	rquorum := InsufficientReadQuorum{}
-	wquorum := InsufficientWriteQuorum{}
-
-	for !stop {
-		select {
-		case <-retryTimerCh:
-			if encrypted, err = checkBackendEncrypted(objAPI); err != nil {
-				if errors.Is(err, errDiskNotFound) ||
-					errors.As(err, &rquorum) ||
-					isErrBucketNotFound(err) {
-					logger.Info("Waiting for config backend to be encrypted..")
-					continue
-				}
-				close(doneCh)
-				return err
-			}
-			stop = true
-		case <-globalOSSignalCh:
-			close(doneCh)
-			return fmt.Errorf("Config encryption process stopped gracefully")
-		}
+	encrypted, err := checkBackendEncrypted(objAPI)
+	if err != nil {
+		return fmt.Errorf("Unable to encrypt config %w", err)
 	}
-	close(doneCh)
 
 	if encrypted {
 		// backend is encrypted, but credentials are not specified
@@ -91,34 +59,12 @@ func handleEncryptedConfigBackend(objAPI ObjectLayer, server bool) error {
 		}
 	}
 
-	doneCh = make(chan struct{})
-	defer close(doneCh)
-
-	retryTimerCh = newRetryTimerSimple(doneCh)
-
-	// Migrating Config backend needs a retry mechanism for
-	// the following reasons:
-	//  - Read quorum is lost just after the initialization
-	//    of the object layer.
-	for {
-		select {
-		case <-retryTimerCh:
-			// Migrate IAM configuration
-			if err = migrateConfigPrefixToEncrypted(objAPI, globalOldCred, encrypted); err != nil {
-				if errors.Is(err, errDiskNotFound) ||
-					errors.As(err, &rquorum) ||
-					errors.As(err, &wquorum) ||
-					isErrBucketNotFound(err) {
-					logger.Info("Waiting for config backend to be encrypted..")
-					continue
-				}
-				return err
-			}
-			return nil
-		case <-globalOSSignalCh:
-			return fmt.Errorf("Config encryption process stopped gracefully")
-		}
+	// Migrate IAM configuration
+	if err = migrateConfigPrefixToEncrypted(objAPI, globalOldCred, encrypted); err != nil {
+		return fmt.Errorf("Unable to migrate all config at .minio.sys/config/: %w", err)
 	}
+
+	return nil
 }
 
 const (
diff --git a/cmd/data-update-tracker.go b/cmd/data-update-tracker.go
index 350a1ed0a..77c84eddf 100644
--- a/cmd/data-update-tracker.go
+++ b/cmd/data-update-tracker.go
@@ -352,7 +352,9 @@ func (d *dataUpdateTracker) deserialize(src io.Reader, newerThan time.Time) erro
 	// Version
 	if _, err := io.ReadFull(src, tmp[:1]); err != nil {
 		if d.debug {
-			logger.LogIf(ctx, err)
+			if err != io.EOF {
+				logger.LogIf(ctx, err)
+			}
 		}
 		return err
 	}
diff --git a/cmd/format-fs.go b/cmd/format-fs.go
index 93330b0ba..a300cd87d 100644
--- a/cmd/format-fs.go
+++ b/cmd/format-fs.go
@@ -190,7 +190,7 @@ func initFormatFS(ctx context.Context, fsPath string) (rlk *lock.RLockedFile, er
 	fsFormatPath := pathJoin(fsPath, minioMetaBucket, formatConfigFile)
 
 	// Add a deployment ID, if it does not exist.
-	if err := formatFSFixDeploymentID(fsFormatPath); err != nil {
+	if err := formatFSFixDeploymentID(ctx, fsFormatPath); err != nil {
 		return nil, err
 	}
 
@@ -288,7 +288,7 @@ func formatFSGetDeploymentID(rlk *lock.RLockedFile) (id string, err error) {
 }
 
 // Generate a deployment ID if one does not exist already.
-func formatFSFixDeploymentID(fsFormatPath string) error {
+func formatFSFixDeploymentID(ctx context.Context, fsFormatPath string) error {
 	rlk, err := lock.RLockedOpenFile(fsFormatPath)
 	if err == nil {
 		// format.json can be empty in a rare condition when another
@@ -339,11 +339,12 @@ func formatFSFixDeploymentID(fsFormatPath string) error {
 		return time.Now().Round(time.Second).Sub(formatStartTime).String()
 	}
 
-	doneCh := make(chan struct{})
-	defer close(doneCh)
+	retryCtx, cancel := context.WithCancel(ctx)
+	// Indicate to our routine to exit cleanly upon return.
+	defer cancel()
 
 	var wlk *lock.LockedFile
-	retryCh := newRetryTimerSimple(doneCh)
+	retryCh := newRetryTimerSimple(retryCtx)
 	var stop bool
 	for !stop {
 		select {
@@ -358,7 +359,7 @@ func formatFSFixDeploymentID(fsFormatPath string) error {
 				return err
 			}
 			stop = true
-		case <-globalOSSignalCh:
+		case <-ctx.Done():
 			return fmt.Errorf("Initializing FS format stopped gracefully")
 		}
 	}
diff --git a/cmd/retry.go b/cmd/retry.go
index 48ea5d102..5c7b49a05 100644
--- a/cmd/retry.go
+++ b/cmd/retry.go
@@ -17,6 +17,7 @@ package cmd
 
 import (
+	"context"
 	"math/rand"
 	"sync"
 	"time"
 )
@@ -61,7 +62,7 @@ var globalRandomSource = rand.New(&lockedRandSource{
 // until the maximum retry attempts are reached. - this function is a fully
 // configurable version, meant for only advanced use cases. For the most part
 // one should use newRetryTimerSimple and newRetryTimer.
-func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float64, doneCh chan struct{}) <-chan int {
+func newRetryTimerWithJitter(ctx context.Context, unit time.Duration, cap time.Duration, jitter float64) <-chan int {
 	attemptCh := make(chan int)
 
 	// normalize jitter to the range [0, 1.0]
@@ -100,7 +101,7 @@ func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float
 			select { // Attempts starts.
 			case attemptCh <- nextBackoff:
 				nextBackoff++
-			case <-doneCh:
+			case <-ctx.Done():
 				// Stop the routine.
 				return
 			}
@@ -108,7 +109,7 @@ func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float
 			// wait till next backoff time or till doneCh gets a message.
 			select {
 			case <-timer.C:
-			case <-doneCh:
+			case <-ctx.Done():
 				// stop the timer and return.
 				timer.Stop()
 				return
@@ -130,6 +131,6 @@ const (
 // newRetryTimerSimple creates a timer with exponentially increasing delays
 // until the maximum retry attempts are reached. - this function is a
 // simpler version with all default values.
-func newRetryTimerSimple(doneCh chan struct{}) <-chan int {
-	return newRetryTimerWithJitter(defaultRetryUnit, defaultRetryCap, MaxJitter, doneCh)
+func newRetryTimerSimple(ctx context.Context) <-chan int {
+	return newRetryTimerWithJitter(ctx, defaultRetryUnit, defaultRetryCap, MaxJitter)
 }
diff --git a/cmd/server-main.go b/cmd/server-main.go
index 1e090ebc5..57c436880 100644
--- a/cmd/server-main.go
+++ b/cmd/server-main.go
@@ -160,53 +160,45 @@ func newAllSubsystems() {
 	globalBucketQuotaSys = NewBucketQuotaSys()
 }
 
-func initSafeMode(buckets []BucketInfo) (err error) {
+func initSafeMode() (err error) {
 	newObject := newObjectLayerWithoutSafeModeFn()
 
-	// Construct path to config/transaction.lock for locking
-	transactionConfigPrefix := minioConfigPrefix + "/transaction.lock"
-
 	// Make sure to hold lock for entire migration to avoid
 	// such that only one server should migrate the entire config
 	// at a given time, this big transaction lock ensures this
 	// appropriately. This is also true for rotation of encrypted
 	// content.
-	objLock := newObject.NewNSLock(GlobalContext, minioMetaBucket, transactionConfigPrefix)
-	if err = objLock.GetLock(globalOperationTimeout); err != nil {
-		return err
-	}
-
-	defer func(objLock RWLocker) {
-		objLock.Unlock()
+	txnLk := newObject.NewNSLock(GlobalContext, minioMetaBucket, minioConfigPrefix+"/transaction.lock")
+	defer func(txnLk RWLocker) {
+		txnLk.Unlock()
 
 		if err != nil {
 			var cerr config.Err
+			// For any config error, we don't need to drop into safe-mode
+			// instead its a user error and should be fixed by user.
 			if errors.As(err, &cerr) {
 				return
 			}
 
 			// Prints the formatted startup message in safe mode operation.
+			// Drops-into safe mode where users need to now manually recover
+			// the server.
 			printStartupSafeModeMessage(getAPIEndpoints(), err)
+
 			// Initialization returned error reaching safe mode and
 			// not proceeding waiting for admin action.
 			handleSignals()
 		}
-	}(objLock)
-
-	// Migrate all backend configs to encrypted backend configs, optionally
-	// handles rotating keys for encryption.
-	if err = handleEncryptedConfigBackend(newObject, true); err != nil {
-		return fmt.Errorf("Unable to handle encrypted backend for config, iam and policies: %w", err)
-	}
+	}(txnLk)
 
 	// **** WARNING ****
 	// Migrating to encrypted backend should happen before initialization of any
 	// sub-systems, make sure that we do not move the above codeblock elsewhere.
+
+	// Create cancel context to control 'newRetryTimer' go routine.
+	retryCtx, cancel := context.WithCancel(GlobalContext)
-
-	// Validate and initialize all subsystems.
-	doneCh := make(chan struct{})
-	defer close(doneCh)
+	// Indicate to our routine to exit cleanly upon return.
+	defer cancel()
 
 	// Initializing sub-systems needs a retry mechanism for
 	// the following reasons:
@@ -214,43 +206,67 @@ func initSafeMode(buckets []BucketInfo) (err error) {
 	//    of the object layer.
 	//  - Write quorum not met when upgrading configuration
 	//    version is needed, migration is needed etc.
-	retryTimerCh := newRetryTimerSimple(doneCh)
-	for {
-		rquorum := InsufficientReadQuorum{}
-		wquorum := InsufficientWriteQuorum{}
-		bucketNotFound := BucketNotFound{}
-		var err error
-		select {
-		case n := <-retryTimerCh:
-			if err = initAllSubsystems(buckets, newObject); err != nil {
-				if errors.Is(err, errDiskNotFound) ||
-					errors.As(err, &rquorum) ||
-					errors.As(err, &wquorum) ||
-					errors.As(err, &bucketNotFound) {
-					if n < 5 {
-						logger.Info("Waiting for all sub-systems to be initialized..")
-					} else {
-						logger.Info("Waiting for all sub-systems to be initialized.. %v", err)
-					}
-					continue
+	rquorum := InsufficientReadQuorum{}
+	wquorum := InsufficientWriteQuorum{}
+	optimeout := OperationTimedOut{}
+	for n := range newRetryTimerSimple(retryCtx) {
+		// let one of the server acquire the lock, if not let them timeout.
+		// which shall be retried again by this loop.
+		if err = txnLk.GetLock(leaderLockTimeout); err == nil {
+			// Migrate all backend configs to encrypted backend configs, optionally
+			// handles rotating keys for encryption, if there is any retriable failure
+			// that shall be retried if there is an error.
+			if err = handleEncryptedConfigBackend(newObject, true); err == nil {
+				// Upon success migrating the config, initialize all sub-systems
+				// if all sub-systems initialized successfully return right away
+				if err = initAllSubsystems(newObject); err == nil {
+					return nil
 				}
-			}
-			return nil
-		case <-globalOSSignalCh:
-			if err == nil {
-				return errors.New("Initializing sub-systems stopped gracefully")
+			}
+		}
+
+		// One of these retriable errors shall be retried.
+		if errors.Is(err, errDiskNotFound) ||
+			errors.Is(err, errConfigNotFound) ||
+			errors.Is(err, context.Canceled) ||
+			errors.Is(err, context.DeadlineExceeded) ||
+			errors.As(err, &optimeout) ||
+			errors.As(err, &rquorum) ||
+			errors.As(err, &wquorum) ||
+			isErrBucketNotFound(err) {
+			if n < 5 {
+				logger.Info("Waiting for all MinIO sub-systems to be initialized..")
+			} else {
+				logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err)
 			}
-			return fmt.Errorf("Unable to initialize sub-systems: %w", err)
+			continue
 		}
+
+		// Any other unhandled return right here.
+		return fmt.Errorf("Unable to initialize sub-systems: %w", err)
+	}
+
+	// Return an error when retry is canceled or deadlined
+	if err = retryCtx.Err(); err != nil {
+		return fmt.Errorf("Unable to initialize sub-systems: %w", err)
 	}
+
+	// Retry was canceled successfully.
+	return errors.New("Initializing sub-systems stopped gracefully")
 }
 
-func initAllSubsystems(buckets []BucketInfo, newObject ObjectLayer) (err error) {
+func initAllSubsystems(newObject ObjectLayer) (err error) {
+	// List buckets to be re-used for loading configs.
+	buckets, err := newObject.ListBuckets(GlobalContext)
+	if err != nil {
+		return fmt.Errorf("Unable to list buckets: %w", err)
+	}
+
 	// Initialize config system.
 	if err = globalConfigSys.Init(newObject); err != nil {
 		return fmt.Errorf("Unable to initialize config system: %w", err)
 	}
+
 	if globalEtcdClient != nil {
 		// **** WARNING ****
 		// Migrating to encrypted backend on etcd should happen before initialization of
@@ -294,6 +310,11 @@ func initAllSubsystems(buckets []BucketInfo, newObject ObjectLayer) (err error)
 		return fmt.Errorf("Unable to initialize bucket quota system: %w", err)
 	}
 
+	// Populate existing buckets to the etcd backend
+	if globalDNSConfig != nil {
+		initFederatorBackend(buckets, newObject)
+	}
+
 	return nil
 }
 
@@ -458,13 +479,7 @@ func serverMain(ctx *cli.Context) {
 
 	go startBackgroundOps(GlobalContext, newObject)
 
-	// Calls New() and initializes all sub-systems.
-	buckets, err := newObject.ListBuckets(GlobalContext)
-	if err != nil {
-		logger.Fatal(err, "Unable to list buckets")
-	}
-
-	logger.FatalIf(initSafeMode(buckets), "Unable to initialize server switching into safe-mode")
+	logger.FatalIf(initSafeMode(), "Unable to initialize server switching into safe-mode")
 
 	if globalCacheConfig.Enabled {
 		// initialize the new disk cache objects.
@@ -477,11 +492,6 @@ func serverMain(ctx *cli.Context) {
 		globalObjLayerMutex.Unlock()
 	}
 
-	// Populate existing buckets to the etcd backend
-	if globalDNSConfig != nil {
-		initFederatorBackend(buckets, newObject)
-	}
-
 	// Disable safe mode operation, after all initialization is over.
 	globalObjLayerMutex.Lock()
 	globalSafeMode = false
diff --git a/pkg/dsync/drwmutex.go b/pkg/dsync/drwmutex.go
index c32b306f1..ca5364769 100644
--- a/pkg/dsync/drwmutex.go
+++ b/pkg/dsync/drwmutex.go
@@ -128,13 +128,16 @@ func (dm *DRWMutex) GetRLock(id, source string, timeout time.Duration) (locked b
 // algorithm until either the lock is acquired successfully or more
 // time has elapsed than the timeout value.
 func (dm *DRWMutex) lockBlocking(timeout time.Duration, id, source string, isReadLock bool) (locked bool) {
-	doneCh, start := make(chan struct{}), time.Now().UTC()
-	defer close(doneCh)
+	start := time.Now().UTC()
 
 	restClnts := dm.clnt.GetLockersFn()
 
+	retryCtx, cancel := context.WithCancel(dm.ctx)
+
+	defer cancel()
+
 	// Use incremental back-off algorithm for repeated attempts to acquire the lock
-	for range newRetryTimerSimple(doneCh) {
+	for range newRetryTimerSimple(retryCtx) {
 		select {
 		case <-dm.ctx.Done():
 			return
diff --git a/pkg/dsync/retry.go b/pkg/dsync/retry.go
index 7d72a1f5a..970ad872b 100644
--- a/pkg/dsync/retry.go
+++ b/pkg/dsync/retry.go
@@ -17,6 +17,7 @@ package dsync
 
 import (
+	"context"
 	"math/rand"
 	"sync"
 	"time"
 )
@@ -61,7 +62,7 @@ var globalRandomSource = rand.New(&lockedRandSource{
 // until the maximum retry attempts are reached. - this function is a fully
 // configurable version, meant for only advanced use cases. For the most part
 // one should use newRetryTimerSimple and newRetryTimer.
-func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float64, doneCh <-chan struct{}) <-chan int {
+func newRetryTimerWithJitter(ctx context.Context, unit time.Duration, cap time.Duration, jitter float64) <-chan int {
 	attemptCh := make(chan int)
 
 	// normalize jitter to the range [0, 1.0]
@@ -100,7 +101,7 @@ func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float
 			select { // Attempts starts.
 			case attemptCh <- nextBackoff:
 				nextBackoff++
-			case <-doneCh:
+			case <-ctx.Done():
 				// Stop the routine.
 				return
 			}
@@ -108,7 +109,7 @@ func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float
 			// wait till next backoff time or till doneCh gets a message.
 			select {
 			case <-timer.C:
-			case <-doneCh:
+			case <-ctx.Done():
 				// stop the timer and return.
 				timer.Stop()
 				return
@@ -130,13 +131,13 @@ const (
 // newRetryTimer creates a timer with exponentially increasing delays
 // until the maximum retry attempts are reached. - this function provides
 // resulting retry values to be of maximum jitter.
-func newRetryTimer(unit time.Duration, cap time.Duration, doneCh <-chan struct{}) <-chan int {
-	return newRetryTimerWithJitter(unit, cap, MaxJitter, doneCh)
+func newRetryTimer(ctx context.Context, unit time.Duration, cap time.Duration) <-chan int {
+	return newRetryTimerWithJitter(ctx, unit, cap, MaxJitter)
 }
 
 // newRetryTimerSimple creates a timer with exponentially increasing delays
 // until the maximum retry attempts are reached. - this function is a
 // simpler version with all default values.
-func newRetryTimerSimple(doneCh <-chan struct{}) <-chan int {
-	return newRetryTimerWithJitter(defaultRetryUnit, defaultRetryCap, MaxJitter, doneCh)
+func newRetryTimerSimple(ctx context.Context) <-chan int {
+	return newRetryTimerWithJitter(ctx, defaultRetryUnit, defaultRetryCap, MaxJitter)
 }
diff --git a/pkg/dsync/retry_test.go b/pkg/dsync/retry_test.go
index bd8e14d7e..8fa5fc96d 100644
--- a/pkg/dsync/retry_test.go
+++ b/pkg/dsync/retry_test.go
@@ -17,25 +17,26 @@
 package dsync
 
 import (
+	"context"
 	"testing"
 	"time"
 )
 
 // Tests for retry timer.
 func TestRetryTimerSimple(t *testing.T) {
-	doneCh := make(chan struct{})
-	attemptCh := newRetryTimerSimple(doneCh)
+	rctx, cancel := context.WithCancel(context.Background())
+	attemptCh := newRetryTimerSimple(rctx)
 	i := <-attemptCh
 	if i != 0 {
-		close(doneCh)
+		cancel()
 		t.Fatalf("Invalid attempt counter returned should be 0, found %d instead", i)
 	}
 	i = <-attemptCh
 	if i <= 0 {
-		close(doneCh)
+		cancel()
 		t.Fatalf("Invalid attempt counter returned should be greater than 0, found %d instead", i)
 	}
-	close(doneCh)
+	cancel()
 	_, ok := <-attemptCh
 	if ok {
 		t.Fatal("Attempt counter should be closed")
@@ -44,18 +45,19 @@ func TestRetryTimerSimple(t *testing.T) {
 
 // Test retry time with no jitter.
 func TestRetryTimerWithNoJitter(t *testing.T) {
-	doneCh := make(chan struct{})
+	rctx, cancel := context.WithCancel(context.Background())
+
 	// No jitter
-	attemptCh := newRetryTimerWithJitter(time.Millisecond, 5*time.Millisecond, NoJitter, doneCh)
+	attemptCh := newRetryTimerWithJitter(rctx, time.Millisecond, 5*time.Millisecond, NoJitter)
 	i := <-attemptCh
 	if i != 0 {
-		close(doneCh)
+		cancel()
 		t.Fatalf("Invalid attempt counter returned should be 0, found %d instead", i)
 	}
 
 	// Loop through the maximum possible attempt.
 	for i = range attemptCh {
 		if i == 30 {
-			close(doneCh)
+			cancel()
 		}
 	}
 	_, ok := <-attemptCh
@@ -66,15 +68,16 @@ func TestRetryTimerWithNoJitter(t *testing.T) {
 
 // Test retry time with Jitter greater than MaxJitter.
 func TestRetryTimerWithJitter(t *testing.T) {
-	doneCh := make(chan struct{})
+	rctx, cancel := context.WithCancel(context.Background())
+
 	// Jitter will be set back to 1.0
-	attemptCh := newRetryTimerWithJitter(time.Second, 30*time.Second, 2.0, doneCh)
+	attemptCh := newRetryTimerWithJitter(rctx, time.Second, 30*time.Second, 2.0)
 	i := <-attemptCh
 	if i != 0 {
-		close(doneCh)
+		cancel()
 		t.Fatalf("Invalid attempt counter returned should be 0, found %d instead", i)
 	}
-	close(doneCh)
+	cancel()
 	_, ok := <-attemptCh
 	if ok {
 		t.Fatal("Attempt counter should be closed")
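
The whole patch follows one pattern: retry loops that used to be stopped by closing a doneCh channel (or by globalOSSignalCh) are now stopped by cancelling a context.Context, so the producing goroutine exits through ctx.Done() as soon as the caller returns. The standalone sketch below is illustrative only and not part of the patch; retryTimer is a simplified stand-in for newRetryTimerWithJitter (fixed growth, no jitter, no cap) and attemptInit is a hypothetical operation standing in for the sub-system initialization the server keeps retrying.

// Illustrative sketch only, not part of the patch above. The names retryTimer
// and attemptInit are invented for this example.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryTimer emits increasing attempt numbers until ctx is canceled,
// sleeping a little longer before each new attempt.
func retryTimer(ctx context.Context, unit time.Duration) <-chan int {
	attemptCh := make(chan int)
	go func() {
		defer close(attemptCh)
		for attempt := 0; ; attempt++ {
			select {
			case attemptCh <- attempt:
			case <-ctx.Done():
				// Caller canceled: exit instead of leaking the goroutine.
				return
			}
			select {
			case <-time.After(unit * time.Duration(attempt+1)):
			case <-ctx.Done():
				return
			}
		}
	}()
	return attemptCh
}

// attemptInit fails a few times before succeeding, standing in for
// initialization that has to wait for quorum to become available.
func attemptInit(attempt int) error {
	if attempt < 3 {
		return errors.New("not ready yet")
	}
	return nil
}

func main() {
	// Canceling the context replaces closing a doneCh; the goroutine inside
	// retryTimer observes ctx.Done() and exits cleanly when main returns.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for n := range retryTimer(ctx, 100*time.Millisecond) {
		if err := attemptInit(n); err != nil {
			fmt.Printf("attempt %d failed: %v, retrying..\n", n, err)
			continue
		}
		fmt.Printf("initialized after %d retries\n", n)
		return
	}
}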