mirror of
https://mau.dev/maunium/synapse.git
synced 2024-11-18 16:02:15 +01:00
commit
b8eca1ffbf
10 changed files with 244 additions and 48 deletions
|
@ -33,6 +33,7 @@ from synapse.api.urls import (
|
|||
)
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.crypto import context_factory
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
from daemonize import Daemonize
|
||||
import twisted.manhole.telnet
|
||||
|
@ -246,7 +247,7 @@ def setup():
|
|||
daemon = Daemonize(
|
||||
app="synapse-homeserver",
|
||||
pid=config.pid_file,
|
||||
action=reactor.run,
|
||||
action=run,
|
||||
auto_close_fds=False,
|
||||
verbose=True,
|
||||
logger=logger,
|
||||
|
@ -256,6 +257,13 @@ def setup():
|
|||
else:
|
||||
reactor.run()
|
||||
|
||||
def run():
|
||||
with LoggingContext("run"):
|
||||
reactor.run()
|
||||
|
||||
def main():
|
||||
with LoggingContext("main"):
|
||||
setup()
|
||||
|
||||
if __name__ == '__main__':
|
||||
setup()
|
||||
main()
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
from ._base import Config
|
||||
|
||||
from synapse.util.logcontext import LoggingContextFilter
|
||||
from twisted.python.log import PythonLoggingObserver
|
||||
import logging
|
||||
import logging.config
|
||||
|
@ -46,7 +46,8 @@ class LoggingConfig(Config):
|
|||
|
||||
def setup_logging(self):
|
||||
log_format = (
|
||||
'%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s'
|
||||
"%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
|
||||
" - %(message)s"
|
||||
)
|
||||
if self.log_config is None:
|
||||
|
||||
|
@ -54,13 +55,20 @@ class LoggingConfig(Config):
|
|||
if self.verbosity:
|
||||
level = logging.DEBUG
|
||||
|
||||
# FIXME: we need a logging.WARN for a -q quiet option
|
||||
# FIXME: we need a logging.WARN for a -q quiet option
|
||||
logger = logging.getLogger('')
|
||||
logger.setLevel(level)
|
||||
formatter = logging.Formatter(log_format)
|
||||
if self.log_file:
|
||||
handler = logging.FileHandler(self.log_file)
|
||||
else:
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(formatter)
|
||||
|
||||
logging.basicConfig(
|
||||
level=level,
|
||||
filename=self.log_file,
|
||||
format=log_format
|
||||
)
|
||||
handler.addFilter(LoggingContextFilter(request=""))
|
||||
|
||||
logger.addHandler(handler)
|
||||
logger.info("Test")
|
||||
else:
|
||||
logging.config.fileConfig(self.log_config)
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ from twisted.web.http import HTTPClient
|
|||
from twisted.internet.protocol import Factory
|
||||
from twisted.internet import defer, reactor
|
||||
from synapse.http.endpoint import matrix_endpoint
|
||||
from synapse.util.logcontext import PreserveLoggingContext
|
||||
import json
|
||||
import logging
|
||||
|
||||
|
@ -36,10 +37,11 @@ def fetch_server_key(server_name, ssl_context_factory):
|
|||
|
||||
for i in range(5):
|
||||
try:
|
||||
protocol = yield endpoint.connect(factory)
|
||||
server_response, server_certificate = yield protocol.remote_key
|
||||
defer.returnValue((server_response, server_certificate))
|
||||
return
|
||||
with PreserveLoggingContext():
|
||||
protocol = yield endpoint.connect(factory)
|
||||
server_response, server_certificate = yield protocol.remote_key
|
||||
defer.returnValue((server_response, server_certificate))
|
||||
return
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
raise IOError("Cannot get key for %s" % server_name)
|
||||
|
|
|
@ -23,6 +23,7 @@ from twisted.web.http_headers import Headers
|
|||
|
||||
from synapse.http.endpoint import matrix_endpoint
|
||||
from synapse.util.async import sleep
|
||||
from synapse.util.logcontext import PreserveLoggingContext
|
||||
|
||||
from syutil.jsonutil import encode_canonical_json
|
||||
|
||||
|
@ -108,16 +109,17 @@ class BaseHttpClient(object):
|
|||
producer = body_callback(method, url_bytes, headers_dict)
|
||||
|
||||
try:
|
||||
response = yield self.agent.request(
|
||||
destination,
|
||||
endpoint,
|
||||
method,
|
||||
path_bytes,
|
||||
param_bytes,
|
||||
query_bytes,
|
||||
Headers(headers_dict),
|
||||
producer
|
||||
)
|
||||
with PreserveLoggingContext():
|
||||
response = yield self.agent.request(
|
||||
destination,
|
||||
endpoint,
|
||||
method,
|
||||
path_bytes,
|
||||
param_bytes,
|
||||
query_bytes,
|
||||
Headers(headers_dict),
|
||||
producer
|
||||
)
|
||||
|
||||
logger.debug("Got response to %s", method)
|
||||
break
|
||||
|
|
|
@ -20,6 +20,7 @@ from syutil.jsonutil import (
|
|||
from synapse.api.errors import (
|
||||
cs_exception, SynapseError, CodeMessageException
|
||||
)
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.web import server, resource
|
||||
|
@ -88,9 +89,19 @@ class JsonResource(HttpServer, resource.Resource):
|
|||
def render(self, request):
|
||||
""" This get's called by twisted every time someone sends us a request.
|
||||
"""
|
||||
self._async_render(request)
|
||||
self._async_render_with_logging_context(request)
|
||||
return server.NOT_DONE_YET
|
||||
|
||||
_request_id = 0
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _async_render_with_logging_context(self, request):
|
||||
request_id = "%s-%s" % (request.method, JsonResource._request_id)
|
||||
JsonResource._request_id += 1
|
||||
with LoggingContext(request_id) as request_context:
|
||||
request_context.request = request_id
|
||||
yield self._async_render(request)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _async_render(self, request):
|
||||
""" This get's called by twisted every time someone sends us a request.
|
||||
|
|
|
@ -17,8 +17,11 @@ import logging
|
|||
from synapse.api.errors import StoreError
|
||||
from synapse.api.events.utils import prune_event
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
|
||||
from syutil.base64util import encode_base64
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import collections
|
||||
import copy
|
||||
import json
|
||||
|
@ -84,32 +87,40 @@ class SQLBaseStore(object):
|
|||
self.event_factory = hs.get_event_factory()
|
||||
self._clock = hs.get_clock()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def runInteraction(self, desc, func, *args, **kwargs):
|
||||
"""Wraps the .runInteraction() method on the underlying db_pool."""
|
||||
current_context = LoggingContext.current_context()
|
||||
def inner_func(txn, *args, **kwargs):
|
||||
start = time.clock() * 1000
|
||||
txn_id = SQLBaseStore._TXN_ID
|
||||
with LoggingContext("runInteraction") as context:
|
||||
current_context.copy_to(context)
|
||||
start = time.clock() * 1000
|
||||
txn_id = SQLBaseStore._TXN_ID
|
||||
|
||||
# We don't really need these to be unique, so lets stop it from
|
||||
# growing really large.
|
||||
self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1)
|
||||
# We don't really need these to be unique, so lets stop it from
|
||||
# growing really large.
|
||||
self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1)
|
||||
|
||||
name = "%s-%x" % (desc, txn_id, )
|
||||
name = "%s-%x" % (desc, txn_id, )
|
||||
|
||||
transaction_logger.debug("[TXN START] {%s}", name)
|
||||
try:
|
||||
return func(LoggingTransaction(txn, name), *args, **kwargs)
|
||||
except:
|
||||
logger.exception("[TXN FAIL] {%s}", name)
|
||||
raise
|
||||
finally:
|
||||
end = time.clock() * 1000
|
||||
transaction_logger.debug(
|
||||
"[TXN END] {%s} %f",
|
||||
name, end - start
|
||||
)
|
||||
transaction_logger.debug("[TXN START] {%s}", name)
|
||||
try:
|
||||
return func(LoggingTransaction(txn, name), *args, **kwargs)
|
||||
except:
|
||||
logger.exception("[TXN FAIL] {%s}", name)
|
||||
raise
|
||||
finally:
|
||||
end = time.clock() * 1000
|
||||
transaction_logger.debug(
|
||||
"[TXN END] {%s} %f",
|
||||
name, end - start
|
||||
)
|
||||
|
||||
return self._db_pool.runInteraction(inner_func, *args, **kwargs)
|
||||
with PreserveLoggingContext():
|
||||
result = yield self._db_pool.runInteraction(
|
||||
inner_func, *args, **kwargs
|
||||
)
|
||||
defer.returnValue(result)
|
||||
|
||||
def cursor_to_dict(self, cursor):
|
||||
"""Converts a SQL cursor into an list of dicts.
|
||||
|
@ -177,7 +188,7 @@ class SQLBaseStore(object):
|
|||
)
|
||||
|
||||
logger.debug(
|
||||
"[SQL] %s Args=%s Func=%s",
|
||||
"[SQL] %s Args=%s",
|
||||
sql, values.values(),
|
||||
)
|
||||
|
||||
|
|
|
@ -16,15 +16,17 @@
|
|||
|
||||
from twisted.internet import defer, reactor
|
||||
|
||||
from .logcontext import PreserveLoggingContext
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def sleep(seconds):
|
||||
d = defer.Deferred()
|
||||
reactor.callLater(seconds, d.callback, seconds)
|
||||
return d
|
||||
|
||||
with PreserveLoggingContext():
|
||||
yield d
|
||||
|
||||
def run_on_reactor():
|
||||
""" This will cause the rest of the function to be invoked upon the next
|
||||
iteration of the main loop
|
||||
"""
|
||||
return sleep(0)
|
||||
return sleep(0)
|
||||
|
|
108
synapse/util/logcontext.py
Normal file
108
synapse/util/logcontext.py
Normal file
|
@ -0,0 +1,108 @@
|
|||
import threading
|
||||
import logging
|
||||
|
||||
|
||||
class LoggingContext(object):
|
||||
"""Additional context for log formatting. Contexts are scoped within a
|
||||
"with" block. Contexts inherit the state of their parent contexts.
|
||||
Args:
|
||||
name (str): Name for the context for debugging.
|
||||
"""
|
||||
|
||||
__slots__ = ["parent_context", "name", "__dict__"]
|
||||
|
||||
thread_local = threading.local()
|
||||
|
||||
class Sentinel(object):
|
||||
"""Sentinel to represent the root context"""
|
||||
|
||||
__slots__ = []
|
||||
|
||||
def copy_to(self, record):
|
||||
pass
|
||||
|
||||
sentinel = Sentinel()
|
||||
|
||||
def __init__(self, name=None):
|
||||
self.parent_context = None
|
||||
self.name = name
|
||||
|
||||
def __str__(self):
|
||||
return "%s@%x" % (self.name, id(self))
|
||||
|
||||
@classmethod
|
||||
def current_context(cls):
|
||||
"""Get the current logging context from thread local storage"""
|
||||
return getattr(cls.thread_local, "current_context", cls.sentinel)
|
||||
|
||||
def __enter__(self):
|
||||
"""Enters this logging context into thread local storage"""
|
||||
if self.parent_context is not None:
|
||||
raise Exception("Attempt to enter logging context multiple times")
|
||||
self.parent_context = self.current_context()
|
||||
self.thread_local.current_context = self
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
"""Restore the logging context in thread local storage to the state it
|
||||
was before this context was entered.
|
||||
Returns:
|
||||
None to avoid suppressing any exeptions that were thrown.
|
||||
"""
|
||||
if self.thread_local.current_context is not self:
|
||||
logging.error(
|
||||
"Current logging context %s is not the expected context %s",
|
||||
self.thread_local.current_context,
|
||||
self
|
||||
)
|
||||
self.thread_local.current_context = self.parent_context
|
||||
self.parent_context = None
|
||||
|
||||
def __getattr__(self, name):
|
||||
"""Delegate member lookup to parent context"""
|
||||
return getattr(self.parent_context, name)
|
||||
|
||||
def copy_to(self, record):
|
||||
"""Copy fields from this context and its parents to the record"""
|
||||
if self.parent_context is not None:
|
||||
self.parent_context.copy_to(record)
|
||||
for key, value in self.__dict__.items():
|
||||
setattr(record, key, value)
|
||||
|
||||
|
||||
class LoggingContextFilter(logging.Filter):
|
||||
"""Logging filter that adds values from the current logging context to each
|
||||
record.
|
||||
Args:
|
||||
**defaults: Default values to avoid formatters complaining about
|
||||
missing fields
|
||||
"""
|
||||
def __init__(self, **defaults):
|
||||
self.defaults = defaults
|
||||
|
||||
def filter(self, record):
|
||||
"""Add each fields from the logging contexts to the record.
|
||||
Returns:
|
||||
True to include the record in the log output.
|
||||
"""
|
||||
context = LoggingContext.current_context()
|
||||
for key, value in self.defaults.items():
|
||||
setattr(record, key, value)
|
||||
context.copy_to(record)
|
||||
return True
|
||||
|
||||
|
||||
class PreserveLoggingContext(object):
|
||||
"""Captures the current logging context and restores it when the scope is
|
||||
exited. Used to restore the context after a function using
|
||||
@defer.inlineCallbacks is resumed by a callback from the reactor."""
|
||||
|
||||
__slots__ = ["current_context"]
|
||||
|
||||
def __enter__(self):
|
||||
"""Captures the current logging context"""
|
||||
self.current_context = LoggingContext.current_context()
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
"""Restores the current logging context"""
|
||||
LoggingContext.thread_local.current_context = self.current_context
|
|
@ -75,6 +75,7 @@ def trace_function(f):
|
|||
linenum = f.func_code.co_firstlineno
|
||||
pathname = f.func_code.co_filename
|
||||
|
||||
@wraps(f)
|
||||
def wrapped(*args, **kwargs):
|
||||
name = f.__module__
|
||||
logger = logging.getLogger(name)
|
||||
|
|
43
tests/util/test_log_context.py
Normal file
43
tests/util/test_log_context.py
Normal file
|
@ -0,0 +1,43 @@
|
|||
from twisted.internet import defer
|
||||
from twisted.internet import reactor
|
||||
from .. import unittest
|
||||
|
||||
from synapse.util.async import sleep
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
class LoggingContextTestCase(unittest.TestCase):
|
||||
|
||||
def _check_test_key(self, value):
|
||||
self.assertEquals(
|
||||
LoggingContext.current_context().test_key, value
|
||||
)
|
||||
|
||||
def test_with_context(self):
|
||||
with LoggingContext() as context_one:
|
||||
context_one.test_key = "test"
|
||||
self._check_test_key("test")
|
||||
|
||||
def test_chaining(self):
|
||||
with LoggingContext() as context_one:
|
||||
context_one.test_key = "one"
|
||||
with LoggingContext() as context_two:
|
||||
self._check_test_key("one")
|
||||
context_two.test_key = "two"
|
||||
self._check_test_key("two")
|
||||
self._check_test_key("one")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_sleep(self):
|
||||
@defer.inlineCallbacks
|
||||
def competing_callback():
|
||||
with LoggingContext() as competing_context:
|
||||
competing_context.test_key = "competing"
|
||||
yield sleep(0)
|
||||
self._check_test_key("competing")
|
||||
|
||||
reactor.callLater(0, competing_callback)
|
||||
|
||||
with LoggingContext() as context_one:
|
||||
context_one.test_key = "one"
|
||||
yield sleep(0)
|
||||
self._check_test_key("one")
|
Loading…
Reference in a new issue