From 79b6a43467b9ae5d26afe5d1e97207d9245f5b4e Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 12 Feb 2021 18:17:52 -0800 Subject: [PATCH] fix: avoid timed value for network calls (#11531) additionally simply timedValue to have RWMutex to avoid concurrent calls to DiskInfo() getting serialized, this has an effect on all calls that use GetDiskInfo() on the same disks. Such as getOnlineDisks, getOnlineDisksWithoutHealing --- cmd/config-current.go | 11 ++++- cmd/storage-rest-client.go | 36 ++++++----------- cmd/utils.go | 83 +++++++++++++++++++++++++++++--------- 3 files changed, 86 insertions(+), 44 deletions(-) diff --git a/cmd/config-current.go b/cmd/config-current.go index 71b41253a..211dfeee1 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -18,6 +18,7 @@ package cmd import ( "context" + "crypto/tls" "fmt" "strings" "sync" @@ -295,7 +296,10 @@ func validateConfig(s config.Config, setDriveCounts []int) error { } } { - kmsCfg, err := crypto.LookupConfig(s, globalCertsCADir.Get(), NewGatewayHTTPTransport()) + kmsCfg, err := crypto.LookupConfig(s, globalCertsCADir.Get(), newCustomHTTPTransportWithHTTP2( + &tls.Config{ + RootCAs: globalRootCAs, + }, defaultDialTimeout)()) if err != nil { return err } @@ -471,7 +475,10 @@ func lookupConfigs(s config.Config, setDriveCounts []int) { } } - kmsCfg, err := crypto.LookupConfig(s, globalCertsCADir.Get(), NewGatewayHTTPTransport()) + kmsCfg, err := crypto.LookupConfig(s, globalCertsCADir.Get(), newCustomHTTPTransportWithHTTP2( + &tls.Config{ + RootCAs: globalRootCAs, + }, defaultDialTimeout)()) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to setup KMS config: %w", err)) } diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 37bb279ce..dcb1eca78 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -29,7 +29,6 @@ import ( "strconv" "strings" "sync" - "time" "github.com/minio/minio/cmd/http" xhttp "github.com/minio/minio/cmd/http" @@ -120,8 +119,6 @@ type storageRESTClient struct { endpoint Endpoint restClient *rest.Client diskID string - - diskInfoCache timedValue } // Wrapper to restClient.Call to handle network errors, in case of network error the connection is makred disconnected @@ -218,27 +215,18 @@ func (client *storageRESTClient) SetDiskID(id string) { // DiskInfo - fetch disk information for a remote disk. func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, err error) { - client.diskInfoCache.Once.Do(func() { - client.diskInfoCache.TTL = time.Second - client.diskInfoCache.Update = func() (interface{}, error) { - var info DiskInfo - respBody, err := client.call(ctx, storageRESTMethodDiskInfo, nil, nil, -1) - if err != nil { - return info, err - } - defer http.DrainBody(respBody) - if err = msgp.Decode(respBody, &info); err != nil { - return info, err - } - if info.Error != "" { - return info, toStorageErr(errors.New(info.Error)) - } - return info, nil - } - }) - v, err := client.diskInfoCache.Get() - info = v.(DiskInfo) - return info, err + respBody, err := client.call(ctx, storageRESTMethodDiskInfo, nil, nil, -1) + if err != nil { + return info, err + } + defer http.DrainBody(respBody) + if err = msgp.Decode(respBody, &info); err != nil { + return info, err + } + if info.Error != "" { + return info, toStorageErr(errors.New(info.Error)) + } + return info, nil } // MakeVolBulk - create multiple volumes in a bulk operation. diff --git a/cmd/utils.go b/cmd/utils.go index 96c943ee9..d8499d931 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -45,6 +45,7 @@ import ( "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/handlers" "github.com/minio/minio/pkg/madmin" + "golang.org/x/net/http2" ) const ( @@ -523,6 +524,45 @@ func newCustomHTTPProxyTransport(tlsConfig *tls.Config, dialTimeout time.Duratio } } +func newCustomHTTPTransportWithHTTP2(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport { + // For more details about various values used here refer + // https://golang.org/pkg/net/http/#Transport documentation + tr := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: xhttp.DialContextWithDNSCache(globalDNSCache, xhttp.NewInternodeDialContext(dialTimeout)), + MaxIdleConnsPerHost: 1024, + IdleConnTimeout: 15 * time.Second, + ResponseHeaderTimeout: 1 * time.Minute, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 10 * time.Second, + TLSClientConfig: tlsConfig, + // Go net/http automatically unzip if content-type is + // gzip disable this feature, as we are always interested + // in raw stream. + DisableCompression: true, + } + + if tlsConfig != nil { + trhttp2, _ := http2.ConfigureTransports(tr) + if trhttp2 != nil { + // ReadIdleTimeout is the timeout after which a health check using ping + // frame will be carried out if no frame is received on the + // connection. 5 minutes is sufficient time for any idle connection. + trhttp2.ReadIdleTimeout = 5 * time.Minute + // PingTimeout is the timeout after which the connection will be closed + // if a response to Ping is not received. + trhttp2.PingTimeout = dialTimeout + // DisableCompression, if true, prevents the Transport from + // requesting compression with an "Accept-Encoding: gzip" + trhttp2.DisableCompression = true + } + } + + return func() *http.Transport { + return tr + } +} + func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport { // For more details about various values used here refer // https://golang.org/pkg/net/http/#Transport documentation @@ -767,38 +807,45 @@ type timedValue struct { // Managed values. value interface{} lastUpdate time.Time - mu sync.Mutex + mu sync.RWMutex } // Get will return a cached value or fetch a new one. // If the Update function returns an error the value is forwarded as is and not cached. func (t *timedValue) Get() (interface{}, error) { - t.mu.Lock() - defer t.mu.Unlock() - if t.TTL <= 0 { - t.TTL = time.Second - } - if t.value != nil { - if time.Since(t.lastUpdate) < t.TTL { - v := t.value - return v, nil - } - t.value = nil + v := t.get() + if v != nil { + return v, nil } + v, err := t.Update() if err != nil { return v, err } - t.value = v - t.lastUpdate = time.Now() + + t.update(v) return v, nil } -// Invalidate the value in the cache. -func (t *timedValue) Invalidate() { +func (t *timedValue) get() (v interface{}) { + ttl := t.TTL + if ttl <= 0 { + ttl = time.Second + } + t.mu.RLock() + defer t.mu.RUnlock() + v = t.value + if time.Since(t.lastUpdate) < ttl { + return v + } + return nil +} + +func (t *timedValue) update(v interface{}) { t.mu.Lock() - t.value = nil - t.mu.Unlock() + defer t.mu.Unlock() + t.value = v + t.lastUpdate = time.Now() } // On MinIO a directory object is stored as a regular object with "__XLDIR__" suffix.