diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index b86ca3d67..24e080eeb 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -67,7 +67,7 @@ func waitForLowHTTPReq(maxIO int, maxWait time.Duration) { // Bucket notification and http trace are not costly, it is okay to ignore them // while counting the number of concurrent connections maxIOFn := func() int { - return maxIO + globalHTTPListen.NumSubscribers() + globalHTTPTrace.NumSubscribers() + return maxIO + int(globalHTTPListen.NumSubscribers()) + int(globalHTTPTrace.NumSubscribers()) } if httpServer := newHTTPServerFn(); httpServer != nil { diff --git a/cmd/consolelogger.go b/cmd/consolelogger.go index 8af3de852..5ca0042a6 100644 --- a/cmd/consolelogger.go +++ b/cmd/consolelogger.go @@ -70,7 +70,7 @@ func (sys *HTTPConsoleLoggerSys) SetNodeName(endpointServerPools EndpointServerP // HasLogListeners returns true if console log listeners are registered // for this node or peers func (sys *HTTPConsoleLoggerSys) HasLogListeners() bool { - return sys != nil && sys.pubsub.HasSubscribers() + return sys != nil && sys.pubsub.NumSubscribers() > 0 } // Subscribe starts console logging for this node. diff --git a/cmd/handler-utils.go b/cmd/handler-utils.go index 5de66ed14..a5695670d 100644 --- a/cmd/handler-utils.go +++ b/cmd/handler-utils.go @@ -348,7 +348,7 @@ func extractPostPolicyFormValues(ctx context.Context, form *multipart.Form) (fil // Log headers and body. func httpTraceAll(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - if !globalHTTPTrace.HasSubscribers() { + if globalHTTPTrace.NumSubscribers() == 0 { f.ServeHTTP(w, r) return } @@ -360,7 +360,7 @@ func httpTraceAll(f http.HandlerFunc) http.HandlerFunc { // Log only the headers. func httpTraceHdrs(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - if !globalHTTPTrace.HasSubscribers() { + if globalHTTPTrace.NumSubscribers() == 0 { f.ServeHTTP(w, r) return } diff --git a/cmd/notification.go b/cmd/notification.go index cc3af098e..bb8c2a317 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -1375,7 +1375,7 @@ func sendEvent(args eventArgs) { return } - if globalHTTPListen.HasSubscribers() { + if globalHTTPListen.NumSubscribers() > 0 { globalHTTPListen.Publish(args.ToEvent(false)) } diff --git a/cmd/web-router.go b/cmd/web-router.go index 877eab497..211a5595e 100644 --- a/cmd/web-router.go +++ b/cmd/web-router.go @@ -85,7 +85,7 @@ func registerWebRouter(router *mux.Router) error { "bucket": bucketName, "object": objectName, }) - if globalHTTPTrace.HasSubscribers() { + if globalHTTPTrace.NumSubscribers() > 0 { globalHTTPTrace.Publish(WebTrace(ri)) } logger.AuditLog(ri.ResponseWriter, ri.Request, ri.Method, claims.Map()) diff --git a/pkg/pubsub/pubsub.go b/pkg/pubsub/pubsub.go index e13c12eca..fe6cf6e86 100644 --- a/pkg/pubsub/pubsub.go +++ b/pkg/pubsub/pubsub.go @@ -18,6 +18,7 @@ package pubsub import ( "sync" + "sync/atomic" ) // Sub - subscriber entity. @@ -28,7 +29,8 @@ type Sub struct { // PubSub holds publishers and subscribers type PubSub struct { - subs []*Sub + subs []*Sub + numSubscribers int32 sync.RWMutex } @@ -56,6 +58,7 @@ func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filt sub := &Sub{subCh, filter} ps.subs = append(ps.subs, sub) + atomic.AddInt32(&ps.numSubscribers, 1) go func() { <-doneCh @@ -68,19 +71,13 @@ func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filt ps.subs = append(ps.subs[:i], ps.subs[i+1:]...) } } + atomic.AddInt32(&ps.numSubscribers, -1) }() } -// HasSubscribers returns true if pubsub system has subscribers -func (ps *PubSub) HasSubscribers() bool { - return ps.NumSubscribers() > 0 -} - // NumSubscribers returns the number of current subscribers -func (ps *PubSub) NumSubscribers() int { - ps.RLock() - defer ps.RUnlock() - return len(ps.subs) +func (ps *PubSub) NumSubscribers() int32 { + return atomic.LoadInt32(&ps.numSubscribers) } // New inits a PubSub system