From b6c00405ec5c637232499f3c49835e090829a55a Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 15 Feb 2019 16:21:19 -0800 Subject: [PATCH] Do not pro-actively return false in isObjectDir() (#7246) We should change the logic for both isObject() and isObjectDir() leaf detection to be done with quorum, due to how our directory navigation works - this allows for properly deleting all the dangling directories or objects if any. --- cmd/object-api-multipart_test.go | 4 +- cmd/xl-sets.go | 80 +------------------------------- cmd/xl-v1-common.go | 69 +++++++++++++-------------- 3 files changed, 37 insertions(+), 116 deletions(-) diff --git a/cmd/object-api-multipart_test.go b/cmd/object-api-multipart_test.go index ab701ff72..8fcd550bb 100644 --- a/cmd/object-api-multipart_test.go +++ b/cmd/object-api-multipart_test.go @@ -239,8 +239,8 @@ func testPutObjectPartDiskNotFound(obj ObjectLayer, instanceType string, disks [ if err == nil { t.Fatalf("Test %s: expected to fail but passed instead", instanceType) } - // as majority of xl.json are not available, we expect InsufficientReadQuorum while trying to fetch the object quorum - expectedErr := InsufficientReadQuorum{} + // as majority of xl.json are not available, we expect uploadID to be not available. + expectedErr := InvalidUploadID{UploadID: testCase.uploadID} if err.Error() != expectedErr.Error() { t.Fatalf("Test %s: expected error %s, got %s instead.", instanceType, expectedErr, err) } diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 9e7d59119..ab3f60e8e 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -1323,82 +1323,6 @@ func (s *xlSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) { return listBuckets, nil } -// Returns function "listDir" of the type listDirFunc. -// isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry. -// disks - used for doing disk.ListDir(). Sets passes set of disks. -func listDirSetsHealFactory(isLeaf isLeafFunc, sets ...[]StorageAPI) listDirFunc { - listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string) { - for _, disk := range disks { - if disk == nil { - continue - } - - var entries []string - var newEntries []string - var err error - entries, err = disk.ListDir(bucket, prefixDir, -1) - if err != nil { - continue - } - - // Filter entries that have the prefix prefixEntry. - entries = filterMatchingPrefix(entries, prefixEntry) - - // isLeaf() check has to happen here so that - // trailing "/" for objects can be removed. - for i, entry := range entries { - if isLeaf(bucket, pathJoin(prefixDir, entry)) { - entries[i] = strings.TrimSuffix(entry, slashSeparator) - } - } - - // Find elements in entries which are not in mergedEntries - for _, entry := range entries { - idx := sort.SearchStrings(mergedEntries, entry) - // if entry is already present in mergedEntries don't add. - if idx < len(mergedEntries) && mergedEntries[idx] == entry { - continue - } - newEntries = append(newEntries, entry) - } - - if len(newEntries) > 0 { - // Merge the entries and sort it. - mergedEntries = append(mergedEntries, newEntries...) - sort.Strings(mergedEntries) - } - } - return mergedEntries - - } - - // listDir - lists all the entries at a given prefix and given entry in the prefix. - listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool) { - for _, disks := range sets { - entries := listDirInternal(bucket, prefixDir, prefixEntry, disks) - - var newEntries []string - // Find elements in entries which are not in mergedEntries - for _, entry := range entries { - idx := sort.SearchStrings(mergedEntries, entry) - // if entry is already present in mergedEntries don't add. - if idx < len(mergedEntries) && mergedEntries[idx] == entry { - continue - } - newEntries = append(newEntries, entry) - } - - if len(newEntries) > 0 { - // Merge the entries and sort it. - mergedEntries = append(mergedEntries, newEntries...) - sort.Strings(mergedEntries) - } - } - return mergedEntries, false - } - return listDir -} - // listObjectsHeal - wrapper function implemented over file tree walk. func (s *xlSets) listObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) { // Default is recursive, if delimiter is set then list non recursive. @@ -1434,8 +1358,8 @@ func (s *xlSets) listObjectsHeal(ctx context.Context, bucket, prefix, marker, de setDisks = append(setDisks, set.getLoadBalancedDisks()) } - listDir := listDirSetsHealFactory(isLeaf, setDisks...) - walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, nil, isLeafDir, endWalkCh) + listDir := listDirSetsFactory(ctx, isLeaf, isLeafDir, setDisks...) + walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh) } var objInfos []ObjectInfo diff --git a/cmd/xl-v1-common.go b/cmd/xl-v1-common.go index 35b9eafa9..d740c9986 100644 --- a/cmd/xl-v1-common.go +++ b/cmd/xl-v1-common.go @@ -20,8 +20,6 @@ import ( "context" "path" "sync" - - "github.com/minio/minio/cmd/logger" ) // getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice. @@ -52,31 +50,44 @@ func (xl xlObjects) parentDirIsObject(ctx context.Context, bucket, parent string return isParentDirObject(parent) } -var xlTreeWalkIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolumeNotFound, errFileNotFound) - // isObject - returns `true` if the prefix is an object i.e if // `xl.json` exists at the leaf, false otherwise. func (xl xlObjects) isObject(bucket, prefix string) (ok bool) { - for _, disk := range xl.getLoadBalancedDisks() { + var errs = make([]error, len(xl.getDisks())) + var wg sync.WaitGroup + for index, disk := range xl.getDisks() { if disk == nil { continue } - // Check if 'prefix' is an object on this 'disk', else continue the check the next disk - _, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile)) - if err == nil { - return true - } - // Ignore for file not found, disk not found or faulty disk. - if IsErrIgnored(err, xlTreeWalkIgnoredErrs...) { - continue - } - } // Exhausted all disks - return false. - return false + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + // Check if 'prefix' is an object on this 'disk', else continue the check the next disk + fi, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile)) + if err != nil { + errs[index] = err + return + } + if fi.Size == 0 { + errs[index] = errCorruptedFormat + return + } + }(index, disk) + } + + wg.Wait() + + // NOTE: Observe we are not trying to read `xl.json` and figure out the actual + // quorum intentionally, but rely on the default case scenario. Actual quorum + // verification will happen by top layer by using getObjectInfo() and will be + // ignored if necessary. + readQuorum := len(xl.getDisks()) / 2 + + return reduceReadQuorumErrs(context.Background(), errs, objectOpIgnoredErrs, readQuorum) == nil } // isObjectDir returns if the specified path represents an empty directory. func (xl xlObjects) isObjectDir(bucket, prefix string) (ok bool) { - var objectDirs = make([]bool, len(xl.getDisks())) var errs = make([]error, len(xl.getDisks())) var wg sync.WaitGroup for index, disk := range xl.getDisks() { @@ -92,7 +103,10 @@ func (xl xlObjects) isObjectDir(bucket, prefix string) (ok bool) { errs[index] = err return } - objectDirs[index] = len(entries) == 0 + if len(entries) > 0 { + errs[index] = errVolumeNotEmpty + return + } }(index, disk) } @@ -100,22 +114,5 @@ func (xl xlObjects) isObjectDir(bucket, prefix string) (ok bool) { readQuorum := len(xl.getDisks()) / 2 - err := reduceReadQuorumErrs(context.Background(), errs, xlTreeWalkIgnoredErrs, readQuorum) - if err != nil { - reqInfo := &logger.ReqInfo{BucketName: bucket} - reqInfo.AppendTags("prefix", prefix) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) - return false - } - - for _, ok := range objectDirs { - if !ok { - // We perhaps found some entries in a prefix. - return false - } - } - - // Return true if and only if the all directories are empty. - return true + return reduceReadQuorumErrs(context.Background(), errs, objectOpIgnoredErrs, readQuorum) == nil }