Modify LRU further to add object expiration

master
Harshavardhana 10 years ago
parent 670f997b07
commit c8f31d97a8
  1. 44
      pkg/storage/drivers/memory/lru.go
  2. 45
      pkg/storage/drivers/memory/memory.go

@ -36,6 +36,7 @@ import (
"container/list" "container/list"
"io" "io"
"strconv" "strconv"
"time"
"github.com/minio-io/minio/pkg/iodine" "github.com/minio-io/minio/pkg/iodine"
"github.com/minio-io/minio/pkg/storage/drivers" "github.com/minio-io/minio/pkg/storage/drivers"
@ -53,6 +54,10 @@ type Cache struct {
// before an item is evicted. Zero means no limit // before an item is evicted. Zero means no limit
MaxSize uint64 MaxSize uint64
// Expiration is the maximum duration of individual objects to exist
// in cache before its evicted.
Expiration time.Duration
// OnEvicted optionally specificies a callback function to be // OnEvicted optionally specificies a callback function to be
// executed when an entry is purged from the cache. // executed when an entry is purged from the cache.
OnEvicted func(a ...interface{}) OnEvicted func(a ...interface{})
@ -63,20 +68,19 @@ type Cache struct {
cache map[interface{}]*list.Element cache map[interface{}]*list.Element
} }
// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators
type Key interface{}
type entry struct { type entry struct {
key Key key string
time time.Time
value *bytes.Buffer value *bytes.Buffer
} }
// NewCache creates a new Cache. // NewCache creates a new Cache.
// If maxEntries is zero, the cache has no limit and it's assumed // If maxEntries is zero, the cache has no limit and it's assumed
// that eviction is done by the caller. // that eviction is done by the caller.
func NewCache(maxSize uint64) *Cache { func NewCache(maxSize uint64, expiration time.Duration) *Cache {
return &Cache{ return &Cache{
MaxSize: maxSize, MaxSize: maxSize,
Expiration: expiration,
ll: list.New(), ll: list.New(),
cache: make(map[interface{}]*list.Element), cache: make(map[interface{}]*list.Element),
} }
@ -91,7 +95,7 @@ func (c *Cache) Stats() CacheStats {
} }
// Add adds a value to the cache. // Add adds a value to the cache.
func (c *Cache) Add(key Key, size int64) io.WriteCloser { func (c *Cache) Add(key string, size int64) io.WriteCloser {
r, w := io.Pipe() r, w := io.Pipe()
blockingWriter := NewBlockingWriteCloser(w) blockingWriter := NewBlockingWriteCloser(w)
go func() { go func() {
@ -105,9 +109,11 @@ func (c *Cache) Add(key Key, size int64) io.WriteCloser {
return return
} }
// If MaxSize is zero expecting infinite memory // If MaxSize is zero expecting infinite memory
if c.MaxSize != 0 && (c.totalSize+uint64(size)) > c.MaxSize { if c.MaxSize != 0 {
for (c.totalSize + uint64(size)) > c.MaxSize {
c.RemoveOldest() c.RemoveOldest()
} }
}
value := new(bytes.Buffer) value := new(bytes.Buffer)
n, err := io.CopyN(value, r, size) n, err := io.CopyN(value, r, size)
if err != nil { if err != nil {
@ -116,7 +122,7 @@ func (c *Cache) Add(key Key, size int64) io.WriteCloser {
blockingWriter.Release(err) blockingWriter.Release(err)
return return
} }
ele := c.ll.PushFront(&entry{key, value}) ele := c.ll.PushFront(&entry{key, time.Now(), value})
c.cache[key] = ele c.cache[key] = ele
c.totalSize += uint64(n) c.totalSize += uint64(n)
r.Close() r.Close()
@ -126,19 +132,20 @@ func (c *Cache) Add(key Key, size int64) io.WriteCloser {
} }
// Get looks up a key's value from the cache. // Get looks up a key's value from the cache.
func (c *Cache) Get(key Key) (value *bytes.Buffer, ok bool) { func (c *Cache) Get(key string) (value []byte, ok bool) {
if c.cache == nil { if c.cache == nil {
return return
} }
if ele, hit := c.cache[key]; hit { if ele, hit := c.cache[key]; hit {
c.ll.MoveToFront(ele) c.ll.MoveToFront(ele)
return ele.Value.(*entry).value, true ele.Value.(*entry).time = time.Now()
return ele.Value.(*entry).value.Bytes(), true
} }
return return
} }
// Remove removes the provided key from the cache. // Remove removes the provided key from the cache.
func (c *Cache) Remove(key Key) { func (c *Cache) Remove(key string) {
if c.cache == nil { if c.cache == nil {
return return
} }
@ -158,16 +165,21 @@ func (c *Cache) RemoveOldest() {
} }
} }
// GetOldest returns the oldest key // ExpireOldestAndWait expire old key which is expired and return wait times if any
func (c *Cache) GetOldest() (key Key, ok bool) { func (c *Cache) ExpireOldestAndWait() time.Duration {
if c.cache == nil { if c.cache == nil {
return nil, false return 0
} }
ele := c.ll.Back() ele := c.ll.Back()
if ele != nil { if ele != nil {
return ele.Value.(*entry).key, true switch {
case time.Now().Sub(ele.Value.(*entry).time) > c.Expiration:
c.removeElement(ele)
default:
return (c.Expiration - time.Now().Sub(ele.Value.(*entry).time))
}
} }
return nil, false return 0
} }
func (c *Cache) removeElement(e *list.Element) { func (c *Cache) removeElement(e *list.Element) {

@ -42,9 +42,6 @@ type memoryDriver struct {
lock *sync.RWMutex lock *sync.RWMutex
objects *Cache objects *Cache
lastAccessedObjects map[string]time.Time lastAccessedObjects map[string]time.Time
maxSize uint64
expiration time.Duration
shutdown bool
} }
type storedBucket struct { type storedBucket struct {
@ -65,11 +62,8 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro
memory = new(memoryDriver) memory = new(memoryDriver)
memory.storedBuckets = make(map[string]storedBucket) memory.storedBuckets = make(map[string]storedBucket)
memory.lastAccessedObjects = make(map[string]time.Time) memory.lastAccessedObjects = make(map[string]time.Time)
memory.objects = NewCache(maxSize) memory.objects = NewCache(maxSize, expiration)
memory.maxSize = maxSize
memory.lock = new(sync.RWMutex) memory.lock = new(sync.RWMutex)
memory.expiration = expiration
memory.shutdown = false
memory.objects.OnEvicted = memory.evictObject memory.objects.OnEvicted = memory.evictObject
@ -100,19 +94,15 @@ func (memory *memoryDriver) GetObject(w io.Writer, bucket string, object string)
memory.lock.RUnlock() memory.lock.RUnlock()
return 0, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) return 0, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
} }
storedBucket := memory.storedBuckets[bucket]
// form objectKey
objectKey := bucket + "/" + object objectKey := bucket + "/" + object
if _, ok := storedBucket.objectMetadata[objectKey]; ok { data, ok := memory.objects.Get(objectKey)
if data, ok := memory.objects.Get(objectKey); ok { if !ok {
memory.lock.RUnlock() memory.lock.RUnlock()
go memory.updateAccessTime(objectKey) return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil)
written, err := io.Copy(w, data)
return written, iodine.New(err, nil)
}
} }
memory.lock.RUnlock() memory.lock.RUnlock()
return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil) written, err := io.Copy(w, bytes.NewBuffer(data))
return written, iodine.New(err, nil)
} }
// GetPartialObject - GET object from memory buffer range // GetPartialObject - GET object from memory buffer range
@ -477,34 +467,15 @@ func (memory *memoryDriver) evictObject(a ...interface{}) {
func (memory *memoryDriver) expireLRUObjects() { func (memory *memoryDriver) expireLRUObjects() {
for { for {
if memory.shutdown {
return
}
var sleepDuration time.Duration var sleepDuration time.Duration
memory.lock.Lock() memory.lock.Lock()
switch { switch {
case memory.objects.Len() > 0: case memory.objects.Len() > 0:
if k, ok := memory.objects.GetOldest(); ok { sleepDuration = memory.objects.ExpireOldestAndWait()
key := k.(string)
switch {
case time.Now().Sub(memory.lastAccessedObjects[key]) > memory.expiration:
memory.objects.RemoveOldest()
default: default:
sleepDuration = memory.expiration - time.Now().Sub(memory.lastAccessedObjects[key]) sleepDuration = memory.objects.Expiration
}
}
default:
sleepDuration = memory.expiration
} }
memory.lock.Unlock() memory.lock.Unlock()
time.Sleep(sleepDuration) time.Sleep(sleepDuration)
} }
} }
func (memory *memoryDriver) updateAccessTime(key string) {
memory.lock.Lock()
defer memory.lock.Unlock()
if _, ok := memory.lastAccessedObjects[key]; ok {
memory.lastAccessedObjects[key] = time.Now().UTC()
}
}

Loading…
Cancel
Save