Simplify parallelReader.Read() (#7109)

Simplify parallelReader.Read(). This also fixes the previous
implementation, which could return before all of the parallel
reading go-routines had terminated, causing race conditions.
master
Krishna Srinivas 6 years ago committed by Nitish Tiwari
parent 6dd8a83c5a
commit 730ac5381c
  1. 107
      cmd/erasure-decode.go

@ -19,6 +19,7 @@ package cmd
import ( import (
"context" "context"
"io" "io"
"sync"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
) )
@ -58,74 +59,74 @@ func (p *parallelReader) canDecode(buf [][]byte) bool {
// Read reads from readers in parallel. Returns p.dataBlocks number of bufs. // Read reads from readers in parallel. Returns p.dataBlocks number of bufs.
func (p *parallelReader) Read() ([][]byte, error) { func (p *parallelReader) Read() ([][]byte, error) {
type errIdx struct {
idx int
buf []byte
err error
}
errCh := make(chan errIdx)
currReaderIndex := 0
newBuf := make([][]byte, len(p.readers)) newBuf := make([][]byte, len(p.readers))
var newBufLK sync.RWMutex
if p.offset+p.shardSize > p.shardFileSize { if p.offset+p.shardSize > p.shardFileSize {
p.shardSize = p.shardFileSize - p.offset p.shardSize = p.shardFileSize - p.offset
} }
read := func(currReaderIndex int) { readTriggerCh := make(chan bool, len(p.readers))
if p.buf[currReaderIndex] == nil { for i := 0; i < p.dataBlocks; i++ {
p.buf[currReaderIndex] = make([]byte, p.shardSize) // Setup read triggers for p.dataBlocks number of reads so that it reads in parallel.
} readTriggerCh <- true
p.buf[currReaderIndex] = p.buf[currReaderIndex][:p.shardSize]
_, err := p.readers[currReaderIndex].ReadAt(p.buf[currReaderIndex], p.offset)
errCh <- errIdx{currReaderIndex, p.buf[currReaderIndex], err}
} }
readerCount := 0 readerIndex := 0
for _, r := range p.readers { var wg sync.WaitGroup
if r != nil { // if readTrigger is true, it implies next disk.ReadAt() should be tried
readerCount++ // if readTrigger is false, it implies previous disk.ReadAt() was successful and there is no need
} // to try reading the next disk.
} for readTrigger := range readTriggerCh {
if readerCount < p.dataBlocks { newBufLK.RLock()
return nil, errXLReadQuorum canDecode := p.canDecode(newBuf)
} newBufLK.RUnlock()
if canDecode {
readerCount = 0 break
for i, r := range p.readers {
if r == nil {
continue
} }
go read(i) if readerIndex == len(p.readers) {
readerCount++
if readerCount == p.dataBlocks {
currReaderIndex = i + 1
break break
} }
} if !readTrigger {
for errVal := range errCh {
if errVal.err == nil {
newBuf[errVal.idx] = errVal.buf
if p.canDecode(newBuf) {
p.offset += int64(p.shardSize)
return newBuf, nil
}
continue continue
} }
p.readers[errVal.idx] = nil wg.Add(1)
for currReaderIndex < len(p.readers) { go func(i int) {
if p.readers[currReaderIndex] != nil { defer wg.Done()
break disk := p.readers[i]
if disk == nil {
// Since disk is nil, trigger another read.
readTriggerCh <- true
return
} }
currReaderIndex++ if p.buf[i] == nil {
} // Reading first time on this disk, hence the buffer needs to be allocated.
// Subsequent reads will re-use this buffer.
p.buf[i] = make([]byte, p.shardSize)
}
// For the last shard, the shardsize might be less than previous shard sizes.
// Hence the following statement ensures that the buffer size is reset to the right size.
p.buf[i] = p.buf[i][:p.shardSize]
_, err := disk.ReadAt(p.buf[i], p.offset)
if err != nil {
p.readers[i] = nil
// Since ReadAt returned error, trigger another read.
readTriggerCh <- true
return
}
newBufLK.Lock()
newBuf[i] = p.buf[i]
newBufLK.Unlock()
// Since ReadAt returned success, there is no need to trigger another read.
readTriggerCh <- false
}(readerIndex)
readerIndex++
}
wg.Wait()
if currReaderIndex == len(p.readers) { if p.canDecode(newBuf) {
break p.offset += p.shardSize
} return newBuf, nil
go read(currReaderIndex)
currReaderIndex++
} }
return nil, errXLReadQuorum return nil, errXLReadQuorum

Loading…
Cancel
Save