mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-15 06:43:50 +01:00
Fix populate_stream_ordering2
background job (#10267)
It was possible for us not to find any rows in a batch, and hence conclude that we had finished. Let's not do that.
This commit is contained in:
parent
60efc51a2b
commit
7647b0337f
2 changed files with 13 additions and 16 deletions
1
changelog.d/10267.bugfix
Normal file
1
changelog.d/10267.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix a long-standing bug where Synapse would return errors after 2<sup>31</sup> events were handled by the server.
|
|
@ -1055,32 +1055,28 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
|
||||||
batch_size = max(batch_size, 1)
|
batch_size = max(batch_size, 1)
|
||||||
|
|
||||||
def process(txn: Cursor) -> int:
|
def process(txn: Cursor) -> int:
|
||||||
# if this is the first pass, find the minimum stream ordering
|
last_stream = progress.get("last_stream", -(1 << 31))
|
||||||
last_stream = progress.get("last_stream")
|
|
||||||
if last_stream is None:
|
|
||||||
txn.execute(
|
|
||||||
"""
|
|
||||||
SELECT stream_ordering FROM events ORDER BY stream_ordering LIMIT 1
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
rows = txn.fetchall()
|
|
||||||
if not rows:
|
|
||||||
return 0
|
|
||||||
last_stream = rows[0][0] - 1
|
|
||||||
|
|
||||||
txn.execute(
|
txn.execute(
|
||||||
"""
|
"""
|
||||||
UPDATE events SET stream_ordering2=stream_ordering
|
UPDATE events SET stream_ordering2=stream_ordering
|
||||||
WHERE stream_ordering > ? AND stream_ordering <= ?
|
WHERE stream_ordering IN (
|
||||||
|
SELECT stream_ordering FROM events WHERE stream_ordering > ?
|
||||||
|
ORDER BY stream_ordering LIMIT ?
|
||||||
|
)
|
||||||
|
RETURNING stream_ordering;
|
||||||
""",
|
""",
|
||||||
(last_stream, last_stream + batch_size),
|
(last_stream, batch_size),
|
||||||
)
|
)
|
||||||
row_count = txn.rowcount
|
row_count = txn.rowcount
|
||||||
|
if row_count == 0:
|
||||||
|
return 0
|
||||||
|
last_stream = max(row[0] for row in txn)
|
||||||
|
logger.info("populated stream_ordering2 up to %i", last_stream)
|
||||||
|
|
||||||
self.db_pool.updates._background_update_progress_txn(
|
self.db_pool.updates._background_update_progress_txn(
|
||||||
txn,
|
txn,
|
||||||
_BackgroundUpdates.POPULATE_STREAM_ORDERING2,
|
_BackgroundUpdates.POPULATE_STREAM_ORDERING2,
|
||||||
{"last_stream": last_stream + batch_size},
|
{"last_stream": last_stream},
|
||||||
)
|
)
|
||||||
return row_count
|
return row_count
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue