From 17e17da00d3cb4fb0506aa9b652e56b54492d9cf Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 21 Sep 2020 13:43:29 -0700 Subject: [PATCH] add parallel workers to perform replication in parallel (#10525) set the concurrency for replication be to runtime.NumCPU()/2 --- cmd/bucket-replication.go | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 53af69207..509ae62b5 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "runtime" "strings" "time" @@ -293,29 +294,47 @@ func (r *replicationState) queueReplicaTask(oi ObjectInfo) { } } -var globalReplicationState *replicationState +var ( + globalReplicationState *replicationState + // TODO: currently keeping it conservative + // but eventually can be tuned in future, + // take only half the CPUs for replication + // conservatively. + globalReplicationConcurrent = runtime.GOMAXPROCS(0) / 2 +) func newReplicationState() *replicationState { return &replicationState{ - // TODO: currently keeping it conservative - // but eventually can be tuned in future - replicaCh: make(chan ObjectInfo, 100), + replicaCh: make(chan ObjectInfo, globalReplicationConcurrent*2), } } -func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) { - if globalReplicationState == nil { - return - } +// addWorker creates a new worker to process tasks +func (r *replicationState) addWorker(ctx context.Context, objectAPI ObjectLayer) { + // Add a new worker. go func() { - defer close(globalReplicationState.replicaCh) for { select { case <-ctx.Done(): + close(r.replicaCh) return - case oi := <-globalReplicationState.replicaCh: + case oi, ok := <-r.replicaCh: + if !ok { + return + } replicateObject(ctx, oi, objectAPI) } } }() } + +func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) { + if globalReplicationState == nil { + return + } + + // Start with globalReplicationConcurrent. + for i := 0; i < globalReplicationConcurrent; i++ { + globalReplicationState.addWorker(ctx, objectAPI) + } +}