From d4bea5fbf80509071ac6cbc351f3e49942038852 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Wed, 22 Jun 2016 21:35:03 +0530 Subject: [PATCH] XL/erasure-read: Add Comments and enable bitrot detection. --- erasure-readfile.go | 146 ++++++++++++++++++++++++++++---------------- erasure-utils.go | 3 +- xl-v1-object.go | 5 +- 3 files changed, 100 insertions(+), 54 deletions(-) diff --git a/erasure-readfile.go b/erasure-readfile.go index 71b4eb1a9..7f9a0389c 100644 --- a/erasure-readfile.go +++ b/erasure-readfile.go @@ -17,7 +17,6 @@ package main import ( - "bytes" "encoding/hex" "errors" "io" @@ -28,51 +27,72 @@ import ( // erasureReadFile - read bytes from erasure coded files and writes to given writer. // Erasure coded files are read block by block as per given erasureInfo and data chunks -// are decoded into a data block. Data block is trimmed for given offset and length, -// then written to given writer. This function also supports bit-rot detection by +// are decoded into a data block. Data block is trimmed for given offset and length, +// then written to given writer. This function also supports bit-rot detection by // verifying checksum of individual block's checksum. func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, partName string, eInfos []erasureInfo, offset int64, length int64, totalLength int64) (int64, error) { - min := func(a int64, b int64) int { - if a < b { - return int(a) - } - return int(b) - } - // Total bytes written to writer - bytesWritten := int64(0) - - // Gather previously calculated block checksums. - // blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName) - // Pick one erasure info. eInfo := pickValidErasureInfo(eInfos) - // Data chunk size on each block. - chunkSize := getEncodedBlockLen(eInfo.BlockSize, eInfo.DataBlocks) - - // Get block info for given offset, length and block size. - startBlock, bytesToSkip := getBlockInfo(offset, eInfo.BlockSize) + // Gather previously calculated block checksums. + blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName) + orderedBlockCheckSums := make([]checkSumInfo, len(disks)) + // []orderedDisks will have first eInfo.DataBlocks disks as data disks and rest will be parity. orderedDisks := make([]StorageAPI, len(disks)) for index := range disks { blockIndex := eInfo.Distribution[index] orderedDisks[blockIndex-1] = disks[index] + orderedBlockCheckSums[blockIndex-1] = blockCheckSums[index] } + // bitrotVerify verifies if the file on a particular disk does not have bitrot by verifying the hash of + // the contents of the file. + bitrotVerify := func() func(diskIndex int) bool { + verified := make([]bool, len(orderedDisks)) + // Return closure so that we have reference to []verified and not recalculate the hash on it + // everytime the function is called for the same disk. + return func(diskIndex int) bool { + if verified[diskIndex] { + return true + } + isValid := isValidBlock(orderedDisks[diskIndex], volume, path, orderedBlockCheckSums[diskIndex]) + verified[diskIndex] = isValid + return isValid + } + }() + + // Total bytes written to writer + bytesWritten := int64(0) + + // chunkSize is roughly BlockSize/DataBlocks. + // chunkSize is calculated such that chunkSize*DataBlocks accommodates BlockSize bytes. + // So chunkSize*DataBlocks can be slightly larger than BlockSize if BlockSize is not divisible by + // DataBlocks. The extra space will have 0-padding. + chunkSize := getEncodedBlockLen(eInfo.BlockSize, eInfo.DataBlocks) + + startBlock, endBlock, bytesToSkip := getBlockInfo(offset, totalLength, eInfo.BlockSize) + + // For each block, read chunk from each disk. If we are able to read all the data disks then we don't + // need to read parity disks. If one of the data disk is missing we need to read DataBlocks+1 number + // of disks. Once read, we Reconstruct() missing data if needed and write it to the given writer. for block := startBlock; bytesWritten < length; block++ { + // curChunkSize will be chunkSize except for the last block because the size of the last block + // can be less than BlockSize. curChunkSize := chunkSize - if totalLength-offset+bytesWritten < curChunkSize { - curChunkSize = getEncodedBlockLen(totalLength-offset+bytesWritten, eInfo.DataBlocks) + if block == endBlock && (totalLength%eInfo.BlockSize != 0) { + // If this is the last block and size of the block is < BlockSize. + curChunkSize = getEncodedBlockLen(totalLength%eInfo.BlockSize, eInfo.DataBlocks) } - // Allocate encoded blocks up to storage disks. + // Each element of enBlocks holds curChunkSize'd amount of data read from its corresponding disk. enBlocks := make([][]byte, len(disks)) // Figure out the number of disks that are needed for the read. - // If all the data disks are available then dataDiskCount = eInfo.DataBlocks - // Else dataDiskCount = eInfo.DataBlocks + 1 - + // We will need DataBlocks number of disks if all the data disks are up. + // We will need DataBlocks+1 number of disks even if one of the data disks is down. diskCount := 0 + // Count the number of data disks that are up. for _, disk := range orderedDisks[:eInfo.DataBlocks] { if disk == nil { continue @@ -81,11 +101,15 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s } if diskCount < eInfo.DataBlocks { + // Not enough data disks up, so we need DataBlocks+1 number of disks for reed-solomon Reconstruct() diskCount = eInfo.DataBlocks + 1 } wg := &sync.WaitGroup{} + + // current disk index from which to read, this will be used later in case one of the parallel reads fails. index := 0 + // Read from the disks in parallel. for _, disk := range orderedDisks { if disk == nil { index++ @@ -94,9 +118,19 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s wg.Add(1) go func(index int, disk StorageAPI) { defer wg.Done() + ok := bitrotVerify(index) + if !ok { + // So that we don't read from this disk for the next block. + orderedDisks[index] = nil + return + } buf := make([]byte, curChunkSize) - n, err := disk.ReadFile(volume, path, block*curChunkSize, buf) + // Note that for the offset calculation we have to use chunkSize and not + // curChunkSize. If we use curChunkSize for offset calculation then it + // can result in wrong offset for the last block. + n, err := disk.ReadFile(volume, path, block*chunkSize, buf) if err != nil { + // So that we don't read from this disk for the next block. orderedDisks[index] = nil return } @@ -110,7 +144,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s } wg.Wait() - // Counter to keep success data blocks. + // Count number of data and parity blocks that were read. var successDataBlocksCount = 0 var successParityBlocksCount = 0 for bufidx, buf := range enBlocks { @@ -125,50 +159,63 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s } if successDataBlocksCount < eInfo.DataBlocks { + // If we don't have DataBlocks number of data blocks we will have to read enough + // parity blocks such that we have DataBlocks+1 number for blocks for reedsolomon.Reconstruct() for ; index < len(orderedDisks); index++ { if (successDataBlocksCount + successParityBlocksCount) == (eInfo.DataBlocks + 1) { + // We have DataBlocks+1 blocks, enough for reedsolomon.Reconstruct() break } + ok := bitrotVerify(index) + if !ok { + // Mark nil so that we don't read from this disk for the next block. + orderedDisks[index] = nil + continue + } buf := make([]byte, curChunkSize) - n, err := orderedDisks[index].ReadFile(volume, path, block*curChunkSize, buf) + n, err := orderedDisks[index].ReadFile(volume, path, block*chunkSize, buf) if err != nil { + // Mark nil so that we don't read from this disk for the next block. orderedDisks[index] = nil continue } successParityBlocksCount++ enBlocks[index] = buf[:n] } + // Reconstruct the missing data blocks. err := decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks) if err != nil { return bytesWritten, err } } - // Get data blocks from encoded blocks. - dataBlocks, err := getDataBlocks(enBlocks, eInfo.DataBlocks, min(eInfo.BlockSize, totalLength-offset+bytesWritten)) + // enBlocks data can have 0-padding hence we need to figure the exact number + // of bytes we want to read from enBlocks. + blockSize := eInfo.BlockSize + if block == endBlock && totalLength%eInfo.BlockSize != 0 { + // For the last block, the block size can be less than BlockSize. + blockSize = totalLength % eInfo.BlockSize + } + data, err := getDataBlocks(enBlocks, eInfo.DataBlocks, int(blockSize)) if err != nil { return bytesWritten, err } - // Keep required bytes into buf. - buf := dataBlocks - // If this is start block, skip unwanted bytes. if block == startBlock { - buf = buf[bytesToSkip:] + data = data[bytesToSkip:] } - if len(buf) > int(length-bytesWritten) { - buf = buf[:length-bytesWritten] + if len(data) > int(length-bytesWritten) { + // We should not send more data than what was requested. + data = data[:length-bytesWritten] } - // Copy data blocks. - var n int64 - n, err = io.Copy(writer, bytes.NewReader(buf)) - bytesWritten += int64(n) + _, err = writer.Write(data) if err != nil { return bytesWritten, err } + bytesWritten += int64(len(data)) } return bytesWritten, nil @@ -210,23 +257,18 @@ func toDiskIndex(blockIdx int, distribution []int) int { // isValidBlock - calculates the checksum hash for the block and // validates if its correct returns true for valid cases, false otherwise. -func isValidBlock(disks []StorageAPI, volume, path string, diskIndex int, blockCheckSums []checkSumInfo) (ok bool) { +func isValidBlock(disk StorageAPI, volume, path string, blockCheckSum checkSumInfo) (ok bool) { ok = false - // Unknown block index requested, treat it as error. - if diskIndex == -1 { - return ok - } - // Disk is not present, treat entire block to be non existent. - if disks[diskIndex] == nil { - return ok + if disk == nil { + return false } // Read everything for a given block and calculate hash. - hashWriter := newHash(blockCheckSums[diskIndex].Algorithm) - hashBytes, err := hashSum(disks[diskIndex], volume, path, hashWriter) + hashWriter := newHash(blockCheckSum.Algorithm) + hashBytes, err := hashSum(disk, volume, path, hashWriter) if err != nil { return ok } - ok = hex.EncodeToString(hashBytes) == blockCheckSums[diskIndex].Hash + ok = hex.EncodeToString(hashBytes) == blockCheckSum.Hash return ok } diff --git a/erasure-utils.go b/erasure-utils.go index 9316e739a..521bc7b21 100644 --- a/erasure-utils.go +++ b/erasure-utils.go @@ -89,10 +89,11 @@ func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) (data [] } // getBlockInfo - find start/end block and bytes to skip for given offset, length and block size. -func getBlockInfo(offset, blockSize int64) (startBlock, bytesToSkip int64) { +func getBlockInfo(offset, length, blockSize int64) (startBlock, endBlock, bytesToSkip int64) { // Calculate start block for given offset and how many bytes to skip to get the offset. startBlock = offset / blockSize bytesToSkip = offset % blockSize + endBlock = length / blockSize return } diff --git a/xl-v1-object.go b/xl-v1-object.go index 0237173c3..464c4069f 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -97,7 +97,9 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i // Save the current part name and size. partName := xlMeta.Parts[partIndex].Name partSize := xlMeta.Parts[partIndex].Size + readSize := partSize - partOffset + // readSize should be adjusted so that we don't write more data than what was requested. if readSize > (length - totalBytesRead) { readSize = length - totalBytesRead } @@ -110,7 +112,8 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i totalBytesRead += n - // Reset part offset to 0 to read rest of the part from the beginning. + // partOffset will be valid only for the first part, hence reset it to 0 for + // the remaining parts. partOffset = 0 } // End of read all parts loop.