XL: Bring in support for object versions written during writeQuorum. (#1762)

Erasure is initialized as needed depending on the quorum and onlineDisks.
This way we can manage the quorum at the object layer.
master
Harshavardhana 9 years ago committed by Harshavardhana
parent cae4782973
commit 553fdb9211
  1. 31
      erasure-createfile.go
  2. 16
      erasure-readfile.go
  3. 8
      erasure.go
  4. 180
      xl-v1-healing.go
  5. 97
      xl-v1-metadata.go
  6. 15
      xl-v1-multipart.go
  7. 29
      xl-v1-object.go
  8. 85
      xl-v1-utils.go
  9. 7
      xl-v1.go

@ -40,15 +40,34 @@ func (e erasure) writeErasure(volume, path string, reader *io.PipeReader, wclose
writers := make([]io.WriteCloser, len(e.storageDisks))
var wwg = &sync.WaitGroup{}
var errs = make([]error, len(e.storageDisks))
// Initialize all writers.
for index, disk := range e.storageDisks {
writer, err := disk.CreateFile(volume, path)
if err != nil {
e.cleanupCreateFileOps(volume, path, writers)
reader.CloseWithError(err)
return
if disk == nil {
continue
}
wwg.Add(1)
go func(index int, disk StorageAPI) {
defer wwg.Done()
writer, err := disk.CreateFile(volume, path)
if err != nil {
errs[index] = err
return
}
writers[index] = writer
}(index, disk)
}
wwg.Wait() // Wait for all the create file to finish in parallel.
for _, err := range errs {
if err == nil {
continue
}
writers[index] = writer
e.cleanupCreateFileOps(volume, path, writers)
reader.CloseWithError(err)
return
}
// Allocate 4MiB block size buffer for reading.

@ -33,18 +33,21 @@ func (e erasure) ReadFile(volume, path string, startOffset int64, totalSize int6
}
var rwg = &sync.WaitGroup{}
var errs = make([]error, len(e.storageDisks))
readers := make([]io.ReadCloser, len(e.storageDisks))
for index, disk := range e.storageDisks {
if disk == nil {
continue
}
rwg.Add(1)
go func(index int, disk StorageAPI) {
defer rwg.Done()
// If disk.ReadFile returns error and we don't have read
// quorum it will be taken care as ReedSolomon.Reconstruct()
// will fail later.
offset := int64(0)
if reader, err := disk.ReadFile(volume, path, offset); err == nil {
readers[index] = reader
} else {
errs[index] = err
}
}(index, disk)
}
@ -52,6 +55,13 @@ func (e erasure) ReadFile(volume, path string, startOffset int64, totalSize int6
// Wait for all readers.
rwg.Wait()
// For any errors in reader, we should just error out.
for _, err := range errs {
if err != nil {
return nil, err
}
}
// Initialize pipe.
pipeReader, pipeWriter := io.Pipe()

@ -34,7 +34,7 @@ type erasure struct {
var errUnexpected = errors.New("Unexpected error - please report at https://github.com/minio/minio/issues")
// newErasure instantiate a new erasure.
func newErasure(disks []StorageAPI) (*erasure, error) {
func newErasure(disks []StorageAPI) *erasure {
// Initialize E.
e := &erasure{}
@ -43,9 +43,7 @@ func newErasure(disks []StorageAPI) (*erasure, error) {
// Initialize reed solomon encoding.
rs, err := reedsolomon.New(dataBlocks, parityBlocks)
if err != nil {
return nil, err
}
fatalIf(err, "Unable to initialize reedsolomon package.")
// Save the reedsolomon.
e.DataBlocks = dataBlocks
@ -56,5 +54,5 @@ func newErasure(disks []StorageAPI) (*erasure, error) {
e.storageDisks = disks
// Return successfully initialized.
return e, nil
return e
}

@ -0,0 +1,180 @@
package main
import (
"path"
"sync"
)
// Get the highest integer from a given integer slice.
func highestInt(intSlice []int64) (highestInteger int64) {
highestInteger = int64(1)
for _, integer := range intSlice {
if highestInteger < integer {
highestInteger = integer
}
}
return highestInteger
}
// Extracts objects versions from xlMetaV1 slice and returns version slice.
func listObjectVersions(partsMetadata []xlMetaV1, errs []error) (versions []int64) {
versions = make([]int64, len(partsMetadata))
for index, metadata := range partsMetadata {
if errs[index] == nil {
versions[index] = metadata.Stat.Version
} else {
versions[index] = -1
}
}
return versions
}
// Reads all `xl.json` metadata as a xlMetaV1 slice.
// Returns error slice indicating the failed metadata reads.
func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []error) {
errs := make([]error, len(xl.storageDisks))
metadataArray := make([]xlMetaV1, len(xl.storageDisks))
xlMetaPath := path.Join(object, xlMetaJSONFile)
var wg = &sync.WaitGroup{}
for index, disk := range xl.storageDisks {
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
offset := int64(0)
metadataReader, err := disk.ReadFile(bucket, xlMetaPath, offset)
if err != nil {
errs[index] = err
return
}
defer metadataReader.Close()
_, err = metadataArray[index].ReadFrom(metadataReader)
if err != nil {
// Unable to parse xl.json, set error.
errs[index] = err
return
}
}(index, disk)
}
// Wait for all the routines to finish.
wg.Wait()
// Return all the metadata.
return metadataArray, errs
}
// error based on total errors and read quorum.
func (xl xlObjects) reduceError(errs []error) error {
fileNotFoundCount := 0
longNameCount := 0
diskNotFoundCount := 0
volumeNotFoundCount := 0
diskAccessDeniedCount := 0
for _, err := range errs {
if err == errFileNotFound {
fileNotFoundCount++
} else if err == errFileNameTooLong {
longNameCount++
} else if err == errDiskNotFound {
diskNotFoundCount++
} else if err == errVolumeAccessDenied {
diskAccessDeniedCount++
} else if err == errVolumeNotFound {
volumeNotFoundCount++
}
}
// If we have errors with 'file not found' greater than
// readQuorum, return as errFileNotFound.
// else if we have errors with 'volume not found'
// greater than readQuorum, return as errVolumeNotFound.
if fileNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
return errFileNotFound
} else if longNameCount > len(xl.storageDisks)-xl.readQuorum {
return errFileNameTooLong
} else if volumeNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
return errVolumeNotFound
}
// If we have errors with disk not found equal to the
// number of disks, return as errDiskNotFound.
if diskNotFoundCount == len(xl.storageDisks) {
return errDiskNotFound
} else if diskNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
// If we have errors with 'disk not found'
// greater than readQuorum, return as errFileNotFound.
return errFileNotFound
}
// If we have errors with disk not found equal to the
// number of disks, return as errDiskNotFound.
if diskAccessDeniedCount == len(xl.storageDisks) {
return errVolumeAccessDenied
}
return nil
}
// Similar to 'len(slice)' but returns the actualelements count
// skipping the unallocated elements.
func diskCount(disks []StorageAPI) int {
diskCount := 0
for _, disk := range disks {
if disk == nil {
continue
}
diskCount++
}
return diskCount
}
func (xl xlObjects) shouldHeal(onlineDisks []StorageAPI) (heal bool) {
onlineDiskCount := diskCount(onlineDisks)
// If online disks count is lesser than configured disks, most
// probably we need to heal the file, additionally verify if the
// count is lesser than readQuorum, if not we throw an error.
if onlineDiskCount < len(xl.storageDisks) {
// Online disks lesser than total storage disks, needs to be
// healed. unless we do not have readQuorum.
heal = true
// Verify if online disks count are lesser than readQuorum
// threshold, return an error.
if onlineDiskCount < xl.readQuorum {
errorIf(errReadQuorum, "Unable to establish read quorum, disks are offline.")
return false
}
}
return heal
}
// Returns slice of online disks needed.
// - slice returing readable disks.
// - xlMetaV1
// - bool value indicating if healing is needed.
// - error if any.
func (xl xlObjects) listOnlineDisks(bucket, object string) (onlineDisks []StorageAPI, version int64, err error) {
onlineDisks = make([]StorageAPI, len(xl.storageDisks))
partsMetadata, errs := xl.readAllXLMetadata(bucket, object)
if err = xl.reduceError(errs); err != nil {
if err == errFileNotFound {
// For file not found, treat as if disks are available
// return all the configured ones.
onlineDisks = xl.storageDisks
return onlineDisks, 1, nil
}
return nil, 0, err
}
highestVersion := int64(0)
// List all the file versions from partsMetadata list.
versions := listObjectVersions(partsMetadata, errs)
// Get highest object version.
highestVersion = highestInt(versions)
// Pick online disks with version set to highestVersion.
for index, version := range versions {
if version == highestVersion {
onlineDisks[index] = xl.storageDisks[index]
} else {
onlineDisks[index] = nil
}
}
return onlineDisks, highestVersion, nil
}

@ -147,85 +147,6 @@ func (m xlMetaV1) getPartIndexOffset(offset int64) (partIndex int, partOffset in
return 0, 0, err
}
// This function does the following check, suppose
// object is "a/b/c/d", stat makes sure that objects ""a/b/c""
// "a/b" and "a" do not exist.
func (xl xlObjects) parentDirIsObject(bucket, parent string) bool {
var isParentDirObject func(string) bool
isParentDirObject = func(p string) bool {
if p == "." {
return false
}
if xl.isObject(bucket, p) {
// If there is already a file at prefix "p" return error.
return true
}
// Check if there is a file as one of the parent paths.
return isParentDirObject(path.Dir(p))
}
return isParentDirObject(parent)
}
func (xl xlObjects) isObject(bucket, prefix string) bool {
// Create errs and volInfo slices of storageDisks size.
var errs = make([]error, len(xl.storageDisks))
// Allocate a new waitgroup.
var wg = &sync.WaitGroup{}
for index, disk := range xl.storageDisks {
wg.Add(1)
// Stat file on all the disks in a routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
_, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile))
if err != nil {
errs[index] = err
return
}
errs[index] = nil
}(index, disk)
}
// Wait for all the Stat operations to finish.
wg.Wait()
var errFileNotFoundCount int
for _, err := range errs {
if err != nil {
if err == errFileNotFound {
errFileNotFoundCount++
// If we have errors with file not found greater than allowed read
// quorum we return err as errFileNotFound.
if errFileNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
return false
}
continue
}
errorIf(err, "Unable to access file "+path.Join(bucket, prefix))
return false
}
}
return true
}
// statPart - stat a part file.
func (xl xlObjects) statPart(bucket, objectPart string) (fileInfo FileInfo, err error) {
// Count for errors encountered.
var xlJSONErrCount = 0
// Return the first success entry based on the selected random disk.
for xlJSONErrCount < len(xl.storageDisks) {
// Choose a random disk on each attempt, do not hit the same disk all the time.
disk := xl.getRandomDisk() // Pick a random disk.
fileInfo, err = disk.StatFile(bucket, objectPart)
if err == nil {
return fileInfo, nil
}
xlJSONErrCount++ // Update error count.
}
return FileInfo{}, err
}
// readXLMetadata - read xl metadata.
func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err error) {
// Count for errors encountered.
@ -249,15 +170,6 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err
return xlMetaV1{}, err
}
// getDiskDistribution - get disk distribution.
func (xl xlObjects) getDiskDistribution() []int {
var distribution = make([]int, len(xl.storageDisks))
for index := range xl.storageDisks {
distribution[index] = index + 1
}
return distribution
}
// writeXLJson - write `xl.json` on all disks in order.
func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) error {
var wg = &sync.WaitGroup{}
@ -321,3 +233,12 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro
}
return nil
}
// getDiskDistribution - get disk distribution.
func (xl xlObjects) getDiskDistribution() []int {
var distribution = make([]int, len(xl.storageDisks))
for index := range xl.storageDisks {
distribution[index] = index + 1
}
return distribution
}

@ -69,6 +69,8 @@ func (xl xlObjects) newMultipartUploadCommon(bucket string, object string, meta
}
meta["content-type"] = contentType
}
xlMeta.Stat.ModTime = time.Now().UTC()
xlMeta.Stat.Version = 1
xlMeta.Meta = meta
// This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/"
@ -123,9 +125,19 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
// List all online disks.
onlineDisks, higherVersion, err := xl.listOnlineDisks(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
if err != nil {
return "", toObjectErr(err, bucket, object)
}
if diskCount(onlineDisks) < len(xl.storageDisks) {
higherVersion++
}
erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks
partSuffix := fmt.Sprintf("object%d", partID)
tmpPartPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, partSuffix)
fileWriter, err := xl.erasureDisk.CreateFile(minioMetaBucket, tmpPartPath)
fileWriter, err := erasure.CreateFile(minioMetaBucket, tmpPartPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, tmpPartPath)
}
@ -186,6 +198,7 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
xlMeta.Stat.Version = higherVersion
xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix)

@ -29,7 +29,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
// Lock the object before reading.
nsMutex.RLock(bucket, object)
defer nsMutex.RUnlock(bucket, object)
fileReader, fileWriter := io.Pipe()
// Read metadata associated with the object.
xlMeta, err := xl.readXLMetadata(bucket, object)
@ -37,12 +36,21 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
return nil, toObjectErr(err, bucket, object)
}
// List all online disks.
onlineDisks, _, err := xl.listOnlineDisks(bucket, object)
if err != nil {
return nil, toObjectErr(err, bucket, object)
}
erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks
// Get part index offset.
partIndex, offset, err := xlMeta.getPartIndexOffset(startOffset)
if err != nil {
return nil, toObjectErr(err, bucket, object)
}
fileReader, fileWriter := io.Pipe()
// Hold a read lock once more which can be released after the following go-routine ends.
// We hold RLock once more because the current function would return before the go routine below
// executes and hence releasing the read lock (because of defer'ed nsMutex.RUnlock() call).
@ -51,9 +59,9 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
defer nsMutex.RUnlock(bucket, object)
for ; partIndex < len(xlMeta.Parts); partIndex++ {
part := xlMeta.Parts[partIndex]
r, err := xl.erasureDisk.ReadFile(bucket, pathJoin(object, part.Name), offset, part.Size)
r, err := erasure.ReadFile(bucket, pathJoin(object, part.Name), offset, part.Size)
if err != nil {
fileWriter.CloseWithError(err)
fileWriter.CloseWithError(toObjectErr(err, bucket, object))
return
}
// Reset offset to 0 as it would be non-0 only for the first loop if startOffset is non-0.
@ -65,7 +73,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
case io.ReadCloser:
reader.Close()
}
fileWriter.CloseWithError(err)
fileWriter.CloseWithError(toObjectErr(err, bucket, object))
return
}
// Close the readerCloser that reads multiparts of an object from the xl storage layer.
@ -189,7 +197,17 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
tempErasureObj := path.Join(tmpMetaPrefix, bucket, object, "object1")
tempObj := path.Join(tmpMetaPrefix, bucket, object)
fileWriter, err := xl.erasureDisk.CreateFile(minioMetaBucket, tempErasureObj)
onlineDisks, higherVersion, err := xl.listOnlineDisks(bucket, object)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
if diskCount(onlineDisks) < len(xl.storageDisks) {
// Increment version only if we have online disks less than configured storage disks.
higherVersion++
}
erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks
fileWriter, err := erasure.CreateFile(minioMetaBucket, tempErasureObj)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
@ -276,6 +294,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
xlMeta.Meta = metadata
xlMeta.Stat.Size = size
xlMeta.Stat.ModTime = modTime
xlMeta.Stat.Version = higherVersion
xlMeta.AddObjectPart(1, "object1", newMD5Hex, xlMeta.Stat.Size)
if err = xl.writeXLMetadata(bucket, object, xlMeta); err != nil {
return "", toObjectErr(err, bucket, object)

@ -0,0 +1,85 @@
package main
import (
"path"
"sync"
)
// This function does the following check, suppose
// object is "a/b/c/d", stat makes sure that objects ""a/b/c""
// "a/b" and "a" do not exist.
func (xl xlObjects) parentDirIsObject(bucket, parent string) bool {
var isParentDirObject func(string) bool
isParentDirObject = func(p string) bool {
if p == "." {
return false
}
if xl.isObject(bucket, p) {
// If there is already a file at prefix "p" return error.
return true
}
// Check if there is a file as one of the parent paths.
return isParentDirObject(path.Dir(p))
}
return isParentDirObject(parent)
}
func (xl xlObjects) isObject(bucket, prefix string) bool {
// Create errs and volInfo slices of storageDisks size.
var errs = make([]error, len(xl.storageDisks))
// Allocate a new waitgroup.
var wg = &sync.WaitGroup{}
for index, disk := range xl.storageDisks {
wg.Add(1)
// Stat file on all the disks in a routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
_, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile))
if err != nil {
errs[index] = err
return
}
errs[index] = nil
}(index, disk)
}
// Wait for all the Stat operations to finish.
wg.Wait()
var errFileNotFoundCount int
for _, err := range errs {
if err != nil {
if err == errFileNotFound {
errFileNotFoundCount++
// If we have errors with file not found greater than allowed read
// quorum we return err as errFileNotFound.
if errFileNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
return false
}
continue
}
errorIf(err, "Unable to access file "+path.Join(bucket, prefix))
return false
}
}
return true
}
// statPart - stat a part file.
func (xl xlObjects) statPart(bucket, objectPart string) (fileInfo FileInfo, err error) {
// Count for errors encountered.
var xlJSONErrCount = 0
// Return the first success entry based on the selected random disk.
for xlJSONErrCount < len(xl.storageDisks) {
// Choose a random disk on each attempt, do not hit the same disk all the time.
disk := xl.getRandomDisk() // Pick a random disk.
fileInfo, err = disk.StatFile(bucket, objectPart)
if err == nil {
return fileInfo, nil
}
xlJSONErrCount++ // Update error count.
}
return FileInfo{}, err
}

@ -33,7 +33,6 @@ const (
// xlObjects - Implements fs object layer.
type xlObjects struct {
storageDisks []StorageAPI
erasureDisk *erasure
dataBlocks int
parityBlocks int
readQuorum int
@ -143,17 +142,11 @@ func newXLObjects(disks []string) (ObjectLayer, error) {
// FIXME: healFormatXL(newDisks)
newErasureDisk, err := newErasure(newPosixDisks)
if err != nil {
return nil, err
}
// Calculate data and parity blocks.
dataBlocks, parityBlocks := len(newPosixDisks)/2, len(newPosixDisks)/2
xl := xlObjects{
storageDisks: newPosixDisks,
erasureDisk: newErasureDisk,
dataBlocks: dataBlocks,
parityBlocks: parityBlocks,
listObjectMap: make(map[listParams][]*treeWalker),

Loading…
Cancel
Save