diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index c77bfe040..a5f511c50 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -21,7 +21,6 @@ import ( "path" "time" - "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/madmin" ) @@ -129,24 +128,5 @@ func healDiskFormat(ctx context.Context, objAPI ObjectLayer, opts madmin.HealOpt return madmin.HealResultItem{}, err } - // Healing succeeded notify the peers to reload format and re-initialize disks. - // We will not notify peers if healing is not required. - if err == nil { - // Notify servers in background and retry if needed. - go func() { - retry: - for _, nerr := range globalNotificationSys.ReloadFormat(opts.DryRun) { - if nerr.Err != nil { - if nerr.Err.Error() == errServerNotInitialized.Error() { - time.Sleep(time.Second) - goto retry - } - logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) - logger.LogIf(ctx, nerr.Err) - } - } - }() - } - return res, nil } diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index 10fd76b9c..ce04d7179 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -106,23 +106,26 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healS case <-time.After(defaultMonitorNewDiskInterval): waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()), time.Second) - var erasureSetInZoneEndpointToHeal = make([]map[int]Endpoint, len(z.zones)) - for i := range z.zones { - erasureSetInZoneEndpointToHeal[i] = map[int]Endpoint{} - } - + var erasureSetInZoneEndpointToHeal []map[int]Endpoints healDisks := globalBackgroundHealState.getHealLocalDisks() - // heal only if new disks found. - for _, endpoint := range healDisks { - logger.Info(fmt.Sprintf("Found drives to heal %d, proceeding to heal content...", - len(healDisks))) - + if len(healDisks) > 0 { // Reformat disks bgSeq.sourceCh <- healSource{bucket: SlashSeparator} // Ensure that reformatting disks is finished bgSeq.sourceCh <- healSource{bucket: nopHeal} + logger.Info(fmt.Sprintf("Found drives to heal %d, proceeding to heal content...", + len(healDisks))) + + erasureSetInZoneEndpointToHeal = make([]map[int]Endpoints, len(z.zones)) + for i := range z.zones { + erasureSetInZoneEndpointToHeal[i] = map[int]Endpoints{} + } + } + + // heal only if new disks found. + for _, endpoint := range healDisks { // Load the new format of this passed endpoint _, format, err := connectEndpoint(endpoint) if err != nil { @@ -142,20 +145,22 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healS continue } - erasureSetInZoneEndpointToHeal[zoneIdx][setIndex] = endpoint + erasureSetInZoneEndpointToHeal[zoneIdx][setIndex] = append(erasureSetInZoneEndpointToHeal[zoneIdx][setIndex], endpoint) } for i, setMap := range erasureSetInZoneEndpointToHeal { - for setIndex, endpoint := range setMap { - logger.Info("Healing disk '%s' on %s zone", endpoint, humanize.Ordinal(i+1)) + for setIndex, endpoints := range setMap { + for _, ep := range endpoints { + logger.Info("Healing disk '%s' on %s zone", ep, humanize.Ordinal(i+1)) - if err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].setDriveCount); err != nil { - logger.LogIf(ctx, err) - continue - } + if err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].setDriveCount); err != nil { + logger.LogIf(ctx, err) + continue + } - // Only upon success pop the healed disk. - globalBackgroundHealState.popHealLocalDisks(endpoint) + // Only upon success pop the healed disk. + globalBackgroundHealState.popHealLocalDisks(ep) + } } } } diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 7df3e2296..2cadac7c1 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -80,9 +80,6 @@ type erasureSets struct { disksConnectEvent chan diskConnectInfo - // Done channel to control monitoring loop. - disksConnectDoneCh chan struct{} - // Distribution algorithm of choice. distributionAlgo string deploymentID [16]byte @@ -115,7 +112,7 @@ func (s *erasureSets) getDiskMap() map[string]StorageAPI { for i := 0; i < s.setCount; i++ { for j := 0; j < s.setDriveCount; j++ { disk := s.erasureDisks[i][j] - if disk == nil { + if disk == OfflineDisk { continue } if !disk.IsOnline() { @@ -211,14 +208,16 @@ func (s *erasureSets) connectDisks() { disk, format, err := connectEndpoint(endpoint) if err != nil { if endpoint.IsLocal && errors.Is(err, errUnformattedDisk) { - logger.Info(fmt.Sprintf("Found unformatted drive %s, attempting to heal...", endpoint)) globalBackgroundHealState.pushHealLocalDisks(endpoint) + logger.Info(fmt.Sprintf("Found unformatted drive %s, attempting to heal...", endpoint)) } else { printEndpointError(endpoint, err, true) } return } + s.erasureDisksMu.RLock() setIndex, diskIndex, err := findDiskIndex(s.format, format) + s.erasureDisksMu.RUnlock() if err != nil { if endpoint.IsLocal { globalBackgroundHealState.pushHealLocalDisks(endpoint) @@ -256,8 +255,6 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt select { case <-ctx.Done(): return - case <-s.disksConnectDoneCh: - return case <-time.After(monitorInterval): s.connectDisks() } @@ -318,7 +315,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto listTolerancePerSet: setDriveCount / 2, format: format, disksConnectEvent: make(chan diskConnectInfo), - disksConnectDoneCh: make(chan struct{}), distributionAlgo: format.Erasure.DistributionAlgo, deploymentID: uuid.MustParse(format.ID), pool: NewMergeWalkPool(globalMergeLookupTimeout), @@ -1191,16 +1187,12 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) return err } - // kill the monitoring loop such that we stop writing - // to indicate that we will re-initialize everything - // with new format. - s.disksConnectDoneCh <- struct{}{} + s.erasureDisksMu.Lock() // Replace with new reference format. s.format = refFormat // Close all existing disks and reconnect all the disks. - s.erasureDisksMu.Lock() for _, disk := range storageDisks { if disk == nil { continue @@ -1223,10 +1215,8 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) s.endpointStrings[m*s.setDriveCount+n] = disk.String() s.erasureDisks[m][n] = disk } - s.erasureDisksMu.Unlock() - // Restart monitoring loop to monitor reformatted disks again. - go s.monitorAndConnectEndpoints(GlobalContext, defaultMonitorConnectEndpointInterval) + s.erasureDisksMu.Unlock() return nil } @@ -1400,16 +1390,12 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H return madmin.HealResultItem{}, err } - // kill the monitoring loop such that we stop writing - // to indicate that we will re-initialize everything - // with new format. - s.disksConnectDoneCh <- struct{}{} + s.erasureDisksMu.Lock() // Replace with new reference format. s.format = refFormat // Disconnect/relinquish all existing disks, lockers and reconnect the disks, lockers. - s.erasureDisksMu.Lock() for _, disk := range storageDisks { if disk == nil { continue @@ -1432,10 +1418,8 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H s.endpointStrings[m*s.setDriveCount+n] = disk.String() s.erasureDisks[m][n] = disk } - s.erasureDisksMu.Unlock() - // Restart our monitoring loop to start monitoring newly formatted disks. - go s.monitorAndConnectEndpoints(GlobalContext, defaultMonitorConnectEndpointInterval) + s.erasureDisksMu.Unlock() } return res, nil diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go index 3545cec08..432797781 100644 --- a/cmd/erasure-zones.go +++ b/cmd/erasure-zones.go @@ -18,6 +18,7 @@ package cmd import ( "context" + "errors" "fmt" "io" "math/rand" @@ -1608,13 +1609,8 @@ func (z *erasureZones) ListBuckets(ctx context.Context) (buckets []BucketInfo, e } func (z *erasureZones) ReloadFormat(ctx context.Context, dryRun bool) error { - // Acquire lock on format.json - formatLock := z.NewNSLock(ctx, minioMetaBucket, formatConfigFile) - if err := formatLock.GetRLock(globalOperationTimeout); err != nil { - return err - } - defer formatLock.RUnlock() - + // No locks needed since reload happens in HealFormat under + // write lock across all nodes. for _, zone := range z.zones { if err := zone.ReloadFormat(ctx, dryRun); err != nil { return err @@ -1639,13 +1635,13 @@ func (z *erasureZones) HealFormat(ctx context.Context, dryRun bool) (madmin.Heal var countNoHeal int for _, zone := range z.zones { result, err := zone.HealFormat(ctx, dryRun) - if err != nil && err != errNoHealRequired { + if err != nil && !errors.Is(err, errNoHealRequired) { logger.LogIf(ctx, err) continue } // Count errNoHealRequired across all zones, // to return appropriate error to the caller - if err == errNoHealRequired { + if errors.Is(err, errNoHealRequired) { countNoHeal++ } r.DiskCount += result.DiskCount @@ -1653,10 +1649,21 @@ func (z *erasureZones) HealFormat(ctx context.Context, dryRun bool) (madmin.Heal r.Before.Drives = append(r.Before.Drives, result.Before.Drives...) r.After.Drives = append(r.After.Drives, result.After.Drives...) } + + // Healing succeeded notify the peers to reload format and re-initialize disks. + // We will not notify peers if healing is not required. + for _, nerr := range globalNotificationSys.ReloadFormat(dryRun) { + if nerr.Err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) + logger.LogIf(ctx, nerr.Err) + } + } + // No heal returned by all zones, return errNoHealRequired if countNoHeal == len(z.zones) { return r, errNoHealRequired } + return r, nil } diff --git a/cmd/erasure.go b/cmd/erasure.go index bf4c8077c..01cfab5d9 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -140,6 +140,7 @@ func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []madmin.Di index := index g.Go(func() error { if disks[index] == OfflineDisk { + logger.LogIf(GlobalContext, fmt.Errorf("%s: %s", errDiskNotFound, endpoints[index])) disksInfo[index] = madmin.Disk{ State: diskErrToDriveState(errDiskNotFound), Endpoint: endpoints[index], @@ -149,11 +150,9 @@ func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []madmin.Di } info, err := disks[index].DiskInfo(context.TODO()) if err != nil { - if !IsErr(err, baseErrs...) { - reqInfo := (&logger.ReqInfo{}).AppendTags("disk", disks[index].String()) - ctx := logger.SetReqInfo(GlobalContext, reqInfo) - logger.LogIf(ctx, err) - } + reqInfo := (&logger.ReqInfo{}).AppendTags("disk", disks[index].String()) + ctx := logger.SetReqInfo(GlobalContext, reqInfo) + logger.LogIf(ctx, err) disksInfo[index] = madmin.Disk{ State: diskErrToDriveState(err), Endpoint: endpoints[index], diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index e67daf848..4a22a6263 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -481,11 +481,7 @@ func (client *peerRESTClient) DeleteBucketMetadata(bucket string) error { // ReloadFormat - reload format on the peer node. func (client *peerRESTClient) ReloadFormat(dryRun bool) error { values := make(url.Values) - if dryRun { - values.Set(peerRESTDryRun, "true") - } else { - values.Set(peerRESTDryRun, "false") - } + values.Set(peerRESTDryRun, strconv.FormatBool(dryRun)) respBody, err := client.call(peerRESTMethodReloadFormat, values, nil, -1) if err != nil { diff --git a/cmd/rest/client.go b/cmd/rest/client.go index 148226125..6b91b5437 100644 --- a/cmd/rest/client.go +++ b/cmd/rest/client.go @@ -103,9 +103,6 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod } req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.url.String()+method+querySep+values.Encode(), body) if err != nil { - if xnet.IsNetworkOrHostDown(err) { - c.MarkOffline() - } return nil, &NetworkError{err} } req.Header.Set("Authorization", "Bearer "+c.newAuthToken(req.URL.Query().Encode())) @@ -173,7 +170,6 @@ func NewClient(url *url.URL, newCustomTransport func() *http.Transport, newAuthT url: url, newAuthToken: newAuthToken, connected: online, - MaxErrResponseSize: 4096, HealthCheckInterval: 200 * time.Millisecond, HealthCheckTimeout: time.Second, @@ -191,21 +187,18 @@ func (c *Client) MarkOffline() { // Start goroutine that will attempt to reconnect. // If server is already trying to reconnect this will have no effect. if c.HealthCheckFn != nil && atomic.CompareAndSwapInt32(&c.connected, online, offline) { - if c.httpIdleConnsCloser != nil { - c.httpIdleConnsCloser() - } - go func() { + go func(healthFunc func() bool) { ticker := time.NewTicker(c.HealthCheckInterval) defer ticker.Stop() for range ticker.C { - if status := atomic.LoadInt32(&c.connected); status == closed { + if atomic.LoadInt32(&c.connected) == closed { return } - if c.HealthCheckFn() { + if healthFunc() { atomic.CompareAndSwapInt32(&c.connected, offline, online) return } } - }() + }(c.HealthCheckFn) } } diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index b4603a7af..73dd70891 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -106,6 +106,7 @@ func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool s.writeErrorResponse(w, err) return false } + diskID := r.URL.Query().Get(storageRESTDiskID) if diskID == "" { // Request sent empty disk-id, we allow the request @@ -113,6 +114,7 @@ func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool // or create format.json return true } + storedDiskID, err := s.storage.GetDiskID() if err != nil { s.writeErrorResponse(w, err)