diff --git a/cmd/api-errors.go b/cmd/api-errors.go
index 8021b8098..ed7529560 100644
--- a/cmd/api-errors.go
+++ b/cmd/api-errors.go
@@ -606,6 +606,11 @@ func toAPIErrorCode(err error) (apiErr APIErrorCode) {
         apiErr = ErrSignatureDoesNotMatch
     case errContentSHA256Mismatch:
         apiErr = ErrContentSHA256Mismatch
+    case errDataTooLarge:
+        apiErr = ErrEntityTooLarge
+    case errDataTooSmall:
+        apiErr = ErrEntityTooSmall
+
     }
 
     if apiErr != ErrNone {
diff --git a/cmd/erasure-createfile.go b/cmd/erasure-createfile.go
index 18797ba5f..d20ca8060 100644
--- a/cmd/erasure-createfile.go
+++ b/cmd/erasure-createfile.go
@@ -28,7 +28,7 @@ import (
 // erasureCreateFile - writes an entire stream by erasure coding to
 // all the disks, writes also calculate individual block's checksum
 // for future bit-rot protection.
-func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, blockSize int64, dataBlocks int, parityBlocks int, algo string, writeQuorum int) (bytesWritten int64, checkSums []string, err error) {
+func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, allowEmpty bool, blockSize int64, dataBlocks int, parityBlocks int, algo string, writeQuorum int) (bytesWritten int64, checkSums []string, err error) {
     // Allocated blockSized buffer for reading from incoming stream.
     buf := make([]byte, blockSize)
 
@@ -47,7 +47,7 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader
             // We have reached EOF on the first byte read, io.Reader
             // must be 0bytes, we don't need to erasure code
             // data. Will create a 0byte file instead.
-            if bytesWritten == 0 {
+            if bytesWritten == 0 && allowEmpty {
                 blocks = make([][]byte, len(disks))
                 rErr = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum)
                 if rErr != nil {
diff --git a/cmd/erasure-createfile_test.go b/cmd/erasure-createfile_test.go
index 2411bd432..bcc9973eb 100644
--- a/cmd/erasure-createfile_test.go
+++ b/cmd/erasure-createfile_test.go
@@ -56,7 +56,7 @@ func TestErasureCreateFile(t *testing.T) {
         t.Fatal(err)
     }
     // Test when all disks are up.
-    size, _, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
+    size, _, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
     if err != nil {
         t.Fatal(err)
     }
@@ -69,7 +69,7 @@ func TestErasureCreateFile(t *testing.T) {
     disks[5] = AppendDiskDown{disks[5].(*posix)}
 
     // Test when two disks are down.
-    size, _, err = erasureCreateFile(disks, "testbucket", "testobject2", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
+    size, _, err = erasureCreateFile(disks, "testbucket", "testobject2", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
     if err != nil {
         t.Fatal(err)
     }
@@ -83,7 +83,7 @@ func TestErasureCreateFile(t *testing.T) {
     disks[8] = AppendDiskDown{disks[8].(*posix)}
     disks[9] = AppendDiskDown{disks[9].(*posix)}
 
-    size, _, err = erasureCreateFile(disks, "testbucket", "testobject3", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
+    size, _, err = erasureCreateFile(disks, "testbucket", "testobject3", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
     if err != nil {
         t.Fatal(err)
     }
@@ -93,7 +93,7 @@ func TestErasureCreateFile(t *testing.T) {
     // 1 more disk down. 7 disk down in total. Should return quorum error.
     disks[10] = AppendDiskDown{disks[10].(*posix)}
 
-    _, _, err = erasureCreateFile(disks, "testbucket", "testobject4", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
+    _, _, err = erasureCreateFile(disks, "testbucket", "testobject4", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
     if errorCause(err) != errXLWriteQuorum {
         t.Errorf("erasureCreateFile return value: expected errXLWriteQuorum, got %s", err)
     }
diff --git a/cmd/erasure-healfile_test.go b/cmd/erasure-healfile_test.go
index b4e644c07..0f820738c 100644
--- a/cmd/erasure-healfile_test.go
+++ b/cmd/erasure-healfile_test.go
@@ -48,7 +48,7 @@ func TestErasureHealFile(t *testing.T) {
         t.Fatal(err)
     }
     // Create a test file.
-    size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
+    size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
     if err != nil {
         t.Fatal(err)
     }
diff --git a/cmd/erasure-readfile_test.go b/cmd/erasure-readfile_test.go
index ab0e8f947..51071952f 100644
--- a/cmd/erasure-readfile_test.go
+++ b/cmd/erasure-readfile_test.go
@@ -271,7 +271,7 @@ func TestErasureReadFileDiskFail(t *testing.T) {
     }
 
     // Create a test file to read from.
-    size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
+    size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
     if err != nil {
         t.Fatal(err)
     }
@@ -354,7 +354,7 @@ func TestErasureReadFileOffsetLength(t *testing.T) {
     }
 
     // Create a test file to read from.
-    size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
+    size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
     if err != nil {
         t.Fatal(err)
     }
@@ -433,7 +433,7 @@ func TestErasureReadFileRandomOffsetLength(t *testing.T) {
     iterations := 10000
 
     // Create a test file to read from.
-    size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
+    size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
     if err != nil {
         t.Fatal(err)
     }
diff --git a/cmd/globals.go b/cmd/globals.go
index 4eed8972b..f2ed99dd9 100644
--- a/cmd/globals.go
+++ b/cmd/globals.go
@@ -82,6 +82,9 @@ var (
     // Caching is enabled only for RAM size > 8GiB.
     globalMaxCacheSize = uint64(0)
 
+    // Maximum size of internal objects parts
+    globalPutPartSize = int64(64 * 1024 * 1024)
+
     // Cache expiry.
     globalCacheExpiry = objcache.DefaultExpiry
 
diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go
index 92fafc35e..10fd79f1f 100644
--- a/cmd/xl-v1-multipart.go
+++ b/cmd/xl-v1-multipart.go
@@ -627,8 +627,11 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
         }
     }
 
+    // We always allow empty part.
+    allowEmpty := true
+
     // Erasure code data and write across all disks.
-    sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaTmpBucket, tmpPartPath, teeReader, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, bitRotAlgo, xl.writeQuorum)
+    sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaTmpBucket, tmpPartPath, teeReader, allowEmpty, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, bitRotAlgo, xl.writeQuorum)
     if err != nil {
         return "", toObjectErr(err, bucket, object)
     }
diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go
index a55e31306..86563d362 100644
--- a/cmd/xl-v1-object.go
+++ b/cmd/xl-v1-object.go
@@ -22,6 +22,7 @@ import (
     "hash"
     "io"
     "path"
+    "strconv"
     "strings"
     "sync"
     "time"
@@ -461,7 +462,6 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
     }
 
     uniqueID := mustGetUUID()
-    tempErasureObj := path.Join(uniqueID, "part.1")
     tempObj := uniqueID
 
     // Initialize md5 writer.
@@ -512,40 +512,101 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
     // Tee reader combines incoming data stream and md5, data read from input stream is written to md5.
     teeReader := io.TeeReader(limitDataReader, mw)
 
-    // Initialize xl meta.
+    // Initialize parts metadata
+    partsMetadata := make([]xlMetaV1, len(xl.storageDisks))
+
     xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks)
 
-    onlineDisks := getOrderedDisks(xlMeta.Erasure.Distribution, xl.storageDisks)
+    // Initialize xl meta.
+    for index := range partsMetadata {
+        partsMetadata[index] = xlMeta
+    }
+
+    // Order disks according to erasure distribution
+    onlineDisks := getOrderedDisks(partsMetadata[0].Erasure.Distribution, xl.storageDisks)
 
     // Delete temporary object in the event of failure.
     // If PutObject succeeded there would be no temporary
     // object to delete.
     defer xl.deleteObject(minioMetaTmpBucket, tempObj)
 
-    if size > 0 {
-        for _, disk := range onlineDisks {
-            if disk != nil {
-                actualSize := xl.sizeOnDisk(size, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks)
-                disk.PrepareFile(minioMetaTmpBucket, tempErasureObj, actualSize)
+    // Total size of the written object
+    sizeWritten := int64(0)
+
+    // Read data and split into parts - similar to multipart mechanism
+    for partIdx := 1; ; partIdx++ {
+        // Compute part name
+        partName := "part." + strconv.Itoa(partIdx)
+        // Compute the path of current part
+        tempErasureObj := path.Join(uniqueID, partName)
+
+        // Calculate the size of the current part; if size is unknown, curPartSize will be unknown too.
+        // allowEmptyPart will always be true if this is the first part and false otherwise.
+        curPartSize := getPartSizeFromIdx(size, globalPutPartSize, partIdx)
+
+        // Prepare file for eventual optimization in the disk
+        if curPartSize > 0 {
+            // Calculate the real size of the part in the disk and prepare it for eventual optimization
+            actualSize := xl.sizeOnDisk(curPartSize, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks)
+            for _, disk := range onlineDisks {
+                if disk != nil {
+                    disk.PrepareFile(minioMetaTmpBucket, tempErasureObj, actualSize)
+                }
             }
         }
-    }
 
-    // Erasure code data and write across all disks.
-    sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaTmpBucket, tempErasureObj, teeReader, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, bitRotAlgo, xl.writeQuorum)
-    if err != nil {
-        return ObjectInfo{}, toObjectErr(err, minioMetaTmpBucket, tempErasureObj)
-    }
-    // Should return IncompleteBody{} error when reader has fewer bytes
-    // than specified in request header.
-    if sizeWritten < size {
-        return ObjectInfo{}, traceError(IncompleteBody{})
+        // partReader streams at most maximum part size
+        partReader := io.LimitReader(teeReader, globalPutPartSize)
+
+        // Allow creating an empty erasure file only when this is the first part. This flag is useful
+        // when size == -1 because in this case, we are not able to predict how many parts we will have.
+        allowEmptyPart := partIdx == 1
+
+        // Erasure code data and write across all disks.
+        partSizeWritten, checkSums, erasureErr := erasureCreateFile(onlineDisks, minioMetaTmpBucket, tempErasureObj, partReader, allowEmptyPart, partsMetadata[0].Erasure.BlockSize, partsMetadata[0].Erasure.DataBlocks, partsMetadata[0].Erasure.ParityBlocks, bitRotAlgo, xl.writeQuorum)
+        if erasureErr != nil {
+            return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj)
+        }
+
+        // Should return IncompleteBody{} error when reader has fewer bytes
+        // than specified in request header.
+        if partSizeWritten < int64(curPartSize) {
+            return ObjectInfo{}, traceError(IncompleteBody{})
+        }
+
+        // Update the total written size
+        sizeWritten += partSizeWritten
+
+        // If erasure stored some data in the loop or created an empty file
+        if partSizeWritten > 0 || allowEmptyPart {
+            for index := range partsMetadata {
+                // Add the part to xl.json.
+                partsMetadata[index].AddObjectPart(partIdx, partName, "", partSizeWritten)
+                // Add part checksum info to xl.json.
+                partsMetadata[index].Erasure.AddCheckSumInfo(checkSumInfo{
+                    Name:      partName,
+                    Hash:      checkSums[index],
+                    Algorithm: bitRotAlgo,
+                })
+            }
+        }
+
+        // If we didn't write anything or we know that the next part doesn't have any
+        // data to write, we should quit this loop immediately
+        if partSizeWritten == 0 || getPartSizeFromIdx(size, globalPutPartSize, partIdx+1) == 0 {
+            break
+        }
     }
 
     // For size == -1, perhaps client is sending in chunked encoding
     // set the size as size that was actually written.
     if size == -1 {
         size = sizeWritten
+    } else {
+        // Check if stored data satisfies what is asked
+        if sizeWritten < size {
+            return ObjectInfo{}, traceError(IncompleteBody{})
+        }
     }
 
     // Save additional erasureMetadata.
@@ -604,22 +665,11 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
     }
 
     // Fill all the necessary metadata.
-    xlMeta.Meta = metadata
-    xlMeta.Stat.Size = size
-    xlMeta.Stat.ModTime = modTime
-
-    // Add the final part.
-    xlMeta.AddObjectPart(1, "part.1", newMD5Hex, xlMeta.Stat.Size)
-
-    partsMetadata := make([]xlMetaV1, len(xl.storageDisks))
     // Update `xl.json` content on each disks.
     for index := range partsMetadata {
-        partsMetadata[index] = xlMeta
-        partsMetadata[index].Erasure.AddCheckSumInfo(checkSumInfo{
-            Name:      "part.1",
-            Hash:      checkSums[index],
-            Algorithm: bitRotAlgo,
-        })
+        partsMetadata[index].Meta = metadata
+        partsMetadata[index].Stat.Size = size
+        partsMetadata[index].Stat.ModTime = modTime
     }
 
     // Write unique `xl.json` for each disk.
@@ -639,6 +689,10 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
         newBuffer.Close()
     }
 
+    // Object info is the same in all disks, so we can pick
+    // the metadata from the first disk
+    xlMeta = partsMetadata[0]
+
     objInfo = ObjectInfo{
         IsDir:  false,
         Bucket: bucket,
diff --git a/cmd/xl-v1-utils.go b/cmd/xl-v1-utils.go
index e2309483c..536eae968 100644
--- a/cmd/xl-v1-utils.go
+++ b/cmd/xl-v1-utils.go
@@ -343,3 +343,30 @@ func getOrderedDisks(distribution []int, disks []StorageAPI) (orderedDisks []Sto
     }
     return orderedDisks
 }
+
+// getPartSizeFromIdx predicts the part size according to its index. It
+// returns -1 when totalSize is -1.
+func getPartSizeFromIdx(totalSize int64, partSize int64, partIndex int) int64 {
+    if partSize == 0 {
+        panic("Part size cannot be zero.")
+    }
+    if partIndex < 1 {
+        panic("Part index cannot be smaller than 1.")
+    }
+    switch totalSize {
+    case -1, 0:
+        return totalSize
+    }
+    // Compute the total count of parts
+    partsCount := totalSize/partSize + 1
+    // Return the part's size
+    switch {
+    case int64(partIndex) < partsCount:
+        return partSize
+    case int64(partIndex) == partsCount:
+        // Size of last part
+        return totalSize % partSize
+    default:
+        return 0
+    }
+}
diff --git a/cmd/xl-v1-utils_test.go b/cmd/xl-v1-utils_test.go
index 1bada5408..4b80ffbb4 100644
--- a/cmd/xl-v1-utils_test.go
+++ b/cmd/xl-v1-utils_test.go
@@ -22,6 +22,8 @@ import (
     "strconv"
     "testing"
     "time"
+
+    humanize "github.com/dustin/go-humanize"
 )
 
 // Tests caclculating disk count.
@@ -325,3 +327,35 @@ func TestGetXLMetaV1GJson10(t *testing.T) {
     }
     compareXLMetaV1(t, unMarshalXLMeta, gjsonXLMeta)
 }
+
+// Test the predicted part size from the part index
+func TestGetPartSizeFromIdx(t *testing.T) {
+    // Create test cases
+    testCases := []struct {
+        totalSize    int64
+        partSize     int64
+        partIndex    int
+        expectedSize int64
+    }{
+        // Total size is -1
+        {-1, 10, 1, -1},
+        // Total size is zero
+        {0, 10, 1, 0},
+        // part size 2MiB, total size 4MiB
+        {4 * humanize.MiByte, 2 * humanize.MiByte, 1, 2 * humanize.MiByte},
+        {4 * humanize.MiByte, 2 * humanize.MiByte, 2, 2 * humanize.MiByte},
+        {4 * humanize.MiByte, 2 * humanize.MiByte, 3, 0},
+        // part size 2MiB, total size 5MiB
+        {5 * humanize.MiByte, 2 * humanize.MiByte, 1, 2 * humanize.MiByte},
+        {5 * humanize.MiByte, 2 * humanize.MiByte, 2, 2 * humanize.MiByte},
+        {5 * humanize.MiByte, 2 * humanize.MiByte, 3, 1 * humanize.MiByte},
+        {5 * humanize.MiByte, 2 * humanize.MiByte, 4, 0},
+    }
+
+    for i, testCase := range testCases {
+        s := getPartSizeFromIdx(testCase.totalSize, testCase.partSize, testCase.partIndex)
+        if s != testCase.expectedSize {
+            t.Errorf("Test %d: The calculated part size is incorrect: expected = %d, found = %d\n", i+1, testCase.expectedSize, s)
+        }
+    }
+}
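
Note: the following is an illustrative, standalone sketch and is not part of the patch above. It reimplements getPartSizeFromIdx as added in cmd/xl-v1-utils.go and mirrors the io.LimitReader part-splitting loop that PutObject in cmd/xl-v1-object.go now uses. The 8-byte part size and the 20-byte sample payload are made up for the demo (the real code uses globalPutPartSize = 64 MiB), and no erasure coding or disk I/O happens here.

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

// getPartSizeFromIdx mirrors the helper added in cmd/xl-v1-utils.go.
func getPartSizeFromIdx(totalSize, partSize int64, partIndex int) int64 {
	if partSize == 0 {
		panic("Part size cannot be zero.")
	}
	if partIndex < 1 {
		panic("Part index cannot be smaller than 1.")
	}
	switch totalSize {
	case -1, 0:
		return totalSize
	}
	partsCount := totalSize/partSize + 1
	switch {
	case int64(partIndex) < partsCount:
		return partSize
	case int64(partIndex) == partsCount:
		return totalSize % partSize // size of the last part
	default:
		return 0
	}
}

func main() {
	const partSize int64 = 8          // stand-in for globalPutPartSize (64 MiB in the patch)
	data := "0123456789abcdefghij"    // 20 bytes -> parts of 8, 8 and 4 bytes
	reader := strings.NewReader(data) // stand-in for the teeReader used by PutObject
	totalSize := int64(len(data))

	for partIdx := 1; ; partIdx++ {
		// Predicted size of this part (would be -1 when the total size is unknown).
		curPartSize := getPartSizeFromIdx(totalSize, partSize, partIdx)

		// Each iteration reads at most partSize bytes from the shared stream,
		// like io.LimitReader(teeReader, globalPutPartSize) in PutObject.
		partReader := io.LimitReader(reader, partSize)
		written, err := io.Copy(ioutil.Discard, partReader) // erasureCreateFile would consume this reader
		if err != nil {
			panic(err)
		}
		fmt.Printf("part.%d: predicted %d bytes, wrote %d bytes\n", partIdx, curPartSize, written)

		// Same exit condition as the new PutObject loop: nothing was written,
		// or the next part is predicted to be empty.
		if written == 0 || getPartSizeFromIdx(totalSize, partSize, partIdx+1) == 0 {
			break
		}
	}
}

Running this prints three parts of 8, 8 and 4 bytes, matching what getPartSizeFromIdx predicts for the 20-byte input.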