support directory objects in listing in certain scenarios (#11452)

When a directory object is presented as a `prefix`
param our implementation tend to only list objects
present common to the `prefix` than the `prefix` itself,
to mimic AWS S3 like flat key behavior this PR ensures
that if `prefix` is directory object, it should be
automatically considered to be part of the eventual
listing result.

fixes #11370
master
Harshavardhana 4 years ago committed by GitHub
parent 8bb580abfc
commit c9b0f595b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 30
      cmd/metacache-walk.go
  2. 62
      cmd/xl-storage.go

@ -66,7 +66,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
} }
// Stat a volume entry. // Stat a volume entry.
_, err = os.Stat(volumeDir) _, err = os.Lstat(volumeDir)
if err != nil { if err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
return errVolumeNotFound return errVolumeNotFound
@ -76,13 +76,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
return err return err
} }
// Fast exit track to check if we are listing an object with
// a trailing slash, this will avoid to list the object content.
if HasSuffix(opts.BaseDir, SlashSeparator) {
if st, err := os.Stat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() {
return errFileNotFound
}
}
// Use a small block size to start sending quickly // Use a small block size to start sending quickly
w := newMetacacheWriter(wr, 16<<10) w := newMetacacheWriter(wr, 16<<10)
defer w.Close() defer w.Close()
@ -92,6 +85,27 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
} }
defer close(out) defer close(out)
// Fast exit track to check if we are listing an object with
// a trailing slash, this will avoid to list the object content.
if HasSuffix(opts.BaseDir, SlashSeparator) {
metadata, err := ioutil.ReadFile(pathJoin(volumeDir,
opts.BaseDir[:len(opts.BaseDir)-1]+globalDirSuffix,
xlStorageFormatFile))
if err == nil {
// if baseDir is already a directory object, consider it
// as part of the list call, this is a AWS S3 specific
// behavior.
out <- metaCacheEntry{
name: opts.BaseDir,
metadata: metadata,
}
} else {
if st, err := os.Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() {
return errFileNotFound
}
}
}
prefix := opts.FilterPrefix prefix := opts.FilterPrefix
var scanDir func(path string) error var scanDir func(path string) error
scanDir = func(current string) error { scanDir = func(current string) error {

@ -168,7 +168,7 @@ func getValidPath(path string) (string, error) {
return path, err return path, err
} }
fi, err := os.Stat(path) fi, err := os.Lstat(path)
if err != nil && !osIsNotExist(err) { if err != nil && !osIsNotExist(err) {
return path, err return path, err
} }
@ -323,7 +323,7 @@ func (s *xlStorage) IsLocal() bool {
func (s *xlStorage) Healing() bool { func (s *xlStorage) Healing() bool {
healingFile := pathJoin(s.diskPath, minioMetaBucket, healingFile := pathJoin(s.diskPath, minioMetaBucket,
bucketMetaPrefix, healingTrackerFilename) bucketMetaPrefix, healingTrackerFilename)
_, err := os.Stat(healingFile) _, err := os.Lstat(healingFile)
return err == nil return err == nil
} }
@ -483,11 +483,11 @@ func (s *xlStorage) GetDiskID() (string, error) {
s.Unlock() s.Unlock()
formatFile := pathJoin(s.diskPath, minioMetaBucket, formatConfigFile) formatFile := pathJoin(s.diskPath, minioMetaBucket, formatConfigFile)
fi, err := os.Stat(formatFile) fi, err := os.Lstat(formatFile)
if err != nil { if err != nil {
// If the disk is still not initialized. // If the disk is still not initialized.
if osIsNotExist(err) { if osIsNotExist(err) {
_, err = os.Stat(s.diskPath) _, err = os.Lstat(s.diskPath)
if err == nil { if err == nil {
// Disk is present but missing `format.json` // Disk is present but missing `format.json`
return "", errUnformattedDisk return "", errUnformattedDisk
@ -518,7 +518,7 @@ func (s *xlStorage) GetDiskID() (string, error) {
if err != nil { if err != nil {
// If the disk is still not initialized. // If the disk is still not initialized.
if osIsNotExist(err) { if osIsNotExist(err) {
_, err = os.Stat(s.diskPath) _, err = os.Lstat(s.diskPath)
if err == nil { if err == nil {
// Disk is present but missing `format.json` // Disk is present but missing `format.json`
return "", errUnformattedDisk return "", errUnformattedDisk
@ -586,7 +586,7 @@ func (s *xlStorage) MakeVol(ctx context.Context, volume string) error {
return err return err
} }
if _, err := os.Stat(volumeDir); err != nil { if _, err := os.Lstat(volumeDir); err != nil {
// Volume does not exist we proceed to create. // Volume does not exist we proceed to create.
if osIsNotExist(err) { if osIsNotExist(err) {
// Make a volume entry, with mode 0777 mkdir honors system umask. // Make a volume entry, with mode 0777 mkdir honors system umask.
@ -630,7 +630,7 @@ func listVols(dirPath string) ([]VolInfo, error) {
continue continue
} }
var fi os.FileInfo var fi os.FileInfo
fi, err = os.Stat(pathJoin(dirPath, entry)) fi, err = os.Lstat(pathJoin(dirPath, entry))
if err != nil { if err != nil {
// If the file does not exist, skip the entry. // If the file does not exist, skip the entry.
if osIsNotExist(err) { if osIsNotExist(err) {
@ -642,7 +642,7 @@ func listVols(dirPath string) ([]VolInfo, error) {
} }
volsInfo = append(volsInfo, VolInfo{ volsInfo = append(volsInfo, VolInfo{
Name: fi.Name(), Name: fi.Name(),
// As os.Stat() doesn't carry other than ModTime(), use // As os.Lstat() doesn't carry other than ModTime(), use
// ModTime() as CreatedTime. // ModTime() as CreatedTime.
Created: fi.ModTime(), Created: fi.ModTime(),
}) })
@ -664,7 +664,7 @@ func (s *xlStorage) StatVol(ctx context.Context, volume string) (vol VolInfo, er
} }
// Stat a volume entry. // Stat a volume entry.
var st os.FileInfo var st os.FileInfo
st, err = os.Stat(volumeDir) st, err = os.Lstat(volumeDir)
if err != nil { if err != nil {
switch { switch {
case osIsNotExist(err): case osIsNotExist(err):
@ -677,7 +677,7 @@ func (s *xlStorage) StatVol(ctx context.Context, volume string) (vol VolInfo, er
return VolInfo{}, err return VolInfo{}, err
} }
} }
// As os.Stat() doesn't carry other than ModTime(), use ModTime() // As os.Lstat() doesn't carry other than ModTime(), use ModTime()
// as CreatedTime. // as CreatedTime.
createdTime := st.ModTime() createdTime := st.ModTime()
return VolInfo{ return VolInfo{
@ -728,7 +728,7 @@ func (s *xlStorage) isLeaf(volume string, leafPath string) bool {
return false return false
} }
_, err = os.Stat(pathJoin(volumeDir, leafPath, xlStorageFormatFile)) _, err = os.Lstat(pathJoin(volumeDir, leafPath, xlStorageFormatFile))
if err == nil { if err == nil {
return true return true
} }
@ -766,7 +766,7 @@ func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker st
} }
// Stat a volume entry. // Stat a volume entry.
_, err = os.Stat(volumeDir) _, err = os.Lstat(volumeDir)
if err != nil { if err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
return nil, errVolumeNotFound return nil, errVolumeNotFound
@ -779,7 +779,7 @@ func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker st
// Fast exit track to check if we are listing an object with // Fast exit track to check if we are listing an object with
// a trailing slash, this will avoid to list the object content. // a trailing slash, this will avoid to list the object content.
if HasSuffix(dirPath, SlashSeparator) { if HasSuffix(dirPath, SlashSeparator) {
if st, err := os.Stat(pathJoin(volumeDir, dirPath, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() { if st, err := os.Lstat(pathJoin(volumeDir, dirPath, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() {
return nil, errFileNotFound return nil, errFileNotFound
} }
} }
@ -859,7 +859,7 @@ func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count i
} }
if err != nil { if err != nil {
if err == errFileNotFound { if err == errFileNotFound {
if _, verr := os.Stat(volumeDir); verr != nil { if _, verr := os.Lstat(volumeDir); verr != nil {
if osIsNotExist(verr) { if osIsNotExist(verr) {
return nil, errVolumeNotFound return nil, errVolumeNotFound
} else if isSysErrIO(verr) { } else if isSysErrIO(verr) {
@ -1147,7 +1147,7 @@ func (s *xlStorage) readAllData(volumeDir, filePath string, requireDirectIO bool
if osIsNotExist(err) { if osIsNotExist(err) {
// Check if the object doesn't exist because its bucket // Check if the object doesn't exist because its bucket
// is missing in order to return the correct error. // is missing in order to return the correct error.
_, err = os.Stat(volumeDir) _, err = os.Lstat(volumeDir)
if err != nil && osIsNotExist(err) { if err != nil && osIsNotExist(err) {
return nil, errVolumeNotFound return nil, errVolumeNotFound
} }
@ -1208,7 +1208,7 @@ func (s *xlStorage) ReadAll(ctx context.Context, volume string, path string) (bu
if osIsNotExist(err) { if osIsNotExist(err) {
// Check if the object doesn't exist because its bucket // Check if the object doesn't exist because its bucket
// is missing in order to return the correct error. // is missing in order to return the correct error.
_, err = os.Stat(volumeDir) _, err = os.Lstat(volumeDir)
if err != nil && osIsNotExist(err) { if err != nil && osIsNotExist(err) {
return nil, errVolumeNotFound return nil, errVolumeNotFound
} }
@ -1264,7 +1264,7 @@ func (s *xlStorage) ReadFile(ctx context.Context, volume string, path string, of
var n int var n int
// Stat a volume entry. // Stat a volume entry.
_, err = os.Stat(volumeDir) _, err = os.Lstat(volumeDir)
if err != nil { if err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
return 0, errVolumeNotFound return 0, errVolumeNotFound
@ -1349,7 +1349,7 @@ func (s *xlStorage) openFile(volume, path string, mode int) (f *os.File, err err
} }
// Stat a volume entry. // Stat a volume entry.
_, err = os.Stat(volumeDir) _, err = os.Lstat(volumeDir)
if err != nil { if err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
return nil, errVolumeNotFound return nil, errVolumeNotFound
@ -1366,7 +1366,7 @@ func (s *xlStorage) openFile(volume, path string, mode int) (f *os.File, err err
// Verify if the file already exists and is not of regular type. // Verify if the file already exists and is not of regular type.
var st os.FileInfo var st os.FileInfo
if st, err = os.Stat(filePath); err == nil { if st, err = os.Lstat(filePath); err == nil {
if !st.Mode().IsRegular() { if !st.Mode().IsRegular() {
return nil, errIsNotRegular return nil, errIsNotRegular
} }
@ -1487,7 +1487,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
if err != nil { if err != nil {
switch { switch {
case osIsNotExist(err): case osIsNotExist(err):
_, err = os.Stat(volumeDir) _, err = os.Lstat(volumeDir)
if err != nil && osIsNotExist(err) { if err != nil && osIsNotExist(err) {
return nil, errVolumeNotFound return nil, errVolumeNotFound
} }
@ -1582,7 +1582,7 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
} }
// Stat a volume entry. // Stat a volume entry.
_, err = os.Stat(volumeDir) _, err = os.Lstat(volumeDir)
if err != nil { if err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
return errVolumeNotFound return errVolumeNotFound
@ -1740,7 +1740,7 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string,
} }
// Stat a volume entry. // Stat a volume entry.
if _, err = os.Stat(volumeDir); err != nil { if _, err = os.Lstat(volumeDir); err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
return errVolumeNotFound return errVolumeNotFound
} }
@ -1756,7 +1756,7 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string,
if err = checkPathLength(filePath); err != nil { if err = checkPathLength(filePath); err != nil {
return err return err
} }
st, err := os.Stat(filePath) st, err := os.Lstat(filePath)
if err != nil { if err != nil {
return osErrToFileErr(err) return osErrToFileErr(err)
} }
@ -1900,7 +1900,7 @@ func (s *xlStorage) Delete(ctx context.Context, volume string, path string, recu
} }
// Stat a volume entry. // Stat a volume entry.
_, err = os.Stat(volumeDir) _, err = os.Lstat(volumeDir)
if err != nil { if err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
return errVolumeNotFound return errVolumeNotFound
@ -1935,7 +1935,7 @@ func (s *xlStorage) DeleteFileBulk(volume string, paths []string) (errs []error,
} }
// Stat a volume entry. // Stat a volume entry.
_, err = os.Stat(volumeDir) _, err = os.Lstat(volumeDir)
if err != nil { if err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
return nil, errVolumeNotFound return nil, errVolumeNotFound
@ -1980,7 +1980,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
} }
// Stat a volume entry. // Stat a volume entry.
_, err = os.Stat(srcVolumeDir) _, err = os.Lstat(srcVolumeDir)
if err != nil { if err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
return errVolumeNotFound return errVolumeNotFound
@ -1990,7 +1990,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
return err return err
} }
_, err = os.Stat(dstVolumeDir) _, err = os.Lstat(dstVolumeDir)
if err != nil { if err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
return errVolumeNotFound return errVolumeNotFound
@ -2233,7 +2233,7 @@ func (s *xlStorage) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolum
return err return err
} }
// Stat a volume entry. // Stat a volume entry.
_, err = os.Stat(srcVolumeDir) _, err = os.Lstat(srcVolumeDir)
if err != nil { if err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
return errVolumeNotFound return errVolumeNotFound
@ -2242,7 +2242,7 @@ func (s *xlStorage) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolum
} }
return err return err
} }
_, err = os.Stat(dstVolumeDir) _, err = os.Lstat(dstVolumeDir)
if err != nil { if err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
return errVolumeNotFound return errVolumeNotFound
@ -2270,7 +2270,7 @@ func (s *xlStorage) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolum
// If source is a directory, we expect the destination to be non-existent but we // If source is a directory, we expect the destination to be non-existent but we
// we still need to allow overwriting an empty directory since it represents // we still need to allow overwriting an empty directory since it represents
// an object empty directory. // an object empty directory.
_, err = os.Stat(dstFilePath) _, err = os.Lstat(dstFilePath)
if isSysErrIO(err) { if isSysErrIO(err) {
return errFaultyDisk return errFaultyDisk
} }
@ -2383,7 +2383,7 @@ func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi File
} }
// Stat a volume entry. // Stat a volume entry.
_, err = os.Stat(volumeDir) _, err = os.Lstat(volumeDir)
if err != nil { if err != nil {
if osIsNotExist(err) { if osIsNotExist(err) {
return errVolumeNotFound return errVolumeNotFound

Loading…
Cancel
Save