diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go index c05b53fa2..d3453d405 100644 --- a/cmd/bitrot-streaming.go +++ b/cmd/bitrot-streaming.go @@ -91,6 +91,7 @@ func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length i // ReadAt() implementation which verifies the bitrot hash available as part of the stream. type streamingBitrotReader struct { disk StorageAPI + data []byte rc io.Reader volume string filePath string @@ -122,7 +123,11 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) { // For the first ReadAt() call we need to open the stream for reading. b.currOffset = offset streamOffset := (offset/b.shardSize)*int64(b.h.Size()) + offset - b.rc, err = b.disk.ReadFileStream(context.TODO(), b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset) + if len(b.data) == 0 { + b.rc, err = b.disk.ReadFileStream(context.TODO(), b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset) + } else { + b.rc = io.NewSectionReader(bytes.NewReader(b.data), streamOffset, b.tillOffset-streamOffset) + } if err != nil { return 0, err } @@ -154,10 +159,11 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) { } // Returns streaming bitrot reader implementation. 
-func newStreamingBitrotReader(disk StorageAPI, volume, filePath string, tillOffset int64, algo BitrotAlgorithm, shardSize int64) *streamingBitrotReader { +func newStreamingBitrotReader(disk StorageAPI, data []byte, volume, filePath string, tillOffset int64, algo BitrotAlgorithm, shardSize int64) *streamingBitrotReader { h := algo.New() return &streamingBitrotReader{ disk, + data, nil, volume, filePath, diff --git a/cmd/bitrot.go b/cmd/bitrot.go index cddc17a96..24f7ecd39 100644 --- a/cmd/bitrot.go +++ b/cmd/bitrot.go @@ -103,9 +103,9 @@ func newBitrotWriter(disk StorageAPI, volume, filePath string, length int64, alg return newWholeBitrotWriter(disk, volume, filePath, algo, shardSize) } -func newBitrotReader(disk StorageAPI, bucket string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) io.ReaderAt { +func newBitrotReader(disk StorageAPI, data []byte, bucket string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) io.ReaderAt { if algo == HighwayHash256S { - return newStreamingBitrotReader(disk, bucket, filePath, tillOffset, algo, shardSize) + return newStreamingBitrotReader(disk, data, bucket, filePath, tillOffset, algo, shardSize) } return newWholeBitrotReader(disk, bucket, filePath, algo, tillOffset, sum) } diff --git a/cmd/bitrot_test.go b/cmd/bitrot_test.go index 0990c4919..17cfc3813 100644 --- a/cmd/bitrot_test.go +++ b/cmd/bitrot_test.go @@ -62,7 +62,7 @@ func testBitrotReaderWriterAlgo(t *testing.T, bitrotAlgo BitrotAlgorithm) { } writer.(io.Closer).Close() - reader := newBitrotReader(disk, volume, filePath, 35, bitrotAlgo, bitrotWriterSum(writer), 10) + reader := newBitrotReader(disk, nil, volume, filePath, 35, bitrotAlgo, bitrotWriterSum(writer), 10) b := make([]byte, 10) if _, err = reader.ReadAt(b, 0); err != nil { log.Fatal(err) diff --git a/cmd/erasure-decode_test.go b/cmd/erasure-decode_test.go index 7797ccced..d622a8b3d 100644 --- a/cmd/erasure-decode_test.go +++ 
b/cmd/erasure-decode_test.go @@ -134,7 +134,7 @@ func TestErasureDecode(t *testing.T) { } tillOffset := erasure.ShardFileOffset(test.offset, test.length, test.data) - bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) + bitrotReaders[index] = newBitrotReader(disk, nil, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) } writer := bytes.NewBuffer(nil) @@ -164,7 +164,7 @@ func TestErasureDecode(t *testing.T) { continue } tillOffset := erasure.ShardFileOffset(test.offset, test.length, test.data) - bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) + bitrotReaders[index] = newBitrotReader(disk, nil, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) } for j := range disks[:test.offDisks] { if bitrotReaders[j] == nil { @@ -270,7 +270,7 @@ func TestErasureDecodeRandomOffsetLength(t *testing.T) { continue } tillOffset := erasure.ShardFileOffset(offset, readLen, length) - bitrotReaders[index] = newStreamingBitrotReader(disk, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) + bitrotReaders[index] = newStreamingBitrotReader(disk, nil, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) } err = erasure.Decode(context.Background(), buf, bitrotReaders, offset, readLen, length, nil) closeBitrotReaders(bitrotReaders) @@ -332,7 +332,7 @@ func benchmarkErasureDecode(data, parity, dataDown, parityDown int, size int64, continue } tillOffset := erasure.ShardFileOffset(0, size, size) - bitrotReaders[index] = newStreamingBitrotReader(disk, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) + bitrotReaders[index] = newStreamingBitrotReader(disk, nil, "testbucket", "object", tillOffset, 
DefaultBitrotAlgorithm, erasure.ShardSize()) } if err = erasure.Decode(context.Background(), bytes.NewBuffer(content[:0]), bitrotReaders, 0, size, size, nil); err != nil { panic(err) diff --git a/cmd/erasure-heal_test.go b/cmd/erasure-heal_test.go index 4d1fff044..94d2905e4 100644 --- a/cmd/erasure-heal_test.go +++ b/cmd/erasure-heal_test.go @@ -99,7 +99,7 @@ func TestErasureHeal(t *testing.T) { readers := make([]io.ReaderAt, len(disks)) for i, disk := range disks { shardFilesize := erasure.ShardFileSize(test.size) - readers[i] = newBitrotReader(disk, "testbucket", "testobject", shardFilesize, test.algorithm, bitrotWriterSum(writers[i]), erasure.ShardSize()) + readers[i] = newBitrotReader(disk, nil, "testbucket", "testobject", shardFilesize, test.algorithm, bitrotWriterSum(writers[i]), erasure.ShardSize()) } // setup stale disks for the test case diff --git a/cmd/erasure-healing-common_test.go b/cmd/erasure-healing-common_test.go index 99e912a18..6c9f81c3a 100644 --- a/cmd/erasure-healing-common_test.go +++ b/cmd/erasure-healing-common_test.go @@ -186,7 +186,7 @@ func TestListOnlineDisks(t *testing.T) { t.Fatalf("Failed to putObject %v", err) } - partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") + partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) fi, err := getLatestFileInfo(ctx, partsMetadata, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo %v", err) @@ -287,7 +287,7 @@ func TestDisksWithAllParts(t *testing.T) { t.Fatalf("Failed to putObject %v", err) } - _, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") + _, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) readQuorum := len(erasureDisks) / 2 if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { t.Fatalf("Failed to read xl meta data %v", reducedErr) @@ -295,7 +295,7 @@ func TestDisksWithAllParts(t *testing.T) { // Test that all disks are returned without 
any failures with // unmodified meta data - partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") + partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) if err != nil { t.Fatalf("Failed to read xl meta data %v", err) } diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index b01573f7c..69f06baf6 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -395,7 +395,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s if latestMeta.XLV1 { partPath = pathJoin(object, fmt.Sprintf("part.%d", partNumber)) } - readers[i] = newBitrotReader(disk, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize()) + readers[i] = newBitrotReader(disk, nil, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize()) } writers := make([]io.Writer, len(outDatedDisks)) for i, disk := range outDatedDisks { @@ -811,7 +811,7 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version storageEndpoints := er.getEndpoints() // Read metadata files from all the disks - partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID) + partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false) if isAllNotFound(errs) { err = toObjectErr(errFileNotFound, bucket, object) diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go index d8f0a8ce0..3d9671440 100644 --- a/cmd/erasure-healing_test.go +++ b/cmd/erasure-healing_test.go @@ -67,7 +67,7 @@ func TestHealing(t *testing.T) { } disk := er.getDisks()[0] - fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "") + fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false) if err != nil { t.Fatal(err) } @@ -84,7 +84,7 @@ func TestHealing(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, 
object, "") + fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false) if err != nil { t.Fatal(err) } @@ -113,7 +113,7 @@ func TestHealing(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "") + fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false) if err != nil { t.Fatal(err) } @@ -214,7 +214,7 @@ func TestHealObjectCorrupted(t *testing.T) { t.Fatalf("Failed to heal object - %v", err) } - fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") + fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) fi, err := getLatestFileInfo(ctx, fileInfos, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -239,7 +239,7 @@ func TestHealObjectCorrupted(t *testing.T) { t.Errorf("Expected nil but received %v", err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "") + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) nfi, err := getLatestFileInfo(ctx, fileInfos, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -265,7 +265,7 @@ func TestHealObjectCorrupted(t *testing.T) { t.Errorf("Expected nil but received %v", err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "") + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) nfi, err = getLatestFileInfo(ctx, fileInfos, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go index db4aa9186..4e33d04f9 100644 --- a/cmd/erasure-metadata-utils.go +++ b/cmd/erasure-metadata-utils.go @@ -115,7 +115,7 @@ func hashOrder(key string, cardinality int) []int { // Reads all `xl.meta` metadata as a FileInfo slice. // Returns error slice indicating the failed metadata reads. 
-func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) { +func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, readData bool) ([]FileInfo, []error) { metadataArray := make([]FileInfo, len(disks)) g := errgroup.WithNErrs(len(disks)) @@ -126,7 +126,7 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve if disks[index] == nil { return errDiskNotFound } - metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID) + metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, readData) if err != nil { if !IsErr(err, errFileNotFound, errVolumeNotFound, errFileVersionNotFound, errDiskNotFound) { logger.LogOnceIf(ctx, err, disks[index].String()) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index a4fb8b215..d413041cf 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -46,7 +46,7 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object disks := er.getDisks() // Read metadata associated with the object from all disks. 
- metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "") + metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "", false) readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs) if err != nil { @@ -113,7 +113,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto } for _, uploadIDDir := range uploadIDDirs { uploadIDPath := pathJoin(shaDir, uploadIDDir) - fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "") + fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false) if err != nil { continue } @@ -127,7 +127,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto return } for _, tmpDir := range tmpDirs { - fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "") + fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "", false) if err != nil { continue } @@ -181,7 +181,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec if populatedUploadIds.Contains(uploadID) { continue } - fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "") + fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false) if err != nil { return result, toObjectErr(err, bucket, object) } @@ -371,7 +371,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo // Read metadata associated with the object from all disks. 
partsMetadata, errs = readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, - uploadIDPath, "") + uploadIDPath, "", false) // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) @@ -474,7 +474,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo } // Read metadata again because it might be updated with parallel upload of another part. - partsMetadata, errs = readAllFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "") + partsMetadata, errs = readAllFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false) reducedErr = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) if reducedErr == errErasureWriteQuorum { return pi, toObjectErr(reducedErr, bucket, object) @@ -552,7 +552,7 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u storageDisks := er.getDisks() // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID) + partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID, false) // get Quorum for this object readQuorum, _, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) @@ -600,7 +600,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up storageDisks := er.getDisks() // Read metadata associated with the object from all disks. 
- partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "") + partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) @@ -704,7 +704,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str storageDisks := er.getDisks() // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "") + partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) @@ -889,7 +889,7 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "") + partsMetadata, errs := readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "", false) // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index f413e4354..2db07f73e 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -58,7 +58,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d // Read metadata associated with the object from all disks. 
storageDisks := er.getDisks() - metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID) + metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, false) // get Quorum for this object readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs) @@ -157,7 +157,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri unlockOnDefer = true } - fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts) + fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true) if err != nil { return nil, toObjectErr(err, bucket, object) } @@ -298,7 +298,8 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje } checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber) partPath := pathJoin(object, metaArr[index].DataDir, fmt.Sprintf("part.%d", partNumber)) - readers[index] = newBitrotReader(disk, bucket, partPath, tillOffset, + data := metaArr[index].Data + readers[index] = newBitrotReader(disk, data, bucket, partPath, tillOffset, checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize()) // Prefer local disks @@ -337,7 +338,7 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje // getObject wrapper for erasure GetObject func (er erasureObjects) getObject(ctx context.Context, bucket, object string, startOffset, length int64, writer io.Writer, opts ObjectOptions) error { - fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts) + fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true) if err != nil { return toObjectErr(err, bucket, object) } @@ -364,11 +365,11 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin return er.getObjectInfo(ctx, bucket, object, opts) } -func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts 
ObjectOptions) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) { +func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) { disks := er.getDisks() // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID) + metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData) readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs) if err != nil { @@ -410,7 +411,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s // getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { - fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts) + fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts, false) if err != nil { return objInfo, toObjectErr(err, bucket, object) } @@ -1073,7 +1074,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin disks := er.getDisks() // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID) + metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false) readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs) if err != nil { @@ -1134,7 +1135,7 @@ func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object st disks := er.getDisks() // Read metadata associated with the object from all disks. 
- metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID) + metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false) readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs) if err != nil { diff --git a/cmd/erasure-object_test.go b/cmd/erasure-object_test.go index a224489df..c89a4daf0 100644 --- a/cmd/erasure-object_test.go +++ b/cmd/erasure-object_test.go @@ -19,7 +19,9 @@ package cmd import ( "bytes" "context" + crand "crypto/rand" "errors" + "io" "io/ioutil" "os" "testing" @@ -288,9 +290,13 @@ func TestGetObjectNoQuorum(t *testing.T) { bucket := "bucket" object := "object" opts := ObjectOptions{} + buf := make([]byte, 33<<10) + if _, err = io.ReadFull(crand.Reader, buf); err != nil { + t.Fatal(err) + } // Test use case 1: All disks are online, xl.meta are present, but data are missing - _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts) + _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(buf), int64(len(buf)), "", ""), opts) if err != nil { t.Fatal(err) } @@ -304,7 +310,7 @@ func TestGetObjectNoQuorum(t *testing.T) { } } - err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts) + err = xl.GetObject(ctx, bucket, object, 0, int64(len(buf)), ioutil.Discard, "", opts) if err != toObjectErr(errFileNotFound, bucket, object) { t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err) } @@ -315,7 +321,7 @@ func TestGetObjectNoQuorum(t *testing.T) { // invocations, where f - [0,2) // Create "object" under "bucket". 
- _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts) + _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(buf), int64(len(buf)), "", ""), opts) if err != nil { t.Fatal(err) } @@ -521,7 +527,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "") + parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "", false) parts1SC := globalStorageClass @@ -534,7 +540,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "") + parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "", false) parts2SC := globalStorageClass // Object for test case 3 - No StorageClass defined, MetaData in PutObject requesting Standard Storage Class @@ -546,7 +552,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "") + parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "", false) parts3SC := globalStorageClass // Object for test case 4 - Standard StorageClass defined as Parity 6, MetaData in PutObject requesting Standard Storage Class @@ -564,7 +570,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "") + parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "", false) parts4SC := storageclass.Config{ Standard: storageclass.StorageClass{ Parity: 6, @@ -587,7 +593,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs 
[]strin t.Fatalf("Failed to putObject %v", err) } - parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "") + parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "", false) parts5SC := storageclass.Config{ RRS: storageclass.StorageClass{ Parity: 2, @@ -609,7 +615,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "") + parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "", false) parts6SC := storageclass.Config{ RRS: storageclass.StorageClass{ Parity: 2, @@ -632,7 +638,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "") + parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "", false) parts7SC := storageclass.Config{ Standard: storageclass.StorageClass{ Parity: 5, diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 0a4644f28..bb05b4e78 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -388,7 +388,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt continue } - _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "") + _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false) if err != nil { time.Sleep(retryDelay) retries++ @@ -397,7 +397,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt } // Read metadata associated with the object from all disks. 
- fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}) + fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}, true) if err != nil { switch toObjectErr(err, minioMetaBucket, o.objectPath(0)).(type) { case ObjectNotFound: @@ -463,7 +463,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt continue } - _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "") + _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false) if err != nil { time.Sleep(retryDelay) retries++ @@ -471,7 +471,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt } } // Load first part metadata... - fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}) + fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}, true) if err != nil { time.Sleep(retryDelay) retries++ diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 3d7d913fc..df3a1e2f3 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -252,11 +252,11 @@ func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi return d.disk.DeleteVersion(ctx, volume, path, fi) } -func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { if err := d.calcError(); err != nil { return FileInfo{}, err } - return d.disk.ReadVersion(ctx, volume, path, versionID) + return d.disk.ReadVersion(ctx, volume, path, versionID, readData) } func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) { diff --git a/cmd/storage-datatypes.go 
b/cmd/storage-datatypes.go index f77da8565..5cce92a33 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -151,6 +151,8 @@ type FileInfo struct { MarkDeleted bool // mark this version as deleted DeleteMarkerReplicationStatus string VersionPurgeStatus VersionPurgeStatusType + + Data []byte // optionally carries object data } // VersionPurgeStatusKey denotes purge status in metadata diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index aa5196da0..4dd1e686a 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -245,8 +245,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err) return } - if zb0001 != 17 { - err = msgp.ArrayError{Wanted: 17, Got: zb0001} + if zb0001 != 18 { + err = msgp.ArrayError{Wanted: 18, Got: zb0001} return } z.Volume, err = dc.ReadString() @@ -375,13 +375,18 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { } z.VersionPurgeStatus = VersionPurgeStatusType(zb0004) } + z.Data, err = dc.ReadBytes(z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } return } // EncodeMsg implements msgp.Encodable func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { - // array header, size 17 - err = en.Append(0xdc, 0x0, 0x11) + // array header, size 18 + err = en.Append(0xdc, 0x0, 0x12) if err != nil { return } @@ -489,14 +494,19 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "VersionPurgeStatus") return } + err = en.WriteBytes(z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // array header, size 17 - o = append(o, 0xdc, 0x0, 0x11) + // array header, size 18 + o = append(o, 0xdc, 0x0, 0x12) o = msgp.AppendString(o, z.Volume) o = msgp.AppendString(o, z.Name) o = msgp.AppendString(o, z.VersionID) @@ -529,6 
+539,7 @@ func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendBool(o, z.MarkDeleted) o = msgp.AppendString(o, z.DeleteMarkerReplicationStatus) o = msgp.AppendString(o, string(z.VersionPurgeStatus)) + o = msgp.AppendBytes(o, z.Data) return } @@ -540,8 +551,8 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err) return } - if zb0001 != 17 { - err = msgp.ArrayError{Wanted: 17, Got: zb0001} + if zb0001 != 18 { + err = msgp.ArrayError{Wanted: 18, Got: zb0001} return } z.Volume, bts, err = msgp.ReadStringBytes(bts) @@ -670,6 +681,11 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { } z.VersionPurgeStatus = VersionPurgeStatusType(zb0004) } + z.Data, bts, err = msgp.ReadBytesBytes(bts, z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } o = bts return } @@ -687,7 +703,7 @@ func (z *FileInfo) Msgsize() (s int) { for za0003 := range z.Parts { s += z.Parts[za0003].Msgsize() } - s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus)) + s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus)) + msgp.BytesPrefixSize + len(z.Data) return } diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 1d92bee25..8df8308e3 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -58,7 +58,7 @@ type StorageAPI interface { DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error - ReadVersion(ctx context.Context, volume, path, versionID string) (FileInfo, error) + ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (FileInfo, error) 
RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error // File operations. diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index ece9c387c..a78ebc0b8 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -36,6 +36,7 @@ import ( "github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/rest" xnet "github.com/minio/minio/pkg/net" + xbufio "github.com/philhofer/fwd" "github.com/tinylib/msgp/msgp" ) @@ -226,8 +227,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, e return info, err } defer http.DrainBody(respBody) - err = msgp.Decode(respBody, &info) - if err != nil { + if err = msgp.Decode(respBody, &info); err != nil { return info, err } if info.Error != "" { @@ -398,18 +398,38 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP return err } -func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +// where we keep old *Readers +var readMsgpReaderPool = sync.Pool{New: func() interface{} { return &msgp.Reader{} }} + +// msgpNewReader returns a *Reader that reads from the provided reader. +// The reader will be buffered. 
+func msgpNewReader(r io.Reader) *msgp.Reader { + p := readMsgpReaderPool.Get().(*msgp.Reader) + if p.R == nil { + p.R = xbufio.NewReaderSize(r, 8<<10) + } else { + p.R.Reset(r) + } + return p +} + +func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) values.Set(storageRESTVersionID, versionID) + values.Set(storageRESTReadData, strconv.FormatBool(readData)) respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1) if err != nil { return fi, err } defer http.DrainBody(respBody) - err = msgp.Decode(respBody, &fi) + + dec := msgpNewReader(respBody) + defer readMsgpReaderPool.Put(dec) + + err = fi.DecodeMsg(dec) return fi, err } @@ -479,10 +499,12 @@ func (client *storageRESTClient) WalkVersions(ctx context.Context, volume, dirPa defer close(ch) defer http.DrainBody(respBody) - decoder := msgp.NewReader(respBody) + dec := msgpNewReader(respBody) + defer readMsgpReaderPool.Put(dec) + for { var fi FileInfoVersions - if gerr := fi.DecodeMsg(decoder); gerr != nil { + if gerr := fi.DecodeMsg(dec); gerr != nil { // Upon error return if msgp.Cause(gerr) != io.EOF { logger.LogIf(GlobalContext, gerr) diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 228ab9ae3..4de76d3d6 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - storageRESTVersion = "v23" // Add small file optimization + storageRESTVersion = "v24" // Add more small file optimization storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -59,6 +59,7 @@ const ( storageRESTDirPath = "dir-path" storageRESTFilePath = "file-path" storageRESTVersionID = "version-id" + storageRESTReadData = "read-data" storageRESTTotalVersions = "total-versions" 
storageRESTSrcVolume = "source-volume" storageRESTSrcPath = "source-path" diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 9d806ba81..43dbbac1d 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -327,12 +327,22 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re volume := vars[storageRESTVolume] filePath := vars[storageRESTFilePath] versionID := vars[storageRESTVersionID] - fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID) + readData, err := strconv.ParseBool(vars[storageRESTReadData]) if err != nil { s.writeErrorResponse(w, err) return } - logger.LogIf(r.Context(), msgp.Encode(w, &fi)) + fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, readData) + if err != nil { + s.writeErrorResponse(w, err) + return + } + bufp := s.storage.rpool.Get().(*[]byte) + defer s.storage.rpool.Put(bufp) + + enc := msgp.NewWriterBuf(w, *bufp) + logger.LogIf(r.Context(), fi.EncodeMsg(enc)) + enc.Flush() } // WriteMetadata write new updated metadata. @@ -1033,7 +1043,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)). - Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID)...) + Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID, storageRESTReadData)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)). 
Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDataDir, storageRESTDstVolume, storageRESTDstPath)...) diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 7ae615ae5..646e2596d 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -272,12 +272,12 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s return p.storage.WriteMetadata(ctx, volume, path, fi) } -func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { if err = p.checkDiskStale(); err != nil { return fi, err } - return p.storage.ReadVersion(ctx, volume, path, versionID) + return p.storage.ReadVersion(ctx, volume, path, versionID, readData) } func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) { diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 6418f50f1..debc64d25 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -65,6 +65,10 @@ const ( // Size of each buffer. readAheadBufSize = 1 << 20 + // Small file threshold below which data accompanies metadata + // from storage layer. + smallFileThreshold = 32 * humanize.KiByte + // XL metadata file carries per object metadata. xlStorageFormatFile = "xl.meta" ) @@ -1082,7 +1086,14 @@ func (s *xlStorage) renameLegacyMetadata(volume, path string) error { } // ReadVersion - reads metadata and returns FileInfo at path `xl.meta` -func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +// for all objects less than `32KiB` this call returns data as well +// along with metadata. 
+func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { + volumeDir, err := s.getVolDir(volume) + if err != nil { + return fi, err + } + buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile)) if err != nil { if err == errFileNotFound { @@ -1117,7 +1128,65 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str return fi, errFileNotFound } - return getFileInfo(buf, volume, path, versionID) + fi, err = getFileInfo(buf, volume, path, versionID) + if err != nil { + return fi, err + } + + if readData { + // Reading data for small objects when + // - object has not yet transitioned + // - object size lesser than 32KiB + // - object has maximum of 1 parts + if fi.TransitionStatus == "" && fi.DataDir != "" && fi.Size <= smallFileThreshold && len(fi.Parts) == 1 { + fi.Data, err = s.readAllData(volumeDir, pathJoin(volumeDir, path, fi.DataDir, fmt.Sprintf("part.%d", fi.Parts[0].Number))) + if err != nil { + return FileInfo{}, err + } + } + } + + return fi, nil +} + +func (s *xlStorage) readAllData(volumeDir, filePath string) (buf []byte, err error) { + var f *os.File + if globalStorageClass.GetDMA() == storageclass.DMAReadWrite { + f, err = disk.OpenFileDirectIO(filePath, os.O_RDONLY, 0666) + } else { + f, err = os.Open(filePath) + } + if err != nil { + if osIsNotExist(err) { + // Check if the object doesn't exist because its bucket + // is missing in order to return the correct error. + _, err = os.Stat(volumeDir) + if err != nil && osIsNotExist(err) { + return nil, errVolumeNotFound + } + return nil, errFileNotFound + } else if osIsPermission(err) { + return nil, errFileAccessDenied + } else if isSysErrNotDir(err) || isSysErrIsDir(err) { + return nil, errFileNotFound + } else if isSysErrHandleInvalid(err) { + // This case is special and needs to be handled for windows. 
+ return nil, errFileNotFound + } else if isSysErrIO(err) { + return nil, errFaultyDisk + } else if isSysErrTooManyFiles(err) { + return nil, errTooManyOpenFiles + } else if isSysErrInvalidArg(err) { + return nil, errFileNotFound + } + return nil, err + } + + atomic.AddInt32(&s.activeIOCount, 1) + rd := &odirectReader{f, nil, nil, true, s, nil} + defer rd.Close() + + return ioutil.ReadAll(rd) } // ReadAll reads from r until an error or EOF and returns the data it read. @@ -1405,35 +1474,13 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off return nil, err } + var file *os.File if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite { - file, err := disk.OpenFileDirectIO(filePath, os.O_RDONLY, 0666) - if err != nil { - switch { - case osIsNotExist(err): - _, err = os.Stat(volumeDir) - if err != nil && osIsNotExist(err) { - return nil, errVolumeNotFound - } - return nil, errFileNotFound - case osIsPermission(err): - return nil, errFileAccessDenied - case isSysErrNotDir(err): - return nil, errFileAccessDenied - case isSysErrIO(err): - return nil, errFaultyDisk - case isSysErrTooManyFiles(err): - return nil, errTooManyOpenFiles - default: - return nil, err - } - } - - atomic.AddInt32(&s.activeIOCount, 1) - return &odirectReader{file, nil, nil, true, s, nil}, nil + file, err = disk.OpenFileDirectIO(filePath, os.O_RDONLY, 0666) + } else { + // Open the file for reading. + file, err = os.Open(filePath) } - - // Open the file for reading. 
- file, err := os.Open(filePath) if err != nil { switch { case osIsNotExist(err): @@ -1466,11 +1513,17 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off return nil, errIsNotRegular } - if _, err = file.Seek(offset, io.SeekStart); err != nil { - return nil, err + atomic.AddInt32(&s.activeIOCount, 1) + if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite { + return &odirectReader{file, nil, nil, true, s, nil}, nil + } + + if offset > 0 { + if _, err = file.Seek(offset, io.SeekStart); err != nil { + return nil, err + } } - atomic.AddInt32(&s.activeIOCount, 1) r := struct { io.Reader io.Closer @@ -1517,6 +1570,7 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz if err != nil { return err } + // Stat a volume entry. _, err = os.Stat(volumeDir) if err != nil { diff --git a/go.mod b/go.mod index 88cd7e74b..b46c3af47 100644 --- a/go.mod +++ b/go.mod @@ -62,11 +62,10 @@ require ( github.com/ncw/directio v1.0.5 github.com/nsqio/go-nsq v1.0.8 github.com/olivere/elastic/v7 v7.0.22 - github.com/philhofer/fwd v1.1.1 // indirect + github.com/philhofer/fwd v1.1.1 github.com/pierrec/lz4 v2.5.2+incompatible github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.8.0 - github.com/quasilyte/go-ruleguard/dsl/fluent v0.0.0-20201222093424-5d7e62a465d3 // indirect github.com/rjeczalik/notify v0.9.2 github.com/rs/cors v1.7.0 github.com/secure-io/sio-go v0.3.0 diff --git a/go.sum b/go.sum index f5f0d1f23..0a632c900 100644 --- a/go.sum +++ b/go.sum @@ -539,10 +539,6 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.2.0 h1:wH4vA7pcjKuZzjF7lM8awk4fnuJO6idemZXoKnULUx4= github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= -github.com/quasilyte/go-ruleguard v0.2.1 
h1:56eRm0daAyny9UhJnmtJW/UyLZQusukBAB8oT8AHKHo= -github.com/quasilyte/go-ruleguard/dsl v0.0.0-20201222101508-986133edf04e h1:gXFs5pU/5pxy0nw9QoV2dAhGXI+jKSN0GJEL8TMKf4A= -github.com/quasilyte/go-ruleguard/dsl/fluent v0.0.0-20201222093424-5d7e62a465d3 h1:eL7x4/zMnlquMxYe7V078BD7MGskZ0daGln+SJCVzuY= -github.com/quasilyte/go-ruleguard/dsl/fluent v0.0.0-20201222093424-5d7e62a465d3/go.mod h1:P7JlQWFT7jDcFZMtUPQbtGzzzxva3rBn6oIF+LPwFcM= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=