diff --git a/cmd/server-main.go b/cmd/server-main.go index 64542b7a5..d65cf704c 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -41,6 +41,7 @@ import ( "github.com/minio/minio/pkg/color" "github.com/minio/minio/pkg/env" "github.com/minio/minio/pkg/madmin" + "github.com/minio/minio/pkg/sync/errgroup" ) // ServerFlags - server command specific flags @@ -346,10 +347,22 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) { logger.Info(fmt.Sprintf("Verifying if %d buckets are consistent across drives...", len(buckets))) } } - for _, bucket := range buckets { - if _, err = newObject.HealBucket(ctx, bucket.Name, madmin.HealOpts{Recreate: true}); err != nil { - return fmt.Errorf("Unable to list buckets to heal: %w", err) - } + + // Limit to no more than 50 concurrent buckets. + g := errgroup.WithNErrs(len(buckets)).WithConcurrency(50) + ctx, cancel := g.WithCancelOnError(ctx) + defer cancel() + for index := range buckets { + index := index + g.Go(func() error { + if _, berr := newObject.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{Recreate: true}); berr != nil { + return fmt.Errorf("Unable to list buckets to heal: %w", berr) + } + return nil + }, index) + } + if err := g.WaitErr(); err != nil { + return err } } diff --git a/pkg/sync/errgroup/errgroup.go b/pkg/sync/errgroup/errgroup.go index bf0ce1c7d..74fbabc53 100644 --- a/pkg/sync/errgroup/errgroup.go +++ b/pkg/sync/errgroup/errgroup.go @@ -17,43 +17,116 @@ package errgroup import ( + "context" "sync" + "sync/atomic" ) // A Group is a collection of goroutines working on subtasks that are part of // the same overall task. // -// A zero Group is valid and does not cancel on error. +// A zero Group can be used if errors should not be tracked. type Group struct { - wg sync.WaitGroup - errs []error + wg sync.WaitGroup + bucket chan struct{} + errs []error + firstErr int64 + cancel context.CancelFunc + ctxCancel <-chan struct{} // nil if no context. + ctxErr func() error } // WithNErrs returns a new Group with length of errs slice upto nerrs, // upon Wait() errors are returned collected from all tasks. func WithNErrs(nerrs int) *Group { - return &Group{errs: make([]error, nerrs)} + return &Group{errs: make([]error, nerrs), firstErr: -1} } // Wait blocks until all function calls from the Go method have returned, then // returns the slice of errors from all function calls. func (g *Group) Wait() []error { g.wg.Wait() + if g.cancel != nil { + g.cancel() + } return g.errs } +// WaitErr blocks until all function calls from the Go method have returned, then +// returns the first error returned. +func (g *Group) WaitErr() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel() + } + if g.firstErr >= 0 && len(g.errs) > int(g.firstErr) { + // len(g.errs) > int(g.firstErr) is for then used uninitialized. + return g.errs[g.firstErr] + } + return nil +} + +// WithConcurrency allows to limit the concurrency of the group. +// This must be called before starting any async processes. +// There is no order to which functions are allowed to run. +// If n <= 0 no concurrency limits are enforced. +// g is modified and returned as well. +func (g *Group) WithConcurrency(n int) *Group { + if n <= 0 { + g.bucket = nil + return g + } + + // Fill bucket with tokens + g.bucket = make(chan struct{}, n) + for i := 0; i < n; i++ { + g.bucket <- struct{}{} + } + return g +} + +// WithCancelOnError will return a context that is canceled +// as soon as an error occurs. +// The returned CancelFunc must always be called similar to context.WithCancel. +// If the supplied context is canceled any goroutines waiting for execution are also canceled. +func (g *Group) WithCancelOnError(ctx context.Context) (context.Context, context.CancelFunc) { + ctx, g.cancel = context.WithCancel(ctx) + g.ctxCancel = ctx.Done() + g.ctxErr = ctx.Err + return ctx, g.cancel +} + // Go calls the given function in a new goroutine. // -// The first call to return a non-nil error will be -// collected in errs slice and returned by Wait(). +// The errors will be collected in errs slice and returned by Wait(). func (g *Group) Go(f func() error, index int) { g.wg.Add(1) - go func() { defer g.wg.Done() - + if g.bucket != nil { + // Wait for token + select { + case <-g.bucket: + defer func() { + // Put back token.. + g.bucket <- struct{}{} + }() + case <-g.ctxCancel: + if len(g.errs) > index { + atomic.CompareAndSwapInt64(&g.firstErr, -1, int64(index)) + g.errs[index] = g.ctxErr() + } + return + } + } if err := f(); err != nil { - g.errs[index] = err + if len(g.errs) > index { + atomic.CompareAndSwapInt64(&g.firstErr, -1, int64(index)) + g.errs[index] = err + } + if g.cancel != nil { + g.cancel() + } } }() }