evict cached entry for server side copy (#8947)

Fixes #8942
master
poornas 5 years ago committed by GitHub
parent 502e652b7a
commit 9b4d46a6ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      cmd/disk-cache.go
  2. 6
      cmd/object-handlers.go

@ -56,6 +56,7 @@ type CacheObjectLayer interface {
DeleteObject(ctx context.Context, bucket, object string) error DeleteObject(ctx context.Context, bucket, object string) error
DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error)
PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error)
// Storage operations. // Storage operations.
StorageInfo(ctx context.Context) CacheStorageInfo StorageInfo(ctx context.Context) CacheStorageInfo
CacheStats() *CacheStats CacheStats() *CacheStats
@ -82,6 +83,7 @@ type cacheObjects struct {
DeleteObjectFn func(ctx context.Context, bucket, object string) error DeleteObjectFn func(ctx context.Context, bucket, object string) error
DeleteObjectsFn func(ctx context.Context, bucket string, objects []string) ([]error, error) DeleteObjectsFn func(ctx context.Context, bucket string, objects []string) ([]error, error)
PutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) PutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
CopyObjectFn func(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error)
} }
func (c *cacheObjects) incHitsToMeta(ctx context.Context, dcache *diskCache, bucket, object string, size int64, eTag string) error { func (c *cacheObjects) incHitsToMeta(ctx context.Context, dcache *diskCache, bucket, object string, size int64, eTag string) error {
@ -373,6 +375,30 @@ func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string,
return objInfo, nil return objInfo, nil
} }
// CopyObject reverts to backend after evicting any stale cache entries
func (c *cacheObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
copyObjectFn := c.CopyObjectFn
if c.isCacheExclude(srcBucket, srcObject) || c.skipCache() {
return copyObjectFn(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
}
if srcBucket != dstBucket || srcObject != dstObject {
return copyObjectFn(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
}
// fetch diskCache if object is currently cached or nearest available cache drive
dcache, err := c.getCacheToLoc(ctx, srcBucket, srcObject)
if err != nil {
return copyObjectFn(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
}
// if currently cached, evict old entry and revert to backend.
if cachedObjInfo, _, cerr := dcache.Stat(ctx, srcBucket, srcObject); cerr == nil {
cc := cacheControlOpts(cachedObjInfo)
if !cc.isStale(cachedObjInfo.ModTime) {
dcache.Delete(ctx, srcBucket, srcObject)
}
}
return copyObjectFn(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
}
// StorageInfo - returns underlying storage statistics. // StorageInfo - returns underlying storage statistics.
func (c *cacheObjects) StorageInfo(ctx context.Context) (cInfo CacheStorageInfo) { func (c *cacheObjects) StorageInfo(ctx context.Context) (cInfo CacheStorageInfo) {
var total, free uint64 var total, free uint64
@ -663,6 +689,9 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
PutObjectFn: func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { PutObjectFn: func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
return newObjectLayerFn().PutObject(ctx, bucket, object, data, opts) return newObjectLayerFn().PutObject(ctx, bucket, object, data, opts)
}, },
CopyObjectFn: func(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
return newObjectLayerFn().CopyObject(ctx, srcBucket, srcObject, destBucket, destObject, srcInfo, srcOpts, dstOpts)
},
} }
if migrateSw { if migrateSw {

@ -1077,9 +1077,13 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
objInfo.ETag = remoteObjInfo.ETag objInfo.ETag = remoteObjInfo.ETag
objInfo.ModTime = remoteObjInfo.LastModified objInfo.ModTime = remoteObjInfo.LastModified
} else { } else {
copyObjectFn := objectAPI.CopyObject
if api.CacheAPI() != nil {
copyObjectFn = api.CacheAPI().CopyObject
}
// Copy source object to destination, if source and destination // Copy source object to destination, if source and destination
// object is same then only metadata is updated. // object is same then only metadata is updated.
objInfo, err = objectAPI.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts) objInfo, err = copyObjectFn(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
if err != nil { if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return return

Loading…
Cancel
Save