Merge pull request #187 from matrix-org/erikj/sanitize_logging

Sanitize logging
This commit is contained in:
Erik Johnston 2015-06-19 11:35:59 +01:00
commit 0e58d19163
7 changed files with 209 additions and 127 deletions

View file

@ -370,6 +370,8 @@ class Auth(object):
user_agent=user_agent user_agent=user_agent
) )
request.authenticated_entity = user.to_string()
defer.returnValue((user, ClientInfo(device_id, token_id))) defer.returnValue((user, ClientInfo(device_id, token_id)))
except KeyError: except KeyError:
raise AuthError( raise AuthError(

View file

@ -35,7 +35,6 @@ from twisted.enterprise import adbapi
from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.resource import Resource, EncodingResourceWrapper
from twisted.web.static import File from twisted.web.static import File
from twisted.web.server import Site, GzipEncoderFactory, Request from twisted.web.server import Site, GzipEncoderFactory, Request
from twisted.web.http import proxiedLogFormatter, combinedLogFormatter
from synapse.http.server import JsonResource, RootRedirect from synapse.http.server import JsonResource, RootRedirect
from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.media.v1.media_repository import MediaRepositoryResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource
@ -61,10 +60,13 @@ import twisted.manhole.telnet
import synapse import synapse
import contextlib
import logging import logging
import os import os
import re
import resource import resource
import subprocess import subprocess
import time
logger = logging.getLogger("synapse.app.homeserver") logger = logging.getLogger("synapse.app.homeserver")
@ -142,6 +144,7 @@ class SynapseHomeServer(HomeServer):
port = listener_config["port"] port = listener_config["port"]
bind_address = listener_config.get("bind_address", "") bind_address = listener_config.get("bind_address", "")
tls = listener_config.get("tls", False) tls = listener_config.get("tls", False)
site_tag = listener_config.get("tag", port)
if tls and config.no_tls: if tls and config.no_tls:
return return
@ -197,7 +200,8 @@ class SynapseHomeServer(HomeServer):
reactor.listenSSL( reactor.listenSSL(
port, port,
SynapseSite( SynapseSite(
"synapse.access.https", "synapse.access.https.%s" % (site_tag,),
site_tag,
listener_config, listener_config,
root_resource, root_resource,
), ),
@ -208,7 +212,8 @@ class SynapseHomeServer(HomeServer):
reactor.listenTCP( reactor.listenTCP(
port, port,
SynapseSite( SynapseSite(
"synapse.access.https", "synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config, listener_config,
root_resource, root_resource,
), ),
@ -432,9 +437,70 @@ class SynapseService(service.Service):
return self._port.stopListening() return self._port.stopListening()
class XForwardedForRequest(Request): class SynapseRequest(Request):
def __init__(self, *args, **kw): def __init__(self, site, *args, **kw):
Request.__init__(self, *args, **kw) Request.__init__(self, *args, **kw)
self.site = site
self.authenticated_entity = None
self.start_time = 0
def __repr__(self):
# We overwrite this so that we don't log ``access_token``
return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % (
self.__class__.__name__,
id(self),
self.method,
self.get_redacted_uri(),
self.clientproto,
self.site.site_tag,
)
def get_redacted_uri(self):
return re.sub(
r'(\?.*access_token=)[^&]*(.*)$',
r'\1<redacted>\2',
self.uri
)
def get_user_agent(self):
return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1]
def started_processing(self):
self.site.access_logger.info(
"%s - %s - Received request: %s %s",
self.getClientIP(),
self.site.site_tag,
self.method,
self.get_redacted_uri()
)
self.start_time = int(time.time() * 1000)
def finished_processing(self):
self.site.access_logger.info(
"%s - %s - {%s}"
" Processed request: %dms %sB %s \"%s %s %s\" \"%s\"",
self.getClientIP(),
self.site.site_tag,
self.authenticated_entity,
int(time.time() * 1000) - self.start_time,
self.sentLength,
self.code,
self.method,
self.get_redacted_uri(),
self.clientproto,
self.get_user_agent(),
)
@contextlib.contextmanager
def processing(self):
self.started_processing()
yield
self.finished_processing()
class XForwardedForRequest(SynapseRequest):
def __init__(self, *args, **kw):
SynapseRequest.__init__(self, *args, **kw)
""" """
Add a layer on top of another request that only uses the value of an Add a layer on top of another request that only uses the value of an
@ -450,8 +516,16 @@ class XForwardedForRequest(Request):
b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip() b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip()
def XForwardedFactory(*args, **kwargs): class SynapseRequestFactory(object):
return XForwardedForRequest(*args, **kwargs) def __init__(self, site, x_forwarded_for):
self.site = site
self.x_forwarded_for = x_forwarded_for
def __call__(self, *args, **kwargs):
if self.x_forwarded_for:
return XForwardedForRequest(self.site, *args, **kwargs)
else:
return SynapseRequest(self.site, *args, **kwargs)
class SynapseSite(Site): class SynapseSite(Site):
@ -459,18 +533,17 @@ class SynapseSite(Site):
Subclass of a twisted http Site that does access logging with python's Subclass of a twisted http Site that does access logging with python's
standard logging standard logging
""" """
def __init__(self, logger_name, config, resource, *args, **kwargs): def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs):
Site.__init__(self, resource, *args, **kwargs) Site.__init__(self, resource, *args, **kwargs)
if config.get("x_forwarded", False):
self.requestFactory = XForwardedFactory self.site_tag = site_tag
self._log_formatter = proxiedLogFormatter
else: proxied = config.get("x_forwarded", False)
self._log_formatter = combinedLogFormatter self.requestFactory = SynapseRequestFactory(self, proxied)
self.access_logger = logging.getLogger(logger_name) self.access_logger = logging.getLogger(logger_name)
def log(self, request): def log(self, request):
line = self._log_formatter(self._logDateTime, request) pass
self.access_logger.info(line)
def create_resource_tree(desired_tree, redirect_root_to_web_client=True): def create_resource_tree(desired_tree, redirect_root_to_web_client=True):

View file

@ -94,6 +94,7 @@ class TransportLayerServer(object):
yield self.keyring.verify_json_for_server(origin, json_request) yield self.keyring.verify_json_for_server(origin, json_request)
logger.info("Request from %s", origin) logger.info("Request from %s", origin)
request.authenticated_entity = origin
defer.returnValue((origin, content)) defer.returnValue((origin, content))

View file

@ -61,21 +61,31 @@ class SimpleHttpClient(object):
self.agent = Agent(reactor, pool=pool) self.agent = Agent(reactor, pool=pool)
self.version_string = hs.version_string self.version_string = hs.version_string
def request(self, method, *args, **kwargs): def request(self, method, uri, *args, **kwargs):
# A small wrapper around self.agent.request() so we can easily attach # A small wrapper around self.agent.request() so we can easily attach
# counters to it # counters to it
outgoing_requests_counter.inc(method) outgoing_requests_counter.inc(method)
d = preserve_context_over_fn( d = preserve_context_over_fn(
self.agent.request, self.agent.request,
method, *args, **kwargs method, uri, *args, **kwargs
) )
logger.info("Sending request %s %s", method, uri)
def _cb(response): def _cb(response):
incoming_responses_counter.inc(method, response.code) incoming_responses_counter.inc(method, response.code)
logger.info(
"Received response to %s %s: %s",
method, uri, response.code
)
return response return response
def _eb(failure): def _eb(failure):
incoming_responses_counter.inc(method, "ERR") incoming_responses_counter.inc(method, "ERR")
logger.info(
"Error sending request to %s %s: %s %s",
method, uri, failure.type, failure.getErrorMessage()
)
return failure return failure
d.addCallbacks(_cb, _eb) d.addCallbacks(_cb, _eb)
@ -84,7 +94,9 @@ class SimpleHttpClient(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def post_urlencoded_get_json(self, uri, args={}): def post_urlencoded_get_json(self, uri, args={}):
# TODO: Do we ever want to log message contents?
logger.debug("post_urlencoded_get_json args: %s", args) logger.debug("post_urlencoded_get_json args: %s", args)
query_bytes = urllib.urlencode(args, True) query_bytes = urllib.urlencode(args, True)
response = yield self.request( response = yield self.request(
@ -105,7 +117,7 @@ class SimpleHttpClient(object):
def post_json_get_json(self, uri, post_json): def post_json_get_json(self, uri, post_json):
json_str = encode_canonical_json(post_json) json_str = encode_canonical_json(post_json)
logger.info("HTTP POST %s -> %s", json_str, uri) logger.debug("HTTP POST %s -> %s", json_str, uri)
response = yield self.request( response = yield self.request(
"POST", "POST",

View file

@ -35,11 +35,13 @@ from syutil.crypto.jsonsign import sign_json
import simplejson as json import simplejson as json
import logging import logging
import sys
import urllib import urllib
import urlparse import urlparse
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")
metrics = synapse.metrics.get_metrics_for(__name__) metrics = synapse.metrics.get_metrics_for(__name__)
@ -109,6 +111,8 @@ class MatrixFederationHttpClient(object):
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.version_string = hs.version_string self.version_string = hs.version_string
self._next_id = 1
@defer.inlineCallbacks @defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes, def _create_request(self, destination, method, path_bytes,
body_callback, headers_dict={}, param_bytes=b"", body_callback, headers_dict={}, param_bytes=b"",
@ -123,16 +127,12 @@ class MatrixFederationHttpClient(object):
("", "", path_bytes, param_bytes, query_bytes, "",) ("", "", path_bytes, param_bytes, query_bytes, "",)
) )
logger.info("Sending request to %s: %s %s", txn_id = "%s-%s" % (method, self._next_id)
destination, method, url_bytes) self._next_id = (self._next_id + 1) % (sys.maxint - 1)
logger.debug( outbound_logger.info(
"Types: %s", "{%s} [%s] Sending request: %s %s",
[ txn_id, destination, method, url_bytes
type(destination), type(method), type(path_bytes),
type(param_bytes),
type(query_bytes)
]
) )
# XXX: Would be much nicer to retry only at the transaction-layer # XXX: Would be much nicer to retry only at the transaction-layer
@ -141,63 +141,71 @@ class MatrixFederationHttpClient(object):
endpoint = self._getEndpoint(reactor, destination) endpoint = self._getEndpoint(reactor, destination)
while True: log_result = None
producer = None try:
if body_callback: while True:
producer = body_callback(method, url_bytes, headers_dict) producer = None
if body_callback:
producer = body_callback(method, url_bytes, headers_dict)
try: try:
request_deferred = preserve_context_over_fn( request_deferred = preserve_context_over_fn(
self.agent.request, self.agent.request,
destination,
endpoint,
method,
path_bytes,
param_bytes,
query_bytes,
Headers(headers_dict),
producer
)
response = yield self.clock.time_bound_deferred(
request_deferred,
time_out=timeout/1000. if timeout else 60,
)
logger.debug("Got response to %s", method)
break
except Exception as e:
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
logger.warn(
"DNS Lookup failed to %s with %s",
destination, destination,
e endpoint,
method,
path_bytes,
param_bytes,
query_bytes,
Headers(headers_dict),
producer
) )
raise
logger.warn( response = yield self.clock.time_bound_deferred(
"Sending request failed to %s: %s %s: %s - %s", request_deferred,
destination, time_out=timeout/1000. if timeout else 60,
method, )
url_bytes,
type(e).__name__,
_flatten_response_never_received(e),
)
if retries_left and not timeout: log_result = "%d %s" % (response.code, response.phrase,)
yield sleep(2 ** (5 - retries_left)) break
retries_left -= 1 except Exception as e:
else: if not retry_on_dns_fail and isinstance(e, DNSLookupError):
raise logger.warn(
"DNS Lookup failed to %s with %s",
destination,
e
)
log_result = "DNS Lookup failed to %s with %s" % (
destination, e
)
raise
logger.info( logger.warn(
"Received response %d %s for %s: %s %s", "{%s} Sending request failed to %s: %s %s: %s - %s",
response.code, txn_id,
response.phrase, destination,
destination, method,
method, url_bytes,
url_bytes type(e).__name__,
) _flatten_response_never_received(e),
)
log_result = "%s - %s" % (
type(e).__name__, _flatten_response_never_received(e),
)
if retries_left and not timeout:
yield sleep(2 ** (5 - retries_left))
retries_left -= 1
else:
raise
finally:
outbound_logger.info(
"{%s} [%s] Result: %s",
txn_id,
destination,
log_result,
)
if 200 <= response.code < 300: if 200 <= response.code < 300:
pass pass

View file

@ -79,53 +79,39 @@ def request_handler(request_handler):
_next_request_id += 1 _next_request_id += 1
with LoggingContext(request_id) as request_context: with LoggingContext(request_id) as request_context:
request_context.request = request_id request_context.request = request_id
code = None with request.processing():
start = self.clock.time_msec() try:
try: d = request_handler(self, request)
logger.info( with PreserveLoggingContext():
"Received request: %s %s", yield d
request.method, request.path except CodeMessageException as e:
) code = e.code
d = request_handler(self, request) if isinstance(e, SynapseError):
with PreserveLoggingContext(): logger.info(
yield d "%s SynapseError: %s - %s", request, code, e.msg
code = request.code )
except CodeMessageException as e: else:
code = e.code logger.exception(e)
if isinstance(e, SynapseError): outgoing_responses_counter.inc(request.method, str(code))
logger.info( respond_with_json(
"%s SynapseError: %s - %s", request, code, e.msg request, code, cs_exception(e), send_cors=True,
pretty_print=_request_user_agent_is_curl(request),
version_string=self.version_string,
)
except:
logger.exception(
"Failed handle request %s.%s on %r: %r",
request_handler.__module__,
request_handler.__name__,
self,
request
)
respond_with_json(
request,
500,
{"error": "Internal server error"},
send_cors=True
) )
else:
logger.exception(e)
outgoing_responses_counter.inc(request.method, str(code))
respond_with_json(
request, code, cs_exception(e), send_cors=True,
pretty_print=_request_user_agent_is_curl(request),
version_string=self.version_string,
)
except:
code = 500
logger.exception(
"Failed handle request %s.%s on %r: %r",
request_handler.__module__,
request_handler.__name__,
self,
request
)
respond_with_json(
request,
500,
{"error": "Internal server error"},
send_cors=True
)
finally:
code = str(code) if code else "-"
end = self.clock.time_msec()
logger.info(
"Processed request: %dms %s %s %s",
end-start, code, request.method, request.path
)
return wrapped_request_handler return wrapped_request_handler

View file

@ -39,10 +39,10 @@ class HttpTransactionStore(object):
A tuple of (HTTP response code, response content) or None. A tuple of (HTTP response code, response content) or None.
""" """
try: try:
logger.debug("get_response Key: %s TxnId: %s", key, txn_id) logger.debug("get_response TxnId: %s", txn_id)
(last_txn_id, response) = self.transactions[key] (last_txn_id, response) = self.transactions[key]
if txn_id == last_txn_id: if txn_id == last_txn_id:
logger.info("get_response: Returning a response for %s", key) logger.info("get_response: Returning a response for %s", txn_id)
return response return response
except KeyError: except KeyError:
pass pass
@ -58,7 +58,7 @@ class HttpTransactionStore(object):
txn_id (str): The transaction ID for this request. txn_id (str): The transaction ID for this request.
response (tuple): A tuple of (HTTP response code, response content) response (tuple): A tuple of (HTTP response code, response content)
""" """
logger.debug("store_response Key: %s TxnId: %s", key, txn_id) logger.debug("store_response TxnId: %s", txn_id)
self.transactions[key] = (txn_id, response) self.transactions[key] = (txn_id, response)
def store_client_transaction(self, request, txn_id, response): def store_client_transaction(self, request, txn_id, response):