From 5d859b217815be613ce3dde733665f81248bc707 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Tue, 2 Oct 2018 23:08:16 -0700
Subject: [PATCH] gateway/azure: allow putObject to support block based upload
 (#6552)

The current implementation buffers the entire object in memory and
crashes when a large upload is initiated through the Minio browser UI.
This PR instead uploads the stream in blocks and commits the block
list at the end, keeping the memory footprint low while uploading
large objects through the Minio browser UI.

This PR also adds ETag compatibility for single PUT operations.

Fixes #6542
Fixes #6550
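
For reference, the stage-then-commit pattern used by the new PutObject
path boils down to the following sketch against the same legacy
github.com/Azure/azure-sdk-for-go/storage client. The helper name
streamToBlockBlob, its block-ID scheme, and the fixed blockSize
argument are illustrative only and are not part of this patch:

    package example

    import (
    	"encoding/base64"
    	"fmt"
    	"io"

    	"github.com/Azure/azure-sdk-for-go/storage"
    )

    // Illustrative sketch, not from this patch: stage a stream as
    // fixed-size uncommitted blocks, then commit the block list, so
    // memory use is bounded by blockSize instead of the object size.
    func streamToBlockBlob(blob *storage.Blob, r io.Reader, size, blockSize int64) error {
    	if size == 0 {
    		// A zero-byte object has no blocks to stage; create it directly.
    		return blob.CreateBlockBlobFromReader(r, nil)
    	}
    	var blocks []storage.Block
    	for part := 1; size > 0; part++ {
    		n := blockSize
    		if size < n {
    			n = size
    		}
    		// Azure block IDs must be base64 strings of equal length.
    		id := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("block-%05d", part)))
    		if err := blob.PutBlockWithLength(id, uint64(n), io.LimitReader(r, n), nil); err != nil {
    			return err
    		}
    		blocks = append(blocks, storage.Block{ID: id, Status: storage.BlockStatusUncommitted})
    		size -= n
    	}
    	// Committing the list publishes the staged blocks as one blob.
    	return blob.PutBlockList(blocks, nil)
    }

A caller would obtain the *storage.Blob via
client.GetContainerReference(bucket).GetBlobReference(object), exactly
as the gateway code does below.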
---
 cmd/gateway/azure/gateway-azure.go | 124 ++++++++++++++++++++++++++++--
 1 file changed, 117 insertions(+), 7 deletions(-)

diff --git a/cmd/gateway/azure/gateway-azure.go b/cmd/gateway/azure/gateway-azure.go
index 7d2fe2cbc..700f795f7 100644
--- a/cmd/gateway/azure/gateway-azure.go
+++ b/cmd/gateway/azure/gateway-azure.go
@@ -685,10 +685,33 @@ func (a *azureObjects) GetObjectInfo(ctx context.Context, bucket, object string,
 		return objInfo, azureToObjectError(err, bucket, object)
 	}
 
+	// Populate a correct ETag when possible. This code primarily exists
+	// because AWS S3 documents that:
+	//
+	// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html
+	//
+	// Objects created by the PUT Object, POST Object, or Copy operation,
+	// or through the AWS Management Console, that are encrypted by SSE-S3
+	// or plaintext, have ETags that are an MD5 digest of their object data.
+	//
+	// Some applications depend on this behavior (see
+	// https://github.com/minio/minio/issues/6550), so we keep it consistent here.
+	etag := minio.ToS3ETag(blob.Properties.Etag)
+	switch {
+	case blob.Properties.ContentMD5 != "":
+		b, err := base64.StdEncoding.DecodeString(blob.Properties.ContentMD5)
+		if err == nil {
+			etag = hex.EncodeToString(b)
+		}
+	case blob.Metadata["md5sum"] != "":
+		etag = blob.Metadata["md5sum"]
+		delete(blob.Metadata, "md5sum")
+	}
+
 	return minio.ObjectInfo{
 		Bucket:      bucket,
 		UserDefined: azurePropertiesToS3Meta(blob.Metadata, blob.Properties),
-		ETag:        minio.ToS3ETag(blob.Properties.Etag),
+		ETag:        etag,
 		ModTime:     time.Time(blob.Properties.LastModified),
 		Name:        object,
 		Size:        blob.Properties.ContentLength,
@@ -700,15 +723,106 @@ func (a *azureObjects) GetObjectInfo(ctx context.Context, bucket, object string,
 
 // PutObject - Create a new blob with the incoming data,
 // uses Azure equivalent CreateBlockBlobFromReader.
 func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
+	if data.Size() < azureBlockSize/10 {
+		blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
+		blob.Metadata, blob.Properties, err = s3MetaToAzureProperties(ctx, metadata)
+		if err != nil {
+			return objInfo, azureToObjectError(err, bucket, object)
+		}
+		if err = blob.CreateBlockBlobFromReader(data, nil); err != nil {
+			return objInfo, azureToObjectError(err, bucket, object)
+		}
+		return a.GetObjectInfo(ctx, bucket, object, opts)
+	}
+
+	uuid, err := getAzureUploadID()
+	if err != nil {
+		return objInfo, err
+	}
+	etag := data.MD5HexString()
+	if etag == "" {
+		etag = minio.GenETag()
+	}
+
 	blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
-	blob.Metadata, blob.Properties, err = s3MetaToAzureProperties(ctx, metadata)
+	subPartSize, subPartNumber := int64(azureBlockSize), 1
+	for remainingSize := data.Size(); remainingSize >= 0; remainingSize -= subPartSize {
+		// Allow creating a zero sized part.
+		if remainingSize == 0 && subPartNumber > 1 {
+			break
+		}
+
+		if remainingSize < subPartSize {
+			subPartSize = remainingSize
+		}
+
+		id := azureGetBlockID(1, subPartNumber, uuid, etag)
+		if err = blob.PutBlockWithLength(id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil); err != nil {
+			return objInfo, azureToObjectError(err, bucket, object)
+		}
+		subPartNumber++
+	}
+
+	objBlob := a.client.GetContainerReference(bucket).GetBlobReference(object)
+	resp, err := objBlob.GetBlockList(storage.BlockListTypeUncommitted, nil)
 	if err != nil {
 		return objInfo, azureToObjectError(err, bucket, object)
 	}
-	err = blob.CreateBlockBlobFromReader(data, nil)
+
+	getBlocks := func(partNumber int, etag string) (blocks []storage.Block, size int64, aerr error) {
+		for _, part := range resp.UncommittedBlocks {
+			var partID int
+			var readUploadID string
+			var md5Hex string
+			if partID, _, readUploadID, md5Hex, aerr = azureParseBlockID(part.Name); aerr != nil {
+				return nil, 0, aerr
+			}
+
+			if partNumber == partID && uuid == readUploadID && etag == md5Hex {
+				blocks = append(blocks, storage.Block{
+					ID:     part.Name,
+					Status: storage.BlockStatusUncommitted,
+				})
+
+				size += part.Size
+			}
+		}
+
+		if len(blocks) == 0 {
+			return nil, 0, minio.InvalidPart{}
+		}
+
+		return blocks, size, nil
+	}
+
+	var blocks []storage.Block
+	blocks, _, err = getBlocks(1, etag)
 	if err != nil {
+		logger.LogIf(ctx, err)
+		return objInfo, err
+	}
+
+	if err = objBlob.PutBlockList(blocks, nil); err != nil {
+		return objInfo, azureToObjectError(err, bucket, object)
+	}
+
+	if len(metadata) == 0 {
+		metadata = map[string]string{}
+	}
+
+	// Save the md5sum for future processing of the object.
+	metadata["x-amz-meta-md5sum"] = hex.EncodeToString(data.MD5Current())
+	objBlob.Metadata, objBlob.Properties, err = s3MetaToAzureProperties(ctx, metadata)
+	if err != nil {
+		return objInfo, azureToObjectError(err, bucket, object)
+	}
+	if err = objBlob.SetProperties(nil); err != nil {
+		return objInfo, azureToObjectError(err, bucket, object)
+	}
+	if err = objBlob.SetMetadata(nil); err != nil {
 		return objInfo, azureToObjectError(err, bucket, object)
 	}
+
 	return a.GetObjectInfo(ctx, bucket, object, opts)
 }
@@ -974,10 +1088,6 @@ func (a *azureObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
 	}
 
 	defer func() {
-		if err != nil {
-			return
-		}
-
 		blob := a.client.GetContainerReference(bucket).GetBlobReference(metadataObject)
 		derr := blob.Delete(nil)
 		logger.GetReqInfo(ctx).AppendTags("uploadID", uploadID)
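
A final note on the ETag change in GetObjectInfo above: Azure reports
Content-MD5 as base64 while S3 ETags are hex-encoded MD5 digests, so
the conversion amounts to the sketch below. The helper name
s3ETagFromAzure is illustrative and not part of this patch:

    package example

    import (
    	"encoding/base64"
    	"encoding/hex"
    )

    // Illustrative sketch, not from this patch: convert Azure's
    // base64-encoded Content-MD5, when present, into the hex MD5
    // digest S3 clients expect; otherwise fall back to the Azure ETag.
    func s3ETagFromAzure(contentMD5, azureETag string) string {
    	if contentMD5 != "" {
    		if b, err := base64.StdEncoding.DecodeString(contentMD5); err == nil {
    			return hex.EncodeToString(b)
    		}
    	}
    	return azureETag
    }

For example, an empty object's Content-MD5 of "1B2M2Y8AsgTpgAmY7PhCfg=="
decodes to the familiar hex digest d41d8cd98f00b204e9800998ecf8427e.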