From ea406754a6056b5e2260e1c0f14ce8a6d48e53e2 Mon Sep 17 00:00:00 2001 From: Frank Date: Mon, 17 Oct 2016 10:53:29 +0200 Subject: [PATCH] New dsync and added ForceUnlock to lock rpc server (#2956) * Update dsync and added ForceUnlock function * Added test cases for ForceUnlock --- cmd/lock-rpc-server.go | 17 ++++++ cmd/lock-rpc-server_test.go | 68 +++++++++++++++++++++++ vendor/github.com/minio/dsync/drwmutex.go | 54 ++++++++++++++---- vendor/vendor.json | 6 +- 4 files changed, 131 insertions(+), 14 deletions(-) diff --git a/cmd/lock-rpc-server.go b/cmd/lock-rpc-server.go index 2c28c05cb..fbe49c710 100644 --- a/cmd/lock-rpc-server.go +++ b/cmd/lock-rpc-server.go @@ -252,6 +252,23 @@ func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error { return nil } +// ForceUnlock - rpc handler for force unlock operation. +func (l *lockServer) ForceUnlock(args *LockArgs, reply *bool) error { + l.mutex.Lock() + defer l.mutex.Unlock() + if err := l.validateLockArgs(args); err != nil { + return err + } + if len(args.UID) != 0 { + return fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.UID) + } + if _, ok := l.lockMap[args.Name]; ok { // Only clear lock when set + delete(l.lockMap, args.Name) // Remove the lock (irrespective of write or read lock) + } + *reply = true + return nil +} + // Expired - rpc handler for expired lock status. func (l *lockServer) Expired(args *LockArgs, reply *bool) error { l.mutex.Lock() diff --git a/cmd/lock-rpc-server_test.go b/cmd/lock-rpc-server_test.go index 11b925045..12814dd78 100644 --- a/cmd/lock-rpc-server_test.go +++ b/cmd/lock-rpc-server_test.go @@ -326,6 +326,74 @@ func TestLockRpcServerRUnlock(t *testing.T) { } } +// Test ForceUnlock functionality +func TestLockRpcServerForceUnlock(t *testing.T) { + + timestamp := time.Now().UTC() + testPath, locker, token := createLockTestServer(t) + defer removeAll(testPath) + + laForce := LockArgs{ + Name: "name", + Token: token, + Timestamp: timestamp, + Node: "node", + RPCPath: "rpc-path", + UID: "1234-5678", + } + + // First test that UID should be empty + var result bool + err := locker.ForceUnlock(&laForce, &result) + if err == nil { + t.Errorf("Expected error, got %#v", nil) + } + + // Then test force unlock of a lock that does not exist (not returning an error) + laForce.UID = "" + err = locker.ForceUnlock(&laForce, &result) + if err != nil { + t.Errorf("Expected no error, got %#v", err) + } + + la := LockArgs{ + Name: "name", + Token: token, + Timestamp: timestamp, + Node: "node", + RPCPath: "rpc-path", + UID: "0123-4567", + } + + // Create lock ... (so that we can force unlock) + err = locker.Lock(&la, &result) + if err != nil { + t.Errorf("Expected %#v, got %#v", nil, err) + } else if !result { + t.Errorf("Expected %#v, got %#v", true, result) + } + + // Forcefully unlock the lock (not returning an error) + err = locker.ForceUnlock(&laForce, &result) + if err != nil { + t.Errorf("Expected no error, got %#v", err) + } + + // Try to get lock again (should be granted) + err = locker.Lock(&la, &result) + if err != nil { + t.Errorf("Expected %#v, got %#v", nil, err) + } else if !result { + t.Errorf("Expected %#v, got %#v", true, result) + } + + // Finally forcefully unlock the lock once again + err = locker.ForceUnlock(&laForce, &result) + if err != nil { + t.Errorf("Expected no error, got %#v", err) + } +} + // Test Expired functionality func TestLockRpcServerExpired(t *testing.T) { timestamp := time.Now().UTC() diff --git a/vendor/github.com/minio/dsync/drwmutex.go b/vendor/github.com/minio/dsync/drwmutex.go index 3fcac6546..fb75b845a 100644 --- a/vendor/github.com/minio/dsync/drwmutex.go +++ b/vendor/github.com/minio/dsync/drwmutex.go @@ -124,7 +124,7 @@ func (dm *DRWMutex) lockBlocking(isReadLock bool) { // if success, copy array to object if isReadLock { - // append new array of bools at the end + // append new array of strings at the end dm.readersLocks = append(dm.readersLocks, make([]string, dnodeCount)) // and copy stack array into last spot copy(dm.readersLocks[len(dm.readersLocks)-1], locks[:]) @@ -315,12 +315,14 @@ func (dm *DRWMutex) Unlock() { panic("Trying to Unlock() while no Lock() is active") } - // Copy writelocks to stack array + // Copy write locks to stack array copy(locks, dm.writeLocks[:]) + // Clear write locks array + dm.writeLocks = make([]string, dnodeCount) } isReadLock := false - unlock(&locks, dm.Name, isReadLock) + unlock(locks, dm.Name, isReadLock) } // RUnlock releases a read lock held on dm. @@ -344,25 +346,42 @@ func (dm *DRWMutex) RUnlock() { } isReadLock := true - unlock(&locks, dm.Name, isReadLock) + unlock(locks, dm.Name, isReadLock) } -func unlock(locks *[]string, name string, isReadLock bool) { +func unlock(locks []string, name string, isReadLock bool) { // We don't need to synchronously wait until we have released all the locks (or the quorum) // (a subsequent lock will retry automatically in case it would fail to get quorum) for index, c := range clnts { - if isLocked((*locks)[index]) { - // broadcast lock release to all nodes the granted the lock - sendRelease(c, name, (*locks)[index], isReadLock) - - (*locks)[index] = "" + if isLocked(locks[index]) { + // broadcast lock release to all nodes that granted the lock + sendRelease(c, name, locks[index], isReadLock) } } } +// ForceUnlock will forcefully clear a write or read lock. +func (dm *DRWMutex) ForceUnlock() { + + { + dm.m.Lock() + defer dm.m.Unlock() + + // Clear write locks array + dm.writeLocks = make([]string, dnodeCount) + // Clear read locks array + dm.readersLocks = nil + } + + for _, c := range clnts { + // broadcast lock release to all nodes that granted the lock + sendRelease(c, dm.Name, "", false) + } +} + // sendRelease sends a release message to a node that previously granted a lock func sendRelease(c RPC, name, uid string, isReadLock bool) { @@ -383,7 +402,20 @@ func sendRelease(c RPC, name, uid string, isReadLock bool) { // i.e. it is safe to call them from multiple concurrently running goroutines. var unlocked bool args := LockArgs{Name: name, UID: uid} // Just send name & uid (and leave out node and rpcPath; unimportant for unlocks) - if isReadLock { + if len(uid) == 0 { + if err := c.Call("Dsync.ForceUnlock", &args, &unlocked); err == nil { + // ForceUnlock delivered, exit out + return + } else if err != nil { + if dsyncLog { + log.Println("Unable to call Dsync.ForceUnlock", err) + } + if nErr, ok := err.(net.Error); ok && nErr.Timeout() { + // ForceUnlock possibly failed with server timestamp mismatch, server may have restarted. + return + } + } + } else if isReadLock { if err := c.Call("Dsync.RUnlock", &args, &unlocked); err == nil { // RUnlock delivered, exit out return diff --git a/vendor/vendor.json b/vendor/vendor.json index 1505467e4..f325b800c 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -111,10 +111,10 @@ "revisionTime": "2015-11-18T20:00:48-08:00" }, { - "checksumSHA1": "5DKBTLXf5GY5RkVC9DGP2XFMabc=", + "checksumSHA1": "UWpLeW+oLfe/MiphMckp1HqKrW0=", "path": "github.com/minio/dsync", - "revision": "66c2a42bf14fcaad0322e9f06d9ab85dd6dba9b7", - "revisionTime": "2016-10-07T07:30:36Z" + "revision": "fcea3bf5533c1b8a5af3cb377d30363782d2532d", + "revisionTime": "2016-10-15T15:40:54Z" }, { "path": "github.com/minio/go-homedir",