From 331138bc6bd41d6cdb4d3f8c2f06f085beef659e Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Fri, 29 Apr 2022 11:07:05 +0300 Subject: [PATCH] Split forward backfilling and do it in one batch Multiple batches is complicated, as it would require figuring out where the batch ended when handling the next batch of new-old messages. --- historysync.go | 145 +++++++++++++++++++++++++------------------------ 1 file changed, 74 insertions(+), 71 deletions(-) diff --git a/historysync.go b/historysync.go index e0e47e7..e7c8c83 100644 --- a/historysync.go +++ b/historysync.go @@ -128,12 +128,12 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac portal.Update() } - user.createOrUpdatePortalAndBackfillWithLock(req, conv, portal) + user.backfillInChunks(req, conv, portal) } } } -func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill, conv *database.HistorySyncConversation, portal *Portal) { +func (user *User) backfillInChunks(req *database.Backfill, conv *database.HistorySyncConversation, portal *Portal) { portal.backfillLock.Lock() defer portal.backfillLock.Unlock() @@ -141,6 +141,22 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill return } + var forwardPrevID id.EventID + if req.BackfillType == database.BackfillForward { + // TODO this overrides the TimeStart set when enqueuing the backfill + // maybe the enqueue should instead include the prev event ID + lastMessage := portal.bridge.DB.Message.GetLastInChat(portal.Key) + forwardPrevID = lastMessage.MXID + start := lastMessage.Timestamp.Add(1 * time.Second) + req.TimeStart = &start + } else { + firstMessage := portal.bridge.DB.Message.GetFirstInChat(portal.Key) + if firstMessage != nil && (req.TimeEnd == nil || firstMessage.Timestamp.Before(*req.TimeEnd)) { + end := firstMessage.Timestamp.Add(-1 * time.Second) + req.TimeEnd = &end + user.log.Debugfln("Limiting backfill to end at %v", end) + } + } allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents) if len(allMsgs) == 0 { user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID) @@ -161,7 +177,7 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill var insertionEventIds []id.EventID for len(toBackfill) > 0 { var msgs []*waProto.WebMessageInfo - if len(toBackfill) <= req.MaxBatchEvents { + if len(toBackfill) <= req.MaxBatchEvents || req.MaxBatchEvents < 0 { msgs = toBackfill toBackfill = nil } else { @@ -172,7 +188,10 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill if len(msgs) > 0 { time.Sleep(time.Duration(req.BatchDelay) * time.Second) user.log.Debugfln("Backfilling %d messages in %s (queue ID: %d)", len(msgs), portal.Key.JID, req.QueueID) - insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...) + resp := portal.backfill(user, msgs, req.BackfillType == database.BackfillForward, forwardPrevID) + if resp != nil { + insertionEventIds = append(insertionEventIds, resp.BaseInsertionEventID) + } } } user.log.Debugfln("Finished backfilling %d messages in %s (queue ID: %d)", len(allMsgs), portal.Key.JID, req.QueueID) @@ -297,6 +316,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History // Enqueue immediate backfills for the most recent messages first. user.EnqueueImmedateBackfills(portals) case waProto.HistorySync_FULL, waProto.HistorySync_RECENT: + user.EnqueueForwardBackfills(portals) // Enqueue deferred backfills as configured. user.EnqueueDeferredBackfills(portals) user.EnqueueMediaBackfills(portals) @@ -340,6 +360,18 @@ func (user *User) EnqueueDeferredBackfills(portals []*Portal) { } } +func (user *User) EnqueueForwardBackfills(portals []*Portal) { + for priority, portal := range portals { + lastMsg := user.bridge.DB.Message.GetLastInChat(portal.Key) + if lastMsg == nil { + continue + } + backfill := user.bridge.DB.BackfillQuery.NewWithValues( + user.MXID, database.BackfillForward, priority, &portal.Key, &lastMsg.Timestamp, nil, -1, -1, 0) + backfill.Insert() + } +} + func (user *User) EnqueueMediaBackfills(portals []*Portal) { numPortals := len(portals) for stageIdx, backfillStage := range user.bridge.Config.Bridge.HistorySync.Media { @@ -367,16 +399,26 @@ var ( HistorySyncMarker = event.Type{Type: "org.matrix.msc2716.marker", Class: event.MessageEventType} ) -func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) []id.EventID { - var historyBatch, newBatch mautrix.ReqBatchSend - var historyBatchInfos, newBatchInfos []*wrappedInfo +func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo, isForward bool, prevEventID id.EventID) *mautrix.RespBatchSend { + var req mautrix.ReqBatchSend + var infos []*wrappedInfo - firstMsgTimestamp := time.Unix(int64(messages[len(messages)-1].GetMessageTimestamp()), 0) + if !isForward { + if portal.FirstEventID != "" || portal.NextBatchID != "" { + req.PrevEventID = portal.FirstEventID + req.BatchID = portal.NextBatchID + } else { + portal.log.Warnfln("Can't backfill %d messages through %s to chat: first event ID not known", len(messages), source.MXID) + return nil + } + } else { + req.PrevEventID = prevEventID + } - historyBatch.StateEventsAtStart = make([]*event.Event, 0) - newBatch.StateEventsAtStart = make([]*event.Event, 0) + beforeFirstMessageTimestampMillis := (int64(messages[len(messages)-1].GetMessageTimestamp()) * 1000) - 1 + req.StateEventsAtStart = make([]*event.Event, 0) - addedMembers := make(map[id.UserID]*event.MemberEventContent) + addedMembers := make(map[id.UserID]struct{}) addMember := func(puppet *Puppet) { if _, alreadyAdded := addedMembers[puppet.MXID]; alreadyAdded { return @@ -389,41 +431,23 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) } inviteContent := content inviteContent.Membership = event.MembershipInvite - historyBatch.StateEventsAtStart = append(historyBatch.StateEventsAtStart, &event.Event{ + req.StateEventsAtStart = append(req.StateEventsAtStart, &event.Event{ Type: event.StateMember, Sender: portal.MainIntent().UserID, StateKey: &mxid, - Timestamp: firstMsgTimestamp.UnixMilli(), + Timestamp: beforeFirstMessageTimestampMillis, Content: event.Content{Parsed: &inviteContent}, }, &event.Event{ Type: event.StateMember, Sender: puppet.MXID, StateKey: &mxid, - Timestamp: firstMsgTimestamp.UnixMilli(), + Timestamp: beforeFirstMessageTimestampMillis, Content: event.Content{Parsed: &content}, }) - addedMembers[puppet.MXID] = &content + addedMembers[puppet.MXID] = struct{}{} } - firstMessage := portal.bridge.DB.Message.GetFirstInChat(portal.Key) - lastMessage := portal.bridge.DB.Message.GetLastInChat(portal.Key) - var historyMaxTs, newMinTs time.Time - - if portal.FirstEventID != "" || portal.NextBatchID != "" { - historyBatch.PrevEventID = portal.FirstEventID - historyBatch.BatchID = portal.NextBatchID - if firstMessage == nil && lastMessage == nil { - historyMaxTs = time.Now() - } else { - historyMaxTs = firstMessage.Timestamp - } - } - if lastMessage != nil { - newBatch.PrevEventID = lastMessage.MXID - newMinTs = lastMessage.Timestamp - } - - portal.log.Debugfln("Processing backfill with %d messages", len(messages)) + portal.log.Infofln("Processing history sync with %d messages (forward: %t)", len(messages), isForward) // The messages are ordered newest to oldest, so iterate them in reverse order. for i := len(messages) - 1; i >= 0; i-- { webMsg := messages[i] @@ -446,15 +470,6 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) if info == nil { continue } - var batch *mautrix.ReqBatchSend - var infos *[]*wrappedInfo - if !historyMaxTs.IsZero() && info.Timestamp.Before(historyMaxTs) { - batch, infos = &historyBatch, &historyBatchInfos - } else if !newMinTs.IsZero() && info.Timestamp.After(newMinTs) { - batch, infos = &newBatch, &newBatchInfos - } else { - continue - } if webMsg.GetPushName() != "" && webMsg.GetPushName() != "-" { existingContact, _ := source.Client.Store.Contacts.GetContact(info.Sender) if !existingContact.Found || existingContact.PushName == "" { @@ -484,13 +499,18 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) if len(converted.ReplyTo) > 0 { portal.SetReply(converted.Content, converted.ReplyTo) } - err := portal.appendBatchEvents(converted, info, webMsg.GetEphemeralStartTimestamp(), &batch.Events, infos) + err := portal.appendBatchEvents(converted, info, webMsg.GetEphemeralStartTimestamp(), &req.Events, &infos) if err != nil { portal.log.Errorfln("Error handling message %s during backfill: %v", info.ID, err) } } + portal.log.Infofln("Made %d Matrix events from messages in batch", len(req.Events)) - if (len(historyBatch.Events) > 0 && len(historyBatch.BatchID) == 0) || len(newBatch.Events) > 0 { + if len(req.Events) == 0 { + return nil + } + + if len(req.BatchID) == 0 || isForward { portal.log.Debugln("Sending a dummy event to avoid forward extremity errors with backfill") _, err := portal.MainIntent().SendMessageEvent(portal.MXID, PreBackfillDummyEvent, struct{}{}) if err != nil { @@ -498,33 +518,16 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) } } - var insertionEventIds []id.EventID - - if len(historyBatch.Events) > 0 && len(historyBatch.PrevEventID) > 0 { - portal.log.Infofln("Sending %d historical messages...", len(historyBatch.Events)) - historyResp, err := portal.MainIntent().BatchSend(portal.MXID, &historyBatch) - if err != nil { - portal.log.Errorln("Error sending batch of historical messages:", err) - } else { - insertionEventIds = append(insertionEventIds, historyResp.BaseInsertionEventID) - portal.finishBatch(historyResp.EventIDs, historyBatchInfos) - portal.NextBatchID = historyResp.NextBatchID - portal.Update() - } + resp, err := portal.MainIntent().BatchSend(portal.MXID, &req) + if err != nil { + portal.log.Errorln("Error batch sending messages:", err) + return nil + } else { + portal.finishBatch(resp.EventIDs, infos) + portal.NextBatchID = resp.NextBatchID + portal.Update() + return resp } - - if len(newBatch.Events) > 0 && len(newBatch.PrevEventID) > 0 { - portal.log.Infofln("Sending %d new messages...", len(newBatch.Events)) - newResp, err := portal.MainIntent().BatchSend(portal.MXID, &newBatch) - if err != nil { - portal.log.Errorln("Error sending batch of new messages:", err) - } else { - insertionEventIds = append(insertionEventIds, newResp.BaseInsertionEventID) - portal.finishBatch(newResp.EventIDs, newBatchInfos) - } - } - - return insertionEventIds } func (portal *Portal) parseWebMessageInfo(source *User, webMsg *waProto.WebMessageInfo) *types.MessageInfo {