|
|
@ -17,6 +17,7 @@ |
|
|
|
package encoded |
|
|
|
package encoded |
|
|
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
import ( |
|
|
|
|
|
|
|
"bytes" |
|
|
|
"errors" |
|
|
|
"errors" |
|
|
|
"github.com/minio-io/minio/pkg/donutbox" |
|
|
|
"github.com/minio-io/minio/pkg/donutbox" |
|
|
|
"github.com/minio-io/minio/pkg/encoding/erasure" |
|
|
|
"github.com/minio-io/minio/pkg/encoding/erasure" |
|
|
@ -67,7 +68,57 @@ func (diskStorage StorageDriver) GetBucketPolicy(bucket string) (storage.BucketP |
|
|
|
|
|
|
|
|
|
|
|
// GetObject retrieves an object and writes it to a writer
|
|
|
|
// GetObject retrieves an object and writes it to a writer
|
|
|
|
func (diskStorage StorageDriver) GetObject(target io.Writer, bucket, key string) (int64, error) { |
|
|
|
func (diskStorage StorageDriver) GetObject(target io.Writer, bucket, key string) (int64, error) { |
|
|
|
return 0, errors.New("Not Implemented") |
|
|
|
metadata, err := diskStorage.donutBox.GetObjectMetadata(bucket, key, 0) |
|
|
|
|
|
|
|
chunkCount, err := strconv.Atoi(metadata["chunkCount"]) |
|
|
|
|
|
|
|
columnCount, err := strconv.Atoi(metadata["columnCount"]) |
|
|
|
|
|
|
|
blockSize, err := strconv.Atoi(metadata["blockSize"]) |
|
|
|
|
|
|
|
length, err := strconv.Atoi(metadata["length"]) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
return 0, err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
var readers []io.Reader |
|
|
|
|
|
|
|
for column := 0; column < columnCount; column++ { |
|
|
|
|
|
|
|
reader, err := diskStorage.donutBox.GetObjectReader(bucket, key, uint(column)) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
return 0, err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
readers = append(readers, reader) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
totalWritten := int64(length) |
|
|
|
|
|
|
|
totalRemaining := int64(length) |
|
|
|
|
|
|
|
if err != err { |
|
|
|
|
|
|
|
return 0, err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
params, err := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) |
|
|
|
|
|
|
|
decoder := erasure.NewEncoder(params) |
|
|
|
|
|
|
|
for chunk := 0; chunk < chunkCount; chunk++ { |
|
|
|
|
|
|
|
blocks := make([][]byte, columnCount) |
|
|
|
|
|
|
|
for column := 0; column < columnCount; column++ { |
|
|
|
|
|
|
|
var block bytes.Buffer |
|
|
|
|
|
|
|
limitReader := io.LimitReader(readers[column], int64(blockSize)) |
|
|
|
|
|
|
|
_, err := io.Copy(&block, limitReader) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
return totalWritten, err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
blocks[column] = block.Bytes() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
curBlockSize := blockSize |
|
|
|
|
|
|
|
if totalRemaining < int64(blockSize) { |
|
|
|
|
|
|
|
curBlockSize = int(totalRemaining) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
original, err := decoder.Decode(blocks, curBlockSize) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
return totalWritten, err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
curWritten, err := io.Copy(target, bytes.NewBuffer(original)) |
|
|
|
|
|
|
|
totalRemaining = totalRemaining - curWritten |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
return totalWritten, err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return totalWritten, nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// GetPartialObject retrieves an object and writes it to a writer
|
|
|
|
// GetPartialObject retrieves an object and writes it to a writer
|
|
|
@ -100,12 +151,14 @@ func (diskStorage StorageDriver) CreateObject(bucketKey string, objectKey string |
|
|
|
writers[i] = newWriter |
|
|
|
writers[i] = newWriter |
|
|
|
} |
|
|
|
} |
|
|
|
totalLength := uint64(0) |
|
|
|
totalLength := uint64(0) |
|
|
|
|
|
|
|
chunkCount := 0 |
|
|
|
for chunk := range splitStream { |
|
|
|
for chunk := range splitStream { |
|
|
|
params, err := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) |
|
|
|
params, err := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
totalLength = totalLength + uint64(len(chunk.Data)) |
|
|
|
totalLength = totalLength + uint64(len(chunk.Data)) |
|
|
|
|
|
|
|
chunkCount = chunkCount + 1 |
|
|
|
encoder := erasure.NewEncoder(params) |
|
|
|
encoder := erasure.NewEncoder(params) |
|
|
|
if chunk.Err == nil { |
|
|
|
if chunk.Err == nil { |
|
|
|
parts, _ := encoder.Encode(chunk.Data) |
|
|
|
parts, _ := encoder.Encode(chunk.Data) |
|
|
@ -123,16 +176,20 @@ func (diskStorage StorageDriver) CreateObject(bucketKey string, objectKey string |
|
|
|
// write
|
|
|
|
// write
|
|
|
|
} |
|
|
|
} |
|
|
|
// close connections
|
|
|
|
// close connections
|
|
|
|
closeAllWriters(writers) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
metadata := make(map[string]string) |
|
|
|
metadata := make(map[string]string) |
|
|
|
metadata["length"] = strconv.FormatUint(totalLength, 10) |
|
|
|
metadata["length"] = strconv.FormatUint(totalLength, 10) |
|
|
|
metadata["blockSize"] = strconv.FormatUint(uint64(blockSize), 10) |
|
|
|
metadata["blockSize"] = strconv.FormatUint(uint64(blockSize), 10) |
|
|
|
|
|
|
|
metadata["chunkCount"] = strconv.FormatUint(uint64(chunkCount), 10) |
|
|
|
|
|
|
|
metadata["columnCount"] = strconv.FormatUint(uint64(16), 10) |
|
|
|
// metadata["md5"] := md5sum
|
|
|
|
// metadata["md5"] := md5sum
|
|
|
|
for column := uint(0); column < 16; column++ { |
|
|
|
for column := uint(0); column < 16; column++ { |
|
|
|
writers[column].SetMetadata(metadata) |
|
|
|
writers[column].SetMetadata(metadata) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// TODO capture errors in writers, enough should pass before returning
|
|
|
|
|
|
|
|
closeAllWriters(writers) |
|
|
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|