|
|
@ -553,13 +553,31 @@ func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers map[int]io |
|
|
|
return nil, err.Trace() |
|
|
|
return nil, err.Trace() |
|
|
|
} |
|
|
|
} |
|
|
|
encodedBytes := make([][]byte, encoder.k+encoder.m) |
|
|
|
encodedBytes := make([][]byte, encoder.k+encoder.m) |
|
|
|
|
|
|
|
errCh := make(chan error, len(readers)) |
|
|
|
|
|
|
|
var errRet error |
|
|
|
|
|
|
|
var readCnt int |
|
|
|
|
|
|
|
|
|
|
|
for i, reader := range readers { |
|
|
|
for i, reader := range readers { |
|
|
|
var bytesBuffer bytes.Buffer |
|
|
|
go func(reader io.Reader, i int) { |
|
|
|
_, err := io.CopyN(&bytesBuffer, reader, int64(curChunkSize)) |
|
|
|
encodedBytes[i] = make([]byte, curChunkSize) |
|
|
|
|
|
|
|
_, err := io.ReadFull(reader, encodedBytes[i]) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return nil, probe.NewError(err) |
|
|
|
encodedBytes[i] = nil |
|
|
|
|
|
|
|
errCh <- err |
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
errCh <- nil |
|
|
|
|
|
|
|
}(reader, i) |
|
|
|
|
|
|
|
// read through errCh for any errors
|
|
|
|
|
|
|
|
err := <-errCh |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
errRet = err |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
readCnt++ |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
encodedBytes[i] = bytesBuffer.Bytes() |
|
|
|
if readCnt < int(encoder.k) { |
|
|
|
|
|
|
|
return nil, probe.NewError(errRet) |
|
|
|
} |
|
|
|
} |
|
|
|
decodedData, err := encoder.Decode(encodedBytes, int(curBlockSize)) |
|
|
|
decodedData, err := encoder.Decode(encodedBytes, int(curBlockSize)) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|