From fd3f02637acf918a2db2bebede7f8bc99c1b9201 Mon Sep 17 00:00:00 2001 From: Poorna Krishnamoorthy Date: Wed, 27 Jan 2021 11:22:34 -0800 Subject: [PATCH] fix: replication regression due to proxying requests (#11356) PR #11165 introduced a regression due to incorrect proxying for 2-way replication, even when the object was not yet replicated. Additionally, fix metadata comparisons when deciding to do full replication vs metadata copy. fixes #11340 --- cmd/admin-bucket-handlers.go | 1 - cmd/bucket-replication.go | 88 ++++++++++++++++++++++++++---------- cmd/bucket-targets.go | 6 +-- cmd/http/headers.go | 2 +- cmd/object-api-interface.go | 1 + cmd/object-api-options.go | 5 +- go.mod | 4 +- go.sum | 2 + 8 files changed, 76 insertions(+), 33 deletions(-) diff --git a/cmd/admin-bucket-handlers.go b/cmd/admin-bucket-handlers.go index 5b431aca7..e69867155 100644 --- a/cmd/admin-bucket-handlers.go +++ b/cmd/admin-bucket-handlers.go @@ -156,7 +156,6 @@ func (a adminAPIHandlers) SetRemoteTargetHandler(w http.ResponseWriter, r *http. 
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL) return } - sameTarget, _ := isLocalHost(target.URL().Hostname(), target.URL().Port(), globalMinioPort) if sameTarget && bucket == target.TargetBucket { writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBucketRemoteIdenticalToSource), r.URL) diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index ad5a2143d..7125f7d7b 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -129,19 +129,29 @@ func mustReplicater(ctx context.Context, bucket, object string, meta map[string] opts.UserTags = tagStr } tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn) - if tgt == nil || tgt.isOffline() { - return cfg.Replicate(opts), false + // the target online status should not be used here while deciding + // whether to replicate as the target could be temporarily down + if tgt != nil { + return cfg.Replicate(opts), tgt.replicateSync } - return cfg.Replicate(opts), tgt.replicateSync + return cfg.Replicate(opts), false } // Standard headers that needs to be extracted from User metadata. var standardHeaders = []string{ - "content-type", - "content-encoding", + xhttp.ContentType, + xhttp.CacheControl, + xhttp.ContentEncoding, + xhttp.ContentLanguage, + xhttp.ContentDisposition, xhttp.AmzStorageClass, xhttp.AmzObjectTagging, xhttp.AmzBucketReplicationStatus, + xhttp.AmzObjectLockMode, + xhttp.AmzObjectLockRetainUntilDate, + xhttp.AmzObjectLockLegalHold, + xhttp.AmzTagCount, + xhttp.AmzServerSideEncryption, } // returns true if any of the objects being deleted qualifies for replication. 
@@ -189,7 +199,9 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet return oi.DeleteMarker, false, sync } tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn) - if tgt == nil || tgt.isOffline() { + // the target online status should not be used here while deciding + // whether to replicate deletes as the target could be temporarily down + if tgt == nil { return oi.DeleteMarker, false, false } opts := replication.ObjectOpts{ @@ -216,10 +228,12 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA bucket := dobj.Bucket rcfg, err := getReplicationConfig(ctx, bucket) if err != nil || rcfg == nil { + logger.LogIf(ctx, err) return } tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn) if tgt == nil { + logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, rcfg.RoleArn)) return } versionID := dobj.DeleteMarkerVersionID @@ -349,6 +363,15 @@ func putReplicationOpts(ctx context.Context, dest replication.Destination, objIn SourceETag: objInfo.ETag, }, } + if lang, ok := objInfo.UserDefined[xhttp.ContentLanguage]; ok { + putOpts.ContentLanguage = lang + } + if disp, ok := objInfo.UserDefined[xhttp.ContentDisposition]; ok { + putOpts.ContentDisposition = disp + } + if cc, ok := objInfo.UserDefined[xhttp.CacheControl]; ok { + putOpts.CacheControl = cc + } if mode, ok := objInfo.UserDefined[xhttp.AmzObjectLockMode]; ok { rmode := miniogo.RetentionMode(mode) putOpts.Mode = rmode @@ -384,29 +407,40 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationActio if oi1.ETag != oi2.ETag || oi1.VersionID != oi2.VersionID || oi1.Size != oi2.Size || - oi1.DeleteMarker != oi2.IsDeleteMarker { + oi1.DeleteMarker != oi2.IsDeleteMarker || + !oi1.ModTime.Equal(oi2.LastModified) { return replicateAll } - - if !oi1.ModTime.Equal(oi2.LastModified) || - oi1.ContentType != oi2.ContentType || - oi1.StorageClass != oi2.StorageClass { + if oi1.ContentType 
!= oi2.ContentType { return replicateMetadata } if oi1.ContentEncoding != "" { - enc, ok := oi2.UserMetadata[xhttp.ContentEncoding] - if !ok || enc != oi1.ContentEncoding { + enc, ok := oi2.Metadata[xhttp.ContentEncoding] + if !ok || strings.Join(enc, "") != oi1.ContentEncoding { return replicateMetadata } } - for k, v := range oi2.UserMetadata { - oi2.Metadata[k] = []string{v} - } - if len(oi2.Metadata) != len(oi1.UserDefined) { + // compare metadata on both maps to see if meta is identical + for k1, v1 := range oi1.UserDefined { + if v2, ok := oi2.UserMetadata[k1]; ok && v1 == v2 { + continue + } + if v2, ok := oi2.Metadata[k1]; ok && v1 == strings.Join(v2, "") { + continue + } return replicateMetadata } - for k1, v1 := range oi1.UserDefined { - if v2, ok := oi2.Metadata[k1]; !ok || v1 != strings.Join(v2, "") { + for k1, v1 := range oi2.UserMetadata { + if v2, ok := oi1.UserDefined[k1]; !ok || v1 != v2 { + return replicateMetadata + } + } + for k1, v1slc := range oi2.Metadata { + v1 := strings.Join(v1slc, "") + if k1 == xhttp.ContentEncoding { //already compared + continue + } + if v2, ok := oi1.UserDefined[k1]; !ok || v1 != v2 { return replicateMetadata } } @@ -455,7 +489,11 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa } rtype := replicateAll - oi, err := tgt.StatObject(ctx, dest.Bucket, object, miniogo.StatObjectOptions{VersionID: objInfo.VersionID}) + oi, err := tgt.StatObject(ctx, dest.Bucket, object, miniogo.StatObjectOptions{ + VersionID: objInfo.VersionID, + Internal: miniogo.AdvancedGetOptions{ + ReplicationProxyRequest: "false", + }}) if err == nil { rtype = getReplicationAction(objInfo, oi) if rtype == replicateNone { @@ -468,7 +506,6 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa replicationStatus := replication.Completed if rtype != replicateAll { gr.Close() - // replicate metadata for object tagging/copy with metadata replacement dstOpts := miniogo.PutObjectOptions{Internal: 
miniogo.AdvancedPutOptions{SourceVersionID: objInfo.VersionID}} c := &miniogo.Core{Client: tgt.Client} @@ -668,7 +705,7 @@ func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs VersionID: opts.VersionID, ServerSideEncryption: opts.ServerSideEncryption, Internal: miniogo.AdvancedGetOptions{ - ReplicationProxyRequest: true, + ReplicationProxyRequest: "true", }, } // get correct offsets for encrypted object @@ -701,10 +738,11 @@ func isProxyable(ctx context.Context, bucket string) bool { dest := cfg.GetDestination() return dest.Bucket == bucket } + func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts ObjectOptions) (tgt *TargetClient, oi ObjectInfo, proxy bool, err error) { // this option is set when active-active replication is in place between site A -> B, // and site B does not have the object yet. - if opts.ProxyRequest { // true only when site B sets MinIOSourceProxyRequest header + if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header return nil, oi, false, nil } cfg, err := getReplicationConfig(ctx, bucket) @@ -728,14 +766,14 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts Objec } tgt = globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn) if tgt == nil || tgt.isOffline() { - return nil, oi, false, fmt.Errorf("missing target") + return nil, oi, false, fmt.Errorf("target is offline or not configured") } gopts := miniogo.GetObjectOptions{ VersionID: opts.VersionID, ServerSideEncryption: opts.ServerSideEncryption, Internal: miniogo.AdvancedGetOptions{ - ReplicationProxyRequest: true, + ReplicationProxyRequest: "true", }, } diff --git a/cmd/bucket-targets.go b/cmd/bucket-targets.go index 645864e55..f118e0d65 100644 --- a/cmd/bucket-targets.go +++ b/cmd/bucket-targets.go @@ -36,7 +36,7 @@ import ( ) const ( - defaultHealthCheckDuration = 60 * time.Second + defaultHealthCheckDuration = 100 * time.Second ) 
// BucketTargetSys represents bucket targets subsystem @@ -461,10 +461,10 @@ func (tc *TargetClient) healthCheck() { _, err := tc.BucketExists(GlobalContext, tc.bucket) if err != nil { atomic.StoreInt32(&tc.up, 0) - time.Sleep(tc.healthCheckDuration * time.Second) + time.Sleep(tc.healthCheckDuration) continue } atomic.StoreInt32(&tc.up, 1) - time.Sleep(tc.healthCheckDuration * time.Second) + time.Sleep(tc.healthCheckDuration) } } diff --git a/cmd/http/headers.go b/cmd/http/headers.go index 0ef3562b5..882f9a397 100644 --- a/cmd/http/headers.go +++ b/cmd/http/headers.go @@ -164,7 +164,7 @@ const ( // Header indicates delete-marker replication status. MinIODeleteMarkerReplicationStatus = "X-Minio-Replication-DeleteMarker-Status" // Header indicates if its a GET/HEAD proxy request for active-active replication - MinIOSourceProxyRequest = "x-minio-source-proxy-request" + MinIOSourceProxyRequest = "X-Minio-Source-Proxy-Request" ) // Common http query params S3 API diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 6a8041f5e..6d59ef239 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -53,6 +53,7 @@ type ObjectOptions struct { TransitionStatus string // status of the transition NoLock bool // indicates to lower layers if the caller is expecting to hold locks. ProxyRequest bool // only set for GET/HEAD in active-active replication scenario + ProxyHeaderSet bool // only set for GET/HEAD in active-active replication scenario ParentIsObject func(ctx context.Context, bucket, parent string) bool // Used to verify if parent is an object. 
} diff --git a/cmd/object-api-options.go b/cmd/object-api-options.go index 9930b58db..8de687d0d 100644 --- a/cmd/object-api-options.go +++ b/cmd/object-api-options.go @@ -66,7 +66,10 @@ func getDefaultOpts(header http.Header, copySource bool, metadata map[string]str if crypto.S3.IsRequested(header) || (metadata != nil && crypto.S3.IsEncrypted(metadata)) { opts.ServerSideEncryption = encrypt.NewSSE() } - opts.ProxyRequest = header.Get(xhttp.MinIOSourceProxyRequest) == "true" + if v, ok := header[xhttp.MinIOSourceProxyRequest]; ok { + opts.ProxyHeaderSet = true + opts.ProxyRequest = strings.Join(v, "") == "true" + } return } diff --git a/go.mod b/go.mod index 0b3d74257..c4968200f 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,7 @@ require ( github.com/miekg/dns v1.1.35 github.com/minio/cli v1.22.0 github.com/minio/highwayhash v1.0.0 - github.com/minio/minio-go/v7 v7.0.7-0.20210105224719-8dddba43079f + github.com/minio/minio-go/v7 v7.0.8-0.20210127003153-c40722862654 github.com/minio/selfupdate v0.3.1 github.com/minio/sha256-simd v0.1.1 github.com/minio/simdjson-go v0.2.0 @@ -90,4 +90,4 @@ require ( gopkg.in/jcmturner/gokrb5.v7 v7.5.0 gopkg.in/ldap.v3 v3.0.3 gopkg.in/yaml.v2 v2.3.0 -) +) \ No newline at end of file diff --git a/go.sum b/go.sum index 841f1b203..4c2c19dfb 100644 --- a/go.sum +++ b/go.sum @@ -393,6 +393,8 @@ github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4= github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw= github.com/minio/minio-go/v7 v7.0.7-0.20210105224719-8dddba43079f h1:XMEV9mP1TMX/lPvhnEH5vAr4AKfF+A9vycTninVcgOA= github.com/minio/minio-go/v7 v7.0.7-0.20210105224719-8dddba43079f/go.mod h1:pEZBUa+L2m9oECoIA6IcSK8bv/qggtQVLovjeKK5jYc= +github.com/minio/minio-go/v7 v7.0.8-0.20210127003153-c40722862654 h1:pRHAWZsfFGyqG58dSB8S4vlDeR1r1godusC3NHVquns= +github.com/minio/minio-go/v7 v7.0.8-0.20210127003153-c40722862654/go.mod h1:pEZBUa+L2m9oECoIA6IcSK8bv/qggtQVLovjeKK5jYc= 
github.com/minio/selfupdate v0.3.1 h1:BWEFSNnrZVMUWXbXIgLDNDjbejkmpAmZvy/nCz1HlEs= github.com/minio/selfupdate v0.3.1/go.mod h1:b8ThJzzH7u2MkF6PcIra7KaXO9Khf6alWPvMSyTDCFM= github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU=