diff --git a/pkg/storage/donut/erasure/erasure.go b/pkg/storage/donut/erasure/erasure.go index ee96be4f3..901e2e779 100644 --- a/pkg/storage/donut/erasure/erasure.go +++ b/pkg/storage/donut/erasure/erasure.go @@ -18,7 +18,7 @@ const ( ) // Write latest donut format -func Write(target io.Writer, key string, part uint8, length uint32, k, m uint8, technique EncoderTechnique, data io.Reader) error { +func Write(target io.Writer, length uint32, k, m uint8, technique EncoderTechnique, data io.Reader) error { var versionedTechnique erasure1.EncoderTechnique switch { case technique == Vandermonde: @@ -28,7 +28,7 @@ func Write(target io.Writer, key string, part uint8, length uint32, k, m uint8, default: errors.New("Unknown encoder technique") } - return erasure1.Write(target, key, part, length, k, m, versionedTechnique, data) + return erasure1.Write(target, length, k, m, versionedTechnique, data) } // Read any donut format diff --git a/pkg/storage/donut/erasure/erasure1/erasure.go b/pkg/storage/donut/erasure/erasure1/erasure.go index 137441714..660d750bc 100644 --- a/pkg/storage/donut/erasure/erasure1/erasure.go +++ b/pkg/storage/donut/erasure/erasure1/erasure.go @@ -32,11 +32,6 @@ const ( // DataHeader represents the structure serialized to gob. type DataHeader struct { - // object + block stored - Key string - // chunk index of encoded block - ChunkIndex uint8 - // Original Length of the block output OriginalLength uint32 // Data Blocks EncoderK uint8 @@ -58,10 +53,6 @@ const ( // validate populated header func validateHeader(header DataHeader) error { - if header.Key == "" { - return errors.New("Empty Key") - } - if header.EncoderTechnique > 1 { return errors.New("Invalid encoder technique") } @@ -70,10 +61,8 @@ func validateHeader(header DataHeader) error { } // Write returns error upon any failure -func Write(target io.Writer, key string, part uint8, length uint32, k, m uint8, technique EncoderTechnique, data io.Reader) error { +func Write(target io.Writer, length uint32, k, m uint8, technique EncoderTechnique, data io.Reader) error { header := DataHeader{ - Key: key, - ChunkIndex: part, OriginalLength: length, EncoderK: k, EncoderM: m, diff --git a/pkg/storage/donut/erasure/erasure1/erasure_test.go b/pkg/storage/donut/erasure/erasure1/erasure_test.go index 37a1a3af4..57009db5f 100644 --- a/pkg/storage/donut/erasure/erasure1/erasure_test.go +++ b/pkg/storage/donut/erasure/erasure1/erasure_test.go @@ -36,15 +36,13 @@ func (s *MySuite) TestSingleWrite(c *C) { var testBuffer bytes.Buffer testData := "Hello, World" testHeader := DataHeader{ - Key: "testobj", - ChunkIndex: 1, OriginalLength: uint32(len(testData)), EncoderK: 8, EncoderM: 8, EncoderTechnique: Cauchy, } - err := Write(&testBuffer, testHeader.Key, testHeader.ChunkIndex, testHeader.OriginalLength, testHeader.EncoderK, testHeader.EncoderM, testHeader.EncoderTechnique, bytes.NewBufferString(testData)) + err := Write(&testBuffer, testHeader.OriginalLength, testHeader.EncoderK, testHeader.EncoderM, testHeader.EncoderTechnique, bytes.NewBufferString(testData)) c.Assert(err, IsNil) actualVersion := make([]byte, 4) @@ -71,15 +69,13 @@ func (s *MySuite) TestReadWrite(c *C) { var testBuffer bytes.Buffer testData := "Hello, World" testHeader := DataHeader{ - Key: "testobj", - ChunkIndex: 1, OriginalLength: uint32(len(testData)), EncoderK: 8, EncoderM: 8, EncoderTechnique: Cauchy, } - err := Write(&testBuffer, testHeader.Key, testHeader.ChunkIndex, testHeader.OriginalLength, testHeader.EncoderK, testHeader.EncoderM, testHeader.EncoderTechnique, bytes.NewBufferString(testData)) + err := Write(&testBuffer, testHeader.OriginalLength, testHeader.EncoderK, testHeader.EncoderM, testHeader.EncoderTechnique, bytes.NewBufferString(testData)) c.Assert(err, IsNil) header, err := ReadHeader(&testBuffer) diff --git a/pkg/storage/singledisk/singledisk.go b/pkg/storage/singledisk/singledisk.go index fb70ba96d..1a086b96f 100644 --- a/pkg/storage/singledisk/singledisk.go +++ b/pkg/storage/singledisk/singledisk.go @@ -17,22 +17,36 @@ package singledisk import ( + "bytes" "errors" + "github.com/minio-io/minio/pkg/encoding/erasure" "github.com/minio-io/minio/pkg/storage" + "github.com/minio-io/minio/pkg/storage/donut/erasure/erasure1" + "github.com/minio-io/minio/pkg/storage/donut/object/objectv1" + "github.com/minio-io/minio/pkg/utils/split" "io" "os" + "path" ) // StorageDriver creates a new single disk storage driver using donut without encoding. type StorageDriver struct { - root string + root string + donutBox DonutBox +} + +// DonutBox is an interface specifying how the storage driver should interact with its underlying system. +type DonutBox interface { + Store(objectv1.ObjectMetadata, erasure1.DataHeader, io.Reader) } // Start a single disk subsystem -func Start(root string) (chan<- string, <-chan error, storage.Storage) { +func Start(root string, donutBox DonutBox) (chan<- string, <-chan error, storage.Storage) { ctrlChannel := make(chan string) errorChannel := make(chan error) s := new(StorageDriver) + s.root = root + s.donutBox = donutBox go start(ctrlChannel, errorChannel, s) return ctrlChannel, errorChannel, s } @@ -50,7 +64,8 @@ func (diskStorage StorageDriver) ListBuckets() ([]storage.BucketMetadata, error) // CreateBucket creates a new bucket func (diskStorage StorageDriver) CreateBucket(bucket string) error { - return errors.New("Not Implemented") + bucketPath := path.Join(diskStorage.root, bucket) + return os.MkdirAll(bucketPath, 0600) } // CreateBucketPolicy sets a bucket's access policy @@ -85,5 +100,42 @@ func (diskStorage StorageDriver) ListObjects(bucket string, resources storage.Bu // CreateObject creates a new object func (diskStorage StorageDriver) CreateObject(bucket string, key string, contentType string, data io.Reader) error { + // test if object exists + // split object into erasure parts + erasureParts := split.Stream(data, 10*1024*1024) + // set up encoder + params, err := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) + if err != nil { + return err + } + encoder := erasure.NewEncoder(params) + // for each erasure part + erasurePartIndex := 1 + for erasurePart := range erasureParts { + if erasurePart.Err != nil { + return erasurePart.Err + } + // encode each erasure part into encoded parts + encodedParts, length := encoder.Encode(erasurePart.Data) + // for each encoded part + for encodedPartIndex, encodedPart := range encodedParts { + objectMetadata := objectv1.ObjectMetadata{ + Bucket: bucket, + Key: key, + ErasurePart: uint16(erasurePartIndex), + EncodedPart: uint8(encodedPartIndex), + ContentType: contentType, + } + erasureMetadata := erasure1.DataHeader{ + OriginalLength: uint32(length), + EncoderK: 8, + EncoderM: 8, + EncoderTechnique: erasure1.Cauchy, + } + // store encoded part + diskStorage.donutBox.Store(objectMetadata, erasureMetadata, bytes.NewBuffer(encodedPart)) + erasurePartIndex = erasurePartIndex + 1 + } + } return errors.New("Not Implemented") } diff --git a/pkg/storage/singledisk/singledisk_test.go b/pkg/storage/singledisk/singledisk_test.go index ed9f63702..a02d1c6b3 100644 --- a/pkg/storage/singledisk/singledisk_test.go +++ b/pkg/storage/singledisk/singledisk_test.go @@ -39,7 +39,7 @@ func (s *MySuite) TestAPISuite(c *C) { path, err := ioutil.TempDir(os.TempDir(), "minio-fs-") c.Check(err, IsNil) storageList = append(storageList, path) - _, _, store := Start(path) + _, _, store := Start(path, nil) // TODO Make InMemory driver return store } mstorage.APITestSuite(c, create)