From 60f52f461fd57e7526c04a352b7a94683e07e6a5 Mon Sep 17 00:00:00 2001 From: Bala FA Date: Mon, 19 Aug 2019 02:56:32 +0000 Subject: [PATCH] add network read performance collection support. (#8038) ReST API on /minio/admin/v1/performance?perfType=net[?size=N] returns ``` { "PEER-1": [ { "addr": ADDR, "readPerf": DURATION, "error": ERROR, }, ... ], ... ... "PEER-N": [ { "addr": ADDR, "readPerf": DURATION, "error": ERROR, }, ... ] } ``` --- cmd/admin-handlers.go | 47 ++++++++++++++++++++++++ cmd/notification.go | 49 +++++++++++++++++++++++++ cmd/peer-rest-client.go | 32 ++++++++++++++++ cmd/peer-rest-common.go | 3 ++ cmd/peer-rest-server.go | 81 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 212 insertions(+) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index a392f9c65..a0e74887a 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -31,6 +31,7 @@ import ( "strings" "time" + humanize "github.com/dustin/go-humanize" "github.com/gorilla/mux" "github.com/tidwall/gjson" "github.com/tidwall/sjson" @@ -50,6 +51,7 @@ import ( const ( maxEConfigJSONSize = 262272 + defaultNetPerfSize = 100 * humanize.MiByte ) // Type-safe query params. @@ -304,6 +306,13 @@ type ServerMemUsageInfo struct { HistoricUsage []mem.Usage `json:"historicUsage"` } +// ServerNetReadPerfInfo network read performance information. +type ServerNetReadPerfInfo struct { + Addr string `json:"addr"` + ReadPerf time.Duration `json:"readPerf"` + Error string `json:"error,omitempty"` +} + // PerfInfoHandler - GET /minio/admin/v1/performance?perfType={perfType} // ---------- // Get all performance information based on input type @@ -318,6 +327,44 @@ func (a adminAPIHandlers) PerfInfoHandler(w http.ResponseWriter, r *http.Request vars := mux.Vars(r) switch perfType := vars["perfType"]; perfType { + case "net": + var size int64 = defaultNetPerfSize + if sizeStr, found := vars["size"]; found { + var err error + if size, err = strconv.ParseInt(sizeStr, 10, 64); err != nil || size < 0 { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL) + return + } + } + + storage := objectAPI.StorageInfo(ctx) + if !(storage.Backend.Type == BackendFS || storage.Backend.Type == BackendErasure) { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL) + return + } + + addr := r.Host + if globalIsDistXL { + addr = GetLocalPeer(globalEndpoints) + } + + infos := map[string][]ServerNetReadPerfInfo{} + infos[addr] = globalNotificationSys.NetReadPerfInfo(size) + for peer, info := range globalNotificationSys.CollectNetPerfInfo(size) { + infos[peer] = info + } + + // Marshal API response + jsonBytes, err := json.Marshal(infos) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + + // Reply with performance information (across nodes in a + // distributed setup) as json. + writeSuccessResponseJSON(w, jsonBytes) + case "drive": info := objectAPI.StorageInfo(ctx) if !(info.Backend.Type == BackendFS || info.Backend.Type == BackendErasure) { diff --git a/cmd/notification.go b/cmd/notification.go index 7b2952ce7..98137fca2 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -907,6 +907,55 @@ func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr { return sys.send(args.BucketName, args.ToEvent(), targetIDs...) } +// NetReadPerfInfo - Network read performance information. +func (sys *NotificationSys) NetReadPerfInfo(size int64) []ServerNetReadPerfInfo { + reply := make([]ServerNetReadPerfInfo, len(sys.peerClients)) + + // Execution is done serially. + for i, client := range sys.peerClients { + if client == nil { + continue + } + + info, err := client.NetReadPerfInfo(size) + if err != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String()) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, err) + + info.Addr = client.host.String() + info.Error = err.Error() + } + + reply[i] = info + } + + return reply +} + +// CollectNetPerfInfo - Collect network performance information of all peers. +func (sys *NotificationSys) CollectNetPerfInfo(size int64) map[string][]ServerNetReadPerfInfo { + reply := map[string][]ServerNetReadPerfInfo{} + + // Execution is done serially. + for _, client := range sys.peerClients { + if client == nil { + continue + } + + info, err := client.CollectNetPerfInfo(size) + if err != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String()) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, err) + } + + reply[client.host.String()] = info + } + + return reply +} + // DrivePerfInfo - Drive speed (read and write) information func (sys *NotificationSys) DrivePerfInfo() []ServerDrivesPerfInfo { reply := make([]ServerDrivesPerfInfo, len(sys.peerClients)) diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 352171b98..0cb8f5a75 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -22,6 +22,7 @@ import ( "crypto/tls" "encoding/gob" "io" + "math/rand" "net/url" "strconv" "time" @@ -107,6 +108,37 @@ func (client *peerRESTClient) Close() error { // GetLocksResp stores various info from the client for each lock that is requested. type GetLocksResp map[string][]lockRequesterInfo +// NetReadPerfInfo - fetch network read performance information for a remote node. +func (client *peerRESTClient) NetReadPerfInfo(size int64) (info ServerNetReadPerfInfo, err error) { + params := make(url.Values) + params.Set(peerRESTNetPerfSize, strconv.FormatInt(size, 10)) + respBody, err := client.call( + peerRESTMethodNetReadPerfInfo, + params, + rand.New(rand.NewSource(time.Now().UnixNano())), + size, + ) + if err != nil { + return + } + defer http.DrainBody(respBody) + err = gob.NewDecoder(respBody).Decode(&info) + return info, err +} + +// CollectNetPerfInfo - collect network performance information of other peers. +func (client *peerRESTClient) CollectNetPerfInfo(size int64) (info []ServerNetReadPerfInfo, err error) { + params := make(url.Values) + params.Set(peerRESTNetPerfSize, strconv.FormatInt(size, 10)) + respBody, err := client.call(peerRESTMethodCollectNetPerfInfo, params, nil, -1) + if err != nil { + return + } + defer http.DrainBody(respBody) + err = gob.NewDecoder(respBody).Decode(&info) + return info, err +} + // GetLocks - fetch older locks for a remote node. func (client *peerRESTClient) GetLocks() (locks GetLocksResp, err error) { respBody, err := client.call(peerRESTMethodGetLocks, nil, nil, -1) diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 5faba48e6..bf5e27023 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -20,6 +20,8 @@ const peerRESTVersion = "v4" const peerRESTPath = minioReservedBucketPath + "/peer/" + peerRESTVersion const ( + peerRESTMethodNetReadPerfInfo = "netreadperfinfo" + peerRESTMethodCollectNetPerfInfo = "collectnetperfinfo" peerRESTMethodServerInfo = "serverinfo" peerRESTMethodCPULoadInfo = "cpuloadinfo" peerRESTMethodMemUsageInfo = "memusageinfo" @@ -51,6 +53,7 @@ const ( ) const ( + peerRESTNetPerfSize = "netperfsize" peerRESTBucket = "bucket" peerRESTUser = "user" peerRESTGroup = "group" diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 5ca851a12..4b624e709 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -21,6 +21,8 @@ import ( "encoding/gob" "errors" "fmt" + "io" + "io/ioutil" "net/http" "sort" "strconv" @@ -105,6 +107,83 @@ func getPeerUptimes(serverInfo []ServerInfo) time.Duration { return times[0] } +// NetReadPerfInfoHandler - returns network read performance information. +func (s *peerRESTServer) NetReadPerfInfoHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + params := mux.Vars(r) + + sizeStr, found := params[peerRESTNetPerfSize] + if !found { + s.writeErrorResponse(w, errors.New("size is missing")) + return + } + + size, err := strconv.ParseInt(sizeStr, 10, 64) + if err != nil || size < 0 { + s.writeErrorResponse(w, errInvalidArgument) + return + } + + start := time.Now() + n, err := io.CopyN(ioutil.Discard, r.Body, size) + end := time.Now() + + if err != nil { + s.writeErrorResponse(w, err) + return + } + + if n != size { + s.writeErrorResponse(w, fmt.Errorf("short read; expected: %v, got: %v", size, n)) + return + } + + addr := r.Host + if globalIsDistXL { + addr = GetLocalPeer(globalEndpoints) + } + + info := ServerNetReadPerfInfo{ + Addr: addr, + ReadPerf: end.Sub(start), + } + + ctx := newContext(r, w, "NetReadPerfInfo") + logger.LogIf(ctx, gob.NewEncoder(w).Encode(info)) + w.(http.Flusher).Flush() +} + +// CollectNetPerfInfoHandler - returns network performance information collected from other peers. +func (s *peerRESTServer) CollectNetPerfInfoHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + params := mux.Vars(r) + sizeStr, found := params[peerRESTNetPerfSize] + if !found { + s.writeErrorResponse(w, errors.New("size is missing")) + return + } + + size, err := strconv.ParseInt(sizeStr, 10, 64) + if err != nil || size < 0 { + s.writeErrorResponse(w, errInvalidArgument) + return + } + + info := globalNotificationSys.NetReadPerfInfo(size) + + ctx := newContext(r, w, "CollectNetPerfInfo") + logger.LogIf(ctx, gob.NewEncoder(w).Encode(info)) + w.(http.Flusher).Flush() +} + // GetLocksHandler - returns list of older lock from the server. func (s *peerRESTServer) GetLocksHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -847,6 +926,8 @@ func (s *peerRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool { func registerPeerRESTHandlers(router *mux.Router) { server := &peerRESTServer{} subrouter := router.PathPrefix(peerRESTPath).Subrouter() + subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodNetReadPerfInfo).HandlerFunc(httpTraceHdrs(server.NetReadPerfInfoHandler)) + subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodCollectNetPerfInfo).HandlerFunc(httpTraceHdrs(server.CollectNetPerfInfoHandler)) subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodGetLocks).HandlerFunc(httpTraceHdrs(server.GetLocksHandler)) subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodServerInfo).HandlerFunc(httpTraceHdrs(server.ServerInfoHandler)) subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodCPULoadInfo).HandlerFunc(httpTraceHdrs(server.CPULoadInfoHandler))