From 1c90a6bd495909bfac74e9cc63549509b8c48f3d Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Sat, 9 Nov 2019 20:10:35 +0300 Subject: [PATCH] S3 Select: Convert CSV data to JSON (#8464) --- pkg/s3select/json/record.go | 8 ++- pkg/s3select/select_test.go | 117 +++++++++++++++++++++++++++++++++++- pkg/s3select/sql/value.go | 49 ++++++++++++++- 3 files changed, 169 insertions(+), 5 deletions(-) diff --git a/pkg/s3select/json/record.go b/pkg/s3select/json/record.go index 473bdeba6..37db6fd30 100644 --- a/pkg/s3select/json/record.go +++ b/pkg/s3select/json/record.go @@ -87,7 +87,13 @@ func (r *Record) Set(name string, value *sql.Value) error { } else if value.IsNull() { v = nil } else if b, ok := value.ToBytes(); ok { - v = RawJSON(b) + // This can either be raw json or a CSV value. + // Only treat objects and arrays as JSON. + if len(b) > 0 && (b[0] == '{' || b[0] == '[') { + v = RawJSON(b) + } else { + v = string(b) + } } else if arr, ok := value.ToArray(); ok { v = arr } else { diff --git a/pkg/s3select/select_test.go b/pkg/s3select/select_test.go index 79ab00db7..f583017ee 100644 --- a/pkg/s3select/select_test.go +++ b/pkg/s3select/select_test.go @@ -59,7 +59,7 @@ func TestJSONQueries(t *testing.T) { var testTable = []struct { name string query string - requestXML []byte + requestXML []byte // override request XML wantResult string }{ { @@ -294,6 +294,121 @@ func TestJSONQueries(t *testing.T) { } } +func TestCSVQueries(t *testing.T) { + input := `id,time,num,num2,text +1,2010-01-01T,7867786,4565.908123,"a text, with comma" +2,2017-01-02T03:04Z,-5, 0.765111, +` + var testTable = []struct { + name string + query string + requestXML []byte // override request XML + wantResult string + }{ + { + name: "select-all", + query: `SELECT * from s3object AS s WHERE id = '1'`, + wantResult: `{"id":"1","time":"2010-01-01T","num":"7867786","num2":"4565.908123","text":"a text, with comma"}`, + }, + { + name: "select-all-2", + query: `SELECT * from s3object s WHERE id = 2`, + wantResult: `{"id":"2","time":"2017-01-02T03:04Z","num":"-5","num2":" 0.765111","text":""}`, + }, + { + name: "select-text-convert", + query: `SELECT CAST(text AS STRING) AS text from s3object s WHERE id = 1`, + wantResult: `{"text":"a text, with comma"}`, + }, + { + name: "select-text-direct", + query: `SELECT text from s3object s WHERE id = 1`, + wantResult: `{"text":"a text, with comma"}`, + }, + { + name: "select-time-direct", + query: `SELECT time from s3object s WHERE id = 2`, + wantResult: `{"time":"2017-01-02T03:04Z"}`, + }, + { + name: "select-int-direct", + query: `SELECT num from s3object s WHERE id = 2`, + wantResult: `{"num":"-5"}`, + }, + { + name: "select-float-direct", + query: `SELECT num2 from s3object s WHERE id = 2`, + wantResult: `{"num2":" 0.765111"}`, + }, + { + name: "select-float-by-val", + query: `SELECT num2 from s3object s WHERE num2 = 0.765111`, + wantResult: `{"num2":" 0.765111"}`, + }, + } + + defRequest := ` + + %s + SQL + + NONE + + USE + + + + + + + + FALSE + +` + + for _, testCase := range testTable { + t.Run(testCase.name, func(t *testing.T) { + testReq := testCase.requestXML + if len(testReq) == 0 { + testReq = []byte(fmt.Sprintf(defRequest, testCase.query)) + } + s3Select, err := NewS3Select(bytes.NewReader(testReq)) + if err != nil { + t.Fatal(err) + } + + if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { + return ioutil.NopCloser(bytes.NewBufferString(input)), nil + }); err != nil { + t.Fatal(err) + } + + w := &testResponseWriter{} + s3Select.Evaluate(w) + s3Select.Close() + resp := http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(bytes.NewReader(w.response)), + ContentLength: int64(len(w.response)), + } + res, err := minio.NewSelectResults(&resp, "testbucket") + if err != nil { + t.Error(err) + return + } + got, err := ioutil.ReadAll(res) + if err != nil { + t.Error(err) + return + } + gotS := strings.TrimSpace(string(got)) + if !reflect.DeepEqual(gotS, testCase.wantResult) { + t.Errorf("received response does not match with expected reply. Query: %s\ngot: %s\nwant:%s", testCase.query, gotS, testCase.wantResult) + } + }) + } +} + func TestCSVInput(t *testing.T) { var testTable = []struct { requestXML []byte diff --git a/pkg/s3select/sql/value.go b/pkg/s3select/sql/value.go index 1b58735c8..657afe1d1 100644 --- a/pkg/s3select/sql/value.go +++ b/pkg/s3select/sql/value.go @@ -25,6 +25,7 @@ import ( "strconv" "strings" "time" + "unicode/utf8" ) var ( @@ -158,13 +159,13 @@ func (v Value) ToFloat() (val float64, ok bool) { return 0, false } -// ToInt converts value to int. +// ToInt returns the value if int. func (v Value) ToInt() (val int64, ok bool) { val, ok = v.value.(int64) return } -// ToString converts value to string. +// ToString returns the value if string. func (v Value) ToString() (val string, ok bool) { val, ok = v.value.(string) return @@ -215,7 +216,7 @@ func (v Value) ToTimestamp() (t time.Time, ok bool) { return } -// ToBytes converts Value to byte-slice. +// ToBytes returns the value if byte-slice. func (v Value) ToBytes() (val []byte, ok bool) { val, ok = v.value.([]byte) return @@ -339,6 +340,48 @@ const ( opIneq = "!=" ) +// InferBytesType will attempt to infer the data type of bytes. +// Will fail if value type is not bytes or it would result in invalid utf8. +// ORDER: int, float, bool, JSON (object or array), timestamp, string +// If the content is valid JSON, the type will still be bytes. +func (v *Value) InferBytesType() (err error) { + b, ok := v.ToBytes() + if !ok { + return fmt.Errorf("InferByteType: Input is not bytes, but %v", v.GetTypeString()) + } + + // Check for numeric inference + if x, ok := v.bytesToInt(); ok { + v.setInt(x) + return nil + } + if x, ok := v.bytesToFloat(); ok { + v.setFloat(x) + return nil + } + if x, ok := v.bytesToBool(); ok { + v.setBool(x) + return nil + } + + asString := strings.TrimSpace(v.bytesToString()) + if len(b) > 0 && + (strings.HasPrefix(asString, "{") || strings.HasPrefix(asString, "[")) { + return nil + } + + if t, err := parseSQLTimestamp(asString); err == nil { + v.setTimestamp(t) + return nil + } + if !utf8.Valid(b) { + return errors.New("value is not valid utf-8") + } + // Fallback to string + v.setString(asString) + return +} + // When numeric types are compared, type promotions could happen. If // values do not have types (e.g. when reading from CSV), for // comparison operations, automatic type conversion happens by trying