From fe3aca70c3d2478a956250254da172bb55c5cc34 Mon Sep 17 00:00:00 2001 From: Poorna Krishnamoorthy Date: Tue, 2 Feb 2021 03:15:06 -0800 Subject: [PATCH] Make number of replication workers configurable. (#11379) MINIO_API_REPLICATION_WORKERS env.var and `mc admin config set api` allow number of replication workers to be configurable. Defaults to 100 workers. Co-authored-by: Poorna Krishnamoorthy --- cmd/bucket-replication.go | 15 ++------------- cmd/config/api/api.go | 31 ++++++++++++++++++++++--------- cmd/config/api/help.go | 6 ++++++ cmd/config/errors.go | 6 ++++++ cmd/handler-api.go | 11 ++++++++++- cmd/server-main.go | 4 +++- 6 files changed, 49 insertions(+), 24 deletions(-) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 7125f7d7b..83b5dfd58 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "net/http" - "runtime" "strings" "time" @@ -631,19 +630,9 @@ func (r *replicationState) queueReplicaDeleteTask(doi DeletedObjectVersionInfo) var ( globalReplicationState *replicationState - // TODO: currently keeping it conservative - // but eventually can be tuned in future, - // take only half the CPUs for replication - // conservatively. - globalReplicationConcurrent = runtime.GOMAXPROCS(0) / 2 ) func newReplicationState() *replicationState { - - // fix minimum concurrent replication to 1 for single CPU setup - if globalReplicationConcurrent == 0 { - globalReplicationConcurrent = 1 - } rs := &replicationState{ replicaCh: make(chan ObjectInfo, 10000), replicaDeleteCh: make(chan DeletedObjectVersionInfo, 10000), @@ -684,8 +673,8 @@ func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) { return } - // Start with globalReplicationConcurrent. - for i := 0; i < globalReplicationConcurrent; i++ { + // Start replication workers per count set in api config or MINIO_API_REPLICATION_WORKERS. 
+ for i := 0; i < globalAPIConfig.getReplicationWorkers(); i++ { globalReplicationState.addWorker(ctx, objectAPI) } } diff --git a/cmd/config/api/api.go b/cmd/config/api/api.go index dd37d2aff..9d8d23524 100644 --- a/cmd/config/api/api.go +++ b/cmd/config/api/api.go @@ -29,14 +29,14 @@ import ( // API sub-system constants const ( - apiRequestsMax = "requests_max" - apiRequestsDeadline = "requests_deadline" - apiClusterDeadline = "cluster_deadline" - apiCorsAllowOrigin = "cors_allow_origin" - apiRemoteTransportDeadline = "remote_transport_deadline" - apiListQuorum = "list_quorum" - apiExtendListCacheLife = "extend_list_cache_life" - + apiRequestsMax = "requests_max" + apiRequestsDeadline = "requests_deadline" + apiClusterDeadline = "cluster_deadline" + apiCorsAllowOrigin = "cors_allow_origin" + apiRemoteTransportDeadline = "remote_transport_deadline" + apiListQuorum = "list_quorum" + apiExtendListCacheLife = "extend_list_cache_life" + apiReplicationWorkers = "replication_workers" EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX" EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE" EnvAPIClusterDeadline = "MINIO_API_CLUSTER_DEADLINE" @@ -45,6 +45,7 @@ const ( EnvAPIListQuorum = "MINIO_API_LIST_QUORUM" EnvAPIExtendListCacheLife = "MINIO_API_EXTEND_LIST_CACHE_LIFE" EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS" + EnvAPIReplicationWorkers = "MINIO_API_REPLICATION_WORKERS" ) // Deprecated key and ENVs @@ -84,6 +85,10 @@ var ( Key: apiExtendListCacheLife, Value: "0s", }, + config.KV{ + Key: apiReplicationWorkers, + Value: "100", + }, } ) @@ -96,6 +101,7 @@ type Config struct { RemoteTransportDeadline time.Duration `json:"remote_transport_deadline"` ListQuorum string `json:"list_strict_quorum"` ExtendListLife time.Duration `json:"extend_list_cache_life"` + ReplicationWorkers int `json:"replication_workers"` } // UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON. 
@@ -172,7 +178,13 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { if err != nil { return cfg, err } - + replicationWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationWorkers, kvs.Get(apiReplicationWorkers))) + if err != nil { + return cfg, err + } + if replicationWorkers <= 0 { + return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Minimum number of replication workers should be 1") + } return Config{ RequestsMax: requestsMax, RequestsDeadline: requestsDeadline, @@ -181,5 +193,6 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { RemoteTransportDeadline: remoteTransportDeadline, ListQuorum: listQuorum, ExtendListLife: listLife, + ReplicationWorkers: replicationWorkers, }, nil } diff --git a/cmd/config/api/help.go b/cmd/config/api/help.go index c176b60d1..bb39d27c2 100644 --- a/cmd/config/api/help.go +++ b/cmd/config/api/help.go @@ -45,5 +45,11 @@ var ( Optional: true, Type: "duration", }, + config.HelpKV{ + Key: apiReplicationWorkers, + Description: `set the number of replication workers, defaults to 100`, + Optional: true, + Type: "number", + }, } ) diff --git a/cmd/config/errors.go b/cmd/config/errors.go index 67690a074..45a5957ee 100644 --- a/cmd/config/errors.go +++ b/cmd/config/errors.go @@ -281,4 +281,10 @@ Example 1: "", "Refer to https://docs.min.io/docs/minio-kms-quickstart-guide.html for setting up SSE", ) + + ErrInvalidReplicationWorkersValue = newErrFn( + "Invalid value for replication workers", + "", + "MINIO_API_REPLICATION_WORKERS: should be > 0", + ) ) diff --git a/cmd/handler-api.go b/cmd/handler-api.go index 20b548b0d..366bc9cb0 100644 --- a/cmd/handler-api.go +++ b/cmd/handler-api.go @@ -36,7 +36,8 @@ type apiConfig struct { extendListLife time.Duration corsAllowOrigins []string // total drives per erasure set across pools. 
- totalDriveCount int + totalDriveCount int + replicationWorkers int } func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) { @@ -78,6 +79,7 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) { t.requestsDeadline = cfg.RequestsDeadline t.listQuorum = cfg.GetListQuorum() t.extendListLife = cfg.ExtendListLife + t.replicationWorkers = cfg.ReplicationWorkers } func (t *apiConfig) getListQuorum() int { @@ -152,3 +154,10 @@ func maxClients(f http.HandlerFunc) http.HandlerFunc { } } } + +func (t *apiConfig) getReplicationWorkers() int { + t.mu.RLock() + defer t.mu.RUnlock() + + return t.replicationWorkers +} diff --git a/cmd/server-main.go b/cmd/server-main.go index 655c95cb2..64542b7a5 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -492,7 +492,6 @@ func serverMain(ctx *cli.Context) { // Enable background operations for erasure coding if globalIsErasure { initAutoHeal(GlobalContext, newObject) - initBackgroundReplication(GlobalContext, newObject) initBackgroundTransition(GlobalContext, newObject) initBackgroundExpiry(GlobalContext, newObject) } @@ -513,6 +512,9 @@ func serverMain(ctx *cli.Context) { } } + if globalIsErasure { // to be done after config init + initBackgroundReplication(GlobalContext, newObject) + } if globalCacheConfig.Enabled { // initialize the new disk cache objects. var cacheAPI CacheObjectLayer