Fix racy error communication inside go-routine (#6539)

Use CloseWithError to communicate errors in pipe,
this PR also fixes potential shadowing of error
master
Harshavardhana 6 years ago committed by Nitish Tiwari
parent ce9d36d954
commit 8c29f69b00
  1. 7
      cmd/disk-cache.go
  2. 84
      cmd/object-handlers.go
  3. 48
      cmd/web-handlers.go

@ -319,11 +319,8 @@ func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, star
return err return err
} }
go func() { go func() {
if err = GetObjectFn(ctx, bucket, object, 0, objInfo.Size, io.MultiWriter(writer, pipeWriter), etag, opts); err != nil { gerr := GetObjectFn(ctx, bucket, object, 0, objInfo.Size, io.MultiWriter(writer, pipeWriter), etag, opts)
pipeWriter.CloseWithError(err) pipeWriter.CloseWithError(gerr) // Close writer explicitly signaling we wrote all data.
return
}
pipeWriter.Close() // Close writer explicitly signaling we wrote all data.
}() }()
err = dcache.Put(ctx, bucket, object, hashReader, c.getMetadata(objInfo), opts) err = dcache.Put(ctx, bucket, object, hashReader, c.getMetadata(objInfo), opts)
if err != nil { if err != nil {

@ -738,25 +738,20 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// No need to compress for remote etcd calls // No need to compress for remote etcd calls
// Pass the decompressed stream to such calls. // Pass the decompressed stream to such calls.
if srcInfo.IsCompressed() && !isRemoteCallRequired(ctx, srcBucket, dstBucket, objectAPI) { if srcInfo.IsCompressed() && !isRemoteCallRequired(ctx, srcBucket, dstBucket, objectAPI) {
var sreader io.Reader
var swriter io.Writer
// Open a pipe for compression. // Open a pipe for compression.
// Where snappyWriter is piped to srcInfo.Reader. // Where pipeWriter is piped to srcInfo.Reader.
// gr writes to snappyWriter. // gr writes to pipeWriter.
snappyReader, snappyWriter := io.Pipe() pipeReader, pipeWriter := io.Pipe()
reader = snappyReader reader = pipeReader
length = -1 length = -1
swriter = snappy.NewWriter(snappyWriter) snappyWriter := snappy.NewWriter(pipeWriter)
sreader = gr
go func() { go func() {
defer snappyWriter.Close()
// Compress the decompressed source object. // Compress the decompressed source object.
if _, err = io.Copy(swriter, sreader); err != nil { _, cerr := io.Copy(snappyWriter, gr)
return snappyWriter.Close()
} pipeWriter.CloseWithError(cerr)
}() }()
} else { } else {
// Remove the metadata for remote calls. // Remove the metadata for remote calls.
@ -1102,7 +1097,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
} }
} }
var hashError error
actualSize := size actualSize := size
if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 { if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 {
@ -1113,21 +1107,21 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
pipeReader, pipeWriter := io.Pipe() pipeReader, pipeWriter := io.Pipe()
snappyWriter := snappy.NewWriter(pipeWriter) snappyWriter := snappy.NewWriter(pipeWriter)
actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) var actualReader *hash.Reader
actualReader, err = hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
if err != nil { if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL) writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return return
} }
go func() { go func() {
defer pipeWriter.Close()
defer snappyWriter.Close()
// Writing to the compressed writer. // Writing to the compressed writer.
_, err = io.CopyN(snappyWriter, actualReader, actualSize) _, cerr := io.CopyN(snappyWriter, actualReader, actualSize)
if err != nil { snappyWriter.Close()
hashError = err pipeWriter.CloseWithError(cerr)
return
}
}() }()
// Set compression metrics. // Set compression metrics.
size = -1 // Since compressed size is un-predictable. size = -1 // Since compressed size is un-predictable.
md5hex = "" // Do not try to verify the content. md5hex = "" // Do not try to verify the content.
@ -1197,15 +1191,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
} }
} }
if hashError != nil {
if hashError == io.ErrUnexpectedEOF {
writeErrorResponse(w, ErrIncompleteBody, r.URL)
} else {
writeErrorResponse(w, toAPIErrorCode(hashError), r.URL)
}
return
}
writeSuccessResponseHeadersOnly(w) writeSuccessResponseHeadersOnly(w)
// Get host and port from Request.RemoteAddr. // Get host and port from Request.RemoteAddr.
@ -1458,25 +1443,20 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
// Need to decompress only for range-enabled copy parts. // Need to decompress only for range-enabled copy parts.
if srcInfo.IsCompressed() && rangeHeader != "" { if srcInfo.IsCompressed() && rangeHeader != "" {
var sreader io.Reader
var swriter io.Writer
// Open a pipe for compression. // Open a pipe for compression.
// Where snappyWriter is piped to srcInfo.Reader. // Where pipeWriter is piped to srcInfo.Reader.
// gr writes to snappyWriter. // gr writes to pipeWriter.
snappyReader, snappyWriter := io.Pipe() pipeReader, pipeWriter := io.Pipe()
reader = snappyReader reader = pipeReader
length = -1 length = -1
swriter = snappy.NewWriter(snappyWriter) snappyWriter := snappy.NewWriter(pipeWriter)
sreader = gr
go func() { go func() {
defer snappyWriter.Close()
// Compress the decompressed source object. // Compress the decompressed source object.
if _, err = io.Copy(swriter, sreader); err != nil { _, cerr := io.Copy(snappyWriter, gr)
return snappyWriter.Close()
} pipeWriter.CloseWithError(cerr)
}() }()
} else { } else {
reader = gr reader = gr
@ -1692,21 +1672,21 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
if objectAPI.IsCompressionSupported() && compressPart { if objectAPI.IsCompressionSupported() && compressPart {
pipeReader, pipeWriter = io.Pipe() pipeReader, pipeWriter = io.Pipe()
snappyWriter := snappy.NewWriter(pipeWriter) snappyWriter := snappy.NewWriter(pipeWriter)
actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
var actualReader *hash.Reader
actualReader, err = hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
if err != nil { if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL) writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return return
} }
go func() { go func() {
defer pipeWriter.Close()
defer snappyWriter.Close()
// Writing to the compressed writer. // Writing to the compressed writer.
_, err = io.CopyN(snappyWriter, actualReader, actualSize) _, cerr := io.CopyN(snappyWriter, actualReader, actualSize)
if err != nil { snappyWriter.Close()
// The ErrorResponse is already written in putObjectPart Handle. pipeWriter.CloseWithError(cerr)
return
}
}() }()
// Set compression metrics. // Set compression metrics.
size = -1 // Since compressed size is un-predictable. size = -1 // Since compressed size is un-predictable.
md5hex = "" // Do not try to verify the content. md5hex = "" // Do not try to verify the content.

@ -651,7 +651,6 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
} }
reader := r.Body reader := r.Body
var hashError error
actualSize := size actualSize := size
if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 { if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 {
@ -662,21 +661,20 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
pipeReader, pipeWriter := io.Pipe() pipeReader, pipeWriter := io.Pipe()
snappyWriter := snappy.NewWriter(pipeWriter) snappyWriter := snappy.NewWriter(pipeWriter)
actualReader, err := hash.NewReader(reader, size, "", "", actualSize) var actualReader *hash.Reader
actualReader, err = hash.NewReader(reader, size, "", "", actualSize)
if err != nil { if err != nil {
writeWebErrorResponse(w, err) writeWebErrorResponse(w, err)
return return
} }
go func() { go func() {
defer pipeWriter.Close()
defer snappyWriter.Close()
// Writing to the compressed writer. // Writing to the compressed writer.
_, err = io.CopyN(snappyWriter, actualReader, actualSize) _, cerr := io.CopyN(snappyWriter, actualReader, actualSize)
if err != nil { snappyWriter.Close()
hashError = err pipeWriter.CloseWithError(cerr)
return
}
}() }()
// Set compression metrics. // Set compression metrics.
size = -1 // Since compressed size is un-predictable. size = -1 // Since compressed size is un-predictable.
reader = pipeReader reader = pipeReader
@ -702,11 +700,6 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
return return
} }
if hashError != nil {
writeWebErrorResponse(w, hashError)
return
}
// Notify object created event. // Notify object created event.
sendEvent(eventArgs{ sendEvent(eventArgs{
EventName: event.ObjectCreatedPut, EventName: event.ObjectCreatedPut,
@ -776,18 +769,14 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
} }
var startOffset int64 var startOffset int64
var writer io.Writer var writer io.Writer
var decompressReader *io.PipeReader
var compressWriter *io.PipeWriter
if objInfo.IsCompressed() { if objInfo.IsCompressed() {
var pipeErr error
// The decompress metrics are set. // The decompress metrics are set.
snappyStartOffset := 0 snappyStartOffset := 0
snappyLength := actualSize snappyLength := actualSize
// Open a pipe for compression // Open a pipe for compression
// Where compressWriter is actually passed to the getObject // Where compressWriter is actually passed to the getObject
decompressReader, compressWriter = io.Pipe() decompressReader, compressWriter := io.Pipe()
snappyReader := snappy.NewReader(decompressReader) snappyReader := snappy.NewReader(decompressReader)
// The limit is set to the actual size. // The limit is set to the actual size.
@ -795,13 +784,13 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
wg.Add(1) //For closures. wg.Add(1) //For closures.
go func() { go func() {
defer wg.Done() defer wg.Done()
// Finally, writes to the client. // Finally, writes to the client.
if _, pipeErr = io.Copy(responseWriter, snappyReader); pipeErr != nil { _, perr := io.Copy(responseWriter, snappyReader)
return
}
// Close the compressWriter if the data is read already. // Close the compressWriter if the data is read already.
// Closing the pipe, releases the writer passed to the getObject. // Closing the pipe, releases the writer passed to the getObject.
compressWriter.Close() compressWriter.CloseWithError(perr)
}() }()
writer = compressWriter writer = compressWriter
} else { } else {
@ -906,8 +895,6 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
opts := ObjectOptions{} opts := ObjectOptions{}
var length int64 var length int64
for _, object := range args.Objects { for _, object := range args.Objects {
var decompressReader *io.PipeReader
var compressWriter *io.PipeWriter
// Writes compressed object file to the response. // Writes compressed object file to the response.
zipit := func(objectName string) error { zipit := func(objectName string) error {
info, err := getObjectInfo(context.Background(), args.BucketName, objectName, opts) info, err := getObjectInfo(context.Background(), args.BucketName, objectName, opts)
@ -947,15 +934,13 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
var writer io.Writer var writer io.Writer
if info.IsCompressed() { if info.IsCompressed() {
var pipeErr error
// The decompress metrics are set. // The decompress metrics are set.
snappyStartOffset := 0 snappyStartOffset := 0
snappyLength := actualSize snappyLength := actualSize
// Open a pipe for compression // Open a pipe for compression
// Where compressWriter is actually passed to the getObject // Where compressWriter is actually passed to the getObject
decompressReader, compressWriter = io.Pipe() decompressReader, compressWriter := io.Pipe()
snappyReader := snappy.NewReader(decompressReader) snappyReader := snappy.NewReader(decompressReader)
// The limit is set to the actual size. // The limit is set to the actual size.
@ -964,12 +949,11 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
go func() { go func() {
defer wg.Done() defer wg.Done()
// Finally, writes to the client. // Finally, writes to the client.
if _, pipeErr = io.Copy(responseWriter, snappyReader); pipeErr != nil { _, perr := io.Copy(responseWriter, snappyReader)
return
}
// Close the compressWriter if the data is read already. // Close the compressWriter if the data is read already.
// Closing the pipe, releases the writer passed to the getObject. // Closing the pipe, releases the writer passed to the getObject.
compressWriter.Close() compressWriter.CloseWithError(perr)
}() }()
writer = compressWriter writer = compressWriter
} else { } else {

Loading…
Cancel
Save