From 9cdd981ce7de3c9c4ede4ff85cf9a0fb8c798b1f Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 25 Jan 2021 10:01:27 -0800 Subject: [PATCH] fix: expire locks only on participating lockers (#11335) additionally also add a new ForceUnlock API, to allow forcibly unlocking locks if possible. --- cmd/admin-handlers.go | 40 +++++++++ cmd/admin-router.go | 6 +- cmd/admin-server-info.go | 5 +- cmd/erasure-server-pool.go | 5 -- cmd/erasure-sets.go | 21 ----- cmd/local-locker.go | 58 +++++++++---- cmd/lock-rest-client.go | 5 ++ cmd/lock-rest-server-common.go | 15 ++-- cmd/lock-rest-server.go | 131 +++++++++++++++++++----------- cmd/prepare-storage.go | 6 +- cmd/server-main.go | 5 +- pkg/dsync/rpc-client-impl_test.go | 5 ++ pkg/dsync/rpc-client-interface.go | 5 ++ pkg/iam/policy/admin-action.go | 2 + pkg/madmin/top-commands.go | 25 ++++++ 15 files changed, 227 insertions(+), 107 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 798f5bff9..825c39140 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -42,6 +42,7 @@ import ( "github.com/minio/minio/cmd/logger/message/log" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/bandwidth" + "github.com/minio/minio/pkg/dsync" "github.com/minio/minio/pkg/handlers" iampolicy "github.com/minio/minio/pkg/iam/policy" "github.com/minio/minio/pkg/madmin" @@ -403,6 +404,45 @@ type PeerLocks struct { Locks map[string][]lockRequesterInfo } +// ForceUnlockHandler force unlocks requested resource +func (a adminAPIHandlers) ForceUnlockHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "ForceUnlock") + + defer logger.AuditLog(w, r, "ForceUnlock", mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.ForceUnlockAdminAction) + if objectAPI == nil { + return + } + + z, ok := objectAPI.(*erasureServerPools) + if !ok { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + vars := mux.Vars(r) + + var args dsync.LockArgs + lockersMap := make(map[string]dsync.NetLocker) + for _, path := range strings.Split(vars["paths"], ",") { + if path == "" { + continue + } + args.Resources = append(args.Resources, path) + lockers, _ := z.serverPools[0].getHashedSet(path).getLockers() + for _, locker := range lockers { + if locker != nil { + lockersMap[locker.String()] = locker + } + } + } + + for _, locker := range lockersMap { + locker.ForceUnlock(ctx, args) + } +} + // TopLocksHandler Get list of locks in use func (a adminAPIHandlers) TopLocksHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "TopLocks") diff --git a/cmd/admin-router.go b/cmd/admin-router.go index 3ca06cca4..81efe7e7d 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -189,10 +189,12 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) httpTraceHdrs(adminAPI.RemoveRemoteTargetHandler)).Queries("bucket", "{bucket:.*}", "arn", "{arn:.*}") } - // -- Top APIs -- - // Top locks if globalIsDistErasure { + // Top locks adminRouter.Methods(http.MethodGet).Path(adminVersion + "/top/locks").HandlerFunc(httpTraceHdrs(adminAPI.TopLocksHandler)) + // Force unlocks paths + adminRouter.Methods(http.MethodPost).Path(adminVersion+"/force-unlock"). + Queries("paths", "{paths:.*}").HandlerFunc(httpTraceHdrs(adminAPI.ForceUnlockHandler)) } // HTTP Trace diff --git a/cmd/admin-server-info.go b/cmd/admin-server-info.go index c6321a032..93e540db2 100644 --- a/cmd/admin-server-info.go +++ b/cmd/admin-server-info.go @@ -18,6 +18,7 @@ package cmd import ( "net/http" + "time" "github.com/minio/minio/pkg/madmin" ) @@ -45,7 +46,7 @@ func getLocalServerProperty(endpointServerPools EndpointServerPools, r *http.Req } _, present := network[nodeName] if !present { - if err := isServerResolvable(endpoint); err == nil { + if err := isServerResolvable(endpoint, time.Second); err == nil { network[nodeName] = "online" } else { network[nodeName] = "offline" @@ -88,7 +89,7 @@ func getLocalDisks(endpointServerPools EndpointServerPools) []madmin.Disk { } _, present := network[nodeName] if !present { - if err := isServerResolvable(endpoint); err == nil { + if err := isServerResolvable(endpoint, time.Second); err == nil { network[nodeName] = "online" } else { network[nodeName] = "offline" diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index c16f62433..a175fdc31 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -34,7 +34,6 @@ import ( "github.com/minio/minio/cmd/config/storageclass" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/color" - "github.com/minio/minio/pkg/dsync" "github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/sync/errgroup" ) @@ -163,10 +162,6 @@ func (z *erasureServerPools) GetDisksID(ids ...string) []StorageAPI { return res } -func (z *erasureServerPools) GetAllLockers() []dsync.NetLocker { - return z.serverPools[0].GetAllLockers() -} - func (z *erasureServerPools) SetDriveCounts() []int { setDriveCounts := make([]int, len(z.serverPools)) for i := range z.serverPools { diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index a8ab88f27..918133de4 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -292,27 +292,6 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt } } -// GetAllLockers return a list of all lockers for all sets. -func (s *erasureSets) GetAllLockers() []dsync.NetLocker { - var allLockers []dsync.NetLocker - lockEpSet := set.NewStringSet() - for _, lockers := range s.erasureLockers { - for _, locker := range lockers { - if locker == nil || !locker.IsOnline() { - // Skip any offline lockers. - continue - } - if lockEpSet.Contains(locker.String()) { - // Skip duplicate lockers. - continue - } - lockEpSet.Add(locker.String()) - allLockers = append(allLockers, locker) - } - } - return allLockers -} - func (s *erasureSets) GetLockers(setIndex int) func() ([]dsync.NetLocker, string) { return func() ([]dsync.NetLocker, string) { lockers := make([]dsync.NetLocker, len(s.erasureLockers[setIndex])) diff --git a/cmd/local-locker.go b/cmd/local-locker.go index ff70cd66a..aa123c622 100644 --- a/cmd/local-locker.go +++ b/cmd/local-locker.go @@ -27,11 +27,13 @@ import ( // lockRequesterInfo stores various info from the client for each lock that is requested. type lockRequesterInfo struct { + Name string // name of the resource lock was requested for Writer bool // Bool whether write or read lock. UID string // UID to uniquely identify request of client. Timestamp time.Time // Timestamp set at the time of initialization. TimeLastCheck time.Time // Timestamp for last check of validity of lock. Source string // Contains line, function and filename reqesting the lock. + Group bool // indicates if it was a group lock. // Owner represents the UUID of the owner who originally requested the lock // useful in expiry. Owner string @@ -91,12 +93,14 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool for _, resource := range args.Resources { l.lockMap[resource] = []lockRequesterInfo{ { + Name: resource, Writer: true, Source: args.Source, Owner: args.Owner, UID: args.UID, Timestamp: UTCNow(), TimeLastCheck: UTCNow(), + Group: len(args.Resources) > 1, Quorum: args.Quorum, }, } @@ -148,7 +152,9 @@ func (l *localLocker) removeEntry(name string, args dsync.LockArgs, lri *[]lockR func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { l.mutex.Lock() defer l.mutex.Unlock() + resource := args.Resources[0] lrInfo := lockRequesterInfo{ + Name: resource, Writer: false, Source: args.Source, Owner: args.Owner, @@ -157,7 +163,6 @@ func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply boo TimeLastCheck: UTCNow(), Quorum: args.Quorum, } - resource := args.Resources[0] if lri, ok := l.lockMap[resource]; ok { if reply = !isWriteLock(lri); reply { // Unless there is a write lock @@ -214,6 +219,23 @@ func (l *localLocker) IsLocal() bool { return true } +func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + l.mutex.Lock() + defer l.mutex.Unlock() + if len(args.UID) != 0 { + return false, fmt.Errorf("ForceUnlock called with non-empty UID: %s", args.UID) + } + for _, resource := range args.Resources { + delete(l.lockMap, resource) // Remove the lock (irrespective of write or read lock) + } + return true, nil + } +} + func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired bool, err error) { select { case <-ctx.Done(): @@ -222,38 +244,42 @@ func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired l.mutex.Lock() defer l.mutex.Unlock() + resource := args.Resources[0] // expiry check is always per resource. + // Lock found, proceed to verify if belongs to given uid. - for _, resource := range args.Resources { - if lri, ok := l.lockMap[resource]; ok { - // Check whether uid is still active - for _, entry := range lri { - if entry.UID == args.UID && entry.Owner == args.Owner { - if ep, ok := globalRemoteEndpoints[args.Owner]; ok { - if err = isServerResolvable(ep); err != nil { - return true, nil - } - } - return false, nil - } + lri, ok := l.lockMap[resource] + if !ok { + return true, nil + } + + // Check whether uid is still active + for _, entry := range lri { + if entry.UID == args.UID && entry.Owner == args.Owner { + ep := globalRemoteEndpoints[args.Owner] + if !ep.IsLocal { + // check if the owner is online + return isServerResolvable(ep, 250*time.Millisecond) != nil, nil } + return false, nil } } + return true, nil } } // Similar to removeEntry but only removes an entry only if the lock entry exists in map. // Caller must hold 'l.mutex' lock. -func (l *localLocker) removeEntryIfExists(nlrip nameLockRequesterInfoPair) { +func (l *localLocker) removeEntryIfExists(lrip lockRequesterInfo) { l.mutex.Lock() defer l.mutex.Unlock() // Check if entry is still in map (could have been removed altogether by 'concurrent' (R)Unlock of last entry) - if lri, ok := l.lockMap[nlrip.name]; ok { + if lri, ok := l.lockMap[lrip.Name]; ok { // Even if the entry exists, it may not be the same entry which was // considered as expired, so we simply an attempt to remove it if its // not possible there is nothing we need to do. - l.removeEntry(nlrip.name, dsync.LockArgs{Owner: nlrip.lri.Owner, UID: nlrip.lri.UID}, &lri) + l.removeEntry(lrip.Name, dsync.LockArgs{Owner: lrip.Owner, UID: lrip.UID}, &lri) } } diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go index 1a4747e6c..ca3ed977a 100644 --- a/cmd/lock-rest-client.go +++ b/cmd/lock-rest-client.go @@ -135,6 +135,11 @@ func (client *lockRESTClient) Expired(ctx context.Context, args dsync.LockArgs) return client.restCall(ctx, lockRESTMethodExpired, args) } +// ForceUnlock calls force unlock handler to forcibly unlock an active lock. +func (client *lockRESTClient) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) { + return client.restCall(ctx, lockRESTMethodForceUnlock, args) +} + func newLockAPI(endpoint Endpoint) dsync.NetLocker { if endpoint.IsLocal { return globalLockServer diff --git a/cmd/lock-rest-server-common.go b/cmd/lock-rest-server-common.go index 33eac9b5c..7b10a6111 100644 --- a/cmd/lock-rest-server-common.go +++ b/cmd/lock-rest-server-common.go @@ -21,18 +21,19 @@ import ( ) const ( - lockRESTVersion = "v4" // Add Quorum query param + lockRESTVersion = "v5" // Add Quorum query param lockRESTVersionPrefix = SlashSeparator + lockRESTVersion lockRESTPrefix = minioReservedBucketPath + "/lock" ) const ( - lockRESTMethodHealth = "/health" - lockRESTMethodLock = "/lock" - lockRESTMethodRLock = "/rlock" - lockRESTMethodUnlock = "/unlock" - lockRESTMethodRUnlock = "/runlock" - lockRESTMethodExpired = "/expired" + lockRESTMethodHealth = "/health" + lockRESTMethodLock = "/lock" + lockRESTMethodRLock = "/rlock" + lockRESTMethodUnlock = "/unlock" + lockRESTMethodRUnlock = "/runlock" + lockRESTMethodExpired = "/expired" + lockRESTMethodForceUnlock = "/force-unlock" // lockRESTOwner represents owner UUID lockRESTOwner = "owner" diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go index b808bfbf3..367589e6f 100644 --- a/cmd/lock-rest-server.go +++ b/cmd/lock-rest-server.go @@ -24,6 +24,7 @@ import ( "net/http" "sort" "strconv" + "sync" "time" "github.com/gorilla/mux" @@ -32,10 +33,10 @@ import ( const ( // Lock maintenance interval. - lockMaintenanceInterval = 30 * time.Second + lockMaintenanceInterval = 10 * time.Second // Lock validity check interval. - lockValidityCheckInterval = 5 * time.Second + lockValidityCheckInterval = 30 * time.Second ) // To abstract a node over network. @@ -98,7 +99,7 @@ func (l *lockRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request) { // LockHandler - Acquires a lock. func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) { if !l.IsValid(w, r) { - l.writeErrorResponse(w, errors.New("Invalid request")) + l.writeErrorResponse(w, errors.New("invalid request")) return } @@ -121,7 +122,7 @@ func (l *lockRESTServer) LockHandler(w http.ResponseWriter, r *http.Request) { // UnlockHandler - releases the acquired lock. func (l *lockRESTServer) UnlockHandler(w http.ResponseWriter, r *http.Request) { if !l.IsValid(w, r) { - l.writeErrorResponse(w, errors.New("Invalid request")) + l.writeErrorResponse(w, errors.New("invalid request")) return } @@ -143,7 +144,7 @@ func (l *lockRESTServer) UnlockHandler(w http.ResponseWriter, r *http.Request) { // LockHandler - Acquires an RLock. func (l *lockRESTServer) RLockHandler(w http.ResponseWriter, r *http.Request) { if !l.IsValid(w, r) { - l.writeErrorResponse(w, errors.New("Invalid request")) + l.writeErrorResponse(w, errors.New("invalid request")) return } @@ -166,7 +167,7 @@ func (l *lockRESTServer) RLockHandler(w http.ResponseWriter, r *http.Request) { // RUnlockHandler - releases the acquired read lock. func (l *lockRESTServer) RUnlockHandler(w http.ResponseWriter, r *http.Request) { if !l.IsValid(w, r) { - l.writeErrorResponse(w, errors.New("Invalid request")) + l.writeErrorResponse(w, errors.New("invalid request")) return } @@ -184,10 +185,29 @@ func (l *lockRESTServer) RUnlockHandler(w http.ResponseWriter, r *http.Request) } } +// ForceUnlockHandler - query expired lock status. +func (l *lockRESTServer) ForceUnlockHandler(w http.ResponseWriter, r *http.Request) { + if !l.IsValid(w, r) { + l.writeErrorResponse(w, errors.New("invalid request")) + return + } + + args, err := getLockArgs(r) + if err != nil { + l.writeErrorResponse(w, err) + return + } + + if _, err = l.ll.ForceUnlock(r.Context(), args); err != nil { + l.writeErrorResponse(w, err) + return + } +} + // ExpiredHandler - query expired lock status. func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request) { if !l.IsValid(w, r) { - l.writeErrorResponse(w, errors.New("Invalid request")) + l.writeErrorResponse(w, errors.New("invalid request")) return } @@ -208,31 +228,22 @@ func (l *lockRESTServer) ExpiredHandler(w http.ResponseWriter, r *http.Request) } } -// nameLockRequesterInfoPair is a helper type for lock maintenance -type nameLockRequesterInfoPair struct { - name string - lri lockRequesterInfo -} - // getLongLivedLocks returns locks that are older than a certain time and // have not been 'checked' for validity too soon enough -func getLongLivedLocks(interval time.Duration) []nameLockRequesterInfoPair { - nlrip := []nameLockRequesterInfoPair{} +func getLongLivedLocks(interval time.Duration) []lockRequesterInfo { + lrips := []lockRequesterInfo{} globalLockServer.mutex.Lock() - for name, lriArray := range globalLockServer.lockMap { + for _, lriArray := range globalLockServer.lockMap { for idx := range lriArray { // Check whether enough time has gone by since last check if time.Since(lriArray[idx].TimeLastCheck) >= interval { - nlrip = append(nlrip, nameLockRequesterInfoPair{ - name: name, - lri: lriArray[idx], - }) + lrips = append(lrips, lriArray[idx]) lriArray[idx].TimeLastCheck = UTCNow() } } } globalLockServer.mutex.Unlock() - return nlrip + return lrips } // lockMaintenance loops over locks that have been active for some time and checks back @@ -261,47 +272,68 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error { updateNlocks := func(nlripsMap map[string]nlock, name string, writer bool) { nlk, ok := nlripsMap[name] - if !ok { + if ok { + nlk.locks++ + nlripsMap[name] = nlk + } else { nlripsMap[name] = nlock{ locks: 1, writer: writer, } - } else { - nlk.locks++ - nlripsMap[name] = nlk } } // Validate if long lived locks are indeed clean. // Get list of long lived locks to check for staleness. - nlrips := getLongLivedLocks(interval) - nlripsMap := make(map[string]nlock, len(nlrips)) - for _, nlrip := range nlrips { - for _, c := range z.GetAllLockers() { - ctx, cancel := context.WithTimeout(GlobalContext, 5*time.Second) - - // Call back to original server verify whether the lock is - // still active (based on name & uid) - expired, err := c.Expired(ctx, dsync.LockArgs{ - Owner: nlrip.lri.Owner, - UID: nlrip.lri.UID, - Resources: []string{nlrip.name}, - }) - cancel() - if err != nil { - updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer) - continue - } - - if !expired { - updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer) - } + lrips := getLongLivedLocks(interval) + lripsMap := make(map[string]nlock, len(lrips)) + var mutex sync.Mutex // mutex for lripsMap updates + for _, lrip := range lrips { + // fetch the lockers participated in handing + // out locks for `nlrip.name` + var lockers []dsync.NetLocker + if lrip.Group { + lockers, _ = z.serverPools[0].getHashedSet("").getLockers() + } else { + lockers, _ = z.serverPools[0].getHashedSet(lrip.Name).getLockers() + } + var wg sync.WaitGroup + wg.Add(len(lockers)) + for _, c := range lockers { + go func(lrip lockRequesterInfo, c dsync.NetLocker) { + defer wg.Done() + ctx, cancel := context.WithTimeout(GlobalContext, 3*time.Second) + + // Call back to all participating servers, verify + // if each of those servers think lock is still + // active, if not expire it. + expired, err := c.Expired(ctx, dsync.LockArgs{ + Owner: lrip.Owner, + UID: lrip.UID, + Resources: []string{lrip.Name}, + }) + cancel() + + if err != nil { + mutex.Lock() + updateNlocks(lripsMap, lrip.Name, lrip.Writer) + mutex.Unlock() + return + } + + if !expired { + mutex.Lock() + updateNlocks(lripsMap, lrip.Name, lrip.Writer) + mutex.Unlock() + } + }(lrip, c) } + wg.Wait() // less than the quorum, we have locks expired. - if nlripsMap[nlrip.name].locks < nlrip.lri.Quorum { + if lripsMap[lrip.Name].locks < lrip.Quorum { // Purge the stale entry if it exists. - globalLockServer.removeEntryIfExists(nlrip) + globalLockServer.removeEntryIfExists(lrip) } } @@ -360,6 +392,7 @@ func registerLockRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler)) subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler)) subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(lockServer.ExpiredHandler)) + subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodForceUnlock).HandlerFunc(httpTraceAll(lockServer.ForceUnlockHandler)) globalLockServer = lockServer.ll diff --git a/cmd/prepare-storage.go b/cmd/prepare-storage.go index 3bb2bf328..8d6cdda04 100644 --- a/cmd/prepare-storage.go +++ b/cmd/prepare-storage.go @@ -165,7 +165,7 @@ var errErasureV3ThisEmpty = fmt.Errorf("Erasure format version 3 has This field // isServerResolvable - checks if the endpoint is resolvable // by sending a naked HTTP request with liveness checks. -func isServerResolvable(endpoint Endpoint) error { +func isServerResolvable(endpoint Endpoint, timeout time.Duration) error { serverURL := &url.URL{ Scheme: endpoint.Scheme, Host: endpoint.Host, @@ -199,7 +199,7 @@ func isServerResolvable(endpoint Endpoint) error { } defer httpClient.CloseIdleConnections() - ctx, cancel := context.WithTimeout(GlobalContext, 3*time.Second) + ctx, cancel := context.WithTimeout(GlobalContext, timeout) req, err := http.NewRequestWithContext(ctx, http.MethodGet, serverURL.String(), nil) if err != nil { @@ -244,7 +244,7 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints, return nil, nil, fmt.Errorf("Disk %s: %w", endpoints[i], err) } if retryCount >= 5 { - logger.Info("Unable to connect to %s: %v\n", endpoints[i], isServerResolvable(endpoints[i])) + logger.Info("Unable to connect to %s: %v\n", endpoints[i], isServerResolvable(endpoints[i], time.Second)) } } } diff --git a/cmd/server-main.go b/cmd/server-main.go index 22c1715c6..1e681f03b 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -144,9 +144,10 @@ func serverHandleCmdArgs(ctx *cli.Context) { for _, z := range globalEndpoints { for _, ep := range z.Endpoints { if ep.IsLocal { - continue + globalRemoteEndpoints[GetLocalPeer(globalEndpoints)] = ep + } else { + globalRemoteEndpoints[ep.Host] = ep } - globalRemoteEndpoints[ep.Host] = ep } } logger.FatalIf(err, "Invalid command line arguments") diff --git a/pkg/dsync/rpc-client-impl_test.go b/pkg/dsync/rpc-client-impl_test.go index 99617c0e5..35d045ef8 100644 --- a/pkg/dsync/rpc-client-impl_test.go +++ b/pkg/dsync/rpc-client-impl_test.go @@ -119,6 +119,11 @@ func (rpcClient *ReconnectRPCClient) Expired(ctx context.Context, args LockArgs) return expired, err } +func (rpcClient *ReconnectRPCClient) ForceUnlock(ctx context.Context, args LockArgs) (reply bool, err error) { + err = rpcClient.Call("Dsync.ForceUnlock", &args, &reply) + return reply, err +} + func (rpcClient *ReconnectRPCClient) String() string { return "http://" + rpcClient.addr + "/" + rpcClient.endpoint } diff --git a/pkg/dsync/rpc-client-interface.go b/pkg/dsync/rpc-client-interface.go index eb96408c3..5ba86dd56 100644 --- a/pkg/dsync/rpc-client-interface.go +++ b/pkg/dsync/rpc-client-interface.go @@ -63,6 +63,11 @@ type NetLocker interface { // Expired returns if current lock args has expired. Expired(ctx context.Context, args LockArgs) (bool, error) + // Unlock (read/write) forcefully for given LockArgs. It should return + // * a boolean to indicate success/failure of the operation + // * an error on failure of unlock request operation. + ForceUnlock(ctx context.Context, args LockArgs) (bool, error) + // Returns underlying endpoint of this lock client instance. String() string diff --git a/pkg/iam/policy/admin-action.go b/pkg/iam/policy/admin-action.go index b0b07ac74..5dc1b9e13 100644 --- a/pkg/iam/policy/admin-action.go +++ b/pkg/iam/policy/admin-action.go @@ -33,6 +33,8 @@ const ( StorageInfoAdminAction = "admin:StorageInfo" // DataUsageInfoAdminAction - allow listing data usage info DataUsageInfoAdminAction = "admin:DataUsageInfo" + // ForceUnlockAdminAction - allow force unlocking locks + ForceUnlockAdminAction = "admin:ForceUnlock" // TopLocksAdminAction - allow listing top locks TopLocksAdminAction = "admin:TopLocksInfo" // ProfilingAdminAction - allow profiling diff --git a/pkg/madmin/top-commands.go b/pkg/madmin/top-commands.go index 319baf985..cc5b430b5 100644 --- a/pkg/madmin/top-commands.go +++ b/pkg/madmin/top-commands.go @@ -24,6 +24,7 @@ import ( "net/http" "net/url" "strconv" + "strings" "time" ) @@ -63,6 +64,30 @@ type TopLockOpts struct { Stale bool } +// ForceUnlock force unlocks input paths... +func (adm *AdminClient) ForceUnlock(ctx context.Context, paths ...string) error { + // Execute POST on /minio/admin/v3/force-unlock + queryVals := make(url.Values) + queryVals.Set("paths", strings.Join(paths, ",")) + resp, err := adm.executeMethod(ctx, + http.MethodPost, + requestData{ + relPath: adminAPIPrefix + "/force-unlock", + queryValues: queryVals, + }, + ) + defer closeResponse(resp) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusOK { + return httpRespToErrorResponse(resp) + } + + return nil +} + // TopLocksWithOpts - returns the count number of oldest locks currently active on the server. // additionally we can also enable `stale` to get stale locks currently present on server. func (adm *AdminClient) TopLocksWithOpts(ctx context.Context, opts TopLockOpts) (LockEntries, error) {