diff --git a/database/backfillqueue.go b/database/backfill.go similarity index 70% rename from database/backfillqueue.go rename to database/backfill.go index 6738877..1a0979b 100644 --- a/database/backfillqueue.go +++ b/database/backfill.go @@ -22,6 +22,7 @@ import ( "fmt" "strconv" "strings" + "sync" "time" log "maunium.net/go/maulogger/v2" @@ -51,6 +52,8 @@ func (bt BackfillType) String() string { type BackfillQuery struct { db *Database log log.Logger + + backfillQueryLock sync.Mutex } func (bq *BackfillQuery) New() *Backfill { @@ -96,6 +99,9 @@ const ( // GetNext returns the next backfill to perform func (bq *BackfillQuery) GetNext(userID id.UserID, backfillTypes []BackfillType) (backfill *Backfill) { + bq.backfillQueryLock.Lock() + defer bq.backfillQueryLock.Unlock() + types := []string{} for _, backfillType := range backfillTypes { types = append(types, strconv.Itoa(int(backfillType))) @@ -113,6 +119,8 @@ func (bq *BackfillQuery) GetNext(userID id.UserID, backfillTypes []BackfillType) } func (bq *BackfillQuery) DeleteAll(userID id.UserID) { + bq.backfillQueryLock.Lock() + defer bq.backfillQueryLock.Unlock() _, err := bq.db.Exec("DELETE FROM backfill_queue WHERE user_mxid=$1", userID) if err != nil { bq.log.Warnfln("Failed to delete backfill queue items for %s: %v", userID, err) @@ -120,6 +128,8 @@ func (bq *BackfillQuery) DeleteAll(userID id.UserID) { } func (bq *BackfillQuery) DeleteAllForPortal(userID id.UserID, portalKey PortalKey) { + bq.backfillQueryLock.Lock() + defer bq.backfillQueryLock.Unlock() _, err := bq.db.Exec(` DELETE FROM backfill_queue WHERE user_mxid=$1 @@ -167,6 +177,9 @@ func (b *Backfill) Scan(row Scannable) *Backfill { } func (b *Backfill) Insert() { + b.db.Backfill.backfillQueryLock.Lock() + defer b.db.Backfill.backfillQueryLock.Unlock() + rows, err := b.db.Query(` INSERT INTO backfill_queue (user_mxid, type, priority, portal_jid, portal_receiver, time_start, max_batch_events, max_total_events, batch_delay, dispatch_time, completed_at) @@ -185,6 +198,9 @@ func (b *Backfill) Insert() { } func (b *Backfill) MarkDispatched() { + b.db.Backfill.backfillQueryLock.Lock() + defer b.db.Backfill.backfillQueryLock.Unlock() + if b.QueueID == 0 { b.log.Errorf("Cannot mark backfill as dispatched without queue_id. Maybe it wasn't actually inserted in the database?") return @@ -196,6 +212,9 @@ func (b *Backfill) MarkDispatched() { } func (b *Backfill) MarkDone() { + b.db.Backfill.backfillQueryLock.Lock() + defer b.db.Backfill.backfillQueryLock.Unlock() + if b.QueueID == 0 { b.log.Errorf("Cannot mark backfill done without queue_id. Maybe it wasn't actually inserted in the database?") return @@ -205,3 +224,79 @@ func (b *Backfill) MarkDone() { b.log.Warnfln("Failed to mark %s/%s as complete: %v", b.BackfillType, b.Priority, err) } } + +func (bq *BackfillQuery) NewBackfillState(userID id.UserID, portalKey *PortalKey) *BackfillState { + return &BackfillState{ + db: bq.db, + log: bq.log, + UserID: userID, + Portal: portalKey, + } +} + +const ( + getBackfillState = ` + SELECT user_mxid, portal_jid, portal_receiver, processing_batch, backfill_complete, first_expected_ts + FROM backfill_state + WHERE user_mxid=$1 + AND portal_jid=$2 + AND portal_receiver=$3 + ` +) + +type BackfillState struct { + db *Database + log log.Logger + + // Fields + UserID id.UserID + Portal *PortalKey + ProcessingBatch bool + BackfillComplete bool + FirstExpectedTimestamp uint64 +} + +func (b *BackfillState) Scan(row Scannable) *BackfillState { + err := row.Scan(&b.UserID, &b.Portal.JID, &b.Portal.Receiver, &b.ProcessingBatch, &b.BackfillComplete, &b.FirstExpectedTimestamp) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + b.log.Errorln("Database scan failed:", err) + } + return nil + } + return b +} + +func (b *BackfillState) Upsert() { + _, err := b.db.Exec(` + INSERT INTO backfill_state + (user_mxid, portal_jid, portal_receiver, processing_batch, backfill_complete, first_expected_ts) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (user_mxid, portal_jid, portal_receiver) + DO UPDATE SET + processing_batch=EXCLUDED.processing_batch, + backfill_complete=EXCLUDED.backfill_complete, + first_expected_ts=EXCLUDED.first_expected_ts`, + b.UserID, b.Portal.JID, b.Portal.Receiver, b.ProcessingBatch, b.BackfillComplete, b.FirstExpectedTimestamp) + if err != nil { + b.log.Warnfln("Failed to insert backfill state for %s: %v", b.Portal.JID, err) + } +} + +func (b *BackfillState) SetProcessingBatch(processing bool) { + b.ProcessingBatch = processing + b.Upsert() +} + +func (bq *BackfillQuery) GetBackfillState(userID id.UserID, portalKey *PortalKey) (backfillState *BackfillState) { + rows, err := bq.db.Query(getBackfillState, userID, portalKey.JID, portalKey.Receiver) + if err != nil || rows == nil { + bq.log.Error(err) + return + } + defer rows.Close() + if rows.Next() { + backfillState = bq.NewBackfillState(userID, portalKey).Scan(rows) + } + return +} diff --git a/database/upgrades/2022-05-16-room-backfill-state.go b/database/upgrades/2022-05-16-room-backfill-state.go index f7de55f..976b129 100644 --- a/database/upgrades/2022-05-16-room-backfill-state.go +++ b/database/upgrades/2022-05-16-room-backfill-state.go @@ -13,7 +13,7 @@ func init() { portal_receiver TEXT, processing_batch BOOLEAN, backfill_complete BOOLEAN, - first_expected_ts TIMESTAMP, + first_expected_ts INTEGER, PRIMARY KEY (user_mxid, portal_jid, portal_receiver), FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, diff --git a/historysync.go b/historysync.go index 5c2e6ca..00f0e2a 100644 --- a/historysync.go +++ b/historysync.go @@ -130,6 +130,13 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor portal.backfillLock.Lock() defer portal.backfillLock.Unlock() + backfillState := user.bridge.DB.Backfill.GetBackfillState(user.MXID, &portal.Key) + if backfillState == nil { + backfillState = user.bridge.DB.Backfill.NewBackfillState(user.MXID, &portal.Key) + } + backfillState.SetProcessingBatch(true) + defer backfillState.SetProcessingBatch(false) + if !user.shouldCreatePortalForHistorySync(conv, portal) { return } @@ -241,6 +248,26 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor user.log.Warnfln("Failed to delete %d history sync messages after backfilling (queue ID: %d): %v", len(allMsgs), req.QueueID, err) } + if req.TimeStart == nil { + // If the time start is nil, then there's no more history to backfill. + backfillState.BackfillComplete = true + + if conv.EndOfHistoryTransferType == waProto.Conversation_COMPLETE_BUT_MORE_MESSAGES_REMAIN_ON_PRIMARY { + // Since there are more messages on the phone, but we can't + // backfilll any more of them, indicate that the last timestamp + // that we expect to be backfilled is the oldest one that was just + // backfilled. + backfillState.FirstExpectedTimestamp = allMsgs[len(allMsgs)-1].GetMessageTimestamp() + } else if conv.EndOfHistoryTransferType == waProto.Conversation_COMPLETE_AND_NO_MORE_MESSAGE_REMAIN_ON_PRIMARY { + // Since there are no more messages left on the phone, we've + // backfilled everything. Indicate so by setting the expected + // timestamp to 0 which means that the backfill goes to the + // beginning of time. + backfillState.FirstExpectedTimestamp = 0 + } + backfillState.Upsert() + } + if !conv.MarkedAsUnread && conv.UnreadCount == 0 { user.markSelfReadFull(portal) } else if user.bridge.Config.Bridge.SyncManualMarkedUnread {