diff --git a/pkg/dsync/drwmutex.go b/pkg/dsync/drwmutex.go index 8731e3537..054630332 100644 --- a/pkg/dsync/drwmutex.go +++ b/pkg/dsync/drwmutex.go @@ -133,7 +133,7 @@ func (dm *DRWMutex) GetRLock(ctx context.Context, id, source string, opts Option } const ( - lockRetryInterval = 50 * time.Millisecond + lockRetryInterval = 100 * time.Millisecond ) // lockBlocking will try to acquire either a read or a write lock @@ -259,7 +259,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is }(index, isReadLock, c) } - quorumMet := false + quorumLocked := false wg.Add(1) go func(isReadLock bool) { @@ -289,16 +289,16 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is done = true // Increment the number of grants received from the buffered channel. i++ - releaseAll(ds, owner, locks, isReadLock, restClnts, lockNames...) + releaseAll(ds, quorum, owner, locks, isReadLock, restClnts, lockNames...) } } case <-timeout: done = true // timeout happened, maybe one of the nodes is slow, count // number of locks to check whether we have quorum or not - if !checkQuorumMet(locks, quorum) { + if !checkQuorumLocked(locks, quorum) { log("Quorum not met after timeout\n") - releaseAll(ds, owner, locks, isReadLock, restClnts, lockNames...) + releaseAll(ds, quorum, owner, locks, isReadLock, restClnts, lockNames...) } else { log("Quorum met after timeout\n") } @@ -310,7 +310,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is } // Count locks in order to determine whether we have quorum or not - quorumMet = checkQuorumMet(locks, quorum) + quorumLocked = checkQuorumLocked(locks, quorum) // Signal that we have the quorum wg.Done() @@ -331,11 +331,22 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is wg.Wait() - return quorumMet + return quorumLocked } -// checkQuorumMet determines whether we have acquired the required quorum of underlying locks or not -func checkQuorumMet(locks *[]string, quorum int) bool { +// checkQuorumUnlocked determines whether we have unlocked the required quorum of underlying locks or not +func checkQuorumUnlocked(locks *[]string, quorum int) bool { + count := 0 + for _, uid := range *locks { + if uid == "" { + count++ + } + } + return count >= quorum +} + +// checkQuorumLocked determines whether we have locked the required quorum of underlying locks or not +func checkQuorumLocked(locks *[]string, quorum int) bool { count := 0 for _, uid := range *locks { if isLocked(uid) { @@ -347,13 +358,21 @@ func checkQuorumMet(locks *[]string, quorum int) bool { } // releaseAll releases all locks that are marked as locked -func releaseAll(ds *Dsync, owner string, locks *[]string, isReadLock bool, restClnts []NetLocker, lockNames ...string) { - for lock := range restClnts { - if isLocked((*locks)[lock]) { - sendRelease(ds, restClnts[lock], owner, (*locks)[lock], isReadLock, lockNames...) - (*locks)[lock] = "" - } +func releaseAll(ds *Dsync, quorum int, owner string, locks *[]string, isReadLock bool, restClnts []NetLocker, lockNames ...string) bool { + var wg sync.WaitGroup + for lockID := range restClnts { + wg.Add(1) + go func(lockID int) { + defer wg.Done() + if isLocked((*locks)[lockID]) { + if sendRelease(ds, restClnts[lockID], owner, (*locks)[lockID], isReadLock, lockNames...) { + (*locks)[lockID] = "" + } + } + }(lockID) } + wg.Wait() + return checkQuorumUnlocked(locks, quorum) } // Unlock unlocks the write lock. @@ -383,12 +402,27 @@ func (dm *DRWMutex) Unlock() { // Copy write locks to stack array copy(locks, dm.writeLocks[:]) - // Clear write locks array - dm.writeLocks = make([]string, len(restClnts)) + } + + // Tolerance is not set, defaults to half of the locker clients. + tolerance := len(restClnts) / 2 + + // Quorum is effectively = total clients subtracted with tolerance limit + quorum := len(restClnts) - tolerance + + // In situations for write locks, as a special case + // to avoid split brains we make sure to acquire + // quorum + 1 when tolerance is exactly half of the + // total locker clients. + if quorum == tolerance { + quorum++ } isReadLock := false - unlock(dm.clnt, owner, locks, isReadLock, restClnts, dm.Names...) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + for !releaseAll(dm.clnt, quorum, owner, &locks, isReadLock, restClnts, dm.Names...) { + time.Sleep(time.Duration(r.Float64() * float64(lockRetryInterval))) + } } // RUnlock releases a read lock held on dm. @@ -412,29 +446,24 @@ func (dm *DRWMutex) RUnlock() { dm.readersLocks = dm.readersLocks[1:] } - isReadLock := true - unlock(dm.clnt, owner, locks, isReadLock, restClnts, dm.Names...) -} - -func unlock(ds *Dsync, owner string, locks []string, isReadLock bool, restClnts []NetLocker, names ...string) { - - // We don't need to synchronously wait until we have released all the locks (or the quorum) - // (a subsequent lock will retry automatically in case it would fail to get quorum) + // Tolerance is not set, defaults to half of the locker clients. + tolerance := len(restClnts) / 2 - for index, c := range restClnts { + // Quorum is effectively = total clients subtracted with tolerance limit + quorum := len(restClnts) - tolerance - if isLocked(locks[index]) { - // broadcast lock release to all nodes that granted the lock - sendRelease(ds, c, owner, locks[index], isReadLock, names...) - } + isReadLock := true + r := rand.New(rand.NewSource(time.Now().UnixNano())) + for !releaseAll(dm.clnt, quorum, owner, &locks, isReadLock, restClnts, dm.Names...) { + time.Sleep(time.Duration(r.Float64() * float64(lockRetryInterval))) } } // sendRelease sends a release message to a node that previously granted a lock -func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bool, names ...string) { +func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bool, names ...string) bool { if c == nil { log("Unable to call RUnlock failed with %s\n", errors.New("netLocker is offline")) - return + return false } args := LockArgs{ @@ -442,13 +471,18 @@ func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bo UID: uid, Resources: names, } + if isReadLock { if _, err := c.RUnlock(args); err != nil { log("dsync: Unable to call RUnlock failed with %s for %#v at %s\n", err, args, c) + return false } } else { if _, err := c.Unlock(args); err != nil { log("dsync: Unable to call Unlock failed with %s for %#v at %s\n", err, args, c) + return false } } + + return true }