From 30135eed86f470407123775f3c11ecd3e991337b Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 5 Feb 2019 17:58:48 -0800 Subject: [PATCH] Redo how to handle stale dangling files (#7171) foo.CORRUPTED should never be created because when multiple sets are involved we would hash the file to wrong a location, this PR removes the code. But allows DeleteBucket() to work properly to delete dangling buckets/objects. Also adds another option to Healing where a user needs to specify `--remove` such that all dangling objects will be deleted with user confirmation. --- cmd/admin-heal-ops.go | 6 +- cmd/dummy-object-layer_test.go | 4 +- cmd/fs-v1.go | 4 +- cmd/fs-v1_test.go | 2 +- cmd/gateway-unsupported.go | 4 +- cmd/object-api-interface.go | 4 +- cmd/xl-sets.go | 8 +-- cmd/xl-v1-bucket.go | 40 +++++++++-- cmd/xl-v1-common.go | 49 ++++++++++---- cmd/xl-v1-healing.go | 56 ++++++++++++++-- cmd/xl-v1-healing_test.go | 4 +- cmd/xl-v1-object.go | 118 +-------------------------------- cmd/xl-v1-object_test.go | 6 +- pkg/madmin/heal-commands.go | 1 + 14 files changed, 144 insertions(+), 162 deletions(-) diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index d09b37724..df3cc1458 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -596,7 +596,7 @@ func (h *healSequence) healConfig() error { return errHealStopSignalled } o := objectInfos.Objects[index] - res, herr := objectAPI.HealObject(h.ctx, o.Bucket, o.Name, h.settings.DryRun) + res, herr := objectAPI.HealObject(h.ctx, o.Bucket, o.Name, h.settings.DryRun, h.settings.Remove) // Object might have been deleted, by the time heal // was attempted we ignore this file an move on. if isErrObjectNotFound(herr) { @@ -692,7 +692,7 @@ func (h *healSequence) healBucket(bucket string) error { return errServerNotInitialized } - results, err := objectAPI.HealBucket(h.ctx, bucket, h.settings.DryRun) + results, err := objectAPI.HealBucket(h.ctx, bucket, h.settings.DryRun, h.settings.Remove) // push any available results before checking for error for _, result := range results { if perr := h.pushHealResultItem(result); perr != nil { @@ -775,7 +775,7 @@ func (h *healSequence) healObject(bucket, object string) error { return errServerNotInitialized } - hri, err := objectAPI.HealObject(h.ctx, bucket, object, h.settings.DryRun) + hri, err := objectAPI.HealObject(h.ctx, bucket, object, h.settings.DryRun, h.settings.Remove) if isErrObjectNotFound(err) { return nil } diff --git a/cmd/dummy-object-layer_test.go b/cmd/dummy-object-layer_test.go index dfa9748bf..869431b6f 100644 --- a/cmd/dummy-object-layer_test.go +++ b/cmd/dummy-object-layer_test.go @@ -119,11 +119,11 @@ func (api *DummyObjectLayer) HealFormat(ctx context.Context, dryRun bool) (item return } -func (api *DummyObjectLayer) HealBucket(ctx context.Context, bucket string, dryRun bool) (items []madmin.HealResultItem, err error) { +func (api *DummyObjectLayer) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (items []madmin.HealResultItem, err error) { return } -func (api *DummyObjectLayer) HealObject(ctx context.Context, bucket, object string, dryRun bool) (item madmin.HealResultItem, err error) { +func (api *DummyObjectLayer) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool) (item madmin.HealResultItem, err error) { return } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index ce2f11e4b..a27c3e5bb 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -1245,14 +1245,14 @@ func (fs *FSObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealRe } // HealObject - no-op for fs. Valid only for XL. -func (fs *FSObjects) HealObject(ctx context.Context, bucket, object string, dryRun bool) ( +func (fs *FSObjects) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool) ( res madmin.HealResultItem, err error) { logger.LogIf(ctx, NotImplemented{}) return res, NotImplemented{} } // HealBucket - no-op for fs, Valid only for XL. -func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun bool) ([]madmin.HealResultItem, +func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) ([]madmin.HealResultItem, error) { logger.LogIf(ctx, NotImplemented{}) return nil, NotImplemented{} diff --git a/cmd/fs-v1_test.go b/cmd/fs-v1_test.go index 02ef650d5..a23ebb983 100644 --- a/cmd/fs-v1_test.go +++ b/cmd/fs-v1_test.go @@ -390,7 +390,7 @@ func TestFSHealObject(t *testing.T) { defer os.RemoveAll(disk) obj := initFSObjects(disk, t) - _, err := obj.HealObject(context.Background(), "bucket", "object", false) + _, err := obj.HealObject(context.Background(), "bucket", "object", false, false) if err == nil || !isSameType(err, NotImplemented{}) { t.Fatalf("Heal Object should return NotImplemented error ") } diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 74031d11f..cabd8eeb7 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -92,7 +92,7 @@ func (a GatewayUnsupported) HealFormat(ctx context.Context, dryRun bool) (madmin } // HealBucket - Not implemented stub -func (a GatewayUnsupported) HealBucket(ctx context.Context, bucket string, dryRun bool) ([]madmin.HealResultItem, error) { +func (a GatewayUnsupported) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) ([]madmin.HealResultItem, error) { return nil, NotImplemented{} } @@ -102,7 +102,7 @@ func (a GatewayUnsupported) ListBucketsHeal(ctx context.Context) (buckets []Buck } // HealObject - Not implemented stub -func (a GatewayUnsupported) HealObject(ctx context.Context, bucket, object string, dryRun bool) (h madmin.HealResultItem, e error) { +func (a GatewayUnsupported) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool) (h madmin.HealResultItem, e error) { return h, NotImplemented{} } diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 6c685a02d..45945b1e0 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -82,8 +82,8 @@ type ObjectLayer interface { // Healing operations. ReloadFormat(ctx context.Context, dryRun bool) error HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) - HealBucket(ctx context.Context, bucket string, dryRun bool) ([]madmin.HealResultItem, error) - HealObject(ctx context.Context, bucket, object string, dryRun bool) (madmin.HealResultItem, error) + HealBucket(ctx context.Context, bucket string, dryRun, remove bool) ([]madmin.HealResultItem, error) + HealObject(ctx context.Context, bucket, object string, dryRun, remove bool) (madmin.HealResultItem, error) ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error) ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 2c9216236..c779224ee 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -1182,7 +1182,7 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealRe } // HealBucket - heals inconsistent buckets and bucket metadata on all sets. -func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun bool) (results []madmin.HealResultItem, err error) { +func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (results []madmin.HealResultItem, err error) { bucketLock := globalNSMutex.NewNSLock(bucket, "") if err := bucketLock.GetLock(globalHealingTimeout); err != nil { return nil, err @@ -1199,7 +1199,7 @@ func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun bool) (re for _, s := range s.sets { var setResults []madmin.HealResultItem - setResults, _ = s.HealBucket(ctx, bucket, dryRun) + setResults, _ = s.HealBucket(ctx, bucket, dryRun, remove) for _, setResult := range setResults { if setResult.Type == madmin.HealItemBucket { for _, v := range setResult.Before.Drives { @@ -1267,8 +1267,8 @@ func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun bool) (re } // HealObject - heals inconsistent object on a hashedSet based on object name. -func (s *xlSets) HealObject(ctx context.Context, bucket, object string, dryRun bool) (madmin.HealResultItem, error) { - return s.getHashedSet(object).HealObject(ctx, bucket, object, dryRun) +func (s *xlSets) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool) (madmin.HealResultItem, error) { + return s.getHashedSet(object).HealObject(ctx, bucket, object, dryRun, remove) } // Lists all buckets which need healing. diff --git a/cmd/xl-v1-bucket.go b/cmd/xl-v1-bucket.go index 222ad5cda..2c5cef2e6 100644 --- a/cmd/xl-v1-bucket.go +++ b/cmd/xl-v1-bucket.go @@ -218,6 +218,30 @@ func (xl xlObjects) ListBuckets(ctx context.Context) ([]BucketInfo, error) { return bucketInfos, nil } +// Dangling buckets should be handled appropriately, in this following situation +// we actually have quorum error to be `nil` but we have some disks where +// the bucket delete returned `errVolumeNotEmpty` but this is not correct +// can only happen if there are dangling objects in a bucket. Under such +// a situation we simply attempt a full delete of the bucket including +// the dangling objects. All of this happens under a lock and there +// is no way a user can create buckets and sneak in objects into namespace, +// so it is safer to do. +func deleteDanglingBucket(ctx context.Context, storageDisks []StorageAPI, dErrs []error, bucket string) { + for index, err := range dErrs { + if err == errVolumeNotEmpty { + // Attempt to delete bucket again. + if derr := storageDisks[index].DeleteVol(bucket); derr == errVolumeNotEmpty { + _ = cleanupDir(ctx, storageDisks[index], bucket, "") + + _ = storageDisks[index].DeleteVol(bucket) + + // Cleanup all the previously incomplete multiparts. + _ = cleanupDir(ctx, storageDisks[index], minioMetaMultipartBucket, bucket) + } + } + } +} + // DeleteBucket - deletes a bucket. func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error { bucketLock := xl.nsMutex.NewNSLock(bucket, "") @@ -231,7 +255,8 @@ func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error { var dErrs = make([]error, len(xl.getDisks())) // Remove a volume entry on all underlying storage disks. - for index, disk := range xl.getDisks() { + storageDisks := xl.getDisks() + for index, disk := range storageDisks { if disk == nil { dErrs[index] = errDiskNotFound continue @@ -242,18 +267,14 @@ func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error { defer wg.Done() // Attempt to delete bucket. err := disk.DeleteVol(bucket) - if err != nil { dErrs[index] = err return } + // Cleanup all the previously incomplete multiparts. err = cleanupDir(ctx, disk, minioMetaMultipartBucket, bucket) - - if err != nil { - if err == errVolumeNotFound { - return - } + if err != nil && err != errVolumeNotFound { dErrs[index] = err } }(index, disk) @@ -271,6 +292,11 @@ func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error { return toObjectErr(err, bucket) } + // If we reduce quorum to nil, means we have deleted buckets properly + // on some servers in quorum, we should look for volumeNotEmpty errors + // and delete those buckets as well. + deleteDanglingBucket(ctx, storageDisks, dErrs, bucket) + return nil } diff --git a/cmd/xl-v1-common.go b/cmd/xl-v1-common.go index 0b8ec4d25..35b9eafa9 100644 --- a/cmd/xl-v1-common.go +++ b/cmd/xl-v1-common.go @@ -19,6 +19,7 @@ package cmd import ( "context" "path" + "sync" "github.com/minio/minio/cmd/logger" ) @@ -75,26 +76,46 @@ func (xl xlObjects) isObject(bucket, prefix string) (ok bool) { // isObjectDir returns if the specified path represents an empty directory. func (xl xlObjects) isObjectDir(bucket, prefix string) (ok bool) { - for _, disk := range xl.getLoadBalancedDisks() { + var objectDirs = make([]bool, len(xl.getDisks())) + var errs = make([]error, len(xl.getDisks())) + var wg sync.WaitGroup + for index, disk := range xl.getDisks() { if disk == nil { continue } - // Check if 'prefix' is an object on this 'disk', else continue the check the next disk - ctnts, err := disk.ListDir(bucket, prefix, 1) - if err == nil { - if len(ctnts) == 0 { - return true + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + // Check if 'prefix' is an object on this 'disk', else continue the check the next disk + entries, err := disk.ListDir(bucket, prefix, 1) + if err != nil { + errs[index] = err + return } - return false - } - // Ignore for file not found, disk not found or faulty disk. - if IsErrIgnored(err, xlTreeWalkIgnoredErrs...) { - continue - } + objectDirs[index] = len(entries) == 0 + }(index, disk) + } + + wg.Wait() + + readQuorum := len(xl.getDisks()) / 2 + + err := reduceReadQuorumErrs(context.Background(), errs, xlTreeWalkIgnoredErrs, readQuorum) + if err != nil { reqInfo := &logger.ReqInfo{BucketName: bucket} reqInfo.AppendTags("prefix", prefix) ctx := logger.SetReqInfo(context.Background(), reqInfo) logger.LogIf(ctx, err) - } // Exhausted all disks - return false. - return false + return false + } + + for _, ok := range objectDirs { + if !ok { + // We perhaps found some entries in a prefix. + return false + } + } + + // Return true if and only if the all directories are empty. + return true } diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index 3ea0ead9a..48316a70e 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -41,7 +41,7 @@ func (xl xlObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealRes // Heals a bucket if it doesn't exist on one of the disks, additionally // also heals the missing entries for bucket metadata files // `policy.json, notification.xml, listeners.json`. -func (xl xlObjects) HealBucket(ctx context.Context, bucket string, dryRun bool) ( +func (xl xlObjects) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) ( results []madmin.HealResultItem, err error) { storageDisks := xl.getDisks() @@ -58,7 +58,7 @@ func (xl xlObjects) HealBucket(ctx context.Context, bucket string, dryRun bool) results = append(results, result) // Proceed to heal bucket metadata. - metaResults, err := healBucketMetadata(xl, bucket, dryRun) + metaResults, err := healBucketMetadata(xl, bucket, dryRun, remove) results = append(results, metaResults...) return results, err } @@ -159,13 +159,13 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w // Heals all the metadata associated for a given bucket, this function // heals `policy.json`, `notification.xml` and `listeners.json`. -func healBucketMetadata(xl xlObjects, bucket string, dryRun bool) ( +func healBucketMetadata(xl xlObjects, bucket string, dryRun, remove bool) ( results []madmin.HealResultItem, err error) { healBucketMetaFn := func(metaPath string) error { reqInfo := &logger.ReqInfo{BucketName: bucket} ctx := logger.SetReqInfo(context.Background(), reqInfo) - result, healErr := xl.HealObject(ctx, minioMetaBucket, metaPath, dryRun) + result, healErr := xl.HealObject(ctx, minioMetaBucket, metaPath, dryRun, remove) // If object is not found, skip the file. if isErrObjectNotFound(healErr) { return nil @@ -639,12 +639,45 @@ func defaultHealResult(storageDisks []StorageAPI, errs []error, bucket, object s return result } +// Object is considered dangling/corrupted if any only +// if total disks - a combination of corrupted and missing +// files is lesser than number of data blocks. +func (xl xlObjects) isObjectDangling(metaArr []xlMetaV1, errs []error) (validMeta xlMetaV1, ok bool) { + // We can consider an object data not reliable + // when xl.json is not found in read quorum disks. + // or when xl.json is not readable in read quorum disks. + var notFoundXLJSON, corruptedXLJSON int + for _, readErr := range errs { + if readErr == errFileNotFound { + notFoundXLJSON++ + } else if readErr == errCorruptedFormat { + corruptedXLJSON++ + } + } + + for _, m := range metaArr { + if !m.IsValid() { + continue + } + validMeta = m + break + } + + // We couldn't find any valid meta we are indeed corrupted, return true right away. + if validMeta.Erasure.DataBlocks == 0 { + return validMeta, true + } + + // We have valid meta, now verify if we have enough files with data blocks. + return validMeta, (len(xl.getDisks()) - corruptedXLJSON - notFoundXLJSON) < validMeta.Erasure.DataBlocks +} + // HealObject - heal the given object. // // FIXME: If an object object was deleted and one disk was down, // and later the disk comes back up again, heal on the object // should delete it. -func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRun bool) (hr madmin.HealResultItem, err error) { +func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRun bool, remove bool) (hr madmin.HealResultItem, err error) { // Create context that also contains information about the object and bucket. // The top level handler might not have this information. reqInfo := logger.GetReqInfo(ctx) @@ -667,6 +700,19 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRu // Read metadata files from all the disks partsMetadata, errs := readAllXLMetadata(healCtx, storageDisks, bucket, object) + // Check if the object is dangling, if yes and user requested + // remove we simply delete it from namespace. + if m, ok := xl.isObjectDangling(partsMetadata, errs); ok { + writeQuorum := m.Erasure.DataBlocks + 1 + if m.Erasure.DataBlocks == 0 { + writeQuorum = len(xl.getDisks())/2 + 1 + } + if !dryRun && remove { + err = xl.deleteObject(healCtx, bucket, object, writeQuorum, false) + } + return defaultHealResult(storageDisks, errs, bucket, object), err + } + latestXLMeta, err := getLatestXLMeta(healCtx, partsMetadata, errs) if err != nil { return defaultHealResult(storageDisks, errs, bucket, object), toObjectErr(err, bucket, object) diff --git a/cmd/xl-v1-healing_test.go b/cmd/xl-v1-healing_test.go index 3c305db12..fafedd75b 100644 --- a/cmd/xl-v1-healing_test.go +++ b/cmd/xl-v1-healing_test.go @@ -114,7 +114,7 @@ func TestHealObjectXL(t *testing.T) { t.Fatalf("Failed to delete a file - %v", err) } - _, err = obj.HealObject(context.Background(), bucket, object, false) + _, err = obj.HealObject(context.Background(), bucket, object, false, false) if err != nil { t.Fatalf("Failed to heal object - %v", err) } @@ -130,7 +130,7 @@ func TestHealObjectXL(t *testing.T) { } // Try healing now, expect to receive errDiskNotFound. - _, err = obj.HealObject(context.Background(), bucket, object, false) + _, err = obj.HealObject(context.Background(), bucket, object, false, false) // since majority of xl.jsons are not available, object quorum can't be read properly and error will be errXLReadQuorum if _, ok := err.(InsufficientReadQuorum); !ok { t.Errorf("Expected %v but received %v", InsufficientReadQuorum{}, err) diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 6a4d63bf4..f4d6c3a52 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -23,7 +23,6 @@ import ( "net/http" "path" "strconv" - "strings" "sync" "github.com/minio/minio/cmd/logger" @@ -430,100 +429,6 @@ func (xl xlObjects) GetObjectInfo(ctx context.Context, bucket, object string, op return info, nil } -func (xl xlObjects) isObjectCorrupted(metaArr []xlMetaV1, errs []error) (validMeta xlMetaV1, ok bool) { - // We can consider an object data not reliable - // when xl.json is not found in read quorum disks. - var notFoundXLJSON, corruptedXLJSON int - for _, readErr := range errs { - if readErr == errFileNotFound { - notFoundXLJSON++ - } - } - for _, readErr := range errs { - if readErr == errCorruptedFormat { - corruptedXLJSON++ - } - } - - for _, m := range metaArr { - if !m.IsValid() { - continue - } - validMeta = m - break - } - - // Return if the object is indeed corrupted. - return validMeta, len(xl.getDisks())-notFoundXLJSON < validMeta.Erasure.DataBlocks || len(xl.getDisks()) == corruptedXLJSON -} - -const xlCorruptedSuffix = ".CORRUPTED" - -// Renames the corrupted object and makes it visible. -func (xl xlObjects) renameCorruptedObject(ctx context.Context, bucket, object string, validMeta xlMetaV1, disks []StorageAPI, errs []error) { - // if errs returned are corrupted - if validMeta.Erasure.DataBlocks == 0 { - validMeta = newXLMetaV1(object, len(disks)/2, len(disks)/2) - } - writeQuorum := validMeta.Erasure.DataBlocks + 1 - - // Move all existing objects into corrupted suffix. - oldObj := mustGetUUID() - - rename(ctx, disks, bucket, object, minioMetaTmpBucket, oldObj, true, writeQuorum, []error{errFileNotFound}) - - // Delete temporary object in the event of failure. - // If PutObject succeeded there would be no temporary - // object to delete. - defer xl.deleteObject(ctx, minioMetaTmpBucket, oldObj, writeQuorum, false) - - tempObj := mustGetUUID() - - // Get all the disks which do not have the file. - var cdisks = make([]StorageAPI, len(disks)) - for i, merr := range errs { - if merr == errFileNotFound || merr == errCorruptedFormat { - cdisks[i] = disks[i] - } - } - - for _, disk := range cdisks { - if disk == nil { - continue - } - - // Write empty part file on missing disks. - disk.AppendFile(minioMetaTmpBucket, pathJoin(tempObj, "part.1"), []byte{}) - - // Write algorithm hash for empty part file. - var algorithm = DefaultBitrotAlgorithm - h := algorithm.New() - h.Write([]byte{}) - - // Update the checksums and part info. - validMeta.Erasure.Checksums = []ChecksumInfo{ - { - Name: "part.1", - Algorithm: algorithm, - Hash: h.Sum(nil), - }, - } - - validMeta.Parts = []ObjectPartInfo{ - { - Number: 1, - Name: "part.1", - }, - } - - // Write the `xl.json` with the newly calculated metadata. - writeXLMetadata(ctx, disk, minioMetaTmpBucket, tempObj, validMeta) - } - - // Finally rename all the parts into their respective locations. - rename(ctx, cdisks, minioMetaTmpBucket, tempObj, bucket, object+xlCorruptedSuffix, true, writeQuorum, []error{errFileNotFound}) -} - // getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error) { disks := xl.getDisks() @@ -531,26 +436,9 @@ func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string) (o // Read metadata associated with the object from all disks. metaArr, errs := readAllXLMetadata(ctx, disks, bucket, object) - var readQuorum int - // Having read quorum means we have xl.json in at least N/2 disks. - if !strings.HasSuffix(object, xlCorruptedSuffix) { - if validMeta, ok := xl.isObjectCorrupted(metaArr, errs); ok { - xl.renameCorruptedObject(ctx, bucket, object, validMeta, disks, errs) - // Return err file not found since we renamed now the corrupted object - return objInfo, errFileNotFound - } - - // Not a corrupted object, attempt to get readquorum properly. - readQuorum, _, err = objectQuorumFromMeta(ctx, xl, metaArr, errs) - if err != nil { - return objInfo, err - } - - } else { - - // If this is a corrupted object, change read quorum to N/2 disks - // so it will be visible to users, so they can delete it. - readQuorum = len(xl.getDisks()) / 2 + readQuorum, _, err := objectQuorumFromMeta(ctx, xl, metaArr, errs) + if err != nil { + return objInfo, err } // List all the file commit ids from parts metadata. diff --git a/cmd/xl-v1-object_test.go b/cmd/xl-v1-object_test.go index 597285436..889a0e448 100644 --- a/cmd/xl-v1-object_test.go +++ b/cmd/xl-v1-object_test.go @@ -308,7 +308,7 @@ func TestHealing(t *testing.T) { t.Fatal(err) } - _, err = xl.HealObject(context.Background(), bucket, object, false) + _, err = xl.HealObject(context.Background(), bucket, object, false, false) if err != nil { t.Fatal(err) } @@ -337,7 +337,7 @@ func TestHealing(t *testing.T) { t.Fatal(err) } - _, err = xl.HealObject(context.Background(), bucket, object, false) + _, err = xl.HealObject(context.Background(), bucket, object, false, false) if err != nil { t.Fatal(err) } @@ -359,7 +359,7 @@ func TestHealing(t *testing.T) { t.Fatal(err) } // This would create the bucket. - _, err = xl.HealBucket(context.Background(), bucket, false) + _, err = xl.HealBucket(context.Background(), bucket, false, false) if err != nil { t.Fatal(err) } diff --git a/pkg/madmin/heal-commands.go b/pkg/madmin/heal-commands.go index da18c17f6..c348b93fb 100644 --- a/pkg/madmin/heal-commands.go +++ b/pkg/madmin/heal-commands.go @@ -30,6 +30,7 @@ import ( type HealOpts struct { Recursive bool `json:"recursive"` DryRun bool `json:"dryRun"` + Remove bool `json:"remove"` } // HealStartSuccess - holds information about a successfully started