From 6275238a47c308fd10dabadb00723b18afed3eda Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 14 May 2015 19:31:36 -0700 Subject: [PATCH] For multiparts use its own cache avoids deadlocks during object expiration Using same object cache as regular objects for multiparts causes deadlocks during object expiration routine. As a side affect this also fixes the bug of listing multiparts during regular 'listing' of objects. --- pkg/storage/drivers/memory/memory.go | 269 +----------- .../drivers/memory/memory_multipart.go | 389 ++++++++++++++++++ 2 files changed, 396 insertions(+), 262 deletions(-) create mode 100644 pkg/storage/drivers/memory/memory_multipart.go diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index 260bd6648..f8d941d08 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -20,7 +20,6 @@ import ( "bufio" "bytes" "crypto/md5" - "crypto/sha512" "encoding/base64" "encoding/hex" "errors" @@ -33,22 +32,22 @@ import ( "sync" "time" - "math/rand" - "github.com/minio/minio/pkg/iodine" "github.com/minio/minio/pkg/storage/drivers" ) // memoryDriver - local variables type memoryDriver struct { - storedBuckets map[string]storedBucket - lock *sync.RWMutex - objects *Cache + storedBuckets map[string]storedBucket + lock *sync.RWMutex + objects *Cache + multiPartObjects *Cache } type storedBucket struct { bucketMetadata drivers.BucketMetadata objectMetadata map[string]drivers.ObjectMetadata + partMetadata map[string]drivers.PartMetadata multiPartSession map[string]multiPartSession } @@ -71,9 +70,11 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro memory = new(memoryDriver) memory.storedBuckets = make(map[string]storedBucket) memory.objects = NewCache(maxSize, expiration) + memory.multiPartObjects = NewCache(0, time.Duration(0)) memory.lock = new(sync.RWMutex) memory.objects.OnExpired = memory.expiredObject + memory.multiPartObjects.OnExpired = memory.expiredPart // set up memory expiration memory.objects.ExpireObjects(time.Second * 5) @@ -513,259 +514,3 @@ func (memory *memoryDriver) expiredObject(a ...interface{}) { } debug.FreeOSMemory() } - -func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) { - memory.lock.RLock() - if !drivers.IsValidBucket(bucket) { - memory.lock.RUnlock() - return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) - } - if !drivers.IsValidObjectName(key) { - memory.lock.RUnlock() - return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) - } - if _, ok := memory.storedBuckets[bucket]; ok == false { - memory.lock.RUnlock() - return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) - } - storedBucket := memory.storedBuckets[bucket] - objectKey := bucket + "/" + key - if _, ok := storedBucket.objectMetadata[objectKey]; ok == true { - memory.lock.RUnlock() - return "", iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil) - } - memory.lock.RUnlock() - - memory.lock.Lock() - id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String()) - uploadIDSum := sha512.Sum512(id) - uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47] - - memory.storedBuckets[bucket].multiPartSession[key] = multiPartSession{ - uploadID: uploadID, - initiated: time.Now(), - totalParts: 0, - } - memory.lock.Unlock() - - return uploadID, nil -} - -func (memory *memoryDriver) AbortMultipartUpload(bucket, key, uploadID string) error { - memory.lock.RLock() - storedBucket := memory.storedBuckets[bucket] - if storedBucket.multiPartSession[key].uploadID != uploadID { - memory.lock.RUnlock() - return iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) - } - memory.lock.RUnlock() - - memory.cleanupMultiparts(bucket, key, uploadID) - memory.cleanupMultipartSession(bucket, key, uploadID) - return nil -} - -func getMultipartKey(key string, uploadID string, partNumber int) string { - return key + "?uploadId=" + uploadID + "&partNumber=" + strconv.Itoa(partNumber) -} - -func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { - // Verify upload id - memory.lock.RLock() - storedBucket := memory.storedBuckets[bucket] - if storedBucket.multiPartSession[key].uploadID != uploadID { - memory.lock.RUnlock() - return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) - } - memory.lock.RUnlock() - - etag, err := memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data) - if err != nil { - return "", iodine.New(err, nil) - } - // once successful, update totalParts - multiPartSession := storedBucket.multiPartSession[key] - multiPartSession.totalParts++ - storedBucket.multiPartSession[key] = multiPartSession - - return etag, nil -} - -func (memory *memoryDriver) cleanupMultipartSession(bucket, key, uploadID string) { - memory.lock.Lock() - defer memory.lock.Unlock() - delete(memory.storedBuckets[bucket].multiPartSession, key) -} - -func (memory *memoryDriver) cleanupMultiparts(bucket, key, uploadID string) { - memory.lock.Lock() - defer memory.lock.Unlock() - for i := 1; i <= memory.storedBuckets[bucket].multiPartSession[key].totalParts; i++ { - objectKey := bucket + "/" + getMultipartKey(key, uploadID, i) - memory.objects.Delete(objectKey) - } -} - -func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { - if !drivers.IsValidBucket(bucket) { - return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) - } - if !drivers.IsValidObjectName(key) { - return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) - } - // Verify upload id - memory.lock.RLock() - if _, ok := memory.storedBuckets[bucket]; ok == false { - memory.lock.RUnlock() - return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) - } - storedBucket := memory.storedBuckets[bucket] - if storedBucket.multiPartSession[key].uploadID != uploadID { - memory.lock.RUnlock() - return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) - } - memory.lock.RUnlock() - - memory.lock.Lock() - var size int64 - for i := range parts { - object, ok := storedBucket.objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)] - if !ok { - memory.lock.Unlock() - return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) - } - size += object.Size - } - - var fullObject bytes.Buffer - for i := 1; i <= len(parts); i++ { - recvMD5 := parts[i] - object, ok := memory.objects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)) - if ok == false { - memory.lock.Unlock() - return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) - } - obj := object.([]byte) - calcMD5Bytes := md5.Sum(obj) - // complete multi part request header md5sum per part is hex encoded - recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\"")) - if err != nil { - return "", iodine.New(drivers.InvalidDigest{Md5: recvMD5}, nil) - } - if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) { - return "", iodine.New(drivers.BadDigest{Md5: recvMD5, Bucket: bucket, Key: getMultipartKey(key, uploadID, i)}, nil) - } - _, err = io.Copy(&fullObject, bytes.NewBuffer(obj)) - if err != nil { - return "", iodine.New(err, nil) - } - } - memory.lock.Unlock() - - md5sumSlice := md5.Sum(fullObject.Bytes()) - md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:]) - etag, err := memory.CreateObject(bucket, key, "", md5sum, size, &fullObject) - if err != nil { - // No need to call internal cleanup functions here, caller will call AbortMultipartUpload() - // which would in-turn cleanup properly in accordance with S3 Spec - return "", iodine.New(err, nil) - } - memory.cleanupMultiparts(bucket, key, uploadID) - memory.cleanupMultipartSession(bucket, key, uploadID) - return etag, nil -} - -// byKey is a sortable interface for UploadMetadata slice -type byKey []*drivers.UploadMetadata - -func (a byKey) Len() int { return len(a) } -func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key } - -func (memory *memoryDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) { - // TODO handle delimiter, prefix, uploadIDMarker - memory.lock.RLock() - defer memory.lock.RUnlock() - if _, ok := memory.storedBuckets[bucket]; ok == false { - return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) - } - storedBucket := memory.storedBuckets[bucket] - var uploads []*drivers.UploadMetadata - - for key, session := range storedBucket.multiPartSession { - if len(uploads) > resources.MaxUploads { - sort.Sort(byKey(uploads)) - resources.Upload = uploads - resources.NextKeyMarker = key - resources.NextUploadIDMarker = session.uploadID - resources.IsTruncated = true - return resources, nil - } - if key > resources.KeyMarker { - upload := new(drivers.UploadMetadata) - upload.Key = key - upload.UploadID = session.uploadID - upload.Initiated = session.initiated - uploads = append(uploads, upload) - } - } - sort.Sort(byKey(uploads)) - resources.Upload = uploads - return resources, nil -} - -// partNumber is a sortable interface for Part slice -type partNumber []*drivers.PartMetadata - -func (a partNumber) Len() int { return len(a) } -func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber } - -func (memory *memoryDriver) ListObjectParts(bucket, key string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) { - // Verify upload id - memory.lock.RLock() - defer memory.lock.RUnlock() - if _, ok := memory.storedBuckets[bucket]; ok == false { - return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) - } - storedBucket := memory.storedBuckets[bucket] - if _, ok := storedBucket.multiPartSession[key]; ok == false { - return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil) - } - if storedBucket.multiPartSession[key].uploadID != resources.UploadID { - return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: resources.UploadID}, nil) - } - objectResourcesMetadata := resources - objectResourcesMetadata.Bucket = bucket - objectResourcesMetadata.Key = key - var parts []*drivers.PartMetadata - var startPartNumber int - switch { - case objectResourcesMetadata.PartNumberMarker == 0: - startPartNumber = 1 - default: - startPartNumber = objectResourcesMetadata.PartNumberMarker - } - for i := startPartNumber; i <= storedBucket.multiPartSession[key].totalParts; i++ { - if len(parts) > objectResourcesMetadata.MaxParts { - sort.Sort(partNumber(parts)) - objectResourcesMetadata.IsTruncated = true - objectResourcesMetadata.Part = parts - objectResourcesMetadata.NextPartNumberMarker = i - return objectResourcesMetadata, nil - } - object, ok := storedBucket.objectMetadata[bucket+"/"+getMultipartKey(key, resources.UploadID, i)] - if !ok { - return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) - } - partMetadata := &drivers.PartMetadata{} - partMetadata.Size = object.Size - partMetadata.LastModified = object.Created - partMetadata.ETag = object.Md5 - partMetadata.PartNumber = i - parts = append(parts, partMetadata) - } - sort.Sort(partNumber(parts)) - objectResourcesMetadata.Part = parts - return objectResourcesMetadata, nil -} diff --git a/pkg/storage/drivers/memory/memory_multipart.go b/pkg/storage/drivers/memory/memory_multipart.go new file mode 100644 index 000000000..cb24aea49 --- /dev/null +++ b/pkg/storage/drivers/memory/memory_multipart.go @@ -0,0 +1,389 @@ +/* + * Minimalist Object Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package memory + +import ( + "bytes" + "crypto/md5" + "crypto/sha512" + "encoding/base64" + "encoding/hex" + "errors" + "io" + "log" + "math/rand" + "runtime/debug" + "sort" + "strconv" + "strings" + "time" + + "github.com/minio/minio/pkg/iodine" + "github.com/minio/minio/pkg/storage/drivers" +) + +func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) { + memory.lock.RLock() + if !drivers.IsValidBucket(bucket) { + memory.lock.RUnlock() + return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) + } + if !drivers.IsValidObjectName(key) { + memory.lock.RUnlock() + return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) + } + if _, ok := memory.storedBuckets[bucket]; ok == false { + memory.lock.RUnlock() + return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + storedBucket := memory.storedBuckets[bucket] + objectKey := bucket + "/" + key + if _, ok := storedBucket.objectMetadata[objectKey]; ok == true { + memory.lock.RUnlock() + return "", iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil) + } + memory.lock.RUnlock() + + memory.lock.Lock() + id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String()) + uploadIDSum := sha512.Sum512(id) + uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47] + + memory.storedBuckets[bucket].multiPartSession[key] = multiPartSession{ + uploadID: uploadID, + initiated: time.Now(), + totalParts: 0, + } + memory.lock.Unlock() + + return uploadID, nil +} + +func (memory *memoryDriver) AbortMultipartUpload(bucket, key, uploadID string) error { + memory.lock.RLock() + storedBucket := memory.storedBuckets[bucket] + if storedBucket.multiPartSession[key].uploadID != uploadID { + memory.lock.RUnlock() + return iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) + } + memory.lock.RUnlock() + + memory.cleanupMultiparts(bucket, key, uploadID) + memory.cleanupMultipartSession(bucket, key, uploadID) + return nil +} + +func getMultipartKey(key string, uploadID string, partNumber int) string { + return key + "?uploadId=" + uploadID + "&partNumber=" + strconv.Itoa(partNumber) +} + +func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { + // Verify upload id + memory.lock.RLock() + storedBucket := memory.storedBuckets[bucket] + if storedBucket.multiPartSession[key].uploadID != uploadID { + memory.lock.RUnlock() + return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) + } + memory.lock.RUnlock() + + etag, err := memory.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data) + if err != nil { + return "", iodine.New(err, nil) + } + // once successful, update totalParts + multiPartSession := storedBucket.multiPartSession[key] + multiPartSession.totalParts++ + storedBucket.multiPartSession[key] = multiPartSession + // free + debug.FreeOSMemory() + return etag, nil +} + +// createObject - PUT object to memory buffer +func (memory *memoryDriver) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { + memory.lock.RLock() + if !drivers.IsValidBucket(bucket) { + memory.lock.RUnlock() + return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) + } + if !drivers.IsValidObjectName(key) { + memory.lock.RUnlock() + return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) + } + if _, ok := memory.storedBuckets[bucket]; ok == false { + memory.lock.RUnlock() + return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + storedBucket := memory.storedBuckets[bucket] + // get object key + partKey := bucket + "/" + getMultipartKey(key, uploadID, partID) + if _, ok := storedBucket.partMetadata[partKey]; ok == true { + memory.lock.RUnlock() + return storedBucket.partMetadata[partKey].ETag, nil + } + memory.lock.RUnlock() + + if contentType == "" { + contentType = "application/octet-stream" + } + contentType = strings.TrimSpace(contentType) + if strings.TrimSpace(expectedMD5Sum) != "" { + expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) + if err != nil { + // pro-actively close the connection + return "", iodine.New(drivers.InvalidDigest{Md5: expectedMD5Sum}, nil) + } + expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) + } + + // calculate md5 + hash := md5.New() + var readBytes []byte + + var err error + var length int + for err == nil { + byteBuffer := make([]byte, 1024*1024) + length, err = data.Read(byteBuffer) + // While hash.Write() wouldn't mind a Nil byteBuffer + // It is necessary for us to verify this and break + if length == 0 { + break + } + hash.Write(byteBuffer[0:length]) + readBytes = append(readBytes, byteBuffer[0:length]...) + } + if err != io.EOF { + return "", iodine.New(err, nil) + } + go debug.FreeOSMemory() + md5SumBytes := hash.Sum(nil) + totalLength := int64(len(readBytes)) + + memory.lock.Lock() + memory.multiPartObjects.Set(partKey, readBytes) + memory.lock.Unlock() + // setting up for de-allocation + readBytes = nil + + md5Sum := hex.EncodeToString(md5SumBytes) + // Verify if the written object is equal to what is expected, only if it is requested as such + if strings.TrimSpace(expectedMD5Sum) != "" { + if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil { + return "", iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil) + } + } + newPart := drivers.PartMetadata{ + PartNumber: partID, + LastModified: time.Now().UTC(), + ETag: md5Sum, + Size: totalLength, + } + + memory.lock.Lock() + memoryPart := make(map[string]drivers.PartMetadata) + switch { + case len(storedBucket.partMetadata) == 0: + storedBucket.partMetadata = memoryPart + storedBucket.partMetadata[partKey] = newPart + default: + storedBucket.partMetadata[partKey] = newPart + } + memory.storedBuckets[bucket] = storedBucket + memory.lock.Unlock() + + return md5Sum, nil +} + +func (memory *memoryDriver) cleanupMultipartSession(bucket, key, uploadID string) { + memory.lock.Lock() + defer memory.lock.Unlock() + delete(memory.storedBuckets[bucket].multiPartSession, key) +} + +func (memory *memoryDriver) cleanupMultiparts(bucket, key, uploadID string) { + memory.lock.Lock() + defer memory.lock.Unlock() + for i := 1; i <= memory.storedBuckets[bucket].multiPartSession[key].totalParts; i++ { + objectKey := bucket + "/" + getMultipartKey(key, uploadID, i) + memory.multiPartObjects.Delete(objectKey) + } +} + +func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { + if !drivers.IsValidBucket(bucket) { + return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) + } + if !drivers.IsValidObjectName(key) { + return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) + } + // Verify upload id + memory.lock.RLock() + if _, ok := memory.storedBuckets[bucket]; ok == false { + memory.lock.RUnlock() + return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + storedBucket := memory.storedBuckets[bucket] + if storedBucket.multiPartSession[key].uploadID != uploadID { + memory.lock.RUnlock() + return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) + } + memory.lock.RUnlock() + + memory.lock.Lock() + var size int64 + var fullObject bytes.Buffer + for i := 1; i <= len(parts); i++ { + recvMD5 := parts[i] + object, ok := memory.multiPartObjects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)) + if ok == false { + memory.lock.Unlock() + return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) + } + obj := object.([]byte) + size += int64(len(obj)) + calcMD5Bytes := md5.Sum(obj) + // complete multi part request header md5sum per part is hex encoded + recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\"")) + if err != nil { + return "", iodine.New(drivers.InvalidDigest{Md5: recvMD5}, nil) + } + if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) { + return "", iodine.New(drivers.BadDigest{Md5: recvMD5, Bucket: bucket, Key: getMultipartKey(key, uploadID, i)}, nil) + } + _, err = io.Copy(&fullObject, bytes.NewBuffer(obj)) + if err != nil { + return "", iodine.New(err, nil) + } + } + memory.lock.Unlock() + + md5sumSlice := md5.Sum(fullObject.Bytes()) + md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:]) + etag, err := memory.CreateObject(bucket, key, "", md5sum, size, &fullObject) + if err != nil { + // No need to call internal cleanup functions here, caller will call AbortMultipartUpload() + // which would in-turn cleanup properly in accordance with S3 Spec + return "", iodine.New(err, nil) + } + memory.cleanupMultiparts(bucket, key, uploadID) + memory.cleanupMultipartSession(bucket, key, uploadID) + return etag, nil +} + +// byKey is a sortable interface for UploadMetadata slice +type byKey []*drivers.UploadMetadata + +func (a byKey) Len() int { return len(a) } +func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key } + +func (memory *memoryDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) { + // TODO handle delimiter, prefix, uploadIDMarker + memory.lock.RLock() + defer memory.lock.RUnlock() + if _, ok := memory.storedBuckets[bucket]; ok == false { + return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + storedBucket := memory.storedBuckets[bucket] + var uploads []*drivers.UploadMetadata + + for key, session := range storedBucket.multiPartSession { + if len(uploads) > resources.MaxUploads { + sort.Sort(byKey(uploads)) + resources.Upload = uploads + resources.NextKeyMarker = key + resources.NextUploadIDMarker = session.uploadID + resources.IsTruncated = true + return resources, nil + } + if key > resources.KeyMarker { + upload := new(drivers.UploadMetadata) + upload.Key = key + upload.UploadID = session.uploadID + upload.Initiated = session.initiated + uploads = append(uploads, upload) + } + } + sort.Sort(byKey(uploads)) + resources.Upload = uploads + return resources, nil +} + +// partNumber is a sortable interface for Part slice +type partNumber []*drivers.PartMetadata + +func (a partNumber) Len() int { return len(a) } +func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber } + +func (memory *memoryDriver) ListObjectParts(bucket, key string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) { + // Verify upload id + memory.lock.RLock() + defer memory.lock.RUnlock() + if _, ok := memory.storedBuckets[bucket]; ok == false { + return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + storedBucket := memory.storedBuckets[bucket] + if _, ok := storedBucket.multiPartSession[key]; ok == false { + return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil) + } + if storedBucket.multiPartSession[key].uploadID != resources.UploadID { + return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: resources.UploadID}, nil) + } + objectResourcesMetadata := resources + objectResourcesMetadata.Bucket = bucket + objectResourcesMetadata.Key = key + var parts []*drivers.PartMetadata + var startPartNumber int + switch { + case objectResourcesMetadata.PartNumberMarker == 0: + startPartNumber = 1 + default: + startPartNumber = objectResourcesMetadata.PartNumberMarker + } + for i := startPartNumber; i <= storedBucket.multiPartSession[key].totalParts; i++ { + if len(parts) > objectResourcesMetadata.MaxParts { + sort.Sort(partNumber(parts)) + objectResourcesMetadata.IsTruncated = true + objectResourcesMetadata.Part = parts + objectResourcesMetadata.NextPartNumberMarker = i + return objectResourcesMetadata, nil + } + part, ok := storedBucket.partMetadata[bucket+"/"+getMultipartKey(key, resources.UploadID, i)] + if !ok { + return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) + } + parts = append(parts, &part) + } + sort.Sort(partNumber(parts)) + objectResourcesMetadata.Part = parts + return objectResourcesMetadata, nil +} + +func (memory *memoryDriver) expiredPart(a ...interface{}) { + cacheStats := memory.multiPartObjects.Stats() + log.Printf("CurrentSize: %d, CurrentItems: %d, TotalExpirations: %d", + cacheStats.Bytes, cacheStats.Items, cacheStats.Expired) + key := a[0].(string) + // loop through all buckets + for _, storedBucket := range memory.storedBuckets { + delete(storedBucket.partMetadata, key) + } + debug.FreeOSMemory() +}