Merge pull request #874 from matrix-org/markjh/worker_config

Inline the synchrotron and pusher configs into the main config
This commit is contained in:
Mark Haines 2016-06-17 10:58:52 +01:00 committed by GitHub
commit f1073ad43d
6 changed files with 171 additions and 285 deletions

View file

@ -18,10 +18,8 @@ import synapse
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.config._base import ConfigError from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig from synapse.config.logger import setup_logging
from synapse.config.logger import LoggingConfig from synapse.config.homeserver import HomeServerConfig
from synapse.config.emailconfig import EmailConfig
from synapse.config.key import KeyConfig
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.storage.roommember import RoomMemberStore from synapse.storage.roommember import RoomMemberStore
@ -43,98 +41,12 @@ from twisted.web.resource import Resource
from daemonize import Daemonize from daemonize import Daemonize
import gc
import sys import sys
import logging import logging
logger = logging.getLogger("synapse.app.pusher") logger = logging.getLogger("synapse.app.pusher")
class SlaveConfig(DatabaseConfig):
def read_config(self, config):
self.replication_url = config["replication_url"]
self.server_name = config["server_name"]
self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
"use_insecure_ssl_client_just_for_testing_do_not_use", False
)
self.user_agent_suffix = None
self.start_pushers = True
self.listeners = config["listeners"]
self.soft_file_limit = config.get("soft_file_limit")
self.daemonize = config.get("daemonize")
self.pid_file = self.abspath(config.get("pid_file"))
self.public_baseurl = config["public_baseurl"]
thresholds = config.get("gc_thresholds", None)
if thresholds is not None:
try:
assert len(thresholds) == 3
self.gc_thresholds = (
int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
)
except:
raise ConfigError(
"Value of `gc_threshold` must be a list of three integers if set"
)
else:
self.gc_thresholds = None
# some things used by the auth handler but not actually used in the
# pusher codebase
self.bcrypt_rounds = None
self.ldap_enabled = None
self.ldap_server = None
self.ldap_port = None
self.ldap_tls = None
self.ldap_search_base = None
self.ldap_search_property = None
self.ldap_email_property = None
self.ldap_full_name_property = None
# We would otherwise try to use the registration shared secret as the
# macaroon shared secret if there was no macaroon_shared_secret, but
# that means pulling in RegistrationConfig too. We don't need to be
# backwards compaitible in the pusher codebase so just make people set
# macaroon_shared_secret. We set this to None to prevent it referencing
# an undefined key.
self.registration_shared_secret = None
def default_config(self, server_name, **kwargs):
pid_file = self.abspath("pusher.pid")
return """\
# Slave configuration
# The replication listener on the synapse to talk to.
#replication_url: https://localhost:{replication_port}/_synapse/replication
server_name: "%(server_name)s"
listeners: []
# Enable a ssh manhole listener on the pusher.
# - type: manhole
# port: {manhole_port}
# bind_address: 127.0.0.1
# Enable a metric listener on the pusher.
# - type: http
# port: {metrics_port}
# bind_address: 127.0.0.1
# resources:
# - names: ["metrics"]
# compress: False
report_stats: False
daemonize: False
pid_file: %(pid_file)s
""" % locals()
class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig, KeyConfig):
pass
class PusherSlaveStore( class PusherSlaveStore(
SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
SlavedAccountDataStore SlavedAccountDataStore
@ -199,7 +111,7 @@ class PusherServer(HomeServer):
def remove_pusher(self, app_id, push_key, user_id): def remove_pusher(self, app_id, push_key, user_id):
http_client = self.get_simple_http_client() http_client = self.get_simple_http_client()
replication_url = self.config.replication_url replication_url = self.config.worker_replication_url
url = replication_url + "/remove_pushers" url = replication_url + "/remove_pushers"
return http_client.post_json_get_json(url, { return http_client.post_json_get_json(url, {
"remove": [{ "remove": [{
@ -232,8 +144,8 @@ class PusherServer(HomeServer):
) )
logger.info("Synapse pusher now listening on port %d", port) logger.info("Synapse pusher now listening on port %d", port)
def start_listening(self): def start_listening(self, listeners):
for listener in self.config.listeners: for listener in listeners:
if listener["type"] == "http": if listener["type"] == "http":
self._listen_http(listener) self._listen_http(listener)
elif listener["type"] == "manhole": elif listener["type"] == "manhole":
@ -253,7 +165,7 @@ class PusherServer(HomeServer):
def replicate(self): def replicate(self):
http_client = self.get_simple_http_client() http_client = self.get_simple_http_client()
store = self.get_datastore() store = self.get_datastore()
replication_url = self.config.replication_url replication_url = self.config.worker_replication_url
pusher_pool = self.get_pusherpool() pusher_pool = self.get_pusherpool()
clock = self.get_clock() clock = self.get_clock()
@ -329,19 +241,30 @@ class PusherServer(HomeServer):
yield sleep(30) yield sleep(30)
def setup(config_options): def start(config_options):
try: try:
config = PusherSlaveConfig.load_config( config = HomeServerConfig.load_config(
"Synapse pusher", config_options "Synapse pusher", config_options
) )
except ConfigError as e: except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + e.message + "\n")
sys.exit(1) sys.exit(1)
if not config: assert config.worker_app == "synapse.app.pusher"
sys.exit(0)
config.setup_logging() setup_logging(config.worker_log_config, config.worker_log_file)
if config.start_pushers:
sys.stderr.write(
"\nThe pushers must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
"\nPlease add ``start_pushers: false`` to the main config"
"\n"
)
sys.exit(1)
# Force the pushers to start since they will be disabled in the main config
config.start_pushers = True
database_engine = create_engine(config.database_config) database_engine = create_engine(config.database_config)
@ -354,11 +277,15 @@ def setup(config_options):
) )
ps.setup() ps.setup()
ps.start_listening() ps.start_listening(config.worker_listeners)
change_resource_limit(ps.config.soft_file_limit) def run():
if ps.config.gc_thresholds: with LoggingContext("run"):
gc.set_threshold(*ps.config.gc_thresholds) logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
ps.set_threshold(config.gc_thresholds)
reactor.run()
def start(): def start():
ps.replicate() ps.replicate()
@ -367,30 +294,20 @@ def setup(config_options):
reactor.callWhenRunning(start) reactor.callWhenRunning(start)
return ps if config.worker_daemonize:
daemon = Daemonize(
app="synapse-pusher",
pid=config.worker_pid_file,
action=run,
auto_close_fds=False,
verbose=True,
logger=logger,
)
daemon.start()
else:
run()
if __name__ == '__main__': if __name__ == '__main__':
with LoggingContext("main"): with LoggingContext("main"):
ps = setup(sys.argv[1:]) ps = start(sys.argv[1:])
if ps.config.daemonize:
def run():
with LoggingContext("run"):
change_resource_limit(ps.config.soft_file_limit)
if ps.config.gc_thresholds:
gc.set_threshold(*ps.config.gc_thresholds)
reactor.run()
daemon = Daemonize(
app="synapse-pusher",
pid=ps.config.pid_file,
action=run,
auto_close_fds=False,
verbose=True,
logger=logger,
)
daemon.start()
else:
reactor.run()

View file

@ -18,9 +18,8 @@ import synapse
from synapse.api.constants import EventTypes, PresenceState from synapse.api.constants import EventTypes, PresenceState
from synapse.config._base import ConfigError from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import LoggingConfig from synapse.config.logger import setup_logging
from synapse.config.appservice import AppServiceConfig
from synapse.events import FrozenEvent from synapse.events import FrozenEvent
from synapse.handlers.presence import PresenceHandler from synapse.handlers.presence import PresenceHandler
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
@ -57,76 +56,11 @@ from daemonize import Daemonize
import sys import sys
import logging import logging
import contextlib import contextlib
import gc
import ujson as json import ujson as json
logger = logging.getLogger("synapse.app.synchrotron") logger = logging.getLogger("synapse.app.synchrotron")
class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig):
def read_config(self, config):
self.replication_url = config["replication_url"]
self.server_name = config["server_name"]
self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
"use_insecure_ssl_client_just_for_testing_do_not_use", False
)
self.user_agent_suffix = None
self.listeners = config["listeners"]
self.soft_file_limit = config.get("soft_file_limit")
self.daemonize = config.get("daemonize")
self.pid_file = self.abspath(config.get("pid_file"))
self.macaroon_secret_key = config["macaroon_secret_key"]
self.expire_access_token = config.get("expire_access_token", False)
thresholds = config.get("gc_thresholds", None)
if thresholds is not None:
try:
assert len(thresholds) == 3
self.gc_thresholds = (
int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
)
except:
raise ConfigError(
"Value of `gc_threshold` must be a list of three integers if set"
)
else:
self.gc_thresholds = None
def default_config(self, server_name, **kwargs):
pid_file = self.abspath("synchroton.pid")
return """\
# Slave configuration
# The replication listener on the synapse to talk to.
#replication_url: https://localhost:{replication_port}/_synapse/replication
server_name: "%(server_name)s"
listeners:
# Enable a /sync listener on the synchrontron
#- type: http
# port: {http_port}
# bind_address: ""
# Enable a ssh manhole listener on the synchrotron
# - type: manhole
# port: {manhole_port}
# bind_address: 127.0.0.1
# Enable a metric listener on the synchrotron
# - type: http
# port: {metrics_port}
# bind_address: 127.0.0.1
# resources:
# - names: ["metrics"]
# compress: False
report_stats: False
daemonize: False
pid_file: %(pid_file)s
""" % locals()
class SynchrotronSlavedStore( class SynchrotronSlavedStore(
SlavedPushRuleStore, SlavedPushRuleStore,
SlavedEventStore, SlavedEventStore,
@ -163,7 +97,7 @@ class SynchrotronPresence(object):
self.http_client = hs.get_simple_http_client() self.http_client = hs.get_simple_http_client()
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.user_to_num_current_syncs = {} self.user_to_num_current_syncs = {}
self.syncing_users_url = hs.config.replication_url + "/syncing_users" self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
self.clock = hs.get_clock() self.clock = hs.get_clock()
active_presence = self.store.take_presence_startup_info() active_presence = self.store.take_presence_startup_info()
@ -350,8 +284,8 @@ class SynchrotronServer(HomeServer):
) )
logger.info("Synapse synchrotron now listening on port %d", port) logger.info("Synapse synchrotron now listening on port %d", port)
def start_listening(self): def start_listening(self, listeners):
for listener in self.config.listeners: for listener in listeners:
if listener["type"] == "http": if listener["type"] == "http":
self._listen_http(listener) self._listen_http(listener)
elif listener["type"] == "manhole": elif listener["type"] == "manhole":
@ -371,7 +305,7 @@ class SynchrotronServer(HomeServer):
def replicate(self): def replicate(self):
http_client = self.get_simple_http_client() http_client = self.get_simple_http_client()
store = self.get_datastore() store = self.get_datastore()
replication_url = self.config.replication_url replication_url = self.config.worker_replication_url
clock = self.get_clock() clock = self.get_clock()
notifier = self.get_notifier() notifier = self.get_notifier()
presence_handler = self.get_presence_handler() presence_handler = self.get_presence_handler()
@ -470,19 +404,18 @@ class SynchrotronServer(HomeServer):
return SynchrotronTyping(self) return SynchrotronTyping(self)
def setup(config_options): def start(config_options):
try: try:
config = SynchrotronConfig.load_config( config = HomeServerConfig.load_config(
"Synapse synchrotron", config_options "Synapse synchrotron", config_options
) )
except ConfigError as e: except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + e.message + "\n")
sys.exit(1) sys.exit(1)
if not config: assert config.worker_app == "synapse.app.synchrotron"
sys.exit(0)
config.setup_logging() setup_logging(config.worker_log_config, config.worker_log_file)
database_engine = create_engine(config.database_config) database_engine = create_engine(config.database_config)
@ -496,11 +429,15 @@ def setup(config_options):
) )
ss.setup() ss.setup()
ss.start_listening() ss.start_listening(config.worker_listeners)
change_resource_limit(ss.config.soft_file_limit) def run():
if ss.config.gc_thresholds: with LoggingContext("run"):
ss.set_threshold(*ss.config.gc_thresholds) logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
ss.set_threshold(config.gc_thresholds)
reactor.run()
def start(): def start():
ss.get_datastore().start_profiling() ss.get_datastore().start_profiling()
@ -508,30 +445,20 @@ def setup(config_options):
reactor.callWhenRunning(start) reactor.callWhenRunning(start)
return ss if config.worker_daemonize:
daemon = Daemonize(
app="synapse-synchrotron",
pid=config.worker_pid_file,
action=run,
auto_close_fds=False,
verbose=True,
logger=logger,
)
daemon.start()
else:
run()
if __name__ == '__main__': if __name__ == '__main__':
with LoggingContext("main"): with LoggingContext("main"):
ss = setup(sys.argv[1:]) start(sys.argv[1:])
if ss.config.daemonize:
def run():
with LoggingContext("run"):
change_resource_limit(ss.config.soft_file_limit)
if ss.config.gc_thresholds:
gc.set_threshold(*ss.config.gc_thresholds)
reactor.run()
daemon = Daemonize(
app="synapse-synchrotron",
pid=ss.config.pid_file,
action=run,
auto_close_fds=False,
verbose=True,
logger=logger,
)
daemon.start()
else:
reactor.run()

View file

@ -32,13 +32,15 @@ from .password import PasswordConfig
from .jwt import JWTConfig from .jwt import JWTConfig
from .ldap import LDAPConfig from .ldap import LDAPConfig
from .emailconfig import EmailConfig from .emailconfig import EmailConfig
from .workers import WorkerConfig
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig, VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
AppServiceConfig, KeyConfig, SAML2Config, CasConfig, AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,): JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,
WorkerConfig,):
pass pass

View file

@ -126,54 +126,58 @@ class LoggingConfig(Config):
) )
def setup_logging(self): def setup_logging(self):
log_format = ( setup_logging(self.log_config, self.log_file, self.verbosity)
"%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
" - %(message)s"
)
if self.log_config is None:
level = logging.INFO
level_for_storage = logging.INFO
if self.verbosity:
level = logging.DEBUG
if self.verbosity > 1:
level_for_storage = logging.DEBUG
# FIXME: we need a logging.WARN for a -q quiet option def setup_logging(log_config=None, log_file=None, verbosity=None):
logger = logging.getLogger('') log_format = (
logger.setLevel(level) "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
" - %(message)s"
)
if log_config is None:
logging.getLogger('synapse.storage').setLevel(level_for_storage) level = logging.INFO
level_for_storage = logging.INFO
if verbosity:
level = logging.DEBUG
if verbosity > 1:
level_for_storage = logging.DEBUG
formatter = logging.Formatter(log_format) # FIXME: we need a logging.WARN for a -q quiet option
if self.log_file: logger = logging.getLogger('')
# TODO: Customisable file size / backup count logger.setLevel(level)
handler = logging.handlers.RotatingFileHandler(
self.log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
)
def sighup(signum, stack): logging.getLogger('synapse.storage').setLevel(level_for_storage)
logger.info("Closing log file due to SIGHUP")
handler.doRollover()
logger.info("Opened new log file due to SIGHUP")
# TODO(paul): obviously this is a terrible mechanism for formatter = logging.Formatter(log_format)
# stealing SIGHUP, because it means no other part of synapse if log_file:
# can use it instead. If we want to catch SIGHUP anywhere # TODO: Customisable file size / backup count
# else as well, I'd suggest we find a nicer way to broadcast handler = logging.handlers.RotatingFileHandler(
# it around. log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
if getattr(signal, "SIGHUP"): )
signal.signal(signal.SIGHUP, sighup)
else:
handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.addFilter(LoggingContextFilter(request="")) def sighup(signum, stack):
logger.info("Closing log file due to SIGHUP")
handler.doRollover()
logger.info("Opened new log file due to SIGHUP")
logger.addHandler(handler) # TODO(paul): obviously this is a terrible mechanism for
# stealing SIGHUP, because it means no other part of synapse
# can use it instead. If we want to catch SIGHUP anywhere
# else as well, I'd suggest we find a nicer way to broadcast
# it around.
if getattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, sighup)
else: else:
with open(self.log_config, 'r') as f: handler = logging.StreamHandler()
logging.config.dictConfig(yaml.load(f)) handler.setFormatter(formatter)
observer = PythonLoggingObserver() handler.addFilter(LoggingContextFilter(request=""))
observer.start()
logger.addHandler(handler)
else:
with open(log_config, 'r') as f:
logging.config.dictConfig(yaml.load(f))
observer = PythonLoggingObserver()
observer.start()

View file

@ -38,19 +38,7 @@ class ServerConfig(Config):
self.listeners = config.get("listeners", []) self.listeners = config.get("listeners", [])
thresholds = config.get("gc_thresholds", None) self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None))
if thresholds is not None:
try:
assert len(thresholds) == 3
self.gc_thresholds = (
int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
)
except:
raise ConfigError(
"Value of `gc_threshold` must be a list of three integers if set"
)
else:
self.gc_thresholds = None
bind_port = config.get("bind_port") bind_port = config.get("bind_port")
if bind_port: if bind_port:
@ -264,3 +252,20 @@ class ServerConfig(Config):
type=int, type=int,
help="Turn on the twisted telnet manhole" help="Turn on the twisted telnet manhole"
" service on the given port.") " service on the given port.")
def read_gc_thresholds(thresholds):
"""Reads the three integer thresholds for garbage collection. Ensures that
the thresholds are integers if thresholds are supplied.
"""
if thresholds is None:
return None
try:
assert len(thresholds) == 3
return (
int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
)
except:
raise ConfigError(
"Value of `gc_threshold` must be a list of three integers if set"
)

31
synapse/config/workers.py Normal file
View file

@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
# Copyright 2016 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import Config
class WorkerConfig(Config):
"""The workers are processes run separately to the main synapse process.
They have their own pid_file and listener configuration. They use the
replication_url to talk to the main synapse process."""
def read_config(self, config):
self.worker_app = config.get("worker_app")
self.worker_listeners = config.get("worker_listeners")
self.worker_daemonize = config.get("worker_daemonize")
self.worker_pid_file = config.get("worker_pid_file")
self.worker_log_file = config.get("worker_log_file")
self.worker_log_config = config.get("worker_log_config")
self.worker_replication_url = config.get("worker_replication_url")