0
0
Fork 0
mirror of https://github.com/matrix-org/dendrite synced 2024-11-19 00:00:55 +01:00
This commit is contained in:
Neil Alexander 2020-12-14 17:45:43 +00:00
parent c7a9cee5cc
commit 9a5e0a81f9
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 14 additions and 18 deletions

View file

@ -77,9 +77,8 @@ func (n *Notifier) OnNewEvent(
// This needs to be done PRIOR to waking up users as they will read this value.
n.streamLock.Lock()
defer n.streamLock.Unlock()
latestPos := n.currPos.WithUpdates(posUpdate)
n.currPos = latestPos
n.currPos.ApplyUpdates(posUpdate)
n.removeEmptyUserStreams()
if ev != nil {
@ -113,11 +112,11 @@ func (n *Notifier) OnNewEvent(
}
}
n.wakeupUsers(usersToNotify, peekingDevicesToNotify, latestPos)
n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos)
} else if roomID != "" {
n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), latestPos)
n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), n.currPos)
} else if len(userIDs) > 0 {
n.wakeupUsers(userIDs, nil, latestPos)
n.wakeupUsers(userIDs, nil, n.currPos)
} else {
log.WithFields(log.Fields{
"posUpdate": posUpdate.String,
@ -155,10 +154,9 @@ func (n *Notifier) OnNewSendToDevice(
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
latestPos := n.currPos.WithUpdates(posUpdate)
n.currPos = latestPos
n.wakeupUserDevice(userID, deviceIDs, latestPos)
n.currPos.ApplyUpdates(posUpdate)
n.wakeupUserDevice(userID, deviceIDs, n.currPos)
}
// OnNewReceipt updates the current position
@ -168,10 +166,9 @@ func (n *Notifier) OnNewTyping(
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
latestPos := n.currPos.WithUpdates(posUpdate)
n.currPos = latestPos
n.wakeupUsers(n.joinedUsers(roomID), nil, latestPos)
n.currPos.ApplyUpdates(posUpdate)
n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
}
// OnNewReceipt updates the current position
@ -181,10 +178,9 @@ func (n *Notifier) OnNewReceipt(
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
latestPos := n.currPos.WithUpdates(posUpdate)
n.currPos = latestPos
n.wakeupUsers(n.joinedUsers(roomID), nil, latestPos)
n.currPos.ApplyUpdates(posUpdate)
n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
}
func (n *Notifier) OnNewKeyChange(
@ -192,9 +188,9 @@ func (n *Notifier) OnNewKeyChange(
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
latestPos := n.currPos.WithUpdates(posUpdate)
n.currPos = latestPos
n.wakeupUsers([]string{wakeUserID}, nil, latestPos)
n.currPos.ApplyUpdates(posUpdate)
n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
}
// GetListener returns a UserStreamListener that can be used to wait for

View file

@ -294,9 +294,9 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
// Add the updates into the sync response.
for _, event := range events {
res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent)
}
res.NextBatch.SendToDevicePosition++
}
}
return res, err
}