Merge pull request #2104 from matrix-org/erikj/metrics_tcp

Rearrange TCP replication metrics
This commit is contained in:
Erik Johnston 2017-04-05 14:24:06 +01:00 committed by GitHub
commit ea0152b132

View file

@ -51,6 +51,7 @@ indicate which side is sending, these are *not* included on the wire::
from twisted.internet import defer from twisted.internet import defer
from twisted.protocols.basic import LineOnlyReceiver from twisted.protocols.basic import LineOnlyReceiver
from twisted.python.failure import Failure
from commands import ( from commands import (
COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS, COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS,
@ -60,6 +61,7 @@ from commands import (
from streams import STREAMS_MAP from streams import STREAMS_MAP
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string
from synapse.metrics.metric import CounterMetric
import logging import logging
import synapse.metrics import synapse.metrics
@ -69,11 +71,8 @@ import fcntl
metrics = synapse.metrics.get_metrics_for(__name__) metrics = synapse.metrics.get_metrics_for(__name__)
inbound_commands_counter = metrics.register_counter( connection_close_counter = metrics.register_counter(
"inbound_commands", labels=["command", "name", "conn_id"], "close_reason", labels=["reason_type"],
)
outbound_commands_counter = metrics.register_counter(
"outbound_commands", labels=["command", "name", "conn_id"],
) )
@ -135,6 +134,13 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
# The LoopingCall for sending pings. # The LoopingCall for sending pings.
self._send_ping_loop = None self._send_ping_loop = None
self.inbound_commands_counter = CounterMetric(
"inbound_commands", labels=["command"],
)
self.outbound_commands_counter = CounterMetric(
"outbound_commands", labels=["command"],
)
def connectionMade(self): def connectionMade(self):
logger.info("[%s] Connection established", self.id()) logger.info("[%s] Connection established", self.id())
@ -193,7 +199,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.last_received_command = self.clock.time_msec() self.last_received_command = self.clock.time_msec()
inbound_commands_counter.inc(cmd_name, self.name, self.conn_id) self.inbound_commands_counter.inc(cmd_name)
cmd_cls = COMMAND_MAP[cmd_name] cmd_cls = COMMAND_MAP[cmd_name]
try: try:
@ -242,7 +248,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self._queue_command(cmd) self._queue_command(cmd)
return return
outbound_commands_counter.inc(cmd.NAME, self.name, self.conn_id) self.outbound_commands_counter.inc(cmd.NAME)
string = "%s %s" % (cmd.NAME, cmd.to_line(),) string = "%s %s" % (cmd.NAME, cmd.to_line(),)
if "\n" in string: if "\n" in string:
@ -307,6 +313,10 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
def connectionLost(self, reason): def connectionLost(self, reason):
logger.info("[%s] Replication connection closed: %r", self.id(), reason) logger.info("[%s] Replication connection closed: %r", self.id(), reason)
if isinstance(reason, Failure):
connection_close_counter.inc(reason.type.__name__)
else:
connection_close_counter.inc(reason.__class__.__name__)
try: try:
# Remove us from list of connections to be monitored # Remove us from list of connections to be monitored
@ -604,3 +614,24 @@ metrics.register_callback(
}, },
labels=["name", "conn_id"], labels=["name", "conn_id"],
) )
# Export the per-connection inbound command counts as one callback metric
# named "inbound_commands". The callback is evaluated each time the metric is
# read, so only protocols currently present in `connected_connections`
# contribute entries.
metrics.register_callback(
"inbound_commands",
lambda: {
# Result key is (command, name, conn_id), matching `labels` below.
# `k` is the key used by the connection's own counter; presumably a
# 1-tuple holding the command name, since that counter is created with
# labels=["command"] -- TODO confirm against CounterMetric.
(k[0], p.name, p.conn_id): count
for p in connected_connections
# NOTE(review): `iteritems()` is a Python 2 dict method; this assumes
# `counts` is a plain dict -- confirm before porting to Python 3.
for k, count in p.inbound_commands_counter.counts.iteritems()
},
labels=["command", "name", "conn_id"],
)
# Export the per-connection outbound command counts as one callback metric
# named "outbound_commands". The callback is evaluated each time the metric is
# read, so only protocols currently present in `connected_connections`
# contribute entries.
metrics.register_callback(
"outbound_commands",
lambda: {
# Result key is (command, name, conn_id), matching `labels` below.
# `k` is the key used by the connection's own counter; presumably a
# 1-tuple holding the command name, since that counter is created with
# labels=["command"] -- TODO confirm against CounterMetric.
(k[0], p.name, p.conn_id): count
for p in connected_connections
# NOTE(review): `iteritems()` is a Python 2 dict method; this assumes
# `counts` is a plain dict -- confirm before porting to Python 3.
for k, count in p.outbound_commands_counter.counts.iteritems()
},
labels=["command", "name", "conn_id"],
)