xl: Make namespace locking granular for PutObject (#2199)

master
Krishnan Parthasarathi 9 years ago committed by Harshavardhana
parent 0bd6b67ca5
commit 45240f158d
  1. 5
      .travis.yml
  2. 35
      xl-v1-object.go
  3. 60
      xl-v1-utils.go
  4. 26
      xl-v1-utils_test.go

@ -4,9 +4,10 @@ language: go
os:
- linux
- osx
# Enable OSX builds when travis service is back.
#- osx
osx_image: xcode7.2
#osx_image: xcode7.2
env:
- ARCH=x86_64

@ -68,7 +68,10 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
}
// If all the disks returned error, we return error.
if err := reduceErrs(errs); err != nil {
if errCount, err := reduceErrs(errs); err != nil {
if errCount < xl.readQuorum {
return toObjectErr(errXLReadQuorum, bucket, object)
}
return toObjectErr(err, bucket, object)
}
@ -368,20 +371,25 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
minioMetaTmpBucket := path.Join(minioMetaBucket, tmpMetaPrefix)
tempObj := uniqueID
// Lock the object.
nsMutex.Lock(bucket, object)
defer nsMutex.Unlock(bucket, object)
// Initialize xl meta.
xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks)
nsMutex.RLock(bucket, object)
// Read metadata associated with the object from all disks.
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
nsMutex.RUnlock(bucket, object)
// Do we have write quroum?.
if !isDiskQuorum(errs, xl.writeQuorum) {
return "", toObjectErr(errXLWriteQuorum, bucket, object)
}
// errFileNotFound is handled specially since it's OK for the object to
// not exists in the namespace yet.
if errCount, reducedErr := reduceErrs(errs); reducedErr != nil && reducedErr != errFileNotFound {
if errCount < xl.writeQuorum {
return "", toObjectErr(errXLWriteQuorum, bucket, object)
}
return "", toObjectErr(reducedErr, bucket, object)
}
// List all online disks.
onlineDisks, _ := xl.listOnlineDisks(partsMetadata, errs)
@ -426,6 +434,9 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Tee reader combines incoming data stream and md5, data read from input stream is written to md5.
teeReader := io.TeeReader(limitDataReader, mw)
// Initialize xl meta.
xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks)
// Collect all the previous erasure infos across the disk.
var eInfos []erasureInfo
for range onlineDisks {
@ -483,6 +494,10 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
}
}
// Lock the object.
nsMutex.Lock(bucket, object)
defer nsMutex.Unlock(bucket, object)
// Check if an object is present as one of the parent dir.
// -- FIXME. (needs a new kind of lock).
if xl.parentDirIsObject(bucket, path.Dir(object)) {
@ -513,12 +528,12 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
}
// Write unique `xl.json` for each disk.
if err = writeUniqueXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil {
if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil {
return "", toObjectErr(err, bucket, object)
}
// Rename the successfully written temporary object to final location.
err = renameObject(xl.storageDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum, xl.readQuorum)
err = renameObject(onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum, xl.readQuorum)
if err != nil {
return "", toObjectErr(err, bucket, object)
}

@ -23,57 +23,27 @@ import (
"path"
)
// Returns nil even if one of the slice elements is nil.
// Else returns the error which occurs the most.
func reduceErrs(errs []error) error {
// In case the error type is not in the known error list.
var unknownErr = errors.New("unknown error")
var errTypes = []struct {
err error // error type
count int // occurrence count
}{
// List of known error types. Any new type that can be returned from StorageAPI should
// be added to this list. Most common errors are listed here.
{errDiskNotFound, 0}, {errFaultyDisk, 0}, {errFileAccessDenied, 0},
{errFileNotFound, 0}, {errFileNameTooLong, 0}, {errVolumeNotFound, 0},
{errDiskFull, 0},
// unknownErr count - count of the number of unknown errors.
{unknownErr, 0},
}
// In case unknownErr count occurs maximum number of times, unknownErrType is used to
// to store it so that it can be used for the return error type.
var unknownErrType error
// Returns number of errors that occurred the most (incl. nil) and the
// corresponding error value. N B when there is more than one error value that
// occurs maximum number of times, the error value returned depends on how
// golang's map orders keys. This doesn't affect correctness as long as quorum
// value is greater than or equal to simple majority, since none of the equally
// maximal values would occur quorum or more number of times.
// For each err in []errs increment the corresponding count value.
func reduceErrs(errs []error) (int, error) {
errorCounts := make(map[error]int)
for _, err := range errs {
if err == nil {
// Return nil even if one of the elements is nil.
return nil
}
for i := range errTypes {
if errTypes[i].err == err {
errTypes[i].count++
break
}
if errTypes[i].err == unknownErr {
errTypes[i].count++
unknownErrType = err
break
}
}
errorCounts[err]++
}
max := 0
// Get the error type which has the maximum count.
for i, errType := range errTypes {
if errType.count > errTypes[max].count {
max = i
var errMax error
for err, count := range errorCounts {
if max < count {
max = count
errMax = err
}
}
if errTypes[max].err == unknownErr {
// Return the unknown error.
return unknownErrType
}
return errTypes[max].err
return max, errMax
}
// Validates if we have quorum based on the errors related to disk only.

@ -17,27 +17,25 @@
package main
import "testing"
import "syscall"
// Test for reduceErrs.
func TestReduceErrs(t *testing.T) {
testCases := []struct {
errs []error
err error
errs []error
err error
count int
}{
{[]error{errDiskNotFound, errDiskNotFound, errDiskFull}, errDiskNotFound},
{[]error{errDiskNotFound, errDiskFull, errDiskFull}, errDiskFull},
{[]error{errDiskFull, errDiskNotFound, errDiskFull}, errDiskFull},
// A case for error not in the known errors list (refer to func reduceErrs)
{[]error{errDiskFull, syscall.EEXIST, syscall.EEXIST}, syscall.EEXIST},
{[]error{errDiskNotFound, errDiskNotFound, nil}, nil},
{[]error{nil, nil, nil}, nil},
{[]error{errDiskNotFound, errDiskNotFound, errDiskFull}, errDiskNotFound, 2},
{[]error{errDiskFull, errDiskNotFound, nil, nil}, nil, 2},
{[]error{}, nil, 0},
}
for i, testCase := range testCases {
expected := testCase.err
got := reduceErrs(testCase.errs)
if expected != got {
t.Errorf("Test %d : expected %s, got %s", i+1, expected, got)
gotMax, gotErr := reduceErrs(testCase.errs)
if testCase.err != gotErr {
t.Errorf("Test %d : expected %s, got %s", i+1, testCase.err, gotErr)
}
if testCase.count != gotMax {
t.Errorf("Test %d : expected %d, got %d", i+1, testCase.count, gotMax)
}
}
}

Loading…
Cancel
Save