forked from MirrorHub/mautrix-whatsapp
Merge pull request #502 from mautrix/backfill-queue-fixes
backfill: fixes and minor refactors
This commit is contained in:
commit
1f292dc1c5
4 changed files with 63 additions and 19 deletions
|
@ -40,26 +40,29 @@ func (bq *BackfillQueue) ReCheck() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bq *BackfillQueue) GetNextBackfill(userID id.UserID, backfillTypes []database.BackfillType, reCheckChannel chan bool) *database.Backfill {
|
func (bq *BackfillQueue) GetNextBackfill(userID id.UserID, backfillTypes []database.BackfillType, waitForBackfillTypes []database.BackfillType, reCheckChannel chan bool) *database.Backfill {
|
||||||
for {
|
for {
|
||||||
if backfill := bq.BackfillQuery.GetNext(userID, backfillTypes); backfill != nil {
|
if !bq.BackfillQuery.HasUnstartedOrInFlightOfType(userID, waitForBackfillTypes) {
|
||||||
backfill.MarkDispatched()
|
// check for immediate when dealing with deferred
|
||||||
return backfill
|
if backfill := bq.BackfillQuery.GetNext(userID, backfillTypes); backfill != nil {
|
||||||
} else {
|
backfill.MarkDispatched()
|
||||||
select {
|
return backfill
|
||||||
case <-reCheckChannel:
|
|
||||||
case <-time.After(time.Minute):
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-reCheckChannel:
|
||||||
|
case <-time.After(time.Minute):
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (user *User) HandleBackfillRequestsLoop(backfillTypes []database.BackfillType) {
|
func (user *User) HandleBackfillRequestsLoop(backfillTypes []database.BackfillType, waitForBackfillTypes []database.BackfillType) {
|
||||||
reCheckChannel := make(chan bool)
|
reCheckChannel := make(chan bool)
|
||||||
user.BackfillQueue.reCheckChannels = append(user.BackfillQueue.reCheckChannels, reCheckChannel)
|
user.BackfillQueue.reCheckChannels = append(user.BackfillQueue.reCheckChannels, reCheckChannel)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
req := user.BackfillQueue.GetNextBackfill(user.MXID, backfillTypes, reCheckChannel)
|
req := user.BackfillQueue.GetNextBackfill(user.MXID, backfillTypes, waitForBackfillTypes, reCheckChannel)
|
||||||
user.log.Infofln("Handling backfill request %s", req)
|
user.log.Infofln("Handling backfill request %s", req)
|
||||||
|
|
||||||
conv := user.bridge.DB.HistorySync.GetConversation(user.MXID, req.Portal)
|
conv := user.bridge.DB.HistorySync.GetConversation(user.MXID, req.Portal)
|
||||||
|
|
|
@ -97,6 +97,14 @@ const (
|
||||||
ORDER BY type, priority, queue_id
|
ORDER BY type, priority, queue_id
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
`
|
`
|
||||||
|
getUnstartedOrInFlightQuery = `
|
||||||
|
SELECT 1
|
||||||
|
FROM backfill_queue
|
||||||
|
WHERE user_mxid=$1
|
||||||
|
AND type IN (%s)
|
||||||
|
AND (dispatch_time IS NULL OR completed_at IS NULL)
|
||||||
|
LIMIT 1
|
||||||
|
`
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetNext returns the next backfill to perform
|
// GetNext returns the next backfill to perform
|
||||||
|
@ -120,6 +128,28 @@ func (bq *BackfillQuery) GetNext(userID id.UserID, backfillTypes []BackfillType)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bq *BackfillQuery) HasUnstartedOrInFlightOfType(userID id.UserID, backfillTypes []BackfillType) bool {
|
||||||
|
if len(backfillTypes) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
bq.backfillQueryLock.Lock()
|
||||||
|
defer bq.backfillQueryLock.Unlock()
|
||||||
|
|
||||||
|
types := []string{}
|
||||||
|
for _, backfillType := range backfillTypes {
|
||||||
|
types = append(types, strconv.Itoa(int(backfillType)))
|
||||||
|
}
|
||||||
|
rows, err := bq.db.Query(fmt.Sprintf(getUnstartedOrInFlightQuery, strings.Join(types, ",")), userID)
|
||||||
|
if err != nil || rows == nil {
|
||||||
|
// No rows means that there are no unstarted or in flight backfill
|
||||||
|
// requests.
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
return rows.Next()
|
||||||
|
}
|
||||||
|
|
||||||
func (bq *BackfillQuery) DeleteAll(userID id.UserID) {
|
func (bq *BackfillQuery) DeleteAll(userID id.UserID) {
|
||||||
bq.backfillQueryLock.Lock()
|
bq.backfillQueryLock.Lock()
|
||||||
defer bq.backfillQueryLock.Unlock()
|
defer bq.backfillQueryLock.Unlock()
|
||||||
|
|
7
database/upgrades/49-backfill-state-timestamp-bigint.sql
Normal file
7
database/upgrades/49-backfill-state-timestamp-bigint.sql
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
-- v49: Convert first_expected_ts to BIGINT so that we can use zero-values.
|
||||||
|
|
||||||
|
-- only: sqlite
|
||||||
|
UPDATE backfill_state SET first_expected_ts=unixepoch(first_expected_ts);
|
||||||
|
|
||||||
|
-- only: postgres
|
||||||
|
ALTER TABLE backfill_state ALTER COLUMN first_expected_ts TYPE BIGINT;
|
|
@ -58,15 +58,17 @@ func (user *User) handleHistorySyncsLoop() {
|
||||||
log: user.log.Sub("BackfillQueue"),
|
log: user.log.Sub("BackfillQueue"),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
forwardAndImmediate := []database.BackfillType{database.BackfillImmediate, database.BackfillForward}
|
||||||
|
|
||||||
// Immediate backfills can be done in parallel
|
// Immediate backfills can be done in parallel
|
||||||
for i := 0; i < user.bridge.Config.Bridge.HistorySync.Immediate.WorkerCount; i++ {
|
for i := 0; i < user.bridge.Config.Bridge.HistorySync.Immediate.WorkerCount; i++ {
|
||||||
go user.HandleBackfillRequestsLoop([]database.BackfillType{database.BackfillImmediate, database.BackfillForward})
|
go user.HandleBackfillRequestsLoop(forwardAndImmediate, []database.BackfillType{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deferred backfills should be handled synchronously so as not to
|
// Deferred backfills should be handled synchronously so as not to
|
||||||
// overload the homeserver. Users can configure their backfill stages
|
// overload the homeserver. Users can configure their backfill stages
|
||||||
// to be more or less aggressive with backfilling at this stage.
|
// to be more or less aggressive with backfilling at this stage.
|
||||||
go user.HandleBackfillRequestsLoop([]database.BackfillType{database.BackfillDeferred})
|
go user.HandleBackfillRequestsLoop([]database.BackfillType{database.BackfillDeferred}, forwardAndImmediate)
|
||||||
|
|
||||||
if user.bridge.Config.Bridge.HistorySync.MediaRequests.AutoRequestMedia &&
|
if user.bridge.Config.Bridge.HistorySync.MediaRequests.AutoRequestMedia &&
|
||||||
user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestMethod == config.MediaRequestMethodLocalTime {
|
user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestMethod == config.MediaRequestMethodLocalTime {
|
||||||
|
@ -130,17 +132,16 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
|
||||||
portal.backfillLock.Lock()
|
portal.backfillLock.Lock()
|
||||||
defer portal.backfillLock.Unlock()
|
defer portal.backfillLock.Unlock()
|
||||||
|
|
||||||
|
if !user.shouldCreatePortalForHistorySync(conv, portal) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
backfillState := user.bridge.DB.Backfill.GetBackfillState(user.MXID, &portal.Key)
|
backfillState := user.bridge.DB.Backfill.GetBackfillState(user.MXID, &portal.Key)
|
||||||
if backfillState == nil {
|
if backfillState == nil {
|
||||||
backfillState = user.bridge.DB.Backfill.NewBackfillState(user.MXID, &portal.Key)
|
backfillState = user.bridge.DB.Backfill.NewBackfillState(user.MXID, &portal.Key)
|
||||||
}
|
}
|
||||||
backfillState.SetProcessingBatch(true)
|
backfillState.SetProcessingBatch(true)
|
||||||
defer backfillState.SetProcessingBatch(false)
|
defer backfillState.SetProcessingBatch(false)
|
||||||
portal.updateBackfillStatus(backfillState)
|
|
||||||
|
|
||||||
if !user.shouldCreatePortalForHistorySync(conv, portal) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var forwardPrevID id.EventID
|
var forwardPrevID id.EventID
|
||||||
var timeEnd *time.Time
|
var timeEnd *time.Time
|
||||||
|
@ -201,6 +202,9 @@ func (user *User) backfillInChunks(req *database.Backfill, conv *database.Histor
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update the backfill status here after the room has been created.
|
||||||
|
portal.updateBackfillStatus(backfillState)
|
||||||
|
|
||||||
if sendDisappearedNotice {
|
if sendDisappearedNotice {
|
||||||
user.log.Debugfln("Sending notice to %s that there are disappeared messages ending at %v", portal.Key.JID, conv.LastMessageTimestamp)
|
user.log.Debugfln("Sending notice to %s that there are disappeared messages ending at %v", portal.Key.JID, conv.LastMessageTimestamp)
|
||||||
resp, err := portal.sendMessage(portal.MainIntent(), event.EventMessage, &event.MessageEventContent{
|
resp, err := portal.sendMessage(portal.MainIntent(), event.EventMessage, &event.MessageEventContent{
|
||||||
|
@ -734,10 +738,10 @@ func (portal *Portal) updateBackfillStatus(backfillState *database.BackfillState
|
||||||
|
|
||||||
_, err := portal.MainIntent().SendStateEvent(portal.MXID, BackfillStatusEvent, "", map[string]interface{}{
|
_, err := portal.MainIntent().SendStateEvent(portal.MXID, BackfillStatusEvent, "", map[string]interface{}{
|
||||||
"status": backfillStatus,
|
"status": backfillStatus,
|
||||||
"first_timestamp": backfillState.FirstExpectedTimestamp,
|
"first_timestamp": backfillState.FirstExpectedTimestamp * 1000,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
portal.log.Errorln("Error sending post-backfill dummy event:", err)
|
portal.log.Errorln("Error sending backfill status event:", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue