diff --git a/pkg/s3select/parquet/reader.go b/pkg/s3select/parquet/reader.go
index 45d8fec01..5f15e05c7 100644
--- a/pkg/s3select/parquet/reader.go
+++ b/pkg/s3select/parquet/reader.go
@@ -28,13 +28,13 @@ import (
 // Reader - Parquet record reader for S3Select.
 type Reader struct {
-	args *ReaderArgs
-	file *parquetgo.File
+	args   *ReaderArgs
+	reader *parquetgo.Reader
 }
 
 // Read - reads single record.
 func (r *Reader) Read() (rec sql.Record, rerr error) {
-	parquetRecord, err := r.file.Read()
+	parquetRecord, err := r.reader.Read()
 	if err != nil {
 		if err != io.EOF {
 			return nil, errParquetParsingError(err)
 		}
@@ -79,12 +79,12 @@ func (r *Reader) Read() (rec sql.Record, rerr error) {
 
 // Close - closes underlying readers.
 func (r *Reader) Close() error {
-	return r.file.Close()
+	return r.reader.Close()
 }
 
 // NewReader - creates new Parquet reader using readerFunc callback.
 func NewReader(getReaderFunc func(offset, length int64) (io.ReadCloser, error), args *ReaderArgs) (*Reader, error) {
-	file, err := parquetgo.Open(getReaderFunc, nil)
+	reader, err := parquetgo.NewReader(getReaderFunc, nil)
 	if err != nil {
 		if err != io.EOF {
 			return nil, errParquetParsingError(err)
 		}
 	}
@@ -94,7 +94,7 @@ func NewReader(getReaderFunc func(offset, length int64) (io.ReadCloser, error),
 
 	return &Reader{
-		args: args,
-		file: file,
+		args:   args,
+		reader: reader,
 	}, nil
 }
diff --git a/vendor/github.com/minio/parquet-go/Makefile b/vendor/github.com/minio/parquet-go/Makefile
deleted file mode 100644
index dc06ae83a..000000000
--- a/vendor/github.com/minio/parquet-go/Makefile
+++ /dev/null
@@ -1,36 +0,0 @@
-GOPATH := $(shell go env GOPATH)
-
-all: check
-
-getdeps:
-	@if [ ! -f ${GOPATH}/bin/golint ]; then echo "Installing golint" && go get -u golang.org/x/lint/golint; fi
-	@if [ ! -f ${GOPATH}/bin/gocyclo ]; then echo "Installing gocyclo" && go get -u github.com/fzipp/gocyclo; fi
-	@if [ ! -f ${GOPATH}/bin/misspell ]; then echo "Installing misspell" && go get -u github.com/client9/misspell/cmd/misspell; fi
-	@if [ ! -f ${GOPATH}/bin/ineffassign ]; then echo "Installing ineffassign" && go get -u github.com/gordonklaus/ineffassign; fi
-
-vet:
-	@echo "Running $@"
-	@go tool vet -atomic -bool -copylocks -nilfunc -printf -shadow -rangeloops -unreachable -unsafeptr -unusedresult *.go
-
-fmt:
-	@echo "Running $@"
-	@gofmt -d *.go
-
-lint:
-	@echo "Running $@"
-	@${GOPATH}/bin/golint -set_exit_status
-
-cyclo:
-	@echo "Running $@"
-	@${GOPATH}/bin/gocyclo -over 200 .
-
-spelling:
-	@${GOPATH}/bin/misspell -locale US -error *.go README.md
-
-ineffassign:
-	@echo "Running $@"
-	@${GOPATH}/bin/ineffassign .
-
-check: getdeps vet fmt lint cyclo spelling ineffassign
-	@echo "Running unit tests"
-	@go test -tags kqueue .
diff --git a/vendor/github.com/minio/parquet-go/columnchunk.go b/vendor/github.com/minio/parquet-go/columnchunk.go
new file mode 100644
index 000000000..090cc16b3
--- /dev/null
+++ b/vendor/github.com/minio/parquet-go/columnchunk.go
@@ -0,0 +1,149 @@
+/*
+ * Minio Cloud Storage, (C) 2019 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet
+
+import "github.com/minio/parquet-go/gen-go/parquet"
+
+type columnChunk struct {
+	Pages       []*page
+	chunkHeader *parquet.ColumnChunk
+}
+
+func pagesToColumnChunk(pages []*page) *columnChunk {
+	var numValues, totalUncompressedSize, totalCompressedSize int64
+	minVal := pages[0].MinVal
+	maxVal := pages[0].MaxVal
+	parquetType, convertedType := pages[0].DataTable.Type, pages[0].DataTable.ConvertedType
+
+	for i := 0; i < len(pages); i++ {
+		if pages[i].Header.DataPageHeader != nil {
+			numValues += int64(pages[i].Header.DataPageHeader.NumValues)
+		} else {
+			numValues += int64(pages[i].Header.DataPageHeaderV2.NumValues)
+		}
+		totalUncompressedSize += int64(pages[i].Header.UncompressedPageSize) + int64(len(pages[i].RawData)) - int64(pages[i].Header.CompressedPageSize)
+		totalCompressedSize += int64(len(pages[i].RawData))
+		minVal = min(minVal, pages[i].MinVal, &parquetType, &convertedType)
+		maxVal = max(maxVal, pages[i].MaxVal, &parquetType, &convertedType)
+	}
+
+	metaData := parquet.NewColumnMetaData()
+	metaData.Type = pages[0].DataType
+	metaData.Encodings = []parquet.Encoding{
+		parquet.Encoding_RLE,
+		parquet.Encoding_BIT_PACKED,
+		parquet.Encoding_PLAIN,
+		// parquet.Encoding_DELTA_BINARY_PACKED,
+	}
+	metaData.Codec = pages[0].CompressType
+	metaData.NumValues = numValues
+	metaData.TotalCompressedSize = totalCompressedSize
+	metaData.TotalUncompressedSize = totalUncompressedSize
+	metaData.PathInSchema = pages[0].Path
+	metaData.Statistics = parquet.NewStatistics()
+	if maxVal != nil && minVal != nil {
+		tmpBufMin := valueToBytes(minVal, parquetType)
+		tmpBufMax := valueToBytes(maxVal, parquetType)
+
+		if convertedType == parquet.ConvertedType_UTF8 || convertedType == parquet.ConvertedType_DECIMAL {
+			tmpBufMin = tmpBufMin[4:]
+			tmpBufMax = tmpBufMax[4:]
+		}
+
+		metaData.Statistics.Min = tmpBufMin
+		metaData.Statistics.Max = tmpBufMax
+	}
+
+	chunk := new(columnChunk)
+	chunk.Pages = pages
+	chunk.chunkHeader = parquet.NewColumnChunk()
+	chunk.chunkHeader.MetaData = metaData
+	return chunk
+}
+
+func pagesToDictColumnChunk(pages []*page) *columnChunk {
+	if len(pages) < 2 {
+		return nil
+	}
+
+	var numValues, totalUncompressedSize, totalCompressedSize int64
+	minVal := pages[1].MinVal
+	maxVal := pages[1].MaxVal
+	parquetType, convertedType := pages[1].DataTable.Type, pages[1].DataTable.ConvertedType
+
+	for i := 0; i < len(pages); i++ {
+		if pages[i].Header.DataPageHeader != nil {
+			numValues += int64(pages[i].Header.DataPageHeader.NumValues)
+		} else {
+			numValues += int64(pages[i].Header.DataPageHeaderV2.NumValues)
+		}
+		totalUncompressedSize += int64(pages[i].Header.UncompressedPageSize) + int64(len(pages[i].RawData)) - int64(pages[i].Header.CompressedPageSize)
+		totalCompressedSize += int64(len(pages[i].RawData))
+		if i > 0 {
+			minVal = min(minVal, pages[i].MinVal, &parquetType, &convertedType)
+			maxVal = max(maxVal, pages[i].MaxVal, &parquetType, &convertedType)
+		}
+	}
+
+	metaData := parquet.NewColumnMetaData()
+	metaData.Type = pages[1].DataType
+	metaData.Encodings = []parquet.Encoding{
+		parquet.Encoding_RLE,
+		parquet.Encoding_BIT_PACKED,
+		parquet.Encoding_PLAIN,
+		parquet.Encoding_PLAIN_DICTIONARY,
+	}
+	metaData.Codec = pages[1].CompressType
+	metaData.NumValues = numValues
+	metaData.TotalCompressedSize = totalCompressedSize
+	metaData.TotalUncompressedSize = totalUncompressedSize
+	metaData.PathInSchema = pages[1].Path
+	metaData.Statistics = parquet.NewStatistics()
+	if maxVal != nil && minVal != nil {
+		tmpBufMin := valueToBytes(minVal, parquetType)
+		tmpBufMax := valueToBytes(maxVal, parquetType)
+
+		if convertedType == parquet.ConvertedType_UTF8 || convertedType == parquet.ConvertedType_DECIMAL {
+			tmpBufMin = tmpBufMin[4:]
+			tmpBufMax = tmpBufMax[4:]
+		}
+
+		metaData.Statistics.Min = tmpBufMin
+		metaData.Statistics.Max = tmpBufMax
+	}
+
+	chunk := new(columnChunk)
+	chunk.Pages = pages
+	chunk.chunkHeader = parquet.NewColumnChunk()
+	chunk.chunkHeader.MetaData = metaData
+	return chunk
+}
+
+func decodeDictColumnChunk(chunk *columnChunk) {
+	dictPage := chunk.Pages[0]
+	numPages := len(chunk.Pages)
+	for i := 1; i < numPages; i++ {
+		numValues := len(chunk.Pages[i].DataTable.Values)
+		for j := 0; j < numValues; j++ {
+			if chunk.Pages[i].DataTable.Values[j] != nil {
+				index := chunk.Pages[i].DataTable.Values[j].(int64)
+				chunk.Pages[i].DataTable.Values[j] = dictPage.DataTable.Values[index]
+			}
+		}
+	}
+	chunk.Pages = chunk.Pages[1:] // delete the head dict page
+}
diff --git a/vendor/github.com/minio/parquet-go/common.go b/vendor/github.com/minio/parquet-go/common.go
new file mode 100644
index 000000000..f83958a1b
--- /dev/null
+++ b/vendor/github.com/minio/parquet-go/common.go
@@ -0,0 +1,285 @@
+/*
+ * Minio Cloud Storage, (C) 2019 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet
+
+import (
+	"bytes"
+	"reflect"
+
+	"github.com/minio/parquet-go/gen-go/parquet"
+)
+
+func valuesToInterfaces(values interface{}, valueType parquet.Type) (tableValues []interface{}) {
+	switch valueType {
+	case parquet.Type_BOOLEAN:
+		for _, v := range values.([]bool) {
+			tableValues = append(tableValues, v)
+		}
+	case parquet.Type_INT32:
+		for _, v := range values.([]int32) {
+			tableValues = append(tableValues, v)
+		}
+	case parquet.Type_INT64:
+		for _, v := range values.([]int64) {
+			tableValues = append(tableValues, v)
+		}
+	case parquet.Type_FLOAT:
+		for _, v := range values.([]float32) {
+			tableValues = append(tableValues, v)
+		}
+	case parquet.Type_DOUBLE:
+		for _, v := range values.([]float64) {
+			tableValues = append(tableValues, v)
+		}
+	case parquet.Type_INT96, parquet.Type_BYTE_ARRAY, parquet.Type_FIXED_LEN_BYTE_ARRAY:
+		for _, v := range values.([][]byte) {
+			tableValues = append(tableValues, v)
+		}
+	}
+
+	return tableValues
+}
+
+func interfacesToValues(values []interface{}, valueType parquet.Type) interface{} {
+	switch valueType {
+	case parquet.Type_BOOLEAN:
+		bs := make([]bool, len(values))
+		for i := range values {
+			bs[i] = values[i].(bool)
+		}
+		return bs
+	case parquet.Type_INT32:
+		i32s := make([]int32, len(values))
+		for i := range values {
+			i32s[i] = values[i].(int32)
+		}
+		return i32s
+	case parquet.Type_INT64:
+		i64s := make([]int64, len(values))
+		for i := range values {
+			i64s[i] = values[i].(int64)
+		}
+		return i64s
+	case parquet.Type_FLOAT:
+		f32s := make([]float32, len(values))
+		for i := range values {
+			f32s[i] = values[i].(float32)
+		}
+		return f32s
+	case parquet.Type_DOUBLE:
+		f64s := make([]float64, len(values))
+		for i := range values {
+			f64s[i] = values[i].(float64)
+		}
+		return f64s
+	case parquet.Type_INT96, parquet.Type_BYTE_ARRAY, parquet.Type_FIXED_LEN_BYTE_ARRAY:
+		array := make([][]byte, len(values))
+		for i := range values {
+			array[i] = values[i].([]byte)
+		}
+		return array
+	}
+
+	return nil
+}
+
+// sizeOf - get the size of a parquet value
+func sizeOf(value interface{}) (size int32) {
+	v := reflect.ValueOf(value)
+	if v.IsNil() {
+		return size
+	}
+
+	v = v.Elem()
+	switch v.Kind() {
+	case reflect.Bool:
+		size = 1
+	case reflect.Int32, reflect.Float32:
+		size = 4
+	case reflect.Int64, reflect.Float64:
+		size = 8
+	case reflect.Slice:
+		size = int32(v.Len())
+	}
+
+	return size
+}
+
+func lessThanBytes(a, b []byte, littleEndianOrder, signed bool) bool {
+	alen, blen := len(a), len(b)
+
+	if littleEndianOrder {
+		// Reverse a
+		for i, j := 0, len(a)-1; i < j; i, j = i+1, j-1 {
+			a[i], a[j] = a[j], a[i]
+		}
+
+		// Reverse b
+		for i, j := 0, len(b)-1; i < j; i, j = i+1, j-1 {
+			b[i], b[j] = b[j], b[i]
+		}
+	}
+
+	// Make a and b equal-sized arrays.
+	if alen < blen {
+		preBytes := make([]byte, blen-alen)
+		if signed && a[0]&0x80 == 0x80 {
+			for i := range preBytes {
+				preBytes[i] = 0xFF
+			}
+		}
+
+		a = append(preBytes, a...)
+	}
+
+	if alen > blen {
+		preBytes := make([]byte, alen-blen)
+		if signed && b[0]&0x80 == 0x80 {
+			for i := range preBytes {
+				preBytes[i] = 0xFF
+			}
+		}
+
+		b = append(preBytes, b...)
+	}
+
+	if signed {
+		// If ((BYTE & 0x80) == 0x80), BYTE is negative. Hence negative logic is used.
+		if a[0]&0x80 > b[0]&0x80 {
+			return true
+		}
+
+		if a[0]&0x80 < b[0]&0x80 {
+			return false
+		}
+	}
+
+	for i := 0; i < len(a); i++ {
+		if a[i] < b[i] {
+			return true
+		}
+
+		if a[i] > b[i] {
+			return false
+		}
+	}
+
+	return false
+}
+
+// lessThan - returns whether a is less than b.
+func lessThan(a, b interface{}, dataType *parquet.Type, convertedType *parquet.ConvertedType) bool {
+	if a == nil {
+		if b == nil {
+			return false
+		}
+
+		return true
+	}
+
+	if b == nil {
+		return false
+	}
+
+	switch *dataType {
+	case parquet.Type_BOOLEAN:
+		return !a.(bool) && b.(bool)
+
+	case parquet.Type_INT32:
+		if convertedType != nil {
+			switch *convertedType {
+			case parquet.ConvertedType_UINT_8, parquet.ConvertedType_UINT_16, parquet.ConvertedType_UINT_32:
+				return uint32(a.(int32)) < uint32(b.(int32))
+			}
+		}
+		return a.(int32) < b.(int32)
+
+	case parquet.Type_INT64:
+		if convertedType != nil && *convertedType == parquet.ConvertedType_UINT_64 {
+			return uint64(a.(int64)) < uint64(b.(int64))
+		}
+		return a.(int64) < b.(int64)
+
+	case parquet.Type_INT96:
+		ab := a.([]byte)
+		bb := b.([]byte)
+
+		// If ((BYTE & 0x80) == 0x80), BYTE is negative. Hence negative logic is used.
+		if ab[11]&0x80 > bb[11]&0x80 {
+			return true
+		}
+
+		if ab[11]&0x80 < bb[11]&0x80 {
+			return false
+		}
+
+		for i := 11; i >= 0; i-- {
+			if ab[i] < bb[i] {
+				return true
+			}
+
+			if ab[i] > bb[i] {
+				return false
+			}
+		}
+
+		return false
+
+	case parquet.Type_FLOAT:
+		return a.(float32) < b.(float32)
+
+	case parquet.Type_DOUBLE:
+		return a.(float64) < b.(float64)
+
+	case parquet.Type_BYTE_ARRAY, parquet.Type_FIXED_LEN_BYTE_ARRAY:
+		return bytes.Compare(a.([]byte), b.([]byte)) == -1
+	}
+
+	return false
+}
+
+func min(a, b interface{}, dataType *parquet.Type, convertedType *parquet.ConvertedType) interface{} {
+	if a == nil {
+		return b
+	}
+
+	if b == nil {
+		return a
+	}
+
+	if lessThan(a, b, dataType, convertedType) {
+		return a
+	}
+
+	return b
+}
+
+func max(a, b interface{}, dataType *parquet.Type, convertedType *parquet.ConvertedType) interface{} {
+	if a == nil {
+		return b
+	}
+
+	if b == nil {
+		return a
+	}
+
+	if lessThan(a, b, dataType, convertedType) {
+		return b
+	}
+
+	return a
+}
diff --git a/vendor/github.com/minio/parquet-go/compression.go b/vendor/github.com/minio/parquet-go/compression.go
index 148824e56..07e96cb6c 100644
--- a/vendor/github.com/minio/parquet-go/compression.go
+++ b/vendor/github.com/minio/parquet-go/compression.go
@@ -29,6 +29,60 @@ import (
 
 type compressionCodec parquet.CompressionCodec
 
+func (c compressionCodec) compress(buf []byte) ([]byte, error) {
+	switch parquet.CompressionCodec(c) {
+	case parquet.CompressionCodec_UNCOMPRESSED:
+		return buf, nil
+
+	case parquet.CompressionCodec_SNAPPY:
+		return snappy.Encode(nil, buf), nil
+
+	case parquet.CompressionCodec_GZIP:
+		byteBuf := new(bytes.Buffer)
+		writer := gzip.NewWriter(byteBuf)
+		n, err := writer.Write(buf)
+		if err != nil {
+			return nil, err
+		}
+		if n != len(buf) {
+			return nil, fmt.Errorf("short write")
+		}
+
+		if err = writer.Flush(); err != nil {
+			return nil, err
+		}
+
+		if err = writer.Close(); err != nil {
+			return nil, err
+		}
+
+		return byteBuf.Bytes(), nil
+
+	case parquet.CompressionCodec_LZ4:
+		byteBuf := new(bytes.Buffer)
+		writer := lz4.NewWriter(byteBuf)
+		n, err := writer.Write(buf)
+		if err != nil {
+			return nil, err
+		}
+		if n != len(buf) {
+			return nil, fmt.Errorf("short write")
+		}
+
+		if err = writer.Flush(); err != nil {
+			return nil, err
+		}
+
+		if err = writer.Close(); err != nil {
+			return nil, err
+		}
+
+		return byteBuf.Bytes(), nil
+	}
+
+	return nil, fmt.Errorf("invalid compression codec %v", c)
+}
+
 func (c compressionCodec) uncompress(buf []byte) ([]byte, error) {
 	switch parquet.CompressionCodec(c) {
 	case parquet.CompressionCodec_UNCOMPRESSED:
diff --git a/vendor/github.com/minio/parquet-go/decode.go b/vendor/github.com/minio/parquet-go/decode.go
index b3bffb201..d2a0888cc 100644
--- a/vendor/github.com/minio/parquet-go/decode.go
+++ b/vendor/github.com/minio/parquet-go/decode.go
@@ -18,33 +18,12 @@ package parquet
 
 import (
 	"bytes"
-	"encoding/binary"
 	"fmt"
 	"math"
 
 	"github.com/minio/parquet-go/gen-go/parquet"
 )
 
-func uint32ToBytes(v uint32) []byte {
-	buf := make([]byte, 4)
-	binary.LittleEndian.PutUint32(buf, v)
-	return buf
-}
-
-func uint64ToBytes(v uint64) []byte {
-	buf := make([]byte, 8)
-	binary.LittleEndian.PutUint64(buf, v)
-	return buf
-}
-
-func bytesToUint32(buf []byte) uint32 {
-	return binary.LittleEndian.Uint32(buf)
-}
-
-func bytesToUint64(buf []byte) uint64 {
-	return binary.LittleEndian.Uint64(buf)
-}
-
 func i64sToi32s(i64s []int64) (i32s []int32) {
 	i32s = make([]int32, len(i64s))
 	for i := range i64s {
diff --git a/vendor/github.com/minio/parquet-go/encode.go b/vendor/github.com/minio/parquet-go/encode.go
new file mode 100644
index 000000000..4931ef6fd
--- /dev/null
+++ b/vendor/github.com/minio/parquet-go/encode.go
@@ -0,0 +1,508 @@
+/*
+ * Minio Cloud Storage, (C) 2019 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet
+
+import (
+	"bytes"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"math"
+
+	"github.com/minio/parquet-go/gen-go/parquet"
+)
+
+func boolsToBytes(bs []bool) []byte {
+	size := (len(bs) + 7) / 8
+	result := make([]byte, size)
+	for i := range bs {
+		if bs[i] {
+			result[i/8] |= 1 << uint32(i%8)
+		}
+	}
+
+	return result
+}
+
+func int32sToBytes(i32s []int32) []byte {
+	buf := make([]byte, 4*len(i32s))
+	for i, i32 := range i32s {
+		binary.LittleEndian.PutUint32(buf[i*4:], uint32(i32))
+	}
+	return buf
+}
+
+func int64sToBytes(i64s []int64) []byte {
+	buf := make([]byte, 8*len(i64s))
+	for i, i64 := range i64s {
+		binary.LittleEndian.PutUint64(buf[i*8:], uint64(i64))
+	}
+	return buf
+}
+
+func float32sToBytes(f32s []float32) []byte {
+	buf := make([]byte, 4*len(f32s))
+	for i, f32 := range f32s {
+		binary.LittleEndian.PutUint32(buf[i*4:], math.Float32bits(f32))
+	}
+	return buf
+}
+
+func float64sToBytes(f64s []float64) []byte {
+	buf := make([]byte, 8*len(f64s))
+	for i, f64 := range f64s {
+		binary.LittleEndian.PutUint64(buf[i*8:], math.Float64bits(f64))
+	}
+	return buf
+}
+
+func byteSlicesToBytes(byteSlices [][]byte) []byte {
+	buf := new(bytes.Buffer)
+	for _, s := range byteSlices {
+		if err := binary.Write(buf, binary.LittleEndian, uint32(len(s))); err != nil {
+			panic(err)
+		}
+
+		if _, err := buf.Write(s); err != nil {
+			panic(err)
+		}
+	}
+
+	return buf.Bytes()
+}
+
+func byteArraysToBytes(arrayList [][]byte) []byte {
+	buf := new(bytes.Buffer)
+	arrayLen := -1
+	for _, array := range arrayList {
+		if arrayLen != -1 && len(array) != arrayLen {
+			panic(errors.New("arrays in the list do not have the same length"))
+		}
+
+		arrayLen = len(array)
+		if _, err := buf.Write(array); err != nil {
+			panic(err)
+		}
+	}
+
+	return buf.Bytes()
+}
+
+func int96sToBytes(i96s [][]byte) []byte {
+	return byteArraysToBytes(i96s)
+}
+
+func valuesToBytes(values interface{}, dataType parquet.Type) []byte {
+	switch dataType {
+	case parquet.Type_BOOLEAN:
+		return boolsToBytes(values.([]bool))
+	case parquet.Type_INT32:
+		return int32sToBytes(values.([]int32))
+	case parquet.Type_INT64:
+		return int64sToBytes(values.([]int64))
+	case parquet.Type_INT96:
+		return int96sToBytes(values.([][]byte))
+	case parquet.Type_FLOAT:
+		return float32sToBytes(values.([]float32))
+	case parquet.Type_DOUBLE:
+		return float64sToBytes(values.([]float64))
+	case parquet.Type_BYTE_ARRAY:
+		return byteSlicesToBytes(values.([][]byte))
+	case parquet.Type_FIXED_LEN_BYTE_ARRAY:
+		return byteArraysToBytes(values.([][]byte))
+	}
+
+	return []byte{}
+}
+
+func valueToBytes(value interface{}, dataType parquet.Type) []byte {
+	var values interface{}
+	switch dataType {
+	case parquet.Type_BOOLEAN:
+		values = []bool{value.(bool)}
+	case parquet.Type_INT32:
+		values = []int32{value.(int32)}
+	case parquet.Type_INT64:
+		values = []int64{value.(int64)}
+	case parquet.Type_INT96:
+		values = [][]byte{value.([]byte)}
+	case parquet.Type_FLOAT:
+		values = []float32{value.(float32)}
+	case parquet.Type_DOUBLE:
+		values = []float64{value.(float64)}
+	case parquet.Type_BYTE_ARRAY, parquet.Type_FIXED_LEN_BYTE_ARRAY:
+		values = [][]byte{value.([]byte)}
+	}
+
+	return valuesToBytes(values, dataType)
+}
+
+func unsignedVarIntToBytes(ui64 uint64) []byte {
+	size := (getBitWidth(ui64) + 6) / 7
+	if size == 0 {
+		return []byte{0}
+	}
+
+	buf := make([]byte, size)
+	for i := uint64(0); i < size; i++ {
+		buf[i] = byte(ui64&0x7F) | 0x80
+		ui64 >>= 7
+	}
+	buf[size-1] &= 0x7F
+
+	return buf
+}
+
+func valuesToRLEBytes(values interface{}, bitWidth int32, valueType parquet.Type) []byte {
+	vals := valuesToInterfaces(values, valueType)
+	result := []byte{}
+	j := 0
+	for i := 0; i < len(vals); i = j {
+		for j = i + 1; j < len(vals) && vals[i] == vals[j]; j++ {
+		}
+		headerBytes := unsignedVarIntToBytes(uint64((j - i) << 1))
+		result = append(result, headerBytes...)
+
+		valBytes := valueToBytes(vals[i], valueType)
+		byteCount := (bitWidth + 7) / 8
+		result = append(result, valBytes[:byteCount]...)
+	}
+
+	return result
+}
+
+func valuesToRLEBitPackedHybridBytes(values interface{}, bitWidth int32, dataType parquet.Type) []byte {
+	rleBytes := valuesToRLEBytes(values, bitWidth, dataType)
+	lenBytes := valueToBytes(int32(len(rleBytes)), parquet.Type_INT32)
+	return append(lenBytes, rleBytes...)
+}
+
+func valuesToBitPackedBytes(values interface{}, bitWidth int64, withHeader bool, dataType parquet.Type) []byte {
+	var i64s []int64
+	switch dataType {
+	case parquet.Type_BOOLEAN:
+		bs := values.([]bool)
+		i64s = make([]int64, len(bs))
+		for i := range bs {
+			if bs[i] {
+				i64s[i] = 1
+			}
+		}
+	case parquet.Type_INT32:
+		i32s := values.([]int32)
+		i64s = make([]int64, len(i32s))
+		for i := range i32s {
+			i64s[i] = int64(i32s[i])
+		}
+	case parquet.Type_INT64:
+		i64s = values.([]int64)
+	default:
+		panic(fmt.Errorf("data type %v is not supported for bit packing", dataType))
+	}
+
+	if len(i64s) == 0 {
+		return nil
+	}
+
+	var valueByte byte
+	bitsSet := uint64(0)
+	bitsNeeded := uint64(8)
+	bitsToSet := uint64(bitWidth)
+	value := i64s[0]
+
+	valueBytes := []byte{}
+	for i := 0; i < len(i64s); {
+		if bitsToSet >= bitsNeeded {
+			valueByte |= byte(((value >> bitsSet) & ((1 << bitsNeeded) - 1)) << (8 - bitsNeeded))
+			valueBytes = append(valueBytes, valueByte)
+			bitsToSet -= bitsNeeded
+			bitsSet += bitsNeeded
+
+			bitsNeeded = 8
+			valueByte = 0
+
+			if bitsToSet <= 0 && (i+1) < len(i64s) {
+				i++
+				value = i64s[i]
+				bitsToSet = uint64(bitWidth)
+				bitsSet = 0
+			}
+		} else {
+			valueByte |= byte((value >> bitsSet) << (8 - bitsNeeded))
+			i++
+
+			if i < len(i64s) {
+				value = i64s[i]
+			}
+
+			bitsNeeded -= bitsToSet
+			bitsToSet = uint64(bitWidth)
+			bitsSet = 0
+		}
+	}
+
+	if withHeader {
+		header := uint64(((len(i64s) / 8) << 1) | 1)
+		headerBytes := unsignedVarIntToBytes(header)
+		return append(headerBytes, valueBytes...)
+	}
+
+	return valueBytes
+}
+
+func valuesToBitPackedDeprecatedBytes(values interface{}, bitWidth int64, dataType parquet.Type) []byte {
+	var ui64s []uint64
+	switch dataType {
+	case parquet.Type_INT32:
+		i32s := values.([]int32)
+		ui64s = make([]uint64, len(i32s))
+		for i := range i32s {
+			ui64s[i] = uint64(i32s[i])
+		}
+	case parquet.Type_INT64:
+		i64s := values.([]int64)
+		ui64s = make([]uint64, len(i64s))
+		for i := range i64s {
+			ui64s[i] = uint64(i64s[i])
+		}
+	default:
+		panic(fmt.Errorf("data type %v is not supported for bit packing deprecated", dataType))
+	}
+
+	if len(ui64s) == 0 {
+		return nil
+	}
+
+	result := []byte{}
+	var curByte byte
+	curNeed := uint64(8)
+	valBitLeft := uint64(bitWidth)
+	val := ui64s[0] << uint64(64-bitWidth)
+
+	for i := 0; i < len(ui64s); {
+		if valBitLeft > curNeed {
+			mask := uint64(((1 << curNeed) - 1) << (64 - curNeed))
+			curByte |= byte((val & mask) >> (64 - curNeed))
+			val <<= curNeed
+
+			valBitLeft -= curNeed
+			result = append(result, curByte)
+			curByte = 0
+			curNeed = 8
+		} else {
+			curByte |= byte(val >> (64 - curNeed))
+			curNeed -= valBitLeft
+			if curNeed == 0 {
+				result = append(result, curByte)
+				curByte = 0
+				curNeed = 8
+			}
+
+			valBitLeft = uint64(bitWidth)
+			i++
+			if i < len(ui64s) {
+				val = ui64s[i] << uint64(64-bitWidth)
+			}
+		}
+	}
+	return result
+}
+
+const (
+	blockSize     = 128
+	subBlockSize  = 32
+	subBlockCount = blockSize / subBlockSize
+)
+
+var (
+	blockSizeBytes     = unsignedVarIntToBytes(blockSize)
+	subBlockCountBytes = unsignedVarIntToBytes(subBlockCount)
+)
+
+func int32ToDeltaBytes(i32s []int32) []byte {
+	getValue := func(i32 int32) uint64 {
+		return uint64((i32 >> 31) ^ (i32 << 1))
+	}
+
+	result := append([]byte{}, blockSizeBytes...)
+	result = append(result, subBlockCountBytes...)
+	result = append(result, unsignedVarIntToBytes(uint64(len(i32s)))...)
+	result = append(result, unsignedVarIntToBytes(getValue(i32s[0]))...)
+
+	for i := 1; i < len(i32s); {
+		block := []int32{}
+		minDelta := int32(0x7FFFFFFF)
+
+		for ; i < len(i32s) && len(block) < blockSize; i++ {
+			delta := i32s[i] - i32s[i-1]
+			block = append(block, delta)
+			if delta < minDelta {
+				minDelta = delta
+			}
+		}
+
+		for len(block) < blockSize {
+			block = append(block, minDelta)
+		}
+
+		bitWidths := make([]byte, subBlockCount)
+		for j := 0; j < subBlockCount; j++ {
+			maxValue := int32(0)
+			for k := j * subBlockSize; k < (j+1)*subBlockSize; k++ {
+				block[k] -= minDelta
+				if block[k] > maxValue {
+					maxValue = block[k]
+				}
+			}
+
+			bitWidths[j] = byte(getBitWidth(uint64(maxValue)))
+		}
+
+		minDeltaZigZag := getValue(minDelta)
+		result = append(result, unsignedVarIntToBytes(minDeltaZigZag)...)
+		result = append(result, bitWidths...)
+
+		for j := 0; j < subBlockCount; j++ {
+			bitPacked := valuesToBitPackedBytes(
+				block[j*subBlockSize:(j+1)*subBlockSize],
+				int64(bitWidths[j]),
+				false,
+				parquet.Type_INT32,
+			)
+			result = append(result, bitPacked...)
+		}
+	}
+
+	return result
+}
+
+func int64ToDeltaBytes(i64s []int64) []byte {
+	getValue := func(i64 int64) uint64 {
+		return uint64((i64 >> 63) ^ (i64 << 1))
+	}
+
+	result := append([]byte{}, blockSizeBytes...)
+	result = append(result, subBlockCountBytes...)
+	result = append(result, unsignedVarIntToBytes(uint64(len(i64s)))...)
+	result = append(result, unsignedVarIntToBytes(getValue(i64s[0]))...)
+
+	for i := 1; i < len(i64s); {
+		block := []int64{}
+		minDelta := int64(0x7FFFFFFFFFFFFFFF)
+
+		for ; i < len(i64s) && len(block) < blockSize; i++ {
+			delta := i64s[i] - i64s[i-1]
+			block = append(block, delta)
+			if delta < minDelta {
+				minDelta = delta
+			}
+		}
+
+		for len(block) < blockSize {
+			block = append(block, minDelta)
+		}
+
+		bitWidths := make([]byte, subBlockCount)
+		for j := 0; j < subBlockCount; j++ {
+			maxValue := int64(0)
+			for k := j * subBlockSize; k < (j+1)*subBlockSize; k++ {
+				block[k] -= minDelta
+				if block[k] > maxValue {
+					maxValue = block[k]
+				}
+			}
+
+			bitWidths[j] = byte(getBitWidth(uint64(maxValue)))
+		}
+
+		minDeltaZigZag := getValue(minDelta)
+		result = append(result, unsignedVarIntToBytes(minDeltaZigZag)...)
+		result = append(result, bitWidths...)
+
+		for j := 0; j < subBlockCount; j++ {
+			bitPacked := valuesToBitPackedBytes(
+				block[j*subBlockSize:(j+1)*subBlockSize],
+				int64(bitWidths[j]),
+				false,
+				parquet.Type_INT64,
+			)
+			result = append(result, bitPacked...)
+		}
+	}
+
+	return result
+}
+
+func valuesToDeltaBytes(values interface{}, dataType parquet.Type) []byte {
+	switch dataType {
+	case parquet.Type_INT32:
+		return int32ToDeltaBytes(values.([]int32))
+	case parquet.Type_INT64:
+		return int64ToDeltaBytes(values.([]int64))
+	}
+
+	return nil
+}
+
+func stringsToDeltaLengthByteArrayBytes(strs []string) []byte {
+	lengths := make([]int32, len(strs))
+	for i, s := range strs {
+		lengths[i] = int32(len(s))
+	}
+
+	result := int32ToDeltaBytes(lengths)
+	for _, s := range strs {
+		result = append(result, []byte(s)...)
+	}
+
+	return result
+}
+
+func stringsToDeltaByteArrayBytes(strs []string) []byte {
+	prefixLengths := make([]int32, len(strs))
+	suffixes := make([]string, len(strs))
+
+	var i, j int
+	for i = 1; i < len(strs); i++ {
+		for j = 0; j < len(strs[i-1]) && j < len(strs[i]); j++ {
+			if strs[i-1][j] != strs[i][j] {
+				break
+			}
+		}
+
+		prefixLengths[i] = int32(j)
+		suffixes[i] = strs[i][j:]
+	}
+
+	result := int32ToDeltaBytes(prefixLengths)
+	return append(result, stringsToDeltaLengthByteArrayBytes(suffixes)...)
+}
+
+func encodeValues(values interface{}, dataType parquet.Type, encoding parquet.Encoding, bitWidth int32) []byte {
+	switch encoding {
+	case parquet.Encoding_RLE:
+		return valuesToRLEBitPackedHybridBytes(values, bitWidth, dataType)
+	case parquet.Encoding_DELTA_BINARY_PACKED:
+		return valuesToDeltaBytes(values, dataType)
+	case parquet.Encoding_DELTA_BYTE_ARRAY:
+		return stringsToDeltaByteArrayBytes(values.([]string))
+	case parquet.Encoding_DELTA_LENGTH_BYTE_ARRAY:
+		return stringsToDeltaLengthByteArrayBytes(values.([]string))
+	}
+
+	return valuesToBytes(values, dataType)
+}
diff --git a/vendor/github.com/minio/parquet-go/endian.go b/vendor/github.com/minio/parquet-go/endian.go
new file mode 100644
index 000000000..b0f208077
--- /dev/null
+++ b/vendor/github.com/minio/parquet-go/endian.go
@@ -0,0 +1,50 @@
+/*
+ * Minio Cloud Storage, (C) 2019 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet
+
+import (
+	"encoding/binary"
+	"math"
+)
+
+func uint32ToBytes(v uint32) []byte {
+	buf := make([]byte, 4)
+	binary.LittleEndian.PutUint32(buf, v)
+	return buf
+}
+
+func uint64ToBytes(v uint64) []byte {
+	buf := make([]byte, 8)
+	binary.LittleEndian.PutUint64(buf, v)
+	return buf
+}
+
+func float32ToBytes(v float32) []byte {
+	return uint32ToBytes(math.Float32bits(v))
+}
+
+func float64ToBytes(v float64) []byte {
+	return uint64ToBytes(math.Float64bits(v))
+}
+
+func bytesToUint32(buf []byte) uint32 {
+	return binary.LittleEndian.Uint32(buf)
+}
+
+func bytesToUint64(buf []byte) uint64 {
+	return binary.LittleEndian.Uint64(buf)
+}
diff --git a/vendor/github.com/minio/parquet-go/go.mod b/vendor/github.com/minio/parquet-go/go.mod
deleted file mode 100644
index b152cf5b4..000000000
--- a/vendor/github.com/minio/parquet-go/go.mod
+++ /dev/null
@@ -1,8 +0,0 @@
-module github.com/minio/parquet-go
-
-require (
-	git.apache.org/thrift.git v0.12.0
-	github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
-	github.com/minio/minio-go v6.0.14+incompatible
-	github.com/pierrec/lz4 v2.0.5+incompatible
-)
diff --git a/vendor/github.com/minio/parquet-go/go.sum b/vendor/github.com/minio/parquet-go/go.sum
deleted file mode 100644
index d2e612fd7..000000000
--- a/vendor/github.com/minio/parquet-go/go.sum
+++ /dev/null
@@ -1,8 +0,0 @@
-git.apache.org/thrift.git v0.12.0 h1:CMxsZlAmxKs+VAZMlDDL0wXciMblJcutQbEe3A9CYUM=
-git.apache.org/thrift.git v0.12.0/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
-github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
-github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
-github.com/minio/minio-go v6.0.14+incompatible h1:fnV+GD28LeqdN6vT2XdGKW8Qe/IfjJDswNVuni6km9o=
-github.com/minio/minio-go v6.0.14+incompatible/go.mod h1:7guKYtitv8dktvNUGrhzmNlA5wrAABTQXCoesZdFQO8=
-github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
-github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
diff --git a/vendor/github.com/minio/parquet-go/page.go b/vendor/github.com/minio/parquet-go/page.go
index fdee8f351..6a5551733 100644
--- a/vendor/github.com/minio/parquet-go/page.go
+++ b/vendor/github.com/minio/parquet-go/page.go
@@ -18,6 +18,7 @@ package parquet
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"strings"
 
@@ -312,7 +313,7 @@ type page struct {
 func newPage() *page {
 	return &page{
 		Header:   parquet.NewPageHeader(),
-		PageSize: 8 * 1024,
+		PageSize: defaultPageSize,
 	}
 }
 
@@ -529,3 +530,418 @@ func (page *page) getValueFromRawData(columnNameIndexMap map[string]int, schemaE
 
 	return fmt.Errorf("unsupported page type %v", pageType)
 }
+
+func (page *page) toDataPage(compressType parquet.CompressionCodec) []byte {
+	values := []interface{}{}
+	for i := range page.DataTable.DefinitionLevels {
+		if page.DataTable.DefinitionLevels[i] == page.DataTable.MaxDefinitionLevel {
+			values = append(values, page.DataTable.Values[i])
+		}
+	}
+	valuesBytes := encodeValues(interfacesToValues(values, page.DataTable.Type), page.DataType, page.DataTable.Encoding, page.DataTable.BitWidth)
+
+	var defLevelBytes []byte
+	if page.DataTable.MaxDefinitionLevel > 0 {
+		defLevels := make([]int64, len(page.DataTable.DefinitionLevels))
+		for i := range page.DataTable.DefinitionLevels {
+			defLevels[i] = int64(page.DataTable.DefinitionLevels[i])
+		}
+		defLevelBytes = valuesToRLEBitPackedHybridBytes(
+			defLevels,
+			int32(getBitWidth(uint64(page.DataTable.MaxDefinitionLevel))),
+			parquet.Type_INT64,
+		)
+	}
+
+	var repLevelBytes []byte
+	if page.DataTable.MaxRepetitionLevel > 0 {
+		repLevels := make([]int64, len(page.DataTable.DefinitionLevels))
+		for i := range page.DataTable.DefinitionLevels {
+			repLevels[i] = int64(page.DataTable.RepetitionLevels[i])
+		}
+		repLevelBytes = valuesToRLEBitPackedHybridBytes(
+			repLevels,
+			int32(getBitWidth(uint64(page.DataTable.MaxRepetitionLevel))),
+			parquet.Type_INT64,
+		)
+	}
+
+	data := repLevelBytes
+	data = append(data, defLevelBytes...)
+	data = append(data, valuesBytes...)
+
+	compressedData, err := compressionCodec(compressType).compress(data)
+	if err != nil {
+		panic(err)
+	}
+
+	page.Header = parquet.NewPageHeader()
+	page.Header.Type = parquet.PageType_DATA_PAGE
+	page.Header.CompressedPageSize = int32(len(compressedData))
+	page.Header.UncompressedPageSize = int32(len(data))
+	page.Header.DataPageHeader = parquet.NewDataPageHeader()
+	page.Header.DataPageHeader.NumValues = int32(len(page.DataTable.DefinitionLevels))
+	page.Header.DataPageHeader.DefinitionLevelEncoding = parquet.Encoding_RLE
+	page.Header.DataPageHeader.RepetitionLevelEncoding = parquet.Encoding_RLE
+	page.Header.DataPageHeader.Encoding = page.DataTable.Encoding
+	page.Header.DataPageHeader.Statistics = parquet.NewStatistics()
+	if page.MaxVal != nil {
+		tmpBuf := valueToBytes(page.MaxVal, page.DataType)
+		if page.DataType == parquet.Type_BYTE_ARRAY {
+			switch page.DataTable.ConvertedType {
+			case parquet.ConvertedType_UTF8, parquet.ConvertedType_DECIMAL:
+				tmpBuf = tmpBuf[4:]
+			}
+		}
+		page.Header.DataPageHeader.Statistics.Max = tmpBuf
+	}
+	if page.MinVal != nil {
+		tmpBuf := valueToBytes(page.MinVal, page.DataType)
+		if page.DataType == parquet.Type_BYTE_ARRAY {
+			switch page.DataTable.ConvertedType {
+			case parquet.ConvertedType_UTF8, parquet.ConvertedType_DECIMAL:
+				tmpBuf = tmpBuf[4:]
+			}
+		}
+		page.Header.DataPageHeader.Statistics.Min = tmpBuf
+	}
+
+	ts := thrift.NewTSerializer()
+	ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
+	pageHeaderBytes, err := ts.Write(context.TODO(), page.Header)
+	if err != nil {
+		panic(err)
+	}
+
+	page.RawData = append(pageHeaderBytes, compressedData...)
+	return page.RawData
+}
+
+func (page *page) toDataPageV2(compressType parquet.CompressionCodec) []byte {
+	values := []interface{}{}
+	for i := range page.DataTable.DefinitionLevels {
+		if page.DataTable.DefinitionLevels[i] == page.DataTable.MaxDefinitionLevel {
+			values = append(values, page.DataTable.Values[i])
+		}
+	}
+	valuesBytes := encodeValues(values, page.DataType, page.DataTable.Encoding, page.DataTable.BitWidth)
+
+	var defLevelBytes []byte
+	if page.DataTable.MaxDefinitionLevel > 0 {
+		defLevels := make([]int64, len(page.DataTable.DefinitionLevels))
+		for i := range page.DataTable.DefinitionLevels {
+			defLevels[i] = int64(page.DataTable.DefinitionLevels[i])
+		}
+		defLevelBytes = valuesToRLEBytes(
+			defLevels,
+			int32(getBitWidth(uint64(page.DataTable.MaxDefinitionLevel))),
+			parquet.Type_INT64,
+		)
+	}
+
+	var repLevelBytes []byte
+	numRows := int32(0)
+	if page.DataTable.MaxRepetitionLevel > 0 {
+		repLevels := make([]int64, len(page.DataTable.DefinitionLevels))
+		for i := range page.DataTable.DefinitionLevels {
+			repLevels[i] = int64(page.DataTable.RepetitionLevels[i])
+			if page.DataTable.RepetitionLevels[i] == 0 {
+				numRows++
+			}
+		}
+		repLevelBytes = valuesToRLEBytes(
+			repLevels,
+			int32(getBitWidth(uint64(page.DataTable.MaxRepetitionLevel))),
+			parquet.Type_INT64,
+		)
+	}
+
+	compressedData, err := compressionCodec(compressType).compress(valuesBytes)
+	if err != nil {
+		panic(err)
+	}
+
+	page.Header = parquet.NewPageHeader()
+	page.Header.Type = parquet.PageType_DATA_PAGE_V2
+	page.Header.CompressedPageSize = int32(len(compressedData) + len(defLevelBytes) + len(repLevelBytes))
+	page.Header.UncompressedPageSize = int32(len(valuesBytes) + len(defLevelBytes) + len(repLevelBytes))
+	page.Header.DataPageHeaderV2 = parquet.NewDataPageHeaderV2()
+	page.Header.DataPageHeaderV2.NumValues = int32(len(page.DataTable.Values))
+	page.Header.DataPageHeaderV2.NumNulls = page.Header.DataPageHeaderV2.NumValues - int32(len(values))
+	page.Header.DataPageHeaderV2.NumRows = numRows
+	page.Header.DataPageHeaderV2.Encoding = page.DataTable.Encoding
+	page.Header.DataPageHeaderV2.DefinitionLevelsByteLength = int32(len(defLevelBytes))
+	page.Header.DataPageHeaderV2.RepetitionLevelsByteLength = int32(len(repLevelBytes))
+	page.Header.DataPageHeaderV2.IsCompressed = true
+
+	page.Header.DataPageHeaderV2.Statistics = parquet.NewStatistics()
+	if page.MaxVal != nil {
+		tmpBuf := valueToBytes(page.MaxVal, page.DataType)
+		if page.DataType == parquet.Type_BYTE_ARRAY {
+			switch page.DataTable.ConvertedType {
+			case parquet.ConvertedType_UTF8, parquet.ConvertedType_DECIMAL:
+				tmpBuf = tmpBuf[4:]
+			}
+		}
+		page.Header.DataPageHeaderV2.Statistics.Max = tmpBuf
+	}
+	if page.MinVal != nil {
+		tmpBuf := valueToBytes(page.MinVal, page.DataType)
+		if page.DataType == parquet.Type_BYTE_ARRAY {
+			switch page.DataTable.ConvertedType {
+			case parquet.ConvertedType_UTF8, parquet.ConvertedType_DECIMAL:
+				tmpBuf = tmpBuf[4:]
+			}
+		}
+		page.Header.DataPageHeaderV2.Statistics.Min = tmpBuf
+	}
+
+	ts := thrift.NewTSerializer()
+	ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
+	pageHeaderBytes, err := ts.Write(context.TODO(), page.Header)
+	if err != nil {
+		panic(err)
+	}
+
+	page.RawData = append(pageHeaderBytes, repLevelBytes...)
+	page.RawData = append(page.RawData, defLevelBytes...)
+	page.RawData = append(page.RawData, compressedData...)
+
+	return page.RawData
+}
+
+func (page *page) toDictPage(compressType parquet.CompressionCodec, dataType parquet.Type) []byte {
+	valuesBytes := valuesToBytes(page.DataTable.Values, dataType)
+	compressedData, err := compressionCodec(compressType).compress(valuesBytes)
+	if err != nil {
+		panic(err)
+	}
+
+	page.Header = parquet.NewPageHeader()
+	page.Header.Type = parquet.PageType_DICTIONARY_PAGE
+	page.Header.CompressedPageSize = int32(len(compressedData))
+	page.Header.UncompressedPageSize = int32(len(valuesBytes))
+	page.Header.DictionaryPageHeader = parquet.NewDictionaryPageHeader()
+	page.Header.DictionaryPageHeader.NumValues = int32(len(page.DataTable.Values))
+	page.Header.DictionaryPageHeader.Encoding = parquet.Encoding_PLAIN
+
+	ts := thrift.NewTSerializer()
+	ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
+	pageHeaderBytes, err := ts.Write(context.TODO(), page.Header)
+	if err != nil {
+		panic(err)
+	}
+
+	page.RawData = append(pageHeaderBytes, compressedData...)
+	return page.RawData
+}
+
+func (page *page) toDictDataPage(compressType parquet.CompressionCodec, bitWidth int32) []byte {
+	valuesBytes := append([]byte{byte(bitWidth)}, valuesToRLEBytes(page.DataTable.Values, bitWidth, parquet.Type_INT32)...)
+
+	var defLevelBytes []byte
+	if page.DataTable.MaxDefinitionLevel > 0 {
+		defLevels := make([]int64, len(page.DataTable.DefinitionLevels))
+		for i := range page.DataTable.DefinitionLevels {
+			defLevels[i] = int64(page.DataTable.DefinitionLevels[i])
+		}
+		defLevelBytes = valuesToRLEBitPackedHybridBytes(
+			defLevels,
+			int32(getBitWidth(uint64(page.DataTable.MaxDefinitionLevel))),
+			parquet.Type_INT64,
+		)
+	}
+
+	var repLevelBytes []byte
+	if page.DataTable.MaxRepetitionLevel > 0 {
+		repLevels := make([]int64, len(page.DataTable.DefinitionLevels))
+		for i := range page.DataTable.DefinitionLevels {
+			repLevels[i] = int64(page.DataTable.RepetitionLevels[i])
+		}
+		repLevelBytes = valuesToRLEBitPackedHybridBytes(
+			repLevels,
+			int32(getBitWidth(uint64(page.DataTable.MaxRepetitionLevel))),
+			parquet.Type_INT64,
+		)
+	}
+
+	data := append(repLevelBytes, defLevelBytes...)
+	data = append(data, valuesBytes...)
+
+	compressedData, err := compressionCodec(compressType).compress(data)
+	if err != nil {
+		panic(err)
+	}
+
+	page.Header = parquet.NewPageHeader()
+	page.Header.Type = parquet.PageType_DATA_PAGE
+	page.Header.CompressedPageSize = int32(len(compressedData))
+	page.Header.UncompressedPageSize = int32(len(data))
+	page.Header.DataPageHeader = parquet.NewDataPageHeader()
+	page.Header.DataPageHeader.NumValues = int32(len(page.DataTable.DefinitionLevels))
+	page.Header.DataPageHeader.DefinitionLevelEncoding = parquet.Encoding_RLE
+	page.Header.DataPageHeader.RepetitionLevelEncoding = parquet.Encoding_RLE
+	page.Header.DataPageHeader.Encoding = parquet.Encoding_PLAIN_DICTIONARY
+
+	ts := thrift.NewTSerializer()
+	ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
+	pageHeaderBytes, err := ts.Write(context.TODO(), page.Header)
+	if err != nil {
+		panic(err)
+	}
+
+	page.RawData = append(pageHeaderBytes, compressedData...)
+	return page.RawData
+}
+
+func tableToDataPages(dataTable *table, pageSize int32, compressType parquet.CompressionCodec) (result []*page, totalSize int64) {
+	var j int
+	for i := 0; i < len(dataTable.Values); i = j {
+		var size, numValues int32
+		minVal := dataTable.Values[i]
+		maxVal := dataTable.Values[i]
+		for j = i + 1; j < len(dataTable.Values) && size < pageSize; j++ {
+			if dataTable.DefinitionLevels[j] == dataTable.MaxDefinitionLevel {
+				numValues++
+				size += sizeOf(dataTable.Values[j])
+				minVal = min(minVal, dataTable.Values[j], &dataTable.Type, &dataTable.ConvertedType)
+				maxVal = max(maxVal, dataTable.Values[j], &dataTable.Type, &dataTable.ConvertedType)
+			}
+		}
+
+		page := newDataPage()
+		page.PageSize = pageSize
+		page.Header.DataPageHeader.NumValues = numValues
+		page.Header.Type = parquet.PageType_DATA_PAGE
+
+		page.DataTable = new(table)
+		page.DataTable.RepetitionType = dataTable.RepetitionType
+		page.DataTable.Path = dataTable.Path
+		page.DataTable.MaxDefinitionLevel = dataTable.MaxDefinitionLevel
+		page.DataTable.MaxRepetitionLevel = dataTable.MaxRepetitionLevel
+		page.DataTable.Values = dataTable.Values[i:j]
+		page.DataTable.DefinitionLevels = dataTable.DefinitionLevels[i:j]
+		page.DataTable.RepetitionLevels = dataTable.RepetitionLevels[i:j]
+		page.DataTable.Type = dataTable.Type
+		page.DataTable.ConvertedType = dataTable.ConvertedType
+		page.DataTable.Encoding = dataTable.Encoding
+		page.MinVal = minVal
+		page.MaxVal = maxVal
+		page.DataType = dataTable.Type
+		page.CompressType = compressType
+		page.Path = dataTable.Path
+
+		page.toDataPage(compressType)
+
+		totalSize += int64(len(page.RawData))
+		result = append(result, page)
+	}
+
+	return result, totalSize
+}
+
+func tableToDictPage(dataTable *table, pageSize int32, compressType parquet.CompressionCodec) (*page, int64) {
+	dataType := dataTable.Type
+
+	page := newDataPage()
+	page.PageSize = pageSize
+	page.Header.DataPageHeader.NumValues = int32(len(dataTable.Values))
+	page.Header.Type = parquet.PageType_DICTIONARY_PAGE
+
+	page.DataTable = new(table)
+	page.DataTable.RepetitionType = dataTable.RepetitionType
+	page.DataTable.Path = dataTable.Path
+	page.DataTable.MaxDefinitionLevel = dataTable.MaxDefinitionLevel
+	page.DataTable.MaxRepetitionLevel = dataTable.MaxRepetitionLevel
+	page.DataTable.Values = dataTable.Values
+	page.DataTable.DefinitionLevels = dataTable.DefinitionLevels
+	page.DataTable.RepetitionLevels = dataTable.RepetitionLevels
+	page.DataType = dataType
+	page.CompressType = compressType
+	page.Path = dataTable.Path
+
+	page.toDictPage(compressType, dataTable.Type)
+	return page, int64(len(page.RawData))
+}
+
+type dictRec struct {
+	DictMap   map[interface{}]int32
+	DictSlice []interface{}
+	Type      parquet.Type
+}
+
+func newDictRec(dataType parquet.Type) *dictRec {
+	return &dictRec{
+		DictMap: make(map[interface{}]int32),
+		Type:    dataType,
+	}
+}
+
+func dictRecToDictPage(dictRec *dictRec, pageSize int32, compressType parquet.CompressionCodec) (*page, int64) {
+	page := newDataPage()
+	page.PageSize = pageSize
+	page.Header.DataPageHeader.NumValues = int32(len(dictRec.DictSlice))
+	page.Header.Type = parquet.PageType_DICTIONARY_PAGE
+
+	page.DataTable = new(table)
+	page.DataTable.Values = dictRec.DictSlice
+	page.DataType = parquet.Type_INT32
+	page.CompressType = compressType
+
+	page.toDictPage(compressType, dictRec.Type)
+	return page, int64(len(page.RawData))
+}
+
+func tableToDictDataPages(dictRec *dictRec, dataTable *table, pageSize int32, bitWidth int32, compressType parquet.CompressionCodec) (pages []*page, totalSize int64) {
+	dataType := dataTable.Type
+	pT, cT := dataTable.Type, dataTable.ConvertedType
+	j := 0
+	for i := 0; i < len(dataTable.Values); i = j {
+		j = i
+		var size, numValues int32
+		maxVal := dataTable.Values[i]
+		minVal := dataTable.Values[i]
+
+		values := make([]interface{}, 0)
+		for j < len(dataTable.Values) && size < pageSize {
+			if dataTable.DefinitionLevels[j] == dataTable.MaxDefinitionLevel {
+				numValues++
+				size += int32(sizeOf(dataTable.Values[j]))
+				maxVal = max(maxVal, dataTable.Values[j], &pT, &cT)
+				minVal = min(minVal, dataTable.Values[j], &pT, &cT)
+				if _, ok := dictRec.DictMap[dataTable.Values[j]]; !ok {
+					dictRec.DictSlice = append(dictRec.DictSlice, dataTable.Values[j])
+					dictRec.DictMap[dataTable.Values[j]] = int32(len(dictRec.DictSlice) - 1)
+				}
+				values = append(values, int32(dictRec.DictMap[dataTable.Values[j]]))
+			}
+			j++
+		}
+
+		page := newDataPage()
+		page.PageSize = pageSize
+		page.Header.DataPageHeader.NumValues = numValues
+		page.Header.Type = parquet.PageType_DATA_PAGE
+
+		page.DataTable = new(table)
+		page.DataTable.RepetitionType = dataTable.RepetitionType
+		page.DataTable.Path = dataTable.Path
+		page.DataTable.MaxDefinitionLevel = dataTable.MaxDefinitionLevel
+		page.DataTable.MaxRepetitionLevel = dataTable.MaxRepetitionLevel
+		page.DataTable.Values = values
+		page.DataTable.DefinitionLevels = dataTable.DefinitionLevels[i:j]
+		page.DataTable.RepetitionLevels = dataTable.RepetitionLevels[i:j]
+		page.MaxVal = maxVal
+		page.MinVal = minVal
+		page.DataType = dataType
+		page.CompressType = compressType
+		page.Path = dataTable.Path
+
+		page.toDictDataPage(compressType, bitWidth)
+
+		totalSize += int64(len(page.RawData))
+		pages = append(pages, page)
+	}
+
+	return pages, totalSize
+}
diff --git a/vendor/github.com/minio/parquet-go/parquet.go b/vendor/github.com/minio/parquet-go/reader.go
similarity index 73%
rename from vendor/github.com/minio/parquet-go/parquet.go
rename to vendor/github.com/minio/parquet-go/reader.go
index d9d4339f2..4d8b30633 100644
--- a/vendor/github.com/minio/parquet-go/parquet.go
+++ b/vendor/github.com/minio/parquet-go/reader.go
@@ -81,8 +81,8 @@ func (value Value) MarshalJSON() (data []byte, err error) {
 	return json.Marshal(value.Value)
 }
 
-// File - denotes parquet file.
-type File struct {
+// Reader - denotes parquet file reader.
+type Reader struct {
 	getReaderFunc  GetReaderFunc
 	schemaElements []*parquet.SchemaElement
 	rowGroups      []*parquet.RowGroup
@@ -94,8 +94,8 @@ type File struct {
 	rowIndex int64
 }
 
-// Open - opens parquet file with given column names.
-func Open(getReaderFunc GetReaderFunc, columnNames set.StringSet) (*File, error) {
+// NewReader - creates new parquet reader. Reader calls getReaderFunc to fetch the required data ranges for the given columnNames. If columnNames is empty, all columns are used.
+func NewReader(getReaderFunc GetReaderFunc, columnNames set.StringSet) (*Reader, error) {
 	fileMeta, err := fileMetadata(getReaderFunc)
 	if err != nil {
 		return nil, err
@@ -107,7 +107,7 @@ func Open(getReaderFunc GetReaderFunc, columnNames set.StringSet) (*File, error)
 		nameList = append(nameList, element.Name)
 	}
 
-	return &File{
+	return &Reader{
 		getReaderFunc:  getReaderFunc,
 		rowGroups:      fileMeta.GetRowGroups(),
 		schemaElements: schemaElements,
@@ -117,54 +117,50 @@ func Open(getReaderFunc GetReaderFunc, columnNames set.StringSet) (*File, error)
 }
 
 // Read - reads single record.
-func (file *File) Read() (record *Record, err error) {
-	if file.rowGroupIndex >= len(file.rowGroups) {
+func (reader *Reader) Read() (record *Record, err error) {
+	if reader.rowGroupIndex >= len(reader.rowGroups) {
 		return nil, io.EOF
 	}
 
-	if file.columns == nil {
-		file.columns, err = getColumns(
-			file.rowGroups[file.rowGroupIndex],
-			file.columnNames,
-			file.schemaElements,
-			file.getReaderFunc,
+	if reader.columns == nil {
+		reader.columns, err = getColumns(
+			reader.rowGroups[reader.rowGroupIndex],
+			reader.columnNames,
+			reader.schemaElements,
+			reader.getReaderFunc,
 		)
 		if err != nil {
 			return nil, err
 		}
 
-		file.rowIndex = 0
+		reader.rowIndex = 0
 	}
 
-	if file.rowIndex >= file.rowGroups[file.rowGroupIndex].GetNumRows() {
-		file.rowGroupIndex++
-		file.Close()
-		return file.Read()
+	if reader.rowIndex >= reader.rowGroups[reader.rowGroupIndex].GetNumRows() {
+		reader.rowGroupIndex++
+		reader.Close()
+		return reader.Read()
 	}
 
-	record = newRecord(file.nameList)
-	for name := range file.columns {
-		value, valueType := file.columns[name].read()
+	record = newRecord(reader.nameList)
+	for name := range reader.columns {
+		value, valueType := reader.columns[name].read()
 		record.set(name, Value{value, valueType})
 	}
 
-	file.rowIndex++
+	reader.rowIndex++
 
 	return record, nil
 }
 
 // Close - closes underlying readers.
-func (file *File) Close() (err error) {
-	if file.columns != nil {
-		return nil
-	}
-
-	for _, column := range file.columns {
+func (reader *Reader) Close() (err error) {
+	for _, column := range reader.columns {
 		column.close()
 	}
 
-	file.columns = nil
-	file.rowIndex = 0
+	reader.columns = nil
+	reader.rowIndex = 0
 
 	return nil
 }
diff --git a/vendor/github.com/minio/parquet-go/rowgroup.go b/vendor/github.com/minio/parquet-go/rowgroup.go
new file mode 100644
index 000000000..f8e687620
--- /dev/null
+++ b/vendor/github.com/minio/parquet-go/rowgroup.go
@@ -0,0 +1,54 @@
+/*
+ * Minio Cloud Storage, (C) 2019 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet
+
+import (
+	"strings"
+
+	"github.com/minio/parquet-go/gen-go/parquet"
+)
+
+type rowGroup struct {
+	Chunks         []*columnChunk
+	RowGroupHeader *parquet.RowGroup
+}
+
+func newRowGroup() *rowGroup {
+	return &rowGroup{
+		RowGroupHeader: parquet.NewRowGroup(),
+	}
+}
+
+func (rg *rowGroup) rowGroupToTableMap() map[string]*table {
+	tableMap := make(map[string]*table)
+	for _, chunk := range rg.Chunks {
+		columnPath := ""
+		for _, page := range chunk.Pages {
+			if columnPath == "" {
+				columnPath = strings.Join(page.DataTable.Path, ".")
+			}
+
+			if _, ok := tableMap[columnPath]; !ok {
+				tableMap[columnPath] = newTableFromTable(page.DataTable)
+			}
+
+			tableMap[columnPath].Merge(page.DataTable)
+		}
+	}
+
+	return tableMap
+}
diff --git a/vendor/github.com/minio/parquet-go/table.go b/vendor/github.com/minio/parquet-go/table.go
index 17db2d47f..77a6b52bb 100644
--- a/vendor/github.com/minio/parquet-go/table.go
+++ b/vendor/github.com/minio/parquet-go/table.go
@@ -19,34 +19,7 @@ package parquet
 import "github.com/minio/parquet-go/gen-go/parquet"
 
 func getTableValues(values interface{}, valueType parquet.Type) (tableValues []interface{}) {
-	switch valueType {
-	case parquet.Type_BOOLEAN:
-		for _, v := range values.([]bool) {
-			tableValues = append(tableValues, v)
-		}
-	case parquet.Type_INT32:
-		for _, v := range values.([]int32) {
-			tableValues = append(tableValues, v)
-		}
-	case parquet.Type_INT64:
-		for _, v := range values.([]int64) {
-			tableValues = append(tableValues, v)
-		}
-	case parquet.Type_FLOAT:
-		for _, v := range values.([]float32) {
-			tableValues = append(tableValues, v)
-		}
-	case parquet.Type_DOUBLE:
-		for _, v := range values.([]float64) {
-			tableValues = append(tableValues, v)
-		}
-	case parquet.Type_INT96, parquet.Type_BYTE_ARRAY, parquet.Type_FIXED_LEN_BYTE_ARRAY:
-		for _, v := range values.([][]byte) {
-			tableValues = append(tableValues, v)
-		}
-	}
-
-	return tableValues
+	return valuesToInterfaces(values, valueType)
 }
 
 type table struct {
@@ -58,6 +31,9 @@ type table struct {
 	Values           []interface{} // Parquet values
 	DefinitionLevels []int32       // Definition Levels slice
 	RepetitionLevels []int32       // Repetition Levels slice
+	ConvertedType    parquet.ConvertedType
+	Encoding         parquet.Encoding
+	BitWidth         int32
 }
 
 func newTableFromTable(srcTable *table) *table {
diff --git a/vendor/github.com/minio/parquet-go/writer.go b/vendor/github.com/minio/parquet-go/writer.go
new file mode 100644
index 000000000..fbf65857c
--- /dev/null
+++ b/vendor/github.com/minio/parquet-go/writer.go
@@ -0,0 +1,279 @@
+/*
+ * Minio Cloud Storage, (C) 2019 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet
+
+import (
+	"context"
+	"encoding/binary"
+	"fmt"
+	"io"
+	"strings"
+
+	"git.apache.org/thrift.git/lib/go/thrift"
+	"github.com/minio/parquet-go/gen-go/parquet"
+)
+
+const (
+	defaultPageSize     = 8 * 1024          // 8 KiB
+	defaultRowGroupSize = 128 * 1024 * 1024 // 128 MiB
+)
+
+// Writer - represents parquet writer.
+type Writer struct {
+	PageSize        int64
+	RowGroupSize    int64
+	CompressionType parquet.CompressionCodec
+
+	writeCloser    io.WriteCloser
+	size           int64
+	numRows        int64
+	offset         int64
+	pagesMapBuf    map[string][]*page
+	dictRecs       map[string]*dictRec
+	footer         *parquet.FileMetaData
+	schemaElements []*parquet.SchemaElement
+	rowGroupCount  int
+	records        []map[string]*Value
+}
+
+func (writer *Writer) writeRecords() (err error) {
+	if len(writer.records) == 0 {
+		return nil
+	}
+
+	tableMap := make(map[string]*table)
+	for _, schema := range writer.schemaElements {
+		if schema.GetNumChildren() != 0 {
+			continue
+		}
+
+		table := new(table)
+		table.Path = strings.Split(schema.Name, ".")
+		table.MaxDefinitionLevel = 0
+		table.MaxRepetitionLevel = 0
+		table.RepetitionType = schema.GetRepetitionType()
+		table.Type = schema.GetType()
+		table.ConvertedType = -1
+
+		for _, record := range writer.records {
+			value := record[schema.Name]
+			if *schema.Type != value.Type {
+				return fmt.Errorf("schema.Type and value.Type are not the same")
+			}
+
+			switch value.Type {
+			case parquet.Type_BOOLEAN:
+				table.Values = append(table.Values, value.Value.(bool))
+			case parquet.Type_INT32:
+				table.Values = append(table.Values, value.Value.(int32))
+			case parquet.Type_INT64:
+				table.Values = append(table.Values, value.Value.(int64))
+			case parquet.Type_INT96, parquet.Type_BYTE_ARRAY, parquet.Type_FIXED_LEN_BYTE_ARRAY:
+				table.Values = append(table.Values, value.Value.([]byte))
+			case parquet.Type_FLOAT:
+				table.Values = append(table.Values, value.Value.(float32))
+			case parquet.Type_DOUBLE:
+				table.Values = append(table.Values, value.Value.(float64))
+			default:
+				return fmt.Errorf("unknown parquet type %v", value.Type)
+			}
+
+			table.DefinitionLevels = append(table.DefinitionLevels, 0)
+			table.RepetitionLevels = append(table.RepetitionLevels, 0)
+		}
+
+		tableMap[schema.Name] = table
+	}
+
+	pagesMap := make(map[string][]*page)
+	for name, table := range tableMap {
+		if table.Encoding == parquet.Encoding_PLAIN_DICTIONARY {
+			if _, ok := writer.dictRecs[name]; !ok {
+				writer.dictRecs[name] = newDictRec(table.Type)
+			}
+			pagesMap[name], _ = tableToDictDataPages(writer.dictRecs[name], table, int32(writer.PageSize), 32, writer.CompressionType)
+		} else {
+			pagesMap[name], _ = tableToDataPages(table, int32(writer.PageSize), writer.CompressionType)
+		}
+	}
+
+	recordsSize := int64(0)
+	for name, pages := range pagesMap {
+		if _, ok := writer.pagesMapBuf[name]; !ok {
+			writer.pagesMapBuf[name] = pages
+		} else {
+			writer.pagesMapBuf[name] = append(writer.pagesMapBuf[name], pages...)
+		}
+		for _, page := range pages {
+			recordsSize += int64(len(page.RawData))
+			writer.size += int64(len(page.RawData))
+			// As we got the raw data, we don't need the data table hereafter.
+			// page.DataTable = nil
+		}
+	}
+
+	writer.numRows += int64(len(writer.records))
+
+	// if len(writer.pagesMapBuf) > 0 && writer.size+recordsSize >= writer.RowGroupSize {
+	if len(writer.pagesMapBuf) > 0 {
+		//pages -> chunk
+		chunkMap := make(map[string]*columnChunk)
+		for name, pages := range writer.pagesMapBuf {
+			// FIXME: add page encoding support.
+			// if len(pages) > 0 && pages[0].Info.Encoding == parquet.Encoding_PLAIN_DICTIONARY {
+			// 	dictPage, _ := dictRecToDictPage(writer.dictRecs[name], int32(writer.PageSize), writer.CompressionType)
+			// 	tmp := append([]*page{dictPage}, pages...)
+			// 	chunkMap[name] = pagesToDictColumnChunk(tmp)
+			// } else {
+			// 	chunkMap[name] = pagesToColumnChunk(pages)
+			// }
+
+			chunkMap[name] = pagesToColumnChunk(pages)
+		}
+
+		writer.dictRecs = make(map[string]*dictRec)
+
+		// chunks -> rowGroup
+		rowGroup := newRowGroup()
+		rowGroup.RowGroupHeader.Columns = []*parquet.ColumnChunk{}
+
+		// Walk schema elements rather than chunkMap to keep the column
+		// chunks in schema order.
+		for k := 0; k < len(writer.schemaElements); k++ {
+			schema := writer.schemaElements[k]
+			if schema.GetNumChildren() > 0 {
+				continue
+			}
+			chunk := chunkMap[schema.Name]
+			if chunk == nil {
+				continue
+			}
+			rowGroup.Chunks = append(rowGroup.Chunks, chunk)
+			rowGroup.RowGroupHeader.TotalByteSize += chunk.chunkHeader.MetaData.TotalCompressedSize
+			rowGroup.RowGroupHeader.Columns = append(rowGroup.RowGroupHeader.Columns, chunk.chunkHeader)
+		}
+		rowGroup.RowGroupHeader.NumRows = writer.numRows
+		writer.numRows = 0
+
+		// Record page offsets in the chunk metadata and write out raw page data.
+		for k := 0; k < len(rowGroup.Chunks); k++ {
+			rowGroup.Chunks[k].chunkHeader.MetaData.DataPageOffset = -1
+			rowGroup.Chunks[k].chunkHeader.FileOffset = writer.offset
+
+			for l := 0; l < len(rowGroup.Chunks[k].Pages); l++ {
+				switch {
+				case rowGroup.Chunks[k].Pages[l].Header.Type == parquet.PageType_DICTIONARY_PAGE:
+					offset := writer.offset
+					rowGroup.Chunks[k].chunkHeader.MetaData.DictionaryPageOffset = &offset
+				case rowGroup.Chunks[k].chunkHeader.MetaData.DataPageOffset <= 0:
+					rowGroup.Chunks[k].chunkHeader.MetaData.DataPageOffset = writer.offset
+				}
+
+				data := rowGroup.Chunks[k].Pages[l].RawData
+				if _, err = writer.writeCloser.Write(data); err != nil {
+					return err
+				}
+
+				writer.offset += int64(len(data))
+			}
+		}
+
+		writer.footer.RowGroups = append(writer.footer.RowGroups, rowGroup.RowGroupHeader)
+		writer.size = 0
+		writer.pagesMapBuf = make(map[string][]*page)
+	}
+
+	writer.footer.NumRows += int64(len(writer.records))
+	writer.records = writer.records[:0]
+
+	return nil
+}
+
+// Write - writes a single record. The actual binary write happens once rowGroupCount records are buffered.
+func (writer *Writer) Write(record map[string]*Value) (err error) {
+	writer.records = append(writer.records, record)
+	if len(writer.records) != writer.rowGroupCount {
+		return nil
+	}
+
+	return writer.writeRecords()
+}
+
+func (writer *Writer) finalize() (err error) {
+	if err = writer.writeRecords(); err != nil {
+		return err
+	}
+
+	// Serialize the footer with the thrift compact protocol and append it,
+	// followed by its 4-byte little-endian length and the trailing "PAR1" magic.
+	ts := thrift.NewTSerializer()
+	ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
+	footerBuf, err := ts.Write(context.TODO(), writer.footer)
+	if err != nil {
+		return err
+	}
+
+	if _, err = writer.writeCloser.Write(footerBuf); err != nil {
+		return err
+	}
+
+	footerSizeBuf := make([]byte, 4)
+	binary.LittleEndian.PutUint32(footerSizeBuf, uint32(len(footerBuf)))
+
+	if _, err = writer.writeCloser.Write(footerSizeBuf); err != nil {
+		return err
+	}
+
+	_, err = writer.writeCloser.Write([]byte("PAR1"))
+	return err
+}
+
+// Close - finalizes and closes the writer. Any pending records are written out before the footer.
+func (writer *Writer) Close() (err error) {
+	if err = writer.finalize(); err != nil {
+		return err
+	}
+
+	return writer.writeCloser.Close()
+}
+
+// NewWriter - creates a new parquet writer. Binary data of rowGroupCount records is written to writeCloser.
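+//
+// A minimal usage sketch (file, rows, mySchema and buildRecord are
+// hypothetical stand-ins; only NewWriter, Write and Close come from this
+// package):
+//
+//	w, err := NewWriter(file, mySchema, 1000)
+//	if err != nil {
+//		return err
+//	}
+//	for _, row := range rows {
+//		// each record is a map[string]*Value keyed by schema element name
+//		if err = w.Write(buildRecord(row)); err != nil {
+//			return err
+//		}
+//	}
+//	return w.Close() // flushes pending records, then writes footer and magic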
+func NewWriter(writeCloser io.WriteCloser, schemaElements []*parquet.SchemaElement, rowGroupCount int) (*Writer, error) {
+	if _, err := writeCloser.Write([]byte("PAR1")); err != nil {
+		return nil, err
+	}
+
+	footer := parquet.NewFileMetaData()
+	footer.Version = 1
+	numChildren := int32(len(schemaElements))
+	footer.Schema = append(footer.Schema, &parquet.SchemaElement{
+		Name:           "schema",
+		RepetitionType: parquet.FieldRepetitionTypePtr(parquet.FieldRepetitionType_REQUIRED),
+		NumChildren:    &numChildren,
+	})
+	footer.Schema = append(footer.Schema, schemaElements...)
+
+	return &Writer{
+		PageSize:        defaultPageSize,
+		RowGroupSize:    defaultRowGroupSize,
+		CompressionType: parquet.CompressionCodec_SNAPPY,
+
+		writeCloser:    writeCloser,
+		offset:         4,
+		pagesMapBuf:    make(map[string][]*page),
+		dictRecs:       make(map[string]*dictRec),
+		footer:         footer,
+		schemaElements: schemaElements,
+		rowGroupCount:  rowGroupCount,
+	}, nil
+}
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 72ab0c8d8..cd08282dd 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -674,16 +674,16 @@
 			"revisionTime": "2019-01-20T10:05:29Z"
 		},
 		{
-			"checksumSHA1": "FO6q7sC2QTZMGDWv+/cGWJfjTCk=",
+			"checksumSHA1": "KzX4Kc/eOb8Dd8yJtO8fh3CIxHs=",
 			"path": "github.com/minio/parquet-go",
-			"revision": "d5e4e922da820530a1851afc22499c826c08f1e8",
-			"revisionTime": "2019-02-10T14:56:30Z"
+			"revision": "54766442a4a1f0fee45d70ebbf8acebc6d66f24b",
+			"revisionTime": "2019-03-13T01:38:37Z"
 		},
 		{
 			"checksumSHA1": "N4WRPw4p3AN958RH/O53kUsJacQ=",
 			"path": "github.com/minio/parquet-go/gen-go/parquet",
-			"revision": "d50385ed243d7120cf0f78de3f4f4e171936f12f",
-			"revisionTime": "2018-11-07T21:57:30Z"
+			"revision": "54766442a4a1f0fee45d70ebbf8acebc6d66f24b",
+			"revisionTime": "2019-03-13T01:38:37Z"
 		},
 		{
 			"checksumSHA1": "cYuXpiVBMypgkEr0Wqd79jPPyBg=",