Fix handling new messages during initial portal backfill

This commit is contained in:
Tulir Asokan 2020-05-28 20:35:43 +03:00
parent 7cf19b0908
commit c72610f3f0
3 changed files with 19 additions and 9 deletions

View file

@ -59,7 +59,6 @@ func (mx *MatrixHandler) HandleEncryption(evt *event.Event) {
return return
} }
portal := mx.bridge.GetPortalByMXID(evt.RoomID) portal := mx.bridge.GetPortalByMXID(evt.RoomID)
mx.log.Debugln(portal)
if portal != nil && !portal.Encrypted { if portal != nil && !portal.Encrypted {
mx.log.Debugfln("%s enabled encryption in %s", evt.Sender, evt.RoomID) mx.log.Debugfln("%s enabled encryption in %s", evt.Sender, evt.RoomID)
portal.Encrypted = true portal.Encrypted = true

View file

@ -173,6 +173,7 @@ func (portal *Portal) handleMessageLoop() {
portal.log.Debugln("Not creating portal room for incoming message as the message is too old.") portal.log.Debugln("Not creating portal room for incoming message as the message is too old.")
continue continue
} }
portal.log.Debugln("Creating Matrix room from incoming message")
err := portal.CreateMatrixRoom(msg.source) err := portal.CreateMatrixRoom(msg.source)
if err != nil { if err != nil {
portal.log.Errorln("Failed to create portal room:", err) portal.log.Errorln("Failed to create portal room:", err)
@ -261,14 +262,20 @@ func (portal *Portal) markHandled(source *User, message *waProto.WebMessageInfo,
} }
func (portal *Portal) startHandling(info whatsapp.MessageInfo) bool { func (portal *Portal) startHandling(info whatsapp.MessageInfo) bool {
if portal.lastMessageTs > info.Timestamp+1 || // TODO these should all be trace logs
portal.isRecentlyHandled(info.Id) || if portal.lastMessageTs > info.Timestamp+1 {
portal.isDuplicate(info.Id) { portal.log.Debugfln("Not handling %s: message is older (%d) than last bridge message (%d)", info.Id, info.Timestamp, portal.lastMessageTs)
return false } else if portal.isRecentlyHandled(info.Id) {
} portal.log.Debugfln("Not handling %s: message was recently handled", info.Id)
} else if portal.isDuplicate(info.Id) {
portal.log.Debugfln("Not handling %s: message is duplicate", info.Id)
} else {
portal.log.Debugfln("Starting handling of %s (ts: %d)", info.Id, info.Timestamp)
portal.lastMessageTs = info.Timestamp portal.lastMessageTs = info.Timestamp
return true return true
} }
return false
}
func (portal *Portal) finishHandling(source *User, message *waProto.WebMessageInfo, mxid id.EventID) { func (portal *Portal) finishHandling(source *User, message *waProto.WebMessageInfo, mxid id.EventID) {
portal.markHandled(source, message, mxid) portal.markHandled(source, message, mxid)
@ -795,6 +802,9 @@ func (portal *Portal) CreateMatrixRoom(user *User) error {
} }
portal.MXID = resp.RoomID portal.MXID = resp.RoomID
portal.Update() portal.Update()
portal.bridge.portalsLock.Lock()
portal.bridge.portalsByMXID[portal.MXID] = portal
portal.bridge.portalsLock.Unlock()
// We set the memberships beforehand to make sure the encryption key exchange in initial backfill knows the users are here. // We set the memberships beforehand to make sure the encryption key exchange in initial backfill knows the users are here.
for _, user := range invite { for _, user := range invite {
@ -1548,10 +1558,12 @@ func (portal *Portal) HandleMatrixRedaction(sender *User, evt *event.Event) {
func (portal *Portal) Delete() { func (portal *Portal) Delete() {
portal.Portal.Delete() portal.Portal.Delete()
portal.bridge.portalsLock.Lock()
delete(portal.bridge.portalsByJID, portal.Key) delete(portal.bridge.portalsByJID, portal.Key)
if len(portal.MXID) > 0 { if len(portal.MXID) > 0 {
delete(portal.bridge.portalsByMXID, portal.MXID) delete(portal.bridge.portalsByMXID, portal.MXID)
} }
portal.bridge.portalsLock.Unlock()
} }
func (portal *Portal) Cleanup(puppetsOnly bool) { func (portal *Portal) Cleanup(puppetsOnly bool) {

View file

@ -145,8 +145,6 @@ func (user *User) GetPortals() []*Portal {
portals := make([]*Portal, len(keys)) portals := make([]*Portal, len(keys))
user.bridge.portalsLock.Lock() user.bridge.portalsLock.Lock()
defer user.bridge.portalsLock.Unlock()
for i, key := range keys { for i, key := range keys {
portal, ok := user.bridge.portalsByJID[key] portal, ok := user.bridge.portalsByJID[key]
if !ok { if !ok {
@ -154,6 +152,7 @@ func (user *User) GetPortals() []*Portal {
} }
portals[i] = portal portals[i] = portal
} }
user.bridge.portalsLock.Unlock()
return portals return portals
} }