diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index ee224f97c..accee9d09 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -440,7 +440,7 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string // startOffset indicates the starting read location of the object. // length indicates the total length of the object. func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer, etag string) (err error) { - if err = checkBucketAndObjectNamesFS(bucket, object); err != nil { + if err = checkGetObjArgs(bucket, object); err != nil { return err } @@ -572,22 +572,6 @@ func (fs fsObjects) getObjectInfo(bucket, object string) (oi ObjectInfo, e error return fsMeta.ToObjectInfo(bucket, object, fi), nil } -// Checks bucket and object name validity, returns nil if both are valid. -func checkBucketAndObjectNamesFS(bucket, object string) error { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return errors.Trace(BucketNameInvalid{Bucket: bucket}) - } - // Verify if object is valid. - if len(object) == 0 { - return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object}) - } - if !IsValidObjectPrefix(object) { - return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object}) - } - return nil -} - // GetObjectInfo - reads object metadata and replies back ObjectInfo. func (fs fsObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) { // Lock the object before reading. @@ -597,7 +581,7 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error } defer objectLock.RUnlock() - if err := checkBucketAndObjectNamesFS(bucket, object); err != nil { + if err := checkGetObjArgs(bucket, object); err != nil { return oi, err } @@ -775,7 +759,7 @@ func (fs fsObjects) DeleteObject(bucket, object string) error { } defer objectLock.Unlock() - if err := checkBucketAndObjectNamesFS(bucket, object); err != nil { + if err := checkDelObjArgs(bucket, object); err != nil { return err } diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go index 64c5541e0..4d758a85a 100644 --- a/cmd/object-api-common.go +++ b/cmd/object-api-common.go @@ -17,6 +17,7 @@ package cmd import ( + "path" "sync" humanize "github.com/dustin/go-humanize" @@ -221,6 +222,11 @@ func cleanupDir(storage StorageAPI, volume, dirPath string) error { return errors.Trace(err) } // else on success.. + // Entry path is empty, just delete it. + if len(entries) == 0 { + return errors.Trace(storage.DeleteFile(volume, path.Clean(entryPath))) + } + // Recurse and delete all other entries. for _, entry := range entries { if err = delFunc(pathJoin(entryPath, entry)); err != nil { diff --git a/cmd/object-api-input-checks.go b/cmd/object-api-input-checks.go index a23d065f0..40431037f 100644 --- a/cmd/object-api-input-checks.go +++ b/cmd/object-api-input-checks.go @@ -38,11 +38,10 @@ func checkBucketAndObjectNames(bucket, object string) error { return errors.Trace(BucketNameInvalid{Bucket: bucket}) } // Verify if object is valid. - if !IsValidObjectName(object) { - // Objects with "/" are invalid, verify to return a different error. - if hasSuffix(object, slashSeparator) || hasPrefix(object, slashSeparator) { - return errors.Trace(ObjectNotFound{Bucket: bucket, Object: object}) - } + if len(object) == 0 { + return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object}) + } + if !IsValidObjectPrefix(object) { return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object}) } return nil diff --git a/cmd/posix-utils_nix.go b/cmd/posix-utils_nix.go index cb4b736b2..f62a4aef2 100644 --- a/cmd/posix-utils_nix.go +++ b/cmd/posix-utils_nix.go @@ -21,5 +21,5 @@ package cmd // isValidVolname verifies a volname name in accordance with object // layer requirements. func isValidVolname(volname string) bool { - return !(len(volname) < 3 || len(volname) > 63) + return !(len(volname) < 3) } diff --git a/cmd/posix-utils_test.go b/cmd/posix-utils_test.go index b0b2cbbe7..a6428139a 100644 --- a/cmd/posix-utils_test.go +++ b/cmd/posix-utils_test.go @@ -50,6 +50,7 @@ func TestIsValidVolname(t *testing.T) { {"tHIS-ENDS-WITH-UPPERCASE", true}, {"ThisBeginsAndEndsWithUpperCase", true}, {"una ñina", true}, + {"lalalallalallalalalallalallalala-theString-size-is-greater-than-64", true}, // cases for which test should fail. // passing invalid bucket names. {"", false}, @@ -58,7 +59,6 @@ func TestIsValidVolname(t *testing.T) { {"ab", false}, {"ab/", true}, {"......", true}, - {"lalalallalallalalalallalallalala-theString-size-is-greater-than-64", false}, } for i, testCase := range testCases { diff --git a/cmd/posix-utils_windows.go b/cmd/posix-utils_windows.go index 0c98cd7b2..f1ac25397 100644 --- a/cmd/posix-utils_windows.go +++ b/cmd/posix-utils_windows.go @@ -25,7 +25,7 @@ import ( // isValidVolname verifies a volname name in accordance with object // layer requirements. func isValidVolname(volname string) bool { - if len(volname) < 3 || len(volname) > 63 { + if len(volname) < 3 { return false } // Volname shouldn't have reserved characters on windows in it. diff --git a/cmd/posix.go b/cmd/posix.go index 3949e8b19..4fe17f24e 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -280,18 +280,21 @@ func (s *posix) MakeVol(volume string) (err error) { if err != nil { return err } - // Make a volume entry, with mode 0777 mkdir honors system umask. - err = os.Mkdir((volumeDir), 0777) - if err != nil { - if os.IsExist(err) { - return errVolumeExists - } else if os.IsPermission(err) { + + if _, err := os.Stat(volumeDir); err != nil { + // Volume does not exist we proceed to create. + if os.IsNotExist(err) { + // Make a volume entry, with mode 0777 mkdir honors system umask. + err = os.MkdirAll(volumeDir, 0777) + } + if os.IsPermission(err) { return errDiskAccessDenied } return err } - // Success - return nil + + // Stat succeeds we return errVolumeExists. + return errVolumeExists } // ListVols - list volumes. @@ -381,7 +384,7 @@ func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) { } // Stat a volume entry. var st os.FileInfo - st, err = os.Stat((volumeDir)) + st, err = os.Stat(volumeDir) if err != nil { if os.IsNotExist(err) { return VolInfo{}, errVolumeNotFound diff --git a/cmd/posix_test.go b/cmd/posix_test.go index 1e2ff2977..42b088705 100644 --- a/cmd/posix_test.go +++ b/cmd/posix_test.go @@ -861,7 +861,7 @@ func TestPosixDeleteFile(t *testing.T) { // TestPosix case - 4. // TestPosix case with segment of the volume name > 255. { - srcVol: "my-obj-del-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001", + srcVol: "my", srcPath: "success-file", ioErrCnt: 0, expectedErr: errInvalidArgument, diff --git a/cmd/server_test.go b/cmd/server_test.go index 533429600..0c8146d3b 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -76,9 +76,7 @@ func verifyError(c *check, response *http.Response, code, description string, st func runAllTests(suite *TestSuiteCommon, c *check) { suite.SetUpSuite(c) suite.TestBucketSQSNotificationWebHook(c) - if suite.serverType == "XL" { - suite.TestObjectDir(c) - } + suite.TestObjectDir(c) suite.TestBucketSQSNotificationAMQP(c) suite.TestBucketPolicy(c) suite.TestDeleteBucket(c) @@ -260,7 +258,7 @@ func (s *TestSuiteCommon) TestObjectDir(c *check) { response, err = client.Do(request) c.Assert(err, nil) - c.Assert(response.StatusCode, http.StatusNotFound) + c.Assert(response.StatusCode, http.StatusOK) request, err = newTestSignedRequest("GET", getGetObjectURL(s.endPoint, bucketName, "my-object-directory/"), 0, nil, s.accessKey, s.secretKey, s.signer) @@ -271,7 +269,7 @@ func (s *TestSuiteCommon) TestObjectDir(c *check) { response, err = client.Do(request) c.Assert(err, nil) - c.Assert(response.StatusCode, http.StatusNotFound) + c.Assert(response.StatusCode, http.StatusOK) request, err = newTestSignedRequest("DELETE", getDeleteObjectURL(s.endPoint, bucketName, "my-object-directory/"), 0, nil, s.accessKey, s.secretKey, s.signer) @@ -638,7 +636,7 @@ func (s *TestSuiteCommon) TestDeleteObject(c *check) { // assert the status of http response. c.Assert(response.StatusCode, http.StatusOK) - // object name was "prefix/myobject", an attempt to delelte "prefix" + // object name was "prefix/myobject", an attempt to delete "prefix" // Should not delete "prefix/myobject" request, err = newTestSignedRequest("DELETE", getDeleteObjectURL(s.endPoint, bucketName, "prefix"), 0, nil, s.accessKey, s.secretKey, s.signer) diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 2bd8dd0f6..bbe37d9ab 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -33,6 +33,29 @@ import ( // list all errors which can be ignored in object operations. var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied) +// putObjectDir hints the bottom layer to create a new directory. +func (xl xlObjects) putObjectDir(bucket, object string, writeQuorum int) error { + var wg = &sync.WaitGroup{} + + errs := make([]error, len(xl.storageDisks)) + // Prepare object creation in all disks + for index, disk := range xl.storageDisks { + if disk == nil { + continue + } + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + if err := disk.MakeVol(pathJoin(bucket, object)); err != nil && err != errVolumeExists { + errs[index] = err + } + }(index, disk) + } + wg.Wait() + + return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum) +} + // prepareFile hints the bottom layer to optimize the creation of a new object func (xl xlObjects) prepareFile(bucket, object string, size int64, onlineDisks []StorageAPI, blockSize int64, dataBlocks, writeQuorum int) error { pErrs := make([]error, len(onlineDisks)) @@ -204,6 +227,12 @@ func (xl xlObjects) getObject(bucket, object string, startOffset int64, length i return errors.Trace(errUnexpected) } + // If its a directory request, we return an empty body. + if hasSuffix(object, slashSeparator) { + _, err := writer.Write([]byte("")) + return toObjectErr(errors.Trace(err), bucket, object) + } + // Read metadata associated with the object from all disks. metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object) @@ -354,6 +383,42 @@ func (xl xlObjects) getObject(bucket, object string, startOffset int64, length i return nil } +// getObjectInfoDir - This getObjectInfo is specific to object directory lookup. +func (xl xlObjects) getObjectInfoDir(bucket, object string) (oi ObjectInfo, err error) { + var wg = &sync.WaitGroup{} + + errs := make([]error, len(xl.storageDisks)) + // Prepare object creation in a all disks + for index, disk := range xl.storageDisks { + if disk == nil { + continue + } + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + if _, err := disk.StatVol(pathJoin(bucket, object)); err != nil { + // Since we are re-purposing StatVol, an object which + // is a directory if it doesn't exist should be + // returned as errFileNotFound instead, convert + // the error right here accordingly. + if err == errVolumeNotFound { + err = errFileNotFound + } else if err == errVolumeAccessDenied { + err = errFileAccessDenied + } + + // Save error to reduce it later + errs[index] = err + } + }(index, disk) + } + + wg.Wait() + + readQuorum := len(xl.storageDisks) / 2 + return dirObjectInfo(bucket, object, 0, map[string]string{}), reduceReadQuorumErrs(errs, objectOpIgnoredErrs, readQuorum) +} + // GetObjectInfo - reads object metadata and replies back ObjectInfo. func (xl xlObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) { // Lock the object before reading. @@ -367,15 +432,21 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error return oi, err } + if hasSuffix(object, slashSeparator) { + return xl.getObjectInfoDir(bucket, object) + } + info, err := xl.getObjectInfo(bucket, object) if err != nil { return oi, toObjectErr(err, bucket, object) } + return info, nil } // getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, err error) { + // Extracts xlStat and xlMetaMap. xlStat, xlMetaMap, err := xl.readXLMetaStat(bucket, object) if err != nil { @@ -508,6 +579,26 @@ func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, m // putObject wrapper for xl PutObject func (xl xlObjects) putObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) { + uniqueID := mustGetUUID() + tempObj := uniqueID + + // No metadata is set, allocate a new one. + if metadata == nil { + metadata = make(map[string]string) + } + + // Get parity and data drive count based on storage class metadata + dataDrives, parityDrives := getRedundancyCount(metadata[amzStorageClass], len(xl.storageDisks)) + + // we now know the number of blocks this object needs for data and parity. + // writeQuorum is dataBlocks + 1 + writeQuorum := dataDrives + 1 + + // Delete temporary object in the event of failure. + // If PutObject succeeded there would be no temporary + // object to delete. + defer xl.deleteObject(minioMetaTmpBucket, tempObj) + // This is a special case with size as '0' and object ends with // a slash separator, we treat it like a valid operation and // return success. @@ -518,6 +609,16 @@ func (xl xlObjects) putObject(bucket string, object string, data *hash.Reader, m if xl.parentDirIsObject(bucket, path.Dir(object)) { return ObjectInfo{}, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object) } + + if err = xl.putObjectDir(minioMetaTmpBucket, tempObj, writeQuorum); err != nil { + return ObjectInfo{}, toObjectErr(errors.Trace(err), bucket, object) + } + + // Rename the successfully written temporary object to final location. + if _, err = renameObject(xl.storageDisks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil { + return ObjectInfo{}, toObjectErr(err, bucket, object) + } + return dirObjectInfo(bucket, object, data.Size(), metadata), nil } @@ -538,14 +639,6 @@ func (xl xlObjects) putObject(bucket string, object string, data *hash.Reader, m return ObjectInfo{}, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object) } - // No metadata is set, allocate a new one. - if metadata == nil { - metadata = make(map[string]string) - } - - uniqueID := mustGetUUID() - tempObj := uniqueID - // Limit the reader to its provided size if specified. var reader io.Reader = data @@ -569,12 +662,6 @@ func (xl xlObjects) putObject(bucket string, object string, data *hash.Reader, m } } } - // Get parity and data drive count based on storage class metadata - dataDrives, parityDrives := getRedundancyCount(metadata[amzStorageClass], len(xl.storageDisks)) - - // we now know the number of blocks this object needs for data and parity. - // writeQuorum is dataBlocks + 1 - writeQuorum := dataDrives + 1 // Initialize parts metadata partsMetadata := make([]xlMetaV1, len(xl.storageDisks)) @@ -589,11 +676,6 @@ func (xl xlObjects) putObject(bucket string, object string, data *hash.Reader, m // Order disks according to erasure distribution onlineDisks := shuffleDisks(xl.storageDisks, partsMetadata[0].Erasure.Distribution) - // Delete temporary object in the event of failure. - // If PutObject succeeded there would be no temporary - // object to delete. - defer xl.deleteObject(minioMetaTmpBucket, tempObj) - // Total size of the written object var sizeWritten int64 @@ -741,13 +823,20 @@ func (xl xlObjects) deleteObject(bucket, object string) error { // Initialize sync waitgroup. var wg = &sync.WaitGroup{} - // Read metadata associated with the object from all disks. - metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object) + var writeQuorum int + var err error + // If its a directory request, no need to read metadata. + if !hasSuffix(object, slashSeparator) { + // Read metadata associated with the object from all disks. + metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object) - // get Quorum for this object - _, writeQuorum, err := objectQuorumFromMeta(xl, metaArr, errs) - if err != nil { - return err + // get Quorum for this object + _, writeQuorum, err = objectQuorumFromMeta(xl, metaArr, errs) + if err != nil { + return err + } + } else { + writeQuorum = len(xl.storageDisks)/2 + 1 } // Initialize list of errors. @@ -789,14 +878,20 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) { return err } + if hasSuffix(object, slashSeparator) { + // Delete the object on all disks. + if err = xl.deleteObject(bucket, object); err != nil { + return toObjectErr(err, bucket, object) + } + } + // Validate object exists. if !xl.isObject(bucket, object) { return errors.Trace(ObjectNotFound{bucket, object}) } // else proceed to delete the object. // Delete the object on all disks. - err = xl.deleteObject(bucket, object) - if err != nil { + if err = xl.deleteObject(bucket, object); err != nil { return toObjectErr(err, bucket, object) } diff --git a/cmd/xl-v1-object_test.go b/cmd/xl-v1-object_test.go index 6dfd96ff4..2601851ec 100644 --- a/cmd/xl-v1-object_test.go +++ b/cmd/xl-v1-object_test.go @@ -75,8 +75,6 @@ func TestXLDeleteObjectBasic(t *testing.T) { {"----", "obj", BucketNameInvalid{Bucket: "----"}}, {"bucket", "", ObjectNameInvalid{Bucket: "bucket", Object: ""}}, {"bucket", "doesnotexist", ObjectNotFound{Bucket: "bucket", Object: "doesnotexist"}}, - {"bucket", "obj/", ObjectNotFound{Bucket: "bucket", Object: "obj/"}}, - {"bucket", "/obj", ObjectNotFound{Bucket: "bucket", Object: "/obj"}}, {"bucket", "obj", nil}, }