@@ -22,10 +22,74 @@ import (
"github.com/minio/minio/pkg/errors"
)
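
// errIdx pairs a disk index with the error (if any) returned by the read
// issued on that disk, so results from concurrently running reads can be
// collected over a single channel.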
type errIdx struct {
	idx int
	err error
}
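
// readConcurrent reads the requested byte range from the data disks in
// parallel. Whenever a read fails, a read from one of the remaining parity
// disks is started in its place so the data can be reconstructed later. It
// returns the buffers that were read, a flag indicating whether
// reconstruction is needed, and an error if fewer than dataBlocks reads
// succeeded.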
func (s ErasureStorage) readConcurrent(volume, path string, offset, length int64,
	verifiers []*BitrotVerifier) (buffers [][]byte, needsReconstruction bool,
	err error) {

	errChan := make(chan errIdx)
	stageBuffers := make([][]byte, len(s.disks))
	buffers = make([][]byte, len(s.disks))

	readDisk := func(i int) {
		stageBuffers[i] = make([]byte, length)
		disk := s.disks[i]
		if disk == OfflineDisk {
			errChan <- errIdx{i, errors.Trace(errDiskNotFound)}
			return
		}
		_, rerr := disk.ReadFile(volume, path, offset, stageBuffers[i], verifiers[i])
		errChan <- errIdx{i, rerr}
	}
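
	// Read from the data disks first; parity disks are read below only as
	// replacements for failed data disk reads.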
	var finishedCount, successCount, launchIndex int

	for ; launchIndex < s.dataBlocks; launchIndex++ {
		go readDisk(launchIndex)
	}
	for finishedCount < launchIndex {
		select {
		case errVal := <-errChan:
			finishedCount++
			if errVal.err != nil {
				// TODO: meaningfully log the disk read error

				// A disk failed to return data, so we
				// request an additional disk if possible
				if launchIndex < s.dataBlocks+s.parityBlocks {
					needsReconstruction = true
					// requiredBlocks++
					go readDisk(launchIndex)
					launchIndex++
				}
			} else {
				successCount++
				buffers[errVal.idx] = stageBuffers[errVal.idx]
				stageBuffers[errVal.idx] = nil
			}
		}
	}
	if successCount != s.dataBlocks {
		// Not enough disks returned data.
		err = errors.Trace(errXLReadQuorum)
	}
	return
}

// ReadFile reads as much data as requested from the file under the
// given volume and path and writes the data to the provided writer.
// The algorithm and the keys/checksums are used to verify the
// integrity of the given file. ReadFile will read data from the given
// offset up to the given length. If parts of the file are corrupted
// ReadFile tries to reconstruct the data.
func (s ErasureStorage) ReadFile(writer io.Writer, volume, path string, offset,
	length, totalLength int64, checksums [][]byte, algorithm BitrotAlgorithm,
	blocksize int64) (f ErasureFileInfo, err error) {
	if offset < 0 || length < 0 {
		return f, errors.Trace(errUnexpected)
	}
@@ -44,117 +108,122 @@ func (s ErasureStorage) ReadFile(writer io.Writer, volume, path string, offset,
		}
		verifiers[i] = NewBitrotVerifier(algorithm, checksums[i])
	}

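	// chunksize is the number of bytes each disk stores for every
	// blocksize bytes of erasure-coded data.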
	chunksize := ceilFrac(blocksize, int64(s.dataBlocks))

	// We read all whole-blocks of erasure coded data containing
	// the requested data range.
	//
	// The start index of the erasure coded block containing the
	// `offset` byte of data is:
	partDataStartIndex := (offset / blocksize) * chunksize
	// The start index of the erasure coded block containing the
	// (last) byte of data at the index `offset + length - 1` is:
	blockStartIndex := ((offset + length - 1) / blocksize) * chunksize
	// However, we need the end index of the e.c. block containing the
	// last byte - we need to check if that block is the last block in
	// the part (in that case, it may have a different chunk size).
	isLastBlock := (totalLength-1)/blocksize == (offset+length-1)/blocksize
	var partDataEndIndex int64
	if isLastBlock {
		lastBlockChunkSize := chunksize
		if totalLength%blocksize != 0 {
			lastBlockChunkSize = ceilFrac(totalLength%blocksize, int64(s.dataBlocks))
		}
		partDataEndIndex = blockStartIndex + lastBlockChunkSize - 1
	} else {
		partDataEndIndex = blockStartIndex + chunksize - 1
	}

	// Thus, the length of data to be read from the part file(s) is:
	partDataLength := partDataEndIndex - partDataStartIndex + 1
	// The calculation above does not apply when length == 0:
	if length == 0 {
		partDataLength = 0
	}
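
	// For instance (illustrative numbers only): with blocksize = 1 MiB,
	// dataBlocks = 4 and totalLength = 5 MiB, chunksize is 256 KiB. A read
	// of length = 1 MiB at offset = 1.5 MiB touches erasure-coded blocks 1
	// and 2, so partDataStartIndex = 256 KiB, partDataEndIndex = 768 KiB - 1
	// and partDataLength = 512 KiB per disk.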

	var buffers [][]byte
	var needsReconstruction bool
	buffers, needsReconstruction, err = s.readConcurrent(volume, path,
		partDataStartIndex, partDataLength, verifiers)
	if err != nil {
		// Could not read enough disks.
		return
	}

	numChunks := ceilFrac(partDataLength, chunksize)
	blocks := make([][]byte, len(s.disks))

	if needsReconstruction && numChunks > 1 {
		// Allocate once for all the equal length blocks. The
		// last block may have a different length - allocation
		// for this happens inside the for loop below.
		for i := range blocks {
			if len(buffers[i]) == 0 {
				blocks[i] = make([]byte, chunksize)
			}
		}
	}
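
	// Each iteration of the loop below processes one erasure-coded block:
	// the shards read for that block are sliced out of buffers, missing
	// shards are reconstructed if necessary, and the portion of the block
	// that lies inside the requested range is written to the writer.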
	var buffOffset int64
	for chunkNumber := int64(0); chunkNumber < numChunks; chunkNumber++ {
		if chunkNumber == numChunks-1 && partDataLength%chunksize != 0 {
			chunksize = partDataLength % chunksize
			// We allocate again as the last chunk has a
			// different size.
			for i := range blocks {
				if len(buffers[i]) == 0 {
					blocks[i] = make([]byte, chunksize)
				}
			}
		}

		// Truncate missing shards to zero length, keeping their capacity
		// so reconstruction can reuse the allocated memory.
		for i := range blocks {
			if len(buffers[i]) == 0 {
				blocks[i] = blocks[i][0:0]
			}
		}
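
		// Point the shards that were successfully read at the slice of
		// their buffers corresponding to this chunk.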
		for i := range blocks {
			if len(buffers[i]) != 0 {
				blocks[i] = buffers[i][buffOffset : buffOffset+chunksize]
			}
		}
		buffOffset += chunksize
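
		// Reconstruct any missing data shards for this chunk before
		// writing it out.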
		if needsReconstruction {
			if err = s.ErasureDecodeDataBlocks(blocks); err != nil {
				return f, errors.Trace(err)
			}
		}
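
		// Determine which part of this block lies inside the requested
		// range: the first chunk may start mid-block and the last chunk
		// may end mid-block.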
		var writeStart int64
		if chunkNumber == 0 {
			writeStart = offset % blocksize
		}
		writeLength := blocksize - writeStart
		if chunkNumber == numChunks-1 {
			lastBlockLength := (offset + length) % blocksize
			if lastBlockLength != 0 {
				writeLength = lastBlockLength - writeStart
			}
		}
		n, err := writeDataBlocks(writer, blocks, s.dataBlocks, writeStart, writeLength)
		if err != nil {
			return f, err
		}

		f.Size += n
	}
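
	// Record the bitrot algorithm and the checksums computed while reading
	// from the disks that returned data.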
	f.Algorithm = algorithm
	for i, disk := range s.disks {
		if disk == OfflineDisk || buffers[i] == nil {
			continue
		}
		f.Checksums[i] = verifiers[i].Sum(nil)
	}
	return f, nil
}