|
|
|
@ -40,10 +40,10 @@ import ( |
|
|
|
|
|
|
|
|
|
// memoryDriver - local variables
|
|
|
|
|
type memoryDriver struct { |
|
|
|
|
bucketMetadata map[string]storedBucket |
|
|
|
|
objectMetadata map[string]storedObject |
|
|
|
|
objects *lru.Cache |
|
|
|
|
storedBuckets map[string]storedBucket |
|
|
|
|
lock *sync.RWMutex |
|
|
|
|
objects *lru.Cache |
|
|
|
|
lastAccessedObjects map[string]time.Time |
|
|
|
|
totalSize uint64 |
|
|
|
|
maxSize uint64 |
|
|
|
|
expiration time.Duration |
|
|
|
@ -51,14 +51,8 @@ type memoryDriver struct { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type storedBucket struct { |
|
|
|
|
metadata drivers.BucketMetadata |
|
|
|
|
// owner string // TODO
|
|
|
|
|
// id string // TODO
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type storedObject struct { |
|
|
|
|
metadata drivers.ObjectMetadata |
|
|
|
|
lastAccessed time.Time |
|
|
|
|
bucketMetadata drivers.BucketMetadata |
|
|
|
|
objectMetadata map[string]drivers.ObjectMetadata |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
@ -72,8 +66,8 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro |
|
|
|
|
|
|
|
|
|
var memory *memoryDriver |
|
|
|
|
memory = new(memoryDriver) |
|
|
|
|
memory.bucketMetadata = make(map[string]storedBucket) |
|
|
|
|
memory.objectMetadata = make(map[string]storedObject) |
|
|
|
|
memory.storedBuckets = make(map[string]storedBucket) |
|
|
|
|
memory.lastAccessedObjects = make(map[string]time.Time) |
|
|
|
|
memory.objects = lru.New(0) |
|
|
|
|
memory.lock = new(sync.RWMutex) |
|
|
|
|
memory.expiration = expiration |
|
|
|
@ -91,7 +85,7 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro |
|
|
|
|
memory.objects.OnEvicted = memory.evictObject |
|
|
|
|
|
|
|
|
|
// set up memory expiration
|
|
|
|
|
go memory.expireObjects() |
|
|
|
|
go memory.expireLRUObjects() |
|
|
|
|
|
|
|
|
|
go start(ctrlChannel, errorChannel) |
|
|
|
|
return ctrlChannel, errorChannel, memory |
|
|
|
@ -111,12 +105,13 @@ func (memory *memoryDriver) GetObject(w io.Writer, bucket string, object string) |
|
|
|
|
if !drivers.IsValidObject(object) { |
|
|
|
|
return 0, iodine.New(drivers.ObjectNameInvalid{Object: object}, nil) |
|
|
|
|
} |
|
|
|
|
if _, ok := memory.bucketMetadata[bucket]; ok == false { |
|
|
|
|
if _, ok := memory.storedBuckets[bucket]; ok == false { |
|
|
|
|
return 0, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) |
|
|
|
|
} |
|
|
|
|
// get object
|
|
|
|
|
storedBucket := memory.storedBuckets[bucket] |
|
|
|
|
// form objectKey
|
|
|
|
|
objectKey := bucket + "/" + object |
|
|
|
|
if _, ok := memory.objectMetadata[objectKey]; ok { |
|
|
|
|
if _, ok := storedBucket.objectMetadata[objectKey]; ok { |
|
|
|
|
if data, ok := memory.objects.Get(objectKey); ok { |
|
|
|
|
dataSlice := data.([]byte) |
|
|
|
|
objectBuffer := bytes.NewBuffer(dataSlice) |
|
|
|
@ -155,10 +150,10 @@ func (memory *memoryDriver) GetBucketMetadata(bucket string) (drivers.BucketMeta |
|
|
|
|
if !drivers.IsValidBucket(bucket) { |
|
|
|
|
return drivers.BucketMetadata{}, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) |
|
|
|
|
} |
|
|
|
|
if _, ok := memory.bucketMetadata[bucket]; ok == false { |
|
|
|
|
if _, ok := memory.storedBuckets[bucket]; ok == false { |
|
|
|
|
return drivers.BucketMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) |
|
|
|
|
} |
|
|
|
|
return memory.bucketMetadata[bucket].metadata, nil |
|
|
|
|
return memory.storedBuckets[bucket].bucketMetadata, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// SetBucketMetadata -
|
|
|
|
@ -168,7 +163,7 @@ func (memory *memoryDriver) SetBucketMetadata(bucket, acl string) error { |
|
|
|
|
memory.lock.RUnlock() |
|
|
|
|
return iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) |
|
|
|
|
} |
|
|
|
|
if _, ok := memory.bucketMetadata[bucket]; ok == false { |
|
|
|
|
if _, ok := memory.storedBuckets[bucket]; ok == false { |
|
|
|
|
memory.lock.RUnlock() |
|
|
|
|
return iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) |
|
|
|
|
} |
|
|
|
@ -178,9 +173,9 @@ func (memory *memoryDriver) SetBucketMetadata(bucket, acl string) error { |
|
|
|
|
memory.lock.RUnlock() |
|
|
|
|
memory.lock.Lock() |
|
|
|
|
defer memory.lock.Unlock() |
|
|
|
|
storedBucket := memory.bucketMetadata[bucket] |
|
|
|
|
storedBucket.metadata.ACL = drivers.BucketACL(acl) |
|
|
|
|
memory.bucketMetadata[bucket] = storedBucket |
|
|
|
|
storedBucket := memory.storedBuckets[bucket] |
|
|
|
|
storedBucket.bucketMetadata.ACL = drivers.BucketACL(acl) |
|
|
|
|
memory.storedBuckets[bucket] = storedBucket |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -214,12 +209,14 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su |
|
|
|
|
memory.lock.RUnlock() |
|
|
|
|
return iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) |
|
|
|
|
} |
|
|
|
|
if _, ok := memory.bucketMetadata[bucket]; ok == false { |
|
|
|
|
if _, ok := memory.storedBuckets[bucket]; ok == false { |
|
|
|
|
memory.lock.RUnlock() |
|
|
|
|
return iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) |
|
|
|
|
} |
|
|
|
|
storedBucket := memory.storedBuckets[bucket] |
|
|
|
|
// get object key
|
|
|
|
|
objectKey := bucket + "/" + key |
|
|
|
|
if _, ok := memory.objectMetadata[objectKey]; ok == true { |
|
|
|
|
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true { |
|
|
|
|
memory.lock.RUnlock() |
|
|
|
|
return iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil) |
|
|
|
|
} |
|
|
|
@ -228,9 +225,7 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su |
|
|
|
|
if contentType == "" { |
|
|
|
|
contentType = "application/octet-stream" |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
contentType = strings.TrimSpace(contentType) |
|
|
|
|
|
|
|
|
|
if strings.TrimSpace(expectedMD5Sum) != "" { |
|
|
|
|
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) |
|
|
|
|
if err != nil { |
|
|
|
@ -241,7 +236,6 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var bytesBuffer bytes.Buffer |
|
|
|
|
var newObject = storedObject{} |
|
|
|
|
|
|
|
|
|
chunks := split.Stream(data, 10*1024*1024) |
|
|
|
|
totalLength := 0 |
|
|
|
@ -266,11 +260,13 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su |
|
|
|
|
// Verify if the written object is equal to what is expected, only if it is requested as such
|
|
|
|
|
if strings.TrimSpace(expectedMD5Sum) != "" { |
|
|
|
|
if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil { |
|
|
|
|
memory.lock.Lock() |
|
|
|
|
defer memory.lock.Unlock() |
|
|
|
|
memory.objects.RemoveOldest() |
|
|
|
|
return iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
newObject.metadata = drivers.ObjectMetadata{ |
|
|
|
|
newObject := drivers.ObjectMetadata{ |
|
|
|
|
Bucket: bucket, |
|
|
|
|
Key: key, |
|
|
|
|
|
|
|
|
@ -279,20 +275,22 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su |
|
|
|
|
Md5: md5Sum, |
|
|
|
|
Size: int64(totalLength), |
|
|
|
|
} |
|
|
|
|
newObject.lastAccessed = time.Now() |
|
|
|
|
memory.lock.Lock() |
|
|
|
|
if _, ok := memory.objectMetadata[objectKey]; ok == true { |
|
|
|
|
memory.objects.RemoveOldest() |
|
|
|
|
memory.lock.Unlock() |
|
|
|
|
return iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil) |
|
|
|
|
memoryObject := make(map[string]drivers.ObjectMetadata) |
|
|
|
|
if len(memory.storedBuckets[bucket].objectMetadata) == 0 { |
|
|
|
|
storedBucket.objectMetadata = memoryObject |
|
|
|
|
storedBucket.objectMetadata[objectKey] = newObject |
|
|
|
|
} else { |
|
|
|
|
storedBucket.objectMetadata[objectKey] = newObject |
|
|
|
|
} |
|
|
|
|
memory.objectMetadata[objectKey] = newObject |
|
|
|
|
memory.storedBuckets[bucket] = storedBucket |
|
|
|
|
memory.objects.Add(objectKey, bytesBuffer.Bytes()) |
|
|
|
|
memory.totalSize = memory.totalSize + uint64(newObject.metadata.Size) |
|
|
|
|
for memory.totalSize > memory.maxSize { |
|
|
|
|
memory.totalSize = memory.totalSize + uint64(newObject.Size) |
|
|
|
|
if memory.totalSize > memory.maxSize { |
|
|
|
|
memory.objects.RemoveOldest() |
|
|
|
|
} |
|
|
|
|
memory.lock.Unlock() |
|
|
|
|
// free memory if possible for kernel to reclaim
|
|
|
|
|
debug.FreeOSMemory() |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
@ -300,7 +298,7 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su |
|
|
|
|
// CreateBucket - create bucket in memory
|
|
|
|
|
func (memory *memoryDriver) CreateBucket(bucketName, acl string) error { |
|
|
|
|
memory.lock.RLock() |
|
|
|
|
if len(memory.bucketMetadata) == totalBuckets { |
|
|
|
|
if len(memory.storedBuckets) == totalBuckets { |
|
|
|
|
memory.lock.RLock() |
|
|
|
|
return iodine.New(drivers.TooManyBuckets{Bucket: bucketName}, nil) |
|
|
|
|
} |
|
|
|
@ -312,7 +310,7 @@ func (memory *memoryDriver) CreateBucket(bucketName, acl string) error { |
|
|
|
|
memory.lock.RUnlock() |
|
|
|
|
return iodine.New(drivers.InvalidACL{ACL: acl}, nil) |
|
|
|
|
} |
|
|
|
|
if _, ok := memory.bucketMetadata[bucketName]; ok == true { |
|
|
|
|
if _, ok := memory.storedBuckets[bucketName]; ok == true { |
|
|
|
|
memory.lock.RUnlock() |
|
|
|
|
return iodine.New(drivers.BucketExists{Bucket: bucketName}, nil) |
|
|
|
|
} |
|
|
|
@ -323,14 +321,14 @@ func (memory *memoryDriver) CreateBucket(bucketName, acl string) error { |
|
|
|
|
acl = "private" |
|
|
|
|
} |
|
|
|
|
var newBucket = storedBucket{} |
|
|
|
|
newBucket.metadata = drivers.BucketMetadata{} |
|
|
|
|
newBucket.metadata.Name = bucketName |
|
|
|
|
newBucket.metadata.Created = time.Now() |
|
|
|
|
newBucket.metadata.ACL = drivers.BucketACL(acl) |
|
|
|
|
newBucket.objectMetadata = make(map[string]drivers.ObjectMetadata) |
|
|
|
|
newBucket.bucketMetadata = drivers.BucketMetadata{} |
|
|
|
|
newBucket.bucketMetadata.Name = bucketName |
|
|
|
|
newBucket.bucketMetadata.Created = time.Now() |
|
|
|
|
newBucket.bucketMetadata.ACL = drivers.BucketACL(acl) |
|
|
|
|
memory.lock.Lock() |
|
|
|
|
defer memory.lock.Unlock() |
|
|
|
|
memory.bucketMetadata[bucketName] = newBucket |
|
|
|
|
|
|
|
|
|
memory.storedBuckets[bucketName] = newBucket |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -403,12 +401,13 @@ func (memory *memoryDriver) ListObjects(bucket string, resources drivers.BucketR |
|
|
|
|
if !drivers.IsValidObject(resources.Prefix) { |
|
|
|
|
return nil, drivers.BucketResourcesMetadata{IsTruncated: false}, iodine.New(drivers.ObjectNameInvalid{Object: resources.Prefix}, nil) |
|
|
|
|
} |
|
|
|
|
if _, ok := memory.bucketMetadata[bucket]; ok == false { |
|
|
|
|
if _, ok := memory.storedBuckets[bucket]; ok == false { |
|
|
|
|
return nil, drivers.BucketResourcesMetadata{IsTruncated: false}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) |
|
|
|
|
} |
|
|
|
|
var results []drivers.ObjectMetadata |
|
|
|
|
var keys []string |
|
|
|
|
for key := range memory.objectMetadata { |
|
|
|
|
storedBucket := memory.storedBuckets[bucket] |
|
|
|
|
for key := range storedBucket.objectMetadata { |
|
|
|
|
if strings.HasPrefix(key, bucket+"/") { |
|
|
|
|
key = key[len(bucket)+1:] |
|
|
|
|
keys, resources = memory.listObjectsInternal(keys, key, resources) |
|
|
|
@ -420,10 +419,8 @@ func (memory *memoryDriver) ListObjects(bucket string, resources drivers.BucketR |
|
|
|
|
resources.IsTruncated = true |
|
|
|
|
return results, resources, nil |
|
|
|
|
} |
|
|
|
|
object := memory.objectMetadata[bucket+"/"+key] |
|
|
|
|
if bucket == object.metadata.Bucket { |
|
|
|
|
results = append(results, object.metadata) |
|
|
|
|
} |
|
|
|
|
object := storedBucket.objectMetadata[bucket+"/"+key] |
|
|
|
|
results = append(results, object) |
|
|
|
|
} |
|
|
|
|
return results, resources, nil |
|
|
|
|
} |
|
|
|
@ -445,8 +442,8 @@ func (memory *memoryDriver) ListBuckets() ([]drivers.BucketMetadata, error) { |
|
|
|
|
memory.lock.RLock() |
|
|
|
|
defer memory.lock.RUnlock() |
|
|
|
|
var results []drivers.BucketMetadata |
|
|
|
|
for _, bucket := range memory.bucketMetadata { |
|
|
|
|
results = append(results, bucket.metadata) |
|
|
|
|
for _, bucket := range memory.storedBuckets { |
|
|
|
|
results = append(results, bucket.bucketMetadata) |
|
|
|
|
} |
|
|
|
|
sort.Sort(ByBucketName(results)) |
|
|
|
|
return results, nil |
|
|
|
@ -463,38 +460,48 @@ func (memory *memoryDriver) GetObjectMetadata(bucket, key, prefix string) (drive |
|
|
|
|
if !drivers.IsValidObject(key) || !drivers.IsValidObject(prefix) { |
|
|
|
|
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) |
|
|
|
|
} |
|
|
|
|
if _, ok := memory.bucketMetadata[bucket]; ok == false { |
|
|
|
|
if _, ok := memory.storedBuckets[bucket]; ok == false { |
|
|
|
|
return drivers.ObjectMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) |
|
|
|
|
} |
|
|
|
|
storedBucket := memory.storedBuckets[bucket] |
|
|
|
|
objectKey := bucket + "/" + key |
|
|
|
|
if object, ok := memory.objectMetadata[objectKey]; ok == true { |
|
|
|
|
return object.metadata, nil |
|
|
|
|
if object, ok := storedBucket.objectMetadata[objectKey]; ok == true { |
|
|
|
|
return object, nil |
|
|
|
|
} |
|
|
|
|
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (memory *memoryDriver) evictObject(key lru.Key, value interface{}) { |
|
|
|
|
k := key.(string) |
|
|
|
|
memory.totalSize = memory.totalSize - uint64(memory.objectMetadata[k].metadata.Size) |
|
|
|
|
delete(memory.objectMetadata, k) |
|
|
|
|
// loop through all buckets
|
|
|
|
|
for bucket, storedBucket := range memory.storedBuckets { |
|
|
|
|
memory.totalSize = memory.totalSize - uint64(storedBucket.objectMetadata[k].Size) |
|
|
|
|
log.Printf("Evicting: %s of Size: %d", k, storedBucket.objectMetadata[k].Size) |
|
|
|
|
log.Println("TotalSize:", memory.totalSize) |
|
|
|
|
delete(storedBucket.objectMetadata, k) |
|
|
|
|
// remove bucket if no objects found anymore
|
|
|
|
|
if len(storedBucket.objectMetadata) == 0 { |
|
|
|
|
delete(memory.storedBuckets, bucket) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// free memory for kernel to reclaim if possible
|
|
|
|
|
debug.FreeOSMemory() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (memory *memoryDriver) expireObjects() { |
|
|
|
|
func (memory *memoryDriver) expireLRUObjects() { |
|
|
|
|
for { |
|
|
|
|
if memory.shutdown { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
var sleepDuration time.Duration |
|
|
|
|
memory.lock.Lock() |
|
|
|
|
if len(memory.objectMetadata) > 0 { |
|
|
|
|
if memory.objects.Len() > 0 { |
|
|
|
|
if k, _, ok := memory.objects.GetOldest(); ok { |
|
|
|
|
key := k.(string) |
|
|
|
|
object := memory.objectMetadata[key] |
|
|
|
|
if time.Now().Sub(object.lastAccessed) > memory.expiration { |
|
|
|
|
if time.Now().Sub(memory.lastAccessedObjects[key]) > memory.expiration { |
|
|
|
|
memory.objects.RemoveOldest() |
|
|
|
|
} else { |
|
|
|
|
sleepDuration = memory.expiration - time.Now().Sub(object.lastAccessed) |
|
|
|
|
sleepDuration = memory.expiration - time.Now().Sub(memory.lastAccessedObjects[key]) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
@ -508,8 +515,5 @@ func (memory *memoryDriver) expireObjects() { |
|
|
|
|
func (memory *memoryDriver) updateAccessTime(key string) { |
|
|
|
|
memory.lock.Lock() |
|
|
|
|
defer memory.lock.Unlock() |
|
|
|
|
if object, ok := memory.objectMetadata[key]; ok { |
|
|
|
|
object.lastAccessed = time.Now() |
|
|
|
|
memory.objectMetadata[key] = object |
|
|
|
|
} |
|
|
|
|
memory.lastAccessedObjects[key] = time.Now() |
|
|
|
|
} |
|
|
|
|