From 71ce63f79c67ed8a2286a3283ad9f4d58deaec83 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 30 Apr 2020 20:23:00 -0700 Subject: [PATCH] fix: background heal to call HealFormat only if needed (#9491) In large setups this avoids unnecessary data transfer across nodes and potential locks. This PR also optimizes the heal result channel: allocating a new channel for each queueHealTask should be avoided, as it's expensive to create/close channels for a large number of objects. --- cmd/admin-heal-ops.go | 85 ++++++++++++++++------------- cmd/background-heal-ops.go | 7 +-- cmd/background-newdisks-heal-ops.go | 7 +++ cmd/global-heal.go | 3 +- pkg/madmin/heal-commands.go | 14 +++++ 5 files changed, 73 insertions(+), 43 deletions(-) diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 4696ff74b..ea991944a 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -308,8 +308,8 @@ func (ahs *allHealState) PopHealStatusJSON(path string, // healSource denotes single entity and heal option. type healSource struct { - path string // entity path (format, buckets, objects) to heal - opts *madmin.HealOpts // optional heal option overrides default setting + path string // entity path (format, buckets, objects) to heal + opts madmin.HealOpts // optional heal option overrides default setting } // healSequence - state for each heal sequence initiated on the @@ -321,9 +321,12 @@ type healSequence struct { // path is just pathJoin(bucket, objPrefix) path string - // List of entities (format, buckets, objects) to heal + // A channel of entities (format, buckets, objects) to heal sourceCh chan healSource + // A channel of entities with heal result + respCh chan healResult + // Report healing progress reportProgress bool @@ -385,6 +388,7 @@ func newHealSequence(bucket, objPrefix, clientAddr string, ctx := logger.SetReqInfo(GlobalContext, reqInfo) return &healSequence{ + respCh: make(chan healResult), bucket: bucket, objPrefix: objPrefix, path: pathJoin(bucket, objPrefix), @@ -636,53
+640,58 @@ func (h *healSequence) healSequenceStart() { } func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error { - var respCh = make(chan healResult) - defer close(respCh) // Send heal request task := healTask{ path: source.path, - responseCh: respCh, opts: h.settings, + responseCh: h.respCh, } - if source.opts != nil { - task.opts = *source.opts + if !source.opts.Equal(h.settings) { + task.opts = source.opts } globalBackgroundHealRoutine.queueHealTask(task) - // Wait for answer and push result to the client - res := <-respCh - if !h.reportProgress { - h.mutex.Lock() - defer h.mutex.Unlock() - - // Progress is not reported in case of background heal processing. - // Instead we increment relevant counter based on the heal result - // for prometheus reporting. - if res.err != nil && !isErrObjectNotFound(res.err) { - for _, d := range res.result.After.Drives { - // For failed items we report the endpoint and drive state - // This will help users take corrective actions for drives - h.healFailedItemsMap[d.Endpoint+","+d.State]++ + + select { + case res := <-h.respCh: + if !h.reportProgress { + h.mutex.Lock() + defer h.mutex.Unlock() + + // Progress is not reported in case of background heal processing. + // Instead we increment relevant counter based on the heal result + // for prometheus reporting. 
+ if res.err != nil && !isErrObjectNotFound(res.err) { + for _, d := range res.result.After.Drives { + // For failed items we report the endpoint and drive state + // This will help users take corrective actions for drives + h.healFailedItemsMap[d.Endpoint+","+d.State]++ + } + } else { + // Only object type reported for successful healing + h.healedItemsMap[res.result.Type]++ } - } else { - // Only object type reported for successful healing - h.healedItemsMap[res.result.Type]++ - } - return nil - } - res.result.Type = healType - if res.err != nil { - // Object might have been deleted, by the time heal - // was attempted, we should ignore this object and return success. - if isErrObjectNotFound(res.err) { return nil } - // Only report object error - if healType != madmin.HealItemObject { - return res.err + res.result.Type = healType + if res.err != nil { + // Object might have been deleted, by the time heal + // was attempted, we should ignore this object and return success. + if isErrObjectNotFound(res.err) { + return nil + } + // Only report object error + if healType != madmin.HealItemObject { + return res.err + } + res.result.Detail = res.err.Error() } - res.result.Detail = res.err.Error() + return h.pushHealResultItem(res.result) + case <-h.ctx.Done(): + return nil + case <-h.traverseAndHealDoneCh: + return nil } - return h.pushHealResultItem(res.result) + } func (h *healSequence) healItemsFromSourceCh() error { diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index 1659d5939..eaea99e83 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -18,7 +18,6 @@ package cmd import ( "context" - "path" "time" "github.com/minio/minio/cmd/logger" @@ -89,10 +88,10 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { case bucket != "" && object != "": res, err = objAPI.HealObject(ctx, bucket, object, task.opts) } - ObjectPathUpdated(path.Join(bucket, object)) - if task.responseCh != nil { - task.responseCh <- 
healResult{result: res, err: err} + if task.path != slashSeparator && task.path != nopHeal { + ObjectPathUpdated(task.path) } + task.responseCh <- healResult{result: res, err: err} case <-h.doneCh: return case <-ctx.Done(): diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index 0f9a5f43a..e95334d36 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -57,6 +57,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) { case <-time.After(defaultMonitorNewDiskInterval): // Attempt a heal as the server starts-up first. localDisksInZoneHeal := make([]Endpoints, len(z.zones)) + var healNewDisks bool for i, ep := range globalEndpoints { localDisksToHeal := Endpoints{} for _, endpoint := range ep.Endpoints { @@ -74,6 +75,12 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) { continue } localDisksInZoneHeal[i] = localDisksToHeal + healNewDisks = true + } + + // Reformat disks only if needed. + if !healNewDisks { + continue } // Reformat disks diff --git a/cmd/global-heal.go b/cmd/global-heal.go index fb4ef571b..91528810c 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -48,6 +48,7 @@ func newBgHealSequence(numDisks int) *healSequence { return &healSequence{ sourceCh: make(chan healSource), + respCh: make(chan healResult), startTime: UTCNow(), clientToken: bgHealingUUID, settings: hs, @@ -125,7 +126,7 @@ func deepHealObject(objectPath string) { bgSeq.sourceCh <- healSource{ path: objectPath, - opts: &madmin.HealOpts{ScanMode: madmin.HealDeepScan}, + opts: madmin.HealOpts{ScanMode: madmin.HealDeepScan}, } } diff --git a/pkg/madmin/heal-commands.go b/pkg/madmin/heal-commands.go index 47910122e..c417ccd71 100644 --- a/pkg/madmin/heal-commands.go +++ b/pkg/madmin/heal-commands.go @@ -45,6 +45,20 @@ type HealOpts struct { ScanMode HealScanMode `json:"scanMode"` } +// Equal returns true if no is same as o. 
+func (o HealOpts) Equal(no HealOpts) bool { + if o.Recursive != no.Recursive { + return false + } + if o.DryRun != no.DryRun { + return false + } + if o.Remove != no.Remove { + return false + } + return o.ScanMode == no.ScanMode +} + // HealStartSuccess - holds information about a successfully started // heal operation type HealStartSuccess struct {