From f21d650ed4b5f65bc8d1cd2820631ae6052b6be6 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 7 Jan 2021 19:27:31 -0800 Subject: [PATCH] fix: readData in bulk call using messagepack byte wrappers (#11228) This PR refactors the way we use buffers for O_DIRECT and to re-use those buffers for messagepack reader writer. After some extensive benchmarking found that not all objects have this benefit, and only objects smaller than 64KiB see this benefit overall. Benefits are seen from almost all objects from 1KiB - 32KiB Beyond this no objects see benefit with bulk call approach as the latency of bytes sent over the wire v/s streaming content directly from disk negate each other with no remarkable benefits. All other optimizations include reuse of msgp.Reader, msgp.Writer using sync.Pool's for all internode calls. --- cmd/bitrot-streaming.go | 10 ++- cmd/bitrot.go | 4 +- cmd/bitrot_test.go | 2 +- cmd/erasure-decode_test.go | 8 +- cmd/erasure-heal_test.go | 2 +- cmd/erasure-healing-common_test.go | 6 +- cmd/erasure-healing.go | 4 +- cmd/erasure-healing_test.go | 12 +-- cmd/erasure-metadata-utils.go | 4 +- cmd/erasure-multipart.go | 20 ++--- cmd/erasure-object.go | 19 ++--- cmd/erasure-object_test.go | 26 ++++--- cmd/metacache-set.go | 8 +- cmd/naughty-disk_test.go | 4 +- cmd/storage-datatypes.go | 2 + cmd/storage-datatypes_gen.go | 34 ++++++--- cmd/storage-interface.go | 2 +- cmd/storage-rest-client.go | 34 +++++++-- cmd/storage-rest-common.go | 3 +- cmd/storage-rest-server.go | 16 +++- cmd/xl-storage-disk-id-check.go | 4 +- cmd/xl-storage.go | 118 +++++++++++++++++++++-------- go.mod | 3 +- go.sum | 4 - 24 files changed, 231 insertions(+), 118 deletions(-) diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go index c05b53fa2..d3453d405 100644 --- a/cmd/bitrot-streaming.go +++ b/cmd/bitrot-streaming.go @@ -91,6 +91,7 @@ func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length i // ReadAt() implementation which verifies the bitrot 
hash available as part of the stream. type streamingBitrotReader struct { disk StorageAPI + data []byte rc io.Reader volume string filePath string @@ -122,7 +123,11 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) { // For the first ReadAt() call we need to open the stream for reading. b.currOffset = offset streamOffset := (offset/b.shardSize)*int64(b.h.Size()) + offset - b.rc, err = b.disk.ReadFileStream(context.TODO(), b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset) + if len(b.data) == 0 { + b.rc, err = b.disk.ReadFileStream(context.TODO(), b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset) + } else { + b.rc = io.NewSectionReader(bytes.NewReader(b.data), streamOffset, b.tillOffset-streamOffset) + } if err != nil { return 0, err } @@ -154,10 +159,11 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) { } // Returns streaming bitrot reader implementation. -func newStreamingBitrotReader(disk StorageAPI, volume, filePath string, tillOffset int64, algo BitrotAlgorithm, shardSize int64) *streamingBitrotReader { +func newStreamingBitrotReader(disk StorageAPI, data []byte, volume, filePath string, tillOffset int64, algo BitrotAlgorithm, shardSize int64) *streamingBitrotReader { h := algo.New() return &streamingBitrotReader{ disk, + data, nil, volume, filePath, diff --git a/cmd/bitrot.go b/cmd/bitrot.go index cddc17a96..24f7ecd39 100644 --- a/cmd/bitrot.go +++ b/cmd/bitrot.go @@ -103,9 +103,9 @@ func newBitrotWriter(disk StorageAPI, volume, filePath string, length int64, alg return newWholeBitrotWriter(disk, volume, filePath, algo, shardSize) } -func newBitrotReader(disk StorageAPI, bucket string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) io.ReaderAt { +func newBitrotReader(disk StorageAPI, data []byte, bucket string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) io.ReaderAt { if algo == HighwayHash256S 
{ - return newStreamingBitrotReader(disk, bucket, filePath, tillOffset, algo, shardSize) + return newStreamingBitrotReader(disk, data, bucket, filePath, tillOffset, algo, shardSize) } return newWholeBitrotReader(disk, bucket, filePath, algo, tillOffset, sum) } diff --git a/cmd/bitrot_test.go b/cmd/bitrot_test.go index 0990c4919..17cfc3813 100644 --- a/cmd/bitrot_test.go +++ b/cmd/bitrot_test.go @@ -62,7 +62,7 @@ func testBitrotReaderWriterAlgo(t *testing.T, bitrotAlgo BitrotAlgorithm) { } writer.(io.Closer).Close() - reader := newBitrotReader(disk, volume, filePath, 35, bitrotAlgo, bitrotWriterSum(writer), 10) + reader := newBitrotReader(disk, nil, volume, filePath, 35, bitrotAlgo, bitrotWriterSum(writer), 10) b := make([]byte, 10) if _, err = reader.ReadAt(b, 0); err != nil { log.Fatal(err) diff --git a/cmd/erasure-decode_test.go b/cmd/erasure-decode_test.go index 7797ccced..d622a8b3d 100644 --- a/cmd/erasure-decode_test.go +++ b/cmd/erasure-decode_test.go @@ -134,7 +134,7 @@ func TestErasureDecode(t *testing.T) { } tillOffset := erasure.ShardFileOffset(test.offset, test.length, test.data) - bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) + bitrotReaders[index] = newBitrotReader(disk, nil, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) } writer := bytes.NewBuffer(nil) @@ -164,7 +164,7 @@ func TestErasureDecode(t *testing.T) { continue } tillOffset := erasure.ShardFileOffset(test.offset, test.length, test.data) - bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) + bitrotReaders[index] = newBitrotReader(disk, nil, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) } for j := range disks[:test.offDisks] { if bitrotReaders[j] == nil { @@ -270,7 +270,7 @@ 
func TestErasureDecodeRandomOffsetLength(t *testing.T) { continue } tillOffset := erasure.ShardFileOffset(offset, readLen, length) - bitrotReaders[index] = newStreamingBitrotReader(disk, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) + bitrotReaders[index] = newStreamingBitrotReader(disk, nil, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) } err = erasure.Decode(context.Background(), buf, bitrotReaders, offset, readLen, length, nil) closeBitrotReaders(bitrotReaders) @@ -332,7 +332,7 @@ func benchmarkErasureDecode(data, parity, dataDown, parityDown int, size int64, continue } tillOffset := erasure.ShardFileOffset(0, size, size) - bitrotReaders[index] = newStreamingBitrotReader(disk, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) + bitrotReaders[index] = newStreamingBitrotReader(disk, nil, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) } if err = erasure.Decode(context.Background(), bytes.NewBuffer(content[:0]), bitrotReaders, 0, size, size, nil); err != nil { panic(err) diff --git a/cmd/erasure-heal_test.go b/cmd/erasure-heal_test.go index 4d1fff044..94d2905e4 100644 --- a/cmd/erasure-heal_test.go +++ b/cmd/erasure-heal_test.go @@ -99,7 +99,7 @@ func TestErasureHeal(t *testing.T) { readers := make([]io.ReaderAt, len(disks)) for i, disk := range disks { shardFilesize := erasure.ShardFileSize(test.size) - readers[i] = newBitrotReader(disk, "testbucket", "testobject", shardFilesize, test.algorithm, bitrotWriterSum(writers[i]), erasure.ShardSize()) + readers[i] = newBitrotReader(disk, nil, "testbucket", "testobject", shardFilesize, test.algorithm, bitrotWriterSum(writers[i]), erasure.ShardSize()) } // setup stale disks for the test case diff --git a/cmd/erasure-healing-common_test.go b/cmd/erasure-healing-common_test.go index 99e912a18..6c9f81c3a 100644 --- a/cmd/erasure-healing-common_test.go +++ b/cmd/erasure-healing-common_test.go @@ 
-186,7 +186,7 @@ func TestListOnlineDisks(t *testing.T) { t.Fatalf("Failed to putObject %v", err) } - partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") + partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) fi, err := getLatestFileInfo(ctx, partsMetadata, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo %v", err) @@ -287,7 +287,7 @@ func TestDisksWithAllParts(t *testing.T) { t.Fatalf("Failed to putObject %v", err) } - _, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") + _, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) readQuorum := len(erasureDisks) / 2 if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { t.Fatalf("Failed to read xl meta data %v", reducedErr) @@ -295,7 +295,7 @@ func TestDisksWithAllParts(t *testing.T) { // Test that all disks are returned without any failures with // unmodified meta data - partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") + partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) if err != nil { t.Fatalf("Failed to read xl meta data %v", err) } diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index b01573f7c..69f06baf6 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -395,7 +395,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s if latestMeta.XLV1 { partPath = pathJoin(object, fmt.Sprintf("part.%d", partNumber)) } - readers[i] = newBitrotReader(disk, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize()) + readers[i] = newBitrotReader(disk, nil, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize()) } writers := make([]io.Writer, len(outDatedDisks)) for i, disk := range outDatedDisks { @@ -811,7 +811,7 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version 
storageEndpoints := er.getEndpoints() // Read metadata files from all the disks - partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID) + partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false) if isAllNotFound(errs) { err = toObjectErr(errFileNotFound, bucket, object) diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go index d8f0a8ce0..3d9671440 100644 --- a/cmd/erasure-healing_test.go +++ b/cmd/erasure-healing_test.go @@ -67,7 +67,7 @@ func TestHealing(t *testing.T) { } disk := er.getDisks()[0] - fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "") + fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false) if err != nil { t.Fatal(err) } @@ -84,7 +84,7 @@ func TestHealing(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "") + fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false) if err != nil { t.Fatal(err) } @@ -113,7 +113,7 @@ func TestHealing(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "") + fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false) if err != nil { t.Fatal(err) } @@ -214,7 +214,7 @@ func TestHealObjectCorrupted(t *testing.T) { t.Fatalf("Failed to heal object - %v", err) } - fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") + fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) fi, err := getLatestFileInfo(ctx, fileInfos, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -239,7 +239,7 @@ func TestHealObjectCorrupted(t *testing.T) { t.Errorf("Expected nil but received %v", err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "") + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, 
object, "", false) nfi, err := getLatestFileInfo(ctx, fileInfos, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -265,7 +265,7 @@ func TestHealObjectCorrupted(t *testing.T) { t.Errorf("Expected nil but received %v", err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "") + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) nfi, err = getLatestFileInfo(ctx, fileInfos, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go index db4aa9186..4e33d04f9 100644 --- a/cmd/erasure-metadata-utils.go +++ b/cmd/erasure-metadata-utils.go @@ -115,7 +115,7 @@ func hashOrder(key string, cardinality int) []int { // Reads all `xl.meta` metadata as a FileInfo slice. // Returns error slice indicating the failed metadata reads. -func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) { +func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, readData bool) ([]FileInfo, []error) { metadataArray := make([]FileInfo, len(disks)) g := errgroup.WithNErrs(len(disks)) @@ -126,7 +126,7 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve if disks[index] == nil { return errDiskNotFound } - metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID) + metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, readData) if err != nil { if !IsErr(err, errFileNotFound, errVolumeNotFound, errFileVersionNotFound, errDiskNotFound) { logger.LogOnceIf(ctx, err, disks[index].String()) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index a4fb8b215..d413041cf 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -46,7 +46,7 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object disks := 
er.getDisks() // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "") + metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "", false) readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs) if err != nil { @@ -113,7 +113,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto } for _, uploadIDDir := range uploadIDDirs { uploadIDPath := pathJoin(shaDir, uploadIDDir) - fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "") + fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false) if err != nil { continue } @@ -127,7 +127,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto return } for _, tmpDir := range tmpDirs { - fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "") + fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "", false) if err != nil { continue } @@ -181,7 +181,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec if populatedUploadIds.Contains(uploadID) { continue } - fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "") + fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false) if err != nil { return result, toObjectErr(err, bucket, object) } @@ -371,7 +371,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo // Read metadata associated with the object from all disks. 
partsMetadata, errs = readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, - uploadIDPath, "") + uploadIDPath, "", false) // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) @@ -474,7 +474,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo } // Read metadata again because it might be updated with parallel upload of another part. - partsMetadata, errs = readAllFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "") + partsMetadata, errs = readAllFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false) reducedErr = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) if reducedErr == errErasureWriteQuorum { return pi, toObjectErr(reducedErr, bucket, object) @@ -552,7 +552,7 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u storageDisks := er.getDisks() // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID) + partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID, false) // get Quorum for this object readQuorum, _, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) @@ -600,7 +600,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up storageDisks := er.getDisks() // Read metadata associated with the object from all disks. 
- partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "") + partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) @@ -704,7 +704,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str storageDisks := er.getDisks() // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "") + partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) @@ -889,7 +889,7 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "") + partsMetadata, errs := readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "", false) // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index f413e4354..2db07f73e 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -58,7 +58,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d // Read metadata associated with the object from all disks. 
storageDisks := er.getDisks() - metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID) + metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, false) // get Quorum for this object readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs) @@ -157,7 +157,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri unlockOnDefer = true } - fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts) + fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true) if err != nil { return nil, toObjectErr(err, bucket, object) } @@ -298,7 +298,8 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje } checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber) partPath := pathJoin(object, metaArr[index].DataDir, fmt.Sprintf("part.%d", partNumber)) - readers[index] = newBitrotReader(disk, bucket, partPath, tillOffset, + data := metaArr[index].Data + readers[index] = newBitrotReader(disk, data, bucket, partPath, tillOffset, checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize()) // Prefer local disks @@ -337,7 +338,7 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje // getObject wrapper for erasure GetObject func (er erasureObjects) getObject(ctx context.Context, bucket, object string, startOffset, length int64, writer io.Writer, opts ObjectOptions) error { - fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts) + fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true) if err != nil { return toObjectErr(err, bucket, object) } @@ -364,11 +365,11 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin return er.getObjectInfo(ctx, bucket, object, opts) } -func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts 
ObjectOptions) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) { +func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) { disks := er.getDisks() // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID) + metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData) readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs) if err != nil { @@ -410,7 +411,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s // getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { - fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts) + fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts, false) if err != nil { return objInfo, toObjectErr(err, bucket, object) } @@ -1073,7 +1074,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin disks := er.getDisks() // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID) + metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false) readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs) if err != nil { @@ -1134,7 +1135,7 @@ func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object st disks := er.getDisks() // Read metadata associated with the object from all disks. 
- metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID) + metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false) readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs) if err != nil { diff --git a/cmd/erasure-object_test.go b/cmd/erasure-object_test.go index a224489df..c89a4daf0 100644 --- a/cmd/erasure-object_test.go +++ b/cmd/erasure-object_test.go @@ -19,7 +19,9 @@ package cmd import ( "bytes" "context" + crand "crypto/rand" "errors" + "io" "io/ioutil" "os" "testing" @@ -288,9 +290,13 @@ func TestGetObjectNoQuorum(t *testing.T) { bucket := "bucket" object := "object" opts := ObjectOptions{} + buf := make([]byte, 33<<10) + if _, err = io.ReadFull(crand.Reader, buf); err != nil { + t.Fatal(err) + } // Test use case 1: All disks are online, xl.meta are present, but data are missing - _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts) + _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(buf), int64(len(buf)), "", ""), opts) if err != nil { t.Fatal(err) } @@ -304,7 +310,7 @@ func TestGetObjectNoQuorum(t *testing.T) { } } - err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts) + err = xl.GetObject(ctx, bucket, object, 0, int64(len(buf)), ioutil.Discard, "", opts) if err != toObjectErr(errFileNotFound, bucket, object) { t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err) } @@ -315,7 +321,7 @@ func TestGetObjectNoQuorum(t *testing.T) { // invocations, where f - [0,2) // Create "object" under "bucket". 
- _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts) + _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(buf), int64(len(buf)), "", ""), opts) if err != nil { t.Fatal(err) } @@ -521,7 +527,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "") + parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "", false) parts1SC := globalStorageClass @@ -534,7 +540,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "") + parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "", false) parts2SC := globalStorageClass // Object for test case 3 - No StorageClass defined, MetaData in PutObject requesting Standard Storage Class @@ -546,7 +552,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "") + parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "", false) parts3SC := globalStorageClass // Object for test case 4 - Standard StorageClass defined as Parity 6, MetaData in PutObject requesting Standard Storage Class @@ -564,7 +570,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "") + parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "", false) parts4SC := storageclass.Config{ Standard: storageclass.StorageClass{ Parity: 6, @@ -587,7 +593,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs 
[]strin t.Fatalf("Failed to putObject %v", err) } - parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "") + parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "", false) parts5SC := storageclass.Config{ RRS: storageclass.StorageClass{ Parity: 2, @@ -609,7 +615,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "") + parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "", false) parts6SC := storageclass.Config{ RRS: storageclass.StorageClass{ Parity: 2, @@ -632,7 +638,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "") + parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "", false) parts7SC := storageclass.Config{ Standard: storageclass.StorageClass{ Parity: 5, diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 0a4644f28..bb05b4e78 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -388,7 +388,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt continue } - _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "") + _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false) if err != nil { time.Sleep(retryDelay) retries++ @@ -397,7 +397,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt } // Read metadata associated with the object from all disks. 
- fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}) + fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}, true) if err != nil { switch toObjectErr(err, minioMetaBucket, o.objectPath(0)).(type) { case ObjectNotFound: @@ -463,7 +463,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt continue } - _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "") + _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false) if err != nil { time.Sleep(retryDelay) retries++ @@ -471,7 +471,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt } } // Load first part metadata... - fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}) + fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}, true) if err != nil { time.Sleep(retryDelay) retries++ diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 3d7d913fc..df3a1e2f3 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -252,11 +252,11 @@ func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi return d.disk.DeleteVersion(ctx, volume, path, fi) } -func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { if err := d.calcError(); err != nil { return FileInfo{}, err } - return d.disk.ReadVersion(ctx, volume, path, versionID) + return d.disk.ReadVersion(ctx, volume, path, versionID, readData) } func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) { diff --git a/cmd/storage-datatypes.go 
b/cmd/storage-datatypes.go index f77da8565..5cce92a33 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -151,6 +151,8 @@ type FileInfo struct { MarkDeleted bool // mark this version as deleted DeleteMarkerReplicationStatus string VersionPurgeStatus VersionPurgeStatusType + + Data []byte // optionally carries object data } // VersionPurgeStatusKey denotes purge status in metadata diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index aa5196da0..4dd1e686a 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -245,8 +245,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err) return } - if zb0001 != 17 { - err = msgp.ArrayError{Wanted: 17, Got: zb0001} + if zb0001 != 18 { + err = msgp.ArrayError{Wanted: 18, Got: zb0001} return } z.Volume, err = dc.ReadString() @@ -375,13 +375,18 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { } z.VersionPurgeStatus = VersionPurgeStatusType(zb0004) } + z.Data, err = dc.ReadBytes(z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } return } // EncodeMsg implements msgp.Encodable func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { - // array header, size 17 - err = en.Append(0xdc, 0x0, 0x11) + // array header, size 18 + err = en.Append(0xdc, 0x0, 0x12) if err != nil { return } @@ -489,14 +494,19 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "VersionPurgeStatus") return } + err = en.WriteBytes(z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // array header, size 17 - o = append(o, 0xdc, 0x0, 0x11) + // array header, size 18 + o = append(o, 0xdc, 0x0, 0x12) o = msgp.AppendString(o, z.Volume) o = msgp.AppendString(o, z.Name) o = msgp.AppendString(o, z.VersionID) @@ -529,6 
+539,7 @@ func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendBool(o, z.MarkDeleted) o = msgp.AppendString(o, z.DeleteMarkerReplicationStatus) o = msgp.AppendString(o, string(z.VersionPurgeStatus)) + o = msgp.AppendBytes(o, z.Data) return } @@ -540,8 +551,8 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err) return } - if zb0001 != 17 { - err = msgp.ArrayError{Wanted: 17, Got: zb0001} + if zb0001 != 18 { + err = msgp.ArrayError{Wanted: 18, Got: zb0001} return } z.Volume, bts, err = msgp.ReadStringBytes(bts) @@ -670,6 +681,11 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { } z.VersionPurgeStatus = VersionPurgeStatusType(zb0004) } + z.Data, bts, err = msgp.ReadBytesBytes(bts, z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } o = bts return } @@ -687,7 +703,7 @@ func (z *FileInfo) Msgsize() (s int) { for za0003 := range z.Parts { s += z.Parts[za0003].Msgsize() } - s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus)) + s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus)) + msgp.BytesPrefixSize + len(z.Data) return } diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 1d92bee25..8df8308e3 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -58,7 +58,7 @@ type StorageAPI interface { DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error - ReadVersion(ctx context.Context, volume, path, versionID string) (FileInfo, error) + ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (FileInfo, error) 
 RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error // File operations. diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index ece9c387c..a78ebc0b8 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -36,6 +36,7 @@ import ( "github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/rest" xnet "github.com/minio/minio/pkg/net" + xbufio "github.com/philhofer/fwd" "github.com/tinylib/msgp/msgp" ) @@ -226,8 +227,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, e return info, err } defer http.DrainBody(respBody) - err = msgp.Decode(respBody, &info) - if err != nil { + if err = msgp.Decode(respBody, &info); err != nil { return info, err } if info.Error != "" { @@ -398,18 +398,38 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP return err } -func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +// where we keep old *Readers +var readMsgpReaderPool = sync.Pool{New: func() interface{} { return &msgp.Reader{} }} + +// msgpNewReader returns a *Reader that reads from the provided reader. +// The reader will be buffered.
+func msgpNewReader(r io.Reader) *msgp.Reader { + p := readMsgpReaderPool.Get().(*msgp.Reader) + if p.R == nil { + p.R = xbufio.NewReaderSize(r, 8<<10) + } else { + p.R.Reset(r) + } + return p +} + +func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) values.Set(storageRESTVersionID, versionID) + values.Set(storageRESTReadData, strconv.FormatBool(readData)) respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1) if err != nil { return fi, err } defer http.DrainBody(respBody) - err = msgp.Decode(respBody, &fi) + + dec := msgpNewReader(respBody) + defer readMsgpReaderPool.Put(dec) + + err = fi.DecodeMsg(dec) return fi, err } @@ -479,10 +499,12 @@ func (client *storageRESTClient) WalkVersions(ctx context.Context, volume, dirPa defer close(ch) defer http.DrainBody(respBody) - decoder := msgp.NewReader(respBody) + dec := msgpNewReader(respBody) + defer readMsgpReaderPool.Put(dec) + for { var fi FileInfoVersions - if gerr := fi.DecodeMsg(decoder); gerr != nil { + if gerr := fi.DecodeMsg(dec); gerr != nil { // Upon error return if msgp.Cause(gerr) != io.EOF { logger.LogIf(GlobalContext, gerr) diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 228ab9ae3..4de76d3d6 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - storageRESTVersion = "v23" // Add small file optimization + storageRESTVersion = "v24" // Add more small file optimization storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -59,6 +59,7 @@ const ( storageRESTDirPath = "dir-path" storageRESTFilePath = "file-path" storageRESTVersionID = "version-id" + storageRESTReadData = "read-data" storageRESTTotalVersions = "total-versions" 
storageRESTSrcVolume = "source-volume" storageRESTSrcPath = "source-path" diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 9d806ba81..43dbbac1d 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -327,12 +327,22 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re volume := vars[storageRESTVolume] filePath := vars[storageRESTFilePath] versionID := vars[storageRESTVersionID] - fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID) + readData, err := strconv.ParseBool(vars[storageRESTReadData]) if err != nil { s.writeErrorResponse(w, err) return } - logger.LogIf(r.Context(), msgp.Encode(w, &fi)) + fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, readData) + if err != nil { + s.writeErrorResponse(w, err) + return + } + bufp := s.storage.rpool.Get().(*[]byte) + defer s.storage.rpool.Put(bufp) + + enc := msgp.NewWriterBuf(w, *bufp) + logger.LogIf(r.Context(), fi.EncodeMsg(enc)) + enc.Flush() } // WriteMetadata write new updated metadata. @@ -1033,7 +1043,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)). - Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID)...) + Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID, storageRESTReadData)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)). 
Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDataDir, storageRESTDstVolume, storageRESTDstPath)...) diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 7ae615ae5..646e2596d 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -272,12 +272,12 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s return p.storage.WriteMetadata(ctx, volume, path, fi) } -func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { if err = p.checkDiskStale(); err != nil { return fi, err } - return p.storage.ReadVersion(ctx, volume, path, versionID) + return p.storage.ReadVersion(ctx, volume, path, versionID, readData) } func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) { diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 6418f50f1..debc64d25 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -65,6 +65,10 @@ const ( // Size of each buffer. readAheadBufSize = 1 << 20 + // Small file threshold below which data accompanies metadata + // from storage layer. + smallFileThreshold = 32 * humanize.KiByte + // XL metadata file carries per object metadata. xlStorageFormatFile = "xl.meta" ) @@ -1082,7 +1086,14 @@ func (s *xlStorage) renameLegacyMetadata(volume, path string) error { } // ReadVersion - reads metadata and returns FileInfo at path `xl.meta` -func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +// for all objects less than `32KiB` this call returns data as well +// along with metadata. 
+func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { + volumeDir, err := s.getVolDir(volume) + if err != nil { + return fi, err + } + buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile)) if err != nil { if err == errFileNotFound { @@ -1117,7 +1128,65 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str return fi, errFileNotFound } - return getFileInfo(buf, volume, path, versionID) + fi, err = getFileInfo(buf, volume, path, versionID) + if err != nil { + return fi, err + } + + if readData { + // Reading data for small objects when + // - object has not yet transitioned + // - object size lesser than 32KiB + // - object has maximum of 1 parts + if fi.TransitionStatus == "" && fi.DataDir != "" && fi.Size <= smallFileThreshold && len(fi.Parts) == 1 { + fi.Data, err = s.readAllData(volumeDir, pathJoin(volumeDir, path, fi.DataDir, fmt.Sprintf("part.%d", fi.Parts[0].Number))) + if err != nil { + return FileInfo{}, err + } + } + } + + return fi, nil +} + +func (s *xlStorage) readAllData(volumeDir, filePath string) (buf []byte, err error) { + var f *os.File + if globalStorageClass.GetDMA() == storageclass.DMAReadWrite { + f, err = disk.OpenFileDirectIO(filePath, os.O_RDONLY, 0666) + } else { + f, err = os.Open(filePath) + } + if err != nil { + if osIsNotExist(err) { + // Check if the object doesn't exist because its bucket + // is missing in order to return the correct error. + _, err = os.Stat(volumeDir) + if err != nil && osIsNotExist(err) { + return nil, errVolumeNotFound + } + return nil, errFileNotFound + } else if osIsPermission(err) { + return nil, errFileAccessDenied + } else if isSysErrNotDir(err) || isSysErrIsDir(err) { + return nil, errFileNotFound + } else if isSysErrHandleInvalid(err) { + // This case is special and needs to be handled for windows. 
+ return nil, errFileNotFound + } else if isSysErrIO(err) { + return nil, errFaultyDisk + } else if isSysErrTooManyFiles(err) { + return nil, errTooManyOpenFiles + } else if isSysErrInvalidArg(err) { + return nil, errFileNotFound + } + return nil, err + } + + atomic.AddInt32(&s.activeIOCount, 1) + rd := &odirectReader{f, nil, nil, true, s, nil} + defer rd.Close() + + return ioutil.ReadAll(rd) } // ReadAll reads from r until an error or EOF and returns the data it read. @@ -1405,35 +1474,13 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off return nil, err } + var file *os.File if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite { - file, err := disk.OpenFileDirectIO(filePath, os.O_RDONLY, 0666) - if err != nil { - switch { - case osIsNotExist(err): - _, err = os.Stat(volumeDir) - if err != nil && osIsNotExist(err) { - return nil, errVolumeNotFound - } - return nil, errFileNotFound - case osIsPermission(err): - return nil, errFileAccessDenied - case isSysErrNotDir(err): - return nil, errFileAccessDenied - case isSysErrIO(err): - return nil, errFaultyDisk - case isSysErrTooManyFiles(err): - return nil, errTooManyOpenFiles - default: - return nil, err - } - } - - atomic.AddInt32(&s.activeIOCount, 1) - return &odirectReader{file, nil, nil, true, s, nil}, nil + file, err = disk.OpenFileDirectIO(filePath, os.O_RDONLY, 0666) + } else { + // Open the file for reading. + file, err = os.Open(filePath) } - - // Open the file for reading.
- file, err := os.Open(filePath) if err != nil { switch { case osIsNotExist(err): @@ -1466,11 +1513,17 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off return nil, errIsNotRegular } - if _, err = file.Seek(offset, io.SeekStart); err != nil { - return nil, err + atomic.AddInt32(&s.activeIOCount, 1) + if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite { + return &odirectReader{file, nil, nil, true, s, nil}, nil + } + + if offset > 0 { + if _, err = file.Seek(offset, io.SeekStart); err != nil { + return nil, err + } } - atomic.AddInt32(&s.activeIOCount, 1) r := struct { io.Reader io.Closer @@ -1517,6 +1570,7 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz if err != nil { return err } + // Stat a volume entry. _, err = os.Stat(volumeDir) if err != nil { diff --git a/go.mod b/go.mod index 88cd7e74b..b46c3af47 100644 --- a/go.mod +++ b/go.mod @@ -62,11 +62,10 @@ require ( github.com/ncw/directio v1.0.5 github.com/nsqio/go-nsq v1.0.8 github.com/olivere/elastic/v7 v7.0.22 - github.com/philhofer/fwd v1.1.1 // indirect + github.com/philhofer/fwd v1.1.1 github.com/pierrec/lz4 v2.5.2+incompatible github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.8.0 - github.com/quasilyte/go-ruleguard/dsl/fluent v0.0.0-20201222093424-5d7e62a465d3 // indirect github.com/rjeczalik/notify v0.9.2 github.com/rs/cors v1.7.0 github.com/secure-io/sio-go v0.3.0 diff --git a/go.sum b/go.sum index f5f0d1f23..0a632c900 100644 --- a/go.sum +++ b/go.sum @@ -539,10 +539,6 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.2.0 h1:wH4vA7pcjKuZzjF7lM8awk4fnuJO6idemZXoKnULUx4= github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= -github.com/quasilyte/go-ruleguard v0.2.1 
h1:56eRm0daAyny9UhJnmtJW/UyLZQusukBAB8oT8AHKHo= -github.com/quasilyte/go-ruleguard/dsl v0.0.0-20201222101508-986133edf04e h1:gXFs5pU/5pxy0nw9QoV2dAhGXI+jKSN0GJEL8TMKf4A= -github.com/quasilyte/go-ruleguard/dsl/fluent v0.0.0-20201222093424-5d7e62a465d3 h1:eL7x4/zMnlquMxYe7V078BD7MGskZ0daGln+SJCVzuY= -github.com/quasilyte/go-ruleguard/dsl/fluent v0.0.0-20201222093424-5d7e62a465d3/go.mod h1:P7JlQWFT7jDcFZMtUPQbtGzzzxva3rBn6oIF+LPwFcM= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=