backfill: add more logging to the queue

This commit is contained in:
Sumner Evans 2022-04-28 12:23:34 -06:00
parent 387148849a
commit ac7a437ad8
No known key found for this signature in database
GPG key ID: 8904527AB50022FD
2 changed files with 13 additions and 7 deletions

View file

@ -45,7 +45,9 @@ func (bq *BackfillQueue) immediateBackfillLoop(user *User) {
} else { } else {
select { select {
case <-bq.ReCheckQueue: case <-bq.ReCheckQueue:
bq.log.Debugfln("Re-checking infinite backfill queue due to forced re-check")
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
bq.log.Debugfln("Re-checking infinite backfill queue due to timeout")
} }
} }
} }
@ -55,6 +57,7 @@ func (bq *BackfillQueue) deferredBackfillLoop(user *User) {
for { for {
// Finish all immediate backfills before doing the deferred ones. // Finish all immediate backfills before doing the deferred ones.
if immediate := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); immediate != nil { if immediate := bq.BackfillQuery.GetNext(user.MXID, database.BackfillImmediate); immediate != nil {
bq.log.Debugfln("Not doing any deferred or media backfill since there are immediate backfills to do")
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
} else if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil { } else if backfill := bq.BackfillQuery.GetNext(user.MXID, database.BackfillDeferred); backfill != nil {
bq.DeferredBackfillRequests <- backfill bq.DeferredBackfillRequests <- backfill
@ -63,7 +66,10 @@ func (bq *BackfillQueue) deferredBackfillLoop(user *User) {
bq.DeferredBackfillRequests <- mediaBackfill bq.DeferredBackfillRequests <- mediaBackfill
mediaBackfill.MarkDone() mediaBackfill.MarkDone()
} else { } else {
time.Sleep(10 * time.Second) select {
case <-bq.ReCheckQueue:
case <-time.After(time.Minute):
}
} }
} }
} }

View file

@ -78,7 +78,7 @@ func (user *User) handleHistorySyncsLoop() {
func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) { func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) {
for req := range backfillRequests { for req := range backfillRequests {
user.log.Debugfln("Handling backfill request %s", req) user.log.Infofln("Handling backfill request %s", req)
conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal) conv := user.bridge.DB.HistorySyncQuery.GetConversation(user.MXID, req.Portal)
if conv == nil { if conv == nil {
user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String()) user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String())
@ -96,7 +96,7 @@ func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Bac
endTime = *req.TimeEnd endTime = *req.TimeEnd
} }
user.log.Debugfln("Backfilling media from %v to %v for %s", startTime, endTime, portal.Key.String()) user.log.Infofln("Backfilling media from %v to %v for %s", startTime, endTime, portal.Key.String())
// Go through all of the messages in the given time range, // Go through all of the messages in the given time range,
// requesting any media that errored. // requesting any media that errored.
@ -155,7 +155,7 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill
} }
} }
user.log.Debugfln("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents) user.log.Infofln("Backfilling %d messages in %s, %d messages at a time (queue ID: %d)", len(allMsgs), portal.Key.JID, req.MaxBatchEvents, req.QueueID)
toBackfill := allMsgs[0:] toBackfill := allMsgs[0:]
var insertionEventIds []id.EventID var insertionEventIds []id.EventID
for len(toBackfill) > 0 { for len(toBackfill) > 0 {
@ -180,10 +180,10 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill
time.Unix(int64(allMsgs[0].GetMessageTimestamp()), 0), time.Unix(int64(allMsgs[0].GetMessageTimestamp()), 0),
insertionEventIds[0]) insertionEventIds[0])
} }
user.log.Debugfln("Deleting %d history sync messages after backfilling", len(allMsgs)) user.log.Debugfln("Deleting %d history sync messages after backfilling (queue ID: %d)", len(allMsgs), req.QueueID)
err := user.bridge.DB.HistorySyncQuery.DeleteMessages(user.MXID, conv.ConversationID, allMsgs) err := user.bridge.DB.HistorySyncQuery.DeleteMessages(user.MXID, conv.ConversationID, allMsgs)
if err != nil { if err != nil {
user.log.Warnfln("Failed to delete %d history sync messages after backfilling: %v", len(allMsgs), err) user.log.Warnfln("Failed to delete %d history sync messages after backfilling (queue ID: %d): %v", len(allMsgs), req.QueueID, err)
} }
if !conv.MarkedAsUnread && conv.UnreadCount == 0 { if !conv.MarkedAsUnread && conv.UnreadCount == 0 {
@ -422,7 +422,7 @@ func (portal *Portal) backfill(source *User, messages []*waProto.WebMessageInfo)
newMinTs = lastMessage.Timestamp newMinTs = lastMessage.Timestamp
} }
portal.log.Infofln("Processing history sync with %d messages", len(messages)) portal.log.Debugfln("Processing backfill with %d messages", len(messages))
// The messages are ordered newest to oldest, so iterate them in reverse order. // The messages are ordered newest to oldest, so iterate them in reverse order.
for i := len(messages) - 1; i >= 0; i-- { for i := len(messages) - 1; i >= 0; i-- {
webMsg := messages[i] webMsg := messages[i]