fix: release locks if the client timedout (#11030)

situations where client indeed timedout there was
a potential to falsely think that lock is still
active.
master
Harshavardhana 4 years ago committed by GitHub
parent a896125490
commit ee2a436a5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      pkg/dsync/drwmutex.go

@ -215,7 +215,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
ch := make(chan Granted, len(restClnts)) ch := make(chan Granted, len(restClnts))
var wg sync.WaitGroup var wg sync.WaitGroup
// Combined timout for the lock attempt. // Combined timeout for the lock attempt.
ctx, cancel := context.WithTimeout(ctx, DRWMutexAcquireTimeout) ctx, cancel := context.WithTimeout(ctx, DRWMutexAcquireTimeout)
defer cancel() defer cancel()
for index, c := range restClnts { for index, c := range restClnts {
@ -282,8 +282,13 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
} }
} }
case <-ctx.Done(): case <-ctx.Done():
done = true // Capture timedout locks as failed or took too long
log("Timeout\n") locksFailed++
if locksFailed > tolerance {
// We know that we are not going to get the lock anymore,
// so exit out and release any locks that did get acquired
done = true
}
} }
if done { if done {
@ -291,10 +296,9 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
} }
} }
// Count locks in order to determine whether we have quorum or not
quorumLocked := checkQuorumLocked(locks, quorum) && locksFailed <= tolerance quorumLocked := checkQuorumLocked(locks, quorum) && locksFailed <= tolerance
if !quorumLocked { if !quorumLocked {
log("Quorum not met\n") log("Releasing all acquired locks now abandoned after quorum was not met\n")
releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...) releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...)
} }

Loading…
Cancel
Save