diff --git a/commands.go b/commands.go index 709b0ff..4c3f7bc 100644 --- a/commands.go +++ b/commands.go @@ -1059,7 +1059,7 @@ func (handler *CommandHandler) CommandOpen(ce *CommandEvent) { portal.UpdateMatrixRoom(ce.User, info) ce.Reply("Portal room synced.") } else { - err = portal.CreateMatrixRoom(ce.User, info, true) + err = portal.CreateMatrixRoom(ce.User, info, true, true) if err != nil { ce.Reply("Failed to create room: %v", err) } else { diff --git a/historysync.go b/historysync.go index 75abc60..553dfff 100644 --- a/historysync.go +++ b/historysync.go @@ -78,6 +78,10 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac for req := range backfillRequests { user.log.Infof("Backfill request: %v", req) conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal) + if conv == nil { + user.log.Errorf("Could not find conversation for %s in %s", user.MXID, req.Portal.String()) + continue + } // Update the client store with basic chat settings. if conv.MuteEndTime.After(time.Now()) { @@ -102,7 +106,7 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac if len(portal.MXID) == 0 { user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling") - err := portal.CreateMatrixRoom(user, nil, true) + err := portal.CreateMatrixRoom(user, nil, true, false) if err != nil { user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err) continue @@ -122,8 +126,6 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac break } - time.Sleep(time.Duration(req.BatchDelay) * time.Second) - var msgs []*waProto.WebMessageInfo if len(toBackfill) <= req.MaxBatchEvents { msgs = toBackfill @@ -134,6 +136,7 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac } if len(msgs) > 0 { + time.Sleep(time.Duration(req.BatchDelay) * time.Second) user.log.Debugf("Backfilling %d messages in %s", len(msgs), portal.Key.JID) insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...) } @@ -224,8 +227,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History // If this was the initial bootstrap, enqueue immediate backfills for the // most recent portals. If it's the last history sync event, start // backfilling the rest of the history of the portals. - historySyncConfig := user.bridge.Config.Bridge.HistorySync - if historySyncConfig.Backfill && (evt.GetSyncType() == waProto.HistorySync_FULL || evt.GetSyncType() == waProto.HistorySync_INITIAL_BOOTSTRAP) { + if user.bridge.Config.Bridge.HistorySync.Backfill && evt.GetSyncType() == waProto.HistorySync_FULL { nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations) for i, conv := range nMostRecent { jid, err := types.ParseJID(conv.ConversationID) @@ -235,26 +237,10 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History } portal := user.GetPortalByJID(jid) - switch evt.GetSyncType() { - case waProto.HistorySync_INITIAL_BOOTSTRAP: - // Enqueue immediate backfills for the most recent messages first. - maxMessages := historySyncConfig.Immediate.MaxEvents - initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, i, &portal.Key, nil, nil, maxMessages, maxMessages, 0) - initialBackfill.Insert() - - case waProto.HistorySync_FULL: - // Enqueue deferred backfills as configured. - for j, backfillStage := range historySyncConfig.Deferred { - var startDate *time.Time = nil - if backfillStage.StartDaysAgo > 0 { - startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo) - startDate = &startDaysAgo - } - backfill := user.bridge.DB.BackfillQuery.NewWithValues( - user.MXID, database.BackfillDeferred, j*len(nMostRecent)+i, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay) - backfill.Insert() - } - } + // Enqueue immediate backfills for the most recent messages first. + user.EnqueueImmedateBackfill(portal, i) + // Enqueue deferred backfills as configured. + user.EnqueueDeferredBackfills(portal, len(nMostRecent), i) } // Tell the queue to check for new backfill requests. diff --git a/portal.go b/portal.go index 2765ecf..c98efda 100644 --- a/portal.go +++ b/portal.go @@ -237,7 +237,7 @@ func (portal *Portal) handleMessageLoopItem(msg PortalMessage) { return } portal.log.Debugln("Creating Matrix room from incoming message") - err := portal.CreateMatrixRoom(msg.source, nil, false) + err := portal.CreateMatrixRoom(msg.source, nil, false, true) if err != nil { portal.log.Errorln("Failed to create portal room:", err) return @@ -1163,7 +1163,7 @@ func (portal *Portal) UpdateBridgeInfo() { } } -func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, isFullInfo bool) error { +func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, isFullInfo, backfill bool) error { portal.roomCreateLock.Lock() defer portal.roomCreateLock.Unlock() if len(portal.MXID) > 0 { @@ -1336,6 +1336,12 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i portal.FirstEventID = firstEventResp.EventID portal.Update() } + + if user.bridge.Config.Bridge.HistorySync.Backfill && backfill { + user.EnqueueImmedateBackfill(portal, 0) + user.EnqueueDeferredBackfills(portal, 1, 0) + user.BackfillQueue.ReCheckQueue <- true + } return nil } diff --git a/provisioning.go b/provisioning.go index 8dad45e..f149240 100644 --- a/provisioning.go +++ b/provisioning.go @@ -346,7 +346,7 @@ func (prov *ProvisioningAPI) OpenGroup(w http.ResponseWriter, r *http.Request) { portal := user.GetPortalByJID(info.JID) status := http.StatusOK if len(portal.MXID) == 0 { - err = portal.CreateMatrixRoom(user, info, true) + err = portal.CreateMatrixRoom(user, info, true, true) if err != nil { jsonResponse(w, http.StatusInternalServerError, Error{ Error: fmt.Sprintf("Failed to create portal: %v", err), diff --git a/user.go b/user.go index dde6b7c..db2575d 100644 --- a/user.go +++ b/user.go @@ -937,7 +937,7 @@ func (user *User) ResyncGroups(createPortals bool) error { portal := user.GetPortalByJID(group.JID) if len(portal.MXID) == 0 { if createPortals { - err = portal.CreateMatrixRoom(user, group, true) + err = portal.CreateMatrixRoom(user, group, true, true) if err != nil { return fmt.Errorf("failed to create room for %s: %w", group.JID, err) } @@ -1020,7 +1020,7 @@ func (user *User) markSelfReadFull(portal *Portal) { func (user *User) handleGroupCreate(evt *events.JoinedGroup) { portal := user.GetPortalByJID(evt.JID) if len(portal.MXID) == 0 { - err := portal.CreateMatrixRoom(user, &evt.GroupInfo, true) + err := portal.CreateMatrixRoom(user, &evt.GroupInfo, true, true) if err != nil { user.log.Errorln("Failed to create Matrix room after join notification: %v", err) } @@ -1088,7 +1088,7 @@ func (user *User) StartPM(jid types.JID, reason string) (*Portal, *Puppet, bool, return portal, puppet, false, nil } } - err := portal.CreateMatrixRoom(user, nil, false) + err := portal.CreateMatrixRoom(user, nil, false, true) return portal, puppet, true, err }