s3: Fix reading GET with partNumber specified (#11032)

partNumber was miscalculting the start and end of parts when partNumber
query is specified in the GET request. This commit fixes it and also
fixes the ContentRange header in that case.
master
Anis Elleuch 4 years ago committed by GitHub
parent 04848dfa1c
commit a51488cbaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      cmd/api-headers.go
  2. 22
      cmd/object-api-utils.go
  3. 125
      cmd/object-handlers_test.go

@ -157,20 +157,14 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSp
}
if opts.PartNumber > 0 {
var start, end int64
for i := 0; i < len(objInfo.Parts) && i < opts.PartNumber; i++ {
start = end
end = start + objInfo.Parts[i].ActualSize - 1
}
rs = &HTTPRangeSpec{Start: start, End: end}
rangeLen = end - start + 1
} else {
// for providing ranged content
rs = partNumberToRangeSpec(objInfo, opts.PartNumber)
}
// For providing ranged content
start, rangeLen, err = rs.GetOffsetLength(totalObjectSize)
if err != nil {
return err
}
}
// Set content length.
w.Header().Set(xhttp.ContentLength, strconv.FormatInt(rangeLen, 10))

@ -502,6 +502,21 @@ func getPartFile(entriesTrie *trie.Trie, partNumber int, etag string) (partFile
return partFile
}
func partNumberToRangeSpec(oi ObjectInfo, partNumber int) *HTTPRangeSpec {
if oi.Size == 0 || len(oi.Parts) == 0 {
return nil
}
var start int64
var end = int64(-1)
for i := 0; i < len(oi.Parts) && i < partNumber; i++ {
start = end + 1
end = start + oi.Parts[i].ActualSize - 1
}
return &HTTPRangeSpec{Start: start, End: end}
}
// Returns the compressed offset which should be skipped.
func getCompressedOffsets(objectInfo ObjectInfo, offset int64) (int64, int64) {
var compressedOffset int64
@ -571,12 +586,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
fn ObjReaderFn, off, length int64, err error) {
if rs == nil && opts.PartNumber > 0 {
var start, end int64
for i := 0; i < len(oi.Parts) && i < opts.PartNumber; i++ {
start = end
end = start + oi.Parts[i].ActualSize - 1
}
rs = &HTTPRangeSpec{Start: start, End: end}
rs = partNumberToRangeSpec(oi, opts.PartNumber)
}
// Call the clean-up functions immediately in case of exit

@ -844,6 +844,131 @@ func testAPIGetObjectWithMPHandler(obj ObjectLayer, instanceType, bucketName str
}
// Wrapper for calling GetObject API handler tests for both Erasure multiple disks and FS single drive setup.
func TestAPIGetObjectWithPartNumberHandler(t *testing.T) {
globalPolicySys = NewPolicySys()
defer func() { globalPolicySys = nil }()
defer DetectTestLeak(t)()
ExecObjectLayerAPITest(t, testAPIGetObjectWithPartNumberHandler, []string{"NewMultipart", "PutObjectPart", "CompleteMultipart", "GetObject", "PutObject"})
}
func testAPIGetObjectWithPartNumberHandler(obj ObjectLayer, instanceType, bucketName string, apiRouter http.Handler,
credentials auth.Credentials, t *testing.T) {
// Set SSL to on to do encryption tests
globalIsSSL = true
defer func() { globalIsSSL = false }()
var (
oneMiB int64 = 1024 * 1024
key32Bytes = generateBytesData(32 * humanize.Byte)
key32BytesMd5 = md5.Sum(key32Bytes)
metaWithSSEC = map[string]string{
crypto.SSECAlgorithm: crypto.SSEAlgorithmAES256,
crypto.SSECKey: base64.StdEncoding.EncodeToString(key32Bytes),
crypto.SSECKeyMD5: base64.StdEncoding.EncodeToString(key32BytesMd5[:]),
}
mapCopy = func(m map[string]string) map[string]string {
r := make(map[string]string, len(m))
for k, v := range m {
r[k] = v
}
return r
}
)
type ObjectInput struct {
objectName string
partLengths []int64
metaData map[string]string
}
// set of inputs for uploading the objects before tests for
// downloading is done. Data bytes are from DummyDataGen.
objectInputs := []ObjectInput{
// // cases 0-4: small single part objects
{"nothing", []int64{0}, make(map[string]string)},
{"1byte", []int64{1}, make(map[string]string)},
{"small-0", []int64{11}, make(map[string]string)},
{"small-1", []int64{509}, make(map[string]string)},
{"small-2", []int64{5 * oneMiB}, make(map[string]string)},
// // // cases 5-8: multipart part objects
{"mp-0", []int64{5 * oneMiB, 1}, make(map[string]string)},
{"mp-1", []int64{5*oneMiB + 1, 1}, make(map[string]string)},
{"mp-2", []int64{5487701, 5487799, 3}, make(map[string]string)},
{"mp-3", []int64{10499807, 10499963, 7}, make(map[string]string)},
// cases 9-12: small single part objects with encryption
{"enc-nothing", []int64{0}, mapCopy(metaWithSSEC)},
{"enc-small-0", []int64{11}, mapCopy(metaWithSSEC)},
{"enc-small-1", []int64{509}, mapCopy(metaWithSSEC)},
{"enc-small-2", []int64{5 * oneMiB}, mapCopy(metaWithSSEC)},
// cases 13-16: multipart part objects with encryption
{"enc-mp-0", []int64{5 * oneMiB, 1}, mapCopy(metaWithSSEC)},
{"enc-mp-1", []int64{5*oneMiB + 1, 1}, mapCopy(metaWithSSEC)},
{"enc-mp-2", []int64{5487701, 5487799, 3}, mapCopy(metaWithSSEC)},
{"enc-mp-3", []int64{10499807, 10499963, 7}, mapCopy(metaWithSSEC)},
}
// iterate through the above set of inputs and upload the object.
for _, input := range objectInputs {
uploadTestObject(t, apiRouter, credentials, bucketName, input.objectName, input.partLengths, input.metaData, false)
}
mkGetReqWithPartNumber := func(oindex int, oi ObjectInput, partNumber int) {
object := oi.objectName
rec := httptest.NewRecorder()
queries := url.Values{}
queries.Add("partNumber", strconv.Itoa(partNumber))
targetURL := makeTestTargetURL("", bucketName, object, queries)
req, err := newTestSignedRequestV4(http.MethodGet, targetURL,
0, nil, credentials.AccessKey, credentials.SecretKey, oi.metaData)
if err != nil {
t.Fatalf("Object: %s Object Index %d PartNumber: %d: Failed to create HTTP request for Get Object: <ERROR> %v",
object, oindex, partNumber, err)
}
apiRouter.ServeHTTP(rec, req)
// Check response code (we make only valid requests in this test)
if rec.Code != http.StatusPartialContent && rec.Code != http.StatusOK {
bd, err1 := ioutil.ReadAll(rec.Body)
t.Fatalf("%s Object: %s ObjectIndex %d PartNumber: %d: Got response status `%d` and body: %s,%v",
instanceType, object, oindex, partNumber, rec.Code, string(bd), err1)
}
oinfo, err := obj.GetObjectInfo(context.Background(), bucketName, object, ObjectOptions{})
if err != nil {
t.Fatalf("Object: %s Object Index %d: Unexpected err: %v", object, oindex, err)
}
rs := partNumberToRangeSpec(oinfo, partNumber)
off, length, err := rs.GetOffsetLength(oinfo.Size)
if err != nil {
t.Fatalf("Object: %s Object Index %d: Unexpected err: %v", object, oindex, err)
}
readers := []io.Reader{}
cumulativeSum := int64(0)
for _, p := range oi.partLengths {
readers = append(readers, NewDummyDataGen(p, cumulativeSum))
cumulativeSum += p
}
refReader := io.LimitReader(ioutilx.NewSkipReader(io.MultiReader(readers...), off), length)
if ok, msg := cmpReaders(refReader, rec.Body); !ok {
t.Fatalf("(%s) Object: %s ObjectIndex %d PartNumber: %d --> data mismatch! (msg: %s)", instanceType, oi.objectName, oindex, partNumber, msg)
}
}
for idx, oi := range objectInputs {
for partNum := 1; partNum <= len(oi.partLengths); partNum++ {
mkGetReqWithPartNumber(idx, oi, partNum)
}
}
}
// Wrapper for calling PutObject API handler tests using streaming signature v4 for both Erasure multiple disks and FS single drive setup.
func TestAPIPutObjectStreamSigV4Handler(t *testing.T) {
defer DetectTestLeak(t)()

Loading…
Cancel
Save