From 4bc4e928770f93a10a3fe7fd5bb26f548177a597 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Mon, 2 May 2022 15:00:57 +0300 Subject: [PATCH] Remove media backfill queue and just request immediately --- backfillqueue.go | 4 +- commands.go | 8 +-- config/bridge.go | 6 +-- config/upgrade.go | 3 +- database/backfillqueue.go | 3 -- example-config.yaml | 10 ++-- historysync.go | 100 ++++++++++++++------------------------ matrix.go | 13 ++--- portal.go | 49 +++++++++---------- user.go | 4 -- 10 files changed, 78 insertions(+), 122 deletions(-) diff --git a/backfillqueue.go b/backfillqueue.go index 57ad1a6..a958df1 100644 --- a/backfillqueue.go +++ b/backfillqueue.go @@ -39,7 +39,7 @@ func (bq *BackfillQueue) RunLoop(user *User) { if backfill := bq.BackfillQuery.GetNext(user.MXID); backfill != nil { if backfill.BackfillType == database.BackfillImmediate || backfill.BackfillType == database.BackfillForward { bq.ImmediateBackfillRequests <- backfill - } else { + } else if backfill.BackfillType == database.BackfillDeferred { select { case <-bq.ReCheckQueue: // If a queue re-check is requested, interrupt sending the @@ -48,6 +48,8 @@ func (bq *BackfillQueue) RunLoop(user *User) { continue case bq.DeferredBackfillRequests <- backfill: } + } else { + bq.log.Debugfln("Unrecognized backfill type %d in queue", backfill.BackfillType) } backfill.MarkDone() } else { diff --git a/commands.go b/commands.go index 478fa3b..a289eab 100644 --- a/commands.go +++ b/commands.go @@ -31,7 +31,6 @@ import ( "github.com/tidwall/gjson" "maunium.net/go/maulogger/v2" - "maunium.net/go/mautrix-whatsapp/database" "go.mau.fi/whatsmeow" "go.mau.fi/whatsmeow/appstate" @@ -42,6 +41,8 @@ import ( "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/format" "maunium.net/go/mautrix/id" + + "maunium.net/go/mautrix-whatsapp/database" ) type CommandHandler struct { @@ -871,11 +872,6 @@ func (handler *CommandHandler) CommandBackfill(ce *CommandEvent) { backfillMessages := ce.Portal.bridge.DB.BackfillQuery.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay) backfillMessages.Insert() - if ce.Bridge.Config.Bridge.HistorySync.BackfillMedia { - backfillMedia := ce.Portal.bridge.DB.BackfillQuery.NewWithValues(ce.User.MXID, database.BackfillMedia, 1, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay) - backfillMedia.Insert() - } - ce.User.BackfillQueue.ReCheckQueue <- true } diff --git a/config/bridge.go b/config/bridge.go index 42515e3..3b7a205 100644 --- a/config/bridge.go +++ b/config/bridge.go @@ -47,13 +47,11 @@ type BridgeConfig struct { HistorySync struct { CreatePortals bool `yaml:"create_portals"` - - Backfill bool `yaml:"backfill"` - BackfillMedia bool `yaml:"backfill_media"` - EnqueueBackfillMediaNextStart bool `yaml:"enqueue_backfill_media_next_start"` + Backfill bool `yaml:"backfill"` DoublePuppetBackfill bool `yaml:"double_puppet_backfill"` RequestFullSync bool `yaml:"request_full_sync"` + AutoRequestMedia bool `yaml:"auto_request_media"` MaxInitialConversations int `yaml:"max_initial_conversations"` Immediate struct { diff --git a/config/upgrade.go b/config/upgrade.go index 0b6927c..eac2d59 100644 --- a/config/upgrade.go +++ b/config/upgrade.go @@ -80,10 +80,9 @@ func (helper *UpgradeHelper) doUpgrade() { helper.Copy(Bool, "bridge", "identity_change_notices") helper.Copy(Bool, "bridge", "history_sync", "create_portals") helper.Copy(Bool, "bridge", "history_sync", "backfill") - helper.Copy(Bool, "bridge", "history_sync", "backfill_media") - helper.Copy(Bool, "bridge", "history_sync", "enqueue_backfill_media_next_start") helper.Copy(Bool, "bridge", "history_sync", "double_puppet_backfill") helper.Copy(Bool, "bridge", "history_sync", "request_full_sync") + helper.Copy(Bool, "bridge", "history_sync", "auto_request_media") helper.Copy(Int, "bridge", "history_sync", "max_initial_conversations") helper.Copy(Int, "bridge", "history_sync", "immediate", "worker_count") helper.Copy(Int, "bridge", "history_sync", "immediate", "max_events") diff --git a/database/backfillqueue.go b/database/backfillqueue.go index 962e84a..5947fa2 100644 --- a/database/backfillqueue.go +++ b/database/backfillqueue.go @@ -32,7 +32,6 @@ const ( BackfillImmediate BackfillType = 0 BackfillForward BackfillType = 100 BackfillDeferred BackfillType = 200 - BackfillMedia BackfillType = 300 ) func (bt BackfillType) String() string { @@ -43,8 +42,6 @@ func (bt BackfillType) String() string { return "FORWARD" case BackfillDeferred: return "DEFERRED" - case BackfillMedia: - return "MEDIA" } return "UNKNOWN" } diff --git a/example-config.yaml b/example-config.yaml index 22f88b4..88ebc3a 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -127,13 +127,6 @@ bridge: # Note that prior to Synapse 1.49, there were some bugs with the implementation, especially if using event persistence workers. # There are also still some issues in Synapse's federation implementation. backfill: false - # Enable requesting media that was not found on the WhatsApp server. - # Only applies if backfill is true. - backfill_media: false - # Enqueue backfilling of media that was not found on the WhatsApp - # server on next startup for all portals. - # Only applies if backfill and backfill_media are true. - enqueue_backfill_media_next_start: false # Use double puppets for backfilling? # In order to use this, the double puppets must be in the appservice's user ID namespace # (because the bridge can't use the double puppet access token with batch sending). @@ -142,6 +135,9 @@ bridge: # Should the bridge request a full sync from the phone when logging in? # This bumps the size of history syncs from 3 months to 1 year. request_full_sync: false + # Should expired media be automatically requested from the server after backfilling? + # If false, media can still be requested by reacting with the ♻️ (recycle) emoji. + auto_request_media: true # The maximum number of initial conversations that should be synced. # Other conversations will be backfilled on demand when the start PM # provisioning endpoint is used or when a message comes in from that diff --git a/historysync.go b/historysync.go index 61e6285..213a2e3 100644 --- a/historysync.go +++ b/historysync.go @@ -38,6 +38,8 @@ type wrappedInfo struct { Type database.MessageType Error database.MessageErrorType + MediaKey []byte + ExpirationStart uint64 ExpiresIn uint32 } @@ -86,50 +88,23 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac } portal := user.GetPortalByJID(conv.PortalKey.JID) - if req.BackfillType == database.BackfillMedia { - startTime := time.Unix(0, 0) - if req.TimeStart != nil { - startTime = *req.TimeStart - } - endTime := time.Now() - if req.TimeEnd != nil { - endTime = *req.TimeEnd - } - - user.log.Infofln("Backfilling media from %v to %v for %s", startTime, endTime, portal.Key.String()) - - // Go through all of the messages in the given time range, - // requesting any media that errored. - requested := 0 - for _, msg := range user.bridge.DB.Message.GetMessagesBetween(portal.Key, startTime, endTime) { - if msg.Error == database.MsgErrMediaNotFound { - if requested > 0 && requested%req.MaxBatchEvents == 0 { - time.Sleep(time.Duration(req.BatchDelay) * time.Second) - } - - portal.requestMediaRetry(user, msg.MXID) - requested += 1 - } - } - } else { - // Update the client store with basic chat settings. - if conv.MuteEndTime.After(time.Now()) { - user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime) - } - if conv.Archived { - user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true) - } - if conv.Pinned > 0 { - user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true) - } - - if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration { - portal.ExpirationTime = *conv.EphemeralExpiration - portal.Update() - } - - user.backfillInChunks(req, conv, portal) + // Update the client store with basic chat settings. + if conv.MuteEndTime.After(time.Now()) { + user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime) } + if conv.Archived { + user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true) + } + if conv.Pinned > 0 { + user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true) + } + + if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration { + portal.ExpirationTime = *conv.EphemeralExpiration + portal.Update() + } + + user.backfillInChunks(req, conv, portal) } } @@ -353,7 +328,6 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History user.EnqueueForwardBackfills(portals) // Enqueue deferred backfills as configured. user.EnqueueDeferredBackfills(portals) - user.EnqueueMediaBackfills(portals) } // Tell the queue to check for new backfill requests. @@ -406,22 +380,6 @@ func (user *User) EnqueueForwardBackfills(portals []*Portal) { } } -func (user *User) EnqueueMediaBackfills(portals []*Portal) { - numPortals := len(portals) - for stageIdx, backfillStage := range user.bridge.Config.Bridge.HistorySync.Media { - for portalIdx, portal := range portals { - var startDate *time.Time = nil - if backfillStage.StartDaysAgo > 0 { - startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo) - startDate = &startDaysAgo - } - backfill := user.bridge.DB.BackfillQuery.NewWithValues( - user.MXID, database.BackfillMedia, stageIdx*numPortals+portalIdx, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay) - backfill.Insert() - } - } -} - // endregion // region Portal backfilling @@ -521,7 +479,7 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo, intent = puppet.DefaultIntent() } - converted := portal.convertMessage(intent, source, info, msg) + converted := portal.convertMessage(intent, source, info, msg, true) if converted == nil { portal.log.Debugfln("Skipping unsupported message %s in backfill", info.ID) continue @@ -560,10 +518,26 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo, portal.finishBatch(resp.EventIDs, infos) portal.NextBatchID = resp.NextBatchID portal.Update() + if portal.bridge.Config.Bridge.HistorySync.AutoRequestMedia { + go portal.requestMediaRetries(source, infos) + } return resp } } +func (portal *Portal) requestMediaRetries(source *User, infos []*wrappedInfo) { + for _, info := range infos { + if info.Error == database.MsgErrMediaNotFound && info.MediaKey != nil { + err := source.Client.SendMediaRetryReceipt(info.MessageInfo, info.MediaKey) + if err != nil { + portal.log.Warnfln("Failed to send post-backfill media retry request for %s: %v", info.ID, err) + } else { + portal.log.Debugfln("Sent post-backfill media retry request for %s", info.ID) + } + } + } +} + func (portal *Portal) parseWebMessageInfo(source *User, webMsg *waProto.WebMessageInfo) *types.MessageInfo { info := types.MessageInfo{ MessageSource: types.MessageSource{ @@ -603,10 +577,10 @@ func (portal *Portal) appendBatchEvents(converted *ConvertedMessage, info *types return err } *eventsArray = append(*eventsArray, mainEvt, captionEvt) - *infoArray = append(*infoArray, &wrappedInfo{info, database.MsgNormal, converted.Error, expirationStart, converted.ExpiresIn}, nil) + *infoArray = append(*infoArray, &wrappedInfo{info, database.MsgNormal, converted.Error, converted.MediaKey, expirationStart, converted.ExpiresIn}, nil) } else { *eventsArray = append(*eventsArray, mainEvt) - *infoArray = append(*infoArray, &wrappedInfo{info, database.MsgNormal, converted.Error, expirationStart, converted.ExpiresIn}) + *infoArray = append(*infoArray, &wrappedInfo{info, database.MsgNormal, converted.Error, converted.MediaKey, expirationStart, converted.ExpiresIn}) } if converted.MultiEvent != nil { for _, subEvtContent := range converted.MultiEvent { diff --git a/matrix.go b/matrix.go index 408d19a..89ce655 100644 --- a/matrix.go +++ b/matrix.go @@ -484,18 +484,19 @@ func (mx *MatrixHandler) HandleReaction(evt *event.Event) { portal := mx.bridge.GetPortalByMXID(evt.RoomID) if portal == nil { return + } else if portal.IsPrivateChat() && user.JID.User != portal.Key.Receiver.User { + // One user can only react once, so we don't use the relay user for reactions + return } content := evt.Content.AsReaction() - if content.RelatesTo.Key == "click to retry" || strings.HasPrefix(content.RelatesTo.Key, "\u267b") { // ♻️ - portal.requestMediaRetry(user, content.RelatesTo.EventID) - } else { - if portal.IsPrivateChat() && user.JID.User != portal.Key.Receiver.User { - // One user can only react once, so we don't use the relay user for reactions + if strings.Contains(content.RelatesTo.Key, "retry") || strings.HasPrefix(content.RelatesTo.Key, "\u267b") { // ♻️ + if portal.requestMediaRetry(user, content.RelatesTo.EventID) { + // Errored media, don't try to send as reaction return } - portal.HandleMatrixReaction(user, evt) } + portal.HandleMatrixReaction(user, evt) } func (mx *MatrixHandler) HandleRedaction(evt *event.Event) { diff --git a/portal.go b/portal.go index e46e720..e06b4b9 100644 --- a/portal.go +++ b/portal.go @@ -458,20 +458,24 @@ func formatDuration(d time.Duration) string { return naturalJoin(parts) } -func (portal *Portal) convertMessage(intent *appservice.IntentAPI, source *User, info *types.MessageInfo, waMsg *waProto.Message) *ConvertedMessage { +func (portal *Portal) convertMessage(intent *appservice.IntentAPI, source *User, info *types.MessageInfo, waMsg *waProto.Message, isBackfill bool) *ConvertedMessage { switch { case waMsg.Conversation != nil || waMsg.ExtendedTextMessage != nil: return portal.convertTextMessage(intent, source, waMsg) case waMsg.ImageMessage != nil: - return portal.convertMediaMessage(intent, source, info, waMsg.GetImageMessage()) + return portal.convertMediaMessage(intent, source, info, waMsg.GetImageMessage(), "photo", isBackfill) case waMsg.StickerMessage != nil: - return portal.convertMediaMessage(intent, source, info, waMsg.GetStickerMessage()) + return portal.convertMediaMessage(intent, source, info, waMsg.GetStickerMessage(), "sticker", isBackfill) case waMsg.VideoMessage != nil: - return portal.convertMediaMessage(intent, source, info, waMsg.GetVideoMessage()) + return portal.convertMediaMessage(intent, source, info, waMsg.GetVideoMessage(), "video attachment", isBackfill) case waMsg.AudioMessage != nil: - return portal.convertMediaMessage(intent, source, info, waMsg.GetAudioMessage()) + typeName := "audio attachment" + if waMsg.GetAudioMessage().GetPtt() { + typeName = "voice message" + } + return portal.convertMediaMessage(intent, source, info, waMsg.GetAudioMessage(), typeName, isBackfill) case waMsg.DocumentMessage != nil: - return portal.convertMediaMessage(intent, source, info, waMsg.GetDocumentMessage()) + return portal.convertMediaMessage(intent, source, info, waMsg.GetDocumentMessage(), "file attachment", isBackfill) case waMsg.ContactMessage != nil: return portal.convertContactMessage(intent, waMsg.GetContactMessage()) case waMsg.ContactsArrayMessage != nil: @@ -625,7 +629,7 @@ func (portal *Portal) handleMessage(source *User, evt *events.Message) { portal.log.Debugfln("Not handling %s (%s): user doesn't have double puppeting enabled", msgID, msgType) return } - converted := portal.convertMessage(intent, source, &evt.Info, evt.Message) + converted := portal.convertMessage(intent, source, &evt.Info, evt.Message, false) if converted != nil { if evt.Info.IsIncomingBroadcast() { if converted.Extra == nil { @@ -1358,7 +1362,6 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i portals := []*Portal{portal} user.EnqueueImmedateBackfills(portals) user.EnqueueDeferredBackfills(portals) - user.EnqueueMediaBackfills(portals) user.BackfillQueue.ReCheckQueue <- true } return nil @@ -1594,6 +1597,7 @@ type ConvertedMessage struct { ReplyTo types.MessageID ExpiresIn uint32 Error database.MessageErrorType + MediaKey []byte } func (portal *Portal) convertTextMessage(intent *appservice.IntentAPI, source *User, msg *waProto.Message) *ConvertedMessage { @@ -2172,26 +2176,18 @@ func (portal *Portal) uploadMedia(intent *appservice.IntentAPI, data []byte, con return nil } -func (portal *Portal) convertMediaMessage(intent *appservice.IntentAPI, source *User, info *types.MessageInfo, msg MediaMessage) *ConvertedMessage { +func (portal *Portal) convertMediaMessage(intent *appservice.IntentAPI, source *User, info *types.MessageInfo, msg MediaMessage, typeName string, isBackfill bool) *ConvertedMessage { converted := portal.convertMediaMessageContent(intent, msg) data, err := source.Client.Download(msg) if errors.Is(err, whatsmeow.ErrMediaDownloadFailedWith404) || errors.Is(err, whatsmeow.ErrMediaDownloadFailedWith410) { - //portal.log.Warnfln("Failed to download media for %s: %v. Requesting retry", info.ID, err) - //err = source.Client.SendMediaRetryReceipt(info, msg.GetMediaKey()) - //if err != nil { - // portal.log.Errorfln("Failed to send media retry receipt for %s: %v", info.ID, err) - //} converted.Error = database.MsgErrMediaNotFound + converted.MediaKey = msg.GetMediaKey() - errorText := "Old photo or attachment." - if portal.bridge.Config.Bridge.HistorySync.BackfillMedia { - if len(portal.bridge.Config.Bridge.HistorySync.Media) > 0 { - errorText += " Media will be requested from your phone later." - } else { - errorText += ` React with the \u267b (recycle) emoji or the text "click to retry" to request this media from your phone or use the backfill command to request all missing media for this chat.` - } + errorText := fmt.Sprintf("Old %s.", typeName) + if portal.bridge.Config.Bridge.HistorySync.AutoRequestMedia && isBackfill { + errorText += " Media will be automatically requested from your phone later." } else { - errorText += ` Automatic media backfill is disabled. React with the \u267b (recycle) emoji or the text "click to retry" to request this media from your phone.` + errorText += ` React with the \u267b (recycle) emoji to request this media from your phone.` } return portal.makeMediaBridgeFailureMessage(info, err, converted, &FailedMediaKeys{ @@ -2326,20 +2322,20 @@ func (portal *Portal) handleMediaRetry(retry *events.MediaRetry, source *User) { msg.UpdateMXID(resp.EventID, database.MsgNormal, database.MsgNoError) } -func (portal *Portal) requestMediaRetry(user *User, eventID id.EventID) { +func (portal *Portal) requestMediaRetry(user *User, eventID id.EventID) bool { msg := portal.bridge.DB.Message.GetByMXID(eventID) if msg == nil { portal.log.Debugfln("%s requested a media retry for unknown event %s", user.MXID, eventID) - return + return false } else if msg.Error != database.MsgErrMediaNotFound { portal.log.Debugfln("%s requested a media retry for non-errored event %s", user.MXID, eventID) - return + return false } evt, err := portal.fetchMediaRetryEvent(msg) if err != nil { portal.log.Warnfln("Can't send media retry request for %s: %v", msg.JID, err) - return + return true } err = user.Client.SendMediaRetryReceipt(&types.MessageInfo{ @@ -2356,6 +2352,7 @@ func (portal *Portal) requestMediaRetry(user *User, eventID id.EventID) { } else { portal.log.Debugfln("Sent media retry request for %s", msg.JID) } + return true } const thumbnailMaxSize = 72 diff --git a/user.go b/user.go index 25744b2..6350320 100644 --- a/user.go +++ b/user.go @@ -585,10 +585,6 @@ func (user *User) HandleEvent(event interface{}) { if user.bridge.Config.Bridge.HistorySync.Backfill && !user.historySyncLoopsStarted { go user.handleHistorySyncsLoop() user.historySyncLoopsStarted = true - - if user.bridge.Config.Bridge.HistorySync.BackfillMedia && user.bridge.Config.Bridge.HistorySync.EnqueueBackfillMediaNextStart { - user.EnqueueMediaBackfills(user.bridge.GetAllPortalsForUser(user.MXID)) - } } case *events.OfflineSyncPreview: user.log.Infofln("Server says it's going to send %d messages and %d receipts that were missed during downtime", v.Messages, v.Receipts)