More fixes for delete marker replication (#11504)

continuation of PR#11491 for multiple server pools and
bi-directional replication.

Moving proxying for GET/HEAD to handler level rather than
server pool layer as this was also causing incorrect proxying 
of HEAD.

Also fixing metadata update on CopyObject - minio-go was not passing
source version ID in X-Amz-Copy-Source header
master
Poorna Krishnamoorthy 4 years ago committed by GitHub
parent 466e95bb59
commit e6b4ea7618
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      cmd/bucket-replication.go
  2. 14
      cmd/erasure-server-pool.go
  3. 2
      cmd/gateway/s3/gateway-s3.go
  4. 35
      cmd/object-handlers.go
  5. 2
      go.mod
  6. 2
      go.sum

@ -184,6 +184,8 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
if gerr != nil { if gerr != nil {
validReplStatus := false validReplStatus := false
switch oi.ReplicationStatus { switch oi.ReplicationStatus {
// Point to note: even if two way replication with Deletemarker or Delete sync is enabled,
// deletion of a REPLICA object/deletemarker will not trigger a replication to the other cluster.
case replication.Pending, replication.Completed, replication.Failed: case replication.Pending, replication.Completed, replication.Failed:
validReplStatus = true validReplStatus = true
} }
@ -648,9 +650,14 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
replicationStatus := replication.Completed replicationStatus := replication.Completed
if rtype != replicateAll { if rtype != replicateAll {
// replicate metadata for object tagging/copy with metadata replacement // replicate metadata for object tagging/copy with metadata replacement
srcOpts := miniogo.CopySrcOptions{
Bucket: dest.Bucket,
Object: object,
VersionID: objInfo.VersionID}
dstOpts := miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{SourceVersionID: objInfo.VersionID}} dstOpts := miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{SourceVersionID: objInfo.VersionID}}
c := &miniogo.Core{Client: tgt.Client} c := &miniogo.Core{Client: tgt.Client}
if _, err = c.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts); err != nil {
if _, err = c.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), srcOpts, dstOpts); err != nil {
replicationStatus = replication.Failed replicationStatus = replication.Failed
logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
} }
@ -876,6 +883,7 @@ func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs
if err != nil { if err != nil {
return nil, false return nil, false
} }
reader.ObjInfo = oi.Clone()
return reader, true return reader, true
} }

@ -521,13 +521,6 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
} }
return gr, nil return gr, nil
} }
if isProxyable(ctx, bucket) {
// proxy to replication target if active-active replication is in place.
reader, proxy := proxyGetToReplicationTarget(ctx, bucket, object, rs, h, opts)
if reader != nil && proxy {
return reader, nil
}
}
if opts.VersionID != "" { if opts.VersionID != "" {
return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID} return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
} }
@ -573,13 +566,6 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s
return objInfo, nil return objInfo, nil
} }
object = decodeDirObject(object) object = decodeDirObject(object)
// proxy HEAD to replication target if active-active replication configured on bucket
if isProxyable(ctx, bucket) {
oi, proxy, err := proxyHeadToReplicationTarget(ctx, bucket, object, opts)
if proxy {
return oi, err
}
}
if opts.VersionID != "" { if opts.VersionID != "" {
return objInfo, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID} return objInfo, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
} }

@ -515,7 +515,7 @@ func (l *s3Objects) CopyObject(ctx context.Context, srcBucket string, srcObject
srcInfo.UserDefined[k] = v[0] srcInfo.UserDefined[k] = v[0]
} }
if _, err = l.Client.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo.UserDefined, miniogo.PutObjectOptions{}); err != nil { if _, err = l.Client.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo.UserDefined, miniogo.CopySrcOptions{}, miniogo.PutObjectOptions{}); err != nil {
return objInfo, minio.ErrorRespToObjectError(err, srcBucket, srcObject) return objInfo, minio.ErrorRespToObjectError(err, srcBucket, srcObject)
} }
return l.GetObjectInfo(ctx, dstBucket, dstObject, dstOpts) return l.GetObjectInfo(ctx, dstBucket, dstObject, dstOpts)

@ -411,10 +411,30 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts) gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts)
if err != nil { if err != nil {
var (
reader *GetObjectReader
proxy bool
)
if isProxyable(ctx, bucket) {
// proxy to replication target if active-active replication is in place.
reader, proxy = proxyGetToReplicationTarget(ctx, bucket, object, rs, r.Header, opts)
if reader != nil && proxy {
gr = reader
}
}
if reader == nil || !proxy {
if isErrPreconditionFailed(err) { if isErrPreconditionFailed(err) {
return return
} }
if globalBucketVersioningSys.Enabled(bucket) && gr != nil { if globalBucketVersioningSys.Enabled(bucket) && gr != nil {
if !gr.ObjInfo.VersionPurgeStatus.Empty() {
// Shows the replication status of a permanent delete of a version
w.Header()[xhttp.MinIODeleteReplicationStatus] = []string{string(gr.ObjInfo.VersionPurgeStatus)}
}
if !gr.ObjInfo.ReplicationStatus.Empty() && gr.ObjInfo.DeleteMarker {
w.Header()[xhttp.MinIODeleteMarkerReplicationStatus] = []string{string(gr.ObjInfo.ReplicationStatus)}
}
// Versioning enabled quite possibly object is deleted might be delete-marker // Versioning enabled quite possibly object is deleted might be delete-marker
// if present set the headers, no idea why AWS S3 sets these headers. // if present set the headers, no idea why AWS S3 sets these headers.
if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker { if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker {
@ -425,6 +445,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return return
} }
}
defer gr.Close() defer gr.Close()
objInfo := gr.ObjInfo objInfo := gr.ObjInfo
@ -577,6 +598,19 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
objInfo, err := getObjectInfo(ctx, bucket, object, opts) objInfo, err := getObjectInfo(ctx, bucket, object, opts)
if err != nil { if err != nil {
var (
proxy bool
perr error
oi ObjectInfo
)
// proxy HEAD to replication target if active-active replication configured on bucket
if isProxyable(ctx, bucket) {
oi, proxy, perr = proxyHeadToReplicationTarget(ctx, bucket, object, opts)
if proxy && perr == nil {
objInfo = oi
}
}
if !proxy || perr != nil {
if globalBucketVersioningSys.Enabled(bucket) { if globalBucketVersioningSys.Enabled(bucket) {
if !objInfo.VersionPurgeStatus.Empty() { if !objInfo.VersionPurgeStatus.Empty() {
// Shows the replication status of a permanent delete of a version // Shows the replication status of a permanent delete of a version
@ -595,6 +629,7 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err)) writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return return
} }
}
// Automatically remove the object/version is an expiry lifecycle rule can be applied // Automatically remove the object/version is an expiry lifecycle rule can be applied
if lc, err := globalLifecycleSys.Get(bucket); err == nil { if lc, err := globalLifecycleSys.Get(bucket); err == nil {

@ -49,7 +49,7 @@ require (
github.com/minio/cli v1.22.0 github.com/minio/cli v1.22.0
github.com/minio/highwayhash v1.0.1 github.com/minio/highwayhash v1.0.1
github.com/minio/md5-simd v1.1.1 // indirect github.com/minio/md5-simd v1.1.1 // indirect
github.com/minio/minio-go/v7 v7.0.8 github.com/minio/minio-go/v7 v7.0.9-0.20210210235136-83423dddb072
github.com/minio/selfupdate v0.3.1 github.com/minio/selfupdate v0.3.1
github.com/minio/sha256-simd v0.1.1 github.com/minio/sha256-simd v0.1.1
github.com/minio/simdjson-go v0.2.1 github.com/minio/simdjson-go v0.2.1

@ -419,6 +419,8 @@ github.com/minio/md5-simd v1.1.1 h1:9ojcLbuZ4gXbB2sX53MKn8JUZ0sB/2wfwsEcRw+I08U=
github.com/minio/md5-simd v1.1.1/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw= github.com/minio/md5-simd v1.1.1/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
github.com/minio/minio-go/v7 v7.0.8 h1:snnHtYkHz3TKrQJY1jTQGOZqnue79pbYTukuwqz/QvM= github.com/minio/minio-go/v7 v7.0.8 h1:snnHtYkHz3TKrQJY1jTQGOZqnue79pbYTukuwqz/QvM=
github.com/minio/minio-go/v7 v7.0.8/go.mod h1:pEZBUa+L2m9oECoIA6IcSK8bv/qggtQVLovjeKK5jYc= github.com/minio/minio-go/v7 v7.0.8/go.mod h1:pEZBUa+L2m9oECoIA6IcSK8bv/qggtQVLovjeKK5jYc=
github.com/minio/minio-go/v7 v7.0.9-0.20210210235136-83423dddb072 h1:zlheLAzZ66jYLUsa81R8gwPtSgKRI5FMJyAKuaJpkHE=
github.com/minio/minio-go/v7 v7.0.9-0.20210210235136-83423dddb072/go.mod h1:pEZBUa+L2m9oECoIA6IcSK8bv/qggtQVLovjeKK5jYc=
github.com/minio/selfupdate v0.3.1 h1:BWEFSNnrZVMUWXbXIgLDNDjbejkmpAmZvy/nCz1HlEs= github.com/minio/selfupdate v0.3.1 h1:BWEFSNnrZVMUWXbXIgLDNDjbejkmpAmZvy/nCz1HlEs=
github.com/minio/selfupdate v0.3.1/go.mod h1:b8ThJzzH7u2MkF6PcIra7KaXO9Khf6alWPvMSyTDCFM= github.com/minio/selfupdate v0.3.1/go.mod h1:b8ThJzzH7u2MkF6PcIra7KaXO9Khf6alWPvMSyTDCFM=
github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU= github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU=

Loading…
Cancel
Save