forked from MirrorHub/synapse
Merge pull request #2054 from matrix-org/erikj/user_iter_cursor
Reduce some CPU work on DB threads
This commit is contained in:
commit
57cfa513f5
18 changed files with 102 additions and 132 deletions
|
@ -167,7 +167,6 @@ class SlavedEventStore(BaseSlavedStore):
|
||||||
_get_rooms_for_user_where_membership_is_txn = (
|
_get_rooms_for_user_where_membership_is_txn = (
|
||||||
DataStore._get_rooms_for_user_where_membership_is_txn.__func__
|
DataStore._get_rooms_for_user_where_membership_is_txn.__func__
|
||||||
)
|
)
|
||||||
_get_members_rows_txn = DataStore._get_members_rows_txn.__func__
|
|
||||||
_get_state_for_groups = DataStore._get_state_for_groups.__func__
|
_get_state_for_groups = DataStore._get_state_for_groups.__func__
|
||||||
_get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__
|
_get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__
|
||||||
_get_events_around_txn = DataStore._get_events_around_txn.__func__
|
_get_events_around_txn = DataStore._get_events_around_txn.__func__
|
||||||
|
|
|
@ -73,6 +73,9 @@ class LoggingTransaction(object):
|
||||||
def __setattr__(self, name, value):
|
def __setattr__(self, name, value):
|
||||||
setattr(self.txn, name, value)
|
setattr(self.txn, name, value)
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return self.txn.__iter__()
|
||||||
|
|
||||||
def execute(self, sql, *args):
|
def execute(self, sql, *args):
|
||||||
self._do_execute(self.txn.execute, sql, *args)
|
self._do_execute(self.txn.execute, sql, *args)
|
||||||
|
|
||||||
|
@ -132,7 +135,7 @@ class PerformanceCounters(object):
|
||||||
|
|
||||||
def interval(self, interval_duration, limit=3):
|
def interval(self, interval_duration, limit=3):
|
||||||
counters = []
|
counters = []
|
||||||
for name, (count, cum_time) in self.current_counters.items():
|
for name, (count, cum_time) in self.current_counters.iteritems():
|
||||||
prev_count, prev_time = self.previous_counters.get(name, (0, 0))
|
prev_count, prev_time = self.previous_counters.get(name, (0, 0))
|
||||||
counters.append((
|
counters.append((
|
||||||
(cum_time - prev_time) / interval_duration,
|
(cum_time - prev_time) / interval_duration,
|
||||||
|
@ -357,7 +360,7 @@ class SQLBaseStore(object):
|
||||||
"""
|
"""
|
||||||
col_headers = list(intern(column[0]) for column in cursor.description)
|
col_headers = list(intern(column[0]) for column in cursor.description)
|
||||||
results = list(
|
results = list(
|
||||||
dict(zip(col_headers, row)) for row in cursor.fetchall()
|
dict(zip(col_headers, row)) for row in cursor
|
||||||
)
|
)
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
@ -565,7 +568,7 @@ class SQLBaseStore(object):
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
|
def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
|
||||||
if keyvalues:
|
if keyvalues:
|
||||||
where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
|
where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
|
||||||
else:
|
else:
|
||||||
where = ""
|
where = ""
|
||||||
|
|
||||||
|
@ -579,7 +582,7 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
txn.execute(sql, keyvalues.values())
|
txn.execute(sql, keyvalues.values())
|
||||||
|
|
||||||
return [r[0] for r in txn.fetchall()]
|
return [r[0] for r in txn]
|
||||||
|
|
||||||
def _simple_select_onecol(self, table, keyvalues, retcol,
|
def _simple_select_onecol(self, table, keyvalues, retcol,
|
||||||
desc="_simple_select_onecol"):
|
desc="_simple_select_onecol"):
|
||||||
|
@ -712,7 +715,7 @@ class SQLBaseStore(object):
|
||||||
)
|
)
|
||||||
values.extend(iterable)
|
values.extend(iterable)
|
||||||
|
|
||||||
for key, value in keyvalues.items():
|
for key, value in keyvalues.iteritems():
|
||||||
clauses.append("%s = ?" % (key,))
|
clauses.append("%s = ?" % (key,))
|
||||||
values.append(value)
|
values.append(value)
|
||||||
|
|
||||||
|
@ -753,7 +756,7 @@ class SQLBaseStore(object):
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
|
def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
|
||||||
if keyvalues:
|
if keyvalues:
|
||||||
where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
|
where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
|
||||||
else:
|
else:
|
||||||
where = ""
|
where = ""
|
||||||
|
|
||||||
|
@ -870,7 +873,7 @@ class SQLBaseStore(object):
|
||||||
)
|
)
|
||||||
values.extend(iterable)
|
values.extend(iterable)
|
||||||
|
|
||||||
for key, value in keyvalues.items():
|
for key, value in keyvalues.iteritems():
|
||||||
clauses.append("%s = ?" % (key,))
|
clauses.append("%s = ?" % (key,))
|
||||||
values.append(value)
|
values.append(value)
|
||||||
|
|
||||||
|
@ -901,16 +904,16 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
txn = db_conn.cursor()
|
txn = db_conn.cursor()
|
||||||
txn.execute(sql, (int(max_value),))
|
txn.execute(sql, (int(max_value),))
|
||||||
rows = txn.fetchall()
|
|
||||||
txn.close()
|
|
||||||
|
|
||||||
cache = {
|
cache = {
|
||||||
row[0]: int(row[1])
|
row[0]: int(row[1])
|
||||||
for row in rows
|
for row in txn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
txn.close()
|
||||||
|
|
||||||
if cache:
|
if cache:
|
||||||
min_val = min(cache.values())
|
min_val = min(cache.itervalues())
|
||||||
else:
|
else:
|
||||||
min_val = max_value
|
min_val = max_value
|
||||||
|
|
||||||
|
|
|
@ -182,7 +182,7 @@ class AccountDataStore(SQLBaseStore):
|
||||||
txn.execute(sql, (user_id, stream_id))
|
txn.execute(sql, (user_id, stream_id))
|
||||||
|
|
||||||
global_account_data = {
|
global_account_data = {
|
||||||
row[0]: json.loads(row[1]) for row in txn.fetchall()
|
row[0]: json.loads(row[1]) for row in txn
|
||||||
}
|
}
|
||||||
|
|
||||||
sql = (
|
sql = (
|
||||||
|
@ -193,7 +193,7 @@ class AccountDataStore(SQLBaseStore):
|
||||||
txn.execute(sql, (user_id, stream_id))
|
txn.execute(sql, (user_id, stream_id))
|
||||||
|
|
||||||
account_data_by_room = {}
|
account_data_by_room = {}
|
||||||
for row in txn.fetchall():
|
for row in txn:
|
||||||
room_account_data = account_data_by_room.setdefault(row[0], {})
|
room_account_data = account_data_by_room.setdefault(row[0], {})
|
||||||
room_account_data[row[1]] = json.loads(row[2])
|
room_account_data[row[1]] = json.loads(row[2])
|
||||||
|
|
||||||
|
|
|
@ -178,7 +178,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
|
||||||
)
|
)
|
||||||
txn.execute(sql, (user_id,))
|
txn.execute(sql, (user_id,))
|
||||||
message_json = ujson.dumps(messages_by_device["*"])
|
message_json = ujson.dumps(messages_by_device["*"])
|
||||||
for row in txn.fetchall():
|
for row in txn:
|
||||||
# Add the message for all devices for this user on this
|
# Add the message for all devices for this user on this
|
||||||
# server.
|
# server.
|
||||||
device = row[0]
|
device = row[0]
|
||||||
|
@ -195,7 +195,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
|
||||||
# TODO: Maybe this needs to be done in batches if there are
|
# TODO: Maybe this needs to be done in batches if there are
|
||||||
# too many local devices for a given user.
|
# too many local devices for a given user.
|
||||||
txn.execute(sql, [user_id] + devices)
|
txn.execute(sql, [user_id] + devices)
|
||||||
for row in txn.fetchall():
|
for row in txn:
|
||||||
# Only insert into the local inbox if the device exists on
|
# Only insert into the local inbox if the device exists on
|
||||||
# this server
|
# this server
|
||||||
device = row[0]
|
device = row[0]
|
||||||
|
@ -251,7 +251,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
|
||||||
user_id, device_id, last_stream_id, current_stream_id, limit
|
user_id, device_id, last_stream_id, current_stream_id, limit
|
||||||
))
|
))
|
||||||
messages = []
|
messages = []
|
||||||
for row in txn.fetchall():
|
for row in txn:
|
||||||
stream_pos = row[0]
|
stream_pos = row[0]
|
||||||
messages.append(ujson.loads(row[1]))
|
messages.append(ujson.loads(row[1]))
|
||||||
if len(messages) < limit:
|
if len(messages) < limit:
|
||||||
|
@ -340,7 +340,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
|
||||||
" ORDER BY stream_id ASC"
|
" ORDER BY stream_id ASC"
|
||||||
)
|
)
|
||||||
txn.execute(sql, (last_pos, upper_pos))
|
txn.execute(sql, (last_pos, upper_pos))
|
||||||
rows.extend(txn.fetchall())
|
rows.extend(txn)
|
||||||
|
|
||||||
return rows
|
return rows
|
||||||
|
|
||||||
|
@ -384,7 +384,7 @@ class DeviceInboxStore(BackgroundUpdateStore):
|
||||||
destination, last_stream_id, current_stream_id, limit
|
destination, last_stream_id, current_stream_id, limit
|
||||||
))
|
))
|
||||||
messages = []
|
messages = []
|
||||||
for row in txn.fetchall():
|
for row in txn:
|
||||||
stream_pos = row[0]
|
stream_pos = row[0]
|
||||||
messages.append(ujson.loads(row[1]))
|
messages.append(ujson.loads(row[1]))
|
||||||
if len(messages) < limit:
|
if len(messages) < limit:
|
||||||
|
|
|
@ -333,13 +333,12 @@ class DeviceStore(SQLBaseStore):
|
||||||
txn.execute(
|
txn.execute(
|
||||||
sql, (destination, from_stream_id, now_stream_id, False)
|
sql, (destination, from_stream_id, now_stream_id, False)
|
||||||
)
|
)
|
||||||
rows = txn.fetchall()
|
|
||||||
|
|
||||||
if not rows:
|
|
||||||
return (now_stream_id, [])
|
|
||||||
|
|
||||||
# maps (user_id, device_id) -> stream_id
|
# maps (user_id, device_id) -> stream_id
|
||||||
query_map = {(r[0], r[1]): r[2] for r in rows}
|
query_map = {(r[0], r[1]): r[2] for r in txn}
|
||||||
|
if not query_map:
|
||||||
|
return (now_stream_id, [])
|
||||||
|
|
||||||
devices = self._get_e2e_device_keys_txn(
|
devices = self._get_e2e_device_keys_txn(
|
||||||
txn, query_map.keys(), include_all_devices=True
|
txn, query_map.keys(), include_all_devices=True
|
||||||
)
|
)
|
||||||
|
|
|
@ -153,7 +153,7 @@ class EndToEndKeyStore(SQLBaseStore):
|
||||||
)
|
)
|
||||||
txn.execute(sql, (user_id, device_id))
|
txn.execute(sql, (user_id, device_id))
|
||||||
result = {}
|
result = {}
|
||||||
for algorithm, key_count in txn.fetchall():
|
for algorithm, key_count in txn:
|
||||||
result[algorithm] = key_count
|
result[algorithm] = key_count
|
||||||
return result
|
return result
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
|
@ -174,7 +174,7 @@ class EndToEndKeyStore(SQLBaseStore):
|
||||||
user_result = result.setdefault(user_id, {})
|
user_result = result.setdefault(user_id, {})
|
||||||
device_result = user_result.setdefault(device_id, {})
|
device_result = user_result.setdefault(device_id, {})
|
||||||
txn.execute(sql, (user_id, device_id, algorithm))
|
txn.execute(sql, (user_id, device_id, algorithm))
|
||||||
for key_id, key_json in txn.fetchall():
|
for key_id, key_json in txn:
|
||||||
device_result[algorithm + ":" + key_id] = key_json
|
device_result[algorithm + ":" + key_id] = key_json
|
||||||
delete.append((user_id, device_id, algorithm, key_id))
|
delete.append((user_id, device_id, algorithm, key_id))
|
||||||
sql = (
|
sql = (
|
||||||
|
|
|
@ -74,7 +74,7 @@ class EventFederationStore(SQLBaseStore):
|
||||||
base_sql % (",".join(["?"] * len(chunk)),),
|
base_sql % (",".join(["?"] * len(chunk)),),
|
||||||
chunk
|
chunk
|
||||||
)
|
)
|
||||||
new_front.update([r[0] for r in txn.fetchall()])
|
new_front.update([r[0] for r in txn])
|
||||||
|
|
||||||
new_front -= results
|
new_front -= results
|
||||||
|
|
||||||
|
@ -110,7 +110,7 @@ class EventFederationStore(SQLBaseStore):
|
||||||
|
|
||||||
txn.execute(sql, (room_id, False,))
|
txn.execute(sql, (room_id, False,))
|
||||||
|
|
||||||
return dict(txn.fetchall())
|
return dict(txn)
|
||||||
|
|
||||||
def _get_oldest_events_in_room_txn(self, txn, room_id):
|
def _get_oldest_events_in_room_txn(self, txn, room_id):
|
||||||
return self._simple_select_onecol_txn(
|
return self._simple_select_onecol_txn(
|
||||||
|
@ -152,7 +152,7 @@ class EventFederationStore(SQLBaseStore):
|
||||||
txn.execute(sql, (room_id, ))
|
txn.execute(sql, (room_id, ))
|
||||||
|
|
||||||
results = []
|
results = []
|
||||||
for event_id, depth in txn.fetchall():
|
for event_id, depth in txn:
|
||||||
hashes = self._get_event_reference_hashes_txn(txn, event_id)
|
hashes = self._get_event_reference_hashes_txn(txn, event_id)
|
||||||
prev_hashes = {
|
prev_hashes = {
|
||||||
k: encode_base64(v) for k, v in hashes.items()
|
k: encode_base64(v) for k, v in hashes.items()
|
||||||
|
@ -334,8 +334,7 @@ class EventFederationStore(SQLBaseStore):
|
||||||
|
|
||||||
def get_forward_extremeties_for_room_txn(txn):
|
def get_forward_extremeties_for_room_txn(txn):
|
||||||
txn.execute(sql, (stream_ordering, room_id))
|
txn.execute(sql, (stream_ordering, room_id))
|
||||||
rows = txn.fetchall()
|
return [event_id for event_id, in txn]
|
||||||
return [event_id for event_id, in rows]
|
|
||||||
|
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
"get_forward_extremeties_for_room",
|
"get_forward_extremeties_for_room",
|
||||||
|
@ -436,7 +435,7 @@ class EventFederationStore(SQLBaseStore):
|
||||||
(room_id, event_id, False, limit - len(event_results))
|
(room_id, event_id, False, limit - len(event_results))
|
||||||
)
|
)
|
||||||
|
|
||||||
for row in txn.fetchall():
|
for row in txn:
|
||||||
if row[1] not in event_results:
|
if row[1] not in event_results:
|
||||||
queue.put((-row[0], row[1]))
|
queue.put((-row[0], row[1]))
|
||||||
|
|
||||||
|
@ -482,7 +481,7 @@ class EventFederationStore(SQLBaseStore):
|
||||||
(room_id, event_id, False, limit - len(event_results))
|
(room_id, event_id, False, limit - len(event_results))
|
||||||
)
|
)
|
||||||
|
|
||||||
for e_id, in txn.fetchall():
|
for e_id, in txn:
|
||||||
new_front.add(e_id)
|
new_front.add(e_id)
|
||||||
|
|
||||||
new_front -= earliest_events
|
new_front -= earliest_events
|
||||||
|
|
|
@ -206,7 +206,7 @@ class EventPushActionsStore(SQLBaseStore):
|
||||||
" stream_ordering >= ? AND stream_ordering <= ?"
|
" stream_ordering >= ? AND stream_ordering <= ?"
|
||||||
)
|
)
|
||||||
txn.execute(sql, (min_stream_ordering, max_stream_ordering))
|
txn.execute(sql, (min_stream_ordering, max_stream_ordering))
|
||||||
return [r[0] for r in txn.fetchall()]
|
return [r[0] for r in txn]
|
||||||
ret = yield self.runInteraction("get_push_action_users_in_range", f)
|
ret = yield self.runInteraction("get_push_action_users_in_range", f)
|
||||||
defer.returnValue(ret)
|
defer.returnValue(ret)
|
||||||
|
|
||||||
|
|
|
@ -217,14 +217,14 @@ class EventsStore(SQLBaseStore):
|
||||||
partitioned.setdefault(event.room_id, []).append((event, ctx))
|
partitioned.setdefault(event.room_id, []).append((event, ctx))
|
||||||
|
|
||||||
deferreds = []
|
deferreds = []
|
||||||
for room_id, evs_ctxs in partitioned.items():
|
for room_id, evs_ctxs in partitioned.iteritems():
|
||||||
d = preserve_fn(self._event_persist_queue.add_to_queue)(
|
d = preserve_fn(self._event_persist_queue.add_to_queue)(
|
||||||
room_id, evs_ctxs,
|
room_id, evs_ctxs,
|
||||||
backfilled=backfilled,
|
backfilled=backfilled,
|
||||||
)
|
)
|
||||||
deferreds.append(d)
|
deferreds.append(d)
|
||||||
|
|
||||||
for room_id in partitioned.keys():
|
for room_id in partitioned:
|
||||||
self._maybe_start_persisting(room_id)
|
self._maybe_start_persisting(room_id)
|
||||||
|
|
||||||
return preserve_context_over_deferred(
|
return preserve_context_over_deferred(
|
||||||
|
@ -323,7 +323,7 @@ class EventsStore(SQLBaseStore):
|
||||||
(event, context)
|
(event, context)
|
||||||
)
|
)
|
||||||
|
|
||||||
for room_id, ev_ctx_rm in events_by_room.items():
|
for room_id, ev_ctx_rm in events_by_room.iteritems():
|
||||||
# Work out new extremities by recursively adding and removing
|
# Work out new extremities by recursively adding and removing
|
||||||
# the new events.
|
# the new events.
|
||||||
latest_event_ids = yield self.get_latest_event_ids_in_room(
|
latest_event_ids = yield self.get_latest_event_ids_in_room(
|
||||||
|
@ -453,10 +453,10 @@ class EventsStore(SQLBaseStore):
|
||||||
missing_event_ids,
|
missing_event_ids,
|
||||||
)
|
)
|
||||||
|
|
||||||
groups = set(event_to_groups.values())
|
groups = set(event_to_groups.itervalues())
|
||||||
group_to_state = yield self._get_state_for_groups(groups)
|
group_to_state = yield self._get_state_for_groups(groups)
|
||||||
|
|
||||||
state_sets.extend(group_to_state.values())
|
state_sets.extend(group_to_state.itervalues())
|
||||||
|
|
||||||
if not new_latest_event_ids:
|
if not new_latest_event_ids:
|
||||||
current_state = {}
|
current_state = {}
|
||||||
|
@ -718,7 +718,7 @@ class EventsStore(SQLBaseStore):
|
||||||
|
|
||||||
def _update_forward_extremities_txn(self, txn, new_forward_extremities,
|
def _update_forward_extremities_txn(self, txn, new_forward_extremities,
|
||||||
max_stream_order):
|
max_stream_order):
|
||||||
for room_id, new_extrem in new_forward_extremities.items():
|
for room_id, new_extrem in new_forward_extremities.iteritems():
|
||||||
self._simple_delete_txn(
|
self._simple_delete_txn(
|
||||||
txn,
|
txn,
|
||||||
table="event_forward_extremities",
|
table="event_forward_extremities",
|
||||||
|
@ -736,7 +736,7 @@ class EventsStore(SQLBaseStore):
|
||||||
"event_id": ev_id,
|
"event_id": ev_id,
|
||||||
"room_id": room_id,
|
"room_id": room_id,
|
||||||
}
|
}
|
||||||
for room_id, new_extrem in new_forward_extremities.items()
|
for room_id, new_extrem in new_forward_extremities.iteritems()
|
||||||
for ev_id in new_extrem
|
for ev_id in new_extrem
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -753,7 +753,7 @@ class EventsStore(SQLBaseStore):
|
||||||
"event_id": event_id,
|
"event_id": event_id,
|
||||||
"stream_ordering": max_stream_order,
|
"stream_ordering": max_stream_order,
|
||||||
}
|
}
|
||||||
for room_id, new_extrem in new_forward_extremities.items()
|
for room_id, new_extrem in new_forward_extremities.iteritems()
|
||||||
for event_id in new_extrem
|
for event_id in new_extrem
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
@ -807,7 +807,7 @@ class EventsStore(SQLBaseStore):
|
||||||
event.depth, depth_updates.get(event.room_id, event.depth)
|
event.depth, depth_updates.get(event.room_id, event.depth)
|
||||||
)
|
)
|
||||||
|
|
||||||
for room_id, depth in depth_updates.items():
|
for room_id, depth in depth_updates.iteritems():
|
||||||
self._update_min_depth_for_room_txn(txn, room_id, depth)
|
self._update_min_depth_for_room_txn(txn, room_id, depth)
|
||||||
|
|
||||||
def _update_outliers_txn(self, txn, events_and_contexts):
|
def _update_outliers_txn(self, txn, events_and_contexts):
|
||||||
|
@ -834,7 +834,7 @@ class EventsStore(SQLBaseStore):
|
||||||
|
|
||||||
have_persisted = {
|
have_persisted = {
|
||||||
event_id: outlier
|
event_id: outlier
|
||||||
for event_id, outlier in txn.fetchall()
|
for event_id, outlier in txn
|
||||||
}
|
}
|
||||||
|
|
||||||
to_remove = set()
|
to_remove = set()
|
||||||
|
@ -958,14 +958,10 @@ class EventsStore(SQLBaseStore):
|
||||||
return
|
return
|
||||||
|
|
||||||
def event_dict(event):
|
def event_dict(event):
|
||||||
return {
|
d = event.get_dict()
|
||||||
k: v
|
d.pop("redacted", None)
|
||||||
for k, v in event.get_dict().items()
|
d.pop("redacted_because", None)
|
||||||
if k not in [
|
return d
|
||||||
"redacted",
|
|
||||||
"redacted_because",
|
|
||||||
]
|
|
||||||
}
|
|
||||||
|
|
||||||
self._simple_insert_many_txn(
|
self._simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
|
@ -1998,7 +1994,7 @@ class EventsStore(SQLBaseStore):
|
||||||
"state_key": key[1],
|
"state_key": key[1],
|
||||||
"event_id": state_id,
|
"event_id": state_id,
|
||||||
}
|
}
|
||||||
for key, state_id in curr_state.items()
|
for key, state_id in curr_state.iteritems()
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -356,7 +356,7 @@ def _get_or_create_schema_state(txn, database_engine):
|
||||||
),
|
),
|
||||||
(current_version,)
|
(current_version,)
|
||||||
)
|
)
|
||||||
applied_deltas = [d for d, in txn.fetchall()]
|
applied_deltas = [d for d, in txn]
|
||||||
return current_version, applied_deltas, upgraded
|
return current_version, applied_deltas, upgraded
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
|
@ -313,10 +313,9 @@ class ReceiptsStore(SQLBaseStore):
|
||||||
)
|
)
|
||||||
|
|
||||||
txn.execute(sql, (room_id, receipt_type, user_id))
|
txn.execute(sql, (room_id, receipt_type, user_id))
|
||||||
results = txn.fetchall()
|
|
||||||
|
|
||||||
if results and topological_ordering:
|
if topological_ordering:
|
||||||
for to, so, _ in results:
|
for to, so, _ in txn:
|
||||||
if int(to) > topological_ordering:
|
if int(to) > topological_ordering:
|
||||||
return False
|
return False
|
||||||
elif int(to) == topological_ordering and int(so) >= stream_ordering:
|
elif int(to) == topological_ordering and int(so) >= stream_ordering:
|
||||||
|
|
|
@ -209,7 +209,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
|
||||||
" WHERE lower(name) = lower(?)"
|
" WHERE lower(name) = lower(?)"
|
||||||
)
|
)
|
||||||
txn.execute(sql, (user_id,))
|
txn.execute(sql, (user_id,))
|
||||||
return dict(txn.fetchall())
|
return dict(txn)
|
||||||
|
|
||||||
return self.runInteraction("get_users_by_id_case_insensitive", f)
|
return self.runInteraction("get_users_by_id_case_insensitive", f)
|
||||||
|
|
||||||
|
|
|
@ -396,7 +396,7 @@ class RoomStore(SQLBaseStore):
|
||||||
sql % ("AND appservice_id IS NULL",),
|
sql % ("AND appservice_id IS NULL",),
|
||||||
(stream_id,)
|
(stream_id,)
|
||||||
)
|
)
|
||||||
return dict(txn.fetchall())
|
return dict(txn)
|
||||||
else:
|
else:
|
||||||
# We want to get from all lists, so we need to aggregate the results
|
# We want to get from all lists, so we need to aggregate the results
|
||||||
|
|
||||||
|
@ -422,7 +422,7 @@ class RoomStore(SQLBaseStore):
|
||||||
|
|
||||||
results = {}
|
results = {}
|
||||||
# A room is visible if its visible on any list.
|
# A room is visible if its visible on any list.
|
||||||
for room_id, visibility in txn.fetchall():
|
for room_id, visibility in txn:
|
||||||
results[room_id] = bool(visibility) or results.get(room_id, False)
|
results[room_id] = bool(visibility) or results.get(room_id, False)
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
|
@ -132,14 +132,17 @@ class RoomMemberStore(SQLBaseStore):
|
||||||
@cached(max_entries=500000, iterable=True)
|
@cached(max_entries=500000, iterable=True)
|
||||||
def get_users_in_room(self, room_id):
|
def get_users_in_room(self, room_id):
|
||||||
def f(txn):
|
def f(txn):
|
||||||
|
sql = (
|
||||||
rows = self._get_members_rows_txn(
|
"SELECT m.user_id FROM room_memberships as m"
|
||||||
txn,
|
" INNER JOIN current_state_events as c"
|
||||||
room_id=room_id,
|
" ON m.event_id = c.event_id "
|
||||||
membership=Membership.JOIN,
|
" AND m.room_id = c.room_id "
|
||||||
|
" AND m.user_id = c.state_key"
|
||||||
|
" WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
|
||||||
)
|
)
|
||||||
|
|
||||||
return [r["user_id"] for r in rows]
|
txn.execute(sql, (room_id, Membership.JOIN,))
|
||||||
|
return [r[0] for r in txn]
|
||||||
return self.runInteraction("get_users_in_room", f)
|
return self.runInteraction("get_users_in_room", f)
|
||||||
|
|
||||||
@cached()
|
@cached()
|
||||||
|
@ -246,34 +249,6 @@ class RoomMemberStore(SQLBaseStore):
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):
|
|
||||||
where_clause = "c.room_id = ?"
|
|
||||||
where_values = [room_id]
|
|
||||||
|
|
||||||
if membership:
|
|
||||||
where_clause += " AND m.membership = ?"
|
|
||||||
where_values.append(membership)
|
|
||||||
|
|
||||||
if user_id:
|
|
||||||
where_clause += " AND m.user_id = ?"
|
|
||||||
where_values.append(user_id)
|
|
||||||
|
|
||||||
sql = (
|
|
||||||
"SELECT m.* FROM room_memberships as m"
|
|
||||||
" INNER JOIN current_state_events as c"
|
|
||||||
" ON m.event_id = c.event_id "
|
|
||||||
" AND m.room_id = c.room_id "
|
|
||||||
" AND m.user_id = c.state_key"
|
|
||||||
" WHERE c.type = 'm.room.member' AND %(where)s"
|
|
||||||
) % {
|
|
||||||
"where": where_clause,
|
|
||||||
}
|
|
||||||
|
|
||||||
txn.execute(sql, where_values)
|
|
||||||
rows = self.cursor_to_dict(txn)
|
|
||||||
|
|
||||||
return rows
|
|
||||||
|
|
||||||
@cachedInlineCallbacks(max_entries=500000, iterable=True)
|
@cachedInlineCallbacks(max_entries=500000, iterable=True)
|
||||||
def get_rooms_for_user(self, user_id):
|
def get_rooms_for_user(self, user_id):
|
||||||
"""Returns a set of room_ids the user is currently joined to
|
"""Returns a set of room_ids the user is currently joined to
|
||||||
|
|
|
@ -72,7 +72,7 @@ class SignatureStore(SQLBaseStore):
|
||||||
" WHERE event_id = ?"
|
" WHERE event_id = ?"
|
||||||
)
|
)
|
||||||
txn.execute(query, (event_id, ))
|
txn.execute(query, (event_id, ))
|
||||||
return {k: v for k, v in txn.fetchall()}
|
return {k: v for k, v in txn}
|
||||||
|
|
||||||
def _store_event_reference_hashes_txn(self, txn, events):
|
def _store_event_reference_hashes_txn(self, txn, events):
|
||||||
"""Store a hash for a PDU
|
"""Store a hash for a PDU
|
||||||
|
|
|
@ -90,7 +90,7 @@ class StateStore(SQLBaseStore):
|
||||||
event_ids,
|
event_ids,
|
||||||
)
|
)
|
||||||
|
|
||||||
groups = set(event_to_groups.values())
|
groups = set(event_to_groups.itervalues())
|
||||||
group_to_state = yield self._get_state_for_groups(groups)
|
group_to_state = yield self._get_state_for_groups(groups)
|
||||||
|
|
||||||
defer.returnValue(group_to_state)
|
defer.returnValue(group_to_state)
|
||||||
|
@ -108,17 +108,18 @@ class StateStore(SQLBaseStore):
|
||||||
|
|
||||||
state_event_map = yield self.get_events(
|
state_event_map = yield self.get_events(
|
||||||
[
|
[
|
||||||
ev_id for group_ids in group_to_ids.values()
|
ev_id for group_ids in group_to_ids.itervalues()
|
||||||
for ev_id in group_ids.values()
|
for ev_id in group_ids.itervalues()
|
||||||
],
|
],
|
||||||
get_prev_content=False
|
get_prev_content=False
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue({
|
defer.returnValue({
|
||||||
group: [
|
group: [
|
||||||
state_event_map[v] for v in event_id_map.values() if v in state_event_map
|
state_event_map[v] for v in event_id_map.itervalues()
|
||||||
|
if v in state_event_map
|
||||||
]
|
]
|
||||||
for group, event_id_map in group_to_ids.items()
|
for group, event_id_map in group_to_ids.iteritems()
|
||||||
})
|
})
|
||||||
|
|
||||||
def _have_persisted_state_group_txn(self, txn, state_group):
|
def _have_persisted_state_group_txn(self, txn, state_group):
|
||||||
|
@ -190,7 +191,7 @@ class StateStore(SQLBaseStore):
|
||||||
"state_key": key[1],
|
"state_key": key[1],
|
||||||
"event_id": state_id,
|
"event_id": state_id,
|
||||||
}
|
}
|
||||||
for key, state_id in context.delta_ids.items()
|
for key, state_id in context.delta_ids.iteritems()
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
@ -205,7 +206,7 @@ class StateStore(SQLBaseStore):
|
||||||
"state_key": key[1],
|
"state_key": key[1],
|
||||||
"event_id": state_id,
|
"event_id": state_id,
|
||||||
}
|
}
|
||||||
for key, state_id in context.current_state_ids.items()
|
for key, state_id in context.current_state_ids.iteritems()
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -217,7 +218,7 @@ class StateStore(SQLBaseStore):
|
||||||
"state_group": state_group_id,
|
"state_group": state_group_id,
|
||||||
"event_id": event_id,
|
"event_id": event_id,
|
||||||
}
|
}
|
||||||
for event_id, state_group_id in state_groups.items()
|
for event_id, state_group_id in state_groups.iteritems()
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -341,10 +342,10 @@ class StateStore(SQLBaseStore):
|
||||||
args.extend(where_args)
|
args.extend(where_args)
|
||||||
|
|
||||||
txn.execute(sql % (where_clause,), args)
|
txn.execute(sql % (where_clause,), args)
|
||||||
rows = self.cursor_to_dict(txn)
|
for row in txn:
|
||||||
for row in rows:
|
typ, state_key, event_id = row
|
||||||
key = (row["type"], row["state_key"])
|
key = (typ, state_key)
|
||||||
results[group][key] = row["event_id"]
|
results[group][key] = event_id
|
||||||
else:
|
else:
|
||||||
if types is not None:
|
if types is not None:
|
||||||
where_clause = "AND (%s)" % (
|
where_clause = "AND (%s)" % (
|
||||||
|
@ -373,12 +374,11 @@ class StateStore(SQLBaseStore):
|
||||||
" WHERE state_group = ? %s" % (where_clause,),
|
" WHERE state_group = ? %s" % (where_clause,),
|
||||||
args
|
args
|
||||||
)
|
)
|
||||||
rows = txn.fetchall()
|
results[group].update(
|
||||||
results[group].update({
|
((typ, state_key), event_id)
|
||||||
(typ, state_key): event_id
|
for typ, state_key, event_id in txn
|
||||||
for typ, state_key, event_id in rows
|
|
||||||
if (typ, state_key) not in results[group]
|
if (typ, state_key) not in results[group]
|
||||||
})
|
)
|
||||||
|
|
||||||
# If the lengths match then we must have all the types,
|
# If the lengths match then we must have all the types,
|
||||||
# so no need to go walk further down the tree.
|
# so no need to go walk further down the tree.
|
||||||
|
@ -415,21 +415,21 @@ class StateStore(SQLBaseStore):
|
||||||
event_ids,
|
event_ids,
|
||||||
)
|
)
|
||||||
|
|
||||||
groups = set(event_to_groups.values())
|
groups = set(event_to_groups.itervalues())
|
||||||
group_to_state = yield self._get_state_for_groups(groups, types)
|
group_to_state = yield self._get_state_for_groups(groups, types)
|
||||||
|
|
||||||
state_event_map = yield self.get_events(
|
state_event_map = yield self.get_events(
|
||||||
[ev_id for sd in group_to_state.values() for ev_id in sd.values()],
|
[ev_id for sd in group_to_state.itervalues() for ev_id in sd.itervalues()],
|
||||||
get_prev_content=False
|
get_prev_content=False
|
||||||
)
|
)
|
||||||
|
|
||||||
event_to_state = {
|
event_to_state = {
|
||||||
event_id: {
|
event_id: {
|
||||||
k: state_event_map[v]
|
k: state_event_map[v]
|
||||||
for k, v in group_to_state[group].items()
|
for k, v in group_to_state[group].iteritems()
|
||||||
if v in state_event_map
|
if v in state_event_map
|
||||||
}
|
}
|
||||||
for event_id, group in event_to_groups.items()
|
for event_id, group in event_to_groups.iteritems()
|
||||||
}
|
}
|
||||||
|
|
||||||
defer.returnValue({event: event_to_state[event] for event in event_ids})
|
defer.returnValue({event: event_to_state[event] for event in event_ids})
|
||||||
|
@ -452,12 +452,12 @@ class StateStore(SQLBaseStore):
|
||||||
event_ids,
|
event_ids,
|
||||||
)
|
)
|
||||||
|
|
||||||
groups = set(event_to_groups.values())
|
groups = set(event_to_groups.itervalues())
|
||||||
group_to_state = yield self._get_state_for_groups(groups, types)
|
group_to_state = yield self._get_state_for_groups(groups, types)
|
||||||
|
|
||||||
event_to_state = {
|
event_to_state = {
|
||||||
event_id: group_to_state[group]
|
event_id: group_to_state[group]
|
||||||
for event_id, group in event_to_groups.items()
|
for event_id, group in event_to_groups.iteritems()
|
||||||
}
|
}
|
||||||
|
|
||||||
defer.returnValue({event: event_to_state[event] for event in event_ids})
|
defer.returnValue({event: event_to_state[event] for event in event_ids})
|
||||||
|
@ -569,7 +569,7 @@ class StateStore(SQLBaseStore):
|
||||||
got_all = not (missing_types or types is None)
|
got_all = not (missing_types or types is None)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
k: v for k, v in state_dict_ids.items()
|
k: v for k, v in state_dict_ids.iteritems()
|
||||||
if include(k[0], k[1])
|
if include(k[0], k[1])
|
||||||
}, missing_types, got_all
|
}, missing_types, got_all
|
||||||
|
|
||||||
|
@ -628,7 +628,7 @@ class StateStore(SQLBaseStore):
|
||||||
|
|
||||||
# Now we want to update the cache with all the things we fetched
|
# Now we want to update the cache with all the things we fetched
|
||||||
# from the database.
|
# from the database.
|
||||||
for group, group_state_dict in group_to_state_dict.items():
|
for group, group_state_dict in group_to_state_dict.iteritems():
|
||||||
if types:
|
if types:
|
||||||
# We delibrately put key -> None mappings into the cache to
|
# We delibrately put key -> None mappings into the cache to
|
||||||
# cache absence of the key, on the assumption that if we've
|
# cache absence of the key, on the assumption that if we've
|
||||||
|
@ -643,10 +643,10 @@ class StateStore(SQLBaseStore):
|
||||||
else:
|
else:
|
||||||
state_dict = results[group]
|
state_dict = results[group]
|
||||||
|
|
||||||
state_dict.update({
|
state_dict.update(
|
||||||
(intern_string(k[0]), intern_string(k[1])): v
|
((intern_string(k[0]), intern_string(k[1])), v)
|
||||||
for k, v in group_state_dict.items()
|
for k, v in group_state_dict.iteritems()
|
||||||
})
|
)
|
||||||
|
|
||||||
self._state_group_cache.update(
|
self._state_group_cache.update(
|
||||||
cache_seq_num,
|
cache_seq_num,
|
||||||
|
@ -657,10 +657,10 @@ class StateStore(SQLBaseStore):
|
||||||
|
|
||||||
# Remove all the entries with None values. The None values were just
|
# Remove all the entries with None values. The None values were just
|
||||||
# used for bookkeeping in the cache.
|
# used for bookkeeping in the cache.
|
||||||
for group, state_dict in results.items():
|
for group, state_dict in results.iteritems():
|
||||||
results[group] = {
|
results[group] = {
|
||||||
key: event_id
|
key: event_id
|
||||||
for key, event_id in state_dict.items()
|
for key, event_id in state_dict.iteritems()
|
||||||
if event_id
|
if event_id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -749,7 +749,7 @@ class StateStore(SQLBaseStore):
|
||||||
# of keys
|
# of keys
|
||||||
|
|
||||||
delta_state = {
|
delta_state = {
|
||||||
key: value for key, value in curr_state.items()
|
key: value for key, value in curr_state.iteritems()
|
||||||
if prev_state.get(key, None) != value
|
if prev_state.get(key, None) != value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -789,7 +789,7 @@ class StateStore(SQLBaseStore):
|
||||||
"state_key": key[1],
|
"state_key": key[1],
|
||||||
"event_id": state_id,
|
"event_id": state_id,
|
||||||
}
|
}
|
||||||
for key, state_id in delta_state.items()
|
for key, state_id in delta_state.iteritems()
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -95,7 +95,7 @@ class TagsStore(SQLBaseStore):
|
||||||
for stream_id, user_id, room_id in tag_ids:
|
for stream_id, user_id, room_id in tag_ids:
|
||||||
txn.execute(sql, (user_id, room_id))
|
txn.execute(sql, (user_id, room_id))
|
||||||
tags = []
|
tags = []
|
||||||
for tag, content in txn.fetchall():
|
for tag, content in txn:
|
||||||
tags.append(json.dumps(tag) + ":" + content)
|
tags.append(json.dumps(tag) + ":" + content)
|
||||||
tag_json = "{" + ",".join(tags) + "}"
|
tag_json = "{" + ",".join(tags) + "}"
|
||||||
results.append((stream_id, user_id, room_id, tag_json))
|
results.append((stream_id, user_id, room_id, tag_json))
|
||||||
|
@ -132,7 +132,7 @@ class TagsStore(SQLBaseStore):
|
||||||
" WHERE user_id = ? AND stream_id > ?"
|
" WHERE user_id = ? AND stream_id > ?"
|
||||||
)
|
)
|
||||||
txn.execute(sql, (user_id, stream_id))
|
txn.execute(sql, (user_id, stream_id))
|
||||||
room_ids = [row[0] for row in txn.fetchall()]
|
room_ids = [row[0] for row in txn]
|
||||||
return room_ids
|
return room_ids
|
||||||
|
|
||||||
changed = self._account_data_stream_cache.has_entity_changed(
|
changed = self._account_data_stream_cache.has_entity_changed(
|
||||||
|
|
|
@ -89,7 +89,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_select_one_1col(self):
|
def test_select_one_1col(self):
|
||||||
self.mock_txn.rowcount = 1
|
self.mock_txn.rowcount = 1
|
||||||
self.mock_txn.fetchall.return_value = [("Value",)]
|
self.mock_txn.__iter__ = Mock(return_value=iter([("Value",)]))
|
||||||
|
|
||||||
value = yield self.datastore._simple_select_one_onecol(
|
value = yield self.datastore._simple_select_one_onecol(
|
||||||
table="tablename",
|
table="tablename",
|
||||||
|
@ -136,7 +136,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_select_list(self):
|
def test_select_list(self):
|
||||||
self.mock_txn.rowcount = 3
|
self.mock_txn.rowcount = 3
|
||||||
self.mock_txn.fetchall.return_value = ((1,), (2,), (3,))
|
self.mock_txn.__iter__ = Mock(return_value=iter([(1,), (2,), (3,)]))
|
||||||
self.mock_txn.description = (
|
self.mock_txn.description = (
|
||||||
("colA", None, None, None, None, None, None),
|
("colA", None, None, None, None, None, None),
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue