forked from MirrorHub/synapse
Fix state group storage bug in workers
We needed to move `_count_state_group_hops_txn` to the StateGroupWorkerStore.
This commit is contained in:
parent
b8d821aa68
commit
fd1601c596
1 changed files with 41 additions and 41 deletions
|
@ -655,6 +655,47 @@ class StateGroupWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
return self.runInteraction("store_state_group", _store_state_group_txn)
|
return self.runInteraction("store_state_group", _store_state_group_txn)
|
||||||
|
|
||||||
|
def _count_state_group_hops_txn(self, txn, state_group):
|
||||||
|
"""Given a state group, count how many hops there are in the tree.
|
||||||
|
|
||||||
|
This is used to ensure the delta chains don't get too long.
|
||||||
|
"""
|
||||||
|
if isinstance(self.database_engine, PostgresEngine):
|
||||||
|
sql = ("""
|
||||||
|
WITH RECURSIVE state(state_group) AS (
|
||||||
|
VALUES(?::bigint)
|
||||||
|
UNION ALL
|
||||||
|
SELECT prev_state_group FROM state_group_edges e, state s
|
||||||
|
WHERE s.state_group = e.state_group
|
||||||
|
)
|
||||||
|
SELECT count(*) FROM state;
|
||||||
|
""")
|
||||||
|
|
||||||
|
txn.execute(sql, (state_group,))
|
||||||
|
row = txn.fetchone()
|
||||||
|
if row and row[0]:
|
||||||
|
return row[0]
|
||||||
|
else:
|
||||||
|
return 0
|
||||||
|
else:
|
||||||
|
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
|
||||||
|
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
|
||||||
|
next_group = state_group
|
||||||
|
count = 0
|
||||||
|
|
||||||
|
while next_group:
|
||||||
|
next_group = self._simple_select_one_onecol_txn(
|
||||||
|
txn,
|
||||||
|
table="state_group_edges",
|
||||||
|
keyvalues={"state_group": next_group},
|
||||||
|
retcol="prev_state_group",
|
||||||
|
allow_none=True,
|
||||||
|
)
|
||||||
|
if next_group:
|
||||||
|
count += 1
|
||||||
|
|
||||||
|
return count
|
||||||
|
|
||||||
|
|
||||||
class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
||||||
""" Keeps track of the state at a given event.
|
""" Keeps track of the state at a given event.
|
||||||
|
@ -729,47 +770,6 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
||||||
(event_id,), state_group_id
|
(event_id,), state_group_id
|
||||||
)
|
)
|
||||||
|
|
||||||
def _count_state_group_hops_txn(self, txn, state_group):
|
|
||||||
"""Given a state group, count how many hops there are in the tree.
|
|
||||||
|
|
||||||
This is used to ensure the delta chains don't get too long.
|
|
||||||
"""
|
|
||||||
if isinstance(self.database_engine, PostgresEngine):
|
|
||||||
sql = ("""
|
|
||||||
WITH RECURSIVE state(state_group) AS (
|
|
||||||
VALUES(?::bigint)
|
|
||||||
UNION ALL
|
|
||||||
SELECT prev_state_group FROM state_group_edges e, state s
|
|
||||||
WHERE s.state_group = e.state_group
|
|
||||||
)
|
|
||||||
SELECT count(*) FROM state;
|
|
||||||
""")
|
|
||||||
|
|
||||||
txn.execute(sql, (state_group,))
|
|
||||||
row = txn.fetchone()
|
|
||||||
if row and row[0]:
|
|
||||||
return row[0]
|
|
||||||
else:
|
|
||||||
return 0
|
|
||||||
else:
|
|
||||||
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
|
|
||||||
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
|
|
||||||
next_group = state_group
|
|
||||||
count = 0
|
|
||||||
|
|
||||||
while next_group:
|
|
||||||
next_group = self._simple_select_one_onecol_txn(
|
|
||||||
txn,
|
|
||||||
table="state_group_edges",
|
|
||||||
keyvalues={"state_group": next_group},
|
|
||||||
retcol="prev_state_group",
|
|
||||||
allow_none=True,
|
|
||||||
)
|
|
||||||
if next_group:
|
|
||||||
count += 1
|
|
||||||
|
|
||||||
return count
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _background_deduplicate_state(self, progress, batch_size):
|
def _background_deduplicate_state(self, progress, batch_size):
|
||||||
"""This background update will slowly deduplicate state by reencoding
|
"""This background update will slowly deduplicate state by reencoding
|
||||||
|
|
Loading…
Reference in a new issue