fs: Fix GetObject failure to read large blocks. (#1982)

Add relevant test cases as well for verifying this
part of the codebase.

Fixes #1979
master
karthic rao 9 years ago committed by Harshavardhana
parent cb1200a66d
commit ed2fdd90b0
  1. 11
      erasure-utils.go
  2. 10
      fs-v1-multipart.go
  3. 51
      fs-v1.go
  4. 3
      object-common.go
  5. 255
      server_test.go

@ -50,7 +50,7 @@ func newHash(algo string) hash.Hash {
// hashSum calculates the hash of the entire path and returns. // hashSum calculates the hash of the entire path and returns.
func hashSum(disk StorageAPI, volume, path string, writer hash.Hash) ([]byte, error) { func hashSum(disk StorageAPI, volume, path string, writer hash.Hash) ([]byte, error) {
// Allocate staging buffer of 128KiB for copyBuffer. // Allocate staging buffer of 128KiB for copyBuffer.
buf := make([]byte, 128*1024) buf := make([]byte, readSizeV1)
// Copy entire buffer to writer. // Copy entire buffer to writer.
if err := copyBuffer(writer, disk, volume, path, buf); err != nil { if err := copyBuffer(writer, disk, volume, path, buf); err != nil {
@ -153,11 +153,15 @@ func getEncodedBlockLen(inputLen int64, dataBlocks int) (curEncBlockSize int64)
// the read at. copyN returns io.EOF if there aren't enough data to be read. // the read at. copyN returns io.EOF if there aren't enough data to be read.
func copyN(writer io.Writer, disk StorageAPI, volume string, path string, offset int64, length int64) (err error) { func copyN(writer io.Writer, disk StorageAPI, volume string, path string, offset int64, length int64) (err error) {
// Use 128KiB staging buffer to read upto length. // Use 128KiB staging buffer to read upto length.
buf := make([]byte, 128*1024) buf := make([]byte, readSizeV1)
// Read into writer until length. // Read into writer until length.
for length > 0 { for length > 0 {
nr, er := disk.ReadFile(volume, path, offset, buf) curLength := int64(readSizeV1)
if length < readSizeV1 {
curLength = length
}
nr, er := disk.ReadFile(volume, path, offset, buf[:curLength])
if nr > 0 { if nr > 0 {
nw, ew := writer.Write(buf[0:nr]) nw, ew := writer.Write(buf[0:nr])
if nw > 0 { if nw > 0 {
@ -181,6 +185,7 @@ func copyN(writer io.Writer, disk StorageAPI, volume string, path string, offset
} }
if er != nil { if er != nil {
err = er err = er
break
} }
} }

@ -490,8 +490,8 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
tempObj := path.Join(tmpMetaPrefix, uploadID, "object1") tempObj := path.Join(tmpMetaPrefix, uploadID, "object1")
// Allocate 32KiB buffer for staging buffer. // Allocate 128KiB of staging buffer.
var buf = make([]byte, 128*1024) var buf = make([]byte, readSizeV1)
// Loop through all parts, validate them and then commit to disk. // Loop through all parts, validate them and then commit to disk.
for i, part := range parts { for i, part := range parts {
@ -512,8 +512,12 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
offset := int64(0) offset := int64(0)
totalLeft := fsMeta.Parts[partIdx].Size totalLeft := fsMeta.Parts[partIdx].Size
for totalLeft > 0 { for totalLeft > 0 {
curLeft := int64(readSizeV1)
if totalLeft < readSizeV1 {
curLeft = totalLeft
}
var n int64 var n int64
n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buf) n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buf[:curLeft])
if n > 0 { if n > 0 {
if err = fs.storage.AppendFile(minioMetaBucket, tempObj, buf[:n]); err != nil { if err = fs.storage.AppendFile(minioMetaBucket, tempObj, buf[:n]); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempObj) return "", toObjectErr(err, minioMetaBucket, tempObj)

@ -200,7 +200,7 @@ func (fs fsObjects) DeleteBucket(bucket string) error {
/// Object Operations /// Object Operations
// GetObject - get an object. // GetObject - get an object.
func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) error { func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) (err error) {
// Verify if bucket is valid. // Verify if bucket is valid.
if !IsValidBucketName(bucket) { if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket} return BucketNameInvalid{Bucket: bucket}
@ -210,29 +210,44 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
return ObjectNameInvalid{Bucket: bucket, Object: object} return ObjectNameInvalid{Bucket: bucket, Object: object}
} }
var totalLeft = length var totalLeft = length
buf := make([]byte, 32*1024) // Allocate a 32KiB staging buffer. buf := make([]byte, readSizeV1) // Allocate a 128KiB staging buffer.
for totalLeft > 0 { for totalLeft > 0 {
// Figure out the right size for the buffer. // Figure out the right size for the buffer.
var curSize int64 curLeft := int64(readSizeV1)
if blockSizeV1 < totalLeft { if totalLeft < readSizeV1 {
curSize = blockSizeV1 curLeft = totalLeft
} else {
curSize = totalLeft
} }
// Reads the file at offset. // Reads the file at offset.
n, err := fs.storage.ReadFile(bucket, object, offset, buf[:curSize]) nr, er := fs.storage.ReadFile(bucket, object, offset, buf[:curLeft])
if err != nil { if nr > 0 {
return toObjectErr(err, bucket, object) // Write to response writer.
nw, ew := writer.Write(buf[0:nr])
if nw > 0 {
// Decrement whats left to write.
totalLeft -= int64(nw)
// Progress the offset
offset += int64(nw)
}
if ew != nil {
err = ew
break
}
if nr != int64(nw) {
err = io.ErrShortWrite
break
}
} }
// Write to response writer. if er == io.EOF || er == io.ErrUnexpectedEOF {
m, err := writer.Write(buf[:n]) break
if err != nil {
return toObjectErr(err, bucket, object)
} }
totalLeft -= int64(m) if er != nil {
offset += int64(m) err = er
} // Success. break
return nil }
}
// Returns any error.
return toObjectErr(err, bucket, object)
} }
// GetObjectInfo - get object info. // GetObjectInfo - get object info.

@ -27,6 +27,9 @@ import (
const ( const (
// Block size used for all internal operations version 1. // Block size used for all internal operations version 1.
blockSizeV1 = 10 * 1024 * 1024 // 10MiB. blockSizeV1 = 10 * 1024 * 1024 // 10MiB.
// Staging buffer read size for all internal operations version 1.
readSizeV1 = 128 * 1024 // 128KiB.
) )
// Register callback functions that needs to be called when process shutsdown. // Register callback functions that needs to be called when process shutsdown.

@ -20,9 +20,11 @@ import (
"bytes" "bytes"
"crypto/md5" "crypto/md5"
"encoding/base64" "encoding/base64"
"encoding/hex"
"encoding/xml" "encoding/xml"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math/rand"
"net/http" "net/http"
"strings" "strings"
"sync" "sync"
@ -895,6 +897,259 @@ func (s *MyAPISuite) TestPutBucketErrors(c *C) {
verifyError(c, response, "NotImplemented", "A header you provided implies functionality that is not implemented.", http.StatusNotImplemented) verifyError(c, response, "NotImplemented", "A header you provided implies functionality that is not implemented.", http.StatusNotImplemented)
} }
func (s *MyAPISuite) TestGetObjectLarge10MiB(c *C) {
// Make bucket for this test.
request, err := newTestRequest("PUT", s.testServer.Server.URL+"/test-bucket-10",
0, nil, s.testServer.AccessKey, s.testServer.SecretKey)
c.Assert(err, IsNil)
client := http.Client{}
response, err := client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
var buffer bytes.Buffer
line := "1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,123"
// Create 10MiB content where each line contains 1024 characters.
for i := 0; i < 10*1024; i++ {
buffer.WriteString(fmt.Sprintf("[%05d] %s\n", i, line))
}
putContent := buffer.String()
// Put object
buf := bytes.NewReader([]byte(putContent))
request, err = newTestRequest("PUT", s.testServer.Server.URL+"/test-bucket-10/big-file-10",
int64(buf.Len()), buf, s.testServer.AccessKey, s.testServer.SecretKey)
c.Assert(err, IsNil)
client = http.Client{}
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
// Get object
request, err = newTestRequest("GET", s.testServer.Server.URL+"/test-bucket-10/big-file-10",
0, nil, s.testServer.AccessKey, s.testServer.SecretKey)
c.Assert(err, IsNil)
client = http.Client{}
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
getContent, err := ioutil.ReadAll(response.Body)
c.Assert(err, IsNil)
// Compare putContent and getContent
c.Assert(string(getContent), Equals, putContent)
}
func (s *MyAPISuite) TestGetObjectLarge11MiB(c *C) {
// Make bucket for this test.
request, err := newTestRequest("PUT", s.testServer.Server.URL+"/test-bucket-11",
0, nil, s.testServer.AccessKey, s.testServer.SecretKey)
c.Assert(err, IsNil)
client := http.Client{}
response, err := client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
var buffer bytes.Buffer
line := "1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,123"
// Create 11MiB content where each line contains 1024 characters.
for i := 0; i < 11*1024; i++ {
buffer.WriteString(fmt.Sprintf("[%05d] %s\n", i, line))
}
putMD5 := sumMD5(buffer.Bytes())
// Put object
buf := bytes.NewReader(buffer.Bytes())
request, err = newTestRequest("PUT", s.testServer.Server.URL+"/test-bucket-11/big-file-11",
int64(buf.Len()), buf, s.testServer.AccessKey, s.testServer.SecretKey)
c.Assert(err, IsNil)
client = http.Client{}
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
// Get object
request, err = newTestRequest("GET", s.testServer.Server.URL+"/test-bucket-11/big-file-11",
0, nil, s.testServer.AccessKey, s.testServer.SecretKey)
c.Assert(err, IsNil)
client = http.Client{}
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
getContent, err := ioutil.ReadAll(response.Body)
c.Assert(err, IsNil)
getMD5 := sumMD5(getContent) // Get md5.
// Compare putContent and getContent
c.Assert(hex.EncodeToString(putMD5), Equals, hex.EncodeToString(getMD5))
}
// TestGetPartialObjectMisAligned - tests get object partially mis-aligned.
func (s *MyAPISuite) TestGetPartialObjectMisAligned(c *C) {
// Make bucket for this test.
request, err := newTestRequest("PUT", s.testServer.Server.URL+"/test-bucket-align",
0, nil, s.testServer.AccessKey, s.testServer.SecretKey)
c.Assert(err, IsNil)
client := http.Client{}
response, err := client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
var buffer bytes.Buffer
line := "1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,123"
rand.Seed(time.Now().UTC().UnixNano())
// Create a misalgined data.
for i := 0; i < 13*rand.Intn(1<<16); i++ {
buffer.WriteString(fmt.Sprintf("[%05d] %s\n", i, line[:rand.Intn(1<<8)]))
}
putContent := buffer.String()
// Put object
buf := bytes.NewReader([]byte(putContent))
request, err = newTestRequest("PUT", s.testServer.Server.URL+"/test-bucket-align/big-file-13",
int64(buf.Len()), buf, s.testServer.AccessKey, s.testServer.SecretKey)
c.Assert(err, IsNil)
client = http.Client{}
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
// Prepare request
var testCases = []struct {
byteRange string
expectedString string
}{
{"10-11", putContent[10:12]},
{"1-", putContent[1:]},
{"6-", putContent[6:]},
{"-2", putContent[len(putContent)-2:]},
{"-7", putContent[len(putContent)-7:]},
}
for _, t := range testCases {
// Get object
request, err = newTestRequest("GET", s.testServer.Server.URL+"/test-bucket-align/big-file-13",
0, nil, s.testServer.AccessKey, s.testServer.SecretKey)
c.Assert(err, IsNil)
// Get a different byte range.
request.Header.Add("Range", "bytes="+t.byteRange)
client = http.Client{}
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusPartialContent)
getContent, err := ioutil.ReadAll(response.Body)
c.Assert(err, IsNil)
// Compare putContent and getContent
c.Assert(string(getContent), Equals, t.expectedString)
}
}
func (s *MyAPISuite) TestGetPartialObjectLarge11MiB(c *C) {
// Make bucket for this test.
request, err := newTestRequest("PUT", s.testServer.Server.URL+"/test-bucket-11p",
0, nil, s.testServer.AccessKey, s.testServer.SecretKey)
c.Assert(err, IsNil)
client := http.Client{}
response, err := client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
var buffer bytes.Buffer
line := "1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,123"
// Create 11MiB content where each line contains 1024
// characters.
for i := 0; i < 11*1024; i++ {
buffer.WriteString(fmt.Sprintf("[%05d] %s\n", i, line))
}
putContent := buffer.String()
// Put object
buf := bytes.NewReader([]byte(putContent))
request, err = newTestRequest("PUT", s.testServer.Server.URL+"/test-bucket-11p/big-file-11",
int64(buf.Len()), buf, s.testServer.AccessKey, s.testServer.SecretKey)
c.Assert(err, IsNil)
client = http.Client{}
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
// Get object
request, err = newTestRequest("GET", s.testServer.Server.URL+"/test-bucket-11p/big-file-11",
0, nil, s.testServer.AccessKey, s.testServer.SecretKey)
c.Assert(err, IsNil)
// This range spans into first two blocks.
request.Header.Add("Range", "bytes=10485750-10485769")
client = http.Client{}
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusPartialContent)
getContent, err := ioutil.ReadAll(response.Body)
c.Assert(err, IsNil)
// Compare putContent and getContent
c.Assert(string(getContent), Equals, putContent[10485750:10485770])
}
func (s *MyAPISuite) TestGetPartialObjectLarge10MiB(c *C) {
// Make bucket for this test.
request, err := newTestRequest("PUT", s.testServer.Server.URL+"/test-bucket-10p",
0, nil, s.testServer.AccessKey, s.testServer.SecretKey)
c.Assert(err, IsNil)
client := http.Client{}
response, err := client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
var buffer bytes.Buffer
line := "1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,123"
// Create 10MiB content where each line contains 1024 characters.
for i := 0; i < 10*1024; i++ {
buffer.WriteString(fmt.Sprintf("[%05d] %s\n", i, line))
}
putContent := buffer.String()
// Put object
buf := bytes.NewReader([]byte(putContent))
request, err = newTestRequest("PUT", s.testServer.Server.URL+"/test-bucket-10p/big-file-10",
int64(buf.Len()), buf, s.testServer.AccessKey, s.testServer.SecretKey)
c.Assert(err, IsNil)
client = http.Client{}
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
// Get object
request, err = newTestRequest("GET", s.testServer.Server.URL+"/test-bucket-10p/big-file-10",
0, nil, s.testServer.AccessKey, s.testServer.SecretKey)
c.Assert(err, IsNil)
request.Header.Add("Range", "bytes=2048-2058")
client = http.Client{}
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusPartialContent)
getContent, err := ioutil.ReadAll(response.Body)
c.Assert(err, IsNil)
// Compare putContent and getContent
c.Assert(string(getContent), Equals, putContent[2048:2059])
}
func (s *MyAPISuite) TestGetObjectErrors(c *C) { func (s *MyAPISuite) TestGetObjectErrors(c *C) {
request, err := newTestRequest("GET", s.testServer.Server.URL+"/getobjecterrors", request, err := newTestRequest("GET", s.testServer.Server.URL+"/getobjecterrors",
0, nil, s.testServer.AccessKey, s.testServer.SecretKey) 0, nil, s.testServer.AccessKey, s.testServer.SecretKey)

Loading…
Cancel
Save