diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go index f95e47272..017c405dc 100644 --- a/cmd/erasure-common.go +++ b/cmd/erasure-common.go @@ -19,7 +19,6 @@ package cmd import ( "context" "sort" - "strings" "sync" "github.com/minio/minio/pkg/sync/errgroup" @@ -204,32 +203,12 @@ func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI { } // This function does the following check, suppose -// object is "a/b/c/d", stat makes sure that objects ""a/b/c"" -// "a/b" and "a" do not exist. +// object is "a/b/c/d", stat makes sure that objects +// - "a/b/c" +// - "a/b" +// - "a" +// do not exist on the namespace. func (er erasureObjects) parentDirIsObject(ctx context.Context, bucket, parent string) bool { - path := "" - segments := strings.Split(parent, slashSeparator) - for _, s := range segments { - if s == "" { - break - } - path += s - isObject, pathNotExist := er.isObject(ctx, bucket, path) - if pathNotExist { - return false - } - if isObject { - // If there is already a file at prefix "p", return true. - return true - } - path += slashSeparator - } - return false -} - -// isObject - returns `true` if the prefix is an object i.e if -// `xl.meta` exists at the leaf, false otherwise. -func (er erasureObjects) isObject(ctx context.Context, bucket, prefix string) (ok, pathDoesNotExist bool) { storageDisks := er.getDisks() g := errgroup.WithNErrs(len(storageDisks)) @@ -241,7 +220,7 @@ func (er erasureObjects) isObject(ctx context.Context, bucket, prefix string) (o return errDiskNotFound } // Check if 'prefix' is an object on this 'disk', else continue the check the next disk - return storageDisks[index].CheckFile(ctx, bucket, prefix) + return storageDisks[index].CheckFile(ctx, bucket, parent) }, index) } @@ -251,6 +230,5 @@ func (er erasureObjects) isObject(ctx context.Context, bucket, prefix string) (o // ignored if necessary. readQuorum := getReadQuorum(len(storageDisks)) - err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum) - return err == nil, err == errPathNotFound + return reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum) == nil } diff --git a/cmd/erasure-common_test.go b/cmd/erasure-common_test.go index 483d8bda6..f88af2b28 100644 --- a/cmd/erasure-common_test.go +++ b/cmd/erasure-common_test.go @@ -28,7 +28,7 @@ func TestErasureParentDirIsObject(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - obj, fsDisks, err := prepareErasure16(ctx) + obj, fsDisks, err := prepareErasureSets32(ctx) if err != nil { t.Fatalf("Unable to initialize 'Erasure' object layer.") } @@ -45,59 +45,38 @@ func TestErasureParentDirIsObject(t *testing.T) { if err = obj.MakeBucketWithLocation(GlobalContext, bucketName, BucketOptions{}); err != nil { t.Fatal(err) } + objectContent := "12345" - objInfo, err := obj.PutObject(GlobalContext, bucketName, objectName, + _, err = obj.PutObject(GlobalContext, bucketName, objectName, mustGetPutObjReader(t, bytes.NewReader([]byte(objectContent)), int64(len(objectContent)), "", ""), ObjectOptions{}) if err != nil { t.Fatal(err) } - if objInfo.Name != objectName { - t.Fatalf("Unexpected object name returned got %s, expected %s", objInfo.Name, objectName) - } - z := obj.(*erasureServerPools) - xl := z.serverPools[0].sets[0] testCases := []struct { - parentIsObject bool - objectName string + expectedErr bool + objectName string }{ - // parentIsObject is true if object is available. - { - parentIsObject: true, - objectName: objectName, - }, - { - parentIsObject: false, - objectName: "new-dir", - }, - { - parentIsObject: false, - objectName: "", - }, - { - parentIsObject: false, - objectName: ".", - }, - // Should not cause infinite loop. - { - parentIsObject: false, - objectName: SlashSeparator, - }, { - parentIsObject: false, - objectName: "\\", + expectedErr: true, + objectName: pathJoin(objectName, "parent-is-object"), }, - // Should not cause infinite loop with double forward slash. { - parentIsObject: false, - objectName: "//", + expectedErr: false, + objectName: pathJoin("no-parent", "object"), }, } - for i, testCase := range testCases { - gotValue := xl.parentDirIsObject(GlobalContext, bucketName, testCase.objectName) - if testCase.parentIsObject != gotValue { - t.Errorf("Test %d: Unexpected value returned got %t, expected %t", i+1, gotValue, testCase.parentIsObject) - } + for _, testCase := range testCases { + t.Run("", func(t *testing.T) { + _, err = obj.PutObject(GlobalContext, bucketName, testCase.objectName, + mustGetPutObjReader(t, bytes.NewReader([]byte(objectContent)), int64(len(objectContent)), "", ""), ObjectOptions{}) + if testCase.expectedErr && err == nil { + t.Error("Expected error but got nil") + } + if !testCase.expectedErr && err != nil { + t.Errorf("Expected nil but got %v", err) + } + }) } } diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 3a3d2fb00..f5d0f8961 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -774,6 +774,9 @@ func (s *erasureSets) GetObject(ctx context.Context, bucket, object string, star } func (s *erasureSets) parentDirIsObject(ctx context.Context, bucket, parent string) bool { + if parent == "." { + return false + } return s.getHashedSet(parent).parentDirIsObject(ctx, bucket, parent) } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 34534efae..edd537965 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -1778,6 +1778,12 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string, } // CheckFile check if path has necessary metadata. +// This function does the following check, suppose +// you are creating a metadata file at "a/b/c/d/xl.meta", +// makes sure that there is no `xl.meta` at +// - "a/b/c/" +// - "a/b/" +// - "a/" func (s *xlStorage) CheckFile(ctx context.Context, volume string, path string) error { atomic.AddInt32(&s.activeIOCount, 1) defer func() { @@ -1789,42 +1795,45 @@ func (s *xlStorage) CheckFile(ctx context.Context, volume string, path string) e return err } - // Stat a volume entry. - _, err = os.Stat(pathJoin(volumeDir, path)) - if err != nil { - if osIsNotExist(err) { + var checkFile func(p string) error + checkFile = func(p string) error { + if p == "." || p == SlashSeparator { return errPathNotFound } - return err - } - filePath := pathJoin(volumeDir, path, xlStorageFormatFile) - if err = checkPathLength(filePath); err != nil { - return err - } + filePath := pathJoin(volumeDir, p, xlStorageFormatFile) + if err := checkPathLength(filePath); err != nil { + return err + } - filePathOld := pathJoin(volumeDir, path, xlStorageFormatFileV1) - if err = checkPathLength(filePathOld); err != nil { - return err - } + st, _ := os.Lstat(filePath) + if st == nil { + if s.formatLegacy { + filePathOld := pathJoin(volumeDir, p, xlStorageFormatFileV1) + if err := checkPathLength(filePathOld); err != nil { + return err + } - st, err := os.Stat(filePath) - if err != nil && !osIsNotExist(err) { - return osErrToFileErr(err) - } - if st == nil { - st, err = os.Stat(filePathOld) - if err != nil { - return osErrToFileErr(err) + st, _ = os.Lstat(filePathOld) + if st == nil { + return errPathNotFound + } + } } - } - // If its a directory its not a regular file. - if st.Mode().IsDir() || st.Size() == 0 { - return errFileNotFound + if st != nil { + if !st.Mode().IsRegular() { + // not a regular file return error. + return errFileNotFound + } + // Success fully found + return nil + } + + return checkFile(slashpath.Dir(p)) } - return nil + return checkFile(path) } // deleteFile deletes a file or a directory if its empty unless recursive @@ -1837,8 +1846,8 @@ func deleteFile(basePath, deletePath string, recursive bool) error { return nil } isObjectDir := HasSuffix(deletePath, SlashSeparator) - basePath = filepath.Clean(basePath) - deletePath = filepath.Clean(deletePath) + basePath = slashpath.Clean(basePath) + deletePath = slashpath.Clean(deletePath) if !strings.HasPrefix(deletePath, basePath) || deletePath == basePath { return nil } @@ -1872,7 +1881,7 @@ func deleteFile(basePath, deletePath string, recursive bool) error { } } - deletePath = filepath.Dir(deletePath) + deletePath = slashpath.Dir(deletePath) // Delete parent directory obviously not recursively. Errors for // parent directories shouldn't trickle down. diff --git a/cmd/xl-storage_test.go b/cmd/xl-storage_test.go index c37ee0dab..3d24ea87f 100644 --- a/cmd/xl-storage_test.go +++ b/cmd/xl-storage_test.go @@ -1657,6 +1657,10 @@ func TestXLStorageCheckFile(t *testing.T) { t.Fatalf("Unable to create file, %s", err) } + if err := xlStorage.MakeVol(context.Background(), "success-vol/path/to/"+xlStorageFormatFile); err != nil { + t.Fatalf("Unable to create path, %s", err) + } + testCases := []struct { srcVol string srcPath string @@ -1695,7 +1699,7 @@ func TestXLStorageCheckFile(t *testing.T) { { srcVol: "success-vol", srcPath: "path", - expectedErr: errFileNotFound, + expectedErr: errPathNotFound, }, // TestXLStorage case - 6. // TestXLStorage case with non existent volume. @@ -1704,6 +1708,13 @@ func TestXLStorageCheckFile(t *testing.T) { srcPath: "success-file", expectedErr: errPathNotFound, }, + // TestXLStorage case - 7. + // TestXLStorage case with file with directory. + { + srcVol: "success-vol", + srcPath: "path/to", + expectedErr: errFileNotFound, + }, } for i, testCase := range testCases {