From 19b4c3093a41e42d7bd4bfb7fff160bc82348acf Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Tue, 28 Jun 2022 16:22:10 +0300 Subject: [PATCH] Apply periodic resync for group chats too --- database/portal.go | 35 ++++-- database/upgrades/00-latest-revision.sql | 3 +- .../upgrades/51-portal-background-sync.sql | 3 + go.mod | 2 +- go.sum | 4 +- portal.go | 82 +++++++++----- puppet.go | 42 ++------ user.go | 100 +++++++++++------- 8 files changed, 155 insertions(+), 116 deletions(-) create mode 100644 database/upgrades/51-portal-background-sync.sql diff --git a/database/portal.go b/database/portal.go index a0fe982..eefa71f 100644 --- a/database/portal.go +++ b/database/portal.go @@ -19,6 +19,7 @@ package database import ( "database/sql" "fmt" + "time" log "maunium.net/go/maulogger/v2" @@ -64,7 +65,7 @@ func (pq *PortalQuery) New() *Portal { } } -const portalColumns = "jid, receiver, mxid, name, name_set, topic, topic_set, avatar, avatar_url, avatar_set, encrypted, first_event_id, next_batch_id, relay_user_id, expiration_time" +const portalColumns = "jid, receiver, mxid, name, name_set, topic, topic_set, avatar, avatar_url, avatar_set, encrypted, last_sync, first_event_id, next_batch_id, relay_user_id, expiration_time" func (pq *PortalQuery) GetAll() []*Portal { return pq.getAll(fmt.Sprintf("SELECT %s FROM portal", portalColumns)) @@ -142,6 +143,7 @@ type Portal struct { AvatarURL id.ContentURI AvatarSet bool Encrypted bool + LastSync time.Time FirstEventID id.EventID NextBatchID id.BatchID @@ -153,13 +155,17 @@ type Portal struct { func (portal *Portal) Scan(row dbutil.Scannable) *Portal { var mxid, avatarURL, firstEventID, nextBatchID, relayUserID sql.NullString - err := row.Scan(&portal.Key.JID, &portal.Key.Receiver, &mxid, &portal.Name, &portal.NameSet, &portal.Topic, &portal.TopicSet, &portal.Avatar, &avatarURL, &portal.AvatarSet, &portal.Encrypted, &firstEventID, &nextBatchID, &relayUserID, &portal.ExpirationTime) + var lastSyncTs int64 + err := row.Scan(&portal.Key.JID, &portal.Key.Receiver, &mxid, &portal.Name, &portal.NameSet, &portal.Topic, &portal.TopicSet, &portal.Avatar, &avatarURL, &portal.AvatarSet, &portal.Encrypted, &lastSyncTs, &firstEventID, &nextBatchID, &relayUserID, &portal.ExpirationTime) if err != nil { if err != sql.ErrNoRows { portal.log.Errorln("Database scan failed:", err) } return nil } + if lastSyncTs > 0 { + portal.LastSync = time.Unix(lastSyncTs, 0) + } portal.MXID = id.RoomID(mxid.String) portal.AvatarURL, _ = id.ParseContentURI(avatarURL.String) portal.FirstEventID = id.EventID(firstEventID.String) @@ -182,15 +188,22 @@ func (portal *Portal) relayUserPtr() *id.UserID { return nil } +func (portal *Portal) lastSyncTs() int64 { + if portal.LastSync.IsZero() { + return 0 + } + return portal.LastSync.Unix() +} + func (portal *Portal) Insert() { _, err := portal.db.Exec(` INSERT INTO portal (jid, receiver, mxid, name, name_set, topic, topic_set, avatar, avatar_url, avatar_set, - encrypted, first_event_id, next_batch_id, relay_user_id, expiration_time) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) + encrypted, last_sync, first_event_id, next_batch_id, relay_user_id, expiration_time) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) `, portal.Key.JID, portal.Key.Receiver, portal.mxidPtr(), portal.Name, portal.NameSet, portal.Topic, portal.TopicSet, - portal.Avatar, portal.AvatarURL.String(), portal.AvatarSet, portal.Encrypted, portal.FirstEventID.String(), - portal.NextBatchID.String(), portal.relayUserPtr(), portal.ExpirationTime) + portal.Avatar, portal.AvatarURL.String(), portal.AvatarSet, portal.Encrypted, portal.lastSyncTs(), + portal.FirstEventID.String(), portal.NextBatchID.String(), portal.relayUserPtr(), portal.ExpirationTime) if err != nil { portal.log.Warnfln("Failed to insert %s: %v", portal.Key, err) } @@ -200,13 +213,13 @@ func (portal *Portal) Update(txn *sql.Tx) { query := ` UPDATE portal SET mxid=$1, name=$2, name_set=$3, topic=$4, topic_set=$5, avatar=$6, avatar_url=$7, avatar_set=$8, - encrypted=$9, first_event_id=$10, next_batch_id=$11, relay_user_id=$12, expiration_time=$13 - WHERE jid=$14 AND receiver=$15 + encrypted=$9, last_sync=$10, first_event_id=$11, next_batch_id=$12, relay_user_id=$13, expiration_time=$14 + WHERE jid=$15 AND receiver=$16 ` args := []interface{}{ - portal.mxidPtr(), portal.Name, portal.Topic, portal.Avatar, portal.AvatarURL.String(), portal.Encrypted, - portal.FirstEventID.String(), portal.NextBatchID.String(), portal.relayUserPtr(), portal.ExpirationTime, - portal.Key.JID, portal.Key.Receiver, + portal.mxidPtr(), portal.Name, portal.NameSet, portal.Topic, portal.TopicSet, portal.Avatar, portal.AvatarURL.String(), + portal.AvatarSet, portal.Encrypted, portal.lastSyncTs(), portal.FirstEventID.String(), portal.NextBatchID.String(), + portal.relayUserPtr(), portal.ExpirationTime, portal.Key.JID, portal.Key.Receiver, } var err error if txn != nil { diff --git a/database/upgrades/00-latest-revision.sql b/database/upgrades/00-latest-revision.sql index 36860f5..1bf27c4 100644 --- a/database/upgrades/00-latest-revision.sql +++ b/database/upgrades/00-latest-revision.sql @@ -1,4 +1,4 @@ --- v0 -> v50: Latest revision +-- v0 -> v51: Latest revision CREATE TABLE "user" ( mxid TEXT PRIMARY KEY, @@ -27,6 +27,7 @@ CREATE TABLE portal ( avatar_url TEXT, avatar_set BOOLEAN NOT NULL DEFAULT false, encrypted BOOLEAN NOT NULL DEFAULT false, + last_sync BIGINT NOT NULL DEFAULT 0, first_event_id TEXT, next_batch_id TEXT, diff --git a/database/upgrades/51-portal-background-sync.sql b/database/upgrades/51-portal-background-sync.sql new file mode 100644 index 0000000..ba0bd0b --- /dev/null +++ b/database/upgrades/51-portal-background-sync.sql @@ -0,0 +1,3 @@ +-- v51: Add last sync timestamp for portals too + +ALTER TABLE portal ADD COLUMN last_sync BIGINT NOT NULL DEFAULT 0; diff --git a/go.mod b/go.mod index 397da98..9f6b56c 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/prometheus/client_golang v1.12.2-0.20220613221938-ebd77f036066 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e github.com/tidwall/gjson v1.14.1 - go.mau.fi/whatsmeow v0.0.0-20220628090541-c0ed326fe088 + go.mau.fi/whatsmeow v0.0.0-20220628131901-3f187acf2229 golang.org/x/image v0.0.0-20220617043117-41969df76e82 golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e google.golang.org/protobuf v1.28.0 diff --git a/go.sum b/go.sum index d12d573..ff5cb65 100644 --- a/go.sum +++ b/go.sum @@ -64,8 +64,8 @@ github.com/yuin/goldmark v1.4.12 h1:6hffw6vALvEDqJ19dOJvJKOoAOKe4NDaTqvd2sktGN0= github.com/yuin/goldmark v1.4.12/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.mau.fi/libsignal v0.0.0-20220628090436-4d18b66b087e h1:ByHDg+D+dMIGuBA2n+1xOUf4xr3FJFYg8yxl06s1YBE= go.mau.fi/libsignal v0.0.0-20220628090436-4d18b66b087e/go.mod h1:RCdzkTWSJv0AKGqurzPXJsEGIVMuQps3E/h7CMUPous= -go.mau.fi/whatsmeow v0.0.0-20220628090541-c0ed326fe088 h1:DjPQQ41SjbQpTJ5QvjjN7OatCnb8yr4qKyBAwram6+E= -go.mau.fi/whatsmeow v0.0.0-20220628090541-c0ed326fe088/go.mod h1:hsjqq2xLuoFew8vbsDCJcGf5EbXCRcR/yoQ+87w6m3k= +go.mau.fi/whatsmeow v0.0.0-20220628131901-3f187acf2229 h1:iTJ65cHF4PKn9pTUnbPus6FdFeLXSOvm04FwJwq11hA= +go.mau.fi/whatsmeow v0.0.0-20220628131901-3f187acf2229/go.mod h1:hsjqq2xLuoFew8vbsDCJcGf5EbXCRcR/yoQ+87w6m3k= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/image v0.0.0-20220617043117-41969df76e82 h1:KpZB5pUSBvrHltNEdK/tw0xlPeD13M6M6aGP32gKqiw= diff --git a/portal.go b/portal.go index 62f5de2..9155a55 100644 --- a/portal.go +++ b/portal.go @@ -823,6 +823,7 @@ func (portal *Portal) getMessagePuppet(user *User, info *types.MessageInfo) (pup portal.log.Warnfln("Message %+v doesn't seem to have a valid sender (%s): puppet is nil", *info, info.Sender) return nil } + user.EnqueuePortalResync(portal) puppet.SyncContact(user, true, true, "handling message") return puppet } @@ -929,49 +930,71 @@ func (portal *Portal) SyncParticipants(source *User, metadata *types.GroupInfo) portal.kickExtraUsers(participantMap) } +func (user *User) updateAvatar(jid types.JID, avatarID *string, avatarURL *id.ContentURI, avatarSet *bool, log log.Logger, intent *appservice.IntentAPI) bool { + currentID := "" + if *avatarSet && *avatarID != "remove" && *avatarID != "unauthorized" { + currentID = *avatarID + } + avatar, err := user.Client.GetProfilePictureInfo(jid, false, currentID) + if errors.Is(err, whatsmeow.ErrProfilePictureUnauthorized) { + if *avatarID == "" { + *avatarID = "unauthorized" + *avatarSet = false + return true + } + return false + } else if errors.Is(err, whatsmeow.ErrProfilePictureNotSet) { + *avatarURL = id.ContentURI{} + avatar = &types.ProfilePictureInfo{ID: "remove"} + // Fall through to the rest of the avatar handling code + } else if err != nil { + log.Warnln("Failed to get avatar URL:", err) + return false + } else if avatar == nil { + // Avatar hasn't changed + return false + } + if avatar.ID == *avatarID && *avatarSet { + return false + } else if len(avatar.URL) == 0 { + log.Warnln("Didn't get URL in response to avatar query") + return false + } else if avatar.ID != *avatarID || avatarURL.IsEmpty() { + url, err := reuploadAvatar(intent, avatar.URL) + if err != nil { + log.Warnln("Failed to reupload avatar:", err) + return false + } + *avatarURL = url + } + *avatarID = avatar.ID + *avatarSet = false + return true +} + func (portal *Portal) UpdateAvatar(user *User, setBy types.JID, updateInfo bool) bool { portal.avatarLock.Lock() defer portal.avatarLock.Unlock() - avatar, err := user.Client.GetProfilePictureInfo(portal.Key.JID, false) - if err != nil { - if !errors.Is(err, whatsmeow.ErrProfilePictureUnauthorized) { - portal.log.Warnln("Failed to get avatar URL:", err) + changed := user.updateAvatar(portal.Key.JID, &portal.Avatar, &portal.AvatarURL, &portal.AvatarSet, portal.log, portal.MainIntent()) + if !changed || portal.Avatar == "unauthorized" { + if changed || updateInfo { + portal.Update(nil) } - return false - } else if avatar == nil { - if portal.Avatar == "remove" && portal.AvatarSet { - return false - } - portal.AvatarURL = id.ContentURI{} - avatar = &types.ProfilePictureInfo{ID: "remove"} - } else if avatar.ID == portal.Avatar && portal.AvatarSet { - return false - } else if len(avatar.URL) == 0 { - portal.log.Warnln("Didn't get URL in response to avatar query") - return false - } else { - url, err := reuploadAvatar(portal.MainIntent(), avatar.URL) - if err != nil { - portal.log.Warnln("Failed to reupload avatar:", err) - return false - } - portal.AvatarURL = url + return changed } - portal.Avatar = avatar.ID - portal.AvatarSet = false if len(portal.MXID) > 0 { intent := portal.MainIntent() if !setBy.IsEmpty() { intent = portal.bridge.GetPuppetByJID(setBy).IntentFor(portal) } - _, err = intent.SetRoomAvatar(portal.MXID, portal.AvatarURL) + _, err := intent.SetRoomAvatar(portal.MXID, portal.AvatarURL) if errors.Is(err, mautrix.MForbidden) && intent != portal.MainIntent() { _, err = portal.MainIntent().SetRoomAvatar(portal.MXID, portal.AvatarURL) } if err != nil { portal.log.Warnln("Failed to set room avatar:", err) - return false + return true } else { portal.AvatarSet = true } @@ -1111,10 +1134,11 @@ func (portal *Portal) UpdateMatrixRoom(user *User, groupInfo *types.GroupInfo) b update := false update = portal.UpdateMetadata(user, groupInfo) || update - if !portal.IsPrivateChat() && !portal.IsBroadcastList() && portal.Avatar == "" { + if !portal.IsPrivateChat() && !portal.IsBroadcastList() { update = portal.UpdateAvatar(user, types.EmptyJID, false) || update } - if update { + if update || portal.LastSync.Add(24*time.Hour).Before(time.Now()) { + portal.LastSync = time.Now() portal.Update(nil) portal.UpdateBridgeInfo() } diff --git a/puppet.go b/puppet.go index df73a47..c84cb5f 100644 --- a/puppet.go +++ b/puppet.go @@ -17,7 +17,6 @@ package main import ( - "errors" "fmt" "io" "net/http" @@ -25,7 +24,6 @@ import ( "sync" "time" - "go.mau.fi/whatsmeow" "go.mau.fi/whatsmeow/types" log "maunium.net/go/maulogger/v2" @@ -238,44 +236,16 @@ func reuploadAvatar(intent *appservice.IntentAPI, url string) (id.ContentURI, er } func (puppet *Puppet) UpdateAvatar(source *User) bool { - avatar, err := source.Client.GetProfilePictureInfo(puppet.JID, false) - if err != nil { - if !errors.Is(err, whatsmeow.ErrProfilePictureUnauthorized) { - puppet.log.Warnln("Failed to get avatar URL:", err) - } else if puppet.Avatar == "" { - puppet.Avatar = "unauthorized" - puppet.AvatarSet = false - return true - } - return false - } else if avatar == nil { - if puppet.Avatar == "remove" && puppet.AvatarSet { - return false - } - puppet.AvatarURL = id.ContentURI{} - avatar = &types.ProfilePictureInfo{ID: "remove"} - } else if avatar.ID == puppet.Avatar && puppet.AvatarSet { - return false - } else if len(avatar.URL) == 0 { - puppet.log.Warnln("Didn't get URL in response to avatar query") - return false - } else if avatar.ID != puppet.Avatar || puppet.AvatarURL.IsEmpty() { - url, err := reuploadAvatar(puppet.DefaultIntent(), avatar.URL) - if err != nil { - puppet.log.Warnln("Failed to reupload avatar:", err) - return false - } - - puppet.AvatarURL = url + oldAvatarID := puppet.Avatar + changed := source.updateAvatar(puppet.JID, &puppet.Avatar, &puppet.AvatarURL, &puppet.AvatarSet, puppet.log, puppet.DefaultIntent()) + if !changed || puppet.Avatar == "unauthorized" { + return changed } - puppet.Avatar = avatar.ID - puppet.AvatarSet = false - - err = puppet.DefaultIntent().SetAvatarURL(puppet.AvatarURL) + err := puppet.DefaultIntent().SetAvatarURL(puppet.AvatarURL) if err != nil { puppet.log.Warnln("Failed to set avatar:", err) } else { - puppet.log.Debugln("Updated avatar", puppet.Avatar, "->", avatar.ID) + puppet.log.Debugln("Updated avatar", oldAvatarID, "->", puppet.Avatar) puppet.AvatarSet = true } go puppet.updatePortalAvatar() diff --git a/user.go b/user.go index 50d5be9..4264fad 100644 --- a/user.go +++ b/user.go @@ -85,10 +85,14 @@ type User struct { BackfillQueue *BackfillQueue BridgeState *bridge.BridgeStateQueue - puppetResyncQueue []*Puppet - puppetResyncQueueDedup map[types.JID]struct{} - puppetResyncQueueLock sync.Mutex - nextPuppetResync time.Time + resyncQueue map[types.JID]resyncQueueItem + resyncQueueLock sync.Mutex + nextResync time.Time +} + +type resyncQueueItem struct { + portal *Portal + puppet *Puppet } func (br *WABridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User { @@ -223,7 +227,7 @@ func (br *WABridge) NewUser(dbUser *database.User) *User { historySyncs: make(chan *events.HistorySync, 32), lastPresence: types.PresenceUnavailable, - puppetResyncQueueDedup: make(map[types.JID]struct{}), + resyncQueue: make(map[types.JID]resyncQueueItem), } user.PermissionLevel = user.bridge.Config.Bridge.Permissions.Get(user.MXID) @@ -235,65 +239,89 @@ func (br *WABridge) NewUser(dbUser *database.User) *User { return user } -const puppetSyncMinInterval = 7 * 24 * time.Hour -const puppetSyncLoopInterval = 4 * time.Hour +const resyncMinInterval = 7 * 24 * time.Hour +const resyncLoopInterval = 4 * time.Hour func (user *User) puppetResyncLoop() { - user.nextPuppetResync = time.Now().Add(puppetSyncLoopInterval).Add(-time.Duration(rand.Intn(3600)) * time.Second) + user.nextResync = time.Now().Add(resyncLoopInterval).Add(-time.Duration(rand.Intn(3600)) * time.Second) for { - time.Sleep(user.nextPuppetResync.Sub(time.Now())) - user.nextPuppetResync = time.Now().Add(puppetSyncLoopInterval) + time.Sleep(user.nextResync.Sub(time.Now())) + user.nextResync = time.Now().Add(resyncLoopInterval) user.doPuppetResync() } } func (user *User) EnqueuePuppetResync(puppet *Puppet) { - if puppet.LastSync.Add(puppetSyncMinInterval).After(time.Now()) { + if puppet.LastSync.Add(resyncMinInterval).After(time.Now()) { return } - user.puppetResyncQueueLock.Lock() - if _, exists := user.puppetResyncQueueDedup[puppet.JID]; !exists { - user.puppetResyncQueueDedup[puppet.JID] = struct{}{} - user.puppetResyncQueue = append(user.puppetResyncQueue, puppet) - user.log.Infofln("Enqueued resync for %s (next sync in %s)", puppet.JID, user.nextPuppetResync.Sub(time.Now())) + user.resyncQueueLock.Lock() + if _, exists := user.resyncQueue[puppet.JID]; !exists { + user.resyncQueue[puppet.JID] = resyncQueueItem{puppet: puppet} + user.log.Debugfln("Enqueued resync for %s (next sync in %s)", puppet.JID, user.nextResync.Sub(time.Now())) } - user.puppetResyncQueueLock.Unlock() + user.resyncQueueLock.Unlock() +} + +func (user *User) EnqueuePortalResync(portal *Portal) { + if portal.IsPrivateChat() || portal.LastSync.Add(resyncMinInterval).After(time.Now()) { + return + } + user.resyncQueueLock.Lock() + if _, exists := user.resyncQueue[portal.Key.JID]; !exists { + user.resyncQueue[portal.Key.JID] = resyncQueueItem{portal: portal} + user.log.Debugfln("Enqueued resync for %s (next sync in %s)", portal.Key.JID, user.nextResync.Sub(time.Now())) + } + user.resyncQueueLock.Unlock() } func (user *User) doPuppetResync() { if !user.IsLoggedIn() { return } - user.puppetResyncQueueLock.Lock() - if len(user.puppetResyncQueue) == 0 { - user.puppetResyncQueueLock.Unlock() + user.resyncQueueLock.Lock() + if len(user.resyncQueue) == 0 { + user.resyncQueueLock.Unlock() return } - queue := user.puppetResyncQueue - user.puppetResyncQueue = nil - user.puppetResyncQueueDedup = make(map[types.JID]struct{}) - user.puppetResyncQueueLock.Unlock() - var jids []types.JID - var filteredPuppets []*Puppet - for _, puppet := range queue { - if puppet.LastSync.Add(puppetSyncMinInterval).After(time.Now()) { - user.log.Debugfln("Not resyncing %s, last sync was %s ago", puppet.JID, time.Now().Sub(puppet.LastSync)) + queue := user.resyncQueue + user.resyncQueue = make(map[types.JID]resyncQueueItem) + user.resyncQueueLock.Unlock() + var puppetJIDs []types.JID + var puppets []*Puppet + var portals []*Portal + for jid, item := range queue { + var lastSync time.Time + if item.puppet != nil { + lastSync = item.puppet.LastSync + } else if item.portal != nil { + lastSync = item.portal.LastSync + } + if lastSync.Add(resyncMinInterval).After(time.Now()) { + user.log.Debugfln("Not resyncing %s, last sync was %s ago", jid, time.Now().Sub(lastSync)) continue } - jids = append(jids, puppet.JID) - filteredPuppets = append(filteredPuppets, puppet) + if item.puppet != nil { + puppets = append(puppets, item.puppet) + puppetJIDs = append(puppetJIDs, jid) + } else if item.portal != nil { + portals = append(portals, item.portal) + } } - if len(jids) == 0 { - user.log.Debugfln("Skipping background sync, all puppets in queue have been synced in the past 3 days") + for _, portal := range portals { + user.log.Debugfln("Doing background sync for %s", portal.Key.JID) + portal.UpdateMatrixRoom(user, nil) + } + if len(puppetJIDs) == 0 { return } - user.log.Debugfln("Doing background sync for %+v", jids) - infos, err := user.Client.GetUserInfo(jids) + user.log.Debugfln("Doing background sync for users: %+v", puppetJIDs) + infos, err := user.Client.GetUserInfo(puppetJIDs) if err != nil { user.log.Errorfln("Error getting user info for background sync: %v", err) return } - for _, puppet := range filteredPuppets { + for _, puppet := range puppets { info, ok := infos[puppet.JID] if !ok { user.log.Warnfln("Didn't get info for %s in background sync", puppet.JID)