From f2e762680c1d243df7e5c0a0983d7388fdcddc17 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Mon, 18 Apr 2022 20:50:21 -0600 Subject: [PATCH] media backfill: add ability to automatically request media not on WA server This adds a new backfill type for media that sends a request to the phone for every media that is not available on the WA servers. WA deletes media from their servers after about two weeks, so you have to ask the phone to re-upload it. In order to use this, you need to enable bridge.history_sync.backfill_media and configure the requests that will be made per portal using bridge.history_sync.media (which is similar to the deferred backfill config). If you already have backfilled portals, but want to do a one-off media backfill for all existing portals, you can set bridge.history_sync.enqueue_backfill_media_next_start to true. --- backfillqueue.go | 8 ++-- commands.go | 10 ++++- config/bridge.go | 9 +++- config/upgrade.go | 3 ++ database/backfillqueue.go | 1 + database/portal.go | 8 ++++ example-config.yaml | 26 +++++++++++ historysync.go | 94 ++++++++++++++++++++++++++++----------- portal.go | 10 ++++- user.go | 7 +++ 10 files changed, 141 insertions(+), 35 deletions(-) diff --git a/backfillqueue.go b/backfillqueue.go index fb3fe3f..6de3548 100644 --- a/backfillqueue.go +++ b/backfillqueue.go @@ -56,12 +56,12 @@ func (bq *BackfillQueue) deferredBackfillLoop(user *User) { // Finish all immediate backfills before doing the deferred ones. if immediate := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); immediate != nil { time.Sleep(10 * time.Second) - continue - } - - if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil { + } else if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil { bq.DeferredBackfillRequests <- backfill backfill.MarkDone() + } else if mediaBackfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillMedia); mediaBackfill != nil { + bq.DeferredBackfillRequests <- mediaBackfill + mediaBackfill.MarkDone() } else { time.Sleep(10 * time.Second) } diff --git a/commands.go b/commands.go index a7bdebb..478fa3b 100644 --- a/commands.go +++ b/commands.go @@ -868,8 +868,14 @@ func (handler *CommandHandler) CommandBackfill(ce *CommandEvent) { return } } - backfill := ce.Portal.bridge.DB.BackfillQuery.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay) - backfill.Insert() + backfillMessages := ce.Portal.bridge.DB.BackfillQuery.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay) + backfillMessages.Insert() + + if ce.Bridge.Config.Bridge.HistorySync.BackfillMedia { + backfillMedia := ce.Portal.bridge.DB.BackfillQuery.NewWithValues(ce.User.MXID, database.BackfillMedia, 1, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay) + backfillMedia.Insert() + } + ce.User.BackfillQueue.ReCheckQueue <- true } diff --git a/config/bridge.go b/config/bridge.go index 840472d..03edf5a 100644 --- a/config/bridge.go +++ b/config/bridge.go @@ -46,8 +46,12 @@ type BridgeConfig struct { IdentityChangeNotices bool `yaml:"identity_change_notices"` HistorySync struct { - CreatePortals bool `yaml:"create_portals"` - Backfill bool `yaml:"backfill"` + CreatePortals bool `yaml:"create_portals"` + + Backfill bool `yaml:"backfill"` + BackfillMedia bool `yaml:"backfill_media"` + EnqueueBackfillMediaNextStart bool `yaml:"enqueue_backfill_media_next_start"` + DoublePuppetBackfill bool `yaml:"double_puppet_backfill"` RequestFullSync bool `yaml:"request_full_sync"` MaxInitialConversations int `yaml:"max_initial_conversations"` @@ -58,6 +62,7 @@ type BridgeConfig struct { } `yaml:"immediate"` Deferred []DeferredConfig `yaml:"deferred"` + Media []DeferredConfig `yaml:"media"` } `yaml:"history_sync"` UserAvatarSync bool `yaml:"user_avatar_sync"` BridgeMatrixLeave bool `yaml:"bridge_matrix_leave"` diff --git a/config/upgrade.go b/config/upgrade.go index fdf06a0..ad151d2 100644 --- a/config/upgrade.go +++ b/config/upgrade.go @@ -79,12 +79,15 @@ func (helper *UpgradeHelper) doUpgrade() { helper.Copy(Bool, "bridge", "identity_change_notices") helper.Copy(Bool, "bridge", "history_sync", "create_portals") helper.Copy(Bool, "bridge", "history_sync", "backfill") + helper.Copy(Bool, "bridge", "history_sync", "backfill_media") + helper.Copy(Bool, "bridge", "history_sync", "enqueue_backfill_media_next_start") helper.Copy(Bool, "bridge", "history_sync", "double_puppet_backfill") helper.Copy(Bool, "bridge", "history_sync", "request_full_sync") helper.Copy(Int, "bridge", "history_sync", "max_initial_conversations") helper.Copy(Int, "bridge", "history_sync", "immediate", "worker_count") helper.Copy(Int, "bridge", "history_sync", "immediate", "max_events") helper.Copy(List, "bridge", "history_sync", "deferred") + helper.Copy(List, "bridge", "history_sync", "media") helper.Copy(Bool, "bridge", "user_avatar_sync") helper.Copy(Bool, "bridge", "bridge_matrix_leave") helper.Copy(Bool, "bridge", "sync_with_custom_puppets") diff --git a/database/backfillqueue.go b/database/backfillqueue.go index 545b80a..0cf316f 100644 --- a/database/backfillqueue.go +++ b/database/backfillqueue.go @@ -30,6 +30,7 @@ type BackfillType int const ( BackfillImmediate BackfillType = 0 BackfillDeferred = 1 + BackfillMedia = 2 ) type BackfillQuery struct { diff --git a/database/portal.go b/database/portal.go index 3d14569..3c67727 100644 --- a/database/portal.go +++ b/database/portal.go @@ -66,6 +66,14 @@ func (pq *PortalQuery) GetAll() []*Portal { return pq.getAll("SELECT * FROM portal") } +func (pq *PortalQuery) GetAllForUser(userID id.UserID) []*Portal { + return pq.getAll(` + SELECT p.* FROM portal p + LEFT JOIN user_portal up ON p.jid=up.portal_jid AND p.receiver=up.portal_receiver + WHERE mxid<>'' AND up.user_mxid=$1 + `, userID) +} + func (pq *PortalQuery) GetByJID(key PortalKey) *Portal { return pq.get("SELECT * FROM portal WHERE jid=$1 AND receiver=$2", key.JID, key.Receiver) } diff --git a/example-config.yaml b/example-config.yaml index faf49c7..41feb28 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -125,6 +125,13 @@ bridge: # Note that prior to Synapse 1.49, there were some bugs with the implementation, especially if using event persistence workers. # There are also still some issues in Synapse's federation implementation. backfill: false + # Enable requesting media that was not found on the WhatsApp server. + # Only applies if backfill is true. + backfill_media: false + # Enqueue backfilling of media that was not found on the WhatsApp + # server on next startup for all portals. + # Only applies if backfill and backfill_media are true. + enqueue_backfill_media_next_start: false # Use double puppets for backfilling? # In order to use this, the double puppets must be in the appservice's user ID namespace # (because the bridge can't use the double puppet access token with batch sending). @@ -177,6 +184,25 @@ bridge: - start_days_ago: -1 max_batch_events: 500 batch_delay: 10 + # Settings for automatically requesting all media that was not found on + # the WhatsApp server. This process happens after the deferred + # backfills are completed. + # The config is the same as for deferred backfills, except the + # max_batch_events represents the maximum number of media messages to + # request. + media: + # Last Month + - start_days_ago: 30 + max_batch_events: 5 + batch_delay: 10 + # Last 3 months + - start_days_ago: 90 + max_batch_events: 5 + batch_delay: 10 + # The start of time + - start_days_ago: -1 + max_batch_events: 10 + batch_delay: 20 # Should puppet avatars be fetched from the server even if an avatar is already set? user_avatar_sync: true # Should Matrix users leaving groups be bridged to WhatsApp? diff --git a/historysync.go b/historysync.go index 57ba936..a4d821a 100644 --- a/historysync.go +++ b/historysync.go @@ -84,25 +84,51 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String()) continue } - - // Update the client store with basic chat settings. - if conv.MuteEndTime.After(time.Now()) { - user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime) - } - if conv.Archived { - user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true) - } - if conv.Pinned > 0 { - user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true) - } - portal := user.GetPortalByJID(conv.PortalKey.JID) - if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration { - portal.ExpirationTime = *conv.EphemeralExpiration - portal.Update() - } - user.createOrUpdatePortalAndBackfillWithLock(req, conv, portal) + if req.BackfillType == database.BackfillMedia { + startTime := time.Unix(0, 0) + if req.TimeStart != nil { + startTime = *req.TimeStart + } + endTime := time.Now() + if req.TimeEnd != nil { + endTime = *req.TimeEnd + } + + user.log.Debugfln("Backfilling media from %v to %v for %s", startTime, endTime, portal.Key.String()) + + // Go through all of the messages in the given time range, + // requesting any media that errored. + requested := 0 + for _, msg := range user.bridge.DB.Message.GetMessagesBetween(portal.Key, startTime, endTime) { + if requested > 0 && requested%req.MaxBatchEvents == 0 { + time.Sleep(time.Duration(req.BatchDelay) * time.Second) + } + if msg.Error == database.MsgErrMediaNotFound { + portal.requestMediaRetry(user, msg.MXID) + requested += 1 + } + } + } else { + // Update the client store with basic chat settings. + if conv.MuteEndTime.After(time.Now()) { + user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime) + } + if conv.Archived { + user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true) + } + if conv.Pinned > 0 { + user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true) + } + + if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration { + portal.ExpirationTime = *conv.EphemeralExpiration + portal.Update() + } + + user.createOrUpdatePortalAndBackfillWithLock(req, conv, portal) + } } } @@ -245,7 +271,8 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History } nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations) - for i, conv := range nMostRecent { + var priorityCounter int + for _, conv := range nMostRecent { jid, err := types.ParseJID(conv.ConversationID) if err != nil { user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.ConversationID, err) @@ -256,10 +283,11 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History switch evt.GetSyncType() { case waProto.HistorySync_INITIAL_BOOTSTRAP: // Enqueue immediate backfills for the most recent messages first. - user.EnqueueImmedateBackfill(portal, i) + user.EnqueueImmedateBackfill(portal, &priorityCounter) case waProto.HistorySync_FULL, waProto.HistorySync_RECENT: // Enqueue deferred backfills as configured. - user.EnqueueDeferredBackfills(portal, len(nMostRecent), i) + user.EnqueueDeferredBackfills(portal, &priorityCounter) + user.EnqueueMediaBackfills(portal, &priorityCounter) } } @@ -276,22 +304,38 @@ func getConversationTimestamp(conv *waProto.Conversation) uint64 { return convTs } -func (user *User) EnqueueImmedateBackfill(portal *Portal, priority int) { +func (user *User) EnqueueImmedateBackfill(portal *Portal, priorityCounter *int) { maxMessages := user.bridge.Config.Bridge.HistorySync.Immediate.MaxEvents - initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, nil, maxMessages, maxMessages, 0) + initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, *priorityCounter, &portal.Key, nil, nil, maxMessages, maxMessages, 0) initialBackfill.Insert() + *priorityCounter++ } -func (user *User) EnqueueDeferredBackfills(portal *Portal, numConversations, priority int) { - for j, backfillStage := range user.bridge.Config.Bridge.HistorySync.Deferred { +func (user *User) EnqueueDeferredBackfills(portal *Portal, priorityCounter *int) { + for _, backfillStage := range user.bridge.Config.Bridge.HistorySync.Deferred { + var startDate *time.Time = nil + if backfillStage.StartDaysAgo > 0 { + startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo) + startDate = &startDaysAgo + } + backfillMessages := user.bridge.DB.BackfillQuery.NewWithValues( + user.MXID, database.BackfillDeferred, *priorityCounter, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay) + backfillMessages.Insert() + *priorityCounter++ + } +} + +func (user *User) EnqueueMediaBackfills(portal *Portal, priorityCounter *int) { + for _, backfillStage := range user.bridge.Config.Bridge.HistorySync.Media { var startDate *time.Time = nil if backfillStage.StartDaysAgo > 0 { startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo) startDate = &startDaysAgo } backfill := user.bridge.DB.BackfillQuery.NewWithValues( - user.MXID, database.BackfillDeferred, j*numConversations+priority, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay) + user.MXID, database.BackfillMedia, *priorityCounter, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay) backfill.Insert() + *priorityCounter++ } } diff --git a/portal.go b/portal.go index 1560c88..23680b0 100644 --- a/portal.go +++ b/portal.go @@ -92,6 +92,10 @@ func (bridge *Bridge) GetAllPortals() []*Portal { return bridge.dbPortalsToPortals(bridge.DB.Portal.GetAll()) } +func (bridge *Bridge) GetAllPortalsForUser(userID id.UserID) []*Portal { + return bridge.dbPortalsToPortals(bridge.DB.Portal.GetAllForUser(userID)) +} + func (bridge *Bridge) GetAllPortalsByJID(jid types.JID) []*Portal { return bridge.dbPortalsToPortals(bridge.DB.Portal.GetAllByJID(jid)) } @@ -1339,8 +1343,10 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i } if user.bridge.Config.Bridge.HistorySync.Backfill && backfill { - user.EnqueueImmedateBackfill(portal, 0) - user.EnqueueDeferredBackfills(portal, 1, 0) + var priorityCounter int + user.EnqueueImmedateBackfill(portal, &priorityCounter) + user.EnqueueDeferredBackfills(portal, &priorityCounter) + user.EnqueueMediaBackfills(portal, &priorityCounter) user.BackfillQueue.ReCheckQueue <- true } return nil diff --git a/user.go b/user.go index 5ed2c64..d2cec49 100644 --- a/user.go +++ b/user.go @@ -190,6 +190,13 @@ func (bridge *Bridge) NewUser(dbUser *database.User) *User { user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID) if user.bridge.Config.Bridge.HistorySync.Backfill { go user.handleHistorySyncsLoop() + + if user.bridge.Config.Bridge.HistorySync.BackfillMedia && user.bridge.Config.Bridge.HistorySync.EnqueueBackfillMediaNextStart { + var priorityCounter int + for _, portal := range user.bridge.GetAllPortalsForUser(user.MXID) { + user.EnqueueMediaBackfills(portal, &priorityCounter) + } + } } return user }