Merge pull request #1101 from harshavardhana/combine

multipart: Multipart session map is now keyed by uploadID.
commit 643ef30533 by Harshavardhana, merged into master 9 years ago
5 changed files:

1. pkg/fs/fs-bucket-listobjects.go — 3 lines changed
2. pkg/fs/fs-multipart.go — 92 lines changed
3. pkg/fs/fs-object.go — 2 lines changed
4. pkg/fs/fs.go — 1 line changed
5. server_fs_test.go — 10 lines changed
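
In short: the fs backend used to key its in-memory multipart session map by object name, so two concurrent uploads to the same object clobbered each other's session; the map is now keyed by uploadID, and the object name moves into the session itself (the new ObjectName field in pkg/fs/fs.go). A minimal sketch of the new shape, with the Parts field omitted and the map type simplified (the upload IDs below are illustrative):

package main

import (
	"fmt"
	"time"
)

// MultipartSession mirrors the struct in pkg/fs/fs.go after this change
// (Parts omitted for brevity); ObjectName is the newly added field.
type MultipartSession struct {
	TotalParts int
	ObjectName string
	UploadID   string
	Initiated  time.Time
}

func main() {
	// Keyed by uploadID, two in-flight uploads of the same object coexist;
	// keyed by object name (the old scheme), the second would have
	// overwritten the first.
	activeSession := make(map[string]*MultipartSession) // uploadID -> session
	for _, id := range []string{"upload-A", "upload-B"} {
		activeSession[id] = &MultipartSession{
			ObjectName: "objectmultiparts/object",
			UploadID:   id,
			Initiated:  time.Now().UTC(),
		}
	}
	fmt.Println(len(activeSession)) // 2: both sessions survive
}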

pkg/fs/fs-bucket-listobjects.go

@@ -82,7 +82,8 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe
 		if e != nil {
 			return e
 		}
-		if strings.HasSuffix(path, "$multiparts") {
+		// Skip special temporary files, kept for multipart transaction.
+		if strings.Contains(path, "$multiparts") || strings.Contains(path, "$tmpobject") {
 			return nil
 		}
 		// We don't need to list the walk path.

pkg/fs/fs-multipart.go

@@ -42,17 +42,14 @@ import (
 )
 
 // isValidUploadID - is upload id.
-func (fs Filesystem) isValidUploadID(object, uploadID string) bool {
+func (fs Filesystem) isValidUploadID(object, uploadID string) (ok bool) {
 	fs.rwLock.RLock()
 	defer fs.rwLock.RUnlock()
-	s, ok := fs.multiparts.ActiveSession[object]
+	_, ok = fs.multiparts.ActiveSession[uploadID]
 	if !ok {
-		return false
-	}
-	if uploadID == s.UploadID {
-		return true
+		return
 	}
-	return false
+	return
 }
 
 // ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata
@@ -73,40 +70,41 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa
 	var uploads []*UploadMetadata
 	fs.rwLock.RLock()
 	defer fs.rwLock.RUnlock()
-	for object, session := range fs.multiparts.ActiveSession {
-		if strings.HasPrefix(object, resources.Prefix) {
+	for uploadID, session := range fs.multiparts.ActiveSession {
+		objectName := session.ObjectName
+		if strings.HasPrefix(objectName, resources.Prefix) {
 			if len(uploads) > resources.MaxUploads {
 				sort.Sort(byUploadMetadataKey(uploads))
 				resources.Upload = uploads
-				resources.NextKeyMarker = object
-				resources.NextUploadIDMarker = session.UploadID
+				resources.NextKeyMarker = session.ObjectName
+				resources.NextUploadIDMarker = uploadID
 				resources.IsTruncated = true
 				return resources, nil
 			}
 			// UploadIDMarker is ignored if KeyMarker is empty.
 			switch {
 			case resources.KeyMarker != "" && resources.UploadIDMarker == "":
-				if object > resources.KeyMarker {
+				if objectName > resources.KeyMarker {
 					upload := new(UploadMetadata)
-					upload.Object = object
-					upload.UploadID = session.UploadID
+					upload.Object = objectName
+					upload.UploadID = uploadID
 					upload.Initiated = session.Initiated
 					uploads = append(uploads, upload)
 				}
 			case resources.KeyMarker != "" && resources.UploadIDMarker != "":
 				if session.UploadID > resources.UploadIDMarker {
-					if object >= resources.KeyMarker {
+					if objectName >= resources.KeyMarker {
 						upload := new(UploadMetadata)
-						upload.Object = object
-						upload.UploadID = session.UploadID
+						upload.Object = objectName
+						upload.UploadID = uploadID
 						upload.Initiated = session.Initiated
 						uploads = append(uploads, upload)
 					}
 				}
 			default:
 				upload := new(UploadMetadata)
-				upload.Object = object
-				upload.UploadID = session.UploadID
+				upload.Object = objectName
+				upload.UploadID = uploadID
 				upload.Initiated = session.Initiated
 				uploads = append(uploads, upload)
 			}
@@ -158,7 +156,7 @@ func removeParts(partPathPrefix string, parts []PartMetadata) *probe.Error {
 	for _, part := range parts {
 		// We are on purpose ignoring the return values here, since
 		// another thread would have purged these entries.
-		os.Remove(partPathPrefix + fmt.Sprintf("$%d-$multiparts", part.PartNumber))
+		os.Remove(partPathPrefix + part.ETag + fmt.Sprintf("$%d-$multiparts", part.PartNumber))
 	}
 	return nil
 }
@@ -168,7 +166,9 @@ func saveParts(partPathPrefix string, mw io.Writer, parts []CompletePart) *probe
 	var partReaders []io.Reader
 	var partClosers []io.Closer
 	for _, part := range parts {
-		partFile, e := os.OpenFile(partPathPrefix+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600)
+		md5Sum := strings.TrimPrefix(part.ETag, "\"")
+		md5Sum = strings.TrimSuffix(md5Sum, "\"")
+		partFile, e := os.OpenFile(partPathPrefix+md5Sum+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600)
 		if e != nil {
 			return probe.NewError(e)
 		}
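
Part files on disk now embed the part's md5 in their name (see removeParts above and the partPath construction in CreateObjectPart below), so saveParts has to strip the surrounding double quotes first, since S3-style ETags travel quoted. A quick standalone illustration of the trimming and the resulting file name (the prefix value here is made up):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// S3-style ETags travel quoted; the on-disk part name uses the bare
	// md5, hence the TrimPrefix/TrimSuffix pair in saveParts.
	etag := "\"5eb63bbbe01eeed093cb22bb8f5acdc3\""
	md5Sum := strings.TrimSuffix(strings.TrimPrefix(etag, "\""), "\"")

	// Hypothetical prefix: objectPath + uploadID, as built in CreateObjectPart.
	partPathPrefix := "/tmp/bucket/object" + "uploadid123"
	partPath := partPathPrefix + md5Sum + fmt.Sprintf("$%d-$multiparts", 1)
	fmt.Println(partPath)
	// /tmp/bucket/objectuploadid1235eb63bbbe01eeed093cb22bb8f5acdc3$1-$multiparts
}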
@@ -236,21 +236,22 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
 	uploadIDSum := sha512.Sum512(id)
 	uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
 
+	fs.rwLock.Lock()
 	// Initialize multipart session.
 	mpartSession := &MultipartSession{}
 	mpartSession.TotalParts = 0
+	mpartSession.ObjectName = object
 	mpartSession.UploadID = uploadID
 	mpartSession.Initiated = time.Now().UTC()
 	var parts []PartMetadata
 	mpartSession.Parts = parts
-	fs.rwLock.Lock()
-	fs.multiparts.ActiveSession[object] = mpartSession
-	fs.rwLock.Unlock()
+	fs.multiparts.ActiveSession[uploadID] = mpartSession
 	if err := saveMultipartsSession(*fs.multiparts); err != nil {
+		fs.rwLock.Unlock()
 		return "", err.Trace(objectPath)
 	}
+	fs.rwLock.Unlock()
 	return uploadID, nil
 }
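
Note the locking pattern above, repeated in the CreateObjectPart, CompleteMultipartUpload and AbortMultipartUpload hunks below: the write lock is now taken before the map is touched and released only after saveMultipartsSession returns, so no reader can observe an in-memory session that was never persisted. A self-contained sketch of that pattern, assuming simplified types and using a defer in place of the diff's per-exit-path Unlock calls (sessions, put and the save callback are illustrative names, not from pkg/fs):

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for the Filesystem fields in pkg/fs (illustrative only).
type sessions struct {
	rwLock sync.RWMutex
	active map[string]string // uploadID -> object name
}

// put mutates the map and persists it under a single write lock, mirroring
// the widened critical section in the diff; the defer stands in for the
// per-exit-path Unlock calls that the real code spells out.
func (s *sessions) put(uploadID, object string, save func() error) error {
	s.rwLock.Lock()
	defer s.rwLock.Unlock()
	s.active[uploadID] = object
	return save()
}

func main() {
	s := &sessions{active: make(map[string]string)}
	if err := s.put("upload-A", "bucket/object", func() error { return nil }); err != nil {
		fmt.Println("save failed:", err)
	}
	fmt.Println(len(s.active)) // 1
}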
@@ -317,7 +318,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
 	objectPath := filepath.Join(bucketPath, object)
 	partPathPrefix := objectPath + uploadID
-	partPath := partPathPrefix + fmt.Sprintf("$%d-$multiparts", partID)
+	partPath := partPathPrefix + expectedMD5Sum + fmt.Sprintf("$%d-$multiparts", partID)
 	partFile, e := atomic.FileCreateWithPrefix(partPath, "$multiparts")
 	if e != nil {
 		return "", probe.NewError(e)
@@ -364,7 +365,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
 	partMetadata.LastModified = fi.ModTime()
 
 	fs.rwLock.RLock()
-	deserializedMultipartSession, ok := fs.multiparts.ActiveSession[object]
+	deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID]
 	fs.rwLock.RUnlock()
 	if !ok {
 		return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
@@ -378,16 +379,16 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
 		deserializedMultipartSession.Parts[partID-1] = partMetadata
 	}
 	deserializedMultipartSession.TotalParts = len(deserializedMultipartSession.Parts)
-	fs.rwLock.Lock()
-	fs.multiparts.ActiveSession[object] = deserializedMultipartSession
-	fs.rwLock.Unlock()
 
 	// Sort by part number before saving.
 	sort.Sort(partNumber(deserializedMultipartSession.Parts))
+	fs.rwLock.Lock()
+	fs.multiparts.ActiveSession[uploadID] = deserializedMultipartSession
 	if err := saveMultipartsSession(*fs.multiparts); err != nil {
+		fs.rwLock.Unlock()
 		return "", err.Trace(partPathPrefix)
 	}
+	fs.rwLock.Unlock()
 	return partMetadata.ETag, nil
 }
@@ -420,7 +421,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
 	}
 
 	objectPath := filepath.Join(bucketPath, object)
-	file, e := atomic.FileCreateWithPrefix(objectPath, "")
+	file, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject")
 	if e != nil {
 		return ObjectMetadata{}, probe.NewError(e)
 	}
@@ -459,7 +460,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
 	parts := completeMultipartUpload.Part
 	fs.rwLock.RLock()
-	savedParts := fs.multiparts.ActiveSession[object].Parts
+	savedParts := fs.multiparts.ActiveSession[uploadID].Parts
 	fs.rwLock.RUnlock()
 
 	if !doPartsMatch(parts, savedParts) {
@@ -476,13 +477,14 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
 	removeParts(partPathPrefix, savedParts)
 
 	fs.rwLock.Lock()
-	delete(fs.multiparts.ActiveSession, object)
-	fs.rwLock.Unlock()
+	delete(fs.multiparts.ActiveSession, uploadID)
 	if err := saveMultipartsSession(*fs.multiparts); err != nil {
+		fs.rwLock.Unlock()
 		file.CloseAndPurge()
 		return ObjectMetadata{}, err.Trace(partPathPrefix)
 	}
+	fs.rwLock.Unlock()
 	file.Close()
 
 	st, e := os.Stat(objectPath)
@@ -519,9 +521,12 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso
 		return ObjectResourcesMetadata{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
 	}
 
+	// Save upload id.
+	uploadID := resources.UploadID
+
 	// Verify if upload id is valid for incoming object.
-	if !fs.isValidUploadID(object, resources.UploadID) {
-		return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID})
+	if !fs.isValidUploadID(object, uploadID) {
+		return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
 	}
 
 	objectResourcesMetadata := resources
@@ -546,7 +551,7 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso
 	}
 
 	fs.rwLock.RLock()
-	deserializedMultipartSession, ok := fs.multiparts.ActiveSession[object]
+	deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID]
 	fs.rwLock.RUnlock()
 	if !ok {
 		return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID})
@@ -596,7 +601,7 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob
 	objectPath := filepath.Join(bucketPath, object)
 	partPathPrefix := objectPath + uploadID
 
 	fs.rwLock.RLock()
-	savedParts := fs.multiparts.ActiveSession[object].Parts
+	savedParts := fs.multiparts.ActiveSession[uploadID].Parts
 	fs.rwLock.RUnlock()
 
 	if err := removeParts(partPathPrefix, savedParts); err != nil {
@@ -604,10 +609,11 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob
 	}
 
 	fs.rwLock.Lock()
-	delete(fs.multiparts.ActiveSession, object)
-	fs.rwLock.Unlock()
+	delete(fs.multiparts.ActiveSession, uploadID)
 	if err := saveMultipartsSession(*fs.multiparts); err != nil {
+		fs.rwLock.Unlock()
 		return err.Trace(partPathPrefix)
 	}
+	fs.rwLock.Unlock()
 	return nil
 }

pkg/fs/fs-object.go

@@ -237,7 +237,7 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
 	}
 
 	// Write object.
-	file, e := atomic.FileCreateWithPrefix(objectPath, "")
+	file, e := atomic.FileCreateWithPrefix(objectPath, "$tmpobject")
 	if e != nil {
 		switch e := e.(type) {
 		case *os.PathError:

pkg/fs/fs.go

@@ -46,6 +46,7 @@ type Buckets struct {
 // MultipartSession holds active session information
 type MultipartSession struct {
 	TotalParts int
+	ObjectName string
 	UploadID   string
 	Initiated  time.Time
 	Parts      []PartMetadata

server_fs_test.go

@@ -18,6 +18,7 @@ package main
 
 import (
 	"bytes"
+	"crypto/md5"
 	"io"
 	"io/ioutil"
 	"os"
@@ -25,6 +26,7 @@ import (
 	"strings"
 	"time"
 
+	"encoding/base64"
 	"encoding/hex"
 	"encoding/xml"
 	"net/http"
@@ -1075,8 +1077,13 @@ func (s *MyAPIFSCacheSuite) TestObjectMultipart(c *C) {
 	c.Assert(len(newResponse.UploadID) > 0, Equals, true)
 	uploadID := newResponse.UploadID
 
+	hasher := md5.New()
+	hasher.Write([]byte("hello world"))
+	md5Sum := hasher.Sum(nil)
+
 	buffer1 := bytes.NewReader([]byte("hello world"))
 	request, err = s.newRequest("PUT", testAPIFSCacheServer.URL+"/objectmultiparts/object?uploadId="+uploadID+"&partNumber=1", int64(buffer1.Len()), buffer1)
+	request.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(md5Sum))
 	c.Assert(err, IsNil)
 
 	client = http.Client{}
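
The two Content-MD5 lines added in this test are easy to get wrong: the header carries the base64 encoding of the raw 16-byte digest, while ETags use the hex encoding of the same bytes. A standalone sketch of the distinction for the test payload "hello world":

package main

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/hex"
	"fmt"
)

func main() {
	sum := md5.Sum([]byte("hello world"))
	// ETag form: hex of the digest.
	fmt.Println(hex.EncodeToString(sum[:])) // 5eb63bbbe01eeed093cb22bb8f5acdc3
	// Content-MD5 form: base64 of the same raw bytes.
	fmt.Println(base64.StdEncoding.EncodeToString(sum[:])) // XrY7u+Ae7tCTyyK7j1rNww==
}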
@@ -1086,6 +1093,7 @@ func (s *MyAPIFSCacheSuite) TestObjectMultipart(c *C) {
 
 	buffer2 := bytes.NewReader([]byte("hello world"))
 	request, err = s.newRequest("PUT", testAPIFSCacheServer.URL+"/objectmultiparts/object?uploadId="+uploadID+"&partNumber=2", int64(buffer2.Len()), buffer2)
+	request.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(md5Sum))
 	c.Assert(err, IsNil)
 
 	client = http.Client{}
@@ -1093,7 +1101,7 @@ func (s *MyAPIFSCacheSuite) TestObjectMultipart(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(response2.StatusCode, Equals, http.StatusOK)
 
-	// complete multipart upload
+	// Complete multipart upload
 	completeUploads := &fs.CompleteMultipartUpload{
 		Part: []fs.CompletePart{
 			{
