From 12a23e2ca568764b6db538f008149a6af2266709 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 24 Mar 2022 14:21:23 -0600 Subject: [PATCH] historysync: refactor to utilize backfill queue Also sends the `m.room.marker` event when a backfill stage is complete. --- backfillqueue.go | 69 +++++++ database/historysync.go | 5 +- historysync.go | 435 +++++++++++++++++++++++----------------- provisioning.go | 3 + user.go | 9 +- 5 files changed, 325 insertions(+), 196 deletions(-) create mode 100644 backfillqueue.go diff --git a/backfillqueue.go b/backfillqueue.go new file mode 100644 index 0000000..8d0d9c5 --- /dev/null +++ b/backfillqueue.go @@ -0,0 +1,69 @@ +// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge. +// Copyright (C) 2021 Tulir Asokan, Sumner Evans +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "time" + + log "maunium.net/go/maulogger/v2" + "maunium.net/go/mautrix-whatsapp/database" +) + +type BackfillQueue struct { + BackfillQuery *database.BackfillQuery + ImmediateBackfillRequests chan *database.Backfill + DeferredBackfillRequests chan *database.Backfill + ReCheckQueue chan bool + + log log.Logger +} + +func (bq *BackfillQueue) RunLoops(user *User) { + go bq.immediateBackfillLoop(user) + bq.deferredBackfillLoop(user) +} + +func (bq *BackfillQueue) immediateBackfillLoop(user *User) { + for { + if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); backfill != nil { + bq.ImmediateBackfillRequests <- backfill + backfill.Delete() + } else { + select { + case <-bq.ReCheckQueue: + case <-time.After(10 * time.Second): + } + } + } +} + +func (bq *BackfillQueue) deferredBackfillLoop(user *User) { + for { + // Finish all immediate backfills before doing the deferred ones. + if immediate := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); immediate != nil { + time.Sleep(10 * time.Second) + continue + } + + if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil { + bq.DeferredBackfillRequests <- backfill + backfill.Delete() + } else { + time.Sleep(10 * time.Second) + } + } +} diff --git a/database/historysync.go b/database/historysync.go index 04821a2..89fefb6 100644 --- a/database/historysync.go +++ b/database/historysync.go @@ -117,7 +117,10 @@ func (hsc *HistorySyncConversation) Upsert() { DO UPDATE SET portal_jid=EXCLUDED.portal_jid, portal_receiver=EXCLUDED.portal_receiver, - last_message_timestamp=EXCLUDED.last_message_timestamp, + last_message_timestamp=CASE + WHEN EXCLUDED.last_message_timestamp > history_sync_conversation.last_message_timestamp THEN EXCLUDED.last_message_timestamp + ELSE history_sync_conversation.last_message_timestamp + END, archived=EXCLUDED.archived, pinned=EXCLUDED.pinned, mute_end_time=EXCLUDED.mute_end_time, diff --git a/historysync.go b/historysync.go index 5fece97..75abc60 100644 --- a/historysync.go +++ b/historysync.go @@ -18,8 +18,6 @@ package main import ( "fmt" - "sort" - "sync" "time" waProto "go.mau.fi/whatsmeow/binary/proto" @@ -35,12 +33,6 @@ import ( // region User history sync handling -type portalToBackfill struct { - portal *Portal - conv *waProto.Conversation - msgs []*waProto.WebMessageInfo -} - type wrappedInfo struct { *types.MessageInfo Type database.MessageType @@ -50,118 +42,223 @@ type wrappedInfo struct { ExpiresIn uint32 } -type conversationList []*waProto.Conversation - -var _ sort.Interface = (conversationList)(nil) - -func (c conversationList) Len() int { - return len(c) -} - -func (c conversationList) Less(i, j int) bool { - return getConversationTimestamp(c[i]) < getConversationTimestamp(c[j]) -} - -func (c conversationList) Swap(i, j int) { - c[i], c[j] = c[j], c[i] -} - func (user *User) handleHistorySyncsLoop() { + reCheckQueue := make(chan bool, 1) + if user.bridge.Config.Bridge.HistorySync.Backfill { + // Start the backfill queue. + user.BackfillQueue = &BackfillQueue{ + BackfillQuery: user.bridge.DB.BackfillQuery, + ImmediateBackfillRequests: make(chan *database.Backfill, 1), + DeferredBackfillRequests: make(chan *database.Backfill, 1), + ReCheckQueue: make(chan bool, 1), + log: user.log.Sub("BackfillQueue"), + } + reCheckQueue = user.BackfillQueue.ReCheckQueue + + // Immediate backfills can be done in parallel + for i := 0; i < user.bridge.Config.Bridge.HistorySync.Immediate.WorkerCount; i++ { + go user.handleBackfillRequestsLoop(user.BackfillQueue.ImmediateBackfillRequests) + } + + // Deferred backfills should be handled synchronously so as not to + // overload the homeserver. Users can configure their backfill stages + // to be more or less aggressive with backfilling at this stage. + go user.handleBackfillRequestsLoop(user.BackfillQueue.DeferredBackfillRequests) + go user.BackfillQueue.RunLoops(user) + } + + // Always save the history syncs for the user. If they want to enable + // backfilling in the future, we will have it in the database. for evt := range user.historySyncs { - go user.sendBridgeState(BridgeState{StateEvent: StateBackfilling}) - user.handleHistorySync(evt.Data) - if len(user.historySyncs) == 0 && user.IsConnected() { - go user.sendBridgeState(BridgeState{StateEvent: StateConnected}) + user.handleHistorySync(reCheckQueue, evt.Data) + } +} + +func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) { + for req := range backfillRequests { + user.log.Infof("Backfill request: %v", req) + conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal) + + // Update the client store with basic chat settings. + if conv.MuteEndTime.After(time.Now()) { + user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime) + } + if conv.Archived { + user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true) + } + if conv.Pinned > 0 { + user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true) + } + + portal := user.GetPortalByJID(conv.PortalKey.JID) + if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration { + portal.ExpirationTime = *conv.EphemeralExpiration + portal.Update() + } + + if !user.shouldCreatePortalForHistorySync(conv, portal) { + continue + } + + if len(portal.MXID) == 0 { + user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling") + err := portal.CreateMatrixRoom(user, nil, true) + if err != nil { + user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err) + continue + } + } else { + portal.UpdateMatrixRoom(user, nil) + } + + allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents) + + if len(allMsgs) > 0 { + user.log.Debugf("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents) + toBackfill := allMsgs[0:] + insertionEventIds := []id.EventID{} + for { + if len(toBackfill) == 0 { + break + } + + time.Sleep(time.Duration(req.BatchDelay) * time.Second) + + var msgs []*waProto.WebMessageInfo + if len(toBackfill) <= req.MaxBatchEvents { + msgs = toBackfill + toBackfill = toBackfill[0:0] + } else { + msgs = toBackfill[len(toBackfill)-req.MaxBatchEvents:] + toBackfill = toBackfill[:len(toBackfill)-req.MaxBatchEvents] + } + + if len(msgs) > 0 { + user.log.Debugf("Backfilling %d messages in %s", len(msgs), portal.Key.JID) + insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...) + } + } + user.log.Debugf("Finished backfilling %d messages in %s", len(allMsgs), portal.Key.JID) + if len(insertionEventIds) > 0 { + portal.sendPostBackfillDummy( + time.Unix(int64(allMsgs[len(allMsgs)-1].GetMessageTimestamp()), 0), + insertionEventIds[0]) + } + } else { + user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID) + } + if !conv.MarkedAsUnread && conv.UnreadCount == 0 { + user.markSelfReadFull(portal) } } } -func (user *User) handleHistorySync(evt *waProto.HistorySync) { +func (user *User) shouldCreatePortalForHistorySync(conv *database.HistorySyncConversation, portal *Portal) bool { + if len(portal.MXID) > 0 { + user.log.Debugfln("Portal for %s already exists, ensuring user is invited", portal.Key.JID) + portal.ensureUserInvited(user) + // Portal exists, let backfill continue + return true + } else if !user.bridge.Config.Bridge.HistorySync.CreatePortals { + user.log.Debugfln("Not creating portal for %s: creating rooms from history sync is disabled", portal.Key.JID) + } else { + // Portal doesn't exist, but should be created + return true + } + // Portal shouldn't be created, reason logged above + return false +} + +func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.HistorySync) { if evt == nil || evt.SyncType == nil || evt.GetSyncType() == waProto.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waProto.HistorySync_PUSH_NAME { return } - description := fmt.Sprintf("type %s, %d conversations, chunk order %d, progress %d%%", evt.GetSyncType(), len(evt.GetConversations()), evt.GetChunkOrder(), evt.GetProgress()) - user.log.Infoln("Handling history sync with", description) + description := fmt.Sprintf("type %s, %d conversations, chunk order %d", evt.GetSyncType(), len(evt.GetConversations()), evt.GetChunkOrder()) + user.log.Infoln("Storing history sync with", description) - conversations := conversationList(evt.GetConversations()) - // We want to handle recent conversations first - sort.Sort(sort.Reverse(conversations)) - portalsToBackfill := make(chan portalToBackfill, len(conversations)) - - var backfillWait sync.WaitGroup - backfillWait.Add(1) - go user.backfillLoop(portalsToBackfill, backfillWait.Done) - for _, conv := range conversations { - user.handleHistorySyncConversation(conv, portalsToBackfill) - } - close(portalsToBackfill) - backfillWait.Wait() - user.log.Infoln("Finished handling history sync with", description) -} - -func (user *User) backfillLoop(ch chan portalToBackfill, done func()) { - defer done() - for ptb := range ch { - if len(ptb.msgs) > 0 { - user.log.Debugln("Bridging history sync payload for", ptb.portal.Key.JID) - ptb.portal.backfill(user, ptb.msgs) - } else { - user.log.Debugfln("Not backfilling %s: no bridgeable messages found", ptb.portal.Key.JID) - } - if !ptb.conv.GetMarkedAsUnread() && ptb.conv.GetUnreadCount() == 0 { - user.markSelfReadFull(ptb.portal) - } - } -} - -func (user *User) handleHistorySyncConversation(conv *waProto.Conversation, portalsToBackfill chan portalToBackfill) { - jid, err := types.ParseJID(conv.GetId()) - if err != nil { - user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err) - return - } - - // Update the client store with basic chat settings. - muteEnd := time.Unix(int64(conv.GetMuteEndTime()), 0) - if muteEnd.After(time.Now()) { - _ = user.Client.Store.ChatSettings.PutMutedUntil(jid, muteEnd) - } - if conv.GetArchived() { - _ = user.Client.Store.ChatSettings.PutArchived(jid, true) - } - if conv.GetPinned() > 0 { - _ = user.Client.Store.ChatSettings.PutPinned(jid, true) - } - - portal := user.GetPortalByJID(jid) - if conv.EphemeralExpiration != nil && portal.ExpirationTime != conv.GetEphemeralExpiration() { - portal.ExpirationTime = conv.GetEphemeralExpiration() - portal.Update() - } - // Check if portal is too old or doesn't contain anything we can bridge. - if !user.shouldCreatePortalForHistorySync(conv, portal) { - return - } - - var msgs []*waProto.WebMessageInfo - if user.bridge.Config.Bridge.HistorySync.Backfill { - msgs = filterMessagesToBackfill(conv.GetMessages()) - } - ptb := portalToBackfill{portal: portal, conv: conv, msgs: msgs} - if len(portal.MXID) == 0 { - user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling") - err = portal.CreateMatrixRoom(user, getPartialInfoFromConversation(jid, conv), false) + for _, conv := range evt.GetConversations() { + jid, err := types.ParseJID(conv.GetId()) if err != nil { - user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err) - return + user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err) + continue + } + portal := user.GetPortalByJID(jid) + + historySyncConversation := user.bridge.DB.HistorySyncQuery.NewConversationWithValues( + user.MXID, + conv.GetId(), + &portal.Key, + getConversationTimestamp(conv), + conv.GetMuteEndTime(), + conv.GetArchived(), + conv.GetPinned(), + conv.GetDisappearingMode().GetInitiator(), + conv.GetEndOfHistoryTransferType(), + conv.EphemeralExpiration, + conv.GetMarkedAsUnread(), + conv.GetUnreadCount()) + historySyncConversation.Upsert() + + for _, msg := range conv.GetMessages() { + // Don't store messages that will just be skipped. + wmi := msg.GetMessage() + msgType := getMessageType(wmi.GetMessage()) + if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" { + continue + } + + // Don't store unsupported messages. + if !containsSupportedMessage(msg.GetMessage().GetMessage()) { + continue + } + + message, err := user.bridge.DB.HistorySyncQuery.NewMessageWithValues(user.MXID, conv.GetId(), msg) + if err != nil { + user.log.Warnf("Failed to save message %s in %s. Error: %+v", msg.Message.Key.Id, conv.GetId(), err) + continue + } + message.Insert() } - } else { - portal.UpdateMatrixRoom(user, nil) } - if !user.bridge.Config.Bridge.HistorySync.Backfill { - user.log.Debugln("Backfill is disabled, not bridging history sync payload for", portal.Key.JID) - } else { - portalsToBackfill <- ptb + + // If this was the initial bootstrap, enqueue immediate backfills for the + // most recent portals. If it's the last history sync event, start + // backfilling the rest of the history of the portals. + historySyncConfig := user.bridge.Config.Bridge.HistorySync + if historySyncConfig.Backfill && (evt.GetSyncType() == waProto.HistorySync_FULL || evt.GetSyncType() == waProto.HistorySync_INITIAL_BOOTSTRAP) { + nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations) + for i, conv := range nMostRecent { + jid, err := types.ParseJID(conv.ConversationID) + if err != nil { + user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.ConversationID, err) + continue + } + portal := user.GetPortalByJID(jid) + + switch evt.GetSyncType() { + case waProto.HistorySync_INITIAL_BOOTSTRAP: + // Enqueue immediate backfills for the most recent messages first. + maxMessages := historySyncConfig.Immediate.MaxEvents + initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, i, &portal.Key, nil, nil, maxMessages, maxMessages, 0) + initialBackfill.Insert() + + case waProto.HistorySync_FULL: + // Enqueue deferred backfills as configured. + for j, backfillStage := range historySyncConfig.Deferred { + var startDate *time.Time = nil + if backfillStage.StartDaysAgo > 0 { + startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo) + startDate = &startDaysAgo + } + backfill := user.bridge.DB.BackfillQuery.NewWithValues( + user.MXID, database.BackfillDeferred, j*len(nMostRecent)+i, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay) + backfill.Insert() + } + } + } + + // Tell the queue to check for new backfill requests. + reCheckQueue <- true } } @@ -173,71 +270,22 @@ func getConversationTimestamp(conv *waProto.Conversation) uint64 { return convTs } -func (user *User) shouldCreatePortalForHistorySync(conv *waProto.Conversation, portal *Portal) bool { - maxAge := user.bridge.Config.Bridge.HistorySync.MaxAge - minLastMsgToCreate := time.Now().Add(-time.Duration(maxAge) * time.Second) - lastMsg := time.Unix(int64(getConversationTimestamp(conv)), 0) - - if len(portal.MXID) > 0 { - user.log.Debugfln("Portal for %s already exists, ensuring user is invited", portal.Key.JID) - portal.ensureUserInvited(user) - // Portal exists, let backfill continue - return true - } else if !user.bridge.Config.Bridge.HistorySync.CreatePortals { - user.log.Debugfln("Not creating portal for %s: creating rooms from history sync is disabled", portal.Key.JID) - } else if !containsSupportedMessages(conv) { - user.log.Debugfln("Not creating portal for %s: no interesting messages found", portal.Key.JID) - } else if maxAge > 0 && !lastMsg.After(minLastMsgToCreate) { - user.log.Debugfln("Not creating portal for %s: last message older than limit (%s)", portal.Key.JID, lastMsg) - } else { - // Portal doesn't exist, but should be created - return true - } - // Portal shouldn't be created, reason logged above - return false +func (user *User) EnqueueImmedateBackfill(portal *Portal, priority int) { + maxMessages := user.bridge.Config.Bridge.HistorySync.Immediate.MaxEvents + initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, nil, maxMessages, maxMessages, 0) + initialBackfill.Insert() } -func filterMessagesToBackfill(messages []*waProto.HistorySyncMsg) []*waProto.WebMessageInfo { - filtered := make([]*waProto.WebMessageInfo, 0, len(messages)) - for _, msg := range messages { - wmi := msg.GetMessage() - msgType := getMessageType(wmi.GetMessage()) - if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" { - continue - } else { - filtered = append(filtered, wmi) +func (user *User) EnqueueDeferredBackfills(portal *Portal, numConversations, priority int) { + for j, backfillStage := range user.bridge.Config.Bridge.HistorySync.Deferred { + var startDate *time.Time = nil + if backfillStage.StartDaysAgo > 0 { + startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo) + startDate = &startDaysAgo } - } - return filtered -} - -func containsSupportedMessages(conv *waProto.Conversation) bool { - for _, msg := range conv.GetMessages() { - if containsSupportedMessage(msg.GetMessage().GetMessage()) { - return true - } - } - return false -} - -func getPartialInfoFromConversation(jid types.JID, conv *waProto.Conversation) *types.GroupInfo { - // TODO broadcast list info? - if jid.Server != types.GroupServer { - return nil - } - participants := make([]types.GroupParticipant, len(conv.GetParticipant())) - for i, pcp := range conv.GetParticipant() { - participantJID, _ := types.ParseJID(pcp.GetUserJid()) - participants[i] = types.GroupParticipant{ - JID: participantJID, - IsAdmin: pcp.GetRank() == waProto.GroupParticipant_ADMIN, - IsSuperAdmin: pcp.GetRank() == waProto.GroupParticipant_SUPERADMIN, - } - } - return &types.GroupInfo{ - JID: jid, - GroupName: types.GroupName{Name: conv.GetName()}, - Participants: participants, + backfill := user.bridge.DB.BackfillQuery.NewWithValues( + user.MXID, database.BackfillDeferred, j*numConversations+priority, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay) + backfill.Insert() } } @@ -246,11 +294,15 @@ func getPartialInfoFromConversation(jid types.JID, conv *waProto.Conversation) * var ( PortalCreationDummyEvent = event.Type{Type: "fi.mau.dummy.portal_created", Class: event.MessageEventType} - BackfillEndDummyEvent = event.Type{Type: "fi.mau.dummy.backfill_end", Class: event.MessageEventType} PreBackfillDummyEvent = event.Type{Type: "fi.mau.dummy.pre_backfill", Class: event.MessageEventType} + + // Marker events for when a backfill finishes + BackfillEndDummyEvent = event.Type{Type: "fi.mau.dummy.backfill_end", Class: event.MessageEventType} + RoomMarker = event.Type{Type: "m.room.marker", Class: event.MessageEventType} + MSC2716Marker = event.Type{Type: "org.matrix.msc2716.marker", Class: event.MessageEventType} ) -func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) { +func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) []id.EventID { portal.backfillLock.Lock() defer portal.backfillLock.Unlock() @@ -375,32 +427,33 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) } } + insertionEventIds := []id.EventID{} + if len(historyBatch.Events) > 0 && len(historyBatch.PrevEventID) > 0 { portal.log.Infofln("Sending %d historical messages...", len(historyBatch.Events)) historyResp, err := portal.MainIntent().BatchSend(portal.MXID, &historyBatch) + insertionEventIds = append(insertionEventIds, historyResp.BaseInsertionEventID) if err != nil { portal.log.Errorln("Error sending batch of historical messages:", err) } else { portal.finishBatch(historyResp.EventIDs, historyBatchInfos) portal.NextBatchID = historyResp.NextBatchID portal.Update() - // If batchID is non-empty, it means this is backfilling very old messages, and we don't need a post-backfill dummy. - if historyBatch.BatchID == "" { - portal.sendPostBackfillDummy(time.UnixMilli(historyBatch.Events[len(historyBatch.Events)-1].Timestamp)) - } } } if len(newBatch.Events) > 0 && len(newBatch.PrevEventID) > 0 { portal.log.Infofln("Sending %d new messages...", len(newBatch.Events)) newResp, err := portal.MainIntent().BatchSend(portal.MXID, &newBatch) + insertionEventIds = append(insertionEventIds, newResp.BaseInsertionEventID) if err != nil { portal.log.Errorln("Error sending batch of new messages:", err) } else { portal.finishBatch(newResp.EventIDs, newBatchInfos) - portal.sendPostBackfillDummy(time.UnixMilli(newBatch.Events[len(newBatch.Events)-1].Timestamp)) } } + + return insertionEventIds } func (portal *Portal) parseWebMessageInfo(source *User, webMsg *waProto.WebMessageInfo) *types.MessageInfo { @@ -530,19 +583,25 @@ func (portal *Portal) finishBatchEvt(info *wrappedInfo, eventID id.EventID) { } } -func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time) { - resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, BackfillEndDummyEvent, struct{}{}) - if err != nil { - portal.log.Errorln("Error sending post-backfill dummy event:", err) - return +func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time, insertionEventId id.EventID) { + for _, evtType := range []event.Type{BackfillEndDummyEvent, RoomMarker, MSC2716Marker} { + resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, evtType, map[string]interface{}{ + "org.matrix.msc2716.marker.insertion": insertionEventId, + "m.marker.insertion": insertionEventId, + }) + if err != nil { + portal.log.Errorln("Error sending post-backfill dummy event:", err) + return + } + msg := portal.bridge.DB.Message.New() + msg.Chat = portal.Key + msg.MXID = resp.EventID + msg.JID = types.MessageID(resp.EventID) + msg.Timestamp = lastTimestamp.Add(1 * time.Second) + msg.Sent = true + msg.Insert() + } - msg := portal.bridge.DB.Message.New() - msg.Chat = portal.Key - msg.MXID = resp.EventID - msg.JID = types.MessageID(resp.EventID) - msg.Timestamp = lastTimestamp.Add(1 * time.Second) - msg.Sent = true - msg.Insert() } // endregion diff --git a/provisioning.go b/provisioning.go index 622f4c4..8dad45e 100644 --- a/provisioning.go +++ b/provisioning.go @@ -436,6 +436,9 @@ func (prov *ProvisioningAPI) Logout(w http.ResponseWriter, r *http.Request) { user.bridge.Metrics.TrackConnectionState(user.JID, false) user.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut}) user.DeleteSession() + prov.bridge.DB.BackfillQuery.DeleteAll(user.MXID) + prov.bridge.DB.HistorySyncQuery.DeleteAllConversations(user.MXID) + prov.bridge.DB.HistorySyncQuery.DeleteAllMessages(user.MXID) jsonResponse(w, http.StatusOK, Response{true, "Logged out successfully."}) } diff --git a/user.go b/user.go index 7b08a9a..dde6b7c 100644 --- a/user.go +++ b/user.go @@ -74,6 +74,8 @@ type User struct { groupListCache []*types.GroupInfo groupListCacheLock sync.Mutex groupListCacheTime time.Time + + BackfillQueue *BackfillQueue } func (bridge *Bridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User { @@ -561,18 +563,11 @@ func (user *User) HandleEvent(event interface{}) { go user.tryAutomaticDoublePuppeting() case *events.OfflineSyncPreview: user.log.Infofln("Server says it's going to send %d messages and %d receipts that were missed during downtime", v.Messages, v.Receipts) - go user.sendBridgeState(BridgeState{ - StateEvent: StateBackfilling, - Message: fmt.Sprintf("backfilling %d messages and %d receipts", v.Messages, v.Receipts), - }) case *events.OfflineSyncCompleted: if !user.PhoneRecentlySeen(true) { user.log.Infofln("Offline sync completed, but phone last seen date is still %s - sending phone offline bridge status", user.PhoneLastSeen) go user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WAPhoneOffline}) } else { - if user.GetPrevBridgeState().StateEvent == StateBackfilling { - user.log.Infoln("Offline sync completed") - } go user.sendBridgeState(BridgeState{StateEvent: StateConnected}) } case *events.AppStateSyncComplete: