0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2025-01-21 01:12:06 +01:00

Add replication http endpoint for event sending

This commit is contained in:
Erik Johnston 2018-02-05 17:22:16 +00:00
parent e3624fad5f
commit 24dd73028a
8 changed files with 303 additions and 12 deletions

View file

@ -38,6 +38,7 @@ from synapse.metrics import register_memory_metrics
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \ from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
check_requirements check_requirements
from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource from synapse.rest import ClientRestResource
from synapse.rest.key.v1.server_key_resource import LocalKey from synapse.rest.key.v1.server_key_resource import LocalKey
@ -219,6 +220,9 @@ class SynapseHomeServer(HomeServer):
if name == "metrics" and self.get_config().enable_metrics: if name == "metrics" and self.get_config().enable_metrics:
resources[METRICS_PREFIX] = MetricsResource(self) resources[METRICS_PREFIX] = MetricsResource(self)
if name == "replication":
resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
return resources return resources
def start_listening(self): def start_listening(self):

View file

@ -33,8 +33,16 @@ class WorkerConfig(Config):
self.worker_pid_file = config.get("worker_pid_file") self.worker_pid_file = config.get("worker_pid_file")
self.worker_log_file = config.get("worker_log_file") self.worker_log_file = config.get("worker_log_file")
self.worker_log_config = config.get("worker_log_config") self.worker_log_config = config.get("worker_log_config")
# The host used to connect to the main synapse
self.worker_replication_host = config.get("worker_replication_host", None) self.worker_replication_host = config.get("worker_replication_host", None)
# The port on the main synapse for TCP replication
self.worker_replication_port = config.get("worker_replication_port", None) self.worker_replication_port = config.get("worker_replication_port", None)
# The port on the main synapse for HTTP replication endpoint
self.worker_replication_http_port = config.get("worker_replication_http_port")
self.worker_name = config.get("worker_name", self.worker_app) self.worker_name = config.get("worker_name", self.worker_app)
self.worker_main_http_uri = config.get("worker_main_http_uri", None) self.worker_main_http_uri = config.get("worker_main_http_uri", None)

View file

@ -14,6 +14,9 @@
# limitations under the License. # limitations under the License.
from frozendict import frozendict
class EventContext(object): class EventContext(object):
""" """
Attributes: Attributes:
@ -73,3 +76,72 @@ class EventContext(object):
self.prev_state_events = None self.prev_state_events = None
self.app_service = None self.app_service = None
def serialize(self):
"""Converts self to a type that can be serialized as JSON, and then
deserialized by `deserialize`
Returns:
dict
"""
return {
"current_state_ids": _encode_state_dict(self.current_state_ids),
"prev_state_ids": _encode_state_dict(self.prev_state_ids),
"state_group": self.state_group,
"rejected": self.rejected,
"push_actions": self.push_actions,
"prev_group": self.prev_group,
"delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events,
"app_service_id": self.app_service.id if self.app_service else None
}
@staticmethod
def deserialize(store, input):
"""Converts a dict that was produced by `serialize` back into a
EventContext.
Args:
store (DataStore): Used to convert AS ID to AS object
input (dict): A dict produced by `serialize`
Returns:
EventContext
"""
context = EventContext()
context.current_state_ids = _decode_state_dict(input["current_state_ids"])
context.prev_state_ids = _decode_state_dict(input["prev_state_ids"])
context.state_group = input["state_group"]
context.rejected = input["rejected"]
context.push_actions = input["push_actions"]
context.prev_group = input["prev_group"]
context.delta_ids = _decode_state_dict(input["delta_ids"])
context.prev_state_events = input["prev_state_events"]
app_service_id = input["app_service_id"]
if app_service_id:
context.app_service = store.get_app_service_by_id(app_service_id)
return context
def _encode_state_dict(state_dict):
"""Since dicts of (type, state_key) -> event_id cannot be serialized in
JSON we need to convert them to a form that can.
"""
if state_dict is None:
return None
return [
(etype, state_key, v)
for (etype, state_key), v in state_dict.iteritems()
]
def _decode_state_dict(input):
"""Decodes a state dict encoded using `_encode_state_dict` above
"""
if input is None:
return None
return frozendict({(etype, state_key,): v for etype, state_key, v in input})

View file

@ -28,6 +28,7 @@ from synapse.util.logcontext import preserve_fn
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
from synapse.util.frozenutils import unfreeze from synapse.util.frozenutils import unfreeze
from synapse.visibility import filter_events_for_client from synapse.visibility import filter_events_for_client
from synapse.replication.http.send_event import send_event_to_master
from ._base import BaseHandler from ._base import BaseHandler
@ -312,6 +313,9 @@ class EventCreationHandler(object):
self.server_name = hs.hostname self.server_name = hs.hostname
self.ratelimiter = hs.get_ratelimiter() self.ratelimiter = hs.get_ratelimiter()
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self.config = hs.config
self.http_client = hs.get_simple_http_client()
# This is only used to get at ratelimit function, and maybe_kick_guest_users # This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs) self.base_handler = BaseHandler(hs)
@ -559,6 +563,18 @@ class EventCreationHandler(object):
): ):
# We now need to go and hit out to wherever we need to hit out to. # We now need to go and hit out to wherever we need to hit out to.
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester,
event=event,
context=context,
)
return
if ratelimit: if ratelimit:
yield self.base_handler.ratelimit(requester) yield self.base_handler.ratelimit(requester)

View file

@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import send_event
from synapse.http.server import JsonResource
REPLICATION_PREFIX = "/_synapse/replication"
class ReplicationRestResource(JsonResource):
def __init__(self, hs):
JsonResource.__init__(self, hs, canonical_json=False)
self.register_servlets(hs)
def register_servlets(self, hs):
send_event.register_servlets(hs, self)

View file

@ -0,0 +1,108 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.metrics import Measure
from synapse.types import Requester
import logging
import re
logger = logging.getLogger(__name__)
def send_event_to_master(client, host, port, requester, event, context):
"""Send event to be handled on the master
Args:
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
requester (Requester)
event (FrozenEvent)
context (EventContext)
"""
uri = "http://%s:%s/_synapse/replication/send_event" % (host, port,)
payload = {
"event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": context.serialize(),
"requester": requester.serialize(),
}
return client.post_json_get_json(uri, payload)
class ReplicationSendEventRestServlet(RestServlet):
"""Handles events newly created on workers, including persisting and
notifying.
The API looks like:
POST /_synapse/replication/send_event
{
"event": { .. serialized event .. },
"internal_metadata": { .. serialized internal_metadata .. },
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
"requester": { .. serialized requester .. },
}
"""
PATTERNS = [re.compile("^/_synapse/replication/send_event$")]
def __init__(self, hs):
super(ReplicationSendEventRestServlet, self).__init__()
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@defer.inlineCallbacks
def on_POST(self, request):
with Measure(self.clock, "repl_send_event_parse"):
content = parse_json_object_from_request(request)
event_dict = content["event"]
internal_metadata = content["internal_metadata"]
rejected_reason = content["rejected_reason"]
event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
requester = Requester.deserialize(self.store, content["requester"])
context = EventContext.deserialize(self.store, content["context"])
if requester.user:
request.authenticated_entity = requester.user.to_string()
logger.info(
"Got event to send with ID: %s into room: %s",
event.event_id, event.room_id,
)
yield self.event_creation_handler.handle_new_client_event(
requester, event, context,
)
defer.returnValue((200, {}))
def register_servlets(hs, http_server):
ReplicationSendEventRestServlet(hs).register(http_server)

View file

@ -99,6 +99,19 @@ class ApplicationServiceStore(SQLBaseStore):
return service return service
return None return None
def get_app_service_by_id(self, as_id):
"""Get the application service with the given appservice ID.
Args:
as_id (str): The application service ID.
Returns:
synapse.appservice.ApplicationService or None.
"""
for service in self.services_cache:
if service.id == as_id:
return service
return None
def get_app_service_rooms(self, service): def get_app_service_rooms(self, service):
"""Get a list of RoomsForUser for this application service. """Get a list of RoomsForUser for this application service.

View file

@ -19,20 +19,59 @@ from synapse.api.errors import SynapseError
from collections import namedtuple from collections import namedtuple
Requester = namedtuple("Requester", [ class Requester(namedtuple("Requester", [
"user", "access_token_id", "is_guest", "device_id", "app_service", "user", "access_token_id", "is_guest", "device_id", "app_service",
]) ])):
""" """
Represents the user making a request Represents the user making a request
Attributes: Attributes:
user (UserID): id of the user making the request user (UserID): id of the user making the request
access_token_id (int|None): *ID* of the access token used for this access_token_id (int|None): *ID* of the access token used for this
request, or None if it came via the appservice API or similar request, or None if it came via the appservice API or similar
is_guest (bool): True if the user making this request is a guest user is_guest (bool): True if the user making this request is a guest user
device_id (str|None): device_id which was set at authentication time device_id (str|None): device_id which was set at authentication time
app_service (ApplicationService|None): the AS requesting on behalf of the user app_service (ApplicationService|None): the AS requesting on behalf of the user
""" """
def serialize(self):
"""Converts self to a type that can be serialized as JSON, and then
deserialized by `deserialize`
Returns:
dict
"""
return {
"user_id": self.user.to_string(),
"access_token_id": self.access_token_id,
"is_guest": self.is_guest,
"device_id": self.device_id,
"app_server_id": self.app_service.id if self.app_service else None,
}
@staticmethod
def deserialize(store, input):
"""Converts a dict that was produced by `serialize` back into a
Requester.
Args:
store (DataStore): Used to convert AS ID to AS object
input (dict): A dict produced by `serialize`
Returns:
Requester
"""
appservice = None
if input["app_server_id"]:
appservice = store.get_app_service_by_id(input["app_server_id"])
return Requester(
user=UserID.from_string(input["user_id"]),
access_token_id=input["access_token_id"],
is_guest=input["is_guest"],
device_id=input["device_id"],
app_service=appservice,
)
def create_requester(user_id, access_token_id=None, is_guest=False, def create_requester(user_id, access_token_id=None, is_guest=False,