gateway/azure: allow putObject to support block based upload (#6552)

Current implementation simply uses all the memory locally
and crashes when a large upload is initiated using Minio
browser UI.

This PR uploads stream in blocks and finally commits the blocks
allowing for low memory footprint while uploading large objects
through Minio browser UI.

This PR also adds ETag compatibility for single PUT operations.

Fixes #6542
Fixes #6550
master
Harshavardhana 6 years ago committed by GitHub
parent 223967fd32
commit 5d859b2178
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 119
      cmd/gateway/azure/gateway-azure.go

@ -685,10 +685,33 @@ func (a *azureObjects) GetObjectInfo(ctx context.Context, bucket, object string,
return objInfo, azureToObjectError(err, bucket, object)
}
// Populate correct ETag's if possible, this code primarily exists
// because AWS S3 indicates that
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html
//
// Objects created by the PUT Object, POST Object, or Copy operation,
// or through the AWS Management Console, and are encrypted by SSE-S3
// or plaintext, have ETags that are an MD5 digest of their object data.
//
// Some applications depend on this behavior refer https://github.com/minio/minio/issues/6550
// So we handle it here and make this consistent.
etag := minio.ToS3ETag(blob.Properties.Etag)
switch {
case blob.Properties.ContentMD5 != "":
b, err := base64.StdEncoding.DecodeString(blob.Properties.ContentMD5)
if err == nil {
etag = hex.EncodeToString(b)
}
case blob.Metadata["md5sum"] != "":
etag = blob.Metadata["md5sum"]
delete(blob.Metadata, "md5sum")
}
return minio.ObjectInfo{
Bucket: bucket,
UserDefined: azurePropertiesToS3Meta(blob.Metadata, blob.Properties),
ETag: minio.ToS3ETag(blob.Properties.Etag),
ETag: etag,
ModTime: time.Time(blob.Properties.LastModified),
Name: object,
Size: blob.Properties.ContentLength,
@ -700,15 +723,103 @@ func (a *azureObjects) GetObjectInfo(ctx context.Context, bucket, object string,
// PutObject - Create a new blob with the incoming data,
// uses Azure equivalent CreateBlockBlobFromReader.
func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
if data.Size() < azureBlockSize/10 {
blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
blob.Metadata, blob.Properties, err = s3MetaToAzureProperties(ctx, metadata)
if err = blob.CreateBlockBlobFromReader(data, nil); err != nil {
return objInfo, azureToObjectError(err, bucket, object)
}
return a.GetObjectInfo(ctx, bucket, object, opts)
}
uuid, err := getAzureUploadID()
if err != nil {
return objInfo, err
}
etag := data.MD5HexString()
if etag == "" {
etag = minio.GenETag()
}
blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
subPartSize, subPartNumber := int64(azureBlockSize), 1
for remainingSize := data.Size(); remainingSize >= 0; remainingSize -= subPartSize {
// Allow to create zero sized part.
if remainingSize == 0 && subPartNumber > 1 {
break
}
if remainingSize < subPartSize {
subPartSize = remainingSize
}
id := azureGetBlockID(1, subPartNumber, uuid, etag)
if err = blob.PutBlockWithLength(id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil); err != nil {
return objInfo, azureToObjectError(err, bucket, object)
}
subPartNumber++
}
objBlob := a.client.GetContainerReference(bucket).GetBlobReference(object)
resp, err := objBlob.GetBlockList(storage.BlockListTypeUncommitted, nil)
if err != nil {
return objInfo, azureToObjectError(err, bucket, object)
}
getBlocks := func(partNumber int, etag string) (blocks []storage.Block, size int64, aerr error) {
for _, part := range resp.UncommittedBlocks {
var partID int
var readUploadID string
var md5Hex string
if partID, _, readUploadID, md5Hex, aerr = azureParseBlockID(part.Name); aerr != nil {
return nil, 0, aerr
}
if partNumber == partID && uuid == readUploadID && etag == md5Hex {
blocks = append(blocks, storage.Block{
ID: part.Name,
Status: storage.BlockStatusUncommitted,
})
size += part.Size
}
}
if len(blocks) == 0 {
return nil, 0, minio.InvalidPart{}
}
return blocks, size, nil
}
var blocks []storage.Block
blocks, _, err = getBlocks(1, etag)
if err != nil {
logger.LogIf(ctx, err)
return objInfo, err
}
if err = objBlob.PutBlockList(blocks, nil); err != nil {
return objInfo, azureToObjectError(err, bucket, object)
}
err = blob.CreateBlockBlobFromReader(data, nil)
if len(metadata) == 0 {
metadata = map[string]string{}
}
// Save md5sum for future processing on the object.
metadata["x-amz-meta-md5sum"] = hex.EncodeToString(data.MD5Current())
objBlob.Metadata, objBlob.Properties, err = s3MetaToAzureProperties(ctx, metadata)
if err != nil {
return objInfo, azureToObjectError(err, bucket, object)
}
if err = objBlob.SetProperties(nil); err != nil {
return objInfo, azureToObjectError(err, bucket, object)
}
if err = objBlob.SetMetadata(nil); err != nil {
return objInfo, azureToObjectError(err, bucket, object)
}
return a.GetObjectInfo(ctx, bucket, object, opts)
}
@ -974,10 +1085,6 @@ func (a *azureObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
}
defer func() {
if err != nil {
return
}
blob := a.client.GetContainerReference(bucket).GetBlobReference(metadataObject)
derr := blob.Delete(nil)
logger.GetReqInfo(ctx).AppendTags("uploadID", uploadID)

Loading…
Cancel
Save