From 79851a62b41f55544e039c0156beabae9216ee43 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Fri, 31 Aug 2018 00:13:08 +0300 Subject: [PATCH] Add locking for whatsapp->matrix messages for desegregated group chats --- Gopkg.lock | 6 +- example-config.yaml | 2 +- portal.go | 112 ++++++++++++++---- .../go/mautrix-appservice/intent.go | 10 ++ .../go/mautrix-appservice/statestore.go | 2 + 5 files changed, 104 insertions(+), 28 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 5334d34..5bca2ba 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -87,7 +87,7 @@ "curve25519", "hkdf" ] - revision = "614d502a4dac94afa3a6ce146bd1736da82514c6" + revision = "182538f80094b6a8efaade63a8fd8e0d9d5843dd" [[projects]] branch = "master" @@ -102,7 +102,7 @@ branch = "master" name = "golang.org/x/sys" packages = ["unix"] - revision = "d99a578cf41bfccdeaf48b0845c823a4b8b0ad5e" + revision = "49385e6e15226593f68b26af201feec29d5bba22" [[projects]] name = "gopkg.in/russross/blackfriday.v2" @@ -141,7 +141,7 @@ branch = "master" name = "maunium.net/go/mautrix-appservice" packages = ["."] - revision = "37d4449056015cea5d0a4420bba595c61ad32007" + revision = "fb756247f82716de7698b8200f28f16b4fd04a6b" [solve-meta] analyzer-name = "dep" diff --git a/example-config.yaml b/example-config.yaml index 616c142..4839aee 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -64,7 +64,7 @@ bridge: # domain - All users on that homeserver # mxid - Specific user permissions: - "example.com": full + "example.com": user "@admin:example.com": admin # Logging config. diff --git a/portal.go b/portal.go index 98457e8..27635e1 100644 --- a/portal.go +++ b/portal.go @@ -102,6 +102,9 @@ func (bridge *Bridge) NewPortal(dbPortal *database.Portal) *Portal { Portal: dbPortal, bridge: bridge, log: bridge.Log.Sub(fmt.Sprintf("Portal/%s", dbPortal.Key)), + + messageLocks: make(map[types.WhatsAppMessageID]sync.Mutex), + recentlyHandled: [20]types.WhatsAppMessageID{}, } } @@ -111,7 +114,80 @@ type Portal struct { bridge *Bridge log log.Logger - roomCreateLock sync.Mutex + roomCreateLock sync.Mutex + messageLocksLock sync.Mutex + messageLocks map[types.WhatsAppMessageID]sync.Mutex + + recentlyHandled [20]types.WhatsAppMessageID + recentlyHandledLock sync.Mutex + recentlyHandledIndex uint8 +} + +func (portal *Portal) getMessageLock(messageID types.WhatsAppMessageID) sync.Mutex { + portal.messageLocksLock.Lock() + defer portal.messageLocksLock.Unlock() + lock, ok := portal.messageLocks[messageID] + if !ok { + portal.messageLocks[messageID] = lock + } + return lock +} + +func (portal *Portal) deleteMessageLock(messageID types.WhatsAppMessageID) { + portal.messageLocksLock.Lock() + delete(portal.messageLocks, messageID) + portal.messageLocksLock.Unlock() +} + +func (portal *Portal) isRecentlyHandled(id types.WhatsAppMessageID) bool { + start := portal.recentlyHandledIndex + for i := start; i != start; i = (i - 1) % 20 { + if portal.recentlyHandled[i] == id { + return true + } + } + return false +} + +func (portal *Portal) isDuplicate(id types.WhatsAppMessageID) bool { + msg := portal.bridge.DB.Message.GetByJID(portal.Key, id) + if msg != nil { + return true + } + return false +} + +func (portal *Portal) markHandled(jid types.WhatsAppMessageID, mxid types.MatrixEventID) { + msg := portal.bridge.DB.Message.New() + msg.Chat = portal.Key + msg.JID = jid + msg.MXID = mxid + msg.Insert() + + portal.recentlyHandledLock.Lock() + index := portal.recentlyHandledIndex + portal.recentlyHandledIndex = (portal.recentlyHandledIndex + 1) % 20 + portal.recentlyHandledLock.Unlock() + portal.recentlyHandled[index] = jid +} + +func (portal *Portal) startHandling(id types.WhatsAppMessageID) (*sync.Mutex, bool) { + if portal.isRecentlyHandled(id) { + return nil, false + } + lock := portal.getMessageLock(id) + lock.Lock() + if portal.isDuplicate(id) { + lock.Unlock() + return nil, false + } + return &lock, true +} + +func (portal *Portal) finishHandling(id types.WhatsAppMessageID, mxid types.MatrixEventID) { + portal.markHandled(id, mxid) + portal.deleteMessageLock(id) + portal.log.Debugln("Handled message", id, "->", mxid) } func (portal *Portal) SyncParticipants(metadata *whatsappExt.GroupInfo) { @@ -238,6 +314,8 @@ func (portal *Portal) Sync(user *User, contact whatsapp.Contact) { portal.log.Errorln("Failed to create portal room:", err) return } + } else { + portal.MainIntent().EnsureInvited(portal.MXID, user.MXID) } update := false @@ -382,22 +460,6 @@ func (portal *Portal) MainIntent() *appservice.IntentAPI { return portal.bridge.Bot } -func (portal *Portal) IsDuplicate(id types.WhatsAppMessageID) bool { - msg := portal.bridge.DB.Message.GetByJID(portal.Key, id) - if msg != nil { - return true - } - return false -} - -func (portal *Portal) MarkHandled(jid types.WhatsAppMessageID, mxid types.MatrixEventID) { - msg := portal.bridge.DB.Message.New() - msg.Chat = portal.Key - msg.JID = jid - msg.MXID = mxid - msg.Insert() -} - func (portal *Portal) GetMessageIntent(user *User, info whatsapp.MessageInfo) *appservice.IntentAPI { if info.FromMe { if portal.IsPrivateChat() { @@ -434,9 +496,11 @@ func (portal *Portal) SetReply(content *gomatrix.Content, info whatsapp.MessageI } func (portal *Portal) HandleTextMessage(source *User, message whatsapp.TextMessage) { - if portal.IsDuplicate(message.Info.Id) { + lock, ok := portal.startHandling(message.Info.Id) + if !ok { return } + defer lock.Unlock() err := portal.CreateMatrixRoom([]string{source.MXID}) if err != nil { @@ -463,14 +527,15 @@ func (portal *Portal) HandleTextMessage(source *User, message whatsapp.TextMessa portal.log.Errorfln("Failed to handle message %s: %v", message.Info.Id, err) return } - portal.MarkHandled(message.Info.Id, resp.EventID) - portal.log.Debugln("Handled message", message.Info.Id, "->", resp.EventID) + portal.finishHandling(message.Info.Id, resp.EventID) } func (portal *Portal) HandleMediaMessage(source *User, download func() ([]byte, error), thumbnail []byte, info whatsapp.MessageInfo, mimeType, caption string) { - if portal.IsDuplicate(info.Id) { + lock, ok := portal.startHandling(info.Id) + if !ok { return } + defer lock.Unlock() err := portal.CreateMatrixRoom([]string{source.MXID}) if err != nil { @@ -563,8 +628,7 @@ func (portal *Portal) HandleMediaMessage(source *User, download func() ([]byte, // TODO store caption mxid? } - portal.MarkHandled(info.Id, resp.EventID) - portal.log.Debugln("Handled message", info.Id, "->", resp.EventID) + portal.finishHandling(info.Id, resp.EventID) } func makeMessageID() *string { @@ -799,8 +863,8 @@ func (portal *Portal) HandleMatrixMessage(sender *User, evt *gomatrix.Event) { portal.log.Debugln("Unhandled Matrix event:", evt) return } + portal.markHandled(info.GetKey().GetId(), evt.ID) err = sender.Conn.Send(info) - portal.MarkHandled(info.GetKey().GetId(), evt.ID) if err != nil { portal.log.Errorfln("Error handling Matrix event %s: %v", evt.ID, err) } else { diff --git a/vendor/maunium.net/go/mautrix-appservice/intent.go b/vendor/maunium.net/go/mautrix-appservice/intent.go index 3aa864b..e940b3e 100644 --- a/vendor/maunium.net/go/mautrix-appservice/intent.go +++ b/vendor/maunium.net/go/mautrix-appservice/intent.go @@ -231,3 +231,13 @@ func (intent *IntentAPI) SetAvatarURL(avatarURL string) error { } return intent.Client.SetAvatarURL(avatarURL) } + +func (intent *IntentAPI) EnsureInvited(roomID, userID string) error { + if !intent.as.StateStore.IsInvited(roomID, userID) { + _, err := intent.Client.InviteUser(roomID, &gomatrix.ReqInviteUser{ + UserID: userID, + }) + return err + } + return nil +} \ No newline at end of file diff --git a/vendor/maunium.net/go/mautrix-appservice/statestore.go b/vendor/maunium.net/go/mautrix-appservice/statestore.go index 14f9b74..543d98d 100644 --- a/vendor/maunium.net/go/mautrix-appservice/statestore.go +++ b/vendor/maunium.net/go/mautrix-appservice/statestore.go @@ -30,6 +30,8 @@ func (as *AppService) UpdateState(evt *gomatrix.Event) { switch evt.Type { case gomatrix.StateMember: as.StateStore.SetMembership(evt.RoomID, evt.GetStateKey(), evt.Content.Membership) + case gomatrix.StatePowerLevels: + as.StateStore.SetPowerLevels(evt.RoomID, &evt.Content.PowerLevels) } }