Compare commits

...

25 commits

Author SHA1 Message Date
David Robertson a037c2ed43 Run generate-sample-config (Just trying to ensure we don't forget about this PR) 2021-10-14 17:40:04 +01:00
Azrenbeth fd1d3e1fb3 Run linters 2021-09-28 17:40:15 +01:00
Azrenbeth 1a50b18994 Update name to 'synapse_auto_compressor' 2021-09-28 17:16:23 +01:00
Erik Johnston 584c670802 Make the looping call wait until the previous run has finished 2021-09-28 17:10:24 +01:00
Erik Johnston 61c5650058 Fix connecting to postgres when config has no host 2021-09-28 17:01:01 +01:00
Azrenbeth db6cc8f35b Merge remote-tracking branch 'origin/develop' into azren/compressor_integration 2021-09-28 16:15:58 +01:00
Azrenbeth d6b511e669 Tidy up documentation a bit 2021-09-28 13:50:57 +01:00
Azrenbeth 596e13ce74 Better search for state database 2021-09-27 16:35:13 +01:00
Azrenbeth efbc338043 Extract password from db_args 2021-09-27 16:20:06 +01:00
Azrenbeth 71aace8a0d Move from compressing largest rooms to compressing number of chunks 2021-09-27 15:14:36 +01:00
Azrenbeth a5819f7da9 Extract dsn parameters earlier 2021-09-27 15:09:09 +01:00
Azrenbeth 7d49d86b60 Remove accidental s at end of hs.config.worker 2021-09-27 13:05:29 +01:00
Erik Johnston f1c149cb18 Use the effective connection params when connecting to postgres 2021-09-24 14:48:35 +01:00
Erik Johnston 3e5dda1a47 Add a DatabasePool.postgres_connection_info 2021-09-24 14:48:15 +01:00
Azrenbeth 8c0fe97edf Only run compressor if run_background_tasks is true 2021-09-21 13:42:21 +01:00
Azrenbeth da1f804aa0 Run linters 2021-09-20 16:50:17 +01:00
Azrenbeth ffb96458d3 Add TODO in state compressor docs for when auto_compressor docs merged 2021-09-20 16:40:22 +01:00
Azrenbeth 2e3d7f5e15 Sample config follows code style, and config is validated 2021-09-20 16:38:35 +01:00
Azrenbeth ede5974f3d No complaints if compressor config is empty 2021-09-20 16:38:34 +01:00
Azrenbeth b88026654f Added docs for state_compressor 2021-09-20 16:38:34 +01:00
Azrenbeth f84cb2c79d Moved state_compressor setup to util/state_compressor.py 2021-09-20 16:38:34 +01:00
Azrenbeth 5e32e2b12a Added handling in config for when compressor not installed 2021-09-20 16:38:34 +01:00
Azrenbeth 1b76638c2a Added config section for state compressor 2021-09-20 16:38:34 +01:00
Azrenbeth f122710716 run in background 2021-09-20 16:38:34 +01:00
Azrenbeth c0915ee998 call compress_largest_rooms every 1 minute 2021-09-20 16:38:34 +01:00
12 changed files with 327 additions and 1 deletion


@@ -47,6 +47,7 @@
- [Workers](workers.md)
- [Using `synctl` with Workers](synctl_workers.md)
- [Systemd](systemd-with-workers/README.md)
- [State Compressor](state_compressor.md)
- [Administration](usage/administration/README.md)
- [Admin API](usage/administration/admin_api/README.md)
- [Account Validity](admin_api/account_validity.md)


@@ -2648,3 +2648,38 @@ redis:
  # Optional password if configured on the Redis instance
  #
  #password: <secret_password>

## State compressor ##

# The state compressor is an experimental tool which attempts to
# reduce the number of rows in the state_groups_state table
# of postgres databases.
#
# For more information please see
# https://matrix-org.github.io/synapse/latest/state_compressor.html
#
state_compressor:
  # Whether the state compressor should run (defaults to false).
  # Uncomment to enable it. Note: this requires the 'synapse_auto_compressor'
  # library to be installed.
  #
  #enabled: true

  # The (rough) number of state groups to load at one time. Defaults
  # to 500.
  #
  #chunk_size: 1000

  # The number of chunks to compress on each run. Defaults to 100.
  #
  #number_of_chunks: 1

  # The default level sizes for the compressor to use. Defaults to
  # 100,50,25.
  #
  #default_levels: 128,64,32

  # How frequently to run the state compressor. Defaults to 1d.
  #
  #time_between_runs: 1w

docs/state_compressor.md Normal file

@@ -0,0 +1,47 @@
# State compressor

The state compressor is an **experimental** tool that attempts to reduce the number of rows
in the `state_groups_state` table inside a postgres database. Documentation on how it works
can be found in [its GitHub repository](https://github.com/matrix-org/rust-synapse-compress-state).

## Enabling the state compressor

The state compressor requires the python library for the `synapse_auto_compressor` tool to be
installed. This can be done with pip, or by following the instructions in [the `python.md` file in the source
repo](https://github.com/matrix-org/rust-synapse-compress-state/blob/main/docs/python.md).
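
If you want to confirm that the library is visible to the Python environment Synapse runs in, a minimal check is the sketch below (illustrative only; the module name `synapse_auto_compressor` is the one imported by the integration code):

```python
# Run inside the virtualenv/interpreter that Synapse uses.
try:
    import synapse_auto_compressor  # noqa: F401
    print("synapse_auto_compressor is installed")
except ImportError:
    print("synapse_auto_compressor is NOT installed")
```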

The following configuration options are provided:

- `chunk_size`
  The number of state groups to work on at once. All of the entries from
  `state_groups_state` are requested from the database for the state groups that are
  worked on. Therefore, small chunk sizes may be needed on machines with low memory.
  Note: if the compressor fails to find space savings on the chunk as a whole
  (which may well happen in rooms with lots of backfill) then the entire chunk
  is skipped. Defaults to 500.

- `number_of_chunks`
  The compressor will stop once it has finished compressing this many chunks.
  Defaults to 100.

- `default_levels`
  Sizes of each new level in the compression algorithm, as a comma-separated list.
  The first entry in the list is for the lowest, most granular level, with each
  subsequent entry being for the next highest level. The number of entries in the
  list determines the number of levels that will be used. The sum of the sizes of
  the levels affects the performance of fetching state from the database, as it is
  the upper bound on the number of iterations needed to fetch a given set of state
  (see the sketch after this list). Defaults to "100,50,25".

- `time_between_runs`
  How often the state compressor is run. Defaults to once every day.
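
For a rough sense of that bound, here is a back-of-the-envelope sketch (based only on the description above, not on the compressor's internals):

```python
# With the default levels "100,50,25", the sum of the level sizes is the
# upper bound on the number of iterations needed to fetch a given set of state.
levels = [int(x) for x in "100,50,25".split(",")]
print(sum(levels))  # 175 iterations at most
```
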
An example configuration:
```yaml
state_compressor:
enabled: true
chunk_size: 500
number_of_chunks: 50
default_levels: 100,50,25
time_between_runs: 1d
```


@@ -257,3 +257,7 @@ ignore_missing_imports = True

[mypy-ijson.*]
ignore_missing_imports = True

[mypy-psycopg2.*]
ignore_missing_imports = True


@@ -48,6 +48,7 @@ from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.util.caches.lrucache import setup_expire_lru_cache_entries
from synapse.util.daemonize import daemonize_process
from synapse.util.rlimit import change_resource_limit
from synapse.util.state_compressor import setup_state_compressor
from synapse.util.versionstring import get_version_string

if TYPE_CHECKING:
@@ -383,6 +384,9 @@ async def start(hs: "HomeServer"):
    # If we've configured an expiry time for caches, start the background job now.
    setup_expire_lru_cache_entries(hs)

    # Schedule the state compressor to run
    setup_state_compressor(hs)

    # It is now safe to start your Synapse.
    hs.start_listening()
    hs.get_datastore().db_pool.start_profiling()


@@ -32,6 +32,7 @@ from synapse.config import (
    server_notices,
    spam_checker,
    sso,
    state_compressor,
    stats,
    third_party_event_rules,
    tls,
@@ -91,6 +92,7 @@ class RootConfig:
    modules: modules.ModulesConfig
    caches: cache.CacheConfig
    federation: federation.FederationConfig
    statecompressor: state_compressor.StateCompressorConfig
    config_classes: List = ...
    def __init__(self) -> None: ...


@@ -45,6 +45,7 @@ from .server import ServerConfig
from .server_notices import ServerNoticesConfig
from .spam_checker import SpamCheckerConfig
from .sso import SSOConfig
from .state_compressor import StateCompressorConfig
from .stats import StatsConfig
from .third_party_event_rules import ThirdPartyRulesConfig
from .tls import TlsConfig
@@ -97,4 +98,5 @@ class HomeServerConfig(RootConfig):
        WorkerConfig,
        RedisConfig,
        ExperimentalConfig,
        StateCompressorConfig,
    ]


@@ -0,0 +1,96 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.config._base import Config, ConfigError
from synapse.config._util import validate_config
from synapse.python_dependencies import DependencyException, check_requirements


class StateCompressorConfig(Config):
    section = "statecompressor"

    def read_config(self, config, **kwargs):
        compressor_config = config.get("state_compressor") or {}
        validate_config(
            _STATE_COMPRESSOR_SCHEMA, compressor_config, ("state_compressor",)
        )

        self.compressor_enabled = compressor_config.get("enabled") or False

        if not self.compressor_enabled:
            return

        try:
            check_requirements("synapse_auto_compressor")
        except DependencyException as e:
            raise ConfigError(e.message) from e

        self.compressor_chunk_size = compressor_config.get("chunk_size") or 500
        self.compressor_number_of_chunks = (
            compressor_config.get("number_of_chunks") or 100
        )
        self.compressor_default_levels = (
            compressor_config.get("default_levels") or "100,50,25"
        )
        self.time_between_compressor_runs = self.parse_duration(
            compressor_config.get("time_between_runs") or "1d"
        )
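
For reference, duration strings such as `"1d"` or `"1w"` above are parsed into milliseconds by `Config.parse_duration`. A minimal sketch of the usual suffix semantics (an illustration assuming Synapse's standard `s`/`m`/`h`/`d`/`w`/`y` suffixes, not the real implementation):

```python
# Illustrative only: map "1d"-style duration strings to milliseconds.
_UNITS_MS = {
    "s": 1_000,
    "m": 60 * 1_000,
    "h": 60 * 60 * 1_000,
    "d": 24 * 60 * 60 * 1_000,
    "w": 7 * 24 * 60 * 60 * 1_000,
    "y": 365 * 24 * 60 * 60 * 1_000,
}

def parse_duration_ms(value) -> int:
    value = str(value)
    if value[-1].isdigit():
        # Bare numbers are treated as milliseconds.
        return int(value)
    return int(value[:-1]) * _UNITS_MS[value[-1]]

assert parse_duration_ms("1d") == 86_400_000
assert parse_duration_ms("1w") == 7 * 86_400_000
```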

    def generate_config_section(self, **kwargs):
        return """\
        ## State compressor ##

        # The state compressor is an experimental tool which attempts to
        # reduce the number of rows in the state_groups_state table
        # of postgres databases.
        #
        # For more information please see
        # https://matrix-org.github.io/synapse/latest/state_compressor.html
        #
        state_compressor:
          # Whether the state compressor should run (defaults to false).
          # Uncomment to enable it. Note: this requires the 'synapse_auto_compressor'
          # library to be installed.
          #
          #enabled: true

          # The (rough) number of state groups to load at one time. Defaults
          # to 500.
          #
          #chunk_size: 1000

          # The number of chunks to compress on each run. Defaults to 100.
          #
          #number_of_chunks: 1

          # The default level sizes for the compressor to use. Defaults to
          # 100,50,25.
          #
          #default_levels: 128,64,32

          # How frequently to run the state compressor. Defaults to 1d.
          #
          #time_between_runs: 1w
        """


_STATE_COMPRESSOR_SCHEMA = {
    "type": "object",
    "properties": {
        "enabled": {"type": "boolean"},
        "chunk_size": {"type": "number"},
        "number_of_chunks": {"type": "number"},
        "default_levels": {"type": "string"},
        "time_between_runs": {"type": "string"},
    },
}
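
As a quick illustration of what this schema enforces, here is a sketch validating an example settings block with the `jsonschema` library directly (the assumption being that `validate_config` applies standard JSON Schema semantics):

```python
# Illustrative only: check example state_compressor blocks against the schema
# above (copied here so the snippet is self-contained).
import jsonschema

schema = {
    "type": "object",
    "properties": {
        "enabled": {"type": "boolean"},
        "chunk_size": {"type": "number"},
        "number_of_chunks": {"type": "number"},
        "default_levels": {"type": "string"},
        "time_between_runs": {"type": "string"},
    },
}

jsonschema.validate({"enabled": True, "chunk_size": 500}, schema)  # passes

try:
    jsonschema.validate({"enabled": "yes"}, schema)  # wrong type for 'enabled'
except jsonschema.ValidationError as e:
    print("invalid config:", e.message)
```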


@@ -114,6 +114,8 @@ CONDITIONAL_REQUIREMENTS = {
    "redis": ["txredisapi>=1.4.7", "hiredis"],
    # Required to use experimental `caches.track_memory_usage` config option.
    "cache_memory": ["pympler"],
    # Needs to be installed manually in order to use the state compressor.
    "synapse_auto_compressor": ["synapse_auto_compressor"],
}

ALL_OPTIONAL_REQUIREMENTS: Set[str] = set()


@@ -395,6 +395,7 @@ class DatabasePool:
        hs,
        database_config: DatabaseConnectionConfig,
        engine: BaseDatabaseEngine,
        db_conn: LoggingDatabaseConnection,
    ):
        self.hs = hs
        self._clock = hs.get_clock()
@@ -427,6 +428,17 @@
        if isinstance(self.engine, Sqlite3Engine):
            self._unsafe_to_upsert_tables.add("user_directory_search")

        # We store the connection info for later use when using postgres
        # (primarily to allow things like the state auto compressor to connect
        # to the DB).
        self.postgres_connection_info_parameters: Optional[Dict] = None
        if isinstance(self.engine, PostgresEngine):
            self.postgres_connection_info_parameters = dict(db_conn.info.dsn_parameters)
            # The DSN parameters don't always include the host, so explicitly
            # pull the host and user we care about from the connection info object.
            self.postgres_connection_info_parameters["host"] = db_conn.info.host
            self.postgres_connection_info_parameters["user"] = db_conn.info.user

        if self.engine.can_native_upsert:
            # Check ASAP (and then later, every 1s) to see if we have finished
            # background updates of tables that aren't safe to update.
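
For context, a small sketch of the psycopg2 `connection.info` attributes being captured here (assumes a reachable postgres server; the connection parameters are placeholders):

```python
# Illustrative only: the same attributes DatabasePool stores for reuse.
import psycopg2

conn = psycopg2.connect(
    host="localhost", user="synapse_user", password="secret", dbname="synapse"
)
params = dict(conn.info.dsn_parameters)  # e.g. {"dbname": "synapse", "port": "5432", ...}
# dsn_parameters can omit the host/user when defaults were used, so read them
# from the dedicated attributes as well.
params["host"] = conn.info.host
params["user"] = conn.info.user
print(params)
conn.close()
```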


@@ -61,7 +61,7 @@
                    databases=database_config.databases,
                )

                database = DatabasePool(hs, database_config, engine)
                database = DatabasePool(hs, database_config, engine, db_conn)

                if "main" in database_config.databases:
                    logger.info(


@@ -0,0 +1,121 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING

from synapse.logging.context import defer_to_thread
from synapse.metrics.background_process_metrics import run_as_background_process

try:
    import synapse_auto_compressor as state_compressor
except ImportError:
    state_compressor = None

if TYPE_CHECKING:
    from synapse.server import HomeServer

# The postgres connection options that the rust library understands. See
# https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html#keys
_VALID_POSTGRES_CONN_ARGS = {
    "user",
    "password",
    "dbname",
    "options",
    "application_name",
    "sslmode",
    "host",
    "port",
    "connect_timeout",
    "keepalives",
    "keepalives_idle",
    "target_session_attrs",
    "channel_binding",
}


def setup_state_compressor(hs: "HomeServer"):
    """Schedules the state compressor to run regularly"""
    # Return if we cannot import synapse_auto_compressor, or if this worker
    # doesn't run background tasks
    if not state_compressor or not hs.config.worker.run_background_tasks:
        return

    # Return if the compressor isn't enabled
    compressor_config = hs.config.statecompressor
    if not compressor_config.compressor_enabled:
        return

    # Check that the database being used is postgres
    db_config = None
    for conf in hs.config.database.databases:
        if "state" in conf.databases:
            db_config = conf.config
            break

    # One of the databases should have the state tables in it
    assert db_config is not None

    if db_config["name"] != "psycopg2":
        return

    password = db_config.get("args").get("password")

    # Construct the database URL from the database config.
    #
    # This is a bit convoluted as the rust postgres library doesn't have a
    # default host/user, so we use the existing Synapse connections to look up
    # what parameters were used there. On the flip side, psycopg2 has some
    # parameters that rust doesn't understand, so we need to filter them out.
    #
    # Note: we need to connect to the *state* database.
    conn_info_params = (
        hs.get_datastores().state.db_pool.postgres_connection_info_parameters
    )
    assert conn_info_params is not None

    effective_db_args = {}
    for key, value in conn_info_params.items():
        if key in _VALID_POSTGRES_CONN_ARGS:
            effective_db_args[key] = value

    # We cannot extract the password from the connection info, so use the value
    # extracted from Synapse's config
    if password is not None:
        effective_db_args["password"] = password

    # psycopg2 has a handy util function for going from a dictionary to a DSN
    # (postgres connection string).
    from psycopg2.extensions import make_dsn

    db_url = make_dsn("", **effective_db_args)

    # The method to be called periodically
    def run_state_compressor():
        return run_as_background_process(
            desc="State Compressor",
            func=defer_to_thread,
            reactor=hs.get_reactor(),
            f=state_compressor.compress_state_events_table,
            db_url=db_url,
            chunk_size=compressor_config.compressor_chunk_size,
            default_levels=compressor_config.compressor_default_levels,
            number_of_chunks=compressor_config.compressor_number_of_chunks,
        )

    # Call the compressor every `time_between_runs` milliseconds
    clock = hs.get_clock()
    clock.looping_call(
        run_state_compressor,
        compressor_config.time_between_compressor_runs,
    )
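
To illustrate the DSN-building step, `psycopg2.extensions.make_dsn` turns the filtered parameter dictionary into a libpq-style connection string that is handed to the compressor (the values below are placeholders):

```python
# Illustrative only: build a DSN of the kind passed to the compressor.
from psycopg2.extensions import make_dsn

effective_db_args = {
    "host": "localhost",
    "user": "synapse_user",
    "password": "secret",
    "dbname": "synapse",
    "port": "5432",
}
print(make_dsn("", **effective_db_args))
# -> something like "host=localhost user=synapse_user password=secret dbname=synapse port=5432"
```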