Use channel for Matrix events to prevent concurrent sending to WhatsApp

This commit is contained in:
Tulir Asokan 2021-12-14 17:47:30 +02:00
parent be05d7d4a9
commit e38b0bf545
2 changed files with 55 additions and 27 deletions

View file

@ -445,7 +445,7 @@ func (mx *MatrixHandler) HandleMessage(evt *event.Event) {
portal := mx.bridge.GetPortalByMXID(evt.RoomID)
if portal != nil && (user.Whitelisted || portal.HasRelaybot()) {
portal.HandleMatrixMessage(user, evt)
portal.matrixMessages <- PortalMatrixMessage{user: user, evt: evt}
}
}
@ -479,7 +479,7 @@ func (mx *MatrixHandler) HandleRedaction(evt *event.Event) {
portal := mx.bridge.GetPortalByMXID(evt.RoomID)
if portal != nil && (user.Whitelisted || portal.HasRelaybot()) {
portal.HandleMatrixRedaction(user, evt)
portal.matrixMessages <- PortalMatrixMessage{user: user, evt: evt}
}
}

View file

@ -141,7 +141,8 @@ func (bridge *Bridge) NewManualPortal(key database.PortalKey) *Portal {
bridge: bridge,
log: bridge.Log.Sub(fmt.Sprintf("Portal/%s", key)),
messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
matrixMessages: make(chan PortalMatrixMessage, bridge.Config.Bridge.PortalMessageBuffer),
}
portal.Key = key
go portal.handleMessageLoop()
@ -154,7 +155,8 @@ func (bridge *Bridge) NewPortal(dbPortal *database.Portal) *Portal {
bridge: bridge,
log: bridge.Log.Sub(fmt.Sprintf("Portal/%s", dbPortal.Key)),
messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
matrixMessages: make(chan PortalMatrixMessage, bridge.Config.Bridge.PortalMessageBuffer),
}
go portal.handleMessageLoop()
return portal
@ -177,6 +179,11 @@ type PortalMessage struct {
source *User
}
type PortalMatrixMessage struct {
evt *event.Event
user *User
}
type recentlyHandledWrapper struct {
id types.MessageID
err bool
@ -201,34 +208,55 @@ type Portal struct {
currentlyTyping []id.UserID
currentlyTypingLock sync.Mutex
messages chan PortalMessage
messages chan PortalMessage
matrixMessages chan PortalMatrixMessage
relayUser *User
}
func (portal *Portal) handleMessageLoop() {
for msg := range portal.messages {
if len(portal.MXID) == 0 {
if msg.fake == nil && (msg.evt == nil || !containsSupportedMessage(msg.evt.Message)) {
portal.log.Debugln("Not creating portal room for incoming message: message is not a chat message")
continue
}
portal.log.Debugln("Creating Matrix room from incoming message")
err := portal.CreateMatrixRoom(msg.source, nil, false)
if err != nil {
portal.log.Errorln("Failed to create portal room:", err)
continue
}
func (portal *Portal) handleMessageLoopItem(msg PortalMessage) {
if len(portal.MXID) == 0 {
if msg.fake == nil && (msg.evt == nil || !containsSupportedMessage(msg.evt.Message)) {
portal.log.Debugln("Not creating portal room for incoming message: message is not a chat message")
return
}
if msg.evt != nil {
portal.handleMessage(msg.source, msg.evt)
} else if msg.undecryptable != nil {
portal.handleUndecryptableMessage(msg.source, msg.undecryptable)
} else if msg.fake != nil {
msg.fake.ID = "FAKE::" + msg.fake.ID
portal.handleFakeMessage(*msg.fake)
} else {
portal.log.Warnln("Unexpected PortalMessage with no message: %+v", msg)
portal.log.Debugln("Creating Matrix room from incoming message")
err := portal.CreateMatrixRoom(msg.source, nil, false)
if err != nil {
portal.log.Errorln("Failed to create portal room:", err)
return
}
}
if msg.evt != nil {
portal.handleMessage(msg.source, msg.evt)
} else if msg.undecryptable != nil {
portal.handleUndecryptableMessage(msg.source, msg.undecryptable)
} else if msg.fake != nil {
msg.fake.ID = "FAKE::" + msg.fake.ID
portal.handleFakeMessage(*msg.fake)
} else {
portal.log.Warnln("Unexpected PortalMessage with no message: %+v", msg)
}
}
func (portal *Portal) handleMatrixMessageLoopItem(msg PortalMatrixMessage) {
switch msg.evt.Type {
case event.EventMessage, event.EventSticker:
portal.HandleMatrixMessage(msg.user, msg.evt)
case event.EventRedaction:
portal.HandleMatrixRedaction(msg.user, msg.evt)
default:
portal.log.Warnln("Unsupported event type %+v in portal message channel", msg.evt.Type)
}
}
func (portal *Portal) handleMessageLoop() {
for {
select {
case msg := <-portal.messages:
portal.handleMessageLoopItem(msg)
case msg := <-portal.matrixMessages:
portal.handleMatrixMessageLoopItem(msg)
}
}
}