From 5cc9e4e214583eeebe6d4f62ac134f750a6decfc Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Mon, 18 Jul 2016 19:06:48 -0700 Subject: [PATCH] fs/XL: Return IncompleteBody{} error for short writes (#2228) --- erasure-createfile.go | 8 +++---- fs-createfile.go | 41 ++++++++++++++++++++++++++++++++++++ fs-v1-multipart.go | 34 +++++++++++------------------- fs-v1.go | 30 +++++++++++--------------- object-api-multipart_test.go | 2 +- object-api-putobject_test.go | 2 +- xl-v1-multipart.go | 8 +++++++ xl-v1-object.go | 14 +++++++++++- xl-v1-object_test.go | 2 +- 9 files changed, 93 insertions(+), 48 deletions(-) create mode 100644 fs-createfile.go diff --git a/erasure-createfile.go b/erasure-createfile.go index 8f0319b2d..cfb53fa61 100644 --- a/erasure-createfile.go +++ b/erasure-createfile.go @@ -28,7 +28,7 @@ import ( // erasureCreateFile - writes an entire stream by erasure coding to // all the disks, writes also calculate individual block's checksum // for future bit-rot protection. -func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, blockSize int64, dataBlocks int, parityBlocks int, writeQuorum int) (size int64, checkSums []string, err error) { +func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, blockSize int64, dataBlocks int, parityBlocks int, writeQuorum int) (bytesWritten int64, checkSums []string, err error) { // Allocated blockSized buffer for reading. buf := make([]byte, blockSize) @@ -47,7 +47,7 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader // We have reached EOF on the first byte read, io.Reader // must be 0bytes, we don't need to erasure code // data. Will create a 0byte file instead. - if size == 0 { + if bytesWritten == 0 { blocks = make([][]byte, len(disks)) rErr = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum) if rErr != nil { @@ -72,7 +72,7 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader if err = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum); err != nil { return 0, nil, err } - size += int64(n) + bytesWritten += int64(n) } } @@ -80,7 +80,7 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader for i := range checkSums { checkSums[i] = hex.EncodeToString(hashWriters[i].Sum(nil)) } - return size, checkSums, nil + return bytesWritten, checkSums, nil } // encodeData - encodes incoming data buffer into diff --git a/fs-createfile.go b/fs-createfile.go new file mode 100644 index 000000000..baac70375 --- /dev/null +++ b/fs-createfile.go @@ -0,0 +1,41 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import "io" + +func fsCreateFile(disk StorageAPI, reader io.Reader, buf []byte, tmpBucket, tempObj string) (int64, error) { + bytesWritten := int64(0) + // Read the buffer till io.EOF and append the read data to the temporary file. + for { + n, rErr := reader.Read(buf) + if rErr != nil && rErr != io.EOF { + return 0, rErr + } + bytesWritten += int64(n) + if n > 0 { + wErr := disk.AppendFile(tmpBucket, tempObj, buf[0:n]) + if wErr != nil { + return 0, wErr + } + } + if rErr == io.EOF { + break + } + } + return bytesWritten, nil +} diff --git a/fs-v1-multipart.go b/fs-v1-multipart.go index 4b39d715b..51c70b6d5 100644 --- a/fs-v1-multipart.go +++ b/fs-v1-multipart.go @@ -314,32 +314,22 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s limitDataReader = data } - // Allocate buffer for staging buffer. + teeReader := io.TeeReader(limitDataReader, md5Writer) bufSize := int64(readSizeV1) if size > 0 && bufSize > size { bufSize = size } - var buf = make([]byte, int(bufSize)) - - // Read up to required size - totalLeft := size - for { - n, err := io.ReadFull(limitDataReader, buf) - if err == io.EOF { - break - } - if err != nil && err != io.ErrUnexpectedEOF { - return "", toObjectErr(err, bucket, object) - } - // Update md5 writer. - md5Writer.Write(buf[:n]) - if err = fs.storage.AppendFile(minioMetaBucket, tmpPartPath, buf[:n]); err != nil { - return "", toObjectErr(err, bucket, object) - } - - if totalLeft -= int64(n); size >= 0 && totalLeft <= 0 { - break - } + buf := make([]byte, int(bufSize)) + bytesWritten, cErr := fsCreateFile(fs.storage, teeReader, buf, minioMetaBucket, tmpPartPath) + if cErr != nil { + fs.storage.DeleteFile(minioMetaBucket, tmpPartPath) + return "", toObjectErr(cErr, minioMetaBucket, tmpPartPath) + } + // Should return IncompleteBody{} error when reader has fewer + // bytes than specified in request header. + if bytesWritten < size { + fs.storage.DeleteFile(minioMetaBucket, tmpPartPath) + return "", IncompleteBody{} } // Validate if payload is valid. diff --git a/fs-v1.go b/fs-v1.go index 3ea47f3d2..b1f32cf47 100644 --- a/fs-v1.go +++ b/fs-v1.go @@ -369,30 +369,24 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io. return "", toObjectErr(err, bucket, object) } } else { - // Allocate a buffer to Read() the object upload stream. + // Allocate a buffer to Read() from request body bufSize := int64(readSizeV1) if size > 0 && bufSize > size { bufSize = size } buf := make([]byte, int(bufSize)) + teeReader := io.TeeReader(limitDataReader, md5Writer) + bytesWritten, err := fsCreateFile(fs.storage, teeReader, buf, minioMetaBucket, tempObj) + if err != nil { + fs.storage.DeleteFile(minioMetaBucket, tempObj) + return "", toObjectErr(err, bucket, object) + } - // Read the buffer till io.EOF and append the read data to the temporary file. - for { - n, rErr := limitDataReader.Read(buf) - if rErr != nil && rErr != io.EOF { - return "", toObjectErr(rErr, bucket, object) - } - if n > 0 { - // Update md5 writer. - md5Writer.Write(buf[0:n]) - wErr := fs.storage.AppendFile(minioMetaBucket, tempObj, buf[0:n]) - if wErr != nil { - return "", toObjectErr(wErr, bucket, object) - } - } - if rErr == io.EOF { - break - } + // Should return IncompleteBody{} error when reader has fewer + // bytes than specified in request header. + if bytesWritten < size { + fs.storage.DeleteFile(minioMetaBucket, tempObj) + return "", IncompleteBody{} } } diff --git a/object-api-multipart_test.go b/object-api-multipart_test.go index 1d8dfeb06..671f8999a 100644 --- a/object-api-multipart_test.go +++ b/object-api-multipart_test.go @@ -265,7 +265,7 @@ func testObjectAPIPutObjectPart(obj ObjectLayer, instanceType string, t TestErrH // Test case - 13. // Input with size more than the size of actual data inside the reader. {bucket, object, uploadID, 1, "abcd", "a35", int64(len("abcd") + 1), false, "", - fmt.Errorf("%s", "Bad digest: Expected a35 is not valid with what we calculated e2fc714c4727ee9395f324cd2e7f331f")}, + IncompleteBody{}}, // Test case - 14. // Input with size less than the size of actual data inside the reader. {bucket, object, uploadID, 1, "abcd", "a35", int64(len("abcd") - 1), false, "", diff --git a/object-api-putobject_test.go b/object-api-putobject_test.go index 4f7fcd691..b3e384aba 100644 --- a/object-api-putobject_test.go +++ b/object-api-putobject_test.go @@ -84,7 +84,7 @@ func testObjectAPIPutObject(obj ObjectLayer, instanceType string, t TestErrHandl // Test case - 8. // Input with size more than the size of actual data inside the reader. {bucket, object, "abcd", map[string]string{"md5Sum": "a35"}, int64(len("abcd") + 1), false, "", - fmt.Errorf("%s", "Bad digest: Expected a35 is not valid with what we calculated e2fc714c4727ee9395f324cd2e7f331f")}, + IncompleteBody{}}, // Test case - 9. // Input with size less than the size of actual data inside the reader. {bucket, object, "abcd", map[string]string{"md5Sum": "a35"}, int64(len("abcd") - 1), false, "", diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index 0039129d9..1ce7acf17 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -385,6 +385,14 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Erasure code data and write across all disks. sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, teeReader, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, xl.writeQuorum) + if err != nil { + return "", toObjectErr(err, minioMetaBucket, tmpPartPath) + } + // Should return IncompleteBody{} error when reader has fewer bytes + // than specified in request header. + if sizeWritten < size { + return "", IncompleteBody{} + } // For size == -1, perhaps client is sending in chunked encoding // set the size as size that was actually written. diff --git a/xl-v1-object.go b/xl-v1-object.go index d02883205..ae0932516 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -117,7 +117,8 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i // Object cache enabled block. if xlMeta.Stat.Size > 0 && xl.objCacheEnabled { // Validate if we have previous cache. - cachedBuffer, err := xl.objCache.Open(path.Join(bucket, object)) + var cachedBuffer io.ReadSeeker + cachedBuffer, err = xl.objCache.Open(path.Join(bucket, object)) if err == nil { // Cache hit. // Advance the buffer to offset as if it was read. if _, err = cachedBuffer.Seek(startOffset, 0); err != nil { // Seek to the offset. @@ -435,8 +436,17 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // Erasure code data and write across all disks. sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, teeReader, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xl.writeQuorum) if err != nil { + // Create file failed, delete temporary object. + xl.deleteObject(minioMetaTmpBucket, tempObj) return "", toObjectErr(err, minioMetaBucket, tempErasureObj) } + // Should return IncompleteBody{} error when reader has fewer bytes + // than specified in request header. + if sizeWritten < size { + // Short write, delete temporary object. + xl.deleteObject(minioMetaTmpBucket, tempObj) + return "", IncompleteBody{} + } // For size == -1, perhaps client is sending in chunked encoding // set the size as size that was actually written. @@ -490,6 +500,8 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // Check if an object is present as one of the parent dir. // -- FIXME. (needs a new kind of lock). if xl.parentDirIsObject(bucket, path.Dir(object)) { + // Parent (in the namespace) is an object, delete temporary object. + xl.deleteObject(minioMetaTmpBucket, tempObj) return "", toObjectErr(errFileAccessDenied, bucket, object) } diff --git a/xl-v1-object_test.go b/xl-v1-object_test.go index 2576230fd..90a008ba5 100644 --- a/xl-v1-object_test.go +++ b/xl-v1-object_test.go @@ -45,7 +45,7 @@ func TestRepeatPutObjectPart(t *testing.T) { if err != nil { t.Fatal(err) } - fiveMBBytes := bytes.Repeat([]byte("a"), 5*1024*124) + fiveMBBytes := bytes.Repeat([]byte("a"), 5*1024*1024) md5Writer := md5.New() md5Writer.Write(fiveMBBytes) md5Hex := hex.EncodeToString(md5Writer.Sum(nil))