From 9902c9baaafd2860e937d65469e32d9227d9278f Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Wed, 1 Apr 2020 23:39:34 +0100 Subject: [PATCH] sql: Add support of escape quote in CSV (#9231) This commit modifies csv parser, a fork of golang csv parser to support a custom quote escape character. The quote escape character is used to escape the quote character when a csv field contains a quote character as part of data. --- mint/run/core/s3select/tests.py | 71 +++++++++++++++++++------------ pkg/csvparser/reader.go | 31 +++++++++++--- pkg/csvparser/writer.go | 10 +++-- pkg/s3select/csv/args.go | 20 +++++++-- pkg/s3select/csv/reader.go | 1 + pkg/s3select/csv/reader_test.go | 25 +++++++++-- pkg/s3select/csv/record.go | 9 ++-- pkg/s3select/json/record.go | 9 ++-- pkg/s3select/select.go | 11 +++-- pkg/s3select/simdj/reader_test.go | 11 ++++- pkg/s3select/simdj/record.go | 9 ++-- pkg/s3select/sql/record.go | 10 ++++- 12 files changed, 153 insertions(+), 64 deletions(-) diff --git a/mint/run/core/s3select/tests.py b/mint/run/core/s3select/tests.py index 145598a7f..5d39cadc0 100644 --- a/mint/run/core/s3select/tests.py +++ b/mint/run/core/s3select/tests.py @@ -111,29 +111,39 @@ def generate_bucket_name(): return "s3select-test-" + uuid.uuid4().__str__() -def test_csv_input_quote_char(client, log_output): +def test_csv_input_custom_quote_char(client, log_output): # Get a unique bucket_name and object_name log_output.args['bucket_name'] = bucket_name = generate_bucket_name() tests = [ # Invalid quote character, should fail - ('""', b'col1,col2,col3\n', Exception()), + ('""', '"', b'col1,col2,col3\n', Exception()), # UTF-8 quote character - ('ع', b'\xd8\xb9col1\xd8\xb9,\xd8\xb9col2\xd8\xb9,\xd8\xb9col3\xd8\xb9\n', b'{"_1":"col1","_2":"col2","_3":"col3"}\n'), + ('ع', '"', b'\xd8\xb9col1\xd8\xb9,\xd8\xb9col2\xd8\xb9,\xd8\xb9col3\xd8\xb9\n', b'{"_1":"col1","_2":"col2","_3":"col3"}\n'), # Only one field is quoted - ('"', b'"col1",col2,col3\n', b'{"_1":"col1","_2":"col2","_3":"col3"}\n'), - ('"', b'"col1,col2,col3"\n', b'{"_1":"col1,col2,col3"}\n'), - ('\'', b'"col1",col2,col3\n', b'{"_1":"\\"col1\\"","_2":"col2","_3":"col3"}\n'), - ('', b'"col1",col2,col3\n', b'{"_1":"\\"col1\\"","_2":"col2","_3":"col3"}\n'), - ('', b'"col1",col2,col3\n', b'{"_1":"\\"col1\\"","_2":"col2","_3":"col3"}\n'), - ('', b'"col1","col2","col3"\n', b'{"_1":"\\"col1\\"","_2":"\\"col2\\"","_3":"\\"col3\\""}\n'), - ('"', b'""""""\n', b'{"_1":"\\"\\""}\n'), + ('"', '"', b'"col1",col2,col3\n', b'{"_1":"col1","_2":"col2","_3":"col3"}\n'), + ('"', '"', b'"col1,col2,col3"\n', b'{"_1":"col1,col2,col3"}\n'), + ('\'', '"', b'"col1",col2,col3\n', b'{"_1":"\\"col1\\"","_2":"col2","_3":"col3"}\n'), + ('', '"', b'"col1",col2,col3\n', b'{"_1":"\\"col1\\"","_2":"col2","_3":"col3"}\n'), + ('', '"', b'"col1",col2,col3\n', b'{"_1":"\\"col1\\"","_2":"col2","_3":"col3"}\n'), + ('', '"', b'"col1","col2","col3"\n', b'{"_1":"\\"col1\\"","_2":"\\"col2\\"","_3":"\\"col3\\""}\n'), + ('"', '"', b'""""""\n', b'{"_1":"\\"\\""}\n'), + ('"', '"', b'A",B\n', b'{"_1":"A\\"","_2":"B"}\n'), + ('"', '"', b'A"",B\n', b'{"_1":"A\\"\\"","_2":"B"}\n'), + ('"', '\\', b'A\\B,C\n', b'{"_1":"A\\\\B","_2":"C"}\n'), + ('"', '"', b'"A""B","CD"\n', b'{"_1":"A\\"B","_2":"CD"}\n'), + ('"', '\\', b'"A\\B","CD"\n', b'{"_1":"AB","_2":"CD"}\n'), + ('"', '\\', b'"A\\,","CD"\n', b'{"_1":"A,","_2":"CD"}\n'), + ('"', '\\', b'"A\\"B","CD"\n', b'{"_1":"A\\"B","_2":"CD"}\n'), + ('"', '\\', b'"A\\""\n', b'{"_1":"A\\""}\n'), + ('"', '\\', b'"A\\"\\"B"\n', b'{"_1":"A\\"\\"B"}\n'), + ('"', '\\', b'"A\\"","\\"B"\n', b'{"_1":"A\\"","_2":"\\"B"}\n'), ] try: client.make_bucket(bucket_name) - for idx, (quote_char, object_content, expected_output) in enumerate(tests): + for idx, (quote_char, escape_char, object_content, expected_output) in enumerate(tests): options = SelectObjectOptions( expression="select * from s3object", input_serialization=InputSerialization( @@ -142,14 +152,14 @@ def test_csv_input_quote_char(client, log_output): RecordDelimiter="\n", FieldDelimiter=",", QuoteCharacter=quote_char, - QuoteEscapeCharacter=quote_char, + QuoteEscapeCharacter=escape_char, Comments="#", AllowQuotedRecordDelimiter="FALSE",), ), output_serialization=OutputSerialization( json = JsonOutput( - RecordDelimiter="\n", - ) + RecordDelimiter="\n", + ) ), request_progress=RequestProgress( enabled="False" @@ -180,24 +190,32 @@ def test_csv_input_quote_char(client, log_output): # Test passes print(log_output.json_report()) -def test_csv_output_quote_char(client, log_output): +def test_csv_output_custom_quote_char(client, log_output): # Get a unique bucket_name and object_name log_output.args['bucket_name'] = bucket_name = generate_bucket_name() tests = [ # UTF-8 quote character - ("''", b'col1,col2,col3\n', Exception()), - ("'", b'col1,col2,col3\n', b"'col1','col2','col3'\n"), - ("", b'col1,col2,col3\n', b'\x00col1\x00,\x00col2\x00,\x00col3\x00\n'), - ('"', b'col1,col2,col3\n', b'"col1","col2","col3"\n'), - ('"', b'col"1,col2,col3\n', b'"col""1","col2","col3"\n'), - ('"', b'\n', b''), + ("''", "''", b'col1,col2,col3\n', Exception()), + ("'", "'", b'col1,col2,col3\n', b"'col1','col2','col3'\n"), + ("", '"', b'col1,col2,col3\n', b'\x00col1\x00,\x00col2\x00,\x00col3\x00\n'), + ('"', '"', b'col1,col2,col3\n', b'"col1","col2","col3"\n'), + ('"', '"', b'col"1,col2,col3\n', b'"col""1","col2","col3"\n'), + ('"', '"', b'""""\n', b'""""\n'), + ('"', '"', b'\n', b''), + ("'", "\\", b'col1,col2,col3\n', b"'col1','col2','col3'\n"), + ("'", "\\", b'col""1,col2,col3\n', b"'col\"\"1','col2','col3'\n"), + ("'", "\\", b'col\'1,col2,col3\n', b"'col\\'1','col2','col3'\n"), + ("'", "\\", b'"col\'1","col2","col3"\n', b"'col\\'1','col2','col3'\n"), + ("'", "\\", b'col\'\n', b"'col\\''\n"), + # Two consecutive escaped quotes + ("'", "\\", b'"a"""""\n', b"'a\"\"'\n"), ] try: client.make_bucket(bucket_name) - for idx, (quote_char, object_content, expected_output) in enumerate(tests): + for idx, (quote_char, escape_char, object_content, expected_output) in enumerate(tests): options = SelectObjectOptions( expression="select * from s3object", input_serialization=InputSerialization( @@ -215,7 +233,7 @@ def test_csv_output_quote_char(client, log_output): RecordDelimiter="\n", FieldDelimiter=",", QuoteCharacter=quote_char, - QuoteEscapeCharacter=quote_char,) + QuoteEscapeCharacter=escape_char,) ), request_progress=RequestProgress( enabled="False" @@ -286,14 +304,13 @@ def main(): secret_key = 'zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG' secure = True - client = Minio(server_endpoint, access_key, secret_key, secure=secure) + client = Minio(server_endpoint, access_key, secret_key, secure=False) log_output = LogOutput(client.select_object_content, 'test_csv_input_quote_char') - test_csv_input_quote_char(client, log_output) + test_csv_input_custom_quote_char(client, log_output) log_output = LogOutput(client.select_object_content, 'test_csv_output_quote_char') - test_csv_output_quote_char(client, log_output) - + test_csv_output_custom_quote_char(client, log_output) except Exception as err: print(log_output.json_report(err)) diff --git a/pkg/csvparser/reader.go b/pkg/csvparser/reader.go index a50ef6dcc..e96334910 100644 --- a/pkg/csvparser/reader.go +++ b/pkg/csvparser/reader.go @@ -113,9 +113,12 @@ type Reader struct { // or the Unicode replacement character (0xFFFD). Comma rune - // Quote is the single character used for marking fields limits + // Quote is a single rune used for marking fields limits Quote []rune + // QuoteEscape is a single rune to escape the quote character + QuoteEscape rune + // Comment, if not 0, is the comment character. Lines beginning with the // Comment character without preceding whitespace are ignored. // With leading whitespace the Comment character becomes part of the @@ -173,9 +176,10 @@ type Reader struct { // NewReader returns a new Reader that reads from r. func NewReader(r io.Reader) *Reader { return &Reader{ - Comma: ',', - Quote: []rune(`"`), - r: bufio.NewReader(r), + Comma: ',', + Quote: []rune(`"`), + QuoteEscape: '"', + r: bufio.NewReader(r), } } @@ -291,6 +295,9 @@ func (r *Reader) readRecord(dst []string) ([]string, error) { return nil, errRead } + var quoteEscape = r.QuoteEscape + var quoteEscapeLen = utf8.RuneLen(quoteEscape) + var quote rune var quoteLen int if len(r.Quote) > 0 { @@ -339,12 +346,22 @@ parseField: // Quoted string field line = line[quoteLen:] for { - i := bytes.IndexRune(line, quote) + i := bytes.IndexAny(line, string(quote)+string(quoteEscape)) if i >= 0 { - // Hit next quote. + // Hit next quote or escape quote r.recordBuffer = append(r.recordBuffer, line[:i]...) - line = line[i+quoteLen:] + + escape := nextRune(line[i:]) == quoteEscape + if escape { + line = line[i+quoteEscapeLen:] + } else { + line = line[i+quoteLen:] + } + switch rn := nextRune(line); { + case escape && quoteEscape != quote: + r.recordBuffer = append(r.recordBuffer, encodeRune(rn)...) + line = line[utf8.RuneLen(rn):] case rn == quote: // `""` sequence (append quote). r.recordBuffer = append(r.recordBuffer, encodedQuote...) diff --git a/pkg/csvparser/writer.go b/pkg/csvparser/writer.go index cdcfc42b4..2376a4769 100644 --- a/pkg/csvparser/writer.go +++ b/pkg/csvparser/writer.go @@ -30,6 +30,7 @@ import ( type Writer struct { Comma rune // Field delimiter (set to ',' by NewWriter) Quote rune // Fields quote character + QuoteEscape rune AlwaysQuote bool // True to quote all fields UseCRLF bool // True to use \r\n as the line terminator w *bufio.Writer @@ -38,9 +39,10 @@ type Writer struct { // NewWriter returns a new Writer that writes to w. func NewWriter(w io.Writer) *Writer { return &Writer{ - Comma: ',', - Quote: '"', - w: bufio.NewWriter(w), + Comma: ',', + Quote: '"', + QuoteEscape: '"', + w: bufio.NewWriter(w), } } @@ -93,7 +95,7 @@ func (w *Writer) Write(record []string) error { var err error switch nextRune([]byte(field)) { case w.Quote: - _, err = w.w.WriteRune(w.Quote) + _, err = w.w.WriteRune(w.QuoteEscape) if err != nil { break } diff --git a/pkg/s3select/csv/args.go b/pkg/s3select/csv/args.go index bd03fcd9f..9c581d062 100644 --- a/pkg/s3select/csv/args.go +++ b/pkg/s3select/csv/args.go @@ -104,8 +104,15 @@ func (args *ReaderArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) (er return fmt.Errorf("unsupported QuoteCharacter '%v'", s) } args.QuoteCharacter = s - // Not supported yet case "QuoteEscapeCharacter": + switch utf8.RuneCountInString(s) { + case 0: + args.QuoteEscapeCharacter = defaultQuoteEscapeCharacter + case 1: + args.QuoteEscapeCharacter = s + default: + return fmt.Errorf("unsupported QuoteEscapeCharacter '%v'", s) + } case "Comments": args.CommentCharacter = s default: @@ -115,7 +122,6 @@ func (args *ReaderArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) (er } } - args.QuoteEscapeCharacter = args.QuoteCharacter args.unmarshaled = true return nil } @@ -176,15 +182,21 @@ func (args *WriterArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) err default: return fmt.Errorf("unsupported QuoteCharacter '%v'", s) } - // Not supported yet case "QuoteEscapeCharacter": + switch utf8.RuneCountInString(s) { + case 0: + args.QuoteEscapeCharacter = defaultQuoteEscapeCharacter + case 1: + args.QuoteEscapeCharacter = s + default: + return fmt.Errorf("unsupported QuoteCharacter '%v'", s) + } default: return errors.New("unrecognized option") } } } - args.QuoteEscapeCharacter = args.QuoteCharacter args.unmarshaled = true return nil } diff --git a/pkg/s3select/csv/reader.go b/pkg/s3select/csv/reader.go index 9d0fc1171..84b5deeb9 100644 --- a/pkg/s3select/csv/reader.go +++ b/pkg/s3select/csv/reader.go @@ -299,6 +299,7 @@ func NewReader(readCloser io.ReadCloser, args *ReaderArgs) (*Reader, error) { // Add the first rune of args.QuoteChracter ret.Quote = append(ret.Quote, []rune(args.QuoteCharacter)[0]) } + ret.QuoteEscape = []rune(args.QuoteEscapeCharacter)[0] ret.FieldsPerRecord = -1 // If LazyQuotes is true, a quote may appear in an unquoted field and a // non-doubled quote may appear in a quoted field. diff --git a/pkg/s3select/csv/reader_test.go b/pkg/s3select/csv/reader_test.go index 9aed957b0..71ae78132 100644 --- a/pkg/s3select/csv/reader_test.go +++ b/pkg/s3select/csv/reader_test.go @@ -63,7 +63,13 @@ func TestRead(t *testing.T) { if err != nil { break } - record.WriteCSV(&result, []rune(c.fieldDelimiter)[0], '"', false) + opts := sql.WriteCSVOpts{ + FieldDelimiter: []rune(c.fieldDelimiter)[0], + Quote: '"', + QuoteEscape: '"', + AlwaysQuote: false, + } + record.WriteCSV(&result, opts) result.Truncate(result.Len() - 1) result.WriteString(c.recordDelimiter) } @@ -242,8 +248,14 @@ func TestReadExtended(t *testing.T) { break } if fields < 10 { + opts := sql.WriteCSVOpts{ + FieldDelimiter: ',', + Quote: '"', + QuoteEscape: '"', + AlwaysQuote: false, + } // Write with fixed delimiters, newlines. - err := record.WriteCSV(&result, ',', '"', false) + err := record.WriteCSV(&result, opts) if err != nil { t.Error(err) } @@ -453,8 +465,15 @@ func TestReadFailures(t *testing.T) { if err != nil { break } + + opts := sql.WriteCSVOpts{ + FieldDelimiter: ',', + Quote: '"', + QuoteEscape: '"', + AlwaysQuote: false, + } // Write with fixed delimiters, newlines. - err := record.WriteCSV(&result, ',', '"', false) + err := record.WriteCSV(&result, opts) if err != nil { t.Error(err) } diff --git a/pkg/s3select/csv/record.go b/pkg/s3select/csv/record.go index 7cda5e4bb..c9f0f842b 100644 --- a/pkg/s3select/csv/record.go +++ b/pkg/s3select/csv/record.go @@ -92,11 +92,12 @@ func (r *Record) Clone(dst sql.Record) sql.Record { } // WriteCSV - encodes to CSV data. -func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune, quote rune, alwaysQuote bool) error { +func (r *Record) WriteCSV(writer io.Writer, opts sql.WriteCSVOpts) error { w := csv.NewWriter(writer) - w.Comma = fieldDelimiter - w.AlwaysQuote = alwaysQuote - w.Quote = quote + w.Comma = opts.FieldDelimiter + w.AlwaysQuote = opts.AlwaysQuote + w.Quote = opts.Quote + w.QuoteEscape = opts.QuoteEscape if err := w.Write(r.csvRecord); err != nil { return err } diff --git a/pkg/s3select/json/record.go b/pkg/s3select/json/record.go index 6410a0224..d599466fd 100644 --- a/pkg/s3select/json/record.go +++ b/pkg/s3select/json/record.go @@ -108,7 +108,7 @@ func (r *Record) Set(name string, value *sql.Value) (sql.Record, error) { } // WriteCSV - encodes to CSV data. -func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune, quote rune, alwaysQuote bool) error { +func (r *Record) WriteCSV(writer io.Writer, opts sql.WriteCSVOpts) error { var csvRecord []string for _, kv := range r.KVS { var columnValue string @@ -136,9 +136,10 @@ func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune, quote rune, alw } w := csv.NewWriter(writer) - w.Comma = fieldDelimiter - w.Quote = quote - w.AlwaysQuote = alwaysQuote + w.Comma = opts.FieldDelimiter + w.Quote = opts.Quote + w.AlwaysQuote = opts.AlwaysQuote + w.QuoteEscape = opts.QuoteEscape if err := w.Write(csvRecord); err != nil { return err } diff --git a/pkg/s3select/select.go b/pkg/s3select/select.go index 1230d666b..f76e10de6 100644 --- a/pkg/s3select/select.go +++ b/pkg/s3select/select.go @@ -353,10 +353,13 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error { }() bufioWriter.Reset(buf) - err := record.WriteCSV(bufioWriter, - []rune(s3Select.Output.CSVArgs.FieldDelimiter)[0], - []rune(s3Select.Output.CSVArgs.QuoteCharacter)[0], - strings.ToLower(s3Select.Output.CSVArgs.QuoteFields) == "always") + opts := sql.WriteCSVOpts{ + FieldDelimiter: []rune(s3Select.Output.CSVArgs.FieldDelimiter)[0], + Quote: []rune(s3Select.Output.CSVArgs.QuoteCharacter)[0], + QuoteEscape: []rune(s3Select.Output.CSVArgs.QuoteEscapeCharacter)[0], + AlwaysQuote: strings.ToLower(s3Select.Output.CSVArgs.QuoteFields) == "always", + } + err := record.WriteCSV(bufioWriter, opts) if err != nil { return err } diff --git a/pkg/s3select/simdj/reader_test.go b/pkg/s3select/simdj/reader_test.go index 012beff55..216400300 100644 --- a/pkg/s3select/simdj/reader_test.go +++ b/pkg/s3select/simdj/reader_test.go @@ -25,6 +25,7 @@ import ( "github.com/klauspost/compress/zstd" "github.com/minio/minio/pkg/s3select/json" + "github.com/minio/minio/pkg/s3select/sql" "github.com/minio/simdjson-go" ) @@ -131,11 +132,17 @@ func TestNDJSON(t *testing.T) { t.Error(err) } var gotB, wantB bytes.Buffer - err = rec.WriteCSV(&gotB, ',', '"', false) + opts := sql.WriteCSVOpts{ + FieldDelimiter: ',', + Quote: '"', + QuoteEscape: '"', + AlwaysQuote: false, + } + err = rec.WriteCSV(&gotB, opts) if err != nil { t.Error(err) } - err = want.WriteCSV(&wantB, ',', '"', false) + err = want.WriteCSV(&wantB, opts) if err != nil { t.Error(err) } diff --git a/pkg/s3select/simdj/record.go b/pkg/s3select/simdj/record.go index 38ccafd81..83139a6b2 100644 --- a/pkg/s3select/simdj/record.go +++ b/pkg/s3select/simdj/record.go @@ -141,7 +141,7 @@ func (r *Record) Set(name string, value *sql.Value) (sql.Record, error) { } // WriteCSV - encodes to CSV data. -func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter, quote rune, alwaysQuote bool) error { +func (r *Record) WriteCSV(writer io.Writer, opts sql.WriteCSVOpts) error { csvRecord := make([]string, 0, 10) var tmp simdjson.Iter obj := r.object @@ -173,9 +173,10 @@ allElems: csvRecord = append(csvRecord, columnValue) } w := csv.NewWriter(writer) - w.Comma = fieldDelimiter - w.Quote = quote - w.AlwaysQuote = alwaysQuote + w.Comma = opts.FieldDelimiter + w.Quote = opts.Quote + w.QuoteEscape = opts.QuoteEscape + w.AlwaysQuote = opts.AlwaysQuote if err := w.Write(csvRecord); err != nil { return err } diff --git a/pkg/s3select/sql/record.go b/pkg/s3select/sql/record.go index 8e375765c..ced507067 100644 --- a/pkg/s3select/sql/record.go +++ b/pkg/s3select/sql/record.go @@ -39,6 +39,14 @@ const ( SelectFmtParquet ) +// WriteCSVOpts - encapsulates options for Select CSV output +type WriteCSVOpts struct { + FieldDelimiter rune + Quote rune + QuoteEscape rune + AlwaysQuote bool +} + // Record - is a type containing columns and their values. type Record interface { Get(name string) (*Value, error) @@ -46,7 +54,7 @@ type Record interface { // Set a value. // Can return a different record type. Set(name string, value *Value) (Record, error) - WriteCSV(writer io.Writer, fieldDelimiter, quote rune, alwaysQuote bool) error + WriteCSV(writer io.Writer, opts WriteCSVOpts) error WriteJSON(writer io.Writer) error // Clone the record and if possible use the destination provided.