|
|
|
@ -367,16 +367,24 @@ func getCopyObjMetadata(oi ObjectInfo, dest replication.Destination) map[string] |
|
|
|
|
return meta |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// lookup map entry case insensitively.
|
|
|
|
|
func getMapValue(m map[string]string, key string) (string, bool) { |
|
|
|
|
if v, ok := m[key]; ok { |
|
|
|
|
return v, ok |
|
|
|
|
type caseInsensitiveMap map[string]string |
|
|
|
|
|
|
|
|
|
// Lookup map entry case insensitively.
|
|
|
|
|
func (m caseInsensitiveMap) Lookup(key string) (string, bool) { |
|
|
|
|
if len(m) == 0 { |
|
|
|
|
return "", false |
|
|
|
|
} |
|
|
|
|
if v, ok := m[strings.ToLower(key)]; ok { |
|
|
|
|
return v, ok |
|
|
|
|
for _, k := range []string{ |
|
|
|
|
key, |
|
|
|
|
strings.ToLower(key), |
|
|
|
|
http.CanonicalHeaderKey(key), |
|
|
|
|
} { |
|
|
|
|
v, ok := m[k] |
|
|
|
|
if ok { |
|
|
|
|
return v, ok |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
v, ok := m[http.CanonicalHeaderKey(key)] |
|
|
|
|
return v, ok |
|
|
|
|
return "", false |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func putReplicationOpts(ctx context.Context, dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, err error) { |
|
|
|
@ -413,27 +421,29 @@ func putReplicationOpts(ctx context.Context, dest replication.Destination, objIn |
|
|
|
|
putOpts.UserTags = tag.ToMap() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if lang, ok := getMapValue(objInfo.UserDefined, xhttp.ContentLanguage); ok { |
|
|
|
|
|
|
|
|
|
lkMap := caseInsensitiveMap(objInfo.UserDefined) |
|
|
|
|
if lang, ok := lkMap.Lookup(xhttp.ContentLanguage); ok { |
|
|
|
|
putOpts.ContentLanguage = lang |
|
|
|
|
} |
|
|
|
|
if disp, ok := getMapValue(objInfo.UserDefined, xhttp.ContentDisposition); ok { |
|
|
|
|
if disp, ok := lkMap.Lookup(xhttp.ContentDisposition); ok { |
|
|
|
|
putOpts.ContentDisposition = disp |
|
|
|
|
} |
|
|
|
|
if cc, ok := getMapValue(objInfo.UserDefined, xhttp.CacheControl); ok { |
|
|
|
|
if cc, ok := lkMap.Lookup(xhttp.CacheControl); ok { |
|
|
|
|
putOpts.CacheControl = cc |
|
|
|
|
} |
|
|
|
|
if mode, ok := getMapValue(objInfo.UserDefined, xhttp.AmzObjectLockMode); ok { |
|
|
|
|
if mode, ok := lkMap.Lookup(xhttp.AmzObjectLockMode); ok { |
|
|
|
|
rmode := miniogo.RetentionMode(mode) |
|
|
|
|
putOpts.Mode = rmode |
|
|
|
|
} |
|
|
|
|
if retainDateStr, ok := getMapValue(objInfo.UserDefined, xhttp.AmzObjectLockRetainUntilDate); ok { |
|
|
|
|
rdate, err := time.Parse(time.RFC3339Nano, retainDateStr) |
|
|
|
|
if retainDateStr, ok := lkMap.Lookup(xhttp.AmzObjectLockRetainUntilDate); ok { |
|
|
|
|
rdate, err := time.Parse(time.RFC3339, retainDateStr) |
|
|
|
|
if err != nil { |
|
|
|
|
return miniogo.PutObjectOptions{}, err |
|
|
|
|
return putOpts, err |
|
|
|
|
} |
|
|
|
|
putOpts.RetainUntilDate = rdate |
|
|
|
|
} |
|
|
|
|
if lhold, ok := getMapValue(objInfo.UserDefined, xhttp.AmzObjectLockLegalHold); ok { |
|
|
|
|
if lhold, ok := lkMap.Lookup(xhttp.AmzObjectLockLegalHold); ok { |
|
|
|
|
putOpts.LegalHold = miniogo.LegalHoldStatus(lhold) |
|
|
|
|
} |
|
|
|
|
if crypto.S3.IsEncrypted(objInfo.UserDefined) { |
|
|
|
@ -643,7 +653,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa |
|
|
|
|
} else { |
|
|
|
|
target, err := globalBucketMetadataSys.GetBucketTarget(bucket, cfg.RoleArn) |
|
|
|
|
if err != nil { |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%s", bucket, cfg.RoleArn, err)) |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%w", bucket, cfg.RoleArn, err)) |
|
|
|
|
sendEvent(eventArgs{ |
|
|
|
|
EventName: event.ObjectReplicationNotTracked, |
|
|
|
|
BucketName: bucket, |
|
|
|
@ -655,9 +665,16 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa |
|
|
|
|
|
|
|
|
|
putOpts, err := putReplicationOpts(ctx, dest, objInfo) |
|
|
|
|
if err != nil { |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%w", bucket, cfg.RoleArn, err)) |
|
|
|
|
sendEvent(eventArgs{ |
|
|
|
|
EventName: event.ObjectReplicationNotTracked, |
|
|
|
|
BucketName: bucket, |
|
|
|
|
Object: objInfo, |
|
|
|
|
Host: "Internal: [Replication]", |
|
|
|
|
}) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Setup bandwidth throttling
|
|
|
|
|
peers, _ := globalEndpoints.peers() |
|
|
|
|
totalNodesCount := len(peers) |
|
|
|
@ -674,7 +691,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa |
|
|
|
|
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit) |
|
|
|
|
if _, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, putOpts); err != nil { |
|
|
|
|
replicationStatus = replication.Failed |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) |
|
|
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) |
|
|
|
|
} |
|
|
|
|
defer r.Close() |
|
|
|
|
} |
|
|
|
|