From f050d5e9744e51ef0fdfb2249f6b215c4e0219bd Mon Sep 17 00:00:00 2001 From: "Frederick F. Kautz IV" Date: Thu, 7 May 2015 19:55:30 -0700 Subject: [PATCH] Adding multipart support --- main.go | 4 + pkg/api/api_definitions.go | 25 ++++- pkg/api/api_object_handlers.go | 158 ++++++++++++++++++++++++++- pkg/api/api_response.go | 8 ++ pkg/api/api_router.go | 7 ++ pkg/storage/drivers/donut/donut.go | 13 +++ pkg/storage/drivers/driver.go | 5 + pkg/storage/drivers/memory/memory.go | 71 +++++++++++- pkg/storage/drivers/mocks/Driver.go | 29 +++++ 9 files changed, 314 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index e6ed69e62..6f00dffb9 100644 --- a/main.go +++ b/main.go @@ -28,6 +28,7 @@ import ( "github.com/dustin/go-humanize" "github.com/minio-io/cli" + "github.com/minio-io/minio/pkg/featureflags" "github.com/minio-io/minio/pkg/iodine" "github.com/minio-io/minio/pkg/server" "github.com/minio-io/minio/pkg/server/httpserver" @@ -295,6 +296,9 @@ func getSystemData() map[string]string { var Version = mustHashBinarySelf() func main() { + // enable features + featureflags.Enable(featureflags.MultipartPutObject) + // set up iodine iodine.SetGlobalState("minio.version", Version) iodine.SetGlobalState("minio.starttime", time.Now().Format(time.RFC3339)) diff --git a/pkg/api/api_definitions.go b/pkg/api/api_definitions.go index b62299a32..59debe85d 100644 --- a/pkg/api/api_definitions.go +++ b/pkg/api/api_definitions.go @@ -95,6 +95,26 @@ type Owner struct { DisplayName string } +// InitiateMultipartUploadResult - Returns upload id to use +type InitiateMultipartUploadResult struct { + XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 InitiateMultipartUploadResult" json:"-"` + + Bucket string + Key string + UploadID string `xml:"UploadId"` +} + +// CompleteMultipartUpload - Construct object from uploaded parts +type CompleteMultipartUpload struct { + Part []Part +} + +// Part - Description of a multipart part +type Part struct { + PartNumber int + ETag string +} + // List of not implemented bucket queries var notimplementedBucketResourceNames = map[string]bool{ "policy": true, @@ -113,7 +133,6 @@ var notimplementedBucketResourceNames = map[string]bool{ // List of not implemented object queries var notimplementedObjectResourceNames = map[string]bool{ - "uploadId": true, - "torrent": true, - "uploads": true, + "torrent": true, + "uploads": true, } diff --git a/pkg/api/api_object_handlers.go b/pkg/api/api_object_handlers.go index cf7bc1db3..4508d407e 100644 --- a/pkg/api/api_object_handlers.go +++ b/pkg/api/api_object_handlers.go @@ -20,6 +20,7 @@ import ( "net/http" "strconv" + "encoding/xml" "github.com/gorilla/mux" "github.com/minio-io/minio/pkg/iodine" "github.com/minio-io/minio/pkg/storage/drivers" @@ -173,8 +174,8 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques return } // ignoring error here, TODO find a way to reply back if we can - sizeInt, _ := strconv.ParseInt(size, 10, 64) - calculatedMD5, err := server.driver.CreateObject(bucket, object, "", md5, sizeInt, req.Body) + sizeInt64, _ := strconv.ParseInt(size, 10, 64) + calculatedMD5, err := server.driver.CreateObject(bucket, object, "", md5, sizeInt64, req.Body) switch err := iodine.ToError(err).(type) { case nil: { @@ -210,3 +211,156 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques } } } + +func (server *minioAPI) newMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { + acceptsContentType := getContentType(req) + if acceptsContentType == 
unknownContentType {
+		writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path)
+		return
+	}
+
+	// handle ACL's here at bucket level
+	if !server.isValidOp(w, req, acceptsContentType) {
+		return
+	}
+
+	var object, bucket string
+	vars := mux.Vars(req)
+	bucket = vars["bucket"]
+	object = vars["object"]
+	var uploadID string
+	var err error
+	if uploadID, err = server.driver.NewMultipartUpload(bucket, object, ""); err != nil {
+		log.Println(iodine.New(err, nil))
+		writeErrorResponse(w, req, NotImplemented, acceptsContentType, req.URL.Path)
+		return
+	}
+	response := generateInitiateMultipartUploadResult(bucket, object, uploadID)
+	encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType)
+	// write headers
+	setCommonHeaders(w, getContentTypeString(acceptsContentType))
+	// set content-length to the size of the body
+	w.Header().Set("Content-Length", strconv.Itoa(len(encodedSuccessResponse)))
+	w.WriteHeader(http.StatusOK)
+	// write body
+	w.Write(encodedSuccessResponse)
+}
+
+func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Request) {
+	acceptsContentType := getContentType(req)
+	if acceptsContentType == unknownContentType {
+		writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path)
+		return
+	}
+
+	// handle ACL's here at bucket level
+	if !server.isValidOp(w, req, acceptsContentType) {
+		return
+	}
+
+	// get Content-MD5 sent by client and verify if valid
+	md5 := req.Header.Get("Content-MD5")
+	if !isValidMD5(md5) {
+		writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path)
+		return
+	}
+	/// if Content-Length missing, reject the request
+	size := req.Header.Get("Content-Length")
+	if size == "" {
+		writeErrorResponse(w, req, MissingContentLength, acceptsContentType, req.URL.Path)
+		return
+	}
+	/// maximum Upload size for objects in a single operation
+	if isMaxObjectSize(size) {
+		writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path)
+		return
+	}
+	/// minimum Upload size for objects in a single operation
+	if isMinObjectSize(size) {
+		writeErrorResponse(w, req, EntityTooSmall, acceptsContentType, req.URL.Path)
+		return
+	}
+	// ignoring error here, TODO find a way to reply back if we can
+	sizeInt64, _ := strconv.ParseInt(size, 10, 64)
+
+	var object, bucket string
+	vars := mux.Vars(req)
+	bucket = vars["bucket"]
+	object = vars["object"]
+	uploadID := vars["uploadId"]
+	partIDString := vars["partNumber"]
+	partID, err := strconv.Atoi(partIDString)
+	if err != nil {
+		// TODO find the right error code for this case
+		writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path)
+		return
+	}
+	calculatedMD5, err := server.driver.CreateObjectPart(bucket, object, uploadID, partID, "", md5, sizeInt64, req.Body)
+	switch err := iodine.ToError(err).(type) {
+	case nil:
+		{
+			w.Header().Set("ETag", calculatedMD5)
+			writeSuccessResponse(w, acceptsContentType)
+		}
+	case drivers.ObjectExists:
+		{
+			writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path)
+		}
+	case drivers.BadDigest:
+		{
+			writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path)
+		}
+	case drivers.EntityTooLarge:
+		{
+			writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path)
+		}
+	case drivers.InvalidDigest:
+		{
+			writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path)
+		}
+	case drivers.ImplementationError:
+		{
+			log.Error.Println(err)
+			writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
+		}
+	default:
+		{
+			log.Error.Println(err)
+			writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
+		}
+	}
+}
+
+func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, req *http.Request) {
+	acceptsContentType := getContentType(req)
+	if acceptsContentType == unknownContentType {
+		writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path)
+		return
+	}
+
+	decoder := xml.NewDecoder(req.Body)
+	parts := &CompleteMultipartUpload{}
+	err := decoder.Decode(parts)
+	if err != nil {
+		log.Error.Println(err)
+		writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
+		return
+	}
+
+	partMap := make(map[int]string)
+
+	vars := mux.Vars(req)
+	bucket := vars["bucket"]
+	object := vars["object"]
+	uploadID := vars["uploadId"]
+
+	for _, part := range parts.Part {
+		partMap[part.PartNumber] = part.ETag
+	}
+
+	err = server.driver.CompleteMultipartUpload(bucket, object, uploadID, partMap)
+	if err != nil {
+		log.Error.Println(err)
+		writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
+		return
+	}
+	writeSuccessResponse(w, acceptsContentType)
+}
+
+func (server *minioAPI) notImplementedHandler(w http.ResponseWriter, req *http.Request) {
+	acceptsContentType := getContentType(req)
+	writeErrorResponse(w, req, NotImplemented, acceptsContentType, req.URL.Path)
+}
diff --git a/pkg/api/api_response.go b/pkg/api/api_response.go
index 757dff447..09bfccd03 100644
--- a/pkg/api/api_response.go
+++ b/pkg/api/api_response.go
@@ -112,6 +112,14 @@ func generateListObjectsResponse(bucket string, objects []drivers.ObjectMetadata
 	return data
 }
 
+func generateInitiateMultipartUploadResult(bucket, key, uploadID string) InitiateMultipartUploadResult {
+	return InitiateMultipartUploadResult{
+		Bucket:   bucket,
+		Key:      key,
+		UploadID: uploadID,
+	}
+}
+
 // writeSuccessResponse - write success headers
 func writeSuccessResponse(w http.ResponseWriter, acceptsContentType contentType) {
 	setCommonHeaders(w, getContentTypeString(acceptsContentType))
diff --git a/pkg/api/api_router.go b/pkg/api/api_router.go
index 4f0e62965..613ffb313 100644
--- a/pkg/api/api_router.go
+++ b/pkg/api/api_router.go
@@ -24,6 +24,7 @@ import (
 	"github.com/minio-io/minio/pkg/api/config"
 	"github.com/minio-io/minio/pkg/api/logging"
 	"github.com/minio-io/minio/pkg/api/quota"
+	"github.com/minio-io/minio/pkg/featureflags"
 	"github.com/minio-io/minio/pkg/iodine"
 	"github.com/minio-io/minio/pkg/storage/drivers"
 )
@@ -46,6 +47,12 @@ func HTTPHandler(driver drivers.Driver) http.Handler {
 	mux.HandleFunc("/{bucket}", api.headBucketHandler).Methods("HEAD")
 	mux.HandleFunc("/{bucket}/{object:.*}", api.getObjectHandler).Methods("GET")
 	mux.HandleFunc("/{bucket}/{object:.*}", api.headObjectHandler).Methods("HEAD")
+	if featureflags.Get(featureflags.MultipartPutObject) {
+		log.Println("Enabling feature", featureflags.MultipartPutObject)
+		mux.HandleFunc("/{bucket}/{object:.*}", api.newMultipartUploadHandler).Queries("uploads", "").Methods("POST")
+		mux.HandleFunc("/{bucket}/{object:.*}", api.putObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}").Methods("PUT")
+		mux.HandleFunc("/{bucket}/{object:.*}", api.completeMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("POST")
+	}
 	mux.HandleFunc("/{bucket}/{object:.*}", api.putObjectHandler).Methods("PUT")
 
 	var conf = config.Config{}
diff --git a/pkg/storage/drivers/donut/donut.go b/pkg/storage/drivers/donut/donut.go
index 5da08c7ea..c6de90b59 100644
--- a/pkg/storage/drivers/donut/donut.go
+++ b/pkg/storage/drivers/donut/donut.go
@@ -29,6 +29,7 @@ import (
 
 	"io/ioutil"
 
+	"errors"
 	"github.com/minio-io/minio/pkg/iodine"
 	"github.com/minio-io/minio/pkg/storage/donut"
"github.com/minio-io/minio/pkg/storage/drivers" @@ -397,3 +398,15 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM } return calculatedMD5Sum, nil } + +func (d donutDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) { + return "", iodine.New(errors.New("Not Implemented"), nil) +} + +func (d donutDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { + return "", iodine.New(errors.New("Not Implemented"), nil) +} + +func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) error { + return iodine.New(errors.New("Not Implemented"), nil) +} diff --git a/pkg/storage/drivers/driver.go b/pkg/storage/drivers/driver.go index 8dd1a462b..e118c4c25 100644 --- a/pkg/storage/drivers/driver.go +++ b/pkg/storage/drivers/driver.go @@ -38,6 +38,11 @@ type Driver interface { GetObjectMetadata(bucket string, object string, prefix string) (ObjectMetadata, error) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error) CreateObject(bucket string, key string, contentType string, md5sum string, size int64, data io.Reader) (string, error) + + // Object Multipart Operations + NewMultipartUpload(bucket string, key string, contentType string) (string, error) + CreateObjectPart(bucket string, key string, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error) + CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) error } // BucketACL - bucket level access control diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index d084f18fe..e66179bc1 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -20,6 +20,7 @@ import ( "bufio" "bytes" "crypto/md5" + "crypto/sha512" "encoding/base64" "encoding/hex" "errors" @@ -32,6 +33,8 @@ import ( "sync" "time" + "math/rand" + "github.com/minio-io/minio/pkg/iodine" "github.com/minio-io/minio/pkg/storage/drivers" ) @@ -222,7 +225,7 @@ func getMD5AndData(reader io.Reader) ([]byte, []byte, error) { return hash.Sum(nil), data, nil } -// CreateObject - PUT object to memory buffer +// createObject - PUT object to memory buffer func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { memory.lock.RLock() if !drivers.IsValidBucket(bucket) { @@ -507,3 +510,69 @@ func (memory *memoryDriver) evictObject(a ...interface{}) { } debug.FreeOSMemory() } + +func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) { + // TODO verify object doesn't exist + id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String()) + uploadIDSum := sha512.Sum512(id) + uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:]) + md5sumBytes := md5.Sum([]byte(uploadID)) + md5sum := hex.EncodeToString(md5sumBytes[:]) + memory.CreateObject(bucket, key+"?uploadId="+uploadID, contentType, md5sum, int64(len(uploadID)), bytes.NewBufferString(uploadID)) + return uploadID, nil +} + +func getMultipartKey(key string, uploadID string, partNumber int) string { + return key + "?uploadId=" + uploadID + "&partNumber=" + strconv.Itoa(partNumber) +} + +func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, 
data io.Reader) (string, error) { + // TODO verify upload id exists + return memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data) +} + +func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) error { + // TODO verify upload id exists + memory.lock.Lock() + size := int64(0) + for i := 1; i <= len(parts); i++ { + if _, ok := parts[i]; ok { + if object, ok := memory.storedBuckets[bucket].objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]; ok == true { + size += object.Size + } + } else { + memory.lock.Unlock() + return iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) + } + } + + var fullObject bytes.Buffer + + for i := 1; i <= len(parts); i++ { + if _, ok := parts[i]; ok { + if object, ok := memory.objects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)); ok == true { + obj := object.([]byte) + io.Copy(&fullObject, bytes.NewBuffer(obj)) + } else { + log.Println("Cannot fetch: ", getMultipartKey(key, uploadID, i)) + } + } else { + memory.lock.Unlock() + return iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) + } + } + + for i := 1; i <= len(parts); i++ { + if _, ok := parts[i]; ok { + objectKey := bucket + "/" + getMultipartKey(key, uploadID, i) + memory.objects.Delete(objectKey) + } + } + + memory.lock.Unlock() + + md5sumSlice := md5.Sum(fullObject.Bytes()) + md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:]) + _, err := memory.CreateObject(bucket, key, "", md5sum, size, &fullObject) + return err +} diff --git a/pkg/storage/drivers/mocks/Driver.go b/pkg/storage/drivers/mocks/Driver.go index 455583215..d5cd0a02f 100644 --- a/pkg/storage/drivers/mocks/Driver.go +++ b/pkg/storage/drivers/mocks/Driver.go @@ -124,3 +124,32 @@ func (m *Driver) CreateObject(bucket string, key string, contentType string, md5 return r0, r1 } + +// NewMultipartUpload is a mock +func (m *Driver) NewMultipartUpload(bucket string, key string, contentType string) (string, error) { + ret := m.Called(bucket, key, contentType) + + r0 := ret.Get(0).(string) + r1 := ret.Error(1) + + return r0, r1 +} + +// CreateObjectPart is a mock +func (m *Driver) CreateObjectPart(bucket string, key string, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error) { + ret := m.Called(bucket, key, uploadID, partID, contentType, md5sum, size, data) + + r0 := ret.Get(0).(string) + r1 := ret.Error(1) + + return r0, r1 +} + +// CompleteMultipartUpload is a mock +func (m *Driver) CompleteMultipartUpload(bucket string, key string, uploadID string, parts map[int]string) error { + ret := m.Called(bucket, key, uploadID, parts) + + r0 := ret.Error(0) + + return r0 +}
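
The handlers added in this patch follow the standard S3 multipart flow: POST to ?uploads to obtain an upload ID, PUT each part with partNumber and uploadId query parameters plus a Content-MD5 header, then POST the CompleteMultipartUpload XML to ?uploadId=... . The sketch below is a minimal Go client exercising that flow against the new routes. It is illustrative only: the endpoint http://localhost:9000, the bucket and object names, and the 5 MiB part size are assumptions, the local structs mirror (rather than import) the shapes in pkg/api/api_definitions.go, and it relies on the server's default XML content negotiation.

package main

import (
	"bytes"
	"crypto/md5"
	"encoding/base64"
	"encoding/xml"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

// Local mirrors of the XML shapes added in pkg/api/api_definitions.go.
type initiateMultipartUploadResult struct {
	Bucket   string
	Key      string
	UploadID string `xml:"UploadId"`
}

type part struct {
	PartNumber int
	ETag       string
}

type completeMultipartUpload struct {
	XMLName xml.Name `xml:"CompleteMultipartUpload"`
	Part    []part
}

const endpoint = "http://localhost:9000" // assumed server address

func main() {
	bucket, object := "testbucket", "testobject" // hypothetical names
	client := &http.Client{}

	// 1. Initiate: POST /{bucket}/{object}?uploads returns an InitiateMultipartUploadResult.
	resp, err := client.Post(fmt.Sprintf("%s/%s/%s?uploads", endpoint, bucket, object), "", nil)
	if err != nil {
		log.Fatal(err)
	}
	body, _ := ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	var initiated initiateMultipartUploadResult
	if err := xml.Unmarshal(body, &initiated); err != nil {
		log.Fatal(err)
	}

	// 2. Upload parts: PUT /{bucket}/{object}?partNumber=N&uploadId=... with Content-MD5 set,
	//    collecting the ETag header returned for each part.
	complete := completeMultipartUpload{}
	for i, payload := range [][]byte{bytes.Repeat([]byte("a"), 5*1024*1024), bytes.Repeat([]byte("b"), 5*1024*1024)} {
		sum := md5.Sum(payload)
		url := fmt.Sprintf("%s/%s/%s?partNumber=%d&uploadId=%s", endpoint, bucket, object, i+1, initiated.UploadID)
		req, _ := http.NewRequest("PUT", url, bytes.NewReader(payload))
		req.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(sum[:]))
		partResp, err := client.Do(req)
		if err != nil {
			log.Fatal(err)
		}
		partResp.Body.Close()
		complete.Part = append(complete.Part, part{PartNumber: i + 1, ETag: partResp.Header.Get("ETag")})
	}

	// 3. Complete: POST the CompleteMultipartUpload XML to ?uploadId=... .
	payload, _ := xml.Marshal(complete)
	url := fmt.Sprintf("%s/%s/%s?uploadId=%s", endpoint, bucket, object, initiated.UploadID)
	req, _ := http.NewRequest("POST", url, bytes.NewReader(payload))
	completeResp, err := client.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	completeResp.Body.Close()
	fmt.Println("complete multipart upload returned", completeResp.Status)
}

Note that the memory driver's CompleteMultipartUpload walks part numbers 1..len(parts) and returns a "missing part" error for any gap, so the client must number parts consecutively from 1 and send every ETag it received.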