From 1ecdb71ac35de0d27cdb46587088d7686834c9b8 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 24 Mar 2022 14:14:08 -0600 Subject: [PATCH 01/16] database: add backfill queue and history sync tables --- database/backfillqueue.go | 149 +++++++++ database/database.go | 10 + database/historysync.go | 293 ++++++++++++++++++ .../2022-03-15-prioritized-backfill.go | 54 ++++ .../upgrades/2022-03-18-historysync-store.go | 93 ++++++ database/upgrades/upgrades.go | 2 +- 6 files changed, 600 insertions(+), 1 deletion(-) create mode 100644 database/backfillqueue.go create mode 100644 database/historysync.go create mode 100644 database/upgrades/2022-03-15-prioritized-backfill.go create mode 100644 database/upgrades/2022-03-18-historysync-store.go diff --git a/database/backfillqueue.go b/database/backfillqueue.go new file mode 100644 index 0000000..7f012e3 --- /dev/null +++ b/database/backfillqueue.go @@ -0,0 +1,149 @@ +// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge. +// Copyright (C) 2021 Tulir Asokan, Sumner Evans +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package database + +import ( + "database/sql" + "errors" + "time" + + log "maunium.net/go/maulogger/v2" + "maunium.net/go/mautrix/id" +) + +type BackfillType int + +const ( + BackfillImmediate BackfillType = 0 + BackfillDeferred = 1 +) + +type BackfillQuery struct { + db *Database + log log.Logger +} + +func (bq *BackfillQuery) New() *Backfill { + return &Backfill{ + db: bq.db, + log: bq.log, + Portal: &PortalKey{}, + } +} + +func (bq *BackfillQuery) NewWithValues(userID id.UserID, backfillType BackfillType, priority int, portal *PortalKey, timeStart *time.Time, timeEnd *time.Time, maxBatchEvents, maxTotalEvents, batchDelay int) *Backfill { + return &Backfill{ + db: bq.db, + log: bq.log, + UserID: userID, + BackfillType: backfillType, + Priority: priority, + Portal: portal, + TimeStart: timeStart, + TimeEnd: timeEnd, + MaxBatchEvents: maxBatchEvents, + MaxTotalEvents: maxTotalEvents, + BatchDelay: batchDelay, + } +} + +const ( + getNextBackfillQuery = ` + SELECT queue_id, user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay + FROM backfill_queue + WHERE user_mxid=$1 + AND type=$2 + ORDER BY priority, queue_id + LIMIT 1 + ` +) + +/// Returns the next backfill to perform +func (bq *BackfillQuery) GetNext(userID id.UserID, backfillType BackfillType) (backfill *Backfill) { + rows, err := bq.db.Query(getNextBackfillQuery, userID, backfillType) + defer rows.Close() + if err != nil || rows == nil { + bq.log.Error(err) + return + } + if rows.Next() { + backfill = bq.New().Scan(rows) + } + return +} + +func (bq *BackfillQuery) DeleteAll(userID id.UserID) error { + _, err := bq.db.Exec("DELETE FROM backfill_queue WHERE user_mxid=$1", userID) + return err +} + +type Backfill struct { + db *Database + log log.Logger + + // Fields + QueueID int + UserID id.UserID + 
BackfillType BackfillType + Priority int + Portal *PortalKey + TimeStart *time.Time + TimeEnd *time.Time + MaxBatchEvents int + MaxTotalEvents int + BatchDelay int +} + +func (b *Backfill) Scan(row Scannable) *Backfill { + err := row.Scan(&b.QueueID, &b.UserID, &b.BackfillType, &b.Priority, &b.Portal.JID, &b.Portal.Receiver, &b.TimeStart, &b.TimeEnd, &b.MaxBatchEvents, &b.MaxTotalEvents, &b.BatchDelay) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + b.log.Errorln("Database scan failed:", err) + } + return nil + } + return b +} + +func (b *Backfill) Insert() { + rows, err := b.db.Query(` + INSERT INTO backfill_queue + (user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + RETURNING queue_id + `, b.UserID, b.BackfillType, b.Priority, b.Portal.JID, b.Portal.Receiver, b.TimeStart, b.TimeEnd, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay) + defer rows.Close() + if err != nil || !rows.Next() { + b.log.Warnfln("Failed to insert %v/%s with priority %d: %v", b.BackfillType, b.Portal.JID, b.Priority, err) + return + } + err = rows.Scan(&b.QueueID) + if err != nil { + b.log.Warnfln("Failed to insert %s/%s with priority %s: %v", b.BackfillType, b.Portal.JID, b.Priority, err) + } +} + +func (b *Backfill) Delete() { + if b.QueueID == 0 { + b.log.Errorf("Cannot delete backfill without queue_id. Maybe it wasn't actually inserted in the database?") + return + } + _, err := b.db.Exec("DELETE FROM backfill_queue WHERE queue_id=$1", b.QueueID) + if err != nil { + b.log.Warnfln("Failed to delete %s/%s: %v", b.BackfillType, b.Priority, err) + } +} diff --git a/database/database.go b/database/database.go index 7640871..cb64a12 100644 --- a/database/database.go +++ b/database/database.go @@ -46,6 +46,8 @@ type Database struct { Reaction *ReactionQuery DisappearingMessage *DisappearingMessageQuery + BackfillQuery *BackfillQuery + HistorySyncQuery *HistorySyncQuery } func New(cfg config.DatabaseConfig, baseLog log.Logger) (*Database, error) { @@ -83,6 +85,14 @@ func New(cfg config.DatabaseConfig, baseLog log.Logger) (*Database, error) { db: db, log: db.log.Sub("DisappearingMessage"), } + db.BackfillQuery = &BackfillQuery{ + db: db, + log: db.log.Sub("Backfill"), + } + db.HistorySyncQuery = &HistorySyncQuery{ + db: db, + log: db.log.Sub("HistorySync"), + } db.SetMaxOpenConns(cfg.MaxOpenConns) db.SetMaxIdleConns(cfg.MaxIdleConns) diff --git a/database/historysync.go b/database/historysync.go new file mode 100644 index 0000000..04821a2 --- /dev/null +++ b/database/historysync.go @@ -0,0 +1,293 @@ +// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge. +// Copyright (C) 2022 Tulir Asokan, Sumner Evans +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +package database + +import ( + "database/sql" + "errors" + "fmt" + "time" + + waProto "go.mau.fi/whatsmeow/binary/proto" + "google.golang.org/protobuf/proto" + + _ "github.com/mattn/go-sqlite3" + log "maunium.net/go/maulogger/v2" + "maunium.net/go/mautrix/id" +) + +type HistorySyncQuery struct { + db *Database + log log.Logger +} + +type HistorySyncConversation struct { + db *Database + log log.Logger + + UserID id.UserID + ConversationID string + PortalKey *PortalKey + LastMessageTimestamp time.Time + MuteEndTime time.Time + Archived bool + Pinned uint32 + DisappearingMode waProto.DisappearingMode_DisappearingModeInitiator + EndOfHistoryTransferType waProto.Conversation_ConversationEndOfHistoryTransferType + EphemeralExpiration *uint32 + MarkedAsUnread bool + UnreadCount uint32 +} + +func (hsq *HistorySyncQuery) NewConversation() *HistorySyncConversation { + return &HistorySyncConversation{ + db: hsq.db, + log: hsq.log, + PortalKey: &PortalKey{}, + } +} + +func (hsq *HistorySyncQuery) NewConversationWithValues( + userID id.UserID, + conversationID string, + portalKey *PortalKey, + lastMessageTimestamp, + muteEndTime uint64, + archived bool, + pinned uint32, + disappearingMode waProto.DisappearingMode_DisappearingModeInitiator, + endOfHistoryTransferType waProto.Conversation_ConversationEndOfHistoryTransferType, + ephemeralExpiration *uint32, + markedAsUnread bool, + unreadCount uint32) *HistorySyncConversation { + return &HistorySyncConversation{ + db: hsq.db, + log: hsq.log, + UserID: userID, + ConversationID: conversationID, + PortalKey: portalKey, + LastMessageTimestamp: time.Unix(int64(lastMessageTimestamp), 0), + MuteEndTime: time.Unix(int64(muteEndTime), 0), + Archived: archived, + Pinned: pinned, + DisappearingMode: disappearingMode, + EndOfHistoryTransferType: endOfHistoryTransferType, + EphemeralExpiration: ephemeralExpiration, + MarkedAsUnread: markedAsUnread, + UnreadCount: unreadCount, + } +} + +const ( + getNMostRecentConversations = ` + SELECT user_mxid, conversation_id, portal_jid, portal_receiver, last_message_timestamp, archived, pinned, mute_end_time, disappearing_mode, end_of_history_transfer_type, ephemeral_expiration, marked_as_unread, unread_count + FROM history_sync_conversation + WHERE user_mxid=$1 + ORDER BY last_message_timestamp DESC + LIMIT $2 + ` + getConversationByPortal = ` + SELECT user_mxid, conversation_id, portal_jid, portal_receiver, last_message_timestamp, archived, pinned, mute_end_time, disappearing_mode, end_of_history_transfer_type, ephemeral_expiration, marked_as_unread, unread_count + FROM history_sync_conversation + WHERE user_mxid=$1 + AND portal_jid=$2 + AND portal_receiver=$3 + ` +) + +func (hsc *HistorySyncConversation) Upsert() { + _, err := hsc.db.Exec(` + INSERT INTO history_sync_conversation (user_mxid, conversation_id, portal_jid, portal_receiver, last_message_timestamp, archived, pinned, mute_end_time, disappearing_mode, end_of_history_transfer_type, ephemeral_expiration, marked_as_unread, unread_count) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) + ON CONFLICT (user_mxid, conversation_id) + DO UPDATE SET + portal_jid=EXCLUDED.portal_jid, + portal_receiver=EXCLUDED.portal_receiver, + last_message_timestamp=EXCLUDED.last_message_timestamp, + archived=EXCLUDED.archived, + pinned=EXCLUDED.pinned, + mute_end_time=EXCLUDED.mute_end_time, + disappearing_mode=EXCLUDED.disappearing_mode, + end_of_history_transfer_type=EXCLUDED.end_of_history_transfer_type, + ephemeral_expiration=EXCLUDED.ephemeral_expiration, + 
marked_as_unread=EXCLUDED.marked_as_unread, + unread_count=EXCLUDED.unread_count + `, + hsc.UserID, + hsc.ConversationID, + hsc.PortalKey.JID.String(), + hsc.PortalKey.Receiver.String(), + hsc.LastMessageTimestamp, + hsc.Archived, + hsc.Pinned, + hsc.MuteEndTime, + hsc.DisappearingMode, + hsc.EndOfHistoryTransferType, + hsc.EphemeralExpiration, + hsc.MarkedAsUnread, + hsc.UnreadCount) + if err != nil { + hsc.log.Warnfln("Failed to insert history sync conversation %s/%s: %v", hsc.UserID, hsc.ConversationID, err) + } +} + +func (hsc *HistorySyncConversation) Scan(row Scannable) *HistorySyncConversation { + err := row.Scan( + &hsc.UserID, + &hsc.ConversationID, + &hsc.PortalKey.JID, + &hsc.PortalKey.Receiver, + &hsc.LastMessageTimestamp, + &hsc.Archived, + &hsc.Pinned, + &hsc.MuteEndTime, + &hsc.DisappearingMode, + &hsc.EndOfHistoryTransferType, + &hsc.EphemeralExpiration, + &hsc.MarkedAsUnread, + &hsc.UnreadCount) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + hsc.log.Errorln("Database scan failed:", err) + } + return nil + } + return hsc +} + +func (hsq *HistorySyncQuery) GetNMostRecentConversations(userID id.UserID, n int) (conversations []*HistorySyncConversation) { + rows, err := hsq.db.Query(getNMostRecentConversations, userID, n) + defer rows.Close() + if err != nil || rows == nil { + return nil + } + for rows.Next() { + conversations = append(conversations, hsq.NewConversation().Scan(rows)) + } + return +} + +func (hsq *HistorySyncQuery) GetConversation(userID id.UserID, portalKey *PortalKey) (conversation *HistorySyncConversation) { + rows, err := hsq.db.Query(getConversationByPortal, userID, portalKey.JID, portalKey.Receiver) + defer rows.Close() + if err != nil || rows == nil { + return nil + } + if rows.Next() { + conversation = hsq.NewConversation().Scan(rows) + } + return +} + +func (hsq *HistorySyncQuery) DeleteAllConversations(userID id.UserID) error { + _, err := hsq.db.Exec("DELETE FROM history_sync_conversation WHERE user_mxid=$1", userID) + return err +} + +const ( + getMessagesBetween = ` + SELECT data + FROM history_sync_message + WHERE user_mxid=$1 + AND conversation_id=$2 + %s + ORDER BY timestamp DESC + %s + ` +) + +type HistorySyncMessage struct { + db *Database + log log.Logger + + UserID id.UserID + ConversationID string + Timestamp time.Time + Data []byte +} + +func (hsq *HistorySyncQuery) NewMessageWithValues(userID id.UserID, conversationID string, message *waProto.HistorySyncMsg) (*HistorySyncMessage, error) { + msgData, err := proto.Marshal(message) + if err != nil { + return nil, err + } + return &HistorySyncMessage{ + db: hsq.db, + log: hsq.log, + UserID: userID, + ConversationID: conversationID, + Timestamp: time.Unix(int64(message.Message.GetMessageTimestamp()), 0), + Data: msgData, + }, nil +} + +func (hsm *HistorySyncMessage) Insert() { + _, err := hsm.db.Exec(` + INSERT INTO history_sync_message (user_mxid, conversation_id, timestamp, data) + VALUES ($1, $2, $3, $4) + `, hsm.UserID, hsm.ConversationID, hsm.Timestamp, hsm.Data) + if err != nil { + hsm.log.Warnfln("Failed to insert history sync message %s/%s: %v", hsm.ConversationID, hsm.Timestamp, err) + } +} + +func (hsq *HistorySyncQuery) GetMessagesBetween(userID id.UserID, conversationID string, startTime, endTime *time.Time, limit int) (messages []*waProto.WebMessageInfo) { + whereClauses := "" + args := []interface{}{userID, conversationID} + argNum := 3 + if startTime != nil { + whereClauses += fmt.Sprintf(" AND timestamp >= $%d", argNum) + args = append(args, startTime) + argNum++ + } 
+ if endTime != nil { + whereClauses += fmt.Sprintf(" AND timestamp <= $%d", argNum) + args = append(args, endTime) + } + + limitClause := "" + if limit > 0 { + limitClause = fmt.Sprintf("LIMIT %d", limit) + } + + rows, err := hsq.db.Query(fmt.Sprintf(getMessagesBetween, whereClauses, limitClause), args...) + defer rows.Close() + if err != nil || rows == nil { + return nil + } + var msgData []byte + for rows.Next() { + err := rows.Scan(&msgData) + if err != nil { + hsq.log.Error("Database scan failed: %v", err) + continue + } + var historySyncMsg waProto.HistorySyncMsg + err = proto.Unmarshal(msgData, &historySyncMsg) + if err != nil { + hsq.log.Errorf("Failed to unmarshal history sync message: %v", err) + continue + } + messages = append(messages, historySyncMsg.Message) + } + return +} + +func (hsq *HistorySyncQuery) DeleteAllMessages(userID id.UserID) error { + _, err := hsq.db.Exec("DELETE FROM history_sync_message WHERE user_mxid=$1", userID) + return err +} diff --git a/database/upgrades/2022-03-15-prioritized-backfill.go b/database/upgrades/2022-03-15-prioritized-backfill.go new file mode 100644 index 0000000..96fc9cd --- /dev/null +++ b/database/upgrades/2022-03-15-prioritized-backfill.go @@ -0,0 +1,54 @@ +package upgrades + +import "database/sql" + +func init() { + upgrades[39] = upgrade{"Add backfill queue", func(tx *sql.Tx, ctx context) error { + _, err := tx.Exec(` + CREATE TABLE backfill_queue ( + queue_id INTEGER PRIMARY KEY, + user_mxid TEXT, + type INTEGER NOT NULL, + priority INTEGER NOT NULL, + portal_jid VARCHAR(255), + portal_receiver VARCHAR(255), + time_start TIMESTAMP, + time_end TIMESTAMP, + max_batch_events INTEGER NOT NULL, + max_total_events INTEGER, + batch_delay INTEGER, + + FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE + ) + `) + if err != nil { + return err + } + + // The queue_id needs to auto-increment every insertion. For SQLite, + // INTEGER PRIMARY KEY is an alias for the ROWID, so it will + // auto-increment. See https://sqlite.org/lang_createtable.html#rowid + // For Postgres, we have to manually add the sequence. 
+ if ctx.dialect == Postgres { + _, err = tx.Exec(` + CREATE SEQUENCE backfill_queue_queue_id_seq + START WITH 1 + OWNED BY backfill_queue.queue_id + `) + if err != nil { + return err + } + _, err = tx.Exec(` + ALTER TABLE backfill_queue + ALTER COLUMN queue_id + SET DEFAULT nextval('backfill_queue_queue_id_seq'::regclass) + `) + if err != nil { + return err + } + } + + return err + }} +} diff --git a/database/upgrades/2022-03-18-historysync-store.go b/database/upgrades/2022-03-18-historysync-store.go new file mode 100644 index 0000000..bf53ab1 --- /dev/null +++ b/database/upgrades/2022-03-18-historysync-store.go @@ -0,0 +1,93 @@ +package upgrades + +import ( + "database/sql" +) + +func init() { + upgrades[40] = upgrade{"Store history syncs for later backfills", func(tx *sql.Tx, ctx context) error { + if ctx.dialect == Postgres { + _, err := tx.Exec(` + CREATE TABLE history_sync_conversation ( + user_mxid TEXT, + conversation_id TEXT, + portal_jid TEXT, + portal_receiver TEXT, + last_message_timestamp TIMESTAMP, + archived BOOLEAN, + pinned INTEGER, + mute_end_time TIMESTAMP, + disappearing_mode INTEGER, + end_of_history_transfer_type INTEGER, + ephemeral_expiration INTEGER, + marked_as_unread BOOLEAN, + unread_count INTEGER, + + PRIMARY KEY (user_mxid, conversation_id), + UNIQUE (conversation_id), + FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ON UPDATE CASCADE + ) + `) + if err != nil { + return err + } + _, err = tx.Exec(` + CREATE TABLE history_sync_message ( + user_mxid TEXT, + conversation_id TEXT, + timestamp TIMESTAMP, + data BYTEA, + + FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (conversation_id) REFERENCES history_sync_conversation(conversation_id) ON DELETE CASCADE + ) + `) + if err != nil { + return err + } + } else if ctx.dialect == SQLite { + _, err := tx.Exec(` + CREATE TABLE history_sync_conversation ( + user_mxid TEXT, + conversation_id TEXT, + portal_jid TEXT, + portal_receiver TEXT, + last_message_timestamp DATETIME, + archived INTEGER, + pinned INTEGER, + mute_end_time DATETIME, + disappearing_mode INTEGER, + end_of_history_transfer_type INTEGER, + ephemeral_expiration INTEGER, + marked_as_unread INTEGER, + unread_count INTEGER, + + PRIMARY KEY (user_mxid, conversation_id), + UNIQUE (conversation_id), + FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ON UPDATE CASCADE + ) + `) + if err != nil { + return err + } + _, err = tx.Exec(` + CREATE TABLE history_sync_message ( + user_mxid TEXT, + conversation_id TEXT, + timestamp DATETIME, + data BLOB, + + FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (conversation_id) REFERENCES history_sync_conversation(conversation_id) ON DELETE CASCADE + ) + `) + if err != nil { + return err + } + } + + return nil + }} +} diff --git a/database/upgrades/upgrades.go b/database/upgrades/upgrades.go index f4b1ded..98044a4 100644 --- a/database/upgrades/upgrades.go +++ b/database/upgrades/upgrades.go @@ -40,7 +40,7 @@ type upgrade struct { fn upgradeFunc } -const NumberOfUpgrades = 39 +const NumberOfUpgrades = 41 var upgrades [NumberOfUpgrades]upgrade From 536d340f20c8cfc50d2a73660ef9a3b28257c9bd Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 24 Mar 2022 14:18:32 -0600 
Subject: [PATCH 02/16] config: add settings for prioritized backfill bridge.history_sync.max_initial_conversations: This setting determines the maximum number of initial conversations that should be backfilled. The data for all the other conversations will be stored in the database for backfill at a later time. bridge.history_sync.immediate: These settings are for the initial backfill that should be performed to populate each of the initial chats with a few messages so that users can continue their conversations without losing context. bridge.history_sync.deferred: These settings are for backfilling the rest of the chat history that was not covered by the immediate backfills. These should generally be done at a slower pace to avoid overloading the homeserver. --- config/bridge.go | 23 +++++++++++++++----- config/upgrade.go | 5 ++++- example-config.yaml | 52 ++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 69 insertions(+), 11 deletions(-) diff --git a/config/bridge.go b/config/bridge.go index 7ffe458..840472d 100644 --- a/config/bridge.go +++ b/config/bridge.go @@ -28,6 +28,12 @@ import ( "maunium.net/go/mautrix/id" ) +type DeferredConfig struct { + StartDaysAgo int `yaml:"start_days_ago"` + MaxBatchEvents int `yaml:"max_batch_events"` + BatchDelay int `yaml:"batch_delay"` +} + type BridgeConfig struct { UsernameTemplate string `yaml:"username_template"` DisplaynameTemplate string `yaml:"displayname_template"` @@ -40,11 +46,18 @@ type BridgeConfig struct { IdentityChangeNotices bool `yaml:"identity_change_notices"` HistorySync struct { - CreatePortals bool `yaml:"create_portals"` - MaxAge int64 `yaml:"max_age"` - Backfill bool `yaml:"backfill"` - DoublePuppetBackfill bool `yaml:"double_puppet_backfill"` - RequestFullSync bool `yaml:"request_full_sync"` + CreatePortals bool `yaml:"create_portals"` + Backfill bool `yaml:"backfill"` + DoublePuppetBackfill bool `yaml:"double_puppet_backfill"` + RequestFullSync bool `yaml:"request_full_sync"` + MaxInitialConversations int `yaml:"max_initial_conversations"` + + Immediate struct { + WorkerCount int `yaml:"worker_count"` + MaxEvents int `yaml:"max_events"` + } `yaml:"immediate"` + + Deferred []DeferredConfig `yaml:"deferred"` } `yaml:"history_sync"` UserAvatarSync bool `yaml:"user_avatar_sync"` BridgeMatrixLeave bool `yaml:"bridge_matrix_leave"` diff --git a/config/upgrade.go b/config/upgrade.go index 123ea30..fdf06a0 100644 --- a/config/upgrade.go +++ b/config/upgrade.go @@ -78,10 +78,13 @@ func (helper *UpgradeHelper) doUpgrade() { helper.Copy(Bool, "bridge", "call_start_notices") helper.Copy(Bool, "bridge", "identity_change_notices") helper.Copy(Bool, "bridge", "history_sync", "create_portals") - helper.Copy(Int, "bridge", "history_sync", "max_age") helper.Copy(Bool, "bridge", "history_sync", "backfill") helper.Copy(Bool, "bridge", "history_sync", "double_puppet_backfill") helper.Copy(Bool, "bridge", "history_sync", "request_full_sync") + helper.Copy(Int, "bridge", "history_sync", "max_initial_conversations") + helper.Copy(Int, "bridge", "history_sync", "immediate", "worker_count") + helper.Copy(Int, "bridge", "history_sync", "immediate", "max_events") + helper.Copy(List, "bridge", "history_sync", "deferred") helper.Copy(Bool, "bridge", "user_avatar_sync") helper.Copy(Bool, "bridge", "bridge_matrix_leave") helper.Copy(Bool, "bridge", "sync_with_custom_puppets") diff --git a/example-config.yaml b/example-config.yaml index 93100a6..1ddf877 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -115,14 +115,10 @@ bridge: #
Should another user's cryptographic identity changing send a message to Matrix? identity_change_notices: false portal_message_buffer: 128 - # Settings for handling history sync payloads. These settings only apply right after login, - # because the phone only sends the history sync data once, and there's no way to re-request it - # (other than logging out and back in again). + # Settings for handling history sync payloads. history_sync: # Should the bridge create portals for chats in the history sync payload? create_portals: true - # Maximum age of chats in seconds to create portals for. Set to 0 to create portals for all chats in sync payload. - max_age: 604800 # Enable backfilling history sync payloads from WhatsApp using batch sending? # This requires a server with MSC2716 support, which is currently an experimental feature in synapse. # It can be enabled by setting experimental_features -> msc2716_enabled to true in homeserver.yaml. @@ -137,6 +133,52 @@ bridge: # Should the bridge request a full sync from the phone when logging in? # This bumps the size of history syncs from 3 months to 1 year. request_full_sync: false + # The maximum number of initial conversations that should be synced. + # Other conversations will be backfilled on demand when the start PM + # provisioning endpoint is used or when a message comes in from that + # chat. + max_initial_conversations: 10 + # Settings for immediate backfills. These backfills should generally be + # small and their main purpose is to populate each of the initial chats + # (as configured by max_initial_conversations) with a few messages so + # that you can continue conversations without losing context. + immediate: + # The number of concurrent backfill workers to create for immediate + # backfills. Note that using more than one worker could cause the + # room list to jump around since there are no guarantees about the + # order in which the backfills will complete. + worker_count: 1 + # The maximum number of events to backfill initially. + max_events: 10 + # Settings for deferred backfills. The purpose of these backfills is + # to fill in the rest of the chat history that was not covered by the + # immediate backfills. These backfills generally should happen at a + # slower pace so as not to overload the homeserver. + # Each deferred backfill config should define a "stage" of backfill + # (e.g. the last week of messages). The fields are as follows: + # - start_days_ago: the number of days ago to start backfilling from. + # To indicate the start of time, use -1. For example, for a week ago, + # use 7. + # - max_batch_events: the number of events to send per batch. + # - batch_delay: the number of seconds to wait before backfilling each + # batch. + deferred: + # Last Week + - start_days_ago: 7 + max_batch_events: 20 + batch_delay: 5 + # Last Month + - start_days_ago: 30 + max_batch_events: 50 + batch_delay: 10 + # Last 3 months + - start_days_ago: 90 + max_batch_events: 100 + batch_delay: 10 + # The start of time + - start_days_ago: -1 + max_batch_events: 500 + batch_delay: 10 # Should puppet avatars be fetched from the server even if an avatar is already set? user_avatar_sync: true # Should Matrix users leaving groups be bridged to WhatsApp? From 12a23e2ca568764b6db538f008149a6af2266709 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 24 Mar 2022 14:21:23 -0600 Subject: [PATCH 03/16] historysync: refactor to utilize backfill queue Also sends the `m.room.marker` event when a backfill stage is complete.
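To illustrate how the deferred stages interleave across chats once they are in the queue, here is a minimal standalone sketch (not part of the patch: the chat names and stage list are invented for illustration, but the priority arithmetic mirrors the j*len(nMostRecent)+i calculation in handleHistorySync, and lower priorities are dequeued first because BackfillQuery.GetNext orders by priority, queue_id):

    package main

    import "fmt"

    func main() {
    	// Assume the two most recent conversations, most recent first.
    	chats := []string{"chatA", "chatB"}
    	// start_days_ago of each deferred stage, as in the example config
    	// (-1 means the start of time).
    	stageStarts := []int{7, 30, 90, -1}
    	for j, start := range stageStarts {
    		for i, chat := range chats {
    			// Every chat finishes stage j before any chat begins stage j+1.
    			fmt.Printf("priority %d: %s (stage starting %d days ago)\n", j*len(chats)+i, chat, start)
    		}
    	}
    }

Immediate backfills use a separate queue type and are drained before any deferred work starts, so the most recent chats get a few messages quickly and older history trickles in afterwards.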
--- backfillqueue.go | 69 +++++++ database/historysync.go | 5 +- historysync.go | 435 +++++++++++++++++++++++----------------- provisioning.go | 3 + user.go | 9 +- 5 files changed, 325 insertions(+), 196 deletions(-) create mode 100644 backfillqueue.go diff --git a/backfillqueue.go b/backfillqueue.go new file mode 100644 index 0000000..8d0d9c5 --- /dev/null +++ b/backfillqueue.go @@ -0,0 +1,69 @@ +// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge. +// Copyright (C) 2021 Tulir Asokan, Sumner Evans +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "time" + + log "maunium.net/go/maulogger/v2" + "maunium.net/go/mautrix-whatsapp/database" +) + +type BackfillQueue struct { + BackfillQuery *database.BackfillQuery + ImmediateBackfillRequests chan *database.Backfill + DeferredBackfillRequests chan *database.Backfill + ReCheckQueue chan bool + + log log.Logger +} + +func (bq *BackfillQueue) RunLoops(user *User) { + go bq.immediateBackfillLoop(user) + bq.deferredBackfillLoop(user) +} + +func (bq *BackfillQueue) immediateBackfillLoop(user *User) { + for { + if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); backfill != nil { + bq.ImmediateBackfillRequests <- backfill + backfill.Delete() + } else { + select { + case <-bq.ReCheckQueue: + case <-time.After(10 * time.Second): + } + } + } +} + +func (bq *BackfillQueue) deferredBackfillLoop(user *User) { + for { + // Finish all immediate backfills before doing the deferred ones. 
+ if immediate := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); immediate != nil { + time.Sleep(10 * time.Second) + continue + } + + if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil { + bq.DeferredBackfillRequests <- backfill + backfill.Delete() + } else { + time.Sleep(10 * time.Second) + } + } +} diff --git a/database/historysync.go b/database/historysync.go index 04821a2..89fefb6 100644 --- a/database/historysync.go +++ b/database/historysync.go @@ -117,7 +117,10 @@ func (hsc *HistorySyncConversation) Upsert() { DO UPDATE SET portal_jid=EXCLUDED.portal_jid, portal_receiver=EXCLUDED.portal_receiver, - last_message_timestamp=EXCLUDED.last_message_timestamp, + last_message_timestamp=CASE + WHEN EXCLUDED.last_message_timestamp > history_sync_conversation.last_message_timestamp THEN EXCLUDED.last_message_timestamp + ELSE history_sync_conversation.last_message_timestamp + END, archived=EXCLUDED.archived, pinned=EXCLUDED.pinned, mute_end_time=EXCLUDED.mute_end_time, diff --git a/historysync.go b/historysync.go index 5fece97..75abc60 100644 --- a/historysync.go +++ b/historysync.go @@ -18,8 +18,6 @@ package main import ( "fmt" - "sort" - "sync" "time" waProto "go.mau.fi/whatsmeow/binary/proto" @@ -35,12 +33,6 @@ import ( // region User history sync handling -type portalToBackfill struct { - portal *Portal - conv *waProto.Conversation - msgs []*waProto.WebMessageInfo -} - type wrappedInfo struct { *types.MessageInfo Type database.MessageType @@ -50,118 +42,223 @@ type wrappedInfo struct { ExpiresIn uint32 } -type conversationList []*waProto.Conversation - -var _ sort.Interface = (conversationList)(nil) - -func (c conversationList) Len() int { - return len(c) -} - -func (c conversationList) Less(i, j int) bool { - return getConversationTimestamp(c[i]) < getConversationTimestamp(c[j]) -} - -func (c conversationList) Swap(i, j int) { - c[i], c[j] = c[j], c[i] -} - func (user *User) handleHistorySyncsLoop() { + reCheckQueue := make(chan bool, 1) + if user.bridge.Config.Bridge.HistorySync.Backfill { + // Start the backfill queue. + user.BackfillQueue = &BackfillQueue{ + BackfillQuery: user.bridge.DB.BackfillQuery, + ImmediateBackfillRequests: make(chan *database.Backfill, 1), + DeferredBackfillRequests: make(chan *database.Backfill, 1), + ReCheckQueue: make(chan bool, 1), + log: user.log.Sub("BackfillQueue"), + } + reCheckQueue = user.BackfillQueue.ReCheckQueue + + // Immediate backfills can be done in parallel + for i := 0; i < user.bridge.Config.Bridge.HistorySync.Immediate.WorkerCount; i++ { + go user.handleBackfillRequestsLoop(user.BackfillQueue.ImmediateBackfillRequests) + } + + // Deferred backfills should be handled synchronously so as not to + // overload the homeserver. Users can configure their backfill stages + // to be more or less aggressive with backfilling at this stage. + go user.handleBackfillRequestsLoop(user.BackfillQueue.DeferredBackfillRequests) + go user.BackfillQueue.RunLoops(user) + } + + // Always save the history syncs for the user. If they want to enable + // backfilling in the future, we will have it in the database. 
for evt := range user.historySyncs { - go user.sendBridgeState(BridgeState{StateEvent: StateBackfilling}) - user.handleHistorySync(evt.Data) - if len(user.historySyncs) == 0 && user.IsConnected() { - go user.sendBridgeState(BridgeState{StateEvent: StateConnected}) + user.handleHistorySync(reCheckQueue, evt.Data) + } +} + +func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) { + for req := range backfillRequests { + user.log.Infof("Backfill request: %v", req) + conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal) + + // Update the client store with basic chat settings. + if conv.MuteEndTime.After(time.Now()) { + user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime) + } + if conv.Archived { + user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true) + } + if conv.Pinned > 0 { + user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true) + } + + portal := user.GetPortalByJID(conv.PortalKey.JID) + if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration { + portal.ExpirationTime = *conv.EphemeralExpiration + portal.Update() + } + + if !user.shouldCreatePortalForHistorySync(conv, portal) { + continue + } + + if len(portal.MXID) == 0 { + user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling") + err := portal.CreateMatrixRoom(user, nil, true) + if err != nil { + user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err) + continue + } + } else { + portal.UpdateMatrixRoom(user, nil) + } + + allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents) + + if len(allMsgs) > 0 { + user.log.Debugf("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents) + toBackfill := allMsgs[0:] + insertionEventIds := []id.EventID{} + for { + if len(toBackfill) == 0 { + break + } + + time.Sleep(time.Duration(req.BatchDelay) * time.Second) + + var msgs []*waProto.WebMessageInfo + if len(toBackfill) <= req.MaxBatchEvents { + msgs = toBackfill + toBackfill = toBackfill[0:0] + } else { + msgs = toBackfill[len(toBackfill)-req.MaxBatchEvents:] + toBackfill = toBackfill[:len(toBackfill)-req.MaxBatchEvents] + } + + if len(msgs) > 0 { + user.log.Debugf("Backfilling %d messages in %s", len(msgs), portal.Key.JID) + insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...) 
+ } + } + user.log.Debugf("Finished backfilling %d messages in %s", len(allMsgs), portal.Key.JID) + if len(insertionEventIds) > 0 { + portal.sendPostBackfillDummy( + time.Unix(int64(allMsgs[len(allMsgs)-1].GetMessageTimestamp()), 0), + insertionEventIds[0]) + } + } else { + user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID) + } + if !conv.MarkedAsUnread && conv.UnreadCount == 0 { + user.markSelfReadFull(portal) } } } -func (user *User) handleHistorySync(evt *waProto.HistorySync) { +func (user *User) shouldCreatePortalForHistorySync(conv *database.HistorySyncConversation, portal *Portal) bool { + if len(portal.MXID) > 0 { + user.log.Debugfln("Portal for %s already exists, ensuring user is invited", portal.Key.JID) + portal.ensureUserInvited(user) + // Portal exists, let backfill continue + return true + } else if !user.bridge.Config.Bridge.HistorySync.CreatePortals { + user.log.Debugfln("Not creating portal for %s: creating rooms from history sync is disabled", portal.Key.JID) + } else { + // Portal doesn't exist, but should be created + return true + } + // Portal shouldn't be created, reason logged above + return false +} + +func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.HistorySync) { if evt == nil || evt.SyncType == nil || evt.GetSyncType() == waProto.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waProto.HistorySync_PUSH_NAME { return } - description := fmt.Sprintf("type %s, %d conversations, chunk order %d, progress %d%%", evt.GetSyncType(), len(evt.GetConversations()), evt.GetChunkOrder(), evt.GetProgress()) - user.log.Infoln("Handling history sync with", description) + description := fmt.Sprintf("type %s, %d conversations, chunk order %d", evt.GetSyncType(), len(evt.GetConversations()), evt.GetChunkOrder()) + user.log.Infoln("Storing history sync with", description) - conversations := conversationList(evt.GetConversations()) - // We want to handle recent conversations first - sort.Sort(sort.Reverse(conversations)) - portalsToBackfill := make(chan portalToBackfill, len(conversations)) - - var backfillWait sync.WaitGroup - backfillWait.Add(1) - go user.backfillLoop(portalsToBackfill, backfillWait.Done) - for _, conv := range conversations { - user.handleHistorySyncConversation(conv, portalsToBackfill) - } - close(portalsToBackfill) - backfillWait.Wait() - user.log.Infoln("Finished handling history sync with", description) -} - -func (user *User) backfillLoop(ch chan portalToBackfill, done func()) { - defer done() - for ptb := range ch { - if len(ptb.msgs) > 0 { - user.log.Debugln("Bridging history sync payload for", ptb.portal.Key.JID) - ptb.portal.backfill(user, ptb.msgs) - } else { - user.log.Debugfln("Not backfilling %s: no bridgeable messages found", ptb.portal.Key.JID) - } - if !ptb.conv.GetMarkedAsUnread() && ptb.conv.GetUnreadCount() == 0 { - user.markSelfReadFull(ptb.portal) - } - } -} - -func (user *User) handleHistorySyncConversation(conv *waProto.Conversation, portalsToBackfill chan portalToBackfill) { - jid, err := types.ParseJID(conv.GetId()) - if err != nil { - user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err) - return - } - - // Update the client store with basic chat settings. 
- muteEnd := time.Unix(int64(conv.GetMuteEndTime()), 0) - if muteEnd.After(time.Now()) { - _ = user.Client.Store.ChatSettings.PutMutedUntil(jid, muteEnd) - } - if conv.GetArchived() { - _ = user.Client.Store.ChatSettings.PutArchived(jid, true) - } - if conv.GetPinned() > 0 { - _ = user.Client.Store.ChatSettings.PutPinned(jid, true) - } - - portal := user.GetPortalByJID(jid) - if conv.EphemeralExpiration != nil && portal.ExpirationTime != conv.GetEphemeralExpiration() { - portal.ExpirationTime = conv.GetEphemeralExpiration() - portal.Update() - } - // Check if portal is too old or doesn't contain anything we can bridge. - if !user.shouldCreatePortalForHistorySync(conv, portal) { - return - } - - var msgs []*waProto.WebMessageInfo - if user.bridge.Config.Bridge.HistorySync.Backfill { - msgs = filterMessagesToBackfill(conv.GetMessages()) - } - ptb := portalToBackfill{portal: portal, conv: conv, msgs: msgs} - if len(portal.MXID) == 0 { - user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling") - err = portal.CreateMatrixRoom(user, getPartialInfoFromConversation(jid, conv), false) + for _, conv := range evt.GetConversations() { + jid, err := types.ParseJID(conv.GetId()) if err != nil { - user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err) - return + user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err) + continue + } + portal := user.GetPortalByJID(jid) + + historySyncConversation := user.bridge.DB.HistorySyncQuery.NewConversationWithValues( + user.MXID, + conv.GetId(), + &portal.Key, + getConversationTimestamp(conv), + conv.GetMuteEndTime(), + conv.GetArchived(), + conv.GetPinned(), + conv.GetDisappearingMode().GetInitiator(), + conv.GetEndOfHistoryTransferType(), + conv.EphemeralExpiration, + conv.GetMarkedAsUnread(), + conv.GetUnreadCount()) + historySyncConversation.Upsert() + + for _, msg := range conv.GetMessages() { + // Don't store messages that will just be skipped. + wmi := msg.GetMessage() + msgType := getMessageType(wmi.GetMessage()) + if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" { + continue + } + + // Don't store unsupported messages. + if !containsSupportedMessage(msg.GetMessage().GetMessage()) { + continue + } + + message, err := user.bridge.DB.HistorySyncQuery.NewMessageWithValues(user.MXID, conv.GetId(), msg) + if err != nil { + user.log.Warnf("Failed to save message %s in %s. Error: %+v", msg.Message.Key.Id, conv.GetId(), err) + continue + } + message.Insert() } - } else { - portal.UpdateMatrixRoom(user, nil) } - if !user.bridge.Config.Bridge.HistorySync.Backfill { - user.log.Debugln("Backfill is disabled, not bridging history sync payload for", portal.Key.JID) - } else { - portalsToBackfill <- ptb + + // If this was the initial bootstrap, enqueue immediate backfills for the + // most recent portals. If it's the last history sync event, start + // backfilling the rest of the history of the portals. 
+ historySyncConfig := user.bridge.Config.Bridge.HistorySync + if historySyncConfig.Backfill && (evt.GetSyncType() == waProto.HistorySync_FULL || evt.GetSyncType() == waProto.HistorySync_INITIAL_BOOTSTRAP) { + nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations) + for i, conv := range nMostRecent { + jid, err := types.ParseJID(conv.ConversationID) + if err != nil { + user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.ConversationID, err) + continue + } + portal := user.GetPortalByJID(jid) + + switch evt.GetSyncType() { + case waProto.HistorySync_INITIAL_BOOTSTRAP: + // Enqueue immediate backfills for the most recent messages first. + maxMessages := historySyncConfig.Immediate.MaxEvents + initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, i, &portal.Key, nil, nil, maxMessages, maxMessages, 0) + initialBackfill.Insert() + + case waProto.HistorySync_FULL: + // Enqueue deferred backfills as configured. + for j, backfillStage := range historySyncConfig.Deferred { + var startDate *time.Time = nil + if backfillStage.StartDaysAgo > 0 { + startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo) + startDate = &startDaysAgo + } + backfill := user.bridge.DB.BackfillQuery.NewWithValues( + user.MXID, database.BackfillDeferred, j*len(nMostRecent)+i, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay) + backfill.Insert() + } + } + } + + // Tell the queue to check for new backfill requests. + reCheckQueue <- true } } @@ -173,71 +270,22 @@ func getConversationTimestamp(conv *waProto.Conversation) uint64 { return convTs } -func (user *User) shouldCreatePortalForHistorySync(conv *waProto.Conversation, portal *Portal) bool { - maxAge := user.bridge.Config.Bridge.HistorySync.MaxAge - minLastMsgToCreate := time.Now().Add(-time.Duration(maxAge) * time.Second) - lastMsg := time.Unix(int64(getConversationTimestamp(conv)), 0) - - if len(portal.MXID) > 0 { - user.log.Debugfln("Portal for %s already exists, ensuring user is invited", portal.Key.JID) - portal.ensureUserInvited(user) - // Portal exists, let backfill continue - return true - } else if !user.bridge.Config.Bridge.HistorySync.CreatePortals { - user.log.Debugfln("Not creating portal for %s: creating rooms from history sync is disabled", portal.Key.JID) - } else if !containsSupportedMessages(conv) { - user.log.Debugfln("Not creating portal for %s: no interesting messages found", portal.Key.JID) - } else if maxAge > 0 && !lastMsg.After(minLastMsgToCreate) { - user.log.Debugfln("Not creating portal for %s: last message older than limit (%s)", portal.Key.JID, lastMsg) - } else { - // Portal doesn't exist, but should be created - return true - } - // Portal shouldn't be created, reason logged above - return false +func (user *User) EnqueueImmedateBackfill(portal *Portal, priority int) { + maxMessages := user.bridge.Config.Bridge.HistorySync.Immediate.MaxEvents + initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, nil, maxMessages, maxMessages, 0) + initialBackfill.Insert() } -func filterMessagesToBackfill(messages []*waProto.HistorySyncMsg) []*waProto.WebMessageInfo { - filtered := make([]*waProto.WebMessageInfo, 0, len(messages)) - for _, msg := range messages { - wmi := msg.GetMessage() - msgType := getMessageType(wmi.GetMessage()) - if msgType == "unknown" || msgType 
== "ignore" || msgType == "unknown_protocol" { - continue - } else { - filtered = append(filtered, wmi) +func (user *User) EnqueueDeferredBackfills(portal *Portal, numConversations, priority int) { + for j, backfillStage := range user.bridge.Config.Bridge.HistorySync.Deferred { + var startDate *time.Time = nil + if backfillStage.StartDaysAgo > 0 { + startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo) + startDate = &startDaysAgo } - } - return filtered -} - -func containsSupportedMessages(conv *waProto.Conversation) bool { - for _, msg := range conv.GetMessages() { - if containsSupportedMessage(msg.GetMessage().GetMessage()) { - return true - } - } - return false -} - -func getPartialInfoFromConversation(jid types.JID, conv *waProto.Conversation) *types.GroupInfo { - // TODO broadcast list info? - if jid.Server != types.GroupServer { - return nil - } - participants := make([]types.GroupParticipant, len(conv.GetParticipant())) - for i, pcp := range conv.GetParticipant() { - participantJID, _ := types.ParseJID(pcp.GetUserJid()) - participants[i] = types.GroupParticipant{ - JID: participantJID, - IsAdmin: pcp.GetRank() == waProto.GroupParticipant_ADMIN, - IsSuperAdmin: pcp.GetRank() == waProto.GroupParticipant_SUPERADMIN, - } - } - return &types.GroupInfo{ - JID: jid, - GroupName: types.GroupName{Name: conv.GetName()}, - Participants: participants, + backfill := user.bridge.DB.BackfillQuery.NewWithValues( + user.MXID, database.BackfillDeferred, j*numConversations+priority, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay) + backfill.Insert() } } @@ -246,11 +294,15 @@ func getPartialInfoFromConversation(jid types.JID, conv *waProto.Conversation) * var ( PortalCreationDummyEvent = event.Type{Type: "fi.mau.dummy.portal_created", Class: event.MessageEventType} - BackfillEndDummyEvent = event.Type{Type: "fi.mau.dummy.backfill_end", Class: event.MessageEventType} PreBackfillDummyEvent = event.Type{Type: "fi.mau.dummy.pre_backfill", Class: event.MessageEventType} + + // Marker events for when a backfill finishes + BackfillEndDummyEvent = event.Type{Type: "fi.mau.dummy.backfill_end", Class: event.MessageEventType} + RoomMarker = event.Type{Type: "m.room.marker", Class: event.MessageEventType} + MSC2716Marker = event.Type{Type: "org.matrix.msc2716.marker", Class: event.MessageEventType} ) -func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) { +func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) []id.EventID { portal.backfillLock.Lock() defer portal.backfillLock.Unlock() @@ -375,32 +427,33 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) } } + insertionEventIds := []id.EventID{} + if len(historyBatch.Events) > 0 && len(historyBatch.PrevEventID) > 0 { portal.log.Infofln("Sending %d historical messages...", len(historyBatch.Events)) historyResp, err := portal.MainIntent().BatchSend(portal.MXID, &historyBatch) + insertionEventIds = append(insertionEventIds, historyResp.BaseInsertionEventID) if err != nil { portal.log.Errorln("Error sending batch of historical messages:", err) } else { portal.finishBatch(historyResp.EventIDs, historyBatchInfos) portal.NextBatchID = historyResp.NextBatchID portal.Update() - // If batchID is non-empty, it means this is backfilling very old messages, and we don't need a post-backfill dummy. 
- if historyBatch.BatchID == "" { - portal.sendPostBackfillDummy(time.UnixMilli(historyBatch.Events[len(historyBatch.Events)-1].Timestamp)) - } } } if len(newBatch.Events) > 0 && len(newBatch.PrevEventID) > 0 { portal.log.Infofln("Sending %d new messages...", len(newBatch.Events)) newResp, err := portal.MainIntent().BatchSend(portal.MXID, &newBatch) + insertionEventIds = append(insertionEventIds, newResp.BaseInsertionEventID) if err != nil { portal.log.Errorln("Error sending batch of new messages:", err) } else { portal.finishBatch(newResp.EventIDs, newBatchInfos) - portal.sendPostBackfillDummy(time.UnixMilli(newBatch.Events[len(newBatch.Events)-1].Timestamp)) } } + + return insertionEventIds } func (portal *Portal) parseWebMessageInfo(source *User, webMsg *waProto.WebMessageInfo) *types.MessageInfo { @@ -530,19 +583,25 @@ func (portal *Portal) finishBatchEvt(info *wrappedInfo, eventID id.EventID) { } } -func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time) { - resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, BackfillEndDummyEvent, struct{}{}) - if err != nil { - portal.log.Errorln("Error sending post-backfill dummy event:", err) - return +func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time, insertionEventId id.EventID) { + for _, evtType := range []event.Type{BackfillEndDummyEvent, RoomMarker, MSC2716Marker} { + resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, evtType, map[string]interface{}{ + "org.matrix.msc2716.marker.insertion": insertionEventId, + "m.marker.insertion": insertionEventId, + }) + if err != nil { + portal.log.Errorln("Error sending post-backfill dummy event:", err) + return + } + msg := portal.bridge.DB.Message.New() + msg.Chat = portal.Key + msg.MXID = resp.EventID + msg.JID = types.MessageID(resp.EventID) + msg.Timestamp = lastTimestamp.Add(1 * time.Second) + msg.Sent = true + msg.Insert() + } - msg := portal.bridge.DB.Message.New() - msg.Chat = portal.Key - msg.MXID = resp.EventID - msg.JID = types.MessageID(resp.EventID) - msg.Timestamp = lastTimestamp.Add(1 * time.Second) - msg.Sent = true - msg.Insert() } // endregion diff --git a/provisioning.go b/provisioning.go index 622f4c4..8dad45e 100644 --- a/provisioning.go +++ b/provisioning.go @@ -436,6 +436,9 @@ func (prov *ProvisioningAPI) Logout(w http.ResponseWriter, r *http.Request) { user.bridge.Metrics.TrackConnectionState(user.JID, false) user.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut}) user.DeleteSession() + prov.bridge.DB.BackfillQuery.DeleteAll(user.MXID) + prov.bridge.DB.HistorySyncQuery.DeleteAllConversations(user.MXID) + prov.bridge.DB.HistorySyncQuery.DeleteAllMessages(user.MXID) jsonResponse(w, http.StatusOK, Response{true, "Logged out successfully."}) } diff --git a/user.go b/user.go index 7b08a9a..dde6b7c 100644 --- a/user.go +++ b/user.go @@ -74,6 +74,8 @@ type User struct { groupListCache []*types.GroupInfo groupListCacheLock sync.Mutex groupListCacheTime time.Time + + BackfillQueue *BackfillQueue } func (bridge *Bridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User { @@ -561,18 +563,11 @@ func (user *User) HandleEvent(event interface{}) { go user.tryAutomaticDoublePuppeting() case *events.OfflineSyncPreview: user.log.Infofln("Server says it's going to send %d messages and %d receipts that were missed during downtime", v.Messages, v.Receipts) - go user.sendBridgeState(BridgeState{ - StateEvent: StateBackfilling, - Message: fmt.Sprintf("backfilling %d messages and %d receipts", v.Messages, v.Receipts), - }) case 
*events.OfflineSyncCompleted: if !user.PhoneRecentlySeen(true) { user.log.Infofln("Offline sync completed, but phone last seen date is still %s - sending phone offline bridge status", user.PhoneLastSeen) go user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WAPhoneOffline}) } else { - if user.GetPrevBridgeState().StateEvent == StateBackfilling { - user.log.Infoln("Offline sync completed") - } go user.sendBridgeState(BridgeState{StateEvent: StateConnected}) } case *events.AppStateSyncComplete: From e702bf39d24eeea91398cd84f852575cd8308f0b Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 24 Mar 2022 14:21:57 -0600 Subject: [PATCH 04/16] commands/backfill: add command to manually backfill a portal --- commands.go | 46 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/commands.go b/commands.go index 6d9099d..709b0ff 100644 --- a/commands.go +++ b/commands.go @@ -31,6 +31,7 @@ import ( "github.com/tidwall/gjson" "maunium.net/go/maulogger/v2" + "maunium.net/go/mautrix-whatsapp/database" "go.mau.fi/whatsmeow" "go.mau.fi/whatsmeow/appstate" @@ -140,7 +141,7 @@ func (handler *CommandHandler) CommandMux(ce *CommandEvent) { handler.CommandLogout(ce) case "toggle": handler.CommandToggle(ce) - case "set-relay", "unset-relay", "login-matrix", "sync", "list", "search", "open", "pm", "invite-link", "resolve", "resolve-link", "join", "create", "accept": + case "set-relay", "unset-relay", "login-matrix", "sync", "list", "search", "open", "pm", "invite-link", "resolve", "resolve-link", "join", "create", "accept", "backfill": if !ce.User.HasSession() { ce.Reply("You are not logged in. Use the `login` command to log into WhatsApp.") return @@ -176,6 +177,8 @@ func (handler *CommandHandler) CommandMux(ce *CommandEvent) { handler.CommandCreate(ce) case "accept": handler.CommandAccept(ce) + case "backfill": + handler.CommandBackfill(ce) } default: ce.Reply("Unknown command, use the `help` command for help.") @@ -603,6 +606,9 @@ func (handler *CommandHandler) CommandLogout(ce *CommandEvent) { ce.User.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut}) ce.User.DeleteConnection() ce.User.DeleteSession() + ce.Bridge.DB.BackfillQuery.DeleteAll(ce.User.MXID) + ce.Bridge.DB.HistorySyncQuery.DeleteAllConversations(ce.User.MXID) + ce.Bridge.DB.HistorySyncQuery.DeleteAllMessages(ce.User.MXID) ce.Reply("Logged out successfully.") } @@ -745,6 +751,7 @@ func (handler *CommandHandler) CommandHelp(ce *CommandEvent) { cmdPrefix + cmdSetPowerLevelHelp, cmdPrefix + cmdDeletePortalHelp, cmdPrefix + cmdDeleteAllPortalsHelp, + cmdPrefix + cmdBackfillHelp, }, "\n* ")) } @@ -835,6 +842,43 @@ func (handler *CommandHandler) CommandDeleteAllPortals(ce *CommandEvent) { }() } +const cmdBackfillHelp = `backfill [batch size] [batch delay] - Backfill all messages the portal.` + +func (handler *CommandHandler) CommandBackfill(ce *CommandEvent) { + if ce.Portal == nil { + ce.Reply("This is not a portal room") + return + } + if !ce.Bridge.Config.Bridge.HistorySync.Backfill { + ce.Bot.SendMessageEvent(ce.RoomID, event.EventMessage, &event.MessageEventContent{ + MsgType: event.MsgNotice, + Body: "Backfill is not enabled for this bridge.", + }) + return + } + batchSize := 100 + batchDelay := 5 + if len(ce.Args) >= 1 { + var err error + batchSize, err = strconv.Atoi(ce.Args[0]) + if err != nil || batchSize < 1 { + ce.Reply("\"%s\" isn't a valid batch size", ce.Args[0]) + return + } + } + if len(ce.Args) >= 2 { + var err error + batchDelay, err = 
strconv.Atoi(ce.Args[1]) + if err != nil || batchDelay < 0 { + ce.Reply("\"%s\" isn't a valid batch delay", ce.Args[1]) + return + } + } + backfill := ce.Portal.bridge.DB.BackfillQuery.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay) + backfill.Insert() + ce.User.BackfillQueue.ReCheckQueue <- true +} + const cmdListHelp = `list [page] [items per page] - Get a list of all contacts and groups.` func matchesQuery(str string, query string) bool { From 8a49fea81221a50457c41cd2defd545529613525 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Fri, 25 Mar 2022 00:15:52 -0600 Subject: [PATCH 05/16] backfill: backfill conversation when started This applies to when the conversation is started via the provisioning API (start new chat) or when a new message comes in to that portal. --- commands.go | 2 +- historysync.go | 36 +++++++++++------------------------- portal.go | 10 ++++++++-- provisioning.go | 2 +- user.go | 6 +++--- 5 files changed, 24 insertions(+), 32 deletions(-) diff --git a/commands.go b/commands.go index 709b0ff..4c3f7bc 100644 --- a/commands.go +++ b/commands.go @@ -1059,7 +1059,7 @@ func (handler *CommandHandler) CommandOpen(ce *CommandEvent) { portal.UpdateMatrixRoom(ce.User, info) ce.Reply("Portal room synced.") } else { - err = portal.CreateMatrixRoom(ce.User, info, true) + err = portal.CreateMatrixRoom(ce.User, info, true, true) if err != nil { ce.Reply("Failed to create room: %v", err) } else { diff --git a/historysync.go b/historysync.go index 75abc60..553dfff 100644 --- a/historysync.go +++ b/historysync.go @@ -78,6 +78,10 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac for req := range backfillRequests { user.log.Infof("Backfill request: %v", req) conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal) + if conv == nil { + user.log.Errorf("Could not find conversation for %s in %s", user.MXID, req.Portal.String()) + continue + } // Update the client store with basic chat settings. if conv.MuteEndTime.After(time.Now()) { user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime) } if conv.Archived { user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true) } if conv.Pinned > 0 { user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true) } @@ -102,7 +106,7 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac if len(portal.MXID) == 0 { user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling") - err := portal.CreateMatrixRoom(user, nil, true) + err := portal.CreateMatrixRoom(user, nil, true, false) if err != nil { user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err) continue } @@ -122,8 +126,6 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac break } - time.Sleep(time.Duration(req.BatchDelay) * time.Second) - var msgs []*waProto.WebMessageInfo if len(toBackfill) <= req.MaxBatchEvents { msgs = toBackfill toBackfill = toBackfill[0:0] } else { msgs = toBackfill[len(toBackfill)-req.MaxBatchEvents:] toBackfill = toBackfill[:len(toBackfill)-req.MaxBatchEvents] } if len(msgs) > 0 { + time.Sleep(time.Duration(req.BatchDelay) * time.Second) user.log.Debugf("Backfilling %d messages in %s", len(msgs), portal.Key.JID) insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...) } @@ -224,8 +227,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History // If this was the initial bootstrap, enqueue immediate backfills for the + // most recent portals. If it's the last history sync event, start + // backfilling the rest of the history of the portals. 
- historySyncConfig := user.bridge.Config.Bridge.HistorySync - if historySyncConfig.Backfill && (evt.GetSyncType() == waProto.HistorySync_FULL || evt.GetSyncType() == waProto.HistorySync_INITIAL_BOOTSTRAP) { + if user.bridge.Config.Bridge.HistorySync.Backfill && evt.GetSyncType() == waProto.HistorySync_FULL { nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations) for i, conv := range nMostRecent { jid, err := types.ParseJID(conv.ConversationID) @@ -235,26 +237,10 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History } portal := user.GetPortalByJID(jid) - switch evt.GetSyncType() { - case waProto.HistorySync_INITIAL_BOOTSTRAP: - // Enqueue immediate backfills for the most recent messages first. - maxMessages := historySyncConfig.Immediate.MaxEvents - initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, i, &portal.Key, nil, nil, maxMessages, maxMessages, 0) - initialBackfill.Insert() - - case waProto.HistorySync_FULL: - // Enqueue deferred backfills as configured. - for j, backfillStage := range historySyncConfig.Deferred { - var startDate *time.Time = nil - if backfillStage.StartDaysAgo > 0 { - startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo) - startDate = &startDaysAgo - } - backfill := user.bridge.DB.BackfillQuery.NewWithValues( - user.MXID, database.BackfillDeferred, j*len(nMostRecent)+i, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay) - backfill.Insert() - } - } + // Enqueue immediate backfills for the most recent messages first. + user.EnqueueImmedateBackfill(portal, i) + // Enqueue deferred backfills as configured. + user.EnqueueDeferredBackfills(portal, len(nMostRecent), i) } // Tell the queue to check for new backfill requests. 
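The EnqueueImmedateBackfill and EnqueueDeferredBackfills helpers called above are not shown in this part of the series. Based on the inline code they replace, they presumably look roughly like the following sketch, which reuses the bridge's existing User, Portal, and database types and the HistorySync config fields; treat it as an approximation, not the actual definitions.

    // Approximate reconstructions; the real definitions are not in this hunk.
    func (user *User) EnqueueImmedateBackfill(portal *Portal, priority int) {
        maxMessages := user.bridge.Config.Bridge.HistorySync.Immediate.MaxEvents
        backfill := user.bridge.DB.BackfillQuery.NewWithValues(
            user.MXID, database.BackfillImmediate, priority, &portal.Key,
            nil, nil, maxMessages, maxMessages, 0)
        backfill.Insert()
    }

    func (user *User) EnqueueDeferredBackfills(portal *Portal, numConversations, convIndex int) {
        // One queue entry per configured deferred stage; the priority layout puts
        // stage 0 of every conversation ahead of stage 1 of any conversation.
        for stageIdx, stage := range user.bridge.Config.Bridge.HistorySync.Deferred {
            var startDate *time.Time
            if stage.StartDaysAgo > 0 {
                start := time.Now().AddDate(0, 0, -stage.StartDaysAgo)
                startDate = &start
            }
            backfill := user.bridge.DB.BackfillQuery.NewWithValues(
                user.MXID, database.BackfillDeferred, stageIdx*numConversations+convIndex,
                &portal.Key, startDate, nil, stage.MaxBatchEvents, -1, stage.BatchDelay)
            backfill.Insert()
        }
    }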
diff --git a/portal.go b/portal.go index 2765ecf..c98efda 100644 --- a/portal.go +++ b/portal.go @@ -237,7 +237,7 @@ func (portal *Portal) handleMessageLoopItem(msg PortalMessage) { return } portal.log.Debugln("Creating Matrix room from incoming message") - err := portal.CreateMatrixRoom(msg.source, nil, false) + err := portal.CreateMatrixRoom(msg.source, nil, false, true) if err != nil { portal.log.Errorln("Failed to create portal room:", err) return @@ -1163,7 +1163,7 @@ func (portal *Portal) UpdateBridgeInfo() { } } -func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, isFullInfo bool) error { +func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, isFullInfo, backfill bool) error { portal.roomCreateLock.Lock() defer portal.roomCreateLock.Unlock() if len(portal.MXID) > 0 { @@ -1336,6 +1336,12 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i portal.FirstEventID = firstEventResp.EventID portal.Update() } + + if user.bridge.Config.Bridge.HistorySync.Backfill && backfill { + user.EnqueueImmedateBackfill(portal, 0) + user.EnqueueDeferredBackfills(portal, 1, 0) + user.BackfillQueue.ReCheckQueue <- true + } return nil } diff --git a/provisioning.go b/provisioning.go index 8dad45e..f149240 100644 --- a/provisioning.go +++ b/provisioning.go @@ -346,7 +346,7 @@ func (prov *ProvisioningAPI) OpenGroup(w http.ResponseWriter, r *http.Request) { portal := user.GetPortalByJID(info.JID) status := http.StatusOK if len(portal.MXID) == 0 { - err = portal.CreateMatrixRoom(user, info, true) + err = portal.CreateMatrixRoom(user, info, true, true) if err != nil { jsonResponse(w, http.StatusInternalServerError, Error{ Error: fmt.Sprintf("Failed to create portal: %v", err), diff --git a/user.go b/user.go index dde6b7c..db2575d 100644 --- a/user.go +++ b/user.go @@ -937,7 +937,7 @@ func (user *User) ResyncGroups(createPortals bool) error { portal := user.GetPortalByJID(group.JID) if len(portal.MXID) == 0 { if createPortals { - err = portal.CreateMatrixRoom(user, group, true) + err = portal.CreateMatrixRoom(user, group, true, true) if err != nil { return fmt.Errorf("failed to create room for %s: %w", group.JID, err) } @@ -1020,7 +1020,7 @@ func (user *User) markSelfReadFull(portal *Portal) { func (user *User) handleGroupCreate(evt *events.JoinedGroup) { portal := user.GetPortalByJID(evt.JID) if len(portal.MXID) == 0 { - err := portal.CreateMatrixRoom(user, &evt.GroupInfo, true) + err := portal.CreateMatrixRoom(user, &evt.GroupInfo, true, true) if err != nil { user.log.Errorln("Failed to create Matrix room after join notification: %v", err) } @@ -1088,7 +1088,7 @@ func (user *User) StartPM(jid types.JID, reason string) (*Portal, *Puppet, bool, return portal, puppet, false, nil } } - err := portal.CreateMatrixRoom(user, nil, false) + err := portal.CreateMatrixRoom(user, nil, false, true) return portal, puppet, true, err } From 830c294b91060e9c1a25d5ecd6c27619bb3ef4ba Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Wed, 30 Mar 2022 17:40:23 -0600 Subject: [PATCH 06/16] historysync: delete history sync messages once backfilled --- database/historysync.go | 34 ++++++++++++++++--- database/message.go | 1 + .../upgrades/2022-03-18-historysync-store.go | 2 ++ historysync.go | 15 +++++--- 4 files changed, 43 insertions(+), 9 deletions(-) diff --git a/database/historysync.go b/database/historysync.go index 89fefb6..8532c5e 100644 --- a/database/historysync.go +++ b/database/historysync.go @@ -20,6 +20,8 @@ import ( "database/sql" "errors" 
"fmt" + "strconv" + "strings" "time" waProto "go.mau.fi/whatsmeow/binary/proto" @@ -203,7 +205,7 @@ func (hsq *HistorySyncQuery) DeleteAllConversations(userID id.UserID) error { const ( getMessagesBetween = ` - SELECT data + SELECT id, data FROM history_sync_message WHERE user_mxid=$1 AND conversation_id=$2 @@ -211,18 +213,28 @@ const ( ORDER BY timestamp DESC %s ` + deleteMessages = ` + DELETE FROM history_sync_message + WHERE id IN (%s) + ` ) type HistorySyncMessage struct { db *Database log log.Logger + ID int UserID id.UserID ConversationID string Timestamp time.Time Data []byte } +type WrappedWebMessageInfo struct { + ID int + Message *waProto.WebMessageInfo +} + func (hsq *HistorySyncQuery) NewMessageWithValues(userID id.UserID, conversationID string, message *waProto.HistorySyncMsg) (*HistorySyncMessage, error) { msgData, err := proto.Marshal(message) if err != nil { @@ -248,7 +260,7 @@ func (hsm *HistorySyncMessage) Insert() { } } -func (hsq *HistorySyncQuery) GetMessagesBetween(userID id.UserID, conversationID string, startTime, endTime *time.Time, limit int) (messages []*waProto.WebMessageInfo) { +func (hsq *HistorySyncQuery) GetMessagesBetween(userID id.UserID, conversationID string, startTime, endTime *time.Time, limit int) (messages []*WrappedWebMessageInfo) { whereClauses := "" args := []interface{}{userID, conversationID} argNum := 3 @@ -272,9 +284,10 @@ func (hsq *HistorySyncQuery) GetMessagesBetween(userID id.UserID, conversationID if err != nil || rows == nil { return nil } + var msgID int var msgData []byte for rows.Next() { - err := rows.Scan(&msgData) + err := rows.Scan(&msgID, &msgData) if err != nil { hsq.log.Error("Database scan failed: %v", err) continue @@ -285,11 +298,24 @@ func (hsq *HistorySyncQuery) GetMessagesBetween(userID id.UserID, conversationID hsq.log.Errorf("Failed to unmarshal history sync message: %v", err) continue } - messages = append(messages, historySyncMsg.Message) + messages = append(messages, &WrappedWebMessageInfo{ + ID: msgID, + Message: historySyncMsg.Message, + }) } return } +func (hsq *HistorySyncQuery) DeleteMessages(messages []*WrappedWebMessageInfo) error { + messageIDs := make([]string, len(messages)) + for i, msg := range messages { + messageIDs[i] = strconv.Itoa(msg.ID) + } + + _, err := hsq.db.Exec(fmt.Sprintf(deleteMessages, strings.Join(messageIDs, ","))) + return err +} + func (hsq *HistorySyncQuery) DeleteAllMessages(userID id.UserID) error { _, err := hsq.db.Exec("DELETE FROM history_sync_message WHERE user_mxid=$1", userID) return err diff --git a/database/message.go b/database/message.go index e782d9f..ad90cba 100644 --- a/database/message.go +++ b/database/message.go @@ -143,6 +143,7 @@ type Message struct { db *Database log log.Logger + ID int Chat PortalKey JID types.MessageID MXID id.EventID diff --git a/database/upgrades/2022-03-18-historysync-store.go b/database/upgrades/2022-03-18-historysync-store.go index bf53ab1..5597afb 100644 --- a/database/upgrades/2022-03-18-historysync-store.go +++ b/database/upgrades/2022-03-18-historysync-store.go @@ -34,6 +34,7 @@ func init() { } _, err = tx.Exec(` CREATE TABLE history_sync_message ( + id SERIAL PRIMARY KEY, user_mxid TEXT, conversation_id TEXT, timestamp TIMESTAMP, @@ -74,6 +75,7 @@ func init() { } _, err = tx.Exec(` CREATE TABLE history_sync_message ( + id INTEGER PRIMARY KEY, user_mxid TEXT, conversation_id TEXT, timestamp DATETIME, diff --git a/historysync.go b/historysync.go index 553dfff..cea3203 100644 --- a/historysync.go +++ b/historysync.go @@ -126,7 +126,7 @@ 
func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac break } - var msgs []*waProto.WebMessageInfo + var msgs []*database.WrappedWebMessageInfo if len(toBackfill) <= req.MaxBatchEvents { msgs = toBackfill toBackfill = toBackfill[0:0] @@ -144,9 +144,14 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac user.log.Debugf("Finished backfilling %d messages in %s", len(allMsgs), portal.Key.JID) if len(insertionEventIds) > 0 { portal.sendPostBackfillDummy( - time.Unix(int64(allMsgs[len(allMsgs)-1].GetMessageTimestamp()), 0), + time.Unix(int64(allMsgs[len(allMsgs)-1].Message.GetMessageTimestamp()), 0), insertionEventIds[0]) } + user.log.Debugf("Deleting %d history sync messages after backfilling", len(allMsgs)) + err := user.bridge.DB.HistorySyncQuery.DeleteMessages(allMsgs) + if err != nil { + user.log.Warnf("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err) + } } else { user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID) } @@ -288,14 +293,14 @@ var ( MSC2716Marker = event.Type{Type: "org.matrix.msc2716.marker", Class: event.MessageEventType} ) -func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) []id.EventID { +func (portal *Portal) backfill(source *User, messages []*database.WrappedWebMessageInfo) []id.EventID { portal.backfillLock.Lock() defer portal.backfillLock.Unlock() var historyBatch, newBatch mautrix.ReqBatchSend var historyBatchInfos, newBatchInfos []*wrappedInfo - firstMsgTimestamp := time.Unix(int64(messages[len(messages)-1].GetMessageTimestamp()), 0) + firstMsgTimestamp := time.Unix(int64(messages[len(messages)-1].Message.GetMessageTimestamp()), 0) historyBatch.StateEventsAtStart = make([]*event.Event, 0) newBatch.StateEventsAtStart = make([]*event.Event, 0) @@ -350,7 +355,7 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) portal.log.Infofln("Processing history sync with %d messages", len(messages)) // The messages are ordered newest to oldest, so iterate them in reverse order. for i := len(messages) - 1; i >= 0; i-- { - webMsg := messages[i] + webMsg := messages[i].Message msgType := getMessageType(webMsg.GetMessage()) if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" { if msgType != "ignore" { From 54534f6b42bd82856477a5b0554aad331be64d73 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Wed, 30 Mar 2022 17:59:55 -0600 Subject: [PATCH 07/16] historysync: enqueue immediate backfill on INITIAL_BOOTSTRAP --- historysync.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/historysync.go b/historysync.go index cea3203..c7ca570 100644 --- a/historysync.go +++ b/historysync.go @@ -232,7 +232,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History // If this was the initial bootstrap, enqueue immediate backfills for the // most recent portals. If it's the last history sync event, start // backfilling the rest of the history of the portals. 
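Because the queue is drained in ORDER BY priority, queue_id order, the stage*len(nMostRecent)+i priority scheme used for deferred backfills means the deferred consumer works through stage 0 of every recent conversation before it starts stage 1 of any of them. A tiny runnable illustration (conversation names and stage count are made up):

    package main

    import "fmt"

    func main() {
        conversations := []string{"chat-A", "chat-B", "chat-C"} // illustrative
        const deferredStages = 2                                // illustrative
        for stage := 0; stage < deferredStages; stage++ {
            for i, conv := range conversations {
                // Lower priority values are returned first by GetNext.
                fmt.Printf("priority %2d -> stage %d of %s\n", stage*len(conversations)+i, stage, conv)
            }
        }
    }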
- if user.bridge.Config.Bridge.HistorySync.Backfill && evt.GetSyncType() == waProto.HistorySync_FULL { + if user.bridge.Config.Bridge.HistorySync.Backfill && (evt.GetSyncType() == waProto.HistorySync_INITIAL_BOOTSTRAP || evt.GetSyncType() == waProto.HistorySync_FULL) { nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations) for i, conv := range nMostRecent { jid, err := types.ParseJID(conv.ConversationID) @@ -242,10 +242,14 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History } portal := user.GetPortalByJID(jid) - // Enqueue immediate backfills for the most recent messages first. - user.EnqueueImmedateBackfill(portal, i) - // Enqueue deferred backfills as configured. - user.EnqueueDeferredBackfills(portal, len(nMostRecent), i) + switch evt.GetSyncType() { + case waProto.HistorySync_INITIAL_BOOTSTRAP: + // Enqueue immediate backfills for the most recent messages first. + user.EnqueueImmedateBackfill(portal, i) + case waProto.HistorySync_FULL: + // Enqueue deferred backfills as configured. + user.EnqueueDeferredBackfills(portal, len(nMostRecent), i) + } } // Tell the queue to check for new backfill requests. From 748c9509a1f884981aecbf5334601e79909d87eb Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Mon, 4 Apr 2022 16:07:25 -0600 Subject: [PATCH 08/16] historysync: lock earlier to prevent races and duplicate messages Closes BRI-2709 --- historysync.go | 114 +++++++++++++++++++++++++------------------------ 1 file changed, 59 insertions(+), 55 deletions(-) diff --git a/historysync.go b/historysync.go index c7ca570..c290241 100644 --- a/historysync.go +++ b/historysync.go @@ -100,64 +100,71 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac portal.Update() } - if !user.shouldCreatePortalForHistorySync(conv, portal) { - continue + user.createOrUpdatePortalAndBackfillWithLock(req, conv, portal) + } +} + +func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill, conv *database.HistorySyncConversation, portal *Portal) { + portal.backfillLock.Lock() + defer portal.backfillLock.Unlock() + + if !user.shouldCreatePortalForHistorySync(conv, portal) { + return + } + + if len(portal.MXID) == 0 { + user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling") + err := portal.CreateMatrixRoom(user, nil, true, false) + if err != nil { + user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err) + return } + } else { + portal.UpdateMatrixRoom(user, nil) + } - if len(portal.MXID) == 0 { - user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling") - err := portal.CreateMatrixRoom(user, nil, true, false) - if err != nil { - user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err) - continue + allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents) + + if len(allMsgs) > 0 { + user.log.Debugf("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents) + toBackfill := allMsgs[0:] + insertionEventIds := []id.EventID{} + for { + if len(toBackfill) == 0 { + break + } + + var msgs []*database.WrappedWebMessageInfo + if len(toBackfill) <= req.MaxBatchEvents { + msgs = toBackfill + toBackfill = toBackfill[0:0] + } else { + msgs = toBackfill[len(toBackfill)-req.MaxBatchEvents:] + 
toBackfill = toBackfill[:len(toBackfill)-req.MaxBatchEvents] + } + + if len(msgs) > 0 { + time.Sleep(time.Duration(req.BatchDelay) * time.Second) + user.log.Debugf("Backfilling %d messages in %s", len(msgs), portal.Key.JID) + insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...) } - } else { - portal.UpdateMatrixRoom(user, nil) } - - allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents) - - if len(allMsgs) > 0 { - user.log.Debugf("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents) - toBackfill := allMsgs[0:] - insertionEventIds := []id.EventID{} - for { - if len(toBackfill) == 0 { - break - } - - var msgs []*database.WrappedWebMessageInfo - if len(toBackfill) <= req.MaxBatchEvents { - msgs = toBackfill - toBackfill = toBackfill[0:0] - } else { - msgs = toBackfill[len(toBackfill)-req.MaxBatchEvents:] - toBackfill = toBackfill[:len(toBackfill)-req.MaxBatchEvents] - } - - if len(msgs) > 0 { - time.Sleep(time.Duration(req.BatchDelay) * time.Second) - user.log.Debugf("Backfilling %d messages in %s", len(msgs), portal.Key.JID) - insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...) - } - } - user.log.Debugf("Finished backfilling %d messages in %s", len(allMsgs), portal.Key.JID) - if len(insertionEventIds) > 0 { - portal.sendPostBackfillDummy( - time.Unix(int64(allMsgs[len(allMsgs)-1].Message.GetMessageTimestamp()), 0), - insertionEventIds[0]) - } - user.log.Debugf("Deleting %d history sync messages after backfilling", len(allMsgs)) - err := user.bridge.DB.HistorySyncQuery.DeleteMessages(allMsgs) - if err != nil { - user.log.Warnf("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err) - } - } else { - user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID) + user.log.Debugf("Finished backfilling %d messages in %s", len(allMsgs), portal.Key.JID) + if len(insertionEventIds) > 0 { + portal.sendPostBackfillDummy( + time.Unix(int64(allMsgs[len(allMsgs)-1].Message.GetMessageTimestamp()), 0), + insertionEventIds[0]) } - if !conv.MarkedAsUnread && conv.UnreadCount == 0 { - user.markSelfReadFull(portal) + user.log.Debugf("Deleting %d history sync messages after backfilling", len(allMsgs)) + err := user.bridge.DB.HistorySyncQuery.DeleteMessages(allMsgs) + if err != nil { + user.log.Warnf("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err) } + } else { + user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID) + } + if !conv.MarkedAsUnread && conv.UnreadCount == 0 { + user.markSelfReadFull(portal) } } @@ -298,9 +305,6 @@ var ( ) func (portal *Portal) backfill(source *User, messages []*database.WrappedWebMessageInfo) []id.EventID { - portal.backfillLock.Lock() - defer portal.backfillLock.Unlock() - var historyBatch, newBatch mautrix.ReqBatchSend var historyBatchInfos, newBatchInfos []*wrappedInfo From a8be4b11a88b3b6e37c64b217a30791b07fa1009 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Tue, 5 Apr 2022 12:28:58 -0600 Subject: [PATCH 09/16] historysync: start deferred backfill on non-full syncs Co-authored-by: Tulir Asokan --- historysync.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/historysync.go b/historysync.go index c290241..9565fd0 100644 --- a/historysync.go +++ b/historysync.go @@ -239,7 +239,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, 
evt *waProto.History // If this was the initial bootstrap, enqueue immediate backfills for the // most recent portals. If it's the last history sync event, start // backfilling the rest of the history of the portals. - if user.bridge.Config.Bridge.HistorySync.Backfill && (evt.GetSyncType() == waProto.HistorySync_INITIAL_BOOTSTRAP || evt.GetSyncType() == waProto.HistorySync_FULL) { + if user.bridge.Config.Bridge.HistorySync.Backfill && (evt.GetSyncType() == waProto.HistorySync_INITIAL_BOOTSTRAP || evt.GetSyncType() == waProto.HistorySync_FULL || evt.GetSyncType() == waProto.HistorySync_RECENT) { nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations) for i, conv := range nMostRecent { jid, err := types.ParseJID(conv.ConversationID) @@ -253,9 +253,11 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History case waProto.HistorySync_INITIAL_BOOTSTRAP: // Enqueue immediate backfills for the most recent messages first. user.EnqueueImmedateBackfill(portal, i) - case waProto.HistorySync_FULL: - // Enqueue deferred backfills as configured. - user.EnqueueDeferredBackfills(portal, len(nMostRecent), i) + case waProto.HistorySync_FULL, waProto.HistorySync_RECENT: + if evt.GetProgress() >= 99 { + // Enqueue deferred backfills as configured. + user.EnqueueDeferredBackfills(portal, len(nMostRecent), i) + } } } From 173edfaaf15071af49eed2f14cabba31dcdbfece Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Tue, 5 Apr 2022 12:29:43 -0600 Subject: [PATCH 10/16] batch send: fix plaintext content Co-authored-by: Tulir Asokan --- historysync.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/historysync.go b/historysync.go index 9565fd0..8800a39 100644 --- a/historysync.go +++ b/historysync.go @@ -532,6 +532,15 @@ func (portal *Portal) wrapBatchEvent(info *types.MessageInfo, intent *appservice if err != nil { return nil, err } + + if eventType == event.EventEncrypted { + // Clear other custom keys if the event was encrypted, but keep the double puppet identifier + wrappedContent.Raw = map[string]interface{}{backfillIDField: info.ID} + if intent.IsCustomPuppet { + wrappedContent.Raw[doublePuppetKey] = doublePuppetValue + } + } + return &event.Event{ Sender: intent.UserID, Type: newEventType, From 005fbb09f8e498a3e058b28ee511a65a09356d25 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Tue, 5 Apr 2022 13:12:57 -0600 Subject: [PATCH 11/16] backfill queue: don't delete, just mark as complete --- backfillqueue.go | 4 ++-- database/backfillqueue.go | 14 ++++++++------ .../upgrades/2022-03-15-prioritized-backfill.go | 1 + 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/backfillqueue.go b/backfillqueue.go index 8d0d9c5..fb3fe3f 100644 --- a/backfillqueue.go +++ b/backfillqueue.go @@ -41,7 +41,7 @@ func (bq *BackfillQueue) immediateBackfillLoop(user *User) { for { if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); backfill != nil { bq.ImmediateBackfillRequests <- backfill - backfill.Delete() + backfill.MarkDone() } else { select { case <-bq.ReCheckQueue: @@ -61,7 +61,7 @@ func (bq *BackfillQueue) deferredBackfillLoop(user *User) { if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil { bq.DeferredBackfillRequests <- backfill - backfill.Delete() + backfill.MarkDone() } else { time.Sleep(10 * time.Second) } diff --git a/database/backfillqueue.go b/database/backfillqueue.go index 7f012e3..b6c82a8 100644 --- 
a/database/backfillqueue.go +++ b/database/backfillqueue.go @@ -67,6 +67,7 @@ const ( FROM backfill_queue WHERE user_mxid=$1 AND type=$2 + AND completed_at IS NULL ORDER BY priority, queue_id LIMIT 1 ` @@ -106,6 +107,7 @@ type Backfill struct { MaxBatchEvents int MaxTotalEvents int BatchDelay int + CompletedAt *time.Time } func (b *Backfill) Scan(row Scannable) *Backfill { @@ -122,10 +124,10 @@ func (b *Backfill) Scan(row Scannable) *Backfill { func (b *Backfill) Insert() { rows, err := b.db.Query(` INSERT INTO backfill_queue - (user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + (user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay, completed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING queue_id - `, b.UserID, b.BackfillType, b.Priority, b.Portal.JID, b.Portal.Receiver, b.TimeStart, b.TimeEnd, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay) + `, b.UserID, b.BackfillType, b.Priority, b.Portal.JID, b.Portal.Receiver, b.TimeStart, b.TimeEnd, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay, b.CompletedAt) defer rows.Close() if err != nil || !rows.Next() { b.log.Warnfln("Failed to insert %v/%s with priority %d: %v", b.BackfillType, b.Portal.JID, b.Priority, err) @@ -137,13 +139,13 @@ func (b *Backfill) Insert() { } } -func (b *Backfill) Delete() { +func (b *Backfill) MarkDone() { if b.QueueID == 0 { b.log.Errorf("Cannot delete backfill without queue_id. Maybe it wasn't actually inserted in the database?") return } - _, err := b.db.Exec("DELETE FROM backfill_queue WHERE queue_id=$1", b.QueueID) + _, err := b.db.Exec("UPDATE backfill_queue SET completed_at=$1 WHERE queue_id=$2", time.Now(), b.QueueID) if err != nil { - b.log.Warnfln("Failed to delete %s/%s: %v", b.BackfillType, b.Priority, err) + b.log.Warnfln("Failed to mark %s/%s as complete: %v", b.BackfillType, b.Priority, err) } } diff --git a/database/upgrades/2022-03-15-prioritized-backfill.go b/database/upgrades/2022-03-15-prioritized-backfill.go index 96fc9cd..813e746 100644 --- a/database/upgrades/2022-03-15-prioritized-backfill.go +++ b/database/upgrades/2022-03-15-prioritized-backfill.go @@ -17,6 +17,7 @@ func init() { max_batch_events INTEGER NOT NULL, max_total_events INTEGER, batch_delay INTEGER, + completed_at TIMESTAMP, FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE From eb0a13a75360acc5c5c80002767caf4198794671 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Tue, 5 Apr 2022 13:13:20 -0600 Subject: [PATCH 12/16] historysync: use userID, conversationID, messageID as PK --- database/historysync.go | 43 ++++++++----------- .../upgrades/2022-03-18-historysync-store.go | 12 +++--- historysync.go | 14 +++--- 3 files changed, 32 insertions(+), 37 deletions(-) diff --git a/database/historysync.go b/database/historysync.go index 8532c5e..efbc784 100644 --- a/database/historysync.go +++ b/database/historysync.go @@ -20,7 +20,6 @@ import ( "database/sql" "errors" "fmt" - "strconv" "strings" "time" @@ -205,7 +204,7 @@ func (hsq *HistorySyncQuery) DeleteAllConversations(userID id.UserID) error { const ( getMessagesBetween = ` - SELECT id, data + SELECT data FROM history_sync_message WHERE user_mxid=$1 AND conversation_id=$2 @@ -215,7 +214,7 @@ const ( ` deleteMessages = ` DELETE FROM 
history_sync_message - WHERE id IN (%s) + WHERE %s ` ) @@ -223,19 +222,14 @@ type HistorySyncMessage struct { db *Database log log.Logger - ID int UserID id.UserID ConversationID string + MessageID string Timestamp time.Time Data []byte } -type WrappedWebMessageInfo struct { - ID int - Message *waProto.WebMessageInfo -} - -func (hsq *HistorySyncQuery) NewMessageWithValues(userID id.UserID, conversationID string, message *waProto.HistorySyncMsg) (*HistorySyncMessage, error) { +func (hsq *HistorySyncQuery) NewMessageWithValues(userID id.UserID, conversationID, messageID string, message *waProto.HistorySyncMsg) (*HistorySyncMessage, error) { msgData, err := proto.Marshal(message) if err != nil { return nil, err @@ -245,6 +239,7 @@ func (hsq *HistorySyncQuery) NewMessageWithValues(userID id.UserID, conversation log: hsq.log, UserID: userID, ConversationID: conversationID, + MessageID: messageID, Timestamp: time.Unix(int64(message.Message.GetMessageTimestamp()), 0), Data: msgData, }, nil @@ -252,15 +247,16 @@ func (hsq *HistorySyncQuery) NewMessageWithValues(userID id.UserID, conversation func (hsm *HistorySyncMessage) Insert() { _, err := hsm.db.Exec(` - INSERT INTO history_sync_message (user_mxid, conversation_id, timestamp, data) - VALUES ($1, $2, $3, $4) - `, hsm.UserID, hsm.ConversationID, hsm.Timestamp, hsm.Data) + INSERT INTO history_sync_message (user_mxid, conversation_id, message_id, timestamp, data) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (user_mxid, conversation_id, message_id) DO NOTHING + `, hsm.UserID, hsm.ConversationID, hsm.MessageID, hsm.Timestamp, hsm.Data) if err != nil { hsm.log.Warnfln("Failed to insert history sync message %s/%s: %v", hsm.ConversationID, hsm.Timestamp, err) } } -func (hsq *HistorySyncQuery) GetMessagesBetween(userID id.UserID, conversationID string, startTime, endTime *time.Time, limit int) (messages []*WrappedWebMessageInfo) { +func (hsq *HistorySyncQuery) GetMessagesBetween(userID id.UserID, conversationID string, startTime, endTime *time.Time, limit int) (messages []*waProto.WebMessageInfo) { whereClauses := "" args := []interface{}{userID, conversationID} argNum := 3 @@ -284,10 +280,10 @@ func (hsq *HistorySyncQuery) GetMessagesBetween(userID id.UserID, conversationID if err != nil || rows == nil { return nil } - var msgID int + var msgData []byte for rows.Next() { - err := rows.Scan(&msgID, &msgData) + err := rows.Scan(&msgData) if err != nil { hsq.log.Error("Database scan failed: %v", err) continue @@ -298,21 +294,20 @@ func (hsq *HistorySyncQuery) GetMessagesBetween(userID id.UserID, conversationID hsq.log.Errorf("Failed to unmarshal history sync message: %v", err) continue } - messages = append(messages, &WrappedWebMessageInfo{ - ID: msgID, - Message: historySyncMsg.Message, - }) + messages = append(messages, historySyncMsg.Message) } return } -func (hsq *HistorySyncQuery) DeleteMessages(messages []*WrappedWebMessageInfo) error { - messageIDs := make([]string, len(messages)) +func (hsq *HistorySyncQuery) DeleteMessages(userID id.UserID, conversationID string, messages []*waProto.WebMessageInfo) error { + whereClauses := []string{} + preparedStatementArgs := []interface{}{userID, conversationID} for i, msg := range messages { - messageIDs[i] = strconv.Itoa(msg.ID) + whereClauses = append(whereClauses, fmt.Sprintf("(user_mxid=$1 AND conversation_id=$2 AND message_id=$%d)", i+3)) + preparedStatementArgs = append(preparedStatementArgs, msg.GetKey().GetId()) } - _, err := hsq.db.Exec(fmt.Sprintf(deleteMessages, strings.Join(messageIDs, ","))) + _, 
err := hsq.db.Exec(fmt.Sprintf(deleteMessages, strings.Join(whereClauses, " OR ")), preparedStatementArgs...) return err } diff --git a/database/upgrades/2022-03-18-historysync-store.go b/database/upgrades/2022-03-18-historysync-store.go index 5597afb..15564d0 100644 --- a/database/upgrades/2022-03-18-historysync-store.go +++ b/database/upgrades/2022-03-18-historysync-store.go @@ -24,7 +24,6 @@ func init() { unread_count INTEGER, PRIMARY KEY (user_mxid, conversation_id), - UNIQUE (conversation_id), FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ON UPDATE CASCADE ) @@ -34,14 +33,15 @@ func init() { } _, err = tx.Exec(` CREATE TABLE history_sync_message ( - id SERIAL PRIMARY KEY, user_mxid TEXT, conversation_id TEXT, + message_id TEXT, timestamp TIMESTAMP, data BYTEA, + PRIMARY KEY (user_mxid, conversation_id, message_id), FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, - FOREIGN KEY (conversation_id) REFERENCES history_sync_conversation(conversation_id) ON DELETE CASCADE + FOREIGN KEY (user_mxid, conversation_id) REFERENCES history_sync_conversation(user_mxid, conversation_id) ON DELETE CASCADE ) `) if err != nil { @@ -65,7 +65,6 @@ func init() { unread_count INTEGER, PRIMARY KEY (user_mxid, conversation_id), - UNIQUE (conversation_id), FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ON UPDATE CASCADE ) @@ -75,14 +74,15 @@ func init() { } _, err = tx.Exec(` CREATE TABLE history_sync_message ( - id INTEGER PRIMARY KEY, user_mxid TEXT, conversation_id TEXT, + message_id TEXT, timestamp DATETIME, data BLOB, + PRIMARY KEY (user_mxid, conversation_id, message_id), FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, - FOREIGN KEY (conversation_id) REFERENCES history_sync_conversation(conversation_id) ON DELETE CASCADE + FOREIGN KEY (user_mxid, conversation_id) REFERENCES history_sync_conversation(user_mxid, conversation_id) ON DELETE CASCADE ) `) if err != nil { diff --git a/historysync.go b/historysync.go index 8800a39..8790b9c 100644 --- a/historysync.go +++ b/historysync.go @@ -134,7 +134,7 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill break } - var msgs []*database.WrappedWebMessageInfo + var msgs []*waProto.WebMessageInfo if len(toBackfill) <= req.MaxBatchEvents { msgs = toBackfill toBackfill = toBackfill[0:0] @@ -152,11 +152,11 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill user.log.Debugf("Finished backfilling %d messages in %s", len(allMsgs), portal.Key.JID) if len(insertionEventIds) > 0 { portal.sendPostBackfillDummy( - time.Unix(int64(allMsgs[len(allMsgs)-1].Message.GetMessageTimestamp()), 0), + time.Unix(int64(allMsgs[len(allMsgs)-1].GetMessageTimestamp()), 0), insertionEventIds[0]) } user.log.Debugf("Deleting %d history sync messages after backfilling", len(allMsgs)) - err := user.bridge.DB.HistorySyncQuery.DeleteMessages(allMsgs) + err := user.bridge.DB.HistorySyncQuery.DeleteMessages(user.MXID, conv.ConversationID, allMsgs) if err != nil { user.log.Warnf("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err) } @@ -227,7 +227,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History continue } - message, err := 
user.bridge.DB.HistorySyncQuery.NewMessageWithValues(user.MXID, conv.GetId(), msg) + message, err := user.bridge.DB.HistorySyncQuery.NewMessageWithValues(user.MXID, conv.GetId(), msg.Message.GetKey().GetId(), msg) if err != nil { user.log.Warnf("Failed to save message %s in %s. Error: %+v", msg.Message.Key.Id, conv.GetId(), err) continue @@ -306,11 +306,11 @@ var ( MSC2716Marker = event.Type{Type: "org.matrix.msc2716.marker", Class: event.MessageEventType} ) -func (portal *Portal) backfill(source *User, messages []*database.WrappedWebMessageInfo) []id.EventID { +func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) []id.EventID { var historyBatch, newBatch mautrix.ReqBatchSend var historyBatchInfos, newBatchInfos []*wrappedInfo - firstMsgTimestamp := time.Unix(int64(messages[len(messages)-1].Message.GetMessageTimestamp()), 0) + firstMsgTimestamp := time.Unix(int64(messages[len(messages)-1].GetMessageTimestamp()), 0) historyBatch.StateEventsAtStart = make([]*event.Event, 0) newBatch.StateEventsAtStart = make([]*event.Event, 0) @@ -365,7 +365,7 @@ func (portal *Portal) backfill(source *User, messages []*database.WrappedWebMess portal.log.Infofln("Processing history sync with %d messages", len(messages)) // The messages are ordered newest to oldest, so iterate them in reverse order. for i := len(messages) - 1; i >= 0; i-- { - webMsg := messages[i].Message + webMsg := messages[i] msgType := getMessageType(webMsg.GetMessage()) if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" { if msgType != "ignore" { From 83d397900f133d20671395e9fb87fc5bdd5962cf Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Tue, 5 Apr 2022 15:35:02 -0600 Subject: [PATCH 13/16] media: add better error when media couldn't be retrieved due to WA deleting it from their servers --- portal.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/portal.go b/portal.go index c98efda..d905e93 100644 --- a/portal.go +++ b/portal.go @@ -1900,7 +1900,7 @@ func shallowCopyMap(data map[string]interface{}) map[string]interface{} { return newMap } -func (portal *Portal) makeMediaBridgeFailureMessage(info *types.MessageInfo, bridgeErr error, converted *ConvertedMessage, keys *FailedMediaKeys) *ConvertedMessage { +func (portal *Portal) makeMediaBridgeFailureMessage(info *types.MessageInfo, bridgeErr error, converted *ConvertedMessage, keys *FailedMediaKeys, userFriendlyError string) *ConvertedMessage { portal.log.Errorfln("Failed to bridge media for %s: %v", info.ID, bridgeErr) if keys != nil { meta := &FailedMediaMeta{ @@ -1913,9 +1913,13 @@ func (portal *Portal) makeMediaBridgeFailureMessage(info *types.MessageInfo, bri portal.mediaErrorCache[info.ID] = meta } converted.Type = event.EventMessage + body := userFriendlyError + if body == "" { + body = fmt.Sprintf("Failed to bridge media: %v", bridgeErr) + } converted.Content = &event.MessageEventContent{ MsgType: event.MsgNotice, - Body: fmt.Sprintf("Failed to bridge media: %v", bridgeErr), + Body: body, } return converted } @@ -2164,24 +2168,24 @@ func (portal *Portal) convertMediaMessage(intent *appservice.IntentAPI, source * Type: whatsmeow.GetMediaType(msg), SHA256: msg.GetFileSha256(), EncSHA256: msg.GetFileEncSha256(), - }) + }, "Old photo or attachment. 
This will sync in a future update.") } else if errors.Is(err, whatsmeow.ErrNoURLPresent) { portal.log.Debugfln("No URL present error for media message %s, ignoring...", info.ID) return nil } else if errors.Is(err, whatsmeow.ErrFileLengthMismatch) || errors.Is(err, whatsmeow.ErrInvalidMediaSHA256) { portal.log.Warnfln("Mismatching media checksums in %s: %v. Ignoring because WhatsApp seems to ignore them too", info.ID, err) } else if err != nil { - return portal.makeMediaBridgeFailureMessage(info, err, converted, nil) + return portal.makeMediaBridgeFailureMessage(info, err, converted, nil, "") } err = portal.uploadMedia(intent, data, converted.Content) if err != nil { if errors.Is(err, mautrix.MTooLarge) { - return portal.makeMediaBridgeFailureMessage(info, errors.New("homeserver rejected too large file"), converted, nil) + return portal.makeMediaBridgeFailureMessage(info, errors.New("homeserver rejected too large file"), converted, nil, "") } else if httpErr, ok := err.(mautrix.HTTPError); ok && httpErr.IsStatus(413) { - return portal.makeMediaBridgeFailureMessage(info, errors.New("proxy rejected too large file"), converted, nil) + return portal.makeMediaBridgeFailureMessage(info, errors.New("proxy rejected too large file"), converted, nil, "") } else { - return portal.makeMediaBridgeFailureMessage(info, fmt.Errorf("failed to upload media: %w", err), converted, nil) + return portal.makeMediaBridgeFailureMessage(info, fmt.Errorf("failed to upload media: %w", err), converted, nil, "") } } return converted From c664e5f107d6bec3908fe2175922b4ee130e2b87 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Wed, 6 Apr 2022 08:45:45 -0600 Subject: [PATCH 14/16] Updates from CR Co-authored-by: Tulir Asokan --- commands.go | 8 +- .../2022-03-15-prioritized-backfill.go | 44 +++---- .../upgrades/2022-03-18-historysync-store.go | 117 ++++++------------ provisioning.go | 3 - user.go | 12 ++ 5 files changed, 67 insertions(+), 117 deletions(-) diff --git a/commands.go b/commands.go index 4c3f7bc..a7bdebb 100644 --- a/commands.go +++ b/commands.go @@ -606,9 +606,6 @@ func (handler *CommandHandler) CommandLogout(ce *CommandEvent) { ce.User.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut}) ce.User.DeleteConnection() ce.User.DeleteSession() - ce.Bridge.DB.BackfillQuery.DeleteAll(ce.User.MXID) - ce.Bridge.DB.HistorySyncQuery.DeleteAllConversations(ce.User.MXID) - ce.Bridge.DB.HistorySyncQuery.DeleteAllMessages(ce.User.MXID) ce.Reply("Logged out successfully.") } @@ -850,10 +847,7 @@ func (handler *CommandHandler) CommandBackfill(ce *CommandEvent) { return } if !ce.Bridge.Config.Bridge.HistorySync.Backfill { - ce.Bot.SendMessageEvent(ce.RoomID, event.EventMessage, &event.MessageEventContent{ - MsgType: event.MsgNotice, - Body: "Backfill is not enabled for this bridge.", - }) + ce.Reply("Backfill is not enabled for this bridge.") return } batchSize := 100 diff --git a/database/upgrades/2022-03-15-prioritized-backfill.go b/database/upgrades/2022-03-15-prioritized-backfill.go index 813e746..20a513e 100644 --- a/database/upgrades/2022-03-15-prioritized-backfill.go +++ b/database/upgrades/2022-03-15-prioritized-backfill.go @@ -1,12 +1,25 @@ package upgrades -import "database/sql" +import ( + "database/sql" + "fmt" +) func init() { upgrades[39] = upgrade{"Add backfill queue", func(tx *sql.Tx, ctx context) error { - _, err := tx.Exec(` + // The queue_id needs to auto-increment every insertion. For SQLite, + // INTEGER PRIMARY KEY is an alias for the ROWID, so it will + // auto-increment. 
See https://sqlite.org/lang_createtable.html#rowid + // For Postgres, we need to add GENERATED ALWAYS AS IDENTITY for the + // same functionality. + queueIDColumnTypeModifier := "" + if ctx.dialect == Postgres { + queueIDColumnTypeModifier = "GENERATED ALWAYS AS IDENTITY" + } + + _, err := tx.Exec(fmt.Sprintf(` CREATE TABLE backfill_queue ( - queue_id INTEGER PRIMARY KEY, + queue_id INTEGER PRIMARY KEY %s, user_mxid TEXT, type INTEGER NOT NULL, priority INTEGER NOT NULL, @@ -22,34 +35,11 @@ func init() { FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ) - `) + `, queueIDColumnTypeModifier)) if err != nil { return err } - // The queue_id needs to auto-increment every insertion. For SQLite, - // INTEGER PRIMARY KEY is an alias for the ROWID, so it will - // auto-increment. See https://sqlite.org/lang_createtable.html#rowid - // For Postgres, we have to manually add the sequence. - if ctx.dialect == Postgres { - _, err = tx.Exec(` - CREATE SEQUENCE backfill_queue_queue_id_seq - START WITH 1 - OWNED BY backfill_queue.queue_id - `) - if err != nil { - return err - } - _, err = tx.Exec(` - ALTER TABLE backfill_queue - ALTER COLUMN queue_id - SET DEFAULT nextval('backfill_queue_queue_id_seq'::regclass) - `) - if err != nil { - return err - } - } - return err }} } diff --git a/database/upgrades/2022-03-18-historysync-store.go b/database/upgrades/2022-03-18-historysync-store.go index 15564d0..3625069 100644 --- a/database/upgrades/2022-03-18-historysync-store.go +++ b/database/upgrades/2022-03-18-historysync-store.go @@ -6,88 +6,45 @@ import ( func init() { upgrades[40] = upgrade{"Store history syncs for later backfills", func(tx *sql.Tx, ctx context) error { - if ctx.dialect == Postgres { - _, err := tx.Exec(` - CREATE TABLE history_sync_conversation ( - user_mxid TEXT, - conversation_id TEXT, - portal_jid TEXT, - portal_receiver TEXT, - last_message_timestamp TIMESTAMP, - archived BOOLEAN, - pinned INTEGER, - mute_end_time TIMESTAMP, - disappearing_mode INTEGER, - end_of_history_transfer_type INTEGER, - ephemeral_expiration INTEGER, - marked_as_unread BOOLEAN, - unread_count INTEGER, + _, err := tx.Exec(` + CREATE TABLE history_sync_conversation ( + user_mxid TEXT, + conversation_id TEXT, + portal_jid TEXT, + portal_receiver TEXT, + last_message_timestamp TIMESTAMP, + archived BOOLEAN, + pinned INTEGER, + mute_end_time TIMESTAMP, + disappearing_mode INTEGER, + end_of_history_transfer_type INTEGER, + ephemeral_expiration INTEGER, + marked_as_unread BOOLEAN, + unread_count INTEGER, - PRIMARY KEY (user_mxid, conversation_id), - FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, - FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ON UPDATE CASCADE - ) - `) - if err != nil { - return err - } - _, err = tx.Exec(` - CREATE TABLE history_sync_message ( - user_mxid TEXT, - conversation_id TEXT, - message_id TEXT, - timestamp TIMESTAMP, - data BYTEA, + PRIMARY KEY (user_mxid, conversation_id), + FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ON UPDATE CASCADE + ) + `) + if err != nil { + return err + } + _, err = tx.Exec(` + CREATE TABLE history_sync_message ( + user_mxid TEXT, + conversation_id TEXT, + message_id TEXT, + timestamp TIMESTAMP, + data BYTEA, - PRIMARY KEY 
(user_mxid, conversation_id, message_id), - FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, - FOREIGN KEY (user_mxid, conversation_id) REFERENCES history_sync_conversation(user_mxid, conversation_id) ON DELETE CASCADE - ) - `) - if err != nil { - return err - } - } else if ctx.dialect == SQLite { - _, err := tx.Exec(` - CREATE TABLE history_sync_conversation ( - user_mxid TEXT, - conversation_id TEXT, - portal_jid TEXT, - portal_receiver TEXT, - last_message_timestamp DATETIME, - archived INTEGER, - pinned INTEGER, - mute_end_time DATETIME, - disappearing_mode INTEGER, - end_of_history_transfer_type INTEGER, - ephemeral_expiration INTEGER, - marked_as_unread INTEGER, - unread_count INTEGER, - - PRIMARY KEY (user_mxid, conversation_id), - FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, - FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ON UPDATE CASCADE - ) - `) - if err != nil { - return err - } - _, err = tx.Exec(` - CREATE TABLE history_sync_message ( - user_mxid TEXT, - conversation_id TEXT, - message_id TEXT, - timestamp DATETIME, - data BLOB, - - PRIMARY KEY (user_mxid, conversation_id, message_id), - FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, - FOREIGN KEY (user_mxid, conversation_id) REFERENCES history_sync_conversation(user_mxid, conversation_id) ON DELETE CASCADE - ) - `) - if err != nil { - return err - } + PRIMARY KEY (user_mxid, conversation_id, message_id), + FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (user_mxid, conversation_id) REFERENCES history_sync_conversation(user_mxid, conversation_id) ON DELETE CASCADE + ) + `) + if err != nil { + return err } return nil diff --git a/provisioning.go b/provisioning.go index f149240..114e7cc 100644 --- a/provisioning.go +++ b/provisioning.go @@ -436,9 +436,6 @@ func (prov *ProvisioningAPI) Logout(w http.ResponseWriter, r *http.Request) { user.bridge.Metrics.TrackConnectionState(user.JID, false) user.removeFromJIDMap(BridgeState{StateEvent: StateLoggedOut}) user.DeleteSession() - prov.bridge.DB.BackfillQuery.DeleteAll(user.MXID) - prov.bridge.DB.HistorySyncQuery.DeleteAllConversations(user.MXID) - prov.bridge.DB.HistorySyncQuery.DeleteAllMessages(user.MXID) jsonResponse(w, http.StatusOK, Response{true, "Logged out successfully."}) } diff --git a/user.go b/user.go index db2575d..854535d 100644 --- a/user.go +++ b/user.go @@ -412,6 +412,11 @@ func (user *User) DeleteSession() { user.JID = types.EmptyJID user.Update() } + + // Delete all of the backfill and history sync data. 
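+	// Doing this in DeleteSession covers both the logout command and the
+	// provisioning API logout, which previously deleted these rows themselves.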
+ user.bridge.DB.BackfillQuery.DeleteAll(user.MXID) + user.bridge.DB.HistorySyncQuery.DeleteAllConversations(user.MXID) + user.bridge.DB.HistorySyncQuery.DeleteAllMessages(user.MXID) } func (user *User) IsConnected() bool { @@ -563,11 +568,18 @@ func (user *User) HandleEvent(event interface{}) { go user.tryAutomaticDoublePuppeting() case *events.OfflineSyncPreview: user.log.Infofln("Server says it's going to send %d messages and %d receipts that were missed during downtime", v.Messages, v.Receipts) + go user.sendBridgeState(BridgeState{ + StateEvent: StateBackfilling, + Message: fmt.Sprintf("backfilling %d messages and %d receipts", v.Messages, v.Receipts), + }) case *events.OfflineSyncCompleted: if !user.PhoneRecentlySeen(true) { user.log.Infofln("Offline sync completed, but phone last seen date is still %s - sending phone offline bridge status", user.PhoneLastSeen) go user.sendBridgeState(BridgeState{StateEvent: StateTransientDisconnect, Error: WAPhoneOffline}) } else { + if user.GetPrevBridgeState().StateEvent == StateBackfilling { + user.log.Infoln("Offline sync completed") + } go user.sendBridgeState(BridgeState{StateEvent: StateConnected}) } case *events.AppStateSyncComplete: From 78c6d5729980dc302960515cb63e24dcabdc00e0 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Wed, 6 Apr 2022 09:00:48 -0600 Subject: [PATCH 15/16] historysync: only save when backfill is enabled --- historysync.go | 46 ++++++++++++++++++++++++---------------------- user.go | 8 ++++++-- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/historysync.go b/historysync.go index 8790b9c..31978fe 100644 --- a/historysync.go +++ b/historysync.go @@ -43,30 +43,32 @@ type wrappedInfo struct { } func (user *User) handleHistorySyncsLoop() { - reCheckQueue := make(chan bool, 1) - if user.bridge.Config.Bridge.HistorySync.Backfill { - // Start the backfill queue. - user.BackfillQueue = &BackfillQueue{ - BackfillQuery: user.bridge.DB.BackfillQuery, - ImmediateBackfillRequests: make(chan *database.Backfill, 1), - DeferredBackfillRequests: make(chan *database.Backfill, 1), - ReCheckQueue: make(chan bool, 1), - log: user.log.Sub("BackfillQueue"), - } - reCheckQueue = user.BackfillQueue.ReCheckQueue - - // Immediate backfills can be done in parallel - for i := 0; i < user.bridge.Config.Bridge.HistorySync.Immediate.WorkerCount; i++ { - go user.handleBackfillRequestsLoop(user.BackfillQueue.ImmediateBackfillRequests) - } - - // Deferred backfills should be handled synchronously so as not to - // overload the homeserver. Users can configure their backfill stages - // to be more or less aggressive with backfilling at this stage. - go user.handleBackfillRequestsLoop(user.BackfillQueue.DeferredBackfillRequests) - go user.BackfillQueue.RunLoops(user) + if !user.bridge.Config.Bridge.HistorySync.Backfill { + return } + reCheckQueue := make(chan bool, 1) + // Start the backfill queue. 
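The setup below fans immediate requests out to HistorySync.Immediate.WorkerCount goroutines while deferred requests get a single consumer. A self-contained toy version of that fan-out pattern (all names and counts here are illustrative, not taken from the bridge):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        requests := make(chan int, 1)
        var wg sync.WaitGroup
        const workerCount = 3 // stands in for HistorySync.Immediate.WorkerCount
        for w := 0; w < workerCount; w++ {
            wg.Add(1)
            go func(worker int) {
                defer wg.Done()
                for req := range requests {
                    fmt.Printf("worker %d handling backfill request %d\n", worker, req)
                }
            }(w)
        }
        for req := 1; req <= 6; req++ {
            requests <- req
        }
        close(requests)
        wg.Wait()
    }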
+ user.BackfillQueue = &BackfillQueue{ + BackfillQuery: user.bridge.DB.BackfillQuery, + ImmediateBackfillRequests: make(chan *database.Backfill, 1), + DeferredBackfillRequests: make(chan *database.Backfill, 1), + ReCheckQueue: make(chan bool, 1), + log: user.log.Sub("BackfillQueue"), + } + reCheckQueue = user.BackfillQueue.ReCheckQueue + + // Immediate backfills can be done in parallel + for i := 0; i < user.bridge.Config.Bridge.HistorySync.Immediate.WorkerCount; i++ { + go user.handleBackfillRequestsLoop(user.BackfillQueue.ImmediateBackfillRequests) + } + + // Deferred backfills should be handled synchronously so as not to + // overload the homeserver. Users can configure their backfill stages + // to be more or less aggressive with backfilling at this stage. + go user.handleBackfillRequestsLoop(user.BackfillQueue.DeferredBackfillRequests) + go user.BackfillQueue.RunLoops(user) + // Always save the history syncs for the user. If they want to enable // backfilling in the future, we will have it in the database. for evt := range user.historySyncs { diff --git a/user.go b/user.go index 854535d..46dbefa 100644 --- a/user.go +++ b/user.go @@ -188,7 +188,9 @@ func (bridge *Bridge) NewUser(dbUser *database.User) *User { user.RelayWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelayWhitelisted(user.MXID) user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID) user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID) - go user.handleHistorySyncsLoop() + if user.bridge.Config.Bridge.HistorySync.Backfill { + go user.handleHistorySyncsLoop() + } return user } @@ -692,7 +694,9 @@ func (user *User) HandleEvent(event interface{}) { portal := user.GetPortalByMessageSource(v.Info.MessageSource) portal.messages <- PortalMessage{undecryptable: v, source: user} case *events.HistorySync: - user.historySyncs <- v + if user.bridge.Config.Bridge.HistorySync.Backfill { + user.historySyncs <- v + } case *events.Mute: portal := user.GetPortalByJID(v.JID) if portal != nil { From bd26fc4af994cdf6fdaf6c4a8fe87e2ae294026e Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 7 Apr 2022 09:20:12 -0600 Subject: [PATCH 16/16] Minor cleanups from CR Co-authored-by: Tulir Asokan --- database/message.go | 1 - database/upgrades/2022-03-15-prioritized-backfill.go | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/database/message.go b/database/message.go index ad90cba..e782d9f 100644 --- a/database/message.go +++ b/database/message.go @@ -143,7 +143,6 @@ type Message struct { db *Database log log.Logger - ID int Chat PortalKey JID types.MessageID MXID id.EventID diff --git a/database/upgrades/2022-03-15-prioritized-backfill.go b/database/upgrades/2022-03-15-prioritized-backfill.go index 20a513e..c7a3892 100644 --- a/database/upgrades/2022-03-15-prioritized-backfill.go +++ b/database/upgrades/2022-03-15-prioritized-backfill.go @@ -23,8 +23,8 @@ func init() { user_mxid TEXT, type INTEGER NOT NULL, priority INTEGER NOT NULL, - portal_jid VARCHAR(255), - portal_receiver VARCHAR(255), + portal_jid TEXT, + portal_receiver TEXT, time_start TIMESTAMP, time_end TIMESTAMP, max_batch_events INTEGER NOT NULL,