From b43d3267e25949acb4ad7333ac47afa6e38cb815 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Apr 2017 13:34:54 +0100 Subject: [PATCH 1/3] Fixup some metrics for tcp repl --- synapse/metrics/metric.py | 3 +++ synapse/replication/tcp/protocol.py | 16 ++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index e87b2b80a..920cde1dd 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -83,6 +83,9 @@ class CounterMetric(BaseMetric): def render(self): return map_concat(self.render_item, sorted(self.counts.keys())) + def unregister_counter(self, *values): + self.counts.pop(values, None) + class CallbackMetric(BaseMetric): """A metric that returns the numeric value returned by a callback whenever diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 686420461..4f44836c2 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -51,6 +51,7 @@ indicate which side is sending, these are *not* included on the wire:: from twisted.internet import defer from twisted.protocols.basic import LineOnlyReceiver +from twisted.python.failure import Failure from commands import ( COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS, @@ -75,6 +76,9 @@ inbound_commands_counter = metrics.register_counter( outbound_commands_counter = metrics.register_counter( "outbound_commands", labels=["command", "name", "conn_id"], ) +connection_close_counter = metrics.register_counter( + "close_reason", labels=["reason_type"], +) # A list of all connected protocols. This allows us to send metrics about the @@ -307,6 +311,10 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): def connectionLost(self, reason): logger.info("[%s] Replication connection closed: %r", self.id(), reason) + if isinstance(reason, Failure): + connection_close_counter.inc(reason.type.__name__) + else: + connection_close_counter.inc(reason.__class__.__name__) try: # Remove us from list of connections to be monitored @@ -326,6 +334,14 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.state = ConnectionStates.CLOSED self.pending_commands = [] + for cmd in COMMAND_MAP: + outbound_commands_counter.unregister_counter( + cmd.NAME, self.name, self.conn_id + ) + inbound_commands_counter.unregister_counter( + cmd.NAME, self.name, self.conn_id + ) + if self.transport: self.transport.unregisterProducer() From 1ca0e78ca10ccd0fa705160b7138f0ba2370158f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Apr 2017 13:43:39 +0100 Subject: [PATCH 2/3] Fix typo --- synapse/replication/tcp/protocol.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 4f44836c2..95ea256e7 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -336,10 +336,10 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): for cmd in COMMAND_MAP: outbound_commands_counter.unregister_counter( - cmd.NAME, self.name, self.conn_id + cmd, self.name, self.conn_id ) inbound_commands_counter.unregister_counter( - cmd.NAME, self.name, self.conn_id + cmd, self.name, self.conn_id ) if self.transport: From 3f213d908d82a02794dd0a06db4d7ac956fa0db3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Apr 2017 14:15:09 +0100 Subject: [PATCH 3/3] Rearrange metrics --- synapse/metrics/metric.py | 3 -- synapse/replication/tcp/protocol.py | 47 +++++++++++++++++++---------- 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index 920cde1dd..e87b2b80a 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -83,9 +83,6 @@ class CounterMetric(BaseMetric): def render(self): return map_concat(self.render_item, sorted(self.counts.keys())) - def unregister_counter(self, *values): - self.counts.pop(values, None) - class CallbackMetric(BaseMetric): """A metric that returns the numeric value returned by a callback whenever diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 95ea256e7..d4d672aaf 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -61,6 +61,7 @@ from commands import ( from streams import STREAMS_MAP from synapse.util.stringutils import random_string +from synapse.metrics.metric import CounterMetric import logging import synapse.metrics @@ -70,12 +71,6 @@ import fcntl metrics = synapse.metrics.get_metrics_for(__name__) -inbound_commands_counter = metrics.register_counter( - "inbound_commands", labels=["command", "name", "conn_id"], -) -outbound_commands_counter = metrics.register_counter( - "outbound_commands", labels=["command", "name", "conn_id"], -) connection_close_counter = metrics.register_counter( "close_reason", labels=["reason_type"], ) @@ -139,6 +134,13 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): # The LoopingCall for sending pings. self._send_ping_loop = None + self.inbound_commands_counter = CounterMetric( + "inbound_commands", labels=["command"], + ) + self.outbound_commands_counter = CounterMetric( + "outbound_commands", labels=["command"], + ) + def connectionMade(self): logger.info("[%s] Connection established", self.id()) @@ -197,7 +199,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.last_received_command = self.clock.time_msec() - inbound_commands_counter.inc(cmd_name, self.name, self.conn_id) + self.inbound_commands_counter.inc(cmd_name) cmd_cls = COMMAND_MAP[cmd_name] try: @@ -246,7 +248,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self._queue_command(cmd) return - outbound_commands_counter.inc(cmd.NAME, self.name, self.conn_id) + self.outbound_commands_counter.inc(cmd.NAME) string = "%s %s" % (cmd.NAME, cmd.to_line(),) if "\n" in string: @@ -334,14 +336,6 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.state = ConnectionStates.CLOSED self.pending_commands = [] - for cmd in COMMAND_MAP: - outbound_commands_counter.unregister_counter( - cmd, self.name, self.conn_id - ) - inbound_commands_counter.unregister_counter( - cmd, self.name, self.conn_id - ) - if self.transport: self.transport.unregisterProducer() @@ -620,3 +614,24 @@ metrics.register_callback( }, labels=["name", "conn_id"], ) + + +metrics.register_callback( + "inbound_commands", + lambda: { + (k[0], p.name, p.conn_id): count + for p in connected_connections + for k, count in p.inbound_commands_counter.counts.iteritems() + }, + labels=["command", "name", "conn_id"], +) + +metrics.register_callback( + "outbound_commands", + lambda: { + (k[0], p.name, p.conn_id): count + for p in connected_connections + for k, count in p.outbound_commands_counter.counts.iteritems() + }, + labels=["command", "name", "conn_id"], +)