/* * Minio Cloud Storage, (C) 2019 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package schema import ( "fmt" "strings" "github.com/minio/minio/pkg/s3select/internal/parquet-go/gen-go/parquet" ) func updateMaxDLRL(schemaMap map[string]*Element, maxDL, maxRL int64) { for _, element := range schemaMap { element.MaxDefinitionLevel = maxDL element.MaxRepetitionLevel = maxRL if *element.RepetitionType != parquet.FieldRepetitionType_REQUIRED { element.MaxDefinitionLevel++ if *element.RepetitionType == parquet.FieldRepetitionType_REPEATED { element.MaxRepetitionLevel++ } } if element.Children != nil { updateMaxDLRL(element.Children.schemaMap, element.MaxDefinitionLevel, element.MaxRepetitionLevel) } } } func toParquetSchema(tree *Tree, treePrefix string, schemaPrefix string, schemaList *[]*parquet.SchemaElement, valueElements *[]*Element) (err error) { tree.Range(func(name string, element *Element) bool { pathInTree := name if treePrefix != "" { pathInTree = treePrefix + "." + name } if element.Type == nil && element.ConvertedType == nil && element.Children == nil { err = fmt.Errorf("%v: group element must have children", pathInTree) return false } if element.ConvertedType != nil { switch *element.ConvertedType { case parquet.ConvertedType_LIST: // Supported structure. // group (LIST) { // REPEATED group list { // element; // } // } if element.Type != nil { err = fmt.Errorf("%v: type must be nil for LIST ConvertedType", pathInTree) return false } if element.Children == nil || element.Children.Length() != 1 { err = fmt.Errorf("%v: children must have one element only for LIST ConvertedType", pathInTree) return false } listElement, ok := element.Children.Get("list") if !ok { err = fmt.Errorf("%v: missing group element 'list' for LIST ConvertedType", pathInTree) return false } if listElement.Name != "list" { err = fmt.Errorf("%v.list: name must be 'list'", pathInTree) return false } if *listElement.RepetitionType != parquet.FieldRepetitionType_REPEATED { err = fmt.Errorf("%v.list: repetition type must be REPEATED type", pathInTree) return false } if listElement.Type != nil || listElement.ConvertedType != nil { err = fmt.Errorf("%v.list: type and converted type must be nil", pathInTree) return false } if listElement.Children == nil || listElement.Children.Length() != 1 { err = fmt.Errorf("%v.list.element: not found", pathInTree) return false } valueElement, ok := listElement.Children.Get("element") if !ok { err = fmt.Errorf("%v.list.element: not found", pathInTree) return false } if valueElement.Name != "element" { err = fmt.Errorf("%v.list.element: name must be 'element'", pathInTree) return false } case parquet.ConvertedType_MAP: // Supported structure: // group (MAP) { // REPEATED group key_value { // REQUIRED key; // value; // } // } if element.Type != nil { err = fmt.Errorf("%v: type must be nil for MAP ConvertedType", pathInTree) return false } if element.Children == nil || element.Children.Length() != 1 { err = fmt.Errorf("%v: children must have one element only for MAP ConvertedType", pathInTree) return false } keyValueElement, ok := element.Children.Get("key_value") if !ok { err = fmt.Errorf("%v: missing group element 'key_value' for MAP ConvertedType", pathInTree) return false } if keyValueElement.Name != "key_value" { err = fmt.Errorf("%v.key_value: name must be 'key_value'", pathInTree) return false } if *keyValueElement.RepetitionType != parquet.FieldRepetitionType_REPEATED { err = fmt.Errorf("%v.key_value: repetition type must be REPEATED type", pathInTree) return false } if keyValueElement.Children == nil || keyValueElement.Children.Length() < 1 || keyValueElement.Children.Length() > 2 { err = fmt.Errorf("%v.key_value: children must have 'key' and optionally 'value' elements for MAP ConvertedType", pathInTree) return false } keyElement, ok := keyValueElement.Children.Get("key") if !ok { err = fmt.Errorf("%v.key_value: missing 'key' element for MAP ConvertedType", pathInTree) return false } if keyElement.Name != "key" { err = fmt.Errorf("%v.key_value.key: name must be 'key'", pathInTree) return false } if *keyElement.RepetitionType != parquet.FieldRepetitionType_REQUIRED { err = fmt.Errorf("%v.key_value: repetition type must be REQUIRED type", pathInTree) return false } if keyValueElement.Children.Length() == 2 { valueElement, ok := keyValueElement.Children.Get("value") if !ok { err = fmt.Errorf("%v.key_value: second element must be 'value' element for MAP ConvertedType", pathInTree) return false } if valueElement.Name != "value" { err = fmt.Errorf("%v.key_value.value: name must be 'value'", pathInTree) return false } } case parquet.ConvertedType_UTF8, parquet.ConvertedType_UINT_8, parquet.ConvertedType_UINT_16: fallthrough case parquet.ConvertedType_UINT_32, parquet.ConvertedType_UINT_64, parquet.ConvertedType_INT_8: fallthrough case parquet.ConvertedType_INT_16, parquet.ConvertedType_INT_32, parquet.ConvertedType_INT_64: if element.Type == nil { err = fmt.Errorf("%v: ConvertedType %v must have Type value", pathInTree, element.ConvertedType) return false } default: err = fmt.Errorf("%v: unsupported ConvertedType %v", pathInTree, element.ConvertedType) return false } } element.PathInTree = pathInTree element.PathInSchema = element.Name if schemaPrefix != "" { element.PathInSchema = schemaPrefix + "." + element.Name } if element.Type != nil { *valueElements = append(*valueElements, element) } *schemaList = append(*schemaList, &element.SchemaElement) if element.Children != nil { element.numChildren = int32(element.Children.Length()) err = toParquetSchema(element.Children, element.PathInTree, element.PathInSchema, schemaList, valueElements) } return (err == nil) }) return err } // Tree - represents tree of schema. Tree preserves order in which elements are added. type Tree struct { schemaMap map[string]*Element keys []string readOnly bool } // String - stringify this tree. func (tree *Tree) String() string { var s []string tree.Range(func(name string, element *Element) bool { s = append(s, fmt.Sprintf("%v: %v", name, element)) return true }) return "{" + strings.Join(s, ", ") + "}" } // Length - returns length of tree. func (tree *Tree) Length() int { return len(tree.keys) } func (tree *Tree) travel(pathSegments []string) (pathSegmentIndex int, pathSegment string, currElement *Element, parentTree *Tree, found bool) { parentTree = tree for pathSegmentIndex, pathSegment = range pathSegments { if tree == nil { found = false break } var tmpCurrElement *Element if tmpCurrElement, found = tree.schemaMap[pathSegment]; !found { break } currElement = tmpCurrElement parentTree = tree tree = currElement.Children } return } // ReadOnly - returns whether this tree is read only or not. func (tree *Tree) ReadOnly() bool { return tree.readOnly } // Get - returns the element stored for name. func (tree *Tree) Get(name string) (element *Element, ok bool) { pathSegments := strings.Split(name, ".") for _, pathSegment := range pathSegments { if tree == nil { element = nil ok = false break } if element, ok = tree.schemaMap[pathSegment]; !ok { break } tree = element.Children } return element, ok } // Set - adds or sets element to name. func (tree *Tree) Set(name string, element *Element) error { if tree.readOnly { return fmt.Errorf("read only tree") } pathSegments := strings.Split(name, ".") if err := validataPathSegments(pathSegments); err != nil { return err } i, pathSegment, currElement, parentTree, found := tree.travel(pathSegments) if !found { if i != len(pathSegments)-1 { return fmt.Errorf("parent %v does not exist", strings.Join(pathSegments[:i+1], ".")) } if currElement == nil { parentTree = tree } else { if currElement.Type != nil { return fmt.Errorf("parent %v is not group element", strings.Join(pathSegments[:i], ".")) } if currElement.Children == nil { currElement.Children = NewTree() } parentTree = currElement.Children } parentTree.keys = append(parentTree.keys, pathSegment) } parentTree.schemaMap[pathSegment] = element return nil } // Delete - deletes name and its element. func (tree *Tree) Delete(name string) { if tree.readOnly { panic(fmt.Errorf("read only tree")) } pathSegments := strings.Split(name, ".") _, pathSegment, _, parentTree, found := tree.travel(pathSegments) if found { for i := range parentTree.keys { if parentTree.keys[i] == pathSegment { copy(parentTree.keys[i:], parentTree.keys[i+1:]) parentTree.keys = parentTree.keys[:len(parentTree.keys)-1] break } } delete(parentTree.schemaMap, pathSegment) } } // Range - calls f sequentially for each name and its element. If f returns false, range stops the iteration. func (tree *Tree) Range(f func(name string, element *Element) bool) { for _, name := range tree.keys { if !f(name, tree.schemaMap[name]) { break } } } // ToParquetSchema - returns list of parquet SchemaElement and list of elements those stores values. func (tree *Tree) ToParquetSchema() (schemaList []*parquet.SchemaElement, valueElements []*Element, err error) { if tree.readOnly { return nil, nil, fmt.Errorf("read only tree") } updateMaxDLRL(tree.schemaMap, 0, 0) var schemaElements []*parquet.SchemaElement if err = toParquetSchema(tree, "", "", &schemaElements, &valueElements); err != nil { return nil, nil, err } tree.readOnly = true numChildren := int32(len(tree.keys)) schemaList = append(schemaList, &parquet.SchemaElement{ Name: "schema", RepetitionType: parquet.FieldRepetitionTypePtr(parquet.FieldRepetitionType_REQUIRED), NumChildren: &numChildren, }) schemaList = append(schemaList, schemaElements...) return schemaList, valueElements, nil } // NewTree - creates new schema tree. func NewTree() *Tree { return &Tree{ schemaMap: make(map[string]*Element), } }