You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
513 lines
11 KiB
513 lines
11 KiB
/*
|
|
* Minio Cloud Storage, (C) 2018 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package parquet
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
|
|
"github.com/minio/minio/pkg/s3select/internal/parquet-go/gen-go/parquet"
|
|
)
|
|
|
|
func i64sToi32s(i64s []int64) (i32s []int32) {
|
|
i32s = make([]int32, len(i64s))
|
|
for i := range i64s {
|
|
i32s[i] = int32(i64s[i])
|
|
}
|
|
|
|
return i32s
|
|
}
|
|
|
|
func readBitPacked(reader *bytes.Reader, header, bitWidth uint64) (result []int64, err error) {
|
|
count := header * 8
|
|
|
|
if count == 0 {
|
|
return result, nil
|
|
}
|
|
|
|
if bitWidth == 0 {
|
|
return make([]int64, count), nil
|
|
}
|
|
|
|
data := make([]byte, header*bitWidth)
|
|
if _, err = reader.Read(data); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var val, used, left, b uint64
|
|
|
|
valNeedBits := bitWidth
|
|
i := -1
|
|
for {
|
|
if left <= 0 {
|
|
i++
|
|
if i >= len(data) {
|
|
break
|
|
}
|
|
|
|
b = uint64(data[i])
|
|
left = 8
|
|
used = 0
|
|
}
|
|
|
|
if left >= valNeedBits {
|
|
val |= ((b >> used) & ((1 << valNeedBits) - 1)) << (bitWidth - valNeedBits)
|
|
result = append(result, int64(val))
|
|
val = 0
|
|
left -= valNeedBits
|
|
used += valNeedBits
|
|
valNeedBits = bitWidth
|
|
} else {
|
|
val |= (b >> used) << (bitWidth - valNeedBits)
|
|
valNeedBits -= left
|
|
left = 0
|
|
}
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func readBools(reader *bytes.Reader, count uint64) (result []bool, err error) {
|
|
i64s, err := readBitPacked(reader, count, 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var i uint64
|
|
for i = 0; i < count; i++ {
|
|
result = append(result, i64s[i] > 0)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func readInt32s(reader *bytes.Reader, count uint64) (result []int32, err error) {
|
|
buf := make([]byte, 4)
|
|
|
|
var i uint64
|
|
for i = 0; i < count; i++ {
|
|
if _, err = reader.Read(buf); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result = append(result, int32(bytesToUint32(buf)))
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func readInt64s(reader *bytes.Reader, count uint64) (result []int64, err error) {
|
|
buf := make([]byte, 8)
|
|
|
|
var i uint64
|
|
for i = 0; i < count; i++ {
|
|
if _, err = reader.Read(buf); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result = append(result, int64(bytesToUint64(buf)))
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func readInt96s(reader *bytes.Reader, count uint64) (result [][]byte, err error) {
|
|
var i uint64
|
|
for i = 0; i < count; i++ {
|
|
buf := make([]byte, 12)
|
|
|
|
if _, err = reader.Read(buf); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result = append(result, buf)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func readFloats(reader *bytes.Reader, count uint64) (result []float32, err error) {
|
|
buf := make([]byte, 4)
|
|
|
|
var i uint64
|
|
for i = 0; i < count; i++ {
|
|
if _, err = reader.Read(buf); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result = append(result, math.Float32frombits(bytesToUint32(buf)))
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func readDoubles(reader *bytes.Reader, count uint64) (result []float64, err error) {
|
|
buf := make([]byte, 8)
|
|
|
|
var i uint64
|
|
for i = 0; i < count; i++ {
|
|
if _, err = reader.Read(buf); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result = append(result, math.Float64frombits(bytesToUint64(buf)))
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func readByteArrays(reader *bytes.Reader, count uint64) (result [][]byte, err error) {
|
|
buf := make([]byte, 4)
|
|
var length uint32
|
|
var data []byte
|
|
|
|
var i uint64
|
|
for i = 0; i < count; i++ {
|
|
if _, err = reader.Read(buf); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
length = bytesToUint32(buf)
|
|
data = make([]byte, length)
|
|
if length > 0 {
|
|
if _, err = reader.Read(data); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
result = append(result, data)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func readFixedLenByteArrays(reader *bytes.Reader, count, length uint64) (result [][]byte, err error) {
|
|
var i uint64
|
|
for i = 0; i < count; i++ {
|
|
data := make([]byte, length)
|
|
if _, err = reader.Read(data); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result = append(result, data)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func readValues(reader *bytes.Reader, dataType parquet.Type, count, length uint64) (interface{}, error) {
|
|
switch dataType {
|
|
case parquet.Type_BOOLEAN:
|
|
return readBools(reader, count)
|
|
case parquet.Type_INT32:
|
|
return readInt32s(reader, count)
|
|
case parquet.Type_INT64:
|
|
return readInt64s(reader, count)
|
|
case parquet.Type_INT96:
|
|
return readInt96s(reader, count)
|
|
case parquet.Type_FLOAT:
|
|
return readFloats(reader, count)
|
|
case parquet.Type_DOUBLE:
|
|
return readDoubles(reader, count)
|
|
case parquet.Type_BYTE_ARRAY:
|
|
return readByteArrays(reader, count)
|
|
case parquet.Type_FIXED_LEN_BYTE_ARRAY:
|
|
return readFixedLenByteArrays(reader, count, length)
|
|
}
|
|
|
|
return nil, fmt.Errorf("unknown parquet type %v", dataType)
|
|
}
|
|
|
|
func readUnsignedVarInt(reader *bytes.Reader) (v uint64, err error) {
|
|
var b byte
|
|
var shift uint64
|
|
|
|
for {
|
|
if b, err = reader.ReadByte(); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if v |= ((uint64(b) & 0x7F) << shift); b&0x80 == 0 {
|
|
break
|
|
}
|
|
|
|
shift += 7
|
|
}
|
|
|
|
return v, nil
|
|
}
|
|
|
|
func readRLE(reader *bytes.Reader, header, bitWidth uint64) (result []int64, err error) {
|
|
width := (bitWidth + 7) / 8
|
|
data := make([]byte, width)
|
|
if width > 0 {
|
|
if _, err = reader.Read(data); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if width < 4 {
|
|
data = append(data, make([]byte, 4-width)...)
|
|
}
|
|
|
|
val := int64(bytesToUint32(data))
|
|
count := header >> 1
|
|
if count > math.MaxInt64/8 {
|
|
// 8 bytes/element.
|
|
return nil, errors.New("parquet: size too large")
|
|
}
|
|
result = make([]int64, count)
|
|
for i := range result {
|
|
result[i] = val
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func readRLEBitPackedHybrid(reader *bytes.Reader, length, bitWidth uint64) (result []int64, err error) {
|
|
if length <= 0 {
|
|
var i32s []int32
|
|
i32s, err = readInt32s(reader, 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if i32s[0] < 0 {
|
|
return nil, errors.New("parquet: negative RLEBitPackedHybrid length")
|
|
}
|
|
length = uint64(i32s[0])
|
|
}
|
|
|
|
buf := make([]byte, length)
|
|
if _, err = reader.Read(buf); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
reader = bytes.NewReader(buf)
|
|
for reader.Len() > 0 {
|
|
header, err := readUnsignedVarInt(reader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var i64s []int64
|
|
if header&1 == 0 {
|
|
i64s, err = readRLE(reader, header, bitWidth)
|
|
} else {
|
|
i64s, err = readBitPacked(reader, header>>1, bitWidth)
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result = append(result, i64s...)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func readDeltaBinaryPackedInt(reader *bytes.Reader) (result []int64, err error) {
|
|
blockSize, err := readUnsignedVarInt(reader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
numMiniblocksInBlock, err := readUnsignedVarInt(reader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
numValues, err := readUnsignedVarInt(reader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
firstValueZigZag, err := readUnsignedVarInt(reader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
v := int64(firstValueZigZag>>1) ^ (-int64(firstValueZigZag & 1))
|
|
result = append(result, v)
|
|
if numMiniblocksInBlock == 0 {
|
|
return nil, errors.New("parquet: zero mini blocks in block")
|
|
}
|
|
numValuesInMiniBlock := blockSize / numMiniblocksInBlock
|
|
|
|
bitWidths := make([]uint64, numMiniblocksInBlock)
|
|
for uint64(len(result)) < numValues {
|
|
minDeltaZigZag, err := readUnsignedVarInt(reader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for i := 0; uint64(i) < numMiniblocksInBlock; i++ {
|
|
b, err := reader.ReadByte()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
bitWidths[i] = uint64(b)
|
|
}
|
|
|
|
minDelta := int64(minDeltaZigZag>>1) ^ (-int64(minDeltaZigZag & 1))
|
|
for i := 0; uint64(i) < numMiniblocksInBlock; i++ {
|
|
i64s, err := readBitPacked(reader, numValuesInMiniBlock/8, bitWidths[i])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for j := range i64s {
|
|
v += i64s[j] + minDelta
|
|
result = append(result, v)
|
|
}
|
|
}
|
|
}
|
|
|
|
return result[:numValues], nil
|
|
}
|
|
|
|
func readDeltaLengthByteArrays(reader *bytes.Reader) (result [][]byte, err error) {
|
|
i64s, err := readDeltaBinaryPackedInt(reader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for i := 0; i < len(i64s); i++ {
|
|
arrays, err := readFixedLenByteArrays(reader, 1, uint64(i64s[i]))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result = append(result, arrays[0])
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func readDeltaByteArrays(reader *bytes.Reader) (result [][]byte, err error) {
|
|
i64s, err := readDeltaBinaryPackedInt(reader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
suffixes, err := readDeltaLengthByteArrays(reader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result = append(result, suffixes[0])
|
|
for i := 1; i < len(i64s); i++ {
|
|
prefixLength := i64s[i]
|
|
val := append([]byte{}, result[i-1][:prefixLength]...)
|
|
val = append(val, suffixes[i]...)
|
|
result = append(result, val)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func readDataPageValues(
|
|
bytesReader *bytes.Reader,
|
|
encoding parquet.Encoding,
|
|
dataType parquet.Type,
|
|
convertedType parquet.ConvertedType,
|
|
count, bitWidth uint64,
|
|
) (result interface{}, resultDataType parquet.Type, err error) {
|
|
switch encoding {
|
|
case parquet.Encoding_PLAIN:
|
|
result, err = readValues(bytesReader, dataType, count, bitWidth)
|
|
return result, dataType, err
|
|
|
|
case parquet.Encoding_PLAIN_DICTIONARY:
|
|
b, err := bytesReader.ReadByte()
|
|
if err != nil {
|
|
return nil, -1, err
|
|
}
|
|
|
|
i64s, err := readRLEBitPackedHybrid(bytesReader, uint64(bytesReader.Len()), uint64(b))
|
|
if err != nil {
|
|
return nil, -1, err
|
|
}
|
|
if len(i64s) < int(count) || count > math.MaxInt64/8 {
|
|
return nil, -1, errors.New("parquet: value out of range")
|
|
}
|
|
return i64s[:count], parquet.Type_INT64, nil
|
|
|
|
case parquet.Encoding_RLE:
|
|
i64s, err := readRLEBitPackedHybrid(bytesReader, 0, bitWidth)
|
|
if err != nil {
|
|
return nil, -1, err
|
|
}
|
|
|
|
if len(i64s) < int(count) || count > math.MaxInt64/8 {
|
|
return nil, -1, errors.New("parquet: value out of range")
|
|
}
|
|
i64s = i64s[:count]
|
|
|
|
if dataType == parquet.Type_INT32 {
|
|
return i64sToi32s(i64s), parquet.Type_INT32, nil
|
|
}
|
|
|
|
return i64s, parquet.Type_INT64, nil
|
|
|
|
case parquet.Encoding_BIT_PACKED:
|
|
return nil, -1, fmt.Errorf("deprecated parquet encoding %v", parquet.Encoding_BIT_PACKED)
|
|
|
|
case parquet.Encoding_DELTA_BINARY_PACKED:
|
|
i64s, err := readDeltaBinaryPackedInt(bytesReader)
|
|
if err != nil {
|
|
return nil, -1, err
|
|
}
|
|
|
|
if len(i64s) < int(count) || count > math.MaxInt64/8 {
|
|
return nil, -1, errors.New("parquet: value out of range")
|
|
}
|
|
i64s = i64s[:count]
|
|
|
|
if dataType == parquet.Type_INT32 {
|
|
return i64sToi32s(i64s), parquet.Type_INT32, nil
|
|
}
|
|
|
|
return i64s, parquet.Type_INT64, nil
|
|
|
|
case parquet.Encoding_DELTA_LENGTH_BYTE_ARRAY:
|
|
byteSlices, err := readDeltaLengthByteArrays(bytesReader)
|
|
if err != nil {
|
|
return nil, -1, err
|
|
}
|
|
if len(byteSlices) < int(count) || count > math.MaxInt64/24 {
|
|
return nil, -1, errors.New("parquet: value out of range")
|
|
}
|
|
|
|
return byteSlices[:count], parquet.Type_FIXED_LEN_BYTE_ARRAY, nil
|
|
|
|
case parquet.Encoding_DELTA_BYTE_ARRAY:
|
|
byteSlices, err := readDeltaByteArrays(bytesReader)
|
|
if err != nil {
|
|
return nil, -1, err
|
|
}
|
|
if len(byteSlices) < int(count) || count > math.MaxInt64/24 {
|
|
return nil, -1, errors.New("parquet: value out of range")
|
|
}
|
|
|
|
return byteSlices[:count], parquet.Type_FIXED_LEN_BYTE_ARRAY, nil
|
|
}
|
|
|
|
return nil, -1, fmt.Errorf("unsupported parquet encoding %v", encoding)
|
|
}
|
|
|