historysync: lock earlier to prevent races and duplicate messages

Closes BRI-2709
This commit is contained in:
Sumner Evans 2022-04-04 16:07:25 -06:00
parent 54534f6b42
commit 748c9509a1
No known key found for this signature in database
GPG key ID: 8904527AB50022FD

View file

@ -100,64 +100,71 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac
portal.Update() portal.Update()
} }
if !user.shouldCreatePortalForHistorySync(conv, portal) { user.createOrUpdatePortalAndBackfillWithLock(req, conv, portal)
continue }
}
func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill, conv *database.HistorySyncConversation, portal *Portal) {
portal.backfillLock.Lock()
defer portal.backfillLock.Unlock()
if !user.shouldCreatePortalForHistorySync(conv, portal) {
return
}
if len(portal.MXID) == 0 {
user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling")
err := portal.CreateMatrixRoom(user, nil, true, false)
if err != nil {
user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err)
return
} }
} else {
portal.UpdateMatrixRoom(user, nil)
}
if len(portal.MXID) == 0 { allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents)
user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling")
err := portal.CreateMatrixRoom(user, nil, true, false) if len(allMsgs) > 0 {
if err != nil { user.log.Debugf("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents)
user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err) toBackfill := allMsgs[0:]
continue insertionEventIds := []id.EventID{}
for {
if len(toBackfill) == 0 {
break
}
var msgs []*database.WrappedWebMessageInfo
if len(toBackfill) <= req.MaxBatchEvents {
msgs = toBackfill
toBackfill = toBackfill[0:0]
} else {
msgs = toBackfill[len(toBackfill)-req.MaxBatchEvents:]
toBackfill = toBackfill[:len(toBackfill)-req.MaxBatchEvents]
}
if len(msgs) > 0 {
time.Sleep(time.Duration(req.BatchDelay) * time.Second)
user.log.Debugf("Backfilling %d messages in %s", len(msgs), portal.Key.JID)
insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...)
} }
} else {
portal.UpdateMatrixRoom(user, nil)
} }
user.log.Debugf("Finished backfilling %d messages in %s", len(allMsgs), portal.Key.JID)
allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents) if len(insertionEventIds) > 0 {
portal.sendPostBackfillDummy(
if len(allMsgs) > 0 { time.Unix(int64(allMsgs[len(allMsgs)-1].Message.GetMessageTimestamp()), 0),
user.log.Debugf("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents) insertionEventIds[0])
toBackfill := allMsgs[0:]
insertionEventIds := []id.EventID{}
for {
if len(toBackfill) == 0 {
break
}
var msgs []*database.WrappedWebMessageInfo
if len(toBackfill) <= req.MaxBatchEvents {
msgs = toBackfill
toBackfill = toBackfill[0:0]
} else {
msgs = toBackfill[len(toBackfill)-req.MaxBatchEvents:]
toBackfill = toBackfill[:len(toBackfill)-req.MaxBatchEvents]
}
if len(msgs) > 0 {
time.Sleep(time.Duration(req.BatchDelay) * time.Second)
user.log.Debugf("Backfilling %d messages in %s", len(msgs), portal.Key.JID)
insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...)
}
}
user.log.Debugf("Finished backfilling %d messages in %s", len(allMsgs), portal.Key.JID)
if len(insertionEventIds) > 0 {
portal.sendPostBackfillDummy(
time.Unix(int64(allMsgs[len(allMsgs)-1].Message.GetMessageTimestamp()), 0),
insertionEventIds[0])
}
user.log.Debugf("Deleting %d history sync messages after backfilling", len(allMsgs))
err := user.bridge.DB.HistorySyncQuery.DeleteMessages(allMsgs)
if err != nil {
user.log.Warnf("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err)
}
} else {
user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID)
} }
if !conv.MarkedAsUnread && conv.UnreadCount == 0 { user.log.Debugf("Deleting %d history sync messages after backfilling", len(allMsgs))
user.markSelfReadFull(portal) err := user.bridge.DB.HistorySyncQuery.DeleteMessages(allMsgs)
if err != nil {
user.log.Warnf("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err)
} }
} else {
user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID)
}
if !conv.MarkedAsUnread && conv.UnreadCount == 0 {
user.markSelfReadFull(portal)
} }
} }
@ -298,9 +305,6 @@ var (
) )
func (portal *Portal) backfill(source *User, messages []*database.WrappedWebMessageInfo) []id.EventID { func (portal *Portal) backfill(source *User, messages []*database.WrappedWebMessageInfo) []id.EventID {
portal.backfillLock.Lock()
defer portal.backfillLock.Unlock()
var historyBatch, newBatch mautrix.ReqBatchSend var historyBatch, newBatch mautrix.ReqBatchSend
var historyBatchInfos, newBatchInfos []*wrappedInfo var historyBatchInfos, newBatchInfos []*wrappedInfo