|
|
|
@ -192,6 +192,10 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet |
|
|
|
|
} |
|
|
|
|
return oi.DeleteMarker, false, sync |
|
|
|
|
} |
|
|
|
|
if oi.ReplicationStatus == replication.Replica { |
|
|
|
|
// avoid replicating a replica in bi-directional replication
|
|
|
|
|
return oi.DeleteMarker, false, sync |
|
|
|
|
} |
|
|
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn) |
|
|
|
|
// the target online status should not be used here while deciding
|
|
|
|
|
// whether to replicate deletes as the target could be temporarily down
|
|
|
|
@ -241,6 +245,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA |
|
|
|
|
}) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn) |
|
|
|
|
if tgt == nil { |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, rcfg.RoleArn)) |
|
|
|
@ -276,7 +281,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA |
|
|
|
|
} else { |
|
|
|
|
versionPurgeStatus = Failed |
|
|
|
|
} |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate delete marker to %s/%s(%s): %w", rcfg.GetDestination().Bucket, dobj.ObjectName, versionID, err)) |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate delete marker to %s/%s(%s): %s", rcfg.GetDestination().Bucket, dobj.ObjectName, versionID, err)) |
|
|
|
|
} else { |
|
|
|
|
if dobj.VersionID == "" { |
|
|
|
|
replicationStatus = string(replication.Completed) |
|
|
|
@ -293,14 +298,13 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA |
|
|
|
|
// Update metadata on the delete marker or purge permanent delete if replication success.
|
|
|
|
|
dobjInfo, err := objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{ |
|
|
|
|
VersionID: versionID, |
|
|
|
|
DeleteMarker: dobj.DeleteMarker, |
|
|
|
|
DeleteMarkerReplicationStatus: replicationStatus, |
|
|
|
|
VersionPurgeStatus: versionPurgeStatus, |
|
|
|
|
Versioned: globalBucketVersioningSys.Enabled(bucket), |
|
|
|
|
VersionSuspended: globalBucketVersioningSys.Suspended(bucket), |
|
|
|
|
}) |
|
|
|
|
if err != nil { |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, dobj.ObjectName, versionID, err)) |
|
|
|
|
if err != nil && !isErrVersionNotFound(err) { // VersionNotFound would be reported by pool that object version is missing on.
|
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %s", bucket, dobj.ObjectName, versionID, err)) |
|
|
|
|
sendEvent(eventArgs{ |
|
|
|
|
BucketName: bucket, |
|
|
|
|
Object: ObjectInfo{ |
|
|
|
@ -648,12 +652,12 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa |
|
|
|
|
c := &miniogo.Core{Client: tgt.Client} |
|
|
|
|
if _, err = c.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts); err != nil { |
|
|
|
|
replicationStatus = replication.Failed |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
target, err := globalBucketMetadataSys.GetBucketTarget(bucket, cfg.RoleArn) |
|
|
|
|
if err != nil { |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%w", bucket, cfg.RoleArn, err)) |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%s", bucket, cfg.RoleArn, err)) |
|
|
|
|
sendEvent(eventArgs{ |
|
|
|
|
EventName: event.ObjectReplicationNotTracked, |
|
|
|
|
BucketName: bucket, |
|
|
|
|