From 027e17468ab1174fbc7414356a2f0841a0e6d261 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 24 Dec 2020 15:02:02 -0800 Subject: [PATCH] fix: discarding results do not attempt in-memory metacache writer (#11163) Optimizations include - do not write the metacache block if the size of the block is '0' and it is the first block - where listing is attempted for a transient prefix, this helps to avoid creating lots of empty metacache entries for `minioMetaBucket` - avoid the entire initialization sequence of cacheCh , metacacheBlockWriter if we are simply going to skip them when discardResults is set to true. - No need to hold write locks while writing metacache blocks - each block is unique, per bucket, per prefix and also is written by a single node. --- cmd/data-crawler.go | 14 +++- cmd/erasure-object.go | 10 +-- cmd/global-heal.go | 12 +++- cmd/metacache-set.go | 127 +++++++++++++++++++++--------------- cmd/object-api-interface.go | 1 + 5 files changed, 101 insertions(+), 63 deletions(-) diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go index 6c972fbf6..efb65de47 100644 --- a/cmd/data-crawler.go +++ b/cmd/data-crawler.go @@ -570,8 +570,13 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo bucket: bucket, object: entry.name, versionID: "", + opts: &madmin.HealOpts{ + Remove: true, + }, }, madmin.HealItemObject) - logger.LogIf(ctx, err) + if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { + logger.LogIf(ctx, err) + } foundObjs = foundObjs || err == nil return } @@ -583,8 +588,13 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo bucket: bucket, object: fiv.Name, versionID: ver.VersionID, + opts: &madmin.HealOpts{ + Remove: true, + }, }, madmin.HealItemObject) - logger.LogIf(ctx, err) + if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { + logger.LogIf(ctx, err) + } foundObjs = foundObjs || err == nil } }, diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index fb7b1ae2e..64e887a08 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -674,11 +674,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st return ObjectInfo{}, IncompleteBody{Bucket: bucket, Object: object} } - lk := er.NewNSLock(bucket, object) - if err := lk.GetLock(ctx, globalOperationTimeout); err != nil { - return ObjectInfo{}, err + if !opts.NoLock { + lk := er.NewNSLock(bucket, object) + if err := lk.GetLock(ctx, globalOperationTimeout); err != nil { + return ObjectInfo{}, err + } + defer lk.Unlock() } - defer lk.Unlock() for i, w := range writers { if w == nil { diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 221b95840..b42d313b8 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -130,7 +130,9 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis bucket: minioMetaBucket, object: backendEncryptedFile, }, madmin.HealItemMetadata); err != nil { - logger.LogIf(ctx, err) + if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { + logger.LogIf(ctx, err) + } } // Heal all buckets with all objects @@ -139,7 +141,9 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis if err := bgSeq.queueHealTask(healSource{ bucket: bucket.Name, }, madmin.HealItemBucket); err != nil { - logger.LogIf(ctx, err) + if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { + logger.LogIf(ctx, err) + } } var entryChs []FileInfoVersionsCh @@ -179,7 +183,9 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis object: version.Name, versionID: version.VersionID, }, madmin.HealItemObject); err != nil { - logger.LogIf(ctx, err) + if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { + logger.LogIf(ctx, err) + } } } } diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 5c7c2f1d0..2c093c9c8 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -615,13 +615,18 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr } // Create output for our results. - cacheCh := make(chan metaCacheEntry, metacacheBlockSize) + var cacheCh chan metaCacheEntry + if !o.discardResult { + cacheCh = make(chan metaCacheEntry, metacacheBlockSize) + } // Create filter for results. filterCh := make(chan metaCacheEntry, 100) filteredResults := o.gatherResults(filterCh) closeChannels := func() { - close(cacheCh) + if !o.discardResult { + close(cacheCh) + } close(filterCh) } @@ -657,54 +662,62 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr }() const retryDelay = 200 * time.Millisecond - const maxTries = 10 - - // Write results to disk. - bw := newMetacacheBlockWriter(cacheCh, func(b *metacacheBlock) error { - if o.discardResult { - // Don't save single object listings. - return nil - } - o.debugln("listPath: saving block", b.n, "to", o.objectPath(b.n)) - r, err := hash.NewReader(bytes.NewBuffer(b.data), int64(len(b.data)), "", "", int64(len(b.data)), false) - logger.LogIf(ctx, err) - custom := b.headerKV() - _, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r, nil, nil), ObjectOptions{UserDefined: custom}) - if err != nil { - metaMu.Lock() - if meta.error != "" { - meta.status = scanStateError - meta.error = err.Error() - } - metaMu.Unlock() - cancel() - return err - } - if b.n == 0 { - return nil - } - // Update block 0 metadata. - var retries int - for { - err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), b.headerKV(), ObjectOptions{}) - if err == nil { - break + const maxTries = 5 + + var bw *metacacheBlockWriter + // Don't save single object listings. + if !o.discardResult { + // Write results to disk. + bw = newMetacacheBlockWriter(cacheCh, func(b *metacacheBlock) error { + // if the block is 0 bytes and its a first block skip it. + // skip only this for Transient caches. + if len(b.data) == 0 && b.n == 0 && o.Transient { + return nil } - switch err.(type) { - case ObjectNotFound: + o.debugln("listPath: saving block", b.n, "to", o.objectPath(b.n)) + r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)), false) + logger.LogIf(ctx, err) + custom := b.headerKV() + _, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r, nil, nil), ObjectOptions{ + UserDefined: custom, + NoLock: true, // No need to hold namespace lock, each prefix caches uniquely. + }) + if err != nil { + metaMu.Lock() + if meta.error != "" { + meta.status = scanStateError + meta.error = err.Error() + } + metaMu.Unlock() + cancel() return err - case InsufficientReadQuorum: - default: - logger.LogIf(ctx, err) } - if retries >= maxTries { - return err + if b.n == 0 { + return nil } - retries++ - time.Sleep(retryDelay) - } - return nil - }) + // Update block 0 metadata. + var retries int + for { + err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), b.headerKV(), ObjectOptions{}) + if err == nil { + break + } + switch err.(type) { + case ObjectNotFound: + return err + case InsufficientReadQuorum: + default: + logger.LogIf(ctx, err) + } + if retries >= maxTries { + return err + } + retries++ + time.Sleep(retryDelay) + } + return nil + }) + } // How to resolve results. resolver := metadataResolutionParams{ @@ -721,14 +734,18 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr filterPrefix: o.FilterPrefix, minDisks: listingQuorum, agreed: func(entry metaCacheEntry) { - cacheCh <- entry + if !o.discardResult { + cacheCh <- entry + } filterCh <- entry }, partial: func(entries metaCacheEntries, nAgreed int, errs []error) { // Results Disagree :-( entry, ok := entries.resolve(&resolver) if ok { - cacheCh <- *entry + if !o.discardResult { + cacheCh <- *entry + } filterCh <- *entry } }, @@ -749,12 +766,14 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr metaMu.Unlock() closeChannels() - if err := bw.Close(); err != nil { - metaMu.Lock() - meta.error = err.Error() - meta.status = scanStateError - meta, err = o.updateMetacacheListing(meta, rpc) - metaMu.Unlock() + if !o.discardResult { + if err := bw.Close(); err != nil { + metaMu.Lock() + meta.error = err.Error() + meta.status = scanStateError + meta, err = o.updateMetacacheListing(meta, rpc) + metaMu.Unlock() + } } }() diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 6ad574f9e..460e68faa 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -51,6 +51,7 @@ type ObjectOptions struct { DeleteMarkerReplicationStatus string // Is only set in DELETE operations VersionPurgeStatus VersionPurgeStatusType // Is only set in DELETE operations for delete marker version to be permanently deleted. TransitionStatus string // status of the transition + NoLock bool // indicates to lower layers if the caller is expecting to hold locks. } // BucketOptions represents bucket options for ObjectLayer bucket operations