diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go
index ae73e5b79..1055ea5bf 100644
--- a/cmd/fs-v1-multipart.go
+++ b/cmd/fs-v1-multipart.go
@@ -30,13 +30,6 @@ import (
 	"github.com/minio/minio/pkg/lock"
 )
 
-const (
-	// Expiry duration after which the multipart uploads are deemed stale.
-	fsMultipartExpiry = time.Hour * 24 * 14 // 2 weeks.
-	// Cleanup interval when the stale multipart cleanup is initiated.
-	fsMultipartCleanupInterval = time.Hour * 24 // 24 hrs.
-)
-
 // Returns if the prefix is a multipart upload.
 func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool {
 	uploadsIDPath := pathJoin(fs.fsPath, bucket, prefix, uploadsJSONFile)
@@ -1042,61 +1035,3 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
 
 	return nil
 }
-
-// Removes multipart uploads if any older than `expiry` duration in a given bucket.
-func (fs fsObjects) cleanupStaleMultipartUpload(bucket string, expiry time.Duration) (err error) {
-	var lmi ListMultipartsInfo
-	var st os.FileInfo
-	for {
-		// List multipart uploads in a bucket 1000 at a time
-		prefix := ""
-		lmi, err = fs.listMultipartUploadsCleanup(bucket, prefix, lmi.KeyMarker, lmi.UploadIDMarker, "", 1000)
-		if err != nil {
-			errorIf(err, "Unable to list uploads")
-			return err
-		}
-
-		// Remove uploads (and its parts) older than expiry duration.
-		for _, upload := range lmi.Uploads {
-			uploadIDPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, upload.Object, upload.UploadID)
-			if st, err = fsStatDir(uploadIDPath); err != nil {
-				errorIf(err, "Failed to lookup uploads directory path %s", uploadIDPath)
-				continue
-			}
-			if time.Since(st.ModTime()) > expiry {
-				fs.AbortMultipartUpload(bucket, upload.Object, upload.UploadID)
-			}
-		}
-
-		// No more incomplete uploads remain, break and return.
-		if !lmi.IsTruncated {
-			break
-		}
-	}
-
-	return nil
-}
-
-// Removes multipart uploads if any older than `expiry` duration
-// on all buckets for every `cleanupInterval`, this function is
-// blocking and should be run in a go-routine.
-func (fs fsObjects) cleanupStaleMultipartUploads(cleanupInterval, expiry time.Duration, doneCh chan struct{}) {
-	ticker := time.NewTicker(cleanupInterval)
-	for {
-		select {
-		case <-doneCh:
-			// Stop the timer.
-			ticker.Stop()
-			return
-		case <-ticker.C:
-			bucketInfos, err := fs.ListBuckets()
-			if err != nil {
-				errorIf(err, "Unable to list buckets")
-				continue
-			}
-			for _, bucketInfo := range bucketInfos {
-				fs.cleanupStaleMultipartUpload(bucketInfo.Name, expiry)
-			}
-		}
-	}
-}
diff --git a/cmd/fs-v1-multipart_test.go b/cmd/fs-v1-multipart_test.go
index 0cea3f9e7..7692866aa 100644
--- a/cmd/fs-v1-multipart_test.go
+++ b/cmd/fs-v1-multipart_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/minio/minio/pkg/errors"
 )
 
+// Tests cleanup multipart uploads for filesystem backend.
 func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) {
 	// Prepare for tests
 	disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
@@ -47,7 +48,7 @@ func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) {
 		t.Fatal("Unexpected err: ", err)
 	}
 
-	go fs.cleanupStaleMultipartUploads(20*time.Millisecond, 0, globalServiceDoneCh)
+	go cleanupStaleMultipartUploads(20*time.Millisecond, 0, obj, fs.listMultipartUploadsCleanup, globalServiceDoneCh)
 
 	// Wait for 40ms such that - we have given enough time for
 	// cleanup routine to kick in.
@@ -89,7 +90,7 @@ func TestFSCleanupMultipartUpload(t *testing.T) {
 		t.Fatal("Unexpected err: ", err)
 	}
 
-	if err = fs.cleanupStaleMultipartUpload(bucketName, 0); err != nil {
+	if err = cleanupStaleMultipartUpload(bucketName, 0, obj, fs.listMultipartUploadsCleanup); err != nil {
 		t.Fatal("Unexpected err: ", err)
 	}
 
diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go
index f1502e224..c7368a358 100644
--- a/cmd/fs-v1.go
+++ b/cmd/fs-v1.go
@@ -156,8 +156,8 @@ func newFSObjectLayer(fsPath string) (ObjectLayer, error) {
 		return nil, fmt.Errorf("Unable to initialize event notification. %s", err)
 	}
 
-	// Start background process to cleanup old files in `.minio.sys`.
-	go fs.cleanupStaleMultipartUploads(fsMultipartCleanupInterval, fsMultipartExpiry, globalServiceDoneCh)
+	// Start background process to cleanup old multipart objects in `.minio.sys`.
+	go cleanupStaleMultipartUploads(multipartCleanupInterval, multipartExpiry, fs, fs.listMultipartUploadsCleanup, globalServiceDoneCh)
 
 	// Return successfully initialized object layer.
 	return fs, nil
diff --git a/cmd/object-api-multipart-common.go b/cmd/object-api-multipart-common.go
index 4a0e73f51..9a30e156b 100644
--- a/cmd/object-api-multipart-common.go
+++ b/cmd/object-api-multipart-common.go
@@ -28,6 +28,13 @@ import (
 	"github.com/minio/minio/pkg/lock"
 )
 
+const (
+	// Expiry duration after which the multipart uploads are deemed stale.
+	multipartExpiry = time.Hour * 24 * 14 // 2 weeks.
+	// Cleanup interval when the stale multipart cleanup is initiated.
+	multipartCleanupInterval = time.Hour * 24 // 24 hrs.
+)
+
 // A uploadInfo represents the s3 compatible spec.
 type uploadInfo struct {
 	UploadID string `json:"uploadId"` // UploadID for the active multipart upload.
@@ -198,3 +205,58 @@ func listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count
 	end := (index == len(uploadsJSON.Uploads))
 	return uploads, end, nil
 }
+
+// listMultipartUploadsFunc defines the signature of the recursive multipart uploads listing function.
+type listMultipartUploadsFunc func(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error)
+
+// Removes multipart uploads if any older than `expiry` duration
+// on all buckets for every `cleanupInterval`, this function is
+// blocking and should be run in a go-routine.
+func cleanupStaleMultipartUploads(cleanupInterval, expiry time.Duration, obj ObjectLayer, listFn listMultipartUploadsFunc, doneCh chan struct{}) {
+	ticker := time.NewTicker(cleanupInterval)
+	for {
+		select {
+		case <-doneCh:
+			// Stop the timer.
+			ticker.Stop()
+			return
+		case <-ticker.C:
+			bucketInfos, err := obj.ListBuckets()
+			if err != nil {
+				errorIf(err, "Unable to list buckets")
+				continue
+			}
+			for _, bucketInfo := range bucketInfos {
+				cleanupStaleMultipartUpload(bucketInfo.Name, expiry, obj, listFn)
+			}
+		}
+	}
+}
+
+// Removes multipart uploads if any older than `expiry` duration in a given bucket.
+func cleanupStaleMultipartUpload(bucket string, expiry time.Duration, obj ObjectLayer, listFn listMultipartUploadsFunc) (err error) {
+	var lmi ListMultipartsInfo
+	for {
+		// List multipart uploads in a bucket 1000 at a time
+		prefix := ""
+		lmi, err = listFn(bucket, prefix, lmi.KeyMarker, lmi.UploadIDMarker, "", 1000)
+		if err != nil {
+			errorIf(err, "Unable to list uploads")
+			return err
+		}
+
+		// Remove uploads (and their parts) older than expiry duration.
+		for _, upload := range lmi.Uploads {
+			if time.Since(upload.Initiated) > expiry {
+				obj.AbortMultipartUpload(bucket, upload.Object, upload.UploadID)
+			}
+		}
+
+		// No more incomplete uploads remain, break and return.
+		if !lmi.IsTruncated {
+			break
+		}
+	}
+
+	return nil
+}
diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go
index 4aaf03765..ab291463a 100644
--- a/cmd/xl-v1-multipart.go
+++ b/cmd/xl-v1-multipart.go
@@ -272,6 +272,174 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr
 	return evalDisks(disks, mErrs), err
 }
 
+// listMultipartUploadsCleanup - lists all multipart uploads. Called by xl.cleanupStaleMultipartUpload()
+func (xl xlObjects) listMultipartUploadsCleanup(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
+	result := ListMultipartsInfo{
+		IsTruncated: true,
+		MaxUploads:  maxUploads,
+		KeyMarker:   keyMarker,
+		Prefix:      prefix,
+		Delimiter:   delimiter,
+	}
+
+	recursive := true
+	if delimiter == slashSeparator {
+		recursive = false
+	}
+
+	// Not using path.Join() as it strips off the trailing '/'.
+	multipartPrefixPath := pathJoin(bucket, prefix)
+	if prefix == "" {
+		// Should have a trailing "/" if prefix is ""
+		// For ex. multipartPrefixPath should be "multipart/bucket/" if prefix is ""
+		multipartPrefixPath += slashSeparator
+	}
+	multipartMarkerPath := ""
+	if keyMarker != "" {
+		multipartMarkerPath = pathJoin(bucket, keyMarker)
+	}
+	var uploads []MultipartInfo
+	var err error
+	var eof bool
+	// List all upload ids for the keyMarker starting from
+	// uploadIDMarker first.
+	if uploadIDMarker != "" {
+		// hold lock on keyMarker path
+		keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
+			pathJoin(bucket, keyMarker))
+		if err = keyMarkerLock.GetRLock(globalListingTimeout); err != nil {
+			return lmi, err
+		}
+		for _, disk := range xl.getLoadBalancedDisks() {
+			if disk == nil {
+				continue
+			}
+			uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, disk)
+			if err == nil {
+				break
+			}
+			if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) {
+				continue
+			}
+			break
+		}
+		keyMarkerLock.RUnlock()
+		if err != nil {
+			return lmi, err
+		}
+		maxUploads = maxUploads - len(uploads)
+	}
+	var walkerCh chan treeWalkResult
+	var walkerDoneCh chan struct{}
+	heal := false // true only for xl.ListObjectsHeal
+	// Validate if we need to list further depending on maxUploads.
+	if maxUploads > 0 {
+		walkerCh, walkerDoneCh = xl.listPool.Release(listParams{minioMetaMultipartBucket, recursive, multipartMarkerPath, multipartPrefixPath, heal})
+		if walkerCh == nil {
+			walkerDoneCh = make(chan struct{})
+			isLeaf := xl.isMultipartUpload
+			listDir := listDirFactory(isLeaf, xlTreeWalkIgnoredErrs, xl.getLoadBalancedDisks()...)
+			walkerCh = startTreeWalk(minioMetaMultipartBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, isLeaf, walkerDoneCh)
+		}
+		// Collect uploads until maxUploads count reaches 0.
+		for maxUploads > 0 {
+			walkResult, ok := <-walkerCh
+			if !ok {
+				// Closed channel.
+				eof = true
+				break
+			}
+			// For any walk error return right away.
+			if walkResult.err != nil {
+				return lmi, walkResult.err
+			}
+			entry := strings.TrimPrefix(walkResult.entry, retainSlash(bucket))
+			// For an entry looking like a directory, store it and
+			// continue the loop; no need to fetch uploads.
+			if hasSuffix(walkResult.entry, slashSeparator) {
+				uploads = append(uploads, MultipartInfo{
+					Object: entry,
+				})
+				maxUploads--
+				if maxUploads == 0 {
+					eof = true
+					break
+				}
+				continue
+			}
+			var newUploads []MultipartInfo
+			var end bool
+			uploadIDMarker = ""
+
+			// For the new object entry we get all its
+			// pending uploadIDs.
+			entryLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
+				pathJoin(bucket, entry))
+			if err = entryLock.GetRLock(globalListingTimeout); err != nil {
+				return lmi, err
+			}
+			var disk StorageAPI
+			for _, disk = range xl.getLoadBalancedDisks() {
+				if disk == nil {
+					continue
+				}
+				newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, disk)
+				if err == nil {
+					break
+				}
+				if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) {
+					continue
+				}
+				break
+			}
+			entryLock.RUnlock()
+			if err != nil {
+				if errors.IsErrIgnored(err, xlTreeWalkIgnoredErrs...) {
+					continue
+				}
+				return lmi, err
+			}
+			uploads = append(uploads, newUploads...)
+			maxUploads -= len(newUploads)
+			if end && walkResult.end {
+				eof = true
+				break
+			}
+		}
+	}
+	// For all received uploads fill in the multiparts result.
+	for _, upload := range uploads {
+		var objectName string
+		var uploadID string
+		if hasSuffix(upload.Object, slashSeparator) {
+			// All directory entries are common prefixes.
+			uploadID = "" // For common prefixes, upload ids are empty.
+			objectName = upload.Object
+			result.CommonPrefixes = append(result.CommonPrefixes, objectName)
+		} else {
+			uploadID = upload.UploadID
+			objectName = upload.Object
+			result.Uploads = append(result.Uploads, upload)
+		}
+		result.NextKeyMarker = objectName
+		result.NextUploadIDMarker = uploadID
+	}
+
+	if !eof {
+		// Save the go-routine state in the pool so that it can continue from where it left off on
+		// the next request.
+		xl.listPool.Set(listParams{bucket, recursive, result.NextKeyMarker, prefix, heal}, walkerCh, walkerDoneCh)
+	}
+
+	result.IsTruncated = !eof
+	// Result is not truncated, reset the markers.
+	if !result.IsTruncated {
+		result.NextKeyMarker = ""
+		result.NextUploadIDMarker = ""
+	}
+	return result, nil
+}
+
 // ListMultipartUploads - lists all the pending multipart
 // uploads for a particular object in a bucket.
 //
diff --git a/cmd/xl-v1-multipart_test.go b/cmd/xl-v1-multipart_test.go
index 93cb3ab0f..72dd9b431 100644
--- a/cmd/xl-v1-multipart_test.go
+++ b/cmd/xl-v1-multipart_test.go
@@ -20,8 +20,105 @@ import (
 	"os"
 	"testing"
 	"time"
+
+	"github.com/minio/minio/pkg/errors"
 )
 
+// Tests cleanup multipart uploads for erasure coded backend.
+func TestXLCleanupMultipartUploadsInRoutine(t *testing.T) {
+	// Initialize configuration
+	root, err := newTestConfig(globalMinioDefaultRegion)
+	if err != nil {
+		t.Fatalf("%s", err)
+	}
+	defer os.RemoveAll(root)
+
+	// Create an instance of xl backend
+	obj, fsDirs, err := prepareXL()
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Defer cleanup of backend directories
+	defer removeRoots(fsDirs)
+
+	xl := obj.(*xlObjects)
+
+	// Close the go-routine, we are going to
+	// manually start it and test in this test case.
+	globalServiceDoneCh <- struct{}{}
+
+	bucketName := "bucket"
+	objectName := "object"
+
+	obj.MakeBucketWithLocation(bucketName, "")
+	uploadID, err := obj.NewMultipartUpload(bucketName, objectName, nil)
+	if err != nil {
+		t.Fatal("Unexpected err: ", err)
+	}
+
+	go cleanupStaleMultipartUploads(20*time.Millisecond, 0, obj, xl.listMultipartUploadsCleanup, globalServiceDoneCh)
+
+	// Wait for 40ms such that - we have given enough time for
+	// cleanup routine to kick in.
+	time.Sleep(40 * time.Millisecond)
+
+	// Close the routine we do not need it anymore.
+	globalServiceDoneCh <- struct{}{}
+
+	// Check if upload id was already purged.
+	if err = obj.AbortMultipartUpload(bucketName, objectName, uploadID); err != nil {
+		err = errors.Cause(err)
+		if _, ok := err.(InvalidUploadID); !ok {
+			t.Fatal("Unexpected err: ", err)
+		}
+	}
+}
+
+// Tests cleanup of stale upload ids.
+func TestXLCleanupMultipartUpload(t *testing.T) {
+	// Initialize configuration
+	root, err := newTestConfig(globalMinioDefaultRegion)
+	if err != nil {
+		t.Fatalf("%s", err)
+	}
+	defer os.RemoveAll(root)
+
+	// Create an instance of xl backend
+	obj, fsDirs, err := prepareXL()
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Defer cleanup of backend directories
+	defer removeRoots(fsDirs)
+
+	xl := obj.(*xlObjects)
+
+	// Close the go-routine, we are going to
+	// manually start it and test in this test case.
+	globalServiceDoneCh <- struct{}{}
+
+	bucketName := "bucket"
+	objectName := "object"
+
+	obj.MakeBucketWithLocation(bucketName, "")
+	uploadID, err := obj.NewMultipartUpload(bucketName, objectName, nil)
+	if err != nil {
+		t.Fatal("Unexpected err: ", err)
+	}
+
+	if err = cleanupStaleMultipartUpload(bucketName, 0, obj, xl.listMultipartUploadsCleanup); err != nil {
+		t.Fatal("Unexpected err: ", err)
+	}
+
+	// Check if upload id was already purged.
+	if err = obj.AbortMultipartUpload(bucketName, objectName, uploadID); err != nil {
+		err = errors.Cause(err)
+		if _, ok := err.(InvalidUploadID); !ok {
+			t.Fatal("Unexpected err: ", err)
+		}
+	}
+}
+
 func TestUpdateUploadJSON(t *testing.T) {
 	// Initialize configuration
 	root, err := newTestConfig(globalMinioDefaultRegion)
diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go
index a1999d559..5b6bc4f5b 100644
--- a/cmd/xl-v1.go
+++ b/cmd/xl-v1.go
@@ -159,8 +159,15 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) {
 		return xl, nil
 	}
 
-	// Do a quick heal on the buckets themselves for any discrepancies.
-	return xl, quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum)
+	// Perform a quick heal on the buckets and bucket metadata for any discrepancies.
+	if err = quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum); err != nil {
+		return nil, err
+	}
+
+	// Start background process to cleanup old multipart objects in `.minio.sys`.
+	go cleanupStaleMultipartUploads(multipartCleanupInterval, multipartExpiry, xl, xl.listMultipartUploadsCleanup, globalServiceDoneCh)
+
+	return xl, nil
 }
 
 // Shutdown function for object storage interface.