From a343d14f191480d4f9a01c70840bc17a3142aab4 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas <634494+krishnasrinivas@users.noreply.github.com> Date: Tue, 14 May 2019 12:33:18 -0700 Subject: [PATCH] Simplify putObject by not breaking the stream into parts (#7199) We broke into parts previously as we had checksum for the entire file to tax less on memory and to have better TTFB. We dont need to now, after the streaming-bitrot change. --- cmd/xl-v1-healing-common_test.go | 5 +- cmd/xl-v1-object.go | 107 ++++++++----------------------- 2 files changed, 29 insertions(+), 83 deletions(-) diff --git a/cmd/xl-v1-healing-common_test.go b/cmd/xl-v1-healing-common_test.go index cbd01e563..fa3e51c07 100644 --- a/cmd/xl-v1-healing-common_test.go +++ b/cmd/xl-v1-healing-common_test.go @@ -311,9 +311,9 @@ func TestDisksWithAllParts(t *testing.T) { diskFailures := make(map[int]string) // key = disk index, value = part name with hash mismatch - diskFailures[0] = "part.3" + diskFailures[0] = "part.1" diskFailures[3] = "part.1" - diskFailures[15] = "part.2" + diskFailures[15] = "part.1" for diskIndex, partName := range diskFailures { for _, info := range partsMetadata[diskIndex].Erasure.Checksums { @@ -354,5 +354,4 @@ func TestDisksWithAllParts(t *testing.T) { } } - } diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 8f9b9bf83..e6cd6d161 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -22,7 +22,6 @@ import ( "io" "net/http" "path" - "strconv" "sync" "github.com/minio/minio/cmd/logger" @@ -574,9 +573,6 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, return ObjectInfo{}, toObjectErr(errFileParentIsFile, bucket, object) } - // Limit the reader to its provided size if specified. - var reader io.Reader = data - // Initialize parts metadata partsMetadata := make([]xlMetaV1, len(xl.getDisks())) @@ -588,10 +584,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, } // Order disks according to erasure distribution - onlineDisks := shuffleDisks(xl.getDisks(), partsMetadata[0].Erasure.Distribution) - - // Total size of the written object - var sizeWritten int64 + onlineDisks := shuffleDisks(xl.getDisks(), xlMeta.Erasure.Distribution) erasure, err := NewErasure(ctx, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xlMeta.Erasure.BlockSize) if err != nil { @@ -615,82 +608,37 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, buffer = buffer[:xlMeta.Erasure.BlockSize] } - // Read data and split into parts - similar to multipart mechanism - for partIdx := 1; ; partIdx++ { - // Compute part name - partName := "part." + strconv.Itoa(partIdx) - // Compute the path of current part - tempErasureObj := pathJoin(uniqueID, partName) - - // Calculate the size of the current part. - var curPartSize int64 - curPartSize, err = calculatePartSizeFromIdx(ctx, data.Size(), globalPutPartSize, partIdx) - if err != nil { - logger.LogIf(ctx, err) - return ObjectInfo{}, toObjectErr(err, bucket, object) - } - - // Hint the filesystem to pre-allocate one continuous large block. - // This is only an optimization. - var curPartReader io.Reader - - if curPartSize < data.Size() { - curPartReader = io.LimitReader(reader, curPartSize) - } else { - curPartReader = reader - } - - writers := make([]io.Writer, len(onlineDisks)) - for i, disk := range onlineDisks { - if disk == nil { - continue - } - writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, erasure.ShardFileSize(curPartSize), DefaultBitrotAlgorithm, erasure.ShardSize()) - } - - n, erasureErr := erasure.Encode(ctx, curPartReader, writers, buffer, erasure.dataBlocks+1) - // Note: we should not be defer'ing the following closeBitrotWriters() call as we are inside a for loop i.e if we use defer, we would accumulate a lot of open files by the time - // we return from this function. - closeBitrotWriters(writers) - if erasureErr != nil { - return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj) - } - - // Should return IncompleteBody{} error when reader has fewer bytes - // than specified in request header. - - if n < curPartSize && data.Size() > 0 { - logger.LogIf(ctx, IncompleteBody{}) - return ObjectInfo{}, IncompleteBody{} - } + partName := "part.1" + tempErasureObj := pathJoin(uniqueID, partName) - if n == 0 && data.Size() == -1 { - // The last part of a compressed object will always be empty - // Since the compressed size is unpredictable. - // Hence removing the last (empty) part from all `xl.disks`. - dErr := xl.deleteObject(ctx, minioMetaTmpBucket, tempErasureObj, writeQuorum, true) - if dErr != nil { - return ObjectInfo{}, toObjectErr(dErr, minioMetaTmpBucket, tempErasureObj) - } - break + writers := make([]io.Writer, len(onlineDisks)) + for i, disk := range onlineDisks { + if disk == nil { + continue } + writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize()) + } - // Update the total written size - sizeWritten += n + n, erasureErr := erasure.Encode(ctx, data, writers, buffer, erasure.dataBlocks+1) + closeBitrotWriters(writers) + if erasureErr != nil { + return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj) + } - for i, w := range writers { - if w == nil { - onlineDisks[i] = nil - continue - } - partsMetadata[i].AddObjectPart(partIdx, partName, "", n, data.ActualSize()) - partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{partName, DefaultBitrotAlgorithm, bitrotWriterSum(w)}) - } + // Should return IncompleteBody{} error when reader has fewer bytes + // than specified in request header. + if n < data.Size() { + logger.LogIf(ctx, IncompleteBody{}) + return ObjectInfo{}, IncompleteBody{} + } - // We wrote everything, break out. - if sizeWritten == data.Size() { - break + for i, w := range writers { + if w == nil { + onlineDisks[i] = nil + continue } + partsMetadata[i].AddObjectPart(1, partName, "", n, data.ActualSize()) + partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{partName, DefaultBitrotAlgorithm, bitrotWriterSum(w)}) } // Save additional erasureMetadata. @@ -728,7 +676,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, // Update `xl.json` content on each disks. for index := range partsMetadata { partsMetadata[index].Meta = opts.UserDefined - partsMetadata[index].Stat.Size = sizeWritten + partsMetadata[index].Stat.Size = n partsMetadata[index].Stat.ModTime = modTime } @@ -758,7 +706,6 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, UserDefined: xlMeta.Meta, } - // Success, return object info. return objInfo, nil }