From f1119971840ecb25a8ef23a3b0f0d636ac9b6ad3 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 1 Mar 2016 20:01:40 -0800 Subject: [PATCH] multipart: remove proper MD5, rather create MD5 based on parts to be s3 compatible. This increases the performance phenominally. --- api-response.go | 5 +++ bucket-handlers.go | 2 +- object-handlers.go | 5 ++- pkg/fs/api_suite_nix_test.go | 5 +-- pkg/fs/api_suite_windows_test.go | 5 +-- pkg/fs/fs-multipart.go | 64 ++++++++++++++++++++++++-------- 6 files changed, 60 insertions(+), 26 deletions(-) diff --git a/api-response.go b/api-response.go index ce2421b11..463917885 100644 --- a/api-response.go +++ b/api-response.go @@ -218,6 +218,11 @@ type CompleteMultipartUploadResponse struct { ETag string } +// getLocation get URL location. +func getLocation(r *http.Request) string { + return r.URL.Path +} + // takes an array of Bucketmetadata information for serialization // input: // array of bucket metadata diff --git a/bucket-handlers.go b/bucket-handlers.go index 318c96bd7..15043fe01 100644 --- a/bucket-handlers.go +++ b/bucket-handlers.go @@ -291,7 +291,7 @@ func (api storageAPI) PutBucketHandler(w http.ResponseWriter, r *http.Request) { return } // Make sure to add Location information here only for bucket - w.Header().Set("Location", "/"+bucket) + w.Header().Set("Location", getLocation(r)) writeSuccessResponse(w, nil) } diff --git a/object-handlers.go b/object-handlers.go index 1bd225eb6..3e6958f21 100644 --- a/object-handlers.go +++ b/object-handlers.go @@ -798,7 +798,10 @@ func (api storageAPI) CompleteMultipartUploadHandler(w http.ResponseWriter, r *h } return } - response := generateCompleteMultpartUploadResponse(bucket, object, r.URL.String(), metadata.MD5) + // get object location. + location := getLocation(r) + // Generate complete multipart response. + response := generateCompleteMultpartUploadResponse(bucket, object, location, metadata.MD5) encodedSuccessResponse := encodeSuccessResponse(response) // write headers setCommonHeaders(w) diff --git a/pkg/fs/api_suite_nix_test.go b/pkg/fs/api_suite_nix_test.go index 0c6378734..a0aaff9d3 100644 --- a/pkg/fs/api_suite_nix_test.go +++ b/pkg/fs/api_suite_nix_test.go @@ -66,7 +66,6 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) { completedParts := CompleteMultipartUpload{} completedParts.Part = make([]CompletePart, 0) - finalHasher := md5.New() for i := 1; i <= 10; i++ { randomPerm := rand.Perm(10) randomString := "" @@ -75,7 +74,6 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) { } hasher := md5.New() - finalHasher.Write([]byte(randomString)) hasher.Write([]byte(randomString)) expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil)) @@ -87,12 +85,11 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) { c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex) completedParts.Part = append(completedParts.Part, CompletePart{PartNumber: i, ETag: calculatedmd5sum}) } - finalExpectedmd5SumHex := hex.EncodeToString(finalHasher.Sum(nil)) completedPartsBytes, e := xml.Marshal(completedParts) c.Assert(e, check.IsNil) objectMetadata, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, bytes.NewReader(completedPartsBytes), nil) c.Assert(err, check.IsNil) - c.Assert(objectMetadata.MD5, check.Equals, finalExpectedmd5SumHex) + c.Assert(objectMetadata.MD5, check.Equals, "9b7d6f13ba00e24d0b02de92e814891b-10") } func testMultipartObjectAbort(c *check.C, create func() Filesystem) { diff --git a/pkg/fs/api_suite_windows_test.go b/pkg/fs/api_suite_windows_test.go index ecc179374..4f43c5486 100644 --- a/pkg/fs/api_suite_windows_test.go +++ b/pkg/fs/api_suite_windows_test.go @@ -65,7 +65,6 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) { completedParts := CompleteMultipartUpload{} completedParts.Part = make([]CompletePart, 0) - finalHasher := md5.New() for i := 1; i <= 10; i++ { randomPerm := rand.Perm(10) randomString := "" @@ -74,7 +73,6 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) { } hasher := md5.New() - finalHasher.Write([]byte(randomString)) hasher.Write([]byte(randomString)) expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil)) @@ -86,12 +84,11 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) { c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex) completedParts.Part = append(completedParts.Part, CompletePart{PartNumber: i, ETag: calculatedmd5sum}) } - finalExpectedmd5SumHex := hex.EncodeToString(finalHasher.Sum(nil)) completedPartsBytes, e := xml.Marshal(completedParts) c.Assert(e, check.IsNil) objectMetadata, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, bytes.NewReader(completedPartsBytes), nil) c.Assert(err, check.IsNil) - c.Assert(objectMetadata.MD5, check.Equals, finalExpectedmd5SumHex) + c.Assert(objectMetadata.MD5, check.Equals, "9b7d6f13ba00e24d0b02de92e814891b-10") } func testMultipartObjectAbort(c *check.C, create func() Filesystem) { diff --git a/pkg/fs/fs-multipart.go b/pkg/fs/fs-multipart.go index 0468e7fe4..290eea22e 100644 --- a/pkg/fs/fs-multipart.go +++ b/pkg/fs/fs-multipart.go @@ -141,6 +141,22 @@ func doPartsMatch(parts []CompletePart, savedParts []PartMetadata) bool { return true } +// Create an s3 compatible MD5sum for complete multipart transaction. +func makeS3MD5(md5Strs ...string) (string, *probe.Error) { + var finalMD5Bytes []byte + for _, md5Str := range md5Strs { + md5Bytes, e := hex.DecodeString(md5Str) + if e != nil { + return "", probe.NewError(e) + } + finalMD5Bytes = append(finalMD5Bytes, md5Bytes...) + } + md5Hasher := md5.New() + md5Hasher.Write(finalMD5Bytes) + s3MD5 := fmt.Sprintf("%s-%d", hex.EncodeToString(md5Hasher.Sum(nil)), len(md5Strs)) + return s3MD5, nil +} + type multiCloser struct { Closers []io.Closer } @@ -177,7 +193,9 @@ func saveParts(partPathPrefix string, mw io.Writer, parts []CompletePart) *probe var partReaders []io.Reader var partClosers []io.Closer for _, part := range parts { + // Trim prefix md5Sum := strings.TrimPrefix(part.ETag, "\"") + // Trim suffix md5Sum = strings.TrimSuffix(md5Sum, "\"") partFile, e := os.OpenFile(partPathPrefix+md5Sum+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600) if e != nil { @@ -199,7 +217,8 @@ func saveParts(partPathPrefix string, mw io.Writer, parts []CompletePart) *probe defer closer.Close() reader := io.MultiReader(partReaders...) - readBuffer := make([]byte, 4*1024*1024) + readBufferSize := 8 * 1024 * 1024 // 8MiB + readBuffer := make([]byte, readBufferSize) // Allocate 8MiB buffer. if _, e := io.CopyBuffer(mw, reader, readBuffer); e != nil { return probe.NewError(e) } @@ -465,16 +484,14 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da } objectPath := filepath.Join(bucketPath, object) - file, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject") + objectWriter, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject") if e != nil { return ObjectMetadata{}, probe.NewError(e) } - md5Hasher := md5.New() - objectWriter := io.MultiWriter(file, md5Hasher) partBytes, e := ioutil.ReadAll(data) if e != nil { - file.CloseAndPurge() + objectWriter.CloseAndPurge() return ObjectMetadata{}, probe.NewError(e) } if signature != nil { @@ -482,21 +499,21 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da sh.Write(partBytes) ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sh.Sum(nil))) if err != nil { - file.CloseAndPurge() + objectWriter.CloseAndPurge() return ObjectMetadata{}, err.Trace() } if !ok { - file.CloseAndPurge() + objectWriter.CloseAndPurge() return ObjectMetadata{}, probe.NewError(SignDoesNotMatch{}) } } completeMultipartUpload := &CompleteMultipartUpload{} if e = xml.Unmarshal(partBytes, completeMultipartUpload); e != nil { - file.CloseAndPurge() + objectWriter.CloseAndPurge() return ObjectMetadata{}, probe.NewError(MalformedXML{}) } if !sort.IsSorted(completedParts(completeMultipartUpload.Part)) { - file.CloseAndPurge() + objectWriter.CloseAndPurge() return ObjectMetadata{}, probe.NewError(InvalidPartOrder{}) } @@ -509,28 +526,42 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da fs.rwLock.RUnlock() if !doPartsMatch(parts, savedParts) { - file.CloseAndPurge() + objectWriter.CloseAndPurge() return ObjectMetadata{}, probe.NewError(InvalidPart{}) } // Parts successfully validated, save all the parts. partPathPrefix := objectPath + uploadID if err := saveParts(partPathPrefix, objectWriter, parts); err != nil { - file.CloseAndPurge() + objectWriter.CloseAndPurge() return ObjectMetadata{}, err.Trace(partPathPrefix) } - // Successfully saved, remove all parts. - removeParts(partPathPrefix, savedParts) + var md5Strs []string + for _, part := range savedParts { + md5Strs = append(md5Strs, part.ETag) + } + // Save the s3 md5. + s3MD5, err := makeS3MD5(md5Strs...) + if err != nil { + objectWriter.CloseAndPurge() + return ObjectMetadata{}, err.Trace(md5Strs...) + } + + // Successfully saved multipart, remove all parts in a routine. + go removeParts(partPathPrefix, savedParts) // Critical region requiring write lock. fs.rwLock.Lock() delete(fs.multiparts.ActiveSession, uploadID) if err := saveMultipartsSession(*fs.multiparts); err != nil { fs.rwLock.Unlock() - file.CloseAndPurge() + objectWriter.CloseAndPurge() return ObjectMetadata{}, err.Trace(partPathPrefix) } - file.Close() + if e = objectWriter.Close(); e != nil { + fs.rwLock.Unlock() + return ObjectMetadata{}, probe.NewError(e) + } fs.rwLock.Unlock() // Send stat again to get object metadata. @@ -538,6 +569,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da if e != nil { return ObjectMetadata{}, probe.NewError(e) } + contentType := "application/octet-stream" if objectExt := filepath.Ext(objectPath); objectExt != "" { content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))] @@ -551,7 +583,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da LastModified: st.ModTime(), Size: st.Size(), ContentType: contentType, - MD5: hex.EncodeToString(md5Hasher.Sum(nil)), + MD5: s3MD5, } return newObject, nil }