diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index d71a60b9d..bf818444b 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -9,6 +9,18 @@ "ImportPath": "github.com/dustin/go-humanize", "Rev": "8cc1aaa2d955ee82833337cfb10babc42be6bce6" }, + { + "ImportPath": "github.com/facebookgo/clock", + "Rev": "600d898af40aa09a7a93ecb9265d87b0504b6f03" + }, + { + "ImportPath": "github.com/facebookgo/httpdown", + "Rev": "3d94c3159d8ba15fa8e9499134ccc0d8acf6adb7" + }, + { + "ImportPath": "github.com/facebookgo/stats", + "Rev": "31fb71caf5a4f04c9f8bb3fa8e7c2597ba6eb50a" + }, { "ImportPath": "github.com/fatih/structs", "Rev": "c00d27128bb88e9c1adab1a53cda9c72c6d1ff9b" diff --git a/Godeps/_workspace/src/github.com/facebookgo/clock/LICENSE b/Godeps/_workspace/src/github.com/facebookgo/clock/LICENSE new file mode 100644 index 000000000..ce212cb1c --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/clock/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Ben Johnson + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Godeps/_workspace/src/github.com/facebookgo/clock/README.md b/Godeps/_workspace/src/github.com/facebookgo/clock/README.md new file mode 100644 index 000000000..5d4f4fe72 --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/clock/README.md @@ -0,0 +1,104 @@ +clock [![Build Status](https://drone.io/github.com/benbjohnson/clock/status.png)](https://drone.io/github.com/benbjohnson/clock/latest) [![Coverage Status](https://coveralls.io/repos/benbjohnson/clock/badge.png?branch=master)](https://coveralls.io/r/benbjohnson/clock?branch=master) [![GoDoc](https://godoc.org/github.com/benbjohnson/clock?status.png)](https://godoc.org/github.com/benbjohnson/clock) ![Project status](http://img.shields.io/status/experimental.png?color=red) +===== + +Clock is a small library for mocking time in Go. It provides an interface +around the standard library's [`time`][time] package so that the application +can use the realtime clock while tests can use the mock clock. + +[time]: http://golang.org/pkg/time/ + + +## Usage + +### Realtime Clock + +Your application can maintain a `Clock` variable that will allow realtime and +mock clocks to be interchangable. For example, if you had an `Application` type: + +```go +import "github.com/benbjohnson/clock" + +type Application struct { + Clock clock.Clock +} +``` + +You could initialize it to use the realtime clock like this: + +```go +var app Application +app.Clock = clock.New() +... +``` + +Then all timers and time-related functionality should be performed from the +`Clock` variable. + + +### Mocking time + +In your tests, you will want to use a `Mock` clock: + +```go +import ( + "testing" + + "github.com/benbjohnson/clock" +) + +func TestApplication_DoSomething(t *testing.T) { + mock := clock.NewMock() + app := Application{Clock: mock} + ... +} +``` + +Now that you've initialized your application to use the mock clock, you can +adjust the time programmatically. The mock clock always starts from the Unix +epoch (midnight, Jan 1, 1970 UTC). + + +### Controlling time + +The mock clock provides the same functions that the standard library's `time` +package provides. For example, to find the current time, you use the `Now()` +function: + +```go +mock := clock.NewMock() + +// Find the current time. +mock.Now().UTC() // 1970-01-01 00:00:00 +0000 UTC + +// Move the clock forward. +mock.Add(2 * time.Hour) + +// Check the time again. It's 2 hours later! +mock.Now().UTC() // 1970-01-01 02:00:00 +0000 UTC +``` + +Timers and Tickers are also controlled by this same mock clock. They will only +execute when the clock is moved forward: + +``` +mock := clock.NewMock() +count := 0 + +// Kick off a timer to increment every 1 mock second. +go func() { + ticker := clock.Ticker(1 * time.Second) + for { + <-ticker.C + count++ + } +}() +runtime.Gosched() + +// Move the clock forward 10 second. +mock.Add(10 * time.Second) + +// This prints 10. +fmt.Println(count) +``` + + diff --git a/Godeps/_workspace/src/github.com/facebookgo/clock/clock.go b/Godeps/_workspace/src/github.com/facebookgo/clock/clock.go new file mode 100644 index 000000000..bca1a7ba8 --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/clock/clock.go @@ -0,0 +1,363 @@ +package clock + +import ( + "runtime" + "sort" + "sync" + "time" +) + +// Clock represents an interface to the functions in the standard library time +// package. Two implementations are available in the clock package. The first +// is a real-time clock which simply wraps the time package's functions. The +// second is a mock clock which will only make forward progress when +// programmatically adjusted. +type Clock interface { + After(d time.Duration) <-chan time.Time + AfterFunc(d time.Duration, f func()) *Timer + Now() time.Time + Sleep(d time.Duration) + Tick(d time.Duration) <-chan time.Time + Ticker(d time.Duration) *Ticker + Timer(d time.Duration) *Timer +} + +// New returns an instance of a real-time clock. +func New() Clock { + return &clock{} +} + +// clock implements a real-time clock by simply wrapping the time package functions. +type clock struct{} + +func (c *clock) After(d time.Duration) <-chan time.Time { return time.After(d) } + +func (c *clock) AfterFunc(d time.Duration, f func()) *Timer { + return &Timer{timer: time.AfterFunc(d, f)} +} + +func (c *clock) Now() time.Time { return time.Now() } + +func (c *clock) Sleep(d time.Duration) { time.Sleep(d) } + +func (c *clock) Tick(d time.Duration) <-chan time.Time { return time.Tick(d) } + +func (c *clock) Ticker(d time.Duration) *Ticker { + t := time.NewTicker(d) + return &Ticker{C: t.C, ticker: t} +} + +func (c *clock) Timer(d time.Duration) *Timer { + t := time.NewTimer(d) + return &Timer{C: t.C, timer: t} +} + +// Mock represents a mock clock that only moves forward programmically. +// It can be preferable to a real-time clock when testing time-based functionality. +type Mock struct { + mu sync.Mutex + now time.Time // current time + timers clockTimers // tickers & timers + + calls Calls + waiting []waiting + callsMutex sync.Mutex +} + +// NewMock returns an instance of a mock clock. +// The current time of the mock clock on initialization is the Unix epoch. +func NewMock() *Mock { + return &Mock{now: time.Unix(0, 0)} +} + +// Add moves the current time of the mock clock forward by the duration. +// This should only be called from a single goroutine at a time. +func (m *Mock) Add(d time.Duration) { + // Calculate the final current time. + t := m.now.Add(d) + + // Continue to execute timers until there are no more before the new time. + for { + if !m.runNextTimer(t) { + break + } + } + + // Ensure that we end with the new time. + m.mu.Lock() + m.now = t + m.mu.Unlock() + + // Give a small buffer to make sure the other goroutines get handled. + gosched() +} + +// runNextTimer executes the next timer in chronological order and moves the +// current time to the timer's next tick time. The next time is not executed if +// it's next time if after the max time. Returns true if a timer is executed. +func (m *Mock) runNextTimer(max time.Time) bool { + m.mu.Lock() + + // Sort timers by time. + sort.Sort(m.timers) + + // If we have no more timers then exit. + if len(m.timers) == 0 { + m.mu.Unlock() + return false + } + + // Retrieve next timer. Exit if next tick is after new time. + t := m.timers[0] + if t.Next().After(max) { + m.mu.Unlock() + return false + } + + // Move "now" forward and unlock clock. + m.now = t.Next() + m.mu.Unlock() + + // Execute timer. + t.Tick(m.now) + return true +} + +// After waits for the duration to elapse and then sends the current time on the returned channel. +func (m *Mock) After(d time.Duration) <-chan time.Time { + defer m.inc(&m.calls.After) + return m.Timer(d).C +} + +// AfterFunc waits for the duration to elapse and then executes a function. +// A Timer is returned that can be stopped. +func (m *Mock) AfterFunc(d time.Duration, f func()) *Timer { + defer m.inc(&m.calls.AfterFunc) + t := m.Timer(d) + t.C = nil + t.fn = f + return t +} + +// Now returns the current wall time on the mock clock. +func (m *Mock) Now() time.Time { + defer m.inc(&m.calls.Now) + m.mu.Lock() + defer m.mu.Unlock() + return m.now +} + +// Sleep pauses the goroutine for the given duration on the mock clock. +// The clock must be moved forward in a separate goroutine. +func (m *Mock) Sleep(d time.Duration) { + defer m.inc(&m.calls.Sleep) + <-m.After(d) +} + +// Tick is a convenience function for Ticker(). +// It will return a ticker channel that cannot be stopped. +func (m *Mock) Tick(d time.Duration) <-chan time.Time { + defer m.inc(&m.calls.Tick) + return m.Ticker(d).C +} + +// Ticker creates a new instance of Ticker. +func (m *Mock) Ticker(d time.Duration) *Ticker { + defer m.inc(&m.calls.Ticker) + m.mu.Lock() + defer m.mu.Unlock() + ch := make(chan time.Time) + t := &Ticker{ + C: ch, + c: ch, + mock: m, + d: d, + next: m.now.Add(d), + } + m.timers = append(m.timers, (*internalTicker)(t)) + return t +} + +// Timer creates a new instance of Timer. +func (m *Mock) Timer(d time.Duration) *Timer { + defer m.inc(&m.calls.Timer) + m.mu.Lock() + defer m.mu.Unlock() + ch := make(chan time.Time) + t := &Timer{ + C: ch, + c: ch, + mock: m, + next: m.now.Add(d), + } + m.timers = append(m.timers, (*internalTimer)(t)) + return t +} + +func (m *Mock) removeClockTimer(t clockTimer) { + m.mu.Lock() + defer m.mu.Unlock() + for i, timer := range m.timers { + if timer == t { + copy(m.timers[i:], m.timers[i+1:]) + m.timers[len(m.timers)-1] = nil + m.timers = m.timers[:len(m.timers)-1] + break + } + } + sort.Sort(m.timers) +} + +func (m *Mock) inc(addr *uint32) { + m.callsMutex.Lock() + defer m.callsMutex.Unlock() + *addr++ + var newWaiting []waiting + for _, w := range m.waiting { + if m.calls.atLeast(w.expected) { + close(w.done) + continue + } + newWaiting = append(newWaiting, w) + } + m.waiting = newWaiting +} + +// Wait waits for at least the relevant calls before returning. The expected +// Calls are always over the lifetime of the Mock. Values in the Calls struct +// are used as the minimum number of calls, this allows you to wait for only +// the calls you care about. +func (m *Mock) Wait(s Calls) { + m.callsMutex.Lock() + if m.calls.atLeast(s) { + m.callsMutex.Unlock() + return + } + done := make(chan struct{}) + m.waiting = append(m.waiting, waiting{expected: s, done: done}) + m.callsMutex.Unlock() + <-done +} + +// clockTimer represents an object with an associated start time. +type clockTimer interface { + Next() time.Time + Tick(time.Time) +} + +// clockTimers represents a list of sortable timers. +type clockTimers []clockTimer + +func (a clockTimers) Len() int { return len(a) } +func (a clockTimers) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a clockTimers) Less(i, j int) bool { return a[i].Next().Before(a[j].Next()) } + +// Timer represents a single event. +// The current time will be sent on C, unless the timer was created by AfterFunc. +type Timer struct { + C <-chan time.Time + c chan time.Time + timer *time.Timer // realtime impl, if set + next time.Time // next tick time + mock *Mock // mock clock, if set + fn func() // AfterFunc function, if set +} + +// Stop turns off the ticker. +func (t *Timer) Stop() { + if t.timer != nil { + t.timer.Stop() + } else { + t.mock.removeClockTimer((*internalTimer)(t)) + } +} + +type internalTimer Timer + +func (t *internalTimer) Next() time.Time { return t.next } +func (t *internalTimer) Tick(now time.Time) { + if t.fn != nil { + t.fn() + } else { + t.c <- now + } + t.mock.removeClockTimer((*internalTimer)(t)) + gosched() +} + +// Ticker holds a channel that receives "ticks" at regular intervals. +type Ticker struct { + C <-chan time.Time + c chan time.Time + ticker *time.Ticker // realtime impl, if set + next time.Time // next tick time + mock *Mock // mock clock, if set + d time.Duration // time between ticks +} + +// Stop turns off the ticker. +func (t *Ticker) Stop() { + if t.ticker != nil { + t.ticker.Stop() + } else { + t.mock.removeClockTimer((*internalTicker)(t)) + } +} + +type internalTicker Ticker + +func (t *internalTicker) Next() time.Time { return t.next } +func (t *internalTicker) Tick(now time.Time) { + select { + case t.c <- now: + case <-time.After(1 * time.Millisecond): + } + t.next = now.Add(t.d) + gosched() +} + +// Sleep momentarily so that other goroutines can process. +func gosched() { runtime.Gosched() } + +// Calls keeps track of the count of calls for each of the methods on the Clock +// interface. +type Calls struct { + After uint32 + AfterFunc uint32 + Now uint32 + Sleep uint32 + Tick uint32 + Ticker uint32 + Timer uint32 +} + +// atLeast returns true if at least the number of calls in o have been made. +func (c Calls) atLeast(o Calls) bool { + if c.After < o.After { + return false + } + if c.AfterFunc < o.AfterFunc { + return false + } + if c.Now < o.Now { + return false + } + if c.Sleep < o.Sleep { + return false + } + if c.Tick < o.Tick { + return false + } + if c.Ticker < o.Ticker { + return false + } + if c.Timer < o.Timer { + return false + } + return true +} + +type waiting struct { + expected Calls + done chan struct{} +} diff --git a/Godeps/_workspace/src/github.com/facebookgo/clock/clock_test.go b/Godeps/_workspace/src/github.com/facebookgo/clock/clock_test.go new file mode 100644 index 000000000..452622e19 --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/clock/clock_test.go @@ -0,0 +1,536 @@ +package clock_test + +import ( + "fmt" + "os" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/facebookgo/clock" +) + +// Ensure that the clock's After channel sends at the correct time. +func TestClock_After(t *testing.T) { + var ok bool + go func() { + time.Sleep(10 * time.Millisecond) + ok = true + }() + go func() { + time.Sleep(30 * time.Millisecond) + t.Fatal("too late") + }() + gosched() + + <-clock.New().After(20 * time.Millisecond) + if !ok { + t.Fatal("too early") + } +} + +// Ensure that the clock's AfterFunc executes at the correct time. +func TestClock_AfterFunc(t *testing.T) { + var ok bool + go func() { + time.Sleep(10 * time.Millisecond) + ok = true + }() + go func() { + time.Sleep(30 * time.Millisecond) + t.Fatal("too late") + }() + gosched() + + var wg sync.WaitGroup + wg.Add(1) + clock.New().AfterFunc(20*time.Millisecond, func() { + wg.Done() + }) + wg.Wait() + if !ok { + t.Fatal("too early") + } +} + +// Ensure that the clock's time matches the standary library. +func TestClock_Now(t *testing.T) { + a := time.Now().Round(time.Second) + b := clock.New().Now().Round(time.Second) + if !a.Equal(b) { + t.Errorf("not equal: %s != %s", a, b) + } +} + +// Ensure that the clock sleeps for the appropriate amount of time. +func TestClock_Sleep(t *testing.T) { + var ok bool + go func() { + time.Sleep(10 * time.Millisecond) + ok = true + }() + go func() { + time.Sleep(30 * time.Millisecond) + t.Fatal("too late") + }() + gosched() + + clock.New().Sleep(20 * time.Millisecond) + if !ok { + t.Fatal("too early") + } +} + +// Ensure that the clock ticks correctly. +func TestClock_Tick(t *testing.T) { + var ok bool + go func() { + time.Sleep(10 * time.Millisecond) + ok = true + }() + go func() { + time.Sleep(50 * time.Millisecond) + t.Fatal("too late") + }() + gosched() + + c := clock.New().Tick(20 * time.Millisecond) + <-c + <-c + if !ok { + t.Fatal("too early") + } +} + +// Ensure that the clock's ticker ticks correctly. +func TestClock_Ticker(t *testing.T) { + var ok bool + go func() { + time.Sleep(100 * time.Millisecond) + ok = true + }() + go func() { + time.Sleep(200 * time.Millisecond) + t.Fatal("too late") + }() + gosched() + + ticker := clock.New().Ticker(50 * time.Millisecond) + <-ticker.C + <-ticker.C + if !ok { + t.Fatal("too early") + } +} + +// Ensure that the clock's ticker can stop correctly. +func TestClock_Ticker_Stp(t *testing.T) { + var ok bool + go func() { + time.Sleep(10 * time.Millisecond) + ok = true + }() + gosched() + + ticker := clock.New().Ticker(20 * time.Millisecond) + <-ticker.C + ticker.Stop() + select { + case <-ticker.C: + t.Fatal("unexpected send") + case <-time.After(30 * time.Millisecond): + } +} + +// Ensure that the clock's timer waits correctly. +func TestClock_Timer(t *testing.T) { + var ok bool + go func() { + time.Sleep(10 * time.Millisecond) + ok = true + }() + go func() { + time.Sleep(30 * time.Millisecond) + t.Fatal("too late") + }() + gosched() + + timer := clock.New().Timer(20 * time.Millisecond) + <-timer.C + if !ok { + t.Fatal("too early") + } +} + +// Ensure that the clock's timer can be stopped. +func TestClock_Timer_Stop(t *testing.T) { + var ok bool + go func() { + time.Sleep(10 * time.Millisecond) + ok = true + }() + + timer := clock.New().Timer(20 * time.Millisecond) + timer.Stop() + select { + case <-timer.C: + t.Fatal("unexpected send") + case <-time.After(30 * time.Millisecond): + } +} + +// Ensure that the mock's After channel sends at the correct time. +func TestMock_After(t *testing.T) { + var ok int32 + clock := clock.NewMock() + + // Create a channel to execute after 10 mock seconds. + ch := clock.After(10 * time.Second) + go func(ch <-chan time.Time) { + <-ch + atomic.StoreInt32(&ok, 1) + }(ch) + + // Move clock forward to just before the time. + clock.Add(9 * time.Second) + if atomic.LoadInt32(&ok) == 1 { + t.Fatal("too early") + } + + // Move clock forward to the after channel's time. + clock.Add(1 * time.Second) + if atomic.LoadInt32(&ok) == 0 { + t.Fatal("too late") + } +} + +// Ensure that the mock's AfterFunc executes at the correct time. +func TestMock_AfterFunc(t *testing.T) { + var ok int32 + clock := clock.NewMock() + + // Execute function after duration. + clock.AfterFunc(10*time.Second, func() { + atomic.StoreInt32(&ok, 1) + }) + + // Move clock forward to just before the time. + clock.Add(9 * time.Second) + if atomic.LoadInt32(&ok) == 1 { + t.Fatal("too early") + } + + // Move clock forward to the after channel's time. + clock.Add(1 * time.Second) + if atomic.LoadInt32(&ok) == 0 { + t.Fatal("too late") + } +} + +// Ensure that the mock's AfterFunc doesn't execute if stopped. +func TestMock_AfterFunc_Stop(t *testing.T) { + // Execute function after duration. + clock := clock.NewMock() + timer := clock.AfterFunc(10*time.Second, func() { + t.Fatal("unexpected function execution") + }) + gosched() + + // Stop timer & move clock forward. + timer.Stop() + clock.Add(10 * time.Second) + gosched() +} + +// Ensure that the mock's current time can be changed. +func TestMock_Now(t *testing.T) { + clock := clock.NewMock() + if now := clock.Now(); !now.Equal(time.Unix(0, 0)) { + t.Fatalf("expected epoch, got: ", now) + } + + // Add 10 seconds and check the time. + clock.Add(10 * time.Second) + if now := clock.Now(); !now.Equal(time.Unix(10, 0)) { + t.Fatalf("expected epoch, got: ", now) + } +} + +// Ensure that the mock can sleep for the correct time. +func TestMock_Sleep(t *testing.T) { + var ok int32 + clock := clock.NewMock() + + // Create a channel to execute after 10 mock seconds. + go func() { + clock.Sleep(10 * time.Second) + atomic.StoreInt32(&ok, 1) + }() + gosched() + + // Move clock forward to just before the sleep duration. + clock.Add(9 * time.Second) + if atomic.LoadInt32(&ok) == 1 { + t.Fatal("too early") + } + + // Move clock forward to the after the sleep duration. + clock.Add(1 * time.Second) + if atomic.LoadInt32(&ok) == 0 { + t.Fatal("too late") + } +} + +// Ensure that the mock's Tick channel sends at the correct time. +func TestMock_Tick(t *testing.T) { + var n int32 + clock := clock.NewMock() + + // Create a channel to increment every 10 seconds. + go func() { + tick := clock.Tick(10 * time.Second) + for { + <-tick + atomic.AddInt32(&n, 1) + } + }() + gosched() + + // Move clock forward to just before the first tick. + clock.Add(9 * time.Second) + if atomic.LoadInt32(&n) != 0 { + t.Fatalf("expected 0, got %d", n) + } + + // Move clock forward to the start of the first tick. + clock.Add(1 * time.Second) + if atomic.LoadInt32(&n) != 1 { + t.Fatalf("expected 1, got %d", n) + } + + // Move clock forward over several ticks. + clock.Add(30 * time.Second) + if atomic.LoadInt32(&n) != 4 { + t.Fatalf("expected 4, got %d", n) + } +} + +// Ensure that the mock's Ticker channel sends at the correct time. +func TestMock_Ticker(t *testing.T) { + var n int32 + clock := clock.NewMock() + + // Create a channel to increment every microsecond. + go func() { + ticker := clock.Ticker(1 * time.Microsecond) + for { + <-ticker.C + atomic.AddInt32(&n, 1) + } + }() + gosched() + + // Move clock forward. + clock.Add(10 * time.Microsecond) + if atomic.LoadInt32(&n) != 10 { + t.Fatalf("unexpected: %d", n) + } +} + +// Ensure that the mock's Ticker channel won't block if not read from. +func TestMock_Ticker_Overflow(t *testing.T) { + clock := clock.NewMock() + ticker := clock.Ticker(1 * time.Microsecond) + clock.Add(10 * time.Microsecond) + ticker.Stop() +} + +// Ensure that the mock's Ticker can be stopped. +func TestMock_Ticker_Stop(t *testing.T) { + var n int32 + clock := clock.NewMock() + + // Create a channel to increment every second. + ticker := clock.Ticker(1 * time.Second) + go func() { + for { + <-ticker.C + atomic.AddInt32(&n, 1) + } + }() + gosched() + + // Move clock forward. + clock.Add(5 * time.Second) + if atomic.LoadInt32(&n) != 5 { + t.Fatalf("expected 5, got: %d", n) + } + + ticker.Stop() + + // Move clock forward again. + clock.Add(5 * time.Second) + if atomic.LoadInt32(&n) != 5 { + t.Fatalf("still expected 5, got: %d", n) + } +} + +// Ensure that multiple tickers can be used together. +func TestMock_Ticker_Multi(t *testing.T) { + var n int32 + clock := clock.NewMock() + + go func() { + a := clock.Ticker(1 * time.Microsecond) + b := clock.Ticker(3 * time.Microsecond) + + for { + select { + case <-a.C: + atomic.AddInt32(&n, 1) + case <-b.C: + atomic.AddInt32(&n, 100) + } + } + }() + gosched() + + // Move clock forward. + clock.Add(10 * time.Microsecond) + gosched() + if atomic.LoadInt32(&n) != 310 { + t.Fatalf("unexpected: %d", n) + } +} + +func ExampleMock_After() { + // Create a new mock clock. + clock := clock.NewMock() + count := 0 + + // Create a channel to execute after 10 mock seconds. + go func() { + <-clock.After(10 * time.Second) + count = 100 + }() + runtime.Gosched() + + // Print the starting value. + fmt.Printf("%s: %d\n", clock.Now().UTC(), count) + + // Move the clock forward 5 seconds and print the value again. + clock.Add(5 * time.Second) + fmt.Printf("%s: %d\n", clock.Now().UTC(), count) + + // Move the clock forward 5 seconds to the tick time and check the value. + clock.Add(5 * time.Second) + fmt.Printf("%s: %d\n", clock.Now().UTC(), count) + + // Output: + // 1970-01-01 00:00:00 +0000 UTC: 0 + // 1970-01-01 00:00:05 +0000 UTC: 0 + // 1970-01-01 00:00:10 +0000 UTC: 100 +} + +func ExampleMock_AfterFunc() { + // Create a new mock clock. + clock := clock.NewMock() + count := 0 + + // Execute a function after 10 mock seconds. + clock.AfterFunc(10*time.Second, func() { + count = 100 + }) + runtime.Gosched() + + // Print the starting value. + fmt.Printf("%s: %d\n", clock.Now().UTC(), count) + + // Move the clock forward 10 seconds and print the new value. + clock.Add(10 * time.Second) + fmt.Printf("%s: %d\n", clock.Now().UTC(), count) + + // Output: + // 1970-01-01 00:00:00 +0000 UTC: 0 + // 1970-01-01 00:00:10 +0000 UTC: 100 +} + +func ExampleMock_Sleep() { + // Create a new mock clock. + clock := clock.NewMock() + count := 0 + + // Execute a function after 10 mock seconds. + go func() { + clock.Sleep(10 * time.Second) + count = 100 + }() + runtime.Gosched() + + // Print the starting value. + fmt.Printf("%s: %d\n", clock.Now().UTC(), count) + + // Move the clock forward 10 seconds and print the new value. + clock.Add(10 * time.Second) + fmt.Printf("%s: %d\n", clock.Now().UTC(), count) + + // Output: + // 1970-01-01 00:00:00 +0000 UTC: 0 + // 1970-01-01 00:00:10 +0000 UTC: 100 +} + +func ExampleMock_Ticker() { + // Create a new mock clock. + clock := clock.NewMock() + count := 0 + + // Increment count every mock second. + go func() { + ticker := clock.Ticker(1 * time.Second) + for { + <-ticker.C + count++ + } + }() + runtime.Gosched() + + // Move the clock forward 10 seconds and print the new value. + clock.Add(10 * time.Second) + fmt.Printf("Count is %d after 10 seconds\n", count) + + // Move the clock forward 5 more seconds and print the new value. + clock.Add(5 * time.Second) + fmt.Printf("Count is %d after 15 seconds\n", count) + + // Output: + // Count is 10 after 10 seconds + // Count is 15 after 15 seconds +} + +func ExampleMock_Timer() { + // Create a new mock clock. + clock := clock.NewMock() + count := 0 + + // Increment count after a mock second. + go func() { + timer := clock.Timer(1 * time.Second) + <-timer.C + count++ + }() + runtime.Gosched() + + // Move the clock forward 10 seconds and print the new value. + clock.Add(10 * time.Second) + fmt.Printf("Count is %d after 10 seconds\n", count) + + // Output: + // Count is 1 after 10 seconds +} + +func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) } +func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) } + +func gosched() { time.Sleep(1 * time.Millisecond) } diff --git a/Godeps/_workspace/src/github.com/facebookgo/httpdown/.travis.yml b/Godeps/_workspace/src/github.com/facebookgo/httpdown/.travis.yml new file mode 100644 index 000000000..d29694f6f --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/httpdown/.travis.yml @@ -0,0 +1,23 @@ +language: go + +go: + - 1.3 + +matrix: + fast_finish: true + +before_install: + - go get -v code.google.com/p/go.tools/cmd/vet + - go get -v github.com/golang/lint/golint + - go get -v code.google.com/p/go.tools/cmd/cover + +install: + - go install -race -v std + - go get -race -t -v ./... + - go install -race -v ./... + +script: + - go vet ./... + - $HOME/gopath/bin/golint . + - go test -cpu=2 -race -v ./... + - go test -cpu=2 -covermode=atomic ./... diff --git a/Godeps/_workspace/src/github.com/facebookgo/httpdown/httpdown.go b/Godeps/_workspace/src/github.com/facebookgo/httpdown/httpdown.go new file mode 100644 index 000000000..2ad031ea0 --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/httpdown/httpdown.go @@ -0,0 +1,380 @@ +// Package httpdown provides http.ConnState enabled graceful termination of +// http.Server. +package httpdown + +import ( + "crypto/tls" + "fmt" + "log" + "net" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/facebookgo/clock" + "github.com/facebookgo/stats" +) + +const ( + defaultStopTimeout = time.Minute + defaultKillTimeout = time.Minute +) + +// A Server allows encapsulates the process of accepting new connections and +// serving them, and gracefully shutting down the listener without dropping +// active connections. +type Server interface { + // Wait waits for the serving loop to finish. This will happen when Stop is + // called, at which point it returns no error, or if there is an error in the + // serving loop. You must call Wait after calling Serve or ListenAndServe. + Wait() error + + // Stop stops the listener. It will block until all connections have been + // closed. + Stop() error +} + +// HTTP defines the configuration for serving a http.Server. Multiple calls to +// Serve or ListenAndServe can be made on the same HTTP instance. The default +// timeouts of 1 minute each result in a maximum of 2 minutes before a Stop() +// returns. +type HTTP struct { + // StopTimeout is the duration before we begin force closing connections. + // Defaults to 1 minute. + StopTimeout time.Duration + + // KillTimeout is the duration before which we completely give up and abort + // even though we still have connected clients. This is useful when a large + // number of client connections exist and closing them can take a long time. + // Note, this is in addition to the StopTimeout. Defaults to 1 minute. + KillTimeout time.Duration + + // Stats is optional. If provided, it will be used to record various metrics. + Stats stats.Client + + // Clock allows for testing timing related functionality. Do not specify this + // in production code. + Clock clock.Clock +} + +// Serve provides the low-level API which is useful if you're creating your own +// net.Listener. +func (h HTTP) Serve(s *http.Server, l net.Listener) Server { + stopTimeout := h.StopTimeout + if stopTimeout == 0 { + stopTimeout = defaultStopTimeout + } + killTimeout := h.KillTimeout + if killTimeout == 0 { + killTimeout = defaultKillTimeout + } + klock := h.Clock + if klock == nil { + klock = clock.New() + } + + ss := &server{ + stopTimeout: stopTimeout, + killTimeout: killTimeout, + stats: h.Stats, + clock: klock, + oldConnState: s.ConnState, + listener: l, + server: s, + serveDone: make(chan struct{}), + serveErr: make(chan error, 1), + new: make(chan net.Conn), + active: make(chan net.Conn), + idle: make(chan net.Conn), + closed: make(chan net.Conn), + stop: make(chan chan struct{}), + kill: make(chan chan struct{}), + } + s.ConnState = ss.connState + go ss.manage() + go ss.serve() + return ss +} + +// ListenAndServe returns a Server for the given http.Server. It is equivalent +// to ListendAndServe from the standard library, but returns immediately. +// Requests will be accepted in a background goroutine. If the http.Server has +// a non-nil TLSConfig, a TLS enabled listener will be setup. +func (h HTTP) ListenAndServe(s *http.Server) (Server, error) { + addr := s.Addr + if addr == "" { + if s.TLSConfig == nil { + addr = ":http" + } else { + addr = ":https" + } + } + l, err := net.Listen("tcp", addr) + if err != nil { + stats.BumpSum(h.Stats, "listen.error", 1) + return nil, err + } + if s.TLSConfig != nil { + l = tls.NewListener(l, s.TLSConfig) + } + return h.Serve(s, l), nil +} + +// server manages the serving process and allows for gracefully stopping it. +type server struct { + stopTimeout time.Duration + killTimeout time.Duration + stats stats.Client + clock clock.Clock + + oldConnState func(net.Conn, http.ConnState) + server *http.Server + serveDone chan struct{} + serveErr chan error + listener net.Listener + + new chan net.Conn + active chan net.Conn + idle chan net.Conn + closed chan net.Conn + stop chan chan struct{} + kill chan chan struct{} + + stopOnce sync.Once + stopErr error +} + +func (s *server) connState(c net.Conn, cs http.ConnState) { + if s.oldConnState != nil { + s.oldConnState(c, cs) + } + + switch cs { + case http.StateNew: + s.new <- c + case http.StateActive: + s.active <- c + case http.StateIdle: + s.idle <- c + case http.StateHijacked, http.StateClosed: + s.closed <- c + } +} + +func (s *server) manage() { + defer func() { + close(s.new) + close(s.active) + close(s.idle) + close(s.closed) + close(s.stop) + close(s.kill) + }() + + var stopDone chan struct{} + + conns := map[net.Conn]http.ConnState{} + var countNew, countActive, countIdle float64 + + // decConn decrements the count associated with the current state of the + // given connection. + decConn := func(c net.Conn) { + switch conns[c] { + default: + panic(fmt.Errorf("unknown existing connection: %s", c)) + case http.StateNew: + countNew-- + case http.StateActive: + countActive-- + case http.StateIdle: + countIdle-- + } + } + + // setup a ticker to report various values every minute. if we don't have a + // Stats implementation provided, we Stop it so it never ticks. + statsTicker := s.clock.Ticker(time.Minute) + if s.stats == nil { + statsTicker.Stop() + } + + for { + select { + case <-statsTicker.C: + // we'll only get here when s.stats is not nil + s.stats.BumpAvg("http-state.new", countNew) + s.stats.BumpAvg("http-state.active", countActive) + s.stats.BumpAvg("http-state.idle", countIdle) + s.stats.BumpAvg("http-state.total", countNew+countActive+countIdle) + case c := <-s.new: + conns[c] = http.StateNew + countNew++ + case c := <-s.active: + decConn(c) + countActive++ + + conns[c] = http.StateActive + case c := <-s.idle: + decConn(c) + countIdle++ + + conns[c] = http.StateIdle + + // if we're already stopping, close it + if stopDone != nil { + c.Close() + } + case c := <-s.closed: + stats.BumpSum(s.stats, "conn.closed", 1) + decConn(c) + delete(conns, c) + + // if we're waiting to stop and are all empty, we just closed the last + // connection and we're done. + if stopDone != nil && len(conns) == 0 { + close(stopDone) + return + } + case stopDone = <-s.stop: + // if we're already all empty, we're already done + if len(conns) == 0 { + close(stopDone) + return + } + + // close current idle connections right away + for c, cs := range conns { + if cs == http.StateIdle { + c.Close() + } + } + + // continue the loop and wait for all the ConnState updates which will + // eventually close(stopDone) and return from this goroutine. + + case killDone := <-s.kill: + // force close all connections + stats.BumpSum(s.stats, "kill.conn.count", float64(len(conns))) + for c := range conns { + c.Close() + } + + // don't block the kill. + close(killDone) + + // continue the loop and we wait for all the ConnState updates and will + // return from this goroutine when we're all done. otherwise we'll try to + // send those ConnState updates on closed channels. + + } + } +} + +func (s *server) serve() { + stats.BumpSum(s.stats, "serve", 1) + s.serveErr <- s.server.Serve(s.listener) + close(s.serveDone) + close(s.serveErr) +} + +func (s *server) Wait() error { + if err := <-s.serveErr; !isUseOfClosedError(err) { + return err + } + return nil +} + +func (s *server) Stop() error { + s.stopOnce.Do(func() { + defer stats.BumpTime(s.stats, "stop.time").End() + stats.BumpSum(s.stats, "stop", 1) + + // first disable keep-alive for new connections + s.server.SetKeepAlivesEnabled(false) + + // then close the listener so new connections can't connect come thru + closeErr := s.listener.Close() + <-s.serveDone + + // then trigger the background goroutine to stop and wait for it + stopDone := make(chan struct{}) + s.stop <- stopDone + + // wait for stop + select { + case <-stopDone: + case <-s.clock.After(s.stopTimeout): + defer stats.BumpTime(s.stats, "kill.time").End() + stats.BumpSum(s.stats, "kill", 1) + + // stop timed out, wait for kill + killDone := make(chan struct{}) + s.kill <- killDone + select { + case <-killDone: + case <-s.clock.After(s.killTimeout): + // kill timed out, give up + stats.BumpSum(s.stats, "kill.timeout", 1) + } + } + + if closeErr != nil && !isUseOfClosedError(closeErr) { + stats.BumpSum(s.stats, "listener.close.error", 1) + s.stopErr = closeErr + } + }) + return s.stopErr +} + +func isUseOfClosedError(err error) bool { + if err == nil { + return false + } + if opErr, ok := err.(*net.OpError); ok { + err = opErr.Err + } + return err.Error() == "use of closed network connection" +} + +// ListenAndServe is a convenience function to serve and wait for a SIGTERM +// or SIGINT before shutting down. +func ListenAndServe(s *http.Server, hd *HTTP) error { + if hd == nil { + hd = &HTTP{} + } + hs, err := hd.ListenAndServe(s) + if err != nil { + return err + } + log.Printf("serving on http://%s/ with pid %d\n", s.Addr, os.Getpid()) + + waiterr := make(chan error, 1) + go func() { + defer close(waiterr) + waiterr <- hs.Wait() + }() + + signals := make(chan os.Signal, 10) + signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) + + select { + case err := <-waiterr: + if err != nil { + return err + } + case s := <-signals: + signal.Stop(signals) + log.Printf("signal received: %s\n", s) + if err := hs.Stop(); err != nil { + return err + } + if err := <-waiterr; err != nil { + return err + } + } + log.Println("exiting") + return nil +} diff --git a/Godeps/_workspace/src/github.com/facebookgo/httpdown/httpdown_example/main.go b/Godeps/_workspace/src/github.com/facebookgo/httpdown/httpdown_example/main.go new file mode 100644 index 000000000..9e3c0bff1 --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/httpdown/httpdown_example/main.go @@ -0,0 +1,43 @@ +package main + +import ( + "flag" + "fmt" + "net/http" + "os" + "time" + + "github.com/facebookgo/httpdown" +) + +func handler(w http.ResponseWriter, r *http.Request) { + duration, err := time.ParseDuration(r.FormValue("duration")) + if err != nil { + http.Error(w, err.Error(), 400) + return + } + fmt.Fprintf(w, "going to sleep %s with pid %d\n", duration, os.Getpid()) + w.(http.Flusher).Flush() + time.Sleep(duration) + fmt.Fprintf(w, "slept %s with pid %d\n", duration, os.Getpid()) +} + +func main() { + server := &http.Server{ + Addr: "127.0.0.1:8080", + Handler: http.HandlerFunc(handler), + } + hd := &httpdown.HTTP{ + StopTimeout: 10 * time.Second, + KillTimeout: 1 * time.Second, + } + + flag.StringVar(&server.Addr, "addr", server.Addr, "http address") + flag.DurationVar(&hd.StopTimeout, "stop-timeout", hd.StopTimeout, "stop timeout") + flag.DurationVar(&hd.KillTimeout, "kill-timeout", hd.KillTimeout, "kill timeout") + flag.Parse() + + if err := httpdown.ListenAndServe(server, hd); err != nil { + panic(err) + } +} diff --git a/Godeps/_workspace/src/github.com/facebookgo/httpdown/httpdown_test.go b/Godeps/_workspace/src/github.com/facebookgo/httpdown/httpdown_test.go new file mode 100644 index 000000000..e582e22b6 --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/httpdown/httpdown_test.go @@ -0,0 +1,677 @@ +package httpdown_test + +import ( + "bytes" + "crypto/tls" + "errors" + "fmt" + "io/ioutil" + "net" + "net/http" + "os" + "regexp" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/facebookgo/clock" + "github.com/facebookgo/ensure" + "github.com/facebookgo/freeport" + "github.com/facebookgo/httpdown" + "github.com/facebookgo/stats" +) + +type onCloseListener struct { + net.Listener + mutex sync.Mutex + onClose chan struct{} +} + +func (o *onCloseListener) Close() error { + // Listener is closed twice, once by Grace, and once by the http library, so + // we guard against a double close of the chan. + defer func() { + o.mutex.Lock() + defer o.mutex.Unlock() + if o.onClose != nil { + close(o.onClose) + o.onClose = nil + } + }() + return o.Listener.Close() +} + +func NewOnCloseListener(l net.Listener) (net.Listener, chan struct{}) { + c := make(chan struct{}) + return &onCloseListener{Listener: l, onClose: c}, c +} + +type closeErrListener struct { + net.Listener + err error +} + +func (c *closeErrListener) Close() error { + c.Listener.Close() + return c.err +} + +type acceptErrListener struct { + net.Listener + err chan error +} + +func (c *acceptErrListener) Accept() (net.Conn, error) { + return nil, <-c.err +} + +type closeErrConn struct { + net.Conn + unblockClose chan chan struct{} +} + +func (c *closeErrConn) Close() error { + ch := <-c.unblockClose + + // Close gets called multiple times, but only the first one gets this ch + if ch != nil { + defer close(ch) + } + + return c.Conn.Close() +} + +type closeErrConnListener struct { + net.Listener + unblockClose chan chan struct{} +} + +func (l *closeErrConnListener) Accept() (net.Conn, error) { + c, err := l.Listener.Accept() + if err != nil { + return c, err + } + return &closeErrConn{Conn: c, unblockClose: l.unblockClose}, nil +} + +func TestHTTPStopWithNoRequest(t *testing.T) { + t.Parallel() + listener, err := net.Listen("tcp", "127.0.0.1:0") + ensure.Nil(t, err) + + statsDone := make(chan struct{}, 2) + hc := &stats.HookClient{ + BumpSumHook: func(key string, val float64) { + if key == "serve" && val == 1 { + statsDone <- struct{}{} + } + if key == "stop" && val == 1 { + statsDone <- struct{}{} + } + }, + } + + server := &http.Server{} + down := &httpdown.HTTP{Stats: hc} + s := down.Serve(server, listener) + ensure.Nil(t, s.Stop()) + <-statsDone + <-statsDone +} + +func TestHTTPStopWithFinishedRequest(t *testing.T) { + t.Parallel() + hello := []byte("hello") + fin := make(chan struct{}) + okHandler := func(w http.ResponseWriter, r *http.Request) { + defer close(fin) + w.Write(hello) + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + ensure.Nil(t, err) + server := &http.Server{Handler: http.HandlerFunc(okHandler)} + transport := &http.Transport{} + client := &http.Client{Transport: transport} + down := &httpdown.HTTP{} + s := down.Serve(server, listener) + res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String())) + ensure.Nil(t, err) + actualBody, err := ioutil.ReadAll(res.Body) + ensure.Nil(t, err) + ensure.DeepEqual(t, actualBody, hello) + ensure.Nil(t, res.Body.Close()) + + // At this point the request is finished, and the connection should be alive + // but idle (because we have keep alive enabled by default in our Transport). + ensure.Nil(t, s.Stop()) + <-fin + + ensure.Nil(t, s.Wait()) +} + +func TestHTTPStopWithActiveRequest(t *testing.T) { + t.Parallel() + const count = 10000 + hello := []byte("hello") + finOkHandler := make(chan struct{}) + okHandler := func(w http.ResponseWriter, r *http.Request) { + defer close(finOkHandler) + w.WriteHeader(200) + for i := 0; i < count; i++ { + w.Write(hello) + } + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + ensure.Nil(t, err) + server := &http.Server{Handler: http.HandlerFunc(okHandler)} + transport := &http.Transport{} + client := &http.Client{Transport: transport} + down := &httpdown.HTTP{} + s := down.Serve(server, listener) + res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String())) + ensure.Nil(t, err) + + finStop := make(chan struct{}) + go func() { + defer close(finStop) + ensure.Nil(t, s.Stop()) + }() + + actualBody, err := ioutil.ReadAll(res.Body) + ensure.Nil(t, err) + ensure.DeepEqual(t, actualBody, bytes.Repeat(hello, count)) + ensure.Nil(t, res.Body.Close()) + <-finOkHandler + <-finStop +} + +func TestNewRequestAfterStop(t *testing.T) { + t.Parallel() + const count = 10000 + hello := []byte("hello") + finOkHandler := make(chan struct{}) + unblockOkHandler := make(chan struct{}) + okHandler := func(w http.ResponseWriter, r *http.Request) { + defer close(finOkHandler) + w.WriteHeader(200) + const diff = 500 + for i := 0; i < count-diff; i++ { + w.Write(hello) + } + <-unblockOkHandler + for i := 0; i < diff; i++ { + w.Write(hello) + } + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + listener, onClose := NewOnCloseListener(listener) + ensure.Nil(t, err) + server := &http.Server{Handler: http.HandlerFunc(okHandler)} + transport := &http.Transport{} + client := &http.Client{Transport: transport} + down := &httpdown.HTTP{} + s := down.Serve(server, listener) + res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String())) + ensure.Nil(t, err) + + finStop := make(chan struct{}) + go func() { + defer close(finStop) + ensure.Nil(t, s.Stop()) + }() + + // Wait until the listener is closed. + <-onClose + + // Now the next request should not be able to connect as the listener is + // now closed. + _, err = client.Get(fmt.Sprintf("http://%s/", listener.Addr().String())) + + // We should just get "connection refused" here, but sometimes, very rarely, + // we get a "connection reset" instead. Unclear why this happens. + ensure.Err(t, err, regexp.MustCompile("(connection refused|connection reset by peer)$")) + + // Unblock the handler and ensure we finish writing the rest of the body + // successfully. + close(unblockOkHandler) + actualBody, err := ioutil.ReadAll(res.Body) + ensure.Nil(t, err) + ensure.DeepEqual(t, actualBody, bytes.Repeat(hello, count)) + ensure.Nil(t, res.Body.Close()) + <-finOkHandler + <-finStop +} + +func TestHTTPListenerCloseError(t *testing.T) { + t.Parallel() + expectedError := errors.New("foo") + listener, err := net.Listen("tcp", "127.0.0.1:0") + listener = &closeErrListener{Listener: listener, err: expectedError} + ensure.Nil(t, err) + server := &http.Server{} + down := &httpdown.HTTP{} + s := down.Serve(server, listener) + ensure.DeepEqual(t, s.Stop(), expectedError) +} + +func TestHTTPServeError(t *testing.T) { + t.Parallel() + expectedError := errors.New("foo") + listener, err := net.Listen("tcp", "127.0.0.1:0") + errChan := make(chan error) + listener = &acceptErrListener{Listener: listener, err: errChan} + ensure.Nil(t, err) + server := &http.Server{} + down := &httpdown.HTTP{} + s := down.Serve(server, listener) + errChan <- expectedError + ensure.DeepEqual(t, s.Wait(), expectedError) + ensure.Nil(t, s.Stop()) +} + +func TestHTTPWithinStopTimeout(t *testing.T) { + t.Parallel() + hello := []byte("hello") + finOkHandler := make(chan struct{}) + okHandler := func(w http.ResponseWriter, r *http.Request) { + defer close(finOkHandler) + w.WriteHeader(200) + w.Write(hello) + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + ensure.Nil(t, err) + server := &http.Server{Handler: http.HandlerFunc(okHandler)} + transport := &http.Transport{} + client := &http.Client{Transport: transport} + down := &httpdown.HTTP{StopTimeout: time.Minute} + s := down.Serve(server, listener) + res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String())) + ensure.Nil(t, err) + + finStop := make(chan struct{}) + go func() { + defer close(finStop) + ensure.Nil(t, s.Stop()) + }() + + actualBody, err := ioutil.ReadAll(res.Body) + ensure.Nil(t, err) + ensure.DeepEqual(t, actualBody, hello) + ensure.Nil(t, res.Body.Close()) + <-finOkHandler + <-finStop +} + +func TestHTTPStopTimeoutMissed(t *testing.T) { + t.Parallel() + + klock := clock.NewMock() + + const count = 10000 + hello := []byte("hello") + finOkHandler := make(chan struct{}) + unblockOkHandler := make(chan struct{}) + okHandler := func(w http.ResponseWriter, r *http.Request) { + defer close(finOkHandler) + w.Header().Set("Content-Length", fmt.Sprint(len(hello)*count)) + w.WriteHeader(200) + for i := 0; i < count/2; i++ { + w.Write(hello) + } + <-unblockOkHandler + for i := 0; i < count/2; i++ { + w.Write(hello) + } + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + ensure.Nil(t, err) + server := &http.Server{Handler: http.HandlerFunc(okHandler)} + transport := &http.Transport{} + client := &http.Client{Transport: transport} + down := &httpdown.HTTP{ + StopTimeout: time.Minute, + Clock: klock, + } + s := down.Serve(server, listener) + res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String())) + ensure.Nil(t, err) + + finStop := make(chan struct{}) + go func() { + defer close(finStop) + ensure.Nil(t, s.Stop()) + }() + + klock.Wait(clock.Calls{After: 1}) // wait for Stop to call After + klock.Add(down.StopTimeout) + + _, err = ioutil.ReadAll(res.Body) + ensure.Err(t, err, regexp.MustCompile("^unexpected EOF$")) + ensure.Nil(t, res.Body.Close()) + close(unblockOkHandler) + <-finOkHandler + <-finStop +} + +func TestHTTPKillTimeout(t *testing.T) { + t.Parallel() + + klock := clock.NewMock() + + statsDone := make(chan struct{}, 1) + hc := &stats.HookClient{ + BumpSumHook: func(key string, val float64) { + if key == "kill" && val == 1 { + statsDone <- struct{}{} + } + }, + } + + const count = 10000 + hello := []byte("hello") + finOkHandler := make(chan struct{}) + unblockOkHandler := make(chan struct{}) + okHandler := func(w http.ResponseWriter, r *http.Request) { + defer close(finOkHandler) + w.Header().Set("Content-Length", fmt.Sprint(len(hello)*count)) + w.WriteHeader(200) + for i := 0; i < count/2; i++ { + w.Write(hello) + } + <-unblockOkHandler + for i := 0; i < count/2; i++ { + w.Write(hello) + } + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + ensure.Nil(t, err) + server := &http.Server{Handler: http.HandlerFunc(okHandler)} + transport := &http.Transport{} + client := &http.Client{Transport: transport} + down := &httpdown.HTTP{ + StopTimeout: time.Minute, + KillTimeout: time.Minute, + Stats: hc, + Clock: klock, + } + s := down.Serve(server, listener) + res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String())) + ensure.Nil(t, err) + + finStop := make(chan struct{}) + go func() { + defer close(finStop) + ensure.Nil(t, s.Stop()) + }() + + klock.Wait(clock.Calls{After: 1}) // wait for Stop to call After + klock.Add(down.StopTimeout) + + _, err = ioutil.ReadAll(res.Body) + ensure.Err(t, err, regexp.MustCompile("^unexpected EOF$")) + ensure.Nil(t, res.Body.Close()) + close(unblockOkHandler) + <-finOkHandler + <-finStop + <-statsDone +} + +func TestHTTPKillTimeoutMissed(t *testing.T) { + t.Parallel() + + klock := clock.NewMock() + + statsDone := make(chan struct{}, 1) + hc := &stats.HookClient{ + BumpSumHook: func(key string, val float64) { + if key == "kill.timeout" && val == 1 { + statsDone <- struct{}{} + } + }, + } + + const count = 10000 + hello := []byte("hello") + finOkHandler := make(chan struct{}) + unblockOkHandler := make(chan struct{}) + okHandler := func(w http.ResponseWriter, r *http.Request) { + defer close(finOkHandler) + w.Header().Set("Content-Length", fmt.Sprint(len(hello)*count)) + w.WriteHeader(200) + for i := 0; i < count/2; i++ { + w.Write(hello) + } + <-unblockOkHandler + for i := 0; i < count/2; i++ { + w.Write(hello) + } + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + ensure.Nil(t, err) + unblockConnClose := make(chan chan struct{}, 1) + listener = &closeErrConnListener{ + Listener: listener, + unblockClose: unblockConnClose, + } + + server := &http.Server{Handler: http.HandlerFunc(okHandler)} + transport := &http.Transport{} + client := &http.Client{Transport: transport} + down := &httpdown.HTTP{ + StopTimeout: time.Minute, + KillTimeout: time.Minute, + Stats: hc, + Clock: klock, + } + s := down.Serve(server, listener) + res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String())) + ensure.Nil(t, err) + + // Start the Stop process. + finStop := make(chan struct{}) + go func() { + defer close(finStop) + ensure.Nil(t, s.Stop()) + }() + + klock.Wait(clock.Calls{After: 1}) // wait for Stop to call After + klock.Add(down.StopTimeout) // trigger stop timeout + klock.Wait(clock.Calls{After: 2}) // wait for Kill to call After + klock.Add(down.KillTimeout) // trigger kill timeout + + // We hit both the StopTimeout & the KillTimeout. + <-finStop + + // Then we unblock the Close, so we get an unexpected EOF since we close + // before we finish writing the response. + connCloseDone := make(chan struct{}) + unblockConnClose <- connCloseDone + <-connCloseDone + close(unblockConnClose) + + // Then we unblock the handler which tries to write the rest of the data. + close(unblockOkHandler) + + _, err = ioutil.ReadAll(res.Body) + ensure.Err(t, err, regexp.MustCompile("^unexpected EOF$")) + ensure.Nil(t, res.Body.Close()) + <-finOkHandler + <-statsDone +} + +func TestDoubleStop(t *testing.T) { + t.Parallel() + listener, err := net.Listen("tcp", "127.0.0.1:0") + ensure.Nil(t, err) + server := &http.Server{} + down := &httpdown.HTTP{} + s := down.Serve(server, listener) + ensure.Nil(t, s.Stop()) + ensure.Nil(t, s.Stop()) +} + +func TestExistingConnState(t *testing.T) { + t.Parallel() + hello := []byte("hello") + fin := make(chan struct{}) + okHandler := func(w http.ResponseWriter, r *http.Request) { + defer close(fin) + w.Write(hello) + } + + var called int32 + listener, err := net.Listen("tcp", "127.0.0.1:0") + ensure.Nil(t, err) + server := &http.Server{ + Handler: http.HandlerFunc(okHandler), + ConnState: func(c net.Conn, s http.ConnState) { + atomic.AddInt32(&called, 1) + }, + } + transport := &http.Transport{} + client := &http.Client{Transport: transport} + down := &httpdown.HTTP{} + s := down.Serve(server, listener) + res, err := client.Get(fmt.Sprintf("http://%s/", listener.Addr().String())) + ensure.Nil(t, err) + actualBody, err := ioutil.ReadAll(res.Body) + ensure.Nil(t, err) + ensure.DeepEqual(t, actualBody, hello) + ensure.Nil(t, res.Body.Close()) + + ensure.Nil(t, s.Stop()) + <-fin + + ensure.True(t, atomic.LoadInt32(&called) > 0) +} + +func TestHTTPDefaultListenError(t *testing.T) { + if os.Getuid() == 0 { + t.Skip("cant run this test as root") + } + + statsDone := make(chan struct{}, 1) + hc := &stats.HookClient{ + BumpSumHook: func(key string, val float64) { + if key == "listen.error" && val == 1 { + statsDone <- struct{}{} + } + }, + } + + t.Parallel() + down := &httpdown.HTTP{Stats: hc} + _, err := down.ListenAndServe(&http.Server{}) + ensure.Err(t, err, regexp.MustCompile("listen tcp :80: bind: permission denied")) + <-statsDone +} + +func TestHTTPSDefaultListenError(t *testing.T) { + if os.Getuid() == 0 { + t.Skip("cant run this test as root") + } + t.Parallel() + + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + if err != nil { + t.Fatalf("error loading cert: %v", err) + } + + down := &httpdown.HTTP{} + _, err = down.ListenAndServe(&http.Server{ + TLSConfig: &tls.Config{ + NextProtos: []string{"http/1.1"}, + Certificates: []tls.Certificate{cert}, + }, + }) + ensure.Err(t, err, regexp.MustCompile("listen tcp :443: bind: permission denied")) +} + +func TestTLS(t *testing.T) { + t.Parallel() + port, err := freeport.Get() + ensure.Nil(t, err) + + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + if err != nil { + t.Fatalf("error loading cert: %v", err) + } + const count = 10000 + hello := []byte("hello") + finOkHandler := make(chan struct{}) + okHandler := func(w http.ResponseWriter, r *http.Request) { + defer close(finOkHandler) + w.WriteHeader(200) + for i := 0; i < count; i++ { + w.Write(hello) + } + } + + server := &http.Server{ + Addr: fmt.Sprintf("0.0.0.0:%d", port), + Handler: http.HandlerFunc(okHandler), + TLSConfig: &tls.Config{ + NextProtos: []string{"http/1.1"}, + Certificates: []tls.Certificate{cert}, + }, + } + transport := &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + } + client := &http.Client{Transport: transport} + down := &httpdown.HTTP{} + s, err := down.ListenAndServe(server) + ensure.Nil(t, err) + res, err := client.Get(fmt.Sprintf("https://%s/", server.Addr)) + ensure.Nil(t, err) + + finStop := make(chan struct{}) + go func() { + defer close(finStop) + ensure.Nil(t, s.Stop()) + }() + + actualBody, err := ioutil.ReadAll(res.Body) + ensure.Nil(t, err) + ensure.DeepEqual(t, actualBody, bytes.Repeat(hello, count)) + ensure.Nil(t, res.Body.Close()) + <-finOkHandler + <-finStop +} + +// localhostCert is a PEM-encoded TLS cert with SAN IPs +// "127.0.0.1" and "[::1]", expiring at the last second of 2049 (the end +// of ASN.1 time). +// generated from src/pkg/crypto/tls: +// go run generate_cert.go --rsa-bits 512 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h +var localhostCert = []byte(`-----BEGIN CERTIFICATE----- +MIIBdzCCASOgAwIBAgIBADALBgkqhkiG9w0BAQUwEjEQMA4GA1UEChMHQWNtZSBD +bzAeFw03MDAxMDEwMDAwMDBaFw00OTEyMzEyMzU5NTlaMBIxEDAOBgNVBAoTB0Fj +bWUgQ28wWjALBgkqhkiG9w0BAQEDSwAwSAJBALyCfqwwip8BvTKgVKGdmjZTU8DD +ndR+WALmFPIRqn89bOU3s30olKiqYEju/SFoEvMyFRT/TWEhXHDaufThqaMCAwEA +AaNoMGYwDgYDVR0PAQH/BAQDAgCkMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA8GA1Ud +EwEB/wQFMAMBAf8wLgYDVR0RBCcwJYILZXhhbXBsZS5jb22HBH8AAAGHEAAAAAAA +AAAAAAAAAAAAAAEwCwYJKoZIhvcNAQEFA0EAr/09uy108p51rheIOSnz4zgduyTl +M+4AmRo8/U1twEZLgfAGG/GZjREv2y4mCEUIM3HebCAqlA5jpRg76Rf8jw== +-----END CERTIFICATE-----`) + +// localhostKey is the private key for localhostCert. +var localhostKey = []byte(`-----BEGIN RSA PRIVATE KEY----- +MIIBOQIBAAJBALyCfqwwip8BvTKgVKGdmjZTU8DDndR+WALmFPIRqn89bOU3s30o +lKiqYEju/SFoEvMyFRT/TWEhXHDaufThqaMCAwEAAQJAPXuWUxTV8XyAt8VhNQER +LgzJcUKb9JVsoS1nwXgPksXnPDKnL9ax8VERrdNr+nZbj2Q9cDSXBUovfdtehcdP +qQIhAO48ZsPylbTrmtjDEKiHT2Ik04rLotZYS2U873J6I7WlAiEAypDjYxXyafv/ +Yo1pm9onwcetQKMW8CS3AjuV9Axzj6cCIEx2Il19fEMG4zny0WPlmbrcKvD/DpJQ +4FHrzsYlIVTpAiAas7S1uAvneqd0l02HlN9OxQKKlbUNXNme+rnOnOGS2wIgS0jW +zl1jvrOSJeP1PpAHohWz6LOhEr8uvltWkN6x3vE= +-----END RSA PRIVATE KEY-----`) diff --git a/Godeps/_workspace/src/github.com/facebookgo/httpdown/license b/Godeps/_workspace/src/github.com/facebookgo/httpdown/license new file mode 100644 index 000000000..d849082ff --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/httpdown/license @@ -0,0 +1,30 @@ +BSD License + +For httpdown software + +Copyright (c) 2015, Facebook, Inc. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name Facebook nor the names of its contributors may be used to + endorse or promote products derived from this software without specific + prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Godeps/_workspace/src/github.com/facebookgo/httpdown/patents b/Godeps/_workspace/src/github.com/facebookgo/httpdown/patents new file mode 100644 index 000000000..f7133456a --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/httpdown/patents @@ -0,0 +1,33 @@ +Additional Grant of Patent Rights Version 2 + +"Software" means the httpdown software distributed by Facebook, Inc. + +Facebook, Inc. ("Facebook") hereby grants to each recipient of the Software +("you") a perpetual, worldwide, royalty-free, non-exclusive, irrevocable +(subject to the termination provision below) license under any Necessary +Claims, to make, have made, use, sell, offer to sell, import, and otherwise +transfer the Software. For avoidance of doubt, no license is granted under +Facebook’s rights in any patent claims that are infringed by (i) modifications +to the Software made by you or any third party or (ii) the Software in +combination with any software or other technology. + +The license granted hereunder will terminate, automatically and without notice, +if you (or any of your subsidiaries, corporate affiliates or agents) initiate +directly or indirectly, or take a direct financial interest in, any Patent +Assertion: (i) against Facebook or any of its subsidiaries or corporate +affiliates, (ii) against any party if such Patent Assertion arises in whole or +in part from any software, technology, product or service of Facebook or any of +its subsidiaries or corporate affiliates, or (iii) against any party relating +to the Software. Notwithstanding the foregoing, if Facebook or any of its +subsidiaries or corporate affiliates files a lawsuit alleging patent +infringement against you in the first instance, and you respond by filing a +patent infringement counterclaim in that lawsuit against that party that is +unrelated to the Software, the license granted hereunder will not terminate +under section (i) of this paragraph due to such counterclaim. + +A "Necessary Claim" is a claim of a patent owned by Facebook that is +necessarily infringed by the Software standing alone. + +A "Patent Assertion" is any lawsuit or other action alleging direct, indirect, +or contributory infringement or inducement to infringe any patent, including a +cross-claim or counterclaim. diff --git a/Godeps/_workspace/src/github.com/facebookgo/httpdown/readme.md b/Godeps/_workspace/src/github.com/facebookgo/httpdown/readme.md new file mode 100644 index 000000000..d5fa245db --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/httpdown/readme.md @@ -0,0 +1,41 @@ +httpdown [![Build Status](https://secure.travis-ci.org/facebookgo/httpdown.png)](https://travis-ci.org/facebookgo/httpdown) +======== + +Documentation: https://godoc.org/github.com/facebookgo/httpdown + +Package httpdown provides a library that makes it easy to build a HTTP server +that can be shutdown gracefully (that is, without dropping any connections). + +If you want graceful restart and not just graceful shutdown, look at the +[grace](https://github.com/facebookgo/grace) package which uses this package +underneath but also provides graceful restart. + +Usage +----- + +Demo HTTP Server with graceful termination: +https://github.com/facebookgo/httpdown/blob/master/httpdown_example/main.go + +1. Install the demo application + + go get github.com/facebookgo/httpdown/httpdown_example + +1. Start it in the first terminal + + httpdown_example + + This will output something like: + + 2014/11/18 21:57:50 serving on http://127.0.0.1:8080/ with pid 17 + +1. In a second terminal start a slow HTTP request + + curl 'http://localhost:8080/?duration=20s' + +1. In a third terminal trigger a graceful shutdown (using the pid from your output): + + kill -TERM 17 + +This will demonstrate that the slow request was served before the server was +shutdown. You could also have used `Ctrl-C` instead of `kill` as the example +application triggers graceful shutdown on TERM or INT signals. diff --git a/Godeps/_workspace/src/github.com/facebookgo/stats/.travis.yml b/Godeps/_workspace/src/github.com/facebookgo/stats/.travis.yml new file mode 100644 index 000000000..2cc62c5e8 --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/stats/.travis.yml @@ -0,0 +1,24 @@ +language: go + +go: + - 1.2 + - 1.3 + +matrix: + fast_finish: true + +before_install: + - go get -v code.google.com/p/go.tools/cmd/vet + - go get -v github.com/golang/lint/golint + - go get -v code.google.com/p/go.tools/cmd/cover + +install: + - go install -race -v std + - go get -race -t -v ./... + - go install -race -v ./... + +script: + - go vet ./... + - $HOME/gopath/bin/golint . + - go test -cpu=2 -race -v ./... + - go test -cpu=2 -covermode=atomic ./... diff --git a/Godeps/_workspace/src/github.com/facebookgo/stats/license b/Godeps/_workspace/src/github.com/facebookgo/stats/license new file mode 100644 index 000000000..feae87075 --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/stats/license @@ -0,0 +1,30 @@ +BSD License + +For stats software + +Copyright (c) 2015, Facebook, Inc. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name Facebook nor the names of its contributors may be used to + endorse or promote products derived from this software without specific + prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Godeps/_workspace/src/github.com/facebookgo/stats/patents b/Godeps/_workspace/src/github.com/facebookgo/stats/patents new file mode 100644 index 000000000..5d7617212 --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/stats/patents @@ -0,0 +1,33 @@ +Additional Grant of Patent Rights Version 2 + +"Software" means the stats software distributed by Facebook, Inc. + +Facebook, Inc. ("Facebook") hereby grants to each recipient of the Software +("you") a perpetual, worldwide, royalty-free, non-exclusive, irrevocable +(subject to the termination provision below) license under any Necessary +Claims, to make, have made, use, sell, offer to sell, import, and otherwise +transfer the Software. For avoidance of doubt, no license is granted under +Facebook’s rights in any patent claims that are infringed by (i) modifications +to the Software made by you or any third party or (ii) the Software in +combination with any software or other technology. + +The license granted hereunder will terminate, automatically and without notice, +if you (or any of your subsidiaries, corporate affiliates or agents) initiate +directly or indirectly, or take a direct financial interest in, any Patent +Assertion: (i) against Facebook or any of its subsidiaries or corporate +affiliates, (ii) against any party if such Patent Assertion arises in whole or +in part from any software, technology, product or service of Facebook or any of +its subsidiaries or corporate affiliates, or (iii) against any party relating +to the Software. Notwithstanding the foregoing, if Facebook or any of its +subsidiaries or corporate affiliates files a lawsuit alleging patent +infringement against you in the first instance, and you respond by filing a +patent infringement counterclaim in that lawsuit against that party that is +unrelated to the Software, the license granted hereunder will not terminate +under section (i) of this paragraph due to such counterclaim. + +A "Necessary Claim" is a claim of a patent owned by Facebook that is +necessarily infringed by the Software standing alone. + +A "Patent Assertion" is any lawsuit or other action alleging direct, indirect, +or contributory infringement or inducement to infringe any patent, including a +cross-claim or counterclaim. diff --git a/Godeps/_workspace/src/github.com/facebookgo/stats/readme.md b/Godeps/_workspace/src/github.com/facebookgo/stats/readme.md new file mode 100644 index 000000000..f268ed307 --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/stats/readme.md @@ -0,0 +1,4 @@ +stats [![Build Status](https://secure.travis-ci.org/facebookgo/stats.png)](https://travis-ci.org/facebookgo/stats) +===== + +Documentation: https://godoc.org/github.com/facebookgo/stats diff --git a/Godeps/_workspace/src/github.com/facebookgo/stats/stats.go b/Godeps/_workspace/src/github.com/facebookgo/stats/stats.go new file mode 100644 index 000000000..b833506aa --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/stats/stats.go @@ -0,0 +1,166 @@ +// Package stats defines a lightweight interface for collecting statistics. It +// doesn't provide an implementation, just the shared interface. +package stats + +// Client provides methods to collection statistics. +type Client interface { + // BumpAvg bumps the average for the given key. + BumpAvg(key string, val float64) + + // BumpSum bumps the sum for the given key. + BumpSum(key string, val float64) + + // BumpHistogram bumps the histogram for the given key. + BumpHistogram(key string, val float64) + + // BumpTime is a special version of BumpHistogram which is specialized for + // timers. Calling it starts the timer, and it returns a value on which End() + // can be called to indicate finishing the timer. A convenient way of + // recording the duration of a function is calling it like such at the top of + // the function: + // + // defer s.BumpTime("my.function").End() + BumpTime(key string) interface { + End() + } +} + +// PrefixClient adds multiple keys for the same value, with each prefix +// added to the key and calls the underlying client. +func PrefixClient(prefixes []string, client Client) Client { + return &prefixClient{ + Prefixes: prefixes, + Client: client, + } +} + +type prefixClient struct { + Prefixes []string + Client Client +} + +func (p *prefixClient) BumpAvg(key string, val float64) { + for _, prefix := range p.Prefixes { + p.Client.BumpAvg(prefix+key, val) + } +} + +func (p *prefixClient) BumpSum(key string, val float64) { + for _, prefix := range p.Prefixes { + p.Client.BumpSum(prefix+key, val) + } +} + +func (p *prefixClient) BumpHistogram(key string, val float64) { + for _, prefix := range p.Prefixes { + p.Client.BumpHistogram(prefix+key, val) + } +} + +func (p *prefixClient) BumpTime(key string) interface { + End() +} { + var m multiEnder + for _, prefix := range p.Prefixes { + m = append(m, p.Client.BumpTime(prefix+key)) + } + return m +} + +// multiEnder combines many enders together. +type multiEnder []interface { + End() +} + +func (m multiEnder) End() { + for _, e := range m { + e.End() + } +} + +// HookClient is useful for testing. It provides optional hooks for each +// expected method in the interface, which if provided will be called. If a +// hook is not provided, it will be ignored. +type HookClient struct { + BumpAvgHook func(key string, val float64) + BumpSumHook func(key string, val float64) + BumpHistogramHook func(key string, val float64) + BumpTimeHook func(key string) interface { + End() + } +} + +// BumpAvg will call BumpAvgHook if defined. +func (c *HookClient) BumpAvg(key string, val float64) { + if c.BumpAvgHook != nil { + c.BumpAvgHook(key, val) + } +} + +// BumpSum will call BumpSumHook if defined. +func (c *HookClient) BumpSum(key string, val float64) { + if c.BumpSumHook != nil { + c.BumpSumHook(key, val) + } +} + +// BumpHistogram will call BumpHistogramHook if defined. +func (c *HookClient) BumpHistogram(key string, val float64) { + if c.BumpHistogramHook != nil { + c.BumpHistogramHook(key, val) + } +} + +// BumpTime will call BumpTimeHook if defined. +func (c *HookClient) BumpTime(key string) interface { + End() +} { + if c.BumpTimeHook != nil { + return c.BumpTimeHook(key) + } + return NoOpEnd +} + +type noOpEnd struct{} + +func (n noOpEnd) End() {} + +// NoOpEnd provides a dummy value for use in tests as valid return value for +// BumpTime(). +var NoOpEnd = noOpEnd{} + +// BumpAvg calls BumpAvg on the Client if it isn't nil. This is useful when a +// component has an optional stats.Client. +func BumpAvg(c Client, key string, val float64) { + if c != nil { + c.BumpAvg(key, val) + } +} + +// BumpSum calls BumpSum on the Client if it isn't nil. This is useful when a +// component has an optional stats.Client. +func BumpSum(c Client, key string, val float64) { + if c != nil { + c.BumpSum(key, val) + } +} + +// BumpHistogram calls BumpHistogram on the Client if it isn't nil. This is +// useful when a component has an optional stats.Client. +func BumpHistogram(c Client, key string, val float64) { + if c != nil { + c.BumpHistogram(key, val) + } +} + +// BumpTime calls BumpTime on the Client if it isn't nil. If the Client is nil +// it still returns a valid return value which will be a no-op. This is useful +// when a component has an optional stats.Client. +func BumpTime(c Client, key string) interface { + End() +} { + if c != nil { + return c.BumpTime(key) + } + return NoOpEnd +} diff --git a/Godeps/_workspace/src/github.com/facebookgo/stats/stats_test.go b/Godeps/_workspace/src/github.com/facebookgo/stats/stats_test.go new file mode 100644 index 000000000..b53df71b1 --- /dev/null +++ b/Godeps/_workspace/src/github.com/facebookgo/stats/stats_test.go @@ -0,0 +1,77 @@ +package stats_test + +import ( + "testing" + + "github.com/facebookgo/ensure" + "github.com/facebookgo/stats" +) + +// Ensure calling End works even when a BumpTimeHook isn't provided. +func TestHookClientBumpTime(t *testing.T) { + (&stats.HookClient{}).BumpTime("foo").End() +} + +func TestPrefixClient(t *testing.T) { + const ( + prefix1 = "prefix1" + prefix2 = "prefix2" + avgKey = "avg" + avgVal = float64(1) + sumKey = "sum" + sumVal = float64(2) + histogramKey = "histogram" + histogramVal = float64(3) + timeKey = "time" + ) + + var keys []string + hc := &stats.HookClient{ + BumpAvgHook: func(key string, val float64) { + keys = append(keys, key) + ensure.DeepEqual(t, val, avgVal) + }, + BumpSumHook: func(key string, val float64) { + keys = append(keys, key) + ensure.DeepEqual(t, val, sumVal) + }, + BumpHistogramHook: func(key string, val float64) { + keys = append(keys, key) + ensure.DeepEqual(t, val, histogramVal) + }, + BumpTimeHook: func(key string) interface { + End() + } { + return multiEnderTest{ + EndHook: func() { + keys = append(keys, key) + }, + } + }, + } + + pc := stats.PrefixClient([]string{prefix1, prefix2}, hc) + pc.BumpAvg(avgKey, avgVal) + pc.BumpSum(sumKey, sumVal) + pc.BumpHistogram(histogramKey, histogramVal) + pc.BumpTime(timeKey).End() + + ensure.SameElements(t, keys, []string{ + prefix1 + avgKey, + prefix1 + sumKey, + prefix1 + histogramKey, + prefix1 + timeKey, + prefix2 + avgKey, + prefix2 + sumKey, + prefix2 + histogramKey, + prefix2 + timeKey, + }) +} + +type multiEnderTest struct { + EndHook func() +} + +func (e multiEnderTest) End() { + e.EndHook() +} diff --git a/commands.go b/commands.go index 73f0f4912..da07fb6b5 100644 --- a/commands.go +++ b/commands.go @@ -2,9 +2,7 @@ package main import ( "os" - "os/signal" "os/user" - "syscall" "github.com/minio/cli" "github.com/minio/minio/pkg/controller" @@ -70,32 +68,13 @@ func getServerConfig(c *cli.Context) api.Config { } } -func trapServer(doneCh chan struct{}) { - // Go signal notification works by sending `os.Signal` - // values on a channel. - sigs := make(chan os.Signal, 1) - - // `signal.Notify` registers the given channel to - // receive notifications of the specified signals. - signal.Notify(sigs, syscall.SIGHUP, syscall.SIGUSR2) - - // This executes a blocking receive for signals. - // When it gets one it'll then notify the program - // that it can finish. - <-sigs - doneCh <- struct{}{} -} - func runServer(c *cli.Context) { _, err := user.Current() if err != nil { Fatalf("Unable to determine current user. Reason: %s\n", err) } - doneCh := make(chan struct{}) - go trapServer(doneCh) - apiServerConfig := getServerConfig(c) - err = server.StartServices(apiServerConfig, doneCh) + err = server.StartServices(apiServerConfig) if err != nil { Fatalln(err) } diff --git a/pkg/donut/donut-v2.go b/pkg/donut/donut-v2.go index 40da107d5..a9687a4a9 100644 --- a/pkg/donut/donut-v2.go +++ b/pkg/donut/donut-v2.go @@ -25,7 +25,6 @@ import ( "io" "io/ioutil" "log" - "reflect" "runtime/debug" "sort" "strconv" @@ -124,29 +123,10 @@ func New() (Interface, error) { return a, nil } -// updateConfig loads new config everytime -func (donut API) updateConfig() { - // on error loading config's just return do not modify - conf, err := LoadConfig() - if err != nil { - return - } - if reflect.DeepEqual(donut.config, conf) { - return - } - if conf.MaxSize == donut.config.MaxSize { - return - } - donut.config = conf - donut.objects.SetMaxSize(conf.MaxSize) -} - // GetObject - GET object from cache buffer func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, error) { donut.lock.Lock() defer donut.lock.Unlock() - // update Config if possible - donut.updateConfig() if !IsValidBucket(bucket) { return 0, iodine.New(BucketNameInvalid{Bucket: bucket}, nil) @@ -193,8 +173,6 @@ func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, er func (donut API) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) { donut.lock.Lock() defer donut.lock.Unlock() - // update Config if possible - donut.updateConfig() errParams := map[string]string{ "bucket": bucket, @@ -251,8 +229,6 @@ func (donut API) GetPartialObject(w io.Writer, bucket, object string, start, len func (donut API) GetBucketMetadata(bucket string) (BucketMetadata, error) { donut.lock.Lock() defer donut.lock.Unlock() - // update Config if possible - donut.updateConfig() if !IsValidBucket(bucket) { return BucketMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil) @@ -276,8 +252,6 @@ func (donut API) GetBucketMetadata(bucket string) (BucketMetadata, error) { func (donut API) SetBucketMetadata(bucket string, metadata map[string]string) error { donut.lock.Lock() defer donut.lock.Unlock() - // update Config if possible - donut.updateConfig() if !IsValidBucket(bucket) { return iodine.New(BucketNameInvalid{Bucket: bucket}, nil) @@ -319,8 +293,6 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error { func (donut API) CreateObject(bucket, key, expectedMD5Sum string, size int64, data io.Reader, metadata map[string]string) (ObjectMetadata, error) { donut.lock.Lock() defer donut.lock.Unlock() - // update Config if possible - donut.updateConfig() contentType := metadata["contentType"] objectMetadata, err := donut.createObject(bucket, key, contentType, expectedMD5Sum, size, data) @@ -434,8 +406,6 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s func (donut API) MakeBucket(bucketName, acl string) error { donut.lock.Lock() defer donut.lock.Unlock() - // update Config if possible - donut.updateConfig() if donut.storedBuckets.Stats().Items == totalBuckets { return iodine.New(TooManyBuckets{Bucket: bucketName}, nil) @@ -475,8 +445,6 @@ func (donut API) MakeBucket(bucketName, acl string) error { func (donut API) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error) { donut.lock.Lock() defer donut.lock.Unlock() - // update Config if possible - donut.updateConfig() if !IsValidBucket(bucket) { return nil, BucketResourcesMetadata{IsTruncated: false}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil) @@ -571,8 +539,6 @@ func (b byBucketName) Less(i, j int) bool { return b[i].Name < b[j].Name } func (donut API) ListBuckets() ([]BucketMetadata, error) { donut.lock.Lock() defer donut.lock.Unlock() - // update Config if possible - donut.updateConfig() var results []BucketMetadata if len(donut.config.NodeDiskMap) > 0 { @@ -597,8 +563,6 @@ func (donut API) ListBuckets() ([]BucketMetadata, error) { func (donut API) GetObjectMetadata(bucket, key string) (ObjectMetadata, error) { donut.lock.Lock() defer donut.lock.Unlock() - // update Config if possible - donut.updateConfig() // check if bucket exists if !IsValidBucket(bucket) { diff --git a/pkg/donut/donut-v2_test.go b/pkg/donut/donut-v2_test.go index 64a853374..4bf7a40f3 100644 --- a/pkg/donut/donut-v2_test.go +++ b/pkg/donut/donut-v2_test.go @@ -45,7 +45,6 @@ func (s *MyCacheSuite) SetUpSuite(c *C) { s.root = root customConfigPath = filepath.Join(root, "donut.json") - var err error dc, err = New() c.Assert(err, IsNil) @@ -55,7 +54,7 @@ func (s *MyCacheSuite) SetUpSuite(c *C) { c.Assert(len(buckets), Equals, 0) } -func (s *MyDonutSuite) TearDownSuite(c *C) { +func (s *MyCacheSuite) TearDownSuite(c *C) { os.RemoveAll(s.root) } diff --git a/pkg/donut/multipart.go b/pkg/donut/multipart.go index cbe292f84..78269ef03 100644 --- a/pkg/donut/multipart.go +++ b/pkg/donut/multipart.go @@ -38,8 +38,6 @@ import ( func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, error) { donut.lock.Lock() defer donut.lock.Unlock() - // update Config if possible - donut.updateConfig() if !IsValidBucket(bucket) { return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil) @@ -72,8 +70,6 @@ func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, er func (donut API) AbortMultipartUpload(bucket, key, uploadID string) error { donut.lock.Lock() defer donut.lock.Unlock() - // update Config if possible - donut.updateConfig() if !IsValidBucket(bucket) { return iodine.New(BucketNameInvalid{Bucket: bucket}, nil) @@ -93,8 +89,6 @@ func (donut API) AbortMultipartUpload(bucket, key, uploadID string) error { func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { donut.lock.Lock() defer donut.lock.Unlock() - // update Config if possible - donut.updateConfig() etag, err := donut.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data) // possible free @@ -201,8 +195,6 @@ func (donut API) cleanupMultipartSession(bucket, key, uploadID string) { // CompleteMultipartUpload - complete a multipart upload and persist the data func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (ObjectMetadata, error) { donut.lock.Lock() - // update Config if possible - donut.updateConfig() if !IsValidBucket(bucket) { donut.lock.Unlock() @@ -282,8 +274,6 @@ func (donut API) ListMultipartUploads(bucket string, resources BucketMultipartRe // TODO handle delimiter donut.lock.Lock() defer donut.lock.Unlock() - // update Config if possible - donut.updateConfig() if !donut.storedBuckets.Exists(bucket) { return BucketMultipartResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil) @@ -347,8 +337,6 @@ func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMe // Verify upload id donut.lock.Lock() defer donut.lock.Unlock() - // update Config if possible - donut.updateConfig() if !donut.storedBuckets.Exists(bucket) { return ObjectResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil) diff --git a/pkg/server/nimble/LICENSE.Facebook b/pkg/server/nimble/LICENSE.Facebook new file mode 100644 index 000000000..3aea87532 --- /dev/null +++ b/pkg/server/nimble/LICENSE.Facebook @@ -0,0 +1,30 @@ +BSD License + +For grace software + +Copyright (c) 2015, Facebook, Inc. All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name Facebook nor the names of its contributors may be used to + endorse or promote products derived from this software without specific + prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/pkg/server/nimble/LICENSE.Minio b/pkg/server/nimble/LICENSE.Minio new file mode 100644 index 000000000..d64569567 --- /dev/null +++ b/pkg/server/nimble/LICENSE.Minio @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/pkg/server/nimble/http.go b/pkg/server/nimble/http.go index 2b897f872..d1b72f030 100644 --- a/pkg/server/nimble/http.go +++ b/pkg/server/nimble/http.go @@ -1,6 +1,8 @@ -// Based on https://github.com/facebookgo/grace and https://github.com/facebookgo/httpdown +// Package nimble provides easy to use graceful restart for a set of HTTP services // -// Modified for Minio's internal use +// This package originally from https://github.com/facebookgo/grace +// +// Re-licensing with Apache License 2.0, with code modifications package nimble import ( @@ -16,6 +18,11 @@ import ( "github.com/minio/minio/pkg/iodine" ) +var ( + inheritedListeners = os.Getenv("LISTEN_FDS") + ppid = os.Getppid() +) + // An app contains one or more servers and associated configuration. type app struct { servers []*http.Server @@ -25,7 +32,7 @@ type app struct { errors chan error } -func newApp(servers ...*http.Server) *app { +func newApp(servers []*http.Server) *app { return &app{ servers: servers, net: &nimbleNet{}, @@ -84,7 +91,7 @@ func (a *app) term(wg *sync.WaitGroup) { func (a *app) signalHandler(wg *sync.WaitGroup) { ch := make(chan os.Signal, 10) - signal.Notify(ch, syscall.SIGTERM, os.Interrupt) + signal.Notify(ch, syscall.SIGTERM, syscall.SIGUSR2, syscall.SIGHUP) for { sig := <-ch switch sig { @@ -94,7 +101,9 @@ func (a *app) signalHandler(wg *sync.WaitGroup) { signal.Stop(ch) a.term(wg) return - case os.Interrupt: + case syscall.SIGUSR2: + fallthrough + case syscall.SIGHUP: // we only return here if there's an error, otherwise the new process // will send us a TERM when it's ready to trigger the actual shutdown. if _, err := a.net.StartProcess(); err != nil { @@ -105,9 +114,9 @@ func (a *app) signalHandler(wg *sync.WaitGroup) { } // ListenAndServe will serve the given http.Servers and will monitor for signals -// allowing for graceful termination (SIGTERM) or restart (SIGHUP). +// allowing for graceful termination (SIGTERM) or restart (SIGUSR2/SIGHUP). func ListenAndServe(servers ...*http.Server) error { - a := newApp(servers...) + a := newApp(servers) // Acquire Listeners if err := a.listen(); err != nil { @@ -117,13 +126,17 @@ func ListenAndServe(servers ...*http.Server) error { // Start serving. a.serve() - waitDone := make(chan struct{}) + // Close the parent if we inherited and it wasn't init that started us. + if inheritedListeners != "" && ppid != 1 { + if err := syscall.Kill(ppid, syscall.SIGTERM); err != nil { + return iodine.New(err, nil) + } + } + + waitdone := make(chan struct{}) go func() { - defer close(waitDone) + defer close(waitdone) a.wait() - // do not use closing a channel as a way of communicating over - // channel send an appropriate message - waitDone <- struct{}{} }() select { @@ -132,7 +145,7 @@ func ListenAndServe(servers ...*http.Server) error { panic("unexpected nil error") } return iodine.New(err, nil) - case <-waitDone: + case <-waitdone: return nil } } diff --git a/pkg/server/nimble/net.go b/pkg/server/nimble/net.go index bc5421916..b360b85b0 100644 --- a/pkg/server/nimble/net.go +++ b/pkg/server/nimble/net.go @@ -1,11 +1,3 @@ -// Package gracenet provides a family of Listen functions that either open a -// fresh connection or provide an inherited connection from when the process -// was started. The behave like their counterparts in the net pacakge, but -// transparently provide support for graceful restarts without dropping -// connections. This is provided in a systemd socket activation compatible form -// to allow using socket activation. -// -// BUG: Doesn't handle closing of listeners. package nimble import ( @@ -20,9 +12,21 @@ import ( "github.com/minio/minio/pkg/iodine" ) +// This package originally from https://github.com/facebookgo/grace +// +// Re-licensing with Apache License 2.0, with code modifications + +// This package provides a family of Listen functions that either open a +// fresh connection or provide an inherited connection from when the process +// was started. This behaves like their counterparts in the net pacakge, but +// transparently provide support for graceful restarts without dropping +// connections. This is provided in a systemd socket activation compatible form +// to allow using socket activation. +// + const ( // Used to indicate a graceful restart in the new process. - envCountKey = "LISTEN_FDS" + envCountKey = "LISTEN_FDS" // similar to systemd SDS_LISTEN_FDS envCountKeyPrefix = envCountKey + "=" ) @@ -30,8 +34,8 @@ const ( // it at startup. var originalWD, _ = os.Getwd() -// Net provides the family of Listen functions and maintains the associated -// state. Typically you will have only once instance of Net per application. +// nimbleNet provides the family of Listen functions and maintains the associated +// state. Typically you will have only once instance of nimbleNet per application. type nimbleNet struct { inherited []net.Listener active []net.Listener @@ -39,11 +43,6 @@ type nimbleNet struct { inheritOnce sync.Once } -// fileListener using this to extract underlying file descriptor from net.Listener -type fileListener interface { - File() (*os.File, error) -} - func (n *nimbleNet) inherit() error { var retErr error n.inheritOnce.Do(func() { @@ -59,7 +58,6 @@ func (n *nimbleNet) inherit() error { return } - // In normal operations if we are inheriting, the listeners will begin at fd 3. fdStart := 3 for i := fdStart; i < fdStart+count; i++ { file := os.NewFile(uintptr(i), "listener") @@ -139,7 +137,7 @@ func (n *nimbleNet) ListenTCP(nett string, laddr *net.TCPAddr) (*net.TCPListener // the matching network and address, or creates a new one using net.ListenUnix. func (n *nimbleNet) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListener, error) { if err := n.inherit(); err != nil { - return nil, err + return nil, iodine.New(err, nil) } n.mutex.Lock() @@ -170,28 +168,30 @@ func (n *nimbleNet) ListenUnix(nett string, laddr *net.UnixAddr) (*net.UnixListe func (n *nimbleNet) activeListeners() ([]net.Listener, error) { n.mutex.Lock() defer n.mutex.Unlock() - listeners := make([]net.Listener, len(n.active)) - copy(listeners, n.active) - return listeners, nil + ls := make([]net.Listener, len(n.active)) + copy(ls, n.active) + return ls, nil } func isSameAddr(a1, a2 net.Addr) bool { if a1.Network() != a2.Network() { return false } - if a1.String() == a2.String() { + a1s := a1.String() + a2s := a2.String() + if a1s == a2s { return true } + // This allows for ipv6 vs ipv4 local addresses to compare as equal. This // scenario is common when listening on localhost. - a1host, a1port, _ := net.SplitHostPort(a1.String()) - a2host, a2port, _ := net.SplitHostPort(a2.String()) - if a1host == a2host { - if a1port == a2port { - return true - } - } - return false + const ipv6prefix = "[::]" + a1s = strings.TrimPrefix(a1s, ipv6prefix) + a2s = strings.TrimPrefix(a2s, ipv6prefix) + const ipv4prefix = "0.0.0.0" + a1s = strings.TrimPrefix(a1s, ipv4prefix) + a2s = strings.TrimPrefix(a2s, ipv4prefix) + return a1s == a2s } // StartProcess starts a new process passing it the active listeners. It @@ -231,18 +231,18 @@ func (n *nimbleNet) StartProcess() (int, error) { } env = append(env, fmt.Sprintf("%s%d", envCountKeyPrefix, len(listeners))) - inheritedFiles := append([]*os.File{os.Stdin, os.Stdout, os.Stderr}, files...) - process, err := os.StartProcess( - argv0, - os.Args, - &os.ProcAttr{ - Dir: originalWD, - Env: env, - Files: inheritedFiles, - }, - ) + allFiles := append([]*os.File{os.Stdin, os.Stdout, os.Stderr}, files...) + process, err := os.StartProcess(argv0, os.Args, &os.ProcAttr{ + Dir: originalWD, + Env: env, + Files: allFiles, + }) if err != nil { return 0, iodine.New(err, nil) } return process.Pid, nil } + +type fileListener interface { + File() (*os.File, error) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 0d068f3b9..692129fe4 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -21,6 +21,7 @@ import ( "fmt" "net" "net/http" + "os" "strings" "github.com/minio/minio/pkg/iodine" @@ -28,12 +29,6 @@ import ( "github.com/minio/minio/pkg/server/nimble" ) -// startServices start all services -func startServices(errCh chan error, servers ...*http.Server) { - defer close(errCh) - errCh <- nimble.ListenAndServe(servers...) -} - // getAPI server instance func getAPIServer(conf api.Config, apiHandler http.Handler) (*http.Server, error) { // Minio server config @@ -83,9 +78,9 @@ func getAPIServer(conf api.Config, apiHandler http.Handler) (*http.Server, error for _, host := range hosts { if conf.TLS { - fmt.Printf("Starting minio server on: https://%s:%s\n", host, port) + fmt.Printf("Starting minio server on: https://%s:%s, PID: %d\n", host, port, os.Getpid()) } else { - fmt.Printf("Starting minio server on: http://%s:%s\n", host, port) + fmt.Printf("Starting minio server on: http://%s:%s, PID: %d\n", host, port, os.Getpid()) } } @@ -113,21 +108,18 @@ func startTM(a api.Minio) { } // StartServices starts basic services for a server -func StartServices(conf api.Config, doneCh chan struct{}) error { - errCh := make(chan error) +func StartServices(conf api.Config) error { apiHandler, minioAPI := getAPIHandler(conf) apiServer, err := getAPIServer(conf, apiHandler) if err != nil { return iodine.New(err, nil) } rpcServer := getRPCServer(getRPCHandler()) - go startServices(errCh, apiServer, rpcServer) + // start ticket master go startTM(minioAPI) - select { - case err := <-errCh: + if err := nimble.ListenAndServe(apiServer, rpcServer); err != nil { return iodine.New(err, nil) - case <-doneCh: - return nil } + return nil }