erasure-readfile: Use chunk size to read from each disk for a block. (#1949)

A block of data is split into data chunk and each data chunk is
written to each disk.  Previously block size was used to read data
chunk which returns corrupted data.

This patch fixes the issue by reading chunk sized data from each disk
and assembles a block.

Fixes #1939
master
Bala FA 8 years ago committed by Harshavardhana
parent 393c504de0
commit 7d757033f2
  1. 13
      erasure-readfile.go

@ -43,6 +43,9 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
// Get block info for given offset, length and block size. // Get block info for given offset, length and block size.
startBlock, bytesToSkip, endBlock := getBlockInfo(offset, length, eInfo.BlockSize) startBlock, bytesToSkip, endBlock := getBlockInfo(offset, length, eInfo.BlockSize)
// Data chunk size on each block.
chunkSize := eInfo.BlockSize / int64(eInfo.DataBlocks)
for block := startBlock; block <= endBlock; block++ { for block := startBlock; block <= endBlock; block++ {
// Allocate encoded blocks up to storage disks. // Allocate encoded blocks up to storage disks.
enBlocks := make([][]byte, len(disks)) enBlocks := make([][]byte, len(disks))
@ -52,7 +55,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
var noReconstruct bool // Set for no reconstruction. var noReconstruct bool // Set for no reconstruction.
// Keep how many bytes are read for this block. // Keep how many bytes are read for this block.
// In most cases, last block in the file is shorter than eInfo.BlockSize. // In most cases, last block in the file is shorter than chunkSize
lastReadSize := int64(0) lastReadSize := int64(0)
// Read from all the disks. // Read from all the disks.
@ -66,14 +69,14 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
} }
// Initialize chunk slice and fill the data from each parts. // Initialize chunk slice and fill the data from each parts.
enBlocks[blockIndex] = make([]byte, eInfo.BlockSize) enBlocks[blockIndex] = make([]byte, chunkSize)
// Read the necessary blocks. // Read the necessary blocks.
n, err := disk.ReadFile(volume, path, block*eInfo.BlockSize, enBlocks[blockIndex]) n, err := disk.ReadFile(volume, path, block*chunkSize, enBlocks[blockIndex])
if err != nil { if err != nil {
enBlocks[blockIndex] = nil enBlocks[blockIndex] = nil
} else if n < eInfo.BlockSize { } else if n < chunkSize {
// As the data we got is smaller than eInfo.BlockSize, keep only required chunk slice // As the data we got is smaller than chunk size, keep only required chunk slice
enBlocks[blockIndex] = append([]byte{}, enBlocks[blockIndex][:n]...) enBlocks[blockIndex] = append([]byte{}, enBlocks[blockIndex][:n]...)
} }

Loading…
Cancel
Save