From 0c4be55936858db7dd8c27d4c5967a7664e5eb3c Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 20 Jul 2020 17:28:26 -0700 Subject: [PATCH] fix: fix lockup in merge-walk pool (#10098) Fixes two different types of problems - continuation of the problem seen in FS #9992 which was not fixed for erasure coded deployments, reproduced this issue with Spark and it's fixed now - another issue was leaking walk go-routines which would lead to high memory usage and crash the system; this is simply because all the walks which were purged at the top limit had leaking end walkers which would consume memory endlessly. closes #9966 closes #10088 --- cmd/merge-walk-pool.go | 130 +++++++++++++++++++++++++++--------- cmd/merge-walk-pool_test.go | 38 ++++++++++- cmd/tree-walk-pool.go | 40 +++++------ 3 files changed, 157 insertions(+), 51 deletions(-) diff --git a/cmd/merge-walk-pool.go b/cmd/merge-walk-pool.go index a68dc2320..624409ee7 100644 --- a/cmd/merge-walk-pool.go +++ b/cmd/merge-walk-pool.go @@ -1,5 +1,5 @@ /* - * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * MinIO Cloud Storage, (C) 2019, 2020 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ const ( // mergeWalkVersions - represents the go routine that does the merge walk versions. type mergeWalkVersions struct { + added time.Time entryChs []FileInfoVersionsCh endWalkCh chan struct{} // To signal when mergeWalk go-routine should end. endTimerCh chan<- struct{} // To signal when timer go-routine should end. @@ -88,17 +89,70 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi t.Lock() defer t.Unlock() + // If we are above the limit delete at least one entry from the pool. 
+ if len(t.pool) > treeWalkEntryLimit { + age := time.Now() + var oldest listParams + for k, v := range t.pool { + if len(v) == 0 { + delete(t.pool, k) + continue + } + // The first element is the oldest, so we only check that. + e := v[0] + if e.added.Before(age) { + oldest = k + age = e.added + } + } + // Invalidate and delete oldest. + if walks, ok := t.pool[oldest]; ok && len(walks) > 0 { + endCh := walks[0].endTimerCh + endWalkCh := walks[0].endWalkCh + if len(walks) > 1 { + // Move walks forward + copy(walks, walks[1:]) + walks = walks[:len(walks)-1] + t.pool[oldest] = walks + } else { + // Only entry, just delete. + delete(t.pool, oldest) + } + select { + case endCh <- struct{}{}: + close(endWalkCh) + default: + } + } else { + // Shouldn't happen, but just in case. + delete(t.pool, oldest) + } + } + // Should be a buffered channel so that Release() never blocks. endTimerCh := make(chan struct{}, 1) walkInfo := mergeWalkVersions{ + added: UTCNow(), entryChs: resultChs, endWalkCh: endWalkCh, endTimerCh: endTimerCh, } // Append new walk info. - t.pool[params] = append(t.pool[params], walkInfo) + walks := t.pool[params] + if len(walks) < treeWalkSameEntryLimit { + t.pool[params] = append(walks, walkInfo) + } else { + // We are at limit, invalidate oldest, move list down and add new as last. + select { + case walks[0].endTimerCh <- struct{}{}: + close(walks[0].endWalkCh) + default: + } + copy(walks, walks[1:]) + walks[len(walks)-1] = walkInfo + } // Timer go-routine which times out after t.timeOut seconds. go func(endTimerCh <-chan struct{}, walkInfo mergeWalkVersions) { @@ -108,6 +162,7 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi // Timeout has expired. Remove the mergeWalk from mergeWalkPool and // end the mergeWalk go-routine. 
t.Lock() + defer t.Unlock() walks, ok := t.pool[params] if ok { // Trick of filtering without allocating @@ -131,7 +186,6 @@ func (t *MergeWalkVersionsPool) Set(params listParams, resultChs []FileInfoVersi } // Signal the mergeWalk go-routine to die. close(endWalkCh) - t.Unlock() case <-endTimerCh: return } @@ -166,23 +220,22 @@ func (t *MergeWalkPool) Release(params listParams) ([]FileInfoCh, chan struct{}) t.Lock() defer t.Unlock() walks, ok := t.pool[params] // Pick the valid walks. - if ok { - if len(walks) > 0 { - // Pop out the first valid walk entry. - walk := walks[0] - walks[0] = mergeWalk{} // clear references. - walks = walks[1:] - if len(walks) > 0 { - t.pool[params] = walks - } else { - delete(t.pool, params) - } - walk.endTimerCh <- struct{}{} - return walk.entryChs, walk.endWalkCh - } + if !ok || len(walks) == 0 { + // Release return nil if params not found. + return nil, nil + } + + // Pop out the first valid walk entry. + walk := walks[0] + walks[0] = mergeWalk{} // clear references. + walks = walks[1:] + if len(walks) > 0 { + t.pool[params] = walks + } else { + delete(t.pool, params) } - // Release return nil if params not found. - return nil, nil + walk.endTimerCh <- struct{}{} + return walk.entryChs, walk.endWalkCh } // Set - adds a mergeWalk to the mergeWalkPool. @@ -207,27 +260,38 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh continue } // The first element is the oldest, so we only check that. - if v[0].added.Before(age) { + e := v[0] + if e.added.Before(age) { oldest = k + age = e.added } } // Invalidate and delete oldest. - if walks, ok := t.pool[oldest]; ok { - walk := walks[0] - walks[0] = mergeWalk{} // clear references. 
- walks = walks[1:] - if len(walks) > 0 { - t.pool[params] = walks + if walks, ok := t.pool[oldest]; ok && len(walks) > 0 { + endCh := walks[0].endTimerCh + endWalkCh := walks[0].endWalkCh + if len(walks) > 1 { + // Move walks forward + copy(walks, walks[1:]) + walks = walks[:len(walks)-1] + t.pool[oldest] = walks } else { - delete(t.pool, params) + // Only entry, just delete. + delete(t.pool, oldest) + } + select { + case endCh <- struct{}{}: + close(endWalkCh) + default: } - walk.endTimerCh <- struct{}{} + } else { + // Shouldn't happen, but just in case. + delete(t.pool, oldest) } } // Should be a buffered channel so that Release() never blocks. endTimerCh := make(chan struct{}, 1) - walkInfo := mergeWalk{ added: UTCNow(), entryChs: resultChs, @@ -241,7 +305,11 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh t.pool[params] = append(walks, walkInfo) } else { // We are at limit, invalidate oldest, move list down and add new as last. - walks[0].endTimerCh <- struct{}{} + select { + case walks[0].endTimerCh <- struct{}{}: + close(walks[0].endWalkCh) + default: + } copy(walks, walks[1:]) walks[len(walks)-1] = walkInfo } @@ -254,6 +322,7 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh // Timeout has expired. Remove the mergeWalk from mergeWalkPool and // end the mergeWalk go-routine. t.Lock() + defer t.Unlock() walks, ok := t.pool[params] if ok { // Trick of filtering without allocating @@ -277,7 +346,6 @@ func (t *MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh } // Signal the mergeWalk go-routine to die. close(endWalkCh) - t.Unlock() case <-endTimerCh: return } diff --git a/cmd/merge-walk-pool_test.go b/cmd/merge-walk-pool_test.go index ee127162c..7718b6ab9 100644 --- a/cmd/merge-walk-pool_test.go +++ b/cmd/merge-walk-pool_test.go @@ -1,5 +1,5 @@ /* - * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * MinIO Cloud Storage, (C) 2019,2020 MinIO, Inc. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,42 @@ import ( "time" ) +// Test if tree walker go-routine is removed from the pool after timeout +// and that is available in the pool before the timeout. +func TestMergeWalkPoolVersionsBasic(t *testing.T) { + // Create a treeWalkPool + tw := NewMergeWalkVersionsPool(1 * time.Second) + + // Create sample params + params := listParams{ + bucket: "test-bucket", + } + + endWalkCh := make(chan struct{}) + // Add a treeWalk to the pool + tw.Set(params, []FileInfoVersionsCh{}, endWalkCh) + + // Wait for treeWalkPool timeout to happen + <-time.After(2 * time.Second) + if c1, _ := tw.Release(params); c1 != nil { + t.Error("treeWalk go-routine must have been freed") + } + + // Add the treeWalk back to the pool + endWalkCh = make(chan struct{}) + tw.Set(params, []FileInfoVersionsCh{}, endWalkCh) + + // Release the treeWalk before timeout + select { + case <-time.After(1 * time.Second): + break + default: + if c1, _ := tw.Release(params); c1 == nil { + t.Error("treeWalk go-routine got freed before timeout") + } + } +} + // Test if tree walker go-routine is removed from the pool after timeout // and that is available in the pool before the timeout. func TestMergeWalkPoolBasic(t *testing.T) { diff --git a/cmd/tree-walk-pool.go b/cmd/tree-walk-pool.go index 3eda45a02..a43086bfc 100644 --- a/cmd/tree-walk-pool.go +++ b/cmd/tree-walk-pool.go @@ -1,5 +1,5 @@ /* - * MinIO Cloud Storage, (C) 2016 MinIO, Inc. + * MinIO Cloud Storage, (C) 2016-2020 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -80,23 +80,21 @@ func (t *TreeWalkPool) Release(params listParams) (resultCh chan TreeWalkResult, t.mu.Lock() defer t.mu.Unlock() walks, ok := t.pool[params] // Pick the valid walks. 
- if ok { - if len(walks) > 0 { - // Pop out the first valid walk entry. - walk := walks[0] - walks[0] = treeWalk{} // clear references. - walks = walks[1:] - if len(walks) > 0 { - t.pool[params] = walks - } else { - delete(t.pool, params) - } - walk.endTimerCh <- struct{}{} - return walk.resultCh, walk.endWalkCh - } + if !ok || len(walks) == 0 { + // Release return nil if params not found. + return nil, nil } - // Release return nil if params not found. - return nil, nil + + // Pop out the first valid walk entry. + walk := walks[0] + walks = walks[1:] + if len(walks) > 0 { + t.pool[params] = walks + } else { + delete(t.pool, params) + } + walk.endTimerCh <- struct{}{} + return walk.resultCh, walk.endWalkCh } // Set - adds a treeWalk to the treeWalkPool. @@ -129,6 +127,7 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW // Invalidate and delete oldest. if walks, ok := t.pool[oldest]; ok && len(walks) > 0 { endCh := walks[0].endTimerCh + endWalkCh := walks[0].endWalkCh if len(walks) > 1 { // Move walks forward copy(walks, walks[1:]) @@ -140,6 +139,7 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW } select { case endCh <- struct{}{}: + close(endWalkCh) default: } } else { @@ -156,6 +156,7 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW endWalkCh: endWalkCh, endTimerCh: endTimerCh, } + // Append new walk info. walks := t.pool[params] if len(walks) < treeWalkSameEntryLimit { @@ -164,6 +165,7 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW // We are at limit, invalidate oldest, move list down and add new as last. select { case walks[0].endTimerCh <- struct{}{}: + close(walks[0].endWalkCh) default: } copy(walks, walks[1:]) @@ -171,7 +173,7 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW } // Timer go-routine which times out after t.timeOut seconds. 
- go func(endTimerCh <-chan struct{}) { + go func(endTimerCh <-chan struct{}, walkInfo treeWalk) { select { // Wait until timeOut case <-time.After(t.timeOut): @@ -205,5 +207,5 @@ func (t *TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endW case <-endTimerCh: return } - }(endTimerCh) + }(endTimerCh, walkInfo) }