From cbc5d78a0999e94c68a7fede574a55c5b079e952 Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Mon, 8 Oct 2018 23:47:13 +0100 Subject: [PATCH] Handle read/quorum errors when initializing all subsystems (#6585) - Only require len(disks)/2 to initialize the cluster - Fix checking of read/write quorum in subsystems init - Add retry mechanism in policy and notification to avoid aborting in case of read/write quorum errors --- cmd/config.go | 4 ++- cmd/notification.go | 67 +++++++++++++++++++++++++++------------------ cmd/policy.go | 54 +++++++++++++++++++++++++----------- cmd/xl-sets.go | 2 +- 4 files changed, 83 insertions(+), 44 deletions(-) diff --git a/cmd/config.go b/cmd/config.go index b362ef455..66e422b2f 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -26,6 +26,7 @@ import ( "os" "path" "runtime" + "strings" "time" "github.com/minio/minio/cmd/logger" @@ -234,7 +235,8 @@ func (sys *ConfigSys) Init(objAPI ObjectLayer) error { case _ = <-retryTimerCh: err := initConfig(objAPI) if err != nil { - if isInsufficientReadQuorum(err) || isInsufficientWriteQuorum(err) { + if strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) || + strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) { logger.Info("Waiting for configuration to be initialized..") continue } diff --git a/cmd/notification.go b/cmd/notification.go index 505a2caf9..74dc4d11e 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -241,10 +241,10 @@ func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLaye // and configFile, take a transaction lock to avoid data race between readConfig() // and saveConfig(). 
objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile) - if err := objLock.GetLock(globalOperationTimeout); err != nil { + if err := objLock.GetRLock(globalOperationTimeout); err != nil { return err } - defer objLock.Unlock() + defer objLock.RUnlock() reader, e := readConfig(ctx, objAPI, configFile) if e != nil && !IsErrIgnored(e, errDiskNotFound, errConfigNotFound) { @@ -265,7 +265,6 @@ func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLaye return nil } - activeListenerList := []ListenBucketNotificationArgs{} for _, args := range listenerList { found, err := isLocalHost(args.Addr.Name) if err != nil { @@ -301,48 +300,64 @@ func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLaye logger.LogIf(ctx, err) return err } - activeListenerList = append(activeListenerList, args) - } - - data, err := json.Marshal(activeListenerList) - if err != nil { - logger.LogIf(ctx, err) - return err } - return saveConfig(objAPI, configFile, data) + return nil } -// Init - initializes notification system from notification.xml and listener.json of all buckets. 
-func (sys *NotificationSys) Init(objAPI ObjectLayer) error { - if objAPI == nil { - return errInvalidArgument - } - +func (sys *NotificationSys) refresh(objAPI ObjectLayer) error { buckets, err := objAPI.ListBuckets(context.Background()) if err != nil { return err } - for _, bucket := range buckets { ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucket.Name}) config, err := readNotificationConfig(ctx, objAPI, bucket.Name) - if err != nil { - if !IsErrIgnored(err, errDiskNotFound, errNoSuchNotifications) { - return err - } - } else { - sys.AddRulesMap(bucket.Name, config.ToRulesMap()) + if err != nil && err != errNoSuchNotifications { + return err } - + if err == errNoSuchNotifications { + continue + } + sys.AddRulesMap(bucket.Name, config.ToRulesMap()) if err = sys.initListeners(ctx, objAPI, bucket.Name); err != nil { return err } } - return nil } +// Init - initializes notification system from notification.xml and listener.json of all buckets. +func (sys *NotificationSys) Init(objAPI ObjectLayer) error { + if objAPI == nil { + return errInvalidArgument + } + + doneCh := make(chan struct{}) + defer close(doneCh) + + // Initializing notification needs a retry mechanism for + // the following reasons: + // - Read quorum is lost just after the initialization + // of the object layer. + retryTimerCh := newRetryTimerSimple(doneCh) + for { + select { + case _ = <-retryTimerCh: + if err := sys.refresh(objAPI); err != nil { + if err == errDiskNotFound || + strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) || + strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) { + logger.Info("Waiting for notification subsystem to be initialized..") + continue + } + return err + } + return nil + } + } +} + // AddRulesMap - adds rules map for bucket name. 
func (sys *NotificationSys) AddRulesMap(bucketName string, rulesMap event.RulesMap) { sys.Lock() diff --git a/cmd/policy.go b/cmd/policy.go index 43a9462ab..5440fca1d 100644 --- a/cmd/policy.go +++ b/cmd/policy.go @@ -21,6 +21,7 @@ import ( "encoding/json" "net/http" "path" + "strings" "sync" "time" @@ -131,25 +132,46 @@ func (sys *PolicySys) Init(objAPI ObjectLayer) error { return errInvalidArgument } - // Load PolicySys once during boot. - if err := sys.refresh(objAPI); err != nil { - return err - } + defer func() { + // Refresh PolicySys in background. + go func() { + ticker := time.NewTicker(globalRefreshBucketPolicyInterval) + defer ticker.Stop() + for { + select { + case <-globalServiceDoneCh: + return + case <-ticker.C: + sys.refresh(objAPI) + } + } + }() + }() - // Refresh PolicySys in background. - go func() { - ticker := time.NewTicker(globalRefreshBucketPolicyInterval) - defer ticker.Stop() - for { - select { - case <-globalServiceDoneCh: - return - case <-ticker.C: - sys.refresh(objAPI) + doneCh := make(chan struct{}) + defer close(doneCh) + + // Initializing policy needs a retry mechanism for + // the following reasons: + // - Read quorum is lost just after the initialization + // of the object layer. + retryTimerCh := newRetryTimerSimple(doneCh) + for { + select { + case _ = <-retryTimerCh: + // Load PolicySys once during boot. + if err := sys.refresh(objAPI); err != nil { + if err == errDiskNotFound || + strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) || + strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) { + logger.Info("Waiting for policy subsystem to be initialized..") + continue + } + return err } + return nil } - }() - return nil + } } // NewPolicySys - creates new policy system. 
diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index ceb0eaf8f..a1eed3fd2 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -174,7 +174,7 @@ func (s *xlSets) reInitDisks(refFormat *formatXLV3, storageDisks []StorageAPI, f // any given sets. func (s *xlSets) connectDisksWithQuorum() { var onlineDisks int - for onlineDisks < (len(s.endpoints)/2)+1 { + for onlineDisks < len(s.endpoints)/2 { for _, endpoint := range s.endpoints { if s.isConnected(endpoint) { continue