From ca989eb0b37d6c95f8c48df94ebee2380de26e24 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 24 Sep 2020 09:53:38 -0700 Subject: [PATCH] avoid ListBuckets returning quorum errors when node is down (#10555) Also, revamp the way ListBuckets work make few portions of the healing logic parallel - walk objects for healing disks in parallel - collect the list of buckets in parallel across drives - provide consistent view for listBuckets() --- cmd/background-newdisks-heal-ops.go | 13 +++++++- cmd/bootstrap-peer-server.go | 2 +- cmd/erasure-bucket.go | 48 --------------------------- cmd/erasure-healing.go | 51 +++++++++++++++-------------- cmd/erasure-sets.go | 4 +-- cmd/erasure-zones.go | 24 +++++++------- cmd/global-heal.go | 38 +++++++++++---------- cmd/server-main.go | 24 ++------------ 8 files changed, 78 insertions(+), 126 deletions(-) diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index ce04d7179..d56508b23 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -153,11 +153,22 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healS for _, ep := range endpoints { logger.Info("Healing disk '%s' on %s zone", ep, humanize.Ordinal(i+1)) - if err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].setDriveCount); err != nil { + buckets, err := z.ListBucketsHeal(ctx) + if err != nil { logger.LogIf(ctx, err) continue } + if len(buckets) > 0 { + disks := z.zones[i].sets[setIndex].getLoadBalancedDisks() + if err := healErasureSet(ctx, setIndex, buckets, disks, z.zones[i].setDriveCount); err != nil { + logger.LogIf(ctx, err) + continue + } + } + + logger.Info("Healing disk '%s' on %s zone complete", ep, humanize.Ordinal(i+1)) + // Only upon success pop the healed disk. globalBackgroundHealState.popHealLocalDisks(ep) } diff --git a/cmd/bootstrap-peer-server.go b/cmd/bootstrap-peer-server.go index f194f08a9..096e6b340 100644 --- a/cmd/bootstrap-peer-server.go +++ b/cmd/bootstrap-peer-server.go @@ -188,7 +188,7 @@ func verifyServerSystemConfig(ctx context.Context, endpointZones EndpointZones) retries++ // after 5 retries start logging that servers are not reachable yet if retries >= 5 { - logger.Info(fmt.Sprintf("Waiting for atleast %d servers to be online for bootstrap check", len(clnts)/2)) + logger.Info(fmt.Sprintf("Waiting for atleast %d remote servers to be online for bootstrap check", len(clnts)/2)) logger.Info(fmt.Sprintf("Following servers are currently offline or unreachable %s", offlineEndpoints)) retries = 0 // reset to log again after 5 retries. } diff --git a/cmd/erasure-bucket.go b/cmd/erasure-bucket.go index 6396cc3a6..d483b772d 100644 --- a/cmd/erasure-bucket.go +++ b/cmd/erasure-bucket.go @@ -18,7 +18,6 @@ package cmd import ( "context" - "sort" "github.com/minio/minio-go/v7/pkg/s3utils" "github.com/minio/minio/cmd/logger" @@ -122,53 +121,6 @@ func (er erasureObjects) GetBucketInfo(ctx context.Context, bucket string) (bi B return bucketInfo, nil } -// listBuckets - returns list of all buckets from a disk picked at random. -func (er erasureObjects) listBuckets(ctx context.Context) (bucketsInfo []BucketInfo, err error) { - for _, disk := range er.getLoadBalancedDisks() { - if disk == nil { - continue - } - var volsInfo []VolInfo - volsInfo, err = disk.ListVols(ctx) - if err == nil { - // NOTE: The assumption here is that volumes across all disks in - // readQuorum have consistent view i.e they all have same number - // of buckets. 
- var bucketsInfo []BucketInfo - for _, volInfo := range volsInfo { - if isReservedOrInvalidBucket(volInfo.Name, true) { - continue - } - bucketsInfo = append(bucketsInfo, BucketInfo(volInfo)) - } - // For buckets info empty, loop once again to check - // if we have, can happen if disks were down. - if len(bucketsInfo) == 0 { - continue - } - return bucketsInfo, nil - } - logger.LogIf(ctx, err) - // Ignore any disks not found. - if IsErrIgnored(err, bucketMetadataOpIgnoredErrs...) { - continue - } - break - } - return nil, err -} - -// ListBuckets - lists all the buckets, sorted by its name. -func (er erasureObjects) ListBuckets(ctx context.Context) ([]BucketInfo, error) { - bucketInfos, err := er.listBuckets(ctx) - if err != nil { - return nil, toObjectErr(err) - } - // Sort by bucket name before returning. - sort.Sort(byBucketName(bucketInfos)) - return bucketInfos, nil -} - // Dangling buckets should be handled appropriately, in this following situation // we actually have quorum error to be `nil` but we have some disks where // the bucket delete returned `errVolumeNotEmpty` but this is not correct diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index b915b9dfd..768ea96f9 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -155,34 +155,37 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints // listAllBuckets lists all buckets from all disks. It also // returns the occurrence of each buckets in all disks -func listAllBuckets(storageDisks []StorageAPI, healBuckets map[string]VolInfo) (err error) { - for _, disk := range storageDisks { - if disk == nil { - continue - } - var volsInfo []VolInfo - volsInfo, err = disk.ListVols(context.TODO()) - if err != nil { - if IsErrIgnored(err, bucketMetadataOpIgnoredErrs...) { - continue +func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets map[string]VolInfo) error { + g := errgroup.WithNErrs(len(storageDisks)) + var mu sync.Mutex + for index := range storageDisks { + index := index + g.Go(func() error { + if storageDisks[index] == nil { + // we ignore disk not found errors + return nil } - return err - } - for _, volInfo := range volsInfo { - // StorageAPI can send volume names which are - // incompatible with buckets - these are - // skipped, like the meta-bucket. - if isReservedOrInvalidBucket(volInfo.Name, false) { - continue + volsInfo, err := storageDisks[index].ListVols(ctx) + if err != nil { + return err } - // always save unique buckets across drives. - if _, ok := healBuckets[volInfo.Name]; !ok { - healBuckets[volInfo.Name] = volInfo + for _, volInfo := range volsInfo { + // StorageAPI can send volume names which are + // incompatible with buckets - these are + // skipped, like the meta-bucket. + if isReservedOrInvalidBucket(volInfo.Name, false) { + continue + } + mu.Lock() + if _, ok := healBuckets[volInfo.Name]; !ok { + healBuckets[volInfo.Name] = volInfo + } + mu.Unlock() } - - } + return nil + }, index) } - return nil + return reduceReadQuorumErrs(ctx, g.Wait(), bucketMetadataOpIgnoredErrs, len(storageDisks)/2) } // Only heal on disks where we are sure that healing is needed. We can expand diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 2cadac7c1..ee317bf72 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -665,7 +665,7 @@ func undoDeleteBucketSets(ctx context.Context, bucket string, sets []*erasureObj // that all buckets are present on all sets. 
func (s *erasureSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) { // Always lists from the same set signified by the empty string. - return s.getHashedSet("").ListBuckets(ctx) + return s.ListBucketsHeal(ctx) } // --- Object Operations --- @@ -1465,7 +1465,7 @@ func (s *erasureSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) var healBuckets = map[string]VolInfo{} for _, set := range s.sets { // lists all unique buckets across drives. - if err := listAllBuckets(set.getDisks(), healBuckets); err != nil { + if err := listAllBuckets(ctx, set.getDisks(), healBuckets); err != nil { return nil, err } } diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go index b622bff68..0d7e89a0d 100644 --- a/cmd/erasure-zones.go +++ b/cmd/erasure-zones.go @@ -298,19 +298,19 @@ func (z *erasureZones) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter // Collect for each set in zones. for _, z := range z.zones { - for _, erObj := range z.sets { - // Add new buckets. - buckets, err := erObj.ListBuckets(ctx) - if err != nil { - return err - } - for _, b := range buckets { - if _, ok := knownBuckets[b.Name]; ok { - continue - } - allBuckets = append(allBuckets, b) - knownBuckets[b.Name] = struct{}{} + buckets, err := z.ListBuckets(ctx) + if err != nil { + return err + } + // Add new buckets. + for _, b := range buckets { + if _, ok := knownBuckets[b.Name]; ok { + continue } + allBuckets = append(allBuckets, b) + knownBuckets[b.Name] = struct{}{} + } + for _, erObj := range z.sets { wg.Add(1) results = append(results, dataUsageCache{}) go func(i int, erObj *erasureObjects) { diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 20034117e..9e45c1938 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -18,6 +18,7 @@ package cmd import ( "context" + "sync" "time" "github.com/minio/minio/cmd/logger" @@ -89,12 +90,7 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) { } // healErasureSet lists and heals all objects in a specific erasure set -func healErasureSet(ctx context.Context, setIndex int, xlObj *erasureObjects, setDriveCount int) error { - buckets, err := xlObj.ListBuckets(ctx) - if err != nil { - return err - } - +func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, disks []StorageAPI, setDriveCount int) error { // Get background heal sequence to send elements to heal var bgSeq *healSequence var ok bool @@ -125,22 +121,30 @@ func healErasureSet(ctx context.Context, setIndex int, xlObj *erasureObjects, se } var entryChs []FileInfoVersionsCh - for _, disk := range xlObj.getLoadBalancedDisks() { + var mu sync.Mutex + var wg sync.WaitGroup + for _, disk := range disks { if disk == nil { // Disk can be offline continue } - - entryCh, err := disk.WalkVersions(ctx, bucket.Name, "", "", true, ctx.Done()) - if err != nil { - // Disk walk returned error, ignore it. - continue - } - - entryChs = append(entryChs, FileInfoVersionsCh{ - Ch: entryCh, - }) + disk := disk + wg.Add(1) + go func() { + defer wg.Done() + entryCh, err := disk.WalkVersions(ctx, bucket.Name, "", "", true, ctx.Done()) + if err != nil { + // Disk walk returned error, ignore it. 
+ return + } + mu.Lock() + entryChs = append(entryChs, FileInfoVersionsCh{ + Ch: entryCh, + }) + mu.Unlock() + }() } + wg.Wait() entriesValid := make([]bool, len(entryChs)) entries := make([]FileInfoVersions, len(entryChs)) diff --git a/cmd/server-main.go b/cmd/server-main.go index f944f0778..8be5bafe8 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -305,31 +305,13 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) { // are modifying this code that you do so, if and when // you want to add extra context to your error. This // ensures top level retry works accordingly. + // List buckets to heal, and be re-used for loading configs. var buckets []BucketInfo - if globalIsDistErasure || globalIsErasure { - // List buckets to heal, and be re-used for loading configs. + if globalIsErasure { buckets, err = newObject.ListBucketsHeal(ctx) if err != nil { return fmt.Errorf("Unable to list buckets to heal: %w", err) } - // Attempt a heal if possible and re-use the bucket names - // to reload their config. - wquorum := &InsufficientWriteQuorum{} - rquorum := &InsufficientReadQuorum{} - for _, bucket := range buckets { - if err = newObject.MakeBucketWithLocation(ctx, bucket.Name, BucketOptions{}); err != nil { - if errors.As(err, &wquorum) || errors.As(err, &rquorum) { - // Return the error upwards for the caller to retry. - return fmt.Errorf("Unable to heal bucket: %w", err) - } - if _, ok := err.(BucketExists); !ok { - // ignore any other error and log for investigation. - logger.LogIf(ctx, err) - continue - } - // Bucket already exists, nothing that needs to be done. - } - } } else { buckets, err = newObject.ListBuckets(ctx) if err != nil { @@ -349,7 +331,7 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) { } // Initialize bucket metadata sub-system. - if err := globalBucketMetadataSys.Init(ctx, buckets, newObject); err != nil { + if err = globalBucketMetadataSys.Init(ctx, buckets, newObject); err != nil { return fmt.Errorf("Unable to initialize bucket metadata sub-system: %w", err) }
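
The sketch below illustrates, outside the MinIO codebase, the fan-out-and-merge pattern that the reworked listAllBuckets()/ListBucketsHeal() path relies on: every drive is asked for its volumes concurrently, unique bucket names are merged under a mutex, and the listing only fails when more than half of the drives failed. The diskLister interface, volInfo struct, and fakeDisk type are simplified, hypothetical stand-ins for MinIO's StorageAPI, VolInfo, and real drives, and the len(disks)/2 failure threshold only loosely mirrors the reduceReadQuorumErrs call in the patch; treat this as a minimal sketch under those assumptions, not the project's actual implementation.

package main

import (
    "context"
    "fmt"
    "sync"
)

// volInfo is a simplified stand-in for MinIO's VolInfo.
type volInfo struct {
    Name string
}

// diskLister is a hypothetical, trimmed-down stand-in for a storage backend:
// the only capability this sketch needs is enumerating volumes (buckets).
type diskLister interface {
    ListVols(ctx context.Context) ([]volInfo, error)
}

// listAllBucketsParallel queries every disk concurrently, merges the unique
// bucket names under a mutex, and succeeds as long as no more than half of
// the disks failed -- a read-quorum style reduction of the per-disk errors.
func listAllBucketsParallel(ctx context.Context, disks []diskLister) (map[string]volInfo, error) {
    buckets := make(map[string]volInfo)
    errs := make([]error, len(disks))

    var mu sync.Mutex
    var wg sync.WaitGroup
    for i, disk := range disks {
        if disk == nil {
            continue // offline disks are skipped, mirroring the patch
        }
        wg.Add(1)
        go func(i int, disk diskLister) {
            defer wg.Done()
            vols, err := disk.ListVols(ctx)
            if err != nil {
                errs[i] = err
                return
            }
            mu.Lock()
            for _, v := range vols {
                if _, ok := buckets[v.Name]; !ok {
                    buckets[v.Name] = v // keep one entry per unique bucket
                }
            }
            mu.Unlock()
        }(i, disk)
    }
    wg.Wait()

    // Tolerate failures on up to half of the disks before giving up.
    var failed int
    for _, err := range errs {
        if err != nil {
            failed++
        }
    }
    if failed > len(disks)/2 {
        return nil, fmt.Errorf("bucket listing failed on %d of %d disks", failed, len(disks))
    }
    return buckets, nil
}

// fakeDisk is an in-memory diskLister used only to exercise the sketch.
type fakeDisk struct {
    vols []volInfo
    err  error
}

func (d fakeDisk) ListVols(ctx context.Context) ([]volInfo, error) {
    return d.vols, d.err
}

func main() {
    disks := []diskLister{
        fakeDisk{vols: []volInfo{{Name: "photos"}, {Name: "logs"}}},
        fakeDisk{err: fmt.Errorf("drive offline")}, // one failure stays within the threshold
        fakeDisk{vols: []volInfo{{Name: "logs"}, {Name: "backups"}}},
    }
    buckets, err := listAllBucketsParallel(context.Background(), disks)
    if err != nil {
        fmt.Println("listing failed:", err)
        return
    }
    for name := range buckets {
        fmt.Println("bucket:", name)
    }
}

Merging the view across all reachable drives, rather than trusting a single load-balanced drive as the removed erasureObjects.listBuckets() did, is what lets bucket listing keep answering, and stay consistent, while a node is down, which is the behaviour change named in the commit subject.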
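
A second sketch covers the other parallelisation in this patch, the per-disk walk fan-out in healErasureSet(): one goroutine per healthy disk opens its object-walk stream, disks whose walk fails are silently skipped, and the collected channels are only consumed once every goroutine has finished. The walker interface, entry struct, and fakeWalker type are hypothetical simplifications of StorageAPI.WalkVersions and FileInfoVersionsCh, so again this is a sketch under stated assumptions rather than MinIO's real signatures.

package main

import (
    "context"
    "fmt"
    "sync"
)

// entry is a simplified stand-in for a walked object version.
type entry struct {
    Name string
}

// walker is a hypothetical stand-in for a disk that can stream the entries
// (object versions) found under a bucket.
type walker interface {
    Walk(ctx context.Context, bucket string) (<-chan entry, error)
}

// openWalksParallel opens one walk stream per healthy disk concurrently and
// returns the channels that opened successfully; disks that are nil or whose
// walk errored are ignored, mirroring the patch.
func openWalksParallel(ctx context.Context, disks []walker, bucket string) []<-chan entry {
    var (
        mu  sync.Mutex
        wg  sync.WaitGroup
        chs []<-chan entry
    )
    for _, disk := range disks {
        if disk == nil {
            continue // disk can be offline
        }
        disk := disk
        wg.Add(1)
        go func() {
            defer wg.Done()
            ch, err := disk.Walk(ctx, bucket)
            if err != nil {
                return // disk walk returned an error, ignore this disk
            }
            mu.Lock()
            chs = append(chs, ch)
            mu.Unlock()
        }()
    }
    wg.Wait()
    return chs
}

// fakeWalker streams a fixed list of names; used only to exercise the sketch.
type fakeWalker struct{ names []string }

func (w fakeWalker) Walk(ctx context.Context, bucket string) (<-chan entry, error) {
    ch := make(chan entry, len(w.names))
    for _, n := range w.names {
        ch <- entry{Name: n}
    }
    close(ch)
    return ch, nil
}

func main() {
    disks := []walker{
        fakeWalker{names: []string{"a.txt", "b.txt"}},
        nil, // an offline disk is simply skipped
        fakeWalker{names: []string{"a.txt", "c.txt"}},
    }
    for i, ch := range openWalksParallel(context.Background(), disks, "mybucket") {
        for e := range ch {
            fmt.Printf("stream %d: %s\n", i, e.Name)
        }
    }
}

Opening the streams concurrently turns the setup cost from the sum of the per-disk round-trips into roughly the slowest single one, which is what the commit message means by walking objects for healing disks in parallel; the loop that merges and heals the entries afterwards is unchanged by the patch.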