Set S3 Select record message length to 128KiB (#7475)

- Previously this limit was a little more than 1MiB, and it broke
  compatibility with AWS SDK Java causing a buffer overflow error.
master
Aditya Manthramurthy 6 years ago committed by kannappanr
parent c90999df98
commit b1b1d77893
  1. 40
      pkg/s3select/message.go

@ -61,6 +61,14 @@ var recordsHeader = []byte{
11, ':', 'e', 'v', 'e', 'n', 't', '-', 't', 'y', 'p', 'e', 7, 0, 7, 'R', 'e', 'c', 'o', 'r', 'd', 's', 11, ':', 'e', 'v', 'e', 'n', 't', '-', 't', 'y', 'p', 'e', 7, 0, 7, 'R', 'e', 'c', 'o', 'r', 'd', 's',
} }
const (
maxRecordMessageLength = 128 * 1024 // Chosen for compatibility with AWS JAVA SDK
)
var (
bufLength = payloadLenForMsgLen(maxRecordMessageLength)
)
// newRecordsMessage - creates new Records Message which can contain a single record, partial records, // newRecordsMessage - creates new Records Message which can contain a single record, partial records,
// or multiple records. Depending on the size of the result, a response can contain one or more of these messages. // or multiple records. Depending on the size of the result, a response can contain one or more of these messages.
// //
@ -74,6 +82,14 @@ func newRecordsMessage(payload []byte) []byte {
return genMessage(recordsHeader, payload) return genMessage(recordsHeader, payload)
} }
// payloadLenForMsgLen computes the length of the payload in a record
// message given the length of the message.
func payloadLenForMsgLen(messageLength int) int {
headerLength := len(recordsHeader)
payloadLength := messageLength - 4 - 4 - 4 - headerLength - 4
return payloadLength
}
// continuationMessage - S3 periodically sends this message to keep the TCP connection open. // continuationMessage - S3 periodically sends this message to keep the TCP connection open.
// These messages appear in responses at random. The client must detect the message type and process accordingly. // These messages appear in responses at random. The client must detect the message type and process accordingly.
// //
@ -292,16 +308,20 @@ func (writer *messageWriter) start() {
} }
writer.write(endMessage) writer.write(endMessage)
} else { } else {
// Write record payload to staging buffer for len(payload) > 0 {
freeSpace := bufLength - writer.payloadBufferIndex copiedLen := copy(writer.payloadBuffer[writer.payloadBufferIndex:], payload)
if len(payload) > freeSpace { writer.payloadBufferIndex += copiedLen
if !writer.flushRecords() { payload = payload[copiedLen:]
quitFlag = true
break // If buffer is filled, flush it now!
freeSpace := bufLength - writer.payloadBufferIndex
if freeSpace == 0 {
if !writer.flushRecords() {
quitFlag = true
break
}
} }
} }
copy(writer.payloadBuffer[writer.payloadBufferIndex:], payload)
writer.payloadBufferIndex += len(payload)
} }
case <-recordStagingTicker.C: case <-recordStagingTicker.C:
@ -331,10 +351,6 @@ func (writer *messageWriter) start() {
} }
} }
const (
bufLength = maxRecordSize
)
// Sends a single whole record. // Sends a single whole record.
func (writer *messageWriter) SendRecord(payload []byte) error { func (writer *messageWriter) SendRecord(payload []byte) error {
select { select {

Loading…
Cancel
Save