@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net/http"
"reflect"
"strings"
"time"
@ -168,14 +169,8 @@ func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToD
}
// isStandardHeader returns true if header is a supported header and not a custom header
func isStandardHeader ( headerKey string ) bool {
key := strings . ToLower ( headerKey )
for _ , header := range standardHeaders {
if strings . ToLower ( header ) == key {
return true
}
}
return false
func isStandardHeader ( matchHeaderKey string ) bool {
return equals ( matchHeaderKey , standardHeaders ... )
}
// returns whether object version is a deletemarker and if object qualifies for replication
@ -225,20 +220,44 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
// on target.
func replicateDelete ( ctx context . Context , dobj DeletedObjectVersionInfo , objectAPI ObjectLayer ) {
bucket := dobj . Bucket
versionID := dobj . DeleteMarkerVersionID
if versionID == "" {
versionID = dobj . VersionID
}
rcfg , err := getReplicationConfig ( ctx , bucket )
if err != nil || rcfg == nil {
logger . LogIf ( ctx , err )
sendEvent ( eventArgs {
BucketName : bucket ,
Object : ObjectInfo {
Bucket : bucket ,
Name : dobj . ObjectName ,
VersionID : versionID ,
DeleteMarker : dobj . DeleteMarker ,
} ,
Host : "Internal: [Replication]" ,
EventName : event . ObjectReplicationNotTracked ,
} )
return
}
tgt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , rcfg . RoleArn )
if tgt == nil {
logger . LogIf ( ctx , fmt . Errorf ( "failed to get target for bucket:%s arn:%s" , bucket , rcfg . RoleArn ) )
sendEvent ( eventArgs {
BucketName : bucket ,
Object : ObjectInfo {
Bucket : bucket ,
Name : dobj . ObjectName ,
VersionID : versionID ,
DeleteMarker : dobj . DeleteMarker ,
} ,
Host : "Internal: [Replication]" ,
EventName : event . ObjectReplicationNotTracked ,
} )
return
}
versionID := dobj . DeleteMarkerVersionID
if versionID == "" {
versionID = dobj . VersionID
}
rmErr := tgt . RemoveObject ( ctx , rcfg . GetDestination ( ) . Bucket , dobj . ObjectName , miniogo . RemoveObjectOptions {
VersionID : versionID ,
Internal : miniogo . AdvancedRemoveOptions {
@ -257,6 +276,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA
} else {
versionPurgeStatus = Failed
}
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate delete marker to %s/%s(%s): %w" , rcfg . GetDestination ( ) . Bucket , dobj . ObjectName , versionID , err ) )
} else {
if dobj . VersionID == "" {
replicationStatus = string ( replication . Completed )
@ -271,61 +291,78 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA
}
// Update metadata on the delete marker or purge permanent delete if replication success.
objInfo , err := objectAPI . DeleteObject ( ctx , bucket , dobj . ObjectName , ObjectOptions {
d objInfo, err := objectAPI . DeleteObject ( ctx , bucket , dobj . ObjectName , ObjectOptions {
VersionID : versionID ,
DeleteMarker : dobj . DeleteMarker ,
DeleteMarkerReplicationStatus : replicationStatus ,
Versioned : globalBucketVersioningSys . Enabled ( bucket ) ,
VersionPurgeStatus : versionPurgeStatus ,
Versioned : globalBucketVersioningSys . Enabled ( bucket ) ,
VersionSuspended : globalBucketVersioningSys . Suspended ( bucket ) ,
} )
if err != nil {
logger . LogIf ( ctx , fmt . Errorf ( "Unable to update replication metadata for %s/%s %s: %w" , bucket , dobj . ObjectName , dobj . VersionID , err ) )
}
logger . LogIf ( ctx , fmt . Errorf ( "Unable to update replication metadata for %s/%s(%s): %w" , bucket , dobj . ObjectName , versionID , err ) )
sendEvent ( eventArgs {
BucketName : bucket ,
Object : objInfo ,
Object : ObjectInfo {
Bucket : bucket ,
Name : dobj . ObjectName ,
VersionID : versionID ,
DeleteMarker : dobj . DeleteMarker ,
} ,
Host : "Internal: [Replication]" ,
EventName : eventName ,
} )
} else {
sendEvent ( eventArgs {
BucketName : bucket ,
Object : dobjInfo ,
Host : "Internal: [Replication]" ,
EventName : eventName ,
} )
}
}
func getCopyObjMetadata ( oi ObjectInfo , dest replication . Destination ) map [ string ] string {
meta := make ( map [ string ] string , len ( oi . UserDefined ) )
for k , v := range oi . UserDefined {
if k == xhttp . AmzBucketReplicationStatus {
if strings . HasPrefix ( strings . ToLower ( k ) , ReservedMetadataPrefixLower ) {
continue
}
if strings . HasPrefix ( strings . ToLower ( k ) , ReservedMetadataPrefixLower ) {
if equals ( k , xhttp . AmzBucketReplicationStatus ) {
continue
}
// https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w
if equals ( k , xhttp . AmzMetaUnencryptedContentLength , xhttp . AmzMetaUnencryptedContentMD5 ) {
continue
}
meta [ k ] = v
}
if oi . ContentEncoding != "" {
meta [ xhttp . ContentEncoding ] = oi . ContentEncoding
}
if oi . ContentType != "" {
meta [ xhttp . ContentType ] = oi . ContentType
}
tag , err := tags . ParseObjectTags ( oi . UserTags )
if err != nil {
return nil
}
if tag != nil {
meta [ xhttp . AmzObjectTagging ] = tag . String ( )
if oi . UserTags != "" {
meta [ xhttp . AmzObjectTagging ] = oi . UserTags
meta [ xhttp . AmzTagDirective ] = "REPLACE"
}
sc := dest . StorageClass
if sc == "" {
sc = oi . StorageClass
}
if sc != "" {
meta [ xhttp . AmzStorageClass ] = sc
if oi . UserTags != "" {
meta [ xhttp . AmzObjectTagging ] = oi . UserTags
}
meta [ xhttp . MinIOSourceMTime ] = oi . ModTime . Format ( time . RFC3339Nano )
meta [ xhttp . MinIOSourceETag ] = oi . ETag
meta [ xhttp . MinIOSourceMTime ] = oi . ModTime . Format ( time . RFC3339Nano )
meta [ xhttp . AmzBucketReplicationStatus ] = replication . Replica . String ( )
return meta
}
@ -341,17 +378,13 @@ func putReplicationOpts(ctx context.Context, dest replication.Destination, objIn
}
meta [ k ] = v
}
tag , err := tags . ParseObjectTags ( objInfo . UserTags )
if err != nil {
return
}
sc := dest . StorageClass
if sc == "" {
sc = objInfo . StorageClass
}
putOpts = miniogo . PutObjectOptions {
UserMetadata : meta ,
UserTags : tag . ToMap ( ) ,
ContentType : objInfo . ContentType ,
ContentEncoding : objInfo . ContentEncoding ,
StorageClass : sc ,
@ -362,6 +395,12 @@ func putReplicationOpts(ctx context.Context, dest replication.Destination, objIn
SourceETag : objInfo . ETag ,
} ,
}
if objInfo . UserTags != "" {
tag , _ := tags . ParseObjectTags ( objInfo . UserTags )
if tag != nil {
putOpts . UserTags = tag . ToMap ( )
}
}
if lang , ok := objInfo . UserDefined [ xhttp . ContentLanguage ] ; ok {
putOpts . ContentLanguage = lang
}
@ -400,6 +439,16 @@ const (
replicateAll replicationAction = "all"
)
// matches k1 with all keys, returns 'true' if one of them matches
func equals ( k1 string , keys ... string ) bool {
for _ , k2 := range keys {
if strings . ToLower ( k1 ) == strings . ToLower ( k2 ) {
return true
}
}
return false
}
// returns replicationAction by comparing metadata between source and target
func getReplicationAction ( oi1 ObjectInfo , oi2 minio . ObjectInfo ) replicationAction {
// needs full replication
@ -407,83 +456,153 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationActio
oi1 . VersionID != oi2 . VersionID ||
oi1 . Size != oi2 . Size ||
oi1 . DeleteMarker != oi2 . IsDeleteMarker ||
! oi1 . ModTime . Equal ( oi2 . LastModified ) {
oi1 . ModTime . Unix ( ) != oi2 . LastModified . Unix ( ) {
return replicateAll
}
if oi1 . ContentType != oi2 . ContentType {
return replicateMetadata
}
if oi1 . ContentEncoding != "" {
enc , ok := oi2 . Metadata [ xhttp . ContentEncoding ]
if ! ok || strings . Join ( enc , "" ) != oi1 . ContentEncoding {
if ! ok {
enc , ok = oi2 . Metadata [ strings . ToLower ( xhttp . ContentEncoding ) ]
if ! ok {
return replicateMetadata
}
}
// compare metadata on both maps to see if meta is identical
for k1 , v1 := range oi1 . UserDefined {
if v2 , ok := oi2 . UserMetadata [ k1 ] ; ok && v1 == v2 {
continue
if strings . Join ( enc , "," ) != oi1 . ContentEncoding {
return replicateMetadata
}
if v2 , ok := oi2 . Metadata [ k1 ] ; ok && v1 == strings . Join ( v2 , "" ) {
continue
}
t , _ := tags . ParseObjectTags ( oi1 . UserTags )
if ! reflect . DeepEqual ( oi2 . UserTags , t . ToMap ( ) ) {
return replicateMetadata
}
for k1 , v1 := range oi2 . UserMetadata {
if v2 , ok := oi1 . UserDefined [ k1 ] ; ! ok || v1 != v2 {
return replicateMetadata
// Compare only necessary headers
compareKeys := [ ] string {
"Expires" ,
"Cache-Control" ,
"Content-Language" ,
"Content-Disposition" ,
"X-Amz-Object-Lock-Mode" ,
"X-Amz-Object-Lock-Retain-Until-Date" ,
"X-Amz-Object-Lock-Legal-Hold" ,
"X-Amz-Website-Redirect-Location" ,
"X-Amz-Meta-" ,
}
// compare metadata on both maps to see if meta is identical
compareMeta1 := make ( map [ string ] string )
for k , v := range oi1 . UserDefined {
var found bool
for _ , prefix := range compareKeys {
if ! strings . HasPrefix ( strings . ToLower ( k ) , strings . ToLower ( prefix ) ) {
continue
}
found = true
break
}
for k1 , v1slc := range oi2 . Metadata {
v1 := strings . Join ( v1slc , "" )
if k1 == xhttp . ContentEncoding { //already compared
if found {
compareMeta1 [ strings . ToLower ( k ) ] = v
}
}
compareMeta2 := make ( map [ string ] string )
for k , v := range oi2 . Metadata {
var found bool
for _ , prefix := range compareKeys {
if ! strings . HasPrefix ( strings . ToLower ( k ) , strings . ToLower ( prefix ) ) {
continue
}
if v2 , ok := oi1 . UserDefined [ k1 ] ; ! ok || v1 != v2 {
return replicateMetadata
found = true
break
}
if found {
compareMeta2 [ strings . ToLower ( k ) ] = strings . Join ( v , "," )
}
}
t , _ := tags . MapToObjectTags ( oi2 . UserTags )
if t . String ( ) != oi1 . UserTags {
if ! reflect . DeepEqual ( compareMeta1 , compareMeta2 ) {
return replicateMetadata
}
return replicateNone
}
// replicateObject replicates the specified version of the object to destination bucket
// The source object is then updated to reflect the replication status.
func replicateObject ( ctx context . Context , objInfo ObjectInfo , objectAPI ObjectLayer ) {
z , ok := objectAPI . ( * erasureServerPools )
if ! ok {
return
}
bucket := objInfo . Bucket
object := objInfo . Name
cfg , err := getReplicationConfig ( ctx , bucket )
if err != nil {
logger . LogIf ( ctx , err )
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
return
}
tgt := globalBucketTargetSys . GetRemoteTargetClient ( ctx , cfg . RoleArn )
if tgt == nil {
logger . LogIf ( ctx , fmt . Errorf ( "failed to get target for bucket:%s arn:%s" , bucket , cfg . RoleArn ) )
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
return
}
gr , err := objectAPI . GetObjectNInfo ( ctx , bucket , object , nil , http . Header { } , readLock , ObjectOptions {
VersionID : objInfo . VersionID ,
} )
if err != nil {
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
logger . LogIf ( ctx , err )
return
}
defer gr . Close ( ) // hold read lock for entire transaction
objInfo = gr . ObjInfo
size , err := objInfo . GetActualSize ( )
if err != nil {
logger . LogIf ( ctx , err )
gr . Close ( )
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
return
}
dest := cfg . GetDestination ( )
if dest . Bucket == "" {
gr . Close ( )
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate object %s(%s), bucket is empty" , objInfo . Name , objInfo . VersionID ) )
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
return
}
@ -496,7 +615,6 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
if err == nil {
rtype = getReplicationAction ( objInfo , oi )
if rtype == replicateNone {
gr . Close ( )
// object with same VersionID already exists, replication kicked off by
// PutObject might have completed.
return
@ -504,19 +622,23 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
}
replicationStatus := replication . Completed
if rtype != replicateAll {
gr . Close ( )
// replicate metadata for object tagging/copy with metadata replacement
dstOpts := miniogo . PutObjectOptions { Internal : miniogo . AdvancedPutOptions { SourceVersionID : objInfo . VersionID } }
c := & miniogo . Core { Client : tgt . Client }
_ , err = c . CopyObject ( ctx , dest . Bucket , object , dest . Bucket , object , getCopyObjMetadata ( objInfo , dest ) , dstOpts )
if err != nil {
if _ , err = c . CopyObject ( ctx , dest . Bucket , object , dest . Bucket , object , getCopyObjMetadata ( objInfo , dest ) , dstOpts ) ; err != nil {
replicationStatus = replication . Failed
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate metadata for object %s/%s(%s): %s" , bucket , objInfo . Name , objInfo . VersionID , err ) )
}
} else {
target , err := globalBucketMetadataSys . GetBucketTarget ( bucket , cfg . RoleArn )
if err != nil {
logger . LogIf ( ctx , fmt . Errorf ( "failed to get target for replication bucket:%s cfg:%s err:%s" , bucket , cfg . RoleArn , err ) )
gr . Close ( )
sendEvent ( eventArgs {
EventName : event . ObjectReplicationNotTracked ,
BucketName : bucket ,
Object : objInfo ,
Host : "Internal: [Replication]" ,
} )
return
}
@ -535,11 +657,11 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
// r takes over closing gr.
r := bandwidth . NewMonitoredReader ( ctx , globalBucketMonitor , objInfo . Bucket , objInfo . Name , gr , headerSize , b , target . BandwidthLimit )
_ , err = tgt . PutObject ( ctx , dest . Bucket , object , r , size , putOpts )
if err != nil {
if _ , err = tgt . PutObject ( ctx , dest . Bucket , object , r , size , putOpts ) ; err != nil {
replicationStatus = replication . Failed
logger . LogIf ( ctx , fmt . Errorf ( "Unable to replicate for object %s/%s(%s): %s" , bucket , objInfo . Name , objInfo . VersionID , err ) )
}
r . Close ( )
defer r . Close ( )
}
objInfo . UserDefined [ xhttp . AmzBucketReplicationStatus ] = replicationStatus . String ( )
@ -548,7 +670,6 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
}
// FIXME: add support for missing replication events
// - event.ObjectReplicationNotTracked
// - event.ObjectReplicationMissedThreshold
// - event.ObjectReplicationReplicatedAfterThreshold
var eventName = event . ObjectReplicationComplete
@ -556,16 +677,19 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
eventName = event . ObjectReplicationFailed
}
objInfo . metadataOnly = true // Perform only metadata updates .
objInf o , err = objectAPI . CopyObject ( ctx , bucket , object , bucket , object , objInfo , ObjectOptions {
// This lower level implementation is necessary to avoid write locks from CopyObject .
p oolIdx , err := z . getPoolIdx ( ctx , bucket , object , ObjectOptions {
VersionID : objInfo . VersionID ,
} , ObjectOptions {
VersionID : objInfo . VersionID ,
} )
} , objInfo . Size )
if err != nil {
logger . LogIf ( ctx , fmt . Errorf ( "Unable to update replication metadata for %s: %s" , objInfo . VersionID , err ) )
logger . LogIf ( ctx , fmt . Errorf ( "Unable to update replication metadata for %s/%s(%s): %w" , bucket , objInfo . Name , objInfo . VersionID , err ) )
} else {
if err = z . serverPools [ poolIdx ] . getHashedSet ( object ) . updateObjectMeta ( ctx , bucket , object , objInfo . UserDefined , ObjectOptions {
VersionID : objInfo . VersionID ,
} ) ; err != nil {
logger . LogIf ( ctx , fmt . Errorf ( "Unable to update replication metadata for %s/%s(%s): %w" , bucket , objInfo . Name , objInfo . VersionID , err ) )
}
}
sendEvent ( eventArgs {
EventName : eventName ,
BucketName : bucket ,
@ -703,8 +827,11 @@ func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs
return nil , false
}
}
// Make sure to match ETag when proxying.
if err = gopts . SetMatchETag ( oi . ETag ) ; err != nil {
return nil , false
}
c := miniogo . Core { Client : tgt . Client }
obj , _ , _ , err := c . GetObject ( ctx , bucket , object , gopts )
if err != nil {
return nil , false
@ -770,6 +897,7 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts Objec
if err != nil {
return nil , oi , false , err
}
tags , _ := tags . MapToObjectTags ( objInfo . UserTags )
oi = ObjectInfo {
Bucket : bucket ,
@ -784,12 +912,17 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts Objec
Expires : objInfo . Expires ,
StorageClass : objInfo . StorageClass ,
ReplicationStatus : replication . StatusType ( objInfo . ReplicationStatus ) ,
UserDefined : cloneMSS ( objInfo . UserMetadata ) ,
UserTags : tags . String ( ) ,
}
if ce , ok := oi . UserDefined [ xhttp . ContentEncoding ] ; ok {
for k , v := range objInfo . Metadata {
oi . UserDefined [ k ] = v [ 0 ]
}
ce , ok := oi . UserDefined [ xhttp . ContentEncoding ]
if ! ok {
ce , ok = oi . UserDefined [ strings . ToLower ( xhttp . ContentEncoding ) ]
}
if ok {
oi . ContentEncoding = ce
delete ( oi . UserDefined , xhttp . ContentEncoding )
}
return tgt , oi , true , nil
}