backfill: remove intermediate buffer for backfill

This commit is contained in:
Sumner Evans 2022-05-13 13:18:52 -06:00
parent f3f6d88e55
commit df46ca99f9
No known key found for this signature in database
GPG Key ID: 8904527AB50022FD
5 changed files with 74 additions and 74 deletions

View File

@ -20,43 +20,73 @@ import (
"time"
log "maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix/id"
"maunium.net/go/mautrix-whatsapp/database"
)
type BackfillQueue struct {
BackfillQuery *database.BackfillQuery
ImmediateBackfillRequests chan *database.Backfill
DeferredBackfillRequests chan *database.Backfill
ReCheckQueue chan bool
log log.Logger
BackfillQuery *database.BackfillQuery
reCheckChannels []chan bool
log log.Logger
}
// RunLoop fetches backfills from the database, prioritizing immediate and forward backfills
func (bq *BackfillQueue) RunLoop(user *User) {
func (bq *BackfillQueue) ReCheck() {
bq.log.Info("Sending re-checks to %d channels", len(bq.reCheckChannels))
for _, channel := range bq.reCheckChannels {
go func(c chan bool) {
c <- true
}(channel)
}
}
func (bq *BackfillQueue) GetNextBackfill(userID id.UserID, backfillTypes []database.BackfillType, reCheckChannel chan bool) *database.Backfill {
for {
if backfill := bq.BackfillQuery.GetNext(user.MXID); backfill != nil {
if backfill.BackfillType == database.BackfillImmediate || backfill.BackfillType == database.BackfillForward {
bq.ImmediateBackfillRequests <- backfill
} else if backfill.BackfillType == database.BackfillDeferred {
select {
case <-bq.ReCheckQueue:
// If a queue re-check is requested, interrupt sending the
// backfill request to the deferred channel so that
// immediate backfills can happen ASAP.
continue
case bq.DeferredBackfillRequests <- backfill:
}
} else {
bq.log.Debugfln("Unrecognized backfill type %d in queue", backfill.BackfillType)
}
if backfill := bq.BackfillQuery.GetNext(userID, backfillTypes); backfill != nil {
backfill.MarkDispatched()
return backfill
} else {
select {
case <-bq.ReCheckQueue:
case <-reCheckChannel:
case <-time.After(time.Minute):
}
}
}
}
func (user *User) HandleBackfillRequestsLoop(backfillTypes []database.BackfillType) {
reCheckChannel := make(chan bool)
user.BackfillQueue.reCheckChannels = append(user.BackfillQueue.reCheckChannels, reCheckChannel)
for {
req := user.BackfillQueue.GetNextBackfill(user.MXID, backfillTypes, reCheckChannel)
user.log.Infofln("Handling backfill request %s", req)
conv := user.bridge.DB.HistorySync.GetConversation(user.MXID, req.Portal)
if conv == nil {
user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String())
req.MarkDone()
continue
}
portal := user.GetPortalByJID(conv.PortalKey.JID)
// Update the client store with basic chat settings.
if conv.MuteEndTime.After(time.Now()) {
user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime)
}
if conv.Archived {
user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true)
}
if conv.Pinned > 0 {
user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true)
}
if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration {
portal.ExpirationTime = *conv.EphemeralExpiration
portal.Update(nil)
}
user.backfillInChunks(req, conv, portal)
req.MarkDone()
}
}

View File

@ -876,7 +876,7 @@ func (handler *CommandHandler) CommandBackfill(ce *CommandEvent) {
backfillMessages := ce.Portal.bridge.DB.Backfill.NewWithValues(ce.User.MXID, database.BackfillImmediate, 0, &ce.Portal.Key, nil, batchSize, -1, batchDelay)
backfillMessages.Insert()
ce.User.BackfillQueue.ReCheckQueue <- true
ce.User.BackfillQueue.ReCheck()
}
const cmdListHelp = `list <contacts|groups> [page] [items per page] - Get a list of all contacts and groups.`

View File

@ -20,6 +20,8 @@ import (
"database/sql"
"errors"
"fmt"
"strconv"
"strings"
"time"
log "maunium.net/go/maulogger/v2"
@ -79,6 +81,7 @@ const (
SELECT queue_id, user_mxid, type, priority, portal_jid, portal_receiver, time_start, max_batch_events, max_total_events, batch_delay
FROM backfill_queue
WHERE user_mxid=$1
AND type IN (%s)
AND dispatch_time IS NULL
ORDER BY type, priority, queue_id
LIMIT 1
@ -86,13 +89,17 @@ const (
)
// GetNext returns the next backfill to perform
func (bq *BackfillQuery) GetNext(userID id.UserID) (backfill *Backfill) {
rows, err := bq.db.Query(getNextBackfillQuery, userID)
defer rows.Close()
func (bq *BackfillQuery) GetNext(userID id.UserID, backfillTypes []BackfillType) (backfill *Backfill) {
types := []string{}
for _, backfillType := range backfillTypes {
types = append(types, strconv.Itoa(int(backfillType)))
}
rows, err := bq.db.Query(fmt.Sprintf(getNextBackfillQuery, strings.Join(types, ",")), userID)
if err != nil || rows == nil {
bq.log.Error(err)
return
}
defer rows.Close()
if rows.Next() {
backfill = bq.New().Scan(rows)
}

View File

@ -51,27 +51,22 @@ func (user *User) handleHistorySyncsLoop() {
return
}
reCheckQueue := make(chan bool, 1)
// Start the backfill queue.
user.BackfillQueue = &BackfillQueue{
BackfillQuery: user.bridge.DB.Backfill,
ImmediateBackfillRequests: make(chan *database.Backfill, 1),
DeferredBackfillRequests: make(chan *database.Backfill, 1),
ReCheckQueue: make(chan bool, 1),
log: user.log.Sub("BackfillQueue"),
BackfillQuery: user.bridge.DB.Backfill,
reCheckChannels: []chan bool{},
log: user.log.Sub("BackfillQueue"),
}
reCheckQueue = user.BackfillQueue.ReCheckQueue
// Immediate backfills can be done in parallel
for i := 0; i < user.bridge.Config.Bridge.HistorySync.Immediate.WorkerCount; i++ {
go user.handleBackfillRequestsLoop(user.BackfillQueue.ImmediateBackfillRequests)
go user.HandleBackfillRequestsLoop([]database.BackfillType{database.BackfillImmediate, database.BackfillForward})
}
// Deferred backfills should be handled synchronously so as not to
// overload the homeserver. Users can configure their backfill stages
// to be more or less aggressive with backfilling at this stage.
go user.handleBackfillRequestsLoop(user.BackfillQueue.DeferredBackfillRequests)
go user.BackfillQueue.RunLoop(user)
go user.HandleBackfillRequestsLoop([]database.BackfillType{database.BackfillDeferred})
if user.bridge.Config.Bridge.HistorySync.MediaRequests.AutoRequestMedia &&
user.bridge.Config.Bridge.HistorySync.MediaRequests.RequestMethod == config.MediaRequestMethodLocalTime {
@ -81,7 +76,7 @@ func (user *User) handleHistorySyncsLoop() {
// Always save the history syncs for the user. If they want to enable
// backfilling in the future, we will have it in the database.
for evt := range user.historySyncs {
user.handleHistorySync(reCheckQueue, evt.Data)
user.handleHistorySync(user.BackfillQueue, evt.Data)
}
}
@ -131,38 +126,6 @@ func (user *User) dailyMediaRequestLoop() {
}
}
func (user *User) handleBackfillRequestsLoop(backfillRequests chan *database.Backfill) {
for req := range backfillRequests {
user.log.Infofln("Handling backfill request %s", req)
conv := user.bridge.DB.HistorySync.GetConversation(user.MXID, req.Portal)
if conv == nil {
user.log.Debugfln("Could not find history sync conversation data for %s", req.Portal.String())
req.MarkDone()
continue
}
portal := user.GetPortalByJID(conv.PortalKey.JID)
// Update the client store with basic chat settings.
if conv.MuteEndTime.After(time.Now()) {
user.Client.Store.ChatSettings.PutMutedUntil(conv.PortalKey.JID, conv.MuteEndTime)
}
if conv.Archived {
user.Client.Store.ChatSettings.PutArchived(conv.PortalKey.JID, true)
}
if conv.Pinned > 0 {
user.Client.Store.ChatSettings.PutPinned(conv.PortalKey.JID, true)
}
if conv.EphemeralExpiration != nil && portal.ExpirationTime != *conv.EphemeralExpiration {
portal.ExpirationTime = *conv.EphemeralExpiration
portal.Update(nil)
}
user.backfillInChunks(req, conv, portal)
req.MarkDone()
}
}
func (user *User) backfillInChunks(req *database.Backfill, conv *database.HistorySyncConversation, portal *Portal) {
portal.backfillLock.Lock()
defer portal.backfillLock.Unlock()
@ -294,7 +257,7 @@ func (user *User) shouldCreatePortalForHistorySync(conv *database.HistorySyncCon
return false
}
func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.HistorySync) {
func (user *User) handleHistorySync(backfillQueue *BackfillQueue, evt *waProto.HistorySync) {
if evt == nil || evt.SyncType == nil || evt.GetSyncType() == waProto.HistorySync_INITIAL_STATUS_V3 || evt.GetSyncType() == waProto.HistorySync_PUSH_NAME {
return
}
@ -385,7 +348,7 @@ func (user *User) handleHistorySync(reCheckQueue chan bool, evt *waProto.History
}
// Tell the queue to check for new backfill requests.
reCheckQueue <- true
backfillQueue.ReCheck()
}
}
}

View File

@ -1368,7 +1368,7 @@ func (portal *Portal) CreateMatrixRoom(user *User, groupInfo *types.GroupInfo, i
portals := []*Portal{portal}
user.EnqueueImmedateBackfills(portals)
user.EnqueueDeferredBackfills(portals)
user.BackfillQueue.ReCheckQueue <- true
user.BackfillQueue.ReCheck()
}
return nil
}