Merge pull request #861 from harshavardhana/controller

Add list of servers, for controller args.
master
Harshavardhana 9 years ago
commit c3aa35424e
  1. 93
      controller-rpc.go
  2. 18
      controller_rpc_test.go
  3. 36
      rpc-definitions.go
  4. 21
      server-rpc.go

@ -19,6 +19,7 @@ package main
import ( import (
"errors" "errors"
"net/http" "net/http"
"net/url"
"os" "os"
"strings" "strings"
@ -29,7 +30,7 @@ import (
) )
type controllerRPCService struct { type controllerRPCService struct {
serverList []ServerArg serverList []ServerRep
} }
func makeDonut(args *DonutArgs, reply *DefaultRep) *probe.Error { func makeDonut(args *DonutArgs, reply *DefaultRep) *probe.Error {
@ -171,46 +172,92 @@ func (s *controllerRPCService) ResetAuth(r *http.Request, args *AuthArgs, reply
return nil return nil
} }
func proxyRequest(method string, url string, arg interface{}, res interface{}) error { func proxyRequest(method, host string, ssl bool, res interface{}) *probe.Error {
// can be configured to something else in future u := &url.URL{
Scheme: func() string {
if ssl {
return "https"
}
return "http"
}(),
Host: host,
Path: "/rpc",
}
op := rpcOperation{ op := rpcOperation{
Method: method, Method: method,
Request: arg, Request: ServerArg{},
} }
request, _ := newRPCRequest(url, op, nil) request, err := newRPCRequest(u.String(), op, nil)
resp, err := request.Do()
if err != nil { if err != nil {
return probe.WrapError(err) return err.Trace()
} }
decodeerr := json.DecodeClientResponse(resp.Body, res) var resp *http.Response
return decodeerr resp, err = request.Do()
if err != nil {
return err.Trace()
}
if err := json.DecodeClientResponse(resp.Body, res); err != nil {
return probe.NewError(err)
}
return nil
} }
func (s *controllerRPCService) AddServer(r *http.Request, arg *ServerArg, res *DefaultRep) error { func (s *controllerRPCService) AddServer(r *http.Request, args *ControllerArgs, res *ServerRep) error {
err := proxyRequest("Server.Add", arg.URL, arg, res) for _, host := range args.Hosts {
if err == nil { err := proxyRequest("Server.Add", host, args.SSL, res)
s.serverList = append(s.serverList, *arg) if err != nil {
return probe.WrapError(err)
}
s.serverList = append(s.serverList, *res)
} }
return err return nil
} }
func (s *controllerRPCService) GetServerMemStats(r *http.Request, arg *ServerArg, res *MemStatsRep) error { func (s *controllerRPCService) GetServerMemStats(r *http.Request, args *ControllerArgs, res *MemStatsRep) error {
return proxyRequest("Server.MemStats", arg.URL, arg, res) for _, host := range args.Hosts {
err := proxyRequest("Server.MemStats", host, args.SSL, res)
if err != nil {
return probe.WrapError(err)
}
return nil
}
return errors.New("Invalid argument")
} }
func (s *controllerRPCService) GetServerDiskStats(r *http.Request, arg *ServerArg, res *DiskStatsRep) error { func (s *controllerRPCService) GetServerDiskStats(r *http.Request, args *ControllerArgs, res *DiskStatsRep) error {
return proxyRequest("Server.DiskStats", arg.URL, arg, res) for _, host := range args.Hosts {
err := proxyRequest("Server.DiskStats", host, args.SSL, res)
if err != nil {
return probe.WrapError(err)
}
return nil
}
return errors.New("Invalid argument")
} }
func (s *controllerRPCService) GetServerSysInfo(r *http.Request, arg *ServerArg, res *SysInfoRep) error { func (s *controllerRPCService) GetServerSysInfo(r *http.Request, args *ControllerArgs, res *SysInfoRep) error {
return proxyRequest("Server.SysInfo", arg.URL, arg, res) for _, host := range args.Hosts {
err := proxyRequest("Server.SysInfo", host, args.SSL, res)
if err != nil {
return probe.WrapError(err)
}
return nil
}
return errors.New("Invalid argument")
} }
func (s *controllerRPCService) ListServers(r *http.Request, arg *ServerArg, res *ListRep) error { func (s *controllerRPCService) ListServers(r *http.Request, args *ControllerArgs, res *ListRep) error {
res.List = s.serverList res.List = s.serverList
return nil return nil
} }
func (s *controllerRPCService) GetServerVersion(r *http.Request, arg *ServerArg, res *VersionRep) error { func (s *controllerRPCService) GetServerVersion(r *http.Request, args *ControllerArgs, res *VersionRep) error {
return proxyRequest("Server.Version", arg.URL, arg, res) for _, host := range args.Hosts {
err := proxyRequest("Server.Version", host, args.SSL, res)
if err != nil {
return probe.WrapError(err)
}
return nil
}
return errors.New("Invalid argument")
} }

@ -20,6 +20,7 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url"
"os" "os"
"github.com/gorilla/rpc/v2/json" "github.com/gorilla/rpc/v2/json"
@ -29,6 +30,7 @@ import (
type ControllerRPCSuite struct { type ControllerRPCSuite struct {
root string root string
url *url.URL
} }
var _ = Suite(&ControllerRPCSuite{}) var _ = Suite(&ControllerRPCSuite{})
@ -48,6 +50,10 @@ func (s *ControllerRPCSuite) SetUpSuite(c *C) {
testServerRPC = httptest.NewUnstartedServer(getServerRPCHandler()) testServerRPC = httptest.NewUnstartedServer(getServerRPCHandler())
testServerRPC.Config.Addr = ":9002" testServerRPC.Config.Addr = ":9002"
testServerRPC.Start() testServerRPC.Start()
url, gerr := url.Parse(testServerRPC.URL)
c.Assert(gerr, IsNil)
s.url = url
} }
func (s *ControllerRPCSuite) TearDownSuite(c *C) { func (s *ControllerRPCSuite) TearDownSuite(c *C) {
@ -59,7 +65,7 @@ func (s *ControllerRPCSuite) TearDownSuite(c *C) {
func (s *ControllerRPCSuite) TestMemStats(c *C) { func (s *ControllerRPCSuite) TestMemStats(c *C) {
op := rpcOperation{ op := rpcOperation{
Method: "Controller.GetServerMemStats", Method: "Controller.GetServerMemStats",
Request: ServerArg{URL: testServerRPC.URL + "/rpc"}, Request: ControllerArgs{Hosts: []string{s.url.Host}},
} }
req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport) req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport)
c.Assert(err, IsNil) c.Assert(err, IsNil)
@ -77,7 +83,7 @@ func (s *ControllerRPCSuite) TestMemStats(c *C) {
func (s *ControllerRPCSuite) TestDiskStats(c *C) { func (s *ControllerRPCSuite) TestDiskStats(c *C) {
op := rpcOperation{ op := rpcOperation{
Method: "Controller.GetServerDiskStats", Method: "Controller.GetServerDiskStats",
Request: ServerArg{URL: testServerRPC.URL + "/rpc"}, Request: ControllerArgs{Hosts: []string{s.url.Host}},
} }
req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport) req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport)
c.Assert(err, IsNil) c.Assert(err, IsNil)
@ -95,7 +101,7 @@ func (s *ControllerRPCSuite) TestDiskStats(c *C) {
func (s *ControllerRPCSuite) TestSysInfo(c *C) { func (s *ControllerRPCSuite) TestSysInfo(c *C) {
op := rpcOperation{ op := rpcOperation{
Method: "Controller.GetServerSysInfo", Method: "Controller.GetServerSysInfo",
Request: ServerArg{URL: testServerRPC.URL + "/rpc"}, Request: ControllerArgs{Hosts: []string{s.url.Host}},
} }
req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport) req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport)
c.Assert(err, IsNil) c.Assert(err, IsNil)
@ -113,7 +119,7 @@ func (s *ControllerRPCSuite) TestSysInfo(c *C) {
func (s *ControllerRPCSuite) TestServerList(c *C) { func (s *ControllerRPCSuite) TestServerList(c *C) {
op := rpcOperation{ op := rpcOperation{
Method: "Controller.ListServers", Method: "Controller.ListServers",
Request: ServerArg{URL: testServerRPC.URL + "/rpc"}, Request: ControllerArgs{Hosts: []string{s.url.Host}},
} }
req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport) req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport)
c.Assert(err, IsNil) c.Assert(err, IsNil)
@ -125,13 +131,13 @@ func (s *ControllerRPCSuite) TestServerList(c *C) {
var reply ServerListRep var reply ServerListRep
c.Assert(json.DecodeClientResponse(resp.Body, &reply), IsNil) c.Assert(json.DecodeClientResponse(resp.Body, &reply), IsNil)
resp.Body.Close() resp.Body.Close()
c.Assert(reply, Not(DeepEquals), ServerListRep{}) c.Assert(reply, Not(DeepEquals), ServerListRep{List: []ServerRep{}})
} }
func (s *ControllerRPCSuite) TestServerAdd(c *C) { func (s *ControllerRPCSuite) TestServerAdd(c *C) {
op := rpcOperation{ op := rpcOperation{
Method: "Controller.AddServer", Method: "Controller.AddServer",
Request: ServerArg{URL: testServerRPC.URL + "/rpc"}, Request: ControllerArgs{Hosts: []string{s.url.Host}},
} }
req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport) req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport)
c.Assert(err, IsNil) c.Assert(err, IsNil)

@ -16,6 +16,8 @@
package main package main
//// In memory metadata
//// RPC params //// RPC params
// AuthArgs auth params // AuthArgs auth params
@ -31,20 +33,22 @@ type DonutArgs struct {
Disks []string Disks []string
} }
// ServerArg server metadata to identify a server // ServerArg server params
type ServerArg struct { type ServerArg struct{}
Name string `json:"name"`
URL string `json:"url"` // ControllerArgs controller params
ID string `json:"id"` type ControllerArgs struct {
Hosts []string `json:"hosts"` // hosts is a collection of host or host:port
SSL bool `json:"ssl"`
ID string `json:"id"`
} }
//// RPC replies //// RPC replies
// ServerRep server reply container for Server.List // ServerRep server reply container for Server.List
type ServerRep struct { type ServerRep struct {
Name string `json:"name"` Host string `json:"host"`
Address string `json:"address"` ID string `json:"id"`
ID string `json:"id"`
} }
// DefaultRep default reply // DefaultRep default reply
@ -69,13 +73,17 @@ type MemStatsRep struct {
Free uint64 `json:"free"` Free uint64 `json:"free"`
} }
// Network metadata of a server
type Network struct {
IP string `json:"address"`
NetMask string `json:"netmask"`
Hostname string `json:"hostname"`
Ethernet string `json:"networkInterface"`
}
// NetStatsRep network statistics of a server // NetStatsRep network statistics of a server
type NetStatsRep struct { type NetStatsRep struct {
Interfaces []struct { Interfaces []Network
IP string `json:"address"`
Mask string `json:"netmask"`
Ethernet string `json:"networkInterface"`
}
} }
// SysInfoRep system information of a server // SysInfoRep system information of a server
@ -90,7 +98,7 @@ type SysInfoRep struct {
// ListRep all servers list // ListRep all servers list
type ListRep struct { type ListRep struct {
List []ServerArg `json:"list"` List []ServerRep `json:"list"`
} }
// VersionRep version reply // VersionRep version reply

@ -26,9 +26,11 @@ import (
type serverRPCService struct{} type serverRPCService struct{}
func (s *serverRPCService) Add(r *http.Request, arg *ServerArg, rep *DefaultRep) error { func (s *serverRPCService) Add(r *http.Request, arg *ServerArg, rep *ServerRep) error {
rep.Message = "Server " + arg.URL + " added successfully" rep = &ServerRep{
rep.Error = nil Host: "192.168.1.1:9002",
ID: "6F27CB16-493D-40FA-B035-2A2E5646066A",
}
return nil return nil
} }
@ -58,17 +60,8 @@ func (s *serverRPCService) SysInfo(r *http.Request, arg *ServerArg, rep *SysInfo
} }
func (s *serverRPCService) NetStats(r *http.Request, arg *ServerArg, rep *NetStatsRep) error { func (s *serverRPCService) NetStats(r *http.Request, arg *ServerArg, rep *NetStatsRep) error {
rep.Interfaces = []struct { rep.Interfaces = make([]Network, 0)
IP string `json:"address"` rep.Interfaces = append(rep.Interfaces, Network{IP: "192.168.1.1", NetMask: "255.255.255.0", Hostname: "hostname1", Ethernet: "eth0"})
Mask string `json:"netmask"`
Ethernet string `json:"networkInterface"`
}{
{
"192.168.1.1",
"255.255.255.0",
"eth0",
},
}
return nil return nil
} }

Loading…
Cancel
Save