|
|
|
@ -251,37 +251,31 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA |
|
|
|
|
versionPurgeStatus = Complete |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var eventName = event.ObjectReplicationComplete |
|
|
|
|
if replicationStatus == string(replication.Failed) || versionPurgeStatus == Failed { |
|
|
|
|
eventName = event.ObjectReplicationFailed |
|
|
|
|
} |
|
|
|
|
objInfo := ObjectInfo{ |
|
|
|
|
Name: dobj.ObjectName, |
|
|
|
|
DeleteMarker: dobj.DeleteMarker, |
|
|
|
|
VersionID: versionID, |
|
|
|
|
ReplicationStatus: replication.StatusType(dobj.DeleteMarkerReplicationStatus), |
|
|
|
|
VersionPurgeStatus: versionPurgeStatus, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
eventArg := &eventArgs{ |
|
|
|
|
BucketName: bucket, |
|
|
|
|
Object: objInfo, |
|
|
|
|
Host: "Internal: [Replication]", |
|
|
|
|
EventName: eventName, |
|
|
|
|
} |
|
|
|
|
sendEvent(*eventArg) |
|
|
|
|
|
|
|
|
|
// Update metadata on the delete marker or purge permanent delete if replication success.
|
|
|
|
|
if _, err = objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{ |
|
|
|
|
objInfo, err := objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{ |
|
|
|
|
VersionID: versionID, |
|
|
|
|
DeleteMarker: dobj.DeleteMarker, |
|
|
|
|
DeleteMarkerReplicationStatus: replicationStatus, |
|
|
|
|
Versioned: globalBucketVersioningSys.Enabled(bucket), |
|
|
|
|
VersionPurgeStatus: versionPurgeStatus, |
|
|
|
|
VersionSuspended: globalBucketVersioningSys.Suspended(bucket), |
|
|
|
|
}); err != nil { |
|
|
|
|
}) |
|
|
|
|
if err != nil { |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s %s: %w", bucket, dobj.ObjectName, dobj.VersionID, err)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
sendEvent(eventArgs{ |
|
|
|
|
BucketName: bucket, |
|
|
|
|
Object: objInfo, |
|
|
|
|
Host: "Internal: [Replication]", |
|
|
|
|
EventName: eventName, |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func getCopyObjMetadata(oi ObjectInfo, dest replication.Destination) map[string]string { |
|
|
|
@ -511,6 +505,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa |
|
|
|
|
} |
|
|
|
|
r.Close() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String() |
|
|
|
|
if objInfo.UserTags != "" { |
|
|
|
|
objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags |
|
|
|
@ -524,20 +519,23 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa |
|
|
|
|
if replicationStatus == replication.Failed { |
|
|
|
|
eventName = event.ObjectReplicationFailed |
|
|
|
|
} |
|
|
|
|
sendEvent(eventArgs{ |
|
|
|
|
EventName: eventName, |
|
|
|
|
BucketName: bucket, |
|
|
|
|
Object: objInfo, |
|
|
|
|
Host: "Internal: [Replication]", |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
objInfo.metadataOnly = true // Perform only metadata updates.
|
|
|
|
|
if _, err = objectAPI.CopyObject(ctx, bucket, object, bucket, object, objInfo, ObjectOptions{ |
|
|
|
|
objInfo, err = objectAPI.CopyObject(ctx, bucket, object, bucket, object, objInfo, ObjectOptions{ |
|
|
|
|
VersionID: objInfo.VersionID, |
|
|
|
|
}, ObjectOptions{ |
|
|
|
|
VersionID: objInfo.VersionID, |
|
|
|
|
}); err != nil { |
|
|
|
|
}) |
|
|
|
|
if err != nil { |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s: %s", objInfo.VersionID, err)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
sendEvent(eventArgs{ |
|
|
|
|
EventName: eventName, |
|
|
|
|
BucketName: bucket, |
|
|
|
|
Object: objInfo, |
|
|
|
|
Host: "Internal: [Replication]", |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// filterReplicationStatusMetadata filters replication status metadata for COPY
|
|
|
|
|