forked from MirrorHub/synapse
Patch inlinecallbacks for log contexts
This commit is contained in:
parent
8c27bc8b60
commit
132279a46f
5 changed files with 95 additions and 10 deletions
|
@ -17,8 +17,11 @@
|
||||||
""" This is a reference implementation of a Matrix home server.
|
""" This is a reference implementation of a Matrix home server.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
from tests.patch_inline_callbacks import do_patch
|
||||||
|
|
||||||
# Check that we're not running on an unsupported Python version.
|
# Check that we're not running on an unsupported Python version.
|
||||||
if sys.version_info < (3, 5):
|
if sys.version_info < (3, 5):
|
||||||
print("Synapse requires Python 3.5 or above.")
|
print("Synapse requires Python 3.5 or above.")
|
||||||
|
@ -36,3 +39,6 @@ except ImportError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
__version__ = "1.4.0rc1"
|
__version__ = "1.4.0rc1"
|
||||||
|
|
||||||
|
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
||||||
|
do_patch()
|
||||||
|
|
|
@ -213,11 +213,11 @@ class RoomMemberHandler(object):
|
||||||
|
|
||||||
if predecessor:
|
if predecessor:
|
||||||
# It is an upgraded room. Copy over old tags
|
# It is an upgraded room. Copy over old tags
|
||||||
self.copy_room_tags_and_direct_to_room(
|
yield self.copy_room_tags_and_direct_to_room(
|
||||||
predecessor["room_id"], room_id, user_id
|
predecessor["room_id"], room_id, user_id
|
||||||
)
|
)
|
||||||
# Move over old push rules
|
# Move over old push rules
|
||||||
self.store.move_push_rules_from_room_to_room_for_user(
|
yield self.store.move_push_rules_from_room_to_room_for_user(
|
||||||
predecessor["room_id"], room_id, user_id
|
predecessor["room_id"], room_id, user_id
|
||||||
)
|
)
|
||||||
elif event.membership == Membership.LEAVE:
|
elif event.membership == Membership.LEAVE:
|
||||||
|
|
|
@ -30,7 +30,7 @@ from prometheus_client import Histogram
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.errors import StoreError
|
from synapse.api.errors import StoreError
|
||||||
from synapse.logging.context import LoggingContext, PreserveLoggingContext
|
from synapse.logging.context import LoggingContext, make_deferred_yieldable
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
||||||
from synapse.types import get_domain_from_id
|
from synapse.types import get_domain_from_id
|
||||||
|
@ -550,8 +550,9 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
return func(conn, *args, **kwargs)
|
return func(conn, *args, **kwargs)
|
||||||
|
|
||||||
with PreserveLoggingContext():
|
result = yield make_deferred_yieldable(
|
||||||
result = yield self._db_pool.runWithConnection(inner_func, *args, **kwargs)
|
self._db_pool.runWithConnection(inner_func, *args, **kwargs)
|
||||||
|
)
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
|
@ -235,7 +235,7 @@ class PushRulesWorkerStore(
|
||||||
(c.get("key") == "room_id" and c.get("pattern") == old_room_id)
|
(c.get("key") == "room_id" and c.get("pattern") == old_room_id)
|
||||||
for c in conditions
|
for c in conditions
|
||||||
):
|
):
|
||||||
self.move_push_rule_from_room_to_room(new_room_id, user_id, rule)
|
yield self.move_push_rule_from_room_to_room(new_room_id, user_id, rule)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def bulk_get_push_rules_for_room(self, event, context):
|
def bulk_get_push_rules_for_room(self, event, context):
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
from __future__ import print_function
|
from __future__ import print_function
|
||||||
|
|
||||||
|
import inspect
|
||||||
import functools
|
import functools
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
@ -33,17 +34,19 @@ def do_patch():
|
||||||
orig_inline_callbacks = defer.inlineCallbacks
|
orig_inline_callbacks = defer.inlineCallbacks
|
||||||
|
|
||||||
def new_inline_callbacks(f):
|
def new_inline_callbacks(f):
|
||||||
|
|
||||||
orig = orig_inline_callbacks(f)
|
|
||||||
|
|
||||||
@functools.wraps(f)
|
@functools.wraps(f)
|
||||||
def wrapped(*args, **kwargs):
|
def wrapped(*args, **kwargs):
|
||||||
start_context = LoggingContext.current_context()
|
start_context = LoggingContext.current_context()
|
||||||
|
changes = []
|
||||||
|
orig = orig_inline_callbacks(_check_yield_points(f, changes, start_context))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
res = orig(*args, **kwargs)
|
res = orig(*args, **kwargs)
|
||||||
except Exception:
|
except Exception:
|
||||||
if LoggingContext.current_context() != start_context:
|
if LoggingContext.current_context() != start_context:
|
||||||
|
for err in changes:
|
||||||
|
print(err, file=sys.stderr)
|
||||||
|
|
||||||
err = "%s changed context from %s to %s on exception" % (
|
err = "%s changed context from %s to %s on exception" % (
|
||||||
f,
|
f,
|
||||||
start_context,
|
start_context,
|
||||||
|
@ -55,7 +58,10 @@ def do_patch():
|
||||||
|
|
||||||
if not isinstance(res, Deferred) or res.called:
|
if not isinstance(res, Deferred) or res.called:
|
||||||
if LoggingContext.current_context() != start_context:
|
if LoggingContext.current_context() != start_context:
|
||||||
err = "%s changed context from %s to %s" % (
|
for err in changes:
|
||||||
|
print(err, file=sys.stderr)
|
||||||
|
|
||||||
|
err = "Completed %s changed context from %s to %s" % (
|
||||||
f,
|
f,
|
||||||
start_context,
|
start_context,
|
||||||
LoggingContext.current_context(),
|
LoggingContext.current_context(),
|
||||||
|
@ -76,6 +82,8 @@ def do_patch():
|
||||||
|
|
||||||
def check_ctx(r):
|
def check_ctx(r):
|
||||||
if LoggingContext.current_context() != start_context:
|
if LoggingContext.current_context() != start_context:
|
||||||
|
for err in changes:
|
||||||
|
print(err, file=sys.stderr)
|
||||||
err = "%s completion of %s changed context from %s to %s" % (
|
err = "%s completion of %s changed context from %s to %s" % (
|
||||||
"Failure" if isinstance(r, Failure) else "Success",
|
"Failure" if isinstance(r, Failure) else "Success",
|
||||||
f,
|
f,
|
||||||
|
@ -92,3 +100,73 @@ def do_patch():
|
||||||
return wrapped
|
return wrapped
|
||||||
|
|
||||||
defer.inlineCallbacks = new_inline_callbacks
|
defer.inlineCallbacks = new_inline_callbacks
|
||||||
|
|
||||||
|
|
||||||
|
def _check_yield_points(f, changes, start_context):
|
||||||
|
from synapse.logging.context import LoggingContext
|
||||||
|
|
||||||
|
@functools.wraps(f)
|
||||||
|
def check_yield_points_inner(*args, **kwargs):
|
||||||
|
gen = f(*args, **kwargs)
|
||||||
|
|
||||||
|
last_yield_line_no = 1
|
||||||
|
result = None
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
isFailure = isinstance(result, Failure)
|
||||||
|
if isFailure:
|
||||||
|
d = result.throwExceptionIntoGenerator(gen)
|
||||||
|
else:
|
||||||
|
d = gen.send(result)
|
||||||
|
except (StopIteration, defer._DefGen_Return) as e:
|
||||||
|
if LoggingContext.current_context() != start_context:
|
||||||
|
# This happens when the context is lost sometime *after* the
|
||||||
|
# final yield and returning. E.g. we forgot to yield on a
|
||||||
|
# function that returns a deferred.
|
||||||
|
err = (
|
||||||
|
"%s returned and changed context from %s to %s, in %s between %d and end of func"
|
||||||
|
% (
|
||||||
|
f.__qualname__,
|
||||||
|
start_context,
|
||||||
|
LoggingContext.current_context(),
|
||||||
|
f.__code__.co_filename,
|
||||||
|
last_yield_line_no,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
changes.append(err)
|
||||||
|
# print(err, file=sys.stderr)
|
||||||
|
# raise Exception(err)
|
||||||
|
return getattr(e, "value", None)
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = yield d
|
||||||
|
except Exception as e:
|
||||||
|
result = Failure(e)
|
||||||
|
|
||||||
|
frame = gen.gi_frame
|
||||||
|
if frame.f_code.co_name == "check_yield_points_inner":
|
||||||
|
frame = inspect.getgeneratorlocals(gen)["gen"].gi_frame
|
||||||
|
|
||||||
|
if LoggingContext.current_context() != start_context:
|
||||||
|
# This happens because the context is lost sometime *after* the
|
||||||
|
# previous yield and *after* the current yield. E.g. the
|
||||||
|
# deferred we waited on didn't follow the rules, or we forgot to
|
||||||
|
# yield on a function between the two yield points.
|
||||||
|
err = (
|
||||||
|
"%s changed context from %s to %s, happened between lines %d and %d in %s"
|
||||||
|
% (
|
||||||
|
frame.f_code.co_name,
|
||||||
|
start_context,
|
||||||
|
LoggingContext.current_context(),
|
||||||
|
last_yield_line_no,
|
||||||
|
frame.f_lineno,
|
||||||
|
frame.f_code.co_filename,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
changes.append(err)
|
||||||
|
# print(err, file=sys.stderr)
|
||||||
|
# raise Exception(err)
|
||||||
|
|
||||||
|
last_yield_line_no = frame.f_lineno
|
||||||
|
|
||||||
|
return check_yield_points_inner
|
||||||
|
|
Loading…
Reference in a new issue