From 029758cb2007c90e5e9c923d1a40ea792d1b4116 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Mon, 26 Oct 2020 10:29:29 -0700
Subject: [PATCH] fix: retain the previous UUID for newly replaced drives (#10759)

Only newly replaced drives get a new `format.json`; the remaining disks
no longer reload their in-memory reference format, so drives come online
without a reload. Keeping the reference format intact means UUIDs never
change once the drives are formatted.
---
 cmd/background-newdisks-heal-ops.go | 76 ------------------------
 cmd/bootstrap-peer-server.go        | 12 +---
 cmd/erasure-decode_test.go          |  5 +-
 cmd/erasure-sets.go                 | 75 ++++++++++++------------
 cmd/format-erasure.go               | 91 +++++++----------------------
 cmd/format-erasure_test.go          | 58 +-----------------
 cmd/local-locker.go                 | 15 ++---
 cmd/lock-rest-server.go             |  6 +-
 cmd/object-api-common.go            | 14 ++++-
 cmd/storage-rest-client.go          | 20 ++++---
 cmd/storage-rest_test.go            |  2 +-
 11 files changed, 100 insertions(+), 274 deletions(-)

diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go
index f4c35a072..5bfc28242 100644
--- a/cmd/background-newdisks-heal-ops.go
+++ b/cmd/background-newdisks-heal-ops.go
@@ -73,8 +73,6 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
 		}
 	}
 
-	go monitorLocalDisksInconsistentAndHeal(ctx, z, bgSeq)
-
 	go monitorLocalDisksAndHeal(ctx, z, bgSeq)
 }
 
@@ -98,50 +96,6 @@ func getLocalDisksToHeal() (disksToHeal Endpoints) {
 
 }
 
-func getLocalDisksToHealInconsistent() (refFormats []*formatErasureV3, diskFormats [][]*formatErasureV3, disksToHeal [][]StorageAPI) {
-	disksToHeal = make([][]StorageAPI, len(globalEndpoints))
-	diskFormats = make([][]*formatErasureV3, len(globalEndpoints))
-	refFormats = make([]*formatErasureV3, len(globalEndpoints))
-	for k, ep := range globalEndpoints {
-		disksToHeal[k] = make([]StorageAPI, len(ep.Endpoints))
-		diskFormats[k] = make([]*formatErasureV3, len(ep.Endpoints))
-		formats := make([]*formatErasureV3, len(ep.Endpoints))
-		storageDisks, _ := initStorageDisksWithErrors(ep.Endpoints)
-		for i, disk := range storageDisks {
-			if disk != nil {
-				format, err := loadFormatErasure(disk)
-				if err != nil {
-					// any error we don't care proceed.
-					continue
-				}
-				formats[i] = format
-			}
-		}
-		refFormat, err := getFormatErasureInQuorum(formats)
-		if err != nil {
-			logger.LogIf(GlobalContext, fmt.Errorf("No erasured disks are in quorum or too many disks are offline - please investigate immediately"))
-			continue
-		}
-		// We have obtained reference format - check if disks are inconsistent
-		for i, format := range formats {
-			if format == nil {
-				continue
-			}
-			if err := formatErasureV3Check(refFormat, format); err != nil {
-				if errors.Is(err, errInconsistentDisk) {
-					// Found inconsistencies - check which disk it is.
-					if storageDisks[i] != nil && storageDisks[i].IsLocal() {
-						disksToHeal[k][i] = storageDisks[i]
-					}
-				}
-			}
-		}
-		refFormats[k] = refFormat
-		diskFormats[k] = formats
-	}
-	return refFormats, diskFormats, disksToHeal
-}
-
 func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
 	// Run the background healer
 	globalBackgroundHealRoutine = newHealRoutine()
 	go globalBackgroundHealRoutine.run(ctx, objAPI)
@@ -150,36 +104,6 @@ func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
 	globalBackgroundHealState.LaunchNewHealSequence(newBgHealSequence())
 }
 
-// monitorLocalDisksInconsistentAndHeal - ensures that inconsistent
-// disks are healed appropriately.
-func monitorLocalDisksInconsistentAndHeal(ctx context.Context, z *erasureServerSets, bgSeq *healSequence) {
-	// Perform automatic disk healing when a disk is found to be inconsistent.
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-time.After(defaultMonitorNewDiskInterval):
-			waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()), time.Second)
-
-			refFormats, diskFormats, localDisksHeal := getLocalDisksToHealInconsistent()
-			for k := range refFormats {
-				for j, disk := range localDisksHeal[k] {
-					if disk == nil {
-						continue
-					}
-					format := diskFormats[k][j].Clone()
-					format.Erasure.Sets = refFormats[k].Erasure.Sets
-					if err := saveFormatErasure(disk, format, true); err != nil {
-						logger.LogIf(ctx, fmt.Errorf("Unable fix inconsistent format for drive %s: %w", disk, err))
-						continue
-					}
-					globalBackgroundHealState.pushHealLocalDisks(disk.Endpoint())
-				}
-			}
-		}
-	}
-}
-
 // monitorLocalDisksAndHeal - ensures that detected new disks are healed
 //  1. Only the concerned erasure set will be listed and healed
 //  2. Only the node hosting the disk is responsible to perform the heal
diff --git a/cmd/bootstrap-peer-server.go b/cmd/bootstrap-peer-server.go
index 22af35e23..fb29abdbb 100644
--- a/cmd/bootstrap-peer-server.go
+++ b/cmd/bootstrap-peer-server.go
@@ -20,7 +20,6 @@ import (
 	"context"
 	"crypto/tls"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -235,16 +234,7 @@ func newBootstrapRESTClient(endpoint Endpoint) *bootstrapRESTClient {
 	trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout)
 	restClient := rest.NewClient(serverURL, trFn, newAuthToken)
 
-	restClient.HealthCheckFn = func() bool {
-		ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)
-		// Instantiate a new rest client for healthcheck
-		// to avoid recursive healthCheckFn()
-		respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).Call(ctx, bootstrapRESTMethodHealth, nil, nil, -1)
-		xhttp.DrainBody(respBody)
-		cancel()
-		var ne *rest.NetworkError
-		return !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &ne)
-	}
+	restClient.HealthCheckFn = nil
 
 	return &bootstrapRESTClient{endpoint: endpoint, restClient: restClient}
 }
diff --git a/cmd/erasure-decode_test.go b/cmd/erasure-decode_test.go
index 3af1bdbce..7797ccced 100644
--- a/cmd/erasure-decode_test.go
+++ b/cmd/erasure-decode_test.go
@@ -203,8 +203,9 @@ func TestErasureDecode(t *testing.T) {
 // This test is t.Skip()ed as it a long time to run, hence should be run
 // explicitly after commenting out t.Skip()
 func TestErasureDecodeRandomOffsetLength(t *testing.T) {
-	// Comment the following line to run this test.
-	t.SkipNow()
+	if testing.Short() {
+		t.Skip()
+	}
 	// Initialize environment needed for the test.
 	dataBlocks := 7
 	parityBlocks := 7
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index 61f274bdb..fd0dc256c 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -133,7 +133,7 @@ func (s *erasureSets) getDiskMap() map[string]StorageAPI {
 
 // Initializes a new StorageAPI from the endpoint argument, returns
 // StorageAPI and also `format` which exists on the disk.
 func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, error) {
-	disk, err := newStorageAPI(endpoint)
+	disk, err := newStorageAPIWithoutHealthCheck(endpoint)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -221,7 +221,7 @@ func (s *erasureSets) connectDisks() {
 				}
 				return
 			}
-			if endpoint.IsLocal && disk.Healing() {
+			if disk.IsLocal() && disk.Healing() {
 				globalBackgroundHealState.pushHealLocalDisks(disk.Endpoint())
 				logger.Info(fmt.Sprintf("Found the drive %s that needs healing, attempting to heal...", disk))
 			}
@@ -232,13 +232,24 @@ func (s *erasureSets) connectDisks() {
 				printEndpointError(endpoint, err, false)
 				return
 			}
-			disk.SetDiskID(format.Erasure.This)
 			s.erasureDisksMu.Lock()
 			if s.erasureDisks[setIndex][diskIndex] != nil {
 				s.erasureDisks[setIndex][diskIndex].Close()
 			}
-			s.erasureDisks[setIndex][diskIndex] = disk
+			if disk.IsLocal() {
+				disk.SetDiskID(format.Erasure.This)
+				s.erasureDisks[setIndex][diskIndex] = disk
+			} else {
+				// Enable healthcheck disk for remote endpoint.
+				disk, err = newStorageAPI(endpoint)
+				if err != nil {
+					printEndpointError(endpoint, err, false)
+					return
+				}
+				disk.SetDiskID(format.Erasure.This)
+				s.erasureDisks[setIndex][diskIndex] = disk
+			}
 			s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String()
 			s.erasureDisksMu.Unlock()
 			go func(setIndex int) {
@@ -1132,7 +1143,7 @@ func formatsToDrivesInfo(endpoints Endpoints, formats []*formatErasureV3, sErrs
 // Reloads the format from the disk, usually called by a remote peer notifier while
 // healing in a distributed setup.
 func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) {
-	storageDisks, errs := initStorageDisksWithErrors(s.endpoints)
+	storageDisks, errs := initStorageDisksWithErrorsWithoutHealthCheck(s.endpoints)
 	for i, err := range errs {
 		if err != nil && err != errDiskNotFound {
 			return fmt.Errorf("Disk %s: %w", s.endpoints[i], err)
@@ -1182,6 +1193,15 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error)
 		}
 
 		s.endpointStrings[m*s.setDriveCount+n] = disk.String()
+		if !disk.IsLocal() {
+			// Enable healthcheck disk for remote endpoint.
+			disk, err = newStorageAPI(disk.Endpoint())
+			if err != nil {
+				continue
+			}
+			disk.SetDiskID(diskID)
+		}
+
 		s.erasureDisks[m][n] = disk
 	}
 
@@ -1249,7 +1269,7 @@ func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) {
 
 // HealFormat - heals missing `format.json` on fresh unformatted disks.
 func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
-	storageDisks, errs := initStorageDisksWithErrors(s.endpoints)
+	storageDisks, errs := initStorageDisksWithErrorsWithoutHealthCheck(s.endpoints)
 	for i, derr := range errs {
 		if derr != nil && derr != errDiskNotFound {
 			return madmin.HealResultItem{}, fmt.Errorf("Disk %s: %w", s.endpoints[i], derr)
@@ -1298,40 +1318,9 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
 		return res, errNoHealRequired
 	}
 
-	// Mark all UUIDs which might be offline, use list
-	// of formats to mark them appropriately.
-	markUUIDsOffline(refFormat, formats, sErrs)
-
 	// Initialize a new set of set formats which will be written to disk.
 	newFormatSets := newHealFormatSets(refFormat, s.setCount, s.setDriveCount, formats, sErrs)
 
-	// Look for all offline/unformatted disks in our reference format,
-	// such that we can fill them up with new UUIDs, this looping also
-	// ensures that the replaced disks allocated evenly across all sets.
-	// Making sure that the redundancy is not lost.
-	for i := range refFormat.Erasure.Sets {
-		for j := range refFormat.Erasure.Sets[i] {
-			if refFormat.Erasure.Sets[i][j] == offlineDiskUUID {
-				for l := range newFormatSets[i] {
-					if newFormatSets[i][l] == nil {
-						continue
-					}
-					if newFormatSets[i][l].Erasure.This == "" {
-						newFormatSets[i][l].Erasure.This = mustGetUUID()
-						refFormat.Erasure.Sets[i][j] = newFormatSets[i][l].Erasure.This
-						for m, v := range res.After.Drives {
-							if v.Endpoint == s.endpoints.GetString(i*s.setDriveCount+l) {
-								res.After.Drives[m].UUID = newFormatSets[i][l].Erasure.This
-								res.After.Drives[m].State = madmin.DriveStateOk
-							}
-						}
-						break
-					}
-				}
-			}
-		}
-	}
-
 	if !dryRun {
 		var tmpNewFormats = make([]*formatErasureV3, s.setCount*s.setDriveCount)
 		for i := range newFormatSets {
@@ -1339,8 +1328,9 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
 			if newFormatSets[i][j] == nil {
 				continue
 			}
+			res.After.Drives[i*s.setDriveCount+j].UUID = newFormatSets[i][j].Erasure.This
+			res.After.Drives[i*s.setDriveCount+j].State = madmin.DriveStateOk
 			tmpNewFormats[i*s.setDriveCount+j] = newFormatSets[i][j]
-			tmpNewFormats[i*s.setDriveCount+j].Erasure.Sets = refFormat.Erasure.Sets
 		}
 	}
 
@@ -1382,7 +1372,16 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
 		}
 
 		s.endpointStrings[m*s.setDriveCount+n] = disk.String()
+		if !disk.IsLocal() {
+			// Enable healthcheck disk for remote endpoint.
+			disk, err = newStorageAPI(disk.Endpoint())
+			if err != nil {
+				continue
+			}
+			disk.SetDiskID(diskID)
+		}
 		s.erasureDisks[m][n] = disk
+
 	}
 
 	s.erasureDisksMu.Unlock()
diff --git a/cmd/format-erasure.go b/cmd/format-erasure.go
index 3cd03c86c..24c646875 100644
--- a/cmd/format-erasure.go
+++ b/cmd/format-erasure.go
@@ -716,7 +716,10 @@ func saveFormatErasureAllWithErrs(ctx context.Context, storageDisks []StorageAPI
 			if formats[index] == nil {
 				return errDiskNotFound
 			}
-			return saveFormatErasure(storageDisks[index], formats[index], errors.Is(fErrs[index], errUnformattedDisk))
+			if errors.Is(fErrs[index], errUnformattedDisk) {
+				return saveFormatErasure(storageDisks[index], formats[index], true)
+			}
+			return nil
 		}, index)
 	}
 
@@ -755,6 +758,20 @@ func closeStorageDisks(storageDisks []StorageAPI) {
 	}
 }
 
+func initStorageDisksWithErrorsWithoutHealthCheck(endpoints Endpoints) ([]StorageAPI, []error) {
+	// Bootstrap disks.
+	storageDisks := make([]StorageAPI, len(endpoints))
+	g := errgroup.WithNErrs(len(endpoints))
+	for index := range endpoints {
+		index := index
+		g.Go(func() (err error) {
+			storageDisks[index], err = newStorageAPIWithoutHealthCheck(endpoints[index])
+			return err
+		}, index)
+	}
+	return storageDisks, g.Wait()
+}
+
 // Initialize storage disks for each endpoint.
 // Errors are returned for each endpoint with matching index.
 func initStorageDisksWithErrors(endpoints Endpoints) ([]StorageAPI, []error) {
@@ -905,63 +922,6 @@ func makeFormatErasureMetaVolumes(disk StorageAPI) error {
 	return disk.MakeVolBulk(context.TODO(), minioMetaBucket, minioMetaTmpBucket, minioMetaMultipartBucket, dataUsageBucket)
 }
 
-// Get all UUIDs which are present in reference format should
-// be present in the list of formats provided, those are considered
-// as online UUIDs.
-func getOnlineUUIDs(refFormat *formatErasureV3, formats []*formatErasureV3) (onlineUUIDs []string) {
-	for _, format := range formats {
-		if format == nil {
-			continue
-		}
-		for _, set := range refFormat.Erasure.Sets {
-			for _, uuid := range set {
-				if format.Erasure.This == uuid {
-					onlineUUIDs = append(onlineUUIDs, uuid)
-				}
-			}
-		}
-	}
-	return onlineUUIDs
-}
-
-// Look for all UUIDs which are not present in reference format
-// but are present in the onlineUUIDs list, construct of list such
-// offline UUIDs.
-func getOfflineUUIDs(refFormat *formatErasureV3, formats []*formatErasureV3) (offlineUUIDs []string) {
-	onlineUUIDs := getOnlineUUIDs(refFormat, formats)
-	for i, set := range refFormat.Erasure.Sets {
-		for j, uuid := range set {
-			var found bool
-			for _, onlineUUID := range onlineUUIDs {
-				if refFormat.Erasure.Sets[i][j] == onlineUUID {
-					found = true
-				}
-			}
-			if !found {
-				offlineUUIDs = append(offlineUUIDs, uuid)
-			}
-		}
-	}
-	return offlineUUIDs
-}
-
-// Mark all UUIDs that are offline.
-func markUUIDsOffline(refFormat *formatErasureV3, formats []*formatErasureV3, errs []error) {
-	offlineUUIDs := getOfflineUUIDs(refFormat, formats)
-	for i, set := range refFormat.Erasure.Sets {
-		setDriveCount := len(set)
-		for j := range set {
-			for _, offlineUUID := range offlineUUIDs {
-				if refFormat.Erasure.Sets[i][j] == offlineUUID &&
-					errors.Is(errs[i*setDriveCount+j], errUnformattedDisk) {
-					// Unformatted drive gets an offline disk UUID
-					refFormat.Erasure.Sets[i][j] = offlineDiskUUID
-				}
-			}
-		}
-	}
-}
-
 // Initialize a new set of set formats which will be written to all disks.
 func newHealFormatSets(refFormat *formatErasureV3, setCount, setDriveCount int, formats []*formatErasureV3, errs []error) [][]*formatErasureV3 {
 	newFormats := make([][]*formatErasureV3, setCount)
@@ -970,23 +930,16 @@ func newHealFormatSets(refFormat *formatErasureV3, setCount, setDriveCount int,
 	}
 	for i := range refFormat.Erasure.Sets {
 		for j := range refFormat.Erasure.Sets[i] {
-			if errs[i*setDriveCount+j] == errUnformattedDisk || errs[i*setDriveCount+j] == nil {
+			if errors.Is(errs[i*setDriveCount+j], errUnformattedDisk) {
 				newFormats[i][j] = &formatErasureV3{}
-				newFormats[i][j].Version = refFormat.Version
 				newFormats[i][j].ID = refFormat.ID
 				newFormats[i][j].Format = refFormat.Format
+				newFormats[i][j].Version = refFormat.Version
+				newFormats[i][j].Erasure.This = refFormat.Erasure.Sets[i][j]
+				newFormats[i][j].Erasure.Sets = refFormat.Erasure.Sets
 				newFormats[i][j].Erasure.Version = refFormat.Erasure.Version
 				newFormats[i][j].Erasure.DistributionAlgo = refFormat.Erasure.DistributionAlgo
 			}
-			if errs[i*setDriveCount+j] == errUnformattedDisk {
-				newFormats[i][j].Erasure.This = ""
-				newFormats[i][j].Erasure.Sets = nil
-				continue
-			}
-			if errs[i*setDriveCount+j] == nil {
-				newFormats[i][j].Erasure.This = formats[i*setDriveCount+j].Erasure.This
-				newFormats[i][j].Erasure.Sets = nil
-			}
 		}
 	}
 	return newFormats
diff --git a/cmd/format-erasure_test.go b/cmd/format-erasure_test.go
index 30a8ed859..f28c60c1a 100644
--- a/cmd/format-erasure_test.go
+++ b/cmd/format-erasure_test.go
@@ -25,61 +25,6 @@ import (
 	"testing"
 )
 
-// Test get offline/online uuids.
-func TestGetUUIDs(t *testing.T) {
-	fmtV2 := newFormatErasureV3(4, 16, "CRCMOD")
-	formats := make([]*formatErasureV3, 64)
-
-	for i := 0; i < 4; i++ {
-		for j := 0; j < 16; j++ {
-			newFormat := *fmtV2
-			newFormat.Erasure.This = fmtV2.Erasure.Sets[i][j]
-			formats[i*16+j] = &newFormat
-		}
-	}
-
-	gotCount := len(getOnlineUUIDs(fmtV2, formats))
-	if gotCount != 64 {
-		t.Errorf("Expected online count '64', got '%d'", gotCount)
-	}
-
-	for i := 0; i < 4; i++ {
-		for j := 0; j < 16; j++ {
-			if j < 4 {
-				formats[i*16+j] = nil
-			}
-		}
-	}
-
-	gotCount = len(getOnlineUUIDs(fmtV2, formats))
-	if gotCount != 48 {
-		t.Errorf("Expected online count '48', got '%d'", gotCount)
-	}
-
-	gotCount = len(getOfflineUUIDs(fmtV2, formats))
-	if gotCount != 16 {
-		t.Errorf("Expected offline count '16', got '%d'", gotCount)
-	}
-
-	var errs []error
-	for i := 0; i < 4*16; i++ {
-		errs = append(errs, errUnformattedDisk)
-	}
-
-	markUUIDsOffline(fmtV2, formats, errs)
-	gotCount = 0
-	for i := range fmtV2.Erasure.Sets {
-		for j := range fmtV2.Erasure.Sets[i] {
-			if fmtV2.Erasure.Sets[i][j] == offlineDiskUUID {
-				gotCount++
-			}
-		}
-	}
-	if gotCount != 16 {
-		t.Errorf("Expected offline count '16', got '%d'", gotCount)
-	}
-}
-
 // tests fixFormatErasureV3 - fix format.json on all disks.
 func TestFixFormatV3(t *testing.T) {
 	erasureDirs, err := getRandomDisks(8)
@@ -480,6 +425,9 @@ func TestNewFormatSets(t *testing.T) {
 	// Check if deployment IDs are preserved.
 	for i := range newFormats {
 		for j := range newFormats[i] {
+			if newFormats[i][j] == nil {
+				continue
+			}
 			if newFormats[i][j].ID != quorumFormat.ID {
 				t.Fatal("Deployment id in the new format is lost")
 			}
diff --git a/cmd/local-locker.go b/cmd/local-locker.go
index 1b0731cdb..f1ff8199c 100644
--- a/cmd/local-locker.go
+++ b/cmd/local-locker.go
@@ -114,9 +114,9 @@ func (l *localLocker) Unlock(args dsync.LockArgs) (reply bool, err error) {
 		return reply, fmt.Errorf("Unlock attempted on a read locked entity: %s", args.Resources)
 	}
 	for _, resource := range args.Resources {
-		lri := l.lockMap[resource]
-		if !l.removeEntry(resource, args, &lri) {
-			return false, fmt.Errorf("Unlock unable to find corresponding lock for uid: %s on resource %s", args.UID, resource)
+		lri, ok := l.lockMap[resource]
+		if ok {
+			l.removeEntry(resource, args, &lri)
 		}
 	}
 	return true, nil
@@ -180,15 +180,13 @@ func (l *localLocker) RUnlock(args dsync.LockArgs) (reply bool, err error) {
 	resource := args.Resources[0]
 	if lri, reply = l.lockMap[resource]; !reply {
 		// No lock is held on the given name
-		return reply, fmt.Errorf("RUnlock attempted on an unlocked entity: %s", resource)
+		return true, nil
 	}
 	if reply = !isWriteLock(lri); !reply {
 		// A write-lock is held, cannot release a read lock
 		return reply, fmt.Errorf("RUnlock attempted on a write locked entity: %s", resource)
 	}
-	if !l.removeEntry(resource, args, &lri) {
-		return false, fmt.Errorf("RUnlock unable to find corresponding read lock for uid: %s", args.UID)
-	}
+	l.removeEntry(resource, args, &lri)
 	return reply, nil
 }
 
@@ -243,6 +241,9 @@ func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired
 // Similar to removeEntry but only removes an entry only if the lock entry exists in map.
 // Caller must hold 'l.mutex' lock.
 func (l *localLocker) removeEntryIfExists(nlrip nameLockRequesterInfoPair) {
+	l.mutex.Lock()
+	defer l.mutex.Unlock()
+
 	// Check if entry is still in map (could have been removed altogether by 'concurrent' (R)Unlock of last entry)
 	if lri, ok := l.lockMap[nlrip.name]; ok {
 		// Even if the entry exists, it may not be the same entry which was
diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go
index 722f0d318..d430a32fd 100644
--- a/cmd/lock-rest-server.go
+++ b/cmd/lock-rest-server.go
@@ -33,7 +33,7 @@ import (
 
 const (
 	// Lock maintenance interval.
-	lockMaintenanceInterval = 15 * time.Second
+	lockMaintenanceInterval = 30 * time.Second
 
 	// Lock validity check interval.
 	lockValidityCheckInterval = 5 * time.Second
@@ -311,12 +311,8 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
 			// less than the quorum, we have locks expired.
 			if nlripsMap[nlrip.name].locks < nlrip.lri.Quorum {
-				// The lock is no longer active at server that originated
-				// the lock, attempt to remove the lock.
-				globalLockServers[lendpoint].mutex.Lock()
 				// Purge the stale entry if it exists.
 				globalLockServers[lendpoint].removeEntryIfExists(nlrip)
-				globalLockServers[lendpoint].mutex.Unlock()
 			}
 		}
diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go
index d26bbefee..79aadaa60 100644
--- a/cmd/object-api-common.go
+++ b/cmd/object-api-common.go
@@ -57,6 +57,18 @@ func isObjectDir(object string, size int64) bool {
 	return HasSuffix(object, SlashSeparator) && size == 0
 }
 
+func newStorageAPIWithoutHealthCheck(endpoint Endpoint) (storage StorageAPI, err error) {
+	if endpoint.IsLocal {
+		storage, err := newXLStorage(endpoint)
+		if err != nil {
+			return nil, err
+		}
+		return &xlStorageDiskIDCheck{storage: storage}, nil
+	}
+
+	return newStorageRESTClient(endpoint, false), nil
+}
+
 // Depending on the disk type network or local, initialize storage API.
 func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) {
 	if endpoint.IsLocal {
@@ -67,7 +79,7 @@ func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) {
 		return &xlStorageDiskIDCheck{storage: storage}, nil
 	}
 
-	return newStorageRESTClient(endpoint), nil
+	return newStorageRESTClient(endpoint, true), nil
 }
 
 // Cleanup a directory recursively.
diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go
index d8abd8884..24a9062ca 100644
--- a/cmd/storage-rest-client.go
+++ b/cmd/storage-rest-client.go
@@ -661,7 +661,7 @@ func (client *storageRESTClient) Close() error {
 }
 
 // Returns a storage rest client.
-func newStorageRESTClient(endpoint Endpoint) *storageRESTClient {
+func newStorageRESTClient(endpoint Endpoint, healthcheck bool) *storageRESTClient {
 	serverURL := &url.URL{
 		Scheme: endpoint.Scheme,
 		Host:   endpoint.Host,
@@ -678,14 +678,16 @@ func newStorageRESTClient(endpoint Endpoint) *storageRESTClient {
 	trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout)
 	restClient := rest.NewClient(serverURL, trFn, newAuthToken)
 
-	restClient.HealthCheckFn = func() bool {
-		ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)
-		// Instantiate a new rest client for healthcheck
-		// to avoid recursive healthCheckFn()
-		respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).Call(ctx, storageRESTMethodHealth, nil, nil, -1)
-		xhttp.DrainBody(respBody)
-		cancel()
-		return !errors.Is(err, context.DeadlineExceeded) && toStorageErr(err) != errDiskNotFound
+	if healthcheck {
+		restClient.HealthCheckFn = func() bool {
+			ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)
+			// Instantiate a new rest client for healthcheck
+			// to avoid recursive healthCheckFn()
+			respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).Call(ctx, storageRESTMethodHealth, nil, nil, -1)
+			xhttp.DrainBody(respBody)
+			cancel()
+			return !errors.Is(err, context.DeadlineExceeded) && toStorageErr(err) != errDiskNotFound
+		}
 	}
 
 	return &storageRESTClient{endpoint: endpoint, restClient: restClient}
diff --git a/cmd/storage-rest_test.go b/cmd/storage-rest_test.go
index 376587302..361e03cf1 100644
--- a/cmd/storage-rest_test.go
+++ b/cmd/storage-rest_test.go
@@ -454,7 +454,7 @@ func newStorageRESTHTTPServerClient(t *testing.T) (*httptest.Server, *storageRES
 	globalServerConfig = newServerConfig()
 	lookupConfigs(globalServerConfig, 0)
 
-	restClient := newStorageRESTClient(endpoint)
+	restClient := newStorageRESTClient(endpoint, false)
 
 	return httpServer, restClient, prevGlobalServerConfig, endpointPath
 }
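
The central behavior of this patch, sketched in isolation: when HealFormat
rebuilds format.json for an unformatted drive, the reworked
newHealFormatSets copies the quorum reference format and keeps the UUID
already recorded for that slot (refFormat.Erasure.Sets[i][j]) instead of
minting a fresh UUID and rewriting every drive's format. The program below
is not part of the patch; it is a minimal, hypothetical illustration, and
the names `format`, `healFormats`, and `unformatted` are simplified
stand-ins for the real formatErasureV3 machinery in cmd/format-erasure.go.

package main

import "fmt"

// format is a simplified stand-in for formatErasureV3; the real type
// also carries Version, ID, Format, DistributionAlgo, and more.
type format struct {
	This string     // UUID of the drive this format.json belongs to
	Sets [][]string // reference layout: drive UUIDs for every erasure set
}

// healFormats mirrors the reshaped newHealFormatSets: for each slot whose
// drive came up unformatted (modeled as unformatted[i][j] == true), build
// a replacement format.json that retains the UUID the reference format
// already records for that slot. Healthy drives get no entry, so their
// format.json is never rewritten.
func healFormats(ref *format, unformatted [][]bool) [][]*format {
	out := make([][]*format, len(ref.Sets))
	for i := range ref.Sets {
		out[i] = make([]*format, len(ref.Sets[i]))
		for j := range ref.Sets[i] {
			if !unformatted[i][j] {
				continue
			}
			out[i][j] = &format{
				This: ref.Sets[i][j], // retain the previous UUID
				Sets: ref.Sets,       // reuse the reference layout unchanged
			}
		}
	}
	return out
}

func main() {
	ref := &format{Sets: [][]string{
		{"uuid-a", "uuid-b"},
		{"uuid-c", "uuid-d"},
	}}
	// The drive at set 0, slot 1 was replaced and comes up unformatted.
	healed := healFormats(ref, [][]bool{{false, true}, {false, false}})
	fmt.Println(healed[0][1].This) // prints "uuid-b": the UUID is retained
}

Because the healed drive inherits its slot's previous UUID, the in-memory
reference format held by every peer stays valid, which is why the patch
can also drop the inconsistent-disk monitor and the offline-UUID
bookkeeping (markUUIDsOffline and friends) entirely.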