From ad382799b14617f3509143e5e5d57d5b7d1b2aab Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Tue, 3 Nov 2020 08:53:48 -0800
Subject: [PATCH] use list cache for Walk() with webUI and quota (#10814)

bring list cache optimizations for web UI object listing,
also FIFO quota enforcement through list cache as well.
---
 cmd/erasure-server-sets.go   | 170 ++++++-----------------------------
 cmd/metacache-server-sets.go |  13 ++-
 2 files changed, 33 insertions(+), 150 deletions(-)

diff --git a/cmd/erasure-server-sets.go b/cmd/erasure-server-sets.go
index ff5f10dd8..ff86fc3a4 100644
--- a/cmd/erasure-server-sets.go
+++ b/cmd/erasure-server-sets.go
@@ -658,99 +658,6 @@ func (z *erasureServerSets) ListObjectsV2(ctx context.Context, bucket, prefix, c
 	return listObjectsV2Info, err
 }
 
-// Calculate least entry across zones and across multiple FileInfo
-// channels, returns the least common entry and the total number of times
-// we found this entry. Additionally also returns a boolean
-// to indicate if the caller needs to call this function
-// again to list the next entry. It is callers responsibility
-// if the caller wishes to list N entries to call lexicallySortedEntry
-// N times until this boolean is 'false'.
-func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, int, int, bool) {
-	for i, entryChs := range zoneEntryChs {
-		for j := range entryChs {
-			zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
-		}
-	}
-
-	var isTruncated = false
-	for _, entriesValid := range zoneEntriesValid {
-		for _, valid := range entriesValid {
-			if !valid {
-				continue
-			}
-			isTruncated = true
-			break
-		}
-		if isTruncated {
-			break
-		}
-	}
-
-	var lentry FileInfo
-	var found bool
-	var zoneIndex = -1
-	// TODO: following loop can be merged with above
-	// loop, explore this possibility.
-	for i, entriesValid := range zoneEntriesValid {
-		for j, valid := range entriesValid {
-			if !valid {
-				continue
-			}
-			if !found {
-				lentry = zoneEntries[i][j]
-				found = true
-				zoneIndex = i
-				continue
-			}
-			str1 := zoneEntries[i][j].Name
-			str2 := lentry.Name
-			if HasSuffix(str1, globalDirSuffix) {
-				str1 = strings.TrimSuffix(str1, globalDirSuffix) + slashSeparator
-			}
-			if HasSuffix(str2, globalDirSuffix) {
-				str2 = strings.TrimSuffix(str2, globalDirSuffix) + slashSeparator
-			}
-
-			if str1 < str2 {
-				lentry = zoneEntries[i][j]
-				zoneIndex = i
-			}
-		}
-	}
-
-	// We haven't been able to find any least entry,
-	// this would mean that we don't have valid entry.
-	if !found {
-		return lentry, 0, zoneIndex, isTruncated
-	}
-
-	lexicallySortedEntryCount := 0
-	for i, entriesValid := range zoneEntriesValid {
-		for j, valid := range entriesValid {
-			if !valid {
-				continue
-			}
-
-			// Entries are duplicated across disks,
-			// we should simply skip such entries.
-			if lentry.Name == zoneEntries[i][j].Name && lentry.ModTime.Equal(zoneEntries[i][j].ModTime) {
-				lexicallySortedEntryCount++
-				continue
-			}
-
-			// Push all entries which are lexically higher
-			// and will be returned later in Pop()
-			zoneEntryChs[i][j].Push(zoneEntries[i][j])
-		}
-	}
-
-	if HasSuffix(lentry.Name, globalDirSuffix) {
-		lentry.Name = strings.TrimSuffix(lentry.Name, globalDirSuffix) + slashSeparator
-	}
-
-	return lentry, lexicallySortedEntryCount, zoneIndex, isTruncated
-}
-
 // Calculate least entry across serverSets and across multiple FileInfoVersions
 // channels, returns the least common entry and the total number of times
 // we found this entry. Additionally also returns a boolean
@@ -854,6 +761,7 @@ func (z *erasureServerSets) ListObjectVersions(ctx context.Context, bucket, pref
 		Limit:       maxKeys,
 		Marker:      marker,
 		InclDeleted: true,
+		AskDisks:    globalAPIConfig.getListQuorum(),
 	})
 	if err != nil && err != io.EOF {
 		return loi, err
@@ -881,6 +789,7 @@ func (z *erasureServerSets) ListObjects(ctx context.Context, bucket, prefix, mar
 		Limit:       maxKeys,
 		Marker:      marker,
 		InclDeleted: false,
+		AskDisks:    globalAPIConfig.getListQuorum(),
 	})
 	if err != nil && err != io.EOF {
 		logger.LogIf(ctx, err)
@@ -1330,76 +1239,51 @@ func (z *erasureServerSets) Walk(ctx context.Context, bucket, prefix string, res
 		return err
 	}
 
-	serverSetsListTolerancePerSet := make([]int, 0, len(z.serverSets))
-	for _, zone := range z.serverSets {
-		quorum := globalAPIConfig.getListQuorum()
-		switch quorum {
-		case -1:
-			serverSetsListTolerancePerSet = append(serverSetsListTolerancePerSet, zone.setDriveCount/2)
-		default:
-			serverSetsListTolerancePerSet = append(serverSetsListTolerancePerSet, quorum)
-		}
-	}
-
 	if opts.WalkVersions {
-		var serverSetsEntryChs [][]FileInfoVersionsCh
-		for _, zone := range z.serverSets {
-			serverSetsEntryChs = append(serverSetsEntryChs, zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, ctx.Done()))
-		}
-
-		var serverSetsEntriesInfos [][]FileInfoVersions
-		var serverSetsEntriesValid [][]bool
-		for _, entryChs := range serverSetsEntryChs {
-			serverSetsEntriesInfos = append(serverSetsEntriesInfos, make([]FileInfoVersions, len(entryChs)))
-			serverSetsEntriesValid = append(serverSetsEntriesValid, make([]bool, len(entryChs)))
-		}
-
 		go func() {
 			defer close(results)
 
+			var marker, versionIDMarker string
 			for {
-				entry, quorumCount, zoneIdx, ok := lexicallySortedEntryZoneVersions(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
-				if !ok {
-					// We have reached EOF across all entryChs, break the loop.
-					return
+				loi, err := z.ListObjectVersions(ctx, bucket, prefix, marker, versionIDMarker, "", 1000)
+				if err != nil {
+					break
 				}
 
-				if quorumCount >= serverSetsListTolerancePerSet[zoneIdx] {
-					for _, version := range entry.Versions {
-						results <- version.ToObjectInfo(bucket, version.Name)
-					}
+				for _, obj := range loi.Objects {
+					results <- obj
 				}
+
+				if !loi.IsTruncated {
+					break
+				}
+
+				marker = loi.NextMarker
+				versionIDMarker = loi.NextVersionIDMarker
 			}
 		}()
-
 		return nil
 	}
 
-	serverSetsEntryChs := make([][]FileInfoCh, 0, len(z.serverSets))
-	for _, zone := range z.serverSets {
-		serverSetsEntryChs = append(serverSetsEntryChs, zone.startMergeWalks(ctx, bucket, prefix, "", true, ctx.Done()))
-	}
-
-	serverSetsEntriesInfos := make([][]FileInfo, 0, len(serverSetsEntryChs))
-	serverSetsEntriesValid := make([][]bool, 0, len(serverSetsEntryChs))
-	for _, entryChs := range serverSetsEntryChs {
-		serverSetsEntriesInfos = append(serverSetsEntriesInfos, make([]FileInfo, len(entryChs)))
-		serverSetsEntriesValid = append(serverSetsEntriesValid, make([]bool, len(entryChs)))
-	}
-
 	go func() {
 		defer close(results)
 
+		var marker string
 		for {
-			entry, quorumCount, zoneIdx, ok := lexicallySortedEntryZone(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
-			if !ok {
-				// We have reached EOF across all entryChs, break the loop.
-				return
+			loi, err := z.ListObjects(ctx, bucket, prefix, marker, "", 1000)
+			if err != nil {
+				break
			}
 
-			if quorumCount >= serverSetsListTolerancePerSet[zoneIdx] {
-				results <- entry.ToObjectInfo(bucket, entry.Name)
+			for _, obj := range loi.Objects {
+				results <- obj
 			}
+
+			if !loi.IsTruncated {
+				break
+			}
+
+			marker = loi.NextMarker
 		}
 	}()
 
diff --git a/cmd/metacache-server-sets.go b/cmd/metacache-server-sets.go
index 574da7346..194040e8d 100644
--- a/cmd/metacache-server-sets.go
+++ b/cmd/metacache-server-sets.go
@@ -99,7 +99,8 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (en
 		if err != nil {
 			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
 				// Context is canceled, return at once.
-				return entries, err
+				// request canceled, no entries to return
+				return entries, io.EOF
 			}
 			logger.LogIf(ctx, err)
 			cache = localMetacacheMgr.getTransient().findCache(o)
@@ -109,17 +110,14 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (en
 			}
 		}
 		if cache.fileNotFound {
-			return entries, errFileNotFound
+			// No cache found, no entries found.
+			return entries, io.EOF
 		}
 		// Only create if we created a new.
 		o.Create = o.ID == cache.id
 		o.ID = cache.id
 	}
 
-	if o.AskDisks == 0 {
-		o.AskDisks = globalAPIConfig.getListQuorum()
-	}
-
 	var mu sync.Mutex
 	var wg sync.WaitGroup
 	var errs []error
@@ -175,7 +173,8 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (en
 		cache.fileNotFound = true
 		_, err := o.updateMetacacheListing(cache, globalNotificationSys.restClientFromHash(o.Bucket))
 		logger.LogIf(ctx, err)
-		return entries, errFileNotFound
+		// cache returned not found, entries truncated.
+		return entries, io.EOF
 	}
 
 	for _, err := range errs {
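
Not part of the patch, added here only for review context: a minimal, self-contained Go sketch of the marker-based pagination loop that the rewritten Walk() runs on top of ListObjects (and, for versioned walks, ListObjectVersions), and therefore on top of the list cache. The walkObjects helper, the listFunc type, and the reduced ObjectInfo/ListObjectsInfo structs below are hypothetical stand-ins, not MinIO's actual types.

```go
package main

import (
	"context"
	"fmt"
)

// Reduced, illustrative stand-ins for the result types used in the patch.
type ObjectInfo struct{ Name string }

type ListObjectsInfo struct {
	Objects     []ObjectInfo
	IsTruncated bool
	NextMarker  string
}

// listFunc stands in for a ListObjects-style call.
type listFunc func(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error)

// walkObjects pages through list 1000 keys at a time and streams every
// object into results, mirroring the marker/IsTruncated loop the rewritten
// Walk() uses: advance the marker after each page, stop when the listing is
// no longer truncated or an error occurs.
func walkObjects(ctx context.Context, list listFunc, bucket, prefix string, results chan<- ObjectInfo) {
	defer close(results)

	var marker string
	for {
		loi, err := list(ctx, bucket, prefix, marker, "", 1000)
		if err != nil {
			break // the patch simply stops the walk on error
		}
		for _, obj := range loi.Objects {
			results <- obj
		}
		if !loi.IsTruncated {
			break
		}
		marker = loi.NextMarker
	}
}

func main() {
	// Fake two-page listing to exercise the pagination.
	pages := []ListObjectsInfo{
		{Objects: []ObjectInfo{{"a"}, {"b"}}, IsTruncated: true, NextMarker: "b"},
		{Objects: []ObjectInfo{{"c"}}},
	}
	i := 0
	list := func(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
		p := pages[i]
		i++
		return p, nil
	}

	results := make(chan ObjectInfo)
	go walkObjects(context.Background(), list, "bucket", "", results)
	for obj := range results {
		fmt.Println(obj.Name)
	}
}
```

One trade-off of the approach is visible in the sketch: the walk terminates quietly on the first listing error, so consumers such as the web UI listing and the FIFO quota enforcer see a truncated stream rather than an explicit failure.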
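
Also for context, not part of the patch: listPath now returns io.EOF instead of errFileNotFound (or the raw cancellation error) because the ListObjects/ListObjectVersions call sites shown above already treat io.EOF as a clean end of listing (`if err != nil && err != io.EOF`). A tiny illustrative sketch of that caller-side convention, with hypothetical names:

```go
package main

import (
	"fmt"
	"io"
)

// listOnce is a hypothetical stand-in for one listPath call after the patch:
// "no cache", "file not found", and canceled requests all surface as io.EOF
// alongside whatever entries were gathered.
func listOnce() ([]string, error) {
	return []string{"a", "b"}, io.EOF
}

func main() {
	entries, err := listOnce()
	// Mirrors the call sites in the patch: only a non-io.EOF error is fatal.
	if err != nil && err != io.EOF {
		fmt.Println("listing failed:", err)
		return
	}
	// io.EOF (or nil) just means the listing ended; the entries are usable.
	fmt.Println("entries:", entries)
}
```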