Clean up personal filtering space support

This commit is contained in:
Tulir Asokan 2021-12-29 21:40:08 +02:00
parent 3a9b3ab41f
commit 7e5c2769c6
14 changed files with 179 additions and 210 deletions

View file

@ -32,6 +32,8 @@ type BridgeConfig struct {
UsernameTemplate string `yaml:"username_template"`
DisplaynameTemplate string `yaml:"displayname_template"`
PersonalFilteringSpaces bool `yaml:"personal_filtering_spaces"`
DeliveryReceipts bool `yaml:"delivery_receipts"`
PortalMessageBuffer int `yaml:"portal_message_buffer"`
CallStartNotices bool `yaml:"call_start_notices"`

View file

@ -59,6 +59,8 @@ type Config struct {
Username string `yaml:"username"`
Displayname string `yaml:"displayname"`
Avatar string `yaml:"avatar"`
ParsedAvatar id.ContentURI `yaml:"-"`
} `yaml:"bot"`
EphemeralEvents bool `yaml:"ephemeral_events"`

View file

@ -63,6 +63,7 @@ func (helper *UpgradeHelper) doUpgrade() {
helper.Copy(Str, "bridge", "username_template")
helper.Copy(Str, "bridge", "displayname_template")
helper.Copy(Bool, "bridge", "personal_filtering_spaces")
helper.Copy(Bool, "bridge", "delivery_receipts")
helper.Copy(Int, "bridge", "portal_message_buffer")
helper.Copy(Bool, "bridge", "call_start_notices")

View file

@ -1,12 +0,0 @@
package upgrades
import (
"database/sql"
)
func init() {
upgrades[32] = upgrade{"Store space in user table", func(tx *sql.Tx, ctx context) error {
_, err := tx.Exec(`ALTER TABLE "user" ADD COLUMN space_room TEXT NOT NULL DEFAULT ''`)
return err
}}
}

View file

@ -0,0 +1,16 @@
package upgrades
import (
"database/sql"
)
func init() {
upgrades[33] = upgrade{"Add personal filtering space info to user tables", func(tx *sql.Tx, ctx context) error {
_, err := tx.Exec(`ALTER TABLE "user" ADD COLUMN space_room TEXT NOT NULL DEFAULT ''`)
if err != nil {
return err
}
_, err = tx.Exec(`ALTER TABLE user_portal ADD COLUMN in_space BOOLEAN NOT NULL DEFAULT false`)
return err
}}
}

View file

@ -18,8 +18,11 @@ package database
import (
"database/sql"
"sync"
"time"
log "maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix/id"
"go.mau.fi/whatsmeow/types"
@ -34,6 +37,9 @@ func (uq *UserQuery) New() *User {
return &User{
db: uq.db,
log: uq.log,
lastReadCache: make(map[PortalKey]time.Time),
inSpaceCache: make(map[PortalKey]bool),
}
}
@ -73,6 +79,11 @@ type User struct {
JID types.JID
ManagementRoom id.RoomID
SpaceRoom id.RoomID
lastReadCache map[PortalKey]time.Time
lastReadCacheLock sync.Mutex
inSpaceCache map[PortalKey]bool
inSpaceCacheLock sync.Mutex
}
func (user *User) Scan(row Scannable) *User {
@ -127,95 +138,3 @@ func (user *User) Update() {
user.log.Warnfln("Failed to update %s: %v", user.MXID, err)
}
}
//type PortalKeyWithMeta struct {
// PortalKey
// InCommunity bool
//}
//
//func (user *User) SetPortalKeys(newKeys []PortalKeyWithMeta) error {
// tx, err := user.db.Begin()
// if err != nil {
// return err
// }
// _, err = tx.Exec("DELETE FROM user_portal WHERE user_jid=$1", user.jidPtr())
// if err != nil {
// _ = tx.Rollback()
// return err
// }
// valueStrings := make([]string, len(newKeys))
// values := make([]interface{}, len(newKeys)*4)
// for i, key := range newKeys {
// pos := i * 4
// valueStrings[i] = fmt.Sprintf("($%d, $%d, $%d, $%d)", pos+1, pos+2, pos+3, pos+4)
// values[pos] = user.jidPtr()
// values[pos+1] = key.JID
// values[pos+2] = key.Receiver
// values[pos+3] = key.InCommunity
// }
// query := fmt.Sprintf("INSERT INTO user_portal (user_jid, portal_jid, portal_receiver, in_community) VALUES %s",
// strings.Join(valueStrings, ", "))
// _, err = tx.Exec(query, values...)
// if err != nil {
// _ = tx.Rollback()
// return err
// }
// return tx.Commit()
//}
//
//func (user *User) IsInPortal(key PortalKey) bool {
// row := user.db.QueryRow(`SELECT EXISTS(SELECT 1 FROM user_portal WHERE user_jid=$1 AND portal_jid=$2 AND portal_receiver=$3)`, user.jidPtr(), &key.JID, &key.Receiver)
// var exists bool
// _ = row.Scan(&exists)
// return exists
//}
//
//func (user *User) GetPortalKeys() []PortalKey {
// rows, err := user.db.Query(`SELECT portal_jid, portal_receiver FROM user_portal WHERE user_jid=$1`, user.jidPtr())
// if err != nil {
// user.log.Warnln("Failed to get user portal keys:", err)
// return nil
// }
// var keys []PortalKey
// for rows.Next() {
// var key PortalKey
// err = rows.Scan(&key.JID, &key.Receiver)
// if err != nil {
// user.log.Warnln("Failed to scan row:", err)
// continue
// }
// keys = append(keys, key)
// }
// return keys
//}
//
//func (user *User) GetInCommunityMap() map[PortalKey]bool {
// rows, err := user.db.Query(`SELECT portal_jid, portal_receiver, in_community FROM user_portal WHERE user_jid=$1`, user.jidPtr())
// if err != nil {
// user.log.Warnln("Failed to get user portal keys:", err)
// return nil
// }
// keys := make(map[PortalKey]bool)
// for rows.Next() {
// var key PortalKey
// var inCommunity bool
// err = rows.Scan(&key.JID, &key.Receiver, &inCommunity)
// if err != nil {
// user.log.Warnln("Failed to scan row:", err)
// continue
// }
// keys[key] = inCommunity
// }
// return keys
//}
//
//func (user *User) CreateUserPortal(newKey PortalKeyWithMeta) {
// user.log.Debugfln("Creating new portal %s for %s", newKey.PortalKey.JID, newKey.PortalKey.Receiver)
// _, err := user.db.Exec(`INSERT INTO user_portal (user_jid, portal_jid, portal_receiver, in_community) VALUES ($1, $2, $3, $4)`,
// user.jidPtr(),
// newKey.PortalKey.JID, newKey.PortalKey.Receiver,
// newKey.InCommunity)
// if err != nil {
// user.log.Warnfln("Failed to insert %s: %v", user.MXID, err)
// }
//}

View file

@ -19,37 +19,67 @@ package database
import (
"database/sql"
"errors"
"fmt"
"time"
)
func (user *User) GetLastReadTS(portal PortalKey) time.Time {
user.lastReadCacheLock.Lock()
defer user.lastReadCacheLock.Unlock()
if cached, ok := user.lastReadCache[portal]; ok {
return cached
}
var ts int64
err := user.db.QueryRow("SELECT last_read_ts FROM user_portal WHERE user_mxid=$1 AND portal_jid=$2 AND portal_receiver=$3", user.MXID, portal.JID, portal.Receiver).Scan(&ts)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
user.log.Warnfln("Failed to scan last read timestamp from user portal table: %v", err)
}
if ts == 0 {
return time.Time{}
user.lastReadCache[portal] = time.Time{}
} else {
user.lastReadCache[portal] = time.Unix(ts, 0)
}
return time.Unix(ts, 0)
return user.lastReadCache[portal]
}
func (user *User) SetLastReadTS(portal PortalKey, ts time.Time) {
var err error
if user.db.dialect == "postgres" {
_, err = user.db.Exec(`
user.lastReadCacheLock.Lock()
defer user.lastReadCacheLock.Unlock()
_, err := user.db.Exec(`
INSERT INTO user_portal (user_mxid, portal_jid, portal_receiver, last_read_ts) VALUES ($1, $2, $3, $4)
ON CONFLICT (user_mxid, portal_jid, portal_receiver) DO UPDATE SET last_read_ts=$4 WHERE user_portal.last_read_ts<$4
ON CONFLICT (user_mxid, portal_jid, portal_receiver) DO UPDATE SET last_read_ts=excluded.last_read_ts WHERE user_portal.last_read_ts<excluded.last_read_ts
`, user.MXID, portal.JID, portal.Receiver, ts.Unix())
} else if user.db.dialect == "sqlite3" {
_, err = user.db.Exec(
"INSERT OR REPLACE INTO user_portal (user_mxid, portal_jid, portal_receiver, last_read_ts) VALUES (?, ?, ?, ?)",
user.MXID, portal.JID, portal.Receiver, ts.Unix())
} else {
err = fmt.Errorf("unsupported dialect %s", user.db.dialect)
}
if err != nil {
user.log.Warnfln("Failed to update last read timestamp: %v", err)
} else {
user.lastReadCache[portal] = ts
}
}
func (user *User) IsInSpace(portal PortalKey) bool {
user.inSpaceCacheLock.Lock()
defer user.inSpaceCacheLock.Unlock()
if cached, ok := user.inSpaceCache[portal]; ok {
return cached
}
var inSpace bool
err := user.db.QueryRow("SELECT in_space FROM user_portal WHERE user_mxid=$1 AND portal_jid=$2 AND portal_receiver=$3", user.MXID, portal.JID, portal.Receiver).Scan(&inSpace)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
user.log.Warnfln("Failed to scan in space status from user portal table: %v", err)
}
user.inSpaceCache[portal] = inSpace
return inSpace
}
func (user *User) MarkInSpace(portal PortalKey) {
user.inSpaceCacheLock.Lock()
defer user.inSpaceCacheLock.Unlock()
_, err := user.db.Exec(`
INSERT INTO user_portal (user_mxid, portal_jid, portal_receiver, in_space) VALUES ($1, $2, $3, true)
ON CONFLICT (user_mxid, portal_jid, portal_receiver) DO UPDATE SET in_space=true
`, user.MXID, portal.JID, portal.Receiver)
if err != nil {
user.log.Warnfln("Failed to update in space status: %v", err)
} else {
user.inSpaceCache[portal] = true
}
}

View file

@ -94,6 +94,8 @@ bridge:
# {{.FullName}} - full name from contact list
# {{.FirstName}} - first name from contact list
displayname_template: "{{if .PushName}}{{.PushName}}{{else if .BusinessName}}{{.BusinessName}}{{else}}{{.JID}}{{end}} (WA)"
# Should the bridge create a space for each logged-in user and add bridged rooms to it?
personal_filtering_spaces: false
# Should the bridge send a read receipt from the bridge bot when a message has been sent to WhatsApp?
delivery_receipts: false
# Should incoming calls send a message to the Matrix room?

2
go.mod
View file

@ -14,7 +14,7 @@ require (
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
maunium.net/go/mauflag v1.0.0
maunium.net/go/maulogger/v2 v2.3.1
maunium.net/go/mautrix v0.10.8-0.20211222205631-95662fc3f0f3
maunium.net/go/mautrix v0.10.8-0.20211229165408-8916ec32ce52
)
require (

4
go.sum
View file

@ -221,5 +221,5 @@ maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
maunium.net/go/maulogger/v2 v2.3.1 h1:fwBYJne0pHvJrrIPHK+TAPfyxxbBEz46oVGez2x0ODE=
maunium.net/go/maulogger/v2 v2.3.1/go.mod h1:TYWy7wKwz/tIXTpsx8G3mZseIRiC5DoMxSZazOHy68A=
maunium.net/go/mautrix v0.10.8-0.20211222205631-95662fc3f0f3 h1:/mhoJYRbniwUh9OEwBJzZPCtEGvpiCWsO+MbPzbqE1o=
maunium.net/go/mautrix v0.10.8-0.20211222205631-95662fc3f0f3/go.mod h1:k4Ng5oci83MEbqPL6KOjPdbU7f8v01KlMjR/zTQ+7mA=
maunium.net/go/mautrix v0.10.8-0.20211229165408-8916ec32ce52 h1:TSbyQDCDTaCUeB0TiWT5TotTqgHc2UDM6ke1SH5O9Dg=
maunium.net/go/mautrix v0.10.8-0.20211229165408-8916ec32ce52/go.mod h1:k4Ng5oci83MEbqPL6KOjPdbU7f8v01KlMjR/zTQ+7mA=

View file

@ -176,6 +176,8 @@ func (user *User) handleHistorySyncConversation(index int, conv *waProto.Convers
user.log.Warnfln("Failed to create room for %s during backfill: %v", portal.Key.JID, err)
return
}
} else {
portal.UpdateMatrixRoom(user, nil)
}
if !user.bridge.Config.Bridge.HistorySync.Backfill {
user.log.Debugln("Backfill is disabled, not bridging history sync payload for", portal.Key.JID)

View file

@ -356,7 +356,7 @@ func (bridge *Bridge) ResendBridgeInfo() {
func (bridge *Bridge) UpdateBotProfile() {
bridge.Log.Debugln("Updating bot profile")
botConfig := bridge.Config.AppService.Bot
botConfig := &bridge.Config.AppService.Bot
var err error
var mxc id.ContentURI
@ -367,6 +367,7 @@ func (bridge *Bridge) UpdateBotProfile() {
if err == nil {
err = bridge.Bot.SetAvatarURL(mxc)
}
botConfig.ParsedAvatar = mxc
}
if err != nil {
bridge.Log.Warnln("Failed to update bot avatar:", err)

View file

@ -805,39 +805,8 @@ func (portal *Portal) ensureMXIDInvited(mxid id.UserID) {
}
}
func (portal *Portal) ensureUserInvited(user *User) (ok bool) {
inviteContent := event.Content{
Parsed: &event.MemberEventContent{
Membership: event.MembershipInvite,
IsDirect: portal.IsPrivateChat(),
},
Raw: map[string]interface{}{},
}
customPuppet := portal.bridge.GetPuppetByCustomMXID(user.MXID)
if customPuppet != nil && customPuppet.CustomIntent() != nil {
inviteContent.Raw["fi.mau.will_auto_accept"] = true
}
_, err := portal.MainIntent().SendStateEvent(portal.MXID, event.StateMember, user.MXID.String(), &inviteContent)
var httpErr mautrix.HTTPError
if err != nil && errors.As(err, &httpErr) && httpErr.RespError != nil && strings.Contains(httpErr.RespError.Err, "is already in the room") {
portal.bridge.StateStore.SetMembership(portal.MXID, user.MXID, event.MembershipJoin)
ok = true
} else if err != nil {
portal.log.Warnfln("Failed to invite %s: %v", user.MXID, err)
} else {
ok = true
}
if customPuppet != nil && customPuppet.CustomIntent() != nil {
err = customPuppet.CustomIntent().EnsureJoined(portal.MXID)
if err != nil {
portal.log.Warnfln("Failed to auto-join portal as %s: %v", user.MXID, err)
ok = false
} else {
ok = true
}
}
return
func (portal *Portal) ensureUserInvited(user *User) bool {
return user.ensureInvited(portal.MainIntent(), portal.MXID, portal.IsPrivateChat())
}
func (portal *Portal) UpdateMatrixRoom(user *User, groupInfo *types.GroupInfo) bool {
@ -847,6 +816,7 @@ func (portal *Portal) UpdateMatrixRoom(user *User, groupInfo *types.GroupInfo) b
portal.log.Infoln("Syncing portal for", user.MXID)
portal.ensureUserInvited(user)
go portal.addToSpace(user)
update := false
update = portal.UpdateMetadata(user, groupInfo) || update
@ -971,7 +941,7 @@ func (portal *Portal) getBridgeInfo() (string, event.BridgeEventContent) {
Protocol: event.BridgeInfoSection{
ID: "whatsapp",
DisplayName: "WhatsApp",
AvatarURL: id.ContentURIString(portal.bridge.Config.AppService.Bot.Avatar),
AvatarURL: portal.bridge.Config.AppService.Bot.ParsedAvatar.CUString(),
ExternalURL: "https://www.whatsapp.com/",
},
Channel: event.BridgeInfoSection{
@ -1141,7 +1111,7 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i
portal.ensureUserInvited(user)
user.syncChatDoublePuppetDetails(portal, true)
portal.addToSpace(user.getSpaceRoom(), portal.MXID, portal.bridge.Config.Homeserver.Domain)
go portal.addToSpace(user)
if groupInfo != nil {
portal.SyncParticipants(user, groupInfo)
@ -1178,14 +1148,20 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i
return nil
}
func (portal *Portal) addToSpace(spaceID id.RoomID, portalID id.RoomID, homeserverDomain string) {
parentSpaceContent := make(map[string]interface{})
parentSpaceContent["via"] = []string{homeserverDomain}
portal.log.Debugfln("adding room %s to the space %s", portalID, spaceID)
portal.MainIntent().SendStateEvent(spaceID, event.Type{Type: "m.space.child", Class: event.StateEventType}, portalID.String(), parentSpaceContent)
func (portal *Portal) addToSpace(user *User) {
spaceID := user.GetSpaceRoom()
if len(spaceID) == 0 || user.IsInSpace(portal.Key) {
return
}
_, err := portal.bridge.Bot.SendStateEvent(spaceID, event.StateSpaceChild, portal.MXID.String(), &event.SpaceChildEventContent{
Via: []string{portal.bridge.Config.Homeserver.Domain},
})
if err != nil {
portal.log.Errorfln("Failed to add room to %s's personal filtering space (%s): %v", user.MXID, spaceID, err)
} else {
portal.log.Debugfln("Added room to %s's personal filtering space (%s)", user.MXID, spaceID)
user.MarkInSpace(portal.Key)
}
}
func (portal *Portal) IsPrivateChat() bool {

120
user.go
View file

@ -23,6 +23,7 @@ import (
"fmt"
"net/http"
"strconv"
"strings"
"sync"
"time"
@ -64,6 +65,8 @@ type User struct {
historySyncs chan *events.HistorySync
prevBridgeStatus *BridgeState
lastPresence types.Presence
spaceMembershipChecked bool
}
func (bridge *Bridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User {
@ -180,56 +183,83 @@ func (bridge *Bridge) NewUser(dbUser *database.User) *User {
return user
}
func (user *User) getSpaceRoom() id.RoomID {
var roomID id.RoomID
if len(user.SpaceRoom) == 0 {
//TODO check if Spaces creation is enabled by config
//Create Space
user.log.Debugln("Locking to create space.")
user.spaceCreateLock.Lock()
defer user.spaceCreateLock.Unlock()
if len(user.SpaceRoom) != 0 {
roomID = user.SpaceRoom
user.log.Debugln("Returning space after lock" + user.SpaceRoom)
} else {
creationContent := make(map[string]interface{})
creationContent["type"] = "m.space"
user.log.Debugln("Creating a new space for the user")
user.log.Debugln("Inviting user " + user.MXID)
var invite []id.UserID
invite = append(invite, user.MXID)
resp, err := user.bridge.Bot.CreateRoom(&mautrix.ReqCreateRoom{
Visibility: "private",
Name: "WhatsApp",
Topic: "WhatsApp bridge Space",
Invite: invite,
CreationContent: creationContent,
})
if err != nil {
user.log.Errorln("Failed to auto-create space room:", err)
} else {
user.setSpaceRoom(resp.RoomID)
roomID = resp.RoomID
}
}
func (user *User) ensureInvited(intent *appservice.IntentAPI, roomID id.RoomID, isDirect bool) (ok bool) {
inviteContent := event.Content{
Parsed: &event.MemberEventContent{
Membership: event.MembershipInvite,
IsDirect: isDirect,
},
Raw: map[string]interface{}{},
}
customPuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
if customPuppet != nil && customPuppet.CustomIntent() != nil {
inviteContent.Raw["fi.mau.will_auto_accept"] = true
}
_, err := intent.SendStateEvent(roomID, event.StateMember, user.MXID.String(), &inviteContent)
var httpErr mautrix.HTTPError
if err != nil && errors.As(err, &httpErr) && httpErr.RespError != nil && strings.Contains(httpErr.RespError.Err, "is already in the room") {
user.bridge.StateStore.SetMembership(roomID, user.MXID, event.MembershipJoin)
ok = true
} else if err != nil {
user.log.Warnfln("Failed to invite user to %s: %v", roomID, err)
} else {
user.log.Debugln("Space found" + user.SpaceRoom)
roomID = user.SpaceRoom
ok = true
}
return roomID
if customPuppet != nil && customPuppet.CustomIntent() != nil {
err = customPuppet.CustomIntent().EnsureJoined(roomID)
if err != nil {
user.log.Warnfln("Failed to auto-join %s: %v", roomID, err)
ok = false
} else {
ok = true
}
}
return
}
func (user *User) setSpaceRoom(spaceID id.RoomID) {
user.SpaceRoom = spaceID
user.bridge.spaceRooms[user.SpaceRoom] = user
user.Update()
func (user *User) GetSpaceRoom() id.RoomID {
if !user.bridge.Config.Bridge.PersonalFilteringSpaces {
return ""
}
if len(user.SpaceRoom) == 0 {
user.spaceCreateLock.Lock()
defer user.spaceCreateLock.Unlock()
if len(user.SpaceRoom) > 0 {
return user.SpaceRoom
}
resp, err := user.bridge.Bot.CreateRoom(&mautrix.ReqCreateRoom{
Visibility: "private",
Name: "WhatsApp",
Topic: "Your WhatsApp bridged chats",
InitialState: []*event.Event{{
Type: event.StateRoomAvatar,
Content: event.Content{
Parsed: &event.RoomAvatarEventContent{
URL: user.bridge.Config.AppService.Bot.ParsedAvatar,
},
},
}},
CreationContent: map[string]interface{}{
"type": event.RoomTypeSpace,
},
})
if err != nil {
user.log.Errorln("Failed to auto-create space room:", err)
} else {
user.SpaceRoom = resp.RoomID
user.Update()
user.ensureInvited(user.bridge.Bot, user.SpaceRoom, false)
}
} else if !user.spaceMembershipChecked && !user.bridge.StateStore.IsInRoom(user.SpaceRoom, user.MXID) {
user.ensureInvited(user.bridge.Bot, user.SpaceRoom, false)
}
user.spaceMembershipChecked = true
return user.SpaceRoom
}
func (user *User) GetManagementRoom() id.RoomID {