From b0f3f94163eb375c5207d883ff339dc4d3db335f Mon Sep 17 00:00:00 2001
From: Krishnan Parthasarathi
Date: Wed, 10 Aug 2016 13:08:11 -0700
Subject: [PATCH] unify single-node and distributed namespace locking (#2401)

---
 cmd/namespace-lock.go                   | 92 +++++++++++++++++++------
 cmd/server-main.go                      | 31 +--------
 cmd/test-utils_test.go                  |  3 +-
 vendor/github.com/minio/dsync/dmutex.go | 17 +++--
 vendor/github.com/minio/dsync/dsync.go  |  1 +
 vendor/vendor.json                      |  6 +-
 6 files changed, 89 insertions(+), 61 deletions(-)

diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go
index b992c89a3..746de5e24 100644
--- a/cmd/namespace-lock.go
+++ b/cmd/namespace-lock.go
@@ -18,10 +18,65 @@ package cmd
 
 import (
 	"errors"
-	"github.com/minio/dsync"
+	pathpkg "path"
+	"strconv"
+	"strings"
 	"sync"
+
+	"github.com/minio/dsync"
 )
 
+// Global name space lock.
+var nsMutex *nsLockMap
+
+// Initialize distributed locking only in case of a distributed setup.
+// On success, returns whether the setup is distributed.
+func initDsyncNodes(disks []string, port int) (bool, error) {
+	// Holds a bool indicating whether this server instance is part of
+	// a distributed setup or not.
+	var isDist = false
+	// List of lock servers that take part in the co-operative namespace locking.
+	var dsyncNodes []string
+	// Corresponding rpc paths needed for communication over net/rpc.
+	var rpcPaths []string
+
+	// Port to connect to for the lock servers in a distributed setup.
+	serverPort := strconv.Itoa(port)
+
+	for _, disk := range disks {
+		if idx := strings.LastIndex(disk, ":"); idx != -1 {
+			dsyncNodes = append(dsyncNodes, disk[:idx]+":"+serverPort)
+			rpcPaths = append(rpcPaths, pathpkg.Join(lockRPCPath, disk[idx+1:]))
+		}
+		if !isLocalStorage(disk) {
+			// One or more disks supplied as arguments are not
+			// attached to the local node.
+			isDist = true
+		}
+	}
+	// Initialize rpc lock client information only if this instance is part
+	// of a distributed setup.
+	if isDist {
+		return isDist, dsync.SetNodesWithPath(dsyncNodes, rpcPaths)
+	}
+	return isDist, nil
+}
+
+// initNSLock - initialize name space lock map.
+func initNSLock(isDist bool) {
+	nsMutex = &nsLockMap{
+		isDist:  isDist,
+		lockMap: make(map[nsParam]*nsLock),
+	}
+}
+
+// RWLocker - interface that any read-write locking library should implement.
+type RWLocker interface {
+	sync.Locker
+	RLock()
+	RUnlock()
+}
+
 // nsParam - carries name space resource.
 type nsParam struct {
 	volume string
@@ -30,43 +85,40 @@ type nsParam struct {
 
 // nsLock - provides primitives for locking critical namespace regions.
 type nsLock struct {
-	*dsync.DRWMutex
+	RWLocker
 	ref uint
 }
 
 // nsLockMap - namespace lock map, provides primitives to Lock,
 // Unlock, RLock and RUnlock.
 type nsLockMap struct {
-	lockMap map[nsParam]*nsLock
-	mutex   sync.Mutex
-}
-
-// Global name space lock.
-var nsMutex *nsLockMap
-
-// initNSLock - initialize name space lock map.
-func initNSLock() {
-	nsMutex = &nsLockMap{
-		lockMap: make(map[nsParam]*nsLock),
-	}
+	isDist       bool // indicates whether the locking service is part of a distributed setup.
+	lockMap      map[nsParam]*nsLock
+	lockMapMutex sync.Mutex
 }
 
 // Lock the namespace resource.
 func (n *nsLockMap) lock(volume, path string, readLock bool) {
-	n.mutex.Lock()
+	var nsLk *nsLock
+	n.lockMapMutex.Lock()
 
 	param := nsParam{volume, path}
 	nsLk, found := n.lockMap[param]
 	if !found {
 		nsLk = &nsLock{
-			DRWMutex: dsync.NewDRWMutex(volume + path),
-			ref:      0,
+			RWLocker: func() RWLocker {
+				if n.isDist {
+					return dsync.NewDRWMutex(pathpkg.Join(volume, path))
+				}
+				return &sync.RWMutex{}
+			}(),
+			ref: 0,
 		}
 		n.lockMap[param] = nsLk
 	}
 	nsLk.ref++ // Update ref count here, under the map mutex, to avoid races.
 	// Unlock the map before locking the NS resource, which might block.
-	n.mutex.Unlock()
+	n.lockMapMutex.Unlock()
 
 	// Locking here can block.
 	if readLock {
@@ -79,8 +131,8 @@ func (n *nsLockMap) lock(volume, path string, readLock bool) {
 // Unlock the namespace resource.
 func (n *nsLockMap) unlock(volume, path string, readLock bool) {
 	// nsLk.Unlock() will not block, hence locking the map for the entire function is fine.
-	n.mutex.Lock()
-	defer n.mutex.Unlock()
+	n.lockMapMutex.Lock()
+	defer n.lockMapMutex.Unlock()
 
 	param := nsParam{volume, path}
 	if nsLk, found := n.lockMap[param]; found {
diff --git a/cmd/server-main.go b/cmd/server-main.go
index fb5a94fd8..354333834 100644
--- a/cmd/server-main.go
+++ b/cmd/server-main.go
@@ -21,13 +21,11 @@ import (
 	"net"
 	"net/http"
 	"os"
-	"path"
 	"strconv"
 	"strings"
 	"time"
 
 	"github.com/minio/cli"
-	"github.com/minio/dsync"
 )
 
 var srvConfig serverCmdConfig
@@ -222,29 +220,6 @@ func getPort(address string) int {
 	return portInt
 }
 
-// Initialize distributed locking only in case of distributed setup.
-func initDsyncNodes(disks []string, port int) error {
-	var isDist bool = false
-	var dsyncNodes []string
-	var rpcPaths []string
-	serverPort := strconv.Itoa(port)
-
-	for _, disk := range disks {
-		if idx := strings.LastIndex(disk, ":"); idx != -1 {
-			dsyncNodes = append(dsyncNodes, disk[:idx]+":"+serverPort)
-			rpcPaths = append(rpcPaths, path.Join(lockRPCPath, disk[idx+1:]))
-		}
-		if !isLocalStorage(disk) {
-			// One or more disks supplied as arguments are remote.
-			isDist = true
-		}
-	}
-	if isDist {
-		return dsync.SetNodesWithPath(dsyncNodes, rpcPaths)
-	}
-	return nil
-}
-
 // serverMain handler called for 'minio server' command.
 func serverMain(c *cli.Context) {
 	// Check 'server' cli arguments.
@@ -271,12 +246,12 @@ func serverMain(c *cli.Context) {
 	disks := c.Args()
 
 	// Set nodes for dsync
-	err = initDsyncNodes(disks, port)
+	var isDist bool
+	isDist, err = initDsyncNodes(disks, port)
 	fatalIf(err, "Unable to initialize distributed locking")
 
 	// Initialize name space lock.
-	// FIXME: add logic to switch between distributed and single-node namespace locking.
-	initNSLock()
+	initNSLock(isDist)
 
 	// Configure server.
 	srvConfig = serverCmdConfig{
diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go
index d465762a9..37e11cfe8 100644
--- a/cmd/test-utils_test.go
+++ b/cmd/test-utils_test.go
@@ -45,7 +45,8 @@ import (
 // Tests should call initNSLock only once.
 func init() {
 	// Initialize name space lock.
-	initNSLock()
+	isDist := false
+	initNSLock(isDist)
 }
 
 // TestErrHandler - Golang Testing.T and Testing.B, and gocheck.C satisfy this interface.
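For reviewers who want the new locking pattern in isolation: the standalone sketch below mirrors the ref-counted map of RWLocker values that this patch introduces in cmd/namespace-lock.go. It is an illustration, not code from the patch; the distributed branch is stubbed with a plain sync.RWMutex, since exercising a real dsync.NewDRWMutex would require running lock RPC servers.

package main

import (
	"fmt"
	"sync"
)

// RWLocker matches the interface added by the patch: both sync.RWMutex
// and dsync.DRWMutex satisfy it.
type RWLocker interface {
	sync.Locker
	RLock()
	RUnlock()
}

type nsParam struct{ volume, path string }

type nsLock struct {
	RWLocker
	ref uint
}

type nsLockMap struct {
	isDist       bool
	lockMap      map[nsParam]*nsLock
	lockMapMutex sync.Mutex
}

// lock mirrors nsLockMap.lock: find-or-create the entry and bump its
// ref count under the map mutex, then block on the resource lock.
func (n *nsLockMap) lock(volume, path string, readLock bool) {
	n.lockMapMutex.Lock()
	param := nsParam{volume, path}
	nsLk, found := n.lockMap[param]
	if !found {
		// The patch picks dsync.NewDRWMutex here when n.isDist is
		// true; this sketch always uses the single-node flavor.
		nsLk = &nsLock{RWLocker: &sync.RWMutex{}}
		n.lockMap[param] = nsLk
	}
	nsLk.ref++ // counted under the map mutex to avoid races
	// Release the map mutex before the potentially blocking lock below.
	n.lockMapMutex.Unlock()

	if readLock {
		nsLk.RLock()
	} else {
		nsLk.Lock()
	}
}

func main() {
	n := &nsLockMap{lockMap: make(map[nsParam]*nsLock)}
	n.lock("bucket", "object", false) // exclusive (write) lock
	fmt.Println("acquired write lock on bucket/object")
}

The property to verify in review is that lockMapMutex guards only the map bookkeeping; the per-resource lock is taken after the map mutex is released, so a writer blocked on one object never stalls lock and unlock traffic for other objects.
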
diff --git a/vendor/github.com/minio/dsync/dmutex.go b/vendor/github.com/minio/dsync/dmutex.go
index 3d64d796d..474d13ff8 100644
--- a/vendor/github.com/minio/dsync/dmutex.go
+++ b/vendor/github.com/minio/dsync/dmutex.go
@@ -35,7 +35,6 @@ type DMutex struct {
 	m     sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
 
 	// TODO: Decide: create per object or create once for whole class
-	clnts []*rpc.Client
 }
 
 type Granted struct {
@@ -45,13 +44,13 @@ type Granted struct {
 }
 
 func connectLazy(dm *DMutex) {
-	if dm.clnts == nil {
-		dm.clnts = make([]*rpc.Client, n)
+	if clnts == nil {
+		clnts = make([]*rpc.Client, n)
 	}
-	for i := range dm.clnts {
-		if dm.clnts[i] == nil {
+	for i := range clnts {
+		if clnts[i] == nil {
 			// pass in unique path (as required by server.HandleHTTP())
-			dm.clnts[i], _ = rpc.DialHTTPPath("tcp", nodes[i], rpcPaths[i])
+			clnts[i], _ = rpc.DialHTTPPath("tcp", nodes[i], rpcPaths[i])
 		}
 	}
 }
@@ -78,7 +77,7 @@ func (dm *DMutex) Lock() {
 		ids := make([]string, n)
 
 		// try to acquire the lock
-		success := lock(dm.clnts, &locks, &ids, dm.Name)
+		success := lock(clnts, &locks, &ids, dm.Name)
 		if success {
 			// if success, copy array to object
 			dm.locks = make([]bool, n)
@@ -118,7 +117,7 @@ func (dm *DMutex) tryLockTimeout() bool {
 	ids := make([]string, n)
 
 	// try to acquire the lock
-	success := lock(dm.clnts, &locks, &ids, dm.Name)
+	success := lock(clnts, &locks, &ids, dm.Name)
 	if success {
 		// if success, copy array to object
 		dm.locks = make([]bool, n)
@@ -286,7 +285,7 @@ func (dm *DMutex) Unlock() {
 	// We don't need to wait until we have released all the locks (or the quorum)
 	// (a subsequent lock will retry automatically in case it would fail to get
 	// quorum)
-	for index, c := range dm.clnts {
+	for index, c := range clnts {
 
 		if dm.locks[index] {
 			// broadcast lock release to all nodes that granted the lock
diff --git a/vendor/github.com/minio/dsync/dsync.go b/vendor/github.com/minio/dsync/dsync.go
index a760578f3..fd44771da 100644
--- a/vendor/github.com/minio/dsync/dsync.go
+++ b/vendor/github.com/minio/dsync/dsync.go
@@ -29,6 +29,7 @@ const DefaultPath = "/rpc/dsync"
 var n int
 var nodes []string
 var rpcPaths []string
+var clnts []*rpc.Client
 
 func closeClients(clients []*rpc.Client) {
 	for _, clnt := range clients {
diff --git a/vendor/vendor.json b/vendor/vendor.json
index add16dc7c..a3893f063 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -98,10 +98,10 @@
 			"revisionTime": "2015-11-18T20:00:48-08:00"
 		},
 		{
-			"checksumSHA1": "r1Vf/vQTkMsZrDVORBGAAIlOMP4=",
+			"checksumSHA1": "BqEf+ElZXcofLdav5iGfHH93vMY=",
 			"path": "github.com/minio/dsync",
-			"revision": "6bfa8c0c1c37959c1bda15bfdae228a986d3cca8",
-			"revisionTime": "2016-08-07T19:01:27Z"
+			"revision": "9c1a398a7d687901939a31d50f8639b11bb3c5fe",
+			"revisionTime": "2016-08-10T17:09:05Z"
 		},
 		{
 			"path": "github.com/minio/go-homedir",
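
Two details of the patch are worth spelling out. First, how initDsyncNodes derives the dsync configuration from the disk arguments: each argument of the form host:/export/path is split at the last ':', the host plus the server port becomes a lock-server address, and the export path becomes that disk's RPC route under lockRPCPath. A standalone sketch, with a placeholder value for lockRPCPath (the real constant lives elsewhere in the cmd package):

package main

import (
	"fmt"
	"path"
	"strconv"
	"strings"
)

// Placeholder for illustration; the real constant is defined elsewhere
// in the cmd package.
const lockRPCPath = "/minio/lock"

// splitDiskArg mirrors the loop body of initDsyncNodes: everything up
// to the last ':' names the node, everything after it is the export
// path of the disk on that node.
func splitDiskArg(disk string, port int) (node, rpcPath string, ok bool) {
	idx := strings.LastIndex(disk, ":")
	if idx == -1 {
		// A purely local path such as /mnt/disk1 has no node part.
		return "", "", false
	}
	node = disk[:idx] + ":" + strconv.Itoa(port)
	rpcPath = path.Join(lockRPCPath, disk[idx+1:])
	return node, rpcPath, true
}

func main() {
	for _, disk := range []string{"server1:/mnt/disk1", "server2:/mnt/disk2"} {
		if node, rpcPath, ok := splitDiskArg(disk, 9000); ok {
			fmt.Println(node, rpcPath) // e.g. server1:9000 /minio/lock/mnt/disk1
		}
	}
}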
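
Second, the vendored dsync change replaces the per-DMutex client slice with one package-level pool that is dialed lazily, so every DMutex shares the same connections. The sketch below shows that lazy-dial pattern on its own; the addresses and rpc paths are placeholders, and the mutex around the pool is an addition of this sketch rather than something the patch has, since concurrent connectLazy calls from different DMutex instances would otherwise race on the shared slice.

package main

import (
	"fmt"
	"net/rpc"
	"sync"
)

// Package-level pool, mirroring the clnts slice this patch moves into
// dsync.go. Addresses and rpc paths are placeholders.
var (
	nodes    = []string{"10.0.0.1:9000", "10.0.0.2:9000"}
	rpcPaths = []string{"/rpc/dsync/disk1", "/rpc/dsync/disk2"}
	clnts    []*rpc.Client
	poolMu   sync.Mutex // sketch-only guard; the patch itself has no such lock
)

// connectLazy dials every node that has no live client yet. A failed
// dial leaves a nil slot, so the next caller simply retries it.
func connectLazy() {
	poolMu.Lock()
	defer poolMu.Unlock()
	if clnts == nil {
		clnts = make([]*rpc.Client, len(nodes))
	}
	for i := range clnts {
		if clnts[i] == nil {
			// Each node is registered under a unique path, as
			// required by server.HandleHTTP().
			clnts[i], _ = rpc.DialHTTPPath("tcp", nodes[i], rpcPaths[i])
		}
	}
}

func main() {
	connectLazy()
	for i, c := range clnts {
		fmt.Printf("node %s connected: %v\n", nodes[i], c != nil)
	}
}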