diff --git a/cmd/tree-walk-pool.go b/cmd/tree-walk-pool.go index 54e568304..3eda45a02 100644 --- a/cmd/tree-walk-pool.go +++ b/cmd/tree-walk-pool.go @@ -58,7 +58,7 @@ type treeWalk struct { // treeWalkPool's purpose is to maintain active treeWalk go-routines in a map so that // it can be looked up across related list calls. type TreeWalkPool struct { - sync.Mutex + mu sync.Mutex pool map[listParams][]treeWalk timeOut time.Duration } @@ -77,8 +77,8 @@ func NewTreeWalkPool(timeout time.Duration) *TreeWalkPool { // channel. // Returns nil if listParams does not have an associated treeWalk. func (t *TreeWalkPool) Release(params listParams) (resultCh chan TreeWalkResult, endWalkCh chan struct{}) { - t.Lock() - defer t.Unlock() + t.mu.Lock() + defer t.mu.Unlock() walks, ok := t.pool[params] // Pick the valid walks. if ok { if len(walks) > 0 { @@ -108,9 +108,8 @@ func (t *TreeWalkPool) Release(params listParams) (resultCh chan TreeWalkResult, // During listing the timer should not timeout and end the treeWalk go-routine, hence the // timer go-routine should be ended. func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endWalkCh chan struct{}) { - t.Lock() - defer t.Unlock() - + t.mu.Lock() + defer t.mu.Unlock() // If we are above the limit delete at least one entry from the pool. if len(t.pool) > treeWalkEntryLimit { age := time.Now() @@ -121,21 +120,31 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW continue } // The first element is the oldest, so we only check that. - if v[0].added.Before(age) { + e := v[0] + if e.added.Before(age) { oldest = k + age = e.added } } // Invalidate and delete oldest. - if walks, ok := t.pool[oldest]; ok { - walk := walks[0] - walks[0] = treeWalk{} // clear references. - walks = walks[1:] - if len(walks) > 0 { - t.pool[params] = walks + if walks, ok := t.pool[oldest]; ok && len(walks) > 0 { + endCh := walks[0].endTimerCh + if len(walks) > 1 { + // Move walks forward + copy(walks, walks[1:]) + walks = walks[:len(walks)-1] + t.pool[oldest] = walks } else { - delete(t.pool, params) + // Only entry, just delete. + delete(t.pool, oldest) } - walk.endTimerCh <- struct{}{} + select { + case endCh <- struct{}{}: + default: + } + } else { + // Shouldn't happen, but just in case. + delete(t.pool, oldest) } } @@ -153,7 +162,10 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW t.pool[params] = append(walks, walkInfo) } else { // We are at limit, invalidate oldest, move list down and add new as last. - walks[0].endTimerCh <- struct{}{} + select { + case walks[0].endTimerCh <- struct{}{}: + default: + } copy(walks, walks[1:]) walks[len(walks)-1] = walkInfo } @@ -165,7 +177,8 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW case <-time.After(t.timeOut): // Timeout has expired. Remove the treeWalk from treeWalkPool and // end the treeWalk go-routine. - t.Lock() + t.mu.Lock() + defer t.mu.Unlock() walks, ok := t.pool[params] if ok { // Trick of filtering without allocating @@ -189,7 +202,6 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW } // Signal the treeWalk go-routine to die. close(endWalkCh) - t.Unlock() case <-endTimerCh: return } diff --git a/cmd/tree-walk-pool_test.go b/cmd/tree-walk-pool_test.go index 313383d1a..8d30861d1 100644 --- a/cmd/tree-walk-pool_test.go +++ b/cmd/tree-walk-pool_test.go @@ -80,22 +80,22 @@ func TestManyWalksSameParam(t *testing.T) { tw.Set(params, resultCh, endWalkCh) } - tw.Lock() + tw.mu.Lock() if walks, ok := tw.pool[params]; ok { if len(walks) != treeWalkSameEntryLimit { t.Error("There aren't as many walks as were Set") } } - tw.Unlock() + tw.mu.Unlock() for i := 0; i < treeWalkSameEntryLimit; i++ { - tw.Lock() + tw.mu.Lock() if walks, ok := tw.pool[params]; ok { // Before ith Release we should have n-i treeWalk go-routines. if treeWalkSameEntryLimit-i != len(walks) { t.Error("There aren't as many walks as were Set") } } - tw.Unlock() + tw.mu.Unlock() tw.Release(params) } } @@ -125,22 +125,22 @@ func TestManyWalksSameParamPrune(t *testing.T) { tw.Set(params, resultCh, endWalkCh) } - tw.Lock() + tw.mu.Lock() if walks, ok := tw.pool[params]; ok { if len(walks) != treeWalkSameEntryLimit { t.Error("There aren't as many walks as were Set") } } - tw.Unlock() + tw.mu.Unlock() for i := 0; i < treeWalkSameEntryLimit; i++ { - tw.Lock() + tw.mu.Lock() if walks, ok := tw.pool[params]; ok { // Before ith Release we should have n-i treeWalk go-routines. if treeWalkSameEntryLimit-i != len(walks) { t.Error("There aren't as many walks as were Set") } } - tw.Unlock() + tw.mu.Unlock() tw.Release(params) } }