media backfill: add ability to automatically request media not on WA server

This adds a new backfill type for media that sends a request to the
phone for every media that is not available on the WA servers. WA
deletes media from their servers after about two weeks, so you have to
ask the phone to re-upload it.

In order to use this, you need to enable
bridge.history_sync.backfill_media and configure the requests that will
be made per portal using bridge.history_sync.media (which is similar to
the deferred backfill config).

If you already have backfilled portals, but want to do a one-off media
backfill for all existing portals, you can set
bridge.history_sync.enqueue_backfill_media_next_start to true.
This commit is contained in:
Sumner Evans 2022-04-18 20:50:21 -06:00
parent c98b7f32c9
commit f2e762680c
No known key found for this signature in database
GPG key ID: 8904527AB50022FD
10 changed files with 141 additions and 35 deletions

View file

@ -56,12 +56,12 @@ func (bq *BackfillQueue) deferredBackfillLoop(user *User) {
// Finish all immediate backfills before doing the deferred ones.
if immediate := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); immediate != nil {
time.Sleep(10 * time.Second)
continue
}
if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil {
} else if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil {
bq.DeferredBackfillRequests <- backfill
backfill.MarkDone()
} else if mediaBackfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillMedia); mediaBackfill != nil {
bq.DeferredBackfillRequests <- mediaBackfill
mediaBackfill.MarkDone()
} else {
time.Sleep(10 * time.Second)
}

View file

@ -868,8 +868,14 @@ func (handler *CommandHandler) CommandBackfill(ce *CommandEvent) {
return
}
}
backfill := ce.Portal.bridge.DB.BackfillQuery.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay)
backfill.Insert()
backfillMessages := ce.Portal.bridge.DB.BackfillQuery.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay)
backfillMessages.Insert()
if ce.Bridge.Config.Bridge.HistorySync.BackfillMedia {
backfillMedia := ce.Portal.bridge.DB.BackfillQuery.NewWithValues(ce.User.MXID, database.BackfillMedia, 1, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay)
backfillMedia.Insert()
}
ce.User.BackfillQueue.ReCheckQueue <- true
}

View file

@ -46,8 +46,12 @@ type BridgeConfig struct {
IdentityChangeNotices bool `yaml:"identity_change_notices"`
HistorySync struct {
CreatePortals bool `yaml:"create_portals"`
Backfill bool `yaml:"backfill"`
CreatePortals bool `yaml:"create_portals"`
Backfill bool `yaml:"backfill"`
BackfillMedia bool `yaml:"backfill_media"`
EnqueueBackfillMediaNextStart bool `yaml:"enqueue_backfill_media_next_start"`
DoublePuppetBackfill bool `yaml:"double_puppet_backfill"`
RequestFullSync bool `yaml:"request_full_sync"`
MaxInitialConversations int `yaml:"max_initial_conversations"`
@ -58,6 +62,7 @@ type BridgeConfig struct {
} `yaml:"immediate"`
Deferred []DeferredConfig `yaml:"deferred"`
Media []DeferredConfig `yaml:"media"`
} `yaml:"history_sync"`
UserAvatarSync bool `yaml:"user_avatar_sync"`
BridgeMatrixLeave bool `yaml:"bridge_matrix_leave"`

View file

@ -79,12 +79,15 @@ func (helper *UpgradeHelper) doUpgrade() {
helper.Copy(Bool, "bridge", "identity_change_notices")
helper.Copy(Bool, "bridge", "history_sync", "create_portals")
helper.Copy(Bool, "bridge", "history_sync", "backfill")
helper.Copy(Bool, "bridge", "history_sync", "backfill_media")
helper.Copy(Bool, "bridge", "history_sync", "enqueue_backfill_media_next_start")
helper.Copy(Bool, "bridge", "history_sync", "double_puppet_backfill")
helper.Copy(Bool, "bridge", "history_sync", "request_full_sync")
helper.Copy(Int, "bridge", "history_sync", "max_initial_conversations")
helper.Copy(Int, "bridge", "history_sync", "immediate", "worker_count")
helper.Copy(Int, "bridge", "history_sync", "immediate", "max_events")
helper.Copy(List, "bridge", "history_sync", "deferred")
helper.Copy(List, "bridge", "history_sync", "media")
helper.Copy(Bool, "bridge", "user_avatar_sync")
helper.Copy(Bool, "bridge", "bridge_matrix_leave")
helper.Copy(Bool, "bridge", "sync_with_custom_puppets")

View file

@ -30,6 +30,7 @@ type BackfillType int
const (
BackfillImmediate BackfillType = 0
BackfillDeferred = 1
BackfillMedia = 2
)
type BackfillQuery struct {

View file

@ -66,6 +66,14 @@ func (pq *PortalQuery) GetAll() []*Portal {
return pq.getAll("SELECT * FROM portal")
}
func (pq *PortalQuery) GetAllForUser(userID id.UserID) []*Portal {
return pq.getAll(`
SELECT p.* FROM portal p
LEFT JOIN user_portal up ON p.jid=up.portal_jid AND p.receiver=up.portal_receiver
WHERE mxid<>'' AND up.user_mxid=$1
`, userID)
}
func (pq *PortalQuery) GetByJID(key PortalKey) *Portal {
return pq.get("SELECT * FROM portal WHERE jid=$1 AND receiver=$2", key.JID, key.Receiver)
}

View file

@ -125,6 +125,13 @@ bridge:
# Note that prior to Synapse 1.49, there were some bugs with the implementation, especially if using event persistence workers.
# There are also still some issues in Synapse's federation implementation.
backfill: false
# Enable requesting media that was not found on the WhatsApp server.
# Only applies if backfill is true.
backfill_media: false
# Enqueue backfilling of media that was not found on the WhatsApp
# server on next startup for all portals.
# Only applies if backfill and backfill_media are true.
enqueue_backfill_media_next_start: false
# Use double puppets for backfilling?
# In order to use this, the double puppets must be in the appservice's user ID namespace
# (because the bridge can't use the double puppet access token with batch sending).
@ -177,6 +184,25 @@ bridge:
- start_days_ago: -1
max_batch_events: 500
batch_delay: 10
# Settings for automatically requesting all media that was not found on
# the WhatsApp server. This process happens after the deferred
# backfills are completed.
# The config is the same as for deferred backfills, except the
# max_batch_events represents the maximum number of media messages to
# request.
media:
# Last Month
- start_days_ago: 30
max_batch_events: 5
batch_delay: 10
# Last 3 months
- start_days_ago: 90
max_batch_events: 5
batch_delay: 10
# The start of time
- start_days_ago: -1
max_batch_events: 10
batch_delay: 20
# Should puppet avatars be fetched from the server even if an avatar is already set?
user_avatar_sync: true
# Should Matrix users leaving groups be bridged to WhatsApp?

View file

@ -84,25 +84,51 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac
user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String())
continue
}
// Update the client store with basic chat settings.
if conv.MuteEndTime.After(time.Now()) {
user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime)
}
if conv.Archived {
user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true)
}
if conv.Pinned > 0 {
user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true)
}
portal := user.GetPortalByJID(conv.PortalKey.JID)
if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration {
portal.ExpirationTime = *conv.EphemeralExpiration
portal.Update()
}
user.createOrUpdatePortalAndBackfillWithLock(req, conv, portal)
if req.BackfillType == database.BackfillMedia {
startTime := time.Unix(0, 0)
if req.TimeStart != nil {
startTime = *req.TimeStart
}
endTime := time.Now()
if req.TimeEnd != nil {
endTime = *req.TimeEnd
}
user.log.Debugfln("Backfilling media from %v to %v for %s", startTime, endTime, portal.Key.String())
// Go through all of the messages in the given time range,
// requesting any media that errored.
requested := 0
for _, msg := range user.bridge.DB.Message.GetMessagesBetween(portal.Key, startTime, endTime) {
if requested > 0 && requested%req.MaxBatchEvents == 0 {
time.Sleep(time.Duration(req.BatchDelay) * time.Second)
}
if msg.Error == database.MsgErrMediaNotFound {
portal.requestMediaRetry(user, msg.MXID)
requested += 1
}
}
} else {
// Update the client store with basic chat settings.
if conv.MuteEndTime.After(time.Now()) {
user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime)
}
if conv.Archived {
user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true)
}
if conv.Pinned > 0 {
user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true)
}
if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration {
portal.ExpirationTime = *conv.EphemeralExpiration
portal.Update()
}
user.createOrUpdatePortalAndBackfillWithLock(req, conv, portal)
}
}
}
@ -245,7 +271,8 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History
}
nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations)
for i, conv := range nMostRecent {
var priorityCounter int
for _, conv := range nMostRecent {
jid, err := types.ParseJID(conv.ConversationID)
if err != nil {
user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.ConversationID, err)
@ -256,10 +283,11 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History
switch evt.GetSyncType() {
case waProto.HistorySync_INITIAL_BOOTSTRAP:
// Enqueue immediate backfills for the most recent messages first.
user.EnqueueImmedateBackfill(portal, i)
user.EnqueueImmedateBackfill(portal, &priorityCounter)
case waProto.HistorySync_FULL, waProto.HistorySync_RECENT:
// Enqueue deferred backfills as configured.
user.EnqueueDeferredBackfills(portal, len(nMostRecent), i)
user.EnqueueDeferredBackfills(portal, &priorityCounter)
user.EnqueueMediaBackfills(portal, &priorityCounter)
}
}
@ -276,22 +304,38 @@ func getConversationTimestamp(conv *waProto.Conversation) uint64 {
return convTs
}
func (user *User) EnqueueImmedateBackfill(portal *Portal, priority int) {
func (user *User) EnqueueImmedateBackfill(portal *Portal, priorityCounter *int) {
maxMessages := user.bridge.Config.Bridge.HistorySync.Immediate.MaxEvents
initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, nil, maxMessages, maxMessages, 0)
initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, *priorityCounter, &portal.Key, nil, nil, maxMessages, maxMessages, 0)
initialBackfill.Insert()
*priorityCounter++
}
func (user *User) EnqueueDeferredBackfills(portal *Portal, numConversations, priority int) {
for j, backfillStage := range user.bridge.Config.Bridge.HistorySync.Deferred {
func (user *User) EnqueueDeferredBackfills(portal *Portal, priorityCounter *int) {
for _, backfillStage := range user.bridge.Config.Bridge.HistorySync.Deferred {
var startDate *time.Time = nil
if backfillStage.StartDaysAgo > 0 {
startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo)
startDate = &startDaysAgo
}
backfillMessages := user.bridge.DB.BackfillQuery.NewWithValues(
user.MXID, database.BackfillDeferred, *priorityCounter, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
backfillMessages.Insert()
*priorityCounter++
}
}
func (user *User) EnqueueMediaBackfills(portal *Portal, priorityCounter *int) {
for _, backfillStage := range user.bridge.Config.Bridge.HistorySync.Media {
var startDate *time.Time = nil
if backfillStage.StartDaysAgo > 0 {
startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo)
startDate = &startDaysAgo
}
backfill := user.bridge.DB.BackfillQuery.NewWithValues(
user.MXID, database.BackfillDeferred, j*numConversations+priority, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
user.MXID, database.BackfillMedia, *priorityCounter, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
backfill.Insert()
*priorityCounter++
}
}

View file

@ -92,6 +92,10 @@ func (bridge *Bridge) GetAllPortals() []*Portal {
return bridge.dbPortalsToPortals(bridge.DB.Portal.GetAll())
}
func (bridge *Bridge) GetAllPortalsForUser(userID id.UserID) []*Portal {
return bridge.dbPortalsToPortals(bridge.DB.Portal.GetAllForUser(userID))
}
func (bridge *Bridge) GetAllPortalsByJID(jid types.JID) []*Portal {
return bridge.dbPortalsToPortals(bridge.DB.Portal.GetAllByJID(jid))
}
@ -1339,8 +1343,10 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i
}
if user.bridge.Config.Bridge.HistorySync.Backfill && backfill {
user.EnqueueImmedateBackfill(portal, 0)
user.EnqueueDeferredBackfills(portal, 1, 0)
var priorityCounter int
user.EnqueueImmedateBackfill(portal, &priorityCounter)
user.EnqueueDeferredBackfills(portal, &priorityCounter)
user.EnqueueMediaBackfills(portal, &priorityCounter)
user.BackfillQueue.ReCheckQueue <- true
}
return nil

View file

@ -190,6 +190,13 @@ func (bridge *Bridge) NewUser(dbUser *database.User) *User {
user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
if user.bridge.Config.Bridge.HistorySync.Backfill {
go user.handleHistorySyncsLoop()
if user.bridge.Config.Bridge.HistorySync.BackfillMedia && user.bridge.Config.Bridge.HistorySync.EnqueueBackfillMediaNextStart {
var priorityCounter int
for _, portal := range user.bridge.GetAllPortalsForUser(user.MXID) {
user.EnqueueMediaBackfills(portal, &priorityCounter)
}
}
}
return user
}