Optimize string processing in select (#6593)

Reduce allocations during string concatenation and simplify some
processing code.
master
Aditya Manthramurthy 6 years ago committed by kannappanr
parent 54ae364def
commit e3eec89d24
  1. 84
      pkg/s3select/helpers.go
  2. 46
      pkg/s3select/select.go

@ -55,8 +55,8 @@ func stringInSlice(x string, list []string) bool {
// This function returns the index of a string in a list // This function returns the index of a string in a list
func stringIndex(a string, list []string) int { func stringIndex(a string, list []string) int {
for i := range list { for i, v := range list {
if list[i] == a { if v == a {
return i return i
} }
} }
@ -65,10 +65,8 @@ func stringIndex(a string, list []string) int {
// Returns a true or false, whether a string can be represented as an int. // Returns a true or false, whether a string can be represented as an int.
func representsInt(s string) bool { func representsInt(s string) bool {
if _, err := strconv.Atoi(s); err == nil { _, err := strconv.Atoi(s)
return true return err == nil
}
return false
} }
// The function below processes the where clause into an acutal boolean given a // The function below processes the where clause into an acutal boolean given a
@ -236,16 +234,16 @@ func checkValidOperator(operator string) error {
} }
// checkStringType converts the value from the csv to the appropriate one. // checkStringType converts the value from the csv to the appropriate one.
func checkStringType(myTblVal string) interface{} { func checkStringType(tblVal string) interface{} {
myInt, isInt := strconv.Atoi(myTblVal) intVal, err := strconv.Atoi(tblVal)
myFloat, isFloat := strconv.ParseFloat(myTblVal, 64) if err == nil {
if isInt == nil { return intVal
return myInt
} else if isFloat == nil {
return myFloat
} else {
return myTblVal
} }
floatVal, err := strconv.ParseFloat(tblVal, 64)
if err == nil {
return floatVal
}
return tblVal
} }
// stringEval is for evaluating the state of string comparison. // stringEval is for evaluating the state of string comparison.
@ -627,31 +625,6 @@ func (reader *Input) whereClauseNameErrs(whereClause interface{}, alias string)
return nil return nil
} }
// qualityCheck ensures the row has enough separators.
func qualityCheck(row string, amountOfSep int, sep string) string {
for i := 0; i < amountOfSep; i++ {
row = row + sep
}
return row
}
// writeRow helps to write the row regardless of how many entries.
func writeRow(myRow string, myEntry string, delimiter string, numOfReqCols int) string {
if myEntry == "" && len(myRow) == 0 && numOfReqCols == 1 {
return myEntry
}
if myEntry == "" && len(myRow) == 0 {
return myEntry + delimiter
}
if len(myRow) == 1 && myRow[0] == ',' {
return myRow + myEntry
}
if len(myRow) == 0 {
return myEntry
}
return myRow + delimiter + myEntry
}
// colNameErrs is a function which makes sure that the headers are requested are // colNameErrs is a function which makes sure that the headers are requested are
// present in the file otherwise it throws an error. // present in the file otherwise it throws an error.
func (reader *Input) colNameErrs(columnNames []string) error { func (reader *Input) colNameErrs(columnNames []string) error {
@ -677,24 +650,23 @@ func (reader *Input) colNameErrs(columnNames []string) error {
} }
// aggFuncToStr converts an array of floats into a properly formatted string. // aggFuncToStr converts an array of floats into a properly formatted string.
func (reader *Input) aggFuncToStr(myAggVals []float64) string { func (reader *Input) aggFuncToStr(aggVals []float64) string {
var myRow string // Define a number formatting function
var aggregateval string numToStr := func(f float64) string {
if myAggVals[0] == math.Trunc(myAggVals[0]) { if f == math.Trunc(f) {
myRow = strconv.FormatInt(int64(myAggVals[0]), 10) return strconv.FormatInt(int64(f), 10)
} else {
myRow = strconv.FormatFloat(myAggVals[0], 'f', 6, 64)
}
for i := 1; i < len(myAggVals); i++ {
if myAggVals[i] == math.Trunc(myAggVals[i]) {
aggregateval = strconv.FormatInt(int64(myAggVals[i]), 10)
} else {
aggregateval = strconv.FormatFloat(myAggVals[i], 'f', 6, 64)
} }
myRow = myRow + reader.options.OutputFieldDelimiter + aggregateval return strconv.FormatFloat(f, 'f', 6, 64)
}
// Display all whole numbers in aggVals as integers
vals := make([]string, len(aggVals))
for i, v := range aggVals {
vals[i] = numToStr(v)
} }
return myRow
// Intersperse field delimiter
return strings.Join(vals, reader.options.OutputFieldDelimiter)
} }
// checkForDuplicates ensures we do not have an ambigious column name. // checkForDuplicates ensures we do not have an ambigious column name.

@ -284,11 +284,7 @@ func (reader *Input) processSelectReq(reqColNames []string, alias string, whereC
// printAsterix helps to print out the entire row if an asterix is used. // printAsterix helps to print out the entire row if an asterix is used.
func (reader *Input) printAsterix(record []string) string { func (reader *Input) printAsterix(record []string) string {
myRow := record[0] return strings.Join(record, reader.options.OutputFieldDelimiter)
for i := 1; i < len(record); i++ {
myRow = myRow + reader.options.OutputFieldDelimiter + record[i]
}
return myRow
} }
// processColumnNames is a function which allows for cleaning of column names. // processColumnNames is a function which allows for cleaning of column names.
@ -304,7 +300,7 @@ func (reader *Input) processColumnNames(reqColNames []string, alias string) erro
// processColNameIndex is the function which creates the row for an index based // processColNameIndex is the function which creates the row for an index based
// query. // query.
func (reader *Input) processColNameIndex(record []string, reqColNames []string, columns []string) (string, error) { func (reader *Input) processColNameIndex(record []string, reqColNames []string, columns []string) (string, error) {
myRow := "" row := make([]string, len(reqColNames))
for i := 0; i < len(reqColNames); i++ { for i := 0; i < len(reqColNames); i++ {
// COALESCE AND NULLIF do not support index based access. // COALESCE AND NULLIF do not support index based access.
if reqColNames[0] == "0" { if reqColNames[0] == "0" {
@ -312,54 +308,48 @@ func (reader *Input) processColNameIndex(record []string, reqColNames []string,
} }
// Subtract 1 because AWS Indexing is not 0 based, it starts at 1. // Subtract 1 because AWS Indexing is not 0 based, it starts at 1.
mytempindex, err := strconv.Atoi(reqColNames[i]) mytempindex, err := strconv.Atoi(reqColNames[i])
if err != nil {
return "", ErrMissingHeaders
}
mytempindex = mytempindex - 1 mytempindex = mytempindex - 1
if mytempindex > len(columns) { if mytempindex > len(columns) {
return "", ErrInvalidColumnIndex return "", ErrInvalidColumnIndex
} }
myRow = writeRow(myRow, record[mytempindex], reader.options.OutputFieldDelimiter, len(reqColNames)) row[i] = record[mytempindex]
if err != nil {
return "", ErrMissingHeaders
}
} }
if len(myRow) > 1000000 { rowStr := strings.Join(row, reader.options.OutputFieldDelimiter)
if len(rowStr) > 1000000 {
return "", ErrOverMaxRecordSize return "", ErrOverMaxRecordSize
} }
if strings.Count(myRow, reader.options.OutputFieldDelimiter) != len(reqColNames)-1 { return rowStr, nil
myRow = qualityCheck(myRow, len(reqColNames)-1-strings.Count(myRow, reader.options.OutputFieldDelimiter), reader.options.OutputFieldDelimiter)
}
return myRow, nil
} }
// processColNameLiteral is the function which creates the row for an name based // processColNameLiteral is the function which creates the row for an name based
// query. // query.
func (reader *Input) processColNameLiteral(record []string, reqColNames []string, columns []string, columnsMap map[string]int, myFunc *SelectFuncs) (string, error) { func (reader *Input) processColNameLiteral(record []string, reqColNames []string, columns []string, columnsMap map[string]int, myFunc *SelectFuncs) (string, error) {
myRow := "" row := make([]string, len(reqColNames))
for i := 0; i < len(reqColNames); i++ { for i := 0; i < len(reqColNames); i++ {
// this is the case to deal with COALESCE. // this is the case to deal with COALESCE.
if reqColNames[i] == "" && isValidFunc(myFunc.index, i) { if reqColNames[i] == "" && isValidFunc(myFunc.index, i) {
myVal := evaluateFuncExpr(myFunc.funcExpr[i], "", record, columnsMap) row[i] = evaluateFuncExpr(myFunc.funcExpr[i], "", record, columnsMap)
myRow = writeRow(myRow, myVal, reader.options.OutputFieldDelimiter, len(reqColNames))
continue continue
} }
myTempIndex, notFound := columnsMap[trimQuotes(reqColNames[i])] myTempIndex, notFound := columnsMap[trimQuotes(reqColNames[i])]
if !notFound { if !notFound {
return "", ErrMissingHeaders return "", ErrMissingHeaders
} }
myRow = writeRow(myRow, record[myTempIndex], reader.options.OutputFieldDelimiter, len(reqColNames)) row[i] = record[myTempIndex]
} }
if len(myRow) > 1000000 { rowStr := strings.Join(row, reader.options.OutputFieldDelimiter)
if len(rowStr) > 1000000 {
return "", ErrOverMaxRecordSize return "", ErrOverMaxRecordSize
} }
if strings.Count(myRow, reader.options.OutputFieldDelimiter) != len(reqColNames)-1 { return rowStr, nil
myRow = qualityCheck(myRow, len(reqColNames)-1-strings.Count(myRow, reader.options.OutputFieldDelimiter), reader.options.OutputFieldDelimiter)
}
return myRow, nil
} }
// aggregationFunctions is a function which performs the actual aggregation // aggregationFunctions performs the actual aggregation methods on the
// methods on the given row, it uses an array defined the the main parsing // given row, it uses an array defined for the main parsing function
// function to keep track of values. // to keep track of values.
func aggregationFunctions(counter int, filtrCount int, myAggVals []float64, columnsMap map[string]int, storeReqCols []string, storeFunctions []string, record []string) error { func aggregationFunctions(counter int, filtrCount int, myAggVals []float64, columnsMap map[string]int, storeReqCols []string, storeFunctions []string, record []string) error {
for i := 0; i < len(storeFunctions); i++ { for i := 0; i < len(storeFunctions); i++ {
if storeFunctions[i] == "" { if storeFunctions[i] == "" {

Loading…
Cancel
Save