parent
e5995f0193
commit
568ca4e1bc
@ -0,0 +1,46 @@ |
|||||||
|
package main |
||||||
|
|
||||||
|
import ( |
||||||
|
"errors" |
||||||
|
"io" |
||||||
|
"os" |
||||||
|
"path" |
||||||
|
|
||||||
|
"github.com/minio-io/minio/pkgs/storage" |
||||||
|
es "github.com/minio-io/minio/pkgs/storage/encodedstorage" |
||||||
|
) |
||||||
|
|
||||||
|
func erasureGetList(config inputConfig) (io.Reader, error) { |
||||||
|
// do nothing
|
||||||
|
return nil, errors.New("Not Implemented") |
||||||
|
} |
||||||
|
|
||||||
|
func erasureGet(config inputConfig, objectPath string) (io.Reader, error) { |
||||||
|
var objectStorage storage.ObjectStorage |
||||||
|
rootDir := path.Join(config.rootDir, config.storageDriver) |
||||||
|
objectStorage, err := es.NewStorage(rootDir, 10, 6, 1024*1024) |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
object, err := objectStorage.Get(objectPath) |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
return object, nil |
||||||
|
} |
||||||
|
|
||||||
|
func erasurePut(config inputConfig, objectPath string, reader io.Reader) error { |
||||||
|
var err error |
||||||
|
rootDir := path.Join(config.rootDir, config.storageDriver) |
||||||
|
if err := os.MkdirAll(rootDir, 0700); err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
var objectStorage storage.ObjectStorage |
||||||
|
if objectStorage, err = es.NewStorage(rootDir, 10, 6, 1024*1024); err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
if err = objectStorage.Put(objectPath, reader); err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
return nil |
||||||
|
} |
@ -0,0 +1,232 @@ |
|||||||
|
package encodedstorage |
||||||
|
|
||||||
|
import ( |
||||||
|
"bytes" |
||||||
|
"encoding/gob" |
||||||
|
"errors" |
||||||
|
"io" |
||||||
|
"io/ioutil" |
||||||
|
"os" |
||||||
|
"path" |
||||||
|
"strconv" |
||||||
|
|
||||||
|
"github.com/minio-io/minio/pkgs/erasure" |
||||||
|
"github.com/minio-io/minio/pkgs/split" |
||||||
|
"github.com/minio-io/minio/pkgs/storage" |
||||||
|
"github.com/minio-io/minio/pkgs/storage/appendstorage" |
||||||
|
) |
||||||
|
|
||||||
|
type encodedStorage struct { |
||||||
|
RootDir string |
||||||
|
K int |
||||||
|
M int |
||||||
|
BlockSize uint64 |
||||||
|
objects map[string]StorageEntry |
||||||
|
diskStorage []storage.ObjectStorage |
||||||
|
} |
||||||
|
|
||||||
|
func NewStorage(rootDir string, k, m int, blockSize uint64) (storage.ObjectStorage, error) { |
||||||
|
// create storage files
|
||||||
|
storageNodes := make([]storage.ObjectStorage, 16) |
||||||
|
for i := 0; i < 16; i++ { |
||||||
|
storageNode, err := appendstorage.NewStorage(rootDir, i) |
||||||
|
storageNodes[i] = storageNode |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
} |
||||||
|
objects := make(map[string]StorageEntry) |
||||||
|
indexPath := path.Join(rootDir, "index") |
||||||
|
if _, err := os.Stat(indexPath); err == nil { |
||||||
|
indexFile, err := os.Open(indexPath) |
||||||
|
defer indexFile.Close() |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
encoder := gob.NewDecoder(indexFile) |
||||||
|
err = encoder.Decode(&objects) |
||||||
|
if err != nil { |
||||||
|
return nil, err |
||||||
|
} |
||||||
|
} |
||||||
|
newStorage := encodedStorage{ |
||||||
|
RootDir: rootDir, |
||||||
|
K: k, |
||||||
|
M: m, |
||||||
|
BlockSize: blockSize, |
||||||
|
objects: objects, |
||||||
|
diskStorage: storageNodes, |
||||||
|
} |
||||||
|
return &newStorage, nil |
||||||
|
} |
||||||
|
|
||||||
|
func (eStorage *encodedStorage) Get(objectPath string) (io.Reader, error) { |
||||||
|
entry, ok := eStorage.objects[objectPath] |
||||||
|
if ok == false { |
||||||
|
return nil, nil |
||||||
|
} |
||||||
|
reader, writer := io.Pipe() |
||||||
|
go eStorage.readObject(objectPath, entry, writer) |
||||||
|
return reader, nil |
||||||
|
} |
||||||
|
|
||||||
|
func (eStorage *encodedStorage) List(listPath string) ([]storage.ObjectDescription, error) { |
||||||
|
return nil, errors.New("Not Implemented") |
||||||
|
} |
||||||
|
|
||||||
|
func (eStorage *encodedStorage) Put(objectPath string, object io.Reader) error { |
||||||
|
// split
|
||||||
|
chunks := make(chan split.SplitMessage) |
||||||
|
go split.SplitStream(object, eStorage.BlockSize, chunks) |
||||||
|
|
||||||
|
// for each chunk
|
||||||
|
encoderParameters, err := erasure.ParseEncoderParams(eStorage.K, eStorage.M, erasure.CAUCHY) |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
encoder := erasure.NewEncoder(encoderParameters) |
||||||
|
entry := StorageEntry{ |
||||||
|
Path: objectPath, |
||||||
|
Md5sum: "md5sum", |
||||||
|
Crc: 24, |
||||||
|
Blocks: make([]StorageBlockEntry, 0), |
||||||
|
} |
||||||
|
i := 0 |
||||||
|
// encode
|
||||||
|
for chunk := range chunks { |
||||||
|
if chunk.Err == nil { |
||||||
|
// encode each
|
||||||
|
blocks, length := encoder.Encode(chunk.Data) |
||||||
|
// store each
|
||||||
|
storeErrors := eStorage.storeBlocks(objectPath+"$"+strconv.Itoa(i), blocks) |
||||||
|
for _, err := range storeErrors { |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
} |
||||||
|
blockEntry := StorageBlockEntry{ |
||||||
|
Index: i, |
||||||
|
Length: length, |
||||||
|
} |
||||||
|
entry.Blocks = append(entry.Blocks, blockEntry) |
||||||
|
} else { |
||||||
|
return chunk.Err |
||||||
|
} |
||||||
|
i++ |
||||||
|
} |
||||||
|
eStorage.objects[objectPath] = entry |
||||||
|
var gobBuffer bytes.Buffer |
||||||
|
gobEncoder := gob.NewEncoder(&gobBuffer) |
||||||
|
gobEncoder.Encode(eStorage.objects) |
||||||
|
ioutil.WriteFile(path.Join(eStorage.RootDir, "index"), gobBuffer.Bytes(), 0600) |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
type storeRequest struct { |
||||||
|
path string |
||||||
|
data []byte |
||||||
|
} |
||||||
|
|
||||||
|
type storeResponse struct { |
||||||
|
data []byte |
||||||
|
err error |
||||||
|
} |
||||||
|
|
||||||
|
type StorageEntry struct { |
||||||
|
Path string |
||||||
|
Md5sum string |
||||||
|
Crc uint32 |
||||||
|
Blocks []StorageBlockEntry |
||||||
|
} |
||||||
|
|
||||||
|
type StorageBlockEntry struct { |
||||||
|
Index int |
||||||
|
Length int |
||||||
|
} |
||||||
|
|
||||||
|
func (eStorage *encodedStorage) storeBlocks(path string, blocks [][]byte) []error { |
||||||
|
returnChannels := make([]<-chan error, len(eStorage.diskStorage)) |
||||||
|
for i, store := range eStorage.diskStorage { |
||||||
|
returnChannels[i] = storageRoutine(store, path, bytes.NewBuffer(blocks[i])) |
||||||
|
} |
||||||
|
returnErrors := make([]error, 0) |
||||||
|
for _, returnChannel := range returnChannels { |
||||||
|
for returnValue := range returnChannel { |
||||||
|
if returnValue != nil { |
||||||
|
returnErrors = append(returnErrors, returnValue) |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
return returnErrors |
||||||
|
} |
||||||
|
|
||||||
|
func (eStorage *encodedStorage) readObject(objectPath string, entry StorageEntry, writer *io.PipeWriter) { |
||||||
|
params, err := erasure.ParseEncoderParams(eStorage.K, eStorage.M, erasure.CAUCHY) |
||||||
|
if err != nil { |
||||||
|
} |
||||||
|
encoder := erasure.NewEncoder(params) |
||||||
|
for i, chunk := range entry.Blocks { |
||||||
|
blockSlices := eStorage.getBlockSlices(objectPath + "$" + strconv.Itoa(i)) |
||||||
|
var blocks [][]byte |
||||||
|
for _, slice := range blockSlices { |
||||||
|
if slice.err != nil { |
||||||
|
writer.CloseWithError(err) |
||||||
|
return |
||||||
|
} |
||||||
|
blocks = append(blocks, slice.data) |
||||||
|
} |
||||||
|
data, err := encoder.Decode(blocks, chunk.Length) |
||||||
|
if err != nil { |
||||||
|
writer.CloseWithError(err) |
||||||
|
return |
||||||
|
} |
||||||
|
bytesWritten := 0 |
||||||
|
for bytesWritten != len(data) { |
||||||
|
written, err := writer.Write(data[bytesWritten:len(data)]) |
||||||
|
if err != nil { |
||||||
|
writer.CloseWithError(err) |
||||||
|
} |
||||||
|
bytesWritten += written |
||||||
|
} |
||||||
|
} |
||||||
|
writer.Close() |
||||||
|
} |
||||||
|
|
||||||
|
func (eStorage *encodedStorage) getBlockSlices(objectPath string) []storeResponse { |
||||||
|
responses := make([]<-chan storeResponse, 0) |
||||||
|
for i := 0; i < len(eStorage.diskStorage); i++ { |
||||||
|
response := getSlice(eStorage.diskStorage[i], objectPath) |
||||||
|
responses = append(responses, response) |
||||||
|
} |
||||||
|
results := make([]storeResponse, 0) |
||||||
|
for _, response := range responses { |
||||||
|
results = append(results, <-response) |
||||||
|
} |
||||||
|
return results |
||||||
|
} |
||||||
|
|
||||||
|
func getSlice(store storage.ObjectStorage, path string) <-chan storeResponse { |
||||||
|
out := make(chan storeResponse) |
||||||
|
go func() { |
||||||
|
obj, err := store.Get(path) |
||||||
|
if err != nil { |
||||||
|
out <- storeResponse{data: nil, err: err} |
||||||
|
} else { |
||||||
|
data, err := ioutil.ReadAll(obj) |
||||||
|
out <- storeResponse{data: data, err: err} |
||||||
|
} |
||||||
|
close(out) |
||||||
|
}() |
||||||
|
return out |
||||||
|
} |
||||||
|
|
||||||
|
func storageRoutine(store storage.ObjectStorage, path string, data io.Reader) <-chan error { |
||||||
|
out := make(chan error) |
||||||
|
go func() { |
||||||
|
if err := store.Put(path, data); err != nil { |
||||||
|
out <- err |
||||||
|
} |
||||||
|
close(out) |
||||||
|
}() |
||||||
|
return out |
||||||
|
} |
@ -0,0 +1,124 @@ |
|||||||
|
package encodedstorage |
||||||
|
|
||||||
|
import ( |
||||||
|
"bytes" |
||||||
|
"io/ioutil" |
||||||
|
"os" |
||||||
|
"strconv" |
||||||
|
"testing" |
||||||
|
|
||||||
|
"github.com/minio-io/minio/pkgs/storage" |
||||||
|
. "gopkg.in/check.v1" |
||||||
|
) |
||||||
|
|
||||||
|
type EncodedStorageSuite struct{} |
||||||
|
|
||||||
|
var _ = Suite(&EncodedStorageSuite{}) |
||||||
|
|
||||||
|
func Test(t *testing.T) { TestingT(t) } |
||||||
|
|
||||||
|
func makeTempTestDir() (string, error) { |
||||||
|
return ioutil.TempDir("/tmp", "minio-test-") |
||||||
|
} |
||||||
|
|
||||||
|
func (s *EncodedStorageSuite) TestFileStoragePutAtRootPath(c *C) { |
||||||
|
rootDir, err := makeTempTestDir() |
||||||
|
c.Assert(err, IsNil) |
||||||
|
defer os.RemoveAll(rootDir) |
||||||
|
|
||||||
|
var objectStorage storage.ObjectStorage |
||||||
|
objectStorage, err = NewStorage(rootDir, 10, 6, 1024) |
||||||
|
c.Assert(err, IsNil) |
||||||
|
objectBuffer := bytes.NewBuffer([]byte("object1")) |
||||||
|
objectStorage.Put("path1", objectBuffer) |
||||||
|
|
||||||
|
// assert object1 was created in correct path
|
||||||
|
objectResult1, err := objectStorage.Get("path1") |
||||||
|
c.Assert(err, IsNil) |
||||||
|
object1, _ := ioutil.ReadAll(objectResult1) |
||||||
|
c.Assert(string(object1), Equals, "object1") |
||||||
|
|
||||||
|
// objectList, err := objectStorage.List("/")
|
||||||
|
// c.Assert(err, IsNil)
|
||||||
|
// c.Assert(objectList[0].Path, Equals, "path1")
|
||||||
|
} |
||||||
|
|
||||||
|
func (s *EncodedStorageSuite) TestFileStoragePutDirPath(c *C) { |
||||||
|
rootDir, err := makeTempTestDir() |
||||||
|
c.Assert(err, IsNil) |
||||||
|
defer os.RemoveAll(rootDir) |
||||||
|
|
||||||
|
var objectStorage storage.ObjectStorage |
||||||
|
objectStorage, err = NewStorage(rootDir, 10, 6, 1024) |
||||||
|
c.Assert(err, IsNil) |
||||||
|
|
||||||
|
objectBuffer1 := bytes.NewBuffer([]byte("object1")) |
||||||
|
objectStorage.Put("path1/path2/path3", objectBuffer1) |
||||||
|
|
||||||
|
// assert object1 was created in correct path
|
||||||
|
objectResult1, err := objectStorage.Get("path1/path2/path3") |
||||||
|
c.Assert(err, IsNil) |
||||||
|
object1, _ := ioutil.ReadAll(objectResult1) |
||||||
|
c.Assert(string(object1), Equals, "object1") |
||||||
|
|
||||||
|
// add second object
|
||||||
|
objectBuffer2 := bytes.NewBuffer([]byte("object2")) |
||||||
|
err = objectStorage.Put("path2/path2/path2", objectBuffer2) |
||||||
|
c.Assert(err, IsNil) |
||||||
|
|
||||||
|
// add third object
|
||||||
|
objectBuffer3 := bytes.NewBuffer([]byte("object3")) |
||||||
|
err = objectStorage.Put("object3", objectBuffer3) |
||||||
|
c.Assert(err, IsNil) |
||||||
|
|
||||||
|
// TODO support list
|
||||||
|
// objectList, err := objectStorage.List("/")
|
||||||
|
// c.Assert(err, IsNil)
|
||||||
|
// c.Assert(objectList[0], Equals, storage.ObjectDescription{Path: "object3", IsDir: false, Hash: ""})
|
||||||
|
// c.Assert(objectList[1], Equals, storage.ObjectDescription{Path: "path1", IsDir: true, Hash: ""})
|
||||||
|
// c.Assert(objectList[2], Equals, storage.ObjectDescription{Path: "path2", IsDir: true, Hash: ""})
|
||||||
|
// c.Assert(len(objectList), Equals, 3)
|
||||||
|
//
|
||||||
|
// objectList, err = objectStorage.List("/path1")
|
||||||
|
// c.Assert(err, IsNil)
|
||||||
|
// c.Assert(objectList[0], Equals, storage.ObjectDescription{Path: "path2", IsDir: true, Hash: ""})
|
||||||
|
// c.Assert(len(objectList), Equals, 1)
|
||||||
|
//
|
||||||
|
// objectList, err = objectStorage.List("/path1/path2")
|
||||||
|
// c.Assert(err, IsNil)
|
||||||
|
// c.Assert(objectList[0], Equals, storage.ObjectDescription{Path: "path3", IsDir: false, Hash: ""})
|
||||||
|
// c.Assert(len(objectList), Equals, 1)
|
||||||
|
//
|
||||||
|
// objectList, err = objectStorage.List("/path1/path2/path3")
|
||||||
|
// c.Assert(err, Not(IsNil))
|
||||||
|
// c.Assert(objectList, IsNil)
|
||||||
|
} |
||||||
|
|
||||||
|
func (s *EncodedStorageSuite) TestObjectWithChunking(c *C) { |
||||||
|
rootDir, err := makeTempTestDir() |
||||||
|
c.Assert(err, IsNil) |
||||||
|
defer os.RemoveAll(rootDir) |
||||||
|
|
||||||
|
var objectStorage storage.ObjectStorage |
||||||
|
objectStorage, err = NewStorage(rootDir, 10, 6, 1024) |
||||||
|
c.Assert(err, IsNil) |
||||||
|
|
||||||
|
var buffer bytes.Buffer |
||||||
|
for i := 0; i <= 2048; i++ { |
||||||
|
buffer.Write([]byte(strconv.Itoa(i))) |
||||||
|
} |
||||||
|
|
||||||
|
reader := bytes.NewReader(buffer.Bytes()) |
||||||
|
|
||||||
|
err = objectStorage.Put("object", reader) |
||||||
|
c.Assert(err, IsNil) |
||||||
|
|
||||||
|
objectStorage2, err := NewStorage(rootDir, 10, 6, 1024) |
||||||
|
c.Assert(err, IsNil) |
||||||
|
objectResult, err := objectStorage2.Get("object") |
||||||
|
c.Assert(err, IsNil) |
||||||
|
result, err := ioutil.ReadAll(objectResult) |
||||||
|
c.Assert(err, IsNil) |
||||||
|
c.Assert(bytes.Compare(result, buffer.Bytes()), Equals, 0) |
||||||
|
|
||||||
|
} |
Loading…
Reference in new issue