From 3c30e4503da27a55cff28054433310015cc98c19 Mon Sep 17 00:00:00 2001 From: poornas Date: Sun, 8 Dec 2019 13:58:04 -0800 Subject: [PATCH] Cache only the range requested for range GETs (#8599) --- cmd/disk-cache-backend.go | 263 +++++++++++++++++++++++++++++++------- cmd/disk-cache.go | 35 +++-- cmd/disk-cache_test.go | 6 +- cmd/format-disk-cache.go | 3 +- cmd/httprange.go | 12 ++ 5 files changed, 256 insertions(+), 63 deletions(-) diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go index 75d1e6617..890179d63 100644 --- a/cmd/disk-cache-backend.go +++ b/cmd/disk-cache-backend.go @@ -21,14 +21,12 @@ import ( "context" "crypto/rand" "encoding/hex" - "encoding/json" "fmt" "io" "io/ioutil" "log" "net/http" "os" - "path" "reflect" "sync" "time" @@ -68,6 +66,20 @@ type cacheMeta struct { Checksum CacheChecksumInfoV1 `json:"checksum,omitempty"` // Metadata map for current object. Meta map[string]string `json:"meta,omitempty"` + // Ranges maps cached range to associated filename. + Ranges map[string]string `json:"ranges,omitempty"` +} + +// RangeInfo has the range, file and range length information for a cached range. +type RangeInfo struct { + Range string + File string + Size int64 +} + +// Empty returns true if this is an empty struct +func (r *RangeInfo) Empty() bool { + return r.Range == "" && r.File == "" && r.Size == 0 } func (m *cacheMeta) ToObjectInfo(bucket, object string) (o ObjectInfo) { @@ -190,6 +202,22 @@ func (c *diskCache) diskAvailable(size int64) bool { // Purge cache entries that were not accessed. func (c *diskCache) purge() { + // this function returns FileInfo for cached range files and cache data file. + fiStatFn := func(ranges map[string]string, dataFile, pathPrefix string) map[string]os.FileInfo { + fm := make(map[string]os.FileInfo) + fname := pathJoin(pathPrefix, cacheDataFile) + if fi, err := os.Stat(fname); err == nil { + fm[fname] = fi + } + + for _, rngFile := range ranges { + fname = pathJoin(pathPrefix, rngFile) + if fi, err := os.Stat(fname); err == nil { + fm[fname] = fi + } + } + return fm + } ctx := context.Background() for { olderThan := c.expiry @@ -211,30 +239,28 @@ func (c *diskCache) purge() { if obj.Name() == minioMetaBucket { continue } - // stat entry to get atime - var fi os.FileInfo - fi, err := os.Stat(pathJoin(c.dir, obj.Name(), cacheDataFile)) - if err != nil { - continue - } - - objInfo, err := c.statCache(pathJoin(c.dir, obj.Name())) + meta, _, err := c.statCachedMeta(pathJoin(c.dir, obj.Name())) if err != nil { // delete any partially filled cache entry left behind. removeAll(pathJoin(c.dir, obj.Name())) continue } + // stat all cached file ranges and cacheDataFile. + fis := fiStatFn(meta.Ranges, cacheDataFile, pathJoin(c.dir, obj.Name())) + objInfo := meta.ToObjectInfo("", "") cc := cacheControlOpts(objInfo) - if atime.Get(fi).Before(expiry) || - cc.isStale(objInfo.ModTime) { - if err = removeAll(pathJoin(c.dir, obj.Name())); err != nil { - logger.LogIf(ctx, err) - } - deletedCount++ - // break early if sufficient disk space reclaimed. - if !c.diskUsageLow() { - break + for fname, fi := range fis { + if atime.Get(fi).Before(expiry) || + cc.isStale(objInfo.ModTime) { + if err = removeAll(fname); err != nil { + logger.LogIf(ctx, err) + } + deletedCount++ + // break early if sufficient disk space reclaimed. + if !c.diskUsageLow() { + break + } } } } @@ -283,49 +309,118 @@ func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectI return } -// statCache is a convenience function for purge() to get ObjectInfo for cached object -func (c *diskCache) statCache(cacheObjPath string) (oi ObjectInfo, e error) { +// statCachedMeta returns metadata from cache - including ranges cached, partial to indicate +// if partial object is cached. +func (c *diskCache) statCachedMeta(cacheObjPath string) (meta *cacheMeta, partial bool, err error) { // Stat the file to get file size. - metaPath := path.Join(cacheObjPath, cacheMetaJSONFile) + metaPath := pathJoin(cacheObjPath, cacheMetaJSONFile) f, err := os.Open(metaPath) if err != nil { - return oi, err + return meta, partial, err } defer f.Close() - - meta := &cacheMeta{Version: cacheMetaVersion} + meta = &cacheMeta{Version: cacheMetaVersion} if err := jsonLoad(f, meta); err != nil { - return oi, err + return meta, partial, err } + // get metadata of part.1 if full file has been cached. + partial = true fi, err := os.Stat(pathJoin(cacheObjPath, cacheDataFile)) + if err == nil { + meta.Stat.ModTime = atime.Get(fi) + partial = false + } + return meta, partial, nil +} + +// statRange returns ObjectInfo and RangeInfo from disk cache +func (c *diskCache) statRange(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (oi ObjectInfo, rngInfo RangeInfo, err error) { + // Stat the file to get file size. + cacheObjPath := getCacheSHADir(c.dir, bucket, object) + + if rs == nil { + oi, err = c.statCache(cacheObjPath) + return oi, rngInfo, err + } + meta, _, err := c.statCachedMeta(cacheObjPath) + if err != nil { + return oi, rngInfo, err + } + + actualSize := uint64(meta.Stat.Size) + _, length, err := rs.GetOffsetLength(int64(actualSize)) + if err != nil { + return oi, rngInfo, err + } + + actualRngSize := uint64(length) + if globalCacheKMS != nil { + actualRngSize, _ = sio.EncryptedSize(uint64(length)) + } + + rng := rs.String(int64(actualSize)) + rngFile, ok := meta.Ranges[rng] + if !ok { + return oi, rngInfo, ObjectNotFound{Bucket: bucket, Object: object} + } + rngInfo = RangeInfo{Range: rng, File: rngFile, Size: int64(actualRngSize)} + + oi = meta.ToObjectInfo("", "") + oi.Bucket = bucket + oi.Name = object + + if err = decryptCacheObjectETag(&oi); err != nil { + return oi, rngInfo, err + } + return +} + +// statCache is a convenience function for purge() to get ObjectInfo for cached object +func (c *diskCache) statCache(cacheObjPath string) (oi ObjectInfo, e error) { + // Stat the file to get file size. + meta, partial, err := c.statCachedMeta(cacheObjPath) if err != nil { return oi, err } - meta.Stat.ModTime = atime.Get(fi) + if partial { + return oi, errFileNotFound + } return meta.ToObjectInfo("", ""), nil } // saves object metadata to disk cache -func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64) error { +func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, meta map[string]string, actualSize int64, rs *HTTPRangeSpec, rsFileName string) error { fileName := getCacheSHADir(c.dir, bucket, object) metaPath := pathJoin(fileName, cacheMetaJSONFile) - f, err := os.Create(metaPath) + f, err := os.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0666) if err != nil { return err } defer f.Close() - m := cacheMeta{Meta: meta, Version: cacheMetaVersion} + m := &cacheMeta{Version: cacheMetaVersion} + if err := jsonLoad(f, m); err != nil && err != io.EOF { + return err + } + if rs != nil { + if m.Ranges == nil { + m.Ranges = make(map[string]string) + } + m.Ranges[rs.String(actualSize)] = rsFileName + } else { + // this is necessary cleanup of range files if entire object is cached. + for _, f := range m.Ranges { + removeAll(pathJoin(fileName, f)) + } + m.Ranges = nil + } m.Stat.Size = actualSize m.Stat.ModTime = UTCNow() + m.Meta = meta m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize} - jsonData, err := json.Marshal(m) - if err != nil { - return err - } - _, err = f.Write(jsonData) - return err + + return jsonSave(f, m) } // Backend metadata could have changed through server side copy - reset cache metadata if that is the case @@ -351,21 +446,21 @@ func (c *diskCache) updateMetadataIfChanged(ctx context.Context, bucket, object bkObjectInfo.ETag != cacheObjInfo.ETag || bkObjectInfo.ContentType != cacheObjInfo.ContentType || bkObjectInfo.Expires != cacheObjInfo.Expires { - return c.saveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size) + return c.saveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size, nil, "") } return nil } func getCacheSHADir(dir, bucket, object string) string { - return path.Join(dir, getSHA256Hash([]byte(path.Join(bucket, object)))) + return pathJoin(dir, getSHA256Hash([]byte(pathJoin(bucket, object)))) } // Cache data to disk with bitrot checksum added for each block of 1MB -func (c *diskCache) bitrotWriteToCache(cachePath string, reader io.Reader, size uint64) (int64, error) { +func (c *diskCache) bitrotWriteToCache(cachePath, fileName string, reader io.Reader, size uint64) (int64, error) { if err := os.MkdirAll(cachePath, 0777); err != nil { return 0, err } - filePath := path.Join(cachePath, cacheDataFile) + filePath := pathJoin(cachePath, fileName) if filePath == "" || reader == nil { return 0, errInvalidArgument @@ -434,7 +529,7 @@ func newCacheEncryptMetadata(bucket, object string, metadata map[string]string) if globalCacheKMS == nil { return nil, errKMSNotConfigured } - key, encKey, err := globalCacheKMS.GenerateKey(globalCacheKMS.KeyID(), crypto.Context{bucket: path.Join(bucket, object)}) + key, encKey, err := globalCacheKMS.GenerateKey(globalCacheKMS.KeyID(), crypto.Context{bucket: pathJoin(bucket, object)}) if err != nil { return nil, err } @@ -451,7 +546,7 @@ func newCacheEncryptMetadata(bucket, object string, metadata map[string]string) } // Caches the object to disk -func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, opts ObjectOptions) error { +func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions) error { if c.diskUsageHigh() { select { case c.purgeChan <- struct{}{}: @@ -459,6 +554,9 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read } return errDiskFull } + if rs != nil { + return c.putRange(ctx, bucket, object, data, size, rs, opts) + } if !c.diskAvailable(size) { return errDiskFull } @@ -480,7 +578,7 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read } actualSize, _ = sio.EncryptedSize(uint64(size)) } - n, err := c.bitrotWriteToCache(cachePath, reader, actualSize) + n, err := c.bitrotWriteToCache(cachePath, cacheDataFile, reader, actualSize) if IsErr(err, baseErrs...) { c.setOnline(false) } @@ -492,7 +590,53 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read removeAll(cachePath) return IncompleteBody{} } - return c.saveMetadata(ctx, bucket, object, metadata, n) + return c.saveMetadata(ctx, bucket, object, metadata, n, nil, "") +} + +// Caches the range to disk +func (c *diskCache) putRange(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions) error { + rlen, err := rs.GetLength(size) + if err != nil { + return err + } + if !c.diskAvailable(rlen) { + return errDiskFull + } + cachePath := getCacheSHADir(c.dir, bucket, object) + if err := os.MkdirAll(cachePath, 0777); err != nil { + return err + } + var metadata = make(map[string]string) + for k, v := range opts.UserDefined { + metadata[k] = v + } + var reader = data + var actualSize = uint64(rlen) + // objSize is the actual size of object (with encryption overhead if any) + var objSize = uint64(size) + if globalCacheKMS != nil { + reader, err = newCacheEncryptReader(data, bucket, object, metadata) + if err != nil { + return err + } + actualSize, _ = sio.EncryptedSize(uint64(rlen)) + objSize, _ = sio.EncryptedSize(uint64(size)) + + } + cacheFile := MustGetUUID() + n, err := c.bitrotWriteToCache(cachePath, cacheFile, reader, actualSize) + if IsErr(err, baseErrs...) { + c.setOnline(false) + } + if err != nil { + removeAll(cachePath) + return err + } + if actualSize != uint64(n) { + removeAll(cachePath) + return IncompleteBody{} + } + return c.saveMetadata(ctx, bucket, object, metadata, int64(objSize), rs, cacheFile) } // checks streaming bitrot checksum of cached object before returning data @@ -595,11 +739,20 @@ func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, of func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) { var objInfo ObjectInfo cacheObjPath := getCacheSHADir(c.dir, bucket, object) + var rngInfo RangeInfo - if objInfo, err = c.Stat(ctx, bucket, object); err != nil { + if objInfo, rngInfo, err = c.statRange(ctx, bucket, object, rs); err != nil { return nil, toObjectErr(err, bucket, object) } - + cacheFile := cacheDataFile + objSize := objInfo.Size + if !rngInfo.Empty() { + // for cached ranges, need to pass actual range file size to GetObjectReader + // and clear out range spec + cacheFile = rngInfo.File + objInfo.Size = rngInfo.Size + rs = nil + } var nsUnlocker = func() {} // For a directory, we need to send an reader that returns no bytes. if HasSuffix(object, SlashSeparator) { @@ -612,8 +765,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang if nErr != nil { return nil, nErr } - - filePath := path.Join(cacheObjPath, cacheDataFile) + filePath := pathJoin(cacheObjPath, cacheFile) pr, pw := io.Pipe() go func() { err := c.bitrotReadFromCache(ctx, filePath, off, length, pw) @@ -625,7 +777,20 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang // Cleanup function to cause the go routine above to exit, in // case of incomplete read. pipeCloser := func() { pr.Close() } - return fn(pr, h, opts.CheckCopyPrecondFn, pipeCloser) + + gr, gerr := fn(pr, h, opts.CheckCopyPrecondFn, pipeCloser) + if gerr != nil { + return gr, gerr + } + if globalCacheKMS != nil { + // clean up internal SSE cache metadata + delete(gr.ObjInfo.UserDefined, crypto.SSEHeader) + } + if !rngInfo.Empty() { + // overlay Size with actual object size and not the range size + gr.ObjInfo.Size = objSize + } + return gr, nil } diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index efc96c8fe..9f2a89e7e 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -96,13 +96,13 @@ func (c *cacheObjects) delete(ctx context.Context, dcache *diskCache, bucket, ob return dcache.Delete(ctx, bucket, object) } -func (c *cacheObjects) put(ctx context.Context, dcache *diskCache, bucket, object string, data io.Reader, size int64, opts ObjectOptions) error { +func (c *cacheObjects) put(ctx context.Context, dcache *diskCache, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions) error { cLock := c.NewNSLockFn(ctx, bucket, object) if err := cLock.GetLock(globalObjectTimeout); err != nil { return err } defer cLock.Unlock() - return dcache.Put(ctx, bucket, object, data, size, opts) + return dcache.Put(ctx, bucket, object, data, size, rs, opts) } func (c *cacheObjects) get(ctx context.Context, dcache *diskCache, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) { @@ -125,6 +125,17 @@ func (c *cacheObjects) stat(ctx context.Context, dcache *diskCache, bucket, obje return dcache.Stat(ctx, bucket, object) } +func (c *cacheObjects) statRange(ctx context.Context, dcache *diskCache, bucket, object string, rs *HTTPRangeSpec) (oi ObjectInfo, err error) { + cLock := c.NewNSLockFn(ctx, bucket, object) + if err := cLock.GetRLock(globalObjectTimeout); err != nil { + return oi, err + } + + defer cLock.RUnlock() + oi, _, err = dcache.statRange(ctx, bucket, object, rs) + return oi, err +} + // DeleteObject clears cache entry if backend delete operation succeeds func (c *cacheObjects) DeleteObject(ctx context.Context, bucket, object string) (err error) { if err = c.DeleteObjectFn(ctx, bucket, object); err != nil { @@ -200,7 +211,14 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string if (!cc.isEmpty() && !cc.isStale(cacheReader.ObjInfo.ModTime)) || cc.onlyIfCached { // This is a cache hit, mark it so - c.incCacheStats(cacheObjSize) + bytesServed := cacheReader.ObjInfo.Size + if rs != nil { + if _, len, err := rs.GetOffsetLength(bytesServed); err == nil { + bytesServed = len + } + } + c.cacheStats.incHit() + c.cacheStats.incBytesServed(bytesServed) return cacheReader, nil } if cc.noStore { @@ -261,20 +279,19 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string if rs != nil { go func() { // fill cache in the background for range GET requests - bReader, bErr := c.GetObjectNInfoFn(ctx, bucket, object, nil, h, lockType, opts) + bReader, bErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts) if bErr != nil { return } defer bReader.Close() - oi, err := c.stat(ctx, dcache, bucket, object) + oi, err := c.statRange(ctx, dcache, bucket, object, rs) // avoid cache overwrite if another background routine filled cache if err != nil || oi.ETag != bReader.ObjInfo.ETag { - c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}) + c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, rs, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}) } }() return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts) } - bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts) if bkErr != nil { return nil, bkErr @@ -283,7 +300,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string pipeReader, pipeWriter := io.Pipe() teeReader := io.TeeReader(bkReader, pipeWriter) go func() { - putErr := c.put(ctx, dcache, bucket, object, io.LimitReader(pipeReader, bkReader.ObjInfo.Size), bkReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bkReader.ObjInfo)}) + putErr := c.put(ctx, dcache, bucket, object, io.LimitReader(pipeReader, bkReader.ObjInfo.Size), bkReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bkReader.ObjInfo)}) // close the write end of the pipe, so the error gets // propagated to getObjReader pipeWriter.CloseWithError(putErr) @@ -597,7 +614,7 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r * oi, err := c.stat(ctx, dcache, bucket, object) // avoid cache overwrite if another background routine filled cache if err != nil || oi.ETag != bReader.ObjInfo.ETag { - c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}) + c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}) } }() } diff --git a/cmd/disk-cache_test.go b/cmd/disk-cache_test.go index f6ebe9a6e..a255b553a 100644 --- a/cmd/disk-cache_test.go +++ b/cmd/disk-cache_test.go @@ -212,7 +212,7 @@ func TestDiskCache(t *testing.T) { if err != nil { t.Fatal(err) } - err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), ObjectOptions{UserDefined: httpMeta}) + err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), nil, ObjectOptions{UserDefined: httpMeta}) if err != nil { t.Fatal(err) } @@ -290,12 +290,12 @@ func TestDiskCacheMaxUse(t *testing.T) { t.Fatal(err) } if !cache.diskAvailable(int64(size)) { - err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), ObjectOptions{UserDefined: httpMeta}) + err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), nil, ObjectOptions{UserDefined: httpMeta}) if err != errDiskFull { t.Fatal("Cache max-use limit violated.") } } else { - err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), ObjectOptions{UserDefined: httpMeta}) + err = cache.Put(ctx, bucketName, objectName, hashReader, hashReader.Size(), nil, ObjectOptions{UserDefined: httpMeta}) if err != nil { t.Fatal(err) } diff --git a/cmd/format-disk-cache.go b/cmd/format-disk-cache.go index 65d2f3dcc..4beec1b1d 100644 --- a/cmd/format-disk-cache.go +++ b/cmd/format-disk-cache.go @@ -383,8 +383,7 @@ func migrateCacheData(ctx context.Context, c *diskCache, bucket, object, oldfile } actualSize, _ = sio.EncryptedSize(uint64(st.Size())) } - - _, err = c.bitrotWriteToCache(destDir, reader, uint64(actualSize)) + _, err = c.bitrotWriteToCache(destDir, cacheDataFile, reader, uint64(actualSize)) return err } diff --git a/cmd/httprange.go b/cmd/httprange.go index 8482f2fc1..145e54f69 100644 --- a/cmd/httprange.go +++ b/cmd/httprange.go @@ -161,3 +161,15 @@ func parseRequestRangeSpec(rangeString string) (hrange *HTTPRangeSpec, err error return nil, fmt.Errorf("'%s' does not have valid range value", rangeString) } } + +// String returns stringified representation of range for a particular resource size. +func (h *HTTPRangeSpec) String(resourceSize int64) string { + if h == nil { + return "" + } + off, length, err := h.GetOffsetLength(resourceSize) + if err != nil { + return "" + } + return fmt.Sprintf("%d-%d", off, off+length-1) +}