From f108873c4826eccb5400a5909fda9e4f338d0b39 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 3 Feb 2021 20:41:33 -0800 Subject: [PATCH] fix: replication metadata comparsion and other fixes (#11410) - using miniogo.ObjectInfo.UserMetadata is not correct - using UserTags from Map->String() can change order - ContentType comparison needs to be removed. - Compare both lowercase and uppercase key names. - do not silently error out constructing PutObjectOptions if tag parsing fails - avoid notification for empty object info, failed operations should rely on valid objInfo for notification in all situations - optimize copyObject implementation, also introduce a new replication event - clone ObjectInfo() before scheduling for replication - add additional headers for comparison - remove strings.EqualFold comparison avoid unexpected bugs - fix pool based proxying with multiple pools - compare only specific metadata Co-authored-by: Poorna Krishnamoorthy --- cmd/api-headers.go | 6 +- cmd/api-response.go | 14 +- cmd/bucket-handlers.go | 3 +- cmd/bucket-replication.go | 297 ++++++++++++++++++++++--------- cmd/erasure-object.go | 15 -- cmd/erasure-server-pool.go | 18 +- cmd/gateway/s3/gateway-s3-sse.go | 2 +- cmd/handler-utils.go | 29 +-- cmd/handler-utils_test.go | 3 +- cmd/object-api-datatypes.go | 45 +++++ cmd/object-handlers.go | 22 +-- cmd/web-handlers.go | 2 +- cmd/xl-storage-format-v2.go | 17 +- cmd/xl-storage.go | 2 +- 14 files changed, 337 insertions(+), 138 deletions(-) diff --git a/cmd/api-headers.go b/cmd/api-headers.go index 843835a9e..fdc620011 100644 --- a/cmd/api-headers.go +++ b/cmd/api-headers.go @@ -133,18 +133,20 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSp } // https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w - if strings.EqualFold(k, xhttp.AmzMetaUnencryptedContentLength) || strings.EqualFold(k, xhttp.AmzMetaUnencryptedContentMD5) { + if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) { continue } + var isSet bool for _, userMetadataPrefix := range userMetadataKeyPrefixes { - if !strings.HasPrefix(k, userMetadataPrefix) { + if !strings.HasPrefix(strings.ToLower(k), strings.ToLower(userMetadataPrefix)) { continue } w.Header()[strings.ToLower(k)] = []string{v} isSet = true break } + if !isSet { w.Header().Set(k, v) } diff --git a/cmd/api-response.go b/cmd/api-response.go index 9ce47b441..82864eafe 100644 --- a/cmd/api-response.go +++ b/cmd/api-response.go @@ -565,7 +565,7 @@ func generateListObjectsV2Response(bucket, prefix, token, nextToken, startAfter, continue } // https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w - if strings.EqualFold(k, xhttp.AmzMetaUnencryptedContentLength) || strings.EqualFold(k, xhttp.AmzMetaUnencryptedContentMD5) { + if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) { continue } content.UserMetadata[k] = v @@ -639,8 +639,16 @@ func generateListPartsResponse(partsInfo ListPartsInfo, encodingType string) Lis listPartsResponse.Key = s3EncodeName(partsInfo.Object, encodingType) listPartsResponse.UploadID = partsInfo.UploadID listPartsResponse.StorageClass = globalMinioDefaultStorageClass - listPartsResponse.Initiator.ID = globalMinioDefaultOwnerID - listPartsResponse.Owner.ID = globalMinioDefaultOwnerID + + // Dumb values not meaningful + listPartsResponse.Initiator = Initiator{ + ID: globalMinioDefaultOwnerID, + DisplayName: globalMinioDefaultOwnerID, + } + listPartsResponse.Owner = Owner{ + ID: globalMinioDefaultOwnerID, + DisplayName: globalMinioDefaultOwnerID, + } listPartsResponse.MaxParts = partsInfo.MaxParts listPartsResponse.PartNumberMarker = partsInfo.PartNumberMarker diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 7ca900fc2..cfa55b5fb 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "net/http" + "net/textproto" "net/url" "path" "path/filepath" @@ -903,7 +904,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h // Extract metadata to be saved from received Form. metadata := make(map[string]string) - err = extractMetadataFromMap(ctx, formValues, metadata) + err = extractMetadataFromMime(ctx, textproto.MIMEHeader(formValues), metadata) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 83b5dfd58..773b2b973 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "reflect" "strings" "time" @@ -168,14 +169,8 @@ func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToD } // isStandardHeader returns true if header is a supported header and not a custom header -func isStandardHeader(headerKey string) bool { - key := strings.ToLower(headerKey) - for _, header := range standardHeaders { - if strings.ToLower(header) == key { - return true - } - } - return false +func isStandardHeader(matchHeaderKey string) bool { + return equals(matchHeaderKey, standardHeaders...) } // returns whether object version is a deletemarker and if object qualifies for replication @@ -225,20 +220,44 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet // on target. func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectAPI ObjectLayer) { bucket := dobj.Bucket + versionID := dobj.DeleteMarkerVersionID + if versionID == "" { + versionID = dobj.VersionID + } + rcfg, err := getReplicationConfig(ctx, bucket) if err != nil || rcfg == nil { logger.LogIf(ctx, err) + sendEvent(eventArgs{ + BucketName: bucket, + Object: ObjectInfo{ + Bucket: bucket, + Name: dobj.ObjectName, + VersionID: versionID, + DeleteMarker: dobj.DeleteMarker, + }, + Host: "Internal: [Replication]", + EventName: event.ObjectReplicationNotTracked, + }) return } tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn) if tgt == nil { logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, rcfg.RoleArn)) + sendEvent(eventArgs{ + BucketName: bucket, + Object: ObjectInfo{ + Bucket: bucket, + Name: dobj.ObjectName, + VersionID: versionID, + DeleteMarker: dobj.DeleteMarker, + }, + Host: "Internal: [Replication]", + EventName: event.ObjectReplicationNotTracked, + }) return } - versionID := dobj.DeleteMarkerVersionID - if versionID == "" { - versionID = dobj.VersionID - } + rmErr := tgt.RemoveObject(ctx, rcfg.GetDestination().Bucket, dobj.ObjectName, miniogo.RemoveObjectOptions{ VersionID: versionID, Internal: miniogo.AdvancedRemoveOptions{ @@ -257,6 +276,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA } else { versionPurgeStatus = Failed } + logger.LogIf(ctx, fmt.Errorf("Unable to replicate delete marker to %s/%s(%s): %w", rcfg.GetDestination().Bucket, dobj.ObjectName, versionID, err)) } else { if dobj.VersionID == "" { replicationStatus = string(replication.Completed) @@ -271,61 +291,78 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA } // Update metadata on the delete marker or purge permanent delete if replication success. - objInfo, err := objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{ + dobjInfo, err := objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{ VersionID: versionID, DeleteMarker: dobj.DeleteMarker, DeleteMarkerReplicationStatus: replicationStatus, - Versioned: globalBucketVersioningSys.Enabled(bucket), VersionPurgeStatus: versionPurgeStatus, + Versioned: globalBucketVersioningSys.Enabled(bucket), VersionSuspended: globalBucketVersioningSys.Suspended(bucket), }) if err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s %s: %w", bucket, dobj.ObjectName, dobj.VersionID, err)) + logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, dobj.ObjectName, versionID, err)) + sendEvent(eventArgs{ + BucketName: bucket, + Object: ObjectInfo{ + Bucket: bucket, + Name: dobj.ObjectName, + VersionID: versionID, + DeleteMarker: dobj.DeleteMarker, + }, + Host: "Internal: [Replication]", + EventName: eventName, + }) + } else { + sendEvent(eventArgs{ + BucketName: bucket, + Object: dobjInfo, + Host: "Internal: [Replication]", + EventName: eventName, + }) } - - sendEvent(eventArgs{ - BucketName: bucket, - Object: objInfo, - Host: "Internal: [Replication]", - EventName: eventName, - }) } func getCopyObjMetadata(oi ObjectInfo, dest replication.Destination) map[string]string { meta := make(map[string]string, len(oi.UserDefined)) for k, v := range oi.UserDefined { - if k == xhttp.AmzBucketReplicationStatus { + if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) { continue } - if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) { + + if equals(k, xhttp.AmzBucketReplicationStatus) { + continue + } + + // https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w + if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) { continue } + meta[k] = v } + if oi.ContentEncoding != "" { meta[xhttp.ContentEncoding] = oi.ContentEncoding } + if oi.ContentType != "" { meta[xhttp.ContentType] = oi.ContentType } - tag, err := tags.ParseObjectTags(oi.UserTags) - if err != nil { - return nil - } - if tag != nil { - meta[xhttp.AmzObjectTagging] = tag.String() + + if oi.UserTags != "" { + meta[xhttp.AmzObjectTagging] = oi.UserTags meta[xhttp.AmzTagDirective] = "REPLACE" } + sc := dest.StorageClass if sc == "" { sc = oi.StorageClass } - meta[xhttp.AmzStorageClass] = sc - if oi.UserTags != "" { - meta[xhttp.AmzObjectTagging] = oi.UserTags + if sc != "" { + meta[xhttp.AmzStorageClass] = sc } - meta[xhttp.MinIOSourceMTime] = oi.ModTime.Format(time.RFC3339Nano) meta[xhttp.MinIOSourceETag] = oi.ETag + meta[xhttp.MinIOSourceMTime] = oi.ModTime.Format(time.RFC3339Nano) meta[xhttp.AmzBucketReplicationStatus] = replication.Replica.String() return meta } @@ -341,17 +378,13 @@ func putReplicationOpts(ctx context.Context, dest replication.Destination, objIn } meta[k] = v } - tag, err := tags.ParseObjectTags(objInfo.UserTags) - if err != nil { - return - } + sc := dest.StorageClass if sc == "" { sc = objInfo.StorageClass } putOpts = miniogo.PutObjectOptions{ UserMetadata: meta, - UserTags: tag.ToMap(), ContentType: objInfo.ContentType, ContentEncoding: objInfo.ContentEncoding, StorageClass: sc, @@ -362,6 +395,12 @@ func putReplicationOpts(ctx context.Context, dest replication.Destination, objIn SourceETag: objInfo.ETag, }, } + if objInfo.UserTags != "" { + tag, _ := tags.ParseObjectTags(objInfo.UserTags) + if tag != nil { + putOpts.UserTags = tag.ToMap() + } + } if lang, ok := objInfo.UserDefined[xhttp.ContentLanguage]; ok { putOpts.ContentLanguage = lang } @@ -400,6 +439,16 @@ const ( replicateAll replicationAction = "all" ) +// matches k1 with all keys, returns 'true' if one of them matches +func equals(k1 string, keys ...string) bool { + for _, k2 := range keys { + if strings.ToLower(k1) == strings.ToLower(k2) { + return true + } + } + return false +} + // returns replicationAction by comparing metadata between source and target func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationAction { // needs full replication @@ -407,83 +456,153 @@ func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationActio oi1.VersionID != oi2.VersionID || oi1.Size != oi2.Size || oi1.DeleteMarker != oi2.IsDeleteMarker || - !oi1.ModTime.Equal(oi2.LastModified) { + oi1.ModTime.Unix() != oi2.LastModified.Unix() { return replicateAll } + if oi1.ContentType != oi2.ContentType { return replicateMetadata } + if oi1.ContentEncoding != "" { enc, ok := oi2.Metadata[xhttp.ContentEncoding] - if !ok || strings.Join(enc, "") != oi1.ContentEncoding { + if !ok { + enc, ok = oi2.Metadata[strings.ToLower(xhttp.ContentEncoding)] + if !ok { + return replicateMetadata + } + } + if strings.Join(enc, ",") != oi1.ContentEncoding { return replicateMetadata } } - // compare metadata on both maps to see if meta is identical - for k1, v1 := range oi1.UserDefined { - if v2, ok := oi2.UserMetadata[k1]; ok && v1 == v2 { - continue - } - if v2, ok := oi2.Metadata[k1]; ok && v1 == strings.Join(v2, "") { - continue - } + + t, _ := tags.ParseObjectTags(oi1.UserTags) + if !reflect.DeepEqual(oi2.UserTags, t.ToMap()) { return replicateMetadata } - for k1, v1 := range oi2.UserMetadata { - if v2, ok := oi1.UserDefined[k1]; !ok || v1 != v2 { - return replicateMetadata + + // Compare only necessary headers + compareKeys := []string{ + "Expires", + "Cache-Control", + "Content-Language", + "Content-Disposition", + "X-Amz-Object-Lock-Mode", + "X-Amz-Object-Lock-Retain-Until-Date", + "X-Amz-Object-Lock-Legal-Hold", + "X-Amz-Website-Redirect-Location", + "X-Amz-Meta-", + } + + // compare metadata on both maps to see if meta is identical + compareMeta1 := make(map[string]string) + for k, v := range oi1.UserDefined { + var found bool + for _, prefix := range compareKeys { + if !strings.HasPrefix(strings.ToLower(k), strings.ToLower(prefix)) { + continue + } + found = true + break + } + if found { + compareMeta1[strings.ToLower(k)] = v } } - for k1, v1slc := range oi2.Metadata { - v1 := strings.Join(v1slc, "") - if k1 == xhttp.ContentEncoding { //already compared - continue + + compareMeta2 := make(map[string]string) + for k, v := range oi2.Metadata { + var found bool + for _, prefix := range compareKeys { + if !strings.HasPrefix(strings.ToLower(k), strings.ToLower(prefix)) { + continue + } + found = true + break } - if v2, ok := oi1.UserDefined[k1]; !ok || v1 != v2 { - return replicateMetadata + if found { + compareMeta2[strings.ToLower(k)] = strings.Join(v, ",") } } - t, _ := tags.MapToObjectTags(oi2.UserTags) - if t.String() != oi1.UserTags { + + if !reflect.DeepEqual(compareMeta1, compareMeta2) { return replicateMetadata } + return replicateNone } // replicateObject replicates the specified version of the object to destination bucket // The source object is then updated to reflect the replication status. func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLayer) { + z, ok := objectAPI.(*erasureServerPools) + if !ok { + return + } + bucket := objInfo.Bucket object := objInfo.Name cfg, err := getReplicationConfig(ctx, bucket) if err != nil { logger.LogIf(ctx, err) + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) return } tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn) if tgt == nil { logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, cfg.RoleArn)) + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) return } gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{ VersionID: objInfo.VersionID, }) if err != nil { + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) logger.LogIf(ctx, err) return } + defer gr.Close() // hold read lock for entire transaction + objInfo = gr.ObjInfo size, err := objInfo.GetActualSize() if err != nil { logger.LogIf(ctx, err) - gr.Close() + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) return } dest := cfg.GetDestination() if dest.Bucket == "" { - gr.Close() + logger.LogIf(ctx, fmt.Errorf("Unable to replicate object %s(%s), bucket is empty", objInfo.Name, objInfo.VersionID)) + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) return } @@ -496,7 +615,6 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa if err == nil { rtype = getReplicationAction(objInfo, oi) if rtype == replicateNone { - gr.Close() // object with same VersionID already exists, replication kicked off by // PutObject might have completed. return @@ -504,19 +622,23 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa } replicationStatus := replication.Completed if rtype != replicateAll { - gr.Close() // replicate metadata for object tagging/copy with metadata replacement dstOpts := miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{SourceVersionID: objInfo.VersionID}} c := &miniogo.Core{Client: tgt.Client} - _, err = c.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts) - if err != nil { + if _, err = c.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts); err != nil { replicationStatus = replication.Failed + logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) } } else { target, err := globalBucketMetadataSys.GetBucketTarget(bucket, cfg.RoleArn) if err != nil { logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%s", bucket, cfg.RoleArn, err)) - gr.Close() + sendEvent(eventArgs{ + EventName: event.ObjectReplicationNotTracked, + BucketName: bucket, + Object: objInfo, + Host: "Internal: [Replication]", + }) return } @@ -535,11 +657,11 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa // r takes over closing gr. r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit) - _, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, putOpts) - if err != nil { + if _, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, putOpts); err != nil { replicationStatus = replication.Failed + logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) } - r.Close() + defer r.Close() } objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String() @@ -548,7 +670,6 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa } // FIXME: add support for missing replication events - // - event.ObjectReplicationNotTracked // - event.ObjectReplicationMissedThreshold // - event.ObjectReplicationReplicatedAfterThreshold var eventName = event.ObjectReplicationComplete @@ -556,16 +677,19 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa eventName = event.ObjectReplicationFailed } - objInfo.metadataOnly = true // Perform only metadata updates. - objInfo, err = objectAPI.CopyObject(ctx, bucket, object, bucket, object, objInfo, ObjectOptions{ + // This lower level implementation is necessary to avoid write locks from CopyObject. + poolIdx, err := z.getPoolIdx(ctx, bucket, object, ObjectOptions{ VersionID: objInfo.VersionID, - }, ObjectOptions{ - VersionID: objInfo.VersionID, - }) + }, objInfo.Size) if err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s: %s", objInfo.VersionID, err)) + logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) + } else { + if err = z.serverPools[poolIdx].getHashedSet(object).updateObjectMeta(ctx, bucket, object, objInfo.UserDefined, ObjectOptions{ + VersionID: objInfo.VersionID, + }); err != nil { + logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) + } } - sendEvent(eventArgs{ EventName: eventName, BucketName: bucket, @@ -703,8 +827,11 @@ func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs return nil, false } } + // Make sure to match ETag when proxying. + if err = gopts.SetMatchETag(oi.ETag); err != nil { + return nil, false + } c := miniogo.Core{Client: tgt.Client} - obj, _, _, err := c.GetObject(ctx, bucket, object, gopts) if err != nil { return nil, false @@ -770,6 +897,7 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts Objec if err != nil { return nil, oi, false, err } + tags, _ := tags.MapToObjectTags(objInfo.UserTags) oi = ObjectInfo{ Bucket: bucket, @@ -784,12 +912,17 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts Objec Expires: objInfo.Expires, StorageClass: objInfo.StorageClass, ReplicationStatus: replication.StatusType(objInfo.ReplicationStatus), - UserDefined: cloneMSS(objInfo.UserMetadata), UserTags: tags.String(), } - if ce, ok := oi.UserDefined[xhttp.ContentEncoding]; ok { + for k, v := range objInfo.Metadata { + oi.UserDefined[k] = v[0] + } + ce, ok := oi.UserDefined[xhttp.ContentEncoding] + if !ok { + ce, ok = oi.UserDefined[strings.ToLower(xhttp.ContentEncoding)] + } + if ok { oi.ContentEncoding = ce - delete(oi.UserDefined, xhttp.ContentEncoding) } return tgt, oi, true, nil } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 27091f2bb..5fddb43fc 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -162,14 +162,6 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true) if err != nil { - if isProxyable(ctx, bucket) && (errors.Is(err, errFileNotFound) || errors.Is(err, errFileVersionNotFound)) { - // proxy to replication target if active-active replication is in place. - reader, proxy := proxyGetToReplicationTarget(ctx, bucket, object, rs, h, opts) - if reader == nil || !proxy { - return nil, toObjectErr(err, bucket, object) - } - return reader, nil - } return nil, toObjectErr(err, bucket, object) } @@ -454,13 +446,6 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts, false) if err != nil { - // proxy HEAD to replication target if active-active replication configured on bucket - if isProxyable(ctx, bucket) && (errors.Is(err, errFileNotFound) || errors.Is(err, errFileVersionNotFound)) { - oi, proxy, err := proxyHeadToReplicationTarget(ctx, bucket, object, opts) - if proxy { - return oi, err - } - } return objInfo, toObjectErr(err, bucket, object) } diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 5145e98c4..32f919afa 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -259,13 +259,15 @@ func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object stri for i, pool := range z.serverPools { objInfo, err := pool.GetObjectInfo(ctx, bucket, object, opts) switch err.(type) { + case VersionNotFound: + // VersionId not found, versionId was specified case ObjectNotFound: // VersionId was not specified but found delete marker or no versions exist. case MethodNotAllowed: // VersionId was specified but found delete marker default: + // All other unhandled errors return right here. if err != nil { - // any other un-handled errors return right here. return -1, err } } @@ -531,6 +533,13 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object } return gr, nil } + if isProxyable(ctx, bucket) { + // proxy to replication target if active-active replication is in place. + reader, proxy := proxyGetToReplicationTarget(ctx, bucket, object, rs, h, opts) + if reader != nil && proxy { + return reader, nil + } + } if opts.VersionID != "" { return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID} } @@ -576,6 +585,13 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s return objInfo, nil } object = decodeDirObject(object) + // proxy HEAD to replication target if active-active replication configured on bucket + if isProxyable(ctx, bucket) { + oi, proxy, err := proxyHeadToReplicationTarget(ctx, bucket, object, opts) + if proxy { + return oi, err + } + } if opts.VersionID != "" { return objInfo, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID} } diff --git a/cmd/gateway/s3/gateway-s3-sse.go b/cmd/gateway/s3/gateway-s3-sse.go index 096610c66..b46f01db4 100644 --- a/cmd/gateway/s3/gateway-s3-sse.go +++ b/cmd/gateway/s3/gateway-s3-sse.go @@ -365,7 +365,7 @@ func (l *s3EncObjects) GetObjectInfo(ctx context.Context, bucket string, object // CopyObject copies an object from source bucket to a destination bucket. func (l *s3EncObjects) CopyObject(ctx context.Context, srcBucket string, srcObject string, dstBucket string, dstObject string, srcInfo minio.ObjectInfo, s, d minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) { - cpSrcDstSame := strings.EqualFold(path.Join(srcBucket, srcObject), path.Join(dstBucket, dstObject)) + cpSrcDstSame := path.Join(srcBucket, srcObject) == path.Join(dstBucket, dstObject) if cpSrcDstSame { var gwMeta gwMetaV1 if s.ServerSideEncryption != nil && d.ServerSideEncryption != nil && diff --git a/cmd/handler-utils.go b/cmd/handler-utils.go index a5695670d..cbee6d919 100644 --- a/cmd/handler-utils.go +++ b/cmd/handler-utils.go @@ -26,6 +26,7 @@ import ( "mime/multipart" "net" "net/http" + "net/textproto" "net/url" "regexp" "strings" @@ -74,6 +75,7 @@ var supportedHeaders = []string{ "content-language", "content-encoding", "content-disposition", + "x-amz-storage-class", xhttp.AmzStorageClass, xhttp.AmzObjectTagging, "expires", @@ -103,8 +105,6 @@ func isDirectiveReplace(value string) bool { // All values stored with a key starting with one of the following prefixes // must be extracted from the header. var userMetadataKeyPrefixes = []string{ - "X-Amz-Meta-", - "X-Minio-Meta-", "x-amz-meta-", "x-minio-meta-", } @@ -115,13 +115,13 @@ func extractMetadata(ctx context.Context, r *http.Request) (metadata map[string] header := r.Header metadata = make(map[string]string) // Extract all query values. - err = extractMetadataFromMap(ctx, query, metadata) + err = extractMetadataFromMime(ctx, textproto.MIMEHeader(query), metadata) if err != nil { return nil, err } // Extract all header values. - err = extractMetadataFromMap(ctx, header, metadata) + err = extractMetadataFromMime(ctx, textproto.MIMEHeader(header), metadata) if err != nil { return nil, err } @@ -133,7 +133,7 @@ func extractMetadata(ctx context.Context, r *http.Request) (metadata map[string] // https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w for k := range metadata { - if strings.EqualFold(k, xhttp.AmzMetaUnencryptedContentLength) || strings.EqualFold(k, xhttp.AmzMetaUnencryptedContentMD5) { + if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) { delete(metadata, k) } } @@ -161,25 +161,32 @@ func extractMetadata(ctx context.Context, r *http.Request) (metadata map[string] } // extractMetadata extracts metadata from map values. -func extractMetadataFromMap(ctx context.Context, v map[string][]string, m map[string]string) error { +func extractMetadataFromMime(ctx context.Context, v textproto.MIMEHeader, m map[string]string) error { if v == nil { logger.LogIf(ctx, errInvalidArgument) return errInvalidArgument } + + nv := make(textproto.MIMEHeader, len(v)) + for k, kv := range v { + // Canonicalize all headers, to remove any duplicates. + nv[http.CanonicalHeaderKey(k)] = kv + } + // Save all supported headers. for _, supportedHeader := range supportedHeaders { - if value, ok := v[http.CanonicalHeaderKey(supportedHeader)]; ok { - m[supportedHeader] = value[0] - } else if value, ok := v[supportedHeader]; ok { - m[supportedHeader] = value[0] + value, ok := nv[http.CanonicalHeaderKey(supportedHeader)] + if ok { + m[supportedHeader] = strings.Join(value, ",") } } + for key := range v { for _, prefix := range userMetadataKeyPrefixes { if !strings.HasPrefix(strings.ToLower(key), strings.ToLower(prefix)) { continue } - value, ok := v[key] + value, ok := nv[http.CanonicalHeaderKey(key)] if ok { m[key] = strings.Join(value, ",") break diff --git a/cmd/handler-utils_test.go b/cmd/handler-utils_test.go index 505e03000..73547f031 100644 --- a/cmd/handler-utils_test.go +++ b/cmd/handler-utils_test.go @@ -22,6 +22,7 @@ import ( "encoding/xml" "io/ioutil" "net/http" + "net/textproto" "os" "reflect" "strings" @@ -197,7 +198,7 @@ func TestExtractMetadataHeaders(t *testing.T) { // Validate if the extracting headers. for i, testCase := range testCases { metadata := make(map[string]string) - err := extractMetadataFromMap(context.Background(), testCase.header, metadata) + err := extractMetadataFromMime(context.Background(), textproto.MIMEHeader(testCase.header), metadata) if err != nil && !testCase.shouldFail { t.Fatalf("Test %d failed to extract metadata: %v", i+1, err) } diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index 86c091d11..a7b4931b8 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -244,6 +244,51 @@ type ObjectInfo struct { SuccessorModTime time.Time } +// Clone - Returns a cloned copy of current objectInfo +func (o ObjectInfo) Clone() (cinfo ObjectInfo) { + cinfo = ObjectInfo{ + Bucket: o.Bucket, + Name: o.Name, + ModTime: o.ModTime, + Size: o.Size, + IsDir: o.IsDir, + ETag: o.ETag, + InnerETag: o.InnerETag, + VersionID: o.VersionID, + IsLatest: o.IsLatest, + DeleteMarker: o.DeleteMarker, + TransitionStatus: o.TransitionStatus, + RestoreExpires: o.RestoreExpires, + RestoreOngoing: o.RestoreOngoing, + ContentType: o.ContentType, + ContentEncoding: o.ContentEncoding, + Expires: o.Expires, + CacheStatus: o.CacheStatus, + CacheLookupStatus: o.CacheLookupStatus, + StorageClass: o.StorageClass, + ReplicationStatus: o.ReplicationStatus, + UserTags: o.UserTags, + Parts: o.Parts, + Writer: o.Writer, + Reader: o.Reader, + PutObjReader: o.PutObjReader, + metadataOnly: o.metadataOnly, + versionOnly: o.versionOnly, + keyRotation: o.keyRotation, + backendType: o.backendType, + AccTime: o.AccTime, + Legacy: o.Legacy, + VersionPurgeStatus: o.VersionPurgeStatus, + NumVersions: o.NumVersions, + SuccessorModTime: o.SuccessorModTime, + } + cinfo.UserDefined = make(map[string]string, len(o.UserDefined)) + for k, v := range o.UserDefined { + cinfo.UserDefined[k] = v + } + return cinfo +} + // MultipartInfo captures metadata information about the uploadId // this data structure is used primarily for some internal purposes // for verifying upload type such as was the upload diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index fc8a8a4a2..a83e32942 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -1284,7 +1284,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime) encodedSuccessResponse := encodeResponse(response) if replicate, sync := mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate { - scheduleReplication(ctx, objInfo, objectAPI, sync) + scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync) } setPutObjHeaders(w, objInfo, false) @@ -1598,7 +1598,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } } if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate { - scheduleReplication(ctx, objInfo, objectAPI, sync) + scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync) } setPutObjHeaders(w, objInfo, false) @@ -2677,7 +2677,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite setPutObjHeaders(w, objInfo, false) if replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate { - scheduleReplication(ctx, objInfo, objectAPI, sync) + scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync) } // Write success response. @@ -2930,10 +2930,11 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r return } if replicate { - scheduleReplication(ctx, objInfo, objectAPI, sync) + scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync) } writeSuccessResponseHeadersOnly(w) - // Notify object event. + + // Notify object event. sendEvent(eventArgs{ EventName: event.ObjectCreatedPutLegalHold, BucketName: bucket, @@ -3102,7 +3103,7 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r return } if replicate { - scheduleReplication(ctx, objInfo, objectAPI, sync) + scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync) } writeSuccessNoContent(w) @@ -3285,7 +3286,7 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h } if replicate { - scheduleReplication(ctx, objInfo, objAPI, sync) + scheduleReplication(ctx, objInfo.Clone(), objAPI, sync) } if objInfo.VersionID != "" { @@ -3340,6 +3341,7 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } + oi, err := objAPI.GetObjectInfo(ctx, bucket, object, opts) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) @@ -3351,14 +3353,14 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r opts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } - if _, err = objAPI.DeleteObjectTags(ctx, bucket, object, opts); err != nil { + oi, err = objAPI.DeleteObjectTags(ctx, bucket, object, opts) + if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } - oi.UserTags = "" if replicate { - scheduleReplication(ctx, oi, objAPI, sync) + scheduleReplication(ctx, oi.Clone(), objAPI, sync) } if oi.VersionID != "" { diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index adee040ca..f980c4205 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -1298,7 +1298,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { } } if mustReplicate { - scheduleReplication(ctx, objInfo, objectAPI, sync) + scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync) } // Notify object created event. diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index 0026d7c3c..e5ad9e155 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -363,10 +363,10 @@ func (j xlMetaV2DeleteMarker) ToFileInfo(volume, path string) (FileInfo, error) Deleted: true, } for k, v := range j.MetaSys { - if strings.EqualFold(k, xhttp.AmzBucketReplicationStatus) { + switch { + case equals(k, xhttp.AmzBucketReplicationStatus): fi.DeleteMarkerReplicationStatus = string(v) - } - if strings.EqualFold(k, VersionPurgeStatusKey) { + case equals(k, VersionPurgeStatusKey): fi.VersionPurgeStatus = VersionPurgeStatusType(string(v)) } } @@ -408,20 +408,19 @@ func (j xlMetaV2Object) ToFileInfo(volume, path string) (FileInfo, error) { fi.Metadata = make(map[string]string, len(j.MetaUser)+len(j.MetaSys)) for k, v := range j.MetaUser { // https://github.com/google/security-research/security/advisories/GHSA-76wf-9vgp-pj7w - if strings.EqualFold(k, xhttp.AmzMetaUnencryptedContentLength) || strings.EqualFold(k, xhttp.AmzMetaUnencryptedContentMD5) { + if equals(k, xhttp.AmzMetaUnencryptedContentLength, xhttp.AmzMetaUnencryptedContentMD5) { continue } fi.Metadata[k] = v } for k, v := range j.MetaSys { - if strings.EqualFold(strings.ToLower(k), ReservedMetadataPrefixLower+"transition-status") { + switch { + case equals(k, ReservedMetadataPrefixLower+"transition-status"): fi.TransitionStatus = string(v) - } - if strings.EqualFold(k, VersionPurgeStatusKey) { + case equals(k, VersionPurgeStatusKey): fi.VersionPurgeStatus = VersionPurgeStatusType(string(v)) - } - if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) { + case strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower): fi.Metadata[k] = string(v) } } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 299d43cec..063391364 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -386,7 +386,7 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac oi: oi, bitRotScan: healOpts.Bitrot, }) - item.healReplication(ctx, objAPI, oi, &sizeS) + item.healReplication(ctx, objAPI, oi.Clone(), &sizeS) } } sizeS.totalSize = totalSize