fix: background disk heal, to reload format consistently (#10502)

In a VMware vSphere environment it was observed that, during a pod
replacement, `mc admin info` might incorrectly report nodes as offline
for the replaced drive. The issue eventually goes away, but it takes
quite a lot of time for all servers to get back in sync.

This PR fixes the behavior by making all servers reload the format consistently.
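The gist of the change, as a minimal self-contained sketch (`healFormatAndNotify`, `healFormatLocally`, `reloadFormatOnPeers`, and `peerErr` are hypothetical stand-ins for MinIO internals such as `globalNotificationSys.ReloadFormat`, not the actual code): the format heal and the peer notification now happen as one synchronous sequence, instead of a fire-and-forget goroutine that retried on `errServerNotInitialized`.

```go
package main

import "fmt"

// peerErr mimics the shape of MinIO's per-peer notification errors.
type peerErr struct {
	Host string
	Err  error
}

// healFormatLocally is a stub for the real format heal (HealFormat).
func healFormatLocally(dryRun bool) error { return nil }

// reloadFormatOnPeers is a stub for globalNotificationSys.ReloadFormat.
func reloadFormatOnPeers(dryRun bool) []peerErr {
	return []peerErr{{Host: "peer-1:9000"}, {Host: "peer-2:9000"}}
}

// healFormatAndNotify mirrors the new call order: heal first, then notify
// every peer once and log failures, rather than retrying forever from a
// background goroutine.
func healFormatAndNotify(dryRun bool) error {
	if err := healFormatLocally(dryRun); err != nil {
		return err
	}
	for _, nerr := range reloadFormatOnPeers(dryRun) {
		if nerr.Err != nil {
			fmt.Printf("peer %s failed to reload format: %v\n", nerr.Host, nerr.Err)
		}
	}
	return nil
}

func main() {
	fmt.Println(healFormatAndNotify(false)) // <nil>
}
```

The stubbed bodies are beside the point; the control flow is what the diffs below converge on.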
Branch: master
Author: Harshavardhana (committed by GitHub)
Parent: d616d8a857
Commit: e60834838f
Changed files:

1. cmd/background-heal-ops.go (20 lines changed)
2. cmd/background-newdisks-heal-ops.go (43 lines changed)
3. cmd/erasure-sets.go (32 lines changed)
4. cmd/erasure-zones.go (25 lines changed)
5. cmd/erasure.go (9 lines changed)
6. cmd/peer-rest-client.go (6 lines changed)
7. cmd/rest/client.go (15 lines changed)
8. cmd/storage-rest-server.go (2 lines changed)

cmd/background-heal-ops.go

@@ -21,7 +21,6 @@ import (
 	"path"
 	"time"
 
-	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/madmin"
 )
@@ -129,24 +128,5 @@ func healDiskFormat(ctx context.Context, objAPI ObjectLayer, opts madmin.HealOpt
 		return madmin.HealResultItem{}, err
 	}
 
-	// Healing succeeded notify the peers to reload format and re-initialize disks.
-	// We will not notify peers if healing is not required.
-	if err == nil {
-		// Notify servers in background and retry if needed.
-		go func() {
-		retry:
-			for _, nerr := range globalNotificationSys.ReloadFormat(opts.DryRun) {
-				if nerr.Err != nil {
-					if nerr.Err.Error() == errServerNotInitialized.Error() {
-						time.Sleep(time.Second)
-						goto retry
-					}
-					logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
-					logger.LogIf(ctx, nerr.Err)
-				}
-			}
-		}()
-	}
-
 	return res, nil
 }
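A side note on why the deleted retry loop was fragile: it matched errors by comparing `Error()` strings. The sketch below (`errServerNotInitialized` here is a local stand-in for MinIO's sentinel of the same name) shows how string comparison breaks as soon as the error is wrapped, while `errors.Is`, which the rest of this PR moves to, still matches.

```go
package main

import (
	"errors"
	"fmt"
)

var errServerNotInitialized = errors.New("server not initialized")

func main() {
	// Wrapping adds context, so the string equality used by the deleted
	// retry loop stops matching even though the underlying cause is the
	// same; errors.Is unwraps and still matches.
	err := fmt.Errorf("peer call failed: %w", errServerNotInitialized)
	fmt.Println(err.Error() == errServerNotInitialized.Error()) // false
	fmt.Println(errors.Is(err, errServerNotInitialized))        // true
}
```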

cmd/background-newdisks-heal-ops.go

@@ -106,23 +106,26 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healS
 		case <-time.After(defaultMonitorNewDiskInterval):
 			waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()), time.Second)
 
-			var erasureSetInZoneEndpointToHeal = make([]map[int]Endpoint, len(z.zones))
-			for i := range z.zones {
-				erasureSetInZoneEndpointToHeal[i] = map[int]Endpoint{}
-			}
-
+			var erasureSetInZoneEndpointToHeal []map[int]Endpoints
 			healDisks := globalBackgroundHealState.getHealLocalDisks()
-			// heal only if new disks found.
-			for _, endpoint := range healDisks {
-				logger.Info(fmt.Sprintf("Found drives to heal %d, proceeding to heal content...",
-					len(healDisks)))
-
+			if len(healDisks) > 0 {
 				// Reformat disks
 				bgSeq.sourceCh <- healSource{bucket: SlashSeparator}
 
 				// Ensure that reformatting disks is finished
 				bgSeq.sourceCh <- healSource{bucket: nopHeal}
 
+				logger.Info(fmt.Sprintf("Found drives to heal %d, proceeding to heal content...",
+					len(healDisks)))
+
+				erasureSetInZoneEndpointToHeal = make([]map[int]Endpoints, len(z.zones))
+				for i := range z.zones {
+					erasureSetInZoneEndpointToHeal[i] = map[int]Endpoints{}
+				}
+			}
+
+			// heal only if new disks found.
+			for _, endpoint := range healDisks {
 				// Load the new format of this passed endpoint
 				_, format, err := connectEndpoint(endpoint)
 				if err != nil {
if err != nil {
@@ -142,20 +145,22 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healS
 				continue
 			}
 
-			erasureSetInZoneEndpointToHeal[zoneIdx][setIndex] = endpoint
+			erasureSetInZoneEndpointToHeal[zoneIdx][setIndex] = append(erasureSetInZoneEndpointToHeal[zoneIdx][setIndex], endpoint)
 		}
 
 		for i, setMap := range erasureSetInZoneEndpointToHeal {
-			for setIndex, endpoint := range setMap {
-				logger.Info("Healing disk '%s' on %s zone", endpoint, humanize.Ordinal(i+1))
+			for setIndex, endpoints := range setMap {
+				for _, ep := range endpoints {
+					logger.Info("Healing disk '%s' on %s zone", ep, humanize.Ordinal(i+1))
 
-				if err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].setDriveCount); err != nil {
-					logger.LogIf(ctx, err)
-					continue
-				}
+					if err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].setDriveCount); err != nil {
+						logger.LogIf(ctx, err)
+						continue
+					}
 
-				// Only upon success pop the healed disk.
-				globalBackgroundHealState.popHealLocalDisks(endpoint)
+					// Only upon success pop the healed disk.
+					globalBackgroundHealState.popHealLocalDisks(ep)
+				}
 			}
 		}
 	}
 }
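The data-structure change above is the core fix in this file: the per-set value goes from a single `Endpoint` to an `Endpoints` slice, so when several replaced drives land in the same erasure set, none of them is silently dropped. A minimal sketch of the append pattern (`Endpoints` simplified to a string slice, URLs invented for the example):

```go
package main

import "fmt"

// Endpoints is a slice type, as in MinIO; the heal planner now collects
// *all* endpoints per erasure set instead of keeping only the last one.
type Endpoints []string

func main() {
	setsToHeal := map[int]Endpoints{}
	for _, ep := range []struct {
		set int
		url string
	}{{0, "http://n1/d1"}, {0, "http://n1/d2"}, {1, "http://n2/d1"}} {
		// append preserves earlier endpoints; the old plain assignment
		// silently kept only the last drive found for a set.
		setsToHeal[ep.set] = append(setsToHeal[ep.set], ep.url)
	}
	fmt.Println(setsToHeal) // map[0:[http://n1/d1 http://n1/d2] 1:[http://n2/d1]]
}
```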

cmd/erasure-sets.go

@@ -80,9 +80,6 @@ type erasureSets struct {
 	disksConnectEvent chan diskConnectInfo
 
-	// Done channel to control monitoring loop.
-	disksConnectDoneCh chan struct{}
-
 	// Distribution algorithm of choice.
 	distributionAlgo string
 
 	deploymentID [16]byte
@@ -115,7 +112,7 @@ func (s *erasureSets) getDiskMap() map[string]StorageAPI {
 	for i := 0; i < s.setCount; i++ {
 		for j := 0; j < s.setDriveCount; j++ {
 			disk := s.erasureDisks[i][j]
-			if disk == nil {
+			if disk == OfflineDisk {
 				continue
 			}
 			if !disk.IsOnline() {
@@ -211,14 +208,16 @@ func (s *erasureSets) connectDisks() {
 		disk, format, err := connectEndpoint(endpoint)
 		if err != nil {
 			if endpoint.IsLocal && errors.Is(err, errUnformattedDisk) {
-				logger.Info(fmt.Sprintf("Found unformatted drive %s, attempting to heal...", endpoint))
 				globalBackgroundHealState.pushHealLocalDisks(endpoint)
+				logger.Info(fmt.Sprintf("Found unformatted drive %s, attempting to heal...", endpoint))
 			} else {
 				printEndpointError(endpoint, err, true)
 			}
 			return
 		}
+		s.erasureDisksMu.RLock()
 		setIndex, diskIndex, err := findDiskIndex(s.format, format)
+		s.erasureDisksMu.RUnlock()
 		if err != nil {
 			if endpoint.IsLocal {
 				globalBackgroundHealState.pushHealLocalDisks(endpoint)
@@ -256,8 +255,6 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt
 		select {
 		case <-ctx.Done():
 			return
-		case <-s.disksConnectDoneCh:
-			return
 		case <-time.After(monitorInterval):
 			s.connectDisks()
 		}
@@ -318,7 +315,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 		listTolerancePerSet: setDriveCount / 2,
 		format:              format,
 		disksConnectEvent:   make(chan diskConnectInfo),
-		disksConnectDoneCh:  make(chan struct{}),
 		distributionAlgo:    format.Erasure.DistributionAlgo,
 		deploymentID:        uuid.MustParse(format.ID),
 		pool:                NewMergeWalkPool(globalMergeLookupTimeout),
@@ -1191,16 +1187,12 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error)
 		return err
 	}
 
-	// kill the monitoring loop such that we stop writing
-	// to indicate that we will re-initialize everything
-	// with new format.
-	s.disksConnectDoneCh <- struct{}{}
+	s.erasureDisksMu.Lock()
 
 	// Replace with new reference format.
 	s.format = refFormat
 
 	// Close all existing disks and reconnect all the disks.
-	s.erasureDisksMu.Lock()
 	for _, disk := range storageDisks {
 		if disk == nil {
 			continue
@@ -1223,10 +1215,8 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error)
 			s.endpointStrings[m*s.setDriveCount+n] = disk.String()
 			s.erasureDisks[m][n] = disk
 		}
-	s.erasureDisksMu.Unlock()
-
-	// Restart monitoring loop to monitor reformatted disks again.
-	go s.monitorAndConnectEndpoints(GlobalContext, defaultMonitorConnectEndpointInterval)
+	s.erasureDisksMu.Unlock()
 
 	return nil
 }
@@ -1400,16 +1390,12 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
 		return madmin.HealResultItem{}, err
 	}
 
-	// kill the monitoring loop such that we stop writing
-	// to indicate that we will re-initialize everything
-	// with new format.
-	s.disksConnectDoneCh <- struct{}{}
+	s.erasureDisksMu.Lock()
 
 	// Replace with new reference format.
 	s.format = refFormat
 
 	// Disconnect/relinquish all existing disks, lockers and reconnect the disks, lockers.
-	s.erasureDisksMu.Lock()
 	for _, disk := range storageDisks {
 		if disk == nil {
 			continue
@@ -1432,10 +1418,8 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
 			s.endpointStrings[m*s.setDriveCount+n] = disk.String()
 			s.erasureDisks[m][n] = disk
 		}
-		s.erasureDisksMu.Unlock()
-
-		// Restart our monitoring loop to start monitoring newly formatted disks.
-		go s.monitorAndConnectEndpoints(GlobalContext, defaultMonitorConnectEndpointInterval)
+		s.erasureDisksMu.Unlock()
 	}
 
 	return res, nil
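Both ReloadFormat and HealFormat now follow the same scheme: instead of stopping the monitoring goroutine over `disksConnectDoneCh` and restarting it after the swap, the goroutine keeps running and the reference format is swapped under `erasureDisksMu`. A self-contained sketch of that pattern (type, field names, and intervals invented for the example):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// sets stands in for erasureSets: one long-lived monitor goroutine plus
// shared state that reloads may swap at any time.
type sets struct {
	mu     sync.RWMutex
	format string
}

// monitor never needs to be stopped and restarted around a reload: it
// takes a read lock on every use of the shared state.
func (s *sets) monitor(interval time.Duration, done <-chan struct{}) {
	for {
		select {
		case <-done:
			return
		case <-time.After(interval):
			s.mu.RLock()
			f := s.format
			s.mu.RUnlock()
			fmt.Println("connecting disks against format", f)
		}
	}
}

// reloadFormat swaps the reference format under the write lock; the
// monitor picks it up on its next tick.
func (s *sets) reloadFormat(newFormat string) {
	s.mu.Lock()
	s.format = newFormat
	s.mu.Unlock()
}

func main() {
	s := &sets{format: "v1"}
	done := make(chan struct{})
	go s.monitor(10*time.Millisecond, done)
	time.Sleep(25 * time.Millisecond)
	s.reloadFormat("v2")
	time.Sleep(25 * time.Millisecond)
	close(done)
}
```

This removes a whole class of races: the old stop-then-restart dance could lose a reload if the monitor was mid-iteration when `disksConnectDoneCh` was signalled.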

cmd/erasure-zones.go

@@ -18,6 +18,7 @@ package cmd
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"math/rand"
@@ -1608,13 +1609,8 @@ func (z *erasureZones) ListBuckets(ctx context.Context) (buckets []BucketInfo, e
 }
 
 func (z *erasureZones) ReloadFormat(ctx context.Context, dryRun bool) error {
-	// Acquire lock on format.json
-	formatLock := z.NewNSLock(ctx, minioMetaBucket, formatConfigFile)
-	if err := formatLock.GetRLock(globalOperationTimeout); err != nil {
-		return err
-	}
-	defer formatLock.RUnlock()
-
+	// No locks needed since reload happens in HealFormat under
+	// write lock across all nodes.
 	for _, zone := range z.zones {
 		if err := zone.ReloadFormat(ctx, dryRun); err != nil {
 			return err
@@ -1639,13 +1635,13 @@ func (z *erasureZones) HealFormat(ctx context.Context, dryRun bool) (madmin.Heal
 	var countNoHeal int
 	for _, zone := range z.zones {
 		result, err := zone.HealFormat(ctx, dryRun)
-		if err != nil && err != errNoHealRequired {
+		if err != nil && !errors.Is(err, errNoHealRequired) {
 			logger.LogIf(ctx, err)
 			continue
 		}
 		// Count errNoHealRequired across all zones,
 		// to return appropriate error to the caller
-		if err == errNoHealRequired {
+		if errors.Is(err, errNoHealRequired) {
 			countNoHeal++
 		}
 		r.DiskCount += result.DiskCount
@@ -1653,10 +1649,21 @@ func (z *erasureZones) HealFormat(ctx context.Context, dryRun bool) (madmin.Heal
 		r.Before.Drives = append(r.Before.Drives, result.Before.Drives...)
 		r.After.Drives = append(r.After.Drives, result.After.Drives...)
 	}
 
+	// Healing succeeded notify the peers to reload format and re-initialize disks.
+	// We will not notify peers if healing is not required.
+	for _, nerr := range globalNotificationSys.ReloadFormat(dryRun) {
+		if nerr.Err != nil {
+			logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
+			logger.LogIf(ctx, nerr.Err)
+		}
+	}
+
 	// No heal returned by all zones, return errNoHealRequired
 	if countNoHeal == len(z.zones) {
 		return r, errNoHealRequired
 	}
 
 	return r, nil
 }
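The peer notification that `healDiskFormat` used to fire from a background goroutine now lives here, after all zones have healed and while the format lock is still held. The surrounding aggregation of `errNoHealRequired` can be sketched on its own; `healAllZones` below is a hypothetical reduction of `erasureZones.HealFormat`, not the real signature:

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

var errNoHealRequired = errors.New("no heal required")

// healAllZones mirrors the aggregation logic: real failures are logged
// and skipped, and the errNoHealRequired sentinel is surfaced to the
// caller only when every zone reported it.
func healAllZones(zoneResults []error) error {
	var countNoHeal int
	for _, err := range zoneResults {
		if err != nil && !errors.Is(err, errNoHealRequired) {
			log.Println("zone heal failed:", err)
			continue
		}
		if errors.Is(err, errNoHealRequired) {
			countNoHeal++
		}
	}
	if countNoHeal == len(zoneResults) {
		return errNoHealRequired
	}
	return nil
}

func main() {
	fmt.Println(healAllZones([]error{nil, errNoHealRequired}))               // <nil>
	fmt.Println(healAllZones([]error{errNoHealRequired, errNoHealRequired})) // no heal required
}
```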

cmd/erasure.go

@@ -140,6 +140,7 @@ func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []madmin.Di
 		index := index
 		g.Go(func() error {
 			if disks[index] == OfflineDisk {
+				logger.LogIf(GlobalContext, fmt.Errorf("%s: %s", errDiskNotFound, endpoints[index]))
 				disksInfo[index] = madmin.Disk{
 					State:    diskErrToDriveState(errDiskNotFound),
 					Endpoint: endpoints[index],
@@ -149,11 +150,9 @@ func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []madmin.Di
 			}
 			info, err := disks[index].DiskInfo(context.TODO())
 			if err != nil {
-				if !IsErr(err, baseErrs...) {
-					reqInfo := (&logger.ReqInfo{}).AppendTags("disk", disks[index].String())
-					ctx := logger.SetReqInfo(GlobalContext, reqInfo)
-					logger.LogIf(ctx, err)
-				}
+				reqInfo := (&logger.ReqInfo{}).AppendTags("disk", disks[index].String())
+				ctx := logger.SetReqInfo(GlobalContext, reqInfo)
+				logger.LogIf(ctx, err)
 				disksInfo[index] = madmin.Disk{
 					State:    diskErrToDriveState(err),
 					Endpoint: endpoints[index],
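These two getDisksInfo changes tie directly back to the reported symptom: an offline disk is now logged explicitly, and DiskInfo errors are logged unconditionally (previously anything matching `baseErrs` was filtered out of the logs), which makes it much easier to see *why* `mc admin info` shows a drive as offline after a pod replacement.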

cmd/peer-rest-client.go

@@ -481,11 +481,7 @@ func (client *peerRESTClient) DeleteBucketMetadata(bucket string) error {
 // ReloadFormat - reload format on the peer node.
 func (client *peerRESTClient) ReloadFormat(dryRun bool) error {
 	values := make(url.Values)
-	if dryRun {
-		values.Set(peerRESTDryRun, "true")
-	} else {
-		values.Set(peerRESTDryRun, "false")
-	}
+	values.Set(peerRESTDryRun, strconv.FormatBool(dryRun))
 
 	respBody, err := client.call(peerRESTMethodReloadFormat, values, nil, -1)
 	if err != nil {
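The replacement one-liner relies on `strconv.FormatBool`, which returns exactly "true" or "false", so the wire format is unchanged. A runnable sketch (the "dry-run" key stands in for the `peerRESTDryRun` constant):

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

func main() {
	// strconv.FormatBool replaces the hand-rolled if/else while producing
	// the same query parameter values the peer REST handler parses.
	for _, dryRun := range []bool{true, false} {
		values := make(url.Values)
		values.Set("dry-run", strconv.FormatBool(dryRun))
		fmt.Println(values.Encode()) // dry-run=true, then dry-run=false
	}
}
```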

cmd/rest/client.go

@@ -103,9 +103,6 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
 	}
 	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.url.String()+method+querySep+values.Encode(), body)
 	if err != nil {
-		if xnet.IsNetworkOrHostDown(err) {
-			c.MarkOffline()
-		}
 		return nil, &NetworkError{err}
 	}
 	req.Header.Set("Authorization", "Bearer "+c.newAuthToken(req.URL.Query().Encode()))
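Dropping MarkOffline here is sound because `http.NewRequestWithContext` performs no I/O; it fails only on malformed input, which says nothing about the peer's health. A quick stdlib-only demonstration:

```go
package main

import (
	"context"
	"fmt"
	"net/http"
)

func main() {
	// Request construction fails for bad methods or unparsable URLs, never
	// for network conditions — so such an error must not mark a peer offline.
	_, err := http.NewRequestWithContext(context.Background(), "bad method", "http://peer:9000/", nil)
	fmt.Println(err) // net/http: invalid method "bad method"
}
```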
@@ -173,7 +170,6 @@ func NewClient(url *url.URL, newCustomTransport func() *http.Transport, newAuthT
-		httpIdleConnsCloser: tr.CloseIdleConnections,
 		url:                 url,
 		newAuthToken:        newAuthToken,
 		connected:           online,
 		MaxErrResponseSize:  4096,
 		HealthCheckInterval: 200 * time.Millisecond,
 		HealthCheckTimeout:  time.Second,
@@ -191,21 +187,18 @@ func (c *Client) MarkOffline() {
 	// Start goroutine that will attempt to reconnect.
 	// If server is already trying to reconnect this will have no effect.
 	if c.HealthCheckFn != nil && atomic.CompareAndSwapInt32(&c.connected, online, offline) {
-		if c.httpIdleConnsCloser != nil {
-			c.httpIdleConnsCloser()
-		}
-		go func() {
+		go func(healthFunc func() bool) {
 			ticker := time.NewTicker(c.HealthCheckInterval)
 			defer ticker.Stop()
 			for range ticker.C {
-				if status := atomic.LoadInt32(&c.connected); status == closed {
+				if atomic.LoadInt32(&c.connected) == closed {
 					return
 				}
-				if c.HealthCheckFn() {
+				if healthFunc() {
 					atomic.CompareAndSwapInt32(&c.connected, offline, online)
 					return
 				}
 			}
-		}()
+		}(c.HealthCheckFn)
 	}
 }
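Passing `c.HealthCheckFn` into the goroutine gives the reconnect loop its own copy of the function value, so it cannot race with later writes to that field. A self-contained sketch of the same loop (type, constants, and intervals simplified for the example):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

const (
	offline = int32(0)
	online  = int32(1)
	closed  = int32(2)
)

// client stands in for rest.Client: an atomic connection state plus a
// pluggable health-check function.
type client struct {
	connected     int32
	HealthCheckFn func() bool
}

func (c *client) markOffline() {
	// CAS guarantees only one reconnect goroutine runs at a time.
	if c.HealthCheckFn != nil && atomic.CompareAndSwapInt32(&c.connected, online, offline) {
		// The health check is passed as a parameter: the goroutine keeps
		// its own copy instead of re-reading the struct field.
		go func(healthFunc func() bool) {
			ticker := time.NewTicker(10 * time.Millisecond)
			defer ticker.Stop()
			for range ticker.C {
				if atomic.LoadInt32(&c.connected) == closed {
					return
				}
				if healthFunc() {
					atomic.CompareAndSwapInt32(&c.connected, offline, online)
					return
				}
			}
		}(c.HealthCheckFn)
	}
}

func main() {
	c := &client{connected: online, HealthCheckFn: func() bool { return true }}
	c.markOffline()
	time.Sleep(50 * time.Millisecond)
	fmt.Println("back online:", atomic.LoadInt32(&c.connected) == online) // true
}
```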

cmd/storage-rest-server.go

@@ -106,6 +106,7 @@ func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool
 		s.writeErrorResponse(w, err)
 		return false
 	}
+
 	diskID := r.URL.Query().Get(storageRESTDiskID)
 	if diskID == "" {
 		// Request sent empty disk-id, we allow the request
@@ -113,6 +114,7 @@ func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool
 		// or create format.json
 		return true
 	}
+
 	storedDiskID, err := s.storage.GetDiskID()
 	if err != nil {
 		s.writeErrorResponse(w, err)
