tolerate listing with only readQuorum disks (#10357)

We can reduce this further in the future, but this is a good
value to keep around. With the advent of continuous healing,
we can be assured that the namespace will eventually be
consistent, so it is safe to avoid listing across all
drives on all sets.
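As a rough sketch of the accepted-entry rule this implies (the names setDriveCount and listTolerancePerSet come from the diff below; the helper itself is illustrative only):

package main

import "fmt"

// acceptListEntry is illustrative only: an entry is listed once it is seen
// on at least readQuorum (setDriveCount/2) disks, not on every disk.
func acceptListEntry(quorumCount, setDriveCount int) bool {
	listTolerancePerSet := setDriveCount / 2
	return quorumCount >= listTolerancePerSet
}

func main() {
	// With 16 drives per set, 8 matching disks are enough to list an entry.
	fmt.Println(acceptListEntry(8, 16)) // true
	fmt.Println(acceptListEntry(5, 16)) // false
}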

Bonus: Pop()'s in parallel seem to have the potential to wait
too long on large drive setups and cause more slowness instead
of gaining any performance, so remove them for now.

Also, implement load-balanced selection of local disks,
ensuring an affinity for local disks in:

- cleanupStaleMultipartUploads()
Harshavardhana (committed via GitHub)
parent 0a2e6d58a5
commit a359e36e35
14 changed files (changed-line counts in parentheses):

  cmd/background-newdisks-heal-ops.go (2)
  cmd/config/storageclass/storage-class.go (18)
  cmd/config/storageclass/storage-class_test.go (10)
  cmd/endpoint.go (2)
  cmd/erasure-bucket.go (3)
  cmd/erasure-common.go (11)
  cmd/erasure-multipart.go (3)
  cmd/erasure-sets.go (128)
  cmd/erasure-zones.go (97)
  cmd/format-erasure.go (42)
  cmd/format-erasure_test.go (26)
  cmd/global-heal.go (4)
  cmd/prepare-storage.go (14)
  cmd/xl-storage.go (6)

@ -183,7 +183,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, drivesToHeal
// Heal all erasure sets that need
for i, erasureSetToHeal := range erasureSetInZoneToHeal {
for _, setIndex := range erasureSetToHeal {
err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].drivesPerSet)
err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].setDriveCount)
if err != nil {
logger.LogIf(ctx, err)
}

@ -156,7 +156,7 @@ func parseStorageClass(storageClassEnv string) (sc StorageClass, err error) {
}
// Validates the parity disks.
func validateParity(ssParity, rrsParity, drivesPerSet int) (err error) {
func validateParity(ssParity, rrsParity, setDriveCount int) (err error) {
if ssParity == 0 && rrsParity == 0 {
return nil
}
@ -174,12 +174,12 @@ func validateParity(ssParity, rrsParity, drivesPerSet int) (err error) {
return fmt.Errorf("Reduced redundancy storage class parity %d should be greater than or equal to %d", rrsParity, minParityDisks)
}
if ssParity > drivesPerSet/2 {
return fmt.Errorf("Standard storage class parity %d should be less than or equal to %d", ssParity, drivesPerSet/2)
if ssParity > setDriveCount/2 {
return fmt.Errorf("Standard storage class parity %d should be less than or equal to %d", ssParity, setDriveCount/2)
}
if rrsParity > drivesPerSet/2 {
return fmt.Errorf("Reduced redundancy storage class parity %d should be less than or equal to %d", rrsParity, drivesPerSet/2)
if rrsParity > setDriveCount/2 {
return fmt.Errorf("Reduced redundancy storage class parity %d should be less than or equal to %d", rrsParity, setDriveCount/2)
}
if ssParity > 0 && rrsParity > 0 {
@ -220,9 +220,9 @@ func Enabled(kvs config.KVS) bool {
}
// LookupConfig - lookup storage class config and override with valid environment settings if any.
func LookupConfig(kvs config.KVS, drivesPerSet int) (cfg Config, err error) {
func LookupConfig(kvs config.KVS, setDriveCount int) (cfg Config, err error) {
cfg = Config{}
cfg.Standard.Parity = drivesPerSet / 2
cfg.Standard.Parity = setDriveCount / 2
cfg.RRS.Parity = defaultRRSParity
if err = config.CheckValidKeys(config.StorageClassSubSys, kvs, DefaultKVS); err != nil {
@ -239,7 +239,7 @@ func LookupConfig(kvs config.KVS, drivesPerSet int) (cfg Config, err error) {
}
}
if cfg.Standard.Parity == 0 {
cfg.Standard.Parity = drivesPerSet / 2
cfg.Standard.Parity = setDriveCount / 2
}
if rrsc != "" {
@ -254,7 +254,7 @@ func LookupConfig(kvs config.KVS, drivesPerSet int) (cfg Config, err error) {
// Validation is done after parsing both the storage classes. This is needed because we need one
// storage class value to deduce the correct value of the other storage class.
if err = validateParity(cfg.Standard.Parity, cfg.RRS.Parity, drivesPerSet); err != nil {
if err = validateParity(cfg.Standard.Parity, cfg.RRS.Parity, setDriveCount); err != nil {
return Config{}, err
}
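A small usage sketch of the renamed setDriveCount parameter, assuming the storageclass package import path matches the repository layout at this commit:

package main

import (
	"fmt"
	"log"

	"github.com/minio/minio/cmd/config/storageclass"
)

func main() {
	// Illustrative only: with no storage-class keys set, standard parity
	// defaults to setDriveCount/2, i.e. 8 for a 16-drive erasure set.
	cfg, err := storageclass.LookupConfig(nil, 16)
	if err != nil {
		log.Fatalln(err)
	}
	fmt.Println(cfg.Standard.Parity, cfg.RRS.Parity)
}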

@ -69,10 +69,10 @@ func TestParseStorageClass(t *testing.T) {
func TestValidateParity(t *testing.T) {
tests := []struct {
rrsParity int
ssParity int
success bool
drivesPerSet int
rrsParity int
ssParity int
success bool
setDriveCount int
}{
{2, 4, true, 16},
{3, 3, true, 16},
@ -85,7 +85,7 @@ func TestValidateParity(t *testing.T) {
{9, 2, false, 16},
}
for i, tt := range tests {
err := validateParity(tt.ssParity, tt.rrsParity, tt.drivesPerSet)
err := validateParity(tt.ssParity, tt.rrsParity, tt.setDriveCount)
if err != nil && tt.success {
t.Errorf("Test %d, Expected success, got %s", i+1, err)
}

@ -193,7 +193,7 @@ func NewEndpoint(arg string) (ep Endpoint, e error) {
}
// ZoneEndpoints represent endpoints in a given zone
// along with its setCount and drivesPerSet.
// along with its setCount and setDriveCount.
type ZoneEndpoints struct {
SetCount int
DrivesPerSet int

@ -133,8 +133,7 @@ func (er erasureObjects) listBuckets(ctx context.Context) (bucketsInfo []BucketI
if err == nil {
// NOTE: The assumption here is that volumes across all disks in
// readQuorum have consistent view i.e they all have same number
// of buckets. This is essentially not verified since healing
// should take care of this.
// of buckets.
var bucketsInfo []BucketInfo
for _, volInfo := range volsInfo {
if isReservedOrInvalidBucket(volInfo.Name, true) {

@ -23,6 +23,17 @@ import (
"github.com/minio/minio/pkg/sync/errgroup"
)
func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
disks := er.getDisks()
// Based on the random shuffling return back randomized disks.
for _, i := range hashOrder(UTCNow().String(), len(disks)) {
if disks[i-1] != nil && disks[i-1].IsLocal() {
newDisks = append(newDisks, disks[i-1])
}
}
return newDisks
}
// getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice.
func (er erasureObjects) getLoadBalancedDisks() (newDisks []StorageAPI) {
disks := er.getDisks()

@ -81,7 +81,8 @@ func (er erasureObjects) cleanupStaleMultipartUploads(ctx context.Context, clean
return
case <-ticker.C:
var disk StorageAPI
for _, d := range er.getLoadBalancedDisks() {
// run multiple cleanup's local to this server.
for _, d := range er.getLoadBalancedLocalDisks() {
if d != nil {
disk = d
break
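The new getLoadBalancedLocalDisks() helper feeding this loop can be pictured with plain stand-in types; hashOrder is replaced by rand.Perm and the disk type is hypothetical here, so this is a sketch of the idea rather than the actual implementation:

package main

import (
	"fmt"
	"math/rand"
)

// disk is a stand-in for StorageAPI in this illustrative sketch.
type disk struct {
	name  string
	local bool
}

// loadBalancedLocalDisks mimics getLoadBalancedLocalDisks: shuffle the disk
// order, then keep only the disks that live on this server.
func loadBalancedLocalDisks(disks []disk) (out []disk) {
	for _, i := range rand.Perm(len(disks)) {
		if disks[i].local {
			out = append(out, disks[i])
		}
	}
	return out
}

func main() {
	disks := []disk{{"d1", true}, {"d2", false}, {"d3", true}, {"d4", false}}
	// Background work such as cleanupStaleMultipartUploads() then picks the
	// first entry, keeping the cleanup local to this server.
	fmt.Println(loadBalancedLocalDisks(disks))
}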

@ -75,7 +75,8 @@ type erasureSets struct {
endpointStrings []string
// Total number of sets and the number of disks per set.
setCount, drivesPerSet int
setCount, setDriveCount int
listTolerancePerSet int
disksConnectEvent chan diskConnectInfo
@ -112,7 +113,7 @@ func (s *erasureSets) getDiskMap() map[string]StorageAPI {
defer s.erasureDisksMu.RUnlock()
for i := 0; i < s.setCount; i++ {
for j := 0; j < s.drivesPerSet; j++ {
for j := 0; j < s.setDriveCount; j++ {
disk := s.erasureDisks[i][j]
if disk == nil {
continue
@ -228,7 +229,7 @@ func (s *erasureSets) connectDisks() {
s.erasureDisks[setIndex][diskIndex].Close()
}
s.erasureDisks[setIndex][diskIndex] = disk
s.endpointStrings[setIndex*s.drivesPerSet+diskIndex] = disk.String()
s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String()
s.erasureDisksMu.Unlock()
go func(setIndex int) {
// Send a new disk connect event with a timeout
@ -260,7 +261,7 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt
func (s *erasureSets) GetLockers(setIndex int) func() []dsync.NetLocker {
return func() []dsync.NetLocker {
lockers := make([]dsync.NetLocker, s.drivesPerSet)
lockers := make([]dsync.NetLocker, s.setDriveCount)
copy(lockers, s.erasureLockers[setIndex])
return lockers
}
@ -271,9 +272,9 @@ func (s *erasureSets) GetEndpoints(setIndex int) func() []string {
s.erasureDisksMu.RLock()
defer s.erasureDisksMu.RUnlock()
eps := make([]string, s.drivesPerSet)
for i := 0; i < s.drivesPerSet; i++ {
eps[i] = s.endpointStrings[setIndex*s.drivesPerSet+i]
eps := make([]string, s.setDriveCount)
for i := 0; i < s.setDriveCount; i++ {
eps[i] = s.endpointStrings[setIndex*s.setDriveCount+i]
}
return eps
}
@ -284,7 +285,7 @@ func (s *erasureSets) GetDisks(setIndex int) func() []StorageAPI {
return func() []StorageAPI {
s.erasureDisksMu.RLock()
defer s.erasureDisksMu.RUnlock()
disks := make([]StorageAPI, s.drivesPerSet)
disks := make([]StorageAPI, s.setDriveCount)
copy(disks, s.erasureDisks[setIndex])
return disks
}
@ -295,46 +296,47 @@ const defaultMonitorConnectEndpointInterval = time.Second * 10 // Set to 10 secs
// Initialize new set of erasure coded sets.
func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageAPI, format *formatErasureV3) (*erasureSets, error) {
setCount := len(format.Erasure.Sets)
drivesPerSet := len(format.Erasure.Sets[0])
setDriveCount := len(format.Erasure.Sets[0])
endpointStrings := make([]string, len(endpoints))
// Initialize the erasure sets instance.
s := &erasureSets{
sets: make([]*erasureObjects, setCount),
erasureDisks: make([][]StorageAPI, setCount),
erasureLockers: make([][]dsync.NetLocker, setCount),
endpoints: endpoints,
endpointStrings: endpointStrings,
setCount: setCount,
drivesPerSet: drivesPerSet,
format: format,
disksConnectEvent: make(chan diskConnectInfo),
disksConnectDoneCh: make(chan struct{}),
distributionAlgo: format.Erasure.DistributionAlgo,
deploymentID: uuid.MustParse(format.ID),
pool: NewMergeWalkPool(globalMergeLookupTimeout),
poolSplunk: NewMergeWalkPool(globalMergeLookupTimeout),
poolVersions: NewMergeWalkVersionsPool(globalMergeLookupTimeout),
mrfOperations: make(map[healSource]int),
sets: make([]*erasureObjects, setCount),
erasureDisks: make([][]StorageAPI, setCount),
erasureLockers: make([][]dsync.NetLocker, setCount),
endpoints: endpoints,
endpointStrings: endpointStrings,
setCount: setCount,
setDriveCount: setDriveCount,
listTolerancePerSet: setDriveCount / 2,
format: format,
disksConnectEvent: make(chan diskConnectInfo),
disksConnectDoneCh: make(chan struct{}),
distributionAlgo: format.Erasure.DistributionAlgo,
deploymentID: uuid.MustParse(format.ID),
pool: NewMergeWalkPool(globalMergeLookupTimeout),
poolSplunk: NewMergeWalkPool(globalMergeLookupTimeout),
poolVersions: NewMergeWalkVersionsPool(globalMergeLookupTimeout),
mrfOperations: make(map[healSource]int),
}
mutex := newNSLock(globalIsDistErasure)
// Initialize byte pool once for all sets, bpool size is set to
// setCount * drivesPerSet with each memory upto blockSizeV1.
bp := bpool.NewBytePoolCap(setCount*drivesPerSet, blockSizeV1, blockSizeV1*2)
// setCount * setDriveCount with each memory upto blockSizeV1.
bp := bpool.NewBytePoolCap(setCount*setDriveCount, blockSizeV1, blockSizeV1*2)
for i := 0; i < setCount; i++ {
s.erasureDisks[i] = make([]StorageAPI, drivesPerSet)
s.erasureLockers[i] = make([]dsync.NetLocker, drivesPerSet)
s.erasureDisks[i] = make([]StorageAPI, setDriveCount)
s.erasureLockers[i] = make([]dsync.NetLocker, setDriveCount)
}
for i := 0; i < setCount; i++ {
for j := 0; j < drivesPerSet; j++ {
endpoint := endpoints[i*drivesPerSet+j]
for j := 0; j < setDriveCount; j++ {
endpoint := endpoints[i*setDriveCount+j]
// Rely on endpoints list to initialize, init lockers and available disks.
s.erasureLockers[i][j] = newLockAPI(endpoint)
disk := storageDisks[i*drivesPerSet+j]
disk := storageDisks[i*setDriveCount+j]
if disk == nil {
continue
}
@ -348,7 +350,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
disk.Close()
continue
}
s.endpointStrings[m*drivesPerSet+n] = disk.String()
s.endpointStrings[m*setDriveCount+n] = disk.String()
s.erasureDisks[m][n] = disk
}
@ -384,7 +386,7 @@ func (s *erasureSets) NewNSLock(ctx context.Context, bucket string, objects ...s
// SetDriveCount returns the current drives per set.
func (s *erasureSets) SetDriveCount() int {
return s.drivesPerSet
return s.setDriveCount
}
// StorageUsageInfo - combines output of StorageInfo across all erasure coded object sets.
@ -458,13 +460,13 @@ func (s *erasureSets) StorageInfo(ctx context.Context, local bool) (StorageInfo,
scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD)
if scParity == 0 {
scParity = s.drivesPerSet / 2
scParity = s.setDriveCount / 2
}
storageInfo.Backend.StandardSCData = s.drivesPerSet - scParity
storageInfo.Backend.StandardSCData = s.setDriveCount - scParity
storageInfo.Backend.StandardSCParity = scParity
rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS)
storageInfo.Backend.RRSCData = s.drivesPerSet - rrSCParity
storageInfo.Backend.RRSCData = s.setDriveCount - rrSCParity
storageInfo.Backend.RRSCParity = rrSCParity
if local {
@ -838,17 +840,9 @@ func (f *FileInfoCh) Push(fi FileInfo) {
// if the caller wishes to list N entries to call lexicallySortedEntry
// N times until this boolean is 'false'.
func lexicallySortedEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) (FileInfo, int, bool) {
var wg sync.WaitGroup
for j := range entryChs {
j := j
wg.Add(1)
// Pop() entries in parallel for large drive setups.
go func() {
defer wg.Done()
entries[j], entriesValid[j] = entryChs[j].Pop()
}()
entries[j], entriesValid[j] = entryChs[j].Pop()
}
wg.Wait()
var isTruncated = false
for _, valid := range entriesValid {
@ -910,17 +904,9 @@ func lexicallySortedEntry(entryChs []FileInfoCh, entries []FileInfo, entriesVali
// if the caller wishes to list N entries to call lexicallySortedEntry
// N times until this boolean is 'false'.
func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileInfoVersions, entriesValid []bool) (FileInfoVersions, int, bool) {
var wg sync.WaitGroup
for j := range entryChs {
j := j
wg.Add(1)
// Pop() entries in parallel for large drive setups.
go func() {
defer wg.Done()
entries[j], entriesValid[j] = entryChs[j].Pop()
}()
entries[j], entriesValid[j] = entryChs[j].Pop()
}
wg.Wait()
var isTruncated = false
for _, valid := range entriesValid {
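For reference, the shape of the Pop() change above, reduced to a runnable sketch with a simplified channel type (entryCh is hypothetical, standing in for FileInfoCh):

package main

import (
	"fmt"
	"sync"
)

// entryCh stands in for FileInfoCh in this illustrative sketch.
type entryCh struct{ next int }

func (e *entryCh) Pop() (int, bool) { e.next++; return e.next, true }

func main() {
	entryChs := make([]entryCh, 4)
	entries := make([]int, len(entryChs))
	entriesValid := make([]bool, len(entryChs))

	// Before this commit: one goroutine per channel just to call Pop(),
	// which on large drive counts adds scheduling overhead and waiting.
	var wg sync.WaitGroup
	for j := range entryChs {
		j := j
		wg.Add(1)
		go func() {
			defer wg.Done()
			entries[j], entriesValid[j] = entryChs[j].Pop()
		}()
	}
	wg.Wait()

	// After: a plain sequential loop, as in the hunks above.
	for j := range entryChs {
		entries[j], entriesValid[j] = entryChs[j].Pop()
	}
	fmt.Println(entries, entriesValid)
}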
@ -1232,7 +1218,7 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error)
}(storageDisks)
formats, _ := loadFormatErasureAll(storageDisks, false)
if err = checkFormatErasureValues(formats, s.drivesPerSet); err != nil {
if err = checkFormatErasureValues(formats, s.setDriveCount); err != nil {
return err
}
@ -1272,7 +1258,7 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error)
s.erasureDisks[m][n].Close()
}
s.endpointStrings[m*s.drivesPerSet+n] = disk.String()
s.endpointStrings[m*s.setDriveCount+n] = disk.String()
s.erasureDisks[m][n] = disk
}
s.erasureDisksMu.Unlock()
@ -1354,7 +1340,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
}(storageDisks)
formats, sErrs := loadFormatErasureAll(storageDisks, true)
if err = checkFormatErasureValues(formats, s.drivesPerSet); err != nil {
if err = checkFormatErasureValues(formats, s.setDriveCount); err != nil {
return madmin.HealResultItem{}, err
}
@ -1365,7 +1351,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
res = madmin.HealResultItem{
Type: madmin.HealItemMetadata,
Detail: "disk-format",
DiskCount: s.setCount * s.drivesPerSet,
DiskCount: s.setCount * s.setDriveCount,
SetCount: s.setCount,
}
@ -1396,7 +1382,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
markUUIDsOffline(refFormat, formats)
// Initialize a new set of set formats which will be written to disk.
newFormatSets := newHealFormatSets(refFormat, s.setCount, s.drivesPerSet, formats, sErrs)
newFormatSets := newHealFormatSets(refFormat, s.setCount, s.setDriveCount, formats, sErrs)
// Look for all offline/unformatted disks in our reference format,
// such that we can fill them up with new UUIDs, this looping also
@ -1413,7 +1399,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
newFormatSets[i][l].Erasure.This = mustGetUUID()
refFormat.Erasure.Sets[i][j] = newFormatSets[i][l].Erasure.This
for m, v := range res.After.Drives {
if v.Endpoint == s.endpoints.GetString(i*s.drivesPerSet+l) {
if v.Endpoint == s.endpoints.GetString(i*s.setDriveCount+l) {
res.After.Drives[m].UUID = newFormatSets[i][l].Erasure.This
res.After.Drives[m].State = madmin.DriveStateOk
}
@ -1426,14 +1412,14 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
}
if !dryRun {
var tmpNewFormats = make([]*formatErasureV3, s.setCount*s.drivesPerSet)
var tmpNewFormats = make([]*formatErasureV3, s.setCount*s.setDriveCount)
for i := range newFormatSets {
for j := range newFormatSets[i] {
if newFormatSets[i][j] == nil {
continue
}
tmpNewFormats[i*s.drivesPerSet+j] = newFormatSets[i][j]
tmpNewFormats[i*s.drivesPerSet+j].Erasure.Sets = refFormat.Erasure.Sets
tmpNewFormats[i*s.setDriveCount+j] = newFormatSets[i][j]
tmpNewFormats[i*s.setDriveCount+j].Erasure.Sets = refFormat.Erasure.Sets
}
}
@ -1478,7 +1464,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
s.erasureDisks[m][n].Close()
}
s.endpointStrings[m*s.drivesPerSet+n] = disk.String()
s.endpointStrings[m*s.setDriveCount+n] = disk.String()
s.erasureDisks[m][n] = disk
}
s.erasureDisksMu.Unlock()
@ -1496,7 +1482,7 @@ func (s *erasureSets) HealBucket(ctx context.Context, bucket string, dryRun, rem
result = madmin.HealResultItem{
Type: madmin.HealItemBucket,
Bucket: bucket,
DiskCount: s.setCount * s.drivesPerSet,
DiskCount: s.setCount * s.setDriveCount,
SetCount: s.setCount,
}
@ -1512,7 +1498,7 @@ func (s *erasureSets) HealBucket(ctx context.Context, bucket string, dryRun, rem
// Check if we had quorum to write, if not return an appropriate error.
_, afterDriveOnline := result.GetOnlineCounts()
if afterDriveOnline < ((s.setCount*s.drivesPerSet)/2)+1 {
if afterDriveOnline < ((s.setCount*s.setDriveCount)/2)+1 {
return result, toObjectErr(errErasureWriteQuorum, bucket)
}
@ -1568,7 +1554,7 @@ func (s *erasureSets) Walk(ctx context.Context, bucket, prefix string, results c
return
}
if quorumCount >= s.drivesPerSet/2 {
if quorumCount >= s.setDriveCount/2 {
// Read quorum exists proceed
for _, version := range entry.Versions {
results <- version.ToObjectInfo(bucket, version.Name)
@ -1595,7 +1581,7 @@ func (s *erasureSets) Walk(ctx context.Context, bucket, prefix string, results c
return
}
if quorumCount >= s.drivesPerSet/2 {
if quorumCount >= s.setDriveCount/2 {
// Read quorum exists proceed
results <- entry.ToObjectInfo(bucket, entry.Name)
}
@ -1622,14 +1608,14 @@ func (s *erasureSets) HealObjects(ctx context.Context, bucket, prefix string, op
break
}
if quorumCount == s.drivesPerSet && opts.ScanMode == madmin.HealNormalScan {
if quorumCount == s.setDriveCount && opts.ScanMode == madmin.HealNormalScan {
// Skip good entries.
continue
}
for _, version := range entry.Versions {
// Wait and proceed if there are active requests
waitForLowHTTPReq(int32(s.drivesPerSet), time.Second)
waitForLowHTTPReq(int32(s.setDriveCount), time.Second)
if err := healObject(bucket, version.Name, version.VersionID); err != nil {
return toObjectErr(err, bucket, version.Name)

@ -690,15 +690,15 @@ func (z *erasureZones) ListObjectsV2(ctx context.Context, bucket, prefix, contin
func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
var zonesEntryChs [][]FileInfoCh
var zonesDrivesPerSet []int
var zonesListTolerancePerSet []int
endWalkCh := make(chan struct{})
defer close(endWalkCh)
for _, zone := range z.zones {
zonesEntryChs = append(zonesEntryChs,
zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.drivesPerSet))
zonesDrivesPerSet = append(zonesDrivesPerSet, zone.drivesPerSet)
zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.listTolerancePerSet))
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet)
}
var objInfos []ObjectInfo
@ -723,7 +723,7 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix,
break
}
if quorumCount < zonesDrivesPerSet[zoneIndex]/2 {
if quorumCount < zonesListTolerancePerSet[zoneIndex] {
// Skip entries which are not found on upto ndisks/2.
continue
}
@ -810,20 +810,20 @@ func (z *erasureZones) listObjectsSplunk(ctx context.Context, bucket, prefix, ma
var zonesEntryChs [][]FileInfoCh
var zonesEndWalkCh []chan struct{}
var drivesPerSets []int
var zonesListTolerancePerSet []int
for _, zone := range z.zones {
entryChs, endWalkCh := zone.poolSplunk.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = zone.startSplunkMergeWalksN(ctx, bucket, prefix, marker, endWalkCh, zone.drivesPerSet)
entryChs = zone.startSplunkMergeWalksN(ctx, bucket, prefix, marker, endWalkCh, zone.listTolerancePerSet)
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
drivesPerSets = append(drivesPerSets, zone.drivesPerSet)
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet)
}
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, drivesPerSets)
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet)
if len(entries.Files) == 0 {
return loi, nil
}
@ -902,20 +902,20 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker,
var zonesEntryChs [][]FileInfoCh
var zonesEndWalkCh []chan struct{}
var drivesPerSets []int
var zonesListTolerancePerSet []int
for _, zone := range z.zones {
entryChs, endWalkCh := zone.pool.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.drivesPerSet)
entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet)
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
drivesPerSets = append(drivesPerSets, zone.drivesPerSet)
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet)
}
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, drivesPerSets)
entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet)
if len(entries.Files) == 0 {
return loi, nil
}
@ -951,18 +951,9 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker,
// N times until this boolean is 'false'.
func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, int, int, bool) {
for i, entryChs := range zoneEntryChs {
i := i
var wg sync.WaitGroup
for j := range entryChs {
j := j
wg.Add(1)
// Pop() entries in parallel for large drive setups.
go func() {
defer wg.Done()
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
}()
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
}
wg.Wait()
}
var isTruncated = false
@ -1040,18 +1031,9 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI
// N times until this boolean is 'false'.
func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneEntries [][]FileInfoVersions, zoneEntriesValid [][]bool) (FileInfoVersions, int, int, bool) {
for i, entryChs := range zoneEntryChs {
i := i
var wg sync.WaitGroup
for j := range entryChs {
j := j
wg.Add(1)
// Pop() entries in parallel for large drive setups.
go func() {
defer wg.Done()
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
}()
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
}
wg.Wait()
}
var isTruncated = false
@ -1119,7 +1101,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE
}
// mergeZonesEntriesVersionsCh - merges FileInfoVersions channel to entries upto maxKeys.
func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys int, drivesPerSets []int) (entries FilesInfoVersions) {
func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys int, zonesListTolerancePerSet []int) (entries FilesInfoVersions) {
var i = 0
var zonesEntriesInfos [][]FileInfoVersions
var zonesEntriesValid [][]bool
@ -1134,8 +1116,8 @@ func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys i
break
}
if quorumCount < drivesPerSets[zoneIndex]/2 {
// Skip entries which are not found on upto ndisks/2.
if quorumCount < zonesListTolerancePerSet[zoneIndex] {
// Skip entries which are not found upto the expected tolerance
continue
}
@ -1150,7 +1132,7 @@ func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys i
}
// mergeZonesEntriesCh - merges FileInfo channel to entries upto maxKeys.
func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, drivesPerSets []int) (entries FilesInfo) {
func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, zonesListTolerancePerSet []int) (entries FilesInfo) {
var i = 0
var zonesEntriesInfos [][]FileInfo
var zonesEntriesValid [][]bool
@ -1165,8 +1147,8 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, drivesPerSet
break
}
if quorumCount < drivesPerSets[zoneIndex]/2 {
// Skip entries which are not found on upto ndisks/2.
if quorumCount < zonesListTolerancePerSet[zoneIndex] {
// Skip entries which are not found upto configured tolerance.
continue
}
@ -1182,18 +1164,9 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, drivesPerSet
func isTruncatedZones(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) bool {
for i, entryChs := range zoneEntryChs {
i := i
var wg sync.WaitGroup
for j := range entryChs {
j := j
wg.Add(1)
// Pop() entries in parallel for large drive setups.
go func() {
defer wg.Done()
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
}()
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
}
wg.Wait()
}
var isTruncated = false
@ -1214,24 +1187,16 @@ func isTruncatedZones(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zon
zoneEntryChs[i][j].Push(zoneEntries[i][j])
}
}
}
return isTruncated
}
func isTruncatedZonesVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneEntries [][]FileInfoVersions, zoneEntriesValid [][]bool) bool {
for i, entryChs := range zoneEntryChs {
i := i
var wg sync.WaitGroup
for j := range entryChs {
j := j
wg.Add(1)
// Pop() entries in parallel for large drive setups.
go func() {
defer wg.Done()
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
}()
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
}
wg.Wait()
}
var isTruncated = false
@ -1307,19 +1272,19 @@ func (z *erasureZones) listObjectVersions(ctx context.Context, bucket, prefix, m
var zonesEntryChs [][]FileInfoVersionsCh
var zonesEndWalkCh []chan struct{}
var drivesPerSets []int
var zonesListTolerancePerSet []int
for _, zone := range z.zones {
entryChs, endWalkCh := zone.poolVersions.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.drivesPerSet)
entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet)
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
drivesPerSets = append(drivesPerSets, zone.drivesPerSet)
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet)
}
entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, drivesPerSets)
entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet)
if len(entries.FilesVersions) == 0 {
return loi, nil
}
@ -1830,7 +1795,7 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results
var zoneDrivesPerSet []int
for _, zone := range z.zones {
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet)
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.setDriveCount)
}
var zonesEntriesInfos [][]FileInfoVersions
@ -1871,7 +1836,7 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results
var zoneDrivesPerSet []int
for _, zone := range z.zones {
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet)
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.setDriveCount)
}
var zonesEntriesInfos [][]FileInfo
@ -1918,7 +1883,7 @@ func (z *erasureZones) HealObjects(ctx context.Context, bucket, prefix string, o
var zoneDrivesPerSet []int
for _, zone := range z.zones {
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet)
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.setDriveCount)
}
var zonesEntriesInfos [][]FileInfoVersions
@ -2082,7 +2047,7 @@ func (z *erasureZones) Health(ctx context.Context, opts HealthOptions) HealthRes
for zoneIdx := range erasureSetUpCount {
parityDrives := globalStorageClass.GetParityForSC(storageclass.STANDARD)
diskCount := z.zones[zoneIdx].drivesPerSet
diskCount := z.zones[zoneIdx].setDriveCount
if parityDrives == 0 {
parityDrives = getDefaultParityBlocks(diskCount)
}

@ -439,7 +439,7 @@ func checkFormatErasureValue(formatErasure *formatErasureV3) error {
}
// Check all format values.
func checkFormatErasureValues(formats []*formatErasureV3, drivesPerSet int) error {
func checkFormatErasureValues(formats []*formatErasureV3, setDriveCount int) error {
for i, formatErasure := range formats {
if formatErasure == nil {
continue
@ -454,8 +454,8 @@ func checkFormatErasureValues(formats []*formatErasureV3, drivesPerSet int) erro
// Only if custom erasure drive count is set,
// we should fail here other proceed to honor what
// is present on the disk.
if globalCustomErasureDriveCount && len(formatErasure.Erasure.Sets[0]) != drivesPerSet {
return fmt.Errorf("%s disk is already formatted with %d drives per erasure set. This cannot be changed to %d, please revert your MINIO_ERASURE_SET_DRIVE_COUNT setting", humanize.Ordinal(i+1), len(formatErasure.Erasure.Sets[0]), drivesPerSet)
if globalCustomErasureDriveCount && len(formatErasure.Erasure.Sets[0]) != setDriveCount {
return fmt.Errorf("%s disk is already formatted with %d drives per erasure set. This cannot be changed to %d, please revert your MINIO_ERASURE_SET_DRIVE_COUNT setting", humanize.Ordinal(i+1), len(formatErasure.Erasure.Sets[0]), setDriveCount)
}
}
return nil
@ -788,22 +788,22 @@ func fixFormatErasureV3(storageDisks []StorageAPI, endpoints Endpoints, formats
}
// initFormatErasure - save Erasure format configuration on all disks.
func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount, drivesPerSet int, deploymentID string, sErrs []error) (*formatErasureV3, error) {
format := newFormatErasureV3(setCount, drivesPerSet)
func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount, setDriveCount int, deploymentID string, sErrs []error) (*formatErasureV3, error) {
format := newFormatErasureV3(setCount, setDriveCount)
formats := make([]*formatErasureV3, len(storageDisks))
wantAtMost := ecDrivesNoConfig(drivesPerSet)
wantAtMost := ecDrivesNoConfig(setDriveCount)
for i := 0; i < setCount; i++ {
hostCount := make(map[string]int, drivesPerSet)
for j := 0; j < drivesPerSet; j++ {
disk := storageDisks[i*drivesPerSet+j]
hostCount := make(map[string]int, setDriveCount)
for j := 0; j < setDriveCount; j++ {
disk := storageDisks[i*setDriveCount+j]
newFormat := format.Clone()
newFormat.Erasure.This = format.Erasure.Sets[i][j]
if deploymentID != "" {
newFormat.ID = deploymentID
}
hostCount[disk.Hostname()]++
formats[i*drivesPerSet+j] = newFormat
formats[i*setDriveCount+j] = newFormat
}
if len(hostCount) > 0 {
var once sync.Once
@ -817,8 +817,8 @@ func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount,
return
}
logger.Info(" * Set %v:", i+1)
for j := 0; j < drivesPerSet; j++ {
disk := storageDisks[i*drivesPerSet+j]
for j := 0; j < setDriveCount; j++ {
disk := storageDisks[i*setDriveCount+j]
logger.Info(" - Drive: %s", disk.String())
}
})
@ -842,15 +842,15 @@ func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount,
// ecDrivesNoConfig returns the erasure coded drives in a set if no config has been set.
// It will attempt to read it from env variable and fall back to drives/2.
func ecDrivesNoConfig(drivesPerSet int) int {
func ecDrivesNoConfig(setDriveCount int) int {
ecDrives := globalStorageClass.GetParityForSC(storageclass.STANDARD)
if ecDrives == 0 {
cfg, err := storageclass.LookupConfig(nil, drivesPerSet)
cfg, err := storageclass.LookupConfig(nil, setDriveCount)
if err == nil {
ecDrives = cfg.Standard.Parity
}
if ecDrives == 0 {
ecDrives = drivesPerSet / 2
ecDrives = setDriveCount / 2
}
}
return ecDrives
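A quick worked example of the fallback above, assuming no parity is configured through the storage-class config or environment:

// Illustrative only: fallback values when nothing is configured.
//   ecDrivesNoConfig(4)  == 2
//   ecDrivesNoConfig(8)  == 4
//   ecDrivesNoConfig(16) == 8
// Any STANDARD storage-class parity that is configured takes precedence
// over this drives/2 fallback.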
@ -920,14 +920,14 @@ func markUUIDsOffline(refFormat *formatErasureV3, formats []*formatErasureV3) {
}
// Initialize a new set of set formats which will be written to all disks.
func newHealFormatSets(refFormat *formatErasureV3, setCount, drivesPerSet int, formats []*formatErasureV3, errs []error) [][]*formatErasureV3 {
func newHealFormatSets(refFormat *formatErasureV3, setCount, setDriveCount int, formats []*formatErasureV3, errs []error) [][]*formatErasureV3 {
newFormats := make([][]*formatErasureV3, setCount)
for i := range refFormat.Erasure.Sets {
newFormats[i] = make([]*formatErasureV3, drivesPerSet)
newFormats[i] = make([]*formatErasureV3, setDriveCount)
}
for i := range refFormat.Erasure.Sets {
for j := range refFormat.Erasure.Sets[i] {
if errs[i*drivesPerSet+j] == errUnformattedDisk || errs[i*drivesPerSet+j] == nil {
if errs[i*setDriveCount+j] == errUnformattedDisk || errs[i*setDriveCount+j] == nil {
newFormats[i][j] = &formatErasureV3{}
newFormats[i][j].Version = refFormat.Version
newFormats[i][j].ID = refFormat.ID
@ -935,13 +935,13 @@ func newHealFormatSets(refFormat *formatErasureV3, setCount, drivesPerSet int, f
newFormats[i][j].Erasure.Version = refFormat.Erasure.Version
newFormats[i][j].Erasure.DistributionAlgo = refFormat.Erasure.DistributionAlgo
}
if errs[i*drivesPerSet+j] == errUnformattedDisk {
if errs[i*setDriveCount+j] == errUnformattedDisk {
newFormats[i][j].Erasure.This = ""
newFormats[i][j].Erasure.Sets = nil
continue
}
if errs[i*drivesPerSet+j] == nil {
newFormats[i][j].Erasure.This = formats[i*drivesPerSet+j].Erasure.This
if errs[i*setDriveCount+j] == nil {
newFormats[i][j].Erasure.This = formats[i*setDriveCount+j].Erasure.This
newFormats[i][j].Erasure.Sets = nil
}
}

@ -324,16 +324,16 @@ func TestCheckFormatErasureValue(t *testing.T) {
// Tests getFormatErasureInQuorum()
func TestGetFormatErasureInQuorumCheck(t *testing.T) {
setCount := 2
drivesPerSet := 16
setDriveCount := 16
format := newFormatErasureV3(setCount, drivesPerSet)
format := newFormatErasureV3(setCount, setDriveCount)
formats := make([]*formatErasureV3, 32)
for i := 0; i < setCount; i++ {
for j := 0; j < drivesPerSet; j++ {
for j := 0; j < setDriveCount; j++ {
newFormat := format.Clone()
newFormat.Erasure.This = format.Erasure.Sets[i][j]
formats[i*drivesPerSet+j] = newFormat
formats[i*setDriveCount+j] = newFormat
}
}
@ -390,16 +390,16 @@ func TestGetFormatErasureInQuorumCheck(t *testing.T) {
// Tests formatErasureGetDeploymentID()
func TestGetErasureID(t *testing.T) {
setCount := 2
drivesPerSet := 8
setDriveCount := 8
format := newFormatErasureV3(setCount, drivesPerSet)
format := newFormatErasureV3(setCount, setDriveCount)
formats := make([]*formatErasureV3, 16)
for i := 0; i < setCount; i++ {
for j := 0; j < drivesPerSet; j++ {
for j := 0; j < setDriveCount; j++ {
newFormat := format.Clone()
newFormat.Erasure.This = format.Erasure.Sets[i][j]
formats[i*drivesPerSet+j] = newFormat
formats[i*setDriveCount+j] = newFormat
}
}
@ -445,17 +445,17 @@ func TestGetErasureID(t *testing.T) {
// Initialize new format sets.
func TestNewFormatSets(t *testing.T) {
setCount := 2
drivesPerSet := 16
setDriveCount := 16
format := newFormatErasureV3(setCount, drivesPerSet)
format := newFormatErasureV3(setCount, setDriveCount)
formats := make([]*formatErasureV3, 32)
errs := make([]error, 32)
for i := 0; i < setCount; i++ {
for j := 0; j < drivesPerSet; j++ {
for j := 0; j < setDriveCount; j++ {
newFormat := format.Clone()
newFormat.Erasure.This = format.Erasure.Sets[i][j]
formats[i*drivesPerSet+j] = newFormat
formats[i*setDriveCount+j] = newFormat
}
}
@ -467,7 +467,7 @@ func TestNewFormatSets(t *testing.T) {
// 16th disk is unformatted.
errs[15] = errUnformattedDisk
newFormats := newHealFormatSets(quorumFormat, setCount, drivesPerSet, formats, errs)
newFormats := newHealFormatSets(quorumFormat, setCount, setDriveCount, formats, errs)
if newFormats == nil {
t.Fatal("Unexpected failure")
}

@ -89,7 +89,7 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) {
}
// healErasureSet lists and heals all objects in a specific erasure set
func healErasureSet(ctx context.Context, setIndex int, xlObj *erasureObjects, drivesPerSet int) error {
func healErasureSet(ctx context.Context, setIndex int, xlObj *erasureObjects, setDriveCount int) error {
buckets, err := xlObj.ListBuckets(ctx)
if err != nil {
return err
@ -151,7 +151,7 @@ func healErasureSet(ctx context.Context, setIndex int, xlObj *erasureObjects, dr
break
}
if quorumCount == drivesPerSet {
if quorumCount == setDriveCount {
// Skip good entries.
continue
}

@ -230,7 +230,7 @@ func IsServerResolvable(endpoint Endpoint) error {
// connect to list of endpoints and load all Erasure disk formats, validate the formats are correct
// and are in quorum, if no formats are found attempt to initialize all of them for the first
// time. additionally make sure to close all the disks used in this attempt.
func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints, zoneCount, setCount, drivesPerSet int, deploymentID string) (storageDisks []StorageAPI, format *formatErasureV3, err error) {
func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints, zoneCount, setCount, setDriveCount int, deploymentID string) (storageDisks []StorageAPI, format *formatErasureV3, err error) {
// Initialize all storage disks
storageDisks, errs := initStorageDisksWithErrors(endpoints)
@ -268,17 +268,17 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
// most part unless one of the formats is not consistent
// with expected Erasure format. For example if a user is
// trying to pool FS backend into an Erasure set.
if err = checkFormatErasureValues(formatConfigs, drivesPerSet); err != nil {
if err = checkFormatErasureValues(formatConfigs, setDriveCount); err != nil {
return nil, nil, err
}
// All disks report unformatted we should initialized everyone.
if shouldInitErasureDisks(sErrs) && firstDisk {
logger.Info("Formatting %s zone, %v set(s), %v drives per set.",
humanize.Ordinal(zoneCount), setCount, drivesPerSet)
humanize.Ordinal(zoneCount), setCount, setDriveCount)
// Initialize erasure code format on disks
format, err = initFormatErasure(GlobalContext, storageDisks, setCount, drivesPerSet, deploymentID, sErrs)
format, err = initFormatErasure(GlobalContext, storageDisks, setCount, setDriveCount, deploymentID, sErrs)
if err != nil {
return nil, nil, err
}
@ -347,8 +347,8 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
}
// Format disks before initialization of object layer.
func waitForFormatErasure(firstDisk bool, endpoints Endpoints, zoneCount, setCount, drivesPerSet int, deploymentID string) ([]StorageAPI, *formatErasureV3, error) {
if len(endpoints) == 0 || setCount == 0 || drivesPerSet == 0 {
func waitForFormatErasure(firstDisk bool, endpoints Endpoints, zoneCount, setCount, setDriveCount int, deploymentID string) ([]StorageAPI, *formatErasureV3, error) {
if len(endpoints) == 0 || setCount == 0 || setDriveCount == 0 {
return nil, nil, errInvalidArgument
}
@ -374,7 +374,7 @@ func waitForFormatErasure(firstDisk bool, endpoints Endpoints, zoneCount, setCou
for {
select {
case <-ticker.C:
storageDisks, format, err := connectLoadInitFormats(tries, firstDisk, endpoints, zoneCount, setCount, drivesPerSet, deploymentID)
storageDisks, format, err := connectLoadInitFormats(tries, firstDisk, endpoints, zoneCount, setCount, setDriveCount, deploymentID)
if err != nil {
tries++
switch err {

@ -842,11 +842,7 @@ func (s *xlStorage) WalkSplunk(volume, dirPath, marker string, endWalkCh <-chan
}
walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, true, listDir, s.isLeafSplunk, s.isLeafDir, endWalkCh)
for {
walkResult, ok := <-walkResultCh
if !ok {
return
}
for walkResult := range walkResultCh {
var fi FileInfo
if HasSuffix(walkResult.entry, SlashSeparator) {
fi = FileInfo{

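The WalkSplunk change in the last hunk swaps an explicit receive-with-ok loop for a range over the channel; the two forms behave the same once the channel is closed (the old version returned from the function, the range form merely exits the loop, which only matters if code follows it). A minimal illustrative snippet:

package main

import "fmt"

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch)

	// Explicit form:
	//   for {
	//       v, ok := <-ch
	//       if !ok {
	//           break
	//       }
	//       fmt.Println(v)
	//   }

	// Equivalent range form: terminates once the channel is closed and drained.
	for v := range ch {
		fmt.Println(v)
	}
}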