From 5291db60c6f2cdfdcd12e1a0021008a1ca26607a Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Tue, 28 Jun 2016 01:54:55 +0530 Subject: [PATCH] XL/erasure: refactor erasureReadFile. Move parallelRead into a separate function. (#2008) --- erasure-readfile.go | 123 ++++++++++++++++++++------------------------ 1 file changed, 55 insertions(+), 68 deletions(-) diff --git a/erasure-readfile.go b/erasure-readfile.go index 33c1cc8c2..bc36ee31b 100644 --- a/erasure-readfile.go +++ b/erasure-readfile.go @@ -125,6 +125,50 @@ func getReadDisks(orderedDisks []StorageAPI, index int, dataBlocks int) (readDis return nil, 0, errXLReadQuorum } +// parallelRead - reads chunks in parallel from the disks specified in []readDisks. +func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []StorageAPI, enBlocks [][]byte, blockOffset int64, curChunkSize int64, bitRotVerify func(diskIndex int) bool) { + // WaitGroup to synchronise the read go-routines. + wg := &sync.WaitGroup{} + + // Read disks in parallel. + for index := range readDisks { + if readDisks[index] == nil { + continue + } + wg.Add(1) + // Reads chunk from readDisk[index] in routine. + go func(index int) { + defer wg.Done() + + // Verify bit rot for the file on this disk. + if !bitRotVerify(index) { + // So that we don't read from this disk for the next block. + orderedDisks[index] = nil + return + } + + // Chunk writer. + chunkWriter := bytes.NewBuffer(make([]byte, 0, curChunkSize)) + + // CopyN - copies until current chunk size. + err := copyN(chunkWriter, readDisks[index], volume, path, blockOffset, curChunkSize) + if err != nil { + // So that we don't read from this disk for the next block. + orderedDisks[index] = nil + return + } + + // Copy the read blocks. + enBlocks[index] = chunkWriter.Bytes() + + // Successfully read. + }(index) + } + + // Waiting for first routines to finish. + wg.Wait() +} + // erasureReadFile - read bytes from erasure coded files and writes to given writer. // Erasure coded files are read block by block as per given erasureInfo and data chunks // are decoded into a data block. Data block is trimmed for given offset and length, @@ -205,86 +249,29 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s // should happen. nextIndex := 0 - // parallelRead() reads DataBlocks number of disks if all data - // disks are available or DataBlocks+1 number of disks if one - // of the data disks is missing. All the reads happen in parallel. - var parallelRead func() error - parallelRead = func() error { - // If enough blocks are available to do rs.Reconstruct() - if isSuccessDecodeBlocks(enBlocks, eInfo.DataBlocks) { - return nil - } - if nextIndex == len(orderedDisks) { - // No more disks to read from. - return errXLReadQuorum - } - + for { // readDisks - disks from which we need to read in parallel. var readDisks []StorageAPI var err error readDisks, nextIndex, err = getReadDisks(orderedDisks, nextIndex, eInfo.DataBlocks) if err != nil { - return err + return bytesWritten, err } - - // WaitGroup to synchronise the read go-routines. - wg := &sync.WaitGroup{} - - // Read disks in parallel. - for index := range readDisks { - if readDisks[index] == nil { - continue - } - wg.Add(1) - // Reads chunk from readDisk[index] in routine. - go func(index int) { - defer wg.Done() - - // Verify bit rot for the file on this disk. - if !bitRotVerify(index) { - // So that we don't read from this disk for the next block. - orderedDisks[index] = nil - return - } - - // Chunk writer. - chunkWriter := bytes.NewBuffer(make([]byte, 0, curChunkSize)) - - // CopyN - copies until current chunk size. - err := copyN(chunkWriter, readDisks[index], volume, path, blockOffset, curChunkSize) - if err != nil { - // So that we don't read from this disk for the next block. - orderedDisks[index] = nil - return - } - - // Copy the read blocks. - enBlocks[index] = chunkWriter.Bytes() - - // Reset the buffer. - chunkWriter.Reset() - - // Successfully read. - }(index) + parallelRead(volume, path, readDisks, orderedDisks, enBlocks, blockOffset, curChunkSize, bitRotVerify) + if isSuccessDecodeBlocks(enBlocks, eInfo.DataBlocks) { + // If enough blocks are available to do rs.Reconstruct() + break + } + if nextIndex == len(orderedDisks) { + // No more disks to read from. + return bytesWritten, errXLReadQuorum } - - // Waiting for first routines to finish. - wg.Wait() - - // Continue to read the rest of the blocks in parallel. - return parallelRead() - } - - // Start reading all blocks in parallel. - err := parallelRead() - if err != nil { - return bytesWritten, err } // If we have all the data blocks no need to decode, continue to write. if !isSuccessDataBlocks(enBlocks, eInfo.DataBlocks) { // Reconstruct the missing data blocks. - if err = decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks); err != nil { + if err := decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks); err != nil { return bytesWritten, err } }