diff --git a/erasure-readfile.go b/erasure-readfile.go
index 7a0bc58a7..91f494db8 100644
--- a/erasure-readfile.go
+++ b/erasure-readfile.go
@@ -17,127 +17,204 @@ package main
 
 import (
-	"bytes"
 	"encoding/hex"
 	"errors"
 	"io"
+	"sync"
 
 	"github.com/klauspost/reedsolomon"
 )
 
 // erasureReadFile - read bytes from erasure coded files and writes to given writer.
 // Erasure coded files are read block by block as per given erasureInfo and data chunks
-// are decoded into a data block. Data block is trimmed for given offset and length,
-// then written to given writer. This function also supports bit-rot detection by
+// are decoded into a data block. Data block is trimmed for given offset and length,
+// then written to given writer. This function also supports bit-rot detection by
 // verifying checksum of individual block's checksum.
-func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, partName string, eInfos []erasureInfo, offset int64, length int64) (int64, error) {
-	// Total bytes written to writer
-	bytesWritten := int64(0)
+func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, partName string, eInfos []erasureInfo, offset int64, length int64, totalLength int64) (int64, error) {
+	// Pick one erasure info.
+	eInfo := pickValidErasureInfo(eInfos)
 
 	// Gather previously calculated block checksums.
 	blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName)
+	orderedBlockCheckSums := make([]checkSumInfo, len(disks))
 
-	// Pick one erasure info.
-	eInfo := pickValidErasureInfo(eInfos)
+	// orderedDisks will have the first eInfo.DataBlocks disks as data disks and the rest as parity.
+	orderedDisks := make([]StorageAPI, len(disks))
+	for index := range disks {
+		blockIndex := eInfo.Distribution[index]
+		orderedDisks[blockIndex-1] = disks[index]
+		orderedBlockCheckSums[blockIndex-1] = blockCheckSums[index]
+	}
+
+	// bitrotVerify verifies that the file on a particular disk is free of bitrot by verifying the hash of
+	// the contents of the file.
+	bitrotVerify := func() func(diskIndex int) bool {
+		verified := make([]bool, len(orderedDisks))
+		// Return a closure so that we keep a reference to verified[] and do not recalculate the hash
+		// every time the function is called for the same disk.
+		return func(diskIndex int) bool {
+			if verified[diskIndex] {
+				return true
+			}
+			isValid := isValidBlock(orderedDisks[diskIndex], volume, path, orderedBlockCheckSums[diskIndex])
+			verified[diskIndex] = isValid
+			return isValid
+		}
+	}()
 
-	// Get block info for given offset, length and block size.
-	startBlock, bytesToSkip, endBlock := getBlockInfo(offset, length, eInfo.BlockSize)
+	// Total bytes written to writer
+	bytesWritten := int64(0)
 
-	// Data chunk size on each block.
-	chunkSize := eInfo.BlockSize / int64(eInfo.DataBlocks)
+	// chunkSize is roughly BlockSize/DataBlocks.
+	// chunkSize is calculated such that chunkSize*DataBlocks accommodates BlockSize bytes.
+	// So chunkSize*DataBlocks can be slightly larger than BlockSize if BlockSize is not divisible by
+	// DataBlocks. The extra space will have 0-padding.
+	chunkSize := getEncodedBlockLen(eInfo.BlockSize, eInfo.DataBlocks)
 
-	for block := startBlock; block <= endBlock; block++ {
-		// Allocate encoded blocks up to storage disks.
-		enBlocks := make([][]byte, len(disks))
+	startBlock, endBlock, bytesToSkip := getBlockInfo(offset, totalLength, eInfo.BlockSize)
 
-		// Counter to keep success data blocks.
-		var successDataBlocksCount = 0
-		var noReconstruct bool // Set for no reconstruction.
+	// For each block, read chunk from each disk. If we are able to read all the data disks then we don't
+	// need to read parity disks. If one of the data disks is missing we need to read DataBlocks+1 number
+	// of disks. Once read, we Reconstruct() missing data if needed and write it to the given writer.
+	for block := startBlock; bytesWritten < length; block++ {
+		// curChunkSize will be chunkSize except for the last block because the size of the last block
+		// can be less than BlockSize.
+		curChunkSize := chunkSize
+		if block == endBlock && (totalLength%eInfo.BlockSize != 0) {
+			// If this is the last block and size of the block is < BlockSize.
+			curChunkSize = getEncodedBlockLen(totalLength%eInfo.BlockSize, eInfo.DataBlocks)
+		}
 
-		// Keep how many bytes are read for this block.
-		// In most cases, last block in the file is shorter than chunkSize
-		lastReadSize := int64(0)
+		// Each element of enBlocks holds curChunkSize bytes of data read from its corresponding disk.
+		enBlocks := make([][]byte, len(disks))
 
-		// Read from all the disks.
-		for index, disk := range disks {
-			blockIndex := eInfo.Distribution[index] - 1
-			if !isValidBlock(disks, volume, path, toDiskIndex(blockIndex, eInfo.Distribution), blockCheckSums) {
-				continue
-			}
+		// Figure out the number of disks that are needed for the read.
+		// We will need DataBlocks number of disks if all the data disks are up.
+		// We will need DataBlocks+1 number of disks even if one of the data disks is down.
+		diskCount := 0
+		// Count the number of data disks that are up.
+		for _, disk := range orderedDisks[:eInfo.DataBlocks] {
 			if disk == nil {
 				continue
 			}
+			diskCount++
+		}
 
-			// Initialize chunk slice and fill the data from each parts.
-			enBlocks[blockIndex] = make([]byte, chunkSize)
+		if diskCount < eInfo.DataBlocks {
+			// Not enough data disks up, so we need DataBlocks+1 number of disks for reed-solomon Reconstruct()
+			diskCount = eInfo.DataBlocks + 1
+		}
 
-			// Read the necessary blocks.
-			n, err := disk.ReadFile(volume, path, block*chunkSize, enBlocks[blockIndex])
-			if err != nil {
-				enBlocks[blockIndex] = nil
-			} else if n < chunkSize {
-				// As the data we got is smaller than chunk size, keep only required chunk slice
-				enBlocks[blockIndex] = append([]byte{}, enBlocks[blockIndex][:n]...)
-			}
+		wg := &sync.WaitGroup{}
 
-			// Remember bytes read at first time.
-			if lastReadSize == 0 {
-				lastReadSize = n
+		// Current disk index from which to read; this is used later in case one of the parallel reads fails.
+		index := 0
+		// Read from the disks in parallel.
+		for _, disk := range orderedDisks {
+			if disk == nil {
+				index++
+				continue
 			}
-
-			// If bytes read is not equal to bytes read lastly, treat it as corrupted chunk.
-			if n != lastReadSize {
-				return bytesWritten, errXLDataCorrupt
+			wg.Add(1)
+			go func(index int, disk StorageAPI) {
+				defer wg.Done()
+				ok := bitrotVerify(index)
+				if !ok {
+					// So that we don't read from this disk for the next block.
+					orderedDisks[index] = nil
+					return
+				}
+				buf := make([]byte, curChunkSize)
+				// Note that for the offset calculation we have to use chunkSize and not
+				// curChunkSize. If we use curChunkSize for the offset calculation then it
+				// can result in a wrong offset for the last block.
+				n, err := disk.ReadFile(volume, path, block*chunkSize, buf)
+				if err != nil {
+					// So that we don't read from this disk for the next block.
+					orderedDisks[index] = nil
+					return
+				}
+				enBlocks[index] = buf[:n]
+			}(index, disk)
+			index++
+			diskCount--
+			if diskCount == 0 {
+				break
 			}
+		}
+		wg.Wait()
 
-			// Verify if we have successfully read all the data blocks.
-			if blockIndex < eInfo.DataBlocks && enBlocks[blockIndex] != nil {
+		// Count the number of data and parity blocks that were read.
+		var successDataBlocksCount = 0
+		var successParityBlocksCount = 0
+		for bufidx, buf := range enBlocks {
+			if buf == nil {
+				continue
+			}
+			if bufidx < eInfo.DataBlocks {
 				successDataBlocksCount++
-				// Set when we have all the data blocks and no
-				// reconstruction is needed, so that we can avoid
-				// erasure reconstruction.
-				noReconstruct = successDataBlocksCount == eInfo.DataBlocks
-				if noReconstruct {
-					// Break out we have read all the data blocks.
-					break
-				}
+				continue
 			}
+			successParityBlocksCount++
 		}
 
-		// Verify if reconstruction is needed, proceed with reconstruction.
-		if !noReconstruct {
+		if successDataBlocksCount < eInfo.DataBlocks {
+			// If we don't have DataBlocks number of data blocks we will have to read enough
+			// parity blocks such that we have DataBlocks+1 number of blocks for reedsolomon.Reconstruct()
+			for ; index < len(orderedDisks); index++ {
+				if (successDataBlocksCount + successParityBlocksCount) == (eInfo.DataBlocks + 1) {
+					// We have DataBlocks+1 blocks, enough for reedsolomon.Reconstruct()
+					break
+				}
+				ok := bitrotVerify(index)
+				if !ok {
+					// Mark nil so that we don't read from this disk for the next block.
+					orderedDisks[index] = nil
+					continue
+				}
+				buf := make([]byte, curChunkSize)
+				n, err := orderedDisks[index].ReadFile(volume, path, block*chunkSize, buf)
+				if err != nil {
+					// Mark nil so that we don't read from this disk for the next block.
+					orderedDisks[index] = nil
+					continue
+				}
+				successParityBlocksCount++
+				enBlocks[index] = buf[:n]
+			}
+			// Reconstruct the missing data blocks.
 			err := decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks)
 			if err != nil {
 				return bytesWritten, err
 			}
 		}
 
-		// Get data blocks from encoded blocks.
-		dataBlocks, err := getDataBlocks(enBlocks, eInfo.DataBlocks, int(lastReadSize)*eInfo.DataBlocks)
-		if err != nil {
-			return bytesWritten, err
+		var outSize, outOffset int64
+		// enBlocks data can have 0-padding, hence we need to figure out the exact number
+		// of bytes we want to read from enBlocks.
+		blockSize := eInfo.BlockSize
+		if block == endBlock && totalLength%eInfo.BlockSize != 0 {
+			// For the last block, the block size can be less than BlockSize.
+			blockSize = totalLength % eInfo.BlockSize
 		}
 
-		// Keep required bytes into buf.
-		buf := dataBlocks
-
 		// If this is start block, skip unwanted bytes.
 		if block == startBlock {
-			buf = append([]byte{}, dataBlocks[bytesToSkip:]...)
+			outOffset = bytesToSkip
 		}
 
-		// If this is end block, retain only required bytes.
-		if block == endBlock {
-			buf = append([]byte{}, buf[:length-bytesWritten]...)
+		// Total data to be read.
+		outSize = blockSize
+		if length-bytesWritten < blockSize {
+			// We should not send more data than what was requested.
+			outSize = length - bytesWritten
 		}
-
-		// Copy data blocks.
-		var n int64
-		n, err = io.Copy(writer, bytes.NewReader(buf))
-		bytesWritten += int64(n)
+		// Write data blocks.
+		n, err := writeDataBlocks(writer, enBlocks, eInfo.DataBlocks, outOffset, outSize)
 		if err != nil {
 			return bytesWritten, err
 		}
+		bytesWritten += n
 	}
 
 	return bytesWritten, nil
@@ -179,23 +256,18 @@ func toDiskIndex(blockIdx int, distribution []int) int {
 
 // isValidBlock - calculates the checksum hash for the block and
 // validates if its correct returns true for valid cases, false otherwise.
-func isValidBlock(disks []StorageAPI, volume, path string, diskIndex int, blockCheckSums []checkSumInfo) (ok bool) {
+func isValidBlock(disk StorageAPI, volume, path string, blockCheckSum checkSumInfo) (ok bool) {
 	ok = false
-	// Unknown block index requested, treat it as error.
-	if diskIndex == -1 {
-		return ok
-	}
-	// Disk is not present, treat entire block to be non existent.
-	if disks[diskIndex] == nil {
-		return ok
+	if disk == nil {
+		return false
 	}
 	// Read everything for a given block and calculate hash.
-	hashWriter := newHash(blockCheckSums[diskIndex].Algorithm)
-	hashBytes, err := hashSum(disks[diskIndex], volume, path, hashWriter)
+	hashWriter := newHash(blockCheckSum.Algorithm)
+	hashBytes, err := hashSum(disk, volume, path, hashWriter)
 	if err != nil {
 		return ok
 	}
-	ok = hex.EncodeToString(hashBytes) == blockCheckSums[diskIndex].Hash
+	ok = hex.EncodeToString(hashBytes) == blockCheckSum.Hash
 	return ok
 }
diff --git a/erasure-utils.go b/erasure-utils.go
index c4dcf8a5e..9daf39325 100644
--- a/erasure-utils.go
+++ b/erasure-utils.go
@@ -17,6 +17,7 @@ package main
 
 import (
+	"bytes"
 	"crypto/sha512"
 	"hash"
 	"io"
@@ -62,43 +63,88 @@ func hashSum(disk StorageAPI, volume, path string, writer hash.Hash) ([]byte, er
 	return writer.Sum(nil), nil
 }
 
-// getDataBlocks - fetches the data block only part of the input encoded blocks.
-func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) (data []byte, err error) {
-	if len(enBlocks) < dataBlocks {
-		return nil, reedsolomon.ErrTooFewShards
-	}
+// getDataBlockLen - get length of data blocks from encoded blocks.
+func getDataBlockLen(enBlocks [][]byte, dataBlocks int) int {
 	size := 0
-	blocks := enBlocks[:dataBlocks]
-	for _, block := range blocks {
+	// Figure out the data block length.
+	for _, block := range enBlocks[:dataBlocks] {
 		size += len(block)
 	}
-	if size < curBlockSize {
-		return nil, reedsolomon.ErrShortData
+	return size
+}
+
+// Writes all the data blocks from encoded blocks until requested
+// outSize length. Provides a way to skip bytes until the offset.
+func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, outOffset int64, outSize int64) (int64, error) {
+	// Do we have enough blocks?
+	if len(enBlocks) < dataBlocks {
+		return 0, reedsolomon.ErrTooFewShards
 	}
-	write := curBlockSize
-	for _, block := range blocks {
-		if write < len(block) {
-			data = append(data, block[:write]...)
-			return data, nil
+	// Do we have enough data?
+	if int64(getDataBlockLen(enBlocks, dataBlocks)) < outSize {
+		return 0, reedsolomon.ErrShortData
+	}
+
+	// Counter to decrement total left to write.
+	write := outSize
+
+	// Counter to increment total written.
+	totalWritten := int64(0)
+
+	// Write all data blocks to dst.
+	for _, block := range enBlocks[:dataBlocks] {
+		// Skip blocks until we have reached our offset.
+		if outOffset >= int64(len(block)) {
+			// Decrement offset.
+			outOffset -= int64(len(block))
+			continue
+		} else {
+			// Skip until offset.
+			block = block[outOffset:]
+
+			// Reset the offset for next iteration to read everything
+			// from subsequent blocks.
+			outOffset = 0
+		}
+		// The remaining bytes to write are fewer than this block, write only what is needed and stop.
+		if write < int64(len(block)) {
+			n, err := io.Copy(dst, bytes.NewReader(block[:write]))
+			if err != nil {
+				return 0, err
+			}
+			totalWritten += n
+			break
+		}
+		// Copy the block.
+		n, err := io.Copy(dst, bytes.NewReader(block))
+		if err != nil {
+			return 0, err
 		}
-		data = append(data, block...)
-		write -= len(block)
+
+		// Decrement output size.
+		write -= n
+
+		// Increment written.
+		totalWritten += n
 	}
-	return data, nil
+
+	// Success.
+	return totalWritten, nil
 }
 
 // getBlockInfo - find start/end block and bytes to skip for given offset, length and block size.
-func getBlockInfo(offset, length, blockSize int64) (startBlock, bytesToSkip, endBlock int64) {
+func getBlockInfo(offset, length, blockSize int64) (startBlock, endBlock, bytesToSkip int64) {
 	// Calculate start block for given offset and how many bytes to skip to get the offset.
 	startBlock = offset / blockSize
 	bytesToSkip = offset % blockSize
-
-	// Calculate end block for given size to read
-	endBlock = (offset + length) / blockSize
-	if endBlock > 0 && (offset+length)%blockSize == 0 {
-		endBlock--
-	}
-
+	endBlock = length / blockSize
 	return
 }
+
+// getEncodedBlockLen - calculate the per-disk encoded chunk size for the given
+// input length and number of data blocks.
+func getEncodedBlockLen(inputLen int64, dataBlocks int) (curEncBlockSize int64) {
+	curEncBlockSize = (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks)
+	return curEncBlockSize
+}
diff --git a/server_xl_test.go b/server_xl_test.go
index 4814cdc47..6e6be7de3 100644
--- a/server_xl_test.go
+++ b/server_xl_test.go
@@ -840,15 +840,17 @@ func (s *MyAPIXLSuite) TestPartialContent(c *C) {
 	c.Assert(response.StatusCode, Equals, http.StatusOK)
 
 	// Prepare request
-	var table = []struct {
+	var testCases = []struct {
 		byteRange      string
 		expectedString string
 	}{
-		{"6-7", "Wo"},
+		{"4-7", "o Wo"},
+		{"1-", "ello World"},
 		{"6-", "World"},
+		{"-2", "ld"},
 		{"-7", "o World"},
 	}
-	for _, t := range table {
+	for _, t := range testCases {
 		request, err = newTestRequest("GET", s.testServer.Server.URL+"/partial-content/bar", 0, nil, s.testServer.AccessKey, s.testServer.SecretKey)
 		c.Assert(err, IsNil)
 
diff --git a/xl-v1-object.go b/xl-v1-object.go
index aea58fd20..464c4069f 100644
--- a/xl-v1-object.go
+++ b/xl-v1-object.go
@@ -91,22 +91,29 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
 	totalBytesRead := int64(0)
 	// Read from all parts.
 	for ; partIndex <= lastPartIndex; partIndex++ {
+		if length == totalBytesRead {
+			break
+		}
 		// Save the current part name and size.
 		partName := xlMeta.Parts[partIndex].Name
 		partSize := xlMeta.Parts[partIndex].Size
-		if partSize > (length - totalBytesRead) {
-			partSize = length - totalBytesRead
+
+		readSize := partSize - partOffset
+		// readSize should be adjusted so that we don't write more data than what was requested.
+		if readSize > (length - totalBytesRead) {
+			readSize = length - totalBytesRead
 		}
 
 		// Start reading the part name.
-		n, err := erasureReadFile(writer, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, partSize)
+		n, err := erasureReadFile(writer, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize)
 		if err != nil {
 			return err
 		}
 		totalBytesRead += n
 
-		// Reset part offset to 0 to read rest of the part from the beginning.
+		// partOffset will be valid only for the first part, hence reset it to 0 for
+		// the remaining parts.
 		partOffset = 0
 	} // End of read all parts loop.
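
Note (not part of the patch): a minimal standalone sketch of how the block arithmetic introduced above plays out for a ranged read. The two helper bodies mirror getEncodedBlockLen and getBlockInfo from the patch; the block size, data-block count, part length, and offset are illustrative values only.

package main

import "fmt"

// getEncodedBlockLen mirrors the helper added in the patch: ceil(inputLen/dataBlocks),
// so dataBlocks chunks of this size always cover inputLen bytes (the tail is 0-padded).
func getEncodedBlockLen(inputLen int64, dataBlocks int) int64 {
	return (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks)
}

// getBlockInfo mirrors the patched helper: start block for the offset, last block of the
// whole part, and how many bytes to skip inside the start block.
func getBlockInfo(offset, totalLength, blockSize int64) (startBlock, endBlock, bytesToSkip int64) {
	return offset / blockSize, totalLength / blockSize, offset % blockSize
}

func main() {
	// Illustrative values only: 10 MiB erasure block size, 6 data blocks,
	// a 25 MiB part, and a ranged read starting 12 MiB into the part.
	const blockSize int64 = 10 * 1024 * 1024
	const dataBlocks = 6
	const totalLength int64 = 25 * 1024 * 1024
	const offset int64 = 12 * 1024 * 1024

	chunkSize := getEncodedBlockLen(blockSize, dataBlocks)
	startBlock, endBlock, bytesToSkip := getBlockInfo(offset, totalLength, blockSize)

	fmt.Println("per-disk chunk size:", chunkSize)            // ceil(10 MiB / 6)
	fmt.Println("start block:", startBlock)                   // 1
	fmt.Println("end block:", endBlock)                       // 2 (the short, final block)
	fmt.Println("bytes to skip in start block:", bytesToSkip) // 2 MiB

	// The last block is shorter than blockSize, so its per-disk chunk shrinks too,
	// which is what curChunkSize accounts for in erasureReadFile.
	lastBlockSize := totalLength % blockSize // 5 MiB
	fmt.Println("last block chunk size:", getEncodedBlockLen(lastBlockSize, dataBlocks))
}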