diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go
index 6cb708453..bfadf289f 100644
--- a/cmd/object-handlers.go
+++ b/cmd/object-handlers.go
@@ -198,7 +198,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
w.WriteHeader(serr.HTTPStatusCode())
w.Write(s3select.NewErrorMessage(serr.ErrorCode(), serr.ErrorMessage()))
} else {
- writeErrorResponse(w, ErrInternalError, r.URL, guessIsBrowserReq(r))
+ writeErrorResponse(w, toAPIErrorCode(ctx, err), r.URL, guessIsBrowserReq(r))
}
return
}
diff --git a/pkg/s3select/json/data/10.json b/pkg/s3select/json/data/10.json
new file mode 100644
index 000000000..57bf5e805
--- /dev/null
+++ b/pkg/s3select/json/data/10.json
@@ -0,0 +1,12 @@
+[
+ {
+ "key_1": "value",
+ "key_2": "value"
+ }
+]
+[
+ {
+ "key_1": "value2",
+ "key_2": "value3"
+ }
+]
diff --git a/pkg/s3select/json/data/11.json b/pkg/s3select/json/data/11.json
new file mode 100644
index 000000000..e63d62d87
--- /dev/null
+++ b/pkg/s3select/json/data/11.json
@@ -0,0 +1,8 @@
+"a"
+1
+3.145
+["a"]
+{}
+{
+ "a": 1
+}
diff --git a/pkg/s3select/json/data/12.json b/pkg/s3select/json/data/12.json
new file mode 100644
index 000000000..8c175ec23
--- /dev/null
+++ b/pkg/s3select/json/data/12.json
@@ -0,0 +1,5 @@
+{
+ "a": 1
+}{
+ "b": 2
+}
diff --git a/pkg/s3select/json/data/2.json b/pkg/s3select/json/data/2.json
new file mode 100644
index 000000000..45fed0992
--- /dev/null
+++ b/pkg/s3select/json/data/2.json
@@ -0,0 +1 @@
+{"text": "hello world\\n2nd line"}
diff --git a/pkg/s3select/json/data/3.json b/pkg/s3select/json/data/3.json
new file mode 100644
index 000000000..949390e27
--- /dev/null
+++ b/pkg/s3select/json/data/3.json
@@ -0,0 +1 @@
+{"hello":"wor{l}d"}
diff --git a/pkg/s3select/json/data/4.json b/pkg/s3select/json/data/4.json
new file mode 100644
index 000000000..ef2b65474
--- /dev/null
+++ b/pkg/s3select/json/data/4.json
@@ -0,0 +1,26 @@
+{
+ "id": "0001",
+ "type": "donut",
+ "name": "Cake",
+ "ppu": 0.55,
+ "batters":
+ {
+ "batter":
+ [
+ { "id": "1001", "type": "Regular" },
+ { "id": "1002", "type": "Chocolate" },
+ { "id": "1003", "type": "Blueberry" },
+ { "id": "1004", "type": "Devil's Food" }
+ ]
+ },
+ "topping":
+ [
+ { "id": "5001", "type": "None" },
+ { "id": "5002", "type": "Glazed" },
+ { "id": "5005", "type": "Sugar" },
+ { "id": "5007", "type": "Powdered Sugar" },
+ { "id": "5006", "type": "Chocolate with Sprinkles" },
+ { "id": "5003", "type": "Chocolate" },
+ { "id": "5004", "type": "Maple" }
+ ]
+}
diff --git a/pkg/s3select/json/data/5.json b/pkg/s3select/json/data/5.json
new file mode 100644
index 000000000..ef69872d7
--- /dev/null
+++ b/pkg/s3select/json/data/5.json
@@ -0,0 +1,5 @@
+{
+ "foo": {
+ "bar": "baz"
+ }
+}
diff --git a/pkg/s3select/json/data/6.json b/pkg/s3select/json/data/6.json
new file mode 100644
index 000000000..31e3e4330
--- /dev/null
+++ b/pkg/s3select/json/data/6.json
@@ -0,0 +1 @@
+{ "name": "John", "age":28, "hobby": { "name": "chess", "type": "boardgame" }}
diff --git a/pkg/s3select/json/data/7.json b/pkg/s3select/json/data/7.json
new file mode 100644
index 000000000..60e603e38
--- /dev/null
+++ b/pkg/s3select/json/data/7.json
@@ -0,0 +1,3 @@
+{"name":"Michael", "age": 31}
+{"name":"Andy", "age": 30}
+{"name":"Justin", "age": 19}
diff --git a/pkg/s3select/json/data/8.json b/pkg/s3select/json/data/8.json
new file mode 100644
index 000000000..cc9350e5f
--- /dev/null
+++ b/pkg/s3select/json/data/8.json
@@ -0,0 +1,2 @@
+{"a":"}"
+}
diff --git a/pkg/s3select/json/data/9.json b/pkg/s3select/json/data/9.json
new file mode 100644
index 000000000..f9ab70461
--- /dev/null
+++ b/pkg/s3select/json/data/9.json
@@ -0,0 +1,6 @@
+[
+ {
+ "key_1": "value",
+ "key_2": "value"
+ }
+]
diff --git a/pkg/s3select/json/reader.go b/pkg/s3select/json/reader.go
index fb4e15ba2..06971e2ab 100644
--- a/pkg/s3select/json/reader.go
+++ b/pkg/s3select/json/reader.go
@@ -17,186 +17,48 @@
package json
import (
- "bytes"
+ "encoding/json"
"io"
- "io/ioutil"
- "strconv"
"github.com/minio/minio/pkg/s3select/sql"
- "github.com/tidwall/gjson"
+
+ "github.com/bcicen/jstream"
"github.com/tidwall/sjson"
)
-func toSingleLineJSON(input string, currentKey string, result gjson.Result) (output string, err error) {
- switch {
- case result.IsObject():
- result.ForEach(func(key, value gjson.Result) bool {
- jsonKey := key.String()
- if currentKey != "" {
- jsonKey = currentKey + "." + key.String()
- }
- output, err = toSingleLineJSON(input, jsonKey, value)
- input = output
- return err == nil
- })
- case result.IsArray():
- i := 0
- result.ForEach(func(key, value gjson.Result) bool {
- if currentKey == "" {
- panic("currentKey is empty")
- }
-
- indexKey := currentKey + "." + strconv.Itoa(i)
- output, err = toSingleLineJSON(input, indexKey, value)
- input = output
- i++
- return err == nil
- })
- default:
- output, err = sjson.Set(input, currentKey, result.Value())
- }
-
- return output, err
-}
-
-type objectReader struct {
- reader io.Reader
- err error
-
- p []byte
- start int
- end int
-
- escaped bool
- quoteOpened bool
- curlyCount uint64
- endOfObject bool
-}
-
-func (or *objectReader) objectEndIndex(p []byte, length int) int {
- for i := 0; i < length; i++ {
- if p[i] == '\\' {
- or.escaped = !or.escaped
- continue
- }
-
- if p[i] == '"' && !or.escaped {
- or.quoteOpened = !or.quoteOpened
- }
-
- or.escaped = false
-
- switch p[i] {
- case '{':
- if !or.quoteOpened {
- or.curlyCount++
- }
- case '}':
- if or.quoteOpened || or.curlyCount == 0 {
- break
- }
-
- if or.curlyCount--; or.curlyCount == 0 {
- return i + 1
- }
- }
- }
-
- return -1
-}
-
-func (or *objectReader) Read(p []byte) (n int, err error) {
- if or.endOfObject {
- return 0, io.EOF
- }
-
- if or.p != nil {
- n = copy(p, or.p[or.start:or.end])
- or.start += n
- if or.start == or.end {
- // made full copy.
- or.p = nil
- or.start = 0
- or.end = 0
- }
- } else {
- if or.err != nil {
- return 0, or.err
- }
-
- n, err = or.reader.Read(p)
- or.err = err
- switch err {
- case nil:
- case io.EOF, io.ErrUnexpectedEOF, io.ErrClosedPipe:
- or.err = io.EOF
- default:
- return 0, err
- }
- }
-
- index := or.objectEndIndex(p, n)
- if index == -1 || index == n {
- return n, nil
- }
-
- or.endOfObject = true
- if or.p == nil {
- or.p = p
- or.start = index
- or.end = n
- } else {
- or.start -= index
- }
-
- return index, nil
-}
-
-func (or *objectReader) Reset() error {
- or.endOfObject = false
-
- if or.p != nil {
- return nil
- }
-
- return or.err
-}
-
// Reader - JSON record reader for S3Select.
type Reader struct {
-	args         *ReaderArgs
-	objectReader *objectReader
-	readCloser   io.ReadCloser
+	args       *ReaderArgs
+	decoder    *jstream.Decoder        // streaming decoder built over readCloser by NewReader; queried for its error once valueCh is closed
+	valueCh    chan *jstream.MetaValue // channel returned by decoder.Stream(); Read receives one value from it per call
+	readCloser io.ReadCloser
}
// Read - reads single record.
func (r *Reader) Read() (sql.Record, error) {
- if err := r.objectReader.Reset(); err != nil {
- return nil, err
+ v, ok := <-r.valueCh
+ if !ok {
+ if err := r.decoder.Err(); err != nil {
+ return nil, errJSONParsingError(err)
+ }
+ return nil, io.EOF
}
- data, err := ioutil.ReadAll(r.objectReader)
- if err != nil {
- return nil, errJSONParsingError(err)
- }
+ var data []byte
+ var err error
- data = bytes.TrimSpace(data)
- if len(data) == 0 {
- return nil, io.EOF
+ if v.ValueType == jstream.Object {
+ data, err = json.Marshal(v.Value)
+ } else {
+ // To be AWS S3 compatible
+ // Select for JSON needs to output non-object JSON as single column value
+ // i.e. a map with `_1` as key and value as the non-object.
+ data, err = sjson.SetBytes(data, "_1", v.Value)
}
-
- if !gjson.ValidBytes(data) {
+ if err != nil {
return nil, errJSONParsingError(err)
}
- if bytes.Count(data, []byte("\n")) > 0 {
- var s string
- if s, err = toSingleLineJSON("", "", gjson.ParseBytes(data)); err != nil {
- return nil, errJSONParsingError(err)
- }
- data = []byte(s)
- }
-
return &Record{
data: data,
}, nil
@@ -209,9 +71,11 @@ func (r *Reader) Close() error {
// NewReader - creates new JSON reader using readCloser.
func NewReader(readCloser io.ReadCloser, args *ReaderArgs) *Reader {
+	d := jstream.NewDecoder(readCloser, 0) // emit depth 0: only top-level JSON values are streamed
	return &Reader{
-		args:         args,
-		objectReader: &objectReader{reader: readCloser},
-		readCloser:   readCloser,
+		args:       args,
+		decoder:    d,
+		valueCh:    d.Stream(), // Stream starts the decoding goroutine immediately
+		readCloser: readCloser,
	}
}
diff --git a/pkg/s3select/json/reader_test.go b/pkg/s3select/json/reader_test.go
new file mode 100644
index 000000000..cb83a7c2e
--- /dev/null
+++ b/pkg/s3select/json/reader_test.go
@@ -0,0 +1,49 @@
+/*
+ * Minio Cloud Storage, (C) 2019 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package json
+
+import (
+ "io"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+)
+
+// TestNewReader reads every fixture file under the data directory through
+// a Reader and expects each one to be consumed record by record until
+// io.EOF, i.e. without a parsing error.
+//
+// Each fixture runs as its own subtest so that one failing file does not
+// hide failures in the files that follow it.
+func TestNewReader(t *testing.T) {
+	files, err := ioutil.ReadDir("data")
+	if err != nil {
+		t.Fatal(err)
+	}
+	for _, file := range files {
+		// Only regular fixture files are readable as JSON input.
+		if file.IsDir() {
+			continue
+		}
+		name := file.Name()
+		t.Run(name, func(t *testing.T) {
+			f, err := os.Open(filepath.Join("data", name))
+			if err != nil {
+				t.Fatal(err)
+			}
+			r := NewReader(f, &ReaderArgs{})
+			defer r.Close()
+
+			// Drain all records; the loop ends on the first error,
+			// which must be io.EOF for a well-formed fixture.
+			for {
+				if _, err = r.Read(); err != nil {
+					break
+				}
+			}
+			if err != io.EOF {
+				t.Fatalf("Reading failed with %v, %s", err, name)
+			}
+		})
+	}
+}
diff --git a/vendor/github.com/bcicen/jstream/LICENSE b/vendor/github.com/bcicen/jstream/LICENSE
new file mode 100644
index 000000000..1c5d82df6
--- /dev/null
+++ b/vendor/github.com/bcicen/jstream/LICENSE
@@ -0,0 +1,22 @@
+The MIT License (MIT)
+
+Copyright (c) 2018 Bradley Cicenas
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+
diff --git a/vendor/github.com/bcicen/jstream/README.md b/vendor/github.com/bcicen/jstream/README.md
new file mode 100644
index 000000000..d8f43aa7a
--- /dev/null
+++ b/vendor/github.com/bcicen/jstream/README.md
@@ -0,0 +1,93 @@
+
![jstream](jstream.png)
+
+#
+
+[![GoDoc](https://godoc.org/github.com/bcicen/jstream?status.svg)](https://godoc.org/github.com/bcicen/jstream)
+
+
+`jstream` is a streaming JSON parser and value extraction library for Go.
+
+Unlike most JSON parsers, `jstream` is document position- and depth-aware -- this enables the extraction of values at a specified depth, eliminating the overhead of allocating encompassing arrays or objects; e.g:
+
+Using the below example document:
+
+
+we can choose to extract and act only on the objects within the top-level array:
+```go
+f, _ := os.Open("input.json")
+decoder := jstream.NewDecoder(f, 1) // extract JSON values at a depth level of 1
+for mv := range decoder.Stream() {
+ fmt.Printf("%v\n ", mv.Value)
+}
+```
+
+output:
+```
+map[desc:RGB colors:[red green blue]]
+map[desc:CMYK colors:[cyan magenta yellow black]]
+```
+
+likewise, increasing depth level to `3` yields:
+```
+red
+green
+blue
+cyan
+magenta
+yellow
+black
+```
+
+optionally, key:value pairs can be emitted as an individual struct:
+```go
+decoder := jstream.NewDecoder(f, 2).EmitKV() // enable KV streaming at a depth level of 2
+```
+
+```
+jstream.KV{desc RGB}
+jstream.KV{colors [red green blue]}
+jstream.KV{desc CMYK}
+jstream.KV{colors [cyan magenta yellow black]}
+```
+
+## Installing
+
+```bash
+go get github.com/bcicen/jstream
+```
+
+## Commandline
+
+`jstream` comes with a cli tool for quick viewing of parsed values from JSON input:
+
+```bash
+cat input.json | jstream -v -d 1
+depth start end type | value
+
+1 004 069 object | {"colors":["red","green","blue"],"desc":"RGB"}
+1 073 153 object | {"colors":["cyan","magenta","yellow","black"],"desc":"CMYK"}
+```
+
+### Options
+
+Opt | Description
+--- | ---
+-d \<n\> | emit values at depth n. if n < 0, all values will be emitted
+-v | output depth and offset details for each value
+-h | display help dialog
+
+## Benchmarks
+
+Obligatory benchmarks performed on files with arrays of objects, where the decoded objects are to be extracted.
+
+Two file sizes are used -- regular (1.6mb, 1000 objects) and large (128mb, 100000 objects)
+
+input size | lib | MB/s | Allocated
+--- | --- | --- | ---
+regular | standard | 97 | 3.6MB
+regular | jstream | 175 | 2.1MB
+large | standard | 92 | 305MB
+large | jstream | 404 | 69MB
+
+In a real world scenario, including initialization and reader overhead from varying blob sizes, performance can be expected as below:
+
diff --git a/vendor/github.com/bcicen/jstream/decoder.go b/vendor/github.com/bcicen/jstream/decoder.go
new file mode 100644
index 000000000..1c596e22c
--- /dev/null
+++ b/vendor/github.com/bcicen/jstream/decoder.go
@@ -0,0 +1,554 @@
+package jstream
+
+import (
+ "bytes"
+ "encoding/json"
+ "io"
+ "strconv"
+ "sync/atomic"
+ "unicode/utf16"
+)
+
+// ValueType - defines the type of each JSON value
+type ValueType int
+
+// Different types of JSON value
+const (
+	Unknown ValueType = iota // zero value; returned when no valid value could be recognised
+	Null                     // the literal null
+	String                   // a JSON string, decoded to a Go string
+	Number                   // a JSON number, decoded to a float64
+	Boolean                  // the literals true and false
+	Array                    // a JSON array, decoded to []interface{}
+	Object                   // a JSON object, decoded to KVS
+)
+
+// MetaValue wraps a decoded interface value with the document
+// position and depth at which the value was parsed
+type MetaValue struct {
+	Offset    int         // scanner position minus one, captured just before the value was decoded
+	Length    int         // scanner position after decoding, minus Offset
+	Depth     int         // decoder nesting depth at the time the value was emitted
+	Value     interface{} // the decoded value, or a KV when key/value emitting is enabled
+	ValueType ValueType   // JSON type of the decoded value
+}
+
+// KV contains a key and value pair parsed from a decoded object
+type KV struct {
+	Key   string      `json:"key"`
+	Value interface{} `json:"value"`
+}
+
+// KVS - represents key values in an JSON object
+type KVS []KV
+
+// MarshalJSON - implements converting slice of KVS into a JSON object
+// with multiple keys and values, written in slice order. An empty or nil
+// KVS is encoded as "{}".
+//
+// Keys are encoded with json.Marshal, exactly like values, so that quotes,
+// backslashes and control characters inside a key are escaped and the
+// output remains valid JSON. The first encoding error aborts the call and
+// is returned.
+func (kvs KVS) MarshalJSON() ([]byte, error) {
+	b := new(bytes.Buffer)
+	b.WriteByte('{')
+	for i, kv := range kvs {
+		if i > 0 {
+			b.WriteByte(',')
+		}
+		keyBuf, err := json.Marshal(kv.Key)
+		if err != nil {
+			return nil, err
+		}
+		b.Write(keyBuf)
+		b.WriteByte(':')
+		valBuf, err := json.Marshal(kv.Value)
+		if err != nil {
+			return nil, err
+		}
+		b.Write(valBuf)
+	}
+	b.WriteByte('}')
+	return b.Bytes(), nil
+}
+
+// Decoder wraps an io.Reader to provide incremental decoding of
+// JSON values
+type Decoder struct {
+	*scanner             // buffered byte scanner over the input reader
+	emitDepth     int    // depth at which decoded values are sent to the stream
+	emitKV        bool   // set by EmitKV: object members are emitted as KV
+	emitRecursive bool   // set by Recursive or a negative emitDepth: values below emitDepth are emitted too
+
+	depth   int             // current nesting depth; incremented while inside an array or object
+	scratch *scratch        // reusable buffer for string and number literals
+	metaCh  chan *MetaValue // stream of emitted values; closed when decoding stops
+	err     error           // error that stopped decoding, if any
+
+	// follow line position to add context to errors
+	lineNo    int   // number of newlines consumed so far
+	lineStart int64 // scanner position recorded at the most recent newline
+}
+
+// NewDecoder creates new Decoder to read JSON values at the provided
+// emitDepth from the provided io.Reader.
+// If emitDepth is < 0, values at every depth will be emitted.
+func NewDecoder(r io.Reader, emitDepth int) *Decoder {
+	d := &Decoder{
+		scanner:   newScanner(r),
+		emitDepth: emitDepth,
+		scratch:   &scratch{data: make([]byte, 1024)}, // initial literal buffer; grown on demand
+		metaCh:    make(chan *MetaValue, 128),         // buffered so decoding can run ahead of the consumer
+	}
+	if emitDepth < 0 {
+		// a negative depth means "everything": emit from depth 0 downwards
+		d.emitDepth = 0
+		d.emitRecursive = true
+	}
+	return d
+}
+
+// EmitKV enables emitting a jstream.KV struct when the item(s) parsed
+// at configured emit depth are within a JSON object. By default, only
+// the object values are emitted. It returns the receiver so that the
+// call can be chained onto NewDecoder.
+func (d *Decoder) EmitKV() *Decoder {
+	d.emitKV = true
+	return d
+}
+
+// Recursive enables emitting all values at a depth higher than the
+// configured emit depth; e.g. if an array is found at emit depth, all
+// values within the array are emitted to the stream, then the array
+// containing those values is emitted. It returns the receiver so that
+// the call can be chained onto NewDecoder.
+func (d *Decoder) Recursive() *Decoder {
+	d.emitRecursive = true
+	return d
+}
+
+// Stream begins decoding from the underlying reader and returns a
+// streaming MetaValue channel for JSON values at the configured emitDepth.
+// Decoding runs in a new goroutine; the channel is closed when decoding
+// stops, after which Err reports whether it stopped because of an error.
+func (d *Decoder) Stream() chan *MetaValue {
+	go d.decode()
+	return d.metaCh
+}
+
+// Pos returns the scanner's current position, i.e. the number of bytes
+// the decoder has consumed from its input so far
+func (d *Decoder) Pos() int { return int(d.pos) }
+
+// Err returns the error that stopped decoding, if any, or nil.
+// NOTE(review): the field is written by the decoding goroutine, so this
+// looks intended to be called after the Stream channel is closed — confirm.
+func (d *Decoder) Err() error { return d.err }
+
+// decode is the body of the goroutine started by Stream. It decodes
+// consecutive top-level values, skipping whitespace between them, until
+// the input is exhausted or a value fails to decode; in the latter case
+// the error is stored for Err. The stream channel is closed on return.
+func (d *Decoder) decode() {
+	defer close(d.metaCh)
+	d.skipSpaces()
+	for d.pos < atomic.LoadInt64(&d.end) {
+		_, err := d.emitAny()
+		if err != nil {
+			d.err = err
+			break
+		}
+		d.skipSpaces()
+	}
+}
+
+// emitAny decodes the JSON value starting at the current byte and, when
+// the current depth qualifies (see willEmit), sends it on the stream
+// channel wrapped in a MetaValue. It returns the decoded value together
+// with any decoding error, and ErrUnexpectedEOF when the input is already
+// exhausted.
+//
+// A value whose decoding failed is not sent: it would be partial or nil,
+// and stream consumers would otherwise receive it as a regular value
+// before they could observe the error.
+func (d *Decoder) emitAny() (interface{}, error) {
+	if d.pos >= atomic.LoadInt64(&d.end) {
+		return nil, d.mkError(ErrUnexpectedEOF)
+	}
+	// the first byte of the value has already been consumed by the caller
+	offset := d.pos - 1
+	i, t, err := d.any()
+	if err != nil {
+		return i, err
+	}
+	if d.willEmit() {
+		d.metaCh <- &MetaValue{
+			Offset:    int(offset),
+			Length:    int(d.pos - offset),
+			Depth:     d.depth,
+			Value:     i,
+			ValueType: t,
+		}
+	}
+	return i, nil
+}
+
+// willEmit reports whether a value decoded at the current depth is sent
+// to the stream: only at exactly emitDepth by default, or at emitDepth
+// and every deeper level when recursive emitting is enabled.
+func (d *Decoder) willEmit() bool {
+	if !d.emitRecursive {
+		return d.depth == d.emitDepth
+	}
+	return d.depth >= d.emitDepth
+}
+
+// any decodes the JSON value whose first byte is the current byte and
+// returns the decoded data as an interface{}, its ValueType, and any
+// syntax error. The value kind is chosen from that first byte: strings,
+// numbers, arrays and objects are delegated to their own decoders, while
+// the literals true, false and null are matched byte by byte here.
+func (d *Decoder) any() (interface{}, ValueType, error) {
+	c := d.cur()
+
+	switch c {
+	case '"':
+		i, err := d.string()
+		return i, String, err
+	case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9':
+		i, err := d.number()
+		return i, Number, err
+	case '-':
+		// a minus sign must be followed by a digit
+		if c = d.next(); c < '0' || c > '9' {
+			return nil, Unknown, d.mkError(ErrSyntax, "in negative numeric literal")
+		}
+		n, err := d.number()
+		if err != nil {
+			return nil, Unknown, err
+		}
+		return -n, Number, nil
+	case 'f':
+		if d.remaining() < 4 {
+			return nil, Unknown, d.mkError(ErrUnexpectedEOF)
+		}
+		if d.next() == 'a' && d.next() == 'l' && d.next() == 's' && d.next() == 'e' {
+			return false, Boolean, nil
+		}
+		return nil, Unknown, d.mkError(ErrSyntax, "in literal false")
+	case 't':
+		if d.remaining() < 3 {
+			return nil, Unknown, d.mkError(ErrUnexpectedEOF)
+		}
+		if d.next() == 'r' && d.next() == 'u' && d.next() == 'e' {
+			return true, Boolean, nil
+		}
+		return nil, Unknown, d.mkError(ErrSyntax, "in literal true")
+	case 'n':
+		if d.remaining() < 3 {
+			return nil, Unknown, d.mkError(ErrUnexpectedEOF)
+		}
+		if d.next() == 'u' && d.next() == 'l' && d.next() == 'l' {
+			return nil, Null, nil
+		}
+		return nil, Unknown, d.mkError(ErrSyntax, "in literal null")
+	case '[':
+		i, err := d.array()
+		return i, Array, err
+	case '{':
+		i, err := d.object()
+		return i, Object, err
+	default:
+		return nil, Unknown, d.mkError(ErrSyntax, "looking for beginning of value")
+	}
+}
+
+// string called by `any` or `object`(for map keys) after reading `"`.
+// It accumulates the string body in the scratch buffer, translating
+// backslash escapes and \u sequences (including UTF-16 surrogate pairs),
+// and returns the result once the closing `"` is read. Bytes below 0x20
+// and unknown escape codes are reported as syntax errors.
+func (d *Decoder) string() (string, error) {
+	d.scratch.reset()
+
+	var (
+		c = d.next()
+	)
+
+scan:
+	for {
+		switch {
+		case c == '"':
+			return string(d.scratch.bytes()), nil
+		case c == '\\':
+			c = d.next()
+			goto scan_esc
+		case c < 0x20:
+			// also reached at end of input, where next() yields 0
+			return "", d.mkError(ErrSyntax, "in string literal")
+		// Coerce to well-formed UTF-8.
+		default:
+			d.scratch.add(c)
+			if d.remaining() == 0 {
+				return "", d.mkError(ErrSyntax, "in string literal")
+			}
+			c = d.next()
+		}
+	}
+
+scan_esc:
+	// c is the byte that follows a backslash
+	switch c {
+	case '"', '\\', '/', '\'':
+		d.scratch.add(c)
+	case 'u':
+		goto scan_u
+	case 'b':
+		d.scratch.add('\b')
+	case 'f':
+		d.scratch.add('\f')
+	case 'n':
+		d.scratch.add('\n')
+	case 'r':
+		d.scratch.add('\r')
+	case 't':
+		d.scratch.add('\t')
+	default:
+		return "", d.mkError(ErrSyntax, "in string escape code")
+	}
+	c = d.next()
+	goto scan
+
+scan_u:
+	r := d.u4()
+	if r < 0 {
+		return "", d.mkError(ErrSyntax, "in unicode escape sequence")
+	}
+
+	// check for a following surrogate pair
+	c = d.next()
+	if !utf16.IsSurrogate(r) || c != '\\' {
+		d.scratch.addRune(r)
+		goto scan
+	}
+	if c = d.next(); c != 'u' {
+		// the backslash starts an ordinary escape, not a second \u
+		d.scratch.addRune(r)
+		goto scan_esc
+	}
+
+	r2 := d.u4()
+	if r2 < 0 {
+		return "", d.mkError(ErrSyntax, "in unicode escape sequence")
+	}
+
+	// write surrogate pair
+	d.scratch.addRune(utf16.DecodeRune(r, r2))
+	c = d.next()
+	goto scan
+}
+
+// u4 reads the four hexadecimal digits that follow a \u escape and
+// returns the code unit they encode. It returns -1 as soon as one of the
+// bytes read is not a hexadecimal digit.
+func (d *Decoder) u4() rune {
+	// logic taken from:
+	// github.com/buger/jsonparser/blob/master/escape.go#L20
+	var unit rune
+	for n := 0; n < 4; n++ {
+		var digit byte
+		switch c := d.next(); {
+		case '0' <= c && c <= '9':
+			digit = c - '0'
+		case 'A' <= c && c <= 'F':
+			digit = c - 'A' + 10
+		case 'a' <= c && c <= 'f':
+			digit = c - 'a' + 10
+		default:
+			return -1
+		}
+		// shift in one hex digit at a time, most significant first
+		unit = unit<<4 | rune(digit)
+	}
+	return unit
+}
+
+// number called by `any` after reading number between 0 to 9.
+// It decodes the numeric literal that starts at the current byte and
+// returns it as a float64. Plain integers are accumulated digit by digit;
+// literals with a fraction or an exponent are collected in the scratch
+// buffer and converted with strconv.ParseFloat. On success the byte that
+// terminated the literal is pushed back for the caller.
+func (d *Decoder) number() (float64, error) {
+	d.scratch.reset()
+
+	var (
+		c       = d.cur()
+		n       float64
+		isFloat bool
+	)
+
+	// digits first
+	switch {
+	case c == '0':
+		d.scratch.add(c)
+		c = d.next()
+	case '1' <= c && c <= '9':
+		for ; c >= '0' && c <= '9'; c = d.next() {
+			n = 10*n + float64(c-'0')
+			d.scratch.add(c)
+		}
+	}
+
+	// . followed by 1 or more digits
+	if c == '.' {
+		isFloat = true
+		d.scratch.add(c)
+
+		// first char following must be digit
+		if c = d.next(); c < '0' || c > '9' {
+			return 0, d.mkError(ErrSyntax, "after decimal point in numeric literal")
+		}
+		d.scratch.add(c)
+
+		for {
+			if d.remaining() == 0 {
+				return 0, d.mkError(ErrUnexpectedEOF)
+			}
+			if c = d.next(); c < '0' || c > '9' {
+				break
+			}
+			d.scratch.add(c)
+		}
+	}
+
+	// e or E followed by an optional - or + and
+	// 1 or more digits.
+	if c == 'e' || c == 'E' {
+		isFloat = true
+		d.scratch.add(c)
+
+		if c = d.next(); c == '+' || c == '-' {
+			d.scratch.add(c)
+			if c = d.next(); c < '0' || c > '9' {
+				return 0, d.mkError(ErrSyntax, "in exponent of numeric literal")
+			}
+			d.scratch.add(c)
+		}
+		for ; c >= '0' && c <= '9'; c = d.next() {
+			d.scratch.add(c)
+		}
+	}
+
+	if isFloat {
+		var (
+			err error
+			sn  string
+		)
+		sn = string(d.scratch.bytes())
+		if n, err = strconv.ParseFloat(sn, 64); err != nil {
+			return 0, err
+		}
+	}
+
+	// un-read the byte that ended the literal so the caller sees it next
+	d.back()
+	return n, nil
+}
+
+// array decodes a JSON array; it is called by `any` after reading `[`.
+// Every element is decoded through emitAny, so elements sitting at the
+// emit depth are streamed as they are read. The returned slice is never
+// nil but only holds the elements when the depth inside the array is
+// greater than emitDepth.
+func (d *Decoder) array() ([]interface{}, error) {
+	d.depth++
+
+	var (
+		c     byte
+		v     interface{}
+		err   error
+		array = make([]interface{}, 0)
+	)
+
+	// look ahead for ] - if the array is empty.
+	if c = d.skipSpaces(); c == ']' {
+		goto out
+	}
+
+scan:
+	if v, err = d.emitAny(); err != nil {
+		goto out
+	}
+
+	if d.depth > d.emitDepth { // skip alloc for array if it won't be emitted
+		array = append(array, v)
+	}
+
+	// next token must be ',' or ']'
+	switch c = d.skipSpaces(); c {
+	case ',':
+		d.skipSpaces() // advance to the first byte of the next element
+		goto scan
+	case ']':
+		goto out
+	default:
+		err = d.mkError(ErrSyntax, "after array element")
+	}
+
+out:
+	d.depth-- // restore the depth on every exit path
+	return array, err
+}
+
+// object decodes a JSON object; it is called by `any` after reading `{`.
+// Members are read as `"key": value` pairs separated by commas. With KV
+// emitting enabled each member at the emit depth is streamed as a KV,
+// otherwise the member values go through emitAny. The members are
+// returned as a KVS in document order; the KVS is nil when the depth
+// inside the object is not greater than emitDepth, since the object is
+// then not collected.
+//
+// Any byte other than ',' or '}' after a member is a syntax error and
+// stops decoding of the object.
+func (d *Decoder) object() (KVS, error) {
+	d.depth++
+
+	var (
+		c   byte
+		k   string
+		v   interface{}
+		t   ValueType
+		err error
+		obj KVS
+	)
+
+	// skip allocating map if it will not be emitted
+	if d.depth > d.emitDepth {
+		obj = make(KVS, 0)
+	}
+
+	// if the object has no keys
+	if c = d.skipSpaces(); c == '}' {
+		goto out
+	}
+
+scan:
+	for {
+		offset := d.pos - 1
+
+		// read string key
+		if c != '"' {
+			err = d.mkError(ErrSyntax, "looking for beginning of object key string")
+			break
+		}
+		if k, err = d.string(); err != nil {
+			break
+		}
+
+		// read colon before value
+		if c = d.skipSpaces(); c != ':' {
+			err = d.mkError(ErrSyntax, "after object key")
+			break
+		}
+
+		// read value
+		d.skipSpaces()
+		if d.emitKV {
+			if v, t, err = d.any(); err != nil {
+				break
+			}
+			if d.willEmit() {
+				d.metaCh <- &MetaValue{
+					Offset:    int(offset),
+					Length:    int(d.pos - offset),
+					Depth:     d.depth,
+					Value:     KV{k, v},
+					ValueType: t,
+				}
+			}
+		} else {
+			if v, err = d.emitAny(); err != nil {
+				break
+			}
+		}
+
+		if obj != nil {
+			obj = append(obj, KV{k, v})
+		}
+
+		// next token must be ',' or '}'
+		switch c = d.skipSpaces(); c {
+		case '}':
+			goto out
+		case ',':
+			c = d.skipSpaces()
+			goto scan
+		default:
+			err = d.mkError(ErrSyntax, "after object key:value pair")
+			// Leave the loop: a plain break would only exit the switch,
+			// and the next iteration would then overwrite or clear err
+			// (e.g. accepting a missing comma between two members).
+			break scan
+		}
+	}
+
+out:
+	d.depth--
+	return obj, err
+}
+
+// skipSpaces consumes bytes until it reads one that is not a space, tab,
+// carriage return or newline, and returns that byte. Newlines update the
+// line bookkeeping used for error positions. It returns 0 when the input
+// ends before such a byte is found.
+func (d *Decoder) skipSpaces() byte {
+	for d.pos < atomic.LoadInt64(&d.end) {
+		c := d.next()
+		if c == '\n' {
+			d.lineStart = d.pos
+			d.lineNo++
+			continue
+		}
+		if c != ' ' && c != '\t' && c != '\r' {
+			return c
+		}
+	}
+	return 0
+}
+
+// mkError returns a copy of the given SyntaxError annotated with the
+// current position: the byte under the cursor, the 1-based line number
+// and the byte offset since the last newline. When context strings are
+// supplied, the first one is attached as additional context.
+func (d *Decoder) mkError(err SyntaxError, context ...string) error {
+	if len(context) > 0 {
+		err.context = context[0]
+	}
+	err.atChar = d.cur()
+	err.pos[0] = d.lineNo + 1
+	err.pos[1] = int(d.pos - d.lineStart)
+	return err
+}
diff --git a/vendor/github.com/bcicen/jstream/errors.go b/vendor/github.com/bcicen/jstream/errors.go
new file mode 100644
index 000000000..19e3b1f62
--- /dev/null
+++ b/vendor/github.com/bcicen/jstream/errors.go
@@ -0,0 +1,41 @@
+package jstream
+
+import (
+ "fmt"
+ "strconv"
+)
+
+// Predefined errors; the decoder copies these and fills in the position
+// and context before returning them
+var (
+	ErrSyntax        = SyntaxError{msg: "invalid character"}            // an unexpected byte was read
+	ErrUnexpectedEOF = SyntaxError{msg: "unexpected end of JSON input"} // input ended inside a value
+)
+
+type errPos [2]int // line number, byte offset where error occurred
+
+// SyntaxError describes a problem found while decoding JSON, together
+// with the byte and the position at which it was detected.
+type SyntaxError struct {
+	msg     string // description of error
+	context string // additional error context
+	pos     errPos
+	atChar  byte
+}
+
+// Error formats the error as "<msg> <context>: '<char>' [line,offset]".
+// The context part is left out when no context was recorded, so that the
+// message does not contain a stray space before the colon.
+func (e SyntaxError) Error() string {
+	loc := fmt.Sprintf("%s [%d,%d]", quoteChar(e.atChar), e.pos[0], e.pos[1])
+	if e.context == "" {
+		return fmt.Sprintf("%s: %s", e.msg, loc)
+	}
+	return fmt.Sprintf("%s %s: %s", e.msg, e.context, loc)
+}
+
+// quoteChar renders the byte c as a character literal wrapped in single
+// quotes, escaping it the way strconv.Quote would.
+func quoteChar(c byte) string {
+	switch c {
+	// the two quote characters need different treatment than they get
+	// inside a double-quoted string
+	case '\'':
+		return `'\''`
+	case '"':
+		return `'"'`
+	}
+
+	// quote as a string, then swap the surrounding double quotes for
+	// single quotes
+	quoted := strconv.Quote(string(c))
+	return "'" + quoted[1:len(quoted)-1] + "'"
+}
diff --git a/vendor/github.com/bcicen/jstream/jstream.png b/vendor/github.com/bcicen/jstream/jstream.png
new file mode 100644
index 000000000..38d6e4486
Binary files /dev/null and b/vendor/github.com/bcicen/jstream/jstream.png differ
diff --git a/vendor/github.com/bcicen/jstream/scanner.go b/vendor/github.com/bcicen/jstream/scanner.go
new file mode 100644
index 000000000..1e8224233
--- /dev/null
+++ b/vendor/github.com/bcicen/jstream/scanner.go
@@ -0,0 +1,107 @@
+package jstream
+
+import (
+ "io"
+ "sync/atomic"
+)
+
+const (
+	chunk   = 4095                // ~4k; size of the scanner's read buffers
+	maxUint = ^uint(0)            // all bits set
+	maxInt  = int64(maxUint >> 1) // largest int; also the value of scanner.end until EOF has been seen
+)
+
+// scanner reads its input chunk by chunk through a background goroutine
+// and hands the bytes out one at a time with a lookback of one byte.
+type scanner struct {
+	pos       int64               // position in reader
+	ipos      int64               // internal buffer position
+	ifill     int64               // internal buffer fill
+	end       int64               // total input length once EOF has been seen, maxInt until then
+	buf       [chunk + 1]byte     // internal buffer (with a lookback size of 1)
+	nbuf      [chunk]byte         // next internal buffer
+	fillReq   chan struct{}       // requests the next chunk from the filler goroutine
+	fillReady chan int64          // delivers the size of a filled chunk; closed at EOF
+}
+
+// newScanner creates a scanner over r and starts its filler goroutine.
+// For every request on fillReq the goroutine reads one chunk into nbuf
+// and reports the number of bytes on fillReady. At EOF it records the
+// total number of bytes read in end, closes fillReady and exits. The
+// first fill is requested before newScanner returns.
+//
+// NOTE(review): any read error other than io.EOF panics inside the filler
+// goroutine, where callers cannot recover it — consider surfacing it as a
+// decoder error instead.
+func newScanner(r io.Reader) *scanner {
+	sr := &scanner{
+		end:       maxInt,
+		fillReq:   make(chan struct{}),
+		fillReady: make(chan int64),
+	}
+
+	go func() {
+		var rpos int64 // total bytes read into buffer
+
+		for _ = range sr.fillReq {
+		scan:
+			n, err := r.Read(sr.nbuf[:])
+
+			if n == 0 {
+				switch err {
+				case io.EOF: // reader is exhausted
+					atomic.StoreInt64(&sr.end, rpos)
+					close(sr.fillReady)
+					return
+				case nil: // no data and no error, retry fill
+					goto scan
+				default:
+					panic(err)
+				}
+			}
+
+			rpos += int64(n)
+			sr.fillReady <- int64(n)
+		}
+	}()
+
+	sr.fillReq <- struct{}{} // initial fill
+
+	return sr
+}
+
+// remaining returns the number of unread bytes, computed as end - pos.
+// if EOF for the underlying reader has not yet been found,
+// maximum possible integer value will be returned
+func (s *scanner) remaining() int64 {
+	if atomic.LoadInt64(&s.end) == maxInt {
+		return maxInt
+	}
+	return atomic.LoadInt64(&s.end) - s.pos
+}
+
+// cur returns the byte at the current internal buffer position (without advancing)
+func (s *scanner) cur() byte { return s.buf[s.ipos] }
+
+// next advances by one byte and returns it. Once pos has reached end it
+// returns 0 without advancing. When the internal buffer is used up, the
+// chunk prepared by the filler goroutine is swapped in and, unless EOF
+// has been seen, the following chunk is requested.
+func (s *scanner) next() byte {
+	if s.pos >= atomic.LoadInt64(&s.end) {
+		return byte(0)
+	}
+	s.ipos++
+
+	if s.ipos > s.ifill { // internal buffer is exhausted
+		s.ifill = <-s.fillReady        // wait for the pending fill; yields 0 once the channel is closed
+		s.buf[0] = s.buf[len(s.buf)-1] // copy current last item to guarantee lookback
+		copy(s.buf[1:], s.nbuf[:])     // copy contents of pre-filled next buffer
+		s.ipos = 1                     // move to beginning of internal buffer
+
+		// request next fill to be prepared
+		if s.end == maxInt {
+			s.fillReq <- struct{}{}
+		}
+	}
+
+	s.pos++
+	return s.buf[s.ipos]
+}
+
+// back undoes a previous call to next(), moving backward one byte in the internal buffer.
+// as we only guarantee a lookback buffer size of one, any subsequent calls to back()
+// before calling next() may panic
+func (s *scanner) back() {
+	if s.ipos <= 0 { // already at the lookback slot; nothing earlier is kept
+		panic("back buffer exhausted")
+	}
+	s.ipos--
+	s.pos--
+}
diff --git a/vendor/github.com/bcicen/jstream/scratch.go b/vendor/github.com/bcicen/jstream/scratch.go
new file mode 100644
index 000000000..9e29e3e43
--- /dev/null
+++ b/vendor/github.com/bcicen/jstream/scratch.go
@@ -0,0 +1,44 @@
+package jstream
+
+import (
+ "unicode/utf8"
+)
+
+// scratch is a reusable, growable byte buffer used to assemble string
+// and number literals. The zero value is ready to use.
+type scratch struct {
+	data []byte // backing storage; the first fill bytes are in use
+	fill int    // number of bytes written since the last reset
+}
+
+// reset scratch buffer
+func (s *scratch) reset() { s.fill = 0 }
+
+// bytes returns the written contents of scratch buffer
+func (s *scratch) bytes() []byte { return s.data[0:s.fill] }
+
+// grow scratch buffer to twice its size, keeping its contents. The new
+// size is never smaller than 64 bytes, so that an empty or very small
+// buffer becomes large enough for any single add or addRune call.
+func (s *scratch) grow() {
+	size := len(s.data) * 2
+	if size < 64 {
+		size = 64
+	}
+	ndata := make([]byte, size)
+	copy(ndata, s.data[:])
+	s.data = ndata
+}
+
+// append single byte to scratch buffer
+func (s *scratch) add(c byte) {
+	if s.fill+1 >= len(s.data) {
+		s.grow()
+	}
+
+	s.data[s.fill] = c
+	s.fill++
+}
+
+// append encoded rune to scratch buffer; returns the number of bytes
+// written
+func (s *scratch) addRune(r rune) int {
+	// reserve room for the longest possible UTF-8 encoding
+	if s.fill+utf8.UTFMax >= len(s.data) {
+		s.grow()
+	}
+
+	n := utf8.EncodeRune(s.data[s.fill:], r)
+	s.fill += n
+	return n
+}
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 32f1514cb..4a62aaa66 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -115,6 +115,12 @@
"revision": "6fe16293d6b7af4f5c2450714c5b4825c8ad040c",
"revisionTime": "2017-09-25T03:23:15Z"
},
+ {
+ "checksumSHA1": "mSo9Sti7F6+laUs4NLx/p23rHD0=",
+ "path": "github.com/bcicen/jstream",
+ "revision": "f306cd3e1fa602a1b513114521a0d6a5a9d5c919",
+ "revisionTime": "2019-02-06T02:23:53Z"
+ },
{
"checksumSHA1": "0rido7hYHQtfq3UJzVT5LClLAWc=",
"path": "github.com/beorn7/perks/quantile",