@ -17,7 +17,6 @@
package cmd
import (
"encoding/hex"
"errors"
"io"
"sync"
@ -111,7 +110,9 @@ func getReadDisks(orderedDisks []StorageAPI, index int, dataBlocks int) (readDis
}
// parallelRead - reads chunks in parallel from the disks specified in []readDisks.
func parallelRead ( volume , path string , readDisks [ ] StorageAPI , orderedDisks [ ] StorageAPI , enBlocks [ ] [ ] byte , blockOffset int64 , curChunkSize int64 , bitRotVerify func ( diskIndex int ) bool , pool * bpool . BytePool ) {
func parallelRead ( volume , path string , readDisks , orderedDisks [ ] StorageAPI , enBlocks [ ] [ ] byte ,
blockOffset , curChunkSize int64 , brVerifiers [ ] bitRotVerifier , pool * bpool . BytePool ) {
// WaitGroup to synchronise the read go-routines.
wg := & sync . WaitGroup { }
@ -125,11 +126,15 @@ func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []St
go func ( index int ) {
defer wg . Done ( )
// Verify bit rot for the file on this disk.
if ! bitRotVerify ( index ) {
// So that we don't read from this disk for the next block.
orderedDisks [ index ] = nil
return
// evaluate if we need to perform bit-rot checking
needBitRotVerification := true
if brVerifiers [ index ] . isVerified {
needBitRotVerification = false
// if file has bit-rot, do not reuse disk
if brVerifiers [ index ] . hasBitRot {
orderedDisks [ index ] = nil
return
}
}
buf , err := pool . Get ( )
@ -140,7 +145,25 @@ func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []St
}
buf = buf [ : curChunkSize ]
_ , err = readDisks [ index ] . ReadFile ( volume , path , blockOffset , buf )
if needBitRotVerification {
_ , err = readDisks [ index ] . ReadFileWithVerify (
volume , path , blockOffset , buf ,
brVerifiers [ index ] . algo ,
brVerifiers [ index ] . checkSum )
} else {
_ , err = readDisks [ index ] . ReadFile ( volume , path ,
blockOffset , buf )
}
// if bit-rot verification was done, store the
// result of verification so we can skip
// re-doing it next time
if needBitRotVerification {
brVerifiers [ index ] . isVerified = true
_ , ok := err . ( hashMismatchError )
brVerifiers [ index ] . hasBitRot = ok
}
if err != nil {
orderedDisks [ index ] = nil
return
@ -153,12 +176,16 @@ func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []St
wg . Wait ( )
}
// erasureReadFile - read bytes from erasure coded files and writes to given writer.
// Erasure coded files are read block by block as per given erasureInfo and data chunks
// are decoded into a data block. Data block is trimmed for given offset and length,
// then written to given writer. This function also supports bit-rot detection by
// verifying checksum of individual block's checksum.
func erasureReadFile ( writer io . Writer , disks [ ] StorageAPI , volume string , path string , offset int64 , length int64 , totalLength int64 , blockSize int64 , dataBlocks int , parityBlocks int , checkSums [ ] string , algo string , pool * bpool . BytePool ) ( int64 , error ) {
// erasureReadFile - read bytes from erasure coded files and writes to
// given writer. Erasure coded files are read block by block as per
// given erasureInfo and data chunks are decoded into a data
// block. Data block is trimmed for given offset and length, then
// written to given writer. This function also supports bit-rot
// detection by verifying checksum of individual block's checksum.
func erasureReadFile ( writer io . Writer , disks [ ] StorageAPI , volume , path string ,
offset , length , totalLength , blockSize int64 , dataBlocks , parityBlocks int ,
checkSums [ ] string , algo HashAlgo , pool * bpool . BytePool ) ( int64 , error ) {
// Offset and length cannot be negative.
if offset < 0 || length < 0 {
return 0 , traceError ( errUnexpected )
@ -169,27 +196,15 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
return 0 , traceError ( errUnexpected )
}
// chunkSize is the amount of data that needs to be read from each disk at a time.
// chunkSize is the amount of data that needs to be read from
// each disk at a time.
chunkSize := getChunkSize ( blockSize , dataBlocks )
// bitRotVerify verifies if the file on a particular disk doesn't have bitrot
// by verifying the hash of the contents of the file.
bitRotVerify := func ( ) func ( diskIndex int ) bool {
verified := make ( [ ] bool , len ( disks ) )
// Return closure so that we have reference to []verified and
// not recalculate the hash on it every time the function is
// called for the same disk.
return func ( diskIndex int ) bool {
if verified [ diskIndex ] {
// Already validated.
return true
}
// Is this a valid block?
isValid := isValidBlock ( disks [ diskIndex ] , volume , path , checkSums [ diskIndex ] , algo )
verified [ diskIndex ] = isValid
return isValid
}
} ( )
brVerifiers := make ( [ ] bitRotVerifier , len ( disks ) )
for i := range brVerifiers {
brVerifiers [ i ] . algo = algo
brVerifiers [ i ] . checkSum = checkSums [ i ]
}
// Total bytes written to writer
var bytesWritten int64
@ -241,7 +256,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
return bytesWritten , err
}
// Issue a parallel read across the disks specified in readDisks.
parallelRead ( volume , path , readDisks , disks , enBlocks , blockOffset , curChunkSize , bitRotVerify , pool )
parallelRead ( volume , path , readDisks , disks , enBlocks , blockOffset , curChunkSize , brVerifiers , pool )
if isSuccessDecodeBlocks ( enBlocks , dataBlocks ) {
// If enough blocks are available to do rs.Reconstruct()
break
@ -299,27 +314,6 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
return bytesWritten , nil
}
// isValidBlock - calculates the checksum hash for the block and
// validates if its correct returns true for valid cases, false otherwise.
func isValidBlock ( disk StorageAPI , volume , path , checkSum , checkSumAlgo string ) ( ok bool ) {
// Disk is not available, not a valid block.
if disk == nil {
return false
}
// Checksum not available, not a valid block.
if checkSum == "" {
return false
}
// Read everything for a given block and calculate hash.
hashWriter := newHash ( checkSumAlgo )
hashBytes , err := hashSum ( disk , volume , path , hashWriter )
if err != nil {
errorIf ( err , "Unable to calculate checksum %s/%s" , volume , path )
return false
}
return hex . EncodeToString ( hashBytes ) == checkSum
}
// decodeData - decode encoded blocks.
func decodeData ( enBlocks [ ] [ ] byte , dataBlocks , parityBlocks int ) error {
// Initialized reedsolomon.