Drain response body properly for http connection pool (#6415)

Currently Go http connection pool was not being properly
utilized leading to degrading performance as the number
of concurrent requests increased.

As recommended by Go implementation, we have to drain the
response body and close it.
master
Harshavardhana 6 years ago committed by GitHub
parent 1961f2ef54
commit fd1b8491db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      cmd/gateway/oss/gateway-oss.go
  2. 14
      cmd/gateway/sia/gateway-sia.go
  3. 4
      cmd/logger/http.go
  4. 26
      cmd/rpc/client.go
  5. 7
      cmd/rpc/pool.go
  6. 4
      cmd/update-main.go
  7. 31
      cmd/utils.go
  8. 3
      pkg/event/target/webhook.go
  9. 2
      pkg/madmin/config-commands.go

@ -21,7 +21,6 @@ import (
"encoding/xml" "encoding/xml"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
@ -816,11 +815,8 @@ func ossListObjectParts(client *oss.Client, bucket, object, uploadID string, par
return lupr, err return lupr, err
} }
defer func() { // always drain output (response body)
// always drain output (response body) defer minio.CloseResponse(resp.Body)
io.CopyN(ioutil.Discard, resp.Body, 512)
resp.Body.Close()
}()
err = xml.NewDecoder(resp.Body).Decode(&lupr) err = xml.NewDecoder(resp.Body).Decode(&lupr)
if err != nil { if err != nil {

@ -233,13 +233,13 @@ func apiGet(ctx context.Context, addr, call, apiPassword string) (*http.Response
return nil, err return nil, err
} }
if resp.StatusCode == http.StatusNotFound { if resp.StatusCode == http.StatusNotFound {
resp.Body.Close() minio.CloseResponse(resp.Body)
logger.LogIf(ctx, MethodNotSupported{call}) logger.LogIf(ctx, MethodNotSupported{call})
return nil, MethodNotSupported{call} return nil, MethodNotSupported{call}
} }
if non2xx(resp.StatusCode) { if non2xx(resp.StatusCode) {
err := decodeError(resp) err := decodeError(resp)
resp.Body.Close() minio.CloseResponse(resp.Body)
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
return nil, err return nil, err
} }
@ -266,13 +266,13 @@ func apiPost(ctx context.Context, addr, call, vals, apiPassword string) (*http.R
} }
if resp.StatusCode == http.StatusNotFound { if resp.StatusCode == http.StatusNotFound {
resp.Body.Close() minio.CloseResponse(resp.Body)
return nil, MethodNotSupported{call} return nil, MethodNotSupported{call}
} }
if non2xx(resp.StatusCode) { if non2xx(resp.StatusCode) {
err := decodeError(resp) err := decodeError(resp)
resp.Body.Close() minio.CloseResponse(resp.Body)
return nil, err return nil, err
} }
return resp, nil return resp, nil
@ -285,7 +285,7 @@ func post(ctx context.Context, addr, call, vals, apiPassword string) error {
if err != nil { if err != nil {
return err return err
} }
resp.Body.Close() minio.CloseResponse(resp.Body)
return nil return nil
} }
@ -295,7 +295,7 @@ func list(ctx context.Context, addr string, apiPassword string, obj *renterFiles
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close() defer minio.CloseResponse(resp.Body)
if resp.StatusCode == http.StatusNoContent { if resp.StatusCode == http.StatusNoContent {
logger.LogIf(ctx, fmt.Errorf("Expecting a response, but API returned %s", resp.Status)) logger.LogIf(ctx, fmt.Errorf("Expecting a response, but API returned %s", resp.Status))
@ -313,7 +313,7 @@ func get(ctx context.Context, addr, call, apiPassword string) error {
if err != nil { if err != nil {
return err return err
} }
resp.Body.Close() minio.CloseResponse(resp.Body)
return nil return nil
} }

@ -20,6 +20,8 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"io"
"io/ioutil"
"net/http" "net/http"
) )
@ -54,6 +56,8 @@ func (h *HTTPTarget) startHTTPLogger() {
continue continue
} }
if resp.Body != nil { if resp.Body != nil {
buf := make([]byte, 512)
io.CopyBuffer(ioutil.Discard, resp.Body, buf)
resp.Body.Close() resp.Body.Close()
} }
} }

@ -22,6 +22,8 @@ import (
"encoding/gob" "encoding/gob"
"errors" "errors"
"fmt" "fmt"
"io"
"io/ioutil"
"net" "net"
"net/http" "net/http"
"reflect" "reflect"
@ -40,6 +42,28 @@ type Client struct {
serviceURL *xnet.URL serviceURL *xnet.URL
} }
// closeResponse close non nil response with any response Body.
// convenient wrapper to drain any remaining data on response body.
//
// Subsequently this allows golang http RoundTripper
// to re-use the same connection for future requests.
func closeResponse(body io.ReadCloser) {
// Callers should close resp.Body when done reading from it.
// If resp.Body is not closed, the Client's underlying RoundTripper
// (typically Transport) may not be able to re-use a persistent TCP
// connection to the server for a subsequent "keep-alive" request.
if body != nil {
// Drain any remaining Body and then close the connection.
// Without this closing connection would disallow re-using
// the same connection for future uses.
// - http://stackoverflow.com/a/17961593/4465767
bufp := b512pool.Get().(*[]byte)
defer b512pool.Put(bufp)
io.CopyBuffer(ioutil.Discard, body, *bufp)
body.Close()
}
}
// Call - calls service method on RPC server. // Call - calls service method on RPC server.
func (client *Client) Call(serviceMethod string, args, reply interface{}) error { func (client *Client) Call(serviceMethod string, args, reply interface{}) error {
replyKind := reflect.TypeOf(reply).Kind() replyKind := reflect.TypeOf(reply).Kind()
@ -69,7 +93,7 @@ func (client *Client) Call(serviceMethod string, args, reply interface{}) error
if err != nil { if err != nil {
return err return err
} }
defer response.Body.Close() defer closeResponse(response.Body)
if response.StatusCode != http.StatusOK { if response.StatusCode != http.StatusOK {
return fmt.Errorf("%v rpc call failed with error code %v", serviceMethod, response.StatusCode) return fmt.Errorf("%v rpc call failed with error code %v", serviceMethod, response.StatusCode)

@ -21,6 +21,13 @@ import (
"sync" "sync"
) )
var b512pool = sync.Pool{
New: func() interface{} {
buf := make([]byte, 512)
return &buf
},
}
// A Pool is a type-safe wrapper around a sync.Pool. // A Pool is a type-safe wrapper around a sync.Pool.
type Pool struct { type Pool struct {
p *sync.Pool p *sync.Pool

@ -324,7 +324,7 @@ func downloadReleaseURL(releaseChecksumURL string, timeout time.Duration, mode s
if resp == nil { if resp == nil {
return content, fmt.Errorf("No response from server to download URL %s", releaseChecksumURL) return content, fmt.Errorf("No response from server to download URL %s", releaseChecksumURL)
} }
defer resp.Body.Close() defer CloseResponse(resp.Body)
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return content, fmt.Errorf("Error downloading URL %s. Response: %v", releaseChecksumURL, resp.Status) return content, fmt.Errorf("Error downloading URL %s. Response: %v", releaseChecksumURL, resp.Status)
@ -471,7 +471,7 @@ func doUpdate(sha256Hex string, latestReleaseTime time.Time, ok bool) (updateSta
if err != nil { if err != nil {
return updateStatusMsg, err return updateStatusMsg, err
} }
defer resp.Body.Close() defer CloseResponse(resp.Body)
// FIXME: add support for gpg verification as well. // FIXME: add support for gpg verification as well.
if err = update.Apply(resp.Body, if err = update.Apply(resp.Body,

@ -25,12 +25,14 @@ import (
"encoding/xml" "encoding/xml"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
"reflect" "reflect"
"strings" "strings"
"sync"
"time" "time"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
@ -381,3 +383,32 @@ func isNetworkOrHostDown(err error) bool {
} }
return false return false
} }
var b512pool = sync.Pool{
New: func() interface{} {
buf := make([]byte, 512)
return &buf
},
}
// CloseResponse close non nil response with any response Body.
// convenient wrapper to drain any remaining data on response body.
//
// Subsequently this allows golang http RoundTripper
// to re-use the same connection for future requests.
func CloseResponse(respBody io.ReadCloser) {
// Callers should close resp.Body when done reading from it.
// If resp.Body is not closed, the Client's underlying RoundTripper
// (typically Transport) may not be able to re-use a persistent TCP
// connection to the server for a subsequent "keep-alive" request.
if respBody != nil {
// Drain any remaining Body and then close the connection.
// Without this closing connection would disallow re-using
// the same connection for future uses.
// - http://stackoverflow.com/a/17961593/4465767
bufp := b512pool.Get().(*[]byte)
defer b512pool.Put(bufp)
io.CopyBuffer(ioutil.Discard, respBody, *bufp)
respBody.Close()
}
}

@ -23,6 +23,8 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"io/ioutil"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
@ -89,6 +91,7 @@ func (target *WebhookTarget) Send(eventData event.Event) error {
} }
// FIXME: log returned error. ignore time being. // FIXME: log returned error. ignore time being.
io.Copy(ioutil.Discard, resp.Body)
_ = resp.Body.Close() _ = resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 { if resp.StatusCode < 200 || resp.StatusCode > 299 {

@ -85,7 +85,7 @@ func (adm *AdminClient) GetConfig() ([]byte, error) {
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return nil, httpRespToErrorResponse(resp) return nil, httpRespToErrorResponse(resp)
} }
defer resp.Body.Close() defer closeResponse(resp)
return DecryptServerConfigData(adm.secretAccessKey, resp.Body) return DecryptServerConfigData(adm.secretAccessKey, resp.Body)
} }

Loading…
Cancel
Save