diff --git a/commands.go b/commands.go
index c9b9ccbec..27cd5efa2 100644
--- a/commands.go
+++ b/commands.go
@@ -135,7 +135,7 @@ func runDonut(c *cli.Context) {
 		Fatalln("Path must be set")
 	}
 	apiServerConfig := getAPIServerConfig(c)
-	donutDriver := server.DonutFactory{
+	donutDriver := server.Factory{
 		Config:     apiServerConfig,
 		Paths:      paths,
 		MaxMemory:  maxMemory,
diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go
index bcc762d5a..e53a1cbe6 100644
--- a/pkg/api/api_test.go
+++ b/pkg/api/api_test.go
@@ -56,19 +56,13 @@ var _ = Suite(&MySuite{
 	},
 })
 
-var _ = Suite(&MySuite{
-	initDriver: func() (drivers.Driver, string) {
-		driver, _ := cache.NewDriver(10000, 3*time.Hour)
-		return driver, ""
-	},
-})
-
 var _ = Suite(&MySuite{
 	initDriver: func() (drivers.Driver, string) {
 		root, _ := ioutil.TempDir(os.TempDir(), "minio-api")
 		var roots []string
 		roots = append(roots, root)
-		driver, _ := donut.NewDriver(roots, 10000, 3*time.Hour)
+		driver, _ := donut.NewDriver(roots)
+		driver, _ = cache.NewDriver(10000, 3*time.Hour, driver)
 		return driver, root
 	},
 })
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 757f8eb43..1913ce66a 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -26,6 +26,8 @@ import (
 	"github.com/minio/minio/pkg/api/web"
 	"github.com/minio/minio/pkg/iodine"
 	"github.com/minio/minio/pkg/server/httpserver"
+	"github.com/minio/minio/pkg/storage/drivers"
+	"github.com/minio/minio/pkg/storage/drivers/cache"
 	"github.com/minio/minio/pkg/storage/drivers/donut"
 	"github.com/minio/minio/pkg/utils/log"
 )
@@ -43,22 +45,30 @@ func (f WebFactory) GetStartServerFunc() StartServerFunc {
 	}
 }
 
-// DonutFactory is used to build donut api server
-type DonutFactory struct {
+// Factory is used to build api server
+type Factory struct {
 	httpserver.Config
 	Paths      []string
 	MaxMemory  uint64
 	Expiration time.Duration
 }
 
-// GetStartServerFunc DonutFactory builds donut api server
-func (f DonutFactory) GetStartServerFunc() StartServerFunc {
+// GetStartServerFunc Factory builds api server
+func (f Factory) GetStartServerFunc() StartServerFunc {
 	return func() (chan<- string, <-chan error) {
-		driver, err := donut.NewDriver(f.Paths, f.MaxMemory, f.Expiration)
-		if err != nil {
-			log.Fatalln(err)
-		}
 		conf := api.Config{RateLimit: f.RateLimit}
+		var driver drivers.Driver
+		var err error
+		if len(f.Paths) != 0 {
+			driver, err = donut.NewDriver(f.Paths)
+			if err != nil {
+				log.Fatalln(err)
+			}
+			driver, err = cache.NewDriver(f.MaxMemory, f.Expiration, driver)
+			if err != nil {
+				log.Fatalln(err)
+			}
+		}
 		conf.SetDriver(driver)
 		ctrl, status, _ := httpserver.Start(api.HTTPHandler(conf), f.Config)
 		return ctrl, status
diff --git a/pkg/storage/drivers/api_testsuite.go b/pkg/storage/drivers/api_testsuite.go
index a8d7fb899..b20cae176 100644
--- a/pkg/storage/drivers/api_testsuite.go
+++ b/pkg/storage/drivers/api_testsuite.go
@@ -22,6 +22,7 @@ import (
 	"encoding/base64"
 	"encoding/hex"
 	"math/rand"
+	"reflect"
 	"strconv"
 	"time"
 
@@ -58,6 +59,10 @@ func testCreateBucket(c *check.C, create func() Driver) {
 
 func testMultipartObjectCreation(c *check.C, create func() Driver) {
 	drivers := create()
+	switch {
+	case reflect.TypeOf(drivers).String() == "*donut.donutDriver":
+		return
+	}
 	err := drivers.CreateBucket("bucket", "")
 	c.Assert(err, check.IsNil)
 	uploadID, err := drivers.NewMultipartUpload("bucket", "key", "")
@@ -92,6 +97,10 @@ func testMultipartObjectCreation(c *check.C, create func() Driver) {
 
 func testMultipartObjectAbort(c *check.C, create func() Driver) {
 	drivers := create()
+	switch {
+	case reflect.TypeOf(drivers).String() == "*donut.donutDriver":
+		return
+	}
 	err := drivers.CreateBucket("bucket", "")
 	c.Assert(err, check.IsNil)
 	uploadID, err := drivers.NewMultipartUpload("bucket", "key", "")
diff --git a/pkg/storage/drivers/cache/cache.go b/pkg/storage/drivers/cache/cache.go
index 527babced..4b4bb21ce 100644
--- a/pkg/storage/drivers/cache/cache.go
+++ b/pkg/storage/drivers/cache/cache.go
@@ -45,8 +45,12 @@ type cacheDriver struct {
 	multiPartObjects *trove.Cache
 	maxSize          uint64
 	expiration       time.Duration
+
+	// stacked driver that cache misses and writes fall through to
+	driver drivers.Driver
 }
 
+// storedBucket is the in-memory representation of a bucket and its metadata
 type storedBucket struct {
 	bucketMetadata   drivers.BucketMetadata
 	objectMetadata   map[string]drivers.ObjectMetadata
@@ -54,23 +58,44 @@ type storedBucket struct {
 	multiPartSession map[string]multiPartSession
 }
 
+// multiPartSession tracks the state of an in-progress multipart upload
type multiPartSession struct {
 	totalParts int
 	uploadID   string
 	initiated  time.Time
 }
 
+// totalBuckets is the maximum number of buckets allowed
 const (
 	totalBuckets = 100
 )
 
+type proxyWriter struct {
+	writer       io.Writer
+	writtenBytes []byte
+}
+
+func (r *proxyWriter) Write(p []byte) (n int, err error) {
+	n, err = r.writer.Write(p)
+	if err != nil {
+		return
+	}
+	r.writtenBytes = append(r.writtenBytes, p[0:n]...)
+	return
+}
+
+func newProxyWriter(w io.Writer) *proxyWriter {
+	return &proxyWriter{writer: w, writtenBytes: nil}
+}
+
 // NewDriver instantiate a new cache driver
-func NewDriver(maxSize uint64, expiration time.Duration) (drivers.Driver, error) {
+func NewDriver(maxSize uint64, expiration time.Duration, driver drivers.Driver) (drivers.Driver, error) {
 	cache := new(cacheDriver)
 	cache.storedBuckets = make(map[string]storedBucket)
-	cache.objects = trove.NewCache(maxSize, expiration)
 	cache.maxSize = maxSize
 	cache.expiration = expiration
+	cache.objects = trove.NewCache(maxSize, expiration)
 	cache.multiPartObjects = trove.NewCache(0, time.Duration(0))
 	cache.lock = new(sync.RWMutex)
+	cache.driver = driver
 
@@ -85,27 +110,29 @@ func NewDriver(maxSize uint64, expiration time.Duration) (drivers.Driver, error)
 // GetObject - GET object from cache buffer
 func (cache *cacheDriver) GetObject(w io.Writer, bucket string, object string) (int64, error) {
 	cache.lock.RLock()
+	defer cache.lock.RUnlock()
 	if !drivers.IsValidBucket(bucket) {
-		cache.lock.RUnlock()
 		return 0, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
 	}
 	if !drivers.IsValidObjectName(object) {
-		cache.lock.RUnlock()
 		return 0, iodine.New(drivers.ObjectNameInvalid{Object: object}, nil)
 	}
 	if _, ok := cache.storedBuckets[bucket]; ok == false {
-		cache.lock.RUnlock()
 		return 0, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
 	}
 	objectKey := bucket + "/" + object
 	data, ok := cache.objects.Get(objectKey)
 	if !ok {
-		cache.lock.RUnlock()
+		if cache.driver != nil {
+			return cache.driver.GetObject(w, bucket, object)
+		}
 		return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil)
 	}
 	written, err := io.Copy(w, bytes.NewBuffer(data))
-	cache.lock.RUnlock()
-	return written, iodine.New(err, nil)
+	if err != nil {
+		return 0, iodine.New(err, nil)
+	}
+	return written, nil
 }
 
 // GetPartialObject - GET object from cache buffer range
@@ -117,12 +144,11 @@ func (cache *cacheDriver) GetPartialObject(w io.Writer, bucket, object string, s
 		"length": strconv.FormatInt(length, 10),
 	}
 	cache.lock.RLock()
+	defer cache.lock.RUnlock()
 	if !drivers.IsValidBucket(bucket) {
-		cache.lock.RUnlock()
 		return 0, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, errParams)
 	}
 	if !drivers.IsValidObjectName(object) {
-		cache.lock.RUnlock()
 		return 0, iodine.New(drivers.ObjectNameInvalid{Object: object}, errParams)
 	}
 	if start < 0 {
@@ -134,24 +160,44 @@ func (cache *cacheDriver) GetPartialObject(w io.Writer, bucket, object string, s
 	objectKey := bucket + "/" + object
 	data, ok := cache.objects.Get(objectKey)
 	if !ok {
-		cache.lock.RUnlock()
-		return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, errParams)
+		if cache.driver != nil {
+			return cache.driver.GetPartialObject(w, bucket, object, start, length)
+		}
+		return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, errParams)
 	}
 	written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length)
-	cache.lock.RUnlock()
-	return written, iodine.New(err, nil)
+	if err != nil {
+		return 0, iodine.New(err, nil)
+	}
+	return written, nil
 }
 
 // GetBucketMetadata -
 func (cache *cacheDriver) GetBucketMetadata(bucket string) (drivers.BucketMetadata, error) {
 	cache.lock.RLock()
-	defer cache.lock.RUnlock()
 	if !drivers.IsValidBucket(bucket) {
+		cache.lock.RUnlock()
 		return drivers.BucketMetadata{}, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
 	}
 	if _, ok := cache.storedBuckets[bucket]; ok == false {
-		return drivers.BucketMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
+		if cache.driver == nil {
+			cache.lock.RUnlock()
+			return drivers.BucketMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
+		}
+		bucketMetadata, err := cache.driver.GetBucketMetadata(bucket)
+		if err != nil {
+			cache.lock.RUnlock()
+			return drivers.BucketMetadata{}, iodine.New(err, nil)
+		}
+		storedBucket := cache.storedBuckets[bucket]
+		cache.lock.RUnlock()
+		cache.lock.Lock()
+		storedBucket.bucketMetadata = bucketMetadata
+		cache.storedBuckets[bucket] = storedBucket
+		cache.lock.Unlock()
+		return bucketMetadata, nil
 	}
+	cache.lock.RUnlock()
 	return cache.storedBuckets[bucket].bucketMetadata, nil
 }
@@ -171,10 +217,16 @@ func (cache *cacheDriver) SetBucketMetadata(bucket, acl string) error {
 	}
 	cache.lock.RUnlock()
 	cache.lock.Lock()
-	defer cache.lock.Unlock()
+	if cache.driver != nil {
+		if err := cache.driver.SetBucketMetadata(bucket, acl); err != nil {
+			cache.lock.Unlock()
+			return iodine.New(err, nil)
+		}
+	}
 	storedBucket := cache.storedBuckets[bucket]
 	storedBucket.bucketMetadata.ACL = drivers.BucketACL(acl)
 	cache.storedBuckets[bucket] = storedBucket
+	cache.lock.Unlock()
 	return nil
 }
@@ -332,6 +384,11 @@ func (cache *cacheDriver) CreateBucket(bucketName, acl string) error {
 		// default is private
 		acl = "private"
 	}
+	if cache.driver != nil {
+		if err := cache.driver.CreateBucket(bucketName, acl); err != nil {
+			return iodine.New(err, nil)
+		}
+	}
 	var newBucket = storedBucket{}
 	newBucket.objectMetadata = make(map[string]drivers.ObjectMetadata)
 	newBucket.multiPartSession = make(map[string]multiPartSession)
@@ -475,22 +532,38 @@ func (cache *cacheDriver) ListBuckets() ([]drivers.BucketMetadata, error) {
 
 // GetObjectMetadata - get object metadata from cache
 func (cache *cacheDriver) GetObjectMetadata(bucket, key string) (drivers.ObjectMetadata, error) {
 	cache.lock.RLock()
-	defer cache.lock.RUnlock()
 	// check if bucket exists
 	if !drivers.IsValidBucket(bucket) {
+		cache.lock.RUnlock()
 		return drivers.ObjectMetadata{}, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
 	}
 	if !drivers.IsValidObjectName(key) {
+		cache.lock.RUnlock()
 		return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: key}, nil)
 	}
 	if _, ok := cache.storedBuckets[bucket]; ok == false {
+		cache.lock.RUnlock()
 		return drivers.ObjectMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
 	}
 	storedBucket := cache.storedBuckets[bucket]
 	objectKey := bucket + "/" + key
-	if object, ok := storedBucket.objectMetadata[objectKey]; ok == true {
-		return object, nil
+	if objMetadata, ok := storedBucket.objectMetadata[objectKey]; ok == true {
+		cache.lock.RUnlock()
+		return objMetadata, nil
+	}
+	if cache.driver != nil {
+		objMetadata, err := cache.driver.GetObjectMetadata(bucket, key)
+		cache.lock.RUnlock()
+		if err != nil {
+			return drivers.ObjectMetadata{}, iodine.New(err, nil)
+		}
+		// update
+		cache.lock.Lock()
+		storedBucket.objectMetadata[objectKey] = objMetadata
+		cache.lock.Unlock()
+		return objMetadata, nil
 	}
+	cache.lock.RUnlock()
 	return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil)
 }
diff --git a/pkg/storage/drivers/cache/cache_test.go b/pkg/storage/drivers/cache/cache_test.go
index 9a9ba25eb..30326b3f4 100644
--- a/pkg/storage/drivers/cache/cache_test.go
+++ b/pkg/storage/drivers/cache/cache_test.go
@@ -32,7 +32,8 @@ var _ = Suite(&MySuite{})
 
 func (s *MySuite) TestAPISuite(c *C) {
 	create := func() drivers.Driver {
-		store, err := NewDriver(1000000, 3*time.Hour)
+		var driver drivers.Driver
+		store, err := NewDriver(1000000, 3*time.Hour, driver)
 		c.Check(err, IsNil)
 		return store
 	}
diff --git a/pkg/storage/drivers/donut/donut-multipart.go b/pkg/storage/drivers/donut/donut-multipart.go
index 2fe388dc3..a418aea95 100644
--- a/pkg/storage/drivers/donut/donut-multipart.go
+++ b/pkg/storage/drivers/donut/donut-multipart.go
@@ -17,414 +17,31 @@
 package donut
 
 import (
-	"bytes"
-	"crypto/md5"
-	"crypto/sha512"
-	"encoding/base64"
-	"encoding/hex"
-	"errors"
 	"io"
-	"math/rand"
-	"runtime/debug"
-	"sort"
-	"strconv"
-	"strings"
-	"time"
 
 	"github.com/minio/minio/pkg/iodine"
 	"github.com/minio/minio/pkg/storage/drivers"
 )
 
-// isMD5SumEqual - returns error if md5sum mismatches, success its `nil`
-func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
-	if strings.TrimSpace(expectedMD5Sum) != "" && strings.TrimSpace(actualMD5Sum) != "" {
-		expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum)
-		if err != nil {
-			return iodine.New(err, nil)
-		}
-		actualMD5SumBytes, err := hex.DecodeString(actualMD5Sum)
-		if err != nil {
-			return iodine.New(err, nil)
-		}
-		if !bytes.Equal(expectedMD5SumBytes, actualMD5SumBytes) {
-			return iodine.New(errors.New("bad digest, md5sum mismatch"), nil)
-		}
-		return nil
-	}
-	return iodine.New(errors.New("invalid argument"), nil)
-}
-
 func (d donutDriver) NewMultipartUpload(bucketName, objectName, contentType string) (string, error) {
-	d.lock.RLock()
-	if !drivers.IsValidBucket(bucketName) {
-		d.lock.RUnlock()
-		return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil)
-	}
-	if !drivers.IsValidObjectName(objectName) {
-		d.lock.RUnlock()
-		return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil)
-	}
-	if _, ok := d.storedBuckets[bucketName]; ok == false {
-		d.lock.RUnlock()
-		return "", iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
-	}
-	storedBucket := d.storedBuckets[bucketName]
-	objectKey := bucketName + "/" + objectName
-	if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
-		d.lock.RUnlock()
-		return "", iodine.New(drivers.ObjectExists{Bucket: bucketName, Object: objectName}, nil)
-	}
-	d.lock.RUnlock()
-
-	d.lock.Lock()
-	id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucketName + objectName + time.Now().String())
-	uploadIDSum := sha512.Sum512(id)
-	uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
-
-	d.storedBuckets[bucketName].multiPartSession[objectName] = multiPartSession{
-		uploadID:   uploadID,
-		initiated:  time.Now().UTC(),
-		totalParts: 0,
-	}
-	d.lock.Unlock()
-	return uploadID, nil
+	return "", iodine.New(drivers.APINotImplemented{API: "NewMultipartUpload"}, nil)
 }
 
 func (d donutDriver) AbortMultipartUpload(bucketName, objectName, uploadID string) error {
-	d.lock.RLock()
-	storedBucket := d.storedBuckets[bucketName]
-	if storedBucket.multiPartSession[objectName].uploadID != uploadID {
-		d.lock.RUnlock()
-		return iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
-	}
-	d.lock.RUnlock()
-
-	d.cleanupMultiparts(bucketName, objectName, uploadID)
-	d.cleanupMultipartSession(bucketName, objectName, uploadID)
-	return nil
-}
-
-func getMultipartKey(key string, uploadID string, partNumber int) string {
-	return key + "?uploadId=" + uploadID + "&partNumber=" + strconv.Itoa(partNumber)
+	return iodine.New(drivers.APINotImplemented{API: "AbortMultipartUpload"}, nil)
 }
 
 func (d donutDriver) CreateObjectPart(bucketName, objectName, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
-	// Verify upload id
-	d.lock.RLock()
-	storedBucket := d.storedBuckets[bucketName]
-	if storedBucket.multiPartSession[objectName].uploadID != uploadID {
-		d.lock.RUnlock()
-		return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
-	}
-	d.lock.RUnlock()
-
-	etag, err := d.createObjectPart(bucketName, objectName, uploadID, partID, "", expectedMD5Sum, size, data)
-	if err != nil {
-		return "", iodine.New(err, nil)
-	}
-	// free
-	debug.FreeOSMemory()
-	return etag, nil
-}
-
-// createObject - PUT object to memory buffer
-func (d donutDriver) createObjectPart(bucketName, objectName, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
-	d.lock.RLock()
-	if !drivers.IsValidBucket(bucketName) {
-		d.lock.RUnlock()
-		return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil)
-	}
-	if !drivers.IsValidObjectName(objectName) {
-		d.lock.RUnlock()
-		return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil)
-	}
-	if _, ok := d.storedBuckets[bucketName]; ok == false {
-		d.lock.RUnlock()
-		return "", iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
-	}
-	storedBucket := d.storedBuckets[bucketName]
-	// get object key
-	partKey := bucketName + "/" + getMultipartKey(objectName, uploadID, partID)
-	if _, ok := storedBucket.partMetadata[partKey]; ok == true {
-		d.lock.RUnlock()
-		return storedBucket.partMetadata[partKey].ETag, nil
-	}
-	d.lock.RUnlock()
-
-	if contentType == "" {
-		contentType = "application/octet-stream"
-	}
-	contentType = strings.TrimSpace(contentType)
-	if strings.TrimSpace(expectedMD5Sum) != "" {
-		expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
-		if err != nil {
-			// pro-actively close the connection
-			return "", iodine.New(drivers.InvalidDigest{Md5: expectedMD5Sum}, nil)
-		}
-		expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
-	}
-
-	// calculate md5
-	hash := md5.New()
-	var readBytes []byte
-
-	var err error
-	var length int
-	for err == nil {
-		byteBuffer := make([]byte, 1024*1024)
-		length, err = data.Read(byteBuffer)
-		// While hash.Write() wouldn't mind a Nil byteBuffer
-		// It is necessary for us to verify this and break
-		if length == 0 {
-			break
-		}
-		hash.Write(byteBuffer[0:length])
-		readBytes = append(readBytes, byteBuffer[0:length]...)
-	}
-	if err != io.EOF {
-		return "", iodine.New(err, nil)
-	}
-	go debug.FreeOSMemory()
-	md5SumBytes := hash.Sum(nil)
-	totalLength := int64(len(readBytes))
-
-	d.lock.Lock()
-	d.multiPartObjects.Set(partKey, readBytes)
-	d.lock.Unlock()
-	// setting up for de-allocation
-	readBytes = nil
-	go debug.FreeOSMemory()
-
-	md5Sum := hex.EncodeToString(md5SumBytes)
-	// Verify if the written object is equal to what is expected, only if it is requested as such
-	if strings.TrimSpace(expectedMD5Sum) != "" {
-		if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
-			return "", iodine.New(drivers.BadDigest{
-				Md5:    expectedMD5Sum,
-				Bucket: bucketName,
-				Key:    objectName,
-			}, nil)
-		}
-	}
-	newPart := drivers.PartMetadata{
-		PartNumber:   partID,
-		LastModified: time.Now().UTC(),
-		ETag:         md5Sum,
-		Size:         totalLength,
-	}
-
-	d.lock.Lock()
-	storedBucket.partMetadata[partKey] = newPart
-	multiPartSession := storedBucket.multiPartSession[objectName]
-	multiPartSession.totalParts++
-	storedBucket.multiPartSession[objectName] = multiPartSession
-	d.storedBuckets[bucketName] = storedBucket
-	d.lock.Unlock()
-
-	return md5Sum, nil
-}
-
-func (d donutDriver) cleanupMultipartSession(bucketName, objectName, uploadID string) {
-	d.lock.Lock()
-	defer d.lock.Unlock()
-	delete(d.storedBuckets[bucketName].multiPartSession, objectName)
-}
-
-func (d donutDriver) cleanupMultiparts(bucketName, objectName, uploadID string) {
-	for i := 1; i <= d.storedBuckets[bucketName].multiPartSession[objectName].totalParts; i++ {
-		objectKey := bucketName + "/" + getMultipartKey(objectName, uploadID, i)
-		d.multiPartObjects.Delete(objectKey)
-	}
+	return "", iodine.New(drivers.APINotImplemented{API: "CreateObjectPart"}, nil)
 }
 
 func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID string, parts map[int]string) (string, error) {
-	if !drivers.IsValidBucket(bucketName) {
-		return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil)
-	}
-	if !drivers.IsValidObjectName(objectName) {
-		return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil)
-	}
-	// Verify upload id
-	d.lock.RLock()
-	if _, ok := d.storedBuckets[bucketName]; ok == false {
-		d.lock.RUnlock()
-		return "", iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
-	}
-	storedBucket := d.storedBuckets[bucketName]
-	if storedBucket.multiPartSession[objectName].uploadID != uploadID {
-		d.lock.RUnlock()
-		return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
-	}
-	d.lock.RUnlock()
-
-	d.lock.Lock()
-	var size int64
-	fullHasher := md5.New()
-	var fullObject bytes.Buffer
-	for i := 1; i <= len(parts); i++ {
-		recvMD5 := parts[i]
-		object, ok := d.multiPartObjects.Get(bucketName + "/" + getMultipartKey(objectName, uploadID, i))
-		if ok == false {
-			d.lock.Unlock()
-			return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
-		}
-		size += int64(len(object))
-		calcMD5Bytes := md5.Sum(object)
-		// complete multi part request header md5sum per part is hex encoded
-		recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
-		if err != nil {
-			return "", iodine.New(drivers.InvalidDigest{Md5: recvMD5}, nil)
-		}
-		if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
-			return "", iodine.New(drivers.BadDigest{
-				Md5:    recvMD5,
-				Bucket: bucketName,
-				Key:    getMultipartKey(objectName, uploadID, i),
-			}, nil)
-		}
-		mw := io.MultiWriter(&fullObject, fullHasher)
-		_, err = io.Copy(mw, bytes.NewReader(object))
-		if err != nil {
-			return "", iodine.New(err, nil)
-		}
-		object = nil
-		go debug.FreeOSMemory()
-	}
-	d.lock.Unlock()
-
-	md5sumSlice := fullHasher.Sum(nil)
-	// this is needed for final verification inside CreateObject, do not convert this to hex
-	md5sum := base64.StdEncoding.EncodeToString(md5sumSlice)
-	etag, err := d.CreateObject(bucketName, objectName, "", md5sum, size, &fullObject)
-	if err != nil {
-		// No need to call internal cleanup functions here, caller will call AbortMultipartUpload()
-		// which would in-turn cleanup properly in accordance with S3 Spec
-		return "", iodine.New(err, nil)
-	}
-	fullObject.Reset()
-	go debug.FreeOSMemory()
-
-	d.cleanupMultiparts(bucketName, objectName, uploadID)
-	d.cleanupMultipartSession(bucketName, objectName, uploadID)
-	return etag, nil
+	return "", iodine.New(drivers.APINotImplemented{API: "CompleteMultipartUpload"}, nil)
 }
 
-// byKey is a sortable interface for UploadMetadata slice
-type byKey []*drivers.UploadMetadata
-
-func (a byKey) Len() int           { return len(a) }
-func (a byKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
-func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
-
 func (d donutDriver) ListMultipartUploads(bucketName string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) {
-	d.lock.RLock()
-	defer d.lock.RUnlock()
-	if _, ok := d.storedBuckets[bucketName]; ok == false {
-		return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
-	}
-	storedBucket := d.storedBuckets[bucketName]
-	var uploads []*drivers.UploadMetadata
-
-	for key, session := range storedBucket.multiPartSession {
-		if strings.HasPrefix(key, resources.Prefix) {
-			if len(uploads) > resources.MaxUploads {
-				sort.Sort(byKey(uploads))
-				resources.Upload = uploads
-				resources.NextKeyMarker = key
-				resources.NextUploadIDMarker = session.uploadID
-				resources.IsTruncated = true
-				return resources, nil
-			}
-			// uploadIDMarker is ignored if KeyMarker is empty
-			switch {
-			case resources.KeyMarker != "" && resources.UploadIDMarker == "":
-				if key > resources.KeyMarker {
-					upload := new(drivers.UploadMetadata)
-					upload.Key = key
-					upload.UploadID = session.uploadID
-					upload.Initiated = session.initiated
-					uploads = append(uploads, upload)
-				}
-			case resources.KeyMarker != "" && resources.UploadIDMarker != "":
-				if session.uploadID > resources.UploadIDMarker {
-					if key >= resources.KeyMarker {
-						upload := new(drivers.UploadMetadata)
-						upload.Key = key
-						upload.UploadID = session.uploadID
-						upload.Initiated = session.initiated
-						uploads = append(uploads, upload)
-					}
-				}
-			default:
-				upload := new(drivers.UploadMetadata)
-				upload.Key = key
-				upload.UploadID = session.uploadID
-				upload.Initiated = session.initiated
-				uploads = append(uploads, upload)
-			}
-		}
-	}
-	sort.Sort(byKey(uploads))
-	resources.Upload = uploads
-	return resources, nil
+	return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.APINotImplemented{API: "ListMultipartUploads"}, nil)
 }
-
-// partNumber is a sortable interface for Part slice
-type partNumber []*drivers.PartMetadata
-
-func (a partNumber) Len() int           { return len(a) }
-func (a partNumber) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
-func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
-
 func (d donutDriver) ListObjectParts(bucketName, objectName string, resources drivers.ObjectResourcesMetadata) (drivers.ObjectResourcesMetadata, error) {
-	// Verify upload id
-	d.lock.RLock()
-	defer d.lock.RUnlock()
-	if _, ok := d.storedBuckets[bucketName]; ok ==
-	if _, ok := d.storedBuckets[bucketName]; ok == false {
-		return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
-	}
-	storedBucket := d.storedBuckets[bucketName]
-	if _, ok := storedBucket.multiPartSession[objectName]; ok == false {
-		return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucketName, Object: objectName}, nil)
-	}
-	if storedBucket.multiPartSession[objectName].uploadID != resources.UploadID {
-		return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: resources.UploadID}, nil)
-	}
-	objectResourcesMetadata := resources
-	objectResourcesMetadata.Bucket = bucketName
-	objectResourcesMetadata.Key = objectName
-	var parts []*drivers.PartMetadata
-	var startPartNumber int
-	switch {
-	case objectResourcesMetadata.PartNumberMarker == 0:
-		startPartNumber = 1
-	default:
-		startPartNumber = objectResourcesMetadata.PartNumberMarker
-	}
-	for i := startPartNumber; i <= storedBucket.multiPartSession[objectName].totalParts; i++ {
-		if len(parts) > objectResourcesMetadata.MaxParts {
-			sort.Sort(partNumber(parts))
-			objectResourcesMetadata.IsTruncated = true
-			objectResourcesMetadata.Part = parts
-			objectResourcesMetadata.NextPartNumberMarker = i
-			return objectResourcesMetadata, nil
-		}
-		part, ok := storedBucket.partMetadata[bucketName+"/"+getMultipartKey(objectName, resources.UploadID, i)]
-		if !ok {
-			return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
-		}
-		parts = append(parts, &part)
-	}
-	sort.Sort(partNumber(parts))
-	objectResourcesMetadata.Part = parts
-	return objectResourcesMetadata, nil
-}
-
-func (d donutDriver) expiredPart(a ...interface{}) {
-	key := a[0].(string)
-	// loop through all buckets
-	for _, storedBucket := range d.storedBuckets {
-		delete(storedBucket.partMetadata, key)
-	}
-	go debug.FreeOSMemory()
+	return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.APINotImplemented{API: "ListObjectParts"}, nil)
 }
diff --git a/pkg/storage/drivers/donut/donut.go b/pkg/storage/drivers/donut/donut.go
index 0b5ff27f6..b23fd925b 100644
--- a/pkg/storage/drivers/donut/donut.go
+++ b/pkg/storage/drivers/donut/donut.go
@@ -17,55 +17,28 @@
 package donut
 
 import (
-	"bytes"
 	"encoding/base64"
 	"encoding/hex"
 	"io"
 	"os"
 	"path/filepath"
-	"runtime/debug"
 	"sort"
 	"strconv"
 	"strings"
 	"sync"
-	"time"
 
 	"io/ioutil"
 
 	"github.com/minio/minio/pkg/iodine"
 	"github.com/minio/minio/pkg/storage/donut"
 	"github.com/minio/minio/pkg/storage/drivers"
-	"github.com/minio/minio/pkg/storage/trove"
-	"github.com/minio/minio/pkg/utils/log"
-)
-
-type storedBucket struct {
-	bucketMetadata   drivers.BucketMetadata
-	objectMetadata   map[string]drivers.ObjectMetadata
-	partMetadata     map[string]drivers.PartMetadata
-	multiPartSession map[string]multiPartSession
-}
-
-type multiPartSession struct {
-	totalParts int
-	uploadID   string
-	initiated  time.Time
-}
-
-const (
-	totalBuckets = 100
-)
+)
 
 // donutDriver - creates a new single disk drivers driver using donut
 type donutDriver struct {
-	donut            donut.Donut
-	paths            []string
-	lock             *sync.RWMutex
-	storedBuckets    map[string]storedBucket
-	objects          *trove.Cache
-	multiPartObjects *trove.Cache
-	maxSize          uint64
-	expiration       time.Duration
+	donut donut.Donut
+	paths []string
+	lock  *sync.RWMutex
 }
 
 // This is a dummy nodeDiskMap which is going to be deprecated soon
@@ -101,83 +74,18 @@ func createNodeDiskMap(paths []string) map[string][]string {
 	return nodes
 }
 
-func initialize(d *donutDriver) error {
-	// Soon to be user configurable, when Management API is available
-	// we should remove "default" to something which is passed down
-	// from configuration paramters
-	var err error
-	d.donut, err = donut.NewDonut("default", createNodeDiskMap(d.paths))
-	if err != nil {
-		return iodine.New(err, nil)
-	}
-	buckets, err := d.donut.ListBuckets()
-	if err != nil {
-		return iodine.New(err, nil)
-	}
-	for bucketName, metadata := range buckets {
-		d.lock.RLock()
-		storedBucket := d.storedBuckets[bucketName]
-		d.lock.RUnlock()
-		if len(storedBucket.multiPartSession) == 0 {
-			storedBucket.multiPartSession = make(map[string]multiPartSession)
-		}
-		if len(storedBucket.objectMetadata) == 0 {
-			storedBucket.objectMetadata = make(map[string]drivers.ObjectMetadata)
-		}
-		if len(storedBucket.partMetadata) == 0 {
-			storedBucket.partMetadata = make(map[string]drivers.PartMetadata)
-		}
-		storedBucket.bucketMetadata = drivers.BucketMetadata{
-			Name:    metadata.Name,
-			Created: metadata.Created,
-			ACL:     drivers.BucketACL(metadata.ACL),
-		}
-		d.lock.Lock()
-		d.storedBuckets[bucketName] = storedBucket
-		d.lock.Unlock()
-	}
-	return nil
-}
-
 // NewDriver instantiate a donut driver
-func NewDriver(paths []string, maxSize uint64, expiration time.Duration) (drivers.Driver, error) {
+func NewDriver(paths []string) (drivers.Driver, error) {
 	driver := new(donutDriver)
-	driver.storedBuckets = make(map[string]storedBucket)
-	driver.objects = trove.NewCache(maxSize, expiration)
-	driver.maxSize = maxSize
-	driver.expiration = expiration
-	driver.multiPartObjects = trove.NewCache(0, time.Duration(0))
-	driver.lock = new(sync.RWMutex)
-
-	driver.objects.OnExpired = driver.expiredObject
-	driver.multiPartObjects.OnExpired = driver.expiredPart
-
-	// set up memory expiration
-	driver.objects.ExpireObjects(time.Second * 5)
-
 	driver.paths = paths
 	driver.lock = new(sync.RWMutex)
-	err := initialize(driver)
-	return driver, err
-}
-
-func (d donutDriver) expiredObject(a ...interface{}) {
-	cacheStats := d.objects.Stats()
-	log.Printf("CurrentSize: %d, CurrentItems: %d, TotalExpirations: %d",
-		cacheStats.Bytes, cacheStats.Items, cacheStats.Expired)
-	key := a[0].(string)
-	// loop through all buckets
-	for bucket, storedBucket := range d.storedBuckets {
-		delete(storedBucket.objectMetadata, key)
-		// remove bucket if no objects found anymore
-		if len(storedBucket.objectMetadata) == 0 {
-			if time.Since(d.storedBuckets[bucket].bucketMetadata.Created) > d.expiration {
-				delete(d.storedBuckets, bucket)
-			}
-		}
-	}
-	go debug.FreeOSMemory()
+	// Soon to be user configurable, when Management API is available
+	// we should remove "default" to something which is passed down
+	// from configuration parameters
+	var err error
+	driver.donut, err = donut.NewDonut("default", createNodeDiskMap(driver.paths))
+	return driver, iodine.New(err, nil)
 }
 
 // byBucketName is a type for sorting bucket metadata by bucket name
@@ -192,8 +100,17 @@ func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error)
 	if d.donut == nil {
 		return nil, iodine.New(drivers.InternalError{}, nil)
 	}
-	for _, storedBucket := range d.storedBuckets {
-		results = append(results, storedBucket.bucketMetadata)
+	buckets, err := d.donut.ListBuckets()
+	if err != nil {
+		return nil, iodine.New(err, nil)
+	}
+	for _, metadata := range buckets {
+		result := drivers.BucketMetadata{
+			Name:    metadata.Name,
+			Created: metadata.Created,
+			ACL:     drivers.BucketACL(metadata.ACL),
+		}
+		results = append(results, result)
 	}
 	sort.Sort(byBucketName(results))
 	return results, nil
@@ -203,9 +120,6 @@ func (d donutDriver) ListBuckets() (results []drivers.BucketMetadata, err error)
 func (d donutDriver) CreateBucket(bucketName, acl string) error {
 	d.lock.Lock()
 	defer d.lock.Unlock()
-	if len(d.storedBuckets) == totalBuckets {
-		return iodine.New(drivers.TooManyBuckets{Bucket: bucketName}, nil)
-	}
 	if d.donut == nil {
 		return iodine.New(drivers.InternalError{}, nil)
 	}
@@ -223,20 +137,6 @@ func (d donutDriver) CreateBucket(bucketName, acl string) error {
 			}
 			return iodine.New(err, nil)
 		}
-		var newBucket = storedBucket{}
-		newBucket.objectMetadata = make(map[string]drivers.ObjectMetadata)
-		newBucket.multiPartSession = make(map[string]multiPartSession)
-		newBucket.partMetadata = make(map[string]drivers.PartMetadata)
-		metadata, err := d.donut.GetBucketMetadata(bucketName)
-		if err != nil {
-			return iodine.New(err, nil)
-		}
-		newBucket.bucketMetadata = drivers.BucketMetadata{
-			Name:    metadata.Name,
-			Created: metadata.Created,
-			ACL:     drivers.BucketACL(metadata.ACL),
-		}
-		d.storedBuckets[bucketName] = newBucket
 		return nil
 	}
 	return iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil)
@@ -252,9 +152,6 @@
 	if !drivers.IsValidBucket(bucketName) {
 		return drivers.BucketMetadata{}, drivers.BucketNameInvalid{Bucket: bucketName}
 	}
-	if d.storedBuckets[bucketName].bucketMetadata.Name != "" {
-		return d.storedBuckets[bucketName].bucketMetadata, nil
-	}
 	metadata, err := d.donut.GetBucketMetadata(bucketName)
 	if err != nil {
 		return drivers.BucketMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
@@ -286,9 +183,6 @@
 	if err != nil {
 		return iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
 	}
-	storedBucket := d.storedBuckets[bucketName]
-	storedBucket.bucketMetadata.ACL = drivers.BucketACL(acl)
-	d.storedBuckets[bucketName] = storedBucket
 	return nil
 }
@@ -303,41 +197,23 @@ func (d donutDriver) GetObject(w io.Writer, bucketName, objectName string) (int6
 	if !drivers.IsValidObjectName(objectName) {
 		return 0, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil)
 	}
-	if _, ok := d.storedBuckets[bucketName]; ok == false {
-		return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
-	}
 	d.lock.RLock()
 	defer d.lock.RUnlock()
-	objectKey := bucketName + "/" + objectName
-	data, ok := d.objects.Get(objectKey)
-	if !ok {
-		reader, size, err := d.donut.GetObject(bucketName, objectName)
-		if err != nil {
-			switch iodine.ToError(err).(type) {
-			case donut.BucketNotFound:
-				return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
-			case donut.ObjectNotFound:
-				return 0, iodine.New(drivers.ObjectNotFound{
-					Bucket: bucketName,
-					Object: objectName,
-				}, nil)
-			default:
-				return 0, iodine.New(drivers.InternalError{}, nil)
-			}
-		}
-		pw := newProxyWriter(w)
-		n, err := io.CopyN(pw, reader, size)
-		if err != nil {
-			return 0, iodine.New(err, nil)
+	reader, size, err := d.donut.GetObject(bucketName, objectName)
+	if err != nil {
+		switch iodine.ToError(err).(type) {
+		case donut.BucketNotFound:
+			return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
+		case donut.ObjectNotFound:
+			return 0, iodine.New(drivers.ObjectNotFound{
+				Bucket: bucketName,
+				Object: objectName,
+			}, nil)
+		default:
+			return 0, iodine.New(drivers.InternalError{}, nil)
 		}
-		// Save in memory for future reads
-		d.objects.Set(objectKey, pw.writtenBytes)
-		// free up
-		pw.writtenBytes = nil
-		go debug.FreeOSMemory()
-		return n, nil
-	}
-	written, err := io.Copy(w, bytes.NewBuffer(data))
+	}
+	written, err := io.CopyN(w, reader, size)
 	if err != nil {
 		return 0, iodine.New(err, nil)
 	}
@@ -369,45 +245,36 @@ func (d donutDriver) GetPartialObject(w io.Writer, bucketName, objectName string
 			Length: length,
 		}, errParams)
 	}
-	if _, ok := d.storedBuckets[bucketName]; ok == false {
-		return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
-	}
-	objectKey := bucketName + "/" + objectName
-	data, ok := d.objects.Get(objectKey)
-	if !ok {
-		reader, size, err := d.donut.GetObject(bucketName, objectName)
-		if err != nil {
-			switch iodine.ToError(err).(type) {
-			case donut.BucketNotFound:
-				return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
-			case donut.ObjectNotFound:
-				return 0, iodine.New(drivers.ObjectNotFound{
-					Bucket: bucketName,
-					Object: objectName,
-				}, nil)
-			default:
-				return 0, iodine.New(drivers.InternalError{}, nil)
-			}
-		}
-		defer reader.Close()
-		if start > size || (start+length-1) > size {
-			return 0, iodine.New(drivers.InvalidRange{
-				Start:  start,
-				Length: length,
-			}, errParams)
-		}
-		_, err = io.CopyN(ioutil.Discard, reader, start)
-		if err != nil {
-			return 0, iodine.New(err, errParams)
-		}
-		n, err := io.CopyN(w, reader, length)
-		if err != nil {
-			return 0, iodine.New(err, errParams)
+	reader, size, err := d.donut.GetObject(bucketName, objectName)
+	if err != nil {
+		switch iodine.ToError(err).(type) {
+		case donut.BucketNotFound:
+			return 0, iodine.New(drivers.BucketNotFound{Bucket: bucketName}, nil)
+		case donut.ObjectNotFound:
+			return 0, iodine.New(drivers.ObjectNotFound{
+				Bucket: bucketName,
+				Object: objectName,
+			}, nil)
+		default:
+			return 0, iodine.New(drivers.InternalError{}, nil)
 		}
-		return n, nil
 	}
-	written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length)
-	return written, iodine.New(err, nil)
+	defer reader.Close()
+	if start > size || (start+length-1) > size {
+		return 0, iodine.New(drivers.InvalidRange{
+			Start:  start,
+			Length: length,
+		}, errParams)
+	}
+	_, err = io.CopyN(ioutil.Discard, reader, start)
+	if err != nil {
+		return 0, iodine.New(err, errParams)
+	}
+	n, err := io.CopyN(w, reader, length)
+	if err != nil {
+		return 0, iodine.New(err, errParams)
+	}
+	return n, nil
 }
 
 // GetObjectMetadata retrieves an object's metadata
@@ -428,13 +295,6 @@ func (d donutDriver) GetObjectMetadata(bucketName, objectName string) (drivers.O
 	if !drivers.IsValidObjectName(objectName) {
 		return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: objectName}, errParams)
 	}
-	if _, ok := d.storedBuckets[bucketName]; ok {
-		storedBucket := d.storedBuckets[bucketName]
-		objectKey := bucketName + "/" + objectName
-		if object, ok := storedBucket.objectMetadata[objectKey]; ok {
-			return object, nil
-		}
-	}
 	metadata, err := d.donut.GetObjectMetadata(bucketName, objectName)
 	if err != nil {
 		return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{
@@ -498,24 +358,6 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso
 	return results, resources, nil
 }
 
-type proxyWriter struct {
-	writer       io.Writer
-	writtenBytes []byte
-}
-
-func (r *proxyWriter) Write(p []byte) (n int, err error) {
-	n, err = r.writer.Write(p)
-	if err != nil {
-		return
-	}
-	r.writtenBytes = append(r.writtenBytes, p[0:n]...)
-	return
-}
-
-func newProxyWriter(w io.Writer) *proxyWriter {
-	return &proxyWriter{writer: w, writtenBytes: nil}
-}
-
 // CreateObject creates a new object
 func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, size int64, reader io.Reader) (string, error) {
 	d.lock.Lock()
@@ -528,27 +370,12 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
 	if d.donut == nil {
 		return "", iodine.New(drivers.InternalError{}, errParams)
 	}
-	// TODO - Should be able to write bigger than cache
-	if size > int64(d.maxSize) {
-		generic := drivers.GenericObjectError{Bucket: bucketName, Object: objectName}
-		return "", iodine.New(drivers.EntityTooLarge{
-			GenericObjectError: generic,
-			Size:               strconv.FormatInt(size, 10),
-			MaxSize:            strconv.FormatUint(d.maxSize, 10),
-		}, nil)
-	}
 	if !drivers.IsValidBucket(bucketName) {
 		return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil)
 	}
 	if !drivers.IsValidObjectName(objectName) {
 		return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil)
 	}
-	storedBucket := d.storedBuckets[bucketName]
-	// get object key
-	objectKey := bucketName + "/" + objectName
-	if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
-		return "", iodine.New(drivers.ObjectExists{Bucket: bucketName, Object: objectName}, nil)
-	}
 	if strings.TrimSpace(contentType) == "" {
 		contentType = "application/octet-stream"
 	}
@@ -579,7 +406,5 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
 		Md5:  objMetadata.MD5Sum,
 		Size: objMetadata.Size,
 	}
-	storedBucket.objectMetadata[objectKey] = newObject
-	d.storedBuckets[bucketName] = storedBucket
 	return newObject.Md5, nil
 }
diff --git a/pkg/storage/drivers/donut/donut_test.go b/pkg/storage/drivers/donut/donut_test.go
index c0d5f50b2..1d823e6f0 100644
--- a/pkg/storage/drivers/donut/donut_test.go
+++ b/pkg/storage/drivers/donut/donut_test.go
@@ -20,7 +20,6 @@ import (
 	"io/ioutil"
 	"os"
 	"testing"
-	"time"
 
 	. "github.com/minio/check"
 	"github.com/minio/minio/pkg/storage/drivers"
@@ -40,7 +39,7 @@ func (s *MySuite) TestAPISuite(c *C) {
 	c.Check(err, IsNil)
 	storageList = append(storageList, p)
 	paths = append(paths, p)
-	store, err := NewDriver(paths, 1000000, 3*time.Hour)
+	store, err := NewDriver(paths)
 	c.Check(err, IsNil)
 	return store
 }
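
Reviewer note: after this patch the cache driver no longer stands alone; it wraps a backend driver, serving reads from memory and falling through to the backend on a miss (GetObject, GetPartialObject, GetBucketMetadata, GetObjectMetadata) while forwarding bucket writes to it first (CreateBucket, SetBucketMetadata). A minimal sketch of the intended wiring follows, using only the constructors and methods touched by this patch; the mount paths and cache sizing are illustrative, not part of the change.

package main

import (
	"time"

	"github.com/minio/minio/pkg/storage/drivers/cache"
	"github.com/minio/minio/pkg/storage/drivers/donut"
	"github.com/minio/minio/pkg/utils/log"
)

func main() {
	// Persistent layer first: donut owns the on-disk layout across the
	// supplied paths (paths here are illustrative).
	donutDriver, err := donut.NewDriver([]string{"/mnt/disk1", "/mnt/disk2"})
	if err != nil {
		log.Fatalln(err)
	}
	// Stack the cache on top: reads are served from the in-memory trove
	// cache and fall through to donut on a miss; bucket writes are
	// forwarded to donut before the cached copy is updated.
	driver, err := cache.NewDriver(512*1024*1024, 3*time.Hour, donutDriver)
	if err != nil {
		log.Fatalln(err)
	}
	if err := driver.CreateBucket("bucket", "private"); err != nil {
		log.Fatalln(err)
	}
}

Passing a nil backend to cache.NewDriver (as cache_test.go does) yields a pure in-memory driver, which is also what server.Factory produces when no paths are configured.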