From e1280779ed98e2f87df5ecf087651bbad1aa8d3d Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Tue, 28 Jul 2015 17:52:24 +0530 Subject: [PATCH 1/2] Read from the disks in parallel during object read --- pkg/donut/bucket.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/pkg/donut/bucket.go b/pkg/donut/bucket.go index fa8079505..806d513d2 100644 --- a/pkg/donut/bucket.go +++ b/pkg/donut/bucket.go @@ -543,13 +543,25 @@ func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers map[int]io return nil, iodine.New(err, nil) } encodedBytes := make([][]byte, encoder.k+encoder.m) + + errCh := make(chan error) + var errRet error + for i, reader := range readers { - var bytesBuffer bytes.Buffer - _, err := io.CopyN(&bytesBuffer, reader, int64(curChunkSize)) + go func(i int, reader io.Reader) { + encodedBytes[i] = make([]byte, curChunkSize) + _, err := io.ReadFull(reader, encodedBytes[i]) + errCh <- err + }(i, reader) + } + for range readers { + err := <-errCh if err != nil { - return nil, iodine.New(err, nil) + errRet = err } - encodedBytes[i] = bytesBuffer.Bytes() + } + if errRet != nil { + return nil, iodine.New(errRet, nil) } decodedData, err := encoder.Decode(encodedBytes, int(curBlockSize)) if err != nil { From bdc00624fdb3d086070d094761abc9b4dcae3548 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Wed, 29 Jul 2015 19:36:41 +0530 Subject: [PATCH 2/2] get erros from buffered channel. Return error during object read only if we have readers < k --- pkg/donut/bucket.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/pkg/donut/bucket.go b/pkg/donut/bucket.go index 806d513d2..2d9eae58f 100644 --- a/pkg/donut/bucket.go +++ b/pkg/donut/bucket.go @@ -544,23 +544,31 @@ func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers map[int]io } encodedBytes := make([][]byte, encoder.k+encoder.m) - errCh := make(chan error) + errCh := make(chan error, len(readers)) var errRet error + readCnt := 0 for i, reader := range readers { - go func(i int, reader io.Reader) { + i := i + reader := reader + go func() { encodedBytes[i] = make([]byte, curChunkSize) _, err := io.ReadFull(reader, encodedBytes[i]) + if err != nil { + encodedBytes[i] = nil + } errCh <- err - }(i, reader) + }() } for range readers { err := <-errCh if err != nil { errRet = err + } else { + readCnt++ } } - if errRet != nil { + if readCnt < int(encoder.k) { return nil, iodine.New(errRet, nil) } decodedData, err := encoder.Decode(encodedBytes, int(curBlockSize))