From 3c4012f1e71f1cf89c4839bfd926be5d04ee2578 Mon Sep 17 00:00:00 2001 From: "Frederick F. Kautz IV" Date: Mon, 23 Mar 2015 20:06:15 -0700 Subject: [PATCH] Adding donut backend and setting as default --- main.go | 6 +- pkg/server/server.go | 14 +- pkg/storage/donut/donut.go | 5 +- pkg/storage/donut/donutdriver.go | 27 ++- pkg/storage/donut/donutdriver_test.go | 6 + pkg/storage/donut/donutwriter.go | 2 +- pkg/storage/donut/erasure.go | 4 +- pkg/storage/donut/local.go | 5 + pkg/storage/donutstorage/donutstorage.go | 209 ++++++++++++++++-- pkg/storage/donutstorage/donutstorage_test.go | 4 +- pkg/storage/storage_api_suite.go | 6 +- 11 files changed, 254 insertions(+), 34 deletions(-) diff --git a/main.go b/main.go index 0ea8e2c3e..9d66e311d 100644 --- a/main.go +++ b/main.go @@ -30,6 +30,8 @@ func getStorageType(input string) server.StorageType { return server.File case input == "memory": return server.Memory + case input == "donut": + return server.Donut default: { log.Println("Unknown storage type:", input) @@ -109,8 +111,8 @@ func main() { }, cli.StringFlag{ Name: "storage-type,s", - Value: "file", - Usage: "valid entries: file,inmemory", + Value: "donut", + Usage: "valid entries: file,inmemory,donut", }, } app.Action = runCmd diff --git a/pkg/server/server.go b/pkg/server/server.go index 5e4ad555c..ec2f0903f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -26,6 +26,7 @@ import ( "github.com/minio-io/minio/pkg/api/web" "github.com/minio-io/minio/pkg/server/httpserver" mstorage "github.com/minio-io/minio/pkg/storage" + "github.com/minio-io/minio/pkg/storage/donutstorage" "github.com/minio-io/minio/pkg/storage/file" "github.com/minio-io/minio/pkg/storage/memory" ) @@ -138,11 +139,22 @@ func getStorageChannels(storageType StorageType) (ctrlChans []chan<- string, sta if err != nil { return nil, nil, nil } - root := path.Join(u.HomeDir, "minio-storage") + root := path.Join(u.HomeDir, "minio-storage", "file") ctrlChan, statusChan, storage = file.Start(root) ctrlChans = append(ctrlChans, ctrlChan) statusChans = append(statusChans, statusChan) } + case storageType == Donut: + { + u, err := user.Current() + if err != nil { + return nil, nil, nil + } + root := path.Join(u.HomeDir, "minio-storage", "donut") + ctrlChan, statusChan, storage = donutstorage.Start(root) + ctrlChans = append(ctrlChans, ctrlChan) + statusChans = append(statusChans, statusChan) + } default: // should never happen log.Fatal("No storage driver found") } diff --git a/pkg/storage/donut/donut.go b/pkg/storage/donut/donut.go index d6d6bf720..0514f91af 100644 --- a/pkg/storage/donut/donut.go +++ b/pkg/storage/donut/donut.go @@ -1,6 +1,8 @@ package donut -import "io" +import ( + "io" +) // INTERFACES @@ -21,6 +23,7 @@ type Bucket interface { // Node interface type Node interface { + CreateBucket(bucket string) error GetBuckets() ([]string, error) GetDonutMetadata(bucket, object string) (map[string]string, error) GetMetadata(bucket, object string) (map[string]string, error) diff --git a/pkg/storage/donut/donutdriver.go b/pkg/storage/donut/donutdriver.go index c68ea2f08..f06555a75 100644 --- a/pkg/storage/donut/donutdriver.go +++ b/pkg/storage/donut/donutdriver.go @@ -35,6 +35,9 @@ func (driver donutDriver) CreateBucket(bucketName string) error { nodes := make([]string, 16) for i := 0; i < 16; i++ { nodes[i] = "localhost" + if node, ok := driver.nodes["localhost"]; ok { + node.CreateBucket(bucketName + ":0:" + strconv.Itoa(i)) + } } bucket := bucketDriver{ nodes: nodes, @@ -63,7 +66,15 @@ func (driver donutDriver) GetObjectWriter(bucketName, objectName string) (Object } for i, nodeID := range nodes { if node, ok := driver.nodes[nodeID]; ok == true { - writer, _ := node.GetWriter(bucketName+":0:"+strconv.Itoa(i), objectName) + writer, err := node.GetWriter(bucketName+":0:"+strconv.Itoa(i), objectName) + if err != nil { + for _, writerToClose := range writers { + if writerToClose != nil { + writerToClose.CloseWithError(err) + } + } + return nil, err + } writers[i] = writer } } @@ -111,7 +122,19 @@ func (driver donutDriver) GetObjectMetadata(bucketName, object string) (map[stri return nil, err } if node, ok := driver.nodes[nodes[0]]; ok { - return node.GetMetadata(bucketName+":0:0", object) + bucketID := bucketName + ":0:0" + metadata, err := node.GetMetadata(bucketID, object) + if err != nil { + return nil, err + } + donutMetadata, err := node.GetDonutMetadata(bucketID, object) + if err != nil { + return nil, err + } + metadata["sys.created"] = donutMetadata["created"] + metadata["sys.md5"] = donutMetadata["md5"] + metadata["sys.size"] = donutMetadata["size"] + return metadata, nil } return nil, errors.New("Cannot connect to node: " + nodes[0]) } diff --git a/pkg/storage/donut/donutdriver_test.go b/pkg/storage/donut/donutdriver_test.go index d4bd93e99..19cde193e 100644 --- a/pkg/storage/donut/donutdriver_test.go +++ b/pkg/storage/donut/donutdriver_test.go @@ -8,6 +8,7 @@ import ( "io" "io/ioutil" "os" + "time" ) func Test(t *testing.T) { TestingT(t) } @@ -162,6 +163,11 @@ func (s *MySuite) TestNewObjectCanBeWritten(c *C) { actualMetadata, err := donut.GetObjectMetadata("foo", "obj") c.Assert(err, IsNil) + expectedMetadata["sys.md5"] = "b10a8db164e0754105b7a99be72e3fe5" + expectedMetadata["sys.size"] = "11" + _, err = time.Parse(time.RFC3339Nano, actualMetadata["sys.created"]) + c.Assert(err, IsNil) + expectedMetadata["sys.created"] = actualMetadata["sys.created"] c.Assert(actualMetadata, DeepEquals, expectedMetadata) } diff --git a/pkg/storage/donut/donutwriter.go b/pkg/storage/donut/donutwriter.go index 3d837a2fe..ebf24befe 100644 --- a/pkg/storage/donut/donutwriter.go +++ b/pkg/storage/donut/donutwriter.go @@ -8,7 +8,7 @@ import ( ) func newDonutFileWriter(objectDir string) (Writer, error) { - dataFile, err := os.OpenFile(path.Join(objectDir, "data"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_EXCL, 0600) + dataFile, err := os.OpenFile(path.Join(objectDir, "data"), os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600) if err != nil { return nil, err } diff --git a/pkg/storage/donut/erasure.go b/pkg/storage/donut/erasure.go index 3574b1906..ef89ed77b 100644 --- a/pkg/storage/donut/erasure.go +++ b/pkg/storage/donut/erasure.go @@ -17,7 +17,7 @@ import ( func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, writer *io.PipeWriter) { // TODO handle errors totalChunks, _ := strconv.Atoi(donutMetadata["chunkCount"]) - totalLeft, _ := strconv.Atoi(donutMetadata["totalLength"]) + totalLeft, _ := strconv.Atoi(donutMetadata["size"]) blockSize, _ := strconv.Atoi(donutMetadata["blockSize"]) k, _ := strconv.Atoi(donutMetadata["erasureK"]) m, _ := strconv.Atoi(donutMetadata["erasureM"]) @@ -110,7 +110,7 @@ func erasureGoroutine(r *io.PipeReader, eWriter erasureWriter, isClosed chan<- b metadata["erasureM"] = "8" metadata["erasureTechnique"] = "Cauchy" metadata["md5"] = hex.EncodeToString(dataMd5sum) - metadata["totalLength"] = strconv.Itoa(totalLength) + metadata["size"] = strconv.Itoa(totalLength) for _, nodeWriter := range eWriter.writers { if nodeWriter != nil { nodeWriter.SetMetadata(eWriter.metadata) diff --git a/pkg/storage/donut/local.go b/pkg/storage/donut/local.go index 9462ac62c..c1fcff915 100644 --- a/pkg/storage/donut/local.go +++ b/pkg/storage/donut/local.go @@ -16,6 +16,11 @@ type localDirectoryNode struct { root string } +func (d localDirectoryNode) CreateBucket(bucket string) error { + objectPath := path.Join(d.root, bucket) + return os.MkdirAll(objectPath, 0700) +} + func (d localDirectoryNode) GetBuckets() ([]string, error) { return nil, errors.New("Not Implemented") } diff --git a/pkg/storage/donutstorage/donutstorage.go b/pkg/storage/donutstorage/donutstorage.go index fcc9a0906..19e345c05 100644 --- a/pkg/storage/donutstorage/donutstorage.go +++ b/pkg/storage/donutstorage/donutstorage.go @@ -19,75 +19,242 @@ package donutstorage import ( "errors" "github.com/minio-io/minio/pkg/storage" + "github.com/minio-io/minio/pkg/storage/donut" "io" + "sort" + "strconv" + "strings" + "time" ) -// DonutDriver creates a new single disk storage driver using donut without encoding. -type DonutDriver struct{} +// Storage creates a new single disk storage driver using donut without encoding. +type Storage struct { + donut donut.Donut +} const ( blockSize = 10 * 1024 * 1024 ) // Start a single disk subsystem -func Start() (chan<- string, <-chan error, storage.Storage) { +func Start(path string) (chan<- string, <-chan error, storage.Storage) { + ctrlChannel := make(chan string) errorChannel := make(chan error) - s := new(DonutDriver) + s := new(Storage) + + // TODO donut driver should be passed in as Start param and driven by config + s.donut = donut.NewDonutDriver(path) + go start(ctrlChannel, errorChannel, s) return ctrlChannel, errorChannel, s } -func start(ctrlChannel <-chan string, errorChannel chan<- error, s *DonutDriver) { +func start(ctrlChannel <-chan string, errorChannel chan<- error, s *Storage) { close(errorChannel) } // ListBuckets returns a list of buckets -func (donutStorage DonutDriver) ListBuckets() (results []storage.BucketMetadata, err error) { - return nil, errors.New("Not Implemented") +func (donutStorage Storage) ListBuckets() (results []storage.BucketMetadata, err error) { + buckets, err := donutStorage.donut.ListBuckets() + if err != nil { + return nil, err + } + for _, bucket := range buckets { + if err != nil { + return nil, err + } + result := storage.BucketMetadata{ + Name: bucket, + // TODO Add real created date + Created: time.Now(), + } + results = append(results, result) + } + return results, nil } // CreateBucket creates a new bucket -func (donutStorage DonutDriver) CreateBucket(bucket string) error { - return errors.New("Not Implemented") +func (donutStorage Storage) CreateBucket(bucket string) error { + return donutStorage.donut.CreateBucket(bucket) } // GetBucketMetadata retrieves an bucket's metadata -func (donutStorage DonutDriver) GetBucketMetadata(bucket string) (storage.BucketMetadata, error) { +func (donutStorage Storage) GetBucketMetadata(bucket string) (storage.BucketMetadata, error) { return storage.BucketMetadata{}, errors.New("Not Implemented") } // CreateBucketPolicy sets a bucket's access policy -func (donutStorage DonutDriver) CreateBucketPolicy(bucket string, p storage.BucketPolicy) error { +func (donutStorage Storage) CreateBucketPolicy(bucket string, p storage.BucketPolicy) error { return errors.New("Not Implemented") } // GetBucketPolicy returns a bucket's access policy -func (donutStorage DonutDriver) GetBucketPolicy(bucket string) (storage.BucketPolicy, error) { +func (donutStorage Storage) GetBucketPolicy(bucket string) (storage.BucketPolicy, error) { return storage.BucketPolicy{}, errors.New("Not Implemented") } // GetObject retrieves an object and writes it to a writer -func (donutStorage DonutDriver) GetObject(target io.Writer, bucket, key string) (int64, error) { - return 0, errors.New("Not Implemented") +func (donutStorage Storage) GetObject(target io.Writer, bucket, key string) (int64, error) { + reader, err := donutStorage.donut.GetObject(bucket, key) + if err != nil { + return 0, storage.ObjectNotFound{ + Bucket: bucket, + Object: key, + } + } + return io.Copy(target, reader) } // GetPartialObject retrieves an object and writes it to a writer -func (donutStorage DonutDriver) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) { +func (donutStorage Storage) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) { return 0, errors.New("Not Implemented") } // GetObjectMetadata retrieves an object's metadata -func (donutStorage DonutDriver) GetObjectMetadata(bucket, key string, prefix string) (storage.ObjectMetadata, error) { - return storage.ObjectMetadata{}, errors.New("Not Implemented") +func (donutStorage Storage) GetObjectMetadata(bucket, key string, prefix string) (storage.ObjectMetadata, error) { + metadata, err := donutStorage.donut.GetObjectMetadata(bucket, key) + created, err := time.Parse(time.RFC3339Nano, metadata["sys.created"]) + if err != nil { + return storage.ObjectMetadata{}, err + } + size, err := strconv.ParseInt(metadata["sys.size"], 10, 64) + if err != nil { + return storage.ObjectMetadata{}, err + } + objectMetadata := storage.ObjectMetadata{ + Bucket: bucket, + Key: key, + + ContentType: metadata["contentType"], + Created: created, + Md5: metadata["sys.md5"], + Size: size, + } + return objectMetadata, nil } // ListObjects lists objects -func (donutStorage DonutDriver) ListObjects(bucket string, resources storage.BucketResourcesMetadata) ([]storage.ObjectMetadata, storage.BucketResourcesMetadata, error) { - return nil, storage.BucketResourcesMetadata{}, errors.New("Not Implemented") +func (donutStorage Storage) ListObjects(bucket string, resources storage.BucketResourcesMetadata) ([]storage.ObjectMetadata, storage.BucketResourcesMetadata, error) { + // TODO Fix IsPrefixSet && IsDelimiterSet and use them + objects, err := donutStorage.donut.ListObjects(bucket) + if err != nil { + return nil, storage.BucketResourcesMetadata{}, err + } + sort.Strings(objects) + if resources.Prefix != "" { + objects = filterPrefix(objects, resources.Prefix) + objects = removePrefix(objects, resources.Prefix) + } + if resources.Maxkeys <= 0 || resources.Maxkeys > 1000 { + resources.Maxkeys = 1000 + } + + var actualObjects []string + var commonPrefixes []string + if strings.TrimSpace(resources.Delimiter) != "" { + actualObjects = filterDelimited(objects, resources.Delimiter) + commonPrefixes = filterNotDelimited(objects, resources.Delimiter) + commonPrefixes = extractDir(commonPrefixes, resources.Delimiter) + commonPrefixes = uniqueObjects(commonPrefixes) + resources.CommonPrefixes = commonPrefixes + } else { + actualObjects = objects + } + + var results []storage.ObjectMetadata + for _, object := range actualObjects { + if len(results) >= resources.Maxkeys { + resources.IsTruncated = true + break + } + metadata, err := donutStorage.GetObjectMetadata(bucket, resources.Prefix+object, "") + if err != nil { + return nil, storage.BucketResourcesMetadata{}, err + } + results = append(results, metadata) + } + return results, resources, nil +} + +func filterPrefix(objects []string, prefix string) []string { + var results []string + for _, object := range objects { + if strings.HasPrefix(object, prefix) { + results = append(results, object) + } + } + return results +} + +func removePrefix(objects []string, prefix string) []string { + var results []string + for _, object := range objects { + results = append(results, strings.TrimPrefix(object, prefix)) + } + return results +} + +func filterDelimited(objects []string, delim string) []string { + var results []string + for _, object := range objects { + if !strings.Contains(object, delim) { + results = append(results, object) + } + } + return results +} +func filterNotDelimited(objects []string, delim string) []string { + var results []string + for _, object := range objects { + if strings.Contains(object, delim) { + results = append(results, object) + } + } + return results +} + +func extractDir(objects []string, delim string) []string { + var results []string + for _, object := range objects { + parts := strings.Split(object, delim) + results = append(results, parts[0]+"/") + } + return results +} + +func uniqueObjects(objects []string) []string { + objectMap := make(map[string]string) + for _, v := range objects { + objectMap[v] = v + } + var results []string + for k := range objectMap { + results = append(results, k) + } + sort.Strings(results) + return results } // CreateObject creates a new object -func (donutStorage DonutDriver) CreateObject(bucketKey, objectKey, contentType, md5sum string, reader io.Reader) error { - return errors.New("Not Implemented") +func (donutStorage Storage) CreateObject(bucketKey, objectKey, contentType, expectedMd5sum string, reader io.Reader) error { + writer, err := donutStorage.donut.GetObjectWriter(bucketKey, objectKey) + if err != nil { + return err + } + if _, err := io.Copy(writer, reader); err != nil { + return err + } + if contentType == "" { + contentType = "application/octet-stream" + } + contentType = strings.TrimSpace(contentType) + metadata := make(map[string]string) + metadata["bucket"] = bucketKey + metadata["object"] = objectKey + metadata["contentType"] = contentType + if err = writer.SetMetadata(metadata); err != nil { + return err + } + return writer.Close() } diff --git a/pkg/storage/donutstorage/donutstorage_test.go b/pkg/storage/donutstorage/donutstorage_test.go index 9476bf52a..d9070f69a 100644 --- a/pkg/storage/donutstorage/donutstorage_test.go +++ b/pkg/storage/donutstorage/donutstorage_test.go @@ -33,13 +33,13 @@ type MySuite struct{} var _ = Suite(&MySuite{}) func (s *MySuite) TestAPISuite(c *C) { - c.Skip("Not Implemented") + // c.Skip("Not Implemented") var storageList []string create := func() mstorage.Storage { path, err := ioutil.TempDir(os.TempDir(), "minio-fs-") c.Check(err, IsNil) storageList = append(storageList, path) - _, _, store := Start() // TODO Make InMemory driver + _, _, store := Start(path) // TODO Make InMemory driver return store } mstorage.APITestSuite(c, create) diff --git a/pkg/storage/storage_api_suite.go b/pkg/storage/storage_api_suite.go index 157043ae3..72a49c24f 100644 --- a/pkg/storage/storage_api_suite.go +++ b/pkg/storage/storage_api_suite.go @@ -22,6 +22,7 @@ import ( "strconv" "gopkg.in/check.v1" + "time" ) // APITestSuite - collection of API tests @@ -136,7 +137,7 @@ func testPaging(c *check.C, create func() Storage) { // check delimited results with delimiter and prefix { storage.CreateObject("bucket", "this/is/delimited", "", "", bytes.NewBufferString("prefix1")) - storage.CreateObject("bucket", "this/is/also/delimited", "", "", bytes.NewBufferString("prefix2")) + storage.CreateObject("bucket", "this/is/also/a/delimited/file", "", "", bytes.NewBufferString("prefix2")) var prefixes []string resources.CommonPrefixes = prefixes // allocate new everytime resources.Delimiter = "/" @@ -147,6 +148,7 @@ func testPaging(c *check.C, create func() Storage) { c.Assert(len(objects), check.Equals, 1) c.Assert(resources.CommonPrefixes[0], check.Equals, "also/") } + time.Sleep(time.Second) // check delimited results with delimiter without prefix { @@ -232,8 +234,8 @@ func testListBuckets(c *check.C, create func() Storage) { // test empty list buckets, err := storage.ListBuckets() - c.Assert(len(buckets), check.Equals, 0) c.Assert(err, check.IsNil) + c.Assert(len(buckets), check.Equals, 0) // add one and test exists err = storage.CreateBucket("bucket1")