Make sure to drain body upon an error (#7197)

Also cleanup redundant code and use it at a common place
master
Harshavardhana 6 years ago committed by kannappanr
parent 2d168b532b
commit 817269475f
  1. 3
      cmd/config-current.go
  2. 3
      cmd/gateway/oss/gateway-oss.go
  3. 52
      cmd/http/close.go
  4. 14
      cmd/logger/target/http/http.go
  5. 4
      cmd/rest/client.go
  6. 26
      cmd/rpc/client.go
  7. 29
      cmd/storage-rest-client.go
  8. 5
      cmd/update-main.go
  9. 30
      cmd/utils.go

@ -25,6 +25,7 @@ import (
"sync" "sync"
"github.com/minio/minio/cmd/crypto" "github.com/minio/minio/cmd/crypto"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/event"
@ -552,7 +553,7 @@ func (s *serverConfig) loadToCachedConfigs() {
URL: s.Policy.OPA.URL, URL: s.Policy.OPA.URL,
AuthToken: s.Policy.OPA.AuthToken, AuthToken: s.Policy.OPA.AuthToken,
Transport: NewCustomHTTPTransport(), Transport: NewCustomHTTPTransport(),
CloseRespFn: CloseResponse, CloseRespFn: xhttp.DrainBody,
}) })
} }
} }

@ -31,6 +31,7 @@ import (
"github.com/minio/cli" "github.com/minio/cli"
miniogopolicy "github.com/minio/minio-go/pkg/policy" miniogopolicy "github.com/minio/minio-go/pkg/policy"
minio "github.com/minio/minio/cmd" minio "github.com/minio/minio/cmd"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/hash" "github.com/minio/minio/pkg/hash"
@ -844,7 +845,7 @@ func ossListObjectParts(client *oss.Client, bucket, object, uploadID string, par
} }
// always drain output (response body) // always drain output (response body)
defer minio.CloseResponse(resp.Body) defer xhttp.DrainBody(resp.Body)
err = xml.NewDecoder(resp.Body).Decode(&lupr) err = xml.NewDecoder(resp.Body).Decode(&lupr)
if err != nil { if err != nil {

@ -0,0 +1,52 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package http
import (
"io"
"io/ioutil"
"sync"
)
var b512pool = sync.Pool{
New: func() interface{} {
buf := make([]byte, 512)
return &buf
},
}
// DrainBody close non nil response with any response Body.
// convenient wrapper to drain any remaining data on response body.
//
// Subsequently this allows golang http RoundTripper
// to re-use the same connection for future requests.
func DrainBody(respBody io.ReadCloser) {
// Callers should close resp.Body when done reading from it.
// If resp.Body is not closed, the Client's underlying RoundTripper
// (typically Transport) may not be able to re-use a persistent TCP
// connection to the server for a subsequent "keep-alive" request.
if respBody != nil {
// Drain any remaining Body and then close the connection.
// Without this closing connection would disallow re-using
// the same connection for future uses.
// - http://stackoverflow.com/a/17961593/4465767
bufp := b512pool.Get().(*[]byte)
defer b512pool.Put(bufp)
io.CopyBuffer(ioutil.Discard, respBody, *bufp)
respBody.Close()
}
}

@ -1,5 +1,5 @@
/* /*
* Minio Cloud Storage, (C) 2018 Minio, Inc. * Minio Cloud Storage, (C) 2018, 2019 Minio, Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -20,9 +20,9 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"io"
"io/ioutil"
gohttp "net/http" gohttp "net/http"
xhttp "github.com/minio/minio/cmd/http"
) )
// Target implements logger.Target and sends the json // Target implements logger.Target and sends the json
@ -56,11 +56,9 @@ func (h *Target) startHTTPLogger() {
if err != nil { if err != nil {
continue continue
} }
if resp.Body != nil {
buf := make([]byte, 512) // Drain any response.
io.CopyBuffer(ioutil.Discard, resp.Body, buf) xhttp.DrainBody(resp.Body)
resp.Body.Close()
}
} }
}() }()
} }

@ -59,9 +59,9 @@ func (c *Client) Call(method string, values url.Values, body io.Reader, length i
} }
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
defer xhttp.DrainBody(resp.Body)
// Limit the ReadAll(), just in case, because of a bug, the server responds with large data. // Limit the ReadAll(), just in case, because of a bug, the server responds with large data.
r := io.LimitReader(resp.Body, 1024) b, err := ioutil.ReadAll(io.LimitReader(resp.Body, 4096))
b, err := ioutil.ReadAll(r)
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -22,8 +22,6 @@ import (
"encoding/gob" "encoding/gob"
"errors" "errors"
"fmt" "fmt"
"io"
"io/ioutil"
"net" "net"
"net/http" "net/http"
"reflect" "reflect"
@ -42,28 +40,6 @@ type Client struct {
serviceURL *xnet.URL serviceURL *xnet.URL
} }
// closeResponse close non nil response with any response Body.
// convenient wrapper to drain any remaining data on response body.
//
// Subsequently this allows golang http RoundTripper
// to re-use the same connection for future requests.
func closeResponse(body io.ReadCloser) {
// Callers should close resp.Body when done reading from it.
// If resp.Body is not closed, the Client's underlying RoundTripper
// (typically Transport) may not be able to re-use a persistent TCP
// connection to the server for a subsequent "keep-alive" request.
if body != nil {
// Drain any remaining Body and then close the connection.
// Without this closing connection would disallow re-using
// the same connection for future uses.
// - http://stackoverflow.com/a/17961593/4465767
bufp := b512pool.Get().(*[]byte)
defer b512pool.Put(bufp)
io.CopyBuffer(ioutil.Discard, body, *bufp)
body.Close()
}
}
// Call - calls service method on RPC server. // Call - calls service method on RPC server.
func (client *Client) Call(serviceMethod string, args, reply interface{}) error { func (client *Client) Call(serviceMethod string, args, reply interface{}) error {
replyKind := reflect.TypeOf(reply).Kind() replyKind := reflect.TypeOf(reply).Kind()
@ -93,7 +69,7 @@ func (client *Client) Call(serviceMethod string, args, reply interface{}) error
if err != nil { if err != nil {
return err return err
} }
defer closeResponse(response.Body) defer xhttp.DrainBody(response.Body)
if response.StatusCode != http.StatusOK { if response.StatusCode != http.StatusOK {
return fmt.Errorf("%v rpc call failed with error code %v", serviceMethod, response.StatusCode) return fmt.Errorf("%v rpc call failed with error code %v", serviceMethod, response.StatusCode)

@ -32,6 +32,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest" "github.com/minio/minio/cmd/rest"
xnet "github.com/minio/minio/pkg/net" xnet "github.com/minio/minio/pkg/net"
@ -164,7 +165,7 @@ func (client *storageRESTClient) DiskInfo() (info DiskInfo, err error) {
if err != nil { if err != nil {
return return
} }
defer CloseResponse(respBody) defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info) err = gob.NewDecoder(respBody).Decode(&info)
return info, err return info, err
} }
@ -174,7 +175,7 @@ func (client *storageRESTClient) MakeVol(volume string) (err error) {
values := make(url.Values) values := make(url.Values)
values.Set(storageRESTVolume, volume) values.Set(storageRESTVolume, volume)
respBody, err := client.call(storageRESTMethodMakeVol, values, nil, -1) respBody, err := client.call(storageRESTMethodMakeVol, values, nil, -1)
defer CloseResponse(respBody) defer http.DrainBody(respBody)
return err return err
} }
@ -184,7 +185,7 @@ func (client *storageRESTClient) ListVols() (volinfo []VolInfo, err error) {
if err != nil { if err != nil {
return return
} }
defer CloseResponse(respBody) defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&volinfo) err = gob.NewDecoder(respBody).Decode(&volinfo)
return volinfo, err return volinfo, err
} }
@ -197,7 +198,7 @@ func (client *storageRESTClient) StatVol(volume string) (volInfo VolInfo, err er
if err != nil { if err != nil {
return return
} }
defer CloseResponse(respBody) defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&volInfo) err = gob.NewDecoder(respBody).Decode(&volInfo)
return volInfo, err return volInfo, err
} }
@ -207,7 +208,7 @@ func (client *storageRESTClient) DeleteVol(volume string) (err error) {
values := make(url.Values) values := make(url.Values)
values.Set(storageRESTVolume, volume) values.Set(storageRESTVolume, volume)
respBody, err := client.call(storageRESTMethodDeleteVol, values, nil, -1) respBody, err := client.call(storageRESTMethodDeleteVol, values, nil, -1)
defer CloseResponse(respBody) defer http.DrainBody(respBody)
return err return err
} }
@ -218,7 +219,7 @@ func (client *storageRESTClient) AppendFile(volume, path string, buffer []byte)
values.Set(storageRESTFilePath, path) values.Set(storageRESTFilePath, path)
reader := bytes.NewBuffer(buffer) reader := bytes.NewBuffer(buffer)
respBody, err := client.call(storageRESTMethodAppendFile, values, reader, -1) respBody, err := client.call(storageRESTMethodAppendFile, values, reader, -1)
defer CloseResponse(respBody) defer http.DrainBody(respBody)
return err return err
} }
@ -228,7 +229,7 @@ func (client *storageRESTClient) CreateFile(volume, path string, length int64, r
values.Set(storageRESTFilePath, path) values.Set(storageRESTFilePath, path)
values.Set(storageRESTLength, strconv.Itoa(int(length))) values.Set(storageRESTLength, strconv.Itoa(int(length)))
respBody, err := client.call(storageRESTMethodCreateFile, values, r, length) respBody, err := client.call(storageRESTMethodCreateFile, values, r, length)
defer CloseResponse(respBody) defer http.DrainBody(respBody)
return err return err
} }
@ -239,7 +240,7 @@ func (client *storageRESTClient) WriteAll(volume, path string, buffer []byte) er
values.Set(storageRESTFilePath, path) values.Set(storageRESTFilePath, path)
reader := bytes.NewBuffer(buffer) reader := bytes.NewBuffer(buffer)
respBody, err := client.call(storageRESTMethodWriteAll, values, reader, -1) respBody, err := client.call(storageRESTMethodWriteAll, values, reader, -1)
defer CloseResponse(respBody) defer http.DrainBody(respBody)
return err return err
} }
@ -252,7 +253,7 @@ func (client *storageRESTClient) StatFile(volume, path string) (info FileInfo, e
if err != nil { if err != nil {
return info, err return info, err
} }
defer CloseResponse(respBody) defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info) err = gob.NewDecoder(respBody).Decode(&info)
return info, err return info, err
} }
@ -266,7 +267,7 @@ func (client *storageRESTClient) ReadAll(volume, path string) ([]byte, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer CloseResponse(respBody) defer http.DrainBody(respBody)
return ioutil.ReadAll(respBody) return ioutil.ReadAll(respBody)
} }
@ -302,7 +303,7 @@ func (client *storageRESTClient) ReadFile(volume, path string, offset int64, buf
if err != nil { if err != nil {
return 0, err return 0, err
} }
defer CloseResponse(respBody) defer http.DrainBody(respBody)
n, err := io.ReadFull(respBody, buffer) n, err := io.ReadFull(respBody, buffer)
return int64(n), err return int64(n), err
} }
@ -317,7 +318,7 @@ func (client *storageRESTClient) ListDir(volume, dirPath string, count int) (ent
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer CloseResponse(respBody) defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&entries) err = gob.NewDecoder(respBody).Decode(&entries)
return entries, err return entries, err
} }
@ -328,7 +329,7 @@ func (client *storageRESTClient) DeleteFile(volume, path string) error {
values.Set(storageRESTVolume, volume) values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path) values.Set(storageRESTFilePath, path)
respBody, err := client.call(storageRESTMethodDeleteFile, values, nil, -1) respBody, err := client.call(storageRESTMethodDeleteFile, values, nil, -1)
defer CloseResponse(respBody) defer http.DrainBody(respBody)
return err return err
} }
@ -340,7 +341,7 @@ func (client *storageRESTClient) RenameFile(srcVolume, srcPath, dstVolume, dstPa
values.Set(storageRESTDstVolume, dstVolume) values.Set(storageRESTDstVolume, dstVolume)
values.Set(storageRESTDstPath, dstPath) values.Set(storageRESTDstPath, dstPath)
respBody, err := client.call(storageRESTMethodRenameFile, values, nil, -1) respBody, err := client.call(storageRESTMethodRenameFile, values, nil, -1)
defer CloseResponse(respBody) defer http.DrainBody(respBody)
return err return err
} }

@ -32,6 +32,7 @@ import (
"github.com/inconshreveable/go-update" "github.com/inconshreveable/go-update"
"github.com/minio/cli" "github.com/minio/cli"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
_ "github.com/minio/sha256-simd" // Needed for sha256 hash verifier. _ "github.com/minio/sha256-simd" // Needed for sha256 hash verifier.
"github.com/segmentio/go-prompt" "github.com/segmentio/go-prompt"
@ -318,7 +319,7 @@ func downloadReleaseURL(releaseChecksumURL string, timeout time.Duration, mode s
if resp == nil { if resp == nil {
return content, fmt.Errorf("No response from server to download URL %s", releaseChecksumURL) return content, fmt.Errorf("No response from server to download URL %s", releaseChecksumURL)
} }
defer CloseResponse(resp.Body) defer xhttp.DrainBody(resp.Body)
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return content, fmt.Errorf("Error downloading URL %s. Response: %v", releaseChecksumURL, resp.Status) return content, fmt.Errorf("Error downloading URL %s. Response: %v", releaseChecksumURL, resp.Status)
@ -465,7 +466,7 @@ func doUpdate(sha256Hex string, latestReleaseTime time.Time, ok bool) (updateSta
if err != nil { if err != nil {
return updateStatusMsg, err return updateStatusMsg, err
} }
defer CloseResponse(resp.Body) defer xhttp.DrainBody(resp.Body)
// FIXME: add support for gpg verification as well. // FIXME: add support for gpg verification as well.
if err = update.Apply(resp.Body, if err = update.Apply(resp.Body,

@ -34,7 +34,6 @@ import (
"path/filepath" "path/filepath"
"reflect" "reflect"
"strings" "strings"
"sync"
"time" "time"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
@ -467,35 +466,6 @@ func isNetworkOrHostDown(err error) bool {
return false return false
} }
var b512pool = sync.Pool{
New: func() interface{} {
buf := make([]byte, 512)
return &buf
},
}
// CloseResponse close non nil response with any response Body.
// convenient wrapper to drain any remaining data on response body.
//
// Subsequently this allows golang http RoundTripper
// to re-use the same connection for future requests.
func CloseResponse(respBody io.ReadCloser) {
// Callers should close resp.Body when done reading from it.
// If resp.Body is not closed, the Client's underlying RoundTripper
// (typically Transport) may not be able to re-use a persistent TCP
// connection to the server for a subsequent "keep-alive" request.
if respBody != nil {
// Drain any remaining Body and then close the connection.
// Without this closing connection would disallow re-using
// the same connection for future uses.
// - http://stackoverflow.com/a/17961593/4465767
bufp := b512pool.Get().(*[]byte)
defer b512pool.Put(bufp)
io.CopyBuffer(ioutil.Discard, respBody, *bufp)
respBody.Close()
}
}
// Used for registering with rest handlers (have a look at registerStorageRESTHandlers for usage example) // Used for registering with rest handlers (have a look at registerStorageRESTHandlers for usage example)
// If it is passed ["aaaa", "bbbb"], it returns ["aaaa", "{aaaa:.*}", "bbbb", "{bbbb:.*}"] // If it is passed ["aaaa", "bbbb"], it returns ["aaaa", "{aaaa:.*}", "bbbb", "{bbbb:.*}"]
func restQueries(keys ...string) []string { func restQueries(keys ...string) []string {

Loading…
Cancel
Save