From c2c7bdf0cd2b622d1d61dfd83fc925eadbdbb37a Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 6 Jul 2015 18:11:25 -0700 Subject: [PATCH] Cleanup nimble http --- pkg/server/nimble/http.go | 62 ++++++++++++++++++++------------------- pkg/server/nimble/net.go | 41 +++++++++++++------------- 2 files changed, 53 insertions(+), 50 deletions(-) diff --git a/pkg/server/nimble/http.go b/pkg/server/nimble/http.go index d1b72f030..4caa2fabe 100644 --- a/pkg/server/nimble/http.go +++ b/pkg/server/nimble/http.go @@ -13,6 +13,7 @@ import ( "os/signal" "sync" "syscall" + "time" "github.com/facebookgo/httpdown" "github.com/minio/minio/pkg/iodine" @@ -23,25 +24,16 @@ var ( ppid = os.Getppid() ) -// An app contains one or more servers and associated configuration. +// An app contains one or more servers and their associated configuration. type app struct { servers []*http.Server - net *nimbleNet listeners []net.Listener sds []httpdown.Server + net *nimbleNet errors chan error } -func newApp(servers []*http.Server) *app { - return &app{ - servers: servers, - net: &nimbleNet{}, - listeners: make([]net.Listener, 0, len(servers)), - sds: make([]httpdown.Server, 0, len(servers)), - errors: make(chan error, 1+(len(servers)*2)), - } -} - +// listen initailize listeners func (a *app) listen() error { for _, s := range a.servers { l, err := a.net.Listen("tcp", s.Addr) @@ -56,17 +48,22 @@ func (a *app) listen() error { return nil } +// serve start serving all listeners func (a *app) serve() { - h := &httpdown.HTTP{} + h := &httpdown.HTTP{ + StopTimeout: 10 * time.Second, + KillTimeout: 1 * time.Second, + } for i, s := range a.servers { a.sds = append(a.sds, h.Serve(s, a.listeners[i])) } } +// wait for http server to signal all requests that have been served func (a *app) wait() { var wg sync.WaitGroup wg.Add(len(a.sds) * 2) // Wait & Stop - go a.signalHandler(&wg) + go a.trapSignal(&wg) for _, s := range a.sds { go func(s httpdown.Server) { defer wg.Done() @@ -78,28 +75,25 @@ func (a *app) wait() { wg.Wait() } -func (a *app) term(wg *sync.WaitGroup) { - for _, s := range a.sds { - go func(s httpdown.Server) { - defer wg.Done() - if err := s.Stop(); err != nil { - a.errors <- iodine.New(err, nil) - } - }(s) - } -} - -func (a *app) signalHandler(wg *sync.WaitGroup) { +// trapSignal wait on listed signals for pre-defined behaviors +func (a *app) trapSignal(wg *sync.WaitGroup) { ch := make(chan os.Signal, 10) signal.Notify(ch, syscall.SIGTERM, syscall.SIGUSR2, syscall.SIGHUP) for { sig := <-ch switch sig { case syscall.SIGTERM: - // this ensures a subsequent TERM will trigger standard go behaviour of - // terminating. + // this ensures a subsequent TERM will trigger standard go behaviour of terminating signal.Stop(ch) - a.term(wg) + // roll through all initialized http servers and stop them + for _, s := range a.sds { + go func(s httpdown.Server) { + defer wg.Done() + if err := s.Stop(); err != nil { + a.errors <- iodine.New(err, nil) + } + }(s) + } return case syscall.SIGUSR2: fallthrough @@ -116,7 +110,13 @@ func (a *app) signalHandler(wg *sync.WaitGroup) { // ListenAndServe will serve the given http.Servers and will monitor for signals // allowing for graceful termination (SIGTERM) or restart (SIGUSR2/SIGHUP). func ListenAndServe(servers ...*http.Server) error { - a := newApp(servers) + a := &app{ + servers: servers, + listeners: make([]net.Listener, 0, len(servers)), + sds: make([]httpdown.Server, 0, len(servers)), + net: &nimbleNet{}, + errors: make(chan error, 1+(len(servers)*2)), + } // Acquire Listeners if err := a.listen(); err != nil { @@ -137,6 +137,8 @@ func ListenAndServe(servers ...*http.Server) error { go func() { defer close(waitdone) a.wait() + // communicate by sending not by closing a channel + waitdone <- struct{}{} }() select { diff --git a/pkg/server/nimble/net.go b/pkg/server/nimble/net.go index b360b85b0..eeaabae0d 100644 --- a/pkg/server/nimble/net.go +++ b/pkg/server/nimble/net.go @@ -37,13 +37,14 @@ var originalWD, _ = os.Getwd() // nimbleNet provides the family of Listen functions and maintains the associated // state. Typically you will have only once instance of nimbleNet per application. type nimbleNet struct { - inherited []net.Listener - active []net.Listener - mutex sync.Mutex - inheritOnce sync.Once + inheritedListeners []net.Listener + activeListeners []net.Listener + mutex sync.Mutex + inheritOnce sync.Once } -func (n *nimbleNet) inherit() error { +// inherit - lookg for LISTEN_FDS in environment variables and populate listeners +func (n *nimbleNet) getInheritedListeners() error { var retErr error n.inheritOnce.Do(func() { n.mutex.Lock() @@ -71,7 +72,7 @@ func (n *nimbleNet) inherit() error { retErr = fmt.Errorf("error closing inherited socket fd %d: %s", i, err) return } - n.inherited = append(n.inherited, l) + n.inheritedListeners = append(n.inheritedListeners, l) } }) return iodine.New(retErr, nil) @@ -104,7 +105,7 @@ func (n *nimbleNet) Listen(nett, laddr string) (net.Listener, error) { // be: "tcp", "tcp4" or "tcp6". It returns an inherited net.Listener for the // matching network and address, or creates a new one using net.ListenTCP. func (n *nimbleNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener, error) { - if err := n.inherit(); err != nil { + if err := n.getInheritedListeners(); err != nil { return nil, iodine.New(err, nil) } @@ -112,13 +113,13 @@ func (n *nimbleNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener defer n.mutex.Unlock() // look for an inherited listener - for i, l := range n.inherited { + for i, l := range n.inheritedListeners { if l == nil { // we nil used inherited listeners continue } if isSameAddr(l.Addr(), laddr) { - n.inherited[i] = nil - n.active = append(n.active, l) + n.inheritedListeners[i] = nil + n.activeListeners = append(n.activeListeners, l) return l.(*net.TCPListener), nil } } @@ -128,7 +129,7 @@ func (n *nimbleNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener if err != nil { return nil, iodine.New(err, nil) } - n.active = append(n.active, l) + n.activeListeners = append(n.activeListeners, l) return l, nil } @@ -136,7 +137,7 @@ func (n *nimbleNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener // must be a: "unix" or "unixpacket". It returns an inherited net.Listener for // the matching network and address, or creates a new one using net.ListenUnix. func (n *nimbleNet) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListener, error) { - if err := n.inherit(); err != nil { + if err := n.getInheritedListeners(); err != nil { return nil, iodine.New(err, nil) } @@ -144,13 +145,13 @@ func (n *nimbleNet) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListe defer n.mutex.Unlock() // look for an inherited listener - for i, l := range n.inherited { + for i, l := range n.inheritedListeners { if l == nil { // we nil used inherited listeners continue } if isSameAddr(l.Addr(), laddr) { - n.inherited[i] = nil - n.active = append(n.active, l) + n.inheritedListeners[i] = nil + n.activeListeners = append(n.activeListeners, l) return l.(*net.UnixListener), nil } } @@ -160,16 +161,16 @@ func (n *nimbleNet) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListe if err != nil { return nil, iodine.New(err, nil) } - n.active = append(n.active, l) + n.activeListeners = append(n.activeListeners, l) return l, nil } // activeListeners returns a snapshot copy of the active listeners. -func (n *nimbleNet) activeListeners() ([]net.Listener, error) { +func (n *nimbleNet) getActiveListeners() ([]net.Listener, error) { n.mutex.Lock() defer n.mutex.Unlock() - ls := make([]net.Listener, len(n.active)) - copy(ls, n.active) + ls := make([]net.Listener, len(n.activeListeners)) + copy(ls, n.activeListeners) return ls, nil } @@ -200,7 +201,7 @@ func isSameAddr(a1, a2 net.Addr) bool { // deployed binary to be started. It returns the pid of the newly started // process when successful. func (n *nimbleNet) StartProcess() (int, error) { - listeners, err := n.activeListeners() + listeners, err := n.getActiveListeners() if err != nil { return 0, iodine.New(err, nil) }