0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-14 07:53:47 +01:00

Opentracing across workers (#5771)

Propagate opentracing contexts across workers


Also includes some Convenience modifications to opentracing for servlets, notably:
- Add boolean to skip the whitelisting check on inject
  extract methods. - useful when injecting into carriers
  locally. Otherwise we'd always have to include our
  own servername and whitelist our servername
- start_active_span_from_request instead of header
- Add boolean to decide whether to extract context
  from a request to a servlet
This commit is contained in:
Jorik Schellekens 2019-08-22 18:08:07 +01:00 committed by GitHub
parent dbd46decad
commit 812ed6b0d5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 123 additions and 83 deletions

1
changelog.d/5771.feature Normal file
View file

@ -0,0 +1 @@
Make Opentracing work in worker mode.

View file

@ -38,7 +38,12 @@ from synapse.http.servlet import (
parse_string_from_args, parse_string_from_args,
) )
from synapse.logging.context import run_in_background from synapse.logging.context import run_in_background
from synapse.logging.opentracing import start_active_span_from_context, tags from synapse.logging.opentracing import (
start_active_span,
start_active_span_from_request,
tags,
whitelisted_homeserver,
)
from synapse.types import ThirdPartyInstanceID, get_domain_from_id from synapse.types import ThirdPartyInstanceID, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
@ -288,11 +293,7 @@ class BaseFederationServlet(object):
logger.warn("authenticate_request failed: %s", e) logger.warn("authenticate_request failed: %s", e)
raise raise
# Start an opentracing span request_tags = {
with start_active_span_from_context(
request.requestHeaders,
"incoming-federation-request",
tags={
"request_id": request.get_request_id(), "request_id": request.get_request_id(),
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER, tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
tags.HTTP_METHOD: request.get_method(), tags.HTTP_METHOD: request.get_method(),
@ -300,8 +301,20 @@ class BaseFederationServlet(object):
tags.PEER_HOST_IPV6: request.getClientIP(), tags.PEER_HOST_IPV6: request.getClientIP(),
"authenticated_entity": origin, "authenticated_entity": origin,
"servlet_name": request.request_metrics.name, "servlet_name": request.request_metrics.name,
}, }
):
# Only accept the span context if the origin is authenticated
# and whitelisted
if origin and whitelisted_homeserver(origin):
scope = start_active_span_from_request(
request, "incoming-federation-request", tags=request_tags
)
else:
scope = start_active_span(
"incoming-federation-request", tags=request_tags
)
with scope:
if origin: if origin:
with ratelimiter.ratelimit(origin) as d: with ratelimiter.ratelimit(origin) as d:
await d await d

View file

@ -300,7 +300,7 @@ class RestServlet(object):
http_server.register_paths( http_server.register_paths(
method, method,
patterns, patterns,
trace_servlet(servlet_classname, method_handler), trace_servlet(servlet_classname)(method_handler),
servlet_classname, servlet_classname,
) )

View file

@ -174,10 +174,48 @@ from twisted.internet import defer
from synapse.config import ConfigError from synapse.config import ConfigError
# Helper class
class _DummyTagNames(object):
"""wrapper of opentracings tags. We need to have them if we
want to reference them without opentracing around. Clearly they
should never actually show up in a trace. `set_tags` overwrites
these with the correct ones."""
INVALID_TAG = "invalid-tag"
COMPONENT = INVALID_TAG
DATABASE_INSTANCE = INVALID_TAG
DATABASE_STATEMENT = INVALID_TAG
DATABASE_TYPE = INVALID_TAG
DATABASE_USER = INVALID_TAG
ERROR = INVALID_TAG
HTTP_METHOD = INVALID_TAG
HTTP_STATUS_CODE = INVALID_TAG
HTTP_URL = INVALID_TAG
MESSAGE_BUS_DESTINATION = INVALID_TAG
PEER_ADDRESS = INVALID_TAG
PEER_HOSTNAME = INVALID_TAG
PEER_HOST_IPV4 = INVALID_TAG
PEER_HOST_IPV6 = INVALID_TAG
PEER_PORT = INVALID_TAG
PEER_SERVICE = INVALID_TAG
SAMPLING_PRIORITY = INVALID_TAG
SERVICE = INVALID_TAG
SPAN_KIND = INVALID_TAG
SPAN_KIND_CONSUMER = INVALID_TAG
SPAN_KIND_PRODUCER = INVALID_TAG
SPAN_KIND_RPC_CLIENT = INVALID_TAG
SPAN_KIND_RPC_SERVER = INVALID_TAG
try: try:
import opentracing import opentracing
tags = opentracing.tags
except ImportError: except ImportError:
opentracing = None opentracing = None
tags = _DummyTagNames
try: try:
from jaeger_client import Config as JaegerConfig from jaeger_client import Config as JaegerConfig
from synapse.logging.scopecontextmanager import LogContextScopeManager from synapse.logging.scopecontextmanager import LogContextScopeManager
@ -252,10 +290,6 @@ def init_tracer(config):
scope_manager=LogContextScopeManager(config), scope_manager=LogContextScopeManager(config),
).initialize_tracer() ).initialize_tracer()
# Set up tags to be opentracing's tags
global tags
tags = opentracing.tags
# Whitelisting # Whitelisting
@ -334,8 +368,8 @@ def start_active_span_follows_from(operation_name, contexts):
return scope return scope
def start_active_span_from_context( def start_active_span_from_request(
headers, request,
operation_name, operation_name,
references=None, references=None,
tags=None, tags=None,
@ -344,9 +378,9 @@ def start_active_span_from_context(
finish_on_close=True, finish_on_close=True,
): ):
""" """
Extracts a span context from Twisted Headers. Extracts a span context from a Twisted Request.
args: args:
headers (twisted.web.http_headers.Headers) headers (twisted.web.http.Request)
For the other args see opentracing.tracer For the other args see opentracing.tracer
@ -360,7 +394,9 @@ def start_active_span_from_context(
if opentracing is None: if opentracing is None:
return _noop_context_manager() return _noop_context_manager()
header_dict = {k.decode(): v[0].decode() for k, v in headers.getAllRawHeaders()} header_dict = {
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
}
context = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict) context = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)
return opentracing.tracer.start_active_span( return opentracing.tracer.start_active_span(
@ -448,7 +484,7 @@ def set_operation_name(operation_name):
@only_if_tracing @only_if_tracing
def inject_active_span_twisted_headers(headers, destination): def inject_active_span_twisted_headers(headers, destination, check_destination=True):
""" """
Injects a span context into twisted headers in-place Injects a span context into twisted headers in-place
@ -467,7 +503,7 @@ def inject_active_span_twisted_headers(headers, destination):
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
""" """
if not whitelisted_homeserver(destination): if check_destination and not whitelisted_homeserver(destination):
return return
span = opentracing.tracer.active_span span = opentracing.tracer.active_span
@ -479,7 +515,7 @@ def inject_active_span_twisted_headers(headers, destination):
@only_if_tracing @only_if_tracing
def inject_active_span_byte_dict(headers, destination): def inject_active_span_byte_dict(headers, destination, check_destination=True):
""" """
Injects a span context into a dict where the headers are encoded as byte Injects a span context into a dict where the headers are encoded as byte
strings strings
@ -511,7 +547,7 @@ def inject_active_span_byte_dict(headers, destination):
@only_if_tracing @only_if_tracing
def inject_active_span_text_map(carrier, destination=None): def inject_active_span_text_map(carrier, destination, check_destination=True):
""" """
Injects a span context into a dict Injects a span context into a dict
@ -532,7 +568,7 @@ def inject_active_span_text_map(carrier, destination=None):
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
""" """
if destination and not whitelisted_homeserver(destination): if check_destination and not whitelisted_homeserver(destination):
return return
opentracing.tracer.inject( opentracing.tracer.inject(
@ -689,65 +725,43 @@ def tag_args(func):
return _tag_args_inner return _tag_args_inner
def trace_servlet(servlet_name, func): def trace_servlet(servlet_name, extract_context=False):
"""Decorator which traces a serlet. It starts a span with some servlet specific """Decorator which traces a serlet. It starts a span with some servlet specific
tags such as the servlet_name and request information""" tags such as the servlet_name and request information
Args:
servlet_name (str): The name to be used for the span's operation_name
extract_context (bool): Whether to attempt to extract the opentracing
context from the request the servlet is handling.
"""
def _trace_servlet_inner_1(func):
if not opentracing: if not opentracing:
return func return func
@wraps(func) @wraps(func)
@defer.inlineCallbacks @defer.inlineCallbacks
def _trace_servlet_inner(request, *args, **kwargs): def _trace_servlet_inner(request, *args, **kwargs):
with start_active_span( request_tags = {
"incoming-client-request",
tags={
"request_id": request.get_request_id(), "request_id": request.get_request_id(),
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER, tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
tags.HTTP_METHOD: request.get_method(), tags.HTTP_METHOD: request.get_method(),
tags.HTTP_URL: request.get_redacted_uri(), tags.HTTP_URL: request.get_redacted_uri(),
tags.PEER_HOST_IPV6: request.getClientIP(), tags.PEER_HOST_IPV6: request.getClientIP(),
"servlet_name": servlet_name, }
},
): if extract_context:
scope = start_active_span_from_request(
request, servlet_name, tags=request_tags
)
else:
scope = start_active_span(servlet_name, tags=request_tags)
with scope:
result = yield defer.maybeDeferred(func, request, *args, **kwargs) result = yield defer.maybeDeferred(func, request, *args, **kwargs)
return result return result
return _trace_servlet_inner return _trace_servlet_inner
return _trace_servlet_inner_1
# Helper class
class _DummyTagNames(object):
"""wrapper of opentracings tags. We need to have them if we
want to reference them without opentracing around. Clearly they
should never actually show up in a trace. `set_tags` overwrites
these with the correct ones."""
INVALID_TAG = "invalid-tag"
COMPONENT = INVALID_TAG
DATABASE_INSTANCE = INVALID_TAG
DATABASE_STATEMENT = INVALID_TAG
DATABASE_TYPE = INVALID_TAG
DATABASE_USER = INVALID_TAG
ERROR = INVALID_TAG
HTTP_METHOD = INVALID_TAG
HTTP_STATUS_CODE = INVALID_TAG
HTTP_URL = INVALID_TAG
MESSAGE_BUS_DESTINATION = INVALID_TAG
PEER_ADDRESS = INVALID_TAG
PEER_HOSTNAME = INVALID_TAG
PEER_HOST_IPV4 = INVALID_TAG
PEER_HOST_IPV6 = INVALID_TAG
PEER_PORT = INVALID_TAG
PEER_SERVICE = INVALID_TAG
SAMPLING_PRIORITY = INVALID_TAG
SERVICE = INVALID_TAG
SPAN_KIND = INVALID_TAG
SPAN_KIND_CONSUMER = INVALID_TAG
SPAN_KIND_PRODUCER = INVALID_TAG
SPAN_KIND_RPC_CLIENT = INVALID_TAG
SPAN_KIND_RPC_SERVER = INVALID_TAG
tags = _DummyTagNames

View file

@ -22,6 +22,7 @@ from six.moves import urllib
from twisted.internet import defer from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import ( from synapse.api.errors import (
CodeMessageException, CodeMessageException,
HttpResponseException, HttpResponseException,
@ -165,8 +166,12 @@ class ReplicationEndpoint(object):
# have a good idea that the request has either succeeded or failed on # have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not. # the master, and so whether we should clean up or not.
while True: while True:
headers = {}
opentracing.inject_active_span_byte_dict(
headers, None, check_destination=False
)
try: try:
result = yield request_func(uri, data) result = yield request_func(uri, data, headers=headers)
break break
except CodeMessageException as e: except CodeMessageException as e:
if e.code != 504 or not cls.RETRY_ON_TIMEOUT: if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
@ -205,7 +210,14 @@ class ReplicationEndpoint(object):
args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args) args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
pattern = re.compile("^/_synapse/replication/%s/%s$" % (self.NAME, args)) pattern = re.compile("^/_synapse/replication/%s/%s$" % (self.NAME, args))
http_server.register_paths(method, [pattern], handler, self.__class__.__name__) http_server.register_paths(
method,
[pattern],
opentracing.trace_servlet(self.__class__.__name__, extract_context=True)(
handler
),
self.__class__.__name__,
)
def _cached_handler(self, request, txn_id, **kwargs): def _cached_handler(self, request, txn_id, **kwargs):
"""Called on new incoming requests when caching is enabled. Checks """Called on new incoming requests when caching is enabled. Checks