Use single SQL query for fetching backfill queue items

This commit is contained in:
Tulir Asokan 2022-04-29 10:44:22 +03:00
parent 86840dec59
commit 6b69ea707b
4 changed files with 37 additions and 17 deletions

View file

@ -20,6 +20,7 @@ import (
"time" "time"
log "maunium.net/go/maulogger/v2" log "maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix-whatsapp/database" "maunium.net/go/mautrix-whatsapp/database"
) )
@ -32,19 +33,16 @@ type BackfillQueue struct {
log log.Logger log log.Logger
} }
// Immediate backfills should happen first, then deferred backfills and lastly // RunLoop fetches backfills from the database, prioritizing immediate and forward backfills
// media backfills.
func (bq *BackfillQueue) RunLoop(user *User) { func (bq *BackfillQueue) RunLoop(user *User) {
for { for {
if immediate := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); immediate != nil { if backfill := bq.BackfillQuery.GetNext(user.MXID); backfill != nil {
bq.ImmediateBackfillRequests <- immediate if backfill.BackfillType == database.BackfillImmediate || backfill.BackfillType == database.BackfillForward {
immediate.MarkDone() bq.ImmediateBackfillRequests <- backfill
} else if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil { } else {
bq.DeferredBackfillRequests <- backfill bq.DeferredBackfillRequests <- backfill
}
backfill.MarkDone() backfill.MarkDone()
} else if mediaBackfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillMedia); mediaBackfill != nil {
bq.DeferredBackfillRequests <- mediaBackfill
mediaBackfill.MarkDone()
} else { } else {
select { select {
case <-bq.ReCheckQueue: case <-bq.ReCheckQueue:

View file

@ -30,14 +30,17 @@ type BackfillType int
const ( const (
BackfillImmediate BackfillType = 0 BackfillImmediate BackfillType = 0
BackfillDeferred = 1 BackfillForward BackfillType = 100
BackfillMedia = 2 BackfillDeferred BackfillType = 200
BackfillMedia BackfillType = 300
) )
func (bt BackfillType) String() string { func (bt BackfillType) String() string {
switch bt { switch bt {
case BackfillImmediate: case BackfillImmediate:
return "IMMEDIATE" return "IMMEDIATE"
case BackfillForward:
return "FORWARD"
case BackfillDeferred: case BackfillDeferred:
return "DEFERRED" return "DEFERRED"
case BackfillMedia: case BackfillMedia:
@ -80,16 +83,15 @@ const (
SELECT queue_id, user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay SELECT queue_id, user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay
FROM backfill_queue FROM backfill_queue
WHERE user_mxid=$1 WHERE user_mxid=$1
AND type=$2
AND completed_at IS NULL AND completed_at IS NULL
ORDER BY priority, queue_id ORDER BY type, priority, queue_id
LIMIT 1 LIMIT 1
` `
) )
// GetNext returns the next backfill to perform // GetNext returns the next backfill to perform
func (bq *BackfillQuery) GetNext(userID id.UserID, backfillType BackfillType) (backfill *Backfill) { func (bq *BackfillQuery) GetNext(userID id.UserID) (backfill *Backfill) {
rows, err := bq.db.Query(getNextBackfillQuery, userID, backfillType) rows, err := bq.db.Query(getNextBackfillQuery, userID)
defer rows.Close() defer rows.Close()
if err != nil || rows == nil { if err != nil || rows == nil {
bq.log.Error(err) bq.log.Error(err)

View file

@ -0,0 +1,20 @@
package upgrades
import (
"database/sql"
)
func init() {
upgrades[41] = upgrade{"Update backfill queue tables to be sortable by priority", func(tx *sql.Tx, ctx context) error {
_, err := tx.Exec(`
UPDATE backfill_queue
SET type=CASE
WHEN type=1 THEN 200
WHEN type=2 THEN 300
ELSE type
END
WHERE type=1 OR type=2
`)
return err
}}
}

View file

@ -40,7 +40,7 @@ type upgrade struct {
fn upgradeFunc fn upgradeFunc
} }
const NumberOfUpgrades = 41 const NumberOfUpgrades = 42
var upgrades [NumberOfUpgrades]upgrade var upgrades [NumberOfUpgrades]upgrade