diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index d57f7cebc..afde2d4fa 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1817,3 +1817,68 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) { } } } + +// The handler sends console logs to the connected HTTP client. +func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "ConsoleLog") + + objectAPI := validateAdminReq(ctx, w, r) + if objectAPI == nil { + return + } + node := r.URL.Query().Get("node") + // limit buffered console entries if client requested it. + limitStr := r.URL.Query().Get("limit") + limitLines, err := strconv.Atoi(limitStr) + if err != nil { + limitLines = 10 + } + // Avoid reusing tcp connection if read timeout is hit + // This is needed to make r.Context().Done() work as + // expected in case of read timeout + w.Header().Add("Connection", "close") + w.Header().Set(xhttp.ContentType, "text/event-stream") + + doneCh := make(chan struct{}) + defer close(doneCh) + logCh := make(chan interface{}, 4000) + + remoteHosts := getRemoteHosts(globalEndpoints) + peers, err := getRestClients(remoteHosts) + if err != nil { + return + } + + globalConsoleSys.Subscribe(logCh, doneCh, node, limitLines, nil) + + for _, peer := range peers { + if node == "" || strings.ToLower(peer.host.Name) == strings.ToLower(node) { + peer.ConsoleLog(logCh, doneCh) + } + } + + enc := json.NewEncoder(w) + + keepAliveTicker := time.NewTicker(500 * time.Millisecond) + defer keepAliveTicker.Stop() + + for { + select { + case entry := <-logCh: + log := entry.(madmin.LogInfo) + if log.SendLog(node) { + if err := enc.Encode(log); err != nil { + return + } + w.(http.Flusher).Flush() + } + case <-keepAliveTicker.C: + if _, err := w.Write([]byte(" ")); err != nil { + return + } + w.(http.Flusher).Flush() + case <-GlobalServiceDoneCh: + return + } + } +} diff --git a/cmd/admin-router.go b/cmd/admin-router.go index fd3ec0ad6..71b9b6f01 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -135,6 +135,9 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) // HTTP Trace adminV1Router.Methods(http.MethodGet).Path("/trace").HandlerFunc(adminAPI.TraceHandler) + // Console Logs + adminV1Router.Methods(http.MethodGet).Path("/log").HandlerFunc(httpTraceAll(adminAPI.ConsoleLogHandler)) + // If none of the routes match, return error. adminV1Router.NotFoundHandler = http.HandlerFunc(httpTraceHdrs(notFoundHandlerJSON)) } diff --git a/cmd/common-main.go b/cmd/common-main.go index e6984edb7..207e07b3c 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -31,7 +31,6 @@ import ( "github.com/minio/cli" "github.com/minio/minio-go/v6/pkg/set" "github.com/minio/minio/cmd/logger" - "github.com/minio/minio/cmd/logger/target/console" "github.com/minio/minio/cmd/logger/target/http" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/dns" @@ -97,7 +96,7 @@ func loadLoggers() { if globalServerConfig.Logger.Console.Enabled { // Enable console logging - logger.AddTarget(console.New()) + logger.AddTarget(globalConsoleSys.Console()) } } diff --git a/cmd/consolelogger.go b/cmd/consolelogger.go new file mode 100644 index 000000000..9d300c0b1 --- /dev/null +++ b/cmd/consolelogger.go @@ -0,0 +1,128 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + ring "container/ring" + "context" + + "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/cmd/logger/message/log" + "github.com/minio/minio/cmd/logger/target/console" + "github.com/minio/minio/pkg/madmin" + xnet "github.com/minio/minio/pkg/net" + "github.com/minio/minio/pkg/pubsub" +) + +// number of log messages to buffer +const defaultLogBufferCount = 10000 + +//HTTPConsoleLoggerSys holds global console logger state +type HTTPConsoleLoggerSys struct { + pubsub *pubsub.PubSub + console *console.Target + nodeName string + logBuf *ring.Ring +} + +// NewConsoleLogger - creates new HTTPConsoleLoggerSys with all nodes subscribed to +// the console logging pub sub system +func NewConsoleLogger(ctx context.Context, endpoints EndpointList) *HTTPConsoleLoggerSys { + host, err := xnet.ParseHost(GetLocalPeer(globalEndpoints)) + if err != nil { + logger.FatalIf(err, "Unable to start console logging subsystem") + } + var nodeName string + if globalIsDistXL { + nodeName = host.Name + } + ps := pubsub.New() + return &HTTPConsoleLoggerSys{ + ps, nil, nodeName, ring.New(defaultLogBufferCount), + } +} + +// HasLogListeners returns true if console log listeners are registered +// for this node or peers +func (sys *HTTPConsoleLoggerSys) HasLogListeners() bool { + return sys != nil && sys.pubsub.HasSubscribers() +} + +// Subscribe starts console logging for this node. +func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh chan struct{}, node string, last int, filter func(entry interface{}) bool) { + // Enable console logging for remote client even if local console logging is disabled in the config. + if !globalServerConfig.Logger.Console.Enabled && !sys.pubsub.HasSubscribers() { + logger.AddTarget(globalConsoleSys.Console()) + } + + cnt := 0 + // by default send all console logs in the ring buffer unless node or limit query parameters + // are set. + var lastN []madmin.LogInfo + if last > defaultLogBufferCount || last <= 0 { + last = defaultLogBufferCount + } + + lastN = make([]madmin.LogInfo, last) + r := sys.logBuf + r.Do(func(p interface{}) { + if p != nil && (p.(madmin.LogInfo)).SendLog(node) { + lastN[cnt%last] = p.(madmin.LogInfo) + cnt++ + } + }) + // send last n console log messages in order filtered by node + if cnt > 0 { + for i := 0; i < last; i++ { + entry := lastN[(cnt+i)%last] + if (entry == madmin.LogInfo{}) { + continue + } + select { + case subCh <- entry: + case <-doneCh: + return + } + } + } + sys.pubsub.Subscribe(subCh, doneCh, filter) +} + +// Console returns a console target +func (sys *HTTPConsoleLoggerSys) Console() *HTTPConsoleLoggerSys { + if sys.console == nil { + sys.console = console.New() + } + return sys +} + +// Send log message 'e' to console and publish to console +// log pubsub system +func (sys *HTTPConsoleLoggerSys) Send(e interface{}) error { + lg := madmin.LogInfo{} + lg.Entry = e.(log.Entry) + lg.NodeName = sys.nodeName + sys.pubsub.Publish(lg) + // add log to ring buffer + sys.logBuf.Value = lg + sys.logBuf = sys.logBuf.Next() + + if globalServerConfig.Logger.Console.Enabled { + return sys.console.Send(e) + } + return nil +} diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index 6d1365f93..786c32a6d 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -159,6 +159,9 @@ func StartGateway(ctx *cli.Context, gw Gateway) { registerSTSRouter(router) } + // initialize globalConsoleSys system + globalConsoleSys = NewConsoleLogger(context.Background(), globalEndpoints) + enableConfigOps := gatewayName == "nas" enableIAMOps := globalEtcdClient != nil diff --git a/cmd/globals.go b/cmd/globals.go index 8fe4f7b2c..76d96c8a3 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -174,6 +174,10 @@ var ( // registered listeners globalHTTPTrace = pubsub.New() + // global console system to send console logs to + // registered listeners + globalConsoleSys *HTTPConsoleLoggerSys + globalEndpoints EndpointList // Global server's network statistics diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index c32174901..3c1860f4c 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -629,6 +629,48 @@ func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh chan struct }() } +// ConsoleLog - sends request to peer nodes to get console logs +func (client *peerRESTClient) ConsoleLog(logCh chan interface{}, doneCh chan struct{}) { + go func() { + for { + // get cancellation context to properly unsubscribe peers + ctx, cancel := context.WithCancel(context.Background()) + respBody, err := client.callWithContext(ctx, peerRESTMethodLog, nil, nil, -1) + if err != nil { + // Retry the failed request. + time.Sleep(5 * time.Second) + } else { + dec := gob.NewDecoder(respBody) + + go func() { + <-doneCh + cancel() + }() + + for { + var log madmin.LogInfo + if err = dec.Decode(&log); err != nil { + break + } + select { + case logCh <- log: + default: + } + } + } + + select { + case <-doneCh: + cancel() + http.DrainBody(respBody) + return + default: + // There was error in the REST request, retry. + } + } + }() +} + func getRemoteHosts(endpoints EndpointList) []*xnet.Host { var remoteHosts []*xnet.Host for _, hostStr := range GetRemotePeers(endpoints) { diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 27de8f01f..57f4954e5 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -51,6 +51,7 @@ const ( peerRESTMethodTrace = "trace" peerRESTMethodBucketLifecycleSet = "setbucketlifecycle" peerRESTMethodBucketLifecycleRemove = "removebucketlifecycle" + peerRESTMethodLog = "log" ) const ( diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 5f437d281..cfe534d05 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -32,6 +32,7 @@ import ( "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/lifecycle" + "github.com/minio/minio/pkg/madmin" xnet "github.com/minio/minio/pkg/net" "github.com/minio/minio/pkg/policy" trace "github.com/minio/minio/pkg/trace" @@ -903,6 +904,38 @@ func (s *peerRESTServer) BackgroundOpsStatusHandler(w http.ResponseWriter, r *ht logger.LogIf(ctx, gob.NewEncoder(w).Encode(state)) } +// ConsoleLogHandler sends console logs of this node back to peer rest client +func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + w.Header().Set("Connection", "close") + w.WriteHeader(http.StatusOK) + w.(http.Flusher).Flush() + + doneCh := make(chan struct{}) + defer close(doneCh) + + ch := make(chan interface{}, 2000) + globalConsoleSys.Subscribe(ch, doneCh, "", 0, nil) + + enc := gob.NewEncoder(w) + for { + select { + case entry := <-ch: + log := entry.(madmin.LogInfo) + if err := enc.Encode(log); err != nil { + return + } + w.(http.Flusher).Flush() + case <-r.Context().Done(): + return + } + } +} + func (s *peerRESTServer) writeErrorResponse(w http.ResponseWriter, err error) { w.WriteHeader(http.StatusForbidden) w.Write([]byte(err.Error())) @@ -958,6 +991,7 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodTrace).HandlerFunc(server.TraceHandler) subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler) + subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler) router.NotFoundHandler = http.HandlerFunc(httpTraceAll(notFoundHandler)) } diff --git a/cmd/server-main.go b/cmd/server-main.go index a6053b698..f89a4d51b 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -289,6 +289,8 @@ func serverMain(ctx *cli.Context) { globalSweepHealState = initHealState() } + // initialize globalConsoleSys system + globalConsoleSys = NewConsoleLogger(context.Background(), globalEndpoints) // Configure server. var handler http.Handler handler, err = configureServerHandler(globalEndpoints) diff --git a/pkg/madmin/api-log.go b/pkg/madmin/api-log.go new file mode 100644 index 000000000..e9e5bc5c5 --- /dev/null +++ b/pkg/madmin/api-log.go @@ -0,0 +1,85 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package madmin + +import ( + "encoding/json" + "net/http" + "net/url" + "strconv" + "strings" + + "github.com/minio/minio/cmd/logger/message/log" +) + +// LogInfo holds console log messages +type LogInfo struct { + log.Entry + NodeName string `json:"node"` + Err error `json:"-"` +} + +// SendLog returns true if log pertains to node specified in args. +func (l LogInfo) SendLog(node string) bool { + return node == "" || strings.ToLower(node) == strings.ToLower(l.NodeName) +} + +// GetLogs - listen on console log messages. +func (adm AdminClient) GetLogs(node string, lineCnt int, doneCh <-chan struct{}) <-chan LogInfo { + logCh := make(chan LogInfo, 1) + + // Only success, start a routine to start reading line by line. + go func(logCh chan<- LogInfo) { + defer close(logCh) + urlValues := make(url.Values) + urlValues.Set("node", node) + urlValues.Set("limit", strconv.Itoa(lineCnt)) + for { + reqData := requestData{ + relPath: "/v1/log", + queryValues: urlValues, + } + // Execute GET to call log handler + resp, err := adm.executeMethod("GET", reqData) + if err != nil { + closeResponse(resp) + return + } + + if resp.StatusCode != http.StatusOK { + logCh <- LogInfo{Err: httpRespToErrorResponse(resp)} + return + } + dec := json.NewDecoder(resp.Body) + for { + var info LogInfo + if err = dec.Decode(&info); err != nil { + break + } + select { + case <-doneCh: + return + case logCh <- info: + } + } + + } + }(logCh) + + // Returns the log info channel, for caller to start reading from. + return logCh +} diff --git a/pkg/pubsub/pubsub.go b/pkg/pubsub/pubsub.go index 2f9d45985..795ff99fd 100644 --- a/pkg/pubsub/pubsub.go +++ b/pkg/pubsub/pubsub.go @@ -40,7 +40,7 @@ func (ps *PubSub) Publish(item interface{}) { defer ps.RUnlock() for _, sub := range ps.subs { - if sub.filter(item) { + if sub.filter == nil || sub.filter(item) { select { case sub.ch <- item: default: