Remove MaxConnsPerHost settings to avoid potential hangs (#10438)

MaxConnsPerHost can potentially hang a call without any
way to timeout, we do not need this setting for our proxy
and gateway implementations instead IdleConn settings are
good enough.

Also ensure to use NewRequestWithContext and make sure to
take the disks offline only for network errors.

Fixes #10304
master
Harshavardhana 4 years ago committed by GitHub
parent 96997d2b21
commit c13afd56e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      cmd/admin-handlers.go
  2. 4
      cmd/disk-cache-backend.go
  3. 5
      cmd/endpoint.go
  4. 2
      cmd/erasure-multipart.go
  5. 3
      cmd/erasure-object.go
  6. 14
      cmd/fs-v1-multipart.go
  7. 2
      cmd/fs-v1.go
  8. 2
      cmd/object-api-putobject_test.go
  9. 12
      cmd/prepare-storage.go
  10. 11
      cmd/rest/client.go
  11. 3
      cmd/utils.go
  12. 6
      pkg/event/target/webhook.go
  13. 9
      pkg/madmin/api.go

@ -1677,11 +1677,6 @@ func checkConnection(endpointStr string, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(GlobalContext, timeout) ctx, cancel := context.WithTimeout(GlobalContext, timeout)
defer cancel() defer cancel()
req, err := http.NewRequest(http.MethodHead, endpointStr, nil)
if err != nil {
return err
}
client := &http.Client{Transport: &http.Transport{ client := &http.Client{Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
DialContext: xhttp.NewCustomDialContext(timeout), DialContext: xhttp.NewCustomDialContext(timeout),
@ -1696,11 +1691,15 @@ func checkConnection(endpointStr string, timeout time.Duration) error {
}} }}
defer client.CloseIdleConnections() defer client.CloseIdleConnections()
resp, err := client.Do(req.WithContext(ctx)) req, err := http.NewRequestWithContext(ctx, http.MethodHead, endpointStr, nil)
if err != nil {
return err
}
resp, err := client.Do(req)
if err != nil { if err != nil {
return err return err
} }
defer xhttp.DrainBody(resp.Body) defer xhttp.DrainBody(resp.Body)
resp.Body.Close()
return nil return nil
} }

@ -721,7 +721,7 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
if actualSize != uint64(n) { if actualSize != uint64(n) {
removeAll(cachePath) removeAll(cachePath)
return IncompleteBody{} return IncompleteBody{Bucket: bucket, Object: object}
} }
return c.saveMetadata(ctx, bucket, object, metadata, n, nil, "", incHitsOnly) return c.saveMetadata(ctx, bucket, object, metadata, n, nil, "", incHitsOnly)
} }
@ -768,7 +768,7 @@ func (c *diskCache) putRange(ctx context.Context, bucket, object string, data io
} }
if actualSize != uint64(n) { if actualSize != uint64(n) {
removeAll(cachePath) removeAll(cachePath)
return IncompleteBody{} return IncompleteBody{Bucket: bucket, Object: object}
} }
return c.saveMetadata(ctx, bucket, object, metadata, int64(objSize), rs, cacheFile, false) return c.saveMetadata(ctx, bucket, object, metadata, int64(objSize), rs, cacheFile, false)
} }

@ -773,9 +773,8 @@ func GetProxyEndpoints(endpointZones EndpointZones) ([]ProxyEndpoint, error) {
tr := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)() tr := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)()
// Allow more requests to be in flight with higher response header timeout. // Allow more requests to be in flight with higher response header timeout.
tr.ResponseHeaderTimeout = 30 * time.Minute tr.ResponseHeaderTimeout = 30 * time.Minute
tr.MaxConnsPerHost = 256 tr.MaxIdleConns = 64
tr.MaxIdleConnsPerHost = 16 tr.MaxIdleConnsPerHost = 64
tr.MaxIdleConns = 256
proxyEps = append(proxyEps, ProxyEndpoint{ proxyEps = append(proxyEps, ProxyEndpoint{
Endpoint: endpoint, Endpoint: endpoint,

@ -427,7 +427,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
// Should return IncompleteBody{} error when reader has fewer bytes // Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header. // than specified in request header.
if n < data.Size() { if n < data.Size() {
return pi, IncompleteBody{} return pi, IncompleteBody{Bucket: bucket, Object: object}
} }
for i := range writers { for i := range writers {

@ -695,8 +695,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
// Should return IncompleteBody{} error when reader has fewer bytes // Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header. // than specified in request header.
if n < data.Size() { if n < data.Size() {
logger.LogIf(ctx, IncompleteBody{}, logger.Application) return ObjectInfo{}, IncompleteBody{Bucket: bucket, Object: object}
return ObjectInfo{}, IncompleteBody{}
} }
for i, w := range writers { for i, w := range writers {

@ -306,7 +306,7 @@ func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
_, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile)) _, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile))
if err != nil { if err != nil {
if err == errFileNotFound || err == errFileAccessDenied { if err == errFileNotFound || err == errFileAccessDenied {
return pi, InvalidUploadID{UploadID: uploadID} return pi, InvalidUploadID{Bucket: bucket, Object: object, UploadID: uploadID}
} }
return pi, toObjectErr(err, bucket, object) return pi, toObjectErr(err, bucket, object)
} }
@ -332,7 +332,7 @@ func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
// Should return IncompleteBody{} error when reader has fewer // Should return IncompleteBody{} error when reader has fewer
// bytes than specified in request header. // bytes than specified in request header.
if bytesWritten < data.Size() { if bytesWritten < data.Size() {
return pi, IncompleteBody{} return pi, IncompleteBody{Bucket: bucket, Object: object}
} }
etag := r.MD5CurrentHexString() etag := r.MD5CurrentHexString()
@ -346,7 +346,7 @@ func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
// Make sure not to create parent directories if they don't exist - the upload might have been aborted. // Make sure not to create parent directories if they don't exist - the upload might have been aborted.
if err = fsSimpleRenameFile(ctx, tmpPartPath, partPath); err != nil { if err = fsSimpleRenameFile(ctx, tmpPartPath, partPath); err != nil {
if err == errFileNotFound || err == errFileAccessDenied { if err == errFileNotFound || err == errFileAccessDenied {
return pi, InvalidUploadID{UploadID: uploadID} return pi, InvalidUploadID{Bucket: bucket, Object: object, UploadID: uploadID}
} }
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
} }
@ -389,7 +389,7 @@ func (fs *FSObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploa
uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID) uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
if _, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile)); err != nil { if _, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile)); err != nil {
if err == errFileNotFound || err == errFileAccessDenied { if err == errFileNotFound || err == errFileAccessDenied {
return minfo, InvalidUploadID{UploadID: uploadID} return minfo, InvalidUploadID{Bucket: bucket, Object: object, UploadID: uploadID}
} }
return minfo, toObjectErr(err, bucket, object) return minfo, toObjectErr(err, bucket, object)
} }
@ -435,7 +435,7 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload
uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID) uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
if _, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile)); err != nil { if _, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile)); err != nil {
if err == errFileNotFound || err == errFileAccessDenied { if err == errFileNotFound || err == errFileAccessDenied {
return result, InvalidUploadID{UploadID: uploadID} return result, InvalidUploadID{Bucket: bucket, Object: object, UploadID: uploadID}
} }
return result, toObjectErr(err, bucket, object) return result, toObjectErr(err, bucket, object)
} }
@ -571,7 +571,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
_, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile)) _, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile))
if err != nil { if err != nil {
if err == errFileNotFound || err == errFileAccessDenied { if err == errFileNotFound || err == errFileAccessDenied {
return oi, InvalidUploadID{UploadID: uploadID} return oi, InvalidUploadID{Bucket: bucket, Object: object, UploadID: uploadID}
} }
return oi, toObjectErr(err, bucket, object) return oi, toObjectErr(err, bucket, object)
} }
@ -802,7 +802,7 @@ func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u
_, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile)) _, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile))
if err != nil { if err != nil {
if err == errFileNotFound || err == errFileAccessDenied { if err == errFileNotFound || err == errFileAccessDenied {
return InvalidUploadID{UploadID: uploadID} return InvalidUploadID{Bucket: bucket, Object: object, UploadID: uploadID}
} }
return toObjectErr(err, bucket, object) return toObjectErr(err, bucket, object)
} }

@ -1217,7 +1217,7 @@ func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string
// Should return IncompleteBody{} error when reader has fewer // Should return IncompleteBody{} error when reader has fewer
// bytes than specified in request header. // bytes than specified in request header.
if bytesWritten < data.Size() { if bytesWritten < data.Size() {
return ObjectInfo{}, IncompleteBody{} return ObjectInfo{}, IncompleteBody{Bucket: bucket, Object: object}
} }
// Entire object was written to the temp location, now it's safe to rename it to the actual location. // Entire object was written to the temp location, now it's safe to rename it to the actual location.

@ -153,7 +153,7 @@ func testObjectAPIPutObject(obj ObjectLayer, instanceType string, t TestErrHandl
// Test case 27-29. // Test case 27-29.
// data with size different from the actual number of bytes available in the reader // data with size different from the actual number of bytes available in the reader
{bucket, object, data, nil, "", int64(len(data) - 1), getMD5Hash(data[:len(data)-1]), nil}, {bucket, object, data, nil, "", int64(len(data) - 1), getMD5Hash(data[:len(data)-1]), nil},
{bucket, object, nilBytes, nil, "", int64(len(nilBytes) + 1), getMD5Hash(nilBytes), IncompleteBody{}}, {bucket, object, nilBytes, nil, "", int64(len(nilBytes) + 1), getMD5Hash(nilBytes), IncompleteBody{Bucket: bucket, Object: object}},
{bucket, object, fiveMBBytes, nil, "", 0, getMD5Hash(fiveMBBytes), nil}, {bucket, object, fiveMBBytes, nil, "", 0, getMD5Hash(fiveMBBytes), nil},
// Test case 30 // Test case 30

@ -188,11 +188,6 @@ func IsServerResolvable(endpoint Endpoint) error {
} }
} }
req, err := http.NewRequest(http.MethodGet, serverURL.String(), nil)
if err != nil {
return err
}
httpClient := &http.Client{ httpClient := &http.Client{
Transport: Transport:
// For more details about various values used here refer // For more details about various values used here refer
@ -215,7 +210,12 @@ func IsServerResolvable(endpoint Endpoint) error {
ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second) ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second)
defer cancel() defer cancel()
resp, err := httpClient.Do(req.WithContext(ctx)) req, err := http.NewRequestWithContext(ctx, http.MethodGet, serverURL.String(), nil)
if err != nil {
return err
}
resp, err := httpClient.Do(req)
if err != nil { if err != nil {
return err return err
} }

@ -27,6 +27,7 @@ import (
"time" "time"
xhttp "github.com/minio/minio/cmd/http" xhttp "github.com/minio/minio/cmd/http"
xnet "github.com/minio/minio/pkg/net"
) )
// DefaultRESTTimeout - default RPC timeout is one minute. // DefaultRESTTimeout - default RPC timeout is one minute.
@ -100,11 +101,13 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
if !c.IsOnline() { if !c.IsOnline() {
return nil, &NetworkError{Err: &url.Error{Op: method, URL: c.url.String(), Err: restError("remote server offline")}} return nil, &NetworkError{Err: &url.Error{Op: method, URL: c.url.String(), Err: restError("remote server offline")}}
} }
req, err := http.NewRequest(http.MethodPost, c.url.String()+method+querySep+values.Encode(), body) req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.url.String()+method+querySep+values.Encode(), body)
if err != nil { if err != nil {
if xnet.IsNetworkOrHostDown(err) {
c.MarkOffline()
}
return nil, &NetworkError{err} return nil, &NetworkError{err}
} }
req = req.WithContext(ctx)
req.Header.Set("Authorization", "Bearer "+c.newAuthToken(req.URL.Query().Encode())) req.Header.Set("Authorization", "Bearer "+c.newAuthToken(req.URL.Query().Encode()))
req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339)) req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339))
if length > 0 { if length > 0 {
@ -112,9 +115,7 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
} }
resp, err := c.httpClient.Do(req) resp, err := c.httpClient.Do(req)
if err != nil { if err != nil {
// A canceled context doesn't always mean a network problem. if xnet.IsNetworkOrHostDown(err) || errors.Is(err, context.DeadlineExceeded) {
if !errors.Is(err, context.Canceled) {
// We are safe from recursion
c.MarkOffline() c.MarkOffline()
} }
return nil, &NetworkError{err} return nil, &NetworkError{err}

@ -524,9 +524,8 @@ func newGatewayHTTPTransport(timeout time.Duration) *http.Transport {
// Allow more requests to be in flight. // Allow more requests to be in flight.
tr.ResponseHeaderTimeout = timeout tr.ResponseHeaderTimeout = timeout
tr.MaxConnsPerHost = 256
tr.MaxIdleConnsPerHost = 16
tr.MaxIdleConns = 256 tr.MaxIdleConns = 256
tr.MaxIdleConnsPerHost = 16
return tr return tr
} }

@ -109,7 +109,7 @@ func (target *WebhookTarget) IsActive() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
req, err := http.NewRequest(http.MethodHead, target.args.Endpoint.String(), nil) req, err := http.NewRequestWithContext(ctx, http.MethodHead, target.args.Endpoint.String(), nil)
if err != nil { if err != nil {
if xnet.IsNetworkOrHostDown(err) { if xnet.IsNetworkOrHostDown(err) {
return false, errNotConnected return false, errNotConnected
@ -117,9 +117,9 @@ func (target *WebhookTarget) IsActive() (bool, error) {
return false, err return false, err
} }
resp, err := target.httpClient.Do(req.WithContext(ctx)) resp, err := target.httpClient.Do(req)
if err != nil { if err != nil {
if xnet.IsNetworkOrHostDown(err) || err == context.DeadlineExceeded { if xnet.IsNetworkOrHostDown(err) || errors.Is(err, context.DeadlineExceeded) {
return false, errNotConnected return false, errNotConnected
} }
return false, err return false, err

@ -356,14 +356,11 @@ func (adm AdminClient) executeMethod(ctx context.Context, method string, reqData
for range adm.newRetryTimer(retryCtx, reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter) { for range adm.newRetryTimer(retryCtx, reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter) {
// Instantiate a new request. // Instantiate a new request.
var req *http.Request var req *http.Request
req, err = adm.newRequest(method, reqData) req, err = adm.newRequest(ctx, method, reqData)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Add context to request
req = req.WithContext(ctx)
// Initiate the request. // Initiate the request.
res, err = adm.do(req) res, err = adm.do(req)
if err != nil { if err != nil {
@ -440,7 +437,7 @@ func (adm AdminClient) getSecretKey() string {
} }
// newRequest - instantiate a new HTTP request for a given method. // newRequest - instantiate a new HTTP request for a given method.
func (adm AdminClient) newRequest(method string, reqData requestData) (req *http.Request, err error) { func (adm AdminClient) newRequest(ctx context.Context, method string, reqData requestData) (req *http.Request, err error) {
// If no method is supplied default to 'POST'. // If no method is supplied default to 'POST'.
if method == "" { if method == "" {
method = "POST" method = "POST"
@ -456,7 +453,7 @@ func (adm AdminClient) newRequest(method string, reqData requestData) (req *http
} }
// Initialize a new HTTP request for the method. // Initialize a new HTTP request for the method.
req, err = http.NewRequest(method, targetURL.String(), nil) req, err = http.NewRequestWithContext(ctx, method, targetURL.String(), nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }

Loading…
Cancel
Save