From c4131c279816414f55defef40ba4e13fb75fbdb6 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 3 Jan 2021 11:27:57 -0800 Subject: [PATCH] feat: Small object optimization read data in single bulk call (#11207) --- cmd/api-router.go | 24 +- cmd/bitrot-streaming.go | 22 +- cmd/bitrot.go | 4 +- cmd/bitrot_test.go | 2 +- cmd/erasure-decode_test.go | 8 +- cmd/erasure-heal_test.go | 2 +- cmd/erasure-healing-common_test.go | 6 +- cmd/erasure-healing.go | 4 +- cmd/erasure-healing_test.go | 12 +- cmd/erasure-metadata-utils.go | 4 +- cmd/erasure-multipart.go | 20 +- cmd/erasure-object.go | 18 +- cmd/erasure-object_test.go | 29 +- cmd/metacache-set.go | 8 +- cmd/naughty-disk_test.go | 4 +- cmd/server-main.go | 22 +- cmd/storage-datatypes.go | 7 + cmd/storage-datatypes_gen.go | 606 ++++++++++------------------- cmd/storage-datatypes_test.go | 26 +- cmd/storage-interface.go | 2 +- cmd/storage-rest-client.go | 3 +- cmd/storage-rest-common.go | 3 +- cmd/storage-rest-server.go | 9 +- cmd/xl-storage-disk-id-check.go | 4 +- cmd/xl-storage.go | 75 +++- 25 files changed, 434 insertions(+), 490 deletions(-) diff --git a/cmd/api-router.go b/cmd/api-router.go index e4065b98e..b1628fed1 100644 --- a/cmd/api-router.go +++ b/cmd/api-router.go @@ -27,23 +27,35 @@ import ( ) func newHTTPServerFn() *xhttp.Server { - globalObjLayerMutex.Lock() - defer globalObjLayerMutex.Unlock() + globalObjLayerMutex.RLock() + defer globalObjLayerMutex.RUnlock() return globalHTTPServer } -func newObjectLayerFn() ObjectLayer { +func setHTTPServer(h *xhttp.Server) { globalObjLayerMutex.Lock() - defer globalObjLayerMutex.Unlock() + globalHTTPServer = h + globalObjLayerMutex.Unlock() +} + +func newObjectLayerFn() ObjectLayer { + globalObjLayerMutex.RLock() + defer globalObjLayerMutex.RUnlock() return globalObjectAPI } func newCachedObjectLayerFn() CacheObjectLayer { - globalObjLayerMutex.Lock() - defer globalObjLayerMutex.Unlock() + globalObjLayerMutex.RLock() + defer globalObjLayerMutex.RUnlock() return globalCacheObjectAPI } +func setCacheObjectLayer(c CacheObjectLayer) { + globalObjLayerMutex.Lock() + globalCacheObjectAPI = c + globalObjLayerMutex.Unlock() +} + func setObjectLayer(o ObjectLayer) { globalObjLayerMutex.Lock() globalObjectAPI = o diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go index ddeaf29c2..f552323b6 100644 --- a/cmd/bitrot-streaming.go +++ b/cmd/bitrot-streaming.go @@ -91,7 +91,8 @@ func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length i // ReadAt() implementation which verifies the bitrot hash available as part of the stream. type streamingBitrotReader struct { disk StorageAPI - rc io.ReadCloser + data []byte + rc io.Reader volume string filePath string tillOffset int64 @@ -105,7 +106,10 @@ func (b *streamingBitrotReader) Close() error { if b.rc == nil { return nil } - return b.rc.Close() + if closer, ok := b.rc.(io.Closer); ok { + return closer.Close() + } + return nil } func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) { @@ -119,11 +123,16 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) { // For the first ReadAt() call we need to open the stream for reading. b.currOffset = offset streamOffset := (offset/b.shardSize)*int64(b.h.Size()) + offset - b.rc, err = b.disk.ReadFileStream(context.TODO(), b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset) - if err != nil { - return 0, err + if len(b.data) == 0 { + b.rc, err = b.disk.ReadFileStream(context.TODO(), b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset) + if err != nil { + return 0, err + } + } else { + b.rc = io.NewSectionReader(bytes.NewReader(b.data), streamOffset, b.tillOffset-streamOffset) } } + if offset != b.currOffset { // Can never happen unless there are programmer bugs return 0, errUnexpected @@ -150,10 +159,11 @@ func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) { } // Returns streaming bitrot reader implementation. -func newStreamingBitrotReader(disk StorageAPI, volume, filePath string, tillOffset int64, algo BitrotAlgorithm, shardSize int64) *streamingBitrotReader { +func newStreamingBitrotReader(disk StorageAPI, data []byte, volume, filePath string, tillOffset int64, algo BitrotAlgorithm, shardSize int64) *streamingBitrotReader { h := algo.New() return &streamingBitrotReader{ disk, + data, nil, volume, filePath, diff --git a/cmd/bitrot.go b/cmd/bitrot.go index cddc17a96..24f7ecd39 100644 --- a/cmd/bitrot.go +++ b/cmd/bitrot.go @@ -103,9 +103,9 @@ func newBitrotWriter(disk StorageAPI, volume, filePath string, length int64, alg return newWholeBitrotWriter(disk, volume, filePath, algo, shardSize) } -func newBitrotReader(disk StorageAPI, bucket string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) io.ReaderAt { +func newBitrotReader(disk StorageAPI, data []byte, bucket string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) io.ReaderAt { if algo == HighwayHash256S { - return newStreamingBitrotReader(disk, bucket, filePath, tillOffset, algo, shardSize) + return newStreamingBitrotReader(disk, data, bucket, filePath, tillOffset, algo, shardSize) } return newWholeBitrotReader(disk, bucket, filePath, algo, tillOffset, sum) } diff --git a/cmd/bitrot_test.go b/cmd/bitrot_test.go index 0990c4919..17cfc3813 100644 --- a/cmd/bitrot_test.go +++ b/cmd/bitrot_test.go @@ -62,7 +62,7 @@ func testBitrotReaderWriterAlgo(t *testing.T, bitrotAlgo BitrotAlgorithm) { } writer.(io.Closer).Close() - reader := newBitrotReader(disk, volume, filePath, 35, bitrotAlgo, bitrotWriterSum(writer), 10) + reader := newBitrotReader(disk, nil, volume, filePath, 35, bitrotAlgo, bitrotWriterSum(writer), 10) b := make([]byte, 10) if _, err = reader.ReadAt(b, 0); err != nil { log.Fatal(err) diff --git a/cmd/erasure-decode_test.go b/cmd/erasure-decode_test.go index 7797ccced..d622a8b3d 100644 --- a/cmd/erasure-decode_test.go +++ b/cmd/erasure-decode_test.go @@ -134,7 +134,7 @@ func TestErasureDecode(t *testing.T) { } tillOffset := erasure.ShardFileOffset(test.offset, test.length, test.data) - bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) + bitrotReaders[index] = newBitrotReader(disk, nil, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) } writer := bytes.NewBuffer(nil) @@ -164,7 +164,7 @@ func TestErasureDecode(t *testing.T) { continue } tillOffset := erasure.ShardFileOffset(test.offset, test.length, test.data) - bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) + bitrotReaders[index] = newBitrotReader(disk, nil, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) } for j := range disks[:test.offDisks] { if bitrotReaders[j] == nil { @@ -270,7 +270,7 @@ func TestErasureDecodeRandomOffsetLength(t *testing.T) { continue } tillOffset := erasure.ShardFileOffset(offset, readLen, length) - bitrotReaders[index] = newStreamingBitrotReader(disk, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) + bitrotReaders[index] = newStreamingBitrotReader(disk, nil, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) } err = erasure.Decode(context.Background(), buf, bitrotReaders, offset, readLen, length, nil) closeBitrotReaders(bitrotReaders) @@ -332,7 +332,7 @@ func benchmarkErasureDecode(data, parity, dataDown, parityDown int, size int64, continue } tillOffset := erasure.ShardFileOffset(0, size, size) - bitrotReaders[index] = newStreamingBitrotReader(disk, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) + bitrotReaders[index] = newStreamingBitrotReader(disk, nil, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) } if err = erasure.Decode(context.Background(), bytes.NewBuffer(content[:0]), bitrotReaders, 0, size, size, nil); err != nil { panic(err) diff --git a/cmd/erasure-heal_test.go b/cmd/erasure-heal_test.go index 4d1fff044..94d2905e4 100644 --- a/cmd/erasure-heal_test.go +++ b/cmd/erasure-heal_test.go @@ -99,7 +99,7 @@ func TestErasureHeal(t *testing.T) { readers := make([]io.ReaderAt, len(disks)) for i, disk := range disks { shardFilesize := erasure.ShardFileSize(test.size) - readers[i] = newBitrotReader(disk, "testbucket", "testobject", shardFilesize, test.algorithm, bitrotWriterSum(writers[i]), erasure.ShardSize()) + readers[i] = newBitrotReader(disk, nil, "testbucket", "testobject", shardFilesize, test.algorithm, bitrotWriterSum(writers[i]), erasure.ShardSize()) } // setup stale disks for the test case diff --git a/cmd/erasure-healing-common_test.go b/cmd/erasure-healing-common_test.go index 99e912a18..6c9f81c3a 100644 --- a/cmd/erasure-healing-common_test.go +++ b/cmd/erasure-healing-common_test.go @@ -186,7 +186,7 @@ func TestListOnlineDisks(t *testing.T) { t.Fatalf("Failed to putObject %v", err) } - partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") + partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) fi, err := getLatestFileInfo(ctx, partsMetadata, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo %v", err) @@ -287,7 +287,7 @@ func TestDisksWithAllParts(t *testing.T) { t.Fatalf("Failed to putObject %v", err) } - _, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") + _, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) readQuorum := len(erasureDisks) / 2 if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { t.Fatalf("Failed to read xl meta data %v", reducedErr) @@ -295,7 +295,7 @@ func TestDisksWithAllParts(t *testing.T) { // Test that all disks are returned without any failures with // unmodified meta data - partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") + partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) if err != nil { t.Fatalf("Failed to read xl meta data %v", err) } diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 0a8efa17f..31bfa24f6 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -395,7 +395,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s if latestMeta.XLV1 { partPath = pathJoin(object, fmt.Sprintf("part.%d", partNumber)) } - readers[i] = newBitrotReader(disk, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize()) + readers[i] = newBitrotReader(disk, partsMetadata[i].Data, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize()) } writers := make([]io.Writer, len(outDatedDisks)) for i, disk := range outDatedDisks { @@ -811,7 +811,7 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version storageEndpoints := er.getEndpoints() // Read metadata files from all the disks - partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID) + partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false) if isAllNotFound(errs) { err = toObjectErr(errFileNotFound, bucket, object) diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go index d8f0a8ce0..3d9671440 100644 --- a/cmd/erasure-healing_test.go +++ b/cmd/erasure-healing_test.go @@ -67,7 +67,7 @@ func TestHealing(t *testing.T) { } disk := er.getDisks()[0] - fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "") + fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false) if err != nil { t.Fatal(err) } @@ -84,7 +84,7 @@ func TestHealing(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "") + fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false) if err != nil { t.Fatal(err) } @@ -113,7 +113,7 @@ func TestHealing(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "") + fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false) if err != nil { t.Fatal(err) } @@ -214,7 +214,7 @@ func TestHealObjectCorrupted(t *testing.T) { t.Fatalf("Failed to heal object - %v", err) } - fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "") + fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) fi, err := getLatestFileInfo(ctx, fileInfos, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -239,7 +239,7 @@ func TestHealObjectCorrupted(t *testing.T) { t.Errorf("Expected nil but received %v", err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "") + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) nfi, err := getLatestFileInfo(ctx, fileInfos, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -265,7 +265,7 @@ func TestHealObjectCorrupted(t *testing.T) { t.Errorf("Expected nil but received %v", err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "") + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) nfi, err = getLatestFileInfo(ctx, fileInfos, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go index db4aa9186..4e33d04f9 100644 --- a/cmd/erasure-metadata-utils.go +++ b/cmd/erasure-metadata-utils.go @@ -115,7 +115,7 @@ func hashOrder(key string, cardinality int) []int { // Reads all `xl.meta` metadata as a FileInfo slice. // Returns error slice indicating the failed metadata reads. -func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) { +func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, readData bool) ([]FileInfo, []error) { metadataArray := make([]FileInfo, len(disks)) g := errgroup.WithNErrs(len(disks)) @@ -126,7 +126,7 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve if disks[index] == nil { return errDiskNotFound } - metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID) + metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, readData) if err != nil { if !IsErr(err, errFileNotFound, errVolumeNotFound, errFileVersionNotFound, errDiskNotFound) { logger.LogOnceIf(ctx, err, disks[index].String()) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index a4fb8b215..d413041cf 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -46,7 +46,7 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object disks := er.getDisks() // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "") + metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "", false) readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs) if err != nil { @@ -113,7 +113,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto } for _, uploadIDDir := range uploadIDDirs { uploadIDPath := pathJoin(shaDir, uploadIDDir) - fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "") + fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false) if err != nil { continue } @@ -127,7 +127,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto return } for _, tmpDir := range tmpDirs { - fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "") + fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "", false) if err != nil { continue } @@ -181,7 +181,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec if populatedUploadIds.Contains(uploadID) { continue } - fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "") + fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false) if err != nil { return result, toObjectErr(err, bucket, object) } @@ -371,7 +371,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo // Read metadata associated with the object from all disks. partsMetadata, errs = readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, - uploadIDPath, "") + uploadIDPath, "", false) // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) @@ -474,7 +474,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo } // Read metadata again because it might be updated with parallel upload of another part. - partsMetadata, errs = readAllFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "") + partsMetadata, errs = readAllFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false) reducedErr = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) if reducedErr == errErasureWriteQuorum { return pi, toObjectErr(reducedErr, bucket, object) @@ -552,7 +552,7 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u storageDisks := er.getDisks() // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID) + partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID, false) // get Quorum for this object readQuorum, _, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) @@ -600,7 +600,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up storageDisks := er.getDisks() // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "") + partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) @@ -704,7 +704,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str storageDisks := er.getDisks() // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "") + partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) @@ -889,7 +889,7 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "") + partsMetadata, errs := readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "", false) // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs) diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index f413e4354..fde7161ac 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -58,7 +58,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d // Read metadata associated with the object from all disks. storageDisks := er.getDisks() - metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID) + metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, false) // get Quorum for this object readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs) @@ -157,7 +157,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri unlockOnDefer = true } - fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts) + fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, true, opts) if err != nil { return nil, toObjectErr(err, bucket, object) } @@ -298,7 +298,7 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje } checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber) partPath := pathJoin(object, metaArr[index].DataDir, fmt.Sprintf("part.%d", partNumber)) - readers[index] = newBitrotReader(disk, bucket, partPath, tillOffset, + readers[index] = newBitrotReader(disk, metaArr[index].Data, bucket, partPath, tillOffset, checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize()) // Prefer local disks @@ -337,7 +337,7 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje // getObject wrapper for erasure GetObject func (er erasureObjects) getObject(ctx context.Context, bucket, object string, startOffset, length int64, writer io.Writer, opts ObjectOptions) error { - fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts) + fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, true, opts) if err != nil { return toObjectErr(err, bucket, object) } @@ -364,11 +364,11 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin return er.getObjectInfo(ctx, bucket, object, opts) } -func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) { +func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, readData bool, opts ObjectOptions) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) { disks := er.getDisks() // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID) + metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData) readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs) if err != nil { @@ -410,7 +410,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s // getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { - fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts) + fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, false, opts) if err != nil { return objInfo, toObjectErr(err, bucket, object) } @@ -1073,7 +1073,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin disks := er.getDisks() // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID) + metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false) readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs) if err != nil { @@ -1134,7 +1134,7 @@ func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object st disks := er.getDisks() // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID) + metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false) readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs) if err != nil { diff --git a/cmd/erasure-object_test.go b/cmd/erasure-object_test.go index a224489df..ed38b9f26 100644 --- a/cmd/erasure-object_test.go +++ b/cmd/erasure-object_test.go @@ -339,10 +339,17 @@ func TestGetObjectNoQuorum(t *testing.T) { return erasureDisks } z.serverPools[0].erasureDisksMu.Unlock() - // Fetch object from store. - err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts) - if err != toObjectErr(errErasureReadQuorum, bucket, object) { - t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err) + switch f { + case 0, 2: + // Fetch object from store. + err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts) + if err != toObjectErr(errErasureReadQuorum, bucket, object) { + t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err) + } + case 1: + if err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts); err != nil { + t.Errorf("Expected GetObject to succeed, but failed with %v", err) + } } } @@ -521,7 +528,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "") + parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "", false) parts1SC := globalStorageClass @@ -534,7 +541,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "") + parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "", false) parts2SC := globalStorageClass // Object for test case 3 - No StorageClass defined, MetaData in PutObject requesting Standard Storage Class @@ -546,7 +553,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "") + parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "", false) parts3SC := globalStorageClass // Object for test case 4 - Standard StorageClass defined as Parity 6, MetaData in PutObject requesting Standard Storage Class @@ -564,7 +571,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "") + parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "", false) parts4SC := storageclass.Config{ Standard: storageclass.StorageClass{ Parity: 6, @@ -587,7 +594,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "") + parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "", false) parts5SC := storageclass.Config{ RRS: storageclass.StorageClass{ Parity: 2, @@ -609,7 +616,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "") + parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "", false) parts6SC := storageclass.Config{ RRS: storageclass.StorageClass{ Parity: 2, @@ -632,7 +639,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "") + parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "", false) parts7SC := storageclass.Config{ Standard: storageclass.StorageClass{ Parity: 5, diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 6fea4e5de..6490edd76 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -388,7 +388,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt continue } - _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "") + _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false) if err != nil { time.Sleep(retryDelay) retries++ @@ -397,7 +397,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt } // Read metadata associated with the object from all disks. - fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}) + fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), false, ObjectOptions{}) if err != nil { switch toObjectErr(err, minioMetaBucket, o.objectPath(0)).(type) { case ObjectNotFound: @@ -463,7 +463,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt continue } - _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "") + _, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false) if err != nil { time.Sleep(retryDelay) retries++ @@ -471,7 +471,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt } } // Load first part metadata... - fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}) + fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), false, ObjectOptions{}) if err != nil { time.Sleep(retryDelay) retries++ diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 3d7d913fc..df3a1e2f3 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -252,11 +252,11 @@ func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi return d.disk.DeleteVersion(ctx, volume, path, fi) } -func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { if err := d.calcError(); err != nil { return FileInfo{}, err } - return d.disk.ReadVersion(ctx, volume, path, versionID) + return d.disk.ReadVersion(ctx, volume, path, versionID, readData) } func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) { diff --git a/cmd/server-main.go b/cmd/server-main.go index bc6eed992..07f12311c 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -40,6 +40,7 @@ import ( "github.com/minio/minio/pkg/certs" "github.com/minio/minio/pkg/color" "github.com/minio/minio/pkg/env" + "github.com/minio/minio/pkg/madmin" ) // ServerFlags - server command specific flags @@ -217,9 +218,7 @@ func newAllSubsystems() { func initServer(ctx context.Context, newObject ObjectLayer) error { // Once the config is fully loaded, initialize the new object layer. - globalObjLayerMutex.Lock() - globalObjectAPI = newObject - globalObjLayerMutex.Unlock() + setObjectLayer(newObject) // Make sure to hold lock for entire migration to avoid // such that only one server should migrate the entire config @@ -319,6 +318,15 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) { return fmt.Errorf("Unable to list buckets to heal: %w", err) } + if globalIsErasure { + logger.Info(fmt.Sprintf("Verifying %d buckets are consistent across drives...", len(buckets))) + for _, bucket := range buckets { + if _, err = newObject.HealBucket(ctx, bucket.Name, madmin.HealOpts{}); err != nil { + return fmt.Errorf("Unable to list buckets to heal: %w", err) + } + } + } + // Initialize config system. if err = globalConfigSys.Init(newObject); err != nil { if errors.Is(err, errDiskNotFound) || @@ -433,9 +441,7 @@ func serverMain(ctx *cli.Context) { globalHTTPServerErrorCh <- httpServer.Start() }() - globalObjLayerMutex.Lock() - globalHTTPServer = httpServer - globalObjLayerMutex.Unlock() + setHTTPServer(httpServer) if globalIsDistErasure && globalEndpoints.FirstLocal() { for { @@ -489,9 +495,7 @@ func serverMain(ctx *cli.Context) { cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig) logger.FatalIf(err, "Unable to initialize disk caching") - globalObjLayerMutex.Lock() - globalCacheObjectAPI = cacheAPI - globalObjLayerMutex.Unlock() + setCacheObjectLayer(cacheAPI) } // Initialize users credentials and policies in background right after config has initialized. diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index 2e75d87b4..c9af3382f 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -24,6 +24,8 @@ import ( // DiskInfo is an extended type which returns current // disk usage per path. +//msgp:tuple DiskInfo +// The above means that any added/deleted fields are incompatible. type DiskInfo struct { Total uint64 Free uint64 @@ -42,6 +44,8 @@ type DiskInfo struct { type VolsInfo []VolInfo // VolInfo - represents volume stat information. +//msgp:tuple VolInfo +// The above means that any added/deleted fields are incompatible. type VolInfo struct { // Name of the volume. Name string @@ -147,6 +151,9 @@ type FileInfo struct { MarkDeleted bool // mark this version as deleted DeleteMarkerReplicationStatus string VersionPurgeStatus VersionPurgeStatusType + + // Data (actual data for the object) + Data []byte } // VersionPurgeStatusKey denotes purge status in metadata diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index 48404402b..4dd1e686a 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -8,205 +8,129 @@ import ( // DecodeMsg implements msgp.Decodable func (z *DiskInfo) DecodeMsg(dc *msgp.Reader) (err error) { - var field []byte - _ = field var zb0001 uint32 - zb0001, err = dc.ReadMapHeader() + zb0001, err = dc.ReadArrayHeader() if err != nil { err = msgp.WrapError(err) return } - for zb0001 > 0 { - zb0001-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "Total": - z.Total, err = dc.ReadUint64() - if err != nil { - err = msgp.WrapError(err, "Total") - return - } - case "Free": - z.Free, err = dc.ReadUint64() - if err != nil { - err = msgp.WrapError(err, "Free") - return - } - case "Used": - z.Used, err = dc.ReadUint64() - if err != nil { - err = msgp.WrapError(err, "Used") - return - } - case "UsedInodes": - z.UsedInodes, err = dc.ReadUint64() - if err != nil { - err = msgp.WrapError(err, "UsedInodes") - return - } - case "FSType": - z.FSType, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "FSType") - return - } - case "RootDisk": - z.RootDisk, err = dc.ReadBool() - if err != nil { - err = msgp.WrapError(err, "RootDisk") - return - } - case "Healing": - z.Healing, err = dc.ReadBool() - if err != nil { - err = msgp.WrapError(err, "Healing") - return - } - case "Endpoint": - z.Endpoint, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "Endpoint") - return - } - case "MountPath": - z.MountPath, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "MountPath") - return - } - case "ID": - z.ID, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "ID") - return - } - case "Error": - z.Error, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "Error") - return - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err) - return - } - } + if zb0001 != 11 { + err = msgp.ArrayError{Wanted: 11, Got: zb0001} + return } - return -} - -// EncodeMsg implements msgp.Encodable -func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 11 - // write "Total" - err = en.Append(0x8b, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) + z.Total, err = dc.ReadUint64() if err != nil { + err = msgp.WrapError(err, "Total") return } - err = en.WriteUint64(z.Total) + z.Free, err = dc.ReadUint64() if err != nil { - err = msgp.WrapError(err, "Total") + err = msgp.WrapError(err, "Free") return } - // write "Free" - err = en.Append(0xa4, 0x46, 0x72, 0x65, 0x65) + z.Used, err = dc.ReadUint64() if err != nil { + err = msgp.WrapError(err, "Used") return } - err = en.WriteUint64(z.Free) + z.UsedInodes, err = dc.ReadUint64() if err != nil { - err = msgp.WrapError(err, "Free") + err = msgp.WrapError(err, "UsedInodes") return } - // write "Used" - err = en.Append(0xa4, 0x55, 0x73, 0x65, 0x64) + z.FSType, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "FSType") return } - err = en.WriteUint64(z.Used) + z.RootDisk, err = dc.ReadBool() if err != nil { - err = msgp.WrapError(err, "Used") + err = msgp.WrapError(err, "RootDisk") return } - // write "UsedInodes" - err = en.Append(0xaa, 0x55, 0x73, 0x65, 0x64, 0x49, 0x6e, 0x6f, 0x64, 0x65, 0x73) + z.Healing, err = dc.ReadBool() if err != nil { + err = msgp.WrapError(err, "Healing") return } - err = en.WriteUint64(z.UsedInodes) + z.Endpoint, err = dc.ReadString() if err != nil { - err = msgp.WrapError(err, "UsedInodes") + err = msgp.WrapError(err, "Endpoint") return } - // write "FSType" - err = en.Append(0xa6, 0x46, 0x53, 0x54, 0x79, 0x70, 0x65) + z.MountPath, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "MountPath") return } - err = en.WriteString(z.FSType) + z.ID, err = dc.ReadString() if err != nil { - err = msgp.WrapError(err, "FSType") + err = msgp.WrapError(err, "ID") return } - // write "RootDisk" - err = en.Append(0xa8, 0x52, 0x6f, 0x6f, 0x74, 0x44, 0x69, 0x73, 0x6b) + z.Error, err = dc.ReadString() if err != nil { + err = msgp.WrapError(err, "Error") return } - err = en.WriteBool(z.RootDisk) + return +} + +// EncodeMsg implements msgp.Encodable +func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) { + // array header, size 11 + err = en.Append(0x9b) if err != nil { - err = msgp.WrapError(err, "RootDisk") return } - // write "Healing" - err = en.Append(0xa7, 0x48, 0x65, 0x61, 0x6c, 0x69, 0x6e, 0x67) + err = en.WriteUint64(z.Total) if err != nil { + err = msgp.WrapError(err, "Total") return } - err = en.WriteBool(z.Healing) + err = en.WriteUint64(z.Free) if err != nil { - err = msgp.WrapError(err, "Healing") + err = msgp.WrapError(err, "Free") return } - // write "Endpoint" - err = en.Append(0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74) + err = en.WriteUint64(z.Used) if err != nil { + err = msgp.WrapError(err, "Used") return } - err = en.WriteString(z.Endpoint) + err = en.WriteUint64(z.UsedInodes) if err != nil { - err = msgp.WrapError(err, "Endpoint") + err = msgp.WrapError(err, "UsedInodes") return } - // write "MountPath" - err = en.Append(0xa9, 0x4d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68) + err = en.WriteString(z.FSType) if err != nil { + err = msgp.WrapError(err, "FSType") return } - err = en.WriteString(z.MountPath) + err = en.WriteBool(z.RootDisk) if err != nil { - err = msgp.WrapError(err, "MountPath") + err = msgp.WrapError(err, "RootDisk") return } - // write "ID" - err = en.Append(0xa2, 0x49, 0x44) + err = en.WriteBool(z.Healing) if err != nil { + err = msgp.WrapError(err, "Healing") return } - err = en.WriteString(z.ID) + err = en.WriteString(z.Endpoint) if err != nil { - err = msgp.WrapError(err, "ID") + err = msgp.WrapError(err, "Endpoint") return } - // write "Error" - err = en.Append(0xa5, 0x45, 0x72, 0x72, 0x6f, 0x72) + err = en.WriteString(z.MountPath) if err != nil { + err = msgp.WrapError(err, "MountPath") + return + } + err = en.WriteString(z.ID) + if err != nil { + err = msgp.WrapError(err, "ID") return } err = en.WriteString(z.Error) @@ -220,134 +144,88 @@ func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *DiskInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 11 - // string "Total" - o = append(o, 0x8b, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) + // array header, size 11 + o = append(o, 0x9b) o = msgp.AppendUint64(o, z.Total) - // string "Free" - o = append(o, 0xa4, 0x46, 0x72, 0x65, 0x65) o = msgp.AppendUint64(o, z.Free) - // string "Used" - o = append(o, 0xa4, 0x55, 0x73, 0x65, 0x64) o = msgp.AppendUint64(o, z.Used) - // string "UsedInodes" - o = append(o, 0xaa, 0x55, 0x73, 0x65, 0x64, 0x49, 0x6e, 0x6f, 0x64, 0x65, 0x73) o = msgp.AppendUint64(o, z.UsedInodes) - // string "FSType" - o = append(o, 0xa6, 0x46, 0x53, 0x54, 0x79, 0x70, 0x65) o = msgp.AppendString(o, z.FSType) - // string "RootDisk" - o = append(o, 0xa8, 0x52, 0x6f, 0x6f, 0x74, 0x44, 0x69, 0x73, 0x6b) o = msgp.AppendBool(o, z.RootDisk) - // string "Healing" - o = append(o, 0xa7, 0x48, 0x65, 0x61, 0x6c, 0x69, 0x6e, 0x67) o = msgp.AppendBool(o, z.Healing) - // string "Endpoint" - o = append(o, 0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74) o = msgp.AppendString(o, z.Endpoint) - // string "MountPath" - o = append(o, 0xa9, 0x4d, 0x6f, 0x75, 0x6e, 0x74, 0x50, 0x61, 0x74, 0x68) o = msgp.AppendString(o, z.MountPath) - // string "ID" - o = append(o, 0xa2, 0x49, 0x44) o = msgp.AppendString(o, z.ID) - // string "Error" - o = append(o, 0xa5, 0x45, 0x72, 0x72, 0x6f, 0x72) o = msgp.AppendString(o, z.Error) return } // UnmarshalMsg implements msgp.Unmarshaler func (z *DiskInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { - var field []byte - _ = field var zb0001 uint32 - zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { err = msgp.WrapError(err) return } - for zb0001 > 0 { - zb0001-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "Total": - z.Total, bts, err = msgp.ReadUint64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "Total") - return - } - case "Free": - z.Free, bts, err = msgp.ReadUint64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "Free") - return - } - case "Used": - z.Used, bts, err = msgp.ReadUint64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "Used") - return - } - case "UsedInodes": - z.UsedInodes, bts, err = msgp.ReadUint64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "UsedInodes") - return - } - case "FSType": - z.FSType, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "FSType") - return - } - case "RootDisk": - z.RootDisk, bts, err = msgp.ReadBoolBytes(bts) - if err != nil { - err = msgp.WrapError(err, "RootDisk") - return - } - case "Healing": - z.Healing, bts, err = msgp.ReadBoolBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Healing") - return - } - case "Endpoint": - z.Endpoint, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Endpoint") - return - } - case "MountPath": - z.MountPath, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "MountPath") - return - } - case "ID": - z.ID, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "ID") - return - } - case "Error": - z.Error, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Error") - return - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - } + if zb0001 != 11 { + err = msgp.ArrayError{Wanted: 11, Got: zb0001} + return + } + z.Total, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Total") + return + } + z.Free, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Free") + return + } + z.Used, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Used") + return + } + z.UsedInodes, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "UsedInodes") + return + } + z.FSType, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "FSType") + return + } + z.RootDisk, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "RootDisk") + return + } + z.Healing, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Healing") + return + } + z.Endpoint, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Endpoint") + return + } + z.MountPath, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "MountPath") + return + } + z.ID, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ID") + return + } + z.Error, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Error") + return } o = bts return @@ -355,7 +233,7 @@ func (z *DiskInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *DiskInfo) Msgsize() (s int) { - s = 1 + 6 + msgp.Uint64Size + 5 + msgp.Uint64Size + 5 + msgp.Uint64Size + 11 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.FSType) + 9 + msgp.BoolSize + 8 + msgp.BoolSize + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 10 + msgp.StringPrefixSize + len(z.MountPath) + 3 + msgp.StringPrefixSize + len(z.ID) + 6 + msgp.StringPrefixSize + len(z.Error) + s = 1 + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.StringPrefixSize + len(z.FSType) + msgp.BoolSize + msgp.BoolSize + msgp.StringPrefixSize + len(z.Endpoint) + msgp.StringPrefixSize + len(z.MountPath) + msgp.StringPrefixSize + len(z.ID) + msgp.StringPrefixSize + len(z.Error) return } @@ -367,8 +245,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err) return } - if zb0001 != 17 { - err = msgp.ArrayError{Wanted: 17, Got: zb0001} + if zb0001 != 18 { + err = msgp.ArrayError{Wanted: 18, Got: zb0001} return } z.Volume, err = dc.ReadString() @@ -497,13 +375,18 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) { } z.VersionPurgeStatus = VersionPurgeStatusType(zb0004) } + z.Data, err = dc.ReadBytes(z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } return } // EncodeMsg implements msgp.Encodable func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { - // array header, size 17 - err = en.Append(0xdc, 0x0, 0x11) + // array header, size 18 + err = en.Append(0xdc, 0x0, 0x12) if err != nil { return } @@ -611,14 +494,19 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "VersionPurgeStatus") return } + err = en.WriteBytes(z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // array header, size 17 - o = append(o, 0xdc, 0x0, 0x11) + // array header, size 18 + o = append(o, 0xdc, 0x0, 0x12) o = msgp.AppendString(o, z.Volume) o = msgp.AppendString(o, z.Name) o = msgp.AppendString(o, z.VersionID) @@ -651,6 +539,7 @@ func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendBool(o, z.MarkDeleted) o = msgp.AppendString(o, z.DeleteMarkerReplicationStatus) o = msgp.AppendString(o, string(z.VersionPurgeStatus)) + o = msgp.AppendBytes(o, z.Data) return } @@ -662,8 +551,8 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err) return } - if zb0001 != 17 { - err = msgp.ArrayError{Wanted: 17, Got: zb0001} + if zb0001 != 18 { + err = msgp.ArrayError{Wanted: 18, Got: zb0001} return } z.Volume, bts, err = msgp.ReadStringBytes(bts) @@ -792,6 +681,11 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { } z.VersionPurgeStatus = VersionPurgeStatusType(zb0004) } + z.Data, bts, err = msgp.ReadBytesBytes(bts, z.Data) + if err != nil { + err = msgp.WrapError(err, "Data") + return + } o = bts return } @@ -809,7 +703,7 @@ func (z *FileInfo) Msgsize() (s int) { for za0003 := range z.Parts { s += z.Parts[za0003].Msgsize() } - s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus)) + s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus)) + msgp.BytesPrefixSize + len(z.Data) return } @@ -1432,50 +1326,33 @@ func (z VersionPurgeStatusType) Msgsize() (s int) { // DecodeMsg implements msgp.Decodable func (z *VolInfo) DecodeMsg(dc *msgp.Reader) (err error) { - var field []byte - _ = field var zb0001 uint32 - zb0001, err = dc.ReadMapHeader() + zb0001, err = dc.ReadArrayHeader() if err != nil { err = msgp.WrapError(err) return } - for zb0001 > 0 { - zb0001-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "Name": - z.Name, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "Name") - return - } - case "Created": - z.Created, err = dc.ReadTime() - if err != nil { - err = msgp.WrapError(err, "Created") - return - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err) - return - } - } + if zb0001 != 2 { + err = msgp.ArrayError{Wanted: 2, Got: zb0001} + return + } + z.Name, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Name") + return + } + z.Created, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "Created") + return } return } // EncodeMsg implements msgp.Encodable func (z VolInfo) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 2 - // write "Name" - err = en.Append(0x82, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + // array header, size 2 + err = en.Append(0x92) if err != nil { return } @@ -1484,11 +1361,6 @@ func (z VolInfo) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Name") return } - // write "Created" - err = en.Append(0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64) - if err != nil { - return - } err = en.WriteTime(z.Created) if err != nil { err = msgp.WrapError(err, "Created") @@ -1500,53 +1372,34 @@ func (z VolInfo) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z VolInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 2 - // string "Name" - o = append(o, 0x82, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + // array header, size 2 + o = append(o, 0x92) o = msgp.AppendString(o, z.Name) - // string "Created" - o = append(o, 0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64) o = msgp.AppendTime(o, z.Created) return } // UnmarshalMsg implements msgp.Unmarshaler func (z *VolInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { - var field []byte - _ = field var zb0001 uint32 - zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { err = msgp.WrapError(err) return } - for zb0001 > 0 { - zb0001-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "Name": - z.Name, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Name") - return - } - case "Created": - z.Created, bts, err = msgp.ReadTimeBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Created") - return - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - } + if zb0001 != 2 { + err = msgp.ArrayError{Wanted: 2, Got: zb0001} + return + } + z.Name, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Name") + return + } + z.Created, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Created") + return } o = bts return @@ -1554,7 +1407,7 @@ func (z *VolInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z VolInfo) Msgsize() (s int) { - s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + s = 1 + msgp.StringPrefixSize + len(z.Name) + msgp.TimeSize return } @@ -1572,41 +1425,25 @@ func (z *VolsInfo) DecodeMsg(dc *msgp.Reader) (err error) { (*z) = make(VolsInfo, zb0002) } for zb0001 := range *z { - var field []byte - _ = field var zb0003 uint32 - zb0003, err = dc.ReadMapHeader() + zb0003, err = dc.ReadArrayHeader() if err != nil { err = msgp.WrapError(err, zb0001) return } - for zb0003 > 0 { - zb0003-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err, zb0001) - return - } - switch msgp.UnsafeString(field) { - case "Name": - (*z)[zb0001].Name, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, zb0001, "Name") - return - } - case "Created": - (*z)[zb0001].Created, err = dc.ReadTime() - if err != nil { - err = msgp.WrapError(err, zb0001, "Created") - return - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err, zb0001) - return - } - } + if zb0003 != 2 { + err = msgp.ArrayError{Wanted: 2, Got: zb0003} + return + } + (*z)[zb0001].Name, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, zb0001, "Name") + return + } + (*z)[zb0001].Created, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, zb0001, "Created") + return } } return @@ -1620,9 +1457,8 @@ func (z VolsInfo) EncodeMsg(en *msgp.Writer) (err error) { return } for zb0004 := range z { - // map header, size 2 - // write "Name" - err = en.Append(0x82, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + // array header, size 2 + err = en.Append(0x92) if err != nil { return } @@ -1631,11 +1467,6 @@ func (z VolsInfo) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, zb0004, "Name") return } - // write "Created" - err = en.Append(0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64) - if err != nil { - return - } err = en.WriteTime(z[zb0004].Created) if err != nil { err = msgp.WrapError(err, zb0004, "Created") @@ -1650,12 +1481,9 @@ func (z VolsInfo) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) o = msgp.AppendArrayHeader(o, uint32(len(z))) for zb0004 := range z { - // map header, size 2 - // string "Name" - o = append(o, 0x82, 0xa4, 0x4e, 0x61, 0x6d, 0x65) + // array header, size 2 + o = append(o, 0x92) o = msgp.AppendString(o, z[zb0004].Name) - // string "Created" - o = append(o, 0xa7, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64) o = msgp.AppendTime(o, z[zb0004].Created) } return @@ -1675,41 +1503,25 @@ func (z *VolsInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { (*z) = make(VolsInfo, zb0002) } for zb0001 := range *z { - var field []byte - _ = field var zb0003 uint32 - zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) + zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { err = msgp.WrapError(err, zb0001) return } - for zb0003 > 0 { - zb0003-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err, zb0001) - return - } - switch msgp.UnsafeString(field) { - case "Name": - (*z)[zb0001].Name, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, zb0001, "Name") - return - } - case "Created": - (*z)[zb0001].Created, bts, err = msgp.ReadTimeBytes(bts) - if err != nil { - err = msgp.WrapError(err, zb0001, "Created") - return - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err, zb0001) - return - } - } + if zb0003 != 2 { + err = msgp.ArrayError{Wanted: 2, Got: zb0003} + return + } + (*z)[zb0001].Name, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, zb0001, "Name") + return + } + (*z)[zb0001].Created, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, zb0001, "Created") + return } } o = bts @@ -1720,7 +1532,7 @@ func (z *VolsInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { func (z VolsInfo) Msgsize() (s int) { s = msgp.ArrayHeaderSize for zb0004 := range z { - s += 1 + 5 + msgp.StringPrefixSize + len(z[zb0004].Name) + 8 + msgp.TimeSize + s += 1 + msgp.StringPrefixSize + len(z[zb0004].Name) + msgp.TimeSize } return } diff --git a/cmd/storage-datatypes_test.go b/cmd/storage-datatypes_test.go index 2dca7d00c..3b94013fc 100644 --- a/cmd/storage-datatypes_test.go +++ b/cmd/storage-datatypes_test.go @@ -21,10 +21,32 @@ import ( "encoding/gob" "io/ioutil" "testing" + "time" "github.com/tinylib/msgp/msgp" ) +func BenchmarkDecodeVolInfoMsgp(b *testing.B) { + v := VolInfo{ + Name: "uuid", + Created: time.Now(), + } + var buf bytes.Buffer + msgp.Encode(&buf, &v) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.Log("Size:", buf.Len(), "bytes") + b.SetBytes(1) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func BenchmarkDecodeDiskInfoMsgp(b *testing.B) { v := DiskInfo{ Total: 1000, @@ -40,10 +62,10 @@ func BenchmarkDecodeDiskInfoMsgp(b *testing.B) { } var buf bytes.Buffer msgp.Encode(&buf, &v) - b.SetBytes(1) rd := msgp.NewEndlessReader(buf.Bytes(), b) dc := msgp.NewReader(rd) b.Log("Size:", buf.Len(), "bytes") + b.SetBytes(1) b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { @@ -139,10 +161,10 @@ func BenchmarkDecodeFileInfoMsgp(b *testing.B) { v := FileInfo{Volume: "testbucket", Name: "src/compress/zlib/reader_test.go", VersionID: "", IsLatest: true, Deleted: false, DataDir: "5e0153cc-621a-4267-8cb6-4919140d53b3", XLV1: false, ModTime: UTCNow(), Size: 3430, Mode: 0x0, Metadata: map[string]string{"X-Minio-Internal-Server-Side-Encryption-Iv": "jIJPsrkkVYYMvc7edBrNl+7zcM7+ZwXqMb/YAjBO/ck=", "X-Minio-Internal-Server-Side-Encryption-S3-Kms-Key-Id": "my-minio-key", "X-Minio-Internal-Server-Side-Encryption-S3-Kms-Sealed-Key": "IAAfAP2p7ZLv3UpLwBnsKkF2mtWba0qoY42tymK0szRgGvAxBNcXyHXYooe9dQpeeEJWgKUa/8R61oCy1mFwIg==", "X-Minio-Internal-Server-Side-Encryption-S3-Sealed-Key": "IAAfAPFYRDkHVirJBJxBixNj3PLWt78dFuUTyTLIdLG820J7XqLPBO4gpEEEWw/DoTsJIb+apnaem+rKtQ1h3Q==", "X-Minio-Internal-Server-Side-Encryption-Seal-Algorithm": "DAREv2-HMAC-SHA256", "content-type": "application/octet-stream", "etag": "20000f00e2c3709dc94905c6ce31e1cadbd1c064e14acdcd44cf0ac2db777eeedd88d639fcd64de16851ade8b21a9a1a"}, Parts: []ObjectPartInfo{{ETag: "", Number: 1, Size: 3430, ActualSize: 3398}}, Erasure: ErasureInfo{Algorithm: "reedsolomon", DataBlocks: 2, ParityBlocks: 2, BlockSize: 10485760, Index: 3, Distribution: []int{3, 4, 1, 2}, Checksums: []ChecksumInfo{{PartNumber: 1, Algorithm: 0x3, Hash: []uint8{}}}}} var buf bytes.Buffer msgp.Encode(&buf, &v) - b.SetBytes(1) rd := msgp.NewEndlessReader(buf.Bytes(), b) dc := msgp.NewReader(rd) b.Log("Size:", buf.Len(), "bytes") + b.SetBytes(1) b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 1d92bee25..8df8308e3 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -58,7 +58,7 @@ type StorageAPI interface { DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error - ReadVersion(ctx context.Context, volume, path, versionID string) (FileInfo, error) + ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (FileInfo, error) RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error // File operations. diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index ece9c387c..cc546ad51 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -398,11 +398,12 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP return err } -func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) values.Set(storageRESTVersionID, versionID) + values.Set(storageRESTReadData, strconv.FormatBool(readData)) respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1) if err != nil { diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index bc93275c1..79564dad8 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - storageRESTVersion = "v22" // Add dir listing and recursive delete operation. + storageRESTVersion = "v23" // Add small file optimization storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -59,6 +59,7 @@ const ( storageRESTDirPath = "dir-path" storageRESTFilePath = "file-path" storageRESTVersionID = "version-id" + storageRESTReadData = "read-data" storageRESTTotalVersions = "total-versions" storageRESTSrcVolume = "source-volume" storageRESTSrcPath = "source-path" diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 6cd3d1aa1..5ef5532b0 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -327,7 +327,12 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re volume := vars[storageRESTVolume] filePath := vars[storageRESTFilePath] versionID := vars[storageRESTVersionID] - fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID) + readData, err := strconv.ParseBool(vars[storageRESTReadData]) + if err != nil { + s.writeErrorResponse(w, err) + return + } + fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, readData) if err != nil { s.writeErrorResponse(w, err) return @@ -1037,7 +1042,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)). - Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID)...) + Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID, storageRESTReadData)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)). Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDataDir, storageRESTDstVolume, storageRESTDstPath)...) diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 7ae615ae5..646e2596d 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -272,12 +272,12 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s return p.storage.WriteMetadata(ctx, volume, path, fi) } -func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { if err = p.checkDiskStale(); err != nil { return fi, err } - return p.storage.ReadVersion(ctx, volume, path, versionID) + return p.storage.ReadVersion(ctx, volume, path, versionID, readData) } func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) { diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 909f7140b..cccda8eb1 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -35,7 +35,6 @@ import ( "strings" "sync" "sync/atomic" - "syscall" "time" "github.com/dustin/go-humanize" @@ -1123,7 +1122,14 @@ func (s *xlStorage) renameLegacyMetadata(volume, path string) error { } // ReadVersion - reads metadata and returns FileInfo at path `xl.meta` -func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +// for all objects less than `128KiB` this call returns data as well +// along with metadata. +func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { + volumeDir, err := s.getVolDir(volume) + if err != nil { + return fi, err + } + buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile)) if err != nil { if err == errFileNotFound { @@ -1158,7 +1164,62 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str return fi, errFileNotFound } - return getFileInfo(buf, volume, path, versionID) + fi, err = getFileInfo(buf, volume, path, versionID) + if err != nil { + return fi, err + } + + if readData { + // Reading data for small objects when + // - object has not yet transitioned + // - object size lesser than 32KiB + // - object has maximum of 1 parts + if fi.TransitionStatus == "" && fi.DataDir != "" && fi.Size < 32*humanize.KiByte && len(fi.Parts) == 1 { + fi.Data, err = s.readAllData(volumeDir, pathJoin(volumeDir, path, fi.DataDir, fmt.Sprintf("part.%d", fi.Parts[0].Number))) + if err != nil { + return fi, err + } + } + } + + return fi, nil +} + +func (s *xlStorage) readAllData(volumeDir, filePath string) (buf []byte, err error) { + var file *os.File + if globalStorageClass.GetDMA() == storageclass.DMAReadWrite { + file, err = disk.OpenFileDirectIO(filePath, os.O_RDONLY, 0666) + } else { + file, err = os.Open(filePath) + } + if err != nil { + if osIsNotExist(err) { + // Check if the object doesn't exist because its bucket + // is missing in order to return the correct error. + _, err = os.Stat(volumeDir) + if err != nil && osIsNotExist(err) { + return nil, errVolumeNotFound + } + return nil, errFileNotFound + } else if osIsPermission(err) { + return nil, errFileAccessDenied + } else if isSysErrNotDir(err) || isSysErrIsDir(err) { + return nil, errFileNotFound + } else if isSysErrHandleInvalid(err) { + // This case is special and needs to be handled for windows. + return nil, errFileNotFound + } else if isSysErrIO(err) { + return nil, errFaultyDisk + } else if isSysErrTooManyFiles(err) { + return nil, errTooManyOpenFiles + } else if isSysErrInvalidArg(err) { + return nil, errFileNotFound + } + return nil, err + } + defer file.Close() + + return ioutil.ReadAll(file) } // ReadAll reads from r until an error or EOF and returns the data it read. @@ -1184,7 +1245,6 @@ func (s *xlStorage) ReadAll(ctx context.Context, volume string, path string) (bu return nil, err } - // Open the file for reading. buf, err = ioutil.ReadFile(filePath) if err != nil { if osIsNotExist(err) { @@ -1197,7 +1257,7 @@ func (s *xlStorage) ReadAll(ctx context.Context, volume string, path string) (bu return nil, errFileNotFound } else if osIsPermission(err) { return nil, errFileAccessDenied - } else if errors.Is(err, syscall.ENOTDIR) || errors.Is(err, syscall.EISDIR) { + } else if isSysErrNotDir(err) || isSysErrIsDir(err) { return nil, errFileNotFound } else if isSysErrHandleInvalid(err) { // This case is special and needs to be handled for windows. @@ -1206,10 +1266,12 @@ func (s *xlStorage) ReadAll(ctx context.Context, volume string, path string) (bu return nil, errFaultyDisk } else if isSysErrTooManyFiles(err) { return nil, errTooManyOpenFiles + } else if isSysErrInvalidArg(err) { + return nil, errFileNotFound } - return nil, err } + return buf, nil } @@ -1330,6 +1392,7 @@ func (s *xlStorage) openFile(volume, path string, mode int) (f *os.File, err err if err != nil { return nil, err } + // Stat a volume entry. _, err = os.Stat(volumeDir) if err != nil {