From d1712a46a7c48219f72b996728a9bad3558e0d10 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Thu, 5 Oct 2017 16:08:25 -0700 Subject: [PATCH] Enable ListMultipartUploads and ListObjectParts for FS (#4996) * Enable ListMultipartUploads and ListObjectParts for FS. Previously we had disabled ListMultipartUploads and ListObjectParts to see if any clients break. Docker registry broke. This patch enables ListMultipartUploads and ListObjectParts, however ListMultipartUploads with prefix based listing is not supported (which is not used by docker registry anyway). i.e ListMultipartUploads will need exact object name. --- cmd/fs-v1-multipart.go | 164 +++++++++++++++++++++++++++++------------ 1 file changed, 117 insertions(+), 47 deletions(-) diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 0a635cc3f..427f12ef6 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -164,8 +164,8 @@ func (fs fsObjects) listMultipartUploadIDs(bucketName, objectName, uploadIDMarke return uploads, end, nil } -// listMultipartUploads - lists all multipart uploads. -func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { +// listMultipartUploadsCleanup - lists all multipart uploads. Called by fs.cleanupStaleMultipartUpload() +func (fs fsObjects) listMultipartUploadsCleanup(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { result := ListMultipartsInfo{} recursive := true if delimiter == slashSeparator { @@ -308,44 +308,46 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark return result, nil } -// ListMultipartUploads - returns empty response always. The onus is -// on applications to remember uploadId of the multipart uploads that -// are in progress. -func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { - if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, fs); err != nil { +// ListMultipartUploads - lists all the uploadIDs for the specified object. +// We do not support prefix based listing. +func (fs fsObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { + if err := checkListMultipartArgs(bucket, object, keyMarker, uploadIDMarker, delimiter, fs); err != nil { return lmi, err } if _, err := fs.statBucketDir(bucket); err != nil { return lmi, toObjectErr(err, bucket) } - return ListMultipartsInfo{ - Prefix: prefix, - KeyMarker: keyMarker, - UploadIDMarker: uploadIDMarker, - Delimiter: delimiter, - MaxUploads: maxUploads, - IsTruncated: false, - }, nil -} -// listMultipartUploadsHelper - lists all the pending multipart uploads on a -// bucket. Additionally takes 'prefix, keyMarker, uploadIDmarker and a -// delimiter' which allows us to list uploads match a particular -// prefix or lexically starting from 'keyMarker' or delimiting the -// output to get a directory like listing. -// -// This function remains here to aid in listing uploads that require cleanup. -func (fs fsObjects) listMultipartUploadsHelper(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { - if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, fs); err != nil { + result := ListMultipartsInfo{} + + result.IsTruncated = true + result.MaxUploads = maxUploads + result.KeyMarker = keyMarker + result.Prefix = object + result.Delimiter = delimiter + + uploads, _, err := fs.listMultipartUploadIDs(bucket, object, uploadIDMarker, maxUploads) + if err != nil { return lmi, err } - if _, err := fs.statBucketDir(bucket); err != nil { - return lmi, toObjectErr(err, bucket) + result.NextKeyMarker = object + // Loop through all the received uploads fill in the multiparts result. + for _, upload := range uploads { + uploadID := upload.UploadID + result.Uploads = append(result.Uploads, upload) + result.NextUploadIDMarker = uploadID } - return fs.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) + result.IsTruncated = len(uploads) == maxUploads + + if !result.IsTruncated { + result.NextKeyMarker = "" + result.NextUploadIDMarker = "" + } + + return result, nil } // newMultipartUpload - wrapper for initializing a new multipart @@ -596,9 +598,81 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, d }, nil } -// ListObjectParts - returns empty response always. Applications are -// expected to remember the uploaded part numbers and corresponding -// etags. +// listObjectParts - wrapper scanning through +// '.minio.sys/multipart/bucket/object/UPLOADID'. Lists all the parts +// saved inside '.minio.sys/multipart/bucket/object/UPLOADID'. +func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) { + result := ListPartsInfo{} + + uploadIDPath := pathJoin(bucket, object, uploadID) + fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile) + metaFile, err := fs.rwPool.Open(fsMetaPath) + if err != nil { + if err == errFileNotFound || err == errFileAccessDenied { + // On windows oddly this is returned. + return lpi, traceError(InvalidUploadID{UploadID: uploadID}) + } + return lpi, toObjectErr(traceError(err), bucket, object) + } + defer fs.rwPool.Close(fsMetaPath) + + fsMeta := fsMetaV1{} + _, err = fsMeta.ReadFrom(metaFile.LockedFile) + if err != nil { + return lpi, toObjectErr(err, minioMetaBucket, fsMetaPath) + } + + // Only parts with higher part numbers will be listed. + partIdx := fsMeta.ObjectPartIndex(partNumberMarker) + parts := fsMeta.Parts + if partIdx != -1 { + parts = fsMeta.Parts[partIdx+1:] + } + + count := maxParts + for _, part := range parts { + var fi os.FileInfo + partNamePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, part.Name) + fi, err = fsStatFile(partNamePath) + if err != nil { + return lpi, toObjectErr(err, minioMetaMultipartBucket, partNamePath) + } + result.Parts = append(result.Parts, PartInfo{ + PartNumber: part.Number, + ETag: part.ETag, + LastModified: fi.ModTime(), + Size: fi.Size(), + }) + count-- + if count == 0 { + break + } + } + + // If listed entries are more than maxParts, we set IsTruncated as true. + if len(parts) > len(result.Parts) { + result.IsTruncated = true + // Make sure to fill next part number marker if IsTruncated is + // true for subsequent listing. + nextPartNumberMarker := result.Parts[len(result.Parts)-1].PartNumber + result.NextPartNumberMarker = nextPartNumberMarker + } + result.Bucket = bucket + result.Object = object + result.UploadID = uploadID + result.MaxParts = maxParts + + // Success. + return result, nil +} + +// ListObjectParts - lists all previously uploaded parts for a given +// object and uploadID. Takes additional input of part-number-marker +// to indicate where the listing should begin from. +// +// Implements S3 compatible ListObjectParts API. The resulting +// ListPartsInfo structure is unmarshalled directly into XML and +// replied back to the client. func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) { if err := checkListPartsArgs(bucket, object, fs); err != nil { return lpi, err @@ -609,24 +683,20 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM return lpi, toObjectErr(err, bucket) } - // Check if uploadID exists. N B This check may race with a - // concurrent abortMultipartUpload on the same uploadID. It is - // OK to be eventually consistent w.r.t listing of objects, - // uploads and parts. - uploadIDPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object, uploadID) - _, err := fsStatDir(uploadIDPath) + // Hold the lock so that two parallel complete-multipart-uploads + // do not leave a stale uploads.json behind. + objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) + if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil { + return lpi, traceError(err) + } + defer objectMPartPathLock.RUnlock() + + listPartsInfo, err := fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts) if err != nil { - return lpi, traceError(InvalidUploadID{UploadID: uploadID}) + return lpi, toObjectErr(err, bucket, object) } - // Return empty list parts response - return ListPartsInfo{ - Bucket: bucket, - Object: object, - UploadID: uploadID, - PartNumberMarker: 0, - MaxParts: maxParts, - }, nil + return listPartsInfo, nil } // CompleteMultipartUpload - completes an ongoing multipart @@ -944,7 +1014,7 @@ func (fs fsObjects) cleanupStaleMultipartUpload(bucket string, expiry time.Durat for { // List multipart uploads in a bucket 1000 at a time prefix := "" - lmi, err = fs.listMultipartUploadsHelper(bucket, prefix, lmi.KeyMarker, lmi.UploadIDMarker, "", 1000) + lmi, err = fs.listMultipartUploadsCleanup(bucket, prefix, lmi.KeyMarker, lmi.UploadIDMarker, "", 1000) if err != nil { errorIf(err, "Unable to list uploads") return err