From f18d7546c63ae30c4058d1ec6ab2d5c3b001d257 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 4 Jul 2016 15:48:25 +0100 Subject: [PATCH 1/5] Use a query that postgresql optimises better for get_events_around --- synapse/storage/stream.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index b9ad965fd..4dd11284e 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -591,25 +591,28 @@ class StreamStore(SQLBaseStore): query_before = ( "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND (topological_ordering < ?" - " OR (topological_ordering = ? AND stream_ordering < ?))" - " ORDER BY topological_ordering DESC, stream_ordering DESC" - " LIMIT ?" + " WHERE room_id = ? AND topological_ordering < ?" + " UNION ALL " + " SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?" + " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" ) query_after = ( "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND (topological_ordering > ?" - " OR (topological_ordering = ? AND stream_ordering > ?))" - " ORDER BY topological_ordering ASC, stream_ordering ASC" - " LIMIT ?" + " WHERE room_id = ? AND topological_ordering > ?" + " UNION ALL" + " SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?" + " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" ) txn.execute( query_before, ( - room_id, topological_ordering, topological_ordering, - stream_ordering, before_limit, + room_id, topological_ordering, + room_id, topological_ordering, stream_ordering, + before_limit, ) ) @@ -630,8 +633,9 @@ class StreamStore(SQLBaseStore): txn.execute( query_after, ( - room_id, topological_ordering, topological_ordering, - stream_ordering, after_limit, + room_id, topological_ordering, + room_id, topological_ordering, stream_ordering, + after_limit, ) ) From 0fb76c71ac4bdd00e7524cf11668c13754d29a08 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 4 Jul 2016 19:44:55 +0100 Subject: [PATCH 2/5] Use different SQL for postgres and sqlite3 for when using multicolumn indexes --- synapse/storage/event_push_actions.py | 18 ++--- synapse/storage/stream.py | 100 +++++++++++++------------- 2 files changed, 59 insertions(+), 59 deletions(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 5f1b6f63a..e3e2e8083 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -16,6 +16,8 @@ from ._base import SQLBaseStore from twisted.internet import defer from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.types import RoomStreamToken +from .stream import lower_bound import logging import ujson as json @@ -73,6 +75,9 @@ class EventPushActionsStore(SQLBaseStore): stream_ordering = results[0][0] topological_ordering = results[0][1] + token = RoomStreamToken( + topological_ordering, stream_ordering + ) sql = ( "SELECT sum(notif), sum(highlight)" @@ -80,15 +85,10 @@ class EventPushActionsStore(SQLBaseStore): " WHERE" " user_id = ?" " AND room_id = ?" - " AND (" - " topological_ordering > ?" - " OR (topological_ordering = ? AND stream_ordering > ?)" - ")" - ) - txn.execute(sql, ( - user_id, room_id, - topological_ordering, topological_ordering, stream_ordering - )) + " AND %s" + ) % (lower_bound(token, self.database_engine, inclusive=""),) + + txn.execute(sql, (user_id, room_id)) row = txn.fetchone() if row: return { diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 4dd11284e..23b3a40aa 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -40,6 +40,7 @@ from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logcontext import preserve_fn +from synapse.storage.engines import PostgresEngine import logging @@ -54,25 +55,41 @@ _STREAM_TOKEN = "stream" _TOPOLOGICAL_TOKEN = "topological" -def lower_bound(token): +def lower_bound(token, engine, inclusive=""): if token.topological is None: - return "(%d < %s)" % (token.stream, "stream_ordering") + return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering") else: - return "(%d < %s OR (%d = %s AND %d < %s))" % ( + if isinstance(engine, PostgresEngine): + # Postgres doesn't optimise ``(x < a) OR (x=a AND y= %s)" % (token.stream, "stream_ordering") + return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering") else: - return "(%d > %s OR (%d = %s AND %d >= %s))" % ( + if isinstance(engine, PostgresEngine): + # Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well + # as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we + # use the later form when running against postgres. + return "((%d,%d) >%s (%s,%s))" % ( + token.topological, token.stream, inclusive, + "topological_ordering", "stream_ordering", + ) + return "(%d > %s OR (%d = %s AND %d >%s %s))" % ( token.topological, "topological_ordering", token.topological, "topological_ordering", - token.stream, "stream_ordering", + token.stream, inclusive, "stream_ordering", ) @@ -308,18 +325,22 @@ class StreamStore(SQLBaseStore): args = [False, room_id] if direction == 'b': order = "DESC" - bounds = upper_bound(RoomStreamToken.parse(from_key)) + bounds = upper_bound( + RoomStreamToken.parse(from_key), self.database_engine + ) if to_key: - bounds = "%s AND %s" % ( - bounds, lower_bound(RoomStreamToken.parse(to_key)) - ) + bounds = "%s AND %s" % (bounds, lower_bound( + RoomStreamToken.parse(to_key), self.database_engine + )) else: order = "ASC" - bounds = lower_bound(RoomStreamToken.parse(from_key)) + bounds = lower_bound( + RoomStreamToken.parse(from_key), self.database_engine + ) if to_key: - bounds = "%s AND %s" % ( - bounds, upper_bound(RoomStreamToken.parse(to_key)) - ) + bounds = "%s AND %s" % (bounds, upper_bound( + RoomStreamToken.parse(to_key), self.database_engine + )) if int(limit) > 0: args.append(int(limit)) @@ -586,35 +607,24 @@ class StreamStore(SQLBaseStore): retcols=["stream_ordering", "topological_ordering"], ) - stream_ordering = results["stream_ordering"] - topological_ordering = results["topological_ordering"] + token = RoomStreamToken( + results["topological_ordering"], + results["stream_ordering"], + ) query_before = ( "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND topological_ordering < ?" - " UNION ALL " - " SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?" + " WHERE room_id = ? AND %s" " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" - ) + ) % (upper_bound(token, self.database_engine, inclusive=""),) query_after = ( "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND topological_ordering > ?" - " UNION ALL" - " SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?" + " WHERE room_id = ? AND %s" " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" - ) + ) % (lower_bound(token, self.database_engine, inclusive=""),) - txn.execute( - query_before, - ( - room_id, topological_ordering, - room_id, topological_ordering, stream_ordering, - before_limit, - ) - ) + txn.execute(query_before, (room_id, before_limit)) rows = self.cursor_to_dict(txn) events_before = [r["event_id"] for r in rows] @@ -626,18 +636,11 @@ class StreamStore(SQLBaseStore): )) else: start_token = str(RoomStreamToken( - topological_ordering, - stream_ordering - 1, + token.topological, + token.stream - 1, )) - txn.execute( - query_after, - ( - room_id, topological_ordering, - room_id, topological_ordering, stream_ordering, - after_limit, - ) - ) + txn.execute(query_after, (room_id, after_limit)) rows = self.cursor_to_dict(txn) events_after = [r["event_id"] for r in rows] @@ -648,10 +651,7 @@ class StreamStore(SQLBaseStore): rows[-1]["stream_ordering"], )) else: - end_token = str(RoomStreamToken( - topological_ordering, - stream_ordering, - )) + end_token = str(token) return { "before": { From d44d11d864714d4d99953bdae6625973519f120f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 Jul 2016 10:39:13 +0100 Subject: [PATCH 3/5] Use true/false for boolean parameter inclusive to avoid potential for sqli, and possibly make the code clearer --- synapse/storage/event_push_actions.py | 2 +- synapse/storage/stream.py | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index e3e2e8083..3d93285f8 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -86,7 +86,7 @@ class EventPushActionsStore(SQLBaseStore): " user_id = ?" " AND room_id = ?" " AND %s" - ) % (lower_bound(token, self.database_engine, inclusive=""),) + ) % (lower_bound(token, self.database_engine, inclusive=False),) txn.execute(sql, (user_id, room_id)) row = txn.fetchone() diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 23b3a40aa..56304999d 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -55,7 +55,8 @@ _STREAM_TOKEN = "stream" _TOPOLOGICAL_TOKEN = "topological" -def lower_bound(token, engine, inclusive=""): +def lower_bound(token, engine, inclusive=False): + inclusive = "=" if inclusive else "" if token.topological is None: return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering") else: @@ -74,7 +75,8 @@ def lower_bound(token, engine, inclusive=""): ) -def upper_bound(token, engine, inclusive="="): +def upper_bound(token, engine, inclusive=True): + inclusive = "=" if inclusive else "" if token.topological is None: return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering") else: @@ -616,13 +618,13 @@ class StreamStore(SQLBaseStore): "SELECT topological_ordering, stream_ordering, event_id FROM events" " WHERE room_id = ? AND %s" " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" - ) % (upper_bound(token, self.database_engine, inclusive=""),) + ) % (upper_bound(token, self.database_engine, inclusive=False),) query_after = ( "SELECT topological_ordering, stream_ordering, event_id FROM events" " WHERE room_id = ? AND %s" " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" - ) % (lower_bound(token, self.database_engine, inclusive=""),) + ) % (lower_bound(token, self.database_engine, inclusive=False),) txn.execute(query_before, (room_id, before_limit)) From b6b0132ac7cac86e8cc5457783311b4db59e5870 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 Jul 2016 13:55:18 +0100 Subject: [PATCH 4/5] Make get_events_around more efficient on sqlite3 --- synapse/storage/stream.py | 62 +++++++++++++++++++++++++++++++-------- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 56304999d..f18fb63c5 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -40,7 +40,7 @@ from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logcontext import preserve_fn -from synapse.storage.engines import PostgresEngine +from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging @@ -614,19 +614,55 @@ class StreamStore(SQLBaseStore): results["stream_ordering"], ) - query_before = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND %s" - " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" - ) % (upper_bound(token, self.database_engine, inclusive=False),) + if isinstance(self.database_engine, Sqlite3Engine): + # SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)`` + # So we give pass it to SQLite3 as the UNION ALL of the two queries. - query_after = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND %s" - " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" - ) % (lower_bound(token, self.database_engine, inclusive=False),) + query_before = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND topological_ordering < ?" + " UNION ALL" + " SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?" + " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" + ) + before_args = ( + room_id, token.topological, + room_id, token.topological, token.stream, + before_limit, + ) - txn.execute(query_before, (room_id, before_limit)) + query_after = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND topological_ordering > ?" + " UNION ALL" + " SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?" + " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" + ) + after_args = ( + room_id, token.topological, + room_id, token.topological, token.stream, + after_limit, + ) + else: + query_before = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND %s" + " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" + ) % (upper_bound(token, self.database_engine, inclusive=False),) + + before_args = (room_id, before_limit), + + query_after = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND %s" + " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" + ) % (lower_bound(token, self.database_engine, inclusive=False),) + + after_args = (room_id, after_limit) + + txn.execute(query_before, before_args) rows = self.cursor_to_dict(txn) events_before = [r["event_id"] for r in rows] @@ -642,7 +678,7 @@ class StreamStore(SQLBaseStore): token.stream - 1, )) - txn.execute(query_after, (room_id, after_limit)) + txn.execute(query_after, after_args) rows = self.cursor_to_dict(txn) events_after = [r["event_id"] for r in rows] From dd2ccee27d834107e86cc18f46a5e4d4aa88d3c9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 5 Jul 2016 14:06:07 +0100 Subject: [PATCH 5/5] Fix typo --- synapse/storage/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index f18fb63c5..c08c5b997 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -652,7 +652,7 @@ class StreamStore(SQLBaseStore): " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" ) % (upper_bound(token, self.database_engine, inclusive=False),) - before_args = (room_id, before_limit), + before_args = (room_id, before_limit) query_after = ( "SELECT topological_ordering, stream_ordering, event_id FROM events"