diff --git a/cmd/http-stats.go b/cmd/http-stats.go index e83f7fece..1270d758c 100644 --- a/cmd/http-stats.go +++ b/cmd/http-stats.go @@ -19,58 +19,28 @@ package cmd import ( "fmt" "net/http" - "strings" "time" "github.com/prometheus/client_golang/prometheus" - "go.uber.org/atomic" ) -func getRequestResource(r *http.Request) string { - if r == nil { - // http.Request is nil when non-HTTP data (like TLS record) is read/written. - return "" - } - - if globalDomainName != "" { - host := r.Header.Get("Host") - if strings.HasSuffix(host, "."+globalDomainName) { - return "/" + strings.TrimSuffix(host, "."+globalDomainName) + r.URL.Path - } - } - - return r.URL.Path -} - // ConnStats - Network statistics // Count total input/output transferred bytes during // the server's life. type ConnStats struct { - totalInputBytes atomic.Uint64 - totalOutputBytes atomic.Uint64 - totalRPCInputBytes atomic.Uint64 - totalRPCOutputBytes atomic.Uint64 + totalInputBytes atomic.Uint64 + totalOutputBytes atomic.Uint64 } // Increase total input bytes -func (s *ConnStats) incInputBytes(r *http.Request, n int) { - resource := getRequestResource(r) - if resource == minioReservedBucketPath || strings.HasPrefix(resource, minioReservedBucketPath+"/") { - s.totalRPCInputBytes.Add(uint64(n)) - } else { - s.totalInputBytes.Add(uint64(n)) - } +func (s *ConnStats) incInputBytes(n int) { + s.totalInputBytes.Add(uint64(n)) } // Increase total output bytes -func (s *ConnStats) incOutputBytes(r *http.Request, n int) { - resource := getRequestResource(r) - if resource == minioReservedBucketPath || strings.HasPrefix(resource, minioReservedBucketPath+"/") { - s.totalRPCOutputBytes.Add(uint64(n)) - } else { - s.totalOutputBytes.Add(uint64(n)) - } +func (s *ConnStats) incOutputBytes(n int) { + s.totalOutputBytes.Add(uint64(n)) } // Return total input bytes diff --git a/cmd/http/bufconn.go b/cmd/http/bufconn.go index f51d25ab3..a66bc0972 100644 --- a/cmd/http/bufconn.go +++ b/cmd/http/bufconn.go @@ -19,28 +19,17 @@ package http import ( "bufio" "net" - "net/http" "time" ) // BufConn - is a generic stream-oriented network connection supporting buffered reader and read/write timeout. type BufConn struct { QuirkConn - bufReader *bufio.Reader // buffered reader wraps reader in net.Conn. - readTimeout time.Duration // sets the read timeout in the connection. - writeTimeout time.Duration // sets the write timeout in the connection. - request *http.Request // HTTP request information. - updateBytesReadFunc func(*http.Request, int) // function to be called to update bytes read. - updateBytesWrittenFunc func(*http.Request, int) // function to be called to update bytes written. -} - -func (c *BufConn) setRequest(request *http.Request) { - c.request = request -} - -func (c *BufConn) setUpdateFuncs(updateBytesReadFunc, updateBytesWrittenFunc func(*http.Request, int)) { - c.updateBytesReadFunc = updateBytesReadFunc - c.updateBytesWrittenFunc = updateBytesWrittenFunc + bufReader *bufio.Reader // buffered reader wraps reader in net.Conn. + readTimeout time.Duration // sets the read timeout in the connection. + writeTimeout time.Duration // sets the write timeout in the connection. + updateBytesReadFunc func(int) // function to be called to update bytes read. + updateBytesWrittenFunc func(int) // function to be called to update bytes written. } // Sets read timeout @@ -81,7 +70,7 @@ func (c *BufConn) Read(b []byte) (n int, err error) { c.setReadTimeout() n, err = c.bufReader.Read(b) if err == nil && c.updateBytesReadFunc != nil { - c.updateBytesReadFunc(c.request, n) + c.updateBytesReadFunc(n) } return n, err @@ -92,18 +81,20 @@ func (c *BufConn) Write(b []byte) (n int, err error) { c.setWriteTimeout() n, err = c.Conn.Write(b) if err == nil && c.updateBytesWrittenFunc != nil { - c.updateBytesWrittenFunc(c.request, n) + c.updateBytesWrittenFunc(n) } return n, err } // newBufConn - creates a new connection object wrapping net.Conn. -func newBufConn(c net.Conn, readTimeout, writeTimeout time.Duration, maxHeaderBytes int) *BufConn { +func newBufConn(c net.Conn, readTimeout, writeTimeout time.Duration, maxHeaderBytes int, updateBytesReadFunc, updateBytesWrittenFunc func(int)) *BufConn { return &BufConn{ - QuirkConn: QuirkConn{Conn: c}, - bufReader: bufio.NewReaderSize(c, maxHeaderBytes), - readTimeout: readTimeout, - writeTimeout: writeTimeout, + QuirkConn: QuirkConn{Conn: c}, + bufReader: bufio.NewReaderSize(c, maxHeaderBytes), + readTimeout: readTimeout, + writeTimeout: writeTimeout, + updateBytesReadFunc: updateBytesReadFunc, + updateBytesWrittenFunc: updateBytesWrittenFunc, } } diff --git a/cmd/http/bufconn_test.go b/cmd/http/bufconn_test.go index b2b6b609c..b95539b3e 100644 --- a/cmd/http/bufconn_test.go +++ b/cmd/http/bufconn_test.go @@ -49,7 +49,7 @@ func TestBuffConnReadTimeout(t *testing.T) { t.Errorf("failed to accept new connection. %v", terr) return } - bufconn := newBufConn(tcpConn, 1*time.Second, 1*time.Second, 4096) + bufconn := newBufConn(tcpConn, 1*time.Second, 1*time.Second, 4096, nil, nil) defer bufconn.Close() // Read a line diff --git a/cmd/http/listener.go b/cmd/http/listener.go index abbd3752f..02ff1efe4 100644 --- a/cmd/http/listener.go +++ b/cmd/http/listener.go @@ -23,9 +23,7 @@ import ( "io" "net" "net/http" - "net/url" "os" - "strings" "sync" "syscall" "time" @@ -33,9 +31,7 @@ import ( "github.com/minio/minio/cmd/logger" ) -var sslRequiredErrMsg = []byte("HTTP/1.0 403 Forbidden\r\n\r\nSSL required") - -var badRequestMsg = []byte("HTTP/1.0 400 Bad Request\r\n\r\n") +var sslRequiredErrMsg = []byte("HTTP/1.1 403 Forbidden\r\n\r\nSSL required") // HTTP methods. var methods = []string{ @@ -51,30 +47,6 @@ var methods = []string{ "PRI", // HTTP 2 method } -// maximum length of above methods + one space. -var methodMaxLen = getMethodMaxLen() + 1 - -func getMethodMaxLen() int { - maxLen := 0 - for _, method := range methods { - if len(method) > maxLen { - maxLen = len(method) - } - } - - return maxLen -} - -func isHTTPMethod(s string) bool { - for _, method := range methods { - if s == method { - return true - } - } - - return false -} - func getPlainText(bufConn *BufConn) (bool, error) { defer bufConn.setReadTimeout() @@ -95,66 +67,6 @@ func getPlainText(bufConn *BufConn) (bool, error) { return false, nil } -func getMethodResourceHost(bufConn *BufConn, maxHeaderBytes int) (method string, resource string, host string, err error) { - defer bufConn.setReadTimeout() - - var data []byte - for dataLen := 1; dataLen < maxHeaderBytes; dataLen++ { - if bufConn.canSetReadDeadline() { - // Set deadline such that we close the connection quickly - // of no data was received from the Peek() - bufConn.SetReadDeadline(time.Now().UTC().Add(time.Second * 3)) - } - - data, err = bufConn.bufReader.Peek(dataLen) - if err != nil { - return "", "", "", err - } - - tokens := strings.Split(string(data), "\n") - if len(tokens) < 2 { - continue - } - - if method == "" && resource == "" { - if i := strings.IndexByte(tokens[0], ' '); i == -1 { - return "", "", "", fmt.Errorf("malformed HTTP request from '%s'", bufConn.LocalAddr()) - } - httpTokens := strings.SplitN(tokens[0], " ", 3) - if len(httpTokens) < 3 { - return "", "", "", fmt.Errorf("malformed HTTP request from '%s'", bufConn.LocalAddr()) - } - if !isHTTPMethod(httpTokens[0]) { - return "", "", "", fmt.Errorf("malformed HTTP request, invalid HTTP method '%s' from '%s'", - httpTokens[0], bufConn.LocalAddr()) - } - - method = httpTokens[0] - resource = httpTokens[1] - } - - for _, token := range tokens[1:] { - if token == "" || !strings.HasSuffix(token, "\r") { - continue - } - - // HTTP headers are case insensitive, so we should simply convert - // each tokens to their lower case form to match 'host' header. - token = strings.ToLower(token) - if strings.HasPrefix(token, "host:") { - host = strings.TrimPrefix(strings.TrimSuffix(token, "\r"), "host:") - return method, resource, host, nil - } - } - - if tokens[len(tokens)-1] == "\r" { - break - } - } - - return "", "", "", fmt.Errorf("malformed HTTP request from %s", bufConn.LocalAddr()) -} - type acceptResult struct { conn net.Conn err error @@ -171,8 +83,8 @@ type httpListener struct { readTimeout time.Duration writeTimeout time.Duration maxHeaderBytes int - updateBytesReadFunc func(*http.Request, int) // function to be called to update bytes read in BufConn. - updateBytesWrittenFunc func(*http.Request, int) // function to be called to update bytes written in BufConn. + updateBytesReadFunc func(int) // function to be called to update bytes read in BufConn. + updateBytesWrittenFunc func(int) // function to be called to update bytes written in BufConn. } // isRoutineNetErr returns true if error is due to a network timeout, @@ -222,7 +134,8 @@ func (listener *httpListener) start() { tcpConn.SetKeepAlive(true) tcpConn.SetKeepAlivePeriod(listener.tcpKeepAliveTimeout) - bufconn := newBufConn(tcpConn, listener.readTimeout, listener.writeTimeout, listener.maxHeaderBytes) + bufconn := newBufConn(tcpConn, listener.readTimeout, listener.writeTimeout, listener.maxHeaderBytes, + listener.updateBytesReadFunc, listener.updateBytesWrittenFunc) if listener.tlsConfig != nil { ok, err := getPlainText(bufconn) if err != nil { @@ -248,51 +161,7 @@ func (listener *httpListener) start() { bufconn.Close() return } - - // As the listener is configured with TLS, try to do TLS handshake, drop the connection if it fails. - tlsConn := tls.Server(bufconn, listener.tlsConfig) - - if err := tlsConn.Handshake(); err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("remoteAddr", bufconn.RemoteAddr().String()) - reqInfo.AppendTags("localAddr", bufconn.LocalAddr().String()) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) - bufconn.Close() - return - } - - bufconn = newBufConn(tlsConn, listener.readTimeout, listener.writeTimeout, listener.maxHeaderBytes) - } - - method, resource, host, err := getMethodResourceHost(bufconn, listener.maxHeaderBytes) - if err != nil { - // Peek could fail legitimately when clients abruptly close - // connection. E.g. Chrome browser opens connections speculatively to - // speed up loading of a web page. Peek may also fail due to network - // saturation on a transport with read timeout set. All other kind of - // errors should be logged for further investigation. Thanks @brendanashworth. - if !isRoutineNetErr(err) { - reqInfo := (&logger.ReqInfo{}).AppendTags("remoteAddr", bufconn.RemoteAddr().String()) - reqInfo.AppendTags("localAddr", bufconn.LocalAddr().String()) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) - bufconn.Write(badRequestMsg) - } - bufconn.Close() - return - } - - header := make(http.Header) - if host != "" { - header.Add("Host", host) } - bufconn.setRequest(&http.Request{ - Method: method, - URL: &url.URL{Path: resource}, - Host: bufconn.LocalAddr().String(), - Header: header, - }) - bufconn.setUpdateFuncs(listener.updateBytesReadFunc, listener.updateBytesWrittenFunc) send(acceptResult{bufconn, nil}, doneCh) } @@ -380,8 +249,8 @@ func newHTTPListener(serverAddrs []string, readTimeout time.Duration, writeTimeout time.Duration, maxHeaderBytes int, - updateBytesReadFunc func(*http.Request, int), - updateBytesWrittenFunc func(*http.Request, int)) (listener *httpListener, err error) { + updateBytesReadFunc func(int), + updateBytesWrittenFunc func(int)) (listener *httpListener, err error) { var tcpListeners []*net.TCPListener diff --git a/cmd/http/listener_test.go b/cmd/http/listener_test.go index 741738807..a65edf1d7 100644 --- a/cmd/http/listener_test.go +++ b/cmd/http/listener_test.go @@ -17,17 +17,14 @@ package http import ( - "bufio" "bytes" "crypto/tls" "errors" "fmt" "io" "net" - "net/http" "strconv" "strings" - "sync" "sync/atomic" "testing" "time" @@ -156,43 +153,6 @@ func getNonLoopBackIP(t *testing.T) string { return nonLoopBackIP } -// Test getMethodMaxLen() -func TestGetMethodMaxLen(t *testing.T) { - l := getMethodMaxLen() - if l != (methodMaxLen - 1) { - t.Fatalf("expected: %v, got: %v", (methodMaxLen - 1), l) - } -} - -// Test isHTTPMethod() -func TestIsHTTPMethod(t *testing.T) { - testCases := []struct { - method string - expectedResult bool - }{ - {"", false}, - {"get", false}, - {"put", false}, - {"UPLOAD", false}, - {"OPTIONS", true}, - {"GET", true}, - {"HEAD", true}, - {"POST", true}, - {"PUT", true}, - {"DELETE", true}, - {"TRACE", true}, - {"CONNECT", true}, - {"PRI", true}, - } - - for _, testCase := range testCases { - result := isHTTPMethod(testCase.method) - if result != testCase.expectedResult { - t.Fatalf("expected: %v, got: %v", testCase.expectedResult, result) - } - } -} - func TestNewHTTPListener(t *testing.T) { tlsConfig := getTLSConfig(t) @@ -202,8 +162,8 @@ func TestNewHTTPListener(t *testing.T) { tcpKeepAliveTimeout time.Duration readTimeout time.Duration writeTimeout time.Duration - updateBytesReadFunc func(*http.Request, int) - updateBytesWrittenFunc func(*http.Request, int) + updateBytesReadFunc func(int) + updateBytesWrittenFunc func(int) expectedErr bool }{ {[]string{"93.184.216.34:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, true}, @@ -266,8 +226,7 @@ func TestHTTPListenerStartClose(t *testing.T) { time.Duration(0), time.Duration(0), DefaultMaxHeaderBytes, - nil, - nil, + nil, nil, ) if err != nil { t.Fatalf("Test %d: error: expected = , got = %v", i+1, err) @@ -314,8 +273,7 @@ func TestHTTPListenerAddr(t *testing.T) { time.Duration(0), time.Duration(0), DefaultMaxHeaderBytes, - nil, - nil, + nil, nil, ) if err != nil { t.Fatalf("Test %d: error: expected = , got = %v", i+1, err) @@ -359,8 +317,7 @@ func TestHTTPListenerAddrs(t *testing.T) { time.Duration(0), time.Duration(0), DefaultMaxHeaderBytes, - nil, - nil, + nil, nil, ) if err != nil { t.Fatalf("Test %d: error: expected = , got = %v", i+1, err) @@ -380,149 +337,6 @@ func TestHTTPListenerAddrs(t *testing.T) { } } -func TestHTTPListenerAccept(t *testing.T) { - tlsConfig := getTLSConfig(t) - nonLoopBackIP := getNonLoopBackIP(t) - - testCases := []struct { - serverAddrs []string - tlsConfig *tls.Config - request string - reply string - expectedRequestLine string - }{ - {[]string{"localhost:0"}, nil, "GET / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "GET / HTTP/1.0\r\n"}, - {[]string{nonLoopBackIP + ":0"}, nil, "POST / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "POST / HTTP/1.0\r\n"}, - {[]string{nonLoopBackIP + ":0"}, nil, "HEAD / HTTP/1.0\r\nhost: example.org\r\n\r\n", "200 OK\r\n", "HEAD / HTTP/1.0\r\n"}, - {[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, nil, "CONNECT / HTTP/1.0\r\nHost: www.example.org\r\n\r\n", "200 OK\r\n", "CONNECT / HTTP/1.0\r\n"}, - {[]string{"localhost:0"}, tlsConfig, "GET / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "GET / HTTP/1.0\r\n"}, - {[]string{nonLoopBackIP + ":0"}, tlsConfig, "POST / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "POST / HTTP/1.0\r\n"}, - {[]string{nonLoopBackIP + ":0"}, tlsConfig, "HEAD / HTTP/1.0\r\nhost: example.org\r\n\r\n", "200 OK\r\n", "HEAD / HTTP/1.0\r\n"}, - {[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, tlsConfig, "CONNECT / HTTP/1.0\r\nHost: www.example.org\r\n\r\n", "200 OK\r\n", "CONNECT / HTTP/1.0\r\n"}, - } - - for i, testCase := range testCases { - listener, err := newHTTPListener( - testCase.serverAddrs, - testCase.tlsConfig, - time.Duration(0), - time.Duration(0), - time.Duration(0), - DefaultMaxHeaderBytes, - nil, - nil, - ) - if err != nil { - t.Fatalf("Test %d: error: expected = , got = %v", i+1, err) - } - - for _, serverAddr := range listener.Addrs() { - var conn net.Conn - var err error - - if testCase.tlsConfig == nil { - conn, err = net.Dial("tcp", serverAddr.String()) - } else { - conn, err = tls.Dial("tcp", serverAddr.String(), &tls.Config{InsecureSkipVerify: true}) - } - if err != nil { - t.Fatalf("Test %d: error: expected = , got = %v", i+1, err) - } - - if _, err = io.WriteString(conn, testCase.request); err != nil { - t.Fatalf("Test %d: request send: expected = , got = %v", i+1, err) - } - - serverConn, err := listener.Accept() - if err != nil { - t.Fatalf("Test %d: accept: expected = , got = %v", i+1, err) - } - - requestLine, err := bufio.NewReader(serverConn).ReadString('\n') - if err != nil { - t.Fatalf("Test %d: request read: expected = , got = %v", i+1, err) - } - - if requestLine != testCase.expectedRequestLine { - t.Fatalf("Test %d: request: expected = %v, got = %v", i+1, testCase.expectedRequestLine, requestLine) - } - - if _, err = io.WriteString(serverConn, testCase.reply); err != nil { - t.Fatalf("Test %d: reply send: expected = , got = %v", i+1, err) - } - - reply, err := bufio.NewReader(conn).ReadString('\n') - if err != nil { - t.Fatalf("Test %d: reply read: expected = , got = %v", i+1, err) - } - - if testCase.reply != reply { - t.Fatalf("Test %d: reply: expected = %v, got = %v", i+1, testCase.reply, reply) - } - - serverConn.Close() - conn.Close() - } - - listener.Close() - } -} - -func TestHTTPListenerAcceptPeekError(t *testing.T) { - tlsConfig := getTLSConfig(t) - nonLoopBackIP := getNonLoopBackIP(t) - - testCases := []struct { - serverAddrs []string - tlsConfig *tls.Config - request string - }{ - {[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, nil, "CONN"}, - {[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, tlsConfig, "CONN"}, - } - - for i, testCase := range testCases { - listener, err := newHTTPListener( - testCase.serverAddrs, - testCase.tlsConfig, - time.Duration(0), - time.Duration(0), - time.Duration(0), - DefaultMaxHeaderBytes, - nil, - nil, - ) - if err != nil { - t.Fatalf("Test %d: error: expected = , got = %v", i+1, err) - } - - go func() { - serverConn, aerr := listener.Accept() - if aerr == nil { - fail(t, "Test %d: accept: expected = , got = ", i+1) - } - if serverConn != nil { - fail(t, "Test %d: accept: server expected = , got = %v", i+1, serverConn) - } - }() - - for _, serverAddr := range listener.Addrs() { - conn, err := net.Dial("tcp", serverAddr.String()) - if err != nil { - t.Fatalf("Test %d: error: expected = , got = %v", i+1, err) - } - - if _, err = io.WriteString(conn, testCase.request); err != nil { - t.Fatalf("Test %d: request send: expected = , got = %v", i+1, err) - } - - conn.Close() - } - - listener.Close() - } -} - func TestHTTPListenerAcceptTLSError(t *testing.T) { tlsConfig := getTLSConfig(t) nonLoopBackIP := getNonLoopBackIP(t) @@ -543,8 +357,7 @@ func TestHTTPListenerAcceptTLSError(t *testing.T) { time.Duration(0), time.Duration(0), DefaultMaxHeaderBytes, - nil, - nil, + nil, nil, ) if err != nil { t.Fatalf("Test %d: error: expected = , got = %v", i+1, err) @@ -587,200 +400,6 @@ func TestHTTPListenerAcceptTLSError(t *testing.T) { } } -func TestHTTPListenerAcceptError(t *testing.T) { - tlsConfig := getTLSConfig(t) - nonLoopBackIP := getNonLoopBackIP(t) - - testCases := []struct { - serverAddrs []string - tlsConfig *tls.Config - secureClient bool - request string - }{ - {[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, nil, false, "CONNECTION"}, - {[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, tlsConfig, false, "CONNECTION"}, - {[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, tlsConfig, true, "CONNECTION"}, - } - - for i, testCase := range testCases { - listener, err := newHTTPListener( - testCase.serverAddrs, - testCase.tlsConfig, - time.Duration(0), - time.Duration(0), - time.Duration(0), - DefaultMaxHeaderBytes, - nil, - nil, - ) - if err != nil { - t.Fatalf("Test %d: error: expected = , got = %v", i+1, err) - } - - for _, serverAddr := range listener.Addrs() { - var conn net.Conn - var err error - - if testCase.secureClient { - conn, err = tls.Dial("tcp", serverAddr.String(), &tls.Config{InsecureSkipVerify: true}) - } else { - conn, err = net.Dial("tcp", serverAddr.String()) - } - if err != nil { - t.Fatalf("Test %d: error: expected = , got = %v", i+1, err) - } - - if _, err = io.WriteString(conn, testCase.request); err != nil { - t.Fatalf("Test %d: request send: expected = , got = %v", i+1, err) - } - - go func() { - serverConn, aerr := listener.Accept() - if aerr == nil { - fail(t, "Test %d: accept: expected = , got = ", i+1) - } - if serverConn != nil { - fail(t, "Test %d: accept: server expected = , got = %v", i+1, serverConn) - } - }() - - if !testCase.secureClient && testCase.tlsConfig != nil { - buf := make([]byte, len(sslRequiredErrMsg)) - var n int - n, err = io.ReadFull(conn, buf) - if err != nil { - t.Fatalf("Test %d: reply read: expected = got = %v", i+1, err) - } else if n != len(buf) { - t.Fatalf("Test %d: reply length: expected = %v got = %v", i+1, len(buf), n) - } else if !bytes.Equal(buf, sslRequiredErrMsg) { - t.Fatalf("Test %d: reply: expected = %v got = %v", i+1, string(sslRequiredErrMsg), string(buf)) - } - continue - } - - if _, err = bufio.NewReader(conn).ReadString('\n'); err != io.EOF { - t.Errorf("Test %d: reply read: expected = io.EOF, got = %s", i+1, err) - } - - conn.Close() - } - - listener.Close() - } -} - -func TestHTTPListenerAcceptParallel(t *testing.T) { - tlsConfig := getTLSConfig(t) - nonLoopBackIP := getNonLoopBackIP(t) - - testCases := []struct { - serverAddrs []string - tlsConfig *tls.Config - reply string - }{ - {[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, nil, "200 OK\n"}, - {[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, tlsConfig, "200 OK\n"}, - } - - connect := func(i int, serverAddr string, secure bool, delay bool, request, reply string) { - var conn net.Conn - var err error - - if secure { - conn, err = tls.Dial("tcp", serverAddr, &tls.Config{InsecureSkipVerify: true}) - } else { - conn, err = net.Dial("tcp", serverAddr) - } - if err != nil { - fail(t, "Test %d: error: expected = , got = %v", i+1, err) - } - - if delay { - if _, err = io.WriteString(conn, request[:3]); err != nil { - fail(t, "Test %d: request send: expected = , got = %v", i+1, err) - } - time.Sleep(1 * time.Second) - if _, err = io.WriteString(conn, request[3:]); err != nil { - fail(t, "Test %d: request send: expected = , got = %v", i+1, err) - } - } else { - if _, err = io.WriteString(conn, request); err != nil { - fail(t, "Test %d: request send: expected = , got = %v", i+1, err) - } - } - - received, err := bufio.NewReader(conn).ReadString('\n') - if err != nil { - fail(t, "Test %d: reply read: expected = , got = %v", i+1, err) - } - if received != reply { - fail(t, "Test %d: reply: expected = %v, got = %v", i+1, reply, received) - } - - conn.Close() - } - - handleConnection := func(i int, wg *sync.WaitGroup, serverConn net.Conn, request, reply string) { - defer wg.Done() - - received, err := bufio.NewReader(serverConn).ReadString('\n') - if err != nil { - fail(t, "Test %d: request read: expected = , got = %v", i+1, err) - } - - if received != request { - fail(t, "Test %d: request: expected = %v, got = %v", i+1, request, received) - } - - if _, err := io.WriteString(serverConn, reply); err != nil { - fail(t, "Test %d: reply send: expected = , got = %v", i+1, err) - } - - serverConn.Close() - } - - for i, testCase := range testCases { - listener, err := newHTTPListener( - testCase.serverAddrs, - testCase.tlsConfig, - time.Duration(0), - time.Duration(0), - time.Duration(0), - DefaultMaxHeaderBytes, - nil, - nil, - ) - if err != nil { - t.Fatalf("Test %d: error: expected = , got = %v", i+1, err) - } - - for _, serverAddr := range listener.Addrs() { - go connect(i, serverAddr.String(), testCase.tlsConfig != nil, true, "GET /1 HTTP/1.0\r\nHost: example.org\r\nr\n", testCase.reply) - go connect(i, serverAddr.String(), testCase.tlsConfig != nil, false, "GET /2 HTTP/1.0\r\nHost: example.org\r\n\r\n", testCase.reply) - - var wg sync.WaitGroup - - serverConn, err := listener.Accept() - if err != nil { - t.Fatalf("Test %d: accept: expected = , got = %v", i+1, err) - } - wg.Add(1) - go handleConnection(i, &wg, serverConn, "GET /2 HTTP/1.0\r\n", testCase.reply) - - serverConn, err = listener.Accept() - if err != nil { - t.Fatalf("Test %d: accept: expected = , got = %v", i+1, err) - } - wg.Add(1) - go handleConnection(i, &wg, serverConn, "GET /1 HTTP/1.0\r\n", testCase.reply) - - wg.Wait() - } - - listener.Close() - } -} - type myTimeoutErr struct { timeout bool } diff --git a/cmd/http/server.go b/cmd/http/server.go index d4ee5e484..e4db87d51 100644 --- a/cmd/http/server.go +++ b/cmd/http/server.go @@ -51,15 +51,15 @@ const ( // Server - extended http.Server supports multiple addresses to serve and enhanced connection handling. type Server struct { http.Server - Addrs []string // addresses on which the server listens for new connection. - ShutdownTimeout time.Duration // timeout used for graceful server shutdown. - TCPKeepAliveTimeout time.Duration // timeout used for underneath TCP connection. - UpdateBytesReadFunc func(*http.Request, int) // function to be called to update bytes read in bufConn. - UpdateBytesWrittenFunc func(*http.Request, int) // function to be called to update bytes written in bufConn. - listenerMutex *sync.Mutex // to guard 'listener' field. - listener *httpListener // HTTP listener for all 'Addrs' field. - inShutdown uint32 // indicates whether the server is in shutdown or not - requestCount int32 // counter holds no. of request in progress. + Addrs []string // addresses on which the server listens for new connection. + ShutdownTimeout time.Duration // timeout used for graceful server shutdown. + TCPKeepAliveTimeout time.Duration // timeout used for underneath TCP connection. + UpdateBytesReadFunc func(int) // function to be called to update bytes read in bufConn. + UpdateBytesWrittenFunc func(int) // function to be called to update bytes written in bufConn. + listenerMutex *sync.Mutex // to guard 'listener' field. + listener *httpListener // HTTP listener for all 'Addrs' field. + inShutdown uint32 // indicates whether the server is in shutdown or not + requestCount int32 // counter holds no. of request in progress. } // GetRequestCount - returns number of request in progress. @@ -80,8 +80,6 @@ func (srv *Server) Start() (err error) { addrs := set.CreateStringSet(srv.Addrs...).ToSlice() // copy and remove duplicates tcpKeepAliveTimeout := srv.TCPKeepAliveTimeout - updateBytesReadFunc := srv.UpdateBytesReadFunc - updateBytesWrittenFunc := srv.UpdateBytesWrittenFunc // Create new HTTP listener. var listener *httpListener @@ -92,8 +90,8 @@ func (srv *Server) Start() (err error) { readTimeout, writeTimeout, srv.MaxHeaderBytes, - updateBytesReadFunc, - updateBytesWrittenFunc, + srv.UpdateBytesReadFunc, + srv.UpdateBytesWrittenFunc, ) if err != nil { return err @@ -121,6 +119,9 @@ func (srv *Server) Start() (err error) { srv.listenerMutex.Unlock() // Start servicing with listener. + if tlsConfig != nil { + return srv.Server.Serve(tls.NewListener(listener, tlsConfig)) + } return srv.Server.Serve(listener) } @@ -192,7 +193,7 @@ func NewServer(addrs []string, handler http.Handler, getCert certs.GetCertificat CipherSuites: defaultCipherSuites, CurvePreferences: secureCurves, MinVersion: tls.VersionTLS12, - NextProtos: []string{"http/1.1"}, + NextProtos: []string{"h2", "http/1.1"}, } tlsConfig.GetCertificate = getCert } diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go index 81265bbda..6e6241911 100644 --- a/cmd/object-api-common.go +++ b/cmd/object-api-common.go @@ -100,7 +100,7 @@ func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) { return newPosix(endpoint.Path) } - return newStorageRESTClient(endpoint), nil + return newStorageRESTClient(endpoint) } // Cleanup a directory recursively. diff --git a/cmd/rest/client.go b/cmd/rest/client.go index 16ca2fc40..5afc2d462 100644 --- a/cmd/rest/client.go +++ b/cmd/rest/client.go @@ -28,6 +28,7 @@ import ( "time" xhttp "github.com/minio/minio/cmd/http" + "golang.org/x/net/http2" ) // DefaultRESTTimeout - default RPC timeout is one minute. @@ -95,7 +96,7 @@ func newCustomDialContext(timeout time.Duration) func(ctx context.Context, netwo } // NewClient - returns new REST client. -func NewClient(url *url.URL, tlsConfig *tls.Config, timeout time.Duration, newAuthToken func() string) *Client { +func NewClient(url *url.URL, tlsConfig *tls.Config, timeout time.Duration, newAuthToken func() string) (*Client, error) { // Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper // except custom DialContext and TLSClientConfig. tr := &http.Transport{ @@ -103,17 +104,22 @@ func NewClient(url *url.URL, tlsConfig *tls.Config, timeout time.Duration, newAu DialContext: newCustomDialContext(timeout), MaxIdleConnsPerHost: 4096, MaxIdleConns: 4096, - IdleConnTimeout: 90 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, + IdleConnTimeout: 120 * time.Second, + TLSHandshakeTimeout: 30 * time.Second, + ExpectContinueTimeout: 10 * time.Second, TLSClientConfig: tlsConfig, DisableCompression: true, } - + if tlsConfig != nil { + // If TLS is enabled configure http2 + if err := http2.ConfigureTransport(tr); err != nil { + return nil, err + } + } return &Client{ httpClient: &http.Client{Transport: tr}, httpIdleConnsCloser: tr.CloseIdleConnections, url: url, newAuthToken: newAuthToken, - } + }, nil } diff --git a/cmd/rpc.go b/cmd/rpc.go index d8bd9dc34..79bfe8940 100644 --- a/cmd/rpc.go +++ b/cmd/rpc.go @@ -268,9 +268,14 @@ func NewRPCClient(args RPCClientArgs) (*RPCClient, error) { return nil, err } + rpcClient, err := xrpc.NewClient(args.ServiceURL, args.TLSConfig, defaultRPCTimeout) + if err != nil { + return nil, err + } + return &RPCClient{ args: args, authToken: args.NewAuthTokenFunc(), - rpcClient: xrpc.NewClient(args.ServiceURL, args.TLSConfig, defaultRPCTimeout), + rpcClient: rpcClient, }, nil } diff --git a/cmd/rpc/client.go b/cmd/rpc/client.go index a7a4101c8..cc61d21b3 100644 --- a/cmd/rpc/client.go +++ b/cmd/rpc/client.go @@ -29,6 +29,7 @@ import ( xhttp "github.com/minio/minio/cmd/http" xnet "github.com/minio/minio/pkg/net" + "golang.org/x/net/http2" ) // DefaultRPCTimeout - default RPC timeout is one minute. @@ -36,8 +37,9 @@ const DefaultRPCTimeout = 1 * time.Minute // Client - http based RPC client. type Client struct { - httpClient *http.Client - serviceURL *xnet.URL + httpClient *http.Client + httpIdleConnsCloser func() + serviceURL *xnet.URL } // Call - calls service method on RPC server. @@ -87,8 +89,11 @@ func (client *Client) Call(serviceMethod string, args, reply interface{}) error return gobDecode(callResponse.ReplyBytes, reply) } -// Close - does nothing and presents for interface compatibility. +// Close closes all idle connections of the underlying http client func (client *Client) Close() error { + if client.httpIdleConnsCloser != nil { + client.httpIdleConnsCloser() + } return nil } @@ -110,23 +115,29 @@ func newCustomDialContext(timeout time.Duration) func(ctx context.Context, netwo } // NewClient - returns new RPC client. -func NewClient(serviceURL *xnet.URL, tlsConfig *tls.Config, timeout time.Duration) *Client { - return &Client{ - httpClient: &http.Client{ - // Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper - // except custom DialContext and TLSClientConfig. - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: newCustomDialContext(timeout), - MaxIdleConnsPerHost: 4096, - MaxIdleConns: 4096, - IdleConnTimeout: 90 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - TLSClientConfig: tlsConfig, - DisableCompression: true, - }, - }, - serviceURL: serviceURL, +func NewClient(serviceURL *xnet.URL, tlsConfig *tls.Config, timeout time.Duration) (*Client, error) { + // Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper + // except custom DialContext and TLSClientConfig. + tr := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: newCustomDialContext(timeout), + MaxIdleConnsPerHost: 4096, + MaxIdleConns: 4096, + IdleConnTimeout: 120 * time.Second, + TLSHandshakeTimeout: 30 * time.Second, + ExpectContinueTimeout: 10 * time.Second, + TLSClientConfig: tlsConfig, + DisableCompression: true, + } + if tlsConfig != nil { + // If TLS is enabled configure http2 + if err := http2.ConfigureTransport(tr); err != nil { + return nil, err + } } + return &Client{ + httpClient: &http.Client{Transport: tr}, + httpIdleConnsCloser: tr.CloseIdleConnections, + serviceURL: serviceURL, + }, nil } diff --git a/cmd/rpc/client_test.go b/cmd/rpc/client_test.go index 2cd069d37..234eab68e 100644 --- a/cmd/rpc/client_test.go +++ b/cmd/rpc/client_test.go @@ -39,7 +39,10 @@ func TestClientCall(t *testing.T) { if err != nil { t.Fatalf("unexpected error %v", err) } - rpcClient := NewClient(url, nil, DefaultRPCTimeout) + rpcClient, err := NewClient(url, nil, DefaultRPCTimeout) + if err != nil { + t.Fatalf("NewClient initialization error %v", err) + } var reply int var boolReply bool diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 82c3e632b..c5c436164 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -34,7 +34,6 @@ import ( "strings" "github.com/minio/minio/cmd/http" - "github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/rest" xnet "github.com/minio/minio/pkg/net" ) @@ -381,9 +380,11 @@ func (client *storageRESTClient) Close() error { } // Returns a storage rest client. -func newStorageRESTClient(endpoint Endpoint) *storageRESTClient { +func newStorageRESTClient(endpoint Endpoint) (*storageRESTClient, error) { host, err := xnet.ParseHost(endpoint.Host) - logger.FatalIf(err, "Unable to parse storage Host") + if err != nil { + return nil, err + } scheme := "http" if globalIsSSL { @@ -404,8 +405,11 @@ func newStorageRESTClient(endpoint Endpoint) *storageRESTClient { } } - restClient := rest.NewClient(serverURL, tlsConfig, storageRESTTimeout, newAuthToken) + restClient, err := rest.NewClient(serverURL, tlsConfig, storageRESTTimeout, newAuthToken) + if err != nil { + return nil, err + } client := &storageRESTClient{endpoint: endpoint, restClient: restClient, connected: true} client.connected = client.getInstanceID() == nil - return client + return client, nil } diff --git a/cmd/storage-rest_test.go b/cmd/storage-rest_test.go index 5acf44352..e014c8bf7 100644 --- a/cmd/storage-rest_test.go +++ b/cmd/storage-rest_test.go @@ -509,7 +509,10 @@ func newStorageRESTHTTPServerClient(t *testing.T) (*httptest.Server, *storageRES } registerStorageRESTHandlers(router, EndpointList{endpoint}) - restClient := newStorageRESTClient(endpoint) + restClient, err := newStorageRESTClient(endpoint) + if err != nil { + t.Fatalf("newStorageRESTClient failed for %v, with error %s", endpoint, err) + } prevGlobalServerConfig := globalServerConfig globalServerConfig = newServerConfig()