Compress portal event channels into one

Use a single channel of pointers to pre-allocate less memory per
portal.
This commit is contained in:
Toni Spets 2023-12-06 08:58:46 +02:00 committed by Toni Spets
parent 4e6e486e19
commit 2589049f14
2 changed files with 59 additions and 35 deletions

View file

@ -110,7 +110,13 @@ func (portal *Portal) MarkEncrypted() {
func (portal *Portal) ReceiveMatrixEvent(user bridge.User, evt *event.Event) { func (portal *Portal) ReceiveMatrixEvent(user bridge.User, evt *event.Event) {
if user.GetPermissionLevel() >= bridgeconfig.PermissionLevelUser || portal.HasRelaybot() { if user.GetPermissionLevel() >= bridgeconfig.PermissionLevelUser || portal.HasRelaybot() {
portal.matrixMessages <- PortalMatrixMessage{user: user.(*User), evt: evt, receivedAt: time.Now()} portal.events <- &PortalEvent{
MatrixMessage: &PortalMatrixMessage{
user: user.(*User),
evt: evt,
receivedAt: time.Now(),
},
}
} }
} }
@ -199,9 +205,7 @@ func (br *WABridge) newBlankPortal(key database.PortalKey) *Portal {
log: br.Log.Sub(fmt.Sprintf("Portal/%s", key)), log: br.Log.Sub(fmt.Sprintf("Portal/%s", key)),
zlog: br.ZLog.With().Str("portal_key", key.String()).Logger(), zlog: br.ZLog.With().Str("portal_key", key.String()).Logger(),
messages: make(chan PortalMessage, br.Config.Bridge.PortalMessageBuffer), events: make(chan *PortalEvent, br.Config.Bridge.PortalMessageBuffer),
matrixMessages: make(chan PortalMatrixMessage, br.Config.Bridge.PortalMessageBuffer),
mediaRetries: make(chan PortalMediaRetry, br.Config.Bridge.PortalMessageBuffer),
mediaErrorCache: make(map[types.MessageID]*FailedMediaMeta), mediaErrorCache: make(map[types.MessageID]*FailedMediaMeta),
} }
@ -232,6 +236,12 @@ type fakeMessage struct {
Important bool Important bool
} }
type PortalEvent struct {
Message *PortalMessage
MatrixMessage *PortalMatrixMessage
MediaRetry *PortalMediaRetry
}
type PortalMessage struct { type PortalMessage struct {
evt *events.Message evt *events.Message
undecryptable *events.UndecryptableMessage undecryptable *events.UndecryptableMessage
@ -279,9 +289,7 @@ type Portal struct {
currentlyTyping []id.UserID currentlyTyping []id.UserID
currentlyTypingLock sync.Mutex currentlyTypingLock sync.Mutex
messages chan PortalMessage events chan *PortalEvent
matrixMessages chan PortalMatrixMessage
mediaRetries chan PortalMediaRetry
mediaErrorCache map[types.MessageID]*FailedMediaMeta mediaErrorCache map[types.MessageID]*FailedMediaMeta
@ -337,7 +345,7 @@ var (
_ bridge.TypingPortal = (*Portal)(nil) _ bridge.TypingPortal = (*Portal)(nil)
) )
func (portal *Portal) handleWhatsAppMessageLoopItem(msg PortalMessage) { func (portal *Portal) handleWhatsAppMessageLoopItem(msg *PortalMessage) {
if len(portal.MXID) == 0 { if len(portal.MXID) == 0 {
if msg.fake == nil && msg.undecryptable == nil && (msg.evt == nil || !containsSupportedMessage(msg.evt.Message)) { if msg.fake == nil && msg.undecryptable == nil && (msg.evt == nil || !containsSupportedMessage(msg.evt.Message)) {
portal.log.Debugln("Not creating portal room for incoming message: message is not a chat message") portal.log.Debugln("Not creating portal room for incoming message: message is not a chat message")
@ -369,7 +377,7 @@ func (portal *Portal) handleWhatsAppMessageLoopItem(msg PortalMessage) {
} }
} }
func (portal *Portal) handleMatrixMessageLoopItem(msg PortalMatrixMessage) { func (portal *Portal) handleMatrixMessageLoopItem(msg *PortalMatrixMessage) {
portal.latestEventBackfillLock.Lock() portal.latestEventBackfillLock.Lock()
defer portal.latestEventBackfillLock.Unlock() defer portal.latestEventBackfillLock.Unlock()
evtTS := time.UnixMilli(msg.evt.Timestamp) evtTS := time.UnixMilli(msg.evt.Timestamp)
@ -483,12 +491,16 @@ func (portal *Portal) handleOneMessageLoopItem() {
} }
}() }()
select { select {
case msg := <-portal.messages: case msg := <-portal.events:
portal.handleWhatsAppMessageLoopItem(msg) if msg.Message != nil {
case msg := <-portal.matrixMessages: portal.handleWhatsAppMessageLoopItem(msg.Message)
portal.handleMatrixMessageLoopItem(msg) } else if msg.MatrixMessage != nil {
case retry := <-portal.mediaRetries: portal.handleMatrixMessageLoopItem(msg.MatrixMessage)
portal.handleMediaRetry(retry.evt, retry.source) } else if msg.MediaRetry != nil {
portal.handleMediaRetry(msg.MediaRetry.evt, msg.MediaRetry.source)
} else {
portal.log.Warn("Portal event loop returned an event without any data")
}
} }
} }

52
user.go
View file

@ -635,15 +635,17 @@ func (user *User) handleCallStart(sender types.JID, id, callType string, ts time
if callType != "" { if callType != "" {
text = fmt.Sprintf("Incoming %s call. Use the WhatsApp app to answer.", callType) text = fmt.Sprintf("Incoming %s call. Use the WhatsApp app to answer.", callType)
} }
portal.messages <- PortalMessage{ portal.events <- &PortalEvent{
fake: &fakeMessage{ Message: &PortalMessage{
Sender: sender, fake: &fakeMessage{
Text: text, Sender: sender,
ID: id, Text: text,
Time: ts, ID: id,
Important: true, Time: ts,
Important: true,
},
source: user,
}, },
source: user,
} }
} }
@ -865,11 +867,15 @@ func (user *User) HandleEvent(event interface{}) {
go user.handleChatPresence(v) go user.handleChatPresence(v)
case *events.Message: case *events.Message:
portal := user.GetPortalByMessageSource(v.Info.MessageSource) portal := user.GetPortalByMessageSource(v.Info.MessageSource)
portal.messages <- PortalMessage{evt: v, source: user} portal.events <- &PortalEvent{
Message: &PortalMessage{evt: v, source: user},
}
case *events.MediaRetry: case *events.MediaRetry:
user.phoneSeen(v.Timestamp) user.phoneSeen(v.Timestamp)
portal := user.GetPortalByJID(v.ChatID) portal := user.GetPortalByJID(v.ChatID)
portal.mediaRetries <- PortalMediaRetry{evt: v, source: user} portal.events <- &PortalEvent{
MediaRetry: &PortalMediaRetry{evt: v, source: user},
}
case *events.CallOffer: case *events.CallOffer:
user.handleCallStart(v.CallCreator, v.CallID, "", v.Timestamp) user.handleCallStart(v.CallCreator, v.CallID, "", v.Timestamp)
case *events.CallOfferNotice: case *events.CallOfferNotice:
@ -885,22 +891,26 @@ func (user *User) HandleEvent(event interface{}) {
if v.Implicit { if v.Implicit {
text = fmt.Sprintf("Your security code with %s (device #%d) changed.", puppet.Displayname, v.JID.Device) text = fmt.Sprintf("Your security code with %s (device #%d) changed.", puppet.Displayname, v.JID.Device)
} }
portal.messages <- PortalMessage{ portal.events <- &PortalEvent{
fake: &fakeMessage{ Message: &PortalMessage{
Sender: v.JID, fake: &fakeMessage{
Text: text, Sender: v.JID,
ID: strconv.FormatInt(v.Timestamp.Unix(), 10), Text: text,
Time: v.Timestamp, ID: strconv.FormatInt(v.Timestamp.Unix(), 10),
Important: false, Time: v.Timestamp,
Important: false,
},
source: user,
}, },
source: user,
} }
} }
case *events.CallTerminate, *events.CallRelayLatency, *events.CallAccept, *events.UnknownCallEvent: case *events.CallTerminate, *events.CallRelayLatency, *events.CallAccept, *events.UnknownCallEvent:
// ignore // ignore
case *events.UndecryptableMessage: case *events.UndecryptableMessage:
portal := user.GetPortalByMessageSource(v.Info.MessageSource) portal := user.GetPortalByMessageSource(v.Info.MessageSource)
portal.messages <- PortalMessage{undecryptable: v, source: user} portal.events <- &PortalEvent{
Message: &PortalMessage{undecryptable: v, source: user},
}
case *events.HistorySync: case *events.HistorySync:
if user.bridge.Config.Bridge.HistorySync.Backfill { if user.bridge.Config.Bridge.HistorySync.Backfill {
user.historySyncs <- v user.historySyncs <- v
@ -1235,7 +1245,9 @@ func (user *User) handleReceipt(receipt *events.Receipt) {
if portal == nil || len(portal.MXID) == 0 { if portal == nil || len(portal.MXID) == 0 {
return return
} }
portal.messages <- PortalMessage{receipt: receipt, source: user} portal.events <- &PortalEvent{
Message: &PortalMessage{receipt: receipt, source: user},
}
} }
func (user *User) makeReadMarkerContent(eventID id.EventID, doublePuppet bool) CustomReadMarkers { func (user *User) makeReadMarkerContent(eventID id.EventID, doublePuppet bool) CustomReadMarkers {