Merge pull request #1126 from harshavardhana/fix-multipart

fs/multipart: Handle un-ordered creation of multiparts.
master
Harshavardhana 9 years ago
commit e73301944a
  1. 38
      pkg/fs/fs-multipart.go

@ -122,6 +122,9 @@ func doPartsMatch(parts []CompletePart, savedParts []PartMetadata) bool {
if parts == nil || savedParts == nil { if parts == nil || savedParts == nil {
return false return false
} }
if len(parts) != len(savedParts) {
return false
}
// Range of incoming parts and compare them with saved parts. // Range of incoming parts and compare them with saved parts.
for i, part := range parts { for i, part := range parts {
if strings.Trim(part.ETag, "\"") != savedParts[i].ETag { if strings.Trim(part.ETag, "\"") != savedParts[i].ETag {
@ -245,6 +248,7 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
mpartSession.ObjectName = object mpartSession.ObjectName = object
mpartSession.UploadID = uploadID mpartSession.UploadID = uploadID
mpartSession.Initiated = time.Now().UTC() mpartSession.Initiated = time.Now().UTC()
// Multipart has maximum of 10000 parts.
var parts []PartMetadata var parts []PartMetadata
mpartSession.Parts = parts mpartSession.Parts = parts
@ -257,6 +261,26 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
return uploadID, nil return uploadID, nil
} }
// Remove all duplicated parts based on the latest time of their upload.
func removeDuplicateParts(parts []PartMetadata) []PartMetadata {
length := len(parts) - 1
for i := 0; i < length; i++ {
for j := i + 1; j <= length; j++ {
if parts[i].PartNumber == parts[j].PartNumber {
if parts[i].LastModified.Sub(parts[j].LastModified) > 0 {
parts[i] = parts[length]
} else {
parts[j] = parts[length]
}
parts = parts[0:length]
length--
j--
}
}
}
return parts
}
// partNumber is a sortable interface for Part slice. // partNumber is a sortable interface for Part slice.
type partNumber []PartMetadata type partNumber []PartMetadata
@ -374,14 +398,14 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
return "", probe.NewError(InvalidUploadID{UploadID: uploadID}) return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
} }
// Append any pre-existing partNumber with new metadata, otherwise // Add all incoming parts.
// append to the list. deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, partMetadata)
if len(deserializedMultipartSession.Parts) < partID {
deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, partMetadata) // Remove duplicate parts based on the most recent uploaded.
} else { deserializedMultipartSession.Parts = removeDuplicateParts(deserializedMultipartSession.Parts)
deserializedMultipartSession.Parts[partID-1] = partMetadata // Save total parts uploaded.
}
deserializedMultipartSession.TotalParts = len(deserializedMultipartSession.Parts) deserializedMultipartSession.TotalParts = len(deserializedMultipartSession.Parts)
// Sort by part number before saving. // Sort by part number before saving.
sort.Sort(partNumber(deserializedMultipartSession.Parts)) sort.Sort(partNumber(deserializedMultipartSession.Parts))

Loading…
Cancel
Save