Add async caching commit option in diskcache (#10742)

Add a store-and-forward option for single-part uploads when async commit mode is
enabled via the environment setting MINIO_CACHE_COMMIT=writeback.

It defaults to `writethrough` if unspecified.
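As a rough illustration of the difference between the two modes (a minimal sketch only, not the actual MinIO call path; `cacheStore` and `backendUpload` are stand-ins for the cache-drive write and the backend upload):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
)

// Stand-ins for the cache-drive write and the backend upload.
func cacheStore(name string, r io.Reader) error    { _, err := io.Copy(ioutil.Discard, r); return err }
func backendUpload(name string, r io.Reader) error { _, err := io.Copy(ioutil.Discard, r); return err }

// writethrough (the default): the PUT succeeds only after the backend upload completes.
func putWritethrough(name string, data []byte) error {
	if err := cacheStore(name, bytes.NewReader(data)); err != nil {
		return err
	}
	return backendUpload(name, bytes.NewReader(data))
}

// writeback: the PUT returns once the object is on the cache drive; the backend
// upload runs asynchronously and failures are queued for retry.
func putWriteback(name string, data []byte, retryCh chan<- string) error {
	if err := cacheStore(name, bytes.NewReader(data)); err != nil {
		return err
	}
	go func() {
		if err := backendUpload(name, bytes.NewReader(data)); err != nil {
			select {
			case retryCh <- name: // queue for a later retry
			default: // queue full; a later scan of cache metadata picks it up again
			}
		}
	}()
	return nil
}

func main() {
	retry := make(chan string, 1000)
	fmt.Println(putWritethrough("bucket/object", []byte("hello")))
	fmt.Println(putWriteback("bucket/object", []byte("hello"), retry))
}
```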
Poorna Krishnamoorthy committed via GitHub. Commit 03fdbc3ec2, parent 4c773f7068.
Changed files:
1. cmd/config/cache/config.go (31 lines)
2. cmd/config/cache/help.go (6 lines)
3. cmd/config/cache/lookup.go (13 lines)
4. cmd/config/errors.go (6 lines)
5. cmd/disk-cache-backend.go (166 lines)
6. cmd/disk-cache.go (148 lines)
7. cmd/format-disk-cache.go (2 lines)

cmd/config/cache/config.go

@@ -28,16 +28,17 @@ import (
// Config represents cache config settings
type Config struct {
    Enabled         bool     `json:"-"`
    Drives          []string `json:"drives"`
    Expiry          int      `json:"expiry"`
    MaxUse          int      `json:"maxuse"`
    Quota           int      `json:"quota"`
    Exclude         []string `json:"exclude"`
    After           int      `json:"after"`
    WatermarkLow    int      `json:"watermark_low"`
    WatermarkHigh   int      `json:"watermark_high"`
    Range           bool     `json:"range"`
    CommitWriteback bool     `json:"-"`
}

// UnmarshalJSON - implements JSON unmarshal interface for unmarshalling

@@ -152,3 +153,13 @@ func parseCacheExcludes(excludes string) ([]string, error) {
    return excludesSlice, nil
}

func parseCacheCommitMode(commitStr string) (bool, error) {
    switch strings.ToLower(commitStr) {
    case "writeback":
        return true, nil
    case "writethrough":
        return false, nil
    }
    return false, config.ErrInvalidCacheCommitValue(nil).Msg("cache commit value must be `writeback` or `writethrough`")
}

cmd/config/cache/help.go

@@ -74,5 +74,11 @@ var (
            Optional: true,
            Type:     "string",
        },
        config.HelpKV{
            Key:         Commit,
            Description: `set to control cache commit behavior, defaults to "writethrough"`,
            Optional:    true,
            Type:        "string",
        },
    }
)

cmd/config/cache/lookup.go

@@ -35,6 +35,7 @@ const (
    WatermarkLow  = "watermark_low"
    WatermarkHigh = "watermark_high"
    Range         = "range"
    Commit        = "commit"
    EnvCacheDrives  = "MINIO_CACHE_DRIVES"
    EnvCacheExclude = "MINIO_CACHE_EXCLUDE"

@@ -45,6 +46,7 @@ const (
    EnvCacheWatermarkLow        = "MINIO_CACHE_WATERMARK_LOW"
    EnvCacheWatermarkHigh       = "MINIO_CACHE_WATERMARK_HIGH"
    EnvCacheRange               = "MINIO_CACHE_RANGE"
    EnvCacheCommit              = "MINIO_CACHE_COMMIT"
    EnvCacheEncryptionMasterKey = "MINIO_CACHE_ENCRYPTION_MASTER_KEY"

@@ -53,6 +55,7 @@ const (
    DefaultAfter         = "0"
    DefaultWaterMarkLow  = "70"
    DefaultWaterMarkHigh = "80"
    DefaultCacheCommit   = "writethrough"
)

// DefaultKVS - default KV settings for caching.

@@ -90,6 +93,10 @@ var (
            Key:   Range,
            Value: config.EnableOn,
        },
        config.KV{
            Key:   Commit,
            Value: DefaultCacheCommit,
        },
    }
)

@@ -210,6 +217,12 @@ func LookupConfig(kvs config.KVS) (Config, error) {
        }
        cfg.Range = rng
    }
    if commit := env.Get(EnvCacheCommit, kvs.Get(Commit)); commit != "" {
        cfg.CommitWriteback, err = parseCacheCommitMode(commit)
        if err != nil {
            return cfg, err
        }
    }
    return cfg, nil
}
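The lookup above gives the `MINIO_CACHE_COMMIT` environment variable precedence over the stored `commit` config key, falling back to the built-in `writethrough` default when neither is set. A minimal sketch of that precedence, with `envGet` standing in for MinIO's `env.Get` helper:

```go
package main

import (
	"fmt"
	"os"
)

// envGet returns the environment value when set and non-empty, else the fallback.
func envGet(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

func main() {
	// value as it would come from the `commit` key stored in the cache subsystem
	stored := "writethrough"

	os.Setenv("MINIO_CACHE_COMMIT", "writeback")
	fmt.Println(envGet("MINIO_CACHE_COMMIT", stored)) // writeback: the env var wins

	os.Unsetenv("MINIO_CACHE_COMMIT")
	fmt.Println(envGet("MINIO_CACHE_COMMIT", stored)) // writethrough: stored value applies
}
```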

cmd/config/errors.go

@@ -102,6 +102,12 @@ var (
        "MINIO_CACHE_RANGE: Valid expected value is `on` or `off`",
    )

    ErrInvalidCacheCommitValue = newErrFn(
        "Invalid cache commit value",
        "Please check the passed value",
        "MINIO_CACHE_COMMIT: Valid expected value is `writeback` or `writethrough`",
    )

    ErrInvalidRotatingCredentialsBackendEncrypted = newErrFn(
        "Invalid rotating credentials",
        "Please set correct rotating credentials in the environment for decryption",

cmd/disk-cache-backend.go

@@ -19,7 +19,9 @@ package cmd
import (
    "bytes"
    "context"
    "crypto/md5"
    "crypto/rand"
    "encoding/base64"
    "encoding/hex"
    "errors"
    "fmt"

@@ -70,7 +72,9 @@ type cacheMeta struct {
    // Ranges maps cached range to associated filename.
    Ranges map[string]string `json:"ranges,omitempty"`
    // Hits is a counter on the number of times this object has been accessed so far.
    Hits   int    `json:"hits,omitempty"`
    Bucket string `json:"bucket,omitempty"`
    Object string `json:"object,omitempty"`
}

// RangeInfo has the range, file and range length information for a cached range.
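Recording `Bucket` and `Object` in `cacheMeta` matters for writeback mode: cached entries live under hashed directory names, so without these fields the startup scan could not map a still-pending entry back to the object it needs to upload. A hedged illustration of that one-way mapping (the exact hash construction below is an assumption patterned on `getCacheSHADir`, not copied from it):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"path"
)

// cacheDirFor shows how a cache entry's directory could be derived from the
// object name: a one-way hash, so bucket/object cannot be recovered from it.
func cacheDirFor(cacheRoot, bucket, object string) string {
	sum := sha256.Sum256([]byte(path.Join(bucket, object)))
	return path.Join(cacheRoot, hex.EncodeToString(sum[:]))
}

func main() {
	fmt.Println(cacheDirFor("/mnt/cache1", "photos", "2020/p.jpg"))
	// Reversing that mapping is only possible because cache.json now records
	// "bucket" and "object" next to the cached data.
}
```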
@@ -130,15 +134,17 @@ type diskCache struct {
    online       uint32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
    purgeRunning int32
    triggerGC    chan struct{}
    dir          string         // caching directory
    stats        CacheDiskStats // disk cache stats for prometheus
    quotaPct     int            // max usage in %
    pool         sync.Pool
    after        int // minimum accesses before an object is cached.
    lowWatermark     int
    highWatermark    int
    enableRange      bool
    commitWriteback  bool
    retryWritebackCh chan ObjectInfo
    // nsMutex namespace lock
    nsMutex *nsLockMap
    // Object functions pointing to the corresponding functions of backend implementation.
@@ -156,15 +162,17 @@ func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCa
        return nil, fmt.Errorf("Unable to initialize '%s' dir, %w", dir, err)
    }
    cache := diskCache{
        dir:              dir,
        triggerGC:        make(chan struct{}, 1),
        stats:            CacheDiskStats{Dir: dir},
        quotaPct:         quotaPct,
        after:            config.After,
        lowWatermark:     config.WatermarkLow,
        highWatermark:    config.WatermarkHigh,
        enableRange:      config.Range,
        commitWriteback:  config.CommitWriteback,
        retryWritebackCh: make(chan ObjectInfo, 10000),
        online:           1,
        pool: sync.Pool{
            New: func() interface{} {
                b := disk.AlignedBlock(int(cacheBlkSize))

@@ -174,6 +182,9 @@ func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCa
        nsMutex: newNSLock(false),
    }
    go cache.purgeWait(ctx)
    if cache.commitWriteback {
        go cache.scanCacheWritebackFailures(ctx)
    }
    cache.diskSpaceAvailable(0) // update if cache usage is already high.
    cache.NewNSLockFn = func(ctx context.Context, cachePath string) RWLocker {
        return cache.nsMutex.NewNSLock(ctx, nil, cachePath, "")
@@ -323,6 +334,12 @@ func (c *diskCache) purge(ctx context.Context) {
        // stat all cached file ranges and cacheDataFile.
        cachedFiles := fiStatFn(meta.Ranges, cacheDataFile, pathJoin(c.dir, name))
        objInfo := meta.ToObjectInfo("", "")
        // prevent gc from clearing un-synced commits. This metadata is present when
        // cache writeback commit setting is enabled.
        status, ok := objInfo.UserDefined[writeBackStatusHeader]
        if ok && status != CommitComplete.String() {
            return nil
        }
        cc := cacheControlOpts(objInfo)
        for fname, fi := range cachedFiles {
            if cc != nil {
@@ -524,7 +541,11 @@ func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, met
    }
    defer f.Close()
    m := &cacheMeta{
        Version: cacheMetaVersion,
        Bucket:  bucket,
        Object:  object,
    }
    if err := jsonLoad(f, m); err != nil && err != io.EOF {
        return err
    }

@@ -561,7 +582,6 @@ func (c *diskCache) saveMetadata(ctx context.Context, bucket, object string, met
            m.Meta["etag"] = etag
        }
    }
    m.Hits++
    m.Checksum = CacheChecksumInfoV1{Algorithm: HighwayHash256S.String(), Blocksize: cacheBlkSize}
@@ -573,22 +593,22 @@ func getCacheSHADir(dir, bucket, object string) string {
}

// Cache data to disk with bitrot checksum added for each block of 1MB
func (c *diskCache) bitrotWriteToCache(cachePath, fileName string, reader io.Reader, size uint64) (int64, string, error) {
    if err := os.MkdirAll(cachePath, 0777); err != nil {
        return 0, "", err
    }
    filePath := pathJoin(cachePath, fileName)
    if filePath == "" || reader == nil {
        return 0, "", errInvalidArgument
    }
    if err := checkPathLength(filePath); err != nil {
        return 0, "", err
    }
    f, err := os.Create(filePath)
    if err != nil {
        return 0, "", osErrToFileErr(err)
    }
    defer f.Close()
@@ -598,12 +618,12 @@ func (c *diskCache) bitrotWriteToCache(cachePath, fileName string, reader io.Rea
    bufp := c.pool.Get().(*[]byte)
    defer c.pool.Put(bufp)
    md5Hash := md5.New()
    var n, n2 int
    for {
        n, err = io.ReadFull(reader, *bufp)
        if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
            return 0, "", err
        }
        eof := err == io.EOF || err == io.ErrUnexpectedEOF
        if n == 0 && size != 0 {

@@ -612,21 +632,28 @@ func (c *diskCache) bitrotWriteToCache(cachePath, fileName string, reader io.Rea
        }
        h.Reset()
        if _, err = h.Write((*bufp)[:n]); err != nil {
            return 0, "", err
        }
        hashBytes := h.Sum(nil)
        // compute md5Hash of original data stream if writeback commit to cache
        if c.commitWriteback {
            if _, err = md5Hash.Write((*bufp)[:n]); err != nil {
                return 0, "", err
            }
        }
        if _, err = f.Write(hashBytes); err != nil {
            return 0, "", err
        }
        if n2, err = f.Write((*bufp)[:n]); err != nil {
            return 0, "", err
        }
        bytesWritten += int64(n2)
        if eof {
            break
        }
    }
    return bytesWritten, base64.StdEncoding.EncodeToString(md5Hash.Sum(nil)), nil
}

func newCacheEncryptReader(content io.Reader, bucket, object string, metadata map[string]string) (r io.Reader, err error) {
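The plaintext MD5 is accumulated inline while the data is chunked for the bitrot writer, so the later asynchronous upload never has to re-read the cached file and strip its bitrot framing. The same digest is then used in two encodings, as the `Put` path below shows: base64 for the `Content-MD5` header of the deferred upload, and hex for the ETag recorded in the cache metadata (single-part, unencrypted case). A small self-contained sketch of the two encodings:

```go
package main

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/hex"
	"fmt"
)

func main() {
	sum := md5.Sum([]byte("hello world"))

	contentMD5 := base64.StdEncoding.EncodeToString(sum[:]) // sent as Content-MD5 on the deferred upload
	etag := hex.EncodeToString(sum[:])                      // stored as metadata["etag"] for the cached object

	fmt.Println(contentMD5)
	fmt.Println(etag)
}
```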
@@ -663,41 +690,41 @@ func newCacheEncryptMetadata(bucket, object string, metadata map[string]string)
}

// Caches the object to disk
func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly bool) (oi ObjectInfo, err error) {
    if !c.diskSpaceAvailable(size) {
        io.Copy(ioutil.Discard, data)
        return oi, errDiskFull
    }
    cachePath := getCacheSHADir(c.dir, bucket, object)
    cLock := c.NewNSLockFn(ctx, cachePath)
    if err := cLock.GetLock(globalOperationTimeout); err != nil {
        return oi, err
    }
    defer cLock.Unlock()
    meta, _, numHits, err := c.statCache(ctx, cachePath)
    // Case where object not yet cached
    if os.IsNotExist(err) && c.after >= 1 {
        return oi, c.saveMetadata(ctx, bucket, object, opts.UserDefined, size, nil, "", false)
    }
    // Case where object already has a cache metadata entry but not yet cached
    if err == nil && numHits < c.after {
        cETag := extractETag(meta.Meta)
        bETag := extractETag(opts.UserDefined)
        if cETag == bETag {
            return oi, c.saveMetadata(ctx, bucket, object, opts.UserDefined, size, nil, "", false)
        }
        incHitsOnly = true
    }
    if rs != nil {
        return oi, c.putRange(ctx, bucket, object, data, size, rs, opts)
    }
    if !c.diskSpaceAvailable(size) {
        return oi, errDiskFull
    }
    if err := os.MkdirAll(cachePath, 0777); err != nil {
        return oi, err
    }
    var metadata = cloneMSS(opts.UserDefined)
    var reader = data

@@ -705,25 +732,39 @@
    if globalCacheKMS != nil {
        reader, err = newCacheEncryptReader(data, bucket, object, metadata)
        if err != nil {
            return oi, err
        }
        actualSize, _ = sio.EncryptedSize(uint64(size))
    }
    n, md5sum, err := c.bitrotWriteToCache(cachePath, cacheDataFile, reader, actualSize)
    if IsErr(err, baseErrs...) {
        // take the cache drive offline
        c.setOffline()
    }
    if err != nil {
        removeAll(cachePath)
        return oi, err
    }
    if actualSize != uint64(n) {
        removeAll(cachePath)
        return oi, IncompleteBody{Bucket: bucket, Object: object}
    }
    if c.commitWriteback {
        metadata["content-md5"] = md5sum
        if md5bytes, err := base64.StdEncoding.DecodeString(md5sum); err == nil {
            metadata["etag"] = hex.EncodeToString(md5bytes)
        }
        metadata[writeBackStatusHeader] = CommitPending.String()
    }
    return ObjectInfo{
            Bucket:      bucket,
            Name:        object,
            ETag:        metadata["etag"],
            Size:        n,
            UserDefined: metadata,
        },
        c.saveMetadata(ctx, bucket, object, metadata, n, nil, "", incHitsOnly)
}

// Caches the range to disk
@@ -754,7 +795,7 @@ func (c *diskCache) putRange(ctx context.Context, bucket, object string, data io
    }
    cacheFile := MustGetUUID()
    n, _, err := c.bitrotWriteToCache(cachePath, cacheFile, reader, actualSize)
    if IsErr(err, baseErrs...) {
        // take the cache drive offline
        c.setOffline()
@@ -954,3 +995,36 @@ func (c *diskCache) Exists(ctx context.Context, bucket, object string) bool {
    }
    return true
}

// queues writeback upload failures on server startup
func (c *diskCache) scanCacheWritebackFailures(ctx context.Context) {
    defer close(c.retryWritebackCh)
    filterFn := func(name string, typ os.FileMode) error {
        if name == minioMetaBucket {
            // Proceed to next file.
            return nil
        }
        cacheDir := pathJoin(c.dir, name)
        meta, _, _, err := c.statCachedMeta(ctx, cacheDir)
        if err != nil {
            return nil
        }
        objInfo := meta.ToObjectInfo("", "")
        status, ok := objInfo.UserDefined[writeBackStatusHeader]
        if !ok || status == CommitComplete.String() {
            return nil
        }
        select {
        case c.retryWritebackCh <- objInfo:
        default:
        }
        return nil
    }

    if err := readDirFilterFn(c.dir, filterFn); err != nil {
        logger.LogIf(ctx, err)
        return
    }
}

cmd/disk-cache.go

@@ -22,24 +22,47 @@ import (
    "fmt"
    "io"
    "net/http"
    "strconv"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/minio/minio/cmd/config/cache"
    xhttp "github.com/minio/minio/cmd/http"
    "github.com/minio/minio/cmd/logger"
    objectlock "github.com/minio/minio/pkg/bucket/object/lock"
    "github.com/minio/minio/pkg/color"
    "github.com/minio/minio/pkg/hash"
    "github.com/minio/minio/pkg/sync/errgroup"
    "github.com/minio/minio/pkg/wildcard"
)

const (
    cacheBlkSize    = 1 << 20
    cacheGCInterval = time.Minute * 30

    writeBackStatusHeader = ReservedMetadataPrefixLower + "write-back-status"
    writeBackRetryHeader  = ReservedMetadataPrefixLower + "write-back-retry"
)

type cacheCommitStatus string

const (
    // CommitPending - cache writeback with backend is pending.
    CommitPending cacheCommitStatus = "pending"

    // CommitComplete - cache writeback completed ok.
    CommitComplete cacheCommitStatus = "complete"

    // CommitFailed - cache writeback needs a retry.
    CommitFailed cacheCommitStatus = "failed"
)

// String returns string representation of status
func (s cacheCommitStatus) String() string {
    return string(s)
}

// CacheStorageInfo - represents total, free capacity of
// underlying cache storage.
type CacheStorageInfo struct {
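Each writeback-cached object carries one of these statuses in the reserved write-back-status metadata key: `pending` is written at PUT time, and the background upload later flips it to `complete` or `failed`. Anything other than `complete` means the entry still owes an upload, and both the GC scan and the startup retry scan test exactly that. A minimal sketch of the check (the literal key name is illustrative):

```go
package main

import "fmt"

// needsWriteback reports whether a cached entry still has to be uploaded to the
// backend, based on its recorded commit status. statusKey stands in for the
// reserved write-back-status metadata key.
func needsWriteback(userDefined map[string]string, statusKey string) bool {
	status, ok := userDefined[statusKey]
	return ok && status != "complete"
}

func main() {
	const statusKey = "x-minio-internal-write-back-status" // illustrative key name

	fmt.Println(needsWriteback(map[string]string{statusKey: "pending"}, statusKey))  // true
	fmt.Println(needsWriteback(map[string]string{statusKey: "failed"}, statusKey))   // true
	fmt.Println(needsWriteback(map[string]string{statusKey: "complete"}, statusKey)) // false
	fmt.Println(needsWriteback(map[string]string{}, statusKey))                      // false: writethrough entry
}
```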
@@ -69,11 +92,14 @@ type cacheObjects struct {
    exclude []string
    // number of accesses after which to cache an object
    after int
    // commit objects in async manner
    commitWriteback bool
    // if true migration is in progress from v1 to v2
    migrating bool
    // mutex to protect migration bool
    migMutex sync.Mutex
    // retry queue for writeback cache mode to reattempt upload to backend
    wbRetryCh chan ObjectInfo
    // Cache stats
    cacheStats *CacheStats

@@ -333,7 +359,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
    teeReader := io.TeeReader(bkReader, pipeWriter)
    userDefined := getMetadata(bkReader.ObjInfo)
    go func() {
        _, putErr := dcache.Put(ctx, bucket, object,
            io.LimitReader(pipeReader, bkReader.ObjInfo.Size),
            bkReader.ObjInfo.Size, rs, ObjectOptions{
                UserDefined: userDefined,
@@ -629,7 +655,14 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
        dcache.Delete(ctx, bucket, object)
        return putObjectFn(ctx, bucket, object, r, opts)
    }
    if c.commitWriteback {
        oi, err := dcache.Put(ctx, bucket, object, r, r.Size(), nil, opts, false)
        if err != nil {
            return ObjectInfo{}, err
        }
        go c.uploadObject(GlobalContext, oi)
        return oi, nil
    }
    objInfo, err = putObjectFn(ctx, bucket, object, r, opts)
    if err == nil {

@@ -647,9 +680,68 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
            }
        }()
    }
    return objInfo, err
}

// upload cached object to backend in async commit mode.
func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) {
    dcache, err := c.getCacheToLoc(ctx, oi.Bucket, oi.Name)
    if err != nil {
        // disk cache could not be located.
        logger.LogIf(ctx, fmt.Errorf("Could not upload %s/%s to backend: %w", oi.Bucket, oi.Name, err))
        return
    }
    cReader, _, bErr := dcache.Get(ctx, oi.Bucket, oi.Name, nil, http.Header{}, ObjectOptions{})
    if bErr != nil {
        return
    }
    defer cReader.Close()
    if cReader.ObjInfo.ETag != oi.ETag {
        return
    }
    st := cacheCommitStatus(oi.UserDefined[writeBackStatusHeader])
    if st == CommitComplete || st.String() == "" {
        return
    }
    hashReader, err := hash.NewReader(cReader, oi.Size, "", "", oi.Size, globalCLIContext.StrictS3Compat)
    if err != nil {
        return
    }
    var opts ObjectOptions
    opts.UserDefined = make(map[string]string)
    opts.UserDefined[xhttp.ContentMD5] = oi.UserDefined["content-md5"]
    objInfo, err := c.InnerPutObjectFn(ctx, oi.Bucket, oi.Name, NewPutObjReader(hashReader, nil, nil), opts)
    wbCommitStatus := CommitComplete
    if err != nil {
        wbCommitStatus = CommitFailed
    }
    meta := cloneMSS(cReader.ObjInfo.UserDefined)
    retryCnt := 0
    if wbCommitStatus == CommitFailed {
        retryCnt, _ = strconv.Atoi(meta[writeBackRetryHeader])
        retryCnt++
        meta[writeBackRetryHeader] = strconv.Itoa(retryCnt)
    } else {
        delete(meta, writeBackRetryHeader)
    }
    meta[writeBackStatusHeader] = wbCommitStatus.String()
    meta["etag"] = oi.ETag
    dcache.SaveMetadata(ctx, oi.Bucket, oi.Name, meta, objInfo.Size, nil, "", false)
    if retryCnt > 0 {
        // slow down retries
        time.Sleep(time.Second * time.Duration(retryCnt%10+1))
        c.queueWritebackRetry(oi)
    }
}

func (c *cacheObjects) queueWritebackRetry(oi ObjectInfo) {
    select {
    case c.wbRetryCh <- oi:
        c.uploadObject(GlobalContext, oi)
    default:
    }
}
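Both retry queues (`retryWritebackCh` on each cache drive and `wbRetryCh` here) are bounded and written with a non-blocking send: when a queue is full the entry is simply dropped, which is safe because its `pending` or `failed` status is persisted in the cache metadata and the next startup scan will queue it again. Retries are also paced by the `retryCnt%10+1` sleep above, which caps the delay at roughly ten seconds instead of growing unbounded. A short sketch of the non-blocking enqueue pattern:

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send and reports whether the item was
// accepted. Dropped items are not lost: their on-disk status stays pending.
func tryEnqueue(queue chan<- string, item string) bool {
	select {
	case queue <- item:
		return true
	default:
		return false
	}
}

func main() {
	queue := make(chan string, 2)
	for _, obj := range []string{"a", "b", "c"} {
		fmt.Println(obj, "queued:", tryEnqueue(queue, obj))
	}
	// Prints: a queued: true, b queued: true, c queued: false (queue full)
}
```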
// Returns cacheObjects for use by Server.

@@ -660,12 +752,13 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
        return nil, err
    }
    c := &cacheObjects{
        cache:           cache,
        exclude:         config.Exclude,
        after:           config.After,
        migrating:       migrateSw,
        migMutex:        sync.Mutex{},
        commitWriteback: config.CommitWriteback,
        cacheStats:      newCacheStats(),
        InnerGetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
            return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
        },

@@ -699,6 +792,15 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
        go c.migrateCacheFromV1toV2(ctx)
    }
    go c.gc(ctx)
    if c.commitWriteback {
        c.wbRetryCh = make(chan ObjectInfo, 10000)
        go func() {
            <-GlobalContext.Done()
            close(c.wbRetryCh)
        }()
        go c.queuePendingWriteback(ctx)
    }
    return c, nil
}
@@ -724,3 +826,25 @@ func (c *cacheObjects) gc(ctx context.Context) {
            }
        }
    }
}

// queues any pending or failed async commits when server restarts
func (c *cacheObjects) queuePendingWriteback(ctx context.Context) {
    for _, dcache := range c.cache {
        if dcache != nil {
            for {
                select {
                case <-ctx.Done():
                    return
                case oi, ok := <-dcache.retryWritebackCh:
                    if !ok {
                        goto next
                    }
                    c.queueWritebackRetry(oi)
                default:
                    time.Sleep(time.Second * 1)
                }
            }
        }
    next:
    }
}

cmd/format-disk-cache.go

@@ -365,7 +365,7 @@ func migrateCacheData(ctx context.Context, c *diskCache, bucket, object, oldfile
        }
        actualSize, _ = sio.EncryptedSize(uint64(st.Size()))
    }
    _, _, err = c.bitrotWriteToCache(destDir, cacheDataFile, reader, actualSize)
    return err
}
