add network read performance collection support. (#8038)

ReST API on /minio/admin/v1/performance?perfType=net[?size=N] 
returns

```
{
  "PEER-1": [
             {
	       "addr": ADDR,
	       "readPerf": DURATION,
	       "error": ERROR,
	     },
	     ...
	   ],
  ...
  ...
  "PEER-N": [
             {
	       "addr": ADDR,
	       "readPerf": DURATION,
	       "error": ERROR,
	     },
	     ...
	   ]
}
```
master
Bala FA 5 years ago committed by Nitish Tiwari
parent e5fb6294a7
commit 60f52f461f
  1. 47
      cmd/admin-handlers.go
  2. 49
      cmd/notification.go
  3. 32
      cmd/peer-rest-client.go
  4. 3
      cmd/peer-rest-common.go
  5. 81
      cmd/peer-rest-server.go

@ -31,6 +31,7 @@ import (
"strings"
"time"
humanize "github.com/dustin/go-humanize"
"github.com/gorilla/mux"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
@ -50,6 +51,7 @@ import (
const (
maxEConfigJSONSize = 262272
defaultNetPerfSize = 100 * humanize.MiByte
)
// Type-safe query params.
@ -304,6 +306,13 @@ type ServerMemUsageInfo struct {
HistoricUsage []mem.Usage `json:"historicUsage"`
}
// ServerNetReadPerfInfo network read performance information.
type ServerNetReadPerfInfo struct {
Addr string `json:"addr"`
ReadPerf time.Duration `json:"readPerf"`
Error string `json:"error,omitempty"`
}
// PerfInfoHandler - GET /minio/admin/v1/performance?perfType={perfType}
// ----------
// Get all performance information based on input type
@ -318,6 +327,44 @@ func (a adminAPIHandlers) PerfInfoHandler(w http.ResponseWriter, r *http.Request
vars := mux.Vars(r)
switch perfType := vars["perfType"]; perfType {
case "net":
var size int64 = defaultNetPerfSize
if sizeStr, found := vars["size"]; found {
var err error
if size, err = strconv.ParseInt(sizeStr, 10, 64); err != nil || size < 0 {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL)
return
}
}
storage := objectAPI.StorageInfo(ctx)
if !(storage.Backend.Type == BackendFS || storage.Backend.Type == BackendErasure) {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
return
}
addr := r.Host
if globalIsDistXL {
addr = GetLocalPeer(globalEndpoints)
}
infos := map[string][]ServerNetReadPerfInfo{}
infos[addr] = globalNotificationSys.NetReadPerfInfo(size)
for peer, info := range globalNotificationSys.CollectNetPerfInfo(size) {
infos[peer] = info
}
// Marshal API response
jsonBytes, err := json.Marshal(infos)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
// Reply with performance information (across nodes in a
// distributed setup) as json.
writeSuccessResponseJSON(w, jsonBytes)
case "drive":
info := objectAPI.StorageInfo(ctx)
if !(info.Backend.Type == BackendFS || info.Backend.Type == BackendErasure) {

@ -907,6 +907,55 @@ func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr {
return sys.send(args.BucketName, args.ToEvent(), targetIDs...)
}
// NetReadPerfInfo - Network read performance information.
func (sys *NotificationSys) NetReadPerfInfo(size int64) []ServerNetReadPerfInfo {
reply := make([]ServerNetReadPerfInfo, len(sys.peerClients))
// Execution is done serially.
for i, client := range sys.peerClients {
if client == nil {
continue
}
info, err := client.NetReadPerfInfo(size)
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
info.Addr = client.host.String()
info.Error = err.Error()
}
reply[i] = info
}
return reply
}
// CollectNetPerfInfo - Collect network performance information of all peers.
func (sys *NotificationSys) CollectNetPerfInfo(size int64) map[string][]ServerNetReadPerfInfo {
reply := map[string][]ServerNetReadPerfInfo{}
// Execution is done serially.
for _, client := range sys.peerClients {
if client == nil {
continue
}
info, err := client.CollectNetPerfInfo(size)
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
}
reply[client.host.String()] = info
}
return reply
}
// DrivePerfInfo - Drive speed (read and write) information
func (sys *NotificationSys) DrivePerfInfo() []ServerDrivesPerfInfo {
reply := make([]ServerDrivesPerfInfo, len(sys.peerClients))

@ -22,6 +22,7 @@ import (
"crypto/tls"
"encoding/gob"
"io"
"math/rand"
"net/url"
"strconv"
"time"
@ -107,6 +108,37 @@ func (client *peerRESTClient) Close() error {
// GetLocksResp stores various info from the client for each lock that is requested.
type GetLocksResp map[string][]lockRequesterInfo
// NetReadPerfInfo - fetch network read performance information for a remote node.
func (client *peerRESTClient) NetReadPerfInfo(size int64) (info ServerNetReadPerfInfo, err error) {
params := make(url.Values)
params.Set(peerRESTNetPerfSize, strconv.FormatInt(size, 10))
respBody, err := client.call(
peerRESTMethodNetReadPerfInfo,
params,
rand.New(rand.NewSource(time.Now().UnixNano())),
size,
)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
}
// CollectNetPerfInfo - collect network performance information of other peers.
func (client *peerRESTClient) CollectNetPerfInfo(size int64) (info []ServerNetReadPerfInfo, err error) {
params := make(url.Values)
params.Set(peerRESTNetPerfSize, strconv.FormatInt(size, 10))
respBody, err := client.call(peerRESTMethodCollectNetPerfInfo, params, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
}
// GetLocks - fetch older locks for a remote node.
func (client *peerRESTClient) GetLocks() (locks GetLocksResp, err error) {
respBody, err := client.call(peerRESTMethodGetLocks, nil, nil, -1)

@ -20,6 +20,8 @@ const peerRESTVersion = "v4"
const peerRESTPath = minioReservedBucketPath + "/peer/" + peerRESTVersion
const (
peerRESTMethodNetReadPerfInfo = "netreadperfinfo"
peerRESTMethodCollectNetPerfInfo = "collectnetperfinfo"
peerRESTMethodServerInfo = "serverinfo"
peerRESTMethodCPULoadInfo = "cpuloadinfo"
peerRESTMethodMemUsageInfo = "memusageinfo"
@ -51,6 +53,7 @@ const (
)
const (
peerRESTNetPerfSize = "netperfsize"
peerRESTBucket = "bucket"
peerRESTUser = "user"
peerRESTGroup = "group"

@ -21,6 +21,8 @@ import (
"encoding/gob"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"sort"
"strconv"
@ -105,6 +107,83 @@ func getPeerUptimes(serverInfo []ServerInfo) time.Duration {
return times[0]
}
// NetReadPerfInfoHandler - returns network read performance information.
func (s *peerRESTServer) NetReadPerfInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
params := mux.Vars(r)
sizeStr, found := params[peerRESTNetPerfSize]
if !found {
s.writeErrorResponse(w, errors.New("size is missing"))
return
}
size, err := strconv.ParseInt(sizeStr, 10, 64)
if err != nil || size < 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
start := time.Now()
n, err := io.CopyN(ioutil.Discard, r.Body, size)
end := time.Now()
if err != nil {
s.writeErrorResponse(w, err)
return
}
if n != size {
s.writeErrorResponse(w, fmt.Errorf("short read; expected: %v, got: %v", size, n))
return
}
addr := r.Host
if globalIsDistXL {
addr = GetLocalPeer(globalEndpoints)
}
info := ServerNetReadPerfInfo{
Addr: addr,
ReadPerf: end.Sub(start),
}
ctx := newContext(r, w, "NetReadPerfInfo")
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
w.(http.Flusher).Flush()
}
// CollectNetPerfInfoHandler - returns network performance information collected from other peers.
func (s *peerRESTServer) CollectNetPerfInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
params := mux.Vars(r)
sizeStr, found := params[peerRESTNetPerfSize]
if !found {
s.writeErrorResponse(w, errors.New("size is missing"))
return
}
size, err := strconv.ParseInt(sizeStr, 10, 64)
if err != nil || size < 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
info := globalNotificationSys.NetReadPerfInfo(size)
ctx := newContext(r, w, "CollectNetPerfInfo")
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
w.(http.Flusher).Flush()
}
// GetLocksHandler - returns list of older lock from the server.
func (s *peerRESTServer) GetLocksHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@ -847,6 +926,8 @@ func (s *peerRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
func registerPeerRESTHandlers(router *mux.Router) {
server := &peerRESTServer{}
subrouter := router.PathPrefix(peerRESTPath).Subrouter()
subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodNetReadPerfInfo).HandlerFunc(httpTraceHdrs(server.NetReadPerfInfoHandler))
subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodCollectNetPerfInfo).HandlerFunc(httpTraceHdrs(server.CollectNetPerfInfoHandler))
subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodGetLocks).HandlerFunc(httpTraceHdrs(server.GetLocksHandler))
subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodServerInfo).HandlerFunc(httpTraceHdrs(server.ServerInfoHandler))
subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodCPULoadInfo).HandlerFunc(httpTraceHdrs(server.CPULoadInfoHandler))

Loading…
Cancel
Save