diff --git a/historysync.go b/historysync.go index 36baa1b..5fece97 100644 --- a/historysync.go +++ b/historysync.go @@ -17,6 +17,7 @@ package main import ( + "fmt" "sort" "sync" "time" @@ -34,12 +35,6 @@ import ( // region User history sync handling -const ( - FastBackfillPortalCount = 20 - FastBackfillMessageCount = 20 - FastBackfillMessageCap = 30 -) - type portalToBackfill struct { portal *Portal conv *waProto.Conversation @@ -85,52 +80,26 @@ func (user *User) handleHistorySync(evt *waProto.HistorySync) { if evt == nil || evt.SyncType == nil || evt.GetSyncType() == waProto.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waProto.HistorySync_PUSH_NAME { return } - user.log.Infofln("Handling history sync with type %s, %d conversations, chunk order %d, progress %d%%", evt.GetSyncType(), len(evt.GetConversations()), evt.GetChunkOrder(), evt.GetProgress()) + description := fmt.Sprintf("type %s, %d conversations, chunk order %d, progress %d%%", evt.GetSyncType(), len(evt.GetConversations()), evt.GetChunkOrder(), evt.GetProgress()) + user.log.Infoln("Handling history sync with", description) conversations := conversationList(evt.GetConversations()) // We want to handle recent conversations first sort.Sort(sort.Reverse(conversations)) portalsToBackfill := make(chan portalToBackfill, len(conversations)) - var backfillWait, fastBackfillWait sync.WaitGroup - var fastBackfillWaitDoneOnce sync.Once - // We have to add 1 to the waitgroup beforehand to make sure the wait in the goroutine doesn't finish - // before we add the actual numbers. - fastBackfillWait.Add(1) + var backfillWait sync.WaitGroup backfillWait.Add(1) - go func() { - // Wait for the fast parallelized backfill to complete, then start the slow backfill loop (see explanation below) - fastBackfillWait.Wait() - user.slowBackfillLoop(portalsToBackfill, backfillWait.Done) - }() - for i, conv := range conversations { - // This will create portals and start backfilling for them. - // - // The first 20 (FastBackfillPortalCount) portals will be parallelized, where the portal is - // created and recent messages are backfilled in parallel. Other portals will be created - // synchronously (and this will only return when they're created). - // - // For said other portals, and older messages in the parallelized portals, backfilling also - // happens synchronously: the portals and messages to backfill are added to the - // portalsToBackfill channel, which is consumed one-by-one in the slowBackfillLoop method. - // That loop is only started after the fast parallelized backfill is completed. - user.handleHistorySyncConversation(i, conv, &fastBackfillWait, portalsToBackfill) - if i == FastBackfillPortalCount { - // There won't be any more portals going the fast backfill route, so remove the 1 item - // that was added to the wait group at the beginning. - fastBackfillWaitDoneOnce.Do(fastBackfillWait.Done) - } + go user.backfillLoop(portalsToBackfill, backfillWait.Done) + for _, conv := range conversations { + user.handleHistorySyncConversation(conv, portalsToBackfill) } - fastBackfillWaitDoneOnce.Do(fastBackfillWait.Done) - // Wait for fast backfill to complete to make sure everything necessary is in the slow backfill queue, - // then close the slow backfill queue and wait for the loop to finish handling the queue. - fastBackfillWait.Wait() close(portalsToBackfill) backfillWait.Wait() - user.log.Infofln("Finished handling history sync with type %s, %d conversations, chunk order %d, progress %d%%", evt.GetSyncType(), len(conversations), evt.GetChunkOrder(), evt.GetProgress()) + user.log.Infoln("Finished handling history sync with", description) } -func (user *User) slowBackfillLoop(ch chan portalToBackfill, done func()) { +func (user *User) backfillLoop(ch chan portalToBackfill, done func()) { defer done() for ptb := range ch { if len(ptb.msgs) > 0 { @@ -145,7 +114,7 @@ func (user *User) slowBackfillLoop(ch chan portalToBackfill, done func()) { } } -func (user *User) handleHistorySyncConversation(index int, conv *waProto.Conversation, fastBackfillWait *sync.WaitGroup, portalsToBackfill chan portalToBackfill) { +func (user *User) handleHistorySyncConversation(conv *waProto.Conversation, portalsToBackfill chan portalToBackfill) { jid, err := types.ParseJID(conv.GetId()) if err != nil { user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err) @@ -180,12 +149,6 @@ func (user *User) handleHistorySyncConversation(index int, conv *waProto.Convers } ptb := portalToBackfill{portal: portal, conv: conv, msgs: msgs} if len(portal.MXID) == 0 { - // For the first few chats, do the portal creation and some backfilling in parallel to populate the chat list ASAP - if index < FastBackfillPortalCount { - fastBackfillWait.Add(1) - go user.fastBackfillRoutine(ptb, fastBackfillWait.Done, portalsToBackfill) - return - } user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling") err = portal.CreateMatrixRoom(user, getPartialInfoFromConversation(jid, conv), false) if err != nil { @@ -234,34 +197,6 @@ func (user *User) shouldCreatePortalForHistorySync(conv *waProto.Conversation, p return false } -func (user *User) fastBackfillRoutine(ptb portalToBackfill, done func(), slowBackfillChan chan portalToBackfill) { - defer done() - - user.log.Debugln("Asynchronously creating portal for", ptb.portal.Key.JID, "as part of history sync handling") - err := ptb.portal.CreateMatrixRoom(user, getPartialInfoFromConversation(ptb.portal.Key.JID, ptb.conv), false) - if err != nil { - user.log.Warnfln("Failed to create room for %s during backfill: %v", ptb.portal.Key.JID, err) - return - } - - if user.bridge.Config.Bridge.HistorySync.Backfill { - if len(ptb.msgs) > FastBackfillMessageCap { - user.log.Debugfln("Bridging first %d messages of history sync payload for %s (async)", FastBackfillMessageCount, ptb.portal.Key.JID) - ptb.portal.backfill(user, ptb.msgs[:FastBackfillMessageCount]) - // Send the rest of the messages off to the slow backfill queue - ptb.msgs = ptb.msgs[FastBackfillMessageCount:] - slowBackfillChan <- ptb - } else if len(ptb.msgs) > 0 { - user.log.Debugfln("Bridging all messages (%d) of history sync payload for %s (async)", len(ptb.msgs), ptb.portal.Key.JID) - ptb.portal.backfill(user, ptb.msgs) - } else { - user.log.Debugfln("Not backfilling %s: no bridgeable messages found", ptb.portal.Key.JID) - } - } else { - user.log.Debugln("Backfill is disabled, not bridging history sync payload for", ptb.portal.Key.JID) - } -} - func filterMessagesToBackfill(messages []*waProto.HistorySyncMsg) []*waProto.WebMessageInfo { filtered := make([]*waProto.WebMessageInfo, 0, len(messages)) for _, msg := range messages {