multipart: Multipart session map now is based on uploadID.

- Fixes initiating parallel uploads, and configs being quickly
  re-written by another incoming request.
- Parallel uploads work smoothly now and return expected behavior.
master
Harshavardhana 9 years ago
parent 3f5804f75a
commit 8df201ef30
  1. 3
      pkg/fs/fs-bucket-listobjects.go
  2. 92
      pkg/fs/fs-multipart.go
  3. 2
      pkg/fs/fs-object.go
  4. 1
      pkg/fs/fs.go
  5. 10
      server_fs_test.go

@ -82,7 +82,8 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe
if e != nil {
return e
}
if strings.HasSuffix(path, "$multiparts") {
// Skip special temporary files, kept for multipart transaction.
if strings.Contains(path, "$multiparts") || strings.Contains(path, "$tmpobject") {
return nil
}
// We don't need to list the walk path.

@ -42,17 +42,14 @@ import (
)
// isValidUploadID - is upload id.
func (fs Filesystem) isValidUploadID(object, uploadID string) bool {
func (fs Filesystem) isValidUploadID(object, uploadID string) (ok bool) {
fs.rwLock.RLock()
defer fs.rwLock.RUnlock()
s, ok := fs.multiparts.ActiveSession[object]
_, ok = fs.multiparts.ActiveSession[uploadID]
if !ok {
return false
}
if uploadID == s.UploadID {
return true
return
}
return false
return
}
// ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata
@ -73,40 +70,41 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa
var uploads []*UploadMetadata
fs.rwLock.RLock()
defer fs.rwLock.RUnlock()
for object, session := range fs.multiparts.ActiveSession {
if strings.HasPrefix(object, resources.Prefix) {
for uploadID, session := range fs.multiparts.ActiveSession {
objectName := session.ObjectName
if strings.HasPrefix(objectName, resources.Prefix) {
if len(uploads) > resources.MaxUploads {
sort.Sort(byUploadMetadataKey(uploads))
resources.Upload = uploads
resources.NextKeyMarker = object
resources.NextUploadIDMarker = session.UploadID
resources.NextKeyMarker = session.ObjectName
resources.NextUploadIDMarker = uploadID
resources.IsTruncated = true
return resources, nil
}
// UploadIDMarker is ignored if KeyMarker is empty.
switch {
case resources.KeyMarker != "" && resources.UploadIDMarker == "":
if object > resources.KeyMarker {
if objectName > resources.KeyMarker {
upload := new(UploadMetadata)
upload.Object = object
upload.UploadID = session.UploadID
upload.Object = objectName
upload.UploadID = uploadID
upload.Initiated = session.Initiated
uploads = append(uploads, upload)
}
case resources.KeyMarker != "" && resources.UploadIDMarker != "":
if session.UploadID > resources.UploadIDMarker {
if object >= resources.KeyMarker {
if objectName >= resources.KeyMarker {
upload := new(UploadMetadata)
upload.Object = object
upload.UploadID = session.UploadID
upload.Object = objectName
upload.UploadID = uploadID
upload.Initiated = session.Initiated
uploads = append(uploads, upload)
}
}
default:
upload := new(UploadMetadata)
upload.Object = object
upload.UploadID = session.UploadID
upload.Object = objectName
upload.UploadID = uploadID
upload.Initiated = session.Initiated
uploads = append(uploads, upload)
}
@ -158,7 +156,7 @@ func removeParts(partPathPrefix string, parts []PartMetadata) *probe.Error {
for _, part := range parts {
// We are on purpose ignoring the return values here, since
// another thread would have purged these entries.
os.Remove(partPathPrefix + fmt.Sprintf("$%d-$multiparts", part.PartNumber))
os.Remove(partPathPrefix + part.ETag + fmt.Sprintf("$%d-$multiparts", part.PartNumber))
}
return nil
}
@ -168,7 +166,9 @@ func saveParts(partPathPrefix string, mw io.Writer, parts []CompletePart) *probe
var partReaders []io.Reader
var partClosers []io.Closer
for _, part := range parts {
partFile, e := os.OpenFile(partPathPrefix+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600)
md5Sum := strings.TrimPrefix(part.ETag, "\"")
md5Sum = strings.TrimSuffix(md5Sum, "\"")
partFile, e := os.OpenFile(partPathPrefix+md5Sum+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600)
if e != nil {
return probe.NewError(e)
}
@ -236,21 +236,22 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
uploadIDSum := sha512.Sum512(id)
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
fs.rwLock.Lock()
// Initialize multipart session.
mpartSession := &MultipartSession{}
mpartSession.TotalParts = 0
mpartSession.ObjectName = object
mpartSession.UploadID = uploadID
mpartSession.Initiated = time.Now().UTC()
var parts []PartMetadata
mpartSession.Parts = parts
fs.rwLock.Lock()
fs.multiparts.ActiveSession[object] = mpartSession
fs.rwLock.Unlock()
fs.multiparts.ActiveSession[uploadID] = mpartSession
if err := saveMultipartsSession(*fs.multiparts); err != nil {
fs.rwLock.Unlock()
return "", err.Trace(objectPath)
}
fs.rwLock.Unlock()
return uploadID, nil
}
@ -317,7 +318,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
objectPath := filepath.Join(bucketPath, object)
partPathPrefix := objectPath + uploadID
partPath := partPathPrefix + fmt.Sprintf("$%d-$multiparts", partID)
partPath := partPathPrefix + expectedMD5Sum + fmt.Sprintf("$%d-$multiparts", partID)
partFile, e := atomic.FileCreateWithPrefix(partPath, "$multiparts")
if e != nil {
return "", probe.NewError(e)
@ -364,7 +365,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
partMetadata.LastModified = fi.ModTime()
fs.rwLock.RLock()
deserializedMultipartSession, ok := fs.multiparts.ActiveSession[object]
deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID]
fs.rwLock.RUnlock()
if !ok {
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
@ -378,16 +379,16 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
deserializedMultipartSession.Parts[partID-1] = partMetadata
}
deserializedMultipartSession.TotalParts = len(deserializedMultipartSession.Parts)
fs.rwLock.Lock()
fs.multiparts.ActiveSession[object] = deserializedMultipartSession
fs.rwLock.Unlock()
// Sort by part number before saving.
sort.Sort(partNumber(deserializedMultipartSession.Parts))
fs.rwLock.Lock()
fs.multiparts.ActiveSession[uploadID] = deserializedMultipartSession
if err := saveMultipartsSession(*fs.multiparts); err != nil {
fs.rwLock.Unlock()
return "", err.Trace(partPathPrefix)
}
fs.rwLock.Unlock()
return partMetadata.ETag, nil
}
@ -420,7 +421,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
}
objectPath := filepath.Join(bucketPath, object)
file, e := atomic.FileCreateWithPrefix(objectPath, "")
file, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject")
if e != nil {
return ObjectMetadata{}, probe.NewError(e)
}
@ -459,7 +460,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
parts := completeMultipartUpload.Part
fs.rwLock.RLock()
savedParts := fs.multiparts.ActiveSession[object].Parts
savedParts := fs.multiparts.ActiveSession[uploadID].Parts
fs.rwLock.RUnlock()
if !doPartsMatch(parts, savedParts) {
@ -476,13 +477,14 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
removeParts(partPathPrefix, savedParts)
fs.rwLock.Lock()
delete(fs.multiparts.ActiveSession, object)
fs.rwLock.Unlock()
delete(fs.multiparts.ActiveSession, uploadID)
if err := saveMultipartsSession(*fs.multiparts); err != nil {
fs.rwLock.Unlock()
file.CloseAndPurge()
return ObjectMetadata{}, err.Trace(partPathPrefix)
}
fs.rwLock.Unlock()
file.Close()
st, e := os.Stat(objectPath)
@ -519,9 +521,12 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso
return ObjectResourcesMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// Save upload id.
uploadID := resources.UploadID
// Verify if upload id is valid for incoming object.
if !fs.isValidUploadID(object, resources.UploadID) {
return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID})
if !fs.isValidUploadID(object, uploadID) {
return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
}
objectResourcesMetadata := resources
@ -546,7 +551,7 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso
}
fs.rwLock.RLock()
deserializedMultipartSession, ok := fs.multiparts.ActiveSession[object]
deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID]
fs.rwLock.RUnlock()
if !ok {
return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID})
@ -596,7 +601,7 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob
objectPath := filepath.Join(bucketPath, object)
partPathPrefix := objectPath + uploadID
fs.rwLock.RLock()
savedParts := fs.multiparts.ActiveSession[object].Parts
savedParts := fs.multiparts.ActiveSession[uploadID].Parts
fs.rwLock.RUnlock()
if err := removeParts(partPathPrefix, savedParts); err != nil {
@ -604,10 +609,11 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob
}
fs.rwLock.Lock()
delete(fs.multiparts.ActiveSession, object)
fs.rwLock.Unlock()
delete(fs.multiparts.ActiveSession, uploadID)
if err := saveMultipartsSession(*fs.multiparts); err != nil {
fs.rwLock.Unlock()
return err.Trace(partPathPrefix)
}
fs.rwLock.Unlock()
return nil
}

@ -237,7 +237,7 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
}
// Write object.
file, e := atomic.FileCreateWithPrefix(objectPath, "")
file, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject")
if e != nil {
switch e := e.(type) {
case *os.PathError:

@ -46,6 +46,7 @@ type Buckets struct {
// MultipartSession holds active session information
type MultipartSession struct {
TotalParts int
ObjectName string
UploadID string
Initiated time.Time
Parts []PartMetadata

@ -18,6 +18,7 @@ package main
import (
"bytes"
"crypto/md5"
"io"
"io/ioutil"
"os"
@ -25,6 +26,7 @@ import (
"strings"
"time"
"encoding/base64"
"encoding/hex"
"encoding/xml"
"net/http"
@ -1075,8 +1077,13 @@ func (s *MyAPIFSCacheSuite) TestObjectMultipart(c *C) {
c.Assert(len(newResponse.UploadID) > 0, Equals, true)
uploadID := newResponse.UploadID
hasher := md5.New()
hasher.Write([]byte("hello world"))
md5Sum := hasher.Sum(nil)
buffer1 := bytes.NewReader([]byte("hello world"))
request, err = s.newRequest("PUT", testAPIFSCacheServer.URL+"/objectmultiparts/object?uploadId="+uploadID+"&partNumber=1", int64(buffer1.Len()), buffer1)
request.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(md5Sum))
c.Assert(err, IsNil)
client = http.Client{}
@ -1086,6 +1093,7 @@ func (s *MyAPIFSCacheSuite) TestObjectMultipart(c *C) {
buffer2 := bytes.NewReader([]byte("hello world"))
request, err = s.newRequest("PUT", testAPIFSCacheServer.URL+"/objectmultiparts/object?uploadId="+uploadID+"&partNumber=2", int64(buffer2.Len()), buffer2)
request.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(md5Sum))
c.Assert(err, IsNil)
client = http.Client{}
@ -1093,7 +1101,7 @@ func (s *MyAPIFSCacheSuite) TestObjectMultipart(c *C) {
c.Assert(err, IsNil)
c.Assert(response2.StatusCode, Equals, http.StatusOK)
// complete multipart upload
// Complete multipart upload
completeUploads := &fs.CompleteMultipartUpload{
Part: []fs.CompletePart{
{

Loading…
Cancel
Save