diff --git a/changelog.d/9650.misc b/changelog.d/9650.misc new file mode 100644 index 000000000..d830ead70 --- /dev/null +++ b/changelog.d/9650.misc @@ -0,0 +1 @@ +Add a storage method for pulling all current user presence state from the database. \ No newline at end of file diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 5b0b9a20b..94590e7b4 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1906,6 +1906,7 @@ class DatabasePool: retcols: Iterable[str], filters: Optional[Dict[str, Any]] = None, keyvalues: Optional[Dict[str, Any]] = None, + exclude_keyvalues: Optional[Dict[str, Any]] = None, order_direction: str = "ASC", ) -> List[Dict[str, Any]]: """ @@ -1929,7 +1930,10 @@ class DatabasePool: apply a WHERE ? LIKE ? clause. keyvalues: column names and values to select the rows with, or None to not - apply a WHERE clause. + apply a WHERE key = value clause. + exclude_keyvalues: + column names and values to exclude rows with, or None to not + apply a WHERE key != value clause. order_direction: Whether the results should be ordered "ASC" or "DESC". Returns: @@ -1938,7 +1942,7 @@ class DatabasePool: if order_direction not in ["ASC", "DESC"]: raise ValueError("order_direction must be one of 'ASC' or 'DESC'.") - where_clause = "WHERE " if filters or keyvalues else "" + where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else "" arg_list = [] # type: List[Any] if filters: where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters) @@ -1947,6 +1951,9 @@ class DatabasePool: if keyvalues: where_clause += " AND ".join("%s = ?" % (k,) for k in keyvalues) arg_list += list(keyvalues.values()) + if exclude_keyvalues: + where_clause += " AND ".join("%s != ?" % (k,) for k in exclude_keyvalues) + arg_list += list(exclude_keyvalues.values()) sql = "SELECT %s FROM %s %s ORDER BY %s %s LIMIT ? OFFSET ?" % ( ", ".join(retcols), diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 29edab34d..0ff693a31 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List, Tuple +from typing import Dict, List, Tuple from synapse.api.presence import UserPresenceState from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause @@ -157,5 +157,63 @@ class PresenceStore(SQLBaseStore): return {row["user_id"]: UserPresenceState(**row) for row in rows} + async def get_presence_for_all_users( + self, + include_offline: bool = True, + ) -> Dict[str, UserPresenceState]: + """Retrieve the current presence state for all users. + + Note that the presence_stream table is culled frequently, so it should only + contain the latest presence state for each user. + + Args: + include_offline: Whether to include offline presence states + + Returns: + A dict of user IDs to their current UserPresenceState. + """ + users_to_state = {} + + exclude_keyvalues = None + if not include_offline: + # Exclude offline presence state + exclude_keyvalues = {"state": "offline"} + + # This may be a very heavy database query. + # We paginate in order to not block a database connection. + limit = 100 + offset = 0 + while True: + rows = await self.db_pool.runInteraction( + "get_presence_for_all_users", + self.db_pool.simple_select_list_paginate_txn, + "presence_stream", + orderby="stream_id", + start=offset, + limit=limit, + exclude_keyvalues=exclude_keyvalues, + retcols=( + "user_id", + "state", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", + "status_msg", + "currently_active", + ), + order_direction="ASC", + ) + + for row in rows: + users_to_state[row["user_id"]] = UserPresenceState(**row) + + # We've run out of updates to query + if len(rows) < limit: + break + + offset += limit + + return users_to_state + def get_current_presence_token(self): return self._presence_id_gen.get_current_token()