diff --git a/synapse/config/server.py b/synapse/config/server.py index aa93a416f..8a55ffac4 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -19,7 +19,7 @@ import logging import os.path import re from textwrap import indent -from typing import List +from typing import List, Dict, Optional import attr import yaml @@ -287,13 +287,17 @@ class ServerConfig(Config): self.retention_default_min_lifetime = None self.retention_default_max_lifetime = None - self.retention_allowed_lifetime_min = retention_config.get("allowed_lifetime_min") + self.retention_allowed_lifetime_min = retention_config.get( + "allowed_lifetime_min" + ) if self.retention_allowed_lifetime_min is not None: self.retention_allowed_lifetime_min = self.parse_duration( self.retention_allowed_lifetime_min ) - self.retention_allowed_lifetime_max = retention_config.get("allowed_lifetime_max") + self.retention_allowed_lifetime_max = retention_config.get( + "allowed_lifetime_max" + ) if self.retention_allowed_lifetime_max is not None: self.retention_allowed_lifetime_max = self.parse_duration( self.retention_allowed_lifetime_max @@ -302,14 +306,15 @@ class ServerConfig(Config): if ( self.retention_allowed_lifetime_min is not None and self.retention_allowed_lifetime_max is not None - and self.retention_allowed_lifetime_min > self.retention_allowed_lifetime_max + and self.retention_allowed_lifetime_min + > self.retention_allowed_lifetime_max ): raise ConfigError( "Invalid retention policy limits: 'allowed_lifetime_min' can not be" " greater than 'allowed_lifetime_max'" ) - self.retention_purge_jobs = [] + self.retention_purge_jobs = [] # type: List[Dict[str, Optional[int]]] for purge_job_config in retention_config.get("purge_jobs", []): interval_config = purge_job_config.get("interval") @@ -342,18 +347,22 @@ class ServerConfig(Config): " 'longest_max_lifetime' value." ) - self.retention_purge_jobs.append({ - "interval": interval, - "shortest_max_lifetime": shortest_max_lifetime, - "longest_max_lifetime": longest_max_lifetime, - }) + self.retention_purge_jobs.append( + { + "interval": interval, + "shortest_max_lifetime": shortest_max_lifetime, + "longest_max_lifetime": longest_max_lifetime, + } + ) if not self.retention_purge_jobs: - self.retention_purge_jobs = [{ - "interval": self.parse_duration("1d"), - "shortest_max_lifetime": None, - "longest_max_lifetime": None, - }] + self.retention_purge_jobs = [ + { + "interval": self.parse_duration("1d"), + "shortest_max_lifetime": None, + "longest_max_lifetime": None, + } + ] self.listeners = [] # type: List[dict] for listener in config.get("listeners", []): diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index e1800177f..d122c11a4 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -154,20 +154,17 @@ class PaginationHandler(object): # Figure out what token we should start purging at. ts = self.clock.time_msec() - max_lifetime - stream_ordering = ( - yield self.store.find_first_stream_ordering_after_ts(ts) - ) + stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts) - r = ( - yield self.store.get_room_event_after_stream_ordering( - room_id, stream_ordering, - ) + r = yield self.store.get_room_event_after_stream_ordering( + room_id, stream_ordering, ) if not r: logger.warning( "[purge] purging events not possible: No event found " "(ts %i => stream_ordering %i)", - ts, stream_ordering, + ts, + stream_ordering, ) continue @@ -186,9 +183,7 @@ class PaginationHandler(object): # the background so that it's not blocking any other operation apart from # other purges in the same room. run_as_background_process( - "_purge_history", - self._purge_history, - purge_id, room_id, token, True, + "_purge_history", self._purge_history, purge_id, room_id, token, True, ) def start_purge_history(self, room_id, token, delete_local_events=False): diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py index 54a7d24c7..7fceae59c 100644 --- a/synapse/storage/data_stores/main/room.py +++ b/synapse/storage/data_stores/main/room.py @@ -334,8 +334,9 @@ class RoomStore(RoomWorkerStore, SearchStore): WHERE state.room_id > ? AND state.type = '%s' ORDER BY state.room_id ASC LIMIT ?; - """ % EventTypes.Retention, - (last_room, batch_size) + """ + % EventTypes.Retention, + (last_room, batch_size), ) rows = self.cursor_to_dict(txn) @@ -358,15 +359,13 @@ class RoomStore(RoomWorkerStore, SearchStore): "event_id": row["event_id"], "min_lifetime": retention_policy.get("min_lifetime"), "max_lifetime": retention_policy.get("max_lifetime"), - } + }, ) logger.info("Inserted %d rows into room_retention", len(rows)) self._background_update_progress_txn( - txn, "insert_room_retention", { - "room_id": rows[-1]["room_id"], - } + txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]} ) if batch_size > len(rows): @@ -375,8 +374,7 @@ class RoomStore(RoomWorkerStore, SearchStore): return False end = yield self.runInteraction( - "insert_room_retention", - _background_insert_retention_txn, + "insert_room_retention", _background_insert_retention_txn, ) if end: @@ -585,17 +583,15 @@ class RoomStore(RoomWorkerStore, SearchStore): ) def _store_retention_policy_for_room_txn(self, txn, event): - if ( - hasattr(event, "content") - and ("min_lifetime" in event.content or "max_lifetime" in event.content) + if hasattr(event, "content") and ( + "min_lifetime" in event.content or "max_lifetime" in event.content ): if ( - ("min_lifetime" in event.content and not isinstance( - event.content.get("min_lifetime"), integer_types - )) - or ("max_lifetime" in event.content and not isinstance( - event.content.get("max_lifetime"), integer_types - )) + "min_lifetime" in event.content + and not isinstance(event.content.get("min_lifetime"), integer_types) + ) or ( + "max_lifetime" in event.content + and not isinstance(event.content.get("max_lifetime"), integer_types) ): # Ignore the event if one of the value isn't an integer. return @@ -798,7 +794,9 @@ class RoomStore(RoomWorkerStore, SearchStore): return local_media_mxcs, remote_media_mxcs @defer.inlineCallbacks - def get_rooms_for_retention_period_in_range(self, min_ms, max_ms, include_null=False): + def get_rooms_for_retention_period_in_range( + self, min_ms, max_ms, include_null=False + ): """Retrieves all of the rooms within the given retention range. Optionally includes the rooms which don't have a retention policy. @@ -904,23 +902,24 @@ class RoomStore(RoomWorkerStore, SearchStore): INNER JOIN current_state_events USING (event_id, room_id) WHERE room_id = ?; """, - (room_id,) + (room_id,), ) return self.cursor_to_dict(txn) ret = yield self.runInteraction( - "get_retention_policy_for_room", - get_retention_policy_for_room_txn, + "get_retention_policy_for_room", get_retention_policy_for_room_txn, ) # If we don't know this room ID, ret will be None, in this case return the default # policy. if not ret: - defer.returnValue({ - "min_lifetime": self.config.retention_default_min_lifetime, - "max_lifetime": self.config.retention_default_max_lifetime, - }) + defer.returnValue( + { + "min_lifetime": self.config.retention_default_min_lifetime, + "max_lifetime": self.config.retention_default_max_lifetime, + } + ) row = ret[0] diff --git a/tests/rest/client/test_retention.py b/tests/rest/client/test_retention.py index 7b6f25a83..6bf485c23 100644 --- a/tests/rest/client/test_retention.py +++ b/tests/rest/client/test_retention.py @@ -61,9 +61,7 @@ class RetentionTestCase(unittest.HomeserverTestCase): self.helper.send_state( room_id=room_id, event_type=EventTypes.Retention, - body={ - "max_lifetime": one_day_ms * 4, - }, + body={"max_lifetime": one_day_ms * 4}, tok=self.token, expect_code=400, ) @@ -71,9 +69,7 @@ class RetentionTestCase(unittest.HomeserverTestCase): self.helper.send_state( room_id=room_id, event_type=EventTypes.Retention, - body={ - "max_lifetime": one_hour_ms, - }, + body={"max_lifetime": one_hour_ms}, tok=self.token, expect_code=400, ) @@ -89,9 +85,7 @@ class RetentionTestCase(unittest.HomeserverTestCase): self.helper.send_state( room_id=room_id, event_type=EventTypes.Retention, - body={ - "max_lifetime": lifetime, - }, + body={"max_lifetime": lifetime}, tok=self.token, ) @@ -115,20 +109,12 @@ class RetentionTestCase(unittest.HomeserverTestCase): events = [] # Send a first event, which should be filtered out at the end of the test. - resp = self.helper.send( - room_id=room_id, - body="1", - tok=self.token, - ) + resp = self.helper.send(room_id=room_id, body="1", tok=self.token) # Get the event from the store so that we end up with a FrozenEvent that we can # give to filter_events_for_client. We need to do this now because the event won't # be in the database anymore after it has expired. - events.append(self.get_success( - store.get_event( - resp.get("event_id") - ) - )) + events.append(self.get_success(store.get_event(resp.get("event_id")))) # Advance the time by 2 days. We're using the default retention policy, therefore # after this the first event will still be valid. @@ -143,20 +129,16 @@ class RetentionTestCase(unittest.HomeserverTestCase): valid_event_id = resp.get("event_id") - events.append(self.get_success( - store.get_event( - valid_event_id - ) - )) + events.append(self.get_success(store.get_event(valid_event_id))) # Advance the time by anothe 2 days. After this, the first event should be # outdated but not the second one. self.reactor.advance(one_day_ms * 2 / 1000) # Run filter_events_for_client with our list of FrozenEvents. - filtered_events = self.get_success(filter_events_for_client( - storage, self.user_id, events - )) + filtered_events = self.get_success( + filter_events_for_client(storage, self.user_id, events) + ) # We should only get one event back. self.assertEqual(len(filtered_events), 1, filtered_events) @@ -172,28 +154,22 @@ class RetentionTestCase(unittest.HomeserverTestCase): # Send a first event to the room. This is the event we'll want to be purged at the # end of the test. - resp = self.helper.send( - room_id=room_id, - body="1", - tok=self.token, - ) + resp = self.helper.send(room_id=room_id, body="1", tok=self.token) expired_event_id = resp.get("event_id") # Check that we can retrieve the event. expired_event = self.get_event(room_id, expired_event_id) - self.assertEqual(expired_event.get("content", {}).get("body"), "1", expired_event) + self.assertEqual( + expired_event.get("content", {}).get("body"), "1", expired_event + ) # Advance the time. self.reactor.advance(increment / 1000) # Send another event. We need this because the purge job won't purge the most # recent event in the room. - resp = self.helper.send( - room_id=room_id, - body="2", - tok=self.token, - ) + resp = self.helper.send(room_id=room_id, body="2", tok=self.token) valid_event_id = resp.get("event_id") @@ -240,8 +216,7 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase): mock_federation_client = Mock(spec=["backfill"]) self.hs = self.setup_test_homeserver( - config=config, - federation_client=mock_federation_client, + config=config, federation_client=mock_federation_client, ) return self.hs @@ -268,9 +243,7 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase): self.helper.send_state( room_id=room_id, event_type=EventTypes.Retention, - body={ - "max_lifetime": one_day_ms * 35, - }, + body={"max_lifetime": one_day_ms * 35}, tok=self.token, ) @@ -289,18 +262,16 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase): # Check that we can retrieve the event. expired_event = self.get_event(room_id, first_event_id) - self.assertEqual(expired_event.get("content", {}).get("body"), "1", expired_event) + self.assertEqual( + expired_event.get("content", {}).get("body"), "1", expired_event + ) # Advance the time by a month. self.reactor.advance(one_day_ms * 30 / 1000) # Send another event. We need this because the purge job won't purge the most # recent event in the room. - resp = self.helper.send( - room_id=room_id, - body="2", - tok=self.token, - ) + resp = self.helper.send(room_id=room_id, body="2", tok=self.token) second_event_id = resp.get("event_id") @@ -313,7 +284,9 @@ class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase): ) if expected_code_for_first_event == 200: - self.assertEqual(first_event.get("content", {}).get("body"), "1", first_event) + self.assertEqual( + first_event.get("content", {}).get("body"), "1", first_event + ) # Check that the event that hasn't been purged can still be retrieved. second_event = self.get_event(room_id, second_event_id)