From 5881247f7345c386170499a5ea03aa9630e4b65d Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Mon, 5 Dec 2022 18:03:52 +0200 Subject: [PATCH] Add timer to enqueue backfills --- historysync.go | 66 ++++++++++++++++++++++++++------------------------ user.go | 3 +++ 2 files changed, 38 insertions(+), 31 deletions(-) diff --git a/historysync.go b/historysync.go index b185dc8..61866dc 100644 --- a/historysync.go +++ b/historysync.go @@ -84,8 +84,40 @@ func (user *User) handleHistorySyncsLoop() { // Always save the history syncs for the user. If they want to enable // backfilling in the future, we will have it in the database. - for evt := range user.historySyncs { - user.handleHistorySync(user.BackfillQueue, evt.Data) + for { + select { + case evt := <-user.historySyncs: + user.handleHistorySync(user.BackfillQueue, evt.Data) + case <-user.enqueueBackfillsTimer.C: + user.enqueueAllBackfills() + } + } +} + +const EnqueueBackfillsDelay = 30 * time.Second + +func (user *User) enqueueAllBackfills() { + user.log.Infofln("%v has passed since the last history sync blob, enqueueing backfills", EnqueueBackfillsDelay) + nMostRecent := user.bridge.DB.HistorySync.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations) + if len(nMostRecent) > 0 { + user.log.Infofln("Got last history sync blob, enqueuing backfills") + // Find the portals for all the conversations. + portals := []*Portal{} + for _, conv := range nMostRecent { + jid, err := types.ParseJID(conv.ConversationID) + if err != nil { + user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.ConversationID, err) + continue + } + portals = append(portals, user.GetPortalByJID(jid)) + } + + user.EnqueueImmediateBackfills(portals) + user.EnqueueForwardBackfills(portals) + user.EnqueueDeferredBackfills(portals) + + // Tell the queue to check for new backfill requests. + user.BackfillQueue.ReCheck() } } @@ -380,35 +412,7 @@ func (user *User) handleHistorySync(backfillQueue *BackfillQueue, evt *waProto.H // most recent portals. If it's the last history sync event, start // backfilling the rest of the history of the portals. if user.bridge.Config.Bridge.HistorySync.Backfill { - expectedLastSyncType := waProto.HistorySync_FULL - if !user.bridge.Config.Bridge.HistorySync.RequestFullSync { - expectedLastSyncType = waProto.HistorySync_RECENT - } - if evt.GetProgress() < 99 || evt.GetSyncType() != expectedLastSyncType { - return - } - - nMostRecent := user.bridge.DB.HistorySync.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations) - if len(nMostRecent) > 0 { - user.log.Infofln("Got last history sync blob, enqueuing backfills") - // Find the portals for all the conversations. - portals := []*Portal{} - for _, conv := range nMostRecent { - jid, err := types.ParseJID(conv.ConversationID) - if err != nil { - user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.ConversationID, err) - continue - } - portals = append(portals, user.GetPortalByJID(jid)) - } - - user.EnqueueImmediateBackfills(portals) - user.EnqueueForwardBackfills(portals) - user.EnqueueDeferredBackfills(portals) - - // Tell the queue to check for new backfill requests. - backfillQueue.ReCheck() - } + user.enqueueBackfillsTimer.Reset(EnqueueBackfillsDelay) } } diff --git a/user.go b/user.go index 671c425..b913dd3 100644 --- a/user.go +++ b/user.go @@ -76,6 +76,7 @@ type User struct { lastPresence types.Presence historySyncLoopsStarted bool + enqueueBackfillsTimer *time.Timer spaceMembershipChecked bool lastPhoneOfflineWarning time.Time @@ -236,6 +237,8 @@ func (br *WABridge) NewUser(dbUser *database.User) *User { user.Whitelisted = user.PermissionLevel >= bridgeconfig.PermissionLevelUser user.Admin = user.PermissionLevel >= bridgeconfig.PermissionLevelAdmin user.BridgeState = br.NewBridgeStateQueue(user, user.log) + user.enqueueBackfillsTimer = time.NewTimer(5 * time.Second) + user.enqueueBackfillsTimer.Stop() go user.puppetResyncLoop() return user }