From c9d502e6fadf2c2cbb1e2ce7a9dc8de7dfed1d34 Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Sat, 2 Jan 2021 21:01:29 +0100 Subject: [PATCH] parentDirIsObject() to return quickly with inexistant parent (#11204) Rewrite parentIsObject() function. Currently if a client uploads a/b/c/d, we always check if c, b, a are actual objects or not. The new code will check with the reverse order and quickly quit if the segment doesn't exist. So if a, b, c in 'a/b/c' does not exist in the first place, then returns false quickly. --- cmd/erasure-common.go | 26 ++++++++++++++++---------- cmd/erasure-common_test.go | 4 ++++ cmd/storage-errors.go | 3 +++ cmd/storage-rest-client.go | 2 ++ cmd/xl-storage.go | 4 ++-- cmd/xl-storage_test.go | 6 +++--- 6 files changed, 30 insertions(+), 15 deletions(-) diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go index 02252f508..f95e47272 100644 --- a/cmd/erasure-common.go +++ b/cmd/erasure-common.go @@ -18,8 +18,8 @@ package cmd import ( "context" - "path" "sort" + "strings" "sync" "github.com/minio/minio/pkg/sync/errgroup" @@ -207,24 +207,29 @@ func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI { // object is "a/b/c/d", stat makes sure that objects ""a/b/c"" // "a/b" and "a" do not exist. func (er erasureObjects) parentDirIsObject(ctx context.Context, bucket, parent string) bool { - var isParentDirObject func(string) bool - isParentDirObject = func(p string) bool { - if p == "." || p == SlashSeparator { + path := "" + segments := strings.Split(parent, slashSeparator) + for _, s := range segments { + if s == "" { + break + } + path += s + isObject, pathNotExist := er.isObject(ctx, bucket, path) + if pathNotExist { return false } - if er.isObject(ctx, bucket, p) { + if isObject { // If there is already a file at prefix "p", return true. return true } - // Check if there is a file as one of the parent paths. - return isParentDirObject(path.Dir(p)) + path += slashSeparator } - return isParentDirObject(parent) + return false } // isObject - returns `true` if the prefix is an object i.e if // `xl.meta` exists at the leaf, false otherwise. -func (er erasureObjects) isObject(ctx context.Context, bucket, prefix string) (ok bool) { +func (er erasureObjects) isObject(ctx context.Context, bucket, prefix string) (ok, pathDoesNotExist bool) { storageDisks := er.getDisks() g := errgroup.WithNErrs(len(storageDisks)) @@ -246,5 +251,6 @@ func (er erasureObjects) isObject(ctx context.Context, bucket, prefix string) (o // ignored if necessary. readQuorum := getReadQuorum(len(storageDisks)) - return reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum) == nil + err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum) + return err == nil, err == errPathNotFound } diff --git a/cmd/erasure-common_test.go b/cmd/erasure-common_test.go index cac436215..483d8bda6 100644 --- a/cmd/erasure-common_test.go +++ b/cmd/erasure-common_test.go @@ -66,6 +66,10 @@ func TestErasureParentDirIsObject(t *testing.T) { parentIsObject: true, objectName: objectName, }, + { + parentIsObject: false, + objectName: "new-dir", + }, { parentIsObject: false, objectName: "", diff --git a/cmd/storage-errors.go b/cmd/storage-errors.go index e15e10109..86e1216cf 100644 --- a/cmd/storage-errors.go +++ b/cmd/storage-errors.go @@ -67,6 +67,9 @@ var errVolumeExists = StorageErr("volume already exists") // errIsNotRegular - not of regular file type. var errIsNotRegular = StorageErr("not of regular file type") +// errPathNotFound - cannot find the path. +var errPathNotFound = StorageErr("path not found") + // errVolumeNotFound - cannot find the volume. var errVolumeNotFound = StorageErr("volume not found") diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 0006c88b9..ece9c387c 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -82,6 +82,8 @@ func toStorageErr(err error) error { return errFileNameTooLong case errFileAccessDenied.Error(): return errFileAccessDenied + case errPathNotFound.Error(): + return errPathNotFound case errIsNotRegular.Error(): return errIsNotRegular case errVolumeNotEmpty.Error(): diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 6a5819cee..909f7140b 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -1757,10 +1757,10 @@ func (s *xlStorage) CheckFile(ctx context.Context, volume string, path string) e } // Stat a volume entry. - _, err = os.Stat(volumeDir) + _, err = os.Stat(pathJoin(volumeDir, path)) if err != nil { if osIsNotExist(err) { - return errVolumeNotFound + return errPathNotFound } return err } diff --git a/cmd/xl-storage_test.go b/cmd/xl-storage_test.go index 5abafe5cc..97ec5eb18 100644 --- a/cmd/xl-storage_test.go +++ b/cmd/xl-storage_test.go @@ -1596,14 +1596,14 @@ func TestXLStorageCheckFile(t *testing.T) { { srcVol: "success-vol", srcPath: "nonexistent-file", - expectedErr: errFileNotFound, + expectedErr: errPathNotFound, }, // TestXLStorage case - 4. // TestXLStorage case with non-existent file path. { srcVol: "success-vol", srcPath: "path/2/success-file", - expectedErr: errFileNotFound, + expectedErr: errPathNotFound, }, // TestXLStorage case - 5. // TestXLStorage case with path being a directory. @@ -1617,7 +1617,7 @@ func TestXLStorageCheckFile(t *testing.T) { { srcVol: "non-existent-vol", srcPath: "success-file", - expectedErr: errVolumeNotFound, + expectedErr: errPathNotFound, }, }