diff --git a/pkg/storage/donut/bucketdriver.go b/pkg/storage/donut/bucketdriver.go new file mode 100644 index 000000000..938932e2b --- /dev/null +++ b/pkg/storage/donut/bucketdriver.go @@ -0,0 +1,14 @@ +package donut + +type bucketDriver struct { + nodes []string + objects map[string][]byte +} + +func (self bucketDriver) GetNodes() ([]string, error) { + var nodes []string + for _, node := range self.nodes { + nodes = append(nodes, node) + } + return nodes, nil +} diff --git a/pkg/storage/donut/donut.go b/pkg/storage/donut/donut.go index 3ea14d0d2..65cbc3023 100644 --- a/pkg/storage/donut/donut.go +++ b/pkg/storage/donut/donut.go @@ -7,39 +7,39 @@ import "io" // Donut interface type Donut interface { CreateBucket(bucket string) error - GetBuckets() ([]string, error) - GetObject(bucket, object string) (io.ReadCloser, error) - GetObjectMetadata(bucket, object string) (map[string]string, error) + ListBuckets() ([]string, error) GetObjectWriter(bucket, object string) (ObjectWriter, error) + GetObject(bucket, object string) (io.ReadCloser, error) + // GetObjectMetadata(bucket, object string) (map[string]string, error) + // GetObjectWriter(bucket, object string) (ObjectWriter, error) + // ListObjects(bucket string) ([]string, error) } -// Bucket is an interface for managing buckets +// Bucket interface type Bucket interface { - GetObject(object string) (io.Reader, error) - GetObjectMetadata(object string) (map[string]string, error) - GetObjectWriter(object string) (ObjectWriter, error) - GetObjects() ([]string, error) + GetNodes() ([]string, error) } -// Disk is an interface for managing disks -type Disk interface { - GetBuckets(object string) ([]string, error) +type Node interface { + GetBuckets() ([]string, error) + GetWriter(bucket, object string) (DonutWriter, error) + GetReader(bucket, object string) (io.ReadCloser, error) + GetMetadata(bucket, object string) (map[string]string, error) + GetDonutMetadata(bucket, object string) (map[string]string, error) } -// ObjectWriter is an interface for writing new objects +// ObjectWriter interface type ObjectWriter interface { - Write([]byte) error + Write([]byte) (int, error) Close() error CloseWithError(error) error - - SetMetadata(map[string]string) - GetMetadata() map[string]string + SetMetadata(map[string]string) error + GetMetadata() (map[string]string, error) } -// InternalObjectWriter is an interface for use internally to donut -type InternalObjectWriter interface { +type DonutWriter interface { ObjectWriter - SetDonutMetadata(map[string]string) - GetDonutMetadata() map[string]string + GetDonutMetadata() (map[string]string, error) + SetDonutMetadata(map[string]string) error } diff --git a/pkg/storage/donut/donutdriver.go b/pkg/storage/donut/donutdriver.go index 98bb3fd42..25da29ade 100644 --- a/pkg/storage/donut/donutdriver.go +++ b/pkg/storage/donut/donutdriver.go @@ -1,48 +1,374 @@ package donut import ( + "bytes" + "encoding/gob" + "encoding/json" "errors" + "github.com/minio-io/minio/pkg/encoding/erasure" + "github.com/minio-io/minio/pkg/utils/split" "io" + "io/ioutil" + "log" + "os" + "path" + "sort" + "strconv" + "strings" + "time" ) type donutDriver struct { buckets map[string]Bucket - disks map[string]Disk + nodes map[string]Node } -// NewDonutDriver instantiates a donut driver for use in object storage -func NewDonutDriver() Donut { - return donutDriver{ +func NewDonutDriver(root string) Donut { + nodes := make(map[string]Node) + nodes["localhost"] = localDirectoryNode{root: root} + driver := donutDriver{ buckets: make(map[string]Bucket), - disks: make(map[string]Disk), + nodes: nodes, } + return driver } -func notImplemented() error { - return errors.New("Not Implemented") +func (driver donutDriver) CreateBucket(bucketName string) error { + if _, ok := driver.buckets[bucketName]; ok == false { + bucketName = strings.TrimSpace(bucketName) + if bucketName == "" { + return errors.New("Cannot create bucket with no name") + } + // assign nodes + // TODO assign other nodes + nodes := make([]string, 16) + for i := 0; i < 16; i++ { + nodes[i] = "localhost" + } + bucket := bucketDriver{ + nodes: nodes, + } + driver.buckets[bucketName] = bucket + return nil + } + return errors.New("Bucket exists") +} + +func (driver donutDriver) ListBuckets() ([]string, error) { + buckets := make([]string, 0) + for bucket, _ := range driver.buckets { + buckets = append(buckets, bucket) + } + sort.Strings(buckets) + return buckets, nil +} + +func (driver donutDriver) GetObjectWriter(bucketName, objectName string) (ObjectWriter, error) { + if bucket, ok := driver.buckets[bucketName]; ok == true { + writers := make([]DonutWriter, 16) + nodes, err := bucket.GetNodes() + if err != nil { + return nil, err + } + for i, nodeId := range nodes { + if node, ok := driver.nodes[nodeId]; ok == true { + writer, _ := node.GetWriter(bucketName+":0:"+strconv.Itoa(i), objectName) + writers[i] = writer + } + } + return newErasureWriter(writers), nil + } + return nil, errors.New("Bucket not found") +} + +func (driver donutDriver) GetObject(bucketName, objectName string) (io.ReadCloser, error) { + r, w := io.Pipe() + if bucket, ok := driver.buckets[bucketName]; ok == true { + readers := make([]io.ReadCloser, 16) + nodes, err := bucket.GetNodes() + if err != nil { + return nil, err + } + var metadata map[string]string + for i, nodeId := range nodes { + if node, ok := driver.nodes[nodeId]; ok == true { + bucketId := bucketName + ":0:" + strconv.Itoa(i) + reader, err := node.GetReader(bucketId, objectName) + if err != nil { + return nil, err + } + readers[i] = reader + if metadata == nil { + metadata, err = node.GetDonutMetadata(bucketId, objectName) + if err != nil { + return nil, err + } + } + } + } + go erasureReader(readers, metadata, w) + return r, nil + } + return nil, errors.New("Bucket not found") +} + +func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, writer *io.PipeWriter) { + totalChunks, _ := strconv.Atoi(donutMetadata["chunkCount"]) + totalLeft, _ := strconv.Atoi(donutMetadata["totalLength"]) + blockSize, _ := strconv.Atoi(donutMetadata["blockSize"]) + params, _ := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) + encoder := erasure.NewEncoder(params) + for _, reader := range readers { + defer reader.Close() + } + for i := 0; i < totalChunks; i++ { + encodedBytes := make([][]byte, 16) + for i, reader := range readers { + var bytesArray []byte + decoder := gob.NewDecoder(reader) + err := decoder.Decode(&bytesArray) + if err != nil { + log.Println(err) + } + encodedBytes[i] = bytesArray + } + curBlockSize := totalLeft + if blockSize < totalLeft { + curBlockSize = blockSize + } + log.Println("decoding block size", curBlockSize) + decodedData, err := encoder.Decode(encodedBytes, curBlockSize) + if err != nil { + writer.CloseWithError(err) + return + } + io.Copy(writer, bytes.NewBuffer(decodedData)) + totalLeft = totalLeft - blockSize + } + writer.Close() +} + +// erasure writer + +type erasureWriter struct { + writers []DonutWriter + metadata map[string]string + donutMetadata map[string]string // not exposed + erasureWriter *io.PipeWriter + isClosed <-chan bool +} + +func newErasureWriter(writers []DonutWriter) ObjectWriter { + r, w := io.Pipe() + isClosed := make(chan bool) + writer := erasureWriter{ + writers: writers, + metadata: make(map[string]string), + erasureWriter: w, + isClosed: isClosed, + } + go erasureGoroutine(r, writer, isClosed) + return writer +} + +func erasureGoroutine(r *io.PipeReader, eWriter erasureWriter, isClosed chan<- bool) { + chunks := split.Stream(r, 10*1024*1024) + params, _ := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) + encoder := erasure.NewEncoder(params) + chunkCount := 0 + totalLength := 0 + for chunk := range chunks { + if chunk.Err == nil { + totalLength = totalLength + len(chunk.Data) + encodedBlocks, _ := encoder.Encode(chunk.Data) + for blockIndex, block := range encodedBlocks { + var byteBuffer bytes.Buffer + gobEncoder := gob.NewEncoder(&byteBuffer) + gobEncoder.Encode(block) + io.Copy(eWriter.writers[blockIndex], &byteBuffer) + } + } + chunkCount = chunkCount + 1 + } + metadata := make(map[string]string) + metadata["blockSize"] = strconv.Itoa(10 * 1024 * 1024) + metadata["chunkCount"] = strconv.Itoa(chunkCount) + metadata["created"] = time.Now().Format(time.RFC3339Nano) + metadata["erasureK"] = "8" + metadata["erasureM"] = "8" + metadata["erasureTechnique"] = "Cauchy" + metadata["totalLength"] = strconv.Itoa(totalLength) + for _, nodeWriter := range eWriter.writers { + if nodeWriter != nil { + nodeWriter.SetMetadata(eWriter.metadata) + nodeWriter.SetDonutMetadata(metadata) + nodeWriter.Close() + } + } + isClosed <- true } -// CreateBucket creates a bucket -func (driver donutDriver) CreateBucket(bucket string) error { - return notImplemented() +func (self erasureWriter) Write(data []byte) (int, error) { + io.Copy(self.erasureWriter, bytes.NewBuffer(data)) + return len(data), nil } -// GetBuckets returns a list of buckets -func (driver donutDriver) GetBuckets() ([]string, error) { - return nil, notImplemented() +func (self erasureWriter) Close() error { + self.erasureWriter.Close() + <-self.isClosed + return nil } -// GetObject returns an object -func (driver donutDriver) GetObject(bucket, object string) (io.ReadCloser, error) { - return nil, notImplemented() +func (self erasureWriter) CloseWithError(err error) error { + for _, writer := range self.writers { + if writer != nil { + writer.CloseWithError(err) + } + } + return nil } -// GetObjectMetadata returns object metadata -func (driver donutDriver) GetObjectMetadata(bucket, object string) (map[string]string, error) { - return nil, notImplemented() +func (self erasureWriter) SetMetadata(metadata map[string]string) error { + for k, _ := range self.metadata { + delete(self.metadata, k) + } + for k, v := range metadata { + self.metadata[k] = v + } + return nil } -// GetObjectWriter returns a writer for creating a new object. -func (driver donutDriver) GetObjectWriter(bucket, object string) (ObjectWriter, error) { - return nil, notImplemented() +func (self erasureWriter) GetMetadata() (map[string]string, error) { + metadata := make(map[string]string) + for k, v := range self.metadata { + metadata[k] = v + } + return metadata, nil +} + +type localDirectoryNode struct { + root string +} + +func (self localDirectoryNode) GetBuckets() ([]string, error) { + return nil, errors.New("Not Implemented") +} + +func (self localDirectoryNode) GetWriter(bucket, object string) (DonutWriter, error) { + objectPath := path.Join(self.root, bucket, object) + err := os.MkdirAll(objectPath, 0700) + if err != nil { + return nil, err + } + return newDonutFileWriter(objectPath) +} + +func (self localDirectoryNode) GetReader(bucket, object string) (io.ReadCloser, error) { + return os.Open(path.Join(self.root, bucket, object, "data")) +} + +func (self localDirectoryNode) GetMetadata(bucket, object string) (map[string]string, error) { + return self.getMetadata(bucket, object, "metadata.json") +} +func (self localDirectoryNode) GetDonutMetadata(bucket, object string) (map[string]string, error) { + return self.getMetadata(bucket, object, "donutMetadata.json") +} + +func (self localDirectoryNode) getMetadata(bucket, object, fileName string) (map[string]string, error) { + file, err := os.Open(path.Join(self.root, bucket, object, fileName)) + defer file.Close() + if err != nil { + return nil, err + } + metadata := make(map[string]string) + decoder := json.NewDecoder(file) + if err := decoder.Decode(&metadata); err != nil { + return nil, err + } + return metadata, nil + +} + +func newDonutFileWriter(objectDir string) (DonutWriter, error) { + dataFile, err := os.OpenFile(path.Join(objectDir, "data"), os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return nil, err + } + return donutFileWriter{ + root: objectDir, + file: dataFile, + metadata: make(map[string]string), + donutMetadata: make(map[string]string), + }, nil +} + +type donutFileWriter struct { + root string + file *os.File + metadata map[string]string + donutMetadata map[string]string + err error +} + +func (self donutFileWriter) Write(data []byte) (int, error) { + return self.file.Write(data) +} + +func (self donutFileWriter) Close() error { + if self.err != nil { + return self.err + } + + self.file.Close() + + metadata, _ := json.Marshal(self.metadata) + ioutil.WriteFile(path.Join(self.root, "metadata.json"), metadata, 0600) + donutMetadata, _ := json.Marshal(self.donutMetadata) + ioutil.WriteFile(path.Join(self.root, "donutMetadata.json"), donutMetadata, 0600) + + return nil +} + +func (self donutFileWriter) CloseWithError(err error) error { + if self.err != nil { + self.err = err + } + self.file.Close() + return nil +} + +func (self donutFileWriter) SetMetadata(metadata map[string]string) error { + for k := range self.metadata { + delete(self.metadata, k) + } + for k, v := range metadata { + self.metadata[k] = v + } + return nil +} + +func (self donutFileWriter) GetMetadata() (map[string]string, error) { + metadata := make(map[string]string) + for k, v := range self.metadata { + metadata[k] = v + } + return metadata, nil +} + +func (self donutFileWriter) SetDonutMetadata(metadata map[string]string) error { + for k := range self.donutMetadata { + delete(self.donutMetadata, k) + } + for k, v := range metadata { + self.donutMetadata[k] = v + } + return nil +} + +func (self donutFileWriter) GetDonutMetadata() (map[string]string, error) { + donutMetadata := make(map[string]string) + for k, v := range self.donutMetadata { + donutMetadata[k] = v + } + return donutMetadata, nil } diff --git a/pkg/storage/donut/donutdriver_test.go b/pkg/storage/donut/donutdriver_test.go new file mode 100644 index 000000000..02f48c089 --- /dev/null +++ b/pkg/storage/donut/donutdriver_test.go @@ -0,0 +1,197 @@ +package donut + +import ( + "testing" + + "bytes" + . "gopkg.in/check.v1" + "io" + "io/ioutil" + "os" +) + +func Test(t *testing.T) { TestingT(t) } + +type MySuite struct{} + +var _ = Suite(&MySuite{}) + +func (s *MySuite) TestEmptyBucket(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDonutDriver(root) + + // check buckets are empty + buckets, err := donut.ListBuckets() + c.Assert(err, IsNil) + c.Assert(buckets, DeepEquals, make([]string, 0)) +} + +func (s *MySuite) TestBucketWithoutNameFails(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDonutDriver(root) + // fail to create new bucket without a name + err = donut.CreateBucket("") + c.Assert(err, Not(IsNil)) + + err = donut.CreateBucket(" ") + c.Assert(err, Not(IsNil)) +} + +func (s *MySuite) TestCreateBucketAndList(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDonutDriver(root) + // create bucket + err = donut.CreateBucket("foo") + c.Assert(err, IsNil) + + // check bucket exists + buckets, err := donut.ListBuckets() + c.Assert(err, IsNil) + c.Assert(buckets, DeepEquals, []string{"foo"}) +} + +func (s *MySuite) TestCreateBucketWithSameNameFails(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDonutDriver(root) + err = donut.CreateBucket("foo") + c.Assert(err, IsNil) + + err = donut.CreateBucket("foo") + c.Assert(err, Not(IsNil)) +} + +func (s *MySuite) TestCreateMultipleBucketsAndList(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDonutDriver(root) + // add a second bucket + err = donut.CreateBucket("foo") + c.Assert(err, IsNil) + + err = donut.CreateBucket("bar") + c.Assert(err, IsNil) + + buckets, err := donut.ListBuckets() + c.Assert(err, IsNil) + c.Assert(buckets, DeepEquals, []string{"bar", "foo"}) + + err = donut.CreateBucket("foobar") + c.Assert(err, IsNil) + + buckets, err = donut.ListBuckets() + c.Assert(err, IsNil) + c.Assert(buckets, DeepEquals, []string{"bar", "foo", "foobar"}) +} + +func (s *MySuite) TestNewObjectFailsWithoutBucket(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDonutDriver(root) + + writer, err := donut.GetObjectWriter("foo", "obj") + c.Assert(err, Not(IsNil)) + c.Assert(writer, IsNil) +} + +func (s *MySuite) TestNewObjectFailsWithEmptyName(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDonutDriver(root) + + writer, err := donut.GetObjectWriter("foo", "") + c.Assert(err, Not(IsNil)) + c.Assert(writer, IsNil) + + writer, err = donut.GetObjectWriter("foo", " ") + c.Assert(err, Not(IsNil)) + c.Assert(writer, IsNil) +} + +func (s *MySuite) TestNewObjectCanBeWritten(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDonutDriver(root) + + err = donut.CreateBucket("foo") + c.Assert(err, IsNil) + + writer, err := donut.GetObjectWriter("foo", "obj") + c.Assert(err, IsNil) + + data := "Hello World" + length, err := writer.Write([]byte(data)) + c.Assert(length, Equals, len(data)) + + expectedMetadata := map[string]string{ + "foo": "bar", + "created": "one", + "hello": "world", + } + + err = writer.SetMetadata(expectedMetadata) + c.Assert(err, IsNil) + + err = writer.Close() + c.Assert(err, IsNil) + + actualWriterMetadata, err := writer.GetMetadata() + c.Assert(err, IsNil) + c.Assert(actualWriterMetadata, DeepEquals, expectedMetadata) + + c.Assert(err, IsNil) + + reader, err := donut.GetObject("foo", "obj") + c.Assert(err, IsNil) + + var actualData bytes.Buffer + _, err = io.Copy(&actualData, reader) + c.Assert(err, IsNil) + c.Assert(actualData.Bytes(), DeepEquals, []byte(data)) +} + +func (s *MySuite) TestMultipleNewObjects(c *C) { + root, err := ioutil.TempDir(os.TempDir(), "donut-") + c.Assert(err, IsNil) + defer os.RemoveAll(root) + donut := NewDonutDriver(root) + + c.Assert(donut.CreateBucket("foo"), IsNil) + writer, err := donut.GetObjectWriter("foo", "obj1") + c.Assert(err, IsNil) + writer.Write([]byte("one")) + writer.Close() + + writer, err = donut.GetObjectWriter("foo", "obj2") + c.Assert(err, IsNil) + writer.Write([]byte("two")) + writer.Close() + + // c.Skip("not complete") + + reader, err := donut.GetObject("foo", "obj1") + c.Assert(err, IsNil) + var readerBuffer1 bytes.Buffer + _, err = io.Copy(&readerBuffer1, reader) + c.Assert(err, IsNil) + // c.Skip("Not Implemented") + c.Assert(readerBuffer1.Bytes(), DeepEquals, []byte("one")) + + reader, err = donut.GetObject("foo", "obj2") + c.Assert(err, IsNil) + var readerBuffer2 bytes.Buffer + _, err = io.Copy(&readerBuffer2, reader) + c.Assert(err, IsNil) + c.Assert(readerBuffer2.Bytes(), DeepEquals, []byte("two")) +} diff --git a/pkg/storage/donut/objectwriter.go b/pkg/storage/donut/objectwriter.go new file mode 100644 index 000000000..17b04b135 --- /dev/null +++ b/pkg/storage/donut/objectwriter.go @@ -0,0 +1,39 @@ +package donut + +import ( + "errors" +) + +type objectWriter struct { + metadata map[string]string +} + +func (self objectWriter) Write(data []byte) (length int, err error) { + return 11, nil +} + +func (self objectWriter) Close() error { + return nil +} + +func (self objectWriter) CloseWithError(err error) error { + return errors.New("Not Implemented") +} + +func (self objectWriter) SetMetadata(metadata map[string]string) error { + for k := range self.metadata { + delete(self.metadata, k) + } + for k, v := range metadata { + self.metadata[k] = v + } + return nil +} + +func (self objectWriter) GetMetadata() (map[string]string, error) { + ret := make(map[string]string) + for k, v := range self.metadata { + ret[k] = v + } + return ret, nil +}