From 29f64355ce22f4769b54a65945b7334d1b9dffdc Mon Sep 17 00:00:00 2001 From: poornas Date: Thu, 5 Sep 2019 07:20:16 -0700 Subject: [PATCH] Allow caching on single PutObject (#8100) --- cmd/disk-cache-backend.go | 22 ++++++++++ cmd/disk-cache.go | 86 ++++++++++++++++++++++++++++++++++++++- cmd/object-handlers.go | 7 +++- cmd/web-handlers.go | 3 ++ 4 files changed, 116 insertions(+), 2 deletions(-) diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go index 20495f4b3..ee2ad0c3f 100644 --- a/cmd/disk-cache-backend.go +++ b/cmd/disk-cache-backend.go @@ -333,6 +333,28 @@ func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, met return err } +// updates ETag in cache metadata file to the backend ETag. +func (c *diskCache) updateETag(ctx context.Context, bucket, object string, etag string) error { + metaPath := path.Join(getCacheSHADir(c.dir, bucket, object), cacheMetaJSONFile) + f, err := os.OpenFile(metaPath, os.O_RDWR, 0) + if err != nil { + return err + } + defer f.Close() + + meta := &cacheMeta{Version: cacheMetaVersion} + if err := jsonLoad(f, meta); err != nil { + return err + } + meta.Meta["etag"] = etag + jsonData, err := json.Marshal(meta) + if err != nil { + return err + } + _, err = f.Write(jsonData) + return err +} + // Backend metadata could have changed through server side copy - reset cache metadata if that is the case func (c *diskCache) updateMetadataIfChanged(ctx context.Context, bucket, object string, bkObjectInfo, cacheObjInfo ObjectInfo) error { diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index ec5a24c9b..c060c521b 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -14,6 +14,7 @@ import ( "github.com/djherbis/atime" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/hash" "github.com/minio/minio/pkg/wildcard" ) @@ -35,6 +36,7 @@ type CacheObjectLayer interface { GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) DeleteObject(ctx context.Context, bucket, object string) error DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) + PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) // Storage operations. StorageInfo(ctx context.Context) CacheStorageInfo } @@ -58,6 +60,7 @@ type cacheObjects struct { GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) DeleteObjectFn func(ctx context.Context, bucket, object string) error DeleteObjectsFn func(ctx context.Context, bucket string, objects []string) ([]error, error) + PutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) } func (c *cacheObjects) delete(ctx context.Context, dcache *diskCache, bucket, object string) (err error) { @@ -231,7 +234,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string pipeReader, pipeWriter := io.Pipe() teeReader := io.TeeReader(bkReader, pipeWriter) go func() { - putErr := dcache.Put(ctx, bucket, object, io.LimitReader(pipeReader, bkReader.ObjInfo.Size), bkReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bkReader.ObjInfo)}) + putErr := c.put(ctx, dcache, bucket, object, io.LimitReader(pipeReader, bkReader.ObjInfo.Size), bkReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bkReader.ObjInfo)}) // close the write end of the pipe, so the error gets // propagated to getObjReader pipeWriter.CloseWithError(putErr) @@ -483,6 +486,84 @@ func (c *cacheObjects) migrateCacheFromV1toV2(ctx context.Context) { logger.StartupMessage(colorBlue("Cache migration completed successfully.")) } +// PutObject - caches the uploaded object for single Put operations +func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { + putObjectFn := c.PutObjectFn + data := r.rawReader + dcache, err := c.getCacheToLoc(ctx, bucket, object) + if err != nil { + // disk cache could not be located,execute backend call. + return putObjectFn(ctx, bucket, object, r, opts) + } + size := r.Size() + if c.skipCache() { + return putObjectFn(ctx, bucket, object, r, opts) + } + + // fetch from backend if there is no space on cache drive + if !dcache.diskAvailable(size) { + return putObjectFn(ctx, bucket, object, r, opts) + } + if opts.ServerSideEncryption != nil { + return putObjectFn(ctx, bucket, object, r, opts) + } + // fetch from backend if cache exclude pattern or cache-control + // directive set to exclude + if c.isCacheExclude(bucket, object) { + dcache.Delete(ctx, bucket, object) + return putObjectFn(ctx, bucket, object, r, opts) + } + // Initialize pipe to stream data to backend + pipeReader, pipeWriter := io.Pipe() + hashReader, err := hash.NewReader(pipeReader, size, "", "", data.ActualSize(), globalCLIContext.StrictS3Compat) + if err != nil { + return ObjectInfo{}, err + } + // Initialize pipe to stream data to cache + rPipe, wPipe := io.Pipe() + + oinfoCh := make(chan ObjectInfo) + errCh := make(chan error) + go func() { + oinfo, perr := putObjectFn(ctx, bucket, object, NewPutObjReader(hashReader, nil, nil), opts) + if perr != nil { + pipeWriter.CloseWithError(perr) + wPipe.CloseWithError(perr) + close(oinfoCh) + errCh <- perr + return + } + close(errCh) + oinfoCh <- oinfo + }() + // get a namespace lock on cache until cache is filled. + cLock := c.nsMutex.NewNSLock(ctx, bucket, object) + if err := cLock.GetLock(globalObjectTimeout); err != nil { + return ObjectInfo{}, err + } + defer cLock.Unlock() + go func() { + if err = dcache.Put(ctx, bucket, object, rPipe, data.Size(), opts); err != nil { + wPipe.CloseWithError(err) + return + } + }() + + mwriter := io.MultiWriter(pipeWriter, wPipe) + _, err = io.Copy(mwriter, data) + if err != nil { + err = <-errCh + return objInfo, err + } + pipeWriter.Close() + wPipe.Close() + objInfo = <-oinfoCh + dcache.updateETag(ctx, bucket, object, objInfo.ETag) + + return objInfo, err + +} + // Returns cacheObjects for use by Server. func newServerCacheObjects(ctx context.Context, config CacheConfig) (CacheObjectLayer, error) { // list of disk caches for cache "drives" specified in config.json or MINIO_CACHE_DRIVES env var. @@ -513,6 +594,9 @@ func newServerCacheObjects(ctx context.Context, config CacheConfig) (CacheObject } return errs, nil }, + PutObjectFn: func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { + return newObjectLayerFn().PutObject(ctx, bucket, object, data, opts) + }, } if migrateSw { go c.migrateCacheFromV1toV2(ctx) diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index fc3a46040..5776603bd 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -1205,6 +1205,11 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } + getObjectInfo := objectAPI.GetObjectInfo + if api.CacheAPI() != nil { + getObjectInfo = api.CacheAPI().GetObjectInfo + putObject = api.CacheAPI().PutObject + } rawReader := hashReader pReader := NewPutObjReader(rawReader, nil, nil) @@ -1219,7 +1224,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req // Deny if WORM is enabled if globalWORMEnabled { - if _, err = objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil { + if _, err = getObjectInfo(ctx, bucket, object, opts); err == nil { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r)) return } diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 350293e55..8a0148f9e 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -1053,6 +1053,9 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { } putObject := objectAPI.PutObject + if web.CacheAPI() != nil { + putObject = web.CacheAPI().PutObject + } objInfo, err := putObject(context.Background(), bucket, object, pReader, opts) if err != nil {