From 3544e5ad0165337a4c780dfad2cd41ee449ddbfb Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 10 Aug 2017 14:11:57 -0700 Subject: [PATCH] fs: Fix Shutdown() behavior and handle tests properly. (#4796) Fixes #4795 --- cmd/fs-v1-multipart.go | 57 +++++++++++++----------- cmd/fs-v1-multipart_test.go | 88 ++++++++++++++++++++++++++++++++++--- cmd/fs-v1-rwpool.go | 2 +- cmd/fs-v1.go | 6 ++- cmd/fs-v1_test.go | 11 ++--- 5 files changed, 126 insertions(+), 38 deletions(-) diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index f1869409d..2cbf7409c 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -32,8 +32,10 @@ import ( ) const ( - fsMultipartExpiry = time.Hour * 24 * 14 - fsMultipartCleanupPeriod = time.Hour * 24 + // Expiry duration after which the multipart uploads are deemed stale. + fsMultipartExpiry = time.Hour * 24 * 14 // 2 weeks. + // Cleanup interval when the stale multipart cleanup is initiated. + fsMultipartCleanupInterval = time.Hour * 24 // 24 hrs. ) // Returns if the prefix is a multipart upload. @@ -937,33 +939,32 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error return nil } -// Remove multipart uploads left unattended in a given bucket older than `fsMultipartExpiry` -func (fs fsObjects) cleanupStaleMultipartUpload(bucket string) (err error) { +// Removes multipart uploads if any older than `expiry` duration in a given bucket. +func (fs fsObjects) cleanupStaleMultipartUpload(bucket string, expiry time.Duration) (err error) { var lmi ListMultipartsInfo - + var st os.FileInfo for { // List multipart uploads in a bucket 1000 at a time prefix := "" lmi, err = fs.listMultipartUploadsHelper(bucket, prefix, lmi.KeyMarker, lmi.UploadIDMarker, slashSeparator, 1000) if err != nil { - errorIf(err, fmt.Sprintf("Failed to list uploads of %s for cleaning up of multipart uploads older than %d weeks", bucket, fsMultipartExpiry)) + errorIf(err, "Unable to list uploads") return err } - // Remove uploads (and its parts) older than 2 weeks + // Remove uploads (and its parts) older than expiry duration. for _, upload := range lmi.Uploads { uploadIDPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, upload.Object, upload.UploadID) - st, err := fsStatDir(uploadIDPath) - if err != nil { - errorIf(err, "Failed to stat uploads directory", uploadIDPath) - return err + if st, err = fsStatDir(uploadIDPath); err != nil { + errorIf(err, "Failed to lookup uploads directory path %s", uploadIDPath) + continue } - if time.Since(st.ModTime()) > fsMultipartExpiry { + if time.Since(st.ModTime()) > expiry { fs.AbortMultipartUpload(bucket, upload.Object, upload.UploadID) } } - // No more incomplete uploads remain + // No more incomplete uploads remain, break and return. if !lmi.IsTruncated { break } @@ -972,20 +973,26 @@ func (fs fsObjects) cleanupStaleMultipartUpload(bucket string) (err error) { return nil } -// Remove multipart uploads left unattended for more than `fsMultipartExpiry` seconds. -func (fs fsObjects) cleanupStaleMultipartUploads() { +// Removes multipart uploads if any older than `expiry` duration +// on all buckets for every `cleanupInterval`, this function is +// blocking and should be run in a go-routine. +func (fs fsObjects) cleanupStaleMultipartUploads(cleanupInterval, expiry time.Duration, doneCh chan struct{}) { + timer := time.NewTimer(cleanupInterval) for { - bucketInfos, err := fs.ListBuckets() - if err != nil { - errorIf(err, fmt.Sprintf("Failed to list buckets for cleaning up of multipart uploads older than %d weeks", fsMultipartExpiry)) + select { + case <-doneCh: + // Stop the timer. + timer.Stop() return + case <-timer.C: + bucketInfos, err := fs.ListBuckets() + if err != nil { + errorIf(err, "Unable to list buckets") + continue + } + for _, bucketInfo := range bucketInfos { + fs.cleanupStaleMultipartUpload(bucketInfo.Name, expiry) + } } - - for _, bucketInfo := range bucketInfos { - fs.cleanupStaleMultipartUpload(bucketInfo.Name) - } - - // Schedule for the next multipart backend cleanup - time.Sleep(fsMultipartCleanupPeriod) } } diff --git a/cmd/fs-v1-multipart_test.go b/cmd/fs-v1-multipart_test.go index c69807440..6bf6826b6 100644 --- a/cmd/fs-v1-multipart_test.go +++ b/cmd/fs-v1-multipart_test.go @@ -20,8 +20,85 @@ import ( "bytes" "path/filepath" "testing" + "time" ) +func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) { + // Prepare for tests + disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) + defer removeAll(disk) + + obj := initFSObjects(disk, t) + fs := obj.(*fsObjects) + + // Close the go-routine, we are going to + // manually start it and test in this test case. + globalServiceDoneCh <- struct{}{} + + bucketName := "bucket" + objectName := "object" + + obj.MakeBucketWithLocation(bucketName, "") + uploadID, err := obj.NewMultipartUpload(bucketName, objectName, nil) + if err != nil { + t.Fatal("Unexpected err: ", err) + } + + go fs.cleanupStaleMultipartUploads(20*time.Millisecond, 0, globalServiceDoneCh) + + // Wait for 40ms such that - we have given enough time for + // cleanup routine to kick in. + time.Sleep(40 * time.Millisecond) + + // Close the routine we do not need it anymore. + globalServiceDoneCh <- struct{}{} + + // Check if upload id was already purged. + if err = obj.AbortMultipartUpload(bucketName, objectName, uploadID); err != nil { + err = errorCause(err) + if _, ok := err.(InvalidUploadID); !ok { + t.Fatal("Unexpected err: ", err) + } + } +} + +// Tests cleanup of stale upload ids. +func TestFSCleanupMultipartUpload(t *testing.T) { + // Prepare for tests + disk := filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) + defer removeAll(disk) + + obj := initFSObjects(disk, t) + fs := obj.(*fsObjects) + + // Close the multipart cleanup go-routine. + // In this test we are going to manually call + // the function which actually cleans the stale + // uploads. + globalServiceDoneCh <- struct{}{} + + bucketName := "bucket" + objectName := "object" + + obj.MakeBucketWithLocation(bucketName, "") + uploadID, err := obj.NewMultipartUpload(bucketName, objectName, nil) + if err != nil { + t.Fatal("Unexpected err: ", err) + } + + if err = fs.cleanupStaleMultipartUpload(bucketName, 0); err != nil { + t.Fatal("Unexpected err: ", err) + } + + // Check if upload id was already purged. + if err = obj.AbortMultipartUpload(bucketName, objectName, uploadID); err != nil { + err = errorCause(err) + if _, ok := err.(InvalidUploadID); !ok { + t.Fatal("Unexpected err: ", err) + } + } +} + // TestFSWriteUploadJSON - tests for writeUploadJSON for FS func TestFSWriteUploadJSON(t *testing.T) { // Prepare for tests @@ -29,6 +106,7 @@ func TestFSWriteUploadJSON(t *testing.T) { defer removeAll(disk) obj := initFSObjects(disk, t) + fs := obj.(*fsObjects) bucketName := "bucket" objectName := "object" @@ -40,7 +118,7 @@ func TestFSWriteUploadJSON(t *testing.T) { } // newMultipartUpload will fail. - removeAll(disk) // Remove disk. + fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) _, err = obj.NewMultipartUpload(bucketName, objectName, nil) if err != nil { if _, ok := errorCause(err).(BucketNotFound); !ok { @@ -65,7 +143,7 @@ func TestNewMultipartUploadFaultyDisk(t *testing.T) { } // Test with disk removed. - removeAll(disk) // remove disk. + fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) if _, err := fs.NewMultipartUpload(bucketName, objectName, map[string]string{"X-Amz-Meta-xid": "3f"}); err != nil { if !isSameType(errorCause(err), BucketNotFound{}) { t.Fatal("Unexpected error ", err) @@ -103,7 +181,7 @@ func TestPutObjectPartFaultyDisk(t *testing.T) { md5Hex := getMD5Hash(data) sha256sum := "" - removeAll(disk) // Disk not found. + fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) _, err = fs.PutObjectPart(bucketName, objectName, uploadID, 1, dataLen, bytes.NewReader(data), md5Hex, sha256sum) if !isSameType(errorCause(err), BucketNotFound{}) { t.Fatal("Unexpected error ", err) @@ -140,7 +218,7 @@ func TestCompleteMultipartUploadFaultyDisk(t *testing.T) { parts := []completePart{{PartNumber: 1, ETag: md5Hex}} - removeAll(disk) // Disk not found. + fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) if _, err := fs.CompleteMultipartUpload(bucketName, objectName, uploadID, parts); err != nil { if !isSameType(errorCause(err), BucketNotFound{}) { t.Fatal("Unexpected error ", err) @@ -177,7 +255,7 @@ func TestListMultipartUploadsFaultyDisk(t *testing.T) { t.Fatal("Unexpected error ", err) } - removeAll(disk) // Disk not found. + fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) if _, err := fs.ListMultipartUploads(bucketName, objectName, "", "", "", 1000); err != nil { if !isSameType(errorCause(err), BucketNotFound{}) { t.Fatal("Unexpected error ", err) diff --git a/cmd/fs-v1-rwpool.go b/cmd/fs-v1-rwpool.go index 74a0f38b9..736683297 100644 --- a/cmd/fs-v1-rwpool.go +++ b/cmd/fs-v1-rwpool.go @@ -182,7 +182,7 @@ func (fsi *fsIOPool) Create(path string) (wlk *lock.LockedFile, err error) { } // Success. - return wlk, err + return wlk, nil } // Close implements closing the path referenced by the reader in such diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 96ce9f5f5..2694b4eba 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -157,8 +157,8 @@ func newFSObjectLayer(fsPath string) (ObjectLayer, error) { return nil, fmt.Errorf("Unable to initialize event notification. %s", err) } - // Start background process to cleanup old files in minio.sys.tmp - go fs.cleanupStaleMultipartUploads() + // Start background process to cleanup old files in `.minio.sys`. + go fs.cleanupStaleMultipartUploads(fsMultipartCleanupInterval, fsMultipartExpiry, globalServiceDoneCh) // Return successfully initialized object layer. return fs, nil @@ -166,6 +166,8 @@ func newFSObjectLayer(fsPath string) (ObjectLayer, error) { // Should be called when process shuts down. func (fs fsObjects) Shutdown() error { + fs.fsFormatRlk.Close() + // Cleanup and delete tmp uuid. return fsRemoveAll(pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID)) } diff --git a/cmd/fs-v1_test.go b/cmd/fs-v1_test.go index f60ff97d0..3166a014e 100644 --- a/cmd/fs-v1_test.go +++ b/cmd/fs-v1_test.go @@ -113,7 +113,8 @@ func TestFSGetBucketInfo(t *testing.T) { } // Check for buckets and should get disk not found. - removeAll(disk) + fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) + _, err = fs.GetBucketInfo(bucketName) if !isSameType(errorCause(err), BucketNotFound{}) { t.Fatal("BucketNotFound error not returned") @@ -225,7 +226,7 @@ func TestFSDeleteObject(t *testing.T) { } // Delete object should err disk not found. - removeAll(disk) + fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) if err := fs.DeleteObject(bucketName, objectName); err != nil { if !isSameType(errorCause(err), BucketNotFound{}) { t.Fatal("Unexpected error: ", err) @@ -264,8 +265,8 @@ func TestFSDeleteBucket(t *testing.T) { obj.MakeBucketWithLocation(bucketName, "") - // Delete bucker should get error disk not found. - removeAll(disk) + // Delete bucket should get error disk not found. + fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) if err = fs.DeleteBucket(bucketName); err != nil { if !isSameType(errorCause(err), BucketNotFound{}) { t.Fatal("Unexpected error: ", err) @@ -307,7 +308,7 @@ func TestFSListBuckets(t *testing.T) { } // Test ListBuckets with disk not found. - removeAll(disk) + fs.fsPath = filepath.Join(globalTestTmpDir, "minio-"+nextSuffix()) if _, err := fs.ListBuckets(); err != nil { if errorCause(err) != errDiskNotFound {