diff --git a/cmd/gateway/hdfs/gateway-hdfs.go b/cmd/gateway/hdfs/gateway-hdfs.go index 66924a19a..0416e77ea 100644 --- a/cmd/gateway/hdfs/gateway-hdfs.go +++ b/cmd/gateway/hdfs/gateway-hdfs.go @@ -384,28 +384,78 @@ func (n *hdfsObjects) listDirFactory() minio.ListDirFunc { // ListObjects lists all blobs in HDFS bucket filtered by prefix. func (n *hdfsObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) { - if _, err := n.clnt.Stat(n.hdfsPathJoin(bucket)); err != nil { + fileInfos := make(map[string]os.FileInfo) + directoryPath := n.hdfsPathJoin(bucket, prefix) + + if err = n.populateDirectoryListing(directoryPath, fileInfos); err != nil { return loi, hdfsToObjectErr(ctx, err, bucket) } getObjectInfo := func(ctx context.Context, bucket, entry string) (minio.ObjectInfo, error) { - fi, err := n.clnt.Stat(n.hdfsPathJoin(bucket, entry)) - if err != nil { - return minio.ObjectInfo{}, hdfsToObjectErr(ctx, err, bucket, entry) + filePath := path.Clean(n.hdfsPathJoin(bucket, entry)) + fi, ok := fileInfos[filePath] + + // If the file info is not known, this may be a recursive listing and filePath is a + // child of a sub-directory. In this case, obtain that sub-directory's listing. + if !ok { + parentPath := path.Dir(filePath) + + if err := n.populateDirectoryListing(parentPath, fileInfos); err != nil { + return minio.ObjectInfo{}, hdfsToObjectErr(ctx, err, bucket) + } + + fi, ok = fileInfos[filePath] + + if !ok { + err = fmt.Errorf("could not get FileInfo for path '%s'", filePath) + return minio.ObjectInfo{}, hdfsToObjectErr(ctx, err, bucket, entry) + } } - return minio.ObjectInfo{ + + objectInfo := minio.ObjectInfo{ Bucket: bucket, Name: entry, ModTime: fi.ModTime(), Size: fi.Size(), IsDir: fi.IsDir(), AccTime: fi.(*hdfs.FileInfo).AccessTime(), - }, nil + } + + delete(fileInfos, filePath) + + return objectInfo, nil } return minio.ListObjects(ctx, n, bucket, prefix, marker, delimiter, maxKeys, n.listPool, n.listDirFactory(), getObjectInfo, getObjectInfo) } +// Lists a path's direct, first-level entries and populates them in the `fileInfos` cache which maps +// a path entry to an `os.FileInfo`. It also saves the listed path's `os.FileInfo` in the cache. +func (n *hdfsObjects) populateDirectoryListing(filePath string, fileInfos map[string]os.FileInfo) error { + dirReader, err := n.clnt.Open(filePath) + + if err != nil { + return err + } + + dirStat := dirReader.Stat() + key := path.Clean(filePath) + + fileInfos[key] = dirStat + infos, err := dirReader.Readdir(0) + + if err != nil { + return err + } + + for _, fileInfo := range infos { + filePath := n.hdfsPathJoin(filePath, fileInfo.Name()) + fileInfos[filePath] = fileInfo + } + + return nil +} + // deleteObject deletes a file path if its empty. If it's successfully deleted, // it will recursively move up the tree, deleting empty parent directories // until it finds one with files in it. Returns nil for a non-empty directory.