re-use remote transports in Peer,Storage,Locker clients (#10788)

use one transport for internode communication
master
Harshavardhana 4 years ago committed by GitHub
parent d8e07f2c41
commit 4c773f7068
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      cmd/bootstrap-peer-server.go
  2. 4
      cmd/endpoint.go
  3. 3
      cmd/globals.go
  4. 14
      cmd/lock-rest-client.go
  5. 1
      cmd/lock-rest-client_test.go
  6. 8
      cmd/logger/logger.go
  7. 9
      cmd/logger/logonce.go
  8. 15
      cmd/peer-rest-client.go
  9. 3
      cmd/rest/client.go
  10. 11
      cmd/server-main.go
  11. 14
      cmd/storage-rest-client.go
  12. 4
      cmd/test-utils_test.go

@ -18,7 +18,6 @@ package cmd
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
@ -224,16 +223,7 @@ func newBootstrapRESTClient(endpoint Endpoint) *bootstrapRESTClient {
Path: bootstrapRESTPath,
}
var tlsConfig *tls.Config
if globalIsSSL {
tlsConfig = &tls.Config{
ServerName: endpoint.Hostname(),
RootCAs: globalRootCAs,
}
}
trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout)
restClient := rest.NewClient(serverURL, trFn, newAuthToken)
restClient := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken)
restClient.HealthCheckFn = nil
return &bootstrapRESTClient{endpoint: endpoint, restClient: restClient}

@ -840,7 +840,7 @@ func getOnlineProxyEndpointIdx() int {
}
// GetProxyEndpoints - get all endpoints that can be used to proxy list request.
func GetProxyEndpoints(endpointServerSets EndpointServerSets) ([]ProxyEndpoint, error) {
func GetProxyEndpoints(endpointServerSets EndpointServerSets) []ProxyEndpoint {
var proxyEps []ProxyEndpoint
proxyEpSet := set.NewStringSet()
@ -874,7 +874,7 @@ func GetProxyEndpoints(endpointServerSets EndpointServerSets) ([]ProxyEndpoint,
})
}
}
return proxyEps, nil
return proxyEps
}
func updateDomainIPs(endPoints set.StringSet) {

@ -18,6 +18,7 @@ package cmd
import (
"crypto/x509"
"net/http"
"os"
"time"
@ -275,6 +276,8 @@ var (
globalProxyEndpoints []ProxyEndpoint
globalInternodeTransport http.RoundTripper
globalDNSCache *xhttp.DNSCache
// Add new variable global values here.
)

@ -19,7 +19,6 @@ package cmd
import (
"bytes"
"context"
"crypto/tls"
"errors"
"io"
"net/url"
@ -152,22 +151,13 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient {
Path: pathJoin(lockRESTPrefix, endpoint.Path, lockRESTVersion),
}
var tlsConfig *tls.Config
if globalIsSSL {
tlsConfig = &tls.Config{
ServerName: endpoint.Hostname(),
RootCAs: globalRootCAs,
}
}
trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout)
restClient := rest.NewClient(serverURL, trFn, newAuthToken)
restClient := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken)
restClient.ExpectTimeouts = true
restClient.HealthCheckFn = func() bool {
ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)
// Instantiate a new rest client for healthcheck
// to avoid recursive healthCheckFn()
respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).Call(ctx, lockRESTMethodHealth, nil, nil, -1)
respBody, err := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken).Call(ctx, lockRESTMethodHealth, nil, nil, -1)
xhttp.DrainBody(respBody)
cancel()
var ne *rest.NetworkError

@ -29,6 +29,7 @@ func TestLockRESTlient(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error %v", err)
}
lkClient := newlockRESTClient(endpoint)
if !lkClient.IsOnline() {
t.Fatalf("unexpected error. connection failed")

@ -300,14 +300,12 @@ func LogIf(ctx context.Context, err error, errKind ...interface{}) {
return
}
if errors.Is(err, context.Canceled) || errors.Is(err, http.ErrServerClosed) {
if errors.Is(err, context.Canceled) {
return
}
if e := errors.Unwrap(err); e != nil {
if e.Error() == "disk not found" {
return
}
if err.Error() == http.ErrServerClosed.Error() || err.Error() == "disk not found" {
return
}
logIf(ctx, err, errKind...)

@ -21,7 +21,6 @@ import (
"errors"
"net/http"
"sync"
"time"
)
@ -83,14 +82,12 @@ func LogOnceIf(ctx context.Context, err error, id interface{}, errKind ...interf
return
}
if errors.Is(err, context.Canceled) || errors.Is(err, http.ErrServerClosed) {
if errors.Is(err, context.Canceled) {
return
}
if e := errors.Unwrap(err); e != nil {
if e.Error() == "disk not found" {
return
}
if err.Error() == http.ErrServerClosed.Error() || err.Error() == "disk not found" {
return
}
logOnce.logOnceIf(ctx, err, id, errKind...)

@ -19,7 +19,6 @@ package cmd
import (
"bytes"
"context"
"crypto/tls"
"encoding/gob"
"errors"
"fmt"
@ -872,23 +871,13 @@ func newPeerRESTClient(peer *xnet.Host) *peerRESTClient {
Path: peerRESTPath,
}
var tlsConfig *tls.Config
if globalIsSSL {
tlsConfig = &tls.Config{
ServerName: peer.Name,
RootCAs: globalRootCAs,
}
}
trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout)
restClient := rest.NewClient(serverURL, trFn, newAuthToken)
restClient := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken)
// Construct a new health function.
restClient.HealthCheckFn = func() bool {
ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)
// Instantiate a new rest client for healthcheck
// to avoid recursive healthCheckFn()
respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).Call(ctx, peerRESTMethodHealth, nil, nil, -1)
respBody, err := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken).Call(ctx, peerRESTMethodHealth, nil, nil, -1)
xhttp.DrainBody(respBody)
cancel()
var ne *rest.NetworkError

@ -164,10 +164,9 @@ func (c *Client) Close() {
}
// NewClient - returns new REST client.
func NewClient(url *url.URL, newCustomTransport func() *http.Transport, newAuthToken func(aud string) string) *Client {
func NewClient(url *url.URL, tr http.RoundTripper, newAuthToken func(aud string) string) *Client {
// Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper
// except custom DialContext and TLSClientConfig.
tr := newCustomTransport()
return &Client{
httpClient: &http.Client{Transport: tr},
url: url,

@ -18,6 +18,7 @@ package cmd
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
@ -31,6 +32,7 @@ import (
"github.com/minio/minio/cmd/config"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/bucket/bandwidth"
"github.com/minio/minio/pkg/certs"
@ -137,6 +139,11 @@ func serverHandleCmdArgs(ctx *cli.Context) {
globalEndpoints, setupType, err = createServerEndpoints(globalCLIContext.Addr, serverCmdArgs(ctx)...)
logger.FatalIf(err, "Invalid command line arguments")
globalProxyEndpoints = GetProxyEndpoints(globalEndpoints)
globalInternodeTransport = newInternodeHTTPTransport(&tls.Config{
RootCAs: globalRootCAs,
}, rest.DefaultTimeout)()
// On macOS, if a process already listens on LOCALIPADDR:PORT, net.Listen() falls back
// to IPv6 address ie minio will start listening on IPv6 address whereas another
// (non-)minio process is listening on IPv4 of given port.
@ -396,10 +403,6 @@ func serverMain(ctx *cli.Context) {
// Initialize all sub-systems
newAllSubsystems()
var err error
globalProxyEndpoints, err = GetProxyEndpoints(globalEndpoints)
logger.FatalIf(err, "Invalid command line arguments")
globalMinioEndpoint = func() string {
host := globalMinioHost
if host == "" {

@ -19,7 +19,6 @@ package cmd
import (
"bytes"
"context"
"crypto/tls"
"encoding/gob"
"encoding/hex"
"errors"
@ -670,22 +669,13 @@ func newStorageRESTClient(endpoint Endpoint, healthcheck bool) *storageRESTClien
Path: path.Join(storageRESTPrefix, endpoint.Path, storageRESTVersion),
}
var tlsConfig *tls.Config
if globalIsSSL {
tlsConfig = &tls.Config{
ServerName: endpoint.Hostname(),
RootCAs: globalRootCAs,
}
}
trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout)
restClient := rest.NewClient(serverURL, trFn, newAuthToken)
restClient := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken)
if healthcheck {
restClient.HealthCheckFn = func() bool {
ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)
// Instantiate a new rest client for healthcheck
// to avoid recursive healthCheckFn()
respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).Call(ctx, storageRESTMethodHealth, nil, nil, -1)
respBody, err := rest.NewClient(serverURL, globalInternodeTransport, newAuthToken).Call(ctx, storageRESTMethodHealth, nil, nil, -1)
xhttp.DrainBody(respBody)
cancel()
return !errors.Is(err, context.DeadlineExceeded) && toStorageErr(err) != errDiskNotFound

@ -61,6 +61,7 @@ import (
"github.com/minio/minio/cmd/crypto"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/hash"
@ -69,6 +70,7 @@ import (
// TestMain to set up global env.
func TestMain(m *testing.M) {
flag.Parse()
globalActiveCred = auth.Credentials{
AccessKey: auth.DefaultAccessKey,
SecretKey: auth.DefaultSecretKey,
@ -107,6 +109,8 @@ func TestMain(m *testing.M) {
globalDNSCache = xhttp.NewDNSCache(3*time.Second, 10*time.Second)
globalInternodeTransport = newInternodeHTTPTransport(nil, rest.DefaultTimeout)()
initHelp()
resetTestGlobals()

Loading…
Cancel
Save