|
|
|
@ -6,16 +6,22 @@ import ( |
|
|
|
|
"strconv" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"encoding/hex" |
|
|
|
|
"errors" |
|
|
|
|
"github.com/minio-io/minio/pkg/encoding/erasure" |
|
|
|
|
"github.com/minio-io/minio/pkg/utils/crypto/sha512" |
|
|
|
|
"github.com/minio-io/minio/pkg/utils/split" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, writer *io.PipeWriter) { |
|
|
|
|
// TODO handle errors
|
|
|
|
|
totalChunks, _ := strconv.Atoi(donutMetadata["chunkCount"]) |
|
|
|
|
totalLeft, _ := strconv.Atoi(donutMetadata["totalLength"]) |
|
|
|
|
blockSize, _ := strconv.Atoi(donutMetadata["blockSize"]) |
|
|
|
|
k, _ := strconv.Atoi(donutMetadata["erasureK"]) |
|
|
|
|
m, _ := strconv.Atoi(donutMetadata["erasureM"]) |
|
|
|
|
expectedSha512, _ := hex.DecodeString(donutMetadata["sha512"]) |
|
|
|
|
summer := sha512.New() |
|
|
|
|
// TODO select technique properly
|
|
|
|
|
params, _ := erasure.ParseEncoderParams(uint8(k), uint8(m), erasure.Cauchy) |
|
|
|
|
encoder := erasure.NewEncoder(params) |
|
|
|
@ -32,6 +38,7 @@ func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, wri |
|
|
|
|
encodedBytes := make([][]byte, 16) |
|
|
|
|
for i, reader := range readers { |
|
|
|
|
var bytesBuffer bytes.Buffer |
|
|
|
|
// TODO watch for errors
|
|
|
|
|
io.CopyN(&bytesBuffer, reader, int64(curChunkSize)) |
|
|
|
|
encodedBytes[i] = bytesBuffer.Bytes() |
|
|
|
|
} |
|
|
|
@ -40,9 +47,14 @@ func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, wri |
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
summer.Write(decodedData) |
|
|
|
|
io.Copy(writer, bytes.NewBuffer(decodedData)) |
|
|
|
|
totalLeft = totalLeft - blockSize |
|
|
|
|
} |
|
|
|
|
actualSha512 := summer.Sum(nil) |
|
|
|
|
if bytes.Compare(expectedSha512, actualSha512) != 0 { |
|
|
|
|
writer.CloseWithError(errors.New("decoded sha512 did not match")) |
|
|
|
|
} |
|
|
|
|
writer.Close() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -75,16 +87,19 @@ func erasureGoroutine(r *io.PipeReader, eWriter erasureWriter, isClosed chan<- b |
|
|
|
|
encoder := erasure.NewEncoder(params) |
|
|
|
|
chunkCount := 0 |
|
|
|
|
totalLength := 0 |
|
|
|
|
summer := sha512.New() |
|
|
|
|
for chunk := range chunks { |
|
|
|
|
if chunk.Err == nil { |
|
|
|
|
totalLength = totalLength + len(chunk.Data) |
|
|
|
|
encodedBlocks, _ := encoder.Encode(chunk.Data) |
|
|
|
|
summer.Write(chunk.Data) |
|
|
|
|
for blockIndex, block := range encodedBlocks { |
|
|
|
|
io.Copy(eWriter.writers[blockIndex], bytes.NewBuffer(block)) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
chunkCount = chunkCount + 1 |
|
|
|
|
} |
|
|
|
|
dataSha512 := summer.Sum(nil) |
|
|
|
|
metadata := make(map[string]string) |
|
|
|
|
metadata["blockSize"] = strconv.Itoa(10 * 1024 * 1024) |
|
|
|
|
metadata["chunkCount"] = strconv.Itoa(chunkCount) |
|
|
|
@ -93,6 +108,7 @@ func erasureGoroutine(r *io.PipeReader, eWriter erasureWriter, isClosed chan<- b |
|
|
|
|
metadata["erasureM"] = "8" |
|
|
|
|
metadata["erasureTechnique"] = "Cauchy" |
|
|
|
|
metadata["totalLength"] = strconv.Itoa(totalLength) |
|
|
|
|
metadata["sha512"] = hex.EncodeToString(dataSha512) |
|
|
|
|
for _, nodeWriter := range eWriter.writers { |
|
|
|
|
if nodeWriter != nil { |
|
|
|
|
nodeWriter.SetMetadata(eWriter.metadata) |
|
|
|
|