Compare commits

...

3 commits

Author SHA1 Message Date
Andrew Morgan f388592e42 Changelog 2021-10-20 20:02:33 +01:00
Andrew Morgan 6b31321588 Remove the need to convert to a List 2021-10-20 19:57:33 +01:00
Andrew Morgan 527864c97b Deduplicate presence updates before sending them to application services
We calculate presence updates for application services based on the
users that application service is interested in. For each of these
users, we determine which presence updates they are set to receive,
compile that into a list, and then send every update from the list
to the application service.

However, because a single presence update can cause a notification
for many different users, we're likely to end up with lots of
duplicated presence updates being collected here. Currently, all of
these are sent to the application service.

By using a Set, this deduplication happens automatically.
2021-10-20 19:50:18 +01:00
3 changed files with 11 additions and 7 deletions

1
changelog.d/11140.bugfix Normal file
View file

@ -0,0 +1 @@
Fix a long-standing bug that caused duplicate presence updates to be sent to application services.

View file

@ -48,7 +48,7 @@ This is all tied together by the AppServiceScheduler which DIs the required
components.
"""
import logging
from typing import List, Optional
from typing import Iterable, List, Optional
from synapse.appservice import ApplicationService, ApplicationServiceState
from synapse.events import EventBase
@ -95,7 +95,7 @@ class ApplicationServiceScheduler:
self.queuer.enqueue_event(service, event)
def submit_ephemeral_events_for_as(
self, service: ApplicationService, events: List[JsonDict]
self, service: ApplicationService, events: Iterable[JsonDict]
):
self.queuer.enqueue_ephemeral(service, events)
@ -130,7 +130,9 @@ class _ServiceQueuer:
self.queued_events.setdefault(service.id, []).append(event)
self._start_background_request(service)
def enqueue_ephemeral(self, service: ApplicationService, events: List[JsonDict]):
def enqueue_ephemeral(
self, service: ApplicationService, events: Iterable[JsonDict]
):
self.queued_ephemeral.setdefault(service.id, []).extend(events)
self._start_background_request(service)

View file

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Union
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Union
from prometheus_client import Counter
@ -231,6 +231,7 @@ class ApplicationServicesHandler:
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
# Only handle typing if we have the latest token
events: Union[Set[JsonDict], List[JsonDict]]
if stream_key == "typing_key" and new_token is not None:
events = await self._handle_typing(service, new_token)
if events:
@ -277,8 +278,8 @@ class ApplicationServicesHandler:
async def _handle_presence(
self, service: ApplicationService, users: Collection[Union[str, UserID]]
) -> List[JsonDict]:
events: List[JsonDict] = []
) -> Set[JsonDict]:
events: Set[JsonDict] = set()
presence_source = self.event_sources.sources.presence
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
@ -296,7 +297,7 @@ class ApplicationServicesHandler:
from_key=from_key,
)
time_now = self.clock.time_msec()
events.extend(
events.update(
{
"type": "m.presence",
"sender": event.user_id,