diff --git a/changelog.d/4081.bugfix b/changelog.d/4081.bugfix index 13dad5884..cfe4b3e9d 100644 --- a/changelog.d/4081.bugfix +++ b/changelog.d/4081.bugfix @@ -1 +1,2 @@ -Fix race condition in populating reserved users +Fix race condition where config defined reserved users were not being added to +the monthly active user list prior to the homeserver reactor firing up diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py index cf15f8c5b..9a5c3b7ed 100644 --- a/synapse/storage/monthly_active_users.py +++ b/synapse/storage/monthly_active_users.py @@ -33,21 +33,23 @@ class MonthlyActiveUsersStore(SQLBaseStore): self._clock = hs.get_clock() self.hs = hs self.reserved_users = () - self.initialise_reserved_users( - dbconn.cursor(), hs.config.mau_limits_reserved_threepids + # Do not add more reserved users than the total allowable number + self._initialise_reserved_users( + dbconn.cursor(), + hs.config.mau_limits_reserved_threepids[:self.hs.config.max_mau_value], ) - def initialise_reserved_users(self, txn, threepids): + def _initialise_reserved_users(self, txn, threepids): """Ensures that reserved threepids are accounted for in the MAU table, should be called on start up. - Arguments: - threepids []: List of threepid dicts to reserve + Args: + txn (cursor): + threepids (list[dict]): List of threepid dicts to reserve """ reserved_user_list = [] - # Do not add more reserved users than the total allowable number - for tp in threepids[:self.hs.config.max_mau_value]: + for tp in threepids: user_id = self.get_user_id_by_threepid_txn( txn, tp["medium"], tp["address"] @@ -172,26 +174,36 @@ class MonthlyActiveUsersStore(SQLBaseStore): @defer.inlineCallbacks def upsert_monthly_active_user(self, user_id): - """Updates or inserts monthly active user member - Arguments: + """Updates or inserts the user into the monthly active user table, which + is used to track the current MAU usage of the server + + Args: user_id (str): user to add/update """ is_insert = yield self.runInteraction( "upsert_monthly_active_user", self.upsert_monthly_active_user_txn, user_id ) + # Considered pushing cache invalidation down into txn method, but + # did not because txn is not a LoggingTransaction. This means I could not + # call txn.call_after(). Therefore cache is altered in background thread + # and calls from elsewhere to user_last_seen_monthly_active and + # get_monthly_active_count fail with ValueError in + # synapse/util/caches/descriptors.py#check_thread if is_insert: self.user_last_seen_monthly_active.invalidate((user_id,)) self.get_monthly_active_count.invalidate(()) def upsert_monthly_active_user_txn(self, txn, user_id): - """ - Updates or inserts monthly active user member - Arguments: - txn (cursor): - user_id (str): user to add/update + """Updates or inserts monthly active user member + + Args: + txn (cursor): + user_id (str): user to add/update + + Returns: bool: True if a new entry was created, False if an - existing one was updated. + existing one was updated. """ # Am consciously deciding to lock the table on the basis that is ought # never be a big table and alternative approaches (batching multiple @@ -207,6 +219,7 @@ class MonthlyActiveUsersStore(SQLBaseStore): "timestamp": int(self._clock.time_msec()), }, ) + return is_insert @cached(num_args=1) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 0f970850e..80d76bf9d 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -474,6 +474,15 @@ class RegistrationStore(RegistrationWorkerStore, @defer.inlineCallbacks def get_user_id_by_threepid(self, medium, address): + """Returns user id from threepid + + Args: + medium (str): threepid medium e.g. email + address (str): threepid address e.g. me@example.com + + Returns: + Deferred[str|None]: user id or None if no user id/threepid mapping exists + """ user_id = yield self.runInteraction( "get_user_id_by_threepid", self.get_user_id_by_threepid_txn, medium, address @@ -481,6 +490,16 @@ class RegistrationStore(RegistrationWorkerStore, defer.returnValue(user_id) def get_user_id_by_threepid_txn(self, txn, medium, address): + """Returns user id from threepid + + Args: + txn (cursor): + medium (str): threepid medium e.g. email + address (str): threepid address e.g. me@example.com + + Returns: + str|None: user id or None if no user id/threepid mapping exists + """ ret = self._simple_select_one_txn( txn, "user_threepids", diff --git a/tests/storage/test_monthly_active_users.py b/tests/storage/test_monthly_active_users.py index 0c17745ae..832e379a8 100644 --- a/tests/storage/test_monthly_active_users.py +++ b/tests/storage/test_monthly_active_users.py @@ -54,7 +54,7 @@ class MonthlyActiveUsersTestCase(HomeserverTestCase): self.store.user_add_threepid(user2, "email", user2_email, now, now) self.store.runInteraction( - "initialise", self.store.initialise_reserved_users, threepids + "initialise", self.store._initialise_reserved_users, threepids ) self.pump() @@ -203,7 +203,7 @@ class MonthlyActiveUsersTestCase(HomeserverTestCase): ] self.hs.config.mau_limits_reserved_threepids = threepids self.store.runInteraction( - "initialise", self.store.initialise_reserved_users, threepids + "initialise", self.store._initialise_reserved_users, threepids ) self.pump()