Merge pull request #2848 from matrix-org/rav/refactor_search_insert

Factor out common code for search insert
This commit is contained in:
Richard van der Hoff 2018-02-05 17:42:09 +01:00 committed by GitHub
commit 9a304ef2b0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 94 additions and 59 deletions

View file

@ -16,11 +16,9 @@
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
from synapse.storage.search import SearchStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from ._base import SQLBaseStore
from .engines import PostgresEngine, Sqlite3Engine
import collections import collections
import logging import logging
import ujson as json import ujson as json
@ -40,7 +38,7 @@ RatelimitOverride = collections.namedtuple(
) )
class RoomStore(SQLBaseStore): class RoomStore(SearchStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def store_room(self, room_id, room_creator_user_id, is_public): def store_room(self, room_id, room_creator_user_id, is_public):
@ -263,8 +261,8 @@ class RoomStore(SQLBaseStore):
}, },
) )
self._store_event_search_txn( self.store_event_search_txn(
txn, event, "content.topic", event.content["topic"] txn, event, "content.topic", event.content["topic"],
) )
def _store_room_name_txn(self, txn, event): def _store_room_name_txn(self, txn, event):
@ -279,14 +277,14 @@ class RoomStore(SQLBaseStore):
} }
) )
self._store_event_search_txn( self.store_event_search_txn(
txn, event, "content.name", event.content["name"] txn, event, "content.name", event.content["name"],
) )
def _store_room_message_txn(self, txn, event): def _store_room_message_txn(self, txn, event):
if hasattr(event, "content") and "body" in event.content: if hasattr(event, "content") and "body" in event.content:
self._store_event_search_txn( self.store_event_search_txn(
txn, event, "content.body", event.content["body"] txn, event, "content.body", event.content["body"],
) )
def _store_history_visibility_txn(self, txn, event): def _store_history_visibility_txn(self, txn, event):
@ -308,31 +306,6 @@ class RoomStore(SQLBaseStore):
event.content[key] event.content[key]
)) ))
def _store_event_search_txn(self, txn, event, key, value):
if isinstance(self.database_engine, PostgresEngine):
sql = (
"INSERT INTO event_search"
" (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
" VALUES (?,?,?,to_tsvector('english', ?),?,?)"
)
txn.execute(
sql,
(
event.event_id, event.room_id, key, value,
event.internal_metadata.stream_ordering,
event.origin_server_ts,
)
)
elif isinstance(self.database_engine, Sqlite3Engine):
sql = (
"INSERT INTO event_search (event_id, room_id, key, value)"
" VALUES (?,?,?,?)"
)
txn.execute(sql, (event.event_id, event.room_id, key, value,))
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
def add_event_report(self, room_id, event_id, user_id, reason, content, def add_event_report(self, room_id, event_id, user_id, reason, content,
received_ts): received_ts):
next_id = self._event_reports_id_gen.get_next() next_id = self._event_reports_id_gen.get_next()

View file

@ -13,19 +13,25 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from collections import namedtuple
import logging
import re
import ujson as json
from twisted.internet import defer from twisted.internet import defer
from .background_updates import BackgroundUpdateStore from .background_updates import BackgroundUpdateStore
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.engines import PostgresEngine, Sqlite3Engine
import logging
import re
import ujson as json
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
SearchEntry = namedtuple('SearchEntry', [
'key', 'value', 'event_id', 'room_id', 'stream_ordering',
'origin_server_ts',
])
class SearchStore(BackgroundUpdateStore): class SearchStore(BackgroundUpdateStore):
@ -49,16 +55,17 @@ class SearchStore(BackgroundUpdateStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def _background_reindex_search(self, progress, batch_size): def _background_reindex_search(self, progress, batch_size):
# we work through the events table from highest stream id to lowest
target_min_stream_id = progress["target_min_stream_id_inclusive"] target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"] max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0) rows_inserted = progress.get("rows_inserted", 0)
INSERT_CLUMP_SIZE = 1000
TYPES = ["m.room.name", "m.room.message", "m.room.topic"] TYPES = ["m.room.name", "m.room.message", "m.room.topic"]
def reindex_search_txn(txn): def reindex_search_txn(txn):
sql = ( sql = (
"SELECT stream_ordering, event_id, room_id, type, content FROM events" "SELECT stream_ordering, event_id, room_id, type, content, "
" origin_server_ts FROM events"
" WHERE ? <= stream_ordering AND stream_ordering < ?" " WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)" " AND (%s)"
" ORDER BY stream_ordering DESC" " ORDER BY stream_ordering DESC"
@ -67,6 +74,10 @@ class SearchStore(BackgroundUpdateStore):
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
# we could stream straight from the results into
# store_search_entries_txn with a generator function, but that
# would mean having two cursors open on the database at once.
# Instead we just build a list of results.
rows = self.cursor_to_dict(txn) rows = self.cursor_to_dict(txn)
if not rows: if not rows:
return 0 return 0
@ -79,6 +90,8 @@ class SearchStore(BackgroundUpdateStore):
event_id = row["event_id"] event_id = row["event_id"]
room_id = row["room_id"] room_id = row["room_id"]
etype = row["type"] etype = row["type"]
stream_ordering = row["stream_ordering"]
origin_server_ts = row["origin_server_ts"]
try: try:
content = json.loads(row["content"]) content = json.loads(row["content"])
except Exception: except Exception:
@ -93,6 +106,8 @@ class SearchStore(BackgroundUpdateStore):
elif etype == "m.room.name": elif etype == "m.room.name":
key = "content.name" key = "content.name"
value = content["name"] value = content["name"]
else:
raise Exception("unexpected event type %s" % etype)
except (KeyError, AttributeError): except (KeyError, AttributeError):
# If the event is missing a necessary field then # If the event is missing a necessary field then
# skip over it. # skip over it.
@ -103,25 +118,16 @@ class SearchStore(BackgroundUpdateStore):
# then skip over it # then skip over it
continue continue
event_search_rows.append((event_id, room_id, key, value)) event_search_rows.append(SearchEntry(
key=key,
value=value,
event_id=event_id,
room_id=room_id,
stream_ordering=stream_ordering,
origin_server_ts=origin_server_ts,
))
if isinstance(self.database_engine, PostgresEngine): self.store_search_entries_txn(txn, event_search_rows)
sql = (
"INSERT INTO event_search (event_id, room_id, key, vector)"
" VALUES (?,?,?,to_tsvector('english', ?))"
)
elif isinstance(self.database_engine, Sqlite3Engine):
sql = (
"INSERT INTO event_search (event_id, room_id, key, value)"
" VALUES (?,?,?,?)"
)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE):
clump = event_search_rows[index:index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)
progress = { progress = {
"target_min_stream_id_inclusive": target_min_stream_id, "target_min_stream_id_inclusive": target_min_stream_id,
@ -242,6 +248,62 @@ class SearchStore(BackgroundUpdateStore):
defer.returnValue(num_rows) defer.returnValue(num_rows)
def store_event_search_txn(self, txn, event, key, value):
"""Add event to the search table
Args:
txn (cursor):
event (EventBase):
key (str):
value (str):
"""
self.store_search_entries_txn(
txn,
(SearchEntry(
key=key,
value=value,
event_id=event.event_id,
room_id=event.room_id,
stream_ordering=event.internal_metadata.stream_ordering,
origin_server_ts=event.origin_server_ts,
),),
)
def store_search_entries_txn(self, txn, entries):
"""Add entries to the search table
Args:
txn (cursor):
entries (iterable[SearchEntry]):
entries to be added to the table
"""
if isinstance(self.database_engine, PostgresEngine):
sql = (
"INSERT INTO event_search"
" (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
" VALUES (?,?,?,to_tsvector('english', ?),?,?)"
)
args = ((
entry.event_id, entry.room_id, entry.key, entry.value,
entry.stream_ordering, entry.origin_server_ts,
) for entry in entries)
txn.executemany(sql, args)
elif isinstance(self.database_engine, Sqlite3Engine):
sql = (
"INSERT INTO event_search (event_id, room_id, key, value)"
" VALUES (?,?,?,?)"
)
args = ((
entry.event_id, entry.room_id, entry.key, entry.value,
) for entry in entries)
txn.executemany(sql, args)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
@defer.inlineCallbacks @defer.inlineCallbacks
def search_msgs(self, room_ids, search_term, keys): def search_msgs(self, room_ids, search_term, keys):
"""Performs a full text search over events with given keys. """Performs a full text search over events with given keys.