0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-11-15 06:21:49 +01:00

Run black on user directory code (#4635)

This commit is contained in:
Amber Brown 2019-02-13 23:05:32 +11:00 committed by GitHub
parent 19818d66af
commit bb4fd8f927
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 117 additions and 131 deletions

1
changelog.d/4635.misc Normal file
View file

@ -0,0 +1 @@
Run `black` to reformat user directory code.

View file

@ -130,7 +130,7 @@ class UserDirectoryHandler(object):
# Support users are for diagnostics and should not appear in the user directory.
if not is_support:
yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url, None,
user_id, profile.display_name, profile.avatar_url, None
)
@defer.inlineCallbacks
@ -166,8 +166,9 @@ class UserDirectoryHandler(object):
self.pos = deltas[-1]["stream_id"]
# Expose current event processing position to prometheus
synapse.metrics.event_processing_positions.labels(
"user_dir").set(self.pos)
synapse.metrics.event_processing_positions.labels("user_dir").set(
self.pos
)
yield self.store.update_user_directory_stream_pos(self.pos)
@ -191,21 +192,25 @@ class UserDirectoryHandler(object):
logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
yield self._handle_initial_room(room_id)
num_processed_rooms += 1
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
logger.info("Processed all rooms.")
if self.search_all_users:
num_processed_users = 0
user_ids = yield self.store.get_all_local_users()
logger.info("Doing initial update of user directory. %d users", len(user_ids))
logger.info(
"Doing initial update of user directory. %d users", len(user_ids)
)
for user_id in user_ids:
# We add profiles for all users even if they don't match the
# include pattern, just in case we want to change it in future
logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
logger.info(
"Handling user %d/%d", num_processed_users + 1, len(user_ids)
)
yield self._handle_local_user(user_id)
num_processed_users += 1
yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
logger.info("Processed all users")
@ -224,24 +229,24 @@ class UserDirectoryHandler(object):
if not is_in_room:
return
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
room_id
)
users_with_profile = yield self.state.get_current_user_in_room(room_id)
user_ids = set(users_with_profile)
unhandled_users = user_ids - self.initially_handled_users
yield self.store.add_profiles_to_user_dir(
room_id, {
user_id: users_with_profile[user_id] for user_id in unhandled_users
}
room_id,
{user_id: users_with_profile[user_id] for user_id in unhandled_users},
)
self.initially_handled_users |= unhandled_users
if is_public:
yield self.store.add_users_to_public_room(
room_id,
user_ids=user_ids - self.initially_handled_users_in_public
room_id, user_ids=user_ids - self.initially_handled_users_in_public
)
self.initially_handled_users_in_public |= user_ids
@ -253,7 +258,7 @@ class UserDirectoryHandler(object):
count = 0
for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
if not self.is_mine_id(user_id):
count += 1
@ -268,7 +273,7 @@ class UserDirectoryHandler(object):
continue
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
count += 1
user_set = (user_id, other_user_id)
@ -290,25 +295,23 @@ class UserDirectoryHandler(object):
if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.add_users_who_share_room(
room_id, not is_public, to_insert,
room_id, not is_public, to_insert
)
to_insert.clear()
if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.update_users_who_share_room(
room_id, not is_public, to_update,
room_id, not is_public, to_update
)
to_update.clear()
if to_insert:
yield self.store.add_users_who_share_room(
room_id, not is_public, to_insert,
)
yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
to_insert.clear()
if to_update:
yield self.store.update_users_who_share_room(
room_id, not is_public, to_update,
room_id, not is_public, to_update
)
to_update.clear()
@ -329,11 +332,12 @@ class UserDirectoryHandler(object):
# may have become public or not and add/remove the users in said room
if typ in (EventTypes.RoomHistoryVisibility, EventTypes.JoinRules):
yield self._handle_room_publicity_change(
room_id, prev_event_id, event_id, typ,
room_id, prev_event_id, event_id, typ
)
elif typ == EventTypes.Member:
change = yield self._get_key_change(
prev_event_id, event_id,
prev_event_id,
event_id,
key_name="membership",
public_value=Membership.JOIN,
)
@ -342,14 +346,16 @@ class UserDirectoryHandler(object):
# Need to check if the server left the room entirely, if so
# we might need to remove all the users in that room
is_in_room = yield self.store.is_host_joined(
room_id, self.server_name,
room_id, self.server_name
)
if not is_in_room:
logger.info("Server left room: %r", room_id)
# Fetch all the users that we marked as being in user
# directory due to being in the room and then check if
# need to remove those users or not
user_ids = yield self.store.get_users_in_dir_due_to_room(room_id)
user_ids = yield self.store.get_users_in_dir_due_to_room(
room_id
)
for user_id in user_ids:
yield self._handle_remove_user(room_id, user_id)
return
@ -361,7 +367,7 @@ class UserDirectoryHandler(object):
if change is None:
# Handle any profile changes
yield self._handle_profile_change(
state_key, room_id, prev_event_id, event_id,
state_key, room_id, prev_event_id, event_id
)
continue
@ -393,13 +399,15 @@ class UserDirectoryHandler(object):
if typ == EventTypes.RoomHistoryVisibility:
change = yield self._get_key_change(
prev_event_id, event_id,
prev_event_id,
event_id,
key_name="history_visibility",
public_value="world_readable",
)
elif typ == EventTypes.JoinRules:
change = yield self._get_key_change(
prev_event_id, event_id,
prev_event_id,
event_id,
key_name="join_rule",
public_value=JoinRules.PUBLIC,
)
@ -524,7 +532,7 @@ class UserDirectoryHandler(object):
)
if self.is_mine_id(other_user_id) and not is_appservice:
shared_is_private = yield self.store.get_if_users_share_a_room(
other_user_id, user_id,
other_user_id, user_id
)
if shared_is_private is True:
# We've already marked in the database they share a private room
@ -539,13 +547,11 @@ class UserDirectoryHandler(object):
to_insert.add((other_user_id, user_id))
if to_insert:
yield self.store.add_users_who_share_room(
room_id, not is_public, to_insert,
)
yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
if to_update:
yield self.store.update_users_who_share_room(
room_id, not is_public, to_update,
room_id, not is_public, to_update
)
@defer.inlineCallbacks
@ -564,15 +570,15 @@ class UserDirectoryHandler(object):
row = yield self.store.get_user_in_public_room(user_id)
update_user_in_public = row and row["room_id"] == room_id
if (update_user_in_public or update_user_dir):
if update_user_in_public or update_user_dir:
# XXX: Make this faster?
rooms = yield self.store.get_rooms_for_user(user_id)
for j_room_id in rooms:
if (not update_user_in_public and not update_user_dir):
if not update_user_in_public and not update_user_dir:
break
is_in_room = yield self.store.is_host_joined(
j_room_id, self.server_name,
j_room_id, self.server_name
)
if not is_in_room:
@ -600,19 +606,19 @@ class UserDirectoryHandler(object):
# Get a list of user tuples that were in the DB due to this room and
# users (this includes tuples where the other user matches `user_id`)
user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
user_id, room_id,
user_id, room_id
)
for user_id, other_user_id in user_tuples:
# For each user tuple get a list of rooms that they still share,
# trying to find a private room, and update the entry in the DB
rooms = yield self.store.get_rooms_in_common_for_users(user_id, other_user_id)
rooms = yield self.store.get_rooms_in_common_for_users(
user_id, other_user_id
)
# If they dont share a room anymore, remove the mapping
if not rooms:
yield self.store.remove_user_who_share_room(
user_id, other_user_id,
)
yield self.store.remove_user_who_share_room(user_id, other_user_id)
continue
found_public_share = None
@ -626,13 +632,13 @@ class UserDirectoryHandler(object):
else:
found_public_share = None
yield self.store.update_users_who_share_room(
room_id, not is_public, [(user_id, other_user_id)],
room_id, not is_public, [(user_id, other_user_id)]
)
break
if found_public_share:
yield self.store.update_users_who_share_room(
room_id, not is_public, [(user_id, other_user_id)],
room_id, not is_public, [(user_id, other_user_id)]
)
@defer.inlineCallbacks
@ -660,7 +666,7 @@ class UserDirectoryHandler(object):
if prev_name != new_name or prev_avatar != new_avatar:
yield self.store.update_profile_in_user_dir(
user_id, new_name, new_avatar, room_id,
user_id, new_name, new_avatar, room_id
)
@defer.inlineCallbacks

View file

@ -44,7 +44,7 @@ class UserDirectoryStore(SQLBaseStore):
)
current_state_ids = yield self.get_filtered_current_state_ids(
room_id, StateFilter.from_types(types_to_filter),
room_id, StateFilter.from_types(types_to_filter)
)
join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
@ -74,14 +74,8 @@ class UserDirectoryStore(SQLBaseStore):
"""
yield self._simple_insert_many(
table="users_in_public_rooms",
values=[
{
"user_id": user_id,
"room_id": room_id,
}
for user_id in user_ids
],
desc="add_users_to_public_room"
values=[{"user_id": user_id, "room_id": room_id} for user_id in user_ids],
desc="add_users_to_public_room",
)
for user_id in user_ids:
self.get_user_in_public_room.invalidate((user_id,))
@ -107,7 +101,9 @@ class UserDirectoryStore(SQLBaseStore):
"""
args = (
(
user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id),
user_id,
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
profile.display_name,
)
for user_id, profile in iteritems(users_with_profile)
@ -120,7 +116,7 @@ class UserDirectoryStore(SQLBaseStore):
args = (
(
user_id,
"%s %s" % (user_id, p.display_name,) if p.display_name else user_id
"%s %s" % (user_id, p.display_name) if p.display_name else user_id,
)
for user_id, p in iteritems(users_with_profile)
)
@ -141,12 +137,10 @@ class UserDirectoryStore(SQLBaseStore):
"avatar_url": profile.avatar_url,
}
for user_id, profile in iteritems(users_with_profile)
]
],
)
for user_id in users_with_profile:
txn.call_after(
self.get_user_in_directory.invalidate, (user_id,)
)
txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
return self.runInteraction(
"add_profiles_to_user_dir", _add_profiles_to_user_dir_txn
@ -188,9 +182,11 @@ class UserDirectoryStore(SQLBaseStore):
txn.execute(
sql,
(
user_id, get_localpart_from_id(user_id),
get_domain_from_id(user_id), display_name,
)
user_id,
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
display_name,
),
)
else:
# TODO: Remove this code after we've bumped the minimum version
@ -208,9 +204,11 @@ class UserDirectoryStore(SQLBaseStore):
txn.execute(
sql,
(
user_id, get_localpart_from_id(user_id),
get_domain_from_id(user_id), display_name,
)
user_id,
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
display_name,
),
)
elif new_entry is False:
sql = """
@ -225,15 +223,16 @@ class UserDirectoryStore(SQLBaseStore):
(
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
display_name, user_id,
)
display_name,
user_id,
),
)
else:
raise RuntimeError(
"upsert returned None when 'can_native_upsert' is False"
)
elif isinstance(self.database_engine, Sqlite3Engine):
value = "%s %s" % (user_id, display_name,) if display_name else user_id
value = "%s %s" % (user_id, display_name) if display_name else user_id
self._simple_upsert_txn(
txn,
table="user_directory_search",
@ -264,29 +263,18 @@ class UserDirectoryStore(SQLBaseStore):
def remove_from_user_dir(self, user_id):
def _remove_from_user_dir_txn(txn):
self._simple_delete_txn(
txn,
table="user_directory",
keyvalues={"user_id": user_id},
txn, table="user_directory", keyvalues={"user_id": user_id}
)
self._simple_delete_txn(
txn,
table="user_directory_search",
keyvalues={"user_id": user_id},
txn, table="user_directory_search", keyvalues={"user_id": user_id}
)
self._simple_delete_txn(
txn,
table="users_in_public_rooms",
keyvalues={"user_id": user_id},
txn, table="users_in_public_rooms", keyvalues={"user_id": user_id}
)
txn.call_after(
self.get_user_in_directory.invalidate, (user_id,)
)
txn.call_after(
self.get_user_in_public_room.invalidate, (user_id,)
)
return self.runInteraction(
"remove_from_user_dir", _remove_from_user_dir_txn,
)
txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
txn.call_after(self.get_user_in_public_room.invalidate, (user_id,))
return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn)
@defer.inlineCallbacks
def remove_from_user_in_public_room(self, user_id):
@ -371,6 +359,7 @@ class UserDirectoryStore(SQLBaseStore):
share_private (bool): Is the room private
user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
"""
def _add_users_who_share_room_txn(txn):
self._simple_insert_many_txn(
txn,
@ -387,13 +376,12 @@ class UserDirectoryStore(SQLBaseStore):
)
for user_id, other_user_id in user_id_tuples:
txn.call_after(
self.get_users_who_share_room_from_dir.invalidate,
(user_id,),
self.get_users_who_share_room_from_dir.invalidate, (user_id,)
)
txn.call_after(
self.get_if_users_share_a_room.invalidate,
(user_id, other_user_id),
self.get_if_users_share_a_room.invalidate, (user_id, other_user_id)
)
return self.runInteraction(
"add_users_who_share_room", _add_users_who_share_room_txn
)
@ -407,6 +395,7 @@ class UserDirectoryStore(SQLBaseStore):
share_private (bool): Is the room private
user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
"""
def _update_users_who_share_room_txn(txn):
sql = """
UPDATE users_who_share_rooms
@ -414,21 +403,16 @@ class UserDirectoryStore(SQLBaseStore):
WHERE user_id = ? AND other_user_id = ?
"""
txn.executemany(
sql,
(
(room_id, share_private, uid, oid)
for uid, oid in user_id_sets
)
sql, ((room_id, share_private, uid, oid) for uid, oid in user_id_sets)
)
for user_id, other_user_id in user_id_sets:
txn.call_after(
self.get_users_who_share_room_from_dir.invalidate,
(user_id,),
self.get_users_who_share_room_from_dir.invalidate, (user_id,)
)
txn.call_after(
self.get_if_users_share_a_room.invalidate,
(user_id, other_user_id),
self.get_if_users_share_a_room.invalidate, (user_id, other_user_id)
)
return self.runInteraction(
"update_users_who_share_room", _update_users_who_share_room_txn
)
@ -442,22 +426,18 @@ class UserDirectoryStore(SQLBaseStore):
share_private (bool): Is the room private
user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
"""
def _remove_user_who_share_room_txn(txn):
self._simple_delete_txn(
txn,
table="users_who_share_rooms",
keyvalues={
"user_id": user_id,
"other_user_id": other_user_id,
},
keyvalues={"user_id": user_id, "other_user_id": other_user_id},
)
txn.call_after(
self.get_users_who_share_room_from_dir.invalidate,
(user_id,),
self.get_users_who_share_room_from_dir.invalidate, (user_id,)
)
txn.call_after(
self.get_if_users_share_a_room.invalidate,
(user_id, other_user_id),
self.get_if_users_share_a_room.invalidate, (user_id, other_user_id)
)
return self.runInteraction(
@ -478,10 +458,7 @@ class UserDirectoryStore(SQLBaseStore):
"""
return self._simple_select_one_onecol(
table="users_who_share_rooms",
keyvalues={
"user_id": user_id,
"other_user_id": other_user_id,
},
keyvalues={"user_id": user_id, "other_user_id": other_user_id},
retcol="share_private",
allow_none=True,
desc="get_if_users_share_a_room",
@ -499,17 +476,12 @@ class UserDirectoryStore(SQLBaseStore):
"""
rows = yield self._simple_select_list(
table="users_who_share_rooms",
keyvalues={
"user_id": user_id,
},
retcols=("other_user_id", "share_private",),
keyvalues={"user_id": user_id},
retcols=("other_user_id", "share_private"),
desc="get_users_who_share_room_with_user",
)
defer.returnValue({
row["other_user_id"]: row["share_private"]
for row in rows
})
defer.returnValue({row["other_user_id"]: row["share_private"] for row in rows})
def get_users_in_share_dir_with_room_id(self, user_id, room_id):
"""Get all user tuples that are in the users_who_share_rooms due to the
@ -556,6 +528,7 @@ class UserDirectoryStore(SQLBaseStore):
def delete_all_from_user_dir(self):
"""Delete the entire user directory
"""
def _delete_all_from_user_dir_txn(txn):
txn.execute("DELETE FROM user_directory")
txn.execute("DELETE FROM user_directory_search")
@ -565,6 +538,7 @@ class UserDirectoryStore(SQLBaseStore):
txn.call_after(self.get_user_in_public_room.invalidate_all)
txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all)
txn.call_after(self.get_if_users_share_a_room.invalidate_all)
return self.runInteraction(
"delete_all_from_user_dir", _delete_all_from_user_dir_txn
)
@ -574,7 +548,7 @@ class UserDirectoryStore(SQLBaseStore):
return self._simple_select_one(
table="user_directory",
keyvalues={"user_id": user_id},
retcols=("room_id", "display_name", "avatar_url",),
retcols=("room_id", "display_name", "avatar_url"),
allow_none=True,
desc="get_user_in_directory",
)
@ -607,7 +581,9 @@ class UserDirectoryStore(SQLBaseStore):
def get_current_state_deltas(self, prev_stream_id):
prev_stream_id = int(prev_stream_id)
if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id):
if not self._curr_state_delta_stream_cache.has_any_entity_changed(
prev_stream_id
):
return []
def get_current_state_deltas_txn(txn):
@ -641,7 +617,7 @@ class UserDirectoryStore(SQLBaseStore):
WHERE ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
"""
txn.execute(sql, (prev_stream_id, max_stream_id,))
txn.execute(sql, (prev_stream_id, max_stream_id))
return self.cursor_to_dict(txn)
return self.runInteraction(
@ -731,8 +707,11 @@ class UserDirectoryStore(SQLBaseStore):
display_name IS NULL,
avatar_url IS NULL
LIMIT ?
""" % (join_clause, where_clause)
args = join_args + (full_query, exact_query, prefix_query, limit + 1,)
""" % (
join_clause,
where_clause,
)
args = join_args + (full_query, exact_query, prefix_query, limit + 1)
elif isinstance(self.database_engine, Sqlite3Engine):
search_query = _parse_query_sqlite(search_term)
@ -749,7 +728,10 @@ class UserDirectoryStore(SQLBaseStore):
display_name IS NULL,
avatar_url IS NULL
LIMIT ?
""" % (join_clause, where_clause)
""" % (
join_clause,
where_clause,
)
args = join_args + (search_query, limit + 1)
else:
# This should be unreachable.
@ -761,10 +743,7 @@ class UserDirectoryStore(SQLBaseStore):
limited = len(results) > limit
defer.returnValue({
"limited": limited,
"results": results,
})
defer.returnValue({"limited": limited, "results": results})
def _parse_query_sqlite(search_term):
@ -779,7 +758,7 @@ def _parse_query_sqlite(search_term):
# Pull out the individual words, discarding any non-word characters.
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
return " & ".join("(%s* OR %s)" % (result, result,) for result in results)
return " & ".join("(%s* OR %s)" % (result, result) for result in results)
def _parse_query_postgres(search_term):
@ -792,7 +771,7 @@ def _parse_query_postgres(search_term):
# Pull out the individual words, discarding any non-word characters.
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
both = " & ".join("(%s:* | %s)" % (result, result,) for result in results)
both = " & ".join("(%s:* | %s)" % (result, result) for result in results)
exact = " & ".join("%s" % (result,) for result in results)
prefix = " & ".join("%s:*" % (result,) for result in results)