From c850905e431610fb0b1aba0490a51d4b5342ad1e Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Thu, 9 Jul 2020 07:02:18 -0700 Subject: [PATCH] fix: threadwalk lockup under high load (#9992) Main issue is that `t.pool[params]` should be `t.pool[oldest]`. We add a bit more safety features for the code. * Make writes to the endTimerCh non-blocking in all cases so multiple releases cannot lock up. * Double check expectations. * Shift down deletes with copy instead of truncating slice. * Actually delete the oldest if we are above total limit. * Actually delete the oldest found and not the current. * Unexport the mutex so nobody from the outside can meddle with it. --- cmd/tree-walk-pool.go | 48 ++++++++++++++++++++++++-------------- cmd/tree-walk-pool_test.go | 16 ++++++------- 2 files changed, 38 insertions(+), 26 deletions(-) diff --git a/cmd/tree-walk-pool.go b/cmd/tree-walk-pool.go index 54e568304..3eda45a02 100644 --- a/cmd/tree-walk-pool.go +++ b/cmd/tree-walk-pool.go @@ -58,7 +58,7 @@ type treeWalk struct { // treeWalkPool's purpose is to maintain active treeWalk go-routines in a map so that // it can be looked up across related list calls. type TreeWalkPool struct { - sync.Mutex + mu sync.Mutex pool map[listParams][]treeWalk timeOut time.Duration } @@ -77,8 +77,8 @@ func NewTreeWalkPool(timeout time.Duration) *TreeWalkPool { // channel. // Returns nil if listParams does not have an associated treeWalk. func (t *TreeWalkPool) Release(params listParams) (resultCh chan TreeWalkResult, endWalkCh chan struct{}) { - t.Lock() - defer t.Unlock() + t.mu.Lock() + defer t.mu.Unlock() walks, ok := t.pool[params] // Pick the valid walks. if ok { if len(walks) > 0 { @@ -108,9 +108,8 @@ func (t *TreeWalkPool) Release(params listParams) (resultCh chan TreeWalkResult, // During listing the timer should not timeout and end the treeWalk go-routine, hence the // timer go-routine should be ended. func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endWalkCh chan struct{}) { - t.Lock() - defer t.Unlock() - + t.mu.Lock() + defer t.mu.Unlock() // If we are above the limit delete at least one entry from the pool. if len(t.pool) > treeWalkEntryLimit { age := time.Now() @@ -121,21 +120,31 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW continue } // The first element is the oldest, so we only check that. - if v[0].added.Before(age) { + e := v[0] + if e.added.Before(age) { oldest = k + age = e.added } } // Invalidate and delete oldest. - if walks, ok := t.pool[oldest]; ok { - walk := walks[0] - walks[0] = treeWalk{} // clear references. - walks = walks[1:] - if len(walks) > 0 { - t.pool[params] = walks + if walks, ok := t.pool[oldest]; ok && len(walks) > 0 { + endCh := walks[0].endTimerCh + if len(walks) > 1 { + // Move walks forward + copy(walks, walks[1:]) + walks = walks[:len(walks)-1] + t.pool[oldest] = walks } else { - delete(t.pool, params) + // Only entry, just delete. + delete(t.pool, oldest) } - walk.endTimerCh <- struct{}{} + select { + case endCh <- struct{}{}: + default: + } + } else { + // Shouldn't happen, but just in case. + delete(t.pool, oldest) } } @@ -153,7 +162,10 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW t.pool[params] = append(walks, walkInfo) } else { // We are at limit, invalidate oldest, move list down and add new as last. - walks[0].endTimerCh <- struct{}{} + select { + case walks[0].endTimerCh <- struct{}{}: + default: + } copy(walks, walks[1:]) walks[len(walks)-1] = walkInfo } @@ -165,7 +177,8 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW case <-time.After(t.timeOut): // Timeout has expired. Remove the treeWalk from treeWalkPool and // end the treeWalk go-routine. - t.Lock() + t.mu.Lock() + defer t.mu.Unlock() walks, ok := t.pool[params] if ok { // Trick of filtering without allocating @@ -189,7 +202,6 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW } // Signal the treeWalk go-routine to die. close(endWalkCh) - t.Unlock() case <-endTimerCh: return } diff --git a/cmd/tree-walk-pool_test.go b/cmd/tree-walk-pool_test.go index 313383d1a..8d30861d1 100644 --- a/cmd/tree-walk-pool_test.go +++ b/cmd/tree-walk-pool_test.go @@ -80,22 +80,22 @@ func TestManyWalksSameParam(t *testing.T) { tw.Set(params, resultCh, endWalkCh) } - tw.Lock() + tw.mu.Lock() if walks, ok := tw.pool[params]; ok { if len(walks) != treeWalkSameEntryLimit { t.Error("There aren't as many walks as were Set") } } - tw.Unlock() + tw.mu.Unlock() for i := 0; i < treeWalkSameEntryLimit; i++ { - tw.Lock() + tw.mu.Lock() if walks, ok := tw.pool[params]; ok { // Before ith Release we should have n-i treeWalk go-routines. if treeWalkSameEntryLimit-i != len(walks) { t.Error("There aren't as many walks as were Set") } } - tw.Unlock() + tw.mu.Unlock() tw.Release(params) } } @@ -125,22 +125,22 @@ func TestManyWalksSameParamPrune(t *testing.T) { tw.Set(params, resultCh, endWalkCh) } - tw.Lock() + tw.mu.Lock() if walks, ok := tw.pool[params]; ok { if len(walks) != treeWalkSameEntryLimit { t.Error("There aren't as many walks as were Set") } } - tw.Unlock() + tw.mu.Unlock() for i := 0; i < treeWalkSameEntryLimit; i++ { - tw.Lock() + tw.mu.Lock() if walks, ok := tw.pool[params]; ok { // Before ith Release we should have n-i treeWalk go-routines. if treeWalkSameEntryLimit-i != len(walks) { t.Error("There aren't as many walks as were Set") } } - tw.Unlock() + tw.mu.Unlock() tw.Release(params) } }