diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index d9567eeba..be63290c6 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -46,6 +46,7 @@ const ( // ensures that only one request is in flight to a given destination // at a time. type destinationQueue struct { + queues *OutgoingQueues db storage.Database process *process.ProcessContext signing *SigningInfo @@ -246,6 +247,7 @@ func (oq *destinationQueue) backgroundSend() { } destinationQueueRunning.Inc() defer destinationQueueRunning.Dec() + defer oq.queues.clearQueue(oq) defer oq.running.Store(false) // Mark the queue as overflowed, so we will consult the database diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 4453ddb01..f32ae20fd 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -120,7 +120,7 @@ func NewOutgoingQueues( log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") } for serverName := range serverNames { - if queue := queues.getQueue(serverName); !queue.statistics.Blacklisted() { + if queue := queues.getQueue(serverName); queue != nil { queue.wakeQueueIfNeeded() } } @@ -148,12 +148,16 @@ type queuedEDU struct { } func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue { + if oqs.statistics.ForServer(destination).Blacklisted() { + return nil + } oqs.queuesMutex.Lock() defer oqs.queuesMutex.Unlock() - oq := oqs.queues[destination] - if oq == nil { + oq, ok := oqs.queues[destination] + if !ok { destinationQueueTotal.Inc() oq = &destinationQueue{ + queues: oqs, db: oqs.db, process: oqs.process, rsAPI: oqs.rsAPI, @@ -170,6 +174,16 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d return oq } +func (oqs *OutgoingQueues) clearQueue(oq *destinationQueue) { + oqs.queuesMutex.Lock() + defer oqs.queuesMutex.Unlock() + + close(oq.notify) + close(oq.interruptBackoff) + delete(oqs.queues, oq.destination) + destinationQueueTotal.Dec() +} + type ErrorFederationDisabled struct { Message string } @@ -236,7 +250,9 @@ func (oqs *OutgoingQueues) SendEvent( } for destination := range destmap { - oqs.getQueue(destination).sendEvent(ev, nid) + if queue := oqs.getQueue(destination); queue != nil { + queue.sendEvent(ev, nid) + } } return nil @@ -306,7 +322,9 @@ func (oqs *OutgoingQueues) SendEDU( } for destination := range destmap { - oqs.getQueue(destination).sendEDU(e, nid) + if queue := oqs.getQueue(destination); queue != nil { + queue.sendEDU(e, nid) + } } return nil @@ -317,9 +335,7 @@ func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) { if oqs.disabled { return } - q := oqs.getQueue(srv) - if q == nil { - return + if queue := oqs.getQueue(srv); queue != nil { + queue.wakeQueueIfNeeded() } - q.wakeQueueIfNeeded() }