Refactor EventPersistenceQueue (#10145)

some cleanup, pulled out of #10134.
This commit is contained in:
Richard van der Hoff 2021-06-14 11:59:27 +01:00 committed by GitHub
parent d7808a2dde
commit 1dfdc87b9b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 89 additions and 77 deletions

1
changelog.d/10145.misc Normal file
View file

@ -0,0 +1 @@
Refactor EventPersistenceQueue.

View file

@ -16,9 +16,23 @@
import itertools
import logging
from collections import deque, namedtuple
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
from collections import deque
from typing import (
Awaitable,
Callable,
Collection,
Deque,
Dict,
Generic,
Iterable,
List,
Optional,
Set,
Tuple,
TypeVar,
)
import attr
from prometheus_client import Counter, Histogram
from twisted.internet import defer
@ -37,7 +51,7 @@ from synapse.types import (
StateMap,
get_domain_from_id,
)
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.async_helpers import ObservableDeferred, yieldable_gather_results
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@ -89,25 +103,47 @@ times_pruned_extremities = Counter(
)
class _EventPeristenceQueue:
@attr.s(auto_attribs=True, frozen=True, slots=True)
class _EventPersistQueueItem:
events_and_contexts: List[Tuple[EventBase, EventContext]]
backfilled: bool
deferred: ObservableDeferred
_PersistResult = TypeVar("_PersistResult")
class _EventPeristenceQueue(Generic[_PersistResult]):
"""Queues up events so that they can be persisted in bulk with only one
concurrent transaction per room.
"""
_EventPersistQueueItem = namedtuple(
"_EventPersistQueueItem", ("events_and_contexts", "backfilled", "deferred")
)
def __init__(
self,
per_item_callback: Callable[
[List[Tuple[EventBase, EventContext]], bool],
Awaitable[_PersistResult],
],
):
"""Create a new event persistence queue
def __init__(self):
self._event_persist_queues = {}
self._currently_persisting_rooms = set()
The per_item_callback will be called for each item added via add_to_queue,
and its result will be returned via the Deferreds returned from add_to_queue.
"""
self._event_persist_queues: Dict[str, Deque[_EventPersistQueueItem]] = {}
self._currently_persisting_rooms: Set[str] = set()
self._per_item_callback = per_item_callback
def add_to_queue(self, room_id, events_and_contexts, backfilled):
async def add_to_queue(
self,
room_id: str,
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
backfilled: bool,
) -> _PersistResult:
"""Add events to the queue, with the given persist_event options.
NB: due to the normal usage pattern of this method, it does *not*
follow the synapse logcontext rules, and leaves the logcontext in
place whether or not the returned deferred is ready.
If we are not already processing events in this room, starts off a background
process to to so, calling the per_item_callback for each item.
Args:
room_id (str):
@ -115,38 +151,36 @@ class _EventPeristenceQueue:
backfilled (bool):
Returns:
defer.Deferred: a deferred which will resolve once the events are
persisted. Runs its callbacks *without* a logcontext. The result
is the same as that returned by the callback passed to
`handle_queue`.
the result returned by the `_per_item_callback` passed to
`__init__`.
"""
queue = self._event_persist_queues.setdefault(room_id, deque())
if queue:
# if the last item in the queue has the same `backfilled` setting,
# we can just add these new events to that item.
if queue and queue[-1].backfilled == backfilled:
end_item = queue[-1]
if end_item.backfilled == backfilled:
end_item.events_and_contexts.extend(events_and_contexts)
return end_item.deferred.observe()
else:
# need to make a new queue item
deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
queue.append(
self._EventPersistQueueItem(
events_and_contexts=events_and_contexts,
end_item = _EventPersistQueueItem(
events_and_contexts=[],
backfilled=backfilled,
deferred=deferred,
)
)
queue.append(end_item)
return deferred.observe()
end_item.events_and_contexts.extend(events_and_contexts)
self._handle_queue(room_id)
return await make_deferred_yieldable(end_item.deferred.observe())
def handle_queue(self, room_id, per_item_callback):
def _handle_queue(self, room_id):
"""Attempts to handle the queue for a room if not already being handled.
The given callback will be invoked with for each item in the queue,
The queue's callback will be invoked with for each item in the queue,
of type _EventPersistQueueItem. The per_item_callback will continuously
be called with new items, unless the queue becomnes empty. The return
be called with new items, unless the queue becomes empty. The return
value of the function will be given to the deferreds waiting on the item,
exceptions will be passed to the deferreds as well.
@ -156,7 +190,6 @@ class _EventPeristenceQueue:
If another callback is currently handling the queue then it will not be
invoked.
"""
if room_id in self._currently_persisting_rooms:
return
@ -167,7 +200,9 @@ class _EventPeristenceQueue:
queue = self._get_drainining_queue(room_id)
for item in queue:
try:
ret = await per_item_callback(item)
ret = await self._per_item_callback(
item.events_and_contexts, item.backfilled
)
except Exception:
with PreserveLoggingContext():
item.deferred.errback()
@ -214,7 +249,7 @@ class EventsPersistenceStorage:
self._clock = hs.get_clock()
self._instance_name = hs.get_instance_name()
self.is_mine_id = hs.is_mine_id
self._event_persist_queue = _EventPeristenceQueue()
self._event_persist_queue = _EventPeristenceQueue(self._persist_event_batch)
self._state_resolution_handler = hs.get_state_resolution_handler()
async def persist_events(
@ -241,26 +276,21 @@ class EventsPersistenceStorage:
for event, ctx in events_and_contexts:
partitioned.setdefault(event.room_id, []).append((event, ctx))
deferreds = []
for room_id, evs_ctxs in partitioned.items():
d = self._event_persist_queue.add_to_queue(
async def enqueue(item):
room_id, evs_ctxs = item
return await self._event_persist_queue.add_to_queue(
room_id, evs_ctxs, backfilled=backfilled
)
deferreds.append(d)
for room_id in partitioned:
self._maybe_start_persisting(room_id)
ret_vals = await yieldable_gather_results(enqueue, partitioned.items())
# Each deferred returns a map from event ID to existing event ID if the
# event was deduplicated. (The dict may also include other entries if
# Each call to add_to_queue returns a map from event ID to existing event ID if
# the event was deduplicated. (The dict may also include other entries if
# the event was persisted in a batch with other events).
#
# Since we use `defer.gatherResults` we need to merge the returned list
# Since we use `yieldable_gather_results` we need to merge the returned list
# of dicts into one.
ret_vals = await make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
replaced_events = {}
replaced_events: Dict[str, str] = {}
for d in ret_vals:
replaced_events.update(d)
@ -287,16 +317,12 @@ class EventsPersistenceStorage:
event if it was deduplicated due to an existing event matching the
transaction ID.
"""
deferred = self._event_persist_queue.add_to_queue(
event.room_id, [(event, context)], backfilled=backfilled
)
self._maybe_start_persisting(event.room_id)
# The deferred returns a map from event ID to existing event ID if the
# add_to_queue returns a map from event ID to existing event ID if the
# event was deduplicated. (The dict may also include other entries if
# the event was persisted in a batch with other events.)
replaced_events = await make_deferred_yieldable(deferred)
replaced_events = await self._event_persist_queue.add_to_queue(
event.room_id, [(event, context)], backfilled=backfilled
)
replaced_event = replaced_events.get(event.event_id)
if replaced_event:
event = await self.main_store.get_event(replaced_event)
@ -308,29 +334,14 @@ class EventsPersistenceStorage:
pos = PersistedEventPosition(self._instance_name, event_stream_id)
return event, pos, self.main_store.get_room_max_token()
def _maybe_start_persisting(self, room_id: str):
"""Pokes the `_event_persist_queue` to start handling new items in the
queue, if not already in progress.
Causes the deferreds returned by `add_to_queue` to resolve with: a
dictionary of event ID to event ID we didn't persist as we already had
another event persisted with the same TXN ID.
"""
async def persisting_queue(item):
with Measure(self._clock, "persist_events"):
return await self._persist_events(
item.events_and_contexts, backfilled=item.backfilled
)
self._event_persist_queue.handle_queue(room_id, persisting_queue)
async def _persist_events(
async def _persist_event_batch(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> Dict[str, str]:
"""Calculates the change to current state and forward extremities, and
"""Callback for the _event_persist_queue
Calculates the change to current state and forward extremities, and
persists the given events and with those updates.
Returns: