diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 5d4db4f892..9ac26d52c6 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -111,7 +111,7 @@ class PusherServer(HomeServer): def remove_pusher(self, app_id, push_key, user_id): http_client = self.get_simple_http_client() - replication_url = self.worker_config.replication_url + replication_url = self.config.worker_replication_url url = replication_url + "/remove_pushers" return http_client.post_json_get_json(url, { "remove": [{ @@ -165,7 +165,7 @@ class PusherServer(HomeServer): def replicate(self): http_client = self.get_simple_http_client() store = self.get_datastore() - replication_url = self.worker_config.replication_url + replication_url = self.config.worker_replication_url pusher_pool = self.get_pusherpool() clock = self.get_clock() @@ -240,11 +240,8 @@ class PusherServer(HomeServer): logger.exception("Error replicating from %r", replication_url) yield sleep(30) - def get_event_cache_size(self): - return self.worker_config.event_cache_size - -def setup(worker_name, config_options): +def start(config_options): try: config = HomeServerConfig.load_config( "Synapse pusher", config_options @@ -253,9 +250,9 @@ def setup(worker_name, config_options): sys.stderr.write("\n" + e.message + "\n") sys.exit(1) - worker_config = config.workers[worker_name] + assert config.worker_app == "synapse.app.pusher" - setup_logging(worker_config.log_config, worker_config.log_file) + setup_logging(config.worker_log_config, config.worker_log_file) if config.start_pushers: sys.stderr.write( @@ -275,20 +272,19 @@ def setup(worker_name, config_options): config.server_name, db_config=config.database_config, config=config, - worker_config=worker_config, version_string=get_version_string("Synapse", synapse), database_engine=database_engine, ) ps.setup() - ps.start_listening(worker_config.listeners) + ps.start_listening(config.worker_listeners) def run(): with LoggingContext("run"): logger.info("Running") - change_resource_limit(worker_config.soft_file_limit) - if worker_config.gc_thresholds: - ps.set_threshold(worker_config.gc_thresholds) + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + ps.set_threshold(config.gc_thresholds) reactor.run() def start(): @@ -298,10 +294,10 @@ def setup(worker_name, config_options): reactor.callWhenRunning(start) - if worker_config.daemonize: + if config.worker_daemonize: daemon = Daemonize( app="synapse-pusher", - pid=worker_config.pid_file, + pid=config.worker_pid_file, action=run, auto_close_fds=False, verbose=True, @@ -314,5 +310,4 @@ def setup(worker_name, config_options): if __name__ == '__main__': with LoggingContext("main"): - worker_name = sys.argv[1] - ps = setup(worker_name, sys.argv[2:]) + ps = start(sys.argv[1:]) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index d10bb2b3f0..160db8637e 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -97,7 +97,7 @@ class SynchrotronPresence(object): self.http_client = hs.get_simple_http_client() self.store = hs.get_datastore() self.user_to_num_current_syncs = {} - self.syncing_users_url = hs.worker_config.replication_url + "/syncing_users" + self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users" self.clock = hs.get_clock() active_presence = self.store.take_presence_startup_info() @@ -305,7 +305,7 @@ class SynchrotronServer(HomeServer): def replicate(self): http_client = self.get_simple_http_client() store = self.get_datastore() - replication_url = self.worker_config.replication_url + replication_url = self.config.worker_replication_url clock = self.get_clock() notifier = self.get_notifier() presence_handler = self.get_presence_handler() @@ -403,11 +403,8 @@ class SynchrotronServer(HomeServer): def build_typing_handler(self): return SynchrotronTyping(self) - def get_event_cache_size(self): - return self.worker_config.event_cache_size - -def start(worker_name, config_options): +def start(config_options): try: config = HomeServerConfig.load_config( "Synapse synchrotron", config_options @@ -416,9 +413,9 @@ def start(worker_name, config_options): sys.stderr.write("\n" + e.message + "\n") sys.exit(1) - worker_config = config.workers[worker_name] + assert config.worker_app == "synapse.app.synchrotron" - setup_logging(worker_config.log_config, worker_config.log_file) + setup_logging(config.worker_log_config, config.worker_log_file) database_engine = create_engine(config.database_config) @@ -426,21 +423,20 @@ def start(worker_name, config_options): config.server_name, db_config=config.database_config, config=config, - worker_config=worker_config, version_string=get_version_string("Synapse", synapse), database_engine=database_engine, application_service_handler=SynchrotronApplicationService(), ) ss.setup() - ss.start_listening(worker_config.listeners) + ss.start_listening(config.worker_listeners) def run(): with LoggingContext("run"): logger.info("Running") - change_resource_limit(worker_config.soft_file_limit) - if worker_config.gc_thresholds: - ss.set_threshold(worker_config.gc_thresholds) + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + ss.set_threshold(config.gc_thresholds) reactor.run() def start(): @@ -449,10 +445,10 @@ def start(worker_name, config_options): reactor.callWhenRunning(start) - if worker_config.daemonize: + if config.worker_daemonize: daemon = Daemonize( app="synapse-synchrotron", - pid=worker_config.pid_file, + pid=config.worker_pid_file, action=run, auto_close_fds=False, verbose=True, @@ -465,5 +461,4 @@ def start(worker_name, config_options): if __name__ == '__main__': with LoggingContext("main"): - worker_name = sys.argv[1] - start(worker_name, sys.argv[2:]) + start(sys.argv[1:]) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 503358e03e..904789d155 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -13,52 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -import collections - from ._base import Config -from .server import read_gc_thresholds - - -Worker = collections.namedtuple("Worker", [ - "app", - "listeners", - "pid_file", - "daemonize", - "log_file", - "log_config", - "event_cache_size", - "soft_file_limit", - "gc_thresholds", - "replication_url", -]) - - -def read_worker_config(config): - return Worker( - app=config["app"], - listeners=config.get("listeners", []), - pid_file=config.get("pid_file"), - daemonize=config["daemonize"], - log_file=config.get("log_file"), - log_config=config.get("log_config"), - event_cache_size=Config.parse_size(config.get("event_cache_size", "10K")), - soft_file_limit=config.get("soft_file_limit"), - gc_thresholds=read_gc_thresholds(config.get("gc_thresholds")), - replication_url=config.get("replication_url"), - ) class WorkerConfig(Config): """The workers are processes run separately to the main synapse process. - Each worker has a name that identifies it within the config file. They have their own pid_file and listener configuration. They use the - replication_url to talk to the main synapse process. They have their - own cache size tuning, gc threshold tuning and open file limits.""" + replication_url to talk to the main synapse process.""" def read_config(self, config): - workers = config.get("workers", {}) - - self.workers = { - worker_name: read_worker_config(worker_config) - for worker_name, worker_config in workers.items() - } + self.worker_app = config.get("worker_app") + self.worker_listeners = config.get("worker_listeners") + self.worker_daemonize = config.get("worker_daemonize") + self.worker_pid_file = config.get("worker_pid_file") + self.worker_log_file = config.get("worker_log_file") + self.worker_log_config = config.get("worker_log_config") + self.worker_replication_url = config.get("worker_replication_url") diff --git a/synapse/server.py b/synapse/server.py index b3c31ece73..dd4b81c658 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -236,9 +236,6 @@ class HomeServer(object): def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) - def get_event_cache_size(self): - return self.config.event_cache_size - def _make_dependency_method(depname): def _get(hs): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2932880cc5..32c6677d47 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -166,7 +166,7 @@ class SQLBaseStore(object): self._get_event_counters = PerformanceCounters() self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, - max_entries=hs.get_event_cache_size()) + max_entries=hs.config.event_cache_size) self._state_group_cache = DictionaryCache( "*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR