erasure: Support cleaning up of stale multipart objects (#5250)

Just as in our single directory/disk setup, this PR adds the
functionality to clean up stale multipart objects older than
2 weeks.
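
The change is, at heart, a ticker-driven sweeper: a background goroutine wakes up once per cleanup interval, lists pending multipart uploads bucket by bucket, and aborts any upload whose age exceeds the expiry. Below is a minimal, self-contained sketch of that pattern; names like cleanupLoop and abortIfStale are illustrative only and not from this PR — the real implementation further down works against the ObjectLayer interface.

package main

import (
	"fmt"
	"time"
)

// abortIfStale stands in for obj.AbortMultipartUpload: it aborts
// (here, just reports) an upload whose age exceeds expiry.
func abortIfStale(id string, initiated time.Time, expiry time.Duration) {
	if time.Since(initiated) > expiry {
		fmt.Println("aborting stale upload:", id)
	}
}

// cleanupLoop mirrors the shape of cleanupStaleMultipartUploads in this
// PR: sweep on every tick, stop when doneCh is closed.
func cleanupLoop(interval, expiry time.Duration, doneCh <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-doneCh:
			return
		case <-ticker.C:
			// The real code lists buckets and their uploads here; two
			// fixed uploads stand in for that listing.
			abortIfStale("fresh-upload", time.Now(), expiry)
			abortIfStale("stale-upload", time.Now().Add(-15*24*time.Hour), expiry)
		}
	}
}

func main() {
	doneCh := make(chan struct{})
	go cleanupLoop(10*time.Millisecond, 14*24*time.Hour, doneCh)
	time.Sleep(35 * time.Millisecond) // let a few ticks fire
	close(doneCh)
}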
Harshavardhana 7 years ago committed by GitHub
parent 59749a2b85
commit 490c30f853
Files changed:
  1. cmd/fs-v1-multipart.go (65 changed lines)
  2. cmd/fs-v1-multipart_test.go (5 changed lines)
  3. cmd/fs-v1.go (4 changed lines)
  4. cmd/object-api-multipart-common.go (62 changed lines)
  5. cmd/xl-v1-multipart.go (168 changed lines)
  6. cmd/xl-v1-multipart_test.go (97 changed lines)
  7. cmd/xl-v1.go (11 changed lines)

cmd/fs-v1-multipart.go
@@ -30,13 +30,6 @@ import (
	"github.com/minio/minio/pkg/lock"
)

const (
	// Expiry duration after which the multipart uploads are deemed stale.
	fsMultipartExpiry = time.Hour * 24 * 14 // 2 weeks.
	// Cleanup interval when the stale multipart cleanup is initiated.
	fsMultipartCleanupInterval = time.Hour * 24 // 24 hrs.
)

// Returns if the prefix is a multipart upload.
func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool {
	uploadsIDPath := pathJoin(fs.fsPath, bucket, prefix, uploadsJSONFile)
@@ -1042,61 +1035,3 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
	return nil
}

// Removes multipart uploads if any older than `expiry` duration in a given bucket.
func (fs fsObjects) cleanupStaleMultipartUpload(bucket string, expiry time.Duration) (err error) {
	var lmi ListMultipartsInfo
	var st os.FileInfo
	for {
		// List multipart uploads in a bucket 1000 at a time
		prefix := ""
		lmi, err = fs.listMultipartUploadsCleanup(bucket, prefix, lmi.KeyMarker, lmi.UploadIDMarker, "", 1000)
		if err != nil {
			errorIf(err, "Unable to list uploads")
			return err
		}

		// Remove uploads (and its parts) older than expiry duration.
		for _, upload := range lmi.Uploads {
			uploadIDPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, upload.Object, upload.UploadID)
			if st, err = fsStatDir(uploadIDPath); err != nil {
				errorIf(err, "Failed to lookup uploads directory path %s", uploadIDPath)
				continue
			}
			if time.Since(st.ModTime()) > expiry {
				fs.AbortMultipartUpload(bucket, upload.Object, upload.UploadID)
			}
		}

		// No more incomplete uploads remain, break and return.
		if !lmi.IsTruncated {
			break
		}
	}
	return nil
}

// Removes multipart uploads if any older than `expiry` duration
// on all buckets for every `cleanupInterval`, this function is
// blocking and should be run in a go-routine.
func (fs fsObjects) cleanupStaleMultipartUploads(cleanupInterval, expiry time.Duration, doneCh chan struct{}) {
	ticker := time.NewTicker(cleanupInterval)
	for {
		select {
		case <-doneCh:
			// Stop the timer.
			ticker.Stop()
			return
		case <-ticker.C:
			bucketInfos, err := fs.ListBuckets()
			if err != nil {
				errorIf(err, "Unable to list buckets")
				continue
			}
			for _, bucketInfo := range bucketInfos {
				fs.cleanupStaleMultipartUpload(bucketInfo.Name, expiry)
			}
		}
	}
}

cmd/fs-v1-multipart_test.go
@@ -26,6 +26,7 @@ import (
	"github.com/minio/minio/pkg/errors"
)
// Tests cleanup multipart uploads for filesystem backend.
func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) {
	// Prepare for tests
	disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix())
@@ -47,7 +48,7 @@ func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) {
		t.Fatal("Unexpected err: ", err)
	}

-	go fs.cleanupStaleMultipartUploads(20*time.Millisecond, 0, globalServiceDoneCh)
+	go cleanupStaleMultipartUploads(20*time.Millisecond, 0, obj, fs.listMultipartUploadsCleanup, globalServiceDoneCh)

	// Wait for 40ms such that - we have given enough time for
	// cleanup routine to kick in.
@@ -89,7 +90,7 @@ func TestFSCleanupMultipartUpload(t *testing.T) {
		t.Fatal("Unexpected err: ", err)
	}

-	if err = fs.cleanupStaleMultipartUpload(bucketName, 0); err != nil {
+	if err = cleanupStaleMultipartUpload(bucketName, 0, obj, fs.listMultipartUploadsCleanup); err != nil {
		t.Fatal("Unexpected err: ", err)
	}

cmd/fs-v1.go
@@ -156,8 +156,8 @@ func newFSObjectLayer(fsPath string) (ObjectLayer, error) {
		return nil, fmt.Errorf("Unable to initialize event notification. %s", err)
	}

-	// Start background process to cleanup old files in `.minio.sys`.
-	go fs.cleanupStaleMultipartUploads(fsMultipartCleanupInterval, fsMultipartExpiry, globalServiceDoneCh)
+	// Start background process to cleanup old multipart objects in `.minio.sys`.
+	go cleanupStaleMultipartUploads(multipartCleanupInterval, multipartExpiry, fs, fs.listMultipartUploadsCleanup, globalServiceDoneCh)

	// Return successfully initialized object layer.
	return fs, nil

cmd/object-api-multipart-common.go
@@ -28,6 +28,13 @@ import (
	"github.com/minio/minio/pkg/lock"
)
const (
	// Expiry duration after which the multipart uploads are deemed stale.
	multipartExpiry = time.Hour * 24 * 14 // 2 weeks.
	// Cleanup interval when the stale multipart cleanup is initiated.
	multipartCleanupInterval = time.Hour * 24 // 24 hrs.
)
// A uploadInfo represents the s3 compatible spec.
type uploadInfo struct {
	UploadID string `json:"uploadId"` // UploadID for the active multipart upload.

@@ -198,3 +205,58 @@ func listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count
	end := (index == len(uploadsJSON.Uploads))
	return uploads, end, nil
}
// List multipart uploads func defines the function signature of list multipart recursive function.
type listMultipartUploadsFunc func(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error)

// Removes multipart uploads if any older than `expiry` duration
// on all buckets for every `cleanupInterval`, this function is
// blocking and should be run in a go-routine.
func cleanupStaleMultipartUploads(cleanupInterval, expiry time.Duration, obj ObjectLayer, listFn listMultipartUploadsFunc, doneCh chan struct{}) {
	ticker := time.NewTicker(cleanupInterval)
	for {
		select {
		case <-doneCh:
			// Stop the timer.
			ticker.Stop()
			return
		case <-ticker.C:
			bucketInfos, err := obj.ListBuckets()
			if err != nil {
				errorIf(err, "Unable to list buckets")
				continue
			}
			for _, bucketInfo := range bucketInfos {
				cleanupStaleMultipartUpload(bucketInfo.Name, expiry, obj, listFn)
			}
		}
	}
}

// Removes multipart uploads if any older than `expiry` duration in a given bucket.
func cleanupStaleMultipartUpload(bucket string, expiry time.Duration, obj ObjectLayer, listFn listMultipartUploadsFunc) (err error) {
	var lmi ListMultipartsInfo
	for {
		// List multipart uploads in a bucket 1000 at a time
		prefix := ""
		lmi, err = listFn(bucket, prefix, lmi.KeyMarker, lmi.UploadIDMarker, "", 1000)
		if err != nil {
			errorIf(err, "Unable to list uploads")
			return err
		}

		// Remove uploads (and its parts) older than expiry duration.
		for _, upload := range lmi.Uploads {
			if time.Since(upload.Initiated) > expiry {
				obj.AbortMultipartUpload(bucket, upload.Object, upload.UploadID)
			}
		}

		// No more incomplete uploads remain, break and return.
		if !lmi.IsTruncated {
			break
		}
	}
	return nil
}
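
A note on the design: the backend-specific listing routine is injected as a listMultipartUploadsFunc value alongside the ObjectLayer rather than hung off a concrete type, which is what lets the FS and XL backends share this one loop. The FS layer passes fs.listMultipartUploadsCleanup and the XL layer passes xl.listMultipartUploadsCleanup (both wired up elsewhere in this diff), while the actual aborts go through the common obj.AbortMultipartUpload API.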

cmd/xl-v1-multipart.go
@@ -272,6 +272,174 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr
	return evalDisks(disks, mErrs), err
}
// listMultipartUploadsCleanup - lists all multipart uploads. Called by xl.cleanupStaleMultipartUpload()
func (xl xlObjects) listMultipartUploadsCleanup(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
	result := ListMultipartsInfo{
		IsTruncated: true,
		MaxUploads:  maxUploads,
		KeyMarker:   keyMarker,
		Prefix:      prefix,
		Delimiter:   delimiter,
	}

	recursive := true
	if delimiter == slashSeparator {
		recursive = false
	}

	// Not using path.Join() as it strips off the trailing '/'.
	multipartPrefixPath := pathJoin(bucket, prefix)
	if prefix == "" {
		// Should have a trailing "/" if prefix is ""
		// For ex. multipartPrefixPath should be "multipart/bucket/" if prefix is ""
		multipartPrefixPath += slashSeparator
	}
	multipartMarkerPath := ""
	if keyMarker != "" {
		multipartMarkerPath = pathJoin(bucket, keyMarker)
	}
	var uploads []MultipartInfo
	var err error
	var eof bool
	// List all upload ids for the keyMarker starting from
	// uploadIDMarker first.
	if uploadIDMarker != "" {
		// hold lock on keyMarker path
		keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
			pathJoin(bucket, keyMarker))
		if err = keyMarkerLock.GetRLock(globalListingTimeout); err != nil {
			return lmi, err
		}
		for _, disk := range xl.getLoadBalancedDisks() {
			if disk == nil {
				continue
			}
			uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, disk)
			if err == nil {
				break
			}
			if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) {
				continue
			}
			break
		}
		keyMarkerLock.RUnlock()
		if err != nil {
			return lmi, err
		}
		maxUploads = maxUploads - len(uploads)
	}

	var walkerCh chan treeWalkResult
	var walkerDoneCh chan struct{}
	heal := false // true only for xl.ListObjectsHeal
	// Validate if we need to list further depending on maxUploads.
	if maxUploads > 0 {
		walkerCh, walkerDoneCh = xl.listPool.Release(listParams{minioMetaMultipartBucket, recursive, multipartMarkerPath, multipartPrefixPath, heal})
		if walkerCh == nil {
			walkerDoneCh = make(chan struct{})
			isLeaf := xl.isMultipartUpload
			listDir := listDirFactory(isLeaf, xlTreeWalkIgnoredErrs, xl.getLoadBalancedDisks()...)
			walkerCh = startTreeWalk(minioMetaMultipartBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, isLeaf, walkerDoneCh)
		}
		// Collect uploads until we have reached maxUploads count to 0.
		for maxUploads > 0 {
			walkResult, ok := <-walkerCh
			if !ok {
				// Closed channel.
				eof = true
				break
			}
			// For any walk error return right away.
			if walkResult.err != nil {
				return lmi, walkResult.err
			}
			entry := strings.TrimPrefix(walkResult.entry, retainSlash(bucket))
			// For an entry looking like a directory, store and
			// continue the loop not need to fetch uploads.
			if hasSuffix(walkResult.entry, slashSeparator) {
				uploads = append(uploads, MultipartInfo{
					Object: entry,
				})
				maxUploads--
				if maxUploads == 0 {
					eof = true
					break
				}
				continue
			}
			var newUploads []MultipartInfo
			var end bool
			uploadIDMarker = ""

			// For the new object entry we get all its
			// pending uploadIDs.
			entryLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
				pathJoin(bucket, entry))
			if err = entryLock.GetRLock(globalListingTimeout); err != nil {
				return lmi, err
			}
			var disk StorageAPI
			for _, disk = range xl.getLoadBalancedDisks() {
				if disk == nil {
					continue
				}
				newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, disk)
				if err == nil {
					break
				}
				if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) {
					continue
				}
				break
			}
			entryLock.RUnlock()
			if err != nil {
				if errors.IsErrIgnored(err, xlTreeWalkIgnoredErrs...) {
					continue
				}
				return lmi, err
			}
			uploads = append(uploads, newUploads...)
			maxUploads -= len(newUploads)
			if end && walkResult.end {
				eof = true
				break
			}
		}
	}

	// For all received uploads fill in the multiparts result.
	for _, upload := range uploads {
		var objectName string
		var uploadID string
		if hasSuffix(upload.Object, slashSeparator) {
			// All directory entries are common prefixes.
			uploadID = "" // For common prefixes, upload ids are empty.
			objectName = upload.Object
			result.CommonPrefixes = append(result.CommonPrefixes, objectName)
		} else {
			uploadID = upload.UploadID
			objectName = upload.Object
			result.Uploads = append(result.Uploads, upload)
		}
		result.NextKeyMarker = objectName
		result.NextUploadIDMarker = uploadID
	}

	if !eof {
		// Save the go-routine state in the pool so that it can continue from where it left off on
		// the next request.
		xl.listPool.Set(listParams{bucket, recursive, result.NextKeyMarker, prefix, heal}, walkerCh, walkerDoneCh)
	}

	result.IsTruncated = !eof
	// Result is not truncated, reset the markers.
	if !result.IsTruncated {
		result.NextKeyMarker = ""
		result.NextUploadIDMarker = ""
	}
	return result, nil
}
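
Worth noting about the truncated case above: the walker channel is parked back into xl.listPool keyed by the next markers, so a subsequent cleanup pass that passes those markers back in resumes the tree walk where it stopped instead of re-walking the multipart namespace from the start.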
// ListMultipartUploads - lists all the pending multipart
// uploads for a particular object in a bucket.
//

cmd/xl-v1-multipart_test.go
@@ -20,8 +20,105 @@ import (
	"os"
	"testing"
	"time"

	"github.com/minio/minio/pkg/errors"
)
// Tests cleanup multipart uploads for erasure coded backend.
func TestXLCleanupMultipartUploadsInRoutine(t *testing.T) {
	// Initialize configuration
	root, err := newTestConfig(globalMinioDefaultRegion)
	if err != nil {
		t.Fatalf("%s", err)
	}
	defer os.RemoveAll(root)

	// Create an instance of xl backend
	obj, fsDirs, err := prepareXL()
	if err != nil {
		t.Fatal(err)
	}
	// Defer cleanup of backend directories
	defer removeRoots(fsDirs)

	xl := obj.(*xlObjects)

	// Close the go-routine, we are going to
	// manually start it and test in this test case.
	globalServiceDoneCh <- struct{}{}

	bucketName := "bucket"
	objectName := "object"

	obj.MakeBucketWithLocation(bucketName, "")
	uploadID, err := obj.NewMultipartUpload(bucketName, objectName, nil)
	if err != nil {
		t.Fatal("Unexpected err: ", err)
	}

	go cleanupStaleMultipartUploads(20*time.Millisecond, 0, obj, xl.listMultipartUploadsCleanup, globalServiceDoneCh)

	// Wait for 40ms such that - we have given enough time for
	// cleanup routine to kick in.
	time.Sleep(40 * time.Millisecond)

	// Close the routine we do not need it anymore.
	globalServiceDoneCh <- struct{}{}

	// Check if upload id was already purged.
	if err = obj.AbortMultipartUpload(bucketName, objectName, uploadID); err != nil {
		err = errors.Cause(err)
		if _, ok := err.(InvalidUploadID); !ok {
			t.Fatal("Unexpected err: ", err)
		}
	}
}

// Tests cleanup of stale upload ids.
func TestXLCleanupMultipartUpload(t *testing.T) {
	// Initialize configuration
	root, err := newTestConfig(globalMinioDefaultRegion)
	if err != nil {
		t.Fatalf("%s", err)
	}
	defer os.RemoveAll(root)

	// Create an instance of xl backend
	obj, fsDirs, err := prepareXL()
	if err != nil {
		t.Fatal(err)
	}
	// Defer cleanup of backend directories
	defer removeRoots(fsDirs)

	xl := obj.(*xlObjects)

	// Close the go-routine, we are going to
	// manually start it and test in this test case.
	globalServiceDoneCh <- struct{}{}

	bucketName := "bucket"
	objectName := "object"

	obj.MakeBucketWithLocation(bucketName, "")
	uploadID, err := obj.NewMultipartUpload(bucketName, objectName, nil)
	if err != nil {
		t.Fatal("Unexpected err: ", err)
	}

	if err = cleanupStaleMultipartUpload(bucketName, 0, obj, xl.listMultipartUploadsCleanup); err != nil {
		t.Fatal("Unexpected err: ", err)
	}

	// Check if upload id was already purged.
	if err = obj.AbortMultipartUpload(bucketName, objectName, uploadID); err != nil {
		err = errors.Cause(err)
		if _, ok := err.(InvalidUploadID); !ok {
			t.Fatal("Unexpected err: ", err)
		}
	}
}
func TestUpdateUploadJSON(t *testing.T) {
	// Initialize configuration
	root, err := newTestConfig(globalMinioDefaultRegion)

cmd/xl-v1.go
@@ -159,8 +159,15 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) {
		return xl, nil
	}
-	// Do a quick heal on the buckets themselves for any discrepancies.
-	return xl, quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum)
+	// Perform a quick heal on the buckets and bucket metadata for any discrepancies.
+	if err = quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum); err != nil {
+		return nil, err
+	}
+
+	// Start background process to cleanup old multipart objects in `.minio.sys`.
+	go cleanupStaleMultipartUploads(multipartCleanupInterval, multipartExpiry, xl, xl.listMultipartUploadsCleanup, globalServiceDoneCh)
+
+	return xl, nil
}
// Shutdown function for object storage interface.
