Merge pull request #1100 from harshavardhana/multipart-resume

multipart: Multipart resume simplify further.
Branch: master
Harshavardhana committed 9 years ago · commit 3f5804f75a

Changed files:

  pkg/fs/definitions.go    (2 changed lines)
  pkg/fs/fs-multipart.go   (85 changed lines)
  pkg/fs/fs.go             (2 changed lines)

pkg/fs/definitions.go:

@@ -91,7 +91,7 @@ type ObjectResourcesMetadata struct {
     MaxParts     int
     IsTruncated  bool
-    Part         []*PartMetadata
+    Part         []PartMetadata
     EncodingType string
 }
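Note: moving Part from []*PartMetadata to []PartMetadata is mechanical, but it rules out nil elements in the slice and leaves the JSON encoding unchanged. A minimal sketch of that equivalence; the PartMetadata fields here are assumed from what this diff references (PartNumber, ETag, Size, LastModified), not copied from pkg/fs:

package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// PartMetadata mirrors only the fields this diff touches; the real
// struct definition lives in pkg/fs and may differ.
type PartMetadata struct {
    PartNumber   int
    ETag         string
    Size         int64
    LastModified time.Time
}

func main() {
    // With []PartMetadata there is no way to smuggle a nil element in,
    // and the JSON output is the same as with []*PartMetadata.
    parts := []PartMetadata{{PartNumber: 1, ETag: "abc", Size: 5 * 1024 * 1024}}
    b, _ := json.Marshal(parts)
    fmt.Println(string(b))
}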

pkg/fs/fs-multipart.go:

@@ -20,7 +20,6 @@ import (
     "crypto/md5"
     "encoding/base64"
     "encoding/hex"
-    "encoding/json"
     "encoding/xml"
     "errors"
     "fmt"
@@ -120,7 +119,7 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa
 // verify if parts sent over the network do really match with what we
 // have for the session.
-func doPartsMatch(parts []CompletePart, savedParts []*PartMetadata) bool {
+func doPartsMatch(parts []CompletePart, savedParts []PartMetadata) bool {
     if parts == nil || savedParts == nil {
         return false
     }
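Note: only the signature change and the nil checks of doPartsMatch are visible in this hunk. The comparison below is a hypothetical reconstruction of how such a check is commonly written (ETag compared position by position), not the function's actual body:

package main

import "fmt"

type CompletePart struct {
    PartNumber int
    ETag       string
}

type PartMetadata struct {
    PartNumber int
    ETag       string
}

// partsMatch reports whether the parts the client sent in
// CompleteMultipartUpload line up with what the session recorded.
func partsMatch(parts []CompletePart, savedParts []PartMetadata) bool {
    if parts == nil || savedParts == nil {
        return false
    }
    if len(parts) != len(savedParts) {
        return false
    }
    for i := range parts {
        if parts[i].ETag != savedParts[i].ETag {
            return false
        }
    }
    return true
}

func main() {
    fmt.Println(partsMatch(
        []CompletePart{{PartNumber: 1, ETag: "a"}},
        []PartMetadata{{PartNumber: 1, ETag: "a"}},
    )) // true
}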
@@ -155,14 +154,11 @@ func MultiCloser(closers ...io.Closer) io.Closer {
 }
 
 // removeParts - remove all parts.
-func removeParts(partPathPrefix string, parts []*PartMetadata) *probe.Error {
+func removeParts(partPathPrefix string, parts []PartMetadata) *probe.Error {
     for _, part := range parts {
-        if e := os.Remove(partPathPrefix + fmt.Sprintf("$%d-$multiparts", part.PartNumber)); e != nil {
-            return probe.NewError(e)
-        }
-    }
-    if e := os.Remove(partPathPrefix + "$multiparts"); e != nil {
-        return probe.NewError(e)
+        // We are on purpose ignoring the return values here, since
+        // another thread would have purged these entries.
+        os.Remove(partPathPrefix + fmt.Sprintf("$%d-$multiparts", part.PartNumber))
     }
     return nil
 }
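Note: removeParts is now best effort by design; the new comment explains that a concurrent cleanup may already have purged the same files. A standalone sketch of that pattern, reusing only the "$<partNumber>-$multiparts" naming from the diff (everything else is illustrative):

package main

import (
    "fmt"
    "os"
)

// bestEffortRemoveParts deletes every part file for a prefix and
// deliberately ignores os.Remove errors: if another goroutine already
// purged an entry, there is nothing left to do here.
func bestEffortRemoveParts(partPathPrefix string, partNumbers []int) {
    for _, n := range partNumbers {
        os.Remove(partPathPrefix + fmt.Sprintf("$%d-$multiparts", n))
    }
}

func main() {
    bestEffortRemoveParts("/tmp/bucket/object.uploadid", []int{1, 2, 3})
}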
@@ -245,31 +241,21 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
     mpartSession.TotalParts = 0
     mpartSession.UploadID = uploadID
     mpartSession.Initiated = time.Now().UTC()
-    var parts []*PartMetadata
+    var parts []PartMetadata
     mpartSession.Parts = parts
 
     fs.rwLock.Lock()
     fs.multiparts.ActiveSession[object] = mpartSession
     fs.rwLock.Unlock()
-
-    mpartSessionBytes, e := json.Marshal(mpartSession)
-    if e != nil {
-        return "", probe.NewError(e)
-    }
-    partPathPrefix := objectPath + uploadID
-    if e = ioutil.WriteFile(partPathPrefix+"$multiparts", mpartSessionBytes, 0600); e != nil {
-        return "", probe.NewError(e)
-    }
     if err := saveMultipartsSession(*fs.multiparts); err != nil {
-        return "", err.Trace(partPathPrefix)
+        return "", err.Trace(objectPath)
     }
     return uploadID, nil
 }
 
 // partNumber is a sortable interface for Part slice.
-type partNumber []*PartMetadata
+type partNumber []PartMetadata
 
 func (a partNumber) Len() int { return len(a) }
 func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
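Note: partNumber now sorts values rather than pointers. The hunk shows Len and Swap only; the Less method in this sketch is an assumption (ordering by PartNumber) added so the example compiles on its own:

package main

import (
    "fmt"
    "sort"
)

type PartMetadata struct {
    PartNumber int
    ETag       string
}

// partNumber sorts parts by their part number, matching the
// sort.Sort(partNumber(...)) calls seen elsewhere in this diff.
type partNumber []PartMetadata

func (a partNumber) Len() int           { return len(a) }
func (a partNumber) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber } // assumed

func main() {
    parts := partNumber{{PartNumber: 3}, {PartNumber: 1}, {PartNumber: 2}}
    sort.Sort(parts)
    fmt.Println(parts) // [{1 } {2 } {3 }]
}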
@@ -377,32 +363,32 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
     partMetadata.Size = fi.Size()
     partMetadata.LastModified = fi.ModTime()
 
-    multipartSessionBytes, e := ioutil.ReadFile(partPathPrefix + "$multiparts")
-    if e != nil {
-        return "", probe.NewError(e)
+    fs.rwLock.RLock()
+    deserializedMultipartSession, ok := fs.multiparts.ActiveSession[object]
+    fs.rwLock.RUnlock()
+    if !ok {
+        return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
     }
 
-    var deserializedMultipartSession MultipartSession
-    if e = json.Unmarshal(multipartSessionBytes, &deserializedMultipartSession); e != nil {
-        return "", probe.NewError(e)
+    // Append any pre-existing partNumber with new metadata, otherwise
+    // append to the list.
+    if len(deserializedMultipartSession.Parts) < partID {
+        deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, partMetadata)
+    } else {
+        deserializedMultipartSession.Parts[partID-1] = partMetadata
     }
-    deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, &partMetadata)
-    deserializedMultipartSession.TotalParts++
+    deserializedMultipartSession.TotalParts = len(deserializedMultipartSession.Parts)
     fs.rwLock.Lock()
-    fs.multiparts.ActiveSession[object] = &deserializedMultipartSession
+    fs.multiparts.ActiveSession[object] = deserializedMultipartSession
     fs.rwLock.Unlock()
 
     // Sort by part number before saving.
     sort.Sort(partNumber(deserializedMultipartSession.Parts))
-
-    multipartSessionBytes, e = json.Marshal(deserializedMultipartSession)
-    if e != nil {
-        return "", probe.NewError(e)
-    }
-    if e = ioutil.WriteFile(partPathPrefix+"$multiparts", multipartSessionBytes, 0600); e != nil {
-        return "", probe.NewError(e)
+    if err := saveMultipartsSession(*fs.multiparts); err != nil {
+        return "", err.Trace(partPathPrefix)
     }
     return partMetadata.ETag, nil
 }
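Note: the interesting new logic here is the insert-or-replace of a part slot, which is what lets a resumed upload retransmit a part without re-reading a $multiparts file. A reduced sketch with a plain sync.RWMutex and map standing in for the Filesystem fields; it folds the read and the write into one critical section for brevity, and putPart is a hypothetical helper, not a method from pkg/fs:

package main

import (
    "fmt"
    "sync"
)

type PartMetadata struct {
    PartNumber int
    ETag       string
}

type MultipartSession struct {
    TotalParts int
    Parts      []PartMetadata
}

var (
    rwLock        sync.RWMutex
    activeSession = map[string]*MultipartSession{}
)

// putPart mirrors the diff's branch: a partID beyond the current length
// is an append, anything else is a retransmission that overwrites the
// existing slot for that part number.
func putPart(object string, partID int, part PartMetadata) bool {
    rwLock.Lock()
    defer rwLock.Unlock()
    session, ok := activeSession[object]
    if !ok {
        return false // caller would map this to InvalidUploadID
    }
    if len(session.Parts) < partID {
        session.Parts = append(session.Parts, part)
    } else {
        session.Parts[partID-1] = part
    }
    session.TotalParts = len(session.Parts)
    return true
}

func main() {
    activeSession["bucket/object"] = &MultipartSession{}
    putPart("bucket/object", 1, PartMetadata{PartNumber: 1, ETag: "a"})
    putPart("bucket/object", 1, PartMetadata{PartNumber: 1, ETag: "b"}) // resumed/retried part
    fmt.Println(activeSession["bucket/object"].Parts)
}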
@@ -486,10 +472,8 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
         file.CloseAndPurge()
         return ObjectMetadata{}, err.Trace(partPathPrefix)
     }
-    if err := removeParts(partPathPrefix, savedParts); err != nil {
-        file.CloseAndPurge()
-        return ObjectMetadata{}, err.Trace(partPathPrefix)
-    }
+    // Successfully saved, remove all parts.
+    removeParts(partPathPrefix, savedParts)
 
     fs.rwLock.Lock()
     delete(fs.multiparts.ActiveSession, object)
@@ -561,18 +545,13 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso
         return ObjectResourcesMetadata{}, probe.NewError(e)
     }
 
-    objectPath := filepath.Join(bucketPath, object)
-    partPathPrefix := objectPath + resources.UploadID
-    multipartSessionBytes, e := ioutil.ReadFile(partPathPrefix + "$multiparts")
-    if e != nil {
-        return ObjectResourcesMetadata{}, probe.NewError(e)
-    }
-    var deserializedMultipartSession MultipartSession
-    if e = json.Unmarshal(multipartSessionBytes, &deserializedMultipartSession); e != nil {
-        return ObjectResourcesMetadata{}, probe.NewError(e)
+    fs.rwLock.RLock()
+    deserializedMultipartSession, ok := fs.multiparts.ActiveSession[object]
+    fs.rwLock.RUnlock()
+    if !ok {
+        return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID})
     }
-    var parts []*PartMetadata
+    var parts []PartMetadata
     for i := startPartNumber; i <= deserializedMultipartSession.TotalParts; i++ {
         if len(parts) > objectResourcesMetadata.MaxParts {
             sort.Sort(partNumber(parts))
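Note: ListObjectParts now reads the session from memory and pages through it with startPartNumber and MaxParts; only the start of that loop is visible in this hunk. The pagination below is a self-contained sketch under that assumption, with listParts as a hypothetical helper rather than the function's actual tail:

package main

import "fmt"

type PartMetadata struct {
    PartNumber int
    ETag       string
}

// listParts returns at most maxParts entries starting at startPartNumber
// (1-based), plus a flag telling the caller the listing was truncated.
func listParts(all []PartMetadata, startPartNumber, maxParts int) (parts []PartMetadata, isTruncated bool) {
    for i := startPartNumber; i <= len(all); i++ {
        if len(parts) == maxParts {
            return parts, true
        }
        parts = append(parts, all[i-1])
    }
    return parts, false
}

func main() {
    all := []PartMetadata{{PartNumber: 1}, {PartNumber: 2}, {PartNumber: 3}}
    got, truncated := listParts(all, 2, 1)
    fmt.Println(got, truncated) // [{2 }] true
}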

pkg/fs/fs.go:

@@ -48,7 +48,7 @@ type MultipartSession struct {
     TotalParts int
     UploadID   string
     Initiated  time.Time
-    Parts      []*PartMetadata
+    Parts      []PartMetadata
 }
 
 // Multiparts collection of many parts
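Note: with Parts held by value, the whole active-session collection can be persisted in one shot by saveMultipartsSession instead of one $multiparts file per upload, which is the simplification this PR is after. The Multiparts shape, file path, and helper below are assumptions for illustration; only the MultipartSession fields come from this hunk:

package main

import (
    "encoding/json"
    "log"
    "os"
    "time"
)

type PartMetadata struct {
    PartNumber int
    ETag       string
}

type MultipartSession struct {
    TotalParts int
    UploadID   string
    Initiated  time.Time
    Parts      []PartMetadata
}

// Multiparts is a stand-in for the collection that saveMultipartsSession
// persists; the real type lives in pkg/fs.
type Multiparts struct {
    ActiveSession map[string]*MultipartSession
}

// saveMultipartsSessionSketch writes the whole collection as JSON to a
// single file (the path is illustrative, not the project's layout).
func saveMultipartsSessionSketch(m Multiparts, path string) error {
    b, err := json.MarshalIndent(m, "", "  ")
    if err != nil {
        return err
    }
    return os.WriteFile(path, b, 0600)
}

func main() {
    m := Multiparts{ActiveSession: map[string]*MultipartSession{
        "object": {UploadID: "uid", Initiated: time.Now().UTC()},
    }}
    if err := saveMultipartsSessionSketch(m, "/tmp/multiparts.json"); err != nil {
        log.Fatal(err)
    }
}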
