Add option for max message handling duration

This commit is contained in:
Tulir Asokan 2022-06-29 20:05:55 +03:00
parent 52e3cdb121
commit d3d69d1a8a
9 changed files with 92 additions and 34 deletions

View File

@ -363,7 +363,7 @@ func fnCreate(ce *WrappedCommandEvent) {
}
ce.Log.Infofln("Creating group for %s with name %s and participants %+v", ce.RoomID, roomNameEvent.Name, participants)
resp, err := ce.User.Client.CreateGroup(roomNameEvent.Name, participants)
resp, err := ce.User.Client.CreateGroup(roomNameEvent.Name, participants, "")
if err != nil {
ce.Reply("Failed to create group: %v", err)
return

View File

@ -21,6 +21,7 @@ import (
"fmt"
"strings"
"text/template"
"time"
"go.mau.fi/whatsmeow/types"
@ -109,6 +110,9 @@ type BridgeConfig struct {
URLPreviews bool `yaml:"url_previews"`
CaptionInMessage bool `yaml:"caption_in_message"`
MessageHandlingDeadlineStr string `yaml:"message_handling_deadline"`
MessageHandlingDeadline time.Duration `yaml:"-"`
DisableStatusBroadcastSend bool `yaml:"disable_status_broadcast_send"`
DisappearingMessagesInGroups bool `yaml:"disappearing_messages_in_groups"`
@ -195,6 +199,13 @@ func (bc *BridgeConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return err
}
if bc.MessageHandlingDeadlineStr != "" {
bc.MessageHandlingDeadline, err = time.ParseDuration(bc.MessageHandlingDeadlineStr)
if err != nil {
return err
}
}
return nil
}

View File

@ -91,6 +91,8 @@ func DoUpgrade(helper *up.Helper) {
helper.Copy(up.Bool, "bridge", "disable_bridge_alerts")
helper.Copy(up.Bool, "bridge", "url_previews")
helper.Copy(up.Bool, "bridge", "caption_in_message")
helper.Copy(up.Str|up.Null, "bridge", "message_handling_deadline")
helper.Copy(up.Str, "bridge", "management_room_text", "welcome")
helper.Copy(up.Str, "bridge", "management_room_text", "welcome_connected")
helper.Copy(up.Str, "bridge", "management_room_text", "welcome_unconnected")

View File

@ -280,6 +280,9 @@ bridge:
# Send captions in the same message as images. This will send data compatible with both MSC2530 and MSC3552.
# This is currently not supported in most clients.
caption_in_message: false
# Maximum time for handling Matrix events. A duration string formatted for https://pkg.go.dev/time#ParseDuration
# Null means there's no enforced timeout.
message_handling_deadline: null
# The prefix for commands. Only required in non-management rooms.
command_prefix: "!wa"

4
go.mod
View File

@ -10,12 +10,12 @@ require (
github.com/prometheus/client_golang v1.12.2-0.20220613221938-ebd77f036066
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
github.com/tidwall/gjson v1.14.1
go.mau.fi/whatsmeow v0.0.0-20220628131901-3f187acf2229
go.mau.fi/whatsmeow v0.0.0-20220629162100-72294010aba7
golang.org/x/image v0.0.0-20220617043117-41969df76e82
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e
google.golang.org/protobuf v1.28.0
maunium.net/go/maulogger/v2 v2.3.2
maunium.net/go/mautrix v0.11.1-0.20220628090842-e9aa4b6f3ac8
maunium.net/go/mautrix v0.11.1-0.20220629165511-d505965036ef
)
require (

8
go.sum
View File

@ -64,8 +64,8 @@ github.com/yuin/goldmark v1.4.12 h1:6hffw6vALvEDqJ19dOJvJKOoAOKe4NDaTqvd2sktGN0=
github.com/yuin/goldmark v1.4.12/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mau.fi/libsignal v0.0.0-20220628090436-4d18b66b087e h1:ByHDg+D+dMIGuBA2n+1xOUf4xr3FJFYg8yxl06s1YBE=
go.mau.fi/libsignal v0.0.0-20220628090436-4d18b66b087e/go.mod h1:RCdzkTWSJv0AKGqurzPXJsEGIVMuQps3E/h7CMUPous=
go.mau.fi/whatsmeow v0.0.0-20220628131901-3f187acf2229 h1:iTJ65cHF4PKn9pTUnbPus6FdFeLXSOvm04FwJwq11hA=
go.mau.fi/whatsmeow v0.0.0-20220628131901-3f187acf2229/go.mod h1:hsjqq2xLuoFew8vbsDCJcGf5EbXCRcR/yoQ+87w6m3k=
go.mau.fi/whatsmeow v0.0.0-20220629162100-72294010aba7 h1:W3wefHGUb4WheA49V9bNK4hNugAqr49XyYRvUb/WC7Y=
go.mau.fi/whatsmeow v0.0.0-20220629162100-72294010aba7/go.mod h1:hsjqq2xLuoFew8vbsDCJcGf5EbXCRcR/yoQ+87w6m3k=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/image v0.0.0-20220617043117-41969df76e82 h1:KpZB5pUSBvrHltNEdK/tw0xlPeD13M6M6aGP32gKqiw=
@ -108,5 +108,5 @@ maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
maunium.net/go/maulogger/v2 v2.3.2 h1:1XmIYmMd3PoQfp9J+PaHhpt80zpfmMqaShzUTC7FwY0=
maunium.net/go/maulogger/v2 v2.3.2/go.mod h1:TYWy7wKwz/tIXTpsx8G3mZseIRiC5DoMxSZazOHy68A=
maunium.net/go/mautrix v0.11.1-0.20220628090842-e9aa4b6f3ac8 h1:c5xjJlWALgEVLN3CWA77PrEvyGb9yrAcCq/9RJ/+Ubo=
maunium.net/go/mautrix v0.11.1-0.20220628090842-e9aa4b6f3ac8/go.mod h1:Lj4pBam5P0zIvieIFHnGsuaj+xfFtI3y/sC8yGlyna8=
maunium.net/go/mautrix v0.11.1-0.20220629165511-d505965036ef h1:v5axLh3G3cNDNEOn8vCJpafSk4hp7I+aYjmoTq6OjlY=
maunium.net/go/mautrix v0.11.1-0.20220629165511-d505965036ef/go.mod h1:Lj4pBam5P0zIvieIFHnGsuaj+xfFtI3y/sC8yGlyna8=

View File

@ -967,6 +967,7 @@ func (user *User) updateAvatar(jid types.JID, avatarID *string, avatarURL *id.Co
}
*avatarURL = url
}
log.Debugfln("Updated avatar %s -> %s", *avatarID, avatar.ID)
*avatarID = avatar.ID
*avatarSet = false
return true
@ -2777,12 +2778,12 @@ func createJPEGThumbnail(source []byte) ([]byte, error) {
return data, err
}
func (portal *Portal) downloadThumbnail(original []byte, thumbnailURL id.ContentURIString, eventID id.EventID) ([]byte, error) {
func (portal *Portal) downloadThumbnail(ctx context.Context, original []byte, thumbnailURL id.ContentURIString, eventID id.EventID) ([]byte, error) {
if len(thumbnailURL) == 0 {
// just fall back to making thumbnail of original
} else if mxc, err := thumbnailURL.Parse(); err != nil {
portal.log.Warnfln("Malformed thumbnail URL in %s: %v (falling back to generating thumbnail from source)", eventID, err)
} else if thumbnail, err := portal.MainIntent().DownloadBytes(mxc); err != nil {
} else if thumbnail, err := portal.MainIntent().DownloadBytesContext(ctx, mxc); err != nil {
portal.log.Warnfln("Failed to download thumbnail in %s: %v (falling back to generating thumbnail from source)", eventID, err)
} else {
return createJPEGThumbnail(thumbnail)
@ -2804,7 +2805,7 @@ func (portal *Portal) convertWebPtoPNG(webpImage []byte) ([]byte, error) {
return pngBuffer.Bytes(), nil
}
func (portal *Portal) preprocessMatrixMedia(sender *User, relaybotFormatted bool, content *event.MessageEventContent, eventID id.EventID, mediaType whatsmeow.MediaType) (*MediaUpload, error) {
func (portal *Portal) preprocessMatrixMedia(ctx context.Context, sender *User, relaybotFormatted bool, content *event.MessageEventContent, eventID id.EventID, mediaType whatsmeow.MediaType) (*MediaUpload, error) {
var caption string
var mentionedJIDs []string
if relaybotFormatted {
@ -2821,7 +2822,7 @@ func (portal *Portal) preprocessMatrixMedia(sender *User, relaybotFormatted bool
if err != nil {
return nil, err
}
data, err := portal.MainIntent().DownloadBytes(mxc)
data, err := portal.MainIntent().DownloadBytesContext(ctx, mxc)
if err != nil {
return nil, util.NewDualError(errMediaDownloadFailed, err)
}
@ -2832,7 +2833,7 @@ func (portal *Portal) preprocessMatrixMedia(sender *User, relaybotFormatted bool
}
}
if mediaType == whatsmeow.MediaVideo && content.GetInfo().MimeType == "image/gif" {
data, err = ffmpeg.ConvertBytes(data, ".mp4", []string{"-f", "gif"}, []string{
data, err = ffmpeg.ConvertBytes(ctx, data, ".mp4", []string{"-f", "gif"}, []string{
"-pix_fmt", "yuv420p", "-c:v", "libx264", "-movflags", "+faststart",
"-filter:v", "crop='floor(in_w/2)*2:floor(in_h/2)*2'",
}, content.GetInfo().MimeType)
@ -2848,7 +2849,7 @@ func (portal *Portal) preprocessMatrixMedia(sender *User, relaybotFormatted bool
}
content.Info.MimeType = "image/png"
}
uploadResp, err := sender.Client.Upload(context.Background(), data, mediaType)
uploadResp, err := sender.Client.Upload(ctx, data, mediaType)
if err != nil {
return nil, util.NewDualError(errMediaWhatsAppUploadFailed, err)
}
@ -2856,7 +2857,7 @@ func (portal *Portal) preprocessMatrixMedia(sender *User, relaybotFormatted bool
// Audio doesn't have thumbnails
var thumbnail []byte
if mediaType != whatsmeow.MediaAudio {
thumbnail, err = portal.downloadThumbnail(data, content.GetInfo().ThumbnailURL, eventID)
thumbnail, err = portal.downloadThumbnail(ctx, data, content.GetInfo().ThumbnailURL, eventID)
// Ignore format errors for non-image files, we don't care about those thumbnails
if err != nil && (!errors.Is(err, image.ErrFormat) || mediaType == whatsmeow.MediaImage) {
portal.log.Warnfln("Failed to generate thumbnail for %s: %v", eventID, err)
@ -2947,7 +2948,7 @@ func getUnstableWaveform(content map[string]interface{}) []byte {
return output
}
func (portal *Portal) convertMatrixMessage(sender *User, evt *event.Event) (*waProto.Message, *User, error) {
func (portal *Portal) convertMatrixMessage(ctx context.Context, sender *User, evt *event.Event) (*waProto.Message, *User, error) {
content, ok := evt.Content.Parsed.(*event.MessageEventContent)
if !ok {
return nil, sender, fmt.Errorf("%w %T", errUnexpectedParsedContentType, evt.Content.Parsed)
@ -3003,14 +3004,17 @@ func (portal *Portal) convertMatrixMessage(sender *User, evt *event.Event) (*waP
Text: &text,
ContextInfo: &ctxInfo,
}
hasPreview := portal.convertURLPreviewToWhatsApp(sender, evt, msg.ExtendedTextMessage)
hasPreview := portal.convertURLPreviewToWhatsApp(ctx, sender, evt, msg.ExtendedTextMessage)
if ctx.Err() != nil {
return nil, nil, ctx.Err()
}
if ctxInfo.StanzaId == nil && ctxInfo.MentionedJid == nil && ctxInfo.Expiration == nil && !hasPreview {
// No need for extended message
msg.ExtendedTextMessage = nil
msg.Conversation = &text
}
case event.MsgImage:
media, err := portal.preprocessMatrixMedia(sender, relaybotFormatted, content, evt.ID, whatsmeow.MediaImage)
media, err := portal.preprocessMatrixMedia(ctx, sender, relaybotFormatted, content, evt.ID, whatsmeow.MediaImage)
if media == nil {
return nil, sender, err
}
@ -3028,7 +3032,7 @@ func (portal *Portal) convertMatrixMessage(sender *User, evt *event.Event) (*waP
}
case event.MsgVideo:
gifPlayback := content.GetInfo().MimeType == "image/gif"
media, err := portal.preprocessMatrixMedia(sender, relaybotFormatted, content, evt.ID, whatsmeow.MediaVideo)
media, err := portal.preprocessMatrixMedia(ctx, sender, relaybotFormatted, content, evt.ID, whatsmeow.MediaVideo)
if media == nil {
return nil, sender, err
}
@ -3048,7 +3052,7 @@ func (portal *Portal) convertMatrixMessage(sender *User, evt *event.Event) (*waP
FileLength: proto.Uint64(uint64(media.FileLength)),
}
case event.MsgAudio:
media, err := portal.preprocessMatrixMedia(sender, relaybotFormatted, content, evt.ID, whatsmeow.MediaAudio)
media, err := portal.preprocessMatrixMedia(ctx, sender, relaybotFormatted, content, evt.ID, whatsmeow.MediaAudio)
if media == nil {
return nil, sender, err
}
@ -3071,7 +3075,7 @@ func (portal *Portal) convertMatrixMessage(sender *User, evt *event.Event) (*waP
msg.AudioMessage.Mimetype = proto.String(addCodecToMime(content.GetInfo().MimeType, "opus"))
}
case event.MsgFile:
media, err := portal.preprocessMatrixMedia(sender, relaybotFormatted, content, evt.ID, whatsmeow.MediaDocument)
media, err := portal.preprocessMatrixMedia(ctx, sender, relaybotFormatted, content, evt.ID, whatsmeow.MediaDocument)
if media == nil {
return nil, sender, err
}
@ -3159,6 +3163,8 @@ func errorToStatusReason(err error) (reason event.MessageStatusReason, isCertain
errors.Is(err, errBroadcastReactionNotSupported),
errors.Is(err, errBroadcastSendDisabled):
return event.MessageStatusUnsupported, true, false, true
case errors.Is(err, context.DeadlineExceeded):
return event.MessageStatusTooOld, false, true, true
case errors.Is(err, errTargetNotFound),
errors.Is(err, errTargetIsFake),
errors.Is(err, errReactionDatabaseNotFound),
@ -3179,10 +3185,13 @@ func errorToStatusReason(err error) (reason event.MessageStatusReason, isCertain
}
}
func (portal *Portal) sendStatusEvent(evtID id.EventID, err error) {
func (portal *Portal) sendStatusEvent(evtID, lastRetry id.EventID, err error) {
if !portal.bridge.Config.Bridge.MessageStatusEvents {
return
}
if lastRetry == evtID {
lastRetry = ""
}
intent := portal.bridge.Bot
if !portal.Encrypted {
// Bridge bot isn't present in unencrypted DMs
@ -3194,7 +3203,8 @@ func (portal *Portal) sendStatusEvent(evtID id.EventID, err error) {
Type: event.RelReference,
EventID: evtID,
},
Success: err == nil,
Success: err == nil,
LastRetry: lastRetry,
}
if !content.Success {
reason, isCertain, canRetry, _ := errorToStatusReason(err)
@ -3247,6 +3257,10 @@ func (portal *Portal) sendMessageMetrics(evt *event.Event, err error, part strin
if evt.Type == event.EventRedaction {
evtDescription += fmt.Sprintf(" of %s", evt.Redacts)
}
origEvtID := evt.ID
if retryMeta := evt.Content.AsMessage().MessageSendRetry; retryMeta != nil {
origEvtID = retryMeta.OriginalEventID
}
if err != nil {
level := log.LevelError
if part == "Ignoring" {
@ -3259,12 +3273,12 @@ func (portal *Portal) sendMessageMetrics(evt *event.Event, err error, part strin
if sendNotice {
portal.sendErrorMessage(err.Error(), isCertain)
}
portal.sendStatusEvent(evt.ID, err)
portal.sendStatusEvent(origEvtID, evt.ID, err)
} else {
portal.log.Debugfln("Handled Matrix %s %s", msgType, evtDescription)
portal.sendDeliveryReceipt(evt.ID)
portal.bridge.SendMessageSuccessCheckpoint(evt, bridge.MsgStepRemote, 0)
portal.sendStatusEvent(evt.ID, nil)
portal.sendStatusEvent(origEvtID, evt.ID, nil)
}
}
@ -3276,17 +3290,45 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event) {
go portal.sendMessageMetrics(evt, errBroadcastSendDisabled, "Ignoring")
return
}
portal.log.Debugfln("Received message %s from %s", evt.ID, evt.Sender)
msg, sender, err := portal.convertMatrixMessage(sender, evt)
origEvtID := evt.ID
var dbMsg *database.Message
if retryMeta := evt.Content.AsMessage().MessageSendRetry; retryMeta != nil {
origEvtID = retryMeta.OriginalEventID
dbMsg = portal.bridge.DB.Message.GetByMXID(origEvtID)
if dbMsg != nil && dbMsg.Sent {
portal.log.Debugfln("Ignoring retry request %s (#%d) for %s/%s from %s as message was already sent", evt.ID, retryMeta.RetryCount, origEvtID, dbMsg.JID, evt.Sender)
go portal.sendMessageMetrics(evt, nil, "")
return
} else if dbMsg != nil {
portal.log.Debugfln("Got retry request %s (#%d) for %s/%s from %s", evt.ID, retryMeta.RetryCount, origEvtID, dbMsg.JID, evt.Sender)
} else {
portal.log.Debugfln("Got retry request %s (#%d) for %s from %s (original message not known)", evt.ID, retryMeta.RetryCount, origEvtID, evt.Sender)
}
} else {
portal.log.Debugfln("Received message %s from %s", evt.ID, evt.Sender)
}
ctx := context.Background()
if portal.bridge.Config.Bridge.MessageHandlingDeadline > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, portal.bridge.Config.Bridge.MessageHandlingDeadline)
defer cancel()
}
msg, sender, err := portal.convertMatrixMessage(ctx, sender, evt)
if msg == nil {
go portal.sendMessageMetrics(evt, err, "Error converting")
return
}
portal.MarkDisappearing(evt.ID, portal.ExpirationTime, true)
portal.MarkDisappearing(origEvtID, portal.ExpirationTime, true)
info := portal.generateMessageInfo(sender)
dbMsg := portal.markHandled(nil, nil, info, evt.ID, false, true, database.MsgNormal, database.MsgNoError)
if dbMsg == nil {
dbMsg = portal.markHandled(nil, nil, info, evt.ID, false, true, database.MsgNormal, database.MsgNoError)
} else {
info.ID = dbMsg.JID
}
portal.log.Debugln("Sending event", evt.ID, "to WhatsApp", info.ID)
ts, err := sender.Client.SendMessage(portal.Key.JID, info.ID, msg)
ts, err := sender.Client.SendMessage(ctx, portal.Key.JID, info.ID, msg)
go portal.sendMessageMetrics(evt, err, "Error sending")
if err == nil {
dbMsg.MarkSent(ts)
@ -3346,7 +3388,7 @@ func (portal *Portal) sendReactionToWhatsApp(sender *User, id types.MessageID, t
messageKeyParticipant = proto.String(target.Sender.ToNonAD().String())
}
key = variationselector.Remove(key)
return sender.Client.SendMessage(portal.Key.JID, id, &waProto.Message{
return sender.Client.SendMessage(context.TODO(), portal.Key.JID, id, &waProto.Message{
ReactionMessage: &waProto.ReactionMessage{
Key: &waProto.MessageKey{
RemoteJid: proto.String(portal.Key.JID.String()),

View File

@ -113,7 +113,7 @@ func (portal *Portal) convertURLPreviewToBeeper(intent *appservice.IntentAPI, so
var URLRegex = regexp.MustCompile(`https?://[^\s/_*]+(?:/\S*)?`)
func (portal *Portal) convertURLPreviewToWhatsApp(sender *User, evt *event.Event, dest *waProto.ExtendedTextMessage) bool {
func (portal *Portal) convertURLPreviewToWhatsApp(ctx context.Context, sender *User, evt *event.Event, dest *waProto.ExtendedTextMessage) bool {
var preview *BeeperLinkPreview
rawPreview := gjson.GetBytes(evt.Content.VeryRaw, `com\.beeper\.linkpreviews`)
@ -163,7 +163,7 @@ func (portal *Portal) convertURLPreviewToWhatsApp(sender *User, evt *event.Event
imageMXC = preview.ImageEncryption.URL.ParseOrIgnore()
}
if !imageMXC.IsEmpty() {
data, err := portal.MainIntent().DownloadBytes(imageMXC)
data, err := portal.MainIntent().DownloadBytesContext(ctx, imageMXC)
if err != nil {
portal.log.Errorfln("Failed to download URL preview image %s in %s: %v", preview.ImageURL, evt.ID, err)
return true
@ -176,7 +176,7 @@ func (portal *Portal) convertURLPreviewToWhatsApp(sender *User, evt *event.Event
}
}
dest.MediaKeyTimestamp = proto.Int64(time.Now().Unix())
uploadResp, err := sender.Client.Upload(context.Background(), data, whatsmeow.MediaLinkThumbnail)
uploadResp, err := sender.Client.Upload(ctx, data, whatsmeow.MediaLinkThumbnail)
if err != nil {
portal.log.Errorfln("Failed to upload URL preview thumbnail in %s: %v", evt.ID, err)
return true

View File

@ -671,7 +671,7 @@ func (user *User) sendHackyPhonePing() {
} else {
user.log.Warnfln("Failed to get last app state key ID to send hacky phone ping: %v - sending empty request", err)
}
ts, err := user.Client.SendMessage(user.JID.ToNonAD(), msgID, &waProto.Message{
ts, err := user.Client.SendMessage(context.Background(), user.JID.ToNonAD(), msgID, &waProto.Message{
ProtocolMessage: &waProto.ProtocolMessage{
Type: waProto.ProtocolMessage_APP_STATE_SYNC_KEY_REQUEST.Enum(),
AppStateSyncKeyRequest: &waProto.AppStateSyncKeyRequest{