Add option to sync group members in parallel

This commit is contained in:
Tulir Asokan 2022-09-29 14:39:58 +03:00
parent ea8c58b4f5
commit 862cd8a0d7
4 changed files with 33 additions and 9 deletions

View file

@ -94,6 +94,7 @@ type BridgeConfig struct {
LoginSharedSecretMap map[string]string `yaml:"login_shared_secret_map"` LoginSharedSecretMap map[string]string `yaml:"login_shared_secret_map"`
PrivateChatPortalMeta bool `yaml:"private_chat_portal_meta"` PrivateChatPortalMeta bool `yaml:"private_chat_portal_meta"`
ParallelMemberSync bool `yaml:"parallel_member_sync"`
BridgeNotices bool `yaml:"bridge_notices"` BridgeNotices bool `yaml:"bridge_notices"`
ResendBridgeInfo bool `yaml:"resend_bridge_info"` ResendBridgeInfo bool `yaml:"resend_bridge_info"`
MuteBridging bool `yaml:"mute_bridging"` MuteBridging bool `yaml:"mute_bridging"`

View file

@ -73,6 +73,7 @@ func DoUpgrade(helper *up.Helper) {
helper.Copy(up.Map, "bridge", "login_shared_secret_map") helper.Copy(up.Map, "bridge", "login_shared_secret_map")
} }
helper.Copy(up.Bool, "bridge", "private_chat_portal_meta") helper.Copy(up.Bool, "bridge", "private_chat_portal_meta")
helper.Copy(up.Bool, "bridge", "parallel_member_sync")
helper.Copy(up.Bool, "bridge", "bridge_notices") helper.Copy(up.Bool, "bridge", "bridge_notices")
helper.Copy(up.Bool, "bridge", "resend_bridge_info") helper.Copy(up.Bool, "bridge", "resend_bridge_info")
helper.Copy(up.Bool, "bridge", "mute_bridging") helper.Copy(up.Bool, "bridge", "mute_bridging")

View file

@ -232,6 +232,8 @@ bridge:
example.com: foobar example.com: foobar
# Should the bridge explicitly set the avatar and room name for private chat portal rooms? # Should the bridge explicitly set the avatar and room name for private chat portal rooms?
private_chat_portal_meta: false private_chat_portal_meta: false
# Should group members be synced in parallel? This makes member sync faster
parallel_member_sync: false
# Should Matrix m.notice-type messages be bridged? # Should Matrix m.notice-type messages be bridged?
bridge_notices: true bridge_notices: true
# Set this to true to tell the bridge to re-send m.bridge events to all rooms on the next run. # Set this to true to tell the bridge to re-send m.bridge events to all rooms on the next run.

View file

@ -32,6 +32,7 @@ import (
"math" "math"
"mime" "mime"
"net/http" "net/http"
"runtime/debug"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -898,6 +899,25 @@ func (portal *Portal) kickExtraUsers(participantMap map[types.JID]bool) {
// portal.kickExtraUsers(participantMap) // portal.kickExtraUsers(participantMap)
//} //}
func (portal *Portal) syncParticipant(source *User, participant types.GroupParticipant, puppet *Puppet, user *User, wg *sync.WaitGroup) {
defer func() {
wg.Done()
if err := recover(); err != nil {
portal.log.Errorfln("Syncing participant %s panicked: %v\n%s", participant.JID, err, debug.Stack())
}
}()
puppet.SyncContact(source, true, false, "group participant")
if user != nil && user != source {
portal.ensureUserInvited(user)
}
if user == nil || !puppet.IntentFor(portal).IsCustomPuppet {
err := puppet.IntentFor(portal).EnsureJoined(portal.MXID)
if err != nil {
portal.log.Warnfln("Failed to make puppet of %s join %s: %v", participant.JID, portal.MXID, err)
}
}
}
func (portal *Portal) SyncParticipants(source *User, metadata *types.GroupInfo) { func (portal *Portal) SyncParticipants(source *User, metadata *types.GroupInfo) {
changed := false changed := false
levels, err := portal.MainIntent().PowerLevels(portal.MXID) levels, err := portal.MainIntent().PowerLevels(portal.MXID)
@ -906,20 +926,18 @@ func (portal *Portal) SyncParticipants(source *User, metadata *types.GroupInfo)
changed = true changed = true
} }
changed = portal.applyPowerLevelFixes(levels) || changed changed = portal.applyPowerLevelFixes(levels) || changed
var wg sync.WaitGroup
wg.Add(len(metadata.Participants))
participantMap := make(map[types.JID]bool) participantMap := make(map[types.JID]bool)
for _, participant := range metadata.Participants { for _, participant := range metadata.Participants {
portal.log.Debugfln("Syncing participant %s (admin: %t)", participant.JID, participant.IsAdmin)
participantMap[participant.JID] = true participantMap[participant.JID] = true
puppet := portal.bridge.GetPuppetByJID(participant.JID) puppet := portal.bridge.GetPuppetByJID(participant.JID)
puppet.SyncContact(source, true, false, "group participant")
user := portal.bridge.GetUserByJID(participant.JID) user := portal.bridge.GetUserByJID(participant.JID)
if user != nil && user != source { if portal.bridge.Config.Bridge.ParallelMemberSync {
portal.ensureUserInvited(user) go portal.syncParticipant(source, participant, puppet, user, &wg)
} } else {
if user == nil || !puppet.IntentFor(portal).IsCustomPuppet { portal.syncParticipant(source, participant, puppet, user, &wg)
err = puppet.IntentFor(portal).EnsureJoined(portal.MXID)
if err != nil {
portal.log.Warnfln("Failed to make puppet of %s join %s: %v", participant.JID, portal.MXID, err)
}
} }
expectedLevel := 0 expectedLevel := 0
@ -940,6 +958,8 @@ func (portal *Portal) SyncParticipants(source *User, metadata *types.GroupInfo)
} }
} }
portal.kickExtraUsers(participantMap) portal.kickExtraUsers(participantMap)
wg.Wait()
portal.log.Debugln("Participant sync completed")
} }
func reuploadAvatar(intent *appservice.IntentAPI, url string) (id.ContentURI, error) { func reuploadAvatar(intent *appservice.IntentAPI, url string) (id.ContentURI, error) {