|
|
|
@ -17,22 +17,36 @@ |
|
|
|
|
package singledisk |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"bytes" |
|
|
|
|
"errors" |
|
|
|
|
"github.com/minio-io/minio/pkg/encoding/erasure" |
|
|
|
|
"github.com/minio-io/minio/pkg/storage" |
|
|
|
|
"github.com/minio-io/minio/pkg/storage/donut/erasure/erasure1" |
|
|
|
|
"github.com/minio-io/minio/pkg/storage/donut/object/objectv1" |
|
|
|
|
"github.com/minio-io/minio/pkg/utils/split" |
|
|
|
|
"io" |
|
|
|
|
"os" |
|
|
|
|
"path" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// StorageDriver creates a new single disk storage driver using donut without encoding.
|
|
|
|
|
type StorageDriver struct { |
|
|
|
|
root string |
|
|
|
|
root string |
|
|
|
|
donutBox DonutBox |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// DonutBox is an interface specifying how the storage driver should interact with its underlying system.
|
|
|
|
|
type DonutBox interface { |
|
|
|
|
Store(objectv1.ObjectMetadata, erasure1.DataHeader, io.Reader) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Start a single disk subsystem
|
|
|
|
|
func Start(root string) (chan<- string, <-chan error, storage.Storage) { |
|
|
|
|
func Start(root string, donutBox DonutBox) (chan<- string, <-chan error, storage.Storage) { |
|
|
|
|
ctrlChannel := make(chan string) |
|
|
|
|
errorChannel := make(chan error) |
|
|
|
|
s := new(StorageDriver) |
|
|
|
|
s.root = root |
|
|
|
|
s.donutBox = donutBox |
|
|
|
|
go start(ctrlChannel, errorChannel, s) |
|
|
|
|
return ctrlChannel, errorChannel, s |
|
|
|
|
} |
|
|
|
@ -50,7 +64,8 @@ func (diskStorage StorageDriver) ListBuckets() ([]storage.BucketMetadata, error) |
|
|
|
|
|
|
|
|
|
// CreateBucket creates a new bucket
|
|
|
|
|
func (diskStorage StorageDriver) CreateBucket(bucket string) error { |
|
|
|
|
return errors.New("Not Implemented") |
|
|
|
|
bucketPath := path.Join(diskStorage.root, bucket) |
|
|
|
|
return os.MkdirAll(bucketPath, 0600) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// CreateBucketPolicy sets a bucket's access policy
|
|
|
|
@ -85,5 +100,42 @@ func (diskStorage StorageDriver) ListObjects(bucket string, resources storage.Bu |
|
|
|
|
|
|
|
|
|
// CreateObject creates a new object
|
|
|
|
|
func (diskStorage StorageDriver) CreateObject(bucket string, key string, contentType string, data io.Reader) error { |
|
|
|
|
// test if object exists
|
|
|
|
|
// split object into erasure parts
|
|
|
|
|
erasureParts := split.Stream(data, 10*1024*1024) |
|
|
|
|
// set up encoder
|
|
|
|
|
params, err := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
encoder := erasure.NewEncoder(params) |
|
|
|
|
// for each erasure part
|
|
|
|
|
erasurePartIndex := 1 |
|
|
|
|
for erasurePart := range erasureParts { |
|
|
|
|
if erasurePart.Err != nil { |
|
|
|
|
return erasurePart.Err |
|
|
|
|
} |
|
|
|
|
// encode each erasure part into encoded parts
|
|
|
|
|
encodedParts, length := encoder.Encode(erasurePart.Data) |
|
|
|
|
// for each encoded part
|
|
|
|
|
for encodedPartIndex, encodedPart := range encodedParts { |
|
|
|
|
objectMetadata := objectv1.ObjectMetadata{ |
|
|
|
|
Bucket: bucket, |
|
|
|
|
Key: key, |
|
|
|
|
ErasurePart: uint16(erasurePartIndex), |
|
|
|
|
EncodedPart: uint8(encodedPartIndex), |
|
|
|
|
ContentType: contentType, |
|
|
|
|
} |
|
|
|
|
erasureMetadata := erasure1.DataHeader{ |
|
|
|
|
OriginalLength: uint32(length), |
|
|
|
|
EncoderK: 8, |
|
|
|
|
EncoderM: 8, |
|
|
|
|
EncoderTechnique: erasure1.Cauchy, |
|
|
|
|
} |
|
|
|
|
// store encoded part
|
|
|
|
|
diskStorage.donutBox.Store(objectMetadata, erasureMetadata, bytes.NewBuffer(encodedPart)) |
|
|
|
|
erasurePartIndex = erasurePartIndex + 1 |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return errors.New("Not Implemented") |
|
|
|
|
} |
|
|
|
|