XL/erasureCreate: Create a limit reader if size is specified. (#2059)

This is needed so that we only write data which was requested
for, using a limit reader avoids spurious reads on the incoming
client data. Additionally using limit reader provides server
safety from rogue clients sending copious amounts of data (for
example a denial of service attack).

This patch also caters for size == -1 when content encoding from
a client is set as chunked, we happily read till io.EOF
master
Harshavardhana 9 years ago committed by GitHub
parent 3f2b4d9dc2
commit 734e779b19
  1. 27
      erasure-createfile.go
  2. 15
      xl-v1-multipart.go
  3. 15
      xl-v1-object.go

@ -38,39 +38,42 @@ func erasureCreateFile(disks []StorageAPI, volume string, path string, partName
// Read until io.EOF, erasure codes data and writes to all disks. // Read until io.EOF, erasure codes data and writes to all disks.
for { for {
var n int
var blocks [][]byte var blocks [][]byte
n, err = io.ReadFull(data, buf) n, rErr := io.ReadFull(data, buf)
if err == io.EOF { // FIXME: this is a bug in Golang, n == 0 and err ==
// io.ErrUnexpectedEOF for io.ReadFull function.
if n == 0 && rErr == io.ErrUnexpectedEOF {
return nil, 0, rErr
}
if rErr == io.EOF {
// We have reached EOF on the first byte read, io.Reader // We have reached EOF on the first byte read, io.Reader
// must be 0bytes, we don't need to erasure code // must be 0bytes, we don't need to erasure code
// data. Will create a 0byte file instead. // data. Will create a 0byte file instead.
if size == 0 { if size == 0 {
blocks = make([][]byte, len(disks)) blocks = make([][]byte, len(disks))
err = appendFile(disks, volume, path, blocks, eInfo.Distribution, hashWriters, writeQuorum) rErr = appendFile(disks, volume, path, blocks, eInfo.Distribution, hashWriters, writeQuorum)
if err != nil { if rErr != nil {
return nil, 0, err return nil, 0, rErr
} }
} // else we have reached EOF after few reads, no need to } // else we have reached EOF after few reads, no need to
// add an additional 0bytes at the end. // add an additional 0bytes at the end.
break break
} }
if err != nil && err != io.ErrUnexpectedEOF { if rErr != nil && rErr != io.ErrUnexpectedEOF {
return nil, 0, err return nil, 0, rErr
} }
size += int64(n)
// Returns encoded blocks. // Returns encoded blocks.
var enErr error var enErr error
blocks, enErr = encodeData(buf[:n], eInfo.DataBlocks, eInfo.ParityBlocks) blocks, enErr = encodeData(buf[0:n], eInfo.DataBlocks, eInfo.ParityBlocks)
if enErr != nil { if enErr != nil {
return nil, 0, enErr return nil, 0, enErr
} }
// Write to all disks. // Write to all disks.
err = appendFile(disks, volume, path, blocks, eInfo.Distribution, hashWriters, writeQuorum) if err = appendFile(disks, volume, path, blocks, eInfo.Distribution, hashWriters, writeQuorum); err != nil {
if err != nil {
return nil, 0, err return nil, 0, err
} }
size += int64(n)
} }
// Save the checksums. // Save the checksums.

@ -340,6 +340,13 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string,
// Initialize md5 writer. // Initialize md5 writer.
md5Writer := md5.New() md5Writer := md5.New()
// Limit the reader to its provided size > 0.
if size > 0 {
// This is done so that we can avoid erroneous clients sending
// more data than the set content size.
data = io.LimitReader(data, size+1)
} // else we read till EOF.
// Construct a tee reader for md5sum. // Construct a tee reader for md5sum.
teeReader := io.TeeReader(data, md5Writer) teeReader := io.TeeReader(data, md5Writer)
@ -350,13 +357,17 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string,
} }
// Erasure code data and write across all disks. // Erasure code data and write across all disks.
newEInfos, n, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, partSuffix, teeReader, eInfos, xl.writeQuorum) newEInfos, sizeWritten, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, partSuffix, teeReader, eInfos, xl.writeQuorum)
if err != nil { if err != nil {
return "", toObjectErr(err, minioMetaBucket, tmpPartPath) return "", toObjectErr(err, minioMetaBucket, tmpPartPath)
} }
// For size == -1, perhaps client is sending in chunked encoding
// set the size as size that was actually written.
if size == -1 { if size == -1 {
size = n size = sizeWritten
} }
// Calculate new md5sum. // Calculate new md5sum.
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
if md5Hex != "" { if md5Hex != "" {

@ -332,6 +332,13 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Initialize md5 writer. // Initialize md5 writer.
md5Writer := md5.New() md5Writer := md5.New()
// Limit the reader to its provided size if specified.
if size > 0 {
// This is done so that we can avoid erroneous clients sending
// more data than the set content size.
data = io.LimitReader(data, size+1)
} // else we read till EOF.
// Tee reader combines incoming data stream and md5, data read // Tee reader combines incoming data stream and md5, data read
// from input stream is written to md5. // from input stream is written to md5.
teeReader := io.TeeReader(data, md5Writer) teeReader := io.TeeReader(data, md5Writer)
@ -343,13 +350,17 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
} }
// Erasure code and write across all disks. // Erasure code and write across all disks.
newEInfos, n, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, "part.1", teeReader, eInfos, xl.writeQuorum) newEInfos, sizeWritten, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, "part.1", teeReader, eInfos, xl.writeQuorum)
if err != nil { if err != nil {
return "", toObjectErr(err, minioMetaBucket, tempErasureObj) return "", toObjectErr(err, minioMetaBucket, tempErasureObj)
} }
// For size == -1, perhaps client is sending in chunked encoding
// set the size as size that was actually written.
if size == -1 { if size == -1 {
size = n size = sizeWritten
} }
// Save additional erasureMetadata. // Save additional erasureMetadata.
modTime := time.Now().UTC() modTime := time.Now().UTC()

Loading…
Cancel
Save