Support configurable quote character parameter in Select (#8955)

master
Anis Elleuch 5 years ago committed by GitHub
parent 3ca9f5ffa3
commit 35ecc04223
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      mint/Dockerfile.dev
  2. 18
      mint/build/s3select/install.sh
  3. 21
      mint/run/core/s3select/README.md
  4. 28
      mint/run/core/s3select/run.sh
  5. 304
      mint/run/core/s3select/tests.py
  6. 33
      pkg/csvparser/reader.go
  7. 32
      pkg/csvparser/writer.go
  8. 17
      pkg/csvparser/writer_test.go
  9. 210
      pkg/s3select/csv/args.go
  10. 5
      pkg/s3select/csv/reader.go
  11. 6
      pkg/s3select/csv/reader_test.go
  12. 4
      pkg/s3select/csv/record.go
  13. 6
      pkg/s3select/json/record.go
  14. 5
      pkg/s3select/select.go
  15. 2
      pkg/s3select/select_test.go
  16. 4
      pkg/s3select/simdj/reader_test.go
  17. 7
      pkg/s3select/simdj/record.go
  18. 2
      pkg/s3select/sql/record.go

@ -74,6 +74,9 @@ RUN build/worm/install.sh
COPY build/healthcheck /mint/build/healthcheck COPY build/healthcheck /mint/build/healthcheck
RUN build/healthcheck/install.sh RUN build/healthcheck/install.sh
COPY build/s3select /mint/build/s3select
RUN build/s3select/install.sh
COPY remove-packages.list /mint COPY remove-packages.list /mint
COPY postinstall.sh /mint COPY postinstall.sh /mint
RUN /mint/postinstall.sh RUN /mint/postinstall.sh

@ -0,0 +1,18 @@
#!/bin/bash -e
#
# Mint (C) 2020 Minio, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
python -m pip install minio

@ -0,0 +1,21 @@
## `s3select` tests
This directory serves as the location for Mint tests for s3select features. Top level `mint.sh` calls `run.sh` to execute tests.
## Adding new tests
New tests are added into `s3select/tests.py` as new functions.
## Running tests manually
- Set environment variables `MINT_DATA_DIR`, `MINT_MODE`, `SERVER_ENDPOINT`, `ACCESS_KEY`, `SECRET_KEY`, `SERVER_REGION` and `ENABLE_HTTPS`
- Call `run.sh` with output log file and error log file. for example
```bash
export MINT_DATA_DIR=~/my-mint-dir
export MINT_MODE=core
export SERVER_ENDPOINT="play.min.io"
export ACCESS_KEY="Q3AM3UQ867SPQQA43P2F"
export SECRET_KEY="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"
export ENABLE_HTTPS=1
export SERVER_REGION=us-east-1
./run.sh /tmp/output.log /tmp/error.log
```

@ -0,0 +1,28 @@
#!/bin/bash
#
# Mint (C) 2020 Minio, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# handle command line arguments
if [ $# -ne 2 ]; then
echo "usage: run.sh <OUTPUT-LOG-FILE> <ERROR-LOG-FILE>"
exit -1
fi
output_log_file="$1"
error_log_file="$2"
# run path style tests
python "./tests.py" 1>>"$output_log_file" 2>"$error_log_file"

@ -0,0 +1,304 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage,
# (C) 2015-2020 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# from __future__ import division
# from __future__ import absolute_import
import os
import io
from sys import exit
import uuid
import inspect
import json
import time
import traceback
from minio import Minio
from minio.select.options import (SelectObjectOptions, CSVInput,
RequestProgress, InputSerialization,
OutputSerialization, CSVOutput, JsonOutput)
class LogOutput(object):
"""
LogOutput is the class for log output. It is required standard for all
SDK tests controlled by mint.
Here are its attributes:
'name': name of the SDK under test, e.g. 's3select'
'function': name of the method/api under test with its signature
The following python code can be used to
pull args information of a <method> and to
put together with the method name:
<method>.__name__+'('+', '.join(args_list)+')'
e.g. 'remove_object(bucket_name, object_name)'
'args': method/api arguments with their values, in
dictionary form: {'arg1': val1, 'arg2': val2, ...}
'duration': duration of the whole test in milliseconds,
defaults to 0
'alert': any extra information user is needed to be alerted about,
like whether this is a Blocker/Gateway/Server related
issue, etc., defaults to None
'message': descriptive error message, defaults to None
'error': stack-trace/exception message(only in case of failure),
actual low level exception/error thrown by the program,
defaults to None
'status': exit status, possible values are 'PASS', 'FAIL', 'NA',
defaults to 'PASS'
"""
PASS = 'PASS'
FAIL = 'FAIL'
NA = 'NA'
def __init__(self, meth, test_name):
self.__args_list = inspect.getargspec(meth).args[1:]
self.__name = 'minio-py:'+test_name
self.__function = meth.__name__+'('+', '.join(self.__args_list)+')'
self.__args = {}
self.__duration = 0
self.__alert = ''
self.__message = None
self.__error = None
self.__status = self.PASS
self.__start_time = time.time()
@property
def name(self): return self.__name
@property
def function(self): return self.__function
@property
def args(self): return self.__args
@name.setter
def name(self, val): self.__name = val
@function.setter
def function(self, val): self.__function = val
@args.setter
def args(self, val): self.__args = val
def json_report(self, err_msg='', alert='', status=''):
self.__args = {k: v for k, v in self.__args.items() if v and v != ''}
entry = {'name': self.__name,
'function': self.__function,
'args': self.__args,
'duration': int(round((time.time() - self.__start_time)*1000)),
'alert': str(alert),
'message': str(err_msg),
'error': traceback.format_exc() if err_msg and err_msg != '' else '',
'status': status if status and status != '' else
self.FAIL if err_msg and err_msg != '' else self.PASS
}
return json.dumps({k: v for k, v in entry.items() if v and v != ''})
def generate_bucket_name():
return "s3select-test-" + uuid.uuid4().__str__()
def test_csv_input_quote_char(client, log_output):
# Get a unique bucket_name and object_name
log_output.args['bucket_name'] = bucket_name = generate_bucket_name()
tests = [
# Invalid quote character, should fail
('""', b'col1,col2,col3\n', Exception()),
# UTF-8 quote character
('ع', b'\xd8\xb9col1\xd8\xb9,\xd8\xb9col2\xd8\xb9,\xd8\xb9col3\xd8\xb9\n', b'{"_1":"col1","_2":"col2","_3":"col3"}\n'),
# Only one field is quoted
('"', b'"col1",col2,col3\n', b'{"_1":"col1","_2":"col2","_3":"col3"}\n'),
('"', b'"col1,col2,col3"\n', b'{"_1":"col1,col2,col3"}\n'),
('\'', b'"col1",col2,col3\n', b'{"_1":"\\"col1\\"","_2":"col2","_3":"col3"}\n'),
('', b'"col1",col2,col3\n', b'{"_1":"\\"col1\\"","_2":"col2","_3":"col3"}\n'),
('', b'"col1",col2,col3\n', b'{"_1":"\\"col1\\"","_2":"col2","_3":"col3"}\n'),
('', b'"col1","col2","col3"\n', b'{"_1":"\\"col1\\"","_2":"\\"col2\\"","_3":"\\"col3\\""}\n'),
('"', b'""""""\n', b'{"_1":"\\"\\""}\n'),
]
try:
client.make_bucket(bucket_name)
for idx, (quote_char, object_content, expected_output) in enumerate(tests):
options = SelectObjectOptions(
expression="select * from s3object",
input_serialization=InputSerialization(
compression_type="NONE",
csv=CSVInput(FileHeaderInfo="NONE",
RecordDelimiter="\n",
FieldDelimiter=",",
QuoteCharacter=quote_char,
QuoteEscapeCharacter=quote_char,
Comments="#",
AllowQuotedRecordDelimiter="FALSE",),
),
output_serialization=OutputSerialization(
json = JsonOutput(
RecordDelimiter="\n",
)
),
request_progress=RequestProgress(
enabled="False"
)
)
got_output = b''
try:
got_output = exec_select(client, bucket_name, object_content, options, log_output)
except Exception as select_err:
if not isinstance(expected_output, Exception):
raise ValueError('Test {} unexpectedly failed with: {}'.format(idx+1, select_err))
else:
if isinstance(expected_output, Exception):
raise ValueError('Test {}: expected an exception, got {}'.format(idx+1, got_output))
if got_output != expected_output:
raise ValueError('Test {}: data mismatch. Expected : {}, Received {}'.format(idx+1, expected_output, got_output))
except Exception as err:
raise Exception(err)
finally:
try:
client.remove_bucket(bucket_name)
except Exception as err:
raise Exception(err)
# Test passes
print(log_output.json_report())
def test_csv_output_quote_char(client, log_output):
# Get a unique bucket_name and object_name
log_output.args['bucket_name'] = bucket_name = generate_bucket_name()
tests = [
# UTF-8 quote character
("''", b'col1,col2,col3\n', Exception()),
("'", b'col1,col2,col3\n', b"'col1','col2','col3'\n"),
("", b'col1,col2,col3\n', b'\x00col1\x00,\x00col2\x00,\x00col3\x00\n'),
('"', b'col1,col2,col3\n', b'"col1","col2","col3"\n'),
('"', b'col"1,col2,col3\n', b'"col""1","col2","col3"\n'),
('"', b'\n', b''),
]
try:
client.make_bucket(bucket_name)
for idx, (quote_char, object_content, expected_output) in enumerate(tests):
options = SelectObjectOptions(
expression="select * from s3object",
input_serialization=InputSerialization(
compression_type="NONE",
csv=CSVInput(FileHeaderInfo="NONE",
RecordDelimiter="\n",
FieldDelimiter=",",
QuoteCharacter='"',
QuoteEscapeCharacter='"',
Comments="#",
AllowQuotedRecordDelimiter="FALSE",),
),
output_serialization=OutputSerialization(
csv=CSVOutput(QuoteFields="ALWAYS",
RecordDelimiter="\n",
FieldDelimiter=",",
QuoteCharacter=quote_char,
QuoteEscapeCharacter=quote_char,)
),
request_progress=RequestProgress(
enabled="False"
)
)
got_output = b''
try:
got_output = exec_select(client, bucket_name, object_content, options, log_output)
except Exception as select_err:
if not isinstance(expected_output, Exception):
raise ValueError('Test {} unexpectedly failed with: {}'.format(idx+1, select_err))
else:
if isinstance(expected_output, Exception):
raise ValueError('Test {}: expected an exception, got {}'.format(idx+1, got_output))
if got_output != expected_output:
raise ValueError('Test {}: data mismatch. Expected : {}. Received: {}.'.format(idx+1, expected_output, got_output))
except Exception as err:
raise Exception(err)
finally:
try:
client.remove_bucket(bucket_name)
except Exception as err:
raise Exception(err)
# Test passes
print(log_output.json_report())
def exec_select(client, bucket_name, object_content, options, log_output):
log_output.args['object_name'] = object_name = uuid.uuid4().__str__()
try:
bytes_content = io.BytesIO(object_content)
client.put_object(bucket_name, object_name, io.BytesIO(object_content), len(object_content))
data = client.select_object_content(bucket_name, object_name, options)
# Get the records
records = io.BytesIO()
for d in data.stream(10*1024):
records.write(d.encode('utf-8'))
return records.getvalue()
except Exception as err:
raise Exception(err)
finally:
try:
client.remove_object(bucket_name, object_name)
except Exception as err:
raise Exception(err)
def main():
"""
Functional testing for S3 select.
"""
try:
access_key = os.getenv('ACCESS_KEY', 'Q3AM3UQ867SPQQA43P2F')
secret_key = os.getenv('SECRET_KEY',
'zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG')
server_endpoint = os.getenv('SERVER_ENDPOINT', 'play.min.io')
secure = os.getenv('ENABLE_HTTPS', '1') == '1'
if server_endpoint == 'play.min.io':
access_key = 'Q3AM3UQ867SPQQA43P2F'
secret_key = 'zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG'
secure = True
client = Minio(server_endpoint, access_key, secret_key, secure=secure)
log_output = LogOutput(client.select_object_content, 'test_csv_input_quote_char')
test_csv_input_quote_char(client, log_output)
log_output = LogOutput(client.select_object_content, 'test_csv_output_quote_char')
test_csv_output_quote_char(client, log_output)
except Exception as err:
print(log_output.json_report(err))
exit(1)
if __name__ == "__main__":
# Execute only if run as a script
main()

@ -113,6 +113,9 @@ type Reader struct {
// or the Unicode replacement character (0xFFFD). // or the Unicode replacement character (0xFFFD).
Comma rune Comma rune
// Quote is the single character used for marking fields limits
Quote []rune
// Comment, if not 0, is the comment character. Lines beginning with the // Comment, if not 0, is the comment character. Lines beginning with the
// Comment character without preceding whitespace are ignored. // Comment character without preceding whitespace are ignored.
// With leading whitespace the Comment character becomes part of the // With leading whitespace the Comment character becomes part of the
@ -171,6 +174,7 @@ type Reader struct {
func NewReader(r io.Reader) *Reader { func NewReader(r io.Reader) *Reader {
return &Reader{ return &Reader{
Comma: ',', Comma: ',',
Quote: []rune(`"`),
r: bufio.NewReader(r), r: bufio.NewReader(r),
} }
} }
@ -255,6 +259,13 @@ func nextRune(b []byte) rune {
return r return r
} }
func encodeRune(r rune) []byte {
rlen := utf8.RuneLen(r)
p := make([]byte, rlen)
_ = utf8.EncodeRune(p, r)
return p
}
func (r *Reader) readRecord(dst []string) ([]string, error) { func (r *Reader) readRecord(dst []string) ([]string, error) {
if r.Comma == r.Comment || !validDelim(r.Comma) || (r.Comment != 0 && !validDelim(r.Comment)) { if r.Comma == r.Comment || !validDelim(r.Comma) || (r.Comment != 0 && !validDelim(r.Comment)) {
return nil, errInvalidDelim return nil, errInvalidDelim
@ -280,9 +291,17 @@ func (r *Reader) readRecord(dst []string) ([]string, error) {
return nil, errRead return nil, errRead
} }
var quote rune
var quoteLen int
if len(r.Quote) > 0 {
quote = r.Quote[0]
quoteLen = utf8.RuneLen(quote)
}
encodedQuote := encodeRune(quote)
// Parse each field in the record. // Parse each field in the record.
var err error var err error
const quoteLen = len(`"`)
commaLen := utf8.RuneLen(r.Comma) commaLen := utf8.RuneLen(r.Comma)
recLine := r.numLine // Starting line for record recLine := r.numLine // Starting line for record
r.recordBuffer = r.recordBuffer[:0] r.recordBuffer = r.recordBuffer[:0]
@ -292,7 +311,7 @@ parseField:
if r.TrimLeadingSpace { if r.TrimLeadingSpace {
line = bytes.TrimLeftFunc(line, unicode.IsSpace) line = bytes.TrimLeftFunc(line, unicode.IsSpace)
} }
if len(line) == 0 || line[0] != '"' { if len(line) == 0 || quoteLen == 0 || nextRune(line) != quote {
// Non-quoted string field // Non-quoted string field
i := bytes.IndexRune(line, r.Comma) i := bytes.IndexRune(line, r.Comma)
field := line field := line
@ -303,7 +322,7 @@ parseField:
} }
// Check to make sure a quote does not appear in field. // Check to make sure a quote does not appear in field.
if !r.LazyQuotes { if !r.LazyQuotes {
if j := bytes.IndexByte(field, '"'); j >= 0 { if j := bytes.IndexRune(field, quote); j >= 0 {
col := utf8.RuneCount(fullLine[:len(fullLine)-len(line[j:])]) col := utf8.RuneCount(fullLine[:len(fullLine)-len(line[j:])])
err = &ParseError{StartLine: recLine, Line: r.numLine, Column: col, Err: ErrBareQuote} err = &ParseError{StartLine: recLine, Line: r.numLine, Column: col, Err: ErrBareQuote}
break parseField break parseField
@ -320,15 +339,15 @@ parseField:
// Quoted string field // Quoted string field
line = line[quoteLen:] line = line[quoteLen:]
for { for {
i := bytes.IndexByte(line, '"') i := bytes.IndexRune(line, quote)
if i >= 0 { if i >= 0 {
// Hit next quote. // Hit next quote.
r.recordBuffer = append(r.recordBuffer, line[:i]...) r.recordBuffer = append(r.recordBuffer, line[:i]...)
line = line[i+quoteLen:] line = line[i+quoteLen:]
switch rn := nextRune(line); { switch rn := nextRune(line); {
case rn == '"': case rn == quote:
// `""` sequence (append quote). // `""` sequence (append quote).
r.recordBuffer = append(r.recordBuffer, '"') r.recordBuffer = append(r.recordBuffer, encodedQuote...)
line = line[quoteLen:] line = line[quoteLen:]
case rn == r.Comma: case rn == r.Comma:
// `",` sequence (end of field). // `",` sequence (end of field).
@ -341,7 +360,7 @@ parseField:
break parseField break parseField
case r.LazyQuotes: case r.LazyQuotes:
// `"` sequence (bare quote). // `"` sequence (bare quote).
r.recordBuffer = append(r.recordBuffer, '"') r.recordBuffer = append(r.recordBuffer, encodedQuote...)
default: default:
// `"*` sequence (invalid non-escaped quote). // `"*` sequence (invalid non-escaped quote).
col := utf8.RuneCount(fullLine[:len(fullLine)-len(line)-quoteLen]) col := utf8.RuneCount(fullLine[:len(fullLine)-len(line)-quoteLen])

@ -28,15 +28,18 @@ import (
// the underlying io.Writer. Any errors that occurred should // the underlying io.Writer. Any errors that occurred should
// be checked by calling the Error method. // be checked by calling the Error method.
type Writer struct { type Writer struct {
Comma rune // Field delimiter (set to ',' by NewWriter) Comma rune // Field delimiter (set to ',' by NewWriter)
UseCRLF bool // True to use \r\n as the line terminator Quote rune // Fields quote character
w *bufio.Writer AlwaysQuote bool // True to quote all fields
UseCRLF bool // True to use \r\n as the line terminator
w *bufio.Writer
} }
// NewWriter returns a new Writer that writes to w. // NewWriter returns a new Writer that writes to w.
func NewWriter(w io.Writer) *Writer { func NewWriter(w io.Writer) *Writer {
return &Writer{ return &Writer{
Comma: ',', Comma: ',',
Quote: '"',
w: bufio.NewWriter(w), w: bufio.NewWriter(w),
} }
} }
@ -59,19 +62,22 @@ func (w *Writer) Write(record []string) error {
// If we don't have to have a quoted field then just // If we don't have to have a quoted field then just
// write out the field and continue to the next field. // write out the field and continue to the next field.
if !w.fieldNeedsQuotes(field) { if !w.AlwaysQuote && !w.fieldNeedsQuotes(field) {
if _, err := w.w.WriteString(field); err != nil { if _, err := w.w.WriteString(field); err != nil {
return err return err
} }
continue continue
} }
if err := w.w.WriteByte('"'); err != nil { if _, err := w.w.WriteRune(w.Quote); err != nil {
return err return err
} }
specialChars := "\r\n" + string(w.Quote)
for len(field) > 0 { for len(field) > 0 {
// Search for special characters. // Search for special characters.
i := strings.IndexAny(field, "\"\r\n") i := strings.IndexAny(field, specialChars)
if i < 0 { if i < 0 {
i = len(field) i = len(field)
} }
@ -85,9 +91,13 @@ func (w *Writer) Write(record []string) error {
// Encode the special character. // Encode the special character.
if len(field) > 0 { if len(field) > 0 {
var err error var err error
switch field[0] { switch nextRune([]byte(field)) {
case '"': case w.Quote:
_, err = w.w.WriteString(`""`) _, err = w.w.WriteRune(w.Quote)
if err != nil {
break
}
_, err = w.w.WriteRune(w.Quote)
case '\r': case '\r':
if !w.UseCRLF { if !w.UseCRLF {
err = w.w.WriteByte('\r') err = w.w.WriteByte('\r')
@ -105,7 +115,7 @@ func (w *Writer) Write(record []string) error {
} }
} }
} }
if err := w.w.WriteByte('"'); err != nil { if _, err := w.w.WriteRune(w.Quote); err != nil {
return err return err
} }
} }
@ -158,7 +168,7 @@ func (w *Writer) fieldNeedsQuotes(field string) bool {
if field == "" { if field == "" {
return false return false
} }
if field == `\.` || strings.ContainsRune(field, w.Comma) || strings.ContainsAny(field, "\"\r\n") { if field == `\.` || strings.ContainsAny(field, "\r\n"+string(w.Quote)+string(w.Comma)) {
return true return true
} }

@ -11,11 +11,13 @@ import (
) )
var writeTests = []struct { var writeTests = []struct {
Input [][]string Input [][]string
Output string Output string
Error error Error error
UseCRLF bool UseCRLF bool
Comma rune Comma rune
Quote rune
AlwaysQuote bool
}{ }{
{Input: [][]string{{"abc"}}, Output: "abc\n"}, {Input: [][]string{{"abc"}}, Output: "abc\n"},
{Input: [][]string{{"abc"}}, Output: "abc\r\n", UseCRLF: true}, {Input: [][]string{{"abc"}}, Output: "abc\r\n", UseCRLF: true},
@ -46,6 +48,7 @@ var writeTests = []struct {
{Input: [][]string{{"a", "a", ""}}, Output: "a|a|\n", Comma: '|'}, {Input: [][]string{{"a", "a", ""}}, Output: "a|a|\n", Comma: '|'},
{Input: [][]string{{",", ",", ""}}, Output: ",|,|\n", Comma: '|'}, {Input: [][]string{{",", ",", ""}}, Output: ",|,|\n", Comma: '|'},
{Input: [][]string{{"foo"}}, Comma: '"', Error: errInvalidDelim}, {Input: [][]string{{"foo"}}, Comma: '"', Error: errInvalidDelim},
{Input: [][]string{{"a", "a", ""}}, Quote: '"', AlwaysQuote: true, Output: "\"a\"|\"a\"|\"\"\n", Comma: '|'},
} }
func TestWrite(t *testing.T) { func TestWrite(t *testing.T) {
@ -56,6 +59,10 @@ func TestWrite(t *testing.T) {
if tt.Comma != 0 { if tt.Comma != 0 {
f.Comma = tt.Comma f.Comma = tt.Comma
} }
if tt.Quote != 0 {
f.Quote = tt.Quote
}
f.AlwaysQuote = tt.AlwaysQuote
err := f.WriteAll(tt.Input) err := f.WriteAll(tt.Input)
if err != tt.Error { if err != tt.Error {
t.Errorf("Unexpected error:\ngot %v\nwant %v", err, tt.Error) t.Errorf("Unexpected error:\ngot %v\nwant %v", err, tt.Error)

@ -18,8 +18,11 @@ package csv
import ( import (
"encoding/xml" "encoding/xml"
"errors"
"fmt" "fmt"
"io"
"strings" "strings"
"unicode/utf8"
) )
const ( const (
@ -55,68 +58,64 @@ func (args *ReaderArgs) IsEmpty() bool {
} }
// UnmarshalXML - decodes XML data. // UnmarshalXML - decodes XML data.
func (args *ReaderArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { func (args *ReaderArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) (err error) {
// Make subtype to avoid recursive UnmarshalXML(). args.FileHeaderInfo = none
type subReaderArgs ReaderArgs args.RecordDelimiter = defaultRecordDelimiter
parsedArgs := subReaderArgs{} args.FieldDelimiter = defaultFieldDelimiter
if err := d.DecodeElement(&parsedArgs, &start); err != nil { args.QuoteCharacter = defaultQuoteCharacter
return err args.QuoteEscapeCharacter = defaultQuoteEscapeCharacter
args.CommentCharacter = defaultCommentCharacter
args.AllowQuotedRecordDelimiter = false
for {
// Read tokens from the XML document in a stream.
t, err := d.Token()
if err != nil {
if err == io.EOF {
break
}
return err
}
switch se := t.(type) {
case xml.StartElement:
tagName := se.Name.Local
switch tagName {
case "AllowQuotedRecordDelimiter":
var b bool
if err = d.DecodeElement(&b, &se); err != nil {
return err
}
args.AllowQuotedRecordDelimiter = b
default:
var s string
if err = d.DecodeElement(&s, &se); err != nil {
return err
}
switch tagName {
case "FileHeaderInfo":
args.FileHeaderInfo = strings.ToLower(s)
case "RecordDelimiter":
args.RecordDelimiter = s
case "FieldDelimiter":
args.FieldDelimiter = s
case "QuoteCharacter":
if utf8.RuneCountInString(s) > 1 {
return fmt.Errorf("unsupported QuoteCharacter '%v'", s)
}
args.QuoteCharacter = s
// Not supported yet
case "QuoteEscapeCharacter":
case "Comments":
args.CommentCharacter = s
default:
return errors.New("unrecognized option")
}
}
}
} }
parsedArgs.FileHeaderInfo = strings.ToLower(parsedArgs.FileHeaderInfo) args.QuoteEscapeCharacter = args.QuoteCharacter
switch parsedArgs.FileHeaderInfo {
case "":
parsedArgs.FileHeaderInfo = none
case none, use, ignore:
default:
return errInvalidFileHeaderInfo(fmt.Errorf("invalid FileHeaderInfo '%v'", parsedArgs.FileHeaderInfo))
}
switch len([]rune(parsedArgs.RecordDelimiter)) {
case 0:
parsedArgs.RecordDelimiter = defaultRecordDelimiter
case 1, 2:
default:
return fmt.Errorf("invalid RecordDelimiter '%v'", parsedArgs.RecordDelimiter)
}
switch len([]rune(parsedArgs.FieldDelimiter)) {
case 0:
parsedArgs.FieldDelimiter = defaultFieldDelimiter
case 1:
default:
return fmt.Errorf("invalid FieldDelimiter '%v'", parsedArgs.FieldDelimiter)
}
switch parsedArgs.QuoteCharacter {
case "":
parsedArgs.QuoteCharacter = defaultQuoteCharacter
case defaultQuoteCharacter:
default:
return fmt.Errorf("unsupported QuoteCharacter '%v'", parsedArgs.QuoteCharacter)
}
switch parsedArgs.QuoteEscapeCharacter {
case "":
parsedArgs.QuoteEscapeCharacter = defaultQuoteEscapeCharacter
case defaultQuoteEscapeCharacter:
default:
return fmt.Errorf("unsupported QuoteEscapeCharacter '%v'", parsedArgs.QuoteEscapeCharacter)
}
switch parsedArgs.CommentCharacter {
case "":
parsedArgs.CommentCharacter = defaultCommentCharacter
case defaultCommentCharacter:
default:
return fmt.Errorf("unsupported Comments '%v'", parsedArgs.CommentCharacter)
}
if parsedArgs.AllowQuotedRecordDelimiter {
return fmt.Errorf("flag AllowQuotedRecordDelimiter is unsupported at the moment")
}
*args = ReaderArgs(parsedArgs)
args.unmarshaled = true args.unmarshaled = true
return nil return nil
} }
@ -138,55 +137,54 @@ func (args *WriterArgs) IsEmpty() bool {
// UnmarshalXML - decodes XML data. // UnmarshalXML - decodes XML data.
func (args *WriterArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { func (args *WriterArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
// Make subtype to avoid recursive UnmarshalXML().
type subWriterArgs WriterArgs
parsedArgs := subWriterArgs{}
if err := d.DecodeElement(&parsedArgs, &start); err != nil {
return err
}
parsedArgs.QuoteFields = strings.ToLower(parsedArgs.QuoteFields)
switch parsedArgs.QuoteFields {
case "":
parsedArgs.QuoteFields = asneeded
case always, asneeded:
default:
return errInvalidQuoteFields(fmt.Errorf("invalid QuoteFields '%v'", parsedArgs.QuoteFields))
}
switch len([]rune(parsedArgs.RecordDelimiter)) {
case 0:
parsedArgs.RecordDelimiter = defaultRecordDelimiter
case 1, 2:
default:
return fmt.Errorf("invalid RecordDelimiter '%v'", parsedArgs.RecordDelimiter)
}
switch len([]rune(parsedArgs.FieldDelimiter)) {
case 0:
parsedArgs.FieldDelimiter = defaultFieldDelimiter
case 1:
default:
return fmt.Errorf("invalid FieldDelimiter '%v'", parsedArgs.FieldDelimiter)
}
switch parsedArgs.QuoteCharacter {
case "":
parsedArgs.QuoteCharacter = defaultQuoteCharacter
case defaultQuoteCharacter:
default:
return fmt.Errorf("unsupported QuoteCharacter '%v'", parsedArgs.QuoteCharacter)
}
switch parsedArgs.QuoteEscapeCharacter { args.QuoteFields = asneeded
case "": args.RecordDelimiter = defaultRecordDelimiter
parsedArgs.QuoteEscapeCharacter = defaultQuoteEscapeCharacter args.FieldDelimiter = defaultFieldDelimiter
case defaultQuoteEscapeCharacter: args.QuoteCharacter = defaultQuoteCharacter
default: args.QuoteEscapeCharacter = defaultQuoteCharacter
return fmt.Errorf("unsupported QuoteEscapeCharacter '%v'", parsedArgs.QuoteEscapeCharacter)
for {
// Read tokens from the XML document in a stream.
t, err := d.Token()
if err != nil {
if err == io.EOF {
break
}
return err
}
switch se := t.(type) {
case xml.StartElement:
var s string
if err = d.DecodeElement(&s, &se); err != nil {
return err
}
switch se.Name.Local {
case "QuoteFields":
args.QuoteFields = strings.ToLower(s)
case "RecordDelimiter":
args.RecordDelimiter = s
case "FieldDelimiter":
args.FieldDelimiter = s
case "QuoteCharacter":
switch utf8.RuneCountInString(s) {
case 0:
args.QuoteCharacter = "\x00"
case 1:
args.QuoteCharacter = s
default:
return fmt.Errorf("unsupported QuoteCharacter '%v'", s)
}
// Not supported yet
case "QuoteEscapeCharacter":
default:
return errors.New("unrecognized option")
}
}
} }
*args = WriterArgs(parsedArgs) args.QuoteEscapeCharacter = args.QuoteCharacter
args.unmarshaled = true args.unmarshaled = true
return nil return nil
} }

@ -294,6 +294,11 @@ func NewReader(readCloser io.ReadCloser, args *ReaderArgs) (*Reader, error) {
ret := csv.NewReader(r) ret := csv.NewReader(r)
ret.Comma = []rune(args.FieldDelimiter)[0] ret.Comma = []rune(args.FieldDelimiter)[0]
ret.Comment = []rune(args.CommentCharacter)[0] ret.Comment = []rune(args.CommentCharacter)[0]
ret.Quote = []rune{}
if len([]rune(args.QuoteCharacter)) > 0 {
// Add the first rune of args.QuoteChracter
ret.Quote = append(ret.Quote, []rune(args.QuoteCharacter)[0])
}
ret.FieldsPerRecord = -1 ret.FieldsPerRecord = -1
// If LazyQuotes is true, a quote may appear in an unquoted field and a // If LazyQuotes is true, a quote may appear in an unquoted field and a
// non-doubled quote may appear in a quoted field. // non-doubled quote may appear in a quoted field.

@ -63,7 +63,7 @@ func TestRead(t *testing.T) {
if err != nil { if err != nil {
break break
} }
record.WriteCSV(&result, []rune(c.fieldDelimiter)[0]) record.WriteCSV(&result, []rune(c.fieldDelimiter)[0], '"', false)
result.Truncate(result.Len() - 1) result.Truncate(result.Len() - 1)
result.WriteString(c.recordDelimiter) result.WriteString(c.recordDelimiter)
} }
@ -243,7 +243,7 @@ func TestReadExtended(t *testing.T) {
} }
if fields < 10 { if fields < 10 {
// Write with fixed delimiters, newlines. // Write with fixed delimiters, newlines.
err := record.WriteCSV(&result, ',') err := record.WriteCSV(&result, ',', '"', false)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -454,7 +454,7 @@ func TestReadFailures(t *testing.T) {
break break
} }
// Write with fixed delimiters, newlines. // Write with fixed delimiters, newlines.
err := record.WriteCSV(&result, ',') err := record.WriteCSV(&result, ',', '"', false)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }

@ -92,9 +92,11 @@ func (r *Record) Clone(dst sql.Record) sql.Record {
} }
// WriteCSV - encodes to CSV data. // WriteCSV - encodes to CSV data.
func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune) error { func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune, quote rune, alwaysQuote bool) error {
w := csv.NewWriter(writer) w := csv.NewWriter(writer)
w.Comma = fieldDelimiter w.Comma = fieldDelimiter
w.AlwaysQuote = alwaysQuote
w.Quote = quote
if err := w.Write(r.csvRecord); err != nil { if err := w.Write(r.csvRecord); err != nil {
return err return err
} }

@ -17,7 +17,6 @@
package json package json
import ( import (
"encoding/csv"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -27,6 +26,7 @@ import (
"strings" "strings"
"github.com/bcicen/jstream" "github.com/bcicen/jstream"
csv "github.com/minio/minio/pkg/csvparser"
"github.com/minio/minio/pkg/s3select/sql" "github.com/minio/minio/pkg/s3select/sql"
) )
@ -108,7 +108,7 @@ func (r *Record) Set(name string, value *sql.Value) (sql.Record, error) {
} }
// WriteCSV - encodes to CSV data. // WriteCSV - encodes to CSV data.
func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune) error { func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune, quote rune, alwaysQuote bool) error {
var csvRecord []string var csvRecord []string
for _, kv := range r.KVS { for _, kv := range r.KVS {
var columnValue string var columnValue string
@ -137,6 +137,8 @@ func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune) error {
w := csv.NewWriter(writer) w := csv.NewWriter(writer)
w.Comma = fieldDelimiter w.Comma = fieldDelimiter
w.Quote = quote
w.AlwaysQuote = alwaysQuote
if err := w.Write(csvRecord); err != nil { if err := w.Write(csvRecord); err != nil {
return err return err
} }

@ -353,7 +353,10 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error {
}() }()
bufioWriter.Reset(buf) bufioWriter.Reset(buf)
err := record.WriteCSV(bufioWriter, []rune(s3Select.Output.CSVArgs.FieldDelimiter)[0]) err := record.WriteCSV(bufioWriter,
[]rune(s3Select.Output.CSVArgs.FieldDelimiter)[0],
[]rune(s3Select.Output.CSVArgs.QuoteCharacter)[0],
strings.ToLower(s3Select.Output.CSVArgs.QuoteFields) == "always")
if err != nil { if err != nil {
return err return err
} }

@ -252,6 +252,7 @@ func TestJSONQueries(t *testing.T) {
</InputSerialization> </InputSerialization>
<OutputSerialization> <OutputSerialization>
<CSV> <CSV>
<QuoteCharacter>"</QuoteCharacter>
</CSV> </CSV>
</OutputSerialization> </OutputSerialization>
<RequestProgress> <RequestProgress>
@ -587,6 +588,7 @@ func TestCSVQueries2(t *testing.T) {
<CompressionType>NONE</CompressionType> <CompressionType>NONE</CompressionType>
<CSV> <CSV>
<FileHeaderInfo>USE</FileHeaderInfo> <FileHeaderInfo>USE</FileHeaderInfo>
<QuoteCharacter>"</QuoteCharacter>
</CSV> </CSV>
</InputSerialization> </InputSerialization>
<OutputSerialization> <OutputSerialization>

@ -131,11 +131,11 @@ func TestNDJSON(t *testing.T) {
t.Error(err) t.Error(err)
} }
var gotB, wantB bytes.Buffer var gotB, wantB bytes.Buffer
err = rec.WriteCSV(&gotB, ',') err = rec.WriteCSV(&gotB, ',', '"', false)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
err = want.WriteCSV(&wantB, ',') err = want.WriteCSV(&wantB, ',', '"', false)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }

@ -17,10 +17,11 @@
package simdj package simdj
import ( import (
"encoding/csv"
"fmt" "fmt"
"io" "io"
csv "github.com/minio/minio/pkg/csvparser"
"github.com/bcicen/jstream" "github.com/bcicen/jstream"
"github.com/minio/minio/pkg/s3select/json" "github.com/minio/minio/pkg/s3select/json"
"github.com/minio/minio/pkg/s3select/sql" "github.com/minio/minio/pkg/s3select/sql"
@ -140,7 +141,7 @@ func (r *Record) Set(name string, value *sql.Value) (sql.Record, error) {
} }
// WriteCSV - encodes to CSV data. // WriteCSV - encodes to CSV data.
func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune) error { func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter, quote rune, alwaysQuote bool) error {
csvRecord := make([]string, 0, 10) csvRecord := make([]string, 0, 10)
var tmp simdjson.Iter var tmp simdjson.Iter
obj := r.object obj := r.object
@ -173,6 +174,8 @@ allElems:
} }
w := csv.NewWriter(writer) w := csv.NewWriter(writer)
w.Comma = fieldDelimiter w.Comma = fieldDelimiter
w.Quote = quote
w.AlwaysQuote = alwaysQuote
if err := w.Write(csvRecord); err != nil { if err := w.Write(csvRecord); err != nil {
return err return err
} }

@ -46,7 +46,7 @@ type Record interface {
// Set a value. // Set a value.
// Can return a different record type. // Can return a different record type.
Set(name string, value *Value) (Record, error) Set(name string, value *Value) (Record, error)
WriteCSV(writer io.Writer, fieldDelimiter rune) error WriteCSV(writer io.Writer, fieldDelimiter, quote rune, alwaysQuote bool) error
WriteJSON(writer io.Writer) error WriteJSON(writer io.Writer) error
// Clone the record and if possible use the destination provided. // Clone the record and if possible use the destination provided.

Loading…
Cancel
Save