Merge pull request #3653 from matrix-org/erikj/split_federation

Move more federation APIs to workers
This commit is contained in:
Erik Johnston 2018-08-15 14:59:02 +01:00 committed by GitHub
commit dc56c47dc0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 512 additions and 138 deletions

1
changelog.d/3653.feature Normal file
View file

@ -0,0 +1 @@
Support more federation endpoints on workers

View file

@ -173,10 +173,23 @@ endpoints matching the following regular expressions::
^/_matrix/federation/v1/backfill/
^/_matrix/federation/v1/get_missing_events/
^/_matrix/federation/v1/publicRooms
^/_matrix/federation/v1/query/
^/_matrix/federation/v1/make_join/
^/_matrix/federation/v1/make_leave/
^/_matrix/federation/v1/send_join/
^/_matrix/federation/v1/send_leave/
^/_matrix/federation/v1/invite/
^/_matrix/federation/v1/query_auth/
^/_matrix/federation/v1/event_auth/
^/_matrix/federation/v1/exchange_third_party_invite/
^/_matrix/federation/v1/send/
The above endpoints should all be routed to the federation_reader worker by the
reverse-proxy configuration.
The `^/_matrix/federation/v1/send/` endpoint must only be handled by a single
instance.
``synapse.app.federation_sender``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View file

@ -39,7 +39,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import (
JoinedRoomMemberListRestServlet,
@ -66,7 +66,7 @@ class ClientReaderSlavedStore(
DirectoryStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
TransactionStore,
SlavedTransactionStore,
SlavedClientIpStore,
BaseSlavedStore,
):

View file

@ -43,7 +43,7 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import (
JoinRoomAliasServlet,
@ -63,7 +63,7 @@ logger = logging.getLogger("synapse.app.event_creator")
class EventCreatorSlavedStore(
DirectoryStore,
TransactionStore,
SlavedTransactionStore,
SlavedProfileStore,
SlavedAccountDataStore,
SlavedPusherStore,

View file

@ -32,11 +32,16 @@ from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.profile import SlavedProfileStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@ -49,11 +54,16 @@ logger = logging.getLogger("synapse.app.federation_reader")
class FederationReaderSlavedStore(
SlavedProfileStore,
SlavedApplicationServiceStore,
SlavedPusherStore,
SlavedPushRuleStore,
SlavedReceiptsStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
DirectoryStore,
TransactionStore,
SlavedTransactionStore,
BaseSlavedStore,
):
pass

View file

@ -36,7 +36,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@ -50,7 +50,7 @@ logger = logging.getLogger("synapse.app.federation_sender")
class FederationSenderSlaveStore(
SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
):
def __init__(self, db_conn, hs):

View file

@ -34,7 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
@ -52,7 +52,7 @@ class MediaRepositorySlavedStore(
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedClientIpStore,
TransactionStore,
SlavedTransactionStore,
BaseSlavedStore,
MediaRepositoryStore,
):

View file

@ -39,6 +39,10 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@ -760,6 +764,8 @@ class FederationHandlerRegistry(object):
if edu_type in self.edu_handlers:
raise KeyError("Already have an EDU handler for %s" % (edu_type,))
logger.info("Registering federation EDU handler for %r", edu_type)
self.edu_handlers[edu_type] = handler
def register_query_handler(self, query_type, handler):
@ -778,6 +784,8 @@ class FederationHandlerRegistry(object):
"Already have a Query handler for %s" % (query_type,)
)
logger.info("Registering federation query handler for %r", query_type)
self.query_handlers[query_type] = handler
@defer.inlineCallbacks
@ -800,3 +808,49 @@ class FederationHandlerRegistry(object):
raise NotFoundError("No handler for Query type '%s'" % (query_type,))
return handler(args)
class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
"""A FederationHandlerRegistry for worker processes.
When receiving EDU or queries it will check if an appropriate handler has
been registered on the worker, if there isn't one then it calls off to the
master process.
"""
def __init__(self, hs):
self.config = hs.config
self.http_client = hs.get_simple_http_client()
self.clock = hs.get_clock()
self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
super(ReplicationFederationHandlerRegistry, self).__init__()
def on_edu(self, edu_type, origin, content):
"""Overrides FederationHandlerRegistry
"""
handler = self.edu_handlers.get(edu_type)
if handler:
return super(ReplicationFederationHandlerRegistry, self).on_edu(
edu_type, origin, content,
)
return self._send_edu(
edu_type=edu_type,
origin=origin,
content=content,
)
def on_query(self, query_type, args):
"""Overrides FederationHandlerRegistry
"""
handler = self.query_handlers.get(query_type)
if handler:
return handler(args)
return self._get_query_client(
query_type=query_type,
args=args,
)

View file

@ -49,6 +49,11 @@ from synapse.crypto.event_signing import (
compute_event_signature,
)
from synapse.events.validator import EventValidator
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
ReplicationFederationSendEventsRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
@ -91,6 +96,18 @@ class FederationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self._server_notices_mxid = hs.config.server_notices_mxid
self.config = hs.config
self.http_client = hs.get_simple_http_client()
self._send_events_to_master = (
ReplicationFederationSendEventsRestServlet.make_client(hs)
)
self._notify_user_membership_change = (
ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
)
self._clean_room_for_join_client = (
ReplicationCleanRoomRestServlet.make_client(hs)
)
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@ -1158,7 +1175,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
yield self._persist_events([(event, context)])
yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@ -1189,7 +1206,7 @@ class FederationHandler(BaseHandler):
)
context = yield self.state_handler.compute_event_context(event)
yield self._persist_events([(event, context)])
yield self.persist_events_and_notify([(event, context)])
defer.returnValue(event)
@ -1432,7 +1449,7 @@ class FederationHandler(BaseHandler):
event, context
)
yield self._persist_events(
yield self.persist_events_and_notify(
[(event, context)],
backfilled=backfilled,
)
@ -1470,7 +1487,7 @@ class FederationHandler(BaseHandler):
], consumeErrors=True,
))
yield self._persist_events(
yield self.persist_events_and_notify(
[
(ev_info["event"], context)
for ev_info, context in zip(event_infos, contexts)
@ -1558,7 +1575,7 @@ class FederationHandler(BaseHandler):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
yield self._persist_events(
yield self.persist_events_and_notify(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
@ -1569,7 +1586,7 @@ class FederationHandler(BaseHandler):
event, old_state=state
)
yield self._persist_events(
yield self.persist_events_and_notify(
[(event, new_event_context)],
)
@ -2297,7 +2314,7 @@ class FederationHandler(BaseHandler):
for revocation.
"""
try:
response = yield self.hs.get_simple_http_client().get_json(
response = yield self.http_client.get_json(
url,
{"public_key": public_key}
)
@ -2310,7 +2327,7 @@ class FederationHandler(BaseHandler):
raise AuthError(403, "Third party certificate was invalid")
@defer.inlineCallbacks
def _persist_events(self, event_and_contexts, backfilled=False):
def persist_events_and_notify(self, event_and_contexts, backfilled=False):
"""Persists events and tells the notifier/pushers about them, if
necessary.
@ -2322,14 +2339,21 @@ class FederationHandler(BaseHandler):
Returns:
Deferred
"""
max_stream_id = yield self.store.persist_events(
event_and_contexts,
backfilled=backfilled,
)
if self.config.worker_app:
yield self._send_events_to_master(
store=self.store,
event_and_contexts=event_and_contexts,
backfilled=backfilled
)
else:
max_stream_id = yield self.store.persist_events(
event_and_contexts,
backfilled=backfilled,
)
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id)
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
@ -2368,9 +2392,25 @@ class FederationHandler(BaseHandler):
)
def _clean_room_for_join(self, room_id):
return self.store.clean_room_for_join(room_id)
"""Called to clean up any data in DB for a given room, ready for the
server to join the room.
Args:
room_id (str)
"""
if self.config.worker_app:
return self._clean_room_for_join_client(room_id)
else:
return self.store.clean_room_for_join(room_id)
def user_joined_room(self, user, room_id):
"""Called when a new user has joined the room
"""
return user_joined_room(self.distributor, user, room_id)
if self.config.worker_app:
return self._notify_user_membership_change(
room_id=room_id,
user_id=user.to_string(),
change="joined",
)
else:
return user_joined_room(self.distributor, user, room_id)

View file

@ -14,7 +14,7 @@
# limitations under the License.
from synapse.http.server import JsonResource
from synapse.replication.http import membership, send_event
from synapse.replication.http import federation, membership, send_event
REPLICATION_PREFIX = "/_synapse/replication"
@ -27,3 +27,4 @@ class ReplicationRestResource(JsonResource):
def register_servlets(self, hs):
send_event.register_servlets(hs, self)
membership.register_servlets(hs, self)
federation.register_servlets(hs, self)

View file

@ -0,0 +1,259 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
"""Handles events newly received from federation, including persisting and
notifying.
The API looks like:
POST /_synapse/replication/fed_send_events/:txn_id
{
"events": [{
"event": { .. serialized event .. },
"internal_metadata": { .. serialized internal_metadata .. },
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
}],
"backfilled": false
"""
NAME = "fed_send_events"
PATH_ARGS = ()
def __init__(self, hs):
super(ReplicationFederationSendEventsRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.federation_handler = hs.get_handlers().federation_handler
@staticmethod
@defer.inlineCallbacks
def _serialize_payload(store, event_and_contexts, backfilled):
"""
Args:
store
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether or not the events are the result of
backfilling
"""
event_payloads = []
for event, context in event_and_contexts:
serialized_context = yield context.serialize(event, store)
event_payloads.append({
"event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
})
payload = {
"events": event_payloads,
"backfilled": backfilled,
}
defer.returnValue(payload)
@defer.inlineCallbacks
def _handle_request(self, request):
with Measure(self.clock, "repl_fed_send_events_parse"):
content = parse_json_object_from_request(request)
backfilled = content["backfilled"]
event_payloads = content["events"]
event_and_contexts = []
for event_payload in event_payloads:
event_dict = event_payload["event"]
internal_metadata = event_payload["internal_metadata"]
rejected_reason = event_payload["rejected_reason"]
event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
context = yield EventContext.deserialize(
self.store, event_payload["context"],
)
event_and_contexts.append((event, context))
logger.info(
"Got %d events from federation",
len(event_and_contexts),
)
yield self.federation_handler.persist_events_and_notify(
event_and_contexts, backfilled,
)
defer.returnValue((200, {}))
class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
"""Handles EDUs newly received from federation, including persisting and
notifying.
Request format:
POST /_synapse/replication/fed_send_edu/:edu_type/:txn_id
{
"origin": ...,
"content: { ... }
}
"""
NAME = "fed_send_edu"
PATH_ARGS = ("edu_type",)
def __init__(self, hs):
super(ReplicationFederationSendEduRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.registry = hs.get_federation_registry()
@staticmethod
def _serialize_payload(edu_type, origin, content):
return {
"origin": origin,
"content": content,
}
@defer.inlineCallbacks
def _handle_request(self, request, edu_type):
with Measure(self.clock, "repl_fed_send_edu_parse"):
content = parse_json_object_from_request(request)
origin = content["origin"]
edu_content = content["content"]
logger.info(
"Got %r edu from $s",
edu_type, origin,
)
result = yield self.registry.on_edu(edu_type, origin, edu_content)
defer.returnValue((200, result))
class ReplicationGetQueryRestServlet(ReplicationEndpoint):
"""Handle responding to queries from federation.
Request format:
POST /_synapse/replication/fed_query/:query_type
{
"args": { ... }
}
"""
NAME = "fed_query"
PATH_ARGS = ("query_type",)
# This is a query, so let's not bother caching
CACHE = False
def __init__(self, hs):
super(ReplicationGetQueryRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.registry = hs.get_federation_registry()
@staticmethod
def _serialize_payload(query_type, args):
"""
Args:
query_type (str)
args (dict): The arguments received for the given query type
"""
return {
"args": args,
}
@defer.inlineCallbacks
def _handle_request(self, request, query_type):
with Measure(self.clock, "repl_fed_query_parse"):
content = parse_json_object_from_request(request)
args = content["args"]
logger.info(
"Got %r query",
query_type,
)
result = yield self.registry.on_query(query_type, args)
defer.returnValue((200, result))
class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
"""Called to clean up any data in DB for a given room, ready for the
server to join the room.
Request format:
POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id
{}
"""
NAME = "fed_cleanup_room"
PATH_ARGS = ("room_id",)
def __init__(self, hs):
super(ReplicationCleanRoomRestServlet, self).__init__(hs)
self.store = hs.get_datastore()
@staticmethod
def _serialize_payload(room_id, args):
"""
Args:
room_id (str)
"""
return {}
@defer.inlineCallbacks
def _handle_request(self, request, room_id):
yield self.store.clean_room_for_join(room_id)
defer.returnValue((200, {}))
def register_servlets(hs, http_server):
ReplicationFederationSendEventsRestServlet(hs).register(http_server)
ReplicationFederationSendEduRestServlet(hs).register(http_server)
ReplicationGetQueryRestServlet(hs).register(http_server)
ReplicationCleanRoomRestServlet(hs).register(http_server)

View file

@ -13,19 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage import DataStore
from synapse.storage.transactions import TransactionStore
from ._base import BaseSlavedStore
class TransactionStore(BaseSlavedStore):
get_destination_retry_timings = TransactionStore.__dict__[
"get_destination_retry_timings"
]
_get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__
_set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__
prep_send_transaction = DataStore.prep_send_transaction.__func__
delivered_txn = DataStore.delivered_txn.__func__
class SlavedTransactionStore(TransactionStore, BaseSlavedStore):
pass

View file

@ -36,6 +36,7 @@ from synapse.federation.federation_client import FederationClient
from synapse.federation.federation_server import (
FederationHandlerRegistry,
FederationServer,
ReplicationFederationHandlerRegistry,
)
from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.transaction_queue import TransactionQueue
@ -423,7 +424,10 @@ class HomeServer(object):
return RoomMemberMasterHandler(self)
def build_federation_registry(self):
return FederationHandlerRegistry()
if self.config.worker_app:
return ReplicationFederationHandlerRegistry(self)
else:
return FederationHandlerRegistry()
def build_server_notices_manager(self):
if self.config.worker_app:

View file

@ -1435,88 +1435,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
(event.event_id, event.redacts)
)
@defer.inlineCallbacks
def have_events_in_timeline(self, event_ids):
"""Given a list of event ids, check if we have already processed and
stored them as non outliers.
"""
rows = yield self._simple_select_many_batch(
table="events",
retcols=("event_id",),
column="event_id",
iterable=list(event_ids),
keyvalues={"outlier": False},
desc="have_events_in_timeline",
)
defer.returnValue(set(r["event_id"] for r in rows))
@defer.inlineCallbacks
def have_seen_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
Args:
event_ids (iterable[str]):
Returns:
Deferred[set[str]]: The events we have already seen.
"""
results = set()
def have_seen_events_txn(txn, chunk):
sql = (
"SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
% (",".join("?" * len(chunk)), )
)
txn.execute(sql, chunk)
for (event_id, ) in txn:
results.add(event_id)
# break the input up into chunks of 100
input_iterator = iter(event_ids)
for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
[]):
yield self.runInteraction(
"have_seen_events",
have_seen_events_txn,
chunk,
)
defer.returnValue(results)
def get_seen_events_with_rejections(self, event_ids):
"""Given a list of event ids, check if we rejected them.
Args:
event_ids (list[str])
Returns:
Deferred[dict[str, str|None):
Has an entry for each event id we already have seen. Maps to
the rejected reason string if we rejected the event, else maps
to None.
"""
if not event_ids:
return defer.succeed({})
def f(txn):
sql = (
"SELECT e.event_id, reason FROM events as e "
"LEFT JOIN rejections as r ON e.event_id = r.event_id "
"WHERE e.event_id = ?"
)
res = {}
for event_id in event_ids:
txn.execute(sql, (event_id,))
row = txn.fetchone()
if row:
_, rejected = row
res[event_id] = rejected
return res
return self.runInteraction("get_rejection_reasons", f)
@defer.inlineCallbacks
def count_daily_messages(self):
"""

View file

@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
from collections import namedtuple
@ -442,3 +443,85 @@ class EventsWorkerStore(SQLBaseStore):
self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
defer.returnValue(cache_entry)
@defer.inlineCallbacks
def have_events_in_timeline(self, event_ids):
"""Given a list of event ids, check if we have already processed and
stored them as non outliers.
"""
rows = yield self._simple_select_many_batch(
table="events",
retcols=("event_id",),
column="event_id",
iterable=list(event_ids),
keyvalues={"outlier": False},
desc="have_events_in_timeline",
)
defer.returnValue(set(r["event_id"] for r in rows))
@defer.inlineCallbacks
def have_seen_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
Args:
event_ids (iterable[str]):
Returns:
Deferred[set[str]]: The events we have already seen.
"""
results = set()
def have_seen_events_txn(txn, chunk):
sql = (
"SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
% (",".join("?" * len(chunk)), )
)
txn.execute(sql, chunk)
for (event_id, ) in txn:
results.add(event_id)
# break the input up into chunks of 100
input_iterator = iter(event_ids)
for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
[]):
yield self.runInteraction(
"have_seen_events",
have_seen_events_txn,
chunk,
)
defer.returnValue(results)
def get_seen_events_with_rejections(self, event_ids):
"""Given a list of event ids, check if we rejected them.
Args:
event_ids (list[str])
Returns:
Deferred[dict[str, str|None):
Has an entry for each event id we already have seen. Maps to
the rejected reason string if we rejected the event, else maps
to None.
"""
if not event_ids:
return defer.succeed({})
def f(txn):
sql = (
"SELECT e.event_id, reason FROM events as e "
"LEFT JOIN rejections as r ON e.event_id = r.event_id "
"WHERE e.event_id = ?"
)
res = {}
for event_id in event_ids:
txn.execute(sql, (event_id,))
row = txn.fetchone()
if row:
_, rejected = row
res[event_id] = rejected
return res
return self.runInteraction("get_rejection_reasons", f)

View file

@ -41,6 +41,22 @@ RatelimitOverride = collections.namedtuple(
class RoomWorkerStore(SQLBaseStore):
def get_room(self, room_id):
"""Retrieve a room.
Args:
room_id (str): The ID of the room to retrieve.
Returns:
A namedtuple containing the room information, or an empty list.
"""
return self._simple_select_one(
table="rooms",
keyvalues={"room_id": room_id},
retcols=("room_id", "is_public", "creator"),
desc="get_room",
allow_none=True,
)
def get_public_room_ids(self):
return self._simple_select_onecol(
table="rooms",
@ -215,22 +231,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.")
def get_room(self, room_id):
"""Retrieve a room.
Args:
room_id (str): The ID of the room to retrieve.
Returns:
A namedtuple containing the room information, or an empty list.
"""
return self._simple_select_one(
table="rooms",
keyvalues={"room_id": room_id},
retcols=("room_id", "is_public", "creator"),
desc="get_room",
allow_none=True,
)
@defer.inlineCallbacks
def set_room_is_public(self, room_id, is_public):
def set_room_is_public_txn(txn, next_id):