|
|
@ -23,6 +23,7 @@ import ( |
|
|
|
"io" |
|
|
|
"io" |
|
|
|
"runtime" |
|
|
|
"runtime" |
|
|
|
"sync" |
|
|
|
"sync" |
|
|
|
|
|
|
|
"unicode/utf8" |
|
|
|
|
|
|
|
|
|
|
|
csv "github.com/minio/minio/pkg/csvparser" |
|
|
|
csv "github.com/minio/minio/pkg/csvparser" |
|
|
|
"github.com/minio/minio/pkg/s3select/sql" |
|
|
|
"github.com/minio/minio/pkg/s3select/sql" |
|
|
@ -159,6 +160,9 @@ func (r *Reader) startReaders(newReader func(io.Reader) *csv.Reader) error { |
|
|
|
r.err = err |
|
|
|
r.err = err |
|
|
|
return err |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if !utf8.Valid(b) { |
|
|
|
|
|
|
|
return errInvalidTextEncodingError() |
|
|
|
|
|
|
|
} |
|
|
|
reader := newReader(bytes.NewReader(b)) |
|
|
|
reader := newReader(bytes.NewReader(b)) |
|
|
|
record, err := reader.Read() |
|
|
|
record, err := reader.Read() |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
@ -181,6 +185,13 @@ func (r *Reader) startReaders(newReader func(io.Reader) *csv.Reader) error { |
|
|
|
return make([]byte, csvSplitSize+1024) |
|
|
|
return make([]byte, csvSplitSize+1024) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Return first block
|
|
|
|
|
|
|
|
next, nextErr := r.nextSplit(csvSplitSize, r.bufferPool.Get().([]byte)) |
|
|
|
|
|
|
|
// Check if first block is valid.
|
|
|
|
|
|
|
|
if !utf8.Valid(next) { |
|
|
|
|
|
|
|
return errInvalidTextEncodingError() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Create queue
|
|
|
|
// Create queue
|
|
|
|
r.queue = make(chan *queueItem, runtime.GOMAXPROCS(0)) |
|
|
|
r.queue = make(chan *queueItem, runtime.GOMAXPROCS(0)) |
|
|
|
r.input = make(chan *queueItem, runtime.GOMAXPROCS(0)) |
|
|
|
r.input = make(chan *queueItem, runtime.GOMAXPROCS(0)) |
|
|
@ -192,11 +203,10 @@ func (r *Reader) startReaders(newReader func(io.Reader) *csv.Reader) error { |
|
|
|
defer close(r.queue) |
|
|
|
defer close(r.queue) |
|
|
|
defer r.readerWg.Done() |
|
|
|
defer r.readerWg.Done() |
|
|
|
for { |
|
|
|
for { |
|
|
|
next, err := r.nextSplit(csvSplitSize, r.bufferPool.Get().([]byte)) |
|
|
|
|
|
|
|
q := queueItem{ |
|
|
|
q := queueItem{ |
|
|
|
input: next, |
|
|
|
input: next, |
|
|
|
dst: make(chan [][]string, 1), |
|
|
|
dst: make(chan [][]string, 1), |
|
|
|
err: err, |
|
|
|
err: nextErr, |
|
|
|
} |
|
|
|
} |
|
|
|
select { |
|
|
|
select { |
|
|
|
case <-r.close: |
|
|
|
case <-r.close: |
|
|
@ -209,10 +219,11 @@ func (r *Reader) startReaders(newReader func(io.Reader) *csv.Reader) error { |
|
|
|
return |
|
|
|
return |
|
|
|
case r.input <- &q: |
|
|
|
case r.input <- &q: |
|
|
|
} |
|
|
|
} |
|
|
|
if err != nil { |
|
|
|
if nextErr != nil { |
|
|
|
// Exit on any error.
|
|
|
|
// Exit on any error.
|
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
next, nextErr = r.nextSplit(csvSplitSize, r.bufferPool.Get().([]byte)) |
|
|
|
} |
|
|
|
} |
|
|
|
}() |
|
|
|
}() |
|
|
|
|
|
|
|
|
|
|
|