forked from MirrorHub/synapse
Limit the number of events sent over replication when persisting events. (#10082)
This commit is contained in:
parent
1641c5c707
commit
9408b86f5c
2 changed files with 11 additions and 7 deletions
1
changelog.d/10082.bugfix
Normal file
1
changelog.d/10082.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fixed a bug causing replication requests to fail when receiving a lot of events via federation.
|
|
@ -91,6 +91,7 @@ from synapse.types import (
|
||||||
get_domain_from_id,
|
get_domain_from_id,
|
||||||
)
|
)
|
||||||
from synapse.util.async_helpers import Linearizer, concurrently_execute
|
from synapse.util.async_helpers import Linearizer, concurrently_execute
|
||||||
|
from synapse.util.iterutils import batch_iter
|
||||||
from synapse.util.retryutils import NotRetryingDestination
|
from synapse.util.retryutils import NotRetryingDestination
|
||||||
from synapse.util.stringutils import shortstr
|
from synapse.util.stringutils import shortstr
|
||||||
from synapse.visibility import filter_events_for_server
|
from synapse.visibility import filter_events_for_server
|
||||||
|
@ -3053,11 +3054,13 @@ class FederationHandler(BaseHandler):
|
||||||
"""
|
"""
|
||||||
instance = self.config.worker.events_shard_config.get_instance(room_id)
|
instance = self.config.worker.events_shard_config.get_instance(room_id)
|
||||||
if instance != self._instance_name:
|
if instance != self._instance_name:
|
||||||
|
# Limit the number of events sent over federation.
|
||||||
|
for batch in batch_iter(event_and_contexts, 1000):
|
||||||
result = await self._send_events(
|
result = await self._send_events(
|
||||||
instance_name=instance,
|
instance_name=instance,
|
||||||
store=self.store,
|
store=self.store,
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
event_and_contexts=event_and_contexts,
|
event_and_contexts=batch,
|
||||||
backfilled=backfilled,
|
backfilled=backfilled,
|
||||||
)
|
)
|
||||||
return result["max_stream_id"]
|
return result["max_stream_id"]
|
||||||
|
|
Loading…
Reference in a new issue