From 553fdb921123889b2abf5037973a3ec99945979c Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Wed, 25 May 2016 16:42:31 -0700
Subject: [PATCH] XL: Bring in support for object versions written during
 writeQuorum. (#1762)

Erasure is initialized as needed depending on the quorum and onlineDisks.
This way we can manage the quorum at the object layer.
---
 erasure-createfile.go |  31 ++++++--
 erasure-readfile.go   |  16 +++-
 erasure.go            |   8 +-
 xl-v1-healing.go      | 180 ++++++++++++++++++++++++++++++++++++++++++
 xl-v1-metadata.go     |  97 +++-------------------
 xl-v1-multipart.go    |  15 +++-
 xl-v1-object.go       |  29 +++++--
 xl-v1-utils.go        |  85 ++++++++++++++++++++
 xl-v1.go              |   7 --
 9 files changed, 353 insertions(+), 115 deletions(-)
 create mode 100644 xl-v1-healing.go
 create mode 100644 xl-v1-utils.go

diff --git a/erasure-createfile.go b/erasure-createfile.go
index 007831984..3321c6fe6 100644
--- a/erasure-createfile.go
+++ b/erasure-createfile.go
@@ -40,15 +40,34 @@ func (e erasure) writeErasure(volume, path string, reader *io.PipeReader, wclose
 	writers := make([]io.WriteCloser, len(e.storageDisks))
 
+	var wwg = &sync.WaitGroup{}
+	var errs = make([]error, len(e.storageDisks))
+
 	// Initialize all writers.
 	for index, disk := range e.storageDisks {
-		writer, err := disk.CreateFile(volume, path)
-		if err != nil {
-			e.cleanupCreateFileOps(volume, path, writers)
-			reader.CloseWithError(err)
-			return
+		if disk == nil {
+			continue
+		}
+		wwg.Add(1)
+		go func(index int, disk StorageAPI) {
+			defer wwg.Done()
+			writer, err := disk.CreateFile(volume, path)
+			if err != nil {
+				errs[index] = err
+				return
+			}
+			writers[index] = writer
+		}(index, disk)
+	}
+
+	wwg.Wait() // Wait for all the parallel CreateFile operations to finish.
+	for _, err := range errs {
+		if err == nil {
+			continue
 		}
-		writers[index] = writer
+		e.cleanupCreateFileOps(volume, path, writers)
+		reader.CloseWithError(err)
+		return
 	}
 
 	// Allocate 4MiB block size buffer for reading.
diff --git a/erasure-readfile.go b/erasure-readfile.go
index 690a88886..0e247082d 100644
--- a/erasure-readfile.go
+++ b/erasure-readfile.go
@@ -33,18 +33,21 @@ func (e erasure) ReadFile(volume, path string, startOffset int64, totalSize int6
 	}
 
 	var rwg = &sync.WaitGroup{}
+	var errs = make([]error, len(e.storageDisks))
 	readers := make([]io.ReadCloser, len(e.storageDisks))
 	for index, disk := range e.storageDisks {
+		if disk == nil {
+			continue
+		}
 		rwg.Add(1)
 		go func(index int, disk StorageAPI) {
 			defer rwg.Done()
-			// If disk.ReadFile returns error and we don't have read
-			// quorum it will be taken care as ReedSolomon.Reconstruct()
-			// will fail later.
 			offset := int64(0)
 			if reader, err := disk.ReadFile(volume, path, offset); err == nil {
 				readers[index] = reader
+			} else {
+				errs[index] = err
 			}
 		}(index, disk)
 	}
@@ -52,6 +55,13 @@ func (e erasure) ReadFile(volume, path string, startOffset int64, totalSize int6
 	// Wait for all readers.
 	rwg.Wait()
 
+	// Error out if any of the readers failed.
+	for _, err := range errs {
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	// Initialize pipe.
 	pipeReader, pipeWriter := io.Pipe()
 
diff --git a/erasure.go b/erasure.go
index 45d121d2f..1eb04b807 100644
--- a/erasure.go
+++ b/erasure.go
@@ -34,7 +34,7 @@ type erasure struct {
 var errUnexpected = errors.New("Unexpected error - please report at https://github.com/minio/minio/issues")
 
 // newErasure instantiate a new erasure.
-func newErasure(disks []StorageAPI) (*erasure, error) {
+func newErasure(disks []StorageAPI) *erasure {
 	// Initialize E.
 	e := &erasure{}
 
@@ -43,9 +43,7 @@ func newErasure(disks []StorageAPI) (*erasure, error) {
 	// Initialize reed solomon encoding.
 	rs, err := reedsolomon.New(dataBlocks, parityBlocks)
-	if err != nil {
-		return nil, err
-	}
+	fatalIf(err, "Unable to initialize reedsolomon package.")
 
 	// Save the reedsolomon.
 	e.DataBlocks = dataBlocks
@@ -56,5 +54,5 @@ func newErasure(disks []StorageAPI) (*erasure, error) {
 	e.storageDisks = disks
 
 	// Return successfully initialized.
-	return e, nil
+	return e
 }
diff --git a/xl-v1-healing.go b/xl-v1-healing.go
new file mode 100644
index 000000000..a627baad9
--- /dev/null
+++ b/xl-v1-healing.go
@@ -0,0 +1,180 @@
+package main
+
+import (
+	"path"
+	"sync"
+)
+
+// Get the highest integer from a given integer slice.
+func highestInt(intSlice []int64) (highestInteger int64) {
+	highestInteger = int64(1)
+	for _, integer := range intSlice {
+		if highestInteger < integer {
+			highestInteger = integer
+		}
+	}
+	return highestInteger
+}
+
+// Extracts object versions from an xlMetaV1 slice and returns a version slice.
+func listObjectVersions(partsMetadata []xlMetaV1, errs []error) (versions []int64) {
+	versions = make([]int64, len(partsMetadata))
+	for index, metadata := range partsMetadata {
+		if errs[index] == nil {
+			versions[index] = metadata.Stat.Version
+		} else {
+			versions[index] = -1
+		}
+	}
+	return versions
+}
+
+// Reads all `xl.json` metadata as an xlMetaV1 slice.
+// Returns error slice indicating the failed metadata reads.
+func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []error) {
+	errs := make([]error, len(xl.storageDisks))
+	metadataArray := make([]xlMetaV1, len(xl.storageDisks))
+	xlMetaPath := path.Join(object, xlMetaJSONFile)
+	var wg = &sync.WaitGroup{}
+	for index, disk := range xl.storageDisks {
+		wg.Add(1)
+		go func(index int, disk StorageAPI) {
+			defer wg.Done()
+			offset := int64(0)
+			metadataReader, err := disk.ReadFile(bucket, xlMetaPath, offset)
+			if err != nil {
+				errs[index] = err
+				return
+			}
+			defer metadataReader.Close()
+
+			_, err = metadataArray[index].ReadFrom(metadataReader)
+			if err != nil {
+				// Unable to parse xl.json, set error.
+				errs[index] = err
+				return
+			}
+		}(index, disk)
+	}
+
+	// Wait for all the routines to finish.
+	wg.Wait()
+
+	// Return all the metadata.
+	return metadataArray, errs
+}
+
+// reduceError - reduces a slice of errors into a single representative
+// error based on total errors and read quorum.
+func (xl xlObjects) reduceError(errs []error) error {
+	fileNotFoundCount := 0
+	longNameCount := 0
+	diskNotFoundCount := 0
+	volumeNotFoundCount := 0
+	diskAccessDeniedCount := 0
+	for _, err := range errs {
+		if err == errFileNotFound {
+			fileNotFoundCount++
+		} else if err == errFileNameTooLong {
+			longNameCount++
+		} else if err == errDiskNotFound {
+			diskNotFoundCount++
+		} else if err == errVolumeAccessDenied {
+			diskAccessDeniedCount++
+		} else if err == errVolumeNotFound {
+			volumeNotFoundCount++
+		}
+	}
+	// If we have errors with 'file not found' greater than
+	// readQuorum, return as errFileNotFound.
+	// else if we have errors with 'volume not found'
+	// greater than readQuorum, return as errVolumeNotFound.
+	if fileNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
+		return errFileNotFound
+	} else if longNameCount > len(xl.storageDisks)-xl.readQuorum {
+		return errFileNameTooLong
+	} else if volumeNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
+		return errVolumeNotFound
+	}
+	// If we have errors with disk not found equal to the
+	// number of disks, return as errDiskNotFound.
+	if diskNotFoundCount == len(xl.storageDisks) {
+		return errDiskNotFound
+	} else if diskNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
+		// If we have errors with 'disk not found'
+		// greater than readQuorum, return as errFileNotFound.
+		return errFileNotFound
+	}
+	// If we have errors with volume access denied equal to the
+	// number of disks, return as errVolumeAccessDenied.
+	if diskAccessDeniedCount == len(xl.storageDisks) {
+		return errVolumeAccessDenied
+	}
+	return nil
+}
+
+// Similar to 'len(slice)' but returns the actual element count,
+// skipping the unallocated elements.
+func diskCount(disks []StorageAPI) int {
+	diskCount := 0
+	for _, disk := range disks {
+		if disk == nil {
+			continue
+		}
+		diskCount++
+	}
+	return diskCount
+}
+
+func (xl xlObjects) shouldHeal(onlineDisks []StorageAPI) (heal bool) {
+	onlineDiskCount := diskCount(onlineDisks)
+	// If the online disk count is less than the configured disk count, we
+	// most probably need to heal the file. Additionally verify that the
+	// count is not below readQuorum; if it is, we throw an error.
+	if onlineDiskCount < len(xl.storageDisks) {
+		// Fewer online disks than total storage disks, the file needs
+		// to be healed, unless we do not have readQuorum.
+		heal = true
+		// If the online disk count is below the readQuorum
+		// threshold, return an error.
+		if onlineDiskCount < xl.readQuorum {
+			errorIf(errReadQuorum, "Unable to establish read quorum, disks are offline.")
+			return false
+		}
+	}
+	return heal
+}
+
+// Returns the slice of online disks needed:
+// - slice of online (readable) disks.
+// - highest object version found on the disks.
+// - error if any.
+func (xl xlObjects) listOnlineDisks(bucket, object string) (onlineDisks []StorageAPI, version int64, err error) {
+	onlineDisks = make([]StorageAPI, len(xl.storageDisks))
+	partsMetadata, errs := xl.readAllXLMetadata(bucket, object)
+	if err = xl.reduceError(errs); err != nil {
+		if err == errFileNotFound {
+			// For file not found, treat as if disks are available and
+			// return all the configured ones.
+			onlineDisks = xl.storageDisks
+			return onlineDisks, 1, nil
+		}
+		return nil, 0, err
+	}
+	highestVersion := int64(0)
+	// List all the file versions from partsMetadata list.
+	versions := listObjectVersions(partsMetadata, errs)
+
+	// Get highest object version.
+	highestVersion = highestInt(versions)
+
+	// Pick online disks with version set to highestVersion.
+	for index, version := range versions {
+		if version == highestVersion {
+			onlineDisks[index] = xl.storageDisks[index]
+		} else {
+			onlineDisks[index] = nil
+		}
+	}
+	return onlineDisks, highestVersion, nil
+}
diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go
index 5d87da137..65c447993 100644
--- a/xl-v1-metadata.go
+++ b/xl-v1-metadata.go
@@ -147,85 +147,6 @@ func (m xlMetaV1) getPartIndexOffset(offset int64) (partIndex int, partOffset in
 	return 0, 0, err
 }
 
-// This function does the following check, suppose
-// object is "a/b/c/d", stat makes sure that objects ""a/b/c""
-// "a/b" and "a" do not exist.
-func (xl xlObjects) parentDirIsObject(bucket, parent string) bool {
-	var isParentDirObject func(string) bool
-	isParentDirObject = func(p string) bool {
-		if p == "." {
-			return false
-		}
-		if xl.isObject(bucket, p) {
-			// If there is already a file at prefix "p" return error.
-			return true
-		}
-		// Check if there is a file as one of the parent paths.
-		return isParentDirObject(path.Dir(p))
-	}
-	return isParentDirObject(parent)
-}
-
-func (xl xlObjects) isObject(bucket, prefix string) bool {
-	// Create errs and volInfo slices of storageDisks size.
-	var errs = make([]error, len(xl.storageDisks))
-
-	// Allocate a new waitgroup.
-	var wg = &sync.WaitGroup{}
-	for index, disk := range xl.storageDisks {
-		wg.Add(1)
-		// Stat file on all the disks in a routine.
-		go func(index int, disk StorageAPI) {
-			defer wg.Done()
-			_, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile))
-			if err != nil {
-				errs[index] = err
-				return
-			}
-			errs[index] = nil
-		}(index, disk)
-	}
-
-	// Wait for all the Stat operations to finish.
-	wg.Wait()
-
-	var errFileNotFoundCount int
-	for _, err := range errs {
-		if err != nil {
-			if err == errFileNotFound {
-				errFileNotFoundCount++
-				// If we have errors with file not found greater than allowed read
-				// quorum we return err as errFileNotFound.
-				if errFileNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
-					return false
-				}
-				continue
-			}
-			errorIf(err, "Unable to access file "+path.Join(bucket, prefix))
-			return false
-		}
-	}
-	return true
-}
-
-// statPart - stat a part file.
-func (xl xlObjects) statPart(bucket, objectPart string) (fileInfo FileInfo, err error) {
-	// Count for errors encountered.
-	var xlJSONErrCount = 0
-
-	// Return the first success entry based on the selected random disk.
-	for xlJSONErrCount < len(xl.storageDisks) {
-		// Choose a random disk on each attempt, do not hit the same disk all the time.
-		disk := xl.getRandomDisk() // Pick a random disk.
-		fileInfo, err = disk.StatFile(bucket, objectPart)
-		if err == nil {
-			return fileInfo, nil
-		}
-		xlJSONErrCount++ // Update error count.
-	}
-	return FileInfo{}, err
-}
-
 // readXLMetadata - read xl metadata.
 func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err error) {
 	// Count for errors encountered.
@@ -249,15 +170,6 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
 	return xlMetaV1{}, err
 }
 
-// getDiskDistribution - get disk distribution.
-func (xl xlObjects) getDiskDistribution() []int {
-	var distribution = make([]int, len(xl.storageDisks))
-	for index := range xl.storageDisks {
-		distribution[index] = index + 1
-	}
-	return distribution
-}
-
 // writeXLJson - write `xl.json` on all disks in order.
 func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) error {
 	var wg = &sync.WaitGroup{}
@@ -321,3 +233,12 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro
 	}
 	return nil
 }
+
+// getDiskDistribution - get disk distribution.
+func (xl xlObjects) getDiskDistribution() []int {
+	var distribution = make([]int, len(xl.storageDisks))
+	for index := range xl.storageDisks {
+		distribution[index] = index + 1
+	}
+	return distribution
+}
diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go
index 394957acc..380da085f 100644
--- a/xl-v1-multipart.go
+++ b/xl-v1-multipart.go
@@ -69,6 +69,8 @@ func (xl xlObjects) newMultipartUploadCommon(bucket string, object string, meta
 		}
 		meta["content-type"] = contentType
 	}
+	xlMeta.Stat.ModTime = time.Now().UTC()
+	xlMeta.Stat.Version = 1
 	xlMeta.Meta = meta
 
 	// This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/"
@@ -123,9 +125,19 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
 	nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
 	defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
 
+	// List all online disks.
+	onlineDisks, higherVersion, err := xl.listOnlineDisks(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
+	if err != nil {
+		return "", toObjectErr(err, bucket, object)
+	}
+	if diskCount(onlineDisks) < len(xl.storageDisks) {
+		higherVersion++
+	}
+	erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks.
+
 	partSuffix := fmt.Sprintf("object%d", partID)
 	tmpPartPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, partSuffix)
-	fileWriter, err := xl.erasureDisk.CreateFile(minioMetaBucket, tmpPartPath)
+	fileWriter, err := erasure.CreateFile(minioMetaBucket, tmpPartPath)
 	if err != nil {
 		return "", toObjectErr(err, minioMetaBucket, tmpPartPath)
 	}
@@ -186,6 +198,7 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
 	if err != nil {
 		return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
 	}
+	xlMeta.Stat.Version = higherVersion
 	xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
 
 	partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)
diff --git a/xl-v1-object.go b/xl-v1-object.go
index 6f10ad2f3..bb08bee69 100644
--- a/xl-v1-object.go
+++ b/xl-v1-object.go
@@ -29,7 +29,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
 	// Lock the object before reading.
 	nsMutex.RLock(bucket, object)
 	defer nsMutex.RUnlock(bucket, object)
-	fileReader, fileWriter := io.Pipe()
 
 	// Read metadata associated with the object.
 	xlMeta, err := xl.readXLMetadata(bucket, object)
@@ -37,12 +36,21 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
 		return nil, toObjectErr(err, bucket, object)
 	}
 
+	// List all online disks.
+	onlineDisks, _, err := xl.listOnlineDisks(bucket, object)
+	if err != nil {
+		return nil, toObjectErr(err, bucket, object)
+	}
+	erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks.
+
 	// Get part index offset.
 	partIndex, offset, err := xlMeta.getPartIndexOffset(startOffset)
 	if err != nil {
 		return nil, toObjectErr(err, bucket, object)
 	}
 
+	fileReader, fileWriter := io.Pipe()
+
 	// Hold a read lock once more which can be released after the following go-routine ends.
 	// We hold RLock once more because the current function would return before the go routine below
 	// executes and hence releasing the read lock (because of defer'ed nsMutex.RUnlock() call).
@@ -51,9 +59,9 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
 		defer nsMutex.RUnlock(bucket, object)
 		for ; partIndex < len(xlMeta.Parts); partIndex++ {
 			part := xlMeta.Parts[partIndex]
-			r, err := xl.erasureDisk.ReadFile(bucket, pathJoin(object, part.Name), offset, part.Size)
+			r, err := erasure.ReadFile(bucket, pathJoin(object, part.Name), offset, part.Size)
 			if err != nil {
-				fileWriter.CloseWithError(err)
+				fileWriter.CloseWithError(toObjectErr(err, bucket, object))
 				return
 			}
 			// Reset offset to 0 as it would be non-0 only for the first loop if startOffset is non-0.
@@ -65,7 +73,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
 			case io.ReadCloser:
 				reader.Close()
 			}
-			fileWriter.CloseWithError(err)
+			fileWriter.CloseWithError(toObjectErr(err, bucket, object))
 			return
 		}
 		// Close the readerCloser that reads multiparts of an object from the xl storage layer.
@@ -189,7 +197,17 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 
 	tempErasureObj := path.Join(tmpMetaPrefix, bucket, object, "object1")
 	tempObj := path.Join(tmpMetaPrefix, bucket, object)
-	fileWriter, err := xl.erasureDisk.CreateFile(minioMetaBucket, tempErasureObj)
+
+	onlineDisks, higherVersion, err := xl.listOnlineDisks(bucket, object)
+	if err != nil {
+		return "", toObjectErr(err, bucket, object)
+	}
+	if diskCount(onlineDisks) < len(xl.storageDisks) {
+		// Increment version only if we have online disks less than configured storage disks.
+		higherVersion++
+	}
+	erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks.
+	fileWriter, err := erasure.CreateFile(minioMetaBucket, tempErasureObj)
 	if err != nil {
 		return "", toObjectErr(err, bucket, object)
 	}
@@ -276,6 +294,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 	xlMeta.Meta = metadata
 	xlMeta.Stat.Size = size
 	xlMeta.Stat.ModTime = modTime
+	xlMeta.Stat.Version = higherVersion
 	xlMeta.AddObjectPart(1, "object1", newMD5Hex, xlMeta.Stat.Size)
 	if err = xl.writeXLMetadata(bucket, object, xlMeta); err != nil {
 		return "", toObjectErr(err, bucket, object)
diff --git a/xl-v1-utils.go b/xl-v1-utils.go
new file mode 100644
index 000000000..bb1edcb0f
--- /dev/null
+++ b/xl-v1-utils.go
@@ -0,0 +1,85 @@
+package main
+
+import (
+	"path"
+	"sync"
+)
+
+// This function does the following check: suppose the
+// object is "a/b/c/d", stat makes sure that objects "a/b/c",
+// "a/b" and "a" do not exist.
+func (xl xlObjects) parentDirIsObject(bucket, parent string) bool {
+	var isParentDirObject func(string) bool
+	isParentDirObject = func(p string) bool {
+		if p == "." {
+			return false
+		}
+		if xl.isObject(bucket, p) {
+			// If there is already a file at prefix "p", return true.
+			return true
+		}
+		// Check if there is a file as one of the parent paths.
+		return isParentDirObject(path.Dir(p))
+	}
+	return isParentDirObject(parent)
+}
+
+func (xl xlObjects) isObject(bucket, prefix string) bool {
+	// Create an errs slice of storageDisks size.
+	var errs = make([]error, len(xl.storageDisks))
+
+	// Allocate a new waitgroup.
+	var wg = &sync.WaitGroup{}
+	for index, disk := range xl.storageDisks {
+		wg.Add(1)
+		// Stat file on all the disks in a routine.
+		go func(index int, disk StorageAPI) {
+			defer wg.Done()
+			_, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile))
+			if err != nil {
+				errs[index] = err
+				return
+			}
+			errs[index] = nil
+		}(index, disk)
+	}
+
+	// Wait for all the Stat operations to finish.
+	wg.Wait()
+
+	var errFileNotFoundCount int
+	for _, err := range errs {
+		if err != nil {
+			if err == errFileNotFound {
+				errFileNotFoundCount++
+				// If we have more 'file not found' errors than the allowed
+				// read quorum, we return errFileNotFound.
+				if errFileNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
+					return false
+				}
+				continue
+			}
+			errorIf(err, "Unable to access file "+path.Join(bucket, prefix))
+			return false
+		}
+	}
+	return true
+}
+
+// statPart - stat a part file.
+func (xl xlObjects) statPart(bucket, objectPart string) (fileInfo FileInfo, err error) {
+	// Count for errors encountered.
+	var xlJSONErrCount = 0
+
+	// Return the first successful entry from a randomly selected disk.
+	for xlJSONErrCount < len(xl.storageDisks) {
+		// Choose a random disk on each attempt, do not hit the same disk all the time.
+		disk := xl.getRandomDisk() // Pick a random disk.
+		fileInfo, err = disk.StatFile(bucket, objectPart)
+		if err == nil {
+			return fileInfo, nil
+		}
+		xlJSONErrCount++ // Update error count.
+	}
+	return FileInfo{}, err
+}
diff --git a/xl-v1.go b/xl-v1.go
index 4475c5642..7d120bef4 100644
--- a/xl-v1.go
+++ b/xl-v1.go
@@ -33,7 +33,6 @@ const (
 // xlObjects - Implements fs object layer.
 type xlObjects struct {
 	storageDisks []StorageAPI
-	erasureDisk  *erasure
 	dataBlocks   int
 	parityBlocks int
 	readQuorum   int
@@ -143,17 +142,11 @@ func newXLObjects(disks []string) (ObjectLayer, error) {
 	// FIXME: healFormatXL(newDisks)
 
-	newErasureDisk, err := newErasure(newPosixDisks)
-	if err != nil {
-		return nil, err
-	}
-
 	// Calculate data and parity blocks.
 	dataBlocks, parityBlocks := len(newPosixDisks)/2, len(newPosixDisks)/2
 
 	xl := xlObjects{
 		storageDisks: newPosixDisks,
-		erasureDisk:  newErasureDisk,
 		dataBlocks:   dataBlocks,
 		parityBlocks: parityBlocks,
 		listObjectMap: make(map[listParams][]*treeWalker),
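
Note: the version-selection idea this patch introduces in listOnlineDisks can be
read in isolation. Below is a minimal, self-contained Go sketch of that technique
(pickLatestDisks and the sample version values are illustrative only, not part of
this patch): read the Stat.Version from every disk's xl.json, keep only the disks
carrying the highest version, and bump the version when a write happens with fewer
online disks than configured, mirroring the higherVersion++ logic in PutObject and
putObjectPartCommon.

package main

import "fmt"

// pickLatestDisks marks the disks that carry the highest object version.
// versions holds the Stat.Version read from each disk's xl.json; -1 marks
// a disk whose metadata could not be read (offline or corrupt).
func pickLatestDisks(versions []int64) (online []bool, highest int64) {
	highest = int64(1)
	for _, v := range versions {
		if v > highest {
			highest = v
		}
	}
	online = make([]bool, len(versions))
	for i, v := range versions {
		online[i] = v == highest
	}
	return online, highest
}

func main() {
	// Disk 2 missed the last write (stale version 2), disk 3 is unreadable.
	versions := []int64{3, 3, 2, -1}
	online, highest := pickLatestDisks(versions)
	fmt.Println(online, highest) // [true true false false] 3

	// A writer that sees fewer online disks than configured bumps the
	// version, so stale disks are detectable on the next read.
	onlineCount := 0
	for _, ok := range online {
		if ok {
			onlineCount++
		}
	}
	nextVersion := highest
	if onlineCount < len(versions) {
		nextVersion++
	}
	fmt.Println(nextVersion) // 4
}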