diff --git a/cmd/config/cache/config.go b/cmd/config/cache/config.go
index 8d78145be..adaffbef6 100644
--- a/cmd/config/cache/config.go
+++ b/cmd/config/cache/config.go
@@ -34,6 +34,7 @@ type Config struct {
 	MaxUse  int      `json:"maxuse"`
 	Quota   int      `json:"quota"`
 	Exclude []string `json:"exclude"`
+	After   int      `json:"after"`
 }
 
 // UnmarshalJSON - implements JSON unmarshal interface for unmarshalling
@@ -60,7 +61,9 @@ func (cfg *Config) UnmarshalJSON(data []byte) (err error) {
 	if _cfg.Quota < 0 {
 		return errors.New("config quota value should not be null or negative")
 	}
-
+	if _cfg.After < 0 {
+		return errors.New("cache after value should not be less than 0")
+	}
 	return nil
 }
diff --git a/cmd/config/cache/help.go b/cmd/config/cache/help.go
index a3bfe36ec..cb2ba9436 100644
--- a/cmd/config/cache/help.go
+++ b/cmd/config/cache/help.go
@@ -50,5 +50,11 @@ var (
 			Optional:    true,
 			Type:        "sentence",
 		},
+		config.HelpKV{
+			Key:         After,
+			Description: `minimum accesses before caching an object`,
+			Optional:    true,
+			Type:        "number",
+		},
 	}
 )
diff --git a/cmd/config/cache/lookup.go b/cmd/config/cache/lookup.go
index 41e678d30..1a187c2e9 100644
--- a/cmd/config/cache/lookup.go
+++ b/cmd/config/cache/lookup.go
@@ -31,16 +31,20 @@ const (
 	Expiry  = "expiry"
 	MaxUse  = "maxuse"
 	Quota   = "quota"
+	After   = "after"
+
+	EnvCacheDrives  = "MINIO_CACHE_DRIVES"
+	EnvCacheExclude = "MINIO_CACHE_EXCLUDE"
+	EnvCacheExpiry  = "MINIO_CACHE_EXPIRY"
+	EnvCacheMaxUse  = "MINIO_CACHE_MAXUSE"
+	EnvCacheQuota   = "MINIO_CACHE_QUOTA"
+	EnvCacheAfter   = "MINIO_CACHE_AFTER"
 
-	EnvCacheDrives              = "MINIO_CACHE_DRIVES"
-	EnvCacheExclude             = "MINIO_CACHE_EXCLUDE"
-	EnvCacheExpiry              = "MINIO_CACHE_EXPIRY"
-	EnvCacheMaxUse              = "MINIO_CACHE_MAXUSE"
-	EnvCacheQuota               = "MINIO_CACHE_QUOTA"
 	EnvCacheEncryptionMasterKey = "MINIO_CACHE_ENCRYPTION_MASTER_KEY"
 
 	DefaultExpiry = "90"
 	DefaultQuota  = "80"
+	DefaultAfter  = "0"
 )
 
 // DefaultKVS - default KV settings for caching.
@@ -62,6 +66,10 @@ var (
 			Key:   Quota,
 			Value: DefaultQuota,
 		},
+		config.KV{
+			Key:   After,
+			Value: DefaultAfter,
+		},
 	}
 )
 
@@ -134,5 +142,17 @@ func LookupConfig(kvs config.KVS) (Config, error) {
 		cfg.MaxUse = cfg.Quota
 	}
 
+	if afterStr := env.Get(EnvCacheAfter, kvs.Get(After)); afterStr != "" {
+		cfg.After, err = strconv.Atoi(afterStr)
+		if err != nil {
+			return cfg, config.ErrInvalidCacheAfter(err)
+		}
+		// after should be a valid value >= 0.
+		if cfg.After < 0 {
+			err := errors.New("cache after value cannot be less than 0")
+			return cfg, config.ErrInvalidCacheAfter(err)
+		}
+	}
+
 	return cfg, nil
 }
diff --git a/cmd/config/errors.go b/cmd/config/errors.go
index 12d6af0bf..dd2f01304 100644
--- a/cmd/config/errors.go
+++ b/cmd/config/errors.go
@@ -66,6 +66,12 @@ var (
 		"MINIO_CACHE_QUOTA: Valid cache quota value must be between 0-100",
 	)
 
+	ErrInvalidCacheAfter = newErrFn(
+		"Invalid cache after value",
+		"Please check the passed value",
+		"MINIO_CACHE_AFTER: Valid cache after value must be 0 or greater",
+	)
+
 	ErrInvalidCacheEncryptionKey = newErrFn(
 		"Invalid cache encryption master key value",
 		"Please check the passed value",
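A minimal, self-contained sketch of the lookup behaviour the new setting introduces — reading `MINIO_CACHE_AFTER` with a fallback default and rejecting negative values, as `LookupConfig` above does. The function name `lookupAfter` is illustrative only, not part of the patch:

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strconv"
)

// lookupAfter mirrors the validation in LookupConfig: the env value wins
// over the stored config value, and anything below 0 is rejected.
func lookupAfter(envVal, kvsVal string) (int, error) {
	v := envVal
	if v == "" {
		v = kvsVal
	}
	if v == "" {
		return 0, nil // unset: cache on first access
	}
	after, err := strconv.Atoi(v)
	if err != nil {
		return 0, err
	}
	if after < 0 {
		return 0, errors.New("cache after value cannot be less than 0")
	}
	return after, nil
}

func main() {
	after, err := lookupAfter(os.Getenv("MINIO_CACHE_AFTER"), "0")
	if err != nil {
		fmt.Println("invalid MINIO_CACHE_AFTER:", err)
		return
	}
	fmt.Println("objects cached after", after, "accesses")
}
```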
diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go
index aaa4fa8bc..ff6133234 100644
--- a/cmd/disk-cache-backend.go
+++ b/cmd/disk-cache-backend.go
@@ -27,7 +27,6 @@ import (
 	"log"
 	"net/http"
 	"os"
-	"reflect"
 	"sync"
 	"time"
 
@@ -68,6 +67,8 @@ type cacheMeta struct {
 	Meta map[string]string `json:"meta,omitempty"`
 	// Ranges maps cached range to associated filename.
 	Ranges map[string]string `json:"ranges,omitempty"`
+	// Hits is a counter on the number of times this object has been accessed so far.
+	Hits int `json:"hits,omitempty"`
 }
 
 // RangeInfo has the range, file and range length information for a cached range.
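For orientation, the new `hits` counter rides along in the same per-object `cache.json` that already stores the stat, user metadata, and cached ranges. A toy round-trip with a trimmed-down stand-in struct (the real `cacheMeta` carries more fields):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// meta is a trimmed-down stand-in for cacheMeta, keeping only the fields
// relevant to the hit counter.
type meta struct {
	Version string            `json:"version"`
	Meta    map[string]string `json:"meta,omitempty"`
	Hits    int               `json:"hits,omitempty"`
}

func main() {
	m := meta{Version: "1.0.0", Meta: map[string]string{"etag": "abc"}, Hits: 2}
	b, _ := json.Marshal(m)
	fmt.Println(string(b)) // {"version":"1.0.0","meta":{"etag":"abc"},"hits":2}
}
```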
@@ -133,10 +134,15 @@ type diskCache struct {
 	// purge() listens on this channel to start the cache-purge process
 	purgeChan chan struct{}
 	pool      sync.Pool
+	after     int // minimum accesses before an object is cached.
+	// nsMutex namespace lock
+	nsMutex *nsLockMap
+	// Object functions pointing to the corresponding functions of backend implementation.
+	NewNSLockFn func(ctx context.Context, cachePath string) RWLocker
 }
 
 // Inits the disk cache dir if it is not initialized already.
-func newDiskCache(dir string, expiry int, quotaPct int) (*diskCache, error) {
+func newDiskCache(dir string, expiry int, quotaPct, after int) (*diskCache, error) {
 	if err := os.MkdirAll(dir, 0777); err != nil {
 		return nil, fmt.Errorf("Unable to initialize '%s' dir, %w", dir, err)
 	}
@@ -144,6 +150,7 @@ func newDiskCache(dir string, expiry int, quotaPct, after int) (*diskCache, error) {
 		dir:         dir,
 		expiry:      expiry,
 		quotaPct:    quotaPct,
+		after:       after,
 		purgeChan:   make(chan struct{}),
 		online:      true,
 		onlineMutex: &sync.RWMutex{},
@@ -153,6 +160,10 @@ func newDiskCache(dir string, expiry int, quotaPct, after int) (*diskCache, error) {
 			return &b
 		},
 	},
+		nsMutex: newNSLock(false),
+	}
+	cache.NewNSLockFn = func(ctx context.Context, cachePath string) RWLocker {
+		return cache.nsMutex.NewNSLock(ctx, nil, cachePath, "")
 	}
 	return &cache, nil
 }
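The patch moves locking down from `cacheObjects` into `diskCache`, keyed by the cache path rather than bucket/object. A simplified stand-in for that per-entry locking pattern — one lazily created `sync.RWMutex` per cache path — assuming nothing about MinIO's actual `nsLockMap` beyond its read/write semantics (the real locker also supports timeouts and reference counting):

```go
package main

import "sync"

// pathLocker hands out one RWMutex per cache path, created on first use.
// It illustrates the shape of the locking wired up via NewNSLockFn above.
type pathLocker struct {
	mu    sync.Mutex
	locks map[string]*sync.RWMutex
}

func newPathLocker() *pathLocker {
	return &pathLocker{locks: make(map[string]*sync.RWMutex)}
}

// lockFor returns the RWMutex guarding one cache path, creating it lazily.
func (p *pathLocker) lockFor(cachePath string) *sync.RWMutex {
	p.mu.Lock()
	defer p.mu.Unlock()
	l, ok := p.locks[cachePath]
	if !ok {
		l = &sync.RWMutex{}
		p.locks[cachePath] = l
	}
	return l
}

func main() {
	locker := newPathLocker()
	l := locker.lockFor("/mnt/cache1/abc123")
	l.RLock() // readers (Stat, Get) share the lock
	l.RUnlock()
	l.Lock() // writers (Put, SaveMetadata, delete) take it exclusively
	l.Unlock()
}
```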
@@ -241,7 +252,7 @@ func (c *diskCache) purge() {
 			if obj.Name() == minioMetaBucket {
 				continue
 			}
-			meta, _, err := c.statCachedMeta(pathJoin(c.dir, obj.Name()))
+			meta, _, _, err := c.statCachedMeta(context.Background(), pathJoin(c.dir, obj.Name()))
 			if err != nil {
 				// delete any partially filled cache entry left behind.
 				removeAll(pathJoin(c.dir, obj.Name()))
@@ -270,11 +281,9 @@ func (c *diskCache) purge() {
 				break
 			}
 		}
-		lastRunTime := time.Now()
 		for {
 			<-c.purgeChan
-			timeElapsed := time.Since(lastRunTime)
-			if timeElapsed > time.Hour {
+			if c.diskUsageHigh() {
 				break
 			}
 		}
@@ -296,63 +305,67 @@ func (c *diskCache) IsOnline() bool {
 }
 
 // Stat returns ObjectInfo from disk cache
-func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectInfo, err error) {
+func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectInfo, numHits int, err error) {
+	var partial bool
+	var meta *cacheMeta
+
 	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
-	oi, err = c.statCache(cacheObjPath)
+	// Stat the file to get file size.
+	meta, partial, numHits, err = c.statCachedMeta(ctx, cacheObjPath)
 	if err != nil {
 		return
 	}
+	if partial {
+		return oi, numHits, errFileNotFound
+	}
+	oi = meta.ToObjectInfo("", "")
 	oi.Bucket = bucket
 	oi.Name = object
 
 	if err = decryptCacheObjectETag(&oi); err != nil {
-		return oi, err
+		return
 	}
 	return
 }
 
 // statCachedMeta returns metadata from cache - including ranges cached, partial to indicate
 // if partial object is cached.
-func (c *diskCache) statCachedMeta(cacheObjPath string) (meta *cacheMeta, partial bool, err error) {
-	// Stat the file to get file size.
-	metaPath := pathJoin(cacheObjPath, cacheMetaJSONFile)
-	f, err := os.Open(metaPath)
-	if err != nil {
-		return meta, partial, err
-	}
-	defer f.Close()
-	meta = &cacheMeta{Version: cacheMetaVersion}
-	if err := jsonLoad(f, meta); err != nil {
-		return meta, partial, err
-	}
-	// get metadata of part.1 if full file has been cached.
-	partial = true
-	fi, err := os.Stat(pathJoin(cacheObjPath, cacheDataFile))
-	if err == nil {
-		meta.Stat.ModTime = atime.Get(fi)
-		partial = false
+func (c *diskCache) statCachedMeta(ctx context.Context, cacheObjPath string) (meta *cacheMeta, partial bool, numHits int, err error) {
+
+	cLock := c.NewNSLockFn(ctx, cacheObjPath)
+	if err = cLock.GetRLock(globalObjectTimeout); err != nil {
+		return
 	}
-	return meta, partial, nil
+
+	defer cLock.RUnlock()
+	return c.statCache(ctx, cacheObjPath)
 }
 
 // statRange returns ObjectInfo and RangeInfo from disk cache
-func (c *diskCache) statRange(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (oi ObjectInfo, rngInfo RangeInfo, err error) {
+func (c *diskCache) statRange(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (oi ObjectInfo, rngInfo RangeInfo, numHits int, err error) {
 	// Stat the file to get file size.
 	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
+	var meta *cacheMeta
+	var partial bool
 
-	if rs == nil {
-		oi, err = c.statCache(cacheObjPath)
-		return oi, rngInfo, err
-	}
-	meta, _, err := c.statCachedMeta(cacheObjPath)
+	meta, partial, numHits, err = c.statCachedMeta(ctx, cacheObjPath)
 	if err != nil {
-		return oi, rngInfo, err
+		return
+	}
+
+	oi = meta.ToObjectInfo("", "")
+	oi.Bucket = bucket
+	oi.Name = object
+	if !partial {
+		err = decryptCacheObjectETag(&oi)
+		return
 	}
 
 	actualSize := uint64(meta.Stat.Size)
-	_, length, err := rs.GetOffsetLength(int64(actualSize))
+	var length int64
+	_, length, err = rs.GetOffsetLength(int64(actualSize))
 	if err != nil {
-		return oi, rngInfo, err
+		return
 	}
 
 	actualRngSize := uint64(length)
@@ -363,38 +376,58 @@ func (c *diskCache) statRange(ctx context.Context, bucket, object string, rs *HT
 	rng := rs.String(int64(actualSize))
 	rngFile, ok := meta.Ranges[rng]
 	if !ok {
-		return oi, rngInfo, ObjectNotFound{Bucket: bucket, Object: object}
+		return oi, rngInfo, numHits, ObjectNotFound{Bucket: bucket, Object: object}
 	}
 	rngInfo = RangeInfo{Range: rng, File: rngFile, Size: int64(actualRngSize)}
 
-	oi = meta.ToObjectInfo("", "")
-	oi.Bucket = bucket
-	oi.Name = object
-
-	if err = decryptCacheObjectETag(&oi); err != nil {
-		return oi, rngInfo, err
-	}
+	err = decryptCacheObjectETag(&oi)
 	return
 }
 
 // statCache is a convenience function for purge() to get ObjectInfo for cached object
-func (c *diskCache) statCache(cacheObjPath string) (oi ObjectInfo, e error) {
+func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (meta *cacheMeta, partial bool, numHits int, err error) {
 	// Stat the file to get file size.
-	meta, partial, err := c.statCachedMeta(cacheObjPath)
+	metaPath := pathJoin(cacheObjPath, cacheMetaJSONFile)
+	f, err := os.Open(metaPath)
 	if err != nil {
-		return oi, err
+		return meta, partial, 0, err
 	}
-	if partial {
-		return oi, errFileNotFound
+	defer f.Close()
+	meta = &cacheMeta{Version: cacheMetaVersion}
+	if err := jsonLoad(f, meta); err != nil {
+		return meta, partial, 0, err
+	}
+	// get metadata of part.1 if full file has been cached.
+	partial = true
+	fi, err := os.Stat(pathJoin(cacheObjPath, cacheDataFile))
+	if err == nil {
+		meta.Stat.ModTime = atime.Get(fi)
+		partial = false
 	}
-	return meta.ToObjectInfo("", ""), nil
+	return meta, partial, meta.Hits, nil
 }
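The refactored `statCache` classifies an entry as partial when the metadata JSON exists but the full data file does not. A compact sketch of just that check, assuming `cacheDataFile` is the usual `part.1` name the surrounding code uses for a fully cached object:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// isPartialEntry reports whether a cache directory holds only metadata or
// ranges, i.e. the full object's data file is absent, as statCache checks.
func isPartialEntry(cacheObjPath string) bool {
	_, err := os.Stat(filepath.Join(cacheObjPath, "part.1"))
	return err != nil
}

func main() {
	fmt.Println(isPartialEntry("/mnt/cache1/abc123"))
}
```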
 
 // saves object metadata to disk cache
-func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string) error {
-	fileName := getCacheSHADir(c.dir, bucket, object)
-	metaPath := pathJoin(fileName, cacheMetaJSONFile)
+// incHitsOnly is true if metadata update is incrementing only the hit counter
+func (c *diskCache) SaveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error {
+	cachedPath := getCacheSHADir(c.dir, bucket, object)
+	cLock := c.NewNSLockFn(ctx, cachedPath)
+	if err := cLock.GetLock(globalObjectTimeout); err != nil {
+		return err
+	}
+	defer cLock.Unlock()
+	return c.saveMetadata(ctx, bucket, object, meta, actualSize, rs, rsFileName, incHitsOnly)
+}
+
+// saves object metadata to disk cache
+// incHitsOnly is true if metadata update is incrementing only the hit counter
+func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string, incHitsOnly bool) error {
+	cachedPath := getCacheSHADir(c.dir, bucket, object)
+	metaPath := pathJoin(cachedPath, cacheMetaJSONFile)
+	// Create cache directory if needed
+	if err := os.MkdirAll(cachedPath, 0777); err != nil {
+		return err
+	}
 	f, err := os.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0666)
 	if err != nil {
 		return err
 	}
@@ -405,6 +438,7 @@ func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, met
 	if err := jsonLoad(f, m); err != nil && err != io.EOF {
 		return err
 	}
+	// update cached ranges and the hit counter below
 	if rs != nil {
 		if m.Ranges == nil {
 			m.Ranges = make(map[string]string)
 		}
@@ -413,44 +447,28 @@ func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, met
 	} else {
 		// this is necessary cleanup of range files if entire object is cached.
 		for _, f := range m.Ranges {
-			removeAll(pathJoin(fileName, f))
+			removeAll(pathJoin(cachedPath, f))
 		}
 		m.Ranges = nil
 	}
 	m.Stat.Size = actualSize
 	m.Stat.ModTime = UTCNow()
-	m.Meta = meta
-	m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
-
-	return jsonSave(f, m)
-}
-
-// Backend metadata could have changed through server side copy - reset cache metadata if that is the case
-func (c *diskCache) updateMetadataIfChanged(ctx context.Context, bucket, object string, bkObjectInfo, cacheObjInfo ObjectInfo) error {
-
-	bkMeta := make(map[string]string)
-	cacheMeta := make(map[string]string)
-	for k, v := range bkObjectInfo.UserDefined {
-		if HasPrefix(k, ReservedMetadataPrefix) {
-			// Do not need to send any internal metadata
-			continue
+	if !incHitsOnly {
+		// reset meta
+		m.Meta = meta
+	} else {
+		if m.Meta == nil {
+			m.Meta = make(map[string]string)
 		}
-		bkMeta[http.CanonicalHeaderKey(k)] = v
-	}
-	for k, v := range cacheObjInfo.UserDefined {
-		if HasPrefix(k, ReservedMetadataPrefix) {
-			// Do not need to send any internal metadata
-			continue
+		if etag, ok := meta["etag"]; ok {
+			m.Meta["etag"] = etag
 		}
-		cacheMeta[http.CanonicalHeaderKey(k)] = v
-	}
-	if !reflect.DeepEqual(bkMeta, cacheMeta) ||
-		bkObjectInfo.ETag != cacheObjInfo.ETag ||
-		bkObjectInfo.ContentType != cacheObjInfo.ContentType ||
-		bkObjectInfo.Expires != cacheObjInfo.Expires {
-		return c.saveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size, nil, "")
 	}
-	return nil
+
+	m.Hits++
+
+	m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
+	return jsonSave(f, m)
 }
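In condensed form, the update rule `saveMetadata` now applies: a regular save replaces the stored user metadata outright, while a hits-only save keeps it, back-fills the etag when the caller supplies one, and both paths bump the counter. A sketch under those assumptions, with hypothetical names:

```go
package main

import "fmt"

// mergeMeta captures saveMetadata's rule for the stored metadata and the hit
// counter; illustrative only, not the function from the patch.
func mergeMeta(stored, incoming map[string]string, hits int, incHitsOnly bool) (map[string]string, int) {
	if !incHitsOnly {
		return incoming, hits + 1 // regular save: reset metadata wholesale
	}
	if stored == nil {
		stored = make(map[string]string)
	}
	if etag, ok := incoming["etag"]; ok {
		stored["etag"] = etag // hits-only save: keep meta, refresh the etag
	}
	return stored, hits + 1
}

func main() {
	meta, hits := mergeMeta(map[string]string{"a": "1"}, map[string]string{"etag": "xyz"}, 2, true)
	fmt.Println(meta, hits) // map[a:1 etag:xyz] 3
}
```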
 
 func getCacheSHADir(dir, bucket, object string) string {
@@ -548,7 +566,7 @@ func newCacheEncryptMetadata(bucket, object string, metadata map[string]string)
 }
 
 // Caches the object to disk
-func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions) error {
+func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly bool) error {
 	if c.diskUsageHigh() {
 		select {
 		case c.purgeChan <- struct{}{}:
@@ -556,13 +574,34 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
 		}
 		return errDiskFull
 	}
+	cachePath := getCacheSHADir(c.dir, bucket, object)
+	cLock := c.NewNSLockFn(ctx, cachePath)
+	if err := cLock.GetLock(globalObjectTimeout); err != nil {
+		return err
+	}
+	defer cLock.Unlock()
+
+	meta, _, numHits, err := c.statCache(ctx, cachePath)
+	// Case where object not yet cached
+	if os.IsNotExist(err) && c.after >= 1 {
+		return c.saveMetadata(ctx, bucket, object, opts.UserDefined, size, nil, "", false)
+	}
+	// Case where object already has a cache metadata entry but not yet cached
+	if err == nil && numHits < c.after {
+		cETag := extractETag(meta.Meta)
+		bETag := extractETag(opts.UserDefined)
+		if cETag == bETag {
+			return c.saveMetadata(ctx, bucket, object, opts.UserDefined, size, nil, "", false)
+		}
+		incHitsOnly = true
+	}
+
 	if rs != nil {
 		return c.putRange(ctx, bucket, object, data, size, rs, opts)
 	}
 	if !c.diskAvailable(size) {
 		return errDiskFull
 	}
-	cachePath := getCacheSHADir(c.dir, bucket, object)
 	if err := os.MkdirAll(cachePath, 0777); err != nil {
 		return err
 	}
@@ -572,7 +611,6 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
 	}
 	var reader = data
 	var actualSize = uint64(size)
-	var err error
 	if globalCacheKMS != nil {
 		reader, err = newCacheEncryptReader(data, bucket, object, metadata)
 		if err != nil {
@@ -584,6 +622,7 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
 	if IsErr(err, baseErrs...) {
 		c.setOnline(false)
 	}
+
 	if err != nil {
 		removeAll(cachePath)
 		return err
 	}
@@ -592,7 +631,7 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
 		removeAll(cachePath)
 		return IncompleteBody{}
 	}
-	return c.saveMetadata(ctx, bucket, object, metadata, n, nil, "")
+	return c.saveMetadata(ctx, bucket, object, metadata, n, nil, "", incHitsOnly)
 }
 
 // Caches the range to disk
@@ -638,7 +677,7 @@ func (c *diskCache) putRange(ctx context.Context, bucket, object string, data io
 		removeAll(cachePath)
 		return IncompleteBody{}
 	}
-	return c.saveMetadata(ctx, bucket, object, metadata, int64(objSize), rs, cacheFile)
+	return c.saveMetadata(ctx, bucket, object, metadata, int64(objSize), rs, cacheFile, false)
 }
 
 // checks streaming bitrot checksum of cached object before returning data
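`Put` now implements a small admission policy before writing any data: a first sighting records metadata only, repeat sightings below the threshold keep counting, and only once the stored hit count reaches `after` does the object body get cached. A distilled sketch of that decision, using a hypothetical helper that is not in the patch:

```go
package main

import "fmt"

// admit distills Put's gating: entryExists says whether cache metadata is
// already on disk, numHits is its stored counter, after is the threshold.
func admit(entryExists bool, numHits, after int) string {
	if after < 1 {
		return "cache object now" // threshold disabled: cache on first access
	}
	if !entryExists {
		return "record metadata only" // first access: start counting
	}
	if numHits < after {
		return "increment hit counter" // still below threshold
	}
	return "cache object now"
}

func main() {
	fmt.Println(admit(false, 0, 3)) // record metadata only
	fmt.Println(admit(true, 2, 3))  // increment hit counter
	fmt.Println(admit(true, 3, 3))  // cache object now
}
```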
@@ -738,13 +777,18 @@ func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, of
 }
 
 // Get returns ObjectInfo and reader for object from disk cache
-func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
-	var objInfo ObjectInfo
+func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, numHits int, err error) {
 	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
-	var rngInfo RangeInfo
+	cLock := c.NewNSLockFn(ctx, cacheObjPath)
+	if err := cLock.GetRLock(globalObjectTimeout); err != nil {
+		return nil, numHits, err
+	}
 
-	if objInfo, rngInfo, err = c.statRange(ctx, bucket, object, rs); err != nil {
-		return nil, toObjectErr(err, bucket, object)
+	defer cLock.RUnlock()
+	var objInfo ObjectInfo
+	var rngInfo RangeInfo
+	if objInfo, rngInfo, numHits, err = c.statRange(ctx, bucket, object, rs); err != nil {
+		return nil, numHits, toObjectErr(err, bucket, object)
 	}
 	cacheFile := cacheDataFile
 	objSize := objInfo.Size
@@ -760,12 +804,13 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
 	if HasSuffix(object, SlashSeparator) {
 		// The lock taken above is released when
 		// objReader.Close() is called by the caller.
-		return NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts.CheckCopyPrecondFn, nsUnlocker)
+		gr, gerr := NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts.CheckCopyPrecondFn, nsUnlocker)
+		return gr, numHits, gerr
 	}
 
 	fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts.CheckCopyPrecondFn, nsUnlocker)
 	if nErr != nil {
-		return nil, nErr
+		return nil, numHits, nErr
 	}
 	filePath := pathJoin(cacheObjPath, cacheFile)
 	pr, pw := io.Pipe()
@@ -782,7 +827,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
 
 	gr, gerr := fn(pr, h, opts.CheckCopyPrecondFn, pipeCloser)
 	if gerr != nil {
-		return gr, gerr
+		return gr, numHits, gerr
 	}
 	if globalCacheKMS != nil {
 		// clean up internal SSE cache metadata
@@ -792,15 +837,24 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
 		// overlay Size with actual object size and not the range size
 		gr.ObjInfo.Size = objSize
 	}
-	return gr, nil
+	return gr, numHits, nil
 }
 
 // Deletes the cached object
-func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err error) {
-	cachePath := getCacheSHADir(c.dir, bucket, object)
-	return removeAll(cachePath)
+func (c *diskCache) delete(ctx context.Context, cacheObjPath string) (err error) {
+	cLock := c.NewNSLockFn(ctx, cacheObjPath)
+	if err := cLock.GetLock(globalObjectTimeout); err != nil {
+		return err
+	}
+	defer cLock.Unlock()
+	return removeAll(cacheObjPath)
+}
+
+// Deletes the cached object
+func (c *diskCache) Delete(ctx context.Context, bucket, object string) (err error) {
+	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
+	return c.delete(ctx, cacheObjPath)
 }
 
 // convenience function to check if object is cached on this diskCache
diff --git a/cmd/disk-cache-utils.go b/cmd/disk-cache-utils.go
index 9c97df74c..5c6397a5f 100644
--- a/cmd/disk-cache-utils.go
+++ b/cmd/disk-cache-utils.go
@@ -265,3 +265,21 @@ func decryptCacheObjectETag(info *ObjectInfo) error {
 
 	return nil
 }
+
+func isMetadataSame(m1, m2 map[string]string) bool {
+	if m1 == nil && m2 == nil {
+		return true
+	}
+	if (m1 == nil && m2 != nil) || (m2 == nil && m1 != nil) {
+		return false
+	}
+	if len(m1) != len(m2) {
+		return false
+	}
+	for k1, v1 := range m1 {
+		if v2, ok := m2[k1]; !ok || (v1 != v2) {
+			return false
+		}
+	}
+	return true
+}
diff --git a/cmd/disk-cache-utils_test.go b/cmd/disk-cache-utils_test.go
index b2953f8e4..b158fda18 100644
--- a/cmd/disk-cache-utils_test.go
+++ b/cmd/disk-cache-utils_test.go
@@ -58,3 +58,28 @@ func TestGetCacheControlOpts(t *testing.T) {
 		})
 	}
 }
+
+func TestIsMetadataSame(t *testing.T) {
+
+	testCases := []struct {
+		m1       map[string]string
+		m2       map[string]string
+		expected bool
+	}{
+		{nil, nil, true},
+		{nil, map[string]string{}, false},
+		{map[string]string{"k": "v"}, map[string]string{"k": "v"}, true},
+		{map[string]string{"k": "v"}, map[string]string{"a": "b"}, false},
+		{map[string]string{"k1": "v1", "k2": "v2"}, map[string]string{"k1": "v1", "k2": "v1"}, false},
+		{map[string]string{"k1": "v1", "k2": "v2"}, map[string]string{"k1": "v1", "k2": "v2"}, true},
+		{map[string]string{"K1": "v1", "k2": "v2"}, map[string]string{"k1": "v1", "k2": "v2"}, false},
+		{map[string]string{"k1": "v1", "k2": "v2", "k3": "v3"}, map[string]string{"k1": "v1", "k2": "v2"}, false},
+	}
+
+	for i, testCase := range testCases {
+		actual := isMetadataSame(testCase.m1, testCase.m2)
+		if testCase.expected != actual {
+			t.Errorf("test %d expected %v, got %v", i, testCase.expected, actual)
+		}
+	}
+}
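One subtlety the table-driven test above pins down: `isMetadataSame` deliberately treats a nil map and an empty map as different, which `len` and `range` alone would not distinguish. A quick demonstration of why the explicit nil checks are needed:

```go
package main

import "fmt"

func main() {
	var nilMap map[string]string
	empty := map[string]string{}
	fmt.Println(len(nilMap), len(empty))     // 0 0 — indistinguishable by length
	fmt.Println(nilMap == nil, empty == nil) // true false — only a nil check tells them apart
}
```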
diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go
index 0b17785bf..3ad56b4db 100644
--- a/cmd/disk-cache.go
+++ b/cmd/disk-cache.go
@@ -1,5 +1,5 @@
 /*
- * MinIO Cloud Storage, (C) 2019 MinIO, Inc.
+ * MinIO Cloud Storage, (C) 2019,2020 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -67,20 +67,16 @@ type cacheObjects struct {
 	cache []*diskCache
 	// file path patterns to exclude from cache
 	exclude []string
-
+	// number of accesses after which to cache an object
+	after int
 	// if true migration is in progress from v1 to v2
 	migrating bool
 	// mutex to protect migration bool
 	migMutex sync.Mutex
-	// nsMutex namespace lock
-	nsMutex *nsLockMap
-
 	// Cache stats
 	cacheStats *CacheStats
 
-	// Object functions pointing to the corresponding functions of backend implementation.
-	NewNSLockFn      func(ctx context.Context, bucket, object string) RWLocker
 	GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
 	GetObjectInfoFn  func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
 	DeleteObjectFn   func(ctx context.Context, bucket, object string) error
@@ -88,53 +84,39 @@ type cacheObjects struct {
 	PutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
 }
 
-func (c *cacheObjects) delete(ctx context.Context, dcache *diskCache, bucket, object string) (err error) {
-	cLock := c.NewNSLockFn(ctx, bucket, object)
-	if err := cLock.GetLock(globalObjectTimeout); err != nil {
-		return err
-	}
-	defer cLock.Unlock()
-	return dcache.Delete(ctx, bucket, object)
+func (c *cacheObjects) incHitsToMeta(ctx context.Context, dcache *diskCache, bucket, object string, size int64, eTag string) error {
+	metadata := make(map[string]string)
+	metadata["etag"] = eTag
+	return dcache.SaveMetadata(ctx, bucket, object, metadata, size, nil, "", true)
 }
 
-func (c *cacheObjects) put(ctx context.Context, dcache *diskCache, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions) error {
-	cLock := c.NewNSLockFn(ctx, bucket, object)
-	if err := cLock.GetLock(globalObjectTimeout); err != nil {
-		return err
-	}
-	defer cLock.Unlock()
-	return dcache.Put(ctx, bucket, object, data, size, rs, opts)
-}
+// Backend metadata could have changed through server side copy - reset cache metadata if that is the case
+func (c *cacheObjects) updateMetadataIfChanged(ctx context.Context, dcache *diskCache, bucket, object string, bkObjectInfo, cacheObjInfo ObjectInfo) error {
 
-func (c *cacheObjects) get(ctx context.Context, dcache *diskCache, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
-	cLock := c.NewNSLockFn(ctx, bucket, object)
-	if err := cLock.GetRLock(globalObjectTimeout); err != nil {
-		return nil, err
+	bkMeta := make(map[string]string)
+	cacheMeta := make(map[string]string)
+	for k, v := range bkObjectInfo.UserDefined {
+		if HasPrefix(k, ReservedMetadataPrefix) {
+			// Do not need to send any internal metadata
+			continue
+		}
+		bkMeta[http.CanonicalHeaderKey(k)] = v
 	}
-
-	defer cLock.RUnlock()
-	return dcache.Get(ctx, bucket, object, rs, h, opts)
-}
-
-func (c *cacheObjects) stat(ctx context.Context, dcache *diskCache, bucket, object string) (oi ObjectInfo, err error) {
-	cLock := c.NewNSLockFn(ctx, bucket, object)
-	if err := cLock.GetRLock(globalObjectTimeout); err != nil {
-		return oi, err
+	for k, v := range cacheObjInfo.UserDefined {
+		if HasPrefix(k, ReservedMetadataPrefix) {
+			// Do not need to send any internal metadata
+			continue
+		}
+		cacheMeta[http.CanonicalHeaderKey(k)] = v
 	}
-	defer cLock.RUnlock()
-	return dcache.Stat(ctx, bucket, object)
-}
-
-func (c *cacheObjects) statRange(ctx context.Context, dcache *diskCache, bucket, object string, rs *HTTPRangeSpec) (oi ObjectInfo, err error) {
-	cLock := c.NewNSLockFn(ctx, bucket, object)
-	if err := cLock.GetRLock(globalObjectTimeout); err != nil {
-		return oi, err
+	if !isMetadataSame(bkMeta, cacheMeta) ||
+		bkObjectInfo.ETag != cacheObjInfo.ETag ||
+		bkObjectInfo.ContentType != cacheObjInfo.ContentType ||
+		!bkObjectInfo.Expires.Equal(cacheObjInfo.Expires) {
+		return dcache.SaveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size, nil, "", false)
 	}
-
-	defer cLock.RUnlock()
-	oi, _, err = dcache.statRange(ctx, bucket, object, rs)
-	return oi, err
+	return c.incHitsToMeta(ctx, dcache, bucket, object, cacheObjInfo.Size, cacheObjInfo.ETag)
 }
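Before comparing the two objects, `updateMetadataIfChanged` normalizes both metadata maps: internal keys are dropped and the rest are canonicalized as HTTP header keys. A small sketch of that normalization step; the `X-Minio-Internal-` prefix value here is an assumption standing in for `ReservedMetadataPrefix`:

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// userVisible filters out internal keys and canonicalizes the rest — the same
// shaping updateMetadataIfChanged applies to both maps before comparing them.
func userVisible(meta map[string]string, reservedPrefix string) map[string]string {
	out := make(map[string]string, len(meta))
	for k, v := range meta {
		if strings.HasPrefix(k, reservedPrefix) {
			continue // internal bookkeeping, not part of the comparison
		}
		out[http.CanonicalHeaderKey(k)] = v
	}
	return out
}

func main() {
	m := map[string]string{
		"content-language":         "en",
		"X-Minio-Internal-example": "skipped",
	}
	fmt.Println(userVisible(m, "X-Minio-Internal-")) // map[Content-Language:en]
}
```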
 
 // DeleteObject clears cache entry if backend delete operation succeeds
@@ -150,9 +132,7 @@ func (c *cacheObjects) DeleteObject(ctx context.Context, bucket, object string)
 	if cerr != nil {
 		return
 	}
-	if dcache.Exists(ctx, bucket, object) {
-		c.delete(ctx, dcache, bucket, object)
-	}
+	dcache.Delete(ctx, bucket, object)
 	return
 }
 
@@ -200,7 +180,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
 		return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
 	}
 
-	cacheReader, cacheErr := c.get(ctx, dcache, bucket, object, rs, h, opts)
+	cacheReader, numCacheHits, cacheErr := dcache.Get(ctx, bucket, object, rs, h, opts)
 	if cacheErr == nil {
 		cacheObjSize = cacheReader.ObjInfo.Size
 		if rs != nil {
@@ -220,6 +200,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
 			}
 			c.cacheStats.incHit()
 			c.cacheStats.incBytesServed(bytesServed)
+			c.incHitsToMeta(ctx, dcache, bucket, object, cacheReader.ObjInfo.Size, cacheReader.ObjInfo.ETag)
 			return cacheReader, nil
 		}
 		if cc.noStore {
@@ -263,13 +244,13 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
 		// if ETag matches for stale cache entry, serve from cache
 		if cacheReader.ObjInfo.ETag == objInfo.ETag {
 			// Update metadata in case server-side copy might have changed object metadata
-			dcache.updateMetadataIfChanged(ctx, bucket, object, objInfo, cacheReader.ObjInfo)
+			c.updateMetadataIfChanged(ctx, dcache, bucket, object, objInfo, cacheReader.ObjInfo)
 			c.incCacheStats(cacheObjSize)
 			return cacheReader, nil
 		}
 		cacheReader.Close()
 		// Object is stale, so delete from cache
-		c.delete(ctx, dcache, bucket, object)
+		dcache.Delete(ctx, bucket, object)
 	}
 
 	// Reaching here implies cache miss
@@ -284,9 +265,17 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
 	}
 
 	bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
+
 	if bkErr != nil {
 		return bkReader, bkErr
 	}
+	// If the object has fewer hits than the configured cache-after threshold,
+	// just increment the hit counter but do not cache it.
+	if numCacheHits < c.after {
+		c.incHitsToMeta(ctx, dcache, bucket, object, objInfo.Size, objInfo.ETag)
+		return bkReader, bkErr
+	}
+
 	// Record if cache has a hit that was invalidated by ETag verification
 	if cacheErr == nil {
 		bkReader.ObjInfo.CacheLookupStatus = CacheHit
@@ -303,10 +292,10 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
 				return
 			}
 			defer bReader.Close()
-			oi, err := c.statRange(ctx, dcache, bucket, object, rs)
+			oi, _, _, err := dcache.statRange(ctx, bucket, object, rs)
 			// avoid cache overwrite if another background routine filled cache
 			if err != nil || oi.ETag != bReader.ObjInfo.ETag {
-				c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, rs, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)})
+				dcache.Put(ctx, bucket, object, bReader, bReader.ObjInfo.Size, rs, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, true)
 			}
 		}()
 		return bkReader, bkErr
@@ -316,7 +305,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
 	pipeReader, pipeWriter := io.Pipe()
 	teeReader := io.TeeReader(bkReader, pipeWriter)
 	go func() {
-		putErr := c.put(ctx, dcache, bucket, object, io.LimitReader(pipeReader, bkReader.ObjInfo.Size), bkReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bkReader.ObjInfo)})
+		putErr := dcache.Put(ctx, bucket, object, io.LimitReader(pipeReader, bkReader.ObjInfo.Size), bkReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bkReader.ObjInfo)}, false)
 		// close the write end of the pipe, so the error gets
 		// propagated to getObjReader
 		pipeWriter.CloseWithError(putErr)
@@ -341,7 +330,7 @@ func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string,
 	}
 	var cc cacheControl
 	// if cache control setting is valid, avoid HEAD operation to backend
-	cachedObjInfo, cerr := c.stat(ctx, dcache, bucket, object)
+	cachedObjInfo, _, cerr := dcache.Stat(ctx, bucket, object)
 	if cerr == nil {
 		cc = cacheControlOpts(cachedObjInfo)
 		if !cc.isStale(cachedObjInfo.ModTime) {
@@ -355,7 +344,7 @@ func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string,
 	if err != nil {
 		if _, ok := err.(ObjectNotFound); ok {
 			// Delete the cached entry if backend object was deleted.
-			c.delete(ctx, dcache, bucket, object)
+			dcache.Delete(ctx, bucket, object)
 			c.cacheStats.incMiss()
 			return ObjectInfo{}, err
 		}
@@ -379,7 +368,7 @@ func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string,
 	}
 	if cachedObjInfo.ETag != objInfo.ETag {
 		// Delete the cached entry if the backend object was replaced.
-		c.delete(ctx, dcache, bucket, object)
+		dcache.Delete(ctx, bucket, object)
 	}
 	return objInfo, nil
 }
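The write-through path above hinges on `io.TeeReader` plus `io.Pipe`: the caller drains the backend stream, and the same bytes feed the cache writer, with a failed cache write surfacing through `pipeWriter.CloseWithError`. A self-contained sketch of that pattern, with a buffer standing in for `dcache.Put`:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

func main() {
	backend := strings.NewReader("object payload") // stand-in for bkReader
	pipeReader, pipeWriter := io.Pipe()
	teeReader := io.TeeReader(backend, pipeWriter)

	done := make(chan struct{})
	var cached bytes.Buffer
	go func() {
		defer close(done)
		_, err := io.Copy(&cached, pipeReader) // stand-in for dcache.Put
		pipeReader.CloseWithError(err)
	}()

	var client bytes.Buffer
	io.Copy(&client, teeReader) // the caller reading fills the cache as a side effect
	pipeWriter.Close()          // signal EOF to the cache writer
	<-done

	fmt.Println(client.String(), "/", cached.String())
}
```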
@@ -508,7 +497,7 @@ func newCache(config cache.Config) ([]*diskCache, bool, error) {
 			quota = config.Quota
 		}
 
-		cache, err := newDiskCache(dir, config.Expiry, quota)
+		cache, err := newDiskCache(dir, config.Expiry, quota, config.After)
 		if err != nil {
 			return nil, false, err
 		}
@@ -516,7 +505,6 @@ func newCache(config cache.Config) ([]*diskCache, bool, error) {
 		if !migrating {
 			go cache.purge()
 		}
-
 		caches = append(caches, cache)
 	}
 	return caches, migrating, nil
@@ -630,10 +618,10 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
 				return
 			}
 			defer bReader.Close()
-			oi, err := c.stat(ctx, dcache, bucket, object)
+			oi, _, err := dcache.Stat(ctx, bucket, object)
 			// avoid cache overwrite if another background routine filled cache
 			if err != nil || oi.ETag != bReader.ObjInfo.ETag {
-				c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)})
+				dcache.Put(ctx, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, true)
 			}
 		}()
 	}
@@ -649,13 +637,12 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
 	if err != nil {
 		return nil, err
 	}
-
 	c := &cacheObjects{
 		cache:      cache,
 		exclude:    config.Exclude,
+		after:      config.After,
 		migrating:  migrateSw,
 		migMutex:   sync.Mutex{},
-		nsMutex:    newNSLock(false),
 		cacheStats: newCacheStats(),
 		GetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
 			return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
@@ -677,9 +664,6 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
 			return newObjectLayerFn().PutObject(ctx, bucket, object, data, opts)
 		},
 	}
-	c.NewNSLockFn = func(ctx context.Context, bucket, object string) RWLocker {
-		return c.nsMutex.NewNSLock(ctx, nil, bucket, object)
-	}
 
 	if migrateSw {
 		go c.migrateCacheFromV1toV2(ctx)
diff --git a/cmd/disk-cache_test.go b/cmd/disk-cache_test.go
index a255b553a..7f5b09b23 100644
--- a/cmd/disk-cache_test.go
+++ b/cmd/disk-cache_test.go
@@ -27,15 +27,15 @@ import (
 )
 
 // Initialize cache objects.
-func initCacheObjects(disk string, cacheMaxUse int) (*diskCache, error) {
-	return newDiskCache(disk, 80, cacheMaxUse)
+func initCacheObjects(disk string, cacheMaxUse, cacheAfter int) (*diskCache, error) {
+	return newDiskCache(disk, 80, cacheMaxUse, cacheAfter)
 }
 
 // inits diskCache struct for nDisks
-func initDiskCaches(drives []string, cacheMaxUse int, t *testing.T) ([]*diskCache, error) {
+func initDiskCaches(drives []string, cacheMaxUse, cacheAfter int, t *testing.T) ([]*diskCache, error) {
 	var cb []*diskCache
 	for _, d := range drives {
-		obj, err := initCacheObjects(d, cacheMaxUse)
+		obj, err := initCacheObjects(d, cacheMaxUse, cacheAfter)
 		if err != nil {
 			return nil, err
 		}
@@ -70,7 +70,7 @@ func TestGetCachedLoc(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	d, err := initDiskCaches(fsDirs, 100, t)
+	d, err := initDiskCaches(fsDirs, 100, 1, t)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -109,7 +109,7 @@ func TestGetCacheMaxUse(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	d, err := initDiskCaches(fsDirs, 80, t)
+	d, err := initDiskCaches(fsDirs, 80, 1, t)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -180,7 +180,7 @@ func TestDiskCache(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	d, err := initDiskCaches(fsDirs, 100, t)
+	d, err := initDiskCaches(fsDirs, 100, 0, t)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -212,11 +212,11 @@ func TestDiskCache(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), nil, ObjectOptions{UserDefined: httpMeta})
+	err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), nil, ObjectOptions{UserDefined: httpMeta}, false)
 	if err != nil {
 		t.Fatal(err)
 	}
-	cReader, err := cache.Get(ctx, bucketName, objectName, nil, http.Header{
+	cReader, _, err := cache.Get(ctx, bucketName, objectName, nil, http.Header{
 		"Content-Type": []string{"application/json"},
 	}, opts)
 	if err != nil {
@@ -258,7 +258,7 @@ func TestDiskCacheMaxUse(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	d, err := initDiskCaches(fsDirs, 80, t)
+	d, err := initDiskCaches(fsDirs, 80, 0, t)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -290,16 +290,16 @@ func TestDiskCacheMaxUse(t *testing.T) {
 		t.Fatal(err)
 	}
 	if !cache.diskAvailable(int64(size)) {
-		err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), nil, ObjectOptions{UserDefined: httpMeta})
+		err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), nil, ObjectOptions{UserDefined: httpMeta}, false)
 		if err != errDiskFull {
 			t.Fatal("Cache max-use limit violated.")
 		}
 	} else {
-		err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), nil, ObjectOptions{UserDefined: httpMeta})
+		err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), nil, ObjectOptions{UserDefined: httpMeta}, false)
 		if err != nil {
 			t.Fatal(err)
 		}
-		cReader, err := cache.Get(ctx, bucketName, objectName, nil, nil, opts)
+		cReader, _, err := cache.Get(ctx, bucketName, objectName, nil, nil, opts)
 		if err != nil {
 			t.Fatal(err)
 		}
diff --git a/docs/config/README.md b/docs/config/README.md
index 3299a263b..acecbce7a 100644
--- a/docs/config/README.md
+++ b/docs/config/README.md
@@ -121,6 +121,7 @@ drives*  (csv)      comma separated mountpoints e.g. "/optane1,/optane2"
 expiry   (number)   cache expiry duration in days e.g. "90"
 quota    (number)   limit cache drive usage in percentage e.g. "90"
 exclude  (csv)      comma separated wildcard exclusion patterns e.g. "bucket/*.tmp,*.exe"
+after    (number)   minimum number of accesses before caching an object
 comment  (sentence) optionally add a comment to this setting
 ```
 
@@ -134,6 +135,7 @@ MINIO_CACHE_DRIVES*  (csv)      comma separated mountpoints e.g. "/optane1,/opt
 MINIO_CACHE_EXPIRY   (number)   cache expiry duration in days e.g. "90"
 MINIO_CACHE_QUOTA    (number)   limit cache drive usage in percentage e.g. "90"
 MINIO_CACHE_EXCLUDE  (csv)      comma separated wildcard exclusion patterns e.g. "bucket/*.tmp,*.exe"
+MINIO_CACHE_AFTER    (number)   minimum number of accesses before caching an object
 MINIO_CACHE_COMMENT  (sentence) optionally add a comment to this setting
 ```
"bucket/*.tmp,*.exe" +after (number) minimum number of access before caching an object comment (sentence) optionally add a comment to this setting ``` @@ -134,6 +135,7 @@ MINIO_CACHE_DRIVES* (csv) comma separated mountpoints e.g. "/optane1,/opt MINIO_CACHE_EXPIRY (number) cache expiry duration in days e.g. "90" MINIO_CACHE_QUOTA (number) limit cache drive usage in percentage e.g. "90" MINIO_CACHE_EXCLUDE (csv) comma separated wildcard exclusion patterns e.g. "bucket/*.tmp,*.exe" +MINIO_CACHE_AFTER (number) minimum number of access before caching an object MINIO_CACHE_COMMENT (sentence) optionally add a comment to this setting ``` diff --git a/docs/disk-caching/DESIGN.md b/docs/disk-caching/DESIGN.md index d241d3969..11cfb32c6 100644 --- a/docs/disk-caching/DESIGN.md +++ b/docs/disk-caching/DESIGN.md @@ -13,16 +13,19 @@ minio gateway -h MINIO_CACHE_EXCLUDE: List of cache exclusion patterns delimited by "," MINIO_CACHE_EXPIRY: Cache expiry duration in days MINIO_CACHE_QUOTA: Maximum permitted usage of the cache in percentage (0-100). + MINIO_CACHE_AFTER: Minimum number of access before caching an object. + ... ... 7. Start MinIO gateway to s3 with edge caching enabled on '/mnt/drive1', '/mnt/drive2' and '/mnt/export1 ... /mnt/export24', exclude all objects under 'mybucket', exclude all objects with '.pdf' as extension - with expiry up to 40 days. + with expiry up to 40 days. Cache only those objects accessed atleast 3 times. $ export MINIO_CACHE_DRIVES="/mnt/drive1,/mnt/drive2,/mnt/export{1..24}" $ export MINIO_CACHE_EXCLUDE="mybucket/*,*.pdf" $ export MINIO_CACHE_EXPIRY=40 $ export MINIO_CACHE_QUOTA=80 + $ export MINIO_CACHE_AFTER=3 $ minio gateway s3 ``` diff --git a/docs/disk-caching/README.md b/docs/disk-caching/README.md index 6ca136e60..b56095c1a 100644 --- a/docs/disk-caching/README.md +++ b/docs/disk-caching/README.md @@ -15,7 +15,7 @@ Install MinIO - [MinIO Quickstart Guide](https://docs.min.io/docs/minio-quicksta Disk caching can be enabled by setting the `cache` environment variables for MinIO gateway . `cache` environment variables takes the mounted drive(s) or directory paths, cache expiry duration (in days) and any wildcard patterns to exclude from being cached. -Following example uses `/mnt/drive1`, `/mnt/drive2` ,`/mnt/cache1` ... `/mnt/cache3` for caching, with expiry up to 90 days while excluding all objects under bucket `mybucket` and all objects with '.pdf' as extension while starting a s3 gateway setup. Cache max usage is restricted to 80% of disk capacity in this example. +Following example uses `/mnt/drive1`, `/mnt/drive2` ,`/mnt/cache1` ... `/mnt/cache3` for caching, with expiry up to 90 days while excluding all objects under bucket `mybucket` and all objects with '.pdf' as extension while starting a s3 gateway setup. Objects are cached if they have been accessed three times or more.Cache max usage is restricted to 80% of disk capacity in this example. ```bash export MINIO_CACHE="on" @@ -23,6 +23,7 @@ export MINIO_CACHE_DRIVES="/mnt/drive1,/mnt/drive2,/mnt/cache{1...3}" export MINIO_CACHE_EXPIRY=90 export MINIO_CACHE_EXCLUDE="*.pdf,mybucket/*" export MINIO_CACHE_QUOTA=80 +export MINIO_CACHE_AFTER=3 minio gateway s3 ```