From 8757c963baad01dc87f37a984c455cd070c41c71 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 14 Jan 2019 12:14:20 +0530 Subject: [PATCH] Migrate all Peer communication to common Notification subsystem (#7031) Deprecate the use of Admin Peers concept and migrate all peer communication to Notification subsystem. This finally allows for a common subsystem for all peer notification in case of distributed server deployments. --- cmd/admin-handlers.go | 273 +++++++++++++++++---------------- cmd/admin-handlers_test.go | 5 - cmd/admin-heal-ops.go | 8 +- cmd/admin-rpc-client.go | 261 ------------------------------- cmd/admin-rpc-server.go | 86 ----------- cmd/admin-rpc_test.go | 161 ------------------- cmd/gateway-main.go | 9 -- cmd/globals.go | 3 - cmd/local-admin-client.go | 112 -------------- cmd/local-admin-client_test.go | 29 ---- cmd/notification.go | 269 ++++++++++++++++++++++++++------ cmd/peer-rpc-client.go | 32 ++++ cmd/peer-rpc-server.go | 118 +++++++++++++- cmd/routers.go | 5 +- cmd/server-main.go | 3 - cmd/sts-handlers.go | 16 +- cmd/utils.go | 20 +++ cmd/web-handlers.go | 6 +- 18 files changed, 545 insertions(+), 871 deletions(-) delete mode 100644 cmd/admin-rpc-client.go delete mode 100644 cmd/admin-rpc-server.go delete mode 100644 cmd/admin-rpc_test.go delete mode 100644 cmd/local-admin-client.go delete mode 100644 cmd/local-admin-client_test.go diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 51bd4e431..4fafd5501 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -17,22 +17,22 @@ package cmd import ( - "archive/zip" "bytes" "context" "encoding/base64" "encoding/json" "errors" - "fmt" "io" "net/http" "os" "strconv" "strings" - "sync" "time" "github.com/gorilla/mux" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" + "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/cpu" @@ -41,9 +41,8 @@ import ( "github.com/minio/minio/pkg/iam/policy" "github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/mem" + xnet "github.com/minio/minio/pkg/net" "github.com/minio/minio/pkg/quick" - "github.com/tidwall/gjson" - "github.com/tidwall/sjson" ) const ( @@ -97,6 +96,11 @@ func (a adminAPIHandlers) VersionHandler(w http.ResponseWriter, r *http.Request) func (a adminAPIHandlers) ServiceStatusHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "ServiceStatus") + if globalNotificationSys == nil { + writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL) + return + } + adminAPIErr := checkAdminRequestAuthType(ctx, r, "") if adminAPIErr != ErrNone { writeErrorResponseJSON(w, adminAPIErr, r.URL) @@ -109,13 +113,8 @@ func (a adminAPIHandlers) ServiceStatusHandler(w http.ResponseWriter, r *http.Re CommitID: CommitID, } - // Fetch uptimes from all peers. This may fail to due to lack - // of read-quorum availability. - uptime, err := getPeerUptimes(globalAdminPeers) - if err != nil { - writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL) - return - } + // Fetch uptimes from all peers and pick the latest. + uptime := getPeerUptimes(globalNotificationSys.ServerInfo(ctx)) // Create API response serverStatus := madmin.ServiceStatus{ @@ -129,6 +128,7 @@ func (a adminAPIHandlers) ServiceStatusHandler(w http.ResponseWriter, r *http.Re writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL) return } + // Reply with storage information (across nodes in a // distributed setup) as json. writeSuccessResponseJSON(w, jsonBytes) @@ -142,6 +142,11 @@ func (a adminAPIHandlers) ServiceStatusHandler(w http.ResponseWriter, r *http.Re func (a adminAPIHandlers) ServiceStopNRestartHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "ServiceStopNRestart") + if globalNotificationSys == nil { + writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL) + return + } + adminAPIErr := checkAdminRequestAuthType(ctx, r, "") if adminAPIErr != ErrNone { writeErrorResponseJSON(w, adminAPIErr, r.URL) @@ -151,7 +156,6 @@ func (a adminAPIHandlers) ServiceStopNRestartHandler(w http.ResponseWriter, r *h var sa madmin.ServiceAction err := json.NewDecoder(r.Body).Decode(&sa) if err != nil { - logger.LogIf(ctx, err) writeErrorResponseJSON(w, ErrRequestBodyParse, r.URL) return } @@ -171,7 +175,15 @@ func (a adminAPIHandlers) ServiceStopNRestartHandler(w http.ResponseWriter, r *h // Reply to the client before restarting minio server. writeSuccessResponseHeadersOnly(w) - sendServiceCmd(globalAdminPeers, serviceSig) + // Notify all other Minio peers signal service. + for _, nerr := range globalNotificationSys.SignalService(serviceSig) { + if nerr.Err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) + logger.LogIf(ctx, nerr.Err) + } + } + + globalServiceSignalCh <- serviceSig } // ServerProperties holds some server information such as, version, region @@ -235,6 +247,12 @@ type ServerInfo struct { func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "ServerInfo") + objectAPI := newObjectLayerFn() + if objectAPI == nil || globalNotificationSys == nil { + writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL) + return + } + // Authenticate request // Setting the region as empty so as the mc server info command is irrespective to the region. @@ -244,39 +262,33 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque return } - // Web service response - reply := make([]ServerInfo, len(globalAdminPeers)) - - var wg sync.WaitGroup - - // Gather server information for all nodes - for i, p := range globalAdminPeers { - wg.Add(1) - - // Gather information from a peer in a goroutine - go func(idx int, peer adminPeer) { - defer wg.Done() - - // Initialize server info at index - reply[idx] = ServerInfo{Addr: peer.addr} - - serverInfoData, err := peer.cmdRunner.ServerInfo() - if err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", peer.addr) - ctx := logger.SetReqInfo(ctx, reqInfo) - logger.LogIf(ctx, err) - reply[idx].Error = err.Error() - return - } - - reply[idx].Data = &serverInfoData - }(i, p) + thisAddr, err := xnet.ParseHost(GetLocalPeer(globalEndpoints)) + if err != nil { + writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL) + return } - wg.Wait() + serverInfo := globalNotificationSys.ServerInfo(ctx) + // Once we have recieved all the ServerInfo from peers + // add the local peer server info as well. + serverInfo = append(serverInfo, ServerInfo{ + Addr: thisAddr.String(), + Data: &ServerInfoData{ + StorageInfo: objectAPI.StorageInfo(ctx), + ConnStats: globalConnStats.toServerConnStats(), + HTTPStats: globalHTTPStats.toServerHTTPStats(), + Properties: ServerProperties{ + Uptime: UTCNow().Sub(globalBootTime), + Version: Version, + CommitID: CommitID, + SQSARN: globalNotificationSys.GetARNList(), + Region: globalServerConfig.GetRegion(), + }, + }, + }) // Marshal API response - jsonBytes, err := json.Marshal(reply) + jsonBytes, err := json.Marshal(serverInfo) if err != nil { writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL) return @@ -323,7 +335,7 @@ func (a adminAPIHandlers) PerfInfoHandler(w http.ResponseWriter, r *http.Request // Get object layer instance. objLayer := newObjectLayerFn() - if objLayer == nil { + if objLayer == nil || globalNotificationSys == nil { writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL) return } @@ -417,6 +429,11 @@ type StartProfilingResult struct { func (a adminAPIHandlers) StartProfilingHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "StartProfiling") + if globalNotificationSys == nil { + writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL) + return + } + adminAPIErr := checkAdminRequestAuthType(ctx, r, "") if adminAPIErr != ErrNone { writeErrorResponseJSON(w, adminAPIErr, r.URL) @@ -426,31 +443,53 @@ func (a adminAPIHandlers) StartProfilingHandler(w http.ResponseWriter, r *http.R vars := mux.Vars(r) profiler := vars["profilerType"] - startProfilingResult := make([]StartProfilingResult, len(globalAdminPeers)) - - // Call StartProfiling function on all nodes and save results - wg := sync.WaitGroup{} - for i, peer := range globalAdminPeers { - wg.Add(1) - go func(idx int, peer adminPeer) { - defer wg.Done() - result := StartProfilingResult{NodeName: peer.addr} - if err := peer.cmdRunner.StartProfiling(profiler); err != nil { - result.Error = err.Error() - return - } + thisAddr, err := xnet.ParseHost(GetLocalPeer(globalEndpoints)) + if err != nil { + writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL) + return + } + + // Start profiling on remote servers. + hostErrs := globalNotificationSys.StartProfiling(profiler) + + // Start profiling locally as well. + { + if globalProfiler != nil { + globalProfiler.Stop() + } + prof, err := startProfiler(profiler, "") + if err != nil { + hostErrs = append(hostErrs, NotificationPeerErr{ + Host: *thisAddr, + Err: err, + }) + } else { + globalProfiler = prof + hostErrs = append(hostErrs, NotificationPeerErr{ + Host: *thisAddr, + }) + } + } + + var startProfilingResult []StartProfilingResult + + for _, nerr := range hostErrs { + result := StartProfilingResult{NodeName: nerr.Host.String()} + if nerr.Err != nil { + result.Error = nerr.Err.Error() + } else { result.Success = true - startProfilingResult[idx] = result - }(i, peer) + } + startProfilingResult = append(startProfilingResult, result) } - wg.Wait() // Create JSON result and send it to the client startProfilingResultInBytes, err := json.Marshal(startProfilingResult) if err != nil { - writeCustomErrorResponseJSON(w, http.StatusInternalServerError, err.Error(), r.URL) + writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL) return } + writeSuccessResponseJSON(w, []byte(startProfilingResultInBytes)) } @@ -478,51 +517,18 @@ func (f dummyFileInfo) Sys() interface{} { return f.sys } func (a adminAPIHandlers) DownloadProfilingHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "DownloadProfiling") + if globalNotificationSys == nil { + writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL) + return + } + adminAPIErr := checkAdminRequestAuthType(ctx, r, "") if adminAPIErr != ErrNone { writeErrorResponseJSON(w, adminAPIErr, r.URL) return } - profilingDataFound := false - - // Initialize a zip writer which will provide a zipped content - // of profiling data of all nodes - zipWriter := zip.NewWriter(w) - defer zipWriter.Close() - - for i, peer := range globalAdminPeers { - // Get profiling data from a node - data, err := peer.cmdRunner.DownloadProfilingData() - if err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to download profiling data from node `%s`, reason: %s", peer.addr, err.Error())) - continue - } - - profilingDataFound = true - - // Send profiling data to zip as file - header, err := zip.FileInfoHeader(dummyFileInfo{ - name: fmt.Sprintf("profiling-%d", i), - size: int64(len(data)), - mode: 0600, - modTime: time.Now().UTC(), - isDir: false, - sys: nil, - }) - if err != nil { - continue - } - writer, err := zipWriter.CreateHeader(header) - if err != nil { - continue - } - if _, err = io.Copy(writer, bytes.NewBuffer(data)); err != nil { - return - } - } - - if !profilingDataFound { + if !globalNotificationSys.DownloadProfilingData(ctx, w) { writeErrorResponseJSON(w, ErrAdminProfilerNotEnabled, r.URL) return } @@ -952,6 +958,11 @@ func (a adminAPIHandlers) ListUsers(w http.ResponseWriter, r *http.Request) { func (a adminAPIHandlers) SetUserStatus(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "SetUserStatus") + if globalNotificationSys == nil || globalIAMSys == nil { + writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL) + return + } + // Get current object layer instance. objectAPI := newObjectLayerFn() if objectAPI == nil { @@ -988,10 +999,10 @@ func (a adminAPIHandlers) SetUserStatus(w http.ResponseWriter, r *http.Request) } // Notify all other Minio peers to reload users - for host, err := range globalNotificationSys.LoadUsers() { - if err != nil { - logger.GetReqInfo(ctx).SetTags("peerAddress", host.String()) - logger.LogIf(ctx, err) + for _, nerr := range globalNotificationSys.LoadUsers() { + if nerr.Err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) + logger.LogIf(ctx, nerr.Err) } } } @@ -1002,7 +1013,7 @@ func (a adminAPIHandlers) AddUser(w http.ResponseWriter, r *http.Request) { // Get current object layer instance. objectAPI := newObjectLayerFn() - if objectAPI == nil { + if objectAPI == nil || globalNotificationSys == nil || globalIAMSys == nil { writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL) return } @@ -1056,10 +1067,10 @@ func (a adminAPIHandlers) AddUser(w http.ResponseWriter, r *http.Request) { } // Notify all other Minio peers to reload users - for host, err := range globalNotificationSys.LoadUsers() { - if err != nil { - logger.GetReqInfo(ctx).SetTags("peerAddress", host.String()) - logger.LogIf(ctx, err) + for _, nerr := range globalNotificationSys.LoadUsers() { + if nerr.Err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) + logger.LogIf(ctx, nerr.Err) } } } @@ -1070,7 +1081,7 @@ func (a adminAPIHandlers) ListCannedPolicies(w http.ResponseWriter, r *http.Requ // Get current object layer instance. objectAPI := newObjectLayerFn() - if objectAPI == nil { + if objectAPI == nil || globalIAMSys == nil || globalNotificationSys == nil { writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL) return } @@ -1102,7 +1113,7 @@ func (a adminAPIHandlers) RemoveCannedPolicy(w http.ResponseWriter, r *http.Requ // Get current object layer instance. objectAPI := newObjectLayerFn() - if objectAPI == nil { + if objectAPI == nil || globalIAMSys == nil || globalNotificationSys == nil { writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL) return } @@ -1129,10 +1140,10 @@ func (a adminAPIHandlers) RemoveCannedPolicy(w http.ResponseWriter, r *http.Requ } // Notify all other Minio peers to reload users - for host, err := range globalNotificationSys.LoadUsers() { - if err != nil { - logger.GetReqInfo(ctx).SetTags("peerAddress", host.String()) - logger.LogIf(ctx, err) + for _, nerr := range globalNotificationSys.LoadUsers() { + if nerr.Err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) + logger.LogIf(ctx, nerr.Err) } } } @@ -1143,7 +1154,7 @@ func (a adminAPIHandlers) AddCannedPolicy(w http.ResponseWriter, r *http.Request // Get current object layer instance. objectAPI := newObjectLayerFn() - if objectAPI == nil { + if objectAPI == nil || globalIAMSys == nil || globalNotificationSys == nil { writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL) return } @@ -1194,10 +1205,10 @@ func (a adminAPIHandlers) AddCannedPolicy(w http.ResponseWriter, r *http.Request } // Notify all other Minio peers to reload users - for host, err := range globalNotificationSys.LoadUsers() { - if err != nil { - logger.GetReqInfo(ctx).SetTags("peerAddress", host.String()) - logger.LogIf(ctx, err) + for _, nerr := range globalNotificationSys.LoadUsers() { + if nerr.Err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) + logger.LogIf(ctx, nerr.Err) } } } @@ -1208,7 +1219,7 @@ func (a adminAPIHandlers) SetUserPolicy(w http.ResponseWriter, r *http.Request) // Get current object layer instance. objectAPI := newObjectLayerFn() - if objectAPI == nil { + if objectAPI == nil || globalIAMSys == nil || globalNotificationSys == nil { writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL) return } @@ -1241,10 +1252,10 @@ func (a adminAPIHandlers) SetUserPolicy(w http.ResponseWriter, r *http.Request) } // Notify all other Minio peers to reload users - for host, err := range globalNotificationSys.LoadUsers() { - if err != nil { - logger.GetReqInfo(ctx).SetTags("peerAddress", host.String()) - logger.LogIf(ctx, err) + for _, nerr := range globalNotificationSys.LoadUsers() { + if nerr.Err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) + logger.LogIf(ctx, nerr.Err) } } } @@ -1255,7 +1266,7 @@ func (a adminAPIHandlers) SetConfigHandler(w http.ResponseWriter, r *http.Reques // Get current object layer instance. objectAPI := newObjectLayerFn() - if objectAPI == nil { + if objectAPI == nil || globalNotificationSys == nil { writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL) return } @@ -1472,7 +1483,7 @@ func (a adminAPIHandlers) UpdateAdminCredentialsHandler(w http.ResponseWriter, // Get current object layer instance. objectAPI := newObjectLayerFn() - if objectAPI == nil { + if objectAPI == nil || globalNotificationSys == nil { writeErrorResponseJSON(w, ErrServerNotInitialized, r.URL) return } @@ -1535,10 +1546,10 @@ func (a adminAPIHandlers) UpdateAdminCredentialsHandler(w http.ResponseWriter, } // Notify all other Minio peers to update credentials - for host, err := range globalNotificationSys.LoadCredentials() { - if err != nil { - logger.GetReqInfo(ctx).SetTags("peerAddress", host.String()) - logger.LogIf(ctx, err) + for _, nerr := range globalNotificationSys.LoadCredentials() { + if nerr.Err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) + logger.LogIf(ctx, nerr.Err) } } diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index 10af25d72..56d3f46a4 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -533,7 +533,6 @@ func testServicesCmdHandler(cmd cmdType, t *testing.T) { // single node setup, this degenerates to a simple function // call under the hood. globalMinioAddr = "127.0.0.1:9000" - initGlobalAdminPeers(mustGetNewEndpointList("http://127.0.0.1:9000/d1")) var wg sync.WaitGroup @@ -607,7 +606,6 @@ func TestServiceSetCreds(t *testing.T) { // single node setup, this degenerates to a simple function // call under the hood. globalMinioAddr = "127.0.0.1:9000" - initGlobalAdminPeers(mustGetNewEndpointList("http://127.0.0.1:9000/d1")) credentials := globalServerConfig.GetCredential() @@ -706,7 +704,6 @@ func TestGetConfigHandler(t *testing.T) { // Initialize admin peers to make admin RPC calls. globalMinioAddr = "127.0.0.1:9000" - initGlobalAdminPeers(mustGetNewEndpointList("http://127.0.0.1:9000/d1")) // Prepare query params for get-config mgmt REST API. queryVal := url.Values{} @@ -735,7 +732,6 @@ func TestSetConfigHandler(t *testing.T) { // Initialize admin peers to make admin RPC calls. globalMinioAddr = "127.0.0.1:9000" - initGlobalAdminPeers(mustGetNewEndpointList("http://127.0.0.1:9000/d1")) // Prepare query params for set-config mgmt REST API. queryVal := url.Values{} @@ -807,7 +803,6 @@ func TestAdminServerInfo(t *testing.T) { // Initialize admin peers to make admin RPC calls. globalMinioAddr = "127.0.0.1:9000" - initGlobalAdminPeers(mustGetNewEndpointList("http://127.0.0.1:9000/d1")) // Prepare query params for set-config mgmt REST API. queryVal := url.Values{} diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 982534ac8..5389329d9 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -641,10 +641,10 @@ func (h *healSequence) healDiskFormat() error { // Healing succeeded notify the peers to reload format and re-initialize disks. // We will not notify peers only if healing succeeded. if err == nil { - for host, rerr := range globalNotificationSys.ReloadFormat(h.settings.DryRun) { - if rerr != nil { - logger.GetReqInfo(h.ctx).SetTags("peerAddress", host.String()) - logger.LogIf(h.ctx, rerr) + for _, nerr := range globalNotificationSys.ReloadFormat(h.settings.DryRun) { + if nerr.Err != nil { + logger.GetReqInfo(h.ctx).SetTags("peerAddress", nerr.Host.String()) + logger.LogIf(h.ctx, nerr.Err) } } } diff --git a/cmd/admin-rpc-client.go b/cmd/admin-rpc-client.go deleted file mode 100644 index 874dc2f6f..000000000 --- a/cmd/admin-rpc-client.go +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2014, 2015, 2016, 2017, 2018 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "context" - "crypto/tls" - "fmt" - "net" - "sort" - "strings" - "sync" - "time" - - "github.com/minio/minio/cmd/logger" - xnet "github.com/minio/minio/pkg/net" -) - -var errUnsupportedSignal = fmt.Errorf("unsupported signal: only restart and stop signals are supported") - -// AdminRPCClient - admin RPC client talks to admin RPC server. -type AdminRPCClient struct { - *RPCClient -} - -// SignalService - calls SignalService RPC. -func (rpcClient *AdminRPCClient) SignalService(signal serviceSignal) (err error) { - args := SignalServiceArgs{Sig: signal} - reply := VoidReply{} - - return rpcClient.Call(adminServiceName+".SignalService", &args, &reply) -} - -// ServerInfo - returns the server info of the server to which the RPC call is made. -func (rpcClient *AdminRPCClient) ServerInfo() (sid ServerInfoData, err error) { - err = rpcClient.Call(adminServiceName+".ServerInfo", &AuthArgs{}, &sid) - return sid, err -} - -// StartProfiling - starts profiling in the remote server. -func (rpcClient *AdminRPCClient) StartProfiling(profiler string) error { - args := StartProfilingArgs{Profiler: profiler} - reply := VoidReply{} - return rpcClient.Call(adminServiceName+".StartProfiling", &args, &reply) -} - -// DownloadProfilingData - returns profiling data of the remote server. -func (rpcClient *AdminRPCClient) DownloadProfilingData() ([]byte, error) { - args := AuthArgs{} - var reply []byte - - err := rpcClient.Call(adminServiceName+".DownloadProfilingData", &args, &reply) - return reply, err -} - -// NewAdminRPCClient - returns new admin RPC client. -func NewAdminRPCClient(host *xnet.Host) (*AdminRPCClient, error) { - scheme := "http" - if globalIsSSL { - scheme = "https" - } - - serviceURL := &xnet.URL{ - Scheme: scheme, - Host: host.String(), - Path: adminServicePath, - } - - var tlsConfig *tls.Config - if globalIsSSL { - tlsConfig = &tls.Config{ - ServerName: host.Name, - RootCAs: globalRootCAs, - } - } - - rpcClient, err := NewRPCClient( - RPCClientArgs{ - NewAuthTokenFunc: newAuthToken, - RPCVersion: globalRPCAPIVersion, - ServiceName: adminServiceName, - ServiceURL: serviceURL, - TLSConfig: tlsConfig, - }, - ) - if err != nil { - return nil, err - } - - return &AdminRPCClient{rpcClient}, nil -} - -// adminCmdRunner - abstracts local and remote execution of admin -// commands like service stop and service restart. -type adminCmdRunner interface { - SignalService(s serviceSignal) error - ServerInfo() (ServerInfoData, error) - StartProfiling(string) error - DownloadProfilingData() ([]byte, error) -} - -// adminPeer - represents an entity that implements admin API RPCs. -type adminPeer struct { - addr string - cmdRunner adminCmdRunner - isLocal bool -} - -// type alias for a collection of adminPeer. -type adminPeers []adminPeer - -// makeAdminPeers - helper function to construct a collection of adminPeer. -func makeAdminPeers(endpoints EndpointList) (adminPeerList adminPeers) { - localAddr := GetLocalPeer(endpoints) - if strings.HasPrefix(localAddr, "127.0.0.1:") { - // Use first IPv4 instead of loopback address. - localAddr = net.JoinHostPort(sortIPs(localIP4.ToSlice())[0], globalMinioPort) - } - if strings.HasPrefix(localAddr, "[::1]:") { - // Use first IPv4 instead of loopback address. - localAddr = net.JoinHostPort(localIP6.ToSlice()[0], globalMinioPort) - } - - adminPeerList = append(adminPeerList, adminPeer{ - addr: localAddr, - cmdRunner: localAdminClient{}, - isLocal: true, - }) - - for _, hostStr := range GetRemotePeers(endpoints) { - host, err := xnet.ParseHost(hostStr) - logger.FatalIf(err, "Unable to parse Admin RPC Host") - rpcClient, err := NewAdminRPCClient(host) - logger.FatalIf(err, "Unable to initialize Admin RPC Client") - adminPeerList = append(adminPeerList, adminPeer{ - addr: hostStr, - cmdRunner: rpcClient, - }) - } - - return adminPeerList -} - -// Initialize global adminPeer collection. -func initGlobalAdminPeers(endpoints EndpointList) { - globalAdminPeers = makeAdminPeers(endpoints) -} - -// invokeServiceCmd - Invoke Restart/Stop command. -func invokeServiceCmd(cp adminPeer, cmd serviceSignal) (err error) { - switch cmd { - case serviceRestart, serviceStop: - err = cp.cmdRunner.SignalService(cmd) - } - return err -} - -// sendServiceCmd - Invoke Restart command on remote peers -// adminPeer followed by on the local peer. -func sendServiceCmd(cps adminPeers, cmd serviceSignal) { - // Send service command like stop or restart to all remote nodes and finally run on local node. - errs := make([]error, len(cps)) - var wg sync.WaitGroup - remotePeers := cps[1:] - for i := range remotePeers { - wg.Add(1) - go func(idx int) { - defer wg.Done() - // we use idx+1 because remotePeers slice is 1 position shifted w.r.t cps - errs[idx+1] = invokeServiceCmd(remotePeers[idx], cmd) - }(i) - } - wg.Wait() - errs[0] = invokeServiceCmd(cps[0], cmd) -} - -// uptimeSlice - used to sort uptimes in chronological order. -type uptimeSlice []struct { - err error - uptime time.Duration -} - -func (ts uptimeSlice) Len() int { - return len(ts) -} - -func (ts uptimeSlice) Less(i, j int) bool { - return ts[i].uptime < ts[j].uptime -} - -func (ts uptimeSlice) Swap(i, j int) { - ts[i], ts[j] = ts[j], ts[i] -} - -// getPeerUptimes - returns the uptime since the last time read quorum -// was established on success. Otherwise returns errXLReadQuorum. -func getPeerUptimes(peers adminPeers) (time.Duration, error) { - // In a single node Erasure or FS backend setup the uptime of - // the setup is the uptime of the single minio server - // instance. - if !globalIsDistXL { - return UTCNow().Sub(globalBootTime), nil - } - - uptimes := make(uptimeSlice, len(peers)) - - // Get up time of all servers. - wg := sync.WaitGroup{} - for i, peer := range peers { - wg.Add(1) - go func(idx int, peer adminPeer) { - defer wg.Done() - serverInfoData, rpcErr := peer.cmdRunner.ServerInfo() - uptimes[idx].uptime, uptimes[idx].err = serverInfoData.Properties.Uptime, rpcErr - }(i, peer) - } - wg.Wait() - - // Sort uptimes in chronological order. - sort.Sort(uptimes) - - // Pick the readQuorum'th uptime in chronological order. i.e, - // the time at which read quorum was (re-)established. - readQuorum := len(uptimes) / 2 - validCount := 0 - latestUptime := time.Duration(0) - for _, uptime := range uptimes { - if uptime.err != nil { - logger.LogIf(context.Background(), uptime.err) - continue - } - - validCount++ - if validCount >= readQuorum { - latestUptime = uptime.uptime - break - } - } - - // Less than readQuorum "Admin.Uptime" RPC call returned - // successfully, so read-quorum unavailable. - if validCount < readQuorum { - return time.Duration(0), InsufficientReadQuorum{} - } - - return latestUptime, nil -} diff --git a/cmd/admin-rpc-server.go b/cmd/admin-rpc-server.go deleted file mode 100644 index 91afa2e2d..000000000 --- a/cmd/admin-rpc-server.go +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016, 2017, 2018 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "path" - - "github.com/gorilla/mux" - "github.com/minio/minio/cmd/logger" - xrpc "github.com/minio/minio/cmd/rpc" -) - -const adminServiceName = "Admin" -const adminServiceSubPath = "/admin" - -var adminServicePath = path.Join(minioReservedBucketPath, adminServiceSubPath) - -// adminRPCReceiver - Admin RPC receiver for admin RPC server. -type adminRPCReceiver struct { - local *localAdminClient -} - -// SignalServiceArgs - provides the signal argument to SignalService RPC -type SignalServiceArgs struct { - AuthArgs - Sig serviceSignal -} - -// SignalService - Send a restart or stop signal to the service -func (receiver *adminRPCReceiver) SignalService(args *SignalServiceArgs, reply *VoidReply) error { - return receiver.local.SignalService(args.Sig) -} - -// ServerInfo - returns the server info when object layer was initialized on this server. -func (receiver *adminRPCReceiver) ServerInfo(args *AuthArgs, reply *ServerInfoData) (err error) { - *reply, err = receiver.local.ServerInfo() - return err -} - -// StartProfilingArgs - holds the RPC argument for StartingProfiling RPC call -type StartProfilingArgs struct { - AuthArgs - Profiler string -} - -// StartProfiling - starts profiling of this server -func (receiver *adminRPCReceiver) StartProfiling(args *StartProfilingArgs, reply *VoidReply) error { - return receiver.local.StartProfiling(args.Profiler) -} - -// DownloadProfilingData - stops and returns profiling data of this server -func (receiver *adminRPCReceiver) DownloadProfilingData(args *AuthArgs, reply *[]byte) (err error) { - *reply, err = receiver.local.DownloadProfilingData() - return -} - -// NewAdminRPCServer - returns new admin RPC server. -func NewAdminRPCServer() (*xrpc.Server, error) { - rpcServer := xrpc.NewServer() - if err := rpcServer.RegisterName(adminServiceName, &adminRPCReceiver{&localAdminClient{}}); err != nil { - return nil, err - } - return rpcServer, nil -} - -// registerAdminRPCRouter - creates and registers Admin RPC server and its router. -func registerAdminRPCRouter(router *mux.Router) { - rpcServer, err := NewAdminRPCServer() - logger.FatalIf(err, "Unable to initialize Lock RPC Server") - subrouter := router.PathPrefix(minioReservedBucketPath).Subrouter() - subrouter.Path(adminServiceSubPath).HandlerFunc(httpTraceHdrs(rpcServer.ServeHTTP)) -} diff --git a/cmd/admin-rpc_test.go b/cmd/admin-rpc_test.go deleted file mode 100644 index f92176e1e..000000000 --- a/cmd/admin-rpc_test.go +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2018 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "net/http" - "net/http/httptest" - "testing" - "time" - - xnet "github.com/minio/minio/pkg/net" -) - -/////////////////////////////////////////////////////////////////////////////// -// -// localAdminClient and AdminRPCClient are adminCmdRunner interface compatible, -// hence below test functions are available for both clients. -// -/////////////////////////////////////////////////////////////////////////////// - -/////////////////////////////////////////////////////////////////////////////// -// -// Admin RPC server, adminRPCReceiver and AdminRPCClient are -// inter-dependent, below test functions are sufficient to test all of them. -// -/////////////////////////////////////////////////////////////////////////////// - -func testAdminCmdRunnerSignalService(t *testing.T, client adminCmdRunner) { - defer func(sigChan chan serviceSignal) { globalServiceSignalCh = sigChan }(globalServiceSignalCh) - globalServiceSignalCh = make(chan serviceSignal, 10) - - testCases := []struct { - signal serviceSignal - expectErr bool - }{ - {serviceRestart, false}, - {serviceStop, false}, - {serviceStatus, true}, - {serviceSignal(100), true}, - } - - for i, testCase := range testCases { - err := client.SignalService(testCase.signal) - expectErr := (err != nil) - - if expectErr != testCase.expectErr { - t.Fatalf("case %v: expected: %v, got: %v", i+1, testCase.expectErr, expectErr) - } - } -} - -func testAdminCmdRunnerServerInfo(t *testing.T, client adminCmdRunner) { - tmpGlobalBootTime := globalBootTime - tmpGlobalObjectAPI := globalObjectAPI - tmpGlobalConnStats := globalConnStats - tmpGlobalHTTPStats := globalHTTPStats - tmpGlobalNotificationSys := globalNotificationSys - defer func() { - globalBootTime = tmpGlobalBootTime - globalObjectAPI = tmpGlobalObjectAPI - globalConnStats = tmpGlobalConnStats - globalHTTPStats = tmpGlobalHTTPStats - globalNotificationSys = tmpGlobalNotificationSys - }() - - endpoints := new(EndpointList) - - notificationSys := NewNotificationSys(globalServerConfig, *endpoints) - - testCases := []struct { - bootTime time.Time - objectAPI ObjectLayer - connStats *ConnStats - httpStats *HTTPStats - notificationSys *NotificationSys - expectErr bool - }{ - {UTCNow(), &DummyObjectLayer{}, newConnStats(), newHTTPStats(), notificationSys, false}, - {time.Time{}, nil, nil, nil, nil, true}, - {UTCNow(), nil, nil, nil, nil, true}, - } - - for i, testCase := range testCases { - globalBootTime = testCase.bootTime - globalObjectAPI = testCase.objectAPI - globalConnStats = testCase.connStats - globalHTTPStats = testCase.httpStats - globalNotificationSys = testCase.notificationSys - _, err := client.ServerInfo() - expectErr := (err != nil) - - if expectErr != testCase.expectErr { - t.Fatalf("case %v: expected: %v, got: %v", i+1, testCase.expectErr, expectErr) - } - } -} - -func newAdminRPCHTTPServerClient(t *testing.T) (*httptest.Server, *AdminRPCClient, *serverConfig) { - rpcServer, err := NewAdminRPCServer() - if err != nil { - t.Fatalf("unexpected error %v", err) - } - - httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - rpcServer.ServeHTTP(w, r) - })) - - url, err := xnet.ParseURL(httpServer.URL) - if err != nil { - t.Fatalf("unexpected error %v", err) - } - - host, err := xnet.ParseHost(url.Host) - if err != nil { - t.Fatalf("unexpected error %v", err) - } - - prevGlobalServerConfig := globalServerConfig - globalServerConfig = newServerConfig() - - rpcClient, err := NewAdminRPCClient(host) - if err != nil { - t.Fatalf("unexpected error %v", err) - } - - return httpServer, rpcClient, prevGlobalServerConfig -} - -func TestAdminRPCClientSignalService(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig := newAdminRPCHTTPServerClient(t) - defer httpServer.Close() - defer func() { - globalServerConfig = prevGlobalServerConfig - }() - - testAdminCmdRunnerSignalService(t, rpcClient) -} - -func TestAdminRPCClientServerInfo(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig := newAdminRPCHTTPServerClient(t) - defer httpServer.Close() - defer func() { - globalServerConfig = prevGlobalServerConfig - }() - - testAdminCmdRunnerServerInfo(t, rpcClient) -} diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index dae337194..d22856c00 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -176,15 +176,6 @@ func StartGateway(ctx *cli.Context, gw Gateway) { // Add API router. registerAPIRouter(router, encryptionEnabled) - // Dummy endpoint representing gateway instance. - globalEndpoints = []Endpoint{{ - URL: &url.URL{Path: "/minio/gateway"}, - IsLocal: true, - }} - - // Initialize Admin Peers. - initGlobalAdminPeers(globalEndpoints) - var getCert certs.GetCertificateFunc if globalTLSCerts != nil { getCert = globalTLSCerts.GetCertificate diff --git a/cmd/globals.go b/cmd/globals.go index a6d1f1150..aeae47d3e 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -162,9 +162,6 @@ var ( // File to log HTTP request/response headers and body. globalHTTPTraceFile *os.File - // List of admin peers. - globalAdminPeers = adminPeers{} - // Minio server user agent string. globalServerUserAgent = "Minio/" + ReleaseTag + " (" + runtime.GOOS + "; " + runtime.GOARCH + ")" diff --git a/cmd/local-admin-client.go b/cmd/local-admin-client.go deleted file mode 100644 index 72411c4bb..000000000 --- a/cmd/local-admin-client.go +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2018 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "context" - "errors" - "os" - - "io/ioutil" -) - -// localAdminClient - represents admin operation to be executed locally. -type localAdminClient struct{} - -// SignalService - sends a restart or stop signal to the local server -func (lc localAdminClient) SignalService(s serviceSignal) error { - switch s { - case serviceRestart, serviceStop: - globalServiceSignalCh <- s - default: - return errUnsupportedSignal - } - return nil -} - -// ReInitFormat - re-initialize disk format. -func (lc localAdminClient) ReInitFormat(dryRun bool) error { - objectAPI := newObjectLayerFn() - if objectAPI == nil { - return errServerNotInitialized - } - return objectAPI.ReloadFormat(context.Background(), dryRun) -} - -// ServerInfo - Returns the server info of this server. -func (lc localAdminClient) ServerInfo() (sid ServerInfoData, e error) { - if globalBootTime.IsZero() { - return sid, errServerNotInitialized - } - - // Build storage info - objLayer := newObjectLayerFn() - if objLayer == nil { - return sid, errServerNotInitialized - } - storage := objLayer.StorageInfo(context.Background()) - - return ServerInfoData{ - StorageInfo: storage, - ConnStats: globalConnStats.toServerConnStats(), - HTTPStats: globalHTTPStats.toServerHTTPStats(), - Properties: ServerProperties{ - Uptime: UTCNow().Sub(globalBootTime), - Version: Version, - CommitID: CommitID, - SQSARN: globalNotificationSys.GetARNList(), - Region: globalServerConfig.GetRegion(), - }, - }, nil -} - -// StartProfiling - starts profiling on the local server. -func (lc localAdminClient) StartProfiling(profiler string) error { - if globalProfiler != nil { - globalProfiler.Stop() - } - prof, err := startProfiler(profiler, "") - if err != nil { - return err - } - globalProfiler = prof - return nil -} - -// DownloadProfilingData - stops and returns profiling data of the local server. -func (lc localAdminClient) DownloadProfilingData() ([]byte, error) { - if globalProfiler == nil { - return nil, errors.New("profiler not enabled") - } - - profilerPath := globalProfiler.Path() - - // Stop the profiler - globalProfiler.Stop() - - profilerFile, err := os.Open(profilerPath) - if err != nil { - return nil, err - } - - data, err := ioutil.ReadAll(profilerFile) - if err != nil { - return nil, err - } - - return data, nil -} diff --git a/cmd/local-admin-client_test.go b/cmd/local-admin-client_test.go deleted file mode 100644 index c6a2355ff..000000000 --- a/cmd/local-admin-client_test.go +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2018 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "testing" -) - -func TestLocalAdminClientSignalService(t *testing.T) { - testAdminCmdRunnerSignalService(t, &localAdminClient{}) -} - -func TestLocalAdminClientServerInfo(t *testing.T) { - testAdminCmdRunnerServerInfo(t, &localAdminClient{}) -} diff --git a/cmd/notification.go b/cmd/notification.go index 8ad2e2eea..73b83506f 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -17,11 +17,13 @@ package cmd import ( + "archive/zip" "bytes" "context" "encoding/json" "encoding/xml" "fmt" + "io" "net" "net/url" "path" @@ -90,85 +92,252 @@ func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string) }() } -// ReloadFormat - calls ReloadFormat RPC call on all peers. -func (sys *NotificationSys) ReloadFormat(dryRun bool) map[xnet.Host]error { - errors := make(map[xnet.Host]error) - var wg sync.WaitGroup - for addr, client := range sys.peerRPCClientMap { - wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient) { - defer wg.Done() - // Try to load format in three attempts, before giving up. - for i := 0; i < 3; i++ { - err := client.ReloadFormat(dryRun) - if err == nil { - break +// A NotificationGroup is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero NotificationGroup is valid and does not cancel on error. +type NotificationGroup struct { + wg sync.WaitGroup + errs []NotificationPeerErr +} + +// WithNPeers returns a new NotificationGroup with length of errs slice upto nerrs, +// upon Wait() errors are returned collected from all tasks. +func WithNPeers(nerrs int) *NotificationGroup { + return &NotificationGroup{errs: make([]NotificationPeerErr, nerrs)} +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the slice of errors from all function calls. +func (g *NotificationGroup) Wait() []NotificationPeerErr { + g.wg.Wait() + return g.errs +} + +// Go calls the given function in a new goroutine. +// +// The first call to return a non-nil error will be +// collected in errs slice and returned by Wait(). +func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, addr xnet.Host) { + g.wg.Add(1) + + go func() { + defer g.wg.Done() + g.errs[index] = NotificationPeerErr{ + Host: addr, + } + for i := 0; i < 3; i++ { + if err := f(); err != nil { + g.errs[index].Err = err + // Last iteration log the error. + if i == 2 { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) + ctx := logger.SetReqInfo(ctx, reqInfo) + logger.LogIf(ctx, err) } - errors[addr] = err // Wait for one second and no need wait after last attempt. if i < 2 { time.Sleep(1 * time.Second) } + continue } - }(addr, client) - } - wg.Wait() + break + } + }() +} - return errors +// ReloadFormat - calls ReloadFormat RPC call on all peers. +func (sys *NotificationSys) ReloadFormat(dryRun bool) []NotificationPeerErr { + var idx = 0 + ng := WithNPeers(len(sys.peerRPCClientMap)) + for addr, client := range sys.peerRPCClientMap { + client := client + ng.Go(context.Background(), func() error { + return client.ReloadFormat(dryRun) + }, idx, addr) + idx++ + } + return ng.Wait() } // LoadUsers - calls LoadUsers RPC call on all peers. -func (sys *NotificationSys) LoadUsers() map[xnet.Host]error { - errors := make(map[xnet.Host]error) - var wg sync.WaitGroup +func (sys *NotificationSys) LoadUsers() []NotificationPeerErr { + var idx = 0 + ng := WithNPeers(len(sys.peerRPCClientMap)) for addr, client := range sys.peerRPCClientMap { - wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient) { - defer wg.Done() - // Try to load users in three attempts. - for i := 0; i < 3; i++ { - err := client.LoadUsers() - if err == nil { - break - } - errors[addr] = err - // Wait for one second and no need wait after last attempt. - if i < 2 { - time.Sleep(1 * time.Second) - } - } - }(addr, client) + ng.Go(context.Background(), client.LoadUsers, idx, addr) + idx++ } - wg.Wait() - - return errors + return ng.Wait() } // LoadCredentials - calls LoadCredentials RPC call on all peers. -func (sys *NotificationSys) LoadCredentials() map[xnet.Host]error { - errors := make(map[xnet.Host]error) +func (sys *NotificationSys) LoadCredentials() []NotificationPeerErr { + var idx = 0 + ng := WithNPeers(len(sys.peerRPCClientMap)) + for addr, client := range sys.peerRPCClientMap { + ng.Go(context.Background(), client.LoadCredentials, idx, addr) + idx++ + } + return ng.Wait() +} + +// StartProfiling - start profiling on remote peers, by initiating a remote RPC. +func (sys *NotificationSys) StartProfiling(profiler string) []NotificationPeerErr { + var idx = 0 + ng := WithNPeers(len(sys.peerRPCClientMap)) + for addr, client := range sys.peerRPCClientMap { + client := client + ng.Go(context.Background(), func() error { + return client.StartProfiling(profiler) + }, idx, addr) + idx++ + } + return ng.Wait() +} + +// DownloadProfilingData - download profiling data from all remote peers. +func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io.Writer) bool { + profilingDataFound := false + + // Initialize a zip writer which will provide a zipped content + // of profiling data of all nodes + zipWriter := zip.NewWriter(writer) + defer zipWriter.Close() + + for addr, client := range sys.peerRPCClientMap { + data, err := client.DownloadProfilingData() + if err != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) + ctx := logger.SetReqInfo(ctx, reqInfo) + logger.LogIf(ctx, err) + continue + } + + profilingDataFound = true + + // Send profiling data to zip as file + header, zerr := zip.FileInfoHeader(dummyFileInfo{ + name: fmt.Sprintf("profiling-%s.pprof", addr), + size: int64(len(data)), + mode: 0600, + modTime: UTCNow(), + isDir: false, + sys: nil, + }) + if zerr != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) + ctx := logger.SetReqInfo(ctx, reqInfo) + logger.LogIf(ctx, zerr) + continue + } + zwriter, zerr := zipWriter.CreateHeader(header) + if zerr != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) + ctx := logger.SetReqInfo(ctx, reqInfo) + logger.LogIf(ctx, zerr) + continue + } + if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) + ctx := logger.SetReqInfo(ctx, reqInfo) + logger.LogIf(ctx, err) + continue + } + } + + thisAddr, err := xnet.ParseHost(GetLocalPeer(globalEndpoints)) + if err != nil { + logger.LogIf(ctx, err) + return profilingDataFound + } + + data, err := getProfileData() + if err != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", thisAddr.String()) + ctx := logger.SetReqInfo(ctx, reqInfo) + logger.LogIf(ctx, err) + return profilingDataFound + } + + profilingDataFound = true + + // Send profiling data to zip as file + header, zerr := zip.FileInfoHeader(dummyFileInfo{ + name: fmt.Sprintf("profiling-%s.pprof", thisAddr), + size: int64(len(data)), + mode: 0600, + modTime: UTCNow(), + isDir: false, + sys: nil, + }) + + zwriter, zerr := zipWriter.CreateHeader(header) + if zerr != nil { + return profilingDataFound + } + + if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil { + return profilingDataFound + } + + return profilingDataFound +} + +// SignalService - calls signal service RPC call on all peers. +func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerErr { + var idx = 0 + ng := WithNPeers(len(sys.peerRPCClientMap)) + for addr, client := range sys.peerRPCClientMap { + client := client + ng.Go(context.Background(), func() error { + return client.SignalService(sig) + }, idx, addr) + idx++ + } + return ng.Wait() +} + +// ServerInfo - calls ServerInfo RPC call on all peers. +func (sys *NotificationSys) ServerInfo(ctx context.Context) []ServerInfo { + var idx = 0 + serverInfo := make([]ServerInfo, len(sys.peerRPCClientMap)) var wg sync.WaitGroup for addr, client := range sys.peerRPCClientMap { wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient) { + go func(idx int, addr xnet.Host, client *PeerRPCClient) { defer wg.Done() - // Try to load credentials in three attempts. + // Try to fetch serverInfo remotely in three attempts. for i := 0; i < 3; i++ { - err := client.LoadCredentials() + info, err := client.ServerInfo() if err == nil { - break + serverInfo[idx] = ServerInfo{ + Addr: addr.String(), + Data: &info, + } + return + } + serverInfo[idx] = ServerInfo{ + Addr: addr.String(), + Data: &info, + Error: err.Error(), + } + // Last iteration log the error. + if i == 2 { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) + ctx := logger.SetReqInfo(ctx, reqInfo) + logger.LogIf(ctx, err) } - errors[addr] = err // Wait for one second and no need wait after last attempt. if i < 2 { time.Sleep(1 * time.Second) } } - }(addr, client) + }(idx, addr, client) + idx++ } wg.Wait() - - return errors + return serverInfo } // SetBucketPolicy - calls SetBucketPolicy RPC call on all peers. diff --git a/cmd/peer-rpc-client.go b/cmd/peer-rpc-client.go index 4b71473c7..76b8c3958 100644 --- a/cmd/peer-rpc-client.go +++ b/cmd/peer-rpc-client.go @@ -168,6 +168,38 @@ func (rpcClient *PeerRPCClient) CPULoadInfo() (ServerCPULoadInfo, error) { return reply, err } +// StartProfiling - starts profiling on the remote server. +func (rpcClient *PeerRPCClient) StartProfiling(profiler string) error { + args := StartProfilingArgs{Profiler: profiler} + reply := VoidReply{} + return rpcClient.Call(peerServiceName+".StartProfiling", &args, &reply) +} + +// DownloadProfilingData - download already running profiling on the remote server. +func (rpcClient *PeerRPCClient) DownloadProfilingData() ([]byte, error) { + args := AuthArgs{} + var reply []byte + err := rpcClient.Call(peerServiceName+".DownloadProfilingData", &args, &reply) + return reply, err +} + +// SignalService - calls load server info RPC. +func (rpcClient *PeerRPCClient) SignalService(sig serviceSignal) error { + args := SignalServiceArgs{Sig: sig} + reply := VoidReply{} + + return rpcClient.Call(peerServiceName+".SignalService", &args, &reply) +} + +// ServerInfo - calls load server info RPC. +func (rpcClient *PeerRPCClient) ServerInfo() (ServerInfoData, error) { + args := AuthArgs{} + reply := ServerInfoData{} + + err := rpcClient.Call(peerServiceName+".ServerInfo", &args, &reply) + return reply, err +} + // NewPeerRPCClient - returns new peer RPC client. func NewPeerRPCClient(host *xnet.Host) (*PeerRPCClient, error) { scheme := "http" diff --git a/cmd/peer-rpc-server.go b/cmd/peer-rpc-server.go index b267cf13e..2c06060ce 100644 --- a/cmd/peer-rpc-server.go +++ b/cmd/peer-rpc-server.go @@ -20,6 +20,8 @@ import ( "context" "fmt" "path" + "sort" + "time" "github.com/gorilla/mux" "github.com/minio/minio/cmd/logger" @@ -30,7 +32,7 @@ import ( ) const peerServiceName = "Peer" -const peerServiceSubPath = "/s3/remote" +const peerServiceSubPath = "/peer/remote" var peerServicePath = path.Join(minioReservedBucketPath, peerServiceSubPath) @@ -118,7 +120,8 @@ type ListenBucketNotificationArgs struct { Addr xnet.Host `json:"addr"` } -// ListenBucketNotification - handles listen bucket notification RPC call. It creates PeerRPCClient target which pushes requested events to target in remote peer. +// ListenBucketNotification - handles listen bucket notification RPC call. +// It creates PeerRPCClient target which pushes requested events to target in remote peer. func (receiver *peerRPCReceiver) ListenBucketNotification(args *ListenBucketNotificationArgs, reply *VoidReply) error { objAPI := newObjectLayerFn() if objAPI == nil { @@ -269,6 +272,117 @@ func (receiver *peerRPCReceiver) MemUsageInfo(args *AuthArgs, reply *ServerMemUs return nil } +// uptimes - used to sort uptimes in chronological order. +type uptimes []time.Duration + +func (ts uptimes) Len() int { + return len(ts) +} + +func (ts uptimes) Less(i, j int) bool { + return ts[i] < ts[j] +} + +func (ts uptimes) Swap(i, j int) { + ts[i], ts[j] = ts[j], ts[i] +} + +// getPeerUptimes - returns the uptime. +func getPeerUptimes(serverInfo []ServerInfo) time.Duration { + // In a single node Erasure or FS backend setup the uptime of + // the setup is the uptime of the single minio server + // instance. + if !globalIsDistXL { + return UTCNow().Sub(globalBootTime) + } + + var times []time.Duration + + for _, info := range serverInfo { + if info.Error != "" { + continue + } + times = append(times, info.Data.Properties.Uptime) + } + + // Sort uptimes in chronological order. + sort.Sort(uptimes(times)) + + // Return the latest time as the uptime. + return times[0] +} + +// StartProfilingArgs - holds the RPC argument for StartingProfiling RPC call +type StartProfilingArgs struct { + AuthArgs + Profiler string +} + +// StartProfiling - profiling server receiver. +func (receiver *peerRPCReceiver) StartProfiling(args *StartProfilingArgs, reply *VoidReply) error { + if globalProfiler != nil { + globalProfiler.Stop() + } + var err error + globalProfiler, err = startProfiler(args.Profiler, "") + return err +} + +// DownloadProfilingData - download profiling data. +func (receiver *peerRPCReceiver) DownloadProfilingData(args *AuthArgs, reply *[]byte) error { + var err error + *reply, err = getProfileData() + return err +} + +var errUnsupportedSignal = fmt.Errorf("unsupported signal: only restart and stop signals are supported") + +// SignalServiceArgs - send event RPC arguments. +type SignalServiceArgs struct { + AuthArgs + Sig serviceSignal +} + +// SignalService - signal service receiver. +func (receiver *peerRPCReceiver) SignalService(args *SignalServiceArgs, reply *VoidReply) error { + switch args.Sig { + case serviceRestart, serviceStop: + globalServiceSignalCh <- args.Sig + default: + return errUnsupportedSignal + } + return nil +} + +// ServerInfo - server info receiver. +func (receiver *peerRPCReceiver) ServerInfo(args *AuthArgs, reply *ServerInfoData) error { + if globalBootTime.IsZero() { + return errServerNotInitialized + } + + // Build storage info + objLayer := newObjectLayerFn() + if objLayer == nil { + return errServerNotInitialized + } + + // Server info data. + *reply = ServerInfoData{ + StorageInfo: objLayer.StorageInfo(context.Background()), + ConnStats: globalConnStats.toServerConnStats(), + HTTPStats: globalHTTPStats.toServerHTTPStats(), + Properties: ServerProperties{ + Uptime: UTCNow().Sub(globalBootTime), + Version: Version, + CommitID: CommitID, + SQSARN: globalNotificationSys.GetARNList(), + Region: globalServerConfig.GetRegion(), + }, + } + + return nil +} + // NewPeerRPCServer - returns new peer RPC server. func NewPeerRPCServer() (*xrpc.Server, error) { rpcServer := xrpc.NewServer() diff --git a/cmd/routers.go b/cmd/routers.go index 7de264830..3fa047d87 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -41,7 +41,7 @@ func registerDistXLRouters(router *mux.Router, endpoints EndpointList) { // Register distributed namespace lock. registerDistNSLockRouter(router) - // Register S3 peer communication router. + // Register peer communication router. registerPeerRPCRouter(router) } @@ -104,9 +104,6 @@ func configureServerHandler(endpoints EndpointList) (http.Handler, error) { // Add STS router always. registerSTSRouter(router) - // Add Admin RPC router - registerAdminRPCRouter(router) - // Add Admin router, all APIs are enabled in server mode. registerAdminRouter(router, true) diff --git a/cmd/server-main.go b/cmd/server-main.go index a95181641..2adcda099 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -293,9 +293,6 @@ func serverMain(ctx *cli.Context) { logger.Fatal(uiErrUnexpectedError(err), "Unable to configure one of server's RPC services") } - // Initialize Admin Peers inter-node communication only in distributed setup. - initGlobalAdminPeers(globalEndpoints) - var getCert certs.GetCertificateFunc if globalTLSCerts != nil { getCert = globalTLSCerts.GetCertificate diff --git a/cmd/sts-handlers.go b/cmd/sts-handlers.go index 1abc960fb..a27b85b8e 100644 --- a/cmd/sts-handlers.go +++ b/cmd/sts-handlers.go @@ -123,10 +123,10 @@ func (sts *stsAPIHandlers) AssumeRoleWithWebIdentity(w http.ResponseWriter, r *h } // Notify all other Minio peers to reload temp users - for host, err := range globalNotificationSys.LoadUsers() { - if err != nil { - logger.GetReqInfo(ctx).SetTags("peerAddress", host.String()) - logger.LogIf(ctx, err) + for _, nerr := range globalNotificationSys.LoadUsers() { + if nerr.Err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) + logger.LogIf(ctx, nerr.Err) } } @@ -207,10 +207,10 @@ func (sts *stsAPIHandlers) AssumeRoleWithClientGrants(w http.ResponseWriter, r * } // Notify all other Minio peers to reload temp users - for host, err := range globalNotificationSys.LoadUsers() { - if err != nil { - logger.GetReqInfo(ctx).SetTags("peerAddress", host.String()) - logger.LogIf(ctx, err) + for _, nerr := range globalNotificationSys.LoadUsers() { + if nerr.Err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) + logger.LogIf(ctx, nerr.Err) } } diff --git a/cmd/utils.go b/cmd/utils.go index a577c23ff..b32ebd01e 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -208,6 +208,26 @@ func (p profilerWrapper) Path() string { return p.pathFn() } +// Returns current profile data, returns error if there is no active +// profiling in progress. Stops an active profile. +func getProfileData() ([]byte, error) { + if globalProfiler == nil { + return nil, errors.New("profiler not enabled") + } + + profilerPath := globalProfiler.Path() + + // Stop the profiler + globalProfiler.Stop() + + profilerFile, err := os.Open(profilerPath) + if err != nil { + return nil, err + } + + return ioutil.ReadAll(profilerFile) +} + // Starts a profiler returns nil if profiler is not enabled, caller needs to handle this. func startProfiler(profilerType, dirPath string) (interface { Stop() diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 1ee0be098..086e86355 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -752,10 +752,10 @@ func (web *webAPIHandlers) SetAuth(r *http.Request, args *SetAuthArgs, reply *Se if errs := globalNotificationSys.LoadCredentials(); len(errs) != 0 { reply.PeerErrMsgs = make(map[string]string) - for host, err := range errs { - err = fmt.Errorf("Unable to update credentials on server %v: %v", host, err) + for _, nerr := range errs { + err = fmt.Errorf("Unable to update credentials on server %v: %v", nerr.Host, nerr.Err) logger.LogIf(context.Background(), err) - reply.PeerErrMsgs[host.String()] = err.Error() + reply.PeerErrMsgs[nerr.Host.String()] = err.Error() } } else { reply.Token, err = authenticateWeb(creds.AccessKey, creds.SecretKey)