diff --git a/backfillqueue.go b/backfillqueue.go index 8d0d9c5..fb3fe3f 100644 --- a/backfillqueue.go +++ b/backfillqueue.go @@ -41,7 +41,7 @@ func (bq *BackfillQueue) immediateBackfillLoop(user *User) { for { if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); backfill != nil { bq.ImmediateBackfillRequests <- backfill - backfill.Delete() + backfill.MarkDone() } else { select { case <-bq.ReCheckQueue: @@ -61,7 +61,7 @@ func (bq *BackfillQueue) deferredBackfillLoop(user *User) { if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil { bq.DeferredBackfillRequests <- backfill - backfill.Delete() + backfill.MarkDone() } else { time.Sleep(10 * time.Second) } diff --git a/database/backfillqueue.go b/database/backfillqueue.go index 7f012e3..b6c82a8 100644 --- a/database/backfillqueue.go +++ b/database/backfillqueue.go @@ -67,6 +67,7 @@ const ( FROM backfill_queue WHERE user_mxid=$1 AND type=$2 + AND completed_at IS NULL ORDER BY priority, queue_id LIMIT 1 ` @@ -106,6 +107,7 @@ type Backfill struct { MaxBatchEvents int MaxTotalEvents int BatchDelay int + CompletedAt *time.Time } func (b *Backfill) Scan(row Scannable) *Backfill { @@ -122,10 +124,10 @@ func (b *Backfill) Scan(row Scannable) *Backfill { func (b *Backfill) Insert() { rows, err := b.db.Query(` INSERT INTO backfill_queue - (user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + (user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay, completed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING queue_id - `, b.UserID, b.BackfillType, b.Priority, b.Portal.JID, b.Portal.Receiver, b.TimeStart, b.TimeEnd, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay) + `, b.UserID, b.BackfillType, b.Priority, b.Portal.JID, b.Portal.Receiver, b.TimeStart, b.TimeEnd, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay, b.CompletedAt) defer rows.Close() if err != nil || !rows.Next() { b.log.Warnfln("Failed to insert %v/%s with priority %d: %v", b.BackfillType, b.Portal.JID, b.Priority, err) @@ -137,13 +139,13 @@ func (b *Backfill) Insert() { } } -func (b *Backfill) Delete() { +func (b *Backfill) MarkDone() { if b.QueueID == 0 { b.log.Errorf("Cannot delete backfill without queue_id. Maybe it wasn't actually inserted in the database?") return } - _, err := b.db.Exec("DELETE FROM backfill_queue WHERE queue_id=$1", b.QueueID) + _, err := b.db.Exec("UPDATE backfill_queue SET completed_at=$1 WHERE queue_id=$2", time.Now(), b.QueueID) if err != nil { - b.log.Warnfln("Failed to delete %s/%s: %v", b.BackfillType, b.Priority, err) + b.log.Warnfln("Failed to mark %s/%s as complete: %v", b.BackfillType, b.Priority, err) } } diff --git a/database/upgrades/2022-03-15-prioritized-backfill.go b/database/upgrades/2022-03-15-prioritized-backfill.go index 96fc9cd..813e746 100644 --- a/database/upgrades/2022-03-15-prioritized-backfill.go +++ b/database/upgrades/2022-03-15-prioritized-backfill.go @@ -17,6 +17,7 @@ func init() { max_batch_events INTEGER NOT NULL, max_total_events INTEGER, batch_delay INTEGER, + completed_at TIMESTAMP, FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE, FOREIGN KEY (portal_jid, portal_receiver) REFERENCES portal(jid, receiver) ON DELETE CASCADE