tracing: NumSubscribers() to use atomic instead of mutex (#11219)

globalSubscribers.NumSubscribers() is heavily used in S3 requests and it
uses mutex, use atomic.Load instead since it is faster

Co-authored-by: Anis Elleuch <anis@min.io>
master
Anis Elleuch 4 years ago committed by GitHub
parent dfd99b6d8f
commit 153d4be032
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      cmd/background-heal-ops.go
  2. 2
      cmd/consolelogger.go
  3. 4
      cmd/handler-utils.go
  4. 2
      cmd/notification.go
  5. 2
      cmd/web-router.go
  6. 17
      pkg/pubsub/pubsub.go

@ -67,7 +67,7 @@ func waitForLowHTTPReq(maxIO int, maxWait time.Duration) {
// Bucket notification and http trace are not costly, it is okay to ignore them // Bucket notification and http trace are not costly, it is okay to ignore them
// while counting the number of concurrent connections // while counting the number of concurrent connections
maxIOFn := func() int { maxIOFn := func() int {
return maxIO + globalHTTPListen.NumSubscribers() + globalHTTPTrace.NumSubscribers() return maxIO + int(globalHTTPListen.NumSubscribers()) + int(globalHTTPTrace.NumSubscribers())
} }
if httpServer := newHTTPServerFn(); httpServer != nil { if httpServer := newHTTPServerFn(); httpServer != nil {

@ -70,7 +70,7 @@ func (sys *HTTPConsoleLoggerSys) SetNodeName(endpointServerPools EndpointServerP
// HasLogListeners returns true if console log listeners are registered // HasLogListeners returns true if console log listeners are registered
// for this node or peers // for this node or peers
func (sys *HTTPConsoleLoggerSys) HasLogListeners() bool { func (sys *HTTPConsoleLoggerSys) HasLogListeners() bool {
return sys != nil && sys.pubsub.HasSubscribers() return sys != nil && sys.pubsub.NumSubscribers() > 0
} }
// Subscribe starts console logging for this node. // Subscribe starts console logging for this node.

@ -348,7 +348,7 @@ func extractPostPolicyFormValues(ctx context.Context, form *multipart.Form) (fil
// Log headers and body. // Log headers and body.
func httpTraceAll(f http.HandlerFunc) http.HandlerFunc { func httpTraceAll(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
if !globalHTTPTrace.HasSubscribers() { if globalHTTPTrace.NumSubscribers() == 0 {
f.ServeHTTP(w, r) f.ServeHTTP(w, r)
return return
} }
@ -360,7 +360,7 @@ func httpTraceAll(f http.HandlerFunc) http.HandlerFunc {
// Log only the headers. // Log only the headers.
func httpTraceHdrs(f http.HandlerFunc) http.HandlerFunc { func httpTraceHdrs(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
if !globalHTTPTrace.HasSubscribers() { if globalHTTPTrace.NumSubscribers() == 0 {
f.ServeHTTP(w, r) f.ServeHTTP(w, r)
return return
} }

@ -1375,7 +1375,7 @@ func sendEvent(args eventArgs) {
return return
} }
if globalHTTPListen.HasSubscribers() { if globalHTTPListen.NumSubscribers() > 0 {
globalHTTPListen.Publish(args.ToEvent(false)) globalHTTPListen.Publish(args.ToEvent(false))
} }

@ -85,7 +85,7 @@ func registerWebRouter(router *mux.Router) error {
"bucket": bucketName, "bucket": bucketName,
"object": objectName, "object": objectName,
}) })
if globalHTTPTrace.HasSubscribers() { if globalHTTPTrace.NumSubscribers() > 0 {
globalHTTPTrace.Publish(WebTrace(ri)) globalHTTPTrace.Publish(WebTrace(ri))
} }
logger.AuditLog(ri.ResponseWriter, ri.Request, ri.Method, claims.Map()) logger.AuditLog(ri.ResponseWriter, ri.Request, ri.Method, claims.Map())

@ -18,6 +18,7 @@ package pubsub
import ( import (
"sync" "sync"
"sync/atomic"
) )
// Sub - subscriber entity. // Sub - subscriber entity.
@ -28,7 +29,8 @@ type Sub struct {
// PubSub holds publishers and subscribers // PubSub holds publishers and subscribers
type PubSub struct { type PubSub struct {
subs []*Sub subs []*Sub
numSubscribers int32
sync.RWMutex sync.RWMutex
} }
@ -56,6 +58,7 @@ func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filt
sub := &Sub{subCh, filter} sub := &Sub{subCh, filter}
ps.subs = append(ps.subs, sub) ps.subs = append(ps.subs, sub)
atomic.AddInt32(&ps.numSubscribers, 1)
go func() { go func() {
<-doneCh <-doneCh
@ -68,19 +71,13 @@ func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filt
ps.subs = append(ps.subs[:i], ps.subs[i+1:]...) ps.subs = append(ps.subs[:i], ps.subs[i+1:]...)
} }
} }
atomic.AddInt32(&ps.numSubscribers, -1)
}() }()
} }
// HasSubscribers returns true if pubsub system has subscribers
func (ps *PubSub) HasSubscribers() bool {
return ps.NumSubscribers() > 0
}
// NumSubscribers returns the number of current subscribers // NumSubscribers returns the number of current subscribers
func (ps *PubSub) NumSubscribers() int { func (ps *PubSub) NumSubscribers() int32 {
ps.RLock() return atomic.LoadInt32(&ps.numSubscribers)
defer ps.RUnlock()
return len(ps.subs)
} }
// New inits a PubSub system // New inits a PubSub system

Loading…
Cancel
Save