diff --git a/erasure-readfile.go b/erasure-readfile.go
index 7a0bc58a7..71b4eb1a9 100644
--- a/erasure-readfile.go
+++ b/erasure-readfile.go
@@ -21,6 +21,7 @@ import (
 	"encoding/hex"
 	"errors"
 	"io"
+	"sync"
 
 	"github.com/klauspost/reedsolomon"
 )
@@ -30,82 +31,113 @@ import (
 // are decoded into a data block. Data block is trimmed for given offset and length,
 // then written to given writer. This function also supports bit-rot detection by
 // verifying checksum of individual block's checksum.
-func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, partName string, eInfos []erasureInfo, offset int64, length int64) (int64, error) {
+func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, partName string, eInfos []erasureInfo, offset int64, length int64, totalLength int64) (int64, error) {
+	min := func(a int64, b int64) int {
+		if a < b {
+			return int(a)
+		}
+		return int(b)
+	}
 	// Total bytes written to writer
 	bytesWritten := int64(0)
 
 	// Gather previously calculated block checksums.
-	blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName)
+	// blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName)
 
 	// Pick one erasure info.
 	eInfo := pickValidErasureInfo(eInfos)
 
+	// Data chunk size on each block.
+	chunkSize := getEncodedBlockLen(eInfo.BlockSize, eInfo.DataBlocks)
+
 	// Get block info for given offset, length and block size.
-	startBlock, bytesToSkip, endBlock := getBlockInfo(offset, length, eInfo.BlockSize)
+	startBlock, bytesToSkip := getBlockInfo(offset, eInfo.BlockSize)
 
-	// Data chunk size on each block.
-	chunkSize := eInfo.BlockSize / int64(eInfo.DataBlocks)
+	orderedDisks := make([]StorageAPI, len(disks))
+	for index := range disks {
+		blockIndex := eInfo.Distribution[index]
+		orderedDisks[blockIndex-1] = disks[index]
+	}
+
+	for block := startBlock; bytesWritten < length; block++ {
+		curChunkSize := chunkSize
+		if totalLength-offset+bytesWritten < curChunkSize {
+			curChunkSize = getEncodedBlockLen(totalLength-offset+bytesWritten, eInfo.DataBlocks)
+		}
 
-	for block := startBlock; block <= endBlock; block++ {
 		// Allocate encoded blocks up to storage disks.
 		enBlocks := make([][]byte, len(disks))
 
-		// Counter to keep success data blocks.
-		var successDataBlocksCount = 0
-		var noReconstruct bool // Set for no reconstruction.
+		// Figure out the number of disks that are needed for the read.
+		// If all the data disks are available then dataDiskCount = eInfo.DataBlocks
+		// Else dataDiskCount = eInfo.DataBlocks + 1
 
-		// Keep how many bytes are read for this block.
-		// In most cases, last block in the file is shorter than chunkSize
-		lastReadSize := int64(0)
-
-		// Read from all the disks.
-		for index, disk := range disks {
-			blockIndex := eInfo.Distribution[index] - 1
-			if !isValidBlock(disks, volume, path, toDiskIndex(blockIndex, eInfo.Distribution), blockCheckSums) {
-				continue
-			}
+		diskCount := 0
+		for _, disk := range orderedDisks[:eInfo.DataBlocks] {
 			if disk == nil {
 				continue
 			}
+			diskCount++
+		}
 
-			// Initialize chunk slice and fill the data from each parts.
-			enBlocks[blockIndex] = make([]byte, chunkSize)
+		if diskCount < eInfo.DataBlocks {
+			diskCount = eInfo.DataBlocks + 1
+		}
 
-			// Read the necessary blocks.
-			n, err := disk.ReadFile(volume, path, block*chunkSize, enBlocks[blockIndex])
-			if err != nil {
-				enBlocks[blockIndex] = nil
-			} else if n < chunkSize {
-				// As the data we got is smaller than chunk size, keep only required chunk slice
-				enBlocks[blockIndex] = append([]byte{}, enBlocks[blockIndex][:n]...)
+		wg := &sync.WaitGroup{}
+		index := 0
+		for _, disk := range orderedDisks {
+			if disk == nil {
+				index++
+				continue
 			}
-
-			// Remember bytes read at first time.
-			if lastReadSize == 0 {
-				lastReadSize = n
+			wg.Add(1)
+			go func(index int, disk StorageAPI) {
+				defer wg.Done()
+				buf := make([]byte, curChunkSize)
+				n, err := disk.ReadFile(volume, path, block*curChunkSize, buf)
+				if err != nil {
+					orderedDisks[index] = nil
+					return
+				}
+				enBlocks[index] = buf[:n]
+			}(index, disk)
+			index++
+			diskCount--
+			if diskCount == 0 {
+				break
 			}
+		}
+		wg.Wait()
 
-			// If bytes read is not equal to bytes read lastly, treat it as corrupted chunk.
-			if n != lastReadSize {
-				return bytesWritten, errXLDataCorrupt
+		// Counter to keep success data blocks.
+		var successDataBlocksCount = 0
+		var successParityBlocksCount = 0
+		for bufidx, buf := range enBlocks {
+			if buf == nil {
+				continue
 			}
-
-			// Verify if we have successfully read all the data blocks.
-			if blockIndex < eInfo.DataBlocks && enBlocks[blockIndex] != nil {
+			if bufidx < eInfo.DataBlocks {
 				successDataBlocksCount++
-				// Set when we have all the data blocks and no
-				// reconstruction is needed, so that we can avoid
-				// erasure reconstruction.
-				noReconstruct = successDataBlocksCount == eInfo.DataBlocks
-				if noReconstruct {
-					// Break out we have read all the data blocks.
-					break
-				}
+				continue
 			}
+			successParityBlocksCount++
 		}
 
-		// Verify if reconstruction is needed, proceed with reconstruction.
-		if !noReconstruct {
+		if successDataBlocksCount < eInfo.DataBlocks {
+			for ; index < len(orderedDisks); index++ {
+				if (successDataBlocksCount + successParityBlocksCount) == (eInfo.DataBlocks + 1) {
+					break
+				}
+				buf := make([]byte, curChunkSize)
+				n, err := orderedDisks[index].ReadFile(volume, path, block*curChunkSize, buf)
+				if err != nil {
+					orderedDisks[index] = nil
+					continue
+				}
+				successParityBlocksCount++
+				enBlocks[index] = buf[:n]
+			}
 			err := decodeData(enBlocks, eInfo.DataBlocks, eInfo.ParityBlocks)
 			if err != nil {
 				return bytesWritten, err
@@ -113,7 +145,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 		}
 
 		// Get data blocks from encoded blocks.
-		dataBlocks, err := getDataBlocks(enBlocks, eInfo.DataBlocks, int(lastReadSize)*eInfo.DataBlocks)
+		dataBlocks, err := getDataBlocks(enBlocks, eInfo.DataBlocks, min(eInfo.BlockSize, totalLength-offset+bytesWritten))
		if err != nil {
 			return bytesWritten, err
 		}
@@ -123,12 +155,11 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 
 		// If this is start block, skip unwanted bytes.
 		if block == startBlock {
-			buf = append([]byte{}, dataBlocks[bytesToSkip:]...)
+			buf = buf[bytesToSkip:]
 		}
 
-		// If this is end block, retain only required bytes.
-		if block == endBlock {
-			buf = append([]byte{}, buf[:length-bytesWritten]...)
+		if len(buf) > int(length-bytesWritten) {
+			buf = buf[:length-bytesWritten]
 		}
 
 		// Copy data blocks.
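Note on the read path above: erasureReadFile now arranges the disks in distribution order, reads only as many chunks as decoding needs (eInfo.DataBlocks when all data disks are up, one extra otherwise) in parallel goroutines, and then falls back to sequential reads from the remaining disks until DataBlocks+1 blocks are available for reconstruction. A failed read simply leaves its entry in enBlocks nil, which the decoder treats as a missing shard. The following is a minimal runnable sketch of that fan-out pattern, not the actual implementation; readChunk is a hypothetical stand-in for StorageAPI.ReadFile and the shard counts are made up:

    package main

    import (
    	"errors"
    	"fmt"
    	"sync"
    )

    const chunkSize = 4

    // readChunk simulates reading one encoded chunk from disk i; disk 2 fails.
    func readChunk(i int, buf []byte) (int64, error) {
    	if i == 2 {
    		return 0, errors.New("disk offline")
    	}
    	for j := range buf {
    		buf[j] = byte(i)
    	}
    	return int64(len(buf)), nil
    }

    func main() {
    	enBlocks := make([][]byte, 6) // e.g. 4 data + 2 parity shards

    	wg := &sync.WaitGroup{}
    	for index := range enBlocks {
    		wg.Add(1)
    		go func(index int) {
    			defer wg.Done()
    			buf := make([]byte, chunkSize)
    			n, err := readChunk(index, buf)
    			if err != nil {
    				// Leave enBlocks[index] nil: a nil shard counts as an
    				// erasure and gets reconstructed by the decoder.
    				return
    			}
    			enBlocks[index] = buf[:n]
    		}(index)
    	}
    	wg.Wait()

    	fmt.Println(enBlocks) // shard 2 is nil and would be reconstructed
    }

The important property is that wg.Wait() makes each per-block read a rendezvous point: decoding never starts while any launched read is still in flight.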
diff --git a/erasure-utils.go b/erasure-utils.go
index c4dcf8a5e..9316e739a 100644
--- a/erasure-utils.go
+++ b/erasure-utils.go
@@ -89,16 +89,16 @@ func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) (data []
 }
 
 // getBlockInfo - find start/end block and bytes to skip for given offset, length and block size.
-func getBlockInfo(offset, length, blockSize int64) (startBlock, bytesToSkip, endBlock int64) {
+func getBlockInfo(offset, blockSize int64) (startBlock, bytesToSkip int64) {
 	// Calculate start block for given offset and how many bytes to skip to get the offset.
 	startBlock = offset / blockSize
 	bytesToSkip = offset % blockSize
-
-	// Calculate end block for given size to read
-	endBlock = (offset + length) / blockSize
-	if endBlock > 0 && (offset+length)%blockSize == 0 {
-		endBlock--
-	}
 	return
 }
+
+// calculate the blockSize based on input length and total number of
+// data blocks.
+func getEncodedBlockLen(inputLen int64, dataBlocks int) (curEncBlockSize int64) {
+	curEncBlockSize = (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks)
+	return curEncBlockSize
+}
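The two helpers above carry the arithmetic of the new read path. getEncodedBlockLen is ceiling division: each of the dataBlocks shards must hold ceil(inputLen/dataBlocks) bytes so that the shards together cover inputLen bytes. For example, 10 bytes across 4 data shards gives (10+4-1)/4 = 3 bytes per shard, covering 12 bytes with 2 bytes of padding in the last shard. getBlockInfo shrinks to a plain div/mod pair because the end block now falls out of the bytesWritten < length loop condition in erasureReadFile. A small self-contained check, with the function bodies copied from the patch and an illustrative main:

    package main

    import "fmt"

    func getEncodedBlockLen(inputLen int64, dataBlocks int) (curEncBlockSize int64) {
    	curEncBlockSize = (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks)
    	return curEncBlockSize
    }

    func getBlockInfo(offset, blockSize int64) (startBlock, bytesToSkip int64) {
    	startBlock = offset / blockSize
    	bytesToSkip = offset % blockSize
    	return
    }

    func main() {
    	// 10 bytes across 4 data shards -> 3 bytes per shard.
    	fmt.Println(getEncodedBlockLen(10, 4)) // 3

    	// Reading at offset 10MiB+5 with a 10MiB erasure block starts at
    	// block 1 and skips 5 bytes inside the decoded block.
    	fmt.Println(getBlockInfo(10*1024*1024+5, 10*1024*1024)) // 1 5
    }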
diff --git a/xl-v1-object.go b/xl-v1-object.go
index aea58fd20..0237173c3 100644
--- a/xl-v1-object.go
+++ b/xl-v1-object.go
@@ -91,15 +91,19 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
 	totalBytesRead := int64(0)
 	// Read from all parts.
 	for ; partIndex <= lastPartIndex; partIndex++ {
+		if length == totalBytesRead {
+			break
+		}
 		// Save the current part name and size.
 		partName := xlMeta.Parts[partIndex].Name
 		partSize := xlMeta.Parts[partIndex].Size
-		if partSize > (length - totalBytesRead) {
-			partSize = length - totalBytesRead
+		readSize := partSize - partOffset
+		if readSize > (length - totalBytesRead) {
+			readSize = length - totalBytesRead
 		}
 
 		// Start reading the part name.
-		n, err := erasureReadFile(writer, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, partSize)
+		n, err := erasureReadFile(writer, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize)
 		if err != nil {
 			return err
 		}
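Finally, GetObject previously clobbered partSize with the remaining request length and ignored partOffset when sizing the read; the patch computes readSize = min(partSize - partOffset, length - totalBytesRead) and passes the untouched partSize as totalLength so that erasureReadFile can size the short final chunk of a part. A worked example of the clamping with hypothetical sizes (a 5 MiB part, a read starting 1 MiB into it, 3 MiB requested overall):

    package main

    import "fmt"

    func main() {
    	const mib = int64(1 << 20)
    	partSize := 5 * mib
    	partOffset := 1 * mib // offset of the range inside this part
    	length := 3 * mib     // bytes requested overall
    	totalBytesRead := int64(0)

    	readSize := partSize - partOffset // bytes available in this part: 4 MiB
    	if readSize > length-totalBytesRead {
    		readSize = length - totalBytesRead // clamp to what is still wanted
    	}
    	fmt.Println(readSize == 3*mib) // true: read 3 MiB from this part
    }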