mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-15 09:03:51 +01:00
Merge branch 'develop' of github.com:matrix-org/synapse into release-v0.16.0
This commit is contained in:
commit
5c73115155
22 changed files with 998 additions and 399 deletions
84
jenkins-dendron-postgres.sh
Executable file
84
jenkins-dendron-postgres.sh
Executable file
|
@ -0,0 +1,84 @@
|
|||
#!/bin/bash
|
||||
|
||||
set -eux
|
||||
|
||||
: ${WORKSPACE:="$(pwd)"}
|
||||
|
||||
export PYTHONDONTWRITEBYTECODE=yep
|
||||
export SYNAPSE_CACHE_FACTOR=1
|
||||
|
||||
# Output test results as junit xml
|
||||
export TRIAL_FLAGS="--reporter=subunit"
|
||||
export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml"
|
||||
# Write coverage reports to a separate file for each process
|
||||
export COVERAGE_OPTS="-p"
|
||||
export DUMP_COVERAGE_COMMAND="coverage help"
|
||||
|
||||
# Output flake8 violations to violations.flake8.log
|
||||
# Don't exit with non-0 status code on Jenkins,
|
||||
# so that the build steps continue and a later step can decided whether to
|
||||
# UNSTABLE or FAILURE this build.
|
||||
export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?"
|
||||
|
||||
rm .coverage* || echo "No coverage files to remove"
|
||||
|
||||
tox --notest -e py27
|
||||
|
||||
TOX_BIN=$WORKSPACE/.tox/py27/bin
|
||||
python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install
|
||||
$TOX_BIN/pip install psycopg2
|
||||
$TOX_BIN/pip install lxml
|
||||
|
||||
: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"}
|
||||
|
||||
if [[ ! -e .dendron-base ]]; then
|
||||
git clone https://github.com/matrix-org/dendron.git .dendron-base --mirror
|
||||
else
|
||||
(cd .dendron-base; git fetch -p)
|
||||
fi
|
||||
|
||||
rm -rf dendron
|
||||
git clone .dendron-base dendron --shared
|
||||
cd dendron
|
||||
|
||||
: ${GOPATH:=${WORKSPACE}/.gopath}
|
||||
if [[ "${GOPATH}" != *:* ]]; then
|
||||
mkdir -p "${GOPATH}"
|
||||
export PATH="${GOPATH}/bin:${PATH}"
|
||||
fi
|
||||
export GOPATH
|
||||
|
||||
git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
|
||||
|
||||
go get github.com/constabulary/gb/...
|
||||
gb generate
|
||||
gb build
|
||||
|
||||
cd ..
|
||||
|
||||
|
||||
if [[ ! -e .sytest-base ]]; then
|
||||
git clone https://github.com/matrix-org/sytest.git .sytest-base --mirror
|
||||
else
|
||||
(cd .sytest-base; git fetch -p)
|
||||
fi
|
||||
|
||||
rm -rf sytest
|
||||
git clone .sytest-base sytest --shared
|
||||
cd sytest
|
||||
|
||||
git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop)
|
||||
|
||||
: ${PORT_BASE:=8000}
|
||||
|
||||
./jenkins/prep_sytest_for_postgres.sh
|
||||
|
||||
echo >&2 "Running sytest with PostgreSQL";
|
||||
./jenkins/install_and_run.sh --python $TOX_BIN/python \
|
||||
--synapse-directory $WORKSPACE \
|
||||
--dendron $WORKSPACE/dendron/bin/dendron \
|
||||
--synchrotron \
|
||||
--pusher \
|
||||
--port-base $PORT_BASE
|
||||
|
||||
cd ..
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
import synapse
|
||||
|
||||
import gc
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
@ -351,6 +352,8 @@ class SynapseService(service.Service):
|
|||
def startService(self):
|
||||
hs = setup(self.config)
|
||||
change_resource_limit(hs.config.soft_file_limit)
|
||||
if hs.config.gc_thresholds:
|
||||
gc.set_threshold(*hs.config.gc_thresholds)
|
||||
|
||||
def stopService(self):
|
||||
return self._port.stopListening()
|
||||
|
@ -422,6 +425,8 @@ def run(hs):
|
|||
# sys.settrace(logcontext_tracer)
|
||||
with LoggingContext("run"):
|
||||
change_resource_limit(hs.config.soft_file_limit)
|
||||
if hs.config.gc_thresholds:
|
||||
gc.set_threshold(*hs.config.gc_thresholds)
|
||||
reactor.run()
|
||||
|
||||
if hs.config.daemonize:
|
||||
|
|
|
@ -43,6 +43,7 @@ from twisted.web.resource import Resource
|
|||
|
||||
from daemonize import Daemonize
|
||||
|
||||
import gc
|
||||
import sys
|
||||
import logging
|
||||
|
||||
|
@ -64,6 +65,20 @@ class SlaveConfig(DatabaseConfig):
|
|||
self.pid_file = self.abspath(config.get("pid_file"))
|
||||
self.public_baseurl = config["public_baseurl"]
|
||||
|
||||
thresholds = config.get("gc_thresholds", None)
|
||||
if thresholds is not None:
|
||||
try:
|
||||
assert len(thresholds) == 3
|
||||
self.gc_thresholds = (
|
||||
int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
|
||||
)
|
||||
except:
|
||||
raise ConfigError(
|
||||
"Value of `gc_threshold` must be a list of three integers if set"
|
||||
)
|
||||
else:
|
||||
self.gc_thresholds = None
|
||||
|
||||
# some things used by the auth handler but not actually used in the
|
||||
# pusher codebase
|
||||
self.bcrypt_rounds = None
|
||||
|
@ -311,7 +326,7 @@ class PusherServer(HomeServer):
|
|||
poke_pushers(result)
|
||||
except:
|
||||
logger.exception("Error replicating from %r", replication_url)
|
||||
sleep(30)
|
||||
yield sleep(30)
|
||||
|
||||
|
||||
def setup(config_options):
|
||||
|
@ -342,6 +357,8 @@ def setup(config_options):
|
|||
ps.start_listening()
|
||||
|
||||
change_resource_limit(ps.config.soft_file_limit)
|
||||
if ps.config.gc_thresholds:
|
||||
gc.set_threshold(*ps.config.gc_thresholds)
|
||||
|
||||
def start():
|
||||
ps.replicate()
|
||||
|
@ -361,6 +378,8 @@ if __name__ == '__main__':
|
|||
def run():
|
||||
with LoggingContext("run"):
|
||||
change_resource_limit(ps.config.soft_file_limit)
|
||||
if ps.config.gc_thresholds:
|
||||
gc.set_threshold(*ps.config.gc_thresholds)
|
||||
reactor.run()
|
||||
|
||||
daemon = Daemonize(
|
||||
|
|
537
synapse/app/synchrotron.py
Normal file
537
synapse/app/synchrotron.py
Normal file
|
@ -0,0 +1,537 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2016 OpenMarket Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import synapse
|
||||
|
||||
from synapse.api.constants import EventTypes, PresenceState
|
||||
from synapse.config._base import ConfigError
|
||||
from synapse.config.database import DatabaseConfig
|
||||
from synapse.config.logger import LoggingConfig
|
||||
from synapse.config.appservice import AppServiceConfig
|
||||
from synapse.events import FrozenEvent
|
||||
from synapse.handlers.presence import PresenceHandler
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
|
||||
from synapse.rest.client.v2_alpha import sync
|
||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
|
||||
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
|
||||
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
|
||||
from synapse.replication.slave.storage.presence import SlavedPresenceStore
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.client_ips import ClientIpStore
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.storage.presence import PresenceStore, UserPresenceState
|
||||
from synapse.storage.roommember import RoomMemberStore
|
||||
from synapse.util.async import sleep
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext, preserve_fn
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.rlimit import change_resource_limit
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
from twisted.internet import reactor, defer
|
||||
from twisted.web.resource import Resource
|
||||
|
||||
from daemonize import Daemonize
|
||||
|
||||
import sys
|
||||
import logging
|
||||
import contextlib
|
||||
import gc
|
||||
import ujson as json
|
||||
|
||||
logger = logging.getLogger("synapse.app.synchrotron")
|
||||
|
||||
|
||||
class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig):
|
||||
def read_config(self, config):
|
||||
self.replication_url = config["replication_url"]
|
||||
self.server_name = config["server_name"]
|
||||
self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
|
||||
"use_insecure_ssl_client_just_for_testing_do_not_use", False
|
||||
)
|
||||
self.user_agent_suffix = None
|
||||
self.listeners = config["listeners"]
|
||||
self.soft_file_limit = config.get("soft_file_limit")
|
||||
self.daemonize = config.get("daemonize")
|
||||
self.pid_file = self.abspath(config.get("pid_file"))
|
||||
self.macaroon_secret_key = config["macaroon_secret_key"]
|
||||
self.expire_access_token = config.get("expire_access_token", False)
|
||||
|
||||
thresholds = config.get("gc_thresholds", None)
|
||||
if thresholds is not None:
|
||||
try:
|
||||
assert len(thresholds) == 3
|
||||
self.gc_thresholds = (
|
||||
int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
|
||||
)
|
||||
except:
|
||||
raise ConfigError(
|
||||
"Value of `gc_threshold` must be a list of three integers if set"
|
||||
)
|
||||
else:
|
||||
self.gc_thresholds = None
|
||||
|
||||
def default_config(self, server_name, **kwargs):
|
||||
pid_file = self.abspath("synchroton.pid")
|
||||
return """\
|
||||
# Slave configuration
|
||||
|
||||
# The replication listener on the synapse to talk to.
|
||||
#replication_url: https://localhost:{replication_port}/_synapse/replication
|
||||
|
||||
server_name: "%(server_name)s"
|
||||
|
||||
listeners:
|
||||
# Enable a /sync listener on the synchrontron
|
||||
#- type: http
|
||||
# port: {http_port}
|
||||
# bind_address: ""
|
||||
# Enable a ssh manhole listener on the synchrotron
|
||||
# - type: manhole
|
||||
# port: {manhole_port}
|
||||
# bind_address: 127.0.0.1
|
||||
# Enable a metric listener on the synchrotron
|
||||
# - type: http
|
||||
# port: {metrics_port}
|
||||
# bind_address: 127.0.0.1
|
||||
# resources:
|
||||
# - names: ["metrics"]
|
||||
# compress: False
|
||||
|
||||
report_stats: False
|
||||
|
||||
daemonize: False
|
||||
|
||||
pid_file: %(pid_file)s
|
||||
""" % locals()
|
||||
|
||||
|
||||
class SynchrotronSlavedStore(
|
||||
SlavedPushRuleStore,
|
||||
SlavedEventStore,
|
||||
SlavedReceiptsStore,
|
||||
SlavedAccountDataStore,
|
||||
SlavedApplicationServiceStore,
|
||||
SlavedRegistrationStore,
|
||||
SlavedFilteringStore,
|
||||
SlavedPresenceStore,
|
||||
BaseSlavedStore,
|
||||
ClientIpStore, # After BaseSlavedStore because the constructor is different
|
||||
):
|
||||
# XXX: This is a bit broken because we don't persist forgotten rooms
|
||||
# in a way that they can be streamed. This means that we don't have a
|
||||
# way to invalidate the forgotten rooms cache correctly.
|
||||
# For now we expire the cache every 10 minutes.
|
||||
BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000
|
||||
who_forgot_in_room = (
|
||||
RoomMemberStore.__dict__["who_forgot_in_room"]
|
||||
)
|
||||
|
||||
# XXX: This is a bit broken because we don't persist the accepted list in a
|
||||
# way that can be replicated. This means that we don't have a way to
|
||||
# invalidate the cache correctly.
|
||||
get_presence_list_accepted = PresenceStore.__dict__[
|
||||
"get_presence_list_accepted"
|
||||
]
|
||||
|
||||
UPDATE_SYNCING_USERS_MS = 10 * 1000
|
||||
|
||||
|
||||
class SynchrotronPresence(object):
|
||||
def __init__(self, hs):
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.store = hs.get_datastore()
|
||||
self.user_to_num_current_syncs = {}
|
||||
self.syncing_users_url = hs.config.replication_url + "/syncing_users"
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
active_presence = self.store.take_presence_startup_info()
|
||||
self.user_to_current_state = {
|
||||
state.user_id: state
|
||||
for state in active_presence
|
||||
}
|
||||
|
||||
self.process_id = random_string(16)
|
||||
logger.info("Presence process_id is %r", self.process_id)
|
||||
|
||||
self._sending_sync = False
|
||||
self._need_to_send_sync = False
|
||||
self.clock.looping_call(
|
||||
self._send_syncing_users_regularly,
|
||||
UPDATE_SYNCING_USERS_MS,
|
||||
)
|
||||
|
||||
reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
|
||||
|
||||
def set_state(self, user, state):
|
||||
# TODO Hows this supposed to work?
|
||||
pass
|
||||
|
||||
get_states = PresenceHandler.get_states.__func__
|
||||
current_state_for_users = PresenceHandler.current_state_for_users.__func__
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def user_syncing(self, user_id, affect_presence):
|
||||
if affect_presence:
|
||||
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
|
||||
self.user_to_num_current_syncs[user_id] = curr_sync + 1
|
||||
prev_states = yield self.current_state_for_users([user_id])
|
||||
if prev_states[user_id].state == PresenceState.OFFLINE:
|
||||
# TODO: Don't block the sync request on this HTTP hit.
|
||||
yield self._send_syncing_users_now()
|
||||
|
||||
def _end():
|
||||
# We check that the user_id is in user_to_num_current_syncs because
|
||||
# user_to_num_current_syncs may have been cleared if we are
|
||||
# shutting down.
|
||||
if affect_presence and user_id in self.user_to_num_current_syncs:
|
||||
self.user_to_num_current_syncs[user_id] -= 1
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _user_syncing():
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
_end()
|
||||
|
||||
defer.returnValue(_user_syncing())
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_shutdown(self):
|
||||
# When the synchrotron is shutdown tell the master to clear the in
|
||||
# progress syncs for this process
|
||||
self.user_to_num_current_syncs.clear()
|
||||
yield self._send_syncing_users_now()
|
||||
|
||||
def _send_syncing_users_regularly(self):
|
||||
# Only send an update if we aren't in the middle of sending one.
|
||||
if not self._sending_sync:
|
||||
preserve_fn(self._send_syncing_users_now)()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _send_syncing_users_now(self):
|
||||
if self._sending_sync:
|
||||
# We don't want to race with sending another update.
|
||||
# Instead we wait for that update to finish and send another
|
||||
# update afterwards.
|
||||
self._need_to_send_sync = True
|
||||
return
|
||||
|
||||
# Flag that we are sending an update.
|
||||
self._sending_sync = True
|
||||
|
||||
yield self.http_client.post_json_get_json(self.syncing_users_url, {
|
||||
"process_id": self.process_id,
|
||||
"syncing_users": [
|
||||
user_id for user_id, count in self.user_to_num_current_syncs.items()
|
||||
if count > 0
|
||||
],
|
||||
})
|
||||
|
||||
# Unset the flag as we are no longer sending an update.
|
||||
self._sending_sync = False
|
||||
if self._need_to_send_sync:
|
||||
# If something happened while we were sending the update then
|
||||
# we might need to send another update.
|
||||
# TODO: Check if the update that was sent matches the current state
|
||||
# as we only need to send an update if they are different.
|
||||
self._need_to_send_sync = False
|
||||
yield self._send_syncing_users_now()
|
||||
|
||||
def process_replication(self, result):
|
||||
stream = result.get("presence", {"rows": []})
|
||||
for row in stream["rows"]:
|
||||
(
|
||||
position, user_id, state, last_active_ts,
|
||||
last_federation_update_ts, last_user_sync_ts, status_msg,
|
||||
currently_active
|
||||
) = row
|
||||
self.user_to_current_state[user_id] = UserPresenceState(
|
||||
user_id, state, last_active_ts,
|
||||
last_federation_update_ts, last_user_sync_ts, status_msg,
|
||||
currently_active
|
||||
)
|
||||
|
||||
|
||||
class SynchrotronTyping(object):
|
||||
def __init__(self, hs):
|
||||
self._latest_room_serial = 0
|
||||
self._room_serials = {}
|
||||
self._room_typing = {}
|
||||
|
||||
def stream_positions(self):
|
||||
return {"typing": self._latest_room_serial}
|
||||
|
||||
def process_replication(self, result):
|
||||
stream = result.get("typing")
|
||||
if stream:
|
||||
self._latest_room_serial = int(stream["position"])
|
||||
|
||||
for row in stream["rows"]:
|
||||
position, room_id, typing_json = row
|
||||
typing = json.loads(typing_json)
|
||||
self._room_serials[room_id] = position
|
||||
self._room_typing[room_id] = typing
|
||||
|
||||
|
||||
class SynchrotronApplicationService(object):
|
||||
def notify_interested_services(self, event):
|
||||
pass
|
||||
|
||||
|
||||
class SynchrotronServer(HomeServer):
|
||||
def get_db_conn(self, run_new_connection=True):
|
||||
# Any param beginning with cp_ is a parameter for adbapi, and should
|
||||
# not be passed to the database engine.
|
||||
db_params = {
|
||||
k: v for k, v in self.db_config.get("args", {}).items()
|
||||
if not k.startswith("cp_")
|
||||
}
|
||||
db_conn = self.database_engine.module.connect(**db_params)
|
||||
|
||||
if run_new_connection:
|
||||
self.database_engine.on_new_connection(db_conn)
|
||||
return db_conn
|
||||
|
||||
def setup(self):
|
||||
logger.info("Setting up.")
|
||||
self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
|
||||
logger.info("Finished setting up.")
|
||||
|
||||
def _listen_http(self, listener_config):
|
||||
port = listener_config["port"]
|
||||
bind_address = listener_config.get("bind_address", "")
|
||||
site_tag = listener_config.get("tag", port)
|
||||
resources = {}
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(self)
|
||||
elif name == "client":
|
||||
resource = JsonResource(self, canonical_json=False)
|
||||
sync.register_servlets(self, resource)
|
||||
resources.update({
|
||||
"/_matrix/client/r0": resource,
|
||||
"/_matrix/client/unstable": resource,
|
||||
"/_matrix/client/v2_alpha": resource,
|
||||
})
|
||||
|
||||
root_resource = create_resource_tree(resources, Resource())
|
||||
reactor.listenTCP(
|
||||
port,
|
||||
SynapseSite(
|
||||
"synapse.access.http.%s" % (site_tag,),
|
||||
site_tag,
|
||||
listener_config,
|
||||
root_resource,
|
||||
),
|
||||
interface=bind_address
|
||||
)
|
||||
logger.info("Synapse synchrotron now listening on port %d", port)
|
||||
|
||||
def start_listening(self):
|
||||
for listener in self.config.listeners:
|
||||
if listener["type"] == "http":
|
||||
self._listen_http(listener)
|
||||
elif listener["type"] == "manhole":
|
||||
reactor.listenTCP(
|
||||
listener["port"],
|
||||
manhole(
|
||||
username="matrix",
|
||||
password="rabbithole",
|
||||
globals={"hs": self},
|
||||
),
|
||||
interface=listener.get("bind_address", '127.0.0.1')
|
||||
)
|
||||
else:
|
||||
logger.warn("Unrecognized listener type: %s", listener["type"])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def replicate(self):
|
||||
http_client = self.get_simple_http_client()
|
||||
store = self.get_datastore()
|
||||
replication_url = self.config.replication_url
|
||||
clock = self.get_clock()
|
||||
notifier = self.get_notifier()
|
||||
presence_handler = self.get_presence_handler()
|
||||
typing_handler = self.get_typing_handler()
|
||||
|
||||
def expire_broken_caches():
|
||||
store.who_forgot_in_room.invalidate_all()
|
||||
store.get_presence_list_accepted.invalidate_all()
|
||||
|
||||
def notify_from_stream(
|
||||
result, stream_name, stream_key, room=None, user=None
|
||||
):
|
||||
stream = result.get(stream_name)
|
||||
if stream:
|
||||
position_index = stream["field_names"].index("position")
|
||||
if room:
|
||||
room_index = stream["field_names"].index(room)
|
||||
if user:
|
||||
user_index = stream["field_names"].index(user)
|
||||
|
||||
users = ()
|
||||
rooms = ()
|
||||
for row in stream["rows"]:
|
||||
position = row[position_index]
|
||||
|
||||
if user:
|
||||
users = (row[user_index],)
|
||||
|
||||
if room:
|
||||
rooms = (row[room_index],)
|
||||
|
||||
notifier.on_new_event(
|
||||
stream_key, position, users=users, rooms=rooms
|
||||
)
|
||||
|
||||
def notify(result):
|
||||
stream = result.get("events")
|
||||
if stream:
|
||||
max_position = stream["position"]
|
||||
for row in stream["rows"]:
|
||||
position = row[0]
|
||||
internal = json.loads(row[1])
|
||||
event_json = json.loads(row[2])
|
||||
event = FrozenEvent(event_json, internal_metadata_dict=internal)
|
||||
extra_users = ()
|
||||
if event.type == EventTypes.Member:
|
||||
extra_users = (event.state_key,)
|
||||
notifier.on_new_room_event(
|
||||
event, position, max_position, extra_users
|
||||
)
|
||||
|
||||
notify_from_stream(
|
||||
result, "push_rules", "push_rules_key", user="user_id"
|
||||
)
|
||||
notify_from_stream(
|
||||
result, "user_account_data", "account_data_key", user="user_id"
|
||||
)
|
||||
notify_from_stream(
|
||||
result, "room_account_data", "account_data_key", user="user_id"
|
||||
)
|
||||
notify_from_stream(
|
||||
result, "tag_account_data", "account_data_key", user="user_id"
|
||||
)
|
||||
notify_from_stream(
|
||||
result, "receipts", "receipt_key", room="room_id"
|
||||
)
|
||||
notify_from_stream(
|
||||
result, "typing", "typing_key", room="room_id"
|
||||
)
|
||||
|
||||
next_expire_broken_caches_ms = 0
|
||||
while True:
|
||||
try:
|
||||
args = store.stream_positions()
|
||||
args.update(typing_handler.stream_positions())
|
||||
args["timeout"] = 30000
|
||||
result = yield http_client.get_json(replication_url, args=args)
|
||||
now_ms = clock.time_msec()
|
||||
if now_ms > next_expire_broken_caches_ms:
|
||||
expire_broken_caches()
|
||||
next_expire_broken_caches_ms = (
|
||||
now_ms + store.BROKEN_CACHE_EXPIRY_MS
|
||||
)
|
||||
yield store.process_replication(result)
|
||||
typing_handler.process_replication(result)
|
||||
presence_handler.process_replication(result)
|
||||
notify(result)
|
||||
except:
|
||||
logger.exception("Error replicating from %r", replication_url)
|
||||
yield sleep(5)
|
||||
|
||||
def build_presence_handler(self):
|
||||
return SynchrotronPresence(self)
|
||||
|
||||
def build_typing_handler(self):
|
||||
return SynchrotronTyping(self)
|
||||
|
||||
|
||||
def setup(config_options):
|
||||
try:
|
||||
config = SynchrotronConfig.load_config(
|
||||
"Synapse synchrotron", config_options
|
||||
)
|
||||
except ConfigError as e:
|
||||
sys.stderr.write("\n" + e.message + "\n")
|
||||
sys.exit(1)
|
||||
|
||||
if not config:
|
||||
sys.exit(0)
|
||||
|
||||
config.setup_logging()
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
|
||||
ss = SynchrotronServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
config=config,
|
||||
version_string=get_version_string("Synapse", synapse),
|
||||
database_engine=database_engine,
|
||||
application_service_handler=SynchrotronApplicationService(),
|
||||
)
|
||||
|
||||
ss.setup()
|
||||
ss.start_listening()
|
||||
|
||||
change_resource_limit(ss.config.soft_file_limit)
|
||||
if ss.config.gc_thresholds:
|
||||
ss.set_threshold(*ss.config.gc_thresholds)
|
||||
|
||||
def start():
|
||||
ss.get_datastore().start_profiling()
|
||||
ss.replicate()
|
||||
|
||||
reactor.callWhenRunning(start)
|
||||
|
||||
return ss
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
with LoggingContext("main"):
|
||||
ss = setup(sys.argv[1:])
|
||||
|
||||
if ss.config.daemonize:
|
||||
def run():
|
||||
with LoggingContext("run"):
|
||||
change_resource_limit(ss.config.soft_file_limit)
|
||||
if ss.config.gc_thresholds:
|
||||
gc.set_threshold(*ss.config.gc_thresholds)
|
||||
reactor.run()
|
||||
|
||||
daemon = Daemonize(
|
||||
app="synapse-synchrotron",
|
||||
pid=ss.config.pid_file,
|
||||
action=run,
|
||||
auto_close_fds=False,
|
||||
verbose=True,
|
||||
logger=logger,
|
||||
)
|
||||
|
||||
daemon.start()
|
||||
else:
|
||||
reactor.run()
|
|
@ -13,7 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ._base import Config
|
||||
from ._base import Config, ConfigError
|
||||
|
||||
|
||||
class ServerConfig(Config):
|
||||
|
@ -38,6 +38,20 @@ class ServerConfig(Config):
|
|||
|
||||
self.listeners = config.get("listeners", [])
|
||||
|
||||
thresholds = config.get("gc_thresholds", None)
|
||||
if thresholds is not None:
|
||||
try:
|
||||
assert len(thresholds) == 3
|
||||
self.gc_thresholds = (
|
||||
int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
|
||||
)
|
||||
except:
|
||||
raise ConfigError(
|
||||
"Value of `gc_threshold` must be a list of three integers if set"
|
||||
)
|
||||
else:
|
||||
self.gc_thresholds = None
|
||||
|
||||
bind_port = config.get("bind_port")
|
||||
if bind_port:
|
||||
self.listeners = []
|
||||
|
@ -157,6 +171,9 @@ class ServerConfig(Config):
|
|||
# hard limit.
|
||||
soft_file_limit: 0
|
||||
|
||||
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
|
||||
# gc_thresholds: [700, 10, 10]
|
||||
|
||||
# A list of other Home Servers to fetch the public room directory from
|
||||
# and include in the public room directory of this home server
|
||||
# This is a temporary stopgap solution to populate new server with a
|
||||
|
|
|
@ -66,10 +66,6 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
self.hs = hs
|
||||
|
||||
self.distributor.observe("user_joined_room", self.user_joined_room)
|
||||
|
||||
self.waiting_for_join_list = {}
|
||||
|
||||
self.store = hs.get_datastore()
|
||||
self.replication_layer = hs.get_replication_layer()
|
||||
self.state_handler = hs.get_state_handler()
|
||||
|
@ -1091,15 +1087,6 @@ class FederationHandler(BaseHandler):
|
|||
def get_min_depth_for_context(self, context):
|
||||
return self.store.get_min_depth(context)
|
||||
|
||||
@log_function
|
||||
def user_joined_room(self, user, room_id):
|
||||
waiters = self.waiting_for_join_list.get(
|
||||
(user.to_string(), room_id),
|
||||
[]
|
||||
)
|
||||
while waiters:
|
||||
waiters.pop().callback(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def _handle_new_event(self, origin, event, state=None, auth_events=None,
|
||||
|
|
|
@ -50,6 +50,8 @@ timers_fired_counter = metrics.register_counter("timers_fired")
|
|||
federation_presence_counter = metrics.register_counter("federation_presence")
|
||||
bump_active_time_counter = metrics.register_counter("bump_active_time")
|
||||
|
||||
get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
|
||||
|
||||
|
||||
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
|
||||
# "currently_active"
|
||||
|
@ -181,7 +183,7 @@ class PresenceHandler(object):
|
|||
# The initial delay is to allow disconnected clients a chance to
|
||||
# reconnect before we treat them as offline.
|
||||
self.clock.call_later(
|
||||
30 * 1000,
|
||||
30,
|
||||
self.clock.looping_call,
|
||||
self._handle_timeouts,
|
||||
5000,
|
||||
|
@ -281,8 +283,10 @@ class PresenceHandler(object):
|
|||
"""Checks the presence of users that have timed out and updates as
|
||||
appropriate.
|
||||
"""
|
||||
logger.info("Handling presence timeouts")
|
||||
now = self.clock.time_msec()
|
||||
|
||||
try:
|
||||
with Measure(self.clock, "presence_handle_timeouts"):
|
||||
# Fetch the list of users that *may* have timed out. Things may have
|
||||
# changed since the timeout was set, so we won't necessarily have to
|
||||
|
@ -293,12 +297,12 @@ class PresenceHandler(object):
|
|||
# process have expired.
|
||||
expired_process_ids = [
|
||||
process_id for process_id, last_update
|
||||
in self.external_process_last_update.items()
|
||||
in self.external_process_last_updated_ms.items()
|
||||
if now - last_update > EXTERNAL_PROCESS_EXPIRY
|
||||
]
|
||||
for process_id in expired_process_ids:
|
||||
users_to_check.update(
|
||||
self.external_process_to_current_syncs.pop(process_id, ())
|
||||
self.external_process_last_updated_ms.pop(process_id, ())
|
||||
)
|
||||
self.external_process_last_update.pop(process_id)
|
||||
|
||||
|
@ -314,11 +318,13 @@ class PresenceHandler(object):
|
|||
changes = handle_timeouts(
|
||||
states,
|
||||
is_mine_fn=self.is_mine_id,
|
||||
syncing_users=self.get_syncing_users(),
|
||||
syncing_user_ids=self.get_currently_syncing_users(),
|
||||
now=now,
|
||||
)
|
||||
|
||||
preserve_fn(self._update_states)(changes)
|
||||
except:
|
||||
logger.exception("Exception in _handle_timeouts loop")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def bump_presence_active_time(self, user):
|
||||
|
@ -400,7 +406,8 @@ class PresenceHandler(object):
|
|||
user_id for user_id, count in self.user_to_num_current_syncs.items()
|
||||
if count
|
||||
}
|
||||
syncing_user_ids.update(self.external_process_to_current_syncs.values())
|
||||
for user_ids in self.external_process_to_current_syncs.values():
|
||||
syncing_user_ids.update(user_ids)
|
||||
return syncing_user_ids
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -974,13 +981,13 @@ class PresenceEventSource(object):
|
|||
|
||||
user_ids_changed = set()
|
||||
changed = None
|
||||
if from_key and max_token - from_key < 100:
|
||||
# For small deltas, its quicker to get all changes and then
|
||||
# work out if we share a room or they're in our presence list
|
||||
if from_key:
|
||||
changed = stream_change_cache.get_all_entities_changed(from_key)
|
||||
|
||||
# get_all_entities_changed can return None
|
||||
if changed is not None:
|
||||
if changed is not None and len(changed) < 500:
|
||||
# For small deltas, its quicker to get all changes and then
|
||||
# work out if we share a room or they're in our presence list
|
||||
get_updates_counter.inc("stream")
|
||||
for other_user_id in changed:
|
||||
if other_user_id in friends:
|
||||
user_ids_changed.add(other_user_id)
|
||||
|
@ -992,6 +999,8 @@ class PresenceEventSource(object):
|
|||
else:
|
||||
# Too many possible updates. Find all users we can see and check
|
||||
# if any of them have changed.
|
||||
get_updates_counter.inc("full")
|
||||
|
||||
user_ids_to_check = set()
|
||||
for room_id in room_ids:
|
||||
users = yield self.store.get_users_in_room(room_id)
|
||||
|
|
|
@ -22,6 +22,7 @@ import functools
|
|||
import os
|
||||
import stat
|
||||
import time
|
||||
import gc
|
||||
|
||||
from twisted.internet import reactor
|
||||
|
||||
|
@ -152,6 +153,13 @@ reactor_metrics = get_metrics_for("reactor")
|
|||
tick_time = reactor_metrics.register_distribution("tick_time")
|
||||
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
|
||||
|
||||
gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"])
|
||||
gc_unreachable = reactor_metrics.register_counter("gc_unreachable", labels=["gen"])
|
||||
|
||||
reactor_metrics.register_callback(
|
||||
"gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"]
|
||||
)
|
||||
|
||||
|
||||
def runUntilCurrentTimer(func):
|
||||
|
||||
|
@ -178,6 +186,22 @@ def runUntilCurrentTimer(func):
|
|||
end = time.time() * 1000
|
||||
tick_time.inc_by(end - start)
|
||||
pending_calls_metric.inc_by(num_pending)
|
||||
|
||||
# Check if we need to do a manual GC (since its been disabled), and do
|
||||
# one if necessary.
|
||||
threshold = gc.get_threshold()
|
||||
counts = gc.get_count()
|
||||
for i in (2, 1, 0):
|
||||
if threshold[i] < counts[i]:
|
||||
logger.info("Collecting gc %d", i)
|
||||
|
||||
start = time.time() * 1000
|
||||
unreachable = gc.collect(i)
|
||||
end = time.time() * 1000
|
||||
|
||||
gc_time.inc_by(end - start, i)
|
||||
gc_unreachable.inc_by(unreachable, i)
|
||||
|
||||
return ret
|
||||
|
||||
return f
|
||||
|
@ -192,5 +216,9 @@ try:
|
|||
# runUntilCurrent is called when we have pending calls. It is called once
|
||||
# per iteratation after fd polling.
|
||||
reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)
|
||||
|
||||
# We manually run the GC each reactor tick so that we can get some metrics
|
||||
# about time spent doing GC,
|
||||
gc.disable()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
from twisted.internet import defer
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import AuthError
|
||||
|
||||
from synapse.util.logutils import log_function
|
||||
|
@ -152,10 +152,6 @@ class Notifier(object):
|
|||
self.appservice_handler = hs.get_application_service_handler()
|
||||
self.state_handler = hs.get_state_handler()
|
||||
|
||||
hs.get_distributor().observe(
|
||||
"user_joined_room", self._user_joined_room
|
||||
)
|
||||
|
||||
self.clock.looping_call(
|
||||
self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
|
||||
)
|
||||
|
@ -248,6 +244,9 @@ class Notifier(object):
|
|||
)
|
||||
app_streams |= app_user_streams
|
||||
|
||||
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
|
||||
self._user_joined_room(event.state_key, event.room_id)
|
||||
|
||||
self.on_new_event(
|
||||
"room_key", room_stream_id,
|
||||
users=extra_users,
|
||||
|
@ -483,9 +482,8 @@ class Notifier(object):
|
|||
user_stream.appservice, set()
|
||||
).add(user_stream)
|
||||
|
||||
def _user_joined_room(self, user, room_id):
|
||||
user = str(user)
|
||||
new_user_stream = self.user_to_user_stream.get(user)
|
||||
def _user_joined_room(self, user_id, room_id):
|
||||
new_user_stream = self.user_to_user_stream.get(user_id)
|
||||
if new_user_stream is not None:
|
||||
room_streams = self.room_to_user_streams.setdefault(room_id, set())
|
||||
room_streams.add(new_user_stream)
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import ujson as json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
|
@ -27,13 +26,6 @@ from synapse.visibility import filter_events_for_clients
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def decode_rule_json(rule):
|
||||
rule = dict(rule)
|
||||
rule['conditions'] = json.loads(rule['conditions'])
|
||||
rule['actions'] = json.loads(rule['actions'])
|
||||
return rule
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_rules(room_id, user_ids, store):
|
||||
rules_by_user = yield store.bulk_get_push_rules(user_ids)
|
||||
|
|
|
@ -13,37 +13,11 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.push.baserules import list_with_base_rules
|
||||
|
||||
from synapse.push.rulekinds import (
|
||||
PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
|
||||
)
|
||||
|
||||
import copy
|
||||
import simplejson as json
|
||||
|
||||
|
||||
def load_rules_for_user(user, rawrules, enabled_map):
|
||||
ruleslist = []
|
||||
for rawrule in rawrules:
|
||||
rule = dict(rawrule)
|
||||
rule["conditions"] = json.loads(rawrule["conditions"])
|
||||
rule["actions"] = json.loads(rawrule["actions"])
|
||||
ruleslist.append(rule)
|
||||
|
||||
# We're going to be mutating this a lot, so do a deep copy
|
||||
rules = list(list_with_base_rules(ruleslist))
|
||||
|
||||
for i, rule in enumerate(rules):
|
||||
rule_id = rule['rule_id']
|
||||
if rule_id in enabled_map:
|
||||
if rule.get('enabled', True) != bool(enabled_map[rule_id]):
|
||||
# Rules are cached across users.
|
||||
rule = dict(rule)
|
||||
rule['enabled'] = bool(enabled_map[rule_id])
|
||||
rules[i] = rule
|
||||
|
||||
return rules
|
||||
|
||||
|
||||
def format_push_rules_for_user(user, ruleslist):
|
||||
|
|
|
@ -186,7 +186,7 @@ class Mailer(object):
|
|||
|
||||
multipart_msg = MIMEMultipart('alternative')
|
||||
multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text)
|
||||
multipart_msg['From'] = self.hs.config.email_notif_from
|
||||
multipart_msg['From'] = from_string
|
||||
multipart_msg['To'] = email_address
|
||||
multipart_msg['Date'] = email.utils.formatdate()
|
||||
multipart_msg['Message-ID'] = email.utils.make_msgid()
|
||||
|
|
|
@ -131,15 +131,10 @@ class SlavedEventStore(BaseSlavedStore):
|
|||
_get_events_from_cache = DataStore._get_events_from_cache.__func__
|
||||
|
||||
_invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__
|
||||
_parse_events_txn = DataStore._parse_events_txn.__func__
|
||||
_get_events_txn = DataStore._get_events_txn.__func__
|
||||
_get_event_txn = DataStore._get_event_txn.__func__
|
||||
_enqueue_events = DataStore._enqueue_events.__func__
|
||||
_do_fetch = DataStore._do_fetch.__func__
|
||||
_fetch_events_txn = DataStore._fetch_events_txn.__func__
|
||||
_fetch_event_rows = DataStore._fetch_event_rows.__func__
|
||||
_get_event_from_row = DataStore._get_event_from_row.__func__
|
||||
_get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__
|
||||
_get_rooms_for_user_where_membership_is_txn = (
|
||||
DataStore._get_rooms_for_user_where_membership_is_txn.__func__
|
||||
)
|
||||
|
|
|
@ -17,7 +17,7 @@ from twisted.internet import defer
|
|||
from .appservice import (
|
||||
ApplicationServiceStore, ApplicationServiceTransactionStore
|
||||
)
|
||||
from ._base import Cache, LoggingTransaction
|
||||
from ._base import LoggingTransaction
|
||||
from .directory import DirectoryStore
|
||||
from .events import EventsStore
|
||||
from .presence import PresenceStore, UserPresenceState
|
||||
|
@ -45,6 +45,7 @@ from .search import SearchStore
|
|||
from .tags import TagsStore
|
||||
from .account_data import AccountDataStore
|
||||
from .openid import OpenIdStore
|
||||
from .client_ips import ClientIpStore
|
||||
|
||||
from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
|
||||
|
||||
|
@ -58,12 +59,6 @@ import logging
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
|
||||
# times give more inserts into the database even for readonly API hits
|
||||
# 120 seconds == 2 minutes
|
||||
LAST_SEEN_GRANULARITY = 120 * 1000
|
||||
|
||||
|
||||
class DataStore(RoomMemberStore, RoomStore,
|
||||
RegistrationStore, StreamStore, ProfileStore,
|
||||
PresenceStore, TransactionStore,
|
||||
|
@ -84,6 +79,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
AccountDataStore,
|
||||
EventPushActionsStore,
|
||||
OpenIdStore,
|
||||
ClientIpStore,
|
||||
):
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
|
@ -91,11 +87,6 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
self._clock = hs.get_clock()
|
||||
self.database_engine = hs.database_engine
|
||||
|
||||
self.client_ip_last_seen = Cache(
|
||||
name="client_ip_last_seen",
|
||||
keylen=4,
|
||||
)
|
||||
|
||||
self._stream_id_gen = StreamIdGenerator(
|
||||
db_conn, "events", "stream_ordering",
|
||||
extra_tables=[("local_invites", "stream_id")]
|
||||
|
@ -216,39 +207,6 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
|
||||
return [UserPresenceState(**row) for row in rows]
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def insert_client_ip(self, user, access_token, ip, user_agent):
|
||||
now = int(self._clock.time_msec())
|
||||
key = (user.to_string(), access_token, ip)
|
||||
|
||||
try:
|
||||
last_seen = self.client_ip_last_seen.get(key)
|
||||
except KeyError:
|
||||
last_seen = None
|
||||
|
||||
# Rate-limited inserts
|
||||
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
|
||||
defer.returnValue(None)
|
||||
|
||||
self.client_ip_last_seen.prefill(key, now)
|
||||
|
||||
# It's safe not to lock here: a) no unique constraint,
|
||||
# b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
|
||||
yield self._simple_upsert(
|
||||
"user_ips",
|
||||
keyvalues={
|
||||
"user_id": user.to_string(),
|
||||
"access_token": access_token,
|
||||
"ip": ip,
|
||||
"user_agent": user_agent,
|
||||
},
|
||||
values={
|
||||
"last_seen": now,
|
||||
},
|
||||
desc="insert_client_ip",
|
||||
lock=False,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def count_daily_users(self):
|
||||
"""
|
||||
|
|
|
@ -298,6 +298,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
|
|||
dict(txn_id=txn_id, as_id=service.id)
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_oldest_unsent_txn(self, service):
|
||||
"""Get the oldest transaction which has not been sent for this
|
||||
service.
|
||||
|
@ -308,12 +309,23 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
|
|||
A Deferred which resolves to an AppServiceTransaction or
|
||||
None.
|
||||
"""
|
||||
return self.runInteraction(
|
||||
entry = yield self.runInteraction(
|
||||
"get_oldest_unsent_appservice_txn",
|
||||
self._get_oldest_unsent_txn,
|
||||
service
|
||||
)
|
||||
|
||||
if not entry:
|
||||
defer.returnValue(None)
|
||||
|
||||
event_ids = json.loads(entry["event_ids"])
|
||||
|
||||
events = yield self._get_events(event_ids)
|
||||
|
||||
defer.returnValue(AppServiceTransaction(
|
||||
service=service, id=entry["txn_id"], events=events
|
||||
))
|
||||
|
||||
def _get_oldest_unsent_txn(self, txn, service):
|
||||
# Monotonically increasing txn ids, so just select the smallest
|
||||
# one in the txns table (we delete them when they are sent)
|
||||
|
@ -328,12 +340,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
|
|||
|
||||
entry = rows[0]
|
||||
|
||||
event_ids = json.loads(entry["event_ids"])
|
||||
events = self._get_events_txn(txn, event_ids)
|
||||
|
||||
return AppServiceTransaction(
|
||||
service=service, id=entry["txn_id"], events=events
|
||||
)
|
||||
return entry
|
||||
|
||||
def _get_last_txn(self, txn, service_id):
|
||||
txn.execute(
|
||||
|
|
68
synapse/storage/client_ips.py
Normal file
68
synapse/storage/client_ips.py
Normal file
|
@ -0,0 +1,68 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2016 OpenMarket Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ._base import SQLBaseStore, Cache
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
|
||||
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
|
||||
# times give more inserts into the database even for readonly API hits
|
||||
# 120 seconds == 2 minutes
|
||||
LAST_SEEN_GRANULARITY = 120 * 1000
|
||||
|
||||
|
||||
class ClientIpStore(SQLBaseStore):
|
||||
|
||||
def __init__(self, hs):
|
||||
self.client_ip_last_seen = Cache(
|
||||
name="client_ip_last_seen",
|
||||
keylen=4,
|
||||
)
|
||||
|
||||
super(ClientIpStore, self).__init__(hs)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def insert_client_ip(self, user, access_token, ip, user_agent):
|
||||
now = int(self._clock.time_msec())
|
||||
key = (user.to_string(), access_token, ip)
|
||||
|
||||
try:
|
||||
last_seen = self.client_ip_last_seen.get(key)
|
||||
except KeyError:
|
||||
last_seen = None
|
||||
|
||||
# Rate-limited inserts
|
||||
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
|
||||
defer.returnValue(None)
|
||||
|
||||
self.client_ip_last_seen.prefill(key, now)
|
||||
|
||||
# It's safe not to lock here: a) no unique constraint,
|
||||
# b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
|
||||
yield self._simple_upsert(
|
||||
"user_ips",
|
||||
keyvalues={
|
||||
"user_id": user.to_string(),
|
||||
"access_token": access_token,
|
||||
"ip": ip,
|
||||
"user_agent": user_agent,
|
||||
},
|
||||
values={
|
||||
"last_seen": now,
|
||||
},
|
||||
desc="insert_client_ip",
|
||||
lock=False,
|
||||
)
|
|
@ -27,6 +27,9 @@ from synapse.api.constants import EventTypes
|
|||
from canonicaljson import encode_canonical_json
|
||||
from collections import deque, namedtuple
|
||||
|
||||
import synapse
|
||||
import synapse.metrics
|
||||
|
||||
|
||||
import logging
|
||||
import math
|
||||
|
@ -35,6 +38,10 @@ import ujson as json
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||
persist_event_counter = metrics.register_counter("persisted_events")
|
||||
|
||||
|
||||
def encode_json(json_object):
|
||||
if USE_FROZEN_DICTS:
|
||||
# ujson doesn't like frozen_dicts
|
||||
|
@ -139,6 +146,9 @@ class _EventPeristenceQueue(object):
|
|||
pass
|
||||
|
||||
|
||||
_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
|
||||
|
||||
|
||||
class EventsStore(SQLBaseStore):
|
||||
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
|
||||
|
||||
|
@ -258,6 +268,7 @@ class EventsStore(SQLBaseStore):
|
|||
events_and_contexts=chunk,
|
||||
backfilled=backfilled,
|
||||
)
|
||||
persist_event_counter.inc_by(len(chunk))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
|
@ -275,6 +286,7 @@ class EventsStore(SQLBaseStore):
|
|||
current_state=current_state,
|
||||
backfilled=backfilled,
|
||||
)
|
||||
persist_event_counter.inc()
|
||||
except _RollbackButIsFineException:
|
||||
pass
|
||||
|
||||
|
@ -632,6 +644,8 @@ class EventsStore(SQLBaseStore):
|
|||
],
|
||||
)
|
||||
|
||||
self._add_to_cache(txn, events_and_contexts)
|
||||
|
||||
if backfilled:
|
||||
# Backfilled events come before the current state so we don't need
|
||||
# to update the current state table
|
||||
|
@ -673,6 +687,45 @@ class EventsStore(SQLBaseStore):
|
|||
|
||||
return
|
||||
|
||||
def _add_to_cache(self, txn, events_and_contexts):
|
||||
to_prefill = []
|
||||
|
||||
rows = []
|
||||
N = 200
|
||||
for i in range(0, len(events_and_contexts), N):
|
||||
ev_map = {
|
||||
e[0].event_id: e[0]
|
||||
for e in events_and_contexts[i:i + N]
|
||||
}
|
||||
if not ev_map:
|
||||
break
|
||||
|
||||
sql = (
|
||||
"SELECT "
|
||||
" e.event_id as event_id, "
|
||||
" r.redacts as redacts,"
|
||||
" rej.event_id as rejects "
|
||||
" FROM events as e"
|
||||
" LEFT JOIN rejections as rej USING (event_id)"
|
||||
" LEFT JOIN redactions as r ON e.event_id = r.redacts"
|
||||
" WHERE e.event_id IN (%s)"
|
||||
) % (",".join(["?"] * len(ev_map)),)
|
||||
|
||||
txn.execute(sql, ev_map.keys())
|
||||
rows = self.cursor_to_dict(txn)
|
||||
for row in rows:
|
||||
event = ev_map[row["event_id"]]
|
||||
if not row["rejects"] and not row["redacts"]:
|
||||
to_prefill.append(_EventCacheEntry(
|
||||
event=event,
|
||||
redacted_event=None,
|
||||
))
|
||||
|
||||
def prefill():
|
||||
for cache_entry in to_prefill:
|
||||
self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry)
|
||||
txn.call_after(prefill)
|
||||
|
||||
def _store_redaction(self, txn, event):
|
||||
# invalidate the cache for the redacted event
|
||||
txn.call_after(self._invalidate_get_event_cache, event.redacts)
|
||||
|
@ -738,100 +791,65 @@ class EventsStore(SQLBaseStore):
|
|||
event_id_list = event_ids
|
||||
event_ids = set(event_ids)
|
||||
|
||||
event_map = self._get_events_from_cache(
|
||||
event_entry_map = self._get_events_from_cache(
|
||||
event_ids,
|
||||
check_redacted=check_redacted,
|
||||
get_prev_content=get_prev_content,
|
||||
allow_rejected=allow_rejected,
|
||||
)
|
||||
|
||||
missing_events_ids = [e for e in event_ids if e not in event_map]
|
||||
missing_events_ids = [e for e in event_ids if e not in event_entry_map]
|
||||
|
||||
if missing_events_ids:
|
||||
missing_events = yield self._enqueue_events(
|
||||
missing_events_ids,
|
||||
check_redacted=check_redacted,
|
||||
get_prev_content=get_prev_content,
|
||||
allow_rejected=allow_rejected,
|
||||
)
|
||||
|
||||
event_map.update(missing_events)
|
||||
event_entry_map.update(missing_events)
|
||||
|
||||
defer.returnValue([
|
||||
event_map[e_id] for e_id in event_id_list
|
||||
if e_id in event_map and event_map[e_id]
|
||||
])
|
||||
events = []
|
||||
for event_id in event_id_list:
|
||||
entry = event_entry_map.get(event_id, None)
|
||||
if not entry:
|
||||
continue
|
||||
|
||||
def _get_events_txn(self, txn, event_ids, check_redacted=True,
|
||||
get_prev_content=False, allow_rejected=False):
|
||||
if not event_ids:
|
||||
return []
|
||||
if allow_rejected or not entry.event.rejected_reason:
|
||||
if check_redacted and entry.redacted_event:
|
||||
event = entry.redacted_event
|
||||
else:
|
||||
event = entry.event
|
||||
|
||||
event_map = self._get_events_from_cache(
|
||||
event_ids,
|
||||
check_redacted=check_redacted,
|
||||
get_prev_content=get_prev_content,
|
||||
allow_rejected=allow_rejected,
|
||||
events.append(event)
|
||||
|
||||
if get_prev_content:
|
||||
if "replaces_state" in event.unsigned:
|
||||
prev = yield self.get_event(
|
||||
event.unsigned["replaces_state"],
|
||||
get_prev_content=False,
|
||||
allow_none=True,
|
||||
)
|
||||
if prev:
|
||||
event.unsigned = dict(event.unsigned)
|
||||
event.unsigned["prev_content"] = prev.content
|
||||
event.unsigned["prev_sender"] = prev.sender
|
||||
|
||||
missing_events_ids = [e for e in event_ids if e not in event_map]
|
||||
|
||||
if not missing_events_ids:
|
||||
return [
|
||||
event_map[e_id] for e_id in event_ids
|
||||
if e_id in event_map and event_map[e_id]
|
||||
]
|
||||
|
||||
missing_events = self._fetch_events_txn(
|
||||
txn,
|
||||
missing_events_ids,
|
||||
check_redacted=check_redacted,
|
||||
get_prev_content=get_prev_content,
|
||||
allow_rejected=allow_rejected,
|
||||
)
|
||||
|
||||
event_map.update(missing_events)
|
||||
|
||||
return [
|
||||
event_map[e_id] for e_id in event_ids
|
||||
if e_id in event_map and event_map[e_id]
|
||||
]
|
||||
defer.returnValue(events)
|
||||
|
||||
def _invalidate_get_event_cache(self, event_id):
|
||||
for check_redacted in (False, True):
|
||||
for get_prev_content in (False, True):
|
||||
self._get_event_cache.invalidate(
|
||||
(event_id, check_redacted, get_prev_content)
|
||||
)
|
||||
self._get_event_cache.invalidate((event_id,))
|
||||
|
||||
def _get_event_txn(self, txn, event_id, check_redacted=True,
|
||||
get_prev_content=False, allow_rejected=False):
|
||||
|
||||
events = self._get_events_txn(
|
||||
txn, [event_id],
|
||||
check_redacted=check_redacted,
|
||||
get_prev_content=get_prev_content,
|
||||
allow_rejected=allow_rejected,
|
||||
)
|
||||
|
||||
return events[0] if events else None
|
||||
|
||||
def _get_events_from_cache(self, events, check_redacted, get_prev_content,
|
||||
allow_rejected):
|
||||
def _get_events_from_cache(self, events, allow_rejected):
|
||||
event_map = {}
|
||||
|
||||
for event_id in events:
|
||||
try:
|
||||
ret = self._get_event_cache.get(
|
||||
(event_id, check_redacted, get_prev_content,)
|
||||
)
|
||||
ret = self._get_event_cache.get((event_id,), None)
|
||||
if not ret:
|
||||
continue
|
||||
|
||||
if allow_rejected or not ret.rejected_reason:
|
||||
if allow_rejected or not ret.event.rejected_reason:
|
||||
event_map[event_id] = ret
|
||||
else:
|
||||
event_map[event_id] = None
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
return event_map
|
||||
|
||||
|
@ -902,8 +920,7 @@ class EventsStore(SQLBaseStore):
|
|||
reactor.callFromThread(fire, event_list)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _enqueue_events(self, events, check_redacted=True,
|
||||
get_prev_content=False, allow_rejected=False):
|
||||
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
|
||||
"""Fetches events from the database using the _event_fetch_list. This
|
||||
allows batch and bulk fetching of events - it allows us to fetch events
|
||||
without having to create a new transaction for each request for events.
|
||||
|
@ -941,8 +958,6 @@ class EventsStore(SQLBaseStore):
|
|||
[
|
||||
preserve_fn(self._get_event_from_row)(
|
||||
row["internal_metadata"], row["json"], row["redacts"],
|
||||
check_redacted=check_redacted,
|
||||
get_prev_content=get_prev_content,
|
||||
rejected_reason=row["rejects"],
|
||||
)
|
||||
for row in rows
|
||||
|
@ -951,7 +966,7 @@ class EventsStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
defer.returnValue({
|
||||
e.event_id: e
|
||||
e.event.event_id: e
|
||||
for e in res if e
|
||||
})
|
||||
|
||||
|
@ -981,37 +996,8 @@ class EventsStore(SQLBaseStore):
|
|||
|
||||
return rows
|
||||
|
||||
def _fetch_events_txn(self, txn, events, check_redacted=True,
|
||||
get_prev_content=False, allow_rejected=False):
|
||||
if not events:
|
||||
return {}
|
||||
|
||||
rows = self._fetch_event_rows(
|
||||
txn, events,
|
||||
)
|
||||
|
||||
if not allow_rejected:
|
||||
rows[:] = [r for r in rows if not r["rejects"]]
|
||||
|
||||
res = [
|
||||
self._get_event_from_row_txn(
|
||||
txn,
|
||||
row["internal_metadata"], row["json"], row["redacts"],
|
||||
check_redacted=check_redacted,
|
||||
get_prev_content=get_prev_content,
|
||||
rejected_reason=row["rejects"],
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
|
||||
return {
|
||||
r.event_id: r
|
||||
for r in res
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_event_from_row(self, internal_metadata, js, redacted,
|
||||
check_redacted=True, get_prev_content=False,
|
||||
rejected_reason=None):
|
||||
d = json.loads(js)
|
||||
internal_metadata = json.loads(internal_metadata)
|
||||
|
@ -1021,26 +1007,27 @@ class EventsStore(SQLBaseStore):
|
|||
table="rejections",
|
||||
keyvalues={"event_id": rejected_reason},
|
||||
retcol="reason",
|
||||
desc="_get_event_from_row",
|
||||
desc="_get_event_from_row_rejected_reason",
|
||||
)
|
||||
|
||||
ev = FrozenEvent(
|
||||
original_ev = FrozenEvent(
|
||||
d,
|
||||
internal_metadata_dict=internal_metadata,
|
||||
rejected_reason=rejected_reason,
|
||||
)
|
||||
|
||||
if check_redacted and redacted:
|
||||
ev = prune_event(ev)
|
||||
redacted_event = None
|
||||
if redacted:
|
||||
redacted_event = prune_event(original_ev)
|
||||
|
||||
redaction_id = yield self._simple_select_one_onecol(
|
||||
table="redactions",
|
||||
keyvalues={"redacts": ev.event_id},
|
||||
keyvalues={"redacts": redacted_event.event_id},
|
||||
retcol="event_id",
|
||||
desc="_get_event_from_row",
|
||||
desc="_get_event_from_row_redactions",
|
||||
)
|
||||
|
||||
ev.unsigned["redacted_by"] = redaction_id
|
||||
redacted_event.unsigned["redacted_by"] = redaction_id
|
||||
# Get the redaction event.
|
||||
|
||||
because = yield self.get_event(
|
||||
|
@ -1052,86 +1039,16 @@ class EventsStore(SQLBaseStore):
|
|||
if because:
|
||||
# It's fine to do add the event directly, since get_pdu_json
|
||||
# will serialise this field correctly
|
||||
ev.unsigned["redacted_because"] = because
|
||||
redacted_event.unsigned["redacted_because"] = because
|
||||
|
||||
if get_prev_content and "replaces_state" in ev.unsigned:
|
||||
prev = yield self.get_event(
|
||||
ev.unsigned["replaces_state"],
|
||||
get_prev_content=False,
|
||||
allow_none=True,
|
||||
)
|
||||
if prev:
|
||||
ev.unsigned["prev_content"] = prev.content
|
||||
ev.unsigned["prev_sender"] = prev.sender
|
||||
|
||||
self._get_event_cache.prefill(
|
||||
(ev.event_id, check_redacted, get_prev_content), ev
|
||||
cache_entry = _EventCacheEntry(
|
||||
event=original_ev,
|
||||
redacted_event=redacted_event,
|
||||
)
|
||||
|
||||
defer.returnValue(ev)
|
||||
self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
|
||||
|
||||
def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
|
||||
check_redacted=True, get_prev_content=False,
|
||||
rejected_reason=None):
|
||||
d = json.loads(js)
|
||||
internal_metadata = json.loads(internal_metadata)
|
||||
|
||||
if rejected_reason:
|
||||
rejected_reason = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="rejections",
|
||||
keyvalues={"event_id": rejected_reason},
|
||||
retcol="reason",
|
||||
)
|
||||
|
||||
ev = FrozenEvent(
|
||||
d,
|
||||
internal_metadata_dict=internal_metadata,
|
||||
rejected_reason=rejected_reason,
|
||||
)
|
||||
|
||||
if check_redacted and redacted:
|
||||
ev = prune_event(ev)
|
||||
|
||||
redaction_id = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="redactions",
|
||||
keyvalues={"redacts": ev.event_id},
|
||||
retcol="event_id",
|
||||
)
|
||||
|
||||
ev.unsigned["redacted_by"] = redaction_id
|
||||
# Get the redaction event.
|
||||
|
||||
because = self._get_event_txn(
|
||||
txn,
|
||||
redaction_id,
|
||||
check_redacted=False
|
||||
)
|
||||
|
||||
if because:
|
||||
ev.unsigned["redacted_because"] = because
|
||||
|
||||
if get_prev_content and "replaces_state" in ev.unsigned:
|
||||
prev = self._get_event_txn(
|
||||
txn,
|
||||
ev.unsigned["replaces_state"],
|
||||
get_prev_content=False,
|
||||
)
|
||||
if prev:
|
||||
ev.unsigned["prev_content"] = prev.content
|
||||
ev.unsigned["prev_sender"] = prev.sender
|
||||
|
||||
self._get_event_cache.prefill(
|
||||
(ev.event_id, check_redacted, get_prev_content), ev
|
||||
)
|
||||
|
||||
return ev
|
||||
|
||||
def _parse_events_txn(self, txn, rows):
|
||||
event_ids = [r["event_id"] for r in rows]
|
||||
|
||||
return self._get_events_txn(txn, event_ids)
|
||||
defer.returnValue(cache_entry)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def count_daily_messages(self):
|
||||
|
|
|
@ -194,32 +194,44 @@ class RoomStore(SQLBaseStore):
|
|||
|
||||
@cachedInlineCallbacks()
|
||||
def get_room_name_and_aliases(self, room_id):
|
||||
def f(txn):
|
||||
def get_room_name(txn):
|
||||
sql = (
|
||||
"SELECT event_id FROM current_state_events "
|
||||
"SELECT name FROM room_names"
|
||||
" INNER JOIN current_state_events USING (room_id, event_id)"
|
||||
" WHERE room_id = ?"
|
||||
" LIMIT 1"
|
||||
)
|
||||
|
||||
sql += " AND ((type = 'm.room.name' AND state_key = '')"
|
||||
sql += " OR type = 'm.room.aliases')"
|
||||
|
||||
txn.execute(sql, (room_id,))
|
||||
results = self.cursor_to_dict(txn)
|
||||
rows = txn.fetchall()
|
||||
if rows:
|
||||
return rows[0][0]
|
||||
else:
|
||||
return None
|
||||
|
||||
return self._parse_events_txn(txn, results)
|
||||
return [row[0] for row in txn.fetchall()]
|
||||
|
||||
events = yield self.runInteraction("get_room_name_and_aliases", f)
|
||||
def get_room_aliases(txn):
|
||||
sql = (
|
||||
"SELECT content FROM current_state_events"
|
||||
" INNER JOIN events USING (room_id, event_id)"
|
||||
" WHERE room_id = ?"
|
||||
)
|
||||
txn.execute(sql, (room_id,))
|
||||
return [row[0] for row in txn.fetchall()]
|
||||
|
||||
name = yield self.runInteraction("get_room_name", get_room_name)
|
||||
alias_contents = yield self.runInteraction("get_room_aliases", get_room_aliases)
|
||||
|
||||
name = None
|
||||
aliases = []
|
||||
|
||||
for e in events:
|
||||
if e.type == 'm.room.name':
|
||||
if 'name' in e.content:
|
||||
name = e.content['name']
|
||||
elif e.type == 'm.room.aliases':
|
||||
if 'aliases' in e.content:
|
||||
aliases.extend(e.content['aliases'])
|
||||
for c in alias_contents:
|
||||
try:
|
||||
content = json.loads(c)
|
||||
except:
|
||||
continue
|
||||
|
||||
aliases.extend(content.get('aliases', []))
|
||||
|
||||
defer.returnValue((name, aliases))
|
||||
|
||||
|
|
|
@ -243,13 +243,6 @@ class RoomMemberStore(SQLBaseStore):
|
|||
user_ids = yield self.get_users_in_room(room_id)
|
||||
defer.returnValue(set(get_domain_from_id(uid) for uid in user_ids))
|
||||
|
||||
def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None):
|
||||
rows = self._get_members_rows_txn(
|
||||
txn,
|
||||
room_id, membership, user_id,
|
||||
)
|
||||
return [r["event_id"] for r in rows]
|
||||
|
||||
def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):
|
||||
where_clause = "c.room_id = ?"
|
||||
where_values = [room_id]
|
||||
|
|
|
@ -21,6 +21,7 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
|||
|
||||
import logging
|
||||
import re
|
||||
import ujson as json
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -52,7 +53,7 @@ class SearchStore(BackgroundUpdateStore):
|
|||
|
||||
def reindex_search_txn(txn):
|
||||
sql = (
|
||||
"SELECT stream_ordering, event_id FROM events"
|
||||
"SELECT stream_ordering, event_id, room_id, type, content FROM events"
|
||||
" WHERE ? <= stream_ordering AND stream_ordering < ?"
|
||||
" AND (%s)"
|
||||
" ORDER BY stream_ordering DESC"
|
||||
|
@ -61,28 +62,30 @@ class SearchStore(BackgroundUpdateStore):
|
|||
|
||||
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
|
||||
|
||||
rows = txn.fetchall()
|
||||
rows = self.cursor_to_dict(txn)
|
||||
if not rows:
|
||||
return 0
|
||||
|
||||
min_stream_id = rows[-1][0]
|
||||
event_ids = [row[1] for row in rows]
|
||||
|
||||
events = self._get_events_txn(txn, event_ids)
|
||||
min_stream_id = rows[-1]["stream_ordering"]
|
||||
|
||||
event_search_rows = []
|
||||
for event in events:
|
||||
for row in rows:
|
||||
try:
|
||||
event_id = event.event_id
|
||||
room_id = event.room_id
|
||||
content = event.content
|
||||
if event.type == "m.room.message":
|
||||
event_id = row["event_id"]
|
||||
room_id = row["room_id"]
|
||||
etype = row["type"]
|
||||
try:
|
||||
content = json.loads(row["content"])
|
||||
except:
|
||||
continue
|
||||
|
||||
if etype == "m.room.message":
|
||||
key = "content.body"
|
||||
value = content["body"]
|
||||
elif event.type == "m.room.topic":
|
||||
elif etype == "m.room.topic":
|
||||
key = "content.topic"
|
||||
value = content["topic"]
|
||||
elif event.type == "m.room.name":
|
||||
elif etype == "m.room.name":
|
||||
key = "content.name"
|
||||
value = content["name"]
|
||||
except (KeyError, AttributeError):
|
||||
|
|
|
@ -132,17 +132,16 @@ class StreamStore(SQLBaseStore):
|
|||
return True
|
||||
return False
|
||||
|
||||
ret = self._get_events_txn(
|
||||
txn,
|
||||
# apply the filter on the room id list
|
||||
[
|
||||
r["event_id"] for r in rows
|
||||
if app_service_interested(r)
|
||||
],
|
||||
return [r for r in rows if app_service_interested(r)]
|
||||
|
||||
rows = yield self.runInteraction("get_appservice_room_stream", f)
|
||||
|
||||
ret = yield self._get_events(
|
||||
[r["event_id"] for r in rows],
|
||||
get_prev_content=True
|
||||
)
|
||||
|
||||
self._set_before_and_after(ret, rows)
|
||||
self._set_before_and_after(ret, rows, topo_order=from_id is None)
|
||||
|
||||
if rows:
|
||||
key = "s%d" % max(r["stream_ordering"] for r in rows)
|
||||
|
@ -151,10 +150,7 @@ class StreamStore(SQLBaseStore):
|
|||
# get.
|
||||
key = to_key
|
||||
|
||||
return ret, key
|
||||
|
||||
results = yield self.runInteraction("get_appservice_room_stream", f)
|
||||
defer.returnValue(results)
|
||||
defer.returnValue((ret, key))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
|
||||
|
|
|
@ -357,7 +357,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
|
|||
other_events = [Mock(event_id="e5"), Mock(event_id="e6")]
|
||||
|
||||
# we aren't testing store._base stuff here, so mock this out
|
||||
self.store._get_events_txn = Mock(return_value=events)
|
||||
self.store._get_events = Mock(return_value=events)
|
||||
|
||||
yield self._insert_txn(self.as_list[1]["id"], 9, other_events)
|
||||
yield self._insert_txn(service.id, 10, events)
|
||||
|
|
Loading…
Reference in a new issue