add hdfs sub-path support (#10046)

for users who don't have access to HDFS rootPath '/'
can optionally specify `minio gateway hdfs hdfs://namenode:8200/path`
for which they have access to, allowing all writes to be
performed at `/path`.

NOTE: once configured in this manner you need to make
sure command line is correctly specified, otherwise
your data might not be visible

closes #10011
master
Harshavardhana 5 years ago committed by GitHub
parent a97ce3c96e
commit 14ff7f5fcf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 97
      cmd/gateway/hdfs/gateway-hdfs.go

@ -159,6 +159,7 @@ func (g *HDFS) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error
opts.DatanodeDialFunc = dialFunc
// Not addresses found, load it from command line.
var commonPath string
if len(opts.Addresses) == 0 {
var addresses []string
for _, s := range g.args {
@ -166,6 +167,15 @@ func (g *HDFS) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error
if err != nil {
return nil, err
}
if u.Scheme != "hdfs" {
return nil, fmt.Errorf("unsupported scheme %s, only supports hdfs://", u)
}
if commonPath != "" && commonPath != u.Path {
return nil, fmt.Errorf("all namenode paths should be same %s", g.args)
}
if commonPath == "" {
commonPath = u.Path
}
addresses = append(addresses, u.Host)
}
opts.Addresses = addresses
@ -173,13 +183,13 @@ func (g *HDFS) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error
u, err := user.Current()
if err != nil {
return nil, fmt.Errorf("Unable to lookup local user: %s", err)
return nil, fmt.Errorf("unable to lookup local user: %s", err)
}
if opts.KerberosClient != nil {
opts.KerberosClient, err = getKerberosClient()
if err != nil {
return nil, fmt.Errorf("Unable to initialize kerberos client: %s", err)
return nil, fmt.Errorf("unable to initialize kerberos client: %s", err)
}
} else {
opts.User = env.Get("HADOOP_USER_NAME", u.Username)
@ -187,14 +197,14 @@ func (g *HDFS) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error
clnt, err := hdfs.NewClient(opts)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to initialize hdfsClient")
}
if err = clnt.MkdirAll(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket), os.FileMode(0755)); err != nil {
if err = clnt.MkdirAll(minio.PathJoin(commonPath, hdfsSeparator, minioMetaTmpBucket), os.FileMode(0755)); err != nil {
return nil, err
}
return &hdfsObjects{clnt: clnt, listPool: minio.NewTreeWalkPool(time.Minute * 30)}, nil
return &hdfsObjects{clnt: clnt, subPath: commonPath, listPool: minio.NewTreeWalkPool(time.Minute * 30)}, nil
}
// Production - hdfs gateway is production ready.
@ -223,6 +233,7 @@ func (n *hdfsObjects) StorageInfo(ctx context.Context, _ bool) (si minio.Storage
type hdfsObjects struct {
minio.GatewayUnsupported
clnt *hdfs.Client
subPath string
listPool *minio.TreeWalkPool
}
@ -276,14 +287,18 @@ func hdfsIsValidBucketName(bucket string) bool {
return s3utils.CheckValidBucketNameStrict(bucket) == nil
}
func (n *hdfsObjects) hdfsPathJoin(args ...string) string {
return minio.PathJoin(append([]string{n.subPath, hdfsSeparator}, args...)...)
}
func (n *hdfsObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
if !hdfsIsValidBucketName(bucket) {
return minio.BucketNameInvalid{Bucket: bucket}
}
if forceDelete {
return hdfsToObjectErr(ctx, n.clnt.RemoveAll(minio.PathJoin(hdfsSeparator, bucket)), bucket)
return hdfsToObjectErr(ctx, n.clnt.RemoveAll(n.hdfsPathJoin(bucket)), bucket)
}
return hdfsToObjectErr(ctx, n.clnt.Remove(minio.PathJoin(hdfsSeparator, bucket)), bucket)
return hdfsToObjectErr(ctx, n.clnt.Remove(n.hdfsPathJoin(bucket)), bucket)
}
func (n *hdfsObjects) MakeBucketWithLocation(ctx context.Context, bucket string, opts minio.BucketOptions) error {
@ -294,11 +309,11 @@ func (n *hdfsObjects) MakeBucketWithLocation(ctx context.Context, bucket string,
if !hdfsIsValidBucketName(bucket) {
return minio.BucketNameInvalid{Bucket: bucket}
}
return hdfsToObjectErr(ctx, n.clnt.Mkdir(minio.PathJoin(hdfsSeparator, bucket), os.FileMode(0755)), bucket)
return hdfsToObjectErr(ctx, n.clnt.Mkdir(n.hdfsPathJoin(bucket), os.FileMode(0755)), bucket)
}
func (n *hdfsObjects) GetBucketInfo(ctx context.Context, bucket string) (bi minio.BucketInfo, err error) {
fi, err := n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
fi, err := n.clnt.Stat(n.hdfsPathJoin(bucket))
if err != nil {
return bi, hdfsToObjectErr(ctx, err, bucket)
}
@ -336,7 +351,7 @@ func (n *hdfsObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketIn
func (n *hdfsObjects) listDirFactory() minio.ListDirFunc {
// listDir - lists all the entries at a given prefix and given entry in the prefix.
listDir := func(bucket, prefixDir, prefixEntry string) (emptyDir bool, entries []string) {
f, err := n.clnt.Open(minio.PathJoin(hdfsSeparator, bucket, prefixDir))
f, err := n.clnt.Open(n.hdfsPathJoin(bucket, prefixDir))
if err != nil {
if os.IsNotExist(err) {
err = nil
@ -369,12 +384,12 @@ func (n *hdfsObjects) listDirFactory() minio.ListDirFunc {
// ListObjects lists all blobs in HDFS bucket filtered by prefix.
func (n *hdfsObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) {
if _, err := n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)); err != nil {
if _, err := n.clnt.Stat(n.hdfsPathJoin(bucket)); err != nil {
return loi, hdfsToObjectErr(ctx, err, bucket)
}
getObjectInfo := func(ctx context.Context, bucket, entry string) (minio.ObjectInfo, error) {
fi, err := n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket, entry))
fi, err := n.clnt.Stat(n.hdfsPathJoin(bucket, entry))
if err != nil {
return minio.ObjectInfo{}, hdfsToObjectErr(ctx, err, bucket, entry)
}
@ -443,7 +458,7 @@ func (n *hdfsObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continu
}
func (n *hdfsObjects) DeleteObject(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (minio.ObjectInfo, error) {
err := hdfsToObjectErr(ctx, n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), minio.PathJoin(hdfsSeparator, bucket, object)), bucket, object)
err := hdfsToObjectErr(ctx, n.deleteObject(n.hdfsPathJoin(bucket), n.hdfsPathJoin(bucket, object)), bucket, object)
return minio.ObjectInfo{
Bucket: bucket,
Name: object,
@ -490,7 +505,7 @@ func (n *hdfsObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
}
func (n *hdfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (minio.ObjectInfo, error) {
cpSrcDstSame := minio.IsStringEqual(minio.PathJoin(hdfsSeparator, srcBucket, srcObject), minio.PathJoin(hdfsSeparator, dstBucket, dstObject))
cpSrcDstSame := minio.IsStringEqual(n.hdfsPathJoin(srcBucket, srcObject), n.hdfsPathJoin(dstBucket, dstObject))
if cpSrcDstSame {
return n.GetObjectInfo(ctx, srcBucket, srcObject, minio.ObjectOptions{})
}
@ -502,10 +517,10 @@ func (n *hdfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstB
}
func (n *hdfsObjects) GetObject(ctx context.Context, bucket, key string, startOffset, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
if _, err := n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket)); err != nil {
if _, err := n.clnt.Stat(n.hdfsPathJoin(bucket)); err != nil {
return hdfsToObjectErr(ctx, err, bucket)
}
rd, err := n.clnt.Open(minio.PathJoin(hdfsSeparator, bucket, key))
rd, err := n.clnt.Open(n.hdfsPathJoin(bucket, key))
if err != nil {
return hdfsToObjectErr(ctx, err, bucket, key)
}
@ -521,7 +536,7 @@ func (n *hdfsObjects) GetObject(ctx context.Context, bucket, key string, startOf
}
func (n *hdfsObjects) isObjectDir(ctx context.Context, bucket, object string) bool {
f, err := n.clnt.Open(minio.PathJoin(hdfsSeparator, bucket, object))
f, err := n.clnt.Open(n.hdfsPathJoin(bucket, object))
if err != nil {
if os.IsNotExist(err) {
return false
@ -541,7 +556,7 @@ func (n *hdfsObjects) isObjectDir(ctx context.Context, bucket, object string) bo
// GetObjectInfo reads object info and replies back ObjectInfo.
func (n *hdfsObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
if err != nil {
return objInfo, hdfsToObjectErr(ctx, err, bucket)
}
@ -549,7 +564,7 @@ func (n *hdfsObjects) GetObjectInfo(ctx context.Context, bucket, object string,
return objInfo, hdfsToObjectErr(ctx, os.ErrNotExist, bucket, object)
}
fi, err := n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket, object))
fi, err := n.clnt.Stat(n.hdfsPathJoin(bucket, object))
if err != nil {
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
@ -564,27 +579,27 @@ func (n *hdfsObjects) GetObjectInfo(ctx context.Context, bucket, object string,
}
func (n *hdfsObjects) PutObject(ctx context.Context, bucket string, object string, r *minio.PutObjReader, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
if err != nil {
return objInfo, hdfsToObjectErr(ctx, err, bucket)
}
name := minio.PathJoin(hdfsSeparator, bucket, object)
name := n.hdfsPathJoin(bucket, object)
// If its a directory create a prefix {
if strings.HasSuffix(object, hdfsSeparator) && r.Size() == 0 {
if err = n.clnt.MkdirAll(name, os.FileMode(0755)); err != nil {
n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), name)
n.deleteObject(n.hdfsPathJoin(bucket), name)
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
} else {
tmpname := minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, minio.MustGetUUID())
tmpname := n.hdfsPathJoin(minioMetaTmpBucket, minio.MustGetUUID())
var w *hdfs.FileWriter
w, err = n.clnt.Create(tmpname)
if err != nil {
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
defer n.deleteObject(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket), tmpname)
defer n.deleteObject(n.hdfsPathJoin(minioMetaTmpBucket), tmpname)
if _, err = io.Copy(w, r); err != nil {
w.Close()
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
@ -593,7 +608,7 @@ func (n *hdfsObjects) PutObject(ctx context.Context, bucket string, object strin
if dir != "" {
if err = n.clnt.MkdirAll(dir, os.FileMode(0755)); err != nil {
w.Close()
n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), dir)
n.deleteObject(n.hdfsPathJoin(bucket), dir)
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
}
@ -618,13 +633,13 @@ func (n *hdfsObjects) PutObject(ctx context.Context, bucket string, object strin
}
func (n *hdfsObjects) NewMultipartUpload(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (uploadID string, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
if err != nil {
return uploadID, hdfsToObjectErr(ctx, err, bucket)
}
uploadID = minio.MustGetUUID()
if err = n.clnt.CreateEmptyFile(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID)); err != nil {
if err = n.clnt.CreateEmptyFile(n.hdfsPathJoin(minioMetaTmpBucket, uploadID)); err != nil {
return uploadID, hdfsToObjectErr(ctx, err, bucket)
}
@ -632,7 +647,7 @@ func (n *hdfsObjects) NewMultipartUpload(ctx context.Context, bucket string, obj
}
func (n *hdfsObjects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
if err != nil {
return lmi, hdfsToObjectErr(ctx, err, bucket)
}
@ -642,7 +657,7 @@ func (n *hdfsObjects) ListMultipartUploads(ctx context.Context, bucket string, p
}
func (n *hdfsObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) (err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID))
_, err = n.clnt.Stat(n.hdfsPathJoin(minioMetaTmpBucket, uploadID))
if err != nil {
return hdfsToObjectErr(ctx, err, bucket, object, uploadID)
}
@ -651,7 +666,7 @@ func (n *hdfsObjects) checkUploadIDExists(ctx context.Context, bucket, object, u
// GetMultipartInfo returns multipart info of the uploadId of the object
func (n *hdfsObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
if err != nil {
return result, hdfsToObjectErr(ctx, err, bucket)
}
@ -667,7 +682,7 @@ func (n *hdfsObjects) GetMultipartInfo(ctx context.Context, bucket, object, uplo
}
func (n *hdfsObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (result minio.ListPartsInfo, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
if err != nil {
return result, hdfsToObjectErr(ctx, err, bucket)
}
@ -686,13 +701,13 @@ func (n *hdfsObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject,
}
func (n *hdfsObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *minio.PutObjReader, opts minio.ObjectOptions) (info minio.PartInfo, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
if err != nil {
return info, hdfsToObjectErr(ctx, err, bucket)
}
var w *hdfs.FileWriter
w, err = n.clnt.Append(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID))
w, err = n.clnt.Append(n.hdfsPathJoin(minioMetaTmpBucket, uploadID))
if err != nil {
return info, hdfsToObjectErr(ctx, err, bucket, object, uploadID)
}
@ -711,7 +726,7 @@ func (n *hdfsObjects) PutObjectPart(ctx context.Context, bucket, object, uploadI
}
func (n *hdfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, parts []minio.CompletePart, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
if err != nil {
return objInfo, hdfsToObjectErr(ctx, err, bucket)
}
@ -720,7 +735,7 @@ func (n *hdfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, objec
return objInfo, err
}
name := minio.PathJoin(hdfsSeparator, bucket, object)
name := n.hdfsPathJoin(bucket, object)
dir := path.Dir(name)
if dir != "" {
if err = n.clnt.MkdirAll(dir, os.FileMode(0755)); err != nil {
@ -728,19 +743,19 @@ func (n *hdfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, objec
}
}
err = n.clnt.Rename(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID), name)
err = n.clnt.Rename(n.hdfsPathJoin(minioMetaTmpBucket, uploadID), name)
// Object already exists is an error on HDFS
// remove it and then create it again.
if os.IsExist(err) {
if err = n.clnt.Remove(name); err != nil {
if dir != "" {
n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), dir)
n.deleteObject(n.hdfsPathJoin(bucket), dir)
}
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
if err = n.clnt.Rename(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID), name); err != nil {
if err = n.clnt.Rename(n.hdfsPathJoin(minioMetaTmpBucket, uploadID), name); err != nil {
if dir != "" {
n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), dir)
n.deleteObject(n.hdfsPathJoin(bucket), dir)
}
return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
}
@ -765,11 +780,11 @@ func (n *hdfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, objec
}
func (n *hdfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) (err error) {
_, err = n.clnt.Stat(minio.PathJoin(hdfsSeparator, bucket))
_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
if err != nil {
return hdfsToObjectErr(ctx, err, bucket)
}
return hdfsToObjectErr(ctx, n.clnt.Remove(minio.PathJoin(hdfsSeparator, minioMetaTmpBucket, uploadID)), bucket, object, uploadID)
return hdfsToObjectErr(ctx, n.clnt.Remove(n.hdfsPathJoin(minioMetaTmpBucket, uploadID)), bucket, object, uploadID)
}
// IsReady returns whether the layer is ready to take requests.

Loading…
Cancel
Save