diff --git a/pkg/donut/donut-v2.go b/pkg/donut/donut-v2.go
index 54aebd25c..41fbb2fa3 100644
--- a/pkg/donut/donut-v2.go
+++ b/pkg/donut/donut-v2.go
@@ -361,6 +361,7 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s
 		donut.storedBuckets.Set(bucket, storedBucket)
 		return objMetadata, nil
 	}
 
+	// calculate md5
 	hash := md5.New()
 	sha256hash := sha256.New()
diff --git a/pkg/donut/multipart.go b/pkg/donut/multipart.go
index 1c779ac13..e500443bf 100644
--- a/pkg/donut/multipart.go
+++ b/pkg/donut/multipart.go
@@ -272,18 +272,63 @@ func (donut API) cleanupMultipartSession(bucket, key, uploadID string) {
 	donut.storedBuckets.Set(bucket, storedBucket)
 }
 
+func (donut API) mergeMultipart(parts *CompleteMultipartUpload, uploadID string, fullObjectWriter *io.PipeWriter) {
+	for _, part := range parts.Part {
+		recvMD5 := part.ETag
+		object, ok := donut.multiPartObjects[uploadID].Get(part.PartNumber)
+		if ok == false {
+			fullObjectWriter.CloseWithError(probe.WrapError(probe.NewError(InvalidPart{})))
+			return
+		}
+		calcMD5Bytes := md5.Sum(object)
+		// complete multi part request header md5sum per part is hex encoded
+		recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
+		if err != nil {
+			fullObjectWriter.CloseWithError(probe.WrapError(probe.NewError(InvalidDigest{Md5: recvMD5})))
+			return
+		}
+		if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
+			fullObjectWriter.CloseWithError(probe.WrapError(probe.NewError(BadDigest{})))
+			return
+		}
+
+		if _, err := io.Copy(fullObjectWriter, bytes.NewReader(object)); err != nil {
+			fullObjectWriter.CloseWithError(probe.WrapError(probe.NewError(err)))
+			return
+		}
+		object = nil
+	}
+	fullObjectWriter.Close()
+	return
+}
+
 // CompleteMultipartUpload - complete a multipart upload and persist the data
 func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) {
 	donut.lock.Lock()
+	defer donut.lock.Unlock()
+	size := int64(donut.multiPartObjects[uploadID].Stats().Bytes)
+	fullObjectReader, err := donut.completeMultipartUploadV2(bucket, key, uploadID, data, signature)
+	if err != nil {
+		return ObjectMetadata{}, err.Trace()
+	}
+	objectMetadata, err := donut.createObject(bucket, key, "", "", size, fullObjectReader, nil)
+	if err != nil {
+		// No need to call internal cleanup functions here, caller should call AbortMultipartUpload()
+		// which would in-turn cleanup properly in accordance with S3 Spec
+		return ObjectMetadata{}, err.Trace()
+	}
+	donut.cleanupMultipartSession(bucket, key, uploadID)
+	return objectMetadata, nil
+}
+
+func (donut API) completeMultipartUploadV2(bucket, key, uploadID string, data io.Reader, signature *Signature) (io.Reader, *probe.Error) {
 	if !IsValidBucket(bucket) {
-		donut.lock.Unlock()
-		return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
+		return nil, probe.NewError(BucketNameInvalid{Bucket: bucket})
 	}
 	if !IsValidObjectName(key) {
-		donut.lock.Unlock()
-		return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Object: key})
+		return nil, probe.NewError(ObjectNameInvalid{Object: key})
 	}
+
 	// TODO: multipart support for donut is broken, since we haven't finalized the format in which
 	// it can be stored, disabling this for now until we get the underlying layout stable.
 	//
@@ -293,88 +338,38 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, data io.R
 	// }
 	if !donut.storedBuckets.Exists(bucket) {
-		donut.lock.Unlock()
-		return ObjectMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
+		return nil, probe.NewError(BucketNotFound{Bucket: bucket})
 	}
 	storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
 	// Verify upload id
 	if storedBucket.multiPartSession[key].UploadID != uploadID {
-		donut.lock.Unlock()
-		return ObjectMetadata{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
+		return nil, probe.NewError(InvalidUploadID{UploadID: uploadID})
 	}
 	partBytes, err := ioutil.ReadAll(data)
 	if err != nil {
-		donut.lock.Unlock()
-		return ObjectMetadata{}, probe.NewError(err)
+		return nil, probe.NewError(err)
 	}
 	if signature != nil {
 		ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sha256.Sum256(partBytes)[:]))
 		if err != nil {
-			donut.lock.Unlock()
-			return ObjectMetadata{}, err.Trace()
+			return nil, err.Trace()
 		}
 		if !ok {
-			donut.lock.Unlock()
-			return ObjectMetadata{}, probe.NewError(SignatureDoesNotMatch{})
+			return nil, probe.NewError(SignatureDoesNotMatch{})
 		}
 	}
 	parts := &CompleteMultipartUpload{}
 	if err := xml.Unmarshal(partBytes, parts); err != nil {
-		donut.lock.Unlock()
-		return ObjectMetadata{}, probe.NewError(MalformedXML{})
+		return nil, probe.NewError(MalformedXML{})
 	}
 	if !sort.IsSorted(completedParts(parts.Part)) {
-		donut.lock.Unlock()
-		return ObjectMetadata{}, probe.NewError(InvalidPartOrder{})
-	}
-
-	var size int64
-	var fullObject bytes.Buffer
-	for i := 0; i < len(parts.Part); i++ {
-		recvMD5 := parts.Part[i].ETag
-		object, ok := donut.multiPartObjects[uploadID].Get(parts.Part[i].PartNumber)
-		if ok == false {
-			donut.lock.Unlock()
-			return ObjectMetadata{}, probe.NewError(InvalidPart{})
-		}
-		size += int64(len(object))
-		calcMD5Bytes := md5.Sum(object)
-		// complete multi part request header md5sum per part is hex encoded
-		recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
-		if err != nil {
-			donut.lock.Unlock()
-			return ObjectMetadata{}, probe.NewError(InvalidDigest{Md5: recvMD5})
-		}
-		if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
-			donut.lock.Unlock()
-			return ObjectMetadata{}, probe.NewError(BadDigest{})
-		}
-		if _, err := io.Copy(&fullObject, bytes.NewBuffer(object)); err != nil {
-			donut.lock.Unlock()
-			return ObjectMetadata{}, probe.NewError(err)
-		}
-		object = nil
-		go debug.FreeOSMemory()
+		return nil, probe.NewError(InvalidPartOrder{})
 	}
-	md5sumSlice := md5.Sum(fullObject.Bytes())
-	// this is needed for final verification inside CreateObject, do not convert this to hex
-	md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
-	donut.lock.Unlock()
-	{
-		objectMetadata, err := donut.CreateObject(bucket, key, md5sum, size, &fullObject, nil, nil)
-		if err != nil {
-			// No need to call internal cleanup functions here, caller should call AbortMultipartUpload()
-			// which would in-turn cleanup properly in accordance with S3 Spec
-			return ObjectMetadata{}, err.Trace()
-		}
-		fullObject.Reset()
+	fullObjectReader, fullObjectWriter := io.Pipe()
+	go donut.mergeMultipart(parts, uploadID, fullObjectWriter)
 
-		donut.lock.Lock()
-		donut.cleanupMultipartSession(bucket, key, uploadID)
-		donut.lock.Unlock()
-		return objectMetadata, nil
-	}
+	return fullObjectReader, nil
 }
 
 // byKey is a sortable interface for UploadMetadata slice
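Note on the change: the heart of this patch is replacing the in-memory `bytes.Buffer` concatenation (and the `go debug.FreeOSMemory()` workaround) with an `io.Pipe`, so per-part MD5 verification in `mergeMultipart` overlaps with `createObject` consuming the stream instead of double-buffering the whole object. Below is a minimal standalone sketch of that producer/consumer pattern under the same design; the `part` type, `mergeParts` helper, and sample data are illustrative, not code from the donut package:

```go
package main

import (
	"bytes"
	"crypto/md5"
	"fmt"
	"io"
)

// part pairs a chunk of data with the MD5 the client claims for it,
// mirroring how each uploaded part's ETag is verified on completion.
type part struct {
	data        []byte
	expectedMD5 [md5.Size]byte
}

// mergeParts is the producer: it checks each part's digest and copies it
// into w. A failure is surfaced to the reader via CloseWithError, the
// same mechanism mergeMultipart uses for InvalidPart/BadDigest.
func mergeParts(parts []part, w *io.PipeWriter) {
	for i, p := range parts {
		if md5.Sum(p.data) != p.expectedMD5 {
			w.CloseWithError(fmt.Errorf("part %d: bad digest", i+1))
			return
		}
		if _, err := io.Copy(w, bytes.NewReader(p.data)); err != nil {
			w.CloseWithError(err)
			return
		}
	}
	w.Close() // clean EOF for the reader: all parts written
}

func main() {
	parts := []part{
		{data: []byte("hello "), expectedMD5: md5.Sum([]byte("hello "))},
		{data: []byte("world"), expectedMD5: md5.Sum([]byte("world"))},
	}

	r, w := io.Pipe()
	go mergeParts(parts, w) // producer runs concurrently, as in the patch

	// Consumer side: stands in for createObject reading the full object.
	full, err := io.ReadAll(r)
	if err != nil {
		fmt.Println("merge failed:", err)
		return
	}
	fmt.Printf("assembled %d bytes: %q\n", len(full), full)
}
```

`CloseWithError` is what lets the producer report a mid-stream validation failure: the consumer's next `Read` returns that error instead of `io.EOF`, which is how `createObject` sees an `InvalidPart` or `BadDigest` raised by `mergeMultipart`.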