Support HTTP/2.0 (#7204)

Fixes #6704
master
Harshavardhana 6 years ago committed by kannappanr
parent 8a405cab2f
commit 396d78352d
  1. 34
      cmd/http-stats.go
  2. 23
      cmd/http/bufconn.go
  3. 2
      cmd/http/bufconn_test.go
  4. 145
      cmd/http/listener.go
  5. 393
      cmd/http/listener_test.go
  6. 15
      cmd/http/server.go
  7. 2
      cmd/object-api-common.go
  8. 18
      cmd/rest/client.go
  9. 7
      cmd/rpc.go
  10. 33
      cmd/rpc/client.go
  11. 5
      cmd/rpc/client_test.go
  12. 14
      cmd/storage-rest-client.go
  13. 5
      cmd/storage-rest_test.go

@ -19,59 +19,29 @@ package cmd
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"strings"
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic" "go.uber.org/atomic"
) )
func getRequestResource(r *http.Request) string {
if r == nil {
// http.Request is nil when non-HTTP data (like TLS record) is read/written.
return ""
}
if globalDomainName != "" {
host := r.Header.Get("Host")
if strings.HasSuffix(host, "."+globalDomainName) {
return "/" + strings.TrimSuffix(host, "."+globalDomainName) + r.URL.Path
}
}
return r.URL.Path
}
// ConnStats - Network statistics // ConnStats - Network statistics
// Count total input/output transferred bytes during // Count total input/output transferred bytes during
// the server's life. // the server's life.
type ConnStats struct { type ConnStats struct {
totalInputBytes atomic.Uint64 totalInputBytes atomic.Uint64
totalOutputBytes atomic.Uint64 totalOutputBytes atomic.Uint64
totalRPCInputBytes atomic.Uint64
totalRPCOutputBytes atomic.Uint64
} }
// Increase total input bytes // Increase total input bytes
func (s *ConnStats) incInputBytes(r *http.Request, n int) { func (s *ConnStats) incInputBytes(n int) {
resource := getRequestResource(r)
if resource == minioReservedBucketPath || strings.HasPrefix(resource, minioReservedBucketPath+"/") {
s.totalRPCInputBytes.Add(uint64(n))
} else {
s.totalInputBytes.Add(uint64(n)) s.totalInputBytes.Add(uint64(n))
} }
}
// Increase total output bytes // Increase total output bytes
func (s *ConnStats) incOutputBytes(r *http.Request, n int) { func (s *ConnStats) incOutputBytes(n int) {
resource := getRequestResource(r)
if resource == minioReservedBucketPath || strings.HasPrefix(resource, minioReservedBucketPath+"/") {
s.totalRPCOutputBytes.Add(uint64(n))
} else {
s.totalOutputBytes.Add(uint64(n)) s.totalOutputBytes.Add(uint64(n))
} }
}
// Return total input bytes // Return total input bytes
func (s *ConnStats) getTotalInputBytes() uint64 { func (s *ConnStats) getTotalInputBytes() uint64 {

@ -19,7 +19,6 @@ package http
import ( import (
"bufio" "bufio"
"net" "net"
"net/http"
"time" "time"
) )
@ -29,18 +28,8 @@ type BufConn struct {
bufReader *bufio.Reader // buffered reader wraps reader in net.Conn. bufReader *bufio.Reader // buffered reader wraps reader in net.Conn.
readTimeout time.Duration // sets the read timeout in the connection. readTimeout time.Duration // sets the read timeout in the connection.
writeTimeout time.Duration // sets the write timeout in the connection. writeTimeout time.Duration // sets the write timeout in the connection.
request *http.Request // HTTP request information. updateBytesReadFunc func(int) // function to be called to update bytes read.
updateBytesReadFunc func(*http.Request, int) // function to be called to update bytes read. updateBytesWrittenFunc func(int) // function to be called to update bytes written.
updateBytesWrittenFunc func(*http.Request, int) // function to be called to update bytes written.
}
func (c *BufConn) setRequest(request *http.Request) {
c.request = request
}
func (c *BufConn) setUpdateFuncs(updateBytesReadFunc, updateBytesWrittenFunc func(*http.Request, int)) {
c.updateBytesReadFunc = updateBytesReadFunc
c.updateBytesWrittenFunc = updateBytesWrittenFunc
} }
// Sets read timeout // Sets read timeout
@ -81,7 +70,7 @@ func (c *BufConn) Read(b []byte) (n int, err error) {
c.setReadTimeout() c.setReadTimeout()
n, err = c.bufReader.Read(b) n, err = c.bufReader.Read(b)
if err == nil && c.updateBytesReadFunc != nil { if err == nil && c.updateBytesReadFunc != nil {
c.updateBytesReadFunc(c.request, n) c.updateBytesReadFunc(n)
} }
return n, err return n, err
@ -92,18 +81,20 @@ func (c *BufConn) Write(b []byte) (n int, err error) {
c.setWriteTimeout() c.setWriteTimeout()
n, err = c.Conn.Write(b) n, err = c.Conn.Write(b)
if err == nil && c.updateBytesWrittenFunc != nil { if err == nil && c.updateBytesWrittenFunc != nil {
c.updateBytesWrittenFunc(c.request, n) c.updateBytesWrittenFunc(n)
} }
return n, err return n, err
} }
// newBufConn - creates a new connection object wrapping net.Conn. // newBufConn - creates a new connection object wrapping net.Conn.
func newBufConn(c net.Conn, readTimeout, writeTimeout time.Duration, maxHeaderBytes int) *BufConn { func newBufConn(c net.Conn, readTimeout, writeTimeout time.Duration, maxHeaderBytes int, updateBytesReadFunc, updateBytesWrittenFunc func(int)) *BufConn {
return &BufConn{ return &BufConn{
QuirkConn: QuirkConn{Conn: c}, QuirkConn: QuirkConn{Conn: c},
bufReader: bufio.NewReaderSize(c, maxHeaderBytes), bufReader: bufio.NewReaderSize(c, maxHeaderBytes),
readTimeout: readTimeout, readTimeout: readTimeout,
writeTimeout: writeTimeout, writeTimeout: writeTimeout,
updateBytesReadFunc: updateBytesReadFunc,
updateBytesWrittenFunc: updateBytesWrittenFunc,
} }
} }

@ -49,7 +49,7 @@ func TestBuffConnReadTimeout(t *testing.T) {
t.Errorf("failed to accept new connection. %v", terr) t.Errorf("failed to accept new connection. %v", terr)
return return
} }
bufconn := newBufConn(tcpConn, 1*time.Second, 1*time.Second, 4096) bufconn := newBufConn(tcpConn, 1*time.Second, 1*time.Second, 4096, nil, nil)
defer bufconn.Close() defer bufconn.Close()
// Read a line // Read a line

@ -23,9 +23,7 @@ import (
"io" "io"
"net" "net"
"net/http" "net/http"
"net/url"
"os" "os"
"strings"
"sync" "sync"
"syscall" "syscall"
"time" "time"
@ -33,9 +31,7 @@ import (
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
) )
var sslRequiredErrMsg = []byte("HTTP/1.0 403 Forbidden\r\n\r\nSSL required") var sslRequiredErrMsg = []byte("HTTP/1.1 403 Forbidden\r\n\r\nSSL required")
var badRequestMsg = []byte("HTTP/1.0 400 Bad Request\r\n\r\n")
// HTTP methods. // HTTP methods.
var methods = []string{ var methods = []string{
@ -51,30 +47,6 @@ var methods = []string{
"PRI", // HTTP 2 method "PRI", // HTTP 2 method
} }
// maximum length of above methods + one space.
var methodMaxLen = getMethodMaxLen() + 1
func getMethodMaxLen() int {
maxLen := 0
for _, method := range methods {
if len(method) > maxLen {
maxLen = len(method)
}
}
return maxLen
}
func isHTTPMethod(s string) bool {
for _, method := range methods {
if s == method {
return true
}
}
return false
}
func getPlainText(bufConn *BufConn) (bool, error) { func getPlainText(bufConn *BufConn) (bool, error) {
defer bufConn.setReadTimeout() defer bufConn.setReadTimeout()
@ -95,66 +67,6 @@ func getPlainText(bufConn *BufConn) (bool, error) {
return false, nil return false, nil
} }
func getMethodResourceHost(bufConn *BufConn, maxHeaderBytes int) (method string, resource string, host string, err error) {
defer bufConn.setReadTimeout()
var data []byte
for dataLen := 1; dataLen < maxHeaderBytes; dataLen++ {
if bufConn.canSetReadDeadline() {
// Set deadline such that we close the connection quickly
// of no data was received from the Peek()
bufConn.SetReadDeadline(time.Now().UTC().Add(time.Second * 3))
}
data, err = bufConn.bufReader.Peek(dataLen)
if err != nil {
return "", "", "", err
}
tokens := strings.Split(string(data), "\n")
if len(tokens) < 2 {
continue
}
if method == "" && resource == "" {
if i := strings.IndexByte(tokens[0], ' '); i == -1 {
return "", "", "", fmt.Errorf("malformed HTTP request from '%s'", bufConn.LocalAddr())
}
httpTokens := strings.SplitN(tokens[0], " ", 3)
if len(httpTokens) < 3 {
return "", "", "", fmt.Errorf("malformed HTTP request from '%s'", bufConn.LocalAddr())
}
if !isHTTPMethod(httpTokens[0]) {
return "", "", "", fmt.Errorf("malformed HTTP request, invalid HTTP method '%s' from '%s'",
httpTokens[0], bufConn.LocalAddr())
}
method = httpTokens[0]
resource = httpTokens[1]
}
for _, token := range tokens[1:] {
if token == "" || !strings.HasSuffix(token, "\r") {
continue
}
// HTTP headers are case insensitive, so we should simply convert
// each tokens to their lower case form to match 'host' header.
token = strings.ToLower(token)
if strings.HasPrefix(token, "host:") {
host = strings.TrimPrefix(strings.TrimSuffix(token, "\r"), "host:")
return method, resource, host, nil
}
}
if tokens[len(tokens)-1] == "\r" {
break
}
}
return "", "", "", fmt.Errorf("malformed HTTP request from %s", bufConn.LocalAddr())
}
type acceptResult struct { type acceptResult struct {
conn net.Conn conn net.Conn
err error err error
@ -171,8 +83,8 @@ type httpListener struct {
readTimeout time.Duration readTimeout time.Duration
writeTimeout time.Duration writeTimeout time.Duration
maxHeaderBytes int maxHeaderBytes int
updateBytesReadFunc func(*http.Request, int) // function to be called to update bytes read in BufConn. updateBytesReadFunc func(int) // function to be called to update bytes read in BufConn.
updateBytesWrittenFunc func(*http.Request, int) // function to be called to update bytes written in BufConn. updateBytesWrittenFunc func(int) // function to be called to update bytes written in BufConn.
} }
// isRoutineNetErr returns true if error is due to a network timeout, // isRoutineNetErr returns true if error is due to a network timeout,
@ -222,7 +134,8 @@ func (listener *httpListener) start() {
tcpConn.SetKeepAlive(true) tcpConn.SetKeepAlive(true)
tcpConn.SetKeepAlivePeriod(listener.tcpKeepAliveTimeout) tcpConn.SetKeepAlivePeriod(listener.tcpKeepAliveTimeout)
bufconn := newBufConn(tcpConn, listener.readTimeout, listener.writeTimeout, listener.maxHeaderBytes) bufconn := newBufConn(tcpConn, listener.readTimeout, listener.writeTimeout, listener.maxHeaderBytes,
listener.updateBytesReadFunc, listener.updateBytesWrittenFunc)
if listener.tlsConfig != nil { if listener.tlsConfig != nil {
ok, err := getPlainText(bufconn) ok, err := getPlainText(bufconn)
if err != nil { if err != nil {
@ -248,51 +161,7 @@ func (listener *httpListener) start() {
bufconn.Close() bufconn.Close()
return return
} }
// As the listener is configured with TLS, try to do TLS handshake, drop the connection if it fails.
tlsConn := tls.Server(bufconn, listener.tlsConfig)
if err := tlsConn.Handshake(); err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("remoteAddr", bufconn.RemoteAddr().String())
reqInfo.AppendTags("localAddr", bufconn.LocalAddr().String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
bufconn.Close()
return
}
bufconn = newBufConn(tlsConn, listener.readTimeout, listener.writeTimeout, listener.maxHeaderBytes)
}
method, resource, host, err := getMethodResourceHost(bufconn, listener.maxHeaderBytes)
if err != nil {
// Peek could fail legitimately when clients abruptly close
// connection. E.g. Chrome browser opens connections speculatively to
// speed up loading of a web page. Peek may also fail due to network
// saturation on a transport with read timeout set. All other kind of
// errors should be logged for further investigation. Thanks @brendanashworth.
if !isRoutineNetErr(err) {
reqInfo := (&logger.ReqInfo{}).AppendTags("remoteAddr", bufconn.RemoteAddr().String())
reqInfo.AppendTags("localAddr", bufconn.LocalAddr().String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
bufconn.Write(badRequestMsg)
}
bufconn.Close()
return
}
header := make(http.Header)
if host != "" {
header.Add("Host", host)
} }
bufconn.setRequest(&http.Request{
Method: method,
URL: &url.URL{Path: resource},
Host: bufconn.LocalAddr().String(),
Header: header,
})
bufconn.setUpdateFuncs(listener.updateBytesReadFunc, listener.updateBytesWrittenFunc)
send(acceptResult{bufconn, nil}, doneCh) send(acceptResult{bufconn, nil}, doneCh)
} }
@ -380,8 +249,8 @@ func newHTTPListener(serverAddrs []string,
readTimeout time.Duration, readTimeout time.Duration,
writeTimeout time.Duration, writeTimeout time.Duration,
maxHeaderBytes int, maxHeaderBytes int,
updateBytesReadFunc func(*http.Request, int), updateBytesReadFunc func(int),
updateBytesWrittenFunc func(*http.Request, int)) (listener *httpListener, err error) { updateBytesWrittenFunc func(int)) (listener *httpListener, err error) {
var tcpListeners []*net.TCPListener var tcpListeners []*net.TCPListener

@ -17,17 +17,14 @@
package http package http
import ( import (
"bufio"
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net" "net"
"net/http"
"strconv" "strconv"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
@ -156,43 +153,6 @@ func getNonLoopBackIP(t *testing.T) string {
return nonLoopBackIP return nonLoopBackIP
} }
// Test getMethodMaxLen()
func TestGetMethodMaxLen(t *testing.T) {
l := getMethodMaxLen()
if l != (methodMaxLen - 1) {
t.Fatalf("expected: %v, got: %v", (methodMaxLen - 1), l)
}
}
// Test isHTTPMethod()
func TestIsHTTPMethod(t *testing.T) {
testCases := []struct {
method string
expectedResult bool
}{
{"", false},
{"get", false},
{"put", false},
{"UPLOAD", false},
{"OPTIONS", true},
{"GET", true},
{"HEAD", true},
{"POST", true},
{"PUT", true},
{"DELETE", true},
{"TRACE", true},
{"CONNECT", true},
{"PRI", true},
}
for _, testCase := range testCases {
result := isHTTPMethod(testCase.method)
if result != testCase.expectedResult {
t.Fatalf("expected: %v, got: %v", testCase.expectedResult, result)
}
}
}
func TestNewHTTPListener(t *testing.T) { func TestNewHTTPListener(t *testing.T) {
tlsConfig := getTLSConfig(t) tlsConfig := getTLSConfig(t)
@ -202,8 +162,8 @@ func TestNewHTTPListener(t *testing.T) {
tcpKeepAliveTimeout time.Duration tcpKeepAliveTimeout time.Duration
readTimeout time.Duration readTimeout time.Duration
writeTimeout time.Duration writeTimeout time.Duration
updateBytesReadFunc func(*http.Request, int) updateBytesReadFunc func(int)
updateBytesWrittenFunc func(*http.Request, int) updateBytesWrittenFunc func(int)
expectedErr bool expectedErr bool
}{ }{
{[]string{"93.184.216.34:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, true}, {[]string{"93.184.216.34:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, true},
@ -266,8 +226,7 @@ func TestHTTPListenerStartClose(t *testing.T) {
time.Duration(0), time.Duration(0),
time.Duration(0), time.Duration(0),
DefaultMaxHeaderBytes, DefaultMaxHeaderBytes,
nil, nil, nil,
nil,
) )
if err != nil { if err != nil {
t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err) t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
@ -314,8 +273,7 @@ func TestHTTPListenerAddr(t *testing.T) {
time.Duration(0), time.Duration(0),
time.Duration(0), time.Duration(0),
DefaultMaxHeaderBytes, DefaultMaxHeaderBytes,
nil, nil, nil,
nil,
) )
if err != nil { if err != nil {
t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err) t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
@ -359,8 +317,7 @@ func TestHTTPListenerAddrs(t *testing.T) {
time.Duration(0), time.Duration(0),
time.Duration(0), time.Duration(0),
DefaultMaxHeaderBytes, DefaultMaxHeaderBytes,
nil, nil, nil,
nil,
) )
if err != nil { if err != nil {
t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err) t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
@ -380,149 +337,6 @@ func TestHTTPListenerAddrs(t *testing.T) {
} }
} }
func TestHTTPListenerAccept(t *testing.T) {
tlsConfig := getTLSConfig(t)
nonLoopBackIP := getNonLoopBackIP(t)
testCases := []struct {
serverAddrs []string
tlsConfig *tls.Config
request string
reply string
expectedRequestLine string
}{
{[]string{"localhost:0"}, nil, "GET / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "GET / HTTP/1.0\r\n"},
{[]string{nonLoopBackIP + ":0"}, nil, "POST / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "POST / HTTP/1.0\r\n"},
{[]string{nonLoopBackIP + ":0"}, nil, "HEAD / HTTP/1.0\r\nhost: example.org\r\n\r\n", "200 OK\r\n", "HEAD / HTTP/1.0\r\n"},
{[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, nil, "CONNECT / HTTP/1.0\r\nHost: www.example.org\r\n\r\n", "200 OK\r\n", "CONNECT / HTTP/1.0\r\n"},
{[]string{"localhost:0"}, tlsConfig, "GET / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "GET / HTTP/1.0\r\n"},
{[]string{nonLoopBackIP + ":0"}, tlsConfig, "POST / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "POST / HTTP/1.0\r\n"},
{[]string{nonLoopBackIP + ":0"}, tlsConfig, "HEAD / HTTP/1.0\r\nhost: example.org\r\n\r\n", "200 OK\r\n", "HEAD / HTTP/1.0\r\n"},
{[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, tlsConfig, "CONNECT / HTTP/1.0\r\nHost: www.example.org\r\n\r\n", "200 OK\r\n", "CONNECT / HTTP/1.0\r\n"},
}
for i, testCase := range testCases {
listener, err := newHTTPListener(
testCase.serverAddrs,
testCase.tlsConfig,
time.Duration(0),
time.Duration(0),
time.Duration(0),
DefaultMaxHeaderBytes,
nil,
nil,
)
if err != nil {
t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
}
for _, serverAddr := range listener.Addrs() {
var conn net.Conn
var err error
if testCase.tlsConfig == nil {
conn, err = net.Dial("tcp", serverAddr.String())
} else {
conn, err = tls.Dial("tcp", serverAddr.String(), &tls.Config{InsecureSkipVerify: true})
}
if err != nil {
t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
}
if _, err = io.WriteString(conn, testCase.request); err != nil {
t.Fatalf("Test %d: request send: expected = <nil>, got = %v", i+1, err)
}
serverConn, err := listener.Accept()
if err != nil {
t.Fatalf("Test %d: accept: expected = <nil>, got = %v", i+1, err)
}
requestLine, err := bufio.NewReader(serverConn).ReadString('\n')
if err != nil {
t.Fatalf("Test %d: request read: expected = <nil>, got = %v", i+1, err)
}
if requestLine != testCase.expectedRequestLine {
t.Fatalf("Test %d: request: expected = %v, got = %v", i+1, testCase.expectedRequestLine, requestLine)
}
if _, err = io.WriteString(serverConn, testCase.reply); err != nil {
t.Fatalf("Test %d: reply send: expected = <nil>, got = %v", i+1, err)
}
reply, err := bufio.NewReader(conn).ReadString('\n')
if err != nil {
t.Fatalf("Test %d: reply read: expected = <nil>, got = %v", i+1, err)
}
if testCase.reply != reply {
t.Fatalf("Test %d: reply: expected = %v, got = %v", i+1, testCase.reply, reply)
}
serverConn.Close()
conn.Close()
}
listener.Close()
}
}
func TestHTTPListenerAcceptPeekError(t *testing.T) {
tlsConfig := getTLSConfig(t)
nonLoopBackIP := getNonLoopBackIP(t)
testCases := []struct {
serverAddrs []string
tlsConfig *tls.Config
request string
}{
{[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, nil, "CONN"},
{[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, tlsConfig, "CONN"},
}
for i, testCase := range testCases {
listener, err := newHTTPListener(
testCase.serverAddrs,
testCase.tlsConfig,
time.Duration(0),
time.Duration(0),
time.Duration(0),
DefaultMaxHeaderBytes,
nil,
nil,
)
if err != nil {
t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
}
go func() {
serverConn, aerr := listener.Accept()
if aerr == nil {
fail(t, "Test %d: accept: expected = <error>, got = <nil>", i+1)
}
if serverConn != nil {
fail(t, "Test %d: accept: server expected = <nil>, got = %v", i+1, serverConn)
}
}()
for _, serverAddr := range listener.Addrs() {
conn, err := net.Dial("tcp", serverAddr.String())
if err != nil {
t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
}
if _, err = io.WriteString(conn, testCase.request); err != nil {
t.Fatalf("Test %d: request send: expected = <nil>, got = %v", i+1, err)
}
conn.Close()
}
listener.Close()
}
}
func TestHTTPListenerAcceptTLSError(t *testing.T) { func TestHTTPListenerAcceptTLSError(t *testing.T) {
tlsConfig := getTLSConfig(t) tlsConfig := getTLSConfig(t)
nonLoopBackIP := getNonLoopBackIP(t) nonLoopBackIP := getNonLoopBackIP(t)
@ -543,8 +357,7 @@ func TestHTTPListenerAcceptTLSError(t *testing.T) {
time.Duration(0), time.Duration(0),
time.Duration(0), time.Duration(0),
DefaultMaxHeaderBytes, DefaultMaxHeaderBytes,
nil, nil, nil,
nil,
) )
if err != nil { if err != nil {
t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err) t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
@ -587,200 +400,6 @@ func TestHTTPListenerAcceptTLSError(t *testing.T) {
} }
} }
func TestHTTPListenerAcceptError(t *testing.T) {
tlsConfig := getTLSConfig(t)
nonLoopBackIP := getNonLoopBackIP(t)
testCases := []struct {
serverAddrs []string
tlsConfig *tls.Config
secureClient bool
request string
}{
{[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, nil, false, "CONNECTION"},
{[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, tlsConfig, false, "CONNECTION"},
{[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, tlsConfig, true, "CONNECTION"},
}
for i, testCase := range testCases {
listener, err := newHTTPListener(
testCase.serverAddrs,
testCase.tlsConfig,
time.Duration(0),
time.Duration(0),
time.Duration(0),
DefaultMaxHeaderBytes,
nil,
nil,
)
if err != nil {
t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
}
for _, serverAddr := range listener.Addrs() {
var conn net.Conn
var err error
if testCase.secureClient {
conn, err = tls.Dial("tcp", serverAddr.String(), &tls.Config{InsecureSkipVerify: true})
} else {
conn, err = net.Dial("tcp", serverAddr.String())
}
if err != nil {
t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
}
if _, err = io.WriteString(conn, testCase.request); err != nil {
t.Fatalf("Test %d: request send: expected = <nil>, got = %v", i+1, err)
}
go func() {
serverConn, aerr := listener.Accept()
if aerr == nil {
fail(t, "Test %d: accept: expected = <error>, got = <nil>", i+1)
}
if serverConn != nil {
fail(t, "Test %d: accept: server expected = <nil>, got = %v", i+1, serverConn)
}
}()
if !testCase.secureClient && testCase.tlsConfig != nil {
buf := make([]byte, len(sslRequiredErrMsg))
var n int
n, err = io.ReadFull(conn, buf)
if err != nil {
t.Fatalf("Test %d: reply read: expected = <nil> got = %v", i+1, err)
} else if n != len(buf) {
t.Fatalf("Test %d: reply length: expected = %v got = %v", i+1, len(buf), n)
} else if !bytes.Equal(buf, sslRequiredErrMsg) {
t.Fatalf("Test %d: reply: expected = %v got = %v", i+1, string(sslRequiredErrMsg), string(buf))
}
continue
}
if _, err = bufio.NewReader(conn).ReadString('\n'); err != io.EOF {
t.Errorf("Test %d: reply read: expected = io.EOF, got = %s", i+1, err)
}
conn.Close()
}
listener.Close()
}
}
func TestHTTPListenerAcceptParallel(t *testing.T) {
tlsConfig := getTLSConfig(t)
nonLoopBackIP := getNonLoopBackIP(t)
testCases := []struct {
serverAddrs []string
tlsConfig *tls.Config
reply string
}{
{[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, nil, "200 OK\n"},
{[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, tlsConfig, "200 OK\n"},
}
connect := func(i int, serverAddr string, secure bool, delay bool, request, reply string) {
var conn net.Conn
var err error
if secure {
conn, err = tls.Dial("tcp", serverAddr, &tls.Config{InsecureSkipVerify: true})
} else {
conn, err = net.Dial("tcp", serverAddr)
}
if err != nil {
fail(t, "Test %d: error: expected = <nil>, got = %v", i+1, err)
}
if delay {
if _, err = io.WriteString(conn, request[:3]); err != nil {
fail(t, "Test %d: request send: expected = <nil>, got = %v", i+1, err)
}
time.Sleep(1 * time.Second)
if _, err = io.WriteString(conn, request[3:]); err != nil {
fail(t, "Test %d: request send: expected = <nil>, got = %v", i+1, err)
}
} else {
if _, err = io.WriteString(conn, request); err != nil {
fail(t, "Test %d: request send: expected = <nil>, got = %v", i+1, err)
}
}
received, err := bufio.NewReader(conn).ReadString('\n')
if err != nil {
fail(t, "Test %d: reply read: expected = <nil>, got = %v", i+1, err)
}
if received != reply {
fail(t, "Test %d: reply: expected = %v, got = %v", i+1, reply, received)
}
conn.Close()
}
handleConnection := func(i int, wg *sync.WaitGroup, serverConn net.Conn, request, reply string) {
defer wg.Done()
received, err := bufio.NewReader(serverConn).ReadString('\n')
if err != nil {
fail(t, "Test %d: request read: expected = <nil>, got = %v", i+1, err)
}
if received != request {
fail(t, "Test %d: request: expected = %v, got = %v", i+1, request, received)
}
if _, err := io.WriteString(serverConn, reply); err != nil {
fail(t, "Test %d: reply send: expected = <nil>, got = %v", i+1, err)
}
serverConn.Close()
}
for i, testCase := range testCases {
listener, err := newHTTPListener(
testCase.serverAddrs,
testCase.tlsConfig,
time.Duration(0),
time.Duration(0),
time.Duration(0),
DefaultMaxHeaderBytes,
nil,
nil,
)
if err != nil {
t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
}
for _, serverAddr := range listener.Addrs() {
go connect(i, serverAddr.String(), testCase.tlsConfig != nil, true, "GET /1 HTTP/1.0\r\nHost: example.org\r\nr\n", testCase.reply)
go connect(i, serverAddr.String(), testCase.tlsConfig != nil, false, "GET /2 HTTP/1.0\r\nHost: example.org\r\n\r\n", testCase.reply)
var wg sync.WaitGroup
serverConn, err := listener.Accept()
if err != nil {
t.Fatalf("Test %d: accept: expected = <nil>, got = %v", i+1, err)
}
wg.Add(1)
go handleConnection(i, &wg, serverConn, "GET /2 HTTP/1.0\r\n", testCase.reply)
serverConn, err = listener.Accept()
if err != nil {
t.Fatalf("Test %d: accept: expected = <nil>, got = %v", i+1, err)
}
wg.Add(1)
go handleConnection(i, &wg, serverConn, "GET /1 HTTP/1.0\r\n", testCase.reply)
wg.Wait()
}
listener.Close()
}
}
type myTimeoutErr struct { type myTimeoutErr struct {
timeout bool timeout bool
} }

@ -54,8 +54,8 @@ type Server struct {
Addrs []string // addresses on which the server listens for new connection. Addrs []string // addresses on which the server listens for new connection.
ShutdownTimeout time.Duration // timeout used for graceful server shutdown. ShutdownTimeout time.Duration // timeout used for graceful server shutdown.
TCPKeepAliveTimeout time.Duration // timeout used for underneath TCP connection. TCPKeepAliveTimeout time.Duration // timeout used for underneath TCP connection.
UpdateBytesReadFunc func(*http.Request, int) // function to be called to update bytes read in bufConn. UpdateBytesReadFunc func(int) // function to be called to update bytes read in bufConn.
UpdateBytesWrittenFunc func(*http.Request, int) // function to be called to update bytes written in bufConn. UpdateBytesWrittenFunc func(int) // function to be called to update bytes written in bufConn.
listenerMutex *sync.Mutex // to guard 'listener' field. listenerMutex *sync.Mutex // to guard 'listener' field.
listener *httpListener // HTTP listener for all 'Addrs' field. listener *httpListener // HTTP listener for all 'Addrs' field.
inShutdown uint32 // indicates whether the server is in shutdown or not inShutdown uint32 // indicates whether the server is in shutdown or not
@ -80,8 +80,6 @@ func (srv *Server) Start() (err error) {
addrs := set.CreateStringSet(srv.Addrs...).ToSlice() // copy and remove duplicates addrs := set.CreateStringSet(srv.Addrs...).ToSlice() // copy and remove duplicates
tcpKeepAliveTimeout := srv.TCPKeepAliveTimeout tcpKeepAliveTimeout := srv.TCPKeepAliveTimeout
updateBytesReadFunc := srv.UpdateBytesReadFunc
updateBytesWrittenFunc := srv.UpdateBytesWrittenFunc
// Create new HTTP listener. // Create new HTTP listener.
var listener *httpListener var listener *httpListener
@ -92,8 +90,8 @@ func (srv *Server) Start() (err error) {
readTimeout, readTimeout,
writeTimeout, writeTimeout,
srv.MaxHeaderBytes, srv.MaxHeaderBytes,
updateBytesReadFunc, srv.UpdateBytesReadFunc,
updateBytesWrittenFunc, srv.UpdateBytesWrittenFunc,
) )
if err != nil { if err != nil {
return err return err
@ -121,6 +119,9 @@ func (srv *Server) Start() (err error) {
srv.listenerMutex.Unlock() srv.listenerMutex.Unlock()
// Start servicing with listener. // Start servicing with listener.
if tlsConfig != nil {
return srv.Server.Serve(tls.NewListener(listener, tlsConfig))
}
return srv.Server.Serve(listener) return srv.Server.Serve(listener)
} }
@ -192,7 +193,7 @@ func NewServer(addrs []string, handler http.Handler, getCert certs.GetCertificat
CipherSuites: defaultCipherSuites, CipherSuites: defaultCipherSuites,
CurvePreferences: secureCurves, CurvePreferences: secureCurves,
MinVersion: tls.VersionTLS12, MinVersion: tls.VersionTLS12,
NextProtos: []string{"http/1.1"}, NextProtos: []string{"h2", "http/1.1"},
} }
tlsConfig.GetCertificate = getCert tlsConfig.GetCertificate = getCert
} }

@ -100,7 +100,7 @@ func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) {
return newPosix(endpoint.Path) return newPosix(endpoint.Path)
} }
return newStorageRESTClient(endpoint), nil return newStorageRESTClient(endpoint)
} }
// Cleanup a directory recursively. // Cleanup a directory recursively.

@ -28,6 +28,7 @@ import (
"time" "time"
xhttp "github.com/minio/minio/cmd/http" xhttp "github.com/minio/minio/cmd/http"
"golang.org/x/net/http2"
) )
// DefaultRESTTimeout - default RPC timeout is one minute. // DefaultRESTTimeout - default RPC timeout is one minute.
@ -95,7 +96,7 @@ func newCustomDialContext(timeout time.Duration) func(ctx context.Context, netwo
} }
// NewClient - returns new REST client. // NewClient - returns new REST client.
func NewClient(url *url.URL, tlsConfig *tls.Config, timeout time.Duration, newAuthToken func() string) *Client { func NewClient(url *url.URL, tlsConfig *tls.Config, timeout time.Duration, newAuthToken func() string) (*Client, error) {
// Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper // Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper
// except custom DialContext and TLSClientConfig. // except custom DialContext and TLSClientConfig.
tr := &http.Transport{ tr := &http.Transport{
@ -103,17 +104,22 @@ func NewClient(url *url.URL, tlsConfig *tls.Config, timeout time.Duration, newAu
DialContext: newCustomDialContext(timeout), DialContext: newCustomDialContext(timeout),
MaxIdleConnsPerHost: 4096, MaxIdleConnsPerHost: 4096,
MaxIdleConns: 4096, MaxIdleConns: 4096,
IdleConnTimeout: 90 * time.Second, IdleConnTimeout: 120 * time.Second,
TLSHandshakeTimeout: 10 * time.Second, TLSHandshakeTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second, ExpectContinueTimeout: 10 * time.Second,
TLSClientConfig: tlsConfig, TLSClientConfig: tlsConfig,
DisableCompression: true, DisableCompression: true,
} }
if tlsConfig != nil {
// If TLS is enabled configure http2
if err := http2.ConfigureTransport(tr); err != nil {
return nil, err
}
}
return &Client{ return &Client{
httpClient: &http.Client{Transport: tr}, httpClient: &http.Client{Transport: tr},
httpIdleConnsCloser: tr.CloseIdleConnections, httpIdleConnsCloser: tr.CloseIdleConnections,
url: url, url: url,
newAuthToken: newAuthToken, newAuthToken: newAuthToken,
} }, nil
} }

@ -268,9 +268,14 @@ func NewRPCClient(args RPCClientArgs) (*RPCClient, error) {
return nil, err return nil, err
} }
rpcClient, err := xrpc.NewClient(args.ServiceURL, args.TLSConfig, defaultRPCTimeout)
if err != nil {
return nil, err
}
return &RPCClient{ return &RPCClient{
args: args, args: args,
authToken: args.NewAuthTokenFunc(), authToken: args.NewAuthTokenFunc(),
rpcClient: xrpc.NewClient(args.ServiceURL, args.TLSConfig, defaultRPCTimeout), rpcClient: rpcClient,
}, nil }, nil
} }

@ -29,6 +29,7 @@ import (
xhttp "github.com/minio/minio/cmd/http" xhttp "github.com/minio/minio/cmd/http"
xnet "github.com/minio/minio/pkg/net" xnet "github.com/minio/minio/pkg/net"
"golang.org/x/net/http2"
) )
// DefaultRPCTimeout - default RPC timeout is one minute. // DefaultRPCTimeout - default RPC timeout is one minute.
@ -37,6 +38,7 @@ const DefaultRPCTimeout = 1 * time.Minute
// Client - http based RPC client. // Client - http based RPC client.
type Client struct { type Client struct {
httpClient *http.Client httpClient *http.Client
httpIdleConnsCloser func()
serviceURL *xnet.URL serviceURL *xnet.URL
} }
@ -87,8 +89,11 @@ func (client *Client) Call(serviceMethod string, args, reply interface{}) error
return gobDecode(callResponse.ReplyBytes, reply) return gobDecode(callResponse.ReplyBytes, reply)
} }
// Close - does nothing and presents for interface compatibility. // Close closes all idle connections of the underlying http client
func (client *Client) Close() error { func (client *Client) Close() error {
if client.httpIdleConnsCloser != nil {
client.httpIdleConnsCloser()
}
return nil return nil
} }
@ -110,23 +115,29 @@ func newCustomDialContext(timeout time.Duration) func(ctx context.Context, netwo
} }
// NewClient - returns new RPC client. // NewClient - returns new RPC client.
func NewClient(serviceURL *xnet.URL, tlsConfig *tls.Config, timeout time.Duration) *Client { func NewClient(serviceURL *xnet.URL, tlsConfig *tls.Config, timeout time.Duration) (*Client, error) {
return &Client{
httpClient: &http.Client{
// Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper // Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper
// except custom DialContext and TLSClientConfig. // except custom DialContext and TLSClientConfig.
Transport: &http.Transport{ tr := &http.Transport{
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
DialContext: newCustomDialContext(timeout), DialContext: newCustomDialContext(timeout),
MaxIdleConnsPerHost: 4096, MaxIdleConnsPerHost: 4096,
MaxIdleConns: 4096, MaxIdleConns: 4096,
IdleConnTimeout: 90 * time.Second, IdleConnTimeout: 120 * time.Second,
TLSHandshakeTimeout: 10 * time.Second, TLSHandshakeTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second, ExpectContinueTimeout: 10 * time.Second,
TLSClientConfig: tlsConfig, TLSClientConfig: tlsConfig,
DisableCompression: true, DisableCompression: true,
},
},
serviceURL: serviceURL,
} }
if tlsConfig != nil {
// If TLS is enabled configure http2
if err := http2.ConfigureTransport(tr); err != nil {
return nil, err
}
}
return &Client{
httpClient: &http.Client{Transport: tr},
httpIdleConnsCloser: tr.CloseIdleConnections,
serviceURL: serviceURL,
}, nil
} }

@ -39,7 +39,10 @@ func TestClientCall(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error %v", err) t.Fatalf("unexpected error %v", err)
} }
rpcClient := NewClient(url, nil, DefaultRPCTimeout) rpcClient, err := NewClient(url, nil, DefaultRPCTimeout)
if err != nil {
t.Fatalf("NewClient initialization error %v", err)
}
var reply int var reply int
var boolReply bool var boolReply bool

@ -34,7 +34,6 @@ import (
"strings" "strings"
"github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest" "github.com/minio/minio/cmd/rest"
xnet "github.com/minio/minio/pkg/net" xnet "github.com/minio/minio/pkg/net"
) )
@ -381,9 +380,11 @@ func (client *storageRESTClient) Close() error {
} }
// Returns a storage rest client. // Returns a storage rest client.
func newStorageRESTClient(endpoint Endpoint) *storageRESTClient { func newStorageRESTClient(endpoint Endpoint) (*storageRESTClient, error) {
host, err := xnet.ParseHost(endpoint.Host) host, err := xnet.ParseHost(endpoint.Host)
logger.FatalIf(err, "Unable to parse storage Host") if err != nil {
return nil, err
}
scheme := "http" scheme := "http"
if globalIsSSL { if globalIsSSL {
@ -404,8 +405,11 @@ func newStorageRESTClient(endpoint Endpoint) *storageRESTClient {
} }
} }
restClient := rest.NewClient(serverURL, tlsConfig, storageRESTTimeout, newAuthToken) restClient, err := rest.NewClient(serverURL, tlsConfig, storageRESTTimeout, newAuthToken)
if err != nil {
return nil, err
}
client := &storageRESTClient{endpoint: endpoint, restClient: restClient, connected: true} client := &storageRESTClient{endpoint: endpoint, restClient: restClient, connected: true}
client.connected = client.getInstanceID() == nil client.connected = client.getInstanceID() == nil
return client return client, nil
} }

@ -509,7 +509,10 @@ func newStorageRESTHTTPServerClient(t *testing.T) (*httptest.Server, *storageRES
} }
registerStorageRESTHandlers(router, EndpointList{endpoint}) registerStorageRESTHandlers(router, EndpointList{endpoint})
restClient := newStorageRESTClient(endpoint) restClient, err := newStorageRESTClient(endpoint)
if err != nil {
t.Fatalf("newStorageRESTClient failed for %v, with error %s", endpoint, err)
}
prevGlobalServerConfig := globalServerConfig prevGlobalServerConfig := globalServerConfig
globalServerConfig = newServerConfig() globalServerConfig = newServerConfig()

Loading…
Cancel
Save