From 45240f158d6769644203290cc409aa307f1e74b2 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Wed, 13 Jul 2016 11:56:25 -0700 Subject: [PATCH] xl: Make namespace locking granular for PutObject (#2199) --- .travis.yml | 5 ++-- xl-v1-object.go | 35 ++++++++++++++++++-------- xl-v1-utils.go | 60 ++++++++++++--------------------------------- xl-v1-utils_test.go | 26 +++++++++----------- 4 files changed, 55 insertions(+), 71 deletions(-) diff --git a/.travis.yml b/.travis.yml index 9cab4601a..5fe785c99 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,9 +4,10 @@ language: go os: - linux -- osx +# Enable OSX builds when travis service is back. +#- osx -osx_image: xcode7.2 +#osx_image: xcode7.2 env: - ARCH=x86_64 diff --git a/xl-v1-object.go b/xl-v1-object.go index cdde92308..28c36fc60 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -68,7 +68,10 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i } // If all the disks returned error, we return error. - if err := reduceErrs(errs); err != nil { + if errCount, err := reduceErrs(errs); err != nil { + if errCount < xl.readQuorum { + return toObjectErr(errXLReadQuorum, bucket, object) + } return toObjectErr(err, bucket, object) } @@ -368,20 +371,25 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. minioMetaTmpBucket := path.Join(minioMetaBucket, tmpMetaPrefix) tempObj := uniqueID - // Lock the object. - nsMutex.Lock(bucket, object) - defer nsMutex.Unlock(bucket, object) - - // Initialize xl meta. - xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks) - + nsMutex.RLock(bucket, object) // Read metadata associated with the object from all disks. partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object) + nsMutex.RUnlock(bucket, object) + // Do we have write quroum?. if !isDiskQuorum(errs, xl.writeQuorum) { return "", toObjectErr(errXLWriteQuorum, bucket, object) } + // errFileNotFound is handled specially since it's OK for the object to + // not exists in the namespace yet. + if errCount, reducedErr := reduceErrs(errs); reducedErr != nil && reducedErr != errFileNotFound { + if errCount < xl.writeQuorum { + return "", toObjectErr(errXLWriteQuorum, bucket, object) + } + return "", toObjectErr(reducedErr, bucket, object) + } + // List all online disks. onlineDisks, _ := xl.listOnlineDisks(partsMetadata, errs) @@ -426,6 +434,9 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // Tee reader combines incoming data stream and md5, data read from input stream is written to md5. teeReader := io.TeeReader(limitDataReader, mw) + // Initialize xl meta. + xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks) + // Collect all the previous erasure infos across the disk. var eInfos []erasureInfo for range onlineDisks { @@ -483,6 +494,10 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. } } + // Lock the object. + nsMutex.Lock(bucket, object) + defer nsMutex.Unlock(bucket, object) + // Check if an object is present as one of the parent dir. // -- FIXME. (needs a new kind of lock). if xl.parentDirIsObject(bucket, path.Dir(object)) { @@ -513,12 +528,12 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. } // Write unique `xl.json` for each disk. - if err = writeUniqueXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil { + if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil { return "", toObjectErr(err, bucket, object) } // Rename the successfully written temporary object to final location. - err = renameObject(xl.storageDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum, xl.readQuorum) + err = renameObject(onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum, xl.readQuorum) if err != nil { return "", toObjectErr(err, bucket, object) } diff --git a/xl-v1-utils.go b/xl-v1-utils.go index a879281f0..23ca47aa1 100644 --- a/xl-v1-utils.go +++ b/xl-v1-utils.go @@ -23,57 +23,27 @@ import ( "path" ) -// Returns nil even if one of the slice elements is nil. -// Else returns the error which occurs the most. -func reduceErrs(errs []error) error { - // In case the error type is not in the known error list. - var unknownErr = errors.New("unknown error") - var errTypes = []struct { - err error // error type - count int // occurrence count - }{ - // List of known error types. Any new type that can be returned from StorageAPI should - // be added to this list. Most common errors are listed here. - {errDiskNotFound, 0}, {errFaultyDisk, 0}, {errFileAccessDenied, 0}, - {errFileNotFound, 0}, {errFileNameTooLong, 0}, {errVolumeNotFound, 0}, - {errDiskFull, 0}, - // unknownErr count - count of the number of unknown errors. - {unknownErr, 0}, - } - // In case unknownErr count occurs maximum number of times, unknownErrType is used to - // to store it so that it can be used for the return error type. - var unknownErrType error +// Returns number of errors that occurred the most (incl. nil) and the +// corresponding error value. N B when there is more than one error value that +// occurs maximum number of times, the error value returned depends on how +// golang's map orders keys. This doesn't affect correctness as long as quorum +// value is greater than or equal to simple majority, since none of the equally +// maximal values would occur quorum or more number of times. - // For each err in []errs increment the corresponding count value. +func reduceErrs(errs []error) (int, error) { + errorCounts := make(map[error]int) for _, err := range errs { - if err == nil { - // Return nil even if one of the elements is nil. - return nil - } - for i := range errTypes { - if errTypes[i].err == err { - errTypes[i].count++ - break - } - if errTypes[i].err == unknownErr { - errTypes[i].count++ - unknownErrType = err - break - } - } + errorCounts[err]++ } max := 0 - // Get the error type which has the maximum count. - for i, errType := range errTypes { - if errType.count > errTypes[max].count { - max = i + var errMax error + for err, count := range errorCounts { + if max < count { + max = count + errMax = err } } - if errTypes[max].err == unknownErr { - // Return the unknown error. - return unknownErrType - } - return errTypes[max].err + return max, errMax } // Validates if we have quorum based on the errors related to disk only. diff --git a/xl-v1-utils_test.go b/xl-v1-utils_test.go index 7745c5eb5..beb3097e7 100644 --- a/xl-v1-utils_test.go +++ b/xl-v1-utils_test.go @@ -17,27 +17,25 @@ package main import "testing" -import "syscall" // Test for reduceErrs. func TestReduceErrs(t *testing.T) { testCases := []struct { - errs []error - err error + errs []error + err error + count int }{ - {[]error{errDiskNotFound, errDiskNotFound, errDiskFull}, errDiskNotFound}, - {[]error{errDiskNotFound, errDiskFull, errDiskFull}, errDiskFull}, - {[]error{errDiskFull, errDiskNotFound, errDiskFull}, errDiskFull}, - // A case for error not in the known errors list (refer to func reduceErrs) - {[]error{errDiskFull, syscall.EEXIST, syscall.EEXIST}, syscall.EEXIST}, - {[]error{errDiskNotFound, errDiskNotFound, nil}, nil}, - {[]error{nil, nil, nil}, nil}, + {[]error{errDiskNotFound, errDiskNotFound, errDiskFull}, errDiskNotFound, 2}, + {[]error{errDiskFull, errDiskNotFound, nil, nil}, nil, 2}, + {[]error{}, nil, 0}, } for i, testCase := range testCases { - expected := testCase.err - got := reduceErrs(testCase.errs) - if expected != got { - t.Errorf("Test %d : expected %s, got %s", i+1, expected, got) + gotMax, gotErr := reduceErrs(testCase.errs) + if testCase.err != gotErr { + t.Errorf("Test %d : expected %s, got %s", i+1, testCase.err, gotErr) + } + if testCase.count != gotMax { + t.Errorf("Test %d : expected %d, got %d", i+1, testCase.count, gotMax) } } }