From fbab7128d5f5773c6e03d5c93ca99a8eaba172a3 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 14 Feb 2016 00:39:13 -0800 Subject: [PATCH] fs/multipart: Handle un-ordered creation of multiparts. Fixes #1125 --- pkg/fs/fs-multipart.go | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/pkg/fs/fs-multipart.go b/pkg/fs/fs-multipart.go index 1e6374a2d..672d9e57a 100644 --- a/pkg/fs/fs-multipart.go +++ b/pkg/fs/fs-multipart.go @@ -122,6 +122,9 @@ func doPartsMatch(parts []CompletePart, savedParts []PartMetadata) bool { if parts == nil || savedParts == nil { return false } + if len(parts) != len(savedParts) { + return false + } // Range of incoming parts and compare them with saved parts. for i, part := range parts { if strings.Trim(part.ETag, "\"") != savedParts[i].ETag { @@ -245,6 +248,7 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E mpartSession.ObjectName = object mpartSession.UploadID = uploadID mpartSession.Initiated = time.Now().UTC() + // Multipart has maximum of 10000 parts. var parts []PartMetadata mpartSession.Parts = parts @@ -257,6 +261,26 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E return uploadID, nil } +// Remove all duplicated parts based on the latest time of their upload. +func removeDuplicateParts(parts []PartMetadata) []PartMetadata { + length := len(parts) - 1 + for i := 0; i < length; i++ { + for j := i + 1; j <= length; j++ { + if parts[i].PartNumber == parts[j].PartNumber { + if parts[i].LastModified.Sub(parts[j].LastModified) > 0 { + parts[i] = parts[length] + } else { + parts[j] = parts[length] + } + parts = parts[0:length] + length-- + j-- + } + } + } + return parts +} + // partNumber is a sortable interface for Part slice. type partNumber []PartMetadata @@ -374,14 +398,14 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s return "", probe.NewError(InvalidUploadID{UploadID: uploadID}) } - // Append any pre-existing partNumber with new metadata, otherwise - // append to the list. - if len(deserializedMultipartSession.Parts) < partID { - deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, partMetadata) - } else { - deserializedMultipartSession.Parts[partID-1] = partMetadata - } + // Add all incoming parts. + deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, partMetadata) + + // Remove duplicate parts based on the most recent uploaded. + deserializedMultipartSession.Parts = removeDuplicateParts(deserializedMultipartSession.Parts) + // Save total parts uploaded. deserializedMultipartSession.TotalParts = len(deserializedMultipartSession.Parts) + // Sort by part number before saving. sort.Sort(partNumber(deserializedMultipartSession.Parts))