Add mutex to block new messages in specific backfill cases

Specifically, when sending events using the `com.beeper.new_messages`
query param, normal new messages shouldn't be processed, as ordering
might get messed up.

3a264b77ed
This commit is contained in:
Tulir Asokan 2022-05-24 13:39:29 +03:00
parent dcb1152382
commit 1185b4bb71
2 changed files with 14 additions and 1 deletions

View file

@ -145,6 +145,7 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
var forwardPrevID id.EventID var forwardPrevID id.EventID
var timeEnd *time.Time var timeEnd *time.Time
var isLatestEvents bool var isLatestEvents bool
portal.latestEventBackfillLock.Lock()
if req.BackfillType == database.BackfillForward { if req.BackfillType == database.BackfillForward {
// TODO this overrides the TimeStart set when enqueuing the backfill // TODO this overrides the TimeStart set when enqueuing the backfill
// maybe the enqueue should instead include the prev event ID // maybe the enqueue should instead include the prev event ID
@ -165,6 +166,14 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
isLatestEvents = true isLatestEvents = true
} }
} }
if !isLatestEvents {
// We'll use normal batch sending, so no need to keep blocking new message processing
portal.latestEventBackfillLock.Unlock()
} else {
// This might involve sending events at the end of the room as non-historical events,
// make sure we don't process messages until this is done.
defer portal.latestEventBackfillLock.Unlock()
}
allMsgs := user.bridge.DB.HistorySync.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, timeEnd, req.MaxTotalEvents) allMsgs := user.bridge.DB.HistorySync.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, timeEnd, req.MaxTotalEvents)
sendDisappearedNotice := false sendDisappearedNotice := false
@ -255,7 +264,7 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
if conv.EndOfHistoryTransferType == waProto.Conversation_COMPLETE_BUT_MORE_MESSAGES_REMAIN_ON_PRIMARY { if conv.EndOfHistoryTransferType == waProto.Conversation_COMPLETE_BUT_MORE_MESSAGES_REMAIN_ON_PRIMARY {
// Since there are more messages on the phone, but we can't // Since there are more messages on the phone, but we can't
// backfilll any more of them, indicate that the last timestamp // backfill any more of them, indicate that the last timestamp
// that we expect to be backfilled is the oldest one that was just // that we expect to be backfilled is the oldest one that was just
// backfilled. // backfilled.
backfillState.FirstExpectedTimestamp = allMsgs[len(allMsgs)-1].GetMessageTimestamp() backfillState.FirstExpectedTimestamp = allMsgs[len(allMsgs)-1].GetMessageTimestamp()

View file

@ -236,6 +236,8 @@ type Portal struct {
backfillLock sync.Mutex backfillLock sync.Mutex
avatarLock sync.Mutex avatarLock sync.Mutex
latestEventBackfillLock sync.Mutex
recentlyHandled [recentlyHandledLength]recentlyHandledWrapper recentlyHandled [recentlyHandledLength]recentlyHandledWrapper
recentlyHandledLock sync.Mutex recentlyHandledLock sync.Mutex
recentlyHandledIndex uint8 recentlyHandledIndex uint8
@ -267,6 +269,8 @@ func (portal *Portal) handleMessageLoopItem(msg PortalMessage) {
return return
} }
} }
portal.latestEventBackfillLock.Lock()
defer portal.latestEventBackfillLock.Unlock()
if msg.evt != nil { if msg.evt != nil {
portal.handleMessage(msg.source, msg.evt) portal.handleMessage(msg.source, msg.evt)
} else if msg.receipt != nil { } else if msg.receipt != nil {