diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go index d4aa29785..65c88db17 100644 --- a/cmd/data-usage-cache.go +++ b/cmd/data-usage-cache.go @@ -489,9 +489,12 @@ func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string) // save the content of the cache to minioMetaBackgroundOpsBucket with the provided name. func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) error { - b := d.serialize() - size := int64(len(b)) - r, err := hash.NewReader(bytes.NewReader(b), size, "", "", size, false) + pr, pw := io.Pipe() + go func() { + pw.CloseWithError(d.serializeTo(pw)) + }() + defer pr.Close() + r, err := hash.NewReader(pr, -1, "", "", -1, false) if err != nil { return err } @@ -513,32 +516,33 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) const dataUsageCacheVer = 3 // serialize the contents of the cache. -func (d *dataUsageCache) serialize() []byte { - // Prepend version and compress. - dst := make([]byte, 0, d.Msgsize()+1) - dst = append(dst, dataUsageCacheVer) - buf := bytes.NewBuffer(dst) - enc, err := zstd.NewWriter(buf, +func (d *dataUsageCache) serializeTo(dst io.Writer) error { + // Add version and compress. + _, err := dst.Write([]byte{dataUsageCacheVer}) + if err != nil { + return err + } + enc, err := zstd.NewWriter(dst, zstd.WithEncoderLevel(zstd.SpeedFastest), zstd.WithWindowSize(1<<20), zstd.WithEncoderConcurrency(2)) if err != nil { - logger.LogIf(GlobalContext, err) - return nil + return err } mEnc := msgp.NewWriter(enc) err = d.EncodeMsg(mEnc) if err != nil { - logger.LogIf(GlobalContext, err) - return nil + return err + } + err = mEnc.Flush() + if err != nil { + return err } - mEnc.Flush() err = enc.Close() if err != nil { - logger.LogIf(GlobalContext, err) - return nil + return err } - return buf.Bytes() + return nil } // deserialize the supplied byte slice into the cache. diff --git a/cmd/data-usage_test.go b/cmd/data-usage_test.go index 7270f7bff..c24debd75 100644 --- a/cmd/data-usage_test.go +++ b/cmd/data-usage_test.go @@ -662,14 +662,17 @@ func TestDataUsageCacheSerialize(t *testing.T) { if err != nil { t.Fatal(err) } - - b := want.serialize() + var buf bytes.Buffer + err = want.serializeTo(&buf) + if err != nil { + t.Fatal(err) + } + t.Log("serialized size:", buf.Len(), "bytes") var got dataUsageCache - err = got.deserialize(bytes.NewBuffer(b)) + err = got.deserialize(&buf) if err != nil { t.Fatal(err) } - t.Log("serialized size:", len(b), "bytes") if got.Info.LastUpdate.IsZero() { t.Error("lastupdate not set") } diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 9086d667c..717db4741 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -828,7 +828,7 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) { ReportNotFound: opts.reportNotFound, FilterPrefix: opts.filterPrefix}, w) w.CloseWithError(err) - if err != io.EOF { + if err != io.EOF && err != nil && err.Error() != errFileNotFound.Error() { logger.LogIf(ctx, err) } }() diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 2ce19eccf..8a3e6511c 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -28,6 +28,7 @@ import ( "path" "strconv" "strings" + "sync" "time" "github.com/minio/minio/cmd/http" @@ -170,18 +171,34 @@ func (client *storageRESTClient) Healing() bool { } func (client *storageRESTClient) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) { - b := cache.serialize() - respBody, err := client.call(ctx, storageRESTMethodCrawlAndGetDataUsage, url.Values{}, bytes.NewBuffer(b), int64(len(b))) + pr, pw := io.Pipe() + go func() { + pw.CloseWithError(cache.serializeTo(pw)) + }() + defer pr.Close() + respBody, err := client.call(ctx, storageRESTMethodCrawlAndGetDataUsage, url.Values{}, pr, -1) defer http.DrainBody(respBody) if err != nil { return cache, err } - reader, err := waitForHTTPResponse(respBody) + + var wg sync.WaitGroup + var newCache dataUsageCache + var decErr error + pr, pw = io.Pipe() + wg.Add(1) + go func() { + defer wg.Done() + decErr = newCache.deserialize(pr) + pr.CloseWithError(err) + }() + err = waitForHTTPStream(respBody, pw) + pw.CloseWithError(err) if err != nil { return cache, err } - var newCache dataUsageCache - return newCache, newCache.deserialize(reader) + wg.Wait() + return newCache, decErr } func (client *storageRESTClient) GetDiskID() (string, error) { diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 5390d5e81..8c8333b92 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -171,14 +171,13 @@ func (s *storageRESTServer) CrawlAndGetDataUsageHandler(w http.ResponseWriter, r return } - done := keepHTTPResponseAlive(w) + resp := streamHTTPResponse(w) usageInfo, err := s.storage.CrawlAndGetDataUsage(r.Context(), cache) - done(err) if err != nil { + resp.CloseWithError(err) return } - w.Write(usageInfo.serialize()) - w.(http.Flusher).Flush() + resp.CloseWithError(usageInfo.serializeTo(resp)) } // MakeVolHandler - make a volume. @@ -889,7 +888,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error { if err != nil { return err } - length := binary.LittleEndian.Uint32(tmp[:]) _, err = io.CopyN(w, respBody, int64(length)) if err != nil {