From fe3c39b583e86c93960bf7e83a337eda8342985b Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 9 Feb 2021 12:08:25 -0800 Subject: [PATCH] use the new errgroup API whereever applicable (#11466) start using the new errgroup concurrency control API introduced in #11457 --- cmd/bucket-handlers.go | 14 +++++++----- cmd/bucket-listobjects-handlers.go | 36 +++++++++++++----------------- 2 files changed, 23 insertions(+), 27 deletions(-) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index cfa55b5fb..fb021c6b0 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -126,7 +126,10 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) { // Add/update buckets that are not registered with the DNS bucketsToBeUpdatedSlice := bucketsToBeUpdated.ToSlice() - g := errgroup.WithNErrs(len(bucketsToBeUpdatedSlice)) + g := errgroup.WithNErrs(len(bucketsToBeUpdatedSlice)).WithConcurrency(50) + ctx, cancel := g.WithCancelOnError(GlobalContext) + defer cancel() + for index := range bucketsToBeUpdatedSlice { index := index g.Go(func() error { @@ -134,14 +137,13 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) { }, index) } - for _, err := range g.Wait() { - if err != nil { - logger.LogIf(GlobalContext, err) - } + if err := g.WaitErr(); err != nil { + logger.LogIf(ctx, err) + return } for _, bucket := range bucketsInConflict.ToSlice() { - logger.LogIf(GlobalContext, fmt.Errorf("Unable to add bucket DNS entry for bucket %s, an entry exists for the same bucket by a different tenant. This local bucket will be ignored. Bucket names are globally unique in federated deployments. Use path style requests on following addresses '%v' to access this bucket.", bucket, globalDomainIPs.ToSlice())) + logger.LogIf(ctx, fmt.Errorf("Unable to add bucket DNS entry for bucket %s, an entry exists for the same bucket by a different tenant. This local bucket will be ignored. Bucket names are globally unique in federated deployments. Use path style requests on following addresses '%v' to access this bucket", bucket, globalDomainIPs.ToSlice())) } var wg sync.WaitGroup diff --git a/cmd/bucket-listobjects-handlers.go b/cmd/bucket-listobjects-handlers.go index 28e166892..f6be47b8a 100644 --- a/cmd/bucket-listobjects-handlers.go +++ b/cmd/bucket-listobjects-handlers.go @@ -31,27 +31,21 @@ import ( ) func concurrentDecryptETag(ctx context.Context, objects []ObjectInfo) { - inParallel := func(objects []ObjectInfo) { - g := errgroup.WithNErrs(len(objects)) - for index := range objects { - index := index - g.Go(func() error { - objects[index].ETag = objects[index].GetActualETag(nil) - objects[index].Size, _ = objects[index].GetActualSize() - return nil - }, index) - } - g.Wait() - } - const maxConcurrent = 500 - for { - if len(objects) < maxConcurrent { - inParallel(objects) - return - } - inParallel(objects[:maxConcurrent]) - objects = objects[maxConcurrent:] - } + g := errgroup.WithNErrs(len(objects)).WithConcurrency(500) + _, cancel := g.WithCancelOnError(ctx) + defer cancel() + for index := range objects { + index := index + g.Go(func() error { + size, err := objects[index].GetActualSize() + if err == nil { + objects[index].Size = size + } + objects[index].ETag = objects[index].GetActualETag(nil) + return nil + }, index) + } + g.WaitErr() } // Validate all the ListObjects query arguments, returns an APIErrorCode