Handle history sync payloads with MSC2716 batch sending

This commit is contained in:
Tulir Asokan 2021-10-26 17:01:10 +03:00
parent b45e6b29f5
commit 0b4d0bbbe1
14 changed files with 600 additions and 249 deletions

View file

@ -400,7 +400,7 @@ func (handler *CommandHandler) CommandLogin(ce *CommandEvent) {
select {
case success := <-loginChan:
ce.Reply("Successfully logged in as +%s", success.ID.User)
ce.Reply("Successfully logged in as +%s (device #%d)", success.ID.User, success.ID.Device)
cancel()
case <-ctx.Done():
ce.Reply("Login timed out")
@ -434,6 +434,7 @@ func (user *User) loginQrChannel(ctx context.Context, ce *CommandEvent, qrChan <
select {
case <-time.After(qrEvt.Timeout):
if len(qrEvt.Codes) == 0 {
_, _ = bot.RedactEvent(ce.RoomID, qrEventID)
cancel()
return
}
@ -461,6 +462,7 @@ func (user *User) loginQrChannel(ctx context.Context, ce *CommandEvent, qrChan <
user.log.Errorln("Failed to send edited QR code to user:", err)
}
case <-ctx.Done():
_, _ = bot.RedactEvent(ce.RoomID, qrEventID)
return
}
}

View file

@ -58,16 +58,16 @@ type BridgeConfig struct {
DefaultBridgePresence bool `yaml:"default_bridge_presence"`
LoginSharedSecret string `yaml:"login_shared_secret"`
InviteOwnPuppetForBackfilling bool `yaml:"invite_own_puppet_for_backfilling"`
PrivateChatPortalMeta bool `yaml:"private_chat_portal_meta"`
BridgeNotices bool `yaml:"bridge_notices"`
ResendBridgeInfo bool `yaml:"resend_bridge_info"`
MuteBridging bool `yaml:"mute_bridging"`
ArchiveTag string `yaml:"archive_tag"`
PinnedTag string `yaml:"pinned_tag"`
TagOnlyOnCreate bool `yaml:"tag_only_on_create"`
MarkReadOnlyOnCreate bool `yaml:"mark_read_only_on_create"`
EnableStatusBroadcast bool `yaml:"enable_status_broadcast"`
DoublePuppetBackfill bool `yaml:"double_puppet_backfill"`
PrivateChatPortalMeta bool `yaml:"private_chat_portal_meta"`
BridgeNotices bool `yaml:"bridge_notices"`
ResendBridgeInfo bool `yaml:"resend_bridge_info"`
MuteBridging bool `yaml:"mute_bridging"`
ArchiveTag string `yaml:"archive_tag"`
PinnedTag string `yaml:"pinned_tag"`
TagOnlyOnCreate bool `yaml:"tag_only_on_create"`
MarkReadOnlyOnCreate bool `yaml:"mark_read_only_on_create"`
EnableStatusBroadcast bool `yaml:"enable_status_broadcast"`
WhatsappThumbnail bool `yaml:"whatsapp_thumbnail"`
@ -118,7 +118,7 @@ func (bc *BridgeConfig) setDefaults() {
bc.DefaultBridgeReceipts = true
bc.LoginSharedSecret = ""
bc.InviteOwnPuppetForBackfilling = true
bc.DoublePuppetBackfill = false
bc.PrivateChatPortalMeta = false
bc.BridgeNotices = true
bc.EnableStatusBroadcast = true

View file

@ -19,14 +19,19 @@ package database
import (
"database/sql"
_ "github.com/lib/pq"
"github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
log "maunium.net/go/maulogger/v2"
"go.mau.fi/whatsmeow/store/sqlstore"
"maunium.net/go/mautrix-whatsapp/database/upgrades"
)
func init() {
sqlstore.PostgresArrayWrapper = pq.Array
}
type Database struct {
*sql.DB
log log.Logger

View file

@ -22,6 +22,7 @@ import (
"time"
log "maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix/id"
"go.mau.fi/whatsmeow/types"
@ -62,20 +63,26 @@ func (mq *MessageQuery) GetByMXID(mxid id.EventID) *Message {
}
func (mq *MessageQuery) GetLastInChat(chat PortalKey) *Message {
return mq.GetLastInChatBefore(chat, time.Now().Unix()+60)
return mq.GetLastInChatBefore(chat, time.Now().Add(60 * time.Second))
}
func (mq *MessageQuery) GetLastInChatBefore(chat PortalKey, maxTimestamp int64) *Message {
func (mq *MessageQuery) GetLastInChatBefore(chat PortalKey, maxTimestamp time.Time) *Message {
msg := mq.get("SELECT chat_jid, chat_receiver, jid, mxid, sender, timestamp, sent "+
"FROM message WHERE chat_jid=$1 AND chat_receiver=$2 AND timestamp<=$3 AND sent=true ORDER BY timestamp DESC LIMIT 1",
chat.JID, chat.Receiver, maxTimestamp)
if msg == nil || msg.Timestamp == 0 {
chat.JID, chat.Receiver, maxTimestamp.Unix())
if msg == nil || msg.Timestamp.IsZero() {
// Old db, we don't know what the last message is.
return nil
}
return msg
}
func (mq *MessageQuery) GetFirstInChat(chat PortalKey) *Message {
return mq.get("SELECT chat_jid, chat_receiver, jid, mxid, sender, timestamp, sent "+
"FROM message WHERE chat_jid=$1 AND chat_receiver=$2 AND sent=true ORDER BY timestamp ASC LIMIT 1",
chat.JID, chat.Receiver)
}
func (mq *MessageQuery) get(query string, args ...interface{}) *Message {
row := mq.db.QueryRow(query, args...)
if row == nil {
@ -92,7 +99,7 @@ type Message struct {
JID types.MessageID
MXID id.EventID
Sender types.JID
Timestamp int64
Timestamp time.Time
Sent bool
}
@ -101,14 +108,17 @@ func (msg *Message) IsFakeMXID() bool {
}
func (msg *Message) Scan(row Scannable) *Message {
err := row.Scan(&msg.Chat.JID, &msg.Chat.Receiver, &msg.JID, &msg.MXID, &msg.Sender, &msg.Timestamp, &msg.Sent)
var ts int64
err := row.Scan(&msg.Chat.JID, &msg.Chat.Receiver, &msg.JID, &msg.MXID, &msg.Sender, &ts, &msg.Sent)
if err != nil {
if err != sql.ErrNoRows {
msg.log.Errorln("Database scan failed:", err)
}
return nil
}
if ts != 0 {
msg.Timestamp = time.Unix(ts, 0)
}
return msg
}
@ -116,7 +126,7 @@ func (msg *Message) Insert() {
_, err := msg.db.Exec(`INSERT INTO message
(chat_jid, chat_receiver, jid, mxid, sender, timestamp, sent)
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
msg.Chat.JID, msg.Chat.Receiver, msg.JID, msg.MXID, msg.Sender, msg.Timestamp, msg.Sent)
msg.Chat.JID, msg.Chat.Receiver, msg.JID, msg.MXID, msg.Sender, msg.Timestamp.Unix(), msg.Sent)
if err != nil {
msg.log.Warnfln("Failed to insert %s@%s: %v", msg.Chat, msg.JID, err)
}

View file

@ -20,6 +20,7 @@ import (
"database/sql"
log "maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix/id"
"go.mau.fi/whatsmeow/types"
@ -31,15 +32,14 @@ type PortalKey struct {
}
func GroupPortalKey(jid types.JID) PortalKey {
return PortalKey{
JID: jid.ToNonAD(),
Receiver: jid.ToNonAD(),
}
return NewPortalKey(jid, jid)
}
func NewPortalKey(jid, receiver types.JID) PortalKey {
if jid.Server == types.GroupServer {
receiver = jid
} else if jid.Server == types.LegacyUserServer {
jid.Server = types.DefaultUserServer
}
return PortalKey{
JID: jid.ToNonAD(),
@ -79,11 +79,11 @@ func (pq *PortalQuery) GetByMXID(mxid id.RoomID) *Portal {
}
func (pq *PortalQuery) GetAllByJID(jid types.JID) []*Portal {
return pq.getAll("SELECT * FROM portal WHERE jid=$1", jid)
return pq.getAll("SELECT * FROM portal WHERE jid=$1", jid.ToNonAD())
}
func (pq *PortalQuery) FindPrivateChats(receiver types.JID) []*Portal {
return pq.getAll("SELECT * FROM portal WHERE receiver='$1@s.whatsapp.net' AND jid LIKE '%@s.whatsapp.net'", receiver)
return pq.getAll("SELECT * FROM portal WHERE receiver=$1 AND jid LIKE '%@s.whatsapp.net'", receiver.ToNonAD())
}
func (pq *PortalQuery) getAll(query string, args ...interface{}) (portals []*Portal) {
@ -118,11 +118,14 @@ type Portal struct {
Avatar string
AvatarURL id.ContentURI
Encrypted bool
FirstEventID id.EventID
NextBatchID id.BatchID
}
func (portal *Portal) Scan(row Scannable) *Portal {
var mxid, avatarURL sql.NullString
err := row.Scan(&portal.Key.JID, &portal.Key.Receiver, &mxid, &portal.Name, &portal.Topic, &portal.Avatar, &avatarURL, &portal.Encrypted)
var mxid, avatarURL, firstEventID, nextBatchID sql.NullString
err := row.Scan(&portal.Key.JID, &portal.Key.Receiver, &mxid, &portal.Name, &portal.Topic, &portal.Avatar, &avatarURL, &portal.Encrypted, &firstEventID, &nextBatchID)
if err != nil {
if err != sql.ErrNoRows {
portal.log.Errorln("Database scan failed:", err)
@ -131,6 +134,8 @@ func (portal *Portal) Scan(row Scannable) *Portal {
}
portal.MXID = id.RoomID(mxid.String)
portal.AvatarURL, _ = id.ParseContentURI(avatarURL.String)
portal.FirstEventID = id.EventID(firstEventID.String)
portal.NextBatchID = id.BatchID(nextBatchID.String)
return portal
}
@ -142,20 +147,16 @@ func (portal *Portal) mxidPtr() *id.RoomID {
}
func (portal *Portal) Insert() {
_, err := portal.db.Exec("INSERT INTO portal (jid, receiver, mxid, name, topic, avatar, avatar_url, encrypted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
portal.Key.JID, portal.Key.Receiver, portal.mxidPtr(), portal.Name, portal.Topic, portal.Avatar, portal.AvatarURL.String(), portal.Encrypted)
_, err := portal.db.Exec("INSERT INTO portal (jid, receiver, mxid, name, topic, avatar, avatar_url, encrypted, first_event_id, next_batch_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
portal.Key.JID, portal.Key.Receiver, portal.mxidPtr(), portal.Name, portal.Topic, portal.Avatar, portal.AvatarURL.String(), portal.Encrypted, portal.FirstEventID.String(), portal.NextBatchID.String())
if err != nil {
portal.log.Warnfln("Failed to insert %s: %v", portal.Key, err)
}
}
func (portal *Portal) Update() {
var mxid *id.RoomID
if len(portal.MXID) > 0 {
mxid = &portal.MXID
}
_, err := portal.db.Exec("UPDATE portal SET mxid=$1, name=$2, topic=$3, avatar=$4, avatar_url=$5, encrypted=$6 WHERE jid=$7 AND receiver=$8",
mxid, portal.Name, portal.Topic, portal.Avatar, portal.AvatarURL.String(), portal.Encrypted, portal.Key.JID, portal.Key.Receiver)
_, err := portal.db.Exec("UPDATE portal SET mxid=$1, name=$2, topic=$3, avatar=$4, avatar_url=$5, encrypted=$6, first_event_id=$7, next_batch_id=$8 WHERE jid=$9 AND receiver=$10",
portal.mxidPtr(), portal.Name, portal.Topic, portal.Avatar, portal.AvatarURL.String(), portal.Encrypted, portal.FirstEventID.String(), portal.NextBatchID.String(), portal.Key.JID, portal.Key.Receiver)
if err != nil {
portal.log.Warnfln("Failed to update %s: %v", portal.Key, err)
}

View file

@ -0,0 +1,19 @@
package upgrades
import (
"database/sql"
)
func init() {
upgrades[26] = upgrade{"Add columns to store infinite backfill pointers for portals", func(tx *sql.Tx, ctx context) error {
_, err := tx.Exec(`ALTER TABLE portal ADD COLUMN first_event_id TEXT NOT NULL DEFAULT ''`)
if err != nil {
return err
}
_, err = tx.Exec(`ALTER TABLE portal ADD COLUMN next_batch_id TEXT NOT NULL DEFAULT ''`)
if err != nil {
return err
}
return nil
}}
}

View file

@ -39,7 +39,7 @@ type upgrade struct {
fn upgradeFunc
}
const NumberOfUpgrades = 26
const NumberOfUpgrades = 27
var upgrades [NumberOfUpgrades]upgrade

View file

@ -147,11 +147,9 @@ bridge:
# manually.
login_shared_secret: null
# Whether or not to invite own WhatsApp user's Matrix puppet into private
# chat portals when backfilling if needed.
# This always uses the default puppet instead of custom puppets due to
# rate limits and timestamp massaging.
invite_own_puppet_for_backfilling: true
# Whether to use custom puppet for backfilling.
# In order to use this, the custom puppets must be in the appservice's user ID namespace.
double_puppet_backfill: false
# Whether or not to explicitly set the avatar and room name for private
# chat portal rooms. This can be useful if the previous field works fine,
# but causes room avatar/name bugs.

4
go.mod
View file

@ -8,13 +8,13 @@ require (
github.com/mattn/go-sqlite3 v1.14.9
github.com/prometheus/client_golang v1.11.0
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
go.mau.fi/whatsmeow v0.0.0-20211024175202-609be38a9f28
go.mau.fi/whatsmeow v0.0.0-20211026140006-b484ee326162
golang.org/x/image v0.0.0-20210628002857-a66eb6448b8d
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v2 v2.4.0
maunium.net/go/mauflag v1.0.0
maunium.net/go/maulogger/v2 v2.3.0
maunium.net/go/mautrix v0.9.29
maunium.net/go/mautrix v0.9.30
)
require (

8
go.sum
View file

@ -141,8 +141,8 @@ github.com/tidwall/sjson v1.1.5 h1:wsUceI/XDyZk3J1FUvuuYlK62zJv2HO2Pzb8A5EWdUE=
github.com/tidwall/sjson v1.1.5/go.mod h1:VuJzsZnTowhSxWdOgsAnb886i4AjEyTkk7tNtsL7EYE=
go.mau.fi/libsignal v0.0.0-20211024113310-f9fc6a1855f2 h1:xpQTMgJGGaF+c8jV/LA/FVXAPJxZbSAGeflOc+Ly6uQ=
go.mau.fi/libsignal v0.0.0-20211024113310-f9fc6a1855f2/go.mod h1:3XlVlwOfp8f9Wri+C1D4ORqgUsN4ZvunJOoPjQMBhos=
go.mau.fi/whatsmeow v0.0.0-20211024175202-609be38a9f28 h1:BP4f/gLWTjefHYeTHcUybEJelp57rWkV27kpUC64GsY=
go.mau.fi/whatsmeow v0.0.0-20211024175202-609be38a9f28/go.mod h1:8FQjyDWAghfKYj9xTAxS23PQwlhjr2cgEGm9rfSA+cg=
go.mau.fi/whatsmeow v0.0.0-20211026140006-b484ee326162 h1:nwQ9gDQsvAmhW6B2a97RV0bkO9PEb7C7UZiMEYADRtw=
go.mau.fi/whatsmeow v0.0.0-20211026140006-b484ee326162/go.mod h1:ODEmmqeUn9eBDQHFc1S902YA3YFLtmaBujYRRFl53jI=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@ -224,5 +224,5 @@ maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfk
maunium.net/go/maulogger/v2 v2.2.4/go.mod h1:TYWy7wKwz/tIXTpsx8G3mZseIRiC5DoMxSZazOHy68A=
maunium.net/go/maulogger/v2 v2.3.0 h1:TMCcO65fLk6+pJXo7sl38tzjzW0KBFgc6JWJMBJp4GE=
maunium.net/go/maulogger/v2 v2.3.0/go.mod h1:TYWy7wKwz/tIXTpsx8G3mZseIRiC5DoMxSZazOHy68A=
maunium.net/go/mautrix v0.9.29 h1:qJyTSZQuogkkEFrJd+oZiTuE/6Cq7ca3wxiLYadYUoM=
maunium.net/go/mautrix v0.9.29/go.mod h1:7IzKfWvpQtN+W2Lzxc0rLvIxFM3ryKX6Ys3S/ZoWbg8=
maunium.net/go/mautrix v0.9.30 h1:iOJ9Cl576jxCL1x8J3bKQx29nc5hoewZlVyPmNWdzF8=
maunium.net/go/mautrix v0.9.30/go.mod h1:7IzKfWvpQtN+W2Lzxc0rLvIxFM3ryKX6Ys3S/ZoWbg8=

View file

@ -433,6 +433,7 @@ func (bridge *Bridge) Stop() {
}
bridge.Log.Debugln("Disconnecting", user.MXID)
user.Client.Disconnect()
close(user.historySyncs)
}
}

657
portal.go
View file

@ -173,6 +173,7 @@ type Portal struct {
roomCreateLock sync.Mutex
encryptLock sync.Mutex
backfillLock sync.Mutex
recentlyHandled [recentlyHandledLength]types.MessageID
recentlyHandledLock sync.Mutex
@ -211,7 +212,7 @@ func (portal *Portal) handleMessageLoop() {
portal.syncDoublePuppetDetailsAfterCreate(msg.source)
}
//portal.backfillLock.Lock()
portal.handleMessage(msg, false)
portal.handleMessage(msg.source, msg.evt)
//portal.backfillLock.Unlock()
}
}
@ -238,52 +239,107 @@ func (portal *Portal) shouldCreateRoom(msg PortalMessage) bool {
return false
}
func (portal *Portal) handleMessage(msg PortalMessage, isBackfill bool) {
func (portal *Portal) getMessageType(waMsg *waProto.Message) string {
switch {
case waMsg == nil:
return "ignore"
case waMsg.Conversation != nil || waMsg.ExtendedTextMessage != nil:
return "text"
case waMsg.ImageMessage != nil:
return fmt.Sprintf("image %s", waMsg.GetImageMessage().GetMimetype())
case waMsg.StickerMessage != nil:
return fmt.Sprintf("sticker %s", waMsg.GetStickerMessage().GetMimetype())
case waMsg.VideoMessage != nil:
return fmt.Sprintf("video %s", waMsg.GetVideoMessage().GetMimetype())
case waMsg.AudioMessage != nil:
return fmt.Sprintf("audio %s", waMsg.GetAudioMessage().GetMimetype())
case waMsg.DocumentMessage != nil:
return fmt.Sprintf("document %s", waMsg.GetDocumentMessage().GetMimetype())
case waMsg.ContactMessage != nil:
return "contact"
case waMsg.LocationMessage != nil:
return "location"
case waMsg.GetProtocolMessage() != nil:
switch waMsg.GetProtocolMessage().GetType() {
case waProto.ProtocolMessage_REVOKE:
return "revoke"
case waProto.ProtocolMessage_APP_STATE_SYNC_KEY_SHARE, waProto.ProtocolMessage_HISTORY_SYNC_NOTIFICATION:
return "ignore"
default:
return "unknown_protocol"
}
default:
return "unknown"
}
}
func (portal *Portal) convertMessage(intent *appservice.IntentAPI, source *User, info *types.MessageInfo, waMsg *waProto.Message) *ConvertedMessage {
switch {
case waMsg.Conversation != nil || waMsg.ExtendedTextMessage != nil:
return portal.convertTextMessage(intent, waMsg)
case waMsg.ImageMessage != nil:
return portal.convertMediaMessage(intent, source, info, waMsg.GetImageMessage())
case waMsg.StickerMessage != nil:
return portal.convertMediaMessage(intent, source, info, waMsg.GetStickerMessage())
case waMsg.VideoMessage != nil:
return portal.convertMediaMessage(intent, source, info, waMsg.GetVideoMessage())
case waMsg.AudioMessage != nil:
return portal.convertMediaMessage(intent, source, info, waMsg.GetAudioMessage())
case waMsg.DocumentMessage != nil:
return portal.convertMediaMessage(intent, source, info, waMsg.GetDocumentMessage())
case waMsg.ContactMessage != nil:
return portal.convertContactMessage(intent, waMsg.GetContactMessage())
case waMsg.LocationMessage != nil:
return portal.convertLocationMessage(intent, waMsg.GetLocationMessage())
default:
return nil
}
}
func (portal *Portal) handleMessage(source *User, evt *events.Message) {
if len(portal.MXID) == 0 {
portal.log.Warnln("handleMessage called even though portal.MXID is empty")
return
}
var triedToHandle bool
var trackMessageCallback func()
var typeName string
if !isBackfill {
trackMessageCallback = portal.bridge.Metrics.TrackWhatsAppMessage(msg.evt.Info.Timestamp, typeName)
msgID := evt.Info.ID
msgType := portal.getMessageType(evt.Message)
if msgType == "ignore" {
return
} else if portal.isRecentlyHandled(msgID) {
portal.log.Debugfln("Not handling %s (%s): message was recently handled", msgID, msgType)
return
} else if portal.isDuplicate(msgID) {
portal.log.Debugfln("Not handling %s (%s): message is duplicate", msgID, msgType)
return
}
waMsg := msg.evt.Message
switch {
case waMsg.Conversation != nil || waMsg.ExtendedTextMessage != nil:
typeName = "text"
triedToHandle = portal.HandleTextMessage(msg.source, msg.evt)
case waMsg.ImageMessage != nil:
typeName = "image"
triedToHandle = portal.HandleMediaMessage(msg.source, &msg.evt.Info, waMsg.GetImageMessage())
case waMsg.StickerMessage != nil:
typeName = "sticker"
triedToHandle = portal.HandleMediaMessage(msg.source, &msg.evt.Info, waMsg.GetStickerMessage())
case waMsg.VideoMessage != nil:
typeName = "video"
triedToHandle = portal.HandleMediaMessage(msg.source, &msg.evt.Info, waMsg.GetVideoMessage())
case waMsg.AudioMessage != nil:
typeName = "audio"
triedToHandle = portal.HandleMediaMessage(msg.source, &msg.evt.Info, waMsg.GetAudioMessage())
case waMsg.DocumentMessage != nil:
typeName = "document"
triedToHandle = portal.HandleMediaMessage(msg.source, &msg.evt.Info, waMsg.GetDocumentMessage())
case waMsg.ContactMessage != nil:
typeName = "contact"
triedToHandle = portal.HandleContactMessage(msg.source, &msg.evt.Info, waMsg.GetContactMessage())
case waMsg.LocationMessage != nil:
typeName = "location"
triedToHandle = portal.HandleLocationMessage(msg.source, &msg.evt.Info, waMsg.GetLocationMessage())
case waMsg.GetProtocolMessage() != nil && waMsg.GetProtocolMessage().GetType() == waProto.ProtocolMessage_REVOKE:
typeName = "revoke"
triedToHandle = portal.HandleMessageRevoke(msg.source, waMsg.GetProtocolMessage().GetKey())
default:
portal.log.Warnln("Unhandled message:", msg.evt.Info, msg.evt.Message)
}
if triedToHandle && trackMessageCallback != nil {
trackMessageCallback()
intent := portal.getMessageIntent(source, &evt.Info)
converted := portal.convertMessage(intent, source, &evt.Info, evt.Message)
if converted != nil {
var eventID id.EventID
resp, err := portal.sendMessage(converted.Intent, converted.Type, converted.Content, evt.Info.Timestamp.UnixMilli())
if err != nil {
portal.log.Errorln("Failed to send %s to Matrix: %v", msgID, err)
} else {
eventID = resp.EventID
}
if converted.Caption != nil {
resp, err = portal.sendMessage(converted.Intent, converted.Type, converted.Content, evt.Info.Timestamp.UnixMilli())
if err != nil {
portal.log.Errorln("Failed to send caption of %s to Matrix: %v", msgID, err)
} else {
eventID = resp.EventID
}
}
if len(eventID) != 0 {
portal.finishHandling(&evt.Info, resp.EventID)
}
} else if msgType == "revoke" {
portal.HandleMessageRevoke(source, evt.Message.GetProtocolMessage().GetKey())
} else {
portal.log.Warnln("Unhandled message:", evt.Info, evt.Message)
return
}
portal.bridge.Metrics.TrackWhatsAppMessage(evt.Info.Timestamp, strings.Split(msgType, " ")[0])
}
func (portal *Portal) isRecentlyHandled(id types.MessageID) bool {
@ -308,24 +364,38 @@ func init() {
gob.Register(&waProto.Message{})
}
func (portal *Portal) markHandled(source *User, info *types.MessageInfo, mxid id.EventID, isSent bool) *database.Message {
func (portal *Portal) markHandled(info *types.MessageInfo, mxid id.EventID, isSent, recent bool) *database.Message {
msg := portal.bridge.DB.Message.New()
msg.Chat = portal.Key
msg.JID = info.ID
msg.MXID = mxid
msg.Timestamp = info.Timestamp.Unix()
msg.Timestamp = info.Timestamp
msg.Sender = info.Sender
msg.Sent = isSent
msg.Insert()
portal.recentlyHandledLock.Lock()
index := portal.recentlyHandledIndex
portal.recentlyHandledIndex = (portal.recentlyHandledIndex + 1) % recentlyHandledLength
portal.recentlyHandledLock.Unlock()
portal.recentlyHandled[index] = msg.JID
if recent {
portal.recentlyHandledLock.Lock()
index := portal.recentlyHandledIndex
portal.recentlyHandledIndex = (portal.recentlyHandledIndex + 1) % recentlyHandledLength
portal.recentlyHandledLock.Unlock()
portal.recentlyHandled[index] = msg.JID
}
return msg
}
func (portal *Portal) getMessagePuppet(user *User, info *types.MessageInfo) *Puppet {
if info.IsFromMe {
return portal.bridge.GetPuppetByJID(user.JID)
} else if portal.IsPrivateChat() {
return portal.bridge.GetPuppetByJID(portal.Key.JID)
} else {
puppet := portal.bridge.GetPuppetByJID(info.Sender)
puppet.SyncContact(user, true)
return puppet
}
}
func (portal *Portal) getMessageIntent(user *User, info *types.MessageInfo) *appservice.IntentAPI {
if info.IsFromMe {
return portal.bridge.GetPuppetByJID(user.JID).IntentFor(portal)
@ -337,25 +407,8 @@ func (portal *Portal) getMessageIntent(user *User, info *types.MessageInfo) *app
return puppet.IntentFor(portal)
}
func (portal *Portal) startHandling(source *User, info *types.MessageInfo, msgType string) *appservice.IntentAPI {
if portal.isRecentlyHandled(info.ID) {
portal.log.Debugfln("Not handling %s (%s): message was recently handled", info.ID, msgType)
} else if portal.isDuplicate(info.ID) {
portal.log.Debugfln("Not handling %s (%s): message is duplicate", info.ID, msgType)
} else {
intent := portal.getMessageIntent(source, info)
if intent != nil {
portal.log.Debugfln("Starting handling of %s (%s, ts: %d)", info.ID, msgType, info.Timestamp)
} else {
portal.log.Debugfln("Not handling %s (%s): sender is not known", info.ID, msgType)
}
return intent
}
return nil
}
func (portal *Portal) finishHandling(source *User, message *types.MessageInfo, mxid id.EventID) {
portal.markHandled(source, message, mxid, true)
func (portal *Portal) finishHandling(message *types.MessageInfo, mxid id.EventID) {
portal.markHandled(message, mxid, true, true)
portal.sendDeliveryReceipt(mxid)
portal.log.Debugln("Handled message", message.ID, "->", mxid)
}
@ -364,19 +417,19 @@ func (portal *Portal) kickExtraUsers(participantMap map[types.JID]bool) {
members, err := portal.MainIntent().JoinedMembers(portal.MXID)
if err != nil {
portal.log.Warnln("Failed to get member list:", err)
} else {
for member := range members.Joined {
jid, ok := portal.bridge.ParsePuppetMXID(member)
if ok {
_, shouldBePresent := participantMap[jid]
if !shouldBePresent {
_, err = portal.MainIntent().KickUser(portal.MXID, &mautrix.ReqKickUser{
UserID: member,
Reason: "User had left this WhatsApp chat",
})
if err != nil {
portal.log.Warnfln("Failed to kick user %s who had left: %v", member, err)
}
return
}
for member := range members.Joined {
jid, ok := portal.bridge.ParsePuppetMXID(member)
if ok {
_, shouldBePresent := participantMap[jid]
if !shouldBePresent {
_, err = portal.MainIntent().KickUser(portal.MXID, &mautrix.ReqKickUser{
UserID: member,
Reason: "User had left this WhatsApp chat",
})
if err != nil {
portal.log.Warnfln("Failed to kick user %s who had left: %v", member, err)
}
}
}
@ -758,6 +811,237 @@ func (portal *Portal) RestrictMetadataChanges(restrict bool) id.EventID {
return ""
}
func (portal *Portal) parseWebMessageInfo(webMsg *waProto.WebMessageInfo) *types.MessageInfo {
info := types.MessageInfo{
MessageSource: types.MessageSource{
Chat: portal.Key.JID,
IsFromMe: webMsg.GetKey().GetFromMe(),
IsGroup: false,
},
ID: webMsg.GetKey().GetId(),
PushName: webMsg.GetPushName(),
Timestamp: time.Unix(int64(webMsg.GetMessageTimestamp()), 0),
}
var err error
if info.IsFromMe {
info.Sender = portal.Key.Receiver
} else if portal.IsPrivateChat() {
info.Sender = portal.Key.JID
} else if webMsg.GetParticipant() != "" {
info.Sender, err = types.ParseJID(webMsg.GetParticipant())
} else if webMsg.GetKey().GetParticipant() != "" {
info.Sender, err = types.ParseJID(webMsg.GetKey().GetParticipant())
}
if info.Sender.IsEmpty() {
portal.log.Warnfln("Failed to get sender of message %s (parse error: %v)", info.ID, err)
return nil
}
return &info
}
const backfillIDField = "net.maunium.whatsapp.id"
func (portal *Portal) wrapBatchEvent(info *types.MessageInfo, intent *appservice.IntentAPI, eventType event.Type, content *event.MessageEventContent) (*event.Event, error) {
wrappedContent := event.Content{
Parsed: content,
Raw: map[string]interface{}{
backfillIDField: info.ID,
},
}
if intent.IsCustomPuppet {
wrappedContent.Raw["net.maunium.whatsapp.puppet"] = intent.IsCustomPuppet
}
newEventType, err := portal.encrypt(&wrappedContent, eventType)
if err != nil {
return nil, err
}
return &event.Event{
Sender: intent.UserID,
Type: newEventType,
Timestamp: info.Timestamp.UnixMilli(),
Content: wrappedContent,
}, nil
}
func (portal *Portal) appendBatchEvents(converted *ConvertedMessage, info *types.MessageInfo, eventsArray *[]*event.Event, infoArray *[]*types.MessageInfo) error {
mainEvt, err := portal.wrapBatchEvent(info, converted.Intent, converted.Type, converted.Content)
if err != nil {
return err
}
if converted.Caption != nil {
captionEvt, err := portal.wrapBatchEvent(info, converted.Intent, converted.Type, converted.Caption)
if err != nil {
return err
}
*eventsArray = append(*eventsArray, mainEvt, captionEvt)
*infoArray = append(*infoArray, nil, info)
} else {
*eventsArray = append(*eventsArray, mainEvt)
*infoArray = append(*infoArray, info)
}
return nil
}
func (portal *Portal) finishBatch(eventIDs []id.EventID, infos []*types.MessageInfo) {
if len(eventIDs) != len(infos) {
portal.log.Errorfln("Length of event IDs (%d) and message infos (%d) doesn't match! Using slow path for mapping event IDs", len(eventIDs), len(infos))
infoMap := make(map[types.MessageID]*types.MessageInfo, len(infos))
for _, info := range infos {
infoMap[info.ID] = info
}
for _, eventID := range eventIDs {
if evt, err := portal.MainIntent().GetEvent(portal.MXID, eventID); err != nil {
portal.log.Warnfln("Failed to get event %s to register it in the database: %v", eventID, err)
} else if msgID, ok := evt.Content.Raw[backfillIDField].(string); !ok {
portal.log.Warnfln("Event %s doesn't include the WhatsApp message ID", eventID)
} else if info, ok := infoMap[types.MessageID(msgID)]; !ok {
portal.log.Warnfln("Didn't find info of message %s (event %s) to register it in the database", msgID, eventID)
} else {
portal.markHandled(info, eventID, true, false)
}
}
} else {
for i := 0; i < len(infos); i++ {
if infos[i] != nil {
portal.markHandled(infos[i], eventIDs[i], true, false)
}
}
portal.log.Infofln("Successfully sent %d events", len(eventIDs))
}
}
func (portal *Portal) backfill(source *User, messages []*waProto.HistorySyncMsg) {
portal.backfillLock.Lock()
defer portal.backfillLock.Unlock()
var historyBatch, newBatch mautrix.ReqBatchSend
var historyBatchInfos, newBatchInfos []*types.MessageInfo
historyBatch.StateEventsAtStart = make([]*event.Event, 0)
newBatch.StateEventsAtStart = make([]*event.Event, 0)
removeMembersAtEnd := make(map[id.UserID]*event.MemberEventContent)
firstMsgTimestamp := time.Unix(int64(messages[len(messages)-1].GetMessage().GetMessageTimestamp()), 0)
addMember := func(puppet *Puppet) {
if _, alreadyAdded := removeMembersAtEnd[puppet.MXID]; alreadyAdded {
return
}
mxid := puppet.MXID.String()
content := event.MemberEventContent{
Membership: event.MembershipJoin,
Displayname: puppet.Displayname,
AvatarURL: puppet.AvatarURL.CUString(),
}
inviteContent := content
inviteContent.Membership = event.MembershipInvite
historyBatch.StateEventsAtStart = append(historyBatch.StateEventsAtStart, &event.Event{
Type: event.StateMember,
Sender: portal.MainIntent().UserID,
StateKey: &mxid,
Timestamp: firstMsgTimestamp.UnixMilli(),
Content: event.Content{Parsed: &inviteContent},
}, &event.Event{
Type: event.StateMember,
Sender: puppet.MXID,
StateKey: &mxid,
Timestamp: firstMsgTimestamp.UnixMilli(),
Content: event.Content{Parsed: &content},
})
removeMembersAtEnd[puppet.MXID] = &content
}
firstMessage := portal.bridge.DB.Message.GetFirstInChat(portal.Key)
lastMessage := portal.bridge.DB.Message.GetLastInChat(portal.Key)
var historyMaxTs, newMinTs time.Time
if portal.FirstEventID != "" || portal.NextBatchID != "" {
historyBatch.PrevEventID = portal.FirstEventID
historyBatch.BatchID = portal.NextBatchID
if firstMessage == nil && lastMessage == nil {
historyMaxTs = time.Now()
} else {
historyMaxTs = firstMessage.Timestamp
}
}
if lastMessage != nil {
newBatch.PrevEventID = lastMessage.MXID
newMinTs = lastMessage.Timestamp
}
portal.log.Infofln("Processing history sync with %d messages", len(messages))
// The messages are ordered newest to oldest, so iterate them in reverse order.
for i := len(messages) - 1; i >= 0; i-- {
wrappedMsg := messages[i]
webMsg := wrappedMsg.GetMessage()
msgType := portal.getMessageType(webMsg.GetMessage())
if msgType == "unknown" || msgType == "ignore" {
if msgType == "unknown" {
portal.log.Debugfln("Skipping message %s with unknown type in backfill", webMsg.GetKey().GetId())
}
continue
}
info := portal.parseWebMessageInfo(webMsg)
if info == nil {
continue
}
var batch *mautrix.ReqBatchSend
var infos *[]*types.MessageInfo
var history bool
if !historyMaxTs.IsZero() && info.Timestamp.Before(historyMaxTs) {
batch, infos, history = &historyBatch, &historyBatchInfos, true
} else if !newMinTs.IsZero() && info.Timestamp.After(newMinTs) {
batch, infos = &newBatch, &newBatchInfos
} else {
continue
}
puppet := portal.getMessagePuppet(source, info)
var intent *appservice.IntentAPI
if portal.Key.JID == puppet.JID {
intent = puppet.DefaultIntent()
} else {
intent = puppet.IntentFor(portal)
if intent.IsCustomPuppet && !portal.bridge.Config.Bridge.DoublePuppetBackfill {
intent = puppet.DefaultIntent()
addMember(puppet)
}
}
converted := portal.convertMessage(intent, source, info, webMsg.GetMessage())
if converted == nil {
portal.log.Debugfln("Skipping unsupported message %s in backfill", info.ID)
continue
}
if history && !portal.IsPrivateChat() && !portal.bridge.StateStore.IsInRoom(portal.MXID, puppet.MXID) {
addMember(puppet)
}
err := portal.appendBatchEvents(converted, info, &batch.Events, infos)
if err != nil {
portal.log.Errorfln("Error handling message %s during backfill: %v", info.ID, err)
}
}
if len(historyBatch.Events) > 0 {
portal.log.Infofln("Sending %d historical messages...", len(historyBatch.Events))
historyResp, err := portal.MainIntent().BatchSend(portal.MXID, &historyBatch)
if err != nil {
portal.log.Errorln("Error sending batch of historical messages:", err)
} else {
portal.finishBatch(historyResp.EventIDs, historyBatchInfos)
portal.NextBatchID = historyResp.NextBatchID
portal.Update()
}
}
if len(newBatch.Events) > 0 {
portal.log.Infofln("Sending %d new messages...", len(newBatch.Events))
newResp, err := portal.MainIntent().BatchSend(portal.MXID, &newBatch)
if err != nil {
portal.log.Errorln("Error sending batch of new messages:", err)
} else {
portal.finishBatch(newResp.EventIDs, newBatchInfos)
}
}
}
//func (portal *Portal) BackfillHistory(user *User, lastMessageTime int64) error {
// if !portal.bridge.Config.Bridge.RecoverHistory {
// return nil
@ -1007,6 +1291,8 @@ func (portal *Portal) UpdateBridgeInfo() {
}
}
var PortalCreationDummyEvent = event.Type{Type: "fi.mau.portal_created", Class: event.MessageEventType}
func (portal *Portal) CreateMatrixRoom(user *User) error {
portal.roomCreateLock.Lock()
defer portal.roomCreateLock.Unlock()
@ -1162,6 +1448,14 @@ func (portal *Portal) CreateMatrixRoom(user *User) error {
user.UpdateDirectChats(map[id.UserID][]id.RoomID{puppet.MXID: {portal.MXID}})
}
firstEventResp, err := portal.MainIntent().SendMessageEvent(portal.MXID, PortalCreationDummyEvent, struct{}{})
if err != nil {
portal.log.Errorln("Failed to send dummy event to mark portal creation:", err)
} else {
portal.FirstEventID = firstEventResp.EventID
portal.Update()
}
//user.CreateUserPortal(database.PortalKeyWithMeta{PortalKey: portal.Key, InCommunity: inCommunity})
//err = portal.FillInitialHistory(user)
@ -1289,6 +1583,21 @@ func (portal *Portal) sendMainIntentMessage(content interface{}) (*mautrix.RespS
return portal.sendMessage(portal.MainIntent(), event.EventMessage, content, 0)
}
func (portal *Portal) encrypt(content *event.Content, eventType event.Type) (event.Type, error) {
if portal.Encrypted && portal.bridge.Crypto != nil {
// TODO maybe the locking should be inside mautrix-go?
portal.encryptLock.Lock()
encrypted, err := portal.bridge.Crypto.Encrypt(portal.MXID, eventType, *content)
portal.encryptLock.Unlock()
if err != nil {
return eventType, fmt.Errorf("failed to encrypt event: %w", err)
}
eventType = event.EventEncrypted
content.Parsed = encrypted
}
return eventType, nil
}
func (portal *Portal) sendMessage(intent *appservice.IntentAPI, eventType event.Type, content interface{}, timestamp int64) (*mautrix.RespSendEvent, error) {
wrappedContent := event.Content{Parsed: content}
if timestamp != 0 && intent.IsCustomPuppet {
@ -1296,16 +1605,10 @@ func (portal *Portal) sendMessage(intent *appservice.IntentAPI, eventType event.
"net.maunium.whatsapp.puppet": intent.IsCustomPuppet,
}
}
if portal.Encrypted && portal.bridge.Crypto != nil {
// TODO maybe the locking should be inside mautrix-go?
portal.encryptLock.Lock()
encrypted, err := portal.bridge.Crypto.Encrypt(portal.MXID, eventType, wrappedContent)
portal.encryptLock.Unlock()
if err != nil {
return nil, fmt.Errorf("failed to encrypt event: %w", err)
}
eventType = event.EventEncrypted
wrappedContent.Parsed = encrypted
var err error
eventType, err = portal.encrypt(&wrappedContent, eventType)
if err != nil {
return nil, err
}
_, _ = intent.UserTyping(portal.MXID, false, 0)
if timestamp == 0 {
@ -1315,33 +1618,29 @@ func (portal *Portal) sendMessage(intent *appservice.IntentAPI, eventType event.
}
}
func (portal *Portal) HandleTextMessage(source *User, msg *events.Message) bool {
intent := portal.startHandling(source, &msg.Info, "text")
if intent == nil {
return false
}
type ConvertedMessage struct {
Intent *appservice.IntentAPI
Type event.Type
Content *event.MessageEventContent
Caption *event.MessageEventContent
}
func (portal *Portal) convertTextMessage(intent *appservice.IntentAPI, msg *waProto.Message) *ConvertedMessage {
content := &event.MessageEventContent{
Body: msg.Message.GetConversation(),
Body: msg.GetConversation(),
MsgType: event.MsgText,
}
if msg.Message.GetExtendedTextMessage() != nil {
content.Body = msg.Message.GetExtendedTextMessage().GetText()
if msg.GetExtendedTextMessage() != nil {
content.Body = msg.GetExtendedTextMessage().GetText()
contextInfo := msg.Message.GetExtendedTextMessage().GetContextInfo()
contextInfo := msg.GetExtendedTextMessage().GetContextInfo()
if contextInfo != nil {
portal.bridge.Formatter.ParseWhatsApp(content, contextInfo.GetMentionedJid())
portal.SetReply(content, contextInfo.GetStanzaId())
}
}
resp, err := portal.sendMessage(intent, event.EventMessage, content, msg.Info.Timestamp.Unix()*1000)
if err != nil {
portal.log.Errorfln("Failed to handle message %s: %v", msg.Info.ID, err)
} else {
portal.finishHandling(source, &msg.Info, resp.EventID)
}
return true
return &ConvertedMessage{Intent: intent, Type: event.EventMessage, Content: content}
}
//func (portal *Portal) HandleStubMessage(source *User, message whatsapp.StubMessage, isBackfill bool) bool {
@ -1395,12 +1694,7 @@ func (portal *Portal) HandleTextMessage(source *User, msg *events.Message) bool
// return true
//}
func (portal *Portal) HandleLocationMessage(source *User, info *types.MessageInfo, msg *waProto.LocationMessage) bool {
intent := portal.startHandling(source, info, "location")
if intent == nil {
return false
}
func (portal *Portal) convertLocationMessage(intent *appservice.IntentAPI, msg *waProto.LocationMessage) *ConvertedMessage {
url := msg.GetUrl()
if len(url) == 0 {
url = fmt.Sprintf("https://maps.google.com/?q=%.5f,%.5f", msg.GetDegreesLatitude(), msg.GetDegreesLongitude())
@ -1445,21 +1739,10 @@ func (portal *Portal) HandleLocationMessage(source *User, info *types.MessageInf
portal.SetReply(content, msg.GetContextInfo().GetStanzaId())
resp, err := portal.sendMessage(intent, event.EventMessage, content, info.Timestamp.Unix()*1000)
if err != nil {
portal.log.Errorfln("Failed to handle message %s: %v", info.ID, err)
} else {
portal.finishHandling(source, info, resp.EventID)
}
return true
return &ConvertedMessage{Intent: intent, Type: event.EventMessage, Content: content}
}
func (portal *Portal) HandleContactMessage(source *User, info *types.MessageInfo, msg *waProto.ContactMessage) bool {
intent := portal.startHandling(source, info, "contact")
if intent == nil {
return false
}
func (portal *Portal) convertContactMessage(intent *appservice.IntentAPI, msg *waProto.ContactMessage) *ConvertedMessage {
fileName := fmt.Sprintf("%s.vcf", msg.GetDisplayName())
data := []byte(msg.GetVcard())
mimeType := "text/vcard"
@ -1468,7 +1751,7 @@ func (portal *Portal) HandleContactMessage(source *User, info *types.MessageInfo
uploadResp, err := intent.UploadBytesWithName(data, uploadMimeType, fileName)
if err != nil {
portal.log.Errorfln("Failed to upload vcard of %s: %v", msg.GetDisplayName(), err)
return true
return nil
}
content := &event.MessageEventContent{
@ -1488,38 +1771,7 @@ func (portal *Portal) HandleContactMessage(source *User, info *types.MessageInfo
portal.SetReply(content, msg.GetContextInfo().GetStanzaId())
resp, err := portal.sendMessage(intent, event.EventMessage, content, info.Timestamp.Unix()*1000)
if err != nil {
portal.log.Errorfln("Failed to handle message %s: %v", info.ID, err)
} else {
portal.finishHandling(source, info, resp.EventID)
}
return true
}
func (portal *Portal) sendMediaBridgeFailure(source *User, intent *appservice.IntentAPI, info *types.MessageInfo, bridgeErr error) {
portal.log.Errorfln("Failed to bridge media for %s: %v", info.ID, bridgeErr)
resp, err := portal.sendMessage(intent, event.EventMessage, &event.MessageEventContent{
MsgType: event.MsgNotice,
Body: "Failed to bridge media",
}, info.Timestamp.Unix()*1000)
if err != nil {
portal.log.Errorfln("Failed to send media download error message for %s: %v", info.ID, err)
} else {
portal.finishHandling(source, info, resp.EventID)
}
}
func (portal *Portal) encryptFile(data []byte, mimeType string) ([]byte, string, *event.EncryptedFileInfo) {
if !portal.Encrypted {
return data, mimeType, nil
}
file := &event.EncryptedFileInfo{
EncryptedFile: *attachment.NewEncryptedFile(),
URL: "",
}
return file.Encrypt(data), "application/octet-stream", file
return &ConvertedMessage{Intent: intent, Type: event.EventMessage, Content: content}
}
// FIXME
@ -1612,18 +1864,43 @@ func (portal *Portal) encryptFile(data []byte, mimeType string) ([]byte, string,
// return
//}
func (portal *Portal) makeMediaBridgeFailureMessage(intent *appservice.IntentAPI, info *types.MessageInfo, bridgeErr error, captionContent *event.MessageEventContent) *ConvertedMessage {
portal.log.Errorfln("Failed to bridge media for %s: %v", info.ID, bridgeErr)
return &ConvertedMessage{Intent: intent, Type: event.EventMessage, Content: &event.MessageEventContent{
MsgType: event.MsgNotice,
Body: "Failed to bridge media",
}, Caption: captionContent}
}
func (portal *Portal) encryptFile(data []byte, mimeType string) ([]byte, string, *event.EncryptedFileInfo) {
if !portal.Encrypted {
return data, mimeType, nil
}
file := &event.EncryptedFileInfo{
EncryptedFile: *attachment.NewEncryptedFile(),
URL: "",
}
return file.Encrypt(data), "application/octet-stream", file
}
type MediaMessage interface {
whatsmeow.DownloadableMessage
GetContextInfo() *waProto.ContextInfo
GetMimetype() string
}
type MediaMessageWithThumbnailAndCaption interface {
type MediaMessageWithThumbnail interface {
MediaMessage
GetJpegThumbnail() []byte
GetCaption() string
}
type MediaMessageWithCaption interface {
MediaMessage
GetCaption() string
}
type MediaMessageWithFileName interface {
MediaMessage
GetFileName() string
@ -1634,10 +1911,16 @@ type MediaMessageWithDuration interface {
GetSeconds() uint32
}
func (portal *Portal) HandleMediaMessage(source *User, info *types.MessageInfo, msg MediaMessage) bool {
intent := portal.startHandling(source, info, fmt.Sprintf("media %s", msg.GetMimetype()))
if intent == nil {
return false
func (portal *Portal) convertMediaMessage(intent *appservice.IntentAPI, source *User, info *types.MessageInfo, msg MediaMessage) *ConvertedMessage {
messageWithCaption, ok := msg.(MediaMessageWithCaption)
var captionContent *event.MessageEventContent
if ok && len(messageWithCaption.GetCaption()) > 0 {
captionContent = &event.MessageEventContent{
Body: messageWithCaption.GetCaption(),
MsgType: event.MsgNotice,
}
portal.bridge.Formatter.ParseWhatsApp(captionContent, msg.GetContextInfo().GetMentionedJid())
}
data, err := source.Client.Download(msg)
@ -1653,10 +1936,9 @@ func (portal *Portal) HandleMediaMessage(source *User, info *types.MessageInfo,
//}
if errors.Is(err, whatsmeow.ErrNoURLPresent) {
portal.log.Debugfln("No URL present error for media message %s, ignoring...", info.ID)
return true
return nil
} else if err != nil {
portal.sendMediaBridgeFailure(source, intent, info, err)
return true
return portal.makeMediaBridgeFailureMessage(intent, info, err, captionContent)
}
var width, height int
@ -1670,13 +1952,12 @@ func (portal *Portal) HandleMediaMessage(source *User, info *types.MessageInfo,
uploaded, err := intent.UploadBytes(data, uploadMimeType)
if err != nil {
if errors.Is(err, mautrix.MTooLarge) {
portal.sendMediaBridgeFailure(source, intent, info, errors.New("homeserver rejected too large file"))
return portal.makeMediaBridgeFailureMessage(intent, info, errors.New("homeserver rejected too large file"), captionContent)
} else if httpErr, ok := err.(mautrix.HTTPError); ok && httpErr.IsStatus(413) {
portal.sendMediaBridgeFailure(source, intent, info, errors.New("proxy rejected too large file"))
return portal.makeMediaBridgeFailureMessage(intent, info, errors.New("proxy rejected too large file"), captionContent)
} else {
portal.sendMediaBridgeFailure(source, intent, info, fmt.Errorf("failed to upload media: %w", err))
return portal.makeMediaBridgeFailureMessage(intent, info, fmt.Errorf("failed to upload media: %w", err), captionContent)
}
return true
}
content := &event.MessageEventContent{
@ -1719,9 +2000,9 @@ func (portal *Portal) HandleMediaMessage(source *User, info *types.MessageInfo,
}
portal.SetReply(content, msg.GetContextInfo().GetStanzaId())
msgWithThumbnail, ok := msg.(MediaMessageWithThumbnailAndCaption)
if ok && msgWithThumbnail.GetJpegThumbnail() != nil && portal.bridge.Config.Bridge.WhatsappThumbnail {
thumbnailData := msgWithThumbnail.GetJpegThumbnail()
messageWithThumbnail, ok := msg.(MediaMessageWithThumbnail)
if ok && messageWithThumbnail.GetJpegThumbnail() != nil && portal.bridge.Config.Bridge.WhatsappThumbnail {
thumbnailData := messageWithThumbnail.GetJpegThumbnail()
thumbnailMime := http.DetectContentType(thumbnailData)
thumbnailCfg, _, _ := image.DecodeConfig(bytes.NewReader(thumbnailData))
thumbnailSize := len(thumbnailData)
@ -1759,33 +2040,17 @@ func (portal *Portal) HandleMediaMessage(source *User, info *types.MessageInfo,
content.MsgType = event.MsgFile
}
ts := info.Timestamp.Unix() * 1000
eventType := event.EventMessage
if isSticker {
eventType = event.EventSticker
}
resp, err := portal.sendMessage(intent, eventType, content, ts)
if err != nil {
portal.log.Errorfln("Failed to handle message %s: %v", info.ID, err)
return true
return &ConvertedMessage{
Intent: intent,
Type: eventType,
Content: content,
Caption: captionContent,
}
if msgWithThumbnail != nil && len(msgWithThumbnail.GetCaption()) > 0 {
captionContent := &event.MessageEventContent{
Body: msgWithThumbnail.GetCaption(),
MsgType: event.MsgNotice,
}
portal.bridge.Formatter.ParseWhatsApp(captionContent, msg.GetContextInfo().GetMentionedJid())
resp, err = portal.sendMessage(intent, event.EventMessage, captionContent, ts)
if err != nil {
portal.log.Warnfln("Failed to handle caption of message %s: %v", info.ID, err)
}
}
portal.finishHandling(source, info, resp.EventID)
return true
}
func (portal *Portal) downloadThumbnail(content *event.MessageEventContent, id id.EventID) []byte {
@ -2247,7 +2512,7 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *event.Event) {
return
}
info := portal.generateMessageInfo(sender)
dbMsg := portal.markHandled(sender, info, evt.ID, false)
dbMsg := portal.markHandled(info, evt.ID, false, true)
portal.log.Debugln("Sending event", evt.ID, "to WhatsApp", info.ID)
err := sender.Client.SendMessage(portal.Key.JID, info.ID, msg)
if err != nil {

View file

@ -28,6 +28,7 @@ import (
"go.mau.fi/whatsmeow/types"
log "maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix/appservice"
"maunium.net/go/mautrix/id"
@ -165,10 +166,7 @@ type Puppet struct {
}
func (puppet *Puppet) IntentFor(portal *Portal) *appservice.IntentAPI {
if (!portal.IsPrivateChat() && puppet.customIntent == nil) ||
// FIXME
//(portal.backfilling && portal.bridge.Config.Bridge.InviteOwnPuppetForBackfilling) ||
portal.Key.JID == puppet.JID {
if (!portal.IsPrivateChat() && puppet.customIntent == nil) || portal.Key.JID == puppet.JID {
return puppet.DefaultIntent()
}
return puppet.customIntent

52
user.go
View file

@ -26,6 +26,8 @@ import (
log "maunium.net/go/maulogger/v2"
"go.mau.fi/whatsmeow/appstate"
waProto "go.mau.fi/whatsmeow/binary/proto"
"maunium.net/go/mautrix/appservice"
"maunium.net/go/mautrix/pushrules"
@ -63,6 +65,8 @@ type User struct {
qrListener chan<- *events.QR
loginListener chan<- *events.PairSuccess
historySyncs chan *events.HistorySync
prevBridgeStatus *BridgeState
}
@ -176,11 +180,21 @@ func (bridge *Bridge) NewUser(dbUser *database.User) *User {
bridge: bridge,
log: bridge.Log.Sub("User").Sub(string(dbUser.MXID)),
historySyncs: make(chan *events.HistorySync, 32),
IsRelaybot: false,
}
user.RelaybotWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelaybotWhitelisted(user.MXID)
user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID)
user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
go func() {
for evt := range user.historySyncs {
if evt == nil {
return
}
user.handleHistorySync(evt.Data)
}
}()
return user
}
@ -328,6 +342,28 @@ func (user *User) sendMarkdownBridgeAlert(formatString string, args ...interface
}
}
func (user *User) handleHistorySync(evt *waProto.HistorySync) {
if evt.GetSyncType() != waProto.HistorySync_RECENT && evt.GetSyncType() != waProto.HistorySync_FULL {
return
}
user.log.Infofln("Handling history sync with type %s, chunk order %d, progress %d%%", evt.GetSyncType(), evt.GetChunkOrder(), evt.GetProgress())
for _, conv := range evt.GetConversations() {
jid, err := types.ParseJID(conv.GetId())
if err != nil {
user.log.Warnfln("Failed to parse chat JID '%s' in history sync: %v", conv.GetId(), err)
continue
}
portal := user.GetPortalByJID(jid)
err = portal.CreateMatrixRoom(user)
if err != nil {
user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err)
continue
}
portal.backfill(user, conv.GetMessages())
}
}
func (user *User) HandleEvent(event interface{}) {
switch v := event.(type) {
case *events.LoggedOut:
@ -345,6 +381,20 @@ func (user *User) HandleEvent(event interface{}) {
}
}()
go user.tryAutomaticDoublePuppeting()
case *events.AppStateSyncComplete:
if len(user.Client.Store.PushName) > 0 && v.Name == appstate.WAPatchCriticalBlock {
err := user.Client.SendPresence(types.PresenceUnavailable)
if err != nil {
user.log.Warnln("Failed to send presence after app state sync:", err)
}
}
case *events.PushNameSetting:
// Send presence available when connecting and when the pushname is changed.
// This makes sure that outgoing messages always have the right pushname.
err := user.Client.SendPresence(types.PresenceUnavailable)
if err != nil {
user.log.Warnln("Failed to send presence after push name update:", err)
}
case *events.PairSuccess:
user.JID = v.ID
user.addToJIDMap()
@ -382,6 +432,8 @@ func (user *User) HandleEvent(event interface{}) {
case *events.Message:
portal := user.GetPortalByJID(v.Info.Chat)
portal.messages <- PortalMessage{v, user}
case *events.HistorySync:
user.historySyncs <- v
case *events.Mute:
portal := user.bridge.GetPortalByJID(user.PortalKey(v.JID))
if portal != nil {