use the new errgroup API whereever applicable (#11466)

start using the new errgroup concurrency control
API introduced in #11457
master
Harshavardhana 4 years ago committed by GitHub
parent 84d400487f
commit fe3c39b583
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      cmd/bucket-handlers.go
  2. 36
      cmd/bucket-listobjects-handlers.go

@ -126,7 +126,10 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) {
// Add/update buckets that are not registered with the DNS // Add/update buckets that are not registered with the DNS
bucketsToBeUpdatedSlice := bucketsToBeUpdated.ToSlice() bucketsToBeUpdatedSlice := bucketsToBeUpdated.ToSlice()
g := errgroup.WithNErrs(len(bucketsToBeUpdatedSlice)) g := errgroup.WithNErrs(len(bucketsToBeUpdatedSlice)).WithConcurrency(50)
ctx, cancel := g.WithCancelOnError(GlobalContext)
defer cancel()
for index := range bucketsToBeUpdatedSlice { for index := range bucketsToBeUpdatedSlice {
index := index index := index
g.Go(func() error { g.Go(func() error {
@ -134,14 +137,13 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) {
}, index) }, index)
} }
for _, err := range g.Wait() { if err := g.WaitErr(); err != nil {
if err != nil { logger.LogIf(ctx, err)
logger.LogIf(GlobalContext, err) return
}
} }
for _, bucket := range bucketsInConflict.ToSlice() { for _, bucket := range bucketsInConflict.ToSlice() {
logger.LogIf(GlobalContext, fmt.Errorf("Unable to add bucket DNS entry for bucket %s, an entry exists for the same bucket by a different tenant. This local bucket will be ignored. Bucket names are globally unique in federated deployments. Use path style requests on following addresses '%v' to access this bucket.", bucket, globalDomainIPs.ToSlice())) logger.LogIf(ctx, fmt.Errorf("Unable to add bucket DNS entry for bucket %s, an entry exists for the same bucket by a different tenant. This local bucket will be ignored. Bucket names are globally unique in federated deployments. Use path style requests on following addresses '%v' to access this bucket", bucket, globalDomainIPs.ToSlice()))
} }
var wg sync.WaitGroup var wg sync.WaitGroup

@ -31,27 +31,21 @@ import (
) )
func concurrentDecryptETag(ctx context.Context, objects []ObjectInfo) { func concurrentDecryptETag(ctx context.Context, objects []ObjectInfo) {
inParallel := func(objects []ObjectInfo) { g := errgroup.WithNErrs(len(objects)).WithConcurrency(500)
g := errgroup.WithNErrs(len(objects)) _, cancel := g.WithCancelOnError(ctx)
for index := range objects { defer cancel()
index := index for index := range objects {
g.Go(func() error { index := index
objects[index].ETag = objects[index].GetActualETag(nil) g.Go(func() error {
objects[index].Size, _ = objects[index].GetActualSize() size, err := objects[index].GetActualSize()
return nil if err == nil {
}, index) objects[index].Size = size
} }
g.Wait() objects[index].ETag = objects[index].GetActualETag(nil)
} return nil
const maxConcurrent = 500 }, index)
for { }
if len(objects) < maxConcurrent { g.WaitErr()
inParallel(objects)
return
}
inParallel(objects[:maxConcurrent])
objects = objects[maxConcurrent:]
}
} }
// Validate all the ListObjects query arguments, returns an APIErrorCode // Validate all the ListObjects query arguments, returns an APIErrorCode

Loading…
Cancel
Save