mirror of
https://mau.dev/maunium/synapse.git
synced 2025-01-04 11:24:03 +01:00
Skip processing stats for broken rooms. (#14873)
* Skip processing stats for broken rooms. * Newsfragment * Use a custom exception.
This commit is contained in:
parent
2ec9c58496
commit
82d3efa312
4 changed files with 72 additions and 36 deletions
1
changelog.d/14873.bugfix
Normal file
1
changelog.d/14873.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix a long-standing bug where the `populate_room_stats` background job could fail on broken rooms.
|
|
@ -110,6 +110,10 @@ event_fetch_ongoing_gauge = Gauge(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidEventError(Exception):
|
||||||
|
"""The event retrieved from the database is invalid and cannot be used."""
|
||||||
|
|
||||||
|
|
||||||
@attr.s(slots=True, auto_attribs=True)
|
@attr.s(slots=True, auto_attribs=True)
|
||||||
class EventCacheEntry:
|
class EventCacheEntry:
|
||||||
event: EventBase
|
event: EventBase
|
||||||
|
@ -1310,7 +1314,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||||
# invites, so just accept it for all membership events.
|
# invites, so just accept it for all membership events.
|
||||||
#
|
#
|
||||||
if d["type"] != EventTypes.Member:
|
if d["type"] != EventTypes.Member:
|
||||||
raise Exception(
|
raise InvalidEventError(
|
||||||
"Room %s for event %s is unknown" % (d["room_id"], event_id)
|
"Room %s for event %s is unknown" % (d["room_id"], event_id)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,7 @@ from synapse.storage.database import (
|
||||||
LoggingDatabaseConnection,
|
LoggingDatabaseConnection,
|
||||||
LoggingTransaction,
|
LoggingTransaction,
|
||||||
)
|
)
|
||||||
|
from synapse.storage.databases.main.events_worker import InvalidEventError
|
||||||
from synapse.storage.databases.main.state_deltas import StateDeltasStore
|
from synapse.storage.databases.main.state_deltas import StateDeltasStore
|
||||||
from synapse.types import JsonDict
|
from synapse.types import JsonDict
|
||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import cached
|
||||||
|
@ -554,7 +555,17 @@ class StatsStore(StateDeltasStore):
|
||||||
"get_initial_state_for_room", _fetch_current_state_stats
|
"get_initial_state_for_room", _fetch_current_state_stats
|
||||||
)
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
state_event_map = await self.get_events(event_ids, get_prev_content=False) # type: ignore[attr-defined]
|
state_event_map = await self.get_events(event_ids, get_prev_content=False) # type: ignore[attr-defined]
|
||||||
|
except InvalidEventError as e:
|
||||||
|
# If an exception occurs fetching events then the room is broken;
|
||||||
|
# skip process it to avoid being stuck on a room.
|
||||||
|
logger.warning(
|
||||||
|
"Failed to fetch events for room %s, skipping stats calculation: %r.",
|
||||||
|
room_id,
|
||||||
|
e,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
room_state: Dict[str, Union[None, bool, str]] = {
|
room_state: Dict[str, Union[None, bool, str]] = {
|
||||||
"join_rules": None,
|
"join_rules": None,
|
||||||
|
|
|
@ -40,9 +40,23 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
|
||||||
self.token = self.login("foo", "pass")
|
self.token = self.login("foo", "pass")
|
||||||
|
|
||||||
def _generate_room(self) -> str:
|
def _generate_room(self) -> str:
|
||||||
room_id = self.helper.create_room_as(self.user_id, tok=self.token)
|
"""Create a room and return the room ID."""
|
||||||
|
return self.helper.create_room_as(self.user_id, tok=self.token)
|
||||||
|
|
||||||
return room_id
|
def run_background_updates(self, update_name: str) -> None:
|
||||||
|
"""Insert and run the background update."""
|
||||||
|
self.get_success(
|
||||||
|
self.store.db_pool.simple_insert(
|
||||||
|
"background_updates",
|
||||||
|
{"update_name": update_name, "progress_json": "{}"},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# ... and tell the DataStore that it hasn't finished all updates yet
|
||||||
|
self.store.db_pool.updates._all_done = False
|
||||||
|
|
||||||
|
# Now let's actually drive the updates to completion
|
||||||
|
self.wait_for_background_updates()
|
||||||
|
|
||||||
def test_background_populate_rooms_creator_column(self) -> None:
|
def test_background_populate_rooms_creator_column(self) -> None:
|
||||||
"""Test that the background update to populate the rooms creator column
|
"""Test that the background update to populate the rooms creator column
|
||||||
|
@ -71,22 +85,7 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
|
||||||
)
|
)
|
||||||
self.assertEqual(room_creator_before, None)
|
self.assertEqual(room_creator_before, None)
|
||||||
|
|
||||||
# Insert and run the background update.
|
self.run_background_updates(_BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN)
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.simple_insert(
|
|
||||||
"background_updates",
|
|
||||||
{
|
|
||||||
"update_name": _BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN,
|
|
||||||
"progress_json": "{}",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# ... and tell the DataStore that it hasn't finished all updates yet
|
|
||||||
self.store.db_pool.updates._all_done = False
|
|
||||||
|
|
||||||
# Now let's actually drive the updates to completion
|
|
||||||
self.wait_for_background_updates()
|
|
||||||
|
|
||||||
# Make sure the background update filled in the room creator
|
# Make sure the background update filled in the room creator
|
||||||
room_creator_after = self.get_success(
|
room_creator_after = self.get_success(
|
||||||
|
@ -137,22 +136,7 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Insert and run the background update
|
self.run_background_updates(_BackgroundUpdates.ADD_ROOM_TYPE_COLUMN)
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.simple_insert(
|
|
||||||
"background_updates",
|
|
||||||
{
|
|
||||||
"update_name": _BackgroundUpdates.ADD_ROOM_TYPE_COLUMN,
|
|
||||||
"progress_json": "{}",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# ... and tell the DataStore that it hasn't finished all updates yet
|
|
||||||
self.store.db_pool.updates._all_done = False
|
|
||||||
|
|
||||||
# Now let's actually drive the updates to completion
|
|
||||||
self.wait_for_background_updates()
|
|
||||||
|
|
||||||
# Make sure the background update filled in the room type
|
# Make sure the background update filled in the room type
|
||||||
room_type_after = self.get_success(
|
room_type_after = self.get_success(
|
||||||
|
@ -164,3 +148,39 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
self.assertEqual(room_type_after, RoomTypes.SPACE)
|
self.assertEqual(room_type_after, RoomTypes.SPACE)
|
||||||
|
|
||||||
|
def test_populate_stats_broken_rooms(self) -> None:
|
||||||
|
"""Ensure that re-populating room stats skips broken rooms."""
|
||||||
|
|
||||||
|
# Create a good room.
|
||||||
|
good_room_id = self._generate_room()
|
||||||
|
|
||||||
|
# Create a room and then break it by having no room version.
|
||||||
|
room_id = self._generate_room()
|
||||||
|
self.get_success(
|
||||||
|
self.store.db_pool.simple_update(
|
||||||
|
table="rooms",
|
||||||
|
keyvalues={"room_id": room_id},
|
||||||
|
updatevalues={"room_version": None},
|
||||||
|
desc="test",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Nuke any current stats in the database.
|
||||||
|
self.get_success(
|
||||||
|
self.store.db_pool.simple_delete(
|
||||||
|
table="room_stats_state", keyvalues={"1": 1}, desc="test"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
self.run_background_updates("populate_stats_process_rooms")
|
||||||
|
|
||||||
|
# Only the good room appears in the stats tables.
|
||||||
|
results = self.get_success(
|
||||||
|
self.store.db_pool.simple_select_onecol(
|
||||||
|
table="room_stats_state",
|
||||||
|
keyvalues={},
|
||||||
|
retcol="room_id",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.assertEqual(results, [good_room_id])
|
||||||
|
|
Loading…
Reference in a new issue