From fe157166cae7ae505fd72cf07a37431decef1515 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 29 Jul 2020 23:15:34 -0700 Subject: [PATCH] fix: Pass context all the way down to the network call in lockers (#10161) Context timeout might race on each other when timeouts are lower i.e when two lock attempts happened very quickly on the same resource and the servers were yet trying to establish quorum. This situation can lead to locks held which wouldn't be unlocked and subsequent lock attempts would fail. This would require a complete server restart. A potential of this issue happening is when server is booting up and we are trying to hold a 'transaction.lock' in quick bursts of timeout. --- cmd/healthcheck-handler.go | 2 +- cmd/iam.go | 2 +- cmd/local-locker.go | 118 +++++++++++++++++------------- cmd/lock-rest-client.go | 22 +++--- cmd/lock-rest-client_test.go | 5 +- cmd/lock-rest-server.go | 36 ++++----- cmd/rest/client.go | 12 ++- cmd/server-main.go | 2 +- pkg/dsync/drwmutex.go | 14 ++-- pkg/dsync/rpc-client-impl_test.go | 7 +- pkg/dsync/rpc-client-interface.go | 8 +- 11 files changed, 129 insertions(+), 99 deletions(-) diff --git a/cmd/healthcheck-handler.go b/cmd/healthcheck-handler.go index 8c3e5a9ef..5c1335bae 100644 --- a/cmd/healthcheck-handler.go +++ b/cmd/healthcheck-handler.go @@ -23,7 +23,7 @@ import ( // ClusterCheckHandler returns if the server is ready for requests. func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "ClusterCheckCheckHandler") + ctx := newContext(r, w, "ClusterCheckHandler") objLayer := newObjectLayerFn() // Service not initialized yet diff --git a/cmd/iam.go b/cmd/iam.go index 1007be641..5ce1b975a 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -451,7 +451,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) { for range retry.NewTimerWithJitter(retryCtx, time.Second, 5*time.Second, retry.MaxJitter) { // let one of the server acquire the lock, if not let them timeout. // which shall be retried again by this loop. - if err := txnLk.GetLock(newDynamicTimeout(1*time.Second, 5*time.Second)); err != nil { + if err := txnLk.GetLock(newDynamicTimeout(3*time.Second, 5*time.Second)); err != nil { logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. trying to acquire lock") continue } diff --git a/cmd/local-locker.go b/cmd/local-locker.go index 707613e26..0f8e5068f 100644 --- a/cmd/local-locker.go +++ b/cmd/local-locker.go @@ -17,6 +17,7 @@ package cmd import ( + "context" "fmt" "sync" "time" @@ -71,30 +72,35 @@ func (l *localLocker) canTakeLock(resources ...string) bool { return noLkCnt == len(resources) } -func (l *localLocker) Lock(args dsync.LockArgs) (reply bool, err error) { - l.mutex.Lock() - defer l.mutex.Unlock() +func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + l.mutex.Lock() + defer l.mutex.Unlock() - if !l.canTakeLock(args.Resources...) { - // Not all locks can be taken on resources, - // reject it completely. - return false, nil - } + if !l.canTakeLock(args.Resources...) { + // Not all locks can be taken on resources, + // reject it completely. + return false, nil + } - // No locks held on the all resources, so claim write - // lock on all resources at once. - for _, resource := range args.Resources { - l.lockMap[resource] = []lockRequesterInfo{ - { - Writer: true, - Source: args.Source, - UID: args.UID, - Timestamp: UTCNow(), - TimeLastCheck: UTCNow(), - }, + // No locks held on the all resources, so claim write + // lock on all resources at once. + for _, resource := range args.Resources { + l.lockMap[resource] = []lockRequesterInfo{ + { + Writer: true, + Source: args.Source, + UID: args.UID, + Timestamp: UTCNow(), + TimeLastCheck: UTCNow(), + }, + } } + return true, nil } - return true, nil } func (l *localLocker) Unlock(args dsync.LockArgs) (reply bool, err error) { @@ -138,28 +144,33 @@ func (l *localLocker) removeEntry(name, uid string, lri *[]lockRequesterInfo) bo return false } -func (l *localLocker) RLock(args dsync.LockArgs) (reply bool, err error) { - l.mutex.Lock() - defer l.mutex.Unlock() - lrInfo := lockRequesterInfo{ - Writer: false, - Source: args.Source, - UID: args.UID, - Timestamp: UTCNow(), - TimeLastCheck: UTCNow(), - } - resource := args.Resources[0] - if lri, ok := l.lockMap[resource]; ok { - if reply = !isWriteLock(lri); reply { - // Unless there is a write lock - l.lockMap[resource] = append(l.lockMap[resource], lrInfo) +func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + l.mutex.Lock() + defer l.mutex.Unlock() + lrInfo := lockRequesterInfo{ + Writer: false, + Source: args.Source, + UID: args.UID, + Timestamp: UTCNow(), + TimeLastCheck: UTCNow(), } - } else { - // No locks held on the given name, so claim (first) read lock - l.lockMap[resource] = []lockRequesterInfo{lrInfo} - reply = true + resource := args.Resources[0] + if lri, ok := l.lockMap[resource]; ok { + if reply = !isWriteLock(lri); reply { + // Unless there is a write lock + l.lockMap[resource] = append(l.lockMap[resource], lrInfo) + } + } else { + // No locks held on the given name, so claim (first) read lock + l.lockMap[resource] = []lockRequesterInfo{lrInfo} + reply = true + } + return reply, nil } - return reply, nil } func (l *localLocker) RUnlock(args dsync.LockArgs) (reply bool, err error) { @@ -202,22 +213,27 @@ func (l *localLocker) IsOnline() bool { return true } -func (l *localLocker) Expired(args dsync.LockArgs) (expired bool, err error) { - l.mutex.Lock() - defer l.mutex.Unlock() +func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired bool, err error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + l.mutex.Lock() + defer l.mutex.Unlock() - // Lock found, proceed to verify if belongs to given uid. - for _, resource := range args.Resources { - if lri, ok := l.lockMap[resource]; ok { - // Check whether uid is still active - for _, entry := range lri { - if entry.UID == args.UID { - return false, nil + // Lock found, proceed to verify if belongs to given uid. + for _, resource := range args.Resources { + if lri, ok := l.lockMap[resource]; ok { + // Check whether uid is still active + for _, entry := range lri { + if entry.UID == args.UID { + return false, nil + } } } } + return true, nil } - return true, nil } // Similar to removeEntry but only removes an entry only if the lock entry exists in map. diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go index 8e61caf71..02421bafd 100644 --- a/cmd/lock-rest-client.go +++ b/cmd/lock-rest-client.go @@ -58,7 +58,7 @@ func (client *lockRESTClient) String() string { // Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected // permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() // after verifying format.json -func (client *lockRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { +func (client *lockRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { if values == nil { values = make(url.Values) } @@ -83,7 +83,7 @@ func (client *lockRESTClient) Close() error { } // restCall makes a call to the lock REST server. -func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply bool, err error) { +func (client *lockRESTClient) restCall(ctx context.Context, call string, args dsync.LockArgs) (reply bool, err error) { values := url.Values{} values.Set(lockRESTUID, args.UID) values.Set(lockRESTSource, args.Source) @@ -92,7 +92,7 @@ func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply buffer.WriteString(resource) buffer.WriteString("\n") } - respBody, err := client.call(call, values, &buffer, -1) + respBody, err := client.callWithContext(ctx, call, values, &buffer, -1) defer http.DrainBody(respBody) switch err { case nil: @@ -105,28 +105,28 @@ func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply } // RLock calls read lock REST API. -func (client *lockRESTClient) RLock(args dsync.LockArgs) (reply bool, err error) { - return client.restCall(lockRESTMethodRLock, args) +func (client *lockRESTClient) RLock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { + return client.restCall(ctx, lockRESTMethodRLock, args) } // Lock calls lock REST API. -func (client *lockRESTClient) Lock(args dsync.LockArgs) (reply bool, err error) { - return client.restCall(lockRESTMethodLock, args) +func (client *lockRESTClient) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { + return client.restCall(ctx, lockRESTMethodLock, args) } // RUnlock calls read unlock REST API. func (client *lockRESTClient) RUnlock(args dsync.LockArgs) (reply bool, err error) { - return client.restCall(lockRESTMethodRUnlock, args) + return client.restCall(context.Background(), lockRESTMethodRUnlock, args) } // Unlock calls write unlock RPC. func (client *lockRESTClient) Unlock(args dsync.LockArgs) (reply bool, err error) { - return client.restCall(lockRESTMethodUnlock, args) + return client.restCall(context.Background(), lockRESTMethodUnlock, args) } // Expired calls expired handler to check if lock args have expired. -func (client *lockRESTClient) Expired(args dsync.LockArgs) (expired bool, err error) { - return client.restCall(lockRESTMethodExpired, args) +func (client *lockRESTClient) Expired(ctx context.Context, args dsync.LockArgs) (expired bool, err error) { + return client.restCall(ctx, lockRESTMethodExpired, args) } func newLockAPI(endpoint Endpoint) dsync.NetLocker { diff --git a/cmd/lock-rest-client_test.go b/cmd/lock-rest-client_test.go index d05659ccd..cd2b7c6d5 100644 --- a/cmd/lock-rest-client_test.go +++ b/cmd/lock-rest-client_test.go @@ -17,6 +17,7 @@ package cmd import ( + "context" "testing" "github.com/minio/minio/pkg/dsync" @@ -34,12 +35,12 @@ func TestLockRESTlient(t *testing.T) { } // Attempt all calls. - _, err = lkClient.RLock(dsync.LockArgs{}) + _, err = lkClient.RLock(context.Background(), dsync.LockArgs{}) if err == nil { t.Fatal("Expected for Rlock to fail") } - _, err = lkClient.Lock(dsync.LockArgs{}) + _, err = lkClient.Lock(context.Background(), dsync.LockArgs{}) if err == nil { t.Fatal("Expected for Lock to fail") } diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go index 6c9e91910..620115c9a 100644 --- a/cmd/lock-rest-server.go +++ b/cmd/lock-rest-server.go @@ -96,7 +96,7 @@ func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) { return } - success, err := l.ll.Lock(args) + success, err := l.ll.Lock(r.Context(), args) if err == nil && !success { err = errLockConflict } @@ -141,7 +141,7 @@ func (l *lockRESTServer) RLockHandler(w http.ResponseWriter, r *http.Request) { return } - success, err := l.ll.RLock(args) + success, err := l.ll.RLock(r.Context(), args) if err == nil && !success { err = errLockConflict } @@ -185,20 +185,14 @@ func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request) return } - l.ll.mutex.Lock() - defer l.ll.mutex.Unlock() - - // Lock found, proceed to verify if belongs to given uid. - for _, resource := range args.Resources { - if lri, ok := l.ll.lockMap[resource]; ok { - // Check whether uid is still active - for _, entry := range lri { - if entry.UID == args.UID { - l.writeErrorResponse(w, errLockNotExpired) - return - } - } - } + expired, err := l.ll.Expired(r.Context(), args) + if err != nil { + l.writeErrorResponse(w, err) + return + } + if !expired { + l.writeErrorResponse(w, errLockNotExpired) + return } } @@ -219,7 +213,10 @@ func getLongLivedLocks(interval time.Duration) map[Endpoint][]nameLockRequesterI for idx := range lriArray { // Check whether enough time has gone by since last check if time.Since(lriArray[idx].TimeLastCheck) >= interval { - rslt = append(rslt, nameLockRequesterInfoPair{name: name, lri: lriArray[idx]}) + rslt = append(rslt, nameLockRequesterInfoPair{ + name: name, + lri: lriArray[idx], + }) lriArray[idx].TimeLastCheck = UTCNow() } } @@ -254,12 +251,15 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error { continue } + ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second) + // Call back to original server verify whether the lock is // still active (based on name & uid) - expired, err := c.Expired(dsync.LockArgs{ + expired, err := c.Expired(ctx, dsync.LockArgs{ UID: nlrip.lri.UID, Resources: []string{nlrip.name}, }) + cancel() if err != nil { nlripsMap[nlrip.name]++ c.Close() diff --git a/cmd/rest/client.go b/cmd/rest/client.go index fffc3c24d..dbbf03d6a 100644 --- a/cmd/rest/client.go +++ b/cmd/rest/client.go @@ -85,10 +85,20 @@ const ( querySep = "?" ) +type restError string + +func (e restError) Error() string { + return string(e) +} + +func (e restError) Timeout() bool { + return true +} + // CallWithContext - make a REST call with context. func (c *Client) CallWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) { if !c.IsOnline() { - return nil, &NetworkError{Err: errors.New("remote server offline")} + return nil, &NetworkError{Err: &url.Error{Op: method, URL: c.url.String(), Err: restError("remote server offline")}} } req, err := http.NewRequest(http.MethodPost, c.url.String()+method+querySep+values.Encode(), body) if err != nil { diff --git a/cmd/server-main.go b/cmd/server-main.go index f1cf56596..6f70fe03b 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -229,7 +229,7 @@ func initSafeMode(ctx context.Context, newObject ObjectLayer) (err error) { for range retry.NewTimer(retryCtx) { // let one of the server acquire the lock, if not let them timeout. // which shall be retried again by this loop. - if err = txnLk.GetLock(newDynamicTimeout(1*time.Second, 3*time.Second)); err != nil { + if err = txnLk.GetLock(newDynamicTimeout(3*time.Second, 3*time.Second)); err != nil { logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock") continue } diff --git a/pkg/dsync/drwmutex.go b/pkg/dsync/drwmutex.go index 44683d6d1..15bbec525 100644 --- a/pkg/dsync/drwmutex.go +++ b/pkg/dsync/drwmutex.go @@ -140,8 +140,8 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, timeout time.Duration, id, locks := make([]string, len(restClnts)) // Try to acquire the lock. - success := lock(dm.clnt, &locks, id, source, isReadLock, dm.Names...) - if !success { + locked = lock(retryCtx, dm.clnt, &locks, id, source, isReadLock, dm.Names...) + if !locked { continue } @@ -158,16 +158,16 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, timeout time.Duration, id, } dm.m.Unlock() - return true + return locked } // Failed to acquire the lock on this attempt, incrementally wait // for a longer back-off time and try again afterwards. - return false + return locked } // lock tries to acquire the distributed lock, returning true or false. -func lock(ds *Dsync, locks *[]string, id, source string, isReadLock bool, lockNames ...string) bool { +func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, isReadLock bool, lockNames ...string) bool { restClnts := ds.GetLockersFn() @@ -199,11 +199,11 @@ func lock(ds *Dsync, locks *[]string, id, source string, isReadLock bool, lockNa var locked bool var err error if isReadLock { - if locked, err = c.RLock(args); err != nil { + if locked, err = c.RLock(ctx, args); err != nil { log("dsync: Unable to call RLock failed with %s for %#v at %s\n", err, args, c) } } else { - if locked, err = c.Lock(args); err != nil { + if locked, err = c.Lock(ctx, args); err != nil { log("dsync: Unable to call Lock failed with %s for %#v at %s\n", err, args, c) } } diff --git a/pkg/dsync/rpc-client-impl_test.go b/pkg/dsync/rpc-client-impl_test.go index 042ec563d..92c1ff529 100644 --- a/pkg/dsync/rpc-client-impl_test.go +++ b/pkg/dsync/rpc-client-impl_test.go @@ -17,6 +17,7 @@ package dsync_test import ( + "context" "net/rpc" "sync" @@ -89,12 +90,12 @@ func (rpcClient *ReconnectRPCClient) Call(serviceMethod string, args interface{} return err } -func (rpcClient *ReconnectRPCClient) RLock(args LockArgs) (status bool, err error) { +func (rpcClient *ReconnectRPCClient) RLock(ctx context.Context, args LockArgs) (status bool, err error) { err = rpcClient.Call("Dsync.RLock", &args, &status) return status, err } -func (rpcClient *ReconnectRPCClient) Lock(args LockArgs) (status bool, err error) { +func (rpcClient *ReconnectRPCClient) Lock(ctx context.Context, args LockArgs) (status bool, err error) { err = rpcClient.Call("Dsync.Lock", &args, &status) return status, err } @@ -109,7 +110,7 @@ func (rpcClient *ReconnectRPCClient) Unlock(args LockArgs) (status bool, err err return status, err } -func (rpcClient *ReconnectRPCClient) Expired(args LockArgs) (expired bool, err error) { +func (rpcClient *ReconnectRPCClient) Expired(ctx context.Context, args LockArgs) (expired bool, err error) { err = rpcClient.Call("Dsync.Expired", &args, &expired) return expired, err } diff --git a/pkg/dsync/rpc-client-interface.go b/pkg/dsync/rpc-client-interface.go index aec5187e9..d8c2542d2 100644 --- a/pkg/dsync/rpc-client-interface.go +++ b/pkg/dsync/rpc-client-interface.go @@ -16,6 +16,8 @@ package dsync +import "context" + // LockArgs is minimal required values for any dsync compatible lock operation. type LockArgs struct { // Unique ID of lock/unlock request. @@ -34,12 +36,12 @@ type NetLocker interface { // Do read lock for given LockArgs. It should return // * a boolean to indicate success/failure of the operation // * an error on failure of lock request operation. - RLock(args LockArgs) (bool, error) + RLock(ctx context.Context, args LockArgs) (bool, error) // Do write lock for given LockArgs. It should return // * a boolean to indicate success/failure of the operation // * an error on failure of lock request operation. - Lock(args LockArgs) (bool, error) + Lock(ctx context.Context, args LockArgs) (bool, error) // Do read unlock for given LockArgs. It should return // * a boolean to indicate success/failure of the operation @@ -52,7 +54,7 @@ type NetLocker interface { Unlock(args LockArgs) (bool, error) // Expired returns if current lock args has expired. - Expired(args LockArgs) (bool, error) + Expired(ctx context.Context, args LockArgs) (bool, error) // Returns underlying endpoint of this lock client instance. String() string