From 7d54aca762ea6760e74ee8029fba43ea4a2f3519 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Mon, 16 Nov 2020 14:28:08 +0200 Subject: [PATCH] Add metric for tracking buffer size --- metrics.go | 12 ++++++++++++ portal.go | 56 +++++++++++++++++++++++++++++++++++------------------- user.go | 3 +++ 3 files changed, 51 insertions(+), 20 deletions(-) diff --git a/metrics.go b/metrics.go index 156ed0f..8db555d 100644 --- a/metrics.go +++ b/metrics.go @@ -61,6 +61,7 @@ type MetricsHandler struct { loggedInState map[types.WhatsAppID]bool syncLocked prometheus.Gauge syncLockedState map[types.WhatsAppID]bool + bufferLength *prometheus.GaugeVec } func NewMetricsHandler(address string, log log.Logger, db *database.Database) *MetricsHandler { @@ -119,6 +120,10 @@ func NewMetricsHandler(address string, log log.Logger, db *database.Database) *M Help: "Bridge users locked in post-login sync", }), syncLockedState: make(map[types.WhatsAppID]bool), + bufferLength: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "bridge_buffer_size", + Help: "Number of messages in buffer", + }, []string{"user_id"}), } } @@ -189,6 +194,13 @@ func (mh *MetricsHandler) TrackSyncLock(jid types.WhatsAppID, locked bool) { } } +func (mh *MetricsHandler) TrackBufferLength(id id.UserID, length int) { + if !mh.running { + return + } + mh.bufferLength.With(prometheus.Labels{"user_id": string(id)}).Set(float64(length)) +} + func (mh *MetricsHandler) updateStats() { start := time.Now() var puppetCount int diff --git a/portal.go b/portal.go index 67c0f32..ebaad83 100644 --- a/portal.go +++ b/portal.go @@ -133,7 +133,8 @@ func (bridge *Bridge) NewManualPortal(key database.PortalKey) *Portal { recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{}, - messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer), + messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer), + backfillStart: make(chan struct{}, 1), } portal.Key = key go portal.handleMessageLoop() @@ -148,7 +149,8 @@ func (bridge *Bridge) NewPortal(dbPortal *database.Portal) *Portal { recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{}, - messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer), + messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer), + backfillStart: make(chan struct{}, 1), } go portal.handleMessageLoop() return portal @@ -175,7 +177,8 @@ type Portal struct { recentlyHandledLock sync.Mutex recentlyHandledIndex uint8 - backfillLock sync.Mutex + backfillStart chan struct{} + backfillWait sync.WaitGroup backfilling bool lastMessageTs uint64 @@ -190,25 +193,34 @@ type Portal struct { const MaxMessageAgeToCreatePortal = 5 * 60 // 5 minutes func (portal *Portal) handleMessageLoop() { - for msg := range portal.messages { - if len(portal.MXID) == 0 { - if msg.timestamp+MaxMessageAgeToCreatePortal < uint64(time.Now().Unix()) { - portal.log.Debugln("Not creating portal room for incoming message as the message is too old.") - continue - } - portal.log.Debugln("Creating Matrix room from incoming message") - err := portal.CreateMatrixRoom(msg.source) - if err != nil { - portal.log.Errorln("Failed to create portal room:", err) - return - } + for { + select { + case msg := <-portal.messages: + portal.handleNewMessage(msg) + case <-portal.backfillStart: + portal.log.Debugln("Processing of incoming messages is locked for backfilling") + portal.backfillWait.Wait() + portal.log.Debugln("Processing of incoming messages unlocked after backfilling") } - portal.backfillLock.Lock() - portal.handleMessage(msg) - portal.backfillLock.Unlock() } } +func (portal *Portal) handleNewMessage(msg PortalMessage) { + if len(portal.MXID) == 0 { + if msg.timestamp+MaxMessageAgeToCreatePortal < uint64(time.Now().Unix()) { + portal.log.Debugln("Not creating portal room for incoming message as the message is too old.") + return + } + portal.log.Debugln("Creating Matrix room from incoming message") + err := portal.CreateMatrixRoom(msg.source) + if err != nil { + portal.log.Errorln("Failed to create portal room:", err) + return + } + } + portal.handleMessage(msg) +} + func (portal *Portal) handleMessage(msg PortalMessage) { if len(portal.MXID) == 0 { portal.log.Warnln("handleMessage called even though portal.MXID is empty") @@ -729,7 +741,11 @@ func (portal *Portal) BackfillHistory(user *User, lastMessageTime uint64) error } func (portal *Portal) beginBackfill() func() { - portal.backfillLock.Lock() + portal.backfillWait.Add(1) + select { + case portal.backfillStart <- struct{}{}: + default: + } portal.backfilling = true var privateChatPuppetInvited bool var privateChatPuppet *Puppet @@ -747,7 +763,7 @@ func (portal *Portal) beginBackfill() func() { return func() { portal.backfilling = false portal.privateChatBackfillInvitePuppet = nil - portal.backfillLock.Unlock() + portal.backfillWait.Done() if privateChatPuppet != nil && privateChatPuppetInvited { _, _ = privateChatPuppet.DefaultIntent().LeaveRoom(portal.MXID) } diff --git a/user.go b/user.go index 82db5ee..0d08938 100644 --- a/user.go +++ b/user.go @@ -416,6 +416,7 @@ func (cl ChatList) Swap(i, j int) { func (user *User) PostLogin() { user.bridge.Metrics.TrackConnectionState(user.JID, true) user.bridge.Metrics.TrackLoginState(user.JID, true) + user.bridge.Metrics.TrackBufferLength(user.MXID, 0) user.log.Debugln("Locking processing of incoming messages and starting post-login sync") user.syncWait.Add(1) user.syncStart <- struct{}{} @@ -799,6 +800,7 @@ func (user *User) runMessageRingBuffer() { for msg := range user.messageInput { select { case user.messageOutput <- msg: + user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput)) default: dropped := <-user.messageOutput user.log.Warnln("Buffer is full, dropping message in", dropped.chat) @@ -811,6 +813,7 @@ func (user *User) handleMessageLoop() { for { select { case msg := <-user.messageOutput: + user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput)) user.GetPortalByJID(msg.chat).messages <- msg case <-user.syncStart: user.log.Debugln("Processing of incoming messages is locked")