Send progress only when requested by client in SelectObject (#6467)

master
Harshavardhana 6 years ago committed by Nitish Tiwari
parent 66fda7a37f
commit a0683d3c1f
  1. 5
      cmd/api-datatypes.go
  2. 1
      cmd/object-handlers.go
  3. 69
      pkg/s3select/input.go
  4. 4
      pkg/s3select/select_test.go

@ -69,7 +69,6 @@ type JSONType string
// Constants for JSONTypes. // Constants for JSONTypes.
const ( const (
JSONDocumentType JSONType = "Document" JSONDocumentType JSONType = "Document"
JSONStreamType = "Stream"
JSONLinesType = "Lines" JSONLinesType = "Lines"
) )
@ -80,6 +79,7 @@ type ObjectSelectRequest struct {
ExpressionType QueryExpressionType ExpressionType QueryExpressionType
InputSerialization struct { InputSerialization struct {
CompressionType SelectCompressionType CompressionType SelectCompressionType
Parquet *struct{}
CSV *struct { CSV *struct {
FileHeaderInfo CSVFileHeaderInfo FileHeaderInfo CSVFileHeaderInfo
RecordDelimiter string RecordDelimiter string
@ -104,6 +104,9 @@ type ObjectSelectRequest struct {
RecordDelimiter string RecordDelimiter string
} }
} }
RequestProgress struct {
Enabled bool
}
} }
// ObjectIdentifier carries key name for the object to delete. // ObjectIdentifier carries key name for the object to delete.

@ -242,6 +242,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
OutputFieldDelimiter: selectReq.OutputSerialization.CSV.FieldDelimiter, OutputFieldDelimiter: selectReq.OutputSerialization.CSV.FieldDelimiter,
StreamSize: objInfo.Size, StreamSize: objInfo.Size,
HeaderOpt: selectReq.InputSerialization.CSV.FileHeaderInfo == CSVFileHeaderInfoUse, HeaderOpt: selectReq.InputSerialization.CSV.FileHeaderInfo == CSVFileHeaderInfoUse,
Progress: selectReq.RequestProgress.Enabled,
} }
s3s, err := s3select.NewInput(options) s3s, err := s3select.NewInput(options)
if err != nil { if err != nil {

@ -43,19 +43,19 @@ const (
// progress represents a struct that represents the format for XML of the // progress represents a struct that represents the format for XML of the
// progress messages // progress messages
type progress struct { type progress struct {
BytesScanned int64 `xml:"BytesScanned"` XMLName xml.Name `xml:"Progress" json:"-"`
BytesProcessed int64 `xml:"BytesProcessed"` BytesScanned int64 `xml:"BytesScanned"`
BytesReturned int64 `xml:"BytesReturned"` BytesProcessed int64 `xml:"BytesProcessed"`
Xmlns string `xml:"xmlns,attr"` BytesReturned int64 `xml:"BytesReturned"`
} }
// stats represents a struct that represents the format for XML of the stat // stats represents a struct that represents the format for XML of the stat
// messages // messages
type stats struct { type stats struct {
BytesScanned int64 `xml:"BytesScanned"` XMLName xml.Name `xml:"Stats" json:"-"`
BytesProcessed int64 `xml:"BytesProcessed"` BytesScanned int64 `xml:"BytesScanned"`
BytesReturned int64 `xml:"BytesReturned"` BytesProcessed int64 `xml:"BytesProcessed"`
Xmlns string `xml:"xmlns,attr"` BytesReturned int64 `xml:"BytesReturned"`
} }
// StatInfo is a struct that represents the // StatInfo is a struct that represents the
@ -111,6 +111,9 @@ type Options struct {
// Whether Header is "USE" or another // Whether Header is "USE" or another
HeaderOpt bool HeaderOpt bool
// Progress enabled, enable/disable progress messages.
Progress bool
} }
// NewInput sets up a new Input, the first row is read when this is run. // NewInput sets up a new Input, the first row is read when this is run.
@ -121,16 +124,18 @@ func NewInput(opts *Options) (*Input, error) {
myReader := opts.ReadFrom myReader := opts.ReadFrom
var tempBytesScanned int64 var tempBytesScanned int64
tempBytesScanned = 0 tempBytesScanned = 0
if opts.Compressed == "GZIP" { switch opts.Compressed {
case "GZIP":
tempBytesScanned = opts.StreamSize tempBytesScanned = opts.StreamSize
var err error var err error
if myReader, err = gzip.NewReader(opts.ReadFrom); err != nil { if myReader, err = gzip.NewReader(opts.ReadFrom); err != nil {
return nil, ErrTruncatedInput return nil, ErrTruncatedInput
} }
} else if opts.Compressed == "BZIP2" { case "BZIP2":
tempBytesScanned = opts.StreamSize tempBytesScanned = opts.StreamSize
myReader = bzip2.NewReader(opts.ReadFrom) myReader = bzip2.NewReader(opts.ReadFrom)
} }
// DelimitedReader treats custom record delimiter like `\r\n`,`\r`,`ab` etc and replaces it with `\n`. // DelimitedReader treats custom record delimiter like `\r\n`,`\r`,`ab` etc and replaces it with `\n`.
normalizedReader := ioutil.NewDelimitedReader(myReader, []rune(opts.RecordDelimiter)) normalizedReader := ioutil.NewDelimitedReader(myReader, []rune(opts.RecordDelimiter))
progress := &statInfo{ progress := &statInfo{
@ -138,6 +143,7 @@ func NewInput(opts *Options) (*Input, error) {
BytesProcessed: 0, BytesProcessed: 0,
BytesReturned: 0, BytesReturned: 0,
} }
reader := &Input{ reader := &Input{
options: opts, options: opts,
reader: csv.NewReader(normalizedReader), reader: csv.NewReader(normalizedReader),
@ -237,13 +243,11 @@ func (reader *Input) createStatXML() (string, error) {
reader.stats.BytesProcessed = reader.options.StreamSize reader.stats.BytesProcessed = reader.options.StreamSize
reader.stats.BytesScanned = reader.stats.BytesProcessed reader.stats.BytesScanned = reader.stats.BytesProcessed
} }
statXML := stats{ out, err := xml.Marshal(&stats{
BytesScanned: reader.stats.BytesScanned, BytesScanned: reader.stats.BytesScanned,
BytesProcessed: reader.stats.BytesProcessed, BytesProcessed: reader.stats.BytesProcessed,
BytesReturned: reader.stats.BytesReturned, BytesReturned: reader.stats.BytesReturned,
Xmlns: "", })
}
out, err := xml.Marshal(statXML)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -255,16 +259,14 @@ func (reader *Input) createProgressXML() (string, error) {
if reader.options.HasHeader { if reader.options.HasHeader {
reader.stats.BytesProcessed += processSize(reader.header) reader.stats.BytesProcessed += processSize(reader.header)
} }
if !(reader.options.Compressed != "NONE") { if reader.options.Compressed == "NONE" {
reader.stats.BytesScanned = reader.stats.BytesProcessed reader.stats.BytesScanned = reader.stats.BytesProcessed
} }
progressXML := &progress{ out, err := xml.Marshal(&progress{
BytesScanned: reader.stats.BytesScanned, BytesScanned: reader.stats.BytesScanned,
BytesProcessed: reader.stats.BytesProcessed, BytesProcessed: reader.stats.BytesProcessed,
BytesReturned: reader.stats.BytesReturned, BytesReturned: reader.stats.BytesReturned,
Xmlns: "", })
}
out, err := xml.Marshal(progressXML)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -355,20 +357,23 @@ func (reader *Input) Execute(writer io.Writer) error {
} }
case <-progressTicker.C: case <-progressTicker.C:
progressPayload, err := reader.createProgressXML() // Send progress messages only if requested by client.
if err != nil { if reader.options.Progress {
return err progressPayload, err := reader.createProgressXML()
} if err != nil {
progressMessage := reader.writeProgressMessage(progressPayload, curBuf) return err
_, err = progressMessage.WriteTo(writer) }
flusher, ok := writer.(http.Flusher) progressMessage := reader.writeProgressMessage(progressPayload, curBuf)
if ok { _, err = progressMessage.WriteTo(writer)
flusher.Flush() flusher, ok := writer.(http.Flusher)
} if ok {
if err != nil { flusher.Flush()
return err }
if err != nil {
return err
}
curBuf.Reset()
} }
curBuf.Reset()
case <-continuationTimer.C: case <-continuationTimer.C:
message := reader.writeContinuationMessage(curBuf) message := reader.writeContinuationMessage(curBuf)
_, err := message.WriteTo(writer) _, err := message.WriteTo(writer)

@ -677,7 +677,7 @@ func TestMyXMLFunction(t *testing.T) {
expectedStat int expectedStat int
expectedProgress int expectedProgress int
}{ }{
{159, 165}, {150, 156},
} }
for _, table := range tables { for _, table := range tables {
myVal, _ := s3s.createStatXML() myVal, _ := s3s.createStatXML()
@ -764,7 +764,7 @@ func TestMyInfoProtocolFunctions(t *testing.T) {
expectedStat int expectedStat int
expectedProgress int expectedProgress int
}{ }{
{myVal, myOtherVal, 242, 252}, {myVal, myOtherVal, 233, 243},
} }
for _, table := range tables { for _, table := range tables {
var currBuf = &bytes.Buffer{} var currBuf = &bytes.Buffer{}

Loading…
Cancel
Save