0
0
Fork 0
mirror of https://github.com/matrix-org/dendrite synced 2024-09-24 03:48:59 +02:00

Ensure we only wake up a given user once (#2775)

This ensures that the sync API notifier only wakes up a given user once
for a given stream position.
This commit is contained in:
Neil Alexander 2022-10-07 13:42:35 +01:00 committed by GitHub
parent 8d8f4689a0
commit 1b5460a920
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -48,6 +48,7 @@ type Notifier struct {
lastCleanUpTime time.Time lastCleanUpTime time.Time
// This map is reused to prevent allocations and GC pressure in SharedUsers. // This map is reused to prevent allocations and GC pressure in SharedUsers.
_sharedUserMap map[string]struct{} _sharedUserMap map[string]struct{}
_wakeupUserMap map[string]struct{}
} }
// NewNotifier creates a new notifier set to the given sync position. // NewNotifier creates a new notifier set to the given sync position.
@ -61,6 +62,7 @@ func NewNotifier() *Notifier {
lock: &sync.RWMutex{}, lock: &sync.RWMutex{},
lastCleanUpTime: time.Now(), lastCleanUpTime: time.Now(),
_sharedUserMap: map[string]struct{}{}, _sharedUserMap: map[string]struct{}{},
_wakeupUserMap: map[string]struct{}{},
} }
} }
@ -408,12 +410,16 @@ func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.P
// specified user IDs, and also the specified peekingDevices // specified user IDs, and also the specified peekingDevices
func (n *Notifier) _wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) { func (n *Notifier) _wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) {
for _, userID := range userIDs { for _, userID := range userIDs {
n._wakeupUserMap[userID] = struct{}{}
}
for userID := range n._wakeupUserMap {
for _, stream := range n._fetchUserStreams(userID) { for _, stream := range n._fetchUserStreams(userID) {
if stream == nil { if stream == nil {
continue continue
} }
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
} }
delete(n._wakeupUserMap, userID)
} }
for _, peekingDevice := range peekingDevices { for _, peekingDevice := range peekingDevices {