Enforce quota checks after crawl (#10036)

Enforce bucket quotas when crawling has finished. 
This ensures that we will not do quota enforcement on old data.

Additionally, delete less if we are closer to quota than we thought.
Branch: master
Klaus Post committed 4 years ago (via GitHub)
Parent: 14ff7f5fcf
Commit: 00d3cc4b69
Files changed:
  1. cmd/bucket-quota.go (185 changed lines)
  2. cmd/data-usage-cache.go (15 changed lines)
  3. cmd/disk-cache-utils.go (9 changed lines)
  4. cmd/erasure-zones.go (12 changed lines)
  5. cmd/fs-v1.go (1 changed line)
  6. cmd/server-main.go (1 changed line)
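
As a rough, self-contained sketch of the sizing logic the commit description refers to (plain Go, not MinIO code; planFIFOFree and its parameters are illustrative names only): the crawl result sets the initial deletion target, and if the walk over the bucket sees fewer bytes than the crawl reported, the target shrinks before anything is deleted.

package main

import "fmt"

// planFIFOFree is an illustrative helper (not MinIO code): given the bucket quota,
// the bucket size reported by the last crawl, and the bytes actually seen while
// walking the bucket, it returns how many bytes FIFO enforcement should free now.
func planFIFOFree(quota, crawledSize, seenBytes uint64) uint64 {
	if quota == 0 || crawledSize <= quota {
		return 0 // within quota according to the crawl, nothing to do
	}
	toFree := crawledSize - quota // target based on the (possibly stale) crawl
	if seenBytes <= quota {
		return 0 // the walk saw less data than the quota, already fine
	}
	if toFreeNow := seenBytes - quota; toFreeNow < toFree {
		return toFreeNow // less over quota than we thought: delete less
	}
	return toFree // more over than expected: leave the surplus for the next run
}

func main() {
	const gib = uint64(1 << 30)
	// Crawl reported 120 GiB, quota is 100 GiB, but the walk only saw 110 GiB.
	fmt.Println(planFIFOFree(100*gib, 120*gib, 110*gib) / gib) // prints 10
}

The same comparison shows up in enforceFIFOQuotaBucket below through scorer.seenBytes and adjustSaveBytes.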

cmd/bucket-quota.go:

@@ -22,9 +22,7 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/minio/minio/cmd/config"
 	"github.com/minio/minio/cmd/logger"
-	"github.com/minio/minio/pkg/env"
 	"github.com/minio/minio/pkg/event"
 	"github.com/minio/minio/pkg/madmin"
 )
@@ -120,149 +118,112 @@ func enforceBucketQuota(ctx context.Context, bucket string, size int64) error {
 	return globalBucketQuotaSys.check(ctx, bucket, size)
 }
 
-const (
-	bgQuotaInterval = 1 * time.Hour
-)
-
-// initQuotaEnforcement starts the routine that deletes objects in bucket
-// that exceeds the FIFO quota
-func initQuotaEnforcement(ctx context.Context, objAPI ObjectLayer) {
-	if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn {
-		go startBucketQuotaEnforcement(ctx, objAPI)
-	}
-}
-
-func startBucketQuotaEnforcement(ctx context.Context, objAPI ObjectLayer) {
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-time.NewTimer(bgQuotaInterval).C:
-			enforceFIFOQuota(ctx, objAPI)
-		}
-	}
-}
-
-// enforceFIFOQuota deletes objects in FIFO order until sufficient objects
-// have been deleted so as to bring bucket usage within quota
-func enforceFIFOQuota(ctx context.Context, objectAPI ObjectLayer) {
-	// Turn off quota enforcement if data usage info is unavailable.
-	if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff {
-		return
-	}
-
-	buckets, err := objectAPI.ListBuckets(ctx)
-	if err != nil {
-		logger.LogIf(ctx, err)
-		return
-	}
-
-	dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
-	if err != nil {
-		logger.LogIf(ctx, err)
-		return
-	}
-
-	for _, binfo := range buckets {
-		bucket := binfo.Name
-
-		bui, ok := dataUsageInfo.BucketsUsage[bucket]
-		if !ok {
-			// bucket doesn't exist anymore, or we
-			// do not have any information to proceed.
-			continue
-		}
-
-		// Check if the current bucket has quota restrictions, if not skip it
-		cfg, err := globalBucketQuotaSys.Get(bucket)
-		if err != nil {
-			continue
-		}
-
-		if cfg.Type != madmin.FIFOQuota {
-			continue
-		}
-
-		var toFree uint64
-		if bui.Size > cfg.Quota && cfg.Quota > 0 {
-			toFree = bui.Size - cfg.Quota
-		}
-
-		if toFree == 0 {
-			continue
-		}
-
-		// Allocate new results channel to receive ObjectInfo.
-		objInfoCh := make(chan ObjectInfo)
-
-		versioned := globalBucketVersioningSys.Enabled(bucket)
-
-		// Walk through all objects
-		if err := objectAPI.Walk(ctx, bucket, "", objInfoCh, ObjectOptions{WalkVersions: versioned}); err != nil {
-			logger.LogIf(ctx, err)
-			continue
-		}
-
-		// reuse the fileScorer used by disk cache to score entries by
-		// ModTime to find the oldest objects in bucket to delete. In
-		// the context of bucket quota enforcement - number of hits are
-		// irrelevant.
-		scorer, err := newFileScorer(toFree, time.Now().Unix(), 1)
-		if err != nil {
-			logger.LogIf(ctx, err)
-			continue
-		}
-
-		rcfg, _ := globalBucketObjectLockSys.Get(bucket)
-		for obj := range objInfoCh {
-			if obj.DeleteMarker {
-				// Delete markers are automatically added for FIFO purge.
-				scorer.addFileWithObjInfo(obj, 1)
-				continue
-			}
-			// skip objects currently under retention
-			if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) {
-				continue
-			}
-			scorer.addFileWithObjInfo(obj, 1)
-		}
-
-		var objects []ObjectToDelete
-		numKeys := len(scorer.fileObjInfos())
-		for i, obj := range scorer.fileObjInfos() {
-			objects = append(objects, ObjectToDelete{
-				ObjectName: obj.Name,
-				VersionID:  obj.VersionID,
-			})
-			if len(objects) < maxDeleteList && (i < numKeys-1) {
-				// skip deletion until maxDeleteList or end of slice
-				continue
-			}
-
-			if len(objects) == 0 {
-				break
-			}
-
-			// Deletes a list of objects.
-			_, deleteErrs := objectAPI.DeleteObjects(ctx, bucket, objects, ObjectOptions{
-				Versioned: versioned,
-			})
-			for i := range deleteErrs {
-				if deleteErrs[i] != nil {
-					logger.LogIf(ctx, deleteErrs[i])
-					continue
-				}
-
-				// Notify object deleted event.
-				sendEvent(eventArgs{
-					EventName:  event.ObjectRemovedDelete,
-					BucketName: bucket,
-					Object:     obj,
-					Host:       "Internal: [FIFO-QUOTA-EXPIRY]",
-				})
-			}
-			objects = nil
-		}
-	}
-}
+// enforceFIFOQuota deletes objects in FIFO order until sufficient objects
+// have been deleted so as to bring bucket usage within quota.
+func enforceFIFOQuotaBucket(ctx context.Context, objectAPI ObjectLayer, bucket string, bui BucketUsageInfo) {
+	// Check if the current bucket has quota restrictions, if not skip it
+	cfg, err := globalBucketQuotaSys.Get(bucket)
+	if err != nil {
+		return
+	}
+
+	if cfg.Type != madmin.FIFOQuota {
+		return
+	}
+
+	var toFree uint64
+	if bui.Size > cfg.Quota && cfg.Quota > 0 {
+		toFree = bui.Size - cfg.Quota
+	}
+
+	if toFree <= 0 {
+		return
+	}
+
+	// Allocate new results channel to receive ObjectInfo.
+	objInfoCh := make(chan ObjectInfo)
+
+	versioned := globalBucketVersioningSys.Enabled(bucket)
+
+	// Walk through all objects
+	if err := objectAPI.Walk(ctx, bucket, "", objInfoCh, ObjectOptions{WalkVersions: versioned}); err != nil {
+		logger.LogIf(ctx, err)
+		return
+	}
+
+	// reuse the fileScorer used by disk cache to score entries by
+	// ModTime to find the oldest objects in bucket to delete. In
+	// the context of bucket quota enforcement - number of hits are
+	// irrelevant.
+	scorer, err := newFileScorer(toFree, time.Now().Unix(), 1)
+	if err != nil {
+		logger.LogIf(ctx, err)
+		return
+	}
+
+	rcfg, _ := globalBucketObjectLockSys.Get(bucket)
+	for obj := range objInfoCh {
+		if obj.DeleteMarker {
+			// Delete markers are automatically added for FIFO purge.
+			scorer.addFileWithObjInfo(obj, 1)
+			continue
+		}
+		// skip objects currently under retention
+		if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) {
+			continue
+		}
+		scorer.addFileWithObjInfo(obj, 1)
+	}
+
+	// If we saw less than quota we are good.
+	if scorer.seenBytes <= cfg.Quota {
+		return
+	}
+	// Calculate how much we want to delete now.
+	toFreeNow := scorer.seenBytes - cfg.Quota
+	// We were less over quota than we thought. Adjust so we delete less.
+	// If we are more over, leave it for the next run to pick up.
+	if toFreeNow < toFree {
+		if !scorer.adjustSaveBytes(int64(toFreeNow) - int64(toFree)) {
+			// We got below or at quota.
+			return
+		}
+	}
+
+	var objects []ObjectToDelete
+	numKeys := len(scorer.fileObjInfos())
+	for i, obj := range scorer.fileObjInfos() {
+		objects = append(objects, ObjectToDelete{
+			ObjectName: obj.Name,
+			VersionID:  obj.VersionID,
+		})
+		if len(objects) < maxDeleteList && (i < numKeys-1) {
+			// skip deletion until maxDeleteList or end of slice
+			continue
+		}
+
+		if len(objects) == 0 {
+			break
+		}
+
+		// Deletes a list of objects.
+		_, deleteErrs := objectAPI.DeleteObjects(ctx, bucket, objects, ObjectOptions{
+			Versioned: versioned,
+		})
+		for i := range deleteErrs {
+			if deleteErrs[i] != nil {
+				logger.LogIf(ctx, deleteErrs[i])
+				continue
+			}
+
+			// Notify object deleted event.
+			sendEvent(eventArgs{
+				EventName:  event.ObjectRemovedDelete,
+				BucketName: bucket,
+				Object:     obj,
+				Host:       "Internal: [FIFO-QUOTA-EXPIRY]",
+			})
+		}
+		objects = nil
+	}
+}
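
The deletion loop above flushes objects in batches rather than issuing one call per object. A minimal sketch of that batching pattern, assuming a generic bulk-delete function (maxBatch and deleteBatch are stand-ins, not MinIO identifiers):

package main

import "fmt"

const maxBatch = 3 // stand-in for MinIO's maxDeleteList constant

// deleteBatch is a placeholder for a bulk-delete call such as DeleteObjects.
func deleteBatch(batch []string) {
	fmt.Println("deleting:", batch)
}

func main() {
	objects := []string{"a", "b", "c", "d", "e"} // oldest first (FIFO order)

	var batch []string
	for i, name := range objects {
		batch = append(batch, name)
		// Keep accumulating until the batch is full or this is the last object.
		if len(batch) < maxBatch && i < len(objects)-1 {
			continue
		}
		deleteBatch(batch) // prints [a b c], then [d e]
		batch = nil
	}
}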

cmd/data-usage-cache.go:

@@ -335,6 +335,21 @@ func (d *dataUsageCache) bucketsUsageInfo(buckets []BucketInfo) map[string]BucketUsageInfo {
 	return dst
 }
 
+// bucketUsageInfo returns the buckets usage info.
+// If not found all values returned are zero values.
+func (d *dataUsageCache) bucketUsageInfo(bucket string) BucketUsageInfo {
+	e := d.find(bucket)
+	if e == nil {
+		return BucketUsageInfo{}
+	}
+	flat := d.flatten(*e)
+	return BucketUsageInfo{
+		Size:                 uint64(flat.Size),
+		ObjectsCount:         uint64(flat.Objects),
+		ObjectSizesHistogram: flat.ObjSizes.toMap(),
+	}
+}
+
 // sizeRecursive returns the path as a flattened entry.
 func (d *dataUsageCache) sizeRecursive(path string) *dataUsageEntry {
 	root := d.find(path)
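
The new bucketUsageInfo helper collapses the hierarchical cache entry for a bucket into a single summary before it is handed to quota enforcement. A minimal sketch of that flattening idea, using a made-up usageNode type instead of MinIO's dataUsageEntry:

package main

import "fmt"

// usageNode is a toy stand-in for a data-usage cache entry: its own totals plus children.
type usageNode struct {
	size     uint64
	objects  uint64
	children []usageNode
}

// flatten sums a node and everything below it into one pair of totals,
// mirroring what bucketUsageInfo does before building a BucketUsageInfo.
func flatten(n usageNode) (size, objects uint64) {
	size, objects = n.size, n.objects
	for _, c := range n.children {
		cs, co := flatten(c)
		size += cs
		objects += co
	}
	return size, objects
}

func main() {
	bucket := usageNode{
		size: 100, objects: 2,
		children: []usageNode{
			{size: 40, objects: 1},
			{size: 60, objects: 3, children: []usageNode{{size: 10, objects: 1}}},
		},
	}
	s, o := flatten(bucket)
	fmt.Println(s, o) // prints: 210 7
}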

cmd/disk-cache-utils.go:

@@ -295,6 +295,7 @@ type fileScorer struct {
 	// The list is kept sorted according to score, highest at top, lowest at bottom.
 	queue       list.List
 	queuedBytes uint64
+	seenBytes   uint64
 }
 
 type queuedFile struct {
@@ -337,6 +338,7 @@ func (f *fileScorer) addFileWithObjInfo(objInfo ObjectInfo, hits int) {
 		versionID: objInfo.VersionID,
 		size:      uint64(objInfo.Size),
 	}
+	f.seenBytes += uint64(objInfo.Size)
 
 	var score float64
 	if objInfo.ModTime.IsZero() {
@@ -369,9 +371,14 @@ func (f *fileScorer) addFileWithObjInfo(objInfo ObjectInfo, hits int) {
 
 // adjustSaveBytes allows to adjust the number of bytes to save.
 // This can be used to adjust the count on the fly.
-// Returns true if there still is a need to delete files (saveBytes >0),
+// Returns true if there still is a need to delete files (n+saveBytes >0),
 // false if no more bytes needs to be saved.
 func (f *fileScorer) adjustSaveBytes(n int64) bool {
+	if int64(f.saveBytes)+n <= 0 {
+		f.saveBytes = 0
+		f.trimQueue()
+		return false
+	}
 	if n < 0 {
 		f.saveBytes -= ^uint64(n - 1)
 	} else {
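
The existing line `f.saveBytes -= ^uint64(n - 1)` subtracts the magnitude of a negative n from an unsigned counter: in two's complement, ^x == -x-1, so ^(n-1) == -n. A tiny standalone check of that identity (plain Go, independent of the scorer):

package main

import "fmt"

func main() {
	var saveBytes uint64 = 100
	var n int64 = -30 // caller wants to shrink the target by 30 bytes

	// For negative n, ^uint64(n-1) == uint64(-n): the bitwise complement of n-1
	// is exactly the positive magnitude of n, so this subtracts 30 without a signed cast.
	saveBytes -= ^uint64(n - 1)

	fmt.Println(^uint64(n-1), saveBytes) // prints: 30 70
}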

cmd/erasure-zones.go:

@@ -337,13 +337,15 @@ func (z *erasureZones) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter
 	updateTicker := time.NewTicker(30 * time.Second)
 	defer updateTicker.Stop()
 	var lastUpdate time.Time
+
+	// We need to merge since we will get the same buckets from each zone.
+	// Therefore to get the exact bucket sizes we must merge before we can convert.
+	allMerged := dataUsageCache{Info: dataUsageCacheInfo{Name: dataUsageRoot}}
+
 	update := func() {
 		mu.Lock()
 		defer mu.Unlock()
 
-		// We need to merge since we will get the same buckets from each zone.
-		// Therefore to get the exact bucket sizes we must merge before we can convert.
-		allMerged := dataUsageCache{Info: dataUsageCacheInfo{Name: dataUsageRoot}}
 		for _, info := range results {
 			if info.Info.LastUpdate.IsZero() {
 				// Not filled yet.
@@ -362,6 +364,10 @@ func (z *erasureZones) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter
 			return
 		case v := <-updateCloser:
 			update()
+			// Enforce quotas when all is done.
+			for _, b := range allBuckets {
+				enforceFIFOQuotaBucket(ctx, z, b.Name, allMerged.bucketUsageInfo(b.Name))
+			}
 			close(v)
 			return
 		case <-updateTicker.C:
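
Every zone reports the same set of buckets, so per-zone crawl results must be merged before a bucket's size can meaningfully be compared against its quota; that is why allMerged is built first and enforceFIFOQuotaBucket only runs after the final update. A small sketch of the idea, with plain maps standing in for dataUsageCache (all names here are illustrative):

package main

import "fmt"

// mergeZoneUsage adds up per-bucket sizes reported independently by each zone,
// mirroring why allMerged is assembled before quota enforcement is invoked.
func mergeZoneUsage(zones []map[string]uint64) map[string]uint64 {
	merged := make(map[string]uint64)
	for _, zone := range zones {
		for bucket, size := range zone {
			merged[bucket] += size
		}
	}
	return merged
}

func main() {
	zones := []map[string]uint64{
		{"photos": 70, "logs": 10}, // zone 1
		{"photos": 50, "logs": 30}, // zone 2
	}
	merged := mergeZoneUsage(zones)
	fmt.Println(merged["photos"], merged["logs"]) // prints: 120 40

	// Only the merged size should be compared against the bucket quota;
	// either zone's number alone would understate real usage.
}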

cmd/fs-v1.go:

@@ -301,6 +301,7 @@ func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter,
 		logger.LogIf(ctx, totalCache.save(ctx, fs, dataUsageCacheName))
 		cloned := totalCache.clone()
 		updates <- cloned.dui(dataUsageRoot, buckets)
+		enforceFIFOQuotaBucket(ctx, fs, b.Name, cloned.bucketUsageInfo(b.Name))
 	}
 
 	return nil

cmd/server-main.go:

@@ -360,7 +360,6 @@ func startBackgroundOps(ctx context.Context, objAPI ObjectLayer) {
 	}
 
 	initDataCrawler(ctx, objAPI)
-	initQuotaEnforcement(ctx, objAPI)
 }
 
 // serverMain handler called for 'minio server' command.
