Move store_event_search_txn to SearchStore

... as a precursor to making event storing and doing the bg update share some
code.
This commit is contained in:
Richard van der Hoff 2018-02-03 22:57:33 +00:00
parent bb9f0f3cdb
commit 4eeae7ad65
2 changed files with 43 additions and 37 deletions

View file

@ -16,11 +16,9 @@
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
from synapse.storage.search import SearchStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from ._base import SQLBaseStore
from .engines import PostgresEngine, Sqlite3Engine
import collections import collections
import logging import logging
import ujson as json import ujson as json
@ -40,7 +38,7 @@ RatelimitOverride = collections.namedtuple(
) )
class RoomStore(SQLBaseStore): class RoomStore(SearchStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def store_room(self, room_id, room_creator_user_id, is_public): def store_room(self, room_id, room_creator_user_id, is_public):
@ -263,8 +261,8 @@ class RoomStore(SQLBaseStore):
}, },
) )
self._store_event_search_txn( self.store_event_search_txn(
txn, event, "content.topic", event.content["topic"] txn, event, "content.topic", event.content["topic"],
) )
def _store_room_name_txn(self, txn, event): def _store_room_name_txn(self, txn, event):
@ -279,14 +277,14 @@ class RoomStore(SQLBaseStore):
} }
) )
self._store_event_search_txn( self.store_event_search_txn(
txn, event, "content.name", event.content["name"] txn, event, "content.name", event.content["name"],
) )
def _store_room_message_txn(self, txn, event): def _store_room_message_txn(self, txn, event):
if hasattr(event, "content") and "body" in event.content: if hasattr(event, "content") and "body" in event.content:
self._store_event_search_txn( self.store_event_search_txn(
txn, event, "content.body", event.content["body"] txn, event, "content.body", event.content["body"],
) )
def _store_history_visibility_txn(self, txn, event): def _store_history_visibility_txn(self, txn, event):
@ -308,33 +306,6 @@ class RoomStore(SQLBaseStore):
event.content[key] event.content[key]
)) ))
def _store_event_search_txn(self, txn, event, key, value):
if isinstance(self.database_engine, PostgresEngine):
txn.execute("SET work_mem='256kB'")
sql = (
"INSERT INTO event_search"
" (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
" VALUES (?,?,?,to_tsvector('english', ?),?,?)"
)
txn.execute(
sql,
(
event.event_id, event.room_id, key, value,
event.internal_metadata.stream_ordering,
event.origin_server_ts,
)
)
txn.execute("RESET work_mem")
elif isinstance(self.database_engine, Sqlite3Engine):
sql = (
"INSERT INTO event_search (event_id, room_id, key, value)"
" VALUES (?,?,?,?)"
)
txn.execute(sql, (event.event_id, event.room_id, key, value,))
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
def add_event_report(self, room_id, event_id, user_id, reason, content, def add_event_report(self, room_id, event_id, user_id, reason, content,
received_ts): received_ts):
next_id = self._event_reports_id_gen.get_next() next_id = self._event_reports_id_gen.get_next()

View file

@ -246,6 +246,41 @@ class SearchStore(BackgroundUpdateStore):
defer.returnValue(num_rows) defer.returnValue(num_rows)
def store_event_search_txn(self, txn, event, key, value):
"""Add event to the search table
Args:
txn (cursor):
event (EventBase):
key (str):
value (str):
"""
if isinstance(self.database_engine, PostgresEngine):
txn.execute("SET work_mem='256kB'")
sql = (
"INSERT INTO event_search"
" (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
" VALUES (?,?,?,to_tsvector('english', ?),?,?)"
)
txn.execute(
sql,
(
event.event_id, event.room_id, key, value,
event.internal_metadata.stream_ordering,
event.origin_server_ts,
)
)
txn.execute("RESET work_mem")
elif isinstance(self.database_engine, Sqlite3Engine):
sql = (
"INSERT INTO event_search (event_id, room_id, key, value)"
" VALUES (?,?,?,?)"
)
txn.execute(sql, (event.event_id, event.room_id, key, value,))
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
@defer.inlineCallbacks @defer.inlineCallbacks
def search_msgs(self, room_ids, search_term, keys): def search_msgs(self, room_ids, search_term, keys):
"""Performs a full text search over events with given keys. """Performs a full text search over events with given keys.