|
|
@ -25,10 +25,33 @@ type encodedStorage struct { |
|
|
|
diskStorage []storage.ObjectStorage |
|
|
|
diskStorage []storage.ObjectStorage |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type StorageEntry struct { |
|
|
|
|
|
|
|
Path string |
|
|
|
|
|
|
|
Md5sum string |
|
|
|
|
|
|
|
Crc uint32 |
|
|
|
|
|
|
|
Blocks []StorageBlockEntry |
|
|
|
|
|
|
|
Encoderparams *erasure.EncoderParams |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type StorageBlockEntry struct { |
|
|
|
|
|
|
|
Index int |
|
|
|
|
|
|
|
Length int |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type storeRequest struct { |
|
|
|
|
|
|
|
path string |
|
|
|
|
|
|
|
data []byte |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type storeResponse struct { |
|
|
|
|
|
|
|
data []byte |
|
|
|
|
|
|
|
err error |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func NewStorage(rootDir string, k, m int, blockSize uint64) (storage.ObjectStorage, error) { |
|
|
|
func NewStorage(rootDir string, k, m int, blockSize uint64) (storage.ObjectStorage, error) { |
|
|
|
// create storage files
|
|
|
|
// create storage files
|
|
|
|
storageNodes := make([]storage.ObjectStorage, 16) |
|
|
|
storageNodes := make([]storage.ObjectStorage, k+m) |
|
|
|
for i := 0; i < 16; i++ { |
|
|
|
for i := 0; i < k+m; i++ { |
|
|
|
storageNode, err := appendstorage.NewStorage(rootDir, i) |
|
|
|
storageNode, err := appendstorage.NewStorage(rootDir, i) |
|
|
|
storageNodes[i] = storageNode |
|
|
|
storageNodes[i] = storageNode |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
@ -45,7 +68,7 @@ func NewStorage(rootDir string, k, m int, blockSize uint64) (storage.ObjectStora |
|
|
|
} |
|
|
|
} |
|
|
|
encoder := gob.NewDecoder(indexFile) |
|
|
|
encoder := gob.NewDecoder(indexFile) |
|
|
|
err = encoder.Decode(&objects) |
|
|
|
err = encoder.Decode(&objects) |
|
|
|
if err != nil { |
|
|
|
if err != nil && err != io.EOF { |
|
|
|
return nil, err |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -86,10 +109,11 @@ func (eStorage *encodedStorage) Put(objectPath string, object io.Reader) error { |
|
|
|
} |
|
|
|
} |
|
|
|
encoder := erasure.NewEncoder(encoderParameters) |
|
|
|
encoder := erasure.NewEncoder(encoderParameters) |
|
|
|
entry := StorageEntry{ |
|
|
|
entry := StorageEntry{ |
|
|
|
Path: objectPath, |
|
|
|
Path: objectPath, |
|
|
|
Md5sum: "md5sum", |
|
|
|
Md5sum: "md5sum", |
|
|
|
Crc: 24, |
|
|
|
Crc: 0, |
|
|
|
Blocks: make([]StorageBlockEntry, 0), |
|
|
|
Blocks: make([]StorageBlockEntry, 0), |
|
|
|
|
|
|
|
Encoderparams: encoderParameters, |
|
|
|
} |
|
|
|
} |
|
|
|
i := 0 |
|
|
|
i := 0 |
|
|
|
// encode
|
|
|
|
// encode
|
|
|
@ -122,28 +146,6 @@ func (eStorage *encodedStorage) Put(objectPath string, object io.Reader) error { |
|
|
|
return nil |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
type storeRequest struct { |
|
|
|
|
|
|
|
path string |
|
|
|
|
|
|
|
data []byte |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type storeResponse struct { |
|
|
|
|
|
|
|
data []byte |
|
|
|
|
|
|
|
err error |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type StorageEntry struct { |
|
|
|
|
|
|
|
Path string |
|
|
|
|
|
|
|
Md5sum string |
|
|
|
|
|
|
|
Crc uint32 |
|
|
|
|
|
|
|
Blocks []StorageBlockEntry |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type StorageBlockEntry struct { |
|
|
|
|
|
|
|
Index int |
|
|
|
|
|
|
|
Length int |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (eStorage *encodedStorage) storeBlocks(path string, blocks [][]byte) []error { |
|
|
|
func (eStorage *encodedStorage) storeBlocks(path string, blocks [][]byte) []error { |
|
|
|
returnChannels := make([]<-chan error, len(eStorage.diskStorage)) |
|
|
|
returnChannels := make([]<-chan error, len(eStorage.diskStorage)) |
|
|
|
for i, store := range eStorage.diskStorage { |
|
|
|
for i, store := range eStorage.diskStorage { |
|
|
@ -161,16 +163,13 @@ func (eStorage *encodedStorage) storeBlocks(path string, blocks [][]byte) []erro |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (eStorage *encodedStorage) readObject(objectPath string, entry StorageEntry, writer *io.PipeWriter) { |
|
|
|
func (eStorage *encodedStorage) readObject(objectPath string, entry StorageEntry, writer *io.PipeWriter) { |
|
|
|
params, err := erasure.ParseEncoderParams(eStorage.K, eStorage.M, erasure.CAUCHY) |
|
|
|
encoder := erasure.NewEncoder(entry.Encoderparams) |
|
|
|
if err != nil { |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
encoder := erasure.NewEncoder(params) |
|
|
|
|
|
|
|
for i, chunk := range entry.Blocks { |
|
|
|
for i, chunk := range entry.Blocks { |
|
|
|
blockSlices := eStorage.getBlockSlices(objectPath + "$" + strconv.Itoa(i)) |
|
|
|
blockSlices := eStorage.getBlockSlices(objectPath + "$" + strconv.Itoa(i)) |
|
|
|
var blocks [][]byte |
|
|
|
var blocks [][]byte |
|
|
|
for _, slice := range blockSlices { |
|
|
|
for _, slice := range blockSlices { |
|
|
|
if slice.err != nil { |
|
|
|
if slice.err != nil { |
|
|
|
writer.CloseWithError(err) |
|
|
|
writer.CloseWithError(slice.err) |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
blocks = append(blocks, slice.data) |
|
|
|
blocks = append(blocks, slice.data) |
|
|
|