From f655592ff5b26ccb60cdb9d1dfa299c37bd85d57 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 28 Aug 2016 20:15:45 -0700 Subject: [PATCH] vendorize new dsync with new changes of logging. --- vendor/github.com/minio/dsync/drwmutex.go | 124 ++++++++++++---------- vendor/vendor.json | 6 +- 2 files changed, 72 insertions(+), 58 deletions(-) diff --git a/vendor/github.com/minio/dsync/drwmutex.go b/vendor/github.com/minio/dsync/drwmutex.go index acf6af226..4fee737bc 100644 --- a/vendor/github.com/minio/dsync/drwmutex.go +++ b/vendor/github.com/minio/dsync/drwmutex.go @@ -17,27 +17,36 @@ package dsync import ( + "log" "math" "math/rand" "net" + "os" "sync" "time" ) -const DRWMutexAcquireTimeout = 25 * time.Millisecond +// Indicator if logging is enabled. +var dsyncLog bool + +func init() { + // Check for DSYNC_LOG env variable, if set logging will be enabled for failed RPC operations. + dsyncLog = os.Getenv("DSYNC_LOG") == "1" +} + +// DRWMutexAcquireTimeout - tolerance limit to wait for lock acquisition before. +const DRWMutexAcquireTimeout = 25 * time.Millisecond // 25ms. // A DRWMutex is a distributed mutual exclusion lock. type DRWMutex struct { Name string locks []bool // Array of nodes that granted a lock - uids []string // Array of uids for verification of sending correct release messages m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node } type Granted struct { index int locked bool - uid string } type LockArgs struct { @@ -58,7 +67,6 @@ func NewDRWMutex(name string) *DRWMutex { return &DRWMutex{ Name: name, locks: make([]bool, dnodeCount), - uids: make([]string, dnodeCount), } } @@ -78,15 +86,13 @@ func (dm *DRWMutex) RLock() { // create temp arrays on stack locks := make([]bool, dnodeCount) - ids := make([]string, dnodeCount) // try to acquire the lock isReadLock := true - success := lock(clnts, &locks, &ids, dm.Name, isReadLock) + success := lock(clnts, &locks, dm.Name, isReadLock) if success { // if success, copy array to object copy(dm.locks, locks[:]) - copy(dm.uids, ids[:]) return } @@ -121,15 +127,13 @@ func (dm *DRWMutex) Lock() { for { // create temp arrays on stack locks := make([]bool, dnodeCount) - ids := make([]string, dnodeCount) // try to acquire the lock isReadLock := false - success := lock(clnts, &locks, &ids, dm.Name, isReadLock) + success := lock(clnts, &locks, dm.Name, isReadLock) if success { // if success, copy array to object copy(dm.locks, locks[:]) - copy(dm.uids, ids[:]) return } @@ -150,10 +154,10 @@ func (dm *DRWMutex) Lock() { // lock tries to acquire the distributed lock, returning true or false // -func lock(clnts []RPC, locks *[]bool, uids *[]string, lockName string, isReadLock bool) bool { +func lock(clnts []RPC, locks *[]bool, lockName string, isReadLock bool) bool { // Create buffered channel of quorum size - ch := make(chan Granted, dquorum) + ch := make(chan Granted, dnodeCount) for index, c := range clnts { @@ -161,51 +165,52 @@ func lock(clnts []RPC, locks *[]bool, uids *[]string, lockName string, isReadLoc go func(index int, isReadLock bool, c RPC) { // All client methods issuing RPCs are thread-safe and goroutine-safe, // i.e. it is safe to call them from multiple concurrently running go routines. - var status bool - var err error + var locked bool if isReadLock { - err = c.Call("Dsync.RLock", &LockArgs{Name: lockName}, &status) + if err := c.Call("Dsync.RLock", &LockArgs{Name: lockName}, &locked); err != nil { + if dsyncLog { + log.Println("Unable to call Dsync.RLock", err) + } + } } else { - err = c.Call("Dsync.Lock", &LockArgs{Name: lockName}, &status) - } - - locked, uid := false, "" - if err == nil { - locked = status - // TODO: Get UIOD again - uid = "" + if err := c.Call("Dsync.Lock", &LockArgs{Name: lockName}, &locked); err != nil { + if dsyncLog { + log.Println("Unable to call Dsync.Lock", err) + } + } } - // silently ignore error, retry later - ch <- Granted{index: index, locked: locked, uid: uid} + ch <- Granted{index: index, locked: locked} }(index, isReadLock, c) } - var wg sync.WaitGroup - wg.Add(1) - quorum := false + var wg sync.WaitGroup + wg.Add(1) go func(isReadLock bool) { - // Wait until we have received (minimally) quorum number of responses or timeout - i := 0 + // Wait until we have either a) received all lock responses, b) received too many 'non-'locks for quorum to be or c) time out + i, locksFailed := 0, 0 done := false timeout := time.After(DRWMutexAcquireTimeout) - for ; i < dnodeCount; i++ { + for ; i < dnodeCount; i++ { // Loop until we acquired all locks select { case grant := <-ch: if grant.locked { // Mark that this node has acquired the lock (*locks)[grant.index] = true - (*uids)[grant.index] = grant.uid } else { - done = true - //fmt.Println("one lock failed before quorum -- release locks acquired") - releaseAll(clnts, locks, uids, lockName, isReadLock) + locksFailed++ + if locksFailed > dnodeCount - dquorum { + // We know that we are not going to get the lock anymore, so exit out + // and release any locks that did get acquired + done = true + releaseAll(clnts, locks, lockName, isReadLock) + } } case <-timeout: @@ -213,8 +218,7 @@ func lock(clnts []RPC, locks *[]bool, uids *[]string, lockName string, isReadLoc // timeout happened, maybe one of the nodes is slow, count // number of locks to check whether we have quorum or not if !quorumMet(locks) { - //fmt.Println("timed out -- release locks acquired") - releaseAll(clnts, locks, uids, lockName, isReadLock) + releaseAll(clnts, locks, lockName, isReadLock) } } @@ -236,7 +240,7 @@ func lock(clnts []RPC, locks *[]bool, uids *[]string, lockName string, isReadLoc grantToBeReleased := <-ch if grantToBeReleased.locked { // release lock - sendRelease(clnts[grantToBeReleased.index], lockName, grantToBeReleased.uid, isReadLock) + sendRelease(clnts[grantToBeReleased.index], lockName, isReadLock) } } }(isReadLock) @@ -260,13 +264,11 @@ func quorumMet(locks *[]bool) bool { } // releaseAll releases all locks that are marked as locked -func releaseAll(clnts []RPC, locks *[]bool, ids *[]string, lockName string, isReadLock bool) { - +func releaseAll(clnts []RPC, locks *[]bool, lockName string, isReadLock bool) { for lock := 0; lock < dnodeCount; lock++ { if (*locks)[lock] { - sendRelease(clnts[lock], lockName, (*ids)[lock], isReadLock) + sendRelease(clnts[lock], lockName, isReadLock) (*locks)[lock] = false - (*ids)[lock] = "" } } @@ -288,7 +290,7 @@ func (dm *DRWMutex) RUnlock() { if dm.locks[index] { // broadcast lock release to all nodes the granted the lock isReadLock := true - sendRelease(c, dm.Name, dm.uids[index], isReadLock) + sendRelease(c, dm.Name, isReadLock) dm.locks[index] = false } @@ -312,7 +314,7 @@ func (dm *DRWMutex) Unlock() { if dm.locks[index] { // broadcast lock release to all nodes the granted the lock isReadLock := false - sendRelease(c, dm.Name, dm.uids[index], isReadLock) + sendRelease(c, dm.Name, isReadLock) dm.locks[index] = false } @@ -320,34 +322,46 @@ func (dm *DRWMutex) Unlock() { } // sendRelease sends a release message to a node that previously granted a lock -func sendRelease(c RPC, name, uid string, isReadLock bool) { - - backOffArray := []time.Duration{30 * time.Second, 1 * time.Minute, 3 * time.Minute, 10 * time.Minute, 30 * time.Minute, 1 * time.Hour} +func sendRelease(c RPC, name string, isReadLock bool) { + + backOffArray := []time.Duration{ + 30 * time.Second, // 30secs. + 1 * time.Minute, // 1min. + 3 * time.Minute, // 3min. + 10 * time.Minute, // 10min. + 30 * time.Minute, // 30min. + 1 * time.Hour, // 1hr. + } - go func(c RPC, name, uid string) { + go func(c RPC, name string) { for _, backOff := range backOffArray { // All client methods issuing RPCs are thread-safe and goroutine-safe, // i.e. it is safe to call them from multiple concurrently running goroutines. - var status bool - var err error - // TODO: Send UID to server + var unlocked bool + if isReadLock { - if err = c.Call("Dsync.RUnlock", &LockArgs{Name: name}, &status); err == nil { + if err := c.Call("Dsync.RUnlock", &LockArgs{Name: name}, &unlocked); err == nil { // RUnlock delivered, exit out return } else if err != nil { + if dsyncLog { + log.Println("Unable to call Dsync.RUnlock", err) + } if nErr, ok := err.(net.Error); ok && nErr.Timeout() { // RUnlock possibly failed with server timestamp mismatch, server may have restarted. return } } } else { - if err = c.Call("Dsync.Unlock", &LockArgs{Name: name}, &status); err == nil { + if err := c.Call("Dsync.Unlock", &LockArgs{Name: name}, &unlocked); err == nil { // Unlock delivered, exit out return } else if err != nil { + if dsyncLog { + log.Println("Unable to call Dsync.Unlock", err) + } if nErr, ok := err.(net.Error); ok && nErr.Timeout() { // Unlock possibly failed with server timestamp mismatch, server may have restarted. return @@ -355,8 +369,8 @@ func sendRelease(c RPC, name, uid string, isReadLock bool) { } } - // wait + // Wait.. time.Sleep(backOff) } - }(c, name, uid) + }(c, name) } diff --git a/vendor/vendor.json b/vendor/vendor.json index 46e73752f..1ad37a23e 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -98,10 +98,10 @@ "revisionTime": "2015-11-18T20:00:48-08:00" }, { - "checksumSHA1": "OOADbvXPHaDRzp8WEvNw6esmfu0=", + "checksumSHA1": "0raNaLP/AhxXhEeB5CdSnbED3O4=", "path": "github.com/minio/dsync", - "revision": "a2d8949cd8284e6cfa5b2ff9b617e4edb87f513f", - "revisionTime": "2016-08-24T09:12:34Z" + "revision": "1f615ccd013d35489becfe710e0ba7dce98b59e5", + "revisionTime": "2016-08-29T17:06:27Z" }, { "path": "github.com/minio/go-homedir",