diff --git a/format-config-v1.go b/format-config-v1.go
index c4342a2f2..1017546dc 100644
--- a/format-config-v1.go
+++ b/format-config-v1.go
@@ -115,10 +115,8 @@ func reorderDisks(bootstrapDisks []StorageAPI, formatConfigs []*formatConfigV1)
 
 // loadFormat - load format from disk.
 func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) {
-	buffer := make([]byte, blockSizeV1)
-	offset := int64(0)
-	var n int64
-	n, err = disk.ReadFile(minioMetaBucket, formatConfigFile, offset, buffer)
+	var buffer []byte
+	buffer, err = readAll(disk, minioMetaBucket, formatConfigFile)
 	if err != nil {
 		// 'file not found' and 'volume not found' as
 		// same. 'volume not found' usually means its a fresh disk.
@@ -138,7 +136,7 @@ func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) {
 		return nil, err
 	}
 	format = &formatConfigV1{}
-	err = json.Unmarshal(buffer[:n], format)
+	err = json.Unmarshal(buffer, format)
 	if err != nil {
 		return nil, err
 	}
diff --git a/fs-v1-metadata.go b/fs-v1-metadata.go
index 84d26c359..b37252900 100644
--- a/fs-v1-metadata.go
+++ b/fs-v1-metadata.go
@@ -57,12 +57,12 @@ func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag strin
 
 // readFSMetadata - returns the object metadata `fs.json` content.
 func (fs fsObjects) readFSMetadata(bucket, object string) (fsMeta fsMetaV1, err error) {
-	buffer := make([]byte, blockSizeV1)
-	n, err := fs.storage.ReadFile(bucket, path.Join(object, fsMetaJSONFile), int64(0), buffer)
+	var buffer []byte
+	buffer, err = readAll(fs.storage, bucket, path.Join(object, fsMetaJSONFile))
 	if err != nil {
 		return fsMetaV1{}, err
 	}
-	err = json.Unmarshal(buffer[:n], &fsMeta)
+	err = json.Unmarshal(buffer, &fsMeta)
 	if err != nil {
 		return fsMetaV1{}, err
 	}
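
The two hunks above trade a fixed 10MiB ReadFile into a pre-allocated buffer for readAll (introduced in xl-v1-utils.go further down), so the slice handed to json.Unmarshal is exactly the file content: no buffer[:n] bookkeeping, and no silent truncation if a config file ever outgrows blockSizeV1. A minimal, self-contained sketch of the resulting pattern; the struct here is a reduced stand-in for the real formatConfigV1:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // Reduced stand-in for formatConfigV1; the real struct lives in
    // format-config-v1.go.
    type formatConfigV1 struct {
        Version string `json:"version"`
        Format  string `json:"format"`
    }

    func main() {
        // With readAll the buffer is the whole file, so it can be passed
        // to json.Unmarshal directly.
        buffer := []byte(`{"version":"1","format":"xl"}`)
        var format formatConfigV1
        if err := json.Unmarshal(buffer, &format); err != nil {
            fmt.Println("corrupted format.json:", err)
            return
        }
        fmt.Printf("loaded format.json: %+v\n", format)
    }
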
diff --git a/fs-v1-multipart.go b/fs-v1-multipart.go
index 7d6fc63f5..230a17370 100644
--- a/fs-v1-multipart.go
+++ b/fs-v1-multipart.go
@@ -43,8 +43,8 @@ func (fs fsObjects) isBucketExist(bucket string) bool {
 	return true
 }
 
-// newMultipartUploadCommon - initialize a new multipart, is a common function for both object layers.
-func (fs fsObjects) newMultipartUploadCommon(bucket string, object string, meta map[string]string) (uploadID string, err error) {
+// newMultipartUpload - initialize a new multipart.
+func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (uploadID string, err error) {
 	// Verify if bucket name is valid.
 	if !IsValidBucketName(bucket) {
 		return "", BucketNameInvalid{Bucket: bucket}
@@ -111,8 +111,8 @@ func (fs fsObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, er
 	return uploads, nil
 }
 
-// listMultipartUploadsCommon - lists all multipart uploads, common function for both object layers.
-func (fs fsObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
+// listMultipartUploads - lists all multipart uploads.
+func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
 	result := ListMultipartsInfo{}
 	// Verify if bucket is valid.
 	if !IsValidBucketName(bucket) {
@@ -266,17 +266,17 @@
 // ListMultipartUploads - list multipart uploads.
 func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
-	return fs.listMultipartUploadsCommon(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
+	return fs.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
 }
 
 // NewMultipartUpload - initialize a new multipart upload, returns a unique id.
 func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) {
 	meta = make(map[string]string) // Reset the meta value, we are not going to save headers for fs.
-	return fs.newMultipartUploadCommon(bucket, object, meta)
+	return fs.newMultipartUpload(bucket, object, meta)
 }
 
-// putObjectPartCommon - put object part.
-func (fs fsObjects) putObjectPartCommon(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
+// putObjectPart - put object part.
+func (fs fsObjects) putObjectPart(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
 	// Verify if bucket is valid.
 	if !IsValidBucketName(bucket) {
 		return "", BucketNameInvalid{Bucket: bucket}
@@ -364,10 +364,10 @@ func (fs fsObjects) putObjectPartCommon(bucket string, object string, uploadID s
 
 // PutObjectPart - writes the multipart upload chunks.
 func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
-	return fs.putObjectPartCommon(bucket, object, uploadID, partID, size, data, md5Hex)
+	return fs.putObjectPart(bucket, object, uploadID, partID, size, data, md5Hex)
 }
 
-func (fs fsObjects) listObjectPartsCommon(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
+func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
 	// Verify if bucket is valid.
 	if !IsValidBucketName(bucket) {
 		return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket}
@@ -432,8 +432,9 @@ func (fs fsObjects) listObjectPartsCommon(bucket, object, uploadID string, partN
 	return result, nil
 }
 
+// ListObjectParts - list all parts.
 func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
-	return fs.listObjectPartsCommon(bucket, object, uploadID, partNumberMarker, maxParts)
+	return fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
 }
 
 // isUploadIDExists - verify if a given uploadID exists and is valid.
@@ -450,6 +451,7 @@ func (fs fsObjects) isUploadIDExists(bucket, object, uploadID string) bool {
 	return true
 }
 
+// CompleteMultipartUpload - implements the complete multipart upload transaction.
 func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) {
 	// Verify if bucket is valid.
 	if !IsValidBucketName(bucket) {
@@ -533,9 +535,8 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
 	return s3MD5, nil
 }
 
-// abortMultipartUploadCommon - aborts a multipart upload, common
-// function used by both object layers.
-func (fs fsObjects) abortMultipartUploadCommon(bucket, object, uploadID string) error {
+// abortMultipartUpload - aborts a multipart upload.
+func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error {
 	// Verify if bucket is valid.
 	if !IsValidBucketName(bucket) {
 		return BucketNameInvalid{Bucket: bucket}
@@ -581,7 +582,7 @@ func (fs fsObjects) abortMultipartUploadCommon(bucket, object, uploadID string)
 	return nil
 }
 
 // AbortMultipartUpload - aborts a multipart upload.
 func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error {
-	return fs.abortMultipartUploadCommon(bucket, object, uploadID)
+	return fs.abortMultipartUpload(bucket, object, uploadID)
 }
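
The fs-v1-multipart.go renames above drop the "-Common" suffix: these helpers are no longer shared between backends, each object layer now carries its own copy behind the common interface, and the exported methods stay one-line wrappers. A minimal sketch of that dispatch (the interface and types here are simplified stand-ins, not the real minio ObjectLayer):

    package main

    import "fmt"

    // objectLayer is a cut-down stand-in for the object layer interface.
    type objectLayer interface {
        NewMultipartUpload(bucket, object string) (uploadID string, err error)
    }

    type fsBackend struct{}

    func (fsBackend) NewMultipartUpload(bucket, object string) (string, error) {
        return "fs-upload-id", nil // The real FS layer persists state under .minio.
    }

    type xlBackend struct{}

    func (xlBackend) NewMultipartUpload(bucket, object string) (string, error) {
        return "xl-upload-id", nil // The real XL layer writes metadata across disks.
    }

    func main() {
        // Callers pick a backend once and dispatch through the interface.
        for _, layer := range []objectLayer{fsBackend{}, xlBackend{}} {
            id, _ := layer.NewMultipartUpload("bucket", "object")
            fmt.Println(id)
        }
    }
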
diff --git a/fs-v1.go b/fs-v1.go
index 063818023..87f383d58 100644
--- a/fs-v1.go
+++ b/fs-v1.go
@@ -57,9 +57,8 @@ func newFSObjects(disk string) (ObjectLayer, error) {
 		}
 	}
 
-	// Initialize object layer - like creating minioMetaBucket,
-	// cleaning up tmp files etc.
-	initObjectLayer(storage)
+	// Runs house keeping code, like creating minioMetaBucket, cleaning up tmp files etc.
+	fsHouseKeeping(storage)
 
 	// Return successfully initialized object layer.
 	return fsObjects{
@@ -311,7 +310,7 @@ func isBucketExist(storage StorageAPI, bucketName string) bool {
 	return true
 }
 
-func (fs fsObjects) listObjectsFS(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
+func (fs fsObjects) listObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
 	// Convert entry to FileInfo
 	entryToFileInfo := func(entry string) (fileInfo FileInfo, err error) {
 		if strings.HasSuffix(entry, slashSeparator) {
@@ -443,5 +442,5 @@ func (fs fsObjects) listObjectsFS(bucket, prefix, marker, delimiter string, maxK
 
 // ListObjects - list all objects.
 func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
-	return fs.listObjectsFS(bucket, prefix, marker, delimiter, maxKeys)
+	return fs.listObjects(bucket, prefix, marker, delimiter, maxKeys)
 }
diff --git a/object-common.go b/object-common.go
index 27747313e..979c3182b 100644
--- a/object-common.go
+++ b/object-common.go
@@ -26,8 +26,25 @@ const (
 	blockSizeV1 = 10 * 1024 * 1024 // 10MiB.
 )
 
-// Common initialization needed for both object layers.
-func initObjectLayer(storageDisks ...StorageAPI) error {
+// House keeping code needed for FS.
+func fsHouseKeeping(storageDisk StorageAPI) error {
+	// Attempt to create `.minio`.
+	err := storageDisk.MakeVol(minioMetaBucket)
+	if err != nil {
+		if err != errVolumeExists && err != errDiskNotFound {
+			return err
+		}
+	}
+	// Cleanup all temp entries upon start.
+	err = cleanupDir(storageDisk, minioMetaBucket, tmpMetaPrefix)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// House keeping code needed for XL.
+func xlHouseKeeping(storageDisks []StorageAPI) error {
 	// This happens for the first time, but keep this here since this
 	// is the only place where it can be made expensive optimizing all
 	// other calls. Create minio meta volume, if it doesn't exist yet.
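
fsHouseKeeping above treats errVolumeExists as success, which keeps creating `.minio` idempotent across restarts. A self-contained sketch of that pattern, with makeVol standing in for StorageAPI.MakeVol:

    package main

    import (
        "errors"
        "fmt"
    )

    var errVolumeExists = errors.New("volume exists")

    // makeVol stands in for StorageAPI.MakeVol; here the volume already
    // exists, as it would on any restart after the first.
    func makeVol(name string) error {
        return errVolumeExists
    }

    // houseKeeping mirrors the fsHouseKeeping shape: "already exists" is
    // not an error, anything else aborts startup.
    func houseKeeping() error {
        if err := makeVol(".minio"); err != nil && err != errVolumeExists {
            return err
        }
        return nil
    }

    func main() {
        fmt.Println("house keeping:", houseKeeping()) // house keeping: <nil>
    }
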
diff --git a/tree-walk-xl.go b/tree-walk-xl.go
index eb1c2a683..85d86a474 100644
--- a/tree-walk-xl.go
+++ b/tree-walk-xl.go
@@ -17,7 +17,6 @@
 package main
 
 import (
-	"math/rand"
 	"sort"
 	"strings"
 	"time"
@@ -79,17 +78,8 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string)
 	return nil, err
 }
 
-// getRandomDisk - gives a random disk at any point in time from the
-// available pool of disks.
-func (xl xlObjects) getRandomDisk() (disk StorageAPI) {
-	rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time.
-	randIndex := rand.Intn(len(xl.storageDisks) - 1)
-	disk = xl.storageDisks[randIndex] // Pick a random disk.
-	return disk
-}
-
-// treeWalkXL walks directory tree recursively pushing fileInfo into the channel as and when it encounters files.
-func (xl xlObjects) treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int, isLeaf func(string, string) bool) bool {
+// treeWalk walks the directory tree recursively, pushing fileInfo into the channel as and when it encounters files.
+func (xl xlObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResult) bool, count *int, isLeaf func(string, string) bool) bool {
 	// Example:
 	// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
 	// called with prefixDir="one/two/three/four/" and marker="five.txt"
@@ -133,7 +123,7 @@ func (xl xlObjects) treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker strin
 		if recursive && !strings.HasSuffix(entry, slashSeparator) {
 			// We should not skip for recursive listing and if markerDir is a directory
 			// for ex. if marker is "four/five.txt" markerDir will be "four/" which
-			// should not be skipped, instead it will need to be treeWalkXL()'ed into.
+			// should not be skipped, instead it will need to be treeWalk()'ed into.
 			// Skip if it is a file though as it would be listed in previous listing.
 			*count--
@@ -151,7 +141,7 @@ func (xl xlObjects) treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker strin
 			}
 			*count--
 			prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories.
-			if !xl.treeWalkXL(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, send, count, isLeaf) {
+			if !xl.treeWalk(bucket, pathJoin(prefixDir, entry), prefixMatch, markerArg, recursive, send, count, isLeaf) {
 				return false
 			}
 			continue
@@ -165,7 +155,7 @@
 }
 
 // Initiate a new treeWalk in a goroutine.
-func (xl xlObjects) startTreeWalkXL(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool) *treeWalker {
+func (xl xlObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool) *treeWalker {
 	// Example 1
 	// If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt"
 	// treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt"
@@ -202,13 +192,13 @@ func (xl xlObjects) startTreeWalkXL(bucket, prefix, marker string, recursive boo
 				return false
 			}
 		}
-		xl.treeWalkXL(bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count, isLeaf)
+		xl.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count, isLeaf)
 	}()
 	return &walkNotify
 }
 
 // Save the goroutine reference in the map
-func (xl xlObjects) saveTreeWalkXL(params listParams, walker *treeWalker) {
+func (xl xlObjects) saveTreeWalk(params listParams, walker *treeWalker) {
 	xl.listObjectMapMutex.Lock()
 	defer xl.listObjectMapMutex.Unlock()
 
@@ -219,7 +209,7 @@ func (xl xlObjects) saveTreeWalkXL(params listParams, walker *treeWalker) {
 }
 
 // Lookup the goroutine reference from map
-func (xl xlObjects) lookupTreeWalkXL(params listParams) *treeWalker {
+func (xl xlObjects) lookupTreeWalk(params listParams) *treeWalker {
 	xl.listObjectMapMutex.Lock()
 	defer xl.listObjectMapMutex.Unlock()
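
treeWalk above recurses depth-first in sorted order and streams leaves out over a channel, so listing can start returning entries before the walk finishes. A toy, self-contained version of that shape; the map fakes a directory tree, and the marker, prefix and count handling of the real walker is omitted:

    package main

    import (
        "fmt"
        "sort"
    )

    // tree fakes a bucket namespace: keys are directories, values are
    // their entries, with a trailing slash marking subdirectories.
    var tree = map[string][]string{
        "":     {"a/", "b.txt"},
        "a/":   {"c.txt", "d/"},
        "a/d/": {"e.txt"},
    }

    func treeWalk(prefixDir string, ch chan<- string) {
        entries := append([]string(nil), tree[prefixDir]...)
        sort.Strings(entries) // Walk in lexical order, like the real listDir.
        for _, entry := range entries {
            if entry[len(entry)-1] == '/' { // Directory: recurse into it.
                treeWalk(prefixDir+entry, ch)
                continue
            }
            ch <- prefixDir + entry // File: stream it to the consumer.
        }
    }

    func main() {
        ch := make(chan string)
        go func() { treeWalk("", ch); close(ch) }()
        for object := range ch {
            fmt.Println(object) // a/c.txt, a/d/e.txt, b.txt
        }
    }
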
diff --git a/xl-v1-common.go b/xl-v1-common.go
new file mode 100644
index 000000000..2e0352636
--- /dev/null
+++ b/xl-v1-common.go
@@ -0,0 +1,112 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"math/rand"
+	"path"
+	"sync"
+	"time"
+)
+
+// getRandomDisk - gives a random disk at any point in time from the
+// available pool of disks.
+func (xl xlObjects) getRandomDisk() (disk StorageAPI) {
+	rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time.
+	randIndex := rand.Intn(len(xl.storageDisks)) // Intn(n) covers [0, n-1], so every disk is reachable.
+	disk = xl.storageDisks[randIndex] // Pick a random disk.
+	return disk
+}
+
+// This function does the following check, suppose
+// object is "a/b/c/d", stat makes sure that objects "a/b/c",
+// "a/b" and "a" do not exist.
+func (xl xlObjects) parentDirIsObject(bucket, parent string) bool {
+	var isParentDirObject func(string) bool
+	isParentDirObject = func(p string) bool {
+		if p == "." {
+			return false
+		}
+		if xl.isObject(bucket, p) {
+			// If there is already a file at prefix "p" return error.
+			return true
+		}
+		// Check if there is a file as one of the parent paths.
+		return isParentDirObject(path.Dir(p))
+	}
+	return isParentDirObject(parent)
+}
+
+func (xl xlObjects) isObject(bucket, prefix string) bool {
+	// Create errs and volInfo slices of storageDisks size.
+	var errs = make([]error, len(xl.storageDisks))
+
+	// Allocate a new waitgroup.
+	var wg = &sync.WaitGroup{}
+	for index, disk := range xl.storageDisks {
+		wg.Add(1)
+		// Stat file on all the disks in a routine.
+		go func(index int, disk StorageAPI) {
+			defer wg.Done()
+			_, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile))
+			if err != nil {
+				errs[index] = err
+				return
+			}
+			errs[index] = nil
+		}(index, disk)
+	}
+
+	// Wait for all the Stat operations to finish.
+	wg.Wait()
+
+	var errFileNotFoundCount int
+	for _, err := range errs {
+		if err != nil {
+			if err == errFileNotFound {
+				errFileNotFoundCount++
+				// If we have errors with file not found greater than allowed read
+				// quorum we return err as errFileNotFound.
+				if errFileNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
+					return false
+				}
+				continue
+			}
+			errorIf(err, "Unable to access file "+path.Join(bucket, prefix))
+			return false
+		}
+	}
+	return true
+}
+
+// statPart - stat a part file.
+func (xl xlObjects) statPart(bucket, objectPart string) (fileInfo FileInfo, err error) {
+	// Count for errors encountered.
+	var xlJSONErrCount = 0
+
+	// Return the first success entry based on the selected random disk.
+	for xlJSONErrCount < len(xl.storageDisks) {
+		// Choose a random disk on each attempt.
+		disk := xl.getRandomDisk()
+		fileInfo, err = disk.StatFile(bucket, objectPart)
+		if err == nil {
+			return fileInfo, nil
+		}
+		xlJSONErrCount++ // Update error count.
+	}
+	return FileInfo{}, err
+}
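
isObject above tolerates up to len(disks)-readQuorum "file not found" answers before declaring the object absent. A self-contained check of that arithmetic; deriving readQuorum as half the disk count is an assumption made for the demo, not something shown in this diff:

    package main

    import "fmt"

    // haveQuorum returns true while the number of disks missing the file
    // stays within what the read quorum allows, mirroring isObject.
    func haveQuorum(statErrs []error, readQuorum int) bool {
        notFound := 0
        for _, err := range statErrs {
            if err != nil {
                notFound++
            }
        }
        return notFound <= len(statErrs)-readQuorum
    }

    func main() {
        disks := 8
        readQuorum := disks / 2 // Assumed derivation for the demo.
        errs := make([]error, disks)
        for i := 0; i < 3; i++ {
            errs[i] = fmt.Errorf("file not found on disk %d", i)
        }
        fmt.Println(haveQuorum(errs, readQuorum)) // true: 3 missing <= 8-4
        errs[3] = fmt.Errorf("file not found on disk 3")
        errs[4] = fmt.Errorf("file not found on disk 4")
        fmt.Println(haveQuorum(errs, readQuorum)) // false: 5 missing > 4
    }
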
diff --git a/xl-v1-list-objects.go b/xl-v1-list-objects.go
index 99154d8fd..e30e4d8f4 100644
--- a/xl-v1-list-objects.go
+++ b/xl-v1-list-objects.go
@@ -1,17 +1,34 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package main
 
 import "strings"
 
-func (xl xlObjects) listObjectsXL(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
+// listObjects - wrapper function implemented over file tree walk.
+func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
 	// Default is recursive, if delimiter is set then list non recursive.
 	recursive := true
 	if delimiter == slashSeparator {
 		recursive = false
 	}
-	walker := xl.lookupTreeWalkXL(listParams{bucket, recursive, marker, prefix})
+	walker := xl.lookupTreeWalk(listParams{bucket, recursive, marker, prefix})
 	if walker == nil {
-		walker = xl.startTreeWalkXL(bucket, prefix, marker, recursive, xl.isObject)
+		walker = xl.startTreeWalk(bucket, prefix, marker, recursive, xl.isObject)
 	}
 	var objInfos []ObjectInfo
 	var eof bool
@@ -57,7 +74,7 @@ func (xl xlObjects) listObjectsXL(bucket, prefix, marker, delimiter string, maxK
 	}
 	params := listParams{bucket, recursive, nextMarker, prefix}
 	if !eof {
-		xl.saveTreeWalkXL(params, walker)
+		xl.saveTreeWalk(params, walker)
 	}
 
 	result := ListObjectsInfo{IsTruncated: !eof}
@@ -128,7 +145,7 @@ func (xl xlObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
 	}
 
 	// Initiate a list operation, if successful filter and return quickly.
-	listObjInfo, err := xl.listObjectsXL(bucket, prefix, marker, delimiter, maxKeys)
+	listObjInfo, err := xl.listObjects(bucket, prefix, marker, delimiter, maxKeys)
 	if err == nil {
 		// We got the entries successfully return.
 		return listObjInfo, nil
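
listObjects above first tries lookupTreeWalk, falls back to startTreeWalk, and when the page is truncated parks the walker with saveTreeWalk so the next request with a matching marker can resume it. A reduced, self-contained sketch of that pool; the types are simplified stand-ins for listParams and treeWalker:

    package main

    import (
        "fmt"
        "sync"
    )

    // listParams keys an in-flight walker by its listing parameters.
    type listParams struct {
        bucket, prefix, marker string
        recursive              bool
    }

    type treeWalker struct{ ch chan string }

    var (
        pool = map[listParams][]*treeWalker{}
        mu   sync.Mutex
    )

    // lookupTreeWalk pops a parked walker for these parameters, if any.
    func lookupTreeWalk(p listParams) *treeWalker {
        mu.Lock()
        defer mu.Unlock()
        walkers := pool[p]
        if len(walkers) == 0 {
            return nil
        }
        w := walkers[0]
        pool[p] = walkers[1:]
        return w
    }

    // saveTreeWalk parks a walker for the follow-up page request.
    func saveTreeWalk(p listParams, w *treeWalker) {
        mu.Lock()
        defer mu.Unlock()
        pool[p] = append(pool[p], w)
    }

    func main() {
        p := listParams{bucket: "bucket", prefix: "photos/", marker: "photos/0100.jpg"}
        saveTreeWalk(p, &treeWalker{ch: make(chan string)})
        fmt.Println(lookupTreeWalk(p) != nil) // true: walk resumed
        fmt.Println(lookupTreeWalk(p) != nil) // false: pool drained
    }
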
diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go
index 2911956b2..844599eb5 100644
--- a/xl-v1-metadata.go
+++ b/xl-v1-metadata.go
@@ -18,7 +18,6 @@ package main
 
 import (
 	"encoding/json"
-	"math/rand"
 	"path"
 	"sort"
 	"sync"
@@ -40,6 +39,13 @@ type objectPartInfo struct {
 	Size int64 `json:"size"`
 }
 
+// byPartNumber is a collection satisfying sort.Interface.
+type byPartNumber []objectPartInfo
+
+func (t byPartNumber) Len() int           { return len(t) }
+func (t byPartNumber) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
+func (t byPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number }
+
 // A xlMetaV1 represents a metadata header mapping keys to sets of values.
 type xlMetaV1 struct {
 	Version string `json:"version"`
@@ -69,12 +75,19 @@ type xlMetaV1 struct {
 	Parts []objectPartInfo `json:"parts,omitempty"`
 }
 
-// byPartName is a collection satisfying sort.Interface.
-type byPartNumber []objectPartInfo
-
-func (t byPartNumber) Len() int           { return len(t) }
-func (t byPartNumber) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
-func (t byPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number }
+// newXLMetaV1 - initializes new xlMetaV1.
+func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) {
+	xlMeta = xlMetaV1{}
+	xlMeta.Version = "1"
+	xlMeta.Format = "xl"
+	xlMeta.Minio.Release = minioReleaseTag
+	xlMeta.Erasure.Algorithm = erasureAlgorithmKlauspost
+	xlMeta.Erasure.DataBlocks = dataBlocks
+	xlMeta.Erasure.ParityBlocks = parityBlocks
+	xlMeta.Erasure.BlockSize = blockSizeV1
+	xlMeta.Erasure.Distribution = randInts(dataBlocks + parityBlocks)
+	return xlMeta
+}
 
 // ObjectPartIndex - returns the index of matching object part number.
 func (m xlMetaV1) ObjectPartIndex(partNumber int) (index int) {
@@ -139,16 +152,13 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
 	// Count for errors encountered.
 	var xlJSONErrCount = 0
 
-	// Allocate 10MiB buffer.
-	buffer := make([]byte, blockSizeV1)
-
 	// Return the first successful lookup from a random list of disks.
 	for xlJSONErrCount < len(xl.storageDisks) {
 		disk := xl.getRandomDisk() // Choose a random disk on each attempt.
-		var n int64
-		n, err = disk.ReadFile(bucket, path.Join(object, xlMetaJSONFile), int64(0), buffer)
+		var buffer []byte
+		buffer, err = readAll(disk, bucket, path.Join(object, xlMetaJSONFile))
 		if err == nil {
-			err = json.Unmarshal(buffer[:n], &xlMeta)
+			err = json.Unmarshal(buffer, &xlMeta)
 			if err == nil {
 				return xlMeta, nil
 			}
@@ -158,20 +168,6 @@
 	return xlMetaV1{}, err
 }
 
-// newXLMetaV1 - initializes new xlMetaV1.
-func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) {
-	xlMeta = xlMetaV1{}
-	xlMeta.Version = "1"
-	xlMeta.Format = "xl"
-	xlMeta.Minio.Release = minioReleaseTag
-	xlMeta.Erasure.Algorithm = erasureAlgorithmKlauspost
-	xlMeta.Erasure.DataBlocks = dataBlocks
-	xlMeta.Erasure.ParityBlocks = parityBlocks
-	xlMeta.Erasure.BlockSize = blockSizeV1
-	xlMeta.Erasure.Distribution = randErasureDistribution(dataBlocks + parityBlocks)
-	return xlMeta
-}
-
 // renameXLMetadata - renames `xl.json` from source prefix to destination prefix.
 func (xl xlObjects) renameXLMetadata(srcBucket, srcPrefix, dstBucket, dstPrefix string) error {
 	var wg = &sync.WaitGroup{}
@@ -234,6 +230,7 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro
 			mErrs[index] = err
 			return
 		}
+		// Persist marshalled data.
 		n, mErr := disk.AppendFile(bucket, jsonFile, metadataBytes)
 		if mErr != nil {
 			mErrs[index] = mErr
@@ -259,18 +256,3 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro
 	}
 	return nil
 }
-
-// randErasureDistribution - uses Knuth Fisher-Yates shuffle algorithm.
-func randErasureDistribution(numBlocks int) []int {
-	rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time.
-	distribution := make([]int, numBlocks)
-	for i := 0; i < numBlocks; i++ {
-		distribution[i] = i + 1
-	}
-	for i := 0; i < numBlocks; i++ {
-		// Choose index uniformly in [i, numBlocks-1]
-		r := i + rand.Intn(numBlocks-i)
-		distribution[r], distribution[i] = distribution[i], distribution[r]
-	}
-	return distribution
-}
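
byPartNumber, moved above the xlMetaV1 definition in the hunks just shown, exists so completed parts can be concatenated in numeric order via sort.Sort. A runnable demo with a reduced objectPartInfo:

    package main

    import (
        "fmt"
        "sort"
    )

    // Reduced objectPartInfo: only the fields the sort needs.
    type objectPartInfo struct {
        Number int
        Name   string
    }

    type byPartNumber []objectPartInfo

    func (t byPartNumber) Len() int           { return len(t) }
    func (t byPartNumber) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
    func (t byPartNumber) Less(i, j int) bool { return t[i].Number < t[j].Number }

    func main() {
        // Parts can be uploaded out of order; the object must still be
        // assembled as part 1, 2, 3, ...
        parts := byPartNumber{{3, "part.3"}, {1, "part.1"}, {2, "part.2"}}
        sort.Sort(parts)
        fmt.Println(parts) // [{1 part.1} {2 part.2} {3 part.3}]
    }
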
diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go
index 216a81242..740ed9ddd 100644
--- a/xl-v1-multipart-common.go
+++ b/xl-v1-multipart-common.go
@@ -81,13 +81,13 @@ func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadI
 		// Read `uploads.json` in a routine.
 		go func(index int, disk StorageAPI) {
 			defer wg.Done()
-			var buffer = make([]byte, blockSizeV1) // Allocate blockSized buffer.
-			n, rErr := disk.ReadFile(minioMetaBucket, uploadJSONPath, int64(0), buffer)
+			// Read all of 'uploads.json'
+			buffer, rErr := readAll(disk, minioMetaBucket, uploadJSONPath)
 			if rErr != nil {
 				errs[index] = rErr
 				return
 			}
-			rErr = json.Unmarshal(buffer[:n], &uploads[index])
+			rErr = json.Unmarshal(buffer, &uploads[index])
 			if rErr != nil {
 				errs[index] = rErr
 				return
@@ -331,9 +331,8 @@ func (xl xlObjects) listUploadsInfo(prefixPath string) (uploadsInfo []uploadInfo
 	return uploadsInfo, nil
 }
 
-// listMultipartUploadsCommon - lists all multipart uploads, common
-// function for both object layers.
-func (xl xlObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
+// listMultipartUploads - lists all multipart uploads.
+func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
 	result := ListMultipartsInfo{}
 	// Verify if bucket is valid.
 	if !IsValidBucketName(bucket) {
@@ -409,9 +408,9 @@ func (xl xlObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, upload
 		maxUploads = maxUploads - len(uploads)
 	}
 	if maxUploads > 0 {
-		walker := xl.lookupTreeWalkXL(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
+		walker := xl.lookupTreeWalk(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
 		if walker == nil {
-			walker = xl.startTreeWalkXL(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, xl.isMultipartUpload)
+			walker = xl.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, xl.isMultipartUpload)
 		}
 		for maxUploads > 0 {
 			walkResult, ok := <-walker.ch
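
readUploadsJSON above fans out one goroutine per disk, each writing only its own index of the pre-sized errs and uploads slices, so no mutex is needed around the results. The same shape in a self-contained snippet:

    package main

    import (
        "fmt"
        "sync"
    )

    // statAll runs one goroutine per disk; each goroutine owns exactly
    // one slice index, so the writes never race.
    func statAll(disks []string) []error {
        errs := make([]error, len(disks))
        var wg sync.WaitGroup
        for index, disk := range disks {
            wg.Add(1)
            go func(index int, disk string) {
                defer wg.Done()
                if disk == "" { // Stand-in for a failed ReadFile/StatFile call.
                    errs[index] = fmt.Errorf("disk %d offline", index)
                }
            }(index, disk)
        }
        wg.Wait()
        return errs
    }

    func main() {
        fmt.Println(statAll([]string{"/mnt/disk1", "", "/mnt/disk3"}))
    }
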
diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go
index c8bd6c9f2..96a1a145b 100644
--- a/xl-v1-multipart.go
+++ b/xl-v1-multipart.go
@@ -31,13 +31,11 @@ import (
 
 // ListMultipartUploads - list multipart uploads.
 func (xl xlObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
-	return xl.listMultipartUploadsCommon(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
+	return xl.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
 }
 
-/// Common multipart object layer functions.
-
-// newMultipartUploadCommon - initialize a new multipart, is a common function for both object layers.
-func (xl xlObjects) newMultipartUploadCommon(bucket string, object string, meta map[string]string) (uploadID string, err error) {
+// newMultipartUpload - initialize a new multipart.
+func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[string]string) (uploadID string, err error) {
 	// Verify if bucket name is valid.
 	if !IsValidBucketName(bucket) {
 		return "", BucketNameInvalid{Bucket: bucket}
@@ -96,11 +94,11 @@
 
 // NewMultipartUpload - initialize a new multipart upload, returns a unique id.
 func (xl xlObjects) NewMultipartUpload(bucket, object string, meta map[string]string) (string, error) {
-	return xl.newMultipartUploadCommon(bucket, object, meta)
+	return xl.newMultipartUpload(bucket, object, meta)
 }
 
-// putObjectPartCommon - put object part.
-func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
+// putObjectPart - put object part.
+func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
 	// Verify if bucket is valid.
 	if !IsValidBucketName(bucket) {
 		return "", BucketNameInvalid{Bucket: bucket}
@@ -233,11 +231,11 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
 
 // PutObjectPart - writes the multipart upload chunks.
 func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {
-	return xl.putObjectPartCommon(bucket, object, uploadID, partID, size, data, md5Hex)
+	return xl.putObjectPart(bucket, object, uploadID, partID, size, data, md5Hex)
 }
 
-// ListObjectParts - list object parts, common function across both object layers.
-func (xl xlObjects) listObjectPartsCommon(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
+// listObjectParts - list object parts.
+func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
 	// Verify if bucket is valid.
 	if !IsValidBucketName(bucket) {
 		return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket}
@@ -319,7 +317,7 @@ func (xl xlObjects) listObjectPartsCommon(bucket, object, uploadID string, partN
 
 // ListObjectParts - list object parts.
 func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
-	return xl.listObjectPartsCommon(bucket, object, uploadID, partNumberMarker, maxParts)
+	return xl.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
 }
 
 func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) {
@@ -476,8 +474,8 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 	return s3MD5, nil
 }
 
-// abortMultipartUploadCommon - aborts a multipart upload, common function used by both object layers.
-func (xl xlObjects) abortMultipartUploadCommon(bucket, object, uploadID string) error {
+// abortMultipartUpload - aborts a multipart upload.
+func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) error {
 	// Verify if bucket is valid.
 	if !IsValidBucketName(bucket) {
 		return BucketNameInvalid{Bucket: bucket}
@@ -528,5 +526,5 @@ func (xl xlObjects) abortMultipartUploadCommon(bucket, object, uploadID string)
 
 // AbortMultipartUpload - aborts a multipart upload.
 func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error {
-	return xl.abortMultipartUploadCommon(bucket, object, uploadID)
+	return xl.abortMultipartUpload(bucket, object, uploadID)
 }
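
CompleteMultipartUpload above returns s3MD5 for the assembled object. Its computation is not shown in this diff; the sketch below implements the usual S3 convention, which is assumed here to match minio's helper: MD5 over the concatenated per-part MD5 digests, suffixed with the part count.

    package main

    import (
        "crypto/md5"
        "encoding/hex"
        "fmt"
        "strings"
    )

    // s3MD5 computes the multipart ETag: md5(digest1 || digest2 || ...)-N.
    func s3MD5(partMD5s ...string) (string, error) {
        var buf []byte
        for _, hexMD5 := range partMD5s {
            digest, err := hex.DecodeString(strings.TrimSpace(hexMD5))
            if err != nil {
                return "", err
            }
            buf = append(buf, digest...)
        }
        sum := md5.Sum(buf)
        return fmt.Sprintf("%s-%d", hex.EncodeToString(sum[:]), len(partMD5s)), nil
    }

    func main() {
        // md5("hello") and md5("world") as the two part digests.
        etag, _ := s3MD5("5d41402abc4b2a76b9719d911017c592", "7d793037a0760186574b0282f2f435e7")
        fmt.Println(etag)
    }
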
+ */
+
 package main
 
 import (
-	"path"
-	"sync"
+	"bytes"
+	"io"
+	"math/rand"
+	"time"
 )
 
-// This function does the following check, suppose
-// object is "a/b/c/d", stat makes sure that objects ""a/b/c""
-// "a/b" and "a" do not exist.
-func (xl xlObjects) parentDirIsObject(bucket, parent string) bool {
-	var isParentDirObject func(string) bool
-	isParentDirObject = func(p string) bool {
-		if p == "." {
-			return false
-		}
-		if xl.isObject(bucket, p) {
-			// If there is already a file at prefix "p" return error.
-			return true
-		}
-		// Check if there is a file as one of the parent paths.
-		return isParentDirObject(path.Dir(p))
-	}
-	return isParentDirObject(parent)
-}
-
-func (xl xlObjects) isObject(bucket, prefix string) bool {
-	// Create errs and volInfo slices of storageDisks size.
-	var errs = make([]error, len(xl.storageDisks))
-
-	// Allocate a new waitgroup.
-	var wg = &sync.WaitGroup{}
-	for index, disk := range xl.storageDisks {
-		wg.Add(1)
-		// Stat file on all the disks in a routine.
-		go func(index int, disk StorageAPI) {
-			defer wg.Done()
-			_, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile))
-			if err != nil {
-				errs[index] = err
-				return
-			}
-			errs[index] = nil
-		}(index, disk)
+// randInts - uses Knuth Fisher-Yates shuffle algorithm for generating uniform shuffling.
+func randInts(count int) []int {
+	rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time.
+	ints := make([]int, count)
+	for i := 0; i < count; i++ {
+		ints[i] = i + 1
 	}
-
-	// Wait for all the Stat operations to finish.
-	wg.Wait()
-
-	var errFileNotFoundCount int
-	for _, err := range errs {
-		if err != nil {
-			if err == errFileNotFound {
-				errFileNotFoundCount++
-				// If we have errors with file not found greater than allowed read
-				// quorum we return err as errFileNotFound.
-				if errFileNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
-					return false
-				}
-				continue
-			}
-			errorIf(err, "Unable to access file "+path.Join(bucket, prefix))
-			return false
-		}
+	for i := 0; i < count; i++ {
+		// Choose index uniformly in [i, count-1]
+		r := i + rand.Intn(count-i)
+		ints[r], ints[i] = ints[i], ints[r]
 	}
-	return true
+	return ints
 }
 
-// statPart - stat a part file.
-func (xl xlObjects) statPart(bucket, objectPart string) (fileInfo FileInfo, err error) {
-	// Count for errors encountered.
-	var xlJSONErrCount = 0
-
-	// Return the first success entry based on the selected random disk.
-	for xlJSONErrCount < len(xl.storageDisks) {
-		// Choose a random disk on each attempt.
-		disk := xl.getRandomDisk()
-		fileInfo, err = disk.StatFile(bucket, objectPart)
-		if err == nil {
-			return fileInfo, nil
+// readAll reads from bucket/object until io.EOF and returns all of the data read; any other read error is returned as-is.
+func readAll(disk StorageAPI, bucket, object string) ([]byte, error) {
+	var writer = new(bytes.Buffer)
+	startOffset := int64(0)
+	// Read until io.EOF.
+	for {
+		buf := make([]byte, blockSizeV1)
+		n, err := disk.ReadFile(bucket, object, startOffset, buf)
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return nil, err
 		}
-		xlJSONErrCount++ // Update error count.
+		writer.Write(buf[:n])
+		startOffset += n
 	}
-	return FileInfo{}, err
+	return writer.Bytes(), nil
 }
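
randInts above is a textbook Fisher-Yates shuffle: position i swaps with a uniformly chosen index in [i, count-1], which makes every permutation of 1..count equally likely. The variant below threads an explicit *rand.Rand instead of reseeding the global source, so results are reproducible in a test; reseeding the shared source on every call, as randInts and getRandomDisk do, also serializes concurrent callers on the global generator's lock.

    package main

    import (
        "fmt"
        "math/rand"
    )

    // randInts shuffles 1..count in place with Fisher-Yates.
    func randInts(r *rand.Rand, count int) []int {
        ints := make([]int, count)
        for i := range ints {
            ints[i] = i + 1
        }
        for i := 0; i < count; i++ {
            j := i + r.Intn(count-i) // Uniform in [i, count-1].
            ints[i], ints[j] = ints[j], ints[i]
        }
        return ints
    }

    func main() {
        r := rand.New(rand.NewSource(42)) // Fixed seed: deterministic output.
        fmt.Println(randInts(r, 8))       // A permutation of 1..8, e.g. an erasure distribution.
    }
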
diff --git a/xl-v1.go b/xl-v1.go
index e55e5a48a..1278007f6 100644
--- a/xl-v1.go
+++ b/xl-v1.go
@@ -27,32 +27,38 @@ import (
 	"github.com/minio/minio/pkg/disk"
 )
 
+// XL constants.
 const (
+	// Format config file carries backend format specific details.
 	formatConfigFile = "format.json"
-	xlMetaJSONFile   = "xl.json"
-	uploadsJSONFile  = "uploads.json"
+	// XL metadata file carries per object metadata.
+	xlMetaJSONFile = "xl.json"
+	// Uploads metadata file carries per multipart object metadata.
+	uploadsJSONFile = "uploads.json"
 )
 
-// xlObjects - Implements fs object layer.
+// xlObjects - Implements XL object layer.
 type xlObjects struct {
-	storageDisks  []StorageAPI
-	physicalDisks []string
-	dataBlocks    int
-	parityBlocks  int
-	readQuorum    int
-	writeQuorum   int
+	storageDisks  []StorageAPI // Collection of initialized backend disks.
+	physicalDisks []string     // Collection of regular disks.
+	dataBlocks    int          // dataBlocks count calculated for erasure.
+	parityBlocks  int          // parityBlocks count calculated for erasure.
+	readQuorum    int          // readQuorum minimum required disks to read data.
+	writeQuorum   int          // writeQuorum minimum required disks to write data.
+
+	// List pool management.
 	listObjectMap      map[listParams][]*treeWalker
 	listObjectMapMutex *sync.Mutex
 }
 
-// errMaxDisks - returned for reached maximum of disks.
-var errMaxDisks = errors.New("Number of disks are higher than supported maximum count '16'")
+// errXLMaxDisks - returned for reached maximum of disks.
+var errXLMaxDisks = errors.New("Number of disks is higher than supported maximum count '16'")
 
-// errMinDisks - returned for minimum number of disks.
-var errMinDisks = errors.New("Number of disks are smaller than supported minimum count '8'")
+// errXLMinDisks - returned for minimum number of disks.
+var errXLMinDisks = errors.New("Number of disks is smaller than supported minimum count '8'")
 
-// errNumDisks - returned for odd number of disks.
-var errNumDisks = errors.New("Number of disks should be multiples of '2'")
+// errXLNumDisks - returned for odd number of disks.
+var errXLNumDisks = errors.New("Number of disks should be a multiple of '2'")
 
 const (
 	// Maximum erasure blocks.
@@ -61,14 +67,15 @@ const (
 	minErasureBlocks = 8
 )
 
+// Validate if input disks are sufficient for initializing XL.
 func checkSufficientDisks(disks []string) error {
 	// Verify total number of disks.
 	totalDisks := len(disks)
 	if totalDisks > maxErasureBlocks {
-		return errMaxDisks
+		return errXLMaxDisks
 	}
 	if totalDisks < minErasureBlocks {
-		return errMinDisks
+		return errXLMinDisks
 	}
 
 	// isEven function to verify if a given number is even.
@@ -77,16 +84,16 @@ func checkSufficientDisks(disks []string) error {
 	}
 
 	// Verify if we have even number of disks.
-	// only combination of 8, 10, 12, 14, 16 are supported.
+	// Only combinations of 8, 10, 12, 14 and 16 are supported.
 	if !isEven(totalDisks) {
-		return errNumDisks
+		return errXLNumDisks
 	}
 
 	return nil
 }
 
-// Depending on the disk type network or local, initialize storage layer.
-func newStorageLayer(disk string) (storage StorageAPI, err error) {
+// Depending on the disk type network or local, initialize storage API.
+func newStorageAPI(disk string) (storage StorageAPI, err error) {
 	if !strings.ContainsRune(disk, ':') || filepath.VolumeName(disk) != "" {
 		// Initialize filesystem storage API.
 		return newPosix(disk)
@@ -95,37 +102,27 @@
 	}
 	// Initialize network storage API.
 	return newRPCClient(disk)
 }
 
-// Initialize all storage disks to bootstrap.
-func bootstrapDisks(disks []string) ([]StorageAPI, error) {
-	storageDisks := make([]StorageAPI, len(disks))
-	for index, disk := range disks {
-		var err error
-		// Intentionally ignore disk not found errors while
-		// initializing POSIX, so that we have successfully
-		// initialized posix Storage. Subsequent calls to XL/Erasure
-		// will manage any errors related to disks.
-		storageDisks[index], err = newStorageLayer(disk)
-		if err != nil && err != errDiskNotFound {
-			return nil, err
-		}
-	}
-	return storageDisks, nil
-}
-
 // newXLObjects - initialize new xl object layer.
 func newXLObjects(disks []string) (ObjectLayer, error) {
+	// Validate if input disks are sufficient.
 	if err := checkSufficientDisks(disks); err != nil {
 		return nil, err
 	}
 
 	// Bootstrap disks.
-	storageDisks, err := bootstrapDisks(disks)
-	if err != nil {
-		return nil, err
+	storageDisks := make([]StorageAPI, len(disks))
+	for index, disk := range disks {
+		var err error
+		// Intentionally ignore disk not found errors. XL will
+		// manage such errors internally.
+		storageDisks[index], err = newStorageAPI(disk)
+		if err != nil && err != errDiskNotFound {
+			return nil, err
+		}
 	}
 
-	// Initialize object layer - like creating minioMetaBucket, cleaning up tmp files etc.
-	initObjectLayer(storageDisks...)
+	// Runs house keeping code, like creating minioMetaBucket, cleaning up tmp files etc.
+	xlHouseKeeping(storageDisks)
 
 	// Load saved XL format.json and validate.
 	newPosixDisks, err := loadFormatXL(storageDisks)
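
checkSufficientDisks above enforces an even disk count between 8 and 16, so 8, 10, 12, 14 and 16 all pass. A quick self-contained table-check of those rules:

    package main

    import (
        "errors"
        "fmt"
    )

    var errBadDiskCount = errors.New("disk count must be even and within [8, 16]")

    // checkSufficientDisks condenses the three checks above (max, min,
    // even) into one predicate over the disk count.
    func checkSufficientDisks(totalDisks int) error {
        if totalDisks < 8 || totalDisks > 16 || totalDisks%2 != 0 {
            return errBadDiskCount
        }
        return nil
    }

    func main() {
        for _, n := range []int{4, 8, 9, 10, 16, 18} {
            fmt.Printf("%2d disks: %v\n", n, checkSufficientDisks(n))
        }
    }
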