diff --git a/cmd/merge-walk-pool.go b/cmd/merge-walk-pool.go index a68dc2320..624409ee7 100644 --- a/cmd/merge-walk-pool.go +++ b/cmd/merge-walk-pool.go @@ -1,5 +1,5 @@ /* - * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * MinIO Cloud Storage, (C) 2019, 2020 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ const ( // mergeWalkVersions - represents the go routine that does the merge walk versions. type mergeWalkVersions struct { + added time.Time entryChs []FileInfoVersionsCh endWalkCh chan struct{} // To signal when mergeWalk go-routine should end. endTimerCh chan<- struct{} // To signal when timer go-routine should end. @@ -88,17 +89,70 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi t.Lock() defer t.Unlock() + // If we are above the limit delete at least one entry from the pool. + if len(t.pool) > treeWalkEntryLimit { + age := time.Now() + var oldest listParams + for k, v := range t.pool { + if len(v) == 0 { + delete(t.pool, k) + continue + } + // The first element is the oldest, so we only check that. + e := v[0] + if e.added.Before(age) { + oldest = k + age = e.added + } + } + // Invalidate and delete oldest. + if walks, ok := t.pool[oldest]; ok && len(walks) > 0 { + endCh := walks[0].endTimerCh + endWalkCh := walks[0].endWalkCh + if len(walks) > 1 { + // Move walks forward + copy(walks, walks[1:]) + walks = walks[:len(walks)-1] + t.pool[oldest] = walks + } else { + // Only entry, just delete. + delete(t.pool, oldest) + } + select { + case endCh <- struct{}{}: + close(endWalkCh) + default: + } + } else { + // Shouldn't happen, but just in case. + delete(t.pool, oldest) + } + } + // Should be a buffered channel so that Release() never blocks. endTimerCh := make(chan struct{}, 1) walkInfo := mergeWalkVersions{ + added: UTCNow(), entryChs: resultChs, endWalkCh: endWalkCh, endTimerCh: endTimerCh, } // Append new walk info. - t.pool[params] = append(t.pool[params], walkInfo) + walks := t.pool[params] + if len(walks) < treeWalkSameEntryLimit { + t.pool[params] = append(walks, walkInfo) + } else { + // We are at limit, invalidate oldest, move list down and add new as last. + select { + case walks[0].endTimerCh <- struct{}{}: + close(walks[0].endWalkCh) + default: + } + copy(walks, walks[1:]) + walks[len(walks)-1] = walkInfo + } // Timer go-routine which times out after t.timeOut seconds. go func(endTimerCh <-chan struct{}, walkInfo mergeWalkVersions) { @@ -108,6 +162,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi // Timeout has expired. Remove the mergeWalk from mergeWalkPool and // end the mergeWalk go-routine. t.Lock() + defer t.Unlock() walks, ok := t.pool[params] if ok { // Trick of filtering without allocating @@ -131,7 +186,6 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi } // Signal the mergeWalk go-routine to die. close(endWalkCh) - t.Unlock() case <-endTimerCh: return } @@ -166,23 +220,22 @@ func (t *MergeWalkPool) Release(params listParams) ([]FileInfoCh, chan struct{}) t.Lock() defer t.Unlock() walks, ok := t.pool[params] // Pick the valid walks. - if ok { - if len(walks) > 0 { - // Pop out the first valid walk entry. - walk := walks[0] - walks[0] = mergeWalk{} // clear references. - walks = walks[1:] - if len(walks) > 0 { - t.pool[params] = walks - } else { - delete(t.pool, params) - } - walk.endTimerCh <- struct{}{} - return walk.entryChs, walk.endWalkCh - } + if !ok || len(walks) == 0 { + // Release return nil if params not found. + return nil, nil + } + + // Pop out the first valid walk entry. + walk := walks[0] + walks[0] = mergeWalk{} // clear references. + walks = walks[1:] + if len(walks) > 0 { + t.pool[params] = walks + } else { + delete(t.pool, params) } - // Release return nil if params not found. - return nil, nil + walk.endTimerCh <- struct{}{} + return walk.entryChs, walk.endWalkCh } // Set - adds a mergeWalk to the mergeWalkPool. @@ -207,27 +260,38 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh continue } // The first element is the oldest, so we only check that. - if v[0].added.Before(age) { + e := v[0] + if e.added.Before(age) { oldest = k + age = e.added } } // Invalidate and delete oldest. - if walks, ok := t.pool[oldest]; ok { - walk := walks[0] - walks[0] = mergeWalk{} // clear references. - walks = walks[1:] - if len(walks) > 0 { - t.pool[params] = walks + if walks, ok := t.pool[oldest]; ok && len(walks) > 0 { + endCh := walks[0].endTimerCh + endWalkCh := walks[0].endWalkCh + if len(walks) > 1 { + // Move walks forward + copy(walks, walks[1:]) + walks = walks[:len(walks)-1] + t.pool[oldest] = walks } else { - delete(t.pool, params) + // Only entry, just delete. + delete(t.pool, oldest) + } + select { + case endCh <- struct{}{}: + close(endWalkCh) + default: } - walk.endTimerCh <- struct{}{} + } else { + // Shouldn't happen, but just in case. + delete(t.pool, oldest) } } // Should be a buffered channel so that Release() never blocks. endTimerCh := make(chan struct{}, 1) - walkInfo := mergeWalk{ added: UTCNow(), entryChs: resultChs, @@ -241,7 +305,11 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh t.pool[params] = append(walks, walkInfo) } else { // We are at limit, invalidate oldest, move list down and add new as last. - walks[0].endTimerCh <- struct{}{} + select { + case walks[0].endTimerCh <- struct{}{}: + close(walks[0].endWalkCh) + default: + } copy(walks, walks[1:]) walks[len(walks)-1] = walkInfo } @@ -254,6 +322,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh // Timeout has expired. Remove the mergeWalk from mergeWalkPool and // end the mergeWalk go-routine. t.Lock() + defer t.Unlock() walks, ok := t.pool[params] if ok { // Trick of filtering without allocating @@ -277,7 +346,6 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh } // Signal the mergeWalk go-routine to die. close(endWalkCh) - t.Unlock() case <-endTimerCh: return } diff --git a/cmd/merge-walk-pool_test.go b/cmd/merge-walk-pool_test.go index ee127162c..7718b6ab9 100644 --- a/cmd/merge-walk-pool_test.go +++ b/cmd/merge-walk-pool_test.go @@ -1,5 +1,5 @@ /* - * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * MinIO Cloud Storage, (C) 2019,2020 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,42 @@ import ( "time" ) +// Test if tree walker go-routine is removed from the pool after timeout +// and that is available in the pool before the timeout. +func TestMergeWalkPoolVersionsBasic(t *testing.T) { + // Create a treeWalkPool + tw := NewMergeWalkVersionsPool(1 * time.Second) + + // Create sample params + params := listParams{ + bucket: "test-bucket", + } + + endWalkCh := make(chan struct{}) + // Add a treeWalk to the pool + tw.Set(params, []FileInfoVersionsCh{}, endWalkCh) + + // Wait for treeWalkPool timeout to happen + <-time.After(2 * time.Second) + if c1, _ := tw.Release(params); c1 != nil { + t.Error("treeWalk go-routine must have been freed") + } + + // Add the treeWalk back to the pool + endWalkCh = make(chan struct{}) + tw.Set(params, []FileInfoVersionsCh{}, endWalkCh) + + // Release the treeWalk before timeout + select { + case <-time.After(1 * time.Second): + break + default: + if c1, _ := tw.Release(params); c1 == nil { + t.Error("treeWalk go-routine got freed before timeout") + } + } +} + // Test if tree walker go-routine is removed from the pool after timeout // and that is available in the pool before the timeout. func TestMergeWalkPoolBasic(t *testing.T) { diff --git a/cmd/tree-walk-pool.go b/cmd/tree-walk-pool.go index 3eda45a02..a43086bfc 100644 --- a/cmd/tree-walk-pool.go +++ b/cmd/tree-walk-pool.go @@ -1,5 +1,5 @@ /* - * MinIO Cloud Storage, (C) 2016 MinIO, Inc. + * MinIO Cloud Storage, (C) 2016-2020 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -80,23 +80,21 @@ func (t *TreeWalkPool) Release(params listParams) (resultCh chan TreeWalkResult, t.mu.Lock() defer t.mu.Unlock() walks, ok := t.pool[params] // Pick the valid walks. - if ok { - if len(walks) > 0 { - // Pop out the first valid walk entry. - walk := walks[0] - walks[0] = treeWalk{} // clear references. - walks = walks[1:] - if len(walks) > 0 { - t.pool[params] = walks - } else { - delete(t.pool, params) - } - walk.endTimerCh <- struct{}{} - return walk.resultCh, walk.endWalkCh - } + if !ok || len(walks) == 0 { + // Release return nil if params not found. + return nil, nil } - // Release return nil if params not found. - return nil, nil + + // Pop out the first valid walk entry. + walk := walks[0] + walks = walks[1:] + if len(walks) > 0 { + t.pool[params] = walks + } else { + delete(t.pool, params) + } + walk.endTimerCh <- struct{}{} + return walk.resultCh, walk.endWalkCh } // Set - adds a treeWalk to the treeWalkPool. @@ -129,6 +127,7 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW // Invalidate and delete oldest. if walks, ok := t.pool[oldest]; ok && len(walks) > 0 { endCh := walks[0].endTimerCh + endWalkCh := walks[0].endWalkCh if len(walks) > 1 { // Move walks forward copy(walks, walks[1:]) @@ -140,6 +139,7 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW } select { case endCh <- struct{}{}: + close(endWalkCh) default: } } else { @@ -156,6 +156,7 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW endWalkCh: endWalkCh, endTimerCh: endTimerCh, } + // Append new walk info. walks := t.pool[params] if len(walks) < treeWalkSameEntryLimit { @@ -164,6 +165,7 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW // We are at limit, invalidate oldest, move list down and add new as last. select { case walks[0].endTimerCh <- struct{}{}: + close(walks[0].endWalkCh) default: } copy(walks, walks[1:]) @@ -171,7 +173,7 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW } // Timer go-routine which times out after t.timeOut seconds. - go func(endTimerCh <-chan struct{}) { + go func(endTimerCh <-chan struct{}, walkInfo treeWalk) { select { // Wait until timeOut case <-time.After(t.timeOut): @@ -205,5 +207,5 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW case <-endTimerCh: return } - }(endTimerCh) + }(endTimerCh, walkInfo) }