Fix stale locks held by SelectParquet API (#7364)

Vendorize upstream parquet-go to fix this issue.
Branch: master
Author: Harshavardhana, committed via GitHub
Parent: 7079abc931
Commit: 91d85a0d53
16 changed files (changed lines in parentheses):

  1. pkg/s3select/parquet/reader.go (14)
  2. vendor/github.com/minio/parquet-go/Makefile (36, deleted)
  3. vendor/github.com/minio/parquet-go/columnchunk.go (149, new)
  4. vendor/github.com/minio/parquet-go/common.go (285, new)
  5. vendor/github.com/minio/parquet-go/compression.go (54)
  6. vendor/github.com/minio/parquet-go/decode.go (21)
  7. vendor/github.com/minio/parquet-go/encode.go (508, new)
  8. vendor/github.com/minio/parquet-go/endian.go (50, new)
  9. vendor/github.com/minio/parquet-go/go.mod (8, deleted)
  10. vendor/github.com/minio/parquet-go/go.sum (8, deleted)
  11. vendor/github.com/minio/parquet-go/page.go (418)
  12. vendor/github.com/minio/parquet-go/reader.go (56)
  13. vendor/github.com/minio/parquet-go/rowgroup.go (54, new)
  14. vendor/github.com/minio/parquet-go/table.go (32)
  15. vendor/github.com/minio/parquet-go/writer.go (279, new)
  16. vendor/vendor.json (10)

pkg/s3select/parquet/reader.go
@@ -28,13 +28,13 @@ import (
 // Reader - Parquet record reader for S3Select.
 type Reader struct {
-	args *ReaderArgs
-	file *parquetgo.File
+	args   *ReaderArgs
+	reader *parquetgo.Reader
 }

 // Read - reads single record.
 func (r *Reader) Read() (rec sql.Record, rerr error) {
-	parquetRecord, err := r.file.Read()
+	parquetRecord, err := r.reader.Read()
 	if err != nil {
 		if err != io.EOF {
 			return nil, errParquetParsingError(err)
@@ -79,12 +79,12 @@ func (r *Reader) Read() (rec sql.Record, rerr error) {
 // Close - closes underlying readers.
 func (r *Reader) Close() error {
-	return r.file.Close()
+	return r.reader.Close()
 }

 // NewReader - creates new Parquet reader using readerFunc callback.
 func NewReader(getReaderFunc func(offset, length int64) (io.ReadCloser, error), args *ReaderArgs) (*Reader, error) {
-	file, err := parquetgo.Open(getReaderFunc, nil)
+	reader, err := parquetgo.NewReader(getReaderFunc, nil)
 	if err != nil {
 		if err != io.EOF {
 			return nil, errParquetParsingError(err)
@@ -94,7 +94,7 @@ func NewReader(getReaderFunc func(offset, length int64) (io.ReadCloser, error),
 	}

 	return &Reader{
-		args: args,
-		file: file,
+		args:   args,
+		reader: reader,
 	}, nil
 }
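For context, a minimal sketch of driving the renamed API, with parquetgo aliasing the vendored package. The callback here serves ranged reads from a local file purely for illustration; the real MinIO callback reads object ranges (and may also use negative offsets for reads relative to the object end):

    getReaderFunc := func(offset, length int64) (io.ReadCloser, error) {
        f, err := os.Open("example.parquet") // hypothetical input
        if err != nil {
            return nil, err
        }
        if _, err = f.Seek(offset, io.SeekStart); err != nil {
            f.Close()
            return nil, err
        }
        return struct {
            io.Reader
            io.Closer
        }{io.LimitReader(f, length), f}, nil
    }
    reader, err := parquetgo.NewReader(getReaderFunc, nil) // nil column set: read all columns
    if err != nil {
        return err
    }
    defer reader.Close()
    for {
        record, err := reader.Read()
        if err != nil {
            if err == io.EOF {
                break
            }
            return err
        }
        _ = record // feed into the SQL evaluation layer
    }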

vendor/github.com/minio/parquet-go/Makefile (deleted)
@@ -1,36 +0,0 @@
GOPATH := $(shell go env GOPATH)
all: check
getdeps:
@if [ ! -f ${GOPATH}/bin/golint ]; then echo "Installing golint" && go get -u golang.org/x/lint/golint; fi
@if [ ! -f ${GOPATH}/bin/gocyclo ]; then echo "Installing gocyclo" && go get -u github.com/fzipp/gocyclo; fi
@if [ ! -f ${GOPATH}/bin/misspell ]; then echo "Installing misspell" && go get -u github.com/client9/misspell/cmd/misspell; fi
@if [ ! -f ${GOPATH}/bin/ineffassign ]; then echo "Installing ineffassign" && go get -u github.com/gordonklaus/ineffassign; fi
vet:
@echo "Running $@"
@go tool vet -atomic -bool -copylocks -nilfunc -printf -shadow -rangeloops -unreachable -unsafeptr -unusedresult *.go
fmt:
@echo "Running $@"
@gofmt -d *.go
lint:
@echo "Running $@"
@${GOPATH}/bin/golint -set_exit_status
cyclo:
@echo "Running $@"
@${GOPATH}/bin/gocyclo -over 200 .
spelling:
@${GOPATH}/bin/misspell -locale US -error *.go README.md
ineffassign:
@echo "Running $@"
@${GOPATH}/bin/ineffassign .
check: getdeps vet fmt lint cyclo spelling ineffassign
@echo "Running unit tests"
@go test -tags kqueue .

vendor/github.com/minio/parquet-go/columnchunk.go (new file)
@@ -0,0 +1,149 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package parquet
import "github.com/minio/parquet-go/gen-go/parquet"
type columnChunk struct {
Pages []*page
chunkHeader *parquet.ColumnChunk
}
func pagesToColumnChunk(pages []*page) *columnChunk {
var numValues, totalUncompressedSize, totalCompressedSize int64
minVal := pages[0].MinVal
maxVal := pages[0].MaxVal
parquetType, convertedType := pages[0].DataTable.Type, pages[0].DataTable.ConvertedType
for i := 0; i < len(pages); i++ {
if pages[i].Header.DataPageHeader != nil {
numValues += int64(pages[i].Header.DataPageHeader.NumValues)
} else {
numValues += int64(pages[i].Header.DataPageHeaderV2.NumValues)
}
totalUncompressedSize += int64(pages[i].Header.UncompressedPageSize) + int64(len(pages[i].RawData)) - int64(pages[i].Header.CompressedPageSize)
totalCompressedSize += int64(len(pages[i].RawData))
minVal = min(minVal, pages[i].MinVal, &parquetType, &convertedType)
maxVal = max(maxVal, pages[i].MaxVal, &parquetType, &convertedType)
}
metaData := parquet.NewColumnMetaData()
metaData.Type = pages[0].DataType
metaData.Encodings = []parquet.Encoding{
parquet.Encoding_RLE,
parquet.Encoding_BIT_PACKED,
parquet.Encoding_PLAIN,
// parquet.Encoding_DELTA_BINARY_PACKED,
}
metaData.Codec = pages[0].CompressType
metaData.NumValues = numValues
metaData.TotalCompressedSize = totalCompressedSize
metaData.TotalUncompressedSize = totalUncompressedSize
metaData.PathInSchema = pages[0].Path
metaData.Statistics = parquet.NewStatistics()
if maxVal != nil && minVal != nil {
tmpBufMin := valueToBytes(minVal, parquetType)
tmpBufMax := valueToBytes(maxVal, parquetType)
if convertedType == parquet.ConvertedType_UTF8 || convertedType == parquet.ConvertedType_DECIMAL {
tmpBufMin = tmpBufMin[4:]
tmpBufMax = tmpBufMax[4:]
}
metaData.Statistics.Min = tmpBufMin
metaData.Statistics.Max = tmpBufMax
}
chunk := new(columnChunk)
chunk.Pages = pages
chunk.chunkHeader = parquet.NewColumnChunk()
chunk.chunkHeader.MetaData = metaData
return chunk
}
func pagesToDictColumnChunk(pages []*page) *columnChunk {
if len(pages) < 2 {
return nil
}
var numValues, totalUncompressedSize, totalCompressedSize int64
minVal := pages[1].MinVal
maxVal := pages[1].MaxVal
parquetType, convertedType := pages[1].DataTable.Type, pages[1].DataTable.ConvertedType
for i := 0; i < len(pages); i++ {
if pages[i].Header.DataPageHeader != nil {
numValues += int64(pages[i].Header.DataPageHeader.NumValues)
} else {
numValues += int64(pages[i].Header.DataPageHeaderV2.NumValues)
}
totalUncompressedSize += int64(pages[i].Header.UncompressedPageSize) + int64(len(pages[i].RawData)) - int64(pages[i].Header.CompressedPageSize)
totalCompressedSize += int64(len(pages[i].RawData))
if i > 0 {
minVal = min(minVal, pages[i].MinVal, &parquetType, &convertedType)
maxVal = max(maxVal, pages[i].MaxVal, &parquetType, &convertedType)
}
}
metaData := parquet.NewColumnMetaData()
metaData.Type = pages[1].DataType
metaData.Encodings = []parquet.Encoding{
parquet.Encoding_RLE,
parquet.Encoding_BIT_PACKED,
parquet.Encoding_PLAIN,
parquet.Encoding_PLAIN_DICTIONARY,
}
metaData.Codec = pages[1].CompressType
metaData.NumValues = numValues
metaData.TotalCompressedSize = totalCompressedSize
metaData.TotalUncompressedSize = totalUncompressedSize
metaData.PathInSchema = pages[1].Path
metaData.Statistics = parquet.NewStatistics()
if maxVal != nil && minVal != nil {
tmpBufMin := valueToBytes(minVal, parquetType)
tmpBufMax := valueToBytes(maxVal, parquetType)
if convertedType == parquet.ConvertedType_UTF8 || convertedType == parquet.ConvertedType_DECIMAL {
tmpBufMin = tmpBufMin[4:]
tmpBufMax = tmpBufMax[4:]
}
metaData.Statistics.Min = tmpBufMin
metaData.Statistics.Max = tmpBufMax
}
chunk := new(columnChunk)
chunk.Pages = pages
chunk.chunkHeader = parquet.NewColumnChunk()
chunk.chunkHeader.MetaData = metaData
return chunk
}
func decodeDictColumnChunk(chunk *columnChunk) {
dictPage := chunk.Pages[0]
numPages := len(chunk.Pages)
for i := 1; i < numPages; i++ {
numValues := len(chunk.Pages[i].DataTable.Values)
for j := 0; j < numValues; j++ {
if chunk.Pages[i].DataTable.Values[j] != nil {
index := chunk.Pages[i].DataTable.Values[j].(int64)
chunk.Pages[i].DataTable.Values[j] = dictPage.DataTable.Values[index]
}
}
}
chunk.Pages = chunk.Pages[1:] // delete the head dict page
}
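To make decodeDictColumnChunk concrete, a standalone sketch of the substitution it performs (names and values here are illustrative): data pages carry int64 indices into the dictionary page's value slice, and nil entries stay nil.

    dict := []interface{}{"minio", "parquet", "s3select"} // dictionary page values
    vals := []interface{}{int64(2), nil, int64(0)}        // one data page's raw values
    for i, v := range vals {
        if v != nil {
            vals[i] = dict[v.(int64)]
        }
    }
    // vals is now ["s3select", nil, "minio"]; afterwards the dictionary
    // page itself is dropped, as in chunk.Pages = chunk.Pages[1:].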

vendor/github.com/minio/parquet-go/common.go (new file)
@@ -0,0 +1,285 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package parquet
import (
"bytes"
"reflect"
"github.com/minio/parquet-go/gen-go/parquet"
)
func valuesToInterfaces(values interface{}, valueType parquet.Type) (tableValues []interface{}) {
switch valueType {
case parquet.Type_BOOLEAN:
for _, v := range values.([]bool) {
tableValues = append(tableValues, v)
}
case parquet.Type_INT32:
for _, v := range values.([]int32) {
tableValues = append(tableValues, v)
}
case parquet.Type_INT64:
for _, v := range values.([]int64) {
tableValues = append(tableValues, v)
}
case parquet.Type_FLOAT:
for _, v := range values.([]float32) {
tableValues = append(tableValues, v)
}
case parquet.Type_DOUBLE:
for _, v := range values.([]float64) {
tableValues = append(tableValues, v)
}
case parquet.Type_INT96, parquet.Type_BYTE_ARRAY, parquet.Type_FIXED_LEN_BYTE_ARRAY:
for _, v := range values.([][]byte) {
tableValues = append(tableValues, v)
}
}
return tableValues
}
func interfacesToValues(values []interface{}, valueType parquet.Type) interface{} {
switch valueType {
case parquet.Type_BOOLEAN:
bs := make([]bool, len(values))
for i := range values {
bs[i] = values[i].(bool)
}
return bs
case parquet.Type_INT32:
i32s := make([]int32, len(values))
for i := range values {
i32s[i] = values[i].(int32)
}
return i32s
case parquet.Type_INT64:
i64s := make([]int64, len(values))
for i := range values {
i64s[i] = values[i].(int64)
}
return i64s
case parquet.Type_FLOAT:
f32s := make([]float32, len(values))
for i := range values {
f32s[i] = values[i].(float32)
}
return f32s
case parquet.Type_DOUBLE:
f64s := make([]float64, len(values))
for i := range values {
f64s[i] = values[i].(float64)
}
return f64s
case parquet.Type_INT96, parquet.Type_BYTE_ARRAY, parquet.Type_FIXED_LEN_BYTE_ARRAY:
array := make([][]byte, len(values))
for i := range values {
array[i] = values[i].([]byte)
}
return array
}
return nil
}
// sizeOf - get the size of a parquet value
func sizeOf(value interface{}) (size int32) {
v := reflect.ValueOf(value)
// Values may arrive as plain values or as pointers; only dereference
// pointers, and treat a nil pointer as size 0.
if v.Kind() == reflect.Ptr {
if v.IsNil() {
return size
}
v = v.Elem()
}
switch v.Kind() {
case reflect.Bool:
size = 1
case reflect.Int32, reflect.Float32:
size = 4
case reflect.Int64, reflect.Float64:
size = 8
case reflect.Slice:
size = int32(v.Len())
}
return size
}
func lessThanBytes(a, b []byte, littleEndianOrder, signed bool) bool {
alen, blen := len(a), len(b)
if littleEndianOrder {
// Reverse a
for i, j := 0, len(a)-1; i < j; i, j = i+1, j-1 {
a[i], a[j] = a[j], a[i]
}
// Reverse b
for i, j := 0, len(b)-1; i < j; i, j = i+1, j-1 {
b[i], b[j] = b[j], b[i]
}
}
// Make a and b equal-sized arrays.
if alen < blen {
preBytes := make([]byte, blen-alen)
if signed && a[0]&0x80 == 0x80 {
for i := range preBytes {
preBytes[i] = 0xFF
}
}
a = append(preBytes, a...)
}
if alen > blen {
preBytes := make([]byte, alen-blen)
if signed && b[0]&0x80 == 0x80 {
for i := range preBytes {
preBytes[i] = 0xFF
}
}
b = append(preBytes, b...)
}
if signed {
// A byte whose high bit is set (BYTE & 0x80 == 0x80) encodes a negative number, hence the inverted comparison.
if a[0]&0x80 > b[0]&0x80 {
return true
}
if a[0]&0x80 < b[0]&0x80 {
return false
}
}
for i := 0; i < len(a); i++ {
if a[i] < b[i] {
return true
}
if a[i] > b[i] {
return false
}
}
return false
}
// lessThan - returns whether a is less than b.
func lessThan(a, b interface{}, dataType *parquet.Type, convertedType *parquet.ConvertedType) bool {
if a == nil {
if b == nil {
return false
}
return true
}
if b == nil {
return false
}
switch *dataType {
case parquet.Type_BOOLEAN:
return !a.(bool) && b.(bool)
case parquet.Type_INT32:
if convertedType != nil {
switch *convertedType {
case parquet.ConvertedType_UINT_8, parquet.ConvertedType_UINT_16, parquet.ConvertedType_UINT_32:
return uint32(a.(int32)) < uint32(b.(int32))
}
}
return a.(int32) < b.(int32)
case parquet.Type_INT64:
if convertedType != nil && *convertedType == parquet.ConvertedType_UINT_64 {
return uint64(a.(int64)) < uint64(b.(int64))
}
return a.(int64) < b.(int64)
case parquet.Type_INT96:
ab := a.([]byte)
bb := b.([]byte)
// A byte whose high bit is set (BYTE & 0x80 == 0x80) encodes a negative number, hence the inverted comparison.
if ab[11]&0x80 > bb[11]&0x80 {
return true
}
if ab[11]&0x80 < bb[11]&0x80 {
return false
}
for i := 11; i >= 0; i-- {
if ab[i] < bb[i] {
return true
}
if ab[i] > bb[i] {
return false
}
}
return false
case parquet.Type_FLOAT:
return a.(float32) < b.(float32)
case parquet.Type_DOUBLE:
return a.(float64) < b.(float64)
case parquet.Type_BYTE_ARRAY, parquet.Type_FIXED_LEN_BYTE_ARRAY:
return bytes.Compare(a.([]byte), b.([]byte)) == -1
}
return false
}
func min(a, b interface{}, dataType *parquet.Type, convertedType *parquet.ConvertedType) interface{} {
if a == nil {
return b
}
if b == nil {
return a
}
if lessThan(a, b, dataType, convertedType) {
return a
}
return b
}
func max(a, b interface{}, dataType *parquet.Type, convertedType *parquet.ConvertedType) interface{} {
if a == nil {
return b
}
if b == nil {
return a
}
if lessThan(a, b, dataType, convertedType) {
return b
}
return a
}
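The converted-type plumbing matters for ordering. An INT32 column tagged UINT_32 stores 0xFFFFFFFF as int32(-1), which must sort as the largest value rather than the smallest; the cast in lessThan handles exactly that:

    a, b := int32(-1), int32(1)
    fmt.Println(a < b)                 // true:  signed order
    fmt.Println(uint32(a) < uint32(b)) // false: UINT_32 order, 4294967295 > 1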

vendor/github.com/minio/parquet-go/compression.go
@@ -29,6 +29,60 @@ import (
type compressionCodec parquet.CompressionCodec
func (c compressionCodec) compress(buf []byte) ([]byte, error) {
switch parquet.CompressionCodec(c) {
case parquet.CompressionCodec_UNCOMPRESSED:
return buf, nil
case parquet.CompressionCodec_SNAPPY:
return snappy.Encode(nil, buf), nil
case parquet.CompressionCodec_GZIP:
byteBuf := new(bytes.Buffer)
writer := gzip.NewWriter(byteBuf)
n, err := writer.Write(buf)
if err != nil {
return nil, err
}
if n != len(buf) {
return nil, fmt.Errorf("short writes")
}
if err = writer.Flush(); err != nil {
return nil, err
}
if err = writer.Close(); err != nil {
return nil, err
}
return byteBuf.Bytes(), nil
case parquet.CompressionCodec_LZ4:
byteBuf := new(bytes.Buffer)
writer := lz4.NewWriter(byteBuf)
n, err := writer.Write(buf)
if err != nil {
return nil, err
}
if n != len(buf) {
return nil, fmt.Errorf("short writes")
}
if err = writer.Flush(); err != nil {
return nil, err
}
if err = writer.Close(); err != nil {
return nil, err
}
return byteBuf.Bytes(), nil
}
return nil, fmt.Errorf("invalid compression codec %v", c)
}
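A round-trip sketch for the snappy branch, assuming the pinned github.com/golang/snappy package; the gzip and lz4 branches satisfy the same compress/uncompress contract:

    raw := []byte("column chunk bytes")
    compressed := snappy.Encode(nil, raw)
    uncompressed, err := snappy.Decode(nil, compressed)
    if err != nil {
        return err // corrupt input
    }
    fmt.Println(bytes.Equal(raw, uncompressed)) // true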
func (c compressionCodec) uncompress(buf []byte) ([]byte, error) {
switch parquet.CompressionCodec(c) {
case parquet.CompressionCodec_UNCOMPRESSED:

vendor/github.com/minio/parquet-go/decode.go
@@ -18,33 +18,12 @@ package parquet
import (
"bytes"
"encoding/binary"
"fmt"
"math"
"github.com/minio/parquet-go/gen-go/parquet"
)
func uint32ToBytes(v uint32) []byte {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, v)
return buf
}
func uint64ToBytes(v uint64) []byte {
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, v)
return buf
}
func bytesToUint32(buf []byte) uint32 {
return binary.LittleEndian.Uint32(buf)
}
func bytesToUint64(buf []byte) uint64 {
return binary.LittleEndian.Uint64(buf)
}
func i64sToi32s(i64s []int64) (i32s []int32) {
i32s = make([]int32, len(i64s))
for i := range i64s {

vendor/github.com/minio/parquet-go/encode.go (new file)
@@ -0,0 +1,508 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package parquet
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math"
"github.com/minio/parquet-go/gen-go/parquet"
)
func boolsToBytes(bs []bool) []byte {
size := (len(bs) + 7) / 8
result := make([]byte, size)
for i := range bs {
if bs[i] {
result[i/8] |= 1 << uint32(i%8)
}
}
return result
}
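boolsToBytes packs eight booleans per byte, least-significant bit first; for instance:

    // true, false, true, true set bits 0, 2 and 3 of the first byte.
    fmt.Printf("%08b\n", boolsToBytes([]bool{true, false, true, true})[0]) // 00001101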
func int32sToBytes(i32s []int32) []byte {
buf := make([]byte, 4*len(i32s))
for i, i32 := range i32s {
binary.LittleEndian.PutUint32(buf[i*4:], uint32(i32))
}
return buf
}
func int64sToBytes(i64s []int64) []byte {
buf := make([]byte, 8*len(i64s))
for i, i64 := range i64s {
binary.LittleEndian.PutUint64(buf[i*8:], uint64(i64))
}
return buf
}
func float32sToBytes(f32s []float32) []byte {
buf := make([]byte, 4*len(f32s))
for i, f32 := range f32s {
binary.LittleEndian.PutUint32(buf[i*4:], math.Float32bits(f32))
}
return buf
}
func float64sToBytes(f64s []float64) []byte {
buf := make([]byte, 8*len(f64s))
for i, f64 := range f64s {
binary.LittleEndian.PutUint64(buf[i*8:], math.Float64bits(f64))
}
return buf
}
func byteSlicesToBytes(byteSlices [][]byte) []byte {
buf := new(bytes.Buffer)
for _, s := range byteSlices {
if err := binary.Write(buf, binary.LittleEndian, uint32(len(s))); err != nil {
panic(err)
}
if _, err := buf.Write(s); err != nil {
panic(err)
}
}
return buf.Bytes()
}
func byteArraysToBytes(arrayList [][]byte) []byte {
buf := new(bytes.Buffer)
arrayLen := -1
for _, array := range arrayList {
if arrayLen != -1 && len(array) != arrayLen {
panic(errors.New("array list does not have same length"))
}
arrayLen = len(array)
if _, err := buf.Write(array); err != nil {
panic(err)
}
}
return buf.Bytes()
}
func int96sToBytes(i96s [][]byte) []byte {
return byteArraysToBytes(i96s)
}
func valuesToBytes(values interface{}, dataType parquet.Type) []byte {
switch dataType {
case parquet.Type_BOOLEAN:
return boolsToBytes(values.([]bool))
case parquet.Type_INT32:
return int32sToBytes(values.([]int32))
case parquet.Type_INT64:
return int64sToBytes(values.([]int64))
case parquet.Type_INT96:
return int96sToBytes(values.([][]byte))
case parquet.Type_FLOAT:
return float32sToBytes(values.([]float32))
case parquet.Type_DOUBLE:
return float64sToBytes(values.([]float64))
case parquet.Type_BYTE_ARRAY:
return byteSlicesToBytes(values.([][]byte))
case parquet.Type_FIXED_LEN_BYTE_ARRAY:
return byteArraysToBytes(values.([][]byte))
}
return []byte{}
}
func valueToBytes(value interface{}, dataType parquet.Type) []byte {
var values interface{}
switch dataType {
case parquet.Type_BOOLEAN:
values = []bool{value.(bool)}
case parquet.Type_INT32:
values = []int32{value.(int32)}
case parquet.Type_INT64:
values = []int64{value.(int64)}
case parquet.Type_INT96:
values = [][]byte{value.([]byte)}
case parquet.Type_FLOAT:
values = []float32{value.(float32)}
case parquet.Type_DOUBLE:
values = []float64{value.(float64)}
case parquet.Type_BYTE_ARRAY, parquet.Type_FIXED_LEN_BYTE_ARRAY:
values = [][]byte{value.([]byte)}
}
return valuesToBytes(values, dataType)
}
func unsignedVarIntToBytes(ui64 uint64) []byte {
size := (getBitWidth(ui64) + 6) / 7
if size == 0 {
return []byte{0}
}
buf := make([]byte, size)
for i := uint64(0); i < size; i++ {
buf[i] = byte(ui64&0x7F) | 0x80
ui64 >>= 7
}
buf[size-1] &= 0x7F
return buf
}
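This is the ULEB128 varint Parquet uses for RLE/bit-packed headers: seven payload bits per byte, with 0x80 set on every byte except the last. A quick worked example, 300 = 0b100101100:

    fmt.Printf("% x\n", unsignedVarIntToBytes(300)) // ac 02  (0x2C|0x80, then 0x02)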
func valuesToRLEBytes(values interface{}, bitWidth int32, valueType parquet.Type) []byte {
vals := valuesToInterfaces(values, valueType)
result := []byte{}
j := 0
for i := 0; i < len(vals); i = j {
for j = i + 1; j < len(vals) && vals[i] == vals[j]; j++ {
}
headerBytes := unsignedVarIntToBytes(uint64((j - i) << 1))
result = append(result, headerBytes...)
valBytes := valueToBytes(vals[i], valueType)
byteCount := (bitWidth + 7) / 8
result = append(result, valBytes[:byteCount]...)
}
return result
}
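Note the header arithmetic: (j - i) << 1 leaves the low bit clear, which in the RLE/bit-packed hybrid marks an RLE run (a low bit of 1 would mark a bit-packed run, which this encoder never emits). Eight identical int32 ones at bit width 1 therefore encode to just two bytes:

    out := valuesToRLEBytes([]int32{1, 1, 1, 1, 1, 1, 1, 1}, 1, parquet.Type_INT32)
    fmt.Printf("% x\n", out) // 10 01  (header 8<<1 = 0x10, then the value in one byte)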
func valuesToRLEBitPackedHybridBytes(values interface{}, bitWidth int32, dataType parquet.Type) []byte {
rleBytes := valuesToRLEBytes(values, bitWidth, dataType)
lenBytes := valueToBytes(int32(len(rleBytes)), parquet.Type_INT32)
return append(lenBytes, rleBytes...)
}
func valuesToBitPackedBytes(values interface{}, bitWidth int64, withHeader bool, dataType parquet.Type) []byte {
var i64s []int64
switch dataType {
case parquet.Type_BOOLEAN:
bs := values.([]bool)
i64s = make([]int64, len(bs))
for i := range bs {
if bs[i] {
i64s[i] = 1
}
}
case parquet.Type_INT32:
i32s := values.([]int32)
i64s = make([]int64, len(i32s))
for i := range i32s {
i64s[i] = int64(i32s[i])
}
case parquet.Type_INT64:
i64s = values.([]int64)
default:
panic(fmt.Errorf("data type %v is not supported for bit packing", dataType))
}
if len(i64s) == 0 {
return nil
}
var valueByte byte
bitsSet := uint64(0)
bitsNeeded := uint64(8)
bitsToSet := uint64(bitWidth)
value := i64s[0]
valueBytes := []byte{}
for i := 0; i < len(i64s); {
if bitsToSet >= bitsNeeded {
valueByte |= byte(((value >> bitsSet) & ((1 << bitsNeeded) - 1)) << (8 - bitsNeeded))
valueBytes = append(valueBytes, valueByte)
bitsToSet -= bitsNeeded
bitsSet += bitsNeeded
bitsNeeded = 8
valueByte = 0
if bitsToSet <= 0 && (i+1) < len(i64s) {
i++
value = i64s[i]
bitsToSet = uint64(bitWidth)
bitsSet = 0
}
} else {
valueByte |= byte((value >> bitsSet) << (8 - bitsNeeded))
i++
if i < len(i64s) {
value = i64s[i]
}
bitsNeeded -= bitsToSet
bitsToSet = uint64(bitWidth)
bitsSet = 0
}
}
if withHeader {
header := uint64(((len(i64s) / 8) << 1) | 1)
headerBytes := unsignedVarIntToBytes(header)
return append(headerBytes, valueBytes...)
}
return valueBytes
}
func valuesToBitPackedDeprecatedBytes(values interface{}, bitWidth int64, dataType parquet.Type) []byte {
var ui64s []uint64
switch dataType {
case parquet.Type_INT32:
i32s := values.([]int32)
ui64s = make([]uint64, len(i32s))
for i := range i32s {
ui64s[i] = uint64(i32s[i])
}
case parquet.Type_INT64:
i64s := values.([]int64)
ui64s = make([]uint64, len(i64s))
for i := range i64s {
ui64s[i] = uint64(i64s[i])
}
default:
panic(fmt.Errorf("data type %v is not supported for bit packing deprecated", dataType))
}
if len(ui64s) == 0 {
return nil
}
result := []byte{}
var curByte byte
curNeed := uint64(8)
valBitLeft := uint64(bitWidth)
val := ui64s[0] << uint64(64-bitWidth)
for i := 0; i < len(ui64s); {
if valBitLeft > curNeed {
mask := uint64(((1 << curNeed) - 1) << (64 - curNeed))
curByte |= byte((val & mask) >> (64 - curNeed))
val <<= curNeed
valBitLeft -= curNeed
result = append(result, curByte)
curByte = 0
curNeed = 8
} else {
curByte |= byte(val >> (64 - curNeed))
curNeed -= valBitLeft
if curNeed == 0 {
result = append(result, curByte)
curByte = 0
curNeed = 8
}
valBitLeft = uint64(bitWidth)
i++
if i < len(ui64s) {
val = ui64s[i] << uint64(64-bitWidth)
}
}
}
return result
}
const (
blockSize = 128
subBlockSize = 32
subBlockCount = blockSize / subBlockSize
)
var (
blockSizeBytes = unsignedVarIntToBytes(blockSize)
subBlockCountBytes = unsignedVarIntToBytes(subBlockCount)
)
func int32ToDeltaBytes(i32s []int32) []byte {
getValue := func(i32 int32) uint64 {
return uint64((i32 >> 31) ^ (i32 << 1))
}
result := append([]byte{}, blockSizeBytes...)
result = append(result, subBlockCountBytes...)
result = append(result, unsignedVarIntToBytes(uint64(len(i32s)))...)
result = append(result, unsignedVarIntToBytes(getValue(i32s[0]))...)
for i := 1; i < len(i32s); {
block := []int32{}
minDelta := int32(0x7FFFFFFF)
for ; i < len(i32s) && len(block) < blockSize; i++ {
delta := i32s[i] - i32s[i-1]
block = append(block, delta)
if delta < minDelta {
minDelta = delta
}
}
for len(block) < blockSize {
block = append(block, minDelta)
}
bitWidths := make([]byte, subBlockCount)
for j := 0; j < subBlockCount; j++ {
maxValue := int32(0)
for k := j * subBlockSize; k < (j+1)*subBlockSize; k++ {
block[k] -= minDelta
if block[k] > maxValue {
maxValue = block[k]
}
}
bitWidths[j] = byte(getBitWidth(uint64(maxValue)))
}
minDeltaZigZag := getValue(minDelta)
result = append(result, unsignedVarIntToBytes(minDeltaZigZag)...)
result = append(result, bitWidths...)
for j := 0; j < subBlockCount; j++ {
bitPacked := valuesToBitPackedBytes(
block[j*subBlockSize:(j+1)*subBlockSize],
int64(bitWidths[j]),
false,
parquet.Type_INT32,
)
result = append(result, bitPacked...)
}
}
return result
}
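The getValue closure is zigzag encoding: it interleaves negative and positive deltas so values near zero stay small before varint encoding.

    zigzag := func(i32 int32) uint64 { return uint64((i32 >> 31) ^ (i32 << 1)) }
    fmt.Println(zigzag(0), zigzag(-1), zigzag(1), zigzag(-2), zigzag(2)) // 0 1 2 3 4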
func int64ToDeltaBytes(i64s []int64) []byte {
getValue := func(i64 int64) uint64 {
return uint64((i64 >> 63) ^ (i64 << 1))
}
result := append([]byte{}, blockSizeBytes...)
result = append(result, subBlockCountBytes...)
result = append(result, unsignedVarIntToBytes(uint64(len(i64s)))...)
result = append(result, unsignedVarIntToBytes(getValue(i64s[0]))...)
for i := 1; i < len(i64s); {
block := []int64{}
minDelta := int64(0x7FFFFFFFFFFFFFFF)
for ; i < len(i64s) && len(block) < blockSize; i++ {
delta := i64s[i] - i64s[i-1]
block = append(block, delta)
if delta < minDelta {
minDelta = delta
}
}
for len(block) < blockSize {
block = append(block, minDelta)
}
bitWidths := make([]byte, subBlockCount)
for j := 0; j < subBlockCount; j++ {
maxValue := int64(0)
for k := j * subBlockSize; k < (j+1)*subBlockSize; k++ {
block[k] -= minDelta
if block[k] > maxValue {
maxValue = block[k]
}
}
bitWidths[j] = byte(getBitWidth(uint64(maxValue)))
}
minDeltaZigZag := getValue(minDelta)
result = append(result, unsignedVarIntToBytes(minDeltaZigZag)...)
result = append(result, bitWidths...)
for j := 0; j < subBlockCount; j++ {
bitPacked := valuesToBitPackedBytes(
block[j*subBlockSize:(j+1)*subBlockSize],
int64(bitWidths[j]),
false,
parquet.Type_INT64,
)
result = append(result, bitPacked...)
}
}
return result
}
func valuesToDeltaBytes(values interface{}, dataType parquet.Type) []byte {
switch dataType {
case parquet.Type_INT32:
return int32ToDeltaBytes(values.([]int32))
case parquet.Type_INT64:
return int64ToDeltaBytes(values.([]int64))
}
return nil
}
func stringsToDeltaLengthByteArrayBytes(strs []string) []byte {
lengths := make([]int32, len(strs))
for i, s := range strs {
lengths[i] = int32(len(s))
}
result := int32ToDeltaBytes(lengths)
for _, s := range strs {
result = append(result, []byte(s)...)
}
return result
}
func stringsToDeltaByteArrayBytes(strs []string) []byte {
prefixLengths := make([]int32, len(strs))
suffixes := make([]string, len(strs))
if len(strs) > 0 {
// the first string has no predecessor; store it whole (prefix length stays 0)
suffixes[0] = strs[0]
}
var i, j int
for i = 1; i < len(strs); i++ {
for j = 0; j < len(strs[i-1]) && j < len(strs[i]); j++ {
if strs[i-1][j] != strs[i][j] {
break
}
}
prefixLengths[i] = int32(j)
suffixes[i] = strs[i][j:]
}
result := int32ToDeltaBytes(prefixLengths)
return append(result, stringsToDeltaLengthByteArrayBytes(suffixes)...)
}
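A worked example of the prefix split, with the first string stored whole as noted above: for ["Hello", "Help", "Helium"] the loop yields

    prefixLengths: [0, 3, 3]
    suffixes:      ["Hello", "p", "ium"]

since "Help" shares 3 leading bytes with "Hello", and "Helium" shares 3 with "Help".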
func encodeValues(values interface{}, dataType parquet.Type, encoding parquet.Encoding, bitWidth int32) []byte {
switch encoding {
case parquet.Encoding_RLE:
return valuesToRLEBitPackedHybridBytes(values, bitWidth, dataType)
case parquet.Encoding_DELTA_BINARY_PACKED:
return valuesToDeltaBytes(values, dataType)
case parquet.Encoding_DELTA_BYTE_ARRAY:
return stringsToDeltaByteArrayBytes(values.([]string))
case parquet.Encoding_DELTA_LENGTH_BYTE_ARRAY:
return stringsToDeltaLengthByteArrayBytes(values.([]string))
}
return valuesToBytes(values, dataType)
}

vendor/github.com/minio/parquet-go/endian.go (new file)
@@ -0,0 +1,50 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package parquet
import (
"encoding/binary"
"math"
)
func uint32ToBytes(v uint32) []byte {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, v)
return buf
}
func uint64ToBytes(v uint64) []byte {
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, v)
return buf
}
func float32ToBytes(v float32) []byte {
return uint32ToBytes(math.Float32bits(v))
}
func float64ToBytes(v float64) []byte {
return uint64ToBytes(math.Float64bits(v))
}
func bytesToUint32(buf []byte) uint32 {
return binary.LittleEndian.Uint32(buf)
}
func bytesToUint64(buf []byte) uint64 {
return binary.LittleEndian.Uint64(buf)
}

vendor/github.com/minio/parquet-go/go.mod (deleted)
@@ -1,8 +0,0 @@
module github.com/minio/parquet-go
require (
git.apache.org/thrift.git v0.12.0
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/minio/minio-go v6.0.14+incompatible
github.com/pierrec/lz4 v2.0.5+incompatible
)

vendor/github.com/minio/parquet-go/go.sum (deleted)
@@ -1,8 +0,0 @@
git.apache.org/thrift.git v0.12.0 h1:CMxsZlAmxKs+VAZMlDDL0wXciMblJcutQbEe3A9CYUM=
git.apache.org/thrift.git v0.12.0/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/minio/minio-go v6.0.14+incompatible h1:fnV+GD28LeqdN6vT2XdGKW8Qe/IfjJDswNVuni6km9o=
github.com/minio/minio-go v6.0.14+incompatible/go.mod h1:7guKYtitv8dktvNUGrhzmNlA5wrAABTQXCoesZdFQO8=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=

vendor/github.com/minio/parquet-go/page.go
@@ -18,6 +18,7 @@ package parquet
import (
"bytes"
"context"
"fmt"
"strings"
@@ -312,7 +313,7 @@ type page struct {
 func newPage() *page {
 	return &page{
 		Header:   parquet.NewPageHeader(),
-		PageSize: 8 * 1024,
+		PageSize: defaultPageSize,
 	}
 }
@@ -529,3 +530,418 @@ func (page *page) getValueFromRawData(columnNameIndexMap map[string]int, schemaE
return fmt.Errorf("unsupported page type %v", pageType)
}
func (page *page) toDataPage(compressType parquet.CompressionCodec) []byte {
values := []interface{}{}
for i := range page.DataTable.DefinitionLevels {
if page.DataTable.DefinitionLevels[i] == page.DataTable.MaxDefinitionLevel {
values = append(values, page.DataTable.Values[i])
}
}
valuesBytes := encodeValues(interfacesToValues(values, page.DataTable.Type), page.DataType, page.DataTable.Encoding, page.DataTable.BitWidth)
var defLevelBytes []byte
if page.DataTable.MaxDefinitionLevel > 0 {
defLevels := make([]int64, len(page.DataTable.DefinitionLevels))
for i := range page.DataTable.DefinitionLevels {
defLevels[i] = int64(page.DataTable.DefinitionLevels[i])
}
defLevelBytes = valuesToRLEBitPackedHybridBytes(
defLevels,
int32(getBitWidth(uint64(page.DataTable.MaxDefinitionLevel))),
parquet.Type_INT64,
)
}
var repLevelBytes []byte
if page.DataTable.MaxRepetitionLevel > 0 {
repLevels := make([]int64, len(page.DataTable.DefinitionLevels))
for i := range page.DataTable.DefinitionLevels {
repLevels[i] = int64(page.DataTable.RepetitionLevels[i])
}
repLevelBytes = valuesToRLEBitPackedHybridBytes(
repLevels,
int32(getBitWidth(uint64(page.DataTable.MaxRepetitionLevel))),
parquet.Type_INT64,
)
}
data := repLevelBytes
data = append(data, defLevelBytes...)
data = append(data, valuesBytes...)
compressedData, err := compressionCodec(compressType).compress(data)
if err != nil {
panic(err)
}
page.Header = parquet.NewPageHeader()
page.Header.Type = parquet.PageType_DATA_PAGE
page.Header.CompressedPageSize = int32(len(compressedData))
page.Header.UncompressedPageSize = int32(len(data))
page.Header.DataPageHeader = parquet.NewDataPageHeader()
page.Header.DataPageHeader.NumValues = int32(len(page.DataTable.DefinitionLevels))
page.Header.DataPageHeader.DefinitionLevelEncoding = parquet.Encoding_RLE
page.Header.DataPageHeader.RepetitionLevelEncoding = parquet.Encoding_RLE
page.Header.DataPageHeader.Encoding = page.DataTable.Encoding
page.Header.DataPageHeader.Statistics = parquet.NewStatistics()
if page.MaxVal != nil {
tmpBuf := valueToBytes(page.MaxVal, page.DataType)
if page.DataType == parquet.Type_BYTE_ARRAY {
switch page.DataTable.ConvertedType {
case parquet.ConvertedType_UTF8, parquet.ConvertedType_DECIMAL:
tmpBuf = tmpBuf[4:]
}
}
page.Header.DataPageHeader.Statistics.Max = tmpBuf
}
if page.MinVal != nil {
tmpBuf := valueToBytes(page.MinVal, page.DataType)
if page.DataType == parquet.Type_BYTE_ARRAY {
switch page.DataTable.ConvertedType {
case parquet.ConvertedType_UTF8, parquet.ConvertedType_DECIMAL:
tmpBuf = tmpBuf[4:]
}
}
page.Header.DataPageHeader.Statistics.Min = tmpBuf
}
ts := thrift.NewTSerializer()
ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
pageHeaderBytes, err := ts.Write(context.TODO(), page.Header)
if err != nil {
panic(err)
}
page.RawData = append(pageHeaderBytes, compressedData...)
return page.RawData
}
func (page *page) toDataPageV2(compressType parquet.CompressionCodec) []byte {
values := []interface{}{}
for i := range page.DataTable.DefinitionLevels {
if page.DataTable.DefinitionLevels[i] == page.DataTable.MaxDefinitionLevel {
values = append(values, page.DataTable.Values[i])
}
}
valuesBytes := encodeValues(interfacesToValues(values, page.DataTable.Type), page.DataType, page.DataTable.Encoding, page.DataTable.BitWidth) // convert []interface{} to a typed slice, as toDataPage does
var defLevelBytes []byte
if page.DataTable.MaxDefinitionLevel > 0 {
defLevels := make([]int64, len(page.DataTable.DefinitionLevels))
for i := range page.DataTable.DefinitionLevels {
defLevels[i] = int64(page.DataTable.DefinitionLevels[i])
}
defLevelBytes = valuesToRLEBytes(
defLevels,
int32(getBitWidth(uint64(page.DataTable.MaxDefinitionLevel))),
parquet.Type_INT64,
)
}
var repLevelBytes []byte
numRows := int32(0)
if page.DataTable.MaxRepetitionLevel > 0 {
repLevels := make([]int64, len(page.DataTable.DefinitionLevels))
for i := range page.DataTable.DefinitionLevels {
repLevels[i] = int64(page.DataTable.RepetitionLevels[i])
if page.DataTable.RepetitionLevels[i] == 0 {
numRows++
}
}
repLevelBytes = valuesToRLEBytes(
repLevels,
int32(getBitWidth(uint64(page.DataTable.MaxRepetitionLevel))),
parquet.Type_INT64,
)
}
compressedData, err := compressionCodec(compressType).compress(valuesBytes)
if err != nil {
panic(err)
}
page.Header = parquet.NewPageHeader()
page.Header.Type = parquet.PageType_DATA_PAGE_V2
page.Header.CompressedPageSize = int32(len(compressedData) + len(defLevelBytes) + len(repLevelBytes))
page.Header.UncompressedPageSize = int32(len(valuesBytes) + len(defLevelBytes) + len(repLevelBytes))
page.Header.DataPageHeaderV2 = parquet.NewDataPageHeaderV2()
page.Header.DataPageHeaderV2.NumValues = int32(len(page.DataTable.Values))
page.Header.DataPageHeaderV2.NumNulls = page.Header.DataPageHeaderV2.NumValues - int32(len(values))
page.Header.DataPageHeaderV2.NumRows = numRows
page.Header.DataPageHeaderV2.Encoding = page.DataTable.Encoding
page.Header.DataPageHeaderV2.DefinitionLevelsByteLength = int32(len(defLevelBytes))
page.Header.DataPageHeaderV2.RepetitionLevelsByteLength = int32(len(repLevelBytes))
page.Header.DataPageHeaderV2.IsCompressed = true
page.Header.DataPageHeaderV2.Statistics = parquet.NewStatistics()
if page.MaxVal != nil {
tmpBuf := valueToBytes(page.MaxVal, page.DataType)
if page.DataType == parquet.Type_BYTE_ARRAY {
switch page.DataTable.ConvertedType {
case parquet.ConvertedType_UTF8, parquet.ConvertedType_DECIMAL:
tmpBuf = tmpBuf[4:]
}
}
page.Header.DataPageHeaderV2.Statistics.Max = tmpBuf
}
if page.MinVal != nil {
tmpBuf := valueToBytes(page.MinVal, page.DataType)
if page.DataType == parquet.Type_BYTE_ARRAY {
switch page.DataTable.ConvertedType {
case parquet.ConvertedType_UTF8, parquet.ConvertedType_DECIMAL:
tmpBuf = tmpBuf[4:]
}
}
page.Header.DataPageHeaderV2.Statistics.Min = tmpBuf
}
ts := thrift.NewTSerializer()
ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
pageHeaderBytes, err := ts.Write(context.TODO(), page.Header)
if err != nil {
panic(err)
}
page.RawData = append(pageHeaderBytes, repLevelBytes...)
page.RawData = append(page.RawData, defLevelBytes...)
page.RawData = append(page.RawData, compressedData...)
return page.RawData
}
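The two writers differ in what gets compressed: toDataPage concatenates repetition levels, definition levels and values and compresses the whole buffer, while toDataPageV2 compresses only the values and writes both level streams uncompressed after the header, recording their byte lengths so a reader can locate the values without decompressing anything. Schematically:

    DATA_PAGE    : [thrift header][ compress(repLevels + defLevels + values) ]
    DATA_PAGE_V2 : [thrift header][ repLevels ][ defLevels ][ compress(values) ]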
func (page *page) toDictPage(compressType parquet.CompressionCodec, dataType parquet.Type) []byte {
valuesBytes := valuesToBytes(page.DataTable.Values, dataType)
compressedData, err := compressionCodec(compressType).compress(valuesBytes)
if err != nil {
panic(err)
}
page.Header = parquet.NewPageHeader()
page.Header.Type = parquet.PageType_DICTIONARY_PAGE
page.Header.CompressedPageSize = int32(len(compressedData))
page.Header.UncompressedPageSize = int32(len(valuesBytes))
page.Header.DictionaryPageHeader = parquet.NewDictionaryPageHeader()
page.Header.DictionaryPageHeader.NumValues = int32(len(page.DataTable.Values))
page.Header.DictionaryPageHeader.Encoding = parquet.Encoding_PLAIN
ts := thrift.NewTSerializer()
ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
pageHeaderBytes, err := ts.Write(context.TODO(), page.Header)
if err != nil {
panic(err)
}
page.RawData = append(pageHeaderBytes, compressedData...)
return page.RawData
}
func (page *page) toDictDataPage(compressType parquet.CompressionCodec, bitWidth int32) []byte {
valuesBytes := append([]byte{byte(bitWidth)}, valuesToRLEBytes(page.DataTable.Values, bitWidth, parquet.Type_INT32)...)
var defLevelBytes []byte
if page.DataTable.MaxDefinitionLevel > 0 {
defLevels := make([]int64, len(page.DataTable.DefinitionLevels))
for i := range page.DataTable.DefinitionLevels {
defLevels[i] = int64(page.DataTable.DefinitionLevels[i])
}
defLevelBytes = valuesToRLEBitPackedHybridBytes(
defLevels,
int32(getBitWidth(uint64(page.DataTable.MaxDefinitionLevel))),
parquet.Type_INT64,
)
}
var repLevelBytes []byte
if page.DataTable.MaxRepetitionLevel > 0 {
repLevels := make([]int64, len(page.DataTable.DefinitionLevels))
for i := range page.DataTable.DefinitionLevels {
repLevels[i] = int64(page.DataTable.RepetitionLevels[i])
}
repLevelBytes = valuesToRLEBitPackedHybridBytes(
repLevels,
int32(getBitWidth(uint64(page.DataTable.MaxRepetitionLevel))),
parquet.Type_INT64,
)
}
data := append(repLevelBytes, defLevelBytes...)
data = append(data, valuesBytes...)
compressedData, err := compressionCodec(compressType).compress(data)
if err != nil {
panic(err)
}
page.Header = parquet.NewPageHeader()
page.Header.Type = parquet.PageType_DATA_PAGE
page.Header.CompressedPageSize = int32(len(compressedData))
page.Header.UncompressedPageSize = int32(len(data))
page.Header.DataPageHeader = parquet.NewDataPageHeader()
page.Header.DataPageHeader.NumValues = int32(len(page.DataTable.DefinitionLevels))
page.Header.DataPageHeader.DefinitionLevelEncoding = parquet.Encoding_RLE
page.Header.DataPageHeader.RepetitionLevelEncoding = parquet.Encoding_RLE
page.Header.DataPageHeader.Encoding = parquet.Encoding_PLAIN_DICTIONARY
ts := thrift.NewTSerializer()
ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
pageHeaderBytes, err := ts.Write(context.TODO(), page.Header)
if err != nil {
panic(err)
}
page.RawData = append(pageHeaderBytes, compressedData...)
return page.RawData
}
func tableToDataPages(dataTable *table, pageSize int32, compressType parquet.CompressionCodec) (result []*page, totalSize int64) {
var j int
for i := 0; i < len(dataTable.Values); i = j {
var size, numValues int32
minVal := dataTable.Values[i]
maxVal := dataTable.Values[i]
for j = i + 1; j < len(dataTable.Values) && size < pageSize; j++ {
if dataTable.DefinitionLevels[j] == dataTable.MaxDefinitionLevel {
numValues++
size += sizeOf(dataTable.Values[j])
minVal = min(minVal, dataTable.Values[j], &dataTable.Type, &dataTable.ConvertedType)
maxVal = max(maxVal, dataTable.Values[j], &dataTable.Type, &dataTable.ConvertedType)
}
}
page := newDataPage()
page.PageSize = pageSize
page.Header.DataPageHeader.NumValues = numValues
page.Header.Type = parquet.PageType_DATA_PAGE
page.DataTable = new(table)
page.DataTable.RepetitionType = dataTable.RepetitionType
page.DataTable.Path = dataTable.Path
page.DataTable.MaxDefinitionLevel = dataTable.MaxDefinitionLevel
page.DataTable.MaxRepetitionLevel = dataTable.MaxRepetitionLevel
page.DataTable.Values = dataTable.Values[i:j]
page.DataTable.DefinitionLevels = dataTable.DefinitionLevels[i:j]
page.DataTable.RepetitionLevels = dataTable.RepetitionLevels[i:j]
page.DataTable.Type = dataTable.Type
page.DataTable.ConvertedType = dataTable.ConvertedType
page.DataTable.Encoding = dataTable.Encoding
page.MinVal = minVal
page.MaxVal = maxVal
page.DataType = dataTable.Type
page.CompressType = compressType
page.Path = dataTable.Path
page.toDataPage(compressType)
totalSize += int64(len(page.RawData))
result = append(result, page)
}
return result, totalSize
}
func tableToDictPage(dataTable *table, pageSize int32, compressType parquet.CompressionCodec) (*page, int64) {
dataType := dataTable.Type
page := newDataPage()
page.PageSize = pageSize
page.Header.DataPageHeader.NumValues = int32(len(dataTable.Values))
page.Header.Type = parquet.PageType_DICTIONARY_PAGE
page.DataTable = new(table)
page.DataTable.RepetitionType = dataTable.RepetitionType
page.DataTable.Path = dataTable.Path
page.DataTable.MaxDefinitionLevel = dataTable.MaxDefinitionLevel
page.DataTable.MaxRepetitionLevel = dataTable.MaxRepetitionLevel
page.DataTable.Values = dataTable.Values
page.DataTable.DefinitionLevels = dataTable.DefinitionLevels
page.DataTable.RepetitionLevels = dataTable.RepetitionLevels
page.DataType = dataType
page.CompressType = compressType
page.Path = dataTable.Path
page.toDictPage(compressType, dataTable.Type)
return page, int64(len(page.RawData))
}
type dictRec struct {
DictMap map[interface{}]int32
DictSlice []interface{}
Type parquet.Type
}
func newDictRec(dataType parquet.Type) *dictRec {
return &dictRec{
DictMap: make(map[interface{}]int32),
Type: dataType,
}
}
func dictRecToDictPage(dictRec *dictRec, pageSize int32, compressType parquet.CompressionCodec) (*page, int64) {
page := newDataPage()
page.PageSize = pageSize
page.Header.DataPageHeader.NumValues = int32(len(dictRec.DictSlice))
page.Header.Type = parquet.PageType_DICTIONARY_PAGE
page.DataTable = new(table)
page.DataTable.Values = dictRec.DictSlice
page.DataType = parquet.Type_INT32
page.CompressType = compressType
page.toDictPage(compressType, dictRec.Type)
return page, int64(len(page.RawData))
}
func tableToDictDataPages(dictRec *dictRec, dataTable *table, pageSize int32, bitWidth int32, compressType parquet.CompressionCodec) (pages []*page, totalSize int64) {
dataType := dataTable.Type
pT, cT := dataTable.Type, dataTable.ConvertedType
j := 0
for i := 0; i < len(dataTable.Values); i = j {
j = i
var size, numValues int32
maxVal := dataTable.Values[i]
minVal := dataTable.Values[i]
values := make([]interface{}, 0)
for j < len(dataTable.Values) && size < pageSize {
if dataTable.DefinitionLevels[j] == dataTable.MaxDefinitionLevel {
numValues++
size += int32(sizeOf(dataTable.Values[j]))
maxVal = max(maxVal, dataTable.Values[j], &pT, &cT)
minVal = min(minVal, dataTable.Values[j], &pT, &cT)
if _, ok := dictRec.DictMap[dataTable.Values[j]]; !ok {
dictRec.DictSlice = append(dictRec.DictSlice, dataTable.Values[j])
dictRec.DictMap[dataTable.Values[j]] = int32(len(dictRec.DictSlice) - 1)
}
values = append(values, int32(dictRec.DictMap[dataTable.Values[j]]))
}
j++
}
page := newDataPage()
page.PageSize = pageSize
page.Header.DataPageHeader.NumValues = numValues
page.Header.Type = parquet.PageType_DATA_PAGE
page.DataTable = new(table)
page.DataTable.RepetitionType = dataTable.RepetitionType
page.DataTable.Path = dataTable.Path
page.DataTable.MaxDefinitionLevel = dataTable.MaxDefinitionLevel
page.DataTable.MaxRepetitionLevel = dataTable.MaxRepetitionLevel
page.DataTable.Values = values
page.DataTable.DefinitionLevels = dataTable.DefinitionLevels[i:j]
page.DataTable.RepetitionLevels = dataTable.RepetitionLevels[i:j]
page.MaxVal = maxVal
page.MinVal = minVal
page.DataType = dataType
page.CompressType = compressType
page.Path = dataTable.Path
page.toDictDataPage(compressType, bitWidth)
totalSize += int64(len(page.RawData))
pages = append(pages, page)
}
return pages, totalSize
}

vendor/github.com/minio/parquet-go/reader.go
@@ -81,8 +81,8 @@ func (value Value) MarshalJSON() (data []byte, err error) {
 	return json.Marshal(value.Value)
 }

-// File - denotes parquet file.
-type File struct {
+// Reader - denotes parquet file.
+type Reader struct {
 	getReaderFunc  GetReaderFunc
 	schemaElements []*parquet.SchemaElement
 	rowGroups      []*parquet.RowGroup
@@ -94,8 +94,8 @@ type File struct {
 	rowIndex int64
 }

-// Open - opens parquet file with given column names.
-func Open(getReaderFunc GetReaderFunc, columnNames set.StringSet) (*File, error) {
+// NewReader - creates new parquet reader. Reader calls getReaderFunc to get required data range for given columnNames. If columnNames is empty, all columns are used.
+func NewReader(getReaderFunc GetReaderFunc, columnNames set.StringSet) (*Reader, error) {
 	fileMeta, err := fileMetadata(getReaderFunc)
 	if err != nil {
 		return nil, err
@@ -107,7 +107,7 @@ func Open(getReaderFunc GetReaderFunc, columnNames set.StringSet) (*File, error)
 		nameList = append(nameList, element.Name)
 	}

-	return &File{
+	return &Reader{
 		getReaderFunc:  getReaderFunc,
 		rowGroups:      fileMeta.GetRowGroups(),
 		schemaElements: schemaElements,
@@ -117,54 +117,50 @@ func Open(getReaderFunc GetReaderFunc, columnNames set.StringSet) (*File, error)
 }

 // Read - reads single record.
-func (file *File) Read() (record *Record, err error) {
-	if file.rowGroupIndex >= len(file.rowGroups) {
+func (reader *Reader) Read() (record *Record, err error) {
+	if reader.rowGroupIndex >= len(reader.rowGroups) {
 		return nil, io.EOF
 	}

-	if file.columns == nil {
-		file.columns, err = getColumns(
-			file.rowGroups[file.rowGroupIndex],
-			file.columnNames,
-			file.schemaElements,
-			file.getReaderFunc,
+	if reader.columns == nil {
+		reader.columns, err = getColumns(
+			reader.rowGroups[reader.rowGroupIndex],
+			reader.columnNames,
+			reader.schemaElements,
+			reader.getReaderFunc,
 		)
 		if err != nil {
 			return nil, err
 		}

-		file.rowIndex = 0
+		reader.rowIndex = 0
 	}

-	if file.rowIndex >= file.rowGroups[file.rowGroupIndex].GetNumRows() {
-		file.rowGroupIndex++
-		file.Close()
-		return file.Read()
+	if reader.rowIndex >= reader.rowGroups[reader.rowGroupIndex].GetNumRows() {
+		reader.rowGroupIndex++
+		reader.Close()
+		return reader.Read()
 	}

-	record = newRecord(file.nameList)
-	for name := range file.columns {
-		value, valueType := file.columns[name].read()
+	record = newRecord(reader.nameList)
+	for name := range reader.columns {
+		value, valueType := reader.columns[name].read()
 		record.set(name, Value{value, valueType})
 	}

-	file.rowIndex++
+	reader.rowIndex++

 	return record, nil
 }

 // Close - closes underlying readers.
-func (file *File) Close() (err error) {
-	if file.columns != nil {
-		return nil
-	}
-	for _, column := range file.columns {
+func (reader *Reader) Close() (err error) {
+	for _, column := range reader.columns {
 		column.close()
 	}
-	file.columns = nil
-	file.rowIndex = 0
+	reader.columns = nil
+	reader.rowIndex = 0

 	return nil
 }
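This hunk is the heart of the commit title: the old Close carried an inverted guard, `if file.columns != nil { return nil }`, so it bailed out on every call after the first Read and never closed the per-column readers obtained through getReaderFunc. In MinIO those readers hold object read locks, which is how SelectParquet leaked stale locks. A minimal sketch of the behavioural difference, using a hypothetical close counter:

    closed := 0
    columns := map[string]func(){"name": func() { closed++ }}
    // Old guard (buggy): columns is non-nil here, so Close returned
    // immediately and `closed` stayed 0 -- the lock was never released.
    // New behaviour: always close whatever is open, then reset.
    for _, closeColumn := range columns {
        closeColumn()
    }
    columns = nil
    fmt.Println(closed) // 1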

vendor/github.com/minio/parquet-go/rowgroup.go (new file)
@@ -0,0 +1,54 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package parquet
import (
"strings"
"github.com/minio/parquet-go/gen-go/parquet"
)
type rowGroup struct {
Chunks []*columnChunk
RowGroupHeader *parquet.RowGroup
}
func newRowGroup() *rowGroup {
return &rowGroup{
RowGroupHeader: parquet.NewRowGroup(),
}
}
func (rg *rowGroup) rowGroupToTableMap() map[string]*table {
tableMap := make(map[string]*table)
for _, chunk := range rg.Chunks {
columnPath := ""
for _, page := range chunk.Pages {
if columnPath == "" {
columnPath = strings.Join(page.DataTable.Path, ".")
}
if _, ok := tableMap[columnPath]; !ok {
tableMap[columnPath] = newTableFromTable(page.DataTable)
}
tableMap[columnPath].Merge(page.DataTable)
}
}
return tableMap
}

vendor/github.com/minio/parquet-go/table.go
@@ -19,34 +19,7 @@ package parquet
 import "github.com/minio/parquet-go/gen-go/parquet"

 func getTableValues(values interface{}, valueType parquet.Type) (tableValues []interface{}) {
-	switch valueType {
-	case parquet.Type_BOOLEAN:
-		for _, v := range values.([]bool) {
-			tableValues = append(tableValues, v)
-		}
-	case parquet.Type_INT32:
-		for _, v := range values.([]int32) {
-			tableValues = append(tableValues, v)
-		}
-	case parquet.Type_INT64:
-		for _, v := range values.([]int64) {
-			tableValues = append(tableValues, v)
-		}
-	case parquet.Type_FLOAT:
-		for _, v := range values.([]float32) {
-			tableValues = append(tableValues, v)
-		}
-	case parquet.Type_DOUBLE:
-		for _, v := range values.([]float64) {
-			tableValues = append(tableValues, v)
-		}
-	case parquet.Type_INT96, parquet.Type_BYTE_ARRAY, parquet.Type_FIXED_LEN_BYTE_ARRAY:
-		for _, v := range values.([][]byte) {
-			tableValues = append(tableValues, v)
-		}
-	}
-	return tableValues
+	return valuesToInterfaces(values, valueType)
 }
type table struct {
@@ -58,6 +31,9 @@ type table struct {
 	Values           []interface{} // Parquet values
 	DefinitionLevels []int32       // Definition Levels slice
 	RepetitionLevels []int32       // Repetition Levels slice
+	ConvertedType    parquet.ConvertedType
+	Encoding         parquet.Encoding
+	BitWidth         int32
 }
func newTableFromTable(srcTable *table) *table {

vendor/github.com/minio/parquet-go/writer.go (new file)
@@ -0,0 +1,279 @@
/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package parquet
import (
"context"
"encoding/binary"
"fmt"
"io"
"strings"
"git.apache.org/thrift.git/lib/go/thrift"
"github.com/minio/parquet-go/gen-go/parquet"
)
const (
defaultPageSize = 8 * 1024 // 8 KiB
defaultRowGroupSize = 128 * 1024 * 1024 // 128 MiB
)
// Writer - represents parquet writer.
type Writer struct {
PageSize int64
RowGroupSize int64
CompressionType parquet.CompressionCodec
writeCloser io.WriteCloser
size int64
numRows int64
offset int64
pagesMapBuf map[string][]*page
dictRecs map[string]*dictRec
footer *parquet.FileMetaData
schemaElements []*parquet.SchemaElement
rowGroupCount int
records []map[string]*Value
}
func (writer *Writer) writeRecords() (err error) {
if len(writer.records) == 0 {
return nil
}
tableMap := make(map[string]*table)
for _, schema := range writer.schemaElements {
if schema.GetNumChildren() != 0 {
continue
}
table := new(table)
table.Path = strings.Split(schema.Name, ".")
table.MaxDefinitionLevel = 0
table.MaxRepetitionLevel = 0
table.RepetitionType = schema.GetRepetitionType()
table.Type = schema.GetType()
table.ConvertedType = -1
for _, record := range writer.records {
value := record[schema.Name]
if *schema.Type != value.Type {
return fmt.Errorf("schema.Type and value.Type are not same")
}
switch value.Type {
case parquet.Type_BOOLEAN:
table.Values = append(table.Values, value.Value.(bool))
case parquet.Type_INT32:
table.Values = append(table.Values, value.Value.(int32))
case parquet.Type_INT64:
table.Values = append(table.Values, value.Value.(int64))
case parquet.Type_INT96, parquet.Type_BYTE_ARRAY, parquet.Type_FIXED_LEN_BYTE_ARRAY:
table.Values = append(table.Values, value.Value.([]byte))
case parquet.Type_FLOAT:
table.Values = append(table.Values, value.Value.(float32))
case parquet.Type_DOUBLE:
table.Values = append(table.Values, value.Value.(float64))
default:
return fmt.Errorf("unknown parquet type %v", value.Type)
}
table.DefinitionLevels = append(table.DefinitionLevels, 0)
table.RepetitionLevels = append(table.RepetitionLevels, 0)
}
tableMap[schema.Name] = table
}
pagesMap := make(map[string][]*page)
for name, table := range tableMap {
if table.Encoding == parquet.Encoding_PLAIN_DICTIONARY {
if _, ok := writer.dictRecs[name]; !ok {
writer.dictRecs[name] = newDictRec(table.Type)
}
pagesMap[name], _ = tableToDictDataPages(writer.dictRecs[name], table, int32(writer.PageSize), 32, writer.CompressionType)
} else {
pagesMap[name], _ = tableToDataPages(table, int32(writer.PageSize), writer.CompressionType)
}
}
recordsSize := int64(0)
for name, pages := range pagesMap {
if _, ok := writer.pagesMapBuf[name]; !ok {
writer.pagesMapBuf[name] = pages
} else {
writer.pagesMapBuf[name] = append(writer.pagesMapBuf[name], pages...)
}
for _, page := range pages {
recordsSize += int64(len(page.RawData))
writer.size += int64(len(page.RawData))
// As we have the raw data, the data table is no longer needed hereafter:
// page.DataTable = nil
}
}
writer.numRows += int64(len(writer.records))
// if len(writer.pagesMapBuf) > 0 && writer.size+recordsSize >= writer.RowGroupSize {
if len(writer.pagesMapBuf) > 0 {
//pages -> chunk
chunkMap := make(map[string]*columnChunk)
for name, pages := range writer.pagesMapBuf {
// FIXME: add page encoding support.
// if len(pages) > 0 && pages[0].Info.Encoding == parquet.Encoding_PLAIN_DICTIONARY {
// dictPage, _ := dictRecToDictPage(writer.dictRecs[name], int32(writer.PageSize), writer.CompressionType)
// tmp := append([]*page{dictPage}, pages...)
// chunkMap[name] = pagesToDictColumnChunk(tmp)
// } else {
// chunkMap[name] = pagesToColumnChunk(pages)
// }
chunkMap[name] = pagesToColumnChunk(pages)
}
writer.dictRecs = make(map[string]*dictRec)
//chunks -> rowGroup
rowGroup := newRowGroup()
rowGroup.RowGroupHeader.Columns = []*parquet.ColumnChunk{}
for k := 0; k < len(writer.schemaElements); k++ {
//for _, chunk := range chunkMap {
schema := writer.schemaElements[k]
if schema.GetNumChildren() > 0 {
continue
}
chunk := chunkMap[schema.Name]
if chunk == nil {
continue
}
rowGroup.Chunks = append(rowGroup.Chunks, chunk)
rowGroup.RowGroupHeader.TotalByteSize += chunk.chunkHeader.MetaData.TotalCompressedSize
rowGroup.RowGroupHeader.Columns = append(rowGroup.RowGroupHeader.Columns, chunk.chunkHeader)
}
rowGroup.RowGroupHeader.NumRows = writer.numRows
writer.numRows = 0
for k := 0; k < len(rowGroup.Chunks); k++ {
rowGroup.Chunks[k].chunkHeader.MetaData.DataPageOffset = -1
rowGroup.Chunks[k].chunkHeader.FileOffset = writer.offset
for l := 0; l < len(rowGroup.Chunks[k].Pages); l++ {
switch {
case rowGroup.Chunks[k].Pages[l].Header.Type == parquet.PageType_DICTIONARY_PAGE:
offset := writer.offset
rowGroup.Chunks[k].chunkHeader.MetaData.DictionaryPageOffset = &offset
case rowGroup.Chunks[k].chunkHeader.MetaData.DataPageOffset <= 0:
rowGroup.Chunks[k].chunkHeader.MetaData.DataPageOffset = writer.offset
}
data := rowGroup.Chunks[k].Pages[l].RawData
if _, err = writer.writeCloser.Write(data); err != nil {
return err
}
writer.offset += int64(len(data))
}
}
writer.footer.RowGroups = append(writer.footer.RowGroups, rowGroup.RowGroupHeader)
writer.size = 0
writer.pagesMapBuf = make(map[string][]*page)
}
writer.footer.NumRows += int64(len(writer.records))
writer.records = writer.records[:0]
return nil
}
// Write - writes a single record. The actual binary data write happens once rowGroupCount records are cached.
func (writer *Writer) Write(record map[string]*Value) (err error) {
writer.records = append(writer.records, record)
if len(writer.records) != writer.rowGroupCount {
return nil
}
return writer.writeRecords()
}
func (writer *Writer) finalize() (err error) {
if err = writer.writeRecords(); err != nil {
return err
}
ts := thrift.NewTSerializer()
ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
footerBuf, err := ts.Write(context.TODO(), writer.footer)
if err != nil {
return err
}
if _, err = writer.writeCloser.Write(footerBuf); err != nil {
return err
}
footerSizeBuf := make([]byte, 4)
binary.LittleEndian.PutUint32(footerSizeBuf, uint32(len(footerBuf)))
if _, err = writer.writeCloser.Write(footerSizeBuf); err != nil {
return err
}
_, err = writer.writeCloser.Write([]byte("PAR1"))
return err
}
// Close - finalizes and closes writer. If any pending records are available, they are written here.
func (writer *Writer) Close() (err error) {
if err = writer.finalize(); err != nil {
return err
}
return writer.writeCloser.Close()
}
// NewWriter - creates new parquet writer. Binary data of rowGroupCount records are written to writeCloser.
func NewWriter(writeCloser io.WriteCloser, schemaElements []*parquet.SchemaElement, rowGroupCount int) (*Writer, error) {
if _, err := writeCloser.Write([]byte("PAR1")); err != nil {
return nil, err
}
footer := parquet.NewFileMetaData()
footer.Version = 1
numChildren := int32(len(schemaElements))
footer.Schema = append(footer.Schema, &parquet.SchemaElement{
Name: "schema",
RepetitionType: parquet.FieldRepetitionTypePtr(parquet.FieldRepetitionType_REQUIRED),
NumChildren: &numChildren,
})
footer.Schema = append(footer.Schema, schemaElements...)
return &Writer{
PageSize: defaultPageSize,
RowGroupSize: defaultRowGroupSize,
CompressionType: parquet.CompressionCodec_SNAPPY,
writeCloser: writeCloser,
offset: 4,
pagesMapBuf: make(map[string][]*page),
dictRecs: make(map[string]*dictRec),
footer: footer,
schemaElements: schemaElements,
rowGroupCount: rowGroupCount,
}, nil
}
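A minimal sketch of driving this writer, with parquetgo aliasing the vendored package. It assumes the thrift-generated TypePtr helper and that Value's exported fields are Value and Type (as the reader's Value{value, valueType} literal suggests); nothing in this snippet is part of the commit itself:

    f, err := os.Create("records.parquet") // hypothetical destination
    if err != nil {
        return err
    }
    schema := []*parquet.SchemaElement{{
        Name:           "id",
        Type:           parquet.TypePtr(parquet.Type_INT64),
        RepetitionType: parquet.FieldRepetitionTypePtr(parquet.FieldRepetitionType_REQUIRED),
    }}
    w, err := parquetgo.NewWriter(f, schema, 100) // buffer 100 records per flush
    if err != nil {
        return err
    }
    if err = w.Write(map[string]*parquetgo.Value{
        "id": {Value: int64(1), Type: parquet.Type_INT64},
    }); err != nil {
        return err
    }
    return w.Close() // flushes pending records, then footer and trailing "PAR1"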

vendor/vendor.json
@@ -674,16 +674,16 @@
 			"revisionTime": "2019-01-20T10:05:29Z"
 		},
 		{
-			"checksumSHA1": "FO6q7sC2QTZMGDWv+/cGWJfjTCk=",
+			"checksumSHA1": "KzX4Kc/eOb8Dd8yJtO8fh3CIxHs=",
 			"path": "github.com/minio/parquet-go",
-			"revision": "d5e4e922da820530a1851afc22499c826c08f1e8",
-			"revisionTime": "2019-02-10T14:56:30Z"
+			"revision": "54766442a4a1f0fee45d70ebbf8acebc6d66f24b",
+			"revisionTime": "2019-03-13T01:38:37Z"
 		},
 		{
 			"checksumSHA1": "N4WRPw4p3AN958RH/O53kUsJacQ=",
 			"path": "github.com/minio/parquet-go/gen-go/parquet",
-			"revision": "d50385ed243d7120cf0f78de3f4f4e171936f12f",
-			"revisionTime": "2018-11-07T21:57:30Z"
+			"revision": "54766442a4a1f0fee45d70ebbf8acebc6d66f24b",
+			"revisionTime": "2019-03-13T01:38:37Z"
 		},
 		{
 			"checksumSHA1": "cYuXpiVBMypgkEr0Wqd79jPPyBg=",