diff --git a/cmd/gateway-azure.go b/cmd/gateway-azure.go index c1366d8e4..8bc7c1cc0 100644 --- a/cmd/gateway-azure.go +++ b/cmd/gateway-azure.go @@ -17,13 +17,19 @@ package cmd import ( + "bytes" + "crypto/rand" + "crypto/sha256" "encoding/base64" + "encoding/hex" + "encoding/json" + "errors" "fmt" "io" "net/http" "net/url" + "strconv" "strings" - "sync" "time" "github.com/Azure/azure-sdk-for-go/storage" @@ -33,6 +39,7 @@ import ( const globalAzureAPIVersion = "2016-05-31" const azureBlockSize = 100 * humanize.MiByte +const metadataObjectNameTemplate = globalMinioSysTmp + "multipart/v1/%s.%x/azure.json" // Canonicalize the metadata headers, without this azure-sdk calculates // incorrect signature. This attempt to canonicalize is to convert @@ -87,38 +94,9 @@ func azureToS3ETag(etag string) string { return canonicalizeETag(etag) + "-1" } -// To store metadata during NewMultipartUpload which will be used after -// CompleteMultipartUpload to call SetBlobMetadata. -type azureMultipartMetaInfo struct { - meta map[string]map[string]string - *sync.Mutex -} - -// Return metadata map of the multipart object. -func (a *azureMultipartMetaInfo) get(key string) map[string]string { - a.Lock() - defer a.Unlock() - return a.meta[key] -} - -// Set metadata map for the multipart object. -func (a *azureMultipartMetaInfo) set(key string, value map[string]string) { - a.Lock() - defer a.Unlock() - a.meta[key] = value -} - -// Delete metadata map for the multipart object. -func (a *azureMultipartMetaInfo) del(key string) { - a.Lock() - defer a.Unlock() - delete(a.meta, key) -} - // azureObjects - Implements Object layer for Azure blob storage. type azureObjects struct { - client storage.BlobStorageClient // Azure sdk client - metaInfo azureMultipartMetaInfo + client storage.BlobStorageClient // Azure sdk client } // Convert azure errors to minio object layer errors. @@ -177,6 +155,68 @@ func azureToObjectError(err error, params ...string) error { return e } +// mustGetAzureUploadID - returns new upload ID which is hex encoded 8 bytes random value. +func mustGetAzureUploadID() string { + var id [8]byte + + n, err := io.ReadFull(rand.Reader, id[:]) + if err != nil { + panic(fmt.Errorf("unable to generate upload ID for azure. %s", err)) + } + if n != len(id) { + panic(fmt.Errorf("insufficient random data (expected: %d, read: %d)", len(id), n)) + } + + return fmt.Sprintf("%x", id[:]) +} + +// checkAzureUploadID - returns error in case of given string is upload ID. +func checkAzureUploadID(uploadID string) (err error) { + if len(uploadID) != 16 { + return traceError(MalformedUploadID{uploadID}) + } + + if _, err = hex.DecodeString(uploadID); err != nil { + return traceError(MalformedUploadID{uploadID}) + } + + return nil +} + +// Encode partID, subPartNumber, uploadID and md5Hex to blockID. +func azureGetBlockID(partID, subPartNumber int, uploadID, md5Hex string) string { + return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%05d.%02d.%s.%s", partID, subPartNumber, uploadID, md5Hex))) +} + +// Parse blockID into partID, subPartNumber and md5Hex. +func azureParseBlockID(blockID string) (partID, subPartNumber int, uploadID, md5Hex string, err error) { + var blockIDBytes []byte + if blockIDBytes, err = base64.StdEncoding.DecodeString(blockID); err != nil { + return + } + + tokens := strings.Split(string(blockIDBytes), ".") + if len(tokens) != 4 { + err = fmt.Errorf("invalid block id '%s'", string(blockIDBytes)) + return + } + + if partID, err = strconv.Atoi(tokens[0]); err != nil || partID <= 0 { + err = fmt.Errorf("invalid part number in block id '%s'", string(blockIDBytes)) + return + } + + if subPartNumber, err = strconv.Atoi(tokens[1]); err != nil || subPartNumber <= 0 { + err = fmt.Errorf("invalid sub-part number in block id '%s'", string(blockIDBytes)) + return + } + + uploadID = tokens[2] + md5Hex = tokens[3] + + return +} + // Inits azure blob storage client and returns AzureObjects. func newAzureLayer(host string) (GatewayLayer, error) { var err error @@ -200,10 +240,6 @@ func newAzureLayer(host string) (GatewayLayer, error) { return &azureObjects{ client: c.GetBlobService(), - metaInfo: azureMultipartMetaInfo{ - meta: make(map[string]map[string]string), - Mutex: &sync.Mutex{}, - }, }, nil } @@ -286,6 +322,9 @@ func (a *azureObjects) ListObjects(bucket, prefix, marker, delimiter string, max result.IsTruncated = resp.NextMarker != "" result.NextMarker = resp.NextMarker for _, object := range resp.Blobs { + if strings.HasPrefix(object.Name, globalMinioSysTmp) { + continue + } t, e := time.Parse(time.RFC1123, object.Properties.LastModified) if e != nil { continue @@ -300,6 +339,14 @@ func (a *azureObjects) ListObjects(bucket, prefix, marker, delimiter string, max ContentEncoding: object.Properties.ContentEncoding, }) } + + // Remove minio.sys.tmp prefix. + for i, prefix := range resp.BlobPrefixes { + if prefix == globalMinioSysTmp { + resp.BlobPrefixes = append(resp.BlobPrefixes[:i], resp.BlobPrefixes[i+1:]...) + break + } + } result.Prefixes = resp.BlobPrefixes return result, nil } @@ -321,6 +368,9 @@ func (a *azureObjects) ListObjectsV2(bucket, prefix, continuationToken string, f result.NextContinuationToken = resp.NextMarker } for _, object := range resp.Blobs { + if strings.HasPrefix(object.Name, globalMinioSysTmp) { + continue + } t, e := time.Parse(time.RFC1123, object.Properties.LastModified) if e != nil { continue @@ -335,6 +385,14 @@ func (a *azureObjects) ListObjectsV2(bucket, prefix, continuationToken string, f ContentEncoding: object.Properties.ContentEncoding, }) } + + // Remove minio.sys.tmp prefix. + for i, prefix := range resp.BlobPrefixes { + if prefix == globalMinioSysTmp { + resp.BlobPrefixes = append(resp.BlobPrefixes[:i], resp.BlobPrefixes[i+1:]...) + break + } + } result.Prefixes = resp.BlobPrefixes return result, nil } @@ -437,69 +495,70 @@ func (a *azureObjects) DeleteObject(bucket, object string) error { return nil } -// ListMultipartUploads - Incomplete implementation, for now just return the prefix if it is an incomplete upload. -// FIXME: Full ListMultipartUploads is not supported yet. It is supported just enough to help our client libs to -// support re-uploads. a.client.ListBlobs() can be made to return entries which include uncommitted blobs using -// which we need to filter out the committed blobs to get the list of uncommitted blobs. +// ListMultipartUploads - It's decided not to support List Multipart Uploads, hence returning empty result. func (a *azureObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) { - result.MaxUploads = maxUploads - result.Prefix = prefix - result.Delimiter = delimiter - meta := a.metaInfo.get(prefix) - if meta == nil { - // In case minio was restarted after NewMultipartUpload and before CompleteMultipartUpload we expect - // the client to do a fresh upload so that any metadata like content-type are sent again in the - // NewMultipartUpload. - return result, nil - } - result.Uploads = []uploadMetadata{{prefix, prefix, UTCNow(), "", nil}} + // It's decided not to support List Multipart Uploads, hence returning empty result. return result, nil } +type azureMultipartMetadata struct { + Name string `json:"name"` + Metadata map[string]string `json:"metadata"` +} + +func getAzureMetadataObjectName(objectName, uploadID string) string { + return fmt.Sprintf(metadataObjectNameTemplate, uploadID, sha256.Sum256([]byte(objectName))) +} + +func (a *azureObjects) checkUploadIDExists(bucketName, objectName, uploadID string) (err error) { + _, err = a.client.GetBlobMetadata(bucketName, getAzureMetadataObjectName(objectName, uploadID)) + err = azureToObjectError(traceError(err), bucketName, objectName) + oerr := ObjectNotFound{bucketName, objectName} + if errorCause(err) == oerr { + err = traceError(InvalidUploadID{}) + } + return err +} + // NewMultipartUpload - Use Azure equivalent CreateBlockBlob. func (a *azureObjects) NewMultipartUpload(bucket, object string, metadata map[string]string) (uploadID string, err error) { - // Azure doesn't return a unique upload ID and we use object name in place of it. Azure allows multiple uploads to - // co-exist as long as the user keeps the blocks uploaded (in block blobs) unique amongst concurrent upload attempts. - // Each concurrent client, keeps its own blockID list which it can commit. - uploadID = object if metadata == nil { - // Store an empty map as a placeholder else ListObjectParts/PutObjectPart will not work properly. metadata = make(map[string]string) - } else { - metadata = s3ToAzureHeaders(metadata) } - a.metaInfo.set(uploadID, metadata) - return uploadID, nil -} -// CopyObjectPart - Not implemented. -func (a *azureObjects) CopyObjectPart(srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int, startOffset int64, length int64) (info PartInfo, err error) { - return info, traceError(NotImplemented{}) -} - -// Encode partID, subPartNumber and md5Hex to blockID. -func azureGetBlockID(partID, subPartNumber int, md5Hex string) string { - return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%05d.%02d.%s", partID, subPartNumber, md5Hex))) -} + uploadID = mustGetAzureUploadID() + if err = a.checkUploadIDExists(bucket, object, uploadID); err == nil { + return "", traceError(errors.New("Upload ID name collision")) + } + metadataObject := getAzureMetadataObjectName(object, uploadID) + metadata = s3ToAzureHeaders(metadata) -// Parse blockID into partID, subPartNumber and md5Hex. -func azureParseBlockID(blockID string) (partID, subPartNumber int, md5Hex string, err error) { - var blockIDBytes []byte - if blockIDBytes, err = base64.StdEncoding.DecodeString(blockID); err != nil { - return + var jsonData []byte + if jsonData, err = json.Marshal(azureMultipartMetadata{Name: object, Metadata: metadata}); err != nil { + return "", traceError(err) } - if _, err = fmt.Sscanf(string(blockIDBytes), "%05d.%02d.%s", &partID, &subPartNumber, &md5Hex); err != nil { - err = fmt.Errorf("invalid block id '%s'", string(blockIDBytes)) + err = a.client.CreateBlockBlobFromReader(bucket, metadataObject, uint64(len(jsonData)), bytes.NewBuffer(jsonData), nil) + if err != nil { + return "", azureToObjectError(traceError(err), bucket, metadataObject) } - return + return uploadID, nil +} + +// CopyObjectPart - Not implemented. +func (a *azureObjects) CopyObjectPart(srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int, startOffset int64, length int64) (info PartInfo, err error) { + return info, traceError(NotImplemented{}) } // PutObjectPart - Use Azure equivalent PutBlockWithLength. func (a *azureObjects) PutObjectPart(bucket, object, uploadID string, partID int, data *HashReader) (info PartInfo, err error) { - if meta := a.metaInfo.get(uploadID); meta == nil { - return info, traceError(InvalidUploadID{}) + if err = a.checkUploadIDExists(bucket, object, uploadID); err != nil { + return info, err + } + + if err = checkAzureUploadID(uploadID); err != nil { + return info, err } etag := data.md5Sum @@ -519,7 +578,7 @@ func (a *azureObjects) PutObjectPart(bucket, object, uploadID string, partID int subPartSize = remainingSize } - id := azureGetBlockID(partID, subPartNumber, etag) + id := azureGetBlockID(partID, subPartNumber, uploadID, etag) err = a.client.PutBlockWithLength(bucket, object, id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil) if err != nil { return info, azureToObjectError(traceError(err), bucket, object) @@ -540,66 +599,56 @@ func (a *azureObjects) PutObjectPart(bucket, object, uploadID string, partID int // ListObjectParts - Use Azure equivalent GetBlockList. func (a *azureObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker int, maxParts int) (result ListPartsInfo, err error) { - result.Bucket = bucket - result.Object = object - result.UploadID = uploadID - result.MaxParts = maxParts - - if meta := a.metaInfo.get(uploadID); meta == nil { - return result, nil - } - resp, err := a.client.GetBlockList(bucket, object, storage.BlockListTypeUncommitted) - if err != nil { - return result, azureToObjectError(traceError(err), bucket, object) - } - tmpMaxParts := 0 - partCount := 0 // Used for figuring out IsTruncated. - nextPartNumberMarker := 0 - for _, part := range resp.UncommittedBlocks { - if tmpMaxParts == maxParts { - // Also takes care of the case if maxParts = 0 - break - } - partCount++ - partID, _, md5Hex, err := azureParseBlockID(part.Name) - if err != nil { - return result, err - } - if partID <= partNumberMarker { - continue - } - result.Parts = append(result.Parts, PartInfo{ - partID, - UTCNow(), - md5Hex, - part.Size, - }) - tmpMaxParts++ - nextPartNumberMarker = partID - } - if partCount < len(resp.UncommittedBlocks) { - result.IsTruncated = true - result.NextPartNumberMarker = nextPartNumberMarker + if err = a.checkUploadIDExists(bucket, object, uploadID); err != nil { + return result, err } + // It's decided not to support List Object Parts, hence returning empty result. return result, nil } // AbortMultipartUpload - Not Implemented. // There is no corresponding API in azure to abort an incomplete upload. The uncommmitted blocks // gets deleted after one week. -func (a *azureObjects) AbortMultipartUpload(bucket, object, uploadID string) error { - a.metaInfo.del(uploadID) - return nil +func (a *azureObjects) AbortMultipartUpload(bucket, object, uploadID string) (err error) { + if err = a.checkUploadIDExists(bucket, object, uploadID); err != nil { + return err + } + + return a.client.DeleteBlob(bucket, getAzureMetadataObjectName(object, uploadID), nil) } // CompleteMultipartUpload - Use Azure equivalent PutBlockList. func (a *azureObjects) CompleteMultipartUpload(bucket, object, uploadID string, uploadedParts []completePart) (objInfo ObjectInfo, err error) { - meta := a.metaInfo.get(uploadID) - if meta == nil { - return objInfo, traceError(InvalidUploadID{uploadID}) + metadataObject := getAzureMetadataObjectName(object, uploadID) + if err = a.checkUploadIDExists(bucket, object, uploadID); err != nil { + return objInfo, err + } + + if err = checkAzureUploadID(uploadID); err != nil { + return objInfo, err + } + + var metadataReader io.Reader + if metadataReader, err = a.client.GetBlob(bucket, metadataObject); err != nil { + return objInfo, azureToObjectError(traceError(err), bucket, metadataObject) + } + + var metadata azureMultipartMetadata + if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil { + return objInfo, azureToObjectError(traceError(err), bucket, metadataObject) } + meta := metadata.Metadata + defer func() { + if err != nil { + return + } + + derr := a.client.DeleteBlob(bucket, metadataObject, nil) + errorIf(derr, "unable to remove meta data object for upload ID %s", uploadID) + }() + resp, err := a.client.GetBlockList(bucket, object, storage.BlockListTypeUncommitted) if err != nil { return objInfo, azureToObjectError(traceError(err), bucket, object) @@ -608,12 +657,13 @@ func (a *azureObjects) CompleteMultipartUpload(bucket, object, uploadID string, getBlocks := func(partNumber int, etag string) (blocks []storage.Block, size int64, err error) { for _, part := range resp.UncommittedBlocks { var partID int + var readUploadID string var md5Hex string - if partID, _, md5Hex, err = azureParseBlockID(part.Name); err != nil { + if partID, _, readUploadID, md5Hex, err = azureParseBlockID(part.Name); err != nil { return nil, 0, err } - if partNumber == partID && etag == md5Hex { + if partNumber == partID && uploadID == readUploadID && etag == md5Hex { blocks = append(blocks, storage.Block{ ID: part.Name, Status: storage.BlockStatusUncommitted, @@ -676,7 +726,6 @@ func (a *azureObjects) CompleteMultipartUpload(bucket, object, uploadID string, return objInfo, azureToObjectError(traceError(err), bucket, object) } } - a.metaInfo.del(uploadID) return a.GetObjectInfo(bucket, object) } diff --git a/cmd/gateway-azure_test.go b/cmd/gateway-azure_test.go index b49b03628..c39093334 100644 --- a/cmd/gateway-azure_test.go +++ b/cmd/gateway-azure_test.go @@ -147,14 +147,15 @@ func TestAzureGetBlockID(t *testing.T) { testCases := []struct { partID int subPartNumber int + uploadID string md5 string blockID string }{ - {1, 7, "d41d8cd98f00b204e9800998ecf8427e", "MDAwMDEuMDcuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U="}, - {2, 19, "a7fb6b7b36ee4ed66b5546fac4690273", "MDAwMDIuMTkuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM="}, + {1, 7, "f328c35cad938137", "d41d8cd98f00b204e9800998ecf8427e", "MDAwMDEuMDcuZjMyOGMzNWNhZDkzODEzNy5kNDFkOGNkOThmMDBiMjA0ZTk4MDA5OThlY2Y4NDI3ZQ=="}, + {2, 19, "abcdc35cad938137", "a7fb6b7b36ee4ed66b5546fac4690273", "MDAwMDIuMTkuYWJjZGMzNWNhZDkzODEzNy5hN2ZiNmI3YjM2ZWU0ZWQ2NmI1NTQ2ZmFjNDY5MDI3Mw=="}, } for _, test := range testCases { - blockID := azureGetBlockID(test.partID, test.subPartNumber, test.md5) + blockID := azureGetBlockID(test.partID, test.subPartNumber, test.uploadID, test.md5) if blockID != test.blockID { t.Fatalf("%s is not equal to %s", blockID, test.blockID) } @@ -167,13 +168,14 @@ func TestAzureParseBlockID(t *testing.T) { blockID string partID int subPartNumber int + uploadID string md5 string }{ - {"MDAwMDEuMDcuZDQxZDhjZDk4ZjAwYjIwNGU5ODAwOTk4ZWNmODQyN2U=", 1, 7, "d41d8cd98f00b204e9800998ecf8427e"}, - {"MDAwMDIuMTkuYTdmYjZiN2IzNmVlNGVkNjZiNTU0NmZhYzQ2OTAyNzM=", 2, 19, "a7fb6b7b36ee4ed66b5546fac4690273"}, + {"MDAwMDEuMDcuZjMyOGMzNWNhZDkzODEzNy5kNDFkOGNkOThmMDBiMjA0ZTk4MDA5OThlY2Y4NDI3ZQ==", 1, 7, "f328c35cad938137", "d41d8cd98f00b204e9800998ecf8427e"}, + {"MDAwMDIuMTkuYWJjZGMzNWNhZDkzODEzNy5hN2ZiNmI3YjM2ZWU0ZWQ2NmI1NTQ2ZmFjNDY5MDI3Mw==", 2, 19, "abcdc35cad938137", "a7fb6b7b36ee4ed66b5546fac4690273"}, } for _, test := range testCases { - partID, subPartNumber, md5, err := azureParseBlockID(test.blockID) + partID, subPartNumber, uploadID, md5, err := azureParseBlockID(test.blockID) if err != nil { t.Fatal(err) } @@ -183,12 +185,15 @@ func TestAzureParseBlockID(t *testing.T) { if subPartNumber != test.subPartNumber { t.Fatalf("%d not equal to %d", subPartNumber, test.subPartNumber) } + if uploadID != test.uploadID { + t.Fatalf("%s not equal to %s", uploadID, test.uploadID) + } if md5 != test.md5 { t.Fatalf("%s not equal to %s", md5, test.md5) } } - _, _, _, err := azureParseBlockID("junk") + _, _, _, _, err := azureParseBlockID("junk") if err == nil { t.Fatal("Expected azureParseBlockID() to return error") } @@ -271,3 +276,29 @@ func TestAnonErrToObjectErr(t *testing.T) { }) } } + +func TestCheckAzureUploadID(t *testing.T) { + invalidUploadIDs := []string{ + "123456789abcdefg", + "hello world", + "0x1234567890", + "1234567890abcdef1234567890abcdef", + } + + for _, uploadID := range invalidUploadIDs { + if err := checkAzureUploadID(uploadID); err == nil { + t.Fatalf("%s: expected: , got: ", uploadID) + } + } + + validUploadIDs := []string{ + "1234567890abcdef", + "1122334455667788", + } + + for _, uploadID := range validUploadIDs { + if err := checkAzureUploadID(uploadID); err != nil { + t.Fatalf("%s: expected: , got: %s", uploadID, err) + } + } +} diff --git a/cmd/gateway-gcs.go b/cmd/gateway-gcs.go index 120afc7ac..03d229d8d 100644 --- a/cmd/gateway-gcs.go +++ b/cmd/gateway-gcs.go @@ -40,15 +40,10 @@ import ( ) const ( - // gcsMinioSysTmp is used for multiparts. We have "minio.sys.tmp" prefix so that - // listing on the GCS lists this entry in the end. Also in the gateway - // ListObjects we filter out this entry. - gcsMinioSysTmp = "minio.sys.tmp/" - // Path where multipart objects are saved. // If we change the backend format we will use a different url path like /multipart/v2 // but we will not migrate old data. - gcsMinioMultipartPathV1 = gcsMinioSysTmp + "multipart/v1" + gcsMinioMultipartPathV1 = globalMinioSysTmp + "multipart/v1" // Multipart meta file. gcsMinioMultipartMeta = "gcs.json" @@ -290,7 +285,7 @@ func newGCSGateway(projectID string) (GatewayLayer, error) { // Cleanup old files in minio.sys.tmp of the given bucket. func (l *gcsGateway) CleanupGCSMinioSysTmpBucket(bucket string) { - it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Prefix: gcsMinioSysTmp, Versions: false}) + it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Prefix: globalMinioSysTmp, Versions: false}) for { attrs, err := it.Next() if err != nil { @@ -408,7 +403,7 @@ func (l *gcsGateway) DeleteBucket(bucket string) error { if err != nil { return gcsToObjectError(traceError(err)) } - if objAttrs.Prefix == gcsMinioSysTmp { + if objAttrs.Prefix == globalMinioSysTmp { gcsMinioPathFound = true continue } @@ -420,7 +415,7 @@ func (l *gcsGateway) DeleteBucket(bucket string) error { } if gcsMinioPathFound { // Remove minio.sys.tmp before deleting the bucket. - itObject = l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Versions: false, Prefix: gcsMinioSysTmp}) + itObject = l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Versions: false, Prefix: globalMinioSysTmp}) for { objAttrs, err := itObject.Next() if err == iterator.Done { @@ -505,7 +500,7 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de // metadata folder, then just break // otherwise we've truncated the output attrs, _ := it.Next() - if attrs != nil && attrs.Prefix == gcsMinioSysTmp { + if attrs != nil && attrs.Prefix == globalMinioSysTmp { break } @@ -523,16 +518,16 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de nextMarker = toGCSPageToken(attrs.Name) - if attrs.Prefix == gcsMinioSysTmp { + if attrs.Prefix == globalMinioSysTmp { // We don't return our metadata prefix. continue } - if !strings.HasPrefix(prefix, gcsMinioSysTmp) { + if !strings.HasPrefix(prefix, globalMinioSysTmp) { // If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries. // But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/ // which will be helpful to observe the "directory structure" for debugging purposes. - if strings.HasPrefix(attrs.Prefix, gcsMinioSysTmp) || - strings.HasPrefix(attrs.Name, gcsMinioSysTmp) { + if strings.HasPrefix(attrs.Prefix, globalMinioSysTmp) || + strings.HasPrefix(attrs.Name, globalMinioSysTmp) { continue } } @@ -605,16 +600,16 @@ func (l *gcsGateway) ListObjectsV2(bucket, prefix, continuationToken string, fet return ListObjectsV2Info{}, gcsToObjectError(traceError(err), bucket, prefix) } - if attrs.Prefix == gcsMinioSysTmp { + if attrs.Prefix == globalMinioSysTmp { // We don't return our metadata prefix. continue } - if !strings.HasPrefix(prefix, gcsMinioSysTmp) { + if !strings.HasPrefix(prefix, globalMinioSysTmp) { // If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries. // But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/ // which will be helpful to observe the "directory structure" for debugging purposes. - if strings.HasPrefix(attrs.Prefix, gcsMinioSysTmp) || - strings.HasPrefix(attrs.Name, gcsMinioSysTmp) { + if strings.HasPrefix(attrs.Prefix, globalMinioSysTmp) || + strings.HasPrefix(attrs.Name, globalMinioSysTmp) { continue } } @@ -954,7 +949,7 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID // Returns name of the composed object. gcsMultipartComposeName := func(uploadID string, composeNumber int) string { - return fmt.Sprintf("%s/tmp/%s/composed-object-%05d", gcsMinioSysTmp, uploadID, composeNumber) + return fmt.Sprintf("%s/tmp/%s/composed-object-%05d", globalMinioSysTmp, uploadID, composeNumber) } composeCount := int(math.Ceil(float64(len(parts)) / float64(gcsMaxComponents))) diff --git a/cmd/globals.go b/cmd/globals.go index 54c001494..ba5dc4d52 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -53,6 +53,10 @@ const ( globalMinioModeGatewayAzure = "mode-gateway-azure" globalMinioModeGatewayS3 = "mode-gateway-s3" globalMinioModeGatewayGCS = "mode-gateway-gcs" + + // globalMinioSysTmp prefix is used in Azure/GCS gateway for save metadata sent by Initialize Multipart Upload API. + globalMinioSysTmp = "minio.sys.tmp/" + // Add new global values here. )