diff --git a/pkg/s3select/json/preader.go b/pkg/s3select/json/preader.go new file mode 100644 index 000000000..bb7798004 --- /dev/null +++ b/pkg/s3select/json/preader.go @@ -0,0 +1,227 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package json + +import ( + "bufio" + "bytes" + "io" + "runtime" + "sync" + + "github.com/bcicen/jstream" + "github.com/minio/minio/pkg/s3select/sql" +) + +// PReader - JSON record reader for S3Select. +// Operates concurrently on line-delimited JSON. +type PReader struct { + args *ReaderArgs + readCloser io.ReadCloser // raw input + buf *bufio.Reader // input to the splitter + current []jstream.KVS // current block of results to be returned + recordsRead int // number of records read in current slice + input chan *queueItem // input for workers + queue chan *queueItem // output from workers in order + err error // global error state, only touched by Reader.Read + bufferPool sync.Pool // pool of []byte objects for input + kvDstPool sync.Pool // pool of []jstream.KV used for output + close chan struct{} // used for shutting down the splitter before end of stream + readerWg sync.WaitGroup // used to keep track of async reader. +} + +// queueItem is an item in the queue. +type queueItem struct { + input []byte // raw input sent to the worker + dst chan []jstream.KVS // result of block decode + err error // any error encountered will be set here +} + +// Read - reads single record. +// Once Read is called the previous record should no longer be referenced. +func (r *PReader) Read(dst sql.Record) (sql.Record, error) { + // If we have have any records left, return these before any error. + for len(r.current) <= r.recordsRead { + if r.err != nil { + return nil, r.err + } + // Move to next block + item, ok := <-r.queue + if !ok { + r.err = io.EOF + return nil, r.err + } + //lint:ignore SA6002 Using pointer would allocate more since we would have to copy slice header before taking a pointer. + r.kvDstPool.Put(r.current) + r.current = <-item.dst + r.err = item.err + r.recordsRead = 0 + } + kvRecord := r.current[r.recordsRead] + r.recordsRead++ + + dstRec, ok := dst.(*Record) + if !ok { + dstRec = &Record{} + } + dstRec.KVS = kvRecord + dstRec.SelectFormat = sql.SelectFmtJSON + return dstRec, nil +} + +// Close - closes underlying reader. +func (r *PReader) Close() error { + if r.close != nil { + close(r.close) + r.readerWg.Wait() + r.close = nil + } + r.recordsRead = len(r.current) + if r.err == nil { + r.err = io.EOF + } + return r.readCloser.Close() +} + +// nextSplit will attempt to skip a number of bytes and +// return the buffer until the next newline occurs. +// The last block will be sent along with an io.EOF. +func (r *PReader) nextSplit(skip int, dst []byte) ([]byte, error) { + if cap(dst) < skip { + dst = make([]byte, 0, skip+1024) + } + dst = dst[:skip] + if skip > 0 { + n, err := io.ReadFull(r.buf, dst) + if err != nil && err != io.ErrUnexpectedEOF { + // If an EOF happens after reading some but not all the bytes, + // ReadFull returns ErrUnexpectedEOF. + return dst[:n], err + } + dst = dst[:n] + if err == io.ErrUnexpectedEOF { + return dst, io.EOF + } + } + // Read until next line. + in, err := r.buf.ReadBytes('\n') + dst = append(dst, in...) + return dst, err +} + +// jsonSplitSize is the size of each block. +// Blocks will read this much and find the first following newline. +// 128KB appears to be a very reasonable default. +const jsonSplitSize = 128 << 10 + +// startReaders will read the header if needed and spin up a parser +// and a number of workers based on GOMAXPROCS. +// If an error is returned no goroutines have been started and r.err will have been set. +func (r *PReader) startReaders() { + r.bufferPool.New = func() interface{} { + return make([]byte, jsonSplitSize+1024) + } + + // Create queue + r.queue = make(chan *queueItem, runtime.GOMAXPROCS(0)) + r.input = make(chan *queueItem, runtime.GOMAXPROCS(0)) + r.readerWg.Add(1) + + // Start splitter + go func() { + defer close(r.input) + defer close(r.queue) + defer r.readerWg.Done() + for { + next, err := r.nextSplit(jsonSplitSize, r.bufferPool.Get().([]byte)) + q := queueItem{ + input: next, + dst: make(chan []jstream.KVS, 1), + err: err, + } + select { + case <-r.close: + return + case r.queue <- &q: + } + + select { + case <-r.close: + return + case r.input <- &q: + } + if err != nil { + // Exit on any error. + return + } + } + }() + + // Start parsers + for i := 0; i < runtime.GOMAXPROCS(0); i++ { + go func() { + for in := range r.input { + if len(in.input) == 0 { + in.dst <- nil + continue + } + dst, ok := r.kvDstPool.Get().([]jstream.KVS) + if !ok { + dst = make([]jstream.KVS, 0, 1000) + } + + d := jstream.NewDecoder(bytes.NewBuffer(in.input), 0).ObjectAsKVS() + stream := d.Stream() + all := dst[:0] + for mv := range stream { + var kvs jstream.KVS + if mv.ValueType == jstream.Object { + // This is a JSON object type (that preserves key + // order) + kvs = mv.Value.(jstream.KVS) + } else { + // To be AWS S3 compatible Select for JSON needs to + // output non-object JSON as single column value + // i.e. a map with `_1` as key and value as the + // non-object. + kvs = jstream.KVS{jstream.KV{Key: "_1", Value: mv.Value}} + } + all = append(all, kvs) + } + // We don't need the input any more. + //lint:ignore SA6002 Using pointer would allocate more since we would have to copy slice header before taking a pointer. + r.bufferPool.Put(in.input) + in.input = nil + in.err = d.Err() + in.dst <- all + } + }() + } +} + +// NewPReader - creates new parallel JSON reader using readCloser. +// Should only be used for LINES types. +func NewPReader(readCloser io.ReadCloser, args *ReaderArgs) *PReader { + r := &PReader{ + args: args, + buf: bufio.NewReaderSize(readCloser, jsonSplitSize*2), + readCloser: readCloser, + close: make(chan struct{}), + } + r.startReaders() + return r +} diff --git a/pkg/s3select/json/preader_test.go b/pkg/s3select/json/preader_test.go new file mode 100644 index 000000000..7d345e6ea --- /dev/null +++ b/pkg/s3select/json/preader_test.go @@ -0,0 +1,106 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package json + +import ( + "bytes" + "io" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/minio/minio/pkg/s3select/sql" +) + +func TestNewPReader(t *testing.T) { + files, err := ioutil.ReadDir("testdata") + if err != nil { + t.Fatal(err) + } + for _, file := range files { + t.Run(file.Name(), func(t *testing.T) { + f, err := os.Open(filepath.Join("testdata", file.Name())) + if err != nil { + t.Fatal(err) + } + r := NewPReader(f, &ReaderArgs{}) + var record sql.Record + for { + record, err = r.Read(record) + if err != nil { + break + } + } + r.Close() + if err != io.EOF { + t.Fatalf("Reading failed with %s, %s", err, file.Name()) + } + }) + + t.Run(file.Name()+"-close", func(t *testing.T) { + f, err := os.Open(filepath.Join("testdata", file.Name())) + if err != nil { + t.Fatal(err) + } + r := NewPReader(f, &ReaderArgs{}) + r.Close() + var record sql.Record + for { + record, err = r.Read(record) + if err != nil { + break + } + } + if err != io.EOF { + t.Fatalf("Reading failed with %s, %s", err, file.Name()) + } + }) + } +} + +func BenchmarkPReader(b *testing.B) { + files, err := ioutil.ReadDir("testdata") + if err != nil { + b.Fatal(err) + } + for _, file := range files { + b.Run(file.Name(), func(b *testing.B) { + f, err := ioutil.ReadFile(filepath.Join("testdata", file.Name())) + if err != nil { + b.Fatal(err) + } + b.SetBytes(int64(len(f))) + b.ReportAllocs() + b.ResetTimer() + var record sql.Record + for i := 0; i < b.N; i++ { + r := NewPReader(ioutil.NopCloser(bytes.NewBuffer(f)), &ReaderArgs{}) + for { + record, err = r.Read(record) + if err != nil { + break + } + } + r.Close() + if err != io.EOF { + b.Fatalf("Reading failed with %s, %s", err, file.Name()) + } + } + }) + } +} diff --git a/pkg/s3select/select.go b/pkg/s3select/select.go index b9a28757e..5ace10707 100644 --- a/pkg/s3select/select.go +++ b/pkg/s3select/select.go @@ -316,7 +316,11 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos return err } - s3Select.recordReader = json.NewReader(s3Select.progressReader, &s3Select.Input.JSONArgs) + if strings.EqualFold(s3Select.Input.JSONArgs.ContentType, "lines") { + s3Select.recordReader = json.NewPReader(s3Select.progressReader, &s3Select.Input.JSONArgs) + } else { + s3Select.recordReader = json.NewReader(s3Select.progressReader, &s3Select.Input.JSONArgs) + } return nil case parquetFormat: var err error