crawler: Missing folder heal check per set (#10876)

master
Klaus Post 4 years ago committed by GitHub
parent 790833f3b2
commit e6ea5c2703
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 128
      cmd/data-crawler.go
  2. 1
      cmd/data-usage-cache.go
  3. 141
      cmd/data-usage-cache_gen.go
  4. 20
      cmd/erasure-server-sets.go
  5. 9
      cmd/erasure.go
  6. 13
      cmd/metacache-entries.go
  7. 230
      cmd/metacache-set.go
  8. 6
      cmd/metacache-walk.go

@ -148,6 +148,7 @@ type folderScanner struct {
newFolders []cachedFolder newFolders []cachedFolder
existingFolders []cachedFolder existingFolders []cachedFolder
disks []StorageAPI
} }
// crawlDataFolder will crawl the basepath+cache.Info.Name and return an updated cache. // crawlDataFolder will crawl the basepath+cache.Info.Name and return an updated cache.
@ -176,7 +177,6 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
delayMult = dataCrawlSleepDefMult delayMult = dataCrawlSleepDefMult
} }
s := folderScanner{ s := folderScanner{
root: basePath, root: basePath,
getSize: getSize, getSize: getSize,
@ -190,6 +190,18 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
healObjectSelect: 0, healObjectSelect: 0,
} }
// Add disks for set healing.
if len(cache.Disks) > 0 {
objAPI, ok := newObjectLayerFn().(*erasureServerSets)
if ok {
s.disks = objAPI.GetDisksID(cache.Disks...)
if len(s.disks) != len(cache.Disks) {
logger.Info(logPrefix+"Missing disks, want %d, found %d. Cannot heal."+logSuffix, len(cache.Disks), len(s.disks))
s.disks = s.disks[:0]
}
}
}
// Enable healing in XL mode. // Enable healing in XL mode.
if globalIsErasure { if globalIsErasure {
// Include a clean folder one in n cycles. // Include a clean folder one in n cycles.
@ -459,8 +471,8 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
continue continue
} }
objAPI := newObjectLayerFn() objAPI, ok := newObjectLayerFn().(*erasureServerSets)
if objAPI == nil { if !ok || len(f.disks) == 0 {
continue continue
} }
@ -480,6 +492,13 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
// We therefore perform a heal check. // We therefore perform a heal check.
// If that doesn't bring it back we remove the folder and assume it was deleted. // If that doesn't bring it back we remove the folder and assume it was deleted.
// This means that the next run will not look for it. // This means that the next run will not look for it.
// How to resolve results.
resolver := metadataResolutionParams{
dirQuorum: getReadQuorum(len(f.disks)),
objQuorum: getReadQuorum(len(f.disks)),
bucket: "",
}
for k := range existing { for k := range existing {
bucket, prefix := path2BucketObject(k) bucket, prefix := path2BucketObject(k)
if f.dataUsageCrawlDebug { if f.dataUsageCrawlDebug {
@ -488,8 +507,102 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
// Dynamic time delay. // Dynamic time delay.
t := UTCNow() t := UTCNow()
resolver.bucket = bucket
err = objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{Recursive: true, Remove: healDeleteDangling}, foundObjs := false
dangling := true
ctx, cancel := context.WithCancel(ctx)
err := listPathRaw(ctx, listPathRawOptions{
disks: f.disks,
bucket: bucket,
path: prefix,
recursive: true,
reportNotFound: true,
minDisks: len(f.disks), // We want full consistency.
// Weird, maybe transient error.
agreed: func(entry metaCacheEntry) {
if f.dataUsageCrawlDebug {
logger.Info(color.Green("healObjects:")+" got agreement: %v", entry.name)
}
if entry.isObject() {
dangling = false
}
},
// Some disks have data for this.
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
if f.dataUsageCrawlDebug {
logger.Info(color.Green("healObjects:")+" got partial, %d agreed, errs: %v", nAgreed, errs)
}
// Sleep and reset.
sleepDuration(time.Since(t), f.dataUsageCrawlMult)
t = UTCNow()
entry, ok := entries.resolve(&resolver)
if !ok {
for _, err := range errs {
if err != nil {
// Not all disks are ready, do nothing for now.
dangling = false
return
}
}
// If no errors, queue it for healing.
entry, _ = entries.firstFound()
}
if f.dataUsageCrawlDebug {
logger.Info(color.Green("healObjects:")+" resolved to: %v, dir: %v", entry.name, entry.isDir())
}
if entry.isDir() {
return
}
dangling = false
// We got an entry which we should be able to heal.
fiv, err := entry.fileInfoVersions(bucket)
if err != nil {
err := bgSeq.queueHealTask(healSource{
bucket: bucket,
object: entry.name,
versionID: "",
}, madmin.HealItemObject)
logger.LogIf(ctx, err)
foundObjs = foundObjs || err == nil
return
}
for _, ver := range fiv.Versions {
// Sleep and reset.
sleepDuration(time.Since(t), f.dataUsageCrawlMult)
t = UTCNow()
err := bgSeq.queueHealTask(healSource{
bucket: bucket,
object: fiv.Name,
versionID: ver.VersionID,
}, madmin.HealItemObject)
logger.LogIf(ctx, err)
foundObjs = foundObjs || err == nil
}
},
// Too many disks failed.
finished: func(errs []error) {
if f.dataUsageCrawlDebug {
logger.Info(color.Green("healObjects:")+" too many errors: %v", errs)
}
dangling = false
cancel()
},
})
if f.dataUsageCrawlDebug && err != nil && err != errFileNotFound {
logger.Info(color.Green("healObjects:")+" checking returned value %v (%T)", err, err)
}
// If we found one or more disks with this folder, delete it.
if err == nil && dangling {
if f.dataUsageCrawlDebug {
logger.Info(color.Green("healObjects:")+" deleting dangling directory %s", prefix)
}
// If we have quorum, found directories, but no objects, issue heal to delete the dangling.
objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{Recursive: true, Remove: true},
func(bucket, object, versionID string) error { func(bucket, object, versionID string) error {
// Wait for each heal as per crawler frequency. // Wait for each heal as per crawler frequency.
sleepDuration(time.Since(t), f.dataUsageCrawlMult) sleepDuration(time.Since(t), f.dataUsageCrawlMult)
@ -503,15 +616,12 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
versionID: versionID, versionID: versionID,
}, madmin.HealItemObject) }, madmin.HealItemObject)
}) })
}
sleepDuration(time.Since(t), f.dataUsageCrawlMult) sleepDuration(time.Since(t), f.dataUsageCrawlMult)
if f.dataUsageCrawlDebug && err != nil {
logger.Info(color.Green("healObjects:")+" checking returned value %v", err)
}
// Add unless healing returned an error. // Add unless healing returned an error.
if err == nil { if foundObjs {
this := cachedFolder{name: k, parent: &thisHash, objectHealProbDiv: folder.objectHealProbDiv} this := cachedFolder{name: k, parent: &thisHash, objectHealProbDiv: folder.objectHealProbDiv}
cache.addChild(hashPath(k)) cache.addChild(hashPath(k))
if final { if final {

@ -56,6 +56,7 @@ type dataUsageEntry struct {
// dataUsageCache contains a cache of data usage entries. // dataUsageCache contains a cache of data usage entries.
type dataUsageCache struct { type dataUsageCache struct {
Info dataUsageCacheInfo Info dataUsageCacheInfo
Disks []string
Cache map[string]dataUsageEntry Cache map[string]dataUsageEntry
} }

@ -30,35 +30,54 @@ func (z *dataUsageCache) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "Info") err = msgp.WrapError(err, "Info")
return return
} }
case "Cache": case "Disks":
var zb0002 uint32 var zb0002 uint32
zb0002, err = dc.ReadMapHeader() zb0002, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "Disks")
return
}
if cap(z.Disks) >= int(zb0002) {
z.Disks = (z.Disks)[:zb0002]
} else {
z.Disks = make([]string, zb0002)
}
for za0001 := range z.Disks {
z.Disks[za0001], err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Disks", za0001)
return
}
}
case "Cache":
var zb0003 uint32
zb0003, err = dc.ReadMapHeader()
if err != nil { if err != nil {
err = msgp.WrapError(err, "Cache") err = msgp.WrapError(err, "Cache")
return return
} }
if z.Cache == nil { if z.Cache == nil {
z.Cache = make(map[string]dataUsageEntry, zb0002) z.Cache = make(map[string]dataUsageEntry, zb0003)
} else if len(z.Cache) > 0 { } else if len(z.Cache) > 0 {
for key := range z.Cache { for key := range z.Cache {
delete(z.Cache, key) delete(z.Cache, key)
} }
} }
for zb0002 > 0 { for zb0003 > 0 {
zb0002-- zb0003--
var za0001 string var za0002 string
var za0002 dataUsageEntry var za0003 dataUsageEntry
za0001, err = dc.ReadString() za0002, err = dc.ReadString()
if err != nil { if err != nil {
err = msgp.WrapError(err, "Cache") err = msgp.WrapError(err, "Cache")
return return
} }
err = za0002.DecodeMsg(dc) err = za0003.DecodeMsg(dc)
if err != nil { if err != nil {
err = msgp.WrapError(err, "Cache", za0001) err = msgp.WrapError(err, "Cache", za0002)
return return
} }
z.Cache[za0001] = za0002 z.Cache[za0002] = za0003
} }
default: default:
err = dc.Skip() err = dc.Skip()
@ -73,9 +92,9 @@ func (z *dataUsageCache) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable // EncodeMsg implements msgp.Encodable
func (z *dataUsageCache) EncodeMsg(en *msgp.Writer) (err error) { func (z *dataUsageCache) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2 // map header, size 3
// write "Info" // write "Info"
err = en.Append(0x82, 0xa4, 0x49, 0x6e, 0x66, 0x6f) err = en.Append(0x83, 0xa4, 0x49, 0x6e, 0x66, 0x6f)
if err != nil { if err != nil {
return return
} }
@ -84,6 +103,23 @@ func (z *dataUsageCache) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "Info") err = msgp.WrapError(err, "Info")
return return
} }
// write "Disks"
err = en.Append(0xa5, 0x44, 0x69, 0x73, 0x6b, 0x73)
if err != nil {
return
}
err = en.WriteArrayHeader(uint32(len(z.Disks)))
if err != nil {
err = msgp.WrapError(err, "Disks")
return
}
for za0001 := range z.Disks {
err = en.WriteString(z.Disks[za0001])
if err != nil {
err = msgp.WrapError(err, "Disks", za0001)
return
}
}
// write "Cache" // write "Cache"
err = en.Append(0xa5, 0x43, 0x61, 0x63, 0x68, 0x65) err = en.Append(0xa5, 0x43, 0x61, 0x63, 0x68, 0x65)
if err != nil { if err != nil {
@ -94,15 +130,15 @@ func (z *dataUsageCache) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "Cache") err = msgp.WrapError(err, "Cache")
return return
} }
for za0001, za0002 := range z.Cache { for za0002, za0003 := range z.Cache {
err = en.WriteString(za0001) err = en.WriteString(za0002)
if err != nil { if err != nil {
err = msgp.WrapError(err, "Cache") err = msgp.WrapError(err, "Cache")
return return
} }
err = za0002.EncodeMsg(en) err = za0003.EncodeMsg(en)
if err != nil { if err != nil {
err = msgp.WrapError(err, "Cache", za0001) err = msgp.WrapError(err, "Cache", za0002)
return return
} }
} }
@ -112,22 +148,28 @@ func (z *dataUsageCache) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler // MarshalMsg implements msgp.Marshaler
func (z *dataUsageCache) MarshalMsg(b []byte) (o []byte, err error) { func (z *dataUsageCache) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize()) o = msgp.Require(b, z.Msgsize())
// map header, size 2 // map header, size 3
// string "Info" // string "Info"
o = append(o, 0x82, 0xa4, 0x49, 0x6e, 0x66, 0x6f) o = append(o, 0x83, 0xa4, 0x49, 0x6e, 0x66, 0x6f)
o, err = z.Info.MarshalMsg(o) o, err = z.Info.MarshalMsg(o)
if err != nil { if err != nil {
err = msgp.WrapError(err, "Info") err = msgp.WrapError(err, "Info")
return return
} }
// string "Disks"
o = append(o, 0xa5, 0x44, 0x69, 0x73, 0x6b, 0x73)
o = msgp.AppendArrayHeader(o, uint32(len(z.Disks)))
for za0001 := range z.Disks {
o = msgp.AppendString(o, z.Disks[za0001])
}
// string "Cache" // string "Cache"
o = append(o, 0xa5, 0x43, 0x61, 0x63, 0x68, 0x65) o = append(o, 0xa5, 0x43, 0x61, 0x63, 0x68, 0x65)
o = msgp.AppendMapHeader(o, uint32(len(z.Cache))) o = msgp.AppendMapHeader(o, uint32(len(z.Cache)))
for za0001, za0002 := range z.Cache { for za0002, za0003 := range z.Cache {
o = msgp.AppendString(o, za0001) o = msgp.AppendString(o, za0002)
o, err = za0002.MarshalMsg(o) o, err = za0003.MarshalMsg(o)
if err != nil { if err != nil {
err = msgp.WrapError(err, "Cache", za0001) err = msgp.WrapError(err, "Cache", za0002)
return return
} }
} }
@ -158,35 +200,54 @@ func (z *dataUsageCache) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "Info") err = msgp.WrapError(err, "Info")
return return
} }
case "Cache": case "Disks":
var zb0002 uint32 var zb0002 uint32
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Disks")
return
}
if cap(z.Disks) >= int(zb0002) {
z.Disks = (z.Disks)[:zb0002]
} else {
z.Disks = make([]string, zb0002)
}
for za0001 := range z.Disks {
z.Disks[za0001], bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Disks", za0001)
return
}
}
case "Cache":
var zb0003 uint32
zb0003, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil { if err != nil {
err = msgp.WrapError(err, "Cache") err = msgp.WrapError(err, "Cache")
return return
} }
if z.Cache == nil { if z.Cache == nil {
z.Cache = make(map[string]dataUsageEntry, zb0002) z.Cache = make(map[string]dataUsageEntry, zb0003)
} else if len(z.Cache) > 0 { } else if len(z.Cache) > 0 {
for key := range z.Cache { for key := range z.Cache {
delete(z.Cache, key) delete(z.Cache, key)
} }
} }
for zb0002 > 0 { for zb0003 > 0 {
var za0001 string var za0002 string
var za0002 dataUsageEntry var za0003 dataUsageEntry
zb0002-- zb0003--
za0001, bts, err = msgp.ReadStringBytes(bts) za0002, bts, err = msgp.ReadStringBytes(bts)
if err != nil { if err != nil {
err = msgp.WrapError(err, "Cache") err = msgp.WrapError(err, "Cache")
return return
} }
bts, err = za0002.UnmarshalMsg(bts) bts, err = za0003.UnmarshalMsg(bts)
if err != nil { if err != nil {
err = msgp.WrapError(err, "Cache", za0001) err = msgp.WrapError(err, "Cache", za0002)
return return
} }
z.Cache[za0001] = za0002 z.Cache[za0002] = za0003
} }
default: default:
bts, err = msgp.Skip(bts) bts, err = msgp.Skip(bts)
@ -202,11 +263,15 @@ func (z *dataUsageCache) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *dataUsageCache) Msgsize() (s int) { func (z *dataUsageCache) Msgsize() (s int) {
s = 1 + 5 + z.Info.Msgsize() + 6 + msgp.MapHeaderSize s = 1 + 5 + z.Info.Msgsize() + 6 + msgp.ArrayHeaderSize
for za0001 := range z.Disks {
s += msgp.StringPrefixSize + len(z.Disks[za0001])
}
s += 6 + msgp.MapHeaderSize
if z.Cache != nil { if z.Cache != nil {
for za0001, za0002 := range z.Cache { for za0002, za0003 := range z.Cache {
_ = za0002 _ = za0003
s += msgp.StringPrefixSize + len(za0001) + za0002.Msgsize() s += msgp.StringPrefixSize + len(za0002) + za0003.Msgsize()
} }
} }
return return

@ -92,6 +92,26 @@ func (z *erasureServerSets) NewNSLock(bucket string, objects ...string) RWLocker
return z.serverSets[0].NewNSLock(bucket, objects...) return z.serverSets[0].NewNSLock(bucket, objects...)
} }
// GetDisksID will return disks by their ID.
func (z *erasureServerSets) GetDisksID(ids ...string) []StorageAPI {
idMap := make(map[string]struct{})
for _, id := range ids {
idMap[id] = struct{}{}
}
res := make([]StorageAPI, 0, len(idMap))
for _, ss := range z.serverSets {
for _, disks := range ss.erasureDisks {
for _, disk := range disks {
id, _ := disk.GetDiskID()
if _, ok := idMap[id]; ok {
res = append(res, disk)
}
}
}
}
return res
}
func (z *erasureServerSets) GetAllLockers() []dsync.NetLocker { func (z *erasureServerSets) GetAllLockers() []dsync.NetLocker {
return z.serverSets[0].GetAllLockers() return z.serverSets[0].GetAllLockers()
} }

@ -252,6 +252,14 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
return nil return nil
} }
// Collect disks for healing.
allDisks := er.getDisks()
allDiskIDs := make([]string, len(allDisks))
for i, disk := range allDisks {
id, _ := disk.GetDiskID()
allDiskIDs[i] = id
}
// Load bucket totals // Load bucket totals
oldCache := dataUsageCache{} oldCache := dataUsageCache{}
if err := oldCache.load(ctx, er, dataUsageCacheName); err != nil { if err := oldCache.load(ctx, er, dataUsageCacheName); err != nil {
@ -351,6 +359,7 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
cache.Info.Name = bucket.Name cache.Info.Name = bucket.Name
} }
cache.Info.BloomFilter = bloom cache.Info.BloomFilter = bloom
cache.Disks = allDiskIDs
if cache.Info.Name != bucket.Name { if cache.Info.Name != bucket.Name {
logger.LogIf(ctx, fmt.Errorf("cache name mismatch: %s != %s", cache.Info.Name, bucket.Name)) logger.LogIf(ctx, fmt.Errorf("cache name mismatch: %s != %s", cache.Info.Name, bucket.Name))
cache.Info = dataUsageCacheInfo{ cache.Info = dataUsageCacheInfo{

@ -246,6 +246,19 @@ func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCa
return selected, selected != nil return selected, selected != nil
} }
// firstFound returns the first found and the number of set entries.
func (m metaCacheEntries) firstFound() (first *metaCacheEntry, n int) {
for _, entry := range m {
if entry.name != "" {
n++
if first == nil {
first = &entry
}
}
}
return first, n
}
// names will return all names in order. // names will return all names in order.
// Since this allocates it should not be used in critical functions. // Since this allocates it should not be used in critical functions.
func (m metaCacheEntries) names() []string { func (m metaCacheEntries) names() []string {

@ -627,29 +627,6 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
disks = disks[:askDisks] disks = disks[:askDisks]
} }
var readers = make([]*metacacheReader, askDisks)
for i := range disks {
r, w := io.Pipe()
d := disks[i]
readers[i], err = newMetacacheReader(r)
if err != nil {
cancel()
return entries, err
}
// Send request to each disk.
go func() {
err := d.WalkDir(ctx, WalkDirOptions{
Bucket: o.Bucket,
BaseDir: o.BaseDir,
Recursive: o.Recursive || o.Separator != SlashSeparator,
FilterPrefix: o.FilterPrefix}, w)
w.CloseWithError(err)
if err != io.EOF {
logger.LogIf(ctx, err)
}
}()
}
// Create output for our results. // Create output for our results.
cacheCh := make(chan metaCacheEntry, metacacheBlockSize) cacheCh := make(chan metaCacheEntry, metacacheBlockSize)
@ -746,13 +723,139 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
bucket: o.Bucket, bucket: o.Bucket,
} }
err := listPathRaw(ctx, listPathRawOptions{
disks: disks,
bucket: o.Bucket,
path: o.BaseDir,
recursive: o.Recursive,
filterPrefix: o.FilterPrefix,
minDisks: askDisks - 1,
agreed: func(entry metaCacheEntry) {
cacheCh <- entry
filterCh <- entry
},
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
// Results Disagree :-(
entry, ok := entries.resolve(&resolver)
if ok {
cacheCh <- *entry
filterCh <- *entry
}
},
})
metaMu.Lock()
if err != nil {
meta.status = scanStateError
meta.error = err.Error()
}
// Save success
if meta.error == "" {
meta.status = scanStateSuccess
meta.endedCycle = intDataUpdateTracker.current()
}
meta, _ = o.updateMetacacheListing(meta, rpc)
metaMu.Unlock()
closeChannels()
if err := bw.Close(); err != nil {
metaMu.Lock()
meta.error = err.Error()
meta.status = scanStateError
meta, err = o.updateMetacacheListing(meta, rpc)
metaMu.Unlock()
}
}()
return filteredResults()
}
type listPathRawOptions struct {
disks []StorageAPI
bucket, path string
recursive bool
filterPrefix string
// Minimum number of good disks to continue.
// An error will be returned if this many disks returned an error.
minDisks int
reportNotFound bool
// Callbacks with results:
// If set to nil, it will not be called.
// agreed is called if all disks agreed.
agreed func(entry metaCacheEntry)
// partial will be returned when there is disagreement between disks.
// if disk did not return any result, but also haven't errored
// the entry will be empty and errs will
partial func(entries metaCacheEntries, nAgreed int, errs []error)
// finished will be called when all streams have finished and
// more than one disk returned an error.
// Will not be called if everything operates as expected.
finished func(errs []error)
}
// listPathRaw will list a path on the provided drives.
// See listPathRawOptions on how results are delivered.
// Directories are always returned.
// Cache will be bypassed.
// Context cancellation will be respected but may take a while to effectuate.
func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
disks := opts.disks
if len(disks) == 0 {
return fmt.Errorf("listPathRaw: 0 drives provided")
}
// Disconnect from call above, but cancel on exit.
ctx, cancel := context.WithCancel(GlobalContext)
defer cancel()
askDisks := len(disks)
var readers = make([]*metacacheReader, askDisks)
for i := range disks {
r, w := io.Pipe()
d := disks[i]
readers[i], err = newMetacacheReader(r)
if err != nil {
return err
}
// Send request to each disk.
go func() {
err := d.WalkDir(ctx, WalkDirOptions{
Bucket: opts.bucket,
BaseDir: opts.path,
Recursive: opts.recursive,
ReportNotFound: opts.reportNotFound,
FilterPrefix: opts.filterPrefix}, w)
w.CloseWithError(err)
if err != io.EOF {
logger.LogIf(ctx, err)
}
}()
}
topEntries := make(metaCacheEntries, len(readers)) topEntries := make(metaCacheEntries, len(readers))
errs := make([]error, len(readers))
for { for {
// Get the top entry from each // Get the top entry from each
var current metaCacheEntry var current metaCacheEntry
var atEOF, agree int var atEOF, fnf, hasErr, agree int
for i := range topEntries {
topEntries[i] = metaCacheEntry{}
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
for i, r := range readers { for i, r := range readers {
topEntries[i].name = "" if errs[i] != nil {
hasErr++
continue
}
entry, err := r.peek() entry, err := r.peek()
switch err { switch err {
case io.EOF: case io.EOF:
@ -760,12 +863,15 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
continue continue
case nil: case nil:
default: default:
closeChannels() if err.Error() == errFileNotFound.Error() {
metaMu.Lock() atEOF++
meta.status = scanStateError fnf++
meta.error = err.Error() continue
metaMu.Unlock() }
return
hasErr++
errs[i] = err
continue
} }
// If no current, add it. // If no current, add it.
if current.name == "" { if current.name == "" {
@ -775,7 +881,7 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
continue continue
} }
// If exact match, we agree. // If exact match, we agree.
if current.matches(&entry, o.Bucket) { if current.matches(&entry, opts.bucket) {
topEntries[i] = entry topEntries[i] = entry
agree++ agree++
continue continue
@ -798,25 +904,43 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
current = entry current = entry
topEntries[i] = entry topEntries[i] = entry
} }
// Break if all at EOF.
if atEOF == len(readers) { // Stop if we exceed number of bad disks
if hasErr > len(disks)-opts.minDisks && hasErr > 0 {
if opts.finished != nil {
opts.finished(errs)
}
var combinedErr []string
for i, err := range errs {
if err != nil {
combinedErr = append(combinedErr, fmt.Sprintf("disk %d returned: %s", i, err))
}
}
return errors.New(strings.Join(combinedErr, ", "))
}
// Break if all at EOF or error.
if atEOF+hasErr == len(readers) {
if hasErr > 0 && opts.finished != nil {
opts.finished(errs)
}
break break
} }
if fnf == len(readers) {
return errFileNotFound
}
if agree == len(readers) { if agree == len(readers) {
// Everybody agreed // Everybody agreed
for _, r := range readers { for _, r := range readers {
r.skip(1) r.skip(1)
} }
cacheCh <- topEntries[0] if opts.agreed != nil {
filterCh <- topEntries[0] opts.agreed(current)
}
continue continue
} }
if opts.partial != nil {
// Results Disagree :-( opts.partial(topEntries, agree, errs)
entry, ok := topEntries.resolve(&resolver)
if ok {
cacheCh <- *entry
filterCh <- *entry
} }
// Skip the inputs we used. // Skip the inputs we used.
for i, r := range readers { for i, r := range readers {
@ -825,25 +949,5 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
} }
} }
} }
return nil
// Save success
metaMu.Lock()
if meta.error == "" {
meta.status = scanStateSuccess
meta.endedCycle = intDataUpdateTracker.current()
}
meta, _ = o.updateMetacacheListing(meta, rpc)
metaMu.Unlock()
closeChannels()
if err := bw.Close(); err != nil {
metaMu.Lock()
meta.error = err.Error()
meta.status = scanStateError
meta, err = o.updateMetacacheListing(meta, rpc)
metaMu.Unlock()
}
}()
return filteredResults()
} }

@ -43,6 +43,9 @@ type WalkDirOptions struct {
// Do a full recursive scan. // Do a full recursive scan.
Recursive bool Recursive bool
// ReportNotFound will return errFileNotFound if all disks reports the BaseDir cannot be found.
ReportNotFound bool
// FilterPrefix will only return results with given prefix within folder. // FilterPrefix will only return results with given prefix within folder.
// Should never contain a slash. // Should never contain a slash.
FilterPrefix string FilterPrefix string
@ -98,6 +101,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
if err != errVolumeNotFound && err != errFileNotFound { if err != errVolumeNotFound && err != errFileNotFound {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
} }
if opts.ReportNotFound && err == errFileNotFound && current == opts.BaseDir {
return errFileNotFound
}
// Forward some errors? // Forward some errors?
return nil return nil
} }

Loading…
Cancel
Save