From b850995888f3313ee1bc8a835d1ecc10525cf58e Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Tue, 24 May 2022 09:51:32 -0600 Subject: [PATCH] backfill: block deferred requests on immediate and forward backfills --- backfillqueue.go | 23 +++++++++++++---------- database/backfill.go | 30 ++++++++++++++++++++++++++++++ historysync.go | 8 +++++--- 3 files changed, 48 insertions(+), 13 deletions(-) diff --git a/backfillqueue.go b/backfillqueue.go index a952881..20b58fd 100644 --- a/backfillqueue.go +++ b/backfillqueue.go @@ -40,26 +40,29 @@ func (bq *BackfillQueue) ReCheck() { } } -func (bq *BackfillQueue) GetNextBackfill(userID id.UserID, backfillTypes []database.BackfillType, reCheckChannel chan bool) *database.Backfill { +func (bq *BackfillQueue) GetNextBackfill(userID id.UserID, backfillTypes []database.BackfillType, waitForBackfillTypes []database.BackfillType, reCheckChannel chan bool) *database.Backfill { for { - if backfill := bq.BackfillQuery.GetNext(userID, backfillTypes); backfill != nil { - backfill.MarkDispatched() - return backfill - } else { - select { - case <-reCheckChannel: - case <-time.After(time.Minute): + if !bq.BackfillQuery.HasUnstartedOrInFlightOfType(userID, waitForBackfillTypes) { + // check for immediate when dealing with deferred + if backfill := bq.BackfillQuery.GetNext(userID, backfillTypes); backfill != nil { + backfill.MarkDispatched() + return backfill } } + + select { + case <-reCheckChannel: + case <-time.After(time.Minute): + } } } -func (user *User) HandleBackfillRequestsLoop(backfillTypes []database.BackfillType) { +func (user *User) HandleBackfillRequestsLoop(backfillTypes []database.BackfillType, waitForBackfillTypes []database.BackfillType) { reCheckChannel := make(chan bool) user.BackfillQueue.reCheckChannels = append(user.BackfillQueue.reCheckChannels, reCheckChannel) for { - req := user.BackfillQueue.GetNextBackfill(user.MXID, backfillTypes, reCheckChannel) + req := user.BackfillQueue.GetNextBackfill(user.MXID, backfillTypes, waitForBackfillTypes, reCheckChannel) user.log.Infofln("Handling backfill request %s", req) conv := user.bridge.DB.HistorySync.GetConversation(user.MXID, req.Portal) diff --git a/database/backfill.go b/database/backfill.go index 77a84ed..7c97219 100644 --- a/database/backfill.go +++ b/database/backfill.go @@ -97,6 +97,14 @@ const ( ORDER BY type, priority, queue_id LIMIT 1 ` + getUnstartedOrInFlightQuery = ` + SELECT 1 + FROM backfill_queue + WHERE user_mxid=$1 + AND type IN (%s) + AND (dispatch_time IS NULL OR completed_at IS NULL) + LIMIT 1 + ` ) // GetNext returns the next backfill to perform @@ -120,6 +128,28 @@ func (bq *BackfillQuery) GetNext(userID id.UserID, backfillTypes []BackfillType) return } +func (bq *BackfillQuery) HasUnstartedOrInFlightOfType(userID id.UserID, backfillTypes []BackfillType) bool { + if len(backfillTypes) == 0 { + return false + } + + bq.backfillQueryLock.Lock() + defer bq.backfillQueryLock.Unlock() + + types := []string{} + for _, backfillType := range backfillTypes { + types = append(types, strconv.Itoa(int(backfillType))) + } + rows, err := bq.db.Query(fmt.Sprintf(getUnstartedOrInFlightQuery, strings.Join(types, ",")), userID) + if err != nil || rows == nil { + // No rows means that there are no unstarted or in flight backfill + // requests. + return false + } + defer rows.Close() + return rows.Next() +} + func (bq *BackfillQuery) DeleteAll(userID id.UserID) { bq.backfillQueryLock.Lock() defer bq.backfillQueryLock.Unlock() diff --git a/historysync.go b/historysync.go index 2f952a4..bce9c3f 100644 --- a/historysync.go +++ b/historysync.go @@ -58,15 +58,17 @@ func (user *User) handleHistorySyncsLoop() { log: user.log.Sub("BackfillQueue"), } + forwardAndImmediate := []database.BackfillType{database.BackfillImmediate, database.BackfillForward} + // Immediate backfills can be done in parallel for i := 0; i < user.bridge.Config.Bridge.HistorySync.Immediate.WorkerCount; i++ { - go user.HandleBackfillRequestsLoop([]database.BackfillType{database.BackfillImmediate, database.BackfillForward}) + go user.HandleBackfillRequestsLoop(forwardAndImmediate, []database.BackfillType{}) } // Deferred backfills should be handled synchronously so as not to // overload the homeserver. Users can configure their backfill stages // to be more or less aggressive with backfilling at this stage. - go user.HandleBackfillRequestsLoop([]database.BackfillType{database.BackfillDeferred}) + go user.HandleBackfillRequestsLoop([]database.BackfillType{database.BackfillDeferred}, forwardAndImmediate) if user.bridge.Config.Bridge.HistorySync.MediaRequests.AutoRequestMedia && user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestMethod == config.MediaRequestMethodLocalTime { @@ -733,7 +735,7 @@ func (portal *Portal) updateBackfillStatus(backfillState *database.BackfillState "first_timestamp": backfillState.FirstExpectedTimestamp, }) if err != nil { - portal.log.Errorln("Error sending post-backfill dummy event:", err) + portal.log.Errorln("Error sending backfill status dummy event:", err) } }