From 1d43aeca2f09647221ef1de93ddca208bb8086a6 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Tue, 28 May 2019 14:12:35 +0300 Subject: [PATCH] Remove message locking as message handling is synchronous per-portal --- portal.go | 51 ++++++++++++--------------------------------------- 1 file changed, 12 insertions(+), 39 deletions(-) diff --git a/portal.go b/portal.go index 69828be..1e3c268 100644 --- a/portal.go +++ b/portal.go @@ -108,7 +108,6 @@ func (bridge *Bridge) NewPortal(dbPortal *database.Portal) *Portal { bridge: bridge, log: bridge.Log.Sub(fmt.Sprintf("Portal/%s", dbPortal.Key)), - messageLocks: make(map[types.WhatsAppMessageID]sync.Mutex), recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{}, messages: make(chan PortalMessage, 128), @@ -134,9 +133,7 @@ type Portal struct { avatarURL string - roomCreateLock sync.Mutex - messageLocksLock sync.Mutex - messageLocks map[types.WhatsAppMessageID]sync.Mutex + roomCreateLock sync.Mutex recentlyHandled [recentlyHandledLength]types.WhatsAppMessageID recentlyHandledLock sync.Mutex @@ -188,22 +185,6 @@ func (portal *Portal) handleMessage(msg PortalMessage) { } } -func (portal *Portal) getMessageLock(messageID types.WhatsAppMessageID) sync.Mutex { - portal.messageLocksLock.Lock() - defer portal.messageLocksLock.Unlock() - lock, ok := portal.messageLocks[messageID] - if !ok { - portal.messageLocks[messageID] = lock - } - return lock -} - -func (portal *Portal) deleteMessageLock(messageID types.WhatsAppMessageID) { - portal.messageLocksLock.Lock() - delete(portal.messageLocks, messageID) - portal.messageLocksLock.Unlock() -} - func (portal *Portal) isRecentlyHandled(id types.WhatsAppMessageID) bool { start := portal.recentlyHandledIndex for i := start; i != start; i = (i - 1) % recentlyHandledLength { @@ -252,25 +233,19 @@ func (portal *Portal) markHandled(source *User, message *waProto.WebMessageInfo, portal.recentlyHandled[index] = msg.JID } -func (portal *Portal) startHandling(info whatsapp.MessageInfo) (*sync.Mutex, bool) { - if portal.lastMessageTs > info.Timestamp+1 || portal.isRecentlyHandled(info.Id) { - return nil, false - } - lock := portal.getMessageLock(info.Id) - lock.Lock() - if portal.isDuplicate(info.Id) { - lock.Unlock() - return nil, false +func (portal *Portal) startHandling(info whatsapp.MessageInfo) bool { + if portal.lastMessageTs > info.Timestamp+1 || + portal.isRecentlyHandled(info.Id) || + portal.isDuplicate(info.Id) { + return false } portal.lastMessageTs = info.Timestamp - return &lock, true + return true } func (portal *Portal) finishHandling(source *User, message *waProto.WebMessageInfo, mxid types.MatrixEventID) { portal.markHandled(source, message, mxid) - id := message.GetKey().GetId() - portal.deleteMessageLock(id) - portal.log.Debugln("Handled message", id, "->", mxid) + portal.log.Debugln("Handled message", message.GetKey().GetId(), "->", mxid) } func (portal *Portal) SyncParticipants(metadata *whatsappExt.GroupInfo) { @@ -582,6 +557,8 @@ func (portal *Portal) FillInitialHistory(user *User) error { if portal.bridge.Config.Bridge.InitialHistoryFill == 0 { return nil } + portal.backfillLock.Lock() + defer portal.backfillLock.Unlock() n := portal.bridge.Config.Bridge.InitialHistoryFill portal.log.Infoln("Filling initial history, maximum", n, "messages") var messages []interface{} @@ -795,11 +772,9 @@ func (portal *Portal) HandleTextMessage(source *User, message whatsapp.TextMessa return } - lock, ok := portal.startHandling(message.Info) - if !ok { + if !portal.startHandling(message.Info) { return } - defer lock.Unlock() intent := portal.GetMessageIntent(source, message.Info) if intent == nil { @@ -828,11 +803,9 @@ func (portal *Portal) HandleMediaMessage(source *User, download func() ([]byte, return } - lock, ok := portal.startHandling(info) - if !ok { + if !portal.startHandling(info) { return } - defer lock.Unlock() intent := portal.GetMessageIntent(source, info) if intent == nil {