Set default ETag value if vendor returns empty md5 string (#4409)

The ETag is constructed from md5 atttribute of object attributes
returned by the vendor's Composer. The md5 attribute comes back
as nil for large uploads. Instead the CRC32C should be used.

Refer to https://cloud.google.com/storage/docs/hashes-etags

Fixes #4397
master
poornas 8 years ago committed by Harshavardhana
parent e4e0abfc05
commit 9bd0eb1a9e
  1. 51
      cmd/gateway-gcs.go

@ -17,7 +17,6 @@
package cmd package cmd
import ( import (
"bytes"
"context" "context"
"crypto/sha256" "crypto/sha256"
"encoding/base64" "encoding/base64"
@ -356,7 +355,7 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de
Bucket: attrs.Bucket, Bucket: attrs.Bucket,
ModTime: attrs.Updated, ModTime: attrs.Updated,
Size: attrs.Size, Size: attrs.Size,
ETag: hex.EncodeToString(attrs.MD5), ETag: fmt.Sprintf("%d", attrs.CRC32C),
UserDefined: attrs.Metadata, UserDefined: attrs.Metadata,
ContentType: attrs.ContentType, ContentType: attrs.ContentType,
ContentEncoding: attrs.ContentEncoding, ContentEncoding: attrs.ContentEncoding,
@ -401,7 +400,7 @@ func (l *gcsGateway) ListObjectsV2(bucket, prefix, continuationToken string, fet
continue continue
} }
objects = append(objects, fromGCSObjectInfo(attrs)) objects = append(objects, fromGCSAttrsToObjectInfo(attrs))
} }
return ListObjectsV2Info{ return ListObjectsV2Info{
@ -441,18 +440,21 @@ func (l *gcsGateway) GetObject(bucket string, key string, startOffset int64, len
return nil return nil
} }
// fromGCSObjectInfo converts GCS BucketAttrs to gateway ObjectInfo // fromGCSAttrsToObjectInfo converts GCS BucketAttrs to gateway ObjectInfo
func fromGCSObjectInfo(attrs *storage.ObjectAttrs) ObjectInfo { func fromGCSAttrsToObjectInfo(attrs *storage.ObjectAttrs) ObjectInfo {
// All google cloud storage objects have a CRC32c hash, whereas composite objects may not have a MD5 hash
// Refer https://cloud.google.com/storage/docs/hashes-etags. Use CRC32C for ETag
return ObjectInfo{ return ObjectInfo{
Name: attrs.Name, Name: attrs.Name,
Bucket: attrs.Bucket, Bucket: attrs.Bucket,
ModTime: attrs.Updated, ModTime: attrs.Updated,
Size: attrs.Size, Size: attrs.Size,
ETag: hex.EncodeToString(attrs.MD5), ETag: fmt.Sprintf("%d", attrs.CRC32C),
UserDefined: attrs.Metadata, UserDefined: attrs.Metadata,
ContentType: attrs.ContentType, ContentType: attrs.ContentType,
ContentEncoding: attrs.ContentEncoding, ContentEncoding: attrs.ContentEncoding,
} }
} }
// GetObjectInfo - reads object info and replies back ObjectInfo // GetObjectInfo - reads object info and replies back ObjectInfo
@ -464,11 +466,14 @@ func (l *gcsGateway) GetObjectInfo(bucket string, object string) (ObjectInfo, er
} }
attrs, err := l.client.Bucket(bucket).Object(object).Attrs(l.ctx) attrs, err := l.client.Bucket(bucket).Object(object).Attrs(l.ctx)
if err != nil { if err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, object) return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, object)
} }
objInfo := fromGCSAttrsToObjectInfo(attrs)
objInfo.ETag = fmt.Sprintf("%d", attrs.CRC32C)
return fromGCSObjectInfo(attrs), nil return objInfo, nil
} }
// PutObject - Create a new object with the incoming data, // PutObject - Create a new object with the incoming data,
@ -499,6 +504,13 @@ func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Re
w.ContentType = metadata["content-type"] w.ContentType = metadata["content-type"]
w.ContentEncoding = metadata["content-encoding"] w.ContentEncoding = metadata["content-encoding"]
if md5sum == "" {
} else if md5, err := hex.DecodeString(md5sum); err != nil {
return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
} else {
w.MD5 = md5
}
w.Metadata = metadata w.Metadata = metadata
_, err := io.Copy(w, teeReader) _, err := io.Copy(w, teeReader)
@ -522,14 +534,7 @@ func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Re
return ObjectInfo{}, traceError(SHA256Mismatch{}) return ObjectInfo{}, traceError(SHA256Mismatch{})
} }
if md5sum == "" { return fromGCSAttrsToObjectInfo(attrs), nil
} else if b, err := hex.DecodeString(md5sum); err != nil {
} else if bytes.Compare(b, attrs.MD5) != 0 {
object.Delete(l.ctx)
return ObjectInfo{}, traceError(SignatureDoesNotMatch{})
}
return fromGCSObjectInfo(attrs), nil
} }
// CopyObject - Copies a blob from source container to destination container. // CopyObject - Copies a blob from source container to destination container.
@ -542,7 +547,7 @@ func (l *gcsGateway) CopyObject(srcBucket string, srcObject string, destBucket s
return ObjectInfo{}, gcsToObjectError(traceError(err), destBucket, destObject) return ObjectInfo{}, gcsToObjectError(traceError(err), destBucket, destObject)
} }
return fromGCSObjectInfo(attrs), nil return fromGCSAttrsToObjectInfo(attrs), nil
} }
// DeleteObject - Deletes a blob in bucket // DeleteObject - Deletes a blob in bucket
@ -747,7 +752,7 @@ func (l *gcsGateway) ListObjectParts(bucket string, key string, uploadID string,
parts = append(parts, PartInfo{ parts = append(parts, PartInfo{
PartNumber: partID, PartNumber: partID,
LastModified: attrs.Updated, LastModified: attrs.Updated,
ETag: hex.EncodeToString(attrs.MD5), ETag: fmt.Sprintf("%d", attrs.CRC32C),
Size: attrs.Size, Size: attrs.Size,
}) })
} }
@ -805,11 +810,12 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID
parts := make([]*storage.ObjectHandle, len(uploadedParts)) parts := make([]*storage.ObjectHandle, len(uploadedParts))
for i, uploadedPart := range uploadedParts { for i, uploadedPart := range uploadedParts {
object := l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, uploadedPart.PartNumber)) object := l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, uploadedPart.PartNumber))
attrs, partErr := object.Attrs(l.ctx)
if etag, partErr := hex.DecodeString(uploadedPart.ETag); partErr != nil { if partErr != nil {
} else if attrs, partErr := object.Attrs(l.ctx); partErr != nil {
return ObjectInfo{}, gcsToObjectError(traceError(partErr), bucket, key) return ObjectInfo{}, gcsToObjectError(traceError(partErr), bucket, key)
} else if bytes.Compare(attrs.MD5, etag) != 0 { }
crc32cStr := fmt.Sprintf("%d", attrs.CRC32C)
if crc32cStr != uploadedPart.ETag {
return ObjectInfo{}, gcsToObjectError(traceError(InvalidPart{}), bucket, key) return ObjectInfo{}, gcsToObjectError(traceError(InvalidPart{}), bucket, key)
} }
@ -832,7 +838,6 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID
composer.Metadata = partZeroAttrs.Metadata composer.Metadata = partZeroAttrs.Metadata
attrs, err := composer.Run(l.ctx) attrs, err := composer.Run(l.ctx)
// cleanup, delete all parts // cleanup, delete all parts
for _, uploadedPart := range uploadedParts { for _, uploadedPart := range uploadedParts {
l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, uploadedPart.PartNumber)).Delete(l.ctx) l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, uploadedPart.PartNumber)).Delete(l.ctx)
@ -840,7 +845,7 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID
partZero.Delete(l.ctx) partZero.Delete(l.ctx)
return fromGCSObjectInfo(attrs), gcsToObjectError(traceError(err), bucket, key) return fromGCSAttrsToObjectInfo(attrs), gcsToObjectError(traceError(err), bucket, key)
} }
// SetBucketPolicies - Set policy on bucket // SetBucketPolicies - Set policy on bucket

Loading…
Cancel
Save