diff --git a/erasure-readfile.go b/erasure-readfile.go
index 722c09c7e..e3ec5cee1 100644
--- a/erasure-readfile.go
+++ b/erasure-readfile.go
@@ -26,42 +26,42 @@ import (
 	"github.com/klauspost/reedsolomon"
 )
 
-// isSuccessDecodeBlocks - do we have all the blocks to be successfully decoded?.
-// input disks here are expected to be ordered i.e parityBlocks
-// are preceded by dataBlocks. For for information look at getOrderedDisks().
-func isSuccessDecodeBlocks(disks []StorageAPI, dataBlocks int) bool {
+// isSuccessDecodeBlocks - do we have all the blocks to be
+// successfully decoded? Input is the ordered matrix of encoded blocks.
+func isSuccessDecodeBlocks(enBlocks [][]byte, dataBlocks int) bool {
 	// Count number of data and parity blocks that were read.
 	var successDataBlocksCount = 0
 	var successParityBlocksCount = 0
-	for index, disk := range disks {
-		if disk == nil {
+	for index := range enBlocks {
+		if enBlocks[index] == nil {
 			continue
 		}
+		// Block index is less than dataBlocks, update the data block count.
 		if index < dataBlocks {
 			successDataBlocksCount++
 			continue
-		}
+		} // else: fall through to the parity count below.
 		// update parity block count.
 		successParityBlocksCount++
 	}
-	// Returns true if we have atleast dataBlocks + 1 parity.
-	return successDataBlocksCount+successParityBlocksCount >= dataBlocks+1
+	// Returns true if we have all the data blocks, or at least
+	// dataBlocks + 1 blocks in total.
+	return successDataBlocksCount == dataBlocks || successDataBlocksCount+successParityBlocksCount >= dataBlocks+1
 }
 
 // isSuccessDataBlocks - do we have all the data blocks?
-// input disks here are expected to be ordered i.e parityBlocks
-// are preceded by dataBlocks. For for information look at getOrderedDisks().
-func isSuccessDataBlocks(disks []StorageAPI, dataBlocks int) bool {
+// Input is the ordered matrix of encoded blocks.
+func isSuccessDataBlocks(enBlocks [][]byte, dataBlocks int) bool {
 	// Count number of data blocks that were read.
 	var successDataBlocksCount = 0
-	for index, disk := range disks[:dataBlocks] {
-		if disk == nil {
+	for index := range enBlocks[:dataBlocks] {
+		if enBlocks[index] == nil {
 			continue
 		}
+		// Block index is less than dataBlocks, update the data block count.
 		if index < dataBlocks {
 			successDataBlocksCount++
 		}
 	}
-	// Returns true if we have all the dataBlocks.
+	// Returns true if we have at least dataBlocks count of data blocks.
 	return successDataBlocksCount >= dataBlocks
 }
 
@@ -79,6 +79,52 @@ func getOrderedDisks(distribution []int, disks []StorageAPI, blockCheckSums []ch
 	return orderedDisks, orderedBlockCheckSums
 }
 
+// Returns the slice of readable disks from which we can read in parallel.
+func getReadDisks(orderedDisks []StorageAPI, index int, dataBlocks int) (readDisks []StorageAPI, nextIndex int, err error) {
+	readDisks = make([]StorageAPI, len(orderedDisks))
+	dataDisks := 0
+	parityDisks := 0
+	// Count already read data and parity chunks.
+	for i := 0; i < index; i++ {
+		if orderedDisks[i] == nil {
+			continue
+		}
+		if i < dataBlocks {
+			dataDisks++
+		} else {
+			parityDisks++
+		}
+	}
+
+	// Sanity checks - we should never have this situation.
+	if dataDisks == dataBlocks {
+		return nil, 0, errUnexpected
+	}
+	if dataDisks+parityDisks >= dataBlocks+1 {
+		return nil, 0, errUnexpected
+	}
+
+	// Find the disks from which the next set of parallel reads should happen.
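+	// A hypothetical walk-through of the selection below: with
+	// dataBlocks = 4 and orderedDisks = [d0, d1, d2, d3, p0, p1],
+	// a call with index = 0 selects d0..d3 and returns nextIndex = 4.
+	// If d1 were nil, the same call would select d0, d2, d3, p0 and
+	// p1 (dataBlocks+1 = 5 reads, since one data chunk must be
+	// reconstructed) and return nextIndex = 6.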
+	for i := index; i < len(orderedDisks); i++ {
+		if orderedDisks[i] == nil {
+			continue
+		}
+		if i < dataBlocks {
+			dataDisks++
+		} else {
+			parityDisks++
+		}
+		readDisks[i] = orderedDisks[i]
+		if dataDisks == dataBlocks {
+			return readDisks, i + 1, nil
+		}
+		if dataDisks+parityDisks == dataBlocks+1 {
+			return readDisks, i + 1, nil
+		}
+	}
+	return nil, 0, errXLReadQuorum
+}
+
 // erasureReadFile - read bytes from erasure coded files and writes to given writer.
 // Erasure coded files are read block by block as per given erasureInfo and data chunks
 // are decoded into a data block. Data block is trimmed for given offset and length,
@@ -95,9 +141,9 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 	// disks and rest will be parity.
 	orderedDisks, orderedBlockCheckSums := getOrderedDisks(eInfo.Distribution, disks, blockCheckSums)
 
-	// bitrotVerify verifies if the file on a particular disk doesn't have bitrot
+	// bitRotVerify verifies if the file on a particular disk doesn't have bitrot
 	// by verifying the hash of the contents of the file.
-	bitrotVerify := func() func(diskIndex int) bool {
+	bitRotVerify := func() func(diskIndex int) bool {
 		verified := make([]bool, len(orderedDisks))
 		// Return closure so that we have reference to []verified and
 		// not recalculate the hash on it everytime the function is
@@ -117,9 +163,6 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 	// Total bytes written to writer
 	bytesWritten := int64(0)
 
-	// Each element of enBlocks holds curChunkSize'd amount of data read from its corresponding disk.
-	enBlocks := make([][]byte, len(orderedDisks))
-
 	// chunkSize is roughly BlockSize/DataBlocks.
 	// chunkSize is calculated such that chunkSize*DataBlocks accommodates BlockSize bytes.
 	// So chunkSize*DataBlocks can be slightly larger than BlockSize if BlockSize is not divisible by
@@ -133,11 +176,23 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 	// need to read parity disks. If one of the data disk is missing we need to read DataBlocks+1 number
 	// of disks. Once read, we Reconstruct() missing data if needed and write it to the given writer.
 	for block := startBlock; bytesWritten < length; block++ {
+		// Each element of enBlocks holds curChunkSize bytes of data read from its corresponding disk.
+		enBlocks := make([][]byte, len(orderedDisks))
+
+		// enBlocks data can be 0-padded, hence we need to figure out the
+		// exact number of bytes we want to read from enBlocks.
+		blockSize := eInfo.BlockSize
+
 		// curChunkSize is chunkSize until end block.
 		curChunkSize := chunkSize
+
+		// At endBlock, check whether the last block is padded.
 		if block == endBlock && (totalLength%eInfo.BlockSize != 0) {
 			// If this is the last block and size of the block is < BlockSize.
 			curChunkSize = getEncodedBlockLen(totalLength%eInfo.BlockSize, eInfo.DataBlocks)
+
+			// For the last block, the block size can be less than BlockSize.
+			blockSize = totalLength % eInfo.BlockSize
 		}
 
 		// Block offset.
@@ -146,148 +201,95 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 		// then it can result in wrong offset for the last block.
 		blockOffset := block * chunkSize
 
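+		// Read strategy, in short: getReadDisks() picks the next set of
+		// candidate disks and parallelRead() reads from them concurrently,
+		// recursing until enBlocks holds enough chunks for reconstruction
+		// or the disks are exhausted.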
-		// Figure out the number of disks that are needed for the read.
-		// We will need DataBlocks number of disks if all the data disks are up.
-		// We will need DataBlocks+1 number of disks even if one of the data disks is down.
-		readableDiskCount := 0
-
-		// Count the number of data disks that are up.
-		for _, disk := range orderedDisks[:eInfo.DataBlocks] {
-			if disk == nil {
-				continue
+		// nextIndex - index from which the next set of parallel reads
+		// should happen.
+		nextIndex := 0
+
+		// parallelRead() reads from DataBlocks disks if all the data
+		// disks are available, or from DataBlocks+1 disks if one of
+		// the data disks is missing. All the reads happen in parallel.
+		var parallelRead func() error
+		parallelRead = func() error {
+			// If enough blocks are already available for rs.Reconstruct(), stop reading.
+			if isSuccessDecodeBlocks(enBlocks, eInfo.DataBlocks) {
+				return nil
 			}
-			readableDiskCount++
-		}
-
-		// Readable disks..
-		if readableDiskCount < eInfo.DataBlocks {
-			// Not enough data disks up, so we need DataBlocks+1 number
-			// of disks for reed-solomon Reconstruct()
-			readableDiskCount = eInfo.DataBlocks + 1
-		}
-
-		// Initialize wait group.
-		var wg = &sync.WaitGroup{}
-
-		// Current disk index from which to read, this will be used later
-		// in case one of the parallel reads fails.
-		index := 0
-
-		// Read from the disks in parallel.
-		for _, disk := range orderedDisks {
-			if disk == nil {
-				index++
-				continue
+			if nextIndex == len(orderedDisks) {
+				// No more disks to read from.
+				return errXLReadQuorum
 			}
-			// Increment wait group.
-			wg.Add(1)
-
-			// Start reading from disk in a go-routine.
-			go func(index int, disk StorageAPI) {
-				defer wg.Done()
-
-				// Verify bit rot for this disk slice.
-				if !bitrotVerify(index) {
-					// So that we don't read from this disk for the next block.
-					orderedDisks[index] = nil
-					return
-				}
-
-				// Chunk writer.
-				chunkWriter := bytes.NewBuffer(make([]byte, 0, curChunkSize))
-
-				// CopyN copies until current chunk size.
-				err := copyN(chunkWriter, disk, volume, path, blockOffset, curChunkSize)
-				if err != nil {
-					// So that we don't read from this disk for the next block.
-					orderedDisks[index] = nil
-					return
-				}
-
-				// Copy the read blocks.
-				enBlocks[index] = chunkWriter.Bytes()
-
-				// Reset the buffer.
-				chunkWriter.Reset()
-
-				// Successfully read.
-			}(index, disk)
-
-			index++
-			readableDiskCount--
-			// We have read all the readable disks.
-			if readableDiskCount == 0 {
-				break
+			// readDisks - disks from which we need to read in parallel.
+			var readDisks []StorageAPI
+			var err error
+			readDisks, nextIndex, err = getReadDisks(orderedDisks, nextIndex, eInfo.DataBlocks)
+			if err != nil {
+				return err
 			}
-		}
-		// Wait for all the reads to finish.
-		wg.Wait()
-
-		// FIXME: make this parallel.
-
-		// If we have all the data blocks no need to decode.
-		if !isSuccessDataBlocks(orderedDisks, eInfo.DataBlocks) {
-			// If we don't have DataBlocks number of data blocks we
-			// will have to read enough parity blocks such that we
-			// have DataBlocks+1 number for blocks for rs.Reconstruct().
-			// index is either dataBlocks or dataBlocks + 1.
-			for ; index < len(orderedDisks); index++ {
-				// We have enough blocks to decode, break out.
-				if isSuccessDecodeBlocks(orderedDisks, eInfo.DataBlocks) {
-					// We have DataBlocks+1 blocks, enough for rs.Reconstruct()
-					break
-				}
-
-				// This disk was previously set to nil and ignored, do not read again.
-				if orderedDisks[index] == nil {
-					continue
-				}
+			// WaitGroup to synchronize the read goroutines.
+			wg := &sync.WaitGroup{}
 
-				// Verify bit-rot for this index.
-				if !bitrotVerify(index) {
-					// Mark nil so that we don't read from this disk for the next block.
-					orderedDisks[index] = nil
+			// Read disks in parallel.
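+			// Each goroutine below fills exactly one enBlocks slot, so
+			// no locking is needed; a failed or bit-rot-corrupted read
+			// marks the corresponding orderedDisks entry nil so that
+			// subsequent blocks skip that disk.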
+			for index := range readDisks {
+				if readDisks[index] == nil {
 					continue
 				}
+				wg.Add(1)
+				// Reads chunk from readDisks[index] in a goroutine.
+				go func(index int) {
+					defer wg.Done()
+
+					// Verify bit rot for the file on this disk.
+					if !bitRotVerify(index) {
+						// So that we don't read from this disk for the next block.
+						orderedDisks[index] = nil
+						return
+					}
+
+					// Chunk writer.
+					chunkWriter := bytes.NewBuffer(make([]byte, 0, curChunkSize))
+
+					// CopyN - copies until current chunk size.
+					err := copyN(chunkWriter, readDisks[index], volume, path, blockOffset, curChunkSize)
+					if err != nil {
+						// So that we don't read from this disk for the next block.
+						orderedDisks[index] = nil
+						return
+					}
+
+					// Copy the read blocks.
+					enBlocks[index] = chunkWriter.Bytes()
+
+					// Reset the buffer.
+					chunkWriter.Reset()
+
+					// Successfully read.
+				}(index)
+			}
 
-				// Chunk writer.
-				chunkWriter := bytes.NewBuffer(make([]byte, 0, curChunkSize))
+			// Wait for the current set of read goroutines to finish.
+			wg.Wait()
 
-				// CopyN copies until current chunk size.
-				err := copyN(chunkWriter, orderedDisks[index], volume, path, blockOffset, curChunkSize)
-				if err != nil {
-					// ERROR: Mark nil so that we don't read from
-					// this disk for the next block.
-					orderedDisks[index] = nil
-					continue
-				}
-
-				// Copy the read blocks.
-				chunkWriter.Read(enBlocks[index])
+			// Continue to read the rest of the blocks in parallel.
+			return parallelRead()
+		}
 
-				// Reset the buffer.
-				chunkWriter.Reset()
-			}
+		// Start reading all blocks in parallel.
+		err := parallelRead()
+		if err != nil {
+			return bytesWritten, err
+		}
 
+		// If we have all the data blocks, no need to decode; continue to write.
+		if !isSuccessDataBlocks(enBlocks, eInfo.DataBlocks) {
 			// Reconstruct the missing data blocks.
-			err := decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks)
-			if err != nil {
+			if err = decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks); err != nil {
 				return bytesWritten, err
 			}
-			// Success.
 		}
 
 		var outSize, outOffset int64
 
-		// enBlocks data can have 0-padding hence we need to figure the exact number
-		// of bytes we want to read from enBlocks.
-		blockSize := eInfo.BlockSize
-		if block == endBlock && totalLength%eInfo.BlockSize != 0 {
-			// For the last block, the block size can be less than BlockSize.
-			blockSize = totalLength % eInfo.BlockSize
-		}
-
 		// If this is start block, skip unwanted bytes.
 		if block == startBlock {
 			outOffset = bytesToSkip
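+			// bytesToSkip is assumed to be derived from the requested
+			// offset earlier in this function; skipping these bytes makes
+			// the writer start exactly at the caller's offset.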