From 6484453fc6eb0ca37657198dfe46df11a76d1d4f Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Fri, 9 Oct 2020 15:40:46 -0700
Subject: [PATCH] optionally allow strict quorum listing (#10649)

```
export MINIO_API_LIST_STRICT_QUORUM=on
```

would enable strict quorum listing if necessary
---
 cmd/erasure-common.go    | 12 ++++++++++--
 cmd/erasure-multipart.go |  2 +-
 cmd/erasure-sets.go      | 11 ++++++++++-
 cmd/erasure-zones.go     | 42 +++++++++++++++++++++++++++++++++++-----------
 cmd/erasure.go           |  2 +-
 5 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go
index df1b38d7b..5f193abb3 100644
--- a/cmd/erasure-common.go
+++ b/cmd/erasure-common.go
@@ -41,7 +41,7 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
 // with N disks online. If ndisks is zero or negative, then it will return all disks,
 // same if ndisks is greater than the number of all disks.
 func (er erasureObjects) getLoadBalancedNDisks(ndisks int) (newDisks []StorageAPI) {
-	disks := er.getLoadBalancedDisks()
+	disks := er.getLoadBalancedDisks(ndisks != -1)
 	for _, disk := range disks {
 		newDisks = append(newDisks, disk)
 		ndisks--
@@ -54,9 +54,17 @@ func (er erasureObjects) getLoadBalancedNDisks(ndisks int) (newDisks []StorageAP
 
 // getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice.
 // ensures to skip disks that are healing or offline.
-func (er erasureObjects) getLoadBalancedDisks() []StorageAPI {
+func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI {
 	disks := er.getDisks()
 
+	if !optimized {
+		var newDisks []StorageAPI
+		for _, i := range hashOrder(UTCNow().String(), len(disks)) {
+			newDisks = append(newDisks, disks[i-1])
+		}
+		return newDisks
+	}
+
 	var wg sync.WaitGroup
 	var mu sync.Mutex
 	var newDisks = map[uint64][]StorageAPI{}
diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index 81ec7685e..ee6b975fe 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -149,7 +149,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
 
 	var uploadIDs []string
 	var disk StorageAPI
-	for _, disk = range er.getLoadBalancedDisks() {
+	for _, disk = range er.getLoadBalancedDisks(true) {
 		uploadIDs, err = disk.ListDir(ctx, minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1)
 		if err != nil {
 			if err == errDiskNotFound {
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index d49654844..800287c9c 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -30,10 +30,12 @@ import (
 	"github.com/dchest/siphash"
 	"github.com/google/uuid"
 	"github.com/minio/minio-go/v7/pkg/tags"
+	"github.com/minio/minio/cmd/config"
 	"github.com/minio/minio/cmd/config/storageclass"
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/bpool"
 	"github.com/minio/minio/pkg/dsync"
+	"github.com/minio/minio/pkg/env"
 	"github.com/minio/minio/pkg/madmin"
 	"github.com/minio/minio/pkg/sync/errgroup"
 )
@@ -318,6 +320,13 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 	setDriveCount := len(format.Erasure.Sets[0])
 
 	endpointStrings := make([]string, len(endpoints))
+
+	listTolerancePerSet := 3
+	// By default this is off
+	if env.Get("MINIO_API_LIST_STRICT_QUORUM", config.EnableOff) == config.EnableOn {
+		listTolerancePerSet = -1
+	}
+
 	// Initialize the erasure sets instance.
 	s := &erasureSets{
 		sets:                make([]*erasureObjects, setCount),
@@ -328,7 +337,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 		endpointStrings:     endpointStrings,
 		setCount:            setCount,
 		setDriveCount:       setDriveCount,
-		listTolerancePerSet: 3, // Expect 3 good entries across disks.
+		listTolerancePerSet: listTolerancePerSet,
 		format:              format,
 		disksConnectEvent:   make(chan diskConnectInfo),
 		distributionAlgo:    format.Erasure.DistributionAlgo,
diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go
index d361a336d..f3d80d4c0 100644
--- a/cmd/erasure-zones.go
+++ b/cmd/erasure-zones.go
@@ -684,7 +684,11 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix,
 	for _, zone := range z.zones {
 		zonesEntryChs = append(zonesEntryChs,
 			zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.listTolerancePerSet, false))
-		zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-2)
+		if zone.listTolerancePerSet == -1 {
+			zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.setDriveCount/2)
+		} else {
+			zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-2)
+		}
 	}
 
 	var objInfos []ObjectInfo
@@ -710,7 +714,7 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix,
 		}
 
 		if quorumCount < zonesListTolerancePerSet[zoneIndex] {
-			// Skip entries which are not found on upto ndisks/2.
+			// Skip entries which are not found on enough disks to meet the expected tolerance.
 			continue
 		}
 
@@ -806,7 +810,11 @@ func (z *erasureZones) listObjectsSplunk(ctx context.Context, bucket, prefix, ma
 		}
 		zonesEntryChs = append(zonesEntryChs, entryChs)
 		zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
-		zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-2)
+		if zone.listTolerancePerSet == -1 {
+			zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.setDriveCount/2)
+		} else {
+			zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-2)
+		}
 	}
 
 	entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet)
@@ -898,7 +906,11 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker,
 		}
 		zonesEntryChs = append(zonesEntryChs, entryChs)
 		zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
-		zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-2)
+		if zone.listTolerancePerSet == -1 {
+			zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.setDriveCount/2)
+		} else {
+			zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-2)
+		}
 	}
 
 	entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet)
@@ -1300,7 +1312,11 @@ func (z *erasureZones) listObjectVersions(ctx context.Context, bucket, prefix, m
 		}
 		zonesEntryChs = append(zonesEntryChs, entryChs)
 		zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
-		zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-2)
+		if zone.listTolerancePerSet == -1 {
+			zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.setDriveCount/2)
+		} else {
+			zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-2)
+		}
 	}
 
 	entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet)
@@ -1772,9 +1788,13 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results
 		return err
 	}
 
-	var zoneDrivesPerSet []int
+	zonesListTolerancePerSet := make([]int, 0, len(z.zones))
 	for _, zone := range z.zones {
-		zoneDrivesPerSet = append(zoneDrivesPerSet, zone.listTolerancePerSet-2)
+		if zone.listTolerancePerSet == -1 {
+			zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.setDriveCount/2)
+		} else {
+			zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-2)
+		}
 	}
 
 	if opts.WalkVersions {
@@ -1794,13 +1814,13 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results
 			defer close(results)
 
 			for {
-				entry, quorumCount, _, ok := lexicallySortedEntryZoneVersions(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
+				entry, quorumCount, zoneIdx, ok := lexicallySortedEntryZoneVersions(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
 				if !ok {
 					// We have reached EOF across all entryChs, break the loop.
 					return
 				}
 
-				if quorumCount > 0 {
+				if quorumCount >= zonesListTolerancePerSet[zoneIdx] {
 					for _, version := range entry.Versions {
 						results <- version.ToObjectInfo(bucket, version.Name)
 					}
@@ -1827,13 +1847,13 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results
 			defer close(results)
 
 			for {
-				entry, quorumCount, _, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
+				entry, quorumCount, zoneIdx, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
 				if !ok {
 					// We have reached EOF across all entryChs, break the loop.
 					return
 				}
 
-				if quorumCount > 0 {
+				if quorumCount >= zonesListTolerancePerSet[zoneIdx] {
 					results <- entry.ToObjectInfo(bucket, entry.Name)
 				}
 			}
diff --git a/cmd/erasure.go b/cmd/erasure.go
index 49207ea66..b453adad1 100644
--- a/cmd/erasure.go
+++ b/cmd/erasure.go
@@ -252,7 +252,7 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
 	}
 
 	// Collect disks we can use.
-	disks := er.getLoadBalancedDisks()
+	disks := er.getLoadBalancedDisks(true)
 	if len(disks) == 0 {
 		logger.Info(color.Green("data-crawl:") + " all disks are offline or being healed, skipping crawl")
 		return nil
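For context on how the toggle behaves end to end, here is a minimal, self-contained sketch (plain Go, not MinIO source; `resolveListTolerance` and the example drive count are illustrative assumptions). It mirrors the logic this patch introduces: `MINIO_API_LIST_STRICT_QUORUM=on` stores the `-1` sentinel in `listTolerancePerSet`, and each listing path later translates that sentinel into a strict quorum of `setDriveCount/2`, otherwise falling back to the relaxed default of `listTolerancePerSet-2`.

```go
// Illustrative sketch only -- not part of the patch or of MinIO itself.
package main

import (
	"fmt"
	"os"
)

// resolveListTolerance mirrors the repeated if/else blocks in erasure-zones.go:
// a tolerance of -1 (strict quorum) requires an entry on half of the drives in
// the set, while the default tolerance of 3 is relaxed by 2.
func resolveListTolerance(listTolerancePerSet, setDriveCount int) int {
	if listTolerancePerSet == -1 {
		return setDriveCount / 2
	}
	return listTolerancePerSet - 2
}

func main() {
	// Mirrors newErasureSets: default is 3, the env toggle switches to the -1 sentinel.
	listTolerancePerSet := 3
	if os.Getenv("MINIO_API_LIST_STRICT_QUORUM") == "on" {
		listTolerancePerSet = -1
	}

	const setDriveCount = 16 // example drives per erasure set
	quorum := resolveListTolerance(listTolerancePerSet, setDriveCount)
	fmt.Printf("an entry must be seen on at least %d disks to be listed\n", quorum)
}
```

With the toggle off this yields 1 (3-2), matching the previous lenient behaviour; with it on, a 16-drive set requires 8 matching entries, which is also the threshold the `Walk` goroutines now enforce via `quorumCount >= zonesListTolerancePerSet[zoneIdx]`.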