diff --git a/object-api-getobjectinfo_test.go b/object-api-getobjectinfo_test.go index 82a473e89..784c6170a 100644 --- a/object-api-getobjectinfo_test.go +++ b/object-api-getobjectinfo_test.go @@ -86,7 +86,7 @@ func TestGetObjectInfo(t *testing.T) { {"test-getobjectinfo", "Antartica", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Antartica"}, false}, {"test-getobjectinfo", "Asia/myfile", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Asia/myfile"}, false}, // Test case with existing bucket but object name set to a directory (Test number 13). - {"test-getobjectinfo", "Asia", ObjectInfo{}, ObjectNotFound{Bucket: "test-getobjectinfo", Object: "Asia"}, false}, + {"test-getobjectinfo", "Asia", ObjectInfo{}, ObjectExistsAsPrefix{Bucket: "test-getobjectinfo", Object: "Asia"}, false}, // Valid case with existing object (Test number 14). {"test-getobjectinfo", "Asia/asiapics.jpg", resultCases[0], nil, true}, } diff --git a/object-api_test.go b/object-api_test.go index 4292e8f90..031c2af01 100644 --- a/object-api_test.go +++ b/object-api_test.go @@ -29,7 +29,7 @@ var _ = Suite(&MySuite{}) func (s *MySuite) TestAPISuite(c *C) { var storageList []string - create := func() *objectAPI { + create := func() objectAPI { path, err := ioutil.TempDir(os.TempDir(), "minio-") c.Check(err, IsNil) storageAPI, err := newFS(path) diff --git a/object_api_suite_test.go b/object_api_suite_test.go index ed06bff7c..236898f17 100644 --- a/object_api_suite_test.go +++ b/object_api_suite_test.go @@ -28,7 +28,7 @@ import ( ) // APITestSuite - collection of API tests -func APITestSuite(c *check.C, create func() *objectAPI) { +func APITestSuite(c *check.C, create func() objectAPI) { testMakeBucket(c, create) testMultipleObjectCreation(c, create) testPaging(c, create) @@ -46,14 +46,13 @@ func APITestSuite(c *check.C, create func() *objectAPI) { testMultipartObjectAbort(c, create) } -func testMakeBucket(c *check.C, create func() *objectAPI) { +func 
testMakeBucket(c *check.C, create func() objectAPI) { obj := create() err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) } -// Tests verifies the functionality of PutObjectPart. -func testMultipartObjectCreation(c *check.C, create func() *objectAPI) { +func testMultipartObjectCreation(c *check.C, create func() objectAPI) { obj := create() err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) @@ -83,7 +82,7 @@ func testMultipartObjectCreation(c *check.C, create func() *objectAPI) { c.Assert(md5Sum, check.Equals, "3605d84b1c43b1a664aa7c0d5082d271-10") } -func testMultipartObjectAbort(c *check.C, create func() *objectAPI) { +func testMultipartObjectAbort(c *check.C, create func() objectAPI) { obj := create() err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) @@ -114,7 +113,7 @@ func testMultipartObjectAbort(c *check.C, create func() *objectAPI) { c.Assert(err, check.IsNil) } -func testMultipleObjectCreation(c *check.C, create func() *objectAPI) { +func testMultipleObjectCreation(c *check.C, create func() objectAPI) { objects := make(map[string][]byte) obj := create() err := obj.MakeBucket("bucket") @@ -155,7 +154,7 @@ func testMultipleObjectCreation(c *check.C, create func() *objectAPI) { } } -func testPaging(c *check.C, create func() *objectAPI) { +func testPaging(c *check.C, create func() objectAPI) { obj := create() obj.MakeBucket("bucket") result, err := obj.ListObjects("bucket", "", "", "", 0) @@ -255,7 +254,7 @@ func testPaging(c *check.C, create func() *objectAPI) { } } -func testObjectOverwriteWorks(c *check.C, create func() *objectAPI) { +func testObjectOverwriteWorks(c *check.C, create func() objectAPI) { obj := create() err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) @@ -276,13 +275,13 @@ func testObjectOverwriteWorks(c *check.C, create func() *objectAPI) { c.Assert(r.Close(), check.IsNil) } -func testNonExistantBucketOperations(c *check.C, create func() *objectAPI) { +func testNonExistantBucketOperations(c *check.C, create 
func() objectAPI) { obj := create() _, err := obj.PutObject("bucket", "object", int64(len("one")), bytes.NewBufferString("one"), nil) c.Assert(err, check.Not(check.IsNil)) } -func testBucketRecreateFails(c *check.C, create func() *objectAPI) { +func testBucketRecreateFails(c *check.C, create func() objectAPI) { obj := create() err := obj.MakeBucket("string") c.Assert(err, check.IsNil) @@ -290,7 +289,7 @@ func testBucketRecreateFails(c *check.C, create func() *objectAPI) { c.Assert(err, check.Not(check.IsNil)) } -func testPutObjectInSubdir(c *check.C, create func() *objectAPI) { +func testPutObjectInSubdir(c *check.C, create func() objectAPI) { obj := create() err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) @@ -308,7 +307,7 @@ func testPutObjectInSubdir(c *check.C, create func() *objectAPI) { c.Assert(r.Close(), check.IsNil) } -func testListBuckets(c *check.C, create func() *objectAPI) { +func testListBuckets(c *check.C, create func() objectAPI) { obj := create() // test empty list @@ -340,7 +339,7 @@ func testListBuckets(c *check.C, create func() *objectAPI) { c.Assert(err, check.IsNil) } -func testListBucketsOrder(c *check.C, create func() *objectAPI) { +func testListBucketsOrder(c *check.C, create func() objectAPI) { // if implementation contains a map, order of map keys will vary. 
// this ensures they return in the same order each time for i := 0; i < 10; i++ { @@ -358,7 +357,7 @@ func testListBucketsOrder(c *check.C, create func() *objectAPI) { } } -func testListObjectsTestsForNonExistantBucket(c *check.C, create func() *objectAPI) { +func testListObjectsTestsForNonExistantBucket(c *check.C, create func() objectAPI) { obj := create() result, err := obj.ListObjects("bucket", "", "", "", 1000) c.Assert(err, check.Not(check.IsNil)) @@ -366,7 +365,7 @@ func testListObjectsTestsForNonExistantBucket(c *check.C, create func() *objectA c.Assert(len(result.Objects), check.Equals, 0) } -func testNonExistantObjectInBucket(c *check.C, create func() *objectAPI) { +func testNonExistantObjectInBucket(c *check.C, create func() objectAPI) { obj := create() err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) @@ -381,7 +380,7 @@ func testNonExistantObjectInBucket(c *check.C, create func() *objectAPI) { } } -func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() *objectAPI) { +func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() objectAPI) { obj := create() err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) @@ -410,7 +409,7 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() *objectAPI) } } -func testDefaultContentType(c *check.C, create func() *objectAPI) { +func testDefaultContentType(c *check.C, create func() objectAPI) { obj := create() err := obj.MakeBucket("bucket") c.Assert(err, check.IsNil) diff --git a/xl-v1-common.go b/xl-v1-common.go new file mode 100644 index 000000000..989484598 --- /dev/null +++ b/xl-v1-common.go @@ -0,0 +1,157 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "encoding/json" + "errors" + slashpath "path" + "path/filepath" +) + +// Returns slice of disks needed for ReadFile operation: +// - slice returning readable disks. +// - fileMetadata +// - bool value indicating if selfHeal is needed. +// - error if any. +func (xl XL) getReadableDisks(volume, path string) ([]StorageAPI, fileMetadata, bool, error) { + partsMetadata, errs := xl.getPartsMetadata(volume, path) + highestVersion := int64(0) + versions := make([]int64, len(xl.storageDisks)) + quorumDisks := make([]StorageAPI, len(xl.storageDisks)) + notFoundCount := 0 + // If quorum says errFileNotFound return errFileNotFound + for _, err := range errs { + if err == errFileNotFound { + notFoundCount++ + } + } + if notFoundCount > xl.readQuorum { + return nil, fileMetadata{}, false, errFileNotFound + } + for index, metadata := range partsMetadata { + if errs[index] == nil { + version, err := metadata.GetFileVersion() + if err == errMetadataKeyNotExist { + versions[index] = 0 + continue + } + if err != nil { + // Unexpected, return error.
+ return nil, fileMetadata{}, false, err + } + versions[index] = version; if version > highestVersion { highestVersion = version } + } else { + versions[index] = -1 + } + } + quorumCount := 0 + for index, version := range versions { + if version == highestVersion { + quorumDisks[index] = xl.storageDisks[index] + quorumCount++ + } else { + quorumDisks[index] = nil + } + } + if quorumCount < xl.readQuorum { + return nil, fileMetadata{}, false, errReadQuorum + } + var metadata fileMetadata + for index, disk := range quorumDisks { + if disk == nil { + continue + } + metadata = partsMetadata[index] + break + } + // FIXME: take care of the situation when a disk has failed and been removed + // by looking at the error returned from the fs layer. fs-layer will have + // to return an error indicating that the disk is not available and should be + // different from ErrNotExist. + doSelfHeal := quorumCount != len(xl.storageDisks) + return quorumDisks, metadata, doSelfHeal, nil +} + +// Get parts.json metadata as a map slice. +// Returns error slice indicating the failed metadata reads. +// Read lockNS() should be done by caller. +func (xl XL) getPartsMetadata(volume, path string) ([]fileMetadata, []error) { + errs := make([]error, len(xl.storageDisks)) + metadataArray := make([]fileMetadata, len(xl.storageDisks)) + metadataFilePath := slashpath.Join(path, metadataFile) + for index, disk := range xl.storageDisks { + offset := int64(0) + metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset) + if err != nil { + errs[index] = err + continue + } + defer metadataReader.Close() + + metadata, err := fileMetadataDecode(metadataReader) + if err != nil { + // Unable to parse parts.json, set error. + errs[index] = err + continue + } + metadataArray[index] = metadata + } + return metadataArray, errs +} + +// Writes/Updates `parts.json` for given file. updateParts carries +// index of disks where `parts.json` needs to be updated. +// +// Returns collection of errors, indexed in accordance with input +// updateParts order.
+// Write lockNS() should be done by caller. +func (xl XL) setPartsMetadata(volume, path string, metadata fileMetadata, updateParts []bool) []error { + metadataFilePath := filepath.Join(path, metadataFile) + errs := make([]error, len(xl.storageDisks)) + + for index := range updateParts { + errs[index] = errors.New("Metadata not updated") + } + + metadataBytes, err := json.Marshal(metadata) + if err != nil { + for index := range updateParts { + errs[index] = err + } + return errs + } + + for index, shouldUpdate := range updateParts { + if !shouldUpdate { + continue + } + writer, err := xl.storageDisks[index].CreateFile(volume, metadataFilePath) + errs[index] = err + if err != nil { + continue + } + _, err = writer.Write(metadataBytes) + if err != nil { + errs[index] = err + safeCloseAndRemove(writer) + continue + } + writer.Close() + } + return errs +} diff --git a/xl-v1-createfile.go b/xl-v1-createfile.go index cca4d3404..93a94cbfd 100644 --- a/xl-v1-createfile.go +++ b/xl-v1-createfile.go @@ -98,16 +98,15 @@ func (xl XL) getFileQuorumVersionMap(volume, path string) map[int]int64 { continue } - if version := metadata.Get("file.version"); version == nil { + version, err := metadata.GetFileVersion() + if err == errMetadataKeyNotExist { fileQuorumVersionMap[index] = 0 - } else { - // Convert string to integer. - fileVersion, err := strconv.ParseInt(version[0], 10, 64) - if err != nil { - continue - } - fileQuorumVersionMap[index] = fileVersion + continue + } + if err != nil { + continue } + fileQuorumVersionMap[index] = version } return fileQuorumVersionMap } diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go index 342d0227d..0345ea3da 100644 --- a/xl-v1-metadata.go +++ b/xl-v1-metadata.go @@ -96,6 +96,25 @@ func (f fileMetadata) GetModTime() (time.Time, error) { return time.Parse(timeFormatAMZ, timeStrs[0]) } +// Set file Modification time. 
+func (f fileMetadata) SetModTime(modTime time.Time) { + f.Set("file.modTime", modTime.Format(timeFormatAMZ)) +} + +// Get file version. +func (f fileMetadata) GetFileVersion() (int64, error) { + version := f.Get("file.version") + if version == nil { + return 0, errMetadataKeyNotExist + } + return strconv.ParseInt(version[0], 10, 64) +} + +// Set file version. +func (f fileMetadata) SetFileVersion(fileVersion int64) { + f.Set("file.version", strconv.FormatInt(fileVersion, 10)) +} + // fileMetadataDecode - file metadata decode. func fileMetadataDecode(reader io.Reader) (fileMetadata, error) { metadata := make(fileMetadata) diff --git a/xl-v1-readfile.go b/xl-v1-readfile.go index 5755151e5..54672b966 100644 --- a/xl-v1-readfile.go +++ b/xl-v1-readfile.go @@ -21,96 +21,8 @@ import ( "fmt" "io" slashpath "path" - "strconv" ) -// checkBlockSize return the size of a single block. -// The first non-zero size is returned, -// or 0 if all blocks are size 0. -func checkBlockSize(blocks [][]byte) int { - for _, block := range blocks { - if len(block) != 0 { - return len(block) - } - } - return 0 -} - -// calculate the blockSize based on input length and total number of -// data blocks. -func getEncodedBlockLen(inputLen, dataBlocks int) (curBlockSize int) { - curBlockSize = (inputLen + dataBlocks - 1) / dataBlocks - return -} - -// Returns slice of disks needed for ReadFile operation: -// - slice returing readable disks. -// - fileMetadata -// - bool value indicating if selfHeal is needed. -// - error if any. 
-func (xl XL) getReadableDisks(volume, path string) ([]StorageAPI, fileMetadata, bool, error) { - partsMetadata, errs := xl.getPartsMetadata(volume, path) - highestVersion := int64(0) - versions := make([]int64, len(xl.storageDisks)) - quorumDisks := make([]StorageAPI, len(xl.storageDisks)) - notFoundCount := 0 - // If quorum says errFileNotFound return errFileNotFound - for _, err := range errs { - if err == errFileNotFound { - notFoundCount++ - } - } - if notFoundCount > xl.readQuorum { - return nil, fileMetadata{}, false, errFileNotFound - } - for index, metadata := range partsMetadata { - if errs[index] == nil { - if version := metadata.Get("file.version"); version != nil { - // Convert string to integer. - version, err := strconv.ParseInt(version[0], 10, 64) - if err != nil { - // Unexpected, return error. - return nil, fileMetadata{}, false, err - } - if version > highestVersion { - highestVersion = version - } - versions[index] = version - } else { - versions[index] = 0 - } - } else { - versions[index] = -1 - } - } - quorumCount := 0 - for index, version := range versions { - if version == highestVersion { - quorumDisks[index] = xl.storageDisks[index] - quorumCount++ - } else { - quorumDisks[index] = nil - } - } - if quorumCount < xl.readQuorum { - return nil, fileMetadata{}, false, errReadQuorum - } - var metadata fileMetadata - for index, disk := range quorumDisks { - if disk == nil { - continue - } - metadata = partsMetadata[index] - break - } - // FIXME: take care of the situation when a disk has failed and been removed - // by looking at the error returned from the fs layer. fs-layer will have - // to return an error indicating that the disk is not available and should be - // different from ErrNotExist. - doSelfHeal := quorumCount != len(xl.storageDisks) - return quorumDisks, metadata, doSelfHeal, nil -} - // ReadFile - read file func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) { // Input validation. 
diff --git a/xl-v1-utils.go b/xl-v1-utils.go index f3ae39f10..09b4ea48a 100644 --- a/xl-v1-utils.go +++ b/xl-v1-utils.go @@ -1,77 +1,36 @@ -package main - -import ( - "encoding/json" - "errors" - slashpath "path" - "path/filepath" -) +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -// Get parts.json metadata as a map slice. -// Returns error slice indicating the failed metadata reads. -// Read lockNS() should be done by caller. -func (xl XL) getPartsMetadata(volume, path string) ([]fileMetadata, []error) { - errs := make([]error, len(xl.storageDisks)) - metadataArray := make([]fileMetadata, len(xl.storageDisks)) - metadataFilePath := slashpath.Join(path, metadataFile) - for index, disk := range xl.storageDisks { - offset := int64(0) - metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset) - if err != nil { - errs[index] = err - continue - } - defer metadataReader.Close() +package main - metadata, err := fileMetadataDecode(metadataReader) - if err != nil { - // Unable to parse parts.json, set error. - errs[index] = err - continue +// checkBlockSize return the size of a single block. +// The first non-zero size is returned, +// or 0 if all blocks are size 0. 
+func checkBlockSize(blocks [][]byte) int { + for _, block := range blocks { + if len(block) != 0 { + return len(block) } - metadataArray[index] = metadata } - return metadataArray, errs + return 0 } -// Writes/Updates `parts.json` for given file. updateParts carries -// index of disks where `parts.json` needs to be updated. -// -// Returns collection of errors, indexed in accordance with input -// updateParts order. -// Write lockNS() should be done by caller. -func (xl XL) setPartsMetadata(volume, path string, metadata fileMetadata, updateParts []bool) []error { - metadataFilePath := filepath.Join(path, metadataFile) - errs := make([]error, len(xl.storageDisks)) - - for index := range updateParts { - errs[index] = errors.New("Metadata not updated") - } - - metadataBytes, err := json.Marshal(metadata) - if err != nil { - for index := range updateParts { - errs[index] = err - } - return errs - } - - for index, shouldUpdate := range updateParts { - if !shouldUpdate { - continue - } - writer, err := xl.storageDisks[index].CreateFile(volume, metadataFilePath) - errs[index] = err - if err != nil { - continue - } - _, err = writer.Write(metadataBytes) - if err != nil { - errs[index] = err - safeCloseAndRemove(writer) - continue - } - writer.Close() - } - return errs +// calculate the blockSize based on input length and total number of +// data blocks. +func getEncodedBlockLen(inputLen, dataBlocks int) (curBlockSize int) { + curBlockSize = (inputLen + dataBlocks - 1) / dataBlocks + return }