@@ -31,22 +31,59 @@ var errHealRequired = errors.New("heal required")

 // Reads in parallel from readers.
 type parallelReader struct {
 	readers       []io.ReaderAt
+	orgReaders    []io.ReaderAt
 	dataBlocks    int
 	offset        int64
 	shardSize     int64
 	shardFileSize int64
 	buf           [][]byte
+	readerToBuf   []int
 }

 // newParallelReader returns parallelReader.
 func newParallelReader(readers []io.ReaderAt, e Erasure, offset, totalLength int64) *parallelReader {
+	r2b := make([]int, len(readers))
+	for i := range r2b {
+		r2b[i] = i
+	}
 	return &parallelReader{
-		readers,
-		e.dataBlocks,
-		(offset / e.blockSize) * e.ShardSize(),
-		e.ShardSize(),
-		e.ShardFileSize(totalLength),
-		make([][]byte, len(readers)),
+		readers:       readers,
+		orgReaders:    readers,
+		dataBlocks:    e.dataBlocks,
+		offset:        (offset / e.blockSize) * e.ShardSize(),
+		shardSize:     e.ShardSize(),
+		shardFileSize: e.ShardFileSize(totalLength),
+		buf:           make([][]byte, len(readers)),
+		readerToBuf:   r2b,
 	}
 }
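
For orientation (an aside, not part of the patch): the constructor above maps a byte offset into the object to a per-drive shard offset. A small worked example, assuming, as the arithmetic suggests, that ShardSize() is the erasure block size split across the data drives and rounded up; all numbers below are invented for illustration.

package main

import "fmt"

func main() {
	// Hypothetical geometry: 10 MiB erasure blocks striped over 4 data drives.
	const blockSize = int64(10 << 20)
	const dataBlocks = int64(4)
	shardSize := (blockSize + dataBlocks - 1) / dataBlocks // ceil(blockSize/dataBlocks) = 2.5 MiB

	// A request starting at byte 25 MiB falls in block 2 (25/10 rounded down),
	// so every drive is read starting at shard offset 2 * 2.5 MiB = 5 MiB.
	offset := int64(25 << 20)
	shardOffset := (offset / blockSize) * shardSize
	fmt.Println(shardOffset >> 20) // 5
}
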
+
+// preferReaders can mark readers as preferred.
+// These will be chosen before others.
+func (p *parallelReader) preferReaders(prefer []bool) {
+	if len(prefer) != len(p.orgReaders) {
+		return
+	}
+	// Copy so we don't change our input.
+	tmp := make([]io.ReaderAt, len(p.orgReaders))
+	copy(tmp, p.orgReaders)
+	p.readers = tmp
+
+	// next is the next non-preferred index.
+	next := 0
+	for i, ok := range prefer {
+		if !ok || p.readers[i] == nil {
+			continue
+		}
+		if i == next {
+			next++
+			continue
+		}
+		// Move reader with index i to index next.
+		// Do this by swapping next and i.
+		p.readers[next], p.readers[i] = p.readers[i], p.readers[next]
+		p.readerToBuf[next] = i
+		p.readerToBuf[i] = next
+		next++
+	}
+}
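
An aside, not part of the patch: a minimal, self-contained sketch of what the swap loop in preferReaders does. Preferred readers are moved to the front so they are tried first, while readerToBuf keeps each slot pointing at the buffer position of the reader's original index. The names and values are invented, and the nil-reader check is omitted for brevity.

package main

import "fmt"

func main() {
	readers := []string{"remote0", "remote1", "local2", "local3"} // stand-ins for io.ReaderAt
	readerToBuf := []int{0, 1, 2, 3}
	prefer := []bool{false, false, true, true} // mark the two local drives as preferred

	next := 0 // next is the next non-preferred index.
	for i, ok := range prefer {
		if !ok {
			continue
		}
		if i == next {
			next++
			continue
		}
		// Swap the preferred reader to the front and record where its data belongs.
		readers[next], readers[i] = readers[i], readers[next]
		readerToBuf[next] = i
		readerToBuf[i] = next
		next++
	}

	fmt.Println(readers)     // [local2 local3 remote0 remote1]
	fmt.Println(readerToBuf) // [2 3 0 1]: slot 0 now reads local2 but still fills buf[2]
}
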
@@ -54,7 +91,7 @@ func newParallelReader(readers []io.ReaderAt, e Erasure, offset, totalLength int
 func (p *parallelReader) canDecode(buf [][]byte) bool {
 	bufCount := 0
 	for _, b := range buf {
-		if b != nil {
+		if len(b) > 0 {
 			bufCount++
 		}
 	}
@@ -62,13 +99,23 @@ func (p *parallelReader) canDecode(buf [][]byte) bool {
 }

 // Read reads from readers in parallel. Returns p.dataBlocks number of bufs.
-func (p *parallelReader) Read() ([][]byte, error) {
-	newBuf := make([][]byte, len(p.readers))
+func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
+	newBuf := dst
+	if len(dst) != len(p.readers) {
+		newBuf = make([][]byte, len(p.readers))
+	} else {
+		for i := range newBuf {
+			newBuf[i] = newBuf[i][:0]
+		}
+	}
 	var newBufLK sync.RWMutex

 	if p.offset+p.shardSize > p.shardFileSize {
 		p.shardSize = p.shardFileSize - p.offset
 	}
+	if p.shardSize == 0 {
+		return newBuf, nil
+	}

 	readTriggerCh := make(chan bool, len(p.readers))
 	for i := 0; i < p.dataBlocks; i++ {
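
An aside, not part of the patch: the reason canDecode now tests len(b) > 0 instead of b != nil. When the caller passes the previous iteration's buffers back in as dst, Read truncates them to length zero instead of discarding them, so a non-nil but empty slice only means reusable capacity, not data read this round, and only filled buffers should count toward the decode threshold. A tiny sketch of the distinction:

package main

import "fmt"

func main() {
	var never []byte                // never read into: nil, length 0
	reused := make([]byte, 128)[:0] // kept from a previous Read call, truncated to length 0
	filled := make([]byte, 128)     // successfully read this round

	for _, b := range [][]byte{never, reused, filled} {
		fmt.Printf("nil=%-5v len=%d counts=%v\n", b == nil, len(b), len(b) > 0)
	}
	// Only the last buffer counts; with the old b != nil test the
	// reused-but-empty buffer would have been counted as well.
}
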
@@ -104,26 +151,30 @@ func (p *parallelReader) Read() ([][]byte, error) {
 				readTriggerCh <- true
 				return
 			}
-			if p.buf[i] == nil {
+			bufIdx := p.readerToBuf[i]
+			if p.buf[bufIdx] == nil {
 				// Reading first time on this disk, hence the buffer needs to be allocated.
 				// Subsequent reads will re-use this buffer.
-				p.buf[i] = make([]byte, p.shardSize)
+				p.buf[bufIdx] = make([]byte, p.shardSize)
 			}
 			// For the last shard, the shardsize might be less than previous shard sizes.
 			// Hence the following statement ensures that the buffer size is reset to the right size.
-			p.buf[i] = p.buf[i][:p.shardSize]
-			_, err := disk.ReadAt(p.buf[i], p.offset)
+			p.buf[bufIdx] = p.buf[bufIdx][:p.shardSize]
+			_, err := disk.ReadAt(p.buf[bufIdx], p.offset)
 			if err != nil {
 				if _, ok := err.(*errHashMismatch); ok {
 					atomic.StoreInt32(&healRequired, 1)
 				}
+				// This will be communicated upstream.
+				p.orgReaders[bufIdx] = nil
 				p.readers[i] = nil
+
 				// Since ReadAt returned error, trigger another read.
 				readTriggerCh <- true
 				return
 			}
 			newBufLK.Lock()
-			newBuf[i] = p.buf[i]
+			newBuf[bufIdx] = p.buf[bufIdx]
 			newBufLK.Unlock()
 			// Since ReadAt returned success, there is no need to trigger another read.
 			readTriggerCh <- false
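
An aside, not part of the patch: as the comments in the hunk above note, each reader goroutine sends true on readTriggerCh when a read fails (so one more drive is tried) and false when it succeeds, and the channel is seeded with dataBlocks initial triggers. A simplified, hypothetical sketch of that trigger-channel pattern, independent of the MinIO types:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// tryRead stands in for disk.ReadAt on drive i: the first two drives fail.
func tryRead(i int) (string, error) {
	if i < 2 {
		return "", errors.New("drive offline")
	}
	return fmt.Sprintf("shard-%d", i), nil
}

func main() {
	const drives = 5
	const need = 3 // stand-in for dataBlocks: successes required to decode

	var (
		mu        sync.Mutex
		wg        sync.WaitGroup
		succeeded int
		shards    = make([]string, drives)
	)

	trigger := make(chan bool, drives)
	for i := 0; i < need; i++ {
		trigger <- true // seed `need` parallel reads
	}

	next := 0
	for t := range trigger {
		mu.Lock()
		enough := succeeded >= need
		mu.Unlock()
		if enough || next == drives {
			break
		}
		if !t {
			continue // previous read succeeded; no replacement read needed
		}
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			data, err := tryRead(i)
			if err != nil {
				trigger <- true // failed: trigger a read from the next drive
				return
			}
			mu.Lock()
			shards[i] = data
			succeeded++
			mu.Unlock()
			trigger <- false // succeeded: no extra drive needed
		}(next)
		next++
	}
	wg.Wait()
	fmt.Println(shards) // [  shard-2 shard-3 shard-4]
}
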
@@ -156,8 +207,9 @@ func (err *errDecodeHealRequired) Unwrap() error {
 }

 // Decode reads from readers, reconstructs data if needed and writes the data to the writer.
-func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64) error {
-	healRequired, err := e.decode(ctx, writer, readers, offset, length, totalLength)
+// A set of preferred drives can be supplied. In that case they will be used and the data reconstructed.
+func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64, prefer []bool) error {
+	healRequired, err := e.decode(ctx, writer, readers, offset, length, totalLength, prefer)
 	if healRequired {
 		return &errDecodeHealRequired{err}
 	}
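
An aside, not part of the patch: a sketch of how a caller might build the new prefer argument so that local drives are tried first. Everything here (driveInfo, the endpoints, the commented-out call) is invented for illustration; only the alignment requirement, len(prefer) == len(readers), comes from the code below.

package main

import "fmt"

// driveInfo is a hypothetical stand-in for whatever the caller knows about each reader.
type driveInfo struct {
	endpoint string
	local    bool
}

func main() {
	// One entry per reader passed to Erasure.Decode, in the same order.
	drives := []driveInfo{
		{"http://node1/disk1", false},
		{"http://node2/disk1", false},
		{"local/disk1", true},
		{"local/disk2", true},
	}

	// prefer must be aligned index-for-index with the readers slice;
	// decode only applies it when len(prefer) == len(readers).
	prefer := make([]bool, len(drives))
	for i, d := range drives {
		prefer[i] = d.local
	}
	fmt.Println(prefer) // [false false true true]

	// err := erasure.Decode(ctx, writer, readers, offset, length, totalLength, prefer)
}
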
@@ -166,7 +218,7 @@ func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.Read
 }

 // Decode reads from readers, reconstructs data if needed and writes the data to the writer.
-func (e Erasure) decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64) (bool, error) {
+func (e Erasure) decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64, prefer []bool) (bool, error) {
 	if offset < 0 || length < 0 {
 		logger.LogIf(ctx, errInvalidArgument)
 		return false, errInvalidArgument
@@ -180,12 +232,16 @@ func (e Erasure) decode(ctx context.Context, writer io.Writer, readers []io.Read
 	}

 	reader := newParallelReader(readers, e, offset, totalLength)
+	if len(prefer) == len(readers) {
+		reader.preferReaders(prefer)
+	}

 	startBlock := offset / e.blockSize
 	endBlock := (offset + length) / e.blockSize

 	var healRequired bool
 	var bytesWritten int64
+	var bufs [][]byte
 	for block := startBlock; block <= endBlock; block++ {
 		var blockOffset, blockLength int64
 		switch {
@@ -205,9 +261,11 @@ func (e Erasure) decode(ctx context.Context, writer io.Writer, readers []io.Read
 		if blockLength == 0 {
 			break
 		}
-		bufs, err := reader.Read()
+		var err error
+		bufs, err = reader.Read(bufs)
 		if err != nil {
 			if errors.Is(err, errHealRequired) {
+				// errHealRequired is only returned if there is enough data for reconstruction.
 				healRequired = true
 			} else {
 				return healRequired, err