forked from MirrorHub/synapse
Merge pull request #1635 from matrix-org/erikj/split_out_fed_txn
Split out federation transaction sending to a worker
This commit is contained in:
commit
302fbd218d
38 changed files with 1135 additions and 223 deletions
|
@ -22,3 +22,4 @@ export SYNAPSE_CACHE_FACTOR=1
|
|||
--federation-reader \
|
||||
--client-reader \
|
||||
--appservice \
|
||||
--federation-sender \
|
||||
|
|
331
synapse/app/federation_sender.py
Normal file
331
synapse/app/federation_sender.py
Normal file
|
@ -0,0 +1,331 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2016 OpenMarket Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import synapse
|
||||
|
||||
from synapse.server import HomeServer
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.crypto import context_factory
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.federation import send_queue
|
||||
from synapse.federation.units import Edu
|
||||
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
||||
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.transactions import TransactionStore
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.presence import UserPresenceState
|
||||
from synapse.util.async import sleep
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.rlimit import change_resource_limit
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
from synapse import events
|
||||
|
||||
from twisted.internet import reactor, defer
|
||||
from twisted.web.resource import Resource
|
||||
|
||||
from daemonize import Daemonize
|
||||
|
||||
import sys
|
||||
import logging
|
||||
import gc
|
||||
import ujson as json
|
||||
|
||||
logger = logging.getLogger("synapse.app.appservice")
|
||||
|
||||
|
||||
class FederationSenderSlaveStore(
|
||||
SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
|
||||
SlavedRegistrationStore,
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
class FederationSenderServer(HomeServer):
|
||||
def get_db_conn(self, run_new_connection=True):
|
||||
# Any param beginning with cp_ is a parameter for adbapi, and should
|
||||
# not be passed to the database engine.
|
||||
db_params = {
|
||||
k: v for k, v in self.db_config.get("args", {}).items()
|
||||
if not k.startswith("cp_")
|
||||
}
|
||||
db_conn = self.database_engine.module.connect(**db_params)
|
||||
|
||||
if run_new_connection:
|
||||
self.database_engine.on_new_connection(db_conn)
|
||||
return db_conn
|
||||
|
||||
def setup(self):
|
||||
logger.info("Setting up.")
|
||||
self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
|
||||
logger.info("Finished setting up.")
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_address = listener_config.get("bind_address", "")
|
||||
site_tag = listener_config.get("tag", port)
|
||||
resources = {}
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(self)
|
||||
|
||||
root_resource = create_resource_tree(resources, Resource())
|
||||
reactor.listenTCP(
|
||||
port,
|
||||
SynapseSite(
|
||||
"synapse.access.http.%s" % (site_tag,),
|
||||
site_tag,
|
||||
listener_config,
|
||||
root_resource,
|
||||
),
|
||||
interface=bind_address
|
||||
)
|
||||
logger.info("Synapse federation_sender now listening on port %d", port)
|
||||
|
||||
def start_listening(self, listeners):
|
||||
for listener in listeners:
|
||||
if listener["type"] == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener["type"] == "manhole":
|
||||
reactor.listenTCP(
|
||||
listener["port"],
|
||||
manhole(
|
||||
username="matrix",
|
||||
password="rabbithole",
|
||||
globals={"hs": self},
|
||||
),
|
||||
interface=listener.get("bind_address", '127.0.0.1')
|
||||
)
|
||||
else:
|
||||
logger.warn("Unrecognized listener type: %s", listener["type"])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def replicate(self):
|
||||
http_client = self.get_simple_http_client()
|
||||
store = self.get_datastore()
|
||||
replication_url = self.config.worker_replication_url
|
||||
send_handler = FederationSenderHandler(self)
|
||||
|
||||
send_handler.on_start()
|
||||
|
||||
while True:
|
||||
try:
|
||||
args = store.stream_positions()
|
||||
args.update((yield send_handler.stream_positions()))
|
||||
args["timeout"] = 30000
|
||||
result = yield http_client.get_json(replication_url, args=args)
|
||||
yield store.process_replication(result)
|
||||
yield send_handler.process_replication(result)
|
||||
except:
|
||||
logger.exception("Error replicating from %r", replication_url)
|
||||
yield sleep(30)
|
||||
|
||||
|
||||
def start(config_options):
|
||||
try:
|
||||
config = HomeServerConfig.load_config(
|
||||
"Synapse federation sender", config_options
|
||||
)
|
||||
except ConfigError as e:
|
||||
sys.stderr.write("\n" + e.message + "\n")
|
||||
sys.exit(1)
|
||||
|
||||
assert config.worker_app == "synapse.app.federation_sender"
|
||||
|
||||
setup_logging(config.worker_log_config, config.worker_log_file)
|
||||
|
||||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
|
||||
if config.send_federation:
|
||||
sys.stderr.write(
|
||||
"\nThe send_federation must be disabled in the main synapse process"
|
||||
"\nbefore they can be run in a separate worker."
|
||||
"\nPlease add ``send_federation: false`` to the main config"
|
||||
"\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Force the pushers to start since they will be disabled in the main config
|
||||
config.send_federation = True
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
|
||||
ps = FederationSenderServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
)
|
||||
|
||||
ps.setup()
|
||||
ps.start_listening(config.worker_listeners)
|
||||
|
||||
def run():
|
||||
with LoggingContext("run"):
|
||||
logger.info("Running")
|
||||
change_resource_limit(config.soft_file_limit)
|
||||
if config.gc_thresholds:
|
||||
gc.set_threshold(*config.gc_thresholds)
|
||||
reactor.run()
|
||||
|
||||
def start():
|
||||
ps.replicate()
|
||||
ps.get_datastore().start_profiling()
|
||||
ps.get_state_handler().start_caching()
|
||||
|
||||
reactor.callWhenRunning(start)
|
||||
|
||||
if config.worker_daemonize:
|
||||
daemon = Daemonize(
|
||||
app="synapse-federation-sender",
|
||||
pid=config.worker_pid_file,
|
||||
action=run,
|
||||
auto_close_fds=False,
|
||||
verbose=True,
|
||||
logger=logger,
|
||||
)
|
||||
daemon.start()
|
||||
else:
|
||||
run()
|
||||
|
||||
|
||||
class FederationSenderHandler(object):
|
||||
"""Processes the replication stream and forwards the appropriate entries
|
||||
to the federation sender.
|
||||
"""
|
||||
def __init__(self, hs):
|
||||
self.store = hs.get_datastore()
|
||||
self.federation_sender = hs.get_federation_sender()
|
||||
|
||||
self._room_serials = {}
|
||||
self._room_typing = {}
|
||||
|
||||
def on_start(self):
|
||||
# There may be some events that are persisted but haven't been sent,
|
||||
# so send them now.
|
||||
self.federation_sender.notify_new_events(
|
||||
self.store.get_room_max_stream_ordering()
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def stream_positions(self):
|
||||
stream_id = yield self.store.get_federation_out_pos("federation")
|
||||
defer.returnValue({
|
||||
"federation": stream_id,
|
||||
|
||||
# Ack stuff we've "processed", this should only be called from
|
||||
# one process.
|
||||
"federation_ack": stream_id,
|
||||
})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def process_replication(self, result):
|
||||
# The federation stream contains things that we want to send out, e.g.
|
||||
# presence, typing, etc.
|
||||
fed_stream = result.get("federation")
|
||||
if fed_stream:
|
||||
latest_id = int(fed_stream["position"])
|
||||
|
||||
# The federation stream containis a bunch of different types of
|
||||
# rows that need to be handled differently. We parse the rows, put
|
||||
# them into the appropriate collection and then send them off.
|
||||
presence_to_send = {}
|
||||
keyed_edus = {}
|
||||
edus = {}
|
||||
failures = {}
|
||||
device_destinations = set()
|
||||
|
||||
# Parse the rows in the stream
|
||||
for row in fed_stream["rows"]:
|
||||
position, typ, content_js = row
|
||||
content = json.loads(content_js)
|
||||
|
||||
if typ == send_queue.PRESENCE_TYPE:
|
||||
destination = content["destination"]
|
||||
state = UserPresenceState.from_dict(content["state"])
|
||||
|
||||
presence_to_send.setdefault(destination, []).append(state)
|
||||
elif typ == send_queue.KEYED_EDU_TYPE:
|
||||
key = content["key"]
|
||||
edu = Edu(**content["edu"])
|
||||
|
||||
keyed_edus.setdefault(
|
||||
edu.destination, {}
|
||||
)[(edu.destination, tuple(key))] = edu
|
||||
elif typ == send_queue.EDU_TYPE:
|
||||
edu = Edu(**content)
|
||||
|
||||
edus.setdefault(edu.destination, []).append(edu)
|
||||
elif typ == send_queue.FAILURE_TYPE:
|
||||
destination = content["destination"]
|
||||
failure = content["failure"]
|
||||
|
||||
failures.setdefault(destination, []).append(failure)
|
||||
elif typ == send_queue.DEVICE_MESSAGE_TYPE:
|
||||
device_destinations.add(content["destination"])
|
||||
else:
|
||||
raise Exception("Unrecognised federation type: %r", typ)
|
||||
|
||||
# We've finished collecting, send everything off
|
||||
for destination, states in presence_to_send.items():
|
||||
self.federation_sender.send_presence(destination, states)
|
||||
|
||||
for destination, edu_map in keyed_edus.items():
|
||||
for key, edu in edu_map.items():
|
||||
self.federation_sender.send_edu(
|
||||
edu.destination, edu.edu_type, edu.content, key=key,
|
||||
)
|
||||
|
||||
for destination, edu_list in edus.items():
|
||||
for edu in edu_list:
|
||||
self.federation_sender.send_edu(
|
||||
edu.destination, edu.edu_type, edu.content, key=None,
|
||||
)
|
||||
|
||||
for destination, failure_list in failures.items():
|
||||
for failure in failure_list:
|
||||
self.federation_sender.send_failure(destination, failure)
|
||||
|
||||
for destination in device_destinations:
|
||||
self.federation_sender.send_device_messages(destination)
|
||||
|
||||
# Record where we are in the stream.
|
||||
yield self.store.update_federation_out_pos(
|
||||
"federation", latest_id
|
||||
)
|
||||
|
||||
# We also need to poke the federation sender when new events happen
|
||||
event_stream = result.get("events")
|
||||
if event_stream:
|
||||
latest_pos = event_stream["position"]
|
||||
self.federation_sender.notify_new_events(latest_pos)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
with LoggingContext("main"):
|
||||
start(sys.argv[1:])
|
|
@ -30,6 +30,11 @@ class ServerConfig(Config):
|
|||
self.use_frozen_dicts = config.get("use_frozen_dicts", False)
|
||||
self.public_baseurl = config.get("public_baseurl")
|
||||
|
||||
# Whether to send federation traffic out in this process. This only
|
||||
# applies to some federation traffic, and so shouldn't be used to
|
||||
# "disable" federation
|
||||
self.send_federation = config.get("send_federation", True)
|
||||
|
||||
if self.public_baseurl is not None:
|
||||
if self.public_baseurl[-1] != '/':
|
||||
self.public_baseurl += '/'
|
||||
|
|
|
@ -17,10 +17,9 @@
|
|||
"""
|
||||
|
||||
from .replication import ReplicationLayer
|
||||
from .transport.client import TransportLayerClient
|
||||
|
||||
|
||||
def initialize_http_replication(homeserver):
|
||||
transport = TransportLayerClient(homeserver)
|
||||
def initialize_http_replication(hs):
|
||||
transport = hs.get_federation_transport_client()
|
||||
|
||||
return ReplicationLayer(homeserver, transport)
|
||||
return ReplicationLayer(hs, transport)
|
||||
|
|
|
@ -18,7 +18,6 @@ from twisted.internet import defer
|
|||
|
||||
from .federation_base import FederationBase
|
||||
from synapse.api.constants import Membership
|
||||
from .units import Edu
|
||||
|
||||
from synapse.api.errors import (
|
||||
CodeMessageException, HttpResponseException, SynapseError,
|
||||
|
@ -45,10 +44,6 @@ logger = logging.getLogger(__name__)
|
|||
# synapse.federation.federation_client is a silly name
|
||||
metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
|
||||
|
||||
sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations")
|
||||
|
||||
sent_edus_counter = metrics.register_counter("sent_edus")
|
||||
|
||||
sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
|
||||
|
||||
|
||||
|
@ -92,63 +87,6 @@ class FederationClient(FederationBase):
|
|||
|
||||
self._get_pdu_cache.start()
|
||||
|
||||
@log_function
|
||||
def send_pdu(self, pdu, destinations):
|
||||
"""Informs the replication layer about a new PDU generated within the
|
||||
home server that should be transmitted to others.
|
||||
|
||||
TODO: Figure out when we should actually resolve the deferred.
|
||||
|
||||
Args:
|
||||
pdu (Pdu): The new Pdu.
|
||||
|
||||
Returns:
|
||||
Deferred: Completes when we have successfully processed the PDU
|
||||
and replicated it to any interested remote home servers.
|
||||
"""
|
||||
order = self._order
|
||||
self._order += 1
|
||||
|
||||
sent_pdus_destination_dist.inc_by(len(destinations))
|
||||
|
||||
logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
|
||||
|
||||
# TODO, add errback, etc.
|
||||
self._transaction_queue.enqueue_pdu(pdu, destinations, order)
|
||||
|
||||
logger.debug(
|
||||
"[%s] transaction_layer.enqueue_pdu... done",
|
||||
pdu.event_id
|
||||
)
|
||||
|
||||
def send_presence(self, destination, states):
|
||||
if destination != self.server_name:
|
||||
self._transaction_queue.enqueue_presence(destination, states)
|
||||
|
||||
@log_function
|
||||
def send_edu(self, destination, edu_type, content, key=None):
|
||||
edu = Edu(
|
||||
origin=self.server_name,
|
||||
destination=destination,
|
||||
edu_type=edu_type,
|
||||
content=content,
|
||||
)
|
||||
|
||||
sent_edus_counter.inc()
|
||||
|
||||
self._transaction_queue.enqueue_edu(edu, key=key)
|
||||
|
||||
@log_function
|
||||
def send_device_messages(self, destination):
|
||||
"""Sends the device messages in the local database to the remote
|
||||
destination"""
|
||||
self._transaction_queue.enqueue_device_messages(destination)
|
||||
|
||||
@log_function
|
||||
def send_failure(self, failure, destination):
|
||||
self._transaction_queue.enqueue_failure(failure, destination)
|
||||
return defer.succeed(None)
|
||||
|
||||
@log_function
|
||||
def make_query(self, destination, query_type, args,
|
||||
retry_on_dns_fail=False):
|
||||
|
|
|
@ -20,8 +20,6 @@ a given transport.
|
|||
from .federation_client import FederationClient
|
||||
from .federation_server import FederationServer
|
||||
|
||||
from .transaction_queue import TransactionQueue
|
||||
|
||||
from .persistence import TransactionActions
|
||||
|
||||
import logging
|
||||
|
@ -66,9 +64,6 @@ class ReplicationLayer(FederationClient, FederationServer):
|
|||
self._clock = hs.get_clock()
|
||||
|
||||
self.transaction_actions = TransactionActions(self.store)
|
||||
self._transaction_queue = TransactionQueue(hs, transport_layer)
|
||||
|
||||
self._order = 0
|
||||
|
||||
self.hs = hs
|
||||
|
||||
|
|
298
synapse/federation/send_queue.py
Normal file
298
synapse/federation/send_queue.py
Normal file
|
@ -0,0 +1,298 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""A federation sender that forwards things to be sent across replication to
|
||||
a worker process.
|
||||
|
||||
It assumes there is a single worker process feeding off of it.
|
||||
|
||||
Each row in the replication stream consists of a type and some json, where the
|
||||
types indicate whether they are presence, or edus, etc.
|
||||
|
||||
Ephemeral or non-event data are queued up in-memory. When the worker requests
|
||||
updates since a particular point, all in-memory data since before that point is
|
||||
dropped. We also expire things in the queue after 5 minutes, to ensure that a
|
||||
dead worker doesn't cause the queues to grow limitlessly.
|
||||
|
||||
Events are replicated via a separate events stream.
|
||||
"""
|
||||
|
||||
from .units import Edu
|
||||
|
||||
from synapse.util.metrics import Measure
|
||||
import synapse.metrics
|
||||
|
||||
from blist import sorteddict
|
||||
import ujson
|
||||
|
||||
|
||||
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||
|
||||
|
||||
PRESENCE_TYPE = "p"
|
||||
KEYED_EDU_TYPE = "k"
|
||||
EDU_TYPE = "e"
|
||||
FAILURE_TYPE = "f"
|
||||
DEVICE_MESSAGE_TYPE = "d"
|
||||
|
||||
|
||||
class FederationRemoteSendQueue(object):
|
||||
"""A drop in replacement for TransactionQueue"""
|
||||
|
||||
def __init__(self, hs):
|
||||
self.server_name = hs.hostname
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
self.presence_map = {}
|
||||
self.presence_changed = sorteddict()
|
||||
|
||||
self.keyed_edu = {}
|
||||
self.keyed_edu_changed = sorteddict()
|
||||
|
||||
self.edus = sorteddict()
|
||||
|
||||
self.failures = sorteddict()
|
||||
|
||||
self.device_messages = sorteddict()
|
||||
|
||||
self.pos = 1
|
||||
self.pos_time = sorteddict()
|
||||
|
||||
# EVERYTHING IS SAD. In particular, python only makes new scopes when
|
||||
# we make a new function, so we need to make a new function so the inner
|
||||
# lambda binds to the queue rather than to the name of the queue which
|
||||
# changes. ARGH.
|
||||
def register(name, queue):
|
||||
metrics.register_callback(
|
||||
queue_name + "_size",
|
||||
lambda: len(queue),
|
||||
)
|
||||
|
||||
for queue_name in [
|
||||
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
|
||||
"edus", "failures", "device_messages", "pos_time",
|
||||
]:
|
||||
register(queue_name, getattr(self, queue_name))
|
||||
|
||||
self.clock.looping_call(self._clear_queue, 30 * 1000)
|
||||
|
||||
def _next_pos(self):
|
||||
pos = self.pos
|
||||
self.pos += 1
|
||||
self.pos_time[self.clock.time_msec()] = pos
|
||||
return pos
|
||||
|
||||
def _clear_queue(self):
|
||||
"""Clear the queues for anything older than N minutes"""
|
||||
|
||||
FIVE_MINUTES_AGO = 5 * 60 * 1000
|
||||
now = self.clock.time_msec()
|
||||
|
||||
keys = self.pos_time.keys()
|
||||
time = keys.bisect_left(now - FIVE_MINUTES_AGO)
|
||||
if not keys[:time]:
|
||||
return
|
||||
|
||||
position_to_delete = max(keys[:time])
|
||||
for key in keys[:time]:
|
||||
del self.pos_time[key]
|
||||
|
||||
self._clear_queue_before_pos(position_to_delete)
|
||||
|
||||
def _clear_queue_before_pos(self, position_to_delete):
|
||||
"""Clear all the queues from before a given position"""
|
||||
with Measure(self.clock, "send_queue._clear"):
|
||||
# Delete things out of presence maps
|
||||
keys = self.presence_changed.keys()
|
||||
i = keys.bisect_left(position_to_delete)
|
||||
for key in keys[:i]:
|
||||
del self.presence_changed[key]
|
||||
|
||||
user_ids = set(
|
||||
user_id for uids in self.presence_changed.values() for _, user_id in uids
|
||||
)
|
||||
|
||||
to_del = [
|
||||
user_id for user_id in self.presence_map if user_id not in user_ids
|
||||
]
|
||||
for user_id in to_del:
|
||||
del self.presence_map[user_id]
|
||||
|
||||
# Delete things out of keyed edus
|
||||
keys = self.keyed_edu_changed.keys()
|
||||
i = keys.bisect_left(position_to_delete)
|
||||
for key in keys[:i]:
|
||||
del self.keyed_edu_changed[key]
|
||||
|
||||
live_keys = set()
|
||||
for edu_key in self.keyed_edu_changed.values():
|
||||
live_keys.add(edu_key)
|
||||
|
||||
to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
|
||||
for edu_key in to_del:
|
||||
del self.keyed_edu[edu_key]
|
||||
|
||||
# Delete things out of edu map
|
||||
keys = self.edus.keys()
|
||||
i = keys.bisect_left(position_to_delete)
|
||||
for key in keys[:i]:
|
||||
del self.edus[key]
|
||||
|
||||
# Delete things out of failure map
|
||||
keys = self.failures.keys()
|
||||
i = keys.bisect_left(position_to_delete)
|
||||
for key in keys[:i]:
|
||||
del self.failures[key]
|
||||
|
||||
# Delete things out of device map
|
||||
keys = self.device_messages.keys()
|
||||
i = keys.bisect_left(position_to_delete)
|
||||
for key in keys[:i]:
|
||||
del self.device_messages[key]
|
||||
|
||||
def notify_new_events(self, current_id):
|
||||
"""As per TransactionQueue"""
|
||||
# We don't need to replicate this as it gets sent down a different
|
||||
# stream.
|
||||
pass
|
||||
|
||||
def send_edu(self, destination, edu_type, content, key=None):
|
||||
"""As per TransactionQueue"""
|
||||
pos = self._next_pos()
|
||||
|
||||
edu = Edu(
|
||||
origin=self.server_name,
|
||||
destination=destination,
|
||||
edu_type=edu_type,
|
||||
content=content,
|
||||
)
|
||||
|
||||
if key:
|
||||
assert isinstance(key, tuple)
|
||||
self.keyed_edu[(destination, key)] = edu
|
||||
self.keyed_edu_changed[pos] = (destination, key)
|
||||
else:
|
||||
self.edus[pos] = edu
|
||||
|
||||
def send_presence(self, destination, states):
|
||||
"""As per TransactionQueue"""
|
||||
pos = self._next_pos()
|
||||
|
||||
self.presence_map.update({
|
||||
state.user_id: state
|
||||
for state in states
|
||||
})
|
||||
|
||||
self.presence_changed[pos] = [
|
||||
(destination, state.user_id) for state in states
|
||||
]
|
||||
|
||||
def send_failure(self, failure, destination):
|
||||
"""As per TransactionQueue"""
|
||||
pos = self._next_pos()
|
||||
|
||||
self.failures[pos] = (destination, str(failure))
|
||||
|
||||
def send_device_messages(self, destination):
|
||||
"""As per TransactionQueue"""
|
||||
pos = self._next_pos()
|
||||
self.device_messages[pos] = destination
|
||||
|
||||
def get_current_token(self):
|
||||
return self.pos - 1
|
||||
|
||||
def get_replication_rows(self, token, limit, federation_ack=None):
|
||||
"""
|
||||
Args:
|
||||
token (int)
|
||||
limit (int)
|
||||
federation_ack (int): Optional. The position where the worker is
|
||||
explicitly acknowledged it has handled. Allows us to drop
|
||||
data from before that point
|
||||
"""
|
||||
# TODO: Handle limit.
|
||||
|
||||
# To handle restarts where we wrap around
|
||||
if token > self.pos:
|
||||
token = -1
|
||||
|
||||
rows = []
|
||||
|
||||
# There should be only one reader, so lets delete everything its
|
||||
# acknowledged its seen.
|
||||
if federation_ack:
|
||||
self._clear_queue_before_pos(federation_ack)
|
||||
|
||||
# Fetch changed presence
|
||||
keys = self.presence_changed.keys()
|
||||
i = keys.bisect_right(token)
|
||||
dest_user_ids = set(
|
||||
(pos, dest_user_id)
|
||||
for pos in keys[i:]
|
||||
for dest_user_id in self.presence_changed[pos]
|
||||
)
|
||||
|
||||
for (key, (dest, user_id)) in dest_user_ids:
|
||||
rows.append((key, PRESENCE_TYPE, ujson.dumps({
|
||||
"destination": dest,
|
||||
"state": self.presence_map[user_id].as_dict(),
|
||||
})))
|
||||
|
||||
# Fetch changes keyed edus
|
||||
keys = self.keyed_edu_changed.keys()
|
||||
i = keys.bisect_right(token)
|
||||
keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
|
||||
|
||||
for (pos, (destination, edu_key)) in keyed_edus:
|
||||
rows.append(
|
||||
(pos, KEYED_EDU_TYPE, ujson.dumps({
|
||||
"key": edu_key,
|
||||
"edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(),
|
||||
}))
|
||||
)
|
||||
|
||||
# Fetch changed edus
|
||||
keys = self.edus.keys()
|
||||
i = keys.bisect_right(token)
|
||||
edus = set((k, self.edus[k]) for k in keys[i:])
|
||||
|
||||
for (pos, edu) in edus:
|
||||
rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
|
||||
|
||||
# Fetch changed failures
|
||||
keys = self.failures.keys()
|
||||
i = keys.bisect_right(token)
|
||||
failures = set((k, self.failures[k]) for k in keys[i:])
|
||||
|
||||
for (pos, (destination, failure)) in failures:
|
||||
rows.append((pos, FAILURE_TYPE, ujson.dumps({
|
||||
"destination": destination,
|
||||
"failure": failure,
|
||||
})))
|
||||
|
||||
# Fetch changed device messages
|
||||
keys = self.device_messages.keys()
|
||||
i = keys.bisect_right(token)
|
||||
device_messages = set((k, self.device_messages[k]) for k in keys[i:])
|
||||
|
||||
for (pos, destination) in device_messages:
|
||||
rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
|
||||
"destination": destination,
|
||||
})))
|
||||
|
||||
# Sort rows based on pos
|
||||
rows.sort()
|
||||
|
||||
return rows
|
|
@ -19,6 +19,7 @@ from twisted.internet import defer
|
|||
from .persistence import TransactionActions
|
||||
from .units import Transaction, Edu
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import HttpResponseException
|
||||
from synapse.util.async import run_on_reactor
|
||||
from synapse.util.logcontext import preserve_context_over_fn
|
||||
|
@ -26,6 +27,7 @@ from synapse.util.retryutils import (
|
|||
get_retry_limiter, NotRetryingDestination,
|
||||
)
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.handlers.presence import format_user_presence_state
|
||||
import synapse.metrics
|
||||
|
||||
|
@ -36,6 +38,12 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||
|
||||
client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
|
||||
sent_pdus_destination_dist = client_metrics.register_distribution(
|
||||
"sent_pdu_destinations"
|
||||
)
|
||||
sent_edus_counter = client_metrics.register_counter("sent_edus")
|
||||
|
||||
|
||||
class TransactionQueue(object):
|
||||
"""This class makes sure we only have one transaction in flight at
|
||||
|
@ -44,13 +52,14 @@ class TransactionQueue(object):
|
|||
It batches pending PDUs into single transactions.
|
||||
"""
|
||||
|
||||
def __init__(self, hs, transport_layer):
|
||||
def __init__(self, hs):
|
||||
self.server_name = hs.hostname
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
self.state = hs.get_state_handler()
|
||||
self.transaction_actions = TransactionActions(self.store)
|
||||
|
||||
self.transport_layer = transport_layer
|
||||
self.transport_layer = hs.get_federation_transport_client()
|
||||
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
|
@ -95,6 +104,11 @@ class TransactionQueue(object):
|
|||
# HACK to get unique tx id
|
||||
self._next_txn_id = int(self.clock.time_msec())
|
||||
|
||||
self._order = 1
|
||||
|
||||
self._is_processing = False
|
||||
self._last_poked_id = -1
|
||||
|
||||
def can_send_to(self, destination):
|
||||
"""Can we send messages to the given server?
|
||||
|
||||
|
@ -115,11 +129,61 @@ class TransactionQueue(object):
|
|||
else:
|
||||
return not destination.startswith("localhost")
|
||||
|
||||
def enqueue_pdu(self, pdu, destinations, order):
|
||||
@defer.inlineCallbacks
|
||||
def notify_new_events(self, current_id):
|
||||
"""This gets called when we have some new events we might want to
|
||||
send out to other servers.
|
||||
"""
|
||||
self._last_poked_id = max(current_id, self._last_poked_id)
|
||||
|
||||
if self._is_processing:
|
||||
return
|
||||
|
||||
try:
|
||||
self._is_processing = True
|
||||
while True:
|
||||
last_token = yield self.store.get_federation_out_pos("events")
|
||||
next_token, events = yield self.store.get_all_new_events_stream(
|
||||
last_token, self._last_poked_id, limit=20,
|
||||
)
|
||||
|
||||
logger.debug("Handling %s -> %s", last_token, next_token)
|
||||
|
||||
if not events and next_token >= self._last_poked_id:
|
||||
break
|
||||
|
||||
for event in events:
|
||||
users_in_room = yield self.state.get_current_user_in_room(
|
||||
event.room_id, latest_event_ids=[event.event_id],
|
||||
)
|
||||
|
||||
destinations = set(
|
||||
get_domain_from_id(user_id) for user_id in users_in_room
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Member:
|
||||
if event.content["membership"] == Membership.JOIN:
|
||||
destinations.add(get_domain_from_id(event.state_key))
|
||||
|
||||
logger.debug("Sending %s to %r", event, destinations)
|
||||
|
||||
self._send_pdu(event, destinations)
|
||||
|
||||
yield self.store.update_federation_out_pos(
|
||||
"events", next_token
|
||||
)
|
||||
|
||||
finally:
|
||||
self._is_processing = False
|
||||
|
||||
def _send_pdu(self, pdu, destinations):
|
||||
# We loop through all destinations to see whether we already have
|
||||
# a transaction in progress. If we do, stick it in the pending_pdus
|
||||
# table and we'll get back to it later.
|
||||
|
||||
order = self._order
|
||||
self._order += 1
|
||||
|
||||
destinations = set(destinations)
|
||||
destinations = set(
|
||||
dest for dest in destinations if self.can_send_to(dest)
|
||||
|
@ -130,6 +194,8 @@ class TransactionQueue(object):
|
|||
if not destinations:
|
||||
return
|
||||
|
||||
sent_pdus_destination_dist.inc_by(len(destinations))
|
||||
|
||||
for destination in destinations:
|
||||
self.pending_pdus_by_dest.setdefault(destination, []).append(
|
||||
(pdu, order)
|
||||
|
@ -139,7 +205,10 @@ class TransactionQueue(object):
|
|||
self._attempt_new_transaction, destination
|
||||
)
|
||||
|
||||
def enqueue_presence(self, destination, states):
|
||||
def send_presence(self, destination, states):
|
||||
if not self.can_send_to(destination):
|
||||
return
|
||||
|
||||
self.pending_presence_by_dest.setdefault(destination, {}).update({
|
||||
state.user_id: state for state in states
|
||||
})
|
||||
|
@ -148,12 +217,19 @@ class TransactionQueue(object):
|
|||
self._attempt_new_transaction, destination
|
||||
)
|
||||
|
||||
def enqueue_edu(self, edu, key=None):
|
||||
destination = edu.destination
|
||||
def send_edu(self, destination, edu_type, content, key=None):
|
||||
edu = Edu(
|
||||
origin=self.server_name,
|
||||
destination=destination,
|
||||
edu_type=edu_type,
|
||||
content=content,
|
||||
)
|
||||
|
||||
if not self.can_send_to(destination):
|
||||
return
|
||||
|
||||
sent_edus_counter.inc()
|
||||
|
||||
if key:
|
||||
self.pending_edus_keyed_by_dest.setdefault(
|
||||
destination, {}
|
||||
|
@ -165,7 +241,7 @@ class TransactionQueue(object):
|
|||
self._attempt_new_transaction, destination
|
||||
)
|
||||
|
||||
def enqueue_failure(self, failure, destination):
|
||||
def send_failure(self, failure, destination):
|
||||
if destination == self.server_name or destination == "localhost":
|
||||
return
|
||||
|
||||
|
@ -180,7 +256,7 @@ class TransactionQueue(object):
|
|||
self._attempt_new_transaction, destination
|
||||
)
|
||||
|
||||
def enqueue_device_messages(self, destination):
|
||||
def send_device_messages(self, destination):
|
||||
if destination == self.server_name or destination == "localhost":
|
||||
return
|
||||
|
||||
|
@ -191,6 +267,9 @@ class TransactionQueue(object):
|
|||
self._attempt_new_transaction, destination
|
||||
)
|
||||
|
||||
def get_current_token(self):
|
||||
return 0
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _attempt_new_transaction(self, destination):
|
||||
# list of (pending_pdu, deferred, order)
|
||||
|
|
|
@ -24,7 +24,6 @@ from .profile import ProfileHandler
|
|||
from .directory import DirectoryHandler
|
||||
from .admin import AdminHandler
|
||||
from .identity import IdentityHandler
|
||||
from .receipts import ReceiptsHandler
|
||||
from .search import SearchHandler
|
||||
|
||||
|
||||
|
@ -56,7 +55,6 @@ class Handlers(object):
|
|||
self.profile_handler = ProfileHandler(hs)
|
||||
self.directory_handler = DirectoryHandler(hs)
|
||||
self.admin_handler = AdminHandler(hs)
|
||||
self.receipts_handler = ReceiptsHandler(hs)
|
||||
self.identity_handler = IdentityHandler(hs)
|
||||
self.search_handler = SearchHandler(hs)
|
||||
self.room_context_handler = RoomContextHandler(hs)
|
||||
|
|
|
@ -34,9 +34,9 @@ class DeviceMessageHandler(object):
|
|||
self.store = hs.get_datastore()
|
||||
self.notifier = hs.get_notifier()
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
self.federation = hs.get_replication_layer()
|
||||
self.federation = hs.get_federation_sender()
|
||||
|
||||
self.federation.register_edu_handler(
|
||||
hs.get_replication_layer().register_edu_handler(
|
||||
"m.direct_to_device", self.on_direct_to_device_edu
|
||||
)
|
||||
|
||||
|
|
|
@ -80,22 +80,6 @@ class FederationHandler(BaseHandler):
|
|||
# When joining a room we need to queue any events for that room up
|
||||
self.room_queues = {}
|
||||
|
||||
def handle_new_event(self, event, destinations):
|
||||
""" Takes in an event from the client to server side, that has already
|
||||
been authed and handled by the state module, and sends it to any
|
||||
remote home servers that may be interested.
|
||||
|
||||
Args:
|
||||
event: The event to send
|
||||
destinations: A list of destinations to send it to
|
||||
|
||||
Returns:
|
||||
Deferred: Resolved when it has successfully been queued for
|
||||
processing.
|
||||
"""
|
||||
|
||||
return self.replication_layer.send_pdu(event, destinations)
|
||||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
|
||||
|
@ -830,25 +814,6 @@ class FederationHandler(BaseHandler):
|
|||
user = UserID.from_string(event.state_key)
|
||||
yield user_joined_room(self.distributor, user, event.room_id)
|
||||
|
||||
new_pdu = event
|
||||
|
||||
users_in_room = yield self.store.get_joined_users_from_context(event, context)
|
||||
|
||||
destinations = set(
|
||||
get_domain_from_id(user_id) for user_id in users_in_room
|
||||
if not self.hs.is_mine_id(user_id)
|
||||
)
|
||||
|
||||
destinations.discard(origin)
|
||||
|
||||
logger.debug(
|
||||
"on_send_join_request: Sending event: %s, signatures: %s",
|
||||
event.event_id,
|
||||
event.signatures,
|
||||
)
|
||||
|
||||
self.replication_layer.send_pdu(new_pdu, destinations)
|
||||
|
||||
state_ids = context.prev_state_ids.values()
|
||||
auth_chain = yield self.store.get_auth_chain(set(
|
||||
[event.event_id] + state_ids
|
||||
|
@ -1055,24 +1020,6 @@ class FederationHandler(BaseHandler):
|
|||
event, event_stream_id, max_stream_id, extra_users=extra_users
|
||||
)
|
||||
|
||||
new_pdu = event
|
||||
|
||||
users_in_room = yield self.store.get_joined_users_from_context(event, context)
|
||||
|
||||
destinations = set(
|
||||
get_domain_from_id(user_id) for user_id in users_in_room
|
||||
if not self.hs.is_mine_id(user_id)
|
||||
)
|
||||
destinations.discard(origin)
|
||||
|
||||
logger.debug(
|
||||
"on_send_leave_request: Sending event: %s, signatures: %s",
|
||||
event.event_id,
|
||||
event.signatures,
|
||||
)
|
||||
|
||||
self.replication_layer.send_pdu(new_pdu, destinations)
|
||||
|
||||
defer.returnValue(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
|
@ -372,11 +372,12 @@ class InitialSyncHandler(BaseHandler):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def get_receipts():
|
||||
receipts_handler = self.hs.get_handlers().receipts_handler
|
||||
receipts = yield receipts_handler.get_receipts_for_room(
|
||||
receipts = yield self.store.get_linearized_receipts_for_room(
|
||||
room_id,
|
||||
now_token.receipt_key
|
||||
to_key=now_token.receipt_key,
|
||||
)
|
||||
if not receipts:
|
||||
receipts = []
|
||||
defer.returnValue(receipts)
|
||||
|
||||
presence, receipts, (messages, token) = yield defer.gatherResults(
|
||||
|
|
|
@ -22,7 +22,7 @@ from synapse.events.utils import serialize_event
|
|||
from synapse.events.validator import EventValidator
|
||||
from synapse.push.action_generator import ActionGenerator
|
||||
from synapse.types import (
|
||||
UserID, RoomAlias, RoomStreamToken, get_domain_from_id
|
||||
UserID, RoomAlias, RoomStreamToken,
|
||||
)
|
||||
from synapse.util.async import run_on_reactor, ReadWriteLock
|
||||
from synapse.util.logcontext import preserve_fn
|
||||
|
@ -599,13 +599,6 @@ class MessageHandler(BaseHandler):
|
|||
event_stream_id, max_stream_id
|
||||
)
|
||||
|
||||
users_in_room = yield self.store.get_joined_users_from_context(event, context)
|
||||
|
||||
destinations = [
|
||||
get_domain_from_id(user_id) for user_id in users_in_room
|
||||
if not self.hs.is_mine_id(user_id)
|
||||
]
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _notify():
|
||||
yield run_on_reactor()
|
||||
|
@ -618,7 +611,3 @@ class MessageHandler(BaseHandler):
|
|||
|
||||
# If invite, remove room_state from unsigned before sending.
|
||||
event.unsigned.pop("invite_room_state", None)
|
||||
|
||||
preserve_fn(federation_handler.handle_new_event)(
|
||||
event, destinations=destinations,
|
||||
)
|
||||
|
|
|
@ -91,28 +91,29 @@ class PresenceHandler(object):
|
|||
self.store = hs.get_datastore()
|
||||
self.wheel_timer = WheelTimer()
|
||||
self.notifier = hs.get_notifier()
|
||||
self.federation = hs.get_replication_layer()
|
||||
self.replication = hs.get_replication_layer()
|
||||
self.federation = hs.get_federation_sender()
|
||||
|
||||
self.state = hs.get_state_handler()
|
||||
|
||||
self.federation.register_edu_handler(
|
||||
self.replication.register_edu_handler(
|
||||
"m.presence", self.incoming_presence
|
||||
)
|
||||
self.federation.register_edu_handler(
|
||||
self.replication.register_edu_handler(
|
||||
"m.presence_invite",
|
||||
lambda origin, content: self.invite_presence(
|
||||
observed_user=UserID.from_string(content["observed_user"]),
|
||||
observer_user=UserID.from_string(content["observer_user"]),
|
||||
)
|
||||
)
|
||||
self.federation.register_edu_handler(
|
||||
self.replication.register_edu_handler(
|
||||
"m.presence_accept",
|
||||
lambda origin, content: self.accept_presence(
|
||||
observed_user=UserID.from_string(content["observed_user"]),
|
||||
observer_user=UserID.from_string(content["observer_user"]),
|
||||
)
|
||||
)
|
||||
self.federation.register_edu_handler(
|
||||
self.replication.register_edu_handler(
|
||||
"m.presence_deny",
|
||||
lambda origin, content: self.deny_presence(
|
||||
observed_user=UserID.from_string(content["observed_user"]),
|
||||
|
|
|
@ -33,8 +33,8 @@ class ReceiptsHandler(BaseHandler):
|
|||
self.server_name = hs.config.server_name
|
||||
self.store = hs.get_datastore()
|
||||
self.hs = hs
|
||||
self.federation = hs.get_replication_layer()
|
||||
self.federation.register_edu_handler(
|
||||
self.federation = hs.get_federation_sender()
|
||||
hs.get_replication_layer().register_edu_handler(
|
||||
"m.receipt", self._received_remote_receipt
|
||||
)
|
||||
self.clock = self.hs.get_clock()
|
||||
|
|
|
@ -55,9 +55,9 @@ class TypingHandler(object):
|
|||
self.clock = hs.get_clock()
|
||||
self.wheel_timer = WheelTimer(bucket_size=5000)
|
||||
|
||||
self.federation = hs.get_replication_layer()
|
||||
self.federation = hs.get_federation_sender()
|
||||
|
||||
self.federation.register_edu_handler("m.typing", self._recv_edu)
|
||||
hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu)
|
||||
|
||||
hs.get_distributor().observe("user_left_room", self.user_left_room)
|
||||
|
||||
|
|
|
@ -143,6 +143,12 @@ class Notifier(object):
|
|||
|
||||
self.clock = hs.get_clock()
|
||||
self.appservice_handler = hs.get_application_service_handler()
|
||||
|
||||
if hs.should_send_federation():
|
||||
self.federation_sender = hs.get_federation_sender()
|
||||
else:
|
||||
self.federation_sender = None
|
||||
|
||||
self.state_handler = hs.get_state_handler()
|
||||
|
||||
self.clock.looping_call(
|
||||
|
@ -220,6 +226,9 @@ class Notifier(object):
|
|||
# poke any interested application service.
|
||||
self.appservice_handler.notify_interested_services(room_stream_id)
|
||||
|
||||
if self.federation_sender:
|
||||
self.federation_sender.notify_new_events(room_stream_id)
|
||||
|
||||
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
|
||||
self._user_joined_room(event.state_key, event.room_id)
|
||||
|
||||
|
|
60
synapse/replication/expire_cache.py
Normal file
60
synapse/replication/expire_cache.py
Normal file
|
@ -0,0 +1,60 @@
|
|||
# Copyright 2016 OpenMarket Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.http.server import respond_with_json_bytes, request_handler
|
||||
from synapse.http.servlet import parse_json_object_from_request
|
||||
|
||||
from twisted.web.resource import Resource
|
||||
from twisted.web.server import NOT_DONE_YET
|
||||
|
||||
|
||||
class ExpireCacheResource(Resource):
|
||||
"""
|
||||
HTTP endpoint for expiring storage caches.
|
||||
|
||||
POST /_synapse/replication/expire_cache HTTP/1.1
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"invalidate": [
|
||||
{
|
||||
"name": "func_name",
|
||||
"keys": ["key1", "key2"]
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
|
||||
def __init__(self, hs):
|
||||
Resource.__init__(self) # Resource is old-style, so no super()
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
self.version_string = hs.version_string
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
def render_POST(self, request):
|
||||
self._async_render_POST(request)
|
||||
return NOT_DONE_YET
|
||||
|
||||
@request_handler()
|
||||
def _async_render_POST(self, request):
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
for row in content["invalidate"]:
|
||||
name = row["name"]
|
||||
keys = tuple(row["keys"])
|
||||
|
||||
getattr(self.store, name).invalidate(keys)
|
||||
|
||||
respond_with_json_bytes(request, 200, "{}")
|
|
@ -17,6 +17,7 @@ from synapse.http.servlet import parse_integer, parse_string
|
|||
from synapse.http.server import request_handler, finish_request
|
||||
from synapse.replication.pusher_resource import PusherResource
|
||||
from synapse.replication.presence_resource import PresenceResource
|
||||
from synapse.replication.expire_cache import ExpireCacheResource
|
||||
from synapse.api.errors import SynapseError
|
||||
|
||||
from twisted.web.resource import Resource
|
||||
|
@ -44,6 +45,7 @@ STREAM_NAMES = (
|
|||
("caches",),
|
||||
("to_device",),
|
||||
("public_rooms",),
|
||||
("federation",),
|
||||
)
|
||||
|
||||
|
||||
|
@ -116,11 +118,14 @@ class ReplicationResource(Resource):
|
|||
self.sources = hs.get_event_sources()
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
self.typing_handler = hs.get_typing_handler()
|
||||
self.federation_sender = hs.get_federation_sender()
|
||||
self.notifier = hs.notifier
|
||||
self.clock = hs.get_clock()
|
||||
self.config = hs.get_config()
|
||||
|
||||
self.putChild("remove_pushers", PusherResource(hs))
|
||||
self.putChild("syncing_users", PresenceResource(hs))
|
||||
self.putChild("expire_cache", ExpireCacheResource(hs))
|
||||
|
||||
def render_GET(self, request):
|
||||
self._async_render_GET(request)
|
||||
|
@ -134,6 +139,7 @@ class ReplicationResource(Resource):
|
|||
pushers_token = self.store.get_pushers_stream_token()
|
||||
caches_token = self.store.get_cache_stream_token()
|
||||
public_rooms_token = self.store.get_current_public_room_stream_id()
|
||||
federation_token = self.federation_sender.get_current_token()
|
||||
|
||||
defer.returnValue(_ReplicationToken(
|
||||
room_stream_token,
|
||||
|
@ -148,6 +154,7 @@ class ReplicationResource(Resource):
|
|||
caches_token,
|
||||
int(stream_token.to_device_key),
|
||||
int(public_rooms_token),
|
||||
int(federation_token),
|
||||
))
|
||||
|
||||
@request_handler()
|
||||
|
@ -164,8 +171,13 @@ class ReplicationResource(Resource):
|
|||
}
|
||||
request_streams["streams"] = parse_string(request, "streams")
|
||||
|
||||
federation_ack = parse_integer(request, "federation_ack", None)
|
||||
|
||||
def replicate():
|
||||
return self.replicate(request_streams, limit)
|
||||
return self.replicate(
|
||||
request_streams, limit,
|
||||
federation_ack=federation_ack
|
||||
)
|
||||
|
||||
writer = yield self.notifier.wait_for_replication(replicate, timeout)
|
||||
result = writer.finish()
|
||||
|
@ -183,7 +195,7 @@ class ReplicationResource(Resource):
|
|||
finish_request(request)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def replicate(self, request_streams, limit):
|
||||
def replicate(self, request_streams, limit, federation_ack=None):
|
||||
writer = _Writer()
|
||||
current_token = yield self.current_replication_token()
|
||||
logger.debug("Replicating up to %r", current_token)
|
||||
|
@ -202,6 +214,7 @@ class ReplicationResource(Resource):
|
|||
yield self.caches(writer, current_token, limit, request_streams)
|
||||
yield self.to_device(writer, current_token, limit, request_streams)
|
||||
yield self.public_rooms(writer, current_token, limit, request_streams)
|
||||
self.federation(writer, current_token, limit, request_streams, federation_ack)
|
||||
self.streams(writer, current_token, request_streams)
|
||||
|
||||
logger.debug("Replicated %d rows", writer.total)
|
||||
|
@ -465,6 +478,23 @@ class ReplicationResource(Resource):
|
|||
"position", "room_id", "visibility"
|
||||
), position=upto_token)
|
||||
|
||||
def federation(self, writer, current_token, limit, request_streams, federation_ack):
|
||||
if self.config.send_federation:
|
||||
return
|
||||
|
||||
current_position = current_token.federation
|
||||
|
||||
federation = request_streams.get("federation")
|
||||
|
||||
if federation is not None and federation != current_position:
|
||||
federation_rows = self.federation_sender.get_replication_rows(
|
||||
federation, limit, federation_ack=federation_ack,
|
||||
)
|
||||
upto_token = _position_from_rows(federation_rows, current_position)
|
||||
writer.write_header_and_rows("federation", federation_rows, (
|
||||
"position", "type", "content",
|
||||
), position=upto_token)
|
||||
|
||||
|
||||
class _Writer(object):
|
||||
"""Writes the streams as a JSON object as the response to the request"""
|
||||
|
@ -497,6 +527,7 @@ class _Writer(object):
|
|||
class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
|
||||
"events", "presence", "typing", "receipts", "account_data", "backfill",
|
||||
"push_rules", "pushers", "state", "caches", "to_device", "public_rooms",
|
||||
"federation",
|
||||
))):
|
||||
__slots__ = []
|
||||
|
||||
|
|
|
@ -34,6 +34,9 @@ class BaseSlavedStore(SQLBaseStore):
|
|||
else:
|
||||
self._cache_id_gen = None
|
||||
|
||||
self.expire_cache_url = hs.config.worker_replication_url + "/expire_cache"
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
|
||||
def stream_positions(self):
|
||||
pos = {}
|
||||
if self._cache_id_gen:
|
||||
|
@ -54,3 +57,19 @@ class BaseSlavedStore(SQLBaseStore):
|
|||
logger.info("Got unexpected cache_func: %r", cache_func)
|
||||
self._cache_id_gen.advance(int(stream["position"]))
|
||||
return defer.succeed(None)
|
||||
|
||||
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
|
||||
txn.call_after(cache_func.invalidate, keys)
|
||||
txn.call_after(self._send_invalidation_poke, cache_func, keys)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _send_invalidation_poke(self, cache_func, keys):
|
||||
try:
|
||||
yield self.http_client.post_json_get_json(self.expire_cache_url, {
|
||||
"invalidate": [{
|
||||
"name": cache_func.__name__,
|
||||
"keys": list(keys),
|
||||
}]
|
||||
})
|
||||
except:
|
||||
logger.exception("Failed to poke on expire_cache")
|
||||
|
|
|
@ -29,10 +29,16 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
|
|||
"DeviceInboxStreamChangeCache",
|
||||
self._device_inbox_id_gen.get_current_token()
|
||||
)
|
||||
self._device_federation_outbox_stream_cache = StreamChangeCache(
|
||||
"DeviceFederationOutboxStreamChangeCache",
|
||||
self._device_inbox_id_gen.get_current_token()
|
||||
)
|
||||
|
||||
get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
|
||||
get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
|
||||
get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
|
||||
delete_messages_for_device = DataStore.delete_messages_for_device.__func__
|
||||
delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__
|
||||
|
||||
def stream_positions(self):
|
||||
result = super(SlavedDeviceInboxStore, self).stream_positions()
|
||||
|
@ -45,9 +51,15 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
|
|||
self._device_inbox_id_gen.advance(int(stream["position"]))
|
||||
for row in stream["rows"]:
|
||||
stream_id = row[0]
|
||||
user_id = row[1]
|
||||
self._device_inbox_stream_cache.entity_has_changed(
|
||||
user_id, stream_id
|
||||
)
|
||||
entity = row[1]
|
||||
|
||||
if entity.startswith("@"):
|
||||
self._device_inbox_stream_cache.entity_has_changed(
|
||||
entity, stream_id
|
||||
)
|
||||
else:
|
||||
self._device_federation_outbox_stream_cache.entity_has_changed(
|
||||
entity, stream_id
|
||||
)
|
||||
|
||||
return super(SlavedDeviceInboxStore, self).process_replication(result)
|
||||
|
|
|
@ -26,6 +26,11 @@ from synapse.storage.stream import StreamStore
|
|||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
import ujson as json
|
||||
import logging
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# So, um, we want to borrow a load of functions intended for reading from
|
||||
# a DataStore, but we don't want to take functions that either write to the
|
||||
|
@ -180,6 +185,11 @@ class SlavedEventStore(BaseSlavedStore):
|
|||
EventFederationStore.__dict__["_get_forward_extremeties_for_room"]
|
||||
)
|
||||
|
||||
get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
|
||||
|
||||
get_federation_out_pos = DataStore.get_federation_out_pos.__func__
|
||||
update_federation_out_pos = DataStore.update_federation_out_pos.__func__
|
||||
|
||||
def stream_positions(self):
|
||||
result = super(SlavedEventStore, self).stream_positions()
|
||||
result["events"] = self._stream_id_gen.get_current_token()
|
||||
|
@ -194,6 +204,10 @@ class SlavedEventStore(BaseSlavedStore):
|
|||
stream = result.get("events")
|
||||
if stream:
|
||||
self._stream_id_gen.advance(int(stream["position"]))
|
||||
|
||||
if stream["rows"]:
|
||||
logger.info("Got %d event rows", len(stream["rows"]))
|
||||
|
||||
for row in stream["rows"]:
|
||||
self._process_replication_row(
|
||||
row, backfilled=False, state_resets=state_resets
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.internet import defer
|
||||
from ._base import BaseSlavedStore
|
||||
from synapse.storage import DataStore
|
||||
from synapse.storage.transactions import TransactionStore
|
||||
|
@ -22,9 +21,10 @@ from synapse.storage.transactions import TransactionStore
|
|||
class TransactionStore(BaseSlavedStore):
|
||||
get_destination_retry_timings = TransactionStore.__dict__[
|
||||
"get_destination_retry_timings"
|
||||
].orig
|
||||
]
|
||||
_get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
|
||||
set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__
|
||||
_set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__
|
||||
|
||||
# For now, don't record the destination rety timings
|
||||
def set_destination_retry_timings(*args, **kwargs):
|
||||
return defer.succeed(None)
|
||||
prep_send_transaction = DataStore.prep_send_transaction.__func__
|
||||
delivered_txn = DataStore.delivered_txn.__func__
|
||||
|
|
|
@ -36,7 +36,7 @@ class ReceiptRestServlet(RestServlet):
|
|||
super(ReceiptRestServlet, self).__init__()
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.receipts_handler = hs.get_handlers().receipts_handler
|
||||
self.receipts_handler = hs.get_receipts_handler()
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
|
@ -32,6 +32,9 @@ from synapse.appservice.scheduler import ApplicationServiceScheduler
|
|||
from synapse.crypto.keyring import Keyring
|
||||
from synapse.events.builder import EventBuilderFactory
|
||||
from synapse.federation import initialize_http_replication
|
||||
from synapse.federation.send_queue import FederationRemoteSendQueue
|
||||
from synapse.federation.transport.client import TransportLayerClient
|
||||
from synapse.federation.transaction_queue import TransactionQueue
|
||||
from synapse.handlers import Handlers
|
||||
from synapse.handlers.appservice import ApplicationServicesHandler
|
||||
from synapse.handlers.auth import AuthHandler
|
||||
|
@ -44,6 +47,7 @@ from synapse.handlers.sync import SyncHandler
|
|||
from synapse.handlers.typing import TypingHandler
|
||||
from synapse.handlers.events import EventHandler, EventStreamHandler
|
||||
from synapse.handlers.initial_sync import InitialSyncHandler
|
||||
from synapse.handlers.receipts import ReceiptsHandler
|
||||
from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
|
||||
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
|
||||
from synapse.notifier import Notifier
|
||||
|
@ -124,6 +128,9 @@ class HomeServer(object):
|
|||
'http_client_context_factory',
|
||||
'simple_http_client',
|
||||
'media_repository',
|
||||
'federation_transport_client',
|
||||
'federation_sender',
|
||||
'receipts_handler',
|
||||
]
|
||||
|
||||
def __init__(self, hostname, **kwargs):
|
||||
|
@ -265,9 +272,30 @@ class HomeServer(object):
|
|||
def build_media_repository(self):
|
||||
return MediaRepository(self)
|
||||
|
||||
def build_federation_transport_client(self):
|
||||
return TransportLayerClient(self)
|
||||
|
||||
def build_federation_sender(self):
|
||||
if self.should_send_federation():
|
||||
return TransactionQueue(self)
|
||||
elif not self.config.worker_app:
|
||||
return FederationRemoteSendQueue(self)
|
||||
else:
|
||||
raise Exception("Workers cannot send federation traffic")
|
||||
|
||||
def build_receipts_handler(self):
|
||||
return ReceiptsHandler(self)
|
||||
|
||||
def remove_pusher(self, app_id, push_key, user_id):
|
||||
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
|
||||
|
||||
def should_send_federation(self):
|
||||
"Should this server be sending federation traffic directly?"
|
||||
return self.config.send_federation and (
|
||||
not self.config.worker_app
|
||||
or self.config.worker_app == "synapse.app.federation_sender"
|
||||
)
|
||||
|
||||
|
||||
def _make_dependency_method(depname):
|
||||
def _get(hs):
|
||||
|
|
|
@ -561,12 +561,17 @@ class SQLBaseStore(object):
|
|||
|
||||
@staticmethod
|
||||
def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
|
||||
if keyvalues:
|
||||
where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
|
||||
else:
|
||||
where = ""
|
||||
|
||||
sql = (
|
||||
"SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
|
||||
"SELECT %(retcol)s FROM %(table)s %(where)s"
|
||||
) % {
|
||||
"retcol": retcol,
|
||||
"table": table,
|
||||
"where": " AND ".join("%s = ?" % k for k in keyvalues.keys()),
|
||||
"where": where,
|
||||
}
|
||||
|
||||
txn.execute(sql, keyvalues.values())
|
||||
|
@ -744,10 +749,15 @@ class SQLBaseStore(object):
|
|||
|
||||
@staticmethod
|
||||
def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
|
||||
update_sql = "UPDATE %s SET %s WHERE %s" % (
|
||||
if keyvalues:
|
||||
where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
|
||||
else:
|
||||
where = ""
|
||||
|
||||
update_sql = "UPDATE %s SET %s %s" % (
|
||||
table,
|
||||
", ".join("%s = ?" % (k,) for k in updatevalues),
|
||||
" AND ".join("%s = ?" % (k,) for k in keyvalues)
|
||||
where,
|
||||
)
|
||||
|
||||
txn.execute(
|
||||
|
|
|
@ -269,27 +269,29 @@ class DeviceInboxStore(SQLBaseStore):
|
|||
return defer.succeed([])
|
||||
|
||||
def get_all_new_device_messages_txn(txn):
|
||||
# We limit like this as we might have multiple rows per stream_id, and
|
||||
# we want to make sure we always get all entries for any stream_id
|
||||
# we return.
|
||||
upper_pos = min(current_pos, last_pos + limit)
|
||||
sql = (
|
||||
"SELECT stream_id FROM device_inbox"
|
||||
" WHERE ? < stream_id AND stream_id <= ?"
|
||||
" GROUP BY stream_id"
|
||||
" ORDER BY stream_id ASC"
|
||||
" LIMIT ?"
|
||||
)
|
||||
txn.execute(sql, (last_pos, current_pos, limit))
|
||||
stream_ids = txn.fetchall()
|
||||
if not stream_ids:
|
||||
return []
|
||||
max_stream_id_in_limit = stream_ids[-1]
|
||||
|
||||
sql = (
|
||||
"SELECT stream_id, user_id, device_id, message_json"
|
||||
"SELECT stream_id, user_id"
|
||||
" FROM device_inbox"
|
||||
" WHERE ? < stream_id AND stream_id <= ?"
|
||||
" ORDER BY stream_id ASC"
|
||||
)
|
||||
txn.execute(sql, (last_pos, max_stream_id_in_limit))
|
||||
return txn.fetchall()
|
||||
txn.execute(sql, (last_pos, upper_pos))
|
||||
rows = txn.fetchall()
|
||||
|
||||
sql = (
|
||||
"SELECT stream_id, destination"
|
||||
" FROM device_federation_outbox"
|
||||
" WHERE ? < stream_id AND stream_id <= ?"
|
||||
" ORDER BY stream_id ASC"
|
||||
)
|
||||
txn.execute(sql, (last_pos, upper_pos))
|
||||
rows.extend(txn.fetchall())
|
||||
|
||||
return rows
|
||||
|
||||
return self.runInteraction(
|
||||
"get_all_new_device_messages", get_all_new_device_messages_txn
|
||||
|
|
|
@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
# Remember to update this number every time a change is made to database
|
||||
# schema files, so the users will be informed on server restarts.
|
||||
SCHEMA_VERSION = 38
|
||||
SCHEMA_VERSION = 39
|
||||
|
||||
dir_path = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
|
|
|
@ -37,6 +37,13 @@ class UserPresenceState(namedtuple("UserPresenceState",
|
|||
status_msg (str): User set status message.
|
||||
"""
|
||||
|
||||
def as_dict(self):
|
||||
return dict(self._asdict())
|
||||
|
||||
@staticmethod
|
||||
def from_dict(d):
|
||||
return UserPresenceState(**d)
|
||||
|
||||
def copy_and_replace(self, **kwargs):
|
||||
return self._replace(**kwargs)
|
||||
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
/* Copyright 2016 OpenMarket Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
CREATE INDEX device_federation_outbox_id ON device_federation_outbox(stream_id);
|
22
synapse/storage/schema/delta/39/federation_out_position.sql
Normal file
22
synapse/storage/schema/delta/39/federation_out_position.sql
Normal file
|
@ -0,0 +1,22 @@
|
|||
/* Copyright 2016 OpenMarket Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
CREATE TABLE federation_stream_position(
|
||||
type TEXT NOT NULL,
|
||||
stream_id INTEGER NOT NULL
|
||||
);
|
||||
|
||||
INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1);
|
||||
INSERT INTO federation_stream_position (type, stream_id) VALUES ('events', -1);
|
|
@ -765,3 +765,50 @@ class StreamStore(SQLBaseStore):
|
|||
"token": end_token,
|
||||
},
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_all_new_events_stream(self, from_id, current_id, limit):
|
||||
"""Get all new events"""
|
||||
|
||||
def get_all_new_events_stream_txn(txn):
|
||||
sql = (
|
||||
"SELECT e.stream_ordering, e.event_id"
|
||||
" FROM events AS e"
|
||||
" WHERE"
|
||||
" ? < e.stream_ordering AND e.stream_ordering <= ?"
|
||||
" ORDER BY e.stream_ordering ASC"
|
||||
" LIMIT ?"
|
||||
)
|
||||
|
||||
txn.execute(sql, (from_id, current_id, limit))
|
||||
rows = txn.fetchall()
|
||||
|
||||
upper_bound = current_id
|
||||
if len(rows) == limit:
|
||||
upper_bound = rows[-1][0]
|
||||
|
||||
return upper_bound, [row[1] for row in rows]
|
||||
|
||||
upper_bound, event_ids = yield self.runInteraction(
|
||||
"get_all_new_events_stream", get_all_new_events_stream_txn,
|
||||
)
|
||||
|
||||
events = yield self._get_events(event_ids)
|
||||
|
||||
defer.returnValue((upper_bound, events))
|
||||
|
||||
def get_federation_out_pos(self, typ):
|
||||
return self._simple_select_one_onecol(
|
||||
table="federation_stream_position",
|
||||
retcol="stream_id",
|
||||
keyvalues={"type": typ},
|
||||
desc="get_federation_out_pos"
|
||||
)
|
||||
|
||||
def update_federation_out_pos(self, typ, stream_id):
|
||||
return self._simple_update_one(
|
||||
table="federation_stream_position",
|
||||
keyvalues={"type": typ},
|
||||
updatevalues={"stream_id": stream_id},
|
||||
desc="update_federation_out_pos",
|
||||
)
|
||||
|
|
|
@ -200,25 +200,48 @@ class TransactionStore(SQLBaseStore):
|
|||
|
||||
def _set_destination_retry_timings(self, txn, destination,
|
||||
retry_last_ts, retry_interval):
|
||||
txn.call_after(self.get_destination_retry_timings.invalidate, (destination,))
|
||||
self.database_engine.lock_table(txn, "destinations")
|
||||
|
||||
self._simple_upsert_txn(
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_destination_retry_timings, (destination,)
|
||||
)
|
||||
|
||||
# We need to be careful here as the data may have changed from under us
|
||||
# due to a worker setting the timings.
|
||||
|
||||
prev_row = self._simple_select_one_txn(
|
||||
txn,
|
||||
"destinations",
|
||||
table="destinations",
|
||||
keyvalues={
|
||||
"destination": destination,
|
||||
},
|
||||
values={
|
||||
"retry_last_ts": retry_last_ts,
|
||||
"retry_interval": retry_interval,
|
||||
},
|
||||
insertion_values={
|
||||
"destination": destination,
|
||||
"retry_last_ts": retry_last_ts,
|
||||
"retry_interval": retry_interval,
|
||||
}
|
||||
retcols=("retry_last_ts", "retry_interval"),
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
if not prev_row:
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
table="destinations",
|
||||
values={
|
||||
"destination": destination,
|
||||
"retry_last_ts": retry_last_ts,
|
||||
"retry_interval": retry_interval,
|
||||
}
|
||||
)
|
||||
elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
|
||||
self._simple_update_one_txn(
|
||||
txn,
|
||||
"destinations",
|
||||
keyvalues={
|
||||
"destination": destination,
|
||||
},
|
||||
updatevalues={
|
||||
"retry_last_ts": retry_last_ts,
|
||||
"retry_interval": retry_interval,
|
||||
},
|
||||
)
|
||||
|
||||
def get_destinations_needing_retry(self):
|
||||
"""Get all destinations which are due a retry for sending a transaction.
|
||||
|
||||
|
|
|
@ -76,15 +76,26 @@ class JsonEncodedObject(object):
|
|||
d.update(self.unrecognized_keys)
|
||||
return d
|
||||
|
||||
def get_internal_dict(self):
|
||||
d = {
|
||||
k: _encode(v, internal=True) for (k, v) in self.__dict__.items()
|
||||
if k in self.valid_keys
|
||||
}
|
||||
d.update(self.unrecognized_keys)
|
||||
return d
|
||||
|
||||
def __str__(self):
|
||||
return "(%s, %s)" % (self.__class__.__name__, repr(self.__dict__))
|
||||
|
||||
|
||||
def _encode(obj):
|
||||
def _encode(obj, internal=False):
|
||||
if type(obj) is list:
|
||||
return [_encode(o) for o in obj]
|
||||
return [_encode(o, internal=internal) for o in obj]
|
||||
|
||||
if isinstance(obj, JsonEncodedObject):
|
||||
return obj.get_dict()
|
||||
if internal:
|
||||
return obj.get_internal_dict()
|
||||
else:
|
||||
return obj.get_dict()
|
||||
|
||||
return obj
|
||||
|
|
|
@ -121,12 +121,6 @@ class RetryDestinationLimiter(object):
|
|||
pass
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
def err(failure):
|
||||
logger.exception(
|
||||
"Failed to store set_destination_retry_timings",
|
||||
failure.value
|
||||
)
|
||||
|
||||
valid_err_code = False
|
||||
if exc_type is not None and issubclass(exc_type, CodeMessageException):
|
||||
valid_err_code = 0 <= exc_val.code < 500
|
||||
|
@ -151,6 +145,15 @@ class RetryDestinationLimiter(object):
|
|||
|
||||
retry_last_ts = int(self.clock.time_msec())
|
||||
|
||||
self.store.set_destination_retry_timings(
|
||||
self.destination, retry_last_ts, self.retry_interval
|
||||
).addErrback(err)
|
||||
@defer.inlineCallbacks
|
||||
def store_retry_timings():
|
||||
try:
|
||||
yield self.store.set_destination_retry_timings(
|
||||
self.destination, retry_last_ts, self.retry_interval
|
||||
)
|
||||
except:
|
||||
logger.exception(
|
||||
"Failed to store set_destination_retry_timings",
|
||||
)
|
||||
|
||||
store_retry_timings()
|
||||
|
|
|
@ -103,7 +103,7 @@ class ReplicationResourceCase(unittest.TestCase):
|
|||
room_id = yield self.create_room()
|
||||
event_id = yield self.send_text_message(room_id, "Hello, World")
|
||||
get = self.get(receipts="-1")
|
||||
yield self.hs.get_handlers().receipts_handler.received_client_receipt(
|
||||
yield self.hs.get_receipts_handler().received_client_receipt(
|
||||
room_id, "m.read", self.user_id, event_id
|
||||
)
|
||||
code, body = yield get
|
||||
|
|
|
@ -39,7 +39,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase):
|
|||
event_cache_size=1,
|
||||
password_providers=[],
|
||||
)
|
||||
hs = yield setup_test_homeserver(config=config)
|
||||
hs = yield setup_test_homeserver(config=config, federation_sender=Mock())
|
||||
|
||||
self.as_token = "token1"
|
||||
self.as_url = "some_url"
|
||||
|
@ -112,7 +112,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
|
|||
event_cache_size=1,
|
||||
password_providers=[],
|
||||
)
|
||||
hs = yield setup_test_homeserver(config=config)
|
||||
hs = yield setup_test_homeserver(config=config, federation_sender=Mock())
|
||||
self.db_pool = hs.get_db_pool()
|
||||
|
||||
self.as_list = [
|
||||
|
@ -443,7 +443,11 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
|
|||
app_service_config_files=[f1, f2], event_cache_size=1,
|
||||
password_providers=[]
|
||||
)
|
||||
hs = yield setup_test_homeserver(config=config, datastore=Mock())
|
||||
hs = yield setup_test_homeserver(
|
||||
config=config,
|
||||
datastore=Mock(),
|
||||
federation_sender=Mock()
|
||||
)
|
||||
|
||||
ApplicationServiceStore(hs)
|
||||
|
||||
|
@ -456,7 +460,11 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
|
|||
app_service_config_files=[f1, f2], event_cache_size=1,
|
||||
password_providers=[]
|
||||
)
|
||||
hs = yield setup_test_homeserver(config=config, datastore=Mock())
|
||||
hs = yield setup_test_homeserver(
|
||||
config=config,
|
||||
datastore=Mock(),
|
||||
federation_sender=Mock()
|
||||
)
|
||||
|
||||
with self.assertRaises(ConfigError) as cm:
|
||||
ApplicationServiceStore(hs)
|
||||
|
@ -475,7 +483,11 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
|
|||
app_service_config_files=[f1, f2], event_cache_size=1,
|
||||
password_providers=[]
|
||||
)
|
||||
hs = yield setup_test_homeserver(config=config, datastore=Mock())
|
||||
hs = yield setup_test_homeserver(
|
||||
config=config,
|
||||
datastore=Mock(),
|
||||
federation_sender=Mock()
|
||||
)
|
||||
|
||||
with self.assertRaises(ConfigError) as cm:
|
||||
ApplicationServiceStore(hs)
|
||||
|
|
|
@ -53,6 +53,8 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
|
|||
config.trusted_third_party_id_servers = []
|
||||
config.room_invite_state_types = []
|
||||
config.password_providers = []
|
||||
config.worker_replication_url = ""
|
||||
config.worker_app = None
|
||||
|
||||
config.use_frozen_dicts = True
|
||||
config.database_config = {"name": "sqlite3"}
|
||||
|
@ -70,6 +72,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
|
|||
database_engine=create_engine(config.database_config),
|
||||
get_db_conn=db_pool.get_db_conn,
|
||||
room_list_handler=object(),
|
||||
tls_server_context_factory=Mock(),
|
||||
**kargs
|
||||
)
|
||||
hs.setup()
|
||||
|
@ -79,6 +82,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
|
|||
version_string="Synapse/tests",
|
||||
database_engine=create_engine(config.database_config),
|
||||
room_list_handler=object(),
|
||||
tls_server_context_factory=Mock(),
|
||||
**kargs
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in a new issue