@ -17,7 +17,9 @@
package json
import (
"errors"
"io"
"sync"
"github.com/minio/minio/pkg/s3select/sql"
@ -67,7 +69,6 @@ func (r *Reader) Read(dst sql.Record) (sql.Record, error) {
// Close - closes underlying reader.
func ( r * Reader ) Close ( ) error {
// Close the input.
// Potentially racy if the stream decoder is still reading.
err := r . readCloser . Close ( )
for range r . valueCh {
// Drain values so we don't leak a goroutine.
@ -78,6 +79,7 @@ func (r *Reader) Close() error {
// NewReader - creates new JSON reader using readCloser.
func NewReader ( readCloser io . ReadCloser , args * ReaderArgs ) * Reader {
readCloser = & syncReadCloser { rc : readCloser }
d := jstream . NewDecoder ( readCloser , 0 ) . ObjectAsKVS ( )
return & Reader {
args : args ,
@ -86,3 +88,46 @@ func NewReader(readCloser io.ReadCloser, args *ReaderArgs) *Reader {
readCloser : readCloser ,
}
}
// syncReadCloser will wrap a readcloser and make it safe to call Close
// while reads are running.
// All read errors are also postponed until Close is called and
// io.EOF is returned instead.
type syncReadCloser struct {
rc io . ReadCloser
errMu sync . Mutex
err error
}
func ( pr * syncReadCloser ) Read ( p [ ] byte ) ( n int , err error ) {
// This ensures that Close will block until Read has completed.
// This allows another goroutine to close the reader.
pr . errMu . Lock ( )
defer pr . errMu . Unlock ( )
if pr . err != nil {
return 0 , io . EOF
}
n , pr . err = pr . rc . Read ( p )
if pr . err != nil {
// Translate any error into io.EOF, so we don't crash:
// https://github.com/bcicen/jstream/blob/master/scanner.go#L48
return n , io . EOF
}
return n , nil
}
var errClosed = errors . New ( "read after close" )
func ( pr * syncReadCloser ) Close ( ) error {
pr . errMu . Lock ( )
defer pr . errMu . Unlock ( )
if pr . err == errClosed {
return nil
}
if pr . err != nil {
return pr . err
}
pr . err = errClosed
return pr . rc . Close ( )
}