Tune tcp keep-alives with new kernel timeout options (#9963)

For more deeper understanding https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/
master
Harshavardhana 4 years ago committed by GitHub
parent 21a37e3393
commit e59ee14f40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 48
      cmd/http/dial_linux.go
  2. 11
      cmd/http/dial_others.go
  3. 24
      cmd/http/listener.go
  4. 4
      cmd/http/listener_test.go
  5. 23
      cmd/http/server.go
  6. 4
      cmd/http/server_test.go
  7. 2
      cmd/update.go
  8. 2
      cmd/utils.go

@ -23,29 +23,51 @@ import (
"net" "net"
"syscall" "syscall"
"time" "time"
)
const ( "golang.org/x/sys/unix"
// TCPFastOpenConnect sets the underlying socket to use
// the TCP fast open connect. This feature is supported
// since Linux 4.11.
TCPFastOpenConnect = 30
) )
func setTCPParameters(c syscall.RawConn) error {
return c.Control(func(fdPtr uintptr) {
// got socket file descriptor to set parameters.
fd := int(fdPtr)
// Enable TCP fast connect
// TCPFastOpenConnect sets the underlying socket to use
// the TCP fast open connect. This feature is supported
// since Linux 4.11.
_ = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, unix.TCP_FASTOPEN_CONNECT, 1)
// The time (in seconds) the connection needs to remain idle before
// TCP starts sending keepalive probes, set this to 5 secs
// system defaults to 7200 secs!!!
_ = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, 5)
// Number of probes.
// ~ cat /proc/sys/net/ipv4/tcp_keepalive_probes (defaults to 9, we reduce it to 5)
// 9
_ = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, syscall.TCP_KEEPCNT, 5)
// Wait time after successful probe in seconds.
// ~ cat /proc/sys/net/ipv4/tcp_keepalive_intvl (defaults to 75 secs, we reduce it to 2 secs)
// 75
_ = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, 2)
// Set TCP_USER_TIMEOUT to TCP_KEEPIDLE + TCP_KEEPINTVL * TCP_KEEPCNT.
_ = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, unix.TCP_USER_TIMEOUT, 15)
})
}
// DialContext is a function to make custom Dial for internode communications // DialContext is a function to make custom Dial for internode communications
type DialContext func(ctx context.Context, network, address string) (net.Conn, error) type DialContext func(ctx context.Context, network, address string) (net.Conn, error)
// NewCustomDialContext setups a custom dialer for internode communications // NewCustomDialContext setups a custom dialer for internode communications
func NewCustomDialContext(dialTimeout, dialKeepAlive time.Duration) DialContext { func NewCustomDialContext(dialTimeout time.Duration) DialContext {
return func(ctx context.Context, network, addr string) (net.Conn, error) { return func(ctx context.Context, network, addr string) (net.Conn, error) {
dialer := &net.Dialer{ dialer := &net.Dialer{
Timeout: dialTimeout, Timeout: dialTimeout,
KeepAlive: dialKeepAlive,
Control: func(network, address string, c syscall.RawConn) error { Control: func(network, address string, c syscall.RawConn) error {
return c.Control(func(fd uintptr) { return setTCPParameters(c)
// Enable TCP fast connect
_ = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, TCPFastOpenConnect, 1)
})
}, },
} }
return dialer.DialContext(ctx, network, addr) return dialer.DialContext(ctx, network, addr)

@ -21,18 +21,23 @@ package http
import ( import (
"context" "context"
"net" "net"
"syscall"
"time" "time"
) )
// TODO: if possible implement for non-linux platforms, not a priority at the moment
func setTCPParameters(c syscall.RawConn) error {
return nil
}
// DialContext is a function to make custom Dial for internode communications // DialContext is a function to make custom Dial for internode communications
type DialContext func(ctx context.Context, network, address string) (net.Conn, error) type DialContext func(ctx context.Context, network, address string) (net.Conn, error)
// NewCustomDialContext configures a custom dialer for internode communications // NewCustomDialContext configures a custom dialer for internode communications
func NewCustomDialContext(dialTimeout, dialKeepAlive time.Duration) DialContext { func NewCustomDialContext(dialTimeout time.Duration) DialContext {
return func(ctx context.Context, network, addr string) (net.Conn, error) { return func(ctx context.Context, network, addr string) (net.Conn, error) {
dialer := &net.Dialer{ dialer := &net.Dialer{
Timeout: dialTimeout, Timeout: dialTimeout,
KeepAlive: dialKeepAlive,
} }
return dialer.DialContext(ctx, network, addr) return dialer.DialContext(ctx, network, addr)
} }

@ -23,7 +23,6 @@ import (
"os" "os"
"sync" "sync"
"syscall" "syscall"
"time"
) )
type acceptResult struct { type acceptResult struct {
@ -33,11 +32,10 @@ type acceptResult struct {
// httpListener - HTTP listener capable of handling multiple server addresses. // httpListener - HTTP listener capable of handling multiple server addresses.
type httpListener struct { type httpListener struct {
mutex sync.Mutex // to guard Close() method. mutex sync.Mutex // to guard Close() method.
tcpListeners []*net.TCPListener // underlaying TCP listeners. tcpListeners []*net.TCPListener // underlaying TCP listeners.
acceptCh chan acceptResult // channel where all TCP listeners write accepted connection. acceptCh chan acceptResult // channel where all TCP listeners write accepted connection.
doneCh chan struct{} // done channel for TCP listener goroutines. doneCh chan struct{} // done channel for TCP listener goroutines.
tcpKeepAliveTimeout time.Duration
} }
// isRoutineNetErr returns true if error is due to a network timeout, // isRoutineNetErr returns true if error is due to a network timeout,
@ -83,10 +81,10 @@ func (listener *httpListener) start() {
// Closure to handle single connection. // Closure to handle single connection.
handleConn := func(tcpConn *net.TCPConn, doneCh <-chan struct{}) { handleConn := func(tcpConn *net.TCPConn, doneCh <-chan struct{}) {
// Tune accepted TCP connection. rawConn, err := tcpConn.SyscallConn()
tcpConn.SetKeepAlive(true) if err == nil {
tcpConn.SetKeepAlivePeriod(listener.tcpKeepAliveTimeout) setTCPParameters(rawConn)
}
send(acceptResult{tcpConn, nil}, doneCh) send(acceptResult{tcpConn, nil}, doneCh)
} }
@ -167,8 +165,7 @@ func (listener *httpListener) Addrs() (addrs []net.Addr) {
// httpListener is capable to // httpListener is capable to
// * listen to multiple addresses // * listen to multiple addresses
// * controls incoming connections only doing HTTP protocol // * controls incoming connections only doing HTTP protocol
func newHTTPListener(serverAddrs []string, func newHTTPListener(serverAddrs []string) (listener *httpListener, err error) {
tcpKeepAliveTimeout time.Duration) (listener *httpListener, err error) {
var tcpListeners []*net.TCPListener var tcpListeners []*net.TCPListener
@ -201,8 +198,7 @@ func newHTTPListener(serverAddrs []string,
} }
listener = &httpListener{ listener = &httpListener{
tcpListeners: tcpListeners, tcpListeners: tcpListeners,
tcpKeepAliveTimeout: tcpKeepAliveTimeout,
} }
listener.start() listener.start()

@ -151,7 +151,6 @@ func TestNewHTTPListener(t *testing.T) {
for _, testCase := range testCases { for _, testCase := range testCases {
listener, err := newHTTPListener( listener, err := newHTTPListener(
testCase.serverAddrs, testCase.serverAddrs,
testCase.tcpKeepAliveTimeout,
) )
if !testCase.expectedErr { if !testCase.expectedErr {
@ -185,7 +184,6 @@ func TestHTTPListenerStartClose(t *testing.T) {
for i, testCase := range testCases { for i, testCase := range testCases {
listener, err := newHTTPListener( listener, err := newHTTPListener(
testCase.serverAddrs, testCase.serverAddrs,
time.Duration(0),
) )
if err != nil { if err != nil {
t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err) t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
@ -225,7 +223,6 @@ func TestHTTPListenerAddr(t *testing.T) {
for i, testCase := range testCases { for i, testCase := range testCases {
listener, err := newHTTPListener( listener, err := newHTTPListener(
testCase.serverAddrs, testCase.serverAddrs,
time.Duration(0),
) )
if err != nil { if err != nil {
t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err) t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)
@ -262,7 +259,6 @@ func TestHTTPListenerAddrs(t *testing.T) {
for i, testCase := range testCases { for i, testCase := range testCases {
listener, err := newHTTPListener( listener, err := newHTTPListener(
testCase.serverAddrs, testCase.serverAddrs,
time.Duration(0),
) )
if err != nil { if err != nil {
t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err) t.Fatalf("Test %d: error: expected = <nil>, got = %v", i+1, err)

@ -38,9 +38,6 @@ const (
// DefaultShutdownTimeout - default shutdown timeout used for graceful http server shutdown. // DefaultShutdownTimeout - default shutdown timeout used for graceful http server shutdown.
DefaultShutdownTimeout = 5 * time.Second DefaultShutdownTimeout = 5 * time.Second
// DefaultTCPKeepAliveTimeout - default TCP keep alive timeout for accepted connection.
DefaultTCPKeepAliveTimeout = 30 * time.Second
// DefaultMaxHeaderBytes - default maximum HTTP header size in bytes. // DefaultMaxHeaderBytes - default maximum HTTP header size in bytes.
DefaultMaxHeaderBytes = 1 * humanize.MiByte DefaultMaxHeaderBytes = 1 * humanize.MiByte
) )
@ -48,13 +45,12 @@ const (
// Server - extended http.Server supports multiple addresses to serve and enhanced connection handling. // Server - extended http.Server supports multiple addresses to serve and enhanced connection handling.
type Server struct { type Server struct {
http.Server http.Server
Addrs []string // addresses on which the server listens for new connection. Addrs []string // addresses on which the server listens for new connection.
ShutdownTimeout time.Duration // timeout used for graceful server shutdown. ShutdownTimeout time.Duration // timeout used for graceful server shutdown.
TCPKeepAliveTimeout time.Duration // timeout used for underneath TCP connection. listenerMutex sync.Mutex // to guard 'listener' field.
listenerMutex sync.Mutex // to guard 'listener' field. listener *httpListener // HTTP listener for all 'Addrs' field.
listener *httpListener // HTTP listener for all 'Addrs' field. inShutdown uint32 // indicates whether the server is in shutdown or not
inShutdown uint32 // indicates whether the server is in shutdown or not requestCount int32 // counter holds no. of request in progress.
requestCount int32 // counter holds no. of request in progress.
} }
// GetRequestCount - returns number of request in progress. // GetRequestCount - returns number of request in progress.
@ -72,13 +68,11 @@ func (srv *Server) Start() (err error) {
handler := srv.Handler // if srv.Handler holds non-synced state -> possible data race handler := srv.Handler // if srv.Handler holds non-synced state -> possible data race
addrs := set.CreateStringSet(srv.Addrs...).ToSlice() // copy and remove duplicates addrs := set.CreateStringSet(srv.Addrs...).ToSlice() // copy and remove duplicates
tcpKeepAliveTimeout := srv.TCPKeepAliveTimeout
// Create new HTTP listener. // Create new HTTP listener.
var listener *httpListener var listener *httpListener
listener, err = newHTTPListener( listener, err = newHTTPListener(
addrs, addrs,
tcpKeepAliveTimeout,
) )
if err != nil { if err != nil {
return err return err
@ -204,9 +198,8 @@ func NewServer(addrs []string, handler http.Handler, getCert certs.GetCertificat
} }
httpServer := &Server{ httpServer := &Server{
Addrs: addrs, Addrs: addrs,
ShutdownTimeout: DefaultShutdownTimeout, ShutdownTimeout: DefaultShutdownTimeout,
TCPKeepAliveTimeout: DefaultTCPKeepAliveTimeout,
} }
httpServer.Handler = handler httpServer.Handler = handler
httpServer.TLSConfig = tlsConfig httpServer.TLSConfig = tlsConfig

@ -73,10 +73,6 @@ func TestNewServer(t *testing.T) {
t.Fatalf("Case %v: server.ShutdownTimeout: expected: %v, got: %v", (i + 1), DefaultShutdownTimeout, server.ShutdownTimeout) t.Fatalf("Case %v: server.ShutdownTimeout: expected: %v, got: %v", (i + 1), DefaultShutdownTimeout, server.ShutdownTimeout)
} }
if server.TCPKeepAliveTimeout != DefaultTCPKeepAliveTimeout {
t.Fatalf("Case %v: server.TCPKeepAliveTimeout: expected: %v, got: %v", (i + 1), DefaultTCPKeepAliveTimeout, server.TCPKeepAliveTimeout)
}
if server.MaxHeaderBytes != DefaultMaxHeaderBytes { if server.MaxHeaderBytes != DefaultMaxHeaderBytes {
t.Fatalf("Case %v: server.MaxHeaderBytes: expected: %v, got: %v", (i + 1), DefaultMaxHeaderBytes, server.MaxHeaderBytes) t.Fatalf("Case %v: server.MaxHeaderBytes: expected: %v, got: %v", (i + 1), DefaultMaxHeaderBytes, server.MaxHeaderBytes)
} }

@ -409,7 +409,7 @@ const updateTimeout = 10 * time.Second
func getUpdateTransport(timeout time.Duration) http.RoundTripper { func getUpdateTransport(timeout time.Duration) http.RoundTripper {
var updateTransport http.RoundTripper = &http.Transport{ var updateTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
DialContext: xhttp.NewCustomDialContext(timeout, timeout), DialContext: xhttp.NewCustomDialContext(timeout),
IdleConnTimeout: timeout, IdleConnTimeout: timeout,
TLSHandshakeTimeout: timeout, TLSHandshakeTimeout: timeout,
ExpectContinueTimeout: timeout, ExpectContinueTimeout: timeout,

@ -454,7 +454,7 @@ func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration) fu
// https://golang.org/pkg/net/http/#Transport documentation // https://golang.org/pkg/net/http/#Transport documentation
tr := &http.Transport{ tr := &http.Transport{
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
DialContext: xhttp.NewCustomDialContext(dialTimeout, 15*time.Second), DialContext: xhttp.NewCustomDialContext(dialTimeout),
MaxIdleConnsPerHost: 16, MaxIdleConnsPerHost: 16,
MaxIdleConns: 16, MaxIdleConns: 16,
IdleConnTimeout: 1 * time.Minute, IdleConnTimeout: 1 * time.Minute,

Loading…
Cancel
Save