@ -272,189 +272,54 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr
return evalDisks ( disks , mErrs ) , err
return evalDisks ( disks , mErrs ) , err
}
}
// listMultipartUploads - lists all multipart uploads.
// ListMultipartUploads - lists all the pending multipart
func ( xl xlObjects ) listMultipartUploads ( bucket , prefix , keyMarker , uploadIDMarker , delimiter string , maxUploads int ) ( lmi ListMultipartsInfo , e error ) {
// uploads for a particular object in a bucket.
result := ListMultipartsInfo {
//
IsTruncated : true ,
// Implements minimal S3 compatible ListMultipartUploads API. We do
MaxUploads : maxUploads ,
// not support prefix based listing, this is a deliberate attempt
KeyMarker : keyMarker ,
// towards simplification of multipart APIs.
Prefix : prefix ,
// The resulting ListMultipartsInfo structure is unmarshalled directly as XML.
Delimiter : delimiter ,
func ( xl xlObjects ) ListMultipartUploads ( bucket , object , keyMarker , uploadIDMarker , delimiter string , maxUploads int ) ( lmi ListMultipartsInfo , e error ) {
}
if err := checkListMultipartArgs ( bucket , object , keyMarker , uploadIDMarker , delimiter , xl ) ; err != nil {
return lmi , err
recursive := true
}
if delimiter == slashSeparator {
recursive = false
result := ListMultipartsInfo { }
}
result . IsTruncated = true
// Not using path.Join() as it strips off the trailing '/'.
result . MaxUploads = maxUploads
multipartPrefixPath := pathJoin ( bucket , prefix )
result . KeyMarker = keyMarker
if prefix == "" {
result . Prefix = object
// Should have a trailing "/" if prefix is ""
result . Delimiter = delimiter
// For ex. multipartPrefixPath should be "multipart/bucket/" if prefix is ""
multipartPrefixPath += slashSeparator
for _ , disk := range xl . getLoadBalancedDisks ( ) {
}
if disk == nil {
multipartMarkerPath := ""
continue
if keyMarker != "" {
multipartMarkerPath = pathJoin ( bucket , keyMarker )
}
var uploads [ ] MultipartInfo
var err error
var eof bool
// List all upload ids for the keyMarker starting from
// uploadIDMarker first.
if uploadIDMarker != "" {
// hold lock on keyMarker path
keyMarkerLock := globalNSMutex . NewNSLock ( minioMetaMultipartBucket ,
pathJoin ( bucket , keyMarker ) )
if err = keyMarkerLock . GetRLock ( globalListingTimeout ) ; err != nil {
return lmi , err
}
for _ , disk := range xl . getLoadBalancedDisks ( ) {
if disk == nil {
continue
}
uploads , _ , err = listMultipartUploadIDs ( bucket , keyMarker , uploadIDMarker , maxUploads , disk )
if err == nil {
break
}
if errors . IsErrIgnored ( err , objMetadataOpIgnoredErrs ... ) {
continue
}
break
}
}
keyMarkerLock . RUnlock ( )
uploads , _ , err := listMultipartUploadIDs ( bucket , object , uploadIDMarker , maxUploads , disk )
if err != nil {
if err != nil {
return lmi , err
return lmi , err
}
}
maxUploads = maxUploads - len ( uploads )
}
result . NextKeyMarker = object
var walkerCh chan treeWalkResult
// Loop through all the received uploads fill in the multiparts result.
var walkerDoneCh chan struct { }
for _ , upload := range uploads {
heal := false // true only for xl.ListObjectsHeal
uploadID := upload . UploadID
// Validate if we need to list further depending on maxUploads.
if maxUploads > 0 {
walkerCh , walkerDoneCh = xl . listPool . Release ( listParams { minioMetaMultipartBucket , recursive , multipartMarkerPath , multipartPrefixPath , heal } )
if walkerCh == nil {
walkerDoneCh = make ( chan struct { } )
isLeaf := xl . isMultipartUpload
listDir := listDirFactory ( isLeaf , xlTreeWalkIgnoredErrs , xl . getLoadBalancedDisks ( ) ... )
walkerCh = startTreeWalk ( minioMetaMultipartBucket , multipartPrefixPath , multipartMarkerPath , recursive , listDir , isLeaf , walkerDoneCh )
}
// Collect uploads until we have reached maxUploads count to 0.
for maxUploads > 0 {
walkResult , ok := <- walkerCh
if ! ok {
// Closed channel.
eof = true
break
}
// For any walk error return right away.
if walkResult . err != nil {
return lmi , walkResult . err
}
entry := strings . TrimPrefix ( walkResult . entry , retainSlash ( bucket ) )
// For an entry looking like a directory, store and
// continue the loop not need to fetch uploads.
if hasSuffix ( walkResult . entry , slashSeparator ) {
uploads = append ( uploads , MultipartInfo {
Object : entry ,
} )
maxUploads --
if maxUploads == 0 {
eof = true
break
}
continue
}
var newUploads [ ] MultipartInfo
var end bool
uploadIDMarker = ""
// For the new object entry we get all its
// pending uploadIDs.
entryLock := globalNSMutex . NewNSLock ( minioMetaMultipartBucket ,
pathJoin ( bucket , entry ) )
if err = entryLock . GetRLock ( globalListingTimeout ) ; err != nil {
return lmi , err
}
var disk StorageAPI
for _ , disk = range xl . getLoadBalancedDisks ( ) {
if disk == nil {
continue
}
newUploads , end , err = listMultipartUploadIDs ( bucket , entry , uploadIDMarker , maxUploads , disk )
if err == nil {
break
}
if errors . IsErrIgnored ( err , objMetadataOpIgnoredErrs ... ) {
continue
}
break
}
entryLock . RUnlock ( )
if err != nil {
if errors . IsErrIgnored ( err , xlTreeWalkIgnoredErrs ... ) {
continue
}
return lmi , err
}
uploads = append ( uploads , newUploads ... )
maxUploads -= len ( newUploads )
if end && walkResult . end {
eof = true
break
}
}
}
// For all received uploads fill in the multiparts result.
for _ , upload := range uploads {
var objectName string
var uploadID string
if hasSuffix ( upload . Object , slashSeparator ) {
// All directory entries are common prefixes.
uploadID = "" // For common prefixes, upload ids are empty.
objectName = upload . Object
result . CommonPrefixes = append ( result . CommonPrefixes , objectName )
} else {
uploadID = upload . UploadID
objectName = upload . Object
result . Uploads = append ( result . Uploads , upload )
result . Uploads = append ( result . Uploads , upload )
result . NextUploadIDMarker = uploadID
}
}
result . NextKeyMarker = objectName
result . NextUploadIDMarker = uploadID
}
if ! eof {
// Save the go-routine state in the pool so that it can continue from where it left off on
// the next request.
xl . listPool . Set ( listParams { bucket , recursive , result . NextKeyMarker , prefix , heal } , walkerCh , walkerDoneCh )
}
result . IsTruncated = ! eof
result . IsTruncated = len ( uploads ) == maxUploads
// Result is not truncated, reset the markers.
if ! result . IsTruncated {
result . NextKeyMarker = ""
result . NextUploadIDMarker = ""
}
return result , nil
}
// ListMultipartUploads - lists all the pending multipart uploads on a
if ! result . IsTruncated {
// bucket. Additionally takes 'prefix, keyMarker, uploadIDmarker and a
result . NextKeyMarker = ""
// delimiter' which allows us to list uploads match a particular
result . NextUploadIDMarker = ""
// prefix or lexically starting from 'keyMarker' or delimiting the
}
// output to get a directory like listing.
break
//
// Implements S3 compatible ListMultipartUploads API. The resulting
// ListMultipartsInfo structure is unmarshalled directly into XML and
// replied back to the client.
func ( xl xlObjects ) ListMultipartUploads ( bucket , prefix , keyMarker , uploadIDMarker , delimiter string , maxUploads int ) ( lmi ListMultipartsInfo , e error ) {
if err := checkListMultipartArgs ( bucket , prefix , keyMarker , uploadIDMarker , delimiter , xl ) ; err != nil {
return lmi , err
}
}
return xl . listMultipartUploads ( bucket , prefix , keyMarker , uploadIDMarker , delimiter , maxUploads )
return result , nil
}
}
// newMultipartUpload - wrapper for initializing a new multipart
// newMultipartUpload - wrapper for initializing a new multipart