fix: dangling object delete shall return object doesn't exist (#10961)

dangling object when deleted means object doesn't exist
anymore, so we should return appropriate errors, this
allows crawler heal to ensure that it removes the tracker
for dangling objects.
master
Harshavardhana 4 years ago committed by GitHub
parent 75a8e81f8f
commit 6990de9c94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      cmd/erasure-healing.go

@ -303,8 +303,12 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
} }
if isAllNotFound(errs) { if isAllNotFound(errs) {
err = toObjectErr(errFileNotFound, bucket, object)
if versionID != "" {
err = toObjectErr(errFileVersionNotFound, bucket, object, versionID)
}
// File is fully gone, fileInfo is empty. // File is fully gone, fileInfo is empty.
return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID), nil return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID), err
} }
// If less than read quorum number of disks have all the parts // If less than read quorum number of disks have all the parts
@ -535,7 +539,8 @@ func (er erasureObjects) healObjectDir(ctx context.Context, bucket, object strin
} }
} }
if dryRun || danglingObject || isAllNotFound(errs) { if dryRun || danglingObject || isAllNotFound(errs) {
return hr, nil // Nothing to do, file is already gone.
return hr, toObjectErr(errFileNotFound, bucket, object)
} }
for i, err := range errs { for i, err := range errs {
if err == errVolumeNotFound || err == errFileNotFound { if err == errVolumeNotFound || err == errFileNotFound {
@ -688,6 +693,7 @@ func (er erasureObjects) purgeObjectDangling(ctx context.Context, bucket, object
writeQuorum = getWriteQuorum(len(storageDisks)) writeQuorum = getWriteQuorum(len(storageDisks))
} }
var err error var err error
var returnNotFound bool
if !opts.DryRun && opts.Remove { if !opts.DryRun && opts.Remove {
if versionID == "" { if versionID == "" {
err = er.deleteObject(ctx, bucket, object, writeQuorum) err = er.deleteObject(ctx, bucket, object, writeQuorum)
@ -705,6 +711,18 @@ func (er erasureObjects) purgeObjectDangling(ctx context.Context, bucket, object
// Dangling object successfully purged, size is '0' // Dangling object successfully purged, size is '0'
m.Size = 0 m.Size = 0
} }
// Delete successfully purged dangling content, return ObjectNotFound/VersionNotFound instead.
if countErrs(errs, nil) == len(errs) {
returnNotFound = true
}
}
if returnNotFound {
err = toObjectErr(errFileNotFound, bucket, object)
if versionID != "" {
err = toObjectErr(errFileVersionNotFound, bucket, object, versionID)
}
return defaultHealResult(m, storageDisks, storageEndpoints, errs, bucket, object, versionID), err
} }
return defaultHealResult(m, storageDisks, storageEndpoints, errs, bucket, object, versionID), toObjectErr(err, bucket, object, versionID) return defaultHealResult(m, storageDisks, storageEndpoints, errs, bucket, object, versionID), toObjectErr(err, bucket, object, versionID)
} }
@ -796,8 +814,12 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID) partsMetadata, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID)
if isAllNotFound(errs) { if isAllNotFound(errs) {
err = toObjectErr(errFileNotFound, bucket, object)
if versionID != "" {
err = toObjectErr(errFileVersionNotFound, bucket, object, versionID)
}
// Nothing to do, file is already gone. // Nothing to do, file is already gone.
return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID), nil return defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, bucket, object, versionID), err
} }
fi, err := getLatestFileInfo(healCtx, partsMetadata, errs) fi, err := getLatestFileInfo(healCtx, partsMetadata, errs)

Loading…
Cancel
Save