From e362743f1842fff9086fa2a5810ac2f0eac59639 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Tue, 10 May 2022 12:18:01 -0600 Subject: [PATCH 1/6] config: add settings for automatic media requests --- config/bridge.go | 14 +++++++++++++- config/upgrade.go | 4 +++- example-config.yaml | 19 ++++++++++++++++--- historysync.go | 4 +++- portal.go | 2 +- 5 files changed, 36 insertions(+), 7 deletions(-) diff --git a/config/bridge.go b/config/bridge.go index b1e77bf..1c652c9 100644 --- a/config/bridge.go +++ b/config/bridge.go @@ -34,6 +34,13 @@ type DeferredConfig struct { BatchDelay int `yaml:"batch_delay"` } +type MediaRequestMethod string + +const ( + MediaRequestMethodImmediate MediaRequestMethod = "immediate" + MediaRequestMethodLocalTime = "local_time" +) + type BridgeConfig struct { UsernameTemplate string `yaml:"username_template"` DisplaynameTemplate string `yaml:"displayname_template"` @@ -51,7 +58,6 @@ type BridgeConfig struct { DoublePuppetBackfill bool `yaml:"double_puppet_backfill"` RequestFullSync bool `yaml:"request_full_sync"` - AutoRequestMedia bool `yaml:"auto_request_media"` MaxInitialConversations int `yaml:"max_initial_conversations"` Immediate struct { @@ -59,6 +65,12 @@ type BridgeConfig struct { MaxEvents int `yaml:"max_events"` } `yaml:"immediate"` + MediaRequests struct { + AutoRequestMedia bool `yaml:"auto_request_media"` + RequestMethod MediaRequestMethod `yaml:"request_method"` + RequestLocalTime int `yaml:"request_local_time"` + } `yaml:"media_requests"` + Deferred []DeferredConfig `yaml:"deferred"` } `yaml:"history_sync"` UserAvatarSync bool `yaml:"user_avatar_sync"` diff --git a/config/upgrade.go b/config/upgrade.go index 99d6db8..a1688f2 100644 --- a/config/upgrade.go +++ b/config/upgrade.go @@ -81,7 +81,9 @@ func (helper *UpgradeHelper) doUpgrade() { helper.Copy(Bool, "bridge", "history_sync", "backfill") helper.Copy(Bool, "bridge", "history_sync", "double_puppet_backfill") helper.Copy(Bool, "bridge", "history_sync", "request_full_sync") - helper.Copy(Bool, "bridge", "history_sync", "auto_request_media") + helper.Copy(Bool, "bridge", "history_sync", "media_requests", "auto_request_media") + helper.Copy(Str, "bridge", "history_sync", "media_requests", "request_method") + helper.Copy(Int, "bridge", "history_sync", "media_requests", "request_local_time") helper.Copy(Int, "bridge", "history_sync", "max_initial_conversations") helper.Copy(Int, "bridge", "history_sync", "immediate", "worker_count") helper.Copy(Int, "bridge", "history_sync", "immediate", "max_events") diff --git a/example-config.yaml b/example-config.yaml index a0bfbe7..06b67ad 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -133,9 +133,22 @@ bridge: # Should the bridge request a full sync from the phone when logging in? # This bumps the size of history syncs from 3 months to 1 year. request_full_sync: false - # Should expired media be automatically requested from the server after backfilling? - # If false, media can still be requested by reacting with the ♻️ (recycle) emoji. - auto_request_media: true + # Settings for media requests. If the media expired, then it will not + # be on the WA servers. + # Media can always be requested by reacting with the ♻️ (recycle) emoji. + # These settings determine if the media requests should be done + # automatically during or after backfill. + media_requests: + # Should expired media be automatically requested from the server as + # part of the backfill process? + auto_request_media: true + # Whether to request the media immediately after the media message + # is backfilled ("immediate") or at a specific time of the day + # ("local_time"). + request_method: immediate + # If request_method is "local_time", what time should the requests + # be sent (in minutes after midnight)? + request_local_time: 120 # The maximum number of initial conversations that should be synced. # Other conversations will be backfilled on demand when the start PM # provisioning endpoint is used or when a message comes in from that diff --git a/historysync.go b/historysync.go index 137ee7c..27fef3f 100644 --- a/historysync.go +++ b/historysync.go @@ -28,6 +28,7 @@ import ( "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/id" + "maunium.net/go/mautrix-whatsapp/config" "maunium.net/go/mautrix-whatsapp/database" ) @@ -518,7 +519,8 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo, portal.finishBatch(resp.EventIDs, infos) portal.NextBatchID = resp.NextBatchID portal.Update() - if portal.bridge.Config.Bridge.HistorySync.AutoRequestMedia { + if portal.bridge.Config.Bridge.HistorySync.MediaRequests.AutoRequestMedia && + portal.bridge.Config.Bridge.HistorySync.MediaRequests.RequestMethod == config.MediaRequestMethodImmediate { go portal.requestMediaRetries(source, infos) } return resp diff --git a/portal.go b/portal.go index 062ce63..d219757 100644 --- a/portal.go +++ b/portal.go @@ -2178,7 +2178,7 @@ func (portal *Portal) convertMediaMessage(intent *appservice.IntentAPI, source * converted.MediaKey = msg.GetMediaKey() errorText := fmt.Sprintf("Old %s.", typeName) - if portal.bridge.Config.Bridge.HistorySync.AutoRequestMedia && isBackfill { + if portal.bridge.Config.Bridge.HistorySync.MediaRequests.AutoRequestMedia && isBackfill { errorText += " Media will be automatically requested from your phone later." } else { errorText += ` React with the \u267b (recycle) emoji to request this media from your phone.` From b5551ee16a8618c5048b333125c489eeb4cf68df Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Tue, 10 May 2022 14:28:30 -0600 Subject: [PATCH 2/6] database: add media requests buffer table --- commands.go | 2 +- database/database.go | 15 ++- database/mediabackfillrequest.go | 124 ++++++++++++++++++ ...-09-media-backfill-requests-queue-table.go | 25 ++++ database/upgrades/upgrades.go | 2 +- historysync.go | 20 +-- portal.go | 4 +- user.go | 6 +- 8 files changed, 176 insertions(+), 22 deletions(-) create mode 100644 database/mediabackfillrequest.go create mode 100644 database/upgrades/2022-05-09-media-backfill-requests-queue-table.go diff --git a/commands.go b/commands.go index b959cc7..255d22e 100644 --- a/commands.go +++ b/commands.go @@ -873,7 +873,7 @@ func (handler *CommandHandler) CommandBackfill(ce *CommandEvent) { return } } - backfillMessages := ce.Portal.bridge.DB.BackfillQuery.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay) + backfillMessages := ce.Portal.bridge.DB.Backfill.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay) backfillMessages.Insert() ce.User.BackfillQueue.ReCheckQueue <- true diff --git a/database/database.go b/database/database.go index 15c39a9..8fe1150 100644 --- a/database/database.go +++ b/database/database.go @@ -49,9 +49,10 @@ type Database struct { Message *MessageQuery Reaction *ReactionQuery - DisappearingMessage *DisappearingMessageQuery - BackfillQuery *BackfillQuery - HistorySyncQuery *HistorySyncQuery + DisappearingMessage *DisappearingMessageQuery + Backfill *BackfillQuery + HistorySync *HistorySyncQuery + MediaBackfillRequest *MediaBackfillRequestQuery } func New(cfg config.DatabaseConfig, baseLog log.Logger) (*Database, error) { @@ -89,14 +90,18 @@ func New(cfg config.DatabaseConfig, baseLog log.Logger) (*Database, error) { db: db, log: db.log.Sub("DisappearingMessage"), } - db.BackfillQuery = &BackfillQuery{ + db.Backfill = &BackfillQuery{ db: db, log: db.log.Sub("Backfill"), } - db.HistorySyncQuery = &HistorySyncQuery{ + db.HistorySync = &HistorySyncQuery{ db: db, log: db.log.Sub("HistorySync"), } + db.MediaBackfillRequest = &MediaBackfillRequestQuery{ + db: db, + log: db.log.Sub("MediaBackfillRequest"), + } db.SetMaxOpenConns(cfg.MaxOpenConns) db.SetMaxIdleConns(cfg.MaxIdleConns) diff --git a/database/mediabackfillrequest.go b/database/mediabackfillrequest.go new file mode 100644 index 0000000..59155a9 --- /dev/null +++ b/database/mediabackfillrequest.go @@ -0,0 +1,124 @@ +// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge. +// Copyright (C) 2022 Tulir Asokan, Sumner Evans +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package database + +import ( + "database/sql" + "errors" + + _ "github.com/mattn/go-sqlite3" + log "maunium.net/go/maulogger/v2" + "maunium.net/go/mautrix/id" +) + +type MediaBackfillRequestStatus int + +const ( + MediaBackfillRequestStatusNotRequested MediaBackfillRequestStatus = iota + MediaBackfillRequestStatusSuccess + MediaBackfillRequestStatusFailed +) + +type MediaBackfillRequestQuery struct { + db *Database + log log.Logger +} + +type MediaBackfillRequest struct { + db *Database + log log.Logger + + UserID id.UserID + PortalKey *PortalKey + EventID id.EventID + Status MediaBackfillRequestStatus + Error string +} + +func (mbrq *MediaBackfillRequestQuery) newMediaBackfillRequest() *MediaBackfillRequest { + return &MediaBackfillRequest{ + db: mbrq.db, + log: mbrq.log, + PortalKey: &PortalKey{}, + } +} + +func (mbrq *MediaBackfillRequestQuery) NewMediaBackfillRequestWithValues(userID id.UserID, portalKey *PortalKey, eventID id.EventID) *MediaBackfillRequest { + return &MediaBackfillRequest{ + db: mbrq.db, + log: mbrq.log, + UserID: userID, + PortalKey: portalKey, + EventID: eventID, + } +} + +const ( + getMediaBackfillRequestsForUser = ` + SELECT user_mxid, portal_jid, portal_receiver, event_id, status, error + FROM media_backfill_requests + WHERE user_mxid=$1 + ` +) + +func (mbr *MediaBackfillRequest) Upsert() { + _, err := mbr.db.Exec(` + INSERT INTO media_backfill_requests (user_mxid, portal_jid, portal_receiver, event_id, status, error) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (user_mxid, portal_jid, portal_receiver, event_id) + DO UPDATE SET + status=EXCLUDED.status, + error=EXCLUDED.error + `, + mbr.UserID, + mbr.PortalKey.JID.String(), + mbr.PortalKey.Receiver.String(), + mbr.EventID, + mbr.Status, + mbr.Error) + if err != nil { + mbr.log.Warnfln("Failed to insert media backfill request %s/%s/%s: %v", mbr.UserID, mbr.PortalKey.String(), mbr.EventID, err) + } +} + +func (mbr *MediaBackfillRequest) Scan(row Scannable) *MediaBackfillRequest { + err := row.Scan(&mbr.UserID, &mbr.PortalKey.JID, &mbr.PortalKey.Receiver, &mbr.EventID, &mbr.Status, &mbr.Error) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + mbr.log.Errorln("Database scan failed:", err) + } + return nil + } + return mbr +} + +func (mbr *MediaBackfillRequestQuery) GetMediaBackfillRequestsForUser(userID id.UserID) (requests []*MediaBackfillRequest) { + rows, err := mbr.db.Query(getMediaBackfillRequestsForUser, userID) + defer rows.Close() + if err != nil || rows == nil { + return nil + } + for rows.Next() { + requests = append(requests, mbr.newMediaBackfillRequest().Scan(rows)) + } + return +} + +func (mbr *MediaBackfillRequestQuery) DeleteAllMediaBackfillRequests(userID id.UserID) error { + _, err := mbr.db.Exec("DELETE FROM media_backfill_requests WHERE user_mxid=$1", userID) + return err +} diff --git a/database/upgrades/2022-05-09-media-backfill-requests-queue-table.go b/database/upgrades/2022-05-09-media-backfill-requests-queue-table.go new file mode 100644 index 0000000..5cfa6e4 --- /dev/null +++ b/database/upgrades/2022-05-09-media-backfill-requests-queue-table.go @@ -0,0 +1,25 @@ +package upgrades + +import ( + "database/sql" +) + +func init() { + upgrades[42] = upgrade{"Add table of media to request from the user's phone", func(tx *sql.Tx, ctx context) error { + _, err := tx.Exec(` + CREATE TABLE media_backfill_requests ( + user_mxid TEXT, + portal_jid TEXT, + portal_receiver TEXT, + event_id TEXT, + status INTEGER, + error TEXT, + + PRIMARY KEY (user_mxid, portal_jid, portal_receiver, event_id), + FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE + ) + `) + return err + }} +} diff --git a/database/upgrades/upgrades.go b/database/upgrades/upgrades.go index 4366655..43eeaf8 100644 --- a/database/upgrades/upgrades.go +++ b/database/upgrades/upgrades.go @@ -40,7 +40,7 @@ type upgrade struct { fn upgradeFunc } -const NumberOfUpgrades = 42 +const NumberOfUpgrades = 43 var upgrades [NumberOfUpgrades]upgrade diff --git a/historysync.go b/historysync.go index 27fef3f..84a3e61 100644 --- a/historysync.go +++ b/historysync.go @@ -53,7 +53,7 @@ func (user *User) handleHistorySyncsLoop() { reCheckQueue := make(chan bool, 1) // Start the backfill queue. user.BackfillQueue = &BackfillQueue{ - BackfillQuery: user.bridge.DB.BackfillQuery, + BackfillQuery: user.bridge.DB.Backfill, ImmediateBackfillRequests: make(chan *database.Backfill, 1), DeferredBackfillRequests: make(chan *database.Backfill, 1), ReCheckQueue: make(chan bool, 1), @@ -82,7 +82,7 @@ func (user *User) handleHistorySyncsLoop() { func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) { for req := range backfillRequests { user.log.Infofln("Handling backfill request %s", req) - conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal) + conv := user.bridge.DB.HistorySync.GetConversation(user.MXID, req.Portal) if conv == nil { user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String()) continue @@ -133,7 +133,7 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor user.log.Debugfln("Limiting backfill to end at %v", end) } } - allMsgs := user.bridge.DB.HistorySyncQuery.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents) + allMsgs := user.bridge.DB.HistorySync.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents) sendDisappearedNotice := false // If expired messages are on, and a notice has not been sent to this chat @@ -211,7 +211,7 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor insertionEventIds[0]) } user.log.Debugfln("Deleting %d history sync messages after backfilling (queue ID: %d)", len(allMsgs), req.QueueID) - err := user.bridge.DB.HistorySyncQuery.DeleteMessages(user.MXID, conv.ConversationID, allMsgs) + err := user.bridge.DB.HistorySync.DeleteMessages(user.MXID, conv.ConversationID, allMsgs) if err != nil { user.log.Warnfln("Failed to delete %d history sync messages after backfilling (queue ID: %d): %v", len(allMsgs), req.QueueID, err) } @@ -255,7 +255,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History } portal := user.GetPortalByJID(jid) - historySyncConversation := user.bridge.DB.HistorySyncQuery.NewConversationWithValues( + historySyncConversation := user.bridge.DB.HistorySync.NewConversationWithValues( user.MXID, conv.GetId(), &portal.Key, @@ -291,7 +291,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History continue } - message, err := user.bridge.DB.HistorySyncQuery.NewMessageWithValues(user.MXID, conv.GetId(), wmi.GetKey().GetId(), rawMsg) + message, err := user.bridge.DB.HistorySync.NewMessageWithValues(user.MXID, conv.GetId(), wmi.GetKey().GetId(), rawMsg) if err != nil { user.log.Warnfln("Failed to save message %s in %s. Error: %+v", wmi.GetKey().Id, conv.GetId(), err) continue @@ -308,7 +308,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History return } - nMostRecent := user.bridge.DB.HistorySyncQuery.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations) + nMostRecent := user.bridge.DB.HistorySync.GetNMostRecentConversations(user.MXID, user.bridge.Config.Bridge.HistorySync.MaxInitialConversations) if len(nMostRecent) > 0 { // Find the portals for all of the conversations. portals := []*Portal{} @@ -348,7 +348,7 @@ func getConversationTimestamp(conv *waProto.Conversation) uint64 { func (user *User) EnqueueImmedateBackfills(portals []*Portal) { for priority, portal := range portals { maxMessages := user.bridge.Config.Bridge.HistorySync.Immediate.MaxEvents - initialBackfill := user.bridge.DB.BackfillQuery.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, nil, maxMessages, maxMessages, 0) + initialBackfill := user.bridge.DB.Backfill.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, nil, maxMessages, maxMessages, 0) initialBackfill.Insert() } } @@ -362,7 +362,7 @@ func (user *User) EnqueueDeferredBackfills(portals []*Portal) { startDaysAgo := time.Now().AddDate(0, 0, -backfillStage.StartDaysAgo) startDate = &startDaysAgo } - backfillMessages := user.bridge.DB.BackfillQuery.NewWithValues( + backfillMessages := user.bridge.DB.Backfill.NewWithValues( user.MXID, database.BackfillDeferred, stageIdx*numPortals+portalIdx, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay) backfillMessages.Insert() } @@ -375,7 +375,7 @@ func (user *User) EnqueueForwardBackfills(portals []*Portal) { if lastMsg == nil { continue } - backfill := user.bridge.DB.BackfillQuery.NewWithValues( + backfill := user.bridge.DB.Backfill.NewWithValues( user.MXID, database.BackfillForward, priority, &portal.Key, &lastMsg.Timestamp, nil, -1, -1, 0) backfill.Insert() } diff --git a/portal.go b/portal.go index d219757..689c753 100644 --- a/portal.go +++ b/portal.go @@ -1224,8 +1224,8 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i // before creating the matrix room if errors.Is(err, whatsmeow.ErrNotInGroup) { user.log.Debugfln("Skipping creating matrix room for %s because the user is not a participant", portal.Key.JID) - user.bridge.DB.BackfillQuery.DeleteAllForPortal(user.MXID, portal.Key) - user.bridge.DB.HistorySyncQuery.DeleteAllMessagesForPortal(user.MXID, portal.Key) + user.bridge.DB.Backfill.DeleteAllForPortal(user.MXID, portal.Key) + user.bridge.DB.HistorySync.DeleteAllMessagesForPortal(user.MXID, portal.Key) return err } else if err != nil { portal.log.Warnfln("Failed to get group info through %s: %v", user.JID, err) diff --git a/user.go b/user.go index db16bdb..3d6e630 100644 --- a/user.go +++ b/user.go @@ -428,9 +428,9 @@ func (user *User) DeleteSession() { } // Delete all of the backfill and history sync data. - user.bridge.DB.BackfillQuery.DeleteAll(user.MXID) - user.bridge.DB.HistorySyncQuery.DeleteAllConversations(user.MXID) - user.bridge.DB.HistorySyncQuery.DeleteAllMessages(user.MXID) + user.bridge.DB.Backfill.DeleteAll(user.MXID) + user.bridge.DB.HistorySync.DeleteAllConversations(user.MXID) + user.bridge.DB.HistorySync.DeleteAllMessages(user.MXID) } func (user *User) IsConnected() bool { From 7de7fedc02f4f05519d32b9d82c6351780985c9b Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Wed, 11 May 2022 10:50:17 -0600 Subject: [PATCH 3/6] media backfill: enable enqueue to media backfill buffer instead of immediately requesting retry receipt --- historysync.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/historysync.go b/historysync.go index 84a3e61..76e9046 100644 --- a/historysync.go +++ b/historysync.go @@ -519,22 +519,27 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo, portal.finishBatch(resp.EventIDs, infos) portal.NextBatchID = resp.NextBatchID portal.Update() - if portal.bridge.Config.Bridge.HistorySync.MediaRequests.AutoRequestMedia && - portal.bridge.Config.Bridge.HistorySync.MediaRequests.RequestMethod == config.MediaRequestMethodImmediate { - go portal.requestMediaRetries(source, infos) + if portal.bridge.Config.Bridge.HistorySync.MediaRequests.AutoRequestMedia { + go portal.requestMediaRetries(source, resp.EventIDs, infos) } return resp } } -func (portal *Portal) requestMediaRetries(source *User, infos []*wrappedInfo) { - for _, info := range infos { +func (portal *Portal) requestMediaRetries(source *User, eventIDs []id.EventID, infos []*wrappedInfo) { + for i, info := range infos { if info != nil && info.Error == database.MsgErrMediaNotFound && info.MediaKey != nil { - err := source.Client.SendMediaRetryReceipt(info.MessageInfo, info.MediaKey) - if err != nil { - portal.log.Warnfln("Failed to send post-backfill media retry request for %s: %v", info.ID, err) - } else { - portal.log.Debugfln("Sent post-backfill media retry request for %s", info.ID) + switch portal.bridge.Config.Bridge.HistorySync.MediaRequests.RequestMethod { + case config.MediaRequestMethodImmediate: + err := source.Client.SendMediaRetryReceipt(info.MessageInfo, info.MediaKey) + if err != nil { + portal.log.Warnfln("Failed to send post-backfill media retry request for %s: %v", info.ID, err) + } else { + portal.log.Debugfln("Sent post-backfill media retry request for %s", info.ID) + } + case config.MediaRequestMethodLocalTime: + req := portal.bridge.DB.MediaBackfillRequest.NewMediaBackfillRequestWithValues(source.MXID, &portal.Key, eventIDs[i]) + req.Upsert() } } } From 08e77fab29390419d88b1c2ad9896af50f82cf01 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Wed, 11 May 2022 15:37:30 -0600 Subject: [PATCH 4/6] media backfill: send retry requests at the configured time Only does the batch send of requests if the request method is 'local_time' --- database/mediabackfillrequest.go | 10 ++-- .../upgrades/2022-05-11-add-user-timestamp.go | 12 +++++ database/upgrades/upgrades.go | 2 +- database/user.go | 17 ++++--- historysync.go | 50 +++++++++++++++++++ matrix.go | 2 +- portal.go | 16 +++--- provisioning.go | 5 ++ user.go | 1 + 9 files changed, 94 insertions(+), 21 deletions(-) create mode 100644 database/upgrades/2022-05-11-add-user-timestamp.go diff --git a/database/mediabackfillrequest.go b/database/mediabackfillrequest.go index 59155a9..45a5245 100644 --- a/database/mediabackfillrequest.go +++ b/database/mediabackfillrequest.go @@ -29,8 +29,8 @@ type MediaBackfillRequestStatus int const ( MediaBackfillRequestStatusNotRequested MediaBackfillRequestStatus = iota - MediaBackfillRequestStatusSuccess - MediaBackfillRequestStatusFailed + MediaBackfillRequestStatusRequested + MediaBackfillRequestStatusRequestFailed ) type MediaBackfillRequestQuery struct { @@ -64,14 +64,16 @@ func (mbrq *MediaBackfillRequestQuery) NewMediaBackfillRequestWithValues(userID UserID: userID, PortalKey: portalKey, EventID: eventID, + Status: MediaBackfillRequestStatusNotRequested, } } const ( getMediaBackfillRequestsForUser = ` SELECT user_mxid, portal_jid, portal_receiver, event_id, status, error - FROM media_backfill_requests - WHERE user_mxid=$1 + FROM media_backfill_requests + WHERE user_mxid=$1 + AND status=0 ` ) diff --git a/database/upgrades/2022-05-11-add-user-timestamp.go b/database/upgrades/2022-05-11-add-user-timestamp.go new file mode 100644 index 0000000..4420cb2 --- /dev/null +++ b/database/upgrades/2022-05-11-add-user-timestamp.go @@ -0,0 +1,12 @@ +package upgrades + +import ( + "database/sql" +) + +func init() { + upgrades[43] = upgrade{"Add timezone column to user table", func(tx *sql.Tx, ctx context) error { + _, err := tx.Exec(`ALTER TABLE "user" ADD COLUMN timezone TEXT`) + return err + }} +} diff --git a/database/upgrades/upgrades.go b/database/upgrades/upgrades.go index 43eeaf8..3cb5ace 100644 --- a/database/upgrades/upgrades.go +++ b/database/upgrades/upgrades.go @@ -40,7 +40,7 @@ type upgrade struct { fn upgradeFunc } -const NumberOfUpgrades = 43 +const NumberOfUpgrades = 44 var upgrades [NumberOfUpgrades]upgrade diff --git a/database/user.go b/database/user.go index c28a95c..a01a0a2 100644 --- a/database/user.go +++ b/database/user.go @@ -44,7 +44,7 @@ func (uq *UserQuery) New() *User { } func (uq *UserQuery) GetAll() (users []*User) { - rows, err := uq.db.Query(`SELECT mxid, username, agent, device, management_room, space_room, phone_last_seen, phone_last_pinged FROM "user"`) + rows, err := uq.db.Query(`SELECT mxid, username, agent, device, management_room, space_room, phone_last_seen, phone_last_pinged, timezone FROM "user"`) if err != nil || rows == nil { return nil } @@ -56,7 +56,7 @@ func (uq *UserQuery) GetAll() (users []*User) { } func (uq *UserQuery) GetByMXID(userID id.UserID) *User { - row := uq.db.QueryRow(`SELECT mxid, username, agent, device, management_room, space_room, phone_last_seen, phone_last_pinged FROM "user" WHERE mxid=$1`, userID) + row := uq.db.QueryRow(`SELECT mxid, username, agent, device, management_room, space_room, phone_last_seen, phone_last_pinged, timezone FROM "user" WHERE mxid=$1`, userID) if row == nil { return nil } @@ -64,7 +64,7 @@ func (uq *UserQuery) GetByMXID(userID id.UserID) *User { } func (uq *UserQuery) GetByUsername(username string) *User { - row := uq.db.QueryRow(`SELECT mxid, username, agent, device, management_room, space_room, phone_last_seen, phone_last_pinged FROM "user" WHERE username=$1`, username) + row := uq.db.QueryRow(`SELECT mxid, username, agent, device, management_room, space_room, phone_last_seen, phone_last_pinged, timezone FROM "user" WHERE username=$1`, username) if row == nil { return nil } @@ -81,6 +81,7 @@ type User struct { SpaceRoom id.RoomID PhoneLastSeen time.Time PhoneLastPinged time.Time + Timezone string lastReadCache map[PortalKey]time.Time lastReadCacheLock sync.Mutex @@ -92,7 +93,7 @@ func (user *User) Scan(row Scannable) *User { var username sql.NullString var device, agent sql.NullByte var phoneLastSeen, phoneLastPinged sql.NullInt64 - err := row.Scan(&user.MXID, &username, &agent, &device, &user.ManagementRoom, &user.SpaceRoom, &phoneLastSeen, &phoneLastPinged) + err := row.Scan(&user.MXID, &username, &agent, &device, &user.ManagementRoom, &user.SpaceRoom, &phoneLastSeen, &phoneLastPinged, &user.Timezone) if err != nil { if err != sql.ErrNoRows { user.log.Errorln("Database scan failed:", err) @@ -149,16 +150,16 @@ func (user *User) phoneLastPingedPtr() *int64 { } func (user *User) Insert() { - _, err := user.db.Exec(`INSERT INTO "user" (mxid, username, agent, device, management_room, space_room, phone_last_seen, phone_last_pinged) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, - user.MXID, user.usernamePtr(), user.agentPtr(), user.devicePtr(), user.ManagementRoom, user.SpaceRoom, user.phoneLastSeenPtr(), user.phoneLastPingedPtr()) + _, err := user.db.Exec(`INSERT INTO "user" (mxid, username, agent, device, management_room, space_room, phone_last_seen, phone_last_pinged, timezone) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`, + user.MXID, user.usernamePtr(), user.agentPtr(), user.devicePtr(), user.ManagementRoom, user.SpaceRoom, user.phoneLastSeenPtr(), user.phoneLastPingedPtr(), user.Timezone) if err != nil { user.log.Warnfln("Failed to insert %s: %v", user.MXID, err) } } func (user *User) Update() { - _, err := user.db.Exec(`UPDATE "user" SET username=$1, agent=$2, device=$3, management_room=$4, space_room=$5, phone_last_seen=$6, phone_last_pinged=$7 WHERE mxid=$8`, - user.usernamePtr(), user.agentPtr(), user.devicePtr(), user.ManagementRoom, user.SpaceRoom, user.phoneLastSeenPtr(), user.phoneLastPingedPtr(), user.MXID) + _, err := user.db.Exec(`UPDATE "user" SET username=$1, agent=$2, device=$3, management_room=$4, space_room=$5, phone_last_seen=$6, phone_last_pinged=$7, timezone=$8 WHERE mxid=$9`, + user.usernamePtr(), user.agentPtr(), user.devicePtr(), user.ManagementRoom, user.SpaceRoom, user.phoneLastSeenPtr(), user.phoneLastPingedPtr(), user.Timezone, user.MXID) if err != nil { user.log.Warnfln("Failed to update %s: %v", user.MXID, err) } diff --git a/historysync.go b/historysync.go index 76e9046..9bf9266 100644 --- a/historysync.go +++ b/historysync.go @@ -72,6 +72,11 @@ func (user *User) handleHistorySyncsLoop() { go user.handleBackfillRequestsLoop(user.BackfillQueue.DeferredBackfillRequests) go user.BackfillQueue.RunLoop(user) + if user.bridge.Config.Bridge.HistorySync.MediaRequests.AutoRequestMedia && + user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestMethod == config.MediaRequestMethodLocalTime { + go user.dailyMediaRequestLoop() + } + // Always save the history syncs for the user. If they want to enable // backfilling in the future, we will have it in the database. for evt := range user.historySyncs { @@ -79,6 +84,51 @@ func (user *User) handleHistorySyncsLoop() { } } +func (user *User) dailyMediaRequestLoop() { + // Calculate when to do the first set of media retry requests + now := time.Now() + userTz, err := time.LoadLocation(user.Timezone) + if err != nil { + userTz = now.Local().Location() + } + tonightMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, userTz) + midnightOffset := time.Duration(user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestLocalTime) * time.Minute + requestStartTime := tonightMidnight.Add(midnightOffset) + + // If the request time for today has already happened, we need to start the + // request loop tomorrow instead. + if requestStartTime.Before(now) { + requestStartTime = requestStartTime.AddDate(0, 0, 1) + } + + // Wait to start the loop + user.log.Infof("Waiting until %s to do media retry requests", requestStartTime) + time.Sleep(time.Until(requestStartTime)) + + for { + mediaBackfillRequests := user.bridge.DB.MediaBackfillRequest.GetMediaBackfillRequestsForUser(user.MXID) + user.log.Infof("Sending %d media retry requests", len(mediaBackfillRequests)) + + // Send all of the media backfill requests for the user at once + for _, req := range mediaBackfillRequests { + portal := user.GetPortalByJID(req.PortalKey.JID) + _, err := portal.requestMediaRetry(user, req.EventID) + if err != nil { + user.log.Warnf("Failed to send media retry request for %s / %s", req.PortalKey.String(), req.EventID) + req.Status = database.MediaBackfillRequestStatusRequestFailed + req.Error = err.Error() + } else { + user.log.Debugfln("Sent media retry request for %s / %s", req.PortalKey.String(), req.EventID) + req.Status = database.MediaBackfillRequestStatusRequested + } + req.Upsert() + } + + // Wait for 24 hours before making requests again + time.Sleep(24 * time.Hour) + } +} + func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) { for req := range backfillRequests { user.log.Infofln("Handling backfill request %s", req) diff --git a/matrix.go b/matrix.go index e7cf345..90309a1 100644 --- a/matrix.go +++ b/matrix.go @@ -491,7 +491,7 @@ func (mx *MatrixHandler) HandleReaction(evt *event.Event) { content := evt.Content.AsReaction() if strings.Contains(content.RelatesTo.Key, "retry") || strings.HasPrefix(content.RelatesTo.Key, "\u267b") { // ♻️ - if portal.requestMediaRetry(user, content.RelatesTo.EventID) { + if retryRequested, _ := portal.requestMediaRetry(user, content.RelatesTo.EventID); retryRequested { _, _ = portal.MainIntent().RedactEvent(portal.MXID, evt.ID, mautrix.ReqRedact{ Reason: "requested media from phone", }) diff --git a/portal.go b/portal.go index 689c753..b791eae 100644 --- a/portal.go +++ b/portal.go @@ -2346,20 +2346,22 @@ func (portal *Portal) handleMediaRetry(retry *events.MediaRetry, source *User) { msg.UpdateMXID(resp.EventID, database.MsgNormal, database.MsgNoError) } -func (portal *Portal) requestMediaRetry(user *User, eventID id.EventID) bool { +func (portal *Portal) requestMediaRetry(user *User, eventID id.EventID) (bool, error) { msg := portal.bridge.DB.Message.GetByMXID(eventID) if msg == nil { - portal.log.Debugfln("%s requested a media retry for unknown event %s", user.MXID, eventID) - return false + err := errors.New(fmt.Sprintf("%s requested a media retry for unknown event %s", user.MXID, eventID)) + portal.log.Debugfln(err.Error()) + return false, err } else if msg.Error != database.MsgErrMediaNotFound { - portal.log.Debugfln("%s requested a media retry for non-errored event %s", user.MXID, eventID) - return false + err := errors.New(fmt.Sprintf("%s requested a media retry for non-errored event %s", user.MXID, eventID)) + portal.log.Debugfln(err.Error()) + return false, err } evt, err := portal.fetchMediaRetryEvent(msg) if err != nil { portal.log.Warnfln("Can't send media retry request for %s: %v", msg.JID, err) - return true + return true, nil } err = user.Client.SendMediaRetryReceipt(&types.MessageInfo{ @@ -2376,7 +2378,7 @@ func (portal *Portal) requestMediaRetry(user *User, eventID id.EventID) bool { } else { portal.log.Debugfln("Sent media retry request for %s", msg.JID) } - return true + return true, err } const thumbnailMaxSize = 72 diff --git a/provisioning.go b/provisioning.go index e39bafb..88c3a35 100644 --- a/provisioning.go +++ b/provisioning.go @@ -513,6 +513,11 @@ func (prov *ProvisioningAPI) Login(w http.ResponseWriter, r *http.Request) { userID := r.URL.Query().Get("user_id") user := prov.bridge.GetUserByMXID(id.UserID(userID)) + if userTimezone := r.URL.Query().Get("tz"); userTimezone != "" { + user.Timezone = userTimezone + user.Update() + } + c, err := upgrader.Upgrade(w, r, nil) if err != nil { prov.log.Errorln("Failed to upgrade connection to websocket:", err) diff --git a/user.go b/user.go index 3d6e630..0fca9ab 100644 --- a/user.go +++ b/user.go @@ -431,6 +431,7 @@ func (user *User) DeleteSession() { user.bridge.DB.Backfill.DeleteAll(user.MXID) user.bridge.DB.HistorySync.DeleteAllConversations(user.MXID) user.bridge.DB.HistorySync.DeleteAllMessages(user.MXID) + user.bridge.DB.MediaBackfillRequest.DeleteAllMediaBackfillRequests(user.MXID) } func (user *User) IsConnected() bool { From d686912c741512906e52d7904f414ff934b5024a Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 12 May 2022 08:56:00 -0600 Subject: [PATCH 5/6] provisioning: only update timezone after login success --- provisioning.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/provisioning.go b/provisioning.go index 88c3a35..de3f7db 100644 --- a/provisioning.go +++ b/provisioning.go @@ -513,11 +513,6 @@ func (prov *ProvisioningAPI) Login(w http.ResponseWriter, r *http.Request) { userID := r.URL.Query().Get("user_id") user := prov.bridge.GetUserByMXID(id.UserID(userID)) - if userTimezone := r.URL.Query().Get("tz"); userTimezone != "" { - user.Timezone = userTimezone - user.Update() - } - c, err := upgrader.Upgrade(w, r, nil) if err != nil { prov.log.Errorln("Failed to upgrade connection to websocket:", err) @@ -579,6 +574,11 @@ func (prov *ProvisioningAPI) Login(w http.ResponseWriter, r *http.Request) { "phone": fmt.Sprintf("+%s", jid.User), "platform": user.Client.Store.Platform, }) + + if userTimezone := r.URL.Query().Get("tz"); userTimezone != "" { + user.Timezone = userTimezone + user.Update() + } case whatsmeow.QRChannelTimeout.Event: user.log.Debugln("Login via provisioning API timed out") errCode := "login timed out" From b42aa166846f1de194fe0c9ac53a6ad158c9cdb5 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 12 May 2022 10:32:14 -0600 Subject: [PATCH 6/6] media backfill: store media key in request This way, we don't have to fetch the event from Matrix in these situations --- database/mediabackfillrequest.go | 17 ++++++++++------- ...5-09-media-backfill-requests-queue-table.go | 1 + historysync.go | 5 +++-- matrix.go | 2 +- portal.go | 18 +++++++++++------- 5 files changed, 26 insertions(+), 17 deletions(-) diff --git a/database/mediabackfillrequest.go b/database/mediabackfillrequest.go index 45a5245..d8a6cb5 100644 --- a/database/mediabackfillrequest.go +++ b/database/mediabackfillrequest.go @@ -45,6 +45,7 @@ type MediaBackfillRequest struct { UserID id.UserID PortalKey *PortalKey EventID id.EventID + MediaKey []byte Status MediaBackfillRequestStatus Error string } @@ -57,20 +58,21 @@ func (mbrq *MediaBackfillRequestQuery) newMediaBackfillRequest() *MediaBackfillR } } -func (mbrq *MediaBackfillRequestQuery) NewMediaBackfillRequestWithValues(userID id.UserID, portalKey *PortalKey, eventID id.EventID) *MediaBackfillRequest { +func (mbrq *MediaBackfillRequestQuery) NewMediaBackfillRequestWithValues(userID id.UserID, portalKey *PortalKey, eventID id.EventID, mediaKey []byte) *MediaBackfillRequest { return &MediaBackfillRequest{ db: mbrq.db, log: mbrq.log, UserID: userID, PortalKey: portalKey, EventID: eventID, + MediaKey: mediaKey, Status: MediaBackfillRequestStatusNotRequested, } } const ( getMediaBackfillRequestsForUser = ` - SELECT user_mxid, portal_jid, portal_receiver, event_id, status, error + SELECT user_mxid, portal_jid, portal_receiver, event_id, media_key, status, error FROM media_backfill_requests WHERE user_mxid=$1 AND status=0 @@ -79,17 +81,18 @@ const ( func (mbr *MediaBackfillRequest) Upsert() { _, err := mbr.db.Exec(` - INSERT INTO media_backfill_requests (user_mxid, portal_jid, portal_receiver, event_id, status, error) - VALUES ($1, $2, $3, $4, $5, $6) + INSERT INTO media_backfill_requests (user_mxid, portal_jid, portal_receiver, event_id, media_key, status, error) + VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (user_mxid, portal_jid, portal_receiver, event_id) DO UPDATE SET + media_key=EXCLUDED.media_key, status=EXCLUDED.status, - error=EXCLUDED.error - `, + error=EXCLUDED.error`, mbr.UserID, mbr.PortalKey.JID.String(), mbr.PortalKey.Receiver.String(), mbr.EventID, + mbr.MediaKey, mbr.Status, mbr.Error) if err != nil { @@ -98,7 +101,7 @@ func (mbr *MediaBackfillRequest) Upsert() { } func (mbr *MediaBackfillRequest) Scan(row Scannable) *MediaBackfillRequest { - err := row.Scan(&mbr.UserID, &mbr.PortalKey.JID, &mbr.PortalKey.Receiver, &mbr.EventID, &mbr.Status, &mbr.Error) + err := row.Scan(&mbr.UserID, &mbr.PortalKey.JID, &mbr.PortalKey.Receiver, &mbr.EventID, &mbr.MediaKey, &mbr.Status, &mbr.Error) if err != nil { if !errors.Is(err, sql.ErrNoRows) { mbr.log.Errorln("Database scan failed:", err) diff --git a/database/upgrades/2022-05-09-media-backfill-requests-queue-table.go b/database/upgrades/2022-05-09-media-backfill-requests-queue-table.go index 5cfa6e4..2470ffa 100644 --- a/database/upgrades/2022-05-09-media-backfill-requests-queue-table.go +++ b/database/upgrades/2022-05-09-media-backfill-requests-queue-table.go @@ -12,6 +12,7 @@ func init() { portal_jid TEXT, portal_receiver TEXT, event_id TEXT, + media_key BYTEA, status INTEGER, error TEXT, diff --git a/historysync.go b/historysync.go index 9bf9266..fed28d8 100644 --- a/historysync.go +++ b/historysync.go @@ -112,7 +112,7 @@ func (user *User) dailyMediaRequestLoop() { // Send all of the media backfill requests for the user at once for _, req := range mediaBackfillRequests { portal := user.GetPortalByJID(req.PortalKey.JID) - _, err := portal.requestMediaRetry(user, req.EventID) + _, err := portal.requestMediaRetry(user, req.EventID, req.MediaKey) if err != nil { user.log.Warnf("Failed to send media retry request for %s / %s", req.PortalKey.String(), req.EventID) req.Status = database.MediaBackfillRequestStatusRequestFailed @@ -121,6 +121,7 @@ func (user *User) dailyMediaRequestLoop() { user.log.Debugfln("Sent media retry request for %s / %s", req.PortalKey.String(), req.EventID) req.Status = database.MediaBackfillRequestStatusRequested } + req.MediaKey = nil req.Upsert() } @@ -588,7 +589,7 @@ func (portal *Portal) requestMediaRetries(source *User, eventIDs []id.EventID, i portal.log.Debugfln("Sent post-backfill media retry request for %s", info.ID) } case config.MediaRequestMethodLocalTime: - req := portal.bridge.DB.MediaBackfillRequest.NewMediaBackfillRequestWithValues(source.MXID, &portal.Key, eventIDs[i]) + req := portal.bridge.DB.MediaBackfillRequest.NewMediaBackfillRequestWithValues(source.MXID, &portal.Key, eventIDs[i], info.MediaKey) req.Upsert() } } diff --git a/matrix.go b/matrix.go index 90309a1..2b9f899 100644 --- a/matrix.go +++ b/matrix.go @@ -491,7 +491,7 @@ func (mx *MatrixHandler) HandleReaction(evt *event.Event) { content := evt.Content.AsReaction() if strings.Contains(content.RelatesTo.Key, "retry") || strings.HasPrefix(content.RelatesTo.Key, "\u267b") { // ♻️ - if retryRequested, _ := portal.requestMediaRetry(user, content.RelatesTo.EventID); retryRequested { + if retryRequested, _ := portal.requestMediaRetry(user, content.RelatesTo.EventID, nil); retryRequested { _, _ = portal.MainIntent().RedactEvent(portal.MXID, evt.ID, mautrix.ReqRedact{ Reason: "requested media from phone", }) diff --git a/portal.go b/portal.go index b791eae..4759c6b 100644 --- a/portal.go +++ b/portal.go @@ -2346,7 +2346,7 @@ func (portal *Portal) handleMediaRetry(retry *events.MediaRetry, source *User) { msg.UpdateMXID(resp.EventID, database.MsgNormal, database.MsgNoError) } -func (portal *Portal) requestMediaRetry(user *User, eventID id.EventID) (bool, error) { +func (portal *Portal) requestMediaRetry(user *User, eventID id.EventID, mediaKey []byte) (bool, error) { msg := portal.bridge.DB.Message.GetByMXID(eventID) if msg == nil { err := errors.New(fmt.Sprintf("%s requested a media retry for unknown event %s", user.MXID, eventID)) @@ -2358,13 +2358,17 @@ func (portal *Portal) requestMediaRetry(user *User, eventID id.EventID) (bool, e return false, err } - evt, err := portal.fetchMediaRetryEvent(msg) - if err != nil { - portal.log.Warnfln("Can't send media retry request for %s: %v", msg.JID, err) - return true, nil + // If the media key is not provided, grab it from the event in Matrix + if mediaKey == nil { + evt, err := portal.fetchMediaRetryEvent(msg) + if err != nil { + portal.log.Warnfln("Can't send media retry request for %s: %v", msg.JID, err) + return true, nil + } + mediaKey = evt.Media.Key } - err = user.Client.SendMediaRetryReceipt(&types.MessageInfo{ + err := user.Client.SendMediaRetryReceipt(&types.MessageInfo{ ID: msg.JID, MessageSource: types.MessageSource{ IsFromMe: msg.Sender.User == user.JID.User, @@ -2372,7 +2376,7 @@ func (portal *Portal) requestMediaRetry(user *User, eventID id.EventID) (bool, e Sender: msg.Sender, Chat: portal.Key.JID, }, - }, evt.Media.Key) + }, mediaKey) if err != nil { portal.log.Warnfln("Failed to send media retry request for %s: %v", msg.JID, err) } else {