From cdb0e6ffed344ee15da5a80fa6f9d722fbb14e77 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 3 Jul 2020 19:27:13 -0700 Subject: [PATCH] support proper values for listMultipartUploads/listParts (#9970) object KMS is configured with auto-encryption, there were issues when using docker registry - this has been left unnoticed for a while. This PR fixes an issue with compatibility. Additionally also fix the continuation-token implementation infinite loop issue which was missed as part of #9939 Also fix the heal token to be generated as a client facing value instead of what is remembered by the server, this allows for the server to be stateless regarding the token's behavior. --- cmd/admin-handlers.go | 26 +++++----- cmd/admin-heal-ops.go | 17 ++++-- cmd/bucket-listobjects-handlers.go | 49 +++++++++++------- cmd/erasure-multipart.go | 83 ++++++++++++++++++++++++++---- cmd/object-handlers.go | 19 ++++--- 5 files changed, 143 insertions(+), 51 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 559d1990a..a5e9dcf46 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -652,19 +652,15 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) { return } - if globalIsDistErasure { - // Analyze the heal token and route the request accordingly - _, nodeIndex, parsed := parseRequestToken(hip.clientToken) - if parsed { - if proxyRequestByNodeIndex(ctx, w, r, nodeIndex) { - return - } - } else { - apiErr := errorCodes.ToAPIErr(ErrHealInvalidClientToken) - writeErrorResponseJSON(ctx, w, apiErr, r.URL) - return - } + // Analyze the heal token and route the request accordingly + token, success := proxyRequestByToken(ctx, w, r, hip.clientToken) + if success { + return } + hip.clientToken = token + // if request was not successful, try this server locally if token + // is not found the call will fail anyways. if token is empty + // try this server to generate a new token. type healResp struct { respBytes []byte @@ -736,8 +732,12 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) { if hip.clientToken == "" && !hip.forceStart && !hip.forceStop { nh, exists := globalAllHealState.getHealSequence(healPath) if exists && !nh.hasEnded() && len(nh.currentStatus.Items) > 0 { + clientToken := nh.clientToken + if globalIsDistErasure { + clientToken = fmt.Sprintf("%s@%d", nh.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints)) + } b, err := json.Marshal(madmin.HealStartSuccess{ - ClientToken: nh.clientToken, + ClientToken: clientToken, ClientAddress: nh.clientAddress, StartTime: nh.startTime, }) diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 9b234967e..16d86de05 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -155,8 +155,13 @@ func (ahs *allHealState) stopHealSequence(path string) ([]byte, APIError) { StartTime: UTCNow(), } } else { + clientToken := he.clientToken + if globalIsDistErasure { + clientToken = fmt.Sprintf("%s@%d", he.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints)) + } + hsp = madmin.HealStopSuccess{ - ClientToken: he.clientToken, + ClientToken: clientToken, ClientAddress: he.clientAddress, StartTime: he.startTime, } @@ -232,8 +237,13 @@ func (ahs *allHealState) LaunchNewHealSequence(h *healSequence) ( // Launch top-level background heal go-routine go h.healSequenceStart() + clientToken := h.clientToken + if globalIsDistErasure { + clientToken = fmt.Sprintf("%s@%d", h.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints)) + } + b, err := json.Marshal(madmin.HealStartSuccess{ - ClientToken: h.clientToken, + ClientToken: clientToken, ClientAddress: h.clientAddress, StartTime: h.startTime, }) @@ -371,9 +381,6 @@ func newHealSequence(ctx context.Context, bucket, objPrefix, clientAddr string, ctx, cancel := context.WithCancel(logger.SetReqInfo(ctx, reqInfo)) clientToken := mustGetUUID() - if globalIsDistErasure { - clientToken = fmt.Sprintf("%s@%d", clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints)) - } return &healSequence{ respCh: make(chan healResult), diff --git a/cmd/bucket-listobjects-handlers.go b/cmd/bucket-listobjects-handlers.go index 22c6e56d7..2c50898b4 100644 --- a/cmd/bucket-listobjects-handlers.go +++ b/cmd/bucket-listobjects-handlers.go @@ -161,12 +161,10 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *htt } // Analyze continuation token and route the request accordingly - subToken, nodeIndex, parsed := parseRequestToken(token) - if parsed { - if proxyRequestByNodeIndex(ctx, w, r, nodeIndex) { - return - } - token = subToken + var success bool + token, success = proxyRequestByToken(ctx, w, r, token) + if success { + return } listObjectsV2 := objectAPI.ListObjectsV2 @@ -192,7 +190,10 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *htt } // The next continuation token has id@node_index format to optimize paginated listing - nextContinuationToken := fmt.Sprintf("%s@%d", listObjectsV2Info.NextContinuationToken, getLocalNodeIndex()) + nextContinuationToken := listObjectsV2Info.NextContinuationToken + if nextContinuationToken != "" && listObjectsV2Info.IsTruncated { + nextContinuationToken = fmt.Sprintf("%s@%d", listObjectsV2Info.NextContinuationToken, getLocalNodeIndex()) + } response := generateListObjectsV2Response(bucket, prefix, token, nextContinuationToken, startAfter, delimiter, encodingType, fetchOwner, listObjectsV2Info.IsTruncated, @@ -246,12 +247,10 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http } // Analyze continuation token and route the request accordingly - subToken, nodeIndex, parsed := parseRequestToken(token) - if parsed { - if proxyRequestByNodeIndex(ctx, w, r, nodeIndex) { - return - } - token = subToken + var success bool + token, success = proxyRequestByToken(ctx, w, r, token) + if success { + return } listObjectsV2 := objectAPI.ListObjectsV2 @@ -277,7 +276,10 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http } // The next continuation token has id@node_index format to optimize paginated listing - nextContinuationToken := fmt.Sprintf("%s@%d", listObjectsV2Info.NextContinuationToken, getLocalNodeIndex()) + nextContinuationToken := listObjectsV2Info.NextContinuationToken + if nextContinuationToken != "" && listObjectsV2Info.IsTruncated { + nextContinuationToken = fmt.Sprintf("%s@%d", listObjectsV2Info.NextContinuationToken, getLocalNodeIndex()) + } response := generateListObjectsV2Response(bucket, prefix, token, nextContinuationToken, startAfter, delimiter, encodingType, fetchOwner, listObjectsV2Info.IsTruncated, @@ -299,17 +301,28 @@ func getLocalNodeIndex() int { return -1 } -func parseRequestToken(token string) (subToken string, nodeIndex int, success bool) { +func parseRequestToken(token string) (subToken string, nodeIndex int) { + if token == "" { + return token, -1 + } i := strings.Index(token, "@") if i < 0 { - return "", -1, false + return token, -1 } nodeIndex, err := strconv.Atoi(token[i+1:]) if err != nil { - return "", -1, false + return token, -1 } subToken = token[:i] - return subToken, nodeIndex, true + return subToken, nodeIndex +} + +func proxyRequestByToken(ctx context.Context, w http.ResponseWriter, r *http.Request, token string) (string, bool) { + subToken, nodeIndex := parseRequestToken(token) + if nodeIndex > 0 { + return subToken, proxyRequestByNodeIndex(ctx, w, r, nodeIndex) + } + return subToken, false } func proxyRequestByNodeIndex(ctx context.Context, w http.ResponseWriter, r *http.Request, index int) (success bool) { diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 299af2d2b..461e875a9 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -25,6 +25,7 @@ import ( "strconv" "strings" + "github.com/minio/minio-go/v6/pkg/set" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/mimedb" @@ -75,37 +76,101 @@ func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir stri // not support prefix based listing, this is a deliberate attempt // towards simplification of multipart APIs. // The resulting ListMultipartsInfo structure is unmarshalled directly as XML. -func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, e error) { +func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) { result.MaxUploads = maxUploads result.KeyMarker = keyMarker result.Prefix = object result.Delimiter = delimiter + var uploadIDs []string for _, disk := range er.getLoadBalancedDisks() { if disk == nil { continue } - uploadIDs, err := disk.ListDir(minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1) + uploadIDs, err = disk.ListDir(minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1) if err != nil { + if err == errDiskNotFound { + continue + } if err == errFileNotFound { return result, nil } logger.LogIf(ctx, err) - return result, err + return result, toObjectErr(err, bucket, object) } - for i := range uploadIDs { - uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator) + break + } + + for i := range uploadIDs { + uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator) + } + + // S3 spec says uploadIDs should be sorted based on initiated time, we need + // to read the metadata entry. + var uploads []MultipartInfo + + populatedUploadIds := set.NewStringSet() + +retry: + for _, disk := range er.getLoadBalancedDisks() { + if disk == nil { + continue } - sort.Strings(uploadIDs) for _, uploadID := range uploadIDs { - if len(result.Uploads) == maxUploads { - break + if populatedUploadIds.Contains(uploadID) { + continue + } + fi, err := disk.ReadVersion(minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "") + if err != nil { + if err == errDiskNotFound || err == errFileNotFound { + goto retry + } + return result, toObjectErr(err, bucket, object) } - result.Uploads = append(result.Uploads, MultipartInfo{Object: object, UploadID: uploadID}) + populatedUploadIds.Add(uploadID) + uploads = append(uploads, MultipartInfo{ + Object: object, + UploadID: uploadID, + Initiated: fi.ModTime, + }) } break } + sort.Slice(uploads, func(i int, j int) bool { + return uploads[i].Initiated.Before(uploads[j].Initiated) + }) + + uploadIndex := 0 + if uploadIDMarker != "" { + for uploadIndex < len(uploads) { + if uploads[uploadIndex].UploadID != uploadIDMarker { + uploadIndex++ + continue + } + if uploads[uploadIndex].UploadID == uploadIDMarker { + uploadIndex++ + break + } + uploadIndex++ + } + } + for uploadIndex < len(uploads) { + result.Uploads = append(result.Uploads, uploads[uploadIndex]) + result.NextUploadIDMarker = uploads[uploadIndex].UploadID + uploadIndex++ + if len(result.Uploads) == maxUploads { + break + } + } + + result.IsTruncated = uploadIndex < len(uploads) + + if !result.IsTruncated { + result.NextKeyMarker = "" + result.NextUploadIDMarker = "" + } + return result, nil } diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index b9538300f..fd6f4e48f 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -2326,13 +2326,20 @@ func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *ht return } } - parts := make([]PartInfo, len(listPartsInfo.Parts)) - for i, p := range listPartsInfo.Parts { - part := p - part.ETag = tryDecryptETag(objectEncryptionKey, p.ETag, ssec) - parts[i] = part + for i := range listPartsInfo.Parts { + curp := listPartsInfo.Parts[i] + curp.ETag = tryDecryptETag(objectEncryptionKey, curp.ETag, ssec) + if !ssec { + var partSize uint64 + partSize, err = sio.DecryptedSize(uint64(curp.Size)) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + curp.Size = int64(partSize) + } + listPartsInfo.Parts[i] = curp } - listPartsInfo.Parts = parts } response := generateListPartsResponse(listPartsInfo, encodingType)