Fix JSON parser handling for certain objects (#7162)

This PR also adds some comments and simplifies
the code. Primary handling is done to ensure
that we make sure to honor cached buffer.

Added unit tests as well

Fixes #7141
master
Harshavardhana 6 years ago committed by Nitish Tiwari
parent d203e7e1cc
commit 85e939636f
  1. 2
      cmd/object-handlers.go
  2. 12
      pkg/s3select/json/data/10.json
  3. 8
      pkg/s3select/json/data/11.json
  4. 5
      pkg/s3select/json/data/12.json
  5. 1
      pkg/s3select/json/data/2.json
  6. 1
      pkg/s3select/json/data/3.json
  7. 26
      pkg/s3select/json/data/4.json
  8. 5
      pkg/s3select/json/data/5.json
  9. 1
      pkg/s3select/json/data/6.json
  10. 3
      pkg/s3select/json/data/7.json
  11. 2
      pkg/s3select/json/data/8.json
  12. 6
      pkg/s3select/json/data/9.json
  13. 192
      pkg/s3select/json/reader.go
  14. 49
      pkg/s3select/json/reader_test.go
  15. 22
      vendor/github.com/bcicen/jstream/LICENSE
  16. 93
      vendor/github.com/bcicen/jstream/README.md
  17. 554
      vendor/github.com/bcicen/jstream/decoder.go
  18. 41
      vendor/github.com/bcicen/jstream/errors.go
  19. BIN
      vendor/github.com/bcicen/jstream/jstream.png
  20. 107
      vendor/github.com/bcicen/jstream/scanner.go
  21. 44
      vendor/github.com/bcicen/jstream/scratch.go
  22. 6
      vendor/vendor.json

@ -198,7 +198,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
w.WriteHeader(serr.HTTPStatusCode()) w.WriteHeader(serr.HTTPStatusCode())
w.Write(s3select.NewErrorMessage(serr.ErrorCode(), serr.ErrorMessage())) w.Write(s3select.NewErrorMessage(serr.ErrorCode(), serr.ErrorMessage()))
} else { } else {
writeErrorResponse(w, ErrInternalError, r.URL, guessIsBrowserReq(r)) writeErrorResponse(w, toAPIErrorCode(ctx, err), r.URL, guessIsBrowserReq(r))
} }
return return
} }

@ -0,0 +1,12 @@
[
{
"key_1": "value",
"key_2": "value"
}
]
[
{
"key_1": "value2",
"key_2": "value3"
}
]

@ -0,0 +1,8 @@
"a"
1
3.145
["a"]
{}
{
"a": 1
}

@ -0,0 +1,5 @@
{
"a": 1
}{
"b": 2
}

@ -0,0 +1 @@
{"text": "hello world\\n2nd line"}

@ -0,0 +1 @@
{"hello":"wor{l}d"}

@ -0,0 +1,26 @@
{
"id": "0001",
"type": "donut",
"name": "Cake",
"ppu": 0.55,
"batters":
{
"batter":
[
{ "id": "1001", "type": "Regular" },
{ "id": "1002", "type": "Chocolate" },
{ "id": "1003", "type": "Blueberry" },
{ "id": "1004", "type": "Devil's Food" }
]
},
"topping":
[
{ "id": "5001", "type": "None" },
{ "id": "5002", "type": "Glazed" },
{ "id": "5005", "type": "Sugar" },
{ "id": "5007", "type": "Powdered Sugar" },
{ "id": "5006", "type": "Chocolate with Sprinkles" },
{ "id": "5003", "type": "Chocolate" },
{ "id": "5004", "type": "Maple" }
]
}

@ -0,0 +1,5 @@
{
"foo": {
"bar": "baz"
}
}

@ -0,0 +1 @@
{ "name": "John", "age":28, "hobby": { "name": "chess", "type": "boardgame" }}

@ -0,0 +1,3 @@
{"name":"Michael", "age": 31}
{"name":"Andy", "age": 30}
{"name":"Justin", "age": 19}

@ -0,0 +1,6 @@
[
{
"key_1": "value",
"key_2": "value"
}
]

@ -17,186 +17,48 @@
package json package json
import ( import (
"bytes" "encoding/json"
"io" "io"
"io/ioutil"
"strconv"
"github.com/minio/minio/pkg/s3select/sql" "github.com/minio/minio/pkg/s3select/sql"
"github.com/tidwall/gjson"
"github.com/bcicen/jstream"
"github.com/tidwall/sjson" "github.com/tidwall/sjson"
) )
func toSingleLineJSON(input string, currentKey string, result gjson.Result) (output string, err error) {
switch {
case result.IsObject():
result.ForEach(func(key, value gjson.Result) bool {
jsonKey := key.String()
if currentKey != "" {
jsonKey = currentKey + "." + key.String()
}
output, err = toSingleLineJSON(input, jsonKey, value)
input = output
return err == nil
})
case result.IsArray():
i := 0
result.ForEach(func(key, value gjson.Result) bool {
if currentKey == "" {
panic("currentKey is empty")
}
indexKey := currentKey + "." + strconv.Itoa(i)
output, err = toSingleLineJSON(input, indexKey, value)
input = output
i++
return err == nil
})
default:
output, err = sjson.Set(input, currentKey, result.Value())
}
return output, err
}
type objectReader struct {
reader io.Reader
err error
p []byte
start int
end int
escaped bool
quoteOpened bool
curlyCount uint64
endOfObject bool
}
func (or *objectReader) objectEndIndex(p []byte, length int) int {
for i := 0; i < length; i++ {
if p[i] == '\\' {
or.escaped = !or.escaped
continue
}
if p[i] == '"' && !or.escaped {
or.quoteOpened = !or.quoteOpened
}
or.escaped = false
switch p[i] {
case '{':
if !or.quoteOpened {
or.curlyCount++
}
case '}':
if or.quoteOpened || or.curlyCount == 0 {
break
}
if or.curlyCount--; or.curlyCount == 0 {
return i + 1
}
}
}
return -1
}
func (or *objectReader) Read(p []byte) (n int, err error) {
if or.endOfObject {
return 0, io.EOF
}
if or.p != nil {
n = copy(p, or.p[or.start:or.end])
or.start += n
if or.start == or.end {
// made full copy.
or.p = nil
or.start = 0
or.end = 0
}
} else {
if or.err != nil {
return 0, or.err
}
n, err = or.reader.Read(p)
or.err = err
switch err {
case nil:
case io.EOF, io.ErrUnexpectedEOF, io.ErrClosedPipe:
or.err = io.EOF
default:
return 0, err
}
}
index := or.objectEndIndex(p, n)
if index == -1 || index == n {
return n, nil
}
or.endOfObject = true
if or.p == nil {
or.p = p
or.start = index
or.end = n
} else {
or.start -= index
}
return index, nil
}
func (or *objectReader) Reset() error {
or.endOfObject = false
if or.p != nil {
return nil
}
return or.err
}
// Reader - JSON record reader for S3Select. // Reader - JSON record reader for S3Select.
type Reader struct { type Reader struct {
args *ReaderArgs args *ReaderArgs
objectReader *objectReader decoder *jstream.Decoder
readCloser io.ReadCloser valueCh chan *jstream.MetaValue
readCloser io.ReadCloser
} }
// Read - reads single record. // Read - reads single record.
func (r *Reader) Read() (sql.Record, error) { func (r *Reader) Read() (sql.Record, error) {
if err := r.objectReader.Reset(); err != nil { v, ok := <-r.valueCh
return nil, err if !ok {
if err := r.decoder.Err(); err != nil {
return nil, errJSONParsingError(err)
}
return nil, io.EOF
} }
data, err := ioutil.ReadAll(r.objectReader) var data []byte
if err != nil { var err error
return nil, errJSONParsingError(err)
}
data = bytes.TrimSpace(data) if v.ValueType == jstream.Object {
if len(data) == 0 { data, err = json.Marshal(v.Value)
return nil, io.EOF } else {
// To be AWS S3 compatible
// Select for JSON needs to output non-object JSON as single column value
// i.e. a map with `_1` as key and value as the non-object.
data, err = sjson.SetBytes(data, "_1", v.Value)
} }
if err != nil {
if !gjson.ValidBytes(data) {
return nil, errJSONParsingError(err) return nil, errJSONParsingError(err)
} }
if bytes.Count(data, []byte("\n")) > 0 {
var s string
if s, err = toSingleLineJSON("", "", gjson.ParseBytes(data)); err != nil {
return nil, errJSONParsingError(err)
}
data = []byte(s)
}
return &Record{ return &Record{
data: data, data: data,
}, nil }, nil
@ -209,9 +71,11 @@ func (r *Reader) Close() error {
// NewReader - creates new JSON reader using readCloser. // NewReader - creates new JSON reader using readCloser.
func NewReader(readCloser io.ReadCloser, args *ReaderArgs) *Reader { func NewReader(readCloser io.ReadCloser, args *ReaderArgs) *Reader {
d := jstream.NewDecoder(readCloser, 0)
return &Reader{ return &Reader{
args: args, args: args,
objectReader: &objectReader{reader: readCloser}, decoder: d,
readCloser: readCloser, valueCh: d.Stream(),
readCloser: readCloser,
} }
} }

@ -0,0 +1,49 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package json
import (
"io"
"io/ioutil"
"os"
"path/filepath"
"testing"
)
func TestNewReader(t *testing.T) {
files, err := ioutil.ReadDir("data")
if err != nil {
t.Fatal(err)
}
for _, file := range files {
f, err := os.Open(filepath.Join("data", file.Name()))
if err != nil {
t.Fatal(err)
}
r := NewReader(f, &ReaderArgs{})
for {
_, err = r.Read()
if err != nil {
break
}
}
r.Close()
if err != io.EOF {
t.Fatalf("Reading failed with %s, %s", err, file.Name())
}
}
}

@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2018 Bradley Cicenas
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

@ -0,0 +1,93 @@
<p align="center"><img width="350px" src="jstream.png" alt="jstream"/></p>
#
[![GoDoc](https://godoc.org/github.com/bcicen/jstream?status.svg)](https://godoc.org/github.com/bcicen/jstream)
`jstream` is a streaming JSON parser and value extraction library for Go.
Unlike most JSON parsers, `jstream` is document position- and depth-aware -- this enables the extraction of values at a specified depth, eliminating the overhead of allocating encompassing arrays or objects; e.g:
Using the below example document:
<img width="85%" src="https://bradley.codes/static/img/jstream-levels.gif" alt="jstream"/>
we can choose to extract and act only the objects within the top-level array:
```go
f, _ := os.Open("input.json")
decoder := jstream.NewDecoder(f, 1) // extract JSON values at a depth level of 1
for mv := range decoder.Stream() {
fmt.Printf("%v\n ", mv.Value)
}
```
output:
```
map[desc:RGB colors:[red green blue]]
map[desc:CMYK colors:[cyan magenta yellow black]]
```
likewise, increasing depth level to `3` yields:
```
red
green
blue
cyan
magenta
yellow
black
```
optionally, kev:value pairs can be emitted as an individual struct:
```go
decoder := jstream.NewDecoder(f, 2).EmitKV() // enable KV streaming at a depth level of 2
```
```
jstream.KV{desc RGB}
jstream.KV{colors [red green blue]}
jstream.KV{desc CMYK}
jstream.KV{colors [cyan magenta yellow black]}
```
## Installing
```bash
go get github.com/bcicen/jstream
```
## Commandline
`jstream` comes with a cli tool for quick viewing of parsed values from JSON input:
```bash
cat input.json | jstream -v -d 1
depth start end type | value
1 004 069 object | {"colors":["red","green","blue"],"desc":"RGB"}
1 073 153 object | {"colors":["cyan","magenta","yellow","black"],"desc":"CMYK"}
```
### Options
Opt | Description
--- | ---
-d \<n\> | emit values at depth n. if n < 0, all values will be emitted
-v | output depth and offset details for each value
-h | display help dialog
## Benchmarks
Obligatory benchmarks performed on files with arrays of objects, where the decoded objects are to be extracted.
Two file sizes are used -- regular (1.6mb, 1000 objects) and large (128mb, 100000 objects)
input size | lib | MB/s | Allocated
--- | --- | --- | ---
regular | standard | 97 | 3.6MB
regular | jstream | 175 | 2.1MB
large | standard | 92 | 305MB
large | jstream | 404 | 69MB
In a real world scenario, including initialization and reader overhead from varying blob sizes, performance can be expected as below:
<img src="https://bradley.codes/static/img/bench.svg" alt="jstream"/>

@ -0,0 +1,554 @@
package jstream
import (
"bytes"
"encoding/json"
"io"
"strconv"
"sync/atomic"
"unicode/utf16"
)
// ValueType - defines the type of each JSON value
type ValueType int
// Different types of JSON value
const (
Unknown ValueType = iota
Null
String
Number
Boolean
Array
Object
)
// MetaValue wraps a decoded interface value with the document
// position and depth at which the value was parsed
type MetaValue struct {
Offset int
Length int
Depth int
Value interface{}
ValueType ValueType
}
// KV contains a key and value pair parsed from a decoded object
type KV struct {
Key string `json:"key"`
Value interface{} `json:"value"`
}
// KVS - represents key values in an JSON object
type KVS []KV
// MarshalJSON - implements converting slice of KVS into a JSON object
// with multiple keys and values.
func (kvs KVS) MarshalJSON() ([]byte, error) {
b := new(bytes.Buffer)
b.Write([]byte("{"))
for i, kv := range kvs {
b.Write([]byte("\"" + kv.Key + "\"" + ":"))
valBuf, err := json.Marshal(kv.Value)
if err != nil {
return nil, err
}
b.Write(valBuf)
if i < len(kvs)-1 {
b.Write([]byte(","))
}
}
b.Write([]byte("}"))
return b.Bytes(), nil
}
// Decoder wraps an io.Reader to provide incremental decoding of
// JSON values
type Decoder struct {
*scanner
emitDepth int
emitKV bool
emitRecursive bool
depth int
scratch *scratch
metaCh chan *MetaValue
err error
// follow line position to add context to errors
lineNo int
lineStart int64
}
// NewDecoder creates new Decoder to read JSON values at the provided
// emitDepth from the provider io.Reader.
// If emitDepth is < 0, values at every depth will be emitted.
func NewDecoder(r io.Reader, emitDepth int) *Decoder {
d := &Decoder{
scanner: newScanner(r),
emitDepth: emitDepth,
scratch: &scratch{data: make([]byte, 1024)},
metaCh: make(chan *MetaValue, 128),
}
if emitDepth < 0 {
d.emitDepth = 0
d.emitRecursive = true
}
return d
}
// EmitKV enables emitting a jstream.KV struct when the items(s) parsed
// at configured emit depth are within a JSON object. By default, only
// the object values are emitted.
func (d *Decoder) EmitKV() *Decoder {
d.emitKV = true
return d
}
// Recursive enables emitting all values at a depth higher than the
// configured emit depth; e.g. if an array is found at emit depth, all
// values within the array are emitted to the stream, then the array
// containing those values is emitted.
func (d *Decoder) Recursive() *Decoder {
d.emitRecursive = true
return d
}
// Stream begins decoding from the underlying reader and returns a
// streaming MetaValue channel for JSON values at the configured emitDepth.
func (d *Decoder) Stream() chan *MetaValue {
go d.decode()
return d.metaCh
}
// Pos returns the number of bytes consumed from the underlying reader
func (d *Decoder) Pos() int { return int(d.pos) }
// Err returns the most recent decoder error if any, or nil
func (d *Decoder) Err() error { return d.err }
// Decode parses the JSON-encoded data and returns an interface value
func (d *Decoder) decode() {
defer close(d.metaCh)
d.skipSpaces()
for d.pos < atomic.LoadInt64(&d.end) {
_, err := d.emitAny()
if err != nil {
d.err = err
break
}
d.skipSpaces()
}
}
func (d *Decoder) emitAny() (interface{}, error) {
if d.pos >= atomic.LoadInt64(&d.end) {
return nil, d.mkError(ErrUnexpectedEOF)
}
offset := d.pos - 1
i, t, err := d.any()
if d.willEmit() {
d.metaCh <- &MetaValue{
Offset: int(offset),
Length: int(d.pos - offset),
Depth: d.depth,
Value: i,
ValueType: t,
}
}
return i, err
}
// return whether, at the current depth, the value being decoded will
// be emitted to stream
func (d *Decoder) willEmit() bool {
if d.emitRecursive {
return d.depth >= d.emitDepth
}
return d.depth == d.emitDepth
}
// any used to decode any valid JSON value, and returns an
// interface{} that holds the actual data
func (d *Decoder) any() (interface{}, ValueType, error) {
c := d.cur()
switch c {
case '"':
i, err := d.string()
return i, String, err
case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9':
i, err := d.number()
return i, Number, err
case '-':
if c = d.next(); c < '0' && c > '9' {
return nil, Unknown, d.mkError(ErrSyntax, "in negative numeric literal")
}
n, err := d.number()
if err != nil {
return nil, Unknown, err
}
return -n, Number, nil
case 'f':
if d.remaining() < 4 {
return nil, Unknown, d.mkError(ErrUnexpectedEOF)
}
if d.next() == 'a' && d.next() == 'l' && d.next() == 's' && d.next() == 'e' {
return false, Boolean, nil
}
return nil, Unknown, d.mkError(ErrSyntax, "in literal false")
case 't':
if d.remaining() < 3 {
return nil, Unknown, d.mkError(ErrUnexpectedEOF)
}
if d.next() == 'r' && d.next() == 'u' && d.next() == 'e' {
return true, Boolean, nil
}
return nil, Unknown, d.mkError(ErrSyntax, "in literal true")
case 'n':
if d.remaining() < 3 {
return nil, Unknown, d.mkError(ErrUnexpectedEOF)
}
if d.next() == 'u' && d.next() == 'l' && d.next() == 'l' {
return nil, Null, nil
}
return nil, Unknown, d.mkError(ErrSyntax, "in literal null")
case '[':
i, err := d.array()
return i, Array, err
case '{':
i, err := d.object()
return i, Object, err
default:
return nil, Unknown, d.mkError(ErrSyntax, "looking for beginning of value")
}
}
// string called by `any` or `object`(for map keys) after reading `"`
func (d *Decoder) string() (string, error) {
d.scratch.reset()
var (
c = d.next()
)
scan:
for {
switch {
case c == '"':
return string(d.scratch.bytes()), nil
case c == '\\':
c = d.next()
goto scan_esc
case c < 0x20:
return "", d.mkError(ErrSyntax, "in string literal")
// Coerce to well-formed UTF-8.
default:
d.scratch.add(c)
if d.remaining() == 0 {
return "", d.mkError(ErrSyntax, "in string literal")
}
c = d.next()
}
}
scan_esc:
switch c {
case '"', '\\', '/', '\'':
d.scratch.add(c)
case 'u':
goto scan_u
case 'b':
d.scratch.add('\b')
case 'f':
d.scratch.add('\f')
case 'n':
d.scratch.add('\n')
case 'r':
d.scratch.add('\r')
case 't':
d.scratch.add('\t')
default:
return "", d.mkError(ErrSyntax, "in string escape code")
}
c = d.next()
goto scan
scan_u:
r := d.u4()
if r < 0 {
return "", d.mkError(ErrSyntax, "in unicode escape sequence")
}
// check for proceeding surrogate pair
c = d.next()
if !utf16.IsSurrogate(r) || c != '\\' {
d.scratch.addRune(r)
goto scan
}
if c = d.next(); c != 'u' {
d.scratch.addRune(r)
goto scan_esc
}
r2 := d.u4()
if r2 < 0 {
return "", d.mkError(ErrSyntax, "in unicode escape sequence")
}
// write surrogate pair
d.scratch.addRune(utf16.DecodeRune(r, r2))
c = d.next()
goto scan
}
// u4 reads four bytes following a \u escape
func (d *Decoder) u4() rune {
// logic taken from:
// github.com/buger/jsonparser/blob/master/escape.go#L20
var h [4]int
for i := 0; i < 4; i++ {
c := d.next()
switch {
case c >= '0' && c <= '9':
h[i] = int(c - '0')
case c >= 'A' && c <= 'F':
h[i] = int(c - 'A' + 10)
case c >= 'a' && c <= 'f':
h[i] = int(c - 'a' + 10)
default:
return -1
}
}
return rune(h[0]<<12 + h[1]<<8 + h[2]<<4 + h[3])
}
// number called by `any` after reading number between 0 to 9
func (d *Decoder) number() (float64, error) {
d.scratch.reset()
var (
c = d.cur()
n float64
isFloat bool
)
// digits first
switch {
case c == '0':
d.scratch.add(c)
c = d.next()
case '1' <= c && c <= '9':
for ; c >= '0' && c <= '9'; c = d.next() {
n = 10*n + float64(c-'0')
d.scratch.add(c)
}
}
// . followed by 1 or more digits
if c == '.' {
isFloat = true
d.scratch.add(c)
// first char following must be digit
if c = d.next(); c < '0' && c > '9' {
return 0, d.mkError(ErrSyntax, "after decimal point in numeric literal")
}
d.scratch.add(c)
for {
if d.remaining() == 0 {
return 0, d.mkError(ErrUnexpectedEOF)
}
if c = d.next(); c < '0' || c > '9' {
break
}
d.scratch.add(c)
}
}
// e or E followed by an optional - or + and
// 1 or more digits.
if c == 'e' || c == 'E' {
isFloat = true
d.scratch.add(c)
if c = d.next(); c == '+' || c == '-' {
d.scratch.add(c)
if c = d.next(); c < '0' || c > '9' {
return 0, d.mkError(ErrSyntax, "in exponent of numeric literal")
}
d.scratch.add(c)
}
for ; c >= '0' && c <= '9'; c = d.next() {
d.scratch.add(c)
}
}
if isFloat {
var (
err error
sn string
)
sn = string(d.scratch.bytes())
if n, err = strconv.ParseFloat(sn, 64); err != nil {
return 0, err
}
}
d.back()
return n, nil
}
// array accept valid JSON array value
func (d *Decoder) array() ([]interface{}, error) {
d.depth++
var (
c byte
v interface{}
err error
array = make([]interface{}, 0)
)
// look ahead for ] - if the array is empty.
if c = d.skipSpaces(); c == ']' {
goto out
}
scan:
if v, err = d.emitAny(); err != nil {
goto out
}
if d.depth > d.emitDepth { // skip alloc for array if it won't be emitted
array = append(array, v)
}
// next token must be ',' or ']'
switch c = d.skipSpaces(); c {
case ',':
d.skipSpaces()
goto scan
case ']':
goto out
default:
err = d.mkError(ErrSyntax, "after array element")
}
out:
d.depth--
return array, err
}
// object accept valid JSON array value
func (d *Decoder) object() (KVS, error) {
d.depth++
var (
c byte
k string
v interface{}
t ValueType
err error
obj KVS
)
// skip allocating map if it will not be emitted
if d.depth > d.emitDepth {
obj = make(KVS, 0)
}
// if the object has no keys
if c = d.skipSpaces(); c == '}' {
goto out
}
scan:
for {
offset := d.pos - 1
// read string key
if c != '"' {
err = d.mkError(ErrSyntax, "looking for beginning of object key string")
break
}
if k, err = d.string(); err != nil {
break
}
// read colon before value
if c = d.skipSpaces(); c != ':' {
err = d.mkError(ErrSyntax, "after object key")
break
}
// read value
d.skipSpaces()
if d.emitKV {
if v, t, err = d.any(); err != nil {
break
}
if d.willEmit() {
d.metaCh <- &MetaValue{
Offset: int(offset),
Length: int(d.pos - offset),
Depth: d.depth,
Value: KV{k, v},
ValueType: t,
}
}
} else {
if v, err = d.emitAny(); err != nil {
break
}
}
if obj != nil {
obj = append(obj, KV{k, v})
}
// next token must be ',' or '}'
switch c = d.skipSpaces(); c {
case '}':
goto out
case ',':
c = d.skipSpaces()
goto scan
default:
err = d.mkError(ErrSyntax, "after object key:value pair")
}
}
out:
d.depth--
return obj, err
}
// returns the next char after white spaces
func (d *Decoder) skipSpaces() byte {
for d.pos < atomic.LoadInt64(&d.end) {
switch c := d.next(); c {
case '\n':
d.lineStart = d.pos
d.lineNo++
continue
case ' ', '\t', '\r':
continue
default:
return c
}
}
return 0
}
// create syntax errors at current position, with optional context
func (d *Decoder) mkError(err SyntaxError, context ...string) error {
if len(context) > 0 {
err.context = context[0]
}
err.atChar = d.cur()
err.pos[0] = d.lineNo + 1
err.pos[1] = int(d.pos - d.lineStart)
return err
}

@ -0,0 +1,41 @@
package jstream
import (
"fmt"
"strconv"
)
// Predefined errors
var (
ErrSyntax = SyntaxError{msg: "invalid character"}
ErrUnexpectedEOF = SyntaxError{msg: "unexpected end of JSON input"}
)
type errPos [2]int // line number, byte offset where error occurred
type SyntaxError struct {
msg string // description of error
context string // additional error context
pos errPos
atChar byte
}
func (e SyntaxError) Error() string {
loc := fmt.Sprintf("%s [%d,%d]", quoteChar(e.atChar), e.pos[0], e.pos[1])
return fmt.Sprintf("%s %s: %s", e.msg, e.context, loc)
}
// quoteChar formats c as a quoted character literal
func quoteChar(c byte) string {
// special cases - different from quoted strings
if c == '\'' {
return `'\''`
}
if c == '"' {
return `'"'`
}
// use quoted string with different quotation marks
s := strconv.Quote(string(c))
return "'" + s[1:len(s)-1] + "'"
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

@ -0,0 +1,107 @@
package jstream
import (
"io"
"sync/atomic"
)
const (
chunk = 4095 // ~4k
maxUint = ^uint(0)
maxInt = int64(maxUint >> 1)
)
type scanner struct {
pos int64 // position in reader
ipos int64 // internal buffer position
ifill int64 // internal buffer fill
end int64
buf [chunk + 1]byte // internal buffer (with a lookback size of 1)
nbuf [chunk]byte // next internal buffer
fillReq chan struct{}
fillReady chan int64
}
func newScanner(r io.Reader) *scanner {
sr := &scanner{
end: maxInt,
fillReq: make(chan struct{}),
fillReady: make(chan int64),
}
go func() {
var rpos int64 // total bytes read into buffer
for _ = range sr.fillReq {
scan:
n, err := r.Read(sr.nbuf[:])
if n == 0 {
switch err {
case io.EOF: // reader is exhausted
atomic.StoreInt64(&sr.end, rpos)
close(sr.fillReady)
return
case nil: // no data and no error, retry fill
goto scan
default:
panic(err)
}
}
rpos += int64(n)
sr.fillReady <- int64(n)
}
}()
sr.fillReq <- struct{}{} // initial fill
return sr
}
// remaining returns the number of unread bytes
// if EOF for the underlying reader has not yet been found,
// maximum possible integer value will be returned
func (s *scanner) remaining() int64 {
if atomic.LoadInt64(&s.end) == maxInt {
return maxInt
}
return atomic.LoadInt64(&s.end) - s.pos
}
// read byte at current position (without advancing)
func (s *scanner) cur() byte { return s.buf[s.ipos] }
// read next byte
func (s *scanner) next() byte {
if s.pos >= atomic.LoadInt64(&s.end) {
return byte(0)
}
s.ipos++
if s.ipos > s.ifill { // internal buffer is exhausted
s.ifill = <-s.fillReady
s.buf[0] = s.buf[len(s.buf)-1] // copy current last item to guarantee lookback
copy(s.buf[1:], s.nbuf[:]) // copy contents of pre-filled next buffer
s.ipos = 1 // move to beginning of internal buffer
// request next fill to be prepared
if s.end == maxInt {
s.fillReq <- struct{}{}
}
}
s.pos++
return s.buf[s.ipos]
}
// back undoes a previous call to next(), moving backward one byte in the internal buffer.
// as we only guarantee a lookback buffer size of one, any subsequent calls to back()
// before calling next() may panic
func (s *scanner) back() {
if s.ipos <= 0 {
panic("back buffer exhausted")
}
s.ipos--
s.pos--
}

@ -0,0 +1,44 @@
package jstream
import (
"unicode/utf8"
)
type scratch struct {
data []byte
fill int
}
// reset scratch buffer
func (s *scratch) reset() { s.fill = 0 }
// bytes returns the written contents of scratch buffer
func (s *scratch) bytes() []byte { return s.data[0:s.fill] }
// grow scratch buffer
func (s *scratch) grow() {
ndata := make([]byte, cap(s.data)*2)
copy(ndata, s.data[:])
s.data = ndata
}
// append single byte to scratch buffer
func (s *scratch) add(c byte) {
if s.fill+1 >= cap(s.data) {
s.grow()
}
s.data[s.fill] = c
s.fill++
}
// append encoded rune to scratch buffer
func (s *scratch) addRune(r rune) int {
if s.fill+utf8.UTFMax >= cap(s.data) {
s.grow()
}
n := utf8.EncodeRune(s.data[s.fill:], r)
s.fill += n
return n
}

@ -115,6 +115,12 @@
"revision": "6fe16293d6b7af4f5c2450714c5b4825c8ad040c", "revision": "6fe16293d6b7af4f5c2450714c5b4825c8ad040c",
"revisionTime": "2017-09-25T03:23:15Z" "revisionTime": "2017-09-25T03:23:15Z"
}, },
{
"checksumSHA1": "mSo9Sti7F6+laUs4NLx/p23rHD0=",
"path": "github.com/bcicen/jstream",
"revision": "f306cd3e1fa602a1b513114521a0d6a5a9d5c919",
"revisionTime": "2019-02-06T02:23:53Z"
},
{ {
"checksumSHA1": "0rido7hYHQtfq3UJzVT5LClLAWc=", "checksumSHA1": "0rido7hYHQtfq3UJzVT5LClLAWc=",
"path": "github.com/beorn7/perks/quantile", "path": "github.com/beorn7/perks/quantile",

Loading…
Cancel
Save