diff --git a/backfillqueue.go b/backfillqueue.go
new file mode 100644
index 0000000..fb3fe3f
--- /dev/null
+++ b/backfillqueue.go
@@ -0,0 +1,69 @@
+// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
+// Copyright (C) 2021 Tulir Asokan, Sumner Evans
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+package main
+
+import (
+ "time"
+
+ log "maunium.net/go/maulogger/v2"
+ "maunium.net/go/mautrix-whatsapp/database"
+)
+
+// BackfillQueue dispatches queued backfills from the database to the
+// immediate and deferred request channels consumed by backfill workers.
+type BackfillQueue struct {
+	BackfillQuery             *database.BackfillQuery
+	ImmediateBackfillRequests chan *database.Backfill
+	DeferredBackfillRequests  chan *database.Backfill
+	// ReCheckQueue is signaled when a new backfill is inserted so the
+	// immediate loop wakes up without waiting for its next poll.
+	ReCheckQueue chan bool
+
+	log log.Logger
+}
+
+// RunLoops starts both dispatch loops for the given user: the immediate loop
+// runs in its own goroutine, while the deferred loop runs on the caller's
+// goroutine, so this call blocks.
+func (bq *BackfillQueue) RunLoops(user *User) {
+	go bq.immediateBackfillLoop(user)
+	bq.deferredBackfillLoop(user)
+}
+
+// immediateBackfillLoop continuously feeds immediate backfill requests from
+// the database into ImmediateBackfillRequests. When the queue is empty, it
+// waits for a re-check signal or polls again after 10 seconds.
+func (bq *BackfillQueue) immediateBackfillLoop(user *User) {
+	for {
+		if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); backfill != nil {
+			// The channel send blocks until a worker accepts the item; the
+			// backfill is marked done as soon as it has been handed off.
+			bq.ImmediateBackfillRequests <- backfill
+			backfill.MarkDone()
+		} else {
+			select {
+			case <-bq.ReCheckQueue:
+			case <-time.After(10 * time.Second):
+			}
+		}
+	}
+}
+
+// deferredBackfillLoop feeds deferred backfill requests into
+// DeferredBackfillRequests, but only while there are no pending immediate
+// backfills for the user. It polls every 10 seconds.
+func (bq *BackfillQueue) deferredBackfillLoop(user *User) {
+	for {
+		// Finish all immediate backfills before doing the deferred ones.
+		if immediate := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); immediate != nil {
+			time.Sleep(10 * time.Second)
+			continue
+		}
+
+		if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil {
+			bq.DeferredBackfillRequests <- backfill
+			backfill.MarkDone()
+		} else {
+			time.Sleep(10 * time.Second)
+		}
+	}
+}
diff --git a/commands.go b/commands.go
index 6d9099d..a7bdebb 100644
--- a/commands.go
+++ b/commands.go
@@ -31,6 +31,7 @@ import (
"github.com/tidwall/gjson"
"maunium.net/go/maulogger/v2"
+ "maunium.net/go/mautrix-whatsapp/database"
"go.mau.fi/whatsmeow"
"go.mau.fi/whatsmeow/appstate"
@@ -140,7 +141,7 @@ func (handler *CommandHandler) CommandMux(ce *CommandEvent) {
handler.CommandLogout(ce)
case "toggle":
handler.CommandToggle(ce)
- case "set-relay", "unset-relay", "login-matrix", "sync", "list", "search", "open", "pm", "invite-link", "resolve", "resolve-link", "join", "create", "accept":
+ case "set-relay", "unset-relay", "login-matrix", "sync", "list", "search", "open", "pm", "invite-link", "resolve", "resolve-link", "join", "create", "accept", "backfill":
if !ce.User.HasSession() {
ce.Reply("You are not logged in. Use the `login` command to log into WhatsApp.")
return
@@ -176,6 +177,8 @@ func (handler *CommandHandler) CommandMux(ce *CommandEvent) {
handler.CommandCreate(ce)
case "accept":
handler.CommandAccept(ce)
+ case "backfill":
+ handler.CommandBackfill(ce)
}
default:
ce.Reply("Unknown command, use the `help` command for help.")
@@ -745,6 +748,7 @@ func (handler *CommandHandler) CommandHelp(ce *CommandEvent) {
cmdPrefix + cmdSetPowerLevelHelp,
cmdPrefix + cmdDeletePortalHelp,
cmdPrefix + cmdDeleteAllPortalsHelp,
+ cmdPrefix + cmdBackfillHelp,
}, "\n* "))
}
@@ -835,6 +839,40 @@ func (handler *CommandHandler) CommandDeleteAllPortals(ce *CommandEvent) {
}()
}
+const cmdBackfillHelp = `backfill [batch size] [batch delay] - Backfill all messages in the portal.`
+
+// CommandBackfill handles the `backfill` command: it enqueues an immediate
+// backfill of the current portal with the given batch size (default 100) and
+// per-batch delay in seconds (default 5), then wakes the backfill queue.
+func (handler *CommandHandler) CommandBackfill(ce *CommandEvent) {
+	if ce.Portal == nil {
+		ce.Reply("This is not a portal room")
+		return
+	}
+	if !ce.Bridge.Config.Bridge.HistorySync.Backfill {
+		ce.Reply("Backfill is not enabled for this bridge.")
+		return
+	}
+	batchSize := 100
+	batchDelay := 5
+	if len(ce.Args) >= 1 {
+		var err error
+		batchSize, err = strconv.Atoi(ce.Args[0])
+		if err != nil || batchSize < 1 {
+			ce.Reply("\"%s\" isn't a valid batch size", ce.Args[0])
+			return
+		}
+	}
+	if len(ce.Args) >= 2 {
+		var err error
+		// Parse and validate the second argument (the original code reparsed
+		// Args[0] and re-checked batchSize here).
+		batchDelay, err = strconv.Atoi(ce.Args[1])
+		if err != nil || batchDelay < 0 {
+			ce.Reply("\"%s\" isn't a valid batch delay", ce.Args[1])
+			return
+		}
+	}
+	backfill := ce.Portal.bridge.DB.BackfillQuery.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay)
+	backfill.Insert()
+	ce.User.BackfillQueue.ReCheckQueue <- true
+}
+
const cmdListHelp = `list [page] [items per page] - Get a list of all contacts and groups.`
func matchesQuery(str string, query string) bool {
@@ -1015,7 +1053,7 @@ func (handler *CommandHandler) CommandOpen(ce *CommandEvent) {
portal.UpdateMatrixRoom(ce.User, info)
ce.Reply("Portal room synced.")
} else {
- err = portal.CreateMatrixRoom(ce.User, info, true)
+ err = portal.CreateMatrixRoom(ce.User, info, true, true)
if err != nil {
ce.Reply("Failed to create room: %v", err)
} else {
diff --git a/config/bridge.go b/config/bridge.go
index 7ffe458..840472d 100644
--- a/config/bridge.go
+++ b/config/bridge.go
@@ -28,6 +28,12 @@ import (
"maunium.net/go/mautrix/id"
)
+// DeferredConfig is one stage of deferred backfill: how many days back the
+// stage starts, how many events are sent per batch, and how many seconds to
+// wait between batches.
+type DeferredConfig struct {
+	StartDaysAgo   int `yaml:"start_days_ago"`
+	MaxBatchEvents int `yaml:"max_batch_events"`
+	BatchDelay     int `yaml:"batch_delay"`
+}
+
type BridgeConfig struct {
UsernameTemplate string `yaml:"username_template"`
DisplaynameTemplate string `yaml:"displayname_template"`
@@ -40,11 +46,18 @@ type BridgeConfig struct {
IdentityChangeNotices bool `yaml:"identity_change_notices"`
HistorySync struct {
- CreatePortals bool `yaml:"create_portals"`
- MaxAge int64 `yaml:"max_age"`
- Backfill bool `yaml:"backfill"`
- DoublePuppetBackfill bool `yaml:"double_puppet_backfill"`
- RequestFullSync bool `yaml:"request_full_sync"`
+ CreatePortals bool `yaml:"create_portals"`
+ Backfill bool `yaml:"backfill"`
+ DoublePuppetBackfill bool `yaml:"double_puppet_backfill"`
+ RequestFullSync bool `yaml:"request_full_sync"`
+ MaxInitialConversations int `yaml:"max_initial_conversations"`
+
+ Immediate struct {
+ WorkerCount int `yaml:"worker_count"`
+ MaxEvents int `yaml:"max_events"`
+ } `yaml:"immediate"`
+
+ Deferred []DeferredConfig `yaml:"deferred"`
} `yaml:"history_sync"`
UserAvatarSync bool `yaml:"user_avatar_sync"`
BridgeMatrixLeave bool `yaml:"bridge_matrix_leave"`
diff --git a/config/upgrade.go b/config/upgrade.go
index 123ea30..fdf06a0 100644
--- a/config/upgrade.go
+++ b/config/upgrade.go
@@ -78,10 +78,13 @@ func (helper *UpgradeHelper) doUpgrade() {
helper.Copy(Bool, "bridge", "call_start_notices")
helper.Copy(Bool, "bridge", "identity_change_notices")
helper.Copy(Bool, "bridge", "history_sync", "create_portals")
- helper.Copy(Int, "bridge", "history_sync", "max_age")
helper.Copy(Bool, "bridge", "history_sync", "backfill")
helper.Copy(Bool, "bridge", "history_sync", "double_puppet_backfill")
helper.Copy(Bool, "bridge", "history_sync", "request_full_sync")
+ helper.Copy(Int, "bridge", "history_sync", "max_initial_conversations")
+ helper.Copy(Int, "bridge", "history_sync", "immediate", "worker_count")
+ helper.Copy(Int, "bridge", "history_sync", "immediate", "max_events")
+ helper.Copy(List, "bridge", "history_sync", "deferred")
helper.Copy(Bool, "bridge", "user_avatar_sync")
helper.Copy(Bool, "bridge", "bridge_matrix_leave")
helper.Copy(Bool, "bridge", "sync_with_custom_puppets")
diff --git a/database/backfillqueue.go b/database/backfillqueue.go
new file mode 100644
index 0000000..b6c82a8
--- /dev/null
+++ b/database/backfillqueue.go
@@ -0,0 +1,151 @@
+// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
+// Copyright (C) 2021 Tulir Asokan, Sumner Evans
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+package database
+
+import (
+ "database/sql"
+ "errors"
+ "time"
+
+ log "maunium.net/go/maulogger/v2"
+ "maunium.net/go/mautrix/id"
+)
+
+// BackfillType describes the priority class of a queued backfill.
+type BackfillType int
+
+const (
+	// Use iota so both constants are typed as BackfillType; previously the
+	// second constant was an untyped int.
+	BackfillImmediate BackfillType = iota
+	BackfillDeferred
+)
+
+// BackfillQuery provides access to the backfill_queue table.
+type BackfillQuery struct {
+	db  *Database
+	log log.Logger
+}
+
+// New creates an empty Backfill bound to this query's database connection.
+func (bq *BackfillQuery) New() *Backfill {
+	return &Backfill{
+		db:     bq.db,
+		log:    bq.log,
+		Portal: &PortalKey{},
+	}
+}
+
+// NewWithValues creates a Backfill with all queue fields populated. The
+// returned value is not persisted until Insert is called.
+func (bq *BackfillQuery) NewWithValues(userID id.UserID, backfillType BackfillType, priority int, portal *PortalKey, timeStart *time.Time, timeEnd *time.Time, maxBatchEvents, maxTotalEvents, batchDelay int) *Backfill {
+	return &Backfill{
+		db:             bq.db,
+		log:            bq.log,
+		UserID:         userID,
+		BackfillType:   backfillType,
+		Priority:       priority,
+		Portal:         portal,
+		TimeStart:      timeStart,
+		TimeEnd:        timeEnd,
+		MaxBatchEvents: maxBatchEvents,
+		MaxTotalEvents: maxTotalEvents,
+		BatchDelay:     batchDelay,
+	}
+}
+
+const (
+	// getNextBackfillQuery picks the highest-priority (lowest number),
+	// oldest incomplete backfill of the requested type.
+	getNextBackfillQuery = `
+		SELECT queue_id, user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay
+		FROM backfill_queue
+		WHERE user_mxid=$1
+			AND type=$2
+			AND completed_at IS NULL
+		ORDER BY priority, queue_id
+		LIMIT 1
+	`
+)
+
+// GetNext returns the next backfill to perform for the given user and type,
+// or nil if the queue is empty.
+func (bq *BackfillQuery) GetNext(userID id.UserID, backfillType BackfillType) (backfill *Backfill) {
+	rows, err := bq.db.Query(getNextBackfillQuery, userID, backfillType)
+	if err != nil || rows == nil {
+		// rows is nil when the query fails, so it must not be closed (the
+		// original deferred rows.Close() before this check, which panics).
+		bq.log.Error(err)
+		return
+	}
+	defer rows.Close()
+	if rows.Next() {
+		backfill = bq.New().Scan(rows)
+	}
+	return
+}
+
+// DeleteAll removes every backfill queue entry for the given user.
+func (bq *BackfillQuery) DeleteAll(userID id.UserID) error {
+	_, err := bq.db.Exec("DELETE FROM backfill_queue WHERE user_mxid=$1", userID)
+	return err
+}
+
+// Backfill is a single row of the backfill_queue table.
+type Backfill struct {
+	db  *Database
+	log log.Logger
+
+	// Fields
+	QueueID      int
+	UserID       id.UserID
+	BackfillType BackfillType
+	Priority     int
+	Portal       *PortalKey
+	// TimeStart and TimeEnd bound the range of messages to backfill
+	// (nil means unbounded on that side).
+	TimeStart      *time.Time
+	TimeEnd        *time.Time
+	MaxBatchEvents int
+	MaxTotalEvents int
+	// BatchDelay is the number of seconds to wait between batches.
+	BatchDelay int
+	// CompletedAt is set by MarkDone; NULL rows are still pending.
+	CompletedAt *time.Time
+}
+
+// Scan fills the Backfill from a database row, returning nil on failure.
+// sql.ErrNoRows is swallowed silently; other errors are logged.
+func (b *Backfill) Scan(row Scannable) *Backfill {
+	err := row.Scan(&b.QueueID, &b.UserID, &b.BackfillType, &b.Priority, &b.Portal.JID, &b.Portal.Receiver, &b.TimeStart, &b.TimeEnd, &b.MaxBatchEvents, &b.MaxTotalEvents, &b.BatchDelay)
+	if err != nil {
+		if !errors.Is(err, sql.ErrNoRows) {
+			b.log.Errorln("Database scan failed:", err)
+		}
+		return nil
+	}
+	return b
+}
+
+// Insert adds the backfill to the queue and fills in the generated QueueID.
+func (b *Backfill) Insert() {
+	rows, err := b.db.Query(`
+		INSERT INTO backfill_queue
+			(user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay, completed_at)
+		VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
+		RETURNING queue_id
+	`, b.UserID, b.BackfillType, b.Priority, b.Portal.JID, b.Portal.Receiver, b.TimeStart, b.TimeEnd, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay, b.CompletedAt)
+	if err != nil {
+		// Check the error before touching rows: on failure rows is nil and
+		// the original code's rows.Next()/deferred Close would panic.
+		b.log.Warnfln("Failed to insert %d/%s with priority %d: %v", b.BackfillType, b.Portal.JID, b.Priority, err)
+		return
+	}
+	defer rows.Close()
+	if !rows.Next() {
+		b.log.Warnfln("Failed to insert %d/%s with priority %d: no queue_id returned", b.BackfillType, b.Portal.JID, b.Priority)
+		return
+	}
+	// The original logged this with %s verbs on int values.
+	if err = rows.Scan(&b.QueueID); err != nil {
+		b.log.Warnfln("Failed to read queue_id for %d/%s with priority %d: %v", b.BackfillType, b.Portal.JID, b.Priority, err)
+	}
+}
+
+// MarkDone records the current time as the backfill's completion timestamp.
+// A Backfill that was never inserted (QueueID == 0) cannot be marked done.
+func (b *Backfill) MarkDone() {
+	if b.QueueID == 0 {
+		b.log.Errorf("Cannot mark backfill done without queue_id. Maybe it wasn't actually inserted in the database?")
+		return
+	}
+	_, err := b.db.Exec("UPDATE backfill_queue SET completed_at=$1 WHERE queue_id=$2", time.Now(), b.QueueID)
+	if err != nil {
+		// The original used %s verbs on the int-typed BackfillType/Priority.
+		b.log.Warnfln("Failed to mark backfill %d (type %d) as complete: %v", b.QueueID, b.BackfillType, err)
+	}
+}
diff --git a/database/database.go b/database/database.go
index 7640871..cb64a12 100644
--- a/database/database.go
+++ b/database/database.go
@@ -46,6 +46,8 @@ type Database struct {
Reaction *ReactionQuery
DisappearingMessage *DisappearingMessageQuery
+ BackfillQuery *BackfillQuery
+ HistorySyncQuery *HistorySyncQuery
}
func New(cfg config.DatabaseConfig, baseLog log.Logger) (*Database, error) {
@@ -83,6 +85,14 @@ func New(cfg config.DatabaseConfig, baseLog log.Logger) (*Database, error) {
db: db,
log: db.log.Sub("DisappearingMessage"),
}
+ db.BackfillQuery = &BackfillQuery{
+ db: db,
+ log: db.log.Sub("Backfill"),
+ }
+ db.HistorySyncQuery = &HistorySyncQuery{
+ db: db,
+ log: db.log.Sub("HistorySync"),
+ }
db.SetMaxOpenConns(cfg.MaxOpenConns)
db.SetMaxIdleConns(cfg.MaxIdleConns)
diff --git a/database/historysync.go b/database/historysync.go
new file mode 100644
index 0000000..efbc784
--- /dev/null
+++ b/database/historysync.go
@@ -0,0 +1,317 @@
+// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
+// Copyright (C) 2022 Tulir Asokan, Sumner Evans
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+package database
+
+import (
+ "database/sql"
+ "errors"
+ "fmt"
+ "strings"
+ "time"
+
+ waProto "go.mau.fi/whatsmeow/binary/proto"
+ "google.golang.org/protobuf/proto"
+
+ _ "github.com/mattn/go-sqlite3"
+ log "maunium.net/go/maulogger/v2"
+ "maunium.net/go/mautrix/id"
+)
+
+// HistorySyncQuery provides access to the stored history sync conversations
+// and messages.
+type HistorySyncQuery struct {
+	db  *Database
+	log log.Logger
+}
+
+// HistorySyncConversation is a single row of the history_sync_conversation
+// table: metadata about one WhatsApp chat from a history sync payload.
+type HistorySyncConversation struct {
+	db  *Database
+	log log.Logger
+
+	UserID                   id.UserID
+	ConversationID           string
+	PortalKey                *PortalKey
+	LastMessageTimestamp     time.Time
+	MuteEndTime              time.Time
+	Archived                 bool
+	Pinned                   uint32
+	DisappearingMode         waProto.DisappearingMode_DisappearingModeInitiator
+	EndOfHistoryTransferType waProto.Conversation_ConversationEndOfHistoryTransferType
+	EphemeralExpiration      *uint32
+	MarkedAsUnread           bool
+	UnreadCount              uint32
+}
+
+// NewConversation creates an empty HistorySyncConversation bound to this
+// query's database connection.
+func (hsq *HistorySyncQuery) NewConversation() *HistorySyncConversation {
+	return &HistorySyncConversation{
+		db:        hsq.db,
+		log:       hsq.log,
+		PortalKey: &PortalKey{},
+	}
+}
+
+// NewConversationWithValues creates a fully-populated HistorySyncConversation.
+// lastMessageTimestamp and muteEndTime are Unix timestamps in seconds.
+// The returned value is not persisted until Upsert is called.
+func (hsq *HistorySyncQuery) NewConversationWithValues(
+	userID id.UserID,
+	conversationID string,
+	portalKey *PortalKey,
+	lastMessageTimestamp,
+	muteEndTime uint64,
+	archived bool,
+	pinned uint32,
+	disappearingMode waProto.DisappearingMode_DisappearingModeInitiator,
+	endOfHistoryTransferType waProto.Conversation_ConversationEndOfHistoryTransferType,
+	ephemeralExpiration *uint32,
+	markedAsUnread bool,
+	unreadCount uint32) *HistorySyncConversation {
+	return &HistorySyncConversation{
+		db:                       hsq.db,
+		log:                      hsq.log,
+		UserID:                   userID,
+		ConversationID:           conversationID,
+		PortalKey:                portalKey,
+		LastMessageTimestamp:     time.Unix(int64(lastMessageTimestamp), 0),
+		MuteEndTime:              time.Unix(int64(muteEndTime), 0),
+		Archived:                 archived,
+		Pinned:                   pinned,
+		DisappearingMode:         disappearingMode,
+		EndOfHistoryTransferType: endOfHistoryTransferType,
+		EphemeralExpiration:      ephemeralExpiration,
+		MarkedAsUnread:           markedAsUnread,
+		UnreadCount:              unreadCount,
+	}
+}
+
+const (
+	// getNMostRecentConversations lists a user's conversations, newest first.
+	getNMostRecentConversations = `
+		SELECT user_mxid, conversation_id, portal_jid, portal_receiver, last_message_timestamp, archived, pinned, mute_end_time, disappearing_mode, end_of_history_transfer_type, ephemeral_expiration, marked_as_unread, unread_count
+		FROM history_sync_conversation
+		WHERE user_mxid=$1
+		ORDER BY last_message_timestamp DESC
+		LIMIT $2
+	`
+	// getConversationByPortal fetches one conversation by its portal key.
+	getConversationByPortal = `
+		SELECT user_mxid, conversation_id, portal_jid, portal_receiver, last_message_timestamp, archived, pinned, mute_end_time, disappearing_mode, end_of_history_transfer_type, ephemeral_expiration, marked_as_unread, unread_count
+		FROM history_sync_conversation
+		WHERE user_mxid=$1
+			AND portal_jid=$2
+			AND portal_receiver=$3
+	`
+)
+
+// Upsert inserts the conversation, or updates the existing row keyed on
+// (user_mxid, conversation_id). The stored last_message_timestamp is only
+// ever moved forwards, never backwards.
+func (hsc *HistorySyncConversation) Upsert() {
+	_, err := hsc.db.Exec(`
+		INSERT INTO history_sync_conversation (user_mxid, conversation_id, portal_jid, portal_receiver, last_message_timestamp, archived, pinned, mute_end_time, disappearing_mode, end_of_history_transfer_type, ephemeral_expiration, marked_as_unread, unread_count)
+		VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
+		ON CONFLICT (user_mxid, conversation_id)
+		DO UPDATE SET
+			portal_jid=EXCLUDED.portal_jid,
+			portal_receiver=EXCLUDED.portal_receiver,
+			last_message_timestamp=CASE
+				WHEN EXCLUDED.last_message_timestamp > history_sync_conversation.last_message_timestamp THEN EXCLUDED.last_message_timestamp
+				ELSE history_sync_conversation.last_message_timestamp
+			END,
+			archived=EXCLUDED.archived,
+			pinned=EXCLUDED.pinned,
+			mute_end_time=EXCLUDED.mute_end_time,
+			disappearing_mode=EXCLUDED.disappearing_mode,
+			end_of_history_transfer_type=EXCLUDED.end_of_history_transfer_type,
+			ephemeral_expiration=EXCLUDED.ephemeral_expiration,
+			marked_as_unread=EXCLUDED.marked_as_unread,
+			unread_count=EXCLUDED.unread_count
+	`,
+		hsc.UserID,
+		hsc.ConversationID,
+		hsc.PortalKey.JID.String(),
+		hsc.PortalKey.Receiver.String(),
+		hsc.LastMessageTimestamp,
+		hsc.Archived,
+		hsc.Pinned,
+		hsc.MuteEndTime,
+		hsc.DisappearingMode,
+		hsc.EndOfHistoryTransferType,
+		hsc.EphemeralExpiration,
+		hsc.MarkedAsUnread,
+		hsc.UnreadCount)
+	if err != nil {
+		hsc.log.Warnfln("Failed to insert history sync conversation %s/%s: %v", hsc.UserID, hsc.ConversationID, err)
+	}
+}
+
+// Scan fills the conversation from a database row, returning nil on failure.
+// sql.ErrNoRows is swallowed silently; other errors are logged.
+func (hsc *HistorySyncConversation) Scan(row Scannable) *HistorySyncConversation {
+	err := row.Scan(
+		&hsc.UserID,
+		&hsc.ConversationID,
+		&hsc.PortalKey.JID,
+		&hsc.PortalKey.Receiver,
+		&hsc.LastMessageTimestamp,
+		&hsc.Archived,
+		&hsc.Pinned,
+		&hsc.MuteEndTime,
+		&hsc.DisappearingMode,
+		&hsc.EndOfHistoryTransferType,
+		&hsc.EphemeralExpiration,
+		&hsc.MarkedAsUnread,
+		&hsc.UnreadCount)
+	if err != nil {
+		if !errors.Is(err, sql.ErrNoRows) {
+			hsc.log.Errorln("Database scan failed:", err)
+		}
+		return nil
+	}
+	return hsc
+}
+
+// GetNMostRecentConversations returns up to n of the user's most recently
+// active history sync conversations, newest first.
+func (hsq *HistorySyncQuery) GetNMostRecentConversations(userID id.UserID, n int) (conversations []*HistorySyncConversation) {
+	rows, err := hsq.db.Query(getNMostRecentConversations, userID, n)
+	if err != nil || rows == nil {
+		// rows is nil on error; the original deferred rows.Close() before
+		// this check (nil panic) and dropped the error silently.
+		hsq.log.Warnfln("Failed to get recent conversations for %s: %v", userID, err)
+		return nil
+	}
+	defer rows.Close()
+	for rows.Next() {
+		conversations = append(conversations, hsq.NewConversation().Scan(rows))
+	}
+	return
+}
+
+// GetConversation returns the stored history sync conversation for the given
+// user and portal, or nil if there is none.
+func (hsq *HistorySyncQuery) GetConversation(userID id.UserID, portalKey *PortalKey) (conversation *HistorySyncConversation) {
+	rows, err := hsq.db.Query(getConversationByPortal, userID, portalKey.JID, portalKey.Receiver)
+	if err != nil || rows == nil {
+		// rows is nil on error; the original deferred rows.Close() before
+		// this check (nil panic) and dropped the error silently.
+		hsq.log.Warnfln("Failed to get conversation %s/%s: %v", userID, portalKey.JID, err)
+		return nil
+	}
+	defer rows.Close()
+	if rows.Next() {
+		conversation = hsq.NewConversation().Scan(rows)
+	}
+	return
+}
+
+// DeleteAllConversations removes every stored history sync conversation for
+// the given user.
+func (hsq *HistorySyncQuery) DeleteAllConversations(userID id.UserID) error {
+	_, err := hsq.db.Exec("DELETE FROM history_sync_conversation WHERE user_mxid=$1", userID)
+	return err
+}
+
+const (
+	// getMessagesBetween has two %s placeholders: extra WHERE clauses for
+	// the optional time bounds, and an optional LIMIT clause.
+	getMessagesBetween = `
+		SELECT data
+		FROM history_sync_message
+		WHERE user_mxid=$1
+			AND conversation_id=$2
+			%s
+		ORDER BY timestamp DESC
+		%s
+	`
+	// deleteMessages has a %s placeholder for the OR-joined match clauses.
+	deleteMessages = `
+		DELETE FROM history_sync_message
+		WHERE %s
+	`
+)
+
+// HistorySyncMessage is a single row of the history_sync_message table: one
+// raw protobuf-encoded history sync message.
+type HistorySyncMessage struct {
+	db  *Database
+	log log.Logger
+
+	UserID         id.UserID
+	ConversationID string
+	MessageID      string
+	Timestamp      time.Time
+	// Data is the proto.Marshal-ed waProto.HistorySyncMsg.
+	Data []byte
+}
+
+// NewMessageWithValues creates a HistorySyncMessage holding the
+// protobuf-serialized message. Returns an error if marshalling fails.
+// The returned value is not persisted until Insert is called.
+func (hsq *HistorySyncQuery) NewMessageWithValues(userID id.UserID, conversationID, messageID string, message *waProto.HistorySyncMsg) (*HistorySyncMessage, error) {
+	msgData, err := proto.Marshal(message)
+	if err != nil {
+		return nil, err
+	}
+	return &HistorySyncMessage{
+		db:             hsq.db,
+		log:            hsq.log,
+		UserID:         userID,
+		ConversationID: conversationID,
+		MessageID:      messageID,
+		Timestamp:      time.Unix(int64(message.Message.GetMessageTimestamp()), 0),
+		Data:           msgData,
+	}, nil
+}
+
+// Insert stores the message. Duplicate (user, conversation, message) keys are
+// silently ignored via ON CONFLICT DO NOTHING.
+func (hsm *HistorySyncMessage) Insert() {
+	_, err := hsm.db.Exec(`
+		INSERT INTO history_sync_message (user_mxid, conversation_id, message_id, timestamp, data)
+		VALUES ($1, $2, $3, $4, $5)
+		ON CONFLICT (user_mxid, conversation_id, message_id) DO NOTHING
+	`, hsm.UserID, hsm.ConversationID, hsm.MessageID, hsm.Timestamp, hsm.Data)
+	if err != nil {
+		hsm.log.Warnfln("Failed to insert history sync message %s/%s: %v", hsm.ConversationID, hsm.Timestamp, err)
+	}
+}
+
+// GetMessagesBetween returns the stored history sync messages for the given
+// conversation, optionally bounded by startTime/endTime (inclusive, nil means
+// unbounded), newest first, limited to `limit` rows when limit > 0.
+func (hsq *HistorySyncQuery) GetMessagesBetween(userID id.UserID, conversationID string, startTime, endTime *time.Time, limit int) (messages []*waProto.WebMessageInfo) {
+	whereClauses := ""
+	args := []interface{}{userID, conversationID}
+	argNum := 3
+	if startTime != nil {
+		whereClauses += fmt.Sprintf(" AND timestamp >= $%d", argNum)
+		args = append(args, startTime)
+		argNum++
+	}
+	if endTime != nil {
+		whereClauses += fmt.Sprintf(" AND timestamp <= $%d", argNum)
+		args = append(args, endTime)
+	}
+
+	limitClause := ""
+	if limit > 0 {
+		limitClause = fmt.Sprintf("LIMIT %d", limit)
+	}
+
+	rows, err := hsq.db.Query(fmt.Sprintf(getMessagesBetween, whereClauses, limitClause), args...)
+	if err != nil || rows == nil {
+		// rows is nil on error; the original deferred rows.Close() before
+		// this check, which panics, and dropped the error silently.
+		hsq.log.Errorfln("Failed to query messages between timestamps: %v", err)
+		return nil
+	}
+	defer rows.Close()
+
+	var msgData []byte
+	for rows.Next() {
+		// The original called log.Error with printf-style arguments.
+		if err = rows.Scan(&msgData); err != nil {
+			hsq.log.Errorfln("Database scan failed: %v", err)
+			continue
+		}
+		var historySyncMsg waProto.HistorySyncMsg
+		if err = proto.Unmarshal(msgData, &historySyncMsg); err != nil {
+			hsq.log.Errorfln("Failed to unmarshal history sync message: %v", err)
+			continue
+		}
+		messages = append(messages, historySyncMsg.Message)
+	}
+	return
+}
+
+// DeleteMessages deletes the given messages from the history sync message
+// store for the given user and conversation.
+func (hsq *HistorySyncQuery) DeleteMessages(userID id.UserID, conversationID string, messages []*waProto.WebMessageInfo) error {
+	if len(messages) == 0 {
+		// An empty OR-list would produce invalid SQL ("DELETE ... WHERE ").
+		return nil
+	}
+	whereClauses := []string{}
+	preparedStatementArgs := []interface{}{userID, conversationID}
+	for i, msg := range messages {
+		whereClauses = append(whereClauses, fmt.Sprintf("(user_mxid=$1 AND conversation_id=$2 AND message_id=$%d)", i+3))
+		preparedStatementArgs = append(preparedStatementArgs, msg.GetKey().GetId())
+	}
+
+	_, err := hsq.db.Exec(fmt.Sprintf(deleteMessages, strings.Join(whereClauses, " OR ")), preparedStatementArgs...)
+	return err
+}
+
+// DeleteAllMessages removes every stored history sync message for the given
+// user.
+func (hsq *HistorySyncQuery) DeleteAllMessages(userID id.UserID) error {
+	_, err := hsq.db.Exec("DELETE FROM history_sync_message WHERE user_mxid=$1", userID)
+	return err
+}
diff --git a/database/upgrades/2022-03-15-prioritized-backfill.go b/database/upgrades/2022-03-15-prioritized-backfill.go
new file mode 100644
index 0000000..c7a3892
--- /dev/null
+++ b/database/upgrades/2022-03-15-prioritized-backfill.go
@@ -0,0 +1,45 @@
+package upgrades
+
+import (
+ "database/sql"
+ "fmt"
+)
+
+func init() {
+	upgrades[39] = upgrade{"Add backfill queue", func(tx *sql.Tx, ctx context) error {
+		// The queue_id needs to auto-increment every insertion. For SQLite,
+		// INTEGER PRIMARY KEY is an alias for the ROWID, so it will
+		// auto-increment. See https://sqlite.org/lang_createtable.html#rowid
+		// For Postgres, we need to add GENERATED ALWAYS AS IDENTITY for the
+		// same functionality.
+		queueIDColumnTypeModifier := ""
+		if ctx.dialect == Postgres {
+			queueIDColumnTypeModifier = "GENERATED ALWAYS AS IDENTITY"
+		}
+
+		_, err := tx.Exec(fmt.Sprintf(`
+			CREATE TABLE backfill_queue (
+				queue_id INTEGER PRIMARY KEY %s,
+				user_mxid TEXT,
+				type INTEGER NOT NULL,
+				priority INTEGER NOT NULL,
+				portal_jid TEXT,
+				portal_receiver TEXT,
+				time_start TIMESTAMP,
+				time_end TIMESTAMP,
+				max_batch_events INTEGER NOT NULL,
+				max_total_events INTEGER,
+				batch_delay INTEGER,
+				completed_at TIMESTAMP,
+
+				FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
+				FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE
+			)
+		`, queueIDColumnTypeModifier))
+		// The original wrapped this in a redundant `if err != nil { return err }`
+		// followed by `return err`; a single return is equivalent.
+		return err
+	}}
+}
diff --git a/database/upgrades/2022-03-18-historysync-store.go b/database/upgrades/2022-03-18-historysync-store.go
new file mode 100644
index 0000000..3625069
--- /dev/null
+++ b/database/upgrades/2022-03-18-historysync-store.go
@@ -0,0 +1,52 @@
+package upgrades
+
+import (
+ "database/sql"
+)
+
+func init() {
+	upgrades[40] = upgrade{"Store history syncs for later backfills", func(tx *sql.Tx, ctx context) error {
+		// One row per chat in a history sync payload, keyed on the user and
+		// WhatsApp conversation ID.
+		_, err := tx.Exec(`
+			CREATE TABLE history_sync_conversation (
+				user_mxid TEXT,
+				conversation_id TEXT,
+				portal_jid TEXT,
+				portal_receiver TEXT,
+				last_message_timestamp TIMESTAMP,
+				archived BOOLEAN,
+				pinned INTEGER,
+				mute_end_time TIMESTAMP,
+				disappearing_mode INTEGER,
+				end_of_history_transfer_type INTEGER,
+				ephemeral_expiration INTEGER,
+				marked_as_unread BOOLEAN,
+				unread_count INTEGER,
+
+				PRIMARY KEY (user_mxid, conversation_id),
+				FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
+				FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE ON UPDATE CASCADE
+			)
+		`)
+		if err != nil {
+			return err
+		}
+		// One row per raw message; `data` is the serialized protobuf payload.
+		// Rows are removed automatically when their conversation is deleted.
+		_, err = tx.Exec(`
+			CREATE TABLE history_sync_message (
+				user_mxid TEXT,
+				conversation_id TEXT,
+				message_id TEXT,
+				timestamp TIMESTAMP,
+				data BYTEA,
+
+				PRIMARY KEY (user_mxid, conversation_id, message_id),
+				FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
+				FOREIGN KEY (user_mxid, conversation_id) REFERENCES history_sync_conversation(user_mxid, conversation_id) ON DELETE CASCADE
+			)
+		`)
+		if err != nil {
+			return err
+		}
+
+		return nil
+	}}
+}
diff --git a/database/upgrades/upgrades.go b/database/upgrades/upgrades.go
index f4b1ded..98044a4 100644
--- a/database/upgrades/upgrades.go
+++ b/database/upgrades/upgrades.go
@@ -40,7 +40,7 @@ type upgrade struct {
fn upgradeFunc
}
-const NumberOfUpgrades = 39
+const NumberOfUpgrades = 41
var upgrades [NumberOfUpgrades]upgrade
diff --git a/example-config.yaml b/example-config.yaml
index 93100a6..1ddf877 100644
--- a/example-config.yaml
+++ b/example-config.yaml
@@ -115,14 +115,10 @@ bridge:
# Should another user's cryptographic identity changing send a message to Matrix?
identity_change_notices: false
portal_message_buffer: 128
- # Settings for handling history sync payloads. These settings only apply right after login,
- # because the phone only sends the history sync data once, and there's no way to re-request it
- # (other than logging out and back in again).
+ # Settings for handling history sync payloads.
history_sync:
# Should the bridge create portals for chats in the history sync payload?
create_portals: true
- # Maximum age of chats in seconds to create portals for. Set to 0 to create portals for all chats in sync payload.
- max_age: 604800
# Enable backfilling history sync payloads from WhatsApp using batch sending?
# This requires a server with MSC2716 support, which is currently an experimental feature in synapse.
# It can be enabled by setting experimental_features -> msc2716_enabled to true in homeserver.yaml.
@@ -137,6 +133,52 @@ bridge:
# Should the bridge request a full sync from the phone when logging in?
# This bumps the size of history syncs from 3 months to 1 year.
request_full_sync: false
+ # The maximum number of initial conversations that should be synced.
+ # Other conversations will be backfilled on demand when the start PM
+ # provisioning endpoint is used or when a message comes in from that
+ # chat.
+ max_initial_conversations: 10
+ # Settings for immediate backfills. These backfills should generally be
+ # small and their main purpose is to populate each of the initial chats
+ # (as configured by max_initial_conversations) with a few messages so
+        # that you can continue conversations without losing context.
+ immediate:
+ # The number of concurrent backfill workers to create for immediate
+ # backfills. Note that using more than one worker could cause the
+ # room list to jump around since there are no guarantees about the
+ # order in which the backfills will complete.
+ worker_count: 1
+ # The maximum number of events to backfill initially.
+ max_events: 10
+ # Settings for deferred backfills. The purpose of these backfills are
+ # to fill in the rest of the chat history that was not covered by the
+ # immediate backfills. These backfills generally should happen at a
+ # slower pace so as not to overload the homeserver.
+ # Each deferred backfill config should define a "stage" of backfill
+ # (i.e. the last week of messages). The fields are as follows:
+ # - start_days_ago: the number of days ago to start backfilling from.
+ # To indicate the start of time, use -1. For example, for a week ago,
+ # use 7.
+ # - max_batch_events: the number of events to send per batch.
+ # - batch_delay: the number of seconds to wait before backfilling each
+ # batch.
+ deferred:
+ # Last Week
+ - start_days_ago: 7
+ max_batch_events: 20
+ batch_delay: 5
+ # Last Month
+ - start_days_ago: 30
+ max_batch_events: 50
+ batch_delay: 10
+ # Last 3 months
+ - start_days_ago: 90
+ max_batch_events: 100
+ batch_delay: 10
+ # The start of time
+ - start_days_ago: -1
+ max_batch_events: 500
+ batch_delay: 10
# Should puppet avatars be fetched from the server even if an avatar is already set?
user_avatar_sync: true
# Should Matrix users leaving groups be bridged to WhatsApp?
diff --git a/historysync.go b/historysync.go
index 5fece97..31978fe 100644
--- a/historysync.go
+++ b/historysync.go
@@ -18,8 +18,6 @@ package main
import (
"fmt"
- "sort"
- "sync"
"time"
waProto "go.mau.fi/whatsmeow/binary/proto"
@@ -35,12 +33,6 @@ import (
// region User history sync handling
-type portalToBackfill struct {
- portal *Portal
- conv *waProto.Conversation
- msgs []*waProto.WebMessageInfo
-}
-
type wrappedInfo struct {
*types.MessageInfo
Type database.MessageType
@@ -50,107 +42,81 @@ type wrappedInfo struct {
ExpiresIn uint32
}
-type conversationList []*waProto.Conversation
-
-var _ sort.Interface = (conversationList)(nil)
-
-func (c conversationList) Len() int {
- return len(c)
-}
-
-func (c conversationList) Less(i, j int) bool {
- return getConversationTimestamp(c[i]) < getConversationTimestamp(c[j])
-}
-
-func (c conversationList) Swap(i, j int) {
- c[i], c[j] = c[j], c[i]
-}
-
func (user *User) handleHistorySyncsLoop() {
+ if !user.bridge.Config.Bridge.HistorySync.Backfill {
+ return
+ }
+
+ reCheckQueue := make(chan bool, 1)
+ // Start the backfill queue.
+ user.BackfillQueue = &BackfillQueue{
+ BackfillQuery: user.bridge.DB.BackfillQuery,
+ ImmediateBackfillRequests: make(chan *database.Backfill, 1),
+ DeferredBackfillRequests: make(chan *database.Backfill, 1),
+ ReCheckQueue: make(chan bool, 1),
+ log: user.log.Sub("BackfillQueue"),
+ }
+ reCheckQueue = user.BackfillQueue.ReCheckQueue
+
+ // Immediate backfills can be done in parallel
+ for i := 0; i < user.bridge.Config.Bridge.HistorySync.Immediate.WorkerCount; i++ {
+ go user.handleBackfillRequestsLoop(user.BackfillQueue.ImmediateBackfillRequests)
+ }
+
+ // Deferred backfills should be handled synchronously so as not to
+ // overload the homeserver. Users can configure their backfill stages
+ // to be more or less aggressive with backfilling at this stage.
+ go user.handleBackfillRequestsLoop(user.BackfillQueue.DeferredBackfillRequests)
+ go user.BackfillQueue.RunLoops(user)
+
+ // Always save the history syncs for the user. If they want to enable
+ // backfilling in the future, we will have the data in the database.
for evt := range user.historySyncs {
- go user.sendBridgeState(BridgeState{StateEvent: StateBackfilling})
- user.handleHistorySync(evt.Data)
- if len(user.historySyncs) == 0 && user.IsConnected() {
- go user.sendBridgeState(BridgeState{StateEvent: StateConnected})
- }
+ user.handleHistorySync(reCheckQueue, evt.Data)
}
}
-func (user *User) handleHistorySync(evt *waProto.HistorySync) {
- if evt == nil || evt.SyncType == nil || evt.GetSyncType() == waProto.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waProto.HistorySync_PUSH_NAME {
- return
- }
- description := fmt.Sprintf("type %s, %d conversations, chunk order %d, progress %d%%", evt.GetSyncType(), len(evt.GetConversations()), evt.GetChunkOrder(), evt.GetProgress())
- user.log.Infoln("Handling history sync with", description)
-
- conversations := conversationList(evt.GetConversations())
- // We want to handle recent conversations first
- sort.Sort(sort.Reverse(conversations))
- portalsToBackfill := make(chan portalToBackfill, len(conversations))
-
- var backfillWait sync.WaitGroup
- backfillWait.Add(1)
- go user.backfillLoop(portalsToBackfill, backfillWait.Done)
- for _, conv := range conversations {
- user.handleHistorySyncConversation(conv, portalsToBackfill)
- }
- close(portalsToBackfill)
- backfillWait.Wait()
- user.log.Infoln("Finished handling history sync with", description)
-}
-
-func (user *User) backfillLoop(ch chan portalToBackfill, done func()) {
- defer done()
- for ptb := range ch {
- if len(ptb.msgs) > 0 {
- user.log.Debugln("Bridging history sync payload for", ptb.portal.Key.JID)
- ptb.portal.backfill(user, ptb.msgs)
- } else {
- user.log.Debugfln("Not backfilling %s: no bridgeable messages found", ptb.portal.Key.JID)
+func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) {
+ for req := range backfillRequests {
+ user.log.Infof("Backfill request: %v", req)
+ conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal)
+ if conv == nil {
+ user.log.Errorf("Could not find conversation for %s in %s", user.MXID, req.Portal.String())
+ continue
}
- if !ptb.conv.GetMarkedAsUnread() && ptb.conv.GetUnreadCount() == 0 {
- user.markSelfReadFull(ptb.portal)
+
+ // Update the client store with basic chat settings.
+ if conv.MuteEndTime.After(time.Now()) {
+ user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime)
}
+ if conv.Archived {
+ user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true)
+ }
+ if conv.Pinned > 0 {
+ user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true)
+ }
+
+ portal := user.GetPortalByJID(conv.PortalKey.JID)
+ if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration {
+ portal.ExpirationTime = *conv.EphemeralExpiration
+ portal.Update()
+ }
+
+ user.createOrUpdatePortalAndBackfillWithLock(req, conv, portal)
}
}
-func (user *User) handleHistorySyncConversation(conv *waProto.Conversation, portalsToBackfill chan portalToBackfill) {
- jid, err := types.ParseJID(conv.GetId())
- if err != nil {
- user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err)
- return
- }
+func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill, conv *database.HistorySyncConversation, portal *Portal) {
+ portal.backfillLock.Lock()
+ defer portal.backfillLock.Unlock()
- // Update the client store with basic chat settings.
- muteEnd := time.Unix(int64(conv.GetMuteEndTime()), 0)
- if muteEnd.After(time.Now()) {
- _ = user.Client.Store.ChatSettings.PutMutedUntil(jid, muteEnd)
- }
- if conv.GetArchived() {
- _ = user.Client.Store.ChatSettings.PutArchived(jid, true)
- }
- if conv.GetPinned() > 0 {
- _ = user.Client.Store.ChatSettings.PutPinned(jid, true)
- }
-
- portal := user.GetPortalByJID(jid)
- if conv.EphemeralExpiration != nil && portal.ExpirationTime != conv.GetEphemeralExpiration() {
- portal.ExpirationTime = conv.GetEphemeralExpiration()
- portal.Update()
- }
- // Check if portal is too old or doesn't contain anything we can bridge.
if !user.shouldCreatePortalForHistorySync(conv, portal) {
return
}
- var msgs []*waProto.WebMessageInfo
- if user.bridge.Config.Bridge.HistorySync.Backfill {
- msgs = filterMessagesToBackfill(conv.GetMessages())
- }
- ptb := portalToBackfill{portal: portal, conv: conv, msgs: msgs}
if len(portal.MXID) == 0 {
user.log.Debugln("Creating portal for", portal.Key.JID, "as part of history sync handling")
- err = portal.CreateMatrixRoom(user, getPartialInfoFromConversation(jid, conv), false)
+ err := portal.CreateMatrixRoom(user, nil, true, false)
if err != nil {
user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err)
return
@@ -158,10 +124,147 @@ func (user *User) handleHistorySyncConversation(conv *waProto.Conversation, port
} else {
portal.UpdateMatrixRoom(user, nil)
}
- if !user.bridge.Config.Bridge.HistorySync.Backfill {
- user.log.Debugln("Backfill is disabled, not bridging history sync payload for", portal.Key.JID)
+
+ allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents)
+
+ if len(allMsgs) > 0 {
+ user.log.Debugf("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents)
+ toBackfill := allMsgs[0:]
+ insertionEventIds := []id.EventID{}
+ for {
+ if len(toBackfill) == 0 {
+ break
+ }
+
+ var msgs []*waProto.WebMessageInfo
+ if len(toBackfill) <= req.MaxBatchEvents {
+ msgs = toBackfill
+ toBackfill = toBackfill[0:0]
+ } else {
+ msgs = toBackfill[len(toBackfill)-req.MaxBatchEvents:]
+ toBackfill = toBackfill[:len(toBackfill)-req.MaxBatchEvents]
+ }
+
+ if len(msgs) > 0 {
+ time.Sleep(time.Duration(req.BatchDelay) * time.Second)
+ user.log.Debugf("Backfilling %d messages in %s", len(msgs), portal.Key.JID)
+ insertionEventIds = append(insertionEventIds, portal.backfill(user, msgs)...)
+ }
+ }
+ user.log.Debugf("Finished backfilling %d messages in %s", len(allMsgs), portal.Key.JID)
+ if len(insertionEventIds) > 0 {
+ portal.sendPostBackfillDummy(
+ time.Unix(int64(allMsgs[len(allMsgs)-1].GetMessageTimestamp()), 0),
+ insertionEventIds[0])
+ }
+ user.log.Debugf("Deleting %d history sync messages after backfilling", len(allMsgs))
+ err := user.bridge.DB.HistorySyncQuery.DeleteMessages(user.MXID, conv.ConversationID, allMsgs)
+ if err != nil {
+ user.log.Warnf("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err)
+ }
} else {
- portalsToBackfill <- ptb
+ user.log.Debugfln("Not backfilling %s: no bridgeable messages found", portal.Key.JID)
+ }
+ if !conv.MarkedAsUnread && conv.UnreadCount == 0 {
+ user.markSelfReadFull(portal)
+ }
+}
+
+func (user *User) shouldCreatePortalForHistorySync(conv *database.HistorySyncConversation, portal *Portal) bool {
+ if len(portal.MXID) > 0 {
+ user.log.Debugfln("Portal for %s already exists, ensuring user is invited", portal.Key.JID)
+ portal.ensureUserInvited(user)
+ // Portal exists, let backfill continue
+ return true
+ } else if !user.bridge.Config.Bridge.HistorySync.CreatePortals {
+ user.log.Debugfln("Not creating portal for %s: creating rooms from history sync is disabled", portal.Key.JID)
+ } else {
+ // Portal doesn't exist, but should be created
+ return true
+ }
+ // Portal shouldn't be created, reason logged above
+ return false
+}
+
+func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.HistorySync) {
+ if evt == nil || evt.SyncType == nil || evt.GetSyncType() == waProto.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waProto.HistorySync_PUSH_NAME {
+ return
+ }
+ description := fmt.Sprintf("type %s, %d conversations, chunk order %d", evt.GetSyncType(), len(evt.GetConversations()), evt.GetChunkOrder())
+ user.log.Infoln("Storing history sync with", description)
+
+ for _, conv := range evt.GetConversations() {
+ jid, err := types.ParseJID(conv.GetId())
+ if err != nil {
+ user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err)
+ continue
+ }
+ portal := user.GetPortalByJID(jid)
+
+ historySyncConversation := user.bridge.DB.HistorySyncQuery.NewConversationWithValues(
+ user.MXID,
+ conv.GetId(),
+ &portal.Key,
+ getConversationTimestamp(conv),
+ conv.GetMuteEndTime(),
+ conv.GetArchived(),
+ conv.GetPinned(),
+ conv.GetDisappearingMode().GetInitiator(),
+ conv.GetEndOfHistoryTransferType(),
+ conv.EphemeralExpiration,
+ conv.GetMarkedAsUnread(),
+ conv.GetUnreadCount())
+ historySyncConversation.Upsert()
+
+ for _, msg := range conv.GetMessages() {
+ // Don't store messages that will just be skipped.
+ wmi := msg.GetMessage()
+ msgType := getMessageType(wmi.GetMessage())
+ if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" {
+ continue
+ }
+
+ // Don't store unsupported messages.
+ if !containsSupportedMessage(msg.GetMessage().GetMessage()) {
+ continue
+ }
+
+ message, err := user.bridge.DB.HistorySyncQuery.NewMessageWithValues(user.MXID, conv.GetId(), msg.Message.GetKey().GetId(), msg)
+ if err != nil {
+ user.log.Warnf("Failed to save message %s in %s. Error: %+v", msg.Message.Key.Id, conv.GetId(), err)
+ continue
+ }
+ message.Insert()
+ }
+ }
+
+ // If this was the initial bootstrap, enqueue immediate backfills for the
+ // most recent portals. If it's the last history sync event, start
+ // backfilling the rest of the history of the portals.
+ if user.bridge.Config.Bridge.HistorySync.Backfill && (evt.GetSyncType() == waProto.HistorySync_INITIAL_BOOTSTRAP || evt.GetSyncType() == waProto.HistorySync_FULL || evt.GetSyncType() == waProto.HistorySync_RECENT) {
+ nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations)
+ for i, conv := range nMostRecent {
+ jid, err := types.ParseJID(conv.ConversationID)
+ if err != nil {
+ user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.ConversationID, err)
+ continue
+ }
+ portal := user.GetPortalByJID(jid)
+
+ switch evt.GetSyncType() {
+ case waProto.HistorySync_INITIAL_BOOTSTRAP:
+ // Enqueue immediate backfills for the most recent messages first.
+ user.EnqueueImmedateBackfill(portal, i)
+ case waProto.HistorySync_FULL, waProto.HistorySync_RECENT:
+ if evt.GetProgress() >= 99 {
+ // Enqueue deferred backfills as configured.
+ user.EnqueueDeferredBackfills(portal, len(nMostRecent), i)
+ }
+ }
+ }
+
+ // Tell the queue to check for new backfill requests.
+ reCheckQueue <- true
}
}
@@ -173,71 +276,22 @@ func getConversationTimestamp(conv *waProto.Conversation) uint64 {
return convTs
}
-func (user *User) shouldCreatePortalForHistorySync(conv *waProto.Conversation, portal *Portal) bool {
- maxAge := user.bridge.Config.Bridge.HistorySync.MaxAge
- minLastMsgToCreate := time.Now().Add(-time.Duration(maxAge) * time.Second)
- lastMsg := time.Unix(int64(getConversationTimestamp(conv)), 0)
-
- if len(portal.MXID) > 0 {
- user.log.Debugfln("Portal for %s already exists, ensuring user is invited", portal.Key.JID)
- portal.ensureUserInvited(user)
- // Portal exists, let backfill continue
- return true
- } else if !user.bridge.Config.Bridge.HistorySync.CreatePortals {
- user.log.Debugfln("Not creating portal for %s: creating rooms from history sync is disabled", portal.Key.JID)
- } else if !containsSupportedMessages(conv) {
- user.log.Debugfln("Not creating portal for %s: no interesting messages found", portal.Key.JID)
- } else if maxAge > 0 && !lastMsg.After(minLastMsgToCreate) {
- user.log.Debugfln("Not creating portal for %s: last message older than limit (%s)", portal.Key.JID, lastMsg)
- } else {
- // Portal doesn't exist, but should be created
- return true
- }
- // Portal shouldn't be created, reason logged above
- return false
+func (user *User) EnqueueImmedateBackfill(portal *Portal, priority int) {
+ maxMessages := user.bridge.Config.Bridge.HistorySync.Immediate.MaxEvents
+ initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, nil, maxMessages, maxMessages, 0)
+ initialBackfill.Insert()
}
-func filterMessagesToBackfill(messages []*waProto.HistorySyncMsg) []*waProto.WebMessageInfo {
- filtered := make([]*waProto.WebMessageInfo, 0, len(messages))
- for _, msg := range messages {
- wmi := msg.GetMessage()
- msgType := getMessageType(wmi.GetMessage())
- if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" {
- continue
- } else {
- filtered = append(filtered, wmi)
+func (user *User) EnqueueDeferredBackfills(portal *Portal, numConversations, priority int) {
+ for j, backfillStage := range user.bridge.Config.Bridge.HistorySync.Deferred {
+ var startDate *time.Time = nil
+ if backfillStage.StartDaysAgo > 0 {
+ startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo)
+ startDate = &startDaysAgo
}
- }
- return filtered
-}
-
-func containsSupportedMessages(conv *waProto.Conversation) bool {
- for _, msg := range conv.GetMessages() {
- if containsSupportedMessage(msg.GetMessage().GetMessage()) {
- return true
- }
- }
- return false
-}
-
-func getPartialInfoFromConversation(jid types.JID, conv *waProto.Conversation) *types.GroupInfo {
- // TODO broadcast list info?
- if jid.Server != types.GroupServer {
- return nil
- }
- participants := make([]types.GroupParticipant, len(conv.GetParticipant()))
- for i, pcp := range conv.GetParticipant() {
- participantJID, _ := types.ParseJID(pcp.GetUserJid())
- participants[i] = types.GroupParticipant{
- JID: participantJID,
- IsAdmin: pcp.GetRank() == waProto.GroupParticipant_ADMIN,
- IsSuperAdmin: pcp.GetRank() == waProto.GroupParticipant_SUPERADMIN,
- }
- }
- return &types.GroupInfo{
- JID: jid,
- GroupName: types.GroupName{Name: conv.GetName()},
- Participants: participants,
+ backfill := user.bridge.DB.BackfillQuery.NewWithValues(
+ user.MXID, database.BackfillDeferred, j*numConversations+priority, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
+ backfill.Insert()
}
}
@@ -246,14 +300,15 @@ func getPartialInfoFromConversation(jid types.JID, conv *waProto.Conversation) *
var (
PortalCreationDummyEvent = event.Type{Type: "fi.mau.dummy.portal_created", Class: event.MessageEventType}
- BackfillEndDummyEvent = event.Type{Type: "fi.mau.dummy.backfill_end", Class: event.MessageEventType}
PreBackfillDummyEvent = event.Type{Type: "fi.mau.dummy.pre_backfill", Class: event.MessageEventType}
+
+ // Marker events for when a backfill finishes
+ BackfillEndDummyEvent = event.Type{Type: "fi.mau.dummy.backfill_end", Class: event.MessageEventType}
+ RoomMarker = event.Type{Type: "m.room.marker", Class: event.MessageEventType}
+ MSC2716Marker = event.Type{Type: "org.matrix.msc2716.marker", Class: event.MessageEventType}
)
-func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) {
- portal.backfillLock.Lock()
- defer portal.backfillLock.Unlock()
-
+func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) []id.EventID {
var historyBatch, newBatch mautrix.ReqBatchSend
var historyBatchInfos, newBatchInfos []*wrappedInfo
@@ -375,32 +430,33 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo)
}
}
+ insertionEventIds := []id.EventID{}
+
if len(historyBatch.Events) > 0 && len(historyBatch.PrevEventID) > 0 {
portal.log.Infofln("Sending %d historical messages...", len(historyBatch.Events))
historyResp, err := portal.MainIntent().BatchSend(portal.MXID, &historyBatch)
+ insertionEventIds = append(insertionEventIds, historyResp.BaseInsertionEventID)
if err != nil {
portal.log.Errorln("Error sending batch of historical messages:", err)
} else {
portal.finishBatch(historyResp.EventIDs, historyBatchInfos)
portal.NextBatchID = historyResp.NextBatchID
portal.Update()
- // If batchID is non-empty, it means this is backfilling very old messages, and we don't need a post-backfill dummy.
- if historyBatch.BatchID == "" {
- portal.sendPostBackfillDummy(time.UnixMilli(historyBatch.Events[len(historyBatch.Events)-1].Timestamp))
- }
}
}
if len(newBatch.Events) > 0 && len(newBatch.PrevEventID) > 0 {
portal.log.Infofln("Sending %d new messages...", len(newBatch.Events))
newResp, err := portal.MainIntent().BatchSend(portal.MXID, &newBatch)
+ insertionEventIds = append(insertionEventIds, newResp.BaseInsertionEventID)
if err != nil {
portal.log.Errorln("Error sending batch of new messages:", err)
} else {
portal.finishBatch(newResp.EventIDs, newBatchInfos)
- portal.sendPostBackfillDummy(time.UnixMilli(newBatch.Events[len(newBatch.Events)-1].Timestamp))
}
}
+
+ return insertionEventIds
}
func (portal *Portal) parseWebMessageInfo(source *User, webMsg *waProto.WebMessageInfo) *types.MessageInfo {
@@ -478,6 +534,15 @@ func (portal *Portal) wrapBatchEvent(info *types.MessageInfo, intent *appservice
if err != nil {
return nil, err
}
+
+ if eventType == event.EventEncrypted {
+ // Clear other custom keys if the event was encrypted, but keep the double puppet identifier
+ wrappedContent.Raw = map[string]interface{}{backfillIDField: info.ID}
+ if intent.IsCustomPuppet {
+ wrappedContent.Raw[doublePuppetKey] = doublePuppetValue
+ }
+ }
+
return &event.Event{
Sender: intent.UserID,
Type: newEventType,
@@ -530,19 +595,25 @@ func (portal *Portal) finishBatchEvt(info *wrappedInfo, eventID id.EventID) {
}
}
-func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time) {
- resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, BackfillEndDummyEvent, struct{}{})
- if err != nil {
- portal.log.Errorln("Error sending post-backfill dummy event:", err)
- return
+func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time, insertionEventId id.EventID) {
+ for _, evtType := range []event.Type{BackfillEndDummyEvent, RoomMarker, MSC2716Marker} {
+ resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, evtType, map[string]interface{}{
+ "org.matrix.msc2716.marker.insertion": insertionEventId,
+ "m.marker.insertion": insertionEventId,
+ })
+ if err != nil {
+ portal.log.Errorln("Error sending post-backfill dummy event:", err)
+ return
+ }
+ msg := portal.bridge.DB.Message.New()
+ msg.Chat = portal.Key
+ msg.MXID = resp.EventID
+ msg.JID = types.MessageID(resp.EventID)
+ msg.Timestamp = lastTimestamp.Add(1 * time.Second)
+ msg.Sent = true
+ msg.Insert()
+
}
- msg := portal.bridge.DB.Message.New()
- msg.Chat = portal.Key
- msg.MXID = resp.EventID
- msg.JID = types.MessageID(resp.EventID)
- msg.Timestamp = lastTimestamp.Add(1 * time.Second)
- msg.Sent = true
- msg.Insert()
}
// endregion
diff --git a/portal.go b/portal.go
index a5ed631..55b786d 100644
--- a/portal.go
+++ b/portal.go
@@ -237,7 +237,7 @@ func (portal *Portal) handleMessageLoopItem(msg PortalMessage) {
return
}
portal.log.Debugln("Creating Matrix room from incoming message")
- err := portal.CreateMatrixRoom(msg.source, nil, false)
+ err := portal.CreateMatrixRoom(msg.source, nil, false, true)
if err != nil {
portal.log.Errorln("Failed to create portal room:", err)
return
@@ -1164,7 +1164,7 @@ func (portal *Portal) UpdateBridgeInfo() {
}
}
-func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, isFullInfo bool) error {
+func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, isFullInfo, backfill bool) error {
portal.roomCreateLock.Lock()
defer portal.roomCreateLock.Unlock()
if len(portal.MXID) > 0 {
@@ -1337,6 +1337,12 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i
portal.FirstEventID = firstEventResp.EventID
portal.Update()
}
+
+ if user.bridge.Config.Bridge.HistorySync.Backfill && backfill {
+ user.EnqueueImmedateBackfill(portal, 0)
+ user.EnqueueDeferredBackfills(portal, 1, 0)
+ user.BackfillQueue.ReCheckQueue <- true
+ }
return nil
}
@@ -1895,7 +1901,7 @@ func shallowCopyMap(data map[string]interface{}) map[string]interface{} {
return newMap
}
-func (portal *Portal) makeMediaBridgeFailureMessage(info *types.MessageInfo, bridgeErr error, converted *ConvertedMessage, keys *FailedMediaKeys) *ConvertedMessage {
+func (portal *Portal) makeMediaBridgeFailureMessage(info *types.MessageInfo, bridgeErr error, converted *ConvertedMessage, keys *FailedMediaKeys, userFriendlyError string) *ConvertedMessage {
portal.log.Errorfln("Failed to bridge media for %s: %v", info.ID, bridgeErr)
if keys != nil {
meta := &FailedMediaMeta{
@@ -1908,9 +1914,13 @@ func (portal *Portal) makeMediaBridgeFailureMessage(info *types.MessageInfo, bri
portal.mediaErrorCache[info.ID] = meta
}
converted.Type = event.EventMessage
+ body := userFriendlyError
+ if body == "" {
+ body = fmt.Sprintf("Failed to bridge media: %v", bridgeErr)
+ }
converted.Content = &event.MessageEventContent{
MsgType: event.MsgNotice,
- Body: fmt.Sprintf("Failed to bridge media: %v", bridgeErr),
+ Body: body,
}
return converted
}
@@ -2159,24 +2169,24 @@ func (portal *Portal) convertMediaMessage(intent *appservice.IntentAPI, source *
Type: whatsmeow.GetMediaType(msg),
SHA256: msg.GetFileSha256(),
EncSHA256: msg.GetFileEncSha256(),
- })
+ }, "Old photo or attachment. This will sync in a future update.")
} else if errors.Is(err, whatsmeow.ErrNoURLPresent) {
portal.log.Debugfln("No URL present error for media message %s, ignoring...", info.ID)
return nil
} else if errors.Is(err, whatsmeow.ErrFileLengthMismatch) || errors.Is(err, whatsmeow.ErrInvalidMediaSHA256) {
portal.log.Warnfln("Mismatching media checksums in %s: %v. Ignoring because WhatsApp seems to ignore them too", info.ID, err)
} else if err != nil {
- return portal.makeMediaBridgeFailureMessage(info, err, converted, nil)
+ return portal.makeMediaBridgeFailureMessage(info, err, converted, nil, "")
}
err = portal.uploadMedia(intent, data, converted.Content)
if err != nil {
if errors.Is(err, mautrix.MTooLarge) {
- return portal.makeMediaBridgeFailureMessage(info, errors.New("homeserver rejected too large file"), converted, nil)
+ return portal.makeMediaBridgeFailureMessage(info, errors.New("homeserver rejected too large file"), converted, nil, "")
} else if httpErr, ok := err.(mautrix.HTTPError); ok && httpErr.IsStatus(413) {
- return portal.makeMediaBridgeFailureMessage(info, errors.New("proxy rejected too large file"), converted, nil)
+ return portal.makeMediaBridgeFailureMessage(info, errors.New("proxy rejected too large file"), converted, nil, "")
} else {
- return portal.makeMediaBridgeFailureMessage(info, fmt.Errorf("failed to upload media: %w", err), converted, nil)
+ return portal.makeMediaBridgeFailureMessage(info, fmt.Errorf("failed to upload media: %w", err), converted, nil, "")
}
}
return converted
diff --git a/provisioning.go b/provisioning.go
index 622f4c4..114e7cc 100644
--- a/provisioning.go
+++ b/provisioning.go
@@ -346,7 +346,7 @@ func (prov *ProvisioningAPI) OpenGroup(w http.ResponseWriter, r *http.Request) {
portal := user.GetPortalByJID(info.JID)
status := http.StatusOK
if len(portal.MXID) == 0 {
- err = portal.CreateMatrixRoom(user, info, true)
+ err = portal.CreateMatrixRoom(user, info, true, true)
if err != nil {
jsonResponse(w, http.StatusInternalServerError, Error{
Error: fmt.Sprintf("Failed to create portal: %v", err),
diff --git a/user.go b/user.go
index 7b08a9a..46dbefa 100644
--- a/user.go
+++ b/user.go
@@ -74,6 +74,8 @@ type User struct {
groupListCache []*types.GroupInfo
groupListCacheLock sync.Mutex
groupListCacheTime time.Time
+
+ BackfillQueue *BackfillQueue
}
func (bridge *Bridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User {
@@ -186,7 +188,9 @@ func (bridge *Bridge) NewUser(dbUser *database.User) *User {
user.RelayWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelayWhitelisted(user.MXID)
user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID)
user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
- go user.handleHistorySyncsLoop()
+ if user.bridge.Config.Bridge.HistorySync.Backfill {
+ go user.handleHistorySyncsLoop()
+ }
return user
}
@@ -410,6 +414,11 @@ func (user *User) DeleteSession() {
user.JID = types.EmptyJID
user.Update()
}
+
+ // Delete all of the backfill and history sync data.
+ user.bridge.DB.BackfillQuery.DeleteAll(user.MXID)
+ user.bridge.DB.HistorySyncQuery.DeleteAllConversations(user.MXID)
+ user.bridge.DB.HistorySyncQuery.DeleteAllMessages(user.MXID)
}
func (user *User) IsConnected() bool {
@@ -685,7 +694,9 @@ func (user *User) HandleEvent(event interface{}) {
portal := user.GetPortalByMessageSource(v.Info.MessageSource)
portal.messages <- PortalMessage{undecryptable: v, source: user}
case *events.HistorySync:
- user.historySyncs <- v
+ if user.bridge.Config.Bridge.HistorySync.Backfill {
+ user.historySyncs <- v
+ }
case *events.Mute:
portal := user.GetPortalByJID(v.JID)
if portal != nil {
@@ -942,7 +953,7 @@ func (user *User) ResyncGroups(createPortals bool) error {
portal := user.GetPortalByJID(group.JID)
if len(portal.MXID) == 0 {
if createPortals {
- err = portal.CreateMatrixRoom(user, group, true)
+ err = portal.CreateMatrixRoom(user, group, true, true)
if err != nil {
return fmt.Errorf("failed to create room for %s: %w", group.JID, err)
}
@@ -1025,7 +1036,7 @@ func (user *User) markSelfReadFull(portal *Portal) {
func (user *User) handleGroupCreate(evt *events.JoinedGroup) {
portal := user.GetPortalByJID(evt.JID)
if len(portal.MXID) == 0 {
- err := portal.CreateMatrixRoom(user, &evt.GroupInfo, true)
+ err := portal.CreateMatrixRoom(user, &evt.GroupInfo, true, true)
if err != nil {
user.log.Errorln("Failed to create Matrix room after join notification: %v", err)
}
@@ -1093,7 +1104,7 @@ func (user *User) StartPM(jid types.JID, reason string) (*Portal, *Puppet, bool,
return portal, puppet, false, nil
}
}
- err := portal.CreateMatrixRoom(user, nil, false)
+ err := portal.CreateMatrixRoom(user, nil, false, true)
return portal, puppet, true, err
}