From b52b90412bcfb8caaad6da5eb5cab2cc6ef642b8 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 5 Aug 2019 11:45:30 -0700 Subject: [PATCH] Avoid data-transfer in distributed locking (#8004) --- cmd/iam.go | 25 ++++-- cmd/lock-rest-client.go | 36 +++------ cmd/lock-rest-server-common.go | 18 +++-- cmd/lock-rest-server.go | 144 +++++++-------------------------- 4 files changed, 68 insertions(+), 155 deletions(-) diff --git a/cmd/iam.go b/cmd/iam.go index 177f7c95b..70332e4a9 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -891,9 +891,25 @@ func (sys *IAMSys) Init(objAPI ObjectLayer) error { return errInvalidArgument } - // Migrate IAM configuration - if err := doIAMConfigMigration(objAPI); err != nil { - return err + doneCh := make(chan struct{}) + defer close(doneCh) + + // Migrating IAM needs a retry mechanism for + // the following reasons: + // - Read quorum is lost just after the initialization + // of the object layer. + for range newRetryTimerSimple(doneCh) { + // Migrate IAM configuration + if err := doIAMConfigMigration(objAPI); err != nil { + if err == errDiskNotFound || + strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) || + strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) { + logger.Info("Waiting for IAM subsystem to be initialized..") + continue + } + return err + } + break } if globalEtcdClient != nil { @@ -902,9 +918,6 @@ func (sys *IAMSys) Init(objAPI ObjectLayer) error { defer sys.watchIAMDisk(objAPI) } - doneCh := make(chan struct{}) - defer close(doneCh) - // Initializing IAM needs a retry mechanism for // the following reasons: // - Read quorum is lost just after the initialization diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go index bdac5daee..4e4cd6633 100644 --- a/cmd/lock-rest-client.go +++ b/cmd/lock-rest-client.go @@ -17,10 +17,8 @@ package cmd import ( - "bytes" "context" "crypto/tls" - "encoding/gob" "errors" "io" "sync" @@ -131,32 +129,16 @@ func (client *lockRESTClient) Close() error { // 
restCall makes a call to the lock REST server. func (client *lockRESTClient) restCall(call string, args dsync.LockArgs) (reply bool, err error) { - - reader := bytes.NewBuffer(make([]byte, 0, 2048)) - err = gob.NewEncoder(reader).Encode(args) - if err != nil { - return false, err - } - respBody, err := client.call(call, nil, reader, -1) - if err != nil { - return false, err - } - - var resp lockResponse + values := url.Values{} + values.Set(lockRESTUID, args.UID) + values.Set(lockRESTSource, args.Source) + values.Set(lockRESTResource, args.Resource) + values.Set(lockRESTServerAddr, args.ServerAddr) + values.Set(lockRESTServerEndpoint, args.ServiceEndpoint) + + respBody, err := client.call(call, values, nil, -1) defer http.DrainBody(respBody) - err = gob.NewDecoder(respBody).Decode(&resp) - - if err != nil || !resp.Success { - reqInfo := &logger.ReqInfo{} - reqInfo.AppendTags("resource", args.Resource) - reqInfo.AppendTags("serveraddress", args.ServerAddr) - reqInfo.AppendTags("serviceendpoint", args.ServiceEndpoint) - reqInfo.AppendTags("source", args.Source) - reqInfo.AppendTags("uid", args.UID) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) - } - return resp.Success, err + return err == nil, err } // RLock calls read lock REST API. diff --git a/cmd/lock-rest-server-common.go b/cmd/lock-rest-server-common.go index ce2d74c56..cb933df09 100644 --- a/cmd/lock-rest-server-common.go +++ b/cmd/lock-rest-server-common.go @@ -21,7 +21,7 @@ import ( "time" ) -const lockRESTVersion = "v1" +const lockRESTVersion = "v2" const lockRESTPath = minioReservedBucketPath + "/lock/" + lockRESTVersion var lockServicePath = path.Join(minioReservedBucketPath, lockServiceSubPath) @@ -33,6 +33,18 @@ const ( lockRESTMethodRUnlock = "runlock" lockRESTMethodForceUnlock = "forceunlock" lockRESTMethodExpired = "expired" + + // Unique ID of lock/unlock request. 
+	lockRESTUID = "uid"
+	// Source contains the line number, function and file name of the code
+	// on the client node that requested the lock.
+	lockRESTSource = "source"
+	// Resource contains an entity to be locked/unlocked.
+	lockRESTResource = "resource"
+	// ServerAddr contains the address of the server who requested lock/unlock of the above resource.
+	lockRESTServerAddr = "serverAddr"
+	// ServiceEndpoint contains the network path of above server to do lock/unlock.
+	lockRESTServerEndpoint = "serverEndpoint"
 )
 
 // nameLockRequesterInfoPair is a helper type for lock maintenance
@@ -41,10 +53,6 @@ type nameLockRequesterInfoPair struct {
 	lri  lockRequesterInfo
 }
 
-type lockResponse struct {
-	Success bool
-}
-
 // Similar to removeEntry but only removes an entry only if the lock entry exists in map.
 func (l *localLocker) removeEntryIfExists(nlrip nameLockRequesterInfoPair) {
 	// Check if entry is still in map (could have been removed altogether by 'concurrent' (R)Unlock of last entry)
diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go
index 1ce80e34b..261f1ea0a 100644
--- a/cmd/lock-rest-server.go
+++ b/cmd/lock-rest-server.go
@@ -18,7 +18,6 @@ package cmd
 
 import (
 	"context"
-	"encoding/gob"
 	"errors"
 	"math/rand"
 	"net/http"
@@ -60,6 +59,19 @@ func (l *lockRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
 	return true
 }
 
+// getLockArgs builds the dsync.LockArgs of a lock REST call from the query
+// parameters of the request; every field the client sets is read back.
+func getLockArgs(r *http.Request) dsync.LockArgs {
+	query := r.URL.Query() // parse the query string once
+	return dsync.LockArgs{
+		UID:             query.Get(lockRESTUID),
+		Source:          query.Get(lockRESTSource),
+		Resource:        query.Get(lockRESTResource),
+		ServerAddr:      query.Get(lockRESTServerAddr),
+		ServiceEndpoint: query.Get(lockRESTServerEndpoint),
+	}
+}
+
 // LockHandler - Acquires a lock.
 func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) {
 	if !l.IsValid(w, r) {
@@ -67,28 +79,16 @@ func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	ctx := newContext(r, w, "Lock")
-
-	var lockArgs dsync.LockArgs
-	if r.ContentLength < 0 {
-		l.writeErrorResponse(w, errInvalidArgument)
-		return
-	}
-
-	err := gob.NewDecoder(r.Body).Decode(&lockArgs)
+	// Only the HTTP status is returned to the caller, so a refusal reported
+	// as (false, nil) has to be turned into an error response.
+	granted, err := l.ll.Lock(getLockArgs(r))
+	if err == nil && !granted {
+		err = errors.New("lock not granted")
+	}
 	if err != nil {
 		l.writeErrorResponse(w, err)
 		return
 	}
-
-	success, err := l.ll.Lock(lockArgs)
-	if err != nil {
-		l.writeErrorResponse(w, err)
-		return
-	}
-	resp := lockResponse{Success: success}
-	logger.LogIf(ctx, gob.NewEncoder(w).Encode(resp))
-	w.(http.Flusher).Flush()
 }
 
 // UnlockHandler - releases the acquired lock.
@@ -98,28 +98,16 @@ func (l *lockRESTServer) UnlockHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	ctx := newContext(r, w, "Unlock")
-
-	var lockArgs dsync.LockArgs
-	if r.ContentLength < 0 {
-		l.writeErrorResponse(w, errInvalidArgument)
-		return
-	}
-
-	err := gob.NewDecoder(r.Body).Decode(&lockArgs)
-	if err != nil {
-		l.writeErrorResponse(w, err)
-		return
-	}
-
-	success, err := l.ll.Unlock(lockArgs)
+	// Only the HTTP status is returned to the caller, so a failed unlock
+	// reported as (false, nil) has to be turned into an error response.
+	released, err := l.ll.Unlock(getLockArgs(r))
+	if err == nil && !released {
+		err = errors.New("lock not released")
+	}
 	if err != nil {
 		l.writeErrorResponse(w, err)
 		return
 	}
-	resp := lockResponse{Success: success}
-	logger.LogIf(ctx, gob.NewEncoder(w).Encode(resp))
-	w.(http.Flusher).Flush()
 }
 
-// LockHandler - Acquires an RLock.
+// RLockHandler - Acquires an RLock.
@@ -129,27 +101,16 @@ func (l *lockRESTServer) RLockHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	ctx := newContext(r, w, "RLock")
-	var lockArgs dsync.LockArgs
-	if r.ContentLength < 0 {
-		l.writeErrorResponse(w, errInvalidArgument)
-		return
-	}
-
-	err := gob.NewDecoder(r.Body).Decode(&lockArgs)
-	if err != nil {
-		l.writeErrorResponse(w, err)
-		return
-	}
-
-	success, err := l.ll.RLock(lockArgs)
+	// Only the HTTP status is returned to the caller, so a refusal reported
+	// as (false, nil) has to be turned into an error response.
+	granted, err := l.ll.RLock(getLockArgs(r))
+	if err == nil && !granted {
+		err = errors.New("read lock not granted")
+	}
 	if err != nil {
 		l.writeErrorResponse(w, err)
 		return
 	}
-	resp := lockResponse{Success: success}
-	logger.LogIf(ctx, gob.NewEncoder(w).Encode(resp))
-	w.(http.Flusher).Flush()
 }
 
 // RUnlockHandler - releases the acquired read lock.
@@ -159,27 +120,16 @@ func (l *lockRESTServer) RUnlockHandler(w http.ResponseWriter, r *http.Request)
 		return
 	}
 
-	ctx := newContext(r, w, "RUnlock")
-	var lockArgs dsync.LockArgs
-	if r.ContentLength < 0 {
-		l.writeErrorResponse(w, errInvalidArgument)
-		return
-	}
-
-	err := gob.NewDecoder(r.Body).Decode(&lockArgs)
+	// Only the HTTP status is returned to the caller, so a failed unlock
+	// reported as (false, nil) has to be turned into an error response.
+	released, err := l.ll.RUnlock(getLockArgs(r))
+	if err == nil && !released {
+		err = errors.New("read lock not released")
+	}
 	if err != nil {
 		l.writeErrorResponse(w, err)
 		return
 	}
-
-	success, err := l.ll.RUnlock(lockArgs)
-	if err != nil {
-		l.writeErrorResponse(w, err)
-		return
-	}
-	resp := lockResponse{Success: success}
-	logger.LogIf(ctx, gob.NewEncoder(w).Encode(resp))
-	w.(http.Flusher).Flush()
 }
 
 // ForceUnlockHandler - force releases the acquired lock.
@@ -189,28 +127,16 @@ func (l *lockRESTServer) ForceUnlockHandler(w http.ResponseWriter, r *http.Reque
 		return
 	}
 
-	ctx := newContext(r, w, "ForceUnlock")
-
-	var lockArgs dsync.LockArgs
-	if r.ContentLength < 0 {
-		l.writeErrorResponse(w, errInvalidArgument)
-		return
-	}
-
-	err := gob.NewDecoder(r.Body).Decode(&lockArgs)
+	// Only the HTTP status is returned to the caller, so a failure reported
+	// as (false, nil) has to be turned into an error response.
+	released, err := l.ll.ForceUnlock(getLockArgs(r))
+	if err == nil && !released {
+		err = errors.New("force unlock failed")
+	}
 	if err != nil {
 		l.writeErrorResponse(w, err)
 		return
 	}
-
-	success, err := l.ll.ForceUnlock(lockArgs)
-	if err != nil {
-		l.writeErrorResponse(w, err)
-		return
-	}
-	resp := lockResponse{Success: success}
-	logger.LogIf(ctx, gob.NewEncoder(w).Encode(resp))
-	w.(http.Flusher).Flush()
 }
 
 // ExpiredHandler - query expired lock status.
@@ -220,19 +146,8 @@ func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request)
 		return
 	}
 
-	ctx := newContext(r, w, "Expired")
-
-	var lockArgs dsync.LockArgs
-	if r.ContentLength < 0 {
-		l.writeErrorResponse(w, errInvalidArgument)
-		return
-	}
+	lockArgs := getLockArgs(r)
 
-	err := gob.NewDecoder(r.Body).Decode(&lockArgs)
-	if err != nil {
-		l.writeErrorResponse(w, err)
-		return
-	}
 	success := true
 	l.ll.mutex.Lock()
 	defer l.ll.mutex.Unlock()
@@ -246,11 +161,12 @@ func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request)
 			}
 		}
 	}
-	// When we get here lock is no longer active due to either dsync.LockArgs.Resource
-	// being absent from map or uid not found for given dsync.LockArgs.Resource
-	resp := lockResponse{Success: success}
-	logger.LogIf(ctx, gob.NewEncoder(w).Encode(resp))
-	w.(http.Flusher).Flush()
+	// NOTE(review): success is presumably left false only while the UID is
+	// still registered for the resource, i.e. the lock has NOT expired.
+	if !success {
+		l.writeErrorResponse(w, errors.New("lock not expired"))
+		return
+	}
 }
 
 // lockMaintenance loops over locks that have been active for some time and checks back
@@ -323,12 +239,14 @@ func startLockMaintenance(lkSrv *lockRESTServer) {
 // registerLockRESTHandlers - register lock rest router.
func registerLockRESTHandlers(router *mux.Router) { subrouter := router.PathPrefix(lockRESTPath).Subrouter() - subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(globalLockServer.LockHandler)) - subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(globalLockServer.RLockHandler)) - subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(globalLockServer.UnlockHandler)) - subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(globalLockServer.RUnlockHandler)) - subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodForceUnlock).HandlerFunc(httpTraceHdrs(globalLockServer.ForceUnlockHandler)) - subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodExpired).HandlerFunc(httpTraceAll(globalLockServer.ExpiredHandler)) + queries := restQueries(lockRESTUID, lockRESTSource, lockRESTResource, lockRESTServerAddr, lockRESTServerEndpoint) + subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(globalLockServer.LockHandler)).Queries(queries...) + subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(globalLockServer.RLockHandler)).Queries(queries...) + subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(globalLockServer.UnlockHandler)).Queries(queries...) + subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(globalLockServer.RUnlockHandler)).Queries(queries...) + subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodForceUnlock).HandlerFunc(httpTraceHdrs(globalLockServer.ForceUnlockHandler)).Queries(queries...) + subrouter.Methods(http.MethodPost).Path("/" + lockRESTMethodExpired).HandlerFunc(httpTraceAll(globalLockServer.ExpiredHandler)).Queries(queries...) 
+ router.NotFoundHandler = http.HandlerFunc(httpTraceAll(notFoundHandler)) // Start lock maintenance from all lock servers.