Apply periodic resync for group chats too

This commit is contained in:
Tulir Asokan 2022-06-28 16:22:10 +03:00
parent f0e93bd938
commit 19b4c3093a
8 changed files with 155 additions and 116 deletions

View file

@ -19,6 +19,7 @@ package database
import (
"database/sql"
"fmt"
"time"
log "maunium.net/go/maulogger/v2"
@ -64,7 +65,7 @@ func (pq *PortalQuery) New() *Portal {
}
}
const portalColumns = "jid, receiver, mxid, name, name_set, topic, topic_set, avatar, avatar_url, avatar_set, encrypted, first_event_id, next_batch_id, relay_user_id, expiration_time"
const portalColumns = "jid, receiver, mxid, name, name_set, topic, topic_set, avatar, avatar_url, avatar_set, encrypted, last_sync, first_event_id, next_batch_id, relay_user_id, expiration_time"
func (pq *PortalQuery) GetAll() []*Portal {
return pq.getAll(fmt.Sprintf("SELECT %s FROM portal", portalColumns))
@ -142,6 +143,7 @@ type Portal struct {
AvatarURL id.ContentURI
AvatarSet bool
Encrypted bool
LastSync time.Time
FirstEventID id.EventID
NextBatchID id.BatchID
@ -153,13 +155,17 @@ type Portal struct {
func (portal *Portal) Scan(row dbutil.Scannable) *Portal {
var mxid, avatarURL, firstEventID, nextBatchID, relayUserID sql.NullString
err := row.Scan(&portal.Key.JID, &portal.Key.Receiver, &mxid, &portal.Name, &portal.NameSet, &portal.Topic, &portal.TopicSet, &portal.Avatar, &avatarURL, &portal.AvatarSet, &portal.Encrypted, &firstEventID, &nextBatchID, &relayUserID, &portal.ExpirationTime)
var lastSyncTs int64
err := row.Scan(&portal.Key.JID, &portal.Key.Receiver, &mxid, &portal.Name, &portal.NameSet, &portal.Topic, &portal.TopicSet, &portal.Avatar, &avatarURL, &portal.AvatarSet, &portal.Encrypted, &lastSyncTs, &firstEventID, &nextBatchID, &relayUserID, &portal.ExpirationTime)
if err != nil {
if err != sql.ErrNoRows {
portal.log.Errorln("Database scan failed:", err)
}
return nil
}
if lastSyncTs > 0 {
portal.LastSync = time.Unix(lastSyncTs, 0)
}
portal.MXID = id.RoomID(mxid.String)
portal.AvatarURL, _ = id.ParseContentURI(avatarURL.String)
portal.FirstEventID = id.EventID(firstEventID.String)
@ -182,15 +188,22 @@ func (portal *Portal) relayUserPtr() *id.UserID {
return nil
}
func (portal *Portal) lastSyncTs() int64 {
if portal.LastSync.IsZero() {
return 0
}
return portal.LastSync.Unix()
}
func (portal *Portal) Insert() {
_, err := portal.db.Exec(`
INSERT INTO portal (jid, receiver, mxid, name, name_set, topic, topic_set, avatar, avatar_url, avatar_set,
encrypted, first_event_id, next_batch_id, relay_user_id, expiration_time)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
encrypted, last_sync, first_event_id, next_batch_id, relay_user_id, expiration_time)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
`,
portal.Key.JID, portal.Key.Receiver, portal.mxidPtr(), portal.Name, portal.NameSet, portal.Topic, portal.TopicSet,
portal.Avatar, portal.AvatarURL.String(), portal.AvatarSet, portal.Encrypted, portal.FirstEventID.String(),
portal.NextBatchID.String(), portal.relayUserPtr(), portal.ExpirationTime)
portal.Avatar, portal.AvatarURL.String(), portal.AvatarSet, portal.Encrypted, portal.lastSyncTs(),
portal.FirstEventID.String(), portal.NextBatchID.String(), portal.relayUserPtr(), portal.ExpirationTime)
if err != nil {
portal.log.Warnfln("Failed to insert %s: %v", portal.Key, err)
}
@ -200,13 +213,13 @@ func (portal *Portal) Update(txn *sql.Tx) {
query := `
UPDATE portal
SET mxid=$1, name=$2, name_set=$3, topic=$4, topic_set=$5, avatar=$6, avatar_url=$7, avatar_set=$8,
encrypted=$9, first_event_id=$10, next_batch_id=$11, relay_user_id=$12, expiration_time=$13
WHERE jid=$14 AND receiver=$15
encrypted=$9, last_sync=$10, first_event_id=$11, next_batch_id=$12, relay_user_id=$13, expiration_time=$14
WHERE jid=$15 AND receiver=$16
`
args := []interface{}{
portal.mxidPtr(), portal.Name, portal.Topic, portal.Avatar, portal.AvatarURL.String(), portal.Encrypted,
portal.FirstEventID.String(), portal.NextBatchID.String(), portal.relayUserPtr(), portal.ExpirationTime,
portal.Key.JID, portal.Key.Receiver,
portal.mxidPtr(), portal.Name, portal.NameSet, portal.Topic, portal.TopicSet, portal.Avatar, portal.AvatarURL.String(),
portal.AvatarSet, portal.Encrypted, portal.lastSyncTs(), portal.FirstEventID.String(), portal.NextBatchID.String(),
portal.relayUserPtr(), portal.ExpirationTime, portal.Key.JID, portal.Key.Receiver,
}
var err error
if txn != nil {

View file

@ -1,4 +1,4 @@
-- v0 -> v50: Latest revision
-- v0 -> v51: Latest revision
CREATE TABLE "user" (
mxid TEXT PRIMARY KEY,
@ -27,6 +27,7 @@ CREATE TABLE portal (
avatar_url TEXT,
avatar_set BOOLEAN NOT NULL DEFAULT false,
encrypted BOOLEAN NOT NULL DEFAULT false,
last_sync BIGINT NOT NULL DEFAULT 0,
first_event_id TEXT,
next_batch_id TEXT,

View file

@ -0,0 +1,3 @@
-- v51: Add last sync timestamp for portals too
ALTER TABLE portal ADD COLUMN last_sync BIGINT NOT NULL DEFAULT 0;

2
go.mod
View file

@ -10,7 +10,7 @@ require (
github.com/prometheus/client_golang v1.12.2-0.20220613221938-ebd77f036066
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
github.com/tidwall/gjson v1.14.1
go.mau.fi/whatsmeow v0.0.0-20220628090541-c0ed326fe088
go.mau.fi/whatsmeow v0.0.0-20220628131901-3f187acf2229
golang.org/x/image v0.0.0-20220617043117-41969df76e82
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e
google.golang.org/protobuf v1.28.0

4
go.sum
View file

@ -64,8 +64,8 @@ github.com/yuin/goldmark v1.4.12 h1:6hffw6vALvEDqJ19dOJvJKOoAOKe4NDaTqvd2sktGN0=
github.com/yuin/goldmark v1.4.12/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mau.fi/libsignal v0.0.0-20220628090436-4d18b66b087e h1:ByHDg+D+dMIGuBA2n+1xOUf4xr3FJFYg8yxl06s1YBE=
go.mau.fi/libsignal v0.0.0-20220628090436-4d18b66b087e/go.mod h1:RCdzkTWSJv0AKGqurzPXJsEGIVMuQps3E/h7CMUPous=
go.mau.fi/whatsmeow v0.0.0-20220628090541-c0ed326fe088 h1:DjPQQ41SjbQpTJ5QvjjN7OatCnb8yr4qKyBAwram6+E=
go.mau.fi/whatsmeow v0.0.0-20220628090541-c0ed326fe088/go.mod h1:hsjqq2xLuoFew8vbsDCJcGf5EbXCRcR/yoQ+87w6m3k=
go.mau.fi/whatsmeow v0.0.0-20220628131901-3f187acf2229 h1:iTJ65cHF4PKn9pTUnbPus6FdFeLXSOvm04FwJwq11hA=
go.mau.fi/whatsmeow v0.0.0-20220628131901-3f187acf2229/go.mod h1:hsjqq2xLuoFew8vbsDCJcGf5EbXCRcR/yoQ+87w6m3k=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/image v0.0.0-20220617043117-41969df76e82 h1:KpZB5pUSBvrHltNEdK/tw0xlPeD13M6M6aGP32gKqiw=

View file

@ -823,6 +823,7 @@ func (portal *Portal) getMessagePuppet(user *User, info *types.MessageInfo) (pup
portal.log.Warnfln("Message %+v doesn't seem to have a valid sender (%s): puppet is nil", *info, info.Sender)
return nil
}
user.EnqueuePortalResync(portal)
puppet.SyncContact(user, true, true, "handling message")
return puppet
}
@ -929,49 +930,71 @@ func (portal *Portal) SyncParticipants(source *User, metadata *types.GroupInfo)
portal.kickExtraUsers(participantMap)
}
func (user *User) updateAvatar(jid types.JID, avatarID *string, avatarURL *id.ContentURI, avatarSet *bool, log log.Logger, intent *appservice.IntentAPI) bool {
currentID := ""
if *avatarSet && *avatarID != "remove" && *avatarID != "unauthorized" {
currentID = *avatarID
}
avatar, err := user.Client.GetProfilePictureInfo(jid, false, currentID)
if errors.Is(err, whatsmeow.ErrProfilePictureUnauthorized) {
if *avatarID == "" {
*avatarID = "unauthorized"
*avatarSet = false
return true
}
return false
} else if errors.Is(err, whatsmeow.ErrProfilePictureNotSet) {
*avatarURL = id.ContentURI{}
avatar = &types.ProfilePictureInfo{ID: "remove"}
// Fall through to the rest of the avatar handling code
} else if err != nil {
log.Warnln("Failed to get avatar URL:", err)
return false
} else if avatar == nil {
// Avatar hasn't changed
return false
}
if avatar.ID == *avatarID && *avatarSet {
return false
} else if len(avatar.URL) == 0 {
log.Warnln("Didn't get URL in response to avatar query")
return false
} else if avatar.ID != *avatarID || avatarURL.IsEmpty() {
url, err := reuploadAvatar(intent, avatar.URL)
if err != nil {
log.Warnln("Failed to reupload avatar:", err)
return false
}
*avatarURL = url
}
*avatarID = avatar.ID
*avatarSet = false
return true
}
func (portal *Portal) UpdateAvatar(user *User, setBy types.JID, updateInfo bool) bool {
portal.avatarLock.Lock()
defer portal.avatarLock.Unlock()
avatar, err := user.Client.GetProfilePictureInfo(portal.Key.JID, false)
if err != nil {
if !errors.Is(err, whatsmeow.ErrProfilePictureUnauthorized) {
portal.log.Warnln("Failed to get avatar URL:", err)
changed := user.updateAvatar(portal.Key.JID, &portal.Avatar, &portal.AvatarURL, &portal.AvatarSet, portal.log, portal.MainIntent())
if !changed || portal.Avatar == "unauthorized" {
if changed || updateInfo {
portal.Update(nil)
}
return false
} else if avatar == nil {
if portal.Avatar == "remove" && portal.AvatarSet {
return false
}
portal.AvatarURL = id.ContentURI{}
avatar = &types.ProfilePictureInfo{ID: "remove"}
} else if avatar.ID == portal.Avatar && portal.AvatarSet {
return false
} else if len(avatar.URL) == 0 {
portal.log.Warnln("Didn't get URL in response to avatar query")
return false
} else {
url, err := reuploadAvatar(portal.MainIntent(), avatar.URL)
if err != nil {
portal.log.Warnln("Failed to reupload avatar:", err)
return false
}
portal.AvatarURL = url
return changed
}
portal.Avatar = avatar.ID
portal.AvatarSet = false
if len(portal.MXID) > 0 {
intent := portal.MainIntent()
if !setBy.IsEmpty() {
intent = portal.bridge.GetPuppetByJID(setBy).IntentFor(portal)
}
_, err = intent.SetRoomAvatar(portal.MXID, portal.AvatarURL)
_, err := intent.SetRoomAvatar(portal.MXID, portal.AvatarURL)
if errors.Is(err, mautrix.MForbidden) && intent != portal.MainIntent() {
_, err = portal.MainIntent().SetRoomAvatar(portal.MXID, portal.AvatarURL)
}
if err != nil {
portal.log.Warnln("Failed to set room avatar:", err)
return false
return true
} else {
portal.AvatarSet = true
}
@ -1111,10 +1134,11 @@ func (portal *Portal) UpdateMatrixRoom(user *User, groupInfo *types.GroupInfo) b
update := false
update = portal.UpdateMetadata(user, groupInfo) || update
if !portal.IsPrivateChat() && !portal.IsBroadcastList() && portal.Avatar == "" {
if !portal.IsPrivateChat() && !portal.IsBroadcastList() {
update = portal.UpdateAvatar(user, types.EmptyJID, false) || update
}
if update {
if update || portal.LastSync.Add(24*time.Hour).Before(time.Now()) {
portal.LastSync = time.Now()
portal.Update(nil)
portal.UpdateBridgeInfo()
}

View file

@ -17,7 +17,6 @@
package main
import (
"errors"
"fmt"
"io"
"net/http"
@ -25,7 +24,6 @@ import (
"sync"
"time"
"go.mau.fi/whatsmeow"
"go.mau.fi/whatsmeow/types"
log "maunium.net/go/maulogger/v2"
@ -238,44 +236,16 @@ func reuploadAvatar(intent *appservice.IntentAPI, url string) (id.ContentURI, er
}
func (puppet *Puppet) UpdateAvatar(source *User) bool {
avatar, err := source.Client.GetProfilePictureInfo(puppet.JID, false)
if err != nil {
if !errors.Is(err, whatsmeow.ErrProfilePictureUnauthorized) {
puppet.log.Warnln("Failed to get avatar URL:", err)
} else if puppet.Avatar == "" {
puppet.Avatar = "unauthorized"
puppet.AvatarSet = false
return true
}
return false
} else if avatar == nil {
if puppet.Avatar == "remove" && puppet.AvatarSet {
return false
}
puppet.AvatarURL = id.ContentURI{}
avatar = &types.ProfilePictureInfo{ID: "remove"}
} else if avatar.ID == puppet.Avatar && puppet.AvatarSet {
return false
} else if len(avatar.URL) == 0 {
puppet.log.Warnln("Didn't get URL in response to avatar query")
return false
} else if avatar.ID != puppet.Avatar || puppet.AvatarURL.IsEmpty() {
url, err := reuploadAvatar(puppet.DefaultIntent(), avatar.URL)
if err != nil {
puppet.log.Warnln("Failed to reupload avatar:", err)
return false
}
puppet.AvatarURL = url
oldAvatarID := puppet.Avatar
changed := source.updateAvatar(puppet.JID, &puppet.Avatar, &puppet.AvatarURL, &puppet.AvatarSet, puppet.log, puppet.DefaultIntent())
if !changed || puppet.Avatar == "unauthorized" {
return changed
}
puppet.Avatar = avatar.ID
puppet.AvatarSet = false
err = puppet.DefaultIntent().SetAvatarURL(puppet.AvatarURL)
err := puppet.DefaultIntent().SetAvatarURL(puppet.AvatarURL)
if err != nil {
puppet.log.Warnln("Failed to set avatar:", err)
} else {
puppet.log.Debugln("Updated avatar", puppet.Avatar, "->", avatar.ID)
puppet.log.Debugln("Updated avatar", oldAvatarID, "->", puppet.Avatar)
puppet.AvatarSet = true
}
go puppet.updatePortalAvatar()

100
user.go
View file

@ -85,10 +85,14 @@ type User struct {
BackfillQueue *BackfillQueue
BridgeState *bridge.BridgeStateQueue
puppetResyncQueue []*Puppet
puppetResyncQueueDedup map[types.JID]struct{}
puppetResyncQueueLock sync.Mutex
nextPuppetResync time.Time
resyncQueue map[types.JID]resyncQueueItem
resyncQueueLock sync.Mutex
nextResync time.Time
}
type resyncQueueItem struct {
portal *Portal
puppet *Puppet
}
func (br *WABridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User {
@ -223,7 +227,7 @@ func (br *WABridge) NewUser(dbUser *database.User) *User {
historySyncs: make(chan *events.HistorySync, 32),
lastPresence: types.PresenceUnavailable,
puppetResyncQueueDedup: make(map[types.JID]struct{}),
resyncQueue: make(map[types.JID]resyncQueueItem),
}
user.PermissionLevel = user.bridge.Config.Bridge.Permissions.Get(user.MXID)
@ -235,65 +239,89 @@ func (br *WABridge) NewUser(dbUser *database.User) *User {
return user
}
const puppetSyncMinInterval = 7 * 24 * time.Hour
const puppetSyncLoopInterval = 4 * time.Hour
const resyncMinInterval = 7 * 24 * time.Hour
const resyncLoopInterval = 4 * time.Hour
func (user *User) puppetResyncLoop() {
user.nextPuppetResync = time.Now().Add(puppetSyncLoopInterval).Add(-time.Duration(rand.Intn(3600)) * time.Second)
user.nextResync = time.Now().Add(resyncLoopInterval).Add(-time.Duration(rand.Intn(3600)) * time.Second)
for {
time.Sleep(user.nextPuppetResync.Sub(time.Now()))
user.nextPuppetResync = time.Now().Add(puppetSyncLoopInterval)
time.Sleep(user.nextResync.Sub(time.Now()))
user.nextResync = time.Now().Add(resyncLoopInterval)
user.doPuppetResync()
}
}
func (user *User) EnqueuePuppetResync(puppet *Puppet) {
if puppet.LastSync.Add(puppetSyncMinInterval).After(time.Now()) {
if puppet.LastSync.Add(resyncMinInterval).After(time.Now()) {
return
}
user.puppetResyncQueueLock.Lock()
if _, exists := user.puppetResyncQueueDedup[puppet.JID]; !exists {
user.puppetResyncQueueDedup[puppet.JID] = struct{}{}
user.puppetResyncQueue = append(user.puppetResyncQueue, puppet)
user.log.Infofln("Enqueued resync for %s (next sync in %s)", puppet.JID, user.nextPuppetResync.Sub(time.Now()))
user.resyncQueueLock.Lock()
if _, exists := user.resyncQueue[puppet.JID]; !exists {
user.resyncQueue[puppet.JID] = resyncQueueItem{puppet: puppet}
user.log.Debugfln("Enqueued resync for %s (next sync in %s)", puppet.JID, user.nextResync.Sub(time.Now()))
}
user.puppetResyncQueueLock.Unlock()
user.resyncQueueLock.Unlock()
}
func (user *User) EnqueuePortalResync(portal *Portal) {
if portal.IsPrivateChat() || portal.LastSync.Add(resyncMinInterval).After(time.Now()) {
return
}
user.resyncQueueLock.Lock()
if _, exists := user.resyncQueue[portal.Key.JID]; !exists {
user.resyncQueue[portal.Key.JID] = resyncQueueItem{portal: portal}
user.log.Debugfln("Enqueued resync for %s (next sync in %s)", portal.Key.JID, user.nextResync.Sub(time.Now()))
}
user.resyncQueueLock.Unlock()
}
func (user *User) doPuppetResync() {
if !user.IsLoggedIn() {
return
}
user.puppetResyncQueueLock.Lock()
if len(user.puppetResyncQueue) == 0 {
user.puppetResyncQueueLock.Unlock()
user.resyncQueueLock.Lock()
if len(user.resyncQueue) == 0 {
user.resyncQueueLock.Unlock()
return
}
queue := user.puppetResyncQueue
user.puppetResyncQueue = nil
user.puppetResyncQueueDedup = make(map[types.JID]struct{})
user.puppetResyncQueueLock.Unlock()
var jids []types.JID
var filteredPuppets []*Puppet
for _, puppet := range queue {
if puppet.LastSync.Add(puppetSyncMinInterval).After(time.Now()) {
user.log.Debugfln("Not resyncing %s, last sync was %s ago", puppet.JID, time.Now().Sub(puppet.LastSync))
queue := user.resyncQueue
user.resyncQueue = make(map[types.JID]resyncQueueItem)
user.resyncQueueLock.Unlock()
var puppetJIDs []types.JID
var puppets []*Puppet
var portals []*Portal
for jid, item := range queue {
var lastSync time.Time
if item.puppet != nil {
lastSync = item.puppet.LastSync
} else if item.portal != nil {
lastSync = item.portal.LastSync
}
if lastSync.Add(resyncMinInterval).After(time.Now()) {
user.log.Debugfln("Not resyncing %s, last sync was %s ago", jid, time.Now().Sub(lastSync))
continue
}
jids = append(jids, puppet.JID)
filteredPuppets = append(filteredPuppets, puppet)
if item.puppet != nil {
puppets = append(puppets, item.puppet)
puppetJIDs = append(puppetJIDs, jid)
} else if item.portal != nil {
portals = append(portals, item.portal)
}
}
if len(jids) == 0 {
user.log.Debugfln("Skipping background sync, all puppets in queue have been synced in the past 3 days")
for _, portal := range portals {
user.log.Debugfln("Doing background sync for %s", portal.Key.JID)
portal.UpdateMatrixRoom(user, nil)
}
if len(puppetJIDs) == 0 {
return
}
user.log.Debugfln("Doing background sync for %+v", jids)
infos, err := user.Client.GetUserInfo(jids)
user.log.Debugfln("Doing background sync for users: %+v", puppetJIDs)
infos, err := user.Client.GetUserInfo(puppetJIDs)
if err != nil {
user.log.Errorfln("Error getting user info for background sync: %v", err)
return
}
for _, puppet := range filteredPuppets {
for _, puppet := range puppets {
info, ok := infos[puppet.JID]
if !ok {
user.log.Warnfln("Didn't get info for %s in background sync", puppet.JID)