xl: CompleteMultipartUpload should rename files in a routine. (#1527)

This solves the client timeout while renaming 9000+ parts.

Fixes #1526
master
Harshavardhana 9 years ago committed by Anand Babu (AB) Periasamy
parent 56b7df90e1
commit 3f51dd4fd4
  1. 15
      fs-objects-multipart.go
  2. 8
      object-utils.go
  3. 66
      xl-objects-multipart.go

@ -60,13 +60,19 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
return "", InvalidUploadID{UploadID: uploadID} return "", InvalidUploadID{UploadID: uploadID}
} }
// Calculate s3 compatible md5sum for complete multipart.
s3MD5, err := completeMultipartMD5(parts...)
if err != nil {
return "", err
}
tempObj := path.Join(tmpMetaPrefix, bucket, object, uploadID, incompleteFile) tempObj := path.Join(tmpMetaPrefix, bucket, object, uploadID, incompleteFile)
fileWriter, err := fs.storage.CreateFile(minioMetaBucket, tempObj) fileWriter, err := fs.storage.CreateFile(minioMetaBucket, tempObj)
if err != nil { if err != nil {
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
var md5Sums []string // Loop through all parts, validate them and then commit to disk.
for _, part := range parts { for _, part := range parts {
// Construct part suffix. // Construct part suffix.
partSuffix := fmt.Sprintf("%.5d.%s", part.PartNumber, part.ETag) partSuffix := fmt.Sprintf("%.5d.%s", part.PartNumber, part.ETag)
@ -96,7 +102,6 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
} }
return "", err return "", err
} }
md5Sums = append(md5Sums, part.ETag)
} }
err = fileWriter.Close() err = fileWriter.Close()
@ -117,12 +122,6 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
// Save the s3 md5.
s3MD5, err := completeMultipartMD5(md5Sums...)
if err != nil {
return "", err
}
// Cleanup all the parts if everything else has been safely committed. // Cleanup all the parts if everything else has been safely committed.
if err = cleanupUploadedParts(fs.storage, mpartMetaPrefix, bucket, object, uploadID); err != nil { if err = cleanupUploadedParts(fs.storage, mpartMetaPrefix, bucket, object, uploadID); err != nil {
return "", err return "", err

@ -109,10 +109,10 @@ func pathJoin(s1 string, s2 string) string {
} }
// Create an s3 compatible MD5sum for complete multipart transaction. // Create an s3 compatible MD5sum for complete multipart transaction.
func completeMultipartMD5(md5Strs ...string) (string, error) { func completeMultipartMD5(parts ...completePart) (string, error) {
var finalMD5Bytes []byte var finalMD5Bytes []byte
for _, md5Str := range md5Strs { for _, part := range parts {
md5Bytes, err := hex.DecodeString(md5Str) md5Bytes, err := hex.DecodeString(part.ETag)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -120,7 +120,7 @@ func completeMultipartMD5(md5Strs ...string) (string, error) {
} }
md5Hasher := md5.New() md5Hasher := md5.New()
md5Hasher.Write(finalMD5Bytes) md5Hasher.Write(finalMD5Bytes)
s3MD5 := fmt.Sprintf("%s-%d", hex.EncodeToString(md5Hasher.Sum(nil)), len(md5Strs)) s3MD5 := fmt.Sprintf("%s-%d", hex.EncodeToString(md5Hasher.Sum(nil)), len(parts))
return s3MD5, nil return s3MD5, nil
} }

@ -22,6 +22,7 @@ import (
"io" "io"
"path" "path"
"strings" "strings"
"sync"
"time" "time"
) )
@ -109,13 +110,26 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
if !isUploadIDExists(xl.storage, bucket, object, uploadID) { if !isUploadIDExists(xl.storage, bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID} return "", InvalidUploadID{UploadID: uploadID}
} }
// Calculate s3 compatible md5sum for complete multipart.
s3MD5, err := completeMultipartMD5(parts...)
if err != nil {
return "", err
}
var metadata = MultipartObjectInfo{} var metadata = MultipartObjectInfo{}
var md5Sums []string var errs = make([]error, len(parts))
// Waitgroup to wait for go-routines.
var wg = &sync.WaitGroup{}
// Loop through all parts, validate them and then commit to disk.
for _, part := range parts { for _, part := range parts {
// Construct part suffix. // Construct part suffix.
partSuffix := fmt.Sprintf("%.5d.%s", part.PartNumber, part.ETag) partSuffix := fmt.Sprintf("%.5d.%s", part.PartNumber, part.ETag)
multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
fi, err := xl.storage.StatFile(minioMetaBucket, multipartPartFile) var fi FileInfo
fi, err = xl.storage.StatFile(minioMetaBucket, multipartPartFile)
if err != nil { if err != nil {
if err == errFileNotFound { if err == errFileNotFound {
return "", InvalidPart{} return "", InvalidPart{}
@ -129,22 +143,48 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
Size: fi.Size, Size: fi.Size,
}) })
metadata.Size += fi.Size metadata.Size += fi.Size
}
// Loop through and atomically rename the parts to their actual location.
for index, part := range parts {
wg.Add(1)
go func(index int, part completePart) {
defer wg.Done()
partSuffix := fmt.Sprintf("%.5d.%s", part.PartNumber, part.ETag)
multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
tmpMultipartObjSuffix := path.Join(tmpMetaPrefix, bucket, object, partNumToPartFileName(part.PartNumber))
rErr := xl.storage.RenameFile(minioMetaBucket, multipartPartFile, minioMetaBucket, tmpMultipartObjSuffix)
if rErr != nil {
errs[index] = rErr
log.Errorf("Unable to rename file %s to %s, failed with %s", multipartPartFile, tmpMultipartObjSuffix, rErr)
return
}
multipartObjSuffix := path.Join(object, partNumToPartFileName(part.PartNumber))
rErr = xl.storage.RenameFile(minioMetaBucket, tmpMultipartObjSuffix, bucket, multipartObjSuffix)
if rErr != nil {
if dErr := xl.storage.DeleteFile(minioMetaBucket, tmpMultipartObjSuffix); dErr != nil {
errs[index] = dErr
log.Errorf("Unable to delete file %s, failed with %s", tmpMultipartObjSuffix, dErr)
return
}
log.Debugf("Delete file succeeded on %s", tmpMultipartObjSuffix)
errs[index] = rErr
log.Errorf("Unable to rename file %s to %s, failed with %s", tmpMultipartObjSuffix, multipartObjSuffix, rErr)
}
}(index, part)
}
multipartObjSuffix := path.Join(object, partNumToPartFileName(part.PartNumber)) // Wait for all the renames to finish.
err = xl.storage.RenameFile(minioMetaBucket, multipartPartFile, bucket, multipartObjSuffix) wg.Wait()
// Loop through errs list and return first error.
for _, err := range errs {
if err != nil { if err != nil {
return "", err return "", toObjectErr(err, bucket, object)
} }
// Save md5sum for future response.
md5Sums = append(md5Sums, part.ETag)
} }
// Calculate and save s3 compatible md5sum. // Save successfully calculated md5sum.
s3MD5, err := completeMultipartMD5(md5Sums...)
if err != nil {
return "", err
}
metadata.MD5Sum = s3MD5 metadata.MD5Sum = s3MD5
// Save modTime as well as the current time. // Save modTime as well as the current time.
metadata.ModTime = time.Now().UTC() metadata.ModTime = time.Now().UTC()

Loading…
Cancel
Save