diff --git a/cmd/routers.go b/cmd/routers.go index 97538040c..d2a9f00e3 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -49,6 +49,11 @@ func newObjectLayerFactory(disks, ignoredDisks []string) func() ObjectLayer { if objAPI != nil { return objAPI } + + // Acquire a distributed lock to ensure only one of the nodes + // initializes the format.json. + nsMutex.Lock(minioMetaBucket, formatConfigFile) + defer nsMutex.Unlock(minioMetaBucket, formatConfigFile) objAPI, err = newObjectLayer(disks, ignoredDisks) if err != nil { return nil diff --git a/cmd/rpc-server.go b/cmd/rpc-server.go index b9bdb8c97..5380cafe7 100644 --- a/cmd/rpc-server.go +++ b/cmd/rpc-server.go @@ -92,7 +92,7 @@ func (s *storageServer) ReadAllHandler(arg *ReadFileArgs, reply *[]byte) error { if err != nil { return err } - reply = &buf + *reply = buf return nil } @@ -102,7 +102,7 @@ func (s *storageServer) ReadFileHandler(arg *ReadFileArgs, reply *int64) error { if err != nil { return err } - reply = &n + *reply = n return nil } @@ -160,9 +160,9 @@ func newRPCServer(serverConfig serverCmdConfig) (servers []*storageServer, err e // registerStorageRPCRouter - register storage rpc router. func registerStorageRPCRouters(mux *router.Router, stServers []*storageServer) { - storageRPCServer := rpc.NewServer() // Create a unique route for each disk exported from this node. for _, stServer := range stServers { + storageRPCServer := rpc.NewServer() storageRPCServer.RegisterName("Storage", stServer) // Add minio storage routes. storageRouter := mux.PathPrefix(reservedBucket).Subrouter() diff --git a/cmd/server-main.go b/cmd/server-main.go index da48a067f..fb5a94fd8 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -21,11 +21,13 @@ import ( "net" "net/http" "os" + "path" "strconv" "strings" "time" "github.com/minio/cli" + "github.com/minio/dsync" ) var srvConfig serverCmdConfig @@ -220,6 +222,29 @@ func getPort(address string) int { return portInt } +// Initialize distributed locking only in case of distributed setup. +func initDsyncNodes(disks []string, port int) error { + var isDist bool = false + var dsyncNodes []string + var rpcPaths []string + serverPort := strconv.Itoa(port) + + for _, disk := range disks { + if idx := strings.LastIndex(disk, ":"); idx != -1 { + dsyncNodes = append(dsyncNodes, disk[:idx]+":"+serverPort) + rpcPaths = append(rpcPaths, path.Join(lockRPCPath, disk[idx+1:])) + } + if !isLocalStorage(disk) { + // One or more disks supplied as arguments are remote. + isDist = true + } + } + if isDist { + return dsync.SetNodesWithPath(dsyncNodes, rpcPaths) + } + return nil +} + // serverMain handler called for 'minio server' command. func serverMain(c *cli.Context) { // Check 'server' cli arguments. @@ -245,6 +270,14 @@ func serverMain(c *cli.Context) { // Disks to be used in server init. disks := c.Args() + // Set nodes for dsync + err = initDsyncNodes(disks, port) + fatalIf(err, "Unable to initialize distributed locking") + + // Initialize name space lock. + // FIXME: add logic to switch between distributed and single-node namespace locking. + initNSLock() + // Configure server. srvConfig = serverCmdConfig{ serverAddr: serverAddress, diff --git a/lock-rpc-server.go b/lock-rpc-server.go index 785f3e0bd..4c2ad3742 100644 --- a/lock-rpc-server.go +++ b/lock-rpc-server.go @@ -26,7 +26,7 @@ import ( router "github.com/gorilla/mux" ) -const lockRPCPath = "/lock" +const lockRPCPath = "/minio/lock" type lockServer struct { rpcPath string @@ -37,7 +37,7 @@ type lockServer struct { /// Distributed lock handlers // LockHandler - rpc handler for lock operation. -func (l *lockServer) LockHandler(name *string, reply *bool) error { +func (l *lockServer) Lock(name *string, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() _, ok := l.lockMap[*name] @@ -51,7 +51,7 @@ func (l *lockServer) LockHandler(name *string, reply *bool) error { } // UnlockHandler - rpc handler for unlock operation. -func (l *lockServer) UnlockHandler(name *string, reply *bool) error { +func (l *lockServer) Unlock(name *string, reply *bool) error { l.mutex.Lock() defer l.mutex.Unlock() _, ok := l.lockMap[*name] @@ -84,22 +84,24 @@ func newLockServers(serverConfig serverCmdConfig) (lockServers []*lockServer) { if skipDisks[export] { continue } - if idx := strings.LastIndex(export, ":"); idx != -1 { - export = export[idx+1:] + if isLocalStorage(export) { + if idx := strings.LastIndex(export, ":"); idx != -1 { + export = export[idx+1:] + } + lockServers = append(lockServers, &lockServer{ + rpcPath: export, + mutex: sync.Mutex{}, + lockMap: make(map[string]struct{}), + }) } - lockServers = append(lockServers, &lockServer{ - rpcPath: export, - mutex: sync.Mutex{}, - lockMap: make(map[string]struct{}), - }) } return lockServers } -// registerStorageLockers - register locker rpc handlers for valyala/gorpc library clients +// registerStorageLockers - register locker rpc handlers for net/rpc library clients func registerStorageLockers(mux *router.Router, lockServers []*lockServer) { - lockRPCServer := rpc.NewServer() for _, lockServer := range lockServers { + lockRPCServer := rpc.NewServer() lockRPCServer.RegisterName("Dsync", lockServer) lockRouter := mux.PathPrefix(reservedBucket).Subrouter() lockRouter.Path(path.Join("/lock", lockServer.rpcPath)).Handler(lockRPCServer) diff --git a/vendor/github.com/minio/dsync/dmutex.go b/vendor/github.com/minio/dsync/dmutex.go index 2212598d5..3d64d796d 100644 --- a/vendor/github.com/minio/dsync/dmutex.go +++ b/vendor/github.com/minio/dsync/dmutex.go @@ -21,7 +21,6 @@ import ( "math" "math/rand" "net/rpc" - "strings" "sync" "time" ) @@ -52,7 +51,7 @@ func connectLazy(dm *DMutex) { for i := range dm.clnts { if dm.clnts[i] == nil { // pass in unique path (as required by server.HandleHTTP() - dm.clnts[i], _ = rpc.DialHTTPPath("tcp", nodes[i], rpcPath+"-"+strings.Split(nodes[i], ":")[1]) + dm.clnts[i], _ = rpc.DialHTTPPath("tcp", nodes[i], rpcPaths[i]) } } } diff --git a/vendor/github.com/minio/dsync/dsync.go b/vendor/github.com/minio/dsync/dsync.go index 8977614be..a760578f3 100644 --- a/vendor/github.com/minio/dsync/dsync.go +++ b/vendor/github.com/minio/dsync/dsync.go @@ -28,7 +28,7 @@ const DefaultPath = "/rpc/dsync" var n int var nodes []string -var rpcPath string +var rpcPaths []string func closeClients(clients []*rpc.Client) { for _, clnt := range clients { @@ -36,8 +36,8 @@ func closeClients(clients []*rpc.Client) { } } -// Same as SetNodes, but takes a path argument different from the package-level default. -func SetNodesWithPath(nodeList []string, path string) (err error) { +// Same as SetNodes, but takes a slice of rpc paths as argument different from the package-level default. +func SetNodesWithPath(nodeList []string, paths []string) (err error) { // Validate if number of nodes is within allowable range. if n != 0 { @@ -50,7 +50,8 @@ func SetNodesWithPath(nodeList []string, path string) (err error) { nodes = make([]string, len(nodeList)) copy(nodes, nodeList[:]) - rpcPath = path + rpcPaths = make([]string, len(paths)) + copy(rpcPaths, paths[:]) n = len(nodes) return nil } diff --git a/vendor/vendor.json b/vendor/vendor.json index c29dacc65..add16dc7c 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -98,10 +98,10 @@ "revisionTime": "2015-11-18T20:00:48-08:00" }, { - "checksumSHA1": "KCM0UiuvLA5fPiX5I83/HTklxlI=", + "checksumSHA1": "r1Vf/vQTkMsZrDVORBGAAIlOMP4=", "path": "github.com/minio/dsync", - "revision": "c10eebd6b637bb834d502a6574c53e0ea6c64997", - "revisionTime": "2016-08-05T20:56:13Z" + "revision": "6bfa8c0c1c37959c1bda15bfdae228a986d3cca8", + "revisionTime": "2016-08-07T19:01:27Z" }, { "path": "github.com/minio/go-homedir",