diff --git a/pkg/s3select/internal/parquet-go/page.go b/pkg/s3select/internal/parquet-go/page.go index 22c25793e..22f89fe80 100644 --- a/pkg/s3select/internal/parquet-go/page.go +++ b/pkg/s3select/internal/parquet-go/page.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "io" "math" "strings" @@ -108,24 +109,39 @@ func readPage( } repLevelsLen = pageHeader.DataPageHeaderV2.GetRepetitionLevelsByteLength() repLevelsBuf = make([]byte, repLevelsLen) - if _, err = thriftReader.Read(repLevelsBuf); err != nil { + + n, err := io.ReadFull(thriftReader, repLevelsBuf) + if err != nil { return nil, err } + if n != int(repLevelsLen) { + return nil, fmt.Errorf("expected parquet header repetition levels %d, got %d", repLevelsLen, n) + } defLevelsLen = pageHeader.DataPageHeaderV2.GetDefinitionLevelsByteLength() defLevelsBuf = make([]byte, defLevelsLen) - if _, err = thriftReader.Read(defLevelsBuf); err != nil { + + n, err = io.ReadFull(thriftReader, defLevelsBuf) + if err != nil { return nil, err } + if n != int(defLevelsLen) { + return nil, fmt.Errorf("expected parquet header definition levels %d, got %d", defLevelsLen, n) + } } dbLen := pageHeader.GetCompressedPageSize() - repLevelsLen - defLevelsLen if dbLen < 0 { return nil, errors.New("parquet: negative data length") } + dataBuf := make([]byte, dbLen) - if _, err = thriftReader.Read(dataBuf); err != nil { + n, err := io.ReadFull(thriftReader, dataBuf) + if err != nil { return nil, err } + if n != int(dbLen) { + return nil, fmt.Errorf("expected parquet data buffer %d, got %d", dbLen, n) + } if dataBuf, err = compressionCodec(metadata.GetCodec()).uncompress(dataBuf); err != nil { return nil, err