From 1126410e6218507e082a9113cf5f49dded9613f3 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Wed, 5 Sep 2018 01:41:33 +0530 Subject: [PATCH] Implement ListMultipartUploads, ListObjectParts for GCS gateway (#6377) ListMultipartUploads implementation is meant for docker-registry use-case only. It lists only the first upload with a prefix matching the object being uploaded. --- cmd/gateway/gcs/gateway-gcs.go | 172 +++++++++++++++++++++++++++++++-- 1 file changed, 164 insertions(+), 8 deletions(-) diff --git a/cmd/gateway/gcs/gateway-gcs.go b/cmd/gateway/gcs/gateway-gcs.go index cd1135a2b..4b838ee96 100644 --- a/cmd/gateway/gcs/gateway-gcs.go +++ b/cmd/gateway/gcs/gateway-gcs.go @@ -20,11 +20,15 @@ import ( "context" "encoding/base64" "encoding/json" + "errors" "fmt" "io" "io/ioutil" "math" "net/http" + "path" + "strconv" + "os" "regexp" "strings" @@ -936,14 +940,84 @@ func (l *gcsGateway) NewMultipartUpload(ctx context.Context, bucket string, key return uploadID, nil } -// ListMultipartUploads - lists all multipart uploads. +// ListMultipartUploads - lists the (first) multipart upload for an object +// matched _exactly_ by the prefix func (l *gcsGateway) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (minio.ListMultipartsInfo, error) { + // List objects under /gcsMinioMultipartPathV1 + it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{ + Prefix: gcsMinioMultipartPathV1, + }) + + var uploads []minio.MultipartInfo + + for { + attrs, err := it.Next() + if err == iterator.Done { + break + } + + if err != nil { + logger.LogIf(ctx, err) + return minio.ListMultipartsInfo{ + KeyMarker: keyMarker, + UploadIDMarker: uploadIDMarker, + MaxUploads: maxUploads, + Prefix: prefix, + Delimiter: delimiter, + }, gcsToObjectError(err) + } + + // Skip entries other than gcs.json + if !strings.HasSuffix(attrs.Name, gcsMinioMultipartMeta) { + continue + } + + // Extract multipart upload information from gcs.json + obj := l.client.Bucket(bucket).Object(attrs.Name) + objReader, rErr := obj.NewReader(ctx) + if rErr != nil { + logger.LogIf(ctx, rErr) + return minio.ListMultipartsInfo{}, rErr + } + defer objReader.Close() + + var mpMeta gcsMultipartMetaV1 + dec := json.NewDecoder(objReader) + decErr := dec.Decode(&mpMeta) + if decErr != nil { + logger.LogIf(ctx, decErr) + return minio.ListMultipartsInfo{}, decErr + } + + if prefix == mpMeta.Object { + // Extract uploadId + // E.g minio.sys.tmp/multipart/v1/d063ad89-fdc4-4ea3-a99e-22dba98151f5/gcs.json + components := strings.SplitN(attrs.Name, "/", 5) + if len(components) != 5 { + compErr := errors.New("Invalid multipart upload format") + logger.LogIf(ctx, compErr) + return minio.ListMultipartsInfo{}, compErr + } + upload := minio.MultipartInfo{ + Object: mpMeta.Object, + UploadID: components[3], + Initiated: attrs.Created, + } + uploads = []minio.MultipartInfo{upload} + break + } + } + return minio.ListMultipartsInfo{ - KeyMarker: keyMarker, - UploadIDMarker: uploadIDMarker, - MaxUploads: maxUploads, - Prefix: prefix, - Delimiter: delimiter, + KeyMarker: keyMarker, + UploadIDMarker: uploadIDMarker, + MaxUploads: maxUploads, + Prefix: prefix, + Delimiter: delimiter, + Uploads: uploads, + NextKeyMarker: "", + NextUploadIDMarker: "", + IsTruncated: false, }, nil } @@ -987,9 +1061,91 @@ func (l *gcsGateway) PutObjectPart(ctx context.Context, bucket string, key strin } -// ListObjectParts returns all object parts for specified object in specified bucket +// gcsGetPartInfo returns PartInfo of a given object part +func gcsGetPartInfo(ctx context.Context, attrs *storage.ObjectAttrs) (minio.PartInfo, error) { + components := strings.SplitN(attrs.Name, "/", 5) + if len(components) != 5 { + logger.LogIf(ctx, errors.New("Invalid multipart upload format")) + return minio.PartInfo{}, errors.New("Invalid multipart upload format") + } + + partComps := strings.SplitN(components[4], ".", 2) + if len(partComps) != 2 { + logger.LogIf(ctx, errors.New("Invalid multipart part format")) + return minio.PartInfo{}, errors.New("Invalid multipart part format") + } + + partNum, pErr := strconv.Atoi(partComps[0]) + if pErr != nil { + logger.LogIf(ctx, pErr) + return minio.PartInfo{}, errors.New("Invalid part number") + } + + return minio.PartInfo{ + PartNumber: partNum, + LastModified: attrs.Updated, + Size: attrs.Size, + ETag: partComps[1], + }, nil +} + +// ListObjectParts returns all object parts for specified object in specified bucket func (l *gcsGateway) ListObjectParts(ctx context.Context, bucket string, key string, uploadID string, partNumberMarker int, maxParts int) (minio.ListPartsInfo, error) { - return minio.ListPartsInfo{}, l.checkUploadIDExists(ctx, bucket, key, uploadID) + it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{ + Prefix: path.Join(gcsMinioMultipartPathV1, uploadID), + }) + + var ( + count int + partInfos []minio.PartInfo + ) + + isTruncated := true + for count < maxParts { + attrs, err := it.Next() + if err == iterator.Done { + isTruncated = false + break + } + + if err != nil { + logger.LogIf(ctx, err) + return minio.ListPartsInfo{}, gcsToObjectError(err) + } + + if strings.HasSuffix(attrs.Name, gcsMinioMultipartMeta) { + continue + } + + partInfo, pErr := gcsGetPartInfo(ctx, attrs) + if pErr != nil { + logger.LogIf(ctx, pErr) + return minio.ListPartsInfo{}, pErr + } + + if partInfo.PartNumber <= partNumberMarker { + continue + } + + partInfos = append(partInfos, partInfo) + count++ + } + + nextPartNumberMarker := 0 + if isTruncated { + nextPartNumberMarker = partInfos[maxParts-1].PartNumber + } + + return minio.ListPartsInfo{ + Bucket: bucket, + Object: key, + UploadID: uploadID, + PartNumberMarker: partNumberMarker, + NextPartNumberMarker: nextPartNumberMarker, + MaxParts: maxParts, + Parts: partInfos, + IsTruncated: isTruncated, + }, nil } // Called by AbortMultipartUpload and CompleteMultipartUpload for cleaning up.