From 0a2e6d58a578ff7ad354d3e4cd88876a051b8f03 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jorge=20Israel=20Pe=C3=B1a?=
Date: Wed, 26 Aug 2020 16:03:53 -0700
Subject: [PATCH] hdfs gateway handle listing single files (#10362)

---
 cmd/gateway/hdfs/gateway-hdfs.go | 53 +++++++++++++++++++++++---------
 1 file changed, 38 insertions(+), 15 deletions(-)

diff --git a/cmd/gateway/hdfs/gateway-hdfs.go b/cmd/gateway/hdfs/gateway-hdfs.go
index 6160dca0f..39ae7cc49 100644
--- a/cmd/gateway/hdfs/gateway-hdfs.go
+++ b/cmd/gateway/hdfs/gateway-hdfs.go
@@ -392,12 +392,27 @@ func (n *hdfsObjects) listDirFactory() minio.ListDirFunc {
 // ListObjects lists all blobs in HDFS bucket filtered by prefix.
 func (n *hdfsObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) {
 	fileInfos := make(map[string]os.FileInfo)
-	directoryPath := n.hdfsPathJoin(bucket, prefix)
+	targetPath := n.hdfsPathJoin(bucket, prefix)
 
-	if err = n.populateDirectoryListing(directoryPath, fileInfos); err != nil {
+	var targetFileInfo os.FileInfo
+
+	if targetFileInfo, err = n.populateDirectoryListing(targetPath, fileInfos); err != nil {
 		return loi, hdfsToObjectErr(ctx, err, bucket)
 	}
 
+	// If the user is trying to list a single file, bypass the entire directory-walking code below
+	// and just return the single file's information.
+	if !targetFileInfo.IsDir() {
+		return minio.ListObjectsInfo{
+			IsTruncated: false,
+			NextMarker:  "",
+			Objects: []minio.ObjectInfo{
+				fileInfoToObjectInfo(bucket, prefix, targetFileInfo),
+			},
+			Prefixes: []string{},
+		}, nil
+	}
+
 	getObjectInfo := func(ctx context.Context, bucket, entry string) (minio.ObjectInfo, error) {
 		filePath := path.Clean(n.hdfsPathJoin(bucket, entry))
 		fi, ok := fileInfos[filePath]
@@ -407,7 +422,7 @@ func (n *hdfsObjects) ListObjects(ctx context.Context, bucket, prefix, marker, d
 		if !ok {
 			parentPath := path.Dir(filePath)
 
-			if err := n.populateDirectoryListing(parentPath, fileInfos); err != nil {
+			if _, err := n.populateDirectoryListing(parentPath, fileInfos); err != nil {
 				return minio.ObjectInfo{}, hdfsToObjectErr(ctx, err, bucket)
 			}
 
@@ -419,14 +434,7 @@ func (n *hdfsObjects) ListObjects(ctx context.Context, bucket, prefix, marker, d
 			}
 		}
 
-		objectInfo := minio.ObjectInfo{
-			Bucket:  bucket,
-			Name:    entry,
-			ModTime: fi.ModTime(),
-			Size:    fi.Size(),
-			IsDir:   fi.IsDir(),
-			AccTime: fi.(*hdfs.FileInfo).AccessTime(),
-		}
+		objectInfo := fileInfoToObjectInfo(bucket, entry, fi)
 
 		delete(fileInfos, filePath)
 
@@ -436,23 +444,38 @@ func (n *hdfsObjects) ListObjects(ctx context.Context, bucket, prefix, marker, d
 	return minio.ListObjects(ctx, n, bucket, prefix, marker, delimiter, maxKeys, n.listPool, n.listDirFactory(), n.isLeaf, n.isLeafDir, getObjectInfo, getObjectInfo)
 }
 
+func fileInfoToObjectInfo(bucket string, entry string, fi os.FileInfo) minio.ObjectInfo {
+	return minio.ObjectInfo{
+		Bucket:  bucket,
+		Name:    entry,
+		ModTime: fi.ModTime(),
+		Size:    fi.Size(),
+		IsDir:   fi.IsDir(),
+		AccTime: fi.(*hdfs.FileInfo).AccessTime(),
+	}
+}
+
 // Lists a path's direct, first-level entries and populates them in the `fileInfos` cache which maps
 // a path entry to an `os.FileInfo`. It also saves the listed path's `os.FileInfo` in the cache.
-func (n *hdfsObjects) populateDirectoryListing(filePath string, fileInfos map[string]os.FileInfo) error {
+func (n *hdfsObjects) populateDirectoryListing(filePath string, fileInfos map[string]os.FileInfo) (os.FileInfo, error) {
 	dirReader, err := n.clnt.Open(filePath)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	dirStat := dirReader.Stat()
 	key := path.Clean(filePath)
 
+	if !dirStat.IsDir() {
+		return dirStat, nil
+	}
+
 	fileInfos[key] = dirStat
 	infos, err := dirReader.Readdir(0)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	for _, fileInfo := range infos {
@@ -460,7 +483,7 @@ func (n *hdfsObjects) populateDirectoryListing(filePath string, fileInfos map[st
 		fileInfos[filePath] = fileInfo
 	}
 
-	return nil
+	return dirStat, nil
 }
 
 // deleteObject deletes a file path if its empty. If it's successfully deleted,
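Sketch (not part of the patch): the change amounts to a "stat the target first, short-circuit when it is a plain file" pattern. populateDirectoryListing now returns the target's os.FileInfo, and ListObjects uses it to answer single-file prefixes directly instead of walking a directory. The standalone Go program below illustrates the same pattern against the local filesystem using only the standard library; the listPath helper and its output format are hypothetical and are not taken from the gateway code.

// Illustrative sketch only (not MinIO code): stat first, short-circuit on a
// plain file, otherwise list the directory's first-level entries.
package main

import (
	"fmt"
	"os"
)

// listPath returns the entries directly under target. When target is a plain
// file, its own FileInfo is returned as the sole entry and no directory walk
// happens, mirroring the early return the patch adds to ListObjects.
func listPath(target string) ([]os.FileInfo, error) {
	stat, err := os.Stat(target)
	if err != nil {
		return nil, err
	}

	// Single file: bypass the directory-walking code entirely.
	if !stat.IsDir() {
		return []os.FileInfo{stat}, nil
	}

	entries, err := os.ReadDir(target)
	if err != nil {
		return nil, err
	}

	infos := make([]os.FileInfo, 0, len(entries))
	for _, entry := range entries {
		info, err := entry.Info()
		if err != nil {
			return nil, err
		}
		infos = append(infos, info)
	}
	return infos, nil
}

func main() {
	target := "."
	if len(os.Args) > 1 {
		target = os.Args[1]
	}

	infos, err := listPath(target)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	for _, fi := range infos {
		fmt.Printf("%s\tsize=%d\tdir=%v\n", fi.Name(), fi.Size(), fi.IsDir())
	}
}

The gateway applies this shape in two places: populateDirectoryListing returns dirStat early for non-directories, and ListObjects wraps that single FileInfo via fileInfoToObjectInfo rather than falling through to minio.ListObjects.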