From b330c2c57ee3cfcdb83d39cff933cf9bd4f647f1 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 28 May 2020 12:36:20 -0700 Subject: [PATCH] Introduce simpler GetMultipartInfo call for performance (#9722) Advantages avoids 100's of stats which are needed for each upload operation in FS/NAS gateway mode when uploading a large multipart object, dramatically increases performance for multipart uploads by avoiding recursive calls. For other gateway's simplifies the approach since azure, gcs, hdfs gateway's don't capture any specific metadata during upload which needs handler validation for encryption/compression. Erasure coding was already optimized, additionally just avoids small allocations of large data structure. Fixes #7206 --- cmd/fs-v1-multipart.go | 44 ++++++++++++++++++ cmd/gateway-common.go | 1 - cmd/gateway-unsupported.go | 6 +++ cmd/gateway/azure/gateway-azure.go | 12 +++++ cmd/gateway/gcs/gateway-gcs.go | 8 ++++ cmd/gateway/hdfs/gateway-hdfs.go | 17 +++++++ cmd/gateway/s3/gateway-s3-sse.go | 15 ++++++ cmd/gateway/s3/gateway-s3.go | 30 +++++++++++- cmd/object-api-datatypes.go | 39 +++++++++------- cmd/object-api-interface.go | 1 + cmd/object-handlers.go | 66 ++++++++++++-------------- cmd/xl-sets.go | 5 ++ cmd/xl-v1-multipart.go | 45 ++++++++++++++++++ cmd/xl-zones.go | 74 ++++++++++++++++++++++++------ 14 files changed, 292 insertions(+), 71 deletions(-) diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index f04b2364a..467b9afb1 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -350,6 +350,50 @@ func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID }, nil } +// GetMultipartInfo returns multipart metadata uploaded during newMultipartUpload, used +// by callers to verify object states +// - encrypted +// - compressed +func (fs *FSObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) { + minfo := MultipartInfo{ + Bucket: bucket, + Object: object, + UploadID: uploadID, + } + + if err := checkListPartsArgs(ctx, bucket, object, fs); err != nil { + return minfo, toObjectErr(err) + } + + // Check if bucket exists + if _, err := fs.statBucketDir(ctx, bucket); err != nil { + return minfo, toObjectErr(err, bucket) + } + + uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID) + if _, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile)); err != nil { + if err == errFileNotFound || err == errFileAccessDenied { + return minfo, InvalidUploadID{UploadID: uploadID} + } + return minfo, toObjectErr(err, bucket, object) + } + + fsMetaBytes, err := ioutil.ReadFile(pathJoin(uploadIDDir, fs.metaJSONFile)) + if err != nil { + logger.LogIf(ctx, err) + return minfo, toObjectErr(err, bucket, object) + } + + var fsMeta fsMetaV1 + var json = jsoniter.ConfigCompatibleWithStandardLibrary + if err = json.Unmarshal(fsMetaBytes, &fsMeta); err != nil { + return minfo, toObjectErr(err, bucket, object) + } + + minfo.UserDefined = fsMeta.Meta + return minfo, nil +} + // ListObjectParts - lists all previously uploaded parts for a given // object and uploadID. Takes additional input of part-number-marker // to indicate where the listing should begin from. diff --git a/cmd/gateway-common.go b/cmd/gateway-common.go index f1624f437..0c31952f3 100644 --- a/cmd/gateway-common.go +++ b/cmd/gateway-common.go @@ -130,7 +130,6 @@ func FromMinioClientListPartsInfo(lopr minio.ListObjectPartsResult) ListPartsInf NextPartNumberMarker: lopr.NextPartNumberMarker, MaxParts: lopr.MaxParts, IsTruncated: lopr.IsTruncated, - EncodingType: lopr.EncodingType, Parts: fromMinioClientObjectParts(lopr.ObjectParts), } } diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 74c03d349..79270a3a6 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -83,6 +83,12 @@ func (a GatewayUnsupported) PutObjectPart(ctx context.Context, bucket string, ob return pi, NotImplemented{} } +// GetMultipartInfo returns metadata associated with the uploadId +func (a GatewayUnsupported) GetMultipartInfo(ctx context.Context, bucket string, object string, uploadID string, opts ObjectOptions) (MultipartInfo, error) { + logger.LogIf(ctx, NotImplemented{}) + return MultipartInfo{}, NotImplemented{} +} + // ListObjectParts returns all object parts for specified object in specified bucket func (a GatewayUnsupported) ListObjectParts(ctx context.Context, bucket string, object string, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (lpi ListPartsInfo, err error) { logger.LogIf(ctx, NotImplemented{}) diff --git a/cmd/gateway/azure/gateway-azure.go b/cmd/gateway/azure/gateway-azure.go index cfb2eeb10..819126435 100644 --- a/cmd/gateway/azure/gateway-azure.go +++ b/cmd/gateway/azure/gateway-azure.go @@ -1092,6 +1092,18 @@ func (a *azureObjects) PutObjectPart(ctx context.Context, bucket, object, upload return info, nil } +// GetMultipartInfo returns multipart info of the uploadId of the object +func (a *azureObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) { + if err = a.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { + return result, err + } + + result.Bucket = bucket + result.Object = object + result.UploadID = uploadID + return result, nil +} + // ListObjectParts - Use Azure equivalent `ContainerURL.ListBlobsHierarchySegment`. func (a *azureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (result minio.ListPartsInfo, err error) { if err = a.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { diff --git a/cmd/gateway/gcs/gateway-gcs.go b/cmd/gateway/gcs/gateway-gcs.go index 6d6be0576..66d51117a 100644 --- a/cmd/gateway/gcs/gateway-gcs.go +++ b/cmd/gateway/gcs/gateway-gcs.go @@ -1152,6 +1152,14 @@ func gcsGetPartInfo(ctx context.Context, attrs *storage.ObjectAttrs) (minio.Part }, nil } +// GetMultipartInfo returns multipart info of the uploadId of the object +func (l *gcsGateway) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) { + result.Bucket = bucket + result.Object = object + result.UploadID = uploadID + return result, nil +} + // ListObjectParts returns all object parts for specified object in specified bucket func (l *gcsGateway) ListObjectParts(ctx context.Context, bucket string, key string, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (minio.ListPartsInfo, error) { it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{ diff --git a/cmd/gateway/hdfs/gateway-hdfs.go b/cmd/gateway/hdfs/gateway-hdfs.go index b25c40249..65e76b8e7 100644 --- a/cmd/gateway/hdfs/gateway-hdfs.go +++ b/cmd/gateway/hdfs/gateway-hdfs.go @@ -634,6 +634,23 @@ func (n *hdfsObjects) checkUploadIDExists(ctx context.Context, bucket, object, u return nil } +// GetMultipartInfo returns multipart info of the uploadId of the object +func (n *hdfsObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) { + _, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)) + if err != nil { + return result, hdfsToObjectErr(ctx, err, bucket) + } + + if err = n.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { + return result, err + } + + result.Bucket = bucket + result.Object = object + result.UploadID = uploadID + return result, nil +} + func (n *hdfsObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (result minio.ListPartsInfo, err error) { _, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)) if err != nil { diff --git a/cmd/gateway/s3/gateway-s3-sse.go b/cmd/gateway/s3/gateway-s3-sse.go index d4471d23f..7cb172da1 100644 --- a/cmd/gateway/s3/gateway-s3-sse.go +++ b/cmd/gateway/s3/gateway-s3-sse.go @@ -528,6 +528,21 @@ func (l *s3EncObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, return l.PutObjectPart(ctx, destBucket, destObject, uploadID, partID, srcInfo.PutObjReader, dstOpts) } +// GetMultipartInfo returns multipart info of the uploadId of the object +func (l *s3EncObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) { + result.Bucket = bucket + result.Object = object + result.UploadID = uploadID + // We do not store parts uploaded so far in the dare.meta. Only CompleteMultipartUpload finalizes the parts under upload prefix.Otherwise, + // there could be situations of dare.meta getting corrupted by competing upload parts. + dm, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID)) + if err != nil { + return l.s3Objects.GetMultipartInfo(ctx, bucket, object, uploadID, opts) + } + result.UserDefined = dm.ToObjectInfo(bucket, object).UserDefined + return result, nil +} + // ListObjectParts returns all object parts for specified object in specified bucket func (l *s3EncObjects) ListObjectParts(ctx context.Context, bucket string, object string, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (lpi minio.ListPartsInfo, e error) { // We do not store parts uploaded so far in the dare.meta. Only CompleteMultipartUpload finalizes the parts under upload prefix.Otherwise, diff --git a/cmd/gateway/s3/gateway-s3.go b/cmd/gateway/s3/gateway-s3.go index 4b847aa6c..29e75fd08 100644 --- a/cmd/gateway/s3/gateway-s3.go +++ b/cmd/gateway/s3/gateway-s3.go @@ -612,14 +612,40 @@ func (l *s3Objects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, de return p, nil } +// GetMultipartInfo returns multipart info of the uploadId of the object +func (l *s3Objects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) { + result.Bucket = bucket + result.Object = object + result.UploadID = uploadID + return result, nil +} + // ListObjectParts returns all object parts for specified object in specified bucket func (l *s3Objects) ListObjectParts(ctx context.Context, bucket string, object string, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (lpi minio.ListPartsInfo, e error) { result, err := l.Client.ListObjectParts(bucket, object, uploadID, partNumberMarker, maxParts) if err != nil { - return lpi, minio.ErrorRespToObjectError(err, bucket, object) + return lpi, err } + lpi = minio.FromMinioClientListPartsInfo(result) + if lpi.IsTruncated && maxParts > len(lpi.Parts) { + partNumberMarker = lpi.NextPartNumberMarker + for { + result, err = l.Client.ListObjectParts(bucket, object, uploadID, partNumberMarker, maxParts) + if err != nil { + return lpi, err + } + + nlpi := minio.FromMinioClientListPartsInfo(result) - return minio.FromMinioClientListPartsInfo(result), nil + partNumberMarker = nlpi.NextPartNumberMarker + + lpi.Parts = append(lpi.Parts, nlpi.Parts...) + if !nlpi.IsTruncated { + break + } + } + } + return lpi, nil } // AbortMultipartUpload aborts a ongoing multipart upload diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index 27d1f2bc9..926d306ae 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -194,6 +194,7 @@ type ObjectInfo struct { PutObjReader *PutObjReader `json:"-"` metadataOnly bool + keyRotation bool // Date and time when the object was last accessed. AccTime time.Time @@ -202,6 +203,28 @@ type ObjectInfo struct { backendType BackendType } +// MultipartInfo captures metadata information about the uploadId +// this data structure is used primarily for some internal purposes +// for verifying upload type such as was the upload +// - encrypted +// - compressed +type MultipartInfo struct { + // Name of the bucket. + Bucket string + + // Name of the object. + Object string + + // Upload ID identifying the multipart upload whose parts are being listed. + UploadID string + + // Date and time at which the multipart upload was initiated. + Initiated time.Time + + // Any metadata set during InitMultipartUpload, including encryption headers. + UserDefined map[string]string +} + // ListPartsInfo - represents list of all parts. type ListPartsInfo struct { // Name of the bucket. @@ -235,8 +258,6 @@ type ListPartsInfo struct { // Any metadata set during InitMultipartUpload, including encryption headers. UserDefined map[string]string - - EncodingType string // Not supported yet. } // Lookup - returns if uploadID is valid @@ -362,20 +383,6 @@ type PartInfo struct { ActualSize int64 } -// MultipartInfo - represents metadata in progress multipart upload. -type MultipartInfo struct { - // Object name for which the multipart upload was initiated. - Object string - - // Unique identifier for this multipart upload. - UploadID string - - // Date and time at which the multipart upload was initiated. - Initiated time.Time - - StorageClass string // Not supported yet. -} - // CompletePart - represents the part that was completed, this is sent by the client // during CompleteMultipartUpload request. type CompletePart struct { diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 5b82e941f..65ee01253 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -92,6 +92,7 @@ type ObjectLayer interface { CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (info PartInfo, err error) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error) + GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (info MultipartInfo, err error) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) error CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index fb373897e..b20d9e07c 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -1008,12 +1008,13 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re // Since we are rotating the keys, make sure to update the metadata. srcInfo.metadataOnly = true + srcInfo.keyRotation = true } else { if isSourceEncrypted || isTargetEncrypted { // We are not only copying just metadata instead // we are creating a new object at this point, even // if source and destination are same objects. - if !srcInfo.metadataOnly { + if !srcInfo.keyRotation { srcInfo.metadataOnly = false } } @@ -1858,15 +1859,14 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt actualPartSize = length var reader io.Reader - var li ListPartsInfo - li, err = objectAPI.ListObjectParts(ctx, dstBucket, dstObject, uploadID, 0, 1, dstOpts) + mi, err := objectAPI.GetMultipartInfo(ctx, dstBucket, dstObject, uploadID, dstOpts) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } // Read compression metadata preserved in the init multipart for the decision. - _, isCompressed := li.UserDefined[ReservedMetadataPrefix+"compression"] + _, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"] // Compress only if the compression is enabled during initial multipart. if isCompressed { s2c := newS2CompressReader(gr) @@ -1883,7 +1883,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt return } - dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, li.UserDefined) + dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, mi.UserDefined) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return @@ -1892,14 +1892,14 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt rawReader := srcInfo.Reader pReader := NewPutObjReader(rawReader, nil, nil) - isEncrypted := crypto.IsEncrypted(li.UserDefined) + isEncrypted := crypto.IsEncrypted(mi.UserDefined) var objectEncryptionKey crypto.ObjectKey if objectAPI.IsEncryptionSupported() && !isCompressed && isEncrypted { - if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(li.UserDefined) { + if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL, guessIsBrowserReq(r)) return } - if crypto.S3.IsEncrypted(li.UserDefined) && crypto.SSEC.IsRequested(r.Header) { + if crypto.S3.IsEncrypted(mi.UserDefined) && crypto.SSEC.IsRequested(r.Header) { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL, guessIsBrowserReq(r)) return } @@ -1911,7 +1911,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt return } } - key, err = decryptObjectInfo(key, dstBucket, dstObject, li.UserDefined) + key, err = decryptObjectInfo(key, dstBucket, dstObject, mi.UserDefined) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return @@ -2094,15 +2094,15 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http return } } - var li ListPartsInfo - li, err = objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, 1, opts) + + mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, opts) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } // Read compression metadata preserved in the init multipart for the decision. - _, isCompressed := li.UserDefined[ReservedMetadataPrefix+"compression"] + _, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"] if objectAPI.IsCompressionSupported() && isCompressed { actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize, globalCLIContext.StrictS3Compat) @@ -2128,15 +2128,15 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http rawReader := hashReader pReader := NewPutObjReader(rawReader, nil, nil) - isEncrypted := crypto.IsEncrypted(li.UserDefined) + isEncrypted := crypto.IsEncrypted(mi.UserDefined) var objectEncryptionKey crypto.ObjectKey if objectAPI.IsEncryptionSupported() && !isCompressed && isEncrypted { - if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(li.UserDefined) { + if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL, guessIsBrowserReq(r)) return } - opts, err = putOpts(ctx, r, bucket, object, li.UserDefined) + opts, err = putOpts(ctx, r, bucket, object, mi.UserDefined) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return @@ -2152,7 +2152,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http } // Calculating object encryption key - key, err = decryptObjectInfo(key, bucket, object, li.UserDefined) + key, err = decryptObjectInfo(key, bucket, object, mi.UserDefined) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return @@ -2438,19 +2438,18 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite var opts ObjectOptions var isEncrypted, ssec bool if objectAPI.IsEncryptionSupported() { - var li ListPartsInfo - li, err = objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, 1, opts) + mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, opts) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } - if crypto.IsEncrypted(li.UserDefined) { + if crypto.IsEncrypted(mi.UserDefined) { var key []byte isEncrypted = true - ssec = crypto.SSEC.IsEncrypted(li.UserDefined) - if crypto.S3.IsEncrypted(li.UserDefined) { + ssec = crypto.SSEC.IsEncrypted(mi.UserDefined) + if crypto.S3.IsEncrypted(mi.UserDefined) { // Calculating object encryption key - objectEncryptionKey, err = decryptObjectInfo(key, bucket, object, li.UserDefined) + objectEncryptionKey, err = decryptObjectInfo(key, bucket, object, mi.UserDefined) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return @@ -2461,21 +2460,14 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite partsMap := make(map[string]PartInfo) if isEncrypted { - var partNumberMarker int - maxParts := 1000 - for { - listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) - return - } - for _, part := range listPartsInfo.Parts { - partsMap[strconv.Itoa(part.PartNumber)] = part - } - partNumberMarker = listPartsInfo.NextPartNumberMarker - if !listPartsInfo.IsTruncated { - break - } + maxParts := 10000 + listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, maxParts, opts) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + for _, part := range listPartsInfo.Parts { + partsMap[strconv.Itoa(part.PartNumber)] = part } } diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 131e6f00c..b0cd84d8d 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -1169,6 +1169,11 @@ func (s *xlSets) PutObjectPart(ctx context.Context, bucket, object, uploadID str return s.getHashedSet(object).PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) } +// GetMultipartInfo - return multipart metadata info uploaded at hashedSet. +func (s *xlSets) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (result MultipartInfo, err error) { + return s.getHashedSet(object).GetMultipartInfo(ctx, bucket, object, uploadID, opts) +} + // ListObjectParts - lists all uploaded parts to an object in hashedSet. func (s *xlSets) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) { return s.getHashedSet(object).ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 927059468..8cd8f173d 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -412,6 +412,51 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID }, nil } +// GetMultipartInfo returns multipart metadata uploaded during newMultipartUpload, used +// by callers to verify object states +// - encrypted +// - compressed +func (xl xlObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) { + result := MultipartInfo{ + Bucket: bucket, + Object: object, + UploadID: uploadID, + } + + if err := xl.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { + return result, toObjectErr(err, bucket, object, uploadID) + } + + uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID) + + storageDisks := xl.getDisks() + + // Read metadata associated with the object from all disks. + partsMetadata, errs := readAllXLMetadata(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath) + + // get Quorum for this object + _, writeQuorum, err := objectQuorumFromMeta(ctx, xl, partsMetadata, errs) + if err != nil { + return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) + } + + reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) + if reducedErr == errXLWriteQuorum { + return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath) + } + + _, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) + + // Pick one from the first valid metadata. + xlMeta, err := pickValidXLMeta(ctx, partsMetadata, modTime, writeQuorum) + if err != nil { + return result, err + } + + result.UserDefined = xlMeta.Meta + return result, nil +} + // ListObjectParts - lists all previously uploaded parts for a given // object and uploadID. Takes additional input of part-number-marker // to indicate where the listing should begin from. diff --git a/cmd/xl-zones.go b/cmd/xl-zones.go index db92313b7..3d8d7f2e0 100644 --- a/cmd/xl-zones.go +++ b/cmd/xl-zones.go @@ -1046,13 +1046,17 @@ func (z *xlZones) PutObjectPart(ctx context.Context, bucket, object, uploadID st return z.zones[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) } for _, zone := range z.zones { - result, err := zone.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList) - if err != nil { - return PartInfo{}, err - } - if result.Lookup(uploadID) { + _, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts) + if err == nil { return zone.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) } + switch err.(type) { + case InvalidUploadID: + // Look for information on the next zone + continue + } + // Any other unhandled errors such as quorum return. + return PartInfo{}, err } return PartInfo{}, InvalidUploadID{ @@ -1062,6 +1066,41 @@ func (z *xlZones) PutObjectPart(ctx context.Context, bucket, object, uploadID st } } +func (z *xlZones) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) { + if err := checkListPartsArgs(ctx, bucket, object, z); err != nil { + return MultipartInfo{}, err + } + + uploadIDLock := z.NewNSLock(ctx, bucket, pathJoin(object, uploadID)) + if err := uploadIDLock.GetRLock(globalOperationTimeout); err != nil { + return MultipartInfo{}, err + } + defer uploadIDLock.RUnlock() + + if z.SingleZone() { + return z.zones[0].GetMultipartInfo(ctx, bucket, object, uploadID, opts) + } + for _, zone := range z.zones { + mi, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts) + if err == nil { + return mi, nil + } + switch err.(type) { + case InvalidUploadID: + // upload id not found, continue to the next zone. + continue + } + // any other unhandled error return right here. + return MultipartInfo{}, err + } + return MultipartInfo{}, InvalidUploadID{ + Bucket: bucket, + Object: object, + UploadID: uploadID, + } + +} + // ListObjectParts - lists all uploaded parts to an object in hashedSet. func (z *xlZones) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (ListPartsInfo, error) { if err := checkListPartsArgs(ctx, bucket, object, z); err != nil { @@ -1078,13 +1117,15 @@ func (z *xlZones) ListObjectParts(ctx context.Context, bucket, object, uploadID return z.zones[0].ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) } for _, zone := range z.zones { - result, err := zone.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList) - if err != nil { - return ListPartsInfo{}, err - } - if result.Lookup(uploadID) { + _, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts) + if err == nil { return zone.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) } + switch err.(type) { + case InvalidUploadID: + continue + } + return ListPartsInfo{}, err } return ListPartsInfo{}, InvalidUploadID{ Bucket: bucket, @@ -1110,13 +1151,16 @@ func (z *xlZones) AbortMultipartUpload(ctx context.Context, bucket, object, uplo } for _, zone := range z.zones { - result, err := zone.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList) - if err != nil { - return err - } - if result.Lookup(uploadID) { + _, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, ObjectOptions{}) + if err == nil { return zone.AbortMultipartUpload(ctx, bucket, object, uploadID) } + switch err.(type) { + case InvalidUploadID: + // upload id not found move to next zone + continue + } + return err } return InvalidUploadID{ Bucket: bucket,