diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 0a635cc3f..427f12ef6 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -164,8 +164,8 @@ func (fs fsObjects) listMultipartUploadIDs(bucketName, objectName, uploadIDMarke return uploads, end, nil } -// listMultipartUploads - lists all multipart uploads. -func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { +// listMultipartUploadsCleanup - lists all multipart uploads. Called by fs.cleanupStaleMultipartUpload() +func (fs fsObjects) listMultipartUploadsCleanup(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { result := ListMultipartsInfo{} recursive := true if delimiter == slashSeparator { @@ -308,44 +308,46 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark return result, nil } -// ListMultipartUploads - returns empty response always. The onus is -// on applications to remember uploadId of the multipart uploads that -// are in progress. -func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { - if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, fs); err != nil { +// ListMultipartUploads - lists all the uploadIDs for the specified object. +// We do not support prefix based listing. +func (fs fsObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { + if err := checkListMultipartArgs(bucket, object, keyMarker, uploadIDMarker, delimiter, fs); err != nil { return lmi, err } if _, err := fs.statBucketDir(bucket); err != nil { return lmi, toObjectErr(err, bucket) } - return ListMultipartsInfo{ - Prefix: prefix, - KeyMarker: keyMarker, - UploadIDMarker: uploadIDMarker, - Delimiter: delimiter, - MaxUploads: maxUploads, - IsTruncated: false, - }, nil -} -// listMultipartUploadsHelper - lists all the pending multipart uploads on a -// bucket. Additionally takes 'prefix, keyMarker, uploadIDmarker and a -// delimiter' which allows us to list uploads match a particular -// prefix or lexically starting from 'keyMarker' or delimiting the -// output to get a directory like listing. -// -// This function remains here to aid in listing uploads that require cleanup. -func (fs fsObjects) listMultipartUploadsHelper(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { - if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, fs); err != nil { + result := ListMultipartsInfo{} + + result.IsTruncated = true + result.MaxUploads = maxUploads + result.KeyMarker = keyMarker + result.Prefix = object + result.Delimiter = delimiter + + uploads, _, err := fs.listMultipartUploadIDs(bucket, object, uploadIDMarker, maxUploads) + if err != nil { return lmi, err } - if _, err := fs.statBucketDir(bucket); err != nil { - return lmi, toObjectErr(err, bucket) + result.NextKeyMarker = object + // Loop through all the received uploads fill in the multiparts result. + for _, upload := range uploads { + uploadID := upload.UploadID + result.Uploads = append(result.Uploads, upload) + result.NextUploadIDMarker = uploadID } - return fs.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) + result.IsTruncated = len(uploads) == maxUploads + + if !result.IsTruncated { + result.NextKeyMarker = "" + result.NextUploadIDMarker = "" + } + + return result, nil } // newMultipartUpload - wrapper for initializing a new multipart @@ -596,9 +598,81 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, d }, nil } -// ListObjectParts - returns empty response always. Applications are -// expected to remember the uploaded part numbers and corresponding -// etags. +// listObjectParts - wrapper scanning through +// '.minio.sys/multipart/bucket/object/UPLOADID'. Lists all the parts +// saved inside '.minio.sys/multipart/bucket/object/UPLOADID'. +func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) { + result := ListPartsInfo{} + + uploadIDPath := pathJoin(bucket, object, uploadID) + fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile) + metaFile, err := fs.rwPool.Open(fsMetaPath) + if err != nil { + if err == errFileNotFound || err == errFileAccessDenied { + // On windows oddly this is returned. + return lpi, traceError(InvalidUploadID{UploadID: uploadID}) + } + return lpi, toObjectErr(traceError(err), bucket, object) + } + defer fs.rwPool.Close(fsMetaPath) + + fsMeta := fsMetaV1{} + _, err = fsMeta.ReadFrom(metaFile.LockedFile) + if err != nil { + return lpi, toObjectErr(err, minioMetaBucket, fsMetaPath) + } + + // Only parts with higher part numbers will be listed. + partIdx := fsMeta.ObjectPartIndex(partNumberMarker) + parts := fsMeta.Parts + if partIdx != -1 { + parts = fsMeta.Parts[partIdx+1:] + } + + count := maxParts + for _, part := range parts { + var fi os.FileInfo + partNamePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, part.Name) + fi, err = fsStatFile(partNamePath) + if err != nil { + return lpi, toObjectErr(err, minioMetaMultipartBucket, partNamePath) + } + result.Parts = append(result.Parts, PartInfo{ + PartNumber: part.Number, + ETag: part.ETag, + LastModified: fi.ModTime(), + Size: fi.Size(), + }) + count-- + if count == 0 { + break + } + } + + // If listed entries are more than maxParts, we set IsTruncated as true. + if len(parts) > len(result.Parts) { + result.IsTruncated = true + // Make sure to fill next part number marker if IsTruncated is + // true for subsequent listing. + nextPartNumberMarker := result.Parts[len(result.Parts)-1].PartNumber + result.NextPartNumberMarker = nextPartNumberMarker + } + result.Bucket = bucket + result.Object = object + result.UploadID = uploadID + result.MaxParts = maxParts + + // Success. + return result, nil +} + +// ListObjectParts - lists all previously uploaded parts for a given +// object and uploadID. Takes additional input of part-number-marker +// to indicate where the listing should begin from. +// +// Implements S3 compatible ListObjectParts API. The resulting +// ListPartsInfo structure is unmarshalled directly into XML and +// replied back to the client. func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) { if err := checkListPartsArgs(bucket, object, fs); err != nil { return lpi, err @@ -609,24 +683,20 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM return lpi, toObjectErr(err, bucket) } - // Check if uploadID exists. N B This check may race with a - // concurrent abortMultipartUpload on the same uploadID. It is - // OK to be eventually consistent w.r.t listing of objects, - // uploads and parts. - uploadIDPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object, uploadID) - _, err := fsStatDir(uploadIDPath) + // Hold the lock so that two parallel complete-multipart-uploads + // do not leave a stale uploads.json behind. + objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) + if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil { + return lpi, traceError(err) + } + defer objectMPartPathLock.RUnlock() + + listPartsInfo, err := fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts) if err != nil { - return lpi, traceError(InvalidUploadID{UploadID: uploadID}) + return lpi, toObjectErr(err, bucket, object) } - // Return empty list parts response - return ListPartsInfo{ - Bucket: bucket, - Object: object, - UploadID: uploadID, - PartNumberMarker: 0, - MaxParts: maxParts, - }, nil + return listPartsInfo, nil } // CompleteMultipartUpload - completes an ongoing multipart @@ -944,7 +1014,7 @@ func (fs fsObjects) cleanupStaleMultipartUpload(bucket string, expiry time.Durat for { // List multipart uploads in a bucket 1000 at a time prefix := "" - lmi, err = fs.listMultipartUploadsHelper(bucket, prefix, lmi.KeyMarker, lmi.UploadIDMarker, "", 1000) + lmi, err = fs.listMultipartUploadsCleanup(bucket, prefix, lmi.KeyMarker, lmi.UploadIDMarker, "", 1000) if err != nil { errorIf(err, "Unable to list uploads") return err