From 445a9bd8279c8ccf71452828fbead80aba10753d Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Mon, 28 Dec 2020 10:31:00 -0800
Subject: [PATCH] fix: heal optimizations in crawler to avoid multiple healing
 attempts (#11173)

Fixes two problems:

- Double healing when bitrot is enabled; instead, heal once in
  applyActions() before lifecycle is applied.

- If applyActions() succeeds and getSize() returns a proper value,
  the object is accounted for and should be removed from the oldCache
  namespace map to avoid double heal attempts.
---
 cmd/data-crawler.go        | 111 ++++++++++++++++++++++++-------------
 cmd/metacache-walk.go      |  20 +++++--
 cmd/storage-rest-common.go |  45 +++++++--------
 cmd/xl-storage.go          |  25 +--------
 4 files changed, 113 insertions(+), 88 deletions(-)

diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go
index 31fb804ec..9861c64a0 100644
--- a/cmd/data-crawler.go
+++ b/cmd/data-crawler.go
@@ -45,7 +45,7 @@ import (
 
 const (
 	dataCrawlSleepPerFolder  = time.Millisecond // Time to wait between folders.
-	dataCrawlStartDelay      = 5 * time.Minute  // Time to wait on startup and between cycles.
+	dataCrawlStartDelay      = 1 * time.Minute  // Time to wait on startup and between cycles.
 	dataUsageUpdateDirCycles = 16               // Visit all folders every n cycles.
 
 	healDeleteDangling = true
@@ -426,8 +426,11 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 			cache.addChildString(entName)
 
 			this := cachedFolder{name: entName, parent: &thisHash, objectHealProbDiv: folder.objectHealProbDiv}
-			delete(existing, h.Key())
+
+			delete(existing, h.Key()) // h.Key() already accounted for.
+
 			cache.addChild(h)
+
 			if final {
 				if exists {
 					f.existingFolders = append(f.existingFolders, this)
@@ -454,17 +457,30 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 				lifeCycle: activeLifeCycle,
 				heal:      thisHash.mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure,
 			}
-			sizeSummary, err := f.getSize(item)
 
-			wait()
+			sizeSummary, err := f.getSize(item)
 			if err == errSkipFile {
+				wait() // Wait to proceed to the next entry.
+
 				return nil
 			}
-			logger.LogIf(ctx, err)
+
+			// A successful read means we have a valid object.
+
+			// Remove the filename, i.e. the meta file, to construct the object name.
+			item.transformMetaDir()
+
+			// Object already accounted for, remove it from the heal map,
+			// simply because the getSize() function already heals the
+			// object.
+			delete(existing, path.Join(item.bucket, item.objectPath()))
+
 			cache.addSizes(sizeSummary)
 			cache.Objects++
 			cache.ObjSizes.add(sizeSummary.totalSize)
+
+			wait() // Wait to proceed to the next entry.
+
 			return nil
 		})
 		if err != nil {
@@ -516,7 +532,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 		resolver.bucket = bucket
 
 		foundObjs := false
-		dangling := true
+		dangling := false
 		ctx, cancel := context.WithCancel(ctx)
 		err := listPathRaw(ctx, listPathRawOptions{
 			disks: f.disks,
@@ -530,15 +546,16 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 				if f.dataUsageCrawlDebug {
 					logger.Info(color.Green("healObjects:")+" got agreement: %v", entry.name)
 				}
-				if entry.isObject() {
-					dangling = false
-				}
 			},
 			// Some disks have data for this.
 			partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
 				if f.dataUsageCrawlDebug {
 					logger.Info(color.Green("healObjects:")+" got partial, %d agreed, errs: %v", nAgreed, errs)
 				}
+
+				// Agreed value is less than the expected quorum.
+				dangling = nAgreed < resolver.objQuorum || nAgreed < resolver.dirQuorum
+
 				// Sleep and reset.
 				wait()
 				wait = crawlerSleeper.Timer(ctx)
@@ -546,8 +563,6 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 				if !ok {
 					for _, err := range errs {
 						if err != nil {
-							// Not all disks are ready, do nothing for now.
-							dangling = false
 							return
 						}
 					}
@@ -559,10 +574,10 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 				if f.dataUsageCrawlDebug {
 					logger.Info(color.Green("healObjects:")+" resolved to: %v, dir: %v", entry.name, entry.isDir())
 				}
+
 				if entry.isDir() {
 					return
 				}
-				dangling = false
 				// We got an entry which we should be able to heal.
 				fiv, err := entry.fileInfoVersions(bucket)
 				if err != nil {
@@ -597,7 +612,6 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 				if f.dataUsageCrawlDebug {
 					logger.Info(color.Green("healObjects:")+" too many errors: %v", errs)
 				}
-				dangling = false
 				cancel()
 			},
 		})
@@ -612,8 +626,20 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
 			logger.Info(color.Green("healObjects:")+" deleting dangling directory %s", prefix)
 		}
 
-		// If we have quorum, found directories, but no objects, issue heal to delete the dangling.
-		objAPI.HealObject(ctx, bucket, prefix, "", madmin.HealOpts{Recursive: true, Remove: true})
+		objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{
+			Recursive: true,
+			Remove:    true,
+		},
+			func(bucket, object, versionID string) error {
+				// Wait for each heal as per crawler frequency.
+				wait()
+				wait = crawlerSleeper.Timer(ctx)
+				return bgSeq.queueHealTask(healSource{
+					bucket:    bucket,
+					object:    object,
+					versionID: versionID,
+				}, madmin.HealItemObject)
+			})
 	}
 
 	wait()
@@ -746,6 +772,7 @@ type actionMeta struct {
 	oi               ObjectInfo
 	successorModTime time.Time // The modtime of the successor version
 	numVersions      int       // The number of versions of this object
+	bitRotScan       bool      // Indicates if a bitrot check was requested.
 }
 
 // applyActions will apply lifecycle checks on to a scanned item.
@@ -765,7 +792,11 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action
 			logger.Info(color.Green("applyActions:")+" heal checking: %v/%v", i.bucket, i.objectPath())
 		}
 	}
-	res, err := o.HealObject(ctx, i.bucket, i.objectPath(), meta.oi.VersionID, madmin.HealOpts{Remove: healDeleteDangling})
+	healOpts := madmin.HealOpts{Remove: healDeleteDangling}
+	if meta.bitRotScan {
+		healOpts.ScanMode = madmin.HealDeepScan
+	}
+	res, err := o.HealObject(ctx, i.bucket, i.objectPath(), meta.oi.VersionID, healOpts)
 	if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
 		return 0
 	}
@@ -940,52 +971,52 @@ func (i *crawlItem) objectPath() string {
 }
 
 // healReplication will heal a scanned item that has failed replication.
-func (i *crawlItem) healReplication(ctx context.Context, o ObjectLayer, meta actionMeta, sizeS *sizeSummary) {
-	if meta.oi.DeleteMarker || !meta.oi.VersionPurgeStatus.Empty() {
+func (i *crawlItem) healReplication(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) {
+	if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() {
 		// heal delete marker replication failure or versioned delete replication failure
-		if meta.oi.ReplicationStatus == replication.Pending ||
-			meta.oi.ReplicationStatus == replication.Failed ||
-			meta.oi.VersionPurgeStatus == Failed || meta.oi.VersionPurgeStatus == Pending {
-			i.healReplicationDeletes(ctx, o, meta)
+		if oi.ReplicationStatus == replication.Pending ||
+			oi.ReplicationStatus == replication.Failed ||
+			oi.VersionPurgeStatus == Failed || oi.VersionPurgeStatus == Pending {
+			i.healReplicationDeletes(ctx, o, oi)
 			return
 		}
 	}
-	switch meta.oi.ReplicationStatus {
+	switch oi.ReplicationStatus {
 	case replication.Pending:
-		sizeS.pendingSize += meta.oi.Size
-		globalReplicationState.queueReplicaTask(meta.oi)
+		sizeS.pendingSize += oi.Size
+		globalReplicationState.queueReplicaTask(oi)
 	case replication.Failed:
-		sizeS.failedSize += meta.oi.Size
-		globalReplicationState.queueReplicaTask(meta.oi)
+		sizeS.failedSize += oi.Size
+		globalReplicationState.queueReplicaTask(oi)
 	case replication.Complete:
-		sizeS.replicatedSize += meta.oi.Size
+		sizeS.replicatedSize += oi.Size
 	case replication.Replica:
-		sizeS.replicaSize += meta.oi.Size
+		sizeS.replicaSize += oi.Size
 	}
 }
 
 // healReplicationDeletes will heal a scanned deleted item that failed to replicate deletes.
-func (i *crawlItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, meta actionMeta) {
+func (i *crawlItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, oi ObjectInfo) {
 	// handle soft delete and permanent delete failures here.
-	if meta.oi.DeleteMarker || !meta.oi.VersionPurgeStatus.Empty() {
+	if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() {
 		versionID := ""
 		dmVersionID := ""
-		if meta.oi.VersionPurgeStatus.Empty() {
-			dmVersionID = meta.oi.VersionID
+		if oi.VersionPurgeStatus.Empty() {
+			dmVersionID = oi.VersionID
 		} else {
-			versionID = meta.oi.VersionID
+			versionID = oi.VersionID
 		}
 		globalReplicationState.queueReplicaDeleteTask(DeletedObjectVersionInfo{
 			DeletedObject: DeletedObject{
-				ObjectName:                    meta.oi.Name,
+				ObjectName:                    oi.Name,
 				DeleteMarkerVersionID:         dmVersionID,
 				VersionID:                     versionID,
-				DeleteMarkerReplicationStatus: string(meta.oi.ReplicationStatus),
-				DeleteMarkerMTime:             DeleteMarkerMTime{meta.oi.ModTime},
-				DeleteMarker:                  meta.oi.DeleteMarker,
-				VersionPurgeStatus:            meta.oi.VersionPurgeStatus,
+				DeleteMarkerReplicationStatus: string(oi.ReplicationStatus),
+				DeleteMarkerMTime:             DeleteMarkerMTime{oi.ModTime},
+				DeleteMarker:                  oi.DeleteMarker,
+				VersionPurgeStatus:            oi.VersionPurgeStatus,
 			},
-			Bucket: meta.oi.Bucket,
+			Bucket: oi.Bucket,
 		})
 	}
 }
diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go
index 3273c68ca..e9130e38e 100644
--- a/cmd/metacache-walk.go
+++ b/cmd/metacache-walk.go
@@ -249,6 +249,7 @@ func (client *storageRESTClient) WalkDir(ctx context.Context, opts WalkDirOption
 	values.Set(storageRESTVolume, opts.Bucket)
 	values.Set(storageRESTDirPath, opts.BaseDir)
 	values.Set(storageRESTRecursive, strconv.FormatBool(opts.Recursive))
+	values.Set(storageRESTReportNotFound, strconv.FormatBool(opts.ReportNotFound))
 	values.Set(storageRESTPrefixFilter, opts.FilterPrefix)
 	respBody, err := client.call(ctx, storageRESTMethodWalkDir, values, nil, -1)
 	if err != nil {
@@ -271,12 +272,23 @@ func (s *storageRESTServer) WalkDirHandler(w http.ResponseWriter, r *http.Reques
 		s.writeErrorResponse(w, err)
 		return
 	}
+
+	var reportNotFound bool
+	if v := vars[storageRESTReportNotFound]; v != "" {
+		reportNotFound, err = strconv.ParseBool(v)
+		if err != nil {
+			s.writeErrorResponse(w, err)
+			return
+		}
+	}
+
 	prefix := r.URL.Query().Get(storageRESTPrefixFilter)
 	writer := streamHTTPResponse(w)
 	writer.CloseWithError(s.storage.WalkDir(r.Context(), WalkDirOptions{
-		Bucket:       volume,
-		BaseDir:      dirPath,
-		Recursive:    recursive,
-		FilterPrefix: prefix,
+		Bucket:         volume,
+		BaseDir:        dirPath,
+		Recursive:      recursive,
+		ReportNotFound: reportNotFound,
+		FilterPrefix:   prefix,
 	}, writer))
 }
diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go
index 8f5a5a276..7ac8bb3c0 100644
--- a/cmd/storage-rest-common.go
+++ b/cmd/storage-rest-common.go
@@ -54,26 +54,27 @@ const (
 )
 
 const (
-	storageRESTVolume        = "volume"
-	storageRESTVolumes       = "volumes"
-	storageRESTDirPath       = "dir-path"
-	storageRESTFilePath      = "file-path"
-	storageRESTVersionID     = "version-id"
-	storageRESTCheckDataDir  = "check-data-dir"
-	storageRESTTotalVersions = "total-versions"
-	storageRESTSrcVolume     = "source-volume"
-	storageRESTSrcPath       = "source-path"
-	storageRESTDataDir       = "data-dir"
-	storageRESTDstVolume     = "destination-volume"
-	storageRESTDstPath       = "destination-path"
-	storageRESTOffset        = "offset"
-	storageRESTLength        = "length"
-	storageRESTCount         = "count"
-	storageRESTMarkerPath    = "marker"
-	storageRESTPrefixFilter  = "prefix"
-	storageRESTRecursive     = "recursive"
-	storageRESTBitrotAlgo    = "bitrot-algo"
-	storageRESTBitrotHash    = "bitrot-hash"
-	storageRESTDiskID        = "disk-id"
-	storageRESTForceDelete   = "force-delete"
+	storageRESTVolume         = "volume"
+	storageRESTVolumes        = "volumes"
+	storageRESTDirPath        = "dir-path"
+	storageRESTFilePath       = "file-path"
+	storageRESTVersionID      = "version-id"
+	storageRESTCheckDataDir   = "check-data-dir"
+	storageRESTTotalVersions  = "total-versions"
+	storageRESTSrcVolume      = "source-volume"
+	storageRESTSrcPath        = "source-path"
+	storageRESTDataDir        = "data-dir"
+	storageRESTDstVolume      = "destination-volume"
+	storageRESTDstPath        = "destination-path"
+	storageRESTOffset         = "offset"
+	storageRESTLength         = "length"
+	storageRESTCount          = "count"
+	storageRESTMarkerPath     = "marker"
+	storageRESTPrefixFilter   = "prefix"
+	storageRESTRecursive      = "recursive"
+	storageRESTReportNotFound = "report-notfound"
+	storageRESTBitrotAlgo     = "bitrot-algo"
+	storageRESTBitrotHash     = "bitrot-hash"
+	storageRESTDiskID         = "disk-id"
+	storageRESTForceDelete    = "force-delete"
 )
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index b37b77ae8..b3f3a2b96 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -50,7 +50,6 @@ import (
 	"github.com/minio/minio/pkg/disk"
 	"github.com/minio/minio/pkg/env"
 	xioutil "github.com/minio/minio/pkg/ioutil"
-	"github.com/minio/minio/pkg/madmin"
 )
 
 const (
@@ -391,31 +390,13 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac
 			}
 			oi := version.ToObjectInfo(item.bucket, item.objectPath())
 			if objAPI != nil {
-				size := item.applyActions(ctx, objAPI, actionMeta{
+				totalSize += item.applyActions(ctx, objAPI, actionMeta{
 					numVersions:      numVersions,
 					successorModTime: successorModTime,
 					oi:               oi,
+					bitRotScan:       healOpts.Bitrot,
 				})
-				if !version.Deleted {
-					// Bitrot check local data
-					if size > 0 && item.heal && healOpts.Bitrot {
-						// HealObject verifies bitrot requirement internally
-						res, err := objAPI.HealObject(ctx, item.bucket, item.objectPath(), oi.VersionID, madmin.HealOpts{
-							Remove:   healDeleteDangling,
-							ScanMode: madmin.HealDeepScan,
-						})
-						if err != nil {
-							if !errors.Is(err, NotImplemented{}) {
-								logger.LogIf(ctx, err)
-							}
-							size = 0
-						} else {
-							size = res.ObjectSize
-						}
-					}
-					totalSize += size
-				}
-				item.healReplication(ctx, objAPI, actionMeta{oi: version.ToObjectInfo(item.bucket, item.objectPath())}, &sizeS)
+				item.healReplication(ctx, objAPI, oi, &sizeS)
 			}
 		}
 		sizeS.totalSize = totalSize
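
Note: the core of the second fix is a two-pass dedup over the crawler's
namespace map: when getSize() reads (and thereby heals) an object in the
first pass, the object is deleted from the map, so the later dangling-heal
pass only considers entries that were never read back successfully. A
minimal, self-contained Go sketch of the idea (illustrative only, not
MinIO code; the `existing` map and the names in it are assumptions):

    package main

    import "fmt"

    func main() {
    	// Names remembered from the previous crawl cycle.
    	existing := map[string]bool{
    		"bucket/a.txt": true,
    		"bucket/b.txt": true,
    		"bucket/c.txt": true,
    	}

    	// Pass 1: entries we could read (and therefore heal) this cycle.
    	for _, name := range []string{"bucket/a.txt", "bucket/c.txt"} {
    		// The read succeeded: the object is accounted for,
    		// so drop it from the heal candidates.
    		delete(existing, name)
    	}

    	// Pass 2: whatever is left was never read back successfully
    	// and is still a heal candidate.
    	for name := range existing {
    		fmt.Println("queue heal for:", name) // prints only bucket/b.txt
    	}
    }

Anything still present in the map after the first pass is exactly the set
the heal pass should consider, which is why deleting accounted-for entries
avoids double heal attempts.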
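
Note: the first fix folds bitrot verification into the single HealObject
call made by applyActions(), choosing a deep scan mode up front instead of
issuing a second heal from xl-storage.go. A hedged sketch of that pattern
with hypothetical types (scanMode, healOpts, and healObject are
illustrative stand-ins, not the MinIO API):

    package main

    import "fmt"

    type scanMode int

    const (
    	normalScan scanMode = iota // cheap metadata-only scan
    	deepScan                   // also verifies bitrot checksums
    )

    type healOpts struct {
    	remove bool
    	scan   scanMode
    }

    // healObject stands in for the one heal attempt; in the patch this
    // is the single HealObject call inside applyActions().
    func healObject(name string, opts healOpts) {
    	fmt.Printf("heal %s (remove=%v, deep=%v)\n",
    		name, opts.remove, opts.scan == deepScan)
    }

    func main() {
    	bitRotScan := true // carried on actionMeta in the patch

    	// Choose the scan mode up front so healing happens exactly once.
    	opts := healOpts{remove: true}
    	if bitRotScan {
    		opts.scan = deepScan
    	}
    	healObject("bucket/object", opts)
    }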
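
Note: the storageRESTReportNotFound parameter added in metacache-walk.go
follows the usual boolean round trip: the client encodes the flag with
strconv.FormatBool, and the handler parses it back with strconv.ParseBool,
treating an absent value as false. A standard-library-only sketch of that
round trip (the "report-notfound" key mirrors the constant added above;
everything else is illustrative):

    package main

    import (
    	"fmt"
    	"net/url"
    	"strconv"
    )

    func main() {
    	// Client side: encode the flag, as WalkDir does with values.Set.
    	values := url.Values{}
    	values.Set("report-notfound", strconv.FormatBool(true))
    	query := values.Encode()

    	// Server side: decode it, defaulting to false when absent,
    	// as WalkDirHandler does.
    	parsed, err := url.ParseQuery(query)
    	if err != nil {
    		fmt.Println("bad query:", err)
    		return
    	}
    	reportNotFound := false
    	if v := parsed.Get("report-notfound"); v != "" {
    		reportNotFound, err = strconv.ParseBool(v)
    		if err != nil {
    			fmt.Println("bad value:", err)
    			return
    		}
    	}
    	fmt.Println("reportNotFound =", reportNotFound)
    }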