|
|
|
@ -19,7 +19,6 @@ package donut |
|
|
|
|
import ( |
|
|
|
|
"bytes" |
|
|
|
|
"crypto/md5" |
|
|
|
|
"encoding/base64" |
|
|
|
|
"encoding/hex" |
|
|
|
|
"encoding/json" |
|
|
|
|
"errors" |
|
|
|
@ -36,7 +35,7 @@ import ( |
|
|
|
|
|
|
|
|
|
func (b bucket) isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error { |
|
|
|
|
if strings.TrimSpace(expectedMD5Sum) != "" && strings.TrimSpace(actualMD5Sum) != "" { |
|
|
|
|
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(expectedMD5Sum) |
|
|
|
|
expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum) |
|
|
|
|
if err != nil { |
|
|
|
|
return iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
@ -52,13 +51,33 @@ func (b bucket) isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error { |
|
|
|
|
return iodine.New(errors.New("invalid argument"), nil) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b bucket) writeDonutObjectMetadata(objectName string, objectMetadata map[string]string) error { |
|
|
|
|
func (b bucket) writeObjectMetadata(objectName string, objectMetadata map[string]string) error { |
|
|
|
|
if len(objectMetadata) == 0 { |
|
|
|
|
return errors.New("invalid argument") |
|
|
|
|
return iodine.New(errors.New("invalid argument"), nil) |
|
|
|
|
} |
|
|
|
|
objectMetadataWriters, err := b.getDiskWriters(objectName, objectMetadataConfig) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
return iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
for _, objectMetadataWriter := range objectMetadataWriters { |
|
|
|
|
defer objectMetadataWriter.Close() |
|
|
|
|
} |
|
|
|
|
for _, objectMetadataWriter := range objectMetadataWriters { |
|
|
|
|
jenc := json.NewEncoder(objectMetadataWriter) |
|
|
|
|
if err := jenc.Encode(objectMetadata); err != nil { |
|
|
|
|
return iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b bucket) writeDonutObjectMetadata(objectName string, objectMetadata map[string]string) error { |
|
|
|
|
if len(objectMetadata) == 0 { |
|
|
|
|
return iodine.New(errors.New("invalid argument"), nil) |
|
|
|
|
} |
|
|
|
|
objectMetadataWriters, err := b.getDiskWriters(objectName, donutObjectMetadataConfig) |
|
|
|
|
if err != nil { |
|
|
|
|
return iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
for _, objectMetadataWriter := range objectMetadataWriters { |
|
|
|
|
defer objectMetadataWriter.Close() |
|
|
|
@ -66,7 +85,7 @@ func (b bucket) writeDonutObjectMetadata(objectName string, objectMetadata map[s |
|
|
|
|
for _, objectMetadataWriter := range objectMetadataWriters { |
|
|
|
|
jenc := json.NewEncoder(objectMetadataWriter) |
|
|
|
|
if err := jenc.Encode(objectMetadata); err != nil { |
|
|
|
|
return err |
|
|
|
|
return iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
@ -80,12 +99,12 @@ func (b bucket) normalizeObjectName(objectName string) string { |
|
|
|
|
|
|
|
|
|
func (b bucket) getDataAndParity(totalWriters int) (k uint8, m uint8, err error) { |
|
|
|
|
if totalWriters <= 1 { |
|
|
|
|
return 0, 0, errors.New("invalid argument") |
|
|
|
|
return 0, 0, iodine.New(errors.New("invalid argument"), nil) |
|
|
|
|
} |
|
|
|
|
quotient := totalWriters / 2 // not using float or abs to let integer round off to lower value
|
|
|
|
|
// quotient cannot be bigger than (255 / 2) = 127
|
|
|
|
|
if quotient > 127 { |
|
|
|
|
return 0, 0, errors.New("parity over flow") |
|
|
|
|
return 0, 0, iodine.New(errors.New("parity over flow"), nil) |
|
|
|
|
} |
|
|
|
|
remainder := totalWriters % 2 // will be 1 for odd and 0 for even numbers
|
|
|
|
|
k = uint8(quotient + remainder) |
|
|
|
@ -97,7 +116,7 @@ func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectDat |
|
|
|
|
chunks := split.Stream(objectData, 10*1024*1024) |
|
|
|
|
encoder, err := NewEncoder(k, m, "Cauchy") |
|
|
|
|
if err != nil { |
|
|
|
|
return 0, 0, err |
|
|
|
|
return 0, 0, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
chunkCount := 0 |
|
|
|
|
totalLength := 0 |
|
|
|
@ -109,7 +128,7 @@ func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectDat |
|
|
|
|
for blockIndex, block := range encodedBlocks { |
|
|
|
|
_, err := io.Copy(writers[blockIndex], bytes.NewBuffer(block)) |
|
|
|
|
if err != nil { |
|
|
|
|
return 0, 0, err |
|
|
|
|
return 0, 0, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -118,45 +137,46 @@ func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectDat |
|
|
|
|
return chunkCount, totalLength, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, objectMetadata map[string]string) { |
|
|
|
|
expectedMd5sum, err := hex.DecodeString(objectMetadata["md5"]) |
|
|
|
|
func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, donutObjectMetadata map[string]string) { |
|
|
|
|
expectedMd5sum, err := hex.DecodeString(donutObjectMetadata["sys.md5"]) |
|
|
|
|
if err != nil { |
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
writer.CloseWithError(iodine.New(err, nil)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
readers, err := b.getDiskReaders(objectName, "data") |
|
|
|
|
if err != nil { |
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
writer.CloseWithError(iodine.New(err, nil)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
hasher := md5.New() |
|
|
|
|
mwriter := io.MultiWriter(writer, hasher) |
|
|
|
|
switch len(readers) == 1 { |
|
|
|
|
case false: |
|
|
|
|
totalChunks, totalLeft, blockSize, k, m, err := b.metadata2Values(objectMetadata) |
|
|
|
|
totalChunks, totalLeft, blockSize, k, m, err := b.metadata2Values(donutObjectMetadata) |
|
|
|
|
if err != nil { |
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
writer.CloseWithError(iodine.New(err, nil)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
technique, ok := objectMetadata["erasureTechnique"] |
|
|
|
|
technique, ok := donutObjectMetadata["sys.erasureTechnique"] |
|
|
|
|
if !ok { |
|
|
|
|
writer.CloseWithError(errors.New("missing erasure Technique")) |
|
|
|
|
err := errors.New("missing erasure Technique") |
|
|
|
|
writer.CloseWithError(iodine.New(err, nil)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
encoder, err := NewEncoder(uint8(k), uint8(m), technique) |
|
|
|
|
if err != nil { |
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
writer.CloseWithError(iodine.New(err, nil)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
for i := 0; i < totalChunks; i++ { |
|
|
|
|
decodedData, err := b.decodeData(totalLeft, blockSize, readers, encoder, writer) |
|
|
|
|
if err != nil { |
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
writer.CloseWithError(iodine.New(err, nil)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
_, err = io.Copy(mwriter, bytes.NewBuffer(decodedData)) |
|
|
|
|
if err != nil { |
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
writer.CloseWithError(iodine.New(err, nil)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
totalLeft = totalLeft - int64(blockSize) |
|
|
|
@ -164,13 +184,14 @@ func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, object |
|
|
|
|
case true: |
|
|
|
|
_, err := io.Copy(writer, readers[0]) |
|
|
|
|
if err != nil { |
|
|
|
|
writer.CloseWithError(err) |
|
|
|
|
writer.CloseWithError(iodine.New(err, nil)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// check if decodedData md5sum matches
|
|
|
|
|
if !bytes.Equal(expectedMd5sum, hasher.Sum(nil)) { |
|
|
|
|
writer.CloseWithError(errors.New("checksum mismatch")) |
|
|
|
|
err := errors.New("checksum mismatch") |
|
|
|
|
writer.CloseWithError(iodine.New(err, nil)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
writer.Close() |
|
|
|
@ -186,31 +207,46 @@ func (b bucket) decodeData(totalLeft, blockSize int64, readers []io.ReadCloser, |
|
|
|
|
} |
|
|
|
|
curChunkSize, err := encoder.GetEncodedBlockLen(int(curBlockSize)) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
return nil, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
encodedBytes := make([][]byte, len(readers)) |
|
|
|
|
for i, reader := range readers { |
|
|
|
|
var bytesBuffer bytes.Buffer |
|
|
|
|
_, err := io.CopyN(&bytesBuffer, reader, int64(curChunkSize)) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
return nil, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
encodedBytes[i] = bytesBuffer.Bytes() |
|
|
|
|
} |
|
|
|
|
decodedData, err := encoder.Decode(encodedBytes, int(curBlockSize)) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
return nil, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
return decodedData, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b bucket) metadata2Values(objectMetadata map[string]string) (totalChunks int, totalLeft, blockSize int64, k, m uint64, err error) { |
|
|
|
|
totalChunks, err = strconv.Atoi(objectMetadata["chunkCount"]) |
|
|
|
|
totalLeft, err = strconv.ParseInt(objectMetadata["size"], 10, 64) |
|
|
|
|
blockSize, err = strconv.ParseInt(objectMetadata["blockSize"], 10, 64) |
|
|
|
|
k, err = strconv.ParseUint(objectMetadata["erasureK"], 10, 8) |
|
|
|
|
m, err = strconv.ParseUint(objectMetadata["erasureM"], 10, 8) |
|
|
|
|
return |
|
|
|
|
func (b bucket) metadata2Values(donutObjectMetadata map[string]string) (totalChunks int, totalLeft, blockSize int64, k, m uint64, err error) { |
|
|
|
|
totalChunks, err = strconv.Atoi(donutObjectMetadata["sys.chunkCount"]) |
|
|
|
|
if err != nil { |
|
|
|
|
return 0, 0, 0, 0, 0, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
totalLeft, err = strconv.ParseInt(donutObjectMetadata["sys.size"], 10, 64) |
|
|
|
|
if err != nil { |
|
|
|
|
return 0, 0, 0, 0, 0, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
blockSize, err = strconv.ParseInt(donutObjectMetadata["sys.blockSize"], 10, 64) |
|
|
|
|
if err != nil { |
|
|
|
|
return 0, 0, 0, 0, 0, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
k, err = strconv.ParseUint(donutObjectMetadata["sys.erasureK"], 10, 8) |
|
|
|
|
if err != nil { |
|
|
|
|
return 0, 0, 0, 0, 0, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
m, err = strconv.ParseUint(donutObjectMetadata["sys.erasureM"], 10, 8) |
|
|
|
|
if err != nil { |
|
|
|
|
return 0, 0, 0, 0, 0, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
return totalChunks, totalLeft, blockSize, k, m, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b bucket) getDiskReaders(objectName, objectMeta string) ([]io.ReadCloser, error) { |
|
|
|
@ -219,7 +255,7 @@ func (b bucket) getDiskReaders(objectName, objectMeta string) ([]io.ReadCloser, |
|
|
|
|
for _, node := range b.nodes { |
|
|
|
|
disks, err := node.ListDisks() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
return nil, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
readers = make([]io.ReadCloser, len(disks)) |
|
|
|
|
for _, disk := range disks { |
|
|
|
@ -227,7 +263,7 @@ func (b bucket) getDiskReaders(objectName, objectMeta string) ([]io.ReadCloser, |
|
|
|
|
objectPath := path.Join(b.donutName, bucketSlice, objectName, objectMeta) |
|
|
|
|
objectSlice, err := disk.OpenFile(objectPath) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
return nil, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
readers[disk.GetOrder()] = objectSlice |
|
|
|
|
} |
|
|
|
@ -242,7 +278,7 @@ func (b bucket) getDiskWriters(objectName, objectMeta string) ([]io.WriteCloser, |
|
|
|
|
for _, node := range b.nodes { |
|
|
|
|
disks, err := node.ListDisks() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
return nil, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
writers = make([]io.WriteCloser, len(disks)) |
|
|
|
|
for _, disk := range disks { |
|
|
|
@ -250,7 +286,7 @@ func (b bucket) getDiskWriters(objectName, objectMeta string) ([]io.WriteCloser, |
|
|
|
|
objectPath := path.Join(b.donutName, bucketSlice, objectName, objectMeta) |
|
|
|
|
objectSlice, err := disk.MakeFile(objectPath) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
return nil, iodine.New(err, nil) |
|
|
|
|
} |
|
|
|
|
writers[disk.GetOrder()] = objectSlice |
|
|
|
|
} |
|
|
|
|