From 0422eda6a261010e0387307c2ea84b298f9df38f Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Wed, 25 Nov 2020 09:37:30 -0800 Subject: [PATCH] metacache: Always close block writer (#10973) In some cases a writer could be left behind unclosed, leaking compression blocks. Always close and set compression concurrency to 2 which should be fine to keep up. --- cmd/metacache-stream.go | 5 +++-- cmd/metacache-stream_test.go | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/metacache-stream.go b/cmd/metacache-stream.go index aff5ace87..fac52214a 100644 --- a/cmd/metacache-stream.go +++ b/cmd/metacache-stream.go @@ -76,7 +76,7 @@ func newMetacacheWriter(out io.Writer, blockSize int) *metacacheWriter { blockSize: blockSize, } w.creator = func() error { - s2w := s2.NewWriter(out, s2.WriterBlockSize(blockSize)) + s2w := s2.NewWriter(out, s2.WriterBlockSize(blockSize), s2.WriterConcurrency(2)) w.mw = msgp.NewWriter(s2w) w.creator = nil if err := w.mw.WriteByte(metacacheStreamVersion); err != nil { @@ -206,7 +206,7 @@ func (w *metacacheWriter) Close() error { func (w *metacacheWriter) Reset(out io.Writer) { w.streamErr = nil w.creator = func() error { - s2w := s2.NewWriter(out, s2.WriterBlockSize(w.blockSize)) + s2w := s2.NewWriter(out, s2.WriterBlockSize(w.blockSize), s2.WriterConcurrency(2)) w.mw = msgp.NewWriter(s2w) w.creator = nil if err := w.mw.WriteByte(metacacheStreamVersion); err != nil { @@ -757,6 +757,7 @@ func newMetacacheBlockWriter(in <-chan metaCacheEntry, nextBlock func(b *metacac var n int var buf bytes.Buffer block := newMetacacheWriter(&buf, 1<<20) + defer block.Close() finishBlock := func() { err := block.Close() if err != nil { diff --git a/cmd/metacache-stream_test.go b/cmd/metacache-stream_test.go index 7a6d9f1cf..6525b4e04 100644 --- a/cmd/metacache-stream_test.go +++ b/cmd/metacache-stream_test.go @@ -370,6 +370,7 @@ func Test_newMetacacheStream(t *testing.T) { r := loadMetacacheSample(t) var buf bytes.Buffer w := newMetacacheWriter(&buf, 1<<20) + defer w.Close() err := r.readFn(func(object metaCacheEntry) bool { err := w.write(object) if err != nil {