diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go
index b3298a286..4bb0090ad 100644
--- a/cmd/admin-handlers.go
+++ b/cmd/admin-handlers.go
@@ -336,23 +336,26 @@ func (a adminAPIHandlers) DataUsageInfoHandler(w http.ResponseWriter, r *http.Re
 	writeSuccessResponseJSON(w, dataUsageInfoJSON)
 }
 
-func lriToLockEntry(l lockRequesterInfo, resource, server string) *madmin.LockEntry {
+func lriToLockEntry(l lockRequesterInfo, resource, server string, rquorum, wquorum int) *madmin.LockEntry {
 	entry := &madmin.LockEntry{
 		Timestamp:  l.Timestamp,
 		Resource:   resource,
 		ServerList: []string{server},
 		Source:     l.Source,
+		Owner:      l.Owner,
 		ID:         l.UID,
 	}
 	if l.Writer {
 		entry.Type = "WRITE"
+		entry.Quorum = wquorum
 	} else {
 		entry.Type = "READ"
+		entry.Quorum = rquorum
 	}
 	return entry
 }
 
-func topLockEntries(peerLocks []*PeerLocks, count int) madmin.LockEntries {
+func topLockEntries(peerLocks []*PeerLocks, count int, rquorum, wquorum int, stale bool) madmin.LockEntries {
 	entryMap := make(map[string]*madmin.LockEntry)
 	for _, peerLock := range peerLocks {
 		if peerLock == nil {
@@ -364,20 +367,26 @@ func topLockEntries(peerLocks []*PeerLocks, count int) madmin.LockEntries {
 					if val, ok := entryMap[lockReqInfo.UID]; ok {
 						val.ServerList = append(val.ServerList, peerLock.Addr)
 					} else {
-						entryMap[lockReqInfo.UID] = lriToLockEntry(lockReqInfo, k, peerLock.Addr)
+						entryMap[lockReqInfo.UID] = lriToLockEntry(lockReqInfo, k, peerLock.Addr, rquorum, wquorum)
 					}
 				}
 			}
 		}
 	}
-	var lockEntries = make(madmin.LockEntries, 0, len(entryMap))
+	var lockEntries madmin.LockEntries
 	for _, v := range entryMap {
-		lockEntries = append(lockEntries, *v)
+		if len(lockEntries) == count {
+			break
+		}
+		if stale {
+			lockEntries = append(lockEntries, *v)
+			continue
+		}
+		if len(v.ServerList) >= v.Quorum {
+			lockEntries = append(lockEntries, *v)
+		}
 	}
 	sort.Sort(lockEntries)
-	if len(lockEntries) > count {
-		lockEntries = lockEntries[:count]
-	}
 	return lockEntries
 }
 
@@ -407,21 +416,14 @@ func (a adminAPIHandlers) TopLocksHandler(w http.ResponseWriter, r *http.Request
 			return
 		}
 	}
+	stale := r.URL.Query().Get("stale") == "true" // list also stale locks
 
-	peerLocks := globalNotificationSys.GetLocks(ctx)
-	// Once we have received all the locks currently used from peers
-	// add the local peer locks list as well.
-	var getRespLocks GetLocksResp
-	for _, llocker := range globalLockServers {
-		getRespLocks = append(getRespLocks, llocker.DupLockMap())
-	}
+	peerLocks := globalNotificationSys.GetLocks(ctx, r)
 
-	peerLocks = append(peerLocks, &PeerLocks{
-		Addr:  getHostName(r),
-		Locks: getRespLocks,
-	})
+	rquorum := getReadQuorum(objectAPI.SetDriveCount())
+	wquorum := getWriteQuorum(objectAPI.SetDriveCount())
 
-	topLocks := topLockEntries(peerLocks, count)
+	topLocks := topLockEntries(peerLocks, count, rquorum, wquorum, stale)
 
 	// Marshal API response
 	jsonBytes, err := json.Marshal(topLocks)
diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go
index ad899d8d0..eea402212 100644
--- a/cmd/data-crawler.go
+++ b/cmd/data-crawler.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"encoding/binary"
 	"errors"
+	"math/rand"
 	"os"
 	"path"
 	"strconv"
@@ -49,10 +50,6 @@ const (
 	healDeleteDangling    = true
 	healFolderIncludeProb = 32  // Include a clean folder one in n cycles.
 	healObjectSelectProb  = 512 // Overall probability of a file being scanned; one in n.
-
-	// sleep for an hour after a lock timeout
-	// before retrying to acquire lock again.
-	dataCrawlerLeaderLockTimeoutSleepInterval = time.Hour
 )
 
 var (
@@ -73,10 +70,11 @@ func initDataCrawler(ctx context.Context, objAPI ObjectLayer) {
 func runDataCrawler(ctx context.Context, objAPI ObjectLayer) {
 	// Make sure only 1 crawler is running on the cluster.
 	locker := objAPI.NewNSLock(ctx, minioMetaBucket, "runDataCrawler.lock")
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
 	for {
 		err := locker.GetLock(dataCrawlerLeaderLockTimeout)
 		if err != nil {
-			time.Sleep(dataCrawlerLeaderLockTimeoutSleepInterval)
+			time.Sleep(time.Duration(r.Float64() * float64(dataCrawlStartDelay)))
 			continue
 		}
 		break
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index ee317bf72..858b893cf 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -66,6 +66,9 @@ type erasureSets struct {
 	// Distributed locker clients.
 	erasureLockers setsDsyncLockers
 
+	// Distributed lock owner (constant per running instance).
+	erasureLockOwner string
+
 	// List of endpoints provided on the command line.
 	endpoints Endpoints
 
@@ -261,11 +264,11 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt
 	}
 }
 
-func (s *erasureSets) GetLockers(setIndex int) func() []dsync.NetLocker {
-	return func() []dsync.NetLocker {
+func (s *erasureSets) GetLockers(setIndex int) func() ([]dsync.NetLocker, string) {
+	return func() ([]dsync.NetLocker, string) {
 		lockers := make([]dsync.NetLocker, s.setDriveCount)
 		copy(lockers, s.erasureLockers[setIndex])
-		return lockers
+		return lockers, s.erasureLockOwner
 	}
 }
 
@@ -308,6 +311,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 		sets:               make([]*erasureObjects, setCount),
 		erasureDisks:       make([][]StorageAPI, setCount),
 		erasureLockers:     make([][]dsync.NetLocker, setCount),
+		erasureLockOwner:   mustGetUUID(),
 		endpoints:          endpoints,
 		endpointStrings:    endpointStrings,
 		setCount:           setCount,
diff --git a/cmd/erasure.go b/cmd/erasure.go
index 01cfab5d9..cee84a965 100644
--- a/cmd/erasure.go
+++ b/cmd/erasure.go
@@ -52,7 +52,7 @@ type erasureObjects struct {
 	getDisks func() []StorageAPI
 
 	// getLockers returns list of remote and local lockers.
-	getLockers func() []dsync.NetLocker
+	getLockers func() ([]dsync.NetLocker, string)
 
 	// getEndpoints returns list of endpoint strings belonging this set.
 	// some may be local and some remote.
diff --git a/cmd/local-locker.go b/cmd/local-locker.go
index 0f8e5068f..ddb41d859 100644
--- a/cmd/local-locker.go
+++ b/cmd/local-locker.go
@@ -32,6 +32,9 @@ type lockRequesterInfo struct {
 	Timestamp     time.Time // Timestamp set at the time of initialization.
 	TimeLastCheck time.Time // Timestamp for last check of validity of lock.
 	Source        string    // Contains line, function and filename reqesting the lock.
+	// Owner represents the UUID of the owner who originally requested the lock
+	// useful in expiry.
+	Owner string
 }
 
 // isWriteLock returns whether the lock is a write or read lock.
@@ -73,34 +76,30 @@ func (l *localLocker) canTakeLock(resources ...string) bool {
 }
 
 func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
-	select {
-	case <-ctx.Done():
-		return false, ctx.Err()
-	default:
-		l.mutex.Lock()
-		defer l.mutex.Unlock()
+	l.mutex.Lock()
+	defer l.mutex.Unlock()
 
-		if !l.canTakeLock(args.Resources...) {
-			// Not all locks can be taken on resources,
-			// reject it completely.
-			return false, nil
-		}
+	if !l.canTakeLock(args.Resources...) {
+		// Not all locks can be taken on resources,
+		// reject it completely.
+		return false, nil
+	}
 
-		// No locks held on the all resources, so claim write
-		// lock on all resources at once.
-		for _, resource := range args.Resources {
-			l.lockMap[resource] = []lockRequesterInfo{
-				{
-					Writer:        true,
-					Source:        args.Source,
-					UID:           args.UID,
-					Timestamp:     UTCNow(),
-					TimeLastCheck: UTCNow(),
-				},
-			}
+	// No locks held on the all resources, so claim write
+	// lock on all resources at once.
+	for _, resource := range args.Resources {
+		l.lockMap[resource] = []lockRequesterInfo{
+			{
+				Writer:        true,
+				Source:        args.Source,
+				Owner:         args.Owner,
+				UID:           args.UID,
+				Timestamp:     UTCNow(),
+				TimeLastCheck: UTCNow(),
+			},
 		}
-		return true, nil
 	}
+	return true, nil
 }
 
 func (l *localLocker) Unlock(args dsync.LockArgs) (reply bool, err error) {
@@ -113,7 +112,7 @@ func (l *localLocker) Unlock(args dsync.LockArgs) (reply bool, err error) {
 	}
 	for _, resource := range args.Resources {
 		lri := l.lockMap[resource]
-		if !l.removeEntry(resource, args.UID, &lri) {
+		if !l.removeEntry(resource, args, &lri) {
 			return false, fmt.Errorf("Unlock unable to find corresponding lock for uid: %s on resource %s", args.UID, resource)
 		}
 	}
@@ -124,10 +123,10 @@
 // removeEntry based on the uid of the lock message, removes a single entry from the
 // lockRequesterInfo array or the whole array from the map (in case of a write lock
 // or last read lock)
-func (l *localLocker) removeEntry(name, uid string, lri *[]lockRequesterInfo) bool {
+func (l *localLocker) removeEntry(name string, args dsync.LockArgs, lri *[]lockRequesterInfo) bool {
 	// Find correct entry to remove based on uid.
 	for index, entry := range *lri {
-		if entry.UID == uid {
+		if entry.UID == args.UID && entry.Owner == args.Owner {
 			if len(*lri) == 1 {
 				// Remove the write lock.
 				delete(l.lockMap, name)
@@ -145,32 +144,28 @@
 }
 
 func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
-	select {
-	case <-ctx.Done():
-		return false, ctx.Err()
-	default:
-		l.mutex.Lock()
-		defer l.mutex.Unlock()
-		lrInfo := lockRequesterInfo{
-			Writer:        false,
-			Source:        args.Source,
-			UID:           args.UID,
-			Timestamp:     UTCNow(),
-			TimeLastCheck: UTCNow(),
-		}
-		resource := args.Resources[0]
-		if lri, ok := l.lockMap[resource]; ok {
-			if reply = !isWriteLock(lri); reply {
-				// Unless there is a write lock
-				l.lockMap[resource] = append(l.lockMap[resource], lrInfo)
-			}
-		} else {
-			// No locks held on the given name, so claim (first) read lock
-			l.lockMap[resource] = []lockRequesterInfo{lrInfo}
-			reply = true
+	l.mutex.Lock()
+	defer l.mutex.Unlock()
+	lrInfo := lockRequesterInfo{
+		Writer:        false,
+		Source:        args.Source,
+		Owner:         args.Owner,
+		UID:           args.UID,
+		Timestamp:     UTCNow(),
+		TimeLastCheck: UTCNow(),
+	}
+	resource := args.Resources[0]
+	if lri, ok := l.lockMap[resource]; ok {
+		if reply = !isWriteLock(lri); reply {
+			// Unless there is a write lock
+			l.lockMap[resource] = append(l.lockMap[resource], lrInfo)
 		}
-		return reply, nil
+	} else {
+		// No locks held on the given name, so claim (first) read lock
+		l.lockMap[resource] = []lockRequesterInfo{lrInfo}
+		reply = true
 	}
+	return reply, nil
 }
 
 func (l *localLocker) RUnlock(args dsync.LockArgs) (reply bool, err error) {
@@ -187,7 +182,7 @@
 			// A write-lock is held, cannot release a read lock
 			return reply, fmt.Errorf("RUnlock attempted on a write locked entity: %s", resource)
 		}
-		if !l.removeEntry(resource, args.UID, &lri) {
+		if !l.removeEntry(resource, args, &lri) {
 			return false, fmt.Errorf("RUnlock unable to find corresponding read lock for uid: %s", args.UID)
 		}
 		return reply, nil
@@ -226,7 +221,7 @@
 	if lri, ok := l.lockMap[resource]; ok {
 		// Check whether uid is still active
 		for _, entry := range lri {
-			if entry.UID == args.UID {
+			if entry.UID == args.UID && entry.Owner == args.Owner {
 				return false, nil
 			}
 		}
@@ -244,7 +239,7 @@
 		// Even if the entry exists, it may not be the same entry which was
 		// considered as expired, so we simply an attempt to remove it if its
 		// not possible there is nothing we need to do.
-		l.removeEntry(nlrip.name, nlrip.lri.UID, &lri)
+		l.removeEntry(nlrip.name, dsync.LockArgs{Owner: nlrip.lri.Owner, UID: nlrip.lri.UID}, &lri)
 	}
 }
diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go
index 6fe111f9e..72832ce54 100644
--- a/cmd/lock-rest-client.go
+++ b/cmd/lock-rest-client.go
@@ -87,6 +87,7 @@ func (client *lockRESTClient) Close() error {
 func (client *lockRESTClient) restCall(ctx context.Context, call string, args dsync.LockArgs) (reply bool, err error) {
 	values := url.Values{}
 	values.Set(lockRESTUID, args.UID)
+	values.Set(lockRESTOwner, args.Owner)
 	values.Set(lockRESTSource, args.Source)
 	var buffer bytes.Buffer
 	for _, resource := range args.Resources {
diff --git a/cmd/lock-rest-server-common.go b/cmd/lock-rest-server-common.go
index 074163d69..0afec2ad2 100644
--- a/cmd/lock-rest-server-common.go
+++ b/cmd/lock-rest-server-common.go
@@ -34,8 +34,12 @@ const (
 	lockRESTMethodRUnlock = "/runlock"
 	lockRESTMethodExpired = "/expired"
 
+	// lockRESTOwner represents owner UUID
+	lockRESTOwner = "owner"
+
 	// Unique ID of lock/unlock request.
 	lockRESTUID = "uid"
+
 	// Source contains the line number, function and file name of the code
 	// on the client node that requested the lock.
 	lockRESTSource = "source"
diff --git a/cmd/lock-rest-server-common_test.go b/cmd/lock-rest-server-common_test.go
index 150027b2a..57724a9c0 100644
--- a/cmd/lock-rest-server-common_test.go
+++ b/cmd/lock-rest-server-common_test.go
@@ -21,6 +21,8 @@ import (
 	"reflect"
 	"sync"
 	"testing"
+
+	"github.com/minio/minio/pkg/dsync"
 )
 
 // Helper function to create a lock server for testing
@@ -53,12 +55,14 @@ func TestLockRpcServerRemoveEntry(t *testing.T) {
 	defer os.RemoveAll(testPath)
 
 	lockRequesterInfo1 := lockRequesterInfo{
+		Owner:         "owner",
 		Writer:        true,
 		UID:           "0123-4567",
 		Timestamp:     UTCNow(),
 		TimeLastCheck: UTCNow(),
 	}
 	lockRequesterInfo2 := lockRequesterInfo{
+		Owner:         "owner",
 		Writer:        true,
 		UID:           "89ab-cdef",
 		Timestamp:     UTCNow(),
@@ -73,11 +77,17 @@ func TestLockRpcServerRemoveEntry(t *testing.T) {
 	lri := locker.ll.lockMap["name"]
 
 	// test unknown uid
-	if locker.ll.removeEntry("name", "unknown-uid", &lri) {
+	if locker.ll.removeEntry("name", dsync.LockArgs{
+		Owner: "owner",
+		UID:   "unknown-uid",
+	}, &lri) {
 		t.Errorf("Expected %#v, got %#v", false, true)
 	}
 
-	if !locker.ll.removeEntry("name", "0123-4567", &lri) {
+	if !locker.ll.removeEntry("name", dsync.LockArgs{
+		Owner: "owner",
+		UID:   "0123-4567",
+	}, &lri) {
 		t.Errorf("Expected %#v, got %#v", true, false)
 	} else {
 		gotLri := locker.ll.lockMap["name"]
@@ -87,7 +97,10 @@ func TestLockRpcServerRemoveEntry(t *testing.T) {
 		}
 	}
 
-	if !locker.ll.removeEntry("name", "89ab-cdef", &lri) {
+	if !locker.ll.removeEntry("name", dsync.LockArgs{
+		Owner: "owner",
+		UID:   "89ab-cdef",
+	}, &lri) {
 		t.Errorf("Expected %#v, got %#v", true, false)
 	} else {
 		gotLri := locker.ll.lockMap["name"]
diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go
index bfc080229..68b6a9203 100644
--- a/cmd/lock-rest-server.go
+++ b/cmd/lock-rest-server.go
@@ -32,7 +32,7 @@ import (
 const (
 	// Lock maintenance interval.
-	lockMaintenanceInterval = 1 * time.Minute
+	lockMaintenanceInterval = 30 * time.Second
 
 	// Lock validity check interval.
 	lockValidityCheckInterval = 2 * time.Minute
 
@@ -64,6 +64,7 @@ func (l *lockRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
 
 func getLockArgs(r *http.Request) (args dsync.LockArgs, err error) {
 	args = dsync.LockArgs{
+		Owner:  r.URL.Query().Get(lockRESTOwner),
 		UID:    r.URL.Query().Get(lockRESTUID),
 		Source: r.URL.Query().Get(lockRESTSource),
 	}
@@ -246,10 +247,28 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
 		return nil
 	}
 
+	type nlock struct {
+		locks  int
+		writer bool
+	}
+
+	updateNlocks := func(nlripsMap map[string]nlock, name string, writer bool) {
+		nlk, ok := nlripsMap[name]
+		if !ok {
+			nlripsMap[name] = nlock{
+				locks:  1,
+				writer: writer,
+			}
+		} else {
+			nlk.locks++
+			nlripsMap[name] = nlk
+		}
+	}
+
 	// Validate if long lived locks are indeed clean.
 	// Get list of long lived locks to check for staleness.
 	for lendpoint, nlrips := range getLongLivedLocks(interval) {
-		nlripsMap := make(map[string]int, len(nlrips))
+		nlripsMap := make(map[string]nlock, len(nlrips))
 		for _, nlrip := range nlrips {
 			// Locks are only held on first zone, make sure that
 			// we only look for ownership of locks from endpoints
@@ -257,7 +276,7 @@
 			for _, endpoint := range globalEndpoints[0].Endpoints {
 				c := newLockAPI(endpoint)
 				if !c.IsOnline() {
-					nlripsMap[nlrip.name]++
+					updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer)
 					continue
 				}
 
@@ -266,18 +285,19 @@
 				// Call back to original server verify whether the lock is
 				// still active (based on name & uid)
 				expired, err := c.Expired(ctx, dsync.LockArgs{
+					Owner:     nlrip.lri.Owner,
 					UID:       nlrip.lri.UID,
 					Resources: []string{nlrip.name},
 				})
 				cancel()
 				if err != nil {
-					nlripsMap[nlrip.name]++
+					updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer)
 					c.Close()
 					continue
 				}
 
 				if !expired {
-					nlripsMap[nlrip.name]++
+					updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer)
 				}
 
 				// Close the connection regardless of the call response.
@@ -285,14 +305,13 @@
 			}
 
 			// Read locks we assume quorum for be N/2 success
-			quorum := objAPI.SetDriveCount() / 2
+			quorum := getReadQuorum(objAPI.SetDriveCount())
 			if nlrip.lri.Writer {
-				// For write locks we need N/2+1 success
-				quorum = objAPI.SetDriveCount()/2 + 1
+				quorum = getWriteQuorum(objAPI.SetDriveCount())
 			}
 
 			// less than the quorum, we have locks expired.
-			if nlripsMap[nlrip.name] < quorum {
+			if nlripsMap[nlrip.name].locks < quorum {
 				// The lock is no longer active at server that originated
 				// the lock, attempt to remove the lock.
 				globalLockServers[lendpoint].mutex.Lock()
@@ -348,7 +367,6 @@ func startLockMaintenance(ctx context.Context) {
 
 // registerLockRESTHandlers - register lock rest router.
 func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) {
-	queries := restQueries(lockRESTUID, lockRESTSource)
 	for _, ep := range endpointZones {
 		for _, endpoint := range ep.Endpoints {
 			if !endpoint.IsLocal {
@@ -361,11 +379,11 @@
 			subrouter := router.PathPrefix(path.Join(lockRESTPrefix, endpoint.Path)).Subrouter()
 			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodHealth).HandlerFunc(httpTraceHdrs(lockServer.HealthHandler))
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler)).Queries(queries...)
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler)).Queries(queries...)
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler)).Queries(queries...)
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler)).Queries(queries...)
-			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(lockServer.ExpiredHandler)).Queries(queries...)
+			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodLock).HandlerFunc(httpTraceHdrs(lockServer.LockHandler))
+			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRLock).HandlerFunc(httpTraceHdrs(lockServer.RLockHandler))
+			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodUnlock).HandlerFunc(httpTraceHdrs(lockServer.UnlockHandler))
+			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodRUnlock).HandlerFunc(httpTraceHdrs(lockServer.RUnlockHandler))
+			subrouter.Methods(http.MethodPost).Path(lockRESTVersionPrefix + lockRESTMethodExpired).HandlerFunc(httpTraceAll(lockServer.ExpiredHandler))
 
 			globalLockServers[endpoint] = lockServer.ll
 		}
diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go
index df1772e58..45e965d9c 100644
--- a/cmd/namespace-lock.go
+++ b/cmd/namespace-lock.go
@@ -28,7 +28,6 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/minio/minio/cmd/config/storageclass"
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/dsync"
 	"github.com/minio/minio/pkg/lsync"
@@ -148,17 +147,8 @@ func (di *distLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error)
 	lockSource := getSource(2)
 	start := UTCNow()
 
-	// Lockers default to standard storage class always, why because
-	// we always dictate storage tolerance in terms of standard
-	// storage class be it number of drives or a multiplicative
-	// of number of nodes, defaulting lockers to this value
-	// simply means that locking is always similar in behavior
-	// and effect with erasure coded drive tolerance.
-	tolerance := globalStorageClass.GetParityForSC(storageclass.STANDARD)
-
 	if !di.rwMutex.GetLock(di.ctx, di.opsID, lockSource, dsync.Options{
-		Timeout:   timeout.Timeout(),
-		Tolerance: tolerance,
+		Timeout: timeout.Timeout(),
 	}) {
 		timeout.LogFailure()
 		return OperationTimedOut{}
@@ -177,12 +167,8 @@ func (di *distLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr error
 	lockSource := getSource(2)
 	start := UTCNow()
 
-	// Lockers default to standard storage class always.
-	tolerance := globalStorageClass.GetParityForSC(storageclass.STANDARD)
-
 	if !di.rwMutex.GetRLock(di.ctx, di.opsID, lockSource, dsync.Options{
-		Timeout:   timeout.Timeout(),
-		Tolerance: tolerance,
+		Timeout: timeout.Timeout(),
 	}) {
 		timeout.LogFailure()
 		return OperationTimedOut{}
@@ -208,11 +194,11 @@ type localLockInstance struct {
 // NewNSLock - returns a lock instance for a given volume and
 // path. The returned lockInstance object encapsulates the nsLockMap,
 // volume, path and operation ID.
-func (n *nsLockMap) NewNSLock(ctx context.Context, lockersFn func() []dsync.NetLocker, volume string, paths ...string) RWLocker {
+func (n *nsLockMap) NewNSLock(ctx context.Context, lockers func() ([]dsync.NetLocker, string), volume string, paths ...string) RWLocker {
 	opsID := mustGetUUID()
 	if n.isDistErasure {
 		drwmutex := dsync.NewDRWMutex(&dsync.Dsync{
-			GetLockersFn: lockersFn,
+			GetLockers: lockers,
 		}, pathsJoinPrefix(volume, paths...)...)
 		return &distLockInstance{drwmutex, opsID, ctx}
 	}
diff --git a/cmd/notification.go b/cmd/notification.go
index 48f8c17b5..bd4dfe5c7 100644
--- a/cmd/notification.go
+++ b/cmd/notification.go
@@ -22,6 +22,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"net/http"
 	"net/url"
 	"sort"
 	"strings"
@@ -505,34 +506,22 @@ func (sys *NotificationSys) updateBloomFilter(ctx context.Context, current uint6
 }
 
 // GetLocks - makes GetLocks RPC call on all peers.
-func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks {
+func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*PeerLocks {
 	locksResp := make([]*PeerLocks, len(sys.peerClients))
 	g := errgroup.WithNErrs(len(sys.peerClients))
 	for index, client := range sys.peerClients {
-		if client == nil {
-			continue
-		}
 		index := index
 		g.Go(func() error {
-			// Try to fetch serverInfo remotely in three attempts.
-			for i := 0; i < 3; i++ {
-				serverLocksResp, err := sys.peerClients[index].GetLocks()
-				if err == nil {
-					locksResp[index] = &PeerLocks{
-						Addr:  sys.peerClients[index].host.String(),
-						Locks: serverLocksResp,
-					}
-					return nil
-				}
-
-				// Last iteration log the error.
-				if i == 2 {
-					return err
-				}
-				// Wait for one second and no need wait after last attempt.
-				if i < 2 {
-					time.Sleep(1 * time.Second)
-				}
+			if client == nil {
+				return nil
+			}
+			serverLocksResp, err := sys.peerClients[index].GetLocks()
+			if err != nil {
+				return err
+			}
+			locksResp[index] = &PeerLocks{
+				Addr:  sys.peerClients[index].host.String(),
+				Locks: serverLocksResp,
 			}
 			return nil
 		}, index)
@@ -543,6 +532,16 @@
 		ctx := logger.SetReqInfo(ctx, reqInfo)
 		logger.LogOnceIf(ctx, err, sys.peerClients[index].host.String())
 	}
+	// Once we have received all the locks currently used from peers
+	// add the local peer locks list as well.
+	var getRespLocks GetLocksResp
+	for _, llocker := range globalLockServers {
+		getRespLocks = append(getRespLocks, llocker.DupLockMap())
+	}
+	locksResp = append(locksResp, &PeerLocks{
+		Addr:  getHostName(r),
+		Locks: getRespLocks,
+	})
 	return locksResp
 }
diff --git a/pkg/dsync/drwmutex.go b/pkg/dsync/drwmutex.go
index 580d66f30..8731e3537 100644
--- a/pkg/dsync/drwmutex.go
+++ b/pkg/dsync/drwmutex.go
@@ -71,8 +71,9 @@ func isLocked(uid string) bool {
 
 // NewDRWMutex - initializes a new dsync RW mutex.
 func NewDRWMutex(clnt *Dsync, names ...string) *DRWMutex {
+	restClnts, _ := clnt.GetLockers()
 	return &DRWMutex{
-		writeLocks: make([]string, len(clnt.GetLockersFn())),
+		writeLocks: make([]string, len(restClnts)),
 		Names:      names,
 		clnt:       clnt,
 	}
 }
@@ -141,28 +142,19 @@ const (
 // algorithm until either the lock is acquired successfully or more
 // time has elapsed than the timeout value.
 func (dm *DRWMutex) lockBlocking(ctx context.Context, id, source string, isReadLock bool, opts Options) (locked bool) {
-	restClnts := dm.clnt.GetLockersFn()
+	restClnts, _ := dm.clnt.GetLockers()
 
 	r := rand.New(rand.NewSource(time.Now().UnixNano()))
 
 	// Create lock array to capture the successful lockers
 	locks := make([]string, len(restClnts))
 
-	cleanLocks := func(locks []string) {
-		for i := range locks {
-			locks[i] = ""
-		}
-	}
-
 	log("lockBlocking %s/%s for %#v: lockType readLock(%t), additional opts: %#v\n", id, source, dm.Names, isReadLock, opts)
 
 	retryCtx, cancel := context.WithTimeout(ctx, opts.Timeout)
 
 	defer cancel()
 
 	for {
-		// cleanup any older state, re-use the lock slice.
-		cleanLocks(locks)
-
 		select {
 		case <-retryCtx.Done():
 			log("lockBlocking canceled %s/%s for %#v: lockType readLock(%t), additional opts: %#v\n", id, source, dm.Names, isReadLock, opts)
@@ -195,8 +187,11 @@
 // lock tries to acquire the distributed lock, returning true or false.
 func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, isReadLock bool, tolerance int, lockNames ...string) bool {
+	for i := range *locks {
+		(*locks)[i] = ""
+	}
 
-	restClnts := ds.GetLockersFn()
+	restClnts, owner := ds.GetLockers()
 
 	// Tolerance is not set, defaults to half of the locker clients.
 	if tolerance == 0 {
@@ -237,6 +232,7 @@
 			}
 
 			args := LockArgs{
+				Owner:     owner,
 				UID:       id,
 				Resources: lockNames,
 				Source:    source,
 			}
@@ -293,7 +289,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 					done = true
 					// Increment the number of grants received from the buffered channel.
 					i++
-					releaseAll(ds, locks, isReadLock, restClnts, lockNames...)
+					releaseAll(ds, owner, locks, isReadLock, restClnts, lockNames...)
 				}
 			}
 		case <-timeout:
@@ -302,7 +298,7 @@
 			// number of locks to check whether we have quorum or not
 			if !checkQuorumMet(locks, quorum) {
 				log("Quorum not met after timeout\n")
-				releaseAll(ds, locks, isReadLock, restClnts, lockNames...)
+				releaseAll(ds, owner, locks, isReadLock, restClnts, lockNames...)
 			} else {
 				log("Quorum met after timeout\n")
 			}
@@ -327,6 +323,7 @@
 		if grantToBeReleased.isLocked() {
 			// release lock
 			sendRelease(ds, restClnts[grantToBeReleased.index],
+				owner,
 				grantToBeReleased.lockUID, isReadLock, lockNames...)
 		}
 	}
@@ -350,10 +347,10 @@ func checkQuorumMet(locks *[]string, quorum int) bool {
 }
 
 // releaseAll releases all locks that are marked as locked
-func releaseAll(ds *Dsync, locks *[]string, isReadLock bool, restClnts []NetLocker, lockNames ...string) {
+func releaseAll(ds *Dsync, owner string, locks *[]string, isReadLock bool, restClnts []NetLocker, lockNames ...string) {
 	for lock := range restClnts {
 		if isLocked((*locks)[lock]) {
-			sendRelease(ds, restClnts[lock], (*locks)[lock], isReadLock, lockNames...)
+			sendRelease(ds, restClnts[lock], owner, (*locks)[lock], isReadLock, lockNames...)
 			(*locks)[lock] = ""
 		}
 	}
@@ -364,7 +361,7 @@ func releaseAll(ds *Dsync, locks *[]string, isReadLock bool, restClnts []NetLock
 // It is a run-time error if dm is not locked on entry to Unlock.
 func (dm *DRWMutex) Unlock() {
 
-	restClnts := dm.clnt.GetLockersFn()
+	restClnts, owner := dm.clnt.GetLockers()
 	// create temp array on stack
 	locks := make([]string, len(restClnts))
 
@@ -391,7 +388,7 @@
 	}
 
 	isReadLock := false
-	unlock(dm.clnt, locks, isReadLock, restClnts, dm.Names...)
+	unlock(dm.clnt, owner, locks, isReadLock, restClnts, dm.Names...)
 }
 
 // RUnlock releases a read lock held on dm.
@@ -400,7 +397,7 @@ func (dm *DRWMutex) Unlock() {
 func (dm *DRWMutex) RUnlock() {
 
 	// create temp array on stack
-	restClnts := dm.clnt.GetLockersFn()
+	restClnts, owner := dm.clnt.GetLockers()
 	locks := make([]string, len(restClnts))
 	{
@@ -416,10 +413,10 @@
 	}
 
 	isReadLock := true
-	unlock(dm.clnt, locks, isReadLock, restClnts, dm.Names...)
+	unlock(dm.clnt, owner, locks, isReadLock, restClnts, dm.Names...)
 }
 
-func unlock(ds *Dsync, locks []string, isReadLock bool, restClnts []NetLocker, names ...string) {
+func unlock(ds *Dsync, owner string, locks []string, isReadLock bool, restClnts []NetLocker, names ...string) {
 
 	// We don't need to synchronously wait until we have released all the locks (or the quorum)
 	// (a subsequent lock will retry automatically in case it would fail to get quorum)
@@ -428,19 +425,20 @@ func unlock(ds *Dsync, locks []string, isReadLock bool, restClnts []NetLocker, n
 
 		if isLocked(locks[index]) {
 			// broadcast lock release to all nodes that granted the lock
-			sendRelease(ds, c, locks[index], isReadLock, names...)
+			sendRelease(ds, c, owner, locks[index], isReadLock, names...)
 		}
 	}
 }
 
 // sendRelease sends a release message to a node that previously granted a lock
-func sendRelease(ds *Dsync, c NetLocker, uid string, isReadLock bool, names ...string) {
+func sendRelease(ds *Dsync, c NetLocker, owner string, uid string, isReadLock bool, names ...string) {
 	if c == nil {
 		log("Unable to call RUnlock failed with %s\n", errors.New("netLocker is offline"))
 		return
 	}
 
 	args := LockArgs{
+		Owner:     owner,
 		UID:       uid,
 		Resources: names,
 	}
diff --git a/pkg/dsync/dsync.go b/pkg/dsync/dsync.go
index a4022c7fe..8cae0051f 100644
--- a/pkg/dsync/dsync.go
+++ b/pkg/dsync/dsync.go
@@ -20,5 +20,5 @@ package dsync
 // authenticated clients, used to initiate lock REST calls.
 type Dsync struct {
 	// List of rest client objects, one per lock server.
-	GetLockersFn func() []NetLocker
+	GetLockers func() ([]NetLocker, string)
 }
diff --git a/pkg/dsync/dsync_test.go b/pkg/dsync/dsync_test.go
index 7d61b4f6c..87450ce35 100644
--- a/pkg/dsync/dsync_test.go
+++ b/pkg/dsync/dsync_test.go
@@ -31,6 +31,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/google/uuid"
 	. "github.com/minio/minio/pkg/dsync"
 )
 
@@ -78,7 +79,7 @@ func TestMain(m *testing.M) {
 	}
 
 	ds = &Dsync{
-		GetLockersFn: func() []NetLocker { return clnts },
+		GetLockers: func() ([]NetLocker, string) { return clnts, uuid.New().String() },
 	}
 
 	startRPCServers(nodes)
diff --git a/pkg/dsync/rpc-client-interface.go b/pkg/dsync/rpc-client-interface.go
index d8c2542d2..881b69d76 100644
--- a/pkg/dsync/rpc-client-interface.go
+++ b/pkg/dsync/rpc-client-interface.go
@@ -29,6 +29,10 @@ type LockArgs struct {
 	// Source contains the line number, function and file name of the code
 	// on the client node that requested the lock.
 	Source string
+
+	// Owner represents unique ID for this instance, an owner who originally requested
+	// the locked resource, useful primarily in figuring out stale locks.
+	Owner string
 }
 
 // NetLocker is dsync compatible locker interface.
diff --git a/pkg/madmin/top-commands.go b/pkg/madmin/top-commands.go
index c065533b5..80bd47df5 100644
--- a/pkg/madmin/top-commands.go
+++ b/pkg/madmin/top-commands.go
@@ -36,7 +36,9 @@ type LockEntry struct {
 	Type       string   `json:"type"`       // Type indicates if 'Write' or 'Read' lock
 	Source     string   `json:"source"`     // Source at which lock was granted
 	ServerList []string `json:"serverlist"` // List of servers participating in the lock.
+	Owner      string   `json:"owner"`      // Owner UUID of the server that owns the lock.
 	ID         string   `json:"id"`         // UID to uniquely identify request of client.
+	Quorum     int      `json:"quorum"`     // Quorum number of servers required to hold this lock, used to look for stale locks.
 }
 
 // LockEntries - To sort the locks
@@ -54,13 +56,21 @@ func (l LockEntries) Swap(i, j int) {
 	l[i], l[j] = l[j], l[i]
 }
 
-// TopNLocks - returns the count number of oldest locks currently active on the server.
-func (adm *AdminClient) TopNLocks(ctx context.Context, count int) (LockEntries, error) {
+// TopLockOpts top lock options
+type TopLockOpts struct {
+	Count int
+	Stale bool
+}
+
+// TopLocksWithOpts - returns the count number of oldest locks currently active on the server.
+// Additionally, `stale` can be enabled to also list stale locks currently present on the server.
+func (adm *AdminClient) TopLocksWithOpts(ctx context.Context, opts TopLockOpts) (LockEntries, error) {
 	// Execute GET on /minio/admin/v3/top/locks?count=10
 	// to get the 'count' number of oldest locks currently
 	// active on the server.
 	queryVals := make(url.Values)
-	queryVals.Set("count", strconv.Itoa(count))
+	queryVals.Set("count", strconv.Itoa(opts.Count))
+	queryVals.Set("stale", strconv.FormatBool(opts.Stale))
 	resp, err := adm.executeMethod(ctx,
 		http.MethodGet,
 		requestData{
@@ -89,5 +99,5 @@ func (adm *AdminClient) TopNLocks(ctx context.Context, count int) (LockEntries,
 // TopLocks - returns top '10' oldest locks currently active on the server.
 func (adm *AdminClient) TopLocks(ctx context.Context) (LockEntries, error) {
-	return adm.TopNLocks(ctx, 10)
+	return adm.TopLocksWithOpts(ctx, TopLockOpts{Count: 10})
 }
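
The reworked topLockEntries in cmd/admin-handlers.go now drops an entry unless it is still reported by at least a quorum of lockers (N/2 grants for a read lock, N/2+1 for a write lock, per the quorum comments removed from lockMaintenance), unless stale entries are explicitly requested. The standalone sketch below illustrates only that filtering rule; lockEntry, readQuorum, writeQuorum and filterActive are simplified stand-ins for the MinIO types and helpers, not the actual implementation.

package main

import "fmt"

// Simplified stand-in for madmin.LockEntry; only the fields the
// quorum filtering needs are included.
type lockEntry struct {
	Resource   string
	ServerList []string
	Quorum     int
}

// Quorum values as implied by the removed comments in lockMaintenance:
// read locks need N/2 grants, write locks need N/2+1.
func readQuorum(driveCount int) int  { return driveCount / 2 }
func writeQuorum(driveCount int) int { return driveCount/2 + 1 }

// filterActive keeps entries held on at least Quorum servers; entries
// below quorum are treated as stale and only kept when includeStale is set.
func filterActive(entries []lockEntry, includeStale bool) []lockEntry {
	var out []lockEntry
	for _, e := range entries {
		if includeStale || len(e.ServerList) >= e.Quorum {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	drives := 8
	entries := []lockEntry{
		{Resource: "bucket/object-a", ServerList: []string{"s1", "s2", "s3", "s4", "s5"}, Quorum: writeQuorum(drives)},
		{Resource: "bucket/object-b", ServerList: []string{"s1"}, Quorum: readQuorum(drives)},
	}
	fmt.Println("active:", filterActive(entries, false)) // object-b dropped as stale
	fmt.Println("all:   ", filterActive(entries, true))  // stale entries included
}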
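runDataCrawler now retries the leader lock after a random fraction of dataCrawlStartDelay instead of a fixed one-hour sleep, so contending nodes back off at different times. A minimal sketch of that jittered delay follows; dataCrawlStartDelay is redefined locally here purely for the demo, its real value lives elsewhere in cmd/ and is not part of this patch.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Assumed value for the demo only; the patch reuses an existing constant.
const dataCrawlStartDelay = 5 * time.Minute

func main() {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	// Each failed lock attempt waits a random fraction of the base delay.
	for i := 0; i < 3; i++ {
		d := time.Duration(r.Float64() * float64(dataCrawlStartDelay))
		fmt.Printf("retry %d would sleep for %v\n", i, d)
	}
}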
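cmd/local-locker.go now records an Owner (the per-process UUID returned alongside the lockers by GetLockers) on every lock entry, and removeEntry/Expired only match when both UID and Owner agree. The toy locker below illustrates that matching rule with simplified types; it is a sketch, not the localLocker implementation.

package main

import "fmt"

// Toy versions of lockRequesterInfo / LockArgs with just the fields the
// owner check needs.
type lockInfo struct {
	UID   string
	Owner string
}

type lockArgs struct {
	UID   string
	Owner string
}

type toyLocker struct {
	lockMap map[string][]lockInfo
}

// removeEntry mirrors the patched rule: an entry is removed only when
// both the UID and the Owner of the request match the stored entry.
func (l *toyLocker) removeEntry(resource string, args lockArgs) bool {
	lris := l.lockMap[resource]
	for i, lri := range lris {
		if lri.UID == args.UID && lri.Owner == args.Owner {
			l.lockMap[resource] = append(lris[:i], lris[i+1:]...)
			if len(l.lockMap[resource]) == 0 {
				delete(l.lockMap, resource)
			}
			return true
		}
	}
	return false
}

func main() {
	l := &toyLocker{lockMap: map[string][]lockInfo{
		"bucket/object": {{UID: "0123-4567", Owner: "node-a-uuid"}},
	}}
	// Same UID but a different owner no longer releases the lock.
	fmt.Println(l.removeEntry("bucket/object", lockArgs{UID: "0123-4567", Owner: "node-b-uuid"})) // false
	fmt.Println(l.removeEntry("bucket/object", lockArgs{UID: "0123-4567", Owner: "node-a-uuid"})) // true
}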
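On the client side, pkg/madmin gains TopLocksWithOpts, which adds the Stale flag on top of the existing Count; TopLocks remains a thin wrapper with Count: 10. A usage sketch, assuming a reachable MinIO endpoint and placeholder credentials:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/minio/minio/pkg/madmin"
)

func main() {
	// Endpoint and credentials are placeholders for this sketch.
	adm, err := madmin.New("localhost:9000", "ACCESS_KEY", "SECRET_KEY", false)
	if err != nil {
		log.Fatal(err)
	}

	// Ask for the 25 oldest locks and include entries that look stale
	// (held below quorum), matching the new ?stale=true query parameter.
	entries, err := adm.TopLocksWithOpts(context.Background(), madmin.TopLockOpts{Count: 25, Stale: true})
	if err != nil {
		log.Fatal(err)
	}
	for _, e := range entries {
		fmt.Println(e.Timestamp, e.Type, e.Resource, e.Owner, e.Quorum, e.ServerList)
	}
}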
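On the wire, the owner travels as one more query parameter (lockRESTOwner = "owner") set by lockRESTClient.restCall and read back in getLockArgs; the lock handlers also stop requiring these parameters as mux route matchers. A trivial sketch of the query values a lock call now carries, using placeholder UID, owner and source strings:

package main

import (
	"fmt"
	"net/url"
)

// Query parameter names as declared in cmd/lock-rest-server-common.go.
const (
	lockRESTUID    = "uid"
	lockRESTOwner  = "owner"
	lockRESTSource = "source"
)

func main() {
	// Placeholder values; in MinIO the UID identifies the lock request and
	// the owner is the requesting server instance's UUID.
	values := url.Values{}
	values.Set(lockRESTUID, "0123-4567")
	values.Set(lockRESTOwner, "9f1a2b3c-4d5e-6f70-8192-a3b4c5d6e7f8")
	values.Set(lockRESTSource, "object-handlers.go:1234")
	fmt.Println(values.Encode())
}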