Better support of empty directories (#5890)

Better support of HEAD and listing of zero sized objects with trailing
slash (a.k.a empty directory). For that, isLeafDir function is added
to indicate if the specified object is an empty directory or not. Each
backend (xl, fs) has the responsibility to store that information.
Currently, in both of XL & FS, an empty directory is represented by
an empty directory in the backend.

isLeafDir() checks if the given path is an empty directory or not,
since dir listing is costly if the latter contains too many objects,
readDirN() is added in this PR to list only N number of entries.
In isLeadDir(), we will only list one entry to check if a directory
is empty or not.
master
Anis Elleuch 7 years ago committed by Harshavardhana
parent 32700fca52
commit 6d5f2a4391
  1. 12
      cmd/disk-cache.go
  2. 26
      cmd/fs-v1.go
  3. 4
      cmd/naughty-disk_test.go
  4. 2
      cmd/object-api-common.go
  5. 19
      cmd/posix-list-dir-nix.go
  6. 35
      cmd/posix-list-dir-others.go
  7. 38
      cmd/posix-list-dir_test.go
  8. 9
      cmd/posix.go
  9. 2
      cmd/posix_test.go
  10. 2
      cmd/storage-interface.go
  11. 7
      cmd/storage-rpc-client.go
  12. 4
      cmd/storage-rpc-client_test.go
  13. 3
      cmd/storage-rpc-server-datatypes.go
  14. 2
      cmd/storage-rpc-server.go
  15. 32
      cmd/tree-walk.go
  16. 64
      cmd/tree-walk_test.go
  17. 18
      cmd/xl-sets.go
  18. 26
      cmd/xl-v1-common.go
  19. 2
      cmd/xl-v1-healing.go
  20. 7
      cmd/xl-v1-list-objects.go
  21. 6
      cmd/xl-v1-multipart.go
  22. 3
      cmd/xl-v1-object.go

@ -357,8 +357,16 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark
return err == nil
}
isLeafDir := func(bucket, object string) bool {
fs, err := c.cache.getCacheFS(ctx, bucket, object)
if err != nil {
return false
}
return fs.isObjectDir(bucket, object)
}
listDir := listDirCacheFactory(isLeaf, cacheTreeWalkIgnoredErrs, c.cache.cfs)
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh)
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh)
}
for i := 0; i < maxKeys; {
@ -417,7 +425,7 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark
result = ListObjectsInfo{IsTruncated: !eof}
for _, objInfo := range objInfos {
result.NextMarker = objInfo.Name
if objInfo.IsDir {
if objInfo.IsDir && delimiter == slashSeparator {
result.Prefixes = append(result.Prefixes, objInfo.Name)
continue
}

@ -627,6 +627,10 @@ func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object s
return oi, toObjectErr(err, bucket)
}
if strings.HasSuffix(object, slashSeparator) && !fs.isObjectDir(bucket, object) {
return oi, errFileNotFound
}
return fs.getObjectInfo(ctx, bucket, object)
}
@ -890,6 +894,17 @@ func (fs *FSObjects) listDirFactory(isLeaf isLeafFunc) listDirFunc {
return listDir
}
// isObjectDir returns true if the specified bucket & prefix exists
// and the prefix represents an empty directory. An S3 empty directory
// is also an empty directory in the FS backend.
func (fs *FSObjects) isObjectDir(bucket, prefix string) bool {
entries, err := readDirN(pathJoin(fs.fsPath, bucket, prefix), 1)
if err != nil {
return false
}
return len(entries) == 0
}
// getObjectETag is a helper function, which returns only the md5sum
// of the file on the disk.
func (fs *FSObjects) getObjectETag(ctx context.Context, bucket, entry string, lock bool) (string, error) {
@ -1019,8 +1034,15 @@ func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, de
// object string does not end with "/".
return !hasSuffix(object, slashSeparator)
}
// Return true if the specified object is an empty directory
isLeafDir := func(bucket, object string) bool {
if !hasSuffix(object, slashSeparator) {
return false
}
return fs.isObjectDir(bucket, object)
}
listDir := fs.listDirFactory(isLeaf)
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh)
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh)
}
var objInfos []ObjectInfo
@ -1065,7 +1087,7 @@ func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, de
result := ListObjectsInfo{IsTruncated: !eof}
for _, objInfo := range objInfos {
result.NextMarker = objInfo.Name
if objInfo.IsDir {
if objInfo.IsDir && delimiter == slashSeparator {
result.Prefixes = append(result.Prefixes, objInfo.Name)
continue
}

@ -108,11 +108,11 @@ func (d *naughtyDisk) DeleteVol(volume string) (err error) {
return d.disk.DeleteVol(volume)
}
func (d *naughtyDisk) ListDir(volume, path string) (entries []string, err error) {
func (d *naughtyDisk) ListDir(volume, path string, count int) (entries []string, err error) {
if err := d.calcError(); err != nil {
return []string{}, err
}
return d.disk.ListDir(volume, path)
return d.disk.ListDir(volume, path, count)
}
func (d *naughtyDisk) ReadFile(volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) {

@ -116,7 +116,7 @@ func cleanupDir(ctx context.Context, storage StorageAPI, volume, dirPath string)
}
// If it's a directory, list and call delFunc() for each entry.
entries, err := storage.ListDir(volume, entryPath)
entries, err := storage.ListDir(volume, entryPath, -1)
// If entryPath prefix never existed, safe to ignore.
if err == errFileNotFound {
return nil

@ -112,6 +112,12 @@ var readDirBufPool = sync.Pool{
// Return all the entries at the directory dirPath.
func readDir(dirPath string) (entries []string, err error) {
return readDirN(dirPath, -1)
}
// Return count entries at the directory dirPath and all entries
// if count is set to -1
func readDirN(dirPath string, count int) (entries []string, err error) {
bufp := readDirBufPool.Get().(*[]byte)
buf := *bufp
defer readDirBufPool.Put(bufp)
@ -135,7 +141,11 @@ func readDir(dirPath string) (entries []string, err error) {
defer d.Close()
fd := int(d.Fd())
for {
remaining := count
done := false
for !done {
nbuf, err := syscall.ReadDirent(fd, buf)
if err != nil {
return nil, err
@ -147,6 +157,13 @@ func readDir(dirPath string) (entries []string, err error) {
if tmpEntries, err = parseDirents(dirPath, buf[:nbuf]); err != nil {
return nil, err
}
if count > 0 {
if remaining <= len(tmpEntries) {
tmpEntries = tmpEntries[:remaining]
done = true
}
remaining -= len(tmpEntries)
}
entries = append(entries, tmpEntries...)
}
return

@ -30,7 +30,12 @@ import (
// Return all the entries at the directory dirPath.
func readDir(dirPath string) (entries []string, err error) {
d, err := os.Open((dirPath))
return readDirN(dirPath, -1)
}
// Return N entries at the directory dirPath. If count is -1, return all entries
func readDirN(dirPath string, count int) (entries []string, err error) {
d, err := os.Open(dirPath)
if err != nil {
// File is really not found.
if os.IsNotExist(err) {
@ -45,20 +50,34 @@ func readDir(dirPath string) (entries []string, err error) {
}
defer d.Close()
for {
// Read 1000 entries.
fis, err := d.Readdir(1000)
maxEntries := 1000
if count > 0 && count < maxEntries {
maxEntries = count
}
done := false
remaining := count
for !done {
// Read up to max number of entries.
fis, err := d.Readdir(maxEntries)
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
if count > 0 {
if remaining <= len(fis) {
fis = fis[:remaining]
done = true
}
}
for _, fi := range fis {
// Stat symbolic link and follow to get the final value.
if fi.Mode()&os.ModeSymlink == os.ModeSymlink {
var st os.FileInfo
st, err = os.Stat((path.Join(dirPath, fi.Name())))
st, err = os.Stat(path.Join(dirPath, fi.Name()))
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("path", path.Join(dirPath, fi.Name()))
ctx := logger.SetReqInfo(context.Background(), reqInfo)
@ -71,6 +90,9 @@ func readDir(dirPath string) (entries []string, err error) {
} else if st.Mode().IsRegular() {
entries = append(entries, fi.Name())
}
if count > 0 {
remaining--
}
continue
}
if fi.Mode().IsDir() {
@ -79,6 +101,9 @@ func readDir(dirPath string) (entries []string, err error) {
} else if fi.Mode().IsRegular() {
entries = append(entries, fi.Name())
}
if count > 0 {
remaining--
}
}
}
return entries, nil

@ -201,3 +201,41 @@ func TestReadDir(t *testing.T) {
}
}
}
func TestReadDirN(t *testing.T) {
testCases := []struct {
numFiles int
n int
expectedNum int
}{
{0, 0, 0},
{0, 1, 0},
{1, 0, 1},
{0, -1, 0},
{1, -1, 1},
{10, -1, 10},
{1, 1, 1},
{2, 1, 1},
{10, 9, 9},
{10, 10, 10},
{10, 11, 10},
}
for i, testCase := range testCases {
dir := mustSetupDir(t)
defer os.RemoveAll(dir)
for c := 1; c <= testCase.numFiles; c++ {
if err := ioutil.WriteFile(filepath.Join(dir, fmt.Sprintf("%d", c)), []byte{}, os.ModePerm); err != nil {
t.Fatalf("Unable to create a file, %s", err)
}
}
entries, err := readDirN(dir, testCase.n)
if err != nil {
t.Fatalf("Unable to read entries, %s", err)
}
if len(entries) != testCase.expectedNum {
t.Fatalf("Test %d: unexpected number of entries, waiting for %d, but found %d", i+1, testCase.expectedNum, len(entries))
}
}
}

@ -467,7 +467,7 @@ func (s *posix) DeleteVol(volume string) (err error) {
// ListDir - return all the entries at the given directory path.
// If an entry is a directory it will be returned with a trailing "/".
func (s *posix) ListDir(volume, dirPath string) (entries []string, err error) {
func (s *posix) ListDir(volume, dirPath string, count int) (entries []string, err error) {
defer func() {
if err == syscall.EIO {
atomic.AddInt32(&s.ioErrCount, 1)
@ -495,7 +495,12 @@ func (s *posix) ListDir(volume, dirPath string) (entries []string, err error) {
}
return nil, err
}
return readDir(pathJoin(volumeDir, dirPath))
dirPath = pathJoin(volumeDir, dirPath)
if count > 0 {
return readDirN(dirPath, count)
}
return readDir(dirPath)
}
// ReadAll reads from r until an error or EOF and returns the data it read.

@ -785,7 +785,7 @@ func TestPosixPosixListDir(t *testing.T) {
} else {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
dirList, err = posixStorage.ListDir(testCase.srcVol, testCase.srcPath)
dirList, err = posixStorage.ListDir(testCase.srcVol, testCase.srcPath, -1)
if err != testCase.expectedErr {
t.Fatalf("TestPosix case %d: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, err)
}

@ -39,7 +39,7 @@ type StorageAPI interface {
DeleteVol(volume string) (err error)
// File operations.
ListDir(volume, dirPath string) ([]string, error)
ListDir(volume, dirPath string, count int) ([]string, error)
ReadFile(volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error)
PrepareFile(volume string, path string, len int64) (err error)
AppendFile(volume string, path string, buf []byte) (err error)

@ -283,10 +283,11 @@ func (n *networkStorage) ReadFile(volume string, path string, offset int64, buff
}
// ListDir - list all entries at prefix.
func (n *networkStorage) ListDir(volume, path string) (entries []string, err error) {
func (n *networkStorage) ListDir(volume, path string, count int) (entries []string, err error) {
if err = n.call("Storage.ListDirHandler", &ListDirArgs{
Vol: volume,
Path: path,
Vol: volume,
Path: path,
Count: count,
}, &entries); err != nil {
return nil, err
}

@ -435,7 +435,7 @@ func (s *TestRPCStorageSuite) testRPCStorageListDir(t *testing.T) {
t.Error("Unable to initiate MakeVol", err)
}
}
dirs, err := storageDisk.ListDir("myvol", "")
dirs, err := storageDisk.ListDir("myvol", "", -1)
if err != nil {
t.Error(err)
}
@ -448,7 +448,7 @@ func (s *TestRPCStorageSuite) testRPCStorageListDir(t *testing.T) {
t.Error("Unable to initiate DeleteVol", err)
}
}
dirs, err = storageDisk.ListDir("myvol", "")
dirs, err = storageDisk.ListDir("myvol", "", -1)
if err != nil {
t.Error(err)
}

@ -134,6 +134,9 @@ type ListDirArgs struct {
// Name of the path.
Path string
// Number of wanted results
Count int
}
// RenameFileArgs represents rename file RPC arguments.

@ -120,7 +120,7 @@ func (s *storageServer) ListDirHandler(args *ListDirArgs, reply *[]string) error
return err
}
entries, err := s.storage.ListDir(args.Vol, args.Path)
entries, err := s.storage.ListDir(args.Vol, args.Path, args.Count)
if err != nil {
return err
}

@ -98,6 +98,9 @@ type listDirFunc func(bucket, prefixDir, prefixEntry string) (entries []string,
// 4. XL backend multipart listing - isLeaf is true if the entry is a directory and contains uploads.json
type isLeafFunc func(string, string) bool
// A function isLeafDir of type isLeafDirFunc is used to detect if an entry represents an empty directory.
type isLeafDirFunc func(string, string) bool
func filterListEntries(bucket, prefixDir string, entries []string, prefixEntry string, isLeaf isLeafFunc) ([]string, bool) {
// Listing needs to be sorted.
sort.Strings(entries)
@ -125,7 +128,7 @@ func filterListEntries(bucket, prefixDir string, entries []string, prefixEntry s
}
// treeWalk walks directory tree recursively pushing treeWalkResult into the channel as and when it encounters files.
func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir listDirFunc, isLeaf isLeafFunc, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error {
func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir listDirFunc, isLeaf isLeafFunc, isLeafDir isLeafDirFunc, resultCh chan treeWalkResult, endWalkCh chan struct{}, isEnd bool) error {
// Example:
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
// called with prefixDir="one/two/three/four/" and marker="five.txt"
@ -167,17 +170,30 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
return nil
}
for i, entry := range entries {
var leaf, leafDir bool
// Decision to do isLeaf check was pushed from listDir() to here.
if delayIsLeaf && isLeaf(bucket, pathJoin(prefixDir, entry)) {
entry = strings.TrimSuffix(entry, slashSeparator)
if delayIsLeaf {
leaf = isLeaf(bucket, pathJoin(prefixDir, entry))
if leaf {
entry = strings.TrimSuffix(entry, slashSeparator)
}
} else {
leaf = !strings.HasSuffix(entry, slashSeparator)
}
if strings.HasSuffix(entry, slashSeparator) {
leafDir = isLeafDir(bucket, pathJoin(prefixDir, entry))
}
isDir := !leafDir && !leaf
if i == 0 && markerDir == entry {
if !recursive {
// Skip as the marker would already be listed in the previous listing.
continue
}
if recursive && !hasSuffix(entry, slashSeparator) {
if recursive && !isDir {
// We should not skip for recursive listing and if markerDir is a directory
// for ex. if marker is "four/five.txt" markerDir will be "four/" which
// should not be skipped, instead it will need to be treeWalk()'ed into.
@ -186,7 +202,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
continue
}
}
if recursive && hasSuffix(entry, slashSeparator) {
if recursive && isDir {
// If the entry is a directory, we will need recurse into it.
markerArg := ""
if entry == markerDir {
@ -198,7 +214,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
// markIsEnd is passed to this entry's treeWalk() so that treeWalker.end can be marked
// true at the end of the treeWalk stream.
markIsEnd := i == len(entries)-1 && isEnd
if tErr := doTreeWalk(ctx, bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, listDir, isLeaf, resultCh, endWalkCh, markIsEnd); tErr != nil {
if tErr := doTreeWalk(ctx, bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, listDir, isLeaf, isLeafDir, resultCh, endWalkCh, markIsEnd); tErr != nil {
return tErr
}
continue
@ -218,7 +234,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
}
// Initiate a new treeWalk in a goroutine.
func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive bool, listDir listDirFunc, isLeaf isLeafFunc, endWalkCh chan struct{}) chan treeWalkResult {
func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive bool, listDir listDirFunc, isLeaf isLeafFunc, isLeafDir isLeafDirFunc, endWalkCh chan struct{}) chan treeWalkResult {
// Example 1
// If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt"
// treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt"
@ -240,7 +256,7 @@ func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive
marker = strings.TrimPrefix(marker, prefixDir)
go func() {
isEnd := true // Indication to start walking the tree with end as true.
doTreeWalk(ctx, bucket, prefixDir, entryPrefixMatch, marker, recursive, listDir, isLeaf, resultCh, endWalkCh, isEnd)
doTreeWalk(ctx, bucket, prefixDir, entryPrefixMatch, marker, recursive, listDir, isLeaf, isLeafDir, resultCh, endWalkCh, isEnd)
close(resultCh)
}()
return resultCh

@ -128,11 +128,11 @@ func createNamespace(disk StorageAPI, volume string, files []string) error {
// Test if tree walker returns entries matching prefix alone are received
// when a non empty prefix is supplied.
func testTreeWalkPrefix(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc) {
func testTreeWalkPrefix(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc, isLeafDir isLeafDirFunc) {
// Start the tree walk go-routine.
prefix := "d/"
endWalkCh := make(chan struct{})
twResultCh := startTreeWalk(context.Background(), volume, prefix, "", true, listDir, isLeaf, endWalkCh)
twResultCh := startTreeWalk(context.Background(), volume, prefix, "", true, listDir, isLeaf, isLeafDir, endWalkCh)
// Check if all entries received on the channel match the prefix.
for res := range twResultCh {
@ -143,11 +143,11 @@ func testTreeWalkPrefix(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc) {
}
// Test if entries received on tree walk's channel appear after the supplied marker.
func testTreeWalkMarker(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc) {
func testTreeWalkMarker(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc, isLeafDir isLeafDirFunc) {
// Start the tree walk go-routine.
prefix := ""
endWalkCh := make(chan struct{})
twResultCh := startTreeWalk(context.Background(), volume, prefix, "d/g", true, listDir, isLeaf, endWalkCh)
twResultCh := startTreeWalk(context.Background(), volume, prefix, "d/g", true, listDir, isLeaf, isLeafDir, endWalkCh)
// Check if only 3 entries, namely d/g/h, i/j/k, lmn are received on the channel.
expectedCount := 3
@ -187,11 +187,20 @@ func TestTreeWalk(t *testing.T) {
isLeaf := func(volume, prefix string) bool {
return !hasSuffix(prefix, slashSeparator)
}
isLeafDir := func(volume, prefix string) bool {
entries, listErr := disk.ListDir(volume, prefix, 1)
if listErr != nil {
return false
}
return len(entries) == 0
}
listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk)
// Simple test for prefix based walk.
testTreeWalkPrefix(t, listDir, isLeaf)
testTreeWalkPrefix(t, listDir, isLeaf, isLeafDir)
// Simple test when marker is set.
testTreeWalkMarker(t, listDir, isLeaf)
testTreeWalkMarker(t, listDir, isLeaf, isLeafDir)
err = os.RemoveAll(fsDir)
if err != nil {
t.Fatal(err)
@ -222,6 +231,15 @@ func TestTreeWalkTimeout(t *testing.T) {
isLeaf := func(volume, prefix string) bool {
return !hasSuffix(prefix, slashSeparator)
}
isLeafDir := func(volume, prefix string) bool {
entries, listErr := disk.ListDir(volume, prefix, 1)
if listErr != nil {
return false
}
return len(entries) == 0
}
listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk)
// TreeWalk pool with 2 seconds timeout for tree-walk go routines.
@ -231,7 +249,7 @@ func TestTreeWalkTimeout(t *testing.T) {
prefix := ""
marker := ""
recursive := true
resultCh := startTreeWalk(context.Background(), volume, prefix, marker, recursive, listDir, isLeaf, endWalkCh)
resultCh := startTreeWalk(context.Background(), volume, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh)
params := listParams{
bucket: volume,
@ -373,6 +391,14 @@ func TestRecursiveTreeWalk(t *testing.T) {
return !hasSuffix(prefix, slashSeparator)
}
isLeafDir := func(volume, prefix string) bool {
entries, listErr := disk1.ListDir(volume, prefix, 1)
if listErr != nil {
return false
}
return len(entries) == 0
}
// Create listDir function.
listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk1)
@ -450,7 +476,7 @@ func TestRecursiveTreeWalk(t *testing.T) {
for i, testCase := range testCases {
for entry := range startTreeWalk(context.Background(), volume,
testCase.prefix, testCase.marker, testCase.recursive,
listDir, isLeaf, endWalkCh) {
listDir, isLeaf, isLeafDir, endWalkCh) {
if _, found := testCase.expected[entry.entry]; !found {
t.Errorf("Test %d: Expected %s, but couldn't find", i+1, entry.entry)
}
@ -479,6 +505,15 @@ func TestSortedness(t *testing.T) {
isLeaf := func(volume, prefix string) bool {
return !hasSuffix(prefix, slashSeparator)
}
isLeafDir := func(volume, prefix string) bool {
entries, listErr := disk1.ListDir(volume, prefix, 1)
if listErr != nil {
return false
}
return len(entries) == 0
}
// Create listDir function.
listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk1)
@ -522,7 +557,7 @@ func TestSortedness(t *testing.T) {
var actualEntries []string
for entry := range startTreeWalk(context.Background(), volume,
test.prefix, test.marker, test.recursive,
listDir, isLeaf, endWalkCh) {
listDir, isLeaf, isLeafDir, endWalkCh) {
actualEntries = append(actualEntries, entry.entry)
}
if !sort.IsSorted(sort.StringSlice(actualEntries)) {
@ -553,6 +588,15 @@ func TestTreeWalkIsEnd(t *testing.T) {
isLeaf := func(volume, prefix string) bool {
return !hasSuffix(prefix, slashSeparator)
}
isLeafDir := func(volume, prefix string) bool {
entries, listErr := disk1.ListDir(volume, prefix, 1)
if listErr != nil {
return false
}
return len(entries) == 0
}
// Create listDir function.
listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk1)
@ -595,7 +639,7 @@ func TestTreeWalkIsEnd(t *testing.T) {
}
for i, test := range testCases {
var entry treeWalkResult
for entry = range startTreeWalk(context.Background(), volume, test.prefix, test.marker, test.recursive, listDir, isLeaf, endWalkCh) {
for entry = range startTreeWalk(context.Background(), volume, test.prefix, test.marker, test.recursive, listDir, isLeaf, isLeafDir, endWalkCh) {
}
if entry.entry != test.expectedEntry {
t.Errorf("Test %d: Expected entry %s, but received %s with the EOF marker", i, test.expectedEntry, entry.entry)

@ -623,7 +623,7 @@ func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucke
// Returns function "listDir" of the type listDirFunc.
// isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry.
// disks - used for doing disk.ListDir(). Sets passes set of disks.
func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, treeWalkIgnoredErrs []error, sets ...[]StorageAPI) listDirFunc {
func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, isLeafDir isLeafDirFunc, treeWalkIgnoredErrs []error, sets ...[]StorageAPI) listDirFunc {
listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string, err error) {
for _, disk := range disks {
if disk == nil {
@ -632,7 +632,7 @@ func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, treeWalkIgnoredE
var entries []string
var newEntries []string
entries, err = disk.ListDir(bucket, prefixDir)
entries, err = disk.ListDir(bucket, prefixDir, -1)
if err != nil {
// For any reason disk was deleted or goes offline, continue
// and list from other disks if possible.
@ -723,13 +723,17 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi
return s.getHashedSet(entry).isObject(bucket, entry)
}
isLeafDir := func(bucket, entry string) bool {
return s.getHashedSet(entry).isObjectDir(bucket, entry)
}
var setDisks = make([][]StorageAPI, len(s.sets))
for _, set := range s.sets {
setDisks = append(setDisks, set.getLoadBalancedDisks())
}
listDir := listDirSetsFactory(ctx, isLeaf, xlTreeWalkIgnoredErrs, setDisks...)
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh)
listDir := listDirSetsFactory(ctx, isLeaf, isLeafDir, xlTreeWalkIgnoredErrs, setDisks...)
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh)
}
for i := 0; i < maxKeys; {
@ -781,7 +785,7 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi
result = ListObjectsInfo{IsTruncated: !eof}
for _, objInfo := range objInfos {
result.NextMarker = objInfo.Name
if objInfo.IsDir {
if objInfo.IsDir && delimiter == slashSeparator {
result.Prefixes = append(result.Prefixes, objInfo.Name)
continue
}
@ -1270,7 +1274,7 @@ func listDirSetsHealFactory(isLeaf isLeafFunc, sets ...[]StorageAPI) listDirFunc
}
var entries []string
var newEntries []string
entries, err = disk.ListDir(bucket, prefixDir)
entries, err = disk.ListDir(bucket, prefixDir, -1)
if err != nil {
continue
}
@ -1362,7 +1366,7 @@ func (s *xlSets) listObjectsHeal(ctx context.Context, bucket, prefix, marker, de
}
listDir := listDirSetsHealFactory(isLeaf, setDisks...)
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, nil, endWalkCh)
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, nil, nil, endWalkCh)
}
var objInfos []ObjectInfo

@ -76,6 +76,32 @@ func (xl xlObjects) isObject(bucket, prefix string) (ok bool) {
return false
}
// isObjectDir returns if the specified path represents an empty directory.
func (xl xlObjects) isObjectDir(bucket, prefix string) (ok bool) {
for _, disk := range xl.getLoadBalancedDisks() {
if disk == nil {
continue
}
// Check if 'prefix' is an object on this 'disk', else continue the check the next disk
ctnts, err := disk.ListDir(bucket, prefix, 1)
if err == nil {
if len(ctnts) == 0 {
return true
}
return false
}
// Ignore for file not found, disk not found or faulty disk.
if IsErrIgnored(err, xlTreeWalkIgnoredErrs...) {
continue
}
reqInfo := &logger.ReqInfo{BucketName: bucket}
reqInfo.AppendTags("prefix", prefix)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
} // Exhausted all disks - return false.
return false
}
// Calculate the space occupied by an object in a single disk
func (xl xlObjects) sizeOnDisk(fileSize int64, blockSize int64, dataBlocks int) int64 {
numBlocks := fileSize / blockSize

@ -422,7 +422,7 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o
}
// List and delete the object directory,
files, derr := disk.ListDir(bucket, object)
files, derr := disk.ListDir(bucket, object, -1)
if derr == nil {
for _, entry := range files {
_ = disk.DeleteFile(bucket,

@ -33,7 +33,7 @@ func listDirFactory(ctx context.Context, isLeaf isLeafFunc, treeWalkIgnoredErrs
}
var entries []string
var newEntries []string
entries, err = disk.ListDir(bucket, prefixDir)
entries, err = disk.ListDir(bucket, prefixDir, -1)
if err != nil {
// For any reason disk was deleted or goes offline, continue
// and list from other disks if possible.
@ -78,8 +78,9 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del
if walkResultCh == nil {
endWalkCh = make(chan struct{})
isLeaf := xl.isObject
isLeafDir := xl.isObjectDir
listDir := listDirFactory(ctx, isLeaf, xlTreeWalkIgnoredErrs, xl.getLoadBalancedDisks()...)
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh)
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh)
}
var objInfos []ObjectInfo
@ -136,7 +137,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del
result := ListObjectsInfo{IsTruncated: !eof}
for _, objInfo := range objInfos {
result.NextMarker = objInfo.Name
if objInfo.IsDir {
if objInfo.IsDir && delimiter == slashSeparator {
result.Prefixes = append(result.Prefixes, objInfo.Name)
continue
}

@ -156,7 +156,7 @@ func (xl xlObjects) ListMultipartUploads(ctx context.Context, bucket, object, ke
if disk == nil {
continue
}
uploadIDs, err := disk.ListDir(minioMetaMultipartBucket, xl.getMultipartSHADir(bucket, object))
uploadIDs, err := disk.ListDir(minioMetaMultipartBucket, xl.getMultipartSHADir(bucket, object), -1)
if err != nil {
if err == errFileNotFound {
return result, nil
@ -862,12 +862,12 @@ func (xl xlObjects) cleanupStaleMultipartUploads(ctx context.Context, cleanupInt
// Remove the old multipart uploads on the given disk.
func (xl xlObjects) cleanupStaleMultipartUploadsOnDisk(ctx context.Context, disk StorageAPI, expiry time.Duration) {
now := time.Now()
shaDirs, err := disk.ListDir(minioMetaMultipartBucket, "")
shaDirs, err := disk.ListDir(minioMetaMultipartBucket, "", -1)
if err != nil {
return
}
for _, shaDir := range shaDirs {
uploadIDDirs, err := disk.ListDir(minioMetaMultipartBucket, shaDir)
uploadIDDirs, err := disk.ListDir(minioMetaMultipartBucket, shaDir, -1)
if err != nil {
continue
}

@ -359,6 +359,9 @@ func (xl xlObjects) GetObjectInfo(ctx context.Context, bucket, object string) (o
}
if hasSuffix(object, slashSeparator) {
if !xl.isObjectDir(bucket, object) {
return oi, toObjectErr(errFileNotFound, bucket, object)
}
if oi, e = xl.getObjectInfoDir(ctx, bucket, object); e != nil {
return oi, toObjectErr(e, bucket, object)
}

Loading…
Cancel
Save