synapse/synapse/logging/_remote.py
Patrick Cloke 00b24aa545
Support generating structured logs in addition to standard logs. (#8607)
This modifies the configuration of structured logging to be usable from
the standard Python logging configuration.

This also separates the formatting of logs from the transport allowing
JSON logs to files or standard logs to sockets.
2020-10-29 07:27:37 -04:00

233 lines
7.3 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
import traceback
from collections import deque
from ipaddress import IPv4Address, IPv6Address, ip_address
from math import floor
from typing import Callable, Optional
import attr
from typing_extensions import Deque
from zope.interface import implementer
from twisted.application.internet import ClientService
from twisted.internet.defer import Deferred
from twisted.internet.endpoints import (
HostnameEndpoint,
TCP4ClientEndpoint,
TCP6ClientEndpoint,
)
from twisted.internet.interfaces import IPushProducer, ITransport
from twisted.internet.protocol import Factory, Protocol
logger = logging.getLogger(__name__)
@attr.s
@implementer(IPushProducer)
class LogProducer:
"""
An IPushProducer that writes logs from its buffer to its transport when it
is resumed.
Args:
buffer: Log buffer to read logs from.
transport: Transport to write to.
format: A callable to format the log record to a string.
"""
transport = attr.ib(type=ITransport)
_format = attr.ib(type=Callable[[logging.LogRecord], str])
_buffer = attr.ib(type=deque)
_paused = attr.ib(default=False, type=bool, init=False)
def pauseProducing(self):
self._paused = True
def stopProducing(self):
self._paused = True
self._buffer = deque()
def resumeProducing(self):
# If we're already producing, nothing to do.
self._paused = False
# Loop until paused.
while self._paused is False and (self._buffer and self.transport.connected):
try:
# Request the next record and format it.
record = self._buffer.popleft()
msg = self._format(record)
# Send it as a new line over the transport.
self.transport.write(msg.encode("utf8"))
self.transport.write(b"\n")
except Exception:
# Something has gone wrong writing to the transport -- log it
# and break out of the while.
traceback.print_exc(file=sys.__stderr__)
break
class RemoteHandler(logging.Handler):
"""
An logging handler that writes logs to a TCP target.
Args:
host: The host of the logging target.
port: The logging target's port.
maximum_buffer: The maximum buffer size.
"""
def __init__(
self,
host: str,
port: int,
maximum_buffer: int = 1000,
level=logging.NOTSET,
_reactor=None,
):
super().__init__(level=level)
self.host = host
self.port = port
self.maximum_buffer = maximum_buffer
self._buffer = deque() # type: Deque[logging.LogRecord]
self._connection_waiter = None # type: Optional[Deferred]
self._producer = None # type: Optional[LogProducer]
# Connect without DNS lookups if it's a direct IP.
if _reactor is None:
from twisted.internet import reactor
_reactor = reactor
try:
ip = ip_address(self.host)
if isinstance(ip, IPv4Address):
endpoint = TCP4ClientEndpoint(_reactor, self.host, self.port)
elif isinstance(ip, IPv6Address):
endpoint = TCP6ClientEndpoint(_reactor, self.host, self.port)
else:
raise ValueError("Unknown IP address provided: %s" % (self.host,))
except ValueError:
endpoint = HostnameEndpoint(_reactor, self.host, self.port)
factory = Factory.forProtocol(Protocol)
self._service = ClientService(endpoint, factory, clock=_reactor)
self._service.startService()
self._connect()
def close(self):
self._service.stopService()
def _connect(self) -> None:
"""
Triggers an attempt to connect then write to the remote if not already writing.
"""
# Do not attempt to open multiple connections.
if self._connection_waiter:
return
self._connection_waiter = self._service.whenConnected(failAfterFailures=1)
@self._connection_waiter.addErrback
def fail(r):
r.printTraceback(file=sys.__stderr__)
self._connection_waiter = None
self._connect()
@self._connection_waiter.addCallback
def writer(r):
# We have a connection. If we already have a producer, and its
# transport is the same, just trigger a resumeProducing.
if self._producer and r.transport is self._producer.transport:
self._producer.resumeProducing()
self._connection_waiter = None
return
# If the producer is still producing, stop it.
if self._producer:
self._producer.stopProducing()
# Make a new producer and start it.
self._producer = LogProducer(
buffer=self._buffer, transport=r.transport, format=self.format,
)
r.transport.registerProducer(self._producer, True)
self._producer.resumeProducing()
self._connection_waiter = None
def _handle_pressure(self) -> None:
"""
Handle backpressure by shedding records.
The buffer will, in this order, until the buffer is below the maximum:
- Shed DEBUG records.
- Shed INFO records.
- Shed the middle 50% of the records.
"""
if len(self._buffer) <= self.maximum_buffer:
return
# Strip out DEBUGs
self._buffer = deque(
filter(lambda record: record.levelno > logging.DEBUG, self._buffer)
)
if len(self._buffer) <= self.maximum_buffer:
return
# Strip out INFOs
self._buffer = deque(
filter(lambda record: record.levelno > logging.INFO, self._buffer)
)
if len(self._buffer) <= self.maximum_buffer:
return
# Cut the middle entries out
buffer_split = floor(self.maximum_buffer / 2)
old_buffer = self._buffer
self._buffer = deque()
for i in range(buffer_split):
self._buffer.append(old_buffer.popleft())
end_buffer = []
for i in range(buffer_split):
end_buffer.append(old_buffer.pop())
self._buffer.extend(reversed(end_buffer))
def emit(self, record: logging.LogRecord) -> None:
self._buffer.append(record)
# Handle backpressure, if it exists.
try:
self._handle_pressure()
except Exception:
# If handling backpressure fails, clear the buffer and log the
# exception.
self._buffer.clear()
logger.warning("Failed clearing backpressure")
# Try and write immediately.
self._connect()