speed up the performance of s3select on csv (#7945)

Branch: master
Author: Yao Zongyou · Committed by: Harshavardhana
Parent: fa3546bb03 · Commit: ec9bfd3aef
Changed files:

1. pkg/s3select/csv/reader.go (28 changed lines)
2. pkg/s3select/csv/reader_test.go (12 changed lines)
3. pkg/s3select/csv/record.go (22 changed lines)
4. pkg/s3select/json/record.go (24 changed lines)
5. pkg/s3select/message.go (20 changed lines)
6. pkg/s3select/select.go (59 changed lines)
7. pkg/s3select/select_benchmark_test.go (30 changed lines)
8. pkg/s3select/sql/record.go (10 changed lines)

pkg/s3select/csv/reader.go

```diff
@@ -82,10 +82,11 @@ func (rr *recordReader) Read(p []byte) (n int, err error) {
 // Reader - CSV record reader for S3Select.
 type Reader struct {
 	args         *ReaderArgs
 	readCloser   io.ReadCloser
 	csvReader    *csv.Reader
 	columnNames  []string
+	nameIndexMap map[string]int64
 }

 // Read - reads single record.
@@ -99,23 +100,24 @@ func (r *Reader) Read() (sql.Record, error) {
 		return nil, err
 	}

-	columnNames := r.columnNames
-	if columnNames == nil {
-		columnNames = make([]string, len(csvRecord))
+	if r.columnNames == nil {
+		r.columnNames = make([]string, len(csvRecord))
 		for i := range csvRecord {
-			columnNames[i] = fmt.Sprintf("_%v", i+1)
+			r.columnNames[i] = fmt.Sprintf("_%v", i+1)
 		}
 	}

-	nameIndexMap := make(map[string]int64)
-	for i := range columnNames {
-		nameIndexMap[columnNames[i]] = int64(i)
+	if r.nameIndexMap == nil {
+		r.nameIndexMap = make(map[string]int64)
+		for i := range r.columnNames {
+			r.nameIndexMap[r.columnNames[i]] = int64(i)
+		}
 	}

 	return &Record{
-		columnNames:  columnNames,
+		columnNames:  r.columnNames,
 		csvRecord:    csvRecord,
-		nameIndexMap: nameIndexMap,
+		nameIndexMap: r.nameIndexMap,
 	}, nil
 }
```
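The hunk above turns per-record work into per-reader state: the generated column names and the name-to-index map are now built once, on the first Read, and shared by every Record the reader produces, instead of being reallocated for each row. A minimal, self-contained sketch of that lazy-memoization pattern (the names here are illustrative, not MinIO's actual types):

```go
package main

import "fmt"

// rowReader illustrates the memoization in the patch: metadata that is
// identical for every row is computed once and cached on the reader.
type rowReader struct {
	columnNames  []string
	nameIndexMap map[string]int64
}

// record returns the shared name-to-index map for one row of fields;
// the cached slice and map are built lazily on the first call and then
// reused, rather than rebuilt per row.
func (r *rowReader) record(fields []string) map[string]int64 {
	if r.columnNames == nil {
		r.columnNames = make([]string, len(fields))
		for i := range fields {
			r.columnNames[i] = fmt.Sprintf("_%v", i+1) // _1, _2, ...
		}
	}
	if r.nameIndexMap == nil {
		r.nameIndexMap = make(map[string]int64, len(r.columnNames))
		for i, col := range r.columnNames {
			r.nameIndexMap[col] = int64(i)
		}
	}
	return r.nameIndexMap
}

func main() {
	r := &rowReader{}
	for i := 0; i < 3; i++ {
		m := r.record([]string{"a", "b"}) // the map is built only on iteration 0
		fmt.Println(m["_2"])
	}
}
```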

pkg/s3select/csv/reader_test.go

```diff
@@ -17,6 +17,7 @@
 package csv

 import (
+	"bytes"
 	"io"
 	"io/ioutil"
 	"strings"
@@ -39,6 +40,7 @@ func TestRead(t *testing.T) {
 	for i, c := range cases {
 		var err error
 		var record sql.Record
+		var result bytes.Buffer

 		r, _ := NewReader(ioutil.NopCloser(strings.NewReader(c.content)), &ReaderArgs{
 			FileHeaderInfo: none,
@@ -51,22 +53,22 @@ func TestRead(t *testing.T) {
 			unmarshaled: true,
 		})

-		result := ""
 		for {
 			record, err = r.Read()
 			if err != nil {
 				break
 			}

-			s, _ := record.MarshalCSV([]rune(c.fieldDelimiter)[0])
-			result += string(s) + c.recordDelimiter
+			record.WriteCSV(&result, []rune(c.fieldDelimiter)[0])
+			result.Truncate(result.Len() - 1)
+			result.WriteString(c.recordDelimiter)
 		}
 		r.Close()
 		if err != io.EOF {
 			t.Fatalf("Case %d failed with %s", i, err)
 		}

-		if result != c.content {
-			t.Errorf("Case %d failed: expected %v result %v", i, c.content, result)
+		if result.String() != c.content {
+			t.Errorf("Case %d failed: expected %v result %v", i, c.content, result.String())
 		}
 	}
 }
```
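The test follows the API change and, as a side effect, also stops paying for string concatenation: `result += s` copies the whole accumulated string on every iteration, while `bytes.Buffer` grows geometrically. A tiny standalone illustration (not from the patch) of why the buffer form is preferable in a loop:

```go
package main

import (
	"bytes"
	"fmt"
)

func main() {
	// s += chunk copies the entire accumulated string each iteration,
	// so building n chunks costs O(n^2) bytes copied overall.
	s := ""
	for i := 0; i < 3; i++ {
		s += "row\n"
	}

	// bytes.Buffer appends in place with amortized O(1) cost per write,
	// which is why the test (and the new Record API) write into a buffer.
	var b bytes.Buffer
	for i := 0; i < 3; i++ {
		b.WriteString("row\n")
	}
	fmt.Println(s == b.String()) // true
}
```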

pkg/s3select/csv/record.go

```diff
@@ -17,11 +17,11 @@
 package csv

 import (
-	"bytes"
 	"encoding/csv"
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"

 	"github.com/bcicen/jstream"
 	"github.com/minio/minio/pkg/s3select/sql"
@@ -61,30 +61,28 @@ func (r *Record) Set(name string, value *sql.Value) error {
 	return nil
 }

-// MarshalCSV - encodes to CSV data.
-func (r *Record) MarshalCSV(fieldDelimiter rune) ([]byte, error) {
-	buf := new(bytes.Buffer)
-	w := csv.NewWriter(buf)
+// WriteCSV - encodes to CSV data.
+func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune) error {
+	w := csv.NewWriter(writer)
 	w.Comma = fieldDelimiter
 	if err := w.Write(r.csvRecord); err != nil {
-		return nil, err
+		return err
 	}
 	w.Flush()
 	if err := w.Error(); err != nil {
-		return nil, err
+		return err
 	}

-	data := buf.Bytes()
-	return data[:len(data)-1], nil
+	return nil
 }

-// MarshalJSON - encodes to JSON data.
-func (r *Record) MarshalJSON() ([]byte, error) {
+// WriteJSON - encodes to JSON data.
+func (r *Record) WriteJSON(writer io.Writer) error {
 	var kvs jstream.KVS = make([]jstream.KV, len(r.columnNames))
 	for i := 0; i < len(r.columnNames); i++ {
 		kvs[i] = jstream.KV{Key: r.columnNames[i], Value: r.csvRecord[i]}
 	}

-	return json.Marshal(kvs)
+	return json.NewEncoder(writer).Encode(kvs)
 }

 // Raw - returns the underlying data with format info.
```
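One detail worth noting about the new WriteJSON: `json.NewEncoder(writer).Encode(v)` streams straight into the destination without the intermediate `[]byte` that `json.Marshal` returns, and it is documented to follow the encoded value with a newline character. The callers in this patch rely on that terminator: they truncate it before appending their own record delimiter. A quick demonstration:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

func main() {
	var buf bytes.Buffer
	// Encoder.Encode writes directly into buf and appends a trailing
	// newline, which json.Marshal does not.
	if err := json.NewEncoder(&buf).Encode(map[string]string{"_1": "a"}); err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", buf.String()) // "{\"_1\":\"a\"}\n"
}
```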

pkg/s3select/json/record.go

```diff
@@ -17,11 +17,11 @@
 package json

 import (
-	"bytes"
 	"encoding/csv"
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"strings"

 	"github.com/bcicen/jstream"
@@ -77,8 +77,8 @@ func (r *Record) Set(name string, value *sql.Value) error {
 	return nil
 }

-// MarshalCSV - encodes to CSV data.
-func (r *Record) MarshalCSV(fieldDelimiter rune) ([]byte, error) {
+// WriteCSV - encodes to CSV data.
+func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune) error {
 	var csvRecord []string
 	for _, kv := range r.KVS {
 		var columnValue string
@@ -90,24 +90,22 @@ func (r *Record) MarshalCSV(fieldDelimiter rune) ([]byte, error) {
 		case RawJSON:
 			columnValue = string([]byte(val))
 		default:
-			return nil, errors.New("Cannot marshal unhandled type")
+			return errors.New("Cannot marshal unhandled type")
 		}
 		csvRecord = append(csvRecord, columnValue)
 	}

-	buf := new(bytes.Buffer)
-	w := csv.NewWriter(buf)
+	w := csv.NewWriter(writer)
 	w.Comma = fieldDelimiter
 	if err := w.Write(csvRecord); err != nil {
-		return nil, err
+		return err
 	}
 	w.Flush()
 	if err := w.Error(); err != nil {
-		return nil, err
+		return err
 	}

-	data := buf.Bytes()
-	return data[:len(data)-1], nil
+	return nil
 }

 // Raw - returns the underlying representation.
@@ -115,9 +113,9 @@ func (r *Record) Raw() (sql.SelectObjectFormat, interface{}) {
 	return r.SelectFormat, r.KVS
 }

-// MarshalJSON - encodes to JSON data.
-func (r *Record) MarshalJSON() ([]byte, error) {
-	return json.Marshal(r.KVS)
+// WriteJSON - encodes to JSON data.
+func (r *Record) WriteJSON(writer io.Writer) error {
+	return json.NewEncoder(writer).Encode(r.KVS)
}

 // Replace the underlying buffer of json data.
```
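The same contract holds on the CSV side: `encoding/csv.Writer` terminates every record with `\n` (or `\r\n` when `UseCRLF` is set). That terminator is exactly what the old code's `data[:len(data)-1]` and the new callers' `Truncate(buf.Len()-1)` strip off before the configured record delimiter is appended. A short, runnable demonstration:

```go
package main

import (
	"bytes"
	"encoding/csv"
	"fmt"
)

func main() {
	var buf bytes.Buffer
	w := csv.NewWriter(&buf) // csv.Writer encodes directly into any io.Writer
	w.Comma = ';'
	if err := w.Write([]string{"a", "b"}); err != nil {
		panic(err)
	}
	w.Flush() // the encoded record reaches buf only after Flush
	// The trailing '\n' is added by csv.Writer itself.
	fmt.Printf("%q\n", buf.String()) // "a;b\n"
}
```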

pkg/s3select/message.go

```diff
@@ -242,7 +242,7 @@ type messageWriter struct {
 	payloadBuffer      []byte
 	payloadBufferIndex int
-	payloadCh          chan []byte
+	payloadCh          chan *bytes.Buffer

 	finBytesScanned, finBytesProcessed int64
@@ -308,10 +308,10 @@ func (writer *messageWriter) start() {
 					}
 					writer.write(endMessage)
 				} else {
-					for len(payload) > 0 {
-						copiedLen := copy(writer.payloadBuffer[writer.payloadBufferIndex:], payload)
+					for payload.Len() > 0 {
+						copiedLen := copy(writer.payloadBuffer[writer.payloadBufferIndex:], payload.Bytes())
 						writer.payloadBufferIndex += copiedLen
-						payload = payload[copiedLen:]
+						payload.Next(copiedLen)

 						// If buffer is filled, flush it now!
 						freeSpace := bufLength - writer.payloadBufferIndex
@@ -322,6 +322,8 @@ func (writer *messageWriter) start() {
 						}
 					}
 				}
+
+				bufPool.Put(payload)
 			}

 		case <-recordStagingTicker.C:
@@ -349,10 +351,16 @@ func (writer *messageWriter) start() {
 	if progressTicker != nil {
 		progressTicker.Stop()
 	}
+
+	// Drain payloadCh and return any pending buffers to the pool so they
+	// are not leaked.
+	for len(writer.payloadCh) > 0 {
+		payload := <-writer.payloadCh
+		bufPool.Put(payload)
+	}
 }

 // Sends a single whole record.
-func (writer *messageWriter) SendRecord(payload []byte) error {
+func (writer *messageWriter) SendRecord(payload *bytes.Buffer) error {
 	select {
 	case writer.payloadCh <- payload:
 		return nil
@@ -409,7 +417,7 @@ func newMessageWriter(w http.ResponseWriter, getProgressFunc func() (bytesScanne
 		getProgressFunc: getProgressFunc,

 		payloadBuffer: make([]byte, bufLength),
-		payloadCh:     make(chan []byte),
+		payloadCh:     make(chan *bytes.Buffer),
 		errCh:         make(chan []byte),
 		doneCh:        make(chan struct{}),
```
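The message writer now receives pooled `*bytes.Buffer` payloads instead of `[]byte` slices: the sender obtains a buffer from `bufPool`, ownership travels through `payloadCh`, and the consuming goroutine returns the buffer to the pool once the payload has been copied out (or during shutdown drain). A minimal sketch of that handoff discipline, with illustrative names rather than MinIO's:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// pool mirrors the patch's bufPool: buffers are recycled across records.
var pool = sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}

func main() {
	ch := make(chan *bytes.Buffer)
	done := make(chan struct{})

	// Consumer: copies the payload out, then recycles the buffer.
	go func() {
		for buf := range ch {
			fmt.Print(buf.String())
			pool.Put(buf)
		}
		close(done)
	}()

	// Producer: one pooled buffer per record; ownership moves with the send.
	for i := 0; i < 3; i++ {
		buf := pool.Get().(*bytes.Buffer)
		buf.Reset() // pooled buffers may hold stale data from a prior use
		fmt.Fprintf(buf, "record %d\n", i)
		ch <- buf
	}
	close(ch)
	<-done
}
```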

pkg/s3select/select.go

```diff
@@ -17,11 +17,15 @@
 package s3select

 import (
+	"bufio"
+	"bytes"
 	"encoding/xml"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net/http"
 	"strings"
+	"sync"

 	"github.com/minio/minio/pkg/s3select/csv"
 	"github.com/minio/minio/pkg/s3select/json"
@@ -53,6 +57,20 @@ const (
 	maxRecordSize = 1 << 20 // 1 MiB
 )

+var bufPool = sync.Pool{
+	New: func() interface{} {
+		return new(bytes.Buffer)
+	},
+}
+
+var bufioWriterPool = sync.Pool{
+	New: func() interface{} {
+		// ioutil.Discard is just used to create the writer. Actual destination
+		// writer is set later by Reset() before using it.
+		return bufio.NewWriter(ioutil.Discard)
+	},
+}
+
 // UnmarshalXML - decodes XML data.
 func (c *CompressionType) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
 	var s string
```
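Two details of these pools are worth spelling out. First, `bufio.NewWriter(ioutil.Discard)` exists only to construct a valid writer; `Reset` rebinds it to the real destination before use. Second, `bufio.NewWriter` returns its argument unchanged when that argument is already a `*bufio.Writer` with a large enough buffer, so handing the pooled writer to `csv.NewWriter` (which wraps its argument in a `bufio.Writer` internally) means the CSV writer adopts the pooled writer instead of allocating another buffer. That, presumably, is what the commit's comment "prevent csv.Writer from allocating a new buffer" refers to. A small sketch:

```go
package main

import (
	"bufio"
	"fmt"
	"io/ioutil"
	"os"
)

func main() {
	// The pool's New creates the writer against ioutil.Discard only so that
	// a valid *bufio.Writer exists; Reset rebinds it to the real destination.
	w := bufio.NewWriter(ioutil.Discard)
	w.Reset(os.Stdout)
	fmt.Fprintln(w, "hello")
	w.Flush()

	// bufio.NewWriter returns its argument unchanged when it is already a
	// *bufio.Writer with a large enough buffer, so wrapping w again (as
	// csv.NewWriter does internally) reuses it rather than allocating.
	inner := bufio.NewWriter(w)
	fmt.Println(inner == w) // true
}
```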
```diff
@@ -306,22 +324,36 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos
 	panic(fmt.Errorf("unknown input format '%v'", s3Select.Input.format))
 }

-func (s3Select *S3Select) marshal(record sql.Record) ([]byte, error) {
+func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error {
 	switch s3Select.Output.format {
 	case csvFormat:
-		data, err := record.MarshalCSV([]rune(s3Select.Output.CSVArgs.FieldDelimiter)[0])
+		// Use bufio Writer to prevent csv.Writer from allocating a new buffer.
+		bufioWriter := bufioWriterPool.Get().(*bufio.Writer)
+		defer func() {
+			bufioWriter.Reset(ioutil.Discard)
+			bufioWriterPool.Put(bufioWriter)
+		}()
+
+		bufioWriter.Reset(buf)
+		err := record.WriteCSV(bufioWriter, []rune(s3Select.Output.CSVArgs.FieldDelimiter)[0])
 		if err != nil {
-			return nil, err
+			return err
 		}
-		return append(data, []byte(s3Select.Output.CSVArgs.RecordDelimiter)...), nil
+		buf.Truncate(buf.Len() - 1)
+		buf.WriteString(s3Select.Output.CSVArgs.RecordDelimiter)
+
+		return nil
 	case jsonFormat:
-		data, err := record.MarshalJSON()
+		err := record.WriteJSON(buf)
 		if err != nil {
-			return nil, err
+			return err
 		}
-		return append(data, []byte(s3Select.Output.JSONArgs.RecordDelimiter)...), nil
+		buf.Truncate(buf.Len() - 1)
+		buf.WriteString(s3Select.Output.JSONArgs.RecordDelimiter)
+
+		return nil
 	}

 	panic(fmt.Errorf("unknown output format '%v'", s3Select.Output.format))
```
```diff
@@ -338,24 +370,29 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
 	var inputRecord sql.Record
 	var outputRecord sql.Record
 	var err error
-	var data []byte

 	sendRecord := func() bool {
 		if outputRecord == nil {
 			return true
 		}

-		if data, err = s3Select.marshal(outputRecord); err != nil {
+		buf := bufPool.Get().(*bytes.Buffer)
+		buf.Reset()
+
+		if err = s3Select.marshal(buf, outputRecord); err != nil {
+			bufPool.Put(buf)
 			return false
 		}

-		if len(data) > maxRecordSize {
+		if buf.Len() > maxRecordSize {
 			writer.FinishWithError("OverMaxRecordSize", "The length of a record in the input or result is greater than maxCharsPerRecord of 1 MB.")
+			bufPool.Put(buf)
 			return false
 		}

-		if err = writer.SendRecord(data); err != nil {
+		if err = writer.SendRecord(buf); err != nil {
 			// FIXME: log this error.
 			err = nil
+			bufPool.Put(buf)
 			return false
 		}
```

pkg/s3select/select_benchmark_test.go

```diff
@@ -99,26 +99,28 @@ func benchmarkSelect(b *testing.B, count int, query string) {
 </SelectObjectContentRequest>
 	`)

-	s3Select, err := NewS3Select(bytes.NewReader(requestXML))
-	if err != nil {
-		b.Fatal(err)
-	}
-
 	csvData := genSampleCSVData(count)

 	b.ResetTimer()
 	b.ReportAllocs()
-	for i := 0; i < b.N; i++ {
-		if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) {
-			return ioutil.NopCloser(bytes.NewReader(csvData)), nil
-		}); err != nil {
-			b.Fatal(err)
-		}
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			s3Select, err := NewS3Select(bytes.NewReader(requestXML))
+			if err != nil {
+				b.Fatal(err)
+			}

-		s3Select.Evaluate(&nullResponseWriter{})
-		s3Select.Close()
-	}
+			if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) {
+				return ioutil.NopCloser(bytes.NewReader(csvData)), nil
+			}); err != nil {
+				b.Fatal(err)
+			}
+
+			s3Select.Evaluate(&nullResponseWriter{})
+			s3Select.Close()
+		}
+	})
 }

 func benchmarkSelectAll(b *testing.B, count int) {
```
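Switching the benchmark to `b.RunParallel` exercises the new pools under contention, which is the workload `sync.Pool` is designed for: the body runs from GOMAXPROCS goroutines, each pulling iterations from a shared counter via `pb.Next`. Note that `NewS3Select` moves inside the loop, presumably because an `S3Select` instance carries per-evaluation state and is not meant to be shared across goroutines. A skeleton of the pattern:

```go
package bench

import "testing"

func BenchmarkExample(b *testing.B) {
	b.ReportAllocs()
	// RunParallel runs the function body from multiple goroutines; pb.Next
	// hands out iterations until b.N of them have been consumed in total.
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			work() // create per-iteration state here, not outside the loop
		}
	})
}

func work() {}
```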

pkg/s3select/sql/record.go

```diff
@@ -16,7 +16,11 @@
 package sql

-import "github.com/bcicen/jstream"
+import (
+	"io"
+
+	"github.com/bcicen/jstream"
+)

 // SelectObjectFormat specifies the format of the underlying data
 type SelectObjectFormat int
@@ -36,8 +40,8 @@ const (
 type Record interface {
 	Get(name string) (*Value, error)
 	Set(name string, value *Value) error
-	MarshalCSV(fieldDelimiter rune) ([]byte, error)
-	MarshalJSON() ([]byte, error)
+	WriteCSV(writer io.Writer, fieldDelimiter rune) error
+	WriteJSON(writer io.Writer) error

 	// Returns underlying representation
 	Raw() (SelectObjectFormat, interface{})
```
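This interface change is the root of the whole patch: a `Marshal*` method that returns `[]byte` must allocate a fresh slice for every record, while a `Write*` method that accepts an `io.Writer` lets the caller decide where the bytes land, here a pooled, reused `bytes.Buffer`. A minimal sketch contrasting the two styles (illustrative types, not MinIO's):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// Old style: every call allocates and returns a new byte slice.
type marshaler interface {
	MarshalCSV(fieldDelimiter rune) ([]byte, error)
}

// New style: the caller supplies the destination, so buffers can be
// pooled and reused across records.
type csvWriter interface {
	WriteCSV(w io.Writer, fieldDelimiter rune) error
}

type row []string

// WriteCSV implements the writer-based style for a toy row type.
func (r row) WriteCSV(w io.Writer, d rune) error {
	for i, f := range r {
		if i > 0 {
			if _, err := io.WriteString(w, string(d)); err != nil {
				return err
			}
		}
		if _, err := io.WriteString(w, f); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var buf bytes.Buffer // caller-owned; reusable via Reset between records
	var r csvWriter = row{"a", "b"}
	if err := r.WriteCSV(&buf, ','); err != nil {
		panic(err)
	}
	fmt.Println(buf.String()) // a,b
}
```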
