diff --git a/pkg/storage/drivers/memory/lru.go b/pkg/storage/drivers/memory/lru.go index 07dc3c426..b69ed5dbd 100644 --- a/pkg/storage/drivers/memory/lru.go +++ b/pkg/storage/drivers/memory/lru.go @@ -36,6 +36,7 @@ import ( "container/list" "io" "strconv" + "time" "github.com/minio-io/minio/pkg/iodine" "github.com/minio-io/minio/pkg/storage/drivers" @@ -53,6 +54,10 @@ type Cache struct { // before an item is evicted. Zero means no limit MaxSize uint64 + // Expiration is the maximum duration of individual objects to exist + // in cache before its evicted. + Expiration time.Duration + // OnEvicted optionally specificies a callback function to be // executed when an entry is purged from the cache. OnEvicted func(a ...interface{}) @@ -63,22 +68,21 @@ type Cache struct { cache map[interface{}]*list.Element } -// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators -type Key interface{} - type entry struct { - key Key + key string + time time.Time value *bytes.Buffer } // NewCache creates a new Cache. // If maxEntries is zero, the cache has no limit and it's assumed // that eviction is done by the caller. -func NewCache(maxSize uint64) *Cache { +func NewCache(maxSize uint64, expiration time.Duration) *Cache { return &Cache{ - MaxSize: maxSize, - ll: list.New(), - cache: make(map[interface{}]*list.Element), + MaxSize: maxSize, + Expiration: expiration, + ll: list.New(), + cache: make(map[interface{}]*list.Element), } } @@ -91,7 +95,7 @@ func (c *Cache) Stats() CacheStats { } // Add adds a value to the cache. -func (c *Cache) Add(key Key, size int64) io.WriteCloser { +func (c *Cache) Add(key string, size int64) io.WriteCloser { r, w := io.Pipe() blockingWriter := NewBlockingWriteCloser(w) go func() { @@ -105,8 +109,10 @@ func (c *Cache) Add(key Key, size int64) io.WriteCloser { return } // If MaxSize is zero expecting infinite memory - if c.MaxSize != 0 && (c.totalSize+uint64(size)) > c.MaxSize { - c.RemoveOldest() + if c.MaxSize != 0 { + for (c.totalSize + uint64(size)) > c.MaxSize { + c.RemoveOldest() + } } value := new(bytes.Buffer) n, err := io.CopyN(value, r, size) @@ -116,7 +122,7 @@ func (c *Cache) Add(key Key, size int64) io.WriteCloser { blockingWriter.Release(err) return } - ele := c.ll.PushFront(&entry{key, value}) + ele := c.ll.PushFront(&entry{key, time.Now(), value}) c.cache[key] = ele c.totalSize += uint64(n) r.Close() @@ -126,19 +132,20 @@ func (c *Cache) Add(key Key, size int64) io.WriteCloser { } // Get looks up a key's value from the cache. -func (c *Cache) Get(key Key) (value *bytes.Buffer, ok bool) { +func (c *Cache) Get(key string) (value []byte, ok bool) { if c.cache == nil { return } if ele, hit := c.cache[key]; hit { c.ll.MoveToFront(ele) - return ele.Value.(*entry).value, true + ele.Value.(*entry).time = time.Now() + return ele.Value.(*entry).value.Bytes(), true } return } // Remove removes the provided key from the cache. -func (c *Cache) Remove(key Key) { +func (c *Cache) Remove(key string) { if c.cache == nil { return } @@ -158,16 +165,21 @@ func (c *Cache) RemoveOldest() { } } -// GetOldest returns the oldest key -func (c *Cache) GetOldest() (key Key, ok bool) { +// ExpireOldestAndWait expire old key which is expired and return wait times if any +func (c *Cache) ExpireOldestAndWait() time.Duration { if c.cache == nil { - return nil, false + return 0 } ele := c.ll.Back() if ele != nil { - return ele.Value.(*entry).key, true + switch { + case time.Now().Sub(ele.Value.(*entry).time) > c.Expiration: + c.removeElement(ele) + default: + return (c.Expiration - time.Now().Sub(ele.Value.(*entry).time)) + } } - return nil, false + return 0 } func (c *Cache) removeElement(e *list.Element) { diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index 873816b78..6c6e0b6f6 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -42,9 +42,6 @@ type memoryDriver struct { lock *sync.RWMutex objects *Cache lastAccessedObjects map[string]time.Time - maxSize uint64 - expiration time.Duration - shutdown bool } type storedBucket struct { @@ -65,11 +62,8 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro memory = new(memoryDriver) memory.storedBuckets = make(map[string]storedBucket) memory.lastAccessedObjects = make(map[string]time.Time) - memory.objects = NewCache(maxSize) - memory.maxSize = maxSize + memory.objects = NewCache(maxSize, expiration) memory.lock = new(sync.RWMutex) - memory.expiration = expiration - memory.shutdown = false memory.objects.OnEvicted = memory.evictObject @@ -100,19 +94,15 @@ func (memory *memoryDriver) GetObject(w io.Writer, bucket string, object string) memory.lock.RUnlock() return 0, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) } - storedBucket := memory.storedBuckets[bucket] - // form objectKey objectKey := bucket + "/" + object - if _, ok := storedBucket.objectMetadata[objectKey]; ok { - if data, ok := memory.objects.Get(objectKey); ok { - memory.lock.RUnlock() - go memory.updateAccessTime(objectKey) - written, err := io.Copy(w, data) - return written, iodine.New(err, nil) - } + data, ok := memory.objects.Get(objectKey) + if !ok { + memory.lock.RUnlock() + return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil) } memory.lock.RUnlock() - return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil) + written, err := io.Copy(w, bytes.NewBuffer(data)) + return written, iodine.New(err, nil) } // GetPartialObject - GET object from memory buffer range @@ -477,34 +467,15 @@ func (memory *memoryDriver) evictObject(a ...interface{}) { func (memory *memoryDriver) expireLRUObjects() { for { - if memory.shutdown { - return - } var sleepDuration time.Duration memory.lock.Lock() switch { case memory.objects.Len() > 0: - if k, ok := memory.objects.GetOldest(); ok { - key := k.(string) - switch { - case time.Now().Sub(memory.lastAccessedObjects[key]) > memory.expiration: - memory.objects.RemoveOldest() - default: - sleepDuration = memory.expiration - time.Now().Sub(memory.lastAccessedObjects[key]) - } - } + sleepDuration = memory.objects.ExpireOldestAndWait() default: - sleepDuration = memory.expiration + sleepDuration = memory.objects.Expiration } memory.lock.Unlock() time.Sleep(sleepDuration) } } - -func (memory *memoryDriver) updateAccessTime(key string) { - memory.lock.Lock() - defer memory.lock.Unlock() - if _, ok := memory.lastAccessedObjects[key]; ok { - memory.lastAccessedObjects[key] = time.Now().UTC() - } -}