|
|
@ -2,30 +2,70 @@ package donut |
|
|
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
import ( |
|
|
|
"bytes" |
|
|
|
"bytes" |
|
|
|
|
|
|
|
"errors" |
|
|
|
"io" |
|
|
|
"io" |
|
|
|
"strconv" |
|
|
|
"strconv" |
|
|
|
|
|
|
|
"strings" |
|
|
|
"time" |
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
|
|
"crypto/md5" |
|
|
|
"crypto/md5" |
|
|
|
"encoding/hex" |
|
|
|
"encoding/hex" |
|
|
|
"errors" |
|
|
|
|
|
|
|
"github.com/minio-io/minio/pkg/encoding/erasure" |
|
|
|
"github.com/minio-io/minio/pkg/encoding/erasure" |
|
|
|
"github.com/minio-io/minio/pkg/utils/split" |
|
|
|
"github.com/minio-io/minio/pkg/utils/split" |
|
|
|
"strings" |
|
|
|
|
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// getErasureTechnique - convert technique string into Technique type
|
|
|
|
|
|
|
|
func getErasureTechnique(technique string) (erasure.Technique, error) { |
|
|
|
|
|
|
|
switch true { |
|
|
|
|
|
|
|
case technique == "Cauchy": |
|
|
|
|
|
|
|
return erasure.Cauchy, nil |
|
|
|
|
|
|
|
case technique == "Vandermonde": |
|
|
|
|
|
|
|
return erasure.Cauchy, nil |
|
|
|
|
|
|
|
default: |
|
|
|
|
|
|
|
return -1, errors.New("Invalid erasure technique") |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// erasureReader - returns aligned streaming reads over a PipeWriter
|
|
|
|
// erasureReader - returns aligned streaming reads over a PipeWriter
|
|
|
|
func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, writer *io.PipeWriter) { |
|
|
|
func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, writer *io.PipeWriter) { |
|
|
|
// TODO handle errors
|
|
|
|
totalChunks, err := strconv.Atoi(donutMetadata["chunkCount"]) |
|
|
|
totalChunks, _ := strconv.Atoi(donutMetadata["chunkCount"]) |
|
|
|
if err != nil { |
|
|
|
totalLeft, _ := strconv.Atoi(donutMetadata["size"]) |
|
|
|
writer.CloseWithError(err) |
|
|
|
blockSize, _ := strconv.Atoi(donutMetadata["blockSize"]) |
|
|
|
return |
|
|
|
k, _ := strconv.Atoi(donutMetadata["erasureK"]) |
|
|
|
} |
|
|
|
m, _ := strconv.Atoi(donutMetadata["erasureM"]) |
|
|
|
totalLeft, err := strconv.Atoi(donutMetadata["size"]) |
|
|
|
expectedMd5sum, _ := hex.DecodeString(donutMetadata["md5"]) |
|
|
|
if err != nil { |
|
|
|
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
blockSize, err := strconv.Atoi(donutMetadata["blockSize"]) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
k, err := strconv.Atoi(donutMetadata["erasureK"]) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
m, err := strconv.Atoi(donutMetadata["erasureM"]) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
expectedMd5sum, err := hex.DecodeString(donutMetadata["md5"]) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
technique, err := getErasureTechnique(donutMetadata["erasureTechnique"]) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
} |
|
|
|
summer := md5.New() |
|
|
|
summer := md5.New() |
|
|
|
// TODO select technique properly
|
|
|
|
params, _ := erasure.ParseEncoderParams(uint8(k), uint8(m), technique) |
|
|
|
params, _ := erasure.ParseEncoderParams(uint8(k), uint8(m), erasure.Cauchy) |
|
|
|
|
|
|
|
encoder := erasure.NewEncoder(params) |
|
|
|
encoder := erasure.NewEncoder(params) |
|
|
|
for _, reader := range readers { |
|
|
|
for _, reader := range readers { |
|
|
|
defer reader.Close() |
|
|
|
defer reader.Close() |
|
|
@ -36,12 +76,14 @@ func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, wri |
|
|
|
curBlockSize = blockSize |
|
|
|
curBlockSize = blockSize |
|
|
|
} |
|
|
|
} |
|
|
|
curChunkSize := erasure.GetEncodedChunkLen(curBlockSize, uint8(k)) |
|
|
|
curChunkSize := erasure.GetEncodedChunkLen(curBlockSize, uint8(k)) |
|
|
|
|
|
|
|
|
|
|
|
encodedBytes := make([][]byte, 16) |
|
|
|
encodedBytes := make([][]byte, 16) |
|
|
|
for i, reader := range readers { |
|
|
|
for i, reader := range readers { |
|
|
|
var bytesBuffer bytes.Buffer |
|
|
|
var bytesBuffer bytes.Buffer |
|
|
|
// TODO watch for errors
|
|
|
|
_, err := io.CopyN(&bytesBuffer, reader, int64(curChunkSize)) |
|
|
|
io.CopyN(&bytesBuffer, reader, int64(curChunkSize)) |
|
|
|
if err != nil { |
|
|
|
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
} |
|
|
|
encodedBytes[i] = bytesBuffer.Bytes() |
|
|
|
encodedBytes[i] = bytesBuffer.Bytes() |
|
|
|
} |
|
|
|
} |
|
|
|
decodedData, err := encoder.Decode(encodedBytes, curBlockSize) |
|
|
|
decodedData, err := encoder.Decode(encodedBytes, curBlockSize) |
|
|
@ -50,7 +92,11 @@ func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, wri |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
summer.Write(decodedData) |
|
|
|
summer.Write(decodedData) |
|
|
|
io.Copy(writer, bytes.NewBuffer(decodedData)) |
|
|
|
_, err = io.Copy(writer, bytes.NewBuffer(decodedData)) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
} |
|
|
|
totalLeft = totalLeft - blockSize |
|
|
|
totalLeft = totalLeft - blockSize |
|
|
|
} |
|
|
|
} |
|
|
|
actualMd5sum := summer.Sum(nil) |
|
|
|
actualMd5sum := summer.Sum(nil) |
|
|
@ -59,6 +105,7 @@ func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, wri |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
writer.Close() |
|
|
|
writer.Close() |
|
|
|
|
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// erasure writer
|
|
|
|
// erasure writer
|
|
|
|