gcs: Save partNumber as part of backend format. (#4666)

Fixes #4637
master
Harshavardhana 7 years ago committed by Krishna Srinivas
parent ce7c9c651d
commit bc73a1a1cb
  1. 22
      cmd/gateway-gcs.go
  2. 12
      cmd/gateway-gcs_test.go

@ -80,8 +80,8 @@ func gcsMultipartMetaName(uploadID string) string {
} }
// Returns name of the part object. // Returns name of the part object.
func gcsMultipartDataName(uploadID, etag string) string { func gcsMultipartDataName(uploadID string, partNumber int, etag string) string {
return fmt.Sprintf("%s/%s/%s", gcsMinioMultipartPathV1, uploadID, etag) return fmt.Sprintf("%s/%s/%05d.%s", gcsMinioMultipartPathV1, uploadID, partNumber, etag)
} }
// Convert Minio errors to minio object layer errors. // Convert Minio errors to minio object layer errors.
@ -800,7 +800,7 @@ func (l *gcsGateway) checkUploadIDExists(bucket string, key string, uploadID str
} }
// PutObjectPart puts a part of object in bucket // PutObjectPart puts a part of object in bucket
func (l *gcsGateway) PutObjectPart(bucket string, key string, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (PartInfo, error) { func (l *gcsGateway) PutObjectPart(bucket string, key string, uploadID string, partNumber int, size int64, data io.Reader, md5Hex string, sha256sum string) (PartInfo, error) {
if err := l.checkUploadIDExists(bucket, key, uploadID); err != nil { if err := l.checkUploadIDExists(bucket, key, uploadID); err != nil {
return PartInfo{}, err return PartInfo{}, err
} }
@ -822,7 +822,7 @@ func (l *gcsGateway) PutObjectPart(bucket string, key string, uploadID string, p
reader = io.TeeReader(data, sha256Writer) reader = io.TeeReader(data, sha256Writer)
} }
object := l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, etag)) object := l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, partNumber, etag))
w := object.NewWriter(l.ctx) w := object.NewWriter(l.ctx)
// Disable "chunked" uploading in GCS client. If enabled, it can cause a corner case // Disable "chunked" uploading in GCS client. If enabled, it can cause a corner case
// where it tries to upload 0 bytes in the last chunk and get error from server. // where it tries to upload 0 bytes in the last chunk and get error from server.
@ -855,7 +855,7 @@ func (l *gcsGateway) PutObjectPart(bucket string, key string, uploadID string, p
} }
return PartInfo{ return PartInfo{
PartNumber: partID, PartNumber: partNumber,
ETag: etag, ETag: etag,
LastModified: UTCNow(), LastModified: UTCNow(),
Size: size, Size: size,
@ -942,7 +942,13 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID
var parts []*storage.ObjectHandle var parts []*storage.ObjectHandle
for _, uploadedPart := range uploadedParts { for _, uploadedPart := range uploadedParts {
parts = append(parts, l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, uploadedPart.ETag))) parts = append(parts, l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID,
uploadedPart.PartNumber, uploadedPart.ETag)))
}
// Returns name of the composed object.
gcsMultipartComposeName := func(uploadID string, composeNumber int) string {
return fmt.Sprintf("%s/tmp/%s/composed-object-%05d", gcsMinioPath, uploadID, composeNumber)
} }
composeCount := int(math.Ceil(float64(len(parts)) / float64(maxComponents))) composeCount := int(math.Ceil(float64(len(parts)) / float64(maxComponents)))
@ -951,9 +957,7 @@ func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID
composeParts := make([]*storage.ObjectHandle, composeCount) composeParts := make([]*storage.ObjectHandle, composeCount)
for i := 0; i < composeCount; i++ { for i := 0; i < composeCount; i++ {
// Create 'composed-object-N' using next 32 parts. // Create 'composed-object-N' using next 32 parts.
composeName := fmt.Sprintf("composed-object-%d", i) composeParts[i] = l.client.Bucket(bucket).Object(gcsMultipartComposeName(uploadID, i))
composeParts[i] = l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, composeName))
start := i * maxComponents start := i * maxComponents
end := start + maxComponents end := start + maxComponents
if end > len(parts) { if end > len(parts) {

@ -17,6 +17,7 @@
package cmd package cmd
import ( import (
"fmt"
"reflect" "reflect"
"testing" "testing"
@ -144,10 +145,13 @@ func TestGCSMultipartMetaName(t *testing.T) {
// Test for gcsMultipartDataName. // Test for gcsMultipartDataName.
func TestGCSMultipartDataName(t *testing.T) { func TestGCSMultipartDataName(t *testing.T) {
uploadID := "a" var (
etag := "b" uploadID = "a"
expected := pathJoin(gcsMinioMultipartPathV1, uploadID, etag) etag = "b"
got := gcsMultipartDataName(uploadID, etag) partNumber = 1
)
expected := pathJoin(gcsMinioMultipartPathV1, uploadID, fmt.Sprintf("%05d.%s", partNumber, etag))
got := gcsMultipartDataName(uploadID, partNumber, etag)
if expected != got { if expected != got {
t.Errorf("expected: %s, got: %s", expected, got) t.Errorf("expected: %s, got: %s", expected, got)
} }

Loading…
Cancel
Save