Add RPC counters for HTTP stats. (#6206)

This patch introduces separate counters for HTTP stats for minio
reserved bucket.

Fixes #6158
master
Bala FA 6 years ago committed by Nitish Tiwari
parent 5399d91965
commit 72fa2b4537
  1. 33
      cmd/http-stats.go
  2. 24
      cmd/http/bufconn.go
  3. 2
      cmd/http/bufconn_test.go
  4. 108
      cmd/http/listener.go
  5. 61
      cmd/http/listener_test.go
  6. 5
      cmd/http/server.go

@ -19,6 +19,7 @@ package cmd
import (
"fmt"
"net/http"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
@ -26,23 +27,51 @@ import (
"go.uber.org/atomic"
)
func getRequestResource(r *http.Request) string {
if r == nil {
// http.Request is nil when non-HTTP data (like TLS record) is read/written.
return ""
}
if globalDomainName != "" {
host := r.Header.Get("Host")
if strings.HasSuffix(host, "."+globalDomainName) {
return "/" + strings.TrimSuffix(host, "."+globalDomainName) + r.URL.Path
}
}
return r.URL.Path
}
// ConnStats - Network statistics
// Count total input/output transferred bytes during
// the server's life.
type ConnStats struct {
totalInputBytes atomic.Uint64
totalOutputBytes atomic.Uint64
totalRPCInputBytes atomic.Uint64
totalRPCOutputBytes atomic.Uint64
}
// Increase total input bytes
func (s *ConnStats) incInputBytes(n int) {
func (s *ConnStats) incInputBytes(r *http.Request, n int) {
resource := getRequestResource(r)
if resource == minioReservedBucketPath || strings.HasPrefix(resource, minioReservedBucketPath+"/") {
s.totalRPCInputBytes.Add(uint64(n))
} else {
s.totalInputBytes.Add(uint64(n))
}
}
// Increase total output bytes
func (s *ConnStats) incOutputBytes(n int) {
func (s *ConnStats) incOutputBytes(r *http.Request, n int) {
resource := getRequestResource(r)
if resource == minioReservedBucketPath || strings.HasPrefix(resource, minioReservedBucketPath+"/") {
s.totalRPCOutputBytes.Add(uint64(n))
} else {
s.totalOutputBytes.Add(uint64(n))
}
}
// Return total input bytes
func (s *ConnStats) getTotalInputBytes() uint64 {

@ -19,6 +19,7 @@ package http
import (
"bufio"
"net"
"net/http"
"time"
)
@ -28,8 +29,18 @@ type BufConn struct {
bufReader *bufio.Reader // buffered reader wraps reader in net.Conn.
readTimeout time.Duration // sets the read timeout in the connection.
writeTimeout time.Duration // sets the write timeout in the connection.
updateBytesReadFunc func(int) // function to be called to update bytes read.
updateBytesWrittenFunc func(int) // function to be called to update bytes written.
request *http.Request // HTTP request information.
updateBytesReadFunc func(*http.Request, int) // function to be called to update bytes read.
updateBytesWrittenFunc func(*http.Request, int) // function to be called to update bytes written.
}
func (c *BufConn) setRequest(request *http.Request) {
c.request = request
}
func (c *BufConn) setUpdateFuncs(updateBytesReadFunc, updateBytesWrittenFunc func(*http.Request, int)) {
c.updateBytesReadFunc = updateBytesReadFunc
c.updateBytesWrittenFunc = updateBytesWrittenFunc
}
// Sets read timeout
@ -70,7 +81,7 @@ func (c *BufConn) Read(b []byte) (n int, err error) {
c.setReadTimeout()
n, err = c.bufReader.Read(b)
if err == nil && c.updateBytesReadFunc != nil {
c.updateBytesReadFunc(n)
c.updateBytesReadFunc(c.request, n)
}
return n, err
@ -81,21 +92,18 @@ func (c *BufConn) Write(b []byte) (n int, err error) {
c.setWriteTimeout()
n, err = c.Conn.Write(b)
if err == nil && c.updateBytesWrittenFunc != nil {
c.updateBytesWrittenFunc(n)
c.updateBytesWrittenFunc(c.request, n)
}
return n, err
}
// newBufConn - creates a new connection object wrapping net.Conn.
func newBufConn(c net.Conn, readTimeout, writeTimeout time.Duration,
updateBytesReadFunc, updateBytesWrittenFunc func(int)) *BufConn {
func newBufConn(c net.Conn, readTimeout, writeTimeout time.Duration) *BufConn {
return &BufConn{
QuirkConn: QuirkConn{Conn: c},
bufReader: bufio.NewReader(c),
readTimeout: readTimeout,
writeTimeout: writeTimeout,
updateBytesReadFunc: updateBytesReadFunc,
updateBytesWrittenFunc: updateBytesWrittenFunc,
}
}

@ -48,7 +48,7 @@ func TestBuffConnReadTimeout(t *testing.T) {
if terr != nil {
t.Fatalf("failed to accept new connection. %v", terr)
}
bufconn := newBufConn(tcpConn, 1*time.Second, 1*time.Second, nil, nil)
bufconn := newBufConn(tcpConn, 1*time.Second, 1*time.Second)
defer bufconn.Close()
// Read a line

@ -23,6 +23,7 @@ import (
"io"
"net"
"net/http"
"net/url"
"os"
"strings"
"sync"
@ -72,6 +73,41 @@ func isHTTPMethod(s string) bool {
return false
}
func getResourceHost(bufConn *BufConn, maxHeaderBytes, methodLen int) (resource string, host string, err error) {
var data []byte
for dataLen := 0; methodLen+dataLen < maxHeaderBytes; dataLen += 8 {
if data, err = bufConn.Peek(methodLen + dataLen); err != nil {
return "", "", err
}
tokens := strings.Split(string(data), "\n")
if len(tokens) < 2 {
continue
}
if resource == "" {
resource = strings.SplitN(tokens[0][methodLen:], " ", 2)[0]
}
for _, token := range tokens[1:] {
if token == "" || !strings.HasSuffix(token, "\r") {
continue
}
if strings.HasPrefix(token, "Host: ") {
host = strings.TrimPrefix(strings.TrimSuffix(token, "\r"), "Host: ")
return resource, host, nil
}
}
if tokens[len(tokens)-1] == "\r" {
break
}
}
return resource, host, nil
}
type acceptResult struct {
conn net.Conn
err error
@ -87,8 +123,9 @@ type httpListener struct {
tcpKeepAliveTimeout time.Duration
readTimeout time.Duration
writeTimeout time.Duration
updateBytesReadFunc func(int) // function to be called to update bytes read in BufConn.
updateBytesWrittenFunc func(int) // function to be called to update bytes written in BufConn.
maxHeaderBytes int
updateBytesReadFunc func(*http.Request, int) // function to be called to update bytes read in BufConn.
updateBytesWrittenFunc func(*http.Request, int) // function to be called to update bytes written in BufConn.
}
// isRoutineNetErr returns true if error is due to a network timeout,
@ -134,8 +171,7 @@ func (listener *httpListener) start() {
tcpConn.SetKeepAlive(true)
tcpConn.SetKeepAlivePeriod(listener.tcpKeepAliveTimeout)
bufconn := newBufConn(tcpConn, listener.readTimeout, listener.writeTimeout,
listener.updateBytesReadFunc, listener.updateBytesWrittenFunc)
bufconn := newBufConn(tcpConn, listener.readTimeout, listener.writeTimeout)
// Peek bytes of maximum length of all HTTP methods.
data, err := bufconn.Peek(methodMaxLen)
@ -158,14 +194,39 @@ func (listener *httpListener) start() {
// Return bufconn if read data is a valid HTTP method.
tokens := strings.SplitN(string(data), " ", 2)
if isHTTPMethod(tokens[0]) {
if listener.tlsConfig == nil {
send(acceptResult{bufconn, nil}, doneCh)
} else {
if listener.tlsConfig != nil {
// As TLS is configured and we got plain text HTTP request,
// return 403 (forbidden) error.
bufconn.Write(sslRequiredErrMsg)
bufconn.Close()
return
}
var resource, host string
if resource, host, err = getResourceHost(bufconn, listener.maxHeaderBytes, len(tokens[0])+1); err != nil {
if !isRoutineNetErr(err) {
reqInfo := (&logger.ReqInfo{}).AppendTags("remoteAddr", bufconn.RemoteAddr().String())
reqInfo.AppendTags("localAddr", bufconn.LocalAddr().String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
}
bufconn.Close()
return
}
header := make(http.Header)
if host != "" {
header.Add("Host", host)
}
bufconn.setRequest(&http.Request{
Method: tokens[0],
URL: &url.URL{Path: resource},
Host: bufconn.LocalAddr().String(),
Header: header,
})
bufconn.setUpdateFuncs(listener.updateBytesReadFunc, listener.updateBytesWrittenFunc)
send(acceptResult{bufconn, nil}, doneCh)
return
}
@ -182,8 +243,7 @@ func (listener *httpListener) start() {
}
// Check whether the connection contains HTTP request or not.
bufconn = newBufConn(tlsConn, listener.readTimeout, listener.writeTimeout,
listener.updateBytesReadFunc, listener.updateBytesWrittenFunc)
bufconn = newBufConn(tlsConn, listener.readTimeout, listener.writeTimeout)
// Peek bytes of maximum length of all HTTP methods.
data, err = bufconn.Peek(methodMaxLen)
@ -201,6 +261,30 @@ func (listener *httpListener) start() {
// Return bufconn if read data is a valid HTTP method.
tokens := strings.SplitN(string(data), " ", 2)
if isHTTPMethod(tokens[0]) {
var resource, host string
if resource, host, err = getResourceHost(bufconn, listener.maxHeaderBytes, len(tokens[0])+1); err != nil {
if !isRoutineNetErr(err) {
reqInfo := (&logger.ReqInfo{}).AppendTags("remoteAddr", bufconn.RemoteAddr().String())
reqInfo.AppendTags("localAddr", bufconn.LocalAddr().String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
}
bufconn.Close()
return
}
header := make(http.Header)
if host != "" {
header.Add("Host", host)
}
bufconn.setRequest(&http.Request{
Method: tokens[0],
URL: &url.URL{Path: resource},
Host: bufconn.LocalAddr().String(),
Header: header,
})
bufconn.setUpdateFuncs(listener.updateBytesReadFunc, listener.updateBytesWrittenFunc)
send(acceptResult{bufconn, nil}, doneCh)
return
}
@ -296,8 +380,9 @@ func newHTTPListener(serverAddrs []string,
tcpKeepAliveTimeout time.Duration,
readTimeout time.Duration,
writeTimeout time.Duration,
updateBytesReadFunc func(int),
updateBytesWrittenFunc func(int)) (listener *httpListener, err error) {
maxHeaderBytes int,
updateBytesReadFunc func(*http.Request, int),
updateBytesWrittenFunc func(*http.Request, int)) (listener *httpListener, err error) {
var tcpListeners []*net.TCPListener
@ -333,6 +418,7 @@ func newHTTPListener(serverAddrs []string,
tcpKeepAliveTimeout: tcpKeepAliveTimeout,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
maxHeaderBytes: maxHeaderBytes,
updateBytesReadFunc: updateBytesReadFunc,
updateBytesWrittenFunc: updateBytesWrittenFunc,
}

@ -19,12 +19,12 @@ package http
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"net/http"
"runtime"
"strconv"
"strings"
@ -212,7 +212,7 @@ func TestNewHTTPListener(t *testing.T) {
}
remoteUnknownErr := "lookup unknown-host" + errMsg
if runtime.GOOS == "wpindows" {
if runtime.GOOS == "windows" {
remoteUnknownErr = "listen tcp: lookup unknown-host" + errMsg
}
@ -224,19 +224,18 @@ func TestNewHTTPListener(t *testing.T) {
tcpKeepAliveTimeout time.Duration
readTimeout time.Duration
writeTimeout time.Duration
updateBytesReadFunc func(int)
updateBytesWrittenFunc func(int)
errorLogFunc func(context.Context, error)
updateBytesReadFunc func(*http.Request, int)
updateBytesWrittenFunc func(*http.Request, int)
expectedErr error
}{
{[]string{"93.184.216.34:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, errors.New(remoteAddrErrMsgIP)},
{[]string{"example.org:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, errors.New(remoteAddrErrMsgHost)},
{[]string{"unknown-host"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, errors.New(remoteMissingErr)},
{[]string{"unknown-host:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, errors.New(remoteUnknownErr)},
{[]string{"localhost:65432", "93.184.216.34:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, errors.New(remoteAddrErrMsgIP)},
{[]string{"localhost:65432", "unknown-host:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, errors.New(remoteUnknownErr)},
{[]string{"localhost:0"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, nil},
{[]string{"localhost:0"}, tlsConfig, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil, nil},
{[]string{"93.184.216.34:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, errors.New(remoteAddrErrMsgIP)},
{[]string{"example.org:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, errors.New(remoteAddrErrMsgHost)},
{[]string{"unknown-host"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, errors.New(remoteMissingErr)},
{[]string{"unknown-host:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, errors.New(remoteUnknownErr)},
{[]string{"localhost:65432", "93.184.216.34:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, errors.New(remoteAddrErrMsgIP)},
{[]string{"localhost:65432", "unknown-host:65432"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, errors.New(remoteUnknownErr)},
{[]string{"localhost:0"}, nil, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil},
{[]string{"localhost:0"}, tlsConfig, time.Duration(0), time.Duration(0), time.Duration(0), nil, nil, nil},
}
for _, testCase := range testCases {
@ -246,6 +245,7 @@ func TestNewHTTPListener(t *testing.T) {
testCase.tcpKeepAliveTimeout,
testCase.readTimeout,
testCase.writeTimeout,
DefaultMaxHeaderBytes,
testCase.updateBytesReadFunc,
testCase.updateBytesWrittenFunc,
)
@ -297,6 +297,7 @@ func TestHTTPListenerStartClose(t *testing.T) {
time.Duration(0),
time.Duration(0),
time.Duration(0),
DefaultMaxHeaderBytes,
nil,
nil,
)
@ -344,6 +345,7 @@ func TestHTTPListenerAddr(t *testing.T) {
time.Duration(0),
time.Duration(0),
time.Duration(0),
DefaultMaxHeaderBytes,
nil,
nil,
)
@ -388,6 +390,7 @@ func TestHTTPListenerAddrs(t *testing.T) {
time.Duration(0),
time.Duration(0),
time.Duration(0),
DefaultMaxHeaderBytes,
nil,
nil,
)
@ -418,13 +421,14 @@ func TestHTTPListenerAccept(t *testing.T) {
tlsConfig *tls.Config
request string
reply string
expectedRequestLine string
}{
{[]string{"localhost:0"}, nil, "GET / HTTP/1.0\n", "200 OK\n"},
{[]string{nonLoopBackIP + ":0"}, nil, "POST / HTTP/1.0\n", "200 OK\n"},
{[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, nil, "CONNECT \n", "200 OK\n"},
{[]string{"localhost:0"}, tlsConfig, "GET / HTTP/1.0\n", "200 OK\n"},
{[]string{nonLoopBackIP + ":0"}, tlsConfig, "POST / HTTP/1.0\n", "200 OK\n"},
{[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, tlsConfig, "CONNECT \n", "200 OK\n"},
{[]string{"localhost:0"}, nil, "GET / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "GET / HTTP/1.0\r\n"},
{[]string{nonLoopBackIP + ":0"}, nil, "POST / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "POST / HTTP/1.0\r\n"},
{[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, nil, "CONNECT \r\nHost: www.example.org\r\n\r\n", "200 OK\r\n", "CONNECT \r\n"},
{[]string{"localhost:0"}, tlsConfig, "GET / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "GET / HTTP/1.0\r\n"},
{[]string{nonLoopBackIP + ":0"}, tlsConfig, "POST / HTTP/1.0\r\nHost: example.org\r\n\r\n", "200 OK\r\n", "POST / HTTP/1.0\r\n"},
{[]string{"127.0.0.1:0", nonLoopBackIP + ":0"}, tlsConfig, "CONNECT \r\nHost: www.example.org\r\n\r\n", "200 OK\r\n", "CONNECT \r\n"},
}
for i, testCase := range testCases {
@ -434,6 +438,7 @@ func TestHTTPListenerAccept(t *testing.T) {
time.Duration(0),
time.Duration(0),
time.Duration(0),
DefaultMaxHeaderBytes,
nil,
nil,
)
@ -463,13 +468,13 @@ func TestHTTPListenerAccept(t *testing.T) {
t.Fatalf("Test %d: accept: expected = <nil>, got = %v", i+1, err)
}
request, err := bufio.NewReader(serverConn).ReadString('\n')
requestLine, err := bufio.NewReader(serverConn).ReadString('\n')
if err != nil {
t.Fatalf("Test %d: request read: expected = <nil>, got = %v", i+1, err)
}
if testCase.request != request {
t.Fatalf("Test %d: request: expected = %v, got = %v", i+1, testCase.request, request)
if requestLine != testCase.expectedRequestLine {
t.Fatalf("Test %d: request: expected = %v, got = %v", i+1, testCase.expectedRequestLine, requestLine)
}
if _, err = io.WriteString(serverConn, testCase.reply); err != nil {
@ -513,6 +518,7 @@ func TestHTTPListenerAcceptPeekError(t *testing.T) {
time.Duration(0),
time.Duration(0),
time.Duration(0),
DefaultMaxHeaderBytes,
nil,
nil,
)
@ -566,6 +572,7 @@ func TestHTTPListenerAcceptTLSError(t *testing.T) {
time.Duration(0),
time.Duration(0),
time.Duration(0),
DefaultMaxHeaderBytes,
nil,
nil,
)
@ -632,6 +639,7 @@ func TestHTTPListenerAcceptError(t *testing.T) {
time.Duration(0),
time.Duration(0),
time.Duration(0),
DefaultMaxHeaderBytes,
nil,
nil,
)
@ -757,6 +765,7 @@ func TestHTTPListenerAcceptParallel(t *testing.T) {
time.Duration(0),
time.Duration(0),
time.Duration(0),
DefaultMaxHeaderBytes,
nil,
nil,
)
@ -765,8 +774,8 @@ func TestHTTPListenerAcceptParallel(t *testing.T) {
}
for _, serverAddr := range listener.Addrs() {
go connect(i, serverAddr.String(), testCase.tlsConfig != nil, true, "GET /1 HTTP/1.0\n", testCase.reply)
go connect(i, serverAddr.String(), testCase.tlsConfig != nil, false, "GET /2 HTTP/1.0\n", testCase.reply)
go connect(i, serverAddr.String(), testCase.tlsConfig != nil, true, "GET /1 HTTP/1.0\r\nHost: example.org\r\nr\n", testCase.reply)
go connect(i, serverAddr.String(), testCase.tlsConfig != nil, false, "GET /2 HTTP/1.0\r\nHost: example.org\r\n\r\n", testCase.reply)
var wg sync.WaitGroup
@ -775,14 +784,14 @@ func TestHTTPListenerAcceptParallel(t *testing.T) {
t.Fatalf("Test %d: accept: expected = <nil>, got = %v", i+1, err)
}
wg.Add(1)
go handleConnection(i, &wg, serverConn, "GET /2 HTTP/1.0\n", testCase.reply)
go handleConnection(i, &wg, serverConn, "GET /2 HTTP/1.0\r\n", testCase.reply)
serverConn, err = listener.Accept()
if err != nil {
t.Fatalf("Test %d: accept: expected = <nil>, got = %v", i+1, err)
}
wg.Add(1)
go handleConnection(i, &wg, serverConn, "GET /1 HTTP/1.0\n", testCase.reply)
go handleConnection(i, &wg, serverConn, "GET /1 HTTP/1.0\r\n", testCase.reply)
wg.Wait()
}

@ -54,8 +54,8 @@ type Server struct {
Addrs []string // addresses on which the server listens for new connection.
ShutdownTimeout time.Duration // timeout used for graceful server shutdown.
TCPKeepAliveTimeout time.Duration // timeout used for underneath TCP connection.
UpdateBytesReadFunc func(int) // function to be called to update bytes read in bufConn.
UpdateBytesWrittenFunc func(int) // function to be called to update bytes written in bufConn.
UpdateBytesReadFunc func(*http.Request, int) // function to be called to update bytes read in bufConn.
UpdateBytesWrittenFunc func(*http.Request, int) // function to be called to update bytes written in bufConn.
listenerMutex *sync.Mutex // to guard 'listener' field.
listener *httpListener // HTTP listener for all 'Addrs' field.
inShutdown uint32 // indicates whether the server is in shutdown or not
@ -91,6 +91,7 @@ func (srv *Server) Start() (err error) {
tcpKeepAliveTimeout,
readTimeout,
writeTimeout,
srv.MaxHeaderBytes,
updateBytesReadFunc,
updateBytesWrittenFunc,
)

Loading…
Cancel
Save