diff --git a/config/bridge.go b/config/bridge.go index 7b16341..2393cef 100644 --- a/config/bridge.go +++ b/config/bridge.go @@ -57,12 +57,10 @@ type BridgeConfig struct { IdentityChangeNotices bool `yaml:"identity_change_notices"` HistorySync struct { - CreatePortals bool `yaml:"create_portals"` - Backfill bool `yaml:"backfill"` + Backfill bool `yaml:"backfill"` - DoublePuppetBackfill bool `yaml:"double_puppet_backfill"` - RequestFullSync bool `yaml:"request_full_sync"` - FullSyncConfig struct { + RequestFullSync bool `yaml:"request_full_sync"` + FullSyncConfig struct { DaysLimit uint32 `yaml:"days_limit"` SizeLimit uint32 `yaml:"size_mb_limit"` StorageQuota uint32 `yaml:"storage_quota_mb"` diff --git a/config/config.go b/config/config.go index 326f859..bb7ec25 100644 --- a/config/config.go +++ b/config/config.go @@ -45,15 +45,3 @@ func (config *Config) CanAutoDoublePuppet(userID id.UserID) bool { _, hasSecret := config.Bridge.LoginSharedSecretMap[homeserver] return hasSecret } - -func (config *Config) CanDoublePuppetBackfill(userID id.UserID) bool { - if !config.Bridge.HistorySync.DoublePuppetBackfill { - return false - } - _, homeserver, _ := userID.Parse() - // Batch sending can only use local users, so don't allow double puppets on other servers. - if homeserver != config.Homeserver.Domain && config.Homeserver.Software != bridgeconfig.SoftwareHungry { - return false - } - return true -} diff --git a/config/upgrade.go b/config/upgrade.go index a249a6f..440c07e 100644 --- a/config/upgrade.go +++ b/config/upgrade.go @@ -45,9 +45,7 @@ func DoUpgrade(helper *up.Helper) { helper.Copy(up.Int, "bridge", "portal_message_buffer") helper.Copy(up.Bool, "bridge", "call_start_notices") helper.Copy(up.Bool, "bridge", "identity_change_notices") - helper.Copy(up.Bool, "bridge", "history_sync", "create_portals") helper.Copy(up.Bool, "bridge", "history_sync", "backfill") - helper.Copy(up.Bool, "bridge", "history_sync", "double_puppet_backfill") helper.Copy(up.Bool, "bridge", "history_sync", "request_full_sync") helper.Copy(up.Int|up.Null, "bridge", "history_sync", "full_sync_config", "days_limit") helper.Copy(up.Int|up.Null, "bridge", "history_sync", "full_sync_config", "size_mb_limit") diff --git a/example-config.yaml b/example-config.yaml index 0960098..b1b4c2d 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -95,7 +95,7 @@ whatsapp: os_name: Mautrix-WhatsApp bridge # Browser name that determines the logo shown in the mobile app. # Must be "unknown" for a generic icon or a valid browser name if you want a specific icon. - # List of valid browser names: https://github.com/tulir/whatsmeow/blob/8b34d886d543b72e5f4699cf5b2797f68d598f78/binary/proto/def.proto#L38-L51 + # List of valid browser names: https://github.com/tulir/whatsmeow/blob/efc632c008604016ddde63bfcfca8de4e5304da9/binary/proto/def.proto#L43-L64 browser_name: unknown # Bridge config @@ -129,14 +129,12 @@ bridge: history_sync: # Enable backfilling history sync payloads from WhatsApp? backfill: true - # Should the bridge create portals for chats in the history sync payload? - # This has no effect unless backfill is enabled. - create_portals: true - # Use double puppets for backfilling? - # In order to use this, the double puppets must be in the appservice's user ID namespace - # (because the bridge can't use the double puppet access token with batch sending). - # This only affects double puppets on the local server, double puppets on other servers will never be used. - double_puppet_backfill: false + # The maximum number of initial conversations that should be synced. + # Other conversations will be backfilled on demand when receiving a message or when initiating a direct chat. + max_initial_conversations: -1 + # Maximum number of messages to backfill in each conversation. + # Set to -1 to disable limit. + message_count: 50 # Should the bridge request a full sync from the phone when logging in? # This bumps the size of history syncs from 3 months to 1 year. request_full_sync: false @@ -166,11 +164,6 @@ bridge: # If request_method is "local_time", what time should the requests # be sent (in minutes after midnight)? request_local_time: 120 - # The maximum number of initial conversations that should be synced. - # Other conversations will be backfilled on demand when receiving a message or when initiating a direct chat. - max_initial_conversations: -1 - # Number of messages to backfill in each conversation - message_count: 50 # If this value is greater than 0, then if the conversation's last # message was more than this number of hours ago, then the conversation # will automatically be marked it as read. diff --git a/historysync.go b/historysync.go index 888f178..f7d9e2a 100644 --- a/historysync.go +++ b/historysync.go @@ -29,7 +29,6 @@ import ( "maunium.net/go/mautrix" "maunium.net/go/mautrix/appservice" - "maunium.net/go/mautrix/bridge/bridgeconfig" "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/id" "maunium.net/go/mautrix/util/dbutil" @@ -242,8 +241,8 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor portal.backfillLock.Lock() defer portal.backfillLock.Unlock() - if !user.shouldCreatePortalForHistorySync(conv, portal) { - return + if len(portal.MXID) > 0 && !user.bridge.AS.StateStore.IsInRoom(portal.MXID, user.MXID) { + portal.ensureUserInvited(user) } backfillState := user.bridge.DB.Backfill.GetBackfillState(user.MXID, &portal.Key) @@ -253,19 +252,17 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor backfillState.SetProcessingBatch(true) defer backfillState.SetProcessingBatch(false) - var forwardPrevID id.EventID var timeEnd *time.Time - var isLatestEvents, shouldMarkAsRead, shouldAtomicallyMarkAsRead bool + var forward, shouldMarkAsRead bool portal.latestEventBackfillLock.Lock() if req.BackfillType == database.BackfillForward { // TODO this overrides the TimeStart set when enqueuing the backfill // maybe the enqueue should instead include the prev event ID lastMessage := portal.bridge.DB.Message.GetLastInChat(portal.Key) - forwardPrevID = lastMessage.MXID start := lastMessage.Timestamp.Add(1 * time.Second) req.TimeStart = &start // Sending events at the end of the room (= latest events) - isLatestEvents = true + forward = true } else { firstMessage := portal.bridge.DB.Message.GetFirstInChat(portal.Key) if firstMessage != nil { @@ -274,10 +271,10 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor user.log.Debugfln("Limiting backfill to end at %v", end) } else { // Portal is empty -> events are latest - isLatestEvents = true + forward = true } } - if !isLatestEvents { + if !forward { // We'll use normal batch sending, so no need to keep blocking new message processing portal.latestEventBackfillLock.Unlock() } else { @@ -288,7 +285,6 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor isUnread := conv.MarkedAsUnread || conv.UnreadCount > 0 isTooOld := user.bridge.Config.Bridge.HistorySync.UnreadHoursThreshold > 0 && conv.LastMessageTimestamp.Before(time.Now().Add(time.Duration(-user.bridge.Config.Bridge.HistorySync.UnreadHoursThreshold)*time.Hour)) shouldMarkAsRead = !isUnread || isTooOld - shouldAtomicallyMarkAsRead = shouldMarkAsRead && user.bridge.Config.Homeserver.Software == bridgeconfig.SoftwareHungry } allMsgs := user.bridge.DB.HistorySync.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, timeEnd, req.MaxTotalEvents) @@ -347,7 +343,6 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor user.log.Infofln("Backfilling %d messages in %s, %d messages at a time (queue ID: %d)", len(allMsgs), portal.Key.JID, req.MaxBatchEvents, req.QueueID) toBackfill := allMsgs[0:] - var insertionEventIds []id.EventID for len(toBackfill) > 0 { var msgs []*waProto.WebMessageInfo if len(toBackfill) <= req.MaxBatchEvents || req.MaxBatchEvents < 0 { @@ -361,19 +356,10 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor if len(msgs) > 0 { time.Sleep(time.Duration(req.BatchDelay) * time.Second) user.log.Debugfln("Backfilling %d messages in %s (queue ID: %d)", len(msgs), portal.Key.JID, req.QueueID) - resp := portal.backfill(user, msgs, req.BackfillType == database.BackfillForward, isLatestEvents, shouldAtomicallyMarkAsRead, forwardPrevID) - if resp != nil && (resp.BaseInsertionEventID != "" || !isLatestEvents) { - insertionEventIds = append(insertionEventIds, resp.BaseInsertionEventID) - } + portal.backfill(user, msgs, forward, shouldMarkAsRead) } } user.log.Debugfln("Finished backfilling %d messages in %s (queue ID: %d)", len(allMsgs), portal.Key.JID, req.QueueID) - if len(insertionEventIds) > 0 { - portal.sendPostBackfillDummy( - time.Unix(int64(allMsgs[0].GetMessageTimestamp()), 0), - insertionEventIds[0]) - } - user.log.Debugfln("Deleting %d history sync messages after backfilling (queue ID: %d)", len(allMsgs), req.QueueID) err := user.bridge.DB.HistorySync.DeleteMessages(user.MXID, conv.ConversationID, allMsgs) if err != nil { user.log.Warnfln("Failed to delete %d history sync messages after backfilling (queue ID: %d): %v", len(allMsgs), req.QueueID, err) @@ -399,30 +385,6 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor backfillState.Upsert() portal.updateBackfillStatus(backfillState) } - - if isLatestEvents && !shouldAtomicallyMarkAsRead { - if shouldMarkAsRead { - user.markSelfReadFull(portal) - } else if conv.MarkedAsUnread && user.bridge.Config.Bridge.SyncManualMarkedUnread { - user.markUnread(portal, true) - } - } -} - -func (user *User) shouldCreatePortalForHistorySync(conv *database.HistorySyncConversation, portal *Portal) bool { - if len(portal.MXID) > 0 { - if !user.bridge.AS.StateStore.IsInRoom(portal.MXID, user.MXID) { - portal.ensureUserInvited(user) - } - // Portal exists, let backfill continue - return true - } else if !user.bridge.Config.Bridge.HistorySync.CreatePortals { - user.log.Debugfln("Not creating portal for %s: creating rooms from history sync is disabled", portal.Key.JID) - return false - } else { - // Portal doesn't exist, but should be created - return true - } } func (user *User) storeHistorySync(evt *waProto.HistorySync) { @@ -627,69 +589,20 @@ func (portal *Portal) deterministicEventID(sender types.JID, messageID types.Mes var ( PortalCreationDummyEvent = event.Type{Type: "fi.mau.dummy.portal_created", Class: event.MessageEventType} - PreBackfillDummyEvent = event.Type{Type: "fi.mau.dummy.pre_backfill", Class: event.MessageEventType} - - HistorySyncMarker = event.Type{Type: "org.matrix.msc2716.marker", Class: event.MessageEventType} BackfillStatusEvent = event.Type{Type: "com.beeper.backfill_status", Class: event.StateEventType} ) -func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo, isForward, isLatest, atomicMarkAsRead bool, prevEventID id.EventID) *mautrix.RespBatchSend { - var req mautrix.ReqBatchSend +func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo, isForward, atomicMarkAsRead bool) *mautrix.RespBeeperBatchSend { + var req mautrix.ReqBeeperBatchSend var infos []*wrappedInfo - if !isForward { - if portal.FirstEventID != "" || portal.NextBatchID != "" { - req.PrevEventID = portal.FirstEventID - req.BatchID = portal.NextBatchID - } else { - portal.log.Warnfln("Can't backfill %d messages through %s to chat: first event ID not known", len(messages), source.MXID) - return nil - } - } else { - req.PrevEventID = prevEventID - } - req.BeeperNewMessages = isLatest && req.BatchID == "" + req.Forward = isForward if atomicMarkAsRead { - req.BeeperMarkReadBy = source.MXID + req.MarkReadBy = source.MXID } - beforeFirstMessageTimestampMillis := (int64(messages[len(messages)-1].GetMessageTimestamp()) * 1000) - 1 - req.StateEventsAtStart = make([]*event.Event, 0) - - addedMembers := make(map[id.UserID]struct{}) - addMember := func(puppet *Puppet) { - if portal.bridge.Config.Homeserver.Software == bridgeconfig.SoftwareHungry { - // Hungryserv doesn't need state_events_at_start, it can figure out memberships automatically - return - } else if _, alreadyAdded := addedMembers[puppet.MXID]; alreadyAdded { - return - } - mxid := puppet.MXID.String() - content := event.MemberEventContent{ - Membership: event.MembershipJoin, - Displayname: puppet.Displayname, - AvatarURL: puppet.AvatarURL.CUString(), - } - inviteContent := content - inviteContent.Membership = event.MembershipInvite - req.StateEventsAtStart = append(req.StateEventsAtStart, &event.Event{ - Type: event.StateMember, - Sender: portal.MainIntent().UserID, - StateKey: &mxid, - Timestamp: beforeFirstMessageTimestampMillis, - Content: event.Content{Parsed: &inviteContent}, - }, &event.Event{ - Type: event.StateMember, - Sender: puppet.MXID, - StateKey: &mxid, - Timestamp: beforeFirstMessageTimestampMillis, - Content: event.Content{Parsed: &content}, - }) - addedMembers[puppet.MXID] = struct{}{} - } - - portal.log.Infofln("Processing history sync with %d messages (forward: %t, latest: %t, prev: %s, batch: %s)", len(messages), isForward, isLatest, req.PrevEventID, req.BatchID) + portal.log.Infofln("Processing history sync with %d messages (forward: %t)", len(messages), isForward) // The messages are ordered newest to oldest, so iterate them in reverse order. for i := len(messages) - 1; i >= 0; i-- { webMsg := messages[i] @@ -720,19 +633,12 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo, if puppet == nil { continue } - intent := puppet.IntentFor(portal) - if intent.IsCustomPuppet && !portal.bridge.Config.CanDoublePuppetBackfill(puppet.CustomMXID) { - intent = puppet.DefaultIntent() - } - converted := portal.convertMessage(intent, source, &msgEvt.Info, msgEvt.Message, true) + converted := portal.convertMessage(puppet.IntentFor(portal), source, &msgEvt.Info, msgEvt.Message, true) if converted == nil { portal.log.Debugfln("Skipping unsupported message %s in backfill", msgEvt.Info.ID) continue } - if !intent.IsCustomPuppet && !portal.bridge.StateStore.IsInRoom(portal.MXID, puppet.MXID) { - addMember(puppet) - } if converted.ReplyTo != nil { portal.SetReply(converted.Content, converted.ReplyTo, true) } @@ -747,15 +653,7 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo, return nil } - if len(req.BatchID) == 0 || isForward { - portal.log.Debugln("Sending a dummy event to avoid forward extremity errors with backfill") - _, err := portal.MainIntent().SendMessageEvent(portal.MXID, PreBackfillDummyEvent, struct{}{}) - if err != nil { - portal.log.Warnln("Error sending pre-backfill dummy event:", err) - } - } - - resp, err := portal.MainIntent().BatchSend(portal.MXID, &req) + resp, err := portal.MainIntent().BeeperBatchSend(portal.MXID, &req) if err != nil { portal.log.Errorln("Error batch sending messages:", err) return nil @@ -766,12 +664,7 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo, return nil } - // Do the following block in the transaction - { - portal.finishBatch(txn, resp.EventIDs, infos) - portal.NextBatchID = resp.NextBatchID - portal.Update(txn) - } + portal.finishBatch(txn, resp.EventIDs, infos) err = txn.Commit() if err != nil { @@ -846,19 +739,16 @@ func (portal *Portal) appendBatchEvents(source *User, converted *ConvertedMessag *infoArray = append(*infoArray, nil) } } - // Sending reactions in the same batch requires deterministic event IDs, so only do it on hungryserv - if portal.bridge.Config.Homeserver.Software == bridgeconfig.SoftwareHungry { - for _, reaction := range raw.GetReactions() { - reactionEvent, reactionInfo := portal.wrapBatchReaction(source, reaction, mainEvt.ID, info.Timestamp) - if reactionEvent != nil { - *eventsArray = append(*eventsArray, reactionEvent) - *infoArray = append(*infoArray, &wrappedInfo{ - MessageInfo: reactionInfo, - SenderMXID: reactionEvent.Sender, - ReactionTarget: info.ID, - Type: database.MsgReaction, - }) - } + for _, reaction := range raw.GetReactions() { + reactionEvent, reactionInfo := portal.wrapBatchReaction(source, reaction, mainEvt.ID, info.Timestamp) + if reactionEvent != nil { + *eventsArray = append(*eventsArray, reactionEvent) + *infoArray = append(*infoArray, &wrappedInfo{ + MessageInfo: reactionInfo, + SenderMXID: reactionEvent.Sender, + ReactionTarget: info.ID, + Type: database.MsgReaction, + }) } } return nil @@ -923,13 +813,8 @@ func (portal *Portal) wrapBatchEvent(info *types.MessageInfo, intent *appservice return nil, err } intent.AddDoublePuppetValue(&wrappedContent) - var eventID id.EventID - if portal.bridge.Config.Homeserver.Software == bridgeconfig.SoftwareHungry { - eventID = portal.deterministicEventID(info.Sender, info.ID, partName) - } - return &event.Event{ - ID: eventID, + ID: portal.deterministicEventID(info.Sender, info.ID, partName), Sender: intent.UserID, Type: newEventType, Timestamp: info.Timestamp.UnixMilli(), @@ -956,33 +841,13 @@ func (portal *Portal) finishBatch(txn dbutil.Transaction, eventIDs []id.EventID, portal.log.Infofln("Successfully sent %d events", len(eventIDs)) } -func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time, insertionEventId id.EventID) { - resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, HistorySyncMarker, map[string]interface{}{ - "org.matrix.msc2716.marker.insertion": insertionEventId, - //"m.marker.insertion": insertionEventId, - }) - if err != nil { - portal.log.Errorln("Error sending post-backfill dummy event:", err) - return - } - msg := portal.bridge.DB.Message.New() - msg.Chat = portal.Key - msg.MXID = resp.EventID - msg.SenderMXID = portal.MainIntent().UserID - msg.JID = types.MessageID(resp.EventID) - msg.Timestamp = lastTimestamp.Add(1 * time.Second) - msg.Sent = true - msg.Type = database.MsgFake - msg.Insert(nil) -} - func (portal *Portal) updateBackfillStatus(backfillState *database.BackfillState) { backfillStatus := "backfilling" if backfillState.BackfillComplete { backfillStatus = "complete" } - _, err := portal.MainIntent().SendStateEvent(portal.MXID, BackfillStatusEvent, "", map[string]interface{}{ + _, err := portal.bridge.Bot.SendStateEvent(portal.MXID, BackfillStatusEvent, "", map[string]interface{}{ "status": backfillStatus, "first_timestamp": backfillState.FirstExpectedTimestamp * 1000, })