diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 5152fcad5..a26ccf57c 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -259,26 +259,36 @@ func putTransitionOpts(objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, er SourceETag: objInfo.ETag, }, } + if objInfo.UserTags != "" { - tag, err := tags.ParseObjectTags(objInfo.UserTags) - if err != nil { - return miniogo.PutObjectOptions{}, err + tag, _ := tags.ParseObjectTags(objInfo.UserTags) + if tag != nil { + putOpts.UserTags = tag.ToMap() } - putOpts.UserTags = tag.ToMap() } - if mode, ok := getMapValue(objInfo.UserDefined, xhttp.AmzObjectLockMode); ok { + lkMap := caseInsensitiveMap(objInfo.UserDefined) + if lang, ok := lkMap.Lookup(xhttp.ContentLanguage); ok { + putOpts.ContentLanguage = lang + } + if disp, ok := lkMap.Lookup(xhttp.ContentDisposition); ok { + putOpts.ContentDisposition = disp + } + if cc, ok := lkMap.Lookup(xhttp.CacheControl); ok { + putOpts.CacheControl = cc + } + if mode, ok := lkMap.Lookup(xhttp.AmzObjectLockMode); ok { rmode := miniogo.RetentionMode(mode) putOpts.Mode = rmode } - if retainDateStr, ok := getMapValue(objInfo.UserDefined, xhttp.AmzObjectLockRetainUntilDate); ok { + if retainDateStr, ok := lkMap.Lookup(xhttp.AmzObjectLockRetainUntilDate); ok { rdate, err := time.Parse(time.RFC3339, retainDateStr) if err != nil { - return miniogo.PutObjectOptions{}, err + return putOpts, err } putOpts.RetainUntilDate = rdate } - if lhold, ok := getMapValue(objInfo.UserDefined, xhttp.AmzObjectLockLegalHold); ok { + if lhold, ok := lkMap.Lookup(xhttp.AmzObjectLockLegalHold); ok { putOpts.LegalHold = miniogo.LegalHoldStatus(lhold) } @@ -385,7 +395,7 @@ func transitionObject(ctx context.Context, objectAPI ObjectLayer, objInfo Object putOpts, err := putTransitionOpts(oi) if err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to transition object %s/%s(%s): %w", oi.Bucket, oi.Name, oi.VersionID, err)) + gr.Close() return err } @@ -413,6 +423,7 @@ func transitionObject(ctx context.Context, objectAPI ObjectLayer, objInfo Object Object: objInfo, Host: "Internal: [ILM-Transition]", }) + return err } diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index a17a5dfbd..8b1f6042b 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -367,16 +367,24 @@ func getCopyObjMetadata(oi ObjectInfo, dest replication.Destination) map[string] return meta } -// lookup map entry case insensitively. -func getMapValue(m map[string]string, key string) (string, bool) { - if v, ok := m[key]; ok { - return v, ok +type caseInsensitiveMap map[string]string + +// Lookup map entry case insensitively. +func (m caseInsensitiveMap) Lookup(key string) (string, bool) { + if len(m) == 0 { + return "", false } - if v, ok := m[strings.ToLower(key)]; ok { - return v, ok + for _, k := range []string{ + key, + strings.ToLower(key), + http.CanonicalHeaderKey(key), + } { + v, ok := m[k] + if ok { + return v, ok + } } - v, ok := m[http.CanonicalHeaderKey(key)] - return v, ok + return "", false } func putReplicationOpts(ctx context.Context, dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, err error) { @@ -413,27 +421,29 @@ func putReplicationOpts(ctx context.Context, dest replication.Destination, objIn putOpts.UserTags = tag.ToMap() } } - if lang, ok := getMapValue(objInfo.UserDefined, xhttp.ContentLanguage); ok { + + lkMap := caseInsensitiveMap(objInfo.UserDefined) + if lang, ok := lkMap.Lookup(xhttp.ContentLanguage); ok { putOpts.ContentLanguage = lang } - if disp, ok := getMapValue(objInfo.UserDefined, xhttp.ContentDisposition); ok { + if disp, ok := lkMap.Lookup(xhttp.ContentDisposition); ok { putOpts.ContentDisposition = disp } - if cc, ok := getMapValue(objInfo.UserDefined, xhttp.CacheControl); ok { + if cc, ok := lkMap.Lookup(xhttp.CacheControl); ok { putOpts.CacheControl = cc } - if mode, ok := getMapValue(objInfo.UserDefined, xhttp.AmzObjectLockMode); ok { + if mode, ok := lkMap.Lookup(xhttp.AmzObjectLockMode); ok { rmode := miniogo.RetentionMode(mode) putOpts.Mode = rmode } - if retainDateStr, ok := getMapValue(objInfo.UserDefined, xhttp.AmzObjectLockRetainUntilDate); ok { - rdate, err := time.Parse(time.RFC3339Nano, retainDateStr) + if retainDateStr, ok := lkMap.Lookup(xhttp.AmzObjectLockRetainUntilDate); ok { + rdate, err := time.Parse(time.RFC3339, retainDateStr) if err != nil { - return miniogo.PutObjectOptions{}, err + return putOpts, err } putOpts.RetainUntilDate = rdate } - if lhold, ok := getMapValue(objInfo.UserDefined, xhttp.AmzObjectLockLegalHold); ok { + if lhold, ok := lkMap.Lookup(xhttp.AmzObjectLockLegalHold); ok { putOpts.LegalHold = miniogo.LegalHoldStatus(lhold) } if crypto.S3.IsEncrypted(objInfo.UserDefined) { @@ -643,7 +653,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa } else { target, err := globalBucketMetadataSys.GetBucketTarget(bucket, cfg.RoleArn) if err != nil { - logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%s", bucket, cfg.RoleArn, err)) + logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%w", bucket, cfg.RoleArn, err)) sendEvent(eventArgs{ EventName: event.ObjectReplicationNotTracked, BucketName: bucket, @@ -655,9 +665,16 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa putOpts, err := putReplicationOpts(ctx, dest, objInfo) if err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to replicate object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) + logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%w", bucket, cfg.RoleArn, err)) + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) return } + // Setup bandwidth throttling peers, _ := globalEndpoints.peers() totalNodesCount := len(peers) @@ -674,7 +691,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit) if _, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, putOpts); err != nil { replicationStatus = replication.Failed - logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) + logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) } defer r.Close() } diff --git a/cmd/config/api/api.go b/cmd/config/api/api.go index 9d8d23524..2c89a326d 100644 --- a/cmd/config/api/api.go +++ b/cmd/config/api/api.go @@ -79,7 +79,7 @@ var ( }, config.KV{ Key: apiListQuorum, - Value: "optimal", + Value: "strict", }, config.KV{ Key: apiExtendListCacheLife, @@ -178,13 +178,16 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { if err != nil { return cfg, err } + replicationWorkers, err := strconv.Atoi(env.Get(EnvAPIReplicationWorkers, kvs.Get(apiReplicationWorkers))) if err != nil { return cfg, err } + if replicationWorkers <= 0 { return cfg, config.ErrInvalidReplicationWorkersValue(nil).Msg("Minimum number of replication workers should be 1") } + return Config{ RequestsMax: requestsMax, RequestsDeadline: requestsDeadline, diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 4eee3c91e..866389265 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -179,7 +179,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri } if objInfo.TransitionStatus == lifecycle.TransitionComplete { // If transitioned, stream from transition tier unless object is restored locally or restore date is past. - restoreHdr, ok := getMapValue(objInfo.UserDefined, xhttp.AmzRestore) + restoreHdr, ok := caseInsensitiveMap(objInfo.UserDefined).Lookup(xhttp.AmzRestore) if !ok || !strings.HasPrefix(restoreHdr, "ongoing-request=false") || (!objInfo.RestoreExpires.IsZero() && time.Now().After(objInfo.RestoreExpires)) { return getTransitionedObjectReader(ctx, bucket, object, rs, h, objInfo, opts) } diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 3911944dc..23e865e29 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -623,17 +623,14 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob if z.SinglePool() { return z.serverPools[0].DeleteObject(ctx, bucket, object, opts) } - for _, pool := range z.serverPools { - objInfo, err = pool.DeleteObject(ctx, bucket, object, opts) - if err == nil { - return objInfo, nil - } - if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { - break - } + + // We don't know the size here set 1GiB atleast. + idx, err := z.getPoolIdx(ctx, bucket, object, opts, 1<<30) + if err != nil { + return objInfo, err } - return objInfo, err + return z.serverPools[idx].DeleteObject(ctx, bucket, object, opts) } func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) { diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index f980c4205..eab508c8e 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -1279,8 +1279,8 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { return } if retentionMode != "" { - opts.UserDefined[xhttp.AmzObjectLockMode] = string(retentionMode) - opts.UserDefined[xhttp.AmzObjectLockRetainUntilDate] = retentionDate.UTC().Format(iso8601TimeFormat) + opts.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode) + opts.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = retentionDate.UTC().Format(iso8601TimeFormat) } objInfo, err := putObject(GlobalContext, bucket, object, pReader, opts)