sql: Add support of escape quote in CSV (#9231)

This commit modifies the csv parser, a fork of the Go csv
parser, to support a custom quote escape character.

The quote escape character is used to escape the quote
character when a csv field contains a quote character
as part of data.
master
Anis Elleuch 5 years ago committed by GitHub
parent 7de29e6e6b
commit 9902c9baaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 71
      mint/run/core/s3select/tests.py
  2. 31
      pkg/csvparser/reader.go
  3. 10
      pkg/csvparser/writer.go
  4. 20
      pkg/s3select/csv/args.go
  5. 1
      pkg/s3select/csv/reader.go
  6. 25
      pkg/s3select/csv/reader_test.go
  7. 9
      pkg/s3select/csv/record.go
  8. 9
      pkg/s3select/json/record.go
  9. 11
      pkg/s3select/select.go
  10. 11
      pkg/s3select/simdj/reader_test.go
  11. 9
      pkg/s3select/simdj/record.go
  12. 10
      pkg/s3select/sql/record.go

@ -111,29 +111,39 @@ def generate_bucket_name():
return "s3select-test-" + uuid.uuid4().__str__()
def test_csv_input_quote_char(client, log_output):
def test_csv_input_custom_quote_char(client, log_output):
# Get a unique bucket_name and object_name
log_output.args['bucket_name'] = bucket_name = generate_bucket_name()
tests = [
# Invalid quote character, should fail
('""', b'col1,col2,col3\n', Exception()),
('""', '"', b'col1,col2,col3\n', Exception()),
# UTF-8 quote character
('ع', b'\xd8\xb9col1\xd8\xb9,\xd8\xb9col2\xd8\xb9,\xd8\xb9col3\xd8\xb9\n', b'{"_1":"col1","_2":"col2","_3":"col3"}\n'),
('ع', '"', b'\xd8\xb9col1\xd8\xb9,\xd8\xb9col2\xd8\xb9,\xd8\xb9col3\xd8\xb9\n', b'{"_1":"col1","_2":"col2","_3":"col3"}\n'),
# Only one field is quoted
('"', b'"col1",col2,col3\n', b'{"_1":"col1","_2":"col2","_3":"col3"}\n'),
('"', b'"col1,col2,col3"\n', b'{"_1":"col1,col2,col3"}\n'),
('\'', b'"col1",col2,col3\n', b'{"_1":"\\"col1\\"","_2":"col2","_3":"col3"}\n'),
('', b'"col1",col2,col3\n', b'{"_1":"\\"col1\\"","_2":"col2","_3":"col3"}\n'),
('', b'"col1",col2,col3\n', b'{"_1":"\\"col1\\"","_2":"col2","_3":"col3"}\n'),
('', b'"col1","col2","col3"\n', b'{"_1":"\\"col1\\"","_2":"\\"col2\\"","_3":"\\"col3\\""}\n'),
('"', b'""""""\n', b'{"_1":"\\"\\""}\n'),
('"', '"', b'"col1",col2,col3\n', b'{"_1":"col1","_2":"col2","_3":"col3"}\n'),
('"', '"', b'"col1,col2,col3"\n', b'{"_1":"col1,col2,col3"}\n'),
('\'', '"', b'"col1",col2,col3\n', b'{"_1":"\\"col1\\"","_2":"col2","_3":"col3"}\n'),
('', '"', b'"col1",col2,col3\n', b'{"_1":"\\"col1\\"","_2":"col2","_3":"col3"}\n'),
('', '"', b'"col1",col2,col3\n', b'{"_1":"\\"col1\\"","_2":"col2","_3":"col3"}\n'),
('', '"', b'"col1","col2","col3"\n', b'{"_1":"\\"col1\\"","_2":"\\"col2\\"","_3":"\\"col3\\""}\n'),
('"', '"', b'""""""\n', b'{"_1":"\\"\\""}\n'),
('"', '"', b'A",B\n', b'{"_1":"A\\"","_2":"B"}\n'),
('"', '"', b'A"",B\n', b'{"_1":"A\\"\\"","_2":"B"}\n'),
('"', '\\', b'A\\B,C\n', b'{"_1":"A\\\\B","_2":"C"}\n'),
('"', '"', b'"A""B","CD"\n', b'{"_1":"A\\"B","_2":"CD"}\n'),
('"', '\\', b'"A\\B","CD"\n', b'{"_1":"AB","_2":"CD"}\n'),
('"', '\\', b'"A\\,","CD"\n', b'{"_1":"A,","_2":"CD"}\n'),
('"', '\\', b'"A\\"B","CD"\n', b'{"_1":"A\\"B","_2":"CD"}\n'),
('"', '\\', b'"A\\""\n', b'{"_1":"A\\""}\n'),
('"', '\\', b'"A\\"\\"B"\n', b'{"_1":"A\\"\\"B"}\n'),
('"', '\\', b'"A\\"","\\"B"\n', b'{"_1":"A\\"","_2":"\\"B"}\n'),
]
try:
client.make_bucket(bucket_name)
for idx, (quote_char, object_content, expected_output) in enumerate(tests):
for idx, (quote_char, escape_char, object_content, expected_output) in enumerate(tests):
options = SelectObjectOptions(
expression="select * from s3object",
input_serialization=InputSerialization(
@ -142,14 +152,14 @@ def test_csv_input_quote_char(client, log_output):
RecordDelimiter="\n",
FieldDelimiter=",",
QuoteCharacter=quote_char,
QuoteEscapeCharacter=quote_char,
QuoteEscapeCharacter=escape_char,
Comments="#",
AllowQuotedRecordDelimiter="FALSE",),
),
output_serialization=OutputSerialization(
json = JsonOutput(
RecordDelimiter="\n",
)
RecordDelimiter="\n",
)
),
request_progress=RequestProgress(
enabled="False"
@ -180,24 +190,32 @@ def test_csv_input_quote_char(client, log_output):
# Test passes
print(log_output.json_report())
def test_csv_output_quote_char(client, log_output):
def test_csv_output_custom_quote_char(client, log_output):
# Get a unique bucket_name and object_name
log_output.args['bucket_name'] = bucket_name = generate_bucket_name()
tests = [
# UTF-8 quote character
("''", b'col1,col2,col3\n', Exception()),
("'", b'col1,col2,col3\n', b"'col1','col2','col3'\n"),
("", b'col1,col2,col3\n', b'\x00col1\x00,\x00col2\x00,\x00col3\x00\n'),
('"', b'col1,col2,col3\n', b'"col1","col2","col3"\n'),
('"', b'col"1,col2,col3\n', b'"col""1","col2","col3"\n'),
('"', b'\n', b''),
("''", "''", b'col1,col2,col3\n', Exception()),
("'", "'", b'col1,col2,col3\n', b"'col1','col2','col3'\n"),
("", '"', b'col1,col2,col3\n', b'\x00col1\x00,\x00col2\x00,\x00col3\x00\n'),
('"', '"', b'col1,col2,col3\n', b'"col1","col2","col3"\n'),
('"', '"', b'col"1,col2,col3\n', b'"col""1","col2","col3"\n'),
('"', '"', b'""""\n', b'""""\n'),
('"', '"', b'\n', b''),
("'", "\\", b'col1,col2,col3\n', b"'col1','col2','col3'\n"),
("'", "\\", b'col""1,col2,col3\n', b"'col\"\"1','col2','col3'\n"),
("'", "\\", b'col\'1,col2,col3\n', b"'col\\'1','col2','col3'\n"),
("'", "\\", b'"col\'1","col2","col3"\n', b"'col\\'1','col2','col3'\n"),
("'", "\\", b'col\'\n', b"'col\\''\n"),
# Two consecutive escaped quotes
("'", "\\", b'"a"""""\n', b"'a\"\"'\n"),
]
try:
client.make_bucket(bucket_name)
for idx, (quote_char, object_content, expected_output) in enumerate(tests):
for idx, (quote_char, escape_char, object_content, expected_output) in enumerate(tests):
options = SelectObjectOptions(
expression="select * from s3object",
input_serialization=InputSerialization(
@ -215,7 +233,7 @@ def test_csv_output_quote_char(client, log_output):
RecordDelimiter="\n",
FieldDelimiter=",",
QuoteCharacter=quote_char,
QuoteEscapeCharacter=quote_char,)
QuoteEscapeCharacter=escape_char,)
),
request_progress=RequestProgress(
enabled="False"
@ -286,14 +304,13 @@ def main():
secret_key = 'zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG'
secure = True
client = Minio(server_endpoint, access_key, secret_key, secure=secure)
client = Minio(server_endpoint, access_key, secret_key, secure=False)
log_output = LogOutput(client.select_object_content, 'test_csv_input_quote_char')
test_csv_input_quote_char(client, log_output)
test_csv_input_custom_quote_char(client, log_output)
log_output = LogOutput(client.select_object_content, 'test_csv_output_quote_char')
test_csv_output_quote_char(client, log_output)
test_csv_output_custom_quote_char(client, log_output)
except Exception as err:
print(log_output.json_report(err))

@ -113,9 +113,12 @@ type Reader struct {
// or the Unicode replacement character (0xFFFD).
Comma rune
// Quote is the single character used for marking fields limits
// Quote is a single rune used for marking fields limits
Quote []rune
// QuoteEscape is a single rune to escape the quote character
QuoteEscape rune
// Comment, if not 0, is the comment character. Lines beginning with the
// Comment character without preceding whitespace are ignored.
// With leading whitespace the Comment character becomes part of the
@ -173,9 +176,10 @@ type Reader struct {
// NewReader returns a new Reader that reads from r.
func NewReader(r io.Reader) *Reader {
return &Reader{
Comma: ',',
Quote: []rune(`"`),
r: bufio.NewReader(r),
Comma: ',',
Quote: []rune(`"`),
QuoteEscape: '"',
r: bufio.NewReader(r),
}
}
@ -291,6 +295,9 @@ func (r *Reader) readRecord(dst []string) ([]string, error) {
return nil, errRead
}
var quoteEscape = r.QuoteEscape
var quoteEscapeLen = utf8.RuneLen(quoteEscape)
var quote rune
var quoteLen int
if len(r.Quote) > 0 {
@ -339,12 +346,22 @@ parseField:
// Quoted string field
line = line[quoteLen:]
for {
i := bytes.IndexRune(line, quote)
i := bytes.IndexAny(line, string(quote)+string(quoteEscape))
if i >= 0 {
// Hit next quote.
// Hit next quote or escape quote
r.recordBuffer = append(r.recordBuffer, line[:i]...)
line = line[i+quoteLen:]
escape := nextRune(line[i:]) == quoteEscape
if escape {
line = line[i+quoteEscapeLen:]
} else {
line = line[i+quoteLen:]
}
switch rn := nextRune(line); {
case escape && quoteEscape != quote:
r.recordBuffer = append(r.recordBuffer, encodeRune(rn)...)
line = line[utf8.RuneLen(rn):]
case rn == quote:
// `""` sequence (append quote).
r.recordBuffer = append(r.recordBuffer, encodedQuote...)

@ -30,6 +30,7 @@ import (
type Writer struct {
Comma rune // Field delimiter (set to ',' by NewWriter)
Quote rune // Fields quote character
QuoteEscape rune
AlwaysQuote bool // True to quote all fields
UseCRLF bool // True to use \r\n as the line terminator
w *bufio.Writer
@ -38,9 +39,10 @@ type Writer struct {
// NewWriter returns a new Writer that writes to w.
func NewWriter(w io.Writer) *Writer {
return &Writer{
Comma: ',',
Quote: '"',
w: bufio.NewWriter(w),
Comma: ',',
Quote: '"',
QuoteEscape: '"',
w: bufio.NewWriter(w),
}
}
@ -93,7 +95,7 @@ func (w *Writer) Write(record []string) error {
var err error
switch nextRune([]byte(field)) {
case w.Quote:
_, err = w.w.WriteRune(w.Quote)
_, err = w.w.WriteRune(w.QuoteEscape)
if err != nil {
break
}

@ -104,8 +104,15 @@ func (args *ReaderArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) (er
return fmt.Errorf("unsupported QuoteCharacter '%v'", s)
}
args.QuoteCharacter = s
// Not supported yet
case "QuoteEscapeCharacter":
switch utf8.RuneCountInString(s) {
case 0:
args.QuoteEscapeCharacter = defaultQuoteEscapeCharacter
case 1:
args.QuoteEscapeCharacter = s
default:
return fmt.Errorf("unsupported QuoteEscapeCharacter '%v'", s)
}
case "Comments":
args.CommentCharacter = s
default:
@ -115,7 +122,6 @@ func (args *ReaderArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) (er
}
}
args.QuoteEscapeCharacter = args.QuoteCharacter
args.unmarshaled = true
return nil
}
@ -176,15 +182,21 @@ func (args *WriterArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) err
default:
return fmt.Errorf("unsupported QuoteCharacter '%v'", s)
}
// Not supported yet
case "QuoteEscapeCharacter":
switch utf8.RuneCountInString(s) {
case 0:
args.QuoteEscapeCharacter = defaultQuoteEscapeCharacter
case 1:
args.QuoteEscapeCharacter = s
default:
return fmt.Errorf("unsupported QuoteCharacter '%v'", s)
}
default:
return errors.New("unrecognized option")
}
}
}
args.QuoteEscapeCharacter = args.QuoteCharacter
args.unmarshaled = true
return nil
}

@ -299,6 +299,7 @@ func NewReader(readCloser io.ReadCloser, args *ReaderArgs) (*Reader, error) {
// Add the first rune of args.QuoteChracter
ret.Quote = append(ret.Quote, []rune(args.QuoteCharacter)[0])
}
ret.QuoteEscape = []rune(args.QuoteEscapeCharacter)[0]
ret.FieldsPerRecord = -1
// If LazyQuotes is true, a quote may appear in an unquoted field and a
// non-doubled quote may appear in a quoted field.

@ -63,7 +63,13 @@ func TestRead(t *testing.T) {
if err != nil {
break
}
record.WriteCSV(&result, []rune(c.fieldDelimiter)[0], '"', false)
opts := sql.WriteCSVOpts{
FieldDelimiter: []rune(c.fieldDelimiter)[0],
Quote: '"',
QuoteEscape: '"',
AlwaysQuote: false,
}
record.WriteCSV(&result, opts)
result.Truncate(result.Len() - 1)
result.WriteString(c.recordDelimiter)
}
@ -242,8 +248,14 @@ func TestReadExtended(t *testing.T) {
break
}
if fields < 10 {
opts := sql.WriteCSVOpts{
FieldDelimiter: ',',
Quote: '"',
QuoteEscape: '"',
AlwaysQuote: false,
}
// Write with fixed delimiters, newlines.
err := record.WriteCSV(&result, ',', '"', false)
err := record.WriteCSV(&result, opts)
if err != nil {
t.Error(err)
}
@ -453,8 +465,15 @@ func TestReadFailures(t *testing.T) {
if err != nil {
break
}
opts := sql.WriteCSVOpts{
FieldDelimiter: ',',
Quote: '"',
QuoteEscape: '"',
AlwaysQuote: false,
}
// Write with fixed delimiters, newlines.
err := record.WriteCSV(&result, ',', '"', false)
err := record.WriteCSV(&result, opts)
if err != nil {
t.Error(err)
}

@ -92,11 +92,12 @@ func (r *Record) Clone(dst sql.Record) sql.Record {
}
// WriteCSV - encodes to CSV data.
func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune, quote rune, alwaysQuote bool) error {
func (r *Record) WriteCSV(writer io.Writer, opts sql.WriteCSVOpts) error {
w := csv.NewWriter(writer)
w.Comma = fieldDelimiter
w.AlwaysQuote = alwaysQuote
w.Quote = quote
w.Comma = opts.FieldDelimiter
w.AlwaysQuote = opts.AlwaysQuote
w.Quote = opts.Quote
w.QuoteEscape = opts.QuoteEscape
if err := w.Write(r.csvRecord); err != nil {
return err
}

@ -108,7 +108,7 @@ func (r *Record) Set(name string, value *sql.Value) (sql.Record, error) {
}
// WriteCSV - encodes to CSV data.
func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune, quote rune, alwaysQuote bool) error {
func (r *Record) WriteCSV(writer io.Writer, opts sql.WriteCSVOpts) error {
var csvRecord []string
for _, kv := range r.KVS {
var columnValue string
@ -136,9 +136,10 @@ func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune, quote rune, alw
}
w := csv.NewWriter(writer)
w.Comma = fieldDelimiter
w.Quote = quote
w.AlwaysQuote = alwaysQuote
w.Comma = opts.FieldDelimiter
w.Quote = opts.Quote
w.AlwaysQuote = opts.AlwaysQuote
w.QuoteEscape = opts.QuoteEscape
if err := w.Write(csvRecord); err != nil {
return err
}

@ -353,10 +353,13 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error {
}()
bufioWriter.Reset(buf)
err := record.WriteCSV(bufioWriter,
[]rune(s3Select.Output.CSVArgs.FieldDelimiter)[0],
[]rune(s3Select.Output.CSVArgs.QuoteCharacter)[0],
strings.ToLower(s3Select.Output.CSVArgs.QuoteFields) == "always")
opts := sql.WriteCSVOpts{
FieldDelimiter: []rune(s3Select.Output.CSVArgs.FieldDelimiter)[0],
Quote: []rune(s3Select.Output.CSVArgs.QuoteCharacter)[0],
QuoteEscape: []rune(s3Select.Output.CSVArgs.QuoteEscapeCharacter)[0],
AlwaysQuote: strings.ToLower(s3Select.Output.CSVArgs.QuoteFields) == "always",
}
err := record.WriteCSV(bufioWriter, opts)
if err != nil {
return err
}

@ -25,6 +25,7 @@ import (
"github.com/klauspost/compress/zstd"
"github.com/minio/minio/pkg/s3select/json"
"github.com/minio/minio/pkg/s3select/sql"
"github.com/minio/simdjson-go"
)
@ -131,11 +132,17 @@ func TestNDJSON(t *testing.T) {
t.Error(err)
}
var gotB, wantB bytes.Buffer
err = rec.WriteCSV(&gotB, ',', '"', false)
opts := sql.WriteCSVOpts{
FieldDelimiter: ',',
Quote: '"',
QuoteEscape: '"',
AlwaysQuote: false,
}
err = rec.WriteCSV(&gotB, opts)
if err != nil {
t.Error(err)
}
err = want.WriteCSV(&wantB, ',', '"', false)
err = want.WriteCSV(&wantB, opts)
if err != nil {
t.Error(err)
}

@ -141,7 +141,7 @@ func (r *Record) Set(name string, value *sql.Value) (sql.Record, error) {
}
// WriteCSV - encodes to CSV data.
func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter, quote rune, alwaysQuote bool) error {
func (r *Record) WriteCSV(writer io.Writer, opts sql.WriteCSVOpts) error {
csvRecord := make([]string, 0, 10)
var tmp simdjson.Iter
obj := r.object
@ -173,9 +173,10 @@ allElems:
csvRecord = append(csvRecord, columnValue)
}
w := csv.NewWriter(writer)
w.Comma = fieldDelimiter
w.Quote = quote
w.AlwaysQuote = alwaysQuote
w.Comma = opts.FieldDelimiter
w.Quote = opts.Quote
w.QuoteEscape = opts.QuoteEscape
w.AlwaysQuote = opts.AlwaysQuote
if err := w.Write(csvRecord); err != nil {
return err
}

@ -39,6 +39,14 @@ const (
SelectFmtParquet
)
// WriteCSVOpts - encapsulates options for Select CSV output.
// The WriteCSV implementations copy each field onto the csv writer
// before encoding a record.
type WriteCSVOpts struct {
// FieldDelimiter is assigned to the writer's Comma (the field delimiter).
FieldDelimiter rune
// Quote is assigned to the writer's Quote (the fields quote character).
Quote rune
// QuoteEscape is assigned to the writer's QuoteEscape, the rune the
// writer emits when it meets the quote character inside a field.
QuoteEscape rune
// AlwaysQuote is assigned to the writer's AlwaysQuote; true quotes all fields.
AlwaysQuote bool
}
// Record - is a type containing columns and their values.
type Record interface {
Get(name string) (*Value, error)
@ -46,7 +54,7 @@ type Record interface {
// Set a value.
// Can return a different record type.
Set(name string, value *Value) (Record, error)
WriteCSV(writer io.Writer, fieldDelimiter, quote rune, alwaysQuote bool) error
WriteCSV(writer io.Writer, opts WriteCSVOpts) error
WriteJSON(writer io.Writer) error
// Clone the record and if possible use the destination provided.

Loading…
Cancel
Save