From 6135f072d2936f61013b4b52cf64c4fee76bf8bb Mon Sep 17 00:00:00 2001
From: Klaus Post
Date: Fri, 30 Oct 2020 09:33:16 -0700
Subject: [PATCH] Fix invalidated metacaches (#10784)

* Fix caches having EOF marked as a failure.
* Simplify cache updates.
* Provide context for checkMetacacheState failures.
* Log 499 when the client disconnects.
---
 cmd/api-errors.go            |  16 ++++
 cmd/data-update-tracker.go   |  93 +++++++++++++---------
 cmd/metacache-bucket.go      |  14 ++--
 cmd/metacache-manager.go     |  23 ++++--
 cmd/metacache-server-sets.go |  14 ++--
 cmd/metacache-set.go         | 148 +++++++++++++++++++----------------
 cmd/metacache.go             |   8 +-
 cmd/test-utils_test.go       |  12 +++
 8 files changed, 197 insertions(+), 131 deletions(-)

diff --git a/cmd/api-errors.go b/cmd/api-errors.go
index d21a734b2..6839aace8 100644
--- a/cmd/api-errors.go
+++ b/cmd/api-errors.go
@@ -233,6 +233,7 @@ const (
 	ErrInvalidResourceName
 	ErrServerNotInitialized
 	ErrOperationTimedOut
+	ErrClientDisconnected
 	ErrOperationMaxedOut
 	ErrInvalidRequest
 	// MinIO storage class error codes
@@ -1216,6 +1217,11 @@ var errorCodes = errorCodeMap{
 		Description:    "A timeout occurred while trying to lock a resource, please reduce your request rate",
 		HTTPStatusCode: http.StatusServiceUnavailable,
 	},
+	ErrClientDisconnected: {
+		Code:           "ClientDisconnected",
+		Description:    "Client disconnected before response was ready",
+		HTTPStatusCode: 499, // No official code, use nginx value.
+	},
 	ErrOperationMaxedOut: {
 		Code:           "SlowDown",
 		Description:    "A timeout exceeded while waiting to proceed with the request, please reduce your request rate",
@@ -1742,6 +1748,16 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
 		return ErrNone
 	}

+	// Only return ErrClientDisconnected if the provided context is actually canceled.
+	// This way downstream context.Canceled will still report ErrOperationTimedOut
+	select {
+	case <-ctx.Done():
+		if ctx.Err() == context.Canceled {
+			return ErrClientDisconnected
+		}
+	default:
+	}
+
 	switch err {
 	case errInvalidArgument:
 		apiErr = ErrAdminInvalidArgument
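The mapping above returns ErrClientDisconnected only when the request's own context is canceled; a context.Canceled that bubbles up from a downstream call still maps to the timeout error. A minimal standalone sketch of that decision, with illustrative names rather than MinIO's actual helpers:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// classify mirrors the idea behind the toAPIErrorCode change: an error is
// reported as a client disconnect (HTTP 499) only when the caller's own
// context is canceled; otherwise the usual timeout mapping applies.
func classify(ctx context.Context, err error) string {
	select {
	case <-ctx.Done():
		if ctx.Err() == context.Canceled {
			return "ClientDisconnected (499)"
		}
	default:
	}
	if errors.Is(err, context.Canceled) {
		// A canceled context further down the call chain, not our caller.
		return "OperationTimedOut (503)"
	}
	return "InternalError (500)"
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the client going away
	fmt.Println(classify(ctx, context.Canceled))                  // ClientDisconnected (499)
	fmt.Println(classify(context.Background(), context.Canceled)) // OperationTimedOut (503)
}
```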
diff --git a/cmd/data-update-tracker.go b/cmd/data-update-tracker.go
index 10405abaf..91751f278 100644
--- a/cmd/data-update-tracker.go
+++ b/cmd/data-update-tracker.go
@@ -43,7 +43,7 @@ const (
 	dataUpdateTrackerEstItems = 10000000
 	// ... we want this false positive rate:
 	dataUpdateTrackerFP        = 0.99
-	dataUpdateTrackerQueueSize = 10000
+	dataUpdateTrackerQueueSize = 0

 	dataUpdateTrackerFilename = dataUsageBucket + SlashSeparator + ".tracker.bin"
 	dataUpdateTrackerVersion  = 4
@@ -477,43 +477,64 @@ func (d *dataUpdateTracker) deserialize(src io.Reader, newerThan time.Time) error {
 // start a collector that picks up entries from objectUpdatedCh
 // and adds them to the current bloom filter.
 func (d *dataUpdateTracker) startCollector(ctx context.Context) {
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case in := <-d.input:
-			if d.debug {
-				logger.Info(color.Green("dataUpdateTracker:")+" got (%s)", in)
-			}
-
-			bucket, _ := path2BucketObjectWithBasePath("", in)
-			if bucket == "" {
-				if d.debug && len(in) > 0 {
-					logger.Info(color.Green("dataUpdateTracker:")+" no bucket (%s)", in)
-				}
-				continue
-			}
-
-			if isReservedOrInvalidBucket(bucket, false) {
-				if d.debug {
-					logger.Info(color.Green("dataUpdateTracker:")+" isReservedOrInvalidBucket: %v, entry: %v", bucket, in)
-				}
-				continue
-			}
-			split := splitPathDeterministic(in)
-
-			// Add all paths until done.
-			d.mu.Lock()
-			for i := range split {
-				if d.debug {
-					logger.Info(color.Green("dataUpdateTracker:") + " Marking path dirty: " + color.Blue(path.Join(split[:i+1]...)))
-				}
-				d.Current.bf.AddString(hashPath(path.Join(split[:i+1]...)).String())
-			}
-			d.dirty = d.dirty || len(split) > 0
-			d.mu.Unlock()
-		}
-	}
+	for in := range d.input {
+		bucket, _ := path2BucketObjectWithBasePath("", in)
+		if bucket == "" {
+			if d.debug && len(in) > 0 {
+				logger.Info(color.Green("dataUpdateTracker:")+" no bucket (%s)", in)
+			}
+			continue
+		}
+
+		if isReservedOrInvalidBucket(bucket, false) {
+			if d.debug {
+				logger.Info(color.Green("dataUpdateTracker:")+" isReservedOrInvalidBucket: %v, entry: %v", bucket, in)
+			}
+			continue
+		}
+		split := splitPathDeterministic(in)
+
+		// Add all paths until done.
+		d.mu.Lock()
+		for i := range split {
+			if d.debug {
+				logger.Info(color.Green("dataUpdateTracker:") + " Marking path dirty: " + color.Blue(path.Join(split[:i+1]...)))
+			}
+			d.Current.bf.AddString(hashPath(path.Join(split[:i+1]...)).String())
+		}
+		d.dirty = d.dirty || len(split) > 0
+		d.mu.Unlock()
+	}
+}
+
+// markDirty adds the supplied path to the current bloom filter.
+func (d *dataUpdateTracker) markDirty(in string) {
+	bucket, _ := path2BucketObjectWithBasePath("", in)
+	if bucket == "" {
+		if d.debug && len(in) > 0 {
+			logger.Info(color.Green("dataUpdateTracker:")+" no bucket (%s)", in)
+		}
+		return
+	}
+
+	if isReservedOrInvalidBucket(bucket, false) {
+		if d.debug && false {
+			logger.Info(color.Green("dataUpdateTracker:")+" isReservedOrInvalidBucket: %v, entry: %v", bucket, in)
+		}
+		return
+	}
+	split := splitPathDeterministic(in)
+
+	// Add all paths until done.
+	d.mu.Lock()
+	for i := range split {
+		if d.debug {
+			logger.Info(color.Green("dataUpdateTracker:") + " Marking path dirty: " + color.Blue(path.Join(split[:i+1]...)))
+		}
+		d.Current.bf.AddString(hashPath(path.Join(split[:i+1]...)).String())
+	}
+	d.dirty = d.dirty || len(split) > 0
+	d.mu.Unlock()
 }

 // find entry with specified index.
@@ -620,7 +641,7 @@ func (d *dataUpdateTracker) cycleFilter(ctx context.Context, req bloomFilterRequest) (*bloomFilterResponse, error) {
 // Trailing slashes are removed.
 // Returns 0 length if no parts are found after trimming.
 func splitPathDeterministic(in string) []string {
-	split := strings.Split(in, SlashSeparator)
+	split := strings.Split(decodeDirObject(in), SlashSeparator)

 	// Trim empty start/end
 	for len(split) > 0 {
@@ -663,13 +684,9 @@ type bloomFilterResponse struct {
 }

 // ObjectPathUpdated indicates a path has been updated.
-// The function will never block.
+// The function will block until the entry has been picked up.
 func ObjectPathUpdated(s string) {
-	if strings.HasPrefix(s, minioMetaBucket) {
-		return
-	}
-	select {
-	case objectUpdatedCh <- s:
-	default:
+	if intDataUpdateTracker != nil {
+		intDataUpdateTracker.markDirty(s)
 	}
 }
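markDirty marks every parent prefix of the updated path, so a later listing of any ancestor can cheaply test whether something below it changed. A rough sketch of that prefix expansion, with a plain map standing in for the bloom filter:

```go
package main

import (
	"fmt"
	"strings"
)

// markDirtyPrefixes sketches the prefix expansion performed by markDirty:
// for "bucket/a/b/obj" it marks "bucket", "bucket/a", "bucket/a/b" and
// "bucket/a/b/obj". A map[string]bool stands in for the bloom filter.
func markDirtyPrefixes(filter map[string]bool, in string) {
	split := strings.Split(strings.Trim(in, "/"), "/")
	for i := range split {
		filter[strings.Join(split[:i+1], "/")] = true
	}
}

func main() {
	filter := map[string]bool{}
	markDirtyPrefixes(filter, "bucket/photos/2020/cat.png")
	fmt.Println(filter["bucket/photos"])    // true: prefix is dirty
	fmt.Println(filter["bucket/documents"]) // false: untouched prefix
}
```

The real tracker additionally hashes each prefix with hashPath before inserting it into the filter.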
diff --git a/cmd/metacache-bucket.go b/cmd/metacache-bucket.go
index 21a77813a..7501be4f5 100644
--- a/cmd/metacache-bucket.go
+++ b/cmd/metacache-bucket.go
@@ -247,7 +247,7 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache {
 			b.caches[best.id] = best
 			b.updated = true
 		}
-		debugPrint("returning cached")
+		debugPrint("returning cached %s, status: %v, ended: %v", best.id, best.status, best.ended)
 		return best
 	}
 	if !o.Create {
@@ -348,16 +348,20 @@ func (b *bucketMetacache) updateCacheEntry(update metacache) (metacache, error) {
 	}
 	existing.lastUpdate = UTCNow()

-	if existing.status == scanStateStarted && update.status != scanStateStarted {
-		existing.status = update.status
-	}
-	if existing.status == scanStateSuccess && update.status == scanStateSuccess {
+	if existing.status == scanStateStarted && update.status == scanStateSuccess {
 		existing.ended = UTCNow()
 		existing.endedCycle = update.endedCycle
 	}
+
+	if existing.status == scanStateStarted && update.status != scanStateStarted {
+		existing.status = update.status
+	}
+
 	if existing.error == "" && update.error != "" {
 		existing.error = update.error
 		existing.status = scanStateError
+		existing.ended = UTCNow()
 	}
 	existing.fileNotFound = existing.fileNotFound || update.fileNotFound
 	b.caches[update.id] = existing
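The reordered conditions in updateCacheEntry stamp `ended` exactly once when a started scan finishes, and treat an incoming error as both a failure and an end. A condensed, self-contained model of those transitions, using trimmed stand-in types:

```go
package main

import (
	"fmt"
	"time"
)

type scanState int

const (
	scanStateStarted scanState = iota
	scanStateSuccess
	scanStateError
)

// cacheEntry is a trimmed stand-in for the metacache struct in the diff.
type cacheEntry struct {
	status scanState
	error  string
	ended  time.Time
}

// merge condenses the tightened transitions in updateCacheEntry: "ended" is
// stamped when a started scan succeeds, and an incoming error both marks the
// entry failed and ends it.
func merge(existing, update cacheEntry) cacheEntry {
	if existing.status == scanStateStarted && update.status == scanStateSuccess {
		existing.ended = time.Now().UTC()
	}
	if existing.status == scanStateStarted && update.status != scanStateStarted {
		existing.status = update.status
	}
	if existing.error == "" && update.error != "" {
		existing.error = update.error
		existing.status = scanStateError
		existing.ended = time.Now().UTC() // a failed listing is also "ended"
	}
	return existing
}

func main() {
	e := merge(cacheEntry{status: scanStateStarted}, cacheEntry{status: scanStateSuccess})
	fmt.Println(e.status == scanStateSuccess, !e.ended.IsZero()) // true true
}
```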
diff --git a/cmd/metacache-manager.go b/cmd/metacache-manager.go
index 3e06d64f3..36c055c4c 100644
--- a/cmd/metacache-manager.go
+++ b/cmd/metacache-manager.go
@@ -18,7 +18,6 @@ package cmd

 import (
 	"context"
-	"errors"
 	"fmt"
 	"runtime/debug"
 	"sync"
@@ -129,6 +128,15 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucketMetacache {
 	return b
 }

+// deleteAll will delete all caches.
+func (m *metacacheManager) deleteAll() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	for _, b := range m.buckets {
+		b.deleteAll()
+	}
+}
+
 // getTransient will return a transient bucket.
 func (m *metacacheManager) getTransient() *bucketMetacache {
 	m.init.Do(m.initManager)
@@ -140,12 +148,11 @@ func (m *metacacheManager) getTransient() *bucketMetacache {

 // checkMetacacheState should be used if data is not updating.
 // Should only be called if a failure occurred.
-func (o listPathOptions) checkMetacacheState(ctx context.Context) error {
+func (o listPathOptions) checkMetacacheState(ctx context.Context, rpc *peerRESTClient) error {
 	// We operate on a copy...
 	o.Create = false
 	var cache metacache
 	if !o.Transient {
-		rpc := globalNotificationSys.restClientFromHash(o.Bucket)
 		if rpc == nil {
 			// Local
 			cache = localMetacacheMgr.getBucket(ctx, o.Bucket).findCache(o)
@@ -160,21 +167,21 @@ func (o listPathOptions) checkMetacacheState(ctx context.Context, rpc *peerRESTClient) error {
 		cache = localMetacacheMgr.getTransient().findCache(o)
 	}

-	if cache.status == scanStateNone {
+	if cache.status == scanStateNone || cache.fileNotFound {
 		return errFileNotFound
 	}
 	if cache.status == scanStateSuccess {
-		if time.Since(cache.lastUpdate) > 10*time.Second {
-			return fmt.Errorf("timeout: Finished and data not available after 10 seconds")
+		if time.Since(cache.lastUpdate) > metacacheMaxRunningAge {
+			return fmt.Errorf("timeout: list %s finished and no update for 1 minute", cache.id)
 		}
 		return nil
 	}
 	if cache.error != "" {
-		return errors.New(cache.error)
+		return fmt.Errorf("async cache listing failed with: %s", cache.error)
 	}
 	if cache.status == scanStateStarted {
 		if time.Since(cache.lastUpdate) > metacacheMaxRunningAge {
-			return errors.New("cache listing not updating")
+			return fmt.Errorf("cache id %s listing not updating. Last update %s ago", cache.id, time.Since(cache.lastUpdate).Round(time.Second))
 		}
 	}
 	return nil
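checkMetacacheState keeps its listPathOptions value receiver, so `o.Create = false` mutates only the local copy, as the "We operate on a copy" comment notes; callers keep their original options. In miniature:

```go
package main

import "fmt"

type opts struct{ Create bool }

// lookup mutates its value-receiver copy only; the caller's opts are untouched.
func (o opts) lookup() bool {
	o.Create = false // operate on a copy, as the diff's comment notes
	return o.Create
}

func main() {
	o := opts{Create: true}
	fmt.Println(o.lookup(), o.Create) // false true
}
```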
diff --git a/cmd/metacache-server-sets.go b/cmd/metacache-server-sets.go
index 6be6c73cb..7b951c6a7 100644
--- a/cmd/metacache-server-sets.go
+++ b/cmd/metacache-server-sets.go
@@ -18,6 +18,7 @@ package cmd

 import (
 	"context"
+	"errors"
 	"io"
 	"path"
 	"sync"
@@ -98,6 +99,10 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 	} else {
 		c, err := rpc.GetMetacacheListing(ctx, o)
 		if err != nil {
+			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+				// Context is canceled, return at once.
+				return entries, err
+			}
 			logger.LogIf(ctx, err)
 			cache = localMetacacheMgr.getTransient().findCache(o)
 			o.Transient = true
@@ -183,14 +188,7 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 		// Update master cache with that information.
 		cache.status = scanStateSuccess
 		cache.fileNotFound = true
-		client := globalNotificationSys.restClientFromHash(o.Bucket)
-		if o.Transient {
-			cache, err = localMetacacheMgr.getTransient().updateCacheEntry(cache)
-		} else if client == nil {
-			cache, err = localMetacacheMgr.getBucket(GlobalContext, o.Bucket).updateCacheEntry(cache)
-		} else {
-			cache, err = client.UpdateMetacacheListing(context.Background(), cache)
-		}
+		_, err := o.updateMetacacheListing(cache, globalNotificationSys.restClientFromHash(o.Bucket))
 		logger.LogIf(ctx, err)
 		return entries, errFileNotFound
 	}
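The early return above distinguishes a dead client from a failed peer: when the context is canceled or past its deadline there is no point falling back to a transient cache. The check in isolation, including how it sees wrapped errors:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// shouldAbort mirrors the early return added to listPath: a canceled or
// timed-out context means the client is gone, so falling back to a transient
// cache would be wasted work.
func shouldAbort(err error) bool {
	return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(shouldAbort(fmt.Errorf("rpc: %w", context.Canceled))) // true
	fmt.Println(shouldAbort(errors.New("quorum not reached")))        // false
}
```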
diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go
index d5d96cd6a..9c69fc2c3 100644
--- a/cmd/metacache-set.go
+++ b/cmd/metacache-set.go
@@ -96,6 +96,25 @@ func init() {
 	gob.Register(listPathOptions{})
 }

+// newMetacache constructs a new metacache from the options.
+func (o listPathOptions) newMetacache() metacache {
+	return metacache{
+		id:           o.ID,
+		bucket:       o.Bucket,
+		root:         o.BaseDir,
+		recursive:    o.Recursive,
+		status:       scanStateStarted,
+		error:        "",
+		started:      UTCNow(),
+		lastHandout:  UTCNow(),
+		lastUpdate:   UTCNow(),
+		ended:        time.Time{},
+		startedCycle: o.CurrentCycle,
+		endedCycle:   0,
+		dataVersion:  metacacheStreamVersion,
+	}
+}
+
 // gatherResults will collect all results on the input channel and filter results according to the options.
 // Caller should close the channel when done.
 // The returned function will return the results once there is enough or input is closed.
@@ -230,23 +249,15 @@ func (o *listPathOptions) findFirstPart(fi FileInfo) (int, error) {
 	}
 }

-// newMetacache constructs a new metacache from the options.
-func (o listPathOptions) newMetacache() metacache {
-	return metacache{
-		id:           o.ID,
-		bucket:       o.Bucket,
-		root:         o.BaseDir,
-		recursive:    o.Recursive,
-		status:       scanStateStarted,
-		error:        "",
-		started:      UTCNow(),
-		lastHandout:  UTCNow(),
-		lastUpdate:   UTCNow(),
-		ended:        time.Time{},
-		startedCycle: o.CurrentCycle,
-		endedCycle:   0,
-		dataVersion:  metacacheStreamVersion,
+// updateMetacacheListing will update the metacache listing.
+func (o *listPathOptions) updateMetacacheListing(m metacache, rpc *peerRESTClient) (metacache, error) {
+	if o.Transient {
+		return localMetacacheMgr.getTransient().updateCacheEntry(m)
 	}
+	if rpc == nil {
+		return localMetacacheMgr.getBucket(GlobalContext, o.Bucket).updateCacheEntry(m)
+	}
+	return rpc.UpdateMetacacheListing(context.Background(), m)
 }

 func getMetacacheBlockInfo(fi FileInfo, block int) (*metacacheBlock, error) {
@@ -332,6 +343,7 @@ func (r *metacacheReader) filter(o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 	retries := 0
 	const debugPrint = false
+	rpc := globalNotificationSys.restClientFromHash(o.Bucket)
 	for {
 		select {
 		case <-ctx.Done():
@@ -339,6 +351,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 		default:
 		}

+		const retryDelay = 500 * time.Millisecond
 		// Load first part metadata...
 		// All operations are performed without locks, so we must be careful and allow for failures.
 		fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{})
@@ -346,18 +359,17 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 			if err == errFileNotFound || errors.Is(err, errErasureReadQuorum) || errors.Is(err, InsufficientReadQuorum{}) {
 				// Not ready yet...
 				if retries == 10 {
-					err := o.checkMetacacheState(ctx)
+					err := o.checkMetacacheState(ctx, rpc)
 					if debugPrint {
 						logger.Info("waiting for first part (%s), err: %v", o.objectPath(0), err)
 					}
 					if err != nil {
 						return entries, err
 					}
-					retries = 0
-					continue
+					retries = -1
 				}
 				retries++
-				time.Sleep(100 * time.Millisecond)
+				time.Sleep(retryDelay)
 				continue
 			}
 			if debugPrint {
@@ -375,22 +387,22 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 		case nil:
 		case io.ErrUnexpectedEOF, errErasureReadQuorum, InsufficientReadQuorum{}:
 			if retries == 10 {
-				err := o.checkMetacacheState(ctx)
+				err := o.checkMetacacheState(ctx, rpc)
 				if debugPrint {
 					logger.Info("waiting for metadata, err: %v", err)
 				}
 				if err != nil {
 					return entries, err
 				}
-				retries = 0
-				continue
+				retries = -1
 			}
 			retries++
-			time.Sleep(100 * time.Millisecond)
+			time.Sleep(retryDelay)
 			continue
 		case io.EOF:
 			return entries, io.EOF
 		}
+
 		// We got a stream to start at.
 		loadedPart := 0
 		var buf bytes.Buffer
@@ -407,25 +419,24 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 				switch err {
 				case errFileNotFound, errErasureReadQuorum, InsufficientReadQuorum{}:
 					if retries >= 10 {
-						err := o.checkMetacacheState(ctx)
+						err := o.checkMetacacheState(ctx, rpc)
 						if debugPrint {
 							logger.Info("waiting for part data (%v), err: %v", o.objectPath(partN), err)
 						}
 						if err != nil {
 							return entries, err
 						}
-						retries = 0
-						continue
+						retries = -1
 					}
-					time.Sleep(100 * time.Millisecond)
+					time.Sleep(retryDelay)
 					continue
 				default:
-					time.Sleep(100 * time.Millisecond)
 					if retries >= 20 {
 						// We had at least 10 retries without getting a result.
 						logger.LogIf(ctx, err)
 						return entries, err
 					}
+					time.Sleep(retryDelay)
 					retries++
 					continue
 				case nil:
@@ -452,7 +463,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 					return entries, err
 				}
 				retries++
-				time.Sleep(100 * time.Millisecond)
+				time.Sleep(retryDelay)
 				continue
 			default:
 				logger.LogIf(ctx, err)
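The retry loops above replace `retries = 0; continue` with `retries = -1`, so the unconditional `retries++` that follows restarts the count at zero, and the poll delay grows from 100ms to 500ms. The counter-reset idiom in isolation, with assumed constants and callbacks:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const retryDelay = 500 * time.Millisecond // matches the diff's new delay

// fetchWithRetry shows the counter-reset idiom from streamMetadataParts:
// every 10th failure re-validates external state via probe, and a successful
// probe resets the counter with retries = -1 followed by retries++.
func fetchWithRetry(fetch, probe func() error) error {
	retries := 0
	for {
		if err := fetch(); err == nil {
			return nil
		}
		if retries == 10 {
			if perr := probe(); perr != nil {
				return perr // the listing is dead, stop waiting
			}
			retries = -1 // still alive: restart the count...
		}
		retries++ // ...which lands back at zero here
		time.Sleep(retryDelay)
	}
}

func main() {
	n := 0
	fetch := func() error {
		n++
		if n < 3 {
			return errors.New("not ready")
		}
		return nil
	}
	probe := func() error { return nil }
	fmt.Println(fetchWithRetry(fetch, probe), n) // <nil> 3
}
```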
@@ -511,36 +522,34 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 		return entries, err
 	}

-	rpcClient := globalNotificationSys.restClientFromHash(o.Bucket)
 	meta := o.newMetacache()
+	rpc := globalNotificationSys.restClientFromHash(o.Bucket)
 	var metaMu sync.Mutex
+
+	if debugPrint {
+		console.Println("listPath: scanning bucket:", o.Bucket, "basedir:", o.BaseDir, "prefix:", o.Prefix, "marker:", o.Marker)
+	}
+
+	// Disconnect from call above, but cancel on exit.
+	ctx, cancel := context.WithCancel(GlobalContext)
+	// We need to ask disks.
+	disks := er.getOnlineDisks()
+
 	defer func() {
 		if debugPrint {
 			console.Println("listPath returning:", entries.len(), "err:", err)
 		}
-		if err != nil {
+		if err != nil && err != io.EOF {
 			metaMu.Lock()
 			if meta.status != scanStateError {
 				meta.error = err.Error()
 				meta.status = scanStateError
 			}
-			lm := meta
+			meta, _ = o.updateMetacacheListing(meta, rpc)
 			metaMu.Unlock()
-			if rpcClient == nil {
-				localMetacacheMgr.getBucket(GlobalContext, o.Bucket).updateCacheEntry(lm)
-			} else {
-				rpcClient.UpdateMetacacheListing(context.Background(), lm)
-			}
+			cancel()
 		}
 	}()

-	if debugPrint {
-		console.Println("listPath: scanning bucket:", o.Bucket, "basedir:", o.BaseDir, "prefix:", o.Prefix, "marker:", o.Marker)
-	}
-
-	// Disconnect from call above, but cancel on exit.
-	ctx, cancel := context.WithCancel(GlobalContext)
-	// We need to ask disks.
-	disks := er.getOnlineDisks()

 	askDisks := o.AskDisks
 	if askDisks == -1 {
@@ -571,7 +580,7 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 			cancel()
 			return entries, err
 		}
-		// Send request.
+		// Send request to each disk.
 		go func() {
 			err := d.WalkDir(ctx, WalkDirOptions{Bucket: o.Bucket, BaseDir: o.BaseDir, Recursive: o.Recursive || o.Separator != SlashSeparator}, w)
 			w.CloseWithError(err)
@@ -596,6 +605,7 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 		defer cancel()
 		// Save continuous updates
 		go func() {
+			var err error
 			ticker := time.NewTicker(10 * time.Second)
 			defer ticker.Stop()
 			var exit bool
@@ -607,21 +617,13 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 				}
 				metaMu.Lock()
 				meta.endedCycle = intDataUpdateTracker.current()
-				lm := meta
-				metaMu.Unlock()
-				var err error
-				if o.Transient {
-					lm, err = localMetacacheMgr.getTransient().updateCacheEntry(lm)
-				} else if rpcClient == nil {
-					lm, err = localMetacacheMgr.getBucket(GlobalContext, o.Bucket).updateCacheEntry(lm)
-				} else {
-					lm, err = rpcClient.UpdateMetacacheListing(context.Background(), lm)
-				}
-				logger.LogIf(ctx, err)
-				if lm.status == scanStateError {
+				meta, err = o.updateMetacacheListing(meta, rpc)
+				if meta.status == scanStateError {
 					cancel()
 					exit = true
 				}
+				metaMu.Unlock()
+				logger.LogIf(ctx, err)
 			}
 		}()

@@ -640,8 +642,10 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 			_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r, nil, nil), ObjectOptions{UserDefined: custom})
 			if err != nil {
 				metaMu.Lock()
-				meta.status = scanStateError
-				meta.error = err.Error()
+				if meta.error == "" {
+					meta.status = scanStateError
+					meta.error = err.Error()
+				}
 				metaMu.Unlock()
 				cancel()
 				return err
@@ -758,18 +762,24 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 				}
 			}
 		}
-		closeChannels()
+
+		// Save success
 		metaMu.Lock()
 		if meta.error == "" {
-			if err := bw.Close(); err != nil {
-				meta.error = err.Error()
-				meta.status = scanStateError
-			} else {
-				meta.status = scanStateSuccess
-				meta.endedCycle = intDataUpdateTracker.current()
-			}
+			meta.status = scanStateSuccess
+			meta.endedCycle = intDataUpdateTracker.current()
 		}
+		meta, _ = o.updateMetacacheListing(meta, rpc)
 		metaMu.Unlock()
+
+		closeChannels()
+		if err := bw.Close(); err != nil {
+			metaMu.Lock()
+			meta.error = err.Error()
+			meta.status = scanStateError
+			meta, err = o.updateMetacacheListing(meta, rpc)
+			metaMu.Unlock()
+		}
 	}()

 	return filteredResults()
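updateMetacacheListing collapses three copies of the same routing decision: transient listings stay local, a nil peer client means the bucket's cache is owned locally, and only otherwise does the update go over the wire. The dispatch shape, with function values standing in for the three destinations:

```go
package main

import "fmt"

// updater abstracts the three destinations a cache update can go to.
type updater func(entry string) (string, error)

// route condenses updateMetacacheListing: transient caches are always local,
// otherwise a nil RPC client means we own the bucket locally, and only then
// does the update travel to a peer.
func route(transient bool, rpc, local, trans updater) updater {
	if transient {
		return trans
	}
	if rpc == nil {
		return local
	}
	return rpc
}

func main() {
	local := func(e string) (string, error) { return "local:" + e, nil }
	trans := func(e string) (string, error) { return "transient:" + e, nil }
	got, _ := route(false, nil, local, trans)("cache-1")
	fmt.Println(got) // local:cache-1
}
```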
diff --git a/cmd/metacache.go b/cmd/metacache.go
index 2516cb6b8..204e13e0b 100644
--- a/cmd/metacache.go
+++ b/cmd/metacache.go
@@ -78,8 +78,8 @@ func (m *metacache) worthKeeping(currentCycle uint64) bool {
 		// Cycle is too old to be valuable.
 		return false
 	case cache.status == scanStateError || cache.status == scanStateNone:
-		// Remove failed listings
-		return false
+		// Remove failed listings after 10 minutes.
+		return time.Since(cache.lastUpdate) < 10*time.Minute
 	}
 	return true
 }
@@ -91,7 +91,9 @@ func (m *metacache) canBeReplacedBy(other *metacache) bool {
 	if other.started.Before(m.started) || m.id == other.id {
 		return false
 	}
-
+	if other.status == scanStateNone || other.status == scanStateError {
+		return false
+	}
 	// Keep it around a bit longer.
 	if time.Since(m.lastHandout) < time.Hour {
 		return false
diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go
index c45ed0be8..b73373ebd 100644
--- a/cmd/test-utils_test.go
+++ b/cmd/test-utils_test.go
@@ -1879,6 +1879,9 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

+	if localMetacacheMgr != nil {
+		localMetacacheMgr.deleteAll()
+	}
 	defer setObjectLayer(newObjectLayerFn())

 	objLayer, fsDir, err := prepareFS()
@@ -1900,6 +1903,11 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) {
 	// Executing the object layer tests for single node setup.
 	objTest(objLayer, FSTestStr, t)

+	if localMetacacheMgr != nil {
+		localMetacacheMgr.deleteAll()
+	}
+	defer setObjectLayer(newObjectLayerFn())
+
 	newAllSubsystems()
 	objLayer, fsDirs, err := prepareErasureSets32(ctx)
 	if err != nil {
@@ -1914,6 +1922,10 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) {
 	defer removeRoots(append(fsDirs, fsDir))
 	// Executing the object layer tests for Erasure.
 	objTest(objLayer, ErasureTestStr, t)
+
+	if localMetacacheMgr != nil {
+		localMetacacheMgr.deleteAll()
+	}
 }

 // ExecObjectLayerTestWithDirs - executes object layer tests.
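worthKeeping now retains failed or empty listings for ten minutes instead of deleting them immediately, and canBeReplacedBy refuses to replace a cache with one that is empty or failed, so clients stop invalidating and recreating listings in a loop. The new retention rule in isolation:

```go
package main

import (
	"fmt"
	"time"
)

// keepFailed mirrors the new rule in worthKeeping: a failed or empty listing
// survives for ten minutes after its last update, then becomes garbage.
func keepFailed(lastUpdate time.Time) bool {
	return time.Since(lastUpdate) < 10*time.Minute
}

func main() {
	fmt.Println(keepFailed(time.Now().Add(-time.Minute))) // true: keep it a while
	fmt.Println(keepFailed(time.Now().Add(-time.Hour)))   // false: drop it
}
```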