diff --git a/pkg/server/server.go b/pkg/server/server.go index 479441038..3efbbded8 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -19,6 +19,9 @@ package server import ( "errors" "fmt" + "reflect" + "time" + "github.com/minio-io/minio/pkg/api" "github.com/minio-io/minio/pkg/api/web" "github.com/minio-io/minio/pkg/iodine" @@ -26,8 +29,6 @@ import ( "github.com/minio-io/minio/pkg/storage/drivers/donut" "github.com/minio-io/minio/pkg/storage/drivers/memory" "github.com/minio-io/minio/pkg/utils/log" - "reflect" - "time" ) // MemoryFactory is used to build memory api servers diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index 9f4b645da..30e81d924 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -40,25 +40,19 @@ import ( // memoryDriver - local variables type memoryDriver struct { - bucketMetadata map[string]storedBucket - objectMetadata map[string]storedObject - objects *lru.Cache - lock *sync.RWMutex - totalSize uint64 - maxSize uint64 - expiration time.Duration - shutdown bool + storedBuckets map[string]storedBucket + lock *sync.RWMutex + objects *lru.Cache + lastAccessedObjects map[string]time.Time + totalSize uint64 + maxSize uint64 + expiration time.Duration + shutdown bool } type storedBucket struct { - metadata drivers.BucketMetadata - // owner string // TODO - // id string // TODO -} - -type storedObject struct { - metadata drivers.ObjectMetadata - lastAccessed time.Time + bucketMetadata drivers.BucketMetadata + objectMetadata map[string]drivers.ObjectMetadata } const ( @@ -72,8 +66,8 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro var memory *memoryDriver memory = new(memoryDriver) - memory.bucketMetadata = make(map[string]storedBucket) - memory.objectMetadata = make(map[string]storedObject) + memory.storedBuckets = make(map[string]storedBucket) + memory.lastAccessedObjects = make(map[string]time.Time) memory.objects = lru.New(0) memory.lock = new(sync.RWMutex) memory.expiration = expiration @@ -91,7 +85,7 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro memory.objects.OnEvicted = memory.evictObject // set up memory expiration - go memory.expireObjects() + go memory.expireLRUObjects() go start(ctrlChannel, errorChannel) return ctrlChannel, errorChannel, memory @@ -111,12 +105,13 @@ func (memory *memoryDriver) GetObject(w io.Writer, bucket string, object string) if !drivers.IsValidObject(object) { return 0, iodine.New(drivers.ObjectNameInvalid{Object: object}, nil) } - if _, ok := memory.bucketMetadata[bucket]; ok == false { + if _, ok := memory.storedBuckets[bucket]; ok == false { return 0, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) } - // get object + storedBucket := memory.storedBuckets[bucket] + // form objectKey objectKey := bucket + "/" + object - if _, ok := memory.objectMetadata[objectKey]; ok { + if _, ok := storedBucket.objectMetadata[objectKey]; ok { if data, ok := memory.objects.Get(objectKey); ok { dataSlice := data.([]byte) objectBuffer := bytes.NewBuffer(dataSlice) @@ -155,10 +150,10 @@ func (memory *memoryDriver) GetBucketMetadata(bucket string) (drivers.BucketMeta if !drivers.IsValidBucket(bucket) { return drivers.BucketMetadata{}, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) } - if _, ok := memory.bucketMetadata[bucket]; ok == false { + if _, ok := memory.storedBuckets[bucket]; ok == false { return drivers.BucketMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) } - return memory.bucketMetadata[bucket].metadata, nil + return memory.storedBuckets[bucket].bucketMetadata, nil } // SetBucketMetadata - @@ -168,7 +163,7 @@ func (memory *memoryDriver) SetBucketMetadata(bucket, acl string) error { memory.lock.RUnlock() return iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) } - if _, ok := memory.bucketMetadata[bucket]; ok == false { + if _, ok := memory.storedBuckets[bucket]; ok == false { memory.lock.RUnlock() return iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) } @@ -178,9 +173,9 @@ func (memory *memoryDriver) SetBucketMetadata(bucket, acl string) error { memory.lock.RUnlock() memory.lock.Lock() defer memory.lock.Unlock() - storedBucket := memory.bucketMetadata[bucket] - storedBucket.metadata.ACL = drivers.BucketACL(acl) - memory.bucketMetadata[bucket] = storedBucket + storedBucket := memory.storedBuckets[bucket] + storedBucket.bucketMetadata.ACL = drivers.BucketACL(acl) + memory.storedBuckets[bucket] = storedBucket return nil } @@ -214,12 +209,14 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su memory.lock.RUnlock() return iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) } - if _, ok := memory.bucketMetadata[bucket]; ok == false { + if _, ok := memory.storedBuckets[bucket]; ok == false { memory.lock.RUnlock() return iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) } + storedBucket := memory.storedBuckets[bucket] + // get object key objectKey := bucket + "/" + key - if _, ok := memory.objectMetadata[objectKey]; ok == true { + if _, ok := storedBucket.objectMetadata[objectKey]; ok == true { memory.lock.RUnlock() return iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil) } @@ -228,9 +225,7 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su if contentType == "" { contentType = "application/octet-stream" } - contentType = strings.TrimSpace(contentType) - if strings.TrimSpace(expectedMD5Sum) != "" { expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) if err != nil { @@ -241,7 +236,6 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su } var bytesBuffer bytes.Buffer - var newObject = storedObject{} chunks := split.Stream(data, 10*1024*1024) totalLength := 0 @@ -266,11 +260,13 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su // Verify if the written object is equal to what is expected, only if it is requested as such if strings.TrimSpace(expectedMD5Sum) != "" { if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil { + memory.lock.Lock() + defer memory.lock.Unlock() memory.objects.RemoveOldest() return iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil) } } - newObject.metadata = drivers.ObjectMetadata{ + newObject := drivers.ObjectMetadata{ Bucket: bucket, Key: key, @@ -279,20 +275,22 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su Md5: md5Sum, Size: int64(totalLength), } - newObject.lastAccessed = time.Now() memory.lock.Lock() - if _, ok := memory.objectMetadata[objectKey]; ok == true { - memory.objects.RemoveOldest() - memory.lock.Unlock() - return iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil) - } - memory.objectMetadata[objectKey] = newObject + memoryObject := make(map[string]drivers.ObjectMetadata) + if len(memory.storedBuckets[bucket].objectMetadata) == 0 { + storedBucket.objectMetadata = memoryObject + storedBucket.objectMetadata[objectKey] = newObject + } else { + storedBucket.objectMetadata[objectKey] = newObject + } + memory.storedBuckets[bucket] = storedBucket memory.objects.Add(objectKey, bytesBuffer.Bytes()) - memory.totalSize = memory.totalSize + uint64(newObject.metadata.Size) - for memory.totalSize > memory.maxSize { + memory.totalSize = memory.totalSize + uint64(newObject.Size) + if memory.totalSize > memory.maxSize { memory.objects.RemoveOldest() } memory.lock.Unlock() + // free memory if possible for kernel to reclaim debug.FreeOSMemory() return nil } @@ -300,7 +298,7 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su // CreateBucket - create bucket in memory func (memory *memoryDriver) CreateBucket(bucketName, acl string) error { memory.lock.RLock() - if len(memory.bucketMetadata) == totalBuckets { + if len(memory.storedBuckets) == totalBuckets { memory.lock.RLock() return iodine.New(drivers.TooManyBuckets{Bucket: bucketName}, nil) } @@ -312,7 +310,7 @@ func (memory *memoryDriver) CreateBucket(bucketName, acl string) error { memory.lock.RUnlock() return iodine.New(drivers.InvalidACL{ACL: acl}, nil) } - if _, ok := memory.bucketMetadata[bucketName]; ok == true { + if _, ok := memory.storedBuckets[bucketName]; ok == true { memory.lock.RUnlock() return iodine.New(drivers.BucketExists{Bucket: bucketName}, nil) } @@ -323,14 +321,14 @@ func (memory *memoryDriver) CreateBucket(bucketName, acl string) error { acl = "private" } var newBucket = storedBucket{} - newBucket.metadata = drivers.BucketMetadata{} - newBucket.metadata.Name = bucketName - newBucket.metadata.Created = time.Now() - newBucket.metadata.ACL = drivers.BucketACL(acl) + newBucket.objectMetadata = make(map[string]drivers.ObjectMetadata) + newBucket.bucketMetadata = drivers.BucketMetadata{} + newBucket.bucketMetadata.Name = bucketName + newBucket.bucketMetadata.Created = time.Now() + newBucket.bucketMetadata.ACL = drivers.BucketACL(acl) memory.lock.Lock() defer memory.lock.Unlock() - memory.bucketMetadata[bucketName] = newBucket - + memory.storedBuckets[bucketName] = newBucket return nil } @@ -403,12 +401,13 @@ func (memory *memoryDriver) ListObjects(bucket string, resources drivers.BucketR if !drivers.IsValidObject(resources.Prefix) { return nil, drivers.BucketResourcesMetadata{IsTruncated: false}, iodine.New(drivers.ObjectNameInvalid{Object: resources.Prefix}, nil) } - if _, ok := memory.bucketMetadata[bucket]; ok == false { + if _, ok := memory.storedBuckets[bucket]; ok == false { return nil, drivers.BucketResourcesMetadata{IsTruncated: false}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) } var results []drivers.ObjectMetadata var keys []string - for key := range memory.objectMetadata { + storedBucket := memory.storedBuckets[bucket] + for key := range storedBucket.objectMetadata { if strings.HasPrefix(key, bucket+"/") { key = key[len(bucket)+1:] keys, resources = memory.listObjectsInternal(keys, key, resources) @@ -420,10 +419,8 @@ func (memory *memoryDriver) ListObjects(bucket string, resources drivers.BucketR resources.IsTruncated = true return results, resources, nil } - object := memory.objectMetadata[bucket+"/"+key] - if bucket == object.metadata.Bucket { - results = append(results, object.metadata) - } + object := storedBucket.objectMetadata[bucket+"/"+key] + results = append(results, object) } return results, resources, nil } @@ -445,8 +442,8 @@ func (memory *memoryDriver) ListBuckets() ([]drivers.BucketMetadata, error) { memory.lock.RLock() defer memory.lock.RUnlock() var results []drivers.BucketMetadata - for _, bucket := range memory.bucketMetadata { - results = append(results, bucket.metadata) + for _, bucket := range memory.storedBuckets { + results = append(results, bucket.bucketMetadata) } sort.Sort(ByBucketName(results)) return results, nil @@ -463,38 +460,48 @@ func (memory *memoryDriver) GetObjectMetadata(bucket, key, prefix string) (drive if !drivers.IsValidObject(key) || !drivers.IsValidObject(prefix) { return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) } - if _, ok := memory.bucketMetadata[bucket]; ok == false { + if _, ok := memory.storedBuckets[bucket]; ok == false { return drivers.ObjectMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) } + storedBucket := memory.storedBuckets[bucket] objectKey := bucket + "/" + key - if object, ok := memory.objectMetadata[objectKey]; ok == true { - return object.metadata, nil + if object, ok := storedBucket.objectMetadata[objectKey]; ok == true { + return object, nil } return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil) } func (memory *memoryDriver) evictObject(key lru.Key, value interface{}) { k := key.(string) - memory.totalSize = memory.totalSize - uint64(memory.objectMetadata[k].metadata.Size) - delete(memory.objectMetadata, k) + // loop through all buckets + for bucket, storedBucket := range memory.storedBuckets { + memory.totalSize = memory.totalSize - uint64(storedBucket.objectMetadata[k].Size) + log.Printf("Evicting: %s of Size: %d", k, storedBucket.objectMetadata[k].Size) + log.Println("TotalSize:", memory.totalSize) + delete(storedBucket.objectMetadata, k) + // remove bucket if no objects found anymore + if len(storedBucket.objectMetadata) == 0 { + delete(memory.storedBuckets, bucket) + } + } + // free memory for kernel to reclaim if possible debug.FreeOSMemory() } -func (memory *memoryDriver) expireObjects() { +func (memory *memoryDriver) expireLRUObjects() { for { if memory.shutdown { return } var sleepDuration time.Duration memory.lock.Lock() - if len(memory.objectMetadata) > 0 { + if memory.objects.Len() > 0 { if k, _, ok := memory.objects.GetOldest(); ok { key := k.(string) - object := memory.objectMetadata[key] - if time.Now().Sub(object.lastAccessed) > memory.expiration { + if time.Now().Sub(memory.lastAccessedObjects[key]) > memory.expiration { memory.objects.RemoveOldest() } else { - sleepDuration = memory.expiration - time.Now().Sub(object.lastAccessed) + sleepDuration = memory.expiration - time.Now().Sub(memory.lastAccessedObjects[key]) } } } else { @@ -508,8 +515,5 @@ func (memory *memoryDriver) expireObjects() { func (memory *memoryDriver) updateAccessTime(key string) { memory.lock.Lock() defer memory.lock.Unlock() - if object, ok := memory.objectMetadata[key]; ok { - object.lastAccessed = time.Now() - memory.objectMetadata[key] = object - } + memory.lastAccessedObjects[key] = time.Now() }