|
|
|
@ -33,6 +33,7 @@ import ( |
|
|
|
|
"io/ioutil" |
|
|
|
|
|
|
|
|
|
"github.com/golang/groupcache/lru" |
|
|
|
|
"github.com/minio-io/minio/pkg/utils/log" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// memoryDriver - local variables
|
|
|
|
@ -41,6 +42,8 @@ type memoryDriver struct { |
|
|
|
|
objectMetadata map[string]storedObject |
|
|
|
|
objects *lru.Cache |
|
|
|
|
lock *sync.RWMutex |
|
|
|
|
totalSize int64 |
|
|
|
|
maxSize int64 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type storedBucket struct { |
|
|
|
@ -54,16 +57,26 @@ type storedObject struct { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Start memory object server
|
|
|
|
|
func Start(maxObjects int) (chan<- string, <-chan error, drivers.Driver) { |
|
|
|
|
func Start(maxSize int64) (chan<- string, <-chan error, drivers.Driver) { |
|
|
|
|
ctrlChannel := make(chan string) |
|
|
|
|
errorChannel := make(chan error) |
|
|
|
|
|
|
|
|
|
memory := new(memoryDriver) |
|
|
|
|
var memory *memoryDriver |
|
|
|
|
memory = new(memoryDriver) |
|
|
|
|
memory.bucketMetadata = make(map[string]storedBucket) |
|
|
|
|
memory.objectMetadata = make(map[string]storedObject) |
|
|
|
|
memory.objects = lru.New(maxObjects) |
|
|
|
|
memory.objects = lru.New(0) |
|
|
|
|
memory.lock = new(sync.RWMutex) |
|
|
|
|
|
|
|
|
|
switch { |
|
|
|
|
case maxSize == 0: |
|
|
|
|
memory.maxSize = 9223372036854775807 |
|
|
|
|
case maxSize > 0: |
|
|
|
|
memory.maxSize = maxSize |
|
|
|
|
default: |
|
|
|
|
log.Println("Error") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
memory.objects.OnEvicted = memory.evictObject |
|
|
|
|
|
|
|
|
|
go start(ctrlChannel, errorChannel) |
|
|
|
@ -75,7 +88,7 @@ func start(ctrlChannel <-chan string, errorChannel chan<- error) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetObject - GET object from memory buffer
|
|
|
|
|
func (memory memoryDriver) GetObject(w io.Writer, bucket string, object string) (int64, error) { |
|
|
|
|
func (memory *memoryDriver) GetObject(w io.Writer, bucket string, object string) (int64, error) { |
|
|
|
|
memory.lock.RLock() |
|
|
|
|
defer memory.lock.RUnlock() |
|
|
|
|
if _, ok := memory.bucketMetadata[bucket]; ok == false { |
|
|
|
@ -95,7 +108,7 @@ func (memory memoryDriver) GetObject(w io.Writer, bucket string, object string) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetPartialObject - GET object from memory buffer range
|
|
|
|
|
func (memory memoryDriver) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) { |
|
|
|
|
func (memory *memoryDriver) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) { |
|
|
|
|
memory.lock.RLock() |
|
|
|
|
defer memory.lock.RUnlock() |
|
|
|
|
var sourceBuffer bytes.Buffer |
|
|
|
@ -109,7 +122,7 @@ func (memory memoryDriver) GetPartialObject(w io.Writer, bucket, object string, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetBucketMetadata -
|
|
|
|
|
func (memory memoryDriver) GetBucketMetadata(bucket string) (drivers.BucketMetadata, error) { |
|
|
|
|
func (memory *memoryDriver) GetBucketMetadata(bucket string) (drivers.BucketMetadata, error) { |
|
|
|
|
memory.lock.RLock() |
|
|
|
|
defer memory.lock.RUnlock() |
|
|
|
|
if _, ok := memory.bucketMetadata[bucket]; ok == false { |
|
|
|
@ -119,7 +132,7 @@ func (memory memoryDriver) GetBucketMetadata(bucket string) (drivers.BucketMetad |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// CreateObject - PUT object to memory buffer
|
|
|
|
|
func (memory memoryDriver) CreateObject(bucket, key, contentType, md5sum string, data io.Reader) error { |
|
|
|
|
func (memory *memoryDriver) CreateObject(bucket, key, contentType, md5sum string, data io.Reader) error { |
|
|
|
|
memory.lock.RLock() |
|
|
|
|
|
|
|
|
|
if _, ok := memory.bucketMetadata[bucket]; ok == false { |
|
|
|
@ -166,12 +179,16 @@ func (memory memoryDriver) CreateObject(bucket, key, contentType, md5sum string, |
|
|
|
|
} |
|
|
|
|
memory.objectMetadata[objectKey] = newObject |
|
|
|
|
memory.objects.Add(objectKey, dataSlice) |
|
|
|
|
memory.totalSize = memory.totalSize + newObject.metadata.Size |
|
|
|
|
for memory.totalSize > memory.maxSize { |
|
|
|
|
memory.objects.RemoveOldest() |
|
|
|
|
} |
|
|
|
|
memory.lock.Unlock() |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// CreateBucket - create bucket in memory
|
|
|
|
|
func (memory memoryDriver) CreateBucket(bucketName string) error { |
|
|
|
|
func (memory *memoryDriver) CreateBucket(bucketName string) error { |
|
|
|
|
memory.lock.RLock() |
|
|
|
|
if !drivers.IsValidBucket(bucketName) { |
|
|
|
|
memory.lock.RUnlock() |
|
|
|
@ -213,7 +230,7 @@ func appendUniq(slice []string, i string) []string { |
|
|
|
|
return append(slice, i) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (memory memoryDriver) filterDelimiterPrefix(keys []string, key, delimitedName string, resources drivers.BucketResourcesMetadata) (drivers.BucketResourcesMetadata, []string) { |
|
|
|
|
func (memory *memoryDriver) filterDelimiterPrefix(keys []string, key, delimitedName string, resources drivers.BucketResourcesMetadata) (drivers.BucketResourcesMetadata, []string) { |
|
|
|
|
switch true { |
|
|
|
|
case key == resources.Prefix: |
|
|
|
|
keys = appendUniq(keys, key) |
|
|
|
@ -227,7 +244,7 @@ func (memory memoryDriver) filterDelimiterPrefix(keys []string, key, delimitedNa |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ListObjects - list objects from memory
|
|
|
|
|
func (memory memoryDriver) ListObjects(bucket string, resources drivers.BucketResourcesMetadata) ([]drivers.ObjectMetadata, drivers.BucketResourcesMetadata, error) { |
|
|
|
|
func (memory *memoryDriver) ListObjects(bucket string, resources drivers.BucketResourcesMetadata) ([]drivers.ObjectMetadata, drivers.BucketResourcesMetadata, error) { |
|
|
|
|
memory.lock.RLock() |
|
|
|
|
defer memory.lock.RUnlock() |
|
|
|
|
if _, ok := memory.bucketMetadata[bucket]; ok == false { |
|
|
|
@ -290,7 +307,7 @@ func (b ByBucketName) Swap(i, j int) { b[i], b[j] = b[j], b[i] } |
|
|
|
|
func (b ByBucketName) Less(i, j int) bool { return b[i].Name < b[j].Name } |
|
|
|
|
|
|
|
|
|
// ListBuckets - List buckets from memory
|
|
|
|
|
func (memory memoryDriver) ListBuckets() ([]drivers.BucketMetadata, error) { |
|
|
|
|
func (memory *memoryDriver) ListBuckets() ([]drivers.BucketMetadata, error) { |
|
|
|
|
memory.lock.RLock() |
|
|
|
|
defer memory.lock.RUnlock() |
|
|
|
|
var results []drivers.BucketMetadata |
|
|
|
@ -302,7 +319,7 @@ func (memory memoryDriver) ListBuckets() ([]drivers.BucketMetadata, error) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetObjectMetadata - get object metadata from memory
|
|
|
|
|
func (memory memoryDriver) GetObjectMetadata(bucket, key, prefix string) (drivers.ObjectMetadata, error) { |
|
|
|
|
func (memory *memoryDriver) GetObjectMetadata(bucket, key, prefix string) (drivers.ObjectMetadata, error) { |
|
|
|
|
memory.lock.RLock() |
|
|
|
|
defer memory.lock.RUnlock() |
|
|
|
|
// check if bucket exists
|
|
|
|
@ -318,8 +335,12 @@ func (memory memoryDriver) GetObjectMetadata(bucket, key, prefix string) (driver |
|
|
|
|
return drivers.ObjectMetadata{}, drivers.ObjectNotFound{Bucket: bucket, Object: key} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (memory memoryDriver) evictObject(key lru.Key, value interface{}) { |
|
|
|
|
func (memory *memoryDriver) evictObject(key lru.Key, value interface{}) { |
|
|
|
|
k := key.(string) |
|
|
|
|
|
|
|
|
|
memory.totalSize = memory.totalSize - memory.objectMetadata[k].metadata.Size |
|
|
|
|
|
|
|
|
|
log.Println("evicting:", k) |
|
|
|
|
|
|
|
|
|
delete(memory.objectMetadata, k) |
|
|
|
|
} |
|
|
|
|