diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index fe575d805..d3f4e584f 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -357,8 +357,16 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark return err == nil } + isLeafDir := func(bucket, object string) bool { + fs, err := c.cache.getCacheFS(ctx, bucket, object) + if err != nil { + return false + } + return fs.isObjectDir(bucket, object) + } + listDir := listDirCacheFactory(isLeaf, cacheTreeWalkIgnoredErrs, c.cache.cfs) - walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh) + walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh) } for i := 0; i < maxKeys; { @@ -417,7 +425,7 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark result = ListObjectsInfo{IsTruncated: !eof} for _, objInfo := range objInfos { result.NextMarker = objInfo.Name - if objInfo.IsDir { + if objInfo.IsDir && delimiter == slashSeparator { result.Prefixes = append(result.Prefixes, objInfo.Name) continue } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index b4527b6f0..79ea9ce75 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -627,6 +627,10 @@ func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object s return oi, toObjectErr(err, bucket) } + if strings.HasSuffix(object, slashSeparator) && !fs.isObjectDir(bucket, object) { + return oi, errFileNotFound + } + return fs.getObjectInfo(ctx, bucket, object) } @@ -890,6 +894,17 @@ func (fs *FSObjects) listDirFactory(isLeaf isLeafFunc) listDirFunc { return listDir } +// isObjectDir returns true if the specified bucket & prefix exists +// and the prefix represents an empty directory. An S3 empty directory +// is also an empty directory in the FS backend. +func (fs *FSObjects) isObjectDir(bucket, prefix string) bool { + entries, err := readDirN(pathJoin(fs.fsPath, bucket, prefix), 1) + if err != nil { + return false + } + return len(entries) == 0 +} + // getObjectETag is a helper function, which returns only the md5sum // of the file on the disk. func (fs *FSObjects) getObjectETag(ctx context.Context, bucket, entry string, lock bool) (string, error) { @@ -1019,8 +1034,15 @@ func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, de // object string does not end with "/". return !hasSuffix(object, slashSeparator) } + // Return true if the specified object is an empty directory + isLeafDir := func(bucket, object string) bool { + if !hasSuffix(object, slashSeparator) { + return false + } + return fs.isObjectDir(bucket, object) + } listDir := fs.listDirFactory(isLeaf) - walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh) + walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh) } var objInfos []ObjectInfo @@ -1065,7 +1087,7 @@ func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, de result := ListObjectsInfo{IsTruncated: !eof} for _, objInfo := range objInfos { result.NextMarker = objInfo.Name - if objInfo.IsDir { + if objInfo.IsDir && delimiter == slashSeparator { result.Prefixes = append(result.Prefixes, objInfo.Name) continue } diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 7a6f5b142..774edd7ea 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -108,11 +108,11 @@ func (d *naughtyDisk) DeleteVol(volume string) (err error) { return d.disk.DeleteVol(volume) } -func (d *naughtyDisk) ListDir(volume, path string) (entries []string, err error) { +func (d *naughtyDisk) ListDir(volume, path string, count int) (entries []string, err error) { if err := d.calcError(); err != nil { return []string{}, err } - return d.disk.ListDir(volume, path) + return d.disk.ListDir(volume, path, count) } func (d *naughtyDisk) ReadFile(volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) { diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go index 81e1f0e02..142fd9739 100644 --- a/cmd/object-api-common.go +++ b/cmd/object-api-common.go @@ -116,7 +116,7 @@ func cleanupDir(ctx context.Context, storage StorageAPI, volume, dirPath string) } // If it's a directory, list and call delFunc() for each entry. - entries, err := storage.ListDir(volume, entryPath) + entries, err := storage.ListDir(volume, entryPath, -1) // If entryPath prefix never existed, safe to ignore. if err == errFileNotFound { return nil diff --git a/cmd/posix-list-dir-nix.go b/cmd/posix-list-dir-nix.go index fd00c8dc2..d896decbd 100644 --- a/cmd/posix-list-dir-nix.go +++ b/cmd/posix-list-dir-nix.go @@ -112,6 +112,12 @@ var readDirBufPool = sync.Pool{ // Return all the entries at the directory dirPath. func readDir(dirPath string) (entries []string, err error) { + return readDirN(dirPath, -1) +} + +// Return count entries at the directory dirPath and all entries +// if count is set to -1 +func readDirN(dirPath string, count int) (entries []string, err error) { bufp := readDirBufPool.Get().(*[]byte) buf := *bufp defer readDirBufPool.Put(bufp) @@ -135,7 +141,11 @@ func readDir(dirPath string) (entries []string, err error) { defer d.Close() fd := int(d.Fd()) - for { + + remaining := count + done := false + + for !done { nbuf, err := syscall.ReadDirent(fd, buf) if err != nil { return nil, err @@ -147,6 +157,13 @@ func readDir(dirPath string) (entries []string, err error) { if tmpEntries, err = parseDirents(dirPath, buf[:nbuf]); err != nil { return nil, err } + if count > 0 { + if remaining <= len(tmpEntries) { + tmpEntries = tmpEntries[:remaining] + done = true + } + remaining -= len(tmpEntries) + } entries = append(entries, tmpEntries...) } return diff --git a/cmd/posix-list-dir-others.go b/cmd/posix-list-dir-others.go index 4cadd79ff..fbc3ed930 100644 --- a/cmd/posix-list-dir-others.go +++ b/cmd/posix-list-dir-others.go @@ -30,7 +30,12 @@ import ( // Return all the entries at the directory dirPath. func readDir(dirPath string) (entries []string, err error) { - d, err := os.Open((dirPath)) + return readDirN(dirPath, -1) +} + +// Return N entries at the directory dirPath. If count is -1, return all entries +func readDirN(dirPath string, count int) (entries []string, err error) { + d, err := os.Open(dirPath) if err != nil { // File is really not found. if os.IsNotExist(err) { @@ -45,20 +50,34 @@ func readDir(dirPath string) (entries []string, err error) { } defer d.Close() - for { - // Read 1000 entries. - fis, err := d.Readdir(1000) + maxEntries := 1000 + if count > 0 && count < maxEntries { + maxEntries = count + } + + done := false + remaining := count + + for !done { + // Read up to max number of entries. + fis, err := d.Readdir(maxEntries) if err != nil { if err == io.EOF { break } return nil, err } + if count > 0 { + if remaining <= len(fis) { + fis = fis[:remaining] + done = true + } + } for _, fi := range fis { // Stat symbolic link and follow to get the final value. if fi.Mode()&os.ModeSymlink == os.ModeSymlink { var st os.FileInfo - st, err = os.Stat((path.Join(dirPath, fi.Name()))) + st, err = os.Stat(path.Join(dirPath, fi.Name())) if err != nil { reqInfo := (&logger.ReqInfo{}).AppendTags("path", path.Join(dirPath, fi.Name())) ctx := logger.SetReqInfo(context.Background(), reqInfo) @@ -71,6 +90,9 @@ func readDir(dirPath string) (entries []string, err error) { } else if st.Mode().IsRegular() { entries = append(entries, fi.Name()) } + if count > 0 { + remaining-- + } continue } if fi.Mode().IsDir() { @@ -79,6 +101,9 @@ func readDir(dirPath string) (entries []string, err error) { } else if fi.Mode().IsRegular() { entries = append(entries, fi.Name()) } + if count > 0 { + remaining-- + } } } return entries, nil diff --git a/cmd/posix-list-dir_test.go b/cmd/posix-list-dir_test.go index 30d85b862..a704d2894 100644 --- a/cmd/posix-list-dir_test.go +++ b/cmd/posix-list-dir_test.go @@ -201,3 +201,41 @@ func TestReadDir(t *testing.T) { } } } + +func TestReadDirN(t *testing.T) { + testCases := []struct { + numFiles int + n int + expectedNum int + }{ + {0, 0, 0}, + {0, 1, 0}, + {1, 0, 1}, + {0, -1, 0}, + {1, -1, 1}, + {10, -1, 10}, + {1, 1, 1}, + {2, 1, 1}, + {10, 9, 9}, + {10, 10, 10}, + {10, 11, 10}, + } + + for i, testCase := range testCases { + dir := mustSetupDir(t) + defer os.RemoveAll(dir) + + for c := 1; c <= testCase.numFiles; c++ { + if err := ioutil.WriteFile(filepath.Join(dir, fmt.Sprintf("%d", c)), []byte{}, os.ModePerm); err != nil { + t.Fatalf("Unable to create a file, %s", err) + } + } + entries, err := readDirN(dir, testCase.n) + if err != nil { + t.Fatalf("Unable to read entries, %s", err) + } + if len(entries) != testCase.expectedNum { + t.Fatalf("Test %d: unexpected number of entries, waiting for %d, but found %d", i+1, testCase.expectedNum, len(entries)) + } + } +} diff --git a/cmd/posix.go b/cmd/posix.go index 650fc0841..463e74be1 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -467,7 +467,7 @@ func (s *posix) DeleteVol(volume string) (err error) { // ListDir - return all the entries at the given directory path. // If an entry is a directory it will be returned with a trailing "/". -func (s *posix) ListDir(volume, dirPath string) (entries []string, err error) { +func (s *posix) ListDir(volume, dirPath string, count int) (entries []string, err error) { defer func() { if err == syscall.EIO { atomic.AddInt32(&s.ioErrCount, 1) @@ -495,7 +495,12 @@ func (s *posix) ListDir(volume, dirPath string) (entries []string, err error) { } return nil, err } - return readDir(pathJoin(volumeDir, dirPath)) + + dirPath = pathJoin(volumeDir, dirPath) + if count > 0 { + return readDirN(dirPath, count) + } + return readDir(dirPath) } // ReadAll reads from r until an error or EOF and returns the data it read. diff --git a/cmd/posix_test.go b/cmd/posix_test.go index b89178a12..54aff5077 100644 --- a/cmd/posix_test.go +++ b/cmd/posix_test.go @@ -785,7 +785,7 @@ func TestPosixPosixListDir(t *testing.T) { } else { t.Errorf("Expected the StorageAPI to be of type *posix") } - dirList, err = posixStorage.ListDir(testCase.srcVol, testCase.srcPath) + dirList, err = posixStorage.ListDir(testCase.srcVol, testCase.srcPath, -1) if err != testCase.expectedErr { t.Fatalf("TestPosix case %d: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, err) } diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index d6ed13ab9..575932f90 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -39,7 +39,7 @@ type StorageAPI interface { DeleteVol(volume string) (err error) // File operations. - ListDir(volume, dirPath string) ([]string, error) + ListDir(volume, dirPath string, count int) ([]string, error) ReadFile(volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) PrepareFile(volume string, path string, len int64) (err error) AppendFile(volume string, path string, buf []byte) (err error) diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go index 8454671ae..7d9216d0c 100644 --- a/cmd/storage-rpc-client.go +++ b/cmd/storage-rpc-client.go @@ -283,10 +283,11 @@ func (n *networkStorage) ReadFile(volume string, path string, offset int64, buff } // ListDir - list all entries at prefix. -func (n *networkStorage) ListDir(volume, path string) (entries []string, err error) { +func (n *networkStorage) ListDir(volume, path string, count int) (entries []string, err error) { if err = n.call("Storage.ListDirHandler", &ListDirArgs{ - Vol: volume, - Path: path, + Vol: volume, + Path: path, + Count: count, }, &entries); err != nil { return nil, err } diff --git a/cmd/storage-rpc-client_test.go b/cmd/storage-rpc-client_test.go index b762ae899..c15bb8302 100644 --- a/cmd/storage-rpc-client_test.go +++ b/cmd/storage-rpc-client_test.go @@ -435,7 +435,7 @@ func (s *TestRPCStorageSuite) testRPCStorageListDir(t *testing.T) { t.Error("Unable to initiate MakeVol", err) } } - dirs, err := storageDisk.ListDir("myvol", "") + dirs, err := storageDisk.ListDir("myvol", "", -1) if err != nil { t.Error(err) } @@ -448,7 +448,7 @@ func (s *TestRPCStorageSuite) testRPCStorageListDir(t *testing.T) { t.Error("Unable to initiate DeleteVol", err) } } - dirs, err = storageDisk.ListDir("myvol", "") + dirs, err = storageDisk.ListDir("myvol", "", -1) if err != nil { t.Error(err) } diff --git a/cmd/storage-rpc-server-datatypes.go b/cmd/storage-rpc-server-datatypes.go index b417e3da1..9db35d2fa 100644 --- a/cmd/storage-rpc-server-datatypes.go +++ b/cmd/storage-rpc-server-datatypes.go @@ -134,6 +134,9 @@ type ListDirArgs struct { // Name of the path. Path string + + // Number of wanted results + Count int } // RenameFileArgs represents rename file RPC arguments. diff --git a/cmd/storage-rpc-server.go b/cmd/storage-rpc-server.go index f29782c1d..1cf9b4c51 100644 --- a/cmd/storage-rpc-server.go +++ b/cmd/storage-rpc-server.go @@ -120,7 +120,7 @@ func (s *storageServer) ListDirHandler(args *ListDirArgs, reply *[]string) error return err } - entries, err := s.storage.ListDir(args.Vol, args.Path) + entries, err := s.storage.ListDir(args.Vol, args.Path, args.Count) if err != nil { return err } diff --git a/cmd/tree-walk.go b/cmd/tree-walk.go index ebdffdde9..5e3c003de 100644 --- a/cmd/tree-walk.go +++ b/cmd/tree-walk.go @@ -98,6 +98,9 @@ type listDirFunc func(bucket, prefixDir, prefixEntry string) (entries []string, // 4. XL backend multipart listing - isLeaf is true if the entry is a directory and contains uploads.json type isLeafFunc func(string, string) bool +// A function isLeafDir of type isLeafDirFunc is used to detect if an entry represents an empty directory. +type isLeafDirFunc func(string, string) bool + func filterListEntries(bucket, prefixDir string, entries []string, prefixEntry string, isLeaf isLeafFunc) ([]string, bool) { // Listing needs to be sorted. sort.Strings(entries) @@ -125,7 +128,7 @@ func filterListEntries(bucket, prefixDir string, entries []string, prefixEntry s } // treeWalk walks directory tree recursively pushing treeWalkResult into the channel as and when it encounters files. -func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir listDirFunc, isLeaf isLeafFunc, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error { +func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir listDirFunc, isLeaf isLeafFunc, isLeafDir isLeafDirFunc, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error { // Example: // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively // called with prefixDir="one/two/three/four/" and marker="five.txt" @@ -167,17 +170,30 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker return nil } for i, entry := range entries { + var leaf, leafDir bool + // Decision to do isLeaf check was pushed from listDir() to here. - if delayIsLeaf && isLeaf(bucket, pathJoin(prefixDir, entry)) { - entry = strings.TrimSuffix(entry, slashSeparator) + if delayIsLeaf { + leaf = isLeaf(bucket, pathJoin(prefixDir, entry)) + if leaf { + entry = strings.TrimSuffix(entry, slashSeparator) + } + } else { + leaf = !strings.HasSuffix(entry, slashSeparator) } + if strings.HasSuffix(entry, slashSeparator) { + leafDir = isLeafDir(bucket, pathJoin(prefixDir, entry)) + } + + isDir := !leafDir && !leaf + if i == 0 && markerDir == entry { if !recursive { // Skip as the marker would already be listed in the previous listing. continue } - if recursive && !hasSuffix(entry, slashSeparator) { + if recursive && !isDir { // We should not skip for recursive listing and if markerDir is a directory // for ex. if marker is "four/five.txt" markerDir will be "four/" which // should not be skipped, instead it will need to be treeWalk()'ed into. @@ -186,7 +202,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker continue } } - if recursive && hasSuffix(entry, slashSeparator) { + if recursive && isDir { // If the entry is a directory, we will need recurse into it. markerArg := "" if entry == markerDir { @@ -198,7 +214,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker // markIsEnd is passed to this entry's treeWalk() so that treeWalker.end can be marked // true at the end of the treeWalk stream. markIsEnd := i == len(entries)-1 && isEnd - if tErr := doTreeWalk(ctx, bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, listDir, isLeaf, resultCh, endWalkCh, markIsEnd); tErr != nil { + if tErr := doTreeWalk(ctx, bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, listDir, isLeaf, isLeafDir, resultCh, endWalkCh, markIsEnd); tErr != nil { return tErr } continue @@ -218,7 +234,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker } // Initiate a new treeWalk in a goroutine. -func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive bool, listDir listDirFunc, isLeaf isLeafFunc, endWalkCh chan struct{}) chan treeWalkResult { +func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive bool, listDir listDirFunc, isLeaf isLeafFunc, isLeafDir isLeafDirFunc, endWalkCh chan struct{}) chan treeWalkResult { // Example 1 // If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt" // treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt" @@ -240,7 +256,7 @@ func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive marker = strings.TrimPrefix(marker, prefixDir) go func() { isEnd := true // Indication to start walking the tree with end as true. - doTreeWalk(ctx, bucket, prefixDir, entryPrefixMatch, marker, recursive, listDir, isLeaf, resultCh, endWalkCh, isEnd) + doTreeWalk(ctx, bucket, prefixDir, entryPrefixMatch, marker, recursive, listDir, isLeaf, isLeafDir, resultCh, endWalkCh, isEnd) close(resultCh) }() return resultCh diff --git a/cmd/tree-walk_test.go b/cmd/tree-walk_test.go index e1626ca2b..8340e853e 100644 --- a/cmd/tree-walk_test.go +++ b/cmd/tree-walk_test.go @@ -128,11 +128,11 @@ func createNamespace(disk StorageAPI, volume string, files []string) error { // Test if tree walker returns entries matching prefix alone are received // when a non empty prefix is supplied. -func testTreeWalkPrefix(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc) { +func testTreeWalkPrefix(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc, isLeafDir isLeafDirFunc) { // Start the tree walk go-routine. prefix := "d/" endWalkCh := make(chan struct{}) - twResultCh := startTreeWalk(context.Background(), volume, prefix, "", true, listDir, isLeaf, endWalkCh) + twResultCh := startTreeWalk(context.Background(), volume, prefix, "", true, listDir, isLeaf, isLeafDir, endWalkCh) // Check if all entries received on the channel match the prefix. for res := range twResultCh { @@ -143,11 +143,11 @@ func testTreeWalkPrefix(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc) { } // Test if entries received on tree walk's channel appear after the supplied marker. -func testTreeWalkMarker(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc) { +func testTreeWalkMarker(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc, isLeafDir isLeafDirFunc) { // Start the tree walk go-routine. prefix := "" endWalkCh := make(chan struct{}) - twResultCh := startTreeWalk(context.Background(), volume, prefix, "d/g", true, listDir, isLeaf, endWalkCh) + twResultCh := startTreeWalk(context.Background(), volume, prefix, "d/g", true, listDir, isLeaf, isLeafDir, endWalkCh) // Check if only 3 entries, namely d/g/h, i/j/k, lmn are received on the channel. expectedCount := 3 @@ -187,11 +187,20 @@ func TestTreeWalk(t *testing.T) { isLeaf := func(volume, prefix string) bool { return !hasSuffix(prefix, slashSeparator) } + + isLeafDir := func(volume, prefix string) bool { + entries, listErr := disk.ListDir(volume, prefix, 1) + if listErr != nil { + return false + } + return len(entries) == 0 + } + listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk) // Simple test for prefix based walk. - testTreeWalkPrefix(t, listDir, isLeaf) + testTreeWalkPrefix(t, listDir, isLeaf, isLeafDir) // Simple test when marker is set. - testTreeWalkMarker(t, listDir, isLeaf) + testTreeWalkMarker(t, listDir, isLeaf, isLeafDir) err = os.RemoveAll(fsDir) if err != nil { t.Fatal(err) @@ -222,6 +231,15 @@ func TestTreeWalkTimeout(t *testing.T) { isLeaf := func(volume, prefix string) bool { return !hasSuffix(prefix, slashSeparator) } + + isLeafDir := func(volume, prefix string) bool { + entries, listErr := disk.ListDir(volume, prefix, 1) + if listErr != nil { + return false + } + return len(entries) == 0 + } + listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk) // TreeWalk pool with 2 seconds timeout for tree-walk go routines. @@ -231,7 +249,7 @@ func TestTreeWalkTimeout(t *testing.T) { prefix := "" marker := "" recursive := true - resultCh := startTreeWalk(context.Background(), volume, prefix, marker, recursive, listDir, isLeaf, endWalkCh) + resultCh := startTreeWalk(context.Background(), volume, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh) params := listParams{ bucket: volume, @@ -373,6 +391,14 @@ func TestRecursiveTreeWalk(t *testing.T) { return !hasSuffix(prefix, slashSeparator) } + isLeafDir := func(volume, prefix string) bool { + entries, listErr := disk1.ListDir(volume, prefix, 1) + if listErr != nil { + return false + } + return len(entries) == 0 + } + // Create listDir function. listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk1) @@ -450,7 +476,7 @@ func TestRecursiveTreeWalk(t *testing.T) { for i, testCase := range testCases { for entry := range startTreeWalk(context.Background(), volume, testCase.prefix, testCase.marker, testCase.recursive, - listDir, isLeaf, endWalkCh) { + listDir, isLeaf, isLeafDir, endWalkCh) { if _, found := testCase.expected[entry.entry]; !found { t.Errorf("Test %d: Expected %s, but couldn't find", i+1, entry.entry) } @@ -479,6 +505,15 @@ func TestSortedness(t *testing.T) { isLeaf := func(volume, prefix string) bool { return !hasSuffix(prefix, slashSeparator) } + + isLeafDir := func(volume, prefix string) bool { + entries, listErr := disk1.ListDir(volume, prefix, 1) + if listErr != nil { + return false + } + return len(entries) == 0 + } + // Create listDir function. listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk1) @@ -522,7 +557,7 @@ func TestSortedness(t *testing.T) { var actualEntries []string for entry := range startTreeWalk(context.Background(), volume, test.prefix, test.marker, test.recursive, - listDir, isLeaf, endWalkCh) { + listDir, isLeaf, isLeafDir, endWalkCh) { actualEntries = append(actualEntries, entry.entry) } if !sort.IsSorted(sort.StringSlice(actualEntries)) { @@ -553,6 +588,15 @@ func TestTreeWalkIsEnd(t *testing.T) { isLeaf := func(volume, prefix string) bool { return !hasSuffix(prefix, slashSeparator) } + + isLeafDir := func(volume, prefix string) bool { + entries, listErr := disk1.ListDir(volume, prefix, 1) + if listErr != nil { + return false + } + return len(entries) == 0 + } + // Create listDir function. listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk1) @@ -595,7 +639,7 @@ func TestTreeWalkIsEnd(t *testing.T) { } for i, test := range testCases { var entry treeWalkResult - for entry = range startTreeWalk(context.Background(), volume, test.prefix, test.marker, test.recursive, listDir, isLeaf, endWalkCh) { + for entry = range startTreeWalk(context.Background(), volume, test.prefix, test.marker, test.recursive, listDir, isLeaf, isLeafDir, endWalkCh) { } if entry.entry != test.expectedEntry { t.Errorf("Test %d: Expected entry %s, but received %s with the EOF marker", i, test.expectedEntry, entry.entry) diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index ebc0e4d42..44e9f4af6 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -623,7 +623,7 @@ func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucke // Returns function "listDir" of the type listDirFunc. // isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry. // disks - used for doing disk.ListDir(). Sets passes set of disks. -func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, treeWalkIgnoredErrs []error, sets ...[]StorageAPI) listDirFunc { +func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, isLeafDir isLeafDirFunc, treeWalkIgnoredErrs []error, sets ...[]StorageAPI) listDirFunc { listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string, err error) { for _, disk := range disks { if disk == nil { @@ -632,7 +632,7 @@ func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, treeWalkIgnoredE var entries []string var newEntries []string - entries, err = disk.ListDir(bucket, prefixDir) + entries, err = disk.ListDir(bucket, prefixDir, -1) if err != nil { // For any reason disk was deleted or goes offline, continue // and list from other disks if possible. @@ -723,13 +723,17 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi return s.getHashedSet(entry).isObject(bucket, entry) } + isLeafDir := func(bucket, entry string) bool { + return s.getHashedSet(entry).isObjectDir(bucket, entry) + } + var setDisks = make([][]StorageAPI, len(s.sets)) for _, set := range s.sets { setDisks = append(setDisks, set.getLoadBalancedDisks()) } - listDir := listDirSetsFactory(ctx, isLeaf, xlTreeWalkIgnoredErrs, setDisks...) - walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh) + listDir := listDirSetsFactory(ctx, isLeaf, isLeafDir, xlTreeWalkIgnoredErrs, setDisks...) + walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh) } for i := 0; i < maxKeys; { @@ -781,7 +785,7 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi result = ListObjectsInfo{IsTruncated: !eof} for _, objInfo := range objInfos { result.NextMarker = objInfo.Name - if objInfo.IsDir { + if objInfo.IsDir && delimiter == slashSeparator { result.Prefixes = append(result.Prefixes, objInfo.Name) continue } @@ -1270,7 +1274,7 @@ func listDirSetsHealFactory(isLeaf isLeafFunc, sets ...[]StorageAPI) listDirFunc } var entries []string var newEntries []string - entries, err = disk.ListDir(bucket, prefixDir) + entries, err = disk.ListDir(bucket, prefixDir, -1) if err != nil { continue } @@ -1362,7 +1366,7 @@ func (s *xlSets) listObjectsHeal(ctx context.Context, bucket, prefix, marker, de } listDir := listDirSetsHealFactory(isLeaf, setDisks...) - walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, nil, endWalkCh) + walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, nil, nil, endWalkCh) } var objInfos []ObjectInfo diff --git a/cmd/xl-v1-common.go b/cmd/xl-v1-common.go index 6de5ec96c..c7bfb818e 100644 --- a/cmd/xl-v1-common.go +++ b/cmd/xl-v1-common.go @@ -76,6 +76,32 @@ func (xl xlObjects) isObject(bucket, prefix string) (ok bool) { return false } +// isObjectDir returns if the specified path represents an empty directory. +func (xl xlObjects) isObjectDir(bucket, prefix string) (ok bool) { + for _, disk := range xl.getLoadBalancedDisks() { + if disk == nil { + continue + } + // Check if 'prefix' is an object on this 'disk', else continue the check the next disk + ctnts, err := disk.ListDir(bucket, prefix, 1) + if err == nil { + if len(ctnts) == 0 { + return true + } + return false + } + // Ignore for file not found, disk not found or faulty disk. + if IsErrIgnored(err, xlTreeWalkIgnoredErrs...) { + continue + } + reqInfo := &logger.ReqInfo{BucketName: bucket} + reqInfo.AppendTags("prefix", prefix) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, err) + } // Exhausted all disks - return false. + return false +} + // Calculate the space occupied by an object in a single disk func (xl xlObjects) sizeOnDisk(fileSize int64, blockSize int64, dataBlocks int) int64 { numBlocks := fileSize / blockSize diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index 68001b0e8..286979412 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -422,7 +422,7 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o } // List and delete the object directory, - files, derr := disk.ListDir(bucket, object) + files, derr := disk.ListDir(bucket, object, -1) if derr == nil { for _, entry := range files { _ = disk.DeleteFile(bucket, diff --git a/cmd/xl-v1-list-objects.go b/cmd/xl-v1-list-objects.go index bb3d876ec..696093ccb 100644 --- a/cmd/xl-v1-list-objects.go +++ b/cmd/xl-v1-list-objects.go @@ -33,7 +33,7 @@ func listDirFactory(ctx context.Context, isLeaf isLeafFunc, treeWalkIgnoredErrs } var entries []string var newEntries []string - entries, err = disk.ListDir(bucket, prefixDir) + entries, err = disk.ListDir(bucket, prefixDir, -1) if err != nil { // For any reason disk was deleted or goes offline, continue // and list from other disks if possible. @@ -78,8 +78,9 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del if walkResultCh == nil { endWalkCh = make(chan struct{}) isLeaf := xl.isObject + isLeafDir := xl.isObjectDir listDir := listDirFactory(ctx, isLeaf, xlTreeWalkIgnoredErrs, xl.getLoadBalancedDisks()...) - walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh) + walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh) } var objInfos []ObjectInfo @@ -136,7 +137,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del result := ListObjectsInfo{IsTruncated: !eof} for _, objInfo := range objInfos { result.NextMarker = objInfo.Name - if objInfo.IsDir { + if objInfo.IsDir && delimiter == slashSeparator { result.Prefixes = append(result.Prefixes, objInfo.Name) continue } diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 0cff64284..182d24edf 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -156,7 +156,7 @@ func (xl xlObjects) ListMultipartUploads(ctx context.Context, bucket, object, ke if disk == nil { continue } - uploadIDs, err := disk.ListDir(minioMetaMultipartBucket, xl.getMultipartSHADir(bucket, object)) + uploadIDs, err := disk.ListDir(minioMetaMultipartBucket, xl.getMultipartSHADir(bucket, object), -1) if err != nil { if err == errFileNotFound { return result, nil @@ -862,12 +862,12 @@ func (xl xlObjects) cleanupStaleMultipartUploads(ctx context.Context, cleanupInt // Remove the old multipart uploads on the given disk. func (xl xlObjects) cleanupStaleMultipartUploadsOnDisk(ctx context.Context, disk StorageAPI, expiry time.Duration) { now := time.Now() - shaDirs, err := disk.ListDir(minioMetaMultipartBucket, "") + shaDirs, err := disk.ListDir(minioMetaMultipartBucket, "", -1) if err != nil { return } for _, shaDir := range shaDirs { - uploadIDDirs, err := disk.ListDir(minioMetaMultipartBucket, shaDir) + uploadIDDirs, err := disk.ListDir(minioMetaMultipartBucket, shaDir, -1) if err != nil { continue } diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index ac0a37fd7..a3f7e9ff1 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -359,6 +359,9 @@ func (xl xlObjects) GetObjectInfo(ctx context.Context, bucket, object string) (o } if hasSuffix(object, slashSeparator) { + if !xl.isObjectDir(bucket, object) { + return oi, toObjectErr(errFileNotFound, bucket, object) + } if oi, e = xl.getObjectInfoDir(ctx, bucket, object); e != nil { return oi, toObjectErr(e, bucket, object) }