This commit is contained in:
Tulir Asokan 2021-11-10 17:21:04 -06:00 committed by GitHub
commit 498cb03954
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 11 additions and 0 deletions

1
changelog.d/11220.bugfix Normal file
View file

@ -0,0 +1 @@
Fix using MSC2716 batch sending in combination with event persistence workers. Contributed by @tulir at Beeper.

View file

@ -112,6 +112,7 @@ from synapse.storage.databases.main.monthly_active_users import (
)
from synapse.storage.databases.main.presence import PresenceStore
from synapse.storage.databases.main.room import RoomWorkerStore
from synapse.storage.databases.main.room_batch import RoomBatchStore
from synapse.storage.databases.main.search import SearchStore
from synapse.storage.databases.main.session import SessionStore
from synapse.storage.databases.main.stats import StatsStore
@ -239,6 +240,7 @@ class GenericWorkerSlavedStore(
SlavedEventStore,
SlavedKeyStore,
RoomWorkerStore,
RoomBatchStore,
DirectoryStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,

View file

@ -112,6 +112,7 @@ class EventContext:
_current_state_ids = attr.ib(default=None, type=Optional[StateMap[str]])
_prev_state_ids = attr.ib(default=None, type=Optional[StateMap[str]])
_outlier = attr.ib(default=False, type=bool)
@staticmethod
def with_state(
@ -137,6 +138,7 @@ class EventContext:
return EventContext(
current_state_ids={},
prev_state_ids={},
outlier=True,
)
async def serialize(self, event: EventBase, store: "DataStore") -> JsonDict:
@ -170,6 +172,7 @@ class EventContext:
"prev_group": self.prev_group,
"delta_ids": _encode_state_dict(self.delta_ids),
"app_service_id": self.app_service.id if self.app_service else None,
"outlier": self._outlier,
}
@staticmethod
@ -198,6 +201,11 @@ class EventContext:
rejected=input["rejected"],
)
if input["outlier"]:
context._prev_state_ids = {}
context._current_state_ids = {}
context._outlier = True
app_service_id = input["app_service_id"]
if app_service_id:
context.app_service = storage.main.get_app_service_by_id(app_service_id)