From 113b93346b8157a886fec99d7917cc2195ceafc2 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 19 Sep 2016 13:14:55 -0700 Subject: [PATCH] lock: Make some cleanup and moving the code around. (#2718) This patch just avoids lot of ifs and inverts some logic. --- cmd/benchmark-utils_test.go | 2 +- cmd/control-lock-main.go | 2 +- cmd/erasure-readfile_test.go | 2 +- cmd/fs-v1-multipart-common_test.go | 4 +- cmd/leak-detect_test.go | 4 +- cmd/lock-rpc-server-common.go | 85 ++++++++ cmd/lock-rpc-server-common_test.go | 84 ++++++++ cmd/lock-rpc-server.go | 306 +++++++++++++---------------- cmd/namespace-lock.go | 48 ++--- cmd/server-mux_test.go | 4 +- cmd/signature-v4-parser_test.go | 2 +- 11 files changed, 337 insertions(+), 206 deletions(-) create mode 100644 cmd/lock-rpc-server-common.go create mode 100644 cmd/lock-rpc-server-common_test.go diff --git a/cmd/benchmark-utils_test.go b/cmd/benchmark-utils_test.go index 5f8e5711d..ab4cf48fd 100644 --- a/cmd/benchmark-utils_test.go +++ b/cmd/benchmark-utils_test.go @@ -235,7 +235,7 @@ func runGetObjectBenchmark(b *testing.B, obj ObjectLayer, objSize int) { func getRandomByte() []byte { const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" // seeding the random number generator. - rand.Seed(time.Now().UnixNano()) + rand.Seed(time.Now().UTC().UnixNano()) var b byte // pick a character randomly. b = letterBytes[rand.Intn(len(letterBytes))] diff --git a/cmd/control-lock-main.go b/cmd/control-lock-main.go index 7843fa696..508b5a15c 100644 --- a/cmd/control-lock-main.go +++ b/cmd/control-lock-main.go @@ -83,7 +83,7 @@ func generateSystemLockResponse() (SystemLockState, error) { opsState.LockOrigin = nsMutex.debugLockMap[param].lockInfo[opsID].lockOrigin opsState.LockType = nsMutex.debugLockMap[param].lockInfo[opsID].lockType opsState.Status = nsMutex.debugLockMap[param].lockInfo[opsID].status - opsState.StatusSince = time.Now().Sub(nsMutex.debugLockMap[param].lockInfo[opsID].since).String() + opsState.StatusSince = time.Now().UTC().Sub(nsMutex.debugLockMap[param].lockInfo[opsID].since).String() volLockInfo.LockDetailsOnObject = append(volLockInfo.LockDetailsOnObject, opsState) } diff --git a/cmd/erasure-readfile_test.go b/cmd/erasure-readfile_test.go index 05a7c2296..82f3ce776 100644 --- a/cmd/erasure-readfile_test.go +++ b/cmd/erasure-readfile_test.go @@ -435,7 +435,7 @@ func TestErasureReadFileRandomOffsetLength(t *testing.T) { } // To generate random offset/length. - r := rand.New(rand.NewSource(time.Now().UnixNano())) + r := rand.New(rand.NewSource(time.Now().UTC().UnixNano())) // create pool buffer which will be used by erasureReadFile for // reading from disks and erasure decoding. 
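The recurring mechanical change across the test files above is replacing time.Now() with time.Now().UTC(). Below is a minimal standalone sketch (not code from this patch) showing that the normalization only changes how the value formats and serializes, never the instant being compared, which is the practical effect the patch is after: timestamps that read the same regardless of the host's time zone.

package main

import (
	"fmt"
	"time"
)

func main() {
	local := time.Now() // carries the host's local time zone
	utc := local.UTC()  // same instant, normalized to UTC

	// Equal compares instants, so the two views always agree.
	fmt.Println("same instant:", local.Equal(utc))

	// Formatting and serialization differ unless the host already runs in UTC,
	// which is why normalizing up front keeps timestamps consistent across nodes.
	fmt.Println("local:", local.Format(time.RFC3339))
	fmt.Println("utc:  ", utc.Format(time.RFC3339))
}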
diff --git a/cmd/fs-v1-multipart-common_test.go b/cmd/fs-v1-multipart-common_test.go index 9050832ca..c18990bc8 100644 --- a/cmd/fs-v1-multipart-common_test.go +++ b/cmd/fs-v1-multipart-common_test.go @@ -118,7 +118,7 @@ func TestFSWriteUploadJSON(t *testing.T) { obj.MakeBucket(bucketName) uploadID, err := obj.NewMultipartUpload(bucketName, objectName, nil) - if err := fs.writeUploadJSON(bucketName, objectName, uploadID, time.Now()); err != nil { + if err := fs.writeUploadJSON(bucketName, objectName, uploadID, time.Now().UTC()); err != nil { t.Fatal("Unexpected err: ", err) } @@ -127,7 +127,7 @@ func TestFSWriteUploadJSON(t *testing.T) { for i := 1; i <= 3; i++ { naughty := newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil) fs.storage = naughty - if err := fs.writeUploadJSON(bucketName, objectName, uploadID, time.Now()); errorCause(err) != errFaultyDisk { + if err := fs.writeUploadJSON(bucketName, objectName, uploadID, time.Now().UTC()); errorCause(err) != errFaultyDisk { t.Fatal("Unexpected err: ", err) } } diff --git a/cmd/leak-detect_test.go b/cmd/leak-detect_test.go index ff34528f7..faf32e823 100644 --- a/cmd/leak-detect_test.go +++ b/cmd/leak-detect_test.go @@ -57,7 +57,7 @@ func (initialSnapShot LeakDetect) DetectLeak(t TestErrHandler) { } // Loop, waiting for goroutines to shut down. // Wait up to 5 seconds, but finish as quickly as possible. - deadline := time.Now().Add(leakDetectDeadline * time.Second) + deadline := time.Now().UTC().Add(leakDetectDeadline * time.Second) for { // get sack snapshot of relevant go routines. leaked := initialSnapShot.CompareCurrentSnapshot() @@ -66,7 +66,7 @@ func (initialSnapShot LeakDetect) DetectLeak(t TestErrHandler) { return } // wait a test again will deadline. - if time.Now().Before(deadline) { + if time.Now().UTC().Before(deadline) { time.Sleep(leakDetectPauseTimeMs * time.Millisecond) continue } diff --git a/cmd/lock-rpc-server-common.go b/cmd/lock-rpc-server-common.go new file mode 100644 index 000000000..007888cf7 --- /dev/null +++ b/cmd/lock-rpc-server-common.go @@ -0,0 +1,85 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "errors" + "time" +) + +// Similar to removeEntry but only removes an entry only if the lock entry exists in map. 
+func (l *lockServer) removeEntryIfExists(nlrip nameLockRequesterInfoPair) { + // Check if entry is still in map (could have been removed altogether by 'concurrent' (R)Unlock of last entry) + if lri, ok := l.lockMap[nlrip.name]; ok { + if !l.removeEntry(nlrip.name, nlrip.lri.uid, &lri) { + // Remove failed, in case it is a: + if nlrip.lri.writer { + // Writer: this should never happen as the whole (mapped) entry should have been deleted + errorIf(errors.New(""), "Lock maintenance failed to remove entry for write lock (should never happen)", nlrip.name, nlrip.lri.uid, lri) + } // Reader: this can happen if multiple read locks were active and + // the one we are looking for has been released concurrently (so it is fine). + } // Removal went okay, all is fine. + } +} + +// removeEntry either, based on the uid of the lock message, removes a single entry from the +// lockRequesterInfo array or the whole array from the map (in case of a write lock or last read lock) +func (l *lockServer) removeEntry(name, uid string, lri *[]lockRequesterInfo) bool { + // Find correct entry to remove based on uid. + for index, entry := range *lri { + if entry.uid == uid { + if len(*lri) == 1 { + // Remove the (last) lock. + delete(l.lockMap, name) + } else { + // Remove the appropriate read lock. + *lri = append((*lri)[:index], (*lri)[index+1:]...) + l.lockMap[name] = *lri + } + return true + } + } + // None found return false, perhaps entry removed in previous run. + return false +} + +// Validate lock args. +func (l *lockServer) validateLockArgs(args *LockArgs) error { + if !l.timestamp.Equal(args.Timestamp) { + return errInvalidTimestamp + } + if !isRPCTokenValid(args.Token) { + return errInvalidToken + } + return nil +} + +// getLongLivedLocks returns locks that are older than a certain time and +// have not been 'checked' for validity too soon enough +func getLongLivedLocks(m map[string][]lockRequesterInfo, interval time.Duration) []nameLockRequesterInfoPair { + rslt := []nameLockRequesterInfoPair{} + for name, lriArray := range m { + for idx := range lriArray { + // Check whether enough time has gone by since last check + if time.Since(lriArray[idx].timeLastCheck) >= interval { + rslt = append(rslt, nameLockRequesterInfoPair{name: name, lri: lriArray[idx]}) + lriArray[idx].timeLastCheck = time.Now().UTC() + } + } + } + return rslt +} diff --git a/cmd/lock-rpc-server-common_test.go b/cmd/lock-rpc-server-common_test.go new file mode 100644 index 000000000..3e5b9b86b --- /dev/null +++ b/cmd/lock-rpc-server-common_test.go @@ -0,0 +1,84 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "reflect" + "testing" + "time" +) + +// Tests function returning long lived locks. +func TestGetLongLivedLocks(t *testing.T) { + ut := time.Now().UTC() + // Collection of test cases for verifying returning valid long lived locks. 
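The helpers gathered into cmd/lock-rpc-server-common.go revolve around one slice idiom: delete element i with append(lri[:i], lri[i+1:]...) and drop the map key entirely once the last requester is gone. A self-contained sketch of that idiom follows, using stand-in names (requester, removeRequester) rather than the package's own types:

package main

import "fmt"

// requester stands in for lockRequesterInfo; only the uid matters here.
type requester struct{ uid string }

// removeRequester deletes the entry with the given uid from locks[name].
// It removes the whole map key when that entry was the last one, mirroring
// how a write lock (or the final read lock) is released.
func removeRequester(locks map[string][]requester, name, uid string) bool {
	entries := locks[name]
	for i, e := range entries {
		if e.uid != uid {
			continue
		}
		if len(entries) == 1 {
			delete(locks, name) // last holder: drop the key entirely
		} else {
			locks[name] = append(entries[:i], entries[i+1:]...) // drop element i
		}
		return true
	}
	return false // uid not found, possibly already purged concurrently
}

func main() {
	locks := map[string][]requester{
		"bucket/object": {{uid: "a"}, {uid: "b"}},
	}
	fmt.Println(removeRequester(locks, "bucket/object", "a")) // true
	fmt.Println(removeRequester(locks, "bucket/object", "b")) // true, key deleted
	fmt.Println(removeRequester(locks, "bucket/object", "c")) // false
}

Returning false for a missing uid matches removeEntry's contract: a missing entry usually means a concurrent unlock already purged it.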
+ testCases := []struct { + lockMap map[string][]lockRequesterInfo + lockInterval time.Duration + expectedNSLR []nameLockRequesterInfoPair + }{ + // Testcase - 1 validates long lived locks, returns empty list. + { + lockMap: map[string][]lockRequesterInfo{ + "test": {{ + writer: true, + node: "10.1.10.21", + rpcPath: "/lock/mnt/disk1", + uid: "10000112", + timestamp: ut, + timeLastCheck: ut, + }}, + }, + lockInterval: 1 * time.Minute, + expectedNSLR: []nameLockRequesterInfoPair{}, + }, + // Testcase - 2 validates long lived locks, returns at least one list. + { + lockMap: map[string][]lockRequesterInfo{ + "test": {{ + writer: true, + node: "10.1.10.21", + rpcPath: "/lock/mnt/disk1", + uid: "10000112", + timestamp: ut, + timeLastCheck: ut.Add(-2 * time.Minute), + }}, + }, + lockInterval: 1 * time.Minute, + expectedNSLR: []nameLockRequesterInfoPair{ + { + name: "test", + lri: lockRequesterInfo{ + writer: true, + node: "10.1.10.21", + rpcPath: "/lock/mnt/disk1", + uid: "10000112", + timestamp: ut, + timeLastCheck: ut.Add(-2 * time.Minute), + }, + }, + }, + }, + } + // Validates all test cases here. + for i, testCase := range testCases { + nsLR := getLongLivedLocks(testCase.lockMap, testCase.lockInterval) + if !reflect.DeepEqual(testCase.expectedNSLR, nsLR) { + t.Errorf("Test %d: Expected %#v, got %#v", i+1, testCase.expectedNSLR, nsLR) + } + } +} diff --git a/cmd/lock-rpc-server.go b/cmd/lock-rpc-server.go index 3e3fefa7e..a4e455fb2 100644 --- a/cmd/lock-rpc-server.go +++ b/cmd/lock-rpc-server.go @@ -70,20 +70,71 @@ func isWriteLock(lri []lockRequesterInfo) bool { // lockServer is type for RPC handlers type lockServer struct { - rpcPath string - mutex sync.Mutex - lockMap map[string][]lockRequesterInfo - timestamp time.Time // Timestamp set at the time of initialization. Resets naturally on minio server restart. + rpcPath string + mutex sync.Mutex + lockMap map[string][]lockRequesterInfo + // Timestamp set at the time of initialization. Resets naturally on minio server restart. + timestamp time.Time } -func (l *lockServer) verifyArgs(args *LockArgs) error { - if !l.timestamp.Equal(args.Timestamp) { - return errInvalidTimestamp +// Initialize distributed name space lock. +func initDistributedNSLock(mux *router.Router, serverConfig serverCmdConfig) { + lockServers := newLockServers(serverConfig) + registerStorageLockers(mux, lockServers) +} + +// Create one lock server for every local storage rpc server. +func newLockServers(serverConfig serverCmdConfig) (lockServers []*lockServer) { + // Initialize posix storage API. + exports := serverConfig.disks + ignoredExports := serverConfig.ignoredDisks + + // Save ignored disks in a map + skipDisks := make(map[string]bool) + for _, ignoredExport := range ignoredExports { + skipDisks[ignoredExport] = true } - if !isRPCTokenValid(args.Token) { - return errInvalidToken + for _, export := range exports { + if skipDisks[export] { + continue + } + // Not local storage move to the next node. 
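The commit message's point about avoiding nested ifs is most visible in newLockServers: ignored and non-local disks are skipped with continue so the lock-server construction stays at a single indentation level. A standalone sketch of that guard-clause shape, where localExports and the isLocal predicate are assumptions standing in for the real isLocalStorage and server configuration:

package main

import (
	"fmt"
	"strings"
)

// localExports filters a disk list down to local paths using early continue
// statements instead of nesting the happy path inside if blocks.
func localExports(exports []string, ignored map[string]bool, isLocal func(string) bool) []string {
	var out []string
	for _, export := range exports {
		if ignored[export] {
			continue // explicitly ignored disk
		}
		if !isLocal(export) {
			continue // remote storage, handled by another node
		}
		// Strip a "host:" prefix so only the path part remains.
		if idx := strings.LastIndex(export, ":"); idx != -1 {
			export = export[idx+1:]
		}
		out = append(out, export)
	}
	return out
}

func main() {
	exports := []string{"/mnt/disk1", "node2:/mnt/disk2", "/mnt/bad"}
	ignored := map[string]bool{"/mnt/bad": true}
	isLocal := func(s string) bool { return !strings.Contains(s, "node2:") }
	fmt.Println(localExports(exports, ignored, isLocal)) // [/mnt/disk1]
}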
+ if !isLocalStorage(export) { + continue + } + if idx := strings.LastIndex(export, ":"); idx != -1 { + export = export[idx+1:] + } + // Create handler for lock RPCs + locker := &lockServer{ + rpcPath: export, + mutex: sync.Mutex{}, + lockMap: make(map[string][]lockRequesterInfo), + timestamp: time.Now().UTC(), + } + + // Start loop for stale lock maintenance + go func() { + // Start with random sleep time, so as to avoid "synchronous checks" between servers + time.Sleep(time.Duration(rand.Float64() * float64(lockMaintenanceLoop))) + for { + time.Sleep(lockMaintenanceLoop) + locker.lockMaintenance(lockCheckValidityInterval) + } + }() + lockServers = append(lockServers, locker) + } + return lockServers +} + +// registerStorageLockers - register locker rpc handlers for net/rpc library clients +func registerStorageLockers(mux *router.Router, lockServers []*lockServer) { + for _, lockServer := range lockServers { + lockRPCServer := rpc.NewServer() + lockRPCServer.RegisterName("Dsync", lockServer) + lockRouter := mux.PathPrefix(reservedBucket).Subrouter() + lockRouter.Path(path.Join("/lock", lockServer.rpcPath)).Handler(lockRPCServer) } - return nil } /// Distributed lock handlers @@ -110,12 +161,21 @@ func (l *lockServer) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) erro func (l *lockServer) Lock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if err := l.verifyArgs(args); err != nil { + if err := l.validateLockArgs(args); err != nil { return err } _, *reply = l.lockMap[args.Name] if !*reply { // No locks held on the given name, so claim write lock - l.lockMap[args.Name] = []lockRequesterInfo{{writer: true, node: args.Node, rpcPath: args.RPCPath, uid: args.UID, timestamp: time.Now(), timeLastCheck: time.Now()}} + l.lockMap[args.Name] = []lockRequesterInfo{ + { + writer: true, + node: args.Node, + rpcPath: args.RPCPath, + uid: args.UID, + timestamp: time.Now().UTC(), + timeLastCheck: time.Now().UTC(), + }, + } } *reply = !*reply // Negate *reply to return true when lock is granted or false otherwise return nil @@ -125,39 +185,44 @@ func (l *lockServer) Lock(args *LockArgs, reply *bool) error { func (l *lockServer) Unlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if err := l.verifyArgs(args); err != nil { + if err := l.validateLockArgs(args); err != nil { return err } var lri []lockRequesterInfo - lri, *reply = l.lockMap[args.Name] - if !*reply { // No lock is held on the given name + if lri, *reply = l.lockMap[args.Name]; !*reply { // No lock is held on the given name return fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.Name) } if *reply = isWriteLock(lri); !*reply { // Unless it is a write lock return fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Name, len(lri)) } - if l.removeEntry(args.Name, args.UID, &lri) { - return nil + if !l.removeEntry(args.Name, args.UID, &lri) { + return fmt.Errorf("Unlock unable to find corresponding lock for uid: %s", args.UID) } - return fmt.Errorf("Unlock unable to find corresponding lock for uid: %s", args.UID) + return nil } // RLock - rpc handler for read lock operation. 
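Each lock server above starts its own maintenance goroutine: sleep a random fraction of the interval once, then run the check on every interval so that servers started together do not probe in lock step. The sketch below reproduces that shape in isolation; startMaintenance, the 200ms interval and the callback are placeholders, not the real lockMaintenanceLoop or lockCheckValidityInterval values.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// startMaintenance launches a background loop that runs fn on a fixed
// interval, after an initial random sleep to stagger peers.
func startMaintenance(interval time.Duration, fn func()) {
	go func() {
		// Jitter the first run to avoid "synchronous checks" across servers.
		time.Sleep(time.Duration(rand.Float64() * float64(interval)))
		for {
			time.Sleep(interval)
			fn()
		}
	}()
}

func main() {
	startMaintenance(200*time.Millisecond, func() {
		fmt.Println("maintenance tick", time.Now().UTC().Format(time.RFC3339Nano))
	})
	time.Sleep(time.Second) // let a few ticks fire before exiting
}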
func (l *lockServer) RLock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if err := l.verifyArgs(args); err != nil { + if err := l.validateLockArgs(args); err != nil { return err } - var lri []lockRequesterInfo - lri, *reply = l.lockMap[args.Name] - if !*reply { // No locks held on the given name, so claim (first) read lock - l.lockMap[args.Name] = []lockRequesterInfo{{writer: false, node: args.Node, rpcPath: args.RPCPath, uid: args.UID, timestamp: time.Now(), timeLastCheck: time.Now()}} - *reply = true - } else { + lrInfo := lockRequesterInfo{ + writer: false, + node: args.Node, + rpcPath: args.RPCPath, + uid: args.UID, + timestamp: time.Now().UTC(), + timeLastCheck: time.Now().UTC(), + } + if lri, ok := l.lockMap[args.Name]; ok { if *reply = !isWriteLock(lri); *reply { // Unless there is a write lock - l.lockMap[args.Name] = append(l.lockMap[args.Name], lockRequesterInfo{writer: false, node: args.Node, rpcPath: args.RPCPath, uid: args.UID, timestamp: time.Now(), timeLastCheck: time.Now()}) + l.lockMap[args.Name] = append(l.lockMap[args.Name], lrInfo) } + } else { // No locks held on the given name, so claim (first) read lock + l.lockMap[args.Name] = []lockRequesterInfo{lrInfo} + *reply = true } return nil } @@ -166,7 +231,7 @@ func (l *lockServer) RLock(args *LockArgs, reply *bool) error { func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if err := l.verifyArgs(args); err != nil { + if err := l.validateLockArgs(args); err != nil { return err } var lri []lockRequesterInfo @@ -176,49 +241,33 @@ func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error { if *reply = !isWriteLock(lri); !*reply { // A write-lock is held, cannot release a read lock return fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Name) } - if l.removeEntry(args.Name, args.UID, &lri) { - return nil + if !l.removeEntry(args.Name, args.UID, &lri) { + return fmt.Errorf("RUnlock unable to find corresponding read lock for uid: %s", args.UID) } - return fmt.Errorf("RUnlock unable to find corresponding read lock for uid: %s", args.UID) + return nil } -// Active - rpc handler for active lock status. -func (l *lockServer) Active(args *LockArgs, reply *bool) error { +// Expired - rpc handler for expired lock status. +func (l *lockServer) Expired(args *LockArgs, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() - if err := l.verifyArgs(args); err != nil { + if err := l.validateLockArgs(args); err != nil { return err } - var lri []lockRequesterInfo - if lri, *reply = l.lockMap[args.Name]; !*reply { - return nil // No lock is held on the given name so return false - } - // Check whether uid is still active - for _, entry := range lri { - if *reply = entry.uid == args.UID; *reply { - return nil // When uid found return true - } - } - return nil // None found so return false -} - -// removeEntry either, based on the uid of the lock message, removes a single entry from the -// lockRequesterInfo array or the whole array from the map (in case of a write lock or last read lock) -func (l *lockServer) removeEntry(name, uid string, lri *[]lockRequesterInfo) bool { - // Find correct entry to remove based on uid - for index, entry := range *lri { - if entry.uid == uid { - if len(*lri) == 1 { - delete(l.lockMap, name) // Remove the (last) lock - } else { - // Remove the appropriate read lock - *lri = append((*lri)[:index], (*lri)[index+1:]...) - l.lockMap[name] = *lri + // Lock found, proceed to verify if belongs to given uid. 
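The rewritten RLock above builds the requester entry once and then branches on a comma-ok map lookup: append another reader unless a writer holds the name, or claim the first read lock when the name is absent. A compact standalone model of those shared/exclusive semantics, with a stand-in entry type rather than lockRequesterInfo:

package main

import "fmt"

// entry is a stand-in for lockRequesterInfo; only writer and uid matter here.
type entry struct {
	writer bool
	uid    string
}

// isWriteLock reports whether the single entry held is a writer, matching the
// invariant that a write lock is always the only entry for a name.
func isWriteLock(lri []entry) bool {
	return len(lri) == 1 && lri[0].writer
}

// rLock grants a shared lock unless a writer currently holds the name.
func rLock(locks map[string][]entry, name, uid string) bool {
	e := entry{writer: false, uid: uid}
	if lri, ok := locks[name]; ok {
		if isWriteLock(lri) {
			return false // a writer holds it, readers must wait
		}
		locks[name] = append(lri, e) // add one more reader
		return true
	}
	locks[name] = []entry{e} // first reader claims the name
	return true
}

func main() {
	locks := map[string][]entry{"obj-a": {{writer: true, uid: "w1"}}}
	fmt.Println(rLock(locks, "obj-a", "r1")) // false: write-locked
	fmt.Println(rLock(locks, "obj-b", "r1")) // true: first reader
	fmt.Println(rLock(locks, "obj-b", "r2")) // true: second reader
}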
+ if lri, ok := l.lockMap[args.Name]; ok { + // Check whether uid is still active + for _, entry := range lri { + if entry.uid == args.UID { + *reply = false // When uid found, lock is still active so return not expired. + return nil // When uid found *reply is set to true. } - return true } } - return false + // When we get here lock is no longer active due to either args.Name + // being absent from map or uid not found for given args.Name + *reply = true + return nil } // nameLockRequesterInfoPair is a helper type for lock maintenance @@ -227,132 +276,41 @@ type nameLockRequesterInfoPair struct { lri lockRequesterInfo } -// getLongLivedLocks returns locks that are older than a certain time and -// have not been 'checked' for validity too soon enough -func getLongLivedLocks(m map[string][]lockRequesterInfo, interval time.Duration) []nameLockRequesterInfoPair { - - rslt := []nameLockRequesterInfoPair{} - - for name, lriArray := range m { - - for idx := range lriArray { - // Check whether enough time has gone by since last check - if time.Since(lriArray[idx].timeLastCheck) >= interval { - rslt = append(rslt, nameLockRequesterInfoPair{name: name, lri: lriArray[idx]}) - lriArray[idx].timeLastCheck = time.Now() - } - } - } - - return rslt -} - // lockMaintenance loops over locks that have been active for some time and checks back // with the original server whether it is still alive or not +// +// Following logic inside ignores the errors generated for Dsync.Active operation. +// - server at client down +// - some network error (and server is up normally) +// +// We will ignore the error, and we will retry later to get a resolve on this lock func (l *lockServer) lockMaintenance(interval time.Duration) { - l.mutex.Lock() - // get list of locks to check + // Get list of long lived locks to check for staleness. nlripLongLived := getLongLivedLocks(l.lockMap, interval) l.mutex.Unlock() + // Validate if long lived locks are indeed clean. for _, nlrip := range nlripLongLived { - + // Initialize client based on the long live locks. 
c := newClient(nlrip.lri.node, nlrip.lri.rpcPath) - var active bool + var expired bool // Call back to original server verify whether the lock is still active (based on name & uid) - if err := c.Call("Dsync.Active", &LockArgs{Name: nlrip.name, UID: nlrip.lri.uid}, &active); err != nil { - // We failed to connect back to the server that originated the lock, this can either be due to - // - server at client down - // - some network error (and server is up normally) - // - // We will ignore the error, and we will retry later to get resolve on this lock - c.Close() - } else { - c.Close() - - if !active { // The lock is no longer active at server that originated the lock - // so remove the lock from the map - l.mutex.Lock() - // Check if entry is still in map (could have been removed altogether by 'concurrent' (R)Unlock of last entry) - if lri, ok := l.lockMap[nlrip.name]; ok { - if !l.removeEntry(nlrip.name, nlrip.lri.uid, &lri) { - // Remove failed, in case it is a: - if nlrip.lri.writer { - // Writer: this should never happen as the whole (mapped) entry should have been deleted - log.Errorln("Lock maintenance failed to remove entry for write lock (should never happen)", nlrip.name, nlrip.lri, lri) - } else { - // Reader: this can happen if multiple read locks were active and the one we are looking for - // has been released concurrently (so it is fine) - } - } else { - // remove went okay, all is fine - } - } - l.mutex.Unlock() - } - } - } -} - -// Initialize distributed lock. -func initDistributedNSLock(mux *router.Router, serverConfig serverCmdConfig) { - lockServers := newLockServers(serverConfig) - registerStorageLockers(mux, lockServers) -} - -// Create one lock server for every local storage rpc server. -func newLockServers(serverConfig serverCmdConfig) (lockServers []*lockServer) { - // Initialize posix storage API. - exports := serverConfig.disks - ignoredExports := serverConfig.ignoredDisks - - // Save ignored disks in a map - skipDisks := make(map[string]bool) - for _, ignoredExport := range ignoredExports { - skipDisks[ignoredExport] = true - } - for _, export := range exports { - if skipDisks[export] { - continue - } - if isLocalStorage(export) { - if idx := strings.LastIndex(export, ":"); idx != -1 { - export = export[idx+1:] - } - - // Create handler for lock RPCs - locker := &lockServer{ - rpcPath: export, - mutex: sync.Mutex{}, - lockMap: make(map[string][]lockRequesterInfo), - timestamp: time.Now().UTC(), - } - - // Start loop for stale lock maintenance - go func() { - // Start with random sleep time, so as to avoid "synchronous checks" between servers - time.Sleep(time.Duration(rand.Float64() * float64(lockMaintenanceLoop))) - for { - time.Sleep(lockMaintenanceLoop) - locker.lockMaintenance(lockCheckValidityInterval) - } - }() - - lockServers = append(lockServers, locker) + c.Call("Dsync.Expired", &LockArgs{ + Name: nlrip.name, + UID: nlrip.lri.uid, + }, &expired) + c.Close() // Close the connection regardless of the call response. + + // For successful response, verify if lock is indeed active or stale. + if expired { + // The lock is no longer active at server that originated the lock + // So remove the lock from the map. + l.mutex.Lock() + l.removeEntryIfExists(nlrip) // Purge the stale entry if it exists. 
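The maintenance loop now asks the node that originated the lock whether it has expired, via a Dsync.Expired RPC, and deliberately ignores transport errors: an unreachable peer only postpones the decision to the next pass, it never triggers a purge. Below is a hedged sketch of that call pattern using the standard library net/rpc client directly; the TCP address, dialing scheme and argument struct are illustrative, while the real code goes through its own newClient over the registered /lock path.

package main

import (
	"log"
	"net/rpc"
)

// LockArgs mirrors the shape of the lock RPC arguments; only the fields
// needed for the expiry probe are shown.
type LockArgs struct {
	Name string
	UID  string
}

// isExpired asks the originating server whether the lock identified by
// name/uid is still known there. Errors are swallowed on purpose, as in the
// maintenance loop: when in doubt, keep the entry and retry later.
func isExpired(addr string, args LockArgs) bool {
	client, err := rpc.Dial("tcp", addr)
	if err != nil {
		return false // cannot verify, keep the entry for now
	}
	defer client.Close() // close the connection regardless of the call result

	var expired bool
	if err := client.Call("Dsync.Expired", &args, &expired); err != nil {
		return false // same policy: leave the lock alone and retry later
	}
	return expired
}

func main() {
	// Illustrative address with no server behind it in this sketch, so the
	// probe reports "not expired" and the entry would be kept.
	log.Println("expired:", isExpired("127.0.0.1:9999", LockArgs{Name: "bucket/object", UID: "1234"}))
}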
+ l.mutex.Unlock() } } - return lockServers -} - -// registerStorageLockers - register locker rpc handlers for net/rpc library clients -func registerStorageLockers(mux *router.Router, lockServers []*lockServer) { - for _, lockServer := range lockServers { - lockRPCServer := rpc.NewServer() - lockRPCServer.RegisterName("Dsync", lockServer) - lockRouter := mux.PathPrefix(reservedBucket).Subrouter() - lockRouter.Path(path.Join("/lock", lockServer.rpcPath)).Handler(lockRPCServer) - } } diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index 25b6101f8..010abbcf5 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -56,7 +56,6 @@ func initDsyncNodes(disks []string, port int) error { } } } - return dsync.SetNodesWithClients(clnts, myNode) } @@ -100,12 +99,14 @@ type nsLock struct { // Unlock, RLock and RUnlock. type nsLockMap struct { // lock counter used for lock debugging. - globalLockCounter int64 //total locks held. - blockedCounter int64 // total operations blocked waiting for locks. - runningLockCounter int64 // total locks held but not released yet. - debugLockMap map[nsParam]*debugLockInfoPerVolumePath // info for instrumentation on locks. - - isDist bool // indicates whether the locking service is part of a distributed setup or not. + globalLockCounter int64 // Total locks held. + blockedCounter int64 // Total operations blocked waiting for locks. + runningLockCounter int64 // Total locks held but not released yet. + debugLockMap map[nsParam]*debugLockInfoPerVolumePath // Info for instrumentation on locks. + + // Indicates whether the locking service is part + // of a distributed setup or not. + isDist bool lockMap map[nsParam]*nsLock lockMapMutex sync.Mutex } @@ -132,13 +133,14 @@ func (n *nsLockMap) lock(volume, path string, lockOrigin, opsID string, readLock nsLk.ref++ // Update ref count here to avoid multiple races. if globalDebugLock { - // change the state of the lock to be blocked for the given pair of and till the lock unblocks. - // The lock for accessing `nsMutex` is held inside the function itself. - err := n.statusNoneToBlocked(param, lockOrigin, opsID, readLock) - if err != nil { + // Change the state of the lock to be blocked for the given pair of + // and till the lock unblocks. The lock for accessing `nsMutex` is + // held inside the function itself. + if err := n.statusNoneToBlocked(param, lockOrigin, opsID, readLock); err != nil { errorIf(err, "Failed to set lock state to blocked.") } } + // Unlock map before Locking NS which might block. n.lockMapMutex.Unlock() @@ -149,12 +151,12 @@ func (n *nsLockMap) lock(volume, path string, lockOrigin, opsID string, readLock nsLk.Lock() } - // check if lock debugging enabled. + // Check if lock debugging enabled. if globalDebugLock { // Changing the status of the operation from blocked to running. - // change the state of the lock to be running (from blocked) for the given pair of and . - err := n.statusBlockedToRunning(param, lockOrigin, opsID, readLock) - if err != nil { + // change the state of the lock to be running (from blocked) for + // the given pair of and . + if err := n.statusBlockedToRunning(param, lockOrigin, opsID, readLock); err != nil { errorIf(err, "Failed to set the lock state to running.") } } @@ -178,7 +180,7 @@ func (n *nsLockMap) unlock(volume, path, opsID string, readLock bool) { } if nsLk.ref != 0 { nsLk.ref-- - // locking debug enabled, delete the lock state entry for given operation ID. + // Locking debug enabled, delete the lock state entry for given operation ID. 
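The namespace-lock changes below are mostly comment cleanups around an unchanged structure: a map of per-name locks with a reference count, where the count is updated while holding the map mutex and the entry is dropped once it reaches zero. A stripped-down sketch of that bookkeeping, with the debug-lock tracing omitted and the names chosen for illustration:

package main

import (
	"fmt"
	"sync"
)

// nsLock pairs a mutex with a reference count so the map entry can be
// dropped once the last holder is done.
type nsLock struct {
	mu  sync.Mutex
	ref int
}

type nsLockMap struct {
	mu    sync.Mutex
	locks map[string]*nsLock
}

func (n *nsLockMap) lock(name string) {
	n.mu.Lock()
	l, ok := n.locks[name]
	if !ok {
		l = &nsLock{}
		n.locks[name] = l
	}
	l.ref++ // count the holder before releasing the map mutex
	n.mu.Unlock()

	l.mu.Lock() // may block, so it happens outside the map mutex
}

func (n *nsLockMap) unlock(name string) {
	n.mu.Lock()
	defer n.mu.Unlock()
	l, ok := n.locks[name]
	if !ok {
		return
	}
	l.mu.Unlock()
	if l.ref--; l.ref == 0 {
		delete(n.locks, name) // no more references, drop the entry
	}
}

func main() {
	n := &nsLockMap{locks: make(map[string]*nsLock)}
	n.lock("bucket/object")
	n.unlock("bucket/object")
	fmt.Println("entries left:", len(n.locks)) // 0
}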
if globalDebugLock { err := n.deleteLockInfoEntryForOps(param, opsID) if err != nil { @@ -190,7 +192,7 @@ func (n *nsLockMap) unlock(volume, path, opsID string, readLock bool) { // Remove from the map if there are no more references. delete(n.lockMap, param) - // locking debug enabled, delete the lock state entry for given pair. + // Locking debug enabled, delete the lock state entry for given pair. if globalDebugLock { err := n.deleteLockInfoEntryForVolumePath(param) if err != nil { @@ -205,9 +207,10 @@ func (n *nsLockMap) unlock(volume, path, opsID string, readLock bool) { // allocated name space lock or initializing a new one. func (n *nsLockMap) Lock(volume, path, opsID string) { var lockOrigin string - // lock debugging enabled. The caller information of the lock held has be obtained here before calling any other function. + // Lock debugging enabled. The caller information of the lock held has + // been obtained here before calling any other function. if globalDebugLock { - // fetching the package, function name and the line number of the caller from the runtime. + // Fetching the package, function name and the line number of the caller from the runtime. // here is an example https://play.golang.org/p/perrmNRI9_ . pc, fn, line, success := runtime.Caller(1) if !success { @@ -229,10 +232,11 @@ func (n *nsLockMap) Unlock(volume, path, opsID string) { func (n *nsLockMap) RLock(volume, path, opsID string) { var lockOrigin string readLock := true - // lock debugging enabled. The caller information of the lock held has be obtained here before calling any other function. + // Lock debugging enabled. The caller information of the lock held has + // been obtained here before calling any other function. if globalDebugLock { - // fetching the package, function name and the line number of the caller from the runtime. - // here is an example https://play.golang.org/p/perrmNRI9_ . + // Fetching the package, function name and the line number of the + // caller from the runtime. Here is an example https://play.golang.org/p/perrmNRI9_ . pc, fn, line, success := runtime.Caller(1) if !success { errorIf(errors.New("Couldn't get caller info."), "Fetching caller info form runtime failed.") diff --git a/cmd/server-mux_test.go b/cmd/server-mux_test.go index bdf556025..072068ffa 100644 --- a/cmd/server-mux_test.go +++ b/cmd/server-mux_test.go @@ -293,8 +293,8 @@ func generateTestCert(host string) error { Subject: pkix.Name{ Organization: []string{"Minio Test Cert"}, }, - NotBefore: time.Now(), - NotAfter: time.Now().Add(time.Minute * 1), + NotBefore: time.Now().UTC(), + NotAfter: time.Now().UTC().Add(time.Minute * 1), KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, diff --git a/cmd/signature-v4-parser_test.go b/cmd/signature-v4-parser_test.go index d0ec21cac..94db8fa20 100644 --- a/cmd/signature-v4-parser_test.go +++ b/cmd/signature-v4-parser_test.go @@ -565,7 +565,7 @@ func TestParsePreSignV4(t *testing.T) { return strconv.FormatInt(int64(expires), 10) } // used in expected preSignValues, preSignValues.Date is of type time.Time . - queryTime := time.Now() + queryTime := time.Now().UTC() sampleTimeStr := time.Now().UTC().Format(yyyymmdd)
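When lock debugging is enabled, Lock and RLock capture the caller's package, function, file and line via runtime.Caller before taking the lock and record it as the lock origin. A small standalone sketch of that capture; the formatted string below is illustrative, not the exact format the debug lock map stores.

package main

import (
	"fmt"
	"runtime"
)

// lockOrigin reconstructs a "[lock held] in <function>[<file>:<line>]" style
// string for the immediate caller, the same runtime.Caller trick used above.
func lockOrigin() string {
	pc, file, line, ok := runtime.Caller(1) // 1 = our caller, not lockOrigin itself
	if !ok {
		return "unknown"
	}
	fn := runtime.FuncForPC(pc)
	return fmt.Sprintf("[lock held] in %s[%s:%d]", fn.Name(), file, line)
}

func main() {
	fmt.Println(lockOrigin())
}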