Merge branch 'develop' into matrix-org-hotfixes

This commit is contained in:
Richard van der Hoff 2019-02-20 14:24:17 +00:00
commit 68af15637b
9 changed files with 246 additions and 19 deletions

View file

@ -89,7 +89,7 @@ install:
- psql -At -U postgres -c 'select version();' || true
- pip install tox
# if we don't have python3.6 in this environment, travis unhelpfully gives us
# a `python3.6` on our path which does nothing but spit out a warning. Tox
# tries to run it (even if we're not running a py36 env), so the build logs

1
changelog.d/4644.misc Normal file
View file

@ -0,0 +1 @@
Introduce upsert batching functionality in the database layer.

1
changelog.d/4694.feature Normal file
View file

@ -0,0 +1 @@
Add basic optional Sentry integration.

1
changelog.d/4695.feature Normal file
View file

@ -0,0 +1 @@
Add prometheus metrics for number of outgoing EDUs, by type.

View file

@ -59,6 +59,8 @@ class MetricsConfig(Config):
#
#sentry:
# dsn: "..."
# Whether or not to report anonymized homeserver usage statistics.
"""
if report_stats is None:

View file

@ -33,7 +33,6 @@ from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
events_processed_counter,
sent_edus_counter,
sent_transactions_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
@ -47,10 +46,24 @@ from .units import Edu, Transaction
logger = logging.getLogger(__name__)
sent_pdus_destination_dist_count = Counter(
"synapse_federation_client_sent_pdu_destinations:count", ""
"synapse_federation_client_sent_pdu_destinations:count",
"Number of PDUs queued for sending to one or more destinations",
)
sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations:total", ""
"Total number of PDUs queued for sending across all destinations",
)
# Running total of EDUs that went out in a successfully sent transaction; it is
# bumped by the number of pending EDUs once the transaction has succeeded.
sent_edus_counter = Counter(
"synapse_federation_client_sent_edus",
"Total number of EDUs successfully sent",
)
# The same EDUs broken down by a single `type` label, which is filled in with
# each EDU's `edu_type` when the transaction has succeeded.
sent_edus_by_type = Counter(
"synapse_federation_client_sent_edus_by_type",
"Number of sent EDUs successfully sent, by event type",
["type"],
)
@ -360,8 +373,6 @@ class TransactionQueue(object):
logger.info("Not sending EDU to ourselves")
return
sent_edus_counter.inc()
if key:
self.pending_edus_keyed_by_dest.setdefault(
destination, {}
@ -496,6 +507,9 @@ class TransactionQueue(object):
)
if success:
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if device_message_edus:

View file

@ -274,8 +274,6 @@ pending_calls_metric = Histogram(
# Federation Metrics
#
sent_edus_counter = Counter("synapse_federation_client_sent_edus", "")
sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")
events_processed_counter = Counter("synapse_federation_client_events_processed", "")

View file

@ -106,6 +106,14 @@ class LoggingTransaction(object):
def __iter__(self):
return self.txn.__iter__()
def execute_batch(self, sql, args):
    """Run one SQL statement once for every parameter set in `args`.

    When the database engine is a `PostgresEngine` the whole batch is handed
    to psycopg2's `execute_batch` helper (via `self._do_execute`). For any
    other engine each parameter set is run individually with `self.execute`.

    Args:
        sql (str): The statement to run.
        args (iterable): One set of statement parameters per execution.
    """
    if not isinstance(self.database_engine, PostgresEngine):
        # No batching helper on this path: run the statement row by row.
        for params in args:
            self.execute(sql, params)
        return

    # Imported inside the Postgres-only path, so other engines never import
    # psycopg2 through this method.
    from psycopg2.extras import execute_batch

    self._do_execute(
        lambda *call_args: execute_batch(self.txn, *call_args), sql, args
    )
def execute(self, sql, *args):
    """Run a single SQL statement on the wrapped transaction.

    Hands `self.txn.execute`, the statement and any extra positional
    arguments to `self._do_execute`.

    Args:
        sql (str): The statement to run.
        *args: Extra positional arguments passed along unchanged.
    """
    run_statement = self.txn.execute
    self._do_execute(run_statement, sql, *args)
@ -699,20 +707,34 @@ class SQLBaseStore(object):
else:
return "%s = ?" % (key,)
# First try to update.
sql = "UPDATE %s SET %s WHERE %s" % (
table,
", ".join("%s = ?" % (k,) for k in values),
" AND ".join(_getwhere(k) for k in keyvalues)
)
sqlargs = list(values.values()) + list(keyvalues.values())
if not values:
# If `values` is empty, then all of the values we care about are in
# the unique key, so there is nothing to UPDATE. We can just do a
# SELECT instead to see if it exists.
sql = "SELECT 1 FROM %s WHERE %s" % (
table,
" AND ".join(_getwhere(k) for k in keyvalues)
)
sqlargs = list(keyvalues.values())
txn.execute(sql, sqlargs)
if txn.fetchall():
# We have an existing record.
return False
else:
# First try to update.
sql = "UPDATE %s SET %s WHERE %s" % (
table,
", ".join("%s = ?" % (k,) for k in values),
" AND ".join(_getwhere(k) for k in keyvalues)
)
sqlargs = list(values.values()) + list(keyvalues.values())
txn.execute(sql, sqlargs)
if txn.rowcount > 0:
# successfully updated at least one row.
return False
txn.execute(sql, sqlargs)
if txn.rowcount > 0:
# successfully updated at least one row.
return False
# We didn't update any rows so insert a new one
# We didn't find any existing rows, so insert a new one
allvalues = {}
allvalues.update(keyvalues)
allvalues.update(values)
@ -759,6 +781,106 @@ class SQLBaseStore(object):
)
txn.execute(sql, list(allvalues.values()))
def _simple_upsert_many_txn(
self, txn, table, key_names, key_values, value_names, value_values
):
"""
Upsert, many times.
Args:
table (str): The table to upsert into
key_names (list[str]): The key column names.
key_values (list[list]): A list of each row's key column values.
value_names (list[str]): The value column names. If empty, no
values will be used, even if value_values is provided.
value_values (list[list]): A list of each row's value column values.
Returns:
None
"""
if (
self.database_engine.can_native_upsert
and table not in self._unsafe_to_upsert_tables
):
return self._simple_upsert_many_txn_native_upsert(
txn, table, key_names, key_values, value_names, value_values
)
else:
return self._simple_upsert_many_txn_emulated(
txn, table, key_names, key_values, value_names, value_values
)
def _simple_upsert_many_txn_emulated(
self, txn, table, key_names, key_values, value_names, value_values
):
"""
Upsert, many times, but without native UPSERT support or batching.
Args:
table (str): The table to upsert into
key_names (list[str]): The key column names.
key_values (list[list]): A list of each row's key column values.
value_names (list[str]): The value column names. If empty, no
values will be used, even if value_values is provided.
value_values (list[list]): A list of each row's value column values.
Returns:
None
"""
# No value columns, therefore make a blank list so that the following
# zip() works correctly.
if not value_names:
value_values = [() for x in range(len(key_values))]
for keyv, valv in zip(key_values, value_values):
_keys = {x: y for x, y in zip(key_names, keyv)}
_vals = {x: y for x, y in zip(value_names, valv)}
self._simple_upsert_txn_emulated(txn, table, _keys, _vals)
def _simple_upsert_many_txn_native_upsert(
self, txn, table, key_names, key_values, value_names, value_values
):
"""
Upsert, many times, using batching where possible.
Args:
table (str): The table to upsert into
key_names (list[str]): The key column names.
key_values (list[list]): A list of each row's key column values.
value_names (list[str]): The value column names. If empty, no
values will be used, even if value_values is provided.
value_values (list[list]): A list of each row's value column values.
Returns:
None
"""
allnames = []
allnames.extend(key_names)
allnames.extend(value_names)
if not value_names:
# No value columns, therefore make a blank list so that the
# following zip() works correctly.
latter = "NOTHING"
value_values = [() for x in range(len(key_values))]
else:
latter = (
"UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in value_names)
)
sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s" % (
table,
", ".join(k for k in allnames),
", ".join("?" for _ in allnames),
", ".join(key_names),
latter,
)
args = []
for x, y in zip(key_values, value_values):
args.append(tuple(x) + tuple(y))
return txn.execute_batch(sql, args)
def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False, desc="_simple_select_one"):
"""Executes a SELECT query on the named table, which is expected to

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -314,3 +315,90 @@ class CacheDecoratorTestCase(unittest.TestCase):
self.assertEquals(callcount[0], 2)
self.assertEquals(callcount2[0], 3)
class UpsertManyTests(unittest.HomeserverTestCase):
    """Exercises `_simple_upsert_many_txn` against a table created for the test."""

    def prepare(self, reactor, clock, hs):
        self.storage = hs.get_datastore()

        # The table name gets a suffix from the homeserver's secrets helper.
        self.table_name = "table_" + hs.get_secrets().token_hex(6)

        create_table_sql = (
            "CREATE TABLE %s (id INTEGER, username TEXT, value TEXT)"
            % (self.table_name,)
        )
        # Unique index over the two columns the test uses as upsert keys.
        create_index_sql = (
            "CREATE UNIQUE INDEX %sindex ON %s(id, username)"
            % (self.table_name, self.table_name)
        )

        for desc, statement in (
            ("create", create_table_sql),
            ("index", create_index_sql),
        ):
            self.get_success(
                self.storage.runInteraction(
                    desc,
                    lambda txn, *params: txn.execute(*params),
                    statement,
                )
            )

    def _dump_to_tuple(self, res):
        """Yield an (id, username, value) tuple for each row dict in `res`."""
        for row in res:
            yield (row["id"], row["username"], row["value"])

    def _upsert_rows(self, key_names, key_values, value_names, value_values):
        """Run `_simple_upsert_many_txn` on the test table and wait for it."""
        self.get_success(
            self.storage.runInteraction(
                "test",
                self.storage._simple_upsert_many_txn,
                self.table_name,
                key_names,
                key_values,
                value_names,
                value_values,
            )
        )

    def _fetch_rows(self):
        """Return the test table's rows as a set of (id, username, value)."""
        rows = self.get_success(
            self.storage._simple_select_list(
                self.table_name, None, ["id, username, value"]
            )
        )
        return set(self._dump_to_tuple(rows))

    def test_upsert_many(self):
        """
        Upsert_many will perform the upsert operation across a batch of data.
        """
        key_names = ["id", "username"]
        value_names = ["value"]

        # Add some data to an empty table
        self._upsert_rows(
            key_names,
            [[1, "user1"], [2, "user2"]],
            value_names,
            [["hello"], ["there"]],
        )

        # Check results are what we expect
        self.assertEqual(
            self._fetch_rows(),
            {(1, "user1", "hello"), (2, "user2", "there")},
        )

        # Update only user2
        self._upsert_rows(key_names, [[2, "user2"]], value_names, [["bleb"]])

        # Check results are what we expect
        self.assertEqual(
            self._fetch_rows(),
            {(1, "user1", "hello"), (2, "user2", "bleb")},
        )