Compare commits

...

12 Commits

9 changed files with 82 additions and 48 deletions

View File

@ -12,6 +12,7 @@ repos:
rev: v1.0.0-rc.1
hooks:
- id: go-imports-repo
args: ["-w"]
- id: go-vet-repo-mod
- repo: https://github.com/beeper/pre-commit-go

View File

@ -78,6 +78,7 @@ type BridgeConfig struct {
AutoRequestMedia bool `yaml:"auto_request_media"`
RequestMethod MediaRequestMethod `yaml:"request_method"`
RequestLocalTime int `yaml:"request_local_time"`
MaxAsyncHandle int64 `yaml:"max_async_handle"`
} `yaml:"media_requests"`
Deferred []DeferredConfig `yaml:"deferred"`
@ -94,27 +95,28 @@ type BridgeConfig struct {
DoublePuppetConfig bridgeconfig.DoublePuppetConfig `yaml:",inline"`
PrivateChatPortalMeta string `yaml:"private_chat_portal_meta"`
ParallelMemberSync bool `yaml:"parallel_member_sync"`
BridgeNotices bool `yaml:"bridge_notices"`
ResendBridgeInfo bool `yaml:"resend_bridge_info"`
MuteBridging bool `yaml:"mute_bridging"`
ArchiveTag string `yaml:"archive_tag"`
PinnedTag string `yaml:"pinned_tag"`
TagOnlyOnCreate bool `yaml:"tag_only_on_create"`
MarkReadOnlyOnCreate bool `yaml:"mark_read_only_on_create"`
EnableStatusBroadcast bool `yaml:"enable_status_broadcast"`
MuteStatusBroadcast bool `yaml:"mute_status_broadcast"`
StatusBroadcastTag string `yaml:"status_broadcast_tag"`
WhatsappThumbnail bool `yaml:"whatsapp_thumbnail"`
AllowUserInvite bool `yaml:"allow_user_invite"`
FederateRooms bool `yaml:"federate_rooms"`
URLPreviews bool `yaml:"url_previews"`
CaptionInMessage bool `yaml:"caption_in_message"`
BeeperGalleries bool `yaml:"beeper_galleries"`
ExtEvPolls bool `yaml:"extev_polls"`
CrossRoomReplies bool `yaml:"cross_room_replies"`
DisableReplyFallbacks bool `yaml:"disable_reply_fallbacks"`
PrivateChatPortalMeta string `yaml:"private_chat_portal_meta"`
ParallelMemberSync bool `yaml:"parallel_member_sync"`
BridgeNotices bool `yaml:"bridge_notices"`
ResendBridgeInfo bool `yaml:"resend_bridge_info"`
MuteBridging bool `yaml:"mute_bridging"`
ArchiveTag string `yaml:"archive_tag"`
PinnedTag string `yaml:"pinned_tag"`
TagOnlyOnCreate bool `yaml:"tag_only_on_create"`
MarkReadOnlyOnCreate bool `yaml:"mark_read_only_on_create"`
EnableStatusBroadcast bool `yaml:"enable_status_broadcast"`
MuteStatusBroadcast bool `yaml:"mute_status_broadcast"`
StatusBroadcastTag string `yaml:"status_broadcast_tag"`
WhatsappThumbnail bool `yaml:"whatsapp_thumbnail"`
AllowUserInvite bool `yaml:"allow_user_invite"`
FederateRooms bool `yaml:"federate_rooms"`
URLPreviews bool `yaml:"url_previews"`
CaptionInMessage bool `yaml:"caption_in_message"`
BeeperGalleries bool `yaml:"beeper_galleries"`
ExtEvPolls bool `yaml:"extev_polls"`
CrossRoomReplies bool `yaml:"cross_room_replies"`
DisableReplyFallbacks bool `yaml:"disable_reply_fallbacks"`
PrivateChatSelfPuppets bool `yaml:"private_chat_self_puppets"`
MessageHandlingTimeout struct {
ErrorAfterStr string `yaml:"error_after"`

View File

@ -54,6 +54,7 @@ func DoUpgrade(helper *up.Helper) {
helper.Copy(up.Bool, "bridge", "history_sync", "media_requests", "auto_request_media")
helper.Copy(up.Str, "bridge", "history_sync", "media_requests", "request_method")
helper.Copy(up.Int, "bridge", "history_sync", "media_requests", "request_local_time")
helper.Copy(up.Int, "bridge", "history_sync", "media_requests", "max_async_handle")
helper.Copy(up.Int, "bridge", "history_sync", "max_initial_conversations")
helper.Copy(up.Int, "bridge", "history_sync", "message_count")
helper.Copy(up.Int, "bridge", "history_sync", "unread_hours_threshold")
@ -115,6 +116,7 @@ func DoUpgrade(helper *up.Helper) {
}
helper.Copy(up.Bool, "bridge", "cross_room_replies")
helper.Copy(up.Bool, "bridge", "disable_reply_fallbacks")
helper.Copy(up.Bool, "bridge", "private_chat_self_puppets")
helper.Copy(up.Str|up.Null, "bridge", "message_handling_timeout", "error_after")
helper.Copy(up.Str|up.Null, "bridge", "message_handling_timeout", "deadline")

View File

@ -173,6 +173,8 @@ bridge:
request_method: immediate
# If request_method is "local_time", what time should the requests be sent (in minutes after midnight)?
request_local_time: 120
# Maximum number of media request responses to handle in parallel per user.
max_async_handle: 2
# Settings for immediate backfills. These backfills should generally be small and their main purpose is
# to populate each of the initial chats (as configured by max_initial_conversations) with a few messages
# so that you can continue conversations without losing context.
@ -313,6 +315,10 @@ bridge:
# Disable generating reply fallbacks? Some extremely bad clients still rely on them,
# but they're being phased out and will be completely removed in the future.
disable_reply_fallbacks: false
# Invite the puppet which represents the bridge user into private chats?
# This allows proper backfilling in private chats without double puppeting enabled,
# but adds an additional puppet user to each private chat.
private_chat_self_puppets: false
# Maximum time for handling Matrix events. Duration strings formatted for https://pkg.go.dev/time#ParseDuration
# Null means there's no enforced timeout.
message_handling_timeout:

1
go.mod
View File

@ -18,6 +18,7 @@ require (
golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f
golang.org/x/image v0.15.0
golang.org/x/net v0.22.0
golang.org/x/sync v0.3.0
google.golang.org/protobuf v1.33.0
maunium.net/go/mautrix v0.18.0
)

2
go.sum
View File

@ -85,6 +85,8 @@ golang.org/x/image v0.15.0 h1:kOELfmgrmJlw4Cdb7g/QGuB3CvDrXbqEIww/pNtNBm8=
golang.org/x/image v0.15.0/go.mod h1:HUYqC05R2ZcZ3ejNQsIHQDQiwWM4JBqmm6MKANTp4LE=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

View File

@ -250,26 +250,33 @@ func (portal *Portal) legacyBackfill(ctx context.Context, user *User) {
}
func (user *User) dailyMediaRequestLoop() {
// Calculate when to do the first set of media retry requests
now := time.Now()
userTz, err := time.LoadLocation(user.Timezone)
if err != nil {
userTz = now.Local().Location()
}
tonightMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, userTz)
midnightOffset := time.Duration(user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestLocalTime) * time.Minute
requestStartTime := tonightMidnight.Add(midnightOffset)
// If the request time for today has already happened, we need to start the
// request loop tomorrow instead.
if requestStartTime.Before(now) {
requestStartTime = requestStartTime.AddDate(0, 0, 1)
}
log := user.zlog.With().
Str("action", "daily media request loop").
Logger()
ctx := log.WithContext(context.Background())
// Calculate when to do the first set of media retry requests
now := time.Now()
userTz, err := time.LoadLocation(user.Timezone)
tzIsInvalid := err != nil && user.Timezone != ""
var requestStartTime time.Time
if tzIsInvalid {
requestStartTime = now.Add(8 * time.Hour)
log.Warn().Msg("Invalid time zone, using static 8 hour start time")
} else {
if userTz == nil {
userTz = now.Local().Location()
}
tonightMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, userTz)
midnightOffset := time.Duration(user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestLocalTime) * time.Minute
requestStartTime = tonightMidnight.Add(midnightOffset)
// If the request time for today has already happened, we need to start the
// request loop tomorrow instead.
if requestStartTime.Before(now) {
requestStartTime = requestStartTime.AddDate(0, 0, 1)
}
}
// Wait to start the loop
log.Info().Time("start_loop_at", requestStartTime).Msg("Waiting until start time to do media retry requests")
time.Sleep(time.Until(requestStartTime))

View File

@ -269,7 +269,6 @@ type fakeMessage struct {
type PortalEvent struct {
Message *PortalMessage
MatrixMessage *PortalMatrixMessage
MediaRetry *PortalMediaRetry
}
type PortalMessage struct {
@ -286,11 +285,6 @@ type PortalMatrixMessage struct {
receivedAt time.Time
}
type PortalMediaRetry struct {
evt *events.MediaRetry
source *User
}
type recentlyHandledWrapper struct {
id types.MessageID
err database.MessageErrorType
@ -572,8 +566,6 @@ func (portal *Portal) handleOneMessageLoopItem() {
portal.handleWhatsAppMessageLoopItem(msg.Message)
} else if msg.MatrixMessage != nil {
portal.handleMatrixMessageLoopItem(msg.MatrixMessage)
} else if msg.MediaRetry != nil {
portal.handleMediaRetry(msg.MediaRetry.evt, msg.MediaRetry.source)
} else {
portal.zlog.Warn().Msg("Unexpected PortalEvent with no data")
}
@ -1224,7 +1216,7 @@ func (portal *Portal) getMessageIntent(ctx context.Context, user *User, info *ty
return nil
}
intent := puppet.IntentFor(portal)
if !intent.IsCustomPuppet && portal.IsPrivateChat() && info.Sender.User == portal.Key.Receiver.User && portal.Key.Receiver != portal.Key.JID {
if !portal.bridge.Config.Bridge.PrivateChatSelfPuppets && !intent.IsCustomPuppet && portal.IsPrivateChat() && info.Sender.User == portal.Key.Receiver.User && portal.Key.Receiver != portal.Key.JID {
zerolog.Ctx(ctx).Debug().Msg("Not handling message: user doesn't have double puppeting enabled")
return nil
}
@ -2188,6 +2180,10 @@ func (portal *Portal) CreateMatrixRoom(ctx context.Context, user *User, groupInf
invite = append(invite, portal.bridge.Bot.UserID)
}
}
if portal.IsPrivateChat() && portal.bridge.Config.Bridge.PrivateChatSelfPuppets {
rec := portal.bridge.GetPuppetByJID(portal.Key.Receiver)
invite = append(invite, rec.MXID)
}
if !portal.AvatarURL.IsEmpty() && portal.shouldSetDMRoomMetadata() {
initialState = append(initialState, &event.Event{
Type: event.StateRoomAvatar,
@ -2311,6 +2307,13 @@ func (portal *Portal) CreateMatrixRoom(ctx context.Context, user *User, groupInf
log.Err(err).Msg("Failed to ensure bridge bot is joined to created portal")
}
}
if portal.bridge.Config.Bridge.PrivateChatSelfPuppets {
rec := portal.bridge.GetPuppetByJID(portal.Key.Receiver)
err = rec.DefaultIntent().EnsureJoined(ctx, portal.MXID)
if err != nil {
log.Err(err).Msg("Failed to join created portal with puppet")
}
}
user.UpdateDirectChats(ctx, map[id.UserID][]id.RoomID{puppet.MXID: {portal.MXID}})
} else if portal.IsParent {
@ -3801,6 +3804,13 @@ func (portal *Portal) handleMediaRetry(retry *events.MediaRetry, source *User) {
Str("retry_message_id", retry.MessageID).
Logger()
ctx := log.WithContext(context.TODO())
err := source.mediaRetryLock.Acquire(ctx, 1)
if err != nil {
log.Err(err).Msg("Failed to acquire media retry semaphore")
return
}
defer source.mediaRetryLock.Release(1)
msg, err := portal.bridge.DB.Message.GetByJID(ctx, portal.Key, retry.MessageID)
if msg == nil {
log.Warn().Msg("Dropping media retry notification for unknown message")

View File

@ -41,6 +41,7 @@ import (
"go.mau.fi/whatsmeow/types"
"go.mau.fi/whatsmeow/types/events"
waLog "go.mau.fi/whatsmeow/util/log"
"golang.org/x/sync/semaphore"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/appservice"
"maunium.net/go/mautrix/bridge"
@ -74,6 +75,8 @@ type User struct {
historySyncs chan *events.HistorySync
lastPresence types.Presence
mediaRetryLock *semaphore.Weighted
historySyncLoopsStarted bool
enqueueBackfillsTimer *time.Timer
spaceMembershipChecked bool
@ -257,6 +260,8 @@ func (br *WABridge) NewUser(dbUser *database.User) *User {
lastPresence: types.PresenceUnavailable,
resyncQueue: make(map[types.JID]resyncQueueItem),
mediaRetryLock: semaphore.NewWeighted(br.Config.Bridge.HistorySync.MediaRequests.MaxAsyncHandle),
}
user.PermissionLevel = user.bridge.Config.Bridge.Permissions.Get(user.MXID)
@ -955,9 +960,7 @@ func (user *User) HandleEvent(event interface{}) {
case *events.MediaRetry:
user.phoneSeen(v.Timestamp)
portal := user.GetPortalByJID(v.ChatID)
portal.events <- &PortalEvent{
MediaRetry: &PortalMediaRetry{evt: v, source: user},
}
go portal.handleMediaRetry(v, user)
case *events.CallOffer:
user.handleCallStart(v.CallCreator, v.CallID, "", v.Timestamp)
case *events.CallOfferNotice: