Bump response header timeout for proxying list request (#10420)

master
Harshavardhana 4 years ago committed by GitHub
parent 746f1585eb
commit eb19c8af40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      cmd/endpoint.go
  2. 16
      cmd/erasure-common.go
  3. 116
      cmd/erasure-sets.go
  4. 6
      cmd/erasure-zones.go

@ -753,9 +753,17 @@ func GetProxyEndpoints(endpointZones EndpointZones) ([]ProxyEndpoint, error) {
RootCAs: globalRootCAs,
}
}
tr := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)()
// Allow more requests to be in flight with higher response header timeout.
tr.ResponseHeaderTimeout = 30 * time.Minute
tr.MaxConnsPerHost = 256
tr.MaxIdleConnsPerHost = 16
tr.MaxIdleConns = 256
proxyEps = append(proxyEps, ProxyEndpoint{
Endpoint: endpoint,
Transport: newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)(),
Transport: tr,
})
}
}

@ -34,6 +34,22 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
return newDisks
}
// getLoadBalancedNDisks - fetches load balanced (sufficiently randomized) disk slice with N disks online
func (er erasureObjects) getLoadBalancedNDisks(ndisks int) (newDisks []StorageAPI) {
disks := er.getLoadBalancedDisks()
for _, disk := range disks {
if disk == nil {
continue
}
newDisks = append(newDisks, disk)
ndisks--
if ndisks == 0 {
break
}
}
return
}
// getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice.
func (er erasureObjects) getLoadBalancedDisks() (newDisks []StorageAPI) {
disks := er.getDisks()

@ -961,7 +961,7 @@ func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileI
}
func (s *erasureSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoCh {
return s.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1)
return s.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1, false)
}
func (s *erasureSets) startMergeWalksVersions(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoVersionsCh {
@ -972,90 +972,64 @@ func (s *erasureSets) startMergeWalksVersions(ctx context.Context, bucket, prefi
// FileInfoCh which can be read from.
func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}, ndisks int) []FileInfoVersionsCh {
var entryChs []FileInfoVersionsCh
var success int
var wg sync.WaitGroup
var mutex sync.Mutex
for _, set := range s.sets {
// Reset for the next erasure set.
success = ndisks
for _, disk := range set.getLoadBalancedDisks() {
if disk == nil {
// Disk can be offline
continue
}
entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
if err != nil {
logger.LogIf(ctx, err)
// Disk walk returned error, ignore it.
continue
}
entryChs = append(entryChs, FileInfoVersionsCh{
Ch: entryCh,
})
success--
if success == 0 {
break
}
}
}
return entryChs
}
for _, disk := range set.getLoadBalancedNDisks(ndisks) {
wg.Add(1)
go func(disk StorageAPI) {
defer wg.Done()
entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
if err != nil {
return
}
// Starts a walk channel across n number of disks and returns a slice of
// FileInfoCh which can be read from.
func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}, ndisks int) []FileInfoCh {
var entryChs []FileInfoCh
var success int
for _, set := range s.sets {
// Reset for the next erasure set.
success = ndisks
for _, disk := range set.getLoadBalancedDisks() {
if disk == nil {
// Disk can be offline
continue
}
entryCh, err := disk.Walk(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
if err != nil {
// Disk walk returned error, ignore it.
continue
}
entryChs = append(entryChs, FileInfoCh{
Ch: entryCh,
})
success--
if success == 0 {
break
}
mutex.Lock()
entryChs = append(entryChs, FileInfoVersionsCh{
Ch: entryCh,
})
mutex.Unlock()
}(disk)
}
}
wg.Wait()
return entryChs
}
// Starts a walk channel across n number of disks and returns a slice of
// FileInfo channels which can be read from.
func (s *erasureSets) startSplunkMergeWalksN(ctx context.Context, bucket, prefix, marker string, endWalkCh <-chan struct{}, ndisks int) []FileInfoCh {
// FileInfoCh which can be read from.
func (s *erasureSets) startMergeWalksN(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}, ndisks int, splunk bool) []FileInfoCh {
var entryChs []FileInfoCh
var success int
var wg sync.WaitGroup
var mutex sync.Mutex
for _, set := range s.sets {
// Reset for the next erasure set.
success = ndisks
for _, disk := range set.getLoadBalancedDisks() {
if disk == nil {
// Disk can be offline
continue
}
entryCh, err := disk.WalkSplunk(GlobalContext, bucket, prefix, marker, endWalkCh)
if err != nil {
// Disk walk returned error, ignore it.
continue
}
entryChs = append(entryChs, FileInfoCh{
Ch: entryCh,
})
success--
if success == 0 {
break
}
for _, disk := range set.getLoadBalancedNDisks(ndisks) {
wg.Add(1)
go func(disk StorageAPI) {
defer wg.Done()
var entryCh chan FileInfo
var err error
if splunk {
entryCh, err = disk.WalkSplunk(GlobalContext, bucket, prefix, marker, endWalkCh)
} else {
entryCh, err = disk.Walk(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
}
if err != nil {
// Disk walk returned error, ignore it.
return
}
mutex.Lock()
entryChs = append(entryChs, FileInfoCh{
Ch: entryCh,
})
mutex.Unlock()
}(disk)
}
}
wg.Wait()
return entryChs
}

@ -697,7 +697,7 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix,
for _, zone := range z.zones {
zonesEntryChs = append(zonesEntryChs,
zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.listTolerancePerSet))
zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.listTolerancePerSet, false))
zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet)
}
@ -816,7 +816,7 @@ func (z *erasureZones) listObjectsSplunk(ctx context.Context, bucket, prefix, ma
entryChs, endWalkCh := zone.poolSplunk.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = zone.startSplunkMergeWalksN(ctx, bucket, prefix, marker, endWalkCh, zone.listTolerancePerSet)
entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet, true)
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)
@ -908,7 +908,7 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker,
entryChs, endWalkCh := zone.pool.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet)
entryChs = zone.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet, false)
}
zonesEntryChs = append(zonesEntryChs, entryChs)
zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh)

Loading…
Cancel
Save