Consolidate controller rpc into one single service

master
Harshavardhana 9 years ago
parent 29673fed76
commit d1621691b7
  1. 4
      controller-router.go
  2. 63
      controller-rpc-donut.go
  3. 73
      controller-rpc-server.go
  4. 89
      controller-rpc.go
  5. 50
      controller_rpc_test.go
  6. 35
      rpc-definitions.go
  7. 16
      server-rpc-server.go

@ -28,9 +28,7 @@ import (
func getControllerRPCHandler() http.Handler { func getControllerRPCHandler() http.Handler {
s := jsonrpc.NewServer() s := jsonrpc.NewServer()
s.RegisterCodec(json.NewCodec(), "application/json") s.RegisterCodec(json.NewCodec(), "application/json")
s.RegisterService(new(DonutService), "Donut") s.RegisterService(new(controllerRPCService), "Controller")
s.RegisterService(new(AuthService), "Auth")
s.RegisterService(new(controllerRPCService), "Server")
// Add new RPC services here // Add new RPC services here
return registerRPC(router.NewRouter(), s) return registerRPC(router.NewRouter(), s)
} }

@ -1,63 +0,0 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"net/http"
"github.com/minio/minio/pkg/donut"
"github.com/minio/minio/pkg/probe"
)
// DonutService donut service
type DonutService struct{}
// DonutArgs collections of disks and name to initialize donut
type DonutArgs struct {
Name string
MaxSize uint64
Hostname string
Disks []string
}
// Reply reply for successful or failed Set operation
type Reply struct {
Message string `json:"message"`
Error error `json:"error"`
}
func setDonut(args *DonutArgs, reply *Reply) *probe.Error {
conf := &donut.Config{Version: "0.0.1"}
conf.DonutName = args.Name
conf.MaxSize = args.MaxSize
conf.NodeDiskMap = make(map[string][]string)
conf.NodeDiskMap[args.Hostname] = args.Disks
if err := donut.SaveConfig(conf); err != nil {
return err.Trace()
}
reply.Message = "success"
reply.Error = nil
return nil
}
// Set method
func (s *DonutService) Set(r *http.Request, args *DonutArgs, reply *Reply) error {
if err := setDonut(args, reply); err != nil {
return probe.WrapError(err)
}
return nil
}

@ -1,73 +0,0 @@
/*
* Minio Cloud Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"net/http"
"github.com/gorilla/rpc/v2/json"
"github.com/minio/minio/pkg/probe"
)
type controllerRPCService struct {
serverList []ServerArg
}
func proxyRequest(method string, url string, arg interface{}, res interface{}) error {
// can be configured to something else in future
namespace := "Server"
op := rpcOperation{
Method: namespace + "." + method,
Request: arg,
}
request, _ := newRPCRequest(url, op, nil)
resp, err := request.Do()
if err != nil {
return probe.WrapError(err)
}
decodeerr := json.DecodeClientResponse(resp.Body, res)
return decodeerr
}
func (s *controllerRPCService) Add(r *http.Request, arg *ServerArg, res *DefaultRep) error {
err := proxyRequest("Add", arg.URL, arg, res)
if err == nil {
s.serverList = append(s.serverList, *arg)
}
return err
}
func (s *controllerRPCService) MemStats(r *http.Request, arg *ServerArg, res *MemStatsRep) error {
return proxyRequest("MemStats", arg.URL, arg, res)
}
func (s *controllerRPCService) DiskStats(r *http.Request, arg *ServerArg, res *DiskStatsRep) error {
return proxyRequest("DiskStats", arg.URL, arg, res)
}
func (s *controllerRPCService) SysInfo(r *http.Request, arg *ServerArg, res *SysInfoRep) error {
return proxyRequest("SysInfo", arg.URL, arg, res)
}
func (s *controllerRPCService) List(r *http.Request, arg *ServerArg, res *ListRep) error {
res.List = s.serverList
return nil
}
func (s *controllerRPCService) Version(r *http.Request, arg *ServerArg, res *VersionRep) error {
return proxyRequest("Version", arg.URL, arg, res)
}

@ -22,27 +22,40 @@ import (
"os" "os"
"strings" "strings"
"github.com/gorilla/rpc/v2/json"
"github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/donut"
"github.com/minio/minio/pkg/probe" "github.com/minio/minio/pkg/probe"
) )
// AuthService auth service type controllerRPCService struct {
type AuthService struct{} serverList []ServerArg
}
// AuthArgs auth params func makeDonut(args *DonutArgs, reply *DefaultRep) *probe.Error {
type AuthArgs struct { conf := &donut.Config{Version: "0.0.1"}
User string `json:"user"` conf.DonutName = args.Name
conf.MaxSize = args.MaxSize
conf.NodeDiskMap = make(map[string][]string)
conf.NodeDiskMap[args.Hostname] = args.Disks
if err := donut.SaveConfig(conf); err != nil {
return err.Trace()
}
reply.Message = "success"
reply.Error = nil
return nil
} }
// AuthReply reply with new access keys and secret ids // MakeDonut method
type AuthReply struct { func (s *controllerRPCService) MakeDonut(r *http.Request, args *DonutArgs, reply *DefaultRep) error {
Name string `json:"name"` if err := makeDonut(args, reply); err != nil {
AccessKeyID string `json:"accessKeyId"` return probe.WrapError(err)
SecretAccessKey string `json:"secretAccessKey"` }
return nil
} }
// generateAuth generate new auth keys for a user // generateAuth generate new auth keys for a user
func generateAuth(args *AuthArgs, reply *AuthReply) *probe.Error { func generateAuth(args *AuthArgs, reply *AuthRep) *probe.Error {
config, err := auth.LoadConfig() config, err := auth.LoadConfig()
if err != nil { if err != nil {
if os.IsNotExist(err.ToGoError()) { if os.IsNotExist(err.ToGoError()) {
@ -82,7 +95,7 @@ func generateAuth(args *AuthArgs, reply *AuthReply) *probe.Error {
} }
// fetchAuth fetch auth keys for a user // fetchAuth fetch auth keys for a user
func fetchAuth(args *AuthArgs, reply *AuthReply) *probe.Error { func fetchAuth(args *AuthArgs, reply *AuthRep) *probe.Error {
config, err := auth.LoadConfig() config, err := auth.LoadConfig()
if err != nil { if err != nil {
return err.Trace() return err.Trace()
@ -97,7 +110,7 @@ func fetchAuth(args *AuthArgs, reply *AuthReply) *probe.Error {
} }
// resetAuth reset auth keys for a user // resetAuth reset auth keys for a user
func resetAuth(args *AuthArgs, reply *AuthReply) *probe.Error { func resetAuth(args *AuthArgs, reply *AuthRep) *probe.Error {
config, err := auth.LoadConfig() config, err := auth.LoadConfig()
if err != nil { if err != nil {
return err.Trace() return err.Trace()
@ -126,7 +139,7 @@ func resetAuth(args *AuthArgs, reply *AuthReply) *probe.Error {
} }
// Generate auth keys // Generate auth keys
func (s *AuthService) Generate(r *http.Request, args *AuthArgs, reply *AuthReply) error { func (s *controllerRPCService) GenerateAuth(r *http.Request, args *AuthArgs, reply *AuthRep) error {
if strings.TrimSpace(args.User) == "" { if strings.TrimSpace(args.User) == "" {
return errors.New("Invalid argument") return errors.New("Invalid argument")
} }
@ -137,7 +150,7 @@ func (s *AuthService) Generate(r *http.Request, args *AuthArgs, reply *AuthReply
} }
// Fetch auth keys // Fetch auth keys
func (s *AuthService) Fetch(r *http.Request, args *AuthArgs, reply *AuthReply) error { func (s *controllerRPCService) FetchAuth(r *http.Request, args *AuthArgs, reply *AuthRep) error {
if strings.TrimSpace(args.User) == "" { if strings.TrimSpace(args.User) == "" {
return errors.New("Invalid argument") return errors.New("Invalid argument")
} }
@ -148,7 +161,7 @@ func (s *AuthService) Fetch(r *http.Request, args *AuthArgs, reply *AuthReply) e
} }
// Reset auth keys, generates new set of auth keys // Reset auth keys, generates new set of auth keys
func (s *AuthService) Reset(r *http.Request, args *AuthArgs, reply *AuthReply) error { func (s *controllerRPCService) ResetAuth(r *http.Request, args *AuthArgs, reply *AuthRep) error {
if strings.TrimSpace(args.User) == "" { if strings.TrimSpace(args.User) == "" {
return errors.New("Invalid argument") return errors.New("Invalid argument")
} }
@ -157,3 +170,47 @@ func (s *AuthService) Reset(r *http.Request, args *AuthArgs, reply *AuthReply) e
} }
return nil return nil
} }
func proxyRequest(method string, url string, arg interface{}, res interface{}) error {
// can be configured to something else in future
op := rpcOperation{
Method: method,
Request: arg,
}
request, _ := newRPCRequest(url, op, nil)
resp, err := request.Do()
if err != nil {
return probe.WrapError(err)
}
decodeerr := json.DecodeClientResponse(resp.Body, res)
return decodeerr
}
func (s *controllerRPCService) AddServer(r *http.Request, arg *ServerArg, res *DefaultRep) error {
err := proxyRequest("Server.Add", arg.URL, arg, res)
if err == nil {
s.serverList = append(s.serverList, *arg)
}
return err
}
func (s *controllerRPCService) GetServerMemStats(r *http.Request, arg *ServerArg, res *MemStatsRep) error {
return proxyRequest("Server.MemStats", arg.URL, arg, res)
}
func (s *controllerRPCService) GetServerDiskStats(r *http.Request, arg *ServerArg, res *DiskStatsRep) error {
return proxyRequest("Server.DiskStats", arg.URL, arg, res)
}
func (s *controllerRPCService) GetServerSysInfo(r *http.Request, arg *ServerArg, res *SysInfoRep) error {
return proxyRequest("Server.SysInfo", arg.URL, arg, res)
}
func (s *controllerRPCService) ListServers(r *http.Request, arg *ServerArg, res *ListRep) error {
res.List = s.serverList
return nil
}
func (s *controllerRPCService) GetServerVersion(r *http.Request, arg *ServerArg, res *VersionRep) error {
return proxyRequest("Server.Version", arg.URL, arg, res)
}

@ -54,7 +54,7 @@ func (s *ControllerRPCSuite) TearDownSuite(c *C) {
func (s *ControllerRPCSuite) TestMemStats(c *C) { func (s *ControllerRPCSuite) TestMemStats(c *C) {
op := rpcOperation{ op := rpcOperation{
Method: "Server.MemStats", Method: "Controller.GetServerMemStats",
Request: ServerArg{URL: testServerRPC.URL + "/rpc"}, Request: ServerArg{URL: testServerRPC.URL + "/rpc"},
} }
req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport) req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport)
@ -70,9 +70,27 @@ func (s *ControllerRPCSuite) TestMemStats(c *C) {
c.Assert(reply, Not(DeepEquals), MemStatsRep{}) c.Assert(reply, Not(DeepEquals), MemStatsRep{})
} }
func (s *ControllerRPCSuite) TestDiskStats(c *C) {
op := rpcOperation{
Method: "Controller.GetServerDiskStats",
Request: ServerArg{URL: testServerRPC.URL + "/rpc"},
}
req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport)
c.Assert(err, IsNil)
c.Assert(req.Get("Content-Type"), Equals, "application/json")
resp, err := req.Do()
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, http.StatusOK)
var reply MemStatsRep
c.Assert(json.DecodeClientResponse(resp.Body, &reply), IsNil)
resp.Body.Close()
c.Assert(reply, Not(DeepEquals), DiskStatsRep{})
}
func (s *ControllerRPCSuite) TestSysInfo(c *C) { func (s *ControllerRPCSuite) TestSysInfo(c *C) {
op := rpcOperation{ op := rpcOperation{
Method: "Server.SysInfo", Method: "Controller.GetServerSysInfo",
Request: ServerArg{URL: testServerRPC.URL + "/rpc"}, Request: ServerArg{URL: testServerRPC.URL + "/rpc"},
} }
req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport) req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport)
@ -90,7 +108,7 @@ func (s *ControllerRPCSuite) TestSysInfo(c *C) {
func (s *ControllerRPCSuite) TestServerList(c *C) { func (s *ControllerRPCSuite) TestServerList(c *C) {
op := rpcOperation{ op := rpcOperation{
Method: "Server.List", Method: "Controller.ListServers",
Request: ServerArg{URL: testServerRPC.URL + "/rpc"}, Request: ServerArg{URL: testServerRPC.URL + "/rpc"},
} }
req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport) req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport)
@ -108,7 +126,7 @@ func (s *ControllerRPCSuite) TestServerList(c *C) {
func (s *ControllerRPCSuite) TestServerAdd(c *C) { func (s *ControllerRPCSuite) TestServerAdd(c *C) {
op := rpcOperation{ op := rpcOperation{
Method: "Server.Add", Method: "Controller.AddServer",
Request: ServerArg{URL: testServerRPC.URL + "/rpc"}, Request: ServerArg{URL: testServerRPC.URL + "/rpc"},
} }
req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport) req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport)
@ -121,12 +139,12 @@ func (s *ControllerRPCSuite) TestServerAdd(c *C) {
var reply DefaultRep var reply DefaultRep
c.Assert(json.DecodeClientResponse(resp.Body, &reply), IsNil) c.Assert(json.DecodeClientResponse(resp.Body, &reply), IsNil)
resp.Body.Close() resp.Body.Close()
c.Assert(reply, Not(DeepEquals), DefaultRep{0, "Added"}) c.Assert(reply, Not(DeepEquals), DefaultRep{nil, "Added"})
} }
func (s *ControllerRPCSuite) TestAuth(c *C) { func (s *ControllerRPCSuite) TestAuth(c *C) {
op := rpcOperation{ op := rpcOperation{
Method: "Auth.Generate", Method: "Controller.GenerateAuth",
Request: AuthArgs{User: "newuser"}, Request: AuthArgs{User: "newuser"},
} }
req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport) req, err := newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport)
@ -136,16 +154,16 @@ func (s *ControllerRPCSuite) TestAuth(c *C) {
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, http.StatusOK) c.Assert(resp.StatusCode, Equals, http.StatusOK)
var reply AuthReply var reply AuthRep
c.Assert(json.DecodeClientResponse(resp.Body, &reply), IsNil) c.Assert(json.DecodeClientResponse(resp.Body, &reply), IsNil)
resp.Body.Close() resp.Body.Close()
c.Assert(reply, Not(DeepEquals), AuthReply{}) c.Assert(reply, Not(DeepEquals), AuthRep{})
c.Assert(len(reply.AccessKeyID), Equals, 20) c.Assert(len(reply.AccessKeyID), Equals, 20)
c.Assert(len(reply.SecretAccessKey), Equals, 40) c.Assert(len(reply.SecretAccessKey), Equals, 40)
c.Assert(len(reply.Name), Not(Equals), 0) c.Assert(len(reply.Name), Not(Equals), 0)
op = rpcOperation{ op = rpcOperation{
Method: "Auth.Fetch", Method: "Controller.FetchAuth",
Request: AuthArgs{User: "newuser"}, Request: AuthArgs{User: "newuser"},
} }
req, err = newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport) req, err = newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport)
@ -155,16 +173,16 @@ func (s *ControllerRPCSuite) TestAuth(c *C) {
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, http.StatusOK) c.Assert(resp.StatusCode, Equals, http.StatusOK)
var newReply AuthReply var newReply AuthRep
c.Assert(json.DecodeClientResponse(resp.Body, &newReply), IsNil) c.Assert(json.DecodeClientResponse(resp.Body, &newReply), IsNil)
resp.Body.Close() resp.Body.Close()
c.Assert(newReply, Not(DeepEquals), AuthReply{}) c.Assert(newReply, Not(DeepEquals), AuthRep{})
c.Assert(reply.AccessKeyID, Equals, newReply.AccessKeyID) c.Assert(reply.AccessKeyID, Equals, newReply.AccessKeyID)
c.Assert(reply.SecretAccessKey, Equals, newReply.SecretAccessKey) c.Assert(reply.SecretAccessKey, Equals, newReply.SecretAccessKey)
c.Assert(len(reply.Name), Not(Equals), 0) c.Assert(len(reply.Name), Not(Equals), 0)
op = rpcOperation{ op = rpcOperation{
Method: "Auth.Reset", Method: "Controller.ResetAuth",
Request: AuthArgs{User: "newuser"}, Request: AuthArgs{User: "newuser"},
} }
req, err = newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport) req, err = newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport)
@ -174,10 +192,10 @@ func (s *ControllerRPCSuite) TestAuth(c *C) {
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, http.StatusOK) c.Assert(resp.StatusCode, Equals, http.StatusOK)
var resetReply AuthReply var resetReply AuthRep
c.Assert(json.DecodeClientResponse(resp.Body, &resetReply), IsNil) c.Assert(json.DecodeClientResponse(resp.Body, &resetReply), IsNil)
resp.Body.Close() resp.Body.Close()
c.Assert(newReply, Not(DeepEquals), AuthReply{}) c.Assert(newReply, Not(DeepEquals), AuthRep{})
c.Assert(reply.AccessKeyID, Not(Equals), resetReply.AccessKeyID) c.Assert(reply.AccessKeyID, Not(Equals), resetReply.AccessKeyID)
c.Assert(reply.SecretAccessKey, Not(Equals), resetReply.SecretAccessKey) c.Assert(reply.SecretAccessKey, Not(Equals), resetReply.SecretAccessKey)
c.Assert(len(reply.Name), Not(Equals), 0) c.Assert(len(reply.Name), Not(Equals), 0)
@ -186,7 +204,7 @@ func (s *ControllerRPCSuite) TestAuth(c *C) {
/// generating access for existing user fails /// generating access for existing user fails
op = rpcOperation{ op = rpcOperation{
Method: "Auth.Generate", Method: "Controller.GenerateAuth",
Request: AuthArgs{User: "newuser"}, Request: AuthArgs{User: "newuser"},
} }
req, err = newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport) req, err = newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport)
@ -198,7 +216,7 @@ func (s *ControllerRPCSuite) TestAuth(c *C) {
/// null user provided invalid /// null user provided invalid
op = rpcOperation{ op = rpcOperation{
Method: "Auth.Generate", Method: "Controller.GenerateAuth",
Request: AuthArgs{User: ""}, Request: AuthArgs{User: ""},
} }
req, err = newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport) req, err = newRPCRequest(testControllerRPC.URL+"/rpc", op, http.DefaultTransport)

@ -16,11 +16,19 @@
package main package main
// Network properties of a server //// RPC params
type Network struct {
IP string `json:"address"` // AuthArgs auth params
Mask string `json:"netmask"` type AuthArgs struct {
Ethernet string `json:"networkInterface"` User string `json:"user"`
}
// DonutArgs collections of disks and name to initialize donut
type DonutArgs struct {
Name string
MaxSize uint64
Hostname string
Disks []string
} }
// ServerArg server metadata to identify a server // ServerArg server metadata to identify a server
@ -30,6 +38,8 @@ type ServerArg struct {
ID string `json:"id"` ID string `json:"id"`
} }
//// RPC replies
// ServerRep server reply container for Server.List // ServerRep server reply container for Server.List
type ServerRep struct { type ServerRep struct {
Name string `json:"name"` Name string `json:"name"`
@ -39,7 +49,7 @@ type ServerRep struct {
// DefaultRep default reply // DefaultRep default reply
type DefaultRep struct { type DefaultRep struct {
Error int64 `json:"error"` Error error `json:"error"`
Message string `json:"message"` Message string `json:"message"`
} }
@ -61,7 +71,11 @@ type MemStatsRep struct {
// NetStatsRep network statistics of a server // NetStatsRep network statistics of a server
type NetStatsRep struct { type NetStatsRep struct {
Interfaces []Network Interfaces []struct {
IP string `json:"address"`
Mask string `json:"netmask"`
Ethernet string `json:"networkInterface"`
}
} }
// SysInfoRep system information of a server // SysInfoRep system information of a server
@ -86,3 +100,10 @@ type VersionRep struct {
Architecture string `json:"arch"` Architecture string `json:"arch"`
OperatingSystem string `json:"os"` OperatingSystem string `json:"os"`
} }
// AuthRep reply with access keys and secret ids for the user
type AuthRep struct {
Name string `json:"name"`
AccessKeyID string `json:"accessKeyId"`
SecretAccessKey string `json:"secretAccessKey"`
}

@ -27,8 +27,8 @@ import (
type serverRPCService struct{} type serverRPCService struct{}
func (s *serverRPCService) Add(r *http.Request, arg *ServerArg, rep *DefaultRep) error { func (s *serverRPCService) Add(r *http.Request, arg *ServerArg, rep *DefaultRep) error {
rep.Error = 0 rep.Message = "Server " + arg.URL + " added successfully"
rep.Message = "Added successfully" rep.Error = nil
return nil return nil
} }
@ -58,7 +58,17 @@ func (s *serverRPCService) SysInfo(r *http.Request, arg *ServerArg, rep *SysInfo
} }
func (s *serverRPCService) NetStats(r *http.Request, arg *ServerArg, rep *NetStatsRep) error { func (s *serverRPCService) NetStats(r *http.Request, arg *ServerArg, rep *NetStatsRep) error {
rep.Interfaces = []Network{{"192.168.1.1", "255.255.255.0", "eth0"}} rep.Interfaces = []struct {
IP string `json:"address"`
Mask string `json:"netmask"`
Ethernet string `json:"networkInterface"`
}{
{
"192.168.1.1",
"255.255.255.0",
"eth0",
},
}
return nil return nil
} }

Loading…
Cancel
Save