|
|
|
@ -17,27 +17,36 @@ |
|
|
|
|
package dsync |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"log" |
|
|
|
|
"math" |
|
|
|
|
"math/rand" |
|
|
|
|
"net" |
|
|
|
|
"os" |
|
|
|
|
"sync" |
|
|
|
|
"time" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
const DRWMutexAcquireTimeout = 25 * time.Millisecond |
|
|
|
|
// Indicator if logging is enabled.
|
|
|
|
|
var dsyncLog bool |
|
|
|
|
|
|
|
|
|
func init() { |
|
|
|
|
// Check for DSYNC_LOG env variable, if set logging will be enabled for failed RPC operations.
|
|
|
|
|
dsyncLog = os.Getenv("DSYNC_LOG") == "1" |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// DRWMutexAcquireTimeout - tolerance limit to wait for lock acquisition before.
|
|
|
|
|
const DRWMutexAcquireTimeout = 25 * time.Millisecond // 25ms.
|
|
|
|
|
|
|
|
|
|
// A DRWMutex is a distributed mutual exclusion lock.
|
|
|
|
|
type DRWMutex struct { |
|
|
|
|
Name string |
|
|
|
|
locks []bool // Array of nodes that granted a lock
|
|
|
|
|
uids []string // Array of uids for verification of sending correct release messages
|
|
|
|
|
m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type Granted struct { |
|
|
|
|
index int |
|
|
|
|
locked bool |
|
|
|
|
uid string |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type LockArgs struct { |
|
|
|
@ -58,7 +67,6 @@ func NewDRWMutex(name string) *DRWMutex { |
|
|
|
|
return &DRWMutex{ |
|
|
|
|
Name: name, |
|
|
|
|
locks: make([]bool, dnodeCount), |
|
|
|
|
uids: make([]string, dnodeCount), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -78,15 +86,13 @@ func (dm *DRWMutex) RLock() { |
|
|
|
|
|
|
|
|
|
// create temp arrays on stack
|
|
|
|
|
locks := make([]bool, dnodeCount) |
|
|
|
|
ids := make([]string, dnodeCount) |
|
|
|
|
|
|
|
|
|
// try to acquire the lock
|
|
|
|
|
isReadLock := true |
|
|
|
|
success := lock(clnts, &locks, &ids, dm.Name, isReadLock) |
|
|
|
|
success := lock(clnts, &locks, dm.Name, isReadLock) |
|
|
|
|
if success { |
|
|
|
|
// if success, copy array to object
|
|
|
|
|
copy(dm.locks, locks[:]) |
|
|
|
|
copy(dm.uids, ids[:]) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -121,15 +127,13 @@ func (dm *DRWMutex) Lock() { |
|
|
|
|
for { |
|
|
|
|
// create temp arrays on stack
|
|
|
|
|
locks := make([]bool, dnodeCount) |
|
|
|
|
ids := make([]string, dnodeCount) |
|
|
|
|
|
|
|
|
|
// try to acquire the lock
|
|
|
|
|
isReadLock := false |
|
|
|
|
success := lock(clnts, &locks, &ids, dm.Name, isReadLock) |
|
|
|
|
success := lock(clnts, &locks, dm.Name, isReadLock) |
|
|
|
|
if success { |
|
|
|
|
// if success, copy array to object
|
|
|
|
|
copy(dm.locks, locks[:]) |
|
|
|
|
copy(dm.uids, ids[:]) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -150,10 +154,10 @@ func (dm *DRWMutex) Lock() { |
|
|
|
|
|
|
|
|
|
// lock tries to acquire the distributed lock, returning true or false
|
|
|
|
|
//
|
|
|
|
|
func lock(clnts []RPC, locks *[]bool, uids *[]string, lockName string, isReadLock bool) bool { |
|
|
|
|
func lock(clnts []RPC, locks *[]bool, lockName string, isReadLock bool) bool { |
|
|
|
|
|
|
|
|
|
// Create buffered channel of quorum size
|
|
|
|
|
ch := make(chan Granted, dquorum) |
|
|
|
|
ch := make(chan Granted, dnodeCount) |
|
|
|
|
|
|
|
|
|
for index, c := range clnts { |
|
|
|
|
|
|
|
|
@ -161,51 +165,52 @@ func lock(clnts []RPC, locks *[]bool, uids *[]string, lockName string, isReadLoc |
|
|
|
|
go func(index int, isReadLock bool, c RPC) { |
|
|
|
|
// All client methods issuing RPCs are thread-safe and goroutine-safe,
|
|
|
|
|
// i.e. it is safe to call them from multiple concurrently running go routines.
|
|
|
|
|
var status bool |
|
|
|
|
var err error |
|
|
|
|
var locked bool |
|
|
|
|
if isReadLock { |
|
|
|
|
err = c.Call("Dsync.RLock", &LockArgs{Name: lockName}, &status) |
|
|
|
|
if err := c.Call("Dsync.RLock", &LockArgs{Name: lockName}, &locked); err != nil { |
|
|
|
|
if dsyncLog { |
|
|
|
|
log.Println("Unable to call Dsync.RLock", err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
err = c.Call("Dsync.Lock", &LockArgs{Name: lockName}, &status) |
|
|
|
|
if err := c.Call("Dsync.Lock", &LockArgs{Name: lockName}, &locked); err != nil { |
|
|
|
|
if dsyncLog { |
|
|
|
|
log.Println("Unable to call Dsync.Lock", err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
locked, uid := false, "" |
|
|
|
|
if err == nil { |
|
|
|
|
locked = status |
|
|
|
|
// TODO: Get UIOD again
|
|
|
|
|
uid = "" |
|
|
|
|
} |
|
|
|
|
// silently ignore error, retry later
|
|
|
|
|
|
|
|
|
|
ch <- Granted{index: index, locked: locked, uid: uid} |
|
|
|
|
ch <- Granted{index: index, locked: locked} |
|
|
|
|
|
|
|
|
|
}(index, isReadLock, c) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var wg sync.WaitGroup |
|
|
|
|
wg.Add(1) |
|
|
|
|
|
|
|
|
|
quorum := false |
|
|
|
|
|
|
|
|
|
var wg sync.WaitGroup |
|
|
|
|
wg.Add(1) |
|
|
|
|
go func(isReadLock bool) { |
|
|
|
|
|
|
|
|
|
// Wait until we have received (minimally) quorum number of responses or timeout
|
|
|
|
|
i := 0 |
|
|
|
|
// Wait until we have either a) received all lock responses, b) received too many 'non-'locks for quorum to be or c) time out
|
|
|
|
|
i, locksFailed := 0, 0 |
|
|
|
|
done := false |
|
|
|
|
timeout := time.After(DRWMutexAcquireTimeout) |
|
|
|
|
|
|
|
|
|
for ; i < dnodeCount; i++ { |
|
|
|
|
for ; i < dnodeCount; i++ { // Loop until we acquired all locks
|
|
|
|
|
|
|
|
|
|
select { |
|
|
|
|
case grant := <-ch: |
|
|
|
|
if grant.locked { |
|
|
|
|
// Mark that this node has acquired the lock
|
|
|
|
|
(*locks)[grant.index] = true |
|
|
|
|
(*uids)[grant.index] = grant.uid |
|
|
|
|
} else { |
|
|
|
|
locksFailed++ |
|
|
|
|
if locksFailed > dnodeCount - dquorum { |
|
|
|
|
// We know that we are not going to get the lock anymore, so exit out
|
|
|
|
|
// and release any locks that did get acquired
|
|
|
|
|
done = true |
|
|
|
|
//fmt.Println("one lock failed before quorum -- release locks acquired")
|
|
|
|
|
releaseAll(clnts, locks, uids, lockName, isReadLock) |
|
|
|
|
releaseAll(clnts, locks, lockName, isReadLock) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case <-timeout: |
|
|
|
@ -213,8 +218,7 @@ func lock(clnts []RPC, locks *[]bool, uids *[]string, lockName string, isReadLoc |
|
|
|
|
// timeout happened, maybe one of the nodes is slow, count
|
|
|
|
|
// number of locks to check whether we have quorum or not
|
|
|
|
|
if !quorumMet(locks) { |
|
|
|
|
//fmt.Println("timed out -- release locks acquired")
|
|
|
|
|
releaseAll(clnts, locks, uids, lockName, isReadLock) |
|
|
|
|
releaseAll(clnts, locks, lockName, isReadLock) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -236,7 +240,7 @@ func lock(clnts []RPC, locks *[]bool, uids *[]string, lockName string, isReadLoc |
|
|
|
|
grantToBeReleased := <-ch |
|
|
|
|
if grantToBeReleased.locked { |
|
|
|
|
// release lock
|
|
|
|
|
sendRelease(clnts[grantToBeReleased.index], lockName, grantToBeReleased.uid, isReadLock) |
|
|
|
|
sendRelease(clnts[grantToBeReleased.index], lockName, isReadLock) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}(isReadLock) |
|
|
|
@ -260,13 +264,11 @@ func quorumMet(locks *[]bool) bool { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// releaseAll releases all locks that are marked as locked
|
|
|
|
|
func releaseAll(clnts []RPC, locks *[]bool, ids *[]string, lockName string, isReadLock bool) { |
|
|
|
|
|
|
|
|
|
func releaseAll(clnts []RPC, locks *[]bool, lockName string, isReadLock bool) { |
|
|
|
|
for lock := 0; lock < dnodeCount; lock++ { |
|
|
|
|
if (*locks)[lock] { |
|
|
|
|
sendRelease(clnts[lock], lockName, (*ids)[lock], isReadLock) |
|
|
|
|
sendRelease(clnts[lock], lockName, isReadLock) |
|
|
|
|
(*locks)[lock] = false |
|
|
|
|
(*ids)[lock] = "" |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -288,7 +290,7 @@ func (dm *DRWMutex) RUnlock() { |
|
|
|
|
if dm.locks[index] { |
|
|
|
|
// broadcast lock release to all nodes the granted the lock
|
|
|
|
|
isReadLock := true |
|
|
|
|
sendRelease(c, dm.Name, dm.uids[index], isReadLock) |
|
|
|
|
sendRelease(c, dm.Name, isReadLock) |
|
|
|
|
|
|
|
|
|
dm.locks[index] = false |
|
|
|
|
} |
|
|
|
@ -312,7 +314,7 @@ func (dm *DRWMutex) Unlock() { |
|
|
|
|
if dm.locks[index] { |
|
|
|
|
// broadcast lock release to all nodes the granted the lock
|
|
|
|
|
isReadLock := false |
|
|
|
|
sendRelease(c, dm.Name, dm.uids[index], isReadLock) |
|
|
|
|
sendRelease(c, dm.Name, isReadLock) |
|
|
|
|
|
|
|
|
|
dm.locks[index] = false |
|
|
|
|
} |
|
|
|
@ -320,34 +322,46 @@ func (dm *DRWMutex) Unlock() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// sendRelease sends a release message to a node that previously granted a lock
|
|
|
|
|
func sendRelease(c RPC, name, uid string, isReadLock bool) { |
|
|
|
|
func sendRelease(c RPC, name string, isReadLock bool) { |
|
|
|
|
|
|
|
|
|
backOffArray := []time.Duration{30 * time.Second, 1 * time.Minute, 3 * time.Minute, 10 * time.Minute, 30 * time.Minute, 1 * time.Hour} |
|
|
|
|
backOffArray := []time.Duration{ |
|
|
|
|
30 * time.Second, // 30secs.
|
|
|
|
|
1 * time.Minute, // 1min.
|
|
|
|
|
3 * time.Minute, // 3min.
|
|
|
|
|
10 * time.Minute, // 10min.
|
|
|
|
|
30 * time.Minute, // 30min.
|
|
|
|
|
1 * time.Hour, // 1hr.
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
go func(c RPC, name, uid string) { |
|
|
|
|
go func(c RPC, name string) { |
|
|
|
|
|
|
|
|
|
for _, backOff := range backOffArray { |
|
|
|
|
|
|
|
|
|
// All client methods issuing RPCs are thread-safe and goroutine-safe,
|
|
|
|
|
// i.e. it is safe to call them from multiple concurrently running goroutines.
|
|
|
|
|
var status bool |
|
|
|
|
var err error |
|
|
|
|
// TODO: Send UID to server
|
|
|
|
|
var unlocked bool |
|
|
|
|
|
|
|
|
|
if isReadLock { |
|
|
|
|
if err = c.Call("Dsync.RUnlock", &LockArgs{Name: name}, &status); err == nil { |
|
|
|
|
if err := c.Call("Dsync.RUnlock", &LockArgs{Name: name}, &unlocked); err == nil { |
|
|
|
|
// RUnlock delivered, exit out
|
|
|
|
|
return |
|
|
|
|
} else if err != nil { |
|
|
|
|
if dsyncLog { |
|
|
|
|
log.Println("Unable to call Dsync.RUnlock", err) |
|
|
|
|
} |
|
|
|
|
if nErr, ok := err.(net.Error); ok && nErr.Timeout() { |
|
|
|
|
// RUnlock possibly failed with server timestamp mismatch, server may have restarted.
|
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
if err = c.Call("Dsync.Unlock", &LockArgs{Name: name}, &status); err == nil { |
|
|
|
|
if err := c.Call("Dsync.Unlock", &LockArgs{Name: name}, &unlocked); err == nil { |
|
|
|
|
// Unlock delivered, exit out
|
|
|
|
|
return |
|
|
|
|
} else if err != nil { |
|
|
|
|
if dsyncLog { |
|
|
|
|
log.Println("Unable to call Dsync.Unlock", err) |
|
|
|
|
} |
|
|
|
|
if nErr, ok := err.(net.Error); ok && nErr.Timeout() { |
|
|
|
|
// Unlock possibly failed with server timestamp mismatch, server may have restarted.
|
|
|
|
|
return |
|
|
|
@ -355,8 +369,8 @@ func sendRelease(c RPC, name, uid string, isReadLock bool) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// wait
|
|
|
|
|
// Wait..
|
|
|
|
|
time.Sleep(backOff) |
|
|
|
|
} |
|
|
|
|
}(c, name, uid) |
|
|
|
|
}(c, name) |
|
|
|
|
} |
|
|
|
|