add parallel workers to perform replication in parallel (#10525)

set the concurrency for replication be to runtime.NumCPU()/2
master
Harshavardhana 4 years ago committed by GitHub
parent a5da9120f3
commit 17e17da00d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      cmd/bucket-replication.go

@ -20,6 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"runtime"
"strings" "strings"
"time" "time"
@ -293,29 +294,47 @@ func (r *replicationState) queueReplicaTask(oi ObjectInfo) {
} }
} }
var globalReplicationState *replicationState var (
globalReplicationState *replicationState
// TODO: currently keeping it conservative
// but eventually can be tuned in future,
// take only half the CPUs for replication
// conservatively.
globalReplicationConcurrent = runtime.GOMAXPROCS(0) / 2
)
func newReplicationState() *replicationState { func newReplicationState() *replicationState {
return &replicationState{ return &replicationState{
// TODO: currently keeping it conservative replicaCh: make(chan ObjectInfo, globalReplicationConcurrent*2),
// but eventually can be tuned in future
replicaCh: make(chan ObjectInfo, 100),
} }
} }
func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) { // addWorker creates a new worker to process tasks
if globalReplicationState == nil { func (r *replicationState) addWorker(ctx context.Context, objectAPI ObjectLayer) {
return // Add a new worker.
}
go func() { go func() {
defer close(globalReplicationState.replicaCh)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
close(r.replicaCh)
return return
case oi := <-globalReplicationState.replicaCh: case oi, ok := <-r.replicaCh:
if !ok {
return
}
replicateObject(ctx, oi, objectAPI) replicateObject(ctx, oi, objectAPI)
} }
} }
}() }()
} }
func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
if globalReplicationState == nil {
return
}
// Start with globalReplicationConcurrent.
for i := 0; i < globalReplicationConcurrent; i++ {
globalReplicationState.addWorker(ctx, objectAPI)
}
}

Loading…
Cancel
Save