cache: Only start at high watermark (#10403)

Currently, cache purges are triggered as soon as the low watermark is exceeded.
To reduce IO this should only be done when reaching the high watermark.
This simplifies checks and reduces all calls for a GC to go through
`dcache.diskSpaceAvailable(size)`. While a comment claims that 
`dcache.triggerGC <- struct{}{}` was non-blocking I don't see how 
that was possible. Instead, we add a 1 size to the queue channel 
and use channel  semantics to avoid blocking when a GC has 
already been requested.

`bytesToClear` now takes the high watermark into account to it will 
not request any bytes to be cleared until that is reached.
master
Klaus Post 4 years ago committed by GitHub
parent d08b4b147d
commit 650dccfa9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 54
      cmd/disk-cache-backend.go
  2. 12
      cmd/disk-cache-utils.go
  3. 20
      cmd/disk-cache-utils_test.go
  4. 17
      cmd/disk-cache.go
  5. 4
      docs/disk-caching/README.md

@ -157,7 +157,7 @@ func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCa
} }
cache := diskCache{ cache := diskCache{
dir: dir, dir: dir,
triggerGC: make(chan struct{}), triggerGC: make(chan struct{}, 1),
stats: CacheDiskStats{Dir: dir}, stats: CacheDiskStats{Dir: dir},
quotaPct: quotaPct, quotaPct: quotaPct,
after: config.After, after: config.After,
@ -174,7 +174,7 @@ func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCa
nsMutex: newNSLock(false), nsMutex: newNSLock(false),
} }
go cache.purgeWait(ctx) go cache.purgeWait(ctx)
cache.diskUsageHigh() // update if cache usage is already high. cache.diskSpaceAvailable(0) // update if cache usage is already high.
cache.NewNSLockFn = func(ctx context.Context, cachePath string) RWLocker { cache.NewNSLockFn = func(ctx context.Context, cachePath string) RWLocker {
return cache.nsMutex.NewNSLock(ctx, nil, cachePath, "") return cache.nsMutex.NewNSLock(ctx, nil, cachePath, "")
} }
@ -203,9 +203,9 @@ func (c *diskCache) diskUsageLow() bool {
return low return low
} }
// Returns if the disk usage reaches high water mark w.r.t the configured cache quota. // Returns if the disk usage reaches or exceeds configured cache quota when size is added.
// gc starts if high water mark reached. // If current usage without size exceeds high watermark a GC is automatically queued.
func (c *diskCache) diskUsageHigh() bool { func (c *diskCache) diskSpaceAvailable(size int64) bool {
gcTriggerPct := c.quotaPct * c.highWatermark / 100 gcTriggerPct := c.quotaPct * c.highWatermark / 100
di, err := disk.GetInfo(c.dir) di, err := disk.GetInfo(c.dir)
if err != nil { if err != nil {
@ -214,27 +214,30 @@ func (c *diskCache) diskUsageHigh() bool {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
return false return false
} }
usedPercent := (di.Total - di.Free) * 100 / di.Total if di.Total == 0 {
high := int(usedPercent) >= gcTriggerPct logger.Info("diskCache: Received 0 total disk size")
atomic.StoreUint64(&c.stats.UsagePercent, usedPercent) return false
if high { }
usedPercent := float64(di.Total-di.Free) * 100 / float64(di.Total)
if usedPercent >= float64(gcTriggerPct) {
atomic.StoreInt32(&c.stats.UsageState, 1) atomic.StoreInt32(&c.stats.UsageState, 1)
c.queueGC()
} }
return high atomic.StoreUint64(&c.stats.UsagePercent, uint64(usedPercent))
// Recalculate percentage with provided size added.
usedPercent = float64(di.Total-di.Free+uint64(size)) * 100 / float64(di.Total)
return usedPercent < float64(c.quotaPct)
} }
// Returns if size space can be allocated without exceeding // queueGC will queue a GC.
// max disk usable for caching // Calling this function is always non-blocking.
func (c *diskCache) diskAvailable(size int64) bool { func (c *diskCache) queueGC() {
di, err := disk.GetInfo(c.dir) select {
if err != nil { case c.triggerGC <- struct{}{}:
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir) default:
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogIf(ctx, err)
return false
} }
usedPercent := (di.Total - (di.Free - uint64(size))) * 100 / di.Total
return int(usedPercent) < c.quotaPct
} }
// toClear returns how many bytes should be cleared to reach the low watermark quota. // toClear returns how many bytes should be cleared to reach the low watermark quota.
@ -247,7 +250,7 @@ func (c *diskCache) toClear() uint64 {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
return 0 return 0
} }
return bytesToClear(int64(di.Total), int64(di.Free), uint64(c.quotaPct), uint64(c.lowWatermark)) return bytesToClear(int64(di.Total), int64(di.Free), uint64(c.quotaPct), uint64(c.lowWatermark), uint64(c.highWatermark))
} }
var ( var (
@ -658,8 +661,7 @@ func newCacheEncryptMetadata(bucket, object string, metadata map[string]string)
// Caches the object to disk // Caches the object to disk
func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly bool) error { func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly bool) error {
if c.diskUsageHigh() { if !c.diskSpaceAvailable(size) {
c.triggerGC <- struct{}{}
io.Copy(ioutil.Discard, data) io.Copy(ioutil.Discard, data)
return errDiskFull return errDiskFull
} }
@ -688,7 +690,7 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
if rs != nil { if rs != nil {
return c.putRange(ctx, bucket, object, data, size, rs, opts) return c.putRange(ctx, bucket, object, data, size, rs, opts)
} }
if !c.diskAvailable(size) { if !c.diskSpaceAvailable(size) {
return errDiskFull return errDiskFull
} }
if err := os.MkdirAll(cachePath, 0777); err != nil { if err := os.MkdirAll(cachePath, 0777); err != nil {
@ -730,7 +732,7 @@ func (c *diskCache) putRange(ctx context.Context, bucket, object string, data io
if err != nil { if err != nil {
return err return err
} }
if !c.diskAvailable(rlen) { if !c.diskSpaceAvailable(rlen) {
return errDiskFull return errDiskFull
} }
cachePath := getCacheSHADir(c.dir, bucket, object) cachePath := getCacheSHADir(c.dir, bucket, object)

@ -489,9 +489,15 @@ func (f *fileScorer) queueString() string {
// bytesToClear() returns the number of bytes to clear to reach low watermark // bytesToClear() returns the number of bytes to clear to reach low watermark
// w.r.t quota given disk total and free space, quota in % allocated to cache // w.r.t quota given disk total and free space, quota in % allocated to cache
// and low watermark % w.r.t allowed quota. // and low watermark % w.r.t allowed quota.
func bytesToClear(total, free int64, quotaPct, lowWatermark uint64) uint64 { // If the high watermark hasn't been reached 0 will be returned.
used := (total - free) func bytesToClear(total, free int64, quotaPct, lowWatermark, highWatermark uint64) uint64 {
used := total - free
quotaAllowed := total * (int64)(quotaPct) / 100 quotaAllowed := total * (int64)(quotaPct) / 100
lowWMUsage := (total * (int64)(lowWatermark*quotaPct) / (100 * 100)) highWMUsage := total * (int64)(highWatermark*quotaPct) / (100 * 100)
if used < highWMUsage {
return 0
}
// Return bytes needed to reach low watermark.
lowWMUsage := total * (int64)(lowWatermark*quotaPct) / (100 * 100)
return (uint64)(math.Min(float64(quotaAllowed), math.Max(0.0, float64(used-lowWMUsage)))) return (uint64)(math.Min(float64(quotaAllowed), math.Max(0.0, float64(used-lowWMUsage))))
} }

@ -153,18 +153,22 @@ func TestBytesToClear(t *testing.T) {
free int64 free int64
quotaPct uint64 quotaPct uint64
watermarkLow uint64 watermarkLow uint64
watermarkHigh uint64
expected uint64 expected uint64
}{ }{
{1000, 800, 40, 90, 0}, {total: 1000, free: 800, quotaPct: 40, watermarkLow: 90, watermarkHigh: 90, expected: 0},
{1000, 200, 40, 90, 400}, {total: 1000, free: 200, quotaPct: 40, watermarkLow: 90, watermarkHigh: 90, expected: 400},
{1000, 400, 40, 90, 240}, {total: 1000, free: 400, quotaPct: 40, watermarkLow: 90, watermarkHigh: 90, expected: 240},
{1000, 600, 40, 90, 40}, {total: 1000, free: 600, quotaPct: 40, watermarkLow: 90, watermarkHigh: 90, expected: 40},
{1000, 600, 40, 70, 120}, {total: 1000, free: 600, quotaPct: 40, watermarkLow: 70, watermarkHigh: 70, expected: 120},
{1000, 1000, 90, 70, 0}, {total: 1000, free: 1000, quotaPct: 90, watermarkLow: 70, watermarkHigh: 70, expected: 0},
{1000, 0, 90, 70, 370},
// High not yet reached..
{total: 1000, free: 250, quotaPct: 100, watermarkLow: 50, watermarkHigh: 90, expected: 0},
{total: 1000, free: 250, quotaPct: 100, watermarkLow: 50, watermarkHigh: 90, expected: 0},
} }
for i, tc := range testCases { for i, tc := range testCases {
toClear := bytesToClear(tc.total, tc.free, tc.quotaPct, tc.watermarkLow) toClear := bytesToClear(tc.total, tc.free, tc.quotaPct, tc.watermarkLow, tc.watermarkHigh)
if tc.expected != toClear { if tc.expected != toClear {
t.Errorf("test %d expected %v, got %v", i, tc.expected, toClear) t.Errorf("test %d expected %v, got %v", i, tc.expected, toClear)
} }

@ -284,12 +284,6 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
// Reaching here implies cache miss // Reaching here implies cache miss
c.cacheStats.incMiss() c.cacheStats.incMiss()
// Since we got here, we are serving the request from backend,
// and also adding the object to the cache.
if dcache.diskUsageHigh() {
dcache.triggerGC <- struct{}{} // this is non-blocking
}
bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts) bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
if bkErr != nil { if bkErr != nil {
@ -306,7 +300,9 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
if cacheErr == nil { if cacheErr == nil {
bkReader.ObjInfo.CacheLookupStatus = CacheHit bkReader.ObjInfo.CacheLookupStatus = CacheHit
} }
if !dcache.diskAvailable(objInfo.Size) {
// Check if we can add it without exceeding total cache size.
if !dcache.diskSpaceAvailable(objInfo.Size) {
return bkReader, bkErr return bkReader, bkErr
} }
@ -612,9 +608,10 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
} }
// fetch from backend if there is no space on cache drive // fetch from backend if there is no space on cache drive
if !dcache.diskAvailable(size) { if !dcache.diskSpaceAvailable(size) {
return putObjectFn(ctx, bucket, object, r, opts) return putObjectFn(ctx, bucket, object, r, opts)
} }
if opts.ServerSideEncryption != nil { if opts.ServerSideEncryption != nil {
dcache.Delete(ctx, bucket, object) dcache.Delete(ctx, bucket, object)
return putObjectFn(ctx, bucket, object, r, opts) return putObjectFn(ctx, bucket, object, r, opts)
@ -721,7 +718,9 @@ func (c *cacheObjects) gc(ctx context.Context) {
} }
for _, dcache := range c.cache { for _, dcache := range c.cache {
if dcache != nil { if dcache != nil {
dcache.triggerGC <- struct{}{} // Check if there is disk.
// Will queue a GC scan if at high watermark.
dcache.diskSpaceAvailable(0)
} }
} }
} }

@ -29,6 +29,10 @@ export MINIO_CACHE_WATERMARK_HIGH=90
minio gateway s3 minio gateway s3
``` ```
The `CACHE_WATERMARK` numbers are percentages of `CACHE_QUOTA`.
In the example above this means that `MINIO_CACHE_WATERMARK_LOW` is effectively `0.8 * 0.7 * 100 = 56%` and the `MINIO_CACHE_WATERMARK_LOW` is effectively `0.8 * 0.9 * 100 = 72%` of total disk space.
### 3. Test your setup ### 3. Test your setup
To test this setup, access the MinIO gateway via browser or [`mc`](https://docs.min.io/docs/minio-client-quickstart-guide). You’ll see the uploaded files are accessible from all the MinIO endpoints. To test this setup, access the MinIO gateway via browser or [`mc`](https://docs.min.io/docs/minio-client-quickstart-guide). You’ll see the uploaded files are accessible from all the MinIO endpoints.

Loading…
Cancel
Save