diff --git a/cmd/gateway/azure/gateway-azure.go b/cmd/gateway/azure/gateway-azure.go
index 2546e269e..622a192b1 100644
--- a/cmd/gateway/azure/gateway-azure.go
+++ b/cmd/gateway/azure/gateway-azure.go
@@ -26,6 +26,7 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"path"
 	"sort"
 	"strconv"
 	"strings"
@@ -52,6 +53,8 @@ const (
 	metadataObjectNameTemplate = minio.GatewayMinioSysTmp + "multipart/v1/%s.%x/azure.json"
 	azureBackend               = "azure"
 	azureMarkerPrefix          = "{minio}"
+	metadataPartNamePrefix     = minio.GatewayMinioSysTmp + "multipart/v1/%s.%x"
+	maxPartsCount              = 10000
 )
 
 func init() {
@@ -261,6 +264,26 @@ func s3MetaToAzureProperties(ctx context.Context, s3Metadata map[string]string)
 	return blobMeta, props, nil
 }
 
+const (
+	partMetaVersionV1 = "1"
+)
+
+// partMetadataV1 struct holds the part specific metadata for
+// multipart operations.
+type partMetadataV1 struct {
+	Version  string   `json:"version"`
+	Size     int64    `json:"Size"`
+	BlockIDs []string `json:"blockIDs"`
+	ETag     string   `json:"etag"`
+}
+
+// Returns the initialized part metadata struct
+func newPartMetaV1(uploadID string, partID int) (partMeta *partMetadataV1) {
+	p := &partMetadataV1{}
+	p.Version = partMetaVersionV1
+	return p
+}
+
 // azurePropertiesToS3Meta converts Azure metadata/properties to S3
 // metadata. It is the reverse of s3MetaToAzureProperties. Azure's
 // `.GetMetadata()` lower-cases all header keys, so this is taken into
@@ -405,37 +428,13 @@ func checkAzureUploadID(ctx context.Context, uploadID string) (err error) {
 	return nil
 }
 
-// Encode partID, subPartNumber, uploadID and md5Hex to blockID.
-func azureGetBlockID(partID, subPartNumber int, uploadID, md5Hex string) string {
-	return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%05d.%02d.%s.%s", partID, subPartNumber, uploadID, md5Hex)))
-}
-
-// Parse blockID into partID, subPartNumber and md5Hex.
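+// Multipart bookkeeping now lives in per-part JSON metadata files kept
+// under the upload's tmp prefix; block IDs are opaque base64-encoded
+// UUIDs recorded in those files rather than carriers of part state.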
-func azureParseBlockID(blockID string) (partID, subPartNumber int, uploadID, md5Hex string, err error) {
-	var blockIDBytes []byte
-	if blockIDBytes, err = base64.StdEncoding.DecodeString(blockID); err != nil {
-		return
-	}
-
-	tokens := strings.Split(string(blockIDBytes), ".")
-	if len(tokens) != 4 {
-		err = fmt.Errorf("invalid block id '%s'", string(blockIDBytes))
-		return
-	}
-
-	if partID, err = strconv.Atoi(tokens[0]); err != nil || partID <= 0 {
-		err = fmt.Errorf("invalid part number in block id '%s'", string(blockIDBytes))
-		return
-	}
-
-	if subPartNumber, err = strconv.Atoi(tokens[1]); err != nil || subPartNumber <= 0 {
-		err = fmt.Errorf("invalid sub-part number in block id '%s'", string(blockIDBytes))
+// parses partID from part metadata file name
+func parseAzurePart(metaPartFileName, prefix string) (partID int, err error) {
+	partStr := strings.TrimPrefix(metaPartFileName, prefix+"/")
+	if partID, err = strconv.Atoi(partStr); err != nil || partID <= 0 {
+		err = fmt.Errorf("invalid part number in part metadata file '%s'", partStr)
 		return
 	}
-
-	uploadID = tokens[2]
-	md5Hex = tokens[3]
 	return
 }
 
@@ -747,14 +746,7 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
 		return a.GetObjectInfo(ctx, bucket, object, opts)
 	}
 
-	uuid, err := getAzureUploadID()
-	if err != nil {
-		return objInfo, err
-	}
-	etag := data.MD5HexString()
-	if etag == "" {
-		etag = minio.GenETag()
-	}
+	blockIDs := make(map[string]string)
 	blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
 	subPartSize, subPartNumber := int64(azureBlockSize), 1
@@ -768,7 +760,8 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
 			subPartSize = remainingSize
 		}
 
-		id := azureGetBlockID(1, subPartNumber, uuid, etag)
+		id := base64.StdEncoding.EncodeToString([]byte(minio.MustGetUUID()))
+		blockIDs[id] = ""
 		if err = blob.PutBlockWithLength(id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil); err != nil {
 			return objInfo, azureToObjectError(err, bucket, object)
 		}
@@ -780,17 +773,9 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
 	if err != nil {
 		return objInfo, azureToObjectError(err, bucket, object)
 	}
-
-	getBlocks := func(partNumber int, etag string) (blocks []storage.Block, size int64, aerr error) {
+	getBlocks := func(blocksMap map[string]string) (blocks []storage.Block, size int64, aerr error) {
 		for _, part := range resp.UncommittedBlocks {
-			var partID int
-			var readUploadID string
-			var md5Hex string
-			if partID, _, readUploadID, md5Hex, aerr = azureParseBlockID(part.Name); aerr != nil {
-				return nil, 0, aerr
-			}
-
-			if partNumber == partID && uuid == readUploadID && etag == md5Hex {
+			if _, ok := blocksMap[part.Name]; ok {
 				blocks = append(blocks, storage.Block{
 					ID:     part.Name,
 					Status: storage.BlockStatusUncommitted,
@@ -808,7 +793,7 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
 	}
 
 	var blocks []storage.Block
-	blocks, _, err = getBlocks(1, etag)
+	blocks, _, err = getBlocks(blockIDs)
 	if err != nil {
 		logger.LogIf(ctx, err)
 		return objInfo, err
@@ -823,7 +808,7 @@ func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *
 	}
 
 	// Save md5sum for future processing on the object.
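+	// Azure does not round-trip S3 ETags, so the computed MD5 is kept in
+	// user metadata for later retrieval.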
- metadata["x-amz-meta-md5sum"] = hex.EncodeToString(data.MD5Current()) + metadata["x-amz-meta-md5sum"] = r.MD5CurrentHexString() objBlob.Metadata, objBlob.Properties, err = s3MetaToAzureProperties(ctx, metadata) if err != nil { return objInfo, azureToObjectError(err, bucket, object) @@ -896,6 +881,17 @@ func getAzureMetadataObjectName(objectName, uploadID string) string { return fmt.Sprintf(metadataObjectNameTemplate, uploadID, sha256.Sum256([]byte(objectName))) } +// gets the name of part metadata file for multipart upload operations +func getAzureMetadataPartName(objectName, uploadID string, partID int) string { + partMetaPrefix := getAzureMetadataPartPrefix(uploadID, objectName) + return path.Join(partMetaPrefix, fmt.Sprintf("%d", partID)) +} + +// gets the prefix of part metadata file +func getAzureMetadataPartPrefix(uploadID, objectName string) string { + return fmt.Sprintf(metadataPartNamePrefix, uploadID, sha256.Sum256([]byte(objectName))) +} + func (a *azureObjects) checkUploadIDExists(ctx context.Context, bucketName, objectName, uploadID string) (err error) { blob := a.client.GetContainerReference(bucketName).GetBlobReference( getAzureMetadataObjectName(objectName, uploadID)) @@ -948,11 +944,7 @@ func (a *azureObjects) PutObjectPart(ctx context.Context, bucket, object, upload return info, err } - etag := data.MD5HexString() - if etag == "" { - etag = minio.GenETag() - } - + partMetaV1 := newPartMetaV1(uploadID, partID) subPartSize, subPartNumber := int64(azureBlockSize), 1 for remainingSize := data.Size(); remainingSize >= 0; remainingSize -= subPartSize { // Allow to create zero sized part. @@ -964,7 +956,9 @@ func (a *azureObjects) PutObjectPart(ctx context.Context, bucket, object, upload subPartSize = remainingSize } - id := azureGetBlockID(partID, subPartNumber, uploadID, etag) + id := base64.StdEncoding.EncodeToString([]byte(minio.MustGetUUID())) + partMetaV1.BlockIDs = append(partMetaV1.BlockIDs, id) + blob := a.client.GetContainerReference(bucket).GetBlobReference(object) err = blob.PutBlockWithLength(id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil) if err != nil { @@ -973,8 +967,26 @@ func (a *azureObjects) PutObjectPart(ctx context.Context, bucket, object, upload subPartNumber++ } + partMetaV1.ETag = r.MD5CurrentHexString() + partMetaV1.Size = data.Size() + + // maintain per part md5sum in a temporary part metadata file until upload + // is finalized. + metadataObject := getAzureMetadataPartName(object, uploadID, partID) + var jsonData []byte + if jsonData, err = json.Marshal(partMetaV1); err != nil { + logger.LogIf(ctx, err) + return info, err + } + + blob := a.client.GetContainerReference(bucket).GetBlobReference(metadataObject) + err = blob.CreateBlockBlobFromReader(bytes.NewBuffer(jsonData), nil) + if err != nil { + return info, azureToObjectError(err, bucket, metadataObject) + } + info.PartNumber = partID - info.ETag = etag + info.ETag = partMetaV1.ETag info.LastModified = minio.UTCNow() info.Size = data.Size() return info, nil @@ -991,48 +1003,57 @@ func (a *azureObjects) ListObjectParts(ctx context.Context, bucket, object, uplo result.UploadID = uploadID result.MaxParts = maxParts - objBlob := a.client.GetContainerReference(bucket).GetBlobReference(object) - resp, err := objBlob.GetBlockList(storage.BlockListTypeUncommitted, nil) - azureErr, ok := err.(storage.AzureStorageServiceError) - if ok && azureErr.StatusCode == http.StatusNotFound { - // If no parts are uploaded yet then we return empty list. 
-		return result, nil
+	var parts []minio.PartInfo
+	var marker, delimiter string
+	maxKeys := maxPartsCount
+	if partNumberMarker == 0 {
+		maxKeys = maxParts
 	}
+	prefix := getAzureMetadataPartPrefix(uploadID, object)
+	container := a.client.GetContainerReference(bucket)
+	resp, err := container.ListBlobs(storage.ListBlobsParameters{
+		Prefix:     prefix,
+		Marker:     marker,
+		Delimiter:  delimiter,
+		MaxResults: uint(maxKeys),
+	})
 	if err != nil {
-		return result, azureToObjectError(err, bucket, object)
-	}
-	// Build a sorted list of parts and return the requested entries.
-	partsMap := make(map[int]minio.PartInfo)
-	for _, block := range resp.UncommittedBlocks {
-		var partNumber int
-		var parsedUploadID string
-		var md5Hex string
-		if partNumber, _, parsedUploadID, md5Hex, err = azureParseBlockID(block.Name); err != nil {
-			return result, azureToObjectError(fmt.Errorf("Unexpected error"), bucket, object)
+		return result, azureToObjectError(err, bucket, prefix)
+	}
+
+	for _, blob := range resp.Blobs {
+		if delimiter == "" && !strings.HasPrefix(blob.Name, minio.GatewayMinioSysTmp) {
+			// We filter out non minio.GatewayMinioSysTmp entries in the recursive listing.
+			continue
 		}
-		if parsedUploadID != uploadID {
+		// Skip the upload's own azure.json metadata file.
+		if strings.HasSuffix(blob.Name, "azure.json") {
 			continue
 		}
-		part, ok := partsMap[partNumber]
-		if !ok {
-			partsMap[partNumber] = minio.PartInfo{
-				PartNumber: partNumber,
-				Size:       block.Size,
-				ETag:       md5Hex,
-			}
+		if !isAzureMarker(marker) && blob.Name <= marker {
+			// If the application used ListObjectsV1 style marker then we
+			// skip all the entries till we reach the marker.
 			continue
 		}
-		if part.ETag != md5Hex {
-			// If two parts of same partNumber were uploaded with different contents
-			// return error as we won't be able to decide which the latest part is.
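+		// The part number is the trailing path component of the metadata
+		// file name.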
+		partNumber, err := parseAzurePart(blob.Name, prefix)
+		if err != nil {
 			return result, azureToObjectError(fmt.Errorf("Unexpected error"), bucket, object)
 		}
-		part.Size += block.Size
-		partsMap[partNumber] = part
-	}
-	var parts []minio.PartInfo
-	for _, part := range partsMap {
-		parts = append(parts, part)
+		var metadata partMetadataV1
+		var metadataReader io.Reader
+		blob := a.client.GetContainerReference(bucket).GetBlobReference(blob.Name)
+		if metadataReader, err = blob.Get(nil); err != nil {
+			return result, azureToObjectError(fmt.Errorf("Unexpected error"), bucket, object)
+		}
+		if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil {
+			logger.LogIf(ctx, err)
+			return result, azureToObjectError(err, bucket, object)
+		}
+		parts = append(parts, minio.PartInfo{
+			PartNumber: partNumber,
+			Size:       metadata.Size,
+			ETag:       metadata.ETag,
+		})
 	}
 	sort.Slice(parts, func(i int, j int) bool {
 		return parts[i].PartNumber < parts[j].PartNumber
@@ -1071,6 +1092,22 @@ func (a *azureObjects) AbortMultipartUpload(ctx context.Context, bucket, object,
 	if err = a.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
 		return err
 	}
+	var partNumberMarker int
+	for {
+		lpi, err := a.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxPartsCount, minio.ObjectOptions{})
+		if err != nil {
+			break
+		}
+		for _, part := range lpi.Parts {
+			pblob := a.client.GetContainerReference(bucket).GetBlobReference(
+				getAzureMetadataPartName(object, uploadID, part.PartNumber))
+			pblob.Delete(nil)
+		}
+		partNumberMarker = lpi.NextPartNumberMarker
+		if !lpi.IsTruncated {
+			break
+		}
+	}
 
 	blob := a.client.GetContainerReference(bucket).GetBlobReference(
 		getAzureMetadataObjectName(object, uploadID))
@@ -1100,66 +1137,33 @@ func (a *azureObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
 		return objInfo, azureToObjectError(err, bucket, metadataObject)
 	}
 
-	defer func() {
-		blob := a.client.GetContainerReference(bucket).GetBlobReference(metadataObject)
-		derr := blob.Delete(nil)
-		logger.GetReqInfo(ctx).AppendTags("uploadID", uploadID)
-		logger.LogIf(ctx, derr)
-	}()
-	objBlob := a.client.GetContainerReference(bucket).GetBlobReference(object)
-	resp, err := objBlob.GetBlockList(storage.BlockListTypeUncommitted, nil)
-	if err != nil {
-		return objInfo, azureToObjectError(err, bucket, object)
-	}
-
-	getBlocks := func(partNumber int, etag string) (blocks []storage.Block, size int64, err error) {
-		for _, part := range resp.UncommittedBlocks {
-			var partID int
-			var readUploadID string
-			var md5Hex string
-			if partID, _, readUploadID, md5Hex, err = azureParseBlockID(part.Name); err != nil {
-				return nil, 0, err
-			}
-
-			if partNumber == partID && uploadID == readUploadID && etag == md5Hex {
-				blocks = append(blocks, storage.Block{
-					ID:     part.Name,
-					Status: storage.BlockStatusUncommitted,
-				})
-
-				size += part.Size
-			}
-		}
-
-		if len(blocks) == 0 {
-			return nil, 0, minio.InvalidPart{}
-		}
-
-		return blocks, size, nil
-	}
 
 	var allBlocks []storage.Block
-	partSizes := make([]int64, len(uploadedParts))
 	for i, part := range uploadedParts {
-		var blocks []storage.Block
-		var size int64
-		blocks, size, err = getBlocks(part.PartNumber, part.ETag)
-		if err != nil {
-			logger.LogIf(ctx, err)
-			return objInfo, err
+		var partMetadataReader io.Reader
+		var partMetadata partMetadataV1
+		partMetadataObject := getAzureMetadataPartName(object, uploadID, part.PartNumber)
+		pblob := a.client.GetContainerReference(bucket).GetBlobReference(partMetadataObject)
+		if partMetadataReader, err = pblob.Get(nil); err != nil {
+			return objInfo, azureToObjectError(err, bucket, partMetadataObject)
 		}
-		allBlocks = append(allBlocks, blocks...)
-		partSizes[i] = size
-	}
+		if err = json.NewDecoder(partMetadataReader).Decode(&partMetadata); err != nil {
+			logger.LogIf(ctx, err)
+			return objInfo, azureToObjectError(err, bucket, partMetadataObject)
+		}
 
-	// Error out if parts except last part sizing < 5MiB.
-	for i, size := range partSizes[:len(partSizes)-1] {
-		if size < azureS3MinPartSize {
+		if partMetadata.ETag != part.ETag {
+			return objInfo, minio.InvalidPart{}
+		}
+		for _, blockID := range partMetadata.BlockIDs {
+			allBlocks = append(allBlocks, storage.Block{ID: blockID, Status: storage.BlockStatusUncommitted})
+		}
+		if i < (len(uploadedParts)-1) && partMetadata.Size < azureS3MinPartSize {
 			return objInfo, minio.PartTooSmall{
 				PartNumber: uploadedParts[i].PartNumber,
-				PartSize:   size,
+				PartSize:   partMetadata.Size,
 				PartETag:   uploadedParts[i].ETag,
 			}
 		}
 	}
@@ -1183,6 +1187,28 @@ func (a *azureObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
 			return objInfo, azureToObjectError(err, bucket, object)
 		}
 	}
+	var partNumberMarker int
+	for {
+		lpi, err := a.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxPartsCount, minio.ObjectOptions{})
+		if err != nil {
+			break
+		}
+		for _, part := range lpi.Parts {
+			pblob := a.client.GetContainerReference(bucket).GetBlobReference(
+				getAzureMetadataPartName(object, uploadID, part.PartNumber))
+			pblob.Delete(nil)
+		}
+		partNumberMarker = lpi.NextPartNumberMarker
+		if !lpi.IsTruncated {
+			break
+		}
+	}
+
+	blob = a.client.GetContainerReference(bucket).GetBlobReference(metadataObject)
+	derr := blob.Delete(nil)
+	logger.GetReqInfo(ctx).AppendTags("uploadID", uploadID)
+	logger.LogIf(ctx, derr)
+
 	return a.GetObjectInfo(ctx, bucket, object, minio.ObjectOptions{})
 }
diff --git a/cmd/gateway/azure/gateway-azure_test.go b/cmd/gateway/azure/gateway-azure_test.go
index dcae19a7a..e7c028345 100644
--- a/cmd/gateway/azure/gateway-azure_test.go
+++ b/cmd/gateway/azure/gateway-azure_test.go
@@ -188,72 +188,6 @@ func TestAzureToObjectError(t *testing.T) {
 	}
 }
 
-// Test azureGetBlockID().
-func TestAzureGetBlockID(t *testing.T) {
-	testCases := []struct {
-		partID        int
-		subPartNumber int
-		uploadID      string
-		md5           string
-		blockID       string
-	}{
-		{1, 7, "f328c35cad938137", "d41d8cd98f00b204e9800998ecf8427e", "MDAwMDEuMDcuZjMyOGMzNWNhZDkzODEzNy5kNDFkOGNkOThmMDBiMjA0ZTk4MDA5OThlY2Y4NDI3ZQ=="},
-		{2, 19, "abcdc35cad938137", "a7fb6b7b36ee4ed66b5546fac4690273", "MDAwMDIuMTkuYWJjZGMzNWNhZDkzODEzNy5hN2ZiNmI3YjM2ZWU0ZWQ2NmI1NTQ2ZmFjNDY5MDI3Mw=="},
-	}
-	for _, test := range testCases {
-		blockID := azureGetBlockID(test.partID, test.subPartNumber, test.uploadID, test.md5)
-		if blockID != test.blockID {
-			t.Fatalf("%s is not equal to %s", blockID, test.blockID)
-		}
-	}
-}
-
-// Test azureParseBlockID().
-func TestAzureParseBlockID(t *testing.T) {
-	testCases := []struct {
-		blockID       string
-		partID        int
-		subPartNumber int
-		uploadID      string
-		md5           string
-		success       bool
-	}{
-		// Invalid base64.
-		{"MDAwMDEuMDcuZjMyOGMzNWNhZDkzODEzNy5kNDFkOGNkOThmMDBiMjA0ZTk4MDA5OThlY2Y4NDI3ZQ=", 0, 0, "", "", false},
-		// Invalid number of tokens.
-		{"MDAwMDEuQUEuZjMyOGMzNWNhZDkzODEzNwo=", 0, 0, "", "", false},
-		// Invalid encoded part ID.
-		{"MDAwMGEuMDcuZjMyOGMzNWNhZDkzODEzNy5kNDFkOGNkOThmMDBiMjA0ZTk4MDA5OThlY2Y4NDI3ZQo=", 0, 0, "", "", false},
-		// Invalid sub part ID.
- {"MDAwMDEuQUEuZjMyOGMzNWNhZDkzODEzNy5kNDFkOGNkOThmMDBiMjA0ZTk4MDA5OThlY2Y4NDI3ZQo=", 0, 0, "", "", false}, - {"MDAwMDEuMDcuZjMyOGMzNWNhZDkzODEzNy5kNDFkOGNkOThmMDBiMjA0ZTk4MDA5OThlY2Y4NDI3ZQ==", 1, 7, "f328c35cad938137", "d41d8cd98f00b204e9800998ecf8427e", true}, - {"MDAwMDIuMTkuYWJjZGMzNWNhZDkzODEzNy5hN2ZiNmI3YjM2ZWU0ZWQ2NmI1NTQ2ZmFjNDY5MDI3Mw==", 2, 19, "abcdc35cad938137", "a7fb6b7b36ee4ed66b5546fac4690273", true}, - } - for i, test := range testCases { - partID, subPartNumber, uploadID, md5, err := azureParseBlockID(test.blockID) - if err != nil && test.success { - t.Errorf("Test %d: Expected success but failed %s", i+1, err) - } - if err == nil && !test.success { - t.Errorf("Test %d: Expected to fail but succeeeded insteadl", i+1) - } - if err == nil { - if partID != test.partID { - t.Errorf("Test %d: %d not equal to %d", i+1, partID, test.partID) - } - if subPartNumber != test.subPartNumber { - t.Errorf("Test %d: %d not equal to %d", i+1, subPartNumber, test.subPartNumber) - } - if uploadID != test.uploadID { - t.Errorf("Test %d: %s not equal to %s", i+1, uploadID, test.uploadID) - } - if md5 != test.md5 { - t.Errorf("Test %d: %s not equal to %s", i+1, md5, test.md5) - } - } - } -} - func TestAnonErrToObjectErr(t *testing.T) { testCases := []struct { name string