From 69bd001c8bfa69660037d4725e77e797f6987c33 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 5 Feb 2016 16:47:31 -0800 Subject: [PATCH] multipart: Multipart resume simplify further. --- pkg/fs/definitions.go | 2 +- pkg/fs/fs-multipart.go | 85 ++++++++++++++++-------------------------- pkg/fs/fs.go | 2 +- 3 files changed, 34 insertions(+), 55 deletions(-) diff --git a/pkg/fs/definitions.go b/pkg/fs/definitions.go index a6d8d036b..a69f6a7f4 100644 --- a/pkg/fs/definitions.go +++ b/pkg/fs/definitions.go @@ -91,7 +91,7 @@ type ObjectResourcesMetadata struct { MaxParts int IsTruncated bool - Part []*PartMetadata + Part []PartMetadata EncodingType string } diff --git a/pkg/fs/fs-multipart.go b/pkg/fs/fs-multipart.go index 443a7eea8..28421f3e7 100644 --- a/pkg/fs/fs-multipart.go +++ b/pkg/fs/fs-multipart.go @@ -20,7 +20,6 @@ import ( "crypto/md5" "encoding/base64" "encoding/hex" - "encoding/json" "encoding/xml" "errors" "fmt" @@ -120,7 +119,7 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa // verify if parts sent over the network do really match with what we // have for the session. -func doPartsMatch(parts []CompletePart, savedParts []*PartMetadata) bool { +func doPartsMatch(parts []CompletePart, savedParts []PartMetadata) bool { if parts == nil || savedParts == nil { return false } @@ -155,14 +154,11 @@ func MultiCloser(closers ...io.Closer) io.Closer { } // removeParts - remove all parts. -func removeParts(partPathPrefix string, parts []*PartMetadata) *probe.Error { +func removeParts(partPathPrefix string, parts []PartMetadata) *probe.Error { for _, part := range parts { - if e := os.Remove(partPathPrefix + fmt.Sprintf("$%d-$multiparts", part.PartNumber)); e != nil { - return probe.NewError(e) - } - } - if e := os.Remove(partPathPrefix + "$multiparts"); e != nil { - return probe.NewError(e) + // We are on purpose ignoring the return values here, since + // another thread would have purged these entries. + os.Remove(partPathPrefix + fmt.Sprintf("$%d-$multiparts", part.PartNumber)) } return nil } @@ -245,31 +241,21 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E mpartSession.TotalParts = 0 mpartSession.UploadID = uploadID mpartSession.Initiated = time.Now().UTC() - var parts []*PartMetadata + var parts []PartMetadata mpartSession.Parts = parts fs.rwLock.Lock() fs.multiparts.ActiveSession[object] = mpartSession fs.rwLock.Unlock() - mpartSessionBytes, e := json.Marshal(mpartSession) - if e != nil { - return "", probe.NewError(e) - } - - partPathPrefix := objectPath + uploadID - if e = ioutil.WriteFile(partPathPrefix+"$multiparts", mpartSessionBytes, 0600); e != nil { - return "", probe.NewError(e) - } - if err := saveMultipartsSession(*fs.multiparts); err != nil { - return "", err.Trace(partPathPrefix) + return "", err.Trace(objectPath) } return uploadID, nil } // partNumber is a sortable interface for Part slice. -type partNumber []*PartMetadata +type partNumber []PartMetadata func (a partNumber) Len() int { return len(a) } func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] } @@ -377,32 +363,32 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s partMetadata.Size = fi.Size() partMetadata.LastModified = fi.ModTime() - multipartSessionBytes, e := ioutil.ReadFile(partPathPrefix + "$multiparts") - if e != nil { - return "", probe.NewError(e) + fs.rwLock.RLock() + deserializedMultipartSession, ok := fs.multiparts.ActiveSession[object] + fs.rwLock.RUnlock() + if !ok { + return "", probe.NewError(InvalidUploadID{UploadID: uploadID}) } - var deserializedMultipartSession MultipartSession - if e = json.Unmarshal(multipartSessionBytes, &deserializedMultipartSession); e != nil { - return "", probe.NewError(e) + // Append any pre-existing partNumber with new metadata, otherwise + // append to the list. + if len(deserializedMultipartSession.Parts) < partID { + deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, partMetadata) + } else { + deserializedMultipartSession.Parts[partID-1] = partMetadata } - deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, &partMetadata) - deserializedMultipartSession.TotalParts++ + deserializedMultipartSession.TotalParts = len(deserializedMultipartSession.Parts) fs.rwLock.Lock() - fs.multiparts.ActiveSession[object] = &deserializedMultipartSession + fs.multiparts.ActiveSession[object] = deserializedMultipartSession fs.rwLock.Unlock() // Sort by part number before saving. sort.Sort(partNumber(deserializedMultipartSession.Parts)) - - multipartSessionBytes, e = json.Marshal(deserializedMultipartSession) - if e != nil { - return "", probe.NewError(e) - } - if e = ioutil.WriteFile(partPathPrefix+"$multiparts", multipartSessionBytes, 0600); e != nil { - return "", probe.NewError(e) + if err := saveMultipartsSession(*fs.multiparts); err != nil { + return "", err.Trace(partPathPrefix) } + return partMetadata.ETag, nil } @@ -486,10 +472,8 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da file.CloseAndPurge() return ObjectMetadata{}, err.Trace(partPathPrefix) } - if err := removeParts(partPathPrefix, savedParts); err != nil { - file.CloseAndPurge() - return ObjectMetadata{}, err.Trace(partPathPrefix) - } + // Successfully saved, remove all parts. + removeParts(partPathPrefix, savedParts) fs.rwLock.Lock() delete(fs.multiparts.ActiveSession, object) @@ -561,18 +545,13 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso return ObjectResourcesMetadata{}, probe.NewError(e) } - objectPath := filepath.Join(bucketPath, object) - partPathPrefix := objectPath + resources.UploadID - multipartSessionBytes, e := ioutil.ReadFile(partPathPrefix + "$multiparts") - if e != nil { - return ObjectResourcesMetadata{}, probe.NewError(e) - } - var deserializedMultipartSession MultipartSession - if e = json.Unmarshal(multipartSessionBytes, &deserializedMultipartSession); e != nil { - return ObjectResourcesMetadata{}, probe.NewError(e) + fs.rwLock.RLock() + deserializedMultipartSession, ok := fs.multiparts.ActiveSession[object] + fs.rwLock.RUnlock() + if !ok { + return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID}) } - - var parts []*PartMetadata + var parts []PartMetadata for i := startPartNumber; i <= deserializedMultipartSession.TotalParts; i++ { if len(parts) > objectResourcesMetadata.MaxParts { sort.Sort(partNumber(parts)) diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go index 8fb96feea..e2948265e 100644 --- a/pkg/fs/fs.go +++ b/pkg/fs/fs.go @@ -48,7 +48,7 @@ type MultipartSession struct { TotalParts int UploadID string Initiated time.Time - Parts []*PartMetadata + Parts []PartMetadata } // Multiparts collection of many parts