diff --git a/cmd/auth-rpc-client.go b/cmd/auth-rpc-client.go index 7a41e9d0b..778b2a546 100644 --- a/cmd/auth-rpc-client.go +++ b/cmd/auth-rpc-client.go @@ -29,8 +29,13 @@ type GenericReply struct{} // GenericArgs represents any generic RPC arguments. type GenericArgs struct { - Token string // Used to authenticate every RPC call. - Timestamp time.Time // Used to verify if the RPC call was issued between the same Login() and disconnect event pair. + Token string // Used to authenticate every RPC call. + // Used to verify if the RPC call was issued between + // the same Login() and disconnect event pair. + Timestamp time.Time + + // Indicates if args should be sent to remote peers as well. + Remote bool } // SetToken - sets the token to the supplied value. @@ -95,7 +100,6 @@ type AuthRPCClient struct { rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client isLoggedIn bool // Indicates if the auth client has been logged in and token is valid. token string // JWT based token - tstamp time.Time // Timestamp as received on Login RPC. serverVersion string // Server version exchanged by the RPC. } @@ -141,7 +145,6 @@ func (authClient *AuthRPCClient) Login() error { } // Set token, time stamp as received from a successful login call. authClient.token = reply.Token - authClient.tstamp = reply.Timestamp authClient.serverVersion = reply.ServerVersion authClient.isLoggedIn = true return nil @@ -158,7 +161,7 @@ func (authClient *AuthRPCClient) Call(serviceMethod string, args interface { if err = authClient.Login(); err == nil { // Set token and timestamp before the rpc call. args.SetToken(authClient.token) - args.SetTimestamp(authClient.tstamp) + args.SetTimestamp(time.Now().UTC()) // Call the underlying rpc. err = authClient.rpc.Call(serviceMethod, args, reply) diff --git a/cmd/controller-handlers.go b/cmd/control-handlers.go similarity index 73% rename from cmd/controller-handlers.go rename to cmd/control-handlers.go index 330be1e22..51d87d13f 100644 --- a/cmd/controller-handlers.go +++ b/cmd/control-handlers.go @@ -34,7 +34,7 @@ var errServerTimeMismatch = errors.New("Server times are too far apart.") /// Auth operations // Login - login handler. -func (c *controllerAPIHandlers) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) error { +func (c *controlAPIHandlers) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) error { jwt, err := newJWT(defaultInterNodeJWTExpiry) if err != nil { return err @@ -47,7 +47,7 @@ func (c *controllerAPIHandlers) LoginHandler(args *RPCLoginArgs, reply *RPCLogin return err } reply.Token = token - reply.Timestamp = c.timestamp + reply.Timestamp = time.Now().UTC() reply.ServerVersion = Version return nil } @@ -72,7 +72,7 @@ type HealListReply struct { } // ListObjects - list all objects that needs healing. -func (c *controllerAPIHandlers) ListObjectsHealHandler(args *HealListArgs, reply *HealListReply) error { +func (c *controlAPIHandlers) ListObjectsHealHandler(args *HealListArgs, reply *HealListReply) error { objAPI := c.ObjectAPI() if objAPI == nil { return errServerNotInitialized @@ -108,7 +108,7 @@ type HealObjectArgs struct { type HealObjectReply struct{} // HealObject - heal the object. -func (c *controllerAPIHandlers) HealObjectHandler(args *HealObjectArgs, reply *GenericReply) error { +func (c *controlAPIHandlers) HealObjectHandler(args *HealObjectArgs, reply *GenericReply) error { objAPI := c.ObjectAPI() if objAPI == nil { return errServerNotInitialized @@ -120,7 +120,7 @@ func (c *controllerAPIHandlers) HealObjectHandler(args *HealObjectArgs, reply *G } // HealObject - heal the object. -func (c *controllerAPIHandlers) HealDiskMetadataHandler(args *GenericArgs, reply *GenericReply) error { +func (c *controlAPIHandlers) HealDiskMetadataHandler(args *GenericArgs, reply *GenericReply) error { if !isRPCTokenValid(args.Token) { return errInvalidToken } @@ -143,9 +143,6 @@ type ServiceArgs struct { // to perform. Currently supported signals are // stop, restart and status. Signal serviceSignal - - // Make remote calls. - Remote bool } // ServiceReply - represents service operation success info. @@ -153,24 +150,30 @@ type ServiceReply struct { StorageInfo StorageInfo } -func (c *controllerAPIHandlers) remoteCall(serviceMethod string, args interface { - SetToken(token string) - SetTimestamp(tstamp time.Time) -}, reply interface{}) { +// Remote procedure call, calls serviceMethod with given input args. +func (c *controlAPIHandlers) remoteServiceCall(args *ServiceArgs, replies []*ServiceReply) error { var wg sync.WaitGroup - for index, clnt := range c.RemoteControllers { + var errs = make([]error, len(c.RemoteControls)) + // Send remote call to all neighboring peers to restart minio servers. + for index, clnt := range c.RemoteControls { wg.Add(1) go func(index int, client *AuthRPCClient) { defer wg.Done() - err := client.Call(serviceMethod, args, reply) - errorIf(err, "Unable to initiate %s", serviceMethod) + errs[index] = client.Call("Control.ServiceHandler", args, replies[index]) + errorIf(errs[index], "Unable to initiate control service request to remote node %s", client.Node()) }(index, clnt) } wg.Wait() + for _, err := range errs { + if err != nil { + return err + } + } + return nil } // Service - handler for sending service signals across many servers. -func (c *controllerAPIHandlers) ServiceHandler(args *ServiceArgs, reply *ServiceReply) error { +func (c *controlAPIHandlers) ServiceHandler(args *ServiceArgs, reply *ServiceReply) error { if !isRPCTokenValid(args.Token) { return errInvalidToken } @@ -182,21 +185,24 @@ func (c *controllerAPIHandlers) ServiceHandler(args *ServiceArgs, reply *Service reply.StorageInfo = objAPI.StorageInfo() return nil } + var replies = make([]*ServiceReply, len(c.RemoteControls)) switch args.Signal { case serviceRestart: if args.Remote { // Set remote as false for remote calls. args.Remote = false - // Send remote call to all neighboring peers to restart minio servers. - c.remoteCall("Controller.ServiceHandler", args, reply) + if err := c.remoteServiceCall(args, replies); err != nil { + return err + } } globalServiceSignalCh <- serviceRestart case serviceStop: if args.Remote { // Set remote as false for remote calls. args.Remote = false - // Send remote call to all neighboring peers to stop minio servers. - c.remoteCall("Controller.ServiceHandler", args, reply) + if err := c.remoteServiceCall(args, replies); err != nil { + return err + } } globalServiceSignalCh <- serviceStop } @@ -204,14 +210,13 @@ func (c *controllerAPIHandlers) ServiceHandler(args *ServiceArgs, reply *Service } // LockInfo - RPC control handler for `minio control lock`. Returns the info of the locks held in the system. -func (c *controllerAPIHandlers) LockInfo(arg *GenericArgs, reply *SystemLockState) error { - // obtain the lock state information. - lockInfo, err := generateSystemLockResponse() - // in case of error, return err to the RPC client. - if err != nil { - return err +func (c *controlAPIHandlers) TryInitHandler(args *GenericArgs, reply *GenericReply) error { + if !isRPCTokenValid(args.Token) { + return errInvalidToken } - // The response containing the lock info. - *reply = lockInfo + go func() { + globalWakeupCh <- struct{}{} + }() + *reply = GenericReply{} return nil } diff --git a/cmd/control-heal-main.go b/cmd/control-heal-main.go index 056bf4bea..3e2cca5db 100644 --- a/cmd/control-heal-main.go +++ b/cmd/control-heal-main.go @@ -90,7 +90,7 @@ func healControl(ctx *cli.Context) { secureConn: parsedURL.Scheme == "https", address: parsedURL.Host, path: path.Join(reservedBucket, controlPath), - loginMethod: "Controller.LoginHandler", + loginMethod: "Control.LoginHandler", } client := newAuthClient(authCfg) @@ -98,7 +98,7 @@ func healControl(ctx *cli.Context) { fmt.Print("Checking and healing disk metadata..") args := &GenericArgs{} reply := &GenericReply{} - err = client.Call("Controller.HealDiskMetadataHandler", args, reply) + err = client.Call("Control.HealDiskMetadataHandler", args, reply) fatalIf(err, "Unable to heal disk metadata.") fmt.Println(" ok") @@ -112,7 +112,7 @@ func healControl(ctx *cli.Context) { fmt.Printf("Healing : /%s/%s\n", bucketName, objectName) args := &HealObjectArgs{Bucket: bucketName, Object: objectName} reply := &HealObjectReply{} - err = client.Call("Controller.HealObjectHandler", args, reply) + err = client.Call("Control.HealObjectHandler", args, reply) errorIf(err, "Healing object %s failed.", objectName) return } @@ -129,7 +129,7 @@ func healControl(ctx *cli.Context) { MaxKeys: 1000, } reply := &HealListReply{} - err = client.Call("Controller.ListObjectsHealHandler", args, reply) + err = client.Call("Control.ListObjectsHealHandler", args, reply) fatalIf(err, "Unable to list objects for healing.") // Heal the objects returned in the ListObjects reply. @@ -137,7 +137,7 @@ func healControl(ctx *cli.Context) { fmt.Printf("Healing : /%s/%s\n", bucketName, obj) reply := &GenericReply{} healArgs := &HealObjectArgs{Bucket: bucketName, Object: obj} - err = client.Call("Controller.HealObjectHandler", healArgs, reply) + err = client.Call("Control.HealObjectHandler", healArgs, reply) errorIf(err, "Healing object %s failed.", obj) } diff --git a/cmd/control-lock-main.go b/cmd/control-lock-main.go index a5e0982ed..70bd9b568 100644 --- a/cmd/control-lock-main.go +++ b/cmd/control-lock-main.go @@ -17,129 +17,134 @@ package cmd import ( - "encoding/json" - "fmt" "net/url" "path" "time" "github.com/minio/cli" + "github.com/minio/mc/pkg/console" ) -// SystemLockState - Structure to fill the lock state of entire object storage. -// That is the total locks held, total calls blocked on locks and state of all the locks for the entire system. -type SystemLockState struct { - TotalLocks int64 `json:"totalLocks"` - TotalBlockedLocks int64 `json:"totalBlockedLocks"` // count of operations which are blocked waiting for the lock to be released. - TotalAcquiredLocks int64 `json:"totalAcquiredLocks"` // count of operations which has successfully acquired the lock but hasn't unlocked yet( operation in progress). - LocksInfoPerObject []VolumeLockInfo `json:"locksInfoPerObject"` -} - -// VolumeLockInfo - Structure to contain the lock state info for volume, path pair. -type VolumeLockInfo struct { - Bucket string `json:"bucket"` - Object string `json:"object"` - LocksOnObject int64 `json:"locksOnObject"` // All locks blocked + running for given pair. - LocksAcquiredOnObject int64 `json:"locksAcquiredOnObject"` // count of operations which has successfully acquired the lock but hasn't unlocked yet( operation in progress). - TotalBlockedLocks int64 `json:"locksBlockedOnObject"` // count of operations which are blocked waiting for the lock to be released. - LockDetailsOnObject []OpsLockState `json:"lockDetailsOnObject"` // state information containing state of the locks for all operations on given pair. -} - -// OpsLockState - structure to fill in state information of the lock. -// structure to fill in status information for each operation with given operation ID. -type OpsLockState struct { - OperationID string `json:"opsID"` // string containing operation ID. - LockOrigin string `json:"lockOrigin"` // contant which mentions the operation type (Get Obejct, PutObject...) - LockType string `json:"lockType"` - Status string `json:"status"` // status can be running/ready/blocked. - StatusSince string `json:"statusSince"` // time info of the since how long the status holds true, value in seconds. -} - -// Read entire state of the locks in the system and return. -func generateSystemLockResponse() (SystemLockState, error) { - nsMutex.lockMapMutex.Lock() - defer nsMutex.lockMapMutex.Unlock() - - if nsMutex.debugLockMap == nil { - return SystemLockState{}, errLockNotInitialized - } - - lockState := SystemLockState{} - - lockState.TotalBlockedLocks = nsMutex.blockedCounter - lockState.TotalLocks = nsMutex.globalLockCounter - lockState.TotalAcquiredLocks = nsMutex.runningLockCounter - - for param := range nsMutex.debugLockMap { - volLockInfo := VolumeLockInfo{} - volLockInfo.Bucket = param.volume - volLockInfo.Object = param.path - volLockInfo.TotalBlockedLocks = nsMutex.debugLockMap[param].blocked - volLockInfo.LocksAcquiredOnObject = nsMutex.debugLockMap[param].running - volLockInfo.LocksOnObject = nsMutex.debugLockMap[param].ref - for opsID := range nsMutex.debugLockMap[param].lockInfo { - opsState := OpsLockState{} - opsState.OperationID = opsID - opsState.LockOrigin = nsMutex.debugLockMap[param].lockInfo[opsID].lockOrigin - opsState.LockType = nsMutex.debugLockMap[param].lockInfo[opsID].lockType - opsState.Status = nsMutex.debugLockMap[param].lockInfo[opsID].status - opsState.StatusSince = time.Now().UTC().Sub(nsMutex.debugLockMap[param].lockInfo[opsID].since).String() - - volLockInfo.LockDetailsOnObject = append(volLockInfo.LockDetailsOnObject, opsState) - } - lockState.LocksInfoPerObject = append(lockState.LocksInfoPerObject, volLockInfo) - } - - return lockState, nil +var lockFlags = []cli.Flag{ + cli.StringFlag{ + Name: "older-than", + Usage: "List locks older than given time.", + Value: "24h", + }, + cli.BoolFlag{ + Name: "verbose", + Usage: "Lists more information about locks.", + }, } var lockCmd = cli.Command{ Name: "lock", - Usage: "info about the locks in the node.", + Usage: "Prints current lock information.", Action: lockControl, - Flags: globalFlags, + Flags: append(lockFlags, globalFlags...), CustomHelpTemplate: `NAME: minio control {{.Name}} - {{.Usage}} USAGE: - minio control {{.Name}} http://localhost:9000/ + minio control {{.Name}} [list|clear] http://localhost:9000/ FLAGS: {{range .Flags}}{{.}} {{end}} - EAMPLES: - 1. Get all the info about the blocked/held locks in the node: - $ minio control lock http://localhost:9000/ + 1. List all currently active locks from all nodes. Defaults to list locks held longer than 24hrs. + $ minio control {{.Name}} list http://localhost:9000/ + + 2. List all currently active locks from all nodes. Request locks from older than 1minute. + $ minio control {{.Name}} --older-than=1m list http://localhost:9000/ `, } +// printLockStateVerbose - pretty prints systemLockState, additionally this filters out based on a given duration. +func printLockStateVerbose(lkStateRep map[string]SystemLockState, olderThan time.Duration) { + console.Println("Duration Server LockType LockAcquired Status LockOrigin Resource") + for server, lockState := range lkStateRep { + for _, lockInfo := range lockState.LocksInfoPerObject { + lockedResource := path.Join(lockInfo.Bucket, lockInfo.Object) + for _, lockDetails := range lockInfo.LockDetailsOnObject { + if lockDetails.Duration < olderThan { + continue + } + console.Println(lockDetails.Duration, server, + lockDetails.LockType, lockDetails.Since, + lockDetails.Status, lockDetails.LockOrigin, + lockedResource) + } + } + } +} + +// printLockState - pretty prints systemLockState, additionally this filters out based on a given duration. +func printLockState(lkStateRep map[string]SystemLockState, olderThan time.Duration) { + console.Println("Duration Server LockType Resource") + for server, lockState := range lkStateRep { + for _, lockInfo := range lockState.LocksInfoPerObject { + lockedResource := path.Join(lockInfo.Bucket, lockInfo.Object) + for _, lockDetails := range lockInfo.LockDetailsOnObject { + if lockDetails.Duration < olderThan { + continue + } + console.Println(lockDetails.Duration, server, + lockDetails.LockType, lockedResource) + } + } + } +} + // "minio control lock" entry point. func lockControl(c *cli.Context) { - if len(c.Args()) != 1 { + if !c.Args().Present() && len(c.Args()) != 2 { cli.ShowCommandHelpAndExit(c, "lock", 1) } - parsedURL, err := url.Parse(c.Args()[0]) + parsedURL, err := url.Parse(c.Args().Get(1)) fatalIf(err, "Unable to parse URL.") + // Parse older than string. + olderThanStr := c.String("older-than") + olderThan, err := time.ParseDuration(olderThanStr) + fatalIf(err, "Unable to parse older-than time duration.") + + // Verbose flag. + verbose := c.Bool("verbose") + authCfg := &authConfig{ accessKey: serverConfig.GetCredential().AccessKeyID, secretKey: serverConfig.GetCredential().SecretAccessKey, secureConn: parsedURL.Scheme == "https", address: parsedURL.Host, path: path.Join(reservedBucket, controlPath), - loginMethod: "Controller.LoginHandler", + loginMethod: "Control.LoginHandler", } client := newAuthClient(authCfg) - args := &GenericArgs{} - reply := &SystemLockState{} - err = client.Call("Controller.LockInfo", args, reply) - // logs the error and returns if err != nil. - fatalIf(err, "RPC Controller.LockInfo call failed") - // print the lock info on the console. - b, err := json.MarshalIndent(*reply, "", " ") - fatalIf(err, "Failed to parse the RPC lock info response") - fmt.Print(string(b)) + args := &GenericArgs{ + // This is necessary so that the remotes, + // don't end up sending requests back and forth. + Remote: true, + } + + subCommand := c.Args().Get(0) + switch subCommand { + case "list": + lkStateRep := make(map[string]SystemLockState) + // Request lock info, fetches from all the nodes in the cluster. + err = client.Call("Control.LockInfo", args, &lkStateRep) + fatalIf(err, "Unable to fetch system lockInfo.") + if !verbose { + printLockState(lkStateRep, olderThan) + } else { + printLockStateVerbose(lkStateRep, olderThan) + } + case "clear": + // TODO. Defaults to clearing all locks. + default: + fatalIf(errInvalidArgument, "Unsupported lock control operation %s", c.Args().Get(0)) + } + } diff --git a/cmd/control-lock-main_test.go b/cmd/control-lock-main_test.go new file mode 100644 index 000000000..e2131a60b --- /dev/null +++ b/cmd/control-lock-main_test.go @@ -0,0 +1,46 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "testing" + "time" +) + +// Test print systemState. +func TestPrintLockState(t *testing.T) { + nsMutex.Lock("testbucket", "1.txt", "11-11") + sysLockState, err := getSystemLockState() + if err != nil { + t.Fatal(err) + } + nsMutex.Unlock("testbucket", "1.txt", "11-11") + sysLockStateMap := map[string]SystemLockState{} + sysLockStateMap["bucket"] = sysLockState + + // Print lock state. + printLockState(sysLockStateMap, 0) + + // Print lock state verbose. + printLockStateVerbose(sysLockStateMap, 0) + + // Does not print any lock state in normal print mode. + printLockState(sysLockStateMap, 10*time.Second) + + // Does not print any lock state in debug print mode. + printLockStateVerbose(sysLockStateMap, 10*time.Second) +} diff --git a/cmd/control-mains_test.go b/cmd/control-mains_test.go index d8af71574..ed6a552eb 100644 --- a/cmd/control-mains_test.go +++ b/cmd/control-mains_test.go @@ -49,23 +49,23 @@ func TestControlHealMain(t *testing.T) { // Test to call lockControl() in control-lock-main.go func TestControlLockMain(t *testing.T) { - // create cli app for testing + // Create cli app for testing app := cli.NewApp() app.Commands = []cli.Command{controlCmd} - // start test server + // Start test server testServer := StartTestServer(t, "XL") - // schedule cleanup at the end + // Schedule cleanup at the end defer testServer.Stop() - // fetch http server endpoint + // Fetch http server endpoint url := testServer.Server.URL - // create args to call - args := []string{"./minio", "control", "lock", url} + // Create args to call + args := []string{"./minio", "control", "lock", "list", url} - // run app + // Run app err := app.Run(args) if err != nil { t.Errorf("Control-Lock-Main test failed with - %s", err.Error()) diff --git a/cmd/controller-router.go b/cmd/control-router.go similarity index 54% rename from cmd/controller-router.go rename to cmd/control-router.go index 0afdf883b..8514b1c7d 100644 --- a/cmd/controller-router.go +++ b/cmd/control-router.go @@ -21,7 +21,6 @@ import ( "net/rpc" "path" "strings" - "time" router "github.com/gorilla/mux" "github.com/minio/minio-go/pkg/set" @@ -29,31 +28,39 @@ import ( // Routes paths for "minio control" commands. const ( - controlPath = "/controller" + controlPath = "/control" ) -// Initializes remote controller clients for making remote requests. -func initRemoteControllerClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient { +// Find local node through the command line arguments. +func getLocalAddress(srvCmdConfig serverCmdConfig) string { + if !srvCmdConfig.isDistXL { + return fmt.Sprintf(":%d", globalMinioPort) + } + for _, export := range srvCmdConfig.disks { + // Validates if remote disk is local. + if isLocalStorage(export) { + var host string + if idx := strings.LastIndex(export, ":"); idx != -1 { + host = export[:idx] + } + return fmt.Sprintf("%s:%d", host, globalMinioPort) + } + } + return "" +} + +// Initializes remote control clients for making remote requests. +func initRemoteControlClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient { if !srvCmdConfig.isDistXL { return nil } var newExports []string // Initialize auth rpc clients. exports := srvCmdConfig.disks - ignoredExports := srvCmdConfig.ignoredDisks remoteHosts := set.NewStringSet() - // Initialize ignored disks in a new set. - ignoredSet := set.NewStringSet() - if len(ignoredExports) > 0 { - ignoredSet = set.CreateStringSet(ignoredExports...) - } - var authRPCClients []*AuthRPCClient + var remoteControlClnts []*AuthRPCClient for _, export := range exports { - if ignoredSet.Contains(export) { - // Ignore initializing ignored export. - continue - } // Validates if remote disk is local. if isLocalStorage(export) { continue @@ -68,41 +75,40 @@ func initRemoteControllerClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient remoteHosts.Add(fmt.Sprintf("%s:%d", host, globalMinioPort)) } for host := range remoteHosts { - authRPCClients = append(authRPCClients, newAuthClient(&authConfig{ + remoteControlClnts = append(remoteControlClnts, newAuthClient(&authConfig{ accessKey: serverConfig.GetCredential().AccessKeyID, secretKey: serverConfig.GetCredential().SecretAccessKey, secureConn: isSSL(), address: host, path: path.Join(reservedBucket, controlPath), - loginMethod: "Controller.LoginHandler", + loginMethod: "Control.LoginHandler", })) } - return authRPCClients + return remoteControlClnts } -// Register controller RPC handlers. -func registerControllerRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfig) { - // Initialize controller. - ctrlHandlers := &controllerAPIHandlers{ - ObjectAPI: newObjectLayerFn, - StorageDisks: srvCmdConfig.storageDisks, - timestamp: time.Now().UTC(), - } +// Represents control object which provides handlers for control +// operations on server. +type controlAPIHandlers struct { + ObjectAPI func() ObjectLayer + StorageDisks []StorageAPI + RemoteControls []*AuthRPCClient + LocalNode string +} - // Initializes remote controller clients. - ctrlHandlers.RemoteControllers = initRemoteControllerClients(srvCmdConfig) +// Register control RPC handlers. +func registerControlRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfig) { + // Initialize Control. + ctrlHandlers := &controlAPIHandlers{ + ObjectAPI: newObjectLayerFn, + RemoteControls: initRemoteControlClients(srvCmdConfig), + LocalNode: getLocalAddress(srvCmdConfig), + StorageDisks: srvCmdConfig.storageDisks, + } ctrlRPCServer := rpc.NewServer() - ctrlRPCServer.RegisterName("Controller", ctrlHandlers) + ctrlRPCServer.RegisterName("Control", ctrlHandlers) ctrlRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter() ctrlRouter.Path(controlPath).Handler(ctrlRPCServer) } - -// Handler for object healing. -type controllerAPIHandlers struct { - ObjectAPI func() ObjectLayer - StorageDisks []StorageAPI - RemoteControllers []*AuthRPCClient - timestamp time.Time -} diff --git a/cmd/controller-router_test.go b/cmd/control-router_test.go similarity index 61% rename from cmd/controller-router_test.go rename to cmd/control-router_test.go index 24982d943..4e3d4c293 100644 --- a/cmd/controller-router_test.go +++ b/cmd/control-router_test.go @@ -16,10 +16,73 @@ package cmd -import "testing" +import ( + "runtime" + "testing" +) + +// Tests fetch local address. +func TestLocalAddress(t *testing.T) { + if runtime.GOOS == "windows" { + return + } + testCases := []struct { + srvCmdConfig serverCmdConfig + localAddr string + }{ + // Test 1 - local address is found. + { + srvCmdConfig: serverCmdConfig{ + isDistXL: true, + disks: []string{ + "localhost:/mnt/disk1", + "1.1.1.2:/mnt/disk2", + "1.1.2.1:/mnt/disk3", + "1.1.2.2:/mnt/disk4", + }, + }, + localAddr: "localhost:9000", + }, + // Test 2 - local address is everything. + { + srvCmdConfig: serverCmdConfig{ + isDistXL: false, + disks: []string{ + "/mnt/disk1", + "/mnt/disk2", + "/mnt/disk3", + "/mnt/disk4", + }, + }, + localAddr: ":9000", + }, + // Test 3 - local address is not found. + { + srvCmdConfig: serverCmdConfig{ + isDistXL: true, + disks: []string{ + "1.1.1.1:/mnt/disk1", + "1.1.1.2:/mnt/disk2", + "1.1.2.1:/mnt/disk3", + "1.1.2.2:/mnt/disk4", + }, + }, + localAddr: "", + }, + } + + // Validates fetching local address. + for i, testCase := range testCases { + localAddr := getLocalAddress(testCase.srvCmdConfig) + if localAddr != testCase.localAddr { + t.Fatalf("Test %d: Expected %s, got %s", i+1, testCase.localAddr, localAddr) + } + } + +} // Tests initialization of remote controller clients. -func TestInitRemoteControllerClients(t *testing.T) { +func TestInitRemoteControlClients(t *testing.T) { rootPath, err := newTestConfig("us-east-1") if err != nil { t.Fatal("Unable to initialize config", err) @@ -63,27 +126,11 @@ func TestInitRemoteControllerClients(t *testing.T) { }, totalClients: 4, }, - // Test - 4 2 clients allocated with 4 disks with 1 disk ignored. - { - srvCmdConfig: serverCmdConfig{ - isDistXL: true, - disks: []string{ - "10.1.10.1:/mnt/disk1", - "10.1.10.2:/mnt/disk2", - "10.1.10.3:/mnt/disk3", - "10.1.10.4:/mnt/disk4", - }, - ignoredDisks: []string{ - "10.1.10.1:/mnt/disk1", - }, - }, - totalClients: 3, - }, } // Evaluate and validate all test cases. for i, testCase := range testCases { - rclients := initRemoteControllerClients(testCase.srvCmdConfig) + rclients := initRemoteControlClients(testCase.srvCmdConfig) if len(rclients) != testCase.totalClients { t.Errorf("Test %d, Expected %d, got %d RPC clients.", i+1, testCase.totalClients, len(rclients)) } diff --git a/cmd/control-service-main.go b/cmd/control-service-main.go index b1bf51255..3e6fbfb52 100644 --- a/cmd/control-service-main.go +++ b/cmd/control-service-main.go @@ -65,7 +65,7 @@ func serviceControl(c *cli.Context) { case "stop": signal = serviceStop default: - fatalIf(errInvalidArgument, "Unsupported signalling requested %s", c.Args().Get(0)) + fatalIf(errInvalidArgument, "Unrecognized service %s", c.Args().Get(0)) } parsedURL, err := url.Parse(c.Args().Get(1)) @@ -77,18 +77,18 @@ func serviceControl(c *cli.Context) { secureConn: parsedURL.Scheme == "https", address: parsedURL.Host, path: path.Join(reservedBucket, controlPath), - loginMethod: "Controller.LoginHandler", + loginMethod: "Control.LoginHandler", } client := newAuthClient(authCfg) args := &ServiceArgs{ Signal: signal, - // This is necessary so that the remotes, - // don't end up sending requests back and forth. - Remote: true, } + // This is necessary so that the remotes, + // don't end up sending requests back and forth. + args.Remote = true reply := &ServiceReply{} - err = client.Call("Controller.ServiceHandler", args, reply) + err = client.Call("Control.ServiceHandler", args, reply) fatalIf(err, "Service command %s failed for %s", c.Args().Get(0), parsedURL.Host) if signal == serviceStatus { console.Println(getStorageInfoMsg(reply.StorageInfo)) diff --git a/cmd/controller_test.go b/cmd/control_test.go similarity index 77% rename from cmd/controller_test.go rename to cmd/control_test.go index 162872fa3..652688c2c 100644 --- a/cmd/controller_test.go +++ b/cmd/control_test.go @@ -26,7 +26,7 @@ import ( ) // API suite container common to both FS and XL. -type TestRPCControllerSuite struct { +type TestRPCControlSuite struct { serverType string testServer TestServer testAuthConf *authConfig @@ -34,27 +34,27 @@ type TestRPCControllerSuite struct { // Setting up the test suite. // Starting the Test server with temporary FS backend. -func (s *TestRPCControllerSuite) SetUpSuite(c *testing.T) { +func (s *TestRPCControlSuite) SetUpSuite(c *testing.T) { s.testServer = StartTestControlRPCServer(c, s.serverType) s.testAuthConf = &authConfig{ address: s.testServer.Server.Listener.Addr().String(), accessKey: s.testServer.AccessKey, secretKey: s.testServer.SecretKey, path: path.Join(reservedBucket, controlPath), - loginMethod: "Controller.LoginHandler", + loginMethod: "Control.LoginHandler", } } // No longer used with gocheck, but used in explicit teardown code in // each test function. // Called implicitly by "gopkg.in/check.v1" // after all tests are run. -func (s *TestRPCControllerSuite) TearDownSuite(c *testing.T) { +func (s *TestRPCControlSuite) TearDownSuite(c *testing.T) { s.testServer.Stop() } func TestRPCControlLock(t *testing.T) { //setup code - s := &TestRPCControllerSuite{serverType: "XL"} + s := &TestRPCControlSuite{serverType: "XL"} s.SetUpSuite(t) //run test @@ -65,7 +65,7 @@ func TestRPCControlLock(t *testing.T) { } // Tests to validate the correctness of lock instrumentation control RPC end point. -func (s *TestRPCControllerSuite) testRPCControlLock(c *testing.T) { +func (s *TestRPCControlSuite) testRPCControlLock(c *testing.T) { expectedResult := []lockStateCase{ // Test case - 1. // Case where 10 read locks are held. @@ -188,9 +188,9 @@ func (s *TestRPCControllerSuite) testRPCControlLock(c *testing.T) { defer client.Close() args := &GenericArgs{} - reply := &SystemLockState{} + reply := make(map[string]*SystemLockState) // Call the lock instrumentation RPC end point. - err := client.Call("Controller.LockInfo", args, reply) + err := client.Call("Control.LockInfo", args, &reply) if err != nil { c.Errorf("Add: expected no error but got string %q", err.Error()) } @@ -198,11 +198,11 @@ func (s *TestRPCControllerSuite) testRPCControlLock(c *testing.T) { expectedLockStats := expectedResult[0] // verify the actual lock info with the expected one. // verify the existence entry for first read lock (read lock with opsID "0"). - verifyRPCLockInfoResponse(expectedLockStats, *reply, c, 1) + verifyRPCLockInfoResponse(expectedLockStats, reply, c, 1) expectedLockStats = expectedResult[1] // verify the actual lock info with the expected one. // verify the existence entry for last read lock (read lock with opsID "9"). - verifyRPCLockInfoResponse(expectedLockStats, *reply, c, 2) + verifyRPCLockInfoResponse(expectedLockStats, reply, c, 2) // now hold a write lock in a different go routine and it should block since 10 read locks are // still held. @@ -217,13 +217,13 @@ func (s *TestRPCControllerSuite) testRPCControlLock(c *testing.T) { // count of running locks should increase by 1. // Call the RPC control handle to fetch the lock instrumentation info. - reply = &SystemLockState{} + reply = make(map[string]*SystemLockState) // Call the lock instrumentation RPC end point. - err = client.Call("Controller.LockInfo", args, reply) + err = client.Call("Control.LockInfo", args, &reply) if err != nil { c.Errorf("Add: expected no error but got string %q", err.Error()) } - verifyRPCLockInfoResponse(expectedWLockStats, *reply, c, 4) + verifyRPCLockInfoResponse(expectedWLockStats, reply, c, 4) // release the write lock. nsMutex.Unlock("my-bucket", "my-object", strconv.Itoa(10)) @@ -237,13 +237,13 @@ func (s *TestRPCControllerSuite) testRPCControlLock(c *testing.T) { expectedLockStats = expectedResult[2] // Call the RPC control handle to fetch the lock instrumentation info. - reply = &SystemLockState{} + reply = make(map[string]*SystemLockState) // Call the lock instrumentation RPC end point. - err = client.Call("Controller.LockInfo", args, reply) + err = client.Call("Control.LockInfo", args, &reply) if err != nil { c.Errorf("Add: expected no error but got string %q", err.Error()) } - verifyRPCLockInfoResponse(expectedLockStats, *reply, c, 3) + verifyRPCLockInfoResponse(expectedLockStats, reply, c, 3) // Release all the read locks held. // the blocked write lock in the above go routines should get unblocked. for i := 0; i < 10; i++ { @@ -252,98 +252,99 @@ func (s *TestRPCControllerSuite) testRPCControlLock(c *testing.T) { wg.Wait() // Since all the locks are released. There should not be any entry in the lock info. // and all the counters should be set to 0. - reply = &SystemLockState{} + reply = make(map[string]*SystemLockState) // Call the lock instrumentation RPC end point. - err = client.Call("Controller.LockInfo", args, reply) + err = client.Call("Control.LockInfo", args, &reply) if err != nil { c.Errorf("Add: expected no error but got string %q", err.Error()) } - if reply.TotalAcquiredLocks != 0 && reply.TotalLocks != 0 && reply.TotalBlockedLocks != 0 { - c.Fatalf("The counters are not reset properly after all locks are released") - } - if len(reply.LocksInfoPerObject) != 0 { - c.Fatalf("Since all locks are released there shouldn't have been any lock info entry, but found %d", len(reply.LocksInfoPerObject)) + for _, rpcLockInfo := range reply { + if rpcLockInfo.TotalAcquiredLocks != 0 && rpcLockInfo.TotalLocks != 0 && rpcLockInfo.TotalBlockedLocks != 0 { + c.Fatalf("The counters are not reset properly after all locks are released") + } + if len(rpcLockInfo.LocksInfoPerObject) != 0 { + c.Fatalf("Since all locks are released there shouldn't have been any lock info entry, but found %d", len(rpcLockInfo.LocksInfoPerObject)) + } } } -func TestControllerHealDiskMetadataH(t *testing.T) { +func TestControlHealDiskMetadataH(t *testing.T) { //setup code - s := &TestRPCControllerSuite{serverType: "XL"} + s := &TestRPCControlSuite{serverType: "XL"} s.SetUpSuite(t) //run test - s.testControllerHealDiskMetadataH(t) + s.testControlHealDiskMetadataH(t) //teardown code s.TearDownSuite(t) } -// TestControllerHandlerHealDiskMetadata - Registers and call the `HealDiskMetadataHandler`, -// asserts to validate the success. -func (s *TestRPCControllerSuite) testControllerHealDiskMetadataH(c *testing.T) { +// TestControlHandlerHealDiskMetadata - Registers and call the `HealDiskMetadataHandler`, asserts to validate the success. +func (s *TestRPCControlSuite) testControlHealDiskMetadataH(c *testing.T) { // The suite has already started the test RPC server, just send RPC calls. client := newAuthClient(s.testAuthConf) defer client.Close() args := &GenericArgs{} reply := &GenericReply{} - err := client.Call("Controller.HealDiskMetadataHandler", args, reply) + err := client.Call("Control.HealDiskMetadataHandler", args, reply) if err != nil { c.Errorf("Control.HealDiskMetadataH - test failed with %s", err) } } -func TestControllerHealObjectH(t *testing.T) { +func TestControlHealObjectH(t *testing.T) { //setup code - s := &TestRPCControllerSuite{serverType: "XL"} + s := &TestRPCControlSuite{serverType: "XL"} s.SetUpSuite(t) //run test - s.testControllerHealObjectH(t) + s.testControlHealObjectH(t) //teardown code s.TearDownSuite(t) } -func (s *TestRPCControllerSuite) testControllerHealObjectH(t *testing.T) { +func (s *TestRPCControlSuite) testControlHealObjectH(t *testing.T) { client := newAuthClient(s.testAuthConf) defer client.Close() err := newObjectLayerFn().MakeBucket("testbucket") if err != nil { t.Fatalf( - "Controller.HealObjectH - create bucket failed with %s", err) + "Control.HealObjectH - create bucket failed with %s", err) } datum := strings.NewReader("a") _, err = newObjectLayerFn().PutObject("testbucket", "testobject", 1, datum, nil, "") if err != nil { - t.Fatalf("Controller.HealObjectH - put object failed with %s", err) + t.Fatalf("Control.HealObjectH - put object failed with %s", err) } args := &HealObjectArgs{GenericArgs{}, "testbucket", "testobject"} reply := &GenericReply{} - err = client.Call("Controller.HealObjectHandler", args, reply) + err = client.Call("Control.HealObjectHandler", args, reply) if err != nil { - t.Errorf("Controller.HealObjectH - test failed with %s", err) + t.Errorf("Control.HealObjectH - test failed with %s", err) } } -func TestControllerListObjectsHealH(t *testing.T) { +func TestControlListObjectsHealH(t *testing.T) { //setup code - s := &TestRPCControllerSuite{serverType: "XL"} + s := &TestRPCControlSuite{serverType: "XL"} s.SetUpSuite(t) //run test - s.testControllerListObjectsHealH(t) + s.testControlListObjectsHealH(t) //teardown code s.TearDownSuite(t) } -func (s *TestRPCControllerSuite) testControllerListObjectsHealH(t *testing.T) { +func (s *TestRPCControlSuite) testControlListObjectsHealH(t *testing.T) { client := newAuthClient(s.testAuthConf) defer client.Close() @@ -351,13 +352,13 @@ func (s *TestRPCControllerSuite) testControllerListObjectsHealH(t *testing.T) { err := newObjectLayerFn().MakeBucket("testbucket") if err != nil { t.Fatalf( - "Controller.ListObjectsHealH - create bucket failed - %s", err) + "Control.ListObjectsHealH - create bucket failed - %s", err) } r := strings.NewReader("0") _, err = newObjectLayerFn().PutObject("testbucket", "testObj-0", 1, r, nil, "") if err != nil { - t.Fatalf("Controller.ListObjectsHealH - object creation failed - %s", err) + t.Fatalf("Control.ListObjectsHealH - object creation failed - %s", err) } args := &HealListArgs{ @@ -365,9 +366,9 @@ func (s *TestRPCControllerSuite) testControllerListObjectsHealH(t *testing.T) { "", "", 100, } reply := &GenericReply{} - err = client.Call("Controller.ListObjectsHealHandler", args, reply) + err = client.Call("Control.ListObjectsHealHandler", args, reply) if err != nil { - t.Errorf("Controller.ListObjectsHealHandler - test failed - %s", err) + t.Errorf("Control.ListObjectsHealHandler - test failed - %s", err) } } diff --git a/cmd/lock-instrument.go b/cmd/lock-instrument.go index 9938ca0ab..763902a9a 100644 --- a/cmd/lock-instrument.go +++ b/cmd/lock-instrument.go @@ -22,29 +22,45 @@ import ( "time" ) +type statusType string + +const ( + runningStatus statusType = "Running" + readyStatus statusType = "Ready" + blockedStatus statusType = "Blocked" +) + +type lockType string + const ( - debugRLockStr = "RLock" - debugWLockStr = "WLock" + debugRLockStr lockType = "RLock" + debugWLockStr lockType = "WLock" ) -// struct containing information of status (ready/running/blocked) of an operation with given operation ID. +// Struct containing information of status (ready/running/blocked) of an operation with given operation ID. type debugLockInfo struct { - lockType string // "Rlock" or "WLock". - lockOrigin string // contains the trace of the function which invoked the lock, obtained from runtime. - status string // status can be running/ready/blocked. - since time.Time // time info of the since how long the status holds true. + // "RLock" or "WLock". + lType lockType + // Contains the trace of the function which invoked the lock, obtained from runtime. + lockOrigin string + // Status can be running/ready/blocked. + status statusType + // Time info of the since how long the status holds true. + since time.Time } -// debugLockInfo - container for storing locking information for unique copy (volume,path) pair. -// ref variable holds the reference count for locks held for. +// debugLockInfo - container for storing locking information for unique copy +// (volume,path) pair. ref variable holds the reference count for locks held for. // `ref` values helps us understand the n locks held for given pair. -// `running` value helps us understand the total successful locks held (not blocked) for given pair and the operation is under execution. -// `blocked` value helps us understand the total number of operations blocked waiting on locks for given pair. +// `running` value helps us understand the total successful locks held (not blocked) +// for given pair and the operation is under execution. `blocked` +// value helps us understand the total number of operations blocked waiting on +// locks for given pair. type debugLockInfoPerVolumePath struct { - ref int64 // running + blocked operations. - running int64 // count of successful lock acquire and running operations. - blocked int64 // count of number of operations blocked waiting on lock. - lockInfo (map[string]debugLockInfo) // map of [operationID] debugLockInfo{operation, status, since} . + ref int64 // running + blocked operations. + running int64 // count of successful lock acquire and running operations. + blocked int64 // count of number of operations blocked waiting on lock. + lockInfo map[string]debugLockInfo // map of [opsID] debugLockInfo{operation, status, since} . } // returns an instance of debugLockInfo. @@ -62,15 +78,15 @@ func newDebugLockInfoPerVolumePath() *debugLockInfoPerVolumePath { // LockInfoOriginNotFound - While changing the state of the lock info its important that the entry for // lock at a given origin exists, if not `LockInfoOriginNotFound` is returned. type LockInfoOriginNotFound struct { - volume string - path string - operationID string - lockOrigin string + volume string + path string + opsID string + lockOrigin string } func (l LockInfoOriginNotFound) Error() string { - return fmt.Sprintf("No lock state stored for the lock origined at \"%s\", for %s, %s, %s.", - l.lockOrigin, l.volume, l.path, l.operationID) + return fmt.Sprintf("No lock state stored for the lock origined at \"%s\", for %s, %s, %s.", + l.lockOrigin, l.volume, l.path, l.opsID) } // LockInfoVolPathMssing - Error interface. Returned when the info the @@ -86,79 +102,80 @@ func (l LockInfoVolPathMssing) Error() string { // LockInfoOpsIDNotFound - Returned when the lock state info exists, but the entry for // given operation ID doesn't exist. type LockInfoOpsIDNotFound struct { - volume string - path string - operationID string + volume string + path string + opsID string } func (l LockInfoOpsIDNotFound) Error() string { - return fmt.Sprintf("No entry in lock info for %s, %s, %s.", l.operationID, l.volume, l.path) + return fmt.Sprintf("No entry in lock info for %s, %s, %s.", l.opsID, l.volume, l.path) } // LockInfoStateNotBlocked - When an attempt to change the state of the lock form `blocked` to `running` is done, // its necessary that the state before the transsition is "blocked", otherwise LockInfoStateNotBlocked returned. type LockInfoStateNotBlocked struct { - volume string - path string - operationID string + volume string + path string + opsID string } func (l LockInfoStateNotBlocked) Error() string { - return fmt.Sprintf("Lock state should be \"Blocked\" for %s, %s, %s.", l.volume, l.path, l.operationID) + return fmt.Sprintf("Lock state should be \"Blocked\" for %s, %s, %s.", l.volume, l.path, l.opsID) } var errLockNotInitialized = errors.New("Debug lockMap not initialized.") -// change the state of the lock from Blocked to Running. -func (n *nsLockMap) statusBlockedToRunning(param nsParam, lockOrigin, operationID string, readLock bool) error { +// Initialize lock info volume path. +func (n *nsLockMap) initLockInfoForVolumePath(param nsParam) { + n.debugLockMap[param] = newDebugLockInfoPerVolumePath() +} + +// Change the state of the lock from Blocked to Running. +func (n *nsLockMap) statusBlockedToRunning(param nsParam, lockOrigin, opsID string, readLock bool) error { // This operation is not executed under the scope nsLockMap.mutex.Lock(), lock has to be explicitly held here. n.lockMapMutex.Lock() defer n.lockMapMutex.Unlock() // new state info to be set for the lock. newLockInfo := debugLockInfo{ lockOrigin: lockOrigin, - status: "Running", + status: runningStatus, since: time.Now().UTC(), } - // set lock type. + // Set lock type. if readLock { - newLockInfo.lockType = debugRLockStr + newLockInfo.lType = debugRLockStr } else { - newLockInfo.lockType = debugWLockStr + newLockInfo.lType = debugWLockStr } - // check whether the lock info entry for pair already exists and its not `nil`. - lockInfo, ok := n.debugLockMap[param] + // Check whether the lock info entry for pair already exists and its not `nil`. + debugLockMap, ok := n.debugLockMap[param] if !ok { - // The lock state info for given pair should already exist. + // The lock state info foe given pair should already exist. // If not return `LockInfoVolPathMssing`. return LockInfoVolPathMssing{param.volume, param.path} } - // Lock info the for the given operation ID shouldn't be `nil`. - if lockInfo == nil { + // ``debugLockMap`` entry containing lock info for `param ` is `nil`. + if debugLockMap == nil { return errLockNotInitialized } - lockInfoOpID, ok := n.debugLockMap[param].lockInfo[operationID] + lockInfo, ok := n.debugLockMap[param].lockInfo[opsID] if !ok { // The lock info entry for given `opsID` should already exist for given pair. // If not return `LockInfoOpsIDNotFound`. - return LockInfoOpsIDNotFound{param.volume, param.path, operationID} + return LockInfoOpsIDNotFound{param.volume, param.path, opsID} } - // The entry for the lock origined at `lockOrigin` should already exist. - // If not return `LockInfoOriginNotFound`. - if lockInfoOpID.lockOrigin != lockOrigin { - return LockInfoOriginNotFound{param.volume, param.path, operationID, lockOrigin} + // The entry for the lock origined at `lockOrigin` should already exist. If not return `LockInfoOriginNotFound`. + if lockInfo.lockOrigin != lockOrigin { + return LockInfoOriginNotFound{param.volume, param.path, opsID, lockOrigin} } - // Status of the lock should already be set to "Blocked". - // If not return `LockInfoStateNotBlocked`. - if lockInfoOpID.status != "Blocked" { - return LockInfoStateNotBlocked{param.volume, param.path, operationID} + // Status of the lock should already be set to "Blocked". If not return `LockInfoStateNotBlocked`. + if lockInfo.status != blockedStatus { + return LockInfoStateNotBlocked{param.volume, param.path, opsID} } - - // All checks finished. - // changing the status of the operation from blocked to running and updating the time. - n.debugLockMap[param].lockInfo[operationID] = newLockInfo + // All checks finished. Changing the status of the operation from blocked to running and updating the time. + n.debugLockMap[param].lockInfo[opsID] = newLockInfo // After locking unblocks decrease the blocked counter. n.blockedCounter-- @@ -169,21 +186,17 @@ func (n *nsLockMap) statusBlockedToRunning(param nsParam, lockOrigin, operationI return nil } -func (n *nsLockMap) initLockInfoForVolumePath(param nsParam) { - n.debugLockMap[param] = newDebugLockInfoPerVolumePath() -} - -// change the state of the lock from Ready to Blocked. -func (n *nsLockMap) statusNoneToBlocked(param nsParam, lockOrigin, operationID string, readLock bool) error { +// Change the state of the lock from Ready to Blocked. +func (n *nsLockMap) statusNoneToBlocked(param nsParam, lockOrigin, opsID string, readLock bool) error { newLockInfo := debugLockInfo{ lockOrigin: lockOrigin, - status: "Blocked", + status: blockedStatus, since: time.Now().UTC(), } if readLock { - newLockInfo.lockType = debugRLockStr + newLockInfo.lType = debugRLockStr } else { - newLockInfo.lockType = debugWLockStr + newLockInfo.lType = debugWLockStr } lockInfo, ok := n.debugLockMap[param] @@ -192,15 +205,16 @@ func (n *nsLockMap) statusNoneToBlocked(param nsParam, lockOrigin, operationID s n.initLockInfoForVolumePath(param) } if lockInfo == nil { - // *debugLockInfoPerVolumePath entry is nil, initialize here to avoid any case of `nil` pointer access. + // *lockInfo is nil, initialize here. n.initLockInfoForVolumePath(param) } + // lockInfo is a map[string]debugLockInfo, which holds map[OperationID]{status,time, origin} of the lock. if n.debugLockMap[param].lockInfo == nil { n.debugLockMap[param].lockInfo = make(map[string]debugLockInfo) } // The status of the operation with the given operation ID is marked blocked till its gets unblocked from the lock. - n.debugLockMap[param].lockInfo[operationID] = newLockInfo + n.debugLockMap[param].lockInfo[opsID] = newLockInfo // Increment the Global lock counter. n.globalLockCounter++ // Increment the counter for number of blocked opertions, decrement it after the locking unblocks. @@ -212,7 +226,8 @@ func (n *nsLockMap) statusNoneToBlocked(param nsParam, lockOrigin, operationID s return nil } -// deleteLockInfoEntry - Deletes the lock state information for given pair. Called when nsLk.ref count is 0. +// deleteLockInfoEntry - Deletes the lock state information for given +// pair. Called when nsLk.ref count is 0. func (n *nsLockMap) deleteLockInfoEntryForVolumePath(param nsParam) error { // delete the lock info for the given operation. if _, found := n.debugLockMap[param]; !found { @@ -223,32 +238,36 @@ func (n *nsLockMap) deleteLockInfoEntryForVolumePath(param nsParam) error { return nil } -// deleteLockInfoEntry - Deletes the entry for given opsID in the lock state information of given pair. -// called when the nsLk ref count for the given pair is not 0. -func (n *nsLockMap) deleteLockInfoEntryForOps(param nsParam, operationID string) error { +// deleteLockInfoEntry - Deletes the entry for given opsID in the lock state information +// of given pair. Called when the nsLk ref count for the given +// pair is not 0. +func (n *nsLockMap) deleteLockInfoEntryForOps(param nsParam, opsID string) error { // delete the lock info for the given operation. infoMap, found := n.debugLockMap[param] if !found { return LockInfoVolPathMssing{param.volume, param.path} } - // the opertion finished holding the lock on the resource, remove the entry for the given operation with the operation ID. - if _, foundInfo := infoMap.lockInfo[operationID]; !foundInfo { + // The opertion finished holding the lock on the resource, remove + // the entry for the given operation with the operation ID. + _, foundInfo := infoMap.lockInfo[opsID] + if !foundInfo { // Unlock request with invalid opertion ID not accepted. - return LockInfoOpsIDNotFound{param.volume, param.path, operationID} + return LockInfoOpsIDNotFound{param.volume, param.path, opsID} } - // decrease the global running and lock reference counter. + // Decrease the global running and lock reference counter. n.runningLockCounter-- n.globalLockCounter-- - // decrease the lock referee counter for the lock info for given pair. - // decrease the running operation number. Its assumed that the operation is over once an attempt to release the lock is made. + // Decrease the lock referee counter for the lock info for given pair. + // Decrease the running operation number. Its assumed that the operation is over + // once an attempt to release the lock is made. infoMap.running-- - // decrease the total reference count of locks jeld on pair. + // Decrease the total reference count of locks jeld on pair. infoMap.ref-- - delete(infoMap.lockInfo, operationID) + delete(infoMap.lockInfo, opsID) return nil } -// return randomly generated string ID +// Return randomly generated string ID func getOpsID() string { return string(generateRequestID()) } diff --git a/cmd/lock-instrument_test.go b/cmd/lock-instrument_test.go index ca102ce7c..10afec1e0 100644 --- a/cmd/lock-instrument_test.go +++ b/cmd/lock-instrument_test.go @@ -29,95 +29,94 @@ type lockStateCase struct { readLock bool // lock type. setBlocked bool // initialize the initial state to blocked. expectedErr error - // expected global lock stats. - expectedLockStatus string // Status of the lock Blocked/Running. + // Expected global lock stats. + expectedLockStatus statusType // Status of the lock Blocked/Running. expectedGlobalLockCount int // Total number of locks held across the system, includes blocked + held locks. expectedBlockedLockCount int // Total blocked lock across the system. expectedRunningLockCount int // Total successfully held locks (non-blocking). - // expected lock statu for given pair. + // Expected lock status for given pair. expectedVolPathLockCount int // Total locks held for given pair, includes blocked locks. expectedVolPathRunningCount int // Total succcesfully held locks for given pair. expectedVolPathBlockCount int // Total locks blocked on the given pair. } // Used for validating the Lock info obtaining from contol RPC end point for obtaining lock related info. -func verifyRPCLockInfoResponse(l lockStateCase, rpcLockInfoResponse SystemLockState, t TestErrHandler, testNum int) { - // Assert the total number of locks (locked + acquired) in the system. - if rpcLockInfoResponse.TotalLocks != int64(l.expectedGlobalLockCount) { - t.Fatalf("Test %d: Expected the global lock counter to be %v, but got %v", testNum, int64(l.expectedGlobalLockCount), - rpcLockInfoResponse.TotalLocks) - } +func verifyRPCLockInfoResponse(l lockStateCase, rpcLockInfoMap map[string]*SystemLockState, t TestErrHandler, testNum int) { + for _, rpcLockInfoResponse := range rpcLockInfoMap { + // Assert the total number of locks (locked + acquired) in the system. + if rpcLockInfoResponse.TotalLocks != int64(l.expectedGlobalLockCount) { + t.Fatalf("Test %d: Expected the global lock counter to be %v, but got %v", testNum, int64(l.expectedGlobalLockCount), + rpcLockInfoResponse.TotalLocks) + } - // verify the count for total blocked locks. - if rpcLockInfoResponse.TotalBlockedLocks != int64(l.expectedBlockedLockCount) { - t.Fatalf("Test %d: Expected the total blocked lock counter to be %v, but got %v", testNum, int64(l.expectedBlockedLockCount), - rpcLockInfoResponse.TotalBlockedLocks) - } + // verify the count for total blocked locks. + if rpcLockInfoResponse.TotalBlockedLocks != int64(l.expectedBlockedLockCount) { + t.Fatalf("Test %d: Expected the total blocked lock counter to be %v, but got %v", testNum, int64(l.expectedBlockedLockCount), + rpcLockInfoResponse.TotalBlockedLocks) + } - // verify the count for total running locks. - if rpcLockInfoResponse.TotalAcquiredLocks != int64(l.expectedRunningLockCount) { - t.Fatalf("Test %d: Expected the total running lock counter to be %v, but got %v", testNum, int64(l.expectedRunningLockCount), - rpcLockInfoResponse.TotalAcquiredLocks) - } - - for _, locksInfoPerObject := range rpcLockInfoResponse.LocksInfoPerObject { - // See whether the entry for the exists in the RPC response. - if locksInfoPerObject.Bucket == l.volume && locksInfoPerObject.Object == l.path { - // Assert the total number of locks (blocked + acquired) for the given pair. - if locksInfoPerObject.LocksOnObject != int64(l.expectedVolPathLockCount) { - t.Errorf("Test %d: Expected the total lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum, - l.volume, l.path, int64(l.expectedVolPathLockCount), locksInfoPerObject.LocksOnObject) - } - // Assert the total number of acquired locks for the given pair. - if locksInfoPerObject.LocksAcquiredOnObject != int64(l.expectedVolPathRunningCount) { - t.Errorf("Test %d: Expected the acquired lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum, - l.volume, l.path, int64(l.expectedVolPathRunningCount), locksInfoPerObject.LocksAcquiredOnObject) - } - // Assert the total number of blocked locks for the given pair. - if locksInfoPerObject.TotalBlockedLocks != int64(l.expectedVolPathBlockCount) { - t.Errorf("Test %d: Expected the blocked lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum, - l.volume, l.path, int64(l.expectedVolPathBlockCount), locksInfoPerObject.TotalBlockedLocks) - } - // Flag to mark whether there's an entry in the RPC lock info response for given opsID. - var opsIDfound bool - for _, opsLockState := range locksInfoPerObject.LockDetailsOnObject { - // first check whether the entry for the given operation ID exists. - if opsLockState.OperationID == l.opsID { - opsIDfound = true - // asserting the type of lock (RLock/WLock) from the RPC lock info response. - if l.readLock { - if opsLockState.LockType != debugRLockStr { - t.Errorf("Test case %d: Expected the lock type to be \"%s\"", testNum, debugRLockStr) + // verify the count for total running locks. + if rpcLockInfoResponse.TotalAcquiredLocks != int64(l.expectedRunningLockCount) { + t.Fatalf("Test %d: Expected the total running lock counter to be %v, but got %v", testNum, int64(l.expectedRunningLockCount), + rpcLockInfoResponse.TotalAcquiredLocks) + } + + for _, locksInfoPerObject := range rpcLockInfoResponse.LocksInfoPerObject { + // See whether the entry for the exists in the RPC response. + if locksInfoPerObject.Bucket == l.volume && locksInfoPerObject.Object == l.path { + // Assert the total number of locks (blocked + acquired) for the given pair. + if locksInfoPerObject.LocksOnObject != int64(l.expectedVolPathLockCount) { + t.Errorf("Test %d: Expected the total lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum, + l.volume, l.path, int64(l.expectedVolPathLockCount), locksInfoPerObject.LocksOnObject) + } + // Assert the total number of acquired locks for the given pair. + if locksInfoPerObject.LocksAcquiredOnObject != int64(l.expectedVolPathRunningCount) { + t.Errorf("Test %d: Expected the acquired lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum, + l.volume, l.path, int64(l.expectedVolPathRunningCount), locksInfoPerObject.LocksAcquiredOnObject) + } + // Assert the total number of blocked locks for the given pair. + if locksInfoPerObject.TotalBlockedLocks != int64(l.expectedVolPathBlockCount) { + t.Errorf("Test %d: Expected the blocked lock count for bucket: \"%s\", object: \"%s\" to be %v, but got %v", testNum, + l.volume, l.path, int64(l.expectedVolPathBlockCount), locksInfoPerObject.TotalBlockedLocks) + } + // Flag to mark whether there's an entry in the RPC lock info response for given opsID. + var opsIDfound bool + for _, opsLockState := range locksInfoPerObject.LockDetailsOnObject { + // first check whether the entry for the given operation ID exists. + if opsLockState.OperationID == l.opsID { + opsIDfound = true + // asserting the type of lock (RLock/WLock) from the RPC lock info response. + if l.readLock { + if opsLockState.LockType != debugRLockStr { + t.Errorf("Test case %d: Expected the lock type to be \"%s\"", testNum, debugRLockStr) + } + } else { + if opsLockState.LockType != debugWLockStr { + t.Errorf("Test case %d: Expected the lock type to be \"%s\"", testNum, debugWLockStr) + } } - } else { - if opsLockState.LockType != debugWLockStr { - t.Errorf("Test case %d: Expected the lock type to be \"%s\"", testNum, debugWLockStr) + + if opsLockState.Status != l.expectedLockStatus { + t.Errorf("Test case %d: Expected the status of the operation to be \"%s\", got \"%s\"", testNum, l.expectedLockStatus, opsLockState.Status) } - } - if opsLockState.Status != l.expectedLockStatus { - t.Errorf("Test case %d: Expected the status of the operation to be \"%s\", got \"%s\"", testNum, l.expectedLockStatus, opsLockState.Status) + // all check satisfied, return here. + // Any mismatch in the earlier checks would have ended the tests due to `Fatalf`, + // control reaching here implies that all checks are satisfied. + return } - - // if opsLockState.LockOrigin != l.lockOrigin { - // t.Fatalf("Test case %d: Expected the origin of the lock to be \"%s\", got \"%s\"", testNum, opsLockState.LockOrigin, l.lockOrigin) - // } - // all check satisfied, return here. - // Any mismatch in the earlier checks would have ended the tests due to `Fatalf`, - // control reaching here implies that all checks are satisfied. - return } - } - // opsID not found. - // No entry for an operation with given operation ID exists. - if !opsIDfound { - t.Fatalf("Test case %d: Entry for OpsId: \"%s\" not found in : \"%s\", : \"%s\" doesn't exist in the RPC response", testNum, l.opsID, l.volume, l.path) + // opsID not found. + // No entry for an operation with given operation ID exists. + if !opsIDfound { + t.Fatalf("Test case %d: Entry for OpsId: \"%s\" not found in : \"%s\", : \"%s\" doesn't exist in the RPC response", testNum, l.opsID, l.volume, l.path) + } } } + // No entry exists for given pair in the RPC response. + t.Errorf("Test case %d: Entry for : \"%s\", : \"%s\" doesn't exist in the RPC response", testNum, l.volume, l.path) } - // No entry exists for given pair in the RPC response. - t.Errorf("Test case %d: Entry for : \"%s\", : \"%s\" doesn't exist in the RPC response", testNum, l.volume, l.path) } // Asserts the lock counter from the global nsMutex inmemory lock with the expected one. @@ -142,7 +141,7 @@ func verifyGlobalLockStats(l lockStateCase, t *testing.T, testNum int) { nsMutex.lockMapMutex.Unlock() // Verifying again with the JSON response of the lock info. // Verifying the lock stats. - sysLockState, err := generateSystemLockResponse() + sysLockState, err := getSystemLockState() if err != nil { t.Fatalf("Obtaining lock info failed with %s", err) @@ -197,11 +196,11 @@ func verifyLockState(l lockStateCase, t *testing.T, testNum int) { if lockInfo, ok := debugLockMap.lockInfo[l.opsID]; ok { // Validating the lock type filed in the debug lock information. if l.readLock { - if lockInfo.lockType != debugRLockStr { + if lockInfo.lType != debugRLockStr { t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", testNum, debugRLockStr) } } else { - if lockInfo.lockType != debugWLockStr { + if lockInfo.lType != debugWLockStr { t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", testNum, debugWLockStr) } } @@ -251,8 +250,8 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { path string lockOrigin string opsID string - readLock bool // lock type. - setBlocked bool // initialize the initial state to blocked. + readLock bool // Read lock type. + setBlocked bool // Initialize the initial state to blocked. expectedErr error }{ // Test case - 1. @@ -413,11 +412,11 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { if lockInfo, ok := debugLockMap.lockInfo[testCase.opsID]; ok { // Validating the lock type filed in the debug lock information. if testCase.readLock { - if lockInfo.lockType != debugRLockStr { + if lockInfo.lType != debugRLockStr { t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", i+1, debugRLockStr) } } else { - if lockInfo.lockType != debugWLockStr { + if lockInfo.lType != debugWLockStr { t.Errorf("Test case %d: Expected the lock type in the lock debug info to be \"%s\"", i+1, debugWLockStr) } } @@ -427,7 +426,7 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) { t.Errorf("Test %d: Expected the lock origin info to be \"%s\", but got \"%s\"", i+1, testCase.lockOrigin, lockInfo.lockOrigin) } // validating the status of the lock. - if lockInfo.status != "Running" { + if lockInfo.status != runningStatus { t.Errorf("Test %d: Expected the status of the lock to be \"%s\", but got \"%s\"", i+1, "Running", lockInfo.status) } } else { @@ -457,7 +456,7 @@ func TestNsLockMapStatusNoneToBlocked(t *testing.T) { readLock: true, // expected metrics. expectedErr: nil, - expectedLockStatus: "Blocked", + expectedLockStatus: blockedStatus, expectedGlobalLockCount: 1, expectedRunningLockCount: 0, @@ -479,7 +478,7 @@ func TestNsLockMapStatusNoneToBlocked(t *testing.T) { readLock: false, // expected metrics. expectedErr: nil, - expectedLockStatus: "Blocked", + expectedLockStatus: blockedStatus, expectedGlobalLockCount: 2, expectedRunningLockCount: 0, diff --git a/cmd/lock-rpc-server-common.go b/cmd/lock-rpc-server-common.go index 007888cf7..eee2f6246 100644 --- a/cmd/lock-rpc-server-common.go +++ b/cmd/lock-rpc-server-common.go @@ -59,8 +59,9 @@ func (l *lockServer) removeEntry(name, uid string, lri *[]lockRequesterInfo) boo // Validate lock args. func (l *lockServer) validateLockArgs(args *LockArgs) error { - if !l.timestamp.Equal(args.Timestamp) { - return errInvalidTimestamp + curTime := time.Now().UTC() + if curTime.Sub(args.Timestamp) > globalMaxSkewTime { + return errServerTimeMismatch } if !isRPCTokenValid(args.Token) { return errInvalidToken diff --git a/cmd/lock-rpc-server-common_test.go b/cmd/lock-rpc-server-common_test.go index db19d479c..eddc3ee8d 100644 --- a/cmd/lock-rpc-server-common_test.go +++ b/cmd/lock-rpc-server-common_test.go @@ -24,8 +24,7 @@ import ( // Test function to remove lock entries from map only in case they still exist based on name & uid combination func TestLockRpcServerRemoveEntryIfExists(t *testing.T) { - - testPath, locker, _, _ := createLockTestServer(t) + testPath, locker, _ := createLockTestServer(t) defer removeAll(testPath) lri := lockRequesterInfo{ @@ -62,8 +61,7 @@ func TestLockRpcServerRemoveEntryIfExists(t *testing.T) { // Test function to remove lock entries from map based on name & uid combination func TestLockRpcServerRemoveEntry(t *testing.T) { - - testPath, locker, _, _ := createLockTestServer(t) + testPath, locker, _ := createLockTestServer(t) defer removeAll(testPath) lockRequesterInfo1 := lockRequesterInfo{ diff --git a/cmd/lock-rpc-server.go b/cmd/lock-rpc-server.go index 1ebabe958..38c94f4f1 100644 --- a/cmd/lock-rpc-server.go +++ b/cmd/lock-rpc-server.go @@ -26,6 +26,7 @@ import ( "time" router "github.com/gorilla/mux" + "github.com/minio/minio-go/pkg/set" ) const lockRPCPath = "/minio/lock" @@ -73,8 +74,6 @@ type lockServer struct { rpcPath string mutex sync.Mutex lockMap map[string][]lockRequesterInfo - // Timestamp set at the time of initialization. Resets naturally on minio server restart. - timestamp time.Time } // Register distributed NS lock handlers. @@ -90,12 +89,14 @@ func newLockServers(serverConfig serverCmdConfig) (lockServers []*lockServer) { ignoredExports := serverConfig.ignoredDisks // Save ignored disks in a map - skipDisks := make(map[string]bool) - for _, ignoredExport := range ignoredExports { - skipDisks[ignoredExport] = true + // Initialize ignored disks in a new set. + ignoredSet := set.NewStringSet() + if len(ignoredExports) > 0 { + ignoredSet = set.CreateStringSet(ignoredExports...) } for _, export := range exports { - if skipDisks[export] { + if ignoredSet.Contains(export) { + // Ignore initializing ignored export. continue } // Not local storage move to the next node. @@ -107,10 +108,9 @@ func newLockServers(serverConfig serverCmdConfig) (lockServers []*lockServer) { } // Create handler for lock RPCs locker := &lockServer{ - rpcPath: export, - mutex: sync.Mutex{}, - lockMap: make(map[string][]lockRequesterInfo), - timestamp: time.Now().UTC(), + rpcPath: export, + mutex: sync.Mutex{}, + lockMap: make(map[string][]lockRequesterInfo), } // Start loop for stale lock maintenance @@ -153,7 +153,7 @@ func (l *lockServer) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) erro return err } reply.Token = token - reply.Timestamp = l.timestamp + reply.Timestamp = time.Now().UTC() reply.ServerVersion = Version return nil } diff --git a/cmd/lock-rpc-server_test.go b/cmd/lock-rpc-server_test.go index 85bd34cdb..11b925045 100644 --- a/cmd/lock-rpc-server_test.go +++ b/cmd/lock-rpc-server_test.go @@ -17,6 +17,7 @@ package cmd import ( + "runtime" "sync" "testing" "time" @@ -24,7 +25,6 @@ import ( // Helper function to test equality of locks (without taking timing info into account) func testLockEquality(lriLeft, lriRight []lockRequesterInfo) bool { - if len(lriLeft) != len(lriRight) { return false } @@ -41,8 +41,7 @@ func testLockEquality(lriLeft, lriRight []lockRequesterInfo) bool { } // Helper function to create a lock server for testing -func createLockTestServer(t *testing.T) (string, *lockServer, string, time.Time) { - +func createLockTestServer(t *testing.T) (string, *lockServer, string) { testPath, err := newTestConfig("us-east-1") if err != nil { t.Fatalf("unable initialize config file, %s", err) @@ -63,21 +62,20 @@ func createLockTestServer(t *testing.T) (string, *lockServer, string, time.Time) t.Fatalf("unable for JWT to generate token, %s", err) } - timestamp := time.Now().UTC() locker := &lockServer{ - rpcPath: "rpc-path", - mutex: sync.Mutex{}, - lockMap: make(map[string][]lockRequesterInfo), - timestamp: timestamp, + rpcPath: "rpc-path", + mutex: sync.Mutex{}, + lockMap: make(map[string][]lockRequesterInfo), } - return testPath, locker, token, timestamp + return testPath, locker, token } // Test Lock functionality func TestLockRpcServerLock(t *testing.T) { - testPath, locker, token, timestamp := createLockTestServer(t) + timestamp := time.Now().UTC() + testPath, locker, token := createLockTestServer(t) defer removeAll(testPath) la := LockArgs{ @@ -100,7 +98,7 @@ func TestLockRpcServerLock(t *testing.T) { } else { gotLri, _ := locker.lockMap["name"] expectedLri := []lockRequesterInfo{ - lockRequesterInfo{ + { writer: true, node: "node", rpcPath: "rpc-path", @@ -135,7 +133,8 @@ func TestLockRpcServerLock(t *testing.T) { // Test Unlock functionality func TestLockRpcServerUnlock(t *testing.T) { - testPath, locker, token, timestamp := createLockTestServer(t) + timestamp := time.Now().UTC() + testPath, locker, token := createLockTestServer(t) defer removeAll(testPath) la := LockArgs{ @@ -182,7 +181,8 @@ func TestLockRpcServerUnlock(t *testing.T) { // Test RLock functionality func TestLockRpcServerRLock(t *testing.T) { - testPath, locker, token, timestamp := createLockTestServer(t) + timestamp := time.Now().UTC() + testPath, locker, token := createLockTestServer(t) defer removeAll(testPath) la := LockArgs{ @@ -205,7 +205,7 @@ func TestLockRpcServerRLock(t *testing.T) { } else { gotLri, _ := locker.lockMap["name"] expectedLri := []lockRequesterInfo{ - lockRequesterInfo{ + { writer: false, node: "node", rpcPath: "rpc-path", @@ -240,7 +240,8 @@ func TestLockRpcServerRLock(t *testing.T) { // Test RUnlock functionality func TestLockRpcServerRUnlock(t *testing.T) { - testPath, locker, token, timestamp := createLockTestServer(t) + timestamp := time.Now().UTC() + testPath, locker, token := createLockTestServer(t) defer removeAll(testPath) la := LockArgs{ @@ -294,7 +295,7 @@ func TestLockRpcServerRUnlock(t *testing.T) { } else { gotLri, _ := locker.lockMap["name"] expectedLri := []lockRequesterInfo{ - lockRequesterInfo{ + { writer: false, node: "node", rpcPath: "rpc-path", @@ -327,8 +328,8 @@ func TestLockRpcServerRUnlock(t *testing.T) { // Test Expired functionality func TestLockRpcServerExpired(t *testing.T) { - - testPath, locker, token, timestamp := createLockTestServer(t) + timestamp := time.Now().UTC() + testPath, locker, token := createLockTestServer(t) defer removeAll(testPath) la := LockArgs{ @@ -369,3 +370,52 @@ func TestLockRpcServerExpired(t *testing.T) { } } } + +// Test initialization of lock servers. +func TestLockServers(t *testing.T) { + if runtime.GOOS == "windows" { + return + } + testCases := []struct { + srvCmdConfig serverCmdConfig + totalLockServers int + }{ + // Test - 1 one lock server initialized. + { + srvCmdConfig: serverCmdConfig{ + isDistXL: true, + disks: []string{ + "localhost:/mnt/disk1", + "1.1.1.2:/mnt/disk2", + "1.1.2.1:/mnt/disk3", + "1.1.2.2:/mnt/disk4", + }, + }, + totalLockServers: 1, + }, + // Test - 2 two servers possible, 1 ignored. + { + srvCmdConfig: serverCmdConfig{ + isDistXL: true, + disks: []string{ + "localhost:/mnt/disk1", + "localhost:/mnt/disk2", + "1.1.2.1:/mnt/disk3", + "1.1.2.2:/mnt/disk4", + }, + ignoredDisks: []string{ + "localhost:/mnt/disk2", + }, + }, + totalLockServers: 1, + }, + } + + // Validates lock server initialization. + for i, testCase := range testCases { + lockServers := newLockServers(testCase.srvCmdConfig) + if len(lockServers) != testCase.totalLockServers { + t.Fatalf("Test %d: Expected total %d, got %d", i+1, testCase.totalLockServers, len(lockServers)) + } + } +} diff --git a/cmd/lockinfo-handlers.go b/cmd/lockinfo-handlers.go new file mode 100644 index 000000000..5c6ac1e63 --- /dev/null +++ b/cmd/lockinfo-handlers.go @@ -0,0 +1,169 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "sync" + "time" +) + +// SystemLockState - Structure to fill the lock state of entire object storage. +// That is the total locks held, total calls blocked on locks and state of all the locks for the entire system. +type SystemLockState struct { + TotalLocks int64 `json:"totalLocks"` + // Count of operations which are blocked waiting for the lock to + // be released. + TotalBlockedLocks int64 `json:"totalBlockedLocks"` + // Count of operations which has successfully acquired the lock but + // hasn't unlocked yet( operation in progress). + TotalAcquiredLocks int64 `json:"totalAcquiredLocks"` + LocksInfoPerObject []VolumeLockInfo `json:"locksInfoPerObject"` +} + +// VolumeLockInfo - Structure to contain the lock state info for volume, path pair. +type VolumeLockInfo struct { + Bucket string `json:"bucket"` + Object string `json:"object"` + // All locks blocked + running for given pair. + LocksOnObject int64 `json:"locksOnObject"` + // Count of operations which has successfully acquired the lock + // but hasn't unlocked yet( operation in progress). + LocksAcquiredOnObject int64 `json:"locksAcquiredOnObject"` + // Count of operations which are blocked waiting for the lock + // to be released. + TotalBlockedLocks int64 `json:"locksBlockedOnObject"` + // State information containing state of the locks for all operations + // on given pair. + LockDetailsOnObject []OpsLockState `json:"lockDetailsOnObject"` +} + +// OpsLockState - structure to fill in state information of the lock. +// structure to fill in status information for each operation with given operation ID. +type OpsLockState struct { + OperationID string `json:"opsID"` // String containing operation ID. + LockOrigin string `json:"lockOrigin"` // Operation type (GetObject, PutObject...) + LockType lockType `json:"lockType"` // Lock type (RLock, WLock) + Status statusType `json:"status"` // Status can be Running/Ready/Blocked. + Since time.Time `json:"statusSince"` // Time when the lock was initially held. + Duration time.Duration `json:"statusDuration"` // Duration since the lock was held. +} + +// Read entire state of the locks in the system and return. +func getSystemLockState() (SystemLockState, error) { + nsMutex.lockMapMutex.Lock() + defer nsMutex.lockMapMutex.Unlock() + + lockState := SystemLockState{} + + lockState.TotalBlockedLocks = nsMutex.blockedCounter + lockState.TotalLocks = nsMutex.globalLockCounter + lockState.TotalAcquiredLocks = nsMutex.runningLockCounter + + for param, debugLock := range nsMutex.debugLockMap { + volLockInfo := VolumeLockInfo{} + volLockInfo.Bucket = param.volume + volLockInfo.Object = param.path + volLockInfo.LocksOnObject = debugLock.ref + volLockInfo.TotalBlockedLocks = debugLock.blocked + volLockInfo.LocksAcquiredOnObject = debugLock.running + for opsID, lockInfo := range debugLock.lockInfo { + volLockInfo.LockDetailsOnObject = append(volLockInfo.LockDetailsOnObject, OpsLockState{ + OperationID: opsID, + LockOrigin: lockInfo.lockOrigin, + LockType: lockInfo.lType, + Status: lockInfo.status, + Since: lockInfo.since, + Duration: time.Now().UTC().Sub(lockInfo.since), + }) + } + lockState.LocksInfoPerObject = append(lockState.LocksInfoPerObject, volLockInfo) + } + return lockState, nil +} + +// Remote procedure call, calls LockInfo handler with given input args. +func (c *controlAPIHandlers) remoteLockInfoCall(args *GenericArgs, replies []SystemLockState) error { + var wg sync.WaitGroup + var errs = make([]error, len(c.RemoteControls)) + // Send remote call to all neighboring peers to restart minio servers. + for index, clnt := range c.RemoteControls { + wg.Add(1) + go func(index int, client *AuthRPCClient) { + defer wg.Done() + errs[index] = client.Call("Control.RemoteLockInfo", args, &replies[index]) + errorIf(errs[index], "Unable to initiate control lockInfo request to remote node %s", client.Node()) + }(index, clnt) + } + wg.Wait() + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} + +// RemoteLockInfo - RPC control handler for `minio control lock`, used internally by LockInfo to +// make calls to neighboring peers. +func (c *controlAPIHandlers) RemoteLockInfo(args *GenericArgs, reply *SystemLockState) error { + if !isRPCTokenValid(args.Token) { + return errInvalidToken + } + // Obtain the lock state information of the local system. + lockState, err := getSystemLockState() + // In case of error, return err to the RPC client. + if err != nil { + return err + } + *reply = lockState + return nil +} + +// LockInfo - RPC control handler for `minio control lock`. Returns the info of the locks held in the cluster. +func (c *controlAPIHandlers) LockInfo(args *GenericArgs, reply *map[string]SystemLockState) error { + if !isRPCTokenValid(args.Token) { + return errInvalidToken + } + var replies = make([]SystemLockState, len(c.RemoteControls)) + if args.Remote { + // Fetch lock states from all the remote peers. + args.Remote = false + if err := c.remoteLockInfoCall(args, replies); err != nil { + return err + } + } + rep := make(map[string]SystemLockState) + // The response containing the lock info. + for index, client := range c.RemoteControls { + rep[client.Node()] = replies[index] + } + // Obtain the lock state information of the local system. + lockState, err := getSystemLockState() + // In case of error, return err to the RPC client. + if err != nil { + return err + } + + // Save the local node lock state. + rep[c.LocalNode] = lockState + + // Set the reply. + *reply = rep + + // Success. + return nil +} diff --git a/cmd/logger.go b/cmd/logger.go index d2405807b..f1418b3d5 100644 --- a/cmd/logger.go +++ b/cmd/logger.go @@ -19,6 +19,9 @@ package cmd import ( "bufio" "bytes" + "fmt" + "path/filepath" + "runtime" "runtime/debug" "strings" @@ -42,6 +45,20 @@ type logger struct { // Add new loggers here. } +// Function takes input with the results from runtime.Caller(1). Depending on the boolean. +// This function can either returned a shotFile form or a longFile form. +func funcFromPC(pc uintptr, file string, line int, shortFile bool) string { + var fn, name string + if shortFile { + fn = strings.Replace(file, filepath.ToSlash(GOPATH)+"/src/github.com/minio/minio/cmd/", "", -1) + name = strings.Replace(runtime.FuncForPC(pc).Name(), "github.com/minio/minio/cmd.", "", -1) + } else { + fn = strings.Replace(file, filepath.ToSlash(GOPATH)+"/src/", "", -1) + name = strings.Replace(runtime.FuncForPC(pc).Name(), "github.com/minio/minio/cmd.", "", -1) + } + return fmt.Sprintf("%s [%s:%d]", name, fn, line) +} + // stackInfo returns printable stack trace. func stackInfo() string { // Convert stack-trace bytes to io.Reader. @@ -56,7 +73,7 @@ func stackInfo() string { stackBuf.ReadFrom(rawStack) // Strip GOPATH of the build system and return. - return strings.Replace(stackBuf.String(), GOPATH+"/src/", "", -1) + return strings.Replace(stackBuf.String(), filepath.ToSlash(GOPATH)+"/src/", "", -1) } // errorIf synonymous with fatalIf but doesn't exit on error != nil diff --git a/cmd/logger_test.go b/cmd/logger_test.go index 332ad6a56..2f69a740f 100644 --- a/cmd/logger_test.go +++ b/cmd/logger_test.go @@ -20,17 +20,36 @@ import ( "bytes" "encoding/json" "errors" + "os" + "path/filepath" + "runtime" + "testing" "github.com/Sirupsen/logrus" - - . "gopkg.in/check.v1" ) -type LoggerSuite struct{} - -var _ = Suite(&LoggerSuite{}) +// Tests func obtained from process stack counter. +func TestFuncToPC(t *testing.T) { + GOPATH = filepath.ToSlash(os.Getenv("GOPATH")) + pc, file, line, success := runtime.Caller(0) + if !success { + file = "???" + line = 0 + } + shortFile := true // We are only interested in short file form. + cLocation := funcFromPC(pc, file, line, shortFile) + if cLocation != "TestFuncToPC [logger_test.go:34]" { + t.Fatal("Unexpected caller location found", cLocation) + } + shortFile = false // We are not interested in short file form. + cLocation = funcFromPC(pc, file, line, shortFile) + if cLocation != "TestFuncToPC [github.com/minio/minio/cmd/logger_test.go:34]" { + t.Fatal("Unexpected caller location found", cLocation) + } +} -func (s *LoggerSuite) TestLogger(c *C) { +// Tests error logger. +func TestLogger(t *testing.T) { var buffer bytes.Buffer var fields logrus.Fields log.Out = &buffer @@ -38,10 +57,17 @@ func (s *LoggerSuite) TestLogger(c *C) { errorIf(errors.New("Fake error"), "Failed with error.") err := json.Unmarshal(buffer.Bytes(), &fields) - c.Assert(err, IsNil) - c.Assert(fields["level"], Equals, "error") - + if err != nil { + t.Fatal(err) + } + if fields["level"] != "error" { + t.Fatalf("Expected error, got %s", fields["level"]) + } msg, ok := fields["cause"] - c.Assert(ok, Equals, true) - c.Assert(msg, Equals, "Fake error") + if !ok { + t.Fatal("Cause field missing") + } + if msg != "Fake error" { + t.Fatal("Cause field has unexpected message", msg) + } } diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go index 613d05802..4b379f3af 100644 --- a/cmd/namespace-lock.go +++ b/cmd/namespace-lock.go @@ -18,7 +18,6 @@ package cmd import ( "errors" - "fmt" pathutil "path" "runtime" "strconv" @@ -66,8 +65,7 @@ func initNSLock(isDist bool) { lockMap: make(map[nsParam]*nsLock), } - // Initialize nsLockMap with entry for instrumentation - // information. + // Initialize nsLockMap with entry for instrumentation information. // Entries of -> stateInfo of locks nsMutex.debugLockMap = make(map[nsParam]*debugLockInfoPerVolumePath) } @@ -94,7 +92,7 @@ type nsLock struct { // nsLockMap - namespace lock map, provides primitives to Lock, // Unlock, RLock and RUnlock. type nsLockMap struct { - // lock counter used for lock debugging. + // Lock counter used for lock debugging. globalLockCounter int64 // Total locks held. blockedCounter int64 // Total operations blocked waiting for locks. runningLockCounter int64 // Total locks held but not released yet. @@ -148,8 +146,7 @@ func (n *nsLockMap) lock(volume, path string, lockOrigin, opsID string, readLock // Changing the status of the operation from blocked to // running. change the state of the lock to be running (from - // blocked) for the given pair of and - // . + // blocked) for the given pair of and . if err := n.statusBlockedToRunning(param, lockOrigin, opsID, readLock); err != nil { errorIf(err, "Failed to set the lock state to running.") } @@ -199,24 +196,22 @@ func (n *nsLockMap) unlock(volume, path, opsID string, readLock bool) { // Lock - locks the given resource for writes, using a previously // allocated name space lock or initializing a new one. func (n *nsLockMap) Lock(volume, path, opsID string) { - var lockOrigin string + readLock := false // This is a write lock. // The caller information of the lock held has been obtained // here before calling any other function. // Fetching the package, function name and the line number of - // the caller from the runtime. here is an example - // https://play.golang.org/p/perrmNRI9_ . - pc, fn, line, success := runtime.Caller(1) + // the caller from the runtime. + pc, file, line, success := runtime.Caller(1) if !success { - errorIf(errors.New("Couldn't get caller info."), - "Fetching caller info form runtime failed.") + file = "???" + line = 0 } - lockOrigin = fmt.Sprintf("[lock held] in %s[%s:%d]", - runtime.FuncForPC(pc).Name(), fn, line) + shortFile := true // We are only interested in short file form. + lockLocation := funcFromPC(pc, file, line, shortFile) - readLock := false - n.lock(volume, path, lockOrigin, opsID, readLock) + n.lock(volume, path, lockLocation, opsID, readLock) } // Unlock - unlocks any previously acquired write locks. @@ -227,24 +222,22 @@ func (n *nsLockMap) Unlock(volume, path, opsID string) { // RLock - locks any previously acquired read locks. func (n *nsLockMap) RLock(volume, path, opsID string) { - var lockOrigin string readLock := true // The caller information of the lock held has been obtained // here before calling any other function. // Fetching the package, function name and the line number of - // the caller from the runtime. Here is an example - // https://play.golang.org/p/perrmNRI9_ . - pc, fn, line, success := runtime.Caller(1) + // the caller from the runtime. + pc, file, line, success := runtime.Caller(1) if !success { - errorIf(errors.New("Couldn't get caller info."), - "Fetching caller info form runtime failed.") + file = "???" + line = 0 } - lockOrigin = fmt.Sprintf("[lock held] in %s[%s:%d]", - runtime.FuncForPC(pc).Name(), fn, line) + shortFile := true // We are only interested in short file form. + lockLocation := funcFromPC(pc, file, line, shortFile) - n.lock(volume, path, lockOrigin, opsID, readLock) + n.lock(volume, path, lockLocation, opsID, readLock) } // RUnlock - unlocks any previously acquired read locks. diff --git a/cmd/routers.go b/cmd/routers.go index ebf7bda4e..33c0be5ce 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -81,7 +81,7 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler { } // Register controller rpc router. - registerControllerRPCRouter(mux, srvCmdConfig) + registerControlRPCRouter(mux, srvCmdConfig) // set environmental variable MINIO_BROWSER=off to disable minio web browser. // By default minio web browser is enabled. diff --git a/cmd/storage-rpc-server.go b/cmd/storage-rpc-server.go index d92780ac4..13ad50db4 100644 --- a/cmd/storage-rpc-server.go +++ b/cmd/storage-rpc-server.go @@ -53,7 +53,7 @@ func (s *storageServer) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) e return err } reply.Token = token - reply.Timestamp = s.timestamp + reply.Timestamp = time.Now().UTC() reply.ServerVersion = Version return nil } @@ -229,7 +229,6 @@ func newRPCServer(serverConfig serverCmdConfig) (servers []*storageServer, err e if len(ignoredExports) > 0 { ignoredSet = set.CreateStringSet(ignoredExports...) } - tstamp := time.Now().UTC() for _, export := range exports { if ignoredSet.Contains(export) { // Ignore initializing ignored export. @@ -249,9 +248,8 @@ func newRPCServer(serverConfig serverCmdConfig) (servers []*storageServer, err e export = export[idx+1:] } servers = append(servers, &storageServer{ - storage: storage, - path: export, - timestamp: tstamp, + storage: storage, + path: export, }) } } diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 653c2dddc..cbb151844 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -241,7 +241,7 @@ func StartTestStorageRPCServer(t TestErrHandler, instanceType string, diskN int) func initTestControlRPCEndPoint(srvCmdConfig serverCmdConfig) http.Handler { // Initialize router. muxRouter := router.NewRouter() - registerControllerRPCRouter(muxRouter, srvCmdConfig) + registerControlRPCRouter(muxRouter, srvCmdConfig) return muxRouter } diff --git a/cmd/typed-errors.go b/cmd/typed-errors.go index df85cc7da..497a5c5da 100644 --- a/cmd/typed-errors.go +++ b/cmd/typed-errors.go @@ -30,9 +30,6 @@ var errSignatureMismatch = errors.New("Signature does not match") // used when token used for authentication by the MinioBrowser has expired var errInvalidToken = errors.New("Invalid token") -// used when cached timestamp do not match with what client remembers. -var errInvalidTimestamp = errors.New("Timestamps don't match, server may have restarted.") - // If x-amz-content-sha256 header value mismatches with what we calculate. var errContentSHA256Mismatch = errors.New("Content checksum SHA256 mismatch")