block unlocks if there are quorum failures (#10582)

fixes #10418
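Before this change, Unlock and RUnlock were fire-and-forget: releases were broadcast once through the old unlock helper and any failure was only logged. Now sendRelease reports whether a release actually succeeded, releaseAll fans out to all lockers in parallel and verifies that a quorum of locks was cleared, and both unlock paths block, retrying with a randomized sleep of up to lockRetryInterval (raised from 50ms to 100ms), until the unlock quorum is met.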
Branch: master
Author: Harshavardhana (committed by GitHub)
Parent: 209680e89f
Commit: 849fcf0127
1 changed file, 98 lines changed: pkg/dsync/drwmutex.go

@@ -133,7 +133,7 @@ func (dm *DRWMutex) GetRLock(ctx context.Context, id, source string, opts Option
 }
 
 const (
-	lockRetryInterval = 50 * time.Millisecond
+	lockRetryInterval = 100 * time.Millisecond
 )
 
 // lockBlocking will try to acquire either a read or a write lock
@@ -259,7 +259,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 		}(index, isReadLock, c)
 	}
 
-	quorumMet := false
+	quorumLocked := false
 
 	wg.Add(1)
 	go func(isReadLock bool) {
@@ -289,16 +289,16 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 					done = true
 					// Increment the number of grants received from the buffered channel.
 					i++
-					releaseAll(ds, owner, locks, isReadLock, restClnts, lockNames...)
+					releaseAll(ds, quorum, owner, locks, isReadLock, restClnts, lockNames...)
 				}
 			}
 		case <-timeout:
 			done = true
 			// timeout happened, maybe one of the nodes is slow, count
 			// number of locks to check whether we have quorum or not
-			if !checkQuorumMet(locks, quorum) {
+			if !checkQuorumLocked(locks, quorum) {
 				log("Quorum not met after timeout\n")
-				releaseAll(ds, owner, locks, isReadLock, restClnts, lockNames...)
+				releaseAll(ds, quorum, owner, locks, isReadLock, restClnts, lockNames...)
 			} else {
 				log("Quorum met after timeout\n")
 			}
@@ -310,7 +310,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 		}
 
 		// Count locks in order to determine whether we have quorum or not
-		quorumMet = checkQuorumMet(locks, quorum)
+		quorumLocked = checkQuorumLocked(locks, quorum)
 
 		// Signal that we have the quorum
 		wg.Done()
@@ -331,11 +331,22 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 	wg.Wait()
 
-	return quorumMet
+	return quorumLocked
 }
 
-// checkQuorumMet determines whether we have acquired the required quorum of underlying locks or not
-func checkQuorumMet(locks *[]string, quorum int) bool {
+// checkQuorumUnlocked determines whether we have unlocked the required quorum of underlying locks or not
+func checkQuorumUnlocked(locks *[]string, quorum int) bool {
+	count := 0
+	for _, uid := range *locks {
+		if uid == "" {
+			count++
+		}
+	}
+	return count >= quorum
+}
+
+// checkQuorumLocked determines whether we have locked the required quorum of underlying locks or not
+func checkQuorumLocked(locks *[]string, quorum int) bool {
 	count := 0
 	for _, uid := range *locks {
 		if isLocked(uid) {
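The two quorum predicates can be exercised standalone. Below is a minimal sketch, not part of the patch, that assumes isLocked simply reports a non-empty lock UID (consistent with checkQuorumUnlocked treating empty entries as released) and that checkQuorumLocked's truncated body mirrors its counterpart:

package main

import "fmt"

// isLocked models dsync's helper: a slot is held while it still carries a UID.
func isLocked(uid string) bool { return uid != "" }

// checkQuorumLocked reports whether at least `quorum` underlying locks are held.
func checkQuorumLocked(locks *[]string, quorum int) bool {
	count := 0
	for _, uid := range *locks {
		if isLocked(uid) {
			count++
		}
	}
	return count >= quorum
}

// checkQuorumUnlocked reports whether at least `quorum` underlying locks are released.
func checkQuorumUnlocked(locks *[]string, quorum int) bool {
	count := 0
	for _, uid := range *locks {
		if uid == "" {
			count++
		}
	}
	return count >= quorum
}

func main() {
	locks := []string{"uid-1", "", "uid-3", ""} // 2 held, 2 released
	fmt.Println(checkQuorumLocked(&locks, 3))   // false
	fmt.Println(checkQuorumUnlocked(&locks, 2)) // true
}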
@@ -347,13 +358,21 @@ func checkQuorumMet(locks *[]string, quorum int) bool {
 }
 
 // releaseAll releases all locks that are marked as locked
-func releaseAll(ds *Dsync, owner string, locks *[]string, isReadLock bool, restClnts []NetLocker, lockNames ...string) {
-	for lock := range restClnts {
-		if isLocked((*locks)[lock]) {
-			sendRelease(ds, restClnts[lock], owner, (*locks)[lock], isReadLock, lockNames...)
-			(*locks)[lock] = ""
-		}
+func releaseAll(ds *Dsync, quorum int, owner string, locks *[]string, isReadLock bool, restClnts []NetLocker, lockNames ...string) bool {
+	var wg sync.WaitGroup
+	for lockID := range restClnts {
+		wg.Add(1)
+		go func(lockID int) {
+			defer wg.Done()
+			if isLocked((*locks)[lockID]) {
+				if sendRelease(ds, restClnts[lockID], owner, (*locks)[lockID], isReadLock, lockNames...) {
+					(*locks)[lockID] = ""
+				}
+			}
+		}(lockID)
 	}
+	wg.Wait()
+
+	return checkQuorumUnlocked(locks, quorum)
 }
 
 // Unlock unlocks the write lock.
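The rewritten releaseAll is the heart of the fix: releases now run concurrently, a slot is cleared only when the locker confirms, and the caller learns whether an unlock quorum was reached. A simplified, self-contained sketch of that fan-out/fan-in shape, with the network call replaced by a stub:

package main

import (
	"fmt"
	"sync"
)

// sendRelease is stubbed: pretend even-numbered lockers acknowledge the
// release and odd-numbered ones are unreachable.
func sendRelease(lockID int) bool { return lockID%2 == 0 }

// releaseAll fans out one goroutine per locker, clears each slot only on a
// confirmed release, and reports whether a quorum of slots ended up cleared.
// Each goroutine writes a distinct index, so there is no data race.
func releaseAll(locks []string, quorum int) bool {
	var wg sync.WaitGroup
	for lockID := range locks {
		wg.Add(1)
		go func(lockID int) {
			defer wg.Done()
			if locks[lockID] != "" && sendRelease(lockID) {
				locks[lockID] = ""
			}
		}(lockID)
	}
	wg.Wait()

	count := 0
	for _, uid := range locks {
		if uid == "" {
			count++
		}
	}
	return count >= quorum
}

func main() {
	locks := []string{"uid-0", "uid-1", "uid-2", "uid-3"}
	fmt.Println(releaseAll(locks, 3)) // false: only lockers 0 and 2 confirm
}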
@@ -383,12 +402,27 @@ func (dm *DRWMutex) Unlock() {
 
 		// Copy write locks to stack array
 		copy(locks, dm.writeLocks[:])
-
-		// Clear write locks array
-		dm.writeLocks = make([]string, len(restClnts))
 	}
 
+	// Tolerance is not set, defaults to half of the locker clients.
+	tolerance := len(restClnts) / 2
+
+	// Quorum is effectively = total clients subtracted with tolerance limit
+	quorum := len(restClnts) - tolerance
+
+	// In situations for write locks, as a special case
+	// to avoid split brains we make sure to acquire
+	// quorum + 1 when tolerance is exactly half of the
+	// total locker clients.
+	if quorum == tolerance {
+		quorum++
+	}
+
 	isReadLock := false
-	unlock(dm.clnt, owner, locks, isReadLock, restClnts, dm.Names...)
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	for !releaseAll(dm.clnt, quorum, owner, &locks, isReadLock, restClnts, dm.Names...) {
+		time.Sleep(time.Duration(r.Float64() * float64(lockRetryInterval)))
+	}
 }
 
 // RUnlock releases a read lock held on dm.
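The quorum arithmetic deserves a worked example: with 4 lockers, tolerance = 4/2 = 2 and quorum = 4 - 2 = 2; since quorum == tolerance, the write path bumps quorum to 3 so that two writers can never both hold a majority. With 5 lockers, tolerance = 2 and quorum = 3, and no bump is needed. A tiny sketch of the arithmetic (writeQuorum is a hypothetical helper name, not part of the patch):

package main

import "fmt"

// writeQuorum mirrors the arithmetic in Unlock: half the lockers may fail,
// and ties are broken upward for write locks to avoid split brain.
func writeQuorum(lockers int) int {
	tolerance := lockers / 2
	quorum := lockers - tolerance
	if quorum == tolerance { // only possible when lockers is even
		quorum++
	}
	return quorum
}

func main() {
	for _, n := range []int{4, 5, 8} {
		fmt.Println(n, "lockers ->", writeQuorum(n)) // 4->3, 5->3, 8->5
	}
}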
@@ -412,29 +446,24 @@ func (dm *DRWMutex) RUnlock() {
 		dm.readersLocks = dm.readersLocks[1:]
 	}
 
-	isReadLock := true
-	unlock(dm.clnt, owner, locks, isReadLock, restClnts, dm.Names...)
-}
-
-func unlock(ds *Dsync, owner string, locks []string, isReadLock bool, restClnts []NetLocker, names ...string) {
-
-	// We don't need to synchronously wait until we have released all the locks (or the quorum)
-	// (a subsequent lock will retry automatically in case it would fail to get quorum)
-
-	for index, c := range restClnts {
-
-		if isLocked(locks[index]) {
-			// broadcast lock release to all nodes that granted the lock
-			sendRelease(ds, c, owner, locks[index], isReadLock, names...)
-		}
-	}
-}
+	// Tolerance is not set, defaults to half of the locker clients.
+	tolerance := len(restClnts) / 2
+
+	// Quorum is effectively = total clients subtracted with tolerance limit
+	quorum := len(restClnts) - tolerance
+
+	isReadLock := true
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	for !releaseAll(dm.clnt, quorum, owner, &locks, isReadLock, restClnts, dm.Names...) {
+		time.Sleep(time.Duration(r.Float64() * float64(lockRetryInterval)))
+	}
+}
 
 // sendRelease sends a release message to a node that previously granted a lock
-func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bool, names ...string) {
+func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bool, names ...string) bool {
 	if c == nil {
 		log("Unable to call RUnlock failed with %s\n", errors.New("netLocker is offline"))
-		return
+		return false
 	}
 
 	args := LockArgs{
@@ -442,13 +471,18 @@ func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bo
 		UID:       uid,
 		Resources: names,
 	}
 
 	if isReadLock {
 		if _, err := c.RUnlock(args); err != nil {
 			log("dsync: Unable to call RUnlock failed with %s for %#v at %s\n", err, args, c)
+			return false
 		}
 	} else {
 		if _, err := c.Unlock(args); err != nil {
 			log("dsync: Unable to call Unlock failed with %s for %#v at %s\n", err, args, c)
+			return false
 		}
 	}
+
+	return true
 }
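Both unlock paths above spin on releaseAll with a randomized sleep, so competing callers do not retry in lockstep. The pattern in isolation, a minimal sketch with the release attempt stubbed to fail twice:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

const lockRetryInterval = 100 * time.Millisecond

func main() {
	// Stub: fail the first two release attempts, as if quorum were lost.
	attempts := 0
	releaseAll := func() bool {
		attempts++
		return attempts > 2
	}

	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	for !releaseAll() {
		// Sleep a random duration in [0, lockRetryInterval) before retrying.
		time.Sleep(time.Duration(r.Float64() * float64(lockRetryInterval)))
	}
	fmt.Println("released after", attempts, "attempts")
}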
