diff --git a/fs-objects-multipart.go b/fs-objects-multipart.go index e073f4832..f358a0477 100644 --- a/fs-objects-multipart.go +++ b/fs-objects-multipart.go @@ -60,13 +60,19 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload return "", InvalidUploadID{UploadID: uploadID} } + // Calculate s3 compatible md5sum for complete multipart. + s3MD5, err := completeMultipartMD5(parts...) + if err != nil { + return "", err + } + tempObj := path.Join(tmpMetaPrefix, bucket, object, uploadID, incompleteFile) fileWriter, err := fs.storage.CreateFile(minioMetaBucket, tempObj) if err != nil { return "", toObjectErr(err, bucket, object) } - var md5Sums []string + // Loop through all parts, validate them and then commit to disk. for _, part := range parts { // Construct part suffix. partSuffix := fmt.Sprintf("%.5d.%s", part.PartNumber, part.ETag) @@ -96,7 +102,6 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload } return "", err } - md5Sums = append(md5Sums, part.ETag) } err = fileWriter.Close() @@ -117,12 +122,6 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload return "", toObjectErr(err, bucket, object) } - // Save the s3 md5. - s3MD5, err := completeMultipartMD5(md5Sums...) - if err != nil { - return "", err - } - // Cleanup all the parts if everything else has been safely committed. if err = cleanupUploadedParts(fs.storage, mpartMetaPrefix, bucket, object, uploadID); err != nil { return "", err diff --git a/object-utils.go b/object-utils.go index cced1f4b8..583fca34c 100644 --- a/object-utils.go +++ b/object-utils.go @@ -109,10 +109,10 @@ func pathJoin(s1 string, s2 string) string { } // Create an s3 compatible MD5sum for complete multipart transaction. -func completeMultipartMD5(md5Strs ...string) (string, error) { +func completeMultipartMD5(parts ...completePart) (string, error) { var finalMD5Bytes []byte - for _, md5Str := range md5Strs { - md5Bytes, err := hex.DecodeString(md5Str) + for _, part := range parts { + md5Bytes, err := hex.DecodeString(part.ETag) if err != nil { return "", err } @@ -120,7 +120,7 @@ func completeMultipartMD5(md5Strs ...string) (string, error) { } md5Hasher := md5.New() md5Hasher.Write(finalMD5Bytes) - s3MD5 := fmt.Sprintf("%s-%d", hex.EncodeToString(md5Hasher.Sum(nil)), len(md5Strs)) + s3MD5 := fmt.Sprintf("%s-%d", hex.EncodeToString(md5Hasher.Sum(nil)), len(parts)) return s3MD5, nil } diff --git a/xl-objects-multipart.go b/xl-objects-multipart.go index 7e2458268..30e81baa2 100644 --- a/xl-objects-multipart.go +++ b/xl-objects-multipart.go @@ -22,6 +22,7 @@ import ( "io" "path" "strings" + "sync" "time" ) @@ -109,13 +110,26 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload if !isUploadIDExists(xl.storage, bucket, object, uploadID) { return "", InvalidUploadID{UploadID: uploadID} } + + // Calculate s3 compatible md5sum for complete multipart. + s3MD5, err := completeMultipartMD5(parts...) + if err != nil { + return "", err + } + var metadata = MultipartObjectInfo{} - var md5Sums []string + var errs = make([]error, len(parts)) + + // Waitgroup to wait for go-routines. + var wg = &sync.WaitGroup{} + + // Loop through all parts, validate them and then commit to disk. for _, part := range parts { // Construct part suffix. partSuffix := fmt.Sprintf("%.5d.%s", part.PartNumber, part.ETag) multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) - fi, err := xl.storage.StatFile(minioMetaBucket, multipartPartFile) + var fi FileInfo + fi, err = xl.storage.StatFile(minioMetaBucket, multipartPartFile) if err != nil { if err == errFileNotFound { return "", InvalidPart{} @@ -129,22 +143,48 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload Size: fi.Size, }) metadata.Size += fi.Size + } + + // Loop through and atomically rename the parts to their actual location. + for index, part := range parts { + wg.Add(1) + go func(index int, part completePart) { + defer wg.Done() + partSuffix := fmt.Sprintf("%.5d.%s", part.PartNumber, part.ETag) + multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) + tmpMultipartObjSuffix := path.Join(tmpMetaPrefix, bucket, object, partNumToPartFileName(part.PartNumber)) + rErr := xl.storage.RenameFile(minioMetaBucket, multipartPartFile, minioMetaBucket, tmpMultipartObjSuffix) + if rErr != nil { + errs[index] = rErr + log.Errorf("Unable to rename file %s to %s, failed with %s", multipartPartFile, tmpMultipartObjSuffix, rErr) + return + } + multipartObjSuffix := path.Join(object, partNumToPartFileName(part.PartNumber)) + rErr = xl.storage.RenameFile(minioMetaBucket, tmpMultipartObjSuffix, bucket, multipartObjSuffix) + if rErr != nil { + if dErr := xl.storage.DeleteFile(minioMetaBucket, tmpMultipartObjSuffix); dErr != nil { + errs[index] = dErr + log.Errorf("Unable to delete file %s, failed with %s", tmpMultipartObjSuffix, dErr) + return + } + log.Debugf("Delete file succeeded on %s", tmpMultipartObjSuffix) + errs[index] = rErr + log.Errorf("Unable to rename file %s to %s, failed with %s", tmpMultipartObjSuffix, multipartObjSuffix, rErr) + } + }(index, part) + } - multipartObjSuffix := path.Join(object, partNumToPartFileName(part.PartNumber)) - err = xl.storage.RenameFile(minioMetaBucket, multipartPartFile, bucket, multipartObjSuffix) + // Wait for all the renames to finish. + wg.Wait() + + // Loop through errs list and return first error. + for _, err := range errs { if err != nil { - return "", err + return "", toObjectErr(err, bucket, object) } - - // Save md5sum for future response. - md5Sums = append(md5Sums, part.ETag) } - // Calculate and save s3 compatible md5sum. - s3MD5, err := completeMultipartMD5(md5Sums...) - if err != nil { - return "", err - } + // Save successfully calculated md5sum. metadata.MD5Sum = s3MD5 // Save modTime as well as the current time. metadata.ModTime = time.Now().UTC()