diff --git a/pkg/s3select/select.go b/pkg/s3select/select.go index 4ddc47f1a..07ac05193 100644 --- a/pkg/s3select/select.go +++ b/pkg/s3select/select.go @@ -392,7 +392,6 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) { } writer := newMessageWriter(w, getProgressFunc) - var inputRecord sql.Record var outputQueue []sql.Record // Create queue based on the type. @@ -433,6 +432,7 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) { } var rec sql.Record +OuterLoop: for { if s3Select.statement.LimitReached() { if !sendRecord() { @@ -469,47 +469,50 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) { break } - if inputRecord, err = s3Select.statement.EvalFrom(s3Select.Input.format, rec); err != nil { + var inputRecords []*sql.Record + if inputRecords, err = s3Select.statement.EvalFrom(s3Select.Input.format, rec); err != nil { break } - if s3Select.statement.IsAggregated() { - if err = s3Select.statement.AggregateRow(inputRecord); err != nil { - break - } - } else { - var outputRecord sql.Record - // We will attempt to reuse the records in the table. - // The type of these should not change. - // The queue should always have at least one entry left for this to work. - outputQueue = outputQueue[:len(outputQueue)+1] - if t := outputQueue[len(outputQueue)-1]; t != nil { - // If the output record is already set, we reuse it. - outputRecord = t - outputRecord.Reset() + for _, inputRecord := range inputRecords { + if s3Select.statement.IsAggregated() { + if err = s3Select.statement.AggregateRow(*inputRecord); err != nil { + break OuterLoop + } } else { - // Create new one - outputRecord = s3Select.outputRecord() - outputQueue[len(outputQueue)-1] = outputRecord - } - outputRecord, err = s3Select.statement.Eval(inputRecord, outputRecord) - if outputRecord == nil || err != nil { - // This should not be written. - // Remove it from the queue. - outputQueue = outputQueue[:len(outputQueue)-1] - if err != nil { - break + var outputRecord sql.Record + // We will attempt to reuse the records in the table. + // The type of these should not change. + // The queue should always have at least one entry left for this to work. + outputQueue = outputQueue[:len(outputQueue)+1] + if t := outputQueue[len(outputQueue)-1]; t != nil { + // If the output record is already set, we reuse it. + outputRecord = t + outputRecord.Reset() + } else { + // Create new one + outputRecord = s3Select.outputRecord() + outputQueue[len(outputQueue)-1] = outputRecord + } + outputRecord, err = s3Select.statement.Eval(*inputRecord, outputRecord) + if outputRecord == nil || err != nil { + // This should not be written. + // Remove it from the queue. + outputQueue = outputQueue[:len(outputQueue)-1] + if err != nil { + break OuterLoop + } + continue } - continue - } - outputQueue[len(outputQueue)-1] = outputRecord - if len(outputQueue) < cap(outputQueue) { - continue - } + outputQueue[len(outputQueue)-1] = outputRecord + if len(outputQueue) < cap(outputQueue) { + continue + } - if !sendRecord() { - break + if !sendRecord() { + break OuterLoop + } } } } diff --git a/pkg/s3select/select_test.go b/pkg/s3select/select_test.go index bbf81e171..b14a17ecd 100644 --- a/pkg/s3select/select_test.go +++ b/pkg/s3select/select_test.go @@ -260,6 +260,68 @@ func TestJSONQueries(t *testing.T) { `), wantResult: `"[""foo"",""bar"",""whatever""]"`, }, + { + name: "document", + query: "", + requestXML: []byte(` + + + select * from s3object[*].elements[*] s where s.element_type = '__elem__merfu' + SQL + + NONE + + DOCUMENT + + + + + + + + FALSE + +`), + withJSON: ` +{ + "name": "small_pdf1.pdf", + "lume_id": "9507193e-572d-4f95-bcf1-e9226d96be65", + "elements": [ + { + "element_type": "__elem__image", + "element_id": "859d09c4-7cf1-4a37-9674-3a7de8b56abc", + "attributes": { + "__attr__image_dpi": 300, + "__attr__image_size": [ + 2550, + 3299 + ], + "__attr__image_index": 1, + "__attr__image_format": "JPEG", + "__attr__file_extension": "jpg", + "__attr__data": null + } + }, + { + "element_type": "__elem__merfu", + "element_id": "d868aefe-ef9a-4be2-b9b2-c9fd89cc43eb", + "attributes": { + "__attr__image_dpi": 300, + "__attr__image_size": [ + 2550, + 3299 + ], + "__attr__image_index": 2, + "__attr__image_format": "JPEG", + "__attr__file_extension": "jpg", + "__attr__data": null + } + } + ], + "data": "asdascasdc1234e123erdasdas" +}`, + wantResult: `{"element_type":"__elem__merfu","element_id":"d868aefe-ef9a-4be2-b9b2-c9fd89cc43eb","attributes":{"__attr__image_dpi":300,"__attr__image_size":[2550,3299],"__attr__image_index":2,"__attr__image_format":"JPEG","__attr__file_extension":"jpg","__attr__data":null}}`, + }, } defRequest := ` diff --git a/pkg/s3select/sql/statement.go b/pkg/s3select/sql/statement.go index e418410b2..5383221b2 100644 --- a/pkg/s3select/sql/statement.go +++ b/pkg/s3select/sql/statement.go @@ -140,9 +140,9 @@ func parseLimit(v *LitValue) (int64, error) { // EvalFrom evaluates the From clause on the input record. It only // applies to JSON input data format (currently). -func (e *SelectStatement) EvalFrom(format string, input Record) (Record, error) { +func (e *SelectStatement) EvalFrom(format string, input Record) ([]*Record, error) { if !e.selectAST.From.HasKeypath() { - return input, nil + return []*Record{&input}, nil } _, rawVal := input.Raw() @@ -160,6 +160,18 @@ func (e *SelectStatement) EvalFrom(format string, input Record) (Record, error) switch v := txedRec.(type) { case jstream.KVS: kvs = v + + case []interface{}: + recs := make([]*Record, len(v)) + for i, val := range v { + tmpRec := input.Clone(nil) + if err = tmpRec.Replace(val); err != nil { + return nil, err + } + recs[i] = &tmpRec + } + return recs, nil + default: kvs = jstream.KVS{jstream.KV{Key: "_1", Value: v}} } @@ -168,7 +180,7 @@ func (e *SelectStatement) EvalFrom(format string, input Record) (Record, error) return nil, err } - return input, nil + return []*Record{&input}, nil case simdjson.Object: txedRec, _, err := jsonpathEval(e.selectAST.From.Table.PathExpr[1:], rec) if err != nil { @@ -181,6 +193,18 @@ func (e *SelectStatement) EvalFrom(format string, input Record) (Record, error) if err != nil { return nil, err } + + case []interface{}: + recs := make([]*Record, len(v)) + for i, val := range v { + tmpRec := input.Clone(nil) + if err = tmpRec.Replace(val); err != nil { + return nil, err + } + recs[i] = &tmpRec + } + return recs, nil + default: input.Reset() input, err = input.Set("_1", &Value{value: v}) @@ -188,7 +212,7 @@ func (e *SelectStatement) EvalFrom(format string, input Record) (Record, error) return nil, err } } - return input, nil + return []*Record{&input}, nil } return nil, errDataSource(errors.New("unexpected non JSON input")) }