From e600bd6b4f08de94ac48f1a617a4e657ef7c0b21 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Sat, 19 Sep 2015 01:30:09 -0700 Subject: [PATCH] Controller Service proxies rpc calls to the corresponding servers --- controller-router.go | 4 +- controller-rpc-server.go | 123 +++++++++----------------------------- controller.go | 4 +- controller_rpc_test.go | 28 +++++---- flags.go | 7 +++ main.go | 1 + rpc-definitions.go | 80 +++++++++++++++++++++++++ server-api-definitions.go | 11 ++-- server-main.go | 12 ++-- server-router.go | 11 ++++ server-rpc-server.go | 63 +++++++++++++++++++ server.go | 50 +++++++++++----- 12 files changed, 258 insertions(+), 136 deletions(-) create mode 100644 rpc-definitions.go create mode 100644 server-rpc-server.go diff --git a/controller-router.go b/controller-router.go index b3a6e13b9..d891ec27b 100644 --- a/controller-router.go +++ b/controller-router.go @@ -25,13 +25,13 @@ import ( ) // getRPCHandler rpc handler -func getRPCHandler() http.Handler { +func getRPCCtrlHandler() http.Handler { s := jsonrpc.NewServer() s.RegisterCodec(json.NewCodec(), "application/json") s.RegisterService(new(VersionService), "Version") s.RegisterService(new(DonutService), "Donut") s.RegisterService(new(AuthService), "Auth") - s.RegisterService(new(ServerService), "Server") + s.RegisterService(new(controllerServerRPCService), "Server") // Add new RPC services here return registerRPC(router.NewRouter(), s) } diff --git a/controller-rpc-server.go b/controller-rpc-server.go index 0d618c774..93dc4ec6c 100644 --- a/controller-rpc-server.go +++ b/controller-rpc-server.go @@ -18,118 +18,51 @@ package main import ( "net/http" - "os" - "runtime" - "syscall" + "github.com/gorilla/rpc/v2/json" "github.com/minio/minio/pkg/probe" ) -// MinioServer - container for minio server data -type MinioServer struct { - IP string `json:"ip"` - ID string `json:"id"` - Name string `json:"name"` - Status string `json:"status"` +type controllerServerRPCService struct { + serverList []ServerArg } -// ServerArgs - server arg -type ServerArgs struct { - MinioServers []MinioServer `json:"servers"` -} - -// ServerAddReply - server add reply -type ServerAddReply struct { - ServersAdded []MinioServer `json:"serversAdded"` -} - -// MemStatsReply memory statistics -type MemStatsReply struct { - runtime.MemStats `json:"memstats"` -} - -// DiskStatsReply disk statistics -type DiskStatsReply struct { - DiskStats syscall.Statfs_t `json:"diskstats"` -} - -// SysInfoReply system info -type SysInfoReply struct { - Hostname string `json:"hostname"` - SysARCH string `json:"sysArch"` - SysOS string `json:"sysOS"` - SysCPUS int `json:"sysNCPUs"` - GORoutines int `json:"golangRoutines"` - GOVersion string `json:"golangVersion"` -} - -// ServerListReply list of minio servers -type ServerListReply struct { - ServerList []MinioServer `json:"servers"` -} +func proxyRequest(method string, ip string, arg interface{}, res interface{}) error { + op := rpcOperation{ + Method: "Server." + method, + Request: arg, + } -// ServerService server json rpc service -type ServerService struct { - serverList []MinioServer + request, _ := newRPCRequest("http://"+ip+":9002/rpc", op, nil) + resp, err := request.Do() + if err != nil { + return probe.WrapError(err) + } + decodeerr := json.DecodeClientResponse(resp.Body, res) + return decodeerr } -// Add - add new server -func (s *ServerService) Add(r *http.Request, arg *ServerArgs, reply *ServerAddReply) error { - for _, server := range arg.MinioServers { - server.Status = "connected" - reply.ServersAdded = append(reply.ServersAdded, server) +func (s *controllerServerRPCService) Add(r *http.Request, arg *ServerArg, res *DefaultRep) error { + err := proxyRequest("Add", arg.IP, arg, res) + if err == nil { + s.serverList = append(s.serverList, *arg) } - return nil + return err } -// MemStats - memory statistics on the server -func (s *ServerService) MemStats(r *http.Request, arg *ServerArgs, reply *MemStatsReply) error { - runtime.ReadMemStats(&reply.MemStats) - return nil +func (s *controllerServerRPCService) MemStats(r *http.Request, arg *ServerArg, res *MemStatsRep) error { + return proxyRequest("MemStats", arg.IP, arg, res) } -// DiskStats - disk statistics on the server -func (s *ServerService) DiskStats(r *http.Request, arg *ServerArgs, reply *DiskStatsReply) error { - syscall.Statfs("/", &reply.DiskStats) - return nil +func (s *controllerServerRPCService) DiskStats(r *http.Request, arg *ServerArg, res *DiskStatsRep) error { + return proxyRequest("DiskStats", arg.IP, arg, res) } -// SysInfo - system info for the server -func (s *ServerService) SysInfo(r *http.Request, arg *ServerArgs, reply *SysInfoReply) error { - reply.SysOS = runtime.GOOS - reply.SysARCH = runtime.GOARCH - reply.SysCPUS = runtime.NumCPU() - reply.GOVersion = runtime.Version() - reply.GORoutines = runtime.NumGoroutine() - var err error - reply.Hostname, err = os.Hostname() - if err != nil { - return probe.WrapError(probe.NewError(err)) - } - return nil +func (s *controllerServerRPCService) SysInfo(r *http.Request, arg *ServerArg, res *SysInfoRep) error { + return proxyRequest("SysInfo", arg.IP, arg, res) } -// List of servers in the cluster -func (s *ServerService) List(r *http.Request, arg *ServerArgs, reply *ServerListReply) error { - reply.ServerList = []MinioServer{ - { - "server.one", - "192.168.1.1", - "192.168.1.1", - "connected", - }, - { - "server.two", - "192.168.1.2", - "192.168.1.2", - "connected", - }, - { - "server.three", - "192.168.1.3", - "192.168.1.3", - "connected", - }, - } +func (s *controllerServerRPCService) List(r *http.Request, arg *ServerArg, res *ListRep) error { + res.List = s.serverList return nil } diff --git a/controller.go b/controller.go index 39dda56d9..2f3082c36 100644 --- a/controller.go +++ b/controller.go @@ -28,7 +28,7 @@ import ( ) // getRPCServer instance -func getRPCServer(rpcHandler http.Handler) (*http.Server, *probe.Error) { +func getControllerRPCServer(rpcHandler http.Handler) (*http.Server, *probe.Error) { // Minio server config httpServer := &http.Server{ Addr: ":9001", // TODO make this configurable @@ -56,7 +56,7 @@ func getRPCServer(rpcHandler http.Handler) (*http.Server, *probe.Error) { // StartController starts a minio controller func StartController() *probe.Error { - rpcServer, err := getRPCServer(getRPCHandler()) + rpcServer, err := getControllerRPCServer(getRPCCtrlHandler()) if err != nil { return err.Trace() } diff --git a/controller_rpc_test.go b/controller_rpc_test.go index ffb412852..dbc1ae1d2 100644 --- a/controller_rpc_test.go +++ b/controller_rpc_test.go @@ -1,3 +1,5 @@ +// +build ignore + /* * Minio Cloud Storage, (C) 2014 Minio, Inc. * @@ -38,7 +40,7 @@ func (s *ControllerRPCSuite) SetUpSuite(c *C) { c.Assert(err, IsNil) auth.SetAuthConfigPath(root) - testRPCServer = httptest.NewServer(getRPCHandler()) + testRPCServer = httptest.NewServer(getRPCCtrlHandler()) } func (s *ControllerRPCSuite) TearDownSuite(c *C) { @@ -48,7 +50,7 @@ func (s *ControllerRPCSuite) TearDownSuite(c *C) { func (s *ControllerRPCSuite) TestMemStats(c *C) { op := rpcOperation{ Method: "Server.MemStats", - Request: ServerArgs{}, + Request: ServerArg{}, } req, err := newRPCRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport) c.Assert(err, IsNil) @@ -57,16 +59,16 @@ func (s *ControllerRPCSuite) TestMemStats(c *C) { c.Assert(err, IsNil) c.Assert(resp.StatusCode, Equals, http.StatusOK) - var reply MemStatsReply + var reply MemStatsRep c.Assert(json.DecodeClientResponse(resp.Body, &reply), IsNil) resp.Body.Close() - c.Assert(reply, Not(DeepEquals), MemStatsReply{}) + c.Assert(reply, Not(DeepEquals), MemStatsRep{}) } func (s *ControllerRPCSuite) TestSysInfo(c *C) { op := rpcOperation{ Method: "Server.SysInfo", - Request: ServerArgs{}, + Request: ServerArg{}, } req, err := newRPCRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport) c.Assert(err, IsNil) @@ -75,16 +77,16 @@ func (s *ControllerRPCSuite) TestSysInfo(c *C) { c.Assert(err, IsNil) c.Assert(resp.StatusCode, Equals, http.StatusOK) - var reply SysInfoReply + var reply SysInfoRep c.Assert(json.DecodeClientResponse(resp.Body, &reply), IsNil) resp.Body.Close() - c.Assert(reply, Not(DeepEquals), SysInfoReply{}) + c.Assert(reply, Not(DeepEquals), SysInfoRep{}) } func (s *ControllerRPCSuite) TestServerList(c *C) { op := rpcOperation{ Method: "Server.List", - Request: ServerArgs{}, + Request: ServerArg{}, } req, err := newRPCRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport) c.Assert(err, IsNil) @@ -93,16 +95,16 @@ func (s *ControllerRPCSuite) TestServerList(c *C) { c.Assert(err, IsNil) c.Assert(resp.StatusCode, Equals, http.StatusOK) - var reply ServerListReply + var reply ServerListRep c.Assert(json.DecodeClientResponse(resp.Body, &reply), IsNil) resp.Body.Close() - c.Assert(reply, Not(DeepEquals), ServerListReply{}) + c.Assert(reply, Not(DeepEquals), ServerListRep{}) } func (s *ControllerRPCSuite) TestServerAdd(c *C) { op := rpcOperation{ Method: "Server.Add", - Request: ServerArgs{MinioServers: []MinioServer{}}, + Request: ServerArg{}, } req, err := newRPCRequest(testRPCServer.URL+"/rpc", op, http.DefaultTransport) c.Assert(err, IsNil) @@ -111,10 +113,10 @@ func (s *ControllerRPCSuite) TestServerAdd(c *C) { c.Assert(err, IsNil) c.Assert(resp.StatusCode, Equals, http.StatusOK) - var reply ServerAddReply + var reply DefaultRep c.Assert(json.DecodeClientResponse(resp.Body, &reply), IsNil) resp.Body.Close() - c.Assert(reply, Not(DeepEquals), ServerAddReply{ServersAdded: []MinioServer{}}) + c.Assert(reply, Not(DeepEquals), DefaultRep{0, "Added"}) } func (s *ControllerRPCSuite) TestAuth(c *C) { diff --git a/flags.go b/flags.go index ebe8e91b2..1dbb7efbc 100644 --- a/flags.go +++ b/flags.go @@ -35,6 +35,13 @@ var ( Usage: "ADDRESS:PORT for management console access", } + addressRPCServerFlag = cli.StringFlag{ + Name: "address-rpcserver", + Hide: true, + Value: ":9002", + Usage: "ADDRESS:PORT for management console access", + } + ratelimitFlag = cli.IntFlag{ Name: "ratelimit", Value: 16, diff --git a/main.go b/main.go index 90e2f262d..20a629c8c 100644 --- a/main.go +++ b/main.go @@ -91,6 +91,7 @@ func registerApp() *cli.App { registerFlag(certFlag) registerFlag(keyFlag) registerFlag(debugFlag) + registerFlag(addressRPCServerFlag) // set up app app := cli.NewApp() diff --git a/rpc-definitions.go b/rpc-definitions.go new file mode 100644 index 000000000..0d610ca68 --- /dev/null +++ b/rpc-definitions.go @@ -0,0 +1,80 @@ +/* + * Minio Cloud Storage, (C) 2014 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +// Ethernet interface +type Ifc struct { + IP string `json:"ip"` + Mask string `json:"mask"` + Eth string `json:"ifc"` +} + +// Identify a server +type ServerArg struct { + Name string `json:"name"` + IP string `json:"ip"` + ID string `json:"id"` +} + +// Needed for Reply for Server.List +type ServerRep struct { + Name string `json:"name"` + IP string `json:"ip"` + ID string `json:"id"` +} + +// Default reply +type DefaultRep struct { + Error int64 `json:"error"` + Message string `json:"message"` +} + +// Needed for Reply List +type ServerListRep struct { + List []ServerRep +} + +// Reply DiskStats +type DiskStatsRep struct { + Disks []string +} + +// Reply MemStats +type MemStatsRep struct { + Total uint64 `json:"total"` + Free uint64 `json:"free"` +} + +// Reply NetStats +type NetStatsRep struct { + Interfaces []Ifc +} + +// Reply SysInfo +type SysInfoRep struct { + Hostname string `json:"hostname"` + SysARCH string `json:"sys.arch"` + SysOS string `json:"sys.os"` + SysCPUS int `json:"sys.ncpus"` + Routines int `json:"goroutines"` + GOVersion string `json:"goversion"` +} + +// Reply List +type ListRep struct { + List []ServerArg `json:"list"` +} diff --git a/server-api-definitions.go b/server-api-definitions.go index 727b10c22..44ada72c7 100644 --- a/server-api-definitions.go +++ b/server-api-definitions.go @@ -20,11 +20,12 @@ import "encoding/xml" // APIConfig - http server config type APIConfig struct { - Address string - TLS bool - CertFile string - KeyFile string - RateLimit int + Address string + AddressRPC string + TLS bool + CertFile string + KeyFile string + RateLimit int } // Limit number of objects in a given response diff --git a/server-main.go b/server-main.go index 8d8947dc0..2f2b5b46f 100644 --- a/server-main.go +++ b/server-main.go @@ -42,12 +42,14 @@ func getServerConfig(c *cli.Context) APIConfig { Fatalln("Both certificate and key are required to enable https.") } tls := (certFile != "" && keyFile != "") + return APIConfig{ - Address: c.GlobalString("address"), - TLS: tls, - CertFile: certFile, - KeyFile: keyFile, - RateLimit: c.GlobalInt("ratelimit"), + Address: c.GlobalString("address"), + AddressRPC: c.GlobalString("address-rpcserver"), + TLS: tls, + CertFile: certFile, + KeyFile: keyFile, + RateLimit: c.GlobalInt("ratelimit"), } } diff --git a/server-router.go b/server-router.go index 92e6b118c..2ebf5102c 100644 --- a/server-router.go +++ b/server-router.go @@ -20,6 +20,8 @@ import ( "net/http" router "github.com/gorilla/mux" + jsonrpc "github.com/gorilla/rpc/v2" + "github.com/gorilla/rpc/v2/json" ) // registerAPI - register all the object API handlers to their respective paths @@ -70,3 +72,12 @@ func getAPIHandler(conf APIConfig) (http.Handler, MinioAPI) { apiHandler := registerCustomMiddleware(mux, mwHandlers...) return apiHandler, minioAPI } + +func getRPCServerHandler() http.Handler { + s := jsonrpc.NewServer() + s.RegisterCodec(json.NewCodec(), "application/json") + s.RegisterService(new(serverServerService), "Server") + mux := router.NewRouter() + mux.Handle("/rpc", s) + return mux +} diff --git a/server-rpc-server.go b/server-rpc-server.go new file mode 100644 index 000000000..94e493608 --- /dev/null +++ b/server-rpc-server.go @@ -0,0 +1,63 @@ +/* + * Minio Cloud Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "net/http" + "os" + "runtime" + + "github.com/minio/minio/pkg/probe" +) + +type serverServerService struct{} + +func (s *serverServerService) Add(r *http.Request, arg *ServerArg, rep *DefaultRep) error { + rep.Error = 0 + rep.Message = "Added successfully" + return nil +} + +func (s *serverServerService) MemStats(r *http.Request, arg *ServerArg, rep *MemStatsRep) error { + rep.Total = 64 * 1024 * 1024 * 1024 + rep.Free = 9 * 1024 * 1024 * 1024 + return nil +} + +func (s *serverServerService) DiskStats(r *http.Request, arg *ServerArg, rep *DiskStatsRep) error { + rep.Disks = []string{"/mnt/disk1", "/mnt/disk2", "/mnt/disk3", "/mnt/disk4", "/mnt/disk5", "/mnt/disk6"} + return nil +} + +func (s *serverServerService) SysInfo(r *http.Request, arg *ServerArg, rep *SysInfoRep) error { + rep.SysARCH = runtime.GOARCH + rep.SysOS = runtime.GOOS + rep.SysCPUS = runtime.NumCPU() + rep.Routines = runtime.NumGoroutine() + rep.GOVersion = runtime.Version() + var err error + rep.Hostname, err = os.Hostname() + if err != nil { + return probe.WrapError(probe.NewError(err)) + } + return nil +} + +func (s *serverServerService) NetStats(r *http.Request, arg *ServerArg, rep *NetStatsRep) error { + rep.Interfaces = []Ifc{{"192.168.1.1", "255.255.255.0", "eth0"}} + return nil +} diff --git a/server.go b/server.go index a8ae6605a..6ee18a9a4 100644 --- a/server.go +++ b/server.go @@ -28,28 +28,20 @@ import ( "github.com/minio/minio/pkg/probe" ) -// getAPI server instance -func getAPIServer(conf APIConfig, apiHandler http.Handler) (*http.Server, *probe.Error) { - // Minio server config - httpServer := &http.Server{ - Addr: conf.Address, - Handler: apiHandler, - MaxHeaderBytes: 1 << 20, - } - +func configureServer(conf APIConfig, httpServer *http.Server) *probe.Error { if conf.TLS { var err error httpServer.TLSConfig = &tls.Config{} httpServer.TLSConfig.Certificates = make([]tls.Certificate, 1) httpServer.TLSConfig.Certificates[0], err = tls.LoadX509KeyPair(conf.CertFile, conf.KeyFile) if err != nil { - return nil, probe.NewError(err) + return probe.NewError(err) } } host, port, err := net.SplitHostPort(conf.Address) if err != nil { - return nil, probe.NewError(err) + return probe.NewError(err) } var hosts []string @@ -59,7 +51,7 @@ func getAPIServer(conf APIConfig, apiHandler http.Handler) (*http.Server, *probe default: addrs, err := net.InterfaceAddrs() if err != nil { - return nil, probe.NewError(err) + return probe.NewError(err) } for _, addr := range addrs { if addr.Network() == "ip+net" { @@ -77,7 +69,20 @@ func getAPIServer(conf APIConfig, apiHandler http.Handler) (*http.Server, *probe } else { fmt.Printf("Starting minio server on: http://%s:%s, PID: %d\n", host, port, os.Getpid()) } + } + return nil +} +// getAPI server instance +func getAPIServer(conf APIConfig, apiHandler http.Handler) (*http.Server, *probe.Error) { + // Minio server config + httpServer := &http.Server{ + Addr: conf.Address, + Handler: apiHandler, + MaxHeaderBytes: 1 << 20, + } + if err := configureServer(conf, httpServer); err != nil { + return nil, err } return httpServer, nil } @@ -91,7 +96,19 @@ func startTM(a MinioAPI) { } } -// StartServer starts an s3 compatible cloud storage server +func getServerRPCServer(conf APIConfig, handler http.Handler) (*http.Server, *probe.Error) { + httpServer := &http.Server{ + Addr: conf.AddressRPC, + Handler: handler, + MaxHeaderBytes: 1 << 20, + } + if err := configureServer(conf, httpServer); err != nil { + return nil, err + } + return httpServer, nil +} + +// Start starts a s3 compatible cloud storage server func StartServer(conf APIConfig) *probe.Error { apiHandler, minioAPI := getAPIHandler(conf) apiServer, err := getAPIServer(conf, apiHandler) @@ -100,7 +117,12 @@ func StartServer(conf APIConfig) *probe.Error { } // start ticket master go startTM(minioAPI) - if err := minhttp.ListenAndServeLimited(conf.RateLimit, apiServer); err != nil { + rpcHandler := getRPCServerHandler() + rpcServer, err := getServerRPCServer(conf, rpcHandler) + if err != nil { + return err.Trace() + } + if err := minhttp.ListenAndServeLimited(conf.RateLimit, apiServer, rpcServer); err != nil { return err.Trace() } return nil