diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index e9b341725..ec4d9a1b6 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -183,7 +183,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, drivesToHeal // Heal all erasure sets that need for i, erasureSetToHeal := range erasureSetInZoneToHeal { for _, setIndex := range erasureSetToHeal { - err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].drivesPerSet) + err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].setDriveCount) if err != nil { logger.LogIf(ctx, err) } diff --git a/cmd/config/storageclass/storage-class.go b/cmd/config/storageclass/storage-class.go index 1e3fd5949..e1465f946 100644 --- a/cmd/config/storageclass/storage-class.go +++ b/cmd/config/storageclass/storage-class.go @@ -156,7 +156,7 @@ func parseStorageClass(storageClassEnv string) (sc StorageClass, err error) { } // Validates the parity disks. -func validateParity(ssParity, rrsParity, drivesPerSet int) (err error) { +func validateParity(ssParity, rrsParity, setDriveCount int) (err error) { if ssParity == 0 && rrsParity == 0 { return nil } @@ -174,12 +174,12 @@ func validateParity(ssParity, rrsParity, drivesPerSet int) (err error) { return fmt.Errorf("Reduced redundancy storage class parity %d should be greater than or equal to %d", rrsParity, minParityDisks) } - if ssParity > drivesPerSet/2 { - return fmt.Errorf("Standard storage class parity %d should be less than or equal to %d", ssParity, drivesPerSet/2) + if ssParity > setDriveCount/2 { + return fmt.Errorf("Standard storage class parity %d should be less than or equal to %d", ssParity, setDriveCount/2) } - if rrsParity > drivesPerSet/2 { - return fmt.Errorf("Reduced redundancy storage class parity %d should be less than or equal to %d", rrsParity, drivesPerSet/2) + if rrsParity > setDriveCount/2 { + return fmt.Errorf("Reduced redundancy storage class parity %d should be less than or equal to %d", rrsParity, setDriveCount/2) } if ssParity > 0 && rrsParity > 0 { @@ -220,9 +220,9 @@ func Enabled(kvs config.KVS) bool { } // LookupConfig - lookup storage class config and override with valid environment settings if any. -func LookupConfig(kvs config.KVS, drivesPerSet int) (cfg Config, err error) { +func LookupConfig(kvs config.KVS, setDriveCount int) (cfg Config, err error) { cfg = Config{} - cfg.Standard.Parity = drivesPerSet / 2 + cfg.Standard.Parity = setDriveCount / 2 cfg.RRS.Parity = defaultRRSParity if err = config.CheckValidKeys(config.StorageClassSubSys, kvs, DefaultKVS); err != nil { @@ -239,7 +239,7 @@ func LookupConfig(kvs config.KVS, drivesPerSet int) (cfg Config, err error) { } } if cfg.Standard.Parity == 0 { - cfg.Standard.Parity = drivesPerSet / 2 + cfg.Standard.Parity = setDriveCount / 2 } if rrsc != "" { @@ -254,7 +254,7 @@ func LookupConfig(kvs config.KVS, drivesPerSet int) (cfg Config, err error) { // Validation is done after parsing both the storage classes. This is needed because we need one // storage class value to deduce the correct value of the other storage class. - if err = validateParity(cfg.Standard.Parity, cfg.RRS.Parity, drivesPerSet); err != nil { + if err = validateParity(cfg.Standard.Parity, cfg.RRS.Parity, setDriveCount); err != nil { return Config{}, err } diff --git a/cmd/config/storageclass/storage-class_test.go b/cmd/config/storageclass/storage-class_test.go index da47c33ec..f67c561ee 100644 --- a/cmd/config/storageclass/storage-class_test.go +++ b/cmd/config/storageclass/storage-class_test.go @@ -69,10 +69,10 @@ func TestParseStorageClass(t *testing.T) { func TestValidateParity(t *testing.T) { tests := []struct { - rrsParity int - ssParity int - success bool - drivesPerSet int + rrsParity int + ssParity int + success bool + setDriveCount int }{ {2, 4, true, 16}, {3, 3, true, 16}, @@ -85,7 +85,7 @@ func TestValidateParity(t *testing.T) { {9, 2, false, 16}, } for i, tt := range tests { - err := validateParity(tt.ssParity, tt.rrsParity, tt.drivesPerSet) + err := validateParity(tt.ssParity, tt.rrsParity, tt.setDriveCount) if err != nil && tt.success { t.Errorf("Test %d, Expected success, got %s", i+1, err) } diff --git a/cmd/endpoint.go b/cmd/endpoint.go index c174ce8ee..b9ecd1aa1 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -193,7 +193,7 @@ func NewEndpoint(arg string) (ep Endpoint, e error) { } // ZoneEndpoints represent endpoints in a given zone -// along with its setCount and drivesPerSet. +// along with its setCount and setDriveCount. type ZoneEndpoints struct { SetCount int DrivesPerSet int diff --git a/cmd/erasure-bucket.go b/cmd/erasure-bucket.go index 3c7f93515..696c38dac 100644 --- a/cmd/erasure-bucket.go +++ b/cmd/erasure-bucket.go @@ -133,8 +133,7 @@ func (er erasureObjects) listBuckets(ctx context.Context) (bucketsInfo []BucketI if err == nil { // NOTE: The assumption here is that volumes across all disks in // readQuorum have consistent view i.e they all have same number - // of buckets. This is essentially not verified since healing - // should take care of this. + // of buckets. var bucketsInfo []BucketInfo for _, volInfo := range volsInfo { if isReservedOrInvalidBucket(volInfo.Name, true) { diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go index 48218f1d4..639770e48 100644 --- a/cmd/erasure-common.go +++ b/cmd/erasure-common.go @@ -23,6 +23,17 @@ import ( "github.com/minio/minio/pkg/sync/errgroup" ) +func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) { + disks := er.getDisks() + // Based on the random shuffling return back randomized disks. + for _, i := range hashOrder(UTCNow().String(), len(disks)) { + if disks[i-1] != nil && disks[i-1].IsLocal() { + newDisks = append(newDisks, disks[i-1]) + } + } + return newDisks +} + // getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice. func (er erasureObjects) getLoadBalancedDisks() (newDisks []StorageAPI) { disks := er.getDisks() diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 6bdede785..09bb6f3c5 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -81,7 +81,8 @@ func (er erasureObjects) cleanupStaleMultipartUploads(ctx context.Context, clean return case <-ticker.C: var disk StorageAPI - for _, d := range er.getLoadBalancedDisks() { + // run multiple cleanup's local to this server. + for _, d := range er.getLoadBalancedLocalDisks() { if d != nil { disk = d break diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 45f726217..1be10ace5 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -75,7 +75,8 @@ type erasureSets struct { endpointStrings []string // Total number of sets and the number of disks per set. - setCount, drivesPerSet int + setCount, setDriveCount int + listTolerancePerSet int disksConnectEvent chan diskConnectInfo @@ -112,7 +113,7 @@ func (s *erasureSets) getDiskMap() map[string]StorageAPI { defer s.erasureDisksMu.RUnlock() for i := 0; i < s.setCount; i++ { - for j := 0; j < s.drivesPerSet; j++ { + for j := 0; j < s.setDriveCount; j++ { disk := s.erasureDisks[i][j] if disk == nil { continue @@ -228,7 +229,7 @@ func (s *erasureSets) connectDisks() { s.erasureDisks[setIndex][diskIndex].Close() } s.erasureDisks[setIndex][diskIndex] = disk - s.endpointStrings[setIndex*s.drivesPerSet+diskIndex] = disk.String() + s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String() s.erasureDisksMu.Unlock() go func(setIndex int) { // Send a new disk connect event with a timeout @@ -260,7 +261,7 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt func (s *erasureSets) GetLockers(setIndex int) func() []dsync.NetLocker { return func() []dsync.NetLocker { - lockers := make([]dsync.NetLocker, s.drivesPerSet) + lockers := make([]dsync.NetLocker, s.setDriveCount) copy(lockers, s.erasureLockers[setIndex]) return lockers } @@ -271,9 +272,9 @@ func (s *erasureSets) GetEndpoints(setIndex int) func() []string { s.erasureDisksMu.RLock() defer s.erasureDisksMu.RUnlock() - eps := make([]string, s.drivesPerSet) - for i := 0; i < s.drivesPerSet; i++ { - eps[i] = s.endpointStrings[setIndex*s.drivesPerSet+i] + eps := make([]string, s.setDriveCount) + for i := 0; i < s.setDriveCount; i++ { + eps[i] = s.endpointStrings[setIndex*s.setDriveCount+i] } return eps } @@ -284,7 +285,7 @@ func (s *erasureSets) GetDisks(setIndex int) func() []StorageAPI { return func() []StorageAPI { s.erasureDisksMu.RLock() defer s.erasureDisksMu.RUnlock() - disks := make([]StorageAPI, s.drivesPerSet) + disks := make([]StorageAPI, s.setDriveCount) copy(disks, s.erasureDisks[setIndex]) return disks } @@ -295,46 +296,47 @@ const defaultMonitorConnectEndpointInterval = time.Second * 10 // Set to 10 secs // Initialize new set of erasure coded sets. func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageAPI, format *formatErasureV3) (*erasureSets, error) { setCount := len(format.Erasure.Sets) - drivesPerSet := len(format.Erasure.Sets[0]) + setDriveCount := len(format.Erasure.Sets[0]) endpointStrings := make([]string, len(endpoints)) // Initialize the erasure sets instance. s := &erasureSets{ - sets: make([]*erasureObjects, setCount), - erasureDisks: make([][]StorageAPI, setCount), - erasureLockers: make([][]dsync.NetLocker, setCount), - endpoints: endpoints, - endpointStrings: endpointStrings, - setCount: setCount, - drivesPerSet: drivesPerSet, - format: format, - disksConnectEvent: make(chan diskConnectInfo), - disksConnectDoneCh: make(chan struct{}), - distributionAlgo: format.Erasure.DistributionAlgo, - deploymentID: uuid.MustParse(format.ID), - pool: NewMergeWalkPool(globalMergeLookupTimeout), - poolSplunk: NewMergeWalkPool(globalMergeLookupTimeout), - poolVersions: NewMergeWalkVersionsPool(globalMergeLookupTimeout), - mrfOperations: make(map[healSource]int), + sets: make([]*erasureObjects, setCount), + erasureDisks: make([][]StorageAPI, setCount), + erasureLockers: make([][]dsync.NetLocker, setCount), + endpoints: endpoints, + endpointStrings: endpointStrings, + setCount: setCount, + setDriveCount: setDriveCount, + listTolerancePerSet: setDriveCount / 2, + format: format, + disksConnectEvent: make(chan diskConnectInfo), + disksConnectDoneCh: make(chan struct{}), + distributionAlgo: format.Erasure.DistributionAlgo, + deploymentID: uuid.MustParse(format.ID), + pool: NewMergeWalkPool(globalMergeLookupTimeout), + poolSplunk: NewMergeWalkPool(globalMergeLookupTimeout), + poolVersions: NewMergeWalkVersionsPool(globalMergeLookupTimeout), + mrfOperations: make(map[healSource]int), } mutex := newNSLock(globalIsDistErasure) // Initialize byte pool once for all sets, bpool size is set to - // setCount * drivesPerSet with each memory upto blockSizeV1. - bp := bpool.NewBytePoolCap(setCount*drivesPerSet, blockSizeV1, blockSizeV1*2) + // setCount * setDriveCount with each memory upto blockSizeV1. + bp := bpool.NewBytePoolCap(setCount*setDriveCount, blockSizeV1, blockSizeV1*2) for i := 0; i < setCount; i++ { - s.erasureDisks[i] = make([]StorageAPI, drivesPerSet) - s.erasureLockers[i] = make([]dsync.NetLocker, drivesPerSet) + s.erasureDisks[i] = make([]StorageAPI, setDriveCount) + s.erasureLockers[i] = make([]dsync.NetLocker, setDriveCount) } for i := 0; i < setCount; i++ { - for j := 0; j < drivesPerSet; j++ { - endpoint := endpoints[i*drivesPerSet+j] + for j := 0; j < setDriveCount; j++ { + endpoint := endpoints[i*setDriveCount+j] // Rely on endpoints list to initialize, init lockers and available disks. s.erasureLockers[i][j] = newLockAPI(endpoint) - disk := storageDisks[i*drivesPerSet+j] + disk := storageDisks[i*setDriveCount+j] if disk == nil { continue } @@ -348,7 +350,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto disk.Close() continue } - s.endpointStrings[m*drivesPerSet+n] = disk.String() + s.endpointStrings[m*setDriveCount+n] = disk.String() s.erasureDisks[m][n] = disk } @@ -384,7 +386,7 @@ func (s *erasureSets) NewNSLock(ctx context.Context, bucket string, objects ...s // SetDriveCount returns the current drives per set. func (s *erasureSets) SetDriveCount() int { - return s.drivesPerSet + return s.setDriveCount } // StorageUsageInfo - combines output of StorageInfo across all erasure coded object sets. @@ -458,13 +460,13 @@ func (s *erasureSets) StorageInfo(ctx context.Context, local bool) (StorageInfo, scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD) if scParity == 0 { - scParity = s.drivesPerSet / 2 + scParity = s.setDriveCount / 2 } - storageInfo.Backend.StandardSCData = s.drivesPerSet - scParity + storageInfo.Backend.StandardSCData = s.setDriveCount - scParity storageInfo.Backend.StandardSCParity = scParity rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS) - storageInfo.Backend.RRSCData = s.drivesPerSet - rrSCParity + storageInfo.Backend.RRSCData = s.setDriveCount - rrSCParity storageInfo.Backend.RRSCParity = rrSCParity if local { @@ -838,17 +840,9 @@ func (f *FileInfoCh) Push(fi FileInfo) { // if the caller wishes to list N entries to call lexicallySortedEntry // N times until this boolean is 'false'. func lexicallySortedEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) (FileInfo, int, bool) { - var wg sync.WaitGroup for j := range entryChs { - j := j - wg.Add(1) - // Pop() entries in parallel for large drive setups. - go func() { - defer wg.Done() - entries[j], entriesValid[j] = entryChs[j].Pop() - }() + entries[j], entriesValid[j] = entryChs[j].Pop() } - wg.Wait() var isTruncated = false for _, valid := range entriesValid { @@ -910,17 +904,9 @@ func lexicallySortedEntry(entryChs []FileInfoCh, entries []FileInfo, entriesVali // if the caller wishes to list N entries to call lexicallySortedEntry // N times until this boolean is 'false'. func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileInfoVersions, entriesValid []bool) (FileInfoVersions, int, bool) { - var wg sync.WaitGroup for j := range entryChs { - j := j - wg.Add(1) - // Pop() entries in parallel for large drive setups. - go func() { - defer wg.Done() - entries[j], entriesValid[j] = entryChs[j].Pop() - }() + entries[j], entriesValid[j] = entryChs[j].Pop() } - wg.Wait() var isTruncated = false for _, valid := range entriesValid { @@ -1232,7 +1218,7 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) }(storageDisks) formats, _ := loadFormatErasureAll(storageDisks, false) - if err = checkFormatErasureValues(formats, s.drivesPerSet); err != nil { + if err = checkFormatErasureValues(formats, s.setDriveCount); err != nil { return err } @@ -1272,7 +1258,7 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) s.erasureDisks[m][n].Close() } - s.endpointStrings[m*s.drivesPerSet+n] = disk.String() + s.endpointStrings[m*s.setDriveCount+n] = disk.String() s.erasureDisks[m][n] = disk } s.erasureDisksMu.Unlock() @@ -1354,7 +1340,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H }(storageDisks) formats, sErrs := loadFormatErasureAll(storageDisks, true) - if err = checkFormatErasureValues(formats, s.drivesPerSet); err != nil { + if err = checkFormatErasureValues(formats, s.setDriveCount); err != nil { return madmin.HealResultItem{}, err } @@ -1365,7 +1351,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H res = madmin.HealResultItem{ Type: madmin.HealItemMetadata, Detail: "disk-format", - DiskCount: s.setCount * s.drivesPerSet, + DiskCount: s.setCount * s.setDriveCount, SetCount: s.setCount, } @@ -1396,7 +1382,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H markUUIDsOffline(refFormat, formats) // Initialize a new set of set formats which will be written to disk. - newFormatSets := newHealFormatSets(refFormat, s.setCount, s.drivesPerSet, formats, sErrs) + newFormatSets := newHealFormatSets(refFormat, s.setCount, s.setDriveCount, formats, sErrs) // Look for all offline/unformatted disks in our reference format, // such that we can fill them up with new UUIDs, this looping also @@ -1413,7 +1399,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H newFormatSets[i][l].Erasure.This = mustGetUUID() refFormat.Erasure.Sets[i][j] = newFormatSets[i][l].Erasure.This for m, v := range res.After.Drives { - if v.Endpoint == s.endpoints.GetString(i*s.drivesPerSet+l) { + if v.Endpoint == s.endpoints.GetString(i*s.setDriveCount+l) { res.After.Drives[m].UUID = newFormatSets[i][l].Erasure.This res.After.Drives[m].State = madmin.DriveStateOk } @@ -1426,14 +1412,14 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H } if !dryRun { - var tmpNewFormats = make([]*formatErasureV3, s.setCount*s.drivesPerSet) + var tmpNewFormats = make([]*formatErasureV3, s.setCount*s.setDriveCount) for i := range newFormatSets { for j := range newFormatSets[i] { if newFormatSets[i][j] == nil { continue } - tmpNewFormats[i*s.drivesPerSet+j] = newFormatSets[i][j] - tmpNewFormats[i*s.drivesPerSet+j].Erasure.Sets = refFormat.Erasure.Sets + tmpNewFormats[i*s.setDriveCount+j] = newFormatSets[i][j] + tmpNewFormats[i*s.setDriveCount+j].Erasure.Sets = refFormat.Erasure.Sets } } @@ -1478,7 +1464,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H s.erasureDisks[m][n].Close() } - s.endpointStrings[m*s.drivesPerSet+n] = disk.String() + s.endpointStrings[m*s.setDriveCount+n] = disk.String() s.erasureDisks[m][n] = disk } s.erasureDisksMu.Unlock() @@ -1496,7 +1482,7 @@ func (s *erasureSets) HealBucket(ctx context.Context, bucket string, dryRun, rem result = madmin.HealResultItem{ Type: madmin.HealItemBucket, Bucket: bucket, - DiskCount: s.setCount * s.drivesPerSet, + DiskCount: s.setCount * s.setDriveCount, SetCount: s.setCount, } @@ -1512,7 +1498,7 @@ func (s *erasureSets) HealBucket(ctx context.Context, bucket string, dryRun, rem // Check if we had quorum to write, if not return an appropriate error. _, afterDriveOnline := result.GetOnlineCounts() - if afterDriveOnline < ((s.setCount*s.drivesPerSet)/2)+1 { + if afterDriveOnline < ((s.setCount*s.setDriveCount)/2)+1 { return result, toObjectErr(errErasureWriteQuorum, bucket) } @@ -1568,7 +1554,7 @@ func (s *erasureSets) Walk(ctx context.Context, bucket, prefix string, results c return } - if quorumCount >= s.drivesPerSet/2 { + if quorumCount >= s.setDriveCount/2 { // Read quorum exists proceed for _, version := range entry.Versions { results <- version.ToObjectInfo(bucket, version.Name) @@ -1595,7 +1581,7 @@ func (s *erasureSets) Walk(ctx context.Context, bucket, prefix string, results c return } - if quorumCount >= s.drivesPerSet/2 { + if quorumCount >= s.setDriveCount/2 { // Read quorum exists proceed results <- entry.ToObjectInfo(bucket, entry.Name) } @@ -1622,14 +1608,14 @@ func (s *erasureSets) HealObjects(ctx context.Context, bucket, prefix string, op break } - if quorumCount == s.drivesPerSet && opts.ScanMode == madmin.HealNormalScan { + if quorumCount == s.setDriveCount && opts.ScanMode == madmin.HealNormalScan { // Skip good entries. continue } for _, version := range entry.Versions { // Wait and proceed if there are active requests - waitForLowHTTPReq(int32(s.drivesPerSet), time.Second) + waitForLowHTTPReq(int32(s.setDriveCount), time.Second) if err := healObject(bucket, version.Name, version.VersionID); err != nil { return toObjectErr(err, bucket, version.Name) diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go index 9ffe3fea1..b4adadfdf 100644 --- a/cmd/erasure-zones.go +++ b/cmd/erasure-zones.go @@ -690,15 +690,15 @@ func (z *erasureZones) ListObjectsV2(ctx context.Context, bucket, prefix, contin func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) { var zonesEntryChs [][]FileInfoCh - var zonesDrivesPerSet []int + var zonesListTolerancePerSet []int endWalkCh := make(chan struct{}) defer close(endWalkCh) for _, zone := range z.zones { zonesEntryChs = append(zonesEntryChs, - zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.drivesPerSet)) - zonesDrivesPerSet = append(zonesDrivesPerSet, zone.drivesPerSet) + zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.listTolerancePerSet)) + zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet) } var objInfos []ObjectInfo @@ -723,7 +723,7 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, break } - if quorumCount < zonesDrivesPerSet[zoneIndex]/2 { + if quorumCount < zonesListTolerancePerSet[zoneIndex] { // Skip entries which are not found on upto ndisks/2. continue } @@ -810,20 +810,20 @@ func (z *erasureZones) listObjectsSplunk(ctx context.Context, bucket, prefix, ma var zonesEntryChs [][]FileInfoCh var zonesEndWalkCh []chan struct{} - var drivesPerSets []int + var zonesListTolerancePerSet []int for _, zone := range z.zones { entryChs, endWalkCh := zone.poolSplunk.Release(listParams{bucket, recursive, marker, prefix}) if entryChs == nil { endWalkCh = make(chan struct{}) - entryChs = zone.startSplunkMergeWalksN(ctx, bucket, prefix, marker, endWalkCh, zone.drivesPerSet) + entryChs = zone.startSplunkMergeWalksN(ctx, bucket, prefix, marker, endWalkCh, zone.listTolerancePerSet) } zonesEntryChs = append(zonesEntryChs, entryChs) zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh) - drivesPerSets = append(drivesPerSets, zone.drivesPerSet) + zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet) } - entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, drivesPerSets) + entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet) if len(entries.Files) == 0 { return loi, nil } @@ -902,20 +902,20 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker, var zonesEntryChs [][]FileInfoCh var zonesEndWalkCh []chan struct{} - var drivesPerSets []int + var zonesListTolerancePerSet []int for _, zone := range z.zones { entryChs, endWalkCh := zone.pool.Release(listParams{bucket, recursive, marker, prefix}) if entryChs == nil { endWalkCh = make(chan struct{}) - entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.drivesPerSet) + entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet) } zonesEntryChs = append(zonesEntryChs, entryChs) zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh) - drivesPerSets = append(drivesPerSets, zone.drivesPerSet) + zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet) } - entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, drivesPerSets) + entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet) if len(entries.Files) == 0 { return loi, nil } @@ -951,18 +951,9 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker, // N times until this boolean is 'false'. func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, int, int, bool) { for i, entryChs := range zoneEntryChs { - i := i - var wg sync.WaitGroup for j := range entryChs { - j := j - wg.Add(1) - // Pop() entries in parallel for large drive setups. - go func() { - defer wg.Done() - zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() - }() + zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() } - wg.Wait() } var isTruncated = false @@ -1040,18 +1031,9 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI // N times until this boolean is 'false'. func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneEntries [][]FileInfoVersions, zoneEntriesValid [][]bool) (FileInfoVersions, int, int, bool) { for i, entryChs := range zoneEntryChs { - i := i - var wg sync.WaitGroup for j := range entryChs { - j := j - wg.Add(1) - // Pop() entries in parallel for large drive setups. - go func() { - defer wg.Done() - zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() - }() + zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() } - wg.Wait() } var isTruncated = false @@ -1119,7 +1101,7 @@ func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneE } // mergeZonesEntriesVersionsCh - merges FileInfoVersions channel to entries upto maxKeys. -func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys int, drivesPerSets []int) (entries FilesInfoVersions) { +func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys int, zonesListTolerancePerSet []int) (entries FilesInfoVersions) { var i = 0 var zonesEntriesInfos [][]FileInfoVersions var zonesEntriesValid [][]bool @@ -1134,8 +1116,8 @@ func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys i break } - if quorumCount < drivesPerSets[zoneIndex]/2 { - // Skip entries which are not found on upto ndisks/2. + if quorumCount < zonesListTolerancePerSet[zoneIndex] { + // Skip entries which are not found upto the expected tolerance continue } @@ -1150,7 +1132,7 @@ func mergeZonesEntriesVersionsCh(zonesEntryChs [][]FileInfoVersionsCh, maxKeys i } // mergeZonesEntriesCh - merges FileInfo channel to entries upto maxKeys. -func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, drivesPerSets []int) (entries FilesInfo) { +func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, zonesListTolerancePerSet []int) (entries FilesInfo) { var i = 0 var zonesEntriesInfos [][]FileInfo var zonesEntriesValid [][]bool @@ -1165,8 +1147,8 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, drivesPerSet break } - if quorumCount < drivesPerSets[zoneIndex]/2 { - // Skip entries which are not found on upto ndisks/2. + if quorumCount < zonesListTolerancePerSet[zoneIndex] { + // Skip entries which are not found upto configured tolerance. continue } @@ -1182,18 +1164,9 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, drivesPerSet func isTruncatedZones(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) bool { for i, entryChs := range zoneEntryChs { - i := i - var wg sync.WaitGroup for j := range entryChs { - j := j - wg.Add(1) - // Pop() entries in parallel for large drive setups. - go func() { - defer wg.Done() - zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() - }() + zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() } - wg.Wait() } var isTruncated = false @@ -1214,24 +1187,16 @@ func isTruncatedZones(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zon zoneEntryChs[i][j].Push(zoneEntries[i][j]) } } + } return isTruncated } func isTruncatedZonesVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneEntries [][]FileInfoVersions, zoneEntriesValid [][]bool) bool { for i, entryChs := range zoneEntryChs { - i := i - var wg sync.WaitGroup for j := range entryChs { - j := j - wg.Add(1) - // Pop() entries in parallel for large drive setups. - go func() { - defer wg.Done() - zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() - }() + zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() } - wg.Wait() } var isTruncated = false @@ -1307,19 +1272,19 @@ func (z *erasureZones) listObjectVersions(ctx context.Context, bucket, prefix, m var zonesEntryChs [][]FileInfoVersionsCh var zonesEndWalkCh []chan struct{} - var drivesPerSets []int + var zonesListTolerancePerSet []int for _, zone := range z.zones { entryChs, endWalkCh := zone.poolVersions.Release(listParams{bucket, recursive, marker, prefix}) if entryChs == nil { endWalkCh = make(chan struct{}) - entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.drivesPerSet) + entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet) } zonesEntryChs = append(zonesEntryChs, entryChs) zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh) - drivesPerSets = append(drivesPerSets, zone.drivesPerSet) + zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet) } - entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, drivesPerSets) + entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet) if len(entries.FilesVersions) == 0 { return loi, nil } @@ -1830,7 +1795,7 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results var zoneDrivesPerSet []int for _, zone := range z.zones { - zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet) + zoneDrivesPerSet = append(zoneDrivesPerSet, zone.setDriveCount) } var zonesEntriesInfos [][]FileInfoVersions @@ -1871,7 +1836,7 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results var zoneDrivesPerSet []int for _, zone := range z.zones { - zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet) + zoneDrivesPerSet = append(zoneDrivesPerSet, zone.setDriveCount) } var zonesEntriesInfos [][]FileInfo @@ -1918,7 +1883,7 @@ func (z *erasureZones) HealObjects(ctx context.Context, bucket, prefix string, o var zoneDrivesPerSet []int for _, zone := range z.zones { - zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet) + zoneDrivesPerSet = append(zoneDrivesPerSet, zone.setDriveCount) } var zonesEntriesInfos [][]FileInfoVersions @@ -2082,7 +2047,7 @@ func (z *erasureZones) Health(ctx context.Context, opts HealthOptions) HealthRes for zoneIdx := range erasureSetUpCount { parityDrives := globalStorageClass.GetParityForSC(storageclass.STANDARD) - diskCount := z.zones[zoneIdx].drivesPerSet + diskCount := z.zones[zoneIdx].setDriveCount if parityDrives == 0 { parityDrives = getDefaultParityBlocks(diskCount) } diff --git a/cmd/format-erasure.go b/cmd/format-erasure.go index 8b57aacd7..bccebb4ef 100644 --- a/cmd/format-erasure.go +++ b/cmd/format-erasure.go @@ -439,7 +439,7 @@ func checkFormatErasureValue(formatErasure *formatErasureV3) error { } // Check all format values. -func checkFormatErasureValues(formats []*formatErasureV3, drivesPerSet int) error { +func checkFormatErasureValues(formats []*formatErasureV3, setDriveCount int) error { for i, formatErasure := range formats { if formatErasure == nil { continue @@ -454,8 +454,8 @@ func checkFormatErasureValues(formats []*formatErasureV3, drivesPerSet int) erro // Only if custom erasure drive count is set, // we should fail here other proceed to honor what // is present on the disk. - if globalCustomErasureDriveCount && len(formatErasure.Erasure.Sets[0]) != drivesPerSet { - return fmt.Errorf("%s disk is already formatted with %d drives per erasure set. This cannot be changed to %d, please revert your MINIO_ERASURE_SET_DRIVE_COUNT setting", humanize.Ordinal(i+1), len(formatErasure.Erasure.Sets[0]), drivesPerSet) + if globalCustomErasureDriveCount && len(formatErasure.Erasure.Sets[0]) != setDriveCount { + return fmt.Errorf("%s disk is already formatted with %d drives per erasure set. This cannot be changed to %d, please revert your MINIO_ERASURE_SET_DRIVE_COUNT setting", humanize.Ordinal(i+1), len(formatErasure.Erasure.Sets[0]), setDriveCount) } } return nil @@ -788,22 +788,22 @@ func fixFormatErasureV3(storageDisks []StorageAPI, endpoints Endpoints, formats } // initFormatErasure - save Erasure format configuration on all disks. -func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount, drivesPerSet int, deploymentID string, sErrs []error) (*formatErasureV3, error) { - format := newFormatErasureV3(setCount, drivesPerSet) +func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount, setDriveCount int, deploymentID string, sErrs []error) (*formatErasureV3, error) { + format := newFormatErasureV3(setCount, setDriveCount) formats := make([]*formatErasureV3, len(storageDisks)) - wantAtMost := ecDrivesNoConfig(drivesPerSet) + wantAtMost := ecDrivesNoConfig(setDriveCount) for i := 0; i < setCount; i++ { - hostCount := make(map[string]int, drivesPerSet) - for j := 0; j < drivesPerSet; j++ { - disk := storageDisks[i*drivesPerSet+j] + hostCount := make(map[string]int, setDriveCount) + for j := 0; j < setDriveCount; j++ { + disk := storageDisks[i*setDriveCount+j] newFormat := format.Clone() newFormat.Erasure.This = format.Erasure.Sets[i][j] if deploymentID != "" { newFormat.ID = deploymentID } hostCount[disk.Hostname()]++ - formats[i*drivesPerSet+j] = newFormat + formats[i*setDriveCount+j] = newFormat } if len(hostCount) > 0 { var once sync.Once @@ -817,8 +817,8 @@ func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount, return } logger.Info(" * Set %v:", i+1) - for j := 0; j < drivesPerSet; j++ { - disk := storageDisks[i*drivesPerSet+j] + for j := 0; j < setDriveCount; j++ { + disk := storageDisks[i*setDriveCount+j] logger.Info(" - Drive: %s", disk.String()) } }) @@ -842,15 +842,15 @@ func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount, // ecDrivesNoConfig returns the erasure coded drives in a set if no config has been set. // It will attempt to read it from env variable and fall back to drives/2. -func ecDrivesNoConfig(drivesPerSet int) int { +func ecDrivesNoConfig(setDriveCount int) int { ecDrives := globalStorageClass.GetParityForSC(storageclass.STANDARD) if ecDrives == 0 { - cfg, err := storageclass.LookupConfig(nil, drivesPerSet) + cfg, err := storageclass.LookupConfig(nil, setDriveCount) if err == nil { ecDrives = cfg.Standard.Parity } if ecDrives == 0 { - ecDrives = drivesPerSet / 2 + ecDrives = setDriveCount / 2 } } return ecDrives @@ -920,14 +920,14 @@ func markUUIDsOffline(refFormat *formatErasureV3, formats []*formatErasureV3) { } // Initialize a new set of set formats which will be written to all disks. -func newHealFormatSets(refFormat *formatErasureV3, setCount, drivesPerSet int, formats []*formatErasureV3, errs []error) [][]*formatErasureV3 { +func newHealFormatSets(refFormat *formatErasureV3, setCount, setDriveCount int, formats []*formatErasureV3, errs []error) [][]*formatErasureV3 { newFormats := make([][]*formatErasureV3, setCount) for i := range refFormat.Erasure.Sets { - newFormats[i] = make([]*formatErasureV3, drivesPerSet) + newFormats[i] = make([]*formatErasureV3, setDriveCount) } for i := range refFormat.Erasure.Sets { for j := range refFormat.Erasure.Sets[i] { - if errs[i*drivesPerSet+j] == errUnformattedDisk || errs[i*drivesPerSet+j] == nil { + if errs[i*setDriveCount+j] == errUnformattedDisk || errs[i*setDriveCount+j] == nil { newFormats[i][j] = &formatErasureV3{} newFormats[i][j].Version = refFormat.Version newFormats[i][j].ID = refFormat.ID @@ -935,13 +935,13 @@ func newHealFormatSets(refFormat *formatErasureV3, setCount, drivesPerSet int, f newFormats[i][j].Erasure.Version = refFormat.Erasure.Version newFormats[i][j].Erasure.DistributionAlgo = refFormat.Erasure.DistributionAlgo } - if errs[i*drivesPerSet+j] == errUnformattedDisk { + if errs[i*setDriveCount+j] == errUnformattedDisk { newFormats[i][j].Erasure.This = "" newFormats[i][j].Erasure.Sets = nil continue } - if errs[i*drivesPerSet+j] == nil { - newFormats[i][j].Erasure.This = formats[i*drivesPerSet+j].Erasure.This + if errs[i*setDriveCount+j] == nil { + newFormats[i][j].Erasure.This = formats[i*setDriveCount+j].Erasure.This newFormats[i][j].Erasure.Sets = nil } } diff --git a/cmd/format-erasure_test.go b/cmd/format-erasure_test.go index e64863aef..2d723b64b 100644 --- a/cmd/format-erasure_test.go +++ b/cmd/format-erasure_test.go @@ -324,16 +324,16 @@ func TestCheckFormatErasureValue(t *testing.T) { // Tests getFormatErasureInQuorum() func TestGetFormatErasureInQuorumCheck(t *testing.T) { setCount := 2 - drivesPerSet := 16 + setDriveCount := 16 - format := newFormatErasureV3(setCount, drivesPerSet) + format := newFormatErasureV3(setCount, setDriveCount) formats := make([]*formatErasureV3, 32) for i := 0; i < setCount; i++ { - for j := 0; j < drivesPerSet; j++ { + for j := 0; j < setDriveCount; j++ { newFormat := format.Clone() newFormat.Erasure.This = format.Erasure.Sets[i][j] - formats[i*drivesPerSet+j] = newFormat + formats[i*setDriveCount+j] = newFormat } } @@ -390,16 +390,16 @@ func TestGetFormatErasureInQuorumCheck(t *testing.T) { // Tests formatErasureGetDeploymentID() func TestGetErasureID(t *testing.T) { setCount := 2 - drivesPerSet := 8 + setDriveCount := 8 - format := newFormatErasureV3(setCount, drivesPerSet) + format := newFormatErasureV3(setCount, setDriveCount) formats := make([]*formatErasureV3, 16) for i := 0; i < setCount; i++ { - for j := 0; j < drivesPerSet; j++ { + for j := 0; j < setDriveCount; j++ { newFormat := format.Clone() newFormat.Erasure.This = format.Erasure.Sets[i][j] - formats[i*drivesPerSet+j] = newFormat + formats[i*setDriveCount+j] = newFormat } } @@ -445,17 +445,17 @@ func TestGetErasureID(t *testing.T) { // Initialize new format sets. func TestNewFormatSets(t *testing.T) { setCount := 2 - drivesPerSet := 16 + setDriveCount := 16 - format := newFormatErasureV3(setCount, drivesPerSet) + format := newFormatErasureV3(setCount, setDriveCount) formats := make([]*formatErasureV3, 32) errs := make([]error, 32) for i := 0; i < setCount; i++ { - for j := 0; j < drivesPerSet; j++ { + for j := 0; j < setDriveCount; j++ { newFormat := format.Clone() newFormat.Erasure.This = format.Erasure.Sets[i][j] - formats[i*drivesPerSet+j] = newFormat + formats[i*setDriveCount+j] = newFormat } } @@ -467,7 +467,7 @@ func TestNewFormatSets(t *testing.T) { // 16th disk is unformatted. errs[15] = errUnformattedDisk - newFormats := newHealFormatSets(quorumFormat, setCount, drivesPerSet, formats, errs) + newFormats := newHealFormatSets(quorumFormat, setCount, setDriveCount, formats, errs) if newFormats == nil { t.Fatal("Unexpected failure") } diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 8144e5fa3..7909c2e7a 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -89,7 +89,7 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) { } // healErasureSet lists and heals all objects in a specific erasure set -func healErasureSet(ctx context.Context, setIndex int, xlObj *erasureObjects, drivesPerSet int) error { +func healErasureSet(ctx context.Context, setIndex int, xlObj *erasureObjects, setDriveCount int) error { buckets, err := xlObj.ListBuckets(ctx) if err != nil { return err @@ -151,7 +151,7 @@ func healErasureSet(ctx context.Context, setIndex int, xlObj *erasureObjects, dr break } - if quorumCount == drivesPerSet { + if quorumCount == setDriveCount { // Skip good entries. continue } diff --git a/cmd/prepare-storage.go b/cmd/prepare-storage.go index 80e6c5e0d..13f5ba8be 100644 --- a/cmd/prepare-storage.go +++ b/cmd/prepare-storage.go @@ -230,7 +230,7 @@ func IsServerResolvable(endpoint Endpoint) error { // connect to list of endpoints and load all Erasure disk formats, validate the formats are correct // and are in quorum, if no formats are found attempt to initialize all of them for the first // time. additionally make sure to close all the disks used in this attempt. -func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints, zoneCount, setCount, drivesPerSet int, deploymentID string) (storageDisks []StorageAPI, format *formatErasureV3, err error) { +func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints, zoneCount, setCount, setDriveCount int, deploymentID string) (storageDisks []StorageAPI, format *formatErasureV3, err error) { // Initialize all storage disks storageDisks, errs := initStorageDisksWithErrors(endpoints) @@ -268,17 +268,17 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints, // most part unless one of the formats is not consistent // with expected Erasure format. For example if a user is // trying to pool FS backend into an Erasure set. - if err = checkFormatErasureValues(formatConfigs, drivesPerSet); err != nil { + if err = checkFormatErasureValues(formatConfigs, setDriveCount); err != nil { return nil, nil, err } // All disks report unformatted we should initialized everyone. if shouldInitErasureDisks(sErrs) && firstDisk { logger.Info("Formatting %s zone, %v set(s), %v drives per set.", - humanize.Ordinal(zoneCount), setCount, drivesPerSet) + humanize.Ordinal(zoneCount), setCount, setDriveCount) // Initialize erasure code format on disks - format, err = initFormatErasure(GlobalContext, storageDisks, setCount, drivesPerSet, deploymentID, sErrs) + format, err = initFormatErasure(GlobalContext, storageDisks, setCount, setDriveCount, deploymentID, sErrs) if err != nil { return nil, nil, err } @@ -347,8 +347,8 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints, } // Format disks before initialization of object layer. -func waitForFormatErasure(firstDisk bool, endpoints Endpoints, zoneCount, setCount, drivesPerSet int, deploymentID string) ([]StorageAPI, *formatErasureV3, error) { - if len(endpoints) == 0 || setCount == 0 || drivesPerSet == 0 { +func waitForFormatErasure(firstDisk bool, endpoints Endpoints, zoneCount, setCount, setDriveCount int, deploymentID string) ([]StorageAPI, *formatErasureV3, error) { + if len(endpoints) == 0 || setCount == 0 || setDriveCount == 0 { return nil, nil, errInvalidArgument } @@ -374,7 +374,7 @@ func waitForFormatErasure(firstDisk bool, endpoints Endpoints, zoneCount, setCou for { select { case <-ticker.C: - storageDisks, format, err := connectLoadInitFormats(tries, firstDisk, endpoints, zoneCount, setCount, drivesPerSet, deploymentID) + storageDisks, format, err := connectLoadInitFormats(tries, firstDisk, endpoints, zoneCount, setCount, setDriveCount, deploymentID) if err != nil { tries++ switch err { diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 1d40346a8..9565be82a 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -842,11 +842,7 @@ func (s *xlStorage) WalkSplunk(volume, dirPath, marker string, endWalkCh <-chan } walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, true, listDir, s.isLeafSplunk, s.isLeafDir, endWalkCh) - for { - walkResult, ok := <-walkResultCh - if !ok { - return - } + for walkResult := range walkResultCh { var fi FileInfo if HasSuffix(walkResult.entry, SlashSeparator) { fi = FileInfo{