From 8df201ef30aef7b0f937fdf957faba873dea9a45 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 5 Feb 2016 20:05:56 -0800 Subject: [PATCH] multipart: Multipart session map now is based on uploadID. - Fixes initiating parallel uploads, and configs being quickly re-written by another incoming request. - Parallel uploads work smoothly now and return expected behavior. --- pkg/fs/fs-bucket-listobjects.go | 3 +- pkg/fs/fs-multipart.go | 92 ++++++++++++++++++--------------- pkg/fs/fs-object.go | 2 +- pkg/fs/fs.go | 1 + server_fs_test.go | 10 +++- 5 files changed, 62 insertions(+), 46 deletions(-) diff --git a/pkg/fs/fs-bucket-listobjects.go b/pkg/fs/fs-bucket-listobjects.go index cc9121f57..6ba31e92f 100644 --- a/pkg/fs/fs-bucket-listobjects.go +++ b/pkg/fs/fs-bucket-listobjects.go @@ -82,7 +82,8 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe if e != nil { return e } - if strings.HasSuffix(path, "$multiparts") { + // Skip special temporary files, kept for multipart transaction. + if strings.Contains(path, "$multiparts") || strings.Contains(path, "$tmpobject") { return nil } // We don't need to list the walk path. diff --git a/pkg/fs/fs-multipart.go b/pkg/fs/fs-multipart.go index 28421f3e7..71125dc32 100644 --- a/pkg/fs/fs-multipart.go +++ b/pkg/fs/fs-multipart.go @@ -42,17 +42,14 @@ import ( ) // isValidUploadID - is upload id. -func (fs Filesystem) isValidUploadID(object, uploadID string) bool { +func (fs Filesystem) isValidUploadID(object, uploadID string) (ok bool) { fs.rwLock.RLock() defer fs.rwLock.RUnlock() - s, ok := fs.multiparts.ActiveSession[object] + _, ok = fs.multiparts.ActiveSession[uploadID] if !ok { - return false - } - if uploadID == s.UploadID { - return true + return } - return false + return } // ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata @@ -73,40 +70,41 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa var uploads []*UploadMetadata fs.rwLock.RLock() defer fs.rwLock.RUnlock() - for object, session := range fs.multiparts.ActiveSession { - if strings.HasPrefix(object, resources.Prefix) { + for uploadID, session := range fs.multiparts.ActiveSession { + objectName := session.ObjectName + if strings.HasPrefix(objectName, resources.Prefix) { if len(uploads) > resources.MaxUploads { sort.Sort(byUploadMetadataKey(uploads)) resources.Upload = uploads - resources.NextKeyMarker = object - resources.NextUploadIDMarker = session.UploadID + resources.NextKeyMarker = session.ObjectName + resources.NextUploadIDMarker = uploadID resources.IsTruncated = true return resources, nil } // UploadIDMarker is ignored if KeyMarker is empty. switch { case resources.KeyMarker != "" && resources.UploadIDMarker == "": - if object > resources.KeyMarker { + if objectName > resources.KeyMarker { upload := new(UploadMetadata) - upload.Object = object - upload.UploadID = session.UploadID + upload.Object = objectName + upload.UploadID = uploadID upload.Initiated = session.Initiated uploads = append(uploads, upload) } case resources.KeyMarker != "" && resources.UploadIDMarker != "": if session.UploadID > resources.UploadIDMarker { - if object >= resources.KeyMarker { + if objectName >= resources.KeyMarker { upload := new(UploadMetadata) - upload.Object = object - upload.UploadID = session.UploadID + upload.Object = objectName + upload.UploadID = uploadID upload.Initiated = session.Initiated uploads = append(uploads, upload) } } default: upload := new(UploadMetadata) - upload.Object = object - upload.UploadID = session.UploadID + upload.Object = objectName + upload.UploadID = uploadID upload.Initiated = session.Initiated uploads = append(uploads, upload) } @@ -158,7 +156,7 @@ func removeParts(partPathPrefix string, parts []PartMetadata) *probe.Error { for _, part := range parts { // We are on purpose ignoring the return values here, since // another thread would have purged these entries. - os.Remove(partPathPrefix + fmt.Sprintf("$%d-$multiparts", part.PartNumber)) + os.Remove(partPathPrefix + part.ETag + fmt.Sprintf("$%d-$multiparts", part.PartNumber)) } return nil } @@ -168,7 +166,9 @@ func saveParts(partPathPrefix string, mw io.Writer, parts []CompletePart) *probe var partReaders []io.Reader var partClosers []io.Closer for _, part := range parts { - partFile, e := os.OpenFile(partPathPrefix+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600) + md5Sum := strings.TrimPrefix(part.ETag, "\"") + md5Sum = strings.TrimSuffix(md5Sum, "\"") + partFile, e := os.OpenFile(partPathPrefix+md5Sum+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600) if e != nil { return probe.NewError(e) } @@ -236,21 +236,22 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E uploadIDSum := sha512.Sum512(id) uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47] + fs.rwLock.Lock() // Initialize multipart session. mpartSession := &MultipartSession{} mpartSession.TotalParts = 0 + mpartSession.ObjectName = object mpartSession.UploadID = uploadID mpartSession.Initiated = time.Now().UTC() var parts []PartMetadata mpartSession.Parts = parts - fs.rwLock.Lock() - fs.multiparts.ActiveSession[object] = mpartSession - fs.rwLock.Unlock() - + fs.multiparts.ActiveSession[uploadID] = mpartSession if err := saveMultipartsSession(*fs.multiparts); err != nil { + fs.rwLock.Unlock() return "", err.Trace(objectPath) } + fs.rwLock.Unlock() return uploadID, nil } @@ -317,7 +318,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s objectPath := filepath.Join(bucketPath, object) partPathPrefix := objectPath + uploadID - partPath := partPathPrefix + fmt.Sprintf("$%d-$multiparts", partID) + partPath := partPathPrefix + expectedMD5Sum + fmt.Sprintf("$%d-$multiparts", partID) partFile, e := atomic.FileCreateWithPrefix(partPath, "$multiparts") if e != nil { return "", probe.NewError(e) @@ -364,7 +365,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s partMetadata.LastModified = fi.ModTime() fs.rwLock.RLock() - deserializedMultipartSession, ok := fs.multiparts.ActiveSession[object] + deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID] fs.rwLock.RUnlock() if !ok { return "", probe.NewError(InvalidUploadID{UploadID: uploadID}) @@ -378,16 +379,16 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s deserializedMultipartSession.Parts[partID-1] = partMetadata } deserializedMultipartSession.TotalParts = len(deserializedMultipartSession.Parts) - - fs.rwLock.Lock() - fs.multiparts.ActiveSession[object] = deserializedMultipartSession - fs.rwLock.Unlock() - // Sort by part number before saving. sort.Sort(partNumber(deserializedMultipartSession.Parts)) + + fs.rwLock.Lock() + fs.multiparts.ActiveSession[uploadID] = deserializedMultipartSession if err := saveMultipartsSession(*fs.multiparts); err != nil { + fs.rwLock.Unlock() return "", err.Trace(partPathPrefix) } + fs.rwLock.Unlock() return partMetadata.ETag, nil } @@ -420,7 +421,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da } objectPath := filepath.Join(bucketPath, object) - file, e := atomic.FileCreateWithPrefix(objectPath, "") + file, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject") if e != nil { return ObjectMetadata{}, probe.NewError(e) } @@ -459,7 +460,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da parts := completeMultipartUpload.Part fs.rwLock.RLock() - savedParts := fs.multiparts.ActiveSession[object].Parts + savedParts := fs.multiparts.ActiveSession[uploadID].Parts fs.rwLock.RUnlock() if !doPartsMatch(parts, savedParts) { @@ -476,13 +477,14 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da removeParts(partPathPrefix, savedParts) fs.rwLock.Lock() - delete(fs.multiparts.ActiveSession, object) - fs.rwLock.Unlock() - + delete(fs.multiparts.ActiveSession, uploadID) if err := saveMultipartsSession(*fs.multiparts); err != nil { + fs.rwLock.Unlock() file.CloseAndPurge() return ObjectMetadata{}, err.Trace(partPathPrefix) } + fs.rwLock.Unlock() + file.Close() st, e := os.Stat(objectPath) @@ -519,9 +521,12 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso return ObjectResourcesMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } + // Save upload id. + uploadID := resources.UploadID + // Verify if upload id is valid for incoming object. - if !fs.isValidUploadID(object, resources.UploadID) { - return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID}) + if !fs.isValidUploadID(object, uploadID) { + return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: uploadID}) } objectResourcesMetadata := resources @@ -546,7 +551,7 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso } fs.rwLock.RLock() - deserializedMultipartSession, ok := fs.multiparts.ActiveSession[object] + deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID] fs.rwLock.RUnlock() if !ok { return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID}) @@ -596,7 +601,7 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob objectPath := filepath.Join(bucketPath, object) partPathPrefix := objectPath + uploadID fs.rwLock.RLock() - savedParts := fs.multiparts.ActiveSession[object].Parts + savedParts := fs.multiparts.ActiveSession[uploadID].Parts fs.rwLock.RUnlock() if err := removeParts(partPathPrefix, savedParts); err != nil { @@ -604,10 +609,11 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob } fs.rwLock.Lock() - delete(fs.multiparts.ActiveSession, object) - fs.rwLock.Unlock() + delete(fs.multiparts.ActiveSession, uploadID) if err := saveMultipartsSession(*fs.multiparts); err != nil { + fs.rwLock.Unlock() return err.Trace(partPathPrefix) } + fs.rwLock.Unlock() return nil } diff --git a/pkg/fs/fs-object.go b/pkg/fs/fs-object.go index 1f41da6c3..b38d37fe0 100644 --- a/pkg/fs/fs-object.go +++ b/pkg/fs/fs-object.go @@ -237,7 +237,7 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in } // Write object. - file, e := atomic.FileCreateWithPrefix(objectPath, "") + file, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject") if e != nil { switch e := e.(type) { case *os.PathError: diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go index e2948265e..7dafdd6e2 100644 --- a/pkg/fs/fs.go +++ b/pkg/fs/fs.go @@ -46,6 +46,7 @@ type Buckets struct { // MultipartSession holds active session information type MultipartSession struct { TotalParts int + ObjectName string UploadID string Initiated time.Time Parts []PartMetadata diff --git a/server_fs_test.go b/server_fs_test.go index f54b3ef44..9aa97d33d 100644 --- a/server_fs_test.go +++ b/server_fs_test.go @@ -18,6 +18,7 @@ package main import ( "bytes" + "crypto/md5" "io" "io/ioutil" "os" @@ -25,6 +26,7 @@ import ( "strings" "time" + "encoding/base64" "encoding/hex" "encoding/xml" "net/http" @@ -1075,8 +1077,13 @@ func (s *MyAPIFSCacheSuite) TestObjectMultipart(c *C) { c.Assert(len(newResponse.UploadID) > 0, Equals, true) uploadID := newResponse.UploadID + hasher := md5.New() + hasher.Write([]byte("hello world")) + md5Sum := hasher.Sum(nil) + buffer1 := bytes.NewReader([]byte("hello world")) request, err = s.newRequest("PUT", testAPIFSCacheServer.URL+"/objectmultiparts/object?uploadId="+uploadID+"&partNumber=1", int64(buffer1.Len()), buffer1) + request.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(md5Sum)) c.Assert(err, IsNil) client = http.Client{} @@ -1086,6 +1093,7 @@ func (s *MyAPIFSCacheSuite) TestObjectMultipart(c *C) { buffer2 := bytes.NewReader([]byte("hello world")) request, err = s.newRequest("PUT", testAPIFSCacheServer.URL+"/objectmultiparts/object?uploadId="+uploadID+"&partNumber=2", int64(buffer2.Len()), buffer2) + request.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(md5Sum)) c.Assert(err, IsNil) client = http.Client{} @@ -1093,7 +1101,7 @@ func (s *MyAPIFSCacheSuite) TestObjectMultipart(c *C) { c.Assert(err, IsNil) c.Assert(response2.StatusCode, Equals, http.StatusOK) - // complete multipart upload + // Complete multipart upload completeUploads := &fs.CompleteMultipartUpload{ Part: []fs.CompletePart{ {