From 2ea10c798b3f57850622dc4e91b50358906a4134 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 8 May 2015 02:02:51 -0700 Subject: [PATCH 1/2] Implement proper errors for Multipart --- pkg/api/api_definitions.go | 7 ++++ pkg/api/api_object_handlers.go | 13 ++++++- pkg/api/errors.go | 8 +++- pkg/storage/drivers/errors.go | 11 ++++++ pkg/storage/drivers/memory/memory.go | 58 ++++++++++++++++++++++++---- 5 files changed, 88 insertions(+), 9 deletions(-) diff --git a/pkg/api/api_definitions.go b/pkg/api/api_definitions.go index f4ff4ba2a..8398eb1e6 100644 --- a/pkg/api/api_definitions.go +++ b/pkg/api/api_definitions.go @@ -104,6 +104,13 @@ type InitiateMultipartUploadResult struct { UploadID string `xml:"UploadId"` } +// completedParts is a sortable interface for Part slice +type completedParts []Part + +func (a completedParts) Len() int { return len(a) } +func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber } + // CompleteMultipartUpload container for completing multipart upload type CompleteMultipartUpload struct { Part []Part diff --git a/pkg/api/api_object_handlers.go b/pkg/api/api_object_handlers.go index 0d161a08c..85a7f0bfe 100644 --- a/pkg/api/api_object_handlers.go +++ b/pkg/api/api_object_handlers.go @@ -18,6 +18,7 @@ package api import ( "net/http" + "sort" "strconv" "encoding/xml" @@ -311,6 +312,10 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re writeSuccessResponse(w, acceptsContentType) } + case drivers.InvalidUploadID: + { + writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path) + } case drivers.ObjectExists: { writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path) @@ -355,6 +360,10 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) return } + if !sort.IsSorted(completedParts(parts.Part)) { + writeErrorResponse(w, req, InvalidPartOrder, acceptsContentType, req.URL.Path) + return + } partMap := make(map[int]string) @@ -366,6 +375,7 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re for _, part := range parts.Part { partMap[part.PartNumber] = part.ETag } + etag, err := server.driver.CompleteMultipartUpload(bucket, object, uploadID, partMap) switch err := iodine.ToError(err).(type) { case nil: @@ -378,8 +388,9 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re w.WriteHeader(http.StatusOK) // write body w.Write(encodedSuccessResponse) + case drivers.InvalidUploadID: + writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path) default: - // TODO handle all other errors, properly log.Println(err) writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) } diff --git a/pkg/api/errors.go b/pkg/api/errors.go index 6a9b2e924..29a179451 100644 --- a/pkg/api/errors.go +++ b/pkg/api/errors.go @@ -64,11 +64,12 @@ const ( TooManyBuckets MethodNotAllowed InvalidPart + InvalidPartOrder ) // Error codes, non exhaustive list - standard HTTP errors const ( - NotAcceptable = iota + 24 + NotAcceptable = iota + 25 ) // Error code to Error structure map @@ -193,6 +194,11 @@ var errorCodeResponse = map[int]Error{ Description: "One or more of the specified parts could not be found", HTTPStatusCode: http.StatusBadRequest, }, + InvalidPartOrder: { + Code: "InvalidPartOrder", + Description: "The list of parts was not in ascending order. The parts list must be specified in order by part number.", + HTTPStatusCode: http.StatusBadRequest, + }, } // errorCodeError provides errorCode to Error. It returns empty if the code provided is unknown diff --git a/pkg/storage/drivers/errors.go b/pkg/storage/drivers/errors.go index b3dfa9925..935596d13 100644 --- a/pkg/storage/drivers/errors.go +++ b/pkg/storage/drivers/errors.go @@ -206,3 +206,14 @@ type InvalidRange struct { func (e InvalidRange) Error() string { return fmt.Sprintf("Invalid range start:%d length:%d", e.Start, e.Length) } + +/// Multipart related errors + +// InvalidUploadID invalid upload id +type InvalidUploadID struct { + UploadID string +} + +func (e InvalidUploadID) Error() string { + return "Invalid upload id " + e.UploadID +} diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index ce0a68850..4743cb933 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -515,14 +515,38 @@ func (memory *memoryDriver) evictObject(a ...interface{}) { } func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) { - // TODO verify object doesn't exist + memory.lock.RLock() + if !drivers.IsValidBucket(bucket) { + memory.lock.RUnlock() + return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) + } + if !drivers.IsValidObjectName(key) { + memory.lock.RUnlock() + return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) + } + if _, ok := memory.storedBuckets[bucket]; ok == false { + memory.lock.RUnlock() + return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + storedBucket := memory.storedBuckets[bucket] + objectKey := bucket + "/" + key + if _, ok := storedBucket.objectMetadata[objectKey]; ok == true { + memory.lock.RUnlock() + return "", iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil) + } + memory.lock.RUnlock() + id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String()) uploadIDSum := sha512.Sum512(id) uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:]) md5sumBytes := md5.Sum([]byte(uploadID)) md5sum := hex.EncodeToString(md5sumBytes[:]) - memory.CreateObject(bucket, key+"?uploadId="+uploadID, contentType, md5sum, int64(len(uploadID)), bytes.NewBufferString(uploadID)) - return uploadID, nil + + // Create UploadID session, this is a temporary work around to instantiate a session. + // It would not be valid in future, since we need to work out proper sessions so that + // we can cleanly abort session and propagate failures. + _, err := memory.CreateObject(bucket, key+"?uploadId="+uploadID, contentType, md5sum, int64(len(uploadID)), bytes.NewBufferString(uploadID)) + return uploadID, iodine.New(err, nil) } func getMultipartKey(key string, uploadID string, partNumber int) string { @@ -530,14 +554,35 @@ func getMultipartKey(key string, uploadID string, partNumber int) string { } func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { - // TODO verify upload id exists + // Verify upload id + _, ok := memory.objects.Get(key + "?uploadId=" + uploadID) + if !ok { + return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) + } return memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data) } func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { - // TODO verify upload id exists + // Verify upload id + _, ok := memory.objects.Get(key + "?uploadId=" + uploadID) + if !ok { + return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) + } + if !drivers.IsValidBucket(bucket) { + return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) + } + if !drivers.IsValidObjectName(key) { + return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) + } + memory.lock.RLock() + if _, ok := memory.storedBuckets[bucket]; ok == false { + memory.lock.RUnlock() + return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + memory.lock.RUnlock() + memory.lock.Lock() - size := int64(0) + var size int64 for i := 1; i <= len(parts); i++ { if _, ok := parts[i]; ok { if object, ok := memory.storedBuckets[bucket].objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]; ok == true { @@ -550,7 +595,6 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string } var fullObject bytes.Buffer - for i := 1; i <= len(parts); i++ { if _, ok := parts[i]; ok { if object, ok := memory.objects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)); ok == true { From 616241a2c1bc2f53603531b9a6f405b5f238727d Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 8 May 2015 02:23:34 -0700 Subject: [PATCH 2/2] Re-enable bucket deletion, this time with uploadID sitting there --- pkg/storage/drivers/memory/memory.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index 4743cb933..1e2fd7501 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -501,14 +501,11 @@ func (memory *memoryDriver) evictObject(a ...interface{}) { cacheStats.Bytes, cacheStats.Items, cacheStats.Evictions) key := a[0].(string) // loop through all buckets - for _, storedBucket := range memory.storedBuckets { + for bucket, storedBucket := range memory.storedBuckets { delete(storedBucket.objectMetadata, key) // remove bucket if no objects found anymore if len(storedBucket.objectMetadata) == 0 { - // TODO (y4m4) - // for now refrain from deleting buckets, due to multipart deletes before fullobject being written - // this case gets trigerred and we can't store the actual data at all receiving 404 on the client - // delete(memory.storedBuckets, bucket) + delete(memory.storedBuckets, bucket) } } debug.FreeOSMemory() @@ -540,7 +537,9 @@ func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string) uploadIDSum := sha512.Sum512(id) uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:]) md5sumBytes := md5.Sum([]byte(uploadID)) - md5sum := hex.EncodeToString(md5sumBytes[:]) + // CreateObject expects in base64 which is coming over http request header + // while all response headers with ETag are hex encoding + md5sum := base64.StdEncoding.EncodeToString(md5sumBytes[:]) // Create UploadID session, this is a temporary work around to instantiate a session. // It would not be valid in future, since we need to work out proper sessions so that @@ -555,7 +554,7 @@ func getMultipartKey(key string, uploadID string, partNumber int) string { func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { // Verify upload id - _, ok := memory.objects.Get(key + "?uploadId=" + uploadID) + _, ok := memory.objects.Get(bucket + "/" + key + "?uploadId=" + uploadID) if !ok { return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) } @@ -564,7 +563,7 @@ func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partI func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { // Verify upload id - _, ok := memory.objects.Get(key + "?uploadId=" + uploadID) + _, ok := memory.objects.Get(bucket + "/" + key + "?uploadId=" + uploadID) if !ok { return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) }