Support JSON to CSV and CSV to JSON output format conversion (#6910)

This PR implements one of the pending items in issue #6286
in S3 API a user can request CSV output for a JSON document
and a JSON output for a CSV document. This PR refactors
the code a little bit to bring this feature.
master
Harshavardhana 6 years ago committed by kannappanr
parent 313ba74b09
commit 4c7c571875
  1. 12
      cmd/object-handlers.go
  2. 34
      pkg/s3select/format/csv/csv.go
  3. 48
      pkg/s3select/format/json/json.go
  4. 2
      pkg/s3select/format/select.go
  5. 74
      pkg/s3select/input.go
  6. 36
      pkg/s3select/select.go

@ -218,11 +218,13 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
writeErrorResponse(w, ErrInvalidFileHeaderInfo, r.URL, guessIsBrowserReq(r))
return
}
if selectReq.OutputSerialization.CSV.QuoteFields != s3select.CSVQuoteFieldsAlways &&
selectReq.OutputSerialization.CSV.QuoteFields != s3select.CSVQuoteFieldsAsNeeded &&
selectReq.OutputSerialization.CSV.QuoteFields != "" {
writeErrorResponse(w, ErrInvalidQuoteFields, r.URL, guessIsBrowserReq(r))
return
if selectReq.OutputSerialization.CSV != nil {
if selectReq.OutputSerialization.CSV.QuoteFields != s3select.CSVQuoteFieldsAlways &&
selectReq.OutputSerialization.CSV.QuoteFields != s3select.CSVQuoteFieldsAsNeeded &&
selectReq.OutputSerialization.CSV.QuoteFields != "" {
writeErrorResponse(w, ErrInvalidQuoteFields, r.URL, guessIsBrowserReq(r))
return
}
}
if len(selectReq.InputSerialization.CSV.RecordDelimiter) > 2 {
writeErrorResponse(w, ErrInvalidRequestParameter, r.URL, guessIsBrowserReq(r))

@ -57,9 +57,12 @@ type Options struct {
// SQL expression meant to be evaluated.
Expression string
// What the outputted CSV will be delimited by .
// Output CSV will be delimited by.
OutputFieldDelimiter string
// Output CSV record will be delimited by.
OutputRecordDelimiter string
// Size of incoming object
StreamSize int64
@ -68,6 +71,9 @@ type Options struct {
// Progress enabled, enable/disable progress messages.
Progress bool
// Output format type, supported values are CSV and JSON
OutputType format.Type
}
// cinput represents a record producing input from a formatted object.
@ -147,6 +153,9 @@ func (reader *cinput) readHeader() error {
reader.firstRow = nil
} else {
reader.firstRow, readErr = reader.reader.Read()
if readErr != nil {
return format.ErrCSVParsingError
}
reader.header = make([]string, len(reader.firstRow))
for i := range reader.firstRow {
reader.header[i] = "_" + strconv.Itoa(i)
@ -173,8 +182,13 @@ func (reader *cinput) Read() ([]byte, error) {
if dec != nil {
var data []byte
var err error
for i, value := range dec {
data, err = sjson.SetBytes(data, reader.header[i], value)
// Navigate column values in reverse order to preserve
// the input order for AWS S3 compatibility, because
// sjson adds json key/value pairs in first in last out
// fashion. This should be fixed in sjson ideally. Following
// work around is needed to circumvent this issue for now.
for i := len(dec) - 1; i >= 0; i-- {
data, err = sjson.SetBytes(data, reader.header[i], dec[i])
if err != nil {
return nil, err
}
@ -184,11 +198,16 @@ func (reader *cinput) Read() ([]byte, error) {
return nil, nil
}
// OutputFieldDelimiter - returns the delimiter specified in input request
// OutputFieldDelimiter - returns the requested output field delimiter.
func (reader *cinput) OutputFieldDelimiter() string {
return reader.options.OutputFieldDelimiter
}
// OutputRecordDelimiter - returns the requested output record delimiter.
func (reader *cinput) OutputRecordDelimiter() string {
return reader.options.OutputFieldDelimiter
}
// HasHeader - returns true or false depending upon the header.
func (reader *cinput) HasHeader() bool {
return reader.options.HasHeader
@ -285,11 +304,16 @@ func (reader *cinput) CreateProgressXML() (string, error) {
return xml.Header + string(out), nil
}
// Type - return the data format type {
// Type - return the data format type
func (reader *cinput) Type() format.Type {
return format.CSV
}
// OutputType - return the data format type
func (reader *cinput) OutputType() format.Type {
return reader.options.OutputType
}
// ColNameErrs is a function which makes sure that the headers are requested are
// present in the file otherwise it throws an error.
func (reader *cinput) ColNameErrs(columnNames []string) error {

@ -22,6 +22,7 @@ import (
"io"
"github.com/minio/minio/pkg/s3select/format"
"github.com/tidwall/gjson"
)
// Options options are passed to the underlying encoding/json reader.
@ -40,24 +41,32 @@ type Options struct {
// SQL expression meant to be evaluated.
Expression string
// What the outputted will be delimited by .
// Input record delimiter.
RecordDelimiter string
// Output CSV will be delimited by.
OutputFieldDelimiter string
// Output record delimiter.
OutputRecordDelimiter string
// Size of incoming object
StreamSize int64
// True if Type is DOCUMENTS
Type bool
// True if DocumentType is DOCUMENTS
DocumentType bool
// Progress enabled, enable/disable progress messages.
Progress bool
// Output format type, supported values are CSV and JSON
OutputType format.Type
}
// jinput represents a record producing input from a formatted file or pipe.
type jinput struct {
options *Options
reader *bufio.Reader
firstRow []string
header []string
minOutputLength int
stats struct {
@ -79,7 +88,6 @@ func New(opts *Options) (format.Select, error) {
reader.stats.BytesScanned = opts.StreamSize
reader.stats.BytesProcessed = 0
reader.stats.BytesReturned = 0
return reader, nil
}
@ -95,7 +103,7 @@ func (reader *jinput) UpdateBytesProcessed(size int64) {
// Read the file and returns
func (reader *jinput) Read() ([]byte, error) {
data, err := reader.reader.ReadBytes('\n')
data, _, err := reader.reader.ReadLine()
if err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
err = nil
@ -103,17 +111,32 @@ func (reader *jinput) Read() ([]byte, error) {
err = format.ErrJSONParsingError
}
}
if err == nil {
var header []string
gjson.ParseBytes(data).ForEach(func(key, value gjson.Result) bool {
header = append(header, key.String())
return true
})
reader.header = header
}
return data, err
}
// OutputFieldDelimiter - returns the delimiter specified in input request
// OutputFieldDelimiter - returns the delimiter specified in input request,
// for JSON output this value is empty, but does have a value when
// output type is CSV.
func (reader *jinput) OutputFieldDelimiter() string {
return ","
return reader.options.OutputFieldDelimiter
}
// OutputRecordDelimiter - returns the delimiter specified in input request, after each JSON record.
func (reader *jinput) OutputRecordDelimiter() string {
return reader.options.OutputRecordDelimiter
}
// HasHeader - returns true or false depending upon the header.
func (reader *jinput) HasHeader() bool {
return false
return true
}
// Expression - return the Select Expression for
@ -128,7 +151,7 @@ func (reader *jinput) UpdateBytesReturned(size int64) {
// Header returns a nil in case of
func (reader *jinput) Header() []string {
return nil
return reader.header
}
// CreateStatXML is the function which does the marshaling from the stat
@ -171,6 +194,11 @@ func (reader *jinput) Type() format.Type {
return format.JSON
}
// OutputType - return the data format type {
func (reader *jinput) OutputType() format.Type {
return reader.options.OutputType
}
// ColNameErrs - this is a dummy function for JSON input type.
func (reader *jinput) ColNameErrs(columnNames []string) error {
return nil

@ -22,10 +22,12 @@ import "encoding/xml"
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html
type Select interface {
Type() Type
OutputType() Type
Read() ([]byte, error)
Header() []string
HasHeader() bool
OutputFieldDelimiter() string
OutputRecordDelimiter() string
UpdateBytesProcessed(int64)
Expression() string
UpdateBytesReturned(int64)

@ -65,40 +65,60 @@ func New(reader io.Reader, size int64, req ObjectSelectRequest) (s3s format.Sele
// Initializating options for CSV
if req.InputSerialization.CSV != nil {
if req.OutputSerialization.CSV.FieldDelimiter == "" {
req.OutputSerialization.CSV.FieldDelimiter = ","
}
if req.InputSerialization.CSV.FileHeaderInfo == "" {
req.InputSerialization.CSV.FileHeaderInfo = CSVFileHeaderInfoNone
}
if req.InputSerialization.CSV.RecordDelimiter == "" {
req.InputSerialization.CSV.RecordDelimiter = "\n"
}
s3s, err = csv.New(&csv.Options{
HasHeader: req.InputSerialization.CSV.FileHeaderInfo == CSVFileHeaderInfoUse,
RecordDelimiter: req.InputSerialization.CSV.RecordDelimiter,
FieldDelimiter: req.InputSerialization.CSV.FieldDelimiter,
Comments: req.InputSerialization.CSV.Comments,
Name: "S3Object", // Default table name for all objects
ReadFrom: reader,
Compressed: string(req.InputSerialization.CompressionType),
Expression: cleanExpr(req.Expression),
OutputFieldDelimiter: req.OutputSerialization.CSV.FieldDelimiter,
StreamSize: size,
HeaderOpt: req.InputSerialization.CSV.FileHeaderInfo == CSVFileHeaderInfoUse,
Progress: req.RequestProgress.Enabled,
})
options := &csv.Options{
Name: "S3Object", // Default table name for all objects
HasHeader: req.InputSerialization.CSV.FileHeaderInfo == CSVFileHeaderInfoUse,
RecordDelimiter: req.InputSerialization.CSV.RecordDelimiter,
FieldDelimiter: req.InputSerialization.CSV.FieldDelimiter,
Comments: req.InputSerialization.CSV.Comments,
ReadFrom: reader,
Compressed: string(req.InputSerialization.CompressionType),
Expression: cleanExpr(req.Expression),
StreamSize: size,
HeaderOpt: req.InputSerialization.CSV.FileHeaderInfo == CSVFileHeaderInfoUse,
Progress: req.RequestProgress.Enabled,
}
if req.OutputSerialization.CSV != nil {
if req.OutputSerialization.CSV.FieldDelimiter == "" {
req.OutputSerialization.CSV.FieldDelimiter = ","
}
options.OutputFieldDelimiter = req.OutputSerialization.CSV.FieldDelimiter
options.OutputRecordDelimiter = req.OutputSerialization.CSV.RecordDelimiter
options.OutputType = format.CSV
}
if req.OutputSerialization.JSON != nil {
options.OutputRecordDelimiter = req.OutputSerialization.JSON.RecordDelimiter
options.OutputType = format.JSON
}
// Initialize CSV input type
s3s, err = csv.New(options)
} else if req.InputSerialization.JSON != nil {
// Initializating options for JSON
s3s, err = json.New(&json.Options{
Name: "S3Object", // Default table name for all objects
ReadFrom: reader,
Compressed: string(req.InputSerialization.CompressionType),
Expression: cleanExpr(req.Expression),
StreamSize: size,
Type: req.InputSerialization.JSON.Type == JSONTypeDocument,
Progress: req.RequestProgress.Enabled,
})
options := &json.Options{
Name: "S3Object", // Default table name for all objects
ReadFrom: reader,
Compressed: string(req.InputSerialization.CompressionType),
Expression: cleanExpr(req.Expression),
StreamSize: size,
DocumentType: req.InputSerialization.JSON.Type == JSONTypeDocument,
Progress: req.RequestProgress.Enabled,
}
if req.OutputSerialization.JSON != nil {
options.OutputRecordDelimiter = req.OutputSerialization.JSON.RecordDelimiter
options.OutputType = format.JSON
}
if req.OutputSerialization.CSV != nil {
options.OutputFieldDelimiter = req.OutputSerialization.CSV.FieldDelimiter
options.OutputRecordDelimiter = req.OutputSerialization.CSV.RecordDelimiter
options.OutputType = format.CSV
}
// Initialize JSON input type
s3s, err = json.New(options)
}
return s3s, err
}

@ -201,14 +201,19 @@ func processSelectReq(reqColNames []string, alias string, wc sqlparser.Expr, lre
lrecords = math.MaxInt64
}
columnsKv, err := columnsIndex(reqColNames, f)
if err != nil {
rowCh <- Row{
err: err,
var results []string
var columnsKv []columnKv
if f.Type() == format.CSV {
var err error
columnsKv, err = columnsIndex(reqColNames, f)
if err != nil {
rowCh <- Row{
err: err,
}
return
}
return
results = make([]string, len(columnsKv))
}
var results = make([]string, len(columnsKv))
for {
record, err := f.Read()
@ -228,6 +233,19 @@ func processSelectReq(reqColNames []string, alias string, wc sqlparser.Expr, lre
return
}
// For JSON multi-line input type columns needs
// to be handled for each record.
if f.Type() == format.JSON {
columnsKv, err = columnsIndex(reqColNames, f)
if err != nil {
rowCh <- Row{
err: err,
}
return
}
results = make([]string, len(columnsKv))
}
f.UpdateBytesProcessed(int64(len(record)))
// Return in case the number of record reaches the LIMIT
@ -250,17 +268,17 @@ func processSelectReq(reqColNames []string, alias string, wc sqlparser.Expr, lre
if condition {
// if its an asterix we just print everything in the row
if reqColNames[0] == "*" && fnNames[0] == "" {
switch f.Type() {
switch f.OutputType() {
case format.CSV:
for i, kv := range columnsKv {
results[i] = gjson.GetBytes(record, kv.Key).String()
}
rowCh <- Row{
record: strings.Join(results, f.OutputFieldDelimiter()) + "\n",
record: strings.Join(results, f.OutputFieldDelimiter()) + f.OutputRecordDelimiter(),
}
case format.JSON:
rowCh <- Row{
record: string(record) + "\n",
record: string(record) + f.OutputRecordDelimiter(),
}
}
} else if alias != "" {

Loading…
Cancel
Save