Clean up some backfill things

This commit is contained in:
Tulir Asokan 2022-04-18 20:12:24 +03:00
parent 5b9fe8e08a
commit 2c68ccc0a3
2 changed files with 25 additions and 29 deletions

View file

@ -64,16 +64,16 @@ func (bq *BackfillQuery) NewWithValues(userID id.UserID, backfillType BackfillTy
const (
getNextBackfillQuery = `
SELECT queue_id, user_mxid, type, priority, portal_jid, portal_receiver, time_start, time_end, max_batch_events, max_total_events, batch_delay
FROM backfill_queue
WHERE user_mxid=$1
AND type=$2
AND completed_at IS NULL
ORDER BY priority, queue_id
LIMIT 1
FROM backfill_queue
WHERE user_mxid=$1
AND type=$2
AND completed_at IS NULL
ORDER BY priority, queue_id
LIMIT 1
`
)
/// Returns the next backfill to perform
// GetNext returns the next backfill to perform
func (bq *BackfillQuery) GetNext(userID id.UserID, backfillType BackfillType) (backfill *Backfill) {
rows, err := bq.db.Query(getNextBackfillQuery, userID, backfillType)
defer rows.Close()

View file

@ -132,15 +132,11 @@ func (user *User) createOrUpdatePortalAndBackfillWithLock(req *database.Backfill
user.log.Debugfln("Backfilling %d messages in %s, %d messages at a time", len(allMsgs), portal.Key.JID, req.MaxBatchEvents)
toBackfill := allMsgs[0:]
var insertionEventIds []id.EventID
for {
if len(toBackfill) == 0 {
break
}
for len(toBackfill) > 0 {
var msgs []*waProto.WebMessageInfo
if len(toBackfill) <= req.MaxBatchEvents {
msgs = toBackfill
toBackfill = toBackfill[0:0]
toBackfill = nil
} else {
msgs = toBackfill[:req.MaxBatchEvents]
toBackfill = toBackfill[req.MaxBatchEvents:]
@ -598,24 +594,24 @@ func (portal *Portal) finishBatchEvt(info *wrappedInfo, eventID id.EventID) {
}
func (portal *Portal) sendPostBackfillDummy(lastTimestamp time.Time, insertionEventId id.EventID) {
for _, evtType := range []event.Type{BackfillEndDummyEvent, HistorySyncMarker} {
resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, evtType, map[string]interface{}{
"org.matrix.msc2716.marker.insertion": insertionEventId,
//"m.marker.insertion": insertionEventId,
})
if err != nil {
portal.log.Errorln("Error sending post-backfill dummy event:", err)
return
}
msg := portal.bridge.DB.Message.New()
msg.Chat = portal.Key
msg.MXID = resp.EventID
msg.JID = types.MessageID(resp.EventID)
msg.Timestamp = lastTimestamp.Add(1 * time.Second)
msg.Sent = true
msg.Insert()
// TODO remove after clients stop using this
_, _ = portal.MainIntent().SendMessageEvent(portal.MXID, BackfillEndDummyEvent, struct{}{})
resp, err := portal.MainIntent().SendMessageEvent(portal.MXID, HistorySyncMarker, map[string]interface{}{
"org.matrix.msc2716.marker.insertion": insertionEventId,
//"m.marker.insertion": insertionEventId,
})
if err != nil {
portal.log.Errorln("Error sending post-backfill dummy event:", err)
return
}
msg := portal.bridge.DB.Message.New()
msg.Chat = portal.Key
msg.MXID = resp.EventID
msg.JID = types.MessageID(resp.EventID)
msg.Timestamp = lastTimestamp.Add(1 * time.Second)
msg.Sent = true
msg.Insert()
}
// endregion