From 12353caf35b69f925748f2e4602f8a3bedf3a0fb Mon Sep 17 00:00:00 2001
From: Kanagaraj M
Date: Wed, 17 Jul 2019 23:40:18 +0530
Subject: [PATCH] Fix: Support Unicode delimiters in s3 select (#7931)

---
 mint/run/core/aws-sdk-go/quick-tests.go | 175 ++++++++++++++++++++++++
 pkg/s3select/csv/args.go                |   8 +-
 2 files changed, 179 insertions(+), 4 deletions(-)

diff --git a/mint/run/core/aws-sdk-go/quick-tests.go b/mint/run/core/aws-sdk-go/quick-tests.go
index b09e243cb..ebd2beca5 100644
--- a/mint/run/core/aws-sdk-go/quick-tests.go
+++ b/mint/run/core/aws-sdk-go/quick-tests.go
@@ -292,6 +292,180 @@ func testListObjects(s3Client *s3.S3) {
 	successLogger(function, args, startTime).Info()
 }
 
+// selectCSVFirstColumn runs "SELECT s._1 FROM S3Object s" against the CSV
+// object bucket/object, splitting fields on fieldDelimiter and records on
+// "\n" with FileHeaderInfo set to Ignore. It returns the concatenated payload
+// of every Records event received, or the first error reported by the request
+// or its event stream.
+func selectCSVFirstColumn(s3Client *s3.S3, bucket, object, fieldDelimiter string) (string, error) {
+	params := &s3.SelectObjectContentInput{
+		Bucket:          aws.String(bucket),
+		Key:             aws.String(object),
+		ExpressionType:  aws.String(s3.ExpressionTypeSql),
+		Expression:      aws.String("SELECT s._1 FROM S3Object s"),
+		RequestProgress: &s3.RequestProgress{},
+		InputSerialization: &s3.InputSerialization{
+			CompressionType: aws.String("NONE"),
+			CSV: &s3.CSVInput{
+				FileHeaderInfo:  aws.String(s3.FileHeaderInfoIgnore),
+				FieldDelimiter:  aws.String(fieldDelimiter),
+				RecordDelimiter: aws.String("\n"),
+			},
+		},
+		OutputSerialization: &s3.OutputSerialization{
+			CSV: &s3.CSVOutput{},
+		},
+	}
+
+	resp, err := s3Client.SelectObjectContent(params)
+	if err != nil {
+		return "", err
+	}
+	defer resp.EventStream.Close()
+
+	// A result may arrive split across several Records events, so append
+	// each payload rather than keeping only the last one.
+	var records strings.Builder
+	for event := range resp.EventStream.Events() {
+		if v, ok := event.(*s3.RecordsEvent); ok {
+			records.Write(v.Payload)
+		}
+	}
+
+	if err := resp.EventStream.Err(); err != nil {
+		return "", err
+	}
+	return records.String(), nil
+}
+
+// testSelectObject uploads two CSV objects, one delimited by "," and one by the
+// Unicode character "╦", and checks that selecting the first column of each
+// returns the expected ten rows.
+func testSelectObject(s3Client *s3.S3) {
+	startTime := time.Now()
+	function := "testSelectObject"
+	bucket := randString(60, rand.NewSource(time.Now().UnixNano()), "aws-sdk-go-test-")
+	object1 := "object1.csv"
+	object2 := "object2.csv"
+	args := map[string]interface{}{
+		"bucketName":  bucket,
+		"objectName1": object1,
+		"objectName2": object2,
+	}
+
+	_, err := s3Client.CreateBucket(&s3.CreateBucketInput{
+		Bucket: aws.String(bucket),
+	})
+	if err != nil {
+		failureLog(function, args, startTime, "", "AWS SDK Go CreateBucket Failed", err).Fatal()
+		return
+	}
+
+	// Test comma field separator
+	inputCsv1 := `year,gender,ethnicity,firstname,count,rank
+2011,FEMALE,ASIAN AND PACIFIC ISLANDER,SOPHIA,119,1
+2011,FEMALE,ASIAN AND PACIFIC ISLANDER,CHLOE,106,2
+2011,FEMALE,ASIAN AND PACIFIC ISLANDER,EMILY,93,3
+2011,FEMALE,ASIAN AND PACIFIC ISLANDER,OLIVIA,89,4
+2011,FEMALE,ASIAN AND PACIFIC ISLANDER,EMMA,75,5
+2011,FEMALE,ASIAN AND PACIFIC ISLANDER,ISABELLA,67,6
+2011,FEMALE,ASIAN AND PACIFIC ISLANDER,TIFFANY,54,7
+2011,FEMALE,ASIAN AND PACIFIC ISLANDER,ASHLEY,52,8
+2011,FEMALE,ASIAN AND PACIFIC ISLANDER,FIONA,48,9
+2011,FEMALE,ASIAN AND PACIFIC ISLANDER,ANGELA,47,10
+`
+
+	outputCSV1 := `2011
+2011
+2011
+2011
+2011
+2011
+2011
+2011
+2011
+2011
+`
+
+	putInput1 := &s3.PutObjectInput{
+		Body:   aws.ReadSeekCloser(strings.NewReader(inputCsv1)),
+		Bucket: aws.String(bucket),
+		Key:    aws.String(object1),
+	}
+	_, err = s3Client.PutObject(putInput1)
+	if err != nil {
+		failureLog(function, args, startTime, "", fmt.Sprintf("AWS SDK Go PutObject failed %v", err), err).Fatal()
+		return
+	}
+
+	defer cleanup(s3Client, bucket, object1, function, args, startTime, true)
+
+	payload, err := selectCSVFirstColumn(s3Client, bucket, object1, ",")
+	if err != nil {
+		failureLog(function, args, startTime, "", fmt.Sprintf("AWS SDK Go Select object failed %v", err), err).Fatal()
+		return
+	}
+
+	if payload != outputCSV1 {
+		failureLog(function, args, startTime, "", fmt.Sprintf("AWS SDK Go Select object output mismatch %v", payload), errors.New("AWS S3 select object mismatch")).Fatal()
+		return
+	}
+
+	// Test unicode field separator
+	inputCsv2 := `"year"╦"gender"╦"ethnicity"╦"firstname"╦"count"╦"rank"
+"2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"SOPHIA"╦"119"╦"1"
+"2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"CHLOE"╦"106"╦"2"
+"2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"EMILY"╦"93"╦"3"
+"2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"OLIVIA"╦"89"╦"4"
+"2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"EMMA"╦"75"╦"5"
+"2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"ISABELLA"╦"67"╦"6"
+"2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"TIFFANY"╦"54"╦"7"
+"2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"ASHLEY"╦"52"╦"8"
+"2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"FIONA"╦"48"╦"9"
+"2011"╦"FEMALE"╦"ASIAN AND PACIFIC ISLANDER"╦"ANGELA"╦"47"╦"10"
+`
+
+	outputCSV2 := `2011
+2011
+2011
+2011
+2011
+2011
+2011
+2011
+2011
+2011
+`
+
+	putInput2 := &s3.PutObjectInput{
+		Body:   aws.ReadSeekCloser(strings.NewReader(inputCsv2)),
+		Bucket: aws.String(bucket),
+		Key:    aws.String(object2),
+	}
+	_, err = s3Client.PutObject(putInput2)
+	if err != nil {
+		failureLog(function, args, startTime, "", fmt.Sprintf("AWS SDK Go PutObject failed %v", err), err).Fatal()
+		return
+	}
+
+	defer cleanup(s3Client, bucket, object2, function, args, startTime, false)
+
+	// Each call returns only its own records, so a stale result from the
+	// first query can never satisfy this comparison.
+	payload, err = selectCSVFirstColumn(s3Client, bucket, object2, "╦")
+	if err != nil {
+		failureLog(function, args, startTime, "", fmt.Sprintf("AWS SDK Go Select object failed for unicode separator %v", err), err).Fatal()
+		return
+	}
+
+	if payload != outputCSV2 {
+		failureLog(function, args, startTime, "", fmt.Sprintf("AWS SDK Go Select object output mismatch %v", payload), errors.New("AWS S3 select object mismatch")).Fatal()
+		return
+	}
+
+	successLogger(function, args, startTime).Info()
+}
+
 func main() {
 	endpoint := os.Getenv("SERVER_ENDPOINT")
 	accessKey := os.Getenv("ACCESS_KEY")
@@ -325,4 +499,5 @@ func main() {
 	// execute tests
 	testPresignedPutInvalidHash(s3Client)
 	testListObjects(s3Client)
+	testSelectObject(s3Client)
 }
diff --git a/pkg/s3select/csv/args.go b/pkg/s3select/csv/args.go
index bd1980f2b..17d8bcfc2 100644
--- a/pkg/s3select/csv/args.go
+++ b/pkg/s3select/csv/args.go
@@ -72,7 +72,7 @@ func (args *ReaderArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) err
 		return errInvalidFileHeaderInfo(fmt.Errorf("invalid FileHeaderInfo '%v'", parsedArgs.FileHeaderInfo))
 	}
 
-	switch len(parsedArgs.RecordDelimiter) {
+	switch len([]rune(parsedArgs.RecordDelimiter)) {
 	case 0:
 		parsedArgs.RecordDelimiter = defaultRecordDelimiter
 	case 1, 2:
@@ -80,7 +80,7 @@ func (args *ReaderArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) err
 		return fmt.Errorf("invalid RecordDelimiter '%v'", parsedArgs.RecordDelimiter)
 	}
 
-	switch len(parsedArgs.FieldDelimiter) {
+	switch len([]rune(parsedArgs.FieldDelimiter)) {
 	case 0:
 		parsedArgs.FieldDelimiter = defaultFieldDelimiter
 	case 1:
@@ -154,7 +154,7 @@ func (args *WriterArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) err
 		return errInvalidQuoteFields(fmt.Errorf("invalid QuoteFields '%v'", parsedArgs.QuoteFields))
 	}
 
-	switch len(parsedArgs.RecordDelimiter) {
+	switch len([]rune(parsedArgs.RecordDelimiter)) {
 	case 0:
 		parsedArgs.RecordDelimiter = defaultRecordDelimiter
 	case 1, 2:
@@ -162,7 +162,7 @@ func (args *WriterArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) err
 		return fmt.Errorf("invalid RecordDelimiter '%v'", parsedArgs.RecordDelimiter)
 	}
 
-	switch len(parsedArgs.FieldDelimiter) {
+	switch len([]rune(parsedArgs.FieldDelimiter)) {
 	case 0:
 		parsedArgs.FieldDelimiter = defaultFieldDelimiter
 	case 1: