From a0ffed43ddabc4f97f1d04e652528556949ba6ef Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Tue, 24 May 2022 09:50:38 -0600 Subject: [PATCH 1/4] database/backfill_state: change type of first_expected_ts to BIGINT --- database/upgrades/49-backfill-state-timestamp-bigint.sql | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 database/upgrades/49-backfill-state-timestamp-bigint.sql diff --git a/database/upgrades/49-backfill-state-timestamp-bigint.sql b/database/upgrades/49-backfill-state-timestamp-bigint.sql new file mode 100644 index 0000000..971fce6 --- /dev/null +++ b/database/upgrades/49-backfill-state-timestamp-bigint.sql @@ -0,0 +1,7 @@ +-- v49: Convert first_expected_ts to BIGINT so that we can use zero-values. + +-- only: sqlite +UPDATE backfill_state SET first_expected_ts=unixepoch(first_expected_ts); + +-- only: postgres +ALTER TABLE backfill_state ALTER COLUMN first_expected_ts TYPE BIGINT; From b850995888f3313ee1bc8a835d1ecc10525cf58e Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Tue, 24 May 2022 09:51:32 -0600 Subject: [PATCH 2/4] backfill: block deferred requests on immediate and forward backfills --- backfillqueue.go | 23 +++++++++++++---------- database/backfill.go | 30 ++++++++++++++++++++++++++++++ historysync.go | 8 +++++--- 3 files changed, 48 insertions(+), 13 deletions(-) diff --git a/backfillqueue.go b/backfillqueue.go index a952881..20b58fd 100644 --- a/backfillqueue.go +++ b/backfillqueue.go @@ -40,26 +40,29 @@ func (bq *BackfillQueue) ReCheck() { } } -func (bq *BackfillQueue) GetNextBackfill(userID id.UserID, backfillTypes []database.BackfillType, reCheckChannel chan bool) *database.Backfill { +func (bq *BackfillQueue) GetNextBackfill(userID id.UserID, backfillTypes []database.BackfillType, waitForBackfillTypes []database.BackfillType, reCheckChannel chan bool) *database.Backfill { for { - if backfill := bq.BackfillQuery.GetNext(userID, backfillTypes); backfill != nil { - backfill.MarkDispatched() - return backfill - } else { - select { - case <-reCheckChannel: - case <-time.After(time.Minute): + if !bq.BackfillQuery.HasUnstartedOrInFlightOfType(userID, waitForBackfillTypes) { + // check for immediate when dealing with deferred + if backfill := bq.BackfillQuery.GetNext(userID, backfillTypes); backfill != nil { + backfill.MarkDispatched() + return backfill } } + + select { + case <-reCheckChannel: + case <-time.After(time.Minute): + } } } -func (user *User) HandleBackfillRequestsLoop(backfillTypes []database.BackfillType) { +func (user *User) HandleBackfillRequestsLoop(backfillTypes []database.BackfillType, waitForBackfillTypes []database.BackfillType) { reCheckChannel := make(chan bool) user.BackfillQueue.reCheckChannels = append(user.BackfillQueue.reCheckChannels, reCheckChannel) for { - req := user.BackfillQueue.GetNextBackfill(user.MXID, backfillTypes, reCheckChannel) + req := user.BackfillQueue.GetNextBackfill(user.MXID, backfillTypes, waitForBackfillTypes, reCheckChannel) user.log.Infofln("Handling backfill request %s", req) conv := user.bridge.DB.HistorySync.GetConversation(user.MXID, req.Portal) diff --git a/database/backfill.go b/database/backfill.go index 77a84ed..7c97219 100644 --- a/database/backfill.go +++ b/database/backfill.go @@ -97,6 +97,14 @@ const ( ORDER BY type, priority, queue_id LIMIT 1 ` + getUnstartedOrInFlightQuery = ` + SELECT 1 + FROM backfill_queue + WHERE user_mxid=$1 + AND type IN (%s) + AND (dispatch_time IS NULL OR completed_at IS NULL) + LIMIT 1 + ` ) // GetNext returns the next backfill to perform @@ -120,6 +128,28 @@ func (bq *BackfillQuery) GetNext(userID id.UserID, backfillTypes []BackfillType) return } +func (bq *BackfillQuery) HasUnstartedOrInFlightOfType(userID id.UserID, backfillTypes []BackfillType) bool { + if len(backfillTypes) == 0 { + return false + } + + bq.backfillQueryLock.Lock() + defer bq.backfillQueryLock.Unlock() + + types := []string{} + for _, backfillType := range backfillTypes { + types = append(types, strconv.Itoa(int(backfillType))) + } + rows, err := bq.db.Query(fmt.Sprintf(getUnstartedOrInFlightQuery, strings.Join(types, ",")), userID) + if err != nil || rows == nil { + // No rows means that there are no unstarted or in flight backfill + // requests. + return false + } + defer rows.Close() + return rows.Next() +} + func (bq *BackfillQuery) DeleteAll(userID id.UserID) { bq.backfillQueryLock.Lock() defer bq.backfillQueryLock.Unlock() diff --git a/historysync.go b/historysync.go index 2f952a4..bce9c3f 100644 --- a/historysync.go +++ b/historysync.go @@ -58,15 +58,17 @@ func (user *User) handleHistorySyncsLoop() { log: user.log.Sub("BackfillQueue"), } + forwardAndImmediate := []database.BackfillType{database.BackfillImmediate, database.BackfillForward} + // Immediate backfills can be done in parallel for i := 0; i < user.bridge.Config.Bridge.HistorySync.Immediate.WorkerCount; i++ { - go user.HandleBackfillRequestsLoop([]database.BackfillType{database.BackfillImmediate, database.BackfillForward}) + go user.HandleBackfillRequestsLoop(forwardAndImmediate, []database.BackfillType{}) } // Deferred backfills should be handled synchronously so as not to // overload the homeserver. Users can configure their backfill stages // to be more or less aggressive with backfilling at this stage. - go user.HandleBackfillRequestsLoop([]database.BackfillType{database.BackfillDeferred}) + go user.HandleBackfillRequestsLoop([]database.BackfillType{database.BackfillDeferred}, forwardAndImmediate) if user.bridge.Config.Bridge.HistorySync.MediaRequests.AutoRequestMedia && user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestMethod == config.MediaRequestMethodLocalTime { @@ -733,7 +735,7 @@ func (portal *Portal) updateBackfillStatus(backfillState *database.BackfillState "first_timestamp": backfillState.FirstExpectedTimestamp, }) if err != nil { - portal.log.Errorln("Error sending post-backfill dummy event:", err) + portal.log.Errorln("Error sending backfill status dummy event:", err) } } From 03d46630c26cf5ef973295aece14f886be646954 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Tue, 24 May 2022 15:13:31 -0600 Subject: [PATCH 3/4] backfill status event: send after room exists --- historysync.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/historysync.go b/historysync.go index bce9c3f..eddfb69 100644 --- a/historysync.go +++ b/historysync.go @@ -132,17 +132,16 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor portal.backfillLock.Lock() defer portal.backfillLock.Unlock() + if !user.shouldCreatePortalForHistorySync(conv, portal) { + return + } + backfillState := user.bridge.DB.Backfill.GetBackfillState(user.MXID, &portal.Key) if backfillState == nil { backfillState = user.bridge.DB.Backfill.NewBackfillState(user.MXID, &portal.Key) } backfillState.SetProcessingBatch(true) defer backfillState.SetProcessingBatch(false) - portal.updateBackfillStatus(backfillState) - - if !user.shouldCreatePortalForHistorySync(conv, portal) { - return - } var forwardPrevID id.EventID var timeEnd *time.Time @@ -194,6 +193,9 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor } } + // Update the backfill status here after the room has been created. + portal.updateBackfillStatus(backfillState) + if sendDisappearedNotice { user.log.Debugfln("Sending notice to %s that there are disappeared messages ending at %v", portal.Key.JID, conv.LastMessageTimestamp) resp, err := portal.sendMessage(portal.MainIntent(), event.EventMessage, &event.MessageEventContent{ @@ -735,7 +737,7 @@ func (portal *Portal) updateBackfillStatus(backfillState *database.BackfillState "first_timestamp": backfillState.FirstExpectedTimestamp, }) if err != nil { - portal.log.Errorln("Error sending backfill status dummy event:", err) + portal.log.Errorln("Error sending backfill status event:", err) } } From 0939bb3bc35bcad27aefe94e6a45ec5981f0f861 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Wed, 25 May 2022 08:39:08 -0600 Subject: [PATCH 4/4] backfill status event: send first expected timestamp in ms after epoch instead of seconds --- historysync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/historysync.go b/historysync.go index eddfb69..d232413 100644 --- a/historysync.go +++ b/historysync.go @@ -734,7 +734,7 @@ func (portal *Portal) updateBackfillStatus(backfillState *database.BackfillState _, err := portal.MainIntent().SendStateEvent(portal.MXID, BackfillStatusEvent, "", map[string]interface{}{ "status": backfillStatus, - "first_timestamp": backfillState.FirstExpectedTimestamp, + "first_timestamp": backfillState.FirstExpectedTimestamp * 1000, }) if err != nil { portal.log.Errorln("Error sending backfill status event:", err)