diff --git a/pkg/s3select/csv/errors.go b/pkg/s3select/csv/errors.go index b01a448d9..eba0cbe1f 100644 --- a/pkg/s3select/csv/errors.go +++ b/pkg/s3select/csv/errors.go @@ -16,6 +16,8 @@ package csv +import "errors" + type s3Error struct { code string message string @@ -51,3 +53,12 @@ func errCSVParsingError(err error) *s3Error { cause: err, } } + +func errInvalidTextEncodingError() *s3Error { + return &s3Error{ + code: "InvalidTextEncoding", + message: "UTF-8 encoding is required.", + statusCode: 400, + cause: errors.New("invalid utf8 encoding"), + } +} diff --git a/pkg/s3select/csv/reader.go b/pkg/s3select/csv/reader.go index 84b5deeb9..12bedfdf2 100644 --- a/pkg/s3select/csv/reader.go +++ b/pkg/s3select/csv/reader.go @@ -23,6 +23,7 @@ import ( "io" "runtime" "sync" + "unicode/utf8" csv "github.com/minio/minio/pkg/csvparser" "github.com/minio/minio/pkg/s3select/sql" @@ -159,6 +160,9 @@ func (r *Reader) startReaders(newReader func(io.Reader) *csv.Reader) error { r.err = err return err } + if !utf8.Valid(b) { + return errInvalidTextEncodingError() + } reader := newReader(bytes.NewReader(b)) record, err := reader.Read() if err != nil { @@ -181,6 +185,13 @@ func (r *Reader) startReaders(newReader func(io.Reader) *csv.Reader) error { return make([]byte, csvSplitSize+1024) } + // Return first block + next, nextErr := r.nextSplit(csvSplitSize, r.bufferPool.Get().([]byte)) + // Check if first block is valid. + if !utf8.Valid(next) { + return errInvalidTextEncodingError() + } + // Create queue r.queue = make(chan *queueItem, runtime.GOMAXPROCS(0)) r.input = make(chan *queueItem, runtime.GOMAXPROCS(0)) @@ -192,11 +203,10 @@ func (r *Reader) startReaders(newReader func(io.Reader) *csv.Reader) error { defer close(r.queue) defer r.readerWg.Done() for { - next, err := r.nextSplit(csvSplitSize, r.bufferPool.Get().([]byte)) q := queueItem{ input: next, dst: make(chan [][]string, 1), - err: err, + err: nextErr, } select { case <-r.close: @@ -209,10 +219,11 @@ func (r *Reader) startReaders(newReader func(io.Reader) *csv.Reader) error { return case r.input <- &q: } - if err != nil { + if nextErr != nil { // Exit on any error. return } + next, nextErr = r.nextSplit(csvSplitSize, r.bufferPool.Get().([]byte)) } }()