diff --git a/pkg/s3select/csv/record.go b/pkg/s3select/csv/record.go index 47ee818d0..358e8e65b 100644 --- a/pkg/s3select/csv/record.go +++ b/pkg/s3select/csv/record.go @@ -19,10 +19,11 @@ package csv import ( "bytes" "encoding/csv" + "encoding/json" "fmt" + "github.com/bcicen/jstream" "github.com/minio/minio/pkg/s3select/sql" - "github.com/tidwall/sjson" ) // Record - is CSV record. @@ -78,20 +79,11 @@ func (r *Record) MarshalCSV(fieldDelimiter rune) ([]byte, error) { // MarshalJSON - encodes to JSON data. func (r *Record) MarshalJSON() ([]byte, error) { - data := "{}" - - var err error - for i := len(r.columnNames) - 1; i >= 0; i-- { - if i >= len(r.csvRecord) { - continue - } - - if data, err = sjson.Set(data, r.columnNames[i], r.csvRecord[i]); err != nil { - return nil, err - } + var kvs jstream.KVS = make([]jstream.KV, len(r.columnNames)) + for i := 0; i < len(r.columnNames); i++ { + kvs[i] = jstream.KV{Key: r.columnNames[i], Value: r.csvRecord[i]} } - - return []byte(data), nil + return json.Marshal(kvs) } // NewRecord - creates new CSV record. diff --git a/pkg/s3select/json/reader.go b/pkg/s3select/json/reader.go index e9d4d1f45..f34d182d9 100644 --- a/pkg/s3select/json/reader.go +++ b/pkg/s3select/json/reader.go @@ -23,7 +23,6 @@ import ( "github.com/minio/minio/pkg/s3select/sql" "github.com/bcicen/jstream" - "github.com/tidwall/sjson" ) // Reader - JSON record reader for S3Select. @@ -50,17 +49,18 @@ func (r *Reader) Read() (sql.Record, error) { if v.ValueType == jstream.Object { data, err = json.Marshal(v.Value) } else { - // To be AWS S3 compatible - // Select for JSON needs to output non-object JSON as single column value - // i.e. a map with `_1` as key and value as the non-object. - data, err = sjson.SetBytes(data, "_1", v.Value) + // To be AWS S3 compatible Select for JSON needs to + // output non-object JSON as single column value + // i.e. a map with `_1` as key and value as the + // non-object. + data, err = json.Marshal(jstream.KVS{jstream.KV{Key: "_1", Value: v.Value}}) } if err != nil { return nil, errJSONParsingError(err) } return &Record{ - data: data, + Data: data, }, nil } diff --git a/pkg/s3select/json/record.go b/pkg/s3select/json/record.go index 087551535..224622d8b 100644 --- a/pkg/s3select/json/record.go +++ b/pkg/s3select/json/record.go @@ -19,22 +19,37 @@ package json import ( "bytes" "encoding/csv" + "encoding/json" + "errors" "fmt" "strings" + "github.com/bcicen/jstream" "github.com/minio/minio/pkg/s3select/sql" "github.com/tidwall/gjson" - "github.com/tidwall/sjson" ) +// RawJSON is a byte-slice that contains valid JSON +type RawJSON []byte + +// MarshalJSON instance for []byte that assumes that byte-slice is +// already serialized JSON +func (b RawJSON) MarshalJSON() ([]byte, error) { + return b, nil +} + // Record - is JSON record. type Record struct { - data []byte + // Used in Get() + Data []byte + + // Used in Set(), Marshal*() + kvs jstream.KVS } // Get - gets the value for a column name. func (r *Record) Get(name string) (*sql.Value, error) { - result := gjson.GetBytes(r.data, name) + result := gjson.GetBytes(r.Data, name) switch result.Type { case gjson.Null: return sql.FromNull(), nil @@ -67,24 +82,33 @@ func (r *Record) Set(name string, value *sql.Value) (err error) { } else if value.IsNull() { v = nil } else if b, ok := value.ToBytes(); ok { - v = string(b) + v = RawJSON(b) } else { return fmt.Errorf("unsupported sql value %v and type %v", value, value.GetTypeString()) } name = strings.Replace(name, "*", "__ALL__", -1) - r.data, err = sjson.SetBytes(r.data, name, v) + r.kvs = append(r.kvs, jstream.KV{Key: name, Value: v}) return err } // MarshalCSV - encodes to CSV data. func (r *Record) MarshalCSV(fieldDelimiter rune) ([]byte, error) { var csvRecord []string - result := gjson.ParseBytes(r.data) - result.ForEach(func(key, value gjson.Result) bool { - csvRecord = append(csvRecord, value.String()) - return true - }) + for _, kv := range r.kvs { + var columnValue string + switch val := kv.Value.(type) { + case bool, float64, int64, string: + columnValue = fmt.Sprintf("%v", val) + case nil: + columnValue = "" + case RawJSON: + columnValue = string([]byte(val)) + default: + return nil, errors.New("Cannot marshal unhandled type") + } + csvRecord = append(csvRecord, columnValue) + } buf := new(bytes.Buffer) w := csv.NewWriter(buf) @@ -103,12 +127,12 @@ func (r *Record) MarshalCSV(fieldDelimiter rune) ([]byte, error) { // MarshalJSON - encodes to JSON data. func (r *Record) MarshalJSON() ([]byte, error) { - return r.data, nil + return json.Marshal(r.kvs) } // NewRecord - creates new empty JSON record. func NewRecord() *Record { return &Record{ - data: []byte("{}"), + Data: []byte("{}"), } } diff --git a/pkg/s3select/parquet/reader.go b/pkg/s3select/parquet/reader.go index 70c0195cb..fde918a1d 100644 --- a/pkg/s3select/parquet/reader.go +++ b/pkg/s3select/parquet/reader.go @@ -17,9 +17,11 @@ package parquet import ( + "encoding/json" "io" - "github.com/minio/minio/pkg/s3select/json" + "github.com/bcicen/jstream" + jsonfmt "github.com/minio/minio/pkg/s3select/json" "github.com/minio/minio/pkg/s3select/sql" parquetgo "github.com/minio/parquet-go" parquetgen "github.com/minio/parquet-go/gen-go/parquet" @@ -42,42 +44,42 @@ func (r *Reader) Read() (rec sql.Record, rerr error) { return nil, err } - record := json.NewRecord() + kvs := jstream.KVS{} f := func(name string, v parquetgo.Value) bool { if v.Value == nil { - if err := record.Set(name, sql.FromNull()); err != nil { - rerr = errParquetParsingError(err) - } - return rerr == nil + kvs = append(kvs, jstream.KV{Key: name, Value: nil}) + return true } - var value *sql.Value + var value interface{} switch v.Type { case parquetgen.Type_BOOLEAN: - value = sql.FromBool(v.Value.(bool)) + value = v.Value.(bool) case parquetgen.Type_INT32: - value = sql.FromInt(int64(v.Value.(int32))) + value = int64(v.Value.(int32)) case parquetgen.Type_INT64: - value = sql.FromInt(int64(v.Value.(int64))) + value = int64(v.Value.(int64)) case parquetgen.Type_FLOAT: - value = sql.FromFloat(float64(v.Value.(float32))) + value = float64(v.Value.(float32)) case parquetgen.Type_DOUBLE: - value = sql.FromFloat(v.Value.(float64)) + value = v.Value.(float64) case parquetgen.Type_INT96, parquetgen.Type_BYTE_ARRAY, parquetgen.Type_FIXED_LEN_BYTE_ARRAY: - value = sql.FromString(string(v.Value.([]byte))) + value = string(v.Value.([]byte)) default: rerr = errParquetParsingError(nil) return false } - if err = record.Set(name, value); err != nil { - rerr = errParquetParsingError(err) - } - return rerr == nil + kvs = append(kvs, jstream.KV{Key: name, Value: value}) + return true } parquetRecord.Range(f) - return record, rerr + data, err := json.Marshal(kvs) + if err != nil { + return nil, err + } + return &jsonfmt.Record{Data: data}, rerr } // Close - closes underlaying readers.