diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go index 08f0662a8..d9e5ebdcb 100644 --- a/cmd/lock-rest-client.go +++ b/cmd/lock-rest-client.go @@ -19,11 +19,7 @@ package cmd import ( "bytes" "crypto/tls" - "errors" "io" - "sync/atomic" - "time" - "net/url" "github.com/minio/minio/cmd/http" @@ -36,7 +32,6 @@ import ( type lockRESTClient struct { restClient *rest.Client endpoint Endpoint - connected int32 } func toLockError(err error) error { @@ -62,10 +57,6 @@ func (client *lockRESTClient) String() string { // permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() // after verifying format.json func (client *lockRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { - if !client.IsOnline() { - return nil, errors.New("Lock rest server node is down") - } - if values == nil { values = make(url.Values) } @@ -75,26 +66,16 @@ func (client *lockRESTClient) call(method string, values url.Values, body io.Rea return respBody, nil } - if isNetworkError(err) { - time.AfterFunc(time.Second, func() { - // After 1 seconds, take this lock client online for a retry. - atomic.StoreInt32(&client.connected, 1) - }) - - atomic.StoreInt32(&client.connected, 0) - } - return nil, toLockError(err) } // IsOnline - returns whether REST client failed to connect or not. func (client *lockRESTClient) IsOnline() bool { - return atomic.LoadInt32(&client.connected) == 1 + return client.restClient.IsOnline() } // Close - marks the client as closed. 
func (client *lockRESTClient) Close() error { - atomic.StoreInt32(&client.connected, 0) client.restClient.Close() return nil } @@ -173,9 +154,9 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient { trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout) restClient, err := rest.NewClient(serverURL, trFn, newAuthToken) if err != nil { - logger.LogIf(GlobalContext, err) - return &lockRESTClient{endpoint: endpoint, restClient: restClient, connected: 0} + logger.Fatal(err, "Unable to create lock rest client") } + restClient.HealthCheckPath = "/" - return &lockRESTClient{endpoint: endpoint, restClient: restClient, connected: 1} + return &lockRESTClient{endpoint: endpoint, restClient: restClient} } diff --git a/cmd/lock-rest-client_test.go b/cmd/lock-rest-client_test.go index b7b4a746d..d05659ccd 100644 --- a/cmd/lock-rest-client_test.go +++ b/cmd/lock-rest-client_test.go @@ -29,7 +29,7 @@ func TestLockRESTlient(t *testing.T) { t.Fatalf("unexpected error %v", err) } lkClient := newlockRESTClient(endpoint) - if lkClient.connected == 0 { + if !lkClient.IsOnline() { t.Fatalf("unexpected error. connection failed") } diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 2e556cae9..665745f2f 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -44,12 +44,6 @@ import ( type peerRESTClient struct { host *xnet.Host restClient *rest.Client - connected int32 -} - -// Reconnect to a peer rest server. -func (client *peerRESTClient) reConnect() { - atomic.StoreInt32(&client.connected, 1) } // Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected @@ -63,10 +57,6 @@ func (client *peerRESTClient) call(method string, values url.Values, body io.Rea // permanently. 
The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() // after verifying format.json func (client *peerRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { - if !client.IsOnline() { - client.reConnect() - } - if values == nil { values = make(url.Values) } @@ -76,10 +66,6 @@ func (client *peerRESTClient) callWithContext(ctx context.Context, method string return respBody, nil } - if isNetworkError(err) { - atomic.StoreInt32(&client.connected, 0) - } - return nil, err } @@ -88,14 +74,8 @@ func (client *peerRESTClient) String() string { return client.host.String() } -// IsOnline - returns whether RPC client failed to connect or not. -func (client *peerRESTClient) IsOnline() bool { - return atomic.LoadInt32(&client.connected) == 1 -} - // Close - marks the client as closed. func (client *peerRESTClient) Close() error { - atomic.StoreInt32(&client.connected, 0) client.restClient.Close() return nil } @@ -901,6 +881,7 @@ func newPeerRESTClient(peer *xnet.Host) (*peerRESTClient, error) { if err != nil { return nil, err } + restClient.HealthCheckPath = peerRESTMethodGetLocalDiskIDs - return &peerRESTClient{host: peer, restClient: restClient, connected: 1}, nil + return &peerRESTClient{host: peer, restClient: restClient}, nil } diff --git a/cmd/rest/client.go b/cmd/rest/client.go index 57f995818..528f4f3e2 100644 --- a/cmd/rest/client.go +++ b/cmd/rest/client.go @@ -23,6 +23,7 @@ import ( "io/ioutil" "net/http" "net/url" + "sync/atomic" "time" xhttp "github.com/minio/minio/cmd/http" @@ -31,6 +32,12 @@ import ( // DefaultRESTTimeout - default RPC timeout is one minute. const DefaultRESTTimeout = 1 * time.Minute +const ( + offline = iota + online + closed +) + // NetworkError - error type in case of errors related to http/transport // for ex. connection refused, connection reset, dns resolution failure etc. 
// All errors returned by storage-rest-server (ex errFileNotFound, errDiskNotFound) are not considered to be network errors. @@ -42,12 +49,34 @@ func (n *NetworkError) Error() string { return n.Err.Error() } +// Unwrap returns the error wrapped in NetworkError. +func (n *NetworkError) Unwrap() error { + return n.Err +} + // Client - http based RPC client. type Client struct { + // HealthCheckPath is the path to test for health. + // If left empty the client will not keep track of health. + // Calling this can return any http status code/contents. + HealthCheckPath string + + // HealthCheckInterval will be the duration between re-connection attempts + // when a call has failed with a network error. + HealthCheckInterval time.Duration + + // HealthCheckTimeout determines timeout for each call. + HealthCheckTimeout time.Duration + + // MaxErrResponseSize is the maximum expected response size. + // Should only be modified before any calls are made. + MaxErrResponseSize int64 + httpClient *http.Client httpIdleConnsCloser func() url *url.URL newAuthToken func(audience string) string + connected int32 } // URL query separator constants @@ -57,6 +86,9 @@ const ( // CallWithContext - make a REST call with context. func (c *Client) CallWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) { + if !c.IsOnline() { + return nil, &NetworkError{Err: errors.New("remote server offline")} + } req, err := http.NewRequest(http.MethodPost, c.url.String()+method+querySep+values.Encode(), body) if err != nil { return nil, &NetworkError{err} @@ -69,7 +101,11 @@ func (c *Client) CallWithContext(ctx context.Context, method string, values url. } resp, err := c.httpClient.Do(req) if err != nil { - c.httpClient.CloseIdleConnections() + // A canceled context doesn't always mean a network problem. 
+		if !errors.Is(err, context.Canceled) {
+			// We are safe from recursion
+			c.MarkOffline()
+		}
 		return nil, &NetworkError{err}
 	}
 
@@ -82,7 +118,7 @@ func (c *Client) CallWithContext(ctx context.Context, method string, values url.
 	if resp.StatusCode != http.StatusOK {
 		defer xhttp.DrainBody(resp.Body)
 		// Limit the ReadAll(), just in case, because of a bug, the server responds with large data.
-		b, err := ioutil.ReadAll(io.LimitReader(resp.Body, 4096))
+		b, err := ioutil.ReadAll(io.LimitReader(resp.Body, c.MaxErrResponseSize))
 		if err != nil {
 			return nil, err
 		}
@@ -102,6 +138,7 @@ func (c *Client) Call(method string, values url.Values, body io.Reader, length i
 
 // Close closes all idle connections of the underlying http client
 func (c *Client) Close() {
+	atomic.StoreInt32(&c.connected, closed)
 	if c.httpIdleConnsCloser != nil {
 		c.httpIdleConnsCloser()
 	}
@@ -117,5 +154,58 @@ func NewClient(url *url.URL, newCustomTransport func() *http.Transport, newAuthT
 		httpIdleConnsCloser: tr.CloseIdleConnections,
 		url:                 url,
 		newAuthToken:        newAuthToken,
+		connected:           online,
+
+		MaxErrResponseSize:  4096,
+		HealthCheckPath:     "",
+		HealthCheckInterval: 200 * time.Millisecond,
+		HealthCheckTimeout:  time.Second,
 	}, nil
 }
+
+// IsOnline returns whether the client is likely to be online.
+func (c *Client) IsOnline() bool {
+	return atomic.LoadInt32(&c.connected) == online
+}
+
+// MarkOffline - will mark a client as being offline and spawns
+// a goroutine that will attempt to reconnect if a HealthCheckPath is set.
+// Without a HealthCheckPath this is a no-op and the client stays online.
+func (c *Client) MarkOffline() {
+	// Start goroutine that will attempt to reconnect.
+	// If server is already trying to reconnect this will have no effect.
+	if len(c.HealthCheckPath) > 0 && atomic.CompareAndSwapInt32(&c.connected, online, offline) {
+		if c.httpIdleConnsCloser != nil {
+			c.httpIdleConnsCloser()
+		}
+		go func() {
+			// CallWithContext refuses to send anything while c.connected is not
+			// online, so probing through c itself could never succeed. Probe via
+			// a shallow copy that is marked online and has no HealthCheckPath,
+			// which makes the copy's own MarkOffline a no-op on failed probes.
+			probe := &Client{
+				httpClient:         c.httpClient,
+				url:                c.url,
+				newAuthToken:       c.newAuthToken,
+				MaxErrResponseSize: c.MaxErrResponseSize,
+				connected:          online,
+			}
+			ticker := time.NewTicker(c.HealthCheckInterval)
+			defer ticker.Stop()
+			for range ticker.C {
+				if status := atomic.LoadInt32(&c.connected); status == closed {
+					return
+				}
+				ctx, cancel := context.WithTimeout(context.Background(), c.HealthCheckTimeout)
+				respBody, err := probe.CallWithContext(ctx, c.HealthCheckPath, nil, nil, -1)
+				xhttp.DrainBody(respBody)
+				cancel()
+				var ne *NetworkError
+				if !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &ne) {
+					atomic.CompareAndSwapInt32(&c.connected, offline, online)
+					return
+				}
+			}
+		}()
+	}
+}
diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go
index c683d35bf..dec69ef25 100644
--- a/cmd/storage-rest-client.go
+++ b/cmd/storage-rest-client.go
@@ -28,7 +28,6 @@ import (
 	"path"
 	"strconv"
 	"strings"
-	"sync/atomic"
 
 	"github.com/minio/minio/cmd/http"
 	"github.com/minio/minio/cmd/logger"
@@ -109,7 +108,6 @@ func toStorageErr(err error) error {
 type storageRESTClient struct {
 	endpoint   Endpoint
 	restClient *rest.Client
-	connected  int32
 	diskID     string
 }
 
@@ -130,9 +128,6 @@ func (client *storageRESTClient) call(method string, values url.Values, body io.
 	}
 
 	err = toStorageErr(err)
-	if err == errDiskNotFound {
-		atomic.StoreInt32(&client.connected, 0)
-	}
 	return nil, err
 }
 
@@ -144,7 +139,7 @@ func (client *storageRESTClient) String() string {
 
 // IsOnline - returns whether RPC client failed to connect or not.
 func (client *storageRESTClient) IsOnline() bool {
-	return atomic.LoadInt32(&client.connected) == 1
+	return client.restClient.IsOnline()
 }
 
 func (client *storageRESTClient) IsLocal() bool {
@@ -635,7 +630,6 @@ func (client *storageRESTClient) VerifyFile(volume, path string, fi FileInfo) er
 
 // Close - marks the client as closed.
func (client *storageRESTClient) Close() error { - atomic.StoreInt32(&client.connected, 0) client.restClient.Close() return nil } @@ -660,8 +654,8 @@ func newStorageRESTClient(endpoint Endpoint) *storageRESTClient { trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout) restClient, err := rest.NewClient(serverURL, trFn, newAuthToken) if err != nil { - logger.LogIf(GlobalContext, err) - return &storageRESTClient{endpoint: endpoint, restClient: restClient, connected: 0} + logger.Fatal(err, "Unable to initialize remote REST disks") } - return &storageRESTClient{endpoint: endpoint, restClient: restClient, connected: 1} + restClient.HealthCheckPath = "/" + return &storageRESTClient{endpoint: endpoint, restClient: restClient} }