From 1d70cbff48ea669fe3c4ecf36335592972a2f2b7 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 12 May 2022 11:54:38 -0600 Subject: [PATCH] backfill queue: add dispatch time, remove time end to table * the dispatch time is going to be what the completed time used to be * the time end column was always nil, so I got rid of it --- commands.go | 2 +- database/backfillqueue.go | 19 +++++++------- .../2022-05-12-backfillqueue-dispatch-time.go | 25 +++++++++++++++++++ ...022-05-12-backfillqueue-remove-time-end.go | 16 ++++++++++++ database/upgrades/upgrades.go | 2 +- historysync.go | 13 +++++----- 6 files changed, 59 insertions(+), 18 deletions(-) create mode 100644 database/upgrades/2022-05-12-backfillqueue-dispatch-time.go create mode 100644 database/upgrades/2022-05-12-backfillqueue-remove-time-end.go diff --git a/commands.go b/commands.go index 255d22e..c7bdd23 100644 --- a/commands.go +++ b/commands.go @@ -873,7 +873,7 @@ func (handler *CommandHandler) CommandBackfill(ce *CommandEvent) { return } } - backfillMessages := ce.Portal.bridge.DB.Backfill.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay) + backfillMessages := ce.Portal.bridge.DB.Backfill.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, batchSize, -1, batchDelay) backfillMessages.Insert() ce.User.BackfillQueue.ReCheckQueue <- true diff --git a/database/backfillqueue.go b/database/backfillqueue.go index 5947fa2..acb731a 100644 --- a/database/backfillqueue.go +++ b/database/backfillqueue.go @@ -59,7 +59,7 @@ func (bq *BackfillQuery) New() *Backfill { } } -func (bq *BackfillQuery) NewWithValues(userID id.UserID, backfillType BackfillType, priority int, portal *PortalKey, timeStart *time.Time, timeEnd *time.Time, maxBatchEvents, maxTotalEvents, batchDelay int) *Backfill { +func (bq *BackfillQuery) NewWithValues(userID id.UserID, backfillType BackfillType, priority int, portal *PortalKey, timeStart *time.Time, maxBatchEvents, maxTotalEvents, batchDelay int) *Backfill { return &Backfill{ db: bq.db, log: bq.log, @@ -68,7 +68,6 @@ func (bq *BackfillQuery) NewWithValues(userID id.UserID, backfillType BackfillTy Priority: priority, Portal: portal, TimeStart: timeStart, - TimeEnd: timeEnd, MaxBatchEvents: maxBatchEvents, MaxTotalEvents: maxTotalEvents, BatchDelay: batchDelay, @@ -77,10 +76,10 @@ func (bq *BackfillQuery) NewWithValues(userID id.UserID, backfillType BackfillTy const ( getNextBackfillQuery = ` - SELECT queue_id, user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay + SELECT queue_id, user_mxid, type, priority, portal_jid, portal_receiver, time_start, max_batch_events, max_total_events, batch_delay FROM backfill_queue WHERE user_mxid=$1 - AND completed_at IS NULL + AND dispatch_time IS NULL ORDER BY type, priority, queue_id LIMIT 1 ` @@ -126,21 +125,21 @@ type Backfill struct { Priority int Portal *PortalKey TimeStart *time.Time - TimeEnd *time.Time MaxBatchEvents int MaxTotalEvents int BatchDelay int + DispatchTime *time.Time CompletedAt *time.Time } func (b *Backfill) String() string { - return fmt.Sprintf("Backfill{QueueID: %d, UserID: %s, BackfillType: %s, Priority: %d, Portal: %s, TimeStart: %s, TimeEnd: %s, MaxBatchEvents: %d, MaxTotalEvents: %d, BatchDelay: %d, CompletedAt: %s}", - b.QueueID, b.UserID, b.BackfillType, b.Priority, b.Portal, b.TimeStart, b.TimeEnd, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay, b.CompletedAt, + return fmt.Sprintf("Backfill{QueueID: %d, UserID: %s, BackfillType: %s, Priority: %d, Portal: %s, TimeStart: %s, MaxBatchEvents: %d, MaxTotalEvents: %d, BatchDelay: %d, DispatchTime: %s, CompletedAt: %s}", + b.QueueID, b.UserID, b.BackfillType, b.Priority, b.Portal, b.TimeStart, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay, b.CompletedAt, b.DispatchTime, ) } func (b *Backfill) Scan(row Scannable) *Backfill { - err := row.Scan(&b.QueueID, &b.UserID, &b.BackfillType, &b.Priority, &b.Portal.JID, &b.Portal.Receiver, &b.TimeStart, &b.TimeEnd, &b.MaxBatchEvents, &b.MaxTotalEvents, &b.BatchDelay) + err := row.Scan(&b.QueueID, &b.UserID, &b.BackfillType, &b.Priority, &b.Portal.JID, &b.Portal.Receiver, &b.TimeStart, &b.MaxBatchEvents, &b.MaxTotalEvents, &b.BatchDelay) if err != nil { if !errors.Is(err, sql.ErrNoRows) { b.log.Errorln("Database scan failed:", err) @@ -153,10 +152,10 @@ func (b *Backfill) Scan(row Scannable) *Backfill { func (b *Backfill) Insert() { rows, err := b.db.Query(` INSERT INTO backfill_queue - (user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay, completed_at) + (user_mxid, type, priority, portal_jid, portal_receiver, time_start, max_batch_events, max_total_events, batch_delay, dispatch_time, completed_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING queue_id - `, b.UserID, b.BackfillType, b.Priority, b.Portal.JID, b.Portal.Receiver, b.TimeStart, b.TimeEnd, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay, b.CompletedAt) + `, b.UserID, b.BackfillType, b.Priority, b.Portal.JID, b.Portal.Receiver, b.TimeStart, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay, b.DispatchTime, b.CompletedAt) defer rows.Close() if err != nil || !rows.Next() { b.log.Warnfln("Failed to insert %v/%s with priority %d: %v", b.BackfillType, b.Portal.JID, b.Priority, err) diff --git a/database/upgrades/2022-05-12-backfillqueue-dispatch-time.go b/database/upgrades/2022-05-12-backfillqueue-dispatch-time.go new file mode 100644 index 0000000..cdd9048 --- /dev/null +++ b/database/upgrades/2022-05-12-backfillqueue-dispatch-time.go @@ -0,0 +1,25 @@ +package upgrades + +import ( + "database/sql" +) + +func init() { + upgrades[44] = upgrade{"Add dispatch time to backfill queue", func(tx *sql.Tx, ctx context) error { + // First, add dispatch_time TIMESTAMP column + _, err := tx.Exec(` + ALTER TABLE backfill_queue + ADD COLUMN dispatch_time TIMESTAMP + `) + if err != nil { + return err + } + + // For all previous jobs, set dispatch time to the completed time. + _, err = tx.Exec(` + UPDATE backfill_queue + SET dispatch_time=completed_at + `) + return err + }} +} diff --git a/database/upgrades/2022-05-12-backfillqueue-remove-time-end.go b/database/upgrades/2022-05-12-backfillqueue-remove-time-end.go new file mode 100644 index 0000000..1e39d7a --- /dev/null +++ b/database/upgrades/2022-05-12-backfillqueue-remove-time-end.go @@ -0,0 +1,16 @@ +package upgrades + +import ( + "database/sql" +) + +func init() { + upgrades[45] = upgrade{"Add dispatch time to backfill queue", func(tx *sql.Tx, ctx context) error { + // First, add dispatch_time TIMESTAMP column + _, err := tx.Exec(` + ALTER TABLE backfill_queue + DROP COLUMN time_end + `) + return err + }} +} diff --git a/database/upgrades/upgrades.go b/database/upgrades/upgrades.go index 3cb5ace..eba9df8 100644 --- a/database/upgrades/upgrades.go +++ b/database/upgrades/upgrades.go @@ -40,7 +40,7 @@ type upgrade struct { fn upgradeFunc } -const NumberOfUpgrades = 44 +const NumberOfUpgrades = 47 var upgrades [NumberOfUpgrades]upgrade diff --git a/historysync.go b/historysync.go index c1aee27..09d75a1 100644 --- a/historysync.go +++ b/historysync.go @@ -169,6 +169,7 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor } var forwardPrevID id.EventID + var timeEnd *time.Time if req.BackfillType == database.BackfillForward { // TODO this overrides the TimeStart set when enqueuing the backfill // maybe the enqueue should instead include the prev event ID @@ -178,13 +179,13 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor req.TimeStart = &start } else { firstMessage := portal.bridge.DB.Message.GetFirstInChat(portal.Key) - if firstMessage != nil && (req.TimeEnd == nil || firstMessage.Timestamp.Before(*req.TimeEnd)) { + if firstMessage != nil { end := firstMessage.Timestamp.Add(-1 * time.Second) - req.TimeEnd = &end + timeEnd = &end user.log.Debugfln("Limiting backfill to end at %v", end) } } - allMsgs := user.bridge.DB.HistorySync.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents) + allMsgs := user.bridge.DB.HistorySync.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, timeEnd, req.MaxTotalEvents) sendDisappearedNotice := false // If expired messages are on, and a notice has not been sent to this chat @@ -397,7 +398,7 @@ func getConversationTimestamp(conv *waProto.Conversation) uint64 { func (user *User) EnqueueImmedateBackfills(portals []*Portal) { for priority, portal := range portals { maxMessages := user.bridge.Config.Bridge.HistorySync.Immediate.MaxEvents - initialBackfill := user.bridge.DB.Backfill.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, nil, maxMessages, maxMessages, 0) + initialBackfill := user.bridge.DB.Backfill.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, maxMessages, maxMessages, 0) initialBackfill.Insert() } } @@ -412,7 +413,7 @@ func (user *User) EnqueueDeferredBackfills(portals []*Portal) { startDate = &startDaysAgo } backfillMessages := user.bridge.DB.Backfill.NewWithValues( - user.MXID, database.BackfillDeferred, stageIdx*numPortals+portalIdx, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay) + user.MXID, database.BackfillDeferred, stageIdx*numPortals+portalIdx, &portal.Key, startDate, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay) backfillMessages.Insert() } } @@ -425,7 +426,7 @@ func (user *User) EnqueueForwardBackfills(portals []*Portal) { continue } backfill := user.bridge.DB.Backfill.NewWithValues( - user.MXID, database.BackfillForward, priority, &portal.Key, &lastMsg.Timestamp, nil, -1, -1, 0) + user.MXID, database.BackfillForward, priority, &portal.Key, &lastMsg.Timestamp, -1, -1, 0) backfill.Insert() } }