From c87f259820e493b3a1b7cbc0cca2f0f9b9dab613 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Sun, 29 May 2016 01:53:08 +0530 Subject: [PATCH] Remove parts that are missing in CompleteMultipartUpload (#1786) * Remove parts that are missing in CompleteMultipartUpload * Moved isUploadIDExists under proper namespace locks * Moved code that deletes part files to a function --- xl-v1-multipart-common.go | 17 +++++++++++++++++ xl-v1-multipart.go | 28 ++++++++++++++++++++++------ 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go index 8bc50e0d2..f09e187e7 100644 --- a/xl-v1-multipart-common.go +++ b/xl-v1-multipart-common.go @@ -511,3 +511,20 @@ func (xl xlObjects) isUploadIDExists(bucket, object, uploadID string) bool { uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) return xl.isObject(minioMetaBucket, uploadIDPath) } + +// Removes part given by partName belonging to a mulitpart upload from minioMetaBucket +func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string) { + curpartPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partName) + wg := sync.WaitGroup{} + for i, disk := range xl.storageDisks { + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + // Ignoring failure to remove parts that weren't present in CompleteMultipartUpload + // requests. xl.json is the authoritative source of truth on which parts constitute + // the object. The presence of parts that don't belong in the object doesn't affect correctness. + _ = disk.DeleteFile(minioMetaBucket, curpartPath) + }(i, disk) + } + wg.Wait() +} diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index e49c96bad..d194b9da9 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -233,12 +233,14 @@ func (xl xlObjects) listObjectPartsCommon(bucket, object, uploadID string, partN if !IsValidObjectName(object) { return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} } - if !xl.isUploadIDExists(bucket, object, uploadID) { - return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID} - } // Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload. nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + + if !xl.isUploadIDExists(bucket, object, uploadID) { + return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID} + } + result := ListPartsInfo{} uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) @@ -404,6 +406,19 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload return "", toObjectErr(err, bucket, object) } + // Remove parts that weren't present in CompleteMultipartUpload request + for _, curpart := range currentXLMeta.Parts { + if xlMeta.ObjectPartIndex(curpart.Number) == -1 { + // Delete the missing part files. e.g, + // Request 1: NewMultipart + // Request 2: PutObjectPart 1 + // Request 3: PutObjectPart 2 + // Request 4: CompleteMultipartUpload --part 2 + // N.B. 1st part is not present. This part should be removed from the storage. + xl.removeObjectPart(bucket, object, uploadID, curpart.Name) + } + } + if err = xl.renameObject(minioMetaBucket, uploadIDPath, bucket, object); err != nil { return "", toObjectErr(err, bucket, object) } @@ -450,14 +465,15 @@ func (xl xlObjects) abortMultipartUploadCommon(bucket, object, uploadID string) if !IsValidObjectName(object) { return ObjectNameInvalid{Bucket: bucket, Object: object} } - if !xl.isUploadIDExists(bucket, object, uploadID) { - return InvalidUploadID{UploadID: uploadID} - } // Hold lock so that there is no competing complete-multipart-upload or put-object-part. nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + if !xl.isUploadIDExists(bucket, object, uploadID) { + return InvalidUploadID{UploadID: uploadID} + } + // Cleanup all uploaded parts. if err := cleanupUploadedParts(bucket, object, uploadID, xl.storageDisks...); err != nil { return toObjectErr(err, bucket, object)