Allow caching on single PutObject (#8100)

master
poornas 5 years ago committed by kannappanr
parent 496fba3e9a
commit 29f64355ce
  1. 22
      cmd/disk-cache-backend.go
  2. 86
      cmd/disk-cache.go
  3. 7
      cmd/object-handlers.go
  4. 3
      cmd/web-handlers.go

@ -333,6 +333,28 @@ func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, met
return err
}
// updates ETag in cache metadata file to the backend ETag.
func (c *diskCache) updateETag(ctx context.Context, bucket, object string, etag string) error {
metaPath := path.Join(getCacheSHADir(c.dir, bucket, object), cacheMetaJSONFile)
f, err := os.OpenFile(metaPath, os.O_RDWR, 0)
if err != nil {
return err
}
defer f.Close()
meta := &cacheMeta{Version: cacheMetaVersion}
if err := jsonLoad(f, meta); err != nil {
return err
}
meta.Meta["etag"] = etag
jsonData, err := json.Marshal(meta)
if err != nil {
return err
}
_, err = f.Write(jsonData)
return err
}
// Backend metadata could have changed through server side copy - reset cache metadata if that is the case
func (c *diskCache) updateMetadataIfChanged(ctx context.Context, bucket, object string, bkObjectInfo, cacheObjInfo ObjectInfo) error {

@ -14,6 +14,7 @@ import (
"github.com/djherbis/atime"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/hash"
"github.com/minio/minio/pkg/wildcard"
)
@ -35,6 +36,7 @@ type CacheObjectLayer interface {
GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
DeleteObject(ctx context.Context, bucket, object string) error
DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error)
PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
// Storage operations.
StorageInfo(ctx context.Context) CacheStorageInfo
}
@ -58,6 +60,7 @@ type cacheObjects struct {
GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
DeleteObjectFn func(ctx context.Context, bucket, object string) error
DeleteObjectsFn func(ctx context.Context, bucket string, objects []string) ([]error, error)
PutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
}
func (c *cacheObjects) delete(ctx context.Context, dcache *diskCache, bucket, object string) (err error) {
@ -231,7 +234,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
pipeReader, pipeWriter := io.Pipe()
teeReader := io.TeeReader(bkReader, pipeWriter)
go func() {
putErr := dcache.Put(ctx, bucket, object, io.LimitReader(pipeReader, bkReader.ObjInfo.Size), bkReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bkReader.ObjInfo)})
putErr := c.put(ctx, dcache, bucket, object, io.LimitReader(pipeReader, bkReader.ObjInfo.Size), bkReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bkReader.ObjInfo)})
// close the write end of the pipe, so the error gets
// propagated to getObjReader
pipeWriter.CloseWithError(putErr)
@ -483,6 +486,84 @@ func (c *cacheObjects) migrateCacheFromV1toV2(ctx context.Context) {
logger.StartupMessage(colorBlue("Cache migration completed successfully."))
}
// PutObject - caches the uploaded object for single Put operations
func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
putObjectFn := c.PutObjectFn
data := r.rawReader
dcache, err := c.getCacheToLoc(ctx, bucket, object)
if err != nil {
// disk cache could not be located,execute backend call.
return putObjectFn(ctx, bucket, object, r, opts)
}
size := r.Size()
if c.skipCache() {
return putObjectFn(ctx, bucket, object, r, opts)
}
// fetch from backend if there is no space on cache drive
if !dcache.diskAvailable(size) {
return putObjectFn(ctx, bucket, object, r, opts)
}
if opts.ServerSideEncryption != nil {
return putObjectFn(ctx, bucket, object, r, opts)
}
// fetch from backend if cache exclude pattern or cache-control
// directive set to exclude
if c.isCacheExclude(bucket, object) {
dcache.Delete(ctx, bucket, object)
return putObjectFn(ctx, bucket, object, r, opts)
}
// Initialize pipe to stream data to backend
pipeReader, pipeWriter := io.Pipe()
hashReader, err := hash.NewReader(pipeReader, size, "", "", data.ActualSize(), globalCLIContext.StrictS3Compat)
if err != nil {
return ObjectInfo{}, err
}
// Initialize pipe to stream data to cache
rPipe, wPipe := io.Pipe()
oinfoCh := make(chan ObjectInfo)
errCh := make(chan error)
go func() {
oinfo, perr := putObjectFn(ctx, bucket, object, NewPutObjReader(hashReader, nil, nil), opts)
if perr != nil {
pipeWriter.CloseWithError(perr)
wPipe.CloseWithError(perr)
close(oinfoCh)
errCh <- perr
return
}
close(errCh)
oinfoCh <- oinfo
}()
// get a namespace lock on cache until cache is filled.
cLock := c.nsMutex.NewNSLock(ctx, bucket, object)
if err := cLock.GetLock(globalObjectTimeout); err != nil {
return ObjectInfo{}, err
}
defer cLock.Unlock()
go func() {
if err = dcache.Put(ctx, bucket, object, rPipe, data.Size(), opts); err != nil {
wPipe.CloseWithError(err)
return
}
}()
mwriter := io.MultiWriter(pipeWriter, wPipe)
_, err = io.Copy(mwriter, data)
if err != nil {
err = <-errCh
return objInfo, err
}
pipeWriter.Close()
wPipe.Close()
objInfo = <-oinfoCh
dcache.updateETag(ctx, bucket, object, objInfo.ETag)
return objInfo, err
}
// Returns cacheObjects for use by Server.
func newServerCacheObjects(ctx context.Context, config CacheConfig) (CacheObjectLayer, error) {
// list of disk caches for cache "drives" specified in config.json or MINIO_CACHE_DRIVES env var.
@ -513,6 +594,9 @@ func newServerCacheObjects(ctx context.Context, config CacheConfig) (CacheObject
}
return errs, nil
},
PutObjectFn: func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
return newObjectLayerFn().PutObject(ctx, bucket, object, data, opts)
},
}
if migrateSw {
go c.migrateCacheFromV1toV2(ctx)

@ -1205,6 +1205,11 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
putObject = api.CacheAPI().PutObject
}
rawReader := hashReader
pReader := NewPutObjReader(rawReader, nil, nil)
@ -1219,7 +1224,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
// Deny if WORM is enabled
if globalWORMEnabled {
if _, err = objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil {
if _, err = getObjectInfo(ctx, bucket, object, opts); err == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}

@ -1053,6 +1053,9 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
}
putObject := objectAPI.PutObject
if web.CacheAPI() != nil {
putObject = web.CacheAPI().PutObject
}
objInfo, err := putObject(context.Background(), bucket, object, pReader, opts)
if err != nil {

Loading…
Cancel
Save