From ac7a437ad868af785c68b04bda095bc082f879af Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 28 Apr 2022 12:23:34 -0600 Subject: [PATCH 1/3] backfill: add more logging to the queue --- backfillqueue.go | 8 +++++++- historysync.go | 12 ++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/backfillqueue.go b/backfillqueue.go index 6de3548..ea7a4d3 100644 --- a/backfillqueue.go +++ b/backfillqueue.go @@ -45,7 +45,9 @@ func (bq *BackfillQueue) immediateBackfillLoop(user *User) { } else { select { case <-bq.ReCheckQueue: + bq.log.Debugfln("Re-checking infinite backfill queue due to forced re-check") case <-time.After(10 * time.Second): + bq.log.Debugfln("Re-checking infinite backfill queue due to timeout") } } } @@ -55,6 +57,7 @@ func (bq *BackfillQueue) deferredBackfillLoop(user *User) { for { // Finish all immediate backfills before doing the deferred ones. if immediate := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); immediate != nil { + bq.log.Debugfln("Not doing any deferred or media backfill since there are immediate backfills to do") time.Sleep(10 * time.Second) } else if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil { bq.DeferredBackfillRequests <- backfill @@ -63,7 +66,10 @@ func (bq *BackfillQueue) deferredBackfillLoop(user *User) { bq.DeferredBackfillRequests <- mediaBackfill mediaBackfill.MarkDone() } else { - time.Sleep(10 * time.Second) + select { + case <-bq.ReCheckQueue: + case <-time.After(time.Minute): + } } } } diff --git a/historysync.go b/historysync.go index 0f6520e..412dabe 100644 --- a/historysync.go +++ b/historysync.go @@ -78,7 +78,7 @@ func (user *User) handleHistorySyncsLoop() { func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) { for req := range backfillRequests { - user.log.Debugfln("Handling backfill request %s", req) + user.log.Infofln("Handling backfill request %s", req) conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal) if conv == nil { user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String()) @@ -96,7 +96,7 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac endTime = *req.TimeEnd } - user.log.Debugfln("Backfilling media from %v to %v for %s", startTime, endTime, portal.Key.String()) + user.log.Infofln("Backfilling media from %v to %v for %s", startTime, endTime, portal.Key.String()) // Go through all of the messages in the given time range, // requesting any media that errored. @@ -155,7 +155,7 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill } } - user.log.Debugfln("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents) + user.log.Infofln("Backfilling %d messages in %s, %d messages at a time (queue ID: %d)", len(allMsgs), portal.Key.JID, req.MaxBatchEvents, req.QueueID) toBackfill := allMsgs[0:] var insertionEventIds []id.EventID for len(toBackfill) > 0 { @@ -180,10 +180,10 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill time.Unix(int64(allMsgs[0].GetMessageTimestamp()), 0), insertionEventIds[0]) } - user.log.Debugfln("Deleting %d history sync messages after backfilling", len(allMsgs)) + user.log.Debugfln("Deleting %d history sync messages after backfilling (queue ID: %d)", len(allMsgs), req.QueueID) err := user.bridge.DB.HistorySyncQuery.DeleteMessages(user.MXID, conv.ConversationID, allMsgs) if err != nil { - user.log.Warnfln("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err) + user.log.Warnfln("Failed to delete %d history sync messages after backfilling (queue ID: %d): %v", len(allMsgs), req.QueueID, err) } if !conv.MarkedAsUnread && conv.UnreadCount == 0 { @@ -422,7 +422,7 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) newMinTs = lastMessage.Timestamp } - portal.log.Infofln("Processing history sync with %d messages", len(messages)) + portal.log.Debugfln("Processing backfill with %d messages", len(messages)) // The messages are ordered newest to oldest, so iterate them in reverse order. for i := len(messages) - 1; i >= 0; i-- { webMsg := messages[i] From d898aefff1486c1c24f797b0709994b541fc85bc Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 28 Apr 2022 15:37:51 -0600 Subject: [PATCH 2/3] backfill loops: combine immediate and deferred loops --- backfillqueue.go | 30 +++++------------------------- historysync.go | 2 +- 2 files changed, 6 insertions(+), 26 deletions(-) diff --git a/backfillqueue.go b/backfillqueue.go index ea7a4d3..1804bea 100644 --- a/backfillqueue.go +++ b/backfillqueue.go @@ -32,33 +32,13 @@ type BackfillQueue struct { log log.Logger } -func (bq *BackfillQueue) RunLoops(user *User) { - go bq.immediateBackfillLoop(user) - bq.deferredBackfillLoop(user) -} - -func (bq *BackfillQueue) immediateBackfillLoop(user *User) { +// Immediate backfills should happen first, then deferred backfills and lastly +// media backfills. +func (bq *BackfillQueue) RunLoop(user *User) { for { - if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); backfill != nil { - bq.ImmediateBackfillRequests <- backfill - backfill.MarkDone() - } else { - select { - case <-bq.ReCheckQueue: - bq.log.Debugfln("Re-checking infinite backfill queue due to forced re-check") - case <-time.After(10 * time.Second): - bq.log.Debugfln("Re-checking infinite backfill queue due to timeout") - } - } - } -} - -func (bq *BackfillQueue) deferredBackfillLoop(user *User) { - for { - // Finish all immediate backfills before doing the deferred ones. if immediate := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); immediate != nil { - bq.log.Debugfln("Not doing any deferred or media backfill since there are immediate backfills to do") - time.Sleep(10 * time.Second) + bq.ImmediateBackfillRequests <- immediate + immediate.MarkDone() } else if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil { bq.DeferredBackfillRequests <- backfill backfill.MarkDone() diff --git a/historysync.go b/historysync.go index 412dabe..d3f60af 100644 --- a/historysync.go +++ b/historysync.go @@ -67,7 +67,7 @@ func (user *User) handleHistorySyncsLoop() { // overload the homeserver. Users can configure their backfill stages // to be more or less aggressive with backfilling at this stage. go user.handleBackfillRequestsLoop(user.BackfillQueue.DeferredBackfillRequests) - go user.BackfillQueue.RunLoops(user) + go user.BackfillQueue.RunLoop(user) // Always save the history syncs for the user. If they want to enable // backfilling in the future, we will have it in the database. From 76c6d0bf87c490adbc80fbf75ce512c88541035c Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 28 Apr 2022 15:40:15 -0600 Subject: [PATCH 3/3] backfill: fixed bug where the media backfill loop would sleep too often If the number of requested message was a multiple of the number of batch events, then it would sleep on every single other message (even if not an errored media message). --- historysync.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/historysync.go b/historysync.go index d3f60af..e0e47e7 100644 --- a/historysync.go +++ b/historysync.go @@ -102,10 +102,11 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac // requesting any media that errored. requested := 0 for _, msg := range user.bridge.DB.Message.GetMessagesBetween(portal.Key, startTime, endTime) { - if requested > 0 && requested%req.MaxBatchEvents == 0 { - time.Sleep(time.Duration(req.BatchDelay) * time.Second) - } if msg.Error == database.MsgErrMediaNotFound { + if requested > 0 && requested%req.MaxBatchEvents == 0 { + time.Sleep(time.Duration(req.BatchDelay) * time.Second) + } + portal.requestMediaRetry(user, msg.MXID) requested += 1 }