|
|
|
@ -34,7 +34,7 @@ import ( |
|
|
|
|
"github.com/minio/minio/pkg/iodine" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// NewMultipartUpload -
|
|
|
|
|
// NewMultipartUpload - initiate a new multipart session
|
|
|
|
|
func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, error) { |
|
|
|
|
donut.lock.Lock() |
|
|
|
|
defer donut.lock.Unlock() |
|
|
|
@ -65,7 +65,7 @@ func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, er |
|
|
|
|
return uploadID, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// AbortMultipartUpload -
|
|
|
|
|
// AbortMultipartUpload - abort an incomplete multipart session
|
|
|
|
|
func (donut API) AbortMultipartUpload(bucket, key, uploadID string) error { |
|
|
|
|
donut.lock.Lock() |
|
|
|
|
defer donut.lock.Unlock() |
|
|
|
@ -83,11 +83,7 @@ func (donut API) AbortMultipartUpload(bucket, key, uploadID string) error { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func getMultipartKey(key string, uploadID string, partNumber int) string { |
|
|
|
|
return key + "?uploadId=" + uploadID + "&partNumber=" + strconv.Itoa(partNumber) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// CreateObjectPart -
|
|
|
|
|
// CreateObjectPart - create a part in a multipart session
|
|
|
|
|
func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { |
|
|
|
|
if !IsValidBucket(bucket) { |
|
|
|
|
return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil) |
|
|
|
@ -98,22 +94,15 @@ func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, cont |
|
|
|
|
if !donut.storedBuckets.Exists(bucket) { |
|
|
|
|
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil) |
|
|
|
|
} |
|
|
|
|
var etag string |
|
|
|
|
var err error |
|
|
|
|
{ |
|
|
|
|
donut.lock.Lock() |
|
|
|
|
etag, err = donut.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data) |
|
|
|
|
if err != nil { |
|
|
|
|
return "", iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
donut.lock.Unlock() |
|
|
|
|
} |
|
|
|
|
donut.lock.Lock() |
|
|
|
|
etag, err := donut.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data) |
|
|
|
|
donut.lock.Unlock() |
|
|
|
|
// possible free
|
|
|
|
|
debug.FreeOSMemory() |
|
|
|
|
return etag, nil |
|
|
|
|
return etag, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// createObject - PUT object to cache buffer
|
|
|
|
|
// createObject - internal wrapper function called by CreateObjectPart
|
|
|
|
|
func (donut API) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { |
|
|
|
|
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket) |
|
|
|
|
// Verify upload id
|
|
|
|
@ -122,9 +111,8 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// get object key
|
|
|
|
|
partKey := bucket + "/" + getMultipartKey(key, uploadID, partID) |
|
|
|
|
if _, ok := storedBucket.partMetadata[partKey]; ok == true { |
|
|
|
|
return storedBucket.partMetadata[partKey].ETag, nil |
|
|
|
|
if _, ok := storedBucket.partMetadata[partID]; ok { |
|
|
|
|
return storedBucket.partMetadata[partID].ETag, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if contentType == "" { |
|
|
|
@ -164,7 +152,7 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont |
|
|
|
|
md5SumBytes := hash.Sum(nil) |
|
|
|
|
totalLength := int64(len(readBytes)) |
|
|
|
|
|
|
|
|
|
donut.multiPartObjects.Set(partKey, readBytes) |
|
|
|
|
donut.multiPartObjects.Set(partID, readBytes) |
|
|
|
|
// setting up for de-allocation
|
|
|
|
|
readBytes = nil |
|
|
|
|
|
|
|
|
@ -182,7 +170,7 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont |
|
|
|
|
Size: totalLength, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
storedBucket.partMetadata[partKey] = newPart |
|
|
|
|
storedBucket.partMetadata[partID] = newPart |
|
|
|
|
multiPartSession := storedBucket.multiPartSession[key] |
|
|
|
|
multiPartSession.totalParts++ |
|
|
|
|
storedBucket.multiPartSession[key] = multiPartSession |
|
|
|
@ -190,38 +178,42 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont |
|
|
|
|
return md5Sum, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// cleanupMultipartSession invoked during an abort or complete multipart session to cleanup session from memory
|
|
|
|
|
func (donut API) cleanupMultipartSession(bucket, key, uploadID string) { |
|
|
|
|
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket) |
|
|
|
|
for i := 1; i <= storedBucket.multiPartSession[key].totalParts; i++ { |
|
|
|
|
objectKey := bucket + "/" + getMultipartKey(key, uploadID, i) |
|
|
|
|
donut.multiPartObjects.Delete(objectKey) |
|
|
|
|
donut.multiPartObjects.Delete(i) |
|
|
|
|
} |
|
|
|
|
delete(storedBucket.multiPartSession, key) |
|
|
|
|
donut.storedBuckets.Set(bucket, storedBucket) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// CompleteMultipartUpload -
|
|
|
|
|
// CompleteMultipartUpload - complete a multipart upload and persist the data
|
|
|
|
|
func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (ObjectMetadata, error) { |
|
|
|
|
donut.lock.Lock() |
|
|
|
|
if !IsValidBucket(bucket) { |
|
|
|
|
donut.lock.Unlock() |
|
|
|
|
return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil) |
|
|
|
|
} |
|
|
|
|
if !IsValidObjectName(key) { |
|
|
|
|
donut.lock.Unlock() |
|
|
|
|
return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil) |
|
|
|
|
} |
|
|
|
|
// Verify upload id
|
|
|
|
|
if !donut.storedBuckets.Exists(bucket) { |
|
|
|
|
donut.lock.Unlock() |
|
|
|
|
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil) |
|
|
|
|
} |
|
|
|
|
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket) |
|
|
|
|
// Verify upload id
|
|
|
|
|
if storedBucket.multiPartSession[key].uploadID != uploadID { |
|
|
|
|
donut.lock.Unlock() |
|
|
|
|
return ObjectMetadata{}, iodine.New(InvalidUploadID{UploadID: uploadID}, nil) |
|
|
|
|
} |
|
|
|
|
var size int64 |
|
|
|
|
var fullObject bytes.Buffer |
|
|
|
|
for i := 1; i <= len(parts); i++ { |
|
|
|
|
recvMD5 := parts[i] |
|
|
|
|
object, ok := donut.multiPartObjects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)) |
|
|
|
|
object, ok := donut.multiPartObjects.Get(i) |
|
|
|
|
if ok == false { |
|
|
|
|
donut.lock.Unlock() |
|
|
|
|
return ObjectMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) |
|
|
|
@ -231,13 +223,16 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map |
|
|
|
|
// complete multi part request header md5sum per part is hex encoded
|
|
|
|
|
recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\"")) |
|
|
|
|
if err != nil { |
|
|
|
|
donut.lock.Unlock() |
|
|
|
|
return ObjectMetadata{}, iodine.New(InvalidDigest{Md5: recvMD5}, nil) |
|
|
|
|
} |
|
|
|
|
if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) { |
|
|
|
|
donut.lock.Unlock() |
|
|
|
|
return ObjectMetadata{}, iodine.New(BadDigest{}, nil) |
|
|
|
|
} |
|
|
|
|
_, err = io.Copy(&fullObject, bytes.NewBuffer(object)) |
|
|
|
|
if err != nil { |
|
|
|
|
donut.lock.Unlock() |
|
|
|
|
return ObjectMetadata{}, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
object = nil |
|
|
|
@ -269,7 +264,7 @@ func (a byKey) Len() int { return len(a) } |
|
|
|
|
func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] } |
|
|
|
|
func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key } |
|
|
|
|
|
|
|
|
|
// ListMultipartUploads -
|
|
|
|
|
// ListMultipartUploads - list incomplete multipart sessions for a given bucket
|
|
|
|
|
func (donut API) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error) { |
|
|
|
|
// TODO handle delimiter
|
|
|
|
|
donut.lock.Lock() |
|
|
|
@ -331,7 +326,7 @@ func (a partNumber) Len() int { return len(a) } |
|
|
|
|
func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] } |
|
|
|
|
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber } |
|
|
|
|
|
|
|
|
|
// ListObjectParts -
|
|
|
|
|
// ListObjectParts - list parts from incomplete multipart session for a given object
|
|
|
|
|
func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, error) { |
|
|
|
|
// Verify upload id
|
|
|
|
|
donut.lock.Lock() |
|
|
|
@ -365,7 +360,7 @@ func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMe |
|
|
|
|
objectResourcesMetadata.NextPartNumberMarker = i |
|
|
|
|
return objectResourcesMetadata, nil |
|
|
|
|
} |
|
|
|
|
part, ok := storedBucket.partMetadata[bucket+"/"+getMultipartKey(key, resources.UploadID, i)] |
|
|
|
|
part, ok := storedBucket.partMetadata[i] |
|
|
|
|
if !ok { |
|
|
|
|
return ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) |
|
|
|
|
} |
|
|
|
@ -376,8 +371,9 @@ func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMe |
|
|
|
|
return objectResourcesMetadata, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// evictedPart - call back function called by caching module during individual cache evictions
|
|
|
|
|
func (donut API) evictedPart(a ...interface{}) { |
|
|
|
|
key := a[0].(string) |
|
|
|
|
key := a[0].(int) |
|
|
|
|
// loop through all buckets
|
|
|
|
|
buckets := donut.storedBuckets.GetAll() |
|
|
|
|
for bucketName, bucket := range buckets { |
|
|
|
|