From d86ba5f33608acad407635be3e395dfe23fad11e Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 31 May 2015 01:17:28 -0700 Subject: [PATCH] Implement multipart support - implemented NewMultipartUpload() - implemented CreateObjectPart() - implemented ListObjectParts() - implemented AbortMultipartUpload() - implemented CompleteMultipartUpload() --- pkg/storage/drivers/api_testsuite.go | 11 +- pkg/storage/drivers/fs/fs.go | 8 +- pkg/storage/drivers/fs/fs_multipart.go | 442 +++++++++++++++++- pkg/storage/drivers/fs/fs_object.go | 2 +- pkg/storage/drivers/fs/fs_test.go | 3 +- .../drivers/memory/memory_multipart.go | 1 + 6 files changed, 454 insertions(+), 13 deletions(-) diff --git a/pkg/storage/drivers/api_testsuite.go b/pkg/storage/drivers/api_testsuite.go index 5844ec955..7b6762d6a 100644 --- a/pkg/storage/drivers/api_testsuite.go +++ b/pkg/storage/drivers/api_testsuite.go @@ -48,6 +48,7 @@ func APITestSuite(c *check.C, create func() Driver) { testGetDirectoryReturnsObjectNotFound(c, create) testDefaultContentType(c, create) testMultipartObjectCreation(c, create) + testMultipartObjectAbort(c, create) } func testCreateBucket(c *check.C, create func() Driver) { @@ -59,7 +60,7 @@ func testCreateBucket(c *check.C, create func() Driver) { func testMultipartObjectCreation(c *check.C, create func() Driver) { drivers := create() switch { - case reflect.TypeOf(drivers).String() != "*memory.memoryDriver": + case reflect.TypeOf(drivers).String() == "*donut.donutDriver": return } err := drivers.CreateBucket("bucket", "") @@ -68,6 +69,7 @@ func testMultipartObjectCreation(c *check.C, create func() Driver) { c.Assert(err, check.IsNil) parts := make(map[int]string) + finalHasher := md5.New() for i := 1; i <= 10; i++ { randomPerm := rand.Perm(10) randomString := "" @@ -76,6 +78,7 @@ func testMultipartObjectCreation(c *check.C, create func() Driver) { } hasher := md5.New() + finalHasher.Write([]byte(randomString)) hasher.Write([]byte(randomString)) expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil)) @@ -86,14 +89,16 @@ func testMultipartObjectCreation(c *check.C, create func() Driver) { c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex) parts[i] = calculatedmd5sum } - _, err = drivers.CompleteMultipartUpload("bucket", "key", uploadID, parts) + finalExpectedmd5SumHex := hex.EncodeToString(finalHasher.Sum(nil)) + calculatedFinalmd5Sum, err := drivers.CompleteMultipartUpload("bucket", "key", uploadID, parts) c.Assert(err, check.IsNil) + c.Assert(calculatedFinalmd5Sum, check.Equals, finalExpectedmd5SumHex) } func testMultipartObjectAbort(c *check.C, create func() Driver) { drivers := create() switch { - case reflect.TypeOf(drivers).String() != "*memory.memoryDriver": + case reflect.TypeOf(drivers).String() == "*donut.donutDriver": return } err := drivers.CreateBucket("bucket", "") diff --git a/pkg/storage/drivers/fs/fs.go b/pkg/storage/drivers/fs/fs.go index cec41b651..dfcf7ec10 100644 --- a/pkg/storage/drivers/fs/fs.go +++ b/pkg/storage/drivers/fs/fs.go @@ -24,8 +24,9 @@ import ( ) type fsDriver struct { - root string - lock *sync.Mutex + root string + lock *sync.Mutex + multiparts *Multiparts } // Start filesystem channel @@ -35,6 +36,9 @@ func Start(root string) (chan<- string, <-chan error, drivers.Driver) { fs := new(fsDriver) fs.root = root fs.lock = new(sync.Mutex) + // internal related to multiparts + fs.multiparts = new(Multiparts) + fs.multiparts.ActiveSession = make(map[string]*MultipartSession) go start(ctrlChannel, errorChannel, fs) return ctrlChannel, errorChannel, fs } diff --git a/pkg/storage/drivers/fs/fs_multipart.go b/pkg/storage/drivers/fs/fs_multipart.go index c2bb4d17b..e73ddffef 100644 --- a/pkg/storage/drivers/fs/fs_multipart.go +++ b/pkg/storage/drivers/fs/fs_multipart.go @@ -1,33 +1,465 @@ package filesystem import ( + "bytes" + "crypto/md5" + "crypto/sha512" + "encoding/base64" + "encoding/gob" + "encoding/hex" "errors" + "fmt" "io" + "io/ioutil" + "math/rand" + "os" + "path" + "sort" + "strconv" + "strings" + "time" "github.com/minio/minio/pkg/iodine" "github.com/minio/minio/pkg/storage/drivers" ) +// MultipartSession holds active session information +type MultipartSession struct { + TotalParts int + UploadID string + Initiated time.Time + Parts []*drivers.PartMetadata +} + +// Multiparts collection of many parts +type Multiparts struct { + ActiveSession map[string]*MultipartSession +} + func (fs *fsDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) { + fs.lock.Lock() + defer fs.lock.Unlock() return drivers.BucketMultipartResourcesMetadata{}, iodine.New(errors.New("Not Implemented"), nil) } func (fs *fsDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) { - return "", iodine.New(errors.New("Not Implemented"), nil) + fs.lock.Lock() + defer fs.lock.Unlock() + + if !drivers.IsValidBucket(bucket) { + return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) + } + if !drivers.IsValidObjectName(key) { + return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) + } + + bucketPath := path.Join(fs.root, bucket) + _, err := os.Stat(bucketPath) + + // check bucket exists + if os.IsNotExist(err) { + return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + if err != nil { + return "", iodine.New(drivers.InternalError{}, nil) + } + + objectPath := path.Join(bucketPath, key) + objectDir := path.Dir(objectPath) + if _, err := os.Stat(objectDir); os.IsNotExist(err) { + err = os.MkdirAll(objectDir, 0700) + if err != nil { + return "", iodine.New(err, nil) + } + } + + // check if object exists + if _, err := os.Stat(objectPath); !os.IsNotExist(err) { + return "", iodine.New(drivers.ObjectExists{ + Bucket: bucket, + Object: key, + }, nil) + } + + id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String()) + uploadIDSum := sha512.Sum512(id) + uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47] + + file, err := os.OpenFile(objectPath+"$multiparts", os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return "", iodine.New(err, nil) + } + defer file.Close() + + mpartSession := new(MultipartSession) + mpartSession.TotalParts = 0 + mpartSession.UploadID = uploadID + mpartSession.Initiated = time.Now() + var parts []*drivers.PartMetadata + mpartSession.Parts = parts + fs.multiparts.ActiveSession[uploadID] = mpartSession + + // serialize metadata to gob + encoder := gob.NewEncoder(file) + err = encoder.Encode(mpartSession) + if err != nil { + return "", iodine.New(err, nil) + } + return uploadID, nil +} + +func (fs *fsDriver) isValidUploadID(uploadID string) bool { + _, ok := fs.multiparts.ActiveSession[uploadID] + return ok } +func (fs *fsDriver) writePart(objectPath string, partID int, size int64, data io.Reader) (drivers.PartMetadata, error) { + partPath := objectPath + fmt.Sprintf("$%d", partID) + // write part + partFile, err := os.OpenFile(partPath, os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return drivers.PartMetadata{}, iodine.New(err, nil) + } + defer partFile.Close() + + h := md5.New() + mw := io.MultiWriter(partFile, h) + + _, err = io.CopyN(mw, data, size) + if err != nil { + return drivers.PartMetadata{}, iodine.New(err, nil) + } + + fi, err := os.Stat(partPath) + if err != nil { + return drivers.PartMetadata{}, iodine.New(err, nil) + } + partMetadata := drivers.PartMetadata{} + partMetadata.ETag = hex.EncodeToString(h.Sum(nil)) + partMetadata.PartNumber = partID + partMetadata.Size = fi.Size() + partMetadata.LastModified = fi.ModTime() + return partMetadata, nil +} + +// partNumber is a sortable interface for Part slice +type partNumber []*drivers.PartMetadata + +func (a partNumber) Len() int { return len(a) } +func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber } + func (fs *fsDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { - return "", iodine.New(errors.New("Not Implemented"), nil) + fs.lock.Lock() + defer fs.lock.Unlock() + + if partID <= 0 { + return "", iodine.New(errors.New("invalid part id, cannot be zero or less than zero"), nil) + } + // check bucket name valid + if drivers.IsValidBucket(bucket) == false { + return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) + } + + // verify object path legal + if drivers.IsValidObjectName(key) == false { + return "", iodine.New(drivers.ObjectNameInvalid{Bucket: bucket, Object: key}, nil) + } + + if !fs.isValidUploadID(uploadID) { + return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) + } + + if strings.TrimSpace(expectedMD5Sum) != "" { + expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) + if err != nil { + // pro-actively close the connection + return "", iodine.New(drivers.InvalidDigest{Md5: expectedMD5Sum}, nil) + } + expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) + } + + bucketPath := path.Join(fs.root, bucket) + _, err := os.Stat(bucketPath) + + // check bucket exists + if os.IsNotExist(err) { + return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + if err != nil { + return "", iodine.New(drivers.InternalError{}, nil) + } + + objectPath := path.Join(bucketPath, key) + objectDir := path.Dir(objectPath) + if _, err := os.Stat(objectDir); os.IsNotExist(err) { + err = os.MkdirAll(objectDir, 0700) + if err != nil { + return "", iodine.New(err, nil) + } + } + + // check if object exists + if _, err := os.Stat(objectPath); !os.IsNotExist(err) { + return "", iodine.New(drivers.ObjectExists{ + Bucket: bucket, + Object: key, + }, nil) + } + partMetadata, err := fs.writePart(objectPath, partID, size, data) + if err != nil { + return "", iodine.New(err, nil) + } + + // Verify if the written object is equal to what is expected, only if it is requested as such + if strings.TrimSpace(expectedMD5Sum) != "" { + if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), partMetadata.ETag); err != nil { + return "", iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil) + } + } + + multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_RDWR, 0600) + if err != nil { + return "", iodine.New(err, nil) + } + defer multiPartfile.Close() + + var deserializedMultipartSession MultipartSession + decoder := gob.NewDecoder(multiPartfile) + err = decoder.Decode(&deserializedMultipartSession) + if err != nil { + return "", iodine.New(err, nil) + } + deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, &partMetadata) + deserializedMultipartSession.TotalParts++ + fs.multiparts.ActiveSession[uploadID] = &deserializedMultipartSession + + sort.Sort(partNumber(deserializedMultipartSession.Parts)) + encoder := gob.NewEncoder(multiPartfile) + err = encoder.Encode(&deserializedMultipartSession) + if err != nil { + return "", iodine.New(err, nil) + } + return partMetadata.ETag, nil } func (fs *fsDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { - return "", iodine.New(errors.New("Not Implemented"), nil) + fs.lock.Lock() + defer fs.lock.Unlock() + + // check bucket name valid + if drivers.IsValidBucket(bucket) == false { + return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) + } + + // verify object path legal + if drivers.IsValidObjectName(key) == false { + return "", iodine.New(drivers.ObjectNameInvalid{Bucket: bucket, Object: key}, nil) + } + + if !fs.isValidUploadID(uploadID) { + return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) + } + + bucketPath := path.Join(fs.root, bucket) + _, err := os.Stat(bucketPath) + // check bucket exists + if os.IsNotExist(err) { + return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + if err != nil { + return "", iodine.New(drivers.InternalError{}, nil) + } + + objectPath := path.Join(bucketPath, key) + // check if object exists + if _, err := os.Stat(objectPath); !os.IsNotExist(err) { + return "", iodine.New(drivers.ObjectExists{ + Bucket: bucket, + Object: key, + }, nil) + } + + file, err := os.OpenFile(objectPath, os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return "", iodine.New(err, nil) + } + defer file.Close() + h := md5.New() + mw := io.MultiWriter(file, h) + + for i := 1; i <= len(parts); i++ { + recvMD5 := parts[i] + partFile, err := os.OpenFile(objectPath+fmt.Sprintf("$%d", i), os.O_RDONLY, 0600) + if err != nil { + return "", iodine.New(err, nil) + } + obj, err := ioutil.ReadAll(partFile) + if err != nil { + return "", iodine.New(err, nil) + } + calcMD5Bytes := md5.Sum(obj) + // complete multi part request header md5sum per part is hex encoded + recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\"")) + if err != nil { + return "", iodine.New(drivers.InvalidDigest{Md5: recvMD5}, nil) + } + if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) { + return "", iodine.New(drivers.BadDigest{Md5: recvMD5, Bucket: bucket, Key: key}, nil) + } + _, err = io.Copy(mw, bytes.NewBuffer(obj)) + if err != nil { + return "", iodine.New(err, nil) + } + } + md5sum := hex.EncodeToString(h.Sum(nil)) + + multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_RDWR, 0600) + if err != nil { + return "", iodine.New(err, nil) + } + var deserializedMultipartSession MultipartSession + decoder := gob.NewDecoder(multiPartfile) + err = decoder.Decode(&deserializedMultipartSession) + if err != nil { + return "", iodine.New(err, nil) + } + multiPartfile.Close() // close it right here, since we will delete it subsequently + + delete(fs.multiparts.ActiveSession, uploadID) + for _, part := range deserializedMultipartSession.Parts { + err = os.RemoveAll(objectPath + fmt.Sprintf("$%d", part.PartNumber)) + if err != nil { + return "", iodine.New(err, nil) + } + } + err = os.RemoveAll(objectPath + "$multiparts") + if err != nil { + return "", iodine.New(err, nil) + } + return md5sum, nil } func (fs *fsDriver) ListObjectParts(bucket, key string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) { - return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("Not Implemented"), nil) + fs.lock.Lock() + defer fs.lock.Unlock() + + // check bucket name valid + if drivers.IsValidBucket(bucket) == false { + return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) + } + + // verify object path legal + if drivers.IsValidObjectName(key) == false { + return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.ObjectNameInvalid{Bucket: bucket, Object: key}, nil) + } + + if !fs.isValidUploadID(resources.UploadID) { + return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: resources.UploadID}, nil) + } + + objectResourcesMetadata := resources + objectResourcesMetadata.Bucket = bucket + objectResourcesMetadata.Key = key + var startPartNumber int + switch { + case objectResourcesMetadata.PartNumberMarker == 0: + startPartNumber = 1 + default: + startPartNumber = objectResourcesMetadata.PartNumberMarker + } + + bucketPath := path.Join(fs.root, bucket) + _, err := os.Stat(bucketPath) + // check bucket exists + if os.IsNotExist(err) { + return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + if err != nil { + return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InternalError{}, nil) + } + + objectPath := path.Join(bucketPath, key) + multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_RDWR, 0600) + if err != nil { + return drivers.ObjectResourcesMetadata{}, iodine.New(err, nil) + } + defer multiPartfile.Close() + + var deserializedMultipartSession MultipartSession + decoder := gob.NewDecoder(multiPartfile) + err = decoder.Decode(&deserializedMultipartSession) + if err != nil { + return drivers.ObjectResourcesMetadata{}, iodine.New(err, nil) + } + var parts []*drivers.PartMetadata + for i := startPartNumber; i <= deserializedMultipartSession.TotalParts; i++ { + if len(parts) > objectResourcesMetadata.MaxParts { + sort.Sort(partNumber(parts)) + objectResourcesMetadata.IsTruncated = true + objectResourcesMetadata.Part = parts + objectResourcesMetadata.NextPartNumberMarker = i + return objectResourcesMetadata, nil + } + parts = append(parts, deserializedMultipartSession.Parts[i-1]) + } + sort.Sort(partNumber(parts)) + objectResourcesMetadata.Part = parts + return objectResourcesMetadata, nil } func (fs *fsDriver) AbortMultipartUpload(bucket, key, uploadID string) error { - return iodine.New(errors.New("Not Implemented"), nil) + fs.lock.Lock() + defer fs.lock.Unlock() + + // check bucket name valid + if drivers.IsValidBucket(bucket) == false { + return iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) + } + + // verify object path legal + if drivers.IsValidObjectName(key) == false { + return iodine.New(drivers.ObjectNameInvalid{Bucket: bucket, Object: key}, nil) + } + + if !fs.isValidUploadID(uploadID) { + return iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) + } + + bucketPath := path.Join(fs.root, bucket) + _, err := os.Stat(bucketPath) + // check bucket exists + if os.IsNotExist(err) { + return iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + if err != nil { + return iodine.New(drivers.InternalError{}, nil) + } + + objectPath := path.Join(bucketPath, key) + multiPartfile, err := os.OpenFile(objectPath+"$multiparts", os.O_RDWR, 0600) + if err != nil { + return iodine.New(err, nil) + } + + var deserializedMultipartSession MultipartSession + decoder := gob.NewDecoder(multiPartfile) + err = decoder.Decode(&deserializedMultipartSession) + if err != nil { + return iodine.New(err, nil) + } + multiPartfile.Close() // close it right here, since we will delete it subsequently + + delete(fs.multiparts.ActiveSession, uploadID) + for _, part := range deserializedMultipartSession.Parts { + err = os.RemoveAll(objectPath + fmt.Sprintf("$%d", part.PartNumber)) + if err != nil { + return iodine.New(err, nil) + } + } + err = os.RemoveAll(objectPath + "$multiparts") + if err != nil { + return iodine.New(err, nil) + } + return nil } diff --git a/pkg/storage/drivers/fs/fs_object.go b/pkg/storage/drivers/fs/fs_object.go index 752babd7b..b599c33b2 100644 --- a/pkg/storage/drivers/fs/fs_object.go +++ b/pkg/storage/drivers/fs/fs_object.go @@ -266,7 +266,7 @@ func (fs *fsDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string h := md5.New() mw := io.MultiWriter(file, h) - _, err = io.Copy(mw, data) + _, err = io.CopyN(mw, data, size) if err != nil { return "", iodine.New(err, nil) } diff --git a/pkg/storage/drivers/fs/fs_test.go b/pkg/storage/drivers/fs/fs_test.go index 54ba11d6c..33cb66d31 100644 --- a/pkg/storage/drivers/fs/fs_test.go +++ b/pkg/storage/drivers/fs/fs_test.go @@ -34,7 +34,6 @@ var _ = Suite(&MySuite{}) func (s *MySuite) TestAPISuite(c *C) { var storageList []string - defer removeRoots(c, storageList) create := func() drivers.Driver { path, err := ioutil.TempDir(os.TempDir(), "minio-fs-") c.Check(err, IsNil) @@ -43,7 +42,7 @@ func (s *MySuite) TestAPISuite(c *C) { return store } drivers.APITestSuite(c, create) - + defer removeRoots(c, storageList) } func removeRoots(c *C, roots []string) { diff --git a/pkg/storage/drivers/memory/memory_multipart.go b/pkg/storage/drivers/memory/memory_multipart.go index de8d26ebf..a9145bf9f 100644 --- a/pkg/storage/drivers/memory/memory_multipart.go +++ b/pkg/storage/drivers/memory/memory_multipart.go @@ -263,6 +263,7 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string memory.lock.Unlock() md5sumSlice := md5.Sum(fullObject.Bytes()) + // this is needed for final verification inside CreateObject, do not convert this to hex md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:]) etag, err := memory.CreateObject(bucket, key, "", md5sum, size, &fullObject) if err != nil {