Add non-MSC2716 backfill support

This commit is contained in:
Tulir Asokan 2023-06-19 02:10:43 +03:00
parent 8ab06edaca
commit 2d33bb1673
9 changed files with 130 additions and 53 deletions

View file

@ -1,3 +1,8 @@
# v0.9.0 (unreleased)
* Removed MSC2716 support.
* Added legacy backfill support.
# v0.8.6 (2023-06-16) # v0.8.6 (2023-06-16)
* Implemented intentional mentions for outgoing messages. * Implemented intentional mentions for outgoing messages.

View file

@ -68,6 +68,7 @@ type BridgeConfig struct {
StorageQuota uint32 `yaml:"storage_quota_mb"` StorageQuota uint32 `yaml:"storage_quota_mb"`
} }
MaxInitialConversations int `yaml:"max_initial_conversations"` MaxInitialConversations int `yaml:"max_initial_conversations"`
MessageCount int `yaml:"message_count"`
UnreadHoursThreshold int `yaml:"unread_hours_threshold"` UnreadHoursThreshold int `yaml:"unread_hours_threshold"`
Immediate struct { Immediate struct {

View file

@ -56,6 +56,7 @@ func DoUpgrade(helper *up.Helper) {
helper.Copy(up.Str, "bridge", "history_sync", "media_requests", "request_method") helper.Copy(up.Str, "bridge", "history_sync", "media_requests", "request_method")
helper.Copy(up.Int, "bridge", "history_sync", "media_requests", "request_local_time") helper.Copy(up.Int, "bridge", "history_sync", "media_requests", "request_local_time")
helper.Copy(up.Int, "bridge", "history_sync", "max_initial_conversations") helper.Copy(up.Int, "bridge", "history_sync", "max_initial_conversations")
helper.Copy(up.Int, "bridge", "history_sync", "message_count")
helper.Copy(up.Int, "bridge", "history_sync", "unread_hours_threshold") helper.Copy(up.Int, "bridge", "history_sync", "unread_hours_threshold")
helper.Copy(up.Int, "bridge", "history_sync", "immediate", "worker_count") helper.Copy(up.Int, "bridge", "history_sync", "immediate", "worker_count")
helper.Copy(up.Int, "bridge", "history_sync", "immediate", "max_events") helper.Copy(up.Int, "bridge", "history_sync", "immediate", "max_events")

View file

@ -127,12 +127,8 @@ bridge:
portal_message_buffer: 128 portal_message_buffer: 128
# Settings for handling history sync payloads. # Settings for handling history sync payloads.
history_sync: history_sync:
# Enable backfilling history sync payloads from WhatsApp using batch sending? # Enable backfilling history sync payloads from WhatsApp?
# This requires a server with MSC2716 support, which is currently an experimental feature in synapse. backfill: true
# It can be enabled by setting experimental_features -> msc2716_enabled to true in homeserver.yaml.
# Note that prior to Synapse 1.49, there were some bugs with the implementation, especially if using event persistence workers.
# There are also still some issues in Synapse's federation implementation.
backfill: false
# Should the bridge create portals for chats in the history sync payload? # Should the bridge create portals for chats in the history sync payload?
# This has no effect unless backfill is enabled. # This has no effect unless backfill is enabled.
create_portals: true create_portals: true
@ -171,16 +167,22 @@ bridge:
# be sent (in minutes after midnight)? # be sent (in minutes after midnight)?
request_local_time: 120 request_local_time: 120
# The maximum number of initial conversations that should be synced. # The maximum number of initial conversations that should be synced.
# Other conversations will be backfilled on demand when the start PM # Other conversations will be backfilled on demand when receiving a message or when initiating a direct chat.
# provisioning endpoint is used or when a message comes in from that
# chat.
max_initial_conversations: -1 max_initial_conversations: -1
# Number of messages to backfill in each conversation
message_count: 50
# If this value is greater than 0, then if the conversation's last # If this value is greater than 0, then if the conversation's last
# message was more than this number of hours ago, then the conversation # message was more than this number of hours ago, then the conversation
# will automatically be marked as read. # will automatically be marked as read.
# Conversations that have a last message that is less than this number # Conversations that have a last message that is less than this number
# of hours ago will have their unread status synced from WhatsApp. # of hours ago will have their unread status synced from WhatsApp.
unread_hours_threshold: 0 unread_hours_threshold: 0
###############################################################################
# The settings below are only applicable for backfilling using batch sending, #
# which is no longer supported in Synapse. #
###############################################################################
# Settings for immediate backfills. These backfills should generally be # Settings for immediate backfills. These backfills should generally be
# small and their main purpose is to populate each of the initial chats # small and their main purpose is to populate each of the initial chats
# (as configured by max_initial_conversations) with a few messages so # (as configured by max_initial_conversations) with a few messages so
@ -220,6 +222,7 @@ bridge:
- start_days_ago: -1 - start_days_ago: -1
max_batch_events: 500 max_batch_events: 500
batch_delay: 10 batch_delay: 10
# Should puppet avatars be fetched from the server even if an avatar is already set? # Should puppet avatars be fetched from the server even if an avatar is already set?
user_avatar_sync: true user_avatar_sync: true
# Should Matrix users leaving groups be bridged to WhatsApp? # Should Matrix users leaving groups be bridged to WhatsApp?

2
go.mod
View file

@ -18,7 +18,7 @@ require (
golang.org/x/net v0.11.0 golang.org/x/net v0.11.0
google.golang.org/protobuf v1.30.0 google.golang.org/protobuf v1.30.0
maunium.net/go/maulogger/v2 v2.4.1 maunium.net/go/maulogger/v2 v2.4.1
maunium.net/go/mautrix v0.15.3 maunium.net/go/mautrix v0.15.4-0.20230618223441-8d500be4cbb2
) )
require ( require (

4
go.sum
View file

@ -131,5 +131,5 @@ maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA= maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
maunium.net/go/maulogger/v2 v2.4.1 h1:N7zSdd0mZkB2m2JtFUsiGTQQAdP0YeFWT7YMc80yAL8= maunium.net/go/maulogger/v2 v2.4.1 h1:N7zSdd0mZkB2m2JtFUsiGTQQAdP0YeFWT7YMc80yAL8=
maunium.net/go/maulogger/v2 v2.4.1/go.mod h1:omPuYwYBILeVQobz8uO3XC8DIRuEb5rXYlQSuqrbCho= maunium.net/go/maulogger/v2 v2.4.1/go.mod h1:omPuYwYBILeVQobz8uO3XC8DIRuEb5rXYlQSuqrbCho=
maunium.net/go/mautrix v0.15.3 h1:C9BHSUM0gYbuZmAtopuLjIcH5XHLb/ZjTEz7nN+0jN0= maunium.net/go/mautrix v0.15.4-0.20230618223441-8d500be4cbb2 h1:a7xytO4aYZchg/j8vFqYFkxS75eIgJwMsSsqY7FO6kg=
maunium.net/go/mautrix v0.15.3/go.mod h1:zLrQqdxJlLkurRCozTc9CL6FySkgZlO/kpCYxBILSLE= maunium.net/go/mautrix v0.15.4-0.20230618223441-8d500be4cbb2/go.mod h1:zLrQqdxJlLkurRCozTc9CL6FySkgZlO/kpCYxBILSLE=

View file

@ -60,25 +60,28 @@ func (user *User) handleHistorySyncsLoop() {
return return
} }
// Start the backfill queue. batchSend := user.bridge.SpecVersions.Supports(mautrix.BeeperFeatureBatchSending)
user.BackfillQueue = &BackfillQueue{ if batchSend {
BackfillQuery: user.bridge.DB.Backfill, // Start the backfill queue.
reCheckChannels: []chan bool{}, user.BackfillQueue = &BackfillQueue{
log: user.log.Sub("BackfillQueue"), BackfillQuery: user.bridge.DB.Backfill,
reCheckChannels: []chan bool{},
log: user.log.Sub("BackfillQueue"),
}
forwardAndImmediate := []database.BackfillType{database.BackfillImmediate, database.BackfillForward}
// Immediate backfills can be done in parallel
for i := 0; i < user.bridge.Config.Bridge.HistorySync.Immediate.WorkerCount; i++ {
go user.HandleBackfillRequestsLoop(forwardAndImmediate, []database.BackfillType{})
}
// Deferred backfills should be handled synchronously so as not to
// overload the homeserver. Users can configure their backfill stages
// to be more or less aggressive with backfilling at this stage.
go user.HandleBackfillRequestsLoop([]database.BackfillType{database.BackfillDeferred}, forwardAndImmediate)
} }
forwardAndImmediate := []database.BackfillType{database.BackfillImmediate, database.BackfillForward}
// Immediate backfills can be done in parallel
for i := 0; i < user.bridge.Config.Bridge.HistorySync.Immediate.WorkerCount; i++ {
go user.HandleBackfillRequestsLoop(forwardAndImmediate, []database.BackfillType{})
}
// Deferred backfills should be handled synchronously so as not to
// overload the homeserver. Users can configure their backfill stages
// to be more or less aggressive with backfilling at this stage.
go user.HandleBackfillRequestsLoop([]database.BackfillType{database.BackfillDeferred}, forwardAndImmediate)
if user.bridge.Config.Bridge.HistorySync.MediaRequests.AutoRequestMedia && if user.bridge.Config.Bridge.HistorySync.MediaRequests.AutoRequestMedia &&
user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestMethod == config.MediaRequestMethodLocalTime { user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestMethod == config.MediaRequestMethodLocalTime {
go user.dailyMediaRequestLoop() go user.dailyMediaRequestLoop()
@ -92,9 +95,13 @@ func (user *User) handleHistorySyncsLoop() {
if evt == nil { if evt == nil {
return return
} }
user.handleHistorySync(user.BackfillQueue, evt.Data) user.storeHistorySync(evt.Data)
case <-user.enqueueBackfillsTimer.C: case <-user.enqueueBackfillsTimer.C:
user.enqueueAllBackfills() if batchSend {
user.enqueueAllBackfills()
} else {
user.backfillAll()
}
} }
} }
} }
@ -125,6 +132,66 @@ func (user *User) enqueueAllBackfills() {
} }
} }
// backfillAll walks through every conversation stored from history sync
// payloads and creates Matrix rooms for the ones that don't have one yet.
// It is used instead of enqueueAllBackfills when the homeserver does not
// support batch sending.
//
// Conversations whose portal already has a Matrix room are not backfilled:
// their stored history sync messages are deleted instead. For the remaining
// conversations a room is created via CreateMatrixRoom, but only while the
// conversation's index in the list is below max_initial_conversations.
// A negative max_initial_conversations (the example config default is -1)
// is treated as "no limit".
func (user *User) backfillAll() {
	// -1 asks the database for all stored conversations rather than the first N.
	conversations := user.bridge.DB.HistorySync.GetNMostRecentConversations(user.MXID, -1)
	if len(conversations) == 0 {
		return
	}
	user.zlog.Info().
		Int("conversation_count", len(conversations)).
		Msg("Probably received all history sync blobs, now backfilling conversations")
	limit := user.bridge.Config.Bridge.HistorySync.MaxInitialConversations
	// Find the portals for all the conversations.
	for i, conv := range conversations {
		jid, err := types.ParseJID(conv.ConversationID)
		if err != nil {
			user.zlog.Warn().Err(err).
				Str("conversation_id", conv.ConversationID).
				Msg("Failed to parse chat JID in history sync")
			continue
		}
		portal := user.GetPortalByJID(jid)
		if portal.MXID != "" {
			user.zlog.Debug().
				Str("portal_jid", portal.Key.JID.String()).
				Msg("Chat already has a room, deleting messages from database")
			user.bridge.DB.HistorySync.DeleteAllMessagesForPortal(user.MXID, portal.Key)
		} else if limit < 0 || i < limit {
			// Without the limit < 0 check, the default value of -1 would make
			// this branch unreachable and no rooms would ever be created.
			err = portal.CreateMatrixRoom(user, nil, true, true)
			if err != nil {
				user.zlog.Err(err).Msg("Failed to create Matrix room for backfill")
			}
		}
	}
}
// legacyBackfill bridges the history sync messages stored for this portal by
// parsing each of them and passing it to handleMessage, then deletes all
// history sync messages stored for the portal.
//
// The caller must already hold portal.latestEventBackfillLock. The lock is
// released when this function returns.
func (portal *Portal) legacyBackfill(user *User) {
	defer portal.latestEventBackfillLock.Unlock()
	// CreateMatrixRoom is the only intended caller and it takes latestEventBackfillLock before
	// creating the room, so successfully acquiring the lock here means the contract was broken.
	if portal.latestEventBackfillLock.TryLock() {
		panic("legacyBackfill() called without locking latestEventBackfillLock")
	}
	// TODO use portal.zlog instead of user.zlog
	logger := user.zlog.With().
		Str("portal_jid", portal.Key.JID.String()).
		Str("action", "legacy backfill").
		Logger()

	maxMessages := portal.bridge.Config.Bridge.HistorySync.MessageCount
	stored := user.bridge.DB.HistorySync.GetMessagesBetween(user.MXID, portal.Key.JID.String(), nil, nil, maxMessages)
	logger.Debug().Int("message_count", len(stored)).Msg("Got messages to backfill from database")

	// Walk the slice from the last element to the first.
	// NOTE(review): presumably the database returns newest messages first, so this bridges oldest-first — confirm.
	for i := len(stored) - 1; i >= 0; i-- {
		webMsg := stored[i]
		msgEvt, err := user.Client.ParseWebMessage(portal.Key.JID, webMsg)
		if err == nil {
			portal.handleMessage(user, msgEvt)
			continue
		}
		// Unparseable messages are skipped rather than aborting the whole backfill.
		logger.Warn().Err(err).
			Int("msg_index", i).
			Str("msg_id", webMsg.GetKey().GetId()).
			Uint64("msg_time_seconds", webMsg.GetMessageTimestamp()).
			Msg("Dropping historical message due to parse error")
	}

	logger.Debug().Msg("Backfill complete, deleting leftover messages from database")
	user.bridge.DB.HistorySync.DeleteAllMessagesForPortal(user.MXID, portal.Key)
}
func (user *User) dailyMediaRequestLoop() { func (user *User) dailyMediaRequestLoop() {
// Calculate when to do the first set of media retry requests // Calculate when to do the first set of media retry requests
now := time.Now() now := time.Now()
@ -358,12 +425,12 @@ func (user *User) shouldCreatePortalForHistorySync(conv *database.HistorySyncCon
} }
} }
func (user *User) handleHistorySync(backfillQueue *BackfillQueue, evt *waProto.HistorySync) { func (user *User) storeHistorySync(evt *waProto.HistorySync) {
if evt == nil || evt.SyncType == nil { if evt == nil || evt.SyncType == nil {
return return
} }
log := user.bridge.ZLog.With(). log := user.bridge.ZLog.With().
Str("method", "User.handleHistorySync"). Str("method", "User.storeHistorySync").
Str("user_id", user.MXID.String()). Str("user_id", user.MXID.String()).
Str("sync_type", evt.GetSyncType().String()). Str("sync_type", evt.GetSyncType().String()).
Uint32("chunk_order", evt.GetChunkOrder()). Uint32("chunk_order", evt.GetChunkOrder()).

15
main.go
View file

@ -33,7 +33,6 @@ import (
"go.mau.fi/whatsmeow/store/sqlstore" "go.mau.fi/whatsmeow/store/sqlstore"
"go.mau.fi/whatsmeow/types" "go.mau.fi/whatsmeow/types"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/bridge" "maunium.net/go/mautrix/bridge"
"maunium.net/go/mautrix/bridge/commands" "maunium.net/go/mautrix/bridge/commands"
"maunium.net/go/mautrix/bridge/status" "maunium.net/go/mautrix/bridge/status"
@ -248,20 +247,6 @@ func (br *WABridge) GetConfigPtr() interface{} {
return br.Config return br.Config
} }
const unstableFeatureBatchSending = "org.matrix.msc2716"
// CheckFeatures verifies that the homeserver advertises the unstable feature
// needed for backfilling when backfill is enabled in the bridge config.
//
// It returns an error message and false when backfill is enabled but the
// batch sending feature flag is either missing from the homeserver's
// unstable features or present but set to false. Otherwise it returns an
// empty string and true.
func (br *WABridge) CheckFeatures(versions *mautrix.RespVersions) (string, bool) {
	// Nothing to verify when backfilling is turned off.
	if !br.Config.Bridge.HistorySync.Backfill {
		return "", true
	}
	switch enabled, advertised := versions.UnstableFeatures[unstableFeatureBatchSending]; {
	case !advertised:
		return "Backfilling is enabled in bridge config, but homeserver does not support MSC2716 batch sending", false
	case !enabled:
		return "Backfilling is enabled in bridge config, but MSC2716 batch sending is not enabled on homeserver", false
	default:
		return "", true
	}
}
func main() { func main() {
br := &WABridge{ br := &WABridge{
usersByMXID: make(map[id.UserID]*User), usersByMXID: make(map[id.UserID]*User),

View file

@ -1697,6 +1697,16 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i
if !portal.shouldSetDMRoomMetadata() { if !portal.shouldSetDMRoomMetadata() {
req.Name = "" req.Name = ""
} }
legacyBackfill := user.bridge.Config.Bridge.HistorySync.Backfill && backfill && !user.bridge.SpecVersions.Supports(mautrix.BeeperFeatureBatchSending)
var backfillStarted bool
if legacyBackfill {
portal.latestEventBackfillLock.Lock()
defer func() {
if !backfillStarted {
portal.latestEventBackfillLock.Unlock()
}
}()
}
resp, err := intent.CreateRoom(req) resp, err := intent.CreateRoom(req)
if err != nil { if err != nil {
return err return err
@ -1758,10 +1768,15 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i
} }
if user.bridge.Config.Bridge.HistorySync.Backfill && backfill { if user.bridge.Config.Bridge.HistorySync.Backfill && backfill {
portals := []*Portal{portal} if legacyBackfill {
user.EnqueueImmediateBackfills(portals) backfillStarted = true
user.EnqueueDeferredBackfills(portals) go portal.legacyBackfill(user)
user.BackfillQueue.ReCheck() } else {
portals := []*Portal{portal}
user.EnqueueImmediateBackfills(portals)
user.EnqueueDeferredBackfills(portals)
user.BackfillQueue.ReCheck()
}
} }
return nil return nil
} }
@ -4491,7 +4506,7 @@ func (portal *Portal) Cleanup(puppetsOnly bool) {
return return
} }
intent := portal.MainIntent() intent := portal.MainIntent()
if portal.bridge.SpecVersions.UnstableFeatures["com.beeper.room_yeeting"] { if portal.bridge.SpecVersions.Supports(mautrix.BeeperFeatureRoomYeeting) {
err := intent.BeeperDeleteRoom(portal.MXID) err := intent.BeeperDeleteRoom(portal.MXID)
if err == nil || errors.Is(err, mautrix.MNotFound) { if err == nil || errors.Is(err, mautrix.MNotFound) {
return return