From 95411228db62ca90907987511c325874b4362b78 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 21 Aug 2020 21:39:54 -0700 Subject: [PATCH] add missing cleanupStaleMultipartUploads (#10325) fixes #10319 --- cmd/erasure-multipart.go | 51 ++++++++++++++++++++++++++++++++++++++++ cmd/erasure-sets.go | 3 +++ 2 files changed, 54 insertions(+) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index d06bed106..6bdede785 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "time" "github.com/minio/minio-go/v7/pkg/set" xhttp "github.com/minio/minio/cmd/http" @@ -69,6 +70,56 @@ func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir stri g.Wait() } +// Clean-up the old multipart uploads. Should be run in a Go routine. +func (er erasureObjects) cleanupStaleMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration) { + ticker := time.NewTicker(cleanupInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + var disk StorageAPI + for _, d := range er.getLoadBalancedDisks() { + if d != nil { + disk = d + break + } + } + if disk == nil { + continue + } + er.cleanupStaleMultipartUploadsOnDisk(ctx, disk, expiry) + } + } +} + +// Remove the old multipart uploads on the given disk. +func (er erasureObjects) cleanupStaleMultipartUploadsOnDisk(ctx context.Context, disk StorageAPI, expiry time.Duration) { + now := time.Now() + shaDirs, err := disk.ListDir(minioMetaMultipartBucket, "", -1) + if err != nil { + return + } + for _, shaDir := range shaDirs { + uploadIDDirs, err := disk.ListDir(minioMetaMultipartBucket, shaDir, -1) + if err != nil { + continue + } + for _, uploadIDDir := range uploadIDDirs { + uploadIDPath := pathJoin(shaDir, uploadIDDir) + fi, err := disk.ReadVersion(minioMetaMultipartBucket, uploadIDPath, "") + if err != nil { + continue + } + if now.Sub(fi.ModTime) > expiry { + er.deleteObject(ctx, minioMetaMultipartBucket, uploadIDPath, fi.Erasure.DataBlocks+1) + } + } + } +} + // ListMultipartUploads - lists all the pending multipart // uploads for a particular object in a bucket. // diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 091c316b0..68e6acf77 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -361,6 +361,9 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto bp: bp, mrfOpCh: make(chan partialOperation, 10000), } + + go s.sets[i].cleanupStaleMultipartUploads(ctx, + GlobalMultipartCleanupInterval, GlobalMultipartExpiry) } // Start the disk monitoring and connect routine.