serialize replication and feed it through task model (#10500)

this allows for eventually controlling the concurrency
of replication and overally control of throughput
master
Harshavardhana 4 years ago committed by GitHub
parent 24cab7f9df
commit d616d8a857
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 80
      cmd/bucket-replication.go
  2. 5
      cmd/data-crawler.go
  3. 36
      cmd/object-handlers.go
  4. 4
      cmd/server-main.go
  5. 11
      cmd/web-handlers.go
  6. 31
      pkg/event/name.go

@ -170,24 +170,29 @@ func putReplicationOpts(dest replication.Destination, objInfo ObjectInfo) (putOp
// replicateObject replicates the specified version of the object to destination bucket
// The source object is then updated to reflect the replication status.
func replicateObject(ctx context.Context, bucket, object, versionID string, objectAPI ObjectLayer, eventArg *eventArgs, healPending bool) {
func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLayer) {
bucket := objInfo.Bucket
object := objInfo.Name
cfg, err := getReplicationConfig(ctx, bucket)
if err != nil {
logger.LogIf(ctx, err)
return
}
tgt := globalBucketTargetSys.GetReplicationTargetClient(ctx, cfg.RoleArn)
if tgt == nil {
return
}
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
VersionID: versionID,
VersionID: objInfo.VersionID,
})
if err != nil {
return
}
objInfo := gr.ObjInfo
objInfo = gr.ObjInfo
size, err := objInfo.GetActualSize()
if err != nil {
logger.LogIf(ctx, err)
@ -200,6 +205,11 @@ func replicateObject(ctx context.Context, bucket, object, versionID string, obje
gr.Close()
return
}
// if heal encounters a pending replication status, either replication
// has failed due to server shutdown or crawler and PutObject replication are in contention.
healPending := objInfo.ReplicationStatus == replication.Pending
// In the rare event that replication is in pending state either due to
// server shut down/crash before replication completed or healing and PutObject
// race - do an additional stat to see if the version ID exists
@ -219,22 +229,25 @@ func replicateObject(ctx context.Context, bucket, object, versionID string, obje
gr.Close()
if err != nil {
replicationStatus = replication.Failed
// Notify replication failure event.
if eventArg == nil {
eventArg = &eventArgs{
BucketName: bucket,
Object: objInfo,
Host: "Internal: [Replication]",
}
}
eventArg.EventName = event.OperationReplicationFailed
eventArg.Object.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()
sendEvent(*eventArg)
}
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()
if objInfo.UserTags != "" {
objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
}
// FIXME: add support for missing replication events
// - event.ObjectReplicationNotTracked
// - event.ObjectReplicationMissedThreshold
// - event.ObjectReplicationReplicatedAfterThreshold
if replicationStatus == replication.Failed {
sendEvent(eventArgs{
EventName: event.ObjectReplicationFailed,
BucketName: bucket,
Object: objInfo,
Host: "Internal: [Replication]",
})
}
objInfo.metadataOnly = true // Perform only metadata updates.
if _, err = objectAPI.CopyObject(ctx, bucket, object, bucket, object, objInfo, ObjectOptions{
VersionID: objInfo.VersionID,
@ -267,3 +280,42 @@ func filterReplicationStatusMetadata(metadata map[string]string) map[string]stri
delKey(xhttp.AmzBucketReplicationStatus)
return dst
}
type replicationState struct {
// add future metrics here
replicaCh chan ObjectInfo
}
func (r *replicationState) queueReplicaTask(oi ObjectInfo) {
select {
case r.replicaCh <- oi:
default:
}
}
var globalReplicationState *replicationState
func newReplicationState() *replicationState {
return &replicationState{
// TODO: currently keeping it conservative
// but eventually can be tuned in future
replicaCh: make(chan ObjectInfo, 100),
}
}
func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
if globalReplicationState == nil {
return
}
go func() {
defer close(globalReplicationState.replicaCh)
for {
select {
case <-ctx.Done():
return
case oi := <-globalReplicationState.replicaCh:
replicateObject(ctx, oi, objectAPI)
}
}
}()
}

@ -766,9 +766,6 @@ func sleepDuration(d time.Duration, x float64) {
func (i *crawlItem) healReplication(ctx context.Context, o ObjectLayer, meta actionMeta) {
if meta.oi.ReplicationStatus == replication.Pending ||
meta.oi.ReplicationStatus == replication.Failed {
// if heal encounters a pending replication status, either replication
// has failed due to server shutdown or crawler and PutObject replication are in contention.
healPending := meta.oi.ReplicationStatus == replication.Pending
replicateObject(ctx, meta.oi.Bucket, meta.oi.Name, meta.oi.VersionID, o, nil, healPending)
globalReplicationState.queueReplicaTask(meta.oi)
}
}

@ -1251,16 +1251,9 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime)
encodedSuccessResponse := encodeResponse(response)
if mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String()) {
go replicateObject(GlobalContext, dstBucket, dstObject, objInfo.VersionID, objectAPI, &eventArgs{
EventName: event.ObjectCreatedCopy,
BucketName: dstBucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
}, false)
globalReplicationState.queueReplicaTask(objInfo)
}
setPutObjHeaders(w, objInfo, false)
// We must not use the http.Header().Set method here because some (broken)
// clients expect the x-amz-copy-source-version-id header key to be literally
@ -1504,7 +1497,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
return
}
if mustReplicate(ctx, r, bucket, object, metadata, "") {
metadata[xhttp.AmzBucketReplicationStatus] = string(replication.Pending)
metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
}
if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() {
if s3Err = isPutActionAllowed(getRequestAuthType(r), bucket, object, r, iampolicy.ReplicateObjectAction); s3Err != ErrNone {
@ -1567,15 +1560,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}
}
if mustReplicate(ctx, r, bucket, object, metadata, "") {
go replicateObject(GlobalContext, bucket, object, objInfo.VersionID, objectAPI, &eventArgs{
EventName: event.ObjectCreatedPut,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
}, false)
globalReplicationState.queueReplicaTask(objInfo)
}
setPutObjHeaders(w, objInfo, false)
@ -1692,7 +1677,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
return
}
if mustReplicate(ctx, r, bucket, object, metadata, "") {
metadata[xhttp.AmzBucketReplicationStatus] = string(replication.Pending)
metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
}
// We need to preserve the encryption headers set in EncryptRequest,
// so we do not want to override them, copy them instead.
@ -2645,16 +2630,9 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
setPutObjHeaders(w, objInfo, false)
if mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()) {
go replicateObject(GlobalContext, bucket, object, objInfo.VersionID, objectAPI, &eventArgs{
EventName: event.ObjectCreatedCompleteMultipartUpload,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
}, false)
globalReplicationState.queueReplicaTask(objInfo)
}
// Write success response.
writeSuccessResponseXML(w, encodedSuccessResponse)

@ -224,9 +224,10 @@ func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) {
}
}(txnLk)
// Enable healing to heal drives if possible
// Enable background operations for erasure coding
if globalIsErasure {
initAutoHeal(ctx, newObject)
initBackgroundReplication(ctx, newObject)
}
// allocate dynamic timeout once before the loop
@ -444,6 +445,7 @@ func serverMain(ctx *cli.Context) {
// New global heal state
globalAllHealState = newHealState()
globalBackgroundHealState = newHealState()
globalReplicationState = newReplicationState()
}
// Initialize all sub-systems

@ -1166,16 +1166,9 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
}
}
if mustReplicate {
go replicateObject(GlobalContext, bucket, object, objInfo.VersionID, objectAPI, &eventArgs{
EventName: event.ObjectCreatedPut,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
}, false)
globalReplicationState.queueReplicaTask(objInfo)
}
// Notify object created event.
sendEvent(eventArgs{
EventName: event.ObjectCreatedPut,

@ -45,7 +45,11 @@ const (
ObjectRemovedDeleteMarkerCreated
BucketCreated
BucketRemoved
OperationReplicationFailed
ObjectReplicationAll
ObjectReplicationFailed
ObjectReplicationMissedThreshold
ObjectReplicationReplicatedAfterThreshold
ObjectReplicationNotTracked
)
// Expand - returns expanded values of abbreviated event type.
@ -71,6 +75,13 @@ func (name Name) Expand() []Name {
ObjectRemovedDelete,
ObjectRemovedDeleteMarkerCreated,
}
case ObjectReplicationAll:
return []Name{
ObjectReplicationFailed,
ObjectReplicationNotTracked,
ObjectReplicationMissedThreshold,
ObjectReplicationReplicatedAfterThreshold,
}
default:
return []Name{name}
}
@ -113,8 +124,14 @@ func (name Name) String() string {
return "s3:ObjectRemoved:Delete"
case ObjectRemovedDeleteMarkerCreated:
return "s3:ObjectRemoved:DeleteMarkerCreated"
case OperationReplicationFailed:
case ObjectReplicationFailed:
return "s3:Replication:OperationFailedReplication"
case ObjectReplicationNotTracked:
return "s3:Replication:OperationNotTracked"
case ObjectReplicationMissedThreshold:
return "s3:Replication:OperationMissedThreshold"
case ObjectReplicationReplicatedAfterThreshold:
return "s3:Replication:OperationReplicatedAfterThreshold"
}
return ""
@ -199,8 +216,16 @@ func ParseName(s string) (Name, error) {
return ObjectRemovedDelete, nil
case "s3:ObjectRemoved:DeleteMarkerCreated":
return ObjectRemovedDeleteMarkerCreated, nil
case "s3:Replication:*":
return ObjectReplicationAll, nil
case "s3:Replication:OperationFailedReplication":
return OperationReplicationFailed, nil
return ObjectReplicationFailed, nil
case "s3:Replication:OperationMissedThreshold":
return ObjectReplicationMissedThreshold, nil
case "s3:Replication:OperationReplicatedAfterThreshold":
return ObjectReplicationReplicatedAfterThreshold, nil
case "s3:Replication:OperationNotTracked":
return ObjectReplicationNotTracked, nil
default:
return 0, &ErrInvalidEventName{s}
}

Loading…
Cancel
Save