mirror of
https://github.com/tulir/mautrix-whatsapp
synced 2024-12-13 17:13:11 +01:00
Improve max message handling duration options
This commit is contained in:
parent
e6240ff68c
commit
44768c9c93
5 changed files with 324 additions and 191 deletions
|
@ -110,8 +110,13 @@ type BridgeConfig struct {
|
|||
URLPreviews bool `yaml:"url_previews"`
|
||||
CaptionInMessage bool `yaml:"caption_in_message"`
|
||||
|
||||
MessageHandlingDeadlineStr string `yaml:"message_handling_deadline"`
|
||||
MessageHandlingDeadline time.Duration `yaml:"-"`
|
||||
MessageHandlingTimeout struct {
|
||||
ErrorAfterStr string `yaml:"error_after"`
|
||||
DeadlineStr string `yaml:"deadline"`
|
||||
|
||||
ErrorAfter time.Duration `yaml:"-"`
|
||||
Deadline time.Duration `yaml:"-"`
|
||||
} `yaml:"message_handling_timeout"`
|
||||
|
||||
DisableStatusBroadcastSend bool `yaml:"disable_status_broadcast_send"`
|
||||
DisappearingMessagesInGroups bool `yaml:"disappearing_messages_in_groups"`
|
||||
|
@ -199,8 +204,14 @@ func (bc *BridgeConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if bc.MessageHandlingDeadlineStr != "" {
|
||||
bc.MessageHandlingDeadline, err = time.ParseDuration(bc.MessageHandlingDeadlineStr)
|
||||
if bc.MessageHandlingTimeout.ErrorAfterStr != "" {
|
||||
bc.MessageHandlingTimeout.ErrorAfter, err = time.ParseDuration(bc.MessageHandlingTimeout.ErrorAfterStr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if bc.MessageHandlingTimeout.DeadlineStr != "" {
|
||||
bc.MessageHandlingTimeout.Deadline, err = time.ParseDuration(bc.MessageHandlingTimeout.DeadlineStr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -91,7 +91,8 @@ func DoUpgrade(helper *up.Helper) {
|
|||
helper.Copy(up.Bool, "bridge", "disable_bridge_alerts")
|
||||
helper.Copy(up.Bool, "bridge", "url_previews")
|
||||
helper.Copy(up.Bool, "bridge", "caption_in_message")
|
||||
helper.Copy(up.Str|up.Null, "bridge", "message_handling_deadline")
|
||||
helper.Copy(up.Str|up.Null, "bridge", "message_handling_timeout", "error_after")
|
||||
helper.Copy(up.Str|up.Null, "bridge", "message_handling_timeout", "deadline")
|
||||
|
||||
helper.Copy(up.Str, "bridge", "management_room_text", "welcome")
|
||||
helper.Copy(up.Str, "bridge", "management_room_text", "welcome_connected")
|
||||
|
|
|
@ -280,9 +280,13 @@ bridge:
|
|||
# Send captions in the same message as images. This will send data compatible with both MSC2530 and MSC3552.
|
||||
# This is currently not supported in most clients.
|
||||
caption_in_message: false
|
||||
# Maximum time for handling Matrix events. A duration string formatted for https://pkg.go.dev/time#ParseDuration
|
||||
# Maximum time for handling Matrix events. Duration strings formatted for https://pkg.go.dev/time#ParseDuration
|
||||
# Null means there's no enforced timeout.
|
||||
message_handling_deadline: null
|
||||
message_handling_timeout:
|
||||
# Send an error message after this timeout, but keep waiting for the response until the deadline.
|
||||
error_after: 10s
|
||||
# Drop messages after this timeout. They may still go through if the message got sent to the servers.
|
||||
deadline: 60s
|
||||
|
||||
# The prefix for commands. Only required in non-management rooms.
|
||||
command_prefix: "!wa"
|
||||
|
|
262
messagetracking.go
Normal file
262
messagetracking.go
Normal file
|
@ -0,0 +1,262 @@
|
|||
// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
|
||||
// Copyright (C) 2022 Tulir Asokan
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"go.mau.fi/whatsmeow"
|
||||
log "maunium.net/go/maulogger/v2"
|
||||
"maunium.net/go/mautrix"
|
||||
"maunium.net/go/mautrix/bridge"
|
||||
"maunium.net/go/mautrix/event"
|
||||
"maunium.net/go/mautrix/id"
|
||||
)
|
||||
|
||||
var (
|
||||
errUserNotConnected = errors.New("you are not connected to WhatsApp")
|
||||
errDifferentUser = errors.New("user is not the recipient of this private chat portal")
|
||||
errUserNotLoggedIn = errors.New("user is not logged in and chat has no relay bot")
|
||||
errMNoticeDisabled = errors.New("bridging m.notice messages is disabled")
|
||||
errUnexpectedParsedContentType = errors.New("unexpected parsed content type")
|
||||
errInvalidGeoURI = errors.New("invalid `geo:` URI in message")
|
||||
errUnknownMsgType = errors.New("unknown msgtype")
|
||||
errMediaDownloadFailed = errors.New("failed to download media")
|
||||
errMediaDecryptFailed = errors.New("failed to decrypt media")
|
||||
errMediaConvertFailed = errors.New("failed to convert media")
|
||||
errMediaWhatsAppUploadFailed = errors.New("failed to upload media to WhatsApp")
|
||||
errTargetNotFound = errors.New("target event not found")
|
||||
errReactionDatabaseNotFound = errors.New("reaction database entry not found")
|
||||
errReactionTargetNotFound = errors.New("reaction target message not found")
|
||||
errTargetIsFake = errors.New("target is a fake event")
|
||||
errTargetSentBySomeoneElse = errors.New("target is a fake event")
|
||||
|
||||
errBroadcastReactionNotSupported = errors.New("reacting to status messages is not currently supported")
|
||||
errBroadcastSendDisabled = errors.New("sending status messages is disabled")
|
||||
|
||||
errMessageDisconnected = &whatsmeow.DisconnectedError{Action: "message send"}
|
||||
errMessageRetryDisconnected = &whatsmeow.DisconnectedError{Action: "message send (retry)"}
|
||||
|
||||
errMessageTakingLong = errors.New("bridging the message is taking longer than usual")
|
||||
errTimeoutBeforeHandling = errors.New("message timed out before handling was started")
|
||||
)
|
||||
|
||||
func errorToStatusReason(err error) (reason event.MessageStatusReason, isCertain, canRetry, sendNotice bool) {
|
||||
switch {
|
||||
case errors.Is(err, whatsmeow.ErrBroadcastListUnsupported),
|
||||
errors.Is(err, errUnexpectedParsedContentType),
|
||||
errors.Is(err, errUnknownMsgType),
|
||||
errors.Is(err, errInvalidGeoURI),
|
||||
errors.Is(err, whatsmeow.ErrUnknownServer),
|
||||
errors.Is(err, whatsmeow.ErrRecipientADJID),
|
||||
errors.Is(err, errBroadcastReactionNotSupported),
|
||||
errors.Is(err, errBroadcastSendDisabled):
|
||||
return event.MessageStatusUnsupported, true, false, true
|
||||
case errors.Is(err, errTimeoutBeforeHandling):
|
||||
return event.MessageStatusTooOld, true, true, true
|
||||
case errors.Is(err, context.DeadlineExceeded):
|
||||
return event.MessageStatusTooOld, false, true, true
|
||||
case errors.Is(err, errMessageTakingLong):
|
||||
// Set can_retry=false here since we'll send another status event allowing the retry later
|
||||
// Technically retrying when this happens is fine, but we'd just ignore it anyway.
|
||||
return event.MessageStatusTooOld, false, false, true
|
||||
case errors.Is(err, errTargetNotFound),
|
||||
errors.Is(err, errTargetIsFake),
|
||||
errors.Is(err, errReactionDatabaseNotFound),
|
||||
errors.Is(err, errReactionTargetNotFound),
|
||||
errors.Is(err, errTargetSentBySomeoneElse):
|
||||
return event.MessageStatusGenericError, true, false, false
|
||||
case errors.Is(err, whatsmeow.ErrNotConnected),
|
||||
errors.Is(err, errUserNotConnected):
|
||||
return event.MessageStatusGenericError, true, true, true
|
||||
case errors.Is(err, errUserNotLoggedIn),
|
||||
errors.Is(err, errDifferentUser):
|
||||
return event.MessageStatusGenericError, true, true, false
|
||||
case errors.Is(err, errMessageDisconnected),
|
||||
errors.Is(err, errMessageRetryDisconnected):
|
||||
return event.MessageStatusGenericError, false, true, true
|
||||
default:
|
||||
return event.MessageStatusGenericError, false, true, true
|
||||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) sendErrorMessage(err error, confirmed bool, editID id.EventID) id.EventID {
|
||||
if !portal.bridge.Config.Bridge.MessageErrorNotices {
|
||||
return ""
|
||||
}
|
||||
certainty := "may not have been"
|
||||
if confirmed {
|
||||
certainty = "was not"
|
||||
}
|
||||
msg := fmt.Sprintf("\u26a0 Your message %s bridged: %v", certainty, err)
|
||||
if errors.Is(err, errMessageTakingLong) {
|
||||
msg = "\u26a0 Bridging your message is taking longer than usual"
|
||||
}
|
||||
content := &event.MessageEventContent{
|
||||
MsgType: event.MsgNotice,
|
||||
Body: msg,
|
||||
}
|
||||
if editID != "" {
|
||||
content.SetEdit(editID)
|
||||
}
|
||||
resp, err := portal.sendMainIntentMessage(content)
|
||||
if err != nil {
|
||||
portal.log.Warnfln("Failed to send bridging error message:", err)
|
||||
return ""
|
||||
}
|
||||
return resp.EventID
|
||||
}
|
||||
|
||||
func (portal *Portal) sendStatusEvent(evtID, lastRetry id.EventID, err error) {
|
||||
if !portal.bridge.Config.Bridge.MessageStatusEvents {
|
||||
return
|
||||
}
|
||||
if lastRetry == evtID {
|
||||
lastRetry = ""
|
||||
}
|
||||
intent := portal.bridge.Bot
|
||||
if !portal.Encrypted {
|
||||
// Bridge bot isn't present in unencrypted DMs
|
||||
intent = portal.MainIntent()
|
||||
}
|
||||
content := event.BeeperMessageStatusEventContent{
|
||||
Network: portal.getBridgeInfoStateKey(),
|
||||
RelatesTo: event.RelatesTo{
|
||||
Type: event.RelReference,
|
||||
EventID: evtID,
|
||||
},
|
||||
Success: err == nil,
|
||||
LastRetry: lastRetry,
|
||||
}
|
||||
if !content.Success {
|
||||
reason, isCertain, canRetry, _ := errorToStatusReason(err)
|
||||
content.Reason = reason
|
||||
content.IsCertain = &isCertain
|
||||
content.CanRetry = &canRetry
|
||||
content.Error = err.Error()
|
||||
}
|
||||
_, err = intent.SendMessageEvent(portal.MXID, event.BeeperMessageStatus, &content)
|
||||
if err != nil {
|
||||
portal.log.Warnln("Failed to send message status event:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) sendDeliveryReceipt(eventID id.EventID) {
|
||||
if portal.bridge.Config.Bridge.DeliveryReceipts {
|
||||
err := portal.bridge.Bot.MarkRead(portal.MXID, eventID)
|
||||
if err != nil {
|
||||
portal.log.Debugfln("Failed to send delivery receipt for %s: %v", eventID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) sendMessageMetrics(evt *event.Event, err error, part string, ms *metricSender) {
|
||||
var msgType string
|
||||
switch evt.Type {
|
||||
case event.EventMessage:
|
||||
msgType = "message"
|
||||
case event.EventReaction:
|
||||
msgType = "reaction"
|
||||
case event.EventRedaction:
|
||||
msgType = "redaction"
|
||||
default:
|
||||
msgType = "unknown event"
|
||||
}
|
||||
evtDescription := evt.ID.String()
|
||||
if evt.Type == event.EventRedaction {
|
||||
evtDescription += fmt.Sprintf(" of %s", evt.Redacts)
|
||||
}
|
||||
origEvtID := evt.ID
|
||||
if retryMeta := evt.Content.AsMessage().MessageSendRetry; retryMeta != nil {
|
||||
origEvtID = retryMeta.OriginalEventID
|
||||
}
|
||||
if err != nil {
|
||||
level := log.LevelError
|
||||
if part == "Ignoring" {
|
||||
level = log.LevelDebug
|
||||
}
|
||||
portal.log.Logfln(level, "%s %s %s from %s: %v", part, msgType, evtDescription, evt.Sender, err)
|
||||
reason, isCertain, _, sendNotice := errorToStatusReason(err)
|
||||
status := bridge.ReasonToCheckpointStatus(reason)
|
||||
portal.bridge.SendMessageCheckpoint(evt, bridge.MsgStepRemote, err, status, ms.getRetryNum())
|
||||
if sendNotice {
|
||||
ms.setNoticeID(portal.sendErrorMessage(err, isCertain, ms.getNoticeID()))
|
||||
}
|
||||
portal.sendStatusEvent(origEvtID, evt.ID, err)
|
||||
} else {
|
||||
portal.log.Debugfln("Handled Matrix %s %s", msgType, evtDescription)
|
||||
portal.sendDeliveryReceipt(evt.ID)
|
||||
portal.bridge.SendMessageSuccessCheckpoint(evt, bridge.MsgStepRemote, ms.getRetryNum())
|
||||
portal.sendStatusEvent(origEvtID, evt.ID, nil)
|
||||
if prevNotice := ms.popNoticeID(); prevNotice != "" {
|
||||
_, _ = portal.MainIntent().RedactEvent(portal.MXID, prevNotice, mautrix.ReqRedact{
|
||||
Reason: "error resolved",
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type metricSender struct {
|
||||
portal *Portal
|
||||
previousNotice id.EventID
|
||||
lock sync.Mutex
|
||||
completed bool
|
||||
retryNum int
|
||||
}
|
||||
|
||||
func (ms *metricSender) getRetryNum() int {
|
||||
if ms != nil {
|
||||
return ms.retryNum
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (ms *metricSender) getNoticeID() id.EventID {
|
||||
if ms == nil {
|
||||
return ""
|
||||
}
|
||||
return ms.previousNotice
|
||||
}
|
||||
|
||||
func (ms *metricSender) popNoticeID() id.EventID {
|
||||
if ms == nil {
|
||||
return ""
|
||||
}
|
||||
evtID := ms.previousNotice
|
||||
ms.previousNotice = ""
|
||||
return evtID
|
||||
}
|
||||
|
||||
func (ms *metricSender) setNoticeID(evtID id.EventID) {
|
||||
if ms != nil && ms.previousNotice == "" {
|
||||
ms.previousNotice = evtID
|
||||
}
|
||||
}
|
||||
|
||||
func (ms *metricSender) sendMessageMetrics(evt *event.Event, err error, part string, completed bool) {
|
||||
ms.lock.Lock()
|
||||
defer ms.lock.Unlock()
|
||||
if !completed && ms.completed {
|
||||
return
|
||||
}
|
||||
ms.portal.sendMessageMetrics(evt, err, part, ms)
|
||||
ms.retryNum++
|
||||
ms.completed = completed
|
||||
}
|
223
portal.go
223
portal.go
|
@ -3108,126 +3108,6 @@ func (portal *Portal) convertMatrixMessage(ctx context.Context, sender *User, ev
|
|||
return &msg, sender, nil
|
||||
}
|
||||
|
||||
func (portal *Portal) sendErrorMessage(message string, confirmed bool) id.EventID {
|
||||
if !portal.bridge.Config.Bridge.MessageErrorNotices {
|
||||
return ""
|
||||
}
|
||||
certainty := "may not have been"
|
||||
if confirmed {
|
||||
certainty = "was not"
|
||||
}
|
||||
resp, err := portal.sendMainIntentMessage(&event.MessageEventContent{
|
||||
MsgType: event.MsgNotice,
|
||||
Body: fmt.Sprintf("\u26a0 Your message %s bridged: %v", certainty, message),
|
||||
})
|
||||
if err != nil {
|
||||
portal.log.Warnfln("Failed to send bridging error message:", err)
|
||||
return ""
|
||||
}
|
||||
return resp.EventID
|
||||
}
|
||||
|
||||
var (
|
||||
errUserNotConnected = errors.New("you are not connected to WhatsApp")
|
||||
errDifferentUser = errors.New("user is not the recipient of this private chat portal")
|
||||
errUserNotLoggedIn = errors.New("user is not logged in and chat has no relay bot")
|
||||
errMNoticeDisabled = errors.New("bridging m.notice messages is disabled")
|
||||
errUnexpectedParsedContentType = errors.New("unexpected parsed content type")
|
||||
errInvalidGeoURI = errors.New("invalid `geo:` URI in message")
|
||||
errUnknownMsgType = errors.New("unknown msgtype")
|
||||
errMediaDownloadFailed = errors.New("failed to download media")
|
||||
errMediaDecryptFailed = errors.New("failed to decrypt media")
|
||||
errMediaConvertFailed = errors.New("failed to convert media")
|
||||
errMediaWhatsAppUploadFailed = errors.New("failed to upload media to WhatsApp")
|
||||
errTargetNotFound = errors.New("target event not found")
|
||||
errReactionDatabaseNotFound = errors.New("reaction database entry not found")
|
||||
errReactionTargetNotFound = errors.New("reaction target message not found")
|
||||
errTargetIsFake = errors.New("target is a fake event")
|
||||
errTargetSentBySomeoneElse = errors.New("target is a fake event")
|
||||
|
||||
errBroadcastReactionNotSupported = errors.New("reacting to status messages is not currently supported")
|
||||
errBroadcastSendDisabled = errors.New("sending status messages is disabled")
|
||||
|
||||
errMessageDisconnected = &whatsmeow.DisconnectedError{Action: "message send"}
|
||||
errMessageRetryDisconnected = &whatsmeow.DisconnectedError{Action: "message send (retry)"}
|
||||
)
|
||||
|
||||
func errorToStatusReason(err error) (reason event.MessageStatusReason, isCertain, canRetry, sendNotice bool) {
|
||||
switch {
|
||||
case errors.Is(err, whatsmeow.ErrBroadcastListUnsupported),
|
||||
errors.Is(err, errUnexpectedParsedContentType),
|
||||
errors.Is(err, errUnknownMsgType),
|
||||
errors.Is(err, errInvalidGeoURI),
|
||||
errors.Is(err, whatsmeow.ErrUnknownServer),
|
||||
errors.Is(err, whatsmeow.ErrRecipientADJID),
|
||||
errors.Is(err, errBroadcastReactionNotSupported),
|
||||
errors.Is(err, errBroadcastSendDisabled):
|
||||
return event.MessageStatusUnsupported, true, false, true
|
||||
case errors.Is(err, context.DeadlineExceeded):
|
||||
return event.MessageStatusTooOld, false, true, true
|
||||
case errors.Is(err, errTargetNotFound),
|
||||
errors.Is(err, errTargetIsFake),
|
||||
errors.Is(err, errReactionDatabaseNotFound),
|
||||
errors.Is(err, errReactionTargetNotFound),
|
||||
errors.Is(err, errTargetSentBySomeoneElse):
|
||||
return event.MessageStatusGenericError, true, false, false
|
||||
case errors.Is(err, whatsmeow.ErrNotConnected),
|
||||
errors.Is(err, errUserNotConnected):
|
||||
return event.MessageStatusGenericError, true, true, true
|
||||
case errors.Is(err, errUserNotLoggedIn),
|
||||
errors.Is(err, errDifferentUser):
|
||||
return event.MessageStatusGenericError, true, true, false
|
||||
case errors.Is(err, errMessageDisconnected),
|
||||
errors.Is(err, errMessageRetryDisconnected):
|
||||
return event.MessageStatusGenericError, false, true, true
|
||||
default:
|
||||
return event.MessageStatusGenericError, false, true, true
|
||||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) sendStatusEvent(evtID, lastRetry id.EventID, err error) {
|
||||
if !portal.bridge.Config.Bridge.MessageStatusEvents {
|
||||
return
|
||||
}
|
||||
if lastRetry == evtID {
|
||||
lastRetry = ""
|
||||
}
|
||||
intent := portal.bridge.Bot
|
||||
if !portal.Encrypted {
|
||||
// Bridge bot isn't present in unencrypted DMs
|
||||
intent = portal.MainIntent()
|
||||
}
|
||||
content := event.BeeperMessageStatusEventContent{
|
||||
Network: portal.getBridgeInfoStateKey(),
|
||||
RelatesTo: event.RelatesTo{
|
||||
Type: event.RelReference,
|
||||
EventID: evtID,
|
||||
},
|
||||
Success: err == nil,
|
||||
LastRetry: lastRetry,
|
||||
}
|
||||
if !content.Success {
|
||||
reason, isCertain, canRetry, _ := errorToStatusReason(err)
|
||||
content.Reason = reason
|
||||
content.IsCertain = &isCertain
|
||||
content.CanRetry = &canRetry
|
||||
content.Error = err.Error()
|
||||
}
|
||||
_, err = intent.SendMessageEvent(portal.MXID, event.BeeperMessageStatus, &content)
|
||||
if err != nil {
|
||||
portal.log.Warnln("Failed to send message status event:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) sendDeliveryReceipt(eventID id.EventID) {
|
||||
if portal.bridge.Config.Bridge.DeliveryReceipts {
|
||||
err := portal.bridge.Bot.MarkRead(portal.MXID, eventID)
|
||||
if err != nil {
|
||||
portal.log.Debugfln("Failed to send delivery receipt for %s: %v", eventID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) generateMessageInfo(sender *User) *types.MessageInfo {
|
||||
return &types.MessageInfo{
|
||||
ID: whatsmeow.GenerateMessageID(),
|
||||
|
@ -3241,83 +3121,58 @@ func (portal *Portal) generateMessageInfo(sender *User) *types.MessageInfo {
|
|||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) sendMessageMetrics(evt *event.Event, err error, part string) {
|
||||
var msgType string
|
||||
switch evt.Type {
|
||||
case event.EventMessage:
|
||||
msgType = "message"
|
||||
case event.EventReaction:
|
||||
msgType = "reaction"
|
||||
case event.EventRedaction:
|
||||
msgType = "redaction"
|
||||
default:
|
||||
msgType = "unknown event"
|
||||
}
|
||||
evtDescription := evt.ID.String()
|
||||
if evt.Type == event.EventRedaction {
|
||||
evtDescription += fmt.Sprintf(" of %s", evt.Redacts)
|
||||
}
|
||||
origEvtID := evt.ID
|
||||
if retryMeta := evt.Content.AsMessage().MessageSendRetry; retryMeta != nil {
|
||||
origEvtID = retryMeta.OriginalEventID
|
||||
}
|
||||
if err != nil {
|
||||
level := log.LevelError
|
||||
if part == "Ignoring" {
|
||||
level = log.LevelDebug
|
||||
}
|
||||
portal.log.Logfln(level, "%s %s %s from %s: %v", part, msgType, evtDescription, evt.Sender, err)
|
||||
reason, isCertain, _, sendNotice := errorToStatusReason(err)
|
||||
status := bridge.ReasonToCheckpointStatus(reason)
|
||||
portal.bridge.SendMessageCheckpoint(evt, bridge.MsgStepRemote, err, status, 0)
|
||||
if sendNotice {
|
||||
portal.sendErrorMessage(err.Error(), isCertain)
|
||||
}
|
||||
portal.sendStatusEvent(origEvtID, evt.ID, err)
|
||||
} else {
|
||||
portal.log.Debugfln("Handled Matrix %s %s", msgType, evtDescription)
|
||||
portal.sendDeliveryReceipt(evt.ID)
|
||||
portal.bridge.SendMessageSuccessCheckpoint(evt, bridge.MsgStepRemote, 0)
|
||||
portal.sendStatusEvent(origEvtID, evt.ID, nil)
|
||||
}
|
||||
}
|
||||
|
||||
func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event) {
|
||||
if err := portal.canBridgeFrom(sender, true); err != nil {
|
||||
go portal.sendMessageMetrics(evt, err, "Ignoring")
|
||||
go portal.sendMessageMetrics(evt, err, "Ignoring", nil)
|
||||
return
|
||||
} else if portal.Key.JID == types.StatusBroadcastJID && portal.bridge.Config.Bridge.DisableStatusBroadcastSend {
|
||||
go portal.sendMessageMetrics(evt, errBroadcastSendDisabled, "Ignoring")
|
||||
go portal.sendMessageMetrics(evt, errBroadcastSendDisabled, "Ignoring", nil)
|
||||
return
|
||||
}
|
||||
|
||||
messageAge := time.Now().Sub(time.UnixMilli(evt.Timestamp))
|
||||
ms := metricSender{portal: portal}
|
||||
|
||||
origEvtID := evt.ID
|
||||
var dbMsg *database.Message
|
||||
if retryMeta := evt.Content.AsMessage().MessageSendRetry; retryMeta != nil {
|
||||
origEvtID = retryMeta.OriginalEventID
|
||||
dbMsg = portal.bridge.DB.Message.GetByMXID(origEvtID)
|
||||
if dbMsg != nil && dbMsg.Sent {
|
||||
portal.log.Debugfln("Ignoring retry request %s (#%d) for %s/%s from %s as message was already sent", evt.ID, retryMeta.RetryCount, origEvtID, dbMsg.JID, evt.Sender)
|
||||
go portal.sendMessageMetrics(evt, nil, "")
|
||||
portal.log.Debugfln("Ignoring retry request %s (#%d, age: %s) for %s/%s from %s as message was already sent", evt.ID, retryMeta.RetryCount, messageAge, origEvtID, dbMsg.JID, evt.Sender)
|
||||
go ms.sendMessageMetrics(evt, nil, "", true)
|
||||
return
|
||||
} else if dbMsg != nil {
|
||||
portal.log.Debugfln("Got retry request %s (#%d) for %s/%s from %s", evt.ID, retryMeta.RetryCount, origEvtID, dbMsg.JID, evt.Sender)
|
||||
portal.log.Debugfln("Got retry request %s (#%d, age: %s) for %s/%s from %s", evt.ID, retryMeta.RetryCount, messageAge, origEvtID, dbMsg.JID, evt.Sender)
|
||||
} else {
|
||||
portal.log.Debugfln("Got retry request %s (#%d) for %s from %s (original message not known)", evt.ID, retryMeta.RetryCount, origEvtID, evt.Sender)
|
||||
portal.log.Debugfln("Got retry request %s (#%d, age: %s) for %s from %s (original message not known)", evt.ID, retryMeta.RetryCount, messageAge, origEvtID, evt.Sender)
|
||||
}
|
||||
} else {
|
||||
portal.log.Debugfln("Received message %s from %s", evt.ID, evt.Sender)
|
||||
portal.log.Debugfln("Received message %s from %s (age: %s)", evt.ID, evt.Sender, messageAge)
|
||||
}
|
||||
|
||||
if portal.bridge.Config.Bridge.MessageHandlingTimeout.ErrorAfter > 0 {
|
||||
remainingTime := portal.bridge.Config.Bridge.MessageHandlingTimeout.ErrorAfter - messageAge
|
||||
if remainingTime < 0 {
|
||||
go ms.sendMessageMetrics(evt, fmt.Errorf("%w (message is %s old)", errTimeoutBeforeHandling, messageAge), "Timeout handling", true)
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
time.Sleep(remainingTime)
|
||||
ms.sendMessageMetrics(evt, errMessageTakingLong, "Timeout handling", false)
|
||||
}()
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
if portal.bridge.Config.Bridge.MessageHandlingDeadline > 0 {
|
||||
if portal.bridge.Config.Bridge.MessageHandlingTimeout.Deadline > 0 {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, portal.bridge.Config.Bridge.MessageHandlingDeadline)
|
||||
ctx, cancel = context.WithTimeout(ctx, portal.bridge.Config.Bridge.MessageHandlingTimeout.Deadline)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
msg, sender, err := portal.convertMatrixMessage(ctx, sender, evt)
|
||||
if msg == nil {
|
||||
go portal.sendMessageMetrics(evt, err, "Error converting")
|
||||
go ms.sendMessageMetrics(evt, err, "Error converting", true)
|
||||
return
|
||||
}
|
||||
portal.MarkDisappearing(origEvtID, portal.ExpirationTime, true)
|
||||
|
@ -3329,7 +3184,7 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event) {
|
|||
}
|
||||
portal.log.Debugln("Sending event", evt.ID, "to WhatsApp", info.ID)
|
||||
ts, err := sender.Client.SendMessage(ctx, portal.Key.JID, info.ID, msg)
|
||||
go portal.sendMessageMetrics(evt, err, "Error sending")
|
||||
go ms.sendMessageMetrics(evt, err, "Error sending", true)
|
||||
if err == nil {
|
||||
dbMsg.MarkSent(ts)
|
||||
}
|
||||
|
@ -3337,12 +3192,12 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event) {
|
|||
|
||||
func (portal *Portal) HandleMatrixReaction(sender *User, evt *event.Event) {
|
||||
if err := portal.canBridgeFrom(sender, false); err != nil {
|
||||
go portal.sendMessageMetrics(evt, err, "Ignoring")
|
||||
go portal.sendMessageMetrics(evt, err, "Ignoring", nil)
|
||||
return
|
||||
} else if portal.Key.JID.Server == types.BroadcastServer {
|
||||
// TODO implement this, probably by only sending the reaction to the sender of the status message?
|
||||
// (whatsapp hasn't published the feature yet)
|
||||
go portal.sendMessageMetrics(evt, errBroadcastReactionNotSupported, "Ignoring")
|
||||
go portal.sendMessageMetrics(evt, errBroadcastReactionNotSupported, "Ignoring", nil)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -3359,7 +3214,7 @@ func (portal *Portal) HandleMatrixReaction(sender *User, evt *event.Event) {
|
|||
|
||||
portal.log.Debugfln("Received reaction event %s from %s", evt.ID, evt.Sender)
|
||||
err := portal.handleMatrixReaction(sender, evt)
|
||||
go portal.sendMessageMetrics(evt, err, "Error sending")
|
||||
go portal.sendMessageMetrics(evt, err, "Error sending", nil)
|
||||
}
|
||||
|
||||
func (portal *Portal) handleMatrixReaction(sender *User, evt *event.Event) error {
|
||||
|
@ -3433,7 +3288,7 @@ func (portal *Portal) upsertReaction(intent *appservice.IntentAPI, targetJID typ
|
|||
|
||||
func (portal *Portal) HandleMatrixRedaction(sender *User, evt *event.Event) {
|
||||
if err := portal.canBridgeFrom(sender, true); err != nil {
|
||||
go portal.sendMessageMetrics(evt, err, "Ignoring")
|
||||
go portal.sendMessageMetrics(evt, err, "Ignoring", nil)
|
||||
return
|
||||
}
|
||||
portal.log.Debugfln("Received redaction %s from %s", evt.ID, evt.Sender)
|
||||
|
@ -3446,28 +3301,28 @@ func (portal *Portal) HandleMatrixRedaction(sender *User, evt *event.Event) {
|
|||
|
||||
msg := portal.bridge.DB.Message.GetByMXID(evt.Redacts)
|
||||
if msg == nil {
|
||||
go portal.sendMessageMetrics(evt, errTargetNotFound, "Ignoring")
|
||||
go portal.sendMessageMetrics(evt, errTargetNotFound, "Ignoring", nil)
|
||||
} else if msg.IsFakeJID() {
|
||||
go portal.sendMessageMetrics(evt, errTargetIsFake, "Ignoring")
|
||||
go portal.sendMessageMetrics(evt, errTargetIsFake, "Ignoring", nil)
|
||||
} else if msg.Sender.User != sender.JID.User {
|
||||
go portal.sendMessageMetrics(evt, errTargetSentBySomeoneElse, "Ignoring")
|
||||
go portal.sendMessageMetrics(evt, errTargetSentBySomeoneElse, "Ignoring", nil)
|
||||
} else if portal.Key.JID == types.StatusBroadcastJID && portal.bridge.Config.Bridge.DisableStatusBroadcastSend {
|
||||
go portal.sendMessageMetrics(evt, errBroadcastSendDisabled, "Ignoring")
|
||||
go portal.sendMessageMetrics(evt, errBroadcastSendDisabled, "Ignoring", nil)
|
||||
return
|
||||
} else if msg.Type == database.MsgReaction {
|
||||
if reaction := portal.bridge.DB.Reaction.GetByMXID(evt.Redacts); reaction == nil {
|
||||
go portal.sendMessageMetrics(evt, errReactionDatabaseNotFound, "Ignoring")
|
||||
go portal.sendMessageMetrics(evt, errReactionDatabaseNotFound, "Ignoring", nil)
|
||||
} else if reactionTarget := reaction.GetTarget(); reactionTarget == nil {
|
||||
go portal.sendMessageMetrics(evt, errReactionTargetNotFound, "Ignoring")
|
||||
go portal.sendMessageMetrics(evt, errReactionTargetNotFound, "Ignoring", nil)
|
||||
} else {
|
||||
portal.log.Debugfln("Sending redaction reaction %s of %s/%s to WhatsApp", evt.ID, msg.MXID, msg.JID)
|
||||
_, err := portal.sendReactionToWhatsApp(sender, "", reactionTarget, "", evt.Timestamp)
|
||||
go portal.sendMessageMetrics(evt, err, "Error sending")
|
||||
go portal.sendMessageMetrics(evt, err, "Error sending", nil)
|
||||
}
|
||||
} else {
|
||||
portal.log.Debugfln("Sending redaction %s of %s/%s to WhatsApp", evt.ID, msg.MXID, msg.JID)
|
||||
_, err := sender.Client.RevokeMessage(portal.Key.JID, msg.JID)
|
||||
go portal.sendMessageMetrics(evt, err, "Error sending")
|
||||
go portal.sendMessageMetrics(evt, err, "Error sending", nil)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue