diff --git a/cmd/erasure-decode.go b/cmd/erasure-decode.go
index 1e6afbee5..2468ab661 100644
--- a/cmd/erasure-decode.go
+++ b/cmd/erasure-decode.go
@@ -31,22 +31,59 @@ var errHealRequired = errors.New("heal required")
 // Reads in parallel from readers.
 type parallelReader struct {
 	readers       []io.ReaderAt
+	orgReaders    []io.ReaderAt
 	dataBlocks    int
 	offset        int64
 	shardSize     int64
 	shardFileSize int64
 	buf           [][]byte
+	readerToBuf   []int
 }
 
 // newParallelReader returns parallelReader.
 func newParallelReader(readers []io.ReaderAt, e Erasure, offset, totalLength int64) *parallelReader {
+	r2b := make([]int, len(readers))
+	for i := range r2b {
+		r2b[i] = i
+	}
 	return &parallelReader{
-		readers,
-		e.dataBlocks,
-		(offset / e.blockSize) * e.ShardSize(),
-		e.ShardSize(),
-		e.ShardFileSize(totalLength),
-		make([][]byte, len(readers)),
+		readers:       readers,
+		orgReaders:    readers,
+		dataBlocks:    e.dataBlocks,
+		offset:        (offset / e.blockSize) * e.ShardSize(),
+		shardSize:     e.ShardSize(),
+		shardFileSize: e.ShardFileSize(totalLength),
+		buf:           make([][]byte, len(readers)),
+		readerToBuf:   r2b,
+	}
+}
+
+// preferReaders can mark readers as preferred.
+// These will be chosen before others.
+func (p *parallelReader) preferReaders(prefer []bool) {
+	if len(prefer) != len(p.orgReaders) {
+		return
+	}
+	// Copy so we don't change our input.
+	tmp := make([]io.ReaderAt, len(p.orgReaders))
+	copy(tmp, p.orgReaders)
+	p.readers = tmp
+	// next is the next non-preferred index.
+	next := 0
+	for i, ok := range prefer {
+		if !ok || p.readers[i] == nil {
+			continue
+		}
+		if i == next {
+			next++
+			continue
+		}
+		// Move reader with index i to index next.
+		// Do this by swapping next and i
+		p.readers[next], p.readers[i] = p.readers[i], p.readers[next]
+		p.readerToBuf[next] = i
+		p.readerToBuf[i] = next
+		next++
 	}
 }
 
@@ -54,7 +91,7 @@ func newParallelReader(readers []io.ReaderAt, e Erasure, offset, totalLength int
 func (p *parallelReader) canDecode(buf [][]byte) bool {
 	bufCount := 0
 	for _, b := range buf {
-		if b != nil {
+		if len(b) > 0 {
 			bufCount++
 		}
 	}
@@ -62,13 +99,23 @@ func (p *parallelReader) canDecode(buf [][]byte) bool {
 }
 
 // Read reads from readers in parallel. Returns p.dataBlocks number of bufs.
-func (p *parallelReader) Read() ([][]byte, error) {
-	newBuf := make([][]byte, len(p.readers))
+func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
+	newBuf := dst
+	if len(dst) != len(p.readers) {
+		newBuf = make([][]byte, len(p.readers))
+	} else {
+		for i := range newBuf {
+			newBuf[i] = newBuf[i][:0]
+		}
+	}
 	var newBufLK sync.RWMutex
 
 	if p.offset+p.shardSize > p.shardFileSize {
 		p.shardSize = p.shardFileSize - p.offset
 	}
+	if p.shardSize == 0 {
+		return newBuf, nil
+	}
 
 	readTriggerCh := make(chan bool, len(p.readers))
 	for i := 0; i < p.dataBlocks; i++ {
@@ -104,26 +151,30 @@ func (p *parallelReader) Read() ([][]byte, error) {
 				readTriggerCh <- true
 				return
 			}
-			if p.buf[i] == nil {
+			bufIdx := p.readerToBuf[i]
+			if p.buf[bufIdx] == nil {
 				// Reading first time on this disk, hence the buffer needs to be allocated.
 				// Subsequent reads will re-use this buffer.
-				p.buf[i] = make([]byte, p.shardSize)
+				p.buf[bufIdx] = make([]byte, p.shardSize)
 			}
 			// For the last shard, the shardsize might be less than previous shard sizes.
 			// Hence the following statement ensures that the buffer size is reset to the right size.
-			p.buf[i] = p.buf[i][:p.shardSize]
-			_, err := disk.ReadAt(p.buf[i], p.offset)
+			p.buf[bufIdx] = p.buf[bufIdx][:p.shardSize]
+			_, err := disk.ReadAt(p.buf[bufIdx], p.offset)
 			if err != nil {
 				if _, ok := err.(*errHashMismatch); ok {
 					atomic.StoreInt32(&healRequired, 1)
 				}
+
+				// This will be communicated upstream.
+				p.orgReaders[bufIdx] = nil
 				p.readers[i] = nil
 				// Since ReadAt returned error, trigger another read.
 				readTriggerCh <- true
 				return
 			}
 			newBufLK.Lock()
-			newBuf[i] = p.buf[i]
+			newBuf[bufIdx] = p.buf[bufIdx]
 			newBufLK.Unlock()
 			// Since ReadAt returned success, there is no need to trigger another read.
 			readTriggerCh <- false
@@ -156,8 +207,9 @@ func (err *errDecodeHealRequired) Unwrap() error {
 }
 
 // Decode reads from readers, reconstructs data if needed and writes the data to the writer.
-func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64) error {
-	healRequired, err := e.decode(ctx, writer, readers, offset, length, totalLength)
+// A set of preferred drives can be supplied. In that case they will be used and the data reconstructed.
+func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64, prefer []bool) error {
+	healRequired, err := e.decode(ctx, writer, readers, offset, length, totalLength, prefer)
 	if healRequired {
 		return &errDecodeHealRequired{err}
 	}
@@ -166,7 +218,7 @@ func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.Read
 }
 
 // Decode reads from readers, reconstructs data if needed and writes the data to the writer.
-func (e Erasure) decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64) (bool, error) {
+func (e Erasure) decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64, prefer []bool) (bool, error) {
 	if offset < 0 || length < 0 {
 		logger.LogIf(ctx, errInvalidArgument)
 		return false, errInvalidArgument
@@ -180,12 +232,16 @@ func (e Erasure) decode(ctx context.Context, writer io.Writer, readers []io.Read
 	}
 
 	reader := newParallelReader(readers, e, offset, totalLength)
+	if len(prefer) == len(readers) {
+		reader.preferReaders(prefer)
+	}
 
 	startBlock := offset / e.blockSize
 	endBlock := (offset + length) / e.blockSize
 
 	var healRequired bool
 	var bytesWritten int64
+	var bufs [][]byte
 	for block := startBlock; block <= endBlock; block++ {
 		var blockOffset, blockLength int64
 		switch {
@@ -205,9 +261,11 @@ func (e Erasure) decode(ctx context.Context, writer io.Writer, readers []io.Read
 		if blockLength == 0 {
 			break
 		}
-		bufs, err := reader.Read()
+		var err error
+		bufs, err = reader.Read(bufs)
 		if err != nil {
 			if errors.Is(err, errHealRequired) {
+				// errHealRequired is only returned if there is enough data for reconstruction.
 				healRequired = true
 			} else {
 				return healRequired, err
diff --git a/cmd/erasure-decode_test.go b/cmd/erasure-decode_test.go
index 9439147b2..f168b3cb1 100644
--- a/cmd/erasure-decode_test.go
+++ b/cmd/erasure-decode_test.go
@@ -138,7 +138,7 @@ func TestErasureDecode(t *testing.T) {
 		}
 
 		writer := bytes.NewBuffer(nil)
-		err = erasure.Decode(context.Background(), writer, bitrotReaders, test.offset, test.length, test.data)
+		err = erasure.Decode(context.Background(), writer, bitrotReaders, test.offset, test.length, test.data, nil)
 		closeBitrotReaders(bitrotReaders)
 		if err != nil && !test.shouldFail {
 			t.Errorf("Test %d: should pass but failed with: %v", i, err)
@@ -181,7 +181,7 @@ func TestErasureDecode(t *testing.T) {
 			bitrotReaders[0] = nil
 		}
 		writer.Reset()
-		err = erasure.Decode(context.Background(), writer, bitrotReaders, test.offset, test.length, test.data)
+		err = erasure.Decode(context.Background(), writer, bitrotReaders, test.offset, test.length, test.data, nil)
 		closeBitrotReaders(bitrotReaders)
 		if err != nil && !test.shouldFailQuorum {
 			t.Errorf("Test %d: should pass but failed with: %v", i, err)
@@ -191,7 +191,7 @@ func TestErasureDecode(t *testing.T) {
 		}
 		if !test.shouldFailQuorum {
 			if content := writer.Bytes(); !bytes.Equal(content, data[test.offset:test.offset+test.length]) {
-				t.Errorf("Test %d: read retruns wrong file content", i)
+				t.Errorf("Test %d: read returns wrong file content", i)
 			}
 		}
 	}
@@ -271,7 +271,7 @@ func TestErasureDecodeRandomOffsetLength(t *testing.T) {
 			tillOffset := erasure.ShardFileTillOffset(offset, readLen, length)
 			bitrotReaders[index] = newStreamingBitrotReader(disk, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
 		}
-		err = erasure.Decode(context.Background(), buf, bitrotReaders, offset, readLen, length)
+		err = erasure.Decode(context.Background(), buf, bitrotReaders, offset, readLen, length, nil)
 		closeBitrotReaders(bitrotReaders)
 		if err != nil {
 			t.Fatal(err, offset, readLen)
@@ -333,7 +333,7 @@ func benchmarkErasureDecode(data, parity, dataDown, parityDown int, size int64,
 			tillOffset := erasure.ShardFileTillOffset(0, size, size)
 			bitrotReaders[index] = newStreamingBitrotReader(disk, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
 		}
-		if err = erasure.Decode(context.Background(), bytes.NewBuffer(content[:0]), bitrotReaders, 0, size, size); err != nil {
+		if err = erasure.Decode(context.Background(), bytes.NewBuffer(content[:0]), bitrotReaders, 0, size, size, nil); err != nil {
 			panic(err)
 		}
 		closeBitrotReaders(bitrotReaders)
diff --git a/cmd/erasure-heal.go b/cmd/erasure-heal.go
index d6a9c6d54..947416abf 100644
--- a/cmd/erasure-heal.go
+++ b/cmd/erasure-heal.go
@@ -28,7 +28,7 @@ import (
 func (e Erasure) Heal(ctx context.Context, readers []io.ReaderAt, writers []io.Writer, size int64) error {
 	r, w := io.Pipe()
 	go func() {
-		if err := e.Decode(ctx, w, readers, 0, size, size); err != nil {
+		if err := e.Decode(ctx, w, readers, 0, size, size, nil); err != nil {
 			w.CloseWithError(err)
 			return
 		}
diff --git a/cmd/erasure.go b/cmd/erasure.go
index dd235b7de..4a6f31a6a 100644
--- a/cmd/erasure.go
+++ b/cmd/erasure.go
@@ -87,14 +87,15 @@ func (e *Erasure) EncodeData(ctx context.Context, data []byte) ([][]byte, error)
 // It only decodes the data blocks but does not verify them.
 // It returns an error if the decoding failed.
 func (e *Erasure) DecodeDataBlocks(data [][]byte) error {
-	needsReconstruction := false
-	for _, b := range data[:e.dataBlocks] {
-		if b == nil {
-			needsReconstruction = true
+	var isZero = 0
+	for _, b := range data[:] {
+		if len(b) == 0 {
+			isZero++
 			break
 		}
 	}
-	if !needsReconstruction {
+	if isZero == 0 || isZero == len(data) {
+		// If all are zero, payload is 0 bytes.
 		return nil
 	}
 	return e.encoder().ReconstructData(data)
diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go
index 4adb7754a..14ec3b356 100644
--- a/cmd/xl-v1-object.go
+++ b/cmd/xl-v1-object.go
@@ -265,6 +265,7 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO
 		return toObjectErr(err, bucket, object)
 	}
 
+	var healOnce sync.Once
 	for ; partIndex <= lastPartIndex; partIndex++ {
 		if length == totalBytesRead {
 			break
@@ -284,6 +285,7 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO
 		tillOffset := erasure.ShardFileTillOffset(partOffset, partLength, partSize)
 		// Get the checksums of the current part.
 		readers := make([]io.ReaderAt, len(onlineDisks))
+		prefer := make([]bool, len(onlineDisks))
 		for index, disk := range onlineDisks {
 			if disk == OfflineDisk {
 				continue
@@ -292,14 +294,19 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO
 			partPath := pathJoin(object, fmt.Sprintf("part.%d", partNumber))
 			readers[index] = newBitrotReader(disk, bucket, partPath, tillOffset, checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())
+
+			// Prefer local disks
+			prefer[index] = disk.Hostname() == ""
 		}
-		err := erasure.Decode(ctx, writer, readers, partOffset, partLength, partSize)
+		err := erasure.Decode(ctx, writer, readers, partOffset, partLength, partSize, prefer)
 		// Note: we should not be defer'ing the following closeBitrotReaders() call as we are inside a for loop i.e if we use defer, we would accumulate a lot of open files by the time
 		// we return from this function.
 		closeBitrotReaders(readers)
 		if err != nil {
 			if decodeHealErr, ok := err.(*errDecodeHealRequired); ok {
-				go deepHealObject(pathJoin(bucket, object))
+				healOnce.Do(func() {
+					go deepHealObject(pathJoin(bucket, object))
+				})
 				err = decodeHealErr.err
 			}
 			if err != nil {
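
As a reading aid, here is a minimal, self-contained Go sketch (not part of the patch; the reader names and the prefer mask are made up) of the reordering that the new preferReaders method performs: a preferred reader is moved to the front of the read order, while readerToBuf remembers which buffer slot that reader fills, so reconstructed shards still land in their original positions.

	package main

	import "fmt"

	func main() {
		// Hypothetical setup: four shard readers, one of them on a local disk.
		readers := []string{"remote-0", "remote-1", "local-2", "remote-3"}
		prefer := []bool{false, false, true, false}

		// readerToBuf starts as the identity mapping, as in newParallelReader.
		readerToBuf := make([]int, len(readers))
		for i := range readerToBuf {
			readerToBuf[i] = i
		}

		// Same reordering as preferReaders: preferred entries bubble to the front,
		// and readerToBuf records which buffer slot each reader position writes into.
		next := 0
		for i, ok := range prefer {
			if !ok {
				continue
			}
			if i == next {
				next++
				continue
			}
			readers[next], readers[i] = readers[i], readers[next]
			readerToBuf[next] = i
			readerToBuf[i] = next
			next++
		}

		fmt.Println("read order:  ", readers)     // [local-2 remote-1 remote-0 remote-3]
		fmt.Println("buffer slots:", readerToBuf) // [2 1 0 3]: position 0 still fills shard slot 2
	}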