Handle media retries asynchronously

This commit is contained in:
Tulir Asokan 2024-04-11 17:27:49 +03:00
parent db6e6d7dd3
commit 1e84a169f9
7 changed files with 20 additions and 11 deletions

View File

@ -78,6 +78,7 @@ type BridgeConfig struct {
AutoRequestMedia bool `yaml:"auto_request_media"`
RequestMethod MediaRequestMethod `yaml:"request_method"`
RequestLocalTime int `yaml:"request_local_time"`
MaxAsyncHandle int64 `yaml:"max_async_handle"`
} `yaml:"media_requests"`
Deferred []DeferredConfig `yaml:"deferred"`

View File

@ -54,6 +54,7 @@ func DoUpgrade(helper *up.Helper) {
helper.Copy(up.Bool, "bridge", "history_sync", "media_requests", "auto_request_media")
helper.Copy(up.Str, "bridge", "history_sync", "media_requests", "request_method")
helper.Copy(up.Int, "bridge", "history_sync", "media_requests", "request_local_time")
helper.Copy(up.Int, "bridge", "history_sync", "media_requests", "max_async_handle")
helper.Copy(up.Int, "bridge", "history_sync", "max_initial_conversations")
helper.Copy(up.Int, "bridge", "history_sync", "message_count")
helper.Copy(up.Int, "bridge", "history_sync", "unread_hours_threshold")

View File

@ -173,6 +173,8 @@ bridge:
request_method: immediate
# If request_method is "local_time", what time should the requests be sent (in minutes after midnight)?
request_local_time: 120
# Maximum number of media request responses to handle in parallel per user.
max_async_handle: 2
# Settings for immediate backfills. These backfills should generally be small and their main purpose is
# to populate each of the initial chats (as configured by max_initial_conversations) with a few messages
# so that you can continue conversations without losing context.

1
go.mod
View File

@ -18,6 +18,7 @@ require (
golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f
golang.org/x/image v0.15.0
golang.org/x/net v0.22.0
golang.org/x/sync v0.3.0
google.golang.org/protobuf v1.33.0
maunium.net/go/mautrix v0.18.0
)

2
go.sum
View File

@ -85,6 +85,8 @@ golang.org/x/image v0.15.0 h1:kOELfmgrmJlw4Cdb7g/QGuB3CvDrXbqEIww/pNtNBm8=
golang.org/x/image v0.15.0/go.mod h1:HUYqC05R2ZcZ3ejNQsIHQDQiwWM4JBqmm6MKANTp4LE=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

View File

@ -269,7 +269,6 @@ type fakeMessage struct {
type PortalEvent struct {
Message *PortalMessage
MatrixMessage *PortalMatrixMessage
MediaRetry *PortalMediaRetry
}
type PortalMessage struct {
@ -286,11 +285,6 @@ type PortalMatrixMessage struct {
receivedAt time.Time
}
type PortalMediaRetry struct {
evt *events.MediaRetry
source *User
}
type recentlyHandledWrapper struct {
id types.MessageID
err database.MessageErrorType
@ -572,8 +566,6 @@ func (portal *Portal) handleOneMessageLoopItem() {
portal.handleWhatsAppMessageLoopItem(msg.Message)
} else if msg.MatrixMessage != nil {
portal.handleMatrixMessageLoopItem(msg.MatrixMessage)
} else if msg.MediaRetry != nil {
portal.handleMediaRetry(msg.MediaRetry.evt, msg.MediaRetry.source)
} else {
portal.zlog.Warn().Msg("Unexpected PortalEvent with no data")
}
@ -3801,6 +3793,13 @@ func (portal *Portal) handleMediaRetry(retry *events.MediaRetry, source *User) {
Str("retry_message_id", retry.MessageID).
Logger()
ctx := log.WithContext(context.TODO())
err := source.mediaRetryLock.Acquire(ctx, 1)
if err != nil {
log.Err(err).Msg("Failed to acquire media retry semaphore")
return
}
defer source.mediaRetryLock.Release(1)
msg, err := portal.bridge.DB.Message.GetByJID(ctx, portal.Key, retry.MessageID)
if msg == nil {
log.Warn().Msg("Dropping media retry notification for unknown message")

View File

@ -41,6 +41,7 @@ import (
"go.mau.fi/whatsmeow/types"
"go.mau.fi/whatsmeow/types/events"
waLog "go.mau.fi/whatsmeow/util/log"
"golang.org/x/sync/semaphore"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/appservice"
"maunium.net/go/mautrix/bridge"
@ -74,6 +75,8 @@ type User struct {
historySyncs chan *events.HistorySync
lastPresence types.Presence
mediaRetryLock *semaphore.Weighted
historySyncLoopsStarted bool
enqueueBackfillsTimer *time.Timer
spaceMembershipChecked bool
@ -257,6 +260,8 @@ func (br *WABridge) NewUser(dbUser *database.User) *User {
lastPresence: types.PresenceUnavailable,
resyncQueue: make(map[types.JID]resyncQueueItem),
mediaRetryLock: semaphore.NewWeighted(br.Config.Bridge.HistorySync.MediaRequests.MaxAsyncHandle),
}
user.PermissionLevel = user.bridge.Config.Bridge.Permissions.Get(user.MXID)
@ -955,9 +960,7 @@ func (user *User) HandleEvent(event interface{}) {
case *events.MediaRetry:
user.phoneSeen(v.Timestamp)
portal := user.GetPortalByJID(v.ChatID)
portal.events <- &PortalEvent{
MediaRetry: &PortalMediaRetry{evt: v, source: user},
}
go portal.handleMediaRetry(v, user)
case *events.CallOffer:
user.handleCallStart(v.CallCreator, v.CallID, "", v.Timestamp)
case *events.CallOfferNotice: