diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index 4e1a3c2a3..44f133388 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -45,6 +45,8 @@ type Notifier struct { lastCleanUpTime time.Time // Protects roomIDToJoinedUsers and roomIDToPeekingDevices mapLock *sync.RWMutex + // This map is reused to prevent allocations and GC pressure in SharedUsers. + _sharedUsers map[string]struct{} } // NewNotifier creates a new notifier set to the given sync position. @@ -58,12 +60,16 @@ func NewNotifier() *Notifier { streamLock: &sync.Mutex{}, mapLock: &sync.RWMutex{}, lastCleanUpTime: time.Now(), + _sharedUsers: map[string]struct{}{}, } } // SetCurrentPosition sets the current streaming positions. // This must be called directly after NewNotifier and initialising the streams. func (n *Notifier) SetCurrentPosition(currPos types.StreamingToken) { + n.streamLock.Lock() + defer n.streamLock.Unlock() + n.currPos = currPos } @@ -249,13 +255,22 @@ func (n *Notifier) OnNewPresence( n.wakeupUsers(sharedUsers, nil, n.currPos) } -func (n *Notifier) SharedUsers(userID string) (sharedUsers []string) { +func (n *Notifier) SharedUsers(userID string) []string { n.mapLock.RLock() defer n.mapLock.RUnlock() + n._sharedUsers[userID] = struct{}{} for roomID, users := range n.roomIDToJoinedUsers { - if _, ok := users[userID]; ok { - sharedUsers = append(sharedUsers, n.JoinedUsers(roomID)...) + if _, ok := users[userID]; !ok { + continue } + for _, userID := range n.JoinedUsers(roomID) { + n._sharedUsers[userID] = struct{}{} + } + } + sharedUsers := make([]string, 0, len(n._sharedUsers)+1) + for userID := range n._sharedUsers { + sharedUsers = append(sharedUsers, userID) + delete(n._sharedUsers, userID) } return sharedUsers } @@ -328,7 +343,7 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) { // This is just the bulk form of addJoinedUser for roomID, userIDs := range roomIDToUserIDs { if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { - n.roomIDToJoinedUsers[roomID] = make(userIDSet) + n.roomIDToJoinedUsers[roomID] = make(userIDSet, len(userIDs)) } for _, userID := range userIDs { n.roomIDToJoinedUsers[roomID].add(userID) @@ -343,7 +358,7 @@ func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.P // This is just the bulk form of addPeekingDevice for roomID, peekingDevices := range roomIDToPeekingDevices { if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { - n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) + n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet, len(peekingDevices)) } for _, peekingDevice := range peekingDevices { n.roomIDToPeekingDevices[roomID].add(peekingDevice) @@ -416,7 +431,7 @@ func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream { if !ok { return []*UserDeviceStream{} } - streams := []*UserDeviceStream{} + streams := make([]*UserDeviceStream, 0, len(user)) for _, stream := range user { streams = append(streams, stream) } @@ -514,10 +529,10 @@ func (n *Notifier) removeEmptyUserStreams() { } // A string set, mainly existing for improving clarity of structs in this file. -type userIDSet map[string]bool +type userIDSet map[string]struct{} func (s userIDSet) add(str string) { - s[str] = true + s[str] = struct{}{} } func (s userIDSet) remove(str string) { @@ -534,10 +549,10 @@ func (s userIDSet) values() (vals []string) { // A set of PeekingDevices, similar to userIDSet -type peekingDeviceSet map[types.PeekingDevice]bool +type peekingDeviceSet map[types.PeekingDevice]struct{} func (s peekingDeviceSet) add(d types.PeekingDevice) { - s[d] = true + s[d] = struct{}{} } // nolint:unused