diff --git a/backfillqueue.go b/backfillqueue.go index da58af4..a952881 100644 --- a/backfillqueue.go +++ b/backfillqueue.go @@ -20,43 +20,73 @@ import ( "time" log "maunium.net/go/maulogger/v2" + "maunium.net/go/mautrix/id" "maunium.net/go/mautrix-whatsapp/database" ) type BackfillQueue struct { - BackfillQuery *database.BackfillQuery - ImmediateBackfillRequests chan *database.Backfill - DeferredBackfillRequests chan *database.Backfill - ReCheckQueue chan bool - - log log.Logger + BackfillQuery *database.BackfillQuery + reCheckChannels []chan bool + log log.Logger } -// RunLoop fetches backfills from the database, prioritizing immediate and forward backfills -func (bq *BackfillQueue) RunLoop(user *User) { +func (bq *BackfillQueue) ReCheck() { + bq.log.Info("Sending re-checks to %d channels", len(bq.reCheckChannels)) + for _, channel := range bq.reCheckChannels { + go func(c chan bool) { + c <- true + }(channel) + } +} + +func (bq *BackfillQueue) GetNextBackfill(userID id.UserID, backfillTypes []database.BackfillType, reCheckChannel chan bool) *database.Backfill { for { - if backfill := bq.BackfillQuery.GetNext(user.MXID); backfill != nil { - if backfill.BackfillType == database.BackfillImmediate || backfill.BackfillType == database.BackfillForward { - bq.ImmediateBackfillRequests <- backfill - } else if backfill.BackfillType == database.BackfillDeferred { - select { - case <-bq.ReCheckQueue: - // If a queue re-check is requested, interrupt sending the - // backfill request to the deferred channel so that - // immediate backfills can happen ASAP. - continue - case bq.DeferredBackfillRequests <- backfill: - } - } else { - bq.log.Debugfln("Unrecognized backfill type %d in queue", backfill.BackfillType) - } + if backfill := bq.BackfillQuery.GetNext(userID, backfillTypes); backfill != nil { backfill.MarkDispatched() + return backfill } else { select { - case <-bq.ReCheckQueue: + case <-reCheckChannel: case <-time.After(time.Minute): } } } } + +func (user *User) HandleBackfillRequestsLoop(backfillTypes []database.BackfillType) { + reCheckChannel := make(chan bool) + user.BackfillQueue.reCheckChannels = append(user.BackfillQueue.reCheckChannels, reCheckChannel) + + for { + req := user.BackfillQueue.GetNextBackfill(user.MXID, backfillTypes, reCheckChannel) + user.log.Infofln("Handling backfill request %s", req) + + conv := user.bridge.DB.HistorySync.GetConversation(user.MXID, req.Portal) + if conv == nil { + user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String()) + req.MarkDone() + continue + } + portal := user.GetPortalByJID(conv.PortalKey.JID) + + // Update the client store with basic chat settings. + if conv.MuteEndTime.After(time.Now()) { + user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime) + } + if conv.Archived { + user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true) + } + if conv.Pinned > 0 { + user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true) + } + + if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration { + portal.ExpirationTime = *conv.EphemeralExpiration + portal.Update(nil) + } + + user.backfillInChunks(req, conv, portal) + req.MarkDone() + } +} diff --git a/commands.go b/commands.go index ae16366..a1d70fe 100644 --- a/commands.go +++ b/commands.go @@ -876,7 +876,7 @@ func (handler *CommandHandler) CommandBackfill(ce *CommandEvent) { backfillMessages := ce.Portal.bridge.DB.Backfill.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, batchSize, -1, batchDelay) backfillMessages.Insert() - ce.User.BackfillQueue.ReCheckQueue <- true + ce.User.BackfillQueue.ReCheck() } const cmdListHelp = `list [page] [items per page] - Get a list of all contacts and groups.` diff --git a/database/backfillqueue.go b/database/backfillqueue.go index 1d053d6..d0885a5 100644 --- a/database/backfillqueue.go +++ b/database/backfillqueue.go @@ -20,6 +20,8 @@ import ( "database/sql" "errors" "fmt" + "strconv" + "strings" "time" log "maunium.net/go/maulogger/v2" @@ -79,6 +81,7 @@ const ( SELECT queue_id, user_mxid, type, priority, portal_jid, portal_receiver, time_start, max_batch_events, max_total_events, batch_delay FROM backfill_queue WHERE user_mxid=$1 + AND type IN (%s) AND dispatch_time IS NULL ORDER BY type, priority, queue_id LIMIT 1 @@ -86,13 +89,17 @@ const ( ) // GetNext returns the next backfill to perform -func (bq *BackfillQuery) GetNext(userID id.UserID) (backfill *Backfill) { - rows, err := bq.db.Query(getNextBackfillQuery, userID) - defer rows.Close() +func (bq *BackfillQuery) GetNext(userID id.UserID, backfillTypes []BackfillType) (backfill *Backfill) { + types := []string{} + for _, backfillType := range backfillTypes { + types = append(types, strconv.Itoa(int(backfillType))) + } + rows, err := bq.db.Query(fmt.Sprintf(getNextBackfillQuery, strings.Join(types, ",")), userID) if err != nil || rows == nil { bq.log.Error(err) return } + defer rows.Close() if rows.Next() { backfill = bq.New().Scan(rows) } diff --git a/historysync.go b/historysync.go index 825eba5..69e6fc4 100644 --- a/historysync.go +++ b/historysync.go @@ -51,27 +51,22 @@ func (user *User) handleHistorySyncsLoop() { return } - reCheckQueue := make(chan bool, 1) // Start the backfill queue. user.BackfillQueue = &BackfillQueue{ - BackfillQuery: user.bridge.DB.Backfill, - ImmediateBackfillRequests: make(chan *database.Backfill, 1), - DeferredBackfillRequests: make(chan *database.Backfill, 1), - ReCheckQueue: make(chan bool, 1), - log: user.log.Sub("BackfillQueue"), + BackfillQuery: user.bridge.DB.Backfill, + reCheckChannels: []chan bool{}, + log: user.log.Sub("BackfillQueue"), } - reCheckQueue = user.BackfillQueue.ReCheckQueue // Immediate backfills can be done in parallel for i := 0; i < user.bridge.Config.Bridge.HistorySync.Immediate.WorkerCount; i++ { - go user.handleBackfillRequestsLoop(user.BackfillQueue.ImmediateBackfillRequests) + go user.HandleBackfillRequestsLoop([]database.BackfillType{database.BackfillImmediate, database.BackfillForward}) } // Deferred backfills should be handled synchronously so as not to // overload the homeserver. Users can configure their backfill stages // to be more or less aggressive with backfilling at this stage. - go user.handleBackfillRequestsLoop(user.BackfillQueue.DeferredBackfillRequests) - go user.BackfillQueue.RunLoop(user) + go user.HandleBackfillRequestsLoop([]database.BackfillType{database.BackfillDeferred}) if user.bridge.Config.Bridge.HistorySync.MediaRequests.AutoRequestMedia && user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestMethod == config.MediaRequestMethodLocalTime { @@ -81,7 +76,7 @@ func (user *User) handleHistorySyncsLoop() { // Always save the history syncs for the user. If they want to enable // backfilling in the future, we will have it in the database. for evt := range user.historySyncs { - user.handleHistorySync(reCheckQueue, evt.Data) + user.handleHistorySync(user.BackfillQueue, evt.Data) } } @@ -131,38 +126,6 @@ func (user *User) dailyMediaRequestLoop() { } } -func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) { - for req := range backfillRequests { - user.log.Infofln("Handling backfill request %s", req) - conv := user.bridge.DB.HistorySync.GetConversation(user.MXID, req.Portal) - if conv == nil { - user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String()) - req.MarkDone() - continue - } - portal := user.GetPortalByJID(conv.PortalKey.JID) - - // Update the client store with basic chat settings. - if conv.MuteEndTime.After(time.Now()) { - user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime) - } - if conv.Archived { - user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true) - } - if conv.Pinned > 0 { - user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true) - } - - if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration { - portal.ExpirationTime = *conv.EphemeralExpiration - portal.Update(nil) - } - - user.backfillInChunks(req, conv, portal) - req.MarkDone() - } -} - func (user *User) backfillInChunks(req *database.Backfill, conv *database.HistorySyncConversation, portal *Portal) { portal.backfillLock.Lock() defer portal.backfillLock.Unlock() @@ -294,7 +257,7 @@ func (user *User) shouldCreatePortalForHistorySync(conv *database.HistorySyncCon return false } -func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.HistorySync) { +func (user *User) handleHistorySync(backfillQueue *BackfillQueue, evt *waProto.HistorySync) { if evt == nil || evt.SyncType == nil || evt.GetSyncType() == waProto.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waProto.HistorySync_PUSH_NAME { return } @@ -385,7 +348,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History } // Tell the queue to check for new backfill requests. - reCheckQueue <- true + backfillQueue.ReCheck() } } } diff --git a/portal.go b/portal.go index c380dd0..967bb49 100644 --- a/portal.go +++ b/portal.go @@ -1368,7 +1368,7 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i portals := []*Portal{portal} user.EnqueueImmedateBackfills(portals) user.EnqueueDeferredBackfills(portals) - user.BackfillQueue.ReCheckQueue <- true + user.BackfillQueue.ReCheck() } return nil }