From 14b5828505dfec5f93bc5947f80eeff489c6a3e3 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sat, 9 May 2015 19:39:00 -0700 Subject: [PATCH] Handle partNumberMarker with listObjectParts now and other fixes --- pkg/api/api_object_handlers.go | 83 +++++++++++++++------------- pkg/storage/drivers/donut/donut.go | 2 +- pkg/storage/drivers/driver.go | 2 +- pkg/storage/drivers/memory/memory.go | 23 +++++--- pkg/storage/drivers/mocks/Driver.go | 4 +- 5 files changed, 62 insertions(+), 52 deletions(-) diff --git a/pkg/api/api_object_handlers.go b/pkg/api/api_object_handlers.go index 494bb6767..6a7defbcc 100644 --- a/pkg/api/api_object_handlers.go +++ b/pkg/api/api_object_handlers.go @@ -27,7 +27,10 @@ import ( "github.com/minio-io/minio/pkg/iodine" "github.com/minio-io/minio/pkg/storage/drivers" "github.com/minio-io/minio/pkg/utils/log" - "strings" +) + +const ( + maxPartsList = 1000 ) // GET Object @@ -235,22 +238,24 @@ func (server *minioAPI) newMultipartUploadHandler(w http.ResponseWriter, req *ht vars := mux.Vars(req) bucket = vars["bucket"] object = vars["object"] - var uploadID string - var err error - if uploadID, err = server.driver.NewMultipartUpload(bucket, object, ""); err != nil { + uploadID, err := server.driver.NewMultipartUpload(bucket, object, "") + switch err := iodine.ToError(err).(type) { + case nil: + response := generateInitiateMultipartUploadResult(bucket, object, uploadID) + encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType) + // write headers + setCommonHeaders(w, getContentTypeString(acceptsContentType)) + // set content-length to the size of the body + w.Header().Set("Content-Length", strconv.Itoa(len(encodedSuccessResponse))) + w.WriteHeader(http.StatusOK) + // write body + w.Write(encodedSuccessResponse) + case drivers.ObjectExists: + writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path) + default: log.Println(iodine.New(err, nil)) - writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path) - return + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) } - response := generateInitiateMultipartUploadResult(bucket, object, uploadID) - encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType) - // write headers - setCommonHeaders(w, getContentTypeString(acceptsContentType)) - // set content-length to the size of the body - w.Header().Set("Content-Length", strconv.Itoa(len(encodedSuccessResponse))) - w.WriteHeader(http.StatusOK) - // write body - w.Write(encodedSuccessResponse) } func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Request) { @@ -295,16 +300,21 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re writeErrorResponse(w, req, InvalidRequest, acceptsContentType, req.URL.Path) return } - var object, bucket string + vars := mux.Vars(req) - bucket = vars["bucket"] - object = vars["object"] - uploadID := vars["uploadId"] - // workaround for mux not splitting on & properly - if len(uploadID) > 1 { - uploadID = strings.Split(uploadID, "&")[0] + bucket := vars["bucket"] + object := vars["object"] + + var uploadID, partIDString string + for key, value := range req.URL.Query() { + switch true { + case key == "uploadId": + uploadID = value[0] + case key == "partNumber": + partIDString = value[0] + } } - partIDString := vars["partNumber"] + partID, err := strconv.Atoi(partIDString) if err != nil { writeErrorResponse(w, req, InvalidPart, acceptsContentType, req.URL.Path) @@ -337,11 +347,6 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re { writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path) } - case drivers.ImplementationError: - { - log.Error.Println(err) - writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) - } default: { log.Error.Println(err) @@ -359,9 +364,10 @@ func (server *minioAPI) abortMultipartUploadHandler(w http.ResponseWriter, req * vars := mux.Vars(req) bucket := vars["bucket"] object := vars["object"] - uploadID := vars["uploadId"] - err := server.driver.AbortMultipartUpload(bucket, object, uploadID) + objectResourcesMetadata := getObjectResources(req.URL.Query()) + + err := server.driver.AbortMultipartUpload(bucket, object, objectResourcesMetadata.UploadID) switch err := iodine.ToError(err).(type) { case nil: setCommonHeaders(w, getContentTypeString(acceptsContentType)) @@ -383,9 +389,12 @@ func (server *minioAPI) listObjectPartsHandler(w http.ResponseWriter, req *http. vars := mux.Vars(req) bucket := vars["bucket"] object := vars["object"] - uploadID := vars["uploadId"] + objectResourcesMetadata := getObjectResources(req.URL.Query()) + if objectResourcesMetadata.MaxParts == 0 { + objectResourcesMetadata.MaxParts = maxPartsList + } - objectResourcesMetadata, err := server.driver.ListObjectParts(bucket, object, uploadID) + objectResourcesMetadata, err := server.driver.ListObjectParts(bucket, object, objectResourcesMetadata) switch err := iodine.ToError(err).(type) { case nil: response := generateListPartsResult(objectResourcesMetadata) @@ -428,13 +437,14 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re vars := mux.Vars(req) bucket := vars["bucket"] object := vars["object"] - uploadID := vars["uploadId"] + objectResourcesMetadata := getObjectResources(req.URL.Query()) partMap := make(map[int]string) for _, part := range parts.Part { partMap[part.PartNumber] = part.ETag } - etag, err := server.driver.CompleteMultipartUpload(bucket, object, uploadID, partMap) + + etag, err := server.driver.CompleteMultipartUpload(bucket, object, objectResourcesMetadata.UploadID, partMap) switch err := iodine.ToError(err).(type) { case nil: response := generateCompleteMultpartUploadResult(bucket, object, "", etag) @@ -453,8 +463,3 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) } } - -func (server *minioAPI) notImplementedHandler(w http.ResponseWriter, req *http.Request) { - acceptsContentType := getContentType(req) - writeErrorResponse(w, req, NotImplemented, acceptsContentType, req.URL.Path) -} diff --git a/pkg/storage/drivers/donut/donut.go b/pkg/storage/drivers/donut/donut.go index 46796b6f4..2a2b5cb33 100644 --- a/pkg/storage/drivers/donut/donut.go +++ b/pkg/storage/drivers/donut/donut.go @@ -412,7 +412,7 @@ func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts return "", iodine.New(errors.New("Not Implemented"), nil) } -func (d donutDriver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectResourcesMetadata, error) { +func (d donutDriver) ListObjectParts(bucket, key string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) { return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("Not Implemented"), nil) } diff --git a/pkg/storage/drivers/driver.go b/pkg/storage/drivers/driver.go index 8f8cdad34..b5a29e6a4 100644 --- a/pkg/storage/drivers/driver.go +++ b/pkg/storage/drivers/driver.go @@ -44,7 +44,7 @@ type Driver interface { AbortMultipartUpload(bucket, key, UploadID string) error CreateObjectPart(bucket, key, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) - ListObjectParts(bucket, key, uploadID string) (ObjectResourcesMetadata, error) + ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, error) } // BucketACL - bucket level access control diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index 4f0ae0ed2..c082974ad 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -685,7 +685,7 @@ func (a partNumber) Len() int { return len(a) } func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber } -func (memory *memoryDriver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectResourcesMetadata, error) { +func (memory *memoryDriver) ListObjectParts(bucket, key string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) { // Verify upload id memory.lock.RLock() defer memory.lock.RUnlock() @@ -693,24 +693,29 @@ func (memory *memoryDriver) ListObjectParts(bucket, key, uploadID string) (drive return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) } storedBucket := memory.storedBuckets[bucket] - if storedBucket.multiPartSession[key].uploadID != uploadID { - return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) + if storedBucket.multiPartSession[key].uploadID != resources.UploadID { + return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: resources.UploadID}, nil) } - // TODO support PartNumberMarker and NextPartNumberMarker - objectResourcesMetadata := drivers.ObjectResourcesMetadata{} - objectResourcesMetadata.UploadID = uploadID + objectResourcesMetadata := resources objectResourcesMetadata.Bucket = bucket objectResourcesMetadata.Key = key - objectResourcesMetadata.MaxParts = 1000 var parts []*drivers.PartMetadata - for i := 1; i <= storedBucket.multiPartSession[key].totalParts; i++ { + var startPartNumber int + switch { + case objectResourcesMetadata.PartNumberMarker == 0: + startPartNumber = 1 + default: + startPartNumber = objectResourcesMetadata.PartNumberMarker + } + for i := startPartNumber; i <= storedBucket.multiPartSession[key].totalParts; i++ { if len(parts) > objectResourcesMetadata.MaxParts { sort.Sort(partNumber(parts)) objectResourcesMetadata.IsTruncated = true objectResourcesMetadata.Part = parts + objectResourcesMetadata.NextPartNumberMarker = i return objectResourcesMetadata, nil } - object, ok := storedBucket.objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)] + object, ok := storedBucket.objectMetadata[resources.Bucket+"/"+getMultipartKey(resources.Key, resources.UploadID, i)] if !ok { return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) } diff --git a/pkg/storage/drivers/mocks/Driver.go b/pkg/storage/drivers/mocks/Driver.go index fd515c760..861d90cfb 100644 --- a/pkg/storage/drivers/mocks/Driver.go +++ b/pkg/storage/drivers/mocks/Driver.go @@ -158,8 +158,8 @@ func (m *Driver) CompleteMultipartUpload(bucket, key, uploadID string, parts map } // ListObjectParts is a mock -func (m *Driver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectResourcesMetadata, error) { - ret := m.Called(bucket, key, uploadID) +func (m *Driver) ListObjectParts(bucket, key string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) { + ret := m.Called(bucket, key, resources) r0 := ret.Get(0).(drivers.ObjectResourcesMetadata) r1 := ret.Error(1)