Deprecate LRU use just map[string]interface{} and call it intelligent 'cache'

master
Harshavardhana 10 years ago
parent 63edb1e9a0
commit ed1259d6f0
  1. 65
      pkg/storage/drivers/memory/blockingwriter.go
  2. 204
      pkg/storage/drivers/memory/lru.go
  3. 165
      pkg/storage/drivers/memory/lru/lru.go
  4. 110
      pkg/storage/drivers/memory/lru/lru_test.go
  5. 83
      pkg/storage/drivers/memory/memory.go
  6. 151
      pkg/storage/drivers/memory/memory_intelligent.go

@ -1,65 +0,0 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package memory
import (
"io"
"sync"
)
// BlockingWriteCloser is a WriteCloser that blocks until released
type BlockingWriteCloser struct {
w io.WriteCloser
release *sync.WaitGroup
err error
}
// Write to the underlying writer
func (b *BlockingWriteCloser) Write(p []byte) (int, error) {
n, err := b.w.Write(p)
if err != nil {
b.err = err
}
return n, b.err
}
// Close blocks until another goroutine calls Release(error). Returns error code if either
// writer fails or Release is called with an error.
func (b *BlockingWriteCloser) Close() error {
err := b.w.Close()
if err != nil {
b.err = err
}
b.release.Wait()
return b.err
}
// Release the Close, causing it to unblock. Only call this once. Calling it multiple times results in a panic.
func (b *BlockingWriteCloser) Release(err error) {
b.release.Done()
if err != nil {
b.err = err
}
return
}
// NewBlockingWriteCloser Creates a new write closer that must be released by the read consumer.
func NewBlockingWriteCloser(w io.WriteCloser) *BlockingWriteCloser {
wg := &sync.WaitGroup{}
wg.Add(1)
return &BlockingWriteCloser{w: w, release: wg}
}

@ -1,204 +0,0 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---
Modifications from Minio under the following license:
Minimalist Object Storage, (C) 2015 Minio, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package memory
import (
"bytes"
"container/list"
"io"
"strconv"
"time"
"github.com/minio-io/minio/pkg/iodine"
"github.com/minio-io/minio/pkg/storage/drivers"
)
// CacheStats are returned by stats accessors on Group.
type CacheStats struct {
Bytes uint64
Evictions int64
}
// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
// MaxSize is the maximum number of cache size for entries
// before an item is evicted. Zero means no limit
MaxSize uint64
// Expiration is the maximum duration of individual objects to exist
// in cache before its evicted.
Expiration time.Duration
// OnEvicted optionally specificies a callback function to be
// executed when an entry is purged from the cache.
OnEvicted func(a ...interface{})
ll *list.List
totalSize uint64
totalEvicted int64
cache map[interface{}]*list.Element
}
type entry struct {
key string
time time.Time
value *bytes.Buffer
size int64
}
// NewCache creates a new Cache.
// If maxEntries is zero, the cache has no limit and it's assumed
// that eviction is done by the caller.
func NewCache(maxSize uint64, expiration time.Duration) *Cache {
return &Cache{
MaxSize: maxSize,
Expiration: expiration,
ll: list.New(),
cache: make(map[interface{}]*list.Element),
}
}
// Stats return cache stats
func (c *Cache) Stats() CacheStats {
return CacheStats{
Bytes: c.totalSize,
Evictions: c.totalEvicted,
}
}
// Add adds a value to the cache.
func (c *Cache) Add(key string, size int64) io.WriteCloser {
r, w := io.Pipe()
blockingWriter := NewBlockingWriteCloser(w)
go func() {
if uint64(size) > c.MaxSize {
err := iodine.New(drivers.EntityTooLarge{
Size: strconv.FormatInt(size, 10),
MaxSize: strconv.FormatUint(c.MaxSize, 10),
}, nil)
r.CloseWithError(err)
blockingWriter.Release(err)
return
}
// If MaxSize is zero expecting infinite memory
if c.MaxSize != 0 {
for (c.totalSize + uint64(size)) > c.MaxSize {
c.RemoveOldest()
}
}
buffer := new(bytes.Buffer)
written, err := io.CopyN(buffer, r, size)
if err != nil {
err := iodine.New(err, nil)
r.CloseWithError(err)
blockingWriter.Release(err)
return
}
ele := c.ll.PushFront(&entry{key, time.Now(), buffer, written})
c.cache[key] = ele
c.totalSize += uint64(written)
r.Close()
blockingWriter.Release(nil)
}()
return blockingWriter
}
// Get looks up a key's value from the cache.
func (c *Cache) Get(key string) (value []byte, ok bool) {
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.ll.MoveToFront(ele)
ele.Value.(*entry).time = time.Now()
return ele.Value.(*entry).value.Bytes(), true
}
return
}
// Remove removes the provided key from the cache.
func (c *Cache) Remove(key string) {
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.removeElement(ele)
}
}
// RemoveOldest removes the oldest item from the cache.
func (c *Cache) RemoveOldest() {
if c.cache == nil {
return
}
ele := c.ll.Back()
if ele != nil {
c.removeElement(ele)
}
}
// ExpireOldestAndWait expire old key which is expired and return wait times if any
func (c *Cache) ExpireOldestAndWait() time.Duration {
if c.cache == nil {
return 0
}
ele := c.ll.Back()
if ele != nil {
switch {
case time.Now().Sub(ele.Value.(*entry).time) > c.Expiration:
c.removeElement(ele)
default:
return (c.Expiration - time.Now().Sub(ele.Value.(*entry).time))
}
}
return 0
}
func (c *Cache) removeElement(e *list.Element) {
c.ll.Remove(e)
kv := e.Value.(*entry)
delete(c.cache, kv.key)
c.totalEvicted++
c.totalSize -= uint64(kv.size)
kv.value.Reset()
if c.OnEvicted != nil {
c.OnEvicted(kv.key)
}
}
// Len returns the number of items in the cache.
func (c *Cache) Len() int {
if c.cache == nil {
return 0
}
return c.ll.Len()
}

@ -1,165 +0,0 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---
Modifications from Minio under the following license:
Minimalist Object Storage, (C) 2015 Minio, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package lru implements an LRU cache.
package lru
import (
"container/list"
"sync"
)
// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
// MaxEntries is the maximum number of cache entries before
// an item is evicted. Zero means no limit.
MaxEntries int
// OnEvicted optionally specificies a callback function to be
// executed when an entry is purged from the cache.
OnEvicted func(key Key, value interface{})
ll *list.List
cache map[interface{}]*list.Element
lock *sync.RWMutex
}
// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators
type Key interface{}
type entry struct {
key Key
value interface{}
}
// New creates a new Cache.
// If maxEntries is zero, the cache has no limit and it's assumed
// that eviction is done by the caller.
func New(maxEntries int) *Cache {
return &Cache{
MaxEntries: maxEntries,
ll: list.New(),
cache: make(map[interface{}]*list.Element),
lock: &sync.RWMutex{},
}
}
// Add adds a value to the cache.
func (c *Cache) Add(key Key, value interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
if c.cache == nil {
c.cache = make(map[interface{}]*list.Element)
c.ll = list.New()
}
if ee, ok := c.cache[key]; ok {
c.ll.MoveToFront(ee)
ee.Value.(*entry).value = value
return
}
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
c.RemoveOldest()
}
}
// Get looks up a key's value from the cache.
func (c *Cache) Get(key Key) (value interface{}, ok bool) {
c.lock.Lock()
defer c.lock.Unlock()
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.ll.MoveToFront(ele)
return ele.Value.(*entry).value, true
}
return
}
// Remove removes the provided key from the cache.
func (c *Cache) Remove(key Key) {
c.lock.Lock()
defer c.lock.Unlock()
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.removeElement(ele)
}
}
// RemoveOldest removes the oldest item from the cache.
func (c *Cache) RemoveOldest() {
c.lock.Lock()
defer c.lock.Unlock()
if c.cache == nil {
return
}
ele := c.ll.Back()
if ele != nil {
c.removeElement(ele)
}
}
// GetOldest returns the oldest key, value, ok without modifying the lru
func (c *Cache) GetOldest() (key Key, value interface{}, ok bool) {
c.lock.Lock()
defer c.lock.Unlock()
if c.cache == nil {
return nil, nil, false
}
ele := c.ll.Back()
if ele != nil {
return ele.Value.(*entry).key, ele.Value.(*entry).value, true
}
return nil, nil, false
}
func (c *Cache) removeElement(e *list.Element) {
c.ll.Remove(e)
kv := e.Value.(*entry)
delete(c.cache, kv.key)
if c.OnEvicted != nil {
c.OnEvicted(kv.key, kv.value)
}
}
// Len returns the number of items in the cache.
func (c *Cache) Len() int {
c.lock.Lock()
defer c.lock.Unlock()
if c.cache == nil {
return 0
}
return c.ll.Len()
}

@ -1,110 +0,0 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---
Modifications from Minio under the following license:
Minimalist Object Storage, (C) 2015 Minio, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lru
import (
"testing"
)
type simpleStruct struct {
int
string
}
type complexStruct struct {
int
simpleStruct
}
var getTests = []struct {
name string
keyToAdd interface{}
keyToGet interface{}
expectedOk bool
}{
{"string_hit", "myKey", "myKey", true},
{"string_miss", "myKey", "nonsense", false},
{"simple_struct_hit", simpleStruct{1, "two"}, simpleStruct{1, "two"}, true},
{"simeple_struct_miss", simpleStruct{1, "two"}, simpleStruct{0, "noway"}, false},
{"complex_struct_hit", complexStruct{1, simpleStruct{2, "three"}},
complexStruct{1, simpleStruct{2, "three"}}, true},
}
func TestGet(t *testing.T) {
for _, tt := range getTests {
lru := New(0)
lru.Add(tt.keyToAdd, 1234)
val, ok := lru.Get(tt.keyToGet)
if ok != tt.expectedOk {
t.Fatalf("%s: cache hit = %v; want %v", tt.name, ok, !ok)
} else if ok && val != 1234 {
t.Fatalf("%s expected get to return 1234 but got %v", tt.name, val)
}
}
}
func TestRemove(t *testing.T) {
lru := New(0)
lru.Add("myKey", 1234)
if val, ok := lru.Get("myKey"); !ok {
t.Fatal("TestRemove returned no match")
} else if val != 1234 {
t.Fatalf("TestRemove failed. Expected %d, got %v", 1234, val)
}
lru.Remove("myKey")
if _, ok := lru.Get("myKey"); ok {
t.Fatal("TestRemove returned a removed entry")
}
}
func TestOldest(t *testing.T) {
lru := New(0)
lru.Add("a", 1)
lru.Add("b", 2)
lru.Add("c", 3)
key, val, ok := lru.GetOldest()
if ok != true {
t.Fatalf("%s expected get to return 1 but got %v", "a", val)
}
if key != "a" && val != 1 {
t.Fatalf("%s expected get to return 1 but got %v", "a", val)
}
lru.RemoveOldest()
key, val, ok = lru.GetOldest()
if ok != true {
t.Fatalf("%s expected get to return 1 but got %v", "a", val)
}
if key != "b" && val != 2 {
t.Fatalf("%s expected get to return 1 but got %v", "a", val)
}
}

@ -40,8 +40,7 @@ import (
type memoryDriver struct {
storedBuckets map[string]storedBucket
lock *sync.RWMutex
objects *Cache
lastAccessedObjects map[string]time.Time
objects *Intelligent
}
type storedBucket struct {
@ -61,16 +60,14 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro
var memory *memoryDriver
memory = new(memoryDriver)
memory.storedBuckets = make(map[string]storedBucket)
memory.lastAccessedObjects = make(map[string]time.Time)
memory.objects = NewCache(maxSize, expiration)
memory.objects = NewIntelligent(maxSize, expiration)
memory.lock = new(sync.RWMutex)
memory.objects.OnEvicted = memory.evictObject
// set up memory expiration
if expiration > 0 {
go memory.expireLRUObjects()
}
memory.objects.ExpireObjects(time.Millisecond * 10)
go start(ctrlChannel, errorChannel)
return ctrlChannel, errorChannel, memory
}
@ -100,7 +97,7 @@ func (memory *memoryDriver) GetObject(w io.Writer, bucket string, object string)
memory.lock.RUnlock()
return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil)
}
written, err := io.Copy(w, bytes.NewBuffer(data))
written, err := io.Copy(w, bytes.NewBuffer(data.([]byte)))
memory.lock.RUnlock()
return written, iodine.New(err, nil)
}
@ -134,7 +131,7 @@ func (memory *memoryDriver) GetPartialObject(w io.Writer, bucket, object string,
memory.lock.RUnlock()
return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, errParams)
}
written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length)
written, err := io.CopyN(w, bytes.NewBuffer(data.([]byte)[start:]), length)
memory.lock.RUnlock()
return written, iodine.New(err, nil)
}
@ -195,7 +192,34 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
}
func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
return memory.createObject(bucket, key, contentType, expectedMD5Sum, size, data)
humanReadableErr, err := memory.createObject(bucket, key, contentType, expectedMD5Sum, size, data)
// free
debug.FreeOSMemory()
return humanReadableErr, iodine.New(err, nil)
}
// getMD5AndData - this is written as a wrapper to capture md5sum and data in a more memory efficient way
func getMD5AndData(reader io.Reader) ([]byte, []byte, error) {
hash := md5.New()
var data []byte
var err error
var length int
for err == nil {
byteBuffer := make([]byte, 1024*1024)
length, err = reader.Read(byteBuffer)
// While hash.Write() wouldn't mind a Nil byteBuffer
// It is necessary for us to verify this and break
if length == 0 {
break
}
hash.Write(byteBuffer[0:length])
data = append(data, byteBuffer[0:length]...)
}
if err != io.EOF {
return nil, nil, err
}
return hash.Sum(nil), data, nil
}
// CreateObject - PUT object to memory buffer
@ -234,23 +258,17 @@ func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Su
}
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
memory.lock.Lock()
md5Writer := md5.New()
lruWriter := memory.objects.Add(objectKey, size)
mw := io.MultiWriter(md5Writer, lruWriter)
totalLength, err := io.CopyN(mw, data, size)
md5SumBytes, readBytes, err := getMD5AndData(data)
if err != nil {
memory.lock.Unlock()
return "", iodine.New(err, nil)
}
if err := lruWriter.Close(); err != nil {
memory.lock.Unlock()
return "", iodine.New(err, nil)
}
totalLength := len(readBytes)
memory.lock.Lock()
memory.objects.Set(objectKey, readBytes)
memory.lock.Unlock()
// de-allocating
readBytes = nil
md5SumBytes := md5Writer.Sum(nil)
md5Sum := hex.EncodeToString(md5SumBytes)
// Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" {
@ -280,9 +298,6 @@ func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Su
}
memory.storedBuckets[bucket] = storedBucket
memory.lock.Unlock()
// free
debug.FreeOSMemory()
return newObject.Md5, nil
}
@ -465,12 +480,11 @@ func (memory *memoryDriver) GetObjectMetadata(bucket, key, prefix string) (drive
func (memory *memoryDriver) evictObject(a ...interface{}) {
cacheStats := memory.objects.Stats()
log.Printf("CurrenSize: %d, CurrentItems: %d, TotalEvictions: %d",
cacheStats.Bytes, memory.objects.Len(), cacheStats.Evictions)
cacheStats.Bytes, cacheStats.Items, cacheStats.Evictions)
key := a[0].(string)
// loop through all buckets
for bucket, storedBucket := range memory.storedBuckets {
delete(storedBucket.objectMetadata, key)
delete(memory.lastAccessedObjects, key)
// remove bucket if no objects found anymore
if len(storedBucket.objectMetadata) == 0 {
delete(memory.storedBuckets, bucket)
@ -478,18 +492,3 @@ func (memory *memoryDriver) evictObject(a ...interface{}) {
}
debug.FreeOSMemory()
}
func (memory *memoryDriver) expireLRUObjects() {
for {
var sleepDuration time.Duration
memory.lock.Lock()
switch {
case memory.objects.Len() > 0:
sleepDuration = memory.objects.ExpireOldestAndWait()
default:
sleepDuration = memory.objects.Expiration
}
memory.lock.Unlock()
time.Sleep(sleepDuration)
}
}

@ -0,0 +1,151 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package memory
import (
"sync"
"time"
)
var zeroExpiration = time.Duration(0)
// Intelligent holds the required variables to compose an in memory cache system
// which also provides expiring key mechanism and also maxSize
type Intelligent struct {
// Mutex is used for handling the concurrent
// read/write requests for cache
sync.Mutex
// items hold the cached objects
items map[string]interface{}
// createdAt holds the time that related item's created At
createdAt map[string]time.Time
// expiration is a duration for a cache key to expire
expiration time.Duration
// gcInterval is a duration for garbage collection
gcInterval time.Duration
// maxSize is a total size for overall cache
maxSize uint64
// currentSize is a current size in memory
currentSize uint64
// OnEvicted - callback function for eviction
OnEvicted func(a ...interface{})
// totalEvicted counter to keep track of total evictions
totalEvicted uint64
}
// Stats current cache statistics
type Stats struct {
Bytes uint64
Items uint64
Evictions uint64
}
// NewIntelligent creates an inmemory cache
//
// maxSize is used for expiring objects before we run out of memory
// expiration is used for expiration of a key from cache
func NewIntelligent(maxSize uint64, expiration time.Duration) *Intelligent {
return &Intelligent{
items: map[string]interface{}{},
createdAt: map[string]time.Time{},
expiration: expiration,
maxSize: maxSize,
}
}
// Stats get current cache statistics
func (r *Intelligent) Stats() Stats {
return Stats{
Bytes: r.currentSize,
Items: uint64(len(r.items)),
Evictions: r.totalEvicted,
}
}
// ExpireObjects expire objects in go routine
func (r *Intelligent) ExpireObjects(gcInterval time.Duration) {
r.gcInterval = gcInterval
go func() {
for range time.Tick(gcInterval) {
for key := range r.items {
r.Lock()
if !r.isValid(key) {
r.Delete(key)
}
r.Unlock()
}
}
}()
}
// Get returns a value of a given key if it exists
func (r *Intelligent) Get(key string) (interface{}, bool) {
r.Lock()
defer r.Unlock()
value, ok := r.items[key]
return value, ok
}
// Set will persist a value to the cache
func (r *Intelligent) Set(key string, value interface{}) {
r.Lock()
// remove random key if only we reach the maxSize threshold,
// if not assume infinite memory
if r.maxSize > 0 {
for key := range r.items {
for (r.currentSize + uint64(len(value.([]byte)))) > r.maxSize {
r.Delete(key)
}
break
}
}
r.items[key] = value
r.currentSize += uint64(len(value.([]byte)))
r.createdAt[key] = time.Now()
r.Unlock()
return
}
// Delete deletes a given key if exists
func (r *Intelligent) Delete(key string) {
r.currentSize -= uint64(len(r.items[key].([]byte)))
delete(r.items, key)
delete(r.createdAt, key)
r.totalEvicted++
if r.OnEvicted != nil {
r.OnEvicted(key)
}
}
func (r *Intelligent) isValid(key string) bool {
createdAt, ok := r.createdAt[key]
if !ok {
return false
}
if r.expiration == zeroExpiration {
return true
}
return createdAt.Add(r.expiration).After(time.Now())
}
Loading…
Cancel
Save