diff --git a/cmd/config-current.go b/cmd/config-current.go index 69722feb4..35c4022f8 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -25,6 +25,7 @@ import ( "sync" "github.com/minio/minio/cmd/crypto" + xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/event" @@ -552,7 +553,7 @@ func (s *serverConfig) loadToCachedConfigs() { URL: s.Policy.OPA.URL, AuthToken: s.Policy.OPA.AuthToken, Transport: NewCustomHTTPTransport(), - CloseRespFn: CloseResponse, + CloseRespFn: xhttp.DrainBody, }) } } diff --git a/cmd/gateway/oss/gateway-oss.go b/cmd/gateway/oss/gateway-oss.go index db68a80f6..ba957433e 100644 --- a/cmd/gateway/oss/gateway-oss.go +++ b/cmd/gateway/oss/gateway-oss.go @@ -31,6 +31,7 @@ import ( "github.com/minio/cli" miniogopolicy "github.com/minio/minio-go/pkg/policy" minio "github.com/minio/minio/cmd" + xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/hash" @@ -844,7 +845,7 @@ func ossListObjectParts(client *oss.Client, bucket, object, uploadID string, par } // always drain output (response body) - defer minio.CloseResponse(resp.Body) + defer xhttp.DrainBody(resp.Body) err = xml.NewDecoder(resp.Body).Decode(&lupr) if err != nil { diff --git a/cmd/http/close.go b/cmd/http/close.go new file mode 100644 index 000000000..818e665ed --- /dev/null +++ b/cmd/http/close.go @@ -0,0 +1,52 @@ +/* + * Minio Cloud Storage, (C) 2019 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package http + +import ( + "io" + "io/ioutil" + "sync" +) + +var b512pool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, 512) + return &buf + }, +} + +// DrainBody close non nil response with any response Body. +// convenient wrapper to drain any remaining data on response body. +// +// Subsequently this allows golang http RoundTripper +// to re-use the same connection for future requests. +func DrainBody(respBody io.ReadCloser) { + // Callers should close resp.Body when done reading from it. + // If resp.Body is not closed, the Client's underlying RoundTripper + // (typically Transport) may not be able to re-use a persistent TCP + // connection to the server for a subsequent "keep-alive" request. + if respBody != nil { + // Drain any remaining Body and then close the connection. + // Without this closing connection would disallow re-using + // the same connection for future uses. + // - http://stackoverflow.com/a/17961593/4465767 + bufp := b512pool.Get().(*[]byte) + defer b512pool.Put(bufp) + io.CopyBuffer(ioutil.Discard, respBody, *bufp) + respBody.Close() + } +} diff --git a/cmd/logger/target/http/http.go b/cmd/logger/target/http/http.go index 8b914674f..d169cf18c 100644 --- a/cmd/logger/target/http/http.go +++ b/cmd/logger/target/http/http.go @@ -1,5 +1,5 @@ /* - * Minio Cloud Storage, (C) 2018 Minio, Inc. + * Minio Cloud Storage, (C) 2018, 2019 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,9 +20,9 @@ import ( "bytes" "encoding/json" "errors" - "io" - "io/ioutil" gohttp "net/http" + + xhttp "github.com/minio/minio/cmd/http" ) // Target implements logger.Target and sends the json @@ -56,11 +56,9 @@ func (h *Target) startHTTPLogger() { if err != nil { continue } - if resp.Body != nil { - buf := make([]byte, 512) - io.CopyBuffer(ioutil.Discard, resp.Body, buf) - resp.Body.Close() - } + + // Drain any response. + xhttp.DrainBody(resp.Body) } }() } diff --git a/cmd/rest/client.go b/cmd/rest/client.go index 7770273b9..16ca2fc40 100644 --- a/cmd/rest/client.go +++ b/cmd/rest/client.go @@ -59,9 +59,9 @@ func (c *Client) Call(method string, values url.Values, body io.Reader, length i } if resp.StatusCode != http.StatusOK { + defer xhttp.DrainBody(resp.Body) // Limit the ReadAll(), just in case, because of a bug, the server responds with large data. - r := io.LimitReader(resp.Body, 1024) - b, err := ioutil.ReadAll(r) + b, err := ioutil.ReadAll(io.LimitReader(resp.Body, 4096)) if err != nil { return nil, err } diff --git a/cmd/rpc/client.go b/cmd/rpc/client.go index efa45ff69..a7a4101c8 100644 --- a/cmd/rpc/client.go +++ b/cmd/rpc/client.go @@ -22,8 +22,6 @@ import ( "encoding/gob" "errors" "fmt" - "io" - "io/ioutil" "net" "net/http" "reflect" @@ -42,28 +40,6 @@ type Client struct { serviceURL *xnet.URL } -// closeResponse close non nil response with any response Body. -// convenient wrapper to drain any remaining data on response body. -// -// Subsequently this allows golang http RoundTripper -// to re-use the same connection for future requests. -func closeResponse(body io.ReadCloser) { - // Callers should close resp.Body when done reading from it. - // If resp.Body is not closed, the Client's underlying RoundTripper - // (typically Transport) may not be able to re-use a persistent TCP - // connection to the server for a subsequent "keep-alive" request. - if body != nil { - // Drain any remaining Body and then close the connection. - // Without this closing connection would disallow re-using - // the same connection for future uses. - // - http://stackoverflow.com/a/17961593/4465767 - bufp := b512pool.Get().(*[]byte) - defer b512pool.Put(bufp) - io.CopyBuffer(ioutil.Discard, body, *bufp) - body.Close() - } -} - // Call - calls service method on RPC server. func (client *Client) Call(serviceMethod string, args, reply interface{}) error { replyKind := reflect.TypeOf(reply).Kind() @@ -93,7 +69,7 @@ func (client *Client) Call(serviceMethod string, args, reply interface{}) error if err != nil { return err } - defer closeResponse(response.Body) + defer xhttp.DrainBody(response.Body) if response.StatusCode != http.StatusOK { return fmt.Errorf("%v rpc call failed with error code %v", serviceMethod, response.StatusCode) diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 1184a58d5..9e1a3bf33 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -32,6 +32,7 @@ import ( "fmt" "strings" + "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/rest" xnet "github.com/minio/minio/pkg/net" @@ -164,7 +165,7 @@ func (client *storageRESTClient) DiskInfo() (info DiskInfo, err error) { if err != nil { return } - defer CloseResponse(respBody) + defer http.DrainBody(respBody) err = gob.NewDecoder(respBody).Decode(&info) return info, err } @@ -174,7 +175,7 @@ func (client *storageRESTClient) MakeVol(volume string) (err error) { values := make(url.Values) values.Set(storageRESTVolume, volume) respBody, err := client.call(storageRESTMethodMakeVol, values, nil, -1) - defer CloseResponse(respBody) + defer http.DrainBody(respBody) return err } @@ -184,7 +185,7 @@ func (client *storageRESTClient) ListVols() (volinfo []VolInfo, err error) { if err != nil { return } - defer CloseResponse(respBody) + defer http.DrainBody(respBody) err = gob.NewDecoder(respBody).Decode(&volinfo) return volinfo, err } @@ -197,7 +198,7 @@ func (client *storageRESTClient) StatVol(volume string) (volInfo VolInfo, err er if err != nil { return } - defer CloseResponse(respBody) + defer http.DrainBody(respBody) err = gob.NewDecoder(respBody).Decode(&volInfo) return volInfo, err } @@ -207,7 +208,7 @@ func (client *storageRESTClient) DeleteVol(volume string) (err error) { values := make(url.Values) values.Set(storageRESTVolume, volume) respBody, err := client.call(storageRESTMethodDeleteVol, values, nil, -1) - defer CloseResponse(respBody) + defer http.DrainBody(respBody) return err } @@ -218,7 +219,7 @@ func (client *storageRESTClient) AppendFile(volume, path string, buffer []byte) values.Set(storageRESTFilePath, path) reader := bytes.NewBuffer(buffer) respBody, err := client.call(storageRESTMethodAppendFile, values, reader, -1) - defer CloseResponse(respBody) + defer http.DrainBody(respBody) return err } @@ -228,7 +229,7 @@ func (client *storageRESTClient) CreateFile(volume, path string, length int64, r values.Set(storageRESTFilePath, path) values.Set(storageRESTLength, strconv.Itoa(int(length))) respBody, err := client.call(storageRESTMethodCreateFile, values, r, length) - defer CloseResponse(respBody) + defer http.DrainBody(respBody) return err } @@ -239,7 +240,7 @@ func (client *storageRESTClient) WriteAll(volume, path string, buffer []byte) er values.Set(storageRESTFilePath, path) reader := bytes.NewBuffer(buffer) respBody, err := client.call(storageRESTMethodWriteAll, values, reader, -1) - defer CloseResponse(respBody) + defer http.DrainBody(respBody) return err } @@ -252,7 +253,7 @@ func (client *storageRESTClient) StatFile(volume, path string) (info FileInfo, e if err != nil { return info, err } - defer CloseResponse(respBody) + defer http.DrainBody(respBody) err = gob.NewDecoder(respBody).Decode(&info) return info, err } @@ -266,7 +267,7 @@ func (client *storageRESTClient) ReadAll(volume, path string) ([]byte, error) { if err != nil { return nil, err } - defer CloseResponse(respBody) + defer http.DrainBody(respBody) return ioutil.ReadAll(respBody) } @@ -302,7 +303,7 @@ func (client *storageRESTClient) ReadFile(volume, path string, offset int64, buf if err != nil { return 0, err } - defer CloseResponse(respBody) + defer http.DrainBody(respBody) n, err := io.ReadFull(respBody, buffer) return int64(n), err } @@ -317,7 +318,7 @@ func (client *storageRESTClient) ListDir(volume, dirPath string, count int) (ent if err != nil { return nil, err } - defer CloseResponse(respBody) + defer http.DrainBody(respBody) err = gob.NewDecoder(respBody).Decode(&entries) return entries, err } @@ -328,7 +329,7 @@ func (client *storageRESTClient) DeleteFile(volume, path string) error { values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) respBody, err := client.call(storageRESTMethodDeleteFile, values, nil, -1) - defer CloseResponse(respBody) + defer http.DrainBody(respBody) return err } @@ -340,7 +341,7 @@ func (client *storageRESTClient) RenameFile(srcVolume, srcPath, dstVolume, dstPa values.Set(storageRESTDstVolume, dstVolume) values.Set(storageRESTDstPath, dstPath) respBody, err := client.call(storageRESTMethodRenameFile, values, nil, -1) - defer CloseResponse(respBody) + defer http.DrainBody(respBody) return err } diff --git a/cmd/update-main.go b/cmd/update-main.go index 4d9e77434..4ddf4bc30 100644 --- a/cmd/update-main.go +++ b/cmd/update-main.go @@ -32,6 +32,7 @@ import ( "github.com/inconshreveable/go-update" "github.com/minio/cli" + xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" _ "github.com/minio/sha256-simd" // Needed for sha256 hash verifier. "github.com/segmentio/go-prompt" @@ -318,7 +319,7 @@ func downloadReleaseURL(releaseChecksumURL string, timeout time.Duration, mode s if resp == nil { return content, fmt.Errorf("No response from server to download URL %s", releaseChecksumURL) } - defer CloseResponse(resp.Body) + defer xhttp.DrainBody(resp.Body) if resp.StatusCode != http.StatusOK { return content, fmt.Errorf("Error downloading URL %s. Response: %v", releaseChecksumURL, resp.Status) @@ -465,7 +466,7 @@ func doUpdate(sha256Hex string, latestReleaseTime time.Time, ok bool) (updateSta if err != nil { return updateStatusMsg, err } - defer CloseResponse(resp.Body) + defer xhttp.DrainBody(resp.Body) // FIXME: add support for gpg verification as well. if err = update.Apply(resp.Body, diff --git a/cmd/utils.go b/cmd/utils.go index b32ebd01e..ba69b3a69 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -34,7 +34,6 @@ import ( "path/filepath" "reflect" "strings" - "sync" "time" "github.com/minio/minio/cmd/logger" @@ -467,35 +466,6 @@ func isNetworkOrHostDown(err error) bool { return false } -var b512pool = sync.Pool{ - New: func() interface{} { - buf := make([]byte, 512) - return &buf - }, -} - -// CloseResponse close non nil response with any response Body. -// convenient wrapper to drain any remaining data on response body. -// -// Subsequently this allows golang http RoundTripper -// to re-use the same connection for future requests. -func CloseResponse(respBody io.ReadCloser) { - // Callers should close resp.Body when done reading from it. - // If resp.Body is not closed, the Client's underlying RoundTripper - // (typically Transport) may not be able to re-use a persistent TCP - // connection to the server for a subsequent "keep-alive" request. - if respBody != nil { - // Drain any remaining Body and then close the connection. - // Without this closing connection would disallow re-using - // the same connection for future uses. - // - http://stackoverflow.com/a/17961593/4465767 - bufp := b512pool.Get().(*[]byte) - defer b512pool.Put(bufp) - io.CopyBuffer(ioutil.Discard, respBody, *bufp) - respBody.Close() - } -} - // Used for registering with rest handlers (have a look at registerStorageRESTHandlers for usage example) // If it is passed ["aaaa", "bbbb"], it returns ["aaaa", "{aaaa:.*}", "bbbb", "{bbbb:.*}"] func restQueries(keys ...string) []string {