mirror of
https://github.com/matrix-org/dendrite
synced 2024-11-18 15:50:52 +01:00
Check for duplicates when pulling things out of the database
This commit is contained in:
parent
147b3eddad
commit
cae335bc74
2 changed files with 25 additions and 0 deletions
|
@ -170,6 +170,7 @@ func (oq *destinationQueue) wakeQueueIfNeeded() {
|
|||
// getPendingFromDatabase will look at the database and see if
|
||||
// there are any persisted events that haven't been sent to this
|
||||
// destination yet. If so, they will be queued up.
|
||||
// nolint:gocyclo
|
||||
func (oq *destinationQueue) getPendingFromDatabase() {
|
||||
// Check to see if there's anything to do for this server
|
||||
// in the database.
|
||||
|
@ -177,10 +178,27 @@ func (oq *destinationQueue) getPendingFromDatabase() {
|
|||
ctx := context.Background()
|
||||
oq.pendingMutex.Lock()
|
||||
defer oq.pendingMutex.Unlock()
|
||||
|
||||
// Take a note of all of the PDUs and EDUs that we already
|
||||
// have cached. We will index them based on the receipt,
|
||||
// which ultimately just contains the index of the PDU/EDU
|
||||
// in the database.
|
||||
gotPDUs := map[string]struct{}{}
|
||||
gotEDUs := map[string]struct{}{}
|
||||
for _, pdu := range oq.pendingPDUs {
|
||||
gotPDUs[pdu.receipt.String()] = struct{}{}
|
||||
}
|
||||
for _, edu := range oq.pendingEDUs {
|
||||
gotEDUs[edu.receipt.String()] = struct{}{}
|
||||
}
|
||||
|
||||
if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 {
|
||||
// We have room in memory for some PDUs - let's request no more than that.
|
||||
if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, pduCapacity); err == nil {
|
||||
for receipt, pdu := range pdus {
|
||||
if _, ok := gotPDUs[receipt.String()]; ok {
|
||||
continue
|
||||
}
|
||||
oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{receipt, pdu})
|
||||
retrieved = true
|
||||
}
|
||||
|
@ -192,6 +210,9 @@ func (oq *destinationQueue) getPendingFromDatabase() {
|
|||
// We have room in memory for some EDUs - let's request no more than that.
|
||||
if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil {
|
||||
for receipt, edu := range edus {
|
||||
if _, ok := gotEDUs[receipt.String()]; ok {
|
||||
continue
|
||||
}
|
||||
oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{receipt, edu})
|
||||
retrieved = true
|
||||
}
|
||||
|
|
|
@ -46,6 +46,10 @@ type Receipt struct {
|
|||
nid int64
|
||||
}
|
||||
|
||||
func (r *Receipt) String() string {
|
||||
return fmt.Sprintf("%d", r.nid)
|
||||
}
|
||||
|
||||
// UpdateRoom updates the joined hosts for a room and returns what the joined
|
||||
// hosts were before the update, or nil if this was a duplicate message.
|
||||
// This is called when we receive a message from kafka, so we pass in
|
||||
|
|
Loading…
Reference in a new issue