forked from MirrorHub/synapse
Fix bug with delayed cache invalidation stream
We poked the notifier before updated the current token for the cache invalidation stream. This mean that sometimes the update wouldn't be sent until the next time a cache was invalidated.
This commit is contained in:
parent
6619f047ad
commit
efb79820b4
2 changed files with 15 additions and 13 deletions
|
@ -48,16 +48,16 @@ class LoggingTransaction(object):
|
||||||
passed to the constructor. Adds logging and metrics to the .execute()
|
passed to the constructor. Adds logging and metrics to the .execute()
|
||||||
method."""
|
method."""
|
||||||
__slots__ = [
|
__slots__ = [
|
||||||
"txn", "name", "database_engine", "after_callbacks", "final_callbacks",
|
"txn", "name", "database_engine", "after_callbacks", "exception_callbacks",
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, txn, name, database_engine, after_callbacks,
|
def __init__(self, txn, name, database_engine, after_callbacks,
|
||||||
final_callbacks):
|
exception_callbacks):
|
||||||
object.__setattr__(self, "txn", txn)
|
object.__setattr__(self, "txn", txn)
|
||||||
object.__setattr__(self, "name", name)
|
object.__setattr__(self, "name", name)
|
||||||
object.__setattr__(self, "database_engine", database_engine)
|
object.__setattr__(self, "database_engine", database_engine)
|
||||||
object.__setattr__(self, "after_callbacks", after_callbacks)
|
object.__setattr__(self, "after_callbacks", after_callbacks)
|
||||||
object.__setattr__(self, "final_callbacks", final_callbacks)
|
object.__setattr__(self, "exception_callbacks", exception_callbacks)
|
||||||
|
|
||||||
def call_after(self, callback, *args, **kwargs):
|
def call_after(self, callback, *args, **kwargs):
|
||||||
"""Call the given callback on the main twisted thread after the
|
"""Call the given callback on the main twisted thread after the
|
||||||
|
@ -66,8 +66,8 @@ class LoggingTransaction(object):
|
||||||
"""
|
"""
|
||||||
self.after_callbacks.append((callback, args, kwargs))
|
self.after_callbacks.append((callback, args, kwargs))
|
||||||
|
|
||||||
def call_finally(self, callback, *args, **kwargs):
|
def call_on_exception(self, callback, *args, **kwargs):
|
||||||
self.final_callbacks.append((callback, args, kwargs))
|
self.exception_callbacks.append((callback, args, kwargs))
|
||||||
|
|
||||||
def __getattr__(self, name):
|
def __getattr__(self, name):
|
||||||
return getattr(self.txn, name)
|
return getattr(self.txn, name)
|
||||||
|
@ -215,7 +215,7 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
self._clock.looping_call(loop, 10000)
|
self._clock.looping_call(loop, 10000)
|
||||||
|
|
||||||
def _new_transaction(self, conn, desc, after_callbacks, final_callbacks,
|
def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
|
||||||
logging_context, func, *args, **kwargs):
|
logging_context, func, *args, **kwargs):
|
||||||
start = time.time() * 1000
|
start = time.time() * 1000
|
||||||
txn_id = self._TXN_ID
|
txn_id = self._TXN_ID
|
||||||
|
@ -236,7 +236,7 @@ class SQLBaseStore(object):
|
||||||
txn = conn.cursor()
|
txn = conn.cursor()
|
||||||
txn = LoggingTransaction(
|
txn = LoggingTransaction(
|
||||||
txn, name, self.database_engine, after_callbacks,
|
txn, name, self.database_engine, after_callbacks,
|
||||||
final_callbacks,
|
exception_callbacks,
|
||||||
)
|
)
|
||||||
r = func(txn, *args, **kwargs)
|
r = func(txn, *args, **kwargs)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
|
@ -308,11 +308,11 @@ class SQLBaseStore(object):
|
||||||
current_context = LoggingContext.current_context()
|
current_context = LoggingContext.current_context()
|
||||||
|
|
||||||
after_callbacks = []
|
after_callbacks = []
|
||||||
final_callbacks = []
|
exception_callbacks = []
|
||||||
|
|
||||||
def inner_func(conn, *args, **kwargs):
|
def inner_func(conn, *args, **kwargs):
|
||||||
return self._new_transaction(
|
return self._new_transaction(
|
||||||
conn, desc, after_callbacks, final_callbacks, current_context,
|
conn, desc, after_callbacks, exception_callbacks, current_context,
|
||||||
func, *args, **kwargs
|
func, *args, **kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -321,9 +321,10 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
for after_callback, after_args, after_kwargs in after_callbacks:
|
for after_callback, after_args, after_kwargs in after_callbacks:
|
||||||
after_callback(*after_args, **after_kwargs)
|
after_callback(*after_args, **after_kwargs)
|
||||||
finally:
|
except: # noqa: E722, as we reraise the exception this is fine.
|
||||||
for after_callback, after_args, after_kwargs in final_callbacks:
|
for after_callback, after_args, after_kwargs in exception_callbacks:
|
||||||
after_callback(*after_args, **after_kwargs)
|
after_callback(*after_args, **after_kwargs)
|
||||||
|
raise
|
||||||
|
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
|
|
||||||
|
@ -1000,7 +1001,8 @@ class SQLBaseStore(object):
|
||||||
# __exit__ called after the transaction finishes.
|
# __exit__ called after the transaction finishes.
|
||||||
ctx = self._cache_id_gen.get_next()
|
ctx = self._cache_id_gen.get_next()
|
||||||
stream_id = ctx.__enter__()
|
stream_id = ctx.__enter__()
|
||||||
txn.call_finally(ctx.__exit__, None, None, None)
|
txn.call_on_exception(ctx.__exit__, None, None, None)
|
||||||
|
txn.call_after(ctx.__exit__, None, None, None)
|
||||||
txn.call_after(self.hs.get_notifier().on_new_replication_data)
|
txn.call_after(self.hs.get_notifier().on_new_replication_data)
|
||||||
|
|
||||||
self._simple_insert_txn(
|
self._simple_insert_txn(
|
||||||
|
|
|
@ -76,7 +76,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
|
||||||
name="_find_stream_orderings_for_times_txn",
|
name="_find_stream_orderings_for_times_txn",
|
||||||
database_engine=self.database_engine,
|
database_engine=self.database_engine,
|
||||||
after_callbacks=[],
|
after_callbacks=[],
|
||||||
final_callbacks=[],
|
exception_callbacks=[],
|
||||||
)
|
)
|
||||||
self._find_stream_orderings_for_times_txn(cur)
|
self._find_stream_orderings_for_times_txn(cur)
|
||||||
cur.close()
|
cur.close()
|
||||||
|
|
Loading…
Reference in a new issue