Add log to find which part of message handling takes long

This commit is contained in:
Tulir Asokan 2022-07-11 14:20:31 +03:00
parent a2d808626f
commit 73f4449761
5 changed files with 105 additions and 26 deletions

4
go.mod
View file

@ -10,12 +10,12 @@ require (
github.com/prometheus/client_golang v1.12.2-0.20220613221938-ebd77f036066
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
github.com/tidwall/gjson v1.14.1
go.mau.fi/whatsmeow v0.0.0-20220709074059-aef48d107b24
go.mau.fi/whatsmeow v0.0.0-20220711111122-6340068d67de
golang.org/x/image v0.0.0-20220617043117-41969df76e82
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e
google.golang.org/protobuf v1.28.0
maunium.net/go/maulogger/v2 v2.3.2
maunium.net/go/mautrix v0.11.1-0.20220708152210-84b367ef7282
maunium.net/go/mautrix v0.11.1-0.20220711103551-a5a1e7e5df84
)
require (

8
go.sum
View file

@ -64,8 +64,8 @@ github.com/yuin/goldmark v1.4.12 h1:6hffw6vALvEDqJ19dOJvJKOoAOKe4NDaTqvd2sktGN0=
github.com/yuin/goldmark v1.4.12/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mau.fi/libsignal v0.0.0-20220628090436-4d18b66b087e h1:ByHDg+D+dMIGuBA2n+1xOUf4xr3FJFYg8yxl06s1YBE=
go.mau.fi/libsignal v0.0.0-20220628090436-4d18b66b087e/go.mod h1:RCdzkTWSJv0AKGqurzPXJsEGIVMuQps3E/h7CMUPous=
go.mau.fi/whatsmeow v0.0.0-20220709074059-aef48d107b24 h1:hjq62uZNfGZ8mdGPyLV196ww/65f881WnII5n5sy7BM=
go.mau.fi/whatsmeow v0.0.0-20220709074059-aef48d107b24/go.mod h1:hsjqq2xLuoFew8vbsDCJcGf5EbXCRcR/yoQ+87w6m3k=
go.mau.fi/whatsmeow v0.0.0-20220711111122-6340068d67de h1:ZrxHSdpUGODtCtq/0A6CXisEgWtcNwK6BGG4f+WVTlM=
go.mau.fi/whatsmeow v0.0.0-20220711111122-6340068d67de/go.mod h1:hsjqq2xLuoFew8vbsDCJcGf5EbXCRcR/yoQ+87w6m3k=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/image v0.0.0-20220617043117-41969df76e82 h1:KpZB5pUSBvrHltNEdK/tw0xlPeD13M6M6aGP32gKqiw=
@ -108,5 +108,5 @@ maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
maunium.net/go/maulogger/v2 v2.3.2 h1:1XmIYmMd3PoQfp9J+PaHhpt80zpfmMqaShzUTC7FwY0=
maunium.net/go/maulogger/v2 v2.3.2/go.mod h1:TYWy7wKwz/tIXTpsx8G3mZseIRiC5DoMxSZazOHy68A=
maunium.net/go/mautrix v0.11.1-0.20220708152210-84b367ef7282 h1:nvYmIHiOeiS7nhsKbTeMPCCvyBkF//SF+kQhHjbPwjA=
maunium.net/go/mautrix v0.11.1-0.20220708152210-84b367ef7282/go.mod h1:85mjebfgKX7jjca7XNKTt7lHueX3YQsFUU+5o/FxpTw=
maunium.net/go/mautrix v0.11.1-0.20220711103551-a5a1e7e5df84 h1:Zhx97T0nWGKF8phXeQ/uws6fnntc9c8WIvQ4yh/fgPU=
maunium.net/go/mautrix v0.11.1-0.20220711103551-a5a1e7e5df84/go.mod h1:85mjebfgKX7jjca7XNKTt7lHueX3YQsFUU+5o/FxpTw=

View file

@ -21,9 +21,11 @@ import (
"errors"
"fmt"
"sync"
"time"
log "maunium.net/go/maulogger/v2"
"go.mau.fi/whatsmeow"
log "maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/bridge"
@ -214,6 +216,65 @@ func (portal *Portal) sendMessageMetrics(evt *event.Event, err error, part strin
})
}
}
if ms != nil {
portal.log.Debugfln("Timings for %s: %s", evt.ID, ms.timings.String())
}
}
type messageTimings struct {
initReceive time.Duration
decrypt time.Duration
implicitRR time.Duration
portalQueue time.Duration
totalReceive time.Duration
preproc time.Duration
convert time.Duration
whatsmeow whatsmeow.MessageDebugTimings
totalSend time.Duration
}
func niceRound(dur time.Duration) time.Duration {
switch {
case dur < time.Millisecond:
return dur
case dur < time.Second:
return dur.Round(100 * time.Microsecond)
default:
return dur.Round(time.Millisecond)
}
}
func (mt *messageTimings) String() string {
mt.initReceive = niceRound(mt.initReceive)
mt.decrypt = niceRound(mt.decrypt)
mt.portalQueue = niceRound(mt.portalQueue)
mt.totalReceive = niceRound(mt.totalReceive)
mt.implicitRR = niceRound(mt.implicitRR)
mt.preproc = niceRound(mt.preproc)
mt.convert = niceRound(mt.convert)
mt.whatsmeow.Queue = niceRound(mt.whatsmeow.Queue)
mt.whatsmeow.Marshal = niceRound(mt.whatsmeow.Marshal)
mt.whatsmeow.GetParticipants = niceRound(mt.whatsmeow.GetParticipants)
mt.whatsmeow.GetDevices = niceRound(mt.whatsmeow.GetDevices)
mt.whatsmeow.GroupEncrypt = niceRound(mt.whatsmeow.GroupEncrypt)
mt.whatsmeow.PeerEncrypt = niceRound(mt.whatsmeow.PeerEncrypt)
mt.whatsmeow.Send = niceRound(mt.whatsmeow.Send)
mt.whatsmeow.Resp = niceRound(mt.whatsmeow.Resp)
mt.whatsmeow.Retry = niceRound(mt.whatsmeow.Retry)
mt.totalSend = niceRound(mt.totalSend)
whatsmeowTimings := "N/A"
if mt.totalSend > 0 {
format := "queue: %[1]s, marshal: %[2]s, ske: %[3]s, pcp: %[4]s, dev: %[5]s, encrypt: %[6]s, send: %[7]s, resp: %[8]s"
if mt.whatsmeow.GetParticipants == 0 && mt.whatsmeow.GroupEncrypt == 0 {
format = "queue: %[1]s, marshal: %[2]s, dev: %[5]s, encrypt: %[6]s, send: %[7]s, resp: %[8]s"
}
if mt.whatsmeow.Retry > 0 {
format += ", retry: %[9]s"
}
whatsmeowTimings = fmt.Sprintf(format, mt.whatsmeow.Queue, mt.whatsmeow.Marshal, mt.whatsmeow.GroupEncrypt, mt.whatsmeow.GetParticipants, mt.whatsmeow.GetDevices, mt.whatsmeow.PeerEncrypt, mt.whatsmeow.Send, mt.whatsmeow.Resp, mt.whatsmeow.Retry)
}
return fmt.Sprintf("BRIDGE: receive: %s, decrypt: %s, queue: %s, total hs->portal: %s, implicit rr: %s -- PORTAL: preprocess: %s, convert: %s, total send: %s -- WHATSMEOW: %s", mt.initReceive, mt.decrypt, mt.implicitRR, mt.portalQueue, mt.totalReceive, mt.preproc, mt.convert, mt.totalSend, whatsmeowTimings)
}
type metricSender struct {
@ -222,6 +283,7 @@ type metricSender struct {
lock sync.Mutex
completed bool
retryNum int
timings *messageTimings
}
func (ms *metricSender) getRetryNum() int {

View file

@ -100,7 +100,7 @@ func (portal *Portal) MarkEncrypted() {
func (portal *Portal) ReceiveMatrixEvent(user bridge.User, evt *event.Event) {
if user.GetPermissionLevel() >= bridgeconfig.PermissionLevelUser || portal.HasRelaybot() {
portal.matrixMessages <- PortalMatrixMessage{user: user.(*User), evt: evt}
portal.matrixMessages <- PortalMatrixMessage{user: user.(*User), evt: evt, receivedAt: time.Now()}
}
}
@ -216,8 +216,9 @@ type PortalMessage struct {
}
type PortalMatrixMessage struct {
evt *event.Event
user *User
evt *event.Event
user *User
receivedAt time.Time
}
type PortalMediaRetry struct {
@ -290,10 +291,19 @@ func (portal *Portal) handleMessageLoopItem(msg PortalMessage) {
}
func (portal *Portal) handleMatrixMessageLoopItem(msg PortalMatrixMessage) {
portal.handleMatrixReadReceipt(msg.user, "", time.UnixMilli(msg.evt.Timestamp), false)
evtTS := time.UnixMilli(msg.evt.Timestamp)
timings := messageTimings{
initReceive: msg.evt.Mautrix.ReceivedAt.Sub(evtTS),
decrypt: msg.evt.Mautrix.DecryptionDuration,
portalQueue: time.Since(msg.receivedAt),
totalReceive: time.Since(evtTS),
}
implicitRRStart := time.Now()
portal.handleMatrixReadReceipt(msg.user, "", evtTS, false)
timings.implicitRR = time.Since(implicitRRStart)
switch msg.evt.Type {
case event.EventMessage, event.EventSticker:
portal.HandleMatrixMessage(msg.user, msg.evt)
portal.HandleMatrixMessage(msg.user, msg.evt, timings)
case event.EventRedaction:
portal.HandleMatrixRedaction(msg.user, msg.evt)
case event.EventReaction:
@ -3084,18 +3094,19 @@ func (portal *Portal) generateMessageInfo(sender *User) *types.MessageInfo {
}
}
func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event) {
func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event, timings messageTimings) {
start := time.Now()
ms := metricSender{portal: portal, timings: &timings}
if err := portal.canBridgeFrom(sender, true); err != nil {
go portal.sendMessageMetrics(evt, err, "Ignoring", nil)
go ms.sendMessageMetrics(evt, err, "Ignoring", true)
return
} else if portal.Key.JID == types.StatusBroadcastJID && portal.bridge.Config.Bridge.DisableStatusBroadcastSend {
go portal.sendMessageMetrics(evt, errBroadcastSendDisabled, "Ignoring", nil)
go ms.sendMessageMetrics(evt, errBroadcastSendDisabled, "Ignoring", true)
return
}
messageAge := time.Since(time.UnixMilli(evt.Timestamp))
ms := metricSender{portal: portal}
messageAge := timings.totalReceive
origEvtID := evt.ID
var dbMsg *database.Message
if retryMeta := evt.Content.AsMessage().MessageSendRetry; retryMeta != nil {
@ -3135,7 +3146,10 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event) {
defer cancel()
}
timings.preproc = time.Since(start)
start = time.Now()
msg, sender, err := portal.convertMatrixMessage(ctx, sender, evt)
timings.convert = time.Since(start)
if msg == nil {
go ms.sendMessageMetrics(evt, err, "Error converting", true)
return
@ -3148,10 +3162,13 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event) {
info.ID = dbMsg.JID
}
portal.log.Debugln("Sending event", evt.ID, "to WhatsApp", info.ID)
ts, err := sender.Client.SendMessage(ctx, portal.Key.JID, info.ID, msg)
start = time.Now()
resp, err := sender.Client.SendMessage(ctx, portal.Key.JID, info.ID, msg)
timings.totalSend = time.Since(start)
timings.whatsmeow = resp.DebugTimings
go ms.sendMessageMetrics(evt, err, "Error sending", true)
if err == nil {
dbMsg.MarkSent(ts)
dbMsg.MarkSent(resp.Timestamp)
}
}
@ -3195,14 +3212,14 @@ func (portal *Portal) handleMatrixReaction(sender *User, evt *event.Event) error
dbMsg := portal.markHandled(nil, nil, info, evt.ID, false, true, database.MsgReaction, database.MsgNoError)
portal.upsertReaction(nil, target.JID, sender.JID, evt.ID, info.ID)
portal.log.Debugln("Sending reaction", evt.ID, "to WhatsApp", info.ID)
ts, err := portal.sendReactionToWhatsApp(sender, info.ID, target, content.RelatesTo.Key, evt.Timestamp)
resp, err := portal.sendReactionToWhatsApp(sender, info.ID, target, content.RelatesTo.Key, evt.Timestamp)
if err == nil {
dbMsg.MarkSent(ts)
dbMsg.MarkSent(resp.Timestamp)
}
return err
}
func (portal *Portal) sendReactionToWhatsApp(sender *User, id types.MessageID, target *database.Message, key string, timestamp int64) (time.Time, error) {
func (portal *Portal) sendReactionToWhatsApp(sender *User, id types.MessageID, target *database.Message, key string, timestamp int64) (whatsmeow.SendResponse, error) {
var messageKeyParticipant *string
if !portal.IsPrivateChat() {
messageKeyParticipant = proto.String(target.Sender.ToNonAD().String())

View file

@ -673,7 +673,7 @@ func (user *User) sendHackyPhonePing() {
} else {
user.log.Warnfln("Failed to get last app state key ID to send hacky phone ping: %v - sending empty request", err)
}
ts, err := user.Client.SendMessage(context.Background(), user.JID.ToNonAD(), msgID, &waProto.Message{
resp, err := user.Client.SendMessage(context.Background(), user.JID.ToNonAD(), msgID, &waProto.Message{
ProtocolMessage: &waProto.ProtocolMessage{
Type: waProto.ProtocolMessage_APP_STATE_SYNC_KEY_REQUEST.Enum(),
AppStateSyncKeyRequest: &waProto.AppStateSyncKeyRequest{
@ -684,8 +684,8 @@ func (user *User) sendHackyPhonePing() {
if err != nil {
user.log.Warnfln("Failed to send hacky phone ping: %v", err)
} else {
user.log.Debugfln("Sent hacky phone ping %s/%s because phone has been offline for >10 days", msgID, ts.Unix())
user.PhoneLastPinged = ts
user.log.Debugfln("Sent hacky phone ping %s/%s because phone has been offline for >10 days", msgID, resp.Timestamp.Unix())
user.PhoneLastPinged = resp.Timestamp
user.Update()
}
}