From 8c29f69b0026ab0e8bc242435b30c29fdfa58fc8 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 28 Sep 2018 00:44:59 -0700 Subject: [PATCH] Fix racy error communication inside go-routine (#6539) Use CloseWithError to communicate errors in pipe, this PR also fixes potential shadowing of error --- cmd/disk-cache.go | 7 +--- cmd/object-handlers.go | 84 ++++++++++++++++-------------------------- cmd/web-handlers.go | 48 ++++++++---------------- 3 files changed, 50 insertions(+), 89 deletions(-) diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index 6d08233a5..845a5b8ef 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -319,11 +319,8 @@ func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, star return err } go func() { - if err = GetObjectFn(ctx, bucket, object, 0, objInfo.Size, io.MultiWriter(writer, pipeWriter), etag, opts); err != nil { - pipeWriter.CloseWithError(err) - return - } - pipeWriter.Close() // Close writer explicitly signaling we wrote all data. + gerr := GetObjectFn(ctx, bucket, object, 0, objInfo.Size, io.MultiWriter(writer, pipeWriter), etag, opts) + pipeWriter.CloseWithError(gerr) // Close writer explicitly signaling we wrote all data. }() err = dcache.Put(ctx, bucket, object, hashReader, c.getMetadata(objInfo), opts) if err != nil { diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 913b21723..203ffcd2a 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -738,25 +738,20 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re // No need to compress for remote etcd calls // Pass the decompressed stream to such calls. if srcInfo.IsCompressed() && !isRemoteCallRequired(ctx, srcBucket, dstBucket, objectAPI) { - var sreader io.Reader - var swriter io.Writer - // Open a pipe for compression. - // Where snappyWriter is piped to srcInfo.Reader. - // gr writes to snappyWriter. - snappyReader, snappyWriter := io.Pipe() - reader = snappyReader + // Where pipeWriter is piped to srcInfo.Reader. + // gr writes to pipeWriter. + pipeReader, pipeWriter := io.Pipe() + reader = pipeReader length = -1 - swriter = snappy.NewWriter(snappyWriter) - sreader = gr + snappyWriter := snappy.NewWriter(pipeWriter) go func() { - defer snappyWriter.Close() // Compress the decompressed source object. - if _, err = io.Copy(swriter, sreader); err != nil { - return - } + _, cerr := io.Copy(snappyWriter, gr) + snappyWriter.Close() + pipeWriter.CloseWithError(cerr) }() } else { // Remove the metadata for remote calls. @@ -1102,7 +1097,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } } - var hashError error actualSize := size if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 { @@ -1113,21 +1107,21 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req pipeReader, pipeWriter := io.Pipe() snappyWriter := snappy.NewWriter(pipeWriter) - actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) + var actualReader *hash.Reader + actualReader, err = hash.NewReader(reader, size, md5hex, sha256hex, actualSize) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return } + go func() { - defer pipeWriter.Close() - defer snappyWriter.Close() // Writing to the compressed writer. - _, err = io.CopyN(snappyWriter, actualReader, actualSize) - if err != nil { - hashError = err - return - } + _, cerr := io.CopyN(snappyWriter, actualReader, actualSize) + snappyWriter.Close() + pipeWriter.CloseWithError(cerr) + }() + // Set compression metrics. size = -1 // Since compressed size is un-predictable. md5hex = "" // Do not try to verify the content. @@ -1197,15 +1191,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } } - if hashError != nil { - if hashError == io.ErrUnexpectedEOF { - writeErrorResponse(w, ErrIncompleteBody, r.URL) - } else { - writeErrorResponse(w, toAPIErrorCode(hashError), r.URL) - } - return - } - writeSuccessResponseHeadersOnly(w) // Get host and port from Request.RemoteAddr. @@ -1458,25 +1443,20 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt // Need to decompress only for range-enabled copy parts. if srcInfo.IsCompressed() && rangeHeader != "" { - var sreader io.Reader - var swriter io.Writer - // Open a pipe for compression. - // Where snappyWriter is piped to srcInfo.Reader. - // gr writes to snappyWriter. - snappyReader, snappyWriter := io.Pipe() - reader = snappyReader + // Where pipeWriter is piped to srcInfo.Reader. + // gr writes to pipeWriter. + pipeReader, pipeWriter := io.Pipe() + reader = pipeReader length = -1 - swriter = snappy.NewWriter(snappyWriter) - sreader = gr + snappyWriter := snappy.NewWriter(pipeWriter) go func() { - defer snappyWriter.Close() // Compress the decompressed source object. - if _, err = io.Copy(swriter, sreader); err != nil { - return - } + _, cerr := io.Copy(snappyWriter, gr) + snappyWriter.Close() + pipeWriter.CloseWithError(cerr) }() } else { reader = gr @@ -1692,21 +1672,21 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http if objectAPI.IsCompressionSupported() && compressPart { pipeReader, pipeWriter = io.Pipe() snappyWriter := snappy.NewWriter(pipeWriter) - actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) + + var actualReader *hash.Reader + actualReader, err = hash.NewReader(reader, size, md5hex, sha256hex, actualSize) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return } + go func() { - defer pipeWriter.Close() - defer snappyWriter.Close() // Writing to the compressed writer. - _, err = io.CopyN(snappyWriter, actualReader, actualSize) - if err != nil { - // The ErrorResponse is already written in putObjectPart Handle. - return - } + _, cerr := io.CopyN(snappyWriter, actualReader, actualSize) + snappyWriter.Close() + pipeWriter.CloseWithError(cerr) }() + // Set compression metrics. size = -1 // Since compressed size is un-predictable. md5hex = "" // Do not try to verify the content. diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 45ca7594d..a4b693798 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -651,7 +651,6 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { } reader := r.Body - var hashError error actualSize := size if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 { @@ -662,21 +661,20 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { pipeReader, pipeWriter := io.Pipe() snappyWriter := snappy.NewWriter(pipeWriter) - actualReader, err := hash.NewReader(reader, size, "", "", actualSize) + var actualReader *hash.Reader + actualReader, err = hash.NewReader(reader, size, "", "", actualSize) if err != nil { writeWebErrorResponse(w, err) return } + go func() { - defer pipeWriter.Close() - defer snappyWriter.Close() // Writing to the compressed writer. - _, err = io.CopyN(snappyWriter, actualReader, actualSize) - if err != nil { - hashError = err - return - } + _, cerr := io.CopyN(snappyWriter, actualReader, actualSize) + snappyWriter.Close() + pipeWriter.CloseWithError(cerr) }() + // Set compression metrics. size = -1 // Since compressed size is un-predictable. reader = pipeReader @@ -702,11 +700,6 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { return } - if hashError != nil { - writeWebErrorResponse(w, hashError) - return - } - // Notify object created event. sendEvent(eventArgs{ EventName: event.ObjectCreatedPut, @@ -776,18 +769,14 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) { } var startOffset int64 var writer io.Writer - var decompressReader *io.PipeReader - var compressWriter *io.PipeWriter if objInfo.IsCompressed() { - var pipeErr error - // The decompress metrics are set. snappyStartOffset := 0 snappyLength := actualSize // Open a pipe for compression // Where compressWriter is actually passed to the getObject - decompressReader, compressWriter = io.Pipe() + decompressReader, compressWriter := io.Pipe() snappyReader := snappy.NewReader(decompressReader) // The limit is set to the actual size. @@ -795,13 +784,13 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) { wg.Add(1) //For closures. go func() { defer wg.Done() + // Finally, writes to the client. - if _, pipeErr = io.Copy(responseWriter, snappyReader); pipeErr != nil { - return - } + _, perr := io.Copy(responseWriter, snappyReader) + // Close the compressWriter if the data is read already. // Closing the pipe, releases the writer passed to the getObject. - compressWriter.Close() + compressWriter.CloseWithError(perr) }() writer = compressWriter } else { @@ -906,8 +895,6 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) { opts := ObjectOptions{} var length int64 for _, object := range args.Objects { - var decompressReader *io.PipeReader - var compressWriter *io.PipeWriter // Writes compressed object file to the response. zipit := func(objectName string) error { info, err := getObjectInfo(context.Background(), args.BucketName, objectName, opts) @@ -947,15 +934,13 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) { var writer io.Writer if info.IsCompressed() { - var pipeErr error - // The decompress metrics are set. snappyStartOffset := 0 snappyLength := actualSize // Open a pipe for compression // Where compressWriter is actually passed to the getObject - decompressReader, compressWriter = io.Pipe() + decompressReader, compressWriter := io.Pipe() snappyReader := snappy.NewReader(decompressReader) // The limit is set to the actual size. @@ -964,12 +949,11 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) { go func() { defer wg.Done() // Finally, writes to the client. - if _, pipeErr = io.Copy(responseWriter, snappyReader); pipeErr != nil { - return - } + _, perr := io.Copy(responseWriter, snappyReader) + // Close the compressWriter if the data is read already. // Closing the pipe, releases the writer passed to the getObject. - compressWriter.Close() + compressWriter.CloseWithError(perr) }() writer = compressWriter } else {