S3Select: Handle array selection in from clause (#9076)

master
Aditya Manthramurthy 5 years ago committed by GitHub
parent 5ab9cc029d
commit cec8cdb35e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      pkg/s3select/select.go
  2. 62
      pkg/s3select/select_test.go
  3. 32
      pkg/s3select/sql/statement.go

@ -392,7 +392,6 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
} }
writer := newMessageWriter(w, getProgressFunc) writer := newMessageWriter(w, getProgressFunc)
var inputRecord sql.Record
var outputQueue []sql.Record var outputQueue []sql.Record
// Create queue based on the type. // Create queue based on the type.
@ -433,6 +432,7 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
} }
var rec sql.Record var rec sql.Record
OuterLoop:
for { for {
if s3Select.statement.LimitReached() { if s3Select.statement.LimitReached() {
if !sendRecord() { if !sendRecord() {
@ -469,13 +469,15 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
break break
} }
if inputRecord, err = s3Select.statement.EvalFrom(s3Select.Input.format, rec); err != nil { var inputRecords []*sql.Record
if inputRecords, err = s3Select.statement.EvalFrom(s3Select.Input.format, rec); err != nil {
break break
} }
for _, inputRecord := range inputRecords {
if s3Select.statement.IsAggregated() { if s3Select.statement.IsAggregated() {
if err = s3Select.statement.AggregateRow(inputRecord); err != nil { if err = s3Select.statement.AggregateRow(*inputRecord); err != nil {
break break OuterLoop
} }
} else { } else {
var outputRecord sql.Record var outputRecord sql.Record
@ -492,13 +494,13 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
outputRecord = s3Select.outputRecord() outputRecord = s3Select.outputRecord()
outputQueue[len(outputQueue)-1] = outputRecord outputQueue[len(outputQueue)-1] = outputRecord
} }
outputRecord, err = s3Select.statement.Eval(inputRecord, outputRecord) outputRecord, err = s3Select.statement.Eval(*inputRecord, outputRecord)
if outputRecord == nil || err != nil { if outputRecord == nil || err != nil {
// This should not be written. // This should not be written.
// Remove it from the queue. // Remove it from the queue.
outputQueue = outputQueue[:len(outputQueue)-1] outputQueue = outputQueue[:len(outputQueue)-1]
if err != nil { if err != nil {
break break OuterLoop
} }
continue continue
} }
@ -509,7 +511,8 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
} }
if !sendRecord() { if !sendRecord() {
break break OuterLoop
}
} }
} }
} }

@ -260,6 +260,68 @@ func TestJSONQueries(t *testing.T) {
</SelectObjectContentRequest>`), </SelectObjectContentRequest>`),
wantResult: `"[""foo"",""bar"",""whatever""]"`, wantResult: `"[""foo"",""bar"",""whatever""]"`,
}, },
{
name: "document",
query: "",
requestXML: []byte(`
<?xml version="1.0" encoding="UTF-8"?>
<SelectObjectContentRequest>
<Expression>select * from s3object[*].elements[*] s where s.element_type = '__elem__merfu'</Expression>
<ExpressionType>SQL</ExpressionType>
<InputSerialization>
<CompressionType>NONE</CompressionType>
<JSON>
<Type>DOCUMENT</Type>
</JSON>
</InputSerialization>
<OutputSerialization>
<JSON>
</JSON>
</OutputSerialization>
<RequestProgress>
<Enabled>FALSE</Enabled>
</RequestProgress>
</SelectObjectContentRequest>`),
withJSON: `
{
"name": "small_pdf1.pdf",
"lume_id": "9507193e-572d-4f95-bcf1-e9226d96be65",
"elements": [
{
"element_type": "__elem__image",
"element_id": "859d09c4-7cf1-4a37-9674-3a7de8b56abc",
"attributes": {
"__attr__image_dpi": 300,
"__attr__image_size": [
2550,
3299
],
"__attr__image_index": 1,
"__attr__image_format": "JPEG",
"__attr__file_extension": "jpg",
"__attr__data": null
}
},
{
"element_type": "__elem__merfu",
"element_id": "d868aefe-ef9a-4be2-b9b2-c9fd89cc43eb",
"attributes": {
"__attr__image_dpi": 300,
"__attr__image_size": [
2550,
3299
],
"__attr__image_index": 2,
"__attr__image_format": "JPEG",
"__attr__file_extension": "jpg",
"__attr__data": null
}
}
],
"data": "asdascasdc1234e123erdasdas"
}`,
wantResult: `{"element_type":"__elem__merfu","element_id":"d868aefe-ef9a-4be2-b9b2-c9fd89cc43eb","attributes":{"__attr__image_dpi":300,"__attr__image_size":[2550,3299],"__attr__image_index":2,"__attr__image_format":"JPEG","__attr__file_extension":"jpg","__attr__data":null}}`,
},
} }
defRequest := `<?xml version="1.0" encoding="UTF-8"?> defRequest := `<?xml version="1.0" encoding="UTF-8"?>

@ -140,9 +140,9 @@ func parseLimit(v *LitValue) (int64, error) {
// EvalFrom evaluates the From clause on the input record. It only // EvalFrom evaluates the From clause on the input record. It only
// applies to JSON input data format (currently). // applies to JSON input data format (currently).
func (e *SelectStatement) EvalFrom(format string, input Record) (Record, error) { func (e *SelectStatement) EvalFrom(format string, input Record) ([]*Record, error) {
if !e.selectAST.From.HasKeypath() { if !e.selectAST.From.HasKeypath() {
return input, nil return []*Record{&input}, nil
} }
_, rawVal := input.Raw() _, rawVal := input.Raw()
@ -160,6 +160,18 @@ func (e *SelectStatement) EvalFrom(format string, input Record) (Record, error)
switch v := txedRec.(type) { switch v := txedRec.(type) {
case jstream.KVS: case jstream.KVS:
kvs = v kvs = v
case []interface{}:
recs := make([]*Record, len(v))
for i, val := range v {
tmpRec := input.Clone(nil)
if err = tmpRec.Replace(val); err != nil {
return nil, err
}
recs[i] = &tmpRec
}
return recs, nil
default: default:
kvs = jstream.KVS{jstream.KV{Key: "_1", Value: v}} kvs = jstream.KVS{jstream.KV{Key: "_1", Value: v}}
} }
@ -168,7 +180,7 @@ func (e *SelectStatement) EvalFrom(format string, input Record) (Record, error)
return nil, err return nil, err
} }
return input, nil return []*Record{&input}, nil
case simdjson.Object: case simdjson.Object:
txedRec, _, err := jsonpathEval(e.selectAST.From.Table.PathExpr[1:], rec) txedRec, _, err := jsonpathEval(e.selectAST.From.Table.PathExpr[1:], rec)
if err != nil { if err != nil {
@ -181,6 +193,18 @@ func (e *SelectStatement) EvalFrom(format string, input Record) (Record, error)
if err != nil { if err != nil {
return nil, err return nil, err
} }
case []interface{}:
recs := make([]*Record, len(v))
for i, val := range v {
tmpRec := input.Clone(nil)
if err = tmpRec.Replace(val); err != nil {
return nil, err
}
recs[i] = &tmpRec
}
return recs, nil
default: default:
input.Reset() input.Reset()
input, err = input.Set("_1", &Value{value: v}) input, err = input.Set("_1", &Value{value: v})
@ -188,7 +212,7 @@ func (e *SelectStatement) EvalFrom(format string, input Record) (Record, error)
return nil, err return nil, err
} }
} }
return input, nil return []*Record{&input}, nil
} }
return nil, errDataSource(errors.New("unexpected non JSON input")) return nil, errDataSource(errors.New("unexpected non JSON input"))
} }

Loading…
Cancel
Save