From ac7a437ad868af785c68b04bda095bc082f879af Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 28 Apr 2022 12:23:34 -0600 Subject: [PATCH] backfill: add more logging to the queue --- backfillqueue.go | 8 +++++++- historysync.go | 12 ++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/backfillqueue.go b/backfillqueue.go index 6de3548..ea7a4d3 100644 --- a/backfillqueue.go +++ b/backfillqueue.go @@ -45,7 +45,9 @@ func (bq *BackfillQueue) immediateBackfillLoop(user *User) { } else { select { case <-bq.ReCheckQueue: + bq.log.Debugfln("Re-checking infinite backfill queue due to forced re-check") case <-time.After(10 * time.Second): + bq.log.Debugfln("Re-checking infinite backfill queue due to timeout") } } } @@ -55,6 +57,7 @@ func (bq *BackfillQueue) deferredBackfillLoop(user *User) { for { // Finish all immediate backfills before doing the deferred ones. if immediate := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); immediate != nil { + bq.log.Debugfln("Not doing any deferred or media backfill since there are immediate backfills to do") time.Sleep(10 * time.Second) } else if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil { bq.DeferredBackfillRequests <- backfill @@ -63,7 +66,10 @@ func (bq *BackfillQueue) deferredBackfillLoop(user *User) { bq.DeferredBackfillRequests <- mediaBackfill mediaBackfill.MarkDone() } else { - time.Sleep(10 * time.Second) + select { + case <-bq.ReCheckQueue: + case <-time.After(time.Minute): + } } } } diff --git a/historysync.go b/historysync.go index 0f6520e..412dabe 100644 --- a/historysync.go +++ b/historysync.go @@ -78,7 +78,7 @@ func (user *User) handleHistorySyncsLoop() { func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) { for req := range backfillRequests { - user.log.Debugfln("Handling backfill request %s", req) + user.log.Infofln("Handling backfill request %s", req) conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal) if conv == nil { user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String()) @@ -96,7 +96,7 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac endTime = *req.TimeEnd } - user.log.Debugfln("Backfilling media from %v to %v for %s", startTime, endTime, portal.Key.String()) + user.log.Infofln("Backfilling media from %v to %v for %s", startTime, endTime, portal.Key.String()) // Go through all of the messages in the given time range, // requesting any media that errored. @@ -155,7 +155,7 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill } } - user.log.Debugfln("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents) + user.log.Infofln("Backfilling %d messages in %s, %d messages at a time (queue ID: %d)", len(allMsgs), portal.Key.JID, req.MaxBatchEvents, req.QueueID) toBackfill := allMsgs[0:] var insertionEventIds []id.EventID for len(toBackfill) > 0 { @@ -180,10 +180,10 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill time.Unix(int64(allMsgs[0].GetMessageTimestamp()), 0), insertionEventIds[0]) } - user.log.Debugfln("Deleting %d history sync messages after backfilling", len(allMsgs)) + user.log.Debugfln("Deleting %d history sync messages after backfilling (queue ID: %d)", len(allMsgs), req.QueueID) err := user.bridge.DB.HistorySyncQuery.DeleteMessages(user.MXID, conv.ConversationID, allMsgs) if err != nil { - user.log.Warnfln("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err) + user.log.Warnfln("Failed to delete %d history sync messages after backfilling (queue ID: %d): %v", len(allMsgs), req.QueueID, err) } if !conv.MarkedAsUnread && conv.UnreadCount == 0 { @@ -422,7 +422,7 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo) newMinTs = lastMessage.Timestamp } - portal.log.Infofln("Processing history sync with %d messages", len(messages)) + portal.log.Debugfln("Processing backfill with %d messages", len(messages)) // The messages are ordered newest to oldest, so iterate them in reverse order. for i := len(messages) - 1; i >= 0; i-- { webMsg := messages[i]