From e46338692104ae1875b4992564bb4161f6d3b5cf Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Sat, 9 Mar 2019 08:13:37 -0800 Subject: [PATCH] Add JSON Path expression evaluation support (#7315) - Includes support for FROM clause JSON path --- pkg/s3select/csv/record.go | 11 +++ pkg/s3select/json/reader.go | 17 ++--- pkg/s3select/json/record.go | 51 +++++++------- pkg/s3select/parquet/reader.go | 7 +- pkg/s3select/select.go | 6 +- pkg/s3select/sql/errors.go | 13 +++- pkg/s3select/sql/evaluate.go | 42 ++++++++++- pkg/s3select/sql/jsondata/books.json | 84 ++++++++++++++++++++++ pkg/s3select/sql/jsonpath.go | 100 +++++++++++++++++++++++++++ pkg/s3select/sql/jsonpath_test.go | 96 +++++++++++++++++++++++++ pkg/s3select/sql/parser.go | 8 +-- pkg/s3select/sql/parser_test.go | 2 + pkg/s3select/sql/record.go | 22 ++++++ pkg/s3select/sql/statement.go | 65 +++++++++++++++-- pkg/s3select/sql/utils.go | 23 +++++- 15 files changed, 488 insertions(+), 59 deletions(-) create mode 100644 pkg/s3select/sql/jsondata/books.json create mode 100644 pkg/s3select/sql/jsonpath.go create mode 100644 pkg/s3select/sql/jsonpath_test.go diff --git a/pkg/s3select/csv/record.go b/pkg/s3select/csv/record.go index 358e8e65b..813db4688 100644 --- a/pkg/s3select/csv/record.go +++ b/pkg/s3select/csv/record.go @@ -20,6 +20,7 @@ import ( "bytes" "encoding/csv" "encoding/json" + "errors" "fmt" "github.com/bcicen/jstream" @@ -86,6 +87,16 @@ func (r *Record) MarshalJSON() ([]byte, error) { return json.Marshal(kvs) } +// Raw - returns the underlying data with format info. +func (r *Record) Raw() (sql.SelectObjectFormat, interface{}) { + return sql.SelectFmtCSV, r +} + +// Replace - is not supported for CSV +func (r *Record) Replace(_ jstream.KVS) error { + return errors.New("Replace is not supported for CSV") +} + // NewRecord - creates new CSV record. func NewRecord() *Record { return &Record{} diff --git a/pkg/s3select/json/reader.go b/pkg/s3select/json/reader.go index f34d182d9..7a9ef9832 100644 --- a/pkg/s3select/json/reader.go +++ b/pkg/s3select/json/reader.go @@ -17,7 +17,6 @@ package json import ( - "encoding/json" "io" "github.com/minio/minio/pkg/s3select/sql" @@ -43,24 +42,22 @@ func (r *Reader) Read() (sql.Record, error) { return nil, io.EOF } - var data []byte - var err error - + var kvs jstream.KVS if v.ValueType == jstream.Object { - data, err = json.Marshal(v.Value) + // This is a JSON object type (that preserves key + // order) + kvs = v.Value.(jstream.KVS) } else { // To be AWS S3 compatible Select for JSON needs to // output non-object JSON as single column value // i.e. a map with `_1` as key and value as the // non-object. - data, err = json.Marshal(jstream.KVS{jstream.KV{Key: "_1", Value: v.Value}}) - } - if err != nil { - return nil, errJSONParsingError(err) + kvs = jstream.KVS{jstream.KV{Key: "_1", Value: v.Value}} } return &Record{ - Data: data, + KVS: kvs, + SelectFormat: sql.SelectFmtJSON, }, nil } diff --git a/pkg/s3select/json/record.go b/pkg/s3select/json/record.go index 224622d8b..2a394e387 100644 --- a/pkg/s3select/json/record.go +++ b/pkg/s3select/json/record.go @@ -26,7 +26,6 @@ import ( "github.com/bcicen/jstream" "github.com/minio/minio/pkg/s3select/sql" - "github.com/tidwall/gjson" ) // RawJSON is a byte-slice that contains valid JSON @@ -40,34 +39,20 @@ func (b RawJSON) MarshalJSON() ([]byte, error) { // Record - is JSON record. type Record struct { - // Used in Get() - Data []byte - // Used in Set(), Marshal*() - kvs jstream.KVS + KVS jstream.KVS + + SelectFormat sql.SelectObjectFormat } // Get - gets the value for a column name. func (r *Record) Get(name string) (*sql.Value, error) { - result := gjson.GetBytes(r.Data, name) - switch result.Type { - case gjson.Null: - return sql.FromNull(), nil - case gjson.False: - return sql.FromBool(false), nil - case gjson.Number: - return sql.FromFloat(result.Float()), nil - case gjson.String: - return sql.FromString(result.String()), nil - case gjson.True: - return sql.FromBool(true), nil - } - - return nil, fmt.Errorf("unsupported gjson value %v; %v", result, result.Type) + // Get is implemented directly in the sql package. + return nil, errors.New("not implemented here") } // Set - sets the value for a column name. -func (r *Record) Set(name string, value *sql.Value) (err error) { +func (r *Record) Set(name string, value *sql.Value) error { var v interface{} if b, ok := value.ToBool(); ok { v = b @@ -88,14 +73,14 @@ func (r *Record) Set(name string, value *sql.Value) (err error) { } name = strings.Replace(name, "*", "__ALL__", -1) - r.kvs = append(r.kvs, jstream.KV{Key: name, Value: v}) - return err + r.KVS = append(r.KVS, jstream.KV{Key: name, Value: v}) + return nil } // MarshalCSV - encodes to CSV data. func (r *Record) MarshalCSV(fieldDelimiter rune) ([]byte, error) { var csvRecord []string - for _, kv := range r.kvs { + for _, kv := range r.KVS { var columnValue string switch val := kv.Value.(type) { case bool, float64, int64, string: @@ -125,14 +110,26 @@ func (r *Record) MarshalCSV(fieldDelimiter rune) ([]byte, error) { return data[:len(data)-1], nil } +// Raw - returns the underlying representation. +func (r *Record) Raw() (sql.SelectObjectFormat, interface{}) { + return r.SelectFormat, r.KVS +} + // MarshalJSON - encodes to JSON data. func (r *Record) MarshalJSON() ([]byte, error) { - return json.Marshal(r.kvs) + return json.Marshal(r.KVS) +} + +// Replace the underlying buffer of json data. +func (r *Record) Replace(k jstream.KVS) error { + r.KVS = k + return nil } // NewRecord - creates new empty JSON record. -func NewRecord() *Record { +func NewRecord(f sql.SelectObjectFormat) *Record { return &Record{ - Data: []byte("{}"), + KVS: jstream.KVS{}, + SelectFormat: f, } } diff --git a/pkg/s3select/parquet/reader.go b/pkg/s3select/parquet/reader.go index fde918a1d..45d8fec01 100644 --- a/pkg/s3select/parquet/reader.go +++ b/pkg/s3select/parquet/reader.go @@ -17,7 +17,6 @@ package parquet import ( - "encoding/json" "io" "github.com/bcicen/jstream" @@ -75,11 +74,7 @@ func (r *Reader) Read() (rec sql.Record, rerr error) { } parquetRecord.Range(f) - data, err := json.Marshal(kvs) - if err != nil { - return nil, err - } - return &jsonfmt.Record{Data: data}, rerr + return &jsonfmt.Record{KVS: kvs, SelectFormat: sql.SelectFmtParquet}, nil } // Close - closes underlaying readers. diff --git a/pkg/s3select/select.go b/pkg/s3select/select.go index 6247acefd..b6b2c1517 100644 --- a/pkg/s3select/select.go +++ b/pkg/s3select/select.go @@ -246,7 +246,7 @@ func (s3Select *S3Select) outputRecord() sql.Record { case csvFormat: return csv.NewRecord() case jsonFormat: - return json.NewRecord() + return json.NewRecord(sql.SelectFmtJSON) } panic(fmt.Errorf("unknown output format '%v'", s3Select.Output.format)) @@ -391,6 +391,10 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) { break } + if inputRecord, err = s3Select.statement.EvalFrom(s3Select.Input.format, inputRecord); err != nil { + break + } + if s3Select.statement.IsAggregated() { if err = s3Select.statement.AggregateRow(inputRecord); err != nil { break diff --git a/pkg/s3select/sql/errors.go b/pkg/s3select/sql/errors.go index c6b225b63..b31a8f4c9 100644 --- a/pkg/s3select/sql/errors.go +++ b/pkg/s3select/sql/errors.go @@ -16,6 +16,8 @@ package sql +import "fmt" + type s3Error struct { code string message string @@ -91,7 +93,16 @@ func errQueryAnalysisFailure(err error) *s3Error { func errBadTableName(err error) *s3Error { return &s3Error{ code: "BadTableName", - message: "The table name is not supported", + message: fmt.Sprintf("The table name is not supported: %v", err), + statusCode: 400, + cause: err, + } +} + +func errDataSource(err error) *s3Error { + return &s3Error{ + code: "DataSourcePathUnsupported", + message: fmt.Sprintf("Data source: %v", err), statusCode: 400, cause: err, } diff --git a/pkg/s3select/sql/evaluate.go b/pkg/s3select/sql/evaluate.go index f1c1480ab..10f96b018 100644 --- a/pkg/s3select/sql/evaluate.go +++ b/pkg/s3select/sql/evaluate.go @@ -17,8 +17,11 @@ package sql import ( + "encoding/json" "errors" "strings" + + "github.com/bcicen/jstream" ) var ( @@ -319,7 +322,44 @@ func (e *JSONPath) evalNode(r Record) (*Value, error) { if len(ps) == 2 { keypath = ps[1] } - return r.Get(keypath) + objFmt, rawVal := r.Raw() + switch objFmt { + case SelectFmtJSON, SelectFmtParquet: + rowVal := rawVal.(jstream.KVS) + + pathExpr := e.PathExpr + if len(pathExpr) == 0 { + pathExpr = []*JSONPathElement{{Key: &ObjectKey{ID: e.BaseKey}}} + } + + result, err := jsonpathEval(pathExpr, rowVal) + if err != nil { + return nil, err + } + + switch rval := result.(type) { + case string: + return FromString(rval), nil + case float64: + return FromFloat(rval), nil + case int64: + return FromInt(rval), nil + case bool: + return FromBool(rval), nil + case jstream.KVS, []interface{}: + bs, err := json.Marshal(result) + if err != nil { + return nil, err + } + return FromBytes(bs), nil + case nil: + return FromNull(), nil + default: + return nil, errors.New("Unhandled value type") + } + default: + return r.Get(keypath) + } } func (e *PrimaryTerm) evalNode(r Record) (res *Value, err error) { diff --git a/pkg/s3select/sql/jsondata/books.json b/pkg/s3select/sql/jsondata/books.json new file mode 100644 index 000000000..cd2785dc2 --- /dev/null +++ b/pkg/s3select/sql/jsondata/books.json @@ -0,0 +1,84 @@ +{ + "title": "Murder on the Orient Express", + "authorInfo": { + "name": "Agatha Christie", + "yearRange": [1890, 1976], + "penName": "Mary Westmacott" + }, + "genre": "Crime novel", + "publicationHistory": [ + { + "year": 1934, + "publisher": "Collins Crime Club (London)", + "type": "Hardcover", + "pages": 256 + }, + { + "year": 1934, + "publisher": "Dodd Mead and Company (New York)", + "type": "Hardcover", + "pages": 302 + }, + { + "year": 2011, + "publisher": "Harper Collins", + "type": "Paperback", + "pages": 265 + } + ] +} +{ + "title": "The Robots of Dawn", + "authorInfo": { + "name": "Isaac Asimov", + "yearRange": [1920, 1992], + "penName": "Paul French" + }, + "genre": "Science fiction", + "publicationHistory": [ + { + "year": 1983, + "publisher": "Phantasia Press", + "type": "Hardcover", + "pages": 336 + }, + { + "year": 1984, + "publisher": "Granada", + "type": "Hardcover", + "pages": 419 + }, + { + "year": 2018, + "publisher": "Harper Voyager", + "type": "Paperback", + "pages": 432 + } + ] +} +{ + "title": "Pigs Have Wings", + "authorInfo": { + "name": "P. G. Wodehouse", + "yearRange": [1881, 1975] + }, + "genre": "Comic novel", + "publicationHistory": [ + { + "year": 1952, + "publisher": "Doubleday & Company", + "type": "Hardcover" + }, + { + "year": 2000, + "publisher": "Harry N. Abrams", + "type": "Hardcover" + }, + { + "year": 2019, + "publisher": "Ulverscroft Collections", + "type": "Paperback", + "pages": 294 + } + ] +} diff --git a/pkg/s3select/sql/jsonpath.go b/pkg/s3select/sql/jsonpath.go new file mode 100644 index 000000000..67606bdca --- /dev/null +++ b/pkg/s3select/sql/jsonpath.go @@ -0,0 +1,100 @@ +/* + * Minio Cloud Storage, (C) 2019 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sql + +import ( + "errors" + + "github.com/bcicen/jstream" +) + +var ( + errKeyLookup = errors.New("Cannot look up key in non-object value") + errIndexLookup = errors.New("Cannot look up array index in non-array value") + errWildcardObjectLookup = errors.New("Object wildcard used on non-object value") + errWildcardArrayLookup = errors.New("Array wildcard used on non-array value") + errWilcardObjectUsageInvalid = errors.New("Invalid usage of object wildcard") +) + +func jsonpathEval(p []*JSONPathElement, v interface{}) (r interface{}, err error) { + // fmt.Printf("JPATHexpr: %v jsonobj: %v\n\n", p, v) + if len(p) == 0 || v == nil { + return v, nil + } + + switch { + case p[0].Key != nil: + key := p[0].Key.keyString() + + kvs, ok := v.(jstream.KVS) + if !ok { + return nil, errKeyLookup + } + for _, kv := range kvs { + if kv.Key == key { + return jsonpathEval(p[1:], kv.Value) + } + } + // Key not found - return nil result + return nil, nil + + case p[0].Index != nil: + idx := *p[0].Index + + arr, ok := v.([]interface{}) + if !ok { + return nil, errIndexLookup + } + + if idx >= len(arr) { + return nil, nil + } + return jsonpathEval(p[1:], arr[idx]) + + case p[0].ObjectWildcard: + kvs, ok := v.(jstream.KVS) + if !ok { + return nil, errWildcardObjectLookup + } + + if len(p[1:]) > 0 { + return nil, errWilcardObjectUsageInvalid + } + + return kvs, nil + + case p[0].ArrayWildcard: + arr, ok := v.([]interface{}) + if !ok { + return nil, errWildcardArrayLookup + } + + // Lookup remainder of path in each array element and + // make result array. + var result []interface{} + for _, a := range arr { + rval, err := jsonpathEval(p[1:], a) + if err != nil { + return nil, err + } + + result = append(result, rval) + } + return result, nil + } + panic("cannot reach here") +} diff --git a/pkg/s3select/sql/jsonpath_test.go b/pkg/s3select/sql/jsonpath_test.go new file mode 100644 index 000000000..54be9c499 --- /dev/null +++ b/pkg/s3select/sql/jsonpath_test.go @@ -0,0 +1,96 @@ +/* + * Minio Cloud Storage, (C) 2019 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sql + +import ( + "bytes" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "reflect" + "testing" + + "github.com/alecthomas/participle" + "github.com/bcicen/jstream" +) + +func getJSONStructs(b []byte) ([]interface{}, error) { + dec := jstream.NewDecoder(bytes.NewBuffer(b), 0).ObjectAsKVS() + var result []interface{} + for parsedVal := range dec.Stream() { + result = append(result, parsedVal.Value) + } + if err := dec.Err(); err != nil { + return nil, err + } + return result, nil +} + +func TestJsonpathEval(t *testing.T) { + f, err := os.Open(filepath.Join("jsondata", "books.json")) + if err != nil { + t.Fatal(err) + } + + b, err := ioutil.ReadAll(f) + if err != nil { + t.Fatal(err) + } + + p := participle.MustBuild( + &JSONPath{}, + participle.Lexer(sqlLexer), + participle.CaseInsensitive("Keyword"), + ) + cases := []struct { + str string + res []interface{} + }{ + {"s.title", []interface{}{"Murder on the Orient Express", "The Robots of Dawn", "Pigs Have Wings"}}, + {"s.authorInfo.yearRange", []interface{}{[]interface{}{1890.0, 1976.0}, []interface{}{1920.0, 1992.0}, []interface{}{1881.0, 1975.0}}}, + {"s.authorInfo.name", []interface{}{"Agatha Christie", "Isaac Asimov", "P. G. Wodehouse"}}, + {"s.authorInfo.yearRange[0]", []interface{}{1890.0, 1920.0, 1881.0}}, + {"s.publicationHistory[0].pages", []interface{}{256.0, 336.0, nil}}, + } + for i, tc := range cases { + jp := JSONPath{} + err := p.ParseString(tc.str, &jp) + // fmt.Println(jp) + if err != nil { + t.Fatalf("parse failed!: %d %v %s", i, err, tc) + } + + // Read only the first json object from the file + recs, err := getJSONStructs(b) + if err != nil || len(recs) != 3 { + t.Fatalf("%v or length was not 3", err) + } + + for j, rec := range recs { + // fmt.Println(rec) + r, err := jsonpathEval(jp.PathExpr, rec) + if err != nil { + t.Errorf("Error: %d %d %v", i, j, err) + } + if !reflect.DeepEqual(r, tc.res[j]) { + fmt.Printf("%#v (%v) != %v (%v)\n", r, reflect.TypeOf(r), tc.res[j], reflect.TypeOf(tc.res[j])) + t.Errorf("case: %d %d failed", i, j) + } + } + } +} diff --git a/pkg/s3select/sql/parser.go b/pkg/s3select/sql/parser.go index a9ebe6399..29c8dbb49 100644 --- a/pkg/s3select/sql/parser.go +++ b/pkg/s3select/sql/parser.go @@ -57,7 +57,7 @@ type ObjectKey struct { // quoted. type QuotedIdentifier string -// Capture inferface used by participle +// Capture interface used by participle func (qi *QuotedIdentifier) Capture(values []string) error { // Remove enclosing quotes n := len(values[0]) @@ -74,8 +74,8 @@ func (qi *QuotedIdentifier) Capture(values []string) error { type Select struct { Expression *SelectExpression `parser:"\"SELECT\" @@"` From *TableExpression `parser:"\"FROM\" @@"` - Where *Expression `parser:"[ \"WHERE\" @@ ]"` - Limit *LitValue `parser:"[ \"LIMIT\" @@ ]"` + Where *Expression `parser:"( \"WHERE\" @@ )?"` + Limit *LitValue `parser:"( \"LIMIT\" @@ )?"` } // SelectExpression represents the items requested in the select @@ -94,7 +94,7 @@ type TableExpression struct { // JSONPathElement represents a keypath component type JSONPathElement struct { Key *ObjectKey `parser:" @@"` // ['name'] and .name forms - Index *uint64 `parser:"| \"[\" @Number \"]\""` // [3] form + Index *int `parser:"| \"[\" @Number \"]\""` // [3] form ObjectWildcard bool `parser:"| @\".*\""` // .* form ArrayWildcard bool `parser:"| @\"[*]\""` // [*] form } diff --git a/pkg/s3select/sql/parser_test.go b/pkg/s3select/sql/parser_test.go index d71102673..8e96887a3 100644 --- a/pkg/s3select/sql/parser_test.go +++ b/pkg/s3select/sql/parser_test.go @@ -29,6 +29,7 @@ func TestJSONPathElement(t *testing.T) { &JSONPathElement{}, participle.Lexer(sqlLexer), participle.CaseInsensitive("Keyword"), + participle.CaseInsensitive("Timeword"), ) j := JSONPathElement{} @@ -59,6 +60,7 @@ func TestJSONPath(t *testing.T) { &JSONPath{}, participle.Lexer(sqlLexer), participle.CaseInsensitive("Keyword"), + participle.CaseInsensitive("Timeword"), ) j := JSONPath{} diff --git a/pkg/s3select/sql/record.go b/pkg/s3select/sql/record.go index 76e61235b..57f9078e5 100644 --- a/pkg/s3select/sql/record.go +++ b/pkg/s3select/sql/record.go @@ -16,10 +16,32 @@ package sql +import "github.com/bcicen/jstream" + +// SelectObjectFormat specifies the format of the underlying data +type SelectObjectFormat int + +const ( + // SelectFmtUnknown - unknown format (default value) + SelectFmtUnknown SelectObjectFormat = iota + // SelectFmtCSV - CSV format + SelectFmtCSV + // SelectFmtJSON - JSON format + SelectFmtJSON + // SelectFmtParquet - Parquet format + SelectFmtParquet +) + // Record - is a type containing columns and their values. type Record interface { Get(name string) (*Value, error) Set(name string, value *Value) error MarshalCSV(fieldDelimiter rune) ([]byte, error) MarshalJSON() ([]byte, error) + + // Returns underlying representation + Raw() (SelectObjectFormat, interface{}) + + // Replaces the underlying data + Replace(k jstream.KVS) error } diff --git a/pkg/s3select/sql/statement.go b/pkg/s3select/sql/statement.go index bfd06bedb..01a57ff86 100644 --- a/pkg/s3select/sql/statement.go +++ b/pkg/s3select/sql/statement.go @@ -20,12 +20,18 @@ import ( "errors" "fmt" "strings" + + "github.com/bcicen/jstream" ) var ( errBadLimitSpecified = errors.New("Limit value must be a positive integer") ) +const ( + baseTableName = "s3object" +) + // SelectStatement is the top level parsed and analyzed structure type SelectStatement struct { selectAST *Select @@ -74,9 +80,8 @@ func ParseSelectStatement(s string) (stmt SelectStatement, err error) { } // Validate table name - tableString := strings.ToLower(selectAST.From.Table.String()) - if !strings.HasPrefix(tableString, "s3object.") && tableString != "s3object" { - err = errBadTableName(errors.New("Table name must be s3object")) + err = validateTableName(selectAST.From) + if err != nil { return } @@ -89,6 +94,19 @@ func ParseSelectStatement(s string) (stmt SelectStatement, err error) { return } +func validateTableName(from *TableExpression) error { + if strings.ToLower(from.Table.BaseKey.String()) != baseTableName { + return errBadTableName(errors.New("table name must be `s3object`")) + } + + if len(from.Table.PathExpr) > 0 { + if !from.Table.PathExpr[0].ArrayWildcard { + return errBadTableName(errors.New("keypath table name is invalid - please check the service documentation")) + } + } + return nil +} + func parseLimit(v *LitValue) (int64, error) { switch { case v == nil: @@ -104,6 +122,41 @@ func parseLimit(v *LitValue) (int64, error) { } } +// EvalFrom evaluates the From clause on the input record. It only +// applies to JSON input data format (currently). +func (e *SelectStatement) EvalFrom(format string, input Record) (Record, error) { + if e.selectAST.From.HasKeypath() { + if format == "json" { + objFmt, rawVal := input.Raw() + if objFmt != SelectFmtJSON { + return nil, errDataSource(errors.New("unexpected non JSON input")) + } + + jsonRec := rawVal.(jstream.KVS) + txedRec, err := jsonpathEval(e.selectAST.From.Table.PathExpr[1:], jsonRec) + if err != nil { + return nil, err + } + + var kvs jstream.KVS + switch v := txedRec.(type) { + case jstream.KVS: + kvs = v + default: + kvs = jstream.KVS{jstream.KV{Key: "_1", Value: v}} + } + + if err = input.Replace(kvs); err != nil { + return nil, err + } + + return input, nil + } + return nil, errDataSource(errors.New("path not supported")) + } + return input, nil +} + // IsAggregated returns if the statement involves SQL aggregation func (e *SelectStatement) IsAggregated() bool { return e.selectQProp.isAggregation @@ -164,12 +217,10 @@ func (e *SelectStatement) AggregateRow(input Record) error { // applies only to non-aggregation queries. func (e *SelectStatement) Eval(input, output Record) (Record, error) { ok, err := e.isPassingWhereClause(input) - if err != nil { + if err != nil || !ok { + // Either error or row did not pass where clause return nil, err } - if !ok { - return nil, nil - } if e.selectAST.Expression.All { // Return the input record for `SELECT * FROM diff --git a/pkg/s3select/sql/utils.go b/pkg/s3select/sql/utils.go index 0fc5ea3e5..e59605a3c 100644 --- a/pkg/s3select/sql/utils.go +++ b/pkg/s3select/sql/utils.go @@ -62,6 +62,13 @@ func (o *ObjectKey) String() string { return fmt.Sprintf(".%s", o.ID.String()) } +func (o *ObjectKey) keyString() string { + if o.Lit != nil { + return string(*o.Lit) + } + return o.ID.String() +} + // getLastKeypathComponent checks if the given expression is a path // expression, and if so extracts the last dot separated component of // the path. Otherwise it returns false. @@ -81,7 +88,19 @@ func getLastKeypathComponent(e *Expression) (string, bool) { return "", false } - keypath := operand.Left.Left.Primary.JPathExpr.String() - ps := strings.Split(keypath, ".") + // Check if path expression ends in a key + jpath := operand.Left.Left.Primary.JPathExpr + n := len(jpath.PathExpr) + if n > 0 && jpath.PathExpr[n-1].Key == nil { + return "", false + } + + ps := strings.Split(jpath.String(), ".") return ps[len(ps)-1], true } + +// HasKeypath returns if the from clause has a key path - +// e.g. S3object[*].id +func (from *TableExpression) HasKeypath() bool { + return len(from.Table.PathExpr) > 1 +}