diff --git a/cmd/erasure-decode.go b/cmd/erasure-decode.go
index 14861cea2..53f2d6eb8 100644
--- a/cmd/erasure-decode.go
+++ b/cmd/erasure-decode.go
@@ -33,6 +33,7 @@ type parallelReader struct {
 	readers       []io.ReaderAt
 	orgReaders    []io.ReaderAt
 	dataBlocks    int
+	errs          []error
 	offset        int64
 	shardSize     int64
 	shardFileSize int64
@@ -49,6 +50,7 @@ func newParallelReader(readers []io.ReaderAt, e Erasure, offset, totalLength int
 	return &parallelReader{
 		readers:       readers,
 		orgReaders:    readers,
+		errs:          make([]error, len(readers)),
 		dataBlocks:    e.dataBlocks,
 		offset:        (offset / e.blockSize) * e.ShardSize(),
 		shardSize:     e.ShardSize(),
@@ -169,6 +171,8 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
 				// This will be communicated upstream.
 				p.orgReaders[bufIdx] = nil
 				p.readers[i] = nil
+				p.errs[i] = err
+
 				// Since ReadAt returned error, trigger another read.
 				readTriggerCh <- true
 				return
@@ -191,7 +195,7 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
 		return newBuf, nil
 	}
 
-	return nil, errErasureReadQuorum
+	return nil, reduceReadQuorumErrs(context.Background(), p.errs, objectOpIgnoredErrs, p.dataBlocks)
 }
 
 type errDecodeHealRequired struct {
diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go
index ef768c6fa..d8f0a8ce0 100644
--- a/cmd/erasure-healing_test.go
+++ b/cmd/erasure-healing_test.go
@@ -67,7 +67,7 @@ func TestHealing(t *testing.T) {
 	}
 
 	disk := er.getDisks()[0]
-	fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
+	fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -84,7 +84,7 @@ func TestHealing(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
+	fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -113,7 +113,7 @@ func TestHealing(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
+	fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "")
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go
index bce748921..db4aa9186 100644
--- a/cmd/erasure-metadata-utils.go
+++ b/cmd/erasure-metadata-utils.go
@@ -113,21 +113,9 @@ func hashOrder(key string, cardinality int) []int {
 	return nums
 }
 
-// Reads all `xl.meta` metadata as a FileInfo slice and checks if the data dir exists as well,
-// otherwise returns errFileNotFound (or errFileVersionNotFound)
-func getAllObjectFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) {
-	return readVersionFromDisks(ctx, disks, bucket, object, versionID, true)
-}
-
 // Reads all `xl.meta` metadata as a FileInfo slice.
 // Returns error slice indicating the failed metadata reads.
 func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) {
-	return readVersionFromDisks(ctx, disks, bucket, object, versionID, false)
-}
-
-// Reads all `xl.meta` metadata as a FileInfo slice and checks if the data dir
-// exists as well, if checkDataDir is set to true.
-func readVersionFromDisks(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, checkDataDir bool) ([]FileInfo, []error) {
 	metadataArray := make([]FileInfo, len(disks))
 
 	g := errgroup.WithNErrs(len(disks))
@@ -138,7 +126,7 @@ func readVersionFromDisks(ctx context.Context, disks []StorageAPI, bucket, objec
 		if disks[index] == nil {
 			return errDiskNotFound
 		}
-		metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, checkDataDir)
+		metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID)
 		if err != nil {
 			if !IsErr(err, errFileNotFound, errVolumeNotFound, errFileVersionNotFound, errDiskNotFound) {
 				logger.LogOnceIf(ctx, err, disks[index].String())
diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index cc56e7e95..a4fb8b215 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -113,7 +113,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
 	}
 	for _, uploadIDDir := range uploadIDDirs {
 		uploadIDPath := pathJoin(shaDir, uploadIDDir)
-		fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false)
+		fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "")
 		if err != nil {
 			continue
 		}
@@ -127,7 +127,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
 			return
 		}
 		for _, tmpDir := range tmpDirs {
-			fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "", false)
+			fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "")
 			if err != nil {
 				continue
 			}
@@ -181,7 +181,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
 		if populatedUploadIds.Contains(uploadID) {
 			continue
 		}
-		fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false)
+		fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "")
 		if err != nil {
 			return result, toObjectErr(err, bucket, object)
 		}
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index 64e887a08..f413e4354 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -368,7 +368,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
 	disks := er.getDisks()
 
 	// Read metadata associated with the object from all disks.
-	metaArr, errs := getAllObjectFileInfo(ctx, disks, bucket, object, opts.VersionID)
+	metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)
 
 	readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
 	if err != nil {
diff --git a/cmd/erasure-object_test.go b/cmd/erasure-object_test.go
index 2016b843f..a224489df 100644
--- a/cmd/erasure-object_test.go
+++ b/cmd/erasure-object_test.go
@@ -288,16 +288,38 @@ func TestGetObjectNoQuorum(t *testing.T) {
 	bucket := "bucket"
 	object := "object"
 	opts := ObjectOptions{}
-	// Create "object" under "bucket".
+
+	// Test use case 1: All disks are online, xl.meta are present, but data are missing
 	_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	// Make 9 disks offline, which leaves less than quorum number of disks
+	for _, disk := range xl.getDisks() {
+		files, _ := disk.ListDir(ctx, bucket, object, -1)
+		for _, file := range files {
+			if file != "xl.meta" {
+				disk.Delete(ctx, bucket, pathJoin(object, file), true)
+			}
+		}
+	}
+
+	err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts)
+	if err != toObjectErr(errFileNotFound, bucket, object) {
+		t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
+	}
+
+	// Test use case 2: Make 9 disks offline, which leaves less than quorum number of disks
 	// in a 16 disk Erasure setup. The original disks are 'replaced' with
 	// naughtyDisks that fail after 'f' successful StorageAPI method
 	// invocations, where f - [0,2)
+
+	// Create "object" under "bucket".
+	_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	for f := 0; f < 2; f++ {
 		diskErrors := make(map[int]error)
 		for i := 0; i <= f; i++ {
@@ -320,9 +342,84 @@ func TestGetObjectNoQuorum(t *testing.T) {
 		// Fetch object from store.
 		err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts)
 		if err != toObjectErr(errErasureReadQuorum, bucket, object) {
-			t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
+			t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
+		}
+	}
+
+}
+
+func TestHeadObjectNoQuorum(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Create an instance of xl backend.
+	obj, fsDirs, err := prepareErasure16(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Cleanup backend directories.
+	defer obj.Shutdown(context.Background())
+	defer removeRoots(fsDirs)
+
+	z := obj.(*erasureServerPools)
+	xl := z.serverPools[0].sets[0]
+
+	// Create "bucket"
+	err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	bucket := "bucket"
+	object := "object"
+	opts := ObjectOptions{}
+
+	// Test use case 1: All disks are online, xl.meta are present, but data are missing
+	_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	for _, disk := range xl.getDisks() {
+		files, _ := disk.ListDir(ctx, bucket, object, -1)
+		for _, file := range files {
+			if file != "xl.meta" {
+				disk.Delete(ctx, bucket, pathJoin(object, file), true)
+			}
 		}
 	}
+
+	_, err = xl.GetObjectInfo(ctx, bucket, object, opts)
+	if err != nil {
+		t.Errorf("Expected StatObject to succeed if data dir are not found, but failed with %v", err)
+	}
+
+	// Test use case 2: Make 9 disks offline, which leaves less than quorum number of disks
+	// in a 16 disk Erasure setup. The original disks are 'replaced' with
+	// naughtyDisks that fail after 'f' successful StorageAPI method
+	// invocations, where f - [0,2)
+
+	// Create "object" under "bucket".
+	_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	erasureDisks := xl.getDisks()
+	for i := range erasureDisks[:10] {
+		erasureDisks[i] = nil
+	}
+
+	z.serverPools[0].erasureDisksMu.Lock()
+	xl.getDisks = func() []StorageAPI {
+		return erasureDisks
+	}
+	z.serverPools[0].erasureDisksMu.Unlock()
+
+	// Fetch object from store.
+	_, err = xl.GetObjectInfo(ctx, bucket, object, opts)
+	if err != toObjectErr(errErasureReadQuorum, bucket, object) {
+		t.Errorf("Expected getObjectInfo to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
+	}
 }
 
 func TestPutObjectNoQuorum(t *testing.T) {
diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go
index b6f28b0bd..6fea4e5de 100644
--- a/cmd/metacache-set.go
+++ b/cmd/metacache-set.go
@@ -388,7 +388,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
 			continue
 		}
 
-		_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false)
+		_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "")
 		if err != nil {
 			time.Sleep(retryDelay)
 			retries++
@@ -463,7 +463,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
 			continue
 		}
 
-		_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false)
+		_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "")
 		if err != nil {
 			time.Sleep(retryDelay)
 			retries++
diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go
index 95681c1b8..3d7d913fc 100644
--- a/cmd/naughty-disk_test.go
+++ b/cmd/naughty-disk_test.go
@@ -252,11 +252,11 @@ func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi
 	return d.disk.DeleteVersion(ctx, volume, path, fi)
 }
 
-func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
+func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
 	if err := d.calcError(); err != nil {
 		return FileInfo{}, err
 	}
-	return d.disk.ReadVersion(ctx, volume, path, versionID, checkDataDir)
+	return d.disk.ReadVersion(ctx, volume, path, versionID)
 }
 
 func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) {
diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go
index 00d527a37..1d92bee25 100644
--- a/cmd/storage-interface.go
+++ b/cmd/storage-interface.go
@@ -58,7 +58,7 @@ type StorageAPI interface {
 	DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error
 	DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error
 	WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error
-	ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (FileInfo, error)
+	ReadVersion(ctx context.Context, volume, path, versionID string) (FileInfo, error)
 	RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error
 
 	// File operations.
diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go
index 89cb5bfcb..0006c88b9 100644
--- a/cmd/storage-rest-client.go
+++ b/cmd/storage-rest-client.go
@@ -396,12 +396,11 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
 	return err
 }
 
-func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
+func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
 	values := make(url.Values)
 	values.Set(storageRESTVolume, volume)
 	values.Set(storageRESTFilePath, path)
 	values.Set(storageRESTVersionID, versionID)
-	values.Set(storageRESTCheckDataDir, strconv.FormatBool(checkDataDir))
 
 	respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1)
 	if err != nil {
diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go
index 7ac8bb3c0..bc93275c1 100644
--- a/cmd/storage-rest-common.go
+++ b/cmd/storage-rest-common.go
@@ -59,7 +59,6 @@ const (
 	storageRESTDirPath       = "dir-path"
 	storageRESTFilePath      = "file-path"
 	storageRESTVersionID     = "version-id"
-	storageRESTCheckDataDir  = "check-data-dir"
 	storageRESTTotalVersions = "total-versions"
 	storageRESTSrcVolume     = "source-volume"
 	storageRESTSrcPath       = "source-path"
diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go
index d199e9587..6cd3d1aa1 100644
--- a/cmd/storage-rest-server.go
+++ b/cmd/storage-rest-server.go
@@ -327,13 +327,7 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re
 	volume := vars[storageRESTVolume]
 	filePath := vars[storageRESTFilePath]
 	versionID := vars[storageRESTVersionID]
-	checkDataDir, err := strconv.ParseBool(vars[storageRESTCheckDataDir])
-	if err != nil {
-		s.writeErrorResponse(w, err)
-		return
-	}
-
-	fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, checkDataDir)
+	fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID)
 	if err != nil {
 		s.writeErrorResponse(w, err)
 		return
@@ -1043,7 +1037,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
 	subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)).
 		Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
 	subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)).
-		Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID, storageRESTCheckDataDir)...)
+		Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID)...)
 	subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)).
 		Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDataDir, storageRESTDstVolume, storageRESTDstPath)...)
diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go
index 9f36554ec..7ae615ae5 100644
--- a/cmd/xl-storage-disk-id-check.go
+++ b/cmd/xl-storage-disk-id-check.go
@@ -272,12 +272,12 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s
 	return p.storage.WriteMetadata(ctx, volume, path, fi)
 }
 
-func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
+func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
 	if err = p.checkDiskStale(); err != nil {
 		return fi, err
 	}
 
-	return p.storage.ReadVersion(ctx, volume, path, versionID, checkDataDir)
+	return p.storage.ReadVersion(ctx, volume, path, versionID)
 }
 
 func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index bcb995d18..6a5819cee 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -1123,7 +1123,7 @@ func (s *xlStorage) renameLegacyMetadata(volume, path string) error {
 }
 
 // ReadVersion - reads metadata and returns FileInfo at path `xl.meta`
-func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
+func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
 	buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
 	if err != nil {
 		if err == errFileNotFound {
@@ -1158,28 +1158,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
 		return fi, errFileNotFound
 	}
 
-	fi, err = getFileInfo(buf, volume, path, versionID)
-	if err != nil {
-		return fi, err
-	}
-	// skip checking data dir when object is transitioned - after transition, data dir will
-	// be empty with just the metadata left behind.
-	if fi.TransitionStatus != "" {
-		checkDataDir = false
-	}
-	if fi.DataDir != "" && checkDataDir {
-		if _, err = s.StatVol(ctx, pathJoin(volume, path, fi.DataDir, slashSeparator)); err != nil {
-			if err == errVolumeNotFound {
-				if versionID != "" {
-					return fi, errFileVersionNotFound
-				}
-				return fi, errFileNotFound
-			}
-			return fi, err
-		}
-	}
-
-	return fi, nil
+	return getFileInfo(buf, volume, path, versionID)
 }
 
 // ReadAll reads from r until an error or EOF and returns the data it read.
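Note: the patch removes the checkDataDir probe from xlStorage.ReadVersion entirely, and getObjectFileInfo now goes through the plain readAllFileInfo path. A caller that still wants to verify that an object's data directory exists on disk can reproduce the deleted check outside of ReadVersion. The sketch below is not part of this patch; the helper name checkDataDirExists is hypothetical, but its body mirrors the code removed from cmd/xl-storage.go above.

// checkDataDirExists is a hypothetical helper (not introduced by this patch) that
// reproduces the data-dir probe previously embedded in xlStorage.ReadVersion.
// It assumes the xlStorage receiver, FileInfo fields and error values seen in this diff.
func (s *xlStorage) checkDataDirExists(ctx context.Context, volume, path, versionID string, fi FileInfo) error {
	// Transitioned objects keep only metadata behind, so skip the probe,
	// exactly as the removed code did.
	if fi.TransitionStatus != "" || fi.DataDir == "" {
		return nil
	}
	if _, err := s.StatVol(ctx, pathJoin(volume, path, fi.DataDir, slashSeparator)); err != nil {
		if err == errVolumeNotFound {
			if versionID != "" {
				return errFileVersionNotFound
			}
			return errFileNotFound
		}
		return err
	}
	return nil
}

A caller would invoke it after a successful read, e.g. fi, err := s.ReadVersion(ctx, volume, path, versionID) followed by err = s.checkDataDirExists(ctx, volume, path, versionID, fi), restoring the old checkDataDir behaviour only where it is still wanted.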