From 7824e19d20c71b7e4f11d45ef106a3e12d0a209c Mon Sep 17 00:00:00 2001 From: Poorna Krishnamoorthy Date: Mon, 11 Jan 2021 22:36:51 -0800 Subject: [PATCH] Allow synchronous replication if enabled. (#11165) Synchronous replication can be enabled by setting the --sync flag while adding a remote replication target. This PR also adds proxying on GET/HEAD to another node in a active-active replication setup in the event of a 404 on the current node. --- cmd/bucket-handlers.go | 14 +- cmd/bucket-lifecycle.go | 4 +- cmd/bucket-replication.go | 210 ++++++++++++++++++++++++--- cmd/bucket-targets.go | 58 +++++++- cmd/erasure-object.go | 17 +++ cmd/http/headers.go | 2 + cmd/object-api-interface.go | 2 + cmd/object-api-options.go | 1 + cmd/object-handlers.go | 42 +++--- cmd/web-handlers.go | 23 +-- docs/bucket/replication/README.md | 8 + go.mod | 2 +- go.sum | 12 +- pkg/madmin/remote-target-commands.go | 52 ++++--- 14 files changed, 347 insertions(+), 100 deletions(-) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index f7c558b19..d116a240c 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -405,9 +405,9 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, getObjectInfoFn = api.CacheAPI().GetObjectInfo } var ( - hasLockEnabled, hasLifecycleConfig bool - goi ObjectInfo - gerr error + hasLockEnabled, hasLifecycleConfig, replicateSync bool + goi ObjectInfo + gerr error ) replicateDeletes := hasReplicationRules(ctx, bucket, deleteObjects.Objects) if rcfg, _ := globalBucketObjectLockSys.Get(bucket); rcfg.LockEnabled { @@ -455,10 +455,11 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, object.PurgeTransitioned = goi.TransitionStatus } if replicateDeletes { - delMarker, replicate := checkReplicateDelete(ctx, bucket, ObjectToDelete{ + delMarker, replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ ObjectName: object.ObjectName, VersionID: object.VersionID, }, goi, gerr) + replicateSync = repsync if replicate { if object.VersionID != "" { object.VersionPurgeStatus = Pending @@ -549,10 +550,11 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, for _, dobj := range deletedObjects { if replicateDeletes { if dobj.DeleteMarkerReplicationStatus == string(replication.Pending) || dobj.VersionPurgeStatus == Pending { - globalReplicationState.queueReplicaDeleteTask(DeletedObjectVersionInfo{ + dv := DeletedObjectVersionInfo{ DeletedObject: dobj, Bucket: bucket, - }) + } + scheduleReplicationDelete(ctx, dv, objectAPI, replicateSync) } } diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 93b1b9c08..0e11deef8 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -346,7 +346,7 @@ func transitionObject(ctx context.Context, objectAPI ObjectLayer, objInfo Object } putOpts := putTransitionOpts(oi) - if _, err = tgt.PutObject(ctx, arn.Bucket, oi.Name, gr, oi.Size, "", "", putOpts); err != nil { + if _, err = tgt.PutObject(ctx, arn.Bucket, oi.Name, gr, oi.Size, putOpts); err != nil { gr.Close() return err } @@ -421,7 +421,7 @@ func getTransitionedObjectReader(ctx context.Context, bucket, object string, rs } } - reader, _, _, err := tgt.GetObject(ctx, arn.Bucket, object, gopts) + reader, err := tgt.GetObject(ctx, arn.Bucket, object, gopts) if err != nil { return nil, err } diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 719b762b8..4f12603e6 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -88,35 +88,37 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re return false, BucketRemoteTargetNotFound{Bucket: bucket} } -func mustReplicateWeb(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string, permErr APIErrorCode) bool { +func mustReplicateWeb(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string, permErr APIErrorCode) (replicate bool, sync bool) { if permErr != ErrNone { - return false + return } return mustReplicater(ctx, bucket, object, meta, replStatus) } -// mustReplicate returns true if object meets replication criteria. -func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) bool { +// mustReplicate returns 2 booleans - true if object meets replication criteria and true if replication is to be done in +// a synchronous manner. +func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) (replicate bool, sync bool) { if s3Err := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, "", r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone { - return false + return } return mustReplicater(ctx, bucket, object, meta, replStatus) } -// mustReplicater returns true if object meets replication criteria. -func mustReplicater(ctx context.Context, bucket, object string, meta map[string]string, replStatus string) bool { +// mustReplicater returns 2 booleans - true if object meets replication criteria and true if replication is to be done in +// a synchronous manner. +func mustReplicater(ctx context.Context, bucket, object string, meta map[string]string, replStatus string) (replicate bool, sync bool) { if globalIsGateway { - return false + return replicate, sync } if rs, ok := meta[xhttp.AmzBucketReplicationStatus]; ok { replStatus = rs } if replication.StatusType(replStatus) == replication.Replica { - return false + return replicate, sync } cfg, err := getReplicationConfig(ctx, bucket) if err != nil { - return false + return replicate, sync } opts := replication.ObjectOpts{ Name: object, @@ -126,7 +128,20 @@ func mustReplicater(ctx context.Context, bucket, object string, meta map[string] if ok { opts.UserTags = tagStr } - return cfg.Replicate(opts) + tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn) + if tgt == nil || tgt.isOffline() { + return cfg.Replicate(opts), false + } + return cfg.Replicate(opts), tgt.replicateSync +} + +// Standard headers that needs to be extracted from User metadata. +var standardHeaders = []string{ + "content-type", + "content-encoding", + xhttp.AmzStorageClass, + xhttp.AmzObjectTagging, + xhttp.AmzBucketReplicationStatus, } // returns true if any of the objects being deleted qualifies for replication. @@ -143,11 +158,22 @@ func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToD return false } +// isStandardHeader returns true if header is a supported header and not a custom header +func isStandardHeader(headerKey string) bool { + key := strings.ToLower(headerKey) + for _, header := range standardHeaders { + if strings.ToLower(header) == key { + return true + } + } + return false +} + // returns whether object version is a deletemarker and if object qualifies for replication -func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (dm, replicate bool) { +func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (dm, replicate, sync bool) { rcfg, err := getReplicationConfig(ctx, bucket) if err != nil || rcfg == nil { - return false, false + return false, false, sync } // when incoming delete is removal of a delete marker( a.k.a versioned delete), // GetObjectInfo returns extra information even though it returns errFileNotFound @@ -158,9 +184,13 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet validReplStatus = true } if oi.DeleteMarker && validReplStatus { - return oi.DeleteMarker, true + return oi.DeleteMarker, true, sync } - return oi.DeleteMarker, false + return oi.DeleteMarker, false, sync + } + tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn) + if tgt == nil || tgt.isOffline() { + return oi.DeleteMarker, false, false } opts := replication.ObjectOpts{ Name: dobj.ObjectName, @@ -169,7 +199,7 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet DeleteMarker: oi.DeleteMarker, VersionID: dobj.VersionID, } - return oi.DeleteMarker, rcfg.Replicate(opts) + return oi.DeleteMarker, rcfg.Replicate(opts), tgt.replicateSync } // replicate deletes to the designated replication target if replication configuration @@ -296,10 +326,10 @@ func getCopyObjMetadata(oi ObjectInfo, dest replication.Destination) map[string] func putReplicationOpts(ctx context.Context, dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) { meta := make(map[string]string) for k, v := range objInfo.UserDefined { - if k == xhttp.AmzBucketReplicationStatus { + if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) { continue } - if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) { + if isStandardHeader(k) { continue } meta[k] = v @@ -413,6 +443,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa VersionID: objInfo.VersionID, }) if err != nil { + logger.LogIf(ctx, err) return } objInfo = gr.ObjInfo @@ -440,14 +471,14 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa return } } - replicationStatus := replication.Complete if rtype != replicateAll { gr.Close() // replicate metadata for object tagging/copy with metadata replacement dstOpts := miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{SourceVersionID: objInfo.VersionID}} - _, err = tgt.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts) + c := &miniogo.Core{Client: tgt.Client} + _, err = c.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts) if err != nil { replicationStatus = replication.Failed } @@ -474,7 +505,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa // r takes over closing gr. r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit) - _, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts) + _, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, putOpts) if err != nil { replicationStatus = replication.Failed } @@ -623,3 +654,140 @@ func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) { globalReplicationState.addWorker(ctx, objectAPI) } } + +// get Reader from replication target if active-active replication is in place and +// this node returns a 404 +func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, proxy bool) { + tgt, oi, proxy, err := proxyHeadToRepTarget(ctx, bucket, object, opts) + if !proxy || err != nil { + return nil, false + } + fn, off, length, err := NewGetObjectReader(rs, oi, opts) + if err != nil { + return nil, false + } + gopts := miniogo.GetObjectOptions{ + VersionID: opts.VersionID, + ServerSideEncryption: opts.ServerSideEncryption, + Internal: miniogo.AdvancedGetOptions{ + ReplicationProxyRequest: true, + }, + } + // get correct offsets for encrypted object + if off >= 0 && length >= 0 { + if err := gopts.SetRange(off, off+length-1); err != nil { + return nil, false + } + } + c := miniogo.Core{Client: tgt.Client} + + obj, _, _, err := c.GetObject(ctx, bucket, object, gopts) + if err != nil { + return nil, false + } + closeReader := func() { obj.Close() } + + reader, err := fn(obj, h, opts.CheckPrecondFn, closeReader) + if err != nil { + return nil, false + } + return reader, true +} + +// isProxyable returns true if replication config found for this bucket +func isProxyable(ctx context.Context, bucket string) bool { + cfg, err := getReplicationConfig(ctx, bucket) + if err != nil { + return false + } + dest := cfg.GetDestination() + return dest.Bucket == bucket +} +func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts ObjectOptions) (tgt *TargetClient, oi ObjectInfo, proxy bool, err error) { + // this option is set when active-active replication is in place between site A -> B, + // and site B does not have the object yet. + if opts.ProxyRequest { // true only when site B sets MinIOSourceProxyRequest header + return nil, oi, false, nil + } + cfg, err := getReplicationConfig(ctx, bucket) + if err != nil { + return nil, oi, false, err + } + dest := cfg.GetDestination() + if dest.Bucket != bucket { // not active-active + return nil, oi, false, err + } + ssec := false + if opts.ServerSideEncryption != nil { + ssec = opts.ServerSideEncryption.Type() == encrypt.SSEC + } + ropts := replication.ObjectOpts{ + Name: object, + SSEC: ssec, + } + if !cfg.Replicate(ropts) { // no matching rule for object prefix + return nil, oi, false, nil + } + tgt = globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn) + if tgt == nil || tgt.isOffline() { + return nil, oi, false, fmt.Errorf("missing target") + } + + gopts := miniogo.GetObjectOptions{ + VersionID: opts.VersionID, + ServerSideEncryption: opts.ServerSideEncryption, + Internal: miniogo.AdvancedGetOptions{ + ReplicationProxyRequest: true, + }, + } + + objInfo, err := tgt.StatObject(ctx, dest.Bucket, object, gopts) + if err != nil { + return nil, oi, false, err + } + tags, _ := tags.MapToObjectTags(objInfo.UserTags) + oi = ObjectInfo{ + Bucket: bucket, + Name: object, + ModTime: objInfo.LastModified, + Size: objInfo.Size, + ETag: objInfo.ETag, + VersionID: objInfo.VersionID, + IsLatest: objInfo.IsLatest, + DeleteMarker: objInfo.IsDeleteMarker, + ContentType: objInfo.ContentType, + Expires: objInfo.Expires, + StorageClass: objInfo.StorageClass, + ReplicationStatus: replication.StatusType(objInfo.ReplicationStatus), + UserDefined: cloneMSS(objInfo.UserMetadata), + UserTags: tags.String(), + } + if ce, ok := oi.UserDefined[xhttp.ContentEncoding]; ok { + oi.ContentEncoding = ce + delete(oi.UserDefined, xhttp.ContentEncoding) + } + return tgt, oi, true, nil +} + +// get object info from replication target if active-active replication is in place and +// this node returns a 404 +func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, opts ObjectOptions) (oi ObjectInfo, proxy bool, err error) { + _, oi, proxy, err = proxyHeadToRepTarget(ctx, bucket, object, opts) + return oi, proxy, err +} + +func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, sync bool) { + if sync { + replicateObject(ctx, objInfo, o) + } else { + globalReplicationState.queueReplicaTask(objInfo) + } +} + +func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectVersionInfo, o ObjectLayer, sync bool) { + if sync { + replicateDelete(ctx, dv, o) + } else { + globalReplicationState.queueReplicaDeleteTask(dv) + } +} diff --git a/cmd/bucket-targets.go b/cmd/bucket-targets.go index f5ad765fb..8f9f1522e 100644 --- a/cmd/bucket-targets.go +++ b/cmd/bucket-targets.go @@ -23,6 +23,7 @@ import ( "net/http" "strings" "sync" + "sync/atomic" "time" minio "github.com/minio/minio-go/v7" @@ -34,10 +35,14 @@ import ( sha256 "github.com/minio/sha256-simd" ) +const ( + defaultHealthCheckDuration = 60 * time.Second +) + // BucketTargetSys represents bucket targets subsystem type BucketTargetSys struct { sync.RWMutex - arnRemotesMap map[string]*miniogo.Core + arnRemotesMap map[string]*TargetClient targetsMap map[string][]madmin.BucketTarget } @@ -219,7 +224,7 @@ func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr str } // GetRemoteTargetClient returns minio-go client for replication target instance -func (sys *BucketTargetSys) GetRemoteTargetClient(ctx context.Context, arn string) *miniogo.Core { +func (sys *BucketTargetSys) GetRemoteTargetClient(ctx context.Context, arn string) *TargetClient { sys.RLock() defer sys.RUnlock() return sys.arnRemotesMap[arn] @@ -266,7 +271,7 @@ func (sys *BucketTargetSys) GetRemoteLabelWithArn(ctx context.Context, bucket, a // NewBucketTargetSys - creates new replication system. func NewBucketTargetSys() *BucketTargetSys { return &BucketTargetSys{ - arnRemotesMap: make(map[string]*miniogo.Core), + arnRemotesMap: make(map[string]*TargetClient), targetsMap: make(map[string][]madmin.BucketTarget), } } @@ -347,20 +352,33 @@ var getRemoteTargetInstanceTransport http.RoundTripper var getRemoteTargetInstanceTransportOnce sync.Once // Returns a minio-go Client configured to access remote host described in replication target config. -func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*miniogo.Core, error) { +func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*TargetClient, error) { config := tcfg.Credentials creds := credentials.NewStaticV4(config.AccessKey, config.SecretKey, "") getRemoteTargetInstanceTransportOnce.Do(func() { getRemoteTargetInstanceTransport = newGatewayHTTPTransport(10 * time.Minute) }) - - core, err := miniogo.NewCore(tcfg.URL().Host, &miniogo.Options{ + api, err := minio.New(tcfg.Endpoint, &miniogo.Options{ Creds: creds, Secure: tcfg.Secure, Transport: getRemoteTargetInstanceTransport, }) - return core, err + if err != nil { + return nil, err + } + hcDuration := tcfg.HealthCheckDuration + if hcDuration < 1 { // require minimum health check duration of 1 sec. + hcDuration = defaultHealthCheckDuration + } + tc := &TargetClient{ + Client: api, + healthCheckDuration: hcDuration, + bucket: tcfg.TargetBucket, + replicateSync: tcfg.ReplicationSync, + } + go tc.healthCheck() + return tc, nil } // getRemoteARN gets existing ARN for an endpoint or generates a new one. @@ -424,3 +442,29 @@ func parseBucketTargetConfig(bucket string, cdata, cmetadata []byte) (*madmin.Bu } return &t, nil } + +// TargetClient is the struct for remote target client. +type TargetClient struct { + *miniogo.Client + up int32 + healthCheckDuration time.Duration + bucket string // remote bucket target + replicateSync bool +} + +func (tc *TargetClient) isOffline() bool { + return atomic.LoadInt32(&tc.up) == 0 +} + +func (tc *TargetClient) healthCheck() { + for { + _, err := tc.BucketExists(GlobalContext, tc.bucket) + if err != nil { + atomic.StoreInt32(&tc.up, 0) + time.Sleep(tc.healthCheckDuration) + continue + } + atomic.StoreInt32(&tc.up, 1) + time.Sleep(tc.healthCheckDuration) + } +} diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 2db07f73e..d8c9c7012 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -18,6 +18,7 @@ package cmd import ( "context" + "errors" "fmt" "io" "net/http" @@ -159,6 +160,14 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true) if err != nil { + if isProxyable(ctx, bucket) && (errors.Is(err, errFileNotFound) || errors.Is(err, errFileVersionNotFound)) { + // proxy to replication target if active-active replication is in place. + reader, proxy := proxyGetToReplicationTarget(ctx, bucket, object, rs, h, opts) + if reader == nil || !proxy { + return nil, toObjectErr(err, bucket, object) + } + return reader, nil + } return nil, toObjectErr(err, bucket, object) } @@ -413,7 +422,15 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts, false) if err != nil { + // proxy HEAD to replication target if active-active replication configured on bucket + if isProxyable(ctx, bucket) && (errors.Is(err, errFileNotFound) || errors.Is(err, errFileVersionNotFound)) { + oi, proxy, err := proxyHeadToReplicationTarget(ctx, bucket, object, opts) + if proxy { + return oi, err + } + } return objInfo, toObjectErr(err, bucket, object) + } objInfo = fi.ToObjectInfo(bucket, object) if objInfo.TransitionStatus == lifecycle.TransitionComplete { diff --git a/cmd/http/headers.go b/cmd/http/headers.go index 156c58fae..0ef3562b5 100644 --- a/cmd/http/headers.go +++ b/cmd/http/headers.go @@ -163,6 +163,8 @@ const ( MinIODeleteReplicationStatus = "X-Minio-Replication-Delete-Status" // Header indicates delete-marker replication status. MinIODeleteMarkerReplicationStatus = "X-Minio-Replication-DeleteMarker-Status" + // Header indicates if its a GET/HEAD proxy request for active-active replication + MinIOSourceProxyRequest = "x-minio-source-proxy-request" ) // Common http query params S3 API diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 2cb310cd0..5709159e7 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -52,6 +52,8 @@ type ObjectOptions struct { VersionPurgeStatus VersionPurgeStatusType // Is only set in DELETE operations for delete marker version to be permanently deleted. TransitionStatus string // status of the transition NoLock bool // indicates to lower layers if the caller is expecting to hold locks. + ProxyRequest bool // only set for GET/HEAD in active-active replication scenario + } // BucketOptions represents bucket options for ObjectLayer bucket operations diff --git a/cmd/object-api-options.go b/cmd/object-api-options.go index a793e331b..9930b58db 100644 --- a/cmd/object-api-options.go +++ b/cmd/object-api-options.go @@ -66,6 +66,7 @@ func getDefaultOpts(header http.Header, copySource bool, metadata map[string]str if crypto.S3.IsRequested(header) || (metadata != nil && crypto.S3.IsEncrypted(metadata)) { opts.ServerSideEncryption = encrypt.NewSSE() } + opts.ProxyRequest = header.Get(xhttp.MinIOSourceProxyRequest) == "true" return } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 26e15fbf4..8b6491cc5 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -1193,7 +1193,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re if rs := r.Header.Get(xhttp.AmzBucketReplicationStatus); rs != "" { srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = rs } - if mustReplicate(ctx, r, dstBucket, dstObject, srcInfo.UserDefined, srcInfo.ReplicationStatus.String()) { + if ok, _ := mustReplicate(ctx, r, dstBucket, dstObject, srcInfo.UserDefined, srcInfo.ReplicationStatus.String()); ok { srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } // Store the preserved compression metadata. @@ -1273,8 +1273,8 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re objInfo.ETag = getDecryptedETag(r.Header, objInfo, false) response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime) encodedSuccessResponse := encodeResponse(response) - if mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String()) { - globalReplicationState.queueReplicaTask(objInfo) + if replicate, sync := mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate { + scheduleReplication(ctx, objInfo, objectAPI, sync) } setPutObjHeaders(w, objInfo, false) @@ -1519,7 +1519,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) return } - if mustReplicate(ctx, r, bucket, object, metadata, "") { + if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, ""); ok { metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() { @@ -1587,8 +1587,8 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req objInfo.ETag = objInfo.ETag + "-1" } } - if mustReplicate(ctx, r, bucket, object, metadata, "") { - globalReplicationState.queueReplicaTask(objInfo) + if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate { + scheduleReplication(ctx, objInfo, objectAPI, sync) } setPutObjHeaders(w, objInfo, false) @@ -1704,7 +1704,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) return } - if mustReplicate(ctx, r, bucket, object, metadata, "") { + if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, ""); ok { metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } // We need to preserve the encryption headers set in EncryptRequest, @@ -2666,10 +2666,9 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite } setPutObjHeaders(w, objInfo, false) - if mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()) { - globalReplicationState.queueReplicaTask(objInfo) + if replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate { + scheduleReplication(ctx, objInfo, objectAPI, sync) } - // Write success response. writeSuccessResponseXML(w, encodedSuccessResponse) @@ -2747,7 +2746,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. VersionID: opts.VersionID, }) } - _, replicateDel := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr) + _, replicateDel, replicateSync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr) if replicateDel { if opts.VersionID != "" { opts.VersionPurgeStatus = Pending @@ -2808,7 +2807,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. } else { versionID = objInfo.VersionID } - globalReplicationState.queueReplicaDeleteTask(DeletedObjectVersionInfo{ + dobj := DeletedObjectVersionInfo{ DeletedObject: DeletedObject{ ObjectName: object, VersionID: versionID, @@ -2819,7 +2818,8 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. VersionPurgeStatus: objInfo.VersionPurgeStatus, }, Bucket: bucket, - }) + } + scheduleReplicationDelete(ctx, dobj, objectAPI, replicateSync) } if goi.TransitionStatus == lifecycle.TransitionComplete { // clean up transitioned tier @@ -2907,7 +2907,7 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r if objInfo.UserTags != "" { objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags } - replicate := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "") + replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "") if replicate { objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } @@ -2923,7 +2923,7 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r return } if replicate { - globalReplicationState.queueReplicaTask(objInfo) + scheduleReplication(ctx, objInfo, objectAPI, sync) } writeSuccessResponseHeadersOnly(w) // Notify object event. @@ -3080,7 +3080,7 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r if objInfo.UserTags != "" { objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags } - replicate := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "") + replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "") if replicate { objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } @@ -3095,7 +3095,7 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r return } if replicate { - globalReplicationState.queueReplicaTask(objInfo) + scheduleReplication(ctx, objInfo, objectAPI, sync) } writeSuccessNoContent(w) @@ -3262,7 +3262,7 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h return } - replicate := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: tags.String()}, "") + replicate, sync := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: tags.String()}, "") if replicate { opts.UserDefined = make(map[string]string) opts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() @@ -3277,7 +3277,7 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h if replicate { if objInfo, err := objAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil { - globalReplicationState.queueReplicaTask(objInfo) + scheduleReplication(ctx, objInfo, objAPI, sync) } } @@ -3327,7 +3327,7 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } - replicate := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: oi.UserTags}, "") + replicate, sync := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: oi.UserTags}, "") if replicate { opts.UserDefined = make(map[string]string) opts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() @@ -3343,7 +3343,7 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r } if replicate { - globalReplicationState.queueReplicaTask(oi) + scheduleReplication(ctx, oi, objAPI, sync) } writeSuccessNoContent(w) diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index d9d7cd635..901f5ea43 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -709,7 +709,10 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs, Versioned: globalBucketVersioningSys.Enabled(args.BucketName), VersionSuspended: globalBucketVersioningSys.Suspended(args.BucketName), } - var err error + var ( + err error + replicateSync bool + ) next: for _, objectName := range args.Objects { // If not a directory, remove the object. @@ -751,7 +754,7 @@ next: } if hasReplicationRules(ctx, args.BucketName, []ObjectToDelete{{ObjectName: objectName}}) || hasLifecycleConfig { goi, gerr = getObjectInfoFn(ctx, args.BucketName, objectName, opts) - if _, replicateDel = checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: objectName, VersionID: goi.VersionID}, goi, gerr); replicateDel { + if _, replicateDel, replicateSync = checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: objectName, VersionID: goi.VersionID}, goi, gerr); replicateDel { opts.DeleteMarkerReplicationStatus = string(replication.Pending) opts.DeleteMarker = true } @@ -759,7 +762,7 @@ next: oi, err := deleteObject(ctx, objectAPI, web.CacheAPI(), args.BucketName, objectName, nil, r, opts) if replicateDel && err == nil { - globalReplicationState.queueReplicaDeleteTask(DeletedObjectVersionInfo{ + dobj := DeletedObjectVersionInfo{ DeletedObject: DeletedObject{ ObjectName: objectName, DeleteMarkerVersionID: oi.VersionID, @@ -769,7 +772,8 @@ next: VersionPurgeStatus: oi.VersionPurgeStatus, }, Bucket: args.BucketName, - }) + } + scheduleReplicationDelete(ctx, dobj, objectAPI, replicateSync) } if goi.TransitionStatus == lifecycle.TransitionComplete && err == nil && goi.VersionID == "" { action := lifecycle.DeleteAction @@ -855,7 +859,7 @@ next: } } } - _, replicateDel := checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: obj.Name, VersionID: obj.VersionID}, obj, nil) + _, replicateDel, _ := checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: obj.Name, VersionID: obj.VersionID}, obj, nil) // since versioned delete is not available on web browser, yet - this is a simple DeleteMarker replication objToDel := ObjectToDelete{ObjectName: obj.Name} if replicateDel { @@ -904,10 +908,11 @@ next: Host: handlers.GetSourceIP(r), }) if dobj.DeleteMarkerReplicationStatus == string(replication.Pending) || dobj.VersionPurgeStatus == Pending { - globalReplicationState.queueReplicaDeleteTask(DeletedObjectVersionInfo{ + dv := DeletedObjectVersionInfo{ DeletedObject: dobj, Bucket: args.BucketName, - }) + } + scheduleReplicationDelete(ctx, dv, objectAPI, replicateSync) } } } @@ -1228,7 +1233,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { } } - mustReplicate := mustReplicateWeb(ctx, r, bucket, object, metadata, "", replPerms) + mustReplicate, sync := mustReplicateWeb(ctx, r, bucket, object, metadata, "", replPerms) if mustReplicate { metadata[xhttp.AmzBucketReplicationStatus] = string(replication.Pending) } @@ -1298,7 +1303,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { } } if mustReplicate { - globalReplicationState.queueReplicaTask(objInfo) + scheduleReplication(ctx, objInfo, objectAPI, sync) } // Notify object created event. diff --git a/docs/bucket/replication/README.md b/docs/bucket/replication/README.md index 72edffed3..2b59ce48a 100644 --- a/docs/bucket/replication/README.md +++ b/docs/bucket/replication/README.md @@ -130,6 +130,7 @@ It is recommended that replication be run in a system with atleast two CPU's ava ![head](https://raw.githubusercontent.com/minio/minio/master/docs/bucket/replication/HEAD_bucket_replication.png) ## MinIO Extension +### Replicating Deletes Delete marker replication is allowed in [AWS V1 Configuration](https://aws.amazon.com/blogs/storage/managing-delete-marker-replication-in-amazon-s3/) but not in V2 configuration. The MinIO implementation above is based on V2 configuration, however it has been extended to allow both DeleteMarker replication and replication of versioned deletes with the `DeleteMarkerReplication` and `DeleteReplication` fields in the replication configuration above. By default, this is set to `Disabled` unless the user specifies it while adding a replication rule. When an object is deleted from the source bucket, the corresponding replica version will be marked deleted if delete marker replication is enabled in the replication configuration. Replication of deletes that specify a version id (a.k.a hard deletes) can be enabled by setting the `DeleteReplication` status to enabled in the replication configuration. This is a MinIO specific extension that can be enabled using the `mc replicate add` or `mc replicate edit` command with the --replicate "delete" flag. @@ -154,6 +155,13 @@ The status of replication can be monitored by configuring event notifications on On the target bucket, `s3:PutObject` event shows `X-Amz-Replication-Status` status of `REPLICA` in the metadata. Additional metrics to monitor backlog state for the purpose of bandwidth management and resource allocation are an upcoming feature. +### Sync/Async Replication +By default, replication is completed asynchronously. If synchronous replication is desired, set the --sync flag while adding a +remote replication target using the `mc admin bucket remote add` command +``` + mc admin bucket remote add myminio/srcbucket https://accessKey:secretKey@replica-endpoint:9000/destbucket --service replication --region us-east-1 --sync --healthcheck-seconds 100s +``` + ## Explore Further - [MinIO Bucket Versioning Implementation](https://docs.minio.io/docs/minio-bucket-versioning-guide.html) - [MinIO Client Quickstart Guide](https://docs.minio.io/docs/minio-client-quickstart-guide.html) diff --git a/go.mod b/go.mod index 54af0748c..d121dd536 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,7 @@ require ( github.com/miekg/dns v1.1.35 github.com/minio/cli v1.22.0 github.com/minio/highwayhash v1.0.0 - github.com/minio/minio-go/v7 v7.0.7-0.20201217170524-3baf9ea06f7c + github.com/minio/minio-go/v7 v7.0.7-0.20210105224719-8dddba43079f github.com/minio/selfupdate v0.3.1 github.com/minio/sha256-simd v0.1.1 github.com/minio/simdjson-go v0.1.5 diff --git a/go.sum b/go.sum index 4296eeb1d..512da7bf9 100644 --- a/go.sum +++ b/go.sum @@ -173,7 +173,6 @@ github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 h1:LbsanbbD6LieFkXbj9YNNBupiGHJgFeLpO0j0Fza1h8= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -182,7 +181,6 @@ github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= @@ -198,12 +196,10 @@ github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gomodule/redigo v1.8.3 h1:HR0kYDX2RJZvAup8CsiJwxB4dTCSC0AaUq6S4SiLwUc= github.com/gomodule/redigo v1.8.3/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= -github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= @@ -231,7 +227,6 @@ github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c h1:Lh2aW+HnU2Nbe1gqD9SOJLJxW1jBMmQOktN2acDyJk8= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -336,10 +331,8 @@ github.com/klauspost/compress v1.10.1/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYs github.com/klauspost/compress v1.11.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.3 h1:dB4Bn0tN3wdCzQxnS8r06kV74qN/TAfaIS0bVE8h3jc= github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/cpuid v1.2.2 h1:1xAgYebNnsb9LKCdLOvFWtAxGU/33mjJtyOVbmUa0Us= github.com/klauspost/cpuid v1.2.2/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= -github.com/klauspost/cpuid v1.2.4 h1:EBfaK0SWSwk+fgk6efYFWdzl8MwRWoOO1gkmiaTXPW4= github.com/klauspost/cpuid v1.2.4/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s= github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4= @@ -349,11 +342,9 @@ github.com/klauspost/readahead v1.3.1 h1:QqXNYvm+VvqYcbrRT4LojUciM0XrznFRIDrbHiJ github.com/klauspost/readahead v1.3.1/go.mod h1:AH9juHzNH7xqdqFHrMRSHeH2Ps+vFf+kblDqzPFiLJg= github.com/klauspost/reedsolomon v1.9.9 h1:qCL7LZlv17xMixl55nq2/Oa1Y86nfO8EqDfv2GHND54= github.com/klauspost/reedsolomon v1.9.9/go.mod h1:O7yFFHiQwDR6b2t63KPUpccPtNdp5ADgh1gg4fd12wo= -github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -379,7 +370,6 @@ github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqf github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= -github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= github.com/mattn/go-isatty v0.0.11 h1:FxPOTFNqGkuDUGi3H/qkUbQO4ZiBa2brKq5r0l8TGeM= @@ -403,6 +393,8 @@ github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4= github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw= github.com/minio/minio-go/v7 v7.0.7-0.20201217170524-3baf9ea06f7c h1:NgTbI1w/B+2Jcl+YKTULAAXqkwWqMZbkzmVdWNwzKnA= github.com/minio/minio-go/v7 v7.0.7-0.20201217170524-3baf9ea06f7c/go.mod h1:pEZBUa+L2m9oECoIA6IcSK8bv/qggtQVLovjeKK5jYc= +github.com/minio/minio-go/v7 v7.0.7-0.20210105224719-8dddba43079f h1:XMEV9mP1TMX/lPvhnEH5vAr4AKfF+A9vycTninVcgOA= +github.com/minio/minio-go/v7 v7.0.7-0.20210105224719-8dddba43079f/go.mod h1:pEZBUa+L2m9oECoIA6IcSK8bv/qggtQVLovjeKK5jYc= github.com/minio/selfupdate v0.3.1 h1:BWEFSNnrZVMUWXbXIgLDNDjbejkmpAmZvy/nCz1HlEs= github.com/minio/selfupdate v0.3.1/go.mod h1:b8ThJzzH7u2MkF6PcIra7KaXO9Khf6alWPvMSyTDCFM= github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU= diff --git a/pkg/madmin/remote-target-commands.go b/pkg/madmin/remote-target-commands.go index 53daf02d3..60792aaf3 100644 --- a/pkg/madmin/remote-target-commands.go +++ b/pkg/madmin/remote-target-commands.go @@ -25,6 +25,7 @@ import ( "net/http" "net/url" "strings" + "time" "github.com/minio/minio/pkg/auth" ) @@ -86,34 +87,39 @@ func ParseARN(s string) (*ARN, error) { // BucketTarget represents the target bucket and site association. type BucketTarget struct { - SourceBucket string `json:"sourcebucket"` - Endpoint string `json:"endpoint"` - Credentials *auth.Credentials `json:"credentials"` - TargetBucket string `json:"targetbucket"` - Secure bool `json:"secure"` - Path string `json:"path,omitempty"` - API string `json:"api,omitempty"` - Arn string `json:"arn,omitempty"` - Type ServiceType `json:"type"` - Region string `json:"omitempty"` - Label string `json:"label,omitempty"` - BandwidthLimit int64 `json:"bandwidthlimit,omitempty"` + SourceBucket string `json:"sourcebucket"` + Endpoint string `json:"endpoint"` + Credentials *auth.Credentials `json:"credentials"` + TargetBucket string `json:"targetbucket"` + Secure bool `json:"secure"` + Path string `json:"path,omitempty"` + API string `json:"api,omitempty"` + Arn string `json:"arn,omitempty"` + Type ServiceType `json:"type"` + Region string `json:"omitempty"` + Label string `json:"label,omitempty"` + BandwidthLimit int64 `json:"bandwidthlimit,omitempty"` + ReplicationSync bool `json:"replicationSync"` + HealthCheckDuration time.Duration `json:"healthCheckDuration,omitempty"` } // Clone returns shallow clone of BucketTarget without secret key in credentials func (t *BucketTarget) Clone() BucketTarget { return BucketTarget{ - SourceBucket: t.SourceBucket, - Endpoint: t.Endpoint, - TargetBucket: t.TargetBucket, - Credentials: &auth.Credentials{AccessKey: t.Credentials.AccessKey}, - Secure: t.Secure, - Path: t.Path, - API: t.Path, - Arn: t.Arn, - Type: t.Type, - Region: t.Region, - Label: t.Label, + SourceBucket: t.SourceBucket, + Endpoint: t.Endpoint, + TargetBucket: t.TargetBucket, + Credentials: &auth.Credentials{AccessKey: t.Credentials.AccessKey}, + Secure: t.Secure, + Path: t.Path, + API: t.Path, + Arn: t.Arn, + Type: t.Type, + Region: t.Region, + Label: t.Label, + BandwidthLimit: t.BandwidthLimit, + ReplicationSync: t.ReplicationSync, + HealthCheckDuration: t.HealthCheckDuration, } }