Track WhatsApp message age and processing duration

This commit is contained in:
Tulir Asokan 2021-06-25 15:33:37 +03:00
parent 3eb5f44023
commit e926d0175e
3 changed files with 95 additions and 53 deletions

View file

@ -60,7 +60,7 @@ func NewMatrixHandler(bridge *Bridge) *MatrixHandler {
} }
func (mx *MatrixHandler) HandleEncryption(evt *event.Event) { func (mx *MatrixHandler) HandleEncryption(evt *event.Event) {
defer mx.bridge.Metrics.TrackEvent(evt.Type)() defer mx.bridge.Metrics.TrackMatrixEvent(evt.Type)()
if evt.Content.AsEncryption().Algorithm != id.AlgorithmMegolmV1 { if evt.Content.AsEncryption().Algorithm != id.AlgorithmMegolmV1 {
return return
} }
@ -242,7 +242,7 @@ func (mx *MatrixHandler) HandleMembership(evt *event.Event) {
if _, isPuppet := mx.bridge.ParsePuppetMXID(evt.Sender); evt.Sender == mx.bridge.Bot.UserID || isPuppet { if _, isPuppet := mx.bridge.ParsePuppetMXID(evt.Sender); evt.Sender == mx.bridge.Bot.UserID || isPuppet {
return return
} }
defer mx.bridge.Metrics.TrackEvent(evt.Type)() defer mx.bridge.Metrics.TrackMatrixEvent(evt.Type)()
if mx.bridge.Crypto != nil { if mx.bridge.Crypto != nil {
mx.bridge.Crypto.HandleMemberEvent(evt) mx.bridge.Crypto.HandleMemberEvent(evt)
@ -294,7 +294,7 @@ func (mx *MatrixHandler) HandleMembership(evt *event.Event) {
} }
func (mx *MatrixHandler) HandleRoomMetadata(evt *event.Event) { func (mx *MatrixHandler) HandleRoomMetadata(evt *event.Event) {
defer mx.bridge.Metrics.TrackEvent(evt.Type)() defer mx.bridge.Metrics.TrackMatrixEvent(evt.Type)()
if mx.shouldIgnoreEvent(evt) { if mx.shouldIgnoreEvent(evt) {
return return
} }
@ -330,7 +330,7 @@ func (mx *MatrixHandler) shouldIgnoreEvent(evt *event.Event) bool {
const sessionWaitTimeout = 5 * time.Second const sessionWaitTimeout = 5 * time.Second
func (mx *MatrixHandler) HandleEncrypted(evt *event.Event) { func (mx *MatrixHandler) HandleEncrypted(evt *event.Event) {
defer mx.bridge.Metrics.TrackEvent(evt.Type)() defer mx.bridge.Metrics.TrackMatrixEvent(evt.Type)()
if mx.shouldIgnoreEvent(evt) || mx.bridge.Crypto == nil { if mx.shouldIgnoreEvent(evt) || mx.bridge.Crypto == nil {
return return
} }
@ -400,7 +400,7 @@ func (mx *MatrixHandler) waitLongerForSession(evt *event.Event) {
} }
func (mx *MatrixHandler) HandleMessage(evt *event.Event) { func (mx *MatrixHandler) HandleMessage(evt *event.Event) {
defer mx.bridge.Metrics.TrackEvent(evt.Type)() defer mx.bridge.Metrics.TrackMatrixEvent(evt.Type)()
if mx.shouldIgnoreEvent(evt) { if mx.shouldIgnoreEvent(evt) {
return return
} }
@ -426,7 +426,7 @@ func (mx *MatrixHandler) HandleMessage(evt *event.Event) {
} }
func (mx *MatrixHandler) HandleRedaction(evt *event.Event) { func (mx *MatrixHandler) HandleRedaction(evt *event.Event) {
defer mx.bridge.Metrics.TrackEvent(evt.Type)() defer mx.bridge.Metrics.TrackMatrixEvent(evt.Type)()
if _, isPuppet := mx.bridge.ParsePuppetMXID(evt.Sender); evt.Sender == mx.bridge.Bot.UserID || isPuppet { if _, isPuppet := mx.bridge.ParsePuppetMXID(evt.Sender); evt.Sender == mx.bridge.Bot.UserID || isPuppet {
return return
} }

View file

@ -44,7 +44,9 @@ type MetricsHandler struct {
ctx context.Context ctx context.Context
stopRecorder func() stopRecorder func()
messageHandling *prometheus.HistogramVec matrixEventHandling *prometheus.HistogramVec
whatsappMessageAge prometheus.Histogram
whatsappMessageHandling *prometheus.HistogramVec
countCollection prometheus.Histogram countCollection prometheus.Histogram
disconnections *prometheus.CounterVec disconnections *prometheus.CounterVec
puppetCount prometheus.Gauge puppetCount prometheus.Gauge
@ -76,10 +78,19 @@ func NewMetricsHandler(address string, log log.Logger, db *database.Database) *M
log: log, log: log,
running: false, running: false,
messageHandling: promauto.NewHistogramVec(prometheus.HistogramOpts{ matrixEventHandling: promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "matrix_event", Name: "matrix_event",
Help: "Time spent processing Matrix events", Help: "Time spent processing Matrix events",
}, []string{"event_type"}), }, []string{"event_type"}),
whatsappMessageAge: promauto.NewHistogram(prometheus.HistogramOpts{
Name: "whatsapp_message_age",
Help: "Age of messages received from WhatsApp",
Buckets: []float64{1, 2, 3, 5, 7.5, 10, 20, 30, 60},
}),
whatsappMessageHandling: promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "whatsapp_message",
Help: "Time spent processing WhatsApp messages",
}, []string{"message_type"}),
countCollection: promauto.NewHistogram(prometheus.HistogramOpts{ countCollection: promauto.NewHistogram(prometheus.HistogramOpts{
Name: "whatsapp_count_collection", Name: "whatsapp_count_collection",
Help: "Time spent collecting the whatsapp_*_total metrics", Help: "Time spent collecting the whatsapp_*_total metrics",
@ -130,19 +141,34 @@ func NewMetricsHandler(address string, log log.Logger, db *database.Database) *M
func noop() {} func noop() {}
func (mh *MetricsHandler) TrackEvent(eventType event.Type) func() { func (mh *MetricsHandler) TrackMatrixEvent(eventType event.Type) func() {
if !mh.running { if !mh.running {
return noop return noop
} }
start := time.Now() start := time.Now()
return func() { return func() {
duration := time.Now().Sub(start) duration := time.Now().Sub(start)
mh.messageHandling. mh.matrixEventHandling.
With(prometheus.Labels{"event_type": eventType.Type}). With(prometheus.Labels{"event_type": eventType.Type}).
Observe(duration.Seconds()) Observe(duration.Seconds())
} }
} }
func (mh *MetricsHandler) TrackWhatsAppMessage(timestamp uint64, messageType string) func() {
if !mh.running {
return noop
}
start := time.Now()
return func() {
duration := time.Now().Sub(start)
mh.whatsappMessageHandling.
With(prometheus.Labels{"message_type": messageType}).
Observe(duration.Seconds())
mh.whatsappMessageAge.Observe(time.Now().Sub(time.Unix(int64(timestamp), 0)).Seconds())
}
}
func (mh *MetricsHandler) TrackDisconnection(userID id.UserID) { func (mh *MetricsHandler) TrackDisconnection(userID id.UserID) {
if !mh.running { if !mh.running {
return return

102
portal.go
View file

@ -254,29 +254,35 @@ func (portal *Portal) handleMessage(msg PortalMessage, isBackfill bool) {
portal.log.Warnln("handleMessage called even though portal.MXID is empty") portal.log.Warnln("handleMessage called even though portal.MXID is empty")
return return
} }
var triedToHandle bool
var trackMessageCallback func()
dataType := reflect.TypeOf(msg.data)
if !isBackfill {
trackMessageCallback = portal.bridge.Metrics.TrackWhatsAppMessage(msg.timestamp, dataType.Name())
}
switch data := msg.data.(type) { switch data := msg.data.(type) {
case whatsapp.TextMessage: case whatsapp.TextMessage:
portal.HandleTextMessage(msg.source, data) triedToHandle = portal.HandleTextMessage(msg.source, data)
case whatsapp.ImageMessage: case whatsapp.ImageMessage:
portal.HandleMediaMessage(msg.source, mediaMessage{ triedToHandle = portal.HandleMediaMessage(msg.source, mediaMessage{
base: base{data.Download, data.Info, data.ContextInfo, data.Type}, base: base{data.Download, data.Info, data.ContextInfo, data.Type},
thumbnail: data.Thumbnail, thumbnail: data.Thumbnail,
caption: data.Caption, caption: data.Caption,
}) })
case whatsapp.StickerMessage: case whatsapp.StickerMessage:
portal.HandleMediaMessage(msg.source, mediaMessage{ triedToHandle = portal.HandleMediaMessage(msg.source, mediaMessage{
base: base{data.Download, data.Info, data.ContextInfo, data.Type}, base: base{data.Download, data.Info, data.ContextInfo, data.Type},
sendAsSticker: true, sendAsSticker: true,
}) })
case whatsapp.VideoMessage: case whatsapp.VideoMessage:
portal.HandleMediaMessage(msg.source, mediaMessage{ triedToHandle = portal.HandleMediaMessage(msg.source, mediaMessage{
base: base{data.Download, data.Info, data.ContextInfo, data.Type}, base: base{data.Download, data.Info, data.ContextInfo, data.Type},
thumbnail: data.Thumbnail, thumbnail: data.Thumbnail,
caption: data.Caption, caption: data.Caption,
length: data.Length * 1000, length: data.Length * 1000,
}) })
case whatsapp.AudioMessage: case whatsapp.AudioMessage:
portal.HandleMediaMessage(msg.source, mediaMessage{ triedToHandle = portal.HandleMediaMessage(msg.source, mediaMessage{
base: base{data.Download, data.Info, data.ContextInfo, data.Type}, base: base{data.Download, data.Info, data.ContextInfo, data.Type},
length: data.Length * 1000, length: data.Length * 1000,
}) })
@ -285,23 +291,26 @@ func (portal *Portal) handleMessage(msg PortalMessage, isBackfill bool) {
if len(fileName) == 0 { if len(fileName) == 0 {
fileName = data.Title fileName = data.Title
} }
portal.HandleMediaMessage(msg.source, mediaMessage{ triedToHandle = portal.HandleMediaMessage(msg.source, mediaMessage{
base: base{data.Download, data.Info, data.ContextInfo, data.Type}, base: base{data.Download, data.Info, data.ContextInfo, data.Type},
thumbnail: data.Thumbnail, thumbnail: data.Thumbnail,
fileName: fileName, fileName: fileName,
}) })
case whatsapp.ContactMessage: case whatsapp.ContactMessage:
portal.HandleContactMessage(msg.source, data) triedToHandle = portal.HandleContactMessage(msg.source, data)
case whatsapp.LocationMessage: case whatsapp.LocationMessage:
portal.HandleLocationMessage(msg.source, data) triedToHandle = portal.HandleLocationMessage(msg.source, data)
case whatsapp.StubMessage: case whatsapp.StubMessage:
portal.HandleStubMessage(msg.source, data, isBackfill) triedToHandle = portal.HandleStubMessage(msg.source, data, isBackfill)
case whatsapp.MessageRevocation: case whatsapp.MessageRevocation:
portal.HandleMessageRevoke(msg.source, data) triedToHandle = portal.HandleMessageRevoke(msg.source, data)
case FakeMessage: case FakeMessage:
portal.HandleFakeMessage(msg.source, data) triedToHandle = portal.HandleFakeMessage(msg.source, data)
default: default:
portal.log.Warnln("Unknown message type:", reflect.TypeOf(msg.data)) portal.log.Warnln("Unknown message type:", dataType)
}
if triedToHandle && trackMessageCallback != nil {
trackMessageCallback()
} }
} }
@ -1281,10 +1290,10 @@ func (portal *Portal) SetReply(content *event.MessageEventContent, info whatsapp
return return
} }
func (portal *Portal) HandleMessageRevoke(user *User, message whatsapp.MessageRevocation) { func (portal *Portal) HandleMessageRevoke(user *User, message whatsapp.MessageRevocation) bool {
msg := portal.bridge.DB.Message.GetByJID(portal.Key, message.Id) msg := portal.bridge.DB.Message.GetByJID(portal.Key, message.Id)
if msg == nil || msg.IsFakeMXID() { if msg == nil || msg.IsFakeMXID() {
return return false
} }
var intent *appservice.IntentAPI var intent *appservice.IntentAPI
if message.FromMe { if message.FromMe {
@ -1302,14 +1311,15 @@ func (portal *Portal) HandleMessageRevoke(user *User, message whatsapp.MessageRe
_, err := intent.RedactEvent(portal.MXID, msg.MXID) _, err := intent.RedactEvent(portal.MXID, msg.MXID)
if err != nil { if err != nil {
portal.log.Errorln("Failed to redact %s: %v", msg.JID, err) portal.log.Errorln("Failed to redact %s: %v", msg.JID, err)
return } else {
msg.Delete()
} }
msg.Delete() return true
} }
func (portal *Portal) HandleFakeMessage(_ *User, message FakeMessage) { func (portal *Portal) HandleFakeMessage(_ *User, message FakeMessage) bool {
if portal.isRecentlyHandled(message.ID) { if portal.isRecentlyHandled(message.ID) {
return return false
} }
content := event.MessageEventContent{ content := event.MessageEventContent{
@ -1322,7 +1332,7 @@ func (portal *Portal) HandleFakeMessage(_ *User, message FakeMessage) {
_, err := portal.sendMainIntentMessage(content) _, err := portal.sendMainIntentMessage(content)
if err != nil { if err != nil {
portal.log.Errorfln("Failed to handle fake message %s: %v", message.ID, err) portal.log.Errorfln("Failed to handle fake message %s: %v", message.ID, err)
return return true
} }
portal.recentlyHandledLock.Lock() portal.recentlyHandledLock.Lock()
@ -1330,6 +1340,7 @@ func (portal *Portal) HandleFakeMessage(_ *User, message FakeMessage) {
portal.recentlyHandledIndex = (portal.recentlyHandledIndex + 1) % recentlyHandledLength portal.recentlyHandledIndex = (portal.recentlyHandledIndex + 1) % recentlyHandledLength
portal.recentlyHandledLock.Unlock() portal.recentlyHandledLock.Unlock()
portal.recentlyHandled[index] = message.ID portal.recentlyHandled[index] = message.ID
return true
} }
func (portal *Portal) sendMainIntentMessage(content interface{}) (*mautrix.RespSendEvent, error) { func (portal *Portal) sendMainIntentMessage(content interface{}) (*mautrix.RespSendEvent, error) {
@ -1361,10 +1372,10 @@ func (portal *Portal) sendMessage(intent *appservice.IntentAPI, eventType event.
} }
} }
func (portal *Portal) HandleTextMessage(source *User, message whatsapp.TextMessage) { func (portal *Portal) HandleTextMessage(source *User, message whatsapp.TextMessage) bool {
intent := portal.startHandling(source, message.Info, "text") intent := portal.startHandling(source, message.Info, "text")
if intent == nil { if intent == nil {
return return false
} }
content := &event.MessageEventContent{ content := &event.MessageEventContent{
@ -1379,20 +1390,21 @@ func (portal *Portal) HandleTextMessage(source *User, message whatsapp.TextMessa
resp, err := portal.sendMessage(intent, event.EventMessage, content, int64(message.Info.Timestamp*1000)) resp, err := portal.sendMessage(intent, event.EventMessage, content, int64(message.Info.Timestamp*1000))
if err != nil { if err != nil {
portal.log.Errorfln("Failed to handle message %s: %v", message.Info.Id, err) portal.log.Errorfln("Failed to handle message %s: %v", message.Info.Id, err)
return } else {
portal.finishHandling(source, message.Info.Source, resp.EventID)
} }
portal.finishHandling(source, message.Info.Source, resp.EventID) return true
} }
func (portal *Portal) HandleStubMessage(source *User, message whatsapp.StubMessage, isBackfill bool) { func (portal *Portal) HandleStubMessage(source *User, message whatsapp.StubMessage, isBackfill bool) bool {
if portal.bridge.Config.Bridge.ChatMetaSync && (!portal.IsBroadcastList() || isBackfill) { if portal.bridge.Config.Bridge.ChatMetaSync && (!portal.IsBroadcastList() || isBackfill) {
// Chat meta sync is enabled, so we use chat update commands and full-syncs instead of message history // Chat meta sync is enabled, so we use chat update commands and full-syncs instead of message history
// However, broadcast lists don't have update commands, so we handle these if it's not a backfill // However, broadcast lists don't have update commands, so we handle these if it's not a backfill
return return false
} }
intent := portal.startHandling(source, message.Info, fmt.Sprintf("stub %s", message.Type.String())) intent := portal.startHandling(source, message.Info, fmt.Sprintf("stub %s", message.Type.String()))
if intent == nil { if intent == nil {
return return false
} }
var senderJID string var senderJID string
if message.Info.FromMe { if message.Info.FromMe {
@ -1426,18 +1438,19 @@ func (portal *Portal) HandleStubMessage(source *User, message whatsapp.StubMessa
case waProto.WebMessageInfo_GROUP_PARTICIPANT_DEMOTE: case waProto.WebMessageInfo_GROUP_PARTICIPANT_DEMOTE:
eventID = portal.ChangeAdminStatus(message.Params, false) eventID = portal.ChangeAdminStatus(message.Params, false)
default: default:
return return false
} }
if len(eventID) == 0 { if len(eventID) == 0 {
eventID = id.EventID(fmt.Sprintf("net.maunium.whatsapp.fake::%s", message.Info.Id)) eventID = id.EventID(fmt.Sprintf("net.maunium.whatsapp.fake::%s", message.Info.Id))
} }
portal.markHandled(source, message.Info.Source, eventID, true) portal.markHandled(source, message.Info.Source, eventID, true)
return true
} }
func (portal *Portal) HandleLocationMessage(source *User, message whatsapp.LocationMessage) { func (portal *Portal) HandleLocationMessage(source *User, message whatsapp.LocationMessage) bool {
intent := portal.startHandling(source, message.Info, "location") intent := portal.startHandling(source, message.Info, "location")
if intent == nil { if intent == nil {
return return false
} }
url := message.Url url := message.Url
@ -1488,15 +1501,16 @@ func (portal *Portal) HandleLocationMessage(source *User, message whatsapp.Locat
resp, err := portal.sendMessage(intent, event.EventMessage, content, int64(message.Info.Timestamp*1000)) resp, err := portal.sendMessage(intent, event.EventMessage, content, int64(message.Info.Timestamp*1000))
if err != nil { if err != nil {
portal.log.Errorfln("Failed to handle message %s: %v", message.Info.Id, err) portal.log.Errorfln("Failed to handle message %s: %v", message.Info.Id, err)
return } else {
portal.finishHandling(source, message.Info.Source, resp.EventID)
} }
portal.finishHandling(source, message.Info.Source, resp.EventID) return true
} }
func (portal *Portal) HandleContactMessage(source *User, message whatsapp.ContactMessage) { func (portal *Portal) HandleContactMessage(source *User, message whatsapp.ContactMessage) bool {
intent := portal.startHandling(source, message.Info, "contact") intent := portal.startHandling(source, message.Info, "contact")
if intent == nil { if intent == nil {
return return false
} }
fileName := fmt.Sprintf("%s.vcf", message.DisplayName) fileName := fmt.Sprintf("%s.vcf", message.DisplayName)
@ -1507,7 +1521,7 @@ func (portal *Portal) HandleContactMessage(source *User, message whatsapp.Contac
uploadResp, err := intent.UploadBytesWithName(data, uploadMimeType, fileName) uploadResp, err := intent.UploadBytesWithName(data, uploadMimeType, fileName)
if err != nil { if err != nil {
portal.log.Errorfln("Failed to upload vcard of %s: %v", message.DisplayName, err) portal.log.Errorfln("Failed to upload vcard of %s: %v", message.DisplayName, err)
return return true
} }
content := &event.MessageEventContent{ content := &event.MessageEventContent{
@ -1531,9 +1545,10 @@ func (portal *Portal) HandleContactMessage(source *User, message whatsapp.Contac
resp, err := portal.sendMessage(intent, event.EventMessage, content, int64(message.Info.Timestamp*1000)) resp, err := portal.sendMessage(intent, event.EventMessage, content, int64(message.Info.Timestamp*1000))
if err != nil { if err != nil {
portal.log.Errorfln("Failed to handle message %s: %v", message.Info.Id, err) portal.log.Errorfln("Failed to handle message %s: %v", message.Info.Id, err)
return } else {
portal.finishHandling(source, message.Info.Source, resp.EventID)
} }
portal.finishHandling(source, message.Info.Source, resp.EventID) return true
} }
func (portal *Portal) sendMediaBridgeFailure(source *User, intent *appservice.IntentAPI, info whatsapp.MessageInfo, bridgeErr error) { func (portal *Portal) sendMediaBridgeFailure(source *User, intent *appservice.IntentAPI, info whatsapp.MessageInfo, bridgeErr error) {
@ -1667,10 +1682,10 @@ type mediaMessage struct {
sendAsSticker bool sendAsSticker bool
} }
func (portal *Portal) HandleMediaMessage(source *User, msg mediaMessage) { func (portal *Portal) HandleMediaMessage(source *User, msg mediaMessage) bool {
intent := portal.startHandling(source, msg.info, fmt.Sprintf("media %s", msg.mimeType)) intent := portal.startHandling(source, msg.info, fmt.Sprintf("media %s", msg.mimeType))
if intent == nil { if intent == nil {
return return false
} }
data, err := msg.download() data, err := msg.download()
@ -1679,16 +1694,16 @@ func (portal *Portal) HandleMediaMessage(source *User, msg mediaMessage) {
_, err = source.Conn.LoadMediaInfo(msg.info.RemoteJid, msg.info.Id, msg.info.FromMe) _, err = source.Conn.LoadMediaInfo(msg.info.RemoteJid, msg.info.Id, msg.info.FromMe)
if err != nil { if err != nil {
portal.sendMediaBridgeFailure(source, intent, msg.info, fmt.Errorf("failed to load media info: %w", err)) portal.sendMediaBridgeFailure(source, intent, msg.info, fmt.Errorf("failed to load media info: %w", err))
return return true
} }
data, err = msg.download() data, err = msg.download()
} }
if err == whatsapp.ErrNoURLPresent { if err == whatsapp.ErrNoURLPresent {
portal.log.Debugfln("No URL present error for media message %s, ignoring...", msg.info.Id) portal.log.Debugfln("No URL present error for media message %s, ignoring...", msg.info.Id)
return return true
} else if err != nil { } else if err != nil {
portal.sendMediaBridgeFailure(source, intent, msg.info, err) portal.sendMediaBridgeFailure(source, intent, msg.info, err)
return return true
} }
var width, height int var width, height int
@ -1708,7 +1723,7 @@ func (portal *Portal) HandleMediaMessage(source *User, msg mediaMessage) {
} else { } else {
portal.sendMediaBridgeFailure(source, intent, msg.info, fmt.Errorf("failed to upload media: %w", err)) portal.sendMediaBridgeFailure(source, intent, msg.info, fmt.Errorf("failed to upload media: %w", err))
} }
return return true
} }
if msg.fileName == "" { if msg.fileName == "" {
@ -1790,7 +1805,7 @@ func (portal *Portal) HandleMediaMessage(source *User, msg mediaMessage) {
resp, err := portal.sendMessage(intent, eventType, content, ts) resp, err := portal.sendMessage(intent, eventType, content, ts)
if err != nil { if err != nil {
portal.log.Errorfln("Failed to handle message %s: %v", msg.info.Id, err) portal.log.Errorfln("Failed to handle message %s: %v", msg.info.Id, err)
return return true
} }
if len(msg.caption) > 0 { if len(msg.caption) > 0 {
@ -1808,6 +1823,7 @@ func (portal *Portal) HandleMediaMessage(source *User, msg mediaMessage) {
} }
portal.finishHandling(source, msg.info.Source, resp.EventID) portal.finishHandling(source, msg.info.Source, resp.EventID)
return true
} }
func makeMessageID() *string { func makeMessageID() *string {