Implement ListMultipartUploads, ListObjectParts for GCS gateway (#6377)

ListMultipartUploads implementation is meant for docker-registry
use-case only. It lists only the first upload with a prefix matching
the object being uploaded.
master
Krishnan Parthasarathi 6 years ago committed by kannappanr
parent 2c46214291
commit 1126410e62
  1. 172
      cmd/gateway/gcs/gateway-gcs.go

@ -20,11 +20,15 @@ import (
"context" "context"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"math" "math"
"net/http" "net/http"
"path"
"strconv"
"os" "os"
"regexp" "regexp"
"strings" "strings"
@ -936,14 +940,84 @@ func (l *gcsGateway) NewMultipartUpload(ctx context.Context, bucket string, key
return uploadID, nil return uploadID, nil
} }
// ListMultipartUploads - lists all multipart uploads. // ListMultipartUploads - lists the (first) multipart upload for an object
// matched _exactly_ by the prefix
func (l *gcsGateway) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (minio.ListMultipartsInfo, error) { func (l *gcsGateway) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (minio.ListMultipartsInfo, error) {
// List objects under <bucket>/gcsMinioMultipartPathV1
it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{
Prefix: gcsMinioMultipartPathV1,
})
var uploads []minio.MultipartInfo
for {
attrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
logger.LogIf(ctx, err)
return minio.ListMultipartsInfo{
KeyMarker: keyMarker,
UploadIDMarker: uploadIDMarker,
MaxUploads: maxUploads,
Prefix: prefix,
Delimiter: delimiter,
}, gcsToObjectError(err)
}
// Skip entries other than gcs.json
if !strings.HasSuffix(attrs.Name, gcsMinioMultipartMeta) {
continue
}
// Extract multipart upload information from gcs.json
obj := l.client.Bucket(bucket).Object(attrs.Name)
objReader, rErr := obj.NewReader(ctx)
if rErr != nil {
logger.LogIf(ctx, rErr)
return minio.ListMultipartsInfo{}, rErr
}
defer objReader.Close()
var mpMeta gcsMultipartMetaV1
dec := json.NewDecoder(objReader)
decErr := dec.Decode(&mpMeta)
if decErr != nil {
logger.LogIf(ctx, decErr)
return minio.ListMultipartsInfo{}, decErr
}
if prefix == mpMeta.Object {
// Extract uploadId
// E.g minio.sys.tmp/multipart/v1/d063ad89-fdc4-4ea3-a99e-22dba98151f5/gcs.json
components := strings.SplitN(attrs.Name, "/", 5)
if len(components) != 5 {
compErr := errors.New("Invalid multipart upload format")
logger.LogIf(ctx, compErr)
return minio.ListMultipartsInfo{}, compErr
}
upload := minio.MultipartInfo{
Object: mpMeta.Object,
UploadID: components[3],
Initiated: attrs.Created,
}
uploads = []minio.MultipartInfo{upload}
break
}
}
return minio.ListMultipartsInfo{ return minio.ListMultipartsInfo{
KeyMarker: keyMarker, KeyMarker: keyMarker,
UploadIDMarker: uploadIDMarker, UploadIDMarker: uploadIDMarker,
MaxUploads: maxUploads, MaxUploads: maxUploads,
Prefix: prefix, Prefix: prefix,
Delimiter: delimiter, Delimiter: delimiter,
Uploads: uploads,
NextKeyMarker: "",
NextUploadIDMarker: "",
IsTruncated: false,
}, nil }, nil
} }
@ -987,9 +1061,91 @@ func (l *gcsGateway) PutObjectPart(ctx context.Context, bucket string, key strin
} }
// ListObjectParts returns all object parts for specified object in specified bucket // gcsGetPartInfo returns PartInfo of a given object part
func gcsGetPartInfo(ctx context.Context, attrs *storage.ObjectAttrs) (minio.PartInfo, error) {
components := strings.SplitN(attrs.Name, "/", 5)
if len(components) != 5 {
logger.LogIf(ctx, errors.New("Invalid multipart upload format"))
return minio.PartInfo{}, errors.New("Invalid multipart upload format")
}
partComps := strings.SplitN(components[4], ".", 2)
if len(partComps) != 2 {
logger.LogIf(ctx, errors.New("Invalid multipart part format"))
return minio.PartInfo{}, errors.New("Invalid multipart part format")
}
partNum, pErr := strconv.Atoi(partComps[0])
if pErr != nil {
logger.LogIf(ctx, pErr)
return minio.PartInfo{}, errors.New("Invalid part number")
}
return minio.PartInfo{
PartNumber: partNum,
LastModified: attrs.Updated,
Size: attrs.Size,
ETag: partComps[1],
}, nil
}
// ListObjectParts returns all object parts for specified object in specified bucket
func (l *gcsGateway) ListObjectParts(ctx context.Context, bucket string, key string, uploadID string, partNumberMarker int, maxParts int) (minio.ListPartsInfo, error) { func (l *gcsGateway) ListObjectParts(ctx context.Context, bucket string, key string, uploadID string, partNumberMarker int, maxParts int) (minio.ListPartsInfo, error) {
return minio.ListPartsInfo{}, l.checkUploadIDExists(ctx, bucket, key, uploadID) it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{
Prefix: path.Join(gcsMinioMultipartPathV1, uploadID),
})
var (
count int
partInfos []minio.PartInfo
)
isTruncated := true
for count < maxParts {
attrs, err := it.Next()
if err == iterator.Done {
isTruncated = false
break
}
if err != nil {
logger.LogIf(ctx, err)
return minio.ListPartsInfo{}, gcsToObjectError(err)
}
if strings.HasSuffix(attrs.Name, gcsMinioMultipartMeta) {
continue
}
partInfo, pErr := gcsGetPartInfo(ctx, attrs)
if pErr != nil {
logger.LogIf(ctx, pErr)
return minio.ListPartsInfo{}, pErr
}
if partInfo.PartNumber <= partNumberMarker {
continue
}
partInfos = append(partInfos, partInfo)
count++
}
nextPartNumberMarker := 0
if isTruncated {
nextPartNumberMarker = partInfos[maxParts-1].PartNumber
}
return minio.ListPartsInfo{
Bucket: bucket,
Object: key,
UploadID: uploadID,
PartNumberMarker: partNumberMarker,
NextPartNumberMarker: nextPartNumberMarker,
MaxParts: maxParts,
Parts: partInfos,
IsTruncated: isTruncated,
}, nil
} }
// Called by AbortMultipartUpload and CompleteMultipartUpload for cleaning up. // Called by AbortMultipartUpload and CompleteMultipartUpload for cleaning up.

Loading…
Cancel
Save