backfill state: update during backfill

This commit is contained in:
Sumner Evans 2022-05-17 12:50:01 -06:00
parent 89663c2745
commit 4eac1ea719
No known key found for this signature in database
GPG key ID: 8904527AB50022FD
3 changed files with 123 additions and 1 deletions

View file

@ -22,6 +22,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"
log "maunium.net/go/maulogger/v2"
@ -51,6 +52,8 @@ func (bt BackfillType) String() string {
type BackfillQuery struct {
db *Database
log log.Logger
backfillQueryLock sync.Mutex
}
func (bq *BackfillQuery) New() *Backfill {
@ -96,6 +99,9 @@ const (
// GetNext returns the next backfill to perform
func (bq *BackfillQuery) GetNext(userID id.UserID, backfillTypes []BackfillType) (backfill *Backfill) {
bq.backfillQueryLock.Lock()
defer bq.backfillQueryLock.Unlock()
types := []string{}
for _, backfillType := range backfillTypes {
types = append(types, strconv.Itoa(int(backfillType)))
@ -113,6 +119,8 @@ func (bq *BackfillQuery) GetNext(userID id.UserID, backfillTypes []BackfillType)
}
func (bq *BackfillQuery) DeleteAll(userID id.UserID) {
bq.backfillQueryLock.Lock()
defer bq.backfillQueryLock.Unlock()
_, err := bq.db.Exec("DELETE FROM backfill_queue WHERE user_mxid=$1", userID)
if err != nil {
bq.log.Warnfln("Failed to delete backfill queue items for %s: %v", userID, err)
@ -120,6 +128,8 @@ func (bq *BackfillQuery) DeleteAll(userID id.UserID) {
}
func (bq *BackfillQuery) DeleteAllForPortal(userID id.UserID, portalKey PortalKey) {
bq.backfillQueryLock.Lock()
defer bq.backfillQueryLock.Unlock()
_, err := bq.db.Exec(`
DELETE FROM backfill_queue
WHERE user_mxid=$1
@ -167,6 +177,9 @@ func (b *Backfill) Scan(row Scannable) *Backfill {
}
func (b *Backfill) Insert() {
b.db.Backfill.backfillQueryLock.Lock()
defer b.db.Backfill.backfillQueryLock.Unlock()
rows, err := b.db.Query(`
INSERT INTO backfill_queue
(user_mxid, type, priority, portal_jid, portal_receiver, time_start, max_batch_events, max_total_events, batch_delay, dispatch_time, completed_at)
@ -185,6 +198,9 @@ func (b *Backfill) Insert() {
}
func (b *Backfill) MarkDispatched() {
b.db.Backfill.backfillQueryLock.Lock()
defer b.db.Backfill.backfillQueryLock.Unlock()
if b.QueueID == 0 {
b.log.Errorf("Cannot mark backfill as dispatched without queue_id. Maybe it wasn't actually inserted in the database?")
return
@ -196,6 +212,9 @@ func (b *Backfill) MarkDispatched() {
}
func (b *Backfill) MarkDone() {
b.db.Backfill.backfillQueryLock.Lock()
defer b.db.Backfill.backfillQueryLock.Unlock()
if b.QueueID == 0 {
b.log.Errorf("Cannot mark backfill done without queue_id. Maybe it wasn't actually inserted in the database?")
return
@ -205,3 +224,79 @@ func (b *Backfill) MarkDone() {
b.log.Warnfln("Failed to mark %s/%s as complete: %v", b.BackfillType, b.Priority, err)
}
}
func (bq *BackfillQuery) NewBackfillState(userID id.UserID, portalKey *PortalKey) *BackfillState {
return &BackfillState{
db: bq.db,
log: bq.log,
UserID: userID,
Portal: portalKey,
}
}
const (
getBackfillState = `
SELECT user_mxid, portal_jid, portal_receiver, processing_batch, backfill_complete, first_expected_ts
FROM backfill_state
WHERE user_mxid=$1
AND portal_jid=$2
AND portal_receiver=$3
`
)
type BackfillState struct {
db *Database
log log.Logger
// Fields
UserID id.UserID
Portal *PortalKey
ProcessingBatch bool
BackfillComplete bool
FirstExpectedTimestamp uint64
}
func (b *BackfillState) Scan(row Scannable) *BackfillState {
err := row.Scan(&b.UserID, &b.Portal.JID, &b.Portal.Receiver, &b.ProcessingBatch, &b.BackfillComplete, &b.FirstExpectedTimestamp)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
b.log.Errorln("Database scan failed:", err)
}
return nil
}
return b
}
func (b *BackfillState) Upsert() {
_, err := b.db.Exec(`
INSERT INTO backfill_state
(user_mxid, portal_jid, portal_receiver, processing_batch, backfill_complete, first_expected_ts)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (user_mxid, portal_jid, portal_receiver)
DO UPDATE SET
processing_batch=EXCLUDED.processing_batch,
backfill_complete=EXCLUDED.backfill_complete,
first_expected_ts=EXCLUDED.first_expected_ts`,
b.UserID, b.Portal.JID, b.Portal.Receiver, b.ProcessingBatch, b.BackfillComplete, b.FirstExpectedTimestamp)
if err != nil {
b.log.Warnfln("Failed to insert backfill state for %s: %v", b.Portal.JID, err)
}
}
func (b *BackfillState) SetProcessingBatch(processing bool) {
b.ProcessingBatch = processing
b.Upsert()
}
func (bq *BackfillQuery) GetBackfillState(userID id.UserID, portalKey *PortalKey) (backfillState *BackfillState) {
rows, err := bq.db.Query(getBackfillState, userID, portalKey.JID, portalKey.Receiver)
if err != nil || rows == nil {
bq.log.Error(err)
return
}
defer rows.Close()
if rows.Next() {
backfillState = bq.NewBackfillState(userID, portalKey).Scan(rows)
}
return
}

View file

@ -13,7 +13,7 @@ func init() {
portal_receiver TEXT,
processing_batch BOOLEAN,
backfill_complete BOOLEAN,
first_expected_ts TIMESTAMP,
first_expected_ts INTEGER,
PRIMARY KEY (user_mxid, portal_jid, portal_receiver),
FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,

View file

@ -130,6 +130,13 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
portal.backfillLock.Lock()
defer portal.backfillLock.Unlock()
backfillState := user.bridge.DB.Backfill.GetBackfillState(user.MXID, &portal.Key)
if backfillState == nil {
backfillState = user.bridge.DB.Backfill.NewBackfillState(user.MXID, &portal.Key)
}
backfillState.SetProcessingBatch(true)
defer backfillState.SetProcessingBatch(false)
if !user.shouldCreatePortalForHistorySync(conv, portal) {
return
}
@ -241,6 +248,26 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
user.log.Warnfln("Failed to delete %d history sync messages after backfilling (queue ID: %d): %v", len(allMsgs), req.QueueID, err)
}
if req.TimeStart == nil {
// If the time start is nil, then there's no more history to backfill.
backfillState.BackfillComplete = true
if conv.EndOfHistoryTransferType == waProto.Conversation_COMPLETE_BUT_MORE_MESSAGES_REMAIN_ON_PRIMARY {
// Since there are more messages on the phone, but we can't
// backfilll any more of them, indicate that the last timestamp
// that we expect to be backfilled is the oldest one that was just
// backfilled.
backfillState.FirstExpectedTimestamp = allMsgs[len(allMsgs)-1].GetMessageTimestamp()
} else if conv.EndOfHistoryTransferType == waProto.Conversation_COMPLETE_AND_NO_MORE_MESSAGE_REMAIN_ON_PRIMARY {
// Since there are no more messages left on the phone, we've
// backfilled everything. Indicate so by setting the expected
// timestamp to 0 which means that the backfill goes to the
// beginning of time.
backfillState.FirstExpectedTimestamp = 0
}
backfillState.Upsert()
}
if !conv.MarkedAsUnread && conv.UnreadCount == 0 {
user.markSelfReadFull(portal)
} else if user.bridge.Config.Bridge.SyncManualMarkedUnread {