From b363709c112c9065e976eb797c1e30f56a97ba63 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 8 Dec 2016 20:35:07 -0800 Subject: [PATCH] caching: Optimize memory allocations. (#3405) This change brings in changes at multiple places - Reuse buffers at almost all locations ranging from rpc, fs, xl, checksum etc. - Change caching behavior to disable itself under low memory conditions i.e < 8GB of RAM. - Only objects cached are of size 1/10th the size of the cache for example if 4GB is the cache size the maximum object size which will be cached is going to be 400MB. This change is an optimization to cache more objects rather than few larger objects. - If object cache is enabled default GC percent has been reduced to 20% in lieu with newly found behavior of GC. If the cache utilization reaches 75% of the maximum value GC percent is reduced to 10% to make GC more aggressive. - Do not use *bytes.Buffer* due to its growth requirements. For every allocation *bytes.Buffer* allocates an additional buffer for its internal purposes. This is undesirable for us, so implemented a new cappedWriter which is capped to a desired size, beyond this all writes rejected. Possible fix for #3403. --- cmd/erasure-createfile.go | 2 +- cmd/erasure-utils.go | 17 +++- cmd/fs-v1-background-append.go | 7 +- cmd/globals.go | 6 +- cmd/prepare-storage.go | 6 +- cmd/server-rlimit-nix.go | 10 +- cmd/server-rlimit-win.go | 13 ++- cmd/storage-rpc-client.go | 12 ++- cmd/storage-rpc-server-datatypes.go | 4 +- cmd/storage-rpc-server.go | 14 +-- cmd/test-utils_test.go | 3 + cmd/xl-v1-multipart.go | 18 ++-- cmd/xl-v1-object.go | 6 +- cmd/xl-v1.go | 35 ++++--- pkg/objcache/capped-writer.go | 50 ++++++++++ pkg/objcache/objcache.go | 138 +++++++++++++++++++--------- pkg/objcache/objcache_test.go | 34 +++++-- 17 files changed, 271 insertions(+), 104 deletions(-) create mode 100644 pkg/objcache/capped-writer.go diff --git a/cmd/erasure-createfile.go b/cmd/erasure-createfile.go index 62bd21715..18797ba5f 100644 --- a/cmd/erasure-createfile.go +++ b/cmd/erasure-createfile.go @@ -29,7 +29,7 @@ import ( // all the disks, writes also calculate individual block's checksum // for future bit-rot protection. func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, blockSize int64, dataBlocks int, parityBlocks int, algo string, writeQuorum int) (bytesWritten int64, checkSums []string, err error) { - // Allocated blockSized buffer for reading. + // Allocated blockSized buffer for reading from incoming stream. buf := make([]byte, blockSize) hashWriters := newHashWriters(len(disks), algo) diff --git a/cmd/erasure-utils.go b/cmd/erasure-utils.go index 2f05f027a..0b8c6f47a 100644 --- a/cmd/erasure-utils.go +++ b/cmd/erasure-utils.go @@ -21,6 +21,7 @@ import ( "errors" "hash" "io" + "sync" "github.com/klauspost/reedsolomon" "github.com/minio/blake2b-simd" @@ -47,13 +48,23 @@ func newHash(algo string) hash.Hash { } } +// Hash buffer pool is a pool of reusable +// buffers used while checksumming a stream. +var hashBufferPool = sync.Pool{ + New: func() interface{} { + b := make([]byte, readSizeV1) + return &b + }, +} + // hashSum calculates the hash of the entire path and returns. func hashSum(disk StorageAPI, volume, path string, writer hash.Hash) ([]byte, error) { - // Allocate staging buffer of 128KiB for copyBuffer. - buf := make([]byte, readSizeV1) + // Fetch staging a new staging buffer from the pool. + bufp := hashBufferPool.Get().(*[]byte) + defer hashBufferPool.Put(bufp) // Copy entire buffer to writer. - if err := copyBuffer(writer, disk, volume, path, buf); err != nil { + if err := copyBuffer(writer, disk, volume, path, *bufp); err != nil { return nil, err } diff --git a/cmd/fs-v1-background-append.go b/cmd/fs-v1-background-append.go index 06f8a2702..83634d680 100644 --- a/cmd/fs-v1-background-append.go +++ b/cmd/fs-v1-background-append.go @@ -129,6 +129,8 @@ func (b *backgroundAppend) abort(uploadID string) { func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID string, info bgAppendPartsInfo) { // Holds the list of parts that is already appended to the "append" file. appendMeta := fsMetaV1{} + // Allocate staging read buffer. + buf := make([]byte, readSizeV1) for { select { case input := <-info.inputCh: @@ -151,7 +153,7 @@ func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID } break } - if err := appendPart(disk, bucket, object, uploadID, part); err != nil { + if err := appendPart(disk, bucket, object, uploadID, part, buf); err != nil { disk.DeleteFile(minioMetaTmpBucket, uploadID) appendMeta.Parts = nil input.errCh <- err @@ -183,12 +185,11 @@ func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID // Appends the "part" to the append-file inside "tmp/" that finally gets moved to the actual location // upon complete-multipart-upload. -func appendPart(disk StorageAPI, bucket, object, uploadID string, part objectPartInfo) error { +func appendPart(disk StorageAPI, bucket, object, uploadID string, part objectPartInfo, buf []byte) error { partPath := pathJoin(bucket, object, uploadID, part.Name) offset := int64(0) totalLeft := part.Size - buf := make([]byte, readSizeV1) for totalLeft > 0 { curLeft := int64(readSizeV1) if totalLeft < readSizeV1 { diff --git a/cmd/globals.go b/cmd/globals.go index 2eccb62f3..0223766b6 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -61,8 +61,10 @@ var ( globalIsDistXL = false // "Is Distributed?" flag. - // Maximum cache size. - globalMaxCacheSize = uint64(maxCacheSize) + // Maximum cache size. Defaults to disabled. + // Caching is enabled only for RAM size > 8GiB. + globalMaxCacheSize = uint64(0) + // Cache expiry. globalCacheExpiry = objcache.DefaultExpiry // Minio local server address (in `host:port` format) diff --git a/cmd/prepare-storage.go b/cmd/prepare-storage.go index ab6192e09..a15672d55 100644 --- a/cmd/prepare-storage.go +++ b/cmd/prepare-storage.go @@ -253,11 +253,9 @@ func retryFormattingDisks(firstDisk bool, endpoints []*url.URL, storageDisks []S // Print configuration errors. printConfigErrMsg(storageDisks, sErrs, printOnceFn()) case WaitForAll: - console.Printf("Initializing data volume for first time. Waiting for other servers to come online (elapsed %s)\n", - getElapsedTime()) + console.Printf("Initializing data volume for first time. Waiting for other servers to come online (elapsed %s)\n", getElapsedTime()) case WaitForFormatting: - console.Println("Initializing data volume for first time. Waiting for first server to come online (elapsed %s)\n", - getElapsedTime()) + console.Printf("Initializing data volume for first time. Waiting for first server to come online (elapsed %s)\n", getElapsedTime()) } continue } // else We have FS backend now. Check fs format as well now. diff --git a/cmd/server-rlimit-nix.go b/cmd/server-rlimit-nix.go index ff8bf1c65..9c0b73f00 100644 --- a/cmd/server-rlimit-nix.go +++ b/cmd/server-rlimit-nix.go @@ -66,18 +66,18 @@ func setMaxMemory() error { // Validate if rlimit memory is set to lower // than max cache size. Then we should use such value. if uint64(rLimit.Cur) < globalMaxCacheSize { - globalMaxCacheSize = (80 / 100) * uint64(rLimit.Cur) + globalMaxCacheSize = uint64(float64(50*rLimit.Cur) / 100) } // Make sure globalMaxCacheSize is less than RAM size. stats, err := sys.GetStats() if err != nil && err != sys.ErrNotImplemented { - // sys.GetStats() is implemented only on linux. Ignore errors - // from other OSes. return err } - if err == nil && stats.TotalRAM < globalMaxCacheSize { - globalMaxCacheSize = uint64(float64(80*stats.TotalRAM) / 100) + // If TotalRAM is >= minRAMSize we proceed to enable cache. + // cache is always 50% of the totalRAM. + if err == nil && stats.TotalRAM >= minRAMSize { + globalMaxCacheSize = uint64(float64(50*stats.TotalRAM) / 100) } return nil } diff --git a/cmd/server-rlimit-win.go b/cmd/server-rlimit-win.go index 8dd2a6c70..954e0f734 100644 --- a/cmd/server-rlimit-win.go +++ b/cmd/server-rlimit-win.go @@ -18,6 +18,8 @@ package cmd +import "github.com/minio/minio/pkg/sys" + func setMaxOpenFiles() error { // Golang uses Win32 file API (CreateFile, WriteFile, ReadFile, // CloseHandle, etc.), then you don't have a limit on open files @@ -26,6 +28,15 @@ func setMaxOpenFiles() error { } func setMaxMemory() error { - // TODO: explore if Win32 API's provide anything special here. + // Make sure globalMaxCacheSize is less than RAM size. + stats, err := sys.GetStats() + if err != nil && err != sys.ErrNotImplemented { + return err + } + // If TotalRAM is <= minRAMSize we proceed to enable cache. + // cache is always 50% of the totalRAM. + if err == nil && stats.TotalRAM >= minRAMSize { + globalMaxCacheSize = uint64(float64(50*stats.TotalRAM) / 100) + } return nil } diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go index e90bc903d..4ed639c2f 100644 --- a/cmd/storage-rpc-client.go +++ b/cmd/storage-rpc-client.go @@ -17,6 +17,7 @@ package cmd import ( + "bytes" "io" "net" "net/rpc" @@ -366,6 +367,13 @@ func (n *networkStorage) ReadFile(volume string, path string, offset int64, buff } }() + defer func() { + if r := recover(); r != nil { + // Recover any panic from allocation, and return error. + err = bytes.ErrTooLarge + } + }() // Do not crash the server. + // Take remote disk offline if the total network errors. // are more than maximum allowable IO error limit. if n.networkIOErrCount > maxAllowedNetworkIOError { @@ -377,10 +385,12 @@ func (n *networkStorage) ReadFile(volume string, path string, offset int64, buff Vol: volume, Path: path, Offset: offset, - Size: len(buffer), + Buffer: buffer, }, &result) + // Copy results to buffer. copy(buffer, result) + // Return length of result, err if any. return int64(len(result)), toStorageErr(err) } diff --git a/cmd/storage-rpc-server-datatypes.go b/cmd/storage-rpc-server-datatypes.go index f0b75d153..8f474feff 100644 --- a/cmd/storage-rpc-server-datatypes.go +++ b/cmd/storage-rpc-server-datatypes.go @@ -57,8 +57,8 @@ type ReadFileArgs struct { // Starting offset to start reading into Buffer. Offset int64 - // Data size read from the path at offset. - Size int + // Data buffer read from the path at offset. + Buffer []byte } // PrepareFileArgs represents append file RPC arguments. diff --git a/cmd/storage-rpc-server.go b/cmd/storage-rpc-server.go index ce20ed5f5..11658a299 100644 --- a/cmd/storage-rpc-server.go +++ b/cmd/storage-rpc-server.go @@ -17,7 +17,6 @@ package cmd import ( - "bytes" "io" "net/rpc" "path" @@ -156,19 +155,12 @@ func (s *storageServer) ReadAllHandler(args *ReadFileArgs, reply *[]byte) error // ReadFileHandler - read file handler is rpc wrapper to read file. func (s *storageServer) ReadFileHandler(args *ReadFileArgs, reply *[]byte) (err error) { - defer func() { - if r := recover(); r != nil { - // Recover any panic and return ErrCacheFull. - err = bytes.ErrTooLarge - } - }() // Do not crash the server. if !isRPCTokenValid(args.Token) { return errInvalidToken } - // Allocate the requested buffer from the client. - *reply = make([]byte, args.Size) + var n int64 - n, err = s.storage.ReadFile(args.Vol, args.Path, args.Offset, *reply) + n, err = s.storage.ReadFile(args.Vol, args.Path, args.Offset, args.Buffer) // Sending an error over the rpc layer, would cause unmarshalling to fail. In situations // when we have short read i.e `io.ErrUnexpectedEOF` treat it as good condition and copy // the buffer properly. @@ -176,7 +168,7 @@ func (s *storageServer) ReadFileHandler(args *ReadFileArgs, reply *[]byte) (err // Reset to nil as good condition. err = nil } - *reply = (*reply)[0:n] + *reply = args.Buffer[0:n] return err } diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 962a10a1f..01746bd94 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -62,6 +62,9 @@ func init() { // Disable printing console messages during tests. color.Output = ioutil.Discard + + // Enable caching. + setMaxMemory() } func prepareFS() (ObjectLayer, string, error) { diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index e22c60a33..496e2672f 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -708,17 +708,21 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload destLock := nsMutex.NewNSLock(bucket, object) destLock.Lock() defer func() { - // A new complete multipart upload invalidates any - // previously cached object in memory. - xl.objCache.Delete(path.Join(bucket, object)) + if xl.objCacheEnabled { + // A new complete multipart upload invalidates any + // previously cached object in memory. + xl.objCache.Delete(path.Join(bucket, object)) + } // This lock also protects the cache namespace. destLock.Unlock() - // Prefetch the object from disk by triggering a fake GetObject call - // Unlike a regular single PutObject, multipart PutObject is comes in - // stages and it is harder to cache. - go xl.GetObject(bucket, object, 0, objectSize, ioutil.Discard) + if xl.objCacheEnabled { + // Prefetch the object from disk by triggering a fake GetObject call + // Unlike a regular single PutObject, multipart PutObject is comes in + // stages and it is harder to cache. + go xl.GetObject(bucket, object, 0, objectSize, ioutil.Discard) + } }() // Rename if an object already exists to temporary location. diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 71564d839..a8ed0f7fe 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -621,8 +621,10 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) { return toObjectErr(err, bucket, object) } - // Delete from the cache. - xl.objCache.Delete(pathJoin(bucket, object)) + if xl.objCacheEnabled { + // Delete from the cache. + xl.objCache.Delete(pathJoin(bucket, object)) + } // Success. return nil diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 490f9f732..e6affaa23 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -19,6 +19,7 @@ package cmd import ( "fmt" "os" + "runtime/debug" "sort" "strings" "sync" @@ -42,8 +43,9 @@ const ( // Uploads metadata file carries per multipart object metadata. uploadsJSONFile = "uploads.json" - // 8GiB cache by default. - maxCacheSize = 8 * humanize.GiByte + // Represents the minimum required RAM size before + // we enable caching. + minRAMSize = 8 * humanize.GiByte // Maximum erasure blocks. maxErasureBlocks = 16 @@ -92,9 +94,6 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) { // Calculate data and parity blocks. dataBlocks, parityBlocks := len(newStorageDisks)/2, len(newStorageDisks)/2 - // Initialize object cache. - objCache := objcache.New(globalMaxCacheSize, globalCacheExpiry) - // Initialize list pool. listPool := newTreeWalkPool(globalLookupTimeout) @@ -103,13 +102,25 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) { // Initialize xl objects. xl := &xlObjects{ - mutex: &sync.Mutex{}, - storageDisks: newStorageDisks, - dataBlocks: dataBlocks, - parityBlocks: parityBlocks, - listPool: listPool, - objCache: objCache, - objCacheEnabled: !objCacheDisabled, + mutex: &sync.Mutex{}, + storageDisks: newStorageDisks, + dataBlocks: dataBlocks, + parityBlocks: parityBlocks, + listPool: listPool, + } + + // Object cache is enabled when _MINIO_CACHE env is missing. + // and cache size is > 0. + xl.objCacheEnabled = !objCacheDisabled && globalMaxCacheSize > 0 + + // Check if object cache is enabled. + if xl.objCacheEnabled { + // Initialize object cache. + objCache := objcache.New(globalMaxCacheSize, globalCacheExpiry) + objCache.OnEviction = func(key string) { + debug.FreeOSMemory() + } + xl.objCache = objCache } // Initialize meta volume, if volume already exists ignores it. diff --git a/pkg/objcache/capped-writer.go b/pkg/objcache/capped-writer.go new file mode 100644 index 000000000..485896e0b --- /dev/null +++ b/pkg/objcache/capped-writer.go @@ -0,0 +1,50 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package objcache implements in memory caching methods. +package objcache + +// Used for adding entry to the object cache. +// Implements io.WriteCloser +type cappedWriter struct { + offset int64 + cap int64 + buffer []byte + onClose func() error +} + +// Write implements a limited writer, returns error. +// if the writes go beyond allocated size. +func (c *cappedWriter) Write(b []byte) (n int, err error) { + if c.offset+int64(len(b)) > c.cap { + return 0, ErrExcessData + } + n = copy(c.buffer[int(c.offset):int(c.offset)+len(b)], b) + c.offset = c.offset + int64(n) + return n, nil +} + +// Reset relinquishes the allocated underlying buffer. +func (c *cappedWriter) Reset() { + c.buffer = nil +} + +// On close, onClose() is called which checks if all object contents +// have been written so that it can save the buffer to the cache. +func (c cappedWriter) Close() (err error) { + return c.onClose() +} diff --git a/pkg/objcache/objcache.go b/pkg/objcache/objcache.go index 11cf3cc88..3a63a68de 100644 --- a/pkg/objcache/objcache.go +++ b/pkg/objcache/objcache.go @@ -22,6 +22,7 @@ import ( "bytes" "errors" "io" + "runtime/debug" "sync" "time" ) @@ -32,6 +33,13 @@ var NoExpiry = time.Duration(0) // DefaultExpiry represents default time duration value when individual entries will be expired. var DefaultExpiry = time.Duration(72 * time.Hour) // 72hrs. +// DefaultBufferRatio represents default ratio used to calculate the +// individual cache entry buffer size. +var DefaultBufferRatio = uint64(10) + +// DefaultGCPercent represents default garbage collection target percentage. +var DefaultGCPercent = 20 + // buffer represents the in memory cache of a single entry. // buffer carries value of the data and last accessed time. type buffer struct { @@ -46,9 +54,16 @@ type Cache struct { // read/write requests for cache mutex sync.Mutex + // Once is used for resetting GC once after + // peak cache usage. + onceGC sync.Once + // maxSize is a total size for overall cache maxSize uint64 + // maxCacheEntrySize is a total size per key buffer. + maxCacheEntrySize uint64 + // currentSize is a current size in memory currentSize uint64 @@ -68,27 +83,58 @@ type Cache struct { stopGC chan struct{} } -// New - Return a new cache with a given default expiry duration. -// If the expiry duration is less than one (or NoExpiry), -// the items in the cache never expire (by default), and must be deleted -// manually. +// New - Return a new cache with a given default expiry +// duration. If the expiry duration is less than one +// (or NoExpiry), the items in the cache never expire +// (by default), and must be deleted manually. func New(maxSize uint64, expiry time.Duration) *Cache { if maxSize == 0 { panic("objcache: setting maximum cache size to zero is forbidden.") } - C := &Cache{ - maxSize: maxSize, - entries: make(map[string]*buffer), - expiry: expiry, + + // A garbage collection is triggered when the ratio + // of freshly allocated data to live data remaining + // after the previous collection reaches this percentage. + // + // - https://golang.org/pkg/runtime/debug/#SetGCPercent + // + // This means that by default GC is triggered after + // we've allocated an extra amount of memory proportional + // to the amount already in use. + // + // If gcpercent=100 and we're using 4M, we'll gc again + // when we get to 8M. + // + // Set this value to 20% if caching is enabled. + debug.SetGCPercent(DefaultGCPercent) + + // Max cache entry size - indicates the + // maximum buffer per key that can be held in + // memory. Currently this value is 1/10th + // the size of requested cache size. + maxCacheEntrySize := func() uint64 { + i := maxSize / DefaultBufferRatio + if i == 0 { + i = maxSize + } + return i + }() + c := &Cache{ + onceGC: sync.Once{}, + maxSize: maxSize, + maxCacheEntrySize: maxCacheEntrySize, + entries: make(map[string]*buffer), + expiry: expiry, } // We have expiry start the janitor routine. if expiry > 0 { - C.stopGC = make(chan struct{}) + // Initialize a new stop GC channel. + c.stopGC = make(chan struct{}) // Start garbage collection routine to expire objects. - C.startGC() + c.StartGC() } - return C + return c } // ErrKeyNotFoundInCache - key not found in cache. @@ -100,18 +146,6 @@ var ErrCacheFull = errors.New("Not enough space in cache") // ErrExcessData - excess data was attempted to be written on cache. var ErrExcessData = errors.New("Attempted excess write on cache") -// Used for adding entry to the object cache. Implements io.WriteCloser -type cacheBuffer struct { - *bytes.Buffer // Implements io.Writer - onClose func() error -} - -// On close, onClose() is called which checks if all object contents -// have been written so that it can save the buffer to the cache. -func (c cacheBuffer) Close() (err error) { - return c.onClose() -} - // Create - validates if object size fits with in cache size limit and returns a io.WriteCloser // to which object contents can be written and finally Close()'d. During Close() we // checks if the amount of data written is equal to the size of the object, in which @@ -126,29 +160,46 @@ func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error) { }() // Do not crash the server. valueLen := uint64(size) - // Check if the size of the object is not bigger than the capacity of the cache. - if c.maxSize > 0 && valueLen > c.maxSize { + // Check if the size of the object is > 1/10th the size + // of the cache, if yes then we ignore it. + if valueLen > c.maxCacheEntrySize { + return nil, ErrCacheFull + } + + // Check if the incoming size is going to exceed + // the effective cache size, if yes return error + // instead. + c.mutex.Lock() + if c.currentSize+valueLen > c.maxSize { + c.mutex.Unlock() return nil, ErrCacheFull } + // Change GC percent if the current cache usage + // is already 75% of the maximum allowed usage. + if c.currentSize > (75 * c.maxSize / 100) { + c.onceGC.Do(func() { debug.SetGCPercent(DefaultGCPercent - 10) }) + } + c.mutex.Unlock() - // Will hold the object contents. - buf := bytes.NewBuffer(make([]byte, 0, size)) + cbuf := &cappedWriter{ + offset: 0, + cap: size, + buffer: make([]byte, size), + } // Function called on close which saves the object contents // to the object cache. onClose := func() error { c.mutex.Lock() defer c.mutex.Unlock() - if size != int64(buf.Len()) { + if size != cbuf.offset { + cbuf.Reset() // Reset resets the buffer to be empty. // Full object not available hence do not save buf to object cache. return io.ErrShortBuffer } - if c.maxSize > 0 && c.currentSize+valueLen > c.maxSize { - return ErrExcessData - } // Full object available in buf, save it to cache. c.entries[key] = &buffer{ - value: buf.Bytes(), + value: cbuf.buffer, lastAccessed: time.Now().UTC(), // Save last accessed time. } // Account for the memory allocated above. @@ -156,12 +207,10 @@ func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error) { return nil } - // Object contents that is written - cacheBuffer.Write(data) + // Object contents that is written - cappedWriter.Write(data) // will be accumulated in buf which implements io.Writer. - return cacheBuffer{ - buf, - onClose, - }, nil + cbuf.onClose = onClose + return cbuf, nil } // Open - open the in-memory file, returns an in memory read seeker. @@ -215,17 +264,17 @@ func (c *Cache) gc() { // StopGC sends a message to the expiry routine to stop // expiring cached entries. NOTE: once this is called, cached -// entries will not be expired if the consumer has called this. +// entries will not be expired, be careful if you are using this. func (c *Cache) StopGC() { if c.stopGC != nil { c.stopGC <- struct{}{} } } -// startGC starts running a routine ticking at expiry interval, on each interval -// this routine does a sweep across the cache entries and garbage collects all the -// expired entries. -func (c *Cache) startGC() { +// StartGC starts running a routine ticking at expiry interval, +// on each interval this routine does a sweep across the cache +// entries and garbage collects all the expired entries. +func (c *Cache) StartGC() { go func() { for { select { @@ -242,9 +291,10 @@ func (c *Cache) startGC() { // Deletes a requested entry from the cache. func (c *Cache) delete(key string) { - if buf, ok := c.entries[key]; ok { + if _, ok := c.entries[key]; ok { + deletedSize := uint64(len(c.entries[key].value)) delete(c.entries, key) - c.currentSize -= uint64(len(buf.value)) + c.currentSize -= deletedSize c.totalEvicted++ } } diff --git a/pkg/objcache/objcache_test.go b/pkg/objcache/objcache_test.go index 31d96eab7..f1eed32d3 100644 --- a/pkg/objcache/objcache_test.go +++ b/pkg/objcache/objcache_test.go @@ -117,6 +117,12 @@ func TestObjCache(t *testing.T) { cacheSize: 5, closeErr: ErrExcessData, }, + // Validate error excess data during write. + { + expiry: NoExpiry, + cacheSize: 2048, + err: ErrExcessData, + }, } // Test 1 validating Open failure. @@ -232,14 +238,30 @@ func TestObjCache(t *testing.T) { if err = w.Close(); err != nil { t.Errorf("Test case 7 expected to pass, failed instead %s", err) } - w, err = cache.Create("test2", 1) - if err != nil { + _, err = cache.Create("test2", 1) + if err != ErrCacheFull { t.Errorf("Test case 7 expected to pass, failed instead %s", err) } - // Write '1' byte. - w.Write([]byte("H")) - if err = w.Close(); err != testCase.closeErr { - t.Errorf("Test case 7 expected to fail, passed instead") + + // Test 8 validates rejecting Writes which write excess data. + testCase = testCases[7] + cache = New(testCase.cacheSize, testCase.expiry) + w, err = cache.Create("test1", 5) + if err != nil { + t.Errorf("Test case 8 expected to pass, failed instead %s", err) + } + // Write '5' bytes. + n, err := w.Write([]byte("Hello")) + if err != nil { + t.Errorf("Test case 8 expected to pass, failed instead %s", err) + } + if n != 5 { + t.Errorf("Test case 8 expected 5 bytes written, instead found %d", n) + } + // Write '1' more byte, should return error. + n, err = w.Write([]byte("W")) + if n == 0 && err != testCase.err { + t.Errorf("Test case 8 expected to fail with ErrExcessData, but failed with %s instead", err) } }