Merge pull request #288 from fkautz/pr_out_more_refactoring_to_meet_new_donut_spec

master
Frederick F. Kautz IV 10 years ago
commit 1047383bcf
  1. 4
      pkg/storage/donut/erasure/erasure.go
  2. 13
      pkg/storage/donut/erasure/erasure1/erasure.go
  3. 8
      pkg/storage/donut/erasure/erasure1/erasure_test.go
  4. 58
      pkg/storage/singledisk/singledisk.go
  5. 2
      pkg/storage/singledisk/singledisk_test.go

@ -18,7 +18,7 @@ const (
) )
// Write latest donut format // Write latest donut format
func Write(target io.Writer, key string, part uint8, length uint32, k, m uint8, technique EncoderTechnique, data io.Reader) error { func Write(target io.Writer, length uint32, k, m uint8, technique EncoderTechnique, data io.Reader) error {
var versionedTechnique erasure1.EncoderTechnique var versionedTechnique erasure1.EncoderTechnique
switch { switch {
case technique == Vandermonde: case technique == Vandermonde:
@ -28,7 +28,7 @@ func Write(target io.Writer, key string, part uint8, length uint32, k, m uint8,
default: default:
errors.New("Unknown encoder technique") errors.New("Unknown encoder technique")
} }
return erasure1.Write(target, key, part, length, k, m, versionedTechnique, data) return erasure1.Write(target, length, k, m, versionedTechnique, data)
} }
// Read any donut format // Read any donut format

@ -32,11 +32,6 @@ const (
// DataHeader represents the structure serialized to gob. // DataHeader represents the structure serialized to gob.
type DataHeader struct { type DataHeader struct {
// object + block stored
Key string
// chunk index of encoded block
ChunkIndex uint8
// Original Length of the block output
OriginalLength uint32 OriginalLength uint32
// Data Blocks // Data Blocks
EncoderK uint8 EncoderK uint8
@ -58,10 +53,6 @@ const (
// validate populated header // validate populated header
func validateHeader(header DataHeader) error { func validateHeader(header DataHeader) error {
if header.Key == "" {
return errors.New("Empty Key")
}
if header.EncoderTechnique > 1 { if header.EncoderTechnique > 1 {
return errors.New("Invalid encoder technique") return errors.New("Invalid encoder technique")
} }
@ -70,10 +61,8 @@ func validateHeader(header DataHeader) error {
} }
// Write returns error upon any failure // Write returns error upon any failure
func Write(target io.Writer, key string, part uint8, length uint32, k, m uint8, technique EncoderTechnique, data io.Reader) error { func Write(target io.Writer, length uint32, k, m uint8, technique EncoderTechnique, data io.Reader) error {
header := DataHeader{ header := DataHeader{
Key: key,
ChunkIndex: part,
OriginalLength: length, OriginalLength: length,
EncoderK: k, EncoderK: k,
EncoderM: m, EncoderM: m,

@ -36,15 +36,13 @@ func (s *MySuite) TestSingleWrite(c *C) {
var testBuffer bytes.Buffer var testBuffer bytes.Buffer
testData := "Hello, World" testData := "Hello, World"
testHeader := DataHeader{ testHeader := DataHeader{
Key: "testobj",
ChunkIndex: 1,
OriginalLength: uint32(len(testData)), OriginalLength: uint32(len(testData)),
EncoderK: 8, EncoderK: 8,
EncoderM: 8, EncoderM: 8,
EncoderTechnique: Cauchy, EncoderTechnique: Cauchy,
} }
err := Write(&testBuffer, testHeader.Key, testHeader.ChunkIndex, testHeader.OriginalLength, testHeader.EncoderK, testHeader.EncoderM, testHeader.EncoderTechnique, bytes.NewBufferString(testData)) err := Write(&testBuffer, testHeader.OriginalLength, testHeader.EncoderK, testHeader.EncoderM, testHeader.EncoderTechnique, bytes.NewBufferString(testData))
c.Assert(err, IsNil) c.Assert(err, IsNil)
actualVersion := make([]byte, 4) actualVersion := make([]byte, 4)
@ -71,15 +69,13 @@ func (s *MySuite) TestReadWrite(c *C) {
var testBuffer bytes.Buffer var testBuffer bytes.Buffer
testData := "Hello, World" testData := "Hello, World"
testHeader := DataHeader{ testHeader := DataHeader{
Key: "testobj",
ChunkIndex: 1,
OriginalLength: uint32(len(testData)), OriginalLength: uint32(len(testData)),
EncoderK: 8, EncoderK: 8,
EncoderM: 8, EncoderM: 8,
EncoderTechnique: Cauchy, EncoderTechnique: Cauchy,
} }
err := Write(&testBuffer, testHeader.Key, testHeader.ChunkIndex, testHeader.OriginalLength, testHeader.EncoderK, testHeader.EncoderM, testHeader.EncoderTechnique, bytes.NewBufferString(testData)) err := Write(&testBuffer, testHeader.OriginalLength, testHeader.EncoderK, testHeader.EncoderM, testHeader.EncoderTechnique, bytes.NewBufferString(testData))
c.Assert(err, IsNil) c.Assert(err, IsNil)
header, err := ReadHeader(&testBuffer) header, err := ReadHeader(&testBuffer)

@ -17,22 +17,36 @@
package singledisk package singledisk
import ( import (
"bytes"
"errors" "errors"
"github.com/minio-io/minio/pkg/encoding/erasure"
"github.com/minio-io/minio/pkg/storage" "github.com/minio-io/minio/pkg/storage"
"github.com/minio-io/minio/pkg/storage/donut/erasure/erasure1"
"github.com/minio-io/minio/pkg/storage/donut/object/objectv1"
"github.com/minio-io/minio/pkg/utils/split"
"io" "io"
"os" "os"
"path"
) )
// StorageDriver creates a new single disk storage driver using donut without encoding. // StorageDriver creates a new single disk storage driver using donut without encoding.
type StorageDriver struct { type StorageDriver struct {
root string root string
donutBox DonutBox
}
// DonutBox is an interface specifying how the storage driver should interact with its underlying system.
type DonutBox interface {
Store(objectv1.ObjectMetadata, erasure1.DataHeader, io.Reader)
} }
// Start a single disk subsystem // Start a single disk subsystem
func Start(root string) (chan<- string, <-chan error, storage.Storage) { func Start(root string, donutBox DonutBox) (chan<- string, <-chan error, storage.Storage) {
ctrlChannel := make(chan string) ctrlChannel := make(chan string)
errorChannel := make(chan error) errorChannel := make(chan error)
s := new(StorageDriver) s := new(StorageDriver)
s.root = root
s.donutBox = donutBox
go start(ctrlChannel, errorChannel, s) go start(ctrlChannel, errorChannel, s)
return ctrlChannel, errorChannel, s return ctrlChannel, errorChannel, s
} }
@ -50,7 +64,8 @@ func (diskStorage StorageDriver) ListBuckets() ([]storage.BucketMetadata, error)
// CreateBucket creates a new bucket // CreateBucket creates a new bucket
func (diskStorage StorageDriver) CreateBucket(bucket string) error { func (diskStorage StorageDriver) CreateBucket(bucket string) error {
return errors.New("Not Implemented") bucketPath := path.Join(diskStorage.root, bucket)
return os.MkdirAll(bucketPath, 0600)
} }
// CreateBucketPolicy sets a bucket's access policy // CreateBucketPolicy sets a bucket's access policy
@ -85,5 +100,42 @@ func (diskStorage StorageDriver) ListObjects(bucket string, resources storage.Bu
// CreateObject creates a new object // CreateObject creates a new object
func (diskStorage StorageDriver) CreateObject(bucket string, key string, contentType string, data io.Reader) error { func (diskStorage StorageDriver) CreateObject(bucket string, key string, contentType string, data io.Reader) error {
// test if object exists
// split object into erasure parts
erasureParts := split.Stream(data, 10*1024*1024)
// set up encoder
params, err := erasure.ParseEncoderParams(8, 8, erasure.Cauchy)
if err != nil {
return err
}
encoder := erasure.NewEncoder(params)
// for each erasure part
erasurePartIndex := 1
for erasurePart := range erasureParts {
if erasurePart.Err != nil {
return erasurePart.Err
}
// encode each erasure part into encoded parts
encodedParts, length := encoder.Encode(erasurePart.Data)
// for each encoded part
for encodedPartIndex, encodedPart := range encodedParts {
objectMetadata := objectv1.ObjectMetadata{
Bucket: bucket,
Key: key,
ErasurePart: uint16(erasurePartIndex),
EncodedPart: uint8(encodedPartIndex),
ContentType: contentType,
}
erasureMetadata := erasure1.DataHeader{
OriginalLength: uint32(length),
EncoderK: 8,
EncoderM: 8,
EncoderTechnique: erasure1.Cauchy,
}
// store encoded part
diskStorage.donutBox.Store(objectMetadata, erasureMetadata, bytes.NewBuffer(encodedPart))
erasurePartIndex = erasurePartIndex + 1
}
}
return errors.New("Not Implemented") return errors.New("Not Implemented")
} }

@ -39,7 +39,7 @@ func (s *MySuite) TestAPISuite(c *C) {
path, err := ioutil.TempDir(os.TempDir(), "minio-fs-") path, err := ioutil.TempDir(os.TempDir(), "minio-fs-")
c.Check(err, IsNil) c.Check(err, IsNil)
storageList = append(storageList, path) storageList = append(storageList, path)
_, _, store := Start(path) _, _, store := Start(path, nil) // TODO Make InMemory driver
return store return store
} }
mstorage.APITestSuite(c, create) mstorage.APITestSuite(c, create)

Loading…
Cancel
Save