Implement ServerConfig admin REST API (#3741)

Returns a valid config.json of the setup. In case of distributed
setup, it checks if quorum or more number of nodes have the same
config.json.
master
Krishnan Parthasarathi 8 years ago committed by Harshavardhana
parent 70d825c608
commit 2745bf2f1f
  1. 29
      cmd/admin-handlers.go
  2. 45
      cmd/admin-handlers_test.go
  3. 5
      cmd/admin-router.go
  4. 153
      cmd/admin-rpc-client.go
  5. 260
      cmd/admin-rpc-client_test.go
  6. 26
      cmd/admin-rpc-server.go
  7. 44
      cmd/admin-rpc-server_test.go
  8. 35
      pkg/madmin/API.md
  9. 49
      pkg/madmin/config-commands.go
  10. 52
      pkg/madmin/examples/get-config.go
  11. 3
      pkg/madmin/examples/heal-bucket.go

@ -672,3 +672,32 @@ func (adminAPI adminAPIHandlers) HealFormatHandler(w http.ResponseWriter, r *htt
// Return 200 on success.
writeSuccessResponseHeadersOnly(w)
}
// GetConfigHandler - GET /?config
// - x-minio-operation = get
// Get config.json of this minio setup.
func (adminAPI adminAPIHandlers) GetConfigHandler(w http.ResponseWriter, r *http.Request) {
// Validate request signature.
adminAPIErr := checkRequestAuthType(r, "", "", "")
if adminAPIErr != ErrNone {
writeErrorResponse(w, adminAPIErr, r.URL)
return
}
// check if objectLayer is initialized, if not return.
if newObjectLayerFn() == nil {
writeErrorResponse(w, ErrServerNotInitialized, r.URL)
return
}
// Get config.json from all nodes. In a single node setup, it
// returns local config.json.
configBytes, err := getPeerConfig(globalAdminPeers)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
errorIf(err, "Failed to get config from peers")
return
}
writeSuccessResponseJSON(w, configBytes)
}

@ -988,3 +988,48 @@ func TestHealFormatHandler(t *testing.T) {
t.Errorf("Expected to succeed but failed with %d", rec.Code)
}
}
// TestGetConfigHandler - test for GetConfigHandler.
func TestGetConfigHandler(t *testing.T) {
adminTestBed, err := prepareAdminXLTestBed()
if err != nil {
t.Fatal("Failed to initialize a single node XL backend for admin handler tests.")
}
defer adminTestBed.TearDown()
// Initialize admin peers to make admin RPC calls.
eps, err := parseStorageEndpoints([]string{"http://127.0.0.1"})
if err != nil {
t.Fatalf("Failed to parse storage end point - %v", err)
}
// Set globalMinioAddr to be able to distinguish local endpoints from remote.
globalMinioAddr = eps[0].Host
initGlobalAdminPeers(eps)
// Prepare query params for get-config mgmt REST API.
queryVal := url.Values{}
queryVal.Set("config", "")
req, err := newTestRequest("GET", "/?"+queryVal.Encode(), 0, nil)
if err != nil {
t.Fatalf("Failed to construct get-config object request - %v", err)
}
// Set x-minio-operation header to get.
req.Header.Set(minioAdminOpHeader, "get")
// Sign the request using signature v4.
cred := serverConfig.GetCredential()
err = signRequestV4(req, cred.AccessKey, cred.SecretKey)
if err != nil {
t.Fatalf("Failed to sign heal object request - %v", err)
}
rec := httptest.NewRecorder()
adminTestBed.mux.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Errorf("Expected to succeed but failed with %d", rec.Code)
}
}

@ -62,4 +62,9 @@ func registerAdminRouter(mux *router.Router) {
adminRouter.Methods("POST").Queries("heal", "").Headers(minioAdminOpHeader, "object").HandlerFunc(adminAPI.HealObjectHandler)
// Heal Format.
adminRouter.Methods("POST").Queries("heal", "").Headers(minioAdminOpHeader, "format").HandlerFunc(adminAPI.HealFormatHandler)
/// Config operations
// Get config
adminRouter.Methods("GET").Queries("config", "").Headers(minioAdminOpHeader, "get").HandlerFunc(adminAPI.GetConfigHandler)
}

@ -17,8 +17,11 @@
package cmd
import (
"encoding/json"
"errors"
"net/url"
"path"
"reflect"
"sort"
"sync"
"time"
@ -41,6 +44,7 @@ type adminCmdRunner interface {
ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error)
ReInitDisks() error
Uptime() (time.Duration, error)
GetConfig() ([]byte, error)
}
// Restart - Sends a message over channel to the go-routine
@ -112,6 +116,25 @@ func (rc remoteAdminClient) Uptime() (time.Duration, error) {
return reply.Uptime, nil
}
// GetConfig - returns config.json of the local server.
func (lc localAdminClient) GetConfig() ([]byte, error) {
if serverConfig == nil {
return nil, errors.New("config not present")
}
return json.Marshal(serverConfig)
}
// GetConfig - returns config.json of the remote server.
func (rc remoteAdminClient) GetConfig() ([]byte, error) {
args := AuthRPCArgs{}
reply := ConfigReply{}
if err := rc.Call("Admin.GetConfig", &args, &reply); err != nil {
return nil, err
}
return reply.Config, nil
}
// adminPeer - represents an entity that implements Restart methods.
type adminPeer struct {
addr string
@ -336,3 +359,133 @@ func getPeerUptimes(peers adminPeers) (time.Duration, error) {
return latestUptime, nil
}
// getPeerConfig - Fetches config.json from all nodes in the setup and
// returns the one that occurs in a majority of them.
func getPeerConfig(peers adminPeers) ([]byte, error) {
if !globalIsDistXL {
return peers[0].cmdRunner.GetConfig()
}
errs := make([]error, len(peers))
configs := make([][]byte, len(peers))
// Get config from all servers.
wg := sync.WaitGroup{}
for i, peer := range peers {
wg.Add(1)
go func(idx int, peer adminPeer) {
defer wg.Done()
configs[idx], errs[idx] = peer.cmdRunner.GetConfig()
}(i, peer)
}
wg.Wait()
// Find the maximally occurring config among peers in a
// distributed setup.
serverConfigs := make([]serverConfigV13, len(peers))
for i, configBytes := range configs {
if errs[i] != nil {
continue
}
// Unmarshal the received config files.
err := json.Unmarshal(configBytes, &serverConfigs[i])
if err != nil {
errorIf(err, "Failed to unmarshal serverConfig from ", peers[i].addr)
return nil, err
}
}
configJSON, err := getValidServerConfig(serverConfigs, errs)
if err != nil {
errorIf(err, "Unable to find a valid server config")
return nil, traceError(err)
}
// Return the config.json that was present quorum or more
// number of disks.
return json.Marshal(configJSON)
}
// getValidServerConfig - finds the server config that is present in
// quorum or more number of servers.
func getValidServerConfig(serverConfigs []serverConfigV13, errs []error) (serverConfigV13, error) {
// majority-based quorum
quorum := len(serverConfigs)/2 + 1
// Count the number of disks a config.json was found in.
configCounter := make([]int, len(serverConfigs))
// We group equal serverConfigs by the lowest index of the
// same value; e.g, let us take the following serverConfigs
// in a 4-node setup,
// serverConfigs == [c1, c2, c1, c1]
// configCounter == [3, 1, 0, 0]
// c1, c2 are the only distinct values that appear. c1 is
// identified by 0, the lowest index it appears in and c2 is
// identified by 1. So, we need to find the number of times
// each of these distinct values occur.
// Invariants:
// 1. At the beginning of the i-th iteration, the number of
// unique configurations seen so far is equal to the number of
// non-zero counter values in config[:i].
// 2. At the beginning of the i-th iteration, the sum of
// elements of configCounter[:i] is equal to the number of
// non-error configurations seen so far.
// For each of the serverConfig ...
for i := range serverConfigs {
// Skip nodes where getConfig failed.
if errs[i] != nil {
continue
}
// Check if it is equal to any of the configurations
// seen so far. If j == i is reached then we have an
// unseen configuration.
for j := 0; j <= i; j++ {
if j < i && configCounter[j] == 0 {
// serverConfigs[j] is known to be
// equal to a value that was already
// seen. See example above for
// clarity.
continue
} else if j < i && reflect.DeepEqual(serverConfigs[i], serverConfigs[j]) {
// serverConfigs[i] is equal to
// serverConfigs[j], update
// serverConfigs[j]'s counter since it
// is the lower index.
configCounter[j]++
break
} else if j == i {
// serverConfigs[i] is equal to no
// other value seen before. It is
// unique so far.
configCounter[i] = 1
break
} // else invariants specified above are violated.
}
}
// We find the maximally occurring server config and check if
// there is quorum.
var configJSON serverConfigV13
maxOccurrence := 0
for i, count := range configCounter {
if maxOccurrence < count {
maxOccurrence = count
configJSON = serverConfigs[i]
}
}
// If quorum nodes don't agree.
if maxOccurrence < quorum {
return serverConfigV13{}, errXLWriteQuorum
}
return configJSON, nil
}

@ -0,0 +1,260 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"encoding/json"
"reflect"
"testing"
)
var (
config1 = []byte(`{
"version": "13",
"credential": {
"accessKey": "minio",
"secretKey": "minio123"
},
"region": "us-east-1",
"logger": {
"console": {
"enable": true,
"level": "debug"
},
"file": {
"enable": false,
"fileName": "",
"level": ""
}
},
"notify": {
"amqp": {
"1": {
"enable": false,
"url": "",
"exchange": "",
"routingKey": "",
"exchangeType": "",
"mandatory": false,
"immediate": false,
"durable": false,
"internal": false,
"noWait": false,
"autoDeleted": false
}
},
"nats": {
"1": {
"enable": false,
"address": "",
"subject": "",
"username": "",
"password": "",
"token": "",
"secure": false,
"pingInterval": 0,
"streaming": {
"enable": false,
"clusterID": "",
"clientID": "",
"async": false,
"maxPubAcksInflight": 0
}
}
},
"elasticsearch": {
"1": {
"enable": false,
"url": "",
"index": ""
}
},
"redis": {
"1": {
"enable": false,
"address": "",
"password": "",
"key": ""
}
},
"postgresql": {
"1": {
"enable": false,
"connectionString": "",
"table": "",
"host": "",
"port": "",
"user": "",
"password": "",
"database": ""
}
},
"kafka": {
"1": {
"enable": false,
"brokers": null,
"topic": ""
}
},
"webhook": {
"1": {
"enable": false,
"endpoint": ""
}
}
}
}
`)
// diff from config1 - amqp.Enable is True
config2 = []byte(`{
"version": "13",
"credential": {
"accessKey": "minio",
"secretKey": "minio123"
},
"region": "us-east-1",
"logger": {
"console": {
"enable": true,
"level": "debug"
},
"file": {
"enable": false,
"fileName": "",
"level": ""
}
},
"notify": {
"amqp": {
"1": {
"enable": true,
"url": "",
"exchange": "",
"routingKey": "",
"exchangeType": "",
"mandatory": false,
"immediate": false,
"durable": false,
"internal": false,
"noWait": false,
"autoDeleted": false
}
},
"nats": {
"1": {
"enable": false,
"address": "",
"subject": "",
"username": "",
"password": "",
"token": "",
"secure": false,
"pingInterval": 0,
"streaming": {
"enable": false,
"clusterID": "",
"clientID": "",
"async": false,
"maxPubAcksInflight": 0
}
}
},
"elasticsearch": {
"1": {
"enable": false,
"url": "",
"index": ""
}
},
"redis": {
"1": {
"enable": false,
"address": "",
"password": "",
"key": ""
}
},
"postgresql": {
"1": {
"enable": false,
"connectionString": "",
"table": "",
"host": "",
"port": "",
"user": "",
"password": "",
"database": ""
}
},
"kafka": {
"1": {
"enable": false,
"brokers": null,
"topic": ""
}
},
"webhook": {
"1": {
"enable": false,
"endpoint": ""
}
}
}
}
`)
)
// TestGetValidServerConfig - test for getValidServerConfig.
func TestGetValidServerConfig(t *testing.T) {
var c1, c2 serverConfigV13
err := json.Unmarshal(config1, &c1)
if err != nil {
t.Fatalf("json unmarshal of %s failed: %v", string(config1), err)
}
err = json.Unmarshal(config2, &c2)
if err != nil {
t.Fatalf("json unmarshal of %s failed: %v", string(config2), err)
}
// Valid config.
noErrs := []error{nil, nil, nil, nil}
serverConfigs := []serverConfigV13{c1, c2, c1, c1}
validConfig, err := getValidServerConfig(serverConfigs, noErrs)
if err != nil {
t.Errorf("Expected a valid config but received %v instead", err)
}
if !reflect.DeepEqual(validConfig, c1) {
t.Errorf("Expected valid config to be %v but received %v", config1, validConfig)
}
// Invalid config - no quorum.
serverConfigs = []serverConfigV13{c1, c2, c2, c1}
validConfig, err = getValidServerConfig(serverConfigs, noErrs)
if err != errXLWriteQuorum {
t.Errorf("Expected to fail due to lack of quorum but received %v", err)
}
// All errors
allErrs := []error{errDiskNotFound, errDiskNotFound, errDiskNotFound, errDiskNotFound}
serverConfigs = []serverConfigV13{serverConfigV13{}, serverConfigV13{}, serverConfigV13{}, serverConfigV13{}}
validConfig, err = getValidServerConfig(serverConfigs, allErrs)
if err != errXLWriteQuorum {
t.Errorf("Expected to fail due to lack of quorum but received %v", err)
}
}

@ -17,6 +17,7 @@
package cmd
import (
"encoding/json"
"errors"
"net/rpc"
"time"
@ -54,6 +55,12 @@ type UptimeReply struct {
Uptime time.Duration
}
// ConfigReply - wraps the server config response over RPC.
type ConfigReply struct {
AuthRPCReply
Config []byte // json-marshalled bytes of serverConfigV13
}
// Restart - Restart this instance of minio server.
func (s *adminCmd) Restart(args *AuthRPCArgs, reply *AuthRPCReply) error {
if err := args.IsAuthenticated(); err != nil {
@ -132,6 +139,25 @@ func (s *adminCmd) Uptime(args *AuthRPCArgs, reply *UptimeReply) error {
return nil
}
// GetConfig - returns the config.json of this server.
func (s *adminCmd) GetConfig(args *AuthRPCArgs, reply *ConfigReply) error {
if err := args.IsAuthenticated(); err != nil {
return err
}
if serverConfig == nil {
return errors.New("config not present")
}
jsonBytes, err := json.Marshal(serverConfig)
if err != nil {
return err
}
reply.Config = jsonBytes
return nil
}
// registerAdminRPCRouter - registers RPC methods for service status,
// stop and restart commands.
func registerAdminRPCRouter(mux *router.Router) error {

@ -17,6 +17,7 @@
package cmd
import (
"encoding/json"
"net/url"
"testing"
"time"
@ -144,3 +145,46 @@ func TestReInitDisks(t *testing.T) {
errUnsupportedBackend, err)
}
}
func TestGetConfig(t *testing.T) {
// Reset global variables to start afresh.
resetTestGlobals()
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("Unable to initialize server config. %s", err)
}
defer removeAll(rootPath)
adminServer := adminCmd{}
creds := serverConfig.GetCredential()
args := LoginRPCArgs{
Username: creds.AccessKey,
Password: creds.SecretKey,
Version: Version,
RequestTime: time.Now().UTC(),
}
reply := LoginRPCReply{}
err = adminServer.Login(&args, &reply)
if err != nil {
t.Fatalf("Failed to login to admin server - %v", err)
}
authArgs := AuthRPCArgs{
AuthToken: reply.AuthToken,
RequestTime: time.Now().UTC(),
}
configReply := ConfigReply{}
err = adminServer.GetConfig(&authArgs, &configReply)
if err != nil {
t.Errorf("Expected GetConfig to pass but failed with %v", err)
}
var config serverConfigV13
err = json.Unmarshal(configReply.Config, &config)
if err != nil {
t.Errorf("Expected json unmarshal to pass but failed with %v", err)
}
}

@ -36,13 +36,13 @@ func main() {
```
| Service operations|LockInfo operations|Healing operations|
|:---|:---|:---|
|[`ServiceStatus`](#ServiceStatus)| [`ListLocks`](#ListLocks)| [`ListObjectsHeal`](#ListObjectsHeal)|
|[`ServiceRestart`](#ServiceRestart)| [`ClearLocks`](#ClearLocks)| [`ListBucketsHeal`](#ListBucketsHeal)|
| | |[`HealBucket`](#HealBucket) |
| | |[`HealObject`](#HealObject)|
| | |[`HealFormat`](#HealFormat)|
| Service operations|LockInfo operations|Healing operations|Config operations|
|:---|:---|:---|:---|
|[`ServiceStatus`](#ServiceStatus)| [`ListLocks`](#ListLocks)| [`ListObjectsHeal`](#ListObjectsHeal)|[`GetConfig`](#GetConfig)|
|[`ServiceRestart`](#ServiceRestart)| [`ClearLocks`](#ClearLocks)| [`ListBucketsHeal`](#ListBucketsHeal)||
| | |[`HealBucket`](#HealBucket) ||
| | |[`HealObject`](#HealObject)||
| | |[`HealFormat`](#HealFormat)||
## 1. Constructor
<a name="Minio"></a>
@ -272,3 +272,24 @@ __Example__
log.Println("successfully healed storage format on available disks.")
```
<a name="GetConfig"></a>
### GetConfig() ([]byte, error)
Get config.json of a minio setup.
__Example__
``` go
configBytes, err := madmClnt.GetConfig()
if err != nil {
log.Fatalf("failed due to: %v", err)
}
// Pretty-print config received as json.
var buf bytes.Buffer
err = json.Indent(buf, configBytes, "", "\t")
if err != nil {
log.Fatalf("failed due to: %v", err)
}
log.Println("config received successfully: ", string(buf.Bytes()))
```

@ -0,0 +1,49 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package madmin
import (
"io/ioutil"
"net/http"
"net/url"
)
// GetConfig - returns the config.json of a minio setup.
func (adm *AdminClient) GetConfig() ([]byte, error) {
queryVal := make(url.Values)
queryVal.Set("config", "")
hdrs := make(http.Header)
hdrs.Set(minioAdminOpHeader, "get")
reqData := requestData{
queryValues: queryVal,
customHeaders: hdrs,
}
// Execute GET on /?lock to list locks.
resp, err := adm.executeMethod("GET", reqData)
defer closeResponse(resp)
if err != nil {
return nil, err
}
// Return the JSON marshalled bytes to user.
return ioutil.ReadAll(resp.Body)
}

@ -0,0 +1,52 @@
/* +build ignore
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package main
import (
"bytes"
"encoding/json"
"log"
"github.com/minio/minio/pkg/madmin"
)
func main() {
// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are
// dummy values, please replace them with original values.
// API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise.
// New returns an Minio Admin client object.
madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true)
if err != nil {
log.Fatalln(err)
}
configBytes, err := madmClnt.GetConfig()
if err != nil {
log.Fatalf("failed due to: %v", err)
}
// Pretty-print config received as json.
var buf bytes.Buffer
err = json.Indent(&buf, configBytes, "", "\t")
if err != nil {
log.Fatalf("failed due to: %v", err)
}
log.Println("config received successfully: ", string(buf.Bytes()))
}

@ -29,9 +29,6 @@ func main() {
// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are
// dummy values, please replace them with original values.
// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are
// dummy values, please replace them with original values.
// API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise.
// New returns an Minio Admin client object.
madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true)

Loading…
Cancel
Save