From 6c6137b2e79e787b2ca17f96c7d2803fa6b64c35 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 7 Aug 2020 13:22:53 -0700 Subject: [PATCH] add cluster maintenance healthcheck drive heal affinity (#10218) --- cmd/admin-handlers.go | 51 ++++++++----- cmd/admin-heal-ops.go | 19 ++++- cmd/background-heal-ops.go | 3 + cmd/background-newdisks-heal-ops.go | 112 +++++++++++++++++++--------- cmd/erasure-sets.go | 5 ++ cmd/erasure-zones.go | 18 ++++- cmd/global-heal.go | 8 ++ cmd/notification.go | 18 +++-- cmd/xl-storage.go | 23 +----- pkg/madmin/heal-commands.go | 1 + 10 files changed, 178 insertions(+), 80 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 49d67d41a..7699b9f5d 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -799,22 +799,7 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) { keepConnLive(w, r, respCh) } -func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "HealBackgroundStatus") - - defer logger.AuditLog(w, r, "HealBackgroundStatus", mustGetClaimsFromToken(r)) - - objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.HealAdminAction) - if objectAPI == nil { - return - } - - // Check if this setup has an erasure coded backend. - if !globalIsErasure { - writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL) - return - } - +func getAggregatedBackgroundHealState(ctx context.Context, failOnErr bool) (madmin.BgHealState, error) { var bgHealStates []madmin.BgHealState // Get local heal status first @@ -822,7 +807,15 @@ func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r * if globalIsDistErasure { // Get heal status from other peers - peersHealStates := globalNotificationSys.BackgroundHealStatus() + peersHealStates, nerrs := globalNotificationSys.BackgroundHealStatus() + for _, nerr := range nerrs { + if nerr.Err != nil { + if failOnErr { + return madmin.BgHealState{}, nerr.Err + } + logger.LogIf(ctx, nerr.Err) + } + } bgHealStates = append(bgHealStates, peersHealStates...) } @@ -831,12 +824,14 @@ func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r * ScannedItemsCount: bgHealStates[0].ScannedItemsCount, LastHealActivity: bgHealStates[0].LastHealActivity, NextHealRound: bgHealStates[0].NextHealRound, + HealDisks: bgHealStates[0].HealDisks, } bgHealStates = bgHealStates[1:] for _, state := range bgHealStates { aggregatedHealStateResult.ScannedItemsCount += state.ScannedItemsCount + aggregatedHealStateResult.HealDisks = append(aggregatedHealStateResult.HealDisks, state.HealDisks...) if !state.LastHealActivity.IsZero() && aggregatedHealStateResult.LastHealActivity.Before(state.LastHealActivity) { aggregatedHealStateResult.LastHealActivity = state.LastHealActivity // The node which has the last heal activity means its @@ -847,7 +842,27 @@ func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r * } } - if err := json.NewEncoder(w).Encode(aggregatedHealStateResult); err != nil { + return aggregatedHealStateResult, nil +} + +func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "HealBackgroundStatus") + + defer logger.AuditLog(w, r, "HealBackgroundStatus", mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.HealAdminAction) + if objectAPI == nil { + return + } + + // Check if this setup has an erasure coded backend. 
+ if !globalIsErasure {
+ writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrHealNotImplemented), r.URL)
+ return
+ }
+
+ aggregateHealStateResult, _ := getAggregatedBackgroundHealState(r.Context(), false)
+ if err := json.NewEncoder(w).Encode(aggregateHealStateResult); err != nil {
 writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
 return
 }
diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go
index 914686a3e..0fa655fa7 100644
--- a/cmd/admin-heal-ops.go
+++ b/cmd/admin-heal-ops.go
@@ -88,7 +88,8 @@ type allHealState struct {
 sync.Mutex
 
 // map of heal path to heal sequence
- healSeqMap map[string]*healSequence
+ healSeqMap map[string]*healSequence
+ healLocalDisks []Endpoints
 }
 
 // newHealState - initialize global heal state management
@@ -102,6 +103,22 @@ func newHealState() *allHealState {
 return healState
 }
 
+func (ahs *allHealState) getHealLocalDisks() []Endpoints {
+ ahs.Lock()
+ defer ahs.Unlock()
+
+ healLocalDisks := make([]Endpoints, len(ahs.healLocalDisks))
+ copy(healLocalDisks, ahs.healLocalDisks)
+ return healLocalDisks
+}
+
+func (ahs *allHealState) updateHealLocalDisks(eps []Endpoints) {
+ ahs.Lock()
+ defer ahs.Unlock()
+
+ ahs.healLocalDisks = eps
+}
+
 func (ahs *allHealState) periodicHealSeqsClean(ctx context.Context) {
 // Launch clean-up routine to remove this heal sequence (after
 // it ends) from the global state after timeout has elapsed.
diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go
index 0c9033a8c..beef71d62 100644
--- a/cmd/background-heal-ops.go
+++ b/cmd/background-heal-ops.go
@@ -90,6 +90,9 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
 case task.bucket == nopHeal:
 continue
 case task.bucket == SlashSeparator:
+ // Quickly check if drives need healing upon start-up
+ globalBackgroundHealState.updateHealLocalDisks(getLocalDisksToHeal(objAPI))
+
 res, err = healDiskFormat(ctx, objAPI, task.opts)
 case task.bucket != "" && task.object == "":
 res, err = objAPI.HealBucket(ctx, task.bucket, task.opts.DryRun, task.opts.Remove)
diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go
index bbd2c7f94..54ad1a2f5 100644
--- a/cmd/background-newdisks-heal-ops.go
+++ b/cmd/background-newdisks-heal-ops.go
@@ -18,6 +18,7 @@ package cmd
 
 import (
 "context"
+ "fmt"
 "time"
 
 "github.com/dustin/go-humanize"
@@ -30,6 +31,43 @@ func initLocalDisksAutoHeal(ctx context.Context, objAPI ObjectLayer) {
 go monitorLocalDisksAndHeal(ctx, objAPI)
 }
 
+func getLocalDisksToHeal(objAPI ObjectLayer) []Endpoints {
+ z, ok := objAPI.(*erasureZones)
+ if !ok {
+ return nil
+ }
+
+ // Attempt a heal as the server starts up first.
+ localDisksInZoneHeal := make([]Endpoints, len(z.zones))
+ for i, ep := range globalEndpoints {
+ localDisksToHeal := Endpoints{}
+ for _, endpoint := range ep.Endpoints {
+ if !endpoint.IsLocal {
+ continue
+ }
+ // Try to connect to the current endpoint
+ // and reformat if the current disk is not formatted
+ _, _, err := connectEndpoint(endpoint)
+ if err == errUnformattedDisk {
+ localDisksToHeal = append(localDisksToHeal, endpoint)
+ }
+ }
+ if len(localDisksToHeal) == 0 {
+ continue
+ }
+ localDisksInZoneHeal[i] = localDisksToHeal
+ }
+ return localDisksInZoneHeal
+}
+
+func getDrivesToHealCount(localDisksInZoneHeal []Endpoints) int {
+ var drivesToHeal int
+ for _, eps := range localDisksInZoneHeal {
+ drivesToHeal += len(eps)
+ }
+ return drivesToHeal
+}
+
 // monitorLocalDisksAndHeal - ensures that detected new disks are healed
 //  1. Only the concerned erasure set will be listed and healed
 //  2. Only the node hosting the disk is responsible to perform the heal
@@ -50,53 +91,43 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
 time.Sleep(time.Second)
 }
 
+ localDisksInZoneHeal := globalBackgroundHealState.getHealLocalDisks()
+
+ drivesToHeal := getDrivesToHealCount(localDisksInZoneHeal)
+ if drivesToHeal != 0 {
+ logger.Info(fmt.Sprintf("Found %d drives to heal, waiting until %s to heal the content...",
+ drivesToHeal, defaultMonitorNewDiskInterval))
+ }
+
+ firstTime := true
+
 // Perform automatic disk healing when a disk is replaced locally.
 for {
 select {
 case <-ctx.Done():
 return
 case <-time.After(defaultMonitorNewDiskInterval):
- // Attempt a heal as the server starts-up first.
- localDisksInZoneHeal := make([]Endpoints, len(z.zones))
- var healNewDisks bool
- for i, ep := range globalEndpoints {
- localDisksToHeal := Endpoints{}
- for _, endpoint := range ep.Endpoints {
- if !endpoint.IsLocal {
- continue
- }
- // Try to connect to the current endpoint
- // and reformat if the current disk is not formatted
- _, _, err := connectEndpoint(endpoint)
- if err == errUnformattedDisk {
- localDisksToHeal = append(localDisksToHeal, endpoint)
- }
- }
- if len(localDisksToHeal) == 0 {
+ // Heal only if new disks are found.
+ if drivesToHeal == 0 {
+ firstTime = false
+ localDisksInZoneHeal = getLocalDisksToHeal(z)
+ drivesToHeal = getDrivesToHealCount(localDisksInZoneHeal)
+ if drivesToHeal == 0 {
+ // No drives to heal.
+ globalBackgroundHealState.updateHealLocalDisks(nil)
 continue
 }
- localDisksInZoneHeal[i] = localDisksToHeal
- healNewDisks = true
+ globalBackgroundHealState.updateHealLocalDisks(localDisksInZoneHeal)
 }
 
- // Reformat disks only if needed.
- if !healNewDisks {
- continue
- }
+ if !firstTime {
+ // Reformat disks
+ bgSeq.sourceCh <- healSource{bucket: SlashSeparator}
 
- logger.Info("New unformatted drives detected attempting to heal...")
- for i, disks := range localDisksInZoneHeal {
- for _, disk := range disks {
- logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1))
- }
+ // Ensure that reformatting disks is finished
+ bgSeq.sourceCh <- healSource{bucket: nopHeal}
 }
 
- // Reformat disks
- bgSeq.sourceCh <- healSource{bucket: SlashSeparator}
-
- // Ensure that reformatting disks is finished
- bgSeq.sourceCh <- healSource{bucket: nopHeal}
-
 var erasureSetInZoneToHeal = make([][]int, len(localDisksInZoneHeal))
 // Compute the list of erasure sets to heal
 for i, localDisksToHeal := range localDisksInZoneHeal {
@@ -108,6 +139,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
 printEndpointError(endpoint, err, true)
 continue
 }
+
 // Calculate the set index where the current endpoint belongs
 setIndex, _, err := findDiskIndex(z.zones[i].format, format)
 if err != nil {
@@ -120,6 +152,13 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
 erasureSetInZoneToHeal[i] = erasureSetToHeal
 }
 
+ logger.Info("New unformatted drives detected, attempting to heal the content...")
+ for i, disks := range localDisksInZoneHeal {
+ for _, disk := range disks {
+ logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1))
+ }
+ }
+
 // Heal all erasure sets that need healing
 for i, erasureSetToHeal := range erasureSetInZoneToHeal {
 for _, setIndex := range erasureSetToHeal {
@@ -127,6 +166,11 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
 if err != nil {
 logger.LogIf(ctx, err)
 }
+
+ // Only upon success reduce the counter
+ if err == nil {
+ drivesToHeal--
+ }
 }
 }
 }
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index ba5fe4f59..ca92b47e0 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -1414,6 +1414,11 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
 return madmin.HealResultItem{}, err
 }
 
+ refFormat, err = getFormatErasureInQuorum(tmpNewFormats)
+ if err != nil {
+ return madmin.HealResultItem{}, err
+ }
+
 // kill the monitoring loop such that we stop writing
 // to indicate that we will re-initialize everything
 // with new format.
diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go
index 7738fdd6d..e33aef92c 100644
--- a/cmd/erasure-zones.go
+++ b/cmd/erasure-zones.go
@@ -2032,6 +2032,7 @@ type HealthOptions struct {
 // was queried
 type HealthResult struct {
 Healthy bool
+ HealingDrives int
 ZoneID, SetID int
 WriteQuorum int
 }
@@ -2086,8 +2087,23 @@ func (z *erasureZones) Health(ctx context.Context, opts HealthOptions) HealthRes
 }
 }
 }
+
+ // Check if local disks are being healed; if they are, report
+ // health as 'false' so that this server is not taken down for
+ // maintenance.
+ aggHealStateResult, err := getAggregatedBackgroundHealState(ctx, true)
+ if err != nil {
+ logger.LogIf(ctx, fmt.Errorf("Unable to verify global heal status: %w", err))
+ return HealthResult{
+ Healthy: false,
+ }
+ }
+
+ healthy := len(aggHealStateResult.HealDisks) == 0
+
 return HealthResult{
- Healthy: true,
+ Healthy: healthy,
+ HealingDrives: len(aggHealStateResult.HealDisks),
 }
 }
diff --git a/cmd/global-heal.go b/cmd/global-heal.go
index 2659bc9cf..f60c9e0d1 100644
--- a/cmd/global-heal.go
+++ b/cmd/global-heal.go
@@ -73,9 +73,17 @@ func getLocalBackgroundHealStatus() madmin.BgHealState {
 return madmin.BgHealState{}
 }
 
+ var healDisks []string
+ for _, eps := range globalBackgroundHealState.getHealLocalDisks() {
+ for _, ep := range eps {
+ healDisks = append(healDisks, ep.String())
+ }
+ }
+
 return madmin.BgHealState{
 ScannedItemsCount: bgSeq.getScannedItemsCount(),
 LastHealActivity: bgSeq.lastHealActivity,
+ HealDisks: healDisks,
 NextHealRound: UTCNow().Add(durationToNextHealRound(bgSeq.lastHealActivity)),
 }
 }
diff --git a/cmd/notification.go b/cmd/notification.go
index d36ee8acc..337ee1651 100644
--- a/cmd/notification.go
+++ b/cmd/notification.go
@@ -269,21 +269,25 @@ func (sys *NotificationSys) LoadServiceAccount(accessKey string) []NotificationP
 }
 
 // BackgroundHealStatus - returns background heal status of all peers
-func (sys *NotificationSys) BackgroundHealStatus() []madmin.BgHealState {
+func (sys *NotificationSys) BackgroundHealStatus() ([]madmin.BgHealState, []NotificationPeerErr) {
+ ng := WithNPeers(len(sys.peerClients))
 states := make([]madmin.BgHealState, len(sys.peerClients))
 for idx, client := range sys.peerClients {
 if client == nil {
 continue
 }
- st, err := client.BackgroundHealStatus()
- if err != nil {
- logger.LogIf(GlobalContext, err)
- } else {
+ client := client
+ ng.Go(GlobalContext, func() error {
+ st, err := client.BackgroundHealStatus()
+ if err != nil {
+ return err
+ }
 states[idx] = st
- }
+ return nil
+ }, idx, *client.host)
 }
- return states
+ return states, ng.Wait()
 }
 
 // StartProfiling - start profiling on remote peers, by initiating a remote RPC.
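Note: BackgroundHealStatus above switches from serial, log-and-drop peer calls to a concurrent fan-out that keeps one result and one error slot per peer, which is what lets getAggregatedBackgroundHealState fail hard when failOnErr is set. A minimal standalone sketch of the same pattern follows; peerResult, queryPeers and the example addresses are hypothetical stand-ins for illustration, not MinIO APIs.

package main

import (
    "errors"
    "fmt"
    "sync"
)

// peerResult pairs one peer's reply with the error it returned,
// kept by index so results line up with the peer list.
type peerResult struct {
    state string // stand-in for madmin.BgHealState
    err   error
}

// queryPeers contacts every peer concurrently and preserves
// per-peer results by index, mirroring WithNPeers/ng.Go/ng.Wait.
func queryPeers(peers []string, query func(string) (string, error)) []peerResult {
    results := make([]peerResult, len(peers))
    var wg sync.WaitGroup
    for i, peer := range peers {
        wg.Add(1)
        go func(i int, peer string) {
            defer wg.Done()
            st, err := query(peer)
            results[i] = peerResult{state: st, err: err}
        }(i, peer)
    }
    wg.Wait()
    return results
}

func main() {
    peers := []string{"node1:9000", "node2:9000", "node3:9000"}
    results := queryPeers(peers, func(p string) (string, error) {
        if p == "node2:9000" {
            return "", errors.New("connection refused")
        }
        return "idle", nil
    })
    // A caller in "fail on error" mode would abort on the first
    // non-nil err; a best-effort caller would log it and keep going.
    for i, r := range results {
        fmt.Printf("%s state=%q err=%v\n", peers[i], r.state, r.err)
    }
}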
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 03f4c3099..0083fa178 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -2103,9 +2103,6 @@ func (s *xlStorage) RenameData(srcVolume, srcPath, dataDir, dstVolume, dstPath s
 legacyDataPath := pathJoin(dstVolumeDir, dstPath, legacyDataDir)
 // legacy data dir means it's old content, honor system umask.
 if err = os.MkdirAll(legacyDataPath, 0777); err != nil {
- if isSysErrIO(err) {
- return errFaultyDisk
- }
 return osErrToFileErr(err)
 }
 
@@ -2116,14 +2113,11 @@ func (s *xlStorage) RenameData(srcVolume, srcPath, dataDir, dstVolume, dstPath s
 
 for _, entry := range entries {
 // Skip xl.meta renames further, also ignore any directories such as `legacyDataDir`
- if entry == xlStorageFormatFile || strings.HasPrefix(entry, SlashSeparator) {
+ if entry == xlStorageFormatFile || strings.HasSuffix(entry, slashSeparator) {
 continue
 }
 
 if err = os.Rename(pathJoin(currentDataPath, entry), pathJoin(legacyDataPath, entry)); err != nil {
- if isSysErrIO(err) {
- return errFaultyDisk
- }
 return osErrToFileErr(err)
 }
 }
@@ -2159,20 +2153,14 @@ func (s *xlStorage) RenameData(srcVolume, srcPath, dataDir, dstVolume, dstPath s
 }
 
 if err = renameAll(srcFilePath, dstFilePath); err != nil {
- if isSysErrIO(err) {
- return errFaultyDisk
- }
- return err
+ return osErrToFileErr(err)
 }
 
 if srcDataPath != "" {
 removeAll(oldDstDataPath)
 removeAll(dstDataPath)
 if err = renameAll(srcDataPath, dstDataPath); err != nil {
- if isSysErrIO(err) {
- return errFaultyDisk
- }
- return err
+ return osErrToFileErr(err)
 }
 }
 
@@ -2265,10 +2253,7 @@ func (s *xlStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (e
 }
 
 if err = renameAll(srcFilePath, dstFilePath); err != nil {
- if isSysErrIO(err) {
- return errFaultyDisk
- }
- return err
+ return osErrToFileErr(err)
 }
 
 // Remove parent dir of the source file if empty
diff --git a/pkg/madmin/heal-commands.go b/pkg/madmin/heal-commands.go
index d5a22c10b..689c5a848 100644
--- a/pkg/madmin/heal-commands.go
+++ b/pkg/madmin/heal-commands.go
@@ -294,6 +294,7 @@ type BgHealState struct {
 ScannedItemsCount int64
 LastHealActivity time.Time
 NextHealRound time.Time
+ HealDisks []string
 }
 
 // BackgroundHealStatus returns the background heal status of the
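Note: the merge rules implemented by getAggregatedBackgroundHealState in cmd/admin-handlers.go can be summarized as: sum ScannedItemsCount, concatenate HealDisks, and take LastHealActivity/NextHealRound from whichever node reported the most recent heal activity. A minimal standalone sketch under those assumptions follows; the struct is trimmed to the fields this patch touches and is not the actual madmin type.

package main

import (
    "fmt"
    "time"
)

// bgHealState mirrors the madmin.BgHealState fields touched by this patch.
type bgHealState struct {
    ScannedItemsCount int64
    LastHealActivity  time.Time
    NextHealRound     time.Time
    HealDisks         []string
}

// aggregate folds per-node heal states into one cluster-wide view.
func aggregate(states []bgHealState) bgHealState {
    if len(states) == 0 {
        return bgHealState{}
    }
    out := states[0]
    for _, s := range states[1:] {
        out.ScannedItemsCount += s.ScannedItemsCount
        out.HealDisks = append(out.HealDisks, s.HealDisks...)
        if !s.LastHealActivity.IsZero() && out.LastHealActivity.Before(s.LastHealActivity) {
            // The node with the most recent heal activity also has
            // the most accurate next heal round.
            out.LastHealActivity = s.LastHealActivity
            out.NextHealRound = s.NextHealRound
        }
    }
    return out
}

func main() {
    now := time.Now()
    merged := aggregate([]bgHealState{
        {ScannedItemsCount: 10, LastHealActivity: now.Add(-time.Hour)},
        {ScannedItemsCount: 5, LastHealActivity: now,
            HealDisks: []string{"http://node2:9000/data1"}},
    })
    // The new maintenance health check boils down to this test: a
    // node with drives still healing reports Healthy == false.
    fmt.Printf("scanned=%d healing=%d healthy=%v\n",
        merged.ScannedItemsCount, len(merged.HealDisks), len(merged.HealDisks) == 0)
}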