Add timer to enqueue backfills

This commit is contained in:
Tulir Asokan 2022-12-05 18:03:52 +02:00
parent 74e6c5f53e
commit 5881247f73
2 changed files with 38 additions and 31 deletions

View file

@ -84,8 +84,40 @@ func (user *User) handleHistorySyncsLoop() {
// Always save the history syncs for the user. If they want to enable
// backfilling in the future, we will have it in the database.
for evt := range user.historySyncs {
user.handleHistorySync(user.BackfillQueue, evt.Data)
for {
select {
case evt := <-user.historySyncs:
user.handleHistorySync(user.BackfillQueue, evt.Data)
case <-user.enqueueBackfillsTimer.C:
user.enqueueAllBackfills()
}
}
}
const EnqueueBackfillsDelay = 30 * time.Second
func (user *User) enqueueAllBackfills() {
user.log.Infofln("%v has passed since the last history sync blob, enqueueing backfills", EnqueueBackfillsDelay)
nMostRecent := user.bridge.DB.HistorySync.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations)
if len(nMostRecent) > 0 {
user.log.Infofln("Got last history sync blob, enqueuing backfills")
// Find the portals for all the conversations.
portals := []*Portal{}
for _, conv := range nMostRecent {
jid, err := types.ParseJID(conv.ConversationID)
if err != nil {
user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.ConversationID, err)
continue
}
portals = append(portals, user.GetPortalByJID(jid))
}
user.EnqueueImmediateBackfills(portals)
user.EnqueueForwardBackfills(portals)
user.EnqueueDeferredBackfills(portals)
// Tell the queue to check for new backfill requests.
user.BackfillQueue.ReCheck()
}
}
@ -380,35 +412,7 @@ func (user *User) handleHistorySync(backfillQueue *BackfillQueue, evt *waProto.H
// most recent portals. If it's the last history sync event, start
// backfilling the rest of the history of the portals.
if user.bridge.Config.Bridge.HistorySync.Backfill {
expectedLastSyncType := waProto.HistorySync_FULL
if !user.bridge.Config.Bridge.HistorySync.RequestFullSync {
expectedLastSyncType = waProto.HistorySync_RECENT
}
if evt.GetProgress() < 99 || evt.GetSyncType() != expectedLastSyncType {
return
}
nMostRecent := user.bridge.DB.HistorySync.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations)
if len(nMostRecent) > 0 {
user.log.Infofln("Got last history sync blob, enqueuing backfills")
// Find the portals for all the conversations.
portals := []*Portal{}
for _, conv := range nMostRecent {
jid, err := types.ParseJID(conv.ConversationID)
if err != nil {
user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.ConversationID, err)
continue
}
portals = append(portals, user.GetPortalByJID(jid))
}
user.EnqueueImmediateBackfills(portals)
user.EnqueueForwardBackfills(portals)
user.EnqueueDeferredBackfills(portals)
// Tell the queue to check for new backfill requests.
backfillQueue.ReCheck()
}
user.enqueueBackfillsTimer.Reset(EnqueueBackfillsDelay)
}
}

View file

@ -76,6 +76,7 @@ type User struct {
lastPresence types.Presence
historySyncLoopsStarted bool
enqueueBackfillsTimer *time.Timer
spaceMembershipChecked bool
lastPhoneOfflineWarning time.Time
@ -236,6 +237,8 @@ func (br *WABridge) NewUser(dbUser *database.User) *User {
user.Whitelisted = user.PermissionLevel >= bridgeconfig.PermissionLevelUser
user.Admin = user.PermissionLevel >= bridgeconfig.PermissionLevelAdmin
user.BridgeState = br.NewBridgeStateQueue(user, user.log)
user.enqueueBackfillsTimer = time.NewTimer(5 * time.Second)
user.enqueueBackfillsTimer.Stop()
go user.puppetResyncLoop()
return user
}