2021-11-03 20:34:06 +01:00
// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
// Copyright (C) 2021 Tulir Asokan
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package main
import (
2022-05-13 01:56:40 +02:00
"database/sql"
2022-03-14 22:53:50 +01:00
"fmt"
2021-11-03 20:34:06 +01:00
"time"
waProto "go.mau.fi/whatsmeow/binary/proto"
"go.mau.fi/whatsmeow/types"
2022-01-03 15:11:39 +01:00
2021-11-03 20:34:06 +01:00
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/appservice"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
2022-02-10 18:18:49 +01:00
2022-05-10 20:18:01 +02:00
"maunium.net/go/mautrix-whatsapp/config"
2022-02-10 18:18:49 +01:00
"maunium.net/go/mautrix-whatsapp/database"
2021-11-03 20:34:06 +01:00
)
// region User history sync handling
2022-02-10 18:18:49 +01:00
type wrappedInfo struct {
* types . MessageInfo
2022-03-05 20:22:31 +01:00
Type database . MessageType
2022-02-10 18:18:49 +01:00
Error database . MessageErrorType
2022-03-14 22:48:41 +01:00
2022-05-02 14:00:57 +02:00
MediaKey [ ] byte
2022-03-14 22:48:41 +01:00
ExpirationStart uint64
ExpiresIn uint32
2022-02-10 18:18:49 +01:00
}
2022-03-24 21:21:23 +01:00
func ( user * User ) handleHistorySyncsLoop ( ) {
2022-04-06 17:00:48 +02:00
if ! user . bridge . Config . Bridge . HistorySync . Backfill {
return
}
2021-11-03 20:34:06 +01:00
2022-04-06 17:00:48 +02:00
// Start the backfill queue.
user . BackfillQueue = & BackfillQueue {
2022-05-13 21:18:52 +02:00
BackfillQuery : user . bridge . DB . Backfill ,
reCheckChannels : [ ] chan bool { } ,
log : user . log . Sub ( "BackfillQueue" ) ,
2022-04-06 17:00:48 +02:00
}
2021-11-03 20:34:06 +01:00
2022-05-24 17:51:32 +02:00
forwardAndImmediate := [ ] database . BackfillType { database . BackfillImmediate , database . BackfillForward }
2022-04-06 17:00:48 +02:00
// Immediate backfills can be done in parallel
for i := 0 ; i < user . bridge . Config . Bridge . HistorySync . Immediate . WorkerCount ; i ++ {
2022-05-24 17:51:32 +02:00
go user . HandleBackfillRequestsLoop ( forwardAndImmediate , [ ] database . BackfillType { } )
2022-03-24 21:21:23 +01:00
}
2021-11-03 20:34:06 +01:00
2022-04-06 17:00:48 +02:00
// Deferred backfills should be handled synchronously so as not to
// overload the homeserver. Users can configure their backfill stages
// to be more or less aggressive with backfilling at this stage.
2022-05-24 17:51:32 +02:00
go user . HandleBackfillRequestsLoop ( [ ] database . BackfillType { database . BackfillDeferred } , forwardAndImmediate )
2022-04-06 17:00:48 +02:00
2022-05-11 23:37:30 +02:00
if user . bridge . Config . Bridge . HistorySync . MediaRequests . AutoRequestMedia &&
user . bridge . Config . Bridge . HistorySync . MediaRequests . RequestMethod == config . MediaRequestMethodLocalTime {
go user . dailyMediaRequestLoop ( )
}
2022-03-24 21:21:23 +01:00
// Always save the history syncs for the user. If they want to enable
// backfilling in the future, we will have it in the database.
for evt := range user . historySyncs {
2022-05-13 21:18:52 +02:00
user . handleHistorySync ( user . BackfillQueue , evt . Data )
2022-03-24 21:21:23 +01:00
}
2021-11-03 20:34:06 +01:00
}
2022-05-11 23:37:30 +02:00
func ( user * User ) dailyMediaRequestLoop ( ) {
// Calculate when to do the first set of media retry requests
now := time . Now ( )
userTz , err := time . LoadLocation ( user . Timezone )
if err != nil {
userTz = now . Local ( ) . Location ( )
}
tonightMidnight := time . Date ( now . Year ( ) , now . Month ( ) , now . Day ( ) , 0 , 0 , 0 , 0 , userTz )
midnightOffset := time . Duration ( user . bridge . Config . Bridge . HistorySync . MediaRequests . RequestLocalTime ) * time . Minute
requestStartTime := tonightMidnight . Add ( midnightOffset )
// If the request time for today has already happened, we need to start the
// request loop tomorrow instead.
if requestStartTime . Before ( now ) {
requestStartTime = requestStartTime . AddDate ( 0 , 0 , 1 )
}
// Wait to start the loop
user . log . Infof ( "Waiting until %s to do media retry requests" , requestStartTime )
time . Sleep ( time . Until ( requestStartTime ) )
for {
mediaBackfillRequests := user . bridge . DB . MediaBackfillRequest . GetMediaBackfillRequestsForUser ( user . MXID )
user . log . Infof ( "Sending %d media retry requests" , len ( mediaBackfillRequests ) )
// Send all of the media backfill requests for the user at once
for _ , req := range mediaBackfillRequests {
portal := user . GetPortalByJID ( req . PortalKey . JID )
2022-05-12 18:32:14 +02:00
_ , err := portal . requestMediaRetry ( user , req . EventID , req . MediaKey )
2022-05-11 23:37:30 +02:00
if err != nil {
user . log . Warnf ( "Failed to send media retry request for %s / %s" , req . PortalKey . String ( ) , req . EventID )
req . Status = database . MediaBackfillRequestStatusRequestFailed
req . Error = err . Error ( )
} else {
user . log . Debugfln ( "Sent media retry request for %s / %s" , req . PortalKey . String ( ) , req . EventID )
req . Status = database . MediaBackfillRequestStatusRequested
}
2022-05-12 18:32:14 +02:00
req . MediaKey = nil
2022-05-11 23:37:30 +02:00
req . Upsert ( )
}
// Wait for 24 hours before making requests again
time . Sleep ( 24 * time . Hour )
}
}
2022-04-29 10:07:05 +02:00
func ( user * User ) backfillInChunks ( req * database . Backfill , conv * database . HistorySyncConversation , portal * Portal ) {
2022-04-05 00:07:25 +02:00
portal . backfillLock . Lock ( )
defer portal . backfillLock . Unlock ( )
2022-05-24 23:13:31 +02:00
if ! user . shouldCreatePortalForHistorySync ( conv , portal ) {
return
}
2022-05-17 20:50:01 +02:00
backfillState := user . bridge . DB . Backfill . GetBackfillState ( user . MXID , & portal . Key )
if backfillState == nil {
backfillState = user . bridge . DB . Backfill . NewBackfillState ( user . MXID , & portal . Key )
}
backfillState . SetProcessingBatch ( true )
defer backfillState . SetProcessingBatch ( false )
2022-04-05 00:07:25 +02:00
2022-04-29 10:07:05 +02:00
var forwardPrevID id . EventID
2022-05-12 19:54:38 +02:00
var timeEnd * time . Time
2022-05-19 10:36:42 +02:00
var isLatestEvents bool
2022-05-24 12:39:29 +02:00
portal . latestEventBackfillLock . Lock ( )
2022-04-29 10:07:05 +02:00
if req . BackfillType == database . BackfillForward {
// TODO this overrides the TimeStart set when enqueuing the backfill
// maybe the enqueue should instead include the prev event ID
lastMessage := portal . bridge . DB . Message . GetLastInChat ( portal . Key )
forwardPrevID = lastMessage . MXID
start := lastMessage . Timestamp . Add ( 1 * time . Second )
req . TimeStart = & start
2022-05-19 10:36:42 +02:00
// Sending events at the end of the room (= latest events)
isLatestEvents = true
2022-04-29 10:07:05 +02:00
} else {
firstMessage := portal . bridge . DB . Message . GetFirstInChat ( portal . Key )
2022-05-12 19:54:38 +02:00
if firstMessage != nil {
2022-04-29 10:07:05 +02:00
end := firstMessage . Timestamp . Add ( - 1 * time . Second )
2022-05-12 19:54:38 +02:00
timeEnd = & end
2022-04-29 10:07:05 +02:00
user . log . Debugfln ( "Limiting backfill to end at %v" , end )
2022-05-19 10:36:42 +02:00
} else {
// Portal is empty -> events are latest
isLatestEvents = true
2022-04-29 10:07:05 +02:00
}
}
2022-05-24 12:39:29 +02:00
if ! isLatestEvents {
// We'll use normal batch sending, so no need to keep blocking new message processing
portal . latestEventBackfillLock . Unlock ( )
} else {
// This might involve sending events at the end of the room as non-historical events,
// make sure we don't process messages until this is done.
defer portal . latestEventBackfillLock . Unlock ( )
}
2022-05-12 19:54:38 +02:00
allMsgs := user . bridge . DB . HistorySync . GetMessagesBetween ( user . MXID , conv . ConversationID , req . TimeStart , timeEnd , req . MaxTotalEvents )
2022-04-30 00:59:27 +02:00
sendDisappearedNotice := false
// If expired messages are on, and a notice has not been sent to this chat
// about it having disappeared messages at the conversation timestamp, send
// a notice indicating so.
if len ( allMsgs ) == 0 && conv . EphemeralExpiration != nil && * conv . EphemeralExpiration > 0 {
lastMessage := portal . bridge . DB . Message . GetLastInChat ( portal . Key )
2022-05-12 17:08:09 +02:00
if lastMessage == nil || conv . LastMessageTimestamp . After ( lastMessage . Timestamp ) {
2022-04-30 00:59:27 +02:00
sendDisappearedNotice = true
}
}
if ! sendDisappearedNotice && len ( allMsgs ) == 0 {
2022-04-16 22:58:09 +02:00
user . log . Debugfln ( "Not backfilling %s: no bridgeable messages found" , portal . Key . JID )
return
}
2022-04-05 00:07:25 +02:00
if len ( portal . MXID ) == 0 {
user . log . Debugln ( "Creating portal for" , portal . Key . JID , "as part of history sync handling" )
err := portal . CreateMatrixRoom ( user , nil , true , false )
if err != nil {
2022-04-16 21:58:47 +02:00
user . log . Errorfln ( "Failed to create room for %s during backfill: %v" , portal . Key . JID , err )
2022-04-05 00:07:25 +02:00
return
2021-11-03 20:34:06 +01:00
}
2022-04-05 00:07:25 +02:00
}
2021-11-03 20:34:06 +01:00
2022-05-24 23:13:31 +02:00
// Update the backfill status here after the room has been created.
portal . updateBackfillStatus ( backfillState )
2022-04-30 00:59:27 +02:00
if sendDisappearedNotice {
user . log . Debugfln ( "Sending notice to %s that there are disappeared messages ending at %v" , portal . Key . JID , conv . LastMessageTimestamp )
resp , err := portal . sendMessage ( portal . MainIntent ( ) , event . EventMessage , & event . MessageEventContent {
MsgType : event . MsgNotice ,
Body : portal . formatDisappearingMessageNotice ( ) ,
} , nil , conv . LastMessageTimestamp . UnixMilli ( ) )
if err != nil {
portal . log . Errorln ( "Error sending disappearing messages notice event" )
return
}
msg := portal . bridge . DB . Message . New ( )
msg . Chat = portal . Key
msg . MXID = resp . EventID
msg . JID = types . MessageID ( resp . EventID )
msg . Timestamp = conv . LastMessageTimestamp
msg . Sent = true
2022-05-12 17:08:09 +02:00
msg . Type = database . MsgFake
2022-05-13 01:56:40 +02:00
msg . Insert ( nil )
2022-04-30 00:59:27 +02:00
return
}
2022-04-28 20:23:34 +02:00
user . log . Infofln ( "Backfilling %d messages in %s, %d messages at a time (queue ID: %d)" , len ( allMsgs ) , portal . Key . JID , req . MaxBatchEvents , req . QueueID )
2022-04-16 22:58:09 +02:00
toBackfill := allMsgs [ 0 : ]
var insertionEventIds [ ] id . EventID
2022-04-18 19:12:24 +02:00
for len ( toBackfill ) > 0 {
2022-04-16 22:58:09 +02:00
var msgs [ ] * waProto . WebMessageInfo
2022-04-29 10:07:05 +02:00
if len ( toBackfill ) <= req . MaxBatchEvents || req . MaxBatchEvents < 0 {
2022-04-16 22:58:09 +02:00
msgs = toBackfill
2022-04-18 19:12:24 +02:00
toBackfill = nil
2022-04-16 22:58:09 +02:00
} else {
2022-04-16 23:46:15 +02:00
msgs = toBackfill [ : req . MaxBatchEvents ]
toBackfill = toBackfill [ req . MaxBatchEvents : ]
2022-03-24 21:21:23 +01:00
}
2022-04-16 22:58:09 +02:00
if len ( msgs ) > 0 {
time . Sleep ( time . Duration ( req . BatchDelay ) * time . Second )
user . log . Debugfln ( "Backfilling %d messages in %s (queue ID: %d)" , len ( msgs ) , portal . Key . JID , req . QueueID )
2022-05-19 10:36:42 +02:00
resp := portal . backfill ( user , msgs , req . BackfillType == database . BackfillForward , isLatestEvents , forwardPrevID )
if resp != nil && ( resp . BaseInsertionEventID != "" || ! isLatestEvents ) {
2022-04-29 10:07:05 +02:00
insertionEventIds = append ( insertionEventIds , resp . BaseInsertionEventID )
}
2022-04-05 00:07:25 +02:00
}
}
2022-04-16 22:58:09 +02:00
user . log . Debugfln ( "Finished backfilling %d messages in %s (queue ID: %d)" , len ( allMsgs ) , portal . Key . JID , req . QueueID )
if len ( insertionEventIds ) > 0 {
portal . sendPostBackfillDummy (
2022-04-16 23:46:15 +02:00
time . Unix ( int64 ( allMsgs [ 0 ] . GetMessageTimestamp ( ) ) , 0 ) ,
2022-04-16 22:58:09 +02:00
insertionEventIds [ 0 ] )
}
2022-04-28 20:23:34 +02:00
user . log . Debugfln ( "Deleting %d history sync messages after backfilling (queue ID: %d)" , len ( allMsgs ) , req . QueueID )
2022-05-10 22:28:30 +02:00
err := user . bridge . DB . HistorySync . DeleteMessages ( user . MXID , conv . ConversationID , allMsgs )
2022-04-16 22:58:09 +02:00
if err != nil {
2022-04-28 20:23:34 +02:00
user . log . Warnfln ( "Failed to delete %d history sync messages after backfilling (queue ID: %d): %v" , len ( allMsgs ) , req . QueueID , err )
2022-04-16 22:58:09 +02:00
}
2022-05-17 20:50:01 +02:00
if req . TimeStart == nil {
// If the time start is nil, then there's no more history to backfill.
backfillState . BackfillComplete = true
if conv . EndOfHistoryTransferType == waProto . Conversation_COMPLETE_BUT_MORE_MESSAGES_REMAIN_ON_PRIMARY {
// Since there are more messages on the phone, but we can't
2022-05-24 12:39:29 +02:00
// backfill any more of them, indicate that the last timestamp
2022-05-17 20:50:01 +02:00
// that we expect to be backfilled is the oldest one that was just
// backfilled.
backfillState . FirstExpectedTimestamp = allMsgs [ len ( allMsgs ) - 1 ] . GetMessageTimestamp ( )
} else if conv . EndOfHistoryTransferType == waProto . Conversation_COMPLETE_AND_NO_MORE_MESSAGE_REMAIN_ON_PRIMARY {
// Since there are no more messages left on the phone, we've
// backfilled everything. Indicate so by setting the expected
// timestamp to 0 which means that the backfill goes to the
// beginning of time.
backfillState . FirstExpectedTimestamp = 0
}
backfillState . Upsert ( )
2022-05-20 07:20:04 +02:00
portal . updateBackfillStatus ( backfillState )
2022-05-17 20:50:01 +02:00
}
2022-04-05 00:07:25 +02:00
if ! conv . MarkedAsUnread && conv . UnreadCount == 0 {
user . markSelfReadFull ( portal )
2022-05-18 18:40:29 +02:00
} else if user . bridge . Config . Bridge . SyncManualMarkedUnread {
user . markUnread ( portal , true )
2021-11-03 23:09:23 +01:00
}
}
2022-03-24 21:21:23 +01:00
func ( user * User ) shouldCreatePortalForHistorySync ( conv * database . HistorySyncConversation , portal * Portal ) bool {
2021-11-03 20:34:06 +01:00
if len ( portal . MXID ) > 0 {
2022-05-13 10:19:52 +02:00
if ! user . bridge . AS . StateStore . IsInRoom ( portal . MXID , user . MXID ) {
portal . ensureUserInvited ( user )
}
2021-11-03 20:34:06 +01:00
// Portal exists, let backfill continue
return true
} else if ! user . bridge . Config . Bridge . HistorySync . CreatePortals {
user . log . Debugfln ( "Not creating portal for %s: creating rooms from history sync is disabled" , portal . Key . JID )
2022-05-24 13:02:06 +02:00
return false
2021-11-03 20:34:06 +01:00
} else {
// Portal doesn't exist, but should be created
return true
}
}
2022-05-13 21:18:52 +02:00
func ( user * User ) handleHistorySync ( backfillQueue * BackfillQueue , evt * waProto . HistorySync ) {
2022-03-24 21:21:23 +01:00
if evt == nil || evt . SyncType == nil || evt . GetSyncType ( ) == waProto . HistorySync_INITIAL_STATUS_V3 || evt . GetSyncType ( ) == waProto . HistorySync_PUSH_NAME {
return
}
2022-04-12 16:32:00 +02:00
description := fmt . Sprintf ( "type %s, %d conversations, chunk order %d, progress: %d" , evt . GetSyncType ( ) , len ( evt . GetConversations ( ) ) , evt . GetChunkOrder ( ) , evt . GetProgress ( ) )
2022-03-24 21:21:23 +01:00
user . log . Infoln ( "Storing history sync with" , description )
for _ , conv := range evt . GetConversations ( ) {
jid , err := types . ParseJID ( conv . GetId ( ) )
if err != nil {
user . log . Warnfln ( "Failed to parse chat JID '%s' in history sync: %v" , conv . GetId ( ) , err )
2021-11-03 20:34:06 +01:00
continue
2022-04-16 21:58:47 +02:00
} else if jid . Server == types . BroadcastServer {
user . log . Debugfln ( "Skipping broadcast list %s in history sync" , jid )
continue
2022-03-24 21:21:23 +01:00
}
portal := user . GetPortalByJID ( jid )
2022-05-10 22:28:30 +02:00
historySyncConversation := user . bridge . DB . HistorySync . NewConversationWithValues (
2022-03-24 21:21:23 +01:00
user . MXID ,
conv . GetId ( ) ,
& portal . Key ,
getConversationTimestamp ( conv ) ,
conv . GetMuteEndTime ( ) ,
conv . GetArchived ( ) ,
conv . GetPinned ( ) ,
conv . GetDisappearingMode ( ) . GetInitiator ( ) ,
conv . GetEndOfHistoryTransferType ( ) ,
conv . EphemeralExpiration ,
conv . GetMarkedAsUnread ( ) ,
conv . GetUnreadCount ( ) )
historySyncConversation . Upsert ( )
2022-04-22 22:07:59 +02:00
for _ , rawMsg := range conv . GetMessages ( ) {
2022-03-24 21:21:23 +01:00
// Don't store messages that will just be skipped.
2022-05-13 15:08:32 +02:00
msgEvt , err := user . Client . ParseWebMessage ( portal . Key . JID , rawMsg . GetMessage ( ) )
if err != nil {
2022-05-19 11:08:30 +02:00
user . log . Warnln ( "Dropping historical message due to info parse error:" , err )
2022-05-13 15:08:32 +02:00
continue
2022-04-22 22:07:59 +02:00
}
2022-05-13 15:08:32 +02:00
msgType := getMessageType ( msgEvt . Message )
2022-03-24 21:21:23 +01:00
if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" {
continue
}
// Don't store unsupported messages.
2022-05-13 15:08:32 +02:00
if ! containsSupportedMessage ( msgEvt . Message ) {
2022-03-24 21:21:23 +01:00
continue
}
2022-05-13 15:08:32 +02:00
message , err := user . bridge . DB . HistorySync . NewMessageWithValues ( user . MXID , conv . GetId ( ) , msgEvt . Info . ID , rawMsg )
2022-03-24 21:21:23 +01:00
if err != nil {
2022-05-13 15:08:32 +02:00
user . log . Warnfln ( "Failed to save message %s in %s. Error: %+v" , msgEvt . Info . ID , conv . GetId ( ) , err )
2022-03-24 21:21:23 +01:00
continue
}
message . Insert ( )
2021-11-03 20:34:06 +01:00
}
}
2022-03-24 21:21:23 +01:00
// If this was the initial bootstrap, enqueue immediate backfills for the
// most recent portals. If it's the last history sync event, start
// backfilling the rest of the history of the portals.
2022-04-12 16:58:47 +02:00
if user . bridge . Config . Bridge . HistorySync . Backfill {
2022-04-13 18:15:42 +02:00
if evt . GetSyncType ( ) != waProto . HistorySync_INITIAL_BOOTSTRAP && evt . GetProgress ( ) < 98 {
2022-04-12 16:58:47 +02:00
return
}
2022-05-10 22:28:30 +02:00
nMostRecent := user . bridge . DB . HistorySync . GetNMostRecentConversations ( user . MXID , user . bridge . Config . Bridge . HistorySync . MaxInitialConversations )
2022-04-24 05:50:41 +02:00
if len ( nMostRecent ) > 0 {
// Find the portals for all of the conversations.
portals := [ ] * Portal { }
for _ , conv := range nMostRecent {
jid , err := types . ParseJID ( conv . ConversationID )
if err != nil {
user . log . Warnfln ( "Failed to parse chat JID '%s' in history sync: %v" , conv . ConversationID , err )
continue
}
portals = append ( portals , user . GetPortalByJID ( jid ) )
2022-03-24 21:21:23 +01:00
}
2022-03-31 01:59:55 +02:00
switch evt . GetSyncType ( ) {
case waProto . HistorySync_INITIAL_BOOTSTRAP :
// Enqueue immediate backfills for the most recent messages first.
2022-04-24 05:50:41 +02:00
user . EnqueueImmedateBackfills ( portals )
2022-04-05 20:28:58 +02:00
case waProto . HistorySync_FULL , waProto . HistorySync_RECENT :
2022-04-29 10:07:05 +02:00
user . EnqueueForwardBackfills ( portals )
2022-04-12 16:58:47 +02:00
// Enqueue deferred backfills as configured.
2022-04-24 05:50:41 +02:00
user . EnqueueDeferredBackfills ( portals )
2022-03-31 01:59:55 +02:00
}
2022-03-24 21:21:23 +01:00
2022-04-24 05:50:41 +02:00
// Tell the queue to check for new backfill requests.
2022-05-13 21:18:52 +02:00
backfillQueue . ReCheck ( )
2022-04-24 05:50:41 +02:00
}
2021-11-03 20:34:06 +01:00
}
}
2022-03-24 21:21:23 +01:00
func getConversationTimestamp ( conv * waProto . Conversation ) uint64 {
convTs := conv . GetConversationTimestamp ( )
if convTs == 0 && len ( conv . GetMessages ( ) ) > 0 {
convTs = conv . Messages [ 0 ] . GetMessage ( ) . GetMessageTimestamp ( )
2021-11-03 20:34:06 +01:00
}
2022-03-24 21:21:23 +01:00
return convTs
}
2022-04-24 05:50:41 +02:00
func ( user * User ) EnqueueImmedateBackfills ( portals [ ] * Portal ) {
for priority , portal := range portals {
maxMessages := user . bridge . Config . Bridge . HistorySync . Immediate . MaxEvents
2022-05-12 19:54:38 +02:00
initialBackfill := user . bridge . DB . Backfill . NewWithValues ( user . MXID , database . BackfillImmediate , priority , & portal . Key , nil , maxMessages , maxMessages , 0 )
2022-04-24 05:50:41 +02:00
initialBackfill . Insert ( )
}
2022-04-19 04:50:21 +02:00
}
2022-04-24 05:50:41 +02:00
func ( user * User ) EnqueueDeferredBackfills ( portals [ ] * Portal ) {
numPortals := len ( portals )
for stageIdx , backfillStage := range user . bridge . Config . Bridge . HistorySync . Deferred {
for portalIdx , portal := range portals {
var startDate * time . Time = nil
if backfillStage . StartDaysAgo > 0 {
startDaysAgo := time . Now ( ) . AddDate ( 0 , 0 , - backfillStage . StartDaysAgo )
startDate = & startDaysAgo
}
2022-05-10 22:28:30 +02:00
backfillMessages := user . bridge . DB . Backfill . NewWithValues (
2022-05-12 19:54:38 +02:00
user . MXID , database . BackfillDeferred , stageIdx * numPortals + portalIdx , & portal . Key , startDate , backfillStage . MaxBatchEvents , - 1 , backfillStage . BatchDelay )
2022-04-24 05:50:41 +02:00
backfillMessages . Insert ( )
2022-04-19 04:50:21 +02:00
}
}
2022-03-24 21:21:23 +01:00
}
2022-04-29 10:07:05 +02:00
func ( user * User ) EnqueueForwardBackfills ( portals [ ] * Portal ) {
for priority , portal := range portals {
lastMsg := user . bridge . DB . Message . GetLastInChat ( portal . Key )
if lastMsg == nil {
continue
}
2022-05-10 22:28:30 +02:00
backfill := user . bridge . DB . Backfill . NewWithValues (
2022-05-12 19:54:38 +02:00
user . MXID , database . BackfillForward , priority , & portal . Key , & lastMsg . Timestamp , - 1 , - 1 , 0 )
2022-04-29 10:07:05 +02:00
backfill . Insert ( )
}
}
2021-11-03 20:34:06 +01:00
// endregion
// region Portal backfilling
var (
PortalCreationDummyEvent = event . Type { Type : "fi.mau.dummy.portal_created" , Class : event . MessageEventType }
PreBackfillDummyEvent = event . Type { Type : "fi.mau.dummy.pre_backfill" , Class : event . MessageEventType }
2022-03-24 21:21:23 +01:00
2022-05-24 13:02:06 +02:00
HistorySyncMarker = event . Type { Type : "org.matrix.msc2716.marker" , Class : event . MessageEventType }
2022-05-20 07:20:04 +02:00
BackfillStatusEvent = event . Type { Type : "com.beeper.backfill_status" , Class : event . StateEventType }
2021-11-03 20:34:06 +01:00
)
2022-05-19 10:36:42 +02:00
func ( portal * Portal ) backfill ( source * User , messages [ ] * waProto . WebMessageInfo , isForward , isLatest bool , prevEventID id . EventID ) * mautrix . RespBatchSend {
2022-04-29 10:07:05 +02:00
var req mautrix . ReqBatchSend
var infos [ ] * wrappedInfo
2021-11-03 20:34:06 +01:00
2022-04-29 10:07:05 +02:00
if ! isForward {
if portal . FirstEventID != "" || portal . NextBatchID != "" {
req . PrevEventID = portal . FirstEventID
req . BatchID = portal . NextBatchID
} else {
portal . log . Warnfln ( "Can't backfill %d messages through %s to chat: first event ID not known" , len ( messages ) , source . MXID )
return nil
}
} else {
req . PrevEventID = prevEventID
}
2022-05-19 10:36:42 +02:00
req . BeeperNewMessages = isLatest && req . BatchID == ""
2021-11-03 20:34:06 +01:00
2022-04-29 10:07:05 +02:00
beforeFirstMessageTimestampMillis := ( int64 ( messages [ len ( messages ) - 1 ] . GetMessageTimestamp ( ) ) * 1000 ) - 1
req . StateEventsAtStart = make ( [ ] * event . Event , 0 )
2021-11-03 20:34:06 +01:00
2022-04-29 10:07:05 +02:00
addedMembers := make ( map [ id . UserID ] struct { } )
2021-11-03 20:34:06 +01:00
addMember := func ( puppet * Puppet ) {
if _ , alreadyAdded := addedMembers [ puppet . MXID ] ; alreadyAdded {
return
}
mxid := puppet . MXID . String ( )
content := event . MemberEventContent {
Membership : event . MembershipJoin ,
Displayname : puppet . Displayname ,
AvatarURL : puppet . AvatarURL . CUString ( ) ,
}
inviteContent := content
inviteContent . Membership = event . MembershipInvite
2022-04-29 10:07:05 +02:00
req . StateEventsAtStart = append ( req . StateEventsAtStart , & event . Event {
2021-11-03 20:34:06 +01:00
Type : event . StateMember ,
Sender : portal . MainIntent ( ) . UserID ,
StateKey : & mxid ,
2022-04-29 10:07:05 +02:00
Timestamp : beforeFirstMessageTimestampMillis ,
2021-11-03 20:34:06 +01:00
Content : event . Content { Parsed : & inviteContent } ,
} , & event . Event {
Type : event . StateMember ,
Sender : puppet . MXID ,
StateKey : & mxid ,
2022-04-29 10:07:05 +02:00
Timestamp : beforeFirstMessageTimestampMillis ,
2021-11-03 20:34:06 +01:00
Content : event . Content { Parsed : & content } ,
} )
2022-04-29 10:07:05 +02:00
addedMembers [ puppet . MXID ] = struct { } { }
2021-11-03 20:34:06 +01:00
}
2022-05-24 13:02:06 +02:00
portal . log . Infofln ( "Processing history sync with %d messages (forward: %t, latest: %t, prev: %s, batch: %s)" , len ( messages ) , isForward , isLatest , req . PrevEventID , req . BatchID )
2021-11-03 20:34:06 +01:00
// The messages are ordered newest to oldest, so iterate them in reverse order.
for i := len ( messages ) - 1 ; i >= 0 ; i -- {
2022-04-05 21:13:20 +02:00
webMsg := messages [ i ]
2022-05-13 15:08:32 +02:00
msgEvt , err := source . Client . ParseWebMessage ( portal . Key . JID , webMsg )
if err != nil {
continue
2022-04-22 22:07:59 +02:00
}
2022-05-13 15:08:32 +02:00
msgType := getMessageType ( msgEvt . Message )
2021-11-03 20:34:06 +01:00
if msgType == "unknown" || msgType == "ignore" || msgType == "unknown_protocol" {
if msgType != "ignore" {
2022-05-13 15:08:32 +02:00
portal . log . Debugfln ( "Skipping message %s with unknown type in backfill" , msgEvt . Info . ID )
2021-11-03 20:34:06 +01:00
}
continue
}
2021-11-09 22:44:35 +01:00
if webMsg . GetPushName ( ) != "" && webMsg . GetPushName ( ) != "-" {
2022-05-13 15:08:32 +02:00
existingContact , _ := source . Client . Store . Contacts . GetContact ( msgEvt . Info . Sender )
2021-11-09 22:59:28 +01:00
if ! existingContact . Found || existingContact . PushName == "" {
2022-05-13 15:08:32 +02:00
changed , _ , err := source . Client . Store . Contacts . PutPushName ( msgEvt . Info . Sender , webMsg . GetPushName ( ) )
2021-11-09 22:59:28 +01:00
if err != nil {
2022-05-13 15:08:32 +02:00
source . log . Errorfln ( "Failed to save push name of %s from historical message in device store: %v" , msgEvt . Info . Sender , err )
2021-11-09 22:59:28 +01:00
} else if changed {
2022-05-13 15:08:32 +02:00
source . log . Debugfln ( "Got push name %s for %s from historical message" , webMsg . GetPushName ( ) , msgEvt . Info . Sender )
2021-11-09 22:59:28 +01:00
}
2021-11-09 22:44:35 +01:00
}
}
2022-05-13 15:08:32 +02:00
puppet := portal . getMessagePuppet ( source , & msgEvt . Info )
2022-05-19 11:08:30 +02:00
if puppet == nil {
continue
}
2021-11-06 13:20:56 +01:00
intent := puppet . IntentFor ( portal )
2021-11-06 14:33:27 +01:00
if intent . IsCustomPuppet && ! portal . bridge . Config . CanDoublePuppetBackfill ( puppet . CustomMXID ) {
2021-11-03 20:34:06 +01:00
intent = puppet . DefaultIntent ( )
}
2022-04-22 22:07:59 +02:00
2022-05-13 15:08:32 +02:00
converted := portal . convertMessage ( intent , source , & msgEvt . Info , msgEvt . Message , true )
2021-11-03 20:34:06 +01:00
if converted == nil {
2022-05-13 15:08:32 +02:00
portal . log . Debugfln ( "Skipping unsupported message %s in backfill" , msgEvt . Info . ID )
2021-11-03 20:34:06 +01:00
continue
}
2021-11-06 20:28:42 +01:00
if ! intent . IsCustomPuppet && ! portal . bridge . StateStore . IsInRoom ( portal . MXID , puppet . MXID ) {
2021-11-03 20:34:06 +01:00
addMember ( puppet )
}
2021-11-05 10:47:51 +01:00
// TODO this won't work for history
if len ( converted . ReplyTo ) > 0 {
portal . SetReply ( converted . Content , converted . ReplyTo )
}
2022-05-13 15:08:32 +02:00
err = portal . appendBatchEvents ( converted , & msgEvt . Info , webMsg . GetEphemeralStartTimestamp ( ) , & req . Events , & infos )
2021-11-03 20:34:06 +01:00
if err != nil {
2022-05-13 15:08:32 +02:00
portal . log . Errorfln ( "Error handling message %s during backfill: %v" , msgEvt . Info . ID , err )
2021-11-03 20:34:06 +01:00
}
}
2022-04-29 10:07:05 +02:00
portal . log . Infofln ( "Made %d Matrix events from messages in batch" , len ( req . Events ) )
if len ( req . Events ) == 0 {
return nil
}
2021-11-03 20:34:06 +01:00
2022-04-29 10:07:05 +02:00
if len ( req . BatchID ) == 0 || isForward {
2021-11-03 20:34:06 +01:00
portal . log . Debugln ( "Sending a dummy event to avoid forward extremity errors with backfill" )
_ , err := portal . MainIntent ( ) . SendMessageEvent ( portal . MXID , PreBackfillDummyEvent , struct { } { } )
if err != nil {
portal . log . Warnln ( "Error sending pre-backfill dummy event:" , err )
}
}
2022-04-29 10:07:05 +02:00
resp , err := portal . MainIntent ( ) . BatchSend ( portal . MXID , & req )
if err != nil {
portal . log . Errorln ( "Error batch sending messages:" , err )
return nil
} else {
2022-05-13 01:56:40 +02:00
txn , err := portal . bridge . DB . Begin ( )
if err != nil {
portal . log . Errorln ( "Failed to start transaction to save batch messages:" , err )
return nil
}
// Do the following block in the transaction
{
portal . finishBatch ( txn , resp . EventIDs , infos )
portal . NextBatchID = resp . NextBatchID
portal . Update ( txn )
}
err = txn . Commit ( )
if err != nil {
portal . log . Errorln ( "Failed to commit transaction to save batch messages:" , err )
return nil
}
2022-05-11 18:50:17 +02:00
if portal . bridge . Config . Bridge . HistorySync . MediaRequests . AutoRequestMedia {
go portal . requestMediaRetries ( source , resp . EventIDs , infos )
2022-05-02 14:00:57 +02:00
}
2022-04-29 10:07:05 +02:00
return resp
2021-11-03 20:34:06 +01:00
}
}
2022-05-11 18:50:17 +02:00
func ( portal * Portal ) requestMediaRetries ( source * User , eventIDs [ ] id . EventID , infos [ ] * wrappedInfo ) {
for i , info := range infos {
2022-05-02 14:47:35 +02:00
if info != nil && info . Error == database . MsgErrMediaNotFound && info . MediaKey != nil {
2022-05-11 18:50:17 +02:00
switch portal . bridge . Config . Bridge . HistorySync . MediaRequests . RequestMethod {
case config . MediaRequestMethodImmediate :
err := source . Client . SendMediaRetryReceipt ( info . MessageInfo , info . MediaKey )
if err != nil {
portal . log . Warnfln ( "Failed to send post-backfill media retry request for %s: %v" , info . ID , err )
} else {
portal . log . Debugfln ( "Sent post-backfill media retry request for %s" , info . ID )
}
case config . MediaRequestMethodLocalTime :
2022-05-12 18:32:14 +02:00
req := portal . bridge . DB . MediaBackfillRequest . NewMediaBackfillRequestWithValues ( source . MXID , & portal . Key , eventIDs [ i ] , info . MediaKey )
2022-05-11 18:50:17 +02:00
req . Upsert ( )
2022-05-02 14:00:57 +02:00
}
}
}
}
2022-03-14 22:48:41 +01:00
func ( portal * Portal ) appendBatchEvents ( converted * ConvertedMessage , info * types . MessageInfo , expirationStart uint64 , eventsArray * [ ] * event . Event , infoArray * [ ] * wrappedInfo ) error {
2021-11-03 20:34:06 +01:00
mainEvt , err := portal . wrapBatchEvent ( info , converted . Intent , converted . Type , converted . Content , converted . Extra )
if err != nil {
return err
}
2022-06-17 10:34:24 +02:00
if portal . bridge . Config . Bridge . CaptionInMessage {
converted . MergeCaption ( )
}
2021-11-03 20:34:06 +01:00
if converted . Caption != nil {
captionEvt , err := portal . wrapBatchEvent ( info , converted . Intent , converted . Type , converted . Caption , nil )
if err != nil {
return err
}
* eventsArray = append ( * eventsArray , mainEvt , captionEvt )
2022-05-02 14:00:57 +02:00
* infoArray = append ( * infoArray , & wrappedInfo { info , database . MsgNormal , converted . Error , converted . MediaKey , expirationStart , converted . ExpiresIn } , nil )
2021-11-03 20:34:06 +01:00
} else {
* eventsArray = append ( * eventsArray , mainEvt )
2022-05-02 14:00:57 +02:00
* infoArray = append ( * infoArray , & wrappedInfo { info , database . MsgNormal , converted . Error , converted . MediaKey , expirationStart , converted . ExpiresIn } )
2021-11-03 20:34:06 +01:00
}
2022-01-03 15:11:39 +01:00
if converted . MultiEvent != nil {
for _ , subEvtContent := range converted . MultiEvent {
subEvt , err := portal . wrapBatchEvent ( info , converted . Intent , converted . Type , subEvtContent , nil )
if err != nil {
return err
}
* eventsArray = append ( * eventsArray , subEvt )
* infoArray = append ( * infoArray , nil )
}
}
2021-11-03 20:34:06 +01:00
return nil
}
func ( portal * Portal ) wrapBatchEvent ( info * types . MessageInfo , intent * appservice . IntentAPI , eventType event . Type , content * event . MessageEventContent , extraContent map [ string ] interface { } ) ( * event . Event , error ) {
wrappedContent := event . Content {
Parsed : content ,
Raw : extraContent ,
}
2022-06-30 19:56:25 +02:00
newEventType , err := portal . encrypt ( intent , & wrappedContent , eventType )
2021-11-03 20:34:06 +01:00
if err != nil {
return nil , err
}
2022-07-05 15:41:21 +02:00
if newEventType != eventType {
intent . AddDoublePuppetValue ( & wrappedContent )
}
2022-04-05 20:29:43 +02:00
2021-11-03 20:34:06 +01:00
return & event . Event {
Sender : intent . UserID ,
Type : newEventType ,
Timestamp : info . Timestamp . UnixMilli ( ) ,
Content : wrappedContent ,
} , nil
}
2022-05-13 01:56:40 +02:00
func ( portal * Portal ) finishBatch ( txn * sql . Tx , eventIDs [ ] id . EventID , infos [ ] * wrappedInfo ) {
for i , info := range infos {
if info == nil {
continue
2021-11-03 20:34:06 +01:00
}
2022-05-13 01:56:40 +02:00
eventID := eventIDs [ i ]
portal . markHandled ( txn , nil , info . MessageInfo , eventID , true , false , info . Type , info . Error )
2022-03-14 22:48:41 +01:00
2022-05-13 01:56:40 +02:00
if info . ExpiresIn > 0 {
if info . ExpirationStart > 0 {
remainingSeconds := time . Unix ( int64 ( info . ExpirationStart ) , 0 ) . Add ( time . Duration ( info . ExpiresIn ) * time . Second ) . Sub ( time . Now ( ) ) . Seconds ( )
portal . log . Debugfln ( "Disappearing history sync message: expires in %d, started at %d, remaining %d" , info . ExpiresIn , info . ExpirationStart , int ( remainingSeconds ) )
portal . MarkDisappearing ( eventID , uint32 ( remainingSeconds ) , true )
} else {
portal . log . Debugfln ( "Disappearing history sync message: expires in %d (not started)" , info . ExpiresIn )
portal . MarkDisappearing ( eventID , info . ExpiresIn , false )
}
2022-03-14 22:48:41 +01:00
}
}
2022-05-13 01:56:40 +02:00
portal . log . Infofln ( "Successfully sent %d events" , len ( eventIDs ) )
2022-03-14 22:48:41 +01:00
}
2022-03-24 21:21:23 +01:00
func ( portal * Portal ) sendPostBackfillDummy ( lastTimestamp time . Time , insertionEventId id . EventID ) {
2022-04-18 19:12:24 +02:00
resp , err := portal . MainIntent ( ) . SendMessageEvent ( portal . MXID , HistorySyncMarker , map [ string ] interface { } {
"org.matrix.msc2716.marker.insertion" : insertionEventId ,
//"m.marker.insertion": insertionEventId,
} )
if err != nil {
portal . log . Errorln ( "Error sending post-backfill dummy event:" , err )
return
2021-11-03 20:34:06 +01:00
}
2022-04-18 19:12:24 +02:00
msg := portal . bridge . DB . Message . New ( )
msg . Chat = portal . Key
msg . MXID = resp . EventID
msg . JID = types . MessageID ( resp . EventID )
msg . Timestamp = lastTimestamp . Add ( 1 * time . Second )
msg . Sent = true
2022-05-14 13:17:03 +02:00
msg . Type = database . MsgFake
2022-05-13 01:56:40 +02:00
msg . Insert ( nil )
2021-11-03 20:34:06 +01:00
}
2022-05-20 07:20:04 +02:00
func ( portal * Portal ) updateBackfillStatus ( backfillState * database . BackfillState ) {
backfillStatus := "backfilling"
if backfillState . BackfillComplete {
backfillStatus = "complete"
}
_ , err := portal . MainIntent ( ) . SendStateEvent ( portal . MXID , BackfillStatusEvent , "" , map [ string ] interface { } {
"status" : backfillStatus ,
2022-05-25 16:39:08 +02:00
"first_timestamp" : backfillState . FirstExpectedTimestamp * 1000 ,
2022-05-20 07:20:04 +02:00
} )
if err != nil {
2022-05-24 23:13:31 +02:00
portal . log . Errorln ( "Error sending backfill status event:" , err )
2022-05-20 07:20:04 +02:00
}
}
2021-11-03 20:34:06 +01:00
// endregion