diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 8b1f6042b..40972f469 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -192,6 +192,10 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet } return oi.DeleteMarker, false, sync } + if oi.ReplicationStatus == replication.Replica { + // avoid replicating a replica in bi-directional replication + return oi.DeleteMarker, false, sync + } tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn) // the target online status should not be used here while deciding // whether to replicate deletes as the target could be temporarily down @@ -241,6 +245,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA }) return } + tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn) if tgt == nil { logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, rcfg.RoleArn)) @@ -276,7 +281,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA } else { versionPurgeStatus = Failed } - logger.LogIf(ctx, fmt.Errorf("Unable to replicate delete marker to %s/%s(%s): %w", rcfg.GetDestination().Bucket, dobj.ObjectName, versionID, err)) + logger.LogIf(ctx, fmt.Errorf("Unable to replicate delete marker to %s/%s(%s): %s", rcfg.GetDestination().Bucket, dobj.ObjectName, versionID, err)) } else { if dobj.VersionID == "" { replicationStatus = string(replication.Completed) @@ -293,14 +298,13 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA // Update metadata on the delete marker or purge permanent delete if replication success. dobjInfo, err := objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{ VersionID: versionID, - DeleteMarker: dobj.DeleteMarker, DeleteMarkerReplicationStatus: replicationStatus, VersionPurgeStatus: versionPurgeStatus, Versioned: globalBucketVersioningSys.Enabled(bucket), VersionSuspended: globalBucketVersioningSys.Suspended(bucket), }) - if err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, dobj.ObjectName, versionID, err)) + if err != nil && !isErrVersionNotFound(err) { // VersionNotFound would be reported by pool that object version is missing on. + logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %s", bucket, dobj.ObjectName, versionID, err)) sendEvent(eventArgs{ BucketName: bucket, Object: ObjectInfo{ @@ -648,12 +652,12 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa c := &miniogo.Core{Client: tgt.Client} if _, err = c.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts); err != nil { replicationStatus = replication.Failed - logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) + logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) } } else { target, err := globalBucketMetadataSys.GetBucketTarget(bucket, cfg.RoleArn) if err != nil { - logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%w", bucket, cfg.RoleArn, err)) + logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%s", bucket, cfg.RoleArn, err)) sendEvent(eventArgs{ EventName: event.ObjectReplicationNotTracked, BucketName: bucket, diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 866389265..1a15459fe 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -1034,11 +1034,10 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string if goi.VersionID != "" { markDelete = true } + // Default deleteMarker to true if object is under versioning - deleteMarker := true - if gerr == nil { - deleteMarker = goi.VersionID != "" - } + deleteMarker := opts.Versioned + if opts.VersionID != "" { // case where replica version needs to be deleted on target cluster if versionFound && opts.DeleteMarkerReplicationStatus == replication.Replica.String() { @@ -1047,7 +1046,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string if opts.VersionPurgeStatus.Empty() && opts.DeleteMarkerReplicationStatus == "" { markDelete = false } - if opts.DeleteMarker && opts.VersionPurgeStatus == Complete { + if opts.VersionPurgeStatus == Complete { markDelete = false } // determine if the version represents an object delete