|
|
|
@ -22,6 +22,7 @@ import ( |
|
|
|
|
"errors" |
|
|
|
|
"fmt" |
|
|
|
|
"io" |
|
|
|
|
"path" |
|
|
|
|
"runtime/debug" |
|
|
|
|
"strings" |
|
|
|
|
"sync" |
|
|
|
@ -29,6 +30,7 @@ import ( |
|
|
|
|
|
|
|
|
|
"github.com/klauspost/compress/s2" |
|
|
|
|
"github.com/minio/minio/cmd/logger" |
|
|
|
|
"github.com/minio/minio/pkg/console" |
|
|
|
|
"github.com/minio/minio/pkg/hash" |
|
|
|
|
"github.com/tinylib/msgp/msgp" |
|
|
|
|
) |
|
|
|
@ -43,6 +45,8 @@ type bucketMetacache struct { |
|
|
|
|
|
|
|
|
|
// caches indexed by id.
|
|
|
|
|
caches map[string]metacache |
|
|
|
|
// cache ids indexed by root paths
|
|
|
|
|
cachesRoot map[string][]string `msg:"-"` |
|
|
|
|
|
|
|
|
|
// Internal state
|
|
|
|
|
mu sync.RWMutex `msg:"-"` |
|
|
|
@ -65,6 +69,7 @@ func newBucketMetacache(bucket string, cleanup bool) *bucketMetacache { |
|
|
|
|
return &bucketMetacache{ |
|
|
|
|
bucket: bucket, |
|
|
|
|
caches: make(map[string]metacache, 10), |
|
|
|
|
cachesRoot: make(map[string][]string, 10), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -128,6 +133,11 @@ func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache, |
|
|
|
|
logger.Info("loadBucketMetaCache: loaded cache name mismatch, want %s, got %s. Discarding.", bucket, meta.bucket) |
|
|
|
|
return newBucketMetacache(bucket, true), nil |
|
|
|
|
} |
|
|
|
|
meta.cachesRoot = make(map[string][]string, len(meta.caches)/10) |
|
|
|
|
// Index roots
|
|
|
|
|
for id, cache := range meta.caches { |
|
|
|
|
meta.cachesRoot[cache.root] = append(meta.cachesRoot[cache.root], id) |
|
|
|
|
} |
|
|
|
|
return &meta, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -188,10 +198,8 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache { |
|
|
|
|
return metacache{} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
debugPrint := func(msg string, data ...interface{}) {} |
|
|
|
|
if false { |
|
|
|
|
debugPrint = logger.Info |
|
|
|
|
} |
|
|
|
|
extend := globalAPIConfig.getExtendListLife() |
|
|
|
|
const debugPrint = false |
|
|
|
|
|
|
|
|
|
// Grab a write lock, since we create one if we cannot find one.
|
|
|
|
|
if o.Create { |
|
|
|
@ -204,51 +212,104 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache { |
|
|
|
|
|
|
|
|
|
// Check if exists already.
|
|
|
|
|
if c, ok := b.caches[o.ID]; ok { |
|
|
|
|
debugPrint("returning existing %v", o.ID) |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("returning existing %v", o.ID) |
|
|
|
|
} |
|
|
|
|
return c |
|
|
|
|
} |
|
|
|
|
// No need to do expensive checks on transients.
|
|
|
|
|
if b.transient { |
|
|
|
|
if !o.Create { |
|
|
|
|
return metacache{ |
|
|
|
|
id: o.ID, |
|
|
|
|
bucket: o.Bucket, |
|
|
|
|
status: scanStateNone, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Create new
|
|
|
|
|
best := o.newMetacache() |
|
|
|
|
b.caches[o.ID] = best |
|
|
|
|
b.updated = true |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("returning new cache %s, bucket: %v", best.id, best.bucket) |
|
|
|
|
} |
|
|
|
|
return best |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Potentially interesting caches.
|
|
|
|
|
// Will only add root if request is for root.
|
|
|
|
|
var interesting []string |
|
|
|
|
rootSplit := strings.Split(o.BaseDir, slashSeparator) |
|
|
|
|
for i := range rootSplit { |
|
|
|
|
want := path.Join(rootSplit[:i+1]...) |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("base: %s, want: %s", o.BaseDir, want) |
|
|
|
|
} |
|
|
|
|
interesting = append(interesting, b.cachesRoot[want]...) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var best metacache |
|
|
|
|
extend := globalAPIConfig.getExtendListLife() |
|
|
|
|
for _, cached := range b.caches { |
|
|
|
|
// Never return transient caches if there is no id.
|
|
|
|
|
if b.transient { |
|
|
|
|
break |
|
|
|
|
for _, id := range interesting { |
|
|
|
|
cached, ok := b.caches[id] |
|
|
|
|
if !ok { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Never return transient caches if there is no id.
|
|
|
|
|
if cached.status == scanStateError || cached.status == scanStateNone || cached.dataVersion != metacacheStreamVersion { |
|
|
|
|
debugPrint("cache %s state or stream version mismatch", cached.id) |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("cache %s state or stream version mismatch", cached.id) |
|
|
|
|
} |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if cached.startedCycle < o.OldestCycle { |
|
|
|
|
debugPrint("cache %s cycle too old", cached.id) |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("cache %s cycle too old", cached.id) |
|
|
|
|
} |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// If the existing listing wasn't recursive root must match.
|
|
|
|
|
if !cached.recursive && o.BaseDir != cached.root { |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("cache %s non rec prefix mismatch, cached:%v, want:%v", cached.id, cached.root, o.BaseDir) |
|
|
|
|
} |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Root of what we are looking for must at least have
|
|
|
|
|
if !strings.HasPrefix(o.BaseDir, cached.root) { |
|
|
|
|
debugPrint("cache %s prefix mismatch, cached:%v, want:%v", cached.id, cached.root, o.BaseDir) |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("cache %s prefix mismatch, cached:%v, want:%v", cached.id, cached.root, o.BaseDir) |
|
|
|
|
} |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if cached.filter != "" && strings.HasPrefix(cached.filter, o.FilterPrefix) { |
|
|
|
|
debugPrint("cache %s cannot be used because of filter %s", cached.id, cached.filter) |
|
|
|
|
continue |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("cache %s cannot be used because of filter %s", cached.id, cached.filter) |
|
|
|
|
} |
|
|
|
|
// If the existing listing wasn't recursive root must match.
|
|
|
|
|
if !cached.recursive && o.BaseDir != cached.root { |
|
|
|
|
debugPrint("cache %s non rec prefix mismatch, cached:%v, want:%v", cached.id, cached.root, o.BaseDir) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if o.Recursive && !cached.recursive { |
|
|
|
|
debugPrint("cache %s not recursive", cached.id) |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("cache %s not recursive", cached.id) |
|
|
|
|
} |
|
|
|
|
// If this is recursive the cached listing must be as well.
|
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if o.Separator != slashSeparator && !cached.recursive { |
|
|
|
|
debugPrint("cache %s not slashsep and not recursive", cached.id) |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("cache %s not slashsep and not recursive", cached.id) |
|
|
|
|
} |
|
|
|
|
// Non slash separator requires recursive.
|
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if !cached.finished() && time.Since(cached.lastUpdate) > metacacheMaxRunningAge { |
|
|
|
|
debugPrint("cache %s not running, time: %v", cached.id, time.Since(cached.lastUpdate)) |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("cache %s not running, time: %v", cached.id, time.Since(cached.lastUpdate)) |
|
|
|
|
} |
|
|
|
|
// Abandoned
|
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
@ -256,17 +317,23 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache { |
|
|
|
|
if cached.finished() && cached.endedCycle <= o.OldestCycle { |
|
|
|
|
if extend <= 0 { |
|
|
|
|
// If scan has ended the oldest requested must be less.
|
|
|
|
|
debugPrint("cache %s ended and cycle (%v) <= oldest allowed (%v)", cached.id, cached.endedCycle, o.OldestCycle) |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("cache %s ended and cycle (%v) <= oldest allowed (%v)", cached.id, cached.endedCycle, o.OldestCycle) |
|
|
|
|
} |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if time.Since(cached.lastUpdate) > metacacheMaxRunningAge+extend { |
|
|
|
|
// Cache ended within bloom cycle, but we can extend the life.
|
|
|
|
|
debugPrint("cache %s ended (%v) and beyond extended life (%v)", cached.id, cached.lastUpdate, extend+metacacheMaxRunningAge) |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("cache %s ended (%v) and beyond extended life (%v)", cached.id, cached.lastUpdate, extend+metacacheMaxRunningAge) |
|
|
|
|
} |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if cached.started.Before(best.started) { |
|
|
|
|
debugPrint("cache %s disregarded - we have a better", cached.id) |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("cache %s disregarded - we have a better", cached.id) |
|
|
|
|
} |
|
|
|
|
// If we already have a newer, keep that.
|
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
@ -278,7 +345,9 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache { |
|
|
|
|
b.caches[best.id] = best |
|
|
|
|
b.updated = true |
|
|
|
|
} |
|
|
|
|
debugPrint("returning cached %s, status: %v, ended: %v", best.id, best.status, best.ended) |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("returning cached %s, status: %v, ended: %v", best.id, best.status, best.ended) |
|
|
|
|
} |
|
|
|
|
return best |
|
|
|
|
} |
|
|
|
|
if !o.Create { |
|
|
|
@ -292,8 +361,11 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache { |
|
|
|
|
// Create new and add.
|
|
|
|
|
best = o.newMetacache() |
|
|
|
|
b.caches[o.ID] = best |
|
|
|
|
b.cachesRoot[best.root] = append(b.cachesRoot[best.root], best.id) |
|
|
|
|
b.updated = true |
|
|
|
|
debugPrint("returning new cache %s, bucket: %v", best.id, best.bucket) |
|
|
|
|
if debugPrint { |
|
|
|
|
console.Info("returning new cache %s, bucket: %v", best.id, best.bucket) |
|
|
|
|
} |
|
|
|
|
return best |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -311,7 +383,7 @@ func (b *bucketMetacache) cleanup() { |
|
|
|
|
b.mu.RLock() |
|
|
|
|
for id, cache := range b.caches { |
|
|
|
|
if b.transient && time.Since(cache.lastUpdate) > 15*time.Minute && time.Since(cache.lastHandout) > 15*time.Minute { |
|
|
|
|
// Keep transient caches only for 1 hour.
|
|
|
|
|
// Keep transient caches only for 15 minutes.
|
|
|
|
|
remove[id] = struct{}{} |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
@ -341,7 +413,11 @@ func (b *bucketMetacache) cleanup() { |
|
|
|
|
if _, ok := remove[id]; ok { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
for _, cache2 := range b.caches { |
|
|
|
|
for id2, cache2 := range b.caches { |
|
|
|
|
if _, ok := remove[id2]; ok { |
|
|
|
|
// Don't check against one we are already removing
|
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if cache.canBeReplacedBy(&cache2) { |
|
|
|
|
debugPrint("cache %s can be replaced by %s", id, cache2.id) |
|
|
|
|
remove[id] = struct{}{} |
|
|
|
@ -421,6 +497,7 @@ func (b *bucketMetacache) deleteAll() { |
|
|
|
|
// Delete all.
|
|
|
|
|
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator)) |
|
|
|
|
b.caches = make(map[string]metacache, 10) |
|
|
|
|
b.cachesRoot = make(map[string][]string, 10) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -442,6 +519,15 @@ func (b *bucketMetacache) deleteCache(id string) { |
|
|
|
|
b.mu.Lock() |
|
|
|
|
c, ok := b.caches[id] |
|
|
|
|
if ok { |
|
|
|
|
// Delete from root map.
|
|
|
|
|
list := b.cachesRoot[c.root] |
|
|
|
|
for i, lid := range list { |
|
|
|
|
if id == lid { |
|
|
|
|
list = append(list[:i], list[i+1:]...) |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
b.cachesRoot[c.root] = list |
|
|
|
|
delete(b.caches, id) |
|
|
|
|
b.updated = true |
|
|
|
|
} |
|
|
|
|