Move read receipt handling to portal loop

This ensures that read receipts are only handled after the message
they're pointing at, even if the read receipt is sent immediately
(e.g. when the user has WhatsApp open on their phone).
This commit is contained in:
Tulir Asokan 2022-01-19 14:18:32 +02:00
parent f151dc4e9e
commit 9e39ce565b
2 changed files with 48 additions and 34 deletions

View file

@ -139,6 +139,7 @@ func (bridge *Bridge) NewManualPortal(key database.PortalKey) *Portal {
log: bridge.Log.Sub(fmt.Sprintf("Portal/%s", key)), log: bridge.Log.Sub(fmt.Sprintf("Portal/%s", key)),
messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer), messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
receipts: make(chan PortalReceipt, bridge.Config.Bridge.PortalMessageBuffer),
matrixMessages: make(chan PortalMatrixMessage, bridge.Config.Bridge.PortalMessageBuffer), matrixMessages: make(chan PortalMatrixMessage, bridge.Config.Bridge.PortalMessageBuffer),
} }
portal.Key = key portal.Key = key
@ -153,6 +154,7 @@ func (bridge *Bridge) NewPortal(dbPortal *database.Portal) *Portal {
log: bridge.Log.Sub(fmt.Sprintf("Portal/%s", dbPortal.Key)), log: bridge.Log.Sub(fmt.Sprintf("Portal/%s", dbPortal.Key)),
messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer), messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
receipts: make(chan PortalReceipt, bridge.Config.Bridge.PortalMessageBuffer),
matrixMessages: make(chan PortalMatrixMessage, bridge.Config.Bridge.PortalMessageBuffer), matrixMessages: make(chan PortalMatrixMessage, bridge.Config.Bridge.PortalMessageBuffer),
} }
go portal.handleMessageLoop() go portal.handleMessageLoop()
@ -176,6 +178,11 @@ type PortalMessage struct {
source *User source *User
} }
type PortalReceipt struct {
evt *events.Receipt
source *User
}
type PortalMatrixMessage struct { type PortalMatrixMessage struct {
evt *event.Event evt *event.Event
user *User user *User
@ -206,6 +213,7 @@ type Portal struct {
currentlyTypingLock sync.Mutex currentlyTypingLock sync.Mutex
messages chan PortalMessage messages chan PortalMessage
receipts chan PortalReceipt
matrixMessages chan PortalMatrixMessage matrixMessages chan PortalMatrixMessage
relayUser *User relayUser *User
@ -248,11 +256,50 @@ func (portal *Portal) handleMatrixMessageLoopItem(msg PortalMatrixMessage) {
} }
} }
func (portal *Portal) handleReceipt(receipt *events.Receipt, source *User) {
// The order of the message ID array depends on the sender's platform, so we just have to find
// the last message based on timestamp. Also, timestamps only have second precision, so if
// there are many messages at the same second just mark them all as read, because we don't
// know which one is last
markAsRead := make([]*database.Message, 0, 1)
var bestTimestamp time.Time
for _, msgID := range receipt.MessageIDs {
msg := portal.bridge.DB.Message.GetByJID(portal.Key, msgID)
if msg == nil || msg.IsFakeMXID() {
continue
}
if msg.Timestamp.After(bestTimestamp) {
bestTimestamp = msg.Timestamp
markAsRead = append(markAsRead[:0], msg)
} else if msg != nil && msg.Timestamp.Equal(bestTimestamp) {
markAsRead = append(markAsRead, msg)
}
}
if receipt.Sender.User == source.JID.User {
if len(markAsRead) > 0 {
source.SetLastReadTS(portal.Key, markAsRead[0].Timestamp)
} else {
source.SetLastReadTS(portal.Key, receipt.Timestamp)
}
}
intent := portal.bridge.GetPuppetByJID(receipt.Sender).IntentFor(portal)
for _, msg := range markAsRead {
err := intent.SetReadMarkers(portal.MXID, makeReadMarkerContent(msg.MXID, intent.IsCustomPuppet))
if err != nil {
portal.log.Warnfln("Failed to mark message %s as read by %s: %v", msg.MXID, intent.UserID, err)
} else {
portal.log.Debugfln("Marked %s as read by %s", msg.MXID, intent.UserID)
}
}
}
func (portal *Portal) handleMessageLoop() { func (portal *Portal) handleMessageLoop() {
for { for {
select { select {
case msg := <-portal.messages: case msg := <-portal.messages:
portal.handleMessageLoopItem(msg) portal.handleMessageLoopItem(msg)
case receipt := <-portal.receipts:
portal.handleReceipt(receipt.evt, receipt.source)
case msg := <-portal.matrixMessages: case msg := <-portal.matrixMessages:
portal.handleMatrixMessageLoopItem(msg) portal.handleMatrixMessageLoopItem(msg)
} }

35
user.go
View file

@ -851,40 +851,7 @@ func (user *User) handleReceipt(receipt *events.Receipt) {
if portal == nil || len(portal.MXID) == 0 { if portal == nil || len(portal.MXID) == 0 {
return return
} }
// The order of the message ID array depends on the sender's platform, so we just have to find portal.receipts <- PortalReceipt{evt: receipt, source: user}
// the last message based on timestamp. Also, timestamps only have second precision, so if
// there are many messages at the same second just mark them all as read, because we don't
// know which one is last
markAsRead := make([]*database.Message, 0, 1)
var bestTimestamp time.Time
for _, msgID := range receipt.MessageIDs {
msg := user.bridge.DB.Message.GetByJID(portal.Key, msgID)
if msg == nil || msg.IsFakeMXID() {
continue
}
if msg.Timestamp.After(bestTimestamp) {
bestTimestamp = msg.Timestamp
markAsRead = append(markAsRead[:0], msg)
} else if msg != nil && msg.Timestamp.Equal(bestTimestamp) {
markAsRead = append(markAsRead, msg)
}
}
if receipt.Sender.User == user.JID.User {
if len(markAsRead) > 0 {
user.SetLastReadTS(portal.Key, markAsRead[0].Timestamp)
} else {
user.SetLastReadTS(portal.Key, receipt.Timestamp)
}
}
intent := user.bridge.GetPuppetByJID(receipt.Sender).IntentFor(portal)
for _, msg := range markAsRead {
err := intent.SetReadMarkers(portal.MXID, makeReadMarkerContent(msg.MXID, intent.IsCustomPuppet))
if err != nil {
user.log.Warnfln("Failed to mark message %s as read by %s: %v", msg.MXID, intent.UserID, err)
} else {
user.log.Debugfln("Marked %s as read by %s", msg.MXID, intent.UserID)
}
}
} }
func makeReadMarkerContent(eventID id.EventID, doublePuppet bool) CustomReadMarkers { func makeReadMarkerContent(eventID id.EventID, doublePuppet bool) CustomReadMarkers {