From b2cbade4776577a81df8cb9e8ba2a94b0a60961b Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 16 Oct 2017 17:20:54 -0700 Subject: [PATCH] Support creating empty directories. (#5049) Every so often we get requirements for creating directories/prefixes and we end up rejecting such requirements. This PR implements this and allows empty directories without any new file addition to backend. Existing lower APIs themselves are leveraged to provide this behavior. Only FS backend supports this for the time being as desired. --- cmd/fs-v1-helpers.go | 118 +++++++++++++++------------ cmd/fs-v1-helpers_test.go | 24 +++++- cmd/fs-v1-metadata.go | 11 ++- cmd/fs-v1.go | 94 ++++++++++++++++----- cmd/object-api-getobjectinfo_test.go | 6 +- cmd/server_test.go | 13 ++- 6 files changed, 186 insertions(+), 80 deletions(-) diff --git a/cmd/fs-v1-helpers.go b/cmd/fs-v1-helpers.go index 3a3592b7f..dc88c9964 100644 --- a/cmd/fs-v1-helpers.go +++ b/cmd/fs-v1-helpers.go @@ -37,12 +37,7 @@ func fsRemoveFile(filePath string) (err error) { } if err = os.Remove((filePath)); err != nil { - if os.IsNotExist(err) { - return traceError(errFileNotFound) - } else if os.IsPermission(err) { - return traceError(errFileAccessDenied) - } - return traceError(err) + return osErrToFSFileErr(err) } return nil @@ -94,6 +89,24 @@ func fsRemoveDir(dirPath string) (err error) { return nil } +// Creates a new directory, parent dir is also recursively created +// if it doesn't exist. +func fsMkdirAll(dirPath string) (err error) { + if dirPath == "" { + return traceError(errInvalidArgument) + } + + if err = checkPathLength(dirPath); err != nil { + return traceError(err) + } + + if err = os.MkdirAll(dirPath, 0777); err != nil { + return traceError(err) + } + + return nil +} + // Creates a new directory, parent dir should exist // otherwise returns an error. If directory already // exists returns an error. Windows long paths @@ -126,6 +139,11 @@ func fsMkdir(dirPath string) (err error) { return nil } +// fsStat is a low level call which validates input arguments +// and checks input length upto supported maximum. Does +// not perform any higher layer interpretation of files v/s +// directories. For higher level interpretation look at +// fsStatFileDir, fsStatFile, fsStatDir. func fsStat(statLoc string) (os.FileInfo, error) { if statLoc == "" { return nil, traceError(errInvalidArgument) @@ -141,10 +159,9 @@ func fsStat(statLoc string) (os.FileInfo, error) { return fi, nil } -// Lookup if directory exists, returns directory -// attributes upon success. -func fsStatDir(statDir string) (os.FileInfo, error) { - fi, err := fsStat(statDir) +// Lookup if volume exists, returns volume attributes upon success. +func fsStatVolume(volume string) (os.FileInfo, error) { + fi, err := fsStat(volume) if err != nil { err = errorCause(err) if os.IsNotExist(err) { @@ -162,21 +179,47 @@ func fsStatDir(statDir string) (os.FileInfo, error) { return fi, nil } -// Lookup if file exists, returns file attributes upon success +// Is a one place function which converts all os.PathError +// into a more FS object layer friendly form, converts +// known errors into their typed form for top level +// interpretation. +func osErrToFSFileErr(err error) error { + if err == nil { + return nil + } + err = errorCause(err) + if os.IsNotExist(err) { + return traceError(errFileNotFound) + } + if os.IsPermission(err) { + return traceError(errFileAccessDenied) + } + if isSysErrNotDir(err) { + return traceError(errFileAccessDenied) + } + if isSysErrPathNotFound(err) { + return traceError(errFileNotFound) + } + return err +} + +// Lookup if directory exists, returns directory attributes upon success. +func fsStatDir(statDir string) (os.FileInfo, error) { + fi, err := fsStat(statDir) + if err != nil { + return nil, osErrToFSFileErr(err) + } + if !fi.IsDir() { + return nil, traceError(errFileAccessDenied) + } + return fi, nil +} + +// Lookup if file exists, returns file attributes upon success. func fsStatFile(statFile string) (os.FileInfo, error) { fi, err := fsStat(statFile) if err != nil { - err = errorCause(err) - if os.IsNotExist(err) { - return nil, traceError(errFileNotFound) - } else if os.IsPermission(err) { - return nil, traceError(errFileAccessDenied) - } else if isSysErrNotDir(err) { - return nil, traceError(errFileAccessDenied) - } else if isSysErrPathNotFound(err) { - return nil, traceError(errFileNotFound) - } - return nil, traceError(err) + return nil, osErrToFSFileErr(err) } if fi.IsDir() { return nil, traceError(errFileAccessDenied) @@ -196,18 +239,7 @@ func fsOpenFile(readPath string, offset int64) (io.ReadCloser, int64, error) { fr, err := os.Open((readPath)) if err != nil { - if os.IsNotExist(err) { - return nil, 0, traceError(errFileNotFound) - } else if os.IsPermission(err) { - return nil, 0, traceError(errFileAccessDenied) - } else if isSysErrNotDir(err) { - // File path cannot be verified since one of the parents is a file. - return nil, 0, traceError(errFileAccessDenied) - } else if isSysErrPathNotFound(err) { - // Add specific case for windows. - return nil, 0, traceError(errFileNotFound) - } - return nil, 0, traceError(err) + return nil, 0, osErrToFSFileErr(err) } // Stat to get the size of the file at path. @@ -253,11 +285,7 @@ func fsCreateFile(filePath string, reader io.Reader, buf []byte, fallocSize int6 writer, err := os.OpenFile((filePath), os.O_CREATE|os.O_WRONLY, 0666) if err != nil { - // File path cannot be verified since one of the parents is a file. - if isSysErrNotDir(err) { - return 0, traceError(errFileAccessDenied) - } - return 0, err + return 0, osErrToFSFileErr(err) } defer writer.Close() @@ -346,17 +374,7 @@ func fsRenameFile(sourcePath, destPath string) error { } // Verify if source path exists. if _, err := os.Stat((sourcePath)); err != nil { - if os.IsNotExist(err) { - return traceError(errFileNotFound) - } else if os.IsPermission(err) { - return traceError(errFileAccessDenied) - } else if isSysErrPathNotFound(err) { - return traceError(errFileNotFound) - } else if isSysErrNotDir(err) { - // File path cannot be verified since one of the parents is a file. - return traceError(errFileAccessDenied) - } - return traceError(err) + return osErrToFSFileErr(err) } if err := os.MkdirAll(pathutil.Dir(destPath), 0777); err != nil { return traceError(err) diff --git a/cmd/fs-v1-helpers_test.go b/cmd/fs-v1-helpers_test.go index 1c85b1d7e..cb94cd79b 100644 --- a/cmd/fs-v1-helpers_test.go +++ b/cmd/fs-v1-helpers_test.go @@ -27,6 +27,28 @@ import ( "github.com/minio/minio/pkg/lock" ) +// Tests - fsMkdirAll() +func TestFSMkdirAll(t *testing.T) { + // create posix test setup + _, path, err := newPosixTestSetup() + if err != nil { + t.Fatalf("Unable to create posix test setup, %s", err) + } + defer os.RemoveAll(path) + + if err = fsMkdirAll(""); errorCause(err) != errInvalidArgument { + t.Fatal("Unexpected error", err) + } + + if err = fsMkdirAll(pathJoin(path, "my-obj-del-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001")); errorCause(err) != errFileNameTooLong { + t.Fatal("Unexpected error", err) + } + + if err = fsMkdirAll(pathJoin(path, "success-vol", "success-object")); err != nil { + t.Fatal("Unexpected error", err) + } +} + func TestFSRenameFile(t *testing.T) { // create posix test setup _, path, err := newPosixTestSetup() @@ -173,7 +195,7 @@ func TestFSStats(t *testing.T) { t.Fatalf("TestPosix case %d: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, err) } } else { - if _, err := fsStatDir(pathJoin(testCase.srcFSPath, testCase.srcVol)); errorCause(err) != testCase.expectedErr { + if _, err := fsStatVolume(pathJoin(testCase.srcFSPath, testCase.srcVol)); errorCause(err) != testCase.expectedErr { t.Fatalf("TestPosix case %d: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, err) } } diff --git a/cmd/fs-v1-metadata.go b/cmd/fs-v1-metadata.go index 5519da896..7296b4579 100644 --- a/cmd/fs-v1-metadata.go +++ b/cmd/fs-v1-metadata.go @@ -91,6 +91,11 @@ func (m fsMetaV1) ToObjectInfo(bucket, object string, fi os.FileInfo) ObjectInfo } } + if hasSuffix(object, slashSeparator) { + m.Meta["etag"] = emptyETag // For directories etag is d41d8cd98f00b204e9800998ecf8427e + m.Meta["content-type"] = "application/octet-stream" + } + objInfo := ObjectInfo{ Bucket: bucket, Name: object, @@ -101,7 +106,11 @@ func (m fsMetaV1) ToObjectInfo(bucket, object string, fi os.FileInfo) ObjectInfo if fi != nil { objInfo.ModTime = fi.ModTime() objInfo.Size = fi.Size() - objInfo.IsDir = fi.IsDir() + if fi.IsDir() { + // Directory is always 0 bytes in S3 API, treat it as such. + objInfo.Size = 0 + objInfo.IsDir = fi.IsDir() + } } // Extract etag from metadata. diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index f23ecdc67..c9c9aed8c 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -201,7 +201,7 @@ func (fs fsObjects) statBucketDir(bucket string) (os.FileInfo, error) { if err != nil { return nil, err } - st, err := fsStatDir(bucketDir) + st, err := fsStatVolume(bucketDir) if err != nil { return nil, err } @@ -255,7 +255,7 @@ func (fs fsObjects) ListBuckets() ([]BucketInfo, error) { continue } var fi os.FileInfo - fi, err = fsStatDir(pathJoin(fs.fsPath, entry)) + fi, err = fsStatVolume(pathJoin(fs.fsPath, entry)) // There seems like no practical reason to check for errors // at this point, if there are indeed errors we can simply // just ignore such buckets and list only those which @@ -379,7 +379,7 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string // startOffset indicates the starting read location of the object. // length indicates the total length of the object. func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) (err error) { - if err = checkGetObjArgs(bucket, object); err != nil { + if err = checkBucketAndObjectNamesFS(bucket, object); err != nil { return err } @@ -397,6 +397,12 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, return toObjectErr(traceError(errUnexpected), bucket, object) } + // If its a directory request, we return an empty body. + if hasSuffix(object, slashSeparator) { + _, err = writer.Write([]byte("")) + return toObjectErr(traceError(err), bucket, object) + } + if bucket != minioMetaBucket { fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fsMetaJSONFile) _, err = fs.rwPool.Open(fsMetaPath) @@ -440,6 +446,15 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, // getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. func (fs fsObjects) getObjectInfo(bucket, object string) (oi ObjectInfo, e error) { fsMeta := fsMetaV1{} + if hasSuffix(object, slashSeparator) { + // Directory call needs to arrive with object ending with "/". + fi, err := fsStatDir(pathJoin(fs.fsPath, bucket, object)) + if err != nil { + return oi, toObjectErr(err, bucket, object) + } + return fsMeta.ToObjectInfo(bucket, object, fi), nil + } + fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fsMetaJSONFile) // Read `fs.json` to perhaps contend with @@ -472,9 +487,25 @@ func (fs fsObjects) getObjectInfo(bucket, object string) (oi ObjectInfo, e error return fsMeta.ToObjectInfo(bucket, object, fi), nil } +// Checks bucket and object name validity, returns nil if both are valid. +func checkBucketAndObjectNamesFS(bucket, object string) error { + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return traceError(BucketNameInvalid{Bucket: bucket}) + } + // Verify if object is valid. + if len(object) == 0 { + return traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) + } + if !IsValidObjectPrefix(object) { + return traceError(ObjectNameInvalid{Bucket: bucket, Object: object}) + } + return nil +} + // GetObjectInfo - reads object metadata and replies back ObjectInfo. func (fs fsObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) { - if err := checkGetObjArgs(bucket, object); err != nil { + if err := checkBucketAndObjectNamesFS(bucket, object); err != nil { return oi, err } @@ -510,6 +541,11 @@ func (fs fsObjects) parentDirIsObject(bucket, parent string) bool { // Additionally writes `fs.json` which carries the necessary metadata // for future object operations. func (fs fsObjects) PutObject(bucket string, object string, data *HashReader, metadata map[string]string) (objInfo ObjectInfo, retErr error) { + // No metadata is set, allocate a new one. + if metadata == nil { + metadata = make(map[string]string) + } + var err error // Validate if bucket name is valid and exists. @@ -517,6 +553,9 @@ func (fs fsObjects) PutObject(bucket string, object string, data *HashReader, me return ObjectInfo{}, toObjectErr(err, bucket) } + fsMeta := newFSMetaV1() + fsMeta.Meta = metadata + // This is a special case with size as '0' and object ends // with a slash separator, we treat it like a valid operation // and return success. @@ -525,7 +564,14 @@ func (fs fsObjects) PutObject(bucket string, object string, data *HashReader, me if fs.parentDirIsObject(bucket, path.Dir(object)) { return ObjectInfo{}, toObjectErr(traceError(errFileAccessDenied), bucket, object) } - return dirObjectInfo(bucket, object, data.Size(), metadata), nil + if err = fsMkdirAll(pathJoin(fs.fsPath, bucket, object)); err != nil { + return ObjectInfo{}, toObjectErr(err, bucket, object) + } + var fi os.FileInfo + if fi, err = fsStatDir(pathJoin(fs.fsPath, bucket, object)); err != nil { + return ObjectInfo{}, toObjectErr(err, bucket, object) + } + return fsMeta.ToObjectInfo(bucket, object, fi), nil } if err = checkPutObjectArgs(bucket, object, fs); err != nil { @@ -542,14 +588,6 @@ func (fs fsObjects) PutObject(bucket string, object string, data *HashReader, me return ObjectInfo{}, traceError(errInvalidArgument) } - // No metadata is set, allocate a new one. - if metadata == nil { - metadata = make(map[string]string) - } - - fsMeta := newFSMetaV1() - fsMeta.Meta = metadata - var wlk *lock.LockedFile if bucket != minioMetaBucket { bucketMetaDir := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix) @@ -632,7 +670,7 @@ func (fs fsObjects) PutObject(bucket string, object string, data *HashReader, me // DeleteObject - deletes an object from a bucket, this operation is destructive // and there are no rollbacks supported. func (fs fsObjects) DeleteObject(bucket, object string) error { - if err := checkDelObjArgs(bucket, object); err != nil { + if err := checkBucketAndObjectNamesFS(bucket, object); err != nil { return err } @@ -779,18 +817,30 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey // Convert entry to ObjectInfo entryToObjectInfo := func(entry string) (objInfo ObjectInfo, err error) { - if hasSuffix(entry, slashSeparator) { - // Object name needs to be full path. - objInfo.Name = entry - objInfo.IsDir = true - return - } - - // Protect reading `fs.json`. + // Protect the entry from concurrent deletes, or renames. objectLock := globalNSMutex.NewNSLock(bucket, entry) if err = objectLock.GetRLock(globalListingTimeout); err != nil { return ObjectInfo{}, err } + + if hasSuffix(entry, slashSeparator) { + var fi os.FileInfo + fi, err = fsStatDir(pathJoin(fs.fsPath, bucket, entry)) + objectLock.RUnlock() + if err != nil { + return objInfo, err + } + // Success. + return ObjectInfo{ + // Object name needs to be full path. + Name: entry, + Bucket: bucket, + Size: fi.Size(), + ModTime: fi.ModTime(), + IsDir: fi.IsDir(), + }, nil + } + var etag string etag, err = fs.getObjectETag(bucket, entry) objectLock.RUnlock() diff --git a/cmd/object-api-getobjectinfo_test.go b/cmd/object-api-getobjectinfo_test.go index bcee41199..3b6fae5c3 100644 --- a/cmd/object-api-getobjectinfo_test.go +++ b/cmd/object-api-getobjectinfo_test.go @@ -1,5 +1,5 @@ /* - * Minio Cloud Storage, (C) 2016 Minio, Inc. + * Minio Cloud Storage, (C) 2016, 2017 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,9 +67,7 @@ func testGetObjectInfo(obj ObjectLayer, instanceType string, t TestErrHandler) { {"test-getobjectinfo", "Africa", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Africa"}, false}, {"test-getobjectinfo", "Antartica", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Antartica"}, false}, {"test-getobjectinfo", "Asia/myfile", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Asia/myfile"}, false}, - // Test case with existing bucket but object name set to a directory (Test number 12). - {"test-getobjectinfo", "Asia/", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Asia/"}, false}, - // Valid case with existing object (Test number 14). + // Valid case with existing object (Test number 12). {"test-getobjectinfo", "Asia/asiapics.jpg", resultCases[0], nil, true}, } for i, testCase := range testCases { diff --git a/cmd/server_test.go b/cmd/server_test.go index 6442660e0..e75ebc152 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -27,6 +27,7 @@ import ( "math/rand" "net/http" "net/url" + "os" "reflect" "strings" "sync" @@ -43,6 +44,7 @@ type TestSuiteCommon struct { endPoint string accessKey string secretKey string + configPath string signer signerType secure bool transport *http.Transport @@ -56,7 +58,7 @@ type check struct { // Assert - checks if gotValue is same as expectedValue, if not fails the test. func (c *check) Assert(gotValue interface{}, expectedValue interface{}) { if !reflect.DeepEqual(gotValue, expectedValue) { - c.Fatalf("Test %s: expected %v, got %v", c.testType, expectedValue, gotValue) + c.Fatalf("Test %s:%s expected %v, got %v", getSource(), c.testType, expectedValue, gotValue) } } @@ -74,7 +76,9 @@ func verifyError(c *check, response *http.Response, code, description string, st func runAllTests(suite *TestSuiteCommon, c *check) { suite.SetUpSuite(c) suite.TestBucketSQSNotificationWebHook(c) - suite.TestObjectDir(c) + if suite.serverType == "XL" { + suite.TestObjectDir(c) + } suite.TestBucketSQSNotificationAMQP(c) suite.TestBucketPolicy(c) suite.TestDeleteBucket(c) @@ -140,6 +144,9 @@ func TestServerSuite(t *testing.T) { // Setting up the test suite. // Starting the Test server with temporary FS backend. func (s *TestSuiteCommon) SetUpSuite(c *check) { + rootPath, err := newTestConfig("us-east-1") + c.Assert(err, nil) + if s.secure { cert, key, err := generateTLSCertKey("127.0.0.1") c.Assert(err, nil) @@ -163,10 +170,12 @@ func (s *TestSuiteCommon) SetUpSuite(c *check) { s.endPoint = s.testServer.Server.URL s.accessKey = s.testServer.AccessKey s.secretKey = s.testServer.SecretKey + s.configPath = rootPath } // Called implicitly by "gopkg.in/check.v1" after all tests are run. func (s *TestSuiteCommon) TearDownSuite(c *check) { + os.RemoveAll(s.configPath) s.testServer.Stop() }