diff --git a/cmd/endpoint.go b/cmd/endpoint.go
index b9ecd1aa1..8c9a5ada1 100644
--- a/cmd/endpoint.go
+++ b/cmd/endpoint.go
@@ -753,9 +753,17 @@ func GetProxyEndpoints(endpointZones EndpointZones) ([]ProxyEndpoint, error) {
 					RootCAs: globalRootCAs,
 				}
 			}
+
+			tr := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)()
+			// Allow more requests to be in flight with higher response header timeout.
+			tr.ResponseHeaderTimeout = 30 * time.Minute
+			tr.MaxConnsPerHost = 256
+			tr.MaxIdleConnsPerHost = 16
+			tr.MaxIdleConns = 256
+
 			proxyEps = append(proxyEps, ProxyEndpoint{
 				Endpoint:  endpoint,
-				Transport: newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)(),
+				Transport: tr,
 			})
 		}
 	}
diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go
index 4a05f00e3..bf84c4914 100644
--- a/cmd/erasure-common.go
+++ b/cmd/erasure-common.go
@@ -34,6 +34,22 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
 	return newDisks
 }
 
+// getLoadBalancedNDisks - fetches load balanced (sufficiently randomized) disk slice with N disks online
+func (er erasureObjects) getLoadBalancedNDisks(ndisks int) (newDisks []StorageAPI) {
+	disks := er.getLoadBalancedDisks()
+	for _, disk := range disks {
+		if disk == nil {
+			continue
+		}
+		newDisks = append(newDisks, disk)
+		ndisks--
+		if ndisks == 0 {
+			break
+		}
+	}
+	return
+}
+
 // getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice.
 func (er erasureObjects) getLoadBalancedDisks() (newDisks []StorageAPI) {
 	disks := er.getDisks()
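Note on the cmd/endpoint.go hunk above: the proxy transport is tuned so that many proxied listing calls can stay in flight, and so that a peer that takes a long time to produce response headers (for example while walking disks) is not cut off. The sketch below builds a stdlib-only transport with the same knobs; newCustomHTTPTransport is MinIO's own helper, so treat this as an approximation of the resulting configuration, not its actual implementation.

package main

import (
	"crypto/tls"
	"net/http"
	"time"
)

// proxyTransport builds an *http.Transport with the same knobs the diff
// sets: a long ResponseHeaderTimeout so slow, walk-backed responses are
// not cut off, and higher connection caps so more proxied requests can
// be in flight to the same peer at once.
func proxyTransport(tlsConfig *tls.Config) *http.Transport {
	return &http.Transport{
		TLSClientConfig:       tlsConfig,
		ResponseHeaderTimeout: 30 * time.Minute,
		MaxConnsPerHost:       256,
		MaxIdleConnsPerHost:   16,
		MaxIdleConns:          256,
	}
}

func main() {
	client := &http.Client{Transport: proxyTransport(nil)}
	_ = client // would be used for zone-to-zone proxying
}

ResponseHeaderTimeout only bounds the wait for the response headers; it does not limit how long the body may take to stream, which suits long-running list responses.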
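Note on the cmd/erasure-common.go hunk above: getLoadBalancedNDisks treats any non-positive ndisks as "no limit", because the countdown starts below zero and ndisks-- can never reach it; this is what lets startMergeWalks pass -1 to mean "walk every online disk". A standalone sketch of the same selection pattern, with a hypothetical pickN helper and []string standing in for []StorageAPI:

package main

import "fmt"

// pickN keeps up to ndisks non-empty entries. ndisks <= 0 (e.g. the -1
// passed by startMergeWalks) means "keep every online entry", because
// the countdown below can never hit zero from there.
func pickN(disks []string, ndisks int) (picked []string) {
	for _, disk := range disks {
		if disk == "" { // stand-in for a nil (offline) StorageAPI
			continue
		}
		picked = append(picked, disk)
		ndisks--
		if ndisks == 0 {
			break
		}
	}
	return
}

func main() {
	disks := []string{"disk1", "", "disk2", "disk3"}
	fmt.Println(pickN(disks, 2))  // [disk1 disk2]
	fmt.Println(pickN(disks, -1)) // [disk1 disk2 disk3]
}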
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index e5f6e717e..0e78e9b25 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -961,7 +961,7 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI
 }
 
 func (s *erasureSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoCh {
-	return s.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1)
+	return s.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1, false)
 }
 
 func (s *erasureSets) startMergeWalksVersions(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoVersionsCh {
@@ -972,90 +972,64 @@ func (s *erasureSets) startMergeWalksVersions(ctx context.Context, bucket, prefi
 // FileInfoCh which can be read from.
 func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}, ndisks int) []FileInfoVersionsCh {
 	var entryChs []FileInfoVersionsCh
-	var success int
+	var wg sync.WaitGroup
+	var mutex sync.Mutex
 	for _, set := range s.sets {
 		// Reset for the next erasure set.
-		success = ndisks
-		for _, disk := range set.getLoadBalancedDisks() {
-			if disk == nil {
-				// Disk can be offline
-				continue
-			}
-			entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
-			if err != nil {
-				logger.LogIf(ctx, err)
-				// Disk walk returned error, ignore it.
-				continue
-			}
-			entryChs = append(entryChs, FileInfoVersionsCh{
-				Ch: entryCh,
-			})
-			success--
-			if success == 0 {
-				break
-			}
-		}
-	}
-	return entryChs
-}
+		for _, disk := range set.getLoadBalancedNDisks(ndisks) {
+			wg.Add(1)
+			go func(disk StorageAPI) {
+				defer wg.Done()
+				entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
+				if err != nil {
+					return
+				}
 
-// Starts a walk channel across n number of disks and returns a slice of
-// FileInfoCh which can be read from.
-func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}, ndisks int) []FileInfoCh {
-	var entryChs []FileInfoCh
-	var success int
-	for _, set := range s.sets {
-		// Reset for the next erasure set.
-		success = ndisks
-		for _, disk := range set.getLoadBalancedDisks() {
-			if disk == nil {
-				// Disk can be offline
-				continue
-			}
-			entryCh, err := disk.Walk(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
-			if err != nil {
-				// Disk walk returned error, ignore it.
-				continue
-			}
-			entryChs = append(entryChs, FileInfoCh{
-				Ch: entryCh,
-			})
-			success--
-			if success == 0 {
-				break
-			}
+				mutex.Lock()
+				entryChs = append(entryChs, FileInfoVersionsCh{
+					Ch: entryCh,
+				})
+				mutex.Unlock()
+			}(disk)
 		}
 	}
+	wg.Wait()
 	return entryChs
 }
 
 // Starts a walk channel across n number of disks and returns a slice of
-// FileInfo channels which can be read from.
-func (s *erasureSets) startSplunkMergeWalksN(ctx context.Context, bucket, prefix, marker string, endWalkCh <-chan struct{}, ndisks int) []FileInfoCh {
+// FileInfoCh which can be read from.
+func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}, ndisks int, splunk bool) []FileInfoCh {
 	var entryChs []FileInfoCh
-	var success int
+	var wg sync.WaitGroup
+	var mutex sync.Mutex
 	for _, set := range s.sets {
 		// Reset for the next erasure set.
-		success = ndisks
-		for _, disk := range set.getLoadBalancedDisks() {
-			if disk == nil {
-				// Disk can be offline
-				continue
-			}
-			entryCh, err := disk.WalkSplunk(GlobalContext, bucket, prefix, marker, endWalkCh)
-			if err != nil {
-				// Disk walk returned error, ignore it.
-				continue
-			}
-			entryChs = append(entryChs, FileInfoCh{
-				Ch: entryCh,
-			})
-			success--
-			if success == 0 {
-				break
-			}
+		for _, disk := range set.getLoadBalancedNDisks(ndisks) {
+			wg.Add(1)
+			go func(disk StorageAPI) {
+				defer wg.Done()
+
+				var entryCh chan FileInfo
+				var err error
+				if splunk {
+					entryCh, err = disk.WalkSplunk(GlobalContext, bucket, prefix, marker, endWalkCh)
+				} else {
+					entryCh, err = disk.Walk(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
+				}
+				if err != nil {
+					// Disk walk returned error, ignore it.
+					return
+				}
+				mutex.Lock()
+				entryChs = append(entryChs, FileInfoCh{
+					Ch: entryCh,
+				})
+				mutex.Unlock()
+			}(disk)
 		}
 	}
+	wg.Wait()
 	return entryChs
 }
 
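Note on the cmd/erasure-sets.go hunks above: both rewritten walkers share one fan-out shape: launch a goroutine per selected disk, collect the opened walk channels under a mutex (a plain append from multiple goroutines is a data race), and wg.Wait() so every channel is registered before the slice is returned. A minimal standalone sketch of that pattern, with a hypothetical openWalk standing in for disk.Walk / disk.WalkVersions:

package main

import (
	"fmt"
	"sync"
)

// openWalk is a hypothetical stand-in for disk.Walk / disk.WalkVersions:
// it either fails or hands back a channel of entries.
func openWalk(disk string) (<-chan string, error) {
	ch := make(chan string, 1)
	ch <- "entry-from-" + disk
	close(ch)
	return ch, nil
}

// startWalks opens one walk per disk concurrently and returns all the
// channels that opened successfully.
func startWalks(disks []string) []<-chan string {
	var (
		entryChs []<-chan string
		wg       sync.WaitGroup
		mutex    sync.Mutex // guards entryChs; concurrent append races
	)
	for _, disk := range disks {
		wg.Add(1)
		go func(disk string) {
			defer wg.Done()
			entryCh, err := openWalk(disk)
			if err != nil {
				return // walk errors are ignored, as in the diff
			}
			mutex.Lock()
			entryChs = append(entryChs, entryCh)
			mutex.Unlock()
		}(disk)
	}
	wg.Wait() // every channel is registered before we return
	return entryChs
}

func main() {
	for _, ch := range startWalks([]string{"disk1", "disk2"}) {
		fmt.Println(<-ch)
	}
}

Passing disk as a parameter to the goroutine, as the diff does with go func(disk StorageAPI), pins the loop variable per iteration; on Go versions before 1.22 the loop variable is shared across iterations, so capturing it directly in the closure would be a bug.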
diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go
index 17d7461b5..49e024320 100644
--- a/cmd/erasure-zones.go
+++ b/cmd/erasure-zones.go
@@ -697,7 +697,7 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix,
 
 	for _, zone := range z.zones {
 		zonesEntryChs = append(zonesEntryChs,
-			zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.listTolerancePerSet))
+			zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.listTolerancePerSet, false))
 		zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet)
 	}
 
@@ -816,7 +816,7 @@ func (z *erasureZones) listObjectsSplunk(ctx context.Context, bucket, prefix, ma
 		entryChs, endWalkCh := zone.poolSplunk.Release(listParams{bucket, recursive, marker, prefix})
 		if entryChs == nil {
 			endWalkCh = make(chan struct{})
-			entryChs = zone.startSplunkMergeWalksN(ctx, bucket, prefix, marker, endWalkCh, zone.listTolerancePerSet)
+			entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet, true)
 		}
 		zonesEntryChs = append(zonesEntryChs, entryChs)
 		zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
@@ -908,7 +908,7 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker,
 		entryChs, endWalkCh := zone.pool.Release(listParams{bucket, recursive, marker, prefix})
 		if entryChs == nil {
 			endWalkCh = make(chan struct{})
-			entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet)
+			entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet, false)
 		}
 		zonesEntryChs = append(zonesEntryChs, entryChs)
 		zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
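Note on the cmd/erasure-zones.go hunks above: all three listing paths now funnel into the single startMergeWalksN, and only listObjectsSplunk passes splunk=true; startSplunkMergeWalksN is gone. Each call returns a slice of per-disk channels that the zone code merges lexically. The sketch below shows the simplest possible consumer of such channels, with a hypothetical Entry type standing in for FileInfo; the real code merge-sorts across channels rather than draining them one by one.

package main

import "fmt"

// Entry is a hypothetical stand-in for FileInfo.
type Entry struct{ Name string }

// drainAll reads every entry from every walk channel. The real listing
// code does a lexical merge across the channels instead, so results
// come out in sorted order.
func drainAll(entryChs []chan Entry) (out []Entry) {
	for _, ch := range entryChs {
		for entry := range ch {
			out = append(out, entry)
		}
	}
	return
}

func main() {
	ch := make(chan Entry, 2)
	ch <- Entry{Name: "bucket/obj1"}
	ch <- Entry{Name: "bucket/obj2"}
	close(ch)
	fmt.Println(drainAll([]chan Entry{ch}))
}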