From 4c773f7068fc7fd058c701f29b37cb2a3088e72f Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 2 Nov 2020 07:43:11 -0800 Subject: [PATCH] re-use remote transports in Peer,Storage,Locker clients (#10788) use one transport for internode communication --- cmd/bootstrap-peer-server.go | 12 +----------- cmd/endpoint.go | 4 ++-- cmd/globals.go | 3 +++ cmd/lock-rest-client.go | 14 ++------------ cmd/lock-rest-client_test.go | 1 + cmd/logger/logger.go | 8 +++----- cmd/logger/logonce.go | 9 +++------ cmd/peer-rest-client.go | 15 ++------------- cmd/rest/client.go | 3 +-- cmd/server-main.go | 11 +++++++---- cmd/storage-rest-client.go | 14 ++------------ cmd/test-utils_test.go | 4 ++++ 12 files changed, 31 insertions(+), 67 deletions(-) diff --git a/cmd/bootstrap-peer-server.go b/cmd/bootstrap-peer-server.go index fb29abdbb..3cb7df4a9 100644 --- a/cmd/bootstrap-peer-server.go +++ b/cmd/bootstrap-peer-server.go @@ -18,7 +18,6 @@ package cmd import ( "context" - "crypto/tls" "encoding/json" "fmt" "io" @@ -224,16 +223,7 @@ func newBootstrapRESTClient(endpoint Endpoint) *bootstrapRESTClient { Path: bootstrapRESTPath, } - var tlsConfig *tls.Config - if globalIsSSL { - tlsConfig = &tls.Config{ - ServerName: endpoint.Hostname(), - RootCAs: globalRootCAs, - } - } - - trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout) - restClient := rest.NewClient(serverURL, trFn, newAuthToken) + restClient := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken) restClient.HealthCheckFn = nil return &bootstrapRESTClient{endpoint: endpoint, restClient: restClient} diff --git a/cmd/endpoint.go b/cmd/endpoint.go index 50e7325c8..a42aa6244 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -840,7 +840,7 @@ func getOnlineProxyEndpointIdx() int { } // GetProxyEndpoints - get all endpoints that can be used to proxy list request. -func GetProxyEndpoints(endpointServerSets EndpointServerSets) ([]ProxyEndpoint, error) { +func GetProxyEndpoints(endpointServerSets EndpointServerSets) []ProxyEndpoint { var proxyEps []ProxyEndpoint proxyEpSet := set.NewStringSet() @@ -874,7 +874,7 @@ func GetProxyEndpoints(endpointServerSets EndpointServerSets) ([]ProxyEndpoint, }) } } - return proxyEps, nil + return proxyEps } func updateDomainIPs(endPoints set.StringSet) { diff --git a/cmd/globals.go b/cmd/globals.go index b96c31071..f6d2ea64d 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -18,6 +18,7 @@ package cmd import ( "crypto/x509" + "net/http" "os" "time" @@ -275,6 +276,8 @@ var ( globalProxyEndpoints []ProxyEndpoint + globalInternodeTransport http.RoundTripper + globalDNSCache *xhttp.DNSCache // Add new variable global values here. ) diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go index 3c8a4bd6e..24c116b38 100644 --- a/cmd/lock-rest-client.go +++ b/cmd/lock-rest-client.go @@ -19,7 +19,6 @@ package cmd import ( "bytes" "context" - "crypto/tls" "errors" "io" "net/url" @@ -152,22 +151,13 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient { Path: pathJoin(lockRESTPrefix, endpoint.Path, lockRESTVersion), } - var tlsConfig *tls.Config - if globalIsSSL { - tlsConfig = &tls.Config{ - ServerName: endpoint.Hostname(), - RootCAs: globalRootCAs, - } - } - - trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout) - restClient := rest.NewClient(serverURL, trFn, newAuthToken) + restClient := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken) restClient.ExpectTimeouts = true restClient.HealthCheckFn = func() bool { ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout) // Instantiate a new rest client for healthcheck // to avoid recursive healthCheckFn() - respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).Call(ctx, lockRESTMethodHealth, nil, nil, -1) + respBody, err := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken).Call(ctx, lockRESTMethodHealth, nil, nil, -1) xhttp.DrainBody(respBody) cancel() var ne *rest.NetworkError diff --git a/cmd/lock-rest-client_test.go b/cmd/lock-rest-client_test.go index cd2b7c6d5..187e32591 100644 --- a/cmd/lock-rest-client_test.go +++ b/cmd/lock-rest-client_test.go @@ -29,6 +29,7 @@ func TestLockRESTlient(t *testing.T) { if err != nil { t.Fatalf("unexpected error %v", err) } + lkClient := newlockRESTClient(endpoint) if !lkClient.IsOnline() { t.Fatalf("unexpected error. connection failed") diff --git a/cmd/logger/logger.go b/cmd/logger/logger.go index 1e2bf704c..b25f16b14 100644 --- a/cmd/logger/logger.go +++ b/cmd/logger/logger.go @@ -300,14 +300,12 @@ func LogIf(ctx context.Context, err error, errKind ...interface{}) { return } - if errors.Is(err, context.Canceled) || errors.Is(err, http.ErrServerClosed) { + if errors.Is(err, context.Canceled) { return } - if e := errors.Unwrap(err); e != nil { - if e.Error() == "disk not found" { - return - } + if err.Error() == http.ErrServerClosed.Error() || err.Error() == "disk not found" { + return } logIf(ctx, err, errKind...) diff --git a/cmd/logger/logonce.go b/cmd/logger/logonce.go index dbac4310c..3535ddaf8 100644 --- a/cmd/logger/logonce.go +++ b/cmd/logger/logonce.go @@ -21,7 +21,6 @@ import ( "errors" "net/http" "sync" - "time" ) @@ -83,14 +82,12 @@ func LogOnceIf(ctx context.Context, err error, id interface{}, errKind ...interf return } - if errors.Is(err, context.Canceled) || errors.Is(err, http.ErrServerClosed) { + if errors.Is(err, context.Canceled) { return } - if e := errors.Unwrap(err); e != nil { - if e.Error() == "disk not found" { - return - } + if err.Error() == http.ErrServerClosed.Error() || err.Error() == "disk not found" { + return } logOnce.logOnceIf(ctx, err, id, errKind...) diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 6bae9417d..136ef4b2d 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -19,7 +19,6 @@ package cmd import ( "bytes" "context" - "crypto/tls" "encoding/gob" "errors" "fmt" @@ -872,23 +871,13 @@ func newPeerRESTClient(peer *xnet.Host) *peerRESTClient { Path: peerRESTPath, } - var tlsConfig *tls.Config - if globalIsSSL { - tlsConfig = &tls.Config{ - ServerName: peer.Name, - RootCAs: globalRootCAs, - } - } - - trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout) - restClient := rest.NewClient(serverURL, trFn, newAuthToken) - + restClient := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken) // Construct a new health function. restClient.HealthCheckFn = func() bool { ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout) // Instantiate a new rest client for healthcheck // to avoid recursive healthCheckFn() - respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).Call(ctx, peerRESTMethodHealth, nil, nil, -1) + respBody, err := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken).Call(ctx, peerRESTMethodHealth, nil, nil, -1) xhttp.DrainBody(respBody) cancel() var ne *rest.NetworkError diff --git a/cmd/rest/client.go b/cmd/rest/client.go index 8b9539837..0392520ce 100644 --- a/cmd/rest/client.go +++ b/cmd/rest/client.go @@ -164,10 +164,9 @@ func (c *Client) Close() { } // NewClient - returns new REST client. -func NewClient(url *url.URL, newCustomTransport func() *http.Transport, newAuthToken func(aud string) string) *Client { +func NewClient(url *url.URL, tr http.RoundTripper, newAuthToken func(aud string) string) *Client { // Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper // except custom DialContext and TLSClientConfig. - tr := newCustomTransport() return &Client{ httpClient: &http.Client{Transport: tr}, url: url, diff --git a/cmd/server-main.go b/cmd/server-main.go index e3cecbeea..d9b2eea64 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -18,6 +18,7 @@ package cmd import ( "context" + "crypto/tls" "errors" "fmt" "net" @@ -31,6 +32,7 @@ import ( "github.com/minio/minio/cmd/config" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/cmd/rest" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/bucket/bandwidth" "github.com/minio/minio/pkg/certs" @@ -137,6 +139,11 @@ func serverHandleCmdArgs(ctx *cli.Context) { globalEndpoints, setupType, err = createServerEndpoints(globalCLIContext.Addr, serverCmdArgs(ctx)...) logger.FatalIf(err, "Invalid command line arguments") + globalProxyEndpoints = GetProxyEndpoints(globalEndpoints) + globalInternodeTransport = newInternodeHTTPTransport(&tls.Config{ + RootCAs: globalRootCAs, + }, rest.DefaultTimeout)() + // On macOS, if a process already listens on LOCALIPADDR:PORT, net.Listen() falls back // to IPv6 address ie minio will start listening on IPv6 address whereas another // (non-)minio process is listening on IPv4 of given port. @@ -396,10 +403,6 @@ func serverMain(ctx *cli.Context) { // Initialize all sub-systems newAllSubsystems() - var err error - globalProxyEndpoints, err = GetProxyEndpoints(globalEndpoints) - logger.FatalIf(err, "Invalid command line arguments") - globalMinioEndpoint = func() string { host := globalMinioHost if host == "" { diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 0237b722b..8e20b3e8f 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -19,7 +19,6 @@ package cmd import ( "bytes" "context" - "crypto/tls" "encoding/gob" "encoding/hex" "errors" @@ -670,22 +669,13 @@ func newStorageRESTClient(endpoint Endpoint, healthcheck bool) *storageRESTClien Path: path.Join(storageRESTPrefix, endpoint.Path, storageRESTVersion), } - var tlsConfig *tls.Config - if globalIsSSL { - tlsConfig = &tls.Config{ - ServerName: endpoint.Hostname(), - RootCAs: globalRootCAs, - } - } - - trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout) - restClient := rest.NewClient(serverURL, trFn, newAuthToken) + restClient := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken) if healthcheck { restClient.HealthCheckFn = func() bool { ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout) // Instantiate a new rest client for healthcheck // to avoid recursive healthCheckFn() - respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).Call(ctx, storageRESTMethodHealth, nil, nil, -1) + respBody, err := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken).Call(ctx, storageRESTMethodHealth, nil, nil, -1) xhttp.DrainBody(respBody) cancel() return !errors.Is(err, context.DeadlineExceeded) && toStorageErr(err) != errDiskNotFound diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index b73373ebd..dbbfcb87b 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -61,6 +61,7 @@ import ( "github.com/minio/minio/cmd/crypto" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/cmd/rest" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/bucket/policy" "github.com/minio/minio/pkg/hash" @@ -69,6 +70,7 @@ import ( // TestMain to set up global env. func TestMain(m *testing.M) { flag.Parse() + globalActiveCred = auth.Credentials{ AccessKey: auth.DefaultAccessKey, SecretKey: auth.DefaultSecretKey, @@ -107,6 +109,8 @@ func TestMain(m *testing.M) { globalDNSCache = xhttp.NewDNSCache(3*time.Second, 10*time.Second) + globalInternodeTransport = newInternodeHTTPTransport(nil, rest.DefaultTimeout)() + initHelp() resetTestGlobals()