From c9b0f595b99a9c874b41c1399ebb57ac615c609b Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 5 Feb 2021 10:12:25 -0800 Subject: [PATCH] support directory objects in listing in certain scenarios (#11452) When a directory object is presented as a `prefix` param our implementation tend to only list objects present common to the `prefix` than the `prefix` itself, to mimic AWS S3 like flat key behavior this PR ensures that if `prefix` is directory object, it should be automatically considered to be part of the eventual listing result. fixes #11370 --- cmd/metacache-walk.go | 30 +++++++++++++++------ cmd/xl-storage.go | 62 +++++++++++++++++++++---------------------- 2 files changed, 53 insertions(+), 39 deletions(-) diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go index fc73abf61..4c5ac33ae 100644 --- a/cmd/metacache-walk.go +++ b/cmd/metacache-walk.go @@ -66,7 +66,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ } // Stat a volume entry. - _, err = os.Stat(volumeDir) + _, err = os.Lstat(volumeDir) if err != nil { if osIsNotExist(err) { return errVolumeNotFound @@ -76,13 +76,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ return err } - // Fast exit track to check if we are listing an object with - // a trailing slash, this will avoid to list the object content. - if HasSuffix(opts.BaseDir, SlashSeparator) { - if st, err := os.Stat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() { - return errFileNotFound - } - } // Use a small block size to start sending quickly w := newMetacacheWriter(wr, 16<<10) defer w.Close() @@ -92,6 +85,27 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ } defer close(out) + // Fast exit track to check if we are listing an object with + // a trailing slash, this will avoid to list the object content. + if HasSuffix(opts.BaseDir, SlashSeparator) { + metadata, err := ioutil.ReadFile(pathJoin(volumeDir, + opts.BaseDir[:len(opts.BaseDir)-1]+globalDirSuffix, + xlStorageFormatFile)) + if err == nil { + // if baseDir is already a directory object, consider it + // as part of the list call, this is a AWS S3 specific + // behavior. + out <- metaCacheEntry{ + name: opts.BaseDir, + metadata: metadata, + } + } else { + if st, err := os.Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() { + return errFileNotFound + } + } + } + prefix := opts.FilterPrefix var scanDir func(path string) error scanDir = func(current string) error { diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 063391364..753ccf7fe 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -168,7 +168,7 @@ func getValidPath(path string) (string, error) { return path, err } - fi, err := os.Stat(path) + fi, err := os.Lstat(path) if err != nil && !osIsNotExist(err) { return path, err } @@ -323,7 +323,7 @@ func (s *xlStorage) IsLocal() bool { func (s *xlStorage) Healing() bool { healingFile := pathJoin(s.diskPath, minioMetaBucket, bucketMetaPrefix, healingTrackerFilename) - _, err := os.Stat(healingFile) + _, err := os.Lstat(healingFile) return err == nil } @@ -483,11 +483,11 @@ func (s *xlStorage) GetDiskID() (string, error) { s.Unlock() formatFile := pathJoin(s.diskPath, minioMetaBucket, formatConfigFile) - fi, err := os.Stat(formatFile) + fi, err := os.Lstat(formatFile) if err != nil { // If the disk is still not initialized. if osIsNotExist(err) { - _, err = os.Stat(s.diskPath) + _, err = os.Lstat(s.diskPath) if err == nil { // Disk is present but missing `format.json` return "", errUnformattedDisk @@ -518,7 +518,7 @@ func (s *xlStorage) GetDiskID() (string, error) { if err != nil { // If the disk is still not initialized. if osIsNotExist(err) { - _, err = os.Stat(s.diskPath) + _, err = os.Lstat(s.diskPath) if err == nil { // Disk is present but missing `format.json` return "", errUnformattedDisk @@ -586,7 +586,7 @@ func (s *xlStorage) MakeVol(ctx context.Context, volume string) error { return err } - if _, err := os.Stat(volumeDir); err != nil { + if _, err := os.Lstat(volumeDir); err != nil { // Volume does not exist we proceed to create. if osIsNotExist(err) { // Make a volume entry, with mode 0777 mkdir honors system umask. @@ -630,7 +630,7 @@ func listVols(dirPath string) ([]VolInfo, error) { continue } var fi os.FileInfo - fi, err = os.Stat(pathJoin(dirPath, entry)) + fi, err = os.Lstat(pathJoin(dirPath, entry)) if err != nil { // If the file does not exist, skip the entry. if osIsNotExist(err) { @@ -642,7 +642,7 @@ func listVols(dirPath string) ([]VolInfo, error) { } volsInfo = append(volsInfo, VolInfo{ Name: fi.Name(), - // As os.Stat() doesn't carry other than ModTime(), use + // As os.Lstat() doesn't carry other than ModTime(), use // ModTime() as CreatedTime. Created: fi.ModTime(), }) @@ -664,7 +664,7 @@ func (s *xlStorage) StatVol(ctx context.Context, volume string) (vol VolInfo, er } // Stat a volume entry. var st os.FileInfo - st, err = os.Stat(volumeDir) + st, err = os.Lstat(volumeDir) if err != nil { switch { case osIsNotExist(err): @@ -677,7 +677,7 @@ func (s *xlStorage) StatVol(ctx context.Context, volume string) (vol VolInfo, er return VolInfo{}, err } } - // As os.Stat() doesn't carry other than ModTime(), use ModTime() + // As os.Lstat() doesn't carry other than ModTime(), use ModTime() // as CreatedTime. createdTime := st.ModTime() return VolInfo{ @@ -728,7 +728,7 @@ func (s *xlStorage) isLeaf(volume string, leafPath string) bool { return false } - _, err = os.Stat(pathJoin(volumeDir, leafPath, xlStorageFormatFile)) + _, err = os.Lstat(pathJoin(volumeDir, leafPath, xlStorageFormatFile)) if err == nil { return true } @@ -766,7 +766,7 @@ func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker st } // Stat a volume entry. - _, err = os.Stat(volumeDir) + _, err = os.Lstat(volumeDir) if err != nil { if osIsNotExist(err) { return nil, errVolumeNotFound @@ -779,7 +779,7 @@ func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker st // Fast exit track to check if we are listing an object with // a trailing slash, this will avoid to list the object content. if HasSuffix(dirPath, SlashSeparator) { - if st, err := os.Stat(pathJoin(volumeDir, dirPath, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() { + if st, err := os.Lstat(pathJoin(volumeDir, dirPath, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() { return nil, errFileNotFound } } @@ -859,7 +859,7 @@ func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count i } if err != nil { if err == errFileNotFound { - if _, verr := os.Stat(volumeDir); verr != nil { + if _, verr := os.Lstat(volumeDir); verr != nil { if osIsNotExist(verr) { return nil, errVolumeNotFound } else if isSysErrIO(verr) { @@ -1147,7 +1147,7 @@ func (s *xlStorage) readAllData(volumeDir, filePath string, requireDirectIO bool if osIsNotExist(err) { // Check if the object doesn't exist because its bucket // is missing in order to return the correct error. - _, err = os.Stat(volumeDir) + _, err = os.Lstat(volumeDir) if err != nil && osIsNotExist(err) { return nil, errVolumeNotFound } @@ -1208,7 +1208,7 @@ func (s *xlStorage) ReadAll(ctx context.Context, volume string, path string) (bu if osIsNotExist(err) { // Check if the object doesn't exist because its bucket // is missing in order to return the correct error. - _, err = os.Stat(volumeDir) + _, err = os.Lstat(volumeDir) if err != nil && osIsNotExist(err) { return nil, errVolumeNotFound } @@ -1264,7 +1264,7 @@ func (s *xlStorage) ReadFile(ctx context.Context, volume string, path string, of var n int // Stat a volume entry. - _, err = os.Stat(volumeDir) + _, err = os.Lstat(volumeDir) if err != nil { if osIsNotExist(err) { return 0, errVolumeNotFound @@ -1349,7 +1349,7 @@ func (s *xlStorage) openFile(volume, path string, mode int) (f *os.File, err err } // Stat a volume entry. - _, err = os.Stat(volumeDir) + _, err = os.Lstat(volumeDir) if err != nil { if osIsNotExist(err) { return nil, errVolumeNotFound @@ -1366,7 +1366,7 @@ func (s *xlStorage) openFile(volume, path string, mode int) (f *os.File, err err // Verify if the file already exists and is not of regular type. var st os.FileInfo - if st, err = os.Stat(filePath); err == nil { + if st, err = os.Lstat(filePath); err == nil { if !st.Mode().IsRegular() { return nil, errIsNotRegular } @@ -1487,7 +1487,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off if err != nil { switch { case osIsNotExist(err): - _, err = os.Stat(volumeDir) + _, err = os.Lstat(volumeDir) if err != nil && osIsNotExist(err) { return nil, errVolumeNotFound } @@ -1582,7 +1582,7 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz } // Stat a volume entry. - _, err = os.Stat(volumeDir) + _, err = os.Lstat(volumeDir) if err != nil { if osIsNotExist(err) { return errVolumeNotFound @@ -1740,7 +1740,7 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string, } // Stat a volume entry. - if _, err = os.Stat(volumeDir); err != nil { + if _, err = os.Lstat(volumeDir); err != nil { if osIsNotExist(err) { return errVolumeNotFound } @@ -1756,7 +1756,7 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string, if err = checkPathLength(filePath); err != nil { return err } - st, err := os.Stat(filePath) + st, err := os.Lstat(filePath) if err != nil { return osErrToFileErr(err) } @@ -1900,7 +1900,7 @@ func (s *xlStorage) Delete(ctx context.Context, volume string, path string, recu } // Stat a volume entry. - _, err = os.Stat(volumeDir) + _, err = os.Lstat(volumeDir) if err != nil { if osIsNotExist(err) { return errVolumeNotFound @@ -1935,7 +1935,7 @@ func (s *xlStorage) DeleteFileBulk(volume string, paths []string) (errs []error, } // Stat a volume entry. - _, err = os.Stat(volumeDir) + _, err = os.Lstat(volumeDir) if err != nil { if osIsNotExist(err) { return nil, errVolumeNotFound @@ -1980,7 +1980,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir, } // Stat a volume entry. - _, err = os.Stat(srcVolumeDir) + _, err = os.Lstat(srcVolumeDir) if err != nil { if osIsNotExist(err) { return errVolumeNotFound @@ -1990,7 +1990,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir, return err } - _, err = os.Stat(dstVolumeDir) + _, err = os.Lstat(dstVolumeDir) if err != nil { if osIsNotExist(err) { return errVolumeNotFound @@ -2233,7 +2233,7 @@ func (s *xlStorage) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolum return err } // Stat a volume entry. - _, err = os.Stat(srcVolumeDir) + _, err = os.Lstat(srcVolumeDir) if err != nil { if osIsNotExist(err) { return errVolumeNotFound @@ -2242,7 +2242,7 @@ func (s *xlStorage) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolum } return err } - _, err = os.Stat(dstVolumeDir) + _, err = os.Lstat(dstVolumeDir) if err != nil { if osIsNotExist(err) { return errVolumeNotFound @@ -2270,7 +2270,7 @@ func (s *xlStorage) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolum // If source is a directory, we expect the destination to be non-existent but we // we still need to allow overwriting an empty directory since it represents // an object empty directory. - _, err = os.Stat(dstFilePath) + _, err = os.Lstat(dstFilePath) if isSysErrIO(err) { return errFaultyDisk } @@ -2383,7 +2383,7 @@ func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi File } // Stat a volume entry. - _, err = os.Stat(volumeDir) + _, err = os.Lstat(volumeDir) if err != nil { if osIsNotExist(err) { return errVolumeNotFound