use list cache for Walk() with webUI and quota (#10814)

bring list cache optimizations for web UI
object listing, also FIFO quota enforcement
through list cache as well.
master
Harshavardhana 4 years ago committed by GitHub
parent 68de5a6f6a
commit ad382799b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 168
      cmd/erasure-server-sets.go
  2. 13
      cmd/metacache-server-sets.go

@ -658,99 +658,6 @@ func (z *erasureServerSets) ListObjectsV2(ctx context.Context, bucket, prefix, c
return listObjectsV2Info, err return listObjectsV2Info, err
} }
// Calculate least entry across zones and across multiple FileInfo
// channels, returns the least common entry and the total number of times
// we found this entry. Additionally also returns a boolean
// to indicate if the caller needs to call this function
// again to list the next entry. It is callers responsibility
// if the caller wishes to list N entries to call lexicallySortedEntry
// N times until this boolean is 'false'.
func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, int, int, bool) {
for i, entryChs := range zoneEntryChs {
for j := range entryChs {
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
}
}
var isTruncated = false
for _, entriesValid := range zoneEntriesValid {
for _, valid := range entriesValid {
if !valid {
continue
}
isTruncated = true
break
}
if isTruncated {
break
}
}
var lentry FileInfo
var found bool
var zoneIndex = -1
// TODO: following loop can be merged with above
// loop, explore this possibility.
for i, entriesValid := range zoneEntriesValid {
for j, valid := range entriesValid {
if !valid {
continue
}
if !found {
lentry = zoneEntries[i][j]
found = true
zoneIndex = i
continue
}
str1 := zoneEntries[i][j].Name
str2 := lentry.Name
if HasSuffix(str1, globalDirSuffix) {
str1 = strings.TrimSuffix(str1, globalDirSuffix) + slashSeparator
}
if HasSuffix(str2, globalDirSuffix) {
str2 = strings.TrimSuffix(str2, globalDirSuffix) + slashSeparator
}
if str1 < str2 {
lentry = zoneEntries[i][j]
zoneIndex = i
}
}
}
// We haven't been able to find any least entry,
// this would mean that we don't have valid entry.
if !found {
return lentry, 0, zoneIndex, isTruncated
}
lexicallySortedEntryCount := 0
for i, entriesValid := range zoneEntriesValid {
for j, valid := range entriesValid {
if !valid {
continue
}
// Entries are duplicated across disks,
// we should simply skip such entries.
if lentry.Name == zoneEntries[i][j].Name && lentry.ModTime.Equal(zoneEntries[i][j].ModTime) {
lexicallySortedEntryCount++
continue
}
// Push all entries which are lexically higher
// and will be returned later in Pop()
zoneEntryChs[i][j].Push(zoneEntries[i][j])
}
}
if HasSuffix(lentry.Name, globalDirSuffix) {
lentry.Name = strings.TrimSuffix(lentry.Name, globalDirSuffix) + slashSeparator
}
return lentry, lexicallySortedEntryCount, zoneIndex, isTruncated
}
// Calculate least entry across serverSets and across multiple FileInfoVersions // Calculate least entry across serverSets and across multiple FileInfoVersions
// channels, returns the least common entry and the total number of times // channels, returns the least common entry and the total number of times
// we found this entry. Additionally also returns a boolean // we found this entry. Additionally also returns a boolean
@ -854,6 +761,7 @@ func (z *erasureServerSets) ListObjectVersions(ctx context.Context, bucket, pref
Limit: maxKeys, Limit: maxKeys,
Marker: marker, Marker: marker,
InclDeleted: true, InclDeleted: true,
AskDisks: globalAPIConfig.getListQuorum(),
}) })
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return loi, err return loi, err
@ -881,6 +789,7 @@ func (z *erasureServerSets) ListObjects(ctx context.Context, bucket, prefix, mar
Limit: maxKeys, Limit: maxKeys,
Marker: marker, Marker: marker,
InclDeleted: false, InclDeleted: false,
AskDisks: globalAPIConfig.getListQuorum(),
}) })
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
@ -1330,76 +1239,51 @@ func (z *erasureServerSets) Walk(ctx context.Context, bucket, prefix string, res
return err return err
} }
serverSetsListTolerancePerSet := make([]int, 0, len(z.serverSets))
for _, zone := range z.serverSets {
quorum := globalAPIConfig.getListQuorum()
switch quorum {
case -1:
serverSetsListTolerancePerSet = append(serverSetsListTolerancePerSet, zone.setDriveCount/2)
default:
serverSetsListTolerancePerSet = append(serverSetsListTolerancePerSet, quorum)
}
}
if opts.WalkVersions { if opts.WalkVersions {
var serverSetsEntryChs [][]FileInfoVersionsCh
for _, zone := range z.serverSets {
serverSetsEntryChs = append(serverSetsEntryChs, zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, ctx.Done()))
}
var serverSetsEntriesInfos [][]FileInfoVersions
var serverSetsEntriesValid [][]bool
for _, entryChs := range serverSetsEntryChs {
serverSetsEntriesInfos = append(serverSetsEntriesInfos, make([]FileInfoVersions, len(entryChs)))
serverSetsEntriesValid = append(serverSetsEntriesValid, make([]bool, len(entryChs)))
}
go func() { go func() {
defer close(results) defer close(results)
var marker, versionIDMarker string
for { for {
entry, quorumCount, zoneIdx, ok := lexicallySortedEntryZoneVersions(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid) loi, err := z.ListObjectVersions(ctx, bucket, prefix, marker, versionIDMarker, "", 1000)
if !ok { if err != nil {
// We have reached EOF across all entryChs, break the loop. break
return
} }
if quorumCount >= serverSetsListTolerancePerSet[zoneIdx] { for _, obj := range loi.Objects {
for _, version := range entry.Versions { results <- obj
results <- version.ToObjectInfo(bucket, version.Name)
}
} }
}
}()
return nil if !loi.IsTruncated {
break
} }
serverSetsEntryChs := make([][]FileInfoCh, 0, len(z.serverSets)) marker = loi.NextMarker
for _, zone := range z.serverSets { versionIDMarker = loi.NextVersionIDMarker
serverSetsEntryChs = append(serverSetsEntryChs, zone.startMergeWalks(ctx, bucket, prefix, "", true, ctx.Done()))
} }
}()
serverSetsEntriesInfos := make([][]FileInfo, 0, len(serverSetsEntryChs)) return nil
serverSetsEntriesValid := make([][]bool, 0, len(serverSetsEntryChs))
for _, entryChs := range serverSetsEntryChs {
serverSetsEntriesInfos = append(serverSetsEntriesInfos, make([]FileInfo, len(entryChs)))
serverSetsEntriesValid = append(serverSetsEntriesValid, make([]bool, len(entryChs)))
} }
go func() { go func() {
defer close(results) defer close(results)
var marker string
for { for {
entry, quorumCount, zoneIdx, ok := lexicallySortedEntryZone(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid) loi, err := z.ListObjects(ctx, bucket, prefix, marker, "", 1000)
if !ok { if err != nil {
// We have reached EOF across all entryChs, break the loop. break
return
} }
if quorumCount >= serverSetsListTolerancePerSet[zoneIdx] { for _, obj := range loi.Objects {
results <- entry.ToObjectInfo(bucket, entry.Name) results <- obj
} }
if !loi.IsTruncated {
break
}
marker = loi.NextMarker
} }
}() }()

@ -99,7 +99,8 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (en
if err != nil { if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Context is canceled, return at once. // Context is canceled, return at once.
return entries, err // request canceled, no entries to return
return entries, io.EOF
} }
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
cache = localMetacacheMgr.getTransient().findCache(o) cache = localMetacacheMgr.getTransient().findCache(o)
@ -109,17 +110,14 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (en
} }
} }
if cache.fileNotFound { if cache.fileNotFound {
return entries, errFileNotFound // No cache found, no entries found.
return entries, io.EOF
} }
// Only create if we created a new. // Only create if we created a new.
o.Create = o.ID == cache.id o.Create = o.ID == cache.id
o.ID = cache.id o.ID = cache.id
} }
if o.AskDisks == 0 {
o.AskDisks = globalAPIConfig.getListQuorum()
}
var mu sync.Mutex var mu sync.Mutex
var wg sync.WaitGroup var wg sync.WaitGroup
var errs []error var errs []error
@ -175,7 +173,8 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (en
cache.fileNotFound = true cache.fileNotFound = true
_, err := o.updateMetacacheListing(cache, globalNotificationSys.restClientFromHash(o.Bucket)) _, err := o.updateMetacacheListing(cache, globalNotificationSys.restClientFromHash(o.Bucket))
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
return entries, errFileNotFound // cache returned not found, entries truncated.
return entries, io.EOF
} }
for _, err := range errs { for _, err := range errs {

Loading…
Cancel
Save