From 720442b1a2acb97712d18c1d86eedb8a27687780 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Mon, 25 Nov 2019 16:39:43 -0800
Subject: [PATCH] Add lock expiry handler to expire state locks (#8562)

---
 buildscripts/verify-build.sh      |   4 +-
 cmd/local-locker.go               |  32 ++++++++
 cmd/lock-rest-client.go           |   9 ++-
 cmd/lock-rest-server-common.go    |   6 +-
 cmd/lock-rest-server.go           | 129 ++++++++++++++++++++++++++++++
 pkg/dsync/rpc-client-impl_test.go |   6 +-
 pkg/dsync/rpc-client-interface.go |   3 +
 7 files changed, 182 insertions(+), 7 deletions(-)

diff --git a/buildscripts/verify-build.sh b/buildscripts/verify-build.sh
index 87376a3af..b9c12448c 100755
--- a/buildscripts/verify-build.sh
+++ b/buildscripts/verify-build.sh
@@ -125,7 +125,7 @@ function start_minio_dist_erasure_sets()
     "${MINIO[@]}" server --address=:9009 "http://127.0.0.1:9000${WORK_DIR}/dist-disk-sets1" "http://127.0.0.1:9001${WORK_DIR}/dist-disk-sets2" "http://127.0.0.1:9002${WORK_DIR}/dist-disk-sets3" "http://127.0.0.1:9003${WORK_DIR}/dist-disk-sets4" "http://127.0.0.1:9004${WORK_DIR}/dist-disk-sets5" "http://127.0.0.1:9005${WORK_DIR}/dist-disk-sets6" "http://127.0.0.1:9006${WORK_DIR}/dist-disk-sets7" "http://127.0.0.1:9007${WORK_DIR}/dist-disk-sets8" "http://127.0.0.1:9008${WORK_DIR}/dist-disk-sets9" "http://127.0.0.1:9009${WORK_DIR}/dist-disk-sets10" "http://127.0.0.1:9000${WORK_DIR}/dist-disk-sets11" "http://127.0.0.1:9001${WORK_DIR}/dist-disk-sets12" "http://127.0.0.1:9002${WORK_DIR}/dist-disk-sets13" "http://127.0.0.1:9003${WORK_DIR}/dist-disk-sets14" "http://127.0.0.1:9004${WORK_DIR}/dist-disk-sets15" "http://127.0.0.1:9005${WORK_DIR}/dist-disk-sets16" "http://127.0.0.1:9006${WORK_DIR}/dist-disk-sets17" "http://127.0.0.1:9007${WORK_DIR}/dist-disk-sets18" "http://127.0.0.1:9008${WORK_DIR}/dist-disk-sets19" "http://127.0.0.1:9009${WORK_DIR}/dist-disk-sets20" >"$WORK_DIR/dist-minio-9009.log" 2>&1 &
     minio_pids[9]=$!

-    sleep 30
+    sleep 35

     echo "${minio_pids[@]}"
 }
@@ -170,7 +170,7 @@ function start_minio_dist_erasure()
     "${MINIO[@]}" server --address=:9003 "http://127.0.0.1:9000${WORK_DIR}/dist-disk1" "http://127.0.0.1:9001${WORK_DIR}/dist-disk2" "http://127.0.0.1:9002${WORK_DIR}/dist-disk3" "http://127.0.0.1:9003${WORK_DIR}/dist-disk4" >"$WORK_DIR/dist-minio-9003.log" 2>&1 &
     minio_pids[3]=$!

-    sleep 30
+    sleep 35

     echo "${minio_pids[@]}"
 }
diff --git a/cmd/local-locker.go b/cmd/local-locker.go
index acc37c2ff..2f9a2e6ee 100644
--- a/cmd/local-locker.go
+++ b/cmd/local-locker.go
@@ -69,6 +69,10 @@ func (d *errorLocker) IsOnline() bool {
 	return false
 }

+func (d *errorLocker) Expired(args dsync.LockArgs) (reply bool, err error) {
+	return false, errors.New("unable to check for lock expiration")
+}
+
 // localLocker implements Dsync.NetLocker
 type localLocker struct {
 	mutex    sync.Mutex
@@ -202,6 +206,34 @@ func (l *localLocker) IsOnline() bool {
 	return true
 }

+func (l *localLocker) Expired(args dsync.LockArgs) (expired bool, err error) {
+	l.mutex.Lock()
+	defer l.mutex.Unlock()
+
+	// Lock found, proceed to verify if it belongs to the given uid.
+	if lri, ok := l.lockMap[args.Resource]; ok {
+		// Check whether uid is still active
+		for _, entry := range lri {
+			if entry.UID == args.UID {
+				return false, nil
+			}
+		}
+	}
+	return true, nil
+}
+
+// Similar to removeEntry, but only removes an entry if the lock entry exists in the map.
+// Caller must hold 'l.mutex' lock.
+func (l *localLocker) removeEntryIfExists(nlrip nameLockRequesterInfoPair) {
+	// Check if entry is still in map (could have been removed altogether by 'concurrent' (R)Unlock of last entry)
+	if lri, ok := l.lockMap[nlrip.name]; ok {
+		// Even if the entry exists, it may not be the same entry which was
+		// considered as expired, so we simply attempt to remove it; if that is
+		// not possible, there is nothing more we need to do.
+		l.removeEntry(nlrip.name, nlrip.lri.UID, &lri)
+	}
+}
+
 func newLocker(endpoint Endpoint) *localLocker {
 	return &localLocker{
 		endpoint: endpoint,
diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go
index c4c8793b5..87ab6f5af 100644
--- a/cmd/lock-rest-client.go
+++ b/cmd/lock-rest-client.go
@@ -47,6 +47,8 @@ func toLockError(err error) error {
 	switch err.Error() {
 	case errLockConflict.Error():
 		return errLockConflict
+	case errLockNotExpired.Error():
+		return errLockNotExpired
 	}
 	return err
 }
@@ -109,7 +111,7 @@ func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply
 	switch err {
 	case nil:
 		return true, nil
-	case errLockConflict:
+	case errLockConflict, errLockNotExpired:
 		return false, nil
 	default:
 		return false, err
@@ -136,6 +138,11 @@ func (client *lockRESTClient) Unlock(args dsync.LockArgs) (reply bool, err error
 	return client.restCall(lockRESTMethodUnlock, args)
 }

+// Expired calls the expired handler to check if the lock args have expired.
+func (client *lockRESTClient) Expired(args dsync.LockArgs) (expired bool, err error) {
+	return client.restCall(lockRESTMethodExpired, args)
+}
+
 func closeLockers(lockers []dsync.NetLocker) {
 	for _, locker := range lockers {
 		locker.Close()
diff --git a/cmd/lock-rest-server-common.go b/cmd/lock-rest-server-common.go
index e38dabf20..f11ae2fa3 100644
--- a/cmd/lock-rest-server-common.go
+++ b/cmd/lock-rest-server-common.go
@@ -31,6 +31,7 @@ const (
 	lockRESTMethodRLock   = "/rlock"
 	lockRESTMethodUnlock  = "/unlock"
 	lockRESTMethodRUnlock = "/runlock"
+	lockRESTMethodExpired = "/expired"

 	// Unique ID of lock/unlock request.
 	lockRESTUID = "uid"
@@ -41,4 +42,7 @@ const (
 	lockRESTResource = "resource"
 )

-var errLockConflict = errors.New("lock conflict")
+var (
+	errLockConflict   = errors.New("lock conflict")
+	errLockNotExpired = errors.New("lock not expired")
+)
diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go
index af340867d..845ef2a8e 100644
--- a/cmd/lock-rest-server.go
+++ b/cmd/lock-rest-server.go
@@ -18,6 +18,7 @@ package cmd

 import (
 	"errors"
+	"math/rand"
 	"net/http"
 	"path"
 	"time"
@@ -127,6 +128,131 @@ func (l *lockRESTServer) RUnlockHandler(w http.ResponseWriter, r *http.Request)
 	}
 }

+// ExpiredHandler - query expired lock status.
+func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request) {
+	if !l.IsValid(w, r) {
+		l.writeErrorResponse(w, errors.New("Invalid request"))
+		return
+	}
+
+	lockArgs := getLockArgs(r)
+
+	l.ll.mutex.Lock()
+	defer l.ll.mutex.Unlock()
+	// Lock found, proceed to verify if it belongs to the given uid.
+	if lri, ok := l.ll.lockMap[lockArgs.Resource]; ok {
+		// Check whether uid is still active
+		for _, entry := range lri {
+			if entry.UID == lockArgs.UID {
+				l.writeErrorResponse(w, errLockNotExpired)
+				return
+			}
+		}
+	}
+}
+
+// nameLockRequesterInfoPair is a helper type for lock maintenance
+type nameLockRequesterInfoPair struct {
+	name string
+	lri  lockRequesterInfo
+}
+
+// getLongLivedLocks returns locks that are older than a certain time and
+// have not been 'checked' for validity recently.
+func getLongLivedLocks(interval time.Duration) map[Endpoint][]nameLockRequesterInfoPair {
+	nlripMap := make(map[Endpoint][]nameLockRequesterInfoPair)
+	for endpoint, locker := range globalLockServers {
+		rslt := []nameLockRequesterInfoPair{}
+		locker.mutex.Lock()
+		for name, lriArray := range locker.lockMap {
+			for idx := range lriArray {
+				// Check whether enough time has gone by since last check
+				if time.Since(lriArray[idx].TimeLastCheck) >= interval {
+					rslt = append(rslt, nameLockRequesterInfoPair{name: name, lri: lriArray[idx]})
+					lriArray[idx].TimeLastCheck = UTCNow()
+				}
+			}
+		}
+		nlripMap[endpoint] = rslt
+		locker.mutex.Unlock()
+	}
+	return nlripMap
+}
+
+// lockMaintenance loops over locks that have been active for some time and checks back
+// with the original server whether it is still alive or not.
+//
+// The logic below ignores errors generated by the Expired call, e.g.
+// - the server at the client is down
+// - some network error (and the server is up normally)
+//
+// We ignore such errors and retry later to get a resolution on this lock.
+func lockMaintenance(interval time.Duration) {
+	// Validate if long-lived locks are indeed clean.
+	// Get the list of long-lived locks to check for staleness.
+	for lendpoint, nlrips := range getLongLivedLocks(interval) {
+		for _, nlrip := range nlrips {
+			for _, ep := range globalEndpoints {
+				for _, endpoint := range ep.Endpoints {
+					if endpoint.String() == lendpoint.String() {
+						continue
+					}
+
+					c := newLockAPI(endpoint)
+					if !c.IsOnline() {
+						continue
+					}
+
+					// Call back to the original server to verify whether the lock is still active (based on name & uid)
+					expired, err := c.Expired(dsync.LockArgs{
+						UID:      nlrip.lri.UID,
+						Resource: nlrip.name,
+					})
+
+					if err != nil {
+						c.Close()
+						continue
+					}
+
+					// On a successful response, verify whether the lock was indeed active or stale.
+					if expired {
+						// The lock is no longer active at the server that originated
+						// the lock; attempt to remove it.
+						globalLockServers[lendpoint].mutex.Lock()
+						// Purge the stale entry if it exists.
+						globalLockServers[lendpoint].removeEntryIfExists(nlrip)
+						globalLockServers[lendpoint].mutex.Unlock()
+					}
+
+					// Close the connection regardless of the call response.
+					c.Close()
+				}
+			}
+		}
+	}
+}
+
+// Start lock maintenance from all lock servers.
+func startLockMaintenance() {
+	// Start with a random sleep time, so as to avoid "synchronous checks" between servers.
+	time.Sleep(time.Duration(rand.Float64() * float64(lockMaintenanceInterval)))
+
+	// Initialize a new ticker with lockMaintenanceInterval between ticks.
+	ticker := time.NewTicker(lockMaintenanceInterval)
+	// Stop the ticker upon service closure and clean up the goroutine.
+	defer ticker.Stop()
+
+	for {
+		// Check every minute for locks held longer than 2 minutes.
+		select {
+		case <-GlobalServiceDoneCh:
+			return
+		case <-ticker.C:
+			lockMaintenance(lockValidityCheckInterval)
+		}
+	}
+}
+
 // registerLockRESTHandlers - register lock rest router.
 func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) {
 	queries := restQueries(lockRESTUID, lockRESTSource, lockRESTResource)
@@ -145,8 +271,11 @@ func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) {
 			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler)).Queries(queries...)
 			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler)).Queries(queries...)
 			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler)).Queries(queries...)
+			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(lockServer.ExpiredHandler)).Queries(queries...)

 			globalLockServers[endpoint] = lockServer.ll
 		}
 	}
+
+	go startLockMaintenance()
 }
diff --git a/pkg/dsync/rpc-client-impl_test.go b/pkg/dsync/rpc-client-impl_test.go
index f9f2065d8..042ec563d 100644
--- a/pkg/dsync/rpc-client-impl_test.go
+++ b/pkg/dsync/rpc-client-impl_test.go
@@ -109,9 +109,9 @@ func (rpcClient *ReconnectRPCClient) Unlock(args LockArgs) (status bool, err err
 	return status, err
 }

-func (rpcClient *ReconnectRPCClient) ForceUnlock(args LockArgs) (status bool, err error) {
-	err = rpcClient.Call("Dsync.ForceUnlock", &args, &status)
-	return status, err
+func (rpcClient *ReconnectRPCClient) Expired(args LockArgs) (expired bool, err error) {
+	err = rpcClient.Call("Dsync.Expired", &args, &expired)
+	return expired, err
 }

 func (rpcClient *ReconnectRPCClient) String() string {
diff --git a/pkg/dsync/rpc-client-interface.go b/pkg/dsync/rpc-client-interface.go
index ffce5f5c7..086787dd9 100644
--- a/pkg/dsync/rpc-client-interface.go
+++ b/pkg/dsync/rpc-client-interface.go
@@ -51,6 +51,9 @@ type NetLocker interface {
 	// * an error on failure of unlock request operation.
 	Unlock(args LockArgs) (bool, error)

+	// Expired returns whether the current lock args have expired.
+	Expired(args LockArgs) (bool, error)
+
 	// Returns underlying endpoint of this lock client instance.
 	String() string
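
Taken together, the additions above form a small check-then-purge protocol: localLocker.Expired (exposed over REST by ExpiredHandler) answers whether a given UID still holds a resource, removeEntryIfExists purges a remembered entry, and lockMaintenance periodically walks the local lock table and asks the other endpoints before dropping anything. Below is a minimal single-process sketch of that loop with the REST transport, endpoints and timers stripped out; every identifier in it (memLocker, sweep, removeIfExists, the simplified LockArgs) is an assumption made for illustration, not code from this patch.

// Illustrative sketch only: the identifiers below are assumptions for this
// example, not code from the patch.
package main

import (
	"fmt"
	"sync"
)

// LockArgs mirrors the shape of dsync.LockArgs passed to Expired.
type LockArgs struct {
	UID      string
	Resource string
}

// memLocker is a toy in-memory lock table: resource -> UIDs currently holding it.
type memLocker struct {
	mu      sync.Mutex
	lockMap map[string][]string
}

// Expired reports true when no entry for the resource carries the given UID,
// which is the same check localLocker.Expired and ExpiredHandler perform.
func (m *memLocker) Expired(args LockArgs) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, uid := range m.lockMap[args.Resource] {
		if uid == args.UID {
			return false, nil // lock is still active for this UID
		}
	}
	return true, nil
}

// removeIfExists drops a (resource, UID) entry if it is still present,
// analogous to localLocker.removeEntryIfExists.
func (m *memLocker) removeIfExists(resource, uid string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	uids := m.lockMap[resource]
	for i, u := range uids {
		if u == uid {
			m.lockMap[resource] = append(uids[:i], uids[i+1:]...)
			return
		}
	}
}

// sweep plays the role of lockMaintenance: for every lock held locally, ask a
// peer whether it still considers the lock active, and purge the local entry
// when the peer reports it as expired.
func sweep(local, peer *memLocker) {
	// Snapshot the locally held (resource, UID) pairs under the lock.
	local.mu.Lock()
	type pair struct{ resource, uid string }
	var held []pair
	for resource, uids := range local.lockMap {
		for _, uid := range uids {
			held = append(held, pair{resource, uid})
		}
	}
	local.mu.Unlock()

	for _, p := range held {
		expired, err := peer.Expired(LockArgs{UID: p.uid, Resource: p.resource})
		if err != nil || !expired {
			continue // ignore errors and still-active locks; retry next sweep
		}
		local.removeIfExists(p.resource, p.uid)
		fmt.Printf("purged stale lock on %q held by %q\n", p.resource, p.uid)
	}
}

func main() {
	local := &memLocker{lockMap: map[string][]string{
		"bucket/object-a": {"uid-1"}, // peer still knows uid-1
		"bucket/object-b": {"uid-2"}, // peer has forgotten uid-2: stale
	}}
	peer := &memLocker{lockMap: map[string][]string{
		"bucket/object-a": {"uid-1"},
	}}
	sweep(local, peer)
	fmt.Println("remaining:", local.lockMap)
}

As in lockMaintenance, errors from the Expired check are skipped so the entry simply survives until a later sweep can reach the peer.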