Add support for timeouts for locks (#4377)

master
Frank Wessels 7 years ago committed by deekoder
parent 93f126364e
commit 61e0b1454a
  1. 5
      cmd/admin-handlers.go
  2. 6
      cmd/api-errors.go
  3. 25
      cmd/bucket-handlers.go
  4. 12
      cmd/bucket-notification-handlers.go
  5. 16
      cmd/bucket-policy.go
  6. 120
      cmd/dynamic-timeouts.go
  7. 207
      cmd/dynamic-timeouts_test.go
  8. 24
      cmd/event-notifier.go
  9. 24
      cmd/fs-v1-multipart.go
  10. 4
      cmd/fs-v1.go
  11. 10
      cmd/gateway-handlers.go
  12. 5
      cmd/globals.go
  13. 32
      cmd/lock-instrument.go
  14. 6
      cmd/lock-stat.go
  15. 8
      cmd/lockinfo-handlers_test.go
  16. 96
      cmd/namespace-lock.go
  17. 311
      cmd/namespace-lock_test.go
  18. 9
      cmd/object-api-errors.go
  19. 4
      cmd/object-handlers-common.go
  20. 36
      cmd/object-handlers.go
  21. 3
      cmd/typed-errors.go
  22. 14
      cmd/web-handlers.go
  23. 12
      cmd/xl-v1-healing.go
  24. 8
      cmd/xl-v1-list-objects-heal.go
  25. 40
      cmd/xl-v1-multipart.go
  26. 52
      vendor/github.com/minio/dsync/drwmutex.go
  27. 121
      vendor/github.com/minio/lsync/README.md
  28. 64
      vendor/github.com/minio/lsync/lfrequentaccess.go
  29. 78
      vendor/github.com/minio/lsync/lmutex.go
  30. 180
      vendor/github.com/minio/lsync/lrwmutex.go
  31. 142
      vendor/github.com/minio/lsync/retry.go
  32. 12
      vendor/vendor.json

@ -995,7 +995,10 @@ func (adminAPI adminAPIHandlers) SetConfigHandler(w http.ResponseWriter, r *http
// bucket name and wouldn't conflict with normal object
// operations.
configLock := globalNSMutex.NewNSLock(minioReservedBucket, minioConfigFile)
configLock.Lock()
if configLock.GetLock(globalObjectTimeout) != nil {
writeErrorResponse(w, ErrOperationTimedOut, r.URL)
return
}
defer configLock.Unlock()
// Rename the temporary config file to config.json

@ -145,6 +145,7 @@ const (
ErrInvalidObjectName
ErrInvalidResourceName
ErrServerNotInitialized
ErrOperationTimedOut
// Add new extended error codes here.
// Please open a https://github.com/minio/minio/issues before adding
// new error codes here.
@ -637,6 +638,11 @@ var errorCodeResponse = map[APIErrorCode]APIError{
Description: "Cannot respond to plain-text request from TLS-encrypted server",
HTTPStatusCode: http.StatusBadRequest,
},
ErrOperationTimedOut: {
Code: "XMinioServerTimedOut",
Description: "A timeout occurred while trying to lock a resource",
HTTPStatusCode: http.StatusRequestTimeout,
},
ErrMetadataTooLarge: {
Code: "InvalidArgument",
Description: "Your metadata headers exceed the maximum allowed metadata size.",

@ -300,13 +300,16 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
return
}
objectLock := globalNSMutex.NewNSLock(bucket, obj.ObjectName)
objectLock.Lock()
if timedOutErr := objectLock.GetLock(globalObjectTimeout); timedOutErr != nil {
dErrs[i] = timedOutErr
} else {
defer objectLock.Unlock()
dErr := objectAPI.DeleteObject(bucket, obj.ObjectName)
if dErr != nil {
dErrs[i] = dErr
}
}
}(index, object)
}
wg.Wait()
@ -405,7 +408,10 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
}
bucketLock := globalNSMutex.NewNSLock(bucket, "")
bucketLock.Lock()
if bucketLock.GetLock(globalObjectTimeout) != nil {
writeErrorResponse(w, ErrOperationTimedOut, r.URL)
return
}
defer bucketLock.Unlock()
// Proceed to creating a bucket.
@ -550,7 +556,10 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
sha256sum := ""
objectLock := globalNSMutex.NewNSLock(bucket, object)
objectLock.Lock()
if objectLock.GetLock(globalObjectTimeout) != nil {
writeErrorResponse(w, ErrOperationTimedOut, r.URL)
return
}
defer objectLock.Unlock()
objInfo, err := objectAPI.PutObject(bucket, object, fileSize, fileBody, metadata, sha256sum)
@ -626,7 +635,10 @@ func (api objectAPIHandlers) HeadBucketHandler(w http.ResponseWriter, r *http.Re
}
bucketLock := globalNSMutex.NewNSLock(bucket, "")
bucketLock.RLock()
if bucketLock.GetRLock(globalObjectTimeout) != nil {
writeErrorResponseHeadersOnly(w, ErrOperationTimedOut)
return
}
defer bucketLock.RUnlock()
if _, err := objectAPI.GetBucketInfo(bucket); err != nil {
@ -656,7 +668,10 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
bucket := vars["bucket"]
bucketLock := globalNSMutex.NewNSLock(bucket, "")
bucketLock.Lock()
if bucketLock.GetLock(globalObjectTimeout) != nil {
writeErrorResponse(w, ErrOperationTimedOut, r.URL)
return
}
defer bucketLock.Unlock()
// Attempt to delete bucket.

@ -173,7 +173,9 @@ func PutBucketNotificationConfig(bucket string, ncfg *notificationConfig, objAPI
// Acquire a write lock on bucket before modifying its
// configuration.
bucketLock := globalNSMutex.NewNSLock(bucket, "")
bucketLock.Lock()
if err := bucketLock.GetLock(globalOperationTimeout); err != nil {
return err
}
// Release lock after notifying peers
defer bucketLock.Unlock()
@ -386,7 +388,9 @@ func AddBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI ObjectL
// Acquire a write lock on bucket before modifying its
// configuration.
bucketLock := globalNSMutex.NewNSLock(bucket, "")
bucketLock.Lock()
if err := bucketLock.GetLock(globalOperationTimeout); err != nil {
return err
}
// Release lock after notifying peers
defer bucketLock.Unlock()
@ -427,7 +431,9 @@ func RemoveBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI Obje
// Acquire a write lock on bucket before modifying its
// configuration.
bucketLock := globalNSMutex.NewNSLock(bucket, "")
bucketLock.Lock()
if bucketLock.GetLock(globalOperationTimeout) != nil {
return
}
// Release lock after notifying peers
defer bucketLock.Unlock()

@ -146,7 +146,9 @@ func readBucketPolicyJSON(bucket string, objAPI ObjectLayer) (bucketPolicyReader
// Acquire a read lock on policy config before reading.
objLock := globalNSMutex.NewNSLock(minioMetaBucket, policyPath)
objLock.RLock()
if err = objLock.GetRLock(globalOperationTimeout); err != nil {
return nil, err
}
defer objLock.RUnlock()
var buffer bytes.Buffer
@ -187,7 +189,9 @@ func removeBucketPolicy(bucket string, objAPI ObjectLayer) error {
policyPath := pathJoin(bucketConfigPrefix, bucket, bucketPolicyConfig)
// Acquire a write lock on policy config before modifying.
objLock := globalNSMutex.NewNSLock(minioMetaBucket, policyPath)
objLock.Lock()
if err := objLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer objLock.Unlock()
if err := objAPI.DeleteObject(minioMetaBucket, policyPath); err != nil {
errorIf(err, "Unable to remove bucket-policy on bucket %s.", bucket)
@ -210,7 +214,9 @@ func writeBucketPolicy(bucket string, objAPI ObjectLayer, bpy *bucketPolicy) err
policyPath := pathJoin(bucketConfigPrefix, bucket, bucketPolicyConfig)
// Acquire a write lock on policy config before modifying.
objLock := globalNSMutex.NewNSLock(minioMetaBucket, policyPath)
objLock.Lock()
if err := objLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer objLock.Unlock()
if _, err := objAPI.PutObject(minioMetaBucket, policyPath, int64(len(buf)), bytes.NewReader(buf), nil, ""); err != nil {
errorIf(err, "Unable to set policy for the bucket %s", bucket)
@ -235,7 +241,9 @@ func parseAndPersistBucketPolicy(bucket string, policyBytes []byte, objAPI Objec
// Acquire a write lock on bucket before modifying its configuration.
bucketLock := globalNSMutex.NewNSLock(bucket, "")
bucketLock.Lock()
if bucketLock.GetLock(globalOperationTimeout) != nil {
return ErrOperationTimedOut
}
// Release lock after notifying peers
defer bucketLock.Unlock()

@ -0,0 +1,120 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"sync"
"sync/atomic"
"time"
)
const (
dynamicTimeoutIncreaseThresholdPct = 0.33 // Upper threshold for failures in order to increase timeout
dynamicTimeoutDecreaseThresholdPct = 0.10 // Lower threshold for failures in order to decrease timeout
dynamicTimeoutLogSize = 16
maxDuration = time.Duration(1<<63 - 1)
)
// timeouts that are dynamically adapted based on actual usage results
type dynamicTimeout struct {
timeout int64
minimum int64
entries int64
log [dynamicTimeoutLogSize]time.Duration
mutex sync.Mutex
}
// newDynamicTimeout returns a new dynamic timeout initialized with timeout value
func newDynamicTimeout(timeout, minimum time.Duration) *dynamicTimeout {
return &dynamicTimeout{timeout: int64(timeout), minimum: int64(minimum)}
}
// Timeout returns the current timeout value
func (dt *dynamicTimeout) Timeout() time.Duration {
return time.Duration(atomic.LoadInt64(&dt.timeout))
}
// LogSuccess logs the duration of a successful action that
// did not hit the timeout
func (dt *dynamicTimeout) LogSuccess(duration time.Duration) {
dt.logEntry(duration)
}
// LogFailure logs an action that hit the timeout
func (dt *dynamicTimeout) LogFailure() {
dt.logEntry(maxDuration)
}
// logEntry stores a log entry
func (dt *dynamicTimeout) logEntry(duration time.Duration) {
entries := int(atomic.AddInt64(&dt.entries, 1))
index := entries - 1
if index < dynamicTimeoutLogSize {
dt.mutex.Lock()
dt.log[index] = duration
dt.mutex.Unlock()
}
if entries == dynamicTimeoutLogSize {
dt.mutex.Lock()
// Make copy on stack in order to call adjust()
logCopy := [dynamicTimeoutLogSize]time.Duration{}
copy(logCopy[:], dt.log[:])
// reset log entries
atomic.StoreInt64(&dt.entries, 0)
dt.mutex.Unlock()
dt.adjust(logCopy)
}
}
// adjust changes the value of the dynamic timeout based on the
// previous results
func (dt *dynamicTimeout) adjust(entries [dynamicTimeoutLogSize]time.Duration) {
failures, average := 0, 0
for i := 0; i < len(entries); i++ {
if entries[i] == maxDuration {
failures++
} else {
average += int(entries[i])
}
}
if failures < len(entries) {
average /= len(entries) - failures
}
timeOutHitPct := float64(failures) / float64(len(entries))
if timeOutHitPct > dynamicTimeoutIncreaseThresholdPct {
// We are hitting the timeout too often, so increase the timeout by 25%
timeout := atomic.LoadInt64(&dt.timeout) * 125 / 100
atomic.StoreInt64(&dt.timeout, timeout)
} else if timeOutHitPct < dynamicTimeoutDecreaseThresholdPct {
// We are hitting the timeout relatively few times, so decrease the timeout
average = average * 125 / 100 // Add buffer of 25% on top of average
timeout := (atomic.LoadInt64(&dt.timeout) + int64(average)) / 2 // Middle between current timeout and average success
if timeout < dt.minimum {
timeout = dt.minimum
}
atomic.StoreInt64(&dt.timeout, timeout)
}
}

@ -0,0 +1,207 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"math/rand"
"testing"
"time"
)
func TestDynamicTimeoutSingleIncrease(t *testing.T) {
timeout := newDynamicTimeout(time.Minute, time.Second)
initial := timeout.Timeout()
for i := 0; i < dynamicTimeoutLogSize; i++ {
timeout.LogFailure()
}
adjusted := timeout.Timeout()
if initial >= adjusted {
t.Errorf("Failure to increase timeout, expected %v to be more than %v", adjusted, initial)
}
}
func TestDynamicTimeoutDualIncrease(t *testing.T) {
timeout := newDynamicTimeout(time.Minute, time.Second)
initial := timeout.Timeout()
for i := 0; i < dynamicTimeoutLogSize; i++ {
timeout.LogFailure()
}
adjusted := timeout.Timeout()
for i := 0; i < dynamicTimeoutLogSize; i++ {
timeout.LogFailure()
}
adjustedAgain := timeout.Timeout()
if initial >= adjusted || adjusted >= adjustedAgain {
t.Errorf("Failure to increase timeout multiple times")
}
}
func TestDynamicTimeoutSingleDecrease(t *testing.T) {
timeout := newDynamicTimeout(time.Minute, time.Second)
initial := timeout.Timeout()
for i := 0; i < dynamicTimeoutLogSize; i++ {
timeout.LogSuccess(20 * time.Second)
}
adjusted := timeout.Timeout()
if initial <= adjusted {
t.Errorf("Failure to decrease timeout, expected %v to be less than %v", adjusted, initial)
}
}
func TestDynamicTimeoutDualDecrease(t *testing.T) {
timeout := newDynamicTimeout(time.Minute, time.Second)
initial := timeout.Timeout()
for i := 0; i < dynamicTimeoutLogSize; i++ {
timeout.LogSuccess(20 * time.Second)
}
adjusted := timeout.Timeout()
for i := 0; i < dynamicTimeoutLogSize; i++ {
timeout.LogSuccess(20 * time.Second)
}
adjustedAgain := timeout.Timeout()
if initial <= adjusted || adjusted <= adjustedAgain {
t.Errorf("Failure to decrease timeout multiple times")
}
}
func TestDynamicTimeoutManyDecreases(t *testing.T) {
timeout := newDynamicTimeout(time.Minute, time.Second)
initial := timeout.Timeout()
const successTimeout = 20 * time.Second
for l := 0; l < 100; l++ {
for i := 0; i < dynamicTimeoutLogSize; i++ {
timeout.LogSuccess(successTimeout)
}
}
adjusted := timeout.Timeout()
// Check whether eventual timeout is between initial value and success timeout
if initial <= adjusted || adjusted <= successTimeout {
t.Errorf("Failure to decrease timeout appropriately")
}
}
func TestDynamicTimeoutHitMinimum(t *testing.T) {
const minimum = 30 * time.Second
timeout := newDynamicTimeout(time.Minute, minimum)
initial := timeout.Timeout()
const successTimeout = 20 * time.Second
for l := 0; l < 100; l++ {
for i := 0; i < dynamicTimeoutLogSize; i++ {
timeout.LogSuccess(successTimeout)
}
}
adjusted := timeout.Timeout()
// Check whether eventual timeout has hit the minimum value
if initial <= adjusted || adjusted != minimum {
t.Errorf("Failure to decrease timeout appropriately")
}
}
func testDynamicTimeoutAdjust(t *testing.T, timeout *dynamicTimeout, f func() float64) {
const successTimeout = 20 * time.Second
for i := 0; i < dynamicTimeoutLogSize; i++ {
rnd := f()
duration := time.Duration(float64(successTimeout) * rnd)
if duration < 100*time.Millisecond {
duration = 100 * time.Millisecond
}
if duration >= time.Minute {
timeout.LogFailure()
} else {
timeout.LogSuccess(duration)
}
}
}
func TestDynamicTimeoutAdjustExponential(t *testing.T) {
timeout := newDynamicTimeout(time.Minute, time.Second)
rand.Seed(time.Now().UTC().UnixNano())
initial := timeout.Timeout()
for try := 0; try < 10; try++ {
testDynamicTimeoutAdjust(t, timeout, rand.ExpFloat64)
}
adjusted := timeout.Timeout()
if initial <= adjusted {
t.Errorf("Failure to decrease timeout, expected %v to be less than %v", adjusted, initial)
}
}
func TestDynamicTimeoutAdjustNormalized(t *testing.T) {
timeout := newDynamicTimeout(time.Minute, time.Second)
rand.Seed(time.Now().UTC().UnixNano())
initial := timeout.Timeout()
for try := 0; try < 10; try++ {
testDynamicTimeoutAdjust(t, timeout, func() float64 {
return 1.0 + rand.NormFloat64()
})
}
adjusted := timeout.Timeout()
if initial <= adjusted {
t.Errorf("Failure to decrease timeout, expected %v to be less than %v", adjusted, initial)
}
}

@ -370,7 +370,9 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon
// Acquire a write lock on notification config before modifying.
objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath)
objLock.RLock()
if err := objLock.GetRLock(globalOperationTimeout); err != nil {
return nil, err
}
defer objLock.RUnlock()
var buffer bytes.Buffer
@ -413,7 +415,9 @@ func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, er
// Acquire a write lock on notification config before modifying.
objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath)
objLock.RLock()
if err := objLock.GetRLock(globalOperationTimeout); err != nil {
return nil, err
}
defer objLock.RUnlock()
var buffer bytes.Buffer
@ -454,7 +458,9 @@ func persistNotificationConfig(bucket string, ncfg *notificationConfig, obj Obje
ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
// Acquire a write lock on notification config before modifying.
objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath)
objLock.Lock()
if err = objLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer objLock.Unlock()
// write object to path
@ -479,7 +485,9 @@ func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer
lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
// Acquire a write lock on notification config before modifying.
objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath)
objLock.Lock()
if err = objLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer objLock.Unlock()
// write object to path
@ -502,7 +510,9 @@ func removeNotificationConfig(bucket string, objAPI ObjectLayer) error {
// Acquire a write lock on notification config before modifying.
objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath)
objLock.Lock()
if err := objLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer objLock.Unlock()
return objAPI.DeleteObject(minioMetaBucket, ncPath)
}
@ -514,7 +524,9 @@ func removeListenerConfig(bucket string, objAPI ObjectLayer) error {
// Acquire a write lock on notification config before modifying.
objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath)
objLock.Lock()
if err := objLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer objLock.Unlock()
return objAPI.DeleteObject(minioMetaBucket, lcPath)
}

@ -118,7 +118,9 @@ func (fs fsObjects) listMultipartUploadIDs(bucketName, objectName, uploadIDMarke
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucketName, objectName))
objectMPartPathLock.RLock()
if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil {
return nil, false, traceError(err)
}
defer objectMPartPathLock.RUnlock()
uploadsPath := pathJoin(bucketName, objectName, uploadsJSONFile)
@ -413,7 +415,9 @@ func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]st
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
objectMPartPathLock.Lock()
if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
return "", err
}
defer objectMPartPathLock.Unlock()
return fs.newMultipartUpload(bucket, object, meta)
@ -482,7 +486,9 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
objectMPartPathLock.Lock()
if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
return pi, err
}
defer objectMPartPathLock.Unlock()
// Disallow any parallel abort or complete multipart operations.
@ -582,7 +588,9 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Lock the part so that another part upload with same part-number gets blocked
// while the part is getting appended in the background.
partLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, partPath)
partLock.Lock()
if err = partLock.GetLock(globalOperationTimeout); err != nil {
return pi, err
}
fsNSPartPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, partPath)
if err = fsRenameFile(fsPartPath, fsNSPartPath); err != nil {
@ -686,7 +694,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
objectMPartPathLock.Lock()
if err = objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
return oi, err
}
defer objectMPartPathLock.Unlock()
fsMetaPathMultipart := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
@ -894,7 +904,9 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, object))
objectMPartPathLock.Lock()
if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer objectMPartPathLock.Unlock()
fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)

@ -824,7 +824,9 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
// Protect reading `fs.json`.
objectLock := globalNSMutex.NewNSLock(bucket, entry)
objectLock.RLock()
if err = objectLock.GetRLock(globalListingTimeout); err != nil {
return ObjectInfo{}, err
}
var etag string
etag, err = fs.getObjectETag(bucket, entry)
objectLock.RUnlock()

@ -262,7 +262,10 @@ func (api gatewayAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Re
// Lock the object.
objectLock := globalNSMutex.NewNSLock(bucket, object)
objectLock.Lock()
if objectLock.GetLock(globalOperationTimeout) != nil {
writeErrorResponse(w, ErrOperationTimedOut, r.URL)
return
}
defer objectLock.Unlock()
var objInfo ObjectInfo
@ -622,7 +625,10 @@ func (api gatewayAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Re
}
bucketLock := globalNSMutex.NewNSLock(bucket, "")
bucketLock.Lock()
if bucketLock.GetLock(globalOperationTimeout) != nil {
writeErrorResponse(w, ErrOperationTimedOut, r.URL)
return
}
defer bucketLock.Unlock()
// Proceed to creating a bucket.

@ -136,6 +136,11 @@ var (
globalPublicCerts []*x509.Certificate
globalXLObjCacheDisabled bool
// Add new variable global values here.
globalListingTimeout = newDynamicTimeout( /*30*/ 600*time.Second /*5*/, 600*time.Second) // timeout for listing related ops
globalObjectTimeout = newDynamicTimeout( /*1*/ 10*time.Minute /*10*/, 600*time.Second) // timeout for Object API related ops
globalOperationTimeout = newDynamicTimeout(10*time.Minute /*30*/, 600*time.Second) // default timeout for general ops
globalHealingTimeout = newDynamicTimeout(30*time.Minute /*1*/, 30*time.Minute) // timeout for healing related ops
)
var (

@ -178,6 +178,38 @@ func (n *nsLockMap) statusNoneToBlocked(param nsParam, lockSource, opsID string,
return nil
}
// Change the state of the lock from Blocked to none.
func (n *nsLockMap) statusBlockedToNone(param nsParam, lockSource, opsID string, readLock bool) error {
_, ok := n.debugLockMap[param]
if !ok {
return traceError(LockInfoVolPathMissing{param.volume, param.path})
}
// Check whether lock info entry for the given `opsID` exists.
lockInfo, ok := n.debugLockMap[param].lockInfo[opsID]
if !ok {
return traceError(LockInfoOpsIDNotFound{param.volume, param.path, opsID})
}
// Check whether lockSource is same.
if lockInfo.lockSource != lockSource {
return traceError(LockInfoOriginMismatch{param.volume, param.path, opsID, lockSource})
}
// Status of the lock should be set to "Blocked".
if lockInfo.status != blockedStatus {
return traceError(LockInfoStateNotBlocked{param.volume, param.path, opsID})
}
// Clear the status by removing the entry for the given `opsID`.
delete(n.debugLockMap[param].lockInfo, opsID)
// Update global lock stats.
n.counters.lockTimedOut()
// Update (volume, path) lock stats.
n.debugLockMap[param].counters.lockTimedOut()
return nil
}
// deleteLockInfoEntry - Deletes the lock information for given (volume, path).
// Called when nsLk.ref count is 0.
func (n *nsLockMap) deleteLockInfoEntryForVolumePath(param nsParam) error {

@ -35,6 +35,12 @@ func (ls *lockStat) lockGranted() {
ls.granted++
}
// lockTimedOut - updates lock stat when a lock is timed out.
func (ls *lockStat) lockTimedOut() {
ls.blocked--
ls.total--
}
// lockRemoved - updates lock stat when a lock is removed, by Unlock
// or ForceUnlock.
func (ls *lockStat) lockRemoved(granted bool) {

@ -33,13 +33,17 @@ func TestListLocksInfo(t *testing.T) {
// Take 10 read locks on bucket1/prefix1/obj1
for i := 0; i < 10; i++ {
readLk := globalNSMutex.NewNSLock("bucket1", "prefix1/obj1")
readLk.RLock()
if readLk.GetRLock(newDynamicTimeout(60*time.Second, time.Second)) != nil {
t.Errorf("Failed to get read lock on iteration %d", i)
}
}
// Take write locks on bucket1/prefix/obj{11..19}
for i := 0; i < 10; i++ {
wrLk := globalNSMutex.NewNSLock("bucket1", fmt.Sprintf("prefix1/obj%d", 10+i))
wrLk.Lock()
if wrLk.GetLock(newDynamicTimeout(60*time.Second, time.Second)) != nil {
t.Errorf("Failed to get write lock on iteration %d", i)
}
}
testCases := []struct {

@ -21,7 +21,10 @@ import (
pathutil "path"
"sync"
"fmt"
"github.com/minio/dsync"
"github.com/minio/lsync"
"time"
)
// Global name space lock.
@ -30,11 +33,19 @@ var globalNSMutex *nsLockMap
// Global lock servers
var globalLockServers []*lockServer
// RWLocker - locker interface extends sync.Locker
// to introduce RLock, RUnlock.
// RWLocker - locker interface to introduce GetRLock, RUnlock.
type RWLocker interface {
sync.Locker
RLock()
GetLock(timeout *dynamicTimeout) (timedOutErr error)
Unlock()
GetRLock(timeout *dynamicTimeout) (timedOutErr error)
RUnlock()
}
// RWLockerSync - internal locker interface.
type RWLockerSync interface {
GetLock(timeout time.Duration) bool
Unlock()
GetRLock(timeout time.Duration) bool
RUnlock()
}
@ -101,7 +112,7 @@ type nsParam struct {
// nsLock - provides primitives for locking critical namespace regions.
type nsLock struct {
RWLocker
RWLockerSync
ref uint
}
@ -119,7 +130,7 @@ type nsLockMap struct {
}
// Lock the namespace resource.
func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock bool) {
func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock bool, timeout time.Duration) (locked bool) {
var nsLk *nsLock
n.lockMapMutex.Lock()
@ -127,11 +138,11 @@ func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock
nsLk, found := n.lockMap[param]
if !found {
nsLk = &nsLock{
RWLocker: func() RWLocker {
RWLockerSync: func() RWLockerSync {
if n.isDistXL {
return dsync.NewDRWMutex(pathJoin(volume, path))
}
return &sync.RWMutex{}
return &lsync.LRWMutex{}
}(),
ref: 0,
}
@ -144,17 +155,45 @@ func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock
// unblocks. The lock for accessing `globalNSMutex` is held inside
// the function itself.
if err := n.statusNoneToBlocked(param, lockSource, opsID, readLock); err != nil {
errorIf(err, "Failed to set lock state to blocked")
errorIf(err, fmt.Sprintf("Failed to set lock state to blocked (param = %v; opsID = %s)", param, opsID))
}
// Unlock map before Locking NS which might block.
n.lockMapMutex.Unlock()
// Locking here can block.
// Locking here will block (until timeout).
if readLock {
nsLk.RLock()
locked = nsLk.GetRLock(timeout)
} else {
nsLk.Lock()
locked = nsLk.GetLock(timeout)
}
if !locked { // We failed to get the lock
n.lockMapMutex.Lock()
defer n.lockMapMutex.Unlock()
// Changing the status of the operation from blocked to none
if err := n.statusBlockedToNone(param, lockSource, opsID, readLock); err != nil {
errorIf(err, fmt.Sprintf("Failed to clear the lock state (param = %v; opsID = %s)", param, opsID))
}
nsLk.ref-- // Decrement ref count since we failed to get the lock
// delete the lock state entry for given operation ID.
err := n.deleteLockInfoEntryForOps(param, opsID)
if err != nil {
errorIf(err, fmt.Sprintf("Failed to delete lock info entry (param = %v; opsID = %s)", param, opsID))
}
if nsLk.ref == 0 {
// Remove from the map if there are no more references.
delete(n.lockMap, param)
// delete the lock state entry for given
// <volume, path> pair.
err := n.deleteLockInfoEntryForVolumePath(param)
if err != nil {
errorIf(err, fmt.Sprintf("Failed to delete lock info entry (param = %v)", param))
}
}
return
}
// Changing the status of the operation from blocked to
@ -163,6 +202,7 @@ func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock
if err := n.statusBlockedToRunning(param, lockSource, opsID, readLock); err != nil {
errorIf(err, "Failed to set the lock state to running")
}
return
}
// Unlock the namespace resource.
@ -208,11 +248,11 @@ func (n *nsLockMap) unlock(volume, path, opsID string, readLock bool) {
// Lock - locks the given resource for writes, using a previously
// allocated name space lock or initializing a new one.
func (n *nsLockMap) Lock(volume, path, opsID string) {
func (n *nsLockMap) Lock(volume, path, opsID string, timeout time.Duration) (locked bool) {
readLock := false // This is a write lock.
lockSource := getSource() // Useful for debugging
n.lock(volume, path, lockSource, opsID, readLock)
return n.lock(volume, path, lockSource, opsID, readLock, timeout)
}
// Unlock - unlocks any previously acquired write locks.
@ -222,11 +262,11 @@ func (n *nsLockMap) Unlock(volume, path, opsID string) {
}
// RLock - locks any previously acquired read locks.
func (n *nsLockMap) RLock(volume, path, opsID string) {
func (n *nsLockMap) RLock(volume, path, opsID string, timeout time.Duration) (locked bool) {
readLock := true
lockSource := getSource() // Useful for debugging
n.lock(volume, path, lockSource, opsID, readLock)
return n.lock(volume, path, lockSource, opsID, readLock, timeout)
}
// RUnlock - unlocks any previously acquired read locks.
@ -282,11 +322,17 @@ func (n *nsLockMap) NewNSLock(volume, path string) RWLocker {
return &lockInstance{n, volume, path, getOpsID()}
}
// Lock - block until write lock is taken.
func (li *lockInstance) Lock() {
// Lock - block until write lock is taken or timeout has occurred.
func (li *lockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error) {
lockSource := getSource()
start := UTCNow()
readLock := false
li.ns.lock(li.volume, li.path, lockSource, li.opsID, readLock)
if !li.ns.lock(li.volume, li.path, lockSource, li.opsID, readLock, timeout.Timeout()) {
timeout.LogFailure()
return OperationTimedOut{Path: li.path}
}
timeout.LogSuccess(UTCNow().Sub(start))
return
}
// Unlock - block until write lock is released.
@ -295,11 +341,17 @@ func (li *lockInstance) Unlock() {
li.ns.unlock(li.volume, li.path, li.opsID, readLock)
}
// RLock - block until read lock is taken.
func (li *lockInstance) RLock() {
// RLock - block until read lock is taken or timeout has occurred.
func (li *lockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr error) {
lockSource := getSource()
start := UTCNow()
readLock := true
li.ns.lock(li.volume, li.path, lockSource, li.opsID, readLock)
if !li.ns.lock(li.volume, li.path, lockSource, li.opsID, readLock, timeout.Timeout()) {
timeout.LogFailure()
return OperationTimedOut{Path: li.path}
}
timeout.LogSuccess(UTCNow().Sub(start))
return
}
// RUnlock - block until read lock is released.

@ -17,8 +17,6 @@
package cmd
import (
"strconv"
"sync"
"testing"
"time"
)
@ -27,9 +25,9 @@ import (
func TestNamespaceLockTest(t *testing.T) {
// List of test cases.
testCases := []struct {
lk func(s1, s2, s3 string)
lk func(s1, s2, s3 string, t time.Duration) bool
unlk func(s1, s2, s3 string)
rlk func(s1, s2, s3 string)
rlk func(s1, s2, s3 string, t time.Duration) bool
runlk func(s1, s2, s3 string)
lkCount int
lockedRefCount uint
@ -63,7 +61,9 @@ func TestNamespaceLockTest(t *testing.T) {
// Write lock tests.
testCase := testCases[0]
testCase.lk("a", "b", "c") // lock once.
if !testCase.lk("a", "b", "c", 60*time.Second) { // lock once.
t.Fatalf("Failed to acquire lock")
}
nsLk, ok := globalNSMutex.lockMap[nsParam{"a", "b"}]
if !ok && testCase.shouldPass {
t.Errorf("Lock in map missing.")
@ -83,10 +83,18 @@ func TestNamespaceLockTest(t *testing.T) {
// Read lock tests.
testCase = testCases[1]
testCase.rlk("a", "b", "c") // lock once.
testCase.rlk("a", "b", "c") // lock second time.
testCase.rlk("a", "b", "c") // lock third time.
testCase.rlk("a", "b", "c") // lock fourth time.
if !testCase.rlk("a", "b", "c", 60*time.Second) { // lock once.
t.Fatalf("Failed to acquire first read lock")
}
if !testCase.rlk("a", "b", "c", 60*time.Second) { // lock second time.
t.Fatalf("Failed to acquire second read lock")
}
if !testCase.rlk("a", "b", "c", 60*time.Second) { // lock third time.
t.Fatalf("Failed to acquire third read lock")
}
if !testCase.rlk("a", "b", "c", 60*time.Second) { // lock fourth time.
t.Fatalf("Failed to acquire fourth read lock")
}
nsLk, ok = globalNSMutex.lockMap[nsParam{"a", "b"}]
if !ok && testCase.shouldPass {
t.Errorf("Lock in map missing.")
@ -108,7 +116,9 @@ func TestNamespaceLockTest(t *testing.T) {
// Read lock 0 ref count.
testCase = testCases[2]
testCase.rlk("a", "c", "d") // lock once.
if !testCase.rlk("a", "c", "d", 60*time.Second) { // lock once.
t.Fatalf("Failed to acquire read lock")
}
nsLk, ok = globalNSMutex.lockMap[nsParam{"a", "c"}]
if !ok && testCase.shouldPass {
@ -128,258 +138,47 @@ func TestNamespaceLockTest(t *testing.T) {
}
}
func TestLockStats(t *testing.T) {
expectedResult := []lockStateCase{
// Test case - 1.
// Case where 10 read locks are held.
// Entry for any of the 10 reads locks has to be found.
// Since they held in a loop, Lock origin for first 10 read locks (opsID 0-9) should be the same.
{
volume: "my-bucket",
path: "my-object",
opsID: "0",
readLock: true,
lockSource: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Running",
expectedGlobalLockCount: 10,
expectedRunningLockCount: 10,
expectedBlockedLockCount: 0,
expectedVolPathLockCount: 10,
expectedVolPathRunningCount: 10,
expectedVolPathBlockCount: 0,
},
// Test case - 2.
// Case where the first 5 read locks are released.
// Entry for any of the 6-10th "Running" reads lock has to be found.
{
volume: "my-bucket",
path: "my-object",
opsID: "6",
readLock: true,
lockSource: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Running",
expectedGlobalLockCount: 5,
expectedRunningLockCount: 5,
expectedBlockedLockCount: 0,
expectedVolPathLockCount: 5,
expectedVolPathRunningCount: 5,
expectedVolPathBlockCount: 0,
},
// Test case - 3.
{
volume: "my-bucket",
path: "my-object",
opsID: "10",
readLock: false,
lockSource: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Running",
expectedGlobalLockCount: 2,
expectedRunningLockCount: 1,
expectedBlockedLockCount: 1,
expectedVolPathLockCount: 2,
expectedVolPathRunningCount: 1,
expectedVolPathBlockCount: 1,
},
// Test case - 4.
{
volume: "my-bucket",
path: "my-object",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Blocked",
expectedGlobalLockCount: 1,
expectedRunningLockCount: 0,
expectedBlockedLockCount: 1,
expectedVolPathLockCount: 1,
expectedVolPathRunningCount: 0,
expectedVolPathBlockCount: 1,
},
// Test case - 5.
{
volume: "my-bucket",
path: "my-object",
opsID: "11",
readLock: false,
lockSource: "[lock held] in github.com/minio/minio/cmd.TestLockStats[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:298]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Running",
expectedGlobalLockCount: 1,
expectedRunningLockCount: 1,
expectedBlockedLockCount: 0,
expectedVolPathLockCount: 1,
expectedVolPathRunningCount: 1,
expectedVolPathBlockCount: 0,
},
// Test case - 6.
// Case where in the first 5 read locks are released, but 2 write locks are
// blocked waiting for the remaining 5 read locks locks to be released (10 read locks were held initially).
// We check the entry for the first blocked write call here.
{
volume: "my-bucket",
path: "my-object",
opsID: "10",
readLock: false,
// write lock is held at line 318.
// this confirms that we are looking the right write lock.
lockSource: "[lock held] in github.com/minio/minio/cmd.TestLockStats.func2[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:318]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Blocked",
// count of held(running) + blocked locks.
expectedGlobalLockCount: 7,
// count of acquired locks.
expectedRunningLockCount: 5,
// 2 write calls are blocked, waiting for the remaining 5 read locks.
expectedBlockedLockCount: 2,
expectedVolPathLockCount: 7,
expectedVolPathRunningCount: 5,
expectedVolPathBlockCount: 2,
},
// Test case - 7.
// Case where in 9 out of 10 read locks are released.
// Since there's one more pending read lock, the 2 write locks are still blocked.
// Testing the entry for the last read lock.
{volume: "my-bucket",
path: "my-object",
opsID: "9",
readLock: true,
lockSource: "[lock held] in github.com/minio/minio/cmd.TestLockStats.func2[/Users/hackintoshrao/mycode/go/src/github.com/minio/minio/cmd/namespace-lock_test.go:318]",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Running",
// Total running + blocked locks.
// 2 blocked write lock.
expectedGlobalLockCount: 3,
expectedRunningLockCount: 1,
expectedBlockedLockCount: 2,
expectedVolPathLockCount: 3,
expectedVolPathRunningCount: 1,
expectedVolPathBlockCount: 2,
},
// Test case - 8.
{
volume: "my-bucket",
path: "my-object",
// expected metrics.
expectedErr: nil,
expectedLockStatus: "Blocked",
func TestNamespaceLockTimedOut(t *testing.T) {
expectedGlobalLockCount: 0,
expectedRunningLockCount: 0,
expectedBlockedLockCount: 0,
},
// Get write lock
if !globalNSMutex.Lock("my-bucket", "my-object", "abc", 60*time.Second) {
t.Fatalf("Failed to acquire lock")
}
var wg sync.WaitGroup
// initializing the locks.
initNSLock(false)
// hold 10 read locks.
for i := 0; i < 10; i++ {
globalNSMutex.RLock("my-bucket", "my-object", strconv.Itoa(i))
}
// expected lock info.
expectedLockStats := expectedResult[0]
// verify the actual lock info with the expected one.
verifyLockState(expectedLockStats, t, 1)
// unlock 5 readlock.
for i := 0; i < 5; i++ {
globalNSMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i))
// Second attempt for write lock on same resource should time out
locked := globalNSMutex.Lock("my-bucket", "my-object", "def", 1*time.Second)
if locked {
t.Fatalf("Should not have acquired lock")
}
expectedLockStats = expectedResult[1]
// verify the actual lock info with the expected one.
verifyLockState(expectedLockStats, t, 2)
syncChan := make(chan struct{}, 1)
wg.Add(1)
go func() {
defer wg.Done()
// blocks till all read locks are released.
globalNSMutex.Lock("my-bucket", "my-object", strconv.Itoa(10))
// Once the above attempt to lock is unblocked/acquired, we verify the stats and release the lock.
expectedWLockStats := expectedResult[2]
// Since the write lock acquired here, the number of blocked locks should reduce by 1 and
// count of running locks should increase by 1.
verifyLockState(expectedWLockStats, t, 3)
// release the write lock.
globalNSMutex.Unlock("my-bucket", "my-object", strconv.Itoa(10))
// The number of running locks should decrease by 1.
// expectedWLockStats = expectedResult[3]
// verifyLockState(expectedWLockStats, t, 4)
// Take the lock stats after the first write lock is unlocked.
// Only then unlock then second write lock.
syncChan <- struct{}{}
}()
// waiting so that the write locks in the above go routines are held.
// sleeping so that we can predict the order of the write locks held.
time.Sleep(100 * time.Millisecond)
// since there are 5 more readlocks still held on <"my-bucket","my-object">,
// an attempt to hold write locks blocks. So its run in a new go routine.
wg.Add(1)
go func() {
defer wg.Done()
// blocks till all read locks are released.
globalNSMutex.Lock("my-bucket", "my-object", strconv.Itoa(11))
// Once the above attempt to lock is unblocked/acquired, we release the lock.
// Unlock the second write lock only after lock stats for first write lock release is taken.
<-syncChan
// The number of running locks should decrease by 1.
expectedWLockStats := expectedResult[4]
verifyLockState(expectedWLockStats, t, 5)
globalNSMutex.Unlock("my-bucket", "my-object", strconv.Itoa(11))
}()
// Read lock on same resource should also time out
locked = globalNSMutex.RLock("my-bucket", "my-object", "def", 1*time.Second)
if locked {
t.Fatalf("Should not have acquired read lock while write lock is active")
}
expectedLockStats = expectedResult[5]
// Release write lock
globalNSMutex.Unlock("my-bucket", "my-object", "abc")
time.Sleep(1 * time.Second)
// verify the actual lock info with the expected one.
verifyLockState(expectedLockStats, t, 6)
// Get read lock
if !globalNSMutex.RLock("my-bucket", "my-object", "ghi", 60*time.Second) {
t.Fatalf("Failed to acquire read lock")
}
// unlock 4 out of remaining 5 read locks.
for i := 0; i < 4; i++ {
globalNSMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(i+5))
// Write lock on same resource should time out
locked = globalNSMutex.Lock("my-bucket", "my-object", "klm", 1*time.Second)
if locked {
t.Fatalf("Should not have acquired lock")
}
// verify the entry for one remaining read lock and count of blocked write locks.
expectedLockStats = expectedResult[6]
// verify the actual lock info with the expected one.
verifyLockState(expectedLockStats, t, 7)
// 2nd read lock should be just fine
if !globalNSMutex.RLock("my-bucket", "my-object", "nop", 60*time.Second) {
t.Fatalf("Failed to acquire second read lock")
}
// Releasing the last read lock.
globalNSMutex.RUnlock("my-bucket", "my-object", strconv.Itoa(9))
wg.Wait()
expectedLockStats = expectedResult[7]
// verify the actual lock info with the expected one.
verifyGlobalLockStats(expectedLockStats, t, 8)
// Release both read locks
globalNSMutex.RUnlock("my-bucket", "my-object", "ghi")
globalNSMutex.RUnlock("my-bucket", "my-object", "nop")
}
// Tests functionality to forcefully unlock locks.
@ -387,7 +186,9 @@ func TestNamespaceForceUnlockTest(t *testing.T) {
// Create lock.
lock := globalNSMutex.NewNSLock("bucket", "object")
lock.Lock()
if lock.GetLock(newDynamicTimeout(60*time.Second, time.Second)) != nil {
t.Fatalf("Failed to get lock")
}
// Forcefully unlock lock.
globalNSMutex.ForceUnlock("bucket", "object")
@ -396,8 +197,10 @@ func TestNamespaceForceUnlockTest(t *testing.T) {
go func() {
// Try to claim lock again.
anotherLock := globalNSMutex.NewNSLock("bucket", "object")
anotherLock.Lock()
// And signal succes.
if anotherLock.GetLock(newDynamicTimeout(60*time.Second, time.Second)) != nil {
t.Fatalf("Failed to get lock")
}
// And signal success.
ch <- struct{}{}
}()

@ -305,6 +305,15 @@ func (e ObjectTooSmall) Error() string {
return "size of the object less than what is expected"
}
// OperationTimedOut - a timeout occurred.
type OperationTimedOut struct {
Path string
}
func (e OperationTimedOut) Error() string {
return "Operation timed out: " + e.Path
}
/// Multipart related errors.
// MalformedUploadID malformed upload id.

@ -232,7 +232,9 @@ func isETagEqual(left, right string) bool {
func deleteObject(obj ObjectLayer, bucket, object string, r *http.Request) (err error) {
// Acquire a write lock before deleting the object.
objectLock := globalNSMutex.NewNSLock(bucket, object)
objectLock.Lock()
if err = objectLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer objectLock.Unlock()
// Proceed to delete the object.

@ -112,7 +112,10 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
// Lock the object before reading.
objectLock := globalNSMutex.NewNSLock(bucket, object)
objectLock.RLock()
if objectLock.GetRLock(globalObjectTimeout) != nil {
writeErrorResponse(w, ErrOperationTimedOut, r.URL)
return
}
defer objectLock.RUnlock()
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
@ -232,7 +235,10 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
// Lock the object before reading.
objectLock := globalNSMutex.NewNSLock(bucket, object)
objectLock.RLock()
if objectLock.GetRLock(globalObjectTimeout) != nil {
writeErrorResponseHeadersOnly(w, ErrOperationTimedOut)
return
}
defer objectLock.RUnlock()
objInfo, err := objectAPI.GetObjectInfo(bucket, object)
@ -345,7 +351,10 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// - if source and destination are different
// it is the sole mutating state.
objectDWLock := globalNSMutex.NewNSLock(dstBucket, dstObject)
objectDWLock.Lock()
if objectDWLock.GetLock(globalObjectTimeout) != nil {
writeErrorResponse(w, ErrOperationTimedOut, r.URL)
return
}
defer objectDWLock.Unlock()
// if source and destination are different, we have to hold
@ -355,9 +364,11 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// Hold read locks on source object only if we are
// going to read data from source object.
objectSRLock := globalNSMutex.NewNSLock(srcBucket, srcObject)
objectSRLock.RLock()
if objectSRLock.GetRLock(globalObjectTimeout) != nil {
writeErrorResponse(w, ErrOperationTimedOut, r.URL)
return
}
defer objectSRLock.RUnlock()
}
objInfo, err := objectAPI.GetObjectInfo(srcBucket, srcObject)
@ -514,7 +525,10 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
// Lock the object.
objectLock := globalNSMutex.NewNSLock(bucket, object)
objectLock.Lock()
if objectLock.GetLock(globalObjectTimeout) != nil {
writeErrorResponse(w, ErrOperationTimedOut, r.URL)
return
}
defer objectLock.Unlock()
var objInfo ObjectInfo
@ -679,7 +693,10 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
// Hold read locks on source object only if we are
// going to read data from source object.
objectSRLock := globalNSMutex.NewNSLock(srcBucket, srcObject)
objectSRLock.RLock()
if objectSRLock.GetRLock(globalObjectTimeout) != nil {
writeErrorResponse(w, ErrOperationTimedOut, r.URL)
return
}
defer objectSRLock.RUnlock()
objInfo, err := objectAPI.GetObjectInfo(srcBucket, srcObject)
@ -977,7 +994,10 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
// Hold write lock on the object.
destLock := globalNSMutex.NewNSLock(bucket, object)
destLock.Lock()
if destLock.GetLock(globalObjectTimeout) != nil {
writeErrorResponse(w, ErrOperationTimedOut, r.URL)
return
}
defer destLock.Unlock()
objInfo, err := objectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts)

@ -61,3 +61,6 @@ var errInvalidRange = errors.New("Invalid range")
// errInvalidRangeSource - returned when given range value exceeds
// the source object size.
var errInvalidRangeSource = errors.New("Range specified exceeds source object size")
// errOperationTimedOut - operation timed out.
var errOperationTimedOut = errors.New("Operation timed out")

@ -130,7 +130,9 @@ func (web *webAPIHandlers) MakeBucket(r *http.Request, args *MakeBucketArgs, rep
}
bucketLock := globalNSMutex.NewNSLock(args.BucketName, "")
bucketLock.Lock()
if err := bucketLock.GetLock(globalObjectTimeout); err != nil {
return toJSONError(errOperationTimedOut)
}
defer bucketLock.Unlock()
if err := objectAPI.MakeBucketWithLocation(args.BucketName, serverConfig.GetRegion()); err != nil {
@ -533,7 +535,10 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
// Lock the object.
objectLock := globalNSMutex.NewNSLock(bucket, object)
objectLock.Lock()
if objectLock.GetLock(globalObjectTimeout) != nil {
writeWebErrorResponse(w, errOperationTimedOut)
return
}
defer objectLock.Unlock()
sha256sum := ""
@ -575,7 +580,10 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
// Lock the object before reading.
objectLock := globalNSMutex.NewNSLock(bucket, object)
objectLock.RLock()
if objectLock.GetRLock(globalObjectTimeout) != nil {
writeWebErrorResponse(w, errOperationTimedOut)
return
}
defer objectLock.RUnlock()
if err := objectAPI.GetObject(bucket, object, 0, -1, w); err != nil {

@ -91,7 +91,9 @@ func (xl xlObjects) HealBucket(bucket string) error {
// Heal bucket - create buckets on disks where it does not exist.
func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error {
bucketLock := globalNSMutex.NewNSLock(bucket, "")
bucketLock.Lock()
if err := bucketLock.GetLock(globalHealingTimeout); err != nil {
return err
}
defer bucketLock.Unlock()
// Initialize sync waitgroup.
@ -138,7 +140,9 @@ func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error
func healBucketMetadata(storageDisks []StorageAPI, bucket string, readQuorum int) error {
healBucketMetaFn := func(metaPath string) error {
metaLock := globalNSMutex.NewNSLock(minioMetaBucket, metaPath)
metaLock.RLock()
if err := metaLock.GetRLock(globalHealingTimeout); err != nil {
return err
}
defer metaLock.RUnlock()
// Heals the given file at metaPath.
if _, _, err := healObject(storageDisks, minioMetaBucket, metaPath, readQuorum); err != nil && !isErrObjectNotFound(err) {
@ -520,7 +524,9 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
func (xl xlObjects) HealObject(bucket, object string) (int, int, error) {
// Lock the object before healing.
objectLock := globalNSMutex.NewNSLock(bucket, object)
objectLock.RLock()
if err := objectLock.GetRLock(globalHealingTimeout); err != nil {
return 0, 0, err
}
defer objectLock.RUnlock()
// Heal the object.

@ -142,7 +142,9 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma
// Check if the current object needs healing
objectLock := globalNSMutex.NewNSLock(bucket, objInfo.Name)
objectLock.RLock()
if err := objectLock.GetRLock(globalHealingTimeout); err != nil {
return loi, err
}
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, objInfo.Name)
if xlShouldHeal(xl.storageDisks, partsMetadata, errs, bucket, objInfo.Name) {
healStat := xlHealStat(xl, partsMetadata, errs)
@ -226,7 +228,9 @@ func fetchMultipartUploadIDs(bucket, keyMarker, uploadIDMarker string,
// Hold a read lock on keyMarker path.
keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, keyMarker))
keyMarkerLock.RLock()
if err = keyMarkerLock.GetRLock(globalHealingTimeout); err != nil {
return uploads, end, err
}
for _, disk := range disks {
if disk == nil {
continue

@ -308,7 +308,9 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
// hold lock on keyMarker path
keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, keyMarker))
keyMarkerLock.RLock()
if err = keyMarkerLock.GetRLock(globalListingTimeout); err != nil {
return lmi, err
}
for _, disk := range xl.getLoadBalancedDisks() {
if disk == nil {
continue
@ -374,7 +376,9 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
// pending uploadIDs.
entryLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, entry))
entryLock.RLock()
if err = entryLock.GetRLock(globalListingTimeout); err != nil {
return lmi, err
}
var disk StorageAPI
for _, disk = range xl.getLoadBalancedDisks() {
if disk == nil {
@ -482,7 +486,9 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
// contents of ".minio.sys/multipart/object/"
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, object))
objectMPartPathLock.Lock()
if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
return "", err
}
defer objectMPartPathLock.Unlock()
uploadID := mustGetUUID()
@ -580,7 +586,9 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// pre-check upload id lock.
preUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
preUploadIDLock.RLock()
if err := preUploadIDLock.GetRLock(globalOperationTimeout); err != nil {
return pi, err
}
// Validates if upload ID exists.
if !xl.isUploadIDExists(bucket, object, uploadID) {
preUploadIDLock.RUnlock()
@ -685,7 +693,9 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// post-upload check (write) lock
postUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
postUploadIDLock.Lock()
if err = postUploadIDLock.GetLock(globalOperationTimeout); err != nil {
return pi, err
}
defer postUploadIDLock.Unlock()
// Validate again if upload ID still exists.
@ -836,7 +846,9 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
// abort-multipart-upload or complete-multipart-upload.
uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, object, uploadID))
uploadIDLock.Lock()
if err := uploadIDLock.GetLock(globalListingTimeout); err != nil {
return lpi, err
}
defer uploadIDLock.Unlock()
if !xl.isUploadIDExists(bucket, object, uploadID) {
@ -865,7 +877,9 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// multipart upload
uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, object, uploadID))
uploadIDLock.Lock()
if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil {
return oi, err
}
defer uploadIDLock.Unlock()
if !xl.isUploadIDExists(bucket, object, uploadID) {
@ -1037,7 +1051,9 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, object))
objectMPartPathLock.Lock()
if err = objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
return oi, toObjectErr(err, bucket, object)
}
defer objectMPartPathLock.Unlock()
// remove entry from uploads.json with quorum
@ -1106,7 +1122,9 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e
// multipart request.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, object))
objectMPartPathLock.Lock()
if err = objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
return toObjectErr(err, bucket, object)
}
defer objectMPartPathLock.Unlock()
// remove entry from uploads.json with quorum
@ -1138,7 +1156,9 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error
// complete-multipart-upload or put-object-part.
uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, object, uploadID))
uploadIDLock.Lock()
if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil {
return err
}
defer uploadIDLock.Unlock()
if !xl.isUploadIDExists(bucket, object, uploadID) {

@ -41,6 +41,7 @@ func log(msg ...interface{}) {
// DRWMutexAcquireTimeout - tolerance limit to wait for lock acquisition before.
const DRWMutexAcquireTimeout = 1 * time.Second // 1 second.
const drwMutexInfinite = time.Duration(1<<63 - 1)
// A DRWMutex is a distributed mutual exclusion lock.
type DRWMutex struct {
@ -79,29 +80,52 @@ func NewDRWMutex(name string) *DRWMutex {
func (dm *DRWMutex) Lock() {
isReadLock := false
dm.lockBlocking(isReadLock)
dm.lockBlocking(drwMutexInfinite, isReadLock)
}
// GetLock tries to get a write lock on dm before the timeout elapses.
//
// If the lock is already in use, the calling go routine
// blocks until either the mutex becomes available and return success or
// more time has passed than the timeout value and return false.
func (dm *DRWMutex) GetLock(timeout time.Duration) (locked bool) {
isReadLock := false
return dm.lockBlocking(timeout, isReadLock)
}
// RLock holds a read lock on dm.
//
// If one or more read lock are already in use, it will grant another lock.
// If one or more read locks are already in use, it will grant another lock.
// Otherwise the calling go routine blocks until the mutex is available.
func (dm *DRWMutex) RLock() {
isReadLock := true
dm.lockBlocking(isReadLock)
dm.lockBlocking(drwMutexInfinite, isReadLock)
}
// GetRLock tries to get a read lock on dm before the timeout elapses.
//
// If one or more read locks are already in use, it will grant another lock.
// Otherwise the calling go routine blocks until either the mutex becomes
// available and return success or more time has passed than the timeout
// value and return false.
func (dm *DRWMutex) GetRLock(timeout time.Duration) (locked bool) {
isReadLock := true
return dm.lockBlocking(timeout, isReadLock)
}
// lockBlocking will acquire either a read or a write lock
// lockBlocking will try to acquire either a read or a write lock
//
// The call will block until the lock is granted using a built-in
// timing randomized back-off algorithm to try again until successful
func (dm *DRWMutex) lockBlocking(isReadLock bool) {
doneCh := make(chan struct{})
// The function will loop using a built-in timing randomized back-off
// algorithm until either the lock is acquired successfully or more
// time has elapsed than the timeout value.
func (dm *DRWMutex) lockBlocking(timeout time.Duration, isReadLock bool) (locked bool) {
doneCh, start := make(chan struct{}), time.Now().UTC()
defer close(doneCh)
// We timed out on the previous lock, incrementally wait
// for a longer back-off time and try again afterwards.
// Use incremental back-off algorithm for repeated attempts to acquire the lock
for range newRetryTimerSimple(doneCh) {
// Create temp array on stack.
locks := make([]string, dnodeCount)
@ -122,11 +146,15 @@ func (dm *DRWMutex) lockBlocking(isReadLock bool) {
copy(dm.writeLocks, locks[:])
}
return
return true
}
if time.Now().UTC().Sub(start) >= timeout { // Are we past the timeout?
break
}
// We timed out on the previous lock, incrementally wait
// Failed to acquire the lock on this attempt, incrementally wait
// for a longer back-off time and try again afterwards.
}
return false
}
// lock tries to acquire the distributed lock, returning true or false.

@ -0,0 +1,121 @@
# lsync
Local syncing package with support for timeouts. This package offers both a `sync.Mutex` and `sync.RWMutex` compatible interface.
Additionally it provides `lsync.LFrequentAccess` which uses an atomic load and store of a consistently typed value. This can be usefull for shared data structures that are frequently read but infrequently updated (using an copy-on-write mechanism) without the need for protection with a regular mutex.
### Example of LRWMutex
```go
// Create RWMutex compatible mutex
lrwm := NewLRWMutex()
// Try to get lock within timeout
if !lrwm.GetLock(1000 * time.Millisecond) {
fmt.Println("Timeout occured")
return
}
// Acquired lock, do your stuff ...
lrwm.Unlock() // Release lock
```
### Example of LFrequentAccess
````go
type Map map[string]string
// Create new LFrequentAccess for type Map
freqaccess := NewLFrequentAccess(make(Map))
cur := freqaccess.LockBeforeSet().(Map) // Lock in order to update
mp := make(Map) // Create new Map
for k, v := range cur { // Copy over old contents
mp[k] = v
}
mp[key] = val // Add new value
freqaccess.SetNewCopyAndUnlock(mp) // Exchange old version of map with new version
mpReadOnly := freqaccess.ReadOnlyAccess().(Map) // Get read only access to Map
fmt.Println(mpReadOnly[key]) // Safe access with no further synchronization
````
## Design
The design is pretty straightforward in the sense that `lsync` tries to get a lock in a loop with an exponential [backoff](https://www.awsarchitectureblog.com/2015/03/backoff.html) algorithm. The algorithm is configurable in terms of initial delay and jitter.
If the lock is acquired before the timeout has occured, it will return success to the caller and the caller can proceed as intended. The caller must call `unlock` after the operation that is to be protected has completed in order to release the lock.
When more time has elapsed than the timeout value the lock loop will cancel out and signal back to the caller that the lock has not been acquired. In this case the caller must _not_ call `unlock` since no lock was obtained. Typically it should signal an error back up the call stack so that errors can be dealt with appropriately at the correct level.
Note that this algorithm is not 'real-time' in the sense that it will time out exactly at the timeout value, but instead a (short) while after the timeout has lapsed. It is even possible that (in edge cases) a succesful lock can be returned a very short time after the timeout has lapsed.
## API
#### LMutex
```go
func (lm *LMutex) Lock()
func (lm *LMutex) GetLock(timeout time.Duration) (locked bool)
func (lm *LMutex) Unlock()
```
#### LRWMutex
```go
func (lm *LRWMutex) Lock()
func (lm *LRWMutex) GetLock(timeout time.Duration) (locked bool)
func (lm *LRWMutex) RLock()
func (lm *LRWMutex) GetRLock(timeout time.Duration) (locked bool)
func (lm *LRWMutex) Unlock()
func (lm *LRWMutex) RUnlock()
```
#### LFrequentAccess
```go
func (lm *LFrequentAccess) ReadOnlyAccess() (constReadOnly interface{})
func (lm *LFrequentAccess) LockBeforeSet() (constCurVersion interface{})
func (lm *LFrequentAccess) SetNewCopyAndUnlock(newCopy interface{})
```
## Benchmarks
### sync.Mutex vs lsync.LMutex
(with `defaultRetryUnit` and `defaultRetryCap` at 10 microsec)
```
BenchmarkMutex-8 111 1579 +1322.52%
BenchmarkMutexSlack-8 120 1033 +760.83%
BenchmarkMutexWork-8 133 1604 +1106.02%
BenchmarkMutexWorkSlack-8 137 1038 +657.66%
```
(with `defaultRetryUnit` and `defaultRetryCap` at 1 millisec)
```
benchmark old ns/op new ns/op delta
BenchmarkMutex-8 111 2649 +2286.49%
BenchmarkMutexSlack-8 120 1719 +1332.50%
BenchmarkMutexWork-8 133 2637 +1882.71%
BenchmarkMutexWorkSlack-8 137 1729 +1162.04%
```
(with `defaultRetryUnit` and `defaultRetryCap` at 100 millisec)
```
benchmark old ns/op new ns/op delta
BenchmarkMutex-8 111 2649 +2286.49%
BenchmarkMutexSlack-8 120 2478 +1965.00%
BenchmarkMutexWork-8 133 2547 +1815.04%
BenchmarkMutexWorkSlack-8 137 2683 +1858.39%
```
### LFrequentAccess
An `lsync.LFrequentAccess` provides an atomic load and store of a consistently typed value.
```
benchmark old ns/op new ns/op delta
BenchmarkLFrequentAccessMap-8 114 4.67 -95.90%
BenchmarkLFrequentAccessSlice-8 109 5.95 -94.54%
```

@ -0,0 +1,64 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package lsync
import (
"sync"
"sync/atomic"
)
// LFrequentAccess is a synchronization mechanism for frequently read yet
// infrequently updated data structures. It uses a copy-on-write paradigm
// for updates to the data.
type LFrequentAccess struct {
state atomic.Value
writeLock sync.Mutex
locked bool
}
// NewLFrequentAccess - initializes a new LFrequentAccess.
func NewLFrequentAccess(x interface{}) *LFrequentAccess {
lm := &LFrequentAccess{}
lm.state.Store(x)
return lm
}
// ReadOnlyAccess returns the data intented for reads without further synchronization
func (lm *LFrequentAccess) ReadOnlyAccess() (constReadOnly interface{}) {
return lm.state.Load()
}
// LockBeforeSet must be called before updates of the data in order to synchronize
// with other potential writers. It returns the current version of the data that
// needs to be copied over into a new version.
func (lm *LFrequentAccess) LockBeforeSet() (constCurVersion interface{}) {
lm.writeLock.Lock()
lm.locked = true
return lm.state.Load()
}
// SetNewCopyAndUnlock updates the data with a new modified copy and unlocks
// simultaneously. Make sure to call LockBeforeSet beforehand to synchronize
// between potential parallel writers (and not lose any updated information).
func (lm *LFrequentAccess) SetNewCopyAndUnlock(newCopy interface{}) {
if !lm.locked {
panic("SetNewCopyAndUnlock: locked state is false (did you call LockBeforeSet?)")
}
lm.state.Store(newCopy)
lm.locked = false
lm.writeLock.Unlock()
}

@ -0,0 +1,78 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package lsync
import (
"sync/atomic"
"time"
)
// A LMutex is a mutual exclusion lock with timeouts.
type LMutex struct {
state int64
}
// NewLMutex - initializes a new lsync mutex.
func NewLMutex() *LMutex {
return &LMutex{}
}
// Lock holds a lock on lm.
//
// If the lock is already in use, the calling go routine
// blocks until the mutex is available.
func (lm *LMutex) Lock() {
lm.lockLoop(time.Duration(1<<63 - 1))
}
// GetLock tries to get a write lock on lm before the timeout occurs.
func (lm *LMutex) GetLock(timeout time.Duration) (locked bool) {
return lm.lockLoop(timeout)
}
// lockLoop will acquire either a read or a write lock
//
// The call will block until the lock is granted using a built-in
// timing randomized back-off algorithm to try again until successful
func (lm *LMutex) lockLoop(timeout time.Duration) bool {
doneCh, start := make(chan struct{}), time.Now().UTC()
defer close(doneCh)
// We timed out on the previous lock, incrementally wait
// for a longer back-off time and try again afterwards.
for range newRetryTimerSimple(doneCh) {
// Try to acquire the lock.
if atomic.CompareAndSwapInt64(&lm.state, NOLOCKS, WRITELOCK) {
return true
} else if time.Now().UTC().Sub(start) >= timeout { // Are we past the timeout?
break
}
// We timed out on the previous lock, incrementally wait
// for a longer back-off time and try again afterwards.
}
return false
}
// Unlock unlocks the lock.
//
// It is a run-time error if lm is not locked on entry to Unlock.
func (lm *LMutex) Unlock() {
if !atomic.CompareAndSwapInt64(&lm.state, WRITELOCK, NOLOCKS) {
panic("Trying to Unlock() while no Lock() is active")
}
}

@ -0,0 +1,180 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package lsync
import (
"sync"
"time"
)
const (
WRITELOCK = -1 + iota
NOLOCKS
READLOCKS
)
// A LRWMutex is a mutual exclusion lock with timeouts.
type LRWMutex struct {
state int64
m sync.Mutex // Mutex to prevent multiple simultaneous locks
}
// NewLRWMutex - initializes a new lsync RW mutex.
func NewLRWMutex() *LRWMutex {
return &LRWMutex{}
}
// Lock holds a write lock on lm.
//
// If the lock is already in use, the calling go routine
// blocks until the mutex is available.
func (lm *LRWMutex) Lock() {
isWriteLock := true
lm.lockLoop(time.Duration(1<<63-1), isWriteLock)
}
// GetLock tries to get a write lock on lm before the timeout occurs.
func (lm *LRWMutex) GetLock(timeout time.Duration) (locked bool) {
isWriteLock := true
return lm.lockLoop(timeout, isWriteLock)
}
// RLock holds a read lock on lm.
//
// If one or more read lock are already in use, it will grant another lock.
// Otherwise the calling go routine blocks until the mutex is available.
func (lm *LRWMutex) RLock() {
isWriteLock := false
lm.lockLoop(time.Duration(1<<63-1), isWriteLock)
}
// GetRLock tries to get a read lock on lm before the timeout occurs.
func (lm *LRWMutex) GetRLock(timeout time.Duration) (locked bool) {
isWriteLock := false
return lm.lockLoop(timeout, isWriteLock)
}
// lockLoop will acquire either a read or a write lock
//
// The call will block until the lock is granted using a built-in
// timing randomized back-off algorithm to try again until successful
func (lm *LRWMutex) lockLoop(timeout time.Duration, isWriteLock bool) bool {
doneCh, start := make(chan struct{}), time.Now().UTC()
defer close(doneCh)
// We timed out on the previous lock, incrementally wait
// for a longer back-off time and try again afterwards.
for range newRetryTimerSimple(doneCh) {
// Try to acquire the lock.
var success bool
{
lm.m.Lock()
if isWriteLock {
if lm.state == NOLOCKS {
lm.state = WRITELOCK
success = true
}
} else {
if lm.state != WRITELOCK {
lm.state += 1
success = true
}
}
lm.m.Unlock()
}
if success {
return true
}
if time.Now().UTC().Sub(start) >= timeout { // Are we past the timeout?
break
}
// We timed out on the previous lock, incrementally wait
// for a longer back-off time and try again afterwards.
}
return false
}
// Unlock unlocks the write lock.
//
// It is a run-time error if lm is not locked on entry to Unlock.
func (lm *LRWMutex) Unlock() {
isWriteLock := true
success := lm.unlock(isWriteLock)
if !success {
panic("Trying to Unlock() while no Lock() is active")
}
}
// RUnlock releases a read lock held on lm.
//
// It is a run-time error if lm is not locked on entry to RUnlock.
func (lm *LRWMutex) RUnlock() {
isWriteLock := false
success := lm.unlock(isWriteLock)
if !success {
panic("Trying to RUnlock() while no RLock() is active")
}
}
func (lm *LRWMutex) unlock(isWriteLock bool) (unlocked bool) {
lm.m.Lock()
// Try to release lock.
if isWriteLock {
if lm.state == WRITELOCK {
lm.state = NOLOCKS
unlocked = true
}
} else {
if lm.state == WRITELOCK || lm.state == NOLOCKS {
unlocked = false // unlocked called without any active read locks
} else {
lm.state -= 1
unlocked = true
}
}
lm.m.Unlock()
return unlocked
}
// ForceUnlock will forcefully clear a write or read lock.
func (lm *LRWMutex) ForceUnlock() {
lm.m.Lock()
lm.state = NOLOCKS
lm.m.Unlock()
}
// DRLocker returns a sync.Locker interface that implements
// the Lock and Unlock methods by calling drw.RLock and drw.RUnlock.
func (dm *LRWMutex) DRLocker() sync.Locker {
return (*drlocker)(dm)
}
type drlocker LRWMutex
func (dr *drlocker) Lock() { (*LRWMutex)(dr).RLock() }
func (dr *drlocker) Unlock() { (*LRWMutex)(dr).RUnlock() }

@ -0,0 +1,142 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package lsync
import (
"math/rand"
"sync"
"time"
)
// lockedRandSource provides protected rand source, implements rand.Source interface.
type lockedRandSource struct {
lk sync.Mutex
src rand.Source
}
// Int63 returns a non-negative pseudo-random 63-bit integer as an
// int64.
func (r *lockedRandSource) Int63() (n int64) {
r.lk.Lock()
n = r.src.Int63()
r.lk.Unlock()
return
}
// Seed uses the provided seed value to initialize the generator to a
// deterministic state.
func (r *lockedRandSource) Seed(seed int64) {
r.lk.Lock()
r.src.Seed(seed)
r.lk.Unlock()
}
// MaxJitter will randomize over the full exponential backoff time
const MaxJitter = 1.0
// NoJitter disables the use of jitter for randomizing the
// exponential backoff time
const NoJitter = 0.0
// Global random source for fetching random values.
var globalRandomSource = rand.New(&lockedRandSource{
src: rand.NewSource(time.Now().UTC().UnixNano()),
})
// newRetryTimerJitter creates a timer with exponentially increasing delays
// until the maximum retry attempts are reached. - this function is a fully
// configurable version, meant for only advanced use cases. For the most part
// one should use newRetryTimerSimple and newRetryTimer.
func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float64, doneCh chan struct{}) <-chan int {
attemptCh := make(chan int)
// normalize jitter to the range [0, 1.0]
if jitter < NoJitter {
jitter = NoJitter
}
if jitter > MaxJitter {
jitter = MaxJitter
}
// computes the exponential backoff duration according to
// https://www.awsarchitectureblog.com/2015/03/backoff.html
exponentialBackoffWait := func(attempt int) time.Duration {
// 1<<uint(attempt) below could overflow, so limit the value of attempt
maxAttempt := 30
if attempt > maxAttempt {
attempt = maxAttempt
}
//sleep = random_between(0, min(cap, base * 2 ** attempt))
sleep := unit * time.Duration(1<<uint(attempt))
if sleep > cap {
sleep = cap
}
if jitter != NoJitter {
sleep -= time.Duration(globalRandomSource.Float64() * float64(sleep) * jitter)
}
return sleep
}
go func() {
defer close(attemptCh)
nextBackoff := 0
// Channel used to signal after the expiry of backoff wait seconds.
var timer *time.Timer
for {
select { // Attempts starts.
case attemptCh <- nextBackoff:
nextBackoff++
case <-doneCh:
// Stop the routine.
return
}
timer = time.NewTimer(exponentialBackoffWait(nextBackoff))
// wait till next backoff time or till doneCh gets a message.
select {
case <-timer.C:
case <-doneCh:
// stop the timer and return.
timer.Stop()
return
}
}
}()
// Start reading..
return attemptCh
}
// Default retry constants.
const (
defaultRetryUnit = 10 * time.Millisecond // 10 millisecond.
defaultRetryCap = 10 * time.Millisecond // 10 millisecond.
)
// newRetryTimer creates a timer with exponentially increasing delays
// until the maximum retry attempts are reached. - this function provides
// resulting retry values to be of maximum jitter.
func newRetryTimer(unit time.Duration, cap time.Duration, doneCh chan struct{}) <-chan int {
return newRetryTimerWithJitter(unit, cap, MaxJitter, doneCh)
}
// newRetryTimerSimple creates a timer with exponentially increasing delays
// until the maximum retry attempts are reached. - this function is a
// simpler version with all default values.
func newRetryTimerSimple(doneCh chan struct{}) <-chan int {
return newRetryTimerWithJitter(defaultRetryUnit, defaultRetryCap, MaxJitter, doneCh)
}

12
vendor/vendor.json vendored

@ -290,16 +290,22 @@
"revisionTime": "2017-02-27T07:32:28Z"
},
{
"checksumSHA1": "vrIbl0L+RLwyPRCxMss5+eZtADE=",
"checksumSHA1": "hQ8i4UPTbFW68oPJP3uFxYTLfxk=",
"path": "github.com/minio/dsync",
"revision": "535db94aebce49cacce4de9c6f5f5821601281cd",
"revisionTime": "2017-04-19T20:41:15Z"
"revision": "a26b9de6c8006208d10a9517720d3212b42c374e",
"revisionTime": "2017-05-25T17:53:53Z"
},
{
"path": "github.com/minio/go-homedir",
"revision": "0b1069c753c94b3633cc06a1995252dbcc27c7a6",
"revisionTime": "2016-02-15T17:25:11+05:30"
},
{
"checksumSHA1": "7/Hdd23/j4/yt4BXa+h0kqz1yjw=",
"path": "github.com/minio/lsync",
"revision": "2d7c40f41402df6f0713a749a011cddc12d1b2f3",
"revisionTime": "2017-08-09T21:08:26Z"
},
{
"path": "github.com/minio/mc/pkg/console",
"revision": "db6b4f13442b26995f04b3b2b31b006cae7786e6",

Loading…
Cancel
Save