diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go
index b32a322a1..f868a6902 100644
--- a/cmd/admin-heal-ops.go
+++ b/cmd/admin-heal-ops.go
@@ -662,6 +662,14 @@ func (h *healSequence) healSequenceStart(objAPI ObjectLayer) {
 	}
 }
 
+// logHeal counts one scanned item of healType and records the current time as the last heal activity.
+func (h *healSequence) logHeal(healType madmin.HealItemType) {
+	h.mutex.Lock()
+	h.scannedItemsMap[healType]++
+	h.lastHealActivity = UTCNow()
+	h.mutex.Unlock()
+}
+
 func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error {
 	globalHealConfigMu.Lock()
 	opts := globalHealConfig
diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go
index d117713ed..d98265da9 100644
--- a/cmd/background-newdisks-heal-ops.go
+++ b/cmd/background-newdisks-heal-ops.go
@@ -204,8 +204,8 @@ wait:
 				}
 			}
 
-			lbDisks := z.serverPools[i].sets[setIndex].getOnlineDisks()
-			if err := healErasureSet(ctx, setIndex, buckets, lbDisks); err != nil {
+			err := z.serverPools[i].sets[setIndex].healErasureSet(ctx, buckets)
+			if err != nil {
 				logger.LogIf(ctx, err)
 				continue
 			}
diff --git a/cmd/global-heal.go b/cmd/global-heal.go
index bb62404b0..e27bc87d4 100644
--- a/cmd/global-heal.go
+++ b/cmd/global-heal.go
@@ -18,7 +18,7 @@ package cmd
 
 import (
 	"context"
-	"sync"
+	"errors"
 	"time"
 
 	"github.com/minio/minio/cmd/logger"
@@ -120,18 +120,14 @@ func mustGetHealSequence(ctx context.Context) *healSequence {
 }
 
 // healErasureSet lists and heals all objects in a specific erasure set
-func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, disks []StorageAPI) error {
+func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketInfo) error {
 	bgSeq := mustGetHealSequence(ctx)
-
 	buckets = append(buckets, BucketInfo{
 		Name: pathJoin(minioMetaBucket, minioConfigPrefix),
 	})
 
 	// Try to pro-actively heal backend-encrypted file.
-	if err := bgSeq.queueHealTask(healSource{
-		bucket: minioMetaBucket,
-		object: backendEncryptedFile,
-	}, madmin.HealItemMetadata); err != nil {
+	if _, err := er.HealObject(ctx, minioMetaBucket, backendEncryptedFile, "", madmin.HealOpts{}); err != nil {
 		if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
 			logger.LogIf(ctx, err)
 		}
@@ -140,61 +136,65 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis
 	// Heal all buckets with all objects
 	for _, bucket := range buckets {
 		// Heal current bucket
-		if err := bgSeq.queueHealTask(healSource{
-			bucket: bucket.Name,
-		}, madmin.HealItemBucket); err != nil {
+		if _, err := er.HealBucket(ctx, bucket.Name, madmin.HealOpts{}); err != nil {
 			if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
 				logger.LogIf(ctx, err)
 			}
 		}
 
 		if serverDebugLog {
-			console.Debugf(color.Green("healDisk:")+" healing bucket %s content on erasure set %d\n", bucket.Name, setIndex+1)
+			console.Debugf(color.Green("healDisk:")+" healing bucket %s content on erasure set %d\n", bucket.Name, er.setNumber+1)
 		}
 
-		var entryChs []FileInfoVersionsCh
-		var mu sync.Mutex
-		var wg sync.WaitGroup
-		for _, disk := range disks {
-			disk := disk
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-				entryCh, err := disk.WalkVersions(ctx, bucket.Name, "", "", true, ctx.Done())
-				if err != nil {
-					// Disk walk returned error, ignore it.
-					return
-				}
-				mu.Lock()
-				entryChs = append(entryChs, FileInfoVersionsCh{
-					Ch: entryCh,
-				})
-				mu.Unlock()
-			}()
+		disks, _ := er.getOnlineDisksWithHealing()
+		if len(disks) == 0 {
+			return errors.New("healErasureSet: No non-healing disks found")
 		}
-		wg.Wait()
-
-		entriesValid := make([]bool, len(entryChs))
-		entries := make([]FileInfoVersions, len(entryChs))
-
-		for {
-			entry, _, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid)
-			if !ok {
-				break
+		// Limit listing to 3 drives.
+		if len(disks) > 3 {
+			disks = disks[:3]
+		}
+		healEntry := func(entry metaCacheEntry) {
+			if entry.isDir() {
+				return
 			}
-
-			for _, version := range entry.Versions {
-				if err := bgSeq.queueHealTask(healSource{
-					bucket: bucket.Name,
-					object: version.Name,
-					versionID: version.VersionID,
-				}, madmin.HealItemObject); err != nil {
+			fivs, err := entry.fileInfoVersions(bucket.Name)
+			if err != nil {
+				logger.LogIf(ctx, err)
+				return
+			}
+			// Snapshot the heal config under its mutex, as queueHealTask
+			// does, instead of reading the shared global unsynchronized.
+			globalHealConfigMu.Lock()
+			healCfg := globalHealConfig
+			globalHealConfigMu.Unlock()
+			waitForLowHTTPReq(healCfg.IOCount, healCfg.Sleep)
+			for _, version := range fivs.Versions {
+				if _, err := er.HealObject(ctx, bucket.Name, version.Name, version.VersionID, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: true}); err != nil {
 					if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
 						logger.LogIf(ctx, err)
 					}
 				}
+				bgSeq.logHeal(madmin.HealItemObject)
 			}
 		}
+		err := listPathRaw(ctx, listPathRawOptions{
+			disks:          disks,
+			bucket:         bucket.Name,
+			recursive:      true,
+			forwardTo:      "", //TODO(klauspost): Set this to last known offset when resuming.
+			minDisks:       1,
+			reportNotFound: false,
+			agreed:         healEntry,
+			partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
+				entry, _ := entries.firstFound()
+				if entry != nil && !entry.isDir() {
+					healEntry(*entry)
+				}
+			},
+			finished: nil,
+		})
+		logger.LogIf(ctx, err)
 	}
 
 	return nil
diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go
index 5e5e7ec7f..82bb3220c 100644
--- a/cmd/metacache-set.go
+++ b/cmd/metacache-set.go
@@ -782,7 +782,13 @@ type listPathRawOptions struct {
 	disks        []StorageAPI
 	bucket, path string
 	recursive    bool
+
+	// Only return results with this prefix.
 	filterPrefix string
+
+	// Forward to this prefix before returning results.
+	forwardTo string
+
 	// Minimum number of good disks to continue.
 	// An error will be returned if this many disks returned an error.
 	minDisks int
@@ -836,7 +842,9 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
 					BaseDir:        opts.path,
 					Recursive:      opts.recursive,
 					ReportNotFound: opts.reportNotFound,
-					FilterPrefix:   opts.filterPrefix}, w)
+					FilterPrefix:   opts.filterPrefix,
+					ForwardTo:      opts.forwardTo,
+				}, w)
 				w.CloseWithError(werr)
 				if werr != io.EOF && werr != nil && werr.Error() != errFileNotFound.Error() && werr.Error() != errVolumeNotFound.Error() {
 					logger.LogIf(ctx, werr)
diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go
index 4c5ac33ae..990b63581 100644
--- a/cmd/metacache-walk.go
+++ b/cmd/metacache-walk.go
@@ -49,6 +49,9 @@ type WalkDirOptions struct {
 	// FilterPrefix will only return results with given prefix within folder.
 	// Should never contain a slash.
 	FilterPrefix string
+
+	// ForwardTo will forward to the given object path.
+	ForwardTo string
 }
 
 // WalkDir will traverse a directory and return all entries found.
@@ -107,6 +110,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 	}
 
 	prefix := opts.FilterPrefix
+	forward := opts.ForwardTo
 	var scanDir func(path string) error
 	scanDir = func(current string) error {
 		entries, err := s.ListDir(ctx, opts.Bucket, current, -1)
@@ -126,6 +130,12 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 			if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) {
 				continue
 			}
+			// Skip entries sorting before the forward target, but never a
+			// directory leading to it, or the target could not be reached.
+			if len(forward) > 0 && entry < forward &&
+				!(strings.HasSuffix(entry, slashSeparator) && strings.HasPrefix(forward, entry)) {
+				continue
+			}
 			if strings.HasSuffix(entry, slashSeparator) {
 				if strings.HasSuffix(entry, globalDirSuffixWithSlash) {
 					// Add without extension so it is sorted correctly.
@@ -177,6 +187,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 		sort.Strings(entries)
 		dirStack := make([]string, 0, 5)
 		prefix = "" // Remove prefix after first level.
+
 		for _, entry := range entries {
 			if entry == "" {
 				continue
@@ -189,8 +200,11 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 					out <- metaCacheEntry{name: pop}
 					if opts.Recursive {
 						// Scan folder we found. Should be in correct sort order where we are.
-						err := scanDir(pop)
-						logger.LogIf(ctx, err)
+						forward = ""
+						if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, pop) {
+							forward = strings.TrimPrefix(opts.ForwardTo, pop)
+						}
+						logger.LogIf(ctx, scanDir(pop))
 					}
 					dirStack = dirStack[:len(dirStack)-1]
 				}
@@ -239,8 +253,11 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 				out <- metaCacheEntry{name: pop}
 				if opts.Recursive {
 					// Scan folder we found. Should be in correct sort order where we are.
-					err := scanDir(pop)
-					logger.LogIf(ctx, err)
+					forward = ""
+					if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, pop) {
+						forward = strings.TrimPrefix(opts.ForwardTo, pop)
+					}
+					logger.LogIf(ctx, scanDir(pop))
 				}
 				dirStack = dirStack[:len(dirStack)-1]
 			}
@@ -267,6 +284,7 @@ func (client *storageRESTClient) WalkDir(ctx context.Context, opts WalkDirOption
 	values.Set(storageRESTRecursive, strconv.FormatBool(opts.Recursive))
 	values.Set(storageRESTReportNotFound, strconv.FormatBool(opts.ReportNotFound))
 	values.Set(storageRESTPrefixFilter, opts.FilterPrefix)
+	values.Set(storageRESTForwardFilter, opts.ForwardTo)
 	respBody, err := client.call(ctx, storageRESTMethodWalkDir, values, nil, -1)
 	if err != nil {
 		logger.LogIf(ctx, err)
@@ -299,6 +317,7 @@ func (s *storageRESTServer) WalkDirHandler(w http.ResponseWriter, r *http.Reques
 	}
 
 	prefix := r.URL.Query().Get(storageRESTPrefixFilter)
+	forward := r.URL.Query().Get(storageRESTForwardFilter)
 	writer := streamHTTPResponse(w)
 	writer.CloseWithError(s.storage.WalkDir(r.Context(), WalkDirOptions{
 		Bucket:         volume,
@@ -306,5 +325,6 @@ func (s *storageRESTServer) WalkDirHandler(w http.ResponseWriter, r *http.Reques
 		Recursive:      recursive,
 		ReportNotFound: reportNotFound,
 		FilterPrefix:   prefix,
+		ForwardTo:      forward,
 	}, writer))
 }
diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go
index 144dc47c1..0ad9c7007 100644
--- a/cmd/storage-rest-common.go
+++ b/cmd/storage-rest-common.go
@@ -72,6 +72,7 @@ const (
 	storageRESTCount          = "count"
 	storageRESTMarkerPath     = "marker"
 	storageRESTPrefixFilter   = "prefix"
+	storageRESTForwardFilter  = "forward"
 	storageRESTRecursive      = "recursive"
 	storageRESTReportNotFound = "report-notfound"
 	storageRESTBitrotAlgo     = "bitrot-algo"