diff --git a/cmd/gateway/oss/gateway-oss.go b/cmd/gateway/oss/gateway-oss.go index 1423d6b7a..4a7c4c5a9 100644 --- a/cmd/gateway/oss/gateway-oss.go +++ b/cmd/gateway/oss/gateway-oss.go @@ -21,7 +21,6 @@ import ( "encoding/xml" "fmt" "io" - "io/ioutil" "net/http" "strconv" "strings" @@ -816,11 +815,8 @@ func ossListObjectParts(client *oss.Client, bucket, object, uploadID string, par return lupr, err } - defer func() { - // always drain output (response body) - io.CopyN(ioutil.Discard, resp.Body, 512) - resp.Body.Close() - }() + // always drain output (response body) + defer minio.CloseResponse(resp.Body) err = xml.NewDecoder(resp.Body).Decode(&lupr) if err != nil { diff --git a/cmd/gateway/sia/gateway-sia.go b/cmd/gateway/sia/gateway-sia.go index f2d2f49dd..8695b9b85 100644 --- a/cmd/gateway/sia/gateway-sia.go +++ b/cmd/gateway/sia/gateway-sia.go @@ -233,13 +233,13 @@ func apiGet(ctx context.Context, addr, call, apiPassword string) (*http.Response return nil, err } if resp.StatusCode == http.StatusNotFound { - resp.Body.Close() + minio.CloseResponse(resp.Body) logger.LogIf(ctx, MethodNotSupported{call}) return nil, MethodNotSupported{call} } if non2xx(resp.StatusCode) { err := decodeError(resp) - resp.Body.Close() + minio.CloseResponse(resp.Body) logger.LogIf(ctx, err) return nil, err } @@ -266,13 +266,13 @@ func apiPost(ctx context.Context, addr, call, vals, apiPassword string) (*http.R } if resp.StatusCode == http.StatusNotFound { - resp.Body.Close() + minio.CloseResponse(resp.Body) return nil, MethodNotSupported{call} } if non2xx(resp.StatusCode) { err := decodeError(resp) - resp.Body.Close() + minio.CloseResponse(resp.Body) return nil, err } return resp, nil @@ -285,7 +285,7 @@ func post(ctx context.Context, addr, call, vals, apiPassword string) error { if err != nil { return err } - resp.Body.Close() + minio.CloseResponse(resp.Body) return nil } @@ -295,7 +295,7 @@ func list(ctx context.Context, addr string, apiPassword string, obj *renterFiles if err != nil { return err } - defer resp.Body.Close() + defer minio.CloseResponse(resp.Body) if resp.StatusCode == http.StatusNoContent { logger.LogIf(ctx, fmt.Errorf("Expecting a response, but API returned %s", resp.Status)) @@ -313,7 +313,7 @@ func get(ctx context.Context, addr, call, apiPassword string) error { if err != nil { return err } - resp.Body.Close() + minio.CloseResponse(resp.Body) return nil } diff --git a/cmd/logger/http.go b/cmd/logger/http.go index d331eca36..5c5b9b72c 100644 --- a/cmd/logger/http.go +++ b/cmd/logger/http.go @@ -20,6 +20,8 @@ import ( "bytes" "encoding/json" "errors" + "io" + "io/ioutil" "net/http" ) @@ -54,6 +56,8 @@ func (h *HTTPTarget) startHTTPLogger() { continue } if resp.Body != nil { + buf := make([]byte, 512) + io.CopyBuffer(ioutil.Discard, resp.Body, buf) resp.Body.Close() } } diff --git a/cmd/rpc/client.go b/cmd/rpc/client.go index 44025986d..efa45ff69 100644 --- a/cmd/rpc/client.go +++ b/cmd/rpc/client.go @@ -22,6 +22,8 @@ import ( "encoding/gob" "errors" "fmt" + "io" + "io/ioutil" "net" "net/http" "reflect" @@ -40,6 +42,28 @@ type Client struct { serviceURL *xnet.URL } +// closeResponse close non nil response with any response Body. +// convenient wrapper to drain any remaining data on response body. +// +// Subsequently this allows golang http RoundTripper +// to re-use the same connection for future requests. +func closeResponse(body io.ReadCloser) { + // Callers should close resp.Body when done reading from it. + // If resp.Body is not closed, the Client's underlying RoundTripper + // (typically Transport) may not be able to re-use a persistent TCP + // connection to the server for a subsequent "keep-alive" request. + if body != nil { + // Drain any remaining Body and then close the connection. + // Without this closing connection would disallow re-using + // the same connection for future uses. + // - http://stackoverflow.com/a/17961593/4465767 + bufp := b512pool.Get().(*[]byte) + defer b512pool.Put(bufp) + io.CopyBuffer(ioutil.Discard, body, *bufp) + body.Close() + } +} + // Call - calls service method on RPC server. func (client *Client) Call(serviceMethod string, args, reply interface{}) error { replyKind := reflect.TypeOf(reply).Kind() @@ -69,7 +93,7 @@ func (client *Client) Call(serviceMethod string, args, reply interface{}) error if err != nil { return err } - defer response.Body.Close() + defer closeResponse(response.Body) if response.StatusCode != http.StatusOK { return fmt.Errorf("%v rpc call failed with error code %v", serviceMethod, response.StatusCode) diff --git a/cmd/rpc/pool.go b/cmd/rpc/pool.go index 84dcbf6e1..72f627316 100644 --- a/cmd/rpc/pool.go +++ b/cmd/rpc/pool.go @@ -21,6 +21,13 @@ import ( "sync" ) +var b512pool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, 512) + return &buf + }, +} + // A Pool is a type-safe wrapper around a sync.Pool. type Pool struct { p *sync.Pool diff --git a/cmd/update-main.go b/cmd/update-main.go index dae3b87c7..4f31ce2f3 100644 --- a/cmd/update-main.go +++ b/cmd/update-main.go @@ -324,7 +324,7 @@ func downloadReleaseURL(releaseChecksumURL string, timeout time.Duration, mode s if resp == nil { return content, fmt.Errorf("No response from server to download URL %s", releaseChecksumURL) } - defer resp.Body.Close() + defer CloseResponse(resp.Body) if resp.StatusCode != http.StatusOK { return content, fmt.Errorf("Error downloading URL %s. Response: %v", releaseChecksumURL, resp.Status) @@ -471,7 +471,7 @@ func doUpdate(sha256Hex string, latestReleaseTime time.Time, ok bool) (updateSta if err != nil { return updateStatusMsg, err } - defer resp.Body.Close() + defer CloseResponse(resp.Body) // FIXME: add support for gpg verification as well. if err = update.Apply(resp.Body, diff --git a/cmd/utils.go b/cmd/utils.go index 904d00a91..17aa2b966 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -25,12 +25,14 @@ import ( "encoding/xml" "fmt" "io" + "io/ioutil" "net" "net/http" "net/url" "os" "reflect" "strings" + "sync" "time" "github.com/minio/minio/cmd/logger" @@ -381,3 +383,32 @@ func isNetworkOrHostDown(err error) bool { } return false } + +var b512pool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, 512) + return &buf + }, +} + +// CloseResponse close non nil response with any response Body. +// convenient wrapper to drain any remaining data on response body. +// +// Subsequently this allows golang http RoundTripper +// to re-use the same connection for future requests. +func CloseResponse(respBody io.ReadCloser) { + // Callers should close resp.Body when done reading from it. + // If resp.Body is not closed, the Client's underlying RoundTripper + // (typically Transport) may not be able to re-use a persistent TCP + // connection to the server for a subsequent "keep-alive" request. + if respBody != nil { + // Drain any remaining Body and then close the connection. + // Without this closing connection would disallow re-using + // the same connection for future uses. + // - http://stackoverflow.com/a/17961593/4465767 + bufp := b512pool.Get().(*[]byte) + defer b512pool.Put(bufp) + io.CopyBuffer(ioutil.Discard, respBody, *bufp) + respBody.Close() + } +} diff --git a/pkg/event/target/webhook.go b/pkg/event/target/webhook.go index a3bb03877..9ca49f6de 100644 --- a/pkg/event/target/webhook.go +++ b/pkg/event/target/webhook.go @@ -23,6 +23,8 @@ import ( "encoding/json" "errors" "fmt" + "io" + "io/ioutil" "net" "net/http" "net/url" @@ -89,6 +91,7 @@ func (target *WebhookTarget) Send(eventData event.Event) error { } // FIXME: log returned error. ignore time being. + io.Copy(ioutil.Discard, resp.Body) _ = resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode > 299 { diff --git a/pkg/madmin/config-commands.go b/pkg/madmin/config-commands.go index 7719fb57c..429a7aaf0 100644 --- a/pkg/madmin/config-commands.go +++ b/pkg/madmin/config-commands.go @@ -85,7 +85,7 @@ func (adm *AdminClient) GetConfig() ([]byte, error) { if resp.StatusCode != http.StatusOK { return nil, httpRespToErrorResponse(resp) } - defer resp.Body.Close() + defer closeResponse(resp) return DecryptServerConfigData(adm.secretAccessKey, resp.Body) }