Redo how to handle stale dangling files (#7171)

foo.CORRUPTED should never be created, because when
multiple sets are involved we would hash the file to
the wrong location; this PR removes that code.

Instead, DeleteBucket() is now allowed to work properly
so that dangling buckets/objects can be deleted. This PR
also adds a `--remove` option to healing: when a user
specifies it, all dangling objects are deleted with
user confirmation.
Branch: master
Harshavardhana authored 6 years ago · committed by kannappanr
parent e4081aee62 · commit 30135eed86
1. cmd/admin-heal-ops.go (6 changes)
2. cmd/dummy-object-layer_test.go (4 changes)
3. cmd/fs-v1.go (4 changes)
4. cmd/fs-v1_test.go (2 changes)
5. cmd/gateway-unsupported.go (4 changes)
6. cmd/object-api-interface.go (4 changes)
7. cmd/xl-sets.go (8 changes)
8. cmd/xl-v1-bucket.go (40 changes)
9. cmd/xl-v1-common.go (43 changes)
10. cmd/xl-v1-healing.go (56 changes)
11. cmd/xl-v1-healing_test.go (4 changes)
12. cmd/xl-v1-object.go (114 changes)
13. cmd/xl-v1-object_test.go (6 changes)
14. pkg/madmin/heal-commands.go (1 change)

@@ -596,7 +596,7 @@ func (h *healSequence) healConfig() error {
            return errHealStopSignalled
        }
        o := objectInfos.Objects[index]
-       res, herr := objectAPI.HealObject(h.ctx, o.Bucket, o.Name, h.settings.DryRun)
+       res, herr := objectAPI.HealObject(h.ctx, o.Bucket, o.Name, h.settings.DryRun, h.settings.Remove)
        // Object might have been deleted, by the time heal
        // was attempted we ignore this file an move on.
        if isErrObjectNotFound(herr) {
@@ -692,7 +692,7 @@ func (h *healSequence) healBucket(bucket string) error {
        return errServerNotInitialized
    }
-   results, err := objectAPI.HealBucket(h.ctx, bucket, h.settings.DryRun)
+   results, err := objectAPI.HealBucket(h.ctx, bucket, h.settings.DryRun, h.settings.Remove)
    // push any available results before checking for error
    for _, result := range results {
        if perr := h.pushHealResultItem(result); perr != nil {
@@ -775,7 +775,7 @@ func (h *healSequence) healObject(bucket, object string) error {
        return errServerNotInitialized
    }
-   hri, err := objectAPI.HealObject(h.ctx, bucket, object, h.settings.DryRun)
+   hri, err := objectAPI.HealObject(h.ctx, bucket, object, h.settings.DryRun, h.settings.Remove)
    if isErrObjectNotFound(err) {
        return nil
    }

@@ -119,11 +119,11 @@ func (api *DummyObjectLayer) HealFormat(ctx context.Context, dryRun bool) (item
    return
}

-func (api *DummyObjectLayer) HealBucket(ctx context.Context, bucket string, dryRun bool) (items []madmin.HealResultItem, err error) {
+func (api *DummyObjectLayer) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (items []madmin.HealResultItem, err error) {
    return
}

-func (api *DummyObjectLayer) HealObject(ctx context.Context, bucket, object string, dryRun bool) (item madmin.HealResultItem, err error) {
+func (api *DummyObjectLayer) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool) (item madmin.HealResultItem, err error) {
    return
}

@@ -1245,14 +1245,14 @@ func (fs *FSObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealRe
}

// HealObject - no-op for fs. Valid only for XL.
-func (fs *FSObjects) HealObject(ctx context.Context, bucket, object string, dryRun bool) (
+func (fs *FSObjects) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool) (
    res madmin.HealResultItem, err error) {
    logger.LogIf(ctx, NotImplemented{})
    return res, NotImplemented{}
}

// HealBucket - no-op for fs, Valid only for XL.
-func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun bool) ([]madmin.HealResultItem,
+func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) ([]madmin.HealResultItem,
    error) {
    logger.LogIf(ctx, NotImplemented{})
    return nil, NotImplemented{}

@@ -390,7 +390,7 @@ func TestFSHealObject(t *testing.T) {
    defer os.RemoveAll(disk)

    obj := initFSObjects(disk, t)
-   _, err := obj.HealObject(context.Background(), "bucket", "object", false)
+   _, err := obj.HealObject(context.Background(), "bucket", "object", false, false)
    if err == nil || !isSameType(err, NotImplemented{}) {
        t.Fatalf("Heal Object should return NotImplemented error ")
    }

@@ -92,7 +92,7 @@ func (a GatewayUnsupported) HealFormat(ctx context.Context, dryRun bool) (madmin
}

// HealBucket - Not implemented stub
-func (a GatewayUnsupported) HealBucket(ctx context.Context, bucket string, dryRun bool) ([]madmin.HealResultItem, error) {
+func (a GatewayUnsupported) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) ([]madmin.HealResultItem, error) {
    return nil, NotImplemented{}
}
@@ -102,7 +102,7 @@ func (a GatewayUnsupported) ListBucketsHeal(ctx context.Context) (buckets []Buck
}

// HealObject - Not implemented stub
-func (a GatewayUnsupported) HealObject(ctx context.Context, bucket, object string, dryRun bool) (h madmin.HealResultItem, e error) {
+func (a GatewayUnsupported) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool) (h madmin.HealResultItem, e error) {
    return h, NotImplemented{}
}

@@ -82,8 +82,8 @@ type ObjectLayer interface {
    // Healing operations.
    ReloadFormat(ctx context.Context, dryRun bool) error
    HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error)
-   HealBucket(ctx context.Context, bucket string, dryRun bool) ([]madmin.HealResultItem, error)
-   HealObject(ctx context.Context, bucket, object string, dryRun bool) (madmin.HealResultItem, error)
+   HealBucket(ctx context.Context, bucket string, dryRun, remove bool) ([]madmin.HealResultItem, error)
+   HealObject(ctx context.Context, bucket, object string, dryRun, remove bool) (madmin.HealResultItem, error)
    ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error)
    ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error)

@@ -1182,7 +1182,7 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealRe
}

// HealBucket - heals inconsistent buckets and bucket metadata on all sets.
-func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun bool) (results []madmin.HealResultItem, err error) {
+func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (results []madmin.HealResultItem, err error) {
    bucketLock := globalNSMutex.NewNSLock(bucket, "")
    if err := bucketLock.GetLock(globalHealingTimeout); err != nil {
        return nil, err
@@ -1199,7 +1199,7 @@ func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun bool) (re
    for _, s := range s.sets {
        var setResults []madmin.HealResultItem
-       setResults, _ = s.HealBucket(ctx, bucket, dryRun)
+       setResults, _ = s.HealBucket(ctx, bucket, dryRun, remove)
        for _, setResult := range setResults {
            if setResult.Type == madmin.HealItemBucket {
                for _, v := range setResult.Before.Drives {
@@ -1267,8 +1267,8 @@ func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun bool) (re
}

// HealObject - heals inconsistent object on a hashedSet based on object name.
-func (s *xlSets) HealObject(ctx context.Context, bucket, object string, dryRun bool) (madmin.HealResultItem, error) {
-   return s.getHashedSet(object).HealObject(ctx, bucket, object, dryRun)
+func (s *xlSets) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool) (madmin.HealResultItem, error) {
+   return s.getHashedSet(object).HealObject(ctx, bucket, object, dryRun, remove)
}

// Lists all buckets which need healing.

@@ -218,6 +218,30 @@ func (xl xlObjects) ListBuckets(ctx context.Context) ([]BucketInfo, error) {
    return bucketInfos, nil
}
// Dangling buckets should be handled appropriately, in this following situation
// we actually have quorum error to be `nil` but we have some disks where
// the bucket delete returned `errVolumeNotEmpty` but this is not correct
// can only happen if there are dangling objects in a bucket. Under such
// a situation we simply attempt a full delete of the bucket including
// the dangling objects. All of this happens under a lock and there
// is no way a user can create buckets and sneak in objects into namespace,
// so it is safer to do.
func deleteDanglingBucket(ctx context.Context, storageDisks []StorageAPI, dErrs []error, bucket string) {
for index, err := range dErrs {
if err == errVolumeNotEmpty {
// Attempt to delete bucket again.
if derr := storageDisks[index].DeleteVol(bucket); derr == errVolumeNotEmpty {
_ = cleanupDir(ctx, storageDisks[index], bucket, "")
_ = storageDisks[index].DeleteVol(bucket)
// Cleanup all the previously incomplete multiparts.
_ = cleanupDir(ctx, storageDisks[index], minioMetaMultipartBucket, bucket)
}
}
}
}
// DeleteBucket - deletes a bucket.
func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error {
    bucketLock := xl.nsMutex.NewNSLock(bucket, "")
@@ -231,7 +255,8 @@ func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error {
    var dErrs = make([]error, len(xl.getDisks()))

    // Remove a volume entry on all underlying storage disks.
-   for index, disk := range xl.getDisks() {
+   storageDisks := xl.getDisks()
+   for index, disk := range storageDisks {
        if disk == nil {
            dErrs[index] = errDiskNotFound
            continue
@@ -242,18 +267,14 @@ func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error {
            defer wg.Done()
            // Attempt to delete bucket.
            err := disk.DeleteVol(bucket)
            if err != nil {
                dErrs[index] = err
                return
            }
            // Cleanup all the previously incomplete multiparts.
            err = cleanupDir(ctx, disk, minioMetaMultipartBucket, bucket)
-           if err != nil {
-               if err == errVolumeNotFound {
-                   return
-               }
+           if err != nil && err != errVolumeNotFound {
                dErrs[index] = err
            }
        }(index, disk)
@@ -271,6 +292,11 @@ func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error {
        return toObjectErr(err, bucket)
    }

+   // If we reduce quorum to nil, means we have deleted buckets properly
+   // on some servers in quorum, we should look for volumeNotEmpty errors
+   // and delete those buckets as well.
+   deleteDanglingBucket(ctx, storageDisks, dErrs, bucket)

    return nil
}
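To illustrate the quorum interplay described in the comment above, here is a minimal, self-contained sketch (the numbers and the errVolumeNotEmpty stand-in are assumptions for illustration, not code from this PR) of how DeleteBucket can satisfy write quorum while one disk is still left holding a dangling bucket, which is the case deleteDanglingBucket then sweeps up.

package main

import (
	"errors"
	"fmt"
)

// Stand-in for the unexported storage error of the same name in cmd/.
var errVolumeNotEmpty = errors.New("volume is not empty")

func main() {
	// Hypothetical DeleteVol outcome on a 4-disk XL set: three disks
	// removed the bucket, one still holds a dangling object.
	dErrs := []error{nil, nil, nil, errVolumeNotEmpty}

	writeQuorum := len(dErrs)/2 + 1 // 3
	okCount := 0
	for _, err := range dErrs {
		if err == nil {
			okCount++
		}
	}

	// Quorum is met, so DeleteBucket reports success; the disk that
	// returned errVolumeNotEmpty is then retried by deleteDanglingBucket
	// after its leftover entries are cleaned up.
	fmt.Println("bucket delete satisfies write quorum:", okCount >= writeQuorum)
}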

@@ -19,6 +19,7 @@ package cmd

import (
    "context"
    "path"
+   "sync"

    "github.com/minio/minio/cmd/logger"
)
@@ -75,26 +76,46 @@ func (xl xlObjects) isObject(bucket, prefix string) (ok bool) {

// isObjectDir returns if the specified path represents an empty directory.
func (xl xlObjects) isObjectDir(bucket, prefix string) (ok bool) {
-   for _, disk := range xl.getLoadBalancedDisks() {
+   var objectDirs = make([]bool, len(xl.getDisks()))
+   var errs = make([]error, len(xl.getDisks()))
+   var wg sync.WaitGroup
+   for index, disk := range xl.getDisks() {
        if disk == nil {
            continue
        }
-       // Check if 'prefix' is an object on this 'disk', else continue the check the next disk
-       ctnts, err := disk.ListDir(bucket, prefix, 1)
-       if err == nil {
-           if len(ctnts) == 0 {
-               return true
-           }
-           return false
-       }
-       // Ignore for file not found, disk not found or faulty disk.
-       if IsErrIgnored(err, xlTreeWalkIgnoredErrs...) {
-           continue
-       }
+       wg.Add(1)
+       go func(index int, disk StorageAPI) {
+           defer wg.Done()
+           // Check if 'prefix' is an object on this 'disk', else continue the check the next disk
+           entries, err := disk.ListDir(bucket, prefix, 1)
+           if err != nil {
+               errs[index] = err
+               return
+           }
+           objectDirs[index] = len(entries) == 0
+       }(index, disk)
+   }
+   wg.Wait()
+
+   readQuorum := len(xl.getDisks()) / 2
+   err := reduceReadQuorumErrs(context.Background(), errs, xlTreeWalkIgnoredErrs, readQuorum)
+   if err != nil {
        reqInfo := &logger.ReqInfo{BucketName: bucket}
        reqInfo.AppendTags("prefix", prefix)
        ctx := logger.SetReqInfo(context.Background(), reqInfo)
        logger.LogIf(ctx, err)
-   }
-   // Exhausted all disks - return false.
        return false
    }
+   for _, ok := range objectDirs {
+       if !ok {
+           // We perhaps found some entries in a prefix.
+           return false
+       }
+   }
+   // Return true if and only if the all directories are empty.
+   return true
}
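The consensus step at the end of the new isObjectDir can be shown with a small standalone sketch (the per-disk results below are made up for illustration): once the per-disk ListDir results are gathered and the ignorable errors pass read quorum, the prefix counts as an empty object directory only if every slot in objectDirs is true.

package main

import "fmt"

func main() {
	// Hypothetical per-disk outcome for a prefix on a 4-disk set:
	// true means that disk listed the prefix and saw no entries.
	objectDirs := []bool{true, true, true, false}

	// Mirrors the closing loop of the new isObjectDir: any disk that
	// saw entries (or never set its slot) vetoes the "empty dir" answer.
	isEmptyDir := true
	for _, ok := range objectDirs {
		if !ok {
			isEmptyDir = false
			break
		}
	}
	fmt.Println("empty object dir:", isEmptyDir) // false: one disk saw entries
}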

@@ -41,7 +41,7 @@ func (xl xlObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealRes
// Heals a bucket if it doesn't exist on one of the disks, additionally
// also heals the missing entries for bucket metadata files
// `policy.json, notification.xml, listeners.json`.
-func (xl xlObjects) HealBucket(ctx context.Context, bucket string, dryRun bool) (
+func (xl xlObjects) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (
    results []madmin.HealResultItem, err error) {
    storageDisks := xl.getDisks()
@@ -58,7 +58,7 @@ func (xl xlObjects) HealBucket(ctx context.Context, bucket string, dryRun bool)
    results = append(results, result)

    // Proceed to heal bucket metadata.
-   metaResults, err := healBucketMetadata(xl, bucket, dryRun)
+   metaResults, err := healBucketMetadata(xl, bucket, dryRun, remove)
    results = append(results, metaResults...)
    return results, err
}
@@ -159,13 +159,13 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w

// Heals all the metadata associated for a given bucket, this function
// heals `policy.json`, `notification.xml` and `listeners.json`.
-func healBucketMetadata(xl xlObjects, bucket string, dryRun bool) (
+func healBucketMetadata(xl xlObjects, bucket string, dryRun, remove bool) (
    results []madmin.HealResultItem, err error) {
    healBucketMetaFn := func(metaPath string) error {
        reqInfo := &logger.ReqInfo{BucketName: bucket}
        ctx := logger.SetReqInfo(context.Background(), reqInfo)
-       result, healErr := xl.HealObject(ctx, minioMetaBucket, metaPath, dryRun)
+       result, healErr := xl.HealObject(ctx, minioMetaBucket, metaPath, dryRun, remove)
        // If object is not found, skip the file.
        if isErrObjectNotFound(healErr) {
            return nil
@@ -639,12 +639,45 @@ func defaultHealResult(storageDisks []StorageAPI, errs []error, bucket, object s
    return result
}
// Object is considered dangling/corrupted if any only
// if total disks - a combination of corrupted and missing
// files is lesser than number of data blocks.
func (xl xlObjects) isObjectDangling(metaArr []xlMetaV1, errs []error) (validMeta xlMetaV1, ok bool) {
// We can consider an object data not reliable
// when xl.json is not found in read quorum disks.
// or when xl.json is not readable in read quorum disks.
var notFoundXLJSON, corruptedXLJSON int
for _, readErr := range errs {
if readErr == errFileNotFound {
notFoundXLJSON++
} else if readErr == errCorruptedFormat {
corruptedXLJSON++
}
}
for _, m := range metaArr {
if !m.IsValid() {
continue
}
validMeta = m
break
}
// We couldn't find any valid meta we are indeed corrupted, return true right away.
if validMeta.Erasure.DataBlocks == 0 {
return validMeta, true
}
// We have valid meta, now verify if we have enough files with data blocks.
return validMeta, (len(xl.getDisks()) - corruptedXLJSON - notFoundXLJSON) < validMeta.Erasure.DataBlocks
}
// HealObject - heal the given object.
//
// FIXME: If an object object was deleted and one disk was down,
// and later the disk comes back up again, heal on the object
// should delete it.
-func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRun bool) (hr madmin.HealResultItem, err error) {
+func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRun bool, remove bool) (hr madmin.HealResultItem, err error) {
    // Create context that also contains information about the object and bucket.
    // The top level handler might not have this information.
    reqInfo := logger.GetReqInfo(ctx)
@@ -667,6 +700,19 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRu
    // Read metadata files from all the disks
    partsMetadata, errs := readAllXLMetadata(healCtx, storageDisks, bucket, object)
// Check if the object is dangling, if yes and user requested
// remove we simply delete it from namespace.
if m, ok := xl.isObjectDangling(partsMetadata, errs); ok {
writeQuorum := m.Erasure.DataBlocks + 1
if m.Erasure.DataBlocks == 0 {
writeQuorum = len(xl.getDisks())/2 + 1
}
if !dryRun && remove {
err = xl.deleteObject(healCtx, bucket, object, writeQuorum, false)
}
return defaultHealResult(storageDisks, errs, bucket, object), err
}
    latestXLMeta, err := getLatestXLMeta(healCtx, partsMetadata, errs)
    if err != nil {
        return defaultHealResult(storageDisks, errs, bucket, object), toObjectErr(err, bucket, object)
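A worked example of the isObjectDangling criterion used above, with hypothetical numbers rather than anything taken from this PR: the object is treated as dangling when the disks that still hold a readable xl.json cannot cover the data blocks recorded in the valid metadata.

package main

import "fmt"

func main() {
	// Hypothetical 8-disk set with 4 data and 4 parity blocks.
	totalDisks, dataBlocks := 8, 4

	// Suppose xl.json is missing on three disks and unreadable on two.
	notFoundXLJSON, corruptedXLJSON := 3, 2

	// Mirrors the return expression of isObjectDangling.
	dangling := (totalDisks - corruptedXLJSON - notFoundXLJSON) < dataBlocks
	fmt.Println("dangling:", dangling) // true: 8-2-3 = 3 < 4

	// With remove set and dryRun false, HealObject would then delete the
	// object using writeQuorum = dataBlocks + 1 = 5.
	fmt.Println("write quorum used for the delete:", dataBlocks+1)
}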

@@ -114,7 +114,7 @@ func TestHealObjectXL(t *testing.T) {
        t.Fatalf("Failed to delete a file - %v", err)
    }

-   _, err = obj.HealObject(context.Background(), bucket, object, false)
+   _, err = obj.HealObject(context.Background(), bucket, object, false, false)
    if err != nil {
        t.Fatalf("Failed to heal object - %v", err)
    }
@@ -130,7 +130,7 @@ func TestHealObjectXL(t *testing.T) {
    }

    // Try healing now, expect to receive errDiskNotFound.
-   _, err = obj.HealObject(context.Background(), bucket, object, false)
+   _, err = obj.HealObject(context.Background(), bucket, object, false, false)
    // since majority of xl.jsons are not available, object quorum can't be read properly and error will be errXLReadQuorum
    if _, ok := err.(InsufficientReadQuorum); !ok {
        t.Errorf("Expected %v but received %v", InsufficientReadQuorum{}, err)

@@ -23,7 +23,6 @@ import (
    "net/http"
    "path"
    "strconv"
-   "strings"
    "sync"

    "github.com/minio/minio/cmd/logger"
@@ -430,100 +429,6 @@ func (xl xlObjects) GetObjectInfo(ctx context.Context, bucket, object string, op
    return info, nil
}
func (xl xlObjects) isObjectCorrupted(metaArr []xlMetaV1, errs []error) (validMeta xlMetaV1, ok bool) {
// We can consider an object data not reliable
// when xl.json is not found in read quorum disks.
var notFoundXLJSON, corruptedXLJSON int
for _, readErr := range errs {
if readErr == errFileNotFound {
notFoundXLJSON++
}
}
for _, readErr := range errs {
if readErr == errCorruptedFormat {
corruptedXLJSON++
}
}
for _, m := range metaArr {
if !m.IsValid() {
continue
}
validMeta = m
break
}
// Return if the object is indeed corrupted.
return validMeta, len(xl.getDisks())-notFoundXLJSON < validMeta.Erasure.DataBlocks || len(xl.getDisks()) == corruptedXLJSON
}
const xlCorruptedSuffix = ".CORRUPTED"
// Renames the corrupted object and makes it visible.
func (xl xlObjects) renameCorruptedObject(ctx context.Context, bucket, object string, validMeta xlMetaV1, disks []StorageAPI, errs []error) {
// if errs returned are corrupted
if validMeta.Erasure.DataBlocks == 0 {
validMeta = newXLMetaV1(object, len(disks)/2, len(disks)/2)
}
writeQuorum := validMeta.Erasure.DataBlocks + 1
// Move all existing objects into corrupted suffix.
oldObj := mustGetUUID()
rename(ctx, disks, bucket, object, minioMetaTmpBucket, oldObj, true, writeQuorum, []error{errFileNotFound})
// Delete temporary object in the event of failure.
// If PutObject succeeded there would be no temporary
// object to delete.
defer xl.deleteObject(ctx, minioMetaTmpBucket, oldObj, writeQuorum, false)
tempObj := mustGetUUID()
// Get all the disks which do not have the file.
var cdisks = make([]StorageAPI, len(disks))
for i, merr := range errs {
if merr == errFileNotFound || merr == errCorruptedFormat {
cdisks[i] = disks[i]
}
}
for _, disk := range cdisks {
if disk == nil {
continue
}
// Write empty part file on missing disks.
disk.AppendFile(minioMetaTmpBucket, pathJoin(tempObj, "part.1"), []byte{})
// Write algorithm hash for empty part file.
var algorithm = DefaultBitrotAlgorithm
h := algorithm.New()
h.Write([]byte{})
// Update the checksums and part info.
validMeta.Erasure.Checksums = []ChecksumInfo{
{
Name: "part.1",
Algorithm: algorithm,
Hash: h.Sum(nil),
},
}
validMeta.Parts = []ObjectPartInfo{
{
Number: 1,
Name: "part.1",
},
}
// Write the `xl.json` with the newly calculated metadata.
writeXLMetadata(ctx, disk, minioMetaTmpBucket, tempObj, validMeta)
}
// Finally rename all the parts into their respective locations.
rename(ctx, cdisks, minioMetaTmpBucket, tempObj, bucket, object+xlCorruptedSuffix, true, writeQuorum, []error{errFileNotFound})
}
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error) {
    disks := xl.getDisks()
@@ -531,28 +436,11 @@ func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string) (o

    // Read metadata associated with the object from all disks.
    metaArr, errs := readAllXLMetadata(ctx, disks, bucket, object)

-   var readQuorum int
-   // Having read quorum means we have xl.json in at least N/2 disks.
-   if !strings.HasSuffix(object, xlCorruptedSuffix) {
-       if validMeta, ok := xl.isObjectCorrupted(metaArr, errs); ok {
-           xl.renameCorruptedObject(ctx, bucket, object, validMeta, disks, errs)
-           // Return err file not found since we renamed now the corrupted object
-           return objInfo, errFileNotFound
-       }
-       // Not a corrupted object, attempt to get readquorum properly.
-       readQuorum, _, err = objectQuorumFromMeta(ctx, xl, metaArr, errs)
+   readQuorum, _, err := objectQuorumFromMeta(ctx, xl, metaArr, errs)
    if err != nil {
        return objInfo, err
    }
-   } else {
-       // If this is a corrupted object, change read quorum to N/2 disks
-       // so it will be visible to users, so they can delete it.
-       readQuorum = len(xl.getDisks()) / 2
-   }

    // List all the file commit ids from parts metadata.
    modTimes := listObjectModtimes(metaArr, errs)

@@ -308,7 +308,7 @@ func TestHealing(t *testing.T) {
        t.Fatal(err)
    }

-   _, err = xl.HealObject(context.Background(), bucket, object, false)
+   _, err = xl.HealObject(context.Background(), bucket, object, false, false)
    if err != nil {
        t.Fatal(err)
    }
@@ -337,7 +337,7 @@ func TestHealing(t *testing.T) {
        t.Fatal(err)
    }

-   _, err = xl.HealObject(context.Background(), bucket, object, false)
+   _, err = xl.HealObject(context.Background(), bucket, object, false, false)
    if err != nil {
        t.Fatal(err)
    }
@@ -359,7 +359,7 @@ func TestHealing(t *testing.T) {
        t.Fatal(err)
    }

    // This would create the bucket.
-   _, err = xl.HealBucket(context.Background(), bucket, false)
+   _, err = xl.HealBucket(context.Background(), bucket, false, false)
    if err != nil {
        t.Fatal(err)
    }

@@ -30,6 +30,7 @@ import (
type HealOpts struct {
    Recursive bool `json:"recursive"`
    DryRun bool `json:"dryRun"`
+   Remove bool `json:"remove"`
}
// HealStartSuccess - holds information about a successfully started
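As a rough sketch of how a caller would opt in to the new behaviour, marshalling HealOpts shows the wire form that now carries the remove flag (this assumes a build against this revision of pkg/madmin; the surrounding program is illustrative only):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/minio/minio/pkg/madmin"
)

func main() {
	// Ask for a recursive heal that also removes dangling objects.
	// DryRun is left false so deletions actually happen.
	opts := madmin.HealOpts{
		Recursive: true,
		Remove:    true,
	}

	buf, err := json.Marshal(opts)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(buf)) // {"recursive":true,"dryRun":false,"remove":true}
}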
