Remove message locking as message handling is synchronous per-portal

This commit is contained in:
Tulir Asokan 2019-05-28 14:12:35 +03:00
parent 7f0c67168c
commit 1d43aeca2f

View file

@ -108,7 +108,6 @@ func (bridge *Bridge) NewPortal(dbPortal *database.Portal) *Portal {
bridge: bridge, bridge: bridge,
log: bridge.Log.Sub(fmt.Sprintf("Portal/%s", dbPortal.Key)), log: bridge.Log.Sub(fmt.Sprintf("Portal/%s", dbPortal.Key)),
messageLocks: make(map[types.WhatsAppMessageID]sync.Mutex),
recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{}, recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{},
messages: make(chan PortalMessage, 128), messages: make(chan PortalMessage, 128),
@ -134,9 +133,7 @@ type Portal struct {
avatarURL string avatarURL string
roomCreateLock sync.Mutex roomCreateLock sync.Mutex
messageLocksLock sync.Mutex
messageLocks map[types.WhatsAppMessageID]sync.Mutex
recentlyHandled [recentlyHandledLength]types.WhatsAppMessageID recentlyHandled [recentlyHandledLength]types.WhatsAppMessageID
recentlyHandledLock sync.Mutex recentlyHandledLock sync.Mutex
@ -188,22 +185,6 @@ func (portal *Portal) handleMessage(msg PortalMessage) {
} }
} }
func (portal *Portal) getMessageLock(messageID types.WhatsAppMessageID) sync.Mutex {
portal.messageLocksLock.Lock()
defer portal.messageLocksLock.Unlock()
lock, ok := portal.messageLocks[messageID]
if !ok {
portal.messageLocks[messageID] = lock
}
return lock
}
func (portal *Portal) deleteMessageLock(messageID types.WhatsAppMessageID) {
portal.messageLocksLock.Lock()
delete(portal.messageLocks, messageID)
portal.messageLocksLock.Unlock()
}
func (portal *Portal) isRecentlyHandled(id types.WhatsAppMessageID) bool { func (portal *Portal) isRecentlyHandled(id types.WhatsAppMessageID) bool {
start := portal.recentlyHandledIndex start := portal.recentlyHandledIndex
for i := start; i != start; i = (i - 1) % recentlyHandledLength { for i := start; i != start; i = (i - 1) % recentlyHandledLength {
@ -252,25 +233,19 @@ func (portal *Portal) markHandled(source *User, message *waProto.WebMessageInfo,
portal.recentlyHandled[index] = msg.JID portal.recentlyHandled[index] = msg.JID
} }
func (portal *Portal) startHandling(info whatsapp.MessageInfo) (*sync.Mutex, bool) { func (portal *Portal) startHandling(info whatsapp.MessageInfo) bool {
if portal.lastMessageTs > info.Timestamp+1 || portal.isRecentlyHandled(info.Id) { if portal.lastMessageTs > info.Timestamp+1 ||
return nil, false portal.isRecentlyHandled(info.Id) ||
} portal.isDuplicate(info.Id) {
lock := portal.getMessageLock(info.Id) return false
lock.Lock()
if portal.isDuplicate(info.Id) {
lock.Unlock()
return nil, false
} }
portal.lastMessageTs = info.Timestamp portal.lastMessageTs = info.Timestamp
return &lock, true return true
} }
func (portal *Portal) finishHandling(source *User, message *waProto.WebMessageInfo, mxid types.MatrixEventID) { func (portal *Portal) finishHandling(source *User, message *waProto.WebMessageInfo, mxid types.MatrixEventID) {
portal.markHandled(source, message, mxid) portal.markHandled(source, message, mxid)
id := message.GetKey().GetId() portal.log.Debugln("Handled message", message.GetKey().GetId(), "->", mxid)
portal.deleteMessageLock(id)
portal.log.Debugln("Handled message", id, "->", mxid)
} }
func (portal *Portal) SyncParticipants(metadata *whatsappExt.GroupInfo) { func (portal *Portal) SyncParticipants(metadata *whatsappExt.GroupInfo) {
@ -582,6 +557,8 @@ func (portal *Portal) FillInitialHistory(user *User) error {
if portal.bridge.Config.Bridge.InitialHistoryFill == 0 { if portal.bridge.Config.Bridge.InitialHistoryFill == 0 {
return nil return nil
} }
portal.backfillLock.Lock()
defer portal.backfillLock.Unlock()
n := portal.bridge.Config.Bridge.InitialHistoryFill n := portal.bridge.Config.Bridge.InitialHistoryFill
portal.log.Infoln("Filling initial history, maximum", n, "messages") portal.log.Infoln("Filling initial history, maximum", n, "messages")
var messages []interface{} var messages []interface{}
@ -795,11 +772,9 @@ func (portal *Portal) HandleTextMessage(source *User, message whatsapp.TextMessa
return return
} }
lock, ok := portal.startHandling(message.Info) if !portal.startHandling(message.Info) {
if !ok {
return return
} }
defer lock.Unlock()
intent := portal.GetMessageIntent(source, message.Info) intent := portal.GetMessageIntent(source, message.Info)
if intent == nil { if intent == nil {
@ -828,11 +803,9 @@ func (portal *Portal) HandleMediaMessage(source *User, download func() ([]byte,
return return
} }
lock, ok := portal.startHandling(info) if !portal.startHandling(info) {
if !ok {
return return
} }
defer lock.Unlock()
intent := portal.GetMessageIntent(source, info) intent := portal.GetMessageIntent(source, info)
if intent == nil { if intent == nil {