Feed correct part number to sio (#11326)

When offsets were specified we relied on the first part number to be correct.

Recalculate based on offset.
master
Klaus Post 3 years ago committed by GitHub
parent 4e6d717f39
commit 2167ba0111
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      cmd/object-api-utils.go
  2. 18
      cmd/object-api-utils_test.go

@ -519,7 +519,7 @@ func partNumberToRangeSpec(oi ObjectInfo, partNumber int) *HTTPRangeSpec {
// Returns the compressed offset which should be skipped. // Returns the compressed offset which should be skipped.
// If encrypted offsets are adjusted for encrypted block headers/trailers. // If encrypted offsets are adjusted for encrypted block headers/trailers.
// Since de-compression is after decryption encryption overhead is only added to compressedOffset. // Since de-compression is after decryption encryption overhead is only added to compressedOffset.
func getCompressedOffsets(objectInfo ObjectInfo, offset int64) (compressedOffset int64, partSkip int64) { func getCompressedOffsets(objectInfo ObjectInfo, offset int64) (compressedOffset int64, partSkip int64, firstPart int) {
var skipLength int64 var skipLength int64
var cumulativeActualSize int64 var cumulativeActualSize int64
var firstPartIdx int var firstPartIdx int
@ -535,12 +535,13 @@ func getCompressedOffsets(objectInfo ObjectInfo, offset int64) (compressedOffset
} }
} }
} }
if isEncryptedMultipart(objectInfo) && firstPartIdx > 0 { if isEncryptedMultipart(objectInfo) && firstPartIdx > 0 {
off, _, _, _, _, err := objectInfo.GetDecryptedRange(partNumberToRangeSpec(objectInfo, firstPartIdx)) off, _, _, _, _, err := objectInfo.GetDecryptedRange(partNumberToRangeSpec(objectInfo, firstPartIdx))
logger.LogIf(context.Background(), err) logger.LogIf(context.Background(), err)
compressedOffset += off compressedOffset += off
} }
return compressedOffset, offset - skipLength return compressedOffset, offset - skipLength, firstPartIdx
} }
// GetObjectReader is a type that wraps a reader with a lock to // GetObjectReader is a type that wraps a reader with a lock to
@ -608,6 +609,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
if opts.TransitionStatus == lifecycle.TransitionPending && isEncrypted { if opts.TransitionStatus == lifecycle.TransitionPending && isEncrypted {
isEncrypted = false isEncrypted = false
} }
var firstPart = opts.PartNumber
var skipLen int64 var skipLen int64
// Calculate range to read (different for encrypted/compressed objects) // Calculate range to read (different for encrypted/compressed objects)
switch { switch {
@ -626,7 +628,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
return nil, 0, 0, err return nil, 0, 0, err
} }
// In case of range based queries on multiparts, the offset and length are reduced. // In case of range based queries on multiparts, the offset and length are reduced.
off, decOff = getCompressedOffsets(oi, off) off, decOff, firstPart = getCompressedOffsets(oi, off)
decLength = length decLength = length
length = oi.Size - off length = oi.Size - off
@ -652,7 +654,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
if isEncrypted { if isEncrypted {
copySource := h.Get(xhttp.AmzServerSideEncryptionCopyCustomerAlgorithm) != "" copySource := h.Get(xhttp.AmzServerSideEncryptionCopyCustomerAlgorithm) != ""
// Attach decrypter on inputReader // Attach decrypter on inputReader
inputReader, err = DecryptBlocksRequestR(inputReader, h, 0, opts.PartNumber, oi, copySource) inputReader, err = DecryptBlocksRequestR(inputReader, h, 0, firstPart, oi, copySource)
if err != nil { if err != nil {
// Call the cleanup funcs // Call the cleanup funcs
for i := len(cFns) - 1; i >= 0; i-- { for i := len(cFns) - 1; i >= 0; i-- {
@ -660,6 +662,8 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
} }
return nil, err return nil, err
} }
oi.Size = decLength
} }
// Decompression reader. // Decompression reader.
s2Reader := s2.NewReader(inputReader) s2Reader := s2.NewReader(inputReader)

@ -519,8 +519,9 @@ func TestGetCompressedOffsets(t *testing.T) {
offset int64 offset int64
startOffset int64 startOffset int64
snappyStartOffset int64 snappyStartOffset int64
firstPart int
}{ }{
{ 0: {
objInfo: ObjectInfo{ objInfo: ObjectInfo{
Parts: []ObjectPartInfo{ Parts: []ObjectPartInfo{
{ {
@ -536,8 +537,9 @@ func TestGetCompressedOffsets(t *testing.T) {
offset: 79109865, offset: 79109865,
startOffset: 39235668, startOffset: 39235668,
snappyStartOffset: 12001001, snappyStartOffset: 12001001,
firstPart: 1,
}, },
{ 1: {
objInfo: ObjectInfo{ objInfo: ObjectInfo{
Parts: []ObjectPartInfo{ Parts: []ObjectPartInfo{
{ {
@ -554,7 +556,7 @@ func TestGetCompressedOffsets(t *testing.T) {
startOffset: 0, startOffset: 0,
snappyStartOffset: 19109865, snappyStartOffset: 19109865,
}, },
{ 2: {
objInfo: ObjectInfo{ objInfo: ObjectInfo{
Parts: []ObjectPartInfo{ Parts: []ObjectPartInfo{
{ {
@ -573,14 +575,18 @@ func TestGetCompressedOffsets(t *testing.T) {
}, },
} }
for i, test := range testCases { for i, test := range testCases {
startOffset, snappyStartOffset := getCompressedOffsets(test.objInfo, test.offset) startOffset, snappyStartOffset, firstPart := getCompressedOffsets(test.objInfo, test.offset)
if startOffset != test.startOffset { if startOffset != test.startOffset {
t.Errorf("Test %d - expected startOffset %d but received %d", t.Errorf("Test %d - expected startOffset %d but received %d",
i+1, test.startOffset, startOffset) i, test.startOffset, startOffset)
} }
if snappyStartOffset != test.snappyStartOffset { if snappyStartOffset != test.snappyStartOffset {
t.Errorf("Test %d - expected snappyOffset %d but received %d", t.Errorf("Test %d - expected snappyOffset %d but received %d",
i+1, test.snappyStartOffset, snappyStartOffset) i, test.snappyStartOffset, snappyStartOffset)
}
if firstPart != test.firstPart {
t.Errorf("Test %d - expected firstPart %d but received %d",
i, test.firstPart, firstPart)
} }
} }
} }

Loading…
Cancel
Save