fix: readData in bulk call using messagepack byte wrappers (#11228)

This PR refactors the way we use buffers for O_DIRECT and
to re-use those buffers for messagepack reader writer.

After some extensive benchmarking found that not all objects
have this benefit, and only objects smaller than 64KiB see
this benefit overall.

Benefits are seen from almost all objects from

1KiB - 32KiB

Beyond this no objects see benefit with bulk call approach
as the latency of bytes sent over the wire v/s streaming
content directly from disk negate each other with no
remarkable benefits.

All other optimizations include reuse of msgp.Reader,
msgp.Writer using sync.Pool's for all internode calls.
master
Harshavardhana 4 years ago committed by GitHub
parent a4f6705874
commit f21d650ed4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      cmd/bitrot-streaming.go
  2. 4
      cmd/bitrot.go
  3. 2
      cmd/bitrot_test.go
  4. 8
      cmd/erasure-decode_test.go
  5. 2
      cmd/erasure-heal_test.go
  6. 6
      cmd/erasure-healing-common_test.go
  7. 4
      cmd/erasure-healing.go
  8. 12
      cmd/erasure-healing_test.go
  9. 4
      cmd/erasure-metadata-utils.go
  10. 20
      cmd/erasure-multipart.go
  11. 19
      cmd/erasure-object.go
  12. 26
      cmd/erasure-object_test.go
  13. 8
      cmd/metacache-set.go
  14. 4
      cmd/naughty-disk_test.go
  15. 2
      cmd/storage-datatypes.go
  16. 34
      cmd/storage-datatypes_gen.go
  17. 2
      cmd/storage-interface.go
  18. 34
      cmd/storage-rest-client.go
  19. 3
      cmd/storage-rest-common.go
  20. 16
      cmd/storage-rest-server.go
  21. 4
      cmd/xl-storage-disk-id-check.go
  22. 114
      cmd/xl-storage.go
  23. 3
      go.mod
  24. 4
      go.sum

@ -91,6 +91,7 @@ func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length i
// ReadAt() implementation which verifies the bitrot hash available as part of the stream. // ReadAt() implementation which verifies the bitrot hash available as part of the stream.
type streamingBitrotReader struct { type streamingBitrotReader struct {
disk StorageAPI disk StorageAPI
data []byte
rc io.Reader rc io.Reader
volume string volume string
filePath string filePath string
@ -122,7 +123,11 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) {
// For the first ReadAt() call we need to open the stream for reading. // For the first ReadAt() call we need to open the stream for reading.
b.currOffset = offset b.currOffset = offset
streamOffset := (offset/b.shardSize)*int64(b.h.Size()) + offset streamOffset := (offset/b.shardSize)*int64(b.h.Size()) + offset
if len(b.data) == 0 {
b.rc, err = b.disk.ReadFileStream(context.TODO(), b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset) b.rc, err = b.disk.ReadFileStream(context.TODO(), b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset)
} else {
b.rc = io.NewSectionReader(bytes.NewReader(b.data), streamOffset, b.tillOffset-streamOffset)
}
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -154,10 +159,11 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) {
} }
// Returns streaming bitrot reader implementation. // Returns streaming bitrot reader implementation.
func newStreamingBitrotReader(disk StorageAPI, volume, filePath string, tillOffset int64, algo BitrotAlgorithm, shardSize int64) *streamingBitrotReader { func newStreamingBitrotReader(disk StorageAPI, data []byte, volume, filePath string, tillOffset int64, algo BitrotAlgorithm, shardSize int64) *streamingBitrotReader {
h := algo.New() h := algo.New()
return &streamingBitrotReader{ return &streamingBitrotReader{
disk, disk,
data,
nil, nil,
volume, volume,
filePath, filePath,

@ -103,9 +103,9 @@ func newBitrotWriter(disk StorageAPI, volume, filePath string, length int64, alg
return newWholeBitrotWriter(disk, volume, filePath, algo, shardSize) return newWholeBitrotWriter(disk, volume, filePath, algo, shardSize)
} }
func newBitrotReader(disk StorageAPI, bucket string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) io.ReaderAt { func newBitrotReader(disk StorageAPI, data []byte, bucket string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) io.ReaderAt {
if algo == HighwayHash256S { if algo == HighwayHash256S {
return newStreamingBitrotReader(disk, bucket, filePath, tillOffset, algo, shardSize) return newStreamingBitrotReader(disk, data, bucket, filePath, tillOffset, algo, shardSize)
} }
return newWholeBitrotReader(disk, bucket, filePath, algo, tillOffset, sum) return newWholeBitrotReader(disk, bucket, filePath, algo, tillOffset, sum)
} }

@ -62,7 +62,7 @@ func testBitrotReaderWriterAlgo(t *testing.T, bitrotAlgo BitrotAlgorithm) {
} }
writer.(io.Closer).Close() writer.(io.Closer).Close()
reader := newBitrotReader(disk, volume, filePath, 35, bitrotAlgo, bitrotWriterSum(writer), 10) reader := newBitrotReader(disk, nil, volume, filePath, 35, bitrotAlgo, bitrotWriterSum(writer), 10)
b := make([]byte, 10) b := make([]byte, 10)
if _, err = reader.ReadAt(b, 0); err != nil { if _, err = reader.ReadAt(b, 0); err != nil {
log.Fatal(err) log.Fatal(err)

@ -134,7 +134,7 @@ func TestErasureDecode(t *testing.T) {
} }
tillOffset := erasure.ShardFileOffset(test.offset, test.length, test.data) tillOffset := erasure.ShardFileOffset(test.offset, test.length, test.data)
bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) bitrotReaders[index] = newBitrotReader(disk, nil, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize())
} }
writer := bytes.NewBuffer(nil) writer := bytes.NewBuffer(nil)
@ -164,7 +164,7 @@ func TestErasureDecode(t *testing.T) {
continue continue
} }
tillOffset := erasure.ShardFileOffset(test.offset, test.length, test.data) tillOffset := erasure.ShardFileOffset(test.offset, test.length, test.data)
bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) bitrotReaders[index] = newBitrotReader(disk, nil, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize())
} }
for j := range disks[:test.offDisks] { for j := range disks[:test.offDisks] {
if bitrotReaders[j] == nil { if bitrotReaders[j] == nil {
@ -270,7 +270,7 @@ func TestErasureDecodeRandomOffsetLength(t *testing.T) {
continue continue
} }
tillOffset := erasure.ShardFileOffset(offset, readLen, length) tillOffset := erasure.ShardFileOffset(offset, readLen, length)
bitrotReaders[index] = newStreamingBitrotReader(disk, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) bitrotReaders[index] = newStreamingBitrotReader(disk, nil, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
} }
err = erasure.Decode(context.Background(), buf, bitrotReaders, offset, readLen, length, nil) err = erasure.Decode(context.Background(), buf, bitrotReaders, offset, readLen, length, nil)
closeBitrotReaders(bitrotReaders) closeBitrotReaders(bitrotReaders)
@ -332,7 +332,7 @@ func benchmarkErasureDecode(data, parity, dataDown, parityDown int, size int64,
continue continue
} }
tillOffset := erasure.ShardFileOffset(0, size, size) tillOffset := erasure.ShardFileOffset(0, size, size)
bitrotReaders[index] = newStreamingBitrotReader(disk, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) bitrotReaders[index] = newStreamingBitrotReader(disk, nil, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
} }
if err = erasure.Decode(context.Background(), bytes.NewBuffer(content[:0]), bitrotReaders, 0, size, size, nil); err != nil { if err = erasure.Decode(context.Background(), bytes.NewBuffer(content[:0]), bitrotReaders, 0, size, size, nil); err != nil {
panic(err) panic(err)

@ -99,7 +99,7 @@ func TestErasureHeal(t *testing.T) {
readers := make([]io.ReaderAt, len(disks)) readers := make([]io.ReaderAt, len(disks))
for i, disk := range disks { for i, disk := range disks {
shardFilesize := erasure.ShardFileSize(test.size) shardFilesize := erasure.ShardFileSize(test.size)
readers[i] = newBitrotReader(disk, "testbucket", "testobject", shardFilesize, test.algorithm, bitrotWriterSum(writers[i]), erasure.ShardSize()) readers[i] = newBitrotReader(disk, nil, "testbucket", "testobject", shardFilesize, test.algorithm, bitrotWriterSum(writers[i]), erasure.ShardSize())
} }
// setup stale disks for the test case // setup stale disks for the test case

@ -186,7 +186,7 @@ func TestListOnlineDisks(t *testing.T) {
t.Fatalf("Failed to putObject %v", err) t.Fatalf("Failed to putObject %v", err)
} }
partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
fi, err := getLatestFileInfo(ctx, partsMetadata, errs) fi, err := getLatestFileInfo(ctx, partsMetadata, errs)
if err != nil { if err != nil {
t.Fatalf("Failed to getLatestFileInfo %v", err) t.Fatalf("Failed to getLatestFileInfo %v", err)
@ -287,7 +287,7 @@ func TestDisksWithAllParts(t *testing.T) {
t.Fatalf("Failed to putObject %v", err) t.Fatalf("Failed to putObject %v", err)
} }
_, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") _, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
readQuorum := len(erasureDisks) / 2 readQuorum := len(erasureDisks) / 2
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
t.Fatalf("Failed to read xl meta data %v", reducedErr) t.Fatalf("Failed to read xl meta data %v", reducedErr)
@ -295,7 +295,7 @@ func TestDisksWithAllParts(t *testing.T) {
// Test that all disks are returned without any failures with // Test that all disks are returned without any failures with
// unmodified meta data // unmodified meta data
partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
if err != nil { if err != nil {
t.Fatalf("Failed to read xl meta data %v", err) t.Fatalf("Failed to read xl meta data %v", err)
} }

@ -395,7 +395,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
if latestMeta.XLV1 { if latestMeta.XLV1 {
partPath = pathJoin(object, fmt.Sprintf("part.%d", partNumber)) partPath = pathJoin(object, fmt.Sprintf("part.%d", partNumber))
} }
readers[i] = newBitrotReader(disk, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize()) readers[i] = newBitrotReader(disk, nil, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize())
} }
writers := make([]io.Writer, len(outDatedDisks)) writers := make([]io.Writer, len(outDatedDisks))
for i, disk := range outDatedDisks { for i, disk := range outDatedDisks {
@ -811,7 +811,7 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
storageEndpoints := er.getEndpoints() storageEndpoints := er.getEndpoints()
// Read metadata files from all the disks // Read metadata files from all the disks
partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID) partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false)
if isAllNotFound(errs) { if isAllNotFound(errs) {
err = toObjectErr(errFileNotFound, bucket, object) err = toObjectErr(errFileNotFound, bucket, object)

@ -67,7 +67,7 @@ func TestHealing(t *testing.T) {
} }
disk := er.getDisks()[0] disk := er.getDisks()[0]
fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "") fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -84,7 +84,7 @@ func TestHealing(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "") fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -113,7 +113,7 @@ func TestHealing(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "") fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -214,7 +214,7 @@ func TestHealObjectCorrupted(t *testing.T) {
t.Fatalf("Failed to heal object - %v", err) t.Fatalf("Failed to heal object - %v", err)
} }
fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
fi, err := getLatestFileInfo(ctx, fileInfos, errs) fi, err := getLatestFileInfo(ctx, fileInfos, errs)
if err != nil { if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err) t.Fatalf("Failed to getLatestFileInfo - %v", err)
@ -239,7 +239,7 @@ func TestHealObjectCorrupted(t *testing.T) {
t.Errorf("Expected nil but received %v", err) t.Errorf("Expected nil but received %v", err)
} }
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "") fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
nfi, err := getLatestFileInfo(ctx, fileInfos, errs) nfi, err := getLatestFileInfo(ctx, fileInfos, errs)
if err != nil { if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err) t.Fatalf("Failed to getLatestFileInfo - %v", err)
@ -265,7 +265,7 @@ func TestHealObjectCorrupted(t *testing.T) {
t.Errorf("Expected nil but received %v", err) t.Errorf("Expected nil but received %v", err)
} }
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "") fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
nfi, err = getLatestFileInfo(ctx, fileInfos, errs) nfi, err = getLatestFileInfo(ctx, fileInfos, errs)
if err != nil { if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err) t.Fatalf("Failed to getLatestFileInfo - %v", err)

@ -115,7 +115,7 @@ func hashOrder(key string, cardinality int) []int {
// Reads all `xl.meta` metadata as a FileInfo slice. // Reads all `xl.meta` metadata as a FileInfo slice.
// Returns error slice indicating the failed metadata reads. // Returns error slice indicating the failed metadata reads.
func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) { func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, readData bool) ([]FileInfo, []error) {
metadataArray := make([]FileInfo, len(disks)) metadataArray := make([]FileInfo, len(disks))
g := errgroup.WithNErrs(len(disks)) g := errgroup.WithNErrs(len(disks))
@ -126,7 +126,7 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve
if disks[index] == nil { if disks[index] == nil {
return errDiskNotFound return errDiskNotFound
} }
metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID) metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, readData)
if err != nil { if err != nil {
if !IsErr(err, errFileNotFound, errVolumeNotFound, errFileVersionNotFound, errDiskNotFound) { if !IsErr(err, errFileNotFound, errVolumeNotFound, errFileVersionNotFound, errDiskNotFound) {
logger.LogOnceIf(ctx, err, disks[index].String()) logger.LogOnceIf(ctx, err, disks[index].String())

@ -46,7 +46,7 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
disks := er.getDisks() disks := er.getDisks()
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "") metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "", false)
readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs) readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
if err != nil { if err != nil {
@ -113,7 +113,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
} }
for _, uploadIDDir := range uploadIDDirs { for _, uploadIDDir := range uploadIDDirs {
uploadIDPath := pathJoin(shaDir, uploadIDDir) uploadIDPath := pathJoin(shaDir, uploadIDDir)
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "") fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false)
if err != nil { if err != nil {
continue continue
} }
@ -127,7 +127,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
return return
} }
for _, tmpDir := range tmpDirs { for _, tmpDir := range tmpDirs {
fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "") fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "", false)
if err != nil { if err != nil {
continue continue
} }
@ -181,7 +181,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
if populatedUploadIds.Contains(uploadID) { if populatedUploadIds.Contains(uploadID) {
continue continue
} }
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "") fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false)
if err != nil { if err != nil {
return result, toObjectErr(err, bucket, object) return result, toObjectErr(err, bucket, object)
} }
@ -371,7 +371,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
partsMetadata, errs = readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, partsMetadata, errs = readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket,
uploadIDPath, "") uploadIDPath, "", false)
// get Quorum for this object // get Quorum for this object
_, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
@ -474,7 +474,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
} }
// Read metadata again because it might be updated with parallel upload of another part. // Read metadata again because it might be updated with parallel upload of another part.
partsMetadata, errs = readAllFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "") partsMetadata, errs = readAllFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
reducedErr = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) reducedErr = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
if reducedErr == errErasureWriteQuorum { if reducedErr == errErasureWriteQuorum {
return pi, toObjectErr(reducedErr, bucket, object) return pi, toObjectErr(reducedErr, bucket, object)
@ -552,7 +552,7 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
storageDisks := er.getDisks() storageDisks := er.getDisks()
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID) partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID, false)
// get Quorum for this object // get Quorum for this object
readQuorum, _, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) readQuorum, _, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
@ -600,7 +600,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
storageDisks := er.getDisks() storageDisks := er.getDisks()
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "") partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
// get Quorum for this object // get Quorum for this object
_, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
@ -704,7 +704,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
storageDisks := er.getDisks() storageDisks := er.getDisks()
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "") partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
// get Quorum for this object // get Quorum for this object
_, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
@ -889,7 +889,7 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
partsMetadata, errs := readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "") partsMetadata, errs := readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "", false)
// get Quorum for this object // get Quorum for this object
_, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)

@ -58,7 +58,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
storageDisks := er.getDisks() storageDisks := er.getDisks()
metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID) metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, false)
// get Quorum for this object // get Quorum for this object
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs) readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
@ -157,7 +157,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
unlockOnDefer = true unlockOnDefer = true
} }
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts) fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
if err != nil { if err != nil {
return nil, toObjectErr(err, bucket, object) return nil, toObjectErr(err, bucket, object)
} }
@ -298,7 +298,8 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
} }
checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber) checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber)
partPath := pathJoin(object, metaArr[index].DataDir, fmt.Sprintf("part.%d", partNumber)) partPath := pathJoin(object, metaArr[index].DataDir, fmt.Sprintf("part.%d", partNumber))
readers[index] = newBitrotReader(disk, bucket, partPath, tillOffset, data := metaArr[index].Data
readers[index] = newBitrotReader(disk, data, bucket, partPath, tillOffset,
checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize()) checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())
// Prefer local disks // Prefer local disks
@ -337,7 +338,7 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
// getObject wrapper for erasure GetObject // getObject wrapper for erasure GetObject
func (er erasureObjects) getObject(ctx context.Context, bucket, object string, startOffset, length int64, writer io.Writer, opts ObjectOptions) error { func (er erasureObjects) getObject(ctx context.Context, bucket, object string, startOffset, length int64, writer io.Writer, opts ObjectOptions) error {
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts) fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
if err != nil { if err != nil {
return toObjectErr(err, bucket, object) return toObjectErr(err, bucket, object)
} }
@ -364,11 +365,11 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin
return er.getObjectInfo(ctx, bucket, object, opts) return er.getObjectInfo(ctx, bucket, object, opts)
} }
func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) { func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) {
disks := er.getDisks() disks := er.getDisks()
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID) metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData)
readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs) readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
if err != nil { if err != nil {
@ -410,7 +411,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. // getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts) fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts, false)
if err != nil { if err != nil {
return objInfo, toObjectErr(err, bucket, object) return objInfo, toObjectErr(err, bucket, object)
} }
@ -1073,7 +1074,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
disks := er.getDisks() disks := er.getDisks()
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID) metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs) readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
if err != nil { if err != nil {
@ -1134,7 +1135,7 @@ func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object st
disks := er.getDisks() disks := er.getDisks()
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID) metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs) readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
if err != nil { if err != nil {

@ -19,7 +19,9 @@ package cmd
import ( import (
"bytes" "bytes"
"context" "context"
crand "crypto/rand"
"errors" "errors"
"io"
"io/ioutil" "io/ioutil"
"os" "os"
"testing" "testing"
@ -288,9 +290,13 @@ func TestGetObjectNoQuorum(t *testing.T) {
bucket := "bucket" bucket := "bucket"
object := "object" object := "object"
opts := ObjectOptions{} opts := ObjectOptions{}
buf := make([]byte, 33<<10)
if _, err = io.ReadFull(crand.Reader, buf); err != nil {
t.Fatal(err)
}
// Test use case 1: All disks are online, xl.meta are present, but data are missing // Test use case 1: All disks are online, xl.meta are present, but data are missing
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts) _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(buf), int64(len(buf)), "", ""), opts)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -304,7 +310,7 @@ func TestGetObjectNoQuorum(t *testing.T) {
} }
} }
err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts) err = xl.GetObject(ctx, bucket, object, 0, int64(len(buf)), ioutil.Discard, "", opts)
if err != toObjectErr(errFileNotFound, bucket, object) { if err != toObjectErr(errFileNotFound, bucket, object) {
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err) t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
} }
@ -315,7 +321,7 @@ func TestGetObjectNoQuorum(t *testing.T) {
// invocations, where f - [0,2) // invocations, where f - [0,2)
// Create "object" under "bucket". // Create "object" under "bucket".
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts) _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(buf), int64(len(buf)), "", ""), opts)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -521,7 +527,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
t.Fatalf("Failed to putObject %v", err) t.Fatalf("Failed to putObject %v", err)
} }
parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "") parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "", false)
parts1SC := globalStorageClass parts1SC := globalStorageClass
@ -534,7 +540,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
t.Fatalf("Failed to putObject %v", err) t.Fatalf("Failed to putObject %v", err)
} }
parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "") parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "", false)
parts2SC := globalStorageClass parts2SC := globalStorageClass
// Object for test case 3 - No StorageClass defined, MetaData in PutObject requesting Standard Storage Class // Object for test case 3 - No StorageClass defined, MetaData in PutObject requesting Standard Storage Class
@ -546,7 +552,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
t.Fatalf("Failed to putObject %v", err) t.Fatalf("Failed to putObject %v", err)
} }
parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "") parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "", false)
parts3SC := globalStorageClass parts3SC := globalStorageClass
// Object for test case 4 - Standard StorageClass defined as Parity 6, MetaData in PutObject requesting Standard Storage Class // Object for test case 4 - Standard StorageClass defined as Parity 6, MetaData in PutObject requesting Standard Storage Class
@ -564,7 +570,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
t.Fatalf("Failed to putObject %v", err) t.Fatalf("Failed to putObject %v", err)
} }
parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "") parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "", false)
parts4SC := storageclass.Config{ parts4SC := storageclass.Config{
Standard: storageclass.StorageClass{ Standard: storageclass.StorageClass{
Parity: 6, Parity: 6,
@ -587,7 +593,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
t.Fatalf("Failed to putObject %v", err) t.Fatalf("Failed to putObject %v", err)
} }
parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "") parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "", false)
parts5SC := storageclass.Config{ parts5SC := storageclass.Config{
RRS: storageclass.StorageClass{ RRS: storageclass.StorageClass{
Parity: 2, Parity: 2,
@ -609,7 +615,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
t.Fatalf("Failed to putObject %v", err) t.Fatalf("Failed to putObject %v", err)
} }
parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "") parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "", false)
parts6SC := storageclass.Config{ parts6SC := storageclass.Config{
RRS: storageclass.StorageClass{ RRS: storageclass.StorageClass{
Parity: 2, Parity: 2,
@ -632,7 +638,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
t.Fatalf("Failed to putObject %v", err) t.Fatalf("Failed to putObject %v", err)
} }
parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "") parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "", false)
parts7SC := storageclass.Config{ parts7SC := storageclass.Config{
Standard: storageclass.StorageClass{ Standard: storageclass.StorageClass{
Parity: 5, Parity: 5,

@ -388,7 +388,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
continue continue
} }
_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "") _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false)
if err != nil { if err != nil {
time.Sleep(retryDelay) time.Sleep(retryDelay)
retries++ retries++
@ -397,7 +397,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
} }
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}) fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}, true)
if err != nil { if err != nil {
switch toObjectErr(err, minioMetaBucket, o.objectPath(0)).(type) { switch toObjectErr(err, minioMetaBucket, o.objectPath(0)).(type) {
case ObjectNotFound: case ObjectNotFound:
@ -463,7 +463,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
continue continue
} }
_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "") _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false)
if err != nil { if err != nil {
time.Sleep(retryDelay) time.Sleep(retryDelay)
retries++ retries++
@ -471,7 +471,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
} }
} }
// Load first part metadata... // Load first part metadata...
fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}) fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}, true)
if err != nil { if err != nil {
time.Sleep(retryDelay) time.Sleep(retryDelay)
retries++ retries++

@ -252,11 +252,11 @@ func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi
return d.disk.DeleteVersion(ctx, volume, path, fi) return d.disk.DeleteVersion(ctx, volume, path, fi)
} }
func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
if err := d.calcError(); err != nil { if err := d.calcError(); err != nil {
return FileInfo{}, err return FileInfo{}, err
} }
return d.disk.ReadVersion(ctx, volume, path, versionID) return d.disk.ReadVersion(ctx, volume, path, versionID, readData)
} }
func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) { func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) {

@ -151,6 +151,8 @@ type FileInfo struct {
MarkDeleted bool // mark this version as deleted MarkDeleted bool // mark this version as deleted
DeleteMarkerReplicationStatus string DeleteMarkerReplicationStatus string
VersionPurgeStatus VersionPurgeStatusType VersionPurgeStatus VersionPurgeStatusType
Data []byte // optionally carries object data
} }
// VersionPurgeStatusKey denotes purge status in metadata // VersionPurgeStatusKey denotes purge status in metadata

@ -245,8 +245,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err) err = msgp.WrapError(err)
return return
} }
if zb0001 != 17 { if zb0001 != 18 {
err = msgp.ArrayError{Wanted: 17, Got: zb0001} err = msgp.ArrayError{Wanted: 18, Got: zb0001}
return return
} }
z.Volume, err = dc.ReadString() z.Volume, err = dc.ReadString()
@ -375,13 +375,18 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) {
} }
z.VersionPurgeStatus = VersionPurgeStatusType(zb0004) z.VersionPurgeStatus = VersionPurgeStatusType(zb0004)
} }
z.Data, err = dc.ReadBytes(z.Data)
if err != nil {
err = msgp.WrapError(err, "Data")
return
}
return return
} }
// EncodeMsg implements msgp.Encodable // EncodeMsg implements msgp.Encodable
func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) {
// array header, size 17 // array header, size 18
err = en.Append(0xdc, 0x0, 0x11) err = en.Append(0xdc, 0x0, 0x12)
if err != nil { if err != nil {
return return
} }
@ -489,14 +494,19 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "VersionPurgeStatus") err = msgp.WrapError(err, "VersionPurgeStatus")
return return
} }
err = en.WriteBytes(z.Data)
if err != nil {
err = msgp.WrapError(err, "Data")
return
}
return return
} }
// MarshalMsg implements msgp.Marshaler // MarshalMsg implements msgp.Marshaler
func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) { func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize()) o = msgp.Require(b, z.Msgsize())
// array header, size 17 // array header, size 18
o = append(o, 0xdc, 0x0, 0x11) o = append(o, 0xdc, 0x0, 0x12)
o = msgp.AppendString(o, z.Volume) o = msgp.AppendString(o, z.Volume)
o = msgp.AppendString(o, z.Name) o = msgp.AppendString(o, z.Name)
o = msgp.AppendString(o, z.VersionID) o = msgp.AppendString(o, z.VersionID)
@ -529,6 +539,7 @@ func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.AppendBool(o, z.MarkDeleted) o = msgp.AppendBool(o, z.MarkDeleted)
o = msgp.AppendString(o, z.DeleteMarkerReplicationStatus) o = msgp.AppendString(o, z.DeleteMarkerReplicationStatus)
o = msgp.AppendString(o, string(z.VersionPurgeStatus)) o = msgp.AppendString(o, string(z.VersionPurgeStatus))
o = msgp.AppendBytes(o, z.Data)
return return
} }
@ -540,8 +551,8 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err) err = msgp.WrapError(err)
return return
} }
if zb0001 != 17 { if zb0001 != 18 {
err = msgp.ArrayError{Wanted: 17, Got: zb0001} err = msgp.ArrayError{Wanted: 18, Got: zb0001}
return return
} }
z.Volume, bts, err = msgp.ReadStringBytes(bts) z.Volume, bts, err = msgp.ReadStringBytes(bts)
@ -670,6 +681,11 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
} }
z.VersionPurgeStatus = VersionPurgeStatusType(zb0004) z.VersionPurgeStatus = VersionPurgeStatusType(zb0004)
} }
z.Data, bts, err = msgp.ReadBytesBytes(bts, z.Data)
if err != nil {
err = msgp.WrapError(err, "Data")
return
}
o = bts o = bts
return return
} }
@ -687,7 +703,7 @@ func (z *FileInfo) Msgsize() (s int) {
for za0003 := range z.Parts { for za0003 := range z.Parts {
s += z.Parts[za0003].Msgsize() s += z.Parts[za0003].Msgsize()
} }
s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus)) s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus)) + msgp.BytesPrefixSize + len(z.Data)
return return
} }

@ -58,7 +58,7 @@ type StorageAPI interface {
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error
DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error
WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error
ReadVersion(ctx context.Context, volume, path, versionID string) (FileInfo, error) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (FileInfo, error)
RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error
// File operations. // File operations.

@ -36,6 +36,7 @@ import (
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/cmd/rest" "github.com/minio/minio/cmd/rest"
xnet "github.com/minio/minio/pkg/net" xnet "github.com/minio/minio/pkg/net"
xbufio "github.com/philhofer/fwd"
"github.com/tinylib/msgp/msgp" "github.com/tinylib/msgp/msgp"
) )
@ -226,8 +227,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, e
return info, err return info, err
} }
defer http.DrainBody(respBody) defer http.DrainBody(respBody)
err = msgp.Decode(respBody, &info) if err = msgp.Decode(respBody, &info); err != nil {
if err != nil {
return info, err return info, err
} }
if info.Error != "" { if info.Error != "" {
@ -398,18 +398,38 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
return err return err
} }
func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { // where we keep old *Readers
var readMsgpReaderPool = sync.Pool{New: func() interface{} { return &msgp.Reader{} }}
// mspNewReader returns a *Reader that reads from the provided reader.
// The reader will be buffered.
func msgpNewReader(r io.Reader) *msgp.Reader {
p := readMsgpReaderPool.Get().(*msgp.Reader)
if p.R == nil {
p.R = xbufio.NewReaderSize(r, 8<<10)
} else {
p.R.Reset(r)
}
return p
}
func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
values := make(url.Values) values := make(url.Values)
values.Set(storageRESTVolume, volume) values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path) values.Set(storageRESTFilePath, path)
values.Set(storageRESTVersionID, versionID) values.Set(storageRESTVersionID, versionID)
values.Set(storageRESTReadData, strconv.FormatBool(readData))
respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1) respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1)
if err != nil { if err != nil {
return fi, err return fi, err
} }
defer http.DrainBody(respBody) defer http.DrainBody(respBody)
err = msgp.Decode(respBody, &fi)
dec := msgpNewReader(respBody)
defer readMsgpReaderPool.Put(dec)
err = fi.DecodeMsg(dec)
return fi, err return fi, err
} }
@ -479,10 +499,12 @@ func (client *storageRESTClient) WalkVersions(ctx context.Context, volume, dirPa
defer close(ch) defer close(ch)
defer http.DrainBody(respBody) defer http.DrainBody(respBody)
decoder := msgp.NewReader(respBody) dec := msgpNewReader(respBody)
defer readMsgpReaderPool.Put(dec)
for { for {
var fi FileInfoVersions var fi FileInfoVersions
if gerr := fi.DecodeMsg(decoder); gerr != nil { if gerr := fi.DecodeMsg(dec); gerr != nil {
// Upon error return // Upon error return
if msgp.Cause(gerr) != io.EOF { if msgp.Cause(gerr) != io.EOF {
logger.LogIf(GlobalContext, gerr) logger.LogIf(GlobalContext, gerr)

@ -17,7 +17,7 @@
package cmd package cmd
const ( const (
storageRESTVersion = "v23" // Add small file optimization storageRESTVersion = "v24" // Add more small file optimization
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
storageRESTPrefix = minioReservedBucketPath + "/storage" storageRESTPrefix = minioReservedBucketPath + "/storage"
) )
@ -59,6 +59,7 @@ const (
storageRESTDirPath = "dir-path" storageRESTDirPath = "dir-path"
storageRESTFilePath = "file-path" storageRESTFilePath = "file-path"
storageRESTVersionID = "version-id" storageRESTVersionID = "version-id"
storageRESTReadData = "read-data"
storageRESTTotalVersions = "total-versions" storageRESTTotalVersions = "total-versions"
storageRESTSrcVolume = "source-volume" storageRESTSrcVolume = "source-volume"
storageRESTSrcPath = "source-path" storageRESTSrcPath = "source-path"

@ -327,12 +327,22 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re
volume := vars[storageRESTVolume] volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath] filePath := vars[storageRESTFilePath]
versionID := vars[storageRESTVersionID] versionID := vars[storageRESTVersionID]
fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID) readData, err := strconv.ParseBool(vars[storageRESTReadData])
if err != nil { if err != nil {
s.writeErrorResponse(w, err) s.writeErrorResponse(w, err)
return return
} }
logger.LogIf(r.Context(), msgp.Encode(w, &fi)) fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, readData)
if err != nil {
s.writeErrorResponse(w, err)
return
}
bufp := s.storage.rpool.Get().(*[]byte)
defer s.storage.rpool.Put(bufp)
enc := msgp.NewWriterBuf(w, *bufp)
logger.LogIf(r.Context(), fi.EncodeMsg(enc))
enc.Flush()
} }
// WriteMetadata write new updated metadata. // WriteMetadata write new updated metadata.
@ -1033,7 +1043,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)). subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)). subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID)...) Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID, storageRESTReadData)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)). subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)).
Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDataDir, Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDataDir,
storageRESTDstVolume, storageRESTDstPath)...) storageRESTDstVolume, storageRESTDstPath)...)

@ -272,12 +272,12 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s
return p.storage.WriteMetadata(ctx, volume, path, fi) return p.storage.WriteMetadata(ctx, volume, path, fi)
} }
func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
if err = p.checkDiskStale(); err != nil { if err = p.checkDiskStale(); err != nil {
return fi, err return fi, err
} }
return p.storage.ReadVersion(ctx, volume, path, versionID) return p.storage.ReadVersion(ctx, volume, path, versionID, readData)
} }
func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) { func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {

@ -65,6 +65,10 @@ const (
// Size of each buffer. // Size of each buffer.
readAheadBufSize = 1 << 20 readAheadBufSize = 1 << 20
// Small file threshold below which data accompanies metadata
// from storage layer.
smallFileThreshold = 32 * humanize.KiByte
// XL metadata file carries per object metadata. // XL metadata file carries per object metadata.
xlStorageFormatFile = "xl.meta" xlStorageFormatFile = "xl.meta"
) )
@ -1082,7 +1086,14 @@ func (s *xlStorage) renameLegacyMetadata(volume, path string) error {
} }
// ReadVersion - reads metadata and returns FileInfo at path `xl.meta` // ReadVersion - reads metadata and returns FileInfo at path `xl.meta`
func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { // for all objects less than `32KiB` this call returns data as well
// along with metadata.
func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
volumeDir, err := s.getVolDir(volume)
if err != nil {
return fi, err
}
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile)) buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
if err != nil { if err != nil {
if err == errFileNotFound { if err == errFileNotFound {
@ -1117,7 +1128,65 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
return fi, errFileNotFound return fi, errFileNotFound
} }
return getFileInfo(buf, volume, path, versionID) fi, err = getFileInfo(buf, volume, path, versionID)
if err != nil {
return fi, err
}
if readData {
// Reading data for small objects when
// - object has not yet transitioned
// - object size lesser than 32KiB
// - object has maximum of 1 parts
if fi.TransitionStatus == "" && fi.DataDir != "" && fi.Size <= smallFileThreshold && len(fi.Parts) == 1 {
fi.Data, err = s.readAllData(volumeDir, pathJoin(volumeDir, path, fi.DataDir, fmt.Sprintf("part.%d", fi.Parts[0].Number)))
if err != nil {
return FileInfo{}, err
}
}
}
return fi, nil
}
func (s *xlStorage) readAllData(volumeDir, filePath string) (buf []byte, err error) {
var f *os.File
if globalStorageClass.GetDMA() == storageclass.DMAReadWrite {
f, err = disk.OpenFileDirectIO(filePath, os.O_RDONLY, 0666)
} else {
f, err = os.Open(filePath)
}
if err != nil {
if osIsNotExist(err) {
// Check if the object doesn't exist because its bucket
// is missing in order to return the correct error.
_, err = os.Stat(volumeDir)
if err != nil && osIsNotExist(err) {
return nil, errVolumeNotFound
}
return nil, errFileNotFound
} else if osIsPermission(err) {
return nil, errFileAccessDenied
} else if isSysErrNotDir(err) || isSysErrIsDir(err) {
return nil, errFileNotFound
} else if isSysErrHandleInvalid(err) {
// This case is special and needs to be handled for windows.
return nil, errFileNotFound
} else if isSysErrIO(err) {
return nil, errFaultyDisk
} else if isSysErrTooManyFiles(err) {
return nil, errTooManyOpenFiles
} else if isSysErrInvalidArg(err) {
return nil, errFileNotFound
}
return nil, err
}
atomic.AddInt32(&s.activeIOCount, 1)
rd := &odirectReader{f, nil, nil, true, s, nil}
defer rd.Close()
return ioutil.ReadAll(rd)
} }
// ReadAll reads from r until an error or EOF and returns the data it read. // ReadAll reads from r until an error or EOF and returns the data it read.
@ -1405,35 +1474,13 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
return nil, err return nil, err
} }
var file *os.File
if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite { if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite {
file, err := disk.OpenFileDirectIO(filePath, os.O_RDONLY, 0666) file, err = disk.OpenFileDirectIO(filePath, os.O_RDONLY, 0666)
if err != nil { } else {
switch { // Open the fileile fileor reading.
case osIsNotExist(err): file, err = os.Open(filePath)
_, err = os.Stat(volumeDir)
if err != nil && osIsNotExist(err) {
return nil, errVolumeNotFound
}
return nil, errFileNotFound
case osIsPermission(err):
return nil, errFileAccessDenied
case isSysErrNotDir(err):
return nil, errFileAccessDenied
case isSysErrIO(err):
return nil, errFaultyDisk
case isSysErrTooManyFiles(err):
return nil, errTooManyOpenFiles
default:
return nil, err
}
}
atomic.AddInt32(&s.activeIOCount, 1)
return &odirectReader{file, nil, nil, true, s, nil}, nil
} }
// Open the file for reading.
file, err := os.Open(filePath)
if err != nil { if err != nil {
switch { switch {
case osIsNotExist(err): case osIsNotExist(err):
@ -1466,11 +1513,17 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
return nil, errIsNotRegular return nil, errIsNotRegular
} }
atomic.AddInt32(&s.activeIOCount, 1)
if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite {
return &odirectReader{file, nil, nil, true, s, nil}, nil
}
if offset > 0 {
if _, err = file.Seek(offset, io.SeekStart); err != nil { if _, err = file.Seek(offset, io.SeekStart); err != nil {
return nil, err return nil, err
} }
}
atomic.AddInt32(&s.activeIOCount, 1)
r := struct { r := struct {
io.Reader io.Reader
io.Closer io.Closer
@ -1517,6 +1570,7 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
if err != nil { if err != nil {
return err return err
} }
// Stat a volume entry. // Stat a volume entry.
_, err = os.Stat(volumeDir) _, err = os.Stat(volumeDir)
if err != nil { if err != nil {

@ -62,11 +62,10 @@ require (
github.com/ncw/directio v1.0.5 github.com/ncw/directio v1.0.5
github.com/nsqio/go-nsq v1.0.8 github.com/nsqio/go-nsq v1.0.8
github.com/olivere/elastic/v7 v7.0.22 github.com/olivere/elastic/v7 v7.0.22
github.com/philhofer/fwd v1.1.1 // indirect github.com/philhofer/fwd v1.1.1
github.com/pierrec/lz4 v2.5.2+incompatible github.com/pierrec/lz4 v2.5.2+incompatible
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.8.0 github.com/prometheus/client_golang v1.8.0
github.com/quasilyte/go-ruleguard/dsl/fluent v0.0.0-20201222093424-5d7e62a465d3 // indirect
github.com/rjeczalik/notify v0.9.2 github.com/rjeczalik/notify v0.9.2
github.com/rs/cors v1.7.0 github.com/rs/cors v1.7.0
github.com/secure-io/sio-go v0.3.0 github.com/secure-io/sio-go v0.3.0

@ -539,10 +539,6 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.2.0 h1:wH4vA7pcjKuZzjF7lM8awk4fnuJO6idemZXoKnULUx4= github.com/prometheus/procfs v0.2.0 h1:wH4vA7pcjKuZzjF7lM8awk4fnuJO6idemZXoKnULUx4=
github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/quasilyte/go-ruleguard v0.2.1 h1:56eRm0daAyny9UhJnmtJW/UyLZQusukBAB8oT8AHKHo=
github.com/quasilyte/go-ruleguard/dsl v0.0.0-20201222101508-986133edf04e h1:gXFs5pU/5pxy0nw9QoV2dAhGXI+jKSN0GJEL8TMKf4A=
github.com/quasilyte/go-ruleguard/dsl/fluent v0.0.0-20201222093424-5d7e62a465d3 h1:eL7x4/zMnlquMxYe7V078BD7MGskZ0daGln+SJCVzuY=
github.com/quasilyte/go-ruleguard/dsl/fluent v0.0.0-20201222093424-5d7e62a465d3/go.mod h1:P7JlQWFT7jDcFZMtUPQbtGzzzxva3rBn6oIF+LPwFcM=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=

Loading…
Cancel
Save