From d616d8a8570f6641449cb8fd0874581398452f6f Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 16 Sep 2020 16:04:55 -0700 Subject: [PATCH] serialize replication and feed it through task model (#10500) this allows for eventually controlling the concurrency of replication and overall control of throughput --- cmd/bucket-replication.go | 80 ++++++++++++++++++++++++++++++++------- cmd/data-crawler.go | 5 +-- cmd/object-handlers.go | 36 ++++-------------- cmd/server-main.go | 4 +- cmd/web-handlers.go | 11 +----- pkg/event/name.go | 31 +++++++++++++-- 6 files changed, 107 insertions(+), 60 deletions(-) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index a2ac13d49..53af69207 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -170,24 +170,29 @@ func putReplicationOpts(dest replication.Destination, objInfo ObjectInfo) (putOp // replicateObject replicates the specified version of the object to destination bucket // The source object is then updated to reflect the replication status. 
-func replicateObject(ctx context.Context, bucket, object, versionID string, objectAPI ObjectLayer, eventArg *eventArgs, healPending bool) { +func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLayer) { + bucket := objInfo.Bucket + object := objInfo.Name + cfg, err := getReplicationConfig(ctx, bucket) if err != nil { logger.LogIf(ctx, err) return } + tgt := globalBucketTargetSys.GetReplicationTargetClient(ctx, cfg.RoleArn) if tgt == nil { return } + gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{ - VersionID: versionID, + VersionID: objInfo.VersionID, }) if err != nil { return } - objInfo := gr.ObjInfo + objInfo = gr.ObjInfo size, err := objInfo.GetActualSize() if err != nil { logger.LogIf(ctx, err) @@ -200,6 +205,11 @@ func replicateObject(ctx context.Context, bucket, object, versionID string, obje gr.Close() return } + + // if heal encounters a pending replication status, either replication + // has failed due to server shutdown or crawler and PutObject replication are in contention. + healPending := objInfo.ReplicationStatus == replication.Pending + // In the rare event that replication is in pending state either due to // server shut down/crash before replication completed or healing and PutObject // race - do an additional stat to see if the version ID exists @@ -219,22 +229,25 @@ func replicateObject(ctx context.Context, bucket, object, versionID string, obje gr.Close() if err != nil { replicationStatus = replication.Failed - // Notify replication failure event. 
- if eventArg == nil { - eventArg = &eventArgs{ - BucketName: bucket, - Object: objInfo, - Host: "Internal: [Replication]", - } - } - eventArg.EventName = event.OperationReplicationFailed - eventArg.Object.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String() - sendEvent(*eventArg) } objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String() if objInfo.UserTags != "" { objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags } + + // FIXME: add support for missing replication events + // - event.ObjectReplicationNotTracked + // - event.ObjectReplicationMissedThreshold + // - event.ObjectReplicationReplicatedAfterThreshold + if replicationStatus == replication.Failed { + sendEvent(eventArgs{ + EventName: event.ObjectReplicationFailed, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) + } + objInfo.metadataOnly = true // Perform only metadata updates. if _, err = objectAPI.CopyObject(ctx, bucket, object, bucket, object, objInfo, ObjectOptions{ VersionID: objInfo.VersionID, @@ -267,3 +280,42 @@ func filterReplicationStatusMetadata(metadata map[string]string) map[string]stri delKey(xhttp.AmzBucketReplicationStatus) return dst } + +type replicationState struct { + // add future metrics here + replicaCh chan ObjectInfo +} + +func (r *replicationState) queueReplicaTask(oi ObjectInfo) { + select { + case r.replicaCh <- oi: + default: + } +} + +var globalReplicationState *replicationState + +func newReplicationState() *replicationState { + return &replicationState{ + // TODO: currently keeping it conservative + // but eventually can be tuned in future + replicaCh: make(chan ObjectInfo, 100), + } +} + +func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) { + if globalReplicationState == nil { + return + } + go func() { + defer close(globalReplicationState.replicaCh) + for { + select { + case <-ctx.Done(): + return + case oi := <-globalReplicationState.replicaCh: + 
replicateObject(ctx, oi, objectAPI) + } + } + }() +} diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go index 975735cfe..10a6eead3 100644 --- a/cmd/data-crawler.go +++ b/cmd/data-crawler.go @@ -766,9 +766,6 @@ func sleepDuration(d time.Duration, x float64) { func (i *crawlItem) healReplication(ctx context.Context, o ObjectLayer, meta actionMeta) { if meta.oi.ReplicationStatus == replication.Pending || meta.oi.ReplicationStatus == replication.Failed { - // if heal encounters a pending replication status, either replication - // has failed due to server shutdown or crawler and PutObject replication are in contention. - healPending := meta.oi.ReplicationStatus == replication.Pending - replicateObject(ctx, meta.oi.Bucket, meta.oi.Name, meta.oi.VersionID, o, nil, healPending) + globalReplicationState.queueReplicaTask(meta.oi) } } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 8eb76554a..fc0b6f977 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -1251,16 +1251,9 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime) encodedSuccessResponse := encodeResponse(response) if mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String()) { - go replicateObject(GlobalContext, dstBucket, dstObject, objInfo.VersionID, objectAPI, &eventArgs{ - EventName: event.ObjectCreatedCopy, - BucketName: dstBucket, - Object: objInfo, - ReqParams: extractReqParams(r), - RespElements: extractRespElements(w), - UserAgent: r.UserAgent(), - Host: handlers.GetSourceIP(r), - }, false) + globalReplicationState.queueReplicaTask(objInfo) } + setPutObjHeaders(w, objInfo, false) // We must not use the http.Header().Set method here because some (broken) // clients expect the x-amz-copy-source-version-id header key to be literally @@ -1504,7 +1497,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r 
*http.Req return } if mustReplicate(ctx, r, bucket, object, metadata, "") { - metadata[xhttp.AmzBucketReplicationStatus] = string(replication.Pending) + metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() { if s3Err = isPutActionAllowed(getRequestAuthType(r), bucket, object, r, iampolicy.ReplicateObjectAction); s3Err != ErrNone { @@ -1567,15 +1560,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } } if mustReplicate(ctx, r, bucket, object, metadata, "") { - go replicateObject(GlobalContext, bucket, object, objInfo.VersionID, objectAPI, &eventArgs{ - EventName: event.ObjectCreatedPut, - BucketName: bucket, - Object: objInfo, - ReqParams: extractReqParams(r), - RespElements: extractRespElements(w), - UserAgent: r.UserAgent(), - Host: handlers.GetSourceIP(r), - }, false) + globalReplicationState.queueReplicaTask(objInfo) } setPutObjHeaders(w, objInfo, false) @@ -1692,7 +1677,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r return } if mustReplicate(ctx, r, bucket, object, metadata, "") { - metadata[xhttp.AmzBucketReplicationStatus] = string(replication.Pending) + metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } // We need to preserve the encryption headers set in EncryptRequest, // so we do not want to override them, copy them instead. 
@@ -2645,16 +2630,9 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite setPutObjHeaders(w, objInfo, false) if mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()) { - go replicateObject(GlobalContext, bucket, object, objInfo.VersionID, objectAPI, &eventArgs{ - EventName: event.ObjectCreatedCompleteMultipartUpload, - BucketName: bucket, - Object: objInfo, - ReqParams: extractReqParams(r), - RespElements: extractRespElements(w), - UserAgent: r.UserAgent(), - Host: handlers.GetSourceIP(r), - }, false) + globalReplicationState.queueReplicaTask(objInfo) } + // Write success response. writeSuccessResponseXML(w, encodedSuccessResponse) diff --git a/cmd/server-main.go b/cmd/server-main.go index 594b5d88e..225414b1c 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -224,9 +224,10 @@ func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) { } }(txnLk) - // Enable healing to heal drives if possible + // Enable background operations for erasure coding if globalIsErasure { initAutoHeal(ctx, newObject) + initBackgroundReplication(ctx, newObject) } // allocate dynamic timeout once before the loop @@ -444,6 +445,7 @@ func serverMain(ctx *cli.Context) { // New global heal state globalAllHealState = newHealState() globalBackgroundHealState = newHealState() + globalReplicationState = newReplicationState() } // Initialize all sub-systems diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 6854f18dc..a27def091 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -1166,16 +1166,9 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { } } if mustReplicate { - go replicateObject(GlobalContext, bucket, object, objInfo.VersionID, objectAPI, &eventArgs{ - EventName: event.ObjectCreatedPut, - BucketName: bucket, - Object: objInfo, - ReqParams: extractReqParams(r), - RespElements: extractRespElements(w), - UserAgent: r.UserAgent(), - Host: 
handlers.GetSourceIP(r), - }, false) + globalReplicationState.queueReplicaTask(objInfo) } + // Notify object created event. sendEvent(eventArgs{ EventName: event.ObjectCreatedPut, diff --git a/pkg/event/name.go b/pkg/event/name.go index e698b57c6..5a8b07f69 100644 --- a/pkg/event/name.go +++ b/pkg/event/name.go @@ -45,7 +45,11 @@ const ( ObjectRemovedDeleteMarkerCreated BucketCreated BucketRemoved - OperationReplicationFailed + ObjectReplicationAll + ObjectReplicationFailed + ObjectReplicationMissedThreshold + ObjectReplicationReplicatedAfterThreshold + ObjectReplicationNotTracked ) // Expand - returns expanded values of abbreviated event type. @@ -71,6 +75,13 @@ func (name Name) Expand() []Name { ObjectRemovedDelete, ObjectRemovedDeleteMarkerCreated, } + case ObjectReplicationAll: + return []Name{ + ObjectReplicationFailed, + ObjectReplicationNotTracked, + ObjectReplicationMissedThreshold, + ObjectReplicationReplicatedAfterThreshold, + } default: return []Name{name} } @@ -113,8 +124,14 @@ func (name Name) String() string { return "s3:ObjectRemoved:Delete" case ObjectRemovedDeleteMarkerCreated: return "s3:ObjectRemoved:DeleteMarkerCreated" - case OperationReplicationFailed: + case ObjectReplicationFailed: return "s3:Replication:OperationFailedReplication" + case ObjectReplicationNotTracked: + return "s3:Replication:OperationNotTracked" + case ObjectReplicationMissedThreshold: + return "s3:Replication:OperationMissedThreshold" + case ObjectReplicationReplicatedAfterThreshold: + return "s3:Replication:OperationReplicatedAfterThreshold" } return "" @@ -199,8 +216,16 @@ func ParseName(s string) (Name, error) { return ObjectRemovedDelete, nil case "s3:ObjectRemoved:DeleteMarkerCreated": return ObjectRemovedDeleteMarkerCreated, nil + case "s3:Replication:*": + return ObjectReplicationAll, nil case "s3:Replication:OperationFailedReplication": - return OperationReplicationFailed, nil + return ObjectReplicationFailed, nil + case "s3:Replication:OperationMissedThreshold": + 
return ObjectReplicationMissedThreshold, nil + case "s3:Replication:OperationReplicatedAfterThreshold": + return ObjectReplicationReplicatedAfterThreshold, nil + case "s3:Replication:OperationNotTracked": + return ObjectReplicationNotTracked, nil default: return 0, &ErrInvalidEventName{s} }