Enable ListMultipartUploads and ListObjectParts for FS (#4996)

* Enable ListMultipartUploads and ListObjectParts for FS.
Previously we had disabled ListMultipartUploads and ListObjectParts
to see if any clients break. Docker registry broke. This patch
enables ListMultipartUploads and ListObjectParts, however
ListMultipartUploads with prefix based listing is not
supported (which is not used by docker registry anyway).
i.e ListMultipartUploads will need exact object name.
master
Krishna Srinivas 7 years ago committed by Dee Koder
parent c46c2a6dd6
commit d1712a46a7
  1. 164
      cmd/fs-v1-multipart.go

@ -164,8 +164,8 @@ func (fs fsObjects) listMultipartUploadIDs(bucketName, objectName, uploadIDMarke
return uploads, end, nil return uploads, end, nil
} }
// listMultipartUploads - lists all multipart uploads. // listMultipartUploadsCleanup - lists all multipart uploads. Called by fs.cleanupStaleMultipartUpload()
func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { func (fs fsObjects) listMultipartUploadsCleanup(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
result := ListMultipartsInfo{} result := ListMultipartsInfo{}
recursive := true recursive := true
if delimiter == slashSeparator { if delimiter == slashSeparator {
@ -308,44 +308,46 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
return result, nil return result, nil
} }
// ListMultipartUploads - returns empty response always. The onus is // ListMultipartUploads - lists all the uploadIDs for the specified object.
// on applications to remember uploadId of the multipart uploads that // We do not support prefix based listing.
// are in progress. func (fs fsObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { if err := checkListMultipartArgs(bucket, object, keyMarker, uploadIDMarker, delimiter, fs); err != nil {
if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, fs); err != nil {
return lmi, err return lmi, err
} }
if _, err := fs.statBucketDir(bucket); err != nil { if _, err := fs.statBucketDir(bucket); err != nil {
return lmi, toObjectErr(err, bucket) return lmi, toObjectErr(err, bucket)
} }
return ListMultipartsInfo{
Prefix: prefix,
KeyMarker: keyMarker,
UploadIDMarker: uploadIDMarker,
Delimiter: delimiter,
MaxUploads: maxUploads,
IsTruncated: false,
}, nil
}
// listMultipartUploadsHelper - lists all the pending multipart uploads on a result := ListMultipartsInfo{}
// bucket. Additionally takes 'prefix, keyMarker, uploadIDmarker and a
// delimiter' which allows us to list uploads match a particular result.IsTruncated = true
// prefix or lexically starting from 'keyMarker' or delimiting the result.MaxUploads = maxUploads
// output to get a directory like listing. result.KeyMarker = keyMarker
// result.Prefix = object
// This function remains here to aid in listing uploads that require cleanup. result.Delimiter = delimiter
func (fs fsObjects) listMultipartUploadsHelper(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, fs); err != nil { uploads, _, err := fs.listMultipartUploadIDs(bucket, object, uploadIDMarker, maxUploads)
if err != nil {
return lmi, err return lmi, err
} }
if _, err := fs.statBucketDir(bucket); err != nil { result.NextKeyMarker = object
return lmi, toObjectErr(err, bucket) // Loop through all the received uploads fill in the multiparts result.
for _, upload := range uploads {
uploadID := upload.UploadID
result.Uploads = append(result.Uploads, upload)
result.NextUploadIDMarker = uploadID
} }
return fs.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) result.IsTruncated = len(uploads) == maxUploads
if !result.IsTruncated {
result.NextKeyMarker = ""
result.NextUploadIDMarker = ""
}
return result, nil
} }
// newMultipartUpload - wrapper for initializing a new multipart // newMultipartUpload - wrapper for initializing a new multipart
@ -596,9 +598,81 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
}, nil }, nil
} }
// ListObjectParts - returns empty response always. Applications are // listObjectParts - wrapper scanning through
// expected to remember the uploaded part numbers and corresponding // '.minio.sys/multipart/bucket/object/UPLOADID'. Lists all the parts
// etags. // saved inside '.minio.sys/multipart/bucket/object/UPLOADID'.
func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) {
result := ListPartsInfo{}
uploadIDPath := pathJoin(bucket, object, uploadID)
fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
metaFile, err := fs.rwPool.Open(fsMetaPath)
if err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
// On windows oddly this is returned.
return lpi, traceError(InvalidUploadID{UploadID: uploadID})
}
return lpi, toObjectErr(traceError(err), bucket, object)
}
defer fs.rwPool.Close(fsMetaPath)
fsMeta := fsMetaV1{}
_, err = fsMeta.ReadFrom(metaFile.LockedFile)
if err != nil {
return lpi, toObjectErr(err, minioMetaBucket, fsMetaPath)
}
// Only parts with higher part numbers will be listed.
partIdx := fsMeta.ObjectPartIndex(partNumberMarker)
parts := fsMeta.Parts
if partIdx != -1 {
parts = fsMeta.Parts[partIdx+1:]
}
count := maxParts
for _, part := range parts {
var fi os.FileInfo
partNamePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, part.Name)
fi, err = fsStatFile(partNamePath)
if err != nil {
return lpi, toObjectErr(err, minioMetaMultipartBucket, partNamePath)
}
result.Parts = append(result.Parts, PartInfo{
PartNumber: part.Number,
ETag: part.ETag,
LastModified: fi.ModTime(),
Size: fi.Size(),
})
count--
if count == 0 {
break
}
}
// If listed entries are more than maxParts, we set IsTruncated as true.
if len(parts) > len(result.Parts) {
result.IsTruncated = true
// Make sure to fill next part number marker if IsTruncated is
// true for subsequent listing.
nextPartNumberMarker := result.Parts[len(result.Parts)-1].PartNumber
result.NextPartNumberMarker = nextPartNumberMarker
}
result.Bucket = bucket
result.Object = object
result.UploadID = uploadID
result.MaxParts = maxParts
// Success.
return result, nil
}
// ListObjectParts - lists all previously uploaded parts for a given
// object and uploadID. Takes additional input of part-number-marker
// to indicate where the listing should begin from.
//
// Implements S3 compatible ListObjectParts API. The resulting
// ListPartsInfo structure is unmarshalled directly into XML and
// replied back to the client.
func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) { func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) {
if err := checkListPartsArgs(bucket, object, fs); err != nil { if err := checkListPartsArgs(bucket, object, fs); err != nil {
return lpi, err return lpi, err
@ -609,24 +683,20 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
return lpi, toObjectErr(err, bucket) return lpi, toObjectErr(err, bucket)
} }
// Check if uploadID exists. N B This check may race with a // Hold the lock so that two parallel complete-multipart-uploads
// concurrent abortMultipartUpload on the same uploadID. It is // do not leave a stale uploads.json behind.
// OK to be eventually consistent w.r.t listing of objects, objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
// uploads and parts. if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil {
uploadIDPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object, uploadID) return lpi, traceError(err)
_, err := fsStatDir(uploadIDPath) }
defer objectMPartPathLock.RUnlock()
listPartsInfo, err := fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
if err != nil { if err != nil {
return lpi, traceError(InvalidUploadID{UploadID: uploadID}) return lpi, toObjectErr(err, bucket, object)
} }
// Return empty list parts response return listPartsInfo, nil
return ListPartsInfo{
Bucket: bucket,
Object: object,
UploadID: uploadID,
PartNumberMarker: 0,
MaxParts: maxParts,
}, nil
} }
// CompleteMultipartUpload - completes an ongoing multipart // CompleteMultipartUpload - completes an ongoing multipart
@ -944,7 +1014,7 @@ func (fs fsObjects) cleanupStaleMultipartUpload(bucket string, expiry time.Durat
for { for {
// List multipart uploads in a bucket 1000 at a time // List multipart uploads in a bucket 1000 at a time
prefix := "" prefix := ""
lmi, err = fs.listMultipartUploadsHelper(bucket, prefix, lmi.KeyMarker, lmi.UploadIDMarker, "", 1000) lmi, err = fs.listMultipartUploadsCleanup(bucket, prefix, lmi.KeyMarker, lmi.UploadIDMarker, "", 1000)
if err != nil { if err != nil {
errorIf(err, "Unable to list uploads") errorIf(err, "Unable to list uploads")
return err return err

Loading…
Cancel
Save