mirror of
https://github.com/tulir/mautrix-whatsapp
synced 2024-05-19 20:33:45 +02:00
Compare commits
3 commits
2224f17dfd
...
2fed7f2d2e
Author | SHA1 | Date | |
---|---|---|---|
2fed7f2d2e | |||
1e84a169f9 | |||
db6e6d7dd3 |
|
@ -12,6 +12,7 @@ repos:
|
||||||
rev: v1.0.0-rc.1
|
rev: v1.0.0-rc.1
|
||||||
hooks:
|
hooks:
|
||||||
- id: go-imports-repo
|
- id: go-imports-repo
|
||||||
|
args: ["-w"]
|
||||||
- id: go-vet-repo-mod
|
- id: go-vet-repo-mod
|
||||||
|
|
||||||
- repo: https://github.com/beeper/pre-commit-go
|
- repo: https://github.com/beeper/pre-commit-go
|
||||||
|
|
|
@ -78,6 +78,7 @@ type BridgeConfig struct {
|
||||||
AutoRequestMedia bool `yaml:"auto_request_media"`
|
AutoRequestMedia bool `yaml:"auto_request_media"`
|
||||||
RequestMethod MediaRequestMethod `yaml:"request_method"`
|
RequestMethod MediaRequestMethod `yaml:"request_method"`
|
||||||
RequestLocalTime int `yaml:"request_local_time"`
|
RequestLocalTime int `yaml:"request_local_time"`
|
||||||
|
MaxAsyncHandle int64 `yaml:"max_async_handle"`
|
||||||
} `yaml:"media_requests"`
|
} `yaml:"media_requests"`
|
||||||
|
|
||||||
Deferred []DeferredConfig `yaml:"deferred"`
|
Deferred []DeferredConfig `yaml:"deferred"`
|
||||||
|
|
|
@ -54,6 +54,7 @@ func DoUpgrade(helper *up.Helper) {
|
||||||
helper.Copy(up.Bool, "bridge", "history_sync", "media_requests", "auto_request_media")
|
helper.Copy(up.Bool, "bridge", "history_sync", "media_requests", "auto_request_media")
|
||||||
helper.Copy(up.Str, "bridge", "history_sync", "media_requests", "request_method")
|
helper.Copy(up.Str, "bridge", "history_sync", "media_requests", "request_method")
|
||||||
helper.Copy(up.Int, "bridge", "history_sync", "media_requests", "request_local_time")
|
helper.Copy(up.Int, "bridge", "history_sync", "media_requests", "request_local_time")
|
||||||
|
helper.Copy(up.Int, "bridge", "history_sync", "media_requests", "max_async_handle")
|
||||||
helper.Copy(up.Int, "bridge", "history_sync", "max_initial_conversations")
|
helper.Copy(up.Int, "bridge", "history_sync", "max_initial_conversations")
|
||||||
helper.Copy(up.Int, "bridge", "history_sync", "message_count")
|
helper.Copy(up.Int, "bridge", "history_sync", "message_count")
|
||||||
helper.Copy(up.Int, "bridge", "history_sync", "unread_hours_threshold")
|
helper.Copy(up.Int, "bridge", "history_sync", "unread_hours_threshold")
|
||||||
|
|
|
@ -173,6 +173,8 @@ bridge:
|
||||||
request_method: immediate
|
request_method: immediate
|
||||||
# If request_method is "local_time", what time should the requests be sent (in minutes after midnight)?
|
# If request_method is "local_time", what time should the requests be sent (in minutes after midnight)?
|
||||||
request_local_time: 120
|
request_local_time: 120
|
||||||
|
# Maximum number of media request responses to handle in parallel per user.
|
||||||
|
max_async_handle: 2
|
||||||
# Settings for immediate backfills. These backfills should generally be small and their main purpose is
|
# Settings for immediate backfills. These backfills should generally be small and their main purpose is
|
||||||
# to populate each of the initial chats (as configured by max_initial_conversations) with a few messages
|
# to populate each of the initial chats (as configured by max_initial_conversations) with a few messages
|
||||||
# so that you can continue conversations without losing context.
|
# so that you can continue conversations without losing context.
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -18,6 +18,7 @@ require (
|
||||||
golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f
|
golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f
|
||||||
golang.org/x/image v0.15.0
|
golang.org/x/image v0.15.0
|
||||||
golang.org/x/net v0.22.0
|
golang.org/x/net v0.22.0
|
||||||
|
golang.org/x/sync v0.3.0
|
||||||
google.golang.org/protobuf v1.33.0
|
google.golang.org/protobuf v1.33.0
|
||||||
maunium.net/go/mautrix v0.18.0
|
maunium.net/go/mautrix v0.18.0
|
||||||
)
|
)
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -85,6 +85,8 @@ golang.org/x/image v0.15.0 h1:kOELfmgrmJlw4Cdb7g/QGuB3CvDrXbqEIww/pNtNBm8=
|
||||||
golang.org/x/image v0.15.0/go.mod h1:HUYqC05R2ZcZ3ejNQsIHQDQiwWM4JBqmm6MKANTp4LE=
|
golang.org/x/image v0.15.0/go.mod h1:HUYqC05R2ZcZ3ejNQsIHQDQiwWM4JBqmm6MKANTp4LE=
|
||||||
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
|
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
|
||||||
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
|
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
|
||||||
|
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
|
||||||
|
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
|
||||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
|
|
@ -250,26 +250,33 @@ func (portal *Portal) legacyBackfill(ctx context.Context, user *User) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (user *User) dailyMediaRequestLoop() {
|
func (user *User) dailyMediaRequestLoop() {
|
||||||
// Calculate when to do the first set of media retry requests
|
|
||||||
now := time.Now()
|
|
||||||
userTz, err := time.LoadLocation(user.Timezone)
|
|
||||||
if err != nil {
|
|
||||||
userTz = now.Local().Location()
|
|
||||||
}
|
|
||||||
tonightMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, userTz)
|
|
||||||
midnightOffset := time.Duration(user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestLocalTime) * time.Minute
|
|
||||||
requestStartTime := tonightMidnight.Add(midnightOffset)
|
|
||||||
|
|
||||||
// If the request time for today has already happened, we need to start the
|
|
||||||
// request loop tomorrow instead.
|
|
||||||
if requestStartTime.Before(now) {
|
|
||||||
requestStartTime = requestStartTime.AddDate(0, 0, 1)
|
|
||||||
}
|
|
||||||
log := user.zlog.With().
|
log := user.zlog.With().
|
||||||
Str("action", "daily media request loop").
|
Str("action", "daily media request loop").
|
||||||
Logger()
|
Logger()
|
||||||
ctx := log.WithContext(context.Background())
|
ctx := log.WithContext(context.Background())
|
||||||
|
|
||||||
|
// Calculate when to do the first set of media retry requests
|
||||||
|
now := time.Now()
|
||||||
|
userTz, err := time.LoadLocation(user.Timezone)
|
||||||
|
tzIsInvalid := err != nil && user.Timezone != ""
|
||||||
|
var requestStartTime time.Time
|
||||||
|
if tzIsInvalid {
|
||||||
|
requestStartTime = now.Add(8 * time.Hour)
|
||||||
|
log.Warn().Msg("Invalid time zone, using static 8 hour start time")
|
||||||
|
} else {
|
||||||
|
if userTz == nil {
|
||||||
|
userTz = now.Local().Location()
|
||||||
|
}
|
||||||
|
tonightMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, userTz)
|
||||||
|
midnightOffset := time.Duration(user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestLocalTime) * time.Minute
|
||||||
|
requestStartTime = tonightMidnight.Add(midnightOffset)
|
||||||
|
// If the request time for today has already happened, we need to start the
|
||||||
|
// request loop tomorrow instead.
|
||||||
|
if requestStartTime.Before(now) {
|
||||||
|
requestStartTime = requestStartTime.AddDate(0, 0, 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Wait to start the loop
|
// Wait to start the loop
|
||||||
log.Info().Time("start_loop_at", requestStartTime).Msg("Waiting until start time to do media retry requests")
|
log.Info().Time("start_loop_at", requestStartTime).Msg("Waiting until start time to do media retry requests")
|
||||||
time.Sleep(time.Until(requestStartTime))
|
time.Sleep(time.Until(requestStartTime))
|
||||||
|
|
15
portal.go
15
portal.go
|
@ -269,7 +269,6 @@ type fakeMessage struct {
|
||||||
type PortalEvent struct {
|
type PortalEvent struct {
|
||||||
Message *PortalMessage
|
Message *PortalMessage
|
||||||
MatrixMessage *PortalMatrixMessage
|
MatrixMessage *PortalMatrixMessage
|
||||||
MediaRetry *PortalMediaRetry
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type PortalMessage struct {
|
type PortalMessage struct {
|
||||||
|
@ -286,11 +285,6 @@ type PortalMatrixMessage struct {
|
||||||
receivedAt time.Time
|
receivedAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
type PortalMediaRetry struct {
|
|
||||||
evt *events.MediaRetry
|
|
||||||
source *User
|
|
||||||
}
|
|
||||||
|
|
||||||
type recentlyHandledWrapper struct {
|
type recentlyHandledWrapper struct {
|
||||||
id types.MessageID
|
id types.MessageID
|
||||||
err database.MessageErrorType
|
err database.MessageErrorType
|
||||||
|
@ -572,8 +566,6 @@ func (portal *Portal) handleOneMessageLoopItem() {
|
||||||
portal.handleWhatsAppMessageLoopItem(msg.Message)
|
portal.handleWhatsAppMessageLoopItem(msg.Message)
|
||||||
} else if msg.MatrixMessage != nil {
|
} else if msg.MatrixMessage != nil {
|
||||||
portal.handleMatrixMessageLoopItem(msg.MatrixMessage)
|
portal.handleMatrixMessageLoopItem(msg.MatrixMessage)
|
||||||
} else if msg.MediaRetry != nil {
|
|
||||||
portal.handleMediaRetry(msg.MediaRetry.evt, msg.MediaRetry.source)
|
|
||||||
} else {
|
} else {
|
||||||
portal.zlog.Warn().Msg("Unexpected PortalEvent with no data")
|
portal.zlog.Warn().Msg("Unexpected PortalEvent with no data")
|
||||||
}
|
}
|
||||||
|
@ -3801,6 +3793,13 @@ func (portal *Portal) handleMediaRetry(retry *events.MediaRetry, source *User) {
|
||||||
Str("retry_message_id", retry.MessageID).
|
Str("retry_message_id", retry.MessageID).
|
||||||
Logger()
|
Logger()
|
||||||
ctx := log.WithContext(context.TODO())
|
ctx := log.WithContext(context.TODO())
|
||||||
|
err := source.mediaRetryLock.Acquire(ctx, 1)
|
||||||
|
if err != nil {
|
||||||
|
log.Err(err).Msg("Failed to acquire media retry semaphore")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer source.mediaRetryLock.Release(1)
|
||||||
|
|
||||||
msg, err := portal.bridge.DB.Message.GetByJID(ctx, portal.Key, retry.MessageID)
|
msg, err := portal.bridge.DB.Message.GetByJID(ctx, portal.Key, retry.MessageID)
|
||||||
if msg == nil {
|
if msg == nil {
|
||||||
log.Warn().Msg("Dropping media retry notification for unknown message")
|
log.Warn().Msg("Dropping media retry notification for unknown message")
|
||||||
|
|
9
user.go
9
user.go
|
@ -41,6 +41,7 @@ import (
|
||||||
"go.mau.fi/whatsmeow/types"
|
"go.mau.fi/whatsmeow/types"
|
||||||
"go.mau.fi/whatsmeow/types/events"
|
"go.mau.fi/whatsmeow/types/events"
|
||||||
waLog "go.mau.fi/whatsmeow/util/log"
|
waLog "go.mau.fi/whatsmeow/util/log"
|
||||||
|
"golang.org/x/sync/semaphore"
|
||||||
"maunium.net/go/mautrix"
|
"maunium.net/go/mautrix"
|
||||||
"maunium.net/go/mautrix/appservice"
|
"maunium.net/go/mautrix/appservice"
|
||||||
"maunium.net/go/mautrix/bridge"
|
"maunium.net/go/mautrix/bridge"
|
||||||
|
@ -74,6 +75,8 @@ type User struct {
|
||||||
historySyncs chan *events.HistorySync
|
historySyncs chan *events.HistorySync
|
||||||
lastPresence types.Presence
|
lastPresence types.Presence
|
||||||
|
|
||||||
|
mediaRetryLock *semaphore.Weighted
|
||||||
|
|
||||||
historySyncLoopsStarted bool
|
historySyncLoopsStarted bool
|
||||||
enqueueBackfillsTimer *time.Timer
|
enqueueBackfillsTimer *time.Timer
|
||||||
spaceMembershipChecked bool
|
spaceMembershipChecked bool
|
||||||
|
@ -257,6 +260,8 @@ func (br *WABridge) NewUser(dbUser *database.User) *User {
|
||||||
lastPresence: types.PresenceUnavailable,
|
lastPresence: types.PresenceUnavailable,
|
||||||
|
|
||||||
resyncQueue: make(map[types.JID]resyncQueueItem),
|
resyncQueue: make(map[types.JID]resyncQueueItem),
|
||||||
|
|
||||||
|
mediaRetryLock: semaphore.NewWeighted(br.Config.Bridge.HistorySync.MediaRequests.MaxAsyncHandle),
|
||||||
}
|
}
|
||||||
|
|
||||||
user.PermissionLevel = user.bridge.Config.Bridge.Permissions.Get(user.MXID)
|
user.PermissionLevel = user.bridge.Config.Bridge.Permissions.Get(user.MXID)
|
||||||
|
@ -955,9 +960,7 @@ func (user *User) HandleEvent(event interface{}) {
|
||||||
case *events.MediaRetry:
|
case *events.MediaRetry:
|
||||||
user.phoneSeen(v.Timestamp)
|
user.phoneSeen(v.Timestamp)
|
||||||
portal := user.GetPortalByJID(v.ChatID)
|
portal := user.GetPortalByJID(v.ChatID)
|
||||||
portal.events <- &PortalEvent{
|
go portal.handleMediaRetry(v, user)
|
||||||
MediaRetry: &PortalMediaRetry{evt: v, source: user},
|
|
||||||
}
|
|
||||||
case *events.CallOffer:
|
case *events.CallOffer:
|
||||||
user.handleCallStart(v.CallCreator, v.CallID, "", v.Timestamp)
|
user.handleCallStart(v.CallCreator, v.CallID, "", v.Timestamp)
|
||||||
case *events.CallOfferNotice:
|
case *events.CallOfferNotice:
|
||||||
|
|
Loading…
Reference in a new issue