From 07506358ff6fdee763506df7584f2680db752183 Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Sun, 21 Aug 2016 20:06:53 +0100 Subject: [PATCH] Refactor Heal RPC and add Shutdown RPC (#2488) --- cmd/bucket-notification-handlers.go | 2 +- cmd/control-heal-main.go | 120 ++++++++++++++++++ cmd/control-main.go | 116 +++-------------- cmd/control-shutdown-main.go | 68 ++++++++++ ...{rpc-control.go => controller-handlers.go} | 61 ++++----- cmd/controller-router.go | 42 ++++++ cmd/routers.go | 7 +- cmd/utils.go | 30 ++++- 8 files changed, 313 insertions(+), 133 deletions(-) create mode 100644 cmd/control-heal-main.go create mode 100644 cmd/control-shutdown-main.go rename cmd/{rpc-control.go => controller-handlers.go} (58%) create mode 100644 cmd/controller-router.go diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index 483e292e7..f1cbaf989 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -88,7 +88,7 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter, _, err := api.ObjectAPI.GetBucketInfo(bucket) if err != nil { - errorIf(err, "Unable to bucket info.") + errorIf(err, "Unable to find bucket info.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) return } diff --git a/cmd/control-heal-main.go b/cmd/control-heal-main.go new file mode 100644 index 000000000..c25a9fda6 --- /dev/null +++ b/cmd/control-heal-main.go @@ -0,0 +1,120 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "fmt" + "net/rpc" + "net/url" + "path" + "strings" + + "github.com/minio/cli" +) + +var healCmd = cli.Command{ + Name: "heal", + Usage: "To heal objects.", + Action: healControl, + CustomHelpTemplate: `NAME: + minio control {{.Name}} - {{.Usage}} + +USAGE: + minio control {{.Name}} + +EAMPLES: + 1. Heal an object. + $ minio control {{.Name}} http://localhost:9000/songs/classical/western/piano.mp3 + + 2. Heal all objects in a bucket recursively. + $ minio control {{.Name}} http://localhost:9000/songs + + 3. Heall all objects with a given prefix recursively. + $ minio control {{.Name}} http://localhost:9000/songs/classical/ +`, +} + +// "minio control heal" entry point. +func healControl(ctx *cli.Context) { + // Parse bucket and object from url.URL.Path + parseBucketObject := func(path string) (bucketName string, objectName string) { + splits := strings.SplitN(path, string(slashSeparator), 3) + switch len(splits) { + case 0, 1: + bucketName = "" + objectName = "" + case 2: + bucketName = splits[1] + objectName = "" + case 3: + bucketName = splits[1] + objectName = splits[2] + + } + return bucketName, objectName + } + + if len(ctx.Args()) != 1 { + cli.ShowCommandHelpAndExit(ctx, "heal", 1) + } + + parsedURL, err := url.Parse(ctx.Args()[0]) + fatalIf(err, "Unable to parse URL") + + bucketName, objectName := parseBucketObject(parsedURL.Path) + if bucketName == "" { + cli.ShowCommandHelpAndExit(ctx, "heal", 1) + } + + client, err := rpc.DialHTTPPath("tcp", parsedURL.Host, path.Join(reservedBucket, controlPath)) + fatalIf(err, "Unable to connect to %s", parsedURL.Host) + + // If object does not have trailing "/" then it's an object, hence heal it. + if objectName != "" && !strings.HasSuffix(objectName, slashSeparator) { + fmt.Printf("Healing : /%s/%s", bucketName, objectName) + args := &HealObjectArgs{bucketName, objectName} + reply := &HealObjectReply{} + err = client.Call("Control.HealObject", args, reply) + fatalIf(err, "RPC Control.HealObject call failed") + fmt.Println() + return + } + + // Recursively list and heal the objects. + prefix := objectName + marker := "" + for { + args := HealListArgs{bucketName, prefix, marker, "", 1000} + reply := &HealListReply{} + err = client.Call("Control.ListObjectsHeal", args, reply) + fatalIf(err, "RPC Heal.ListObjects call failed") + + // Heal the objects returned in the ListObjects reply. + for _, obj := range reply.Objects { + fmt.Printf("Healing : /%s/%s", bucketName, obj) + reply := &HealObjectReply{} + err = client.Call("Control.HealObject", HealObjectArgs{bucketName, obj}, reply) + fatalIf(err, "RPC Heal.HealObject call failed") + fmt.Println() + } + if !reply.IsTruncated { + // End of listing. + break + } + marker = reply.NextMarker + } +} diff --git a/cmd/control-main.go b/cmd/control-main.go index b0026b3d1..a0494c628 100644 --- a/cmd/control-main.go +++ b/cmd/control-main.go @@ -16,15 +16,7 @@ package cmd -import ( - "fmt" - "strings" - - "net/rpc" - "net/url" - - "github.com/minio/cli" -) +import "github.com/minio/cli" // "minio control" command. var controlCmd = cli.Command{ @@ -33,102 +25,28 @@ var controlCmd = cli.Command{ Action: mainControl, Subcommands: []cli.Command{ healCmd, + shutdownCmd, }, -} - -func mainControl(c *cli.Context) { - cli.ShowCommandHelp(c, "") -} - -var healCmd = cli.Command{ - Name: "heal", - Usage: "To heal objects.", - Action: healControl, CustomHelpTemplate: `NAME: - minio control {{.Name}} - {{.Usage}} + {{.Name}} - {{.Usage}} USAGE: - minio control {{.Name}} - -EAMPLES: - 1. Heal an object. - $ minio control {{.Name}} http://localhost:9000/songs/classical/western/piano.mp3 - - 2. Heal all objects in a bucket recursively. - $ minio control {{.Name}} http://localhost:9000/songs - - 3. Heall all objects with a given prefix recursively. - $ minio control {{.Name}} http://localhost:9000/songs/classical/ + {{.Name}} [FLAGS] COMMAND + +FLAGS: + {{range .Flags}}{{.}} + {{end}} +COMMANDS: + {{range .Commands}}{{join .Names ", "}}{{ "\t" }}{{.Usage}} + {{end}} `, } -// "minio control heal" entry point. -func healControl(c *cli.Context) { - // Parse bucket and object from url.URL.Path - parseBucketObject := func(path string) (bucketName string, objectName string) { - splits := strings.SplitN(path, string(slashSeparator), 3) - switch len(splits) { - case 0, 1: - bucketName = "" - objectName = "" - case 2: - bucketName = splits[1] - objectName = "" - case 3: - bucketName = splits[1] - objectName = splits[2] - - } - return bucketName, objectName - } - - if len(c.Args()) != 1 { - cli.ShowCommandHelpAndExit(c, "heal", 1) - } - - parsedURL, err := url.ParseRequestURI(c.Args()[0]) - fatalIf(err, "Unable to parse URL") - - bucketName, objectName := parseBucketObject(parsedURL.Path) - if bucketName == "" { - cli.ShowCommandHelpAndExit(c, "heal", 1) - } - - client, err := rpc.DialHTTPPath("tcp", parsedURL.Host, healPath) - fatalIf(err, "Unable to connect to %s", parsedURL.Host) - - // If object does not have trailing "/" then it's an object, hence heal it. - if objectName != "" && !strings.HasSuffix(objectName, slashSeparator) { - fmt.Printf("Healing : /%s/%s", bucketName, objectName) - args := &HealObjectArgs{bucketName, objectName} - reply := &HealObjectReply{} - err = client.Call("Heal.HealObject", args, reply) - fatalIf(err, "RPC Heal.HealObject call failed") - fmt.Println() - return - } - - // Recursively list and heal the objects. - prefix := objectName - marker := "" - for { - args := HealListArgs{bucketName, prefix, marker, "", 1000} - reply := &HealListReply{} - err = client.Call("Heal.ListObjects", args, reply) - fatalIf(err, "RPC Heal.ListObjects call failed") - - // Heal the objects returned in the ListObjects reply. - for _, obj := range reply.Objects { - fmt.Printf("Healing : /%s/%s", bucketName, obj) - reply := &HealObjectReply{} - err = client.Call("Heal.HealObject", HealObjectArgs{bucketName, obj}, reply) - fatalIf(err, "RPC Heal.HealObject call failed") - fmt.Println() - } - if !reply.IsTruncated { - // End of listing. - break - } - marker = reply.NextMarker +func mainControl(ctx *cli.Context) { + if ctx.Args().First() != "" { // command help. + cli.ShowCommandHelp(ctx, ctx.Args().First()) + } else { + // command with Subcommands is an App. + cli.ShowAppHelp(ctx) } } diff --git a/cmd/control-shutdown-main.go b/cmd/control-shutdown-main.go new file mode 100644 index 000000000..b24621d39 --- /dev/null +++ b/cmd/control-shutdown-main.go @@ -0,0 +1,68 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "net/rpc" + "net/url" + "path" + + "github.com/minio/cli" +) + +var shutdownCmd = cli.Command{ + Name: "shutdown", + Usage: "Shutdown or restart the server.", + Action: shutdownControl, + Flags: []cli.Flag{ + cli.BoolFlag{ + Name: "restart", + Usage: "Restart the server.", + }, + }, + CustomHelpTemplate: `NAME: + minio control {{.Name}} - {{.Usage}} + +USAGE: + minio control {{.Name}} http://localhost:9000/ + +EAMPLES: + 1. Shutdown the server: + $ minio control shutdown http://localhost:9000/ + + 2. Reboot the server: + $ minio control shutdown --restart http://localhost:9000/ +`, +} + +// "minio control shutdown" entry point. +func shutdownControl(c *cli.Context) { + if len(c.Args()) != 1 { + cli.ShowCommandHelpAndExit(c, "shutdown", 1) + } + + parsedURL, err := url.ParseRequestURI(c.Args()[0]) + fatalIf(err, "Unable to parse URL") + + client, err := rpc.DialHTTPPath("tcp", parsedURL.Host, path.Join(reservedBucket, controlPath)) + fatalIf(err, "Unable to connect to %s", parsedURL.Host) + + args := &ShutdownArgs{Reboot: c.Bool("restart")} + reply := &ShutdownReply{} + err = client.Call("Control.Shutdown", args, reply) + fatalIf(err, "RPC Control.Shutdown call failed") +} diff --git a/cmd/rpc-control.go b/cmd/controller-handlers.go similarity index 58% rename from cmd/rpc-control.go rename to cmd/controller-handlers.go index 8e654b648..e6c0fa9c3 100644 --- a/cmd/rpc-control.go +++ b/cmd/controller-handlers.go @@ -16,30 +16,6 @@ package cmd -import ( - "net/rpc" - - router "github.com/gorilla/mux" -) - -// Routes paths for "minio control" commands. -const ( - controlRPCPath = reservedBucket + "/control" - healPath = controlRPCPath + "/heal" -) - -// Register control RPC handlers. -func registerControlRPCRouter(mux *router.Router, objAPI ObjectLayer) { - healRPCServer := rpc.NewServer() - healRPCServer.RegisterName("Heal", &healHandler{objAPI}) - mux.Path(healPath).Handler(healRPCServer) -} - -// Handler for object healing. -type healHandler struct { - ObjectAPI ObjectLayer -} - // HealListArgs - argument for ListObjects RPC. type HealListArgs struct { Bucket string @@ -56,9 +32,13 @@ type HealListReply struct { Objects []string } -// ListObjects - list objects. -func (h healHandler) ListObjects(arg *HealListArgs, reply *HealListReply) error { - info, err := h.ObjectAPI.ListObjectsHeal(arg.Bucket, arg.Prefix, arg.Marker, arg.Delimiter, arg.MaxKeys) +// ListObjects - list all objects that needs healing. +func (c *controllerAPIHandlers) ListObjectsHeal(arg *HealListArgs, reply *HealListReply) error { + objAPI := c.ObjectAPI + if objAPI == nil { + return errInvalidArgument + } + info, err := objAPI.ListObjectsHeal(arg.Bucket, arg.Prefix, arg.Marker, arg.Delimiter, arg.MaxKeys) if err != nil { return err } @@ -80,6 +60,29 @@ type HealObjectArgs struct { type HealObjectReply struct{} // HealObject - heal the object. -func (h healHandler) HealObject(arg *HealObjectArgs, reply *HealObjectReply) error { - return h.ObjectAPI.HealObject(arg.Bucket, arg.Object) +func (c *controllerAPIHandlers) HealObject(arg *HealObjectArgs, reply *HealObjectReply) error { + objAPI := c.ObjectAPI + if objAPI == nil { + return errInvalidArgument + } + return objAPI.HealObject(arg.Bucket, arg.Object) +} + +// ShutdownArgs - argument for Shutdown RPC. +type ShutdownArgs struct { + Reboot bool +} + +// ShutdownReply - reply by Shutdown RPC. +type ShutdownReply struct{} + +// Shutdown - Shutdown the server. + +func (c *controllerAPIHandlers) Shutdown(arg *ShutdownArgs, reply *ShutdownReply) error { + if arg.Reboot { + globalShutdownSignalCh <- shutdownRestart + } else { + globalShutdownSignalCh <- shutdownHalt + } + return nil } diff --git a/cmd/controller-router.go b/cmd/controller-router.go new file mode 100644 index 000000000..8538d95c5 --- /dev/null +++ b/cmd/controller-router.go @@ -0,0 +1,42 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "net/rpc" + + router "github.com/gorilla/mux" +) + +// Routes paths for "minio control" commands. +const ( + controlPath = "/controller" +) + +// Register control RPC handlers. +func registerControlRPCRouter(mux *router.Router, ctrlHandlers *controllerAPIHandlers) { + ctrlRPCServer := rpc.NewServer() + ctrlRPCServer.RegisterName("Control", ctrlHandlers) + + ctrlRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter() + ctrlRouter.Path(controlPath).Handler(ctrlRPCServer) +} + +// Handler for object healing. +type controllerAPIHandlers struct { + ObjectAPI ObjectLayer +} diff --git a/cmd/routers.go b/cmd/routers.go index 74b718aa3..42503de70 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -69,6 +69,11 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler { ObjectAPI: objAPI, } + // Initialize Controller. + ctrlHandlers := &controllerAPIHandlers{ + ObjectAPI: objAPI, + } + // Initialize and monitor shutdown signals. err = initGracefulShutdown(os.Exit) fatalIf(err, "Unable to initialize graceful shutdown operation") @@ -98,7 +103,7 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler { // FIXME: till net/rpc auth is brought in "minio control" can be enabled only though // this env variable. if os.Getenv("MINIO_CONTROL") != "" { - registerControlRPCRouter(mux, objAPI) + registerControlRPCRouter(mux, ctrlHandlers) } // set environmental variable MINIO_BROWSER=off to disable minio web browser. diff --git a/cmd/utils.go b/cmd/utils.go index 2e6715975..d79577acd 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -19,8 +19,10 @@ package cmd import ( "encoding/base64" "encoding/xml" + "errors" "io" "os" + "os/exec" "strings" "sync" "syscall" @@ -145,8 +147,15 @@ func initGracefulShutdown(onExitFn onExitFunc) error { return startMonitorShutdownSignal(onExitFn) } +type shutdownSignal int + +const ( + shutdownHalt = iota + shutdownRestart +) + // Global shutdown signal channel. -var globalShutdownSignalCh = make(chan struct{}, 1) +var globalShutdownSignalCh = make(chan shutdownSignal, 1) // Start to monitor shutdownSignal to execute shutdown callbacks func startMonitorShutdownSignal(onExitFn onExitFunc) error { @@ -162,8 +171,8 @@ func startMonitorShutdownSignal(onExitFn onExitFunc) error { select { case <-trapCh: // Initiate graceful shutdown. - globalShutdownSignalCh <- struct{}{} - case <-globalShutdownSignalCh: + globalShutdownSignalCh <- shutdownHalt + case signal := <-globalShutdownSignalCh: // Call all object storage shutdown callbacks and exit for emergency for _, callback := range globalShutdownCBs.GetObjectLayerCBs() { exitCode := callback() @@ -179,6 +188,21 @@ func startMonitorShutdownSignal(onExitFn onExitFunc) error { onExitFn(int(exitCode)) } } + // All shutdown callbacks ensure that the server is safely terminated + // and any concurrent process could be started again + if signal == shutdownRestart { + path := os.Args[0] + cmdArgs := os.Args[1:] + cmd := exec.Command(path, cmdArgs...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + err := cmd.Start() + if err != nil { + errorIf(errors.New("Unable to reboot."), err.Error()) + } + onExitFn(int(exitSuccess)) + } onExitFn(int(exitSuccess)) } }