forked from MirrorHub/synapse
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/split_stream_store
This commit is contained in:
commit
64346be26d
6 changed files with 221 additions and 225 deletions
|
@ -53,8 +53,6 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
|
||||||
|
|
||||||
super(SlavedEventStore, self).__init__(db_conn, hs)
|
super(SlavedEventStore, self).__init__(db_conn, hs)
|
||||||
|
|
||||||
self.stream_ordering_month_ago = 0
|
|
||||||
|
|
||||||
# Cached functions can't be accessed through a class instance so we need
|
# Cached functions can't be accessed through a class instance so we need
|
||||||
# to reach inside the __dict__ to extract them.
|
# to reach inside the __dict__ to extract them.
|
||||||
get_latest_event_ids_in_room = EventFederationStore.__dict__[
|
get_latest_event_ids_in_room = EventFederationStore.__dict__[
|
||||||
|
|
|
@ -14,32 +14,19 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from ._base import BaseSlavedStore
|
from ._base import BaseSlavedStore
|
||||||
from synapse.storage import DataStore
|
from synapse.storage.room import RoomWorkerStore
|
||||||
from synapse.storage.room import RoomStore
|
|
||||||
from ._slaved_id_tracker import SlavedIdTracker
|
from ._slaved_id_tracker import SlavedIdTracker
|
||||||
|
|
||||||
|
|
||||||
class RoomStore(BaseSlavedStore):
|
class RoomStore(RoomWorkerStore, BaseSlavedStore):
|
||||||
def __init__(self, db_conn, hs):
|
def __init__(self, db_conn, hs):
|
||||||
super(RoomStore, self).__init__(db_conn, hs)
|
super(RoomStore, self).__init__(db_conn, hs)
|
||||||
self._public_room_id_gen = SlavedIdTracker(
|
self._public_room_id_gen = SlavedIdTracker(
|
||||||
db_conn, "public_room_list_stream", "stream_id"
|
db_conn, "public_room_list_stream", "stream_id"
|
||||||
)
|
)
|
||||||
|
|
||||||
get_public_room_ids = DataStore.get_public_room_ids.__func__
|
def get_current_public_room_stream_id(self):
|
||||||
get_current_public_room_stream_id = (
|
return self._public_room_id_gen.get_current_token()
|
||||||
DataStore.get_current_public_room_stream_id.__func__
|
|
||||||
)
|
|
||||||
get_public_room_ids_at_stream_id = (
|
|
||||||
RoomStore.__dict__["get_public_room_ids_at_stream_id"]
|
|
||||||
)
|
|
||||||
get_public_room_ids_at_stream_id_txn = (
|
|
||||||
DataStore.get_public_room_ids_at_stream_id_txn.__func__
|
|
||||||
)
|
|
||||||
get_published_at_stream_id_txn = (
|
|
||||||
DataStore.get_published_at_stream_id_txn.__func__
|
|
||||||
)
|
|
||||||
get_public_room_changes = DataStore.get_public_room_changes.__func__
|
|
||||||
|
|
||||||
def stream_positions(self):
|
def stream_positions(self):
|
||||||
result = super(RoomStore, self).stream_positions()
|
result = super(RoomStore, self).stream_positions()
|
||||||
|
|
|
@ -20,7 +20,6 @@ from synapse.storage.devices import DeviceStore
|
||||||
from .appservice import (
|
from .appservice import (
|
||||||
ApplicationServiceStore, ApplicationServiceTransactionStore
|
ApplicationServiceStore, ApplicationServiceTransactionStore
|
||||||
)
|
)
|
||||||
from ._base import LoggingTransaction
|
|
||||||
from .directory import DirectoryStore
|
from .directory import DirectoryStore
|
||||||
from .events import EventsStore
|
from .events import EventsStore
|
||||||
from .presence import PresenceStore, UserPresenceState
|
from .presence import PresenceStore, UserPresenceState
|
||||||
|
@ -213,20 +212,6 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
prefilled_cache=_group_updates_prefill,
|
prefilled_cache=_group_updates_prefill,
|
||||||
)
|
)
|
||||||
|
|
||||||
cur = LoggingTransaction(
|
|
||||||
db_conn.cursor(),
|
|
||||||
name="_find_stream_orderings_for_times_txn",
|
|
||||||
database_engine=self.database_engine,
|
|
||||||
after_callbacks=[],
|
|
||||||
final_callbacks=[],
|
|
||||||
)
|
|
||||||
self._find_stream_orderings_for_times_txn(cur)
|
|
||||||
cur.close()
|
|
||||||
|
|
||||||
self.find_stream_orderings_looping_call = self._clock.looping_call(
|
|
||||||
self._find_stream_orderings_for_times, 10 * 60 * 1000
|
|
||||||
)
|
|
||||||
|
|
||||||
self._stream_order_on_start = self.get_room_max_stream_ordering()
|
self._stream_order_on_start = self.get_room_max_stream_ordering()
|
||||||
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
|
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from ._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore, LoggingTransaction
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from synapse.util.async import sleep
|
from synapse.util.async import sleep
|
||||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||||
|
@ -64,6 +64,27 @@ def _deserialize_action(actions, is_highlight):
|
||||||
|
|
||||||
|
|
||||||
class EventPushActionsWorkerStore(SQLBaseStore):
|
class EventPushActionsWorkerStore(SQLBaseStore):
|
||||||
|
def __init__(self, db_conn, hs):
|
||||||
|
super(EventPushActionsWorkerStore, self).__init__(db_conn, hs)
|
||||||
|
|
||||||
|
# These get correctly set by _find_stream_orderings_for_times_txn
|
||||||
|
self.stream_ordering_month_ago = None
|
||||||
|
self.stream_ordering_day_ago = None
|
||||||
|
|
||||||
|
cur = LoggingTransaction(
|
||||||
|
db_conn.cursor(),
|
||||||
|
name="_find_stream_orderings_for_times_txn",
|
||||||
|
database_engine=self.database_engine,
|
||||||
|
after_callbacks=[],
|
||||||
|
final_callbacks=[],
|
||||||
|
)
|
||||||
|
self._find_stream_orderings_for_times_txn(cur)
|
||||||
|
cur.close()
|
||||||
|
|
||||||
|
self.find_stream_orderings_looping_call = self._clock.looping_call(
|
||||||
|
self._find_stream_orderings_for_times, 10 * 60 * 1000
|
||||||
|
)
|
||||||
|
|
||||||
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
|
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
|
||||||
def get_unread_event_push_actions_by_room_for_user(
|
def get_unread_event_push_actions_by_room_for_user(
|
||||||
self, room_id, user_id, last_read_event_id
|
self, room_id, user_id, last_read_event_id
|
||||||
|
@ -443,6 +464,69 @@ class EventPushActionsWorkerStore(SQLBaseStore):
|
||||||
desc="remove_push_actions_from_staging",
|
desc="remove_push_actions_from_staging",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _find_stream_orderings_for_times(self):
|
||||||
|
yield self.runInteraction(
|
||||||
|
"_find_stream_orderings_for_times",
|
||||||
|
self._find_stream_orderings_for_times_txn
|
||||||
|
)
|
||||||
|
|
||||||
|
def _find_stream_orderings_for_times_txn(self, txn):
|
||||||
|
logger.info("Searching for stream ordering 1 month ago")
|
||||||
|
self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
|
||||||
|
txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"Found stream ordering 1 month ago: it's %d",
|
||||||
|
self.stream_ordering_month_ago
|
||||||
|
)
|
||||||
|
logger.info("Searching for stream ordering 1 day ago")
|
||||||
|
self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
|
||||||
|
txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"Found stream ordering 1 day ago: it's %d",
|
||||||
|
self.stream_ordering_day_ago
|
||||||
|
)
|
||||||
|
|
||||||
|
def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
|
||||||
|
"""
|
||||||
|
Find the stream_ordering of the first event that was received after
|
||||||
|
a given timestamp. This is relatively slow as there is no index on
|
||||||
|
received_ts but we can then use this to delete push actions before
|
||||||
|
this.
|
||||||
|
|
||||||
|
received_ts must necessarily be in the same order as stream_ordering
|
||||||
|
and stream_ordering is indexed, so we manually binary search using
|
||||||
|
stream_ordering
|
||||||
|
"""
|
||||||
|
txn.execute("SELECT MAX(stream_ordering) FROM events")
|
||||||
|
max_stream_ordering = txn.fetchone()[0]
|
||||||
|
|
||||||
|
if max_stream_ordering is None:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
range_start = 0
|
||||||
|
range_end = max_stream_ordering
|
||||||
|
|
||||||
|
sql = (
|
||||||
|
"SELECT received_ts FROM events"
|
||||||
|
" WHERE stream_ordering > ?"
|
||||||
|
" ORDER BY stream_ordering"
|
||||||
|
" LIMIT 1"
|
||||||
|
)
|
||||||
|
|
||||||
|
while range_end - range_start > 1:
|
||||||
|
middle = int((range_end + range_start) / 2)
|
||||||
|
txn.execute(sql, (middle,))
|
||||||
|
middle_ts = txn.fetchone()[0]
|
||||||
|
if ts > middle_ts:
|
||||||
|
range_start = middle
|
||||||
|
else:
|
||||||
|
range_end = middle
|
||||||
|
|
||||||
|
return range_end
|
||||||
|
|
||||||
|
|
||||||
class EventPushActionsStore(EventPushActionsWorkerStore):
|
class EventPushActionsStore(EventPushActionsWorkerStore):
|
||||||
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
|
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
|
||||||
|
@ -650,69 +734,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
|
||||||
WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
|
WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
|
||||||
""", (room_id, user_id, stream_ordering))
|
""", (room_id, user_id, stream_ordering))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def _find_stream_orderings_for_times(self):
|
|
||||||
yield self.runInteraction(
|
|
||||||
"_find_stream_orderings_for_times",
|
|
||||||
self._find_stream_orderings_for_times_txn
|
|
||||||
)
|
|
||||||
|
|
||||||
def _find_stream_orderings_for_times_txn(self, txn):
|
|
||||||
logger.info("Searching for stream ordering 1 month ago")
|
|
||||||
self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
|
|
||||||
txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
"Found stream ordering 1 month ago: it's %d",
|
|
||||||
self.stream_ordering_month_ago
|
|
||||||
)
|
|
||||||
logger.info("Searching for stream ordering 1 day ago")
|
|
||||||
self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
|
|
||||||
txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
"Found stream ordering 1 day ago: it's %d",
|
|
||||||
self.stream_ordering_day_ago
|
|
||||||
)
|
|
||||||
|
|
||||||
def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
|
|
||||||
"""
|
|
||||||
Find the stream_ordering of the first event that was received after
|
|
||||||
a given timestamp. This is relatively slow as there is no index on
|
|
||||||
received_ts but we can then use this to delete push actions before
|
|
||||||
this.
|
|
||||||
|
|
||||||
received_ts must necessarily be in the same order as stream_ordering
|
|
||||||
and stream_ordering is indexed, so we manually binary search using
|
|
||||||
stream_ordering
|
|
||||||
"""
|
|
||||||
txn.execute("SELECT MAX(stream_ordering) FROM events")
|
|
||||||
max_stream_ordering = txn.fetchone()[0]
|
|
||||||
|
|
||||||
if max_stream_ordering is None:
|
|
||||||
return 0
|
|
||||||
|
|
||||||
range_start = 0
|
|
||||||
range_end = max_stream_ordering
|
|
||||||
|
|
||||||
sql = (
|
|
||||||
"SELECT received_ts FROM events"
|
|
||||||
" WHERE stream_ordering > ?"
|
|
||||||
" ORDER BY stream_ordering"
|
|
||||||
" LIMIT 1"
|
|
||||||
)
|
|
||||||
|
|
||||||
while range_end - range_start > 1:
|
|
||||||
middle = int((range_end + range_start) / 2)
|
|
||||||
txn.execute(sql, (middle,))
|
|
||||||
middle_ts = txn.fetchone()[0]
|
|
||||||
if ts > middle_ts:
|
|
||||||
range_start = middle
|
|
||||||
else:
|
|
||||||
range_end = middle
|
|
||||||
|
|
||||||
return range_end
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _rotate_notifs(self):
|
def _rotate_notifs(self):
|
||||||
if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
|
if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.errors import StoreError
|
from synapse.api.errors import StoreError
|
||||||
|
from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.search import SearchStore
|
from synapse.storage.search import SearchStore
|
||||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||||
|
|
||||||
|
@ -38,7 +39,126 @@ RatelimitOverride = collections.namedtuple(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class RoomStore(SearchStore):
|
class RoomWorkerStore(SQLBaseStore):
|
||||||
|
def get_public_room_ids(self):
|
||||||
|
return self._simple_select_onecol(
|
||||||
|
table="rooms",
|
||||||
|
keyvalues={
|
||||||
|
"is_public": True,
|
||||||
|
},
|
||||||
|
retcol="room_id",
|
||||||
|
desc="get_public_room_ids",
|
||||||
|
)
|
||||||
|
|
||||||
|
@cached(num_args=2, max_entries=100)
|
||||||
|
def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
|
||||||
|
"""Get pulbic rooms for a particular list, or across all lists.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
stream_id (int)
|
||||||
|
network_tuple (ThirdPartyInstanceID): The list to use (None, None)
|
||||||
|
means the main list, None means all lsits.
|
||||||
|
"""
|
||||||
|
return self.runInteraction(
|
||||||
|
"get_public_room_ids_at_stream_id",
|
||||||
|
self.get_public_room_ids_at_stream_id_txn,
|
||||||
|
stream_id, network_tuple=network_tuple
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_public_room_ids_at_stream_id_txn(self, txn, stream_id,
|
||||||
|
network_tuple):
|
||||||
|
return {
|
||||||
|
rm
|
||||||
|
for rm, vis in self.get_published_at_stream_id_txn(
|
||||||
|
txn, stream_id, network_tuple=network_tuple
|
||||||
|
).items()
|
||||||
|
if vis
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
|
||||||
|
if network_tuple:
|
||||||
|
# We want to get from a particular list. No aggregation required.
|
||||||
|
|
||||||
|
sql = ("""
|
||||||
|
SELECT room_id, visibility FROM public_room_list_stream
|
||||||
|
INNER JOIN (
|
||||||
|
SELECT room_id, max(stream_id) AS stream_id
|
||||||
|
FROM public_room_list_stream
|
||||||
|
WHERE stream_id <= ? %s
|
||||||
|
GROUP BY room_id
|
||||||
|
) grouped USING (room_id, stream_id)
|
||||||
|
""")
|
||||||
|
|
||||||
|
if network_tuple.appservice_id is not None:
|
||||||
|
txn.execute(
|
||||||
|
sql % ("AND appservice_id = ? AND network_id = ?",),
|
||||||
|
(stream_id, network_tuple.appservice_id, network_tuple.network_id,)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
txn.execute(
|
||||||
|
sql % ("AND appservice_id IS NULL",),
|
||||||
|
(stream_id,)
|
||||||
|
)
|
||||||
|
return dict(txn)
|
||||||
|
else:
|
||||||
|
# We want to get from all lists, so we need to aggregate the results
|
||||||
|
|
||||||
|
logger.info("Executing full list")
|
||||||
|
|
||||||
|
sql = ("""
|
||||||
|
SELECT room_id, visibility
|
||||||
|
FROM public_room_list_stream
|
||||||
|
INNER JOIN (
|
||||||
|
SELECT
|
||||||
|
room_id, max(stream_id) AS stream_id, appservice_id,
|
||||||
|
network_id
|
||||||
|
FROM public_room_list_stream
|
||||||
|
WHERE stream_id <= ?
|
||||||
|
GROUP BY room_id, appservice_id, network_id
|
||||||
|
) grouped USING (room_id, stream_id)
|
||||||
|
""")
|
||||||
|
|
||||||
|
txn.execute(
|
||||||
|
sql,
|
||||||
|
(stream_id,)
|
||||||
|
)
|
||||||
|
|
||||||
|
results = {}
|
||||||
|
# A room is visible if its visible on any list.
|
||||||
|
for room_id, visibility in txn:
|
||||||
|
results[room_id] = bool(visibility) or results.get(room_id, False)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
def get_public_room_changes(self, prev_stream_id, new_stream_id,
|
||||||
|
network_tuple):
|
||||||
|
def get_public_room_changes_txn(txn):
|
||||||
|
then_rooms = self.get_public_room_ids_at_stream_id_txn(
|
||||||
|
txn, prev_stream_id, network_tuple
|
||||||
|
)
|
||||||
|
|
||||||
|
now_rooms_dict = self.get_published_at_stream_id_txn(
|
||||||
|
txn, new_stream_id, network_tuple
|
||||||
|
)
|
||||||
|
|
||||||
|
now_rooms_visible = set(
|
||||||
|
rm for rm, vis in now_rooms_dict.items() if vis
|
||||||
|
)
|
||||||
|
now_rooms_not_visible = set(
|
||||||
|
rm for rm, vis in now_rooms_dict.items() if not vis
|
||||||
|
)
|
||||||
|
|
||||||
|
newly_visible = now_rooms_visible - then_rooms
|
||||||
|
newly_unpublished = now_rooms_not_visible & then_rooms
|
||||||
|
|
||||||
|
return newly_visible, newly_unpublished
|
||||||
|
|
||||||
|
return self.runInteraction(
|
||||||
|
"get_public_room_changes", get_public_room_changes_txn
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class RoomStore(RoomWorkerStore, SearchStore):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def store_room(self, room_id, room_creator_user_id, is_public):
|
def store_room(self, room_id, room_creator_user_id, is_public):
|
||||||
|
@ -225,16 +345,6 @@ class RoomStore(SearchStore):
|
||||||
)
|
)
|
||||||
self.hs.get_notifier().on_new_replication_data()
|
self.hs.get_notifier().on_new_replication_data()
|
||||||
|
|
||||||
def get_public_room_ids(self):
|
|
||||||
return self._simple_select_onecol(
|
|
||||||
table="rooms",
|
|
||||||
keyvalues={
|
|
||||||
"is_public": True,
|
|
||||||
},
|
|
||||||
retcol="room_id",
|
|
||||||
desc="get_public_room_ids",
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_room_count(self):
|
def get_room_count(self):
|
||||||
"""Retrieve a list of all rooms
|
"""Retrieve a list of all rooms
|
||||||
"""
|
"""
|
||||||
|
@ -326,113 +436,6 @@ class RoomStore(SearchStore):
|
||||||
def get_current_public_room_stream_id(self):
|
def get_current_public_room_stream_id(self):
|
||||||
return self._public_room_id_gen.get_current_token()
|
return self._public_room_id_gen.get_current_token()
|
||||||
|
|
||||||
@cached(num_args=2, max_entries=100)
|
|
||||||
def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
|
|
||||||
"""Get pulbic rooms for a particular list, or across all lists.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
stream_id (int)
|
|
||||||
network_tuple (ThirdPartyInstanceID): The list to use (None, None)
|
|
||||||
means the main list, None means all lsits.
|
|
||||||
"""
|
|
||||||
return self.runInteraction(
|
|
||||||
"get_public_room_ids_at_stream_id",
|
|
||||||
self.get_public_room_ids_at_stream_id_txn,
|
|
||||||
stream_id, network_tuple=network_tuple
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_public_room_ids_at_stream_id_txn(self, txn, stream_id,
|
|
||||||
network_tuple):
|
|
||||||
return {
|
|
||||||
rm
|
|
||||||
for rm, vis in self.get_published_at_stream_id_txn(
|
|
||||||
txn, stream_id, network_tuple=network_tuple
|
|
||||||
).items()
|
|
||||||
if vis
|
|
||||||
}
|
|
||||||
|
|
||||||
def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
|
|
||||||
if network_tuple:
|
|
||||||
# We want to get from a particular list. No aggregation required.
|
|
||||||
|
|
||||||
sql = ("""
|
|
||||||
SELECT room_id, visibility FROM public_room_list_stream
|
|
||||||
INNER JOIN (
|
|
||||||
SELECT room_id, max(stream_id) AS stream_id
|
|
||||||
FROM public_room_list_stream
|
|
||||||
WHERE stream_id <= ? %s
|
|
||||||
GROUP BY room_id
|
|
||||||
) grouped USING (room_id, stream_id)
|
|
||||||
""")
|
|
||||||
|
|
||||||
if network_tuple.appservice_id is not None:
|
|
||||||
txn.execute(
|
|
||||||
sql % ("AND appservice_id = ? AND network_id = ?",),
|
|
||||||
(stream_id, network_tuple.appservice_id, network_tuple.network_id,)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
txn.execute(
|
|
||||||
sql % ("AND appservice_id IS NULL",),
|
|
||||||
(stream_id,)
|
|
||||||
)
|
|
||||||
return dict(txn)
|
|
||||||
else:
|
|
||||||
# We want to get from all lists, so we need to aggregate the results
|
|
||||||
|
|
||||||
logger.info("Executing full list")
|
|
||||||
|
|
||||||
sql = ("""
|
|
||||||
SELECT room_id, visibility
|
|
||||||
FROM public_room_list_stream
|
|
||||||
INNER JOIN (
|
|
||||||
SELECT
|
|
||||||
room_id, max(stream_id) AS stream_id, appservice_id,
|
|
||||||
network_id
|
|
||||||
FROM public_room_list_stream
|
|
||||||
WHERE stream_id <= ?
|
|
||||||
GROUP BY room_id, appservice_id, network_id
|
|
||||||
) grouped USING (room_id, stream_id)
|
|
||||||
""")
|
|
||||||
|
|
||||||
txn.execute(
|
|
||||||
sql,
|
|
||||||
(stream_id,)
|
|
||||||
)
|
|
||||||
|
|
||||||
results = {}
|
|
||||||
# A room is visible if its visible on any list.
|
|
||||||
for room_id, visibility in txn:
|
|
||||||
results[room_id] = bool(visibility) or results.get(room_id, False)
|
|
||||||
|
|
||||||
return results
|
|
||||||
|
|
||||||
def get_public_room_changes(self, prev_stream_id, new_stream_id,
|
|
||||||
network_tuple):
|
|
||||||
def get_public_room_changes_txn(txn):
|
|
||||||
then_rooms = self.get_public_room_ids_at_stream_id_txn(
|
|
||||||
txn, prev_stream_id, network_tuple
|
|
||||||
)
|
|
||||||
|
|
||||||
now_rooms_dict = self.get_published_at_stream_id_txn(
|
|
||||||
txn, new_stream_id, network_tuple
|
|
||||||
)
|
|
||||||
|
|
||||||
now_rooms_visible = set(
|
|
||||||
rm for rm, vis in now_rooms_dict.items() if vis
|
|
||||||
)
|
|
||||||
now_rooms_not_visible = set(
|
|
||||||
rm for rm, vis in now_rooms_dict.items() if not vis
|
|
||||||
)
|
|
||||||
|
|
||||||
newly_visible = now_rooms_visible - then_rooms
|
|
||||||
newly_unpublished = now_rooms_not_visible & then_rooms
|
|
||||||
|
|
||||||
return newly_visible, newly_unpublished
|
|
||||||
|
|
||||||
return self.runInteraction(
|
|
||||||
"get_public_room_changes", get_public_room_changes_txn
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_all_new_public_rooms(self, prev_id, current_id, limit):
|
def get_all_new_public_rooms(self, prev_id, current_id, limit):
|
||||||
def get_all_new_public_rooms(txn):
|
def get_all_new_public_rooms(txn):
|
||||||
sql = ("""
|
sql = ("""
|
||||||
|
|
|
@ -299,10 +299,6 @@ def preserve_fn(f):
|
||||||
Useful for wrapping functions that return a deferred which you don't yield
|
Useful for wrapping functions that return a deferred which you don't yield
|
||||||
on.
|
on.
|
||||||
"""
|
"""
|
||||||
def reset_context(result):
|
|
||||||
LoggingContext.set_current_context(LoggingContext.sentinel)
|
|
||||||
return result
|
|
||||||
|
|
||||||
def g(*args, **kwargs):
|
def g(*args, **kwargs):
|
||||||
current = LoggingContext.current_context()
|
current = LoggingContext.current_context()
|
||||||
res = f(*args, **kwargs)
|
res = f(*args, **kwargs)
|
||||||
|
@ -323,12 +319,11 @@ def preserve_fn(f):
|
||||||
# which is supposed to have a single entry and exit point. But
|
# which is supposed to have a single entry and exit point. But
|
||||||
# by spawning off another deferred, we are effectively
|
# by spawning off another deferred, we are effectively
|
||||||
# adding a new exit point.)
|
# adding a new exit point.)
|
||||||
res.addBoth(reset_context)
|
res.addBoth(_set_context_cb, LoggingContext.sentinel)
|
||||||
return res
|
return res
|
||||||
return g
|
return g
|
||||||
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def make_deferred_yieldable(deferred):
|
def make_deferred_yieldable(deferred):
|
||||||
"""Given a deferred, make it follow the Synapse logcontext rules:
|
"""Given a deferred, make it follow the Synapse logcontext rules:
|
||||||
|
|
||||||
|
@ -342,9 +337,16 @@ def make_deferred_yieldable(deferred):
|
||||||
|
|
||||||
(This is more-or-less the opposite operation to preserve_fn.)
|
(This is more-or-less the opposite operation to preserve_fn.)
|
||||||
"""
|
"""
|
||||||
with PreserveLoggingContext():
|
if isinstance(deferred, defer.Deferred) and not deferred.called:
|
||||||
r = yield deferred
|
prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
|
||||||
defer.returnValue(r)
|
deferred.addBoth(_set_context_cb, prev_context)
|
||||||
|
return deferred
|
||||||
|
|
||||||
|
|
||||||
|
def _set_context_cb(result, context):
|
||||||
|
"""A callback function which just sets the logging context"""
|
||||||
|
LoggingContext.set_current_context(context)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
# modules to ignore in `logcontext_tracer`
|
# modules to ignore in `logcontext_tracer`
|
||||||
|
|
Loading…
Reference in a new issue