fix: replication regression due to proxying requests (#11356)

In PR #11165 due to incorrect proxying for 2 
way replication even when the object was not 
yet replicated

Additionally, fix metadata comparisons when
deciding to do full replication vs metadata copy.

fixes #11340
master
Poorna Krishnamoorthy 4 years ago committed by GitHub
parent e019f21bda
commit fd3f02637a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      cmd/admin-bucket-handlers.go
  2. 86
      cmd/bucket-replication.go
  3. 6
      cmd/bucket-targets.go
  4. 2
      cmd/http/headers.go
  5. 1
      cmd/object-api-interface.go
  6. 5
      cmd/object-api-options.go
  7. 2
      go.mod
  8. 2
      go.sum

@ -156,7 +156,6 @@ func (a adminAPIHandlers) SetRemoteTargetHandler(w http.ResponseWriter, r *http.
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL)
return
}
sameTarget, _ := isLocalHost(target.URL().Hostname(), target.URL().Port(), globalMinioPort)
if sameTarget && bucket == target.TargetBucket {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBucketRemoteIdenticalToSource), r.URL)

@ -129,19 +129,29 @@ func mustReplicater(ctx context.Context, bucket, object string, meta map[string]
opts.UserTags = tagStr
}
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
if tgt == nil || tgt.isOffline() {
return cfg.Replicate(opts), false
}
// the target online status should not be used here while deciding
// whether to replicate as the target could be temporarily down
if tgt != nil {
return cfg.Replicate(opts), tgt.replicateSync
}
return cfg.Replicate(opts), false
}
// Standard headers that needs to be extracted from User metadata.
var standardHeaders = []string{
"content-type",
"content-encoding",
xhttp.ContentType,
xhttp.CacheControl,
xhttp.ContentEncoding,
xhttp.ContentLanguage,
xhttp.ContentDisposition,
xhttp.AmzStorageClass,
xhttp.AmzObjectTagging,
xhttp.AmzBucketReplicationStatus,
xhttp.AmzObjectLockMode,
xhttp.AmzObjectLockRetainUntilDate,
xhttp.AmzObjectLockLegalHold,
xhttp.AmzTagCount,
xhttp.AmzServerSideEncryption,
}
// returns true if any of the objects being deleted qualifies for replication.
@ -189,7 +199,9 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
return oi.DeleteMarker, false, sync
}
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn)
if tgt == nil || tgt.isOffline() {
// the target online status should not be used here while deciding
// whether to replicate deletes as the target could be temporarily down
if tgt == nil {
return oi.DeleteMarker, false, false
}
opts := replication.ObjectOpts{
@ -216,10 +228,12 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA
bucket := dobj.Bucket
rcfg, err := getReplicationConfig(ctx, bucket)
if err != nil || rcfg == nil {
logger.LogIf(ctx, err)
return
}
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn)
if tgt == nil {
logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, rcfg.RoleArn))
return
}
versionID := dobj.DeleteMarkerVersionID
@ -349,6 +363,15 @@ func putReplicationOpts(ctx context.Context, dest replication.Destination, objIn
SourceETag: objInfo.ETag,
},
}
if lang, ok := objInfo.UserDefined[xhttp.ContentLanguage]; ok {
putOpts.ContentLanguage = lang
}
if disp, ok := objInfo.UserDefined[xhttp.ContentDisposition]; ok {
putOpts.ContentDisposition = disp
}
if cc, ok := objInfo.UserDefined[xhttp.CacheControl]; ok {
putOpts.CacheControl = cc
}
if mode, ok := objInfo.UserDefined[xhttp.AmzObjectLockMode]; ok {
rmode := miniogo.RetentionMode(mode)
putOpts.Mode = rmode
@ -384,29 +407,40 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationActio
if oi1.ETag != oi2.ETag ||
oi1.VersionID != oi2.VersionID ||
oi1.Size != oi2.Size ||
oi1.DeleteMarker != oi2.IsDeleteMarker {
oi1.DeleteMarker != oi2.IsDeleteMarker ||
!oi1.ModTime.Equal(oi2.LastModified) {
return replicateAll
}
if !oi1.ModTime.Equal(oi2.LastModified) ||
oi1.ContentType != oi2.ContentType ||
oi1.StorageClass != oi2.StorageClass {
if oi1.ContentType != oi2.ContentType {
return replicateMetadata
}
if oi1.ContentEncoding != "" {
enc, ok := oi2.UserMetadata[xhttp.ContentEncoding]
if !ok || enc != oi1.ContentEncoding {
enc, ok := oi2.Metadata[xhttp.ContentEncoding]
if !ok || strings.Join(enc, "") != oi1.ContentEncoding {
return replicateMetadata
}
}
for k, v := range oi2.UserMetadata {
oi2.Metadata[k] = []string{v}
// compare metadata on both maps to see if meta is identical
for k1, v1 := range oi1.UserDefined {
if v2, ok := oi2.UserMetadata[k1]; ok && v1 == v2 {
continue
}
if v2, ok := oi2.Metadata[k1]; ok && v1 == strings.Join(v2, "") {
continue
}
if len(oi2.Metadata) != len(oi1.UserDefined) {
return replicateMetadata
}
for k1, v1 := range oi1.UserDefined {
if v2, ok := oi2.Metadata[k1]; !ok || v1 != strings.Join(v2, "") {
for k1, v1 := range oi2.UserMetadata {
if v2, ok := oi1.UserDefined[k1]; !ok || v1 != v2 {
return replicateMetadata
}
}
for k1, v1slc := range oi2.Metadata {
v1 := strings.Join(v1slc, "")
if k1 == xhttp.ContentEncoding { //already compared
continue
}
if v2, ok := oi1.UserDefined[k1]; !ok || v1 != v2 {
return replicateMetadata
}
}
@ -455,7 +489,11 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
}
rtype := replicateAll
oi, err := tgt.StatObject(ctx, dest.Bucket, object, miniogo.StatObjectOptions{VersionID: objInfo.VersionID})
oi, err := tgt.StatObject(ctx, dest.Bucket, object, miniogo.StatObjectOptions{
VersionID: objInfo.VersionID,
Internal: miniogo.AdvancedGetOptions{
ReplicationProxyRequest: "false",
}})
if err == nil {
rtype = getReplicationAction(objInfo, oi)
if rtype == replicateNone {
@ -468,7 +506,6 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
replicationStatus := replication.Completed
if rtype != replicateAll {
gr.Close()
// replicate metadata for object tagging/copy with metadata replacement
dstOpts := miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{SourceVersionID: objInfo.VersionID}}
c := &miniogo.Core{Client: tgt.Client}
@ -668,7 +705,7 @@ func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs
VersionID: opts.VersionID,
ServerSideEncryption: opts.ServerSideEncryption,
Internal: miniogo.AdvancedGetOptions{
ReplicationProxyRequest: true,
ReplicationProxyRequest: "true",
},
}
// get correct offsets for encrypted object
@ -701,10 +738,11 @@ func isProxyable(ctx context.Context, bucket string) bool {
dest := cfg.GetDestination()
return dest.Bucket == bucket
}
func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts ObjectOptions) (tgt *TargetClient, oi ObjectInfo, proxy bool, err error) {
// this option is set when active-active replication is in place between site A -> B,
// and site B does not have the object yet.
if opts.ProxyRequest { // true only when site B sets MinIOSourceProxyRequest header
if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header
return nil, oi, false, nil
}
cfg, err := getReplicationConfig(ctx, bucket)
@ -728,14 +766,14 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts Objec
}
tgt = globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
if tgt == nil || tgt.isOffline() {
return nil, oi, false, fmt.Errorf("missing target")
return nil, oi, false, fmt.Errorf("target is offline or not configured")
}
gopts := miniogo.GetObjectOptions{
VersionID: opts.VersionID,
ServerSideEncryption: opts.ServerSideEncryption,
Internal: miniogo.AdvancedGetOptions{
ReplicationProxyRequest: true,
ReplicationProxyRequest: "true",
},
}

@ -36,7 +36,7 @@ import (
)
const (
defaultHealthCheckDuration = 60 * time.Second
defaultHealthCheckDuration = 100 * time.Second
)
// BucketTargetSys represents bucket targets subsystem
@ -461,10 +461,10 @@ func (tc *TargetClient) healthCheck() {
_, err := tc.BucketExists(GlobalContext, tc.bucket)
if err != nil {
atomic.StoreInt32(&tc.up, 0)
time.Sleep(tc.healthCheckDuration * time.Second)
time.Sleep(tc.healthCheckDuration)
continue
}
atomic.StoreInt32(&tc.up, 1)
time.Sleep(tc.healthCheckDuration * time.Second)
time.Sleep(tc.healthCheckDuration)
}
}

@ -164,7 +164,7 @@ const (
// Header indicates delete-marker replication status.
MinIODeleteMarkerReplicationStatus = "X-Minio-Replication-DeleteMarker-Status"
// Header indicates if its a GET/HEAD proxy request for active-active replication
MinIOSourceProxyRequest = "x-minio-source-proxy-request"
MinIOSourceProxyRequest = "X-Minio-Source-Proxy-Request"
)
// Common http query params S3 API

@ -53,6 +53,7 @@ type ObjectOptions struct {
TransitionStatus string // status of the transition
NoLock bool // indicates to lower layers if the caller is expecting to hold locks.
ProxyRequest bool // only set for GET/HEAD in active-active replication scenario
ProxyHeaderSet bool // only set for GET/HEAD in active-active replication scenario
ParentIsObject func(ctx context.Context, bucket, parent string) bool // Used to verify if parent is an object.
}

@ -66,7 +66,10 @@ func getDefaultOpts(header http.Header, copySource bool, metadata map[string]str
if crypto.S3.IsRequested(header) || (metadata != nil && crypto.S3.IsEncrypted(metadata)) {
opts.ServerSideEncryption = encrypt.NewSSE()
}
opts.ProxyRequest = header.Get(xhttp.MinIOSourceProxyRequest) == "true"
if v, ok := header[xhttp.MinIOSourceProxyRequest]; ok {
opts.ProxyHeaderSet = true
opts.ProxyRequest = strings.Join(v, "") == "true"
}
return
}

@ -46,7 +46,7 @@ require (
github.com/miekg/dns v1.1.35
github.com/minio/cli v1.22.0
github.com/minio/highwayhash v1.0.0
github.com/minio/minio-go/v7 v7.0.7-0.20210105224719-8dddba43079f
github.com/minio/minio-go/v7 v7.0.8-0.20210127003153-c40722862654
github.com/minio/selfupdate v0.3.1
github.com/minio/sha256-simd v0.1.1
github.com/minio/simdjson-go v0.2.0

@ -393,6 +393,8 @@ github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4=
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
github.com/minio/minio-go/v7 v7.0.7-0.20210105224719-8dddba43079f h1:XMEV9mP1TMX/lPvhnEH5vAr4AKfF+A9vycTninVcgOA=
github.com/minio/minio-go/v7 v7.0.7-0.20210105224719-8dddba43079f/go.mod h1:pEZBUa+L2m9oECoIA6IcSK8bv/qggtQVLovjeKK5jYc=
github.com/minio/minio-go/v7 v7.0.8-0.20210127003153-c40722862654 h1:pRHAWZsfFGyqG58dSB8S4vlDeR1r1godusC3NHVquns=
github.com/minio/minio-go/v7 v7.0.8-0.20210127003153-c40722862654/go.mod h1:pEZBUa+L2m9oECoIA6IcSK8bv/qggtQVLovjeKK5jYc=
github.com/minio/selfupdate v0.3.1 h1:BWEFSNnrZVMUWXbXIgLDNDjbejkmpAmZvy/nCz1HlEs=
github.com/minio/selfupdate v0.3.1/go.mod h1:b8ThJzzH7u2MkF6PcIra7KaXO9Khf6alWPvMSyTDCFM=
github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU=

Loading…
Cancel
Save