Merge pull request #474 from mautrix/sumner/bri-2926

media backfill: add ability to automatically request media not on WA server
This commit is contained in:
Sumner Evans 2022-04-19 11:04:29 -06:00 committed by GitHub
commit a3d56704a7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 141 additions and 35 deletions

View file

@ -56,12 +56,12 @@ func (bq *BackfillQueue) deferredBackfillLoop(user *User) {
// Finish all immediate backfills before doing the deferred ones.
if immediate := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); immediate != nil {
time.Sleep(10 * time.Second)
continue
}
if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil {
} else if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil {
bq.DeferredBackfillRequests <- backfill
backfill.MarkDone()
} else if mediaBackfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillMedia); mediaBackfill != nil {
bq.DeferredBackfillRequests <- mediaBackfill
mediaBackfill.MarkDone()
} else {
time.Sleep(10 * time.Second)
}

View file

@ -868,8 +868,14 @@ func (handler *CommandHandler) CommandBackfill(ce *CommandEvent) {
return
}
}
backfill := ce.Portal.bridge.DB.BackfillQuery.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay)
backfill.Insert()
backfillMessages := ce.Portal.bridge.DB.BackfillQuery.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay)
backfillMessages.Insert()
if ce.Bridge.Config.Bridge.HistorySync.BackfillMedia {
backfillMedia := ce.Portal.bridge.DB.BackfillQuery.NewWithValues(ce.User.MXID, database.BackfillMedia, 1, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay)
backfillMedia.Insert()
}
ce.User.BackfillQueue.ReCheckQueue <- true
}

View file

@ -46,8 +46,12 @@ type BridgeConfig struct {
IdentityChangeNotices bool `yaml:"identity_change_notices"`
HistorySync struct {
CreatePortals bool `yaml:"create_portals"`
Backfill bool `yaml:"backfill"`
CreatePortals bool `yaml:"create_portals"`
Backfill bool `yaml:"backfill"`
BackfillMedia bool `yaml:"backfill_media"`
EnqueueBackfillMediaNextStart bool `yaml:"enqueue_backfill_media_next_start"`
DoublePuppetBackfill bool `yaml:"double_puppet_backfill"`
RequestFullSync bool `yaml:"request_full_sync"`
MaxInitialConversations int `yaml:"max_initial_conversations"`
@ -58,6 +62,7 @@ type BridgeConfig struct {
} `yaml:"immediate"`
Deferred []DeferredConfig `yaml:"deferred"`
Media []DeferredConfig `yaml:"media"`
} `yaml:"history_sync"`
UserAvatarSync bool `yaml:"user_avatar_sync"`
BridgeMatrixLeave bool `yaml:"bridge_matrix_leave"`

View file

@ -79,12 +79,15 @@ func (helper *UpgradeHelper) doUpgrade() {
helper.Copy(Bool, "bridge", "identity_change_notices")
helper.Copy(Bool, "bridge", "history_sync", "create_portals")
helper.Copy(Bool, "bridge", "history_sync", "backfill")
helper.Copy(Bool, "bridge", "history_sync", "backfill_media")
helper.Copy(Bool, "bridge", "history_sync", "enqueue_backfill_media_next_start")
helper.Copy(Bool, "bridge", "history_sync", "double_puppet_backfill")
helper.Copy(Bool, "bridge", "history_sync", "request_full_sync")
helper.Copy(Int, "bridge", "history_sync", "max_initial_conversations")
helper.Copy(Int, "bridge", "history_sync", "immediate", "worker_count")
helper.Copy(Int, "bridge", "history_sync", "immediate", "max_events")
helper.Copy(List, "bridge", "history_sync", "deferred")
helper.Copy(List, "bridge", "history_sync", "media")
helper.Copy(Bool, "bridge", "user_avatar_sync")
helper.Copy(Bool, "bridge", "bridge_matrix_leave")
helper.Copy(Bool, "bridge", "sync_with_custom_puppets")

View file

@ -30,6 +30,7 @@ type BackfillType int
const (
BackfillImmediate BackfillType = 0
BackfillDeferred = 1
BackfillMedia = 2
)
type BackfillQuery struct {

View file

@ -66,6 +66,14 @@ func (pq *PortalQuery) GetAll() []*Portal {
return pq.getAll("SELECT * FROM portal")
}
func (pq *PortalQuery) GetAllForUser(userID id.UserID) []*Portal {
return pq.getAll(`
SELECT p.* FROM portal p
LEFT JOIN user_portal up ON p.jid=up.portal_jid AND p.receiver=up.portal_receiver
WHERE mxid<>'' AND up.user_mxid=$1
`, userID)
}
func (pq *PortalQuery) GetByJID(key PortalKey) *Portal {
return pq.get("SELECT * FROM portal WHERE jid=$1 AND receiver=$2", key.JID, key.Receiver)
}

View file

@ -125,6 +125,13 @@ bridge:
# Note that prior to Synapse 1.49, there were some bugs with the implementation, especially if using event persistence workers.
# There are also still some issues in Synapse's federation implementation.
backfill: false
# Enable requesting media that was not found on the WhatsApp server.
# Only applies if backfill is true.
backfill_media: false
# Enqueue backfilling of media that was not found on the WhatsApp
# server on next startup for all portals.
# Only applies if backfill and backfill_media are true.
enqueue_backfill_media_next_start: false
# Use double puppets for backfilling?
# In order to use this, the double puppets must be in the appservice's user ID namespace
# (because the bridge can't use the double puppet access token with batch sending).
@ -177,6 +184,25 @@ bridge:
- start_days_ago: -1
max_batch_events: 500
batch_delay: 10
# Settings for automatically requesting all media that was not found on
# the WhatsApp server. This process happens after the deferred
# backfills are completed.
# The config is the same as for deferred backfills, except the
# max_batch_events represents the maximum number of media messages to
# request.
media:
# Last Month
- start_days_ago: 30
max_batch_events: 5
batch_delay: 10
# Last 3 months
- start_days_ago: 90
max_batch_events: 5
batch_delay: 10
# The start of time
- start_days_ago: -1
max_batch_events: 10
batch_delay: 20
# Should puppet avatars be fetched from the server even if an avatar is already set?
user_avatar_sync: true
# Should Matrix users leaving groups be bridged to WhatsApp?

View file

@ -84,25 +84,51 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac
user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String())
continue
}
// Update the client store with basic chat settings.
if conv.MuteEndTime.After(time.Now()) {
user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime)
}
if conv.Archived {
user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true)
}
if conv.Pinned > 0 {
user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true)
}
portal := user.GetPortalByJID(conv.PortalKey.JID)
if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration {
portal.ExpirationTime = *conv.EphemeralExpiration
portal.Update()
}
user.createOrUpdatePortalAndBackfillWithLock(req, conv, portal)
if req.BackfillType == database.BackfillMedia {
startTime := time.Unix(0, 0)
if req.TimeStart != nil {
startTime = *req.TimeStart
}
endTime := time.Now()
if req.TimeEnd != nil {
endTime = *req.TimeEnd
}
user.log.Debugfln("Backfilling media from %v to %v for %s", startTime, endTime, portal.Key.String())
// Go through all of the messages in the given time range,
// requesting any media that errored.
requested := 0
for _, msg := range user.bridge.DB.Message.GetMessagesBetween(portal.Key, startTime, endTime) {
if requested > 0 && requested%req.MaxBatchEvents == 0 {
time.Sleep(time.Duration(req.BatchDelay) * time.Second)
}
if msg.Error == database.MsgErrMediaNotFound {
portal.requestMediaRetry(user, msg.MXID)
requested += 1
}
}
} else {
// Update the client store with basic chat settings.
if conv.MuteEndTime.After(time.Now()) {
user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime)
}
if conv.Archived {
user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true)
}
if conv.Pinned > 0 {
user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true)
}
if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration {
portal.ExpirationTime = *conv.EphemeralExpiration
portal.Update()
}
user.createOrUpdatePortalAndBackfillWithLock(req, conv, portal)
}
}
}
@ -245,7 +271,8 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History
}
nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations)
for i, conv := range nMostRecent {
var priorityCounter int
for _, conv := range nMostRecent {
jid, err := types.ParseJID(conv.ConversationID)
if err != nil {
user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.ConversationID, err)
@ -256,10 +283,11 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History
switch evt.GetSyncType() {
case waProto.HistorySync_INITIAL_BOOTSTRAP:
// Enqueue immediate backfills for the most recent messages first.
user.EnqueueImmedateBackfill(portal, i)
user.EnqueueImmedateBackfill(portal, &priorityCounter)
case waProto.HistorySync_FULL, waProto.HistorySync_RECENT:
// Enqueue deferred backfills as configured.
user.EnqueueDeferredBackfills(portal, len(nMostRecent), i)
user.EnqueueDeferredBackfills(portal, &priorityCounter)
user.EnqueueMediaBackfills(portal, &priorityCounter)
}
}
@ -276,22 +304,38 @@ func getConversationTimestamp(conv *waProto.Conversation) uint64 {
return convTs
}
func (user *User) EnqueueImmedateBackfill(portal *Portal, priority int) {
func (user *User) EnqueueImmedateBackfill(portal *Portal, priorityCounter *int) {
maxMessages := user.bridge.Config.Bridge.HistorySync.Immediate.MaxEvents
initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, nil, maxMessages, maxMessages, 0)
initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, *priorityCounter, &portal.Key, nil, nil, maxMessages, maxMessages, 0)
initialBackfill.Insert()
*priorityCounter++
}
func (user *User) EnqueueDeferredBackfills(portal *Portal, numConversations, priority int) {
for j, backfillStage := range user.bridge.Config.Bridge.HistorySync.Deferred {
func (user *User) EnqueueDeferredBackfills(portal *Portal, priorityCounter *int) {
for _, backfillStage := range user.bridge.Config.Bridge.HistorySync.Deferred {
var startDate *time.Time = nil
if backfillStage.StartDaysAgo > 0 {
startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo)
startDate = &startDaysAgo
}
backfillMessages := user.bridge.DB.BackfillQuery.NewWithValues(
user.MXID, database.BackfillDeferred, *priorityCounter, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
backfillMessages.Insert()
*priorityCounter++
}
}
func (user *User) EnqueueMediaBackfills(portal *Portal, priorityCounter *int) {
for _, backfillStage := range user.bridge.Config.Bridge.HistorySync.Media {
var startDate *time.Time = nil
if backfillStage.StartDaysAgo > 0 {
startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo)
startDate = &startDaysAgo
}
backfill := user.bridge.DB.BackfillQuery.NewWithValues(
user.MXID, database.BackfillDeferred, j*numConversations+priority, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
user.MXID, database.BackfillMedia, *priorityCounter, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
backfill.Insert()
*priorityCounter++
}
}

View file

@ -92,6 +92,10 @@ func (bridge *Bridge) GetAllPortals() []*Portal {
return bridge.dbPortalsToPortals(bridge.DB.Portal.GetAll())
}
func (bridge *Bridge) GetAllPortalsForUser(userID id.UserID) []*Portal {
return bridge.dbPortalsToPortals(bridge.DB.Portal.GetAllForUser(userID))
}
func (bridge *Bridge) GetAllPortalsByJID(jid types.JID) []*Portal {
return bridge.dbPortalsToPortals(bridge.DB.Portal.GetAllByJID(jid))
}
@ -1339,8 +1343,10 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i
}
if user.bridge.Config.Bridge.HistorySync.Backfill && backfill {
user.EnqueueImmedateBackfill(portal, 0)
user.EnqueueDeferredBackfills(portal, 1, 0)
var priorityCounter int
user.EnqueueImmedateBackfill(portal, &priorityCounter)
user.EnqueueDeferredBackfills(portal, &priorityCounter)
user.EnqueueMediaBackfills(portal, &priorityCounter)
user.BackfillQueue.ReCheckQueue <- true
}
return nil

View file

@ -190,6 +190,13 @@ func (bridge *Bridge) NewUser(dbUser *database.User) *User {
user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
if user.bridge.Config.Bridge.HistorySync.Backfill {
go user.handleHistorySyncsLoop()
if user.bridge.Config.Bridge.HistorySync.BackfillMedia && user.bridge.Config.Bridge.HistorySync.EnqueueBackfillMediaNextStart {
var priorityCounter int
for _, portal := range user.bridge.GetAllPortalsForUser(user.MXID) {
user.EnqueueMediaBackfills(portal, &priorityCounter)
}
}
}
return user
}