@@ -25,15 +25,17 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net/http"
+	"net/url"
 	"path"
 	"sort"
 	"strconv"
 	"strings"
 	"time"

-	"github.com/Azure/azure-sdk-for-go/storage"
-	"github.com/Azure/go-autorest/autorest/azure"
+	"github.com/Azure/azure-pipeline-go/pipeline"
+	"github.com/Azure/azure-storage-blob-go/azblob"
 	humanize "github.com/dustin/go-humanize"
 	"github.com/minio/cli"
 	miniogopolicy "github.com/minio/minio-go/v6/pkg/policy"
@@ -48,7 +50,17 @@ import (
 )

 const (
-	globalAzureAPIVersion      = "2016-05-31"
+	// The defaultDialTimeout for communicating with the cloud backends is set
+	// to 30 seconds in utils.go; the Azure SDK recommends to set a timeout of 60
+	// seconds per MB of data a client expects to upload so we must transfer less
+	// than 0.5 MB per chunk to stay within the defaultDialTimeout tolerance.
+	// See https://github.com/Azure/azure-storage-blob-go/blob/fc70003/azblob/zc_policy_retry.go#L39-L44 for more details.
+	azureUploadChunkSize       = 0.25 * humanize.MiByte
+	azureSdkTimeout            = (azureUploadChunkSize / humanize.MiByte) * 60 * time.Second
+	azureUploadMaxMemoryUsage  = 10 * humanize.MiByte
+	azureUploadConcurrency     = azureUploadMaxMemoryUsage / azureUploadChunkSize
+
+	azureDownloadRetryAttempts = 5
 	azureBlockSize             = 100 * humanize.MiByte
 	azureS3MinPartSize         = 5 * humanize.MiByte
 	metadataObjectNameTemplate = minio.GatewayMinioSysTmp + "multipart/v1/%s.%x/azure.json"
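For reference, the values these constants work out to (go-humanize's MiByte is 1 << 20):

	azureUploadChunkSize   // 0.25 MiB = 262144 bytes per staged chunk
	azureSdkTimeout        // 0.25 * 60 * time.Second = 15s try-timeout per chunk
	azureUploadConcurrency // 10 MiB / 0.25 MiB = 40 buffers in flight

So a single upload stages at most 40 concurrent 256 KiB chunks, which is exactly the 10 MiB azureUploadMaxMemoryUsage cap.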
@@ -144,53 +156,76 @@ func (g *Azure) Name() string {
 	return azureBackend
 }

-// All known cloud environments of Azure
-var azureEnvs = []azure.Environment{
-	azure.PublicCloud,
-	azure.USGovernmentCloud,
-	azure.ChinaCloud,
-	azure.GermanCloud,
-}
-
 // NewGatewayLayer initializes azure blob storage client and returns AzureObjects.
 func (g *Azure) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) {
-	var err error
-	// The default endpoint is the public cloud
-	var endpoint = azure.PublicCloud.StorageEndpointSuffix
-	var secure = true
-
-	// Load the endpoint url if supplied by the user.
-	if g.host != "" {
-		endpoint, secure, err = minio.ParseGatewayEndpoint(g.host)
-		if err != nil {
-			return nil, err
-		}
-		// Reformat the full account storage endpoint to the base format.
-		//   e.g. testazure.blob.core.windows.net => core.windows.net
-		endpoint = strings.ToLower(endpoint)
-		for _, env := range azureEnvs {
-			if strings.Contains(endpoint, env.StorageEndpointSuffix) {
-				endpoint = env.StorageEndpointSuffix
-				break
-			}
-		}
-	}
+	endpointURL, err := parseStorageEndpoint(g.host, creds.AccessKey)
+	if err != nil {
+		return nil, err
+	}

-	c, err := storage.NewClient(creds.AccessKey, creds.SecretKey, endpoint, globalAzureAPIVersion, secure)
+	credential, err := azblob.NewSharedKeyCredential(creds.AccessKey, creds.SecretKey)
 	if err != nil {
 		return &azureObjects{}, err
 	}

-	c.AddToUserAgent(fmt.Sprintf("APN/1.0 MinIO/1.0 MinIO/%s", minio.Version))
-	c.HTTPClient = &http.Client{Transport: minio.NewCustomHTTPTransport()}
+	httpClient := &http.Client{Transport: minio.NewCustomHTTPTransport()}
+	userAgent := fmt.Sprintf("APN/1.0 MinIO/1.0 MinIO/%s", minio.Version)
+
+	pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{
+		Retry: azblob.RetryOptions{
+			TryTimeout: azureSdkTimeout,
+		},
+		HTTPSender: pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
+			return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
+				request.Header.Set("User-Agent", userAgent)
+				resp, err := httpClient.Do(request.WithContext(ctx))
+				return pipeline.NewHTTPResponse(resp), err
+			}
+		}),
+	})
+
+	client := azblob.NewServiceURL(*endpointURL, pipeline)

 	return &azureObjects{
-		endpoint:   fmt.Sprintf("https://%s.blob.core.windows.net", creds.AccessKey),
-		httpClient: c.HTTPClient,
-		client:     c.GetBlobService(),
+		endpoint:   endpointURL.String(),
+		httpClient: httpClient,
+		client:     client,
 	}, nil
 }

+func parseStorageEndpoint(host string, accountName string) (*url.URL, error) {
+	var endpoint string
+
+	// Load the endpoint url if supplied by the user.
+	if host != "" {
+		host, secure, err := minio.ParseGatewayEndpoint(host)
+		if err != nil {
+			return nil, err
+		}
+
+		var protocol string
+		if secure {
+			protocol = "https"
+		} else {
+			protocol = "http"
+		}
+
+		// for containerized storage deployments like Azurite or IoT Edge Storage,
+		// account resolution isn't handled via a hostname prefix like
+		// `http://${account}.host/${path}` but instead via a route prefix like
+		// `http://host/${account}/${path}` so adjusting for that here
+		if !strings.HasPrefix(host, fmt.Sprintf("%s.", accountName)) {
+			host = fmt.Sprintf("%s/%s", host, accountName)
+		}
+
+		endpoint = fmt.Sprintf("%s://%s", protocol, host)
+	} else {
+		endpoint = fmt.Sprintf("https://%s.blob.core.windows.net", accountName)
+	}
+
+	return url.Parse(endpoint)
+}
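A quick sketch of how the new helper resolves endpoints. The hosts below are illustrative only, and minio.ParseGatewayEndpoint is assumed to split the supplied URL into a host and a secure flag, as its use above suggests:

	parseStorageEndpoint("", "myaccount")
	// -> https://myaccount.blob.core.windows.net

	parseStorageEndpoint("https://myaccount.blob.core.windows.net", "myaccount")
	// -> https://myaccount.blob.core.windows.net (hostname-prefix style, left as-is)

	parseStorageEndpoint("http://localhost:10000", "myaccount")
	// -> http://localhost:10000/myaccount (route-prefix style, e.g. Azurite)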

 // Production - Azure gateway is production ready.
 func (g *Azure) Production() bool {
 	return true
@@ -210,11 +245,10 @@ func (g *Azure) Production() bool {
 // copied into BlobProperties.
 //
 // Header names are canonicalized as in http.Header.
-func s3MetaToAzureProperties(ctx context.Context, s3Metadata map[string]string) (storage.BlobMetadata,
-	storage.BlobProperties, error) {
+func s3MetaToAzureProperties(ctx context.Context, s3Metadata map[string]string) (azblob.Metadata, azblob.BlobHTTPHeaders, error) {
 	for k := range s3Metadata {
 		if strings.Contains(k, "--") {
-			return storage.BlobMetadata{}, storage.BlobProperties{}, minio.UnsupportedMetadata{}
+			return azblob.Metadata{}, azblob.BlobHTTPHeaders{}, minio.UnsupportedMetadata{}
 		}
 	}
@@ -232,8 +266,9 @@ func s3MetaToAzureProperties(ctx context.Context, s3Metadata map[string]string)
 		}
 		return strings.Join(tokens, "__")
 	}
-	var blobMeta storage.BlobMetadata = make(map[string]string)
-	var props storage.BlobProperties
+	var blobMeta azblob.Metadata = make(map[string]string)
+	var err error
+	var props azblob.BlobHTTPHeaders
 	for k, v := range s3Metadata {
 		k = http.CanonicalHeaderKey(k)
 		switch {
@@ -253,18 +288,15 @@ func s3MetaToAzureProperties(ctx context.Context, s3Metadata map[string]string)
 			props.ContentDisposition = v
 		case k == "Content-Encoding":
 			props.ContentEncoding = v
-		case k == "Content-Length":
-			// assume this doesn't fail
-			props.ContentLength, _ = strconv.ParseInt(v, 10, 64)
 		case k == "Content-Md5":
-			props.ContentMD5 = v
+			props.ContentMD5, err = base64.StdEncoding.DecodeString(v)
 		case k == "Content-Type":
 			props.ContentType = v
 		case k == "Content-Language":
 			props.ContentLanguage = v
 		}
 	}
-	return blobMeta, props, nil
+	return blobMeta, props, err
 }

 const (
@@ -291,7 +323,7 @@ func newPartMetaV1(uploadID string, partID int) (partMeta *partMetadataV1) {
 // metadata. It is the reverse of s3MetaToAzureProperties. Azure's
 // `.GetMetadata()` lower-cases all header keys, so this is taken into
 // account by this function.
-func azurePropertiesToS3Meta(meta storage.BlobMetadata, props storage.BlobProperties) map[string]string {
+func azurePropertiesToS3Meta(meta azblob.Metadata, props azblob.BlobHTTPHeaders, contentLength int64) map[string]string {
 	// Decoding technique for each key is used here is as follows
 	// Each '_' is converted to '-'
 	// Each '__' is converted to '_'
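To make the round trip concrete (example key invented for illustration, and assuming the elided branch strips the X-Amz-Meta- prefix before encoding, as these decode comments imply):

	// S3 header:  X-Amz-Meta-Release-Tag_beta
	// encode:     '-' -> '_' within each token, tokens joined with '__'
	// Azure key:  release_tag__beta  (Azure lower-cases metadata keys)
	// decode:     '__' -> '_', then remaining '_' -> '-'  =>  Release-Tag_beta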
@@ -327,11 +359,11 @@ func azurePropertiesToS3Meta(meta storage.BlobMetadata, props storage.BlobProperties) map[string]string {
 	if props.ContentEncoding != "" {
 		s3Metadata["Content-Encoding"] = props.ContentEncoding
 	}
-	if props.ContentLength != 0 {
-		s3Metadata["Content-Length"] = fmt.Sprintf("%d", props.ContentLength)
+	if contentLength != 0 {
+		s3Metadata["Content-Length"] = fmt.Sprintf("%d", contentLength)
 	}
-	if props.ContentMD5 != "" {
-		s3Metadata["Content-MD5"] = props.ContentMD5
+	if len(props.ContentMD5) != 0 {
+		s3Metadata["Content-MD5"] = base64.StdEncoding.EncodeToString(props.ContentMD5)
 	}
 	if props.ContentType != "" {
 		s3Metadata["Content-Type"] = props.ContentType
@@ -347,7 +379,7 @@ type azureObjects struct {
 	minio.GatewayUnsupported
 	endpoint   string
 	httpClient *http.Client
-	client     storage.BlobStorageClient // Azure sdk client
+	client     azblob.ServiceURL // Azure sdk client
 }

 // Convert azure errors to minio object layer errors.
|
@ -365,14 +397,21 @@ func azureToObjectError(err error, params ...string) error { |
|
|
|
|
object = params[1] |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
azureErr, ok := err.(storage.AzureStorageServiceError) |
|
|
|
|
azureErr, ok := err.(azblob.StorageError) |
|
|
|
|
if !ok { |
|
|
|
|
// We don't interpret non Azure errors. As azure errors will
|
|
|
|
|
// have StatusCode to help to convert to object errors.
|
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
switch azureErr.Code { |
|
|
|
|
serviceCode := string(azureErr.ServiceCode()) |
|
|
|
|
statusCode := azureErr.Response().StatusCode |
|
|
|
|
|
|
|
|
|
return azureCodesToObjectError(err, serviceCode, statusCode, bucket, object) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func azureCodesToObjectError(err error, serviceCode string, statusCode int, bucket string, object string) error { |
|
|
|
|
switch serviceCode { |
|
|
|
|
case "ContainerAlreadyExists": |
|
|
|
|
err = minio.BucketExists{Bucket: bucket} |
|
|
|
|
case "InvalidResourceName": |
|
|
|
@ -382,7 +421,7 @@ func azureToObjectError(err error, params ...string) error { |
|
|
|
|
case "InvalidMetadata": |
|
|
|
|
err = minio.UnsupportedMetadata{} |
|
|
|
|
default: |
|
|
|
|
switch azureErr.StatusCode { |
|
|
|
|
switch statusCode { |
|
|
|
|
case http.StatusNotFound: |
|
|
|
|
if object != "" { |
|
|
|
|
err = minio.ObjectNotFound{ |
|
|
|
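Splitting the mapping into azureCodesToObjectError also makes it exercisable without fabricating an azblob.StorageError; a hedged sketch (the status code is illustrative):

	e := azureCodesToObjectError(err, "ContainerAlreadyExists", http.StatusConflict, "mybucket", "")
	// e is now minio.BucketExists{Bucket: "mybucket"}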
@@ -466,10 +505,8 @@ func (a *azureObjects) MakeBucketWithLocation(ctx context.Context, bucket, location string) error {
 		return minio.BucketNameInvalid{Bucket: bucket}
 	}

-	container := a.client.GetContainerReference(bucket)
-	err := container.Create(&storage.CreateContainerOptions{
-		Access: storage.ContainerAccessTypePrivate,
-	})
+	containerURL := a.client.NewContainerURL(bucket)
+	_, err := containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
 	return azureToObjectError(err, bucket)
 }
@@ -477,55 +514,66 @@ func (a *azureObjects) MakeBucketWithLocation(ctx context.Context, bucket, location string) error {
 func (a *azureObjects) GetBucketInfo(ctx context.Context, bucket string) (bi minio.BucketInfo, e error) {
 	// Azure does not have an equivalent call, hence use
 	// ListContainers with prefix
-	resp, err := a.client.ListContainers(storage.ListContainersParameters{
-		Prefix: bucket,
-	})
-	if err != nil {
-		return bi, azureToObjectError(err, bucket)
-	}
-	for _, container := range resp.Containers {
-		if container.Name == bucket {
-			t, e := time.Parse(time.RFC1123, container.Properties.LastModified)
-			if e == nil {
-				return minio.BucketInfo{
-					Name:    bucket,
-					Created: t,
-				}, nil
-			} // else continue
+	marker := azblob.Marker{}
+	for marker.NotDone() {
+		resp, err := a.client.ListContainersSegment(ctx, marker, azblob.ListContainersSegmentOptions{
+			Prefix: bucket,
+		})
+		if err != nil {
+			return bi, azureToObjectError(err, bucket)
+		}
+		for _, container := range resp.ContainerItems {
+			if container.Name == bucket {
+				t := container.Properties.LastModified
+				return minio.BucketInfo{
+					Name:    bucket,
+					Created: t,
+				}, nil
+			}
 		}
+		marker = resp.NextMarker
 	}
 	return bi, minio.BucketNotFound{Bucket: bucket}
 }

-// ListBuckets - Lists all azure containers, uses Azure equivalent ListContainers.
+// ListBuckets - Lists all azure containers, uses Azure equivalent `ServiceURL.ListContainersSegment`.
 func (a *azureObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketInfo, err error) {
-	resp, err := a.client.ListContainers(storage.ListContainersParameters{})
-	if err != nil {
-		return nil, azureToObjectError(err)
-	}
-	for _, container := range resp.Containers {
-		t, e := time.Parse(time.RFC1123, container.Properties.LastModified)
-		if e != nil {
-			logger.LogIf(ctx, e)
-			return nil, e
+	marker := azblob.Marker{}
+	for marker.NotDone() {
+		resp, err := a.client.ListContainersSegment(ctx, marker, azblob.ListContainersSegmentOptions{})
+		if err != nil {
+			return nil, azureToObjectError(err)
 		}
-		buckets = append(buckets, minio.BucketInfo{
-			Name:    container.Name,
-			Created: t,
-		})
+		for _, container := range resp.ContainerItems {
+			t := container.Properties.LastModified
+			buckets = append(buckets, minio.BucketInfo{
+				Name:    container.Name,
+				Created: t,
+			})
+		}
+		marker = resp.NextMarker
 	}
 	return buckets, nil
 }
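Both functions now follow azblob's segment-pagination idiom, which generalizes to every *Segment call in this file (svc and process are placeholders for illustration):

	marker := azblob.Marker{} // zero value means "start from the beginning"; NotDone() is true
	for marker.NotDone() {
		resp, err := svc.ListContainersSegment(ctx, marker, azblob.ListContainersSegmentOptions{})
		if err != nil {
			return err
		}
		process(resp.ContainerItems) // hypothetical consumer
		marker = resp.NextMarker     // an empty NextMarker ends the loop
	}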

-// DeleteBucket - delete a container on azure, uses Azure equivalent DeleteContainer.
+// DeleteBucket - delete a container on azure, uses Azure equivalent `ContainerURL.Delete`.
 func (a *azureObjects) DeleteBucket(ctx context.Context, bucket string) error {
-	container := a.client.GetContainerReference(bucket)
-	err := container.Delete(nil)
+	containerURL := a.client.NewContainerURL(bucket)
+	_, err := containerURL.Delete(ctx, azblob.ContainerAccessConditions{})
 	return azureToObjectError(err, bucket)
 }

 // ListObjects - lists all blobs on azure with in a container filtered by prefix
-// and marker, uses Azure equivalent ListBlobs.
+// and marker, uses Azure equivalent `ContainerURL.ListBlobsHierarchySegment`.
 // To accommodate S3-compatible applications using
 // ListObjectsV1 to use object keys as markers to control the
 // listing of objects, we use the following encoding scheme to
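The scheme, in short: continuation tokens returned by Azure are handed back to clients as NextMarker values wrapped with the "{minio}" prefix (azureMarkerPrefix, as used further down), so a follow-up request can be told apart from a plain ListObjectsV1 object-key marker. The token below is invented for illustration:

	// isAzureMarker("{minio}CgRvYmoz") == true   -> strip prefix, resume with the Azure token
	// isAzureMarker("photos/2019.jpg") == false  -> ordinary object-key marker, filter by name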
@@ -542,26 +590,25 @@ func (a *azureObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result minio.ListObjectsInfo, err error) {
 	var objects []minio.ObjectInfo
 	var prefixes []string
-	azureListMarker := ""
+	azureListMarker := azblob.Marker{}
 	if isAzureMarker(marker) {
 		// If application is using Azure continuation token we should
 		// strip the azureTokenPrefix we added in the previous list response.
-		azureListMarker = strings.TrimPrefix(marker, azureMarkerPrefix)
+		azureMarker := strings.TrimPrefix(marker, azureMarkerPrefix)
+		azureListMarker.Val = &azureMarker
 	}

-	container := a.client.GetContainerReference(bucket)
+	containerURL := a.client.NewContainerURL(bucket)
 	for len(objects) == 0 && len(prefixes) == 0 {
-		resp, err := container.ListBlobs(storage.ListBlobsParameters{
+		resp, err := containerURL.ListBlobsHierarchySegment(ctx, azureListMarker, delimiter, azblob.ListBlobsSegmentOptions{
 			Prefix:     prefix,
-			Marker:     azureListMarker,
-			Delimiter:  delimiter,
-			MaxResults: uint(maxKeys),
+			MaxResults: int32(maxKeys),
 		})
 		if err != nil {
 			return result, azureToObjectError(err, bucket, prefix)
 		}

-		for _, blob := range resp.Blobs {
+		for _, blob := range resp.Segment.BlobItems {
 			if delimiter == "" && strings.HasPrefix(blob.Name, minio.GatewayMinioSysTmp) {
 				// We filter out minio.GatewayMinioSysTmp entries in the recursive listing.
 				continue
@@ -582,13 +629,10 @@ func (a *azureObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result minio.ListObjectsInfo, err error) {
 			//
 			// Some applications depend on this behavior refer https://github.com/minio/minio/issues/6550
 			// So we handle it here and make this consistent.
-			etag := minio.ToS3ETag(blob.Properties.Etag)
+			etag := minio.ToS3ETag(string(blob.Properties.Etag))
 			switch {
-			case blob.Properties.ContentMD5 != "":
-				b, err := base64.StdEncoding.DecodeString(blob.Properties.ContentMD5)
-				if err == nil {
-					etag = hex.EncodeToString(b)
-				}
+			case len(blob.Properties.ContentMD5) != 0:
+				etag = hex.EncodeToString(blob.Properties.ContentMD5)
 			case blob.Metadata["md5sum"] != "":
 				etag = blob.Metadata["md5sum"]
 				delete(blob.Metadata, "md5sum")
@@ -597,31 +641,31 @@ func (a *azureObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result minio.ListObjectsInfo, err error) {
 			objects = append(objects, minio.ObjectInfo{
 				Bucket:          bucket,
 				Name:            blob.Name,
-				ModTime:         time.Time(blob.Properties.LastModified),
-				Size:            blob.Properties.ContentLength,
+				ModTime:         blob.Properties.LastModified,
+				Size:            *blob.Properties.ContentLength,
 				ETag:            etag,
-				ContentType:     blob.Properties.ContentType,
-				ContentEncoding: blob.Properties.ContentEncoding,
+				ContentType:     *blob.Properties.ContentType,
+				ContentEncoding: *blob.Properties.ContentEncoding,
 			})
 		}

-		for _, blobPrefix := range resp.BlobPrefixes {
-			if blobPrefix == minio.GatewayMinioSysTmp {
+		for _, blobPrefix := range resp.Segment.BlobPrefixes {
+			if blobPrefix.Name == minio.GatewayMinioSysTmp {
 				// We don't do strings.HasPrefix(blob.Name, minio.GatewayMinioSysTmp) here so that
 				// we can use tools like mc to inspect the contents of minio.sys.tmp/
 				// It is OK to allow listing of minio.sys.tmp/ in non-recursive mode as it aids in debugging.
 				continue
 			}
-			if !isAzureMarker(marker) && blobPrefix <= marker {
+			if !isAzureMarker(marker) && blobPrefix.Name <= marker {
 				// If the application used ListObjectsV1 style marker then we
 				// skip all the entries till we reach the marker.
 				continue
 			}
-			prefixes = append(prefixes, blobPrefix)
+			prefixes = append(prefixes, blobPrefix.Name)
 		}

 		azureListMarker = resp.NextMarker
-		if azureListMarker == "" {
+		if !azureListMarker.NotDone() {
 			// Reached end of listing.
 			break
 		}
@@ -629,10 +673,10 @@ func (a *azureObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result minio.ListObjectsInfo, err error) {

 	result.Objects = objects
 	result.Prefixes = prefixes
-	if azureListMarker != "" {
+	if azureListMarker.NotDone() {
 		// We add the {minio} prefix so that we know in the subsequent request that this
 		// marker is a azure continuation token and not ListObjectV1 marker.
-		result.NextMarker = azureMarkerPrefix + azureListMarker
+		result.NextMarker = azureMarkerPrefix + *azureListMarker.Val
 		result.IsTruncated = true
 	}
 	return result, nil
@@ -696,34 +740,24 @@ func (a *azureObjects) GetObject(ctx context.Context, bucket, object string, startOffset, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
 		return azureToObjectError(minio.InvalidRange{}, bucket, object)
 	}

-	blobRange := &storage.BlobRange{Start: uint64(startOffset)}
-	if length > 0 {
-		blobRange.End = uint64(startOffset + length - 1)
-	}
-
-	blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
-	var rc io.ReadCloser
-	var err error
-	if startOffset == 0 && length == 0 {
-		rc, err = blob.Get(nil)
-	} else {
-		rc, err = blob.GetRange(&storage.GetBlobRangeOptions{
-			Range: blobRange,
-		})
-	}
+	blobURL := a.client.NewContainerURL(bucket).NewBlobURL(object)
+	blob, err := blobURL.Download(ctx, startOffset, length, azblob.BlobAccessConditions{}, false)
 	if err != nil {
 		return azureToObjectError(err, bucket, object)
 	}
+
+	rc := blob.Body(azblob.RetryReaderOptions{MaxRetryRequests: azureDownloadRetryAttempts})
+
 	_, err = io.Copy(writer, rc)
 	rc.Close()
 	return err
 }
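Worth noting: in azblob a count of zero (azblob.CountToEnd) means "read to the end of the blob", so the old full-read/ranged-read split collapses into the single Download call, and the Body wrapper transparently resumes broken reads:

	// Download(ctx, offset, count, ...): count == 0 (azblob.CountToEnd) reads to EOF
	// blob.Body(azblob.RetryReaderOptions{MaxRetryRequests: N}) re-issues the remaining
	// range on mid-stream failures, up to N times (azureDownloadRetryAttempts here)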

 // GetObjectInfo - reads blob metadata properties and replies back minio.ObjectInfo,
-// uses zure equivalent GetBlobProperties.
+// uses Azure equivalent `BlobURL.GetProperties`.
 func (a *azureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
-	blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
-	err = blob.GetProperties(nil)
+	blobURL := a.client.NewContainerURL(bucket).NewBlobURL(object)
+	blob, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{})
 	if err != nil {
 		return objInfo, azureToObjectError(err, bucket, object)
 	}
@@ -739,124 +773,102 @@ func (a *azureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
 	//
 	// Some applications depend on this behavior refer https://github.com/minio/minio/issues/6550
 	// So we handle it here and make this consistent.
-	etag := minio.ToS3ETag(blob.Properties.Etag)
+	etag := minio.ToS3ETag(string(blob.ETag()))
+	metadata := blob.NewMetadata()
+	contentMD5 := blob.ContentMD5()
 	switch {
-	case blob.Properties.ContentMD5 != "":
-		b, err := base64.StdEncoding.DecodeString(blob.Properties.ContentMD5)
-		if err == nil {
-			etag = hex.EncodeToString(b)
-		}
-	case blob.Metadata["md5sum"] != "":
-		etag = blob.Metadata["md5sum"]
-		delete(blob.Metadata, "md5sum")
+	case len(contentMD5) != 0:
+		etag = hex.EncodeToString(contentMD5)
+	case metadata["md5sum"] != "":
+		etag = metadata["md5sum"]
+		delete(metadata, "md5sum")
 	}

 	return minio.ObjectInfo{
 		Bucket:          bucket,
-		UserDefined:     azurePropertiesToS3Meta(blob.Metadata, blob.Properties),
+		UserDefined:     azurePropertiesToS3Meta(metadata, blob.NewHTTPHeaders(), blob.ContentLength()),
 		ETag:            etag,
-		ModTime:         time.Time(blob.Properties.LastModified),
+		ModTime:         blob.LastModified(),
 		Name:            object,
-		Size:            blob.Properties.ContentLength,
-		ContentType:     blob.Properties.ContentType,
-		ContentEncoding: blob.Properties.ContentEncoding,
+		Size:            blob.ContentLength(),
+		ContentType:     blob.ContentType(),
+		ContentEncoding: blob.ContentEncoding(),
 	}, nil
 }

 // PutObject - Create a new blob with the incoming data,
-// uses Azure equivalent CreateBlockBlobFromReader.
+// uses Azure equivalent `UploadStreamToBlockBlob`.
 func (a *azureObjects) PutObject(ctx context.Context, bucket, object string, r *minio.PutObjReader, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
 	data := r.Reader
-	if data.Size() <= azureBlockSize/2 {
-		blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
-		blob.Metadata, blob.Properties, err = s3MetaToAzureProperties(ctx, opts.UserDefined)
-		if err != nil {
-			return objInfo, azureToObjectError(err, bucket, object)
-		}
-		if err = blob.CreateBlockBlobFromReader(data, nil); err != nil {
-			return objInfo, azureToObjectError(err, bucket, object)
-		}
-		return a.GetObjectInfo(ctx, bucket, object, opts)
-	}
-
-	blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
-	var blocks []storage.Block
-	subPartSize, subPartNumber := int64(azureBlockSize), 1
-	for remainingSize := data.Size(); remainingSize >= 0; remainingSize -= subPartSize {
-		// Allow to create zero sized part.
-		if remainingSize == 0 && subPartNumber > 1 {
-			break
+	if data.Size() > azureBlockSize/2 {
+		if len(opts.UserDefined) == 0 {
+			opts.UserDefined = map[string]string{}
 		}

-		if remainingSize < subPartSize {
-			subPartSize = remainingSize
-		}
-
-		id := base64.StdEncoding.EncodeToString([]byte(minio.MustGetUUID()))
-		err = blob.PutBlockWithLength(id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil)
-		if err != nil {
-			return objInfo, azureToObjectError(err, bucket, object)
-		}
-		blocks = append(blocks, storage.Block{
-			ID:     id,
-			Status: storage.BlockStatusUncommitted,
-		})
-		subPartNumber++
+		// Save md5sum for future processing on the object.
+		opts.UserDefined["x-amz-meta-md5sum"] = r.MD5CurrentHexString()
 	}

-	if err = blob.PutBlockList(blocks, nil); err != nil {
-		return objInfo, azureToObjectError(err, bucket, object)
-	}
-
-	if len(opts.UserDefined) == 0 {
-		opts.UserDefined = map[string]string{}
-	}
-
-	// Save md5sum for future processing on the object.
-	opts.UserDefined["x-amz-meta-md5sum"] = r.MD5CurrentHexString()
-	blob.Metadata, blob.Properties, err = s3MetaToAzureProperties(ctx, opts.UserDefined)
+	metadata, properties, err := s3MetaToAzureProperties(ctx, opts.UserDefined)
 	if err != nil {
 		return objInfo, azureToObjectError(err, bucket, object)
 	}
-	if err = blob.SetProperties(nil); err != nil {
-		return objInfo, azureToObjectError(err, bucket, object)
-	}
-	if err = blob.SetMetadata(nil); err != nil {
-		return objInfo, azureToObjectError(err, bucket, object)
-	}

+	blobURL := a.client.NewContainerURL(bucket).NewBlockBlobURL(object)
+
+	_, err = azblob.UploadStreamToBlockBlob(ctx, data, blobURL, azblob.UploadStreamToBlockBlobOptions{
+		BufferSize:      azureUploadChunkSize,
+		MaxBuffers:      azureUploadConcurrency,
+		BlobHTTPHeaders: properties,
+		Metadata:        metadata,
+	})
+	if err != nil {
+		return objInfo, azureToObjectError(err, bucket, object)
+	}
+
 	return a.GetObjectInfo(ctx, bucket, object, opts)
 }
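UploadStreamToBlockBlob stages the stream in BufferSize chunks with at most MaxBuffers outstanding, so peak memory per PutObject is bounded regardless of object size:

	// BufferSize * MaxBuffers = 256 KiB * 40 = 10 MiB (azureUploadMaxMemoryUsage)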

 // CopyObject - Copies a blob from source container to destination container.
-// Uses Azure equivalent CopyBlob API.
+// Uses Azure equivalent `BlobURL.StartCopyFromURL`.
 func (a *azureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
 	if srcOpts.CheckCopyPrecondFn != nil && srcOpts.CheckCopyPrecondFn(srcInfo, "") {
 		return minio.ObjectInfo{}, minio.PreConditionFailed{}
 	}
-	srcBlobURL := a.client.GetContainerReference(srcBucket).GetBlobReference(srcObject).GetURL()
-	destBlob := a.client.GetContainerReference(destBucket).GetBlobReference(destObject)
+	srcBlobURL := a.client.NewContainerURL(srcBucket).NewBlobURL(srcObject).URL()
+	destBlob := a.client.NewContainerURL(destBucket).NewBlobURL(destObject)
 	azureMeta, props, err := s3MetaToAzureProperties(ctx, srcInfo.UserDefined)
 	if err != nil {
 		return objInfo, azureToObjectError(err, srcBucket, srcObject)
 	}
-	destBlob.Metadata = azureMeta
-	err = destBlob.Copy(srcBlobURL, nil)
+	res, err := destBlob.StartCopyFromURL(ctx, srcBlobURL, azureMeta, azblob.ModifiedAccessConditions{}, azblob.BlobAccessConditions{})
 	if err != nil {
 		return objInfo, azureToObjectError(err, srcBucket, srcObject)
 	}
+
+	// StartCopyFromURL is an asynchronous operation so need to poll for completion,
+	// see https://docs.microsoft.com/en-us/rest/api/storageservices/copy-blob#remarks.
+	copyStatus := res.CopyStatus()
+	for copyStatus != azblob.CopyStatusSuccess {
+		destProps, err := destBlob.GetProperties(ctx, azblob.BlobAccessConditions{})
+		if err != nil {
+			return objInfo, azureToObjectError(err, srcBucket, srcObject)
+		}
+		copyStatus = destProps.CopyStatus()
+	}
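The loop above re-issues GetProperties until the copy leaves the pending state. For readers who want the pattern in isolation, a hedged sketch of the same wait with an explicit back-off (azblob also defines CopyStatusPending/Failed/Aborted constants; the sleep interval is arbitrary):

	for copyStatus == azblob.CopyStatusPending {
		time.Sleep(100 * time.Millisecond) // pause between polls
		destProps, err := destBlob.GetProperties(ctx, azblob.BlobAccessConditions{})
		if err != nil {
			return objInfo, azureToObjectError(err, srcBucket, srcObject)
		}
		copyStatus = destProps.CopyStatus()
	}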

 	// Azure will copy metadata from the source object when an empty metadata map is provided.
 	// To handle the case where the source object should be copied without its metadata,
 	// the metadata must be removed from the dest. object after the copy completes
-	if len(azureMeta) == 0 && len(destBlob.Metadata) != 0 {
-		destBlob.Metadata = azureMeta
-		err = destBlob.SetMetadata(nil)
+	if len(azureMeta) == 0 {
+		_, err := destBlob.SetMetadata(ctx, azureMeta, azblob.BlobAccessConditions{})
 		if err != nil {
 			return objInfo, azureToObjectError(err, srcBucket, srcObject)
 		}
 	}
-	destBlob.Properties = props
-	err = destBlob.SetProperties(nil)
+
+	_, err = destBlob.SetHTTPHeaders(ctx, props, azblob.BlobAccessConditions{})
 	if err != nil {
 		return objInfo, azureToObjectError(err, srcBucket, srcObject)
 	}
@@ -864,10 +876,10 @@ func (a *azureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
 }

 // DeleteObject - Deletes a blob on azure container, uses Azure
-// equivalent DeleteBlob API.
+// equivalent `BlobURL.Delete`.
 func (a *azureObjects) DeleteObject(ctx context.Context, bucket, object string) error {
-	blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
-	err := blob.Delete(nil)
+	blob := a.client.NewContainerURL(bucket).NewBlobURL(object)
+	_, err := blob.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
 	if err != nil {
 		return azureToObjectError(err, bucket, object)
 	}
@@ -909,9 +921,9 @@ func getAzureMetadataPartPrefix(uploadID, objectName string) string {
 }

 func (a *azureObjects) checkUploadIDExists(ctx context.Context, bucketName, objectName, uploadID string) (err error) {
-	blob := a.client.GetContainerReference(bucketName).GetBlobReference(
+	blobURL := a.client.NewContainerURL(bucketName).NewBlobURL(
 		getAzureMetadataObjectName(objectName, uploadID))
-	err = blob.GetMetadata(nil)
+	_, err = blobURL.GetProperties(ctx, azblob.BlobAccessConditions{})
 	err = azureToObjectError(err, bucketName, objectName)
 	oerr := minio.ObjectNotFound{
 		Bucket: bucketName,
@@ -925,7 +937,7 @@ func (a *azureObjects) checkUploadIDExists(ctx context.Context, bucketName, objectName, uploadID string) (err error) {
 	return err
 }

-// NewMultipartUpload - Use Azure equivalent CreateBlockBlob.
+// NewMultipartUpload - Use Azure equivalent `BlobURL.Upload`.
 func (a *azureObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (uploadID string, err error) {
 	uploadID, err = getAzureUploadID()
 	if err != nil {
@@ -940,8 +952,8 @@ func (a *azureObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (uploadID string, err error) {
 		return "", err
 	}

-	blob := a.client.GetContainerReference(bucket).GetBlobReference(metadataObject)
-	err = blob.CreateBlockBlobFromReader(bytes.NewBuffer(jsonData), nil)
+	blobURL := a.client.NewContainerURL(bucket).NewBlockBlobURL(metadataObject)
+	_, err = blobURL.Upload(ctx, bytes.NewReader(jsonData), azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{})
 	if err != nil {
 		return "", azureToObjectError(err, bucket, metadataObject)
 	}
@@ -949,7 +961,7 @@ func (a *azureObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (uploadID string, err error) {
 	return uploadID, nil
 }

-// PutObjectPart - Use Azure equivalent PutBlockWithLength.
+// PutObjectPart - Use Azure equivalent `BlobURL.StageBlock`.
 func (a *azureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *minio.PutObjReader, opts minio.ObjectOptions) (info minio.PartInfo, err error) {
 	data := r.Reader
 	if err = a.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
@@ -961,20 +973,19 @@ func (a *azureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *minio.PutObjReader, opts minio.ObjectOptions) (info minio.PartInfo, err error) {
 	}

 	partMetaV1 := newPartMetaV1(uploadID, partID)
-	subPartSize, subPartNumber := int64(azureBlockSize), 1
-	for remainingSize := data.Size(); remainingSize >= 0; remainingSize -= subPartSize {
-		// Allow to create zero sized part.
-		if remainingSize == 0 && subPartNumber > 1 {
-			break
-		}
-
+	subPartSize, subPartNumber := int64(azureUploadChunkSize), 1
+	for remainingSize := data.Size(); remainingSize > 0; remainingSize -= subPartSize {
 		if remainingSize < subPartSize {
 			subPartSize = remainingSize
 		}

 		id := base64.StdEncoding.EncodeToString([]byte(minio.MustGetUUID()))
-		blob := a.client.GetContainerReference(bucket).GetBlobReference(object)
-		err = blob.PutBlockWithLength(id, uint64(subPartSize), io.LimitReader(data, subPartSize), nil)
+		blobURL := a.client.NewContainerURL(bucket).NewBlockBlobURL(object)
+		body, err := ioutil.ReadAll(io.LimitReader(data, subPartSize))
+		if err != nil {
+			return info, azureToObjectError(err, bucket, object)
+		}
+		_, err = blobURL.StageBlock(ctx, id, bytes.NewReader(body), azblob.LeaseAccessConditions{}, nil)
 		if err != nil {
 			return info, azureToObjectError(err, bucket, object)
 		}
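The ReadAll step exists because StageBlock takes a seekable body while the incoming part reader is forward-only; buffering one sub-part at a time keeps the cost bounded at azureUploadChunkSize bytes:

	// StageBlock(ctx, base64BlockID, body io.ReadSeeker, leaseConditions, transactionalMD5)
	//   - body must be seekable so the pipeline can rewind and retry a failed chunk
	//   - hence: read <= 256 KiB into memory and wrap it in bytes.NewReader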
@@ -994,8 +1005,8 @@ func (a *azureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *minio.PutObjReader, opts minio.ObjectOptions) (info minio.PartInfo, err error) {
 		return info, err
 	}

-	blob := a.client.GetContainerReference(bucket).GetBlobReference(metadataObject)
-	err = blob.CreateBlockBlobFromReader(bytes.NewBuffer(jsonData), nil)
+	blobURL := a.client.NewContainerURL(bucket).NewBlockBlobURL(metadataObject)
+	_, err = blobURL.Upload(ctx, bytes.NewReader(jsonData), azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{})
 	if err != nil {
 		return info, azureToObjectError(err, bucket, metadataObject)
 	}
@@ -1007,7 +1018,7 @@ func (a *azureObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *minio.PutObjReader, opts minio.ObjectOptions) (info minio.PartInfo, err error) {
 	return info, nil
 }

-// ListObjectParts - Use Azure equivalent GetBlockList.
+// ListObjectParts - Use Azure equivalent `ContainerURL.ListBlobsHierarchySegment`.
 func (a *azureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (result minio.ListPartsInfo, err error) {
 	if err = a.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
 		return result, err
@@ -1018,25 +1029,26 @@ func (a *azureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (result minio.ListPartsInfo, err error) {
 	result.UploadID = uploadID
 	result.MaxParts = maxParts

+	azureListMarker := ""
+	marker := azblob.Marker{Val: &azureListMarker}
+
 	var parts []minio.PartInfo
-	var marker, delimiter string
+	var delimiter string
 	maxKeys := maxPartsCount
 	if partNumberMarker == 0 {
 		maxKeys = maxParts
 	}
 	prefix := getAzureMetadataPartPrefix(uploadID, object)
-	container := a.client.GetContainerReference(bucket)
-	resp, err := container.ListBlobs(storage.ListBlobsParameters{
+	containerURL := a.client.NewContainerURL(bucket)
+	resp, err := containerURL.ListBlobsHierarchySegment(ctx, marker, delimiter, azblob.ListBlobsSegmentOptions{
 		Prefix:     prefix,
-		Marker:     marker,
-		Delimiter:  delimiter,
-		MaxResults: uint(maxKeys),
+		MaxResults: int32(maxKeys),
 	})
 	if err != nil {
 		return result, azureToObjectError(err, bucket, prefix)
 	}

-	for _, blob := range resp.Blobs {
+	for _, blob := range resp.Segment.BlobItems {
 		if delimiter == "" && !strings.HasPrefix(blob.Name, minio.GatewayMinioSysTmp) {
 			// We filter out non minio.GatewayMinioSysTmp entries in the recursive listing.
 			continue
@@ -1045,7 +1057,7 @@ func (a *azureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (result minio.ListPartsInfo, err error) {
 		if strings.HasSuffix(blob.Name, "azure.json") {
 			continue
 		}
-		if !isAzureMarker(marker) && blob.Name <= marker {
+		if !isAzureMarker(*marker.Val) && blob.Name <= *marker.Val {
 			// If the application used ListObjectsV1 style marker then we
 			// skip all the entries till we reach the marker.
 			continue
@@ -1055,11 +1067,12 @@ func (a *azureObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (result minio.ListPartsInfo, err error) {
 			return result, azureToObjectError(fmt.Errorf("Unexpected error"), bucket, object)
 		}
 		var metadata partMetadataV1
-		var metadataReader io.Reader
-		blob := a.client.GetContainerReference(bucket).GetBlobReference(blob.Name)
-		if metadataReader, err = blob.Get(nil); err != nil {
+		blobURL := containerURL.NewBlobURL(blob.Name)
+		blob, err := blobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false)
+		if err != nil {
 			return result, azureToObjectError(fmt.Errorf("Unexpected error"), bucket, object)
 		}
+		metadataReader := blob.Body(azblob.RetryReaderOptions{MaxRetryRequests: azureDownloadRetryAttempts})
 		if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil {
 			logger.LogIf(ctx, err)
 			return result, azureToObjectError(err, bucket, object)
@@ -1114,9 +1127,9 @@ func (a *azureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) (err error) {
 			break
 		}
 		for _, part := range lpi.Parts {
-			pblob := a.client.GetContainerReference(bucket).GetBlobReference(
+			pblob := a.client.NewContainerURL(bucket).NewBlobURL(
 				getAzureMetadataPartName(object, uploadID, part.PartNumber))
-			pblob.Delete(nil)
+			pblob.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
 		}
 		partNumberMarker = lpi.NextPartNumberMarker
 		if !lpi.IsTruncated {
@@ -1124,12 +1137,13 @@ func (a *azureObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) (err error) {
 		}
 	}

-	blob := a.client.GetContainerReference(bucket).GetBlobReference(
+	blobURL := a.client.NewContainerURL(bucket).NewBlobURL(
 		getAzureMetadataObjectName(object, uploadID))
-	return blob.Delete(nil)
+	_, err = blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
+	return err
 }

-// CompleteMultipartUpload - Use Azure equivalent PutBlockList.
+// CompleteMultipartUpload - Use Azure equivalent `BlobURL.CommitBlockList`.
 func (a *azureObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
 	metadataObject := getAzureMetadataObjectName(object, uploadID)
 	if err = a.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
@@ -1140,30 +1154,32 @@ func (a *azureObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
 		return objInfo, err
 	}

-	var metadataReader io.Reader
-	blob := a.client.GetContainerReference(bucket).GetBlobReference(metadataObject)
-	if metadataReader, err = blob.Get(nil); err != nil {
+	blobURL := a.client.NewContainerURL(bucket).NewBlobURL(metadataObject)
+	blob, err := blobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false)
+	if err != nil {
 		return objInfo, azureToObjectError(err, bucket, metadataObject)
 	}

 	var metadata azureMultipartMetadata
+	metadataReader := blob.Body(azblob.RetryReaderOptions{MaxRetryRequests: azureDownloadRetryAttempts})
 	if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil {
 		logger.LogIf(ctx, err)
 		return objInfo, azureToObjectError(err, bucket, metadataObject)
 	}

-	objBlob := a.client.GetContainerReference(bucket).GetBlobReference(object)
+	objBlob := a.client.NewContainerURL(bucket).NewBlockBlobURL(object)

-	var allBlocks []storage.Block
+	var allBlocks []string
 	for i, part := range uploadedParts {
-		var partMetadataReader io.Reader
 		var partMetadata partMetadataV1
 		partMetadataObject := getAzureMetadataPartName(object, uploadID, part.PartNumber)
-		pblob := a.client.GetContainerReference(bucket).GetBlobReference(partMetadataObject)
-		if partMetadataReader, err = pblob.Get(nil); err != nil {
+		pblobURL := a.client.NewContainerURL(bucket).NewBlobURL(partMetadataObject)
+		pblob, err := pblobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false)
+		if err != nil {
 			return objInfo, azureToObjectError(err, bucket, partMetadataObject)
 		}

+		partMetadataReader := pblob.Body(azblob.RetryReaderOptions{MaxRetryRequests: azureDownloadRetryAttempts})
 		if err = json.NewDecoder(partMetadataReader).Decode(&partMetadata); err != nil {
 			logger.LogIf(ctx, err)
 			return objInfo, azureToObjectError(err, bucket, partMetadataObject)
@@ -1172,9 +1188,7 @@ func (a *azureObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
 		if partMetadata.ETag != part.ETag {
 			return objInfo, minio.InvalidPart{}
 		}
-		for _, blockID := range partMetadata.BlockIDs {
-			allBlocks = append(allBlocks, storage.Block{ID: blockID, Status: storage.BlockStatusUncommitted})
-		}
+		allBlocks = append(allBlocks, partMetadata.BlockIDs...)
 		if i < (len(uploadedParts)-1) && partMetadata.Size < azureS3MinPartSize {
 			return objInfo, minio.PartTooSmall{
 				PartNumber: uploadedParts[i].PartNumber,
@@ -1184,20 +1198,13 @@ func (a *azureObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
 		}
 	}

-	err = objBlob.PutBlockList(allBlocks, nil)
-	if err != nil {
-		return objInfo, azureToObjectError(err, bucket, object)
-	}
-	objBlob.Metadata, objBlob.Properties, err = s3MetaToAzureProperties(ctx, metadata.Metadata)
+	objMetadata, objProperties, err := s3MetaToAzureProperties(ctx, metadata.Metadata)
 	if err != nil {
 		return objInfo, azureToObjectError(err, bucket, object)
 	}
-	objBlob.Metadata["md5sum"] = cmd.ComputeCompleteMultipartMD5(uploadedParts)
-	err = objBlob.SetProperties(nil)
-	if err != nil {
-		return objInfo, azureToObjectError(err, bucket, object)
-	}
-	err = objBlob.SetMetadata(nil)
+	objMetadata["md5sum"] = cmd.ComputeCompleteMultipartMD5(uploadedParts)
+
+	_, err = objBlob.CommitBlockList(ctx, allBlocks, objProperties, objMetadata, azblob.BlobAccessConditions{})
 	if err != nil {
 		return objInfo, azureToObjectError(err, bucket, object)
 	}
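CommitBlockList replaces the old three-step PutBlockList + SetProperties + SetMetadata sequence: the staged blocks, HTTP headers, and metadata are committed in one request, so a failure mid-way can no longer leave a committed blob with half-applied metadata. Blocks that are staged but never committed are eventually garbage-collected by Azure, so aborted uploads do not pin storage forever.

	// CommitBlockList(ctx, base64BlockIDs []string, headers, metadata, accessConditions)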
@@ -1208,9 +1215,9 @@ func (a *azureObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
 			break
 		}
 		for _, part := range lpi.Parts {
-			pblob := a.client.GetContainerReference(bucket).GetBlobReference(
+			pblob := a.client.NewContainerURL(bucket).NewBlobURL(
 				getAzureMetadataPartName(object, uploadID, part.PartNumber))
-			pblob.Delete(nil)
+			pblob.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
 		}
 		partNumberMarker = lpi.NextPartNumberMarker
 		if !lpi.IsTruncated {
@@ -1218,8 +1225,7 @@ func (a *azureObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
 		}
 	}

-	blob = a.client.GetContainerReference(bucket).GetBlobReference(metadataObject)
-	derr := blob.Delete(nil)
+	_, derr := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
 	logger.GetReqInfo(ctx).AppendTags("uploadID", uploadID)
 	logger.LogIf(ctx, derr)
@@ -1227,9 +1233,9 @@ func (a *azureObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
 }

 // SetBucketPolicy - Azure supports three types of container policies:
-// storage.ContainerAccessTypeContainer - readonly in minio terminology
-// storage.ContainerAccessTypeBlob - readonly without listing in minio terminology
-// storage.ContainerAccessTypePrivate - none in minio terminology
+// azblob.PublicAccessContainer - readonly in minio terminology
+// azblob.PublicAccessBlob - readonly without listing in minio terminology
+// azblob.PublicAccessNone - none in minio terminology
 // As the common denominator for minio and azure is readonly and none, we support
 // these two policies at the bucket level.
 func (a *azureObjects) SetBucketPolicy(ctx context.Context, bucket string, bucketPolicy *policy.Policy) error {
@@ -1257,26 +1263,25 @@ func (a *azureObjects) SetBucketPolicy(ctx context.Context, bucket string, bucketPolicy *policy.Policy) error {
 	if policies[0].Policy != miniogopolicy.BucketPolicyReadOnly {
 		return minio.NotImplemented{}
 	}
-	perm := storage.ContainerPermissions{
-		AccessType:     storage.ContainerAccessTypeContainer,
-		AccessPolicies: nil,
-	}
-	container := a.client.GetContainerReference(bucket)
-	err = container.SetPermissions(perm, nil)
+	perm := azblob.PublicAccessContainer
+	container := a.client.NewContainerURL(bucket)
+	_, err = container.SetAccessPolicy(ctx, perm, nil, azblob.ContainerAccessConditions{})
 	return azureToObjectError(err, bucket)
 }

 // GetBucketPolicy - Get the container ACL and convert it to canonical []bucketAccessPolicy
 func (a *azureObjects) GetBucketPolicy(ctx context.Context, bucket string) (*policy.Policy, error) {
-	container := a.client.GetContainerReference(bucket)
-	perm, err := container.GetPermissions(nil)
+	container := a.client.NewContainerURL(bucket)
+	perm, err := container.GetAccessPolicy(ctx, azblob.LeaseAccessConditions{})
 	if err != nil {
 		return nil, azureToObjectError(err, bucket)
 	}

-	if perm.AccessType == storage.ContainerAccessTypePrivate {
+	permAccessType := perm.BlobPublicAccess()
+
+	if permAccessType == azblob.PublicAccessNone {
 		return nil, minio.BucketPolicyNotFound{Bucket: bucket}
-	} else if perm.AccessType != storage.ContainerAccessTypeContainer {
+	} else if permAccessType != azblob.PublicAccessContainer {
 		return nil, azureToObjectError(minio.NotImplemented{})
 	}
@@ -1303,12 +1308,9 @@ func (a *azureObjects) GetBucketPolicy(ctx context.Context, bucket string) (*policy.Policy, error) {

 // DeleteBucketPolicy - Set the container ACL to "private"
 func (a *azureObjects) DeleteBucketPolicy(ctx context.Context, bucket string) error {
-	perm := storage.ContainerPermissions{
-		AccessType:     storage.ContainerAccessTypePrivate,
-		AccessPolicies: nil,
-	}
-	container := a.client.GetContainerReference(bucket)
-	err := container.SetPermissions(perm, nil)
+	perm := azblob.PublicAccessNone
+	containerURL := a.client.NewContainerURL(bucket)
+	_, err := containerURL.SetAccessPolicy(ctx, perm, nil, azblob.ContainerAccessConditions{})
 	return azureToObjectError(err)
 }