re-attach offline drive after new drive replacement (#10416)

inconsistent drive healing when one of the drive is offline
while a new drive was replaced, this change is to ensure
that we can add the offline drive back into the mix by
healing it again.
master
Harshavardhana 4 years ago committed by GitHub
parent eb19c8af40
commit b0e1d4ce78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 38
      cmd/admin-heal-ops.go
  2. 134
      cmd/background-newdisks-heal-ops.go
  3. 16
      cmd/endpoint.go
  4. 64
      cmd/erasure-sets.go
  5. 43
      cmd/erasure-zones.go
  6. 11
      cmd/global-heal.go
  7. 4
      cmd/healthcheck-handler.go

@ -89,13 +89,14 @@ type allHealState struct {
// map of heal path to heal sequence
healSeqMap map[string]*healSequence
healLocalDisks []Endpoints
healLocalDisks map[Endpoint]struct{}
}
// newHealState - initialize global heal state management
func newHealState() *allHealState {
healState := &allHealState{
healSeqMap: make(map[string]*healSequence),
healSeqMap: make(map[string]*healSequence),
healLocalDisks: map[Endpoint]struct{}{},
}
go healState.periodicHealSeqsClean(GlobalContext)
@ -103,20 +104,43 @@ func newHealState() *allHealState {
return healState
}
func (ahs *allHealState) getHealLocalDisks() []Endpoints {
func (ahs *allHealState) healDriveCount() int {
ahs.Lock()
defer ahs.Unlock()
healLocalDisks := make([]Endpoints, len(ahs.healLocalDisks))
copy(healLocalDisks, ahs.healLocalDisks)
fmt.Println(ahs.healLocalDisks)
return len(ahs.healLocalDisks)
}
func (ahs *allHealState) getHealLocalDisks() Endpoints {
ahs.Lock()
defer ahs.Unlock()
var healLocalDisks Endpoints
for ep := range ahs.healLocalDisks {
healLocalDisks = append(healLocalDisks, ep)
}
return healLocalDisks
}
func (ahs *allHealState) updateHealLocalDisks(healLocalDisks []Endpoints) {
func (ahs *allHealState) popHealLocalDisks(healLocalDisks ...Endpoint) {
ahs.Lock()
defer ahs.Unlock()
ahs.healLocalDisks = healLocalDisks
for _, ep := range healLocalDisks {
delete(ahs.healLocalDisks, ep)
}
fmt.Println(ahs.healLocalDisks)
}
func (ahs *allHealState) pushHealLocalDisks(healLocalDisks ...Endpoint) {
ahs.Lock()
defer ahs.Unlock()
for _, ep := range healLocalDisks {
ahs.healLocalDisks[ep] = struct{}{}
}
fmt.Println(ahs.healLocalDisks)
}
func (ahs *allHealState) periodicHealSeqsClean(ctx context.Context) {

@ -26,7 +26,7 @@ import (
"github.com/minio/minio/cmd/logger"
)
const defaultMonitorNewDiskInterval = time.Minute * 3
const defaultMonitorNewDiskInterval = time.Second * 10
func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
z, ok := objAPI.(*erasureZones)
@ -36,15 +36,6 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
initBackgroundHealing(ctx, objAPI) // start quick background healing
localDisksInZoneHeal := getLocalDisksToHeal(objAPI)
globalBackgroundHealState.updateHealLocalDisks(localDisksInZoneHeal)
drivesToHeal := getDrivesToHealCount(localDisksInZoneHeal)
if drivesToHeal != 0 {
logger.Info(fmt.Sprintf("Found drives to heal %d, waiting until %s to heal the content...",
drivesToHeal, defaultMonitorNewDiskInterval))
}
var bgSeq *healSequence
var found bool
@ -56,7 +47,14 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
time.Sleep(time.Second)
}
if drivesToHeal != 0 {
for _, ep := range getLocalDisksToHeal() {
globalBackgroundHealState.pushHealLocalDisks(ep)
}
if drivesToHeal := globalBackgroundHealState.healDriveCount(); drivesToHeal > 0 {
logger.Info(fmt.Sprintf("Found drives to heal %d, waiting until %s to heal the content...",
drivesToHeal, defaultMonitorNewDiskInterval))
// Heal any disk format and metadata early, if possible.
if err := bgSeq.healDiskMeta(); err != nil {
if newObjectLayerFn() != nil {
@ -67,19 +65,11 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
}
}
go monitorLocalDisksAndHeal(ctx, z, drivesToHeal, localDisksInZoneHeal, bgSeq)
go monitorLocalDisksAndHeal(ctx, z, bgSeq)
}
func getLocalDisksToHeal(objAPI ObjectLayer) []Endpoints {
z, ok := objAPI.(*erasureZones)
if !ok {
return nil
}
// Attempt a heal as the server starts-up first.
localDisksInZoneHeal := make([]Endpoints, len(z.zones))
for i, ep := range globalEndpoints {
localDisksToHeal := Endpoints{}
func getLocalDisksToHeal() (disksToHeal Endpoints) {
for _, ep := range globalEndpoints {
for _, endpoint := range ep.Endpoints {
if !endpoint.IsLocal {
continue
@ -88,28 +78,14 @@ func getLocalDisksToHeal(objAPI ObjectLayer) []Endpoints {
// and reformat if the current disk is not formatted
_, _, err := connectEndpoint(endpoint)
if errors.Is(err, errUnformattedDisk) {
localDisksToHeal = append(localDisksToHeal, endpoint)
disksToHeal = append(disksToHeal, endpoint)
}
}
if len(localDisksToHeal) == 0 {
continue
}
localDisksInZoneHeal[i] = localDisksToHeal
}
return localDisksInZoneHeal
return disksToHeal
}
func getDrivesToHealCount(localDisksInZoneHeal []Endpoints) int {
var drivesToHeal int
for _, eps := range localDisksInZoneHeal {
for range eps {
drivesToHeal++
}
}
return drivesToHeal
}
func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
// Run the background healer
globalBackgroundHealRoutine = newHealRoutine()
@ -121,77 +97,65 @@ func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
// monitorLocalDisksAndHeal - ensures that detected new disks are healed
// 1. Only the concerned erasure set will be listed and healed
// 2. Only the node hosting the disk is responsible to perform the heal
func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, drivesToHeal int, localDisksInZoneHeal []Endpoints, bgSeq *healSequence) {
func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healSequence) {
// Perform automatic disk healing when a disk is replaced locally.
for {
select {
case <-ctx.Done():
return
case <-time.After(defaultMonitorNewDiskInterval):
// heal only if new disks found.
if drivesToHeal == 0 {
localDisksInZoneHeal = getLocalDisksToHeal(z)
drivesToHeal = getDrivesToHealCount(localDisksInZoneHeal)
if drivesToHeal == 0 {
// No drives to heal.
globalBackgroundHealState.updateHealLocalDisks(nil)
continue
}
globalBackgroundHealState.updateHealLocalDisks(localDisksInZoneHeal)
waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()), time.Second)
var erasureSetInZoneEndpointToHeal = make([]map[int]Endpoint, len(z.zones))
for i := range z.zones {
erasureSetInZoneEndpointToHeal[i] = map[int]Endpoint{}
}
healDisks := globalBackgroundHealState.getHealLocalDisks()
// heal only if new disks found.
for _, endpoint := range healDisks {
logger.Info(fmt.Sprintf("Found drives to heal %d, proceeding to heal content...",
drivesToHeal))
len(healDisks)))
// Reformat disks
bgSeq.sourceCh <- healSource{bucket: SlashSeparator}
// Ensure that reformatting disks is finished
bgSeq.sourceCh <- healSource{bucket: nopHeal}
}
var erasureSetInZoneToHeal = make([][]int, len(localDisksInZoneHeal))
// Compute the list of erasure set to heal
for i, localDisksToHeal := range localDisksInZoneHeal {
var erasureSetToHeal []int
for _, endpoint := range localDisksToHeal {
// Load the new format of this passed endpoint
_, format, err := connectEndpoint(endpoint)
if err != nil {
printEndpointError(endpoint, err, true)
continue
}
// Calculate the set index where the current endpoint belongs
setIndex, _, err := findDiskIndex(z.zones[i].format, format)
if err != nil {
printEndpointError(endpoint, err, false)
continue
}
// Load the new format of this passed endpoint
_, format, err := connectEndpoint(endpoint)
if err != nil {
printEndpointError(endpoint, err, true)
continue
}
erasureSetToHeal = append(erasureSetToHeal, setIndex)
zoneIdx := globalEndpoints.GetLocalZoneIdx(endpoint)
if zoneIdx < 0 {
continue
}
erasureSetInZoneToHeal[i] = erasureSetToHeal
}
logger.Info("New unformatted drives detected attempting to heal the content...")
for i, disks := range localDisksInZoneHeal {
for _, disk := range disks {
logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1))
// Calculate the set index where the current endpoint belongs
setIndex, _, err := findDiskIndex(z.zones[zoneIdx].format, format)
if err != nil {
printEndpointError(endpoint, err, false)
continue
}
erasureSetInZoneEndpointToHeal[zoneIdx][setIndex] = endpoint
}
// Heal all erasure sets that need
for i, erasureSetToHeal := range erasureSetInZoneToHeal {
for _, setIndex := range erasureSetToHeal {
err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].setDriveCount)
if err != nil {
for i, setMap := range erasureSetInZoneEndpointToHeal {
for setIndex, endpoint := range setMap {
logger.Info("Healing disk '%s' on %s zone", endpoint, humanize.Ordinal(i+1))
if err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].setDriveCount); err != nil {
logger.LogIf(ctx, err)
continue
}
// Only upon success reduce the counter
if err == nil {
drivesToHeal--
}
// Only upon success pop the healed disk.
globalBackgroundHealState.popHealLocalDisks(endpoint)
}
}
}

@ -24,6 +24,7 @@ import (
"net/url"
"path"
"path/filepath"
"reflect"
"runtime"
"strconv"
"strings"
@ -203,6 +204,21 @@ type ZoneEndpoints struct {
// EndpointZones - list of list of endpoints
type EndpointZones []ZoneEndpoints
// GetLocalZoneIdx returns the zone which endpoint belongs to locally.
// if ep is remote this code will return -1 zoneIndex
func (l EndpointZones) GetLocalZoneIdx(ep Endpoint) int {
for i, zep := range l {
for _, cep := range zep.Endpoints {
if cep.IsLocal && ep.IsLocal {
if reflect.DeepEqual(cep, ep) {
return i
}
}
}
}
return -1
}
// Add add zone endpoints
func (l *EndpointZones) Add(zeps ZoneEndpoints) error {
existSet := set.NewStringSet()

@ -137,13 +137,10 @@ func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, error) {
format, err := loadFormatErasure(disk)
if err != nil {
// Close the internal connection to avoid connection leaks.
disk.Close()
if errors.Is(err, errUnformattedDisk) {
info, derr := disk.DiskInfo(context.TODO())
if derr != nil && info.RootDisk {
return nil, nil, fmt.Errorf("Disk: %s returned %w but its a root disk refusing to use it",
disk, derr) // make sure to '%w' to wrap the error
return nil, nil, fmt.Errorf("Disk: %s returned %w", disk, derr) // make sure to '%w' to wrap the error
}
}
return nil, nil, fmt.Errorf("Disk: %s returned %w", disk, err) // make sure to '%w' to wrap the error
@ -213,14 +210,22 @@ func (s *erasureSets) connectDisks() {
defer wg.Done()
disk, format, err := connectEndpoint(endpoint)
if err != nil {
printEndpointError(endpoint, err, true)
if endpoint.IsLocal && errors.Is(err, errUnformattedDisk) {
logger.Info(fmt.Sprintf("Found unformatted drive %s, attempting to heal...", endpoint))
globalBackgroundHealState.pushHealLocalDisks(endpoint)
} else {
printEndpointError(endpoint, err, true)
}
return
}
setIndex, diskIndex, err := findDiskIndex(s.format, format)
if err != nil {
// Close the internal connection to avoid connection leaks.
disk.Close()
printEndpointError(endpoint, err, false)
if endpoint.IsLocal {
globalBackgroundHealState.pushHealLocalDisks(endpoint)
logger.Info(fmt.Sprintf("Found inconsistent drive %s with format.json, attempting to heal...", endpoint))
} else {
printEndpointError(endpoint, err, false)
}
return
}
disk.SetDiskID(format.Erasure.This)
@ -291,7 +296,9 @@ func (s *erasureSets) GetDisks(setIndex int) func() []StorageAPI {
}
}
const defaultMonitorConnectEndpointInterval = time.Second * 10 // Set to 10 secs.
// defaultMonitorConnectEndpointInterval is the interval to monitor endpoint connections.
// Must be bigger than defaultMonitorNewDiskInterval.
const defaultMonitorConnectEndpointInterval = defaultMonitorNewDiskInterval + time.Second*5
// Initialize new set of erasure coded sets.
func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageAPI, format *formatErasureV3) (*erasureSets, error) {
@ -342,12 +349,10 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
}
diskID, derr := disk.GetDiskID()
if derr != nil {
disk.Close()
continue
}
m, n, err := findDiskIndexByDiskID(format, diskID)
if err != nil {
disk.Close()
continue
}
s.endpointStrings[m*setDriveCount+n] = disk.String()
@ -1218,13 +1223,11 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error)
diskID, err := disk.GetDiskID()
if err != nil {
disk.Close()
continue
}
m, n, err := findDiskIndexByDiskID(refFormat, diskID)
if err != nil {
disk.Close()
continue
}
@ -1248,17 +1251,14 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error)
func isTestSetup(infos []DiskInfo, errs []error) bool {
rootDiskCount := 0
for i := range errs {
if errs[i] != nil && errs[i] != errUnformattedDisk {
// On any error which is not unformatted disk
// it is safer to reject healing.
return false
}
if infos[i].RootDisk {
rootDiskCount++
if errs[i] == nil || errs[i] == errUnformattedDisk {
if infos[i].RootDisk {
rootDiskCount++
}
}
}
// It is a test setup if all disks are root disks.
return rootDiskCount == len(infos)
// It is a test setup if all disks are root disks in quorum.
return rootDiskCount >= len(infos)/2+1
}
func getHealDiskInfos(storageDisks []StorageAPI, errs []error) ([]DiskInfo, []error) {
@ -1321,6 +1321,19 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
// Mark all root disks down
markRootDisksAsDown(storageDisks, sErrs)
refFormat, err := getFormatErasureInQuorum(formats)
if err != nil {
return res, err
}
for i, format := range formats {
if format != nil {
if ferr := formatErasureV3Check(refFormat, format); ferr != nil {
sErrs[i] = errUnformattedDisk
}
}
}
// Prepare heal-result
res = madmin.HealResultItem{
Type: madmin.HealItemMetadata,
@ -1346,11 +1359,6 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
return res, errNoHealRequired
}
refFormat, err := getFormatErasureInQuorum(formats)
if err != nil {
return res, err
}
// Mark all UUIDs which might be offline, use list
// of formats to mark them appropriately.
markUUIDsOffline(refFormat, formats)
@ -1424,13 +1432,11 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
diskID, err := disk.GetDiskID()
if err != nil {
disk.Close()
continue
}
m, n, err := findDiskIndexByDiskID(refFormat, diskID)
if err != nil {
disk.Close()
continue
}

@ -2056,6 +2056,25 @@ func (z *erasureZones) Health(ctx context.Context, opts HealthOptions) HealthRes
writeQuorum++
}
var aggHealStateResult madmin.BgHealState
if opts.Maintenance {
// check if local disks are being healed, if they are being healed
// we need to tell healthy status as 'false' so that this server
// is not taken down for maintenance
var err error
aggHealStateResult, err = getAggregatedBackgroundHealState(ctx)
if err != nil {
logger.LogIf(logger.SetReqInfo(ctx, reqInfo), fmt.Errorf("Unable to verify global heal status: %w", err))
return HealthResult{
Healthy: false,
}
}
if len(aggHealStateResult.HealDisks) > 0 {
logger.LogIf(logger.SetReqInfo(ctx, reqInfo), fmt.Errorf("Total drives to be healed %d", len(aggHealStateResult.HealDisks)))
}
}
for zoneIdx := range erasureSetUpCount {
for setIdx := range erasureSetUpCount[zoneIdx] {
if erasureSetUpCount[zoneIdx][setIdx] < writeQuorum {
@ -2063,10 +2082,11 @@ func (z *erasureZones) Health(ctx context.Context, opts HealthOptions) HealthRes
fmt.Errorf("Write quorum may be lost on zone: %d, set: %d, expected write quorum: %d",
zoneIdx, setIdx, writeQuorum))
return HealthResult{
Healthy: false,
ZoneID: zoneIdx,
SetID: setIdx,
WriteQuorum: writeQuorum,
Healthy: false,
HealingDrives: len(aggHealStateResult.HealDisks),
ZoneID: zoneIdx,
SetID: setIdx,
WriteQuorum: writeQuorum,
}
}
}
@ -2081,21 +2101,6 @@ func (z *erasureZones) Health(ctx context.Context, opts HealthOptions) HealthRes
}
}
// check if local disks are being healed, if they are being healed
// we need to tell healthy status as 'false' so that this server
// is not taken down for maintenance
aggHealStateResult, err := getAggregatedBackgroundHealState(ctx)
if err != nil {
logger.LogIf(logger.SetReqInfo(ctx, reqInfo), fmt.Errorf("Unable to verify global heal status: %w", err))
return HealthResult{
Healthy: false,
}
}
if len(aggHealStateResult.HealDisks) > 0 {
logger.LogIf(logger.SetReqInfo(ctx, reqInfo), fmt.Errorf("Total drives to be healed %d", len(aggHealStateResult.HealDisks)))
}
return HealthResult{
Healthy: len(aggHealStateResult.HealDisks) == 0,
HealingDrives: len(aggHealStateResult.HealDisks),

@ -73,11 +73,14 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) {
return madmin.BgHealState{}, false
}
objAPI := newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
return madmin.BgHealState{}, false
}
var healDisks []string
for _, eps := range globalBackgroundHealState.getHealLocalDisks() {
for _, ep := range eps {
healDisks = append(healDisks, ep.String())
}
for _, ep := range getLocalDisksToHeal() {
healDisks = append(healDisks, ep.String())
}
return madmin.BgHealState{

@ -43,7 +43,9 @@ func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) {
}
if !result.Healthy {
// return how many drives are being healed if any
w.Header().Set("X-Minio-Healing-Drives", strconv.Itoa(result.HealingDrives))
if result.HealingDrives > 0 {
w.Header().Set("X-Minio-Healing-Drives", strconv.Itoa(result.HealingDrives))
}
// As a maintenance call we are purposefully asked to be taken
// down, this is for orchestrators to know if we can safely
// take this server down, return appropriate error.

Loading…
Cancel
Save