xl: Fix how we deal with read offsets at erasure layer. (#1511)

Requires skipping necessary parts of dataBlocks during
decoding phase and requires us to properly skip the
entries as needed.

Thanks to Karthic for reproducing this important issue.

Fixes #1503
master
Harshavardhana 9 years ago
parent c06b9abc15
commit 0b74f5624e
  1. 25
      xl-erasure-v1-readfile.go
  2. 10
      xl-erasure-v1-utils.go

@ -26,7 +26,7 @@ import (
)
// ReadFile - read file
func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) {
func (xl XL) ReadFile(volume, path string, startOffset int64) (io.ReadCloser, error) {
// Input validation.
if !isValidVolname(volume) {
return nil, errInvalidArgument
@ -72,6 +72,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
// If disk.ReadFile returns error and we don't have read quorum it will be taken care as
// ReedSolomon.Reconstruct() will fail later.
var reader io.ReadCloser
offset := int64(0)
if reader, err = disk.ReadFile(volume, erasurePart, offset); err == nil {
readers[index] = reader
}
@ -168,8 +169,23 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
}
}
// Join the decoded blocks.
err = xl.ReedSolomon.Join(pipeWriter, enBlocks, int(curBlockSize))
// Get all the data blocks.
dataBlocks := getDataBlocks(enBlocks, metadata.Erasure.DataBlocks, int(curBlockSize))
// Verify if the offset is right for the block, if not move to
// the next block.
if startOffset > 0 {
startOffset = startOffset - int64(len(dataBlocks))
if startOffset > int64(len(dataBlocks)) {
continue
}
// Fetch back the overflow offset, to skip from the current data
// blocks.
startOffset = startOffset + int64(len(dataBlocks))
}
// Write safely the necessary blocks.
_, err = pipeWriter.Write(dataBlocks[int(startOffset):])
if err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
@ -179,6 +195,9 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
return
}
// Reset offset to '0' to read rest of the blocks.
startOffset = int64(0)
// Save what's left after reading erasureBlockSize.
totalLeft = totalLeft - metadata.Erasure.BlockSize
}

@ -16,6 +16,16 @@
package main
// getDataBlocks - fetches the data block only part of the input encoded blocks.
func getDataBlocks(enBlocks [][]byte, dataBlocks int, curBlockSize int) []byte {
var data []byte
for _, block := range enBlocks[:dataBlocks] {
data = append(data, block...)
}
data = data[:curBlockSize]
return data
}
// checkBlockSize return the size of a single block.
// The first non-zero size is returned,
// or 0 if all blocks are size 0.

Loading…
Cancel
Save