From ef132c5714962896bcd78804f6b48ee59a8845c7 Mon Sep 17 00:00:00 2001 From: Kale Blankenship Date: Tue, 5 Mar 2019 08:35:37 -0800 Subject: [PATCH] Replace snappy.Writer/io.Pipe with snappyCompressReader. (#7316) Prevents deferred close functions from being called while still attempting to copy reader to snappyWriter. Reduces code duplication when compressing objects. --- cmd/object-api-utils.go | 51 ++++++++++++++++++++++++++++ cmd/object-api-utils_test.go | 59 ++++++++++++++++++++++++++++++++ cmd/object-handlers.go | 65 ++++-------------------------------- cmd/web-handlers.go | 15 ++------- 4 files changed, 119 insertions(+), 71 deletions(-) diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index ece90be5c..a98427a41 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -675,3 +675,54 @@ func CleanMinioInternalMetadataKeys(metadata map[string]string) map[string]strin } return newMeta } + +// snappyCompressReader compresses data as it reads +// from the underlying io.Reader. +type snappyCompressReader struct { + r io.Reader + w *snappy.Writer + closed bool + buf bytes.Buffer +} + +func newSnappyCompressReader(r io.Reader) *snappyCompressReader { + cr := &snappyCompressReader{r: r} + cr.w = snappy.NewBufferedWriter(&cr.buf) + return cr +} + +func (cr *snappyCompressReader) Read(p []byte) (int, error) { + if cr.closed { + // if snappy writer is closed r has been completely read, + // return any remaining data in buf. + return cr.buf.Read(p) + } + + // read from original using p as buffer + nr, readErr := cr.r.Read(p) + + // write read bytes to snappy writer + nw, err := cr.w.Write(p[:nr]) + if err != nil { + return 0, err + } + if nw != nr { + return 0, io.ErrShortWrite + } + + // if last of data from reader, close snappy writer to flush + if readErr == io.EOF { + err := cr.w.Close() + cr.closed = true + if err != nil { + return 0, err + } + } + + // read compressed bytes out of buf + n, err := cr.buf.Read(p) + if readErr != io.EOF && (err == nil || err == io.EOF) { + err = readErr + } + return n, err +} diff --git a/cmd/object-api-utils_test.go b/cmd/object-api-utils_test.go index 8ed6201e9..e36ee9b4e 100644 --- a/cmd/object-api-utils_test.go +++ b/cmd/object-api-utils_test.go @@ -17,10 +17,14 @@ package cmd import ( + "bytes" "context" + "io" "net/http" "reflect" "testing" + + "github.com/golang/snappy" ) // Tests validate bucket name. @@ -544,3 +548,58 @@ func TestGetCompressedOffsets(t *testing.T) { } } } + +func TestSnappyCompressReader(t *testing.T) { + tests := []struct { + name string + data []byte + }{ + {name: "empty", data: nil}, + {name: "small", data: []byte("hello, world")}, + {name: "large", data: bytes.Repeat([]byte("hello, world"), 1000)}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + buf := make([]byte, 100) // make small buffer to ensure multiple reads are required for large case + + r := newSnappyCompressReader(bytes.NewReader(tt.data)) + + var rdrBuf bytes.Buffer + _, err := io.CopyBuffer(&rdrBuf, r, buf) + if err != nil { + t.Fatal(err) + } + + var stdBuf bytes.Buffer + w := snappy.NewBufferedWriter(&stdBuf) + _, err = io.CopyBuffer(w, bytes.NewReader(tt.data), buf) + if err != nil { + t.Fatal(err) + } + err = w.Close() + if err != nil { + t.Fatal(err) + } + + var ( + got = rdrBuf.Bytes() + want = stdBuf.Bytes() + ) + if !bytes.Equal(got, want) { + t.Errorf("encoded data does not match\n\t%q\n\t%q", got, want) + } + + var decBuf bytes.Buffer + decRdr := snappy.NewReader(&rdrBuf) + _, err = io.Copy(&decBuf, decRdr) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(tt.data, decBuf.Bytes()) { + t.Errorf("roundtrip failed\n\t%q\n\t%q", tt.data, decBuf.Bytes()) + } + }) + } +} diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index f79a70adb..2e2812ae9 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -33,7 +33,6 @@ import ( "time" - snappy "github.com/golang/snappy" "github.com/gorilla/mux" miniogo "github.com/minio/minio-go" "github.com/minio/minio-go/pkg/encrypt" @@ -831,21 +830,9 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re // Remove all source encrypted related metadata to // avoid copying them in target object. crypto.RemoveInternalEntries(srcInfo.UserDefined) - // Open a pipe for compression. - // Where pipeWriter is piped to srcInfo.Reader. - // gr writes to pipeWriter. - pipeReader, pipeWriter := io.Pipe() - reader = pipeReader - length = -1 - - snappyWriter := snappy.NewBufferedWriter(pipeWriter) - go func() { - // Compress the decompressed source object. - _, cerr := io.Copy(snappyWriter, gr) - snappyWriter.Close() - pipeWriter.CloseWithError(cerr) - }() + reader = newSnappyCompressReader(gr) + length = -1 } else { // Remove the metadata for remote calls. delete(srcInfo.UserDefined, ReservedMetadataPrefix+"compression") @@ -1216,28 +1203,17 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV1 metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10) - pipeReader, pipeWriter := io.Pipe() - snappyWriter := snappy.NewBufferedWriter(pipeWriter) - - var actualReader *hash.Reader - actualReader, err = hash.NewReader(reader, size, md5hex, sha256hex, actualSize) + actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } - go func() { - // Writing to the compressed writer. - _, cerr := io.CopyN(snappyWriter, actualReader, actualSize) - snappyWriter.Close() - pipeWriter.CloseWithError(cerr) - }() - // Set compression metrics. + reader = newSnappyCompressReader(actualReader) size = -1 // Since compressed size is un-predictable. md5hex = "" // Do not try to verify the content. sha256hex = "" - reader = pipeReader } hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) @@ -1654,21 +1630,8 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt isCompressed := compressPart // Compress only if the compression is enabled during initial multipart. if isCompressed { - // Open a pipe for compression. - // Where pipeWriter is piped to srcInfo.Reader. - // gr writes to pipeWriter. - pipeReader, pipeWriter := io.Pipe() - reader = pipeReader + reader = newSnappyCompressReader(gr) length = -1 - - snappyWriter := snappy.NewBufferedWriter(pipeWriter) - - go func() { - // Compress the decompressed source object. - _, cerr := io.Copy(snappyWriter, gr) - snappyWriter.Close() - pipeWriter.CloseWithError(cerr) - }() } else { reader = gr } @@ -1879,8 +1842,6 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http } actualSize := size - var pipeReader *io.PipeReader - var pipeWriter *io.PipeWriter // get encryption options var opts ObjectOptions @@ -1902,28 +1863,17 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http isCompressed := false if objectAPI.IsCompressionSupported() && compressPart { - pipeReader, pipeWriter = io.Pipe() - snappyWriter := snappy.NewBufferedWriter(pipeWriter) - - var actualReader *hash.Reader - actualReader, err = hash.NewReader(reader, size, md5hex, sha256hex, actualSize) + actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } - go func() { - // Writing to the compressed writer. - _, cerr := io.CopyN(snappyWriter, actualReader, actualSize) - snappyWriter.Close() - pipeWriter.CloseWithError(cerr) - }() - // Set compression metrics. + reader = newSnappyCompressReader(actualReader) size = -1 // Since compressed size is un-predictable. md5hex = "" // Do not try to verify the content. sha256hex = "" - reader = pipeReader isCompressed = true } @@ -2016,7 +1966,6 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http etag := partInfo.ETag if isCompressed { - pipeWriter.Close() // Suppress compressed ETag. etag = partInfo.ETag + "-1" } else if isEncrypted { diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 58fda6087..a328eb5ec 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -912,26 +912,15 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV1 metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10) - pipeReader, pipeWriter := io.Pipe() - snappyWriter := snappy.NewBufferedWriter(pipeWriter) - - var actualReader *hash.Reader - actualReader, err = hash.NewReader(reader, size, "", "", actualSize) + actualReader, err := hash.NewReader(reader, size, "", "", actualSize) if err != nil { writeWebErrorResponse(w, err) return } - go func() { - // Writing to the compressed writer. - _, cerr := io.CopyN(snappyWriter, actualReader, actualSize) - snappyWriter.Close() - pipeWriter.CloseWithError(cerr) - }() - // Set compression metrics. size = -1 // Since compressed size is un-predictable. - reader = pipeReader + reader = newSnappyCompressReader(actualReader) hashReader, err = hash.NewReader(reader, size, "", "", actualSize) if err != nil { writeWebErrorResponse(w, err)