diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 543d482ff..db3eeb778 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -43,6 +43,7 @@ import ( "github.com/minio/minio/pkg/mem" xnet "github.com/minio/minio/pkg/net" "github.com/minio/minio/pkg/quick" + trace "github.com/minio/minio/pkg/trace" ) const ( @@ -1483,11 +1484,6 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) { return } - if globalTrace == nil { - writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) - return - } - // Avoid reusing tcp connection if read timeout is hit // This is needed to make r.Context().Done() work as // expected in case of read timeout @@ -1496,14 +1492,33 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) { doneCh := make(chan struct{}) defer close(doneCh) - traceCh := globalTrace.Trace(doneCh, trcAll) + // Trace Publisher and peer-trace-client uses nonblocking send and hence does not wait for slow receivers. + // Use buffered channel to take care of burst sends or slow w.Write() + traceCh := make(chan interface{}, 4000) + + filter := func(entry interface{}) bool { + if trcAll { + return true + } + trcInfo := entry.(trace.Info) + return !strings.HasPrefix(trcInfo.ReqInfo.Path, minioReservedBucketPath) + } + remoteHosts := getRemoteHosts(globalEndpoints) + peers, err := getRestClients(remoteHosts) + if err != nil { + return + } + globalHTTPTrace.Subscribe(traceCh, doneCh, filter) + + for _, peer := range peers { + peer.Trace(traceCh, doneCh, trcAll) + } + + enc := json.NewEncoder(w) for { select { case entry := <-traceCh: - if _, err := w.Write(entry); err != nil { - return - } - if _, err := w.Write([]byte("\n")); err != nil { + if err := enc.Encode(entry); err != nil { return } w.(http.Flusher).Flush() diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index 588d43580..d6072eff0 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -158,9 +158,6 @@ func StartGateway(ctx *cli.Context, gw Gateway) { registerSTSRouter(router) } - // initialize globalTrace system - globalTrace = NewTraceSys(context.Background(), globalEndpoints) - enableConfigOps := globalEtcdClient != nil && gatewayName == "nas" enableIAMOps := globalEtcdClient != nil diff --git a/cmd/globals.go b/cmd/globals.go index f186d5820..d4356489e 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -35,6 +35,7 @@ import ( "github.com/minio/minio/pkg/dns" iampolicy "github.com/minio/minio/pkg/iam/policy" "github.com/minio/minio/pkg/iam/validator" + "github.com/minio/minio/pkg/pubsub" ) // minio configuration related constants. @@ -161,7 +162,7 @@ var ( // global Trace system to send HTTP request/response logs to // registered listeners - globalTrace *HTTPTraceSys + globalHTTPTrace = pubsub.New() globalEndpoints EndpointList diff --git a/cmd/handler-utils.go b/cmd/handler-utils.go index 8bab249b0..bcc913296 100644 --- a/cmd/handler-utils.go +++ b/cmd/handler-utils.go @@ -326,24 +326,24 @@ func extractPostPolicyFormValues(ctx context.Context, form *multipart.Form) (fil // Log headers and body. func httpTraceAll(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - if !globalTrace.HasTraceListeners() { + if !globalHTTPTrace.HasSubscribers() { f.ServeHTTP(w, r) return } trace := Trace(f, true, w, r) - globalTrace.Publish(trace) + globalHTTPTrace.Publish(trace) } } // Log only the headers. func httpTraceHdrs(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - if !globalTrace.HasTraceListeners() { + if !globalHTTPTrace.HasSubscribers() { f.ServeHTTP(w, r) return } trace := Trace(f, false, w, r) - globalTrace.Publish(trace) + globalHTTPTrace.Publish(trace) } } diff --git a/cmd/httptrace.go b/cmd/httptrace.go deleted file mode 100644 index 88eb8f5d0..000000000 --- a/cmd/httptrace.go +++ /dev/null @@ -1,115 +0,0 @@ -/* - * MinIO Cloud Storage, (C) 2019 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import ( - "bytes" - "context" - "encoding/json" - "strings" - "sync" - - "github.com/minio/minio/cmd/logger" - "github.com/minio/minio/pkg/pubsub" - "github.com/minio/minio/pkg/trace" -) - -//HTTPTraceSys holds global trace state -type HTTPTraceSys struct { - peers []*peerRESTClient - pubsub *pubsub.PubSub -} - -// NewTraceSys - creates new HTTPTraceSys with all nodes subscribed to -// the trace pub sub system -func NewTraceSys(ctx context.Context, endpoints EndpointList) *HTTPTraceSys { - remoteHosts := getRemoteHosts(endpoints) - remoteClients, err := getRestClients(remoteHosts) - if err != nil { - logger.FatalIf(err, "Unable to start httptrace sub system") - } - - ps := pubsub.New() - return &HTTPTraceSys{ - remoteClients, ps, - } -} - -// HasTraceListeners returns true if trace listeners are registered -// for this node or peers -func (sys *HTTPTraceSys) HasTraceListeners() bool { - return sys != nil && sys.pubsub.HasSubscribers() -} - -// Publish - publishes trace message to the http trace pubsub system -func (sys *HTTPTraceSys) Publish(traceMsg trace.Info) { - sys.pubsub.Publish(traceMsg) -} - -// Trace writes http trace to writer -func (sys *HTTPTraceSys) Trace(doneCh chan struct{}, trcAll bool) chan []byte { - traceCh := make(chan []byte) - go func() { - defer close(traceCh) - - var wg = &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - - buf := &bytes.Buffer{} - ch := sys.pubsub.Subscribe() - defer sys.pubsub.Unsubscribe(ch) - for { - select { - case entry := <-ch: - trcInfo := entry.(trace.Info) - path := strings.TrimPrefix(trcInfo.ReqInfo.Path, "/") - // omit inter-node traffic if trcAll is false - if !trcAll && strings.HasPrefix(path, minioReservedBucket) { - continue - } - buf.Reset() - enc := json.NewEncoder(buf) - enc.SetEscapeHTML(false) - if err := enc.Encode(trcInfo); err != nil { - continue - } - traceCh <- buf.Bytes() - case <-doneCh: - return - } - } - }() - - for _, peer := range sys.peers { - wg.Add(1) - go func(peer *peerRESTClient) { - defer wg.Done() - ch, err := peer.Trace(doneCh, trcAll) - if err != nil { - return - } - for entry := range ch { - traceCh <- entry - } - }(peer) - } - wg.Wait() - }() - return traceCh -} diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index d62fa08a1..98ab0d4e4 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -17,7 +17,6 @@ package cmd import ( - "bufio" "bytes" "context" "crypto/tls" @@ -34,6 +33,7 @@ import ( "github.com/minio/minio/pkg/madmin" xnet "github.com/minio/minio/pkg/net" "github.com/minio/minio/pkg/policy" + trace "github.com/minio/minio/pkg/trace" ) // client to talk to peer Nodes. @@ -435,52 +435,59 @@ func (client *peerRESTClient) BackgroundHealStatus() (madmin.BgHealState, error) return state, err } -// Trace - send http trace request to peer nodes -func (client *peerRESTClient) Trace(doneCh chan struct{}, trcAll bool) (chan []byte, error) { - ch := make(chan []byte) +func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh chan struct{}, trcAll bool) { + values := make(url.Values) + values.Set(peerRESTTraceAll, strconv.FormatBool(trcAll)) + + // To cancel the REST request in case doneCh gets closed. + ctx, cancel := context.WithCancel(context.Background()) + + cancelCh := make(chan struct{}) + defer close(cancelCh) go func() { - cleanupFn := func(cancel context.CancelFunc, ch chan []byte, respBody io.ReadCloser) { - close(ch) - if cancel != nil { - cancel() - } - http.DrainBody(respBody) + select { + case <-doneCh: + case <-cancelCh: + // There was an error in the REST request. + } + cancel() + }() + + respBody, err := client.callWithContext(ctx, peerRESTMethodTrace, values, nil, -1) + defer http.DrainBody(respBody) + + if err != nil { + return + } + + dec := gob.NewDecoder(respBody) + for { + var info trace.Info + if err = dec.Decode(&info); err != nil { + return + } + select { + case traceCh <- info: + default: + // Do not block on slow receivers. } + } +} + +// Trace - send http trace request to peer nodes +func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh chan struct{}, trcAll bool) { + go func() { for { - values := make(url.Values) - values.Set(peerRESTTraceAll, strconv.FormatBool(trcAll)) - // get cancellation context to properly unsubscribe peers - ctx, cancel := context.WithCancel(context.Background()) - respBody, err := client.callWithContext(ctx, peerRESTMethodTrace, values, nil, -1) - if err != nil { - //retry - time.Sleep(5 * time.Second) - select { - case <-doneCh: - cleanupFn(cancel, ch, respBody) - return - default: - } - continue - } - bio := bufio.NewScanner(respBody) - go func() { - <-doneCh - cancel() - }() - // Unmarshal each line, returns marshaled values. - for bio.Scan() { - ch <- bio.Bytes() - } + client.doTrace(traceCh, doneCh, trcAll) select { case <-doneCh: - cleanupFn(cancel, ch, respBody) return default: + // There was error in the REST request, retry after sometime as probably the peer is down. + time.Sleep(5 * time.Second) } } }() - return ch, nil } func getRemoteHosts(endpoints EndpointList) []*xnet.Host { diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index a84be0298..3dbe945d2 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -19,7 +19,6 @@ package cmd import ( "context" "encoding/gob" - "encoding/json" "errors" "fmt" "net/http" @@ -679,32 +678,33 @@ func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Connection", "close") w.WriteHeader(http.StatusOK) w.(http.Flusher).Flush() - ch := globalTrace.pubsub.Subscribe() - defer globalTrace.pubsub.Unsubscribe(ch) - enc := json.NewEncoder(w) - enc.SetEscapeHTML(false) + filter := func(entry interface{}) bool { + if trcAll { + return true + } + trcInfo := entry.(trace.Info) + return !strings.HasPrefix(trcInfo.ReqInfo.Path, minioReservedBucketPath) + } + + doneCh := make(chan struct{}) + defer close(doneCh) + + // Trace Publisher uses nonblocking publish and hence does not wait for slow subscribers. + // Use buffered channel to take care of burst sends or slow w.Write() + ch := make(chan interface{}, 2000) + globalHTTPTrace.Subscribe(ch, doneCh, filter) + + enc := gob.NewEncoder(w) for { select { case entry := <-ch: - trcInfo := entry.(trace.Info) - path := strings.TrimPrefix(trcInfo.ReqInfo.Path, "/") - // omit inter-node traffic if trcAll is false - if !trcAll && strings.HasPrefix(path, minioReservedBucket) { - continue - } - - if err := enc.Encode(trcInfo); err != nil { - return - } - - if _, err := w.Write([]byte("\n")); err != nil { + if err := enc.Encode(entry); err != nil { return } w.(http.Flusher).Flush() case <-r.Context().Done(): return - } } } diff --git a/cmd/server-main.go b/cmd/server-main.go index d963ab012..76b5eec92 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -294,9 +294,6 @@ func serverMain(ctx *cli.Context) { globalSweepHealState = initHealState() } - // initialize globalTrace system - globalTrace = NewTraceSys(context.Background(), globalEndpoints) - // Configure server. var handler http.Handler handler, err = configureServerHandler(globalEndpoints) diff --git a/pkg/madmin/api-trace.go b/pkg/madmin/api-trace.go index 3e2c6858e..fd193ade9 100644 --- a/pkg/madmin/api-trace.go +++ b/pkg/madmin/api-trace.go @@ -17,9 +17,7 @@ package madmin import ( - "bufio" "encoding/json" - "io" "net/http" "net/url" "strconv" @@ -35,7 +33,7 @@ type TraceInfo struct { // Trace - listen on http trace notifications. func (adm AdminClient) Trace(allTrace bool, doneCh <-chan struct{}) <-chan TraceInfo { - traceInfoCh := make(chan TraceInfo, 1) + traceInfoCh := make(chan TraceInfo) // Only success, start a routine to start reading line by line. go func(traceInfoCh chan<- TraceInfo) { defer close(traceInfoCh) @@ -58,30 +56,16 @@ func (adm AdminClient) Trace(allTrace bool, doneCh <-chan struct{}) <-chan Trace return } - // Initialize a new bufio scanner, to read line by line. - bio := bufio.NewScanner(resp.Body) - - // Close the response body. - defer resp.Body.Close() - - // Unmarshal each line, returns marshaled values. - for bio.Scan() { - var traceRec trace.Info - if err = json.Unmarshal(bio.Bytes(), &traceRec); err != nil { - continue + dec := json.NewDecoder(resp.Body) + for { + var info trace.Info + if err = dec.Decode(&info); err != nil { + break } select { case <-doneCh: return - case traceInfoCh <- TraceInfo{Trace: traceRec}: - } - } - // Look for any underlying errors. - if err = bio.Err(); err != nil { - // For an unexpected connection drop from server, we close the body - // and re-connect. - if err == io.ErrUnexpectedEOF { - resp.Body.Close() + case traceInfoCh <- TraceInfo{Trace: info}: } } } diff --git a/pkg/pubsub/pubsub.go b/pkg/pubsub/pubsub.go index 308e548e0..2f9d45985 100644 --- a/pkg/pubsub/pubsub.go +++ b/pkg/pubsub/pubsub.go @@ -20,64 +20,65 @@ import ( "sync" ) +// Sub - subscriber entity. +type Sub struct { + ch chan interface{} + filter func(entry interface{}) bool +} + // PubSub holds publishers and subscribers type PubSub struct { - subs []chan interface{} - pub chan interface{} - mutex sync.Mutex + subs []*Sub + sync.RWMutex } -// process item to subscribers. -func (ps *PubSub) process() { - for item := range ps.pub { - ps.mutex.Lock() - for _, sub := range ps.subs { - go func(s chan interface{}) { - s <- item - }(sub) +// Publish message to the subscribers. +// Note that publish is always nob-blocking send so that we don't block on slow receivers. +// Hence receivers should use buffered channel so as not to miss the published events. +func (ps *PubSub) Publish(item interface{}) { + ps.RLock() + defer ps.RUnlock() + + for _, sub := range ps.subs { + if sub.filter(item) { + select { + case sub.ch <- item: + default: + } } - ps.mutex.Unlock() } } -// Publish message to pubsub system -func (ps *PubSub) Publish(item interface{}) { - ps.pub <- item -} - // Subscribe - Adds a subscriber to pubsub system -func (ps *PubSub) Subscribe() chan interface{} { - ps.mutex.Lock() - defer ps.mutex.Unlock() - ch := make(chan interface{}) - ps.subs = append(ps.subs, ch) - return ch -} +func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh chan struct{}, filter func(entry interface{}) bool) { + ps.Lock() + defer ps.Unlock() + + sub := &Sub{subCh, filter} + ps.subs = append(ps.subs, sub) -// Unsubscribe removes current subscriber -func (ps *PubSub) Unsubscribe(ch chan interface{}) { - ps.mutex.Lock() - defer ps.mutex.Unlock() + go func() { + <-doneCh - for i, sub := range ps.subs { - if sub == ch { - close(ch) - ps.subs = append(ps.subs[:i], ps.subs[i+1:]...) + ps.Lock() + defer ps.Unlock() + + for i, s := range ps.subs { + if s == sub { + ps.subs = append(ps.subs[:i], ps.subs[i+1:]...) + } } - } + }() } // HasSubscribers returns true if pubsub system has subscribers func (ps *PubSub) HasSubscribers() bool { - ps.mutex.Lock() - defer ps.mutex.Unlock() + ps.RLock() + defer ps.RUnlock() return len(ps.subs) > 0 } // New inits a PubSub system func New() *PubSub { - ps := &PubSub{} - ps.pub = make(chan interface{}) - go ps.process() - return ps + return &PubSub{} } diff --git a/pkg/pubsub/pubsub_test.go b/pkg/pubsub/pubsub_test.go index cd4119757..f30a71a28 100644 --- a/pkg/pubsub/pubsub_test.go +++ b/pkg/pubsub/pubsub_test.go @@ -19,12 +19,19 @@ package pubsub import ( "fmt" "testing" + "time" ) func TestSubscribe(t *testing.T) { ps := New() - ps.Subscribe() - ps.Subscribe() + ch1 := make(chan interface{}, 1) + ch2 := make(chan interface{}, 1) + doneCh := make(chan struct{}) + defer close(doneCh) + ps.Subscribe(ch1, doneCh, nil) + ps.Subscribe(ch2, doneCh, nil) + ps.Lock() + defer ps.Unlock() if len(ps.subs) != 2 { t.Errorf("expected 2 subscribers") } @@ -32,20 +39,33 @@ func TestSubscribe(t *testing.T) { func TestUnsubscribe(t *testing.T) { ps := New() - c1 := ps.Subscribe() - ps.Subscribe() - ps.Unsubscribe(c1) + ch1 := make(chan interface{}, 1) + ch2 := make(chan interface{}, 1) + doneCh1 := make(chan struct{}) + doneCh2 := make(chan struct{}) + ps.Subscribe(ch1, doneCh1, nil) + ps.Subscribe(ch2, doneCh2, nil) + + close(doneCh1) + // Allow for the above statement to take effect. + time.Sleep(100 * time.Millisecond) + ps.Lock() if len(ps.subs) != 1 { t.Errorf("expected 1 subscriber") } + ps.Unlock() + close(doneCh2) } func TestPubSub(t *testing.T) { ps := New() - c1 := ps.Subscribe() + ch1 := make(chan interface{}, 1) + doneCh1 := make(chan struct{}) + defer close(doneCh1) + ps.Subscribe(ch1, doneCh1, func(entry interface{}) bool { return true }) val := "hello" ps.Publish(val) - msg := <-c1 + msg := <-ch1 if msg != "hello" { t.Errorf(fmt.Sprintf("expected %s , found %s", val, msg)) } @@ -53,13 +73,17 @@ func TestPubSub(t *testing.T) { func TestMultiPubSub(t *testing.T) { ps := New() - c1 := ps.Subscribe() - c2 := ps.Subscribe() + ch1 := make(chan interface{}, 1) + ch2 := make(chan interface{}, 1) + doneCh := make(chan struct{}) + defer close(doneCh) + ps.Subscribe(ch1, doneCh, func(entry interface{}) bool { return true }) + ps.Subscribe(ch2, doneCh, func(entry interface{}) bool { return true }) val := "hello" ps.Publish(val) - msg1 := <-c1 - msg2 := <-c2 + msg1 := <-ch1 + msg2 := <-ch2 if msg1 != "hello" && msg2 != "hello" { t.Errorf(fmt.Sprintf("expected both subscribers to have%s , found %s and %s", val, msg1, msg2)) }