diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 53af69207..509ae62b5 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "runtime" "strings" "time" @@ -293,29 +294,47 @@ func (r *replicationState) queueReplicaTask(oi ObjectInfo) { } } -var globalReplicationState *replicationState +var ( + globalReplicationState *replicationState + // TODO: currently keeping it conservative + // but eventually can be tuned in future, + // take only half the CPUs for replication + // conservatively. + globalReplicationConcurrent = runtime.GOMAXPROCS(0) / 2 +) func newReplicationState() *replicationState { return &replicationState{ - // TODO: currently keeping it conservative - // but eventually can be tuned in future - replicaCh: make(chan ObjectInfo, 100), + replicaCh: make(chan ObjectInfo, globalReplicationConcurrent*2), } } -func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) { - if globalReplicationState == nil { - return - } +// addWorker creates a new worker to process tasks +func (r *replicationState) addWorker(ctx context.Context, objectAPI ObjectLayer) { + // Add a new worker. go func() { - defer close(globalReplicationState.replicaCh) for { select { case <-ctx.Done(): + close(r.replicaCh) return - case oi := <-globalReplicationState.replicaCh: + case oi, ok := <-r.replicaCh: + if !ok { + return + } replicateObject(ctx, oi, objectAPI) } } }() } + +func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) { + if globalReplicationState == nil { + return + } + + // Start with globalReplicationConcurrent. + for i := 0; i < globalReplicationConcurrent; i++ { + globalReplicationState.addWorker(ctx, objectAPI) + } +}