Rewrite connection muxer peek process to avoid server blocking by silent clients (#3187)

master
Anis Elleuch 8 years ago committed by Harshavardhana
parent 754c0770d6
commit 5ff30777e1
  1. 65
      cmd/server-mux.go
  2. 7
      cmd/server-mux_test.go

@ -134,11 +134,52 @@ func (c *ConnMux) Close() (err error) {
type ListenerMux struct { type ListenerMux struct {
net.Listener net.Listener
config *tls.Config config *tls.Config
// acceptResCh is a channel for transporting wrapped net.Conn (regular or tls)
// after peeking the content of the latter
acceptResCh chan ListenerMuxAcceptRes
// Cond is used to signal Close when there are no references to the listener. // Cond is used to signal Close when there are no references to the listener.
cond *sync.Cond cond *sync.Cond
refs int refs int
} }
// ListenerMuxAcceptRes contains then final net.Conn data (wrapper by tls or not) to be sent to the http handler
type ListenerMuxAcceptRes struct {
conn net.Conn
err error
}
// newListenerMux listens and wraps accepted connections with tls after protocol peeking
func newListenerMux(listener net.Listener, config *tls.Config) *ListenerMux {
l := ListenerMux{
Listener: listener,
config: config,
cond: sync.NewCond(&sync.Mutex{}),
acceptResCh: make(chan ListenerMuxAcceptRes),
}
// Start listening, wrap connections with tls when needed
go func() {
// Loop for accepting new connections
for {
conn, err := l.Listener.Accept()
if err != nil {
l.acceptResCh <- ListenerMuxAcceptRes{err: err}
return
}
// Wrap the connection with ConnMux to be able to peek the data in the incoming connection
// and decide if we need to wrap the connection itself with a TLS or not
go func(conn net.Conn) {
connMux := NewConnMux(conn)
if connMux.PeekProtocol() == "tls" {
l.acceptResCh <- ListenerMuxAcceptRes{conn: tls.Server(connMux, l.config)}
} else {
l.acceptResCh <- ListenerMuxAcceptRes{conn: connMux}
}
}(conn)
}
}()
return &l
}
// IsClosed - Returns if the underlying listener is closed fully. // IsClosed - Returns if the underlying listener is closed fully.
func (l *ListenerMux) IsClosed() bool { func (l *ListenerMux) IsClosed() bool {
l.cond.L.Lock() l.cond.L.Lock()
@ -187,16 +228,8 @@ func (l *ListenerMux) Accept() (net.Conn, error) {
l.incRef() l.incRef()
defer l.decRef() defer l.decRef()
conn, err := l.Listener.Accept() res := <-l.acceptResCh
if err != nil { return res.conn, res.err
return conn, err
}
connMux := NewConnMux(conn)
protocol := connMux.PeekProtocol()
if protocol == "tls" {
return tls.Server(connMux, l.config), nil
}
return connMux, nil
} }
// ServerMux - the main mux server // ServerMux - the main mux server
@ -247,11 +280,7 @@ func initListeners(serverAddr string, tls *tls.Config) ([]*ListenerMux, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
listeners = append(listeners, &ListenerMux{ listeners = append(listeners, newListenerMux(listener, tls))
Listener: listener,
config: tls,
cond: sync.NewCond(&sync.Mutex{}),
})
return listeners, nil return listeners, nil
} }
var addrs []string var addrs []string
@ -272,11 +301,7 @@ func initListeners(serverAddr string, tls *tls.Config) ([]*ListenerMux, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
listeners = append(listeners, &ListenerMux{ listeners = append(listeners, newListenerMux(listener, tls))
Listener: listener,
config: tls,
cond: sync.NewCond(&sync.Mutex{}),
})
} }
return listeners, nil return listeners, nil
} }

@ -59,11 +59,8 @@ func runTest(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ln = &ListenerMux{
Listener: ln, ln = newListenerMux(ln, &tls.Config{})
config: &tls.Config{},
cond: sync.NewCond(&sync.Mutex{}),
}
addr := ln.Addr().String() addr := ln.Addr().String()
waitForListener := make(chan error) waitForListener := make(chan error)

Loading…
Cancel
Save