Simplify putObject by not breaking the stream into parts (#7199)

We previously broke the stream into parts because we had a checksum for the
entire file, both to reduce memory pressure and to achieve better TTFB. We
don't need to do this anymore, after the streaming-bitrot change.
master
Krishna Srinivas 6 years ago committed by kannappanr
parent 9c90a28546
commit a343d14f19
  1. 5
      cmd/xl-v1-healing-common_test.go
  2. 107
      cmd/xl-v1-object.go

@ -311,9 +311,9 @@ func TestDisksWithAllParts(t *testing.T) {
diskFailures := make(map[int]string) diskFailures := make(map[int]string)
// key = disk index, value = part name with hash mismatch // key = disk index, value = part name with hash mismatch
diskFailures[0] = "part.3" diskFailures[0] = "part.1"
diskFailures[3] = "part.1" diskFailures[3] = "part.1"
diskFailures[15] = "part.2" diskFailures[15] = "part.1"
for diskIndex, partName := range diskFailures { for diskIndex, partName := range diskFailures {
for _, info := range partsMetadata[diskIndex].Erasure.Checksums { for _, info := range partsMetadata[diskIndex].Erasure.Checksums {
@ -354,5 +354,4 @@ func TestDisksWithAllParts(t *testing.T) {
} }
} }
} }

@ -22,7 +22,6 @@ import (
"io" "io"
"net/http" "net/http"
"path" "path"
"strconv"
"sync" "sync"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
@ -574,9 +573,6 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
return ObjectInfo{}, toObjectErr(errFileParentIsFile, bucket, object) return ObjectInfo{}, toObjectErr(errFileParentIsFile, bucket, object)
} }
// Limit the reader to its provided size if specified.
var reader io.Reader = data
// Initialize parts metadata // Initialize parts metadata
partsMetadata := make([]xlMetaV1, len(xl.getDisks())) partsMetadata := make([]xlMetaV1, len(xl.getDisks()))
@ -588,10 +584,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
} }
// Order disks according to erasure distribution // Order disks according to erasure distribution
onlineDisks := shuffleDisks(xl.getDisks(), partsMetadata[0].Erasure.Distribution) onlineDisks := shuffleDisks(xl.getDisks(), xlMeta.Erasure.Distribution)
// Total size of the written object
var sizeWritten int64
erasure, err := NewErasure(ctx, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xlMeta.Erasure.BlockSize) erasure, err := NewErasure(ctx, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xlMeta.Erasure.BlockSize)
if err != nil { if err != nil {
@ -615,82 +608,37 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
buffer = buffer[:xlMeta.Erasure.BlockSize] buffer = buffer[:xlMeta.Erasure.BlockSize]
} }
// Read data and split into parts - similar to multipart mechanism partName := "part.1"
for partIdx := 1; ; partIdx++ { tempErasureObj := pathJoin(uniqueID, partName)
// Compute part name
partName := "part." + strconv.Itoa(partIdx)
// Compute the path of current part
tempErasureObj := pathJoin(uniqueID, partName)
// Calculate the size of the current part.
var curPartSize int64
curPartSize, err = calculatePartSizeFromIdx(ctx, data.Size(), globalPutPartSize, partIdx)
if err != nil {
logger.LogIf(ctx, err)
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
// Hint the filesystem to pre-allocate one continuous large block.
// This is only an optimization.
var curPartReader io.Reader
if curPartSize < data.Size() {
curPartReader = io.LimitReader(reader, curPartSize)
} else {
curPartReader = reader
}
writers := make([]io.Writer, len(onlineDisks))
for i, disk := range onlineDisks {
if disk == nil {
continue
}
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, erasure.ShardFileSize(curPartSize), DefaultBitrotAlgorithm, erasure.ShardSize())
}
n, erasureErr := erasure.Encode(ctx, curPartReader, writers, buffer, erasure.dataBlocks+1)
// Note: we should not be defer'ing the following closeBitrotWriters() call as we are inside a for loop i.e if we use defer, we would accumulate a lot of open files by the time
// we return from this function.
closeBitrotWriters(writers)
if erasureErr != nil {
return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj)
}
// Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header.
if n < curPartSize && data.Size() > 0 {
logger.LogIf(ctx, IncompleteBody{})
return ObjectInfo{}, IncompleteBody{}
}
if n == 0 && data.Size() == -1 { writers := make([]io.Writer, len(onlineDisks))
// The last part of a compressed object will always be empty for i, disk := range onlineDisks {
// Since the compressed size is unpredictable. if disk == nil {
// Hence removing the last (empty) part from all `xl.disks`. continue
dErr := xl.deleteObject(ctx, minioMetaTmpBucket, tempErasureObj, writeQuorum, true)
if dErr != nil {
return ObjectInfo{}, toObjectErr(dErr, minioMetaTmpBucket, tempErasureObj)
}
break
} }
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
}
// Update the total written size n, erasureErr := erasure.Encode(ctx, data, writers, buffer, erasure.dataBlocks+1)
sizeWritten += n closeBitrotWriters(writers)
if erasureErr != nil {
return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj)
}
for i, w := range writers { // Should return IncompleteBody{} error when reader has fewer bytes
if w == nil { // than specified in request header.
onlineDisks[i] = nil if n < data.Size() {
continue logger.LogIf(ctx, IncompleteBody{})
} return ObjectInfo{}, IncompleteBody{}
partsMetadata[i].AddObjectPart(partIdx, partName, "", n, data.ActualSize()) }
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{partName, DefaultBitrotAlgorithm, bitrotWriterSum(w)})
}
// We wrote everything, break out. for i, w := range writers {
if sizeWritten == data.Size() { if w == nil {
break onlineDisks[i] = nil
continue
} }
partsMetadata[i].AddObjectPart(1, partName, "", n, data.ActualSize())
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{partName, DefaultBitrotAlgorithm, bitrotWriterSum(w)})
} }
// Save additional erasureMetadata. // Save additional erasureMetadata.
@ -728,7 +676,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
// Update `xl.json` content on each disks. // Update `xl.json` content on each disks.
for index := range partsMetadata { for index := range partsMetadata {
partsMetadata[index].Meta = opts.UserDefined partsMetadata[index].Meta = opts.UserDefined
partsMetadata[index].Stat.Size = sizeWritten partsMetadata[index].Stat.Size = n
partsMetadata[index].Stat.ModTime = modTime partsMetadata[index].Stat.ModTime = modTime
} }
@ -758,7 +706,6 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
UserDefined: xlMeta.Meta, UserDefined: xlMeta.Meta,
} }
// Success, return object info.
return objInfo, nil return objInfo, nil
} }

Loading…
Cancel
Save