diff --git a/cmd/api-datatypes.go b/cmd/api-datatypes.go index 33bc17ff5..0f998b6e0 100644 --- a/cmd/api-datatypes.go +++ b/cmd/api-datatypes.go @@ -69,7 +69,6 @@ type JSONType string // Constants for JSONTypes. const ( JSONDocumentType JSONType = "Document" - JSONStreamType = "Stream" JSONLinesType = "Lines" ) @@ -80,6 +79,7 @@ type ObjectSelectRequest struct { ExpressionType QueryExpressionType InputSerialization struct { CompressionType SelectCompressionType + Parquet *struct{} CSV *struct { FileHeaderInfo CSVFileHeaderInfo RecordDelimiter string @@ -104,6 +104,9 @@ type ObjectSelectRequest struct { RecordDelimiter string } } + RequestProgress struct { + Enabled bool + } } // ObjectIdentifier carries key name for the object to delete. diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 729c970b5..610a0e5ee 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -242,6 +242,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r OutputFieldDelimiter: selectReq.OutputSerialization.CSV.FieldDelimiter, StreamSize: objInfo.Size, HeaderOpt: selectReq.InputSerialization.CSV.FileHeaderInfo == CSVFileHeaderInfoUse, + Progress: selectReq.RequestProgress.Enabled, } s3s, err := s3select.NewInput(options) if err != nil { diff --git a/pkg/s3select/input.go b/pkg/s3select/input.go index eb47e9669..b9c3d4ad3 100644 --- a/pkg/s3select/input.go +++ b/pkg/s3select/input.go @@ -43,19 +43,19 @@ const ( // progress represents a struct that represents the format for XML of the // progress messages type progress struct { - BytesScanned int64 `xml:"BytesScanned"` - BytesProcessed int64 `xml:"BytesProcessed"` - BytesReturned int64 `xml:"BytesReturned"` - Xmlns string `xml:"xmlns,attr"` + XMLName xml.Name `xml:"Progress" json:"-"` + BytesScanned int64 `xml:"BytesScanned"` + BytesProcessed int64 `xml:"BytesProcessed"` + BytesReturned int64 `xml:"BytesReturned"` } // stats represents a struct that represents the format for XML of the stat // messages type stats struct { - BytesScanned int64 `xml:"BytesScanned"` - BytesProcessed int64 `xml:"BytesProcessed"` - BytesReturned int64 `xml:"BytesReturned"` - Xmlns string `xml:"xmlns,attr"` + XMLName xml.Name `xml:"Stats" json:"-"` + BytesScanned int64 `xml:"BytesScanned"` + BytesProcessed int64 `xml:"BytesProcessed"` + BytesReturned int64 `xml:"BytesReturned"` } // StatInfo is a struct that represents the @@ -111,6 +111,9 @@ type Options struct { // Whether Header is "USE" or another HeaderOpt bool + + // Progress enabled, enable/disable progress messages. + Progress bool } // NewInput sets up a new Input, the first row is read when this is run. @@ -121,16 +124,18 @@ func NewInput(opts *Options) (*Input, error) { myReader := opts.ReadFrom var tempBytesScanned int64 tempBytesScanned = 0 - if opts.Compressed == "GZIP" { + switch opts.Compressed { + case "GZIP": tempBytesScanned = opts.StreamSize var err error if myReader, err = gzip.NewReader(opts.ReadFrom); err != nil { return nil, ErrTruncatedInput } - } else if opts.Compressed == "BZIP2" { + case "BZIP2": tempBytesScanned = opts.StreamSize myReader = bzip2.NewReader(opts.ReadFrom) } + // DelimitedReader treats custom record delimiter like `\r\n`,`\r`,`ab` etc and replaces it with `\n`. normalizedReader := ioutil.NewDelimitedReader(myReader, []rune(opts.RecordDelimiter)) progress := &statInfo{ @@ -138,6 +143,7 @@ func NewInput(opts *Options) (*Input, error) { BytesProcessed: 0, BytesReturned: 0, } + reader := &Input{ options: opts, reader: csv.NewReader(normalizedReader), @@ -237,13 +243,11 @@ func (reader *Input) createStatXML() (string, error) { reader.stats.BytesProcessed = reader.options.StreamSize reader.stats.BytesScanned = reader.stats.BytesProcessed } - statXML := stats{ + out, err := xml.Marshal(&stats{ BytesScanned: reader.stats.BytesScanned, BytesProcessed: reader.stats.BytesProcessed, BytesReturned: reader.stats.BytesReturned, - Xmlns: "", - } - out, err := xml.Marshal(statXML) + }) if err != nil { return "", err } @@ -255,16 +259,14 @@ func (reader *Input) createProgressXML() (string, error) { if reader.options.HasHeader { reader.stats.BytesProcessed += processSize(reader.header) } - if !(reader.options.Compressed != "NONE") { + if reader.options.Compressed == "NONE" { reader.stats.BytesScanned = reader.stats.BytesProcessed } - progressXML := &progress{ + out, err := xml.Marshal(&progress{ BytesScanned: reader.stats.BytesScanned, BytesProcessed: reader.stats.BytesProcessed, BytesReturned: reader.stats.BytesReturned, - Xmlns: "", - } - out, err := xml.Marshal(progressXML) + }) if err != nil { return "", err } @@ -355,20 +357,23 @@ func (reader *Input) Execute(writer io.Writer) error { } case <-progressTicker.C: - progressPayload, err := reader.createProgressXML() - if err != nil { - return err - } - progressMessage := reader.writeProgressMessage(progressPayload, curBuf) - _, err = progressMessage.WriteTo(writer) - flusher, ok := writer.(http.Flusher) - if ok { - flusher.Flush() - } - if err != nil { - return err + // Send progress messages only if requested by client. + if reader.options.Progress { + progressPayload, err := reader.createProgressXML() + if err != nil { + return err + } + progressMessage := reader.writeProgressMessage(progressPayload, curBuf) + _, err = progressMessage.WriteTo(writer) + flusher, ok := writer.(http.Flusher) + if ok { + flusher.Flush() + } + if err != nil { + return err + } + curBuf.Reset() } - curBuf.Reset() case <-continuationTimer.C: message := reader.writeContinuationMessage(curBuf) _, err := message.WriteTo(writer) diff --git a/pkg/s3select/select_test.go b/pkg/s3select/select_test.go index 8f03fb26e..f3da4b2b1 100644 --- a/pkg/s3select/select_test.go +++ b/pkg/s3select/select_test.go @@ -677,7 +677,7 @@ func TestMyXMLFunction(t *testing.T) { expectedStat int expectedProgress int }{ - {159, 165}, + {150, 156}, } for _, table := range tables { myVal, _ := s3s.createStatXML() @@ -764,7 +764,7 @@ func TestMyInfoProtocolFunctions(t *testing.T) { expectedStat int expectedProgress int }{ - {myVal, myOtherVal, 242, 252}, + {myVal, myOtherVal, 233, 243}, } for _, table := range tables { var currBuf = &bytes.Buffer{}