From 5c15656c559c0567524ae858d6b95c8d3ccb0f92 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Fri, 10 Jul 2020 09:26:21 -0700
Subject: [PATCH] support bootstrap client to use healthcheck restClient
 (#10004)

- reduce locker timeout for early transaction lock for more eagerness to timeout
- reduce leader lock timeout to range from 30sec to 1minute
- add additional log message during bootstrap phase
---
 cmd/bootstrap-peer-server.go | 72 +++++++++++++++++-------------------
 cmd/global-heal.go           |  9 +++--
 cmd/server-main.go           |  8 ++--
 3 files changed, 43 insertions(+), 46 deletions(-)

diff --git a/cmd/bootstrap-peer-server.go b/cmd/bootstrap-peer-server.go
index 4692d15ad..b8144293b 100644
--- a/cmd/bootstrap-peer-server.go
+++ b/cmd/bootstrap-peer-server.go
@@ -20,12 +20,12 @@ import (
 	"context"
 	"crypto/tls"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
 	"net/url"
 	"runtime"
-	"sync/atomic"
 	"time"
 
 	"github.com/gorilla/mux"
@@ -43,6 +43,7 @@ const (
 )
 
 const (
+	bootstrapRESTMethodHealth = "/health"
 	bootstrapRESTMethodVerify = "/verify"
 )
 
@@ -94,6 +95,9 @@ func getServerSystemCfg() ServerSystemConfig {
 	}
 }
 
+// HealthHandler returns success if request is valid
+func (b *bootstrapRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request) {}
+
 func (b *bootstrapRESTServer) VerifyHandler(w http.ResponseWriter, r *http.Request) {
 	ctx := newContext(r, w, "VerifyHandler")
 	cfg := getServerSystemCfg()
@@ -106,6 +110,9 @@ func registerBootstrapRESTHandlers(router *mux.Router) {
 	server := &bootstrapRESTServer{}
 	subrouter := router.PathPrefix(bootstrapRESTPrefix).Subrouter()
 
+	subrouter.Methods(http.MethodPost).Path(bootstrapRESTVersionPrefix + bootstrapRESTMethodHealth).HandlerFunc(
+		httpTraceHdrs(server.HealthHandler))
+
 	subrouter.Methods(http.MethodPost).Path(bootstrapRESTVersionPrefix + bootstrapRESTMethodVerify).HandlerFunc(
 		httpTraceHdrs(server.VerifyHandler))
 }
@@ -114,29 +121,12 @@ func registerBootstrapRESTHandlers(router *mux.Router) {
 type bootstrapRESTClient struct {
 	endpoint   Endpoint
 	restClient *rest.Client
-	connected  int32
-}
-
-// Reconnect to a bootstrap rest server.k
-func (client *bootstrapRESTClient) reConnect() {
-	atomic.StoreInt32(&client.connected, 1)
-}
-
-// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
-// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
-// after verifying format.json
-func (client *bootstrapRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
-	return client.callWithContext(GlobalContext, method, values, body, length)
 }
 
 // Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
 // permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints()
 // after verifying format.json
 func (client *bootstrapRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
-	if !client.IsOnline() {
-		client.reConnect()
-	}
-
 	if values == nil {
 		values = make(url.Values)
 	}
@@ -146,10 +136,6 @@ func (client *bootstrapRESTClient) callWithContext(ctx context.Context, method s
 		return respBody, nil
 	}
 
-	if isNetworkError(err) {
-		atomic.StoreInt32(&client.connected, 0)
-	}
-
 	return nil, err
 }
 
@@ -158,24 +144,12 @@ func (client *bootstrapRESTClient) String() string {
 	return client.endpoint.String()
 }
 
-// IsOnline - returns whether RPC client failed to connect or not.
-func (client *bootstrapRESTClient) IsOnline() bool {
-	return atomic.LoadInt32(&client.connected) == 1
-}
-
-// Close - marks the client as closed.
-func (client *bootstrapRESTClient) Close() error {
-	atomic.StoreInt32(&client.connected, 0)
-	client.restClient.Close()
-	return nil
-}
-
 // Verify - fetches system server config.
-func (client *bootstrapRESTClient) Verify(srcCfg ServerSystemConfig) (err error) {
+func (client *bootstrapRESTClient) Verify(ctx context.Context, srcCfg ServerSystemConfig) (err error) {
 	if newObjectLayerFn() != nil {
 		return nil
 	}
-	respBody, err := client.call(bootstrapRESTMethodVerify, nil, nil, -1)
+	respBody, err := client.callWithContext(ctx, bootstrapRESTMethodVerify, nil, nil, -1)
 	if err != nil {
 		return
 	}
@@ -187,14 +161,17 @@ func (client *bootstrapRESTClient) Verify(srcCfg ServerSystemConfig) (err error)
 	return srcCfg.Diff(recvCfg)
 }
 
-func verifyServerSystemConfig(endpointZones EndpointZones) error {
+func verifyServerSystemConfig(ctx context.Context, endpointZones EndpointZones) error {
 	srcCfg := getServerSystemCfg()
 	clnts := newBootstrapRESTClients(endpointZones)
 	var onlineServers int
+	var offlineEndpoints []string
+	var retries int
 	for onlineServers < len(clnts)/2 {
 		for _, clnt := range clnts {
-			if err := clnt.Verify(srcCfg); err != nil {
+			if err := clnt.Verify(ctx, srcCfg); err != nil {
 				if isNetworkError(err) {
+					offlineEndpoints = append(offlineEndpoints, clnt.String())
 					continue
 				}
 				return fmt.Errorf("%s as has incorrect configuration: %w", clnt.String(), err)
@@ -204,6 +181,14 @@ func verifyServerSystemConfig(endpointZones EndpointZones) error {
 		// Sleep for a while - so that we don't go into
 		// 100% CPU when half the endpoints are offline.
 		time.Sleep(500 * time.Millisecond)
+		retries++
+		// after 5 retries start logging that servers are not reachable yet
+		if retries >= 5 {
+			logger.Info(fmt.Sprintf("Waiting for atleast %d servers to be online for bootstrap check", len(clnts)/2))
+			logger.Info(fmt.Sprintf("Following servers are currently offline or unreachable %s", offlineEndpoints))
+			retries = 0 // reset to log again after 5 retries.
+		}
+		offlineEndpoints = nil
 	}
 	return nil
 }
@@ -253,5 +238,14 @@ func newBootstrapRESTClient(endpoint Endpoint) (*bootstrapRESTClient, error) {
 		return nil, err
 	}
 
-	return &bootstrapRESTClient{endpoint: endpoint, restClient: restClient, connected: 1}, nil
+	restClient.HealthCheckFn = func() bool {
+		ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)
+		respBody, err := restClient.CallWithContext(ctx, bootstrapRESTMethodHealth, nil, nil, -1)
+		xhttp.DrainBody(respBody)
+		cancel()
+		var ne *rest.NetworkError
+		return !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &ne)
+	}
+
+	return &bootstrapRESTClient{endpoint: endpoint, restClient: restClient}, nil
 }
diff --git a/cmd/global-heal.go b/cmd/global-heal.go
index 1a3f3fa12..6e465c152 100644
--- a/cmd/global-heal.go
+++ b/cmd/global-heal.go
@@ -26,11 +26,14 @@ import (
 const (
 	bgHealingUUID = "0000-0000-0000-0000"
 
-	leaderTick   = time.Hour
-	healInterval = 30 * 24 * time.Hour
+	// sleep for an hour after a lock timeout
+	// before retrying to acquire lock again.
+	leaderLockTimeoutSleepInterval = time.Hour
+	// heal entire namespace once in 30 days
+	healInterval = 30 * 24 * time.Hour
 )
 
-var leaderLockTimeout = newDynamicTimeout(time.Minute, time.Minute)
+var leaderLockTimeout = newDynamicTimeout(30*time.Second, time.Minute)
 
 // NewBgHealSequence creates a background healing sequence
 // operation which crawls all objects and heal them.
diff --git a/cmd/server-main.go b/cmd/server-main.go
index 0ef7d8a75..3db11ce2c 100644
--- a/cmd/server-main.go
+++ b/cmd/server-main.go
@@ -228,7 +228,7 @@ func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) {
 	for range retry.NewTimer(retryCtx) {
 		// let one of the server acquire the lock, if not let them timeout.
 		// which shall be retried again by this loop.
-		if err = txnLk.GetLock(newDynamicTimeout(1*time.Second, 10*time.Second)); err != nil {
+		if err = txnLk.GetLock(newDynamicTimeout(1*time.Second, 3*time.Second)); err != nil {
 			logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock")
 			continue
 		}
@@ -348,7 +348,7 @@ func startBackgroundOps(ctx context.Context, objAPI ObjectLayer) {
 	for {
 		err := locker.GetLock(leaderLockTimeout)
 		if err != nil {
-			time.Sleep(leaderTick)
+			time.Sleep(leaderLockTimeoutSleepInterval)
 			continue
 		}
 		break
@@ -482,11 +482,11 @@ func serverMain(ctx *cli.Context) {
 	if globalIsDistErasure && globalEndpoints.FirstLocal() {
 		for {
 			// Additionally in distributed setup, validate the setup and configuration.
-			err := verifyServerSystemConfig(globalEndpoints)
+			err := verifyServerSystemConfig(GlobalContext, globalEndpoints)
 			if err == nil {
 				break
 			}
-			logger.LogIf(GlobalContext, err, "Unable to initialize distributed setup")
+			logger.LogIf(GlobalContext, err, "Unable to initialize distributed setup, retrying.. after 5 seconds")
 			select {
 			case <-GlobalContext.Done():
 				return
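
Note: the HealthCheckFn wired up in newBootstrapRESTClient above relies on MinIO's internal rest package (CallWithContext, HealthCheckTimeout, rest.NetworkError). Below is a rough, self-contained sketch of the same pattern using only the Go standard library; the names healthClient, newHealthClient and monitor are hypothetical, plain net/http stands in for rest.Client, and an atomic online flag stands in for the connection tracking this patch removes from bootstrapRESTClient. It is illustrative only, not MinIO's actual implementation.

package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"sync/atomic"
	"time"
)

// healthClient is a minimal stand-in for a REST client that tracks its own
// liveness via a pluggable health-check function polled in the background.
type healthClient struct {
	endpoint           string
	httpClient         *http.Client
	healthCheckFn      func() bool
	healthCheckTimeout time.Duration
	online             int32 // 1 = online, 0 = offline
}

// IsOnline reports the last observed health state.
func (c *healthClient) IsOnline() bool { return atomic.LoadInt32(&c.online) == 1 }

// monitor polls healthCheckFn until ctx is cancelled, updating the online flag.
func (c *healthClient) monitor(ctx context.Context, interval time.Duration) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if c.healthCheckFn() {
				atomic.StoreInt32(&c.online, 1)
			} else {
				atomic.StoreInt32(&c.online, 0)
			}
		}
	}
}

// newHealthClient builds a client whose health check POSTs to <endpoint>/health.
func newHealthClient(endpoint string) *healthClient {
	c := &healthClient{
		endpoint:           endpoint,
		httpClient:         &http.Client{},
		healthCheckTimeout: time.Second,
		online:             1,
	}
	c.healthCheckFn = func() bool {
		ctx, cancel := context.WithTimeout(context.Background(), c.healthCheckTimeout)
		defer cancel()
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.endpoint+"/health", nil)
		if err != nil {
			return false
		}
		resp, err := c.httpClient.Do(req)
		if err != nil {
			// Mirror the patch: only a deadline or a network-level error
			// marks the peer offline.
			var ne net.Error
			return !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &ne)
		}
		resp.Body.Close()
		return true // any HTTP response, even non-2xx, proves reachability
	}
	return c
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	c := newHealthClient("http://localhost:9000/minio/bootstrap/v1")
	go c.monitor(ctx, time.Second)
	<-ctx.Done()
	fmt.Println("bootstrap peer online:", c.IsOnline())
}

Treating any HTTP response as proof of reachability mirrors the patch's choice to count only timeouts and network errors as health-check failures, so a peer that answers /health with an error status is still considered online.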