From 97090aa16cdd1b8525094248e746afa7738f7d2e Mon Sep 17 00:00:00 2001 From: poornas Date: Sat, 8 Jun 2019 15:54:41 -0700 Subject: [PATCH] Add admin API to send trace notifications to registered (#7128) Remove current functionality to log trace to file using MINIO_HTTP_TRACE env, and replace it with mc admin trace command on mc client. --- cmd/admin-handlers.go | 37 +++++++ cmd/admin-router.go | 2 + cmd/common-main.go | 7 -- cmd/gateway-main.go | 3 + cmd/globals.go | 5 +- cmd/handler-utils.go | 21 ++-- {pkg/handlers => cmd}/http-tracer.go | 134 ++++++++++------------- cmd/httptrace.go | 115 ++++++++++++++++++++ cmd/peer-rest-client-target.go | 2 +- cmd/peer-rest-client.go | 61 ++++++++++- cmd/peer-rest-common.go | 2 + cmd/peer-rest-server.go | 45 ++++++++ cmd/rest/client.go | 12 ++- cmd/server-main.go | 3 + cmd/signals.go | 3 - cmd/utils.go | 10 -- cmd/utils_test.go | 13 --- pkg/handlers/http-tracer_test.go | 154 --------------------------- pkg/madmin/README.md | 21 +++- pkg/madmin/api-trace.go | 92 ++++++++++++++++ pkg/madmin/examples/trace.go | 50 +++++++++ pkg/pubsub/pubsub.go | 83 +++++++++++++++ pkg/pubsub/pubsub_test.go | 66 ++++++++++++ pkg/trace/trace.go | 49 +++++++++ 24 files changed, 708 insertions(+), 282 deletions(-) rename {pkg/handlers => cmd}/http-tracer.go (56%) create mode 100644 cmd/httptrace.go delete mode 100644 pkg/handlers/http-tracer_test.go create mode 100644 pkg/madmin/api-trace.go create mode 100644 pkg/madmin/examples/trace.go create mode 100644 pkg/pubsub/pubsub.go create mode 100644 pkg/pubsub/pubsub_test.go create mode 100644 pkg/trace/trace.go diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index f79986f39..5a8b44dc3 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1419,3 +1419,40 @@ func (a adminAPIHandlers) SetConfigKeysHandler(w http.ResponseWriter, r *http.Re // Send success response writeSuccessResponseHeadersOnly(w) } + +// TraceHandler - POST /minio/admin/v1/trace +// ---------- +// The handler sends http trace to the connected HTTP client. +func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "HTTPTrace") + trcAll := r.URL.Query().Get("all") == "true" + objectAPI := validateAdminReq(ctx, w, r) + if objectAPI == nil { + return + } + // Avoid reusing tcp connection if read timeout is hit + // This is needed to make r.Context().Done() work as + // expected in case of read timeout + w.Header().Add("Connection", "close") + + doneCh := make(chan struct{}) + defer close(doneCh) + + traceCh := globalTrace.Trace(doneCh, trcAll) + for { + select { + case entry := <-traceCh: + if _, err := w.Write(entry); err != nil { + return + } + if _, err := w.Write([]byte("\n")); err != nil { + return + } + w.(http.Flusher).Flush() + case <-r.Context().Done(): + return + case <-GlobalServiceDoneCh: + return + } + } +} diff --git a/cmd/admin-router.go b/cmd/admin-router.go index 7c506172a..93468106b 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -116,6 +116,8 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) // Top locks adminV1Router.Methods(http.MethodGet).Path("/top/locks").HandlerFunc(httpTraceHdrs(adminAPI.TopLocksHandler)) + // HTTP Trace + adminV1Router.Methods(http.MethodGet).Path("/trace").HandlerFunc(adminAPI.TraceHandler) // If none of the routes match, return error. adminV1Router.NotFoundHandler = http.HandlerFunc(httpTraceHdrs(notFoundHandlerJSON)) } diff --git a/cmd/common-main.go b/cmd/common-main.go index 5d2ae5407..9fb2fbd00 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -211,13 +211,6 @@ func handleCommonEnvVars() { globalIsBrowserEnabled = bool(browserFlag) } - traceFile := os.Getenv("MINIO_HTTP_TRACE") - if traceFile != "" { - var err error - globalHTTPTraceFile, err = os.OpenFile(traceFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0660) - logger.FatalIf(err, "error opening file %s", traceFile) - } - etcdEndpointsEnv, ok := os.LookupEnv("MINIO_ETCD_ENDPOINTS") if ok { etcdEndpoints := strings.Split(etcdEndpointsEnv, ",") diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index 0de8a7886..4937198c5 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -158,6 +158,9 @@ func StartGateway(ctx *cli.Context, gw Gateway) { registerSTSRouter(router) } + // initialize globalTrace system + globalTrace = NewTraceSys(context.Background(), globalEndpoints) + enableConfigOps := globalEtcdClient != nil && gatewayName == "nas" enableIAMOps := globalEtcdClient != nil diff --git a/cmd/globals.go b/cmd/globals.go index 7038aeedd..e45d34847 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -159,8 +159,9 @@ var ( globalHTTPServerErrorCh = make(chan error) globalOSSignalCh = make(chan os.Signal, 1) - // File to log HTTP request/response headers and body. - globalHTTPTraceFile *os.File + // global Trace system to send HTTP request/response logs to + // registered listeners + globalTrace *HTTPTraceSys globalEndpoints EndpointList diff --git a/cmd/handler-utils.go b/cmd/handler-utils.go index 202af6fcd..8bab249b0 100644 --- a/cmd/handler-utils.go +++ b/cmd/handler-utils.go @@ -30,7 +30,6 @@ import ( "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/handlers" - httptracer "github.com/minio/minio/pkg/handlers" ) // Parses location constraint from the incoming reader. @@ -326,18 +325,26 @@ func extractPostPolicyFormValues(ctx context.Context, form *multipart.Form) (fil // Log headers and body. func httpTraceAll(f http.HandlerFunc) http.HandlerFunc { - if globalHTTPTraceFile == nil { - return f + return func(w http.ResponseWriter, r *http.Request) { + if !globalTrace.HasTraceListeners() { + f.ServeHTTP(w, r) + return + } + trace := Trace(f, true, w, r) + globalTrace.Publish(trace) } - return httptracer.TraceReqHandlerFunc(f, globalHTTPTraceFile, true) } // Log only the headers. func httpTraceHdrs(f http.HandlerFunc) http.HandlerFunc { - if globalHTTPTraceFile == nil { - return f + return func(w http.ResponseWriter, r *http.Request) { + if !globalTrace.HasTraceListeners() { + f.ServeHTTP(w, r) + return + } + trace := Trace(f, false, w, r) + globalTrace.Publish(trace) } - return httptracer.TraceReqHandlerFunc(f, globalHTTPTraceFile, false) } // Returns "/bucketName/objectName" for path-style or virtual-host-style requests. diff --git a/pkg/handlers/http-tracer.go b/cmd/http-tracer.go similarity index 56% rename from pkg/handlers/http-tracer.go rename to cmd/http-tracer.go index fec1c1831..779a9440b 100644 --- a/pkg/handlers/http-tracer.go +++ b/cmd/http-tracer.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package handlers +package cmd import ( "bytes" @@ -24,8 +24,12 @@ import ( "net/http" "reflect" "runtime" + "strconv" "strings" "time" + + xnet "github.com/minio/minio/pkg/net" + trace "github.com/minio/minio/pkg/trace" ) // recordRequest - records the first recLen bytes @@ -109,92 +113,64 @@ func (r *recordResponseWriter) Flush() { r.ResponseWriter.(http.Flusher).Flush() } -// Return response headers. -func (r *recordResponseWriter) Headers() []byte { - return r.headers.Bytes() -} - // Return response body. func (r *recordResponseWriter) Body() []byte { return r.body.Bytes() } -// TraceReqHandlerFunc logs request/response headers and body. -func TraceReqHandlerFunc(f http.HandlerFunc, output io.Writer, logBody bool) http.HandlerFunc { +// Trace gets trace of http request +func Trace(f http.HandlerFunc, logBody bool, w http.ResponseWriter, r *http.Request) trace.Info { + name := runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name() name = strings.TrimPrefix(name, "github.com/minio/minio/cmd.") + name = strings.TrimSuffix(name, "Handler-fm") + bodyPlaceHolder := []byte("") + var reqBodyRecorder *recordRequest + + t := trace.Info{FuncName: name} + reqBodyRecorder = &recordRequest{Reader: r.Body, logBody: logBody} + r.Body = ioutil.NopCloser(reqBodyRecorder) - return func(w http.ResponseWriter, r *http.Request) { - const timeFormat = "2006-01-02 15:04:05 -0700" - var reqBodyRecorder *recordRequest - - // Generate short random request ID - reqID := fmt.Sprintf("%f", float64(time.Now().UnixNano())/1e10) - - reqBodyRecorder = &recordRequest{Reader: r.Body, logBody: logBody} - r.Body = ioutil.NopCloser(reqBodyRecorder) - - // Setup a http response body recorder - respBodyRecorder := &recordResponseWriter{ResponseWriter: w, logBody: logBody} - - b := bytes.NewBuffer(nil) - fmt.Fprintf(b, "[REQUEST %s] [%s] [%s]\n", name, reqID, time.Now().Format(timeFormat)) - - f(respBodyRecorder, r) - - // Build request log and write it to log file - fmt.Fprintf(b, "%s %s", r.Method, r.URL.Path) - if r.URL.RawQuery != "" { - fmt.Fprintf(b, "?%s", r.URL.RawQuery) - } - fmt.Fprintf(b, "\n") - - fmt.Fprintf(b, "Host: %s\n", r.Host) - for k, v := range r.Header { - fmt.Fprintf(b, "%s: %s\n", k, v[0]) - } - fmt.Fprintf(b, "\n") - if logBody { - bodyContents := reqBodyRecorder.Data() - if bodyContents != nil { - // If body logging is disabled then we print as a placeholder - // for the actual body. - b.Write(bodyContents) - fmt.Fprintf(b, "\n") - } - } else { - b.Write(bodyPlaceHolder) - fmt.Fprintf(b, "\n") - } - - fmt.Fprintf(b, "\n") - - // Build response log and write it to log file - fmt.Fprintf(b, "[RESPONSE] [%s] [%s]\n", reqID, time.Now().Format(timeFormat)) - - b.Write(respBodyRecorder.Headers()) - fmt.Fprintf(b, "\n") - - // recordResponseWriter{} is configured to record only - // responses with http code != 200 & != 206, we don't - // have to check for logBody value here. - bodyContents := respBodyRecorder.Body() - if bodyContents != nil { - b.Write(bodyContents) - fmt.Fprintf(b, "\n") - } else { - if !logBody { - // If there was no error response and body logging is disabled - // then we print as a placeholder for the actual body. - b.Write(bodyPlaceHolder) - fmt.Fprintf(b, "\n") - } - } - - fmt.Fprintf(b, "\n") - - // Write the contents in one shot so that logs don't get interspersed. - output.Write(b.Bytes()) + host, err := xnet.ParseHost(GetLocalPeer(globalEndpoints)) + if err == nil { + t.NodeName = host.Name + } + rq := trace.RequestInfo{Time: time.Now().UTC(), Method: r.Method, Path: r.URL.Path, RawQuery: r.URL.RawQuery} + rq.Headers = cloneHeader(r.Header) + rq.Headers.Set("Content-Length", strconv.Itoa(int(r.ContentLength))) + rq.Headers.Set("Host", r.Host) + for _, enc := range r.TransferEncoding { + rq.Headers.Add("Transfer-Encoding", enc) + } + if logBody { + // If body logging is disabled then we print as a placeholder + // for the actual body. + rq.Body = reqBodyRecorder.Data() + + } else { + rq.Body = bodyPlaceHolder + } + // Setup a http response body recorder + respBodyRecorder := &recordResponseWriter{ResponseWriter: w, logBody: logBody} + f(respBodyRecorder, r) + + rs := trace.ResponseInfo{Time: time.Now().UTC()} + rs.Headers = cloneHeader(respBodyRecorder.Header()) + rs.StatusCode = respBodyRecorder.statusCode + if rs.StatusCode == 0 { + rs.StatusCode = http.StatusOK + } + bodyContents := respBodyRecorder.Body() + if bodyContents != nil { + rs.Body = bodyContents + } + if !logBody { + // If there was no error response and body logging is disabled + // then we print as a placeholder for the actual body. + rs.Body = bodyPlaceHolder } + t.ReqInfo = rq + t.RespInfo = rs + return t } diff --git a/cmd/httptrace.go b/cmd/httptrace.go new file mode 100644 index 000000000..88eb8f5d0 --- /dev/null +++ b/cmd/httptrace.go @@ -0,0 +1,115 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "bytes" + "context" + "encoding/json" + "strings" + "sync" + + "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/pubsub" + "github.com/minio/minio/pkg/trace" +) + +//HTTPTraceSys holds global trace state +type HTTPTraceSys struct { + peers []*peerRESTClient + pubsub *pubsub.PubSub +} + +// NewTraceSys - creates new HTTPTraceSys with all nodes subscribed to +// the trace pub sub system +func NewTraceSys(ctx context.Context, endpoints EndpointList) *HTTPTraceSys { + remoteHosts := getRemoteHosts(endpoints) + remoteClients, err := getRestClients(remoteHosts) + if err != nil { + logger.FatalIf(err, "Unable to start httptrace sub system") + } + + ps := pubsub.New() + return &HTTPTraceSys{ + remoteClients, ps, + } +} + +// HasTraceListeners returns true if trace listeners are registered +// for this node or peers +func (sys *HTTPTraceSys) HasTraceListeners() bool { + return sys != nil && sys.pubsub.HasSubscribers() +} + +// Publish - publishes trace message to the http trace pubsub system +func (sys *HTTPTraceSys) Publish(traceMsg trace.Info) { + sys.pubsub.Publish(traceMsg) +} + +// Trace writes http trace to writer +func (sys *HTTPTraceSys) Trace(doneCh chan struct{}, trcAll bool) chan []byte { + traceCh := make(chan []byte) + go func() { + defer close(traceCh) + + var wg = &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + + buf := &bytes.Buffer{} + ch := sys.pubsub.Subscribe() + defer sys.pubsub.Unsubscribe(ch) + for { + select { + case entry := <-ch: + trcInfo := entry.(trace.Info) + path := strings.TrimPrefix(trcInfo.ReqInfo.Path, "/") + // omit inter-node traffic if trcAll is false + if !trcAll && strings.HasPrefix(path, minioReservedBucket) { + continue + } + buf.Reset() + enc := json.NewEncoder(buf) + enc.SetEscapeHTML(false) + if err := enc.Encode(trcInfo); err != nil { + continue + } + traceCh <- buf.Bytes() + case <-doneCh: + return + } + } + }() + + for _, peer := range sys.peers { + wg.Add(1) + go func(peer *peerRESTClient) { + defer wg.Done() + ch, err := peer.Trace(doneCh, trcAll) + if err != nil { + return + } + for entry := range ch { + traceCh <- entry + } + }(peer) + } + wg.Wait() + }() + return traceCh +} diff --git a/cmd/peer-rest-client-target.go b/cmd/peer-rest-client-target.go index 27b0e01f6..0dc9209d2 100644 --- a/cmd/peer-rest-client-target.go +++ b/cmd/peer-rest-client-target.go @@ -1,5 +1,5 @@ /* - * MinIO Cloud Storage, (C) 2018 MinIO, Inc. + * MinIO Cloud Storage, (C) 2018, 2019 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index f02bdffb2..f39ef7add 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -17,6 +17,7 @@ package cmd import ( + "bufio" "bytes" "context" "crypto/tls" @@ -24,6 +25,7 @@ import ( "io" "net/url" "strconv" + "time" "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" @@ -52,9 +54,16 @@ func (client *peerRESTClient) reConnect() error { // permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() // after verifying format.json func (client *peerRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { + return client.callWithContext(context.Background(), method, values, body, length) +} + +// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected +// permanently. The only way to restore the connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() +// after verifying format.json +func (client *peerRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { if !client.connected { err := client.reConnect() - logger.LogIf(context.Background(), err) + logger.LogIf(ctx, err) if err != nil { return nil, err } @@ -64,7 +73,7 @@ func (client *peerRESTClient) call(method string, values url.Values, body io.Rea values = make(url.Values) } - respBody, err = client.restClient.Call(method, values, body, length) + respBody, err = client.restClient.CallWithContext(ctx, method, values, body, length) if err == nil { return respBody, nil } @@ -413,6 +422,54 @@ func (client *peerRESTClient) SignalService(sig serviceSignal) error { return nil } +// Trace - send http trace request to peer nodes +func (client *peerRESTClient) Trace(doneCh chan struct{}, trcAll bool) (chan []byte, error) { + ch := make(chan []byte) + go func() { + cleanupFn := func(cancel context.CancelFunc, ch chan []byte, respBody io.ReadCloser) { + close(ch) + if cancel != nil { + cancel() + } + http.DrainBody(respBody) + } + for { + values := make(url.Values) + values.Set(peerRESTTraceAll, strconv.FormatBool(trcAll)) + // get cancellation context to properly unsubscribe peers + ctx, cancel := context.WithCancel(context.Background()) + respBody, err := client.callWithContext(ctx, peerRESTMethodTrace, values, nil, -1) + if err != nil { + //retry + time.Sleep(5 * time.Second) + select { + case <-doneCh: + cleanupFn(cancel, ch, respBody) + return + default: + } + continue + } + bio := bufio.NewScanner(respBody) + go func() { + <-doneCh + cancel() + }() + // Unmarshal each line, returns marshaled values. + for bio.Scan() { + ch <- bio.Bytes() + } + select { + case <-doneCh: + cleanupFn(cancel, ch, respBody) + return + default: + } + } + }() + return ch, nil +} + func getRemoteHosts(endpoints EndpointList) []*xnet.Host { var remoteHosts []*xnet.Host for _, hostStr := range GetRemotePeers(endpoints) { diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 2556a38ff..1547af5b9 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -41,6 +41,7 @@ const ( peerRESTMethodReloadFormat = "reloadformat" peerRESTMethodTargetExists = "targetexists" peerRESTMethodSendEvent = "sendevent" + peerRESTMethodTrace = "trace" ) const ( @@ -51,4 +52,5 @@ const ( peerRESTSignal = "signal" peerRESTProfiler = "profiler" peerRESTDryRun = "dry-run" + peerRESTTraceAll = "all" ) diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index a7112671d..ca6fcae21 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -19,6 +19,7 @@ package cmd import ( "context" "encoding/gob" + "encoding/json" "errors" "fmt" "net/http" @@ -32,6 +33,7 @@ import ( "github.com/minio/minio/pkg/event" xnet "github.com/minio/minio/pkg/net" "github.com/minio/minio/pkg/policy" + trace "github.com/minio/minio/pkg/trace" ) // To abstract a node over network. @@ -666,6 +668,47 @@ func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Req } } +// TraceHandler sends http trace messages back to peer rest client +func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + trcAll := r.URL.Query().Get(peerRESTTraceAll) == "true" + + w.Header().Set("Connection", "close") + w.WriteHeader(http.StatusOK) + w.(http.Flusher).Flush() + ch := globalTrace.pubsub.Subscribe() + defer globalTrace.pubsub.Unsubscribe(ch) + + enc := json.NewEncoder(w) + enc.SetEscapeHTML(false) + for { + select { + case entry := <-ch: + trcInfo := entry.(trace.Info) + path := strings.TrimPrefix(trcInfo.ReqInfo.Path, "/") + // omit inter-node traffic if trcAll is false + if !trcAll && strings.HasPrefix(path, minioReservedBucket) { + continue + } + + if err := enc.Encode(trcInfo); err != nil { + return + } + + if _, err := w.Write([]byte("\n")); err != nil { + return + } + w.(http.Flusher).Flush() + case <-r.Context().Done(): + return + + } + } +} + func (s *peerRESTServer) writeErrorResponse(w http.ResponseWriter, err error) { w.WriteHeader(http.StatusForbidden) w.Write([]byte(err.Error())) @@ -711,5 +754,7 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodReloadFormat).HandlerFunc(httpTraceHdrs(server.ReloadFormatHandler)).Queries(restQueries(peerRESTDryRun)...) + subrouter.Methods(http.MethodPost).Path("/" + peerRESTMethodTrace).HandlerFunc(server.TraceHandler) + router.NotFoundHandler = http.HandlerFunc(httpTraceAll(notFoundHandler)) } diff --git a/cmd/rest/client.go b/cmd/rest/client.go index 817cd2d93..fe435e0fe 100644 --- a/cmd/rest/client.go +++ b/cmd/rest/client.go @@ -52,13 +52,13 @@ type Client struct { newAuthToken func() string } -// Call - make a REST call. -func (c *Client) Call(method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) { +// CallWithContext - make a REST call with context. +func (c *Client) CallWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) { req, err := http.NewRequest(http.MethodPost, c.url.String()+"/"+method+"?"+values.Encode(), body) if err != nil { return nil, &NetworkError{err} } - + req = req.WithContext(ctx) req.Header.Set("Authorization", "Bearer "+c.newAuthToken()) req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339)) if length > 0 { @@ -84,6 +84,12 @@ func (c *Client) Call(method string, values url.Values, body io.Reader, length i return resp.Body, nil } +// Call - make a REST call. +func (c *Client) Call(method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) { + ctx := context.Background() + return c.CallWithContext(ctx, method, values, body, length) +} + // Close closes all idle connections of the underlying http client func (c *Client) Close() { if c.httpIdleConnsCloser != nil { diff --git a/cmd/server-main.go b/cmd/server-main.go index 2b2a5da95..af7b6775f 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -290,6 +290,9 @@ func serverMain(ctx *cli.Context) { // Init global heal state initAllHealState(globalIsXL) + // initialize globalTrace system + globalTrace = NewTraceSys(context.Background(), globalEndpoints) + // Configure server. var handler http.Handler handler, err = configureServerHandler(globalEndpoints) diff --git a/cmd/signals.go b/cmd/signals.go index abe5ce31a..c8475537d 100644 --- a/cmd/signals.go +++ b/cmd/signals.go @@ -74,7 +74,6 @@ func handleSignals() { exit(err == nil && oerr == nil) case osSignal := <-globalOSSignalCh: - stopHTTPTrace() logger.Info("Exiting on signal: %s", strings.ToUpper(osSignal.String())) exit(stopProcess()) case signal := <-globalServiceSignalCh: @@ -83,14 +82,12 @@ func handleSignals() { // Ignore this at the moment. case serviceRestart: logger.Info("Restarting on service signal") - stopHTTPTrace() stop := stopProcess() rerr := restartProcess() logger.LogIf(context.Background(), rerr) exit(stop && rerr == nil) case serviceStop: logger.Info("Stopping on service signal") - stopHTTPTrace() exit(stopProcess()) } } diff --git a/cmd/utils.go b/cmd/utils.go index 65c56beb8..d161ec1a7 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -59,16 +59,6 @@ func IsErr(err error, errs ...error) bool { return false } -// Close Http tracing file. -func stopHTTPTrace() { - if globalHTTPTraceFile != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("traceFile", globalHTTPTraceFile.Name()) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, globalHTTPTraceFile.Close()) - globalHTTPTraceFile = nil - } -} - // make a copy of http.Header func cloneHeader(h http.Header) http.Header { h2 := make(http.Header, len(h)) diff --git a/cmd/utils_test.go b/cmd/utils_test.go index 9513b1bb1..1cd8bbe1d 100644 --- a/cmd/utils_test.go +++ b/cmd/utils_test.go @@ -54,19 +54,6 @@ func TestCloneHeader(t *testing.T) { } } -// Tests closing http tracing file. -func TestStopHTTPTrace(t *testing.T) { - var err error - globalHTTPTraceFile, err = ioutil.TempFile("", "") - if err != nil { - defer os.Remove(globalHTTPTraceFile.Name()) - stopHTTPTrace() - if globalHTTPTraceFile != nil { - t.Errorf("globalHTTPTraceFile is not nil, it is expected to be nil") - } - } -} - // Tests maximum object size. func TestMaxObjectSize(t *testing.T) { sizes := []struct { diff --git a/pkg/handlers/http-tracer_test.go b/pkg/handlers/http-tracer_test.go deleted file mode 100644 index aba05f583..000000000 --- a/pkg/handlers/http-tracer_test.go +++ /dev/null @@ -1,154 +0,0 @@ -/* - * MinIO Cloud Storage, (C) 2017 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package handlers - -import ( - "bytes" - "io" - "io/ioutil" - "net/http" - "net/http/httptest" - "regexp" - "testing" -) - -func wsTestSuccessHandler(w http.ResponseWriter, r *http.Request) { - // A very simple health check. - w.WriteHeader(http.StatusOK) - w.Header().Set("Content-Type", "application/json") - - ioutil.ReadAll(r.Body) - - // In the future we could report back on the status of our DB, or our cache - // (e.g. Redis) by performing a simple PING, and include them in the response. - io.WriteString(w, `{"success": true}`) -} - -func wsTest404Handler(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusNotFound) -} - -func TestTraceHTTPHandler(t *testing.T) { - - logOutput := bytes.NewBuffer([]byte("")) - - testCases := []struct { - method string - path string - sentData string - headers map[string]string - handler http.HandlerFunc - expectedStatus int - expectedLogRegexp string - }{ - - { - method: "PUT", - path: "/test-log", - sentData: "sending data", - headers: map[string]string{"Test-Header": "TestHeaderValue"}, - handler: TraceReqHandlerFunc(http.HandlerFunc(wsTestSuccessHandler), logOutput, true), - expectedStatus: http.StatusOK, - expectedLogRegexp: `\[REQUEST github.com/minio/minio/pkg/handlers.wsTestSuccessHandler\] \[[^\]]*\] \[[^\]]*\] -PUT /test-log -Host:\ -Test-Header: TestHeaderValue - -sending data - -\[RESPONSE\] \[[^\]]*\] \[[^\]]*\] -200 OK - -{"success": true} - -`, - }, - { - method: "POST", - path: "/test-log", - handler: TraceReqHandlerFunc(http.HandlerFunc(wsTestSuccessHandler), logOutput, false), - headers: map[string]string{"Test-Header": "TestHeaderValue"}, - expectedStatus: http.StatusOK, - expectedLogRegexp: `\[REQUEST github.com/minio/minio/pkg/handlers.wsTestSuccessHandler\] \[[^\]]*\] \[[^\]]*\] -POST /test-log -Host:\ -Test-Header: TestHeaderValue - - - -\[RESPONSE\] \[[^\]]*\] \[[^\]]*\] -200 OK - - - -`, - }, - { - method: "POST", - path: "/test-log", - handler: TraceReqHandlerFunc(http.HandlerFunc(wsTest404Handler), logOutput, false), - headers: map[string]string{"Test-Header": "TestHeaderValue"}, - expectedStatus: http.StatusNotFound, - expectedLogRegexp: `\[REQUEST github.com/minio/minio/pkg/handlers.wsTest404Handler\] \[[^\]]*\] \[[^\]]*\] -POST /test-log -Host:\ -Test-Header: TestHeaderValue - - - -\[RESPONSE\] \[[^\]]*\] \[[^\]]*\] -404 Not Found - - - -`, - }, - } - - for i, testCase := range testCases { - logOutput.Reset() - - req, err := http.NewRequest(testCase.method, testCase.path, bytes.NewBuffer([]byte(testCase.sentData))) - if err != nil { - t.Fatalf("Test %d: %v\n", i+1, err) - } - - for k, v := range testCase.headers { - req.Header.Set(k, v) - } - - rr := httptest.NewRecorder() - - handler := testCase.handler - handler.ServeHTTP(rr, req) - - // Check the status code is what we expect. - if status := rr.Code; status != testCase.expectedStatus { - t.Errorf("Test %d: handler returned wrong status code: got %v want %v", i+1, - status, testCase.expectedStatus) - } - - matched, err := regexp.MatchString(testCase.expectedLogRegexp, logOutput.String()) - if err != nil { - t.Fatalf("Test %d: Incorrect regexp: %v", i+1, err) - } - - if !matched { - t.Fatalf("Test %d: Unexpected log content, found: `%s`", i+1, logOutput.String()) - } - } -} diff --git a/pkg/madmin/README.md b/pkg/madmin/README.md index ae15c785f..72595b760 100644 --- a/pkg/madmin/README.md +++ b/pkg/madmin/README.md @@ -45,7 +45,7 @@ func main() { |:------------------------------------------|:--------------------------------------------|:-------------------|:----------------------------------|:------------------------|:--------------------------------------|:--------------------------------------------------| | [`ServiceStatus`](#ServiceStatus) | [`ServerInfo`](#ServerInfo) | [`Heal`](#Heal) | [`GetConfig`](#GetConfig) | [`TopLocks`](#TopLocks) | [`AddUser`](#AddUser) | | | [`ServiceSendAction`](#ServiceSendAction) | [`ServerCPULoadInfo`](#ServerCPULoadInfo) | | [`SetConfig`](#SetConfig) | | [`SetUserPolicy`](#SetUserPolicy) | [`StartProfiling`](#StartProfiling) | -| | [`ServerMemUsageInfo`](#ServerMemUsageInfo) | | [`GetConfigKeys`](#GetConfigKeys) | | [`ListUsers`](#ListUsers) | [`DownloadProfilingData`](#DownloadProfilingData) | +| [`Trace`](#Trace) | [`ServerMemUsageInfo`](#ServerMemUsageInfo) | | [`GetConfigKeys`](#GetConfigKeys) | | [`ListUsers`](#ListUsers) | [`DownloadProfilingData`](#DownloadProfilingData) | | | | | [`SetConfigKeys`](#SetConfigKeys) | | [`AddCannedPolicy`](#AddCannedPolicy) | | @@ -537,3 +537,22 @@ __Example__ log.Println("Profiling data successfully downloaded.") ``` + + +### Trace(allTrace bool,doneCh <-chan struct{}) <-chan TraceInfo +Enable HTTP request tracing on all nodes in a MinIO cluster + +__Example__ + +``` go + doneCh := make(chan struct{}) + defer close(doneCh) + // listen to all trace including internal API calls + allTrace := true + // Start listening on all trace activity. + traceCh := madmClnt.Trace(allTrace,doneCh) + for traceInfo := range traceCh { + fmt.Println(traceInfo.String()) + } + log.Println("Success") +``` \ No newline at end of file diff --git a/pkg/madmin/api-trace.go b/pkg/madmin/api-trace.go new file mode 100644 index 000000000..3e2c6858e --- /dev/null +++ b/pkg/madmin/api-trace.go @@ -0,0 +1,92 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package madmin + +import ( + "bufio" + "encoding/json" + "io" + "net/http" + "net/url" + "strconv" + + trace "github.com/minio/minio/pkg/trace" +) + +// TraceInfo holds http trace +type TraceInfo struct { + Trace trace.Info + Err error `json:"-"` +} + +// Trace - listen on http trace notifications. +func (adm AdminClient) Trace(allTrace bool, doneCh <-chan struct{}) <-chan TraceInfo { + traceInfoCh := make(chan TraceInfo, 1) + // Only success, start a routine to start reading line by line. + go func(traceInfoCh chan<- TraceInfo) { + defer close(traceInfoCh) + for { + urlValues := make(url.Values) + urlValues.Set("all", strconv.FormatBool(allTrace)) + reqData := requestData{ + relPath: "/v1/trace", + queryValues: urlValues, + } + // Execute GET to call trace handler + resp, err := adm.executeMethod("GET", reqData) + if err != nil { + closeResponse(resp) + return + } + + if resp.StatusCode != http.StatusOK { + traceInfoCh <- TraceInfo{Err: httpRespToErrorResponse(resp)} + return + } + + // Initialize a new bufio scanner, to read line by line. + bio := bufio.NewScanner(resp.Body) + + // Close the response body. + defer resp.Body.Close() + + // Unmarshal each line, returns marshaled values. + for bio.Scan() { + var traceRec trace.Info + if err = json.Unmarshal(bio.Bytes(), &traceRec); err != nil { + continue + } + select { + case <-doneCh: + return + case traceInfoCh <- TraceInfo{Trace: traceRec}: + } + } + // Look for any underlying errors. + if err = bio.Err(); err != nil { + // For an unexpected connection drop from server, we close the body + // and re-connect. + if err == io.ErrUnexpectedEOF { + resp.Body.Close() + } + } + } + }(traceInfoCh) + + // Returns the trace info channel, for caller to start reading from. + return traceInfoCh +} diff --git a/pkg/madmin/examples/trace.go b/pkg/madmin/examples/trace.go new file mode 100644 index 000000000..7566562ac --- /dev/null +++ b/pkg/madmin/examples/trace.go @@ -0,0 +1,50 @@ +// +build ignore + +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package main + +import ( + "log" + + "github.com/minio/minio/pkg/madmin" +) + +func main() { + // Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are + // dummy values, please replace them with original values. + + // API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise. + // New returns an MinIO Admin client object. + madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true) + if err != nil { + log.Fatalln(err) + } + doneCh := make(chan struct{}) + defer close(doneCh) + + // Start listening on all http trace activity from all servers + // in the minio cluster. + traceCh := madmClnt.Trace(false, doneCh) + for traceInfo := range traceCh { + if traceInfo.Err != nil { + fmt.Println(traceInfo.Err) + } + fmt.Println(traceInfo) + } +} diff --git a/pkg/pubsub/pubsub.go b/pkg/pubsub/pubsub.go new file mode 100644 index 000000000..308e548e0 --- /dev/null +++ b/pkg/pubsub/pubsub.go @@ -0,0 +1,83 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub + +import ( + "sync" +) + +// PubSub holds publishers and subscribers +type PubSub struct { + subs []chan interface{} + pub chan interface{} + mutex sync.Mutex +} + +// process item to subscribers. +func (ps *PubSub) process() { + for item := range ps.pub { + ps.mutex.Lock() + for _, sub := range ps.subs { + go func(s chan interface{}) { + s <- item + }(sub) + } + ps.mutex.Unlock() + } +} + +// Publish message to pubsub system +func (ps *PubSub) Publish(item interface{}) { + ps.pub <- item +} + +// Subscribe - Adds a subscriber to pubsub system +func (ps *PubSub) Subscribe() chan interface{} { + ps.mutex.Lock() + defer ps.mutex.Unlock() + ch := make(chan interface{}) + ps.subs = append(ps.subs, ch) + return ch +} + +// Unsubscribe removes current subscriber +func (ps *PubSub) Unsubscribe(ch chan interface{}) { + ps.mutex.Lock() + defer ps.mutex.Unlock() + + for i, sub := range ps.subs { + if sub == ch { + close(ch) + ps.subs = append(ps.subs[:i], ps.subs[i+1:]...) + } + } +} + +// HasSubscribers returns true if pubsub system has subscribers +func (ps *PubSub) HasSubscribers() bool { + ps.mutex.Lock() + defer ps.mutex.Unlock() + return len(ps.subs) > 0 +} + +// New inits a PubSub system +func New() *PubSub { + ps := &PubSub{} + ps.pub = make(chan interface{}) + go ps.process() + return ps +} diff --git a/pkg/pubsub/pubsub_test.go b/pkg/pubsub/pubsub_test.go new file mode 100644 index 000000000..cd4119757 --- /dev/null +++ b/pkg/pubsub/pubsub_test.go @@ -0,0 +1,66 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub + +import ( + "fmt" + "testing" +) + +func TestSubscribe(t *testing.T) { + ps := New() + ps.Subscribe() + ps.Subscribe() + if len(ps.subs) != 2 { + t.Errorf("expected 2 subscribers") + } +} + +func TestUnsubscribe(t *testing.T) { + ps := New() + c1 := ps.Subscribe() + ps.Subscribe() + ps.Unsubscribe(c1) + if len(ps.subs) != 1 { + t.Errorf("expected 1 subscriber") + } +} + +func TestPubSub(t *testing.T) { + ps := New() + c1 := ps.Subscribe() + val := "hello" + ps.Publish(val) + msg := <-c1 + if msg != "hello" { + t.Errorf(fmt.Sprintf("expected %s , found %s", val, msg)) + } +} + +func TestMultiPubSub(t *testing.T) { + ps := New() + c1 := ps.Subscribe() + c2 := ps.Subscribe() + val := "hello" + ps.Publish(val) + + msg1 := <-c1 + msg2 := <-c2 + if msg1 != "hello" && msg2 != "hello" { + t.Errorf(fmt.Sprintf("expected both subscribers to have%s , found %s and %s", val, msg1, msg2)) + } +} diff --git a/pkg/trace/trace.go b/pkg/trace/trace.go new file mode 100644 index 000000000..6ebd6525c --- /dev/null +++ b/pkg/trace/trace.go @@ -0,0 +1,49 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package trace + +import ( + "net/http" + "time" +) + +// Info - represents a trace record, additionally +// also reports errors if any while listening on trace. +type Info struct { + NodeName string `json:"nodename"` + FuncName string `json:"funcname"` + ReqInfo RequestInfo `json:"request"` + RespInfo ResponseInfo `json:"response"` +} + +// RequestInfo represents trace of http request +type RequestInfo struct { + Time time.Time `json:"time"` + Method string `json:"method"` + Path string `json:"path,omitempty"` + RawQuery string `json:"rawquery,omitempty"` + Headers http.Header `json:"headers,omitempty"` + Body []byte `json:"body,omitempty"` +} + +// ResponseInfo represents trace of http request +type ResponseInfo struct { + Time time.Time `json:"time"` + Headers http.Header `json:"headers,omitempty"` + Body []byte `json:"body,omitempty"` + StatusCode int `json:"statuscode,omitempty"` +}