diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go index c123993b2..c9d911cc5 100644 --- a/cmd/data-crawler.go +++ b/cmd/data-crawler.go @@ -148,6 +148,7 @@ type folderScanner struct { newFolders []cachedFolder existingFolders []cachedFolder + disks []StorageAPI } // crawlDataFolder will crawl the basepath+cache.Info.Name and return an updated cache. @@ -176,7 +177,6 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache, logger.LogIf(ctx, err) delayMult = dataCrawlSleepDefMult } - s := folderScanner{ root: basePath, getSize: getSize, @@ -190,6 +190,18 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache, healObjectSelect: 0, } + // Add disks for set healing. + if len(cache.Disks) > 0 { + objAPI, ok := newObjectLayerFn().(*erasureServerSets) + if ok { + s.disks = objAPI.GetDisksID(cache.Disks...) + if len(s.disks) != len(cache.Disks) { + logger.Info(logPrefix+"Missing disks, want %d, found %d. Cannot heal."+logSuffix, len(cache.Disks), len(s.disks)) + s.disks = s.disks[:0] + } + } + } + // Enable healing in XL mode. if globalIsErasure { // Include a clean folder one in n cycles. @@ -459,8 +471,8 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo continue } - objAPI := newObjectLayerFn() - if objAPI == nil { + objAPI, ok := newObjectLayerFn().(*erasureServerSets) + if !ok || len(f.disks) == 0 { continue } @@ -480,6 +492,13 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo // We therefore perform a heal check. // If that doesn't bring it back we remove the folder and assume it was deleted. // This means that the next run will not look for it. + // How to resolve results. + resolver := metadataResolutionParams{ + dirQuorum: getReadQuorum(len(f.disks)), + objQuorum: getReadQuorum(len(f.disks)), + bucket: "", + } + for k := range existing { bucket, prefix := path2BucketObject(k) if f.dataUsageCrawlDebug { @@ -488,30 +507,121 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo // Dynamic time delay. t := UTCNow() - - err = objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{Recursive: true, Remove: healDeleteDangling}, - func(bucket, object, versionID string) error { - // Wait for each heal as per crawler frequency. + resolver.bucket = bucket + + foundObjs := false + dangling := true + ctx, cancel := context.WithCancel(ctx) + err := listPathRaw(ctx, listPathRawOptions{ + disks: f.disks, + bucket: bucket, + path: prefix, + recursive: true, + reportNotFound: true, + minDisks: len(f.disks), // We want full consistency. + // Weird, maybe transient error. + agreed: func(entry metaCacheEntry) { + if f.dataUsageCrawlDebug { + logger.Info(color.Green("healObjects:")+" got agreement: %v", entry.name) + } + if entry.isObject() { + dangling = false + } + }, + // Some disks have data for this. + partial: func(entries metaCacheEntries, nAgreed int, errs []error) { + if f.dataUsageCrawlDebug { + logger.Info(color.Green("healObjects:")+" got partial, %d agreed, errs: %v", nAgreed, errs) + } + // Sleep and reset. sleepDuration(time.Since(t), f.dataUsageCrawlMult) + t = UTCNow() + entry, ok := entries.resolve(&resolver) + if !ok { + for _, err := range errs { + if err != nil { + // Not all disks are ready, do nothing for now. + dangling = false + return + } + } + + // If no errors, queue it for healing. 
+ entry, _ = entries.firstFound() + } - defer func() { + if f.dataUsageCrawlDebug { + logger.Info(color.Green("healObjects:")+" resolved to: %v, dir: %v", entry.name, entry.isDir()) + } + if entry.isDir() { + return + } + dangling = false + // We got an entry which we should be able to heal. + fiv, err := entry.fileInfoVersions(bucket) + if err != nil { + err := bgSeq.queueHealTask(healSource{ + bucket: bucket, + object: entry.name, + versionID: "", + }, madmin.HealItemObject) + logger.LogIf(ctx, err) + foundObjs = foundObjs || err == nil + return + } + for _, ver := range fiv.Versions { + // Sleep and reset. + sleepDuration(time.Since(t), f.dataUsageCrawlMult) t = UTCNow() - }() - return bgSeq.queueHealTask(healSource{ - bucket: bucket, - object: object, - versionID: versionID, - }, madmin.HealItemObject) - }) + err := bgSeq.queueHealTask(healSource{ + bucket: bucket, + object: fiv.Name, + versionID: ver.VersionID, + }, madmin.HealItemObject) + logger.LogIf(ctx, err) + foundObjs = foundObjs || err == nil + } + }, + // Too many disks failed. + finished: func(errs []error) { + if f.dataUsageCrawlDebug { + logger.Info(color.Green("healObjects:")+" too many errors: %v", errs) + } + dangling = false + cancel() + }, + }) - sleepDuration(time.Since(t), f.dataUsageCrawlMult) + if f.dataUsageCrawlDebug && err != nil && err != errFileNotFound { + logger.Info(color.Green("healObjects:")+" checking returned value %v (%T)", err, err) + } - if f.dataUsageCrawlDebug && err != nil { - logger.Info(color.Green("healObjects:")+" checking returned value %v", err) + // If we found one or more disks with this folder, delete it. + if err == nil && dangling { + if f.dataUsageCrawlDebug { + logger.Info(color.Green("healObjects:")+" deleting dangling directory %s", prefix) + } + // If we have quorum, found directories, but no objects, issue heal to delete the dangling. + objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{Recursive: true, Remove: true}, + func(bucket, object, versionID string) error { + // Wait for each heal as per crawler frequency. + sleepDuration(time.Since(t), f.dataUsageCrawlMult) + + defer func() { + t = UTCNow() + }() + return bgSeq.queueHealTask(healSource{ + bucket: bucket, + object: object, + versionID: versionID, + }, madmin.HealItemObject) + }) } + sleepDuration(time.Since(t), f.dataUsageCrawlMult) + // Add unless healing returned an error. - if err == nil { + if foundObjs { this := cachedFolder{name: k, parent: &thisHash, objectHealProbDiv: folder.objectHealProbDiv} cache.addChild(hashPath(k)) if final { diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go index 15ec30b93..9467bca50 100644 --- a/cmd/data-usage-cache.go +++ b/cmd/data-usage-cache.go @@ -56,6 +56,7 @@ type dataUsageEntry struct { // dataUsageCache contains a cache of data usage entries. 
type dataUsageCache struct { Info dataUsageCacheInfo + Disks []string Cache map[string]dataUsageEntry } diff --git a/cmd/data-usage-cache_gen.go b/cmd/data-usage-cache_gen.go index be5cdd9cc..32e3a53e9 100644 --- a/cmd/data-usage-cache_gen.go +++ b/cmd/data-usage-cache_gen.go @@ -30,35 +30,54 @@ func (z *dataUsageCache) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Info") return } - case "Cache": + case "Disks": var zb0002 uint32 - zb0002, err = dc.ReadMapHeader() + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Disks") + return + } + if cap(z.Disks) >= int(zb0002) { + z.Disks = (z.Disks)[:zb0002] + } else { + z.Disks = make([]string, zb0002) + } + for za0001 := range z.Disks { + z.Disks[za0001], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Disks", za0001) + return + } + } + case "Cache": + var zb0003 uint32 + zb0003, err = dc.ReadMapHeader() if err != nil { err = msgp.WrapError(err, "Cache") return } if z.Cache == nil { - z.Cache = make(map[string]dataUsageEntry, zb0002) + z.Cache = make(map[string]dataUsageEntry, zb0003) } else if len(z.Cache) > 0 { for key := range z.Cache { delete(z.Cache, key) } } - for zb0002 > 0 { - zb0002-- - var za0001 string - var za0002 dataUsageEntry - za0001, err = dc.ReadString() + for zb0003 > 0 { + zb0003-- + var za0002 string + var za0003 dataUsageEntry + za0002, err = dc.ReadString() if err != nil { err = msgp.WrapError(err, "Cache") return } - err = za0002.DecodeMsg(dc) + err = za0003.DecodeMsg(dc) if err != nil { - err = msgp.WrapError(err, "Cache", za0001) + err = msgp.WrapError(err, "Cache", za0002) return } - z.Cache[za0001] = za0002 + z.Cache[za0002] = za0003 } default: err = dc.Skip() @@ -73,9 +92,9 @@ func (z *dataUsageCache) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *dataUsageCache) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 2 + // map header, size 3 // write "Info" - err = en.Append(0x82, 0xa4, 0x49, 0x6e, 0x66, 0x6f) + err = en.Append(0x83, 0xa4, 0x49, 0x6e, 0x66, 0x6f) if err != nil { return } @@ -84,6 +103,23 @@ func (z *dataUsageCache) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Info") return } + // write "Disks" + err = en.Append(0xa5, 0x44, 0x69, 0x73, 0x6b, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Disks))) + if err != nil { + err = msgp.WrapError(err, "Disks") + return + } + for za0001 := range z.Disks { + err = en.WriteString(z.Disks[za0001]) + if err != nil { + err = msgp.WrapError(err, "Disks", za0001) + return + } + } // write "Cache" err = en.Append(0xa5, 0x43, 0x61, 0x63, 0x68, 0x65) if err != nil { @@ -94,15 +130,15 @@ func (z *dataUsageCache) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Cache") return } - for za0001, za0002 := range z.Cache { - err = en.WriteString(za0001) + for za0002, za0003 := range z.Cache { + err = en.WriteString(za0002) if err != nil { err = msgp.WrapError(err, "Cache") return } - err = za0002.EncodeMsg(en) + err = za0003.EncodeMsg(en) if err != nil { - err = msgp.WrapError(err, "Cache", za0001) + err = msgp.WrapError(err, "Cache", za0002) return } } @@ -112,22 +148,28 @@ func (z *dataUsageCache) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *dataUsageCache) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 2 + // map header, size 3 // string "Info" - o = append(o, 0x82, 0xa4, 0x49, 0x6e, 
0x66, 0x6f) + o = append(o, 0x83, 0xa4, 0x49, 0x6e, 0x66, 0x6f) o, err = z.Info.MarshalMsg(o) if err != nil { err = msgp.WrapError(err, "Info") return } + // string "Disks" + o = append(o, 0xa5, 0x44, 0x69, 0x73, 0x6b, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.Disks))) + for za0001 := range z.Disks { + o = msgp.AppendString(o, z.Disks[za0001]) + } // string "Cache" o = append(o, 0xa5, 0x43, 0x61, 0x63, 0x68, 0x65) o = msgp.AppendMapHeader(o, uint32(len(z.Cache))) - for za0001, za0002 := range z.Cache { - o = msgp.AppendString(o, za0001) - o, err = za0002.MarshalMsg(o) + for za0002, za0003 := range z.Cache { + o = msgp.AppendString(o, za0002) + o, err = za0003.MarshalMsg(o) if err != nil { - err = msgp.WrapError(err, "Cache", za0001) + err = msgp.WrapError(err, "Cache", za0002) return } } @@ -158,35 +200,54 @@ func (z *dataUsageCache) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Info") return } - case "Cache": + case "Disks": var zb0002 uint32 - zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Disks") + return + } + if cap(z.Disks) >= int(zb0002) { + z.Disks = (z.Disks)[:zb0002] + } else { + z.Disks = make([]string, zb0002) + } + for za0001 := range z.Disks { + z.Disks[za0001], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Disks", za0001) + return + } + } + case "Cache": + var zb0003 uint32 + zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { err = msgp.WrapError(err, "Cache") return } if z.Cache == nil { - z.Cache = make(map[string]dataUsageEntry, zb0002) + z.Cache = make(map[string]dataUsageEntry, zb0003) } else if len(z.Cache) > 0 { for key := range z.Cache { delete(z.Cache, key) } } - for zb0002 > 0 { - var za0001 string - var za0002 dataUsageEntry - zb0002-- - za0001, bts, err = msgp.ReadStringBytes(bts) + for zb0003 > 0 { + var za0002 string + var za0003 dataUsageEntry + zb0003-- + za0002, bts, err = msgp.ReadStringBytes(bts) if err != nil { err = msgp.WrapError(err, "Cache") return } - bts, err = za0002.UnmarshalMsg(bts) + bts, err = za0003.UnmarshalMsg(bts) if err != nil { - err = msgp.WrapError(err, "Cache", za0001) + err = msgp.WrapError(err, "Cache", za0002) return } - z.Cache[za0001] = za0002 + z.Cache[za0002] = za0003 } default: bts, err = msgp.Skip(bts) @@ -202,11 +263,15 @@ func (z *dataUsageCache) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *dataUsageCache) Msgsize() (s int) { - s = 1 + 5 + z.Info.Msgsize() + 6 + msgp.MapHeaderSize + s = 1 + 5 + z.Info.Msgsize() + 6 + msgp.ArrayHeaderSize + for za0001 := range z.Disks { + s += msgp.StringPrefixSize + len(z.Disks[za0001]) + } + s += 6 + msgp.MapHeaderSize if z.Cache != nil { - for za0001, za0002 := range z.Cache { - _ = za0002 - s += msgp.StringPrefixSize + len(za0001) + za0002.Msgsize() + for za0002, za0003 := range z.Cache { + _ = za0003 + s += msgp.StringPrefixSize + len(za0002) + za0003.Msgsize() } } return diff --git a/cmd/erasure-server-sets.go b/cmd/erasure-server-sets.go index 5a561e483..2299a5597 100644 --- a/cmd/erasure-server-sets.go +++ b/cmd/erasure-server-sets.go @@ -92,6 +92,26 @@ func (z *erasureServerSets) NewNSLock(bucket string, objects ...string) RWLocker return z.serverSets[0].NewNSLock(bucket, objects...) } +// GetDisksID will return disks by their ID. 
+func (z *erasureServerSets) GetDisksID(ids ...string) []StorageAPI { + idMap := make(map[string]struct{}) + for _, id := range ids { + idMap[id] = struct{}{} + } + res := make([]StorageAPI, 0, len(idMap)) + for _, ss := range z.serverSets { + for _, disks := range ss.erasureDisks { + for _, disk := range disks { + id, _ := disk.GetDiskID() + if _, ok := idMap[id]; ok { + res = append(res, disk) + } + } + } + } + return res +} + func (z *erasureServerSets) GetAllLockers() []dsync.NetLocker { return z.serverSets[0].GetAllLockers() } diff --git a/cmd/erasure.go b/cmd/erasure.go index 9c081a87d..ea5d9a687 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -252,6 +252,14 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc return nil } + // Collect disks for healing. + allDisks := er.getDisks() + allDiskIDs := make([]string, len(allDisks)) + for i, disk := range allDisks { + id, _ := disk.GetDiskID() + allDiskIDs[i] = id + } + // Load bucket totals oldCache := dataUsageCache{} if err := oldCache.load(ctx, er, dataUsageCacheName); err != nil { @@ -351,6 +359,7 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc cache.Info.Name = bucket.Name } cache.Info.BloomFilter = bloom + cache.Disks = allDiskIDs if cache.Info.Name != bucket.Name { logger.LogIf(ctx, fmt.Errorf("cache name mismatch: %s != %s", cache.Info.Name, bucket.Name)) cache.Info = dataUsageCacheInfo{ diff --git a/cmd/metacache-entries.go b/cmd/metacache-entries.go index 2481e8716..1cffd218f 100644 --- a/cmd/metacache-entries.go +++ b/cmd/metacache-entries.go @@ -246,6 +246,19 @@ func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCa return selected, selected != nil } +// firstFound returns the first found and the number of set entries. +func (m metaCacheEntries) firstFound() (first *metaCacheEntry, n int) { + for _, entry := range m { + if entry.name != "" { + n++ + if first == nil { + first = &entry + } + } + } + return first, n +} + // names will return all names in order. // Since this allocates it should not be used in critical functions. func (m metaCacheEntries) names() []string { diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index d3822d2a1..8ee68bd4c 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -627,29 +627,6 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr disks = disks[:askDisks] } - var readers = make([]*metacacheReader, askDisks) - for i := range disks { - r, w := io.Pipe() - d := disks[i] - readers[i], err = newMetacacheReader(r) - if err != nil { - cancel() - return entries, err - } - // Send request to each disk. - go func() { - err := d.WalkDir(ctx, WalkDirOptions{ - Bucket: o.Bucket, - BaseDir: o.BaseDir, - Recursive: o.Recursive || o.Separator != SlashSeparator, - FilterPrefix: o.FilterPrefix}, w) - w.CloseWithError(err) - if err != io.EOF { - logger.LogIf(ctx, err) - } - }() - } - // Create output for our results. 
cacheCh := make(chan metaCacheEntry, metacacheBlockSize) @@ -746,92 +723,38 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr bucket: o.Bucket, } - topEntries := make(metaCacheEntries, len(readers)) - for { - // Get the top entry from each - var current metaCacheEntry - var atEOF, agree int - for i, r := range readers { - topEntries[i].name = "" - entry, err := r.peek() - switch err { - case io.EOF: - atEOF++ - continue - case nil: - default: - closeChannels() - metaMu.Lock() - meta.status = scanStateError - meta.error = err.Error() - metaMu.Unlock() - return - } - // If no current, add it. - if current.name == "" { - topEntries[i] = entry - current = entry - agree++ - continue - } - // If exact match, we agree. - if current.matches(&entry, o.Bucket) { - topEntries[i] = entry - agree++ - continue - } - // If only the name matches we didn't agree, but add it for resolution. - if entry.name == current.name { - topEntries[i] = entry - continue + err := listPathRaw(ctx, listPathRawOptions{ + disks: disks, + bucket: o.Bucket, + path: o.BaseDir, + recursive: o.Recursive, + filterPrefix: o.FilterPrefix, + minDisks: askDisks - 1, + agreed: func(entry metaCacheEntry) { + cacheCh <- entry + filterCh <- entry + }, + partial: func(entries metaCacheEntries, nAgreed int, errs []error) { + // Results Disagree :-( + entry, ok := entries.resolve(&resolver) + if ok { + cacheCh <- *entry + filterCh <- *entry } - // We got different entries - if entry.name > current.name { - continue - } - // We got a new, better current. - // Clear existing entries. - for i := range topEntries[:i] { - topEntries[i] = metaCacheEntry{} - } - agree = 1 - current = entry - topEntries[i] = entry - } - // Break if all at EOF. - if atEOF == len(readers) { - break - } - if agree == len(readers) { - // Everybody agreed - for _, r := range readers { - r.skip(1) - } - cacheCh <- topEntries[0] - filterCh <- topEntries[0] - continue - } + }, + }) - // Results Disagree :-( - entry, ok := topEntries.resolve(&resolver) - if ok { - cacheCh <- *entry - filterCh <- *entry - } - // Skip the inputs we used. - for i, r := range readers { - if topEntries[i].name != "" { - r.skip(1) - } - } + metaMu.Lock() + if err != nil { + meta.status = scanStateError + meta.error = err.Error() } - // Save success - metaMu.Lock() if meta.error == "" { meta.status = scanStateSuccess meta.endedCycle = intDataUpdateTracker.current() } + meta, _ = o.updateMetacacheListing(meta, rpc) metaMu.Unlock() @@ -847,3 +770,184 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr return filteredResults() } + +type listPathRawOptions struct { + disks []StorageAPI + bucket, path string + recursive bool + filterPrefix string + // Minimum number of good disks to continue. + // An error will be returned if this many disks returned an error. + minDisks int + reportNotFound bool + + // Callbacks with results: + // If set to nil, it will not be called. + + // agreed is called if all disks agreed. + agreed func(entry metaCacheEntry) + + // partial will be returned when there is disagreement between disks. + // if disk did not return any result, but also haven't errored + // the entry will be empty and errs will + partial func(entries metaCacheEntries, nAgreed int, errs []error) + + // finished will be called when all streams have finished and + // more than one disk returned an error. + // Will not be called if everything operates as expected. 
+ finished func(errs []error) +} + +// listPathRaw will list a path on the provided drives. +// See listPathRawOptions on how results are delivered. +// Directories are always returned. +// Cache will be bypassed. +// Context cancellation will be respected but may take a while to effectuate. +func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) { + disks := opts.disks + if len(disks) == 0 { + return fmt.Errorf("listPathRaw: 0 drives provided") + } + + // Disconnect from call above, but cancel on exit. + ctx, cancel := context.WithCancel(GlobalContext) + defer cancel() + + askDisks := len(disks) + var readers = make([]*metacacheReader, askDisks) + for i := range disks { + r, w := io.Pipe() + d := disks[i] + readers[i], err = newMetacacheReader(r) + if err != nil { + return err + } + // Send request to each disk. + go func() { + err := d.WalkDir(ctx, WalkDirOptions{ + Bucket: opts.bucket, + BaseDir: opts.path, + Recursive: opts.recursive, + ReportNotFound: opts.reportNotFound, + FilterPrefix: opts.filterPrefix}, w) + w.CloseWithError(err) + if err != io.EOF { + logger.LogIf(ctx, err) + } + }() + } + + topEntries := make(metaCacheEntries, len(readers)) + errs := make([]error, len(readers)) + for { + // Get the top entry from each + var current metaCacheEntry + var atEOF, fnf, hasErr, agree int + for i := range topEntries { + topEntries[i] = metaCacheEntry{} + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + for i, r := range readers { + if errs[i] != nil { + hasErr++ + continue + } + entry, err := r.peek() + switch err { + case io.EOF: + atEOF++ + continue + case nil: + default: + if err.Error() == errFileNotFound.Error() { + atEOF++ + fnf++ + continue + } + + hasErr++ + errs[i] = err + continue + } + // If no current, add it. + if current.name == "" { + topEntries[i] = entry + current = entry + agree++ + continue + } + // If exact match, we agree. + if current.matches(&entry, opts.bucket) { + topEntries[i] = entry + agree++ + continue + } + // If only the name matches we didn't agree, but add it for resolution. + if entry.name == current.name { + topEntries[i] = entry + continue + } + // We got different entries + if entry.name > current.name { + continue + } + // We got a new, better current. + // Clear existing entries. + for i := range topEntries[:i] { + topEntries[i] = metaCacheEntry{} + } + agree = 1 + current = entry + topEntries[i] = entry + } + + // Stop if we exceed number of bad disks + if hasErr > len(disks)-opts.minDisks && hasErr > 0 { + if opts.finished != nil { + opts.finished(errs) + } + var combinedErr []string + for i, err := range errs { + if err != nil { + combinedErr = append(combinedErr, fmt.Sprintf("disk %d returned: %s", i, err)) + } + } + return errors.New(strings.Join(combinedErr, ", ")) + } + + // Break if all at EOF or error. + if atEOF+hasErr == len(readers) { + if hasErr > 0 && opts.finished != nil { + opts.finished(errs) + } + break + } + if fnf == len(readers) { + return errFileNotFound + } + if agree == len(readers) { + // Everybody agreed + for _, r := range readers { + r.skip(1) + } + if opts.agreed != nil { + opts.agreed(current) + } + continue + } + if opts.partial != nil { + opts.partial(topEntries, agree, errs) + } + // Skip the inputs we used. 
+ for i, r := range readers { + if topEntries[i].name != "" { + r.skip(1) + } + } + } + return nil +} diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go index 62c7ab99e..3273c68ca 100644 --- a/cmd/metacache-walk.go +++ b/cmd/metacache-walk.go @@ -43,6 +43,9 @@ type WalkDirOptions struct { // Do a full recursive scan. Recursive bool + // ReportNotFound will return errFileNotFound if all disks reports the BaseDir cannot be found. + ReportNotFound bool + // FilterPrefix will only return results with given prefix within folder. // Should never contain a slash. FilterPrefix string @@ -98,6 +101,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ if err != errVolumeNotFound && err != errFileNotFound { logger.LogIf(ctx, err) } + if opts.ReportNotFound && err == errFileNotFound && current == opts.BaseDir { + return errFileNotFound + } // Forward some errors? return nil }
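
Illustrative sketch (not part of the patch): the crawler now records the IDs of the disks it ran against in `dataUsageCache.Disks` and later resolves them back to live `StorageAPI` handles via the new `erasureServerSets.GetDisksID`; if fewer disks come back than were recorded, it logs "Missing disks" and skips set healing. Below is a simplified, self-contained model of that set-membership lookup — `Disk` and `filterByID` are illustrative stand-ins, not MinIO types.

```go
package main

import "fmt"

// Disk is a stand-in for the real StorageAPI interface; only the ID is
// relevant to the filtering pattern used by GetDisksID.
type Disk struct {
	ID   string
	Path string
}

// filterByID returns the disks whose IDs appear in ids, in the order the
// disks are walked, mirroring how GetDisksID scans every erasure set.
func filterByID(all []Disk, ids ...string) []Disk {
	want := make(map[string]struct{}, len(ids))
	for _, id := range ids {
		want[id] = struct{}{}
	}
	res := make([]Disk, 0, len(want))
	for _, d := range all {
		if _, ok := want[d.ID]; ok {
			res = append(res, d)
		}
	}
	return res
}

func main() {
	all := []Disk{{ID: "a", Path: "/mnt/d1"}, {ID: "b", Path: "/mnt/d2"}, {ID: "c", Path: "/mnt/d3"}}
	got := filterByID(all, "c", "a")
	// The crawler treats len(got) != len(requested IDs) as "missing disks, cannot heal".
	fmt.Println(len(got)) // 2
}
```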
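
The rewritten listing path funnels everything through `listPathRaw`, which merges one sorted stream per disk and reports each name through the `agreed`/`partial` callbacks. The following is a minimal sketch of that merge-and-agree loop over plain string slices; `mergeSorted` is an illustrative stand-in and omits metadata matching, error tracking, and the `finished` callback that the real implementation carries.

```go
package main

import "fmt"

// mergeSorted is a simplified model of listPathRaw's merge loop: each
// "disk" is just a sorted list of names. When every list has the same
// head entry, agreed is called; otherwise partial is called with the
// candidates that match the lexically smallest head.
func mergeSorted(lists [][]string, agreed func(string), partial func([]string)) {
	pos := make([]int, len(lists))
	for {
		// Find the smallest current head and count how many lists agree on it.
		current, agree, atEOF := "", 0, 0
		for i, l := range lists {
			if pos[i] >= len(l) {
				atEOF++
				continue
			}
			head := l[pos[i]]
			switch {
			case current == "" || head < current:
				current, agree = head, 1
			case head == current:
				agree++
			}
		}
		if atEOF == len(lists) {
			return
		}
		// Advance every list positioned at the chosen name.
		candidates := make([]string, 0, agree)
		for i, l := range lists {
			if pos[i] < len(l) && l[pos[i]] == current {
				candidates = append(candidates, l[pos[i]])
				pos[i]++
			}
		}
		if agree == len(lists) {
			agreed(current)
		} else {
			partial(candidates)
		}
	}
}

func main() {
	disks := [][]string{
		{"a.txt", "b.txt", "c.txt"},
		{"a.txt", "c.txt"},
		{"a.txt", "b.txt", "c.txt"},
	}
	mergeSorted(disks,
		func(name string) { fmt.Println("agreed:", name) },
		func(c []string) { fmt.Println("partial:", c) },
	)
	// Output:
	// agreed: a.txt
	// partial: [b.txt b.txt]
	// agreed: c.txt
}
```

In the real code, a `partial` result is handed to `metaCacheEntries.resolve` (listing) or to the heal queue (crawler) instead of being printed.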
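
`listPathRaw` gives up once so many streams have failed that `minDisks` healthy disks can no longer be satisfied (`hasErr > len(disks)-opts.minDisks`), combining the per-disk errors into one message before invoking `finished`. A small stand-alone sketch of that threshold check, assuming a hypothetical `tooManyErrors` helper:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// tooManyErrors reports whether enough disks have failed that fewer than
// minDisks healthy streams can remain, mirroring the check
// `hasErr > len(disks)-opts.minDisks && hasErr > 0` in listPathRaw.
func tooManyErrors(errs []error, minDisks int) (bool, error) {
	var failed []string
	for i, err := range errs {
		if err != nil {
			failed = append(failed, fmt.Sprintf("disk %d returned: %s", i, err))
		}
	}
	if len(failed) > 0 && len(failed) > len(errs)-minDisks {
		return true, errors.New(strings.Join(failed, ", "))
	}
	return false, nil
}

func main() {
	errs := []error{nil, errors.New("disk offline"), nil, errors.New("timeout")}
	// With minDisks = 3 only one failure is tolerated, so two failures abort the listing.
	if bad, err := tooManyErrors(errs, 3); bad {
		fmt.Println("abort listing:", err)
	}
}
```

The crawler's heal path sets `minDisks: len(f.disks)` so it only trusts a fully consistent listing, whereas `listPath` uses `askDisks - 1` and tolerates a single straggler.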