optionally allow strict quorum listing (#10649)

```
export MINIO_API_LIST_STRICT_QUORUM=on
```

would enable listing in quorum if necessary
master
Harshavardhana 4 years ago committed by GitHub
parent a0d0645128
commit 6484453fc6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      cmd/erasure-common.go
  2. 2
      cmd/erasure-multipart.go
  3. 11
      cmd/erasure-sets.go
  4. 34
      cmd/erasure-zones.go
  5. 2
      cmd/erasure.go

@ -41,7 +41,7 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
// with N disks online. If ndisks is zero or negative, then it will returns all disks,
// same if ndisks is greater than the number of all disks.
func (er erasureObjects) getLoadBalancedNDisks(ndisks int) (newDisks []StorageAPI) {
disks := er.getLoadBalancedDisks()
disks := er.getLoadBalancedDisks(ndisks != -1)
for _, disk := range disks {
newDisks = append(newDisks, disk)
ndisks--
@ -54,9 +54,17 @@ func (er erasureObjects) getLoadBalancedNDisks(ndisks int) (newDisks []StorageAP
// getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice.
// ensures to skip disks if they are not healing and online.
func (er erasureObjects) getLoadBalancedDisks() []StorageAPI {
func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI {
disks := er.getDisks()
if !optimized {
var newDisks []StorageAPI
for _, i := range hashOrder(UTCNow().String(), len(disks)) {
newDisks = append(newDisks, disks[i-1])
}
return newDisks
}
var wg sync.WaitGroup
var mu sync.Mutex
var newDisks = map[uint64][]StorageAPI{}

@ -149,7 +149,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
var uploadIDs []string
var disk StorageAPI
for _, disk = range er.getLoadBalancedDisks() {
for _, disk = range er.getLoadBalancedDisks(true) {
uploadIDs, err = disk.ListDir(ctx, minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1)
if err != nil {
if err == errDiskNotFound {

@ -30,10 +30,12 @@ import (
"github.com/dchest/siphash"
"github.com/google/uuid"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/cmd/config"
"github.com/minio/minio/cmd/config/storageclass"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bpool"
"github.com/minio/minio/pkg/dsync"
"github.com/minio/minio/pkg/env"
"github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/sync/errgroup"
)
@ -318,6 +320,13 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
setDriveCount := len(format.Erasure.Sets[0])
endpointStrings := make([]string, len(endpoints))
listTolerancePerSet := 3
// By default this is off
if env.Get("MINIO_API_LIST_STRICT_QUORUM", config.EnableOff) == config.EnableOn {
listTolerancePerSet = -1
}
// Initialize the erasure sets instance.
s := &erasureSets{
sets: make([]*erasureObjects, setCount),
@ -328,7 +337,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
endpointStrings: endpointStrings,
setCount: setCount,
setDriveCount: setDriveCount,
listTolerancePerSet: 3, // Expect 3 good entries across disks.
listTolerancePerSet: listTolerancePerSet,
format: format,
disksConnectEvent: make(chan diskConnectInfo),
distributionAlgo: format.Erasure.DistributionAlgo,

@ -684,8 +684,12 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix,
for _, zone := range z.zones {
zonesEntryChs = append(zonesEntryChs,
zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.listTolerancePerSet, false))
if zone.listTolerancePerSet == -1 {
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.setDriveCount/2)
} else {
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-2)
}
}
var objInfos []ObjectInfo
var eof bool
@ -710,7 +714,7 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix,
}
if quorumCount < zonesListTolerancePerSet[zoneIndex] {
// Skip entries which are not found on upto ndisks/2.
// Skip entries which are not found on upto expected tolerance
continue
}
@ -806,8 +810,12 @@ func (z *erasureZones) listObjectsSplunk(ctx context.Context, bucket, prefix, ma
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
if zone.listTolerancePerSet == -1 {
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.setDriveCount/2)
} else {
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-2)
}
}
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet)
if len(entries.Files) == 0 {
@ -898,8 +906,12 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker,
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
if zone.listTolerancePerSet == -1 {
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.setDriveCount/2)
} else {
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-2)
}
}
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet)
if len(entries.Files) == 0 {
@ -1300,8 +1312,12 @@ func (z *erasureZones) listObjectVersions(ctx context.Context, bucket, prefix, m
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
if zone.listTolerancePerSet == -1 {
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.setDriveCount/2)
} else {
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-2)
}
}
entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet)
if len(entries.FilesVersions) == 0 {
@ -1772,9 +1788,13 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results
return err
}
var zoneDrivesPerSet []int
zonesListTolerancePerSet := make([]int, 0, len(z.zones))
for _, zone := range z.zones {
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.listTolerancePerSet-2)
if zone.listTolerancePerSet == -1 {
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.setDriveCount/2)
} else {
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-2)
}
}
if opts.WalkVersions {
@ -1794,13 +1814,13 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results
defer close(results)
for {
entry, quorumCount, _, ok := lexicallySortedEntryZoneVersions(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
entry, quorumCount, zoneIdx, ok := lexicallySortedEntryZoneVersions(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
if !ok {
// We have reached EOF across all entryChs, break the loop.
return
}
if quorumCount > 0 {
if quorumCount >= zonesListTolerancePerSet[zoneIdx] {
for _, version := range entry.Versions {
results <- version.ToObjectInfo(bucket, version.Name)
}
@ -1827,13 +1847,13 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results
defer close(results)
for {
entry, quorumCount, _, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
entry, quorumCount, zoneIdx, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
if !ok {
// We have reached EOF across all entryChs, break the loop.
return
}
if quorumCount > 0 {
if quorumCount >= zonesListTolerancePerSet[zoneIdx] {
results <- entry.ToObjectInfo(bucket, entry.Name)
}
}

@ -252,7 +252,7 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
}
// Collect disks we can use.
disks := er.getLoadBalancedDisks()
disks := er.getLoadBalancedDisks(true)
if len(disks) == 0 {
logger.Info(color.Green("data-crawl:") + " all disks are offline or being healed, skipping crawl")
return nil

Loading…
Cancel
Save