diff --git a/cmd/gateway/azure/gateway-azure.go b/cmd/gateway/azure/gateway-azure.go
index 7d2fe2cbc..700f795f7 100644
--- a/cmd/gateway/azure/gateway-azure.go
+++ b/cmd/gateway/azure/gateway-azure.go
@@ -685,10 +685,33 @@ func (a *azureObjects) GetObjectInfo(ctx context.Context, bucket, object string,
 		return objInfo, azureToObjectError(err, bucket, object)
 	}
 
+	// Populate correct ETags if possible; this code primarily exists
+	// because AWS S3 indicates that
+	//
+	// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html
+	//
+	// Objects created by the PUT Object, POST Object, or Copy operation,
+	// or through the AWS Management Console, and are encrypted by SSE-S3
+	// or plaintext, have ETags that are an MD5 digest of their object data.
+	//
+	// Some applications depend on this behavior, see https://github.com/minio/minio/issues/6550,
+	// so we handle it here and make this consistent.
+	etag := minio.ToS3ETag(blob.Properties.Etag)
+	switch {
+	case blob.Properties.ContentMD5 != "":
+		b, err := base64.StdEncoding.DecodeString(blob.Properties.ContentMD5)
+		if err == nil {
+			etag = hex.EncodeToString(b)
+		}
+	case blob.Metadata["md5sum"] != "":
+		etag = blob.Metadata["md5sum"]
+		delete(blob.Metadata, "md5sum")
+	}
+
 	return minio.ObjectInfo{
 		Bucket:      bucket,
 		UserDefined: azurePropertiesToS3Meta(blob.Metadata, blob.Properties),
-		ETag:        minio.ToS3ETag(blob.Properties.Etag),
+		ETag:        etag,
 		ModTime:     time.Time(blob.Properties.LastModified),
 		Name:        object,
 		Size:        blob.Properties.ContentLength,
@@ -700,15 +723,110 @@ func (a *azureObjects) GetObjectInfo(ctx context.Context, bucket, object string,
 // PutObject - Create a new blob with the incoming data,
 // uses Azure equivalent CreateBlockBlobFromReader.
 func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
+	if data.Size() < azureBlockSize/10 {
+		blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
+		blob.Metadata, blob.Properties, err = s3MetaToAzureProperties(ctx, metadata)
+		if err != nil {
+			return objInfo, azureToObjectError(err, bucket, object)
+		}
+		if err = blob.CreateBlockBlobFromReader(data, nil); err != nil {
+			return objInfo, azureToObjectError(err, bucket, object)
+		}
+		return a.GetObjectInfo(ctx, bucket, object, opts)
+	}
+
+	uuid, err := getAzureUploadID()
+	if err != nil {
+		return objInfo, err
+	}
+	etag := data.MD5HexString()
+	if etag == "" {
+		etag = minio.GenETag()
+	}
+
 	blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
-	blob.Metadata, blob.Properties, err = s3MetaToAzureProperties(ctx, metadata)
+	subPartSize, subPartNumber := int64(azureBlockSize), 1
+	for remainingSize := data.Size(); remainingSize >= 0; remainingSize -= subPartSize {
+		// Allow creating a zero-sized part.
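+		// (A zero-byte object makes exactly one pass with subPartNumber == 1 and
+		// uploads a single empty block; once all data has been consumed the
+		// next pass stops here, which also avoids a trailing empty block when
+		// the size is an exact multiple of azureBlockSize.)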
+		if remainingSize == 0 && subPartNumber > 1 {
+			break
+		}
+
+		if remainingSize < subPartSize {
+			subPartSize = remainingSize
+		}
+
+		id := azureGetBlockID(1, subPartNumber, uuid, etag)
+		if err = blob.PutBlockWithLength(id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil); err != nil {
+			return objInfo, azureToObjectError(err, bucket, object)
+		}
+		subPartNumber++
+	}
+
+	objBlob := a.client.GetContainerReference(bucket).GetBlobReference(object)
+	resp, err := objBlob.GetBlockList(storage.BlockListTypeUncommitted, nil)
 	if err != nil {
 		return objInfo, azureToObjectError(err, bucket, object)
 	}
-	err = blob.CreateBlockBlobFromReader(data, nil)
+
+	getBlocks := func(partNumber int, etag string) (blocks []storage.Block, size int64, aerr error) {
+		for _, part := range resp.UncommittedBlocks {
+			var partID int
+			var readUploadID string
+			var md5Hex string
+			if partID, _, readUploadID, md5Hex, aerr = azureParseBlockID(part.Name); aerr != nil {
+				return nil, 0, aerr
+			}
+
+			if partNumber == partID && uuid == readUploadID && etag == md5Hex {
+				blocks = append(blocks, storage.Block{
+					ID:     part.Name,
+					Status: storage.BlockStatusUncommitted,
+				})
+
+				size += part.Size
+			}
+		}
+
+		if len(blocks) == 0 {
+			return nil, 0, minio.InvalidPart{}
+		}
+
+		return blocks, size, nil
+	}
+
+	var blocks []storage.Block
+	blocks, _, err = getBlocks(1, etag)
 	if err != nil {
+		logger.LogIf(ctx, err)
+		return objInfo, err
+	}
+
+	if err = objBlob.PutBlockList(blocks, nil); err != nil {
+		return objInfo, azureToObjectError(err, bucket, object)
+	}
+
+	if len(metadata) == 0 {
+		metadata = map[string]string{}
+	}
+
+	// Save md5sum for future processing on the object.
+	metadata["x-amz-meta-md5sum"] = hex.EncodeToString(data.MD5Current())
+	objBlob.Metadata, objBlob.Properties, err = s3MetaToAzureProperties(ctx, metadata)
+	if err != nil {
+		return objInfo, azureToObjectError(err, bucket, object)
+	}
+	if err = objBlob.SetProperties(nil); err != nil {
+		return objInfo, azureToObjectError(err, bucket, object)
+	}
+	if err = objBlob.SetMetadata(nil); err != nil {
 		return objInfo, azureToObjectError(err, bucket, object)
 	}
+
 	return a.GetObjectInfo(ctx, bucket, object, opts)
 }
 
@@ -974,10 +1092,6 @@ func (a *azureObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
 	}
 
 	defer func() {
-		if err != nil {
-			return
-		}
-
 		blob := a.client.GetContainerReference(bucket).GetBlobReference(metadataObject)
 		derr := blob.Delete(nil)
 		logger.GetReqInfo(ctx).AppendTags("uploadID", uploadID)
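Reviewer note, not part of the patch: the ETag fallback in GetObjectInfo hinges on the two digest encodings involved. Azure reports Content-MD5 as a base64-encoded digest, while S3-style ETags carry the same MD5 digest hex-encoded. A minimal standalone sketch of that conversion, using only the standard library (the sample data and variable names are illustrative, not part of the patch):

package main

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/hex"
	"fmt"
)

func main() {
	data := []byte("hello world")
	sum := md5.Sum(data)

	// Azure blob properties carry the MD5 in Content-MD5 header form: base64.
	azureContentMD5 := base64.StdEncoding.EncodeToString(sum[:])

	// The patch decodes that base64 value and re-encodes it as hex, which is
	// the form S3 clients expect in the ETag.
	b, err := base64.StdEncoding.DecodeString(azureContentMD5)
	if err != nil {
		panic(err)
	}
	etag := hex.EncodeToString(b)

	fmt.Println(azureContentMD5) // XrY7u+Ae7tCTyyK7j1rNww==
	fmt.Println(etag)            // 5eb63bbbe01eeed093cb22bb8f5acdc3
}

The metadata fallback exists because a blob committed via the block-list path does not get a whole-object Content-MD5 from Azure, which is why PutObject persists the digest as md5sum metadata and GetObjectInfo falls back to it.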