|
|
|
@ -86,7 +86,15 @@ func NewDRWMutex(clnt *Dsync, names ...string) *DRWMutex { |
|
|
|
|
func (dm *DRWMutex) Lock(id, source string) { |
|
|
|
|
|
|
|
|
|
isReadLock := false |
|
|
|
|
dm.lockBlocking(context.Background(), drwMutexInfinite, id, source, isReadLock) |
|
|
|
|
dm.lockBlocking(context.Background(), id, source, isReadLock, Options{ |
|
|
|
|
Timeout: drwMutexInfinite, |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Options lock options.
|
|
|
|
|
// Options lock options used by GetLock/GetRLock to control acquisition.
type Options struct { |
|
|
|
// Timeout bounds how long acquisition may block; it is passed to
// context.WithTimeout around the retry loop. Lock/RLock pass
// drwMutexInfinite here to block indefinitely.
Timeout time.Duration |
|
|
|
// Tolerance is the number of locker clients that may fail to grant
// the lock while still considering it acquired. A value of 0 defaults
// to half of the locker clients (len(restClnts) / 2).
Tolerance int |
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetLock tries to get a write lock on dm before the timeout elapses.
|
|
|
|
@ -94,10 +102,10 @@ func (dm *DRWMutex) Lock(id, source string) { |
|
|
|
|
// If the lock is already in use, the calling go routine
|
|
|
|
|
// blocks until either the mutex becomes available and return success or
|
|
|
|
|
// more time has passed than the timeout value and return false.
|
|
|
|
|
func (dm *DRWMutex) GetLock(ctx context.Context, id, source string, timeout time.Duration) (locked bool) { |
|
|
|
|
func (dm *DRWMutex) GetLock(ctx context.Context, id, source string, opts Options) (locked bool) { |
|
|
|
|
|
|
|
|
|
isReadLock := false |
|
|
|
|
return dm.lockBlocking(ctx, timeout, id, source, isReadLock) |
|
|
|
|
return dm.lockBlocking(ctx, id, source, isReadLock, opts) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// RLock holds a read lock on dm.
|
|
|
|
@ -107,7 +115,9 @@ func (dm *DRWMutex) GetLock(ctx context.Context, id, source string, timeout time |
|
|
|
|
func (dm *DRWMutex) RLock(id, source string) { |
|
|
|
|
|
|
|
|
|
isReadLock := true |
|
|
|
|
dm.lockBlocking(context.Background(), drwMutexInfinite, id, source, isReadLock) |
|
|
|
|
dm.lockBlocking(context.Background(), id, source, isReadLock, Options{ |
|
|
|
|
Timeout: drwMutexInfinite, |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetRLock tries to get a read lock on dm before the timeout elapses.
|
|
|
|
@ -116,10 +126,10 @@ func (dm *DRWMutex) RLock(id, source string) { |
|
|
|
|
// Otherwise the calling go routine blocks until either the mutex becomes
|
|
|
|
|
// available and return success or more time has passed than the timeout
|
|
|
|
|
// value and return false.
|
|
|
|
|
func (dm *DRWMutex) GetRLock(ctx context.Context, id, source string, timeout time.Duration) (locked bool) { |
|
|
|
|
func (dm *DRWMutex) GetRLock(ctx context.Context, id, source string, opts Options) (locked bool) { |
|
|
|
|
|
|
|
|
|
isReadLock := true |
|
|
|
|
return dm.lockBlocking(ctx, timeout, id, source, isReadLock) |
|
|
|
|
return dm.lockBlocking(ctx, id, source, isReadLock, opts) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// lockBlocking will try to acquire either a read or a write lock
|
|
|
|
@ -127,10 +137,10 @@ func (dm *DRWMutex) GetRLock(ctx context.Context, id, source string, timeout tim |
|
|
|
|
// The function will loop using a built-in timing randomized back-off
|
|
|
|
|
// algorithm until either the lock is acquired successfully or more
|
|
|
|
|
// time has elapsed than the timeout value.
|
|
|
|
|
func (dm *DRWMutex) lockBlocking(ctx context.Context, timeout time.Duration, id, source string, isReadLock bool) (locked bool) { |
|
|
|
|
func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadLock bool, opts Options) (locked bool) { |
|
|
|
|
restClnts := dm.clnt.GetLockersFn() |
|
|
|
|
|
|
|
|
|
retryCtx, cancel := context.WithTimeout(ctx, timeout) |
|
|
|
|
retryCtx, cancel := context.WithTimeout(ctx, opts.Timeout) |
|
|
|
|
|
|
|
|
|
defer cancel() |
|
|
|
|
|
|
|
|
@ -140,7 +150,7 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, timeout time.Duration, id, |
|
|
|
|
locks := make([]string, len(restClnts)) |
|
|
|
|
|
|
|
|
|
// Try to acquire the lock.
|
|
|
|
|
locked = lock(retryCtx, dm.clnt, &locks, id, source, isReadLock, dm.Names...) |
|
|
|
|
locked = lock(retryCtx, dm.clnt, &locks, id, source, isReadLock, opts.Tolerance, dm.Names...) |
|
|
|
|
if !locked { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
@ -167,10 +177,29 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, timeout time.Duration, id, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// lock tries to acquire the distributed lock, returning true or false.
|
|
|
|
|
func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, isReadLock bool, lockNames ...string) bool { |
|
|
|
|
func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, isReadLock bool, tolerance int, lockNames ...string) bool { |
|
|
|
|
|
|
|
|
|
restClnts := ds.GetLockersFn() |
|
|
|
|
|
|
|
|
|
// Tolerance is not set, defaults to half of the locker clients.
|
|
|
|
|
if tolerance == 0 { |
|
|
|
|
tolerance = len(restClnts) / 2 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Quorum is effectively = total clients subtracted with tolerance limit
|
|
|
|
|
quorum := len(restClnts) - tolerance |
|
|
|
|
if !isReadLock { |
|
|
|
|
// In situations for write locks, as a special case
|
|
|
|
|
// to avoid split brains we make sure to acquire
|
|
|
|
|
// quorum + 1 when tolerance is exactly half of the
|
|
|
|
|
// total locker clients.
|
|
|
|
|
if quorum == tolerance { |
|
|
|
|
quorum++ |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
tolerance = len(restClnts) - quorum |
|
|
|
|
|
|
|
|
|
// Create buffered channel of size equal to total number of nodes.
|
|
|
|
|
ch := make(chan Granted, len(restClnts)) |
|
|
|
|
defer close(ch) |
|
|
|
@ -217,7 +246,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is |
|
|
|
|
}(index, isReadLock, c) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
quorum := false |
|
|
|
|
quorumMet := false |
|
|
|
|
|
|
|
|
|
wg.Add(1) |
|
|
|
|
go func(isReadLock bool) { |
|
|
|
@ -232,9 +261,6 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is |
|
|
|
|
done := false |
|
|
|
|
timeout := time.After(DRWMutexAcquireTimeout) |
|
|
|
|
|
|
|
|
|
dquorumReads := (len(restClnts) + 1) / 2 |
|
|
|
|
dquorum := dquorumReads + 1 |
|
|
|
|
|
|
|
|
|
for ; i < len(restClnts); i++ { // Loop until we acquired all locks
|
|
|
|
|
|
|
|
|
|
select { |
|
|
|
@ -244,8 +270,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is |
|
|
|
|
(*locks)[grant.index] = grant.lockUID |
|
|
|
|
} else { |
|
|
|
|
locksFailed++ |
|
|
|
|
if !isReadLock && locksFailed > len(restClnts)-dquorum || |
|
|
|
|
isReadLock && locksFailed > len(restClnts)-dquorumReads { |
|
|
|
|
if locksFailed > tolerance { |
|
|
|
|
// We know that we are not going to get the lock anymore,
|
|
|
|
|
// so exit out and release any locks that did get acquired
|
|
|
|
|
done = true |
|
|
|
@ -258,7 +283,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is |
|
|
|
|
done = true |
|
|
|
|
// timeout happened, maybe one of the nodes is slow, count
|
|
|
|
|
// number of locks to check whether we have quorum or not
|
|
|
|
|
if !quorumMet(locks, isReadLock, dquorum, dquorumReads) { |
|
|
|
|
if !checkQuorumMet(locks, quorum) { |
|
|
|
|
log("Quorum not met after timeout\n") |
|
|
|
|
releaseAll(ds, locks, isReadLock, restClnts, lockNames...) |
|
|
|
|
} else { |
|
|
|
@ -272,7 +297,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Count locks in order to determine whether we have quorum or not
|
|
|
|
|
quorum = quorumMet(locks, isReadLock, dquorum, dquorumReads) |
|
|
|
|
quorumMet = checkQuorumMet(locks, quorum) |
|
|
|
|
|
|
|
|
|
// Signal that we have the quorum
|
|
|
|
|
wg.Done() |
|
|
|
@ -292,12 +317,11 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is |
|
|
|
|
|
|
|
|
|
wg.Wait() |
|
|
|
|
|
|
|
|
|
return quorum |
|
|
|
|
return quorumMet |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// quorumMet determines whether we have acquired the required quorum of underlying locks or not
|
|
|
|
|
func quorumMet(locks *[]string, isReadLock bool, quorum, quorumReads int) bool { |
|
|
|
|
|
|
|
|
|
// checkQuorumMet determines whether we have acquired the required quorum of underlying locks or not
|
|
|
|
|
func checkQuorumMet(locks *[]string, quorum int) bool { |
|
|
|
|
count := 0 |
|
|
|
|
for _, uid := range *locks { |
|
|
|
|
if isLocked(uid) { |
|
|
|
@ -305,14 +329,7 @@ func quorumMet(locks *[]string, isReadLock bool, quorum, quorumReads int) bool { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var metQuorum bool |
|
|
|
|
if isReadLock { |
|
|
|
|
metQuorum = count >= quorumReads |
|
|
|
|
} else { |
|
|
|
|
metQuorum = count >= quorum |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return metQuorum |
|
|
|
|
return count >= quorum |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// releaseAll releases all locks that are marked as locked
|
|
|
|
|