From 2f681bed57de127e0c262dc3c470c0cdd9d50fe6 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 25 Jun 2020 23:20:12 -0700 Subject: [PATCH] fix: pop entries from each drives in parallel (#9918) --- cmd/erasure-decode.go | 8 +++--- cmd/erasure-zones.go | 44 +++++++++++++++++++++++++++--- cmd/object-api-listobjects_test.go | 2 +- 3 files changed, 45 insertions(+), 9 deletions(-) diff --git a/cmd/erasure-decode.go b/cmd/erasure-decode.go index ed8abbb16..68055979f 100644 --- a/cmd/erasure-decode.go +++ b/cmd/erasure-decode.go @@ -145,9 +145,9 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) { wg.Add(1) go func(i int) { defer wg.Done() - disk := p.readers[i] - if disk == nil { - // Since disk is nil, trigger another read. + rr := p.readers[i] + if rr == nil { + // Since reader is nil, trigger another read. readTriggerCh <- true return } @@ -160,7 +160,7 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) { // For the last shard, the shardsize might be less than previous shard sizes. // Hence the following statement ensures that the buffer size is reset to the right size. p.buf[bufIdx] = p.buf[bufIdx][:p.shardSize] - _, err := disk.ReadAt(p.buf[bufIdx], p.offset) + _, err := rr.ReadAt(p.buf[bufIdx], p.offset) if err != nil { if _, ok := err.(*errHashMismatch); ok { atomic.StoreInt32(&healRequired, 1) diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go index 3fe5dd7ac..173ff60c2 100644 --- a/cmd/erasure-zones.go +++ b/cmd/erasure-zones.go @@ -928,9 +928,18 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker, // N times until this boolean is 'false'. func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, int, int, bool) { for i, entryChs := range zoneEntryChs { + i := i + var wg sync.WaitGroup for j := range entryChs { - zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() + j := j + wg.Add(1) + // Pop() entries in parallel for large drive setups. + go func() { + defer wg.Done() + zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() + }() } + wg.Wait() } var isTruncated = false @@ -1008,9 +1017,18 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI // N times until this boolean is 'false'. func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneEntries [][]FileInfoVersions, zoneEntriesValid [][]bool) (FileInfoVersions, int, int, bool) { for i, entryChs := range zoneEntryChs { + i := i + var wg sync.WaitGroup for j := range entryChs { - zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() + j := j + wg.Add(1) + // Pop() entries in parallel for large drive setups. + go func() { + defer wg.Done() + zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() + }() } + wg.Wait() } var isTruncated = false @@ -1141,9 +1159,18 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, ndisks int) func isTruncatedZones(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) bool { for i, entryChs := range zoneEntryChs { + i := i + var wg sync.WaitGroup for j := range entryChs { - zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() + j := j + wg.Add(1) + // Pop() entries in parallel for large drive setups. + go func() { + defer wg.Done() + zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() + }() } + wg.Wait() } var isTruncated = false @@ -1170,9 +1197,18 @@ func isTruncatedZones(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zon func isTruncatedZonesVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneEntries [][]FileInfoVersions, zoneEntriesValid [][]bool) bool { for i, entryChs := range zoneEntryChs { + i := i + var wg sync.WaitGroup for j := range entryChs { - zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() + j := j + wg.Add(1) + // Pop() entries in parallel for large drive setups. + go func() { + defer wg.Done() + zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() + }() } + wg.Wait() } var isTruncated = false diff --git a/cmd/object-api-listobjects_test.go b/cmd/object-api-listobjects_test.go index c60966249..561774084 100644 --- a/cmd/object-api-listobjects_test.go +++ b/cmd/object-api-listobjects_test.go @@ -573,7 +573,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) { for i, testCase := range testCases { testCase := testCase - t.Run(fmt.Sprintf("Test%d-%s", i+1, instanceType), func(t *testing.T) { + t.Run(fmt.Sprintf("%s-Test%d", instanceType, i+1), func(t *testing.T) { result, err := obj.ListObjects(context.Background(), testCase.bucketName, testCase.prefix, testCase.marker, testCase.delimeter, int(testCase.maxKeys)) if err != nil && testCase.shouldPass {