fix: pop entries from each drives in parallel (#9918)

master
Harshavardhana 4 years ago committed by GitHub
parent 2d0f65a5e3
commit 2f681bed57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      cmd/erasure-decode.go
  2. 36
      cmd/erasure-zones.go
  3. 2
      cmd/object-api-listobjects_test.go

@ -145,9 +145,9 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
wg.Add(1) wg.Add(1)
go func(i int) { go func(i int) {
defer wg.Done() defer wg.Done()
disk := p.readers[i] rr := p.readers[i]
if disk == nil { if rr == nil {
// Since disk is nil, trigger another read. // Since reader is nil, trigger another read.
readTriggerCh <- true readTriggerCh <- true
return return
} }
@ -160,7 +160,7 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
// For the last shard, the shardsize might be less than previous shard sizes. // For the last shard, the shardsize might be less than previous shard sizes.
// Hence the following statement ensures that the buffer size is reset to the right size. // Hence the following statement ensures that the buffer size is reset to the right size.
p.buf[bufIdx] = p.buf[bufIdx][:p.shardSize] p.buf[bufIdx] = p.buf[bufIdx][:p.shardSize]
_, err := disk.ReadAt(p.buf[bufIdx], p.offset) _, err := rr.ReadAt(p.buf[bufIdx], p.offset)
if err != nil { if err != nil {
if _, ok := err.(*errHashMismatch); ok { if _, ok := err.(*errHashMismatch); ok {
atomic.StoreInt32(&healRequired, 1) atomic.StoreInt32(&healRequired, 1)

@ -928,9 +928,18 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker,
// N times until this boolean is 'false'. // N times until this boolean is 'false'.
func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, int, int, bool) { func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, int, int, bool) {
for i, entryChs := range zoneEntryChs { for i, entryChs := range zoneEntryChs {
i := i
var wg sync.WaitGroup
for j := range entryChs { for j := range entryChs {
j := j
wg.Add(1)
// Pop() entries in parallel for large drive setups.
go func() {
defer wg.Done()
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
}()
} }
wg.Wait()
} }
var isTruncated = false var isTruncated = false
@ -1008,9 +1017,18 @@ func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileI
// N times until this boolean is 'false'. // N times until this boolean is 'false'.
func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneEntries [][]FileInfoVersions, zoneEntriesValid [][]bool) (FileInfoVersions, int, int, bool) { func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneEntries [][]FileInfoVersions, zoneEntriesValid [][]bool) (FileInfoVersions, int, int, bool) {
for i, entryChs := range zoneEntryChs { for i, entryChs := range zoneEntryChs {
i := i
var wg sync.WaitGroup
for j := range entryChs { for j := range entryChs {
j := j
wg.Add(1)
// Pop() entries in parallel for large drive setups.
go func() {
defer wg.Done()
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
}()
} }
wg.Wait()
} }
var isTruncated = false var isTruncated = false
@ -1141,9 +1159,18 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, ndisks int)
func isTruncatedZones(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) bool { func isTruncatedZones(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) bool {
for i, entryChs := range zoneEntryChs { for i, entryChs := range zoneEntryChs {
i := i
var wg sync.WaitGroup
for j := range entryChs { for j := range entryChs {
j := j
wg.Add(1)
// Pop() entries in parallel for large drive setups.
go func() {
defer wg.Done()
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
}()
} }
wg.Wait()
} }
var isTruncated = false var isTruncated = false
@ -1170,9 +1197,18 @@ func isTruncatedZones(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zon
func isTruncatedZonesVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneEntries [][]FileInfoVersions, zoneEntriesValid [][]bool) bool { func isTruncatedZonesVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneEntries [][]FileInfoVersions, zoneEntriesValid [][]bool) bool {
for i, entryChs := range zoneEntryChs { for i, entryChs := range zoneEntryChs {
i := i
var wg sync.WaitGroup
for j := range entryChs { for j := range entryChs {
j := j
wg.Add(1)
// Pop() entries in parallel for large drive setups.
go func() {
defer wg.Done()
zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop()
}()
} }
wg.Wait()
} }
var isTruncated = false var isTruncated = false

@ -573,7 +573,7 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
for i, testCase := range testCases { for i, testCase := range testCases {
testCase := testCase testCase := testCase
t.Run(fmt.Sprintf("Test%d-%s", i+1, instanceType), func(t *testing.T) { t.Run(fmt.Sprintf("%s-Test%d", instanceType, i+1), func(t *testing.T) {
result, err := obj.ListObjects(context.Background(), testCase.bucketName, result, err := obj.ListObjects(context.Background(), testCase.bucketName,
testCase.prefix, testCase.marker, testCase.delimeter, int(testCase.maxKeys)) testCase.prefix, testCase.marker, testCase.delimeter, int(testCase.maxKeys))
if err != nil && testCase.shouldPass { if err != nil && testCase.shouldPass {

Loading…
Cancel
Save