forked from MirrorHub/mautrix-whatsapp
backfill queue: add dispatch time, remove time end to table
* the dispatch time is going to be what the completed time used to be * the time end column was always nil, so I got rid of it
This commit is contained in:
parent
78fe69a578
commit
1d70cbff48
6 changed files with 59 additions and 18 deletions
|
@ -873,7 +873,7 @@ func (handler *CommandHandler) CommandBackfill(ce *CommandEvent) {
|
|||
return
|
||||
}
|
||||
}
|
||||
backfillMessages := ce.Portal.bridge.DB.Backfill.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, nil, batchSize, -1, batchDelay)
|
||||
backfillMessages := ce.Portal.bridge.DB.Backfill.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, batchSize, -1, batchDelay)
|
||||
backfillMessages.Insert()
|
||||
|
||||
ce.User.BackfillQueue.ReCheckQueue <- true
|
||||
|
|
|
@ -59,7 +59,7 @@ func (bq *BackfillQuery) New() *Backfill {
|
|||
}
|
||||
}
|
||||
|
||||
func (bq *BackfillQuery) NewWithValues(userID id.UserID, backfillType BackfillType, priority int, portal *PortalKey, timeStart *time.Time, timeEnd *time.Time, maxBatchEvents, maxTotalEvents, batchDelay int) *Backfill {
|
||||
func (bq *BackfillQuery) NewWithValues(userID id.UserID, backfillType BackfillType, priority int, portal *PortalKey, timeStart *time.Time, maxBatchEvents, maxTotalEvents, batchDelay int) *Backfill {
|
||||
return &Backfill{
|
||||
db: bq.db,
|
||||
log: bq.log,
|
||||
|
@ -68,7 +68,6 @@ func (bq *BackfillQuery) NewWithValues(userID id.UserID, backfillType BackfillTy
|
|||
Priority: priority,
|
||||
Portal: portal,
|
||||
TimeStart: timeStart,
|
||||
TimeEnd: timeEnd,
|
||||
MaxBatchEvents: maxBatchEvents,
|
||||
MaxTotalEvents: maxTotalEvents,
|
||||
BatchDelay: batchDelay,
|
||||
|
@ -77,10 +76,10 @@ func (bq *BackfillQuery) NewWithValues(userID id.UserID, backfillType BackfillTy
|
|||
|
||||
const (
|
||||
getNextBackfillQuery = `
|
||||
SELECT queue_id, user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay
|
||||
SELECT queue_id, user_mxid, type, priority, portal_jid, portal_receiver, time_start, max_batch_events, max_total_events, batch_delay
|
||||
FROM backfill_queue
|
||||
WHERE user_mxid=$1
|
||||
AND completed_at IS NULL
|
||||
AND dispatch_time IS NULL
|
||||
ORDER BY type, priority, queue_id
|
||||
LIMIT 1
|
||||
`
|
||||
|
@ -126,21 +125,21 @@ type Backfill struct {
|
|||
Priority int
|
||||
Portal *PortalKey
|
||||
TimeStart *time.Time
|
||||
TimeEnd *time.Time
|
||||
MaxBatchEvents int
|
||||
MaxTotalEvents int
|
||||
BatchDelay int
|
||||
DispatchTime *time.Time
|
||||
CompletedAt *time.Time
|
||||
}
|
||||
|
||||
func (b *Backfill) String() string {
|
||||
return fmt.Sprintf("Backfill{QueueID: %d, UserID: %s, BackfillType: %s, Priority: %d, Portal: %s, TimeStart: %s, TimeEnd: %s, MaxBatchEvents: %d, MaxTotalEvents: %d, BatchDelay: %d, CompletedAt: %s}",
|
||||
b.QueueID, b.UserID, b.BackfillType, b.Priority, b.Portal, b.TimeStart, b.TimeEnd, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay, b.CompletedAt,
|
||||
return fmt.Sprintf("Backfill{QueueID: %d, UserID: %s, BackfillType: %s, Priority: %d, Portal: %s, TimeStart: %s, MaxBatchEvents: %d, MaxTotalEvents: %d, BatchDelay: %d, DispatchTime: %s, CompletedAt: %s}",
|
||||
b.QueueID, b.UserID, b.BackfillType, b.Priority, b.Portal, b.TimeStart, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay, b.CompletedAt, b.DispatchTime,
|
||||
)
|
||||
}
|
||||
|
||||
func (b *Backfill) Scan(row Scannable) *Backfill {
|
||||
err := row.Scan(&b.QueueID, &b.UserID, &b.BackfillType, &b.Priority, &b.Portal.JID, &b.Portal.Receiver, &b.TimeStart, &b.TimeEnd, &b.MaxBatchEvents, &b.MaxTotalEvents, &b.BatchDelay)
|
||||
err := row.Scan(&b.QueueID, &b.UserID, &b.BackfillType, &b.Priority, &b.Portal.JID, &b.Portal.Receiver, &b.TimeStart, &b.MaxBatchEvents, &b.MaxTotalEvents, &b.BatchDelay)
|
||||
if err != nil {
|
||||
if !errors.Is(err, sql.ErrNoRows) {
|
||||
b.log.Errorln("Database scan failed:", err)
|
||||
|
@ -153,10 +152,10 @@ func (b *Backfill) Scan(row Scannable) *Backfill {
|
|||
func (b *Backfill) Insert() {
|
||||
rows, err := b.db.Query(`
|
||||
INSERT INTO backfill_queue
|
||||
(user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay, completed_at)
|
||||
(user_mxid, type, priority, portal_jid, portal_receiver, time_start, max_batch_events, max_total_events, batch_delay, dispatch_time, completed_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
|
||||
RETURNING queue_id
|
||||
`, b.UserID, b.BackfillType, b.Priority, b.Portal.JID, b.Portal.Receiver, b.TimeStart, b.TimeEnd, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay, b.CompletedAt)
|
||||
`, b.UserID, b.BackfillType, b.Priority, b.Portal.JID, b.Portal.Receiver, b.TimeStart, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay, b.DispatchTime, b.CompletedAt)
|
||||
defer rows.Close()
|
||||
if err != nil || !rows.Next() {
|
||||
b.log.Warnfln("Failed to insert %v/%s with priority %d: %v", b.BackfillType, b.Portal.JID, b.Priority, err)
|
||||
|
|
25
database/upgrades/2022-05-12-backfillqueue-dispatch-time.go
Normal file
25
database/upgrades/2022-05-12-backfillqueue-dispatch-time.go
Normal file
|
@ -0,0 +1,25 @@
|
|||
package upgrades
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
func init() {
|
||||
upgrades[44] = upgrade{"Add dispatch time to backfill queue", func(tx *sql.Tx, ctx context) error {
|
||||
// First, add dispatch_time TIMESTAMP column
|
||||
_, err := tx.Exec(`
|
||||
ALTER TABLE backfill_queue
|
||||
ADD COLUMN dispatch_time TIMESTAMP
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// For all previous jobs, set dispatch time to the completed time.
|
||||
_, err = tx.Exec(`
|
||||
UPDATE backfill_queue
|
||||
SET dispatch_time=completed_at
|
||||
`)
|
||||
return err
|
||||
}}
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
package upgrades
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
func init() {
|
||||
upgrades[45] = upgrade{"Add dispatch time to backfill queue", func(tx *sql.Tx, ctx context) error {
|
||||
// First, add dispatch_time TIMESTAMP column
|
||||
_, err := tx.Exec(`
|
||||
ALTER TABLE backfill_queue
|
||||
DROP COLUMN time_end
|
||||
`)
|
||||
return err
|
||||
}}
|
||||
}
|
|
@ -40,7 +40,7 @@ type upgrade struct {
|
|||
fn upgradeFunc
|
||||
}
|
||||
|
||||
const NumberOfUpgrades = 44
|
||||
const NumberOfUpgrades = 47
|
||||
|
||||
var upgrades [NumberOfUpgrades]upgrade
|
||||
|
||||
|
|
|
@ -169,6 +169,7 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
|
|||
}
|
||||
|
||||
var forwardPrevID id.EventID
|
||||
var timeEnd *time.Time
|
||||
if req.BackfillType == database.BackfillForward {
|
||||
// TODO this overrides the TimeStart set when enqueuing the backfill
|
||||
// maybe the enqueue should instead include the prev event ID
|
||||
|
@ -178,13 +179,13 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
|
|||
req.TimeStart = &start
|
||||
} else {
|
||||
firstMessage := portal.bridge.DB.Message.GetFirstInChat(portal.Key)
|
||||
if firstMessage != nil && (req.TimeEnd == nil || firstMessage.Timestamp.Before(*req.TimeEnd)) {
|
||||
if firstMessage != nil {
|
||||
end := firstMessage.Timestamp.Add(-1 * time.Second)
|
||||
req.TimeEnd = &end
|
||||
timeEnd = &end
|
||||
user.log.Debugfln("Limiting backfill to end at %v", end)
|
||||
}
|
||||
}
|
||||
allMsgs := user.bridge.DB.HistorySync.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, req.TimeEnd, req.MaxTotalEvents)
|
||||
allMsgs := user.bridge.DB.HistorySync.GetMessagesBetween(user.MXID, conv.ConversationID, req.TimeStart, timeEnd, req.MaxTotalEvents)
|
||||
|
||||
sendDisappearedNotice := false
|
||||
// If expired messages are on, and a notice has not been sent to this chat
|
||||
|
@ -397,7 +398,7 @@ func getConversationTimestamp(conv *waProto.Conversation) uint64 {
|
|||
func (user *User) EnqueueImmedateBackfills(portals []*Portal) {
|
||||
for priority, portal := range portals {
|
||||
maxMessages := user.bridge.Config.Bridge.HistorySync.Immediate.MaxEvents
|
||||
initialBackfill := user.bridge.DB.Backfill.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, nil, maxMessages, maxMessages, 0)
|
||||
initialBackfill := user.bridge.DB.Backfill.NewWithValues(user.MXID, database.BackfillImmediate, priority, &portal.Key, nil, maxMessages, maxMessages, 0)
|
||||
initialBackfill.Insert()
|
||||
}
|
||||
}
|
||||
|
@ -412,7 +413,7 @@ func (user *User) EnqueueDeferredBackfills(portals []*Portal) {
|
|||
startDate = &startDaysAgo
|
||||
}
|
||||
backfillMessages := user.bridge.DB.Backfill.NewWithValues(
|
||||
user.MXID, database.BackfillDeferred, stageIdx*numPortals+portalIdx, &portal.Key, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
|
||||
user.MXID, database.BackfillDeferred, stageIdx*numPortals+portalIdx, &portal.Key, startDate, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
|
||||
backfillMessages.Insert()
|
||||
}
|
||||
}
|
||||
|
@ -425,7 +426,7 @@ func (user *User) EnqueueForwardBackfills(portals []*Portal) {
|
|||
continue
|
||||
}
|
||||
backfill := user.bridge.DB.Backfill.NewWithValues(
|
||||
user.MXID, database.BackfillForward, priority, &portal.Key, &lastMsg.Timestamp, nil, -1, -1, 0)
|
||||
user.MXID, database.BackfillForward, priority, &portal.Key, &lastMsg.Timestamp, -1, -1, 0)
|
||||
backfill.Insert()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue