Compare commits
25 commits
develop
...
azren/comp
Author | SHA1 | Date | |
---|---|---|---|
a037c2ed43 | |||
fd1d3e1fb3 | |||
1a50b18994 | |||
584c670802 | |||
61c5650058 | |||
db6cc8f35b | |||
d6b511e669 | |||
596e13ce74 | |||
efbc338043 | |||
71aace8a0d | |||
a5819f7da9 | |||
7d49d86b60 | |||
f1c149cb18 | |||
3e5dda1a47 | |||
8c0fe97edf | |||
da1f804aa0 | |||
ffb96458d3 | |||
2e3d7f5e15 | |||
ede5974f3d | |||
b88026654f | |||
f84cb2c79d | |||
5e32e2b12a | |||
1b76638c2a | |||
f122710716 | |||
c0915ee998 |
|
@ -47,6 +47,7 @@
|
|||
- [Workers](workers.md)
|
||||
- [Using `synctl` with Workers](synctl_workers.md)
|
||||
- [Systemd](systemd-with-workers/README.md)
|
||||
- [State Compressor](state_compressor.md)
|
||||
- [Administration](usage/administration/README.md)
|
||||
- [Admin API](usage/administration/admin_api/README.md)
|
||||
- [Account Validity](admin_api/account_validity.md)
|
||||
|
|
|
@ -2648,3 +2648,38 @@ redis:
|
|||
# Optional password if configured on the Redis instance
|
||||
#
|
||||
#password: <secret_password>
|
||||
|
||||
|
||||
## State compressor ##
|
||||
|
||||
# The state compressor is an experimental tool which attempts to
|
||||
# reduce the number of rows in the state_groups_state table
|
||||
# of postgres databases.
|
||||
#
|
||||
# For more information please see
|
||||
# https://matrix-org.github.io/synapse/latest/state_compressor.html
|
||||
#
|
||||
state_compressor:
|
||||
# Whether the state compressor should run (defaults to false)
|
||||
# Uncomment to enable it - Note, this requires the 'auto-compressor'
|
||||
# library to be installed
|
||||
#
|
||||
#enabled: true
|
||||
|
||||
# The (rough) number of state groups to load at one time. Defaults
|
||||
# to 500.
|
||||
#
|
||||
#chunk_size: 1000
|
||||
|
||||
# The number of chunks to compress on each run. Defaults to 100.
|
||||
#
|
||||
#number_of_chunks: 1
|
||||
|
||||
# The default level sizes for the compressor to use. Defaults to
|
||||
# 100,50,25.
|
||||
#
|
||||
#default_levels: 128,64,32
|
||||
|
||||
# How frequently to run the state compressor. Defaults to 1d
|
||||
#
|
||||
#time_between_runs: 1w
|
||||
|
|
47
docs/state_compressor.md
Normal file
47
docs/state_compressor.md
Normal file
|
@ -0,0 +1,47 @@
|
|||
# State compressor
|
||||
|
||||
The state compressor is an **experimental** tool that attempts to reduce the number of rows
|
||||
in the `state_groups_state` table inside of a postgres database. Documentation on how it works
|
||||
can be found on [its github repository](https://github.com/matrix-org/rust-synapse-compress-state).
|
||||
|
||||
## Enabling the state compressor
|
||||
|
||||
The state compressor requires the python library for the `synapse_auto_compressor` tool to be
|
||||
installed. This can be done with pip, or by following the instructions in [the `python.md` file in the source
|
||||
repo](https://github.com/matrix-org/rust-synapse-compress-state/blob/main/docs/python.md).
|
||||
|
||||
The following configuration options are provided:
|
||||
|
||||
- `chunk_size`
|
||||
The number of state groups to work on at once. All of the entries from
|
||||
`state_groups_state` are requested from the database for state groups that are
|
||||
worked on. Therefore small chunk sizes may be needed on machines with low memory.
|
||||
Note: if the compressor fails to find space savings on the chunk as a whole
|
||||
(which may well happen in rooms with lots of backfill in) then the entire chunk
|
||||
is skipped. This defaults to 500
|
||||
|
||||
- `number_of_chunks`
|
||||
The compressor will stop once it has finished compressing this many chunks. Defaults to 100
|
||||
|
||||
- `default_levels`
|
||||
Sizes of each new level in the compression algorithm, as a comma separated list.
|
||||
The first entry in the list is for the lowest, most granular level, with each
|
||||
subsequent entry being for the next highest level. The number of entries in the
|
||||
list determines the number of levels that will be used. The sum of the sizes of
|
||||
the levels affects the performance of fetching the state from the database, as the
|
||||
sum of the sizes is the upper bound on number of iterations needed to fetch a
|
||||
given set of state. This defaults to "100,50,25"
|
||||
|
||||
- `time_between_runs`
|
||||
This controls how often the state compressor is run. This defaults to once every
|
||||
day.
|
||||
|
||||
An example configuration:
|
||||
```yaml
|
||||
state_compressor:
|
||||
enabled: true
|
||||
chunk_size: 500
|
||||
number_of_chunks: 50
|
||||
default_levels: 100,50,25
|
||||
time_between_runs: 1d
|
||||
```
|
4
mypy.ini
4
mypy.ini
|
@ -257,3 +257,7 @@ ignore_missing_imports = True
|
|||
|
||||
[mypy-ijson.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
|
||||
[mypy-psycopg2.*]
|
||||
ignore_missing_imports = True
|
||||
|
|
|
@ -48,6 +48,7 @@ from synapse.metrics.jemalloc import setup_jemalloc_stats
|
|||
from synapse.util.caches.lrucache import setup_expire_lru_cache_entries
|
||||
from synapse.util.daemonize import daemonize_process
|
||||
from synapse.util.rlimit import change_resource_limit
|
||||
from synapse.util.state_compressor import setup_state_compressor
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
@ -383,6 +384,9 @@ async def start(hs: "HomeServer"):
|
|||
# If we've configured an expiry time for caches, start the background job now.
|
||||
setup_expire_lru_cache_entries(hs)
|
||||
|
||||
# Schedule the state compressor to run
|
||||
setup_state_compressor(hs)
|
||||
|
||||
# It is now safe to start your Synapse.
|
||||
hs.start_listening()
|
||||
hs.get_datastore().db_pool.start_profiling()
|
||||
|
|
|
@ -32,6 +32,7 @@ from synapse.config import (
|
|||
server_notices,
|
||||
spam_checker,
|
||||
sso,
|
||||
state_compressor,
|
||||
stats,
|
||||
third_party_event_rules,
|
||||
tls,
|
||||
|
@ -91,6 +92,7 @@ class RootConfig:
|
|||
modules: modules.ModulesConfig
|
||||
caches: cache.CacheConfig
|
||||
federation: federation.FederationConfig
|
||||
statecompressor: state_compressor.StateCompressorConfig
|
||||
|
||||
config_classes: List = ...
|
||||
def __init__(self) -> None: ...
|
||||
|
|
|
@ -45,6 +45,7 @@ from .server import ServerConfig
|
|||
from .server_notices import ServerNoticesConfig
|
||||
from .spam_checker import SpamCheckerConfig
|
||||
from .sso import SSOConfig
|
||||
from .state_compressor import StateCompressorConfig
|
||||
from .stats import StatsConfig
|
||||
from .third_party_event_rules import ThirdPartyRulesConfig
|
||||
from .tls import TlsConfig
|
||||
|
@ -97,4 +98,5 @@ class HomeServerConfig(RootConfig):
|
|||
WorkerConfig,
|
||||
RedisConfig,
|
||||
ExperimentalConfig,
|
||||
StateCompressorConfig,
|
||||
]
|
||||
|
|
96
synapse/config/state_compressor.py
Normal file
96
synapse/config/state_compressor.py
Normal file
|
@ -0,0 +1,96 @@
|
|||
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.config._base import Config, ConfigError
|
||||
from synapse.config._util import validate_config
|
||||
from synapse.python_dependencies import DependencyException, check_requirements
|
||||
|
||||
|
||||
class StateCompressorConfig(Config):
    """Config for the (experimental) state compressor.

    Reads the `state_compressor` section of the homeserver config. The
    tuning attributes (`compressor_chunk_size` etc.) are only set when the
    compressor is enabled.
    """

    section = "statecompressor"

    def read_config(self, config, **kwargs):
        """Validate and parse the `state_compressor` config section.

        Args:
            config: the parsed homeserver config dict.

        Raises:
            ConfigError: if the compressor is enabled but the
                `synapse_auto_compressor` requirement is not satisfied.
        """
        compressor_config = config.get("state_compressor") or {}
        validate_config(
            _STATE_COMPRESSOR_SCHEMA, compressor_config, ("state_compressor",)
        )
        self.compressor_enabled = compressor_config.get("enabled") or False

        if not self.compressor_enabled:
            return

        try:
            check_requirements("synapse_auto_compressor")
        except DependencyException as e:
            # Raise an *instance* carrying the dependency message: a bare
            # `raise ConfigError from e` would instantiate ConfigError with
            # no arguments and lose the explanation of what is missing.
            # NOTE(review): relies on DependencyException exposing `.message`,
            # as used by the other config modules - confirm.
            raise ConfigError(e.message) from e

        # `or` means a missing, null or zero value falls back to the default.
        self.compressor_chunk_size = compressor_config.get("chunk_size") or 500
        self.compressor_number_of_chunks = (
            compressor_config.get("number_of_chunks") or 100
        )
        self.compressor_default_levels = (
            compressor_config.get("default_levels") or "100,50,25"
        )
        self.time_between_compressor_runs = self.parse_duration(
            compressor_config.get("time_between_runs") or "1d"
        )

    def generate_config_section(self, **kwargs):
        """Return the commented sample `state_compressor` config section."""
        return """\
        ## State compressor ##

        # The state compressor is an experimental tool which attempts to
        # reduce the number of rows in the state_groups_state table
        # of postgres databases.
        #
        # For more information please see
        # https://matrix-org.github.io/synapse/latest/state_compressor.html
        #
        state_compressor:
          # Whether the state compressor should run (defaults to false)
          # Uncomment to enable it - Note, this requires the 'auto-compressor'
          # library to be installed
          #
          #enabled: true

          # The (rough) number of state groups to load at one time. Defaults
          # to 500.
          #
          #chunk_size: 1000

          # The number of chunks to compress on each run. Defaults to 100.
          #
          #number_of_chunks: 1

          # The default level sizes for the compressor to use. Defaults to
          # 100,50,25.
          #
          #default_levels: 128,64,32

          # How frequently to run the state compressor. Defaults to 1d
          #
          #time_between_runs: 1w
        """
|
||||
|
||||
|
||||
_STATE_COMPRESSOR_SCHEMA = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"enabled": {"type": "boolean"},
|
||||
"chunk_size": {"type": "number"},
|
||||
"number_of_chunks": {"type": "number"},
|
||||
"default_levels": {"type": "string"},
|
||||
"time_between_runs": {"type": "string"},
|
||||
},
|
||||
}
|
|
@ -114,6 +114,8 @@ CONDITIONAL_REQUIREMENTS = {
|
|||
"redis": ["txredisapi>=1.4.7", "hiredis"],
|
||||
# Required to use experimental `caches.track_memory_usage` config option.
|
||||
"cache_memory": ["pympler"],
|
||||
# Needs to be manually installed to use
|
||||
"synapse_auto_compressor": ["synapse_auto_compressor"],
|
||||
}
|
||||
|
||||
ALL_OPTIONAL_REQUIREMENTS: Set[str] = set()
|
||||
|
|
|
@ -395,6 +395,7 @@ class DatabasePool:
|
|||
hs,
|
||||
database_config: DatabaseConnectionConfig,
|
||||
engine: BaseDatabaseEngine,
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
):
|
||||
self.hs = hs
|
||||
self._clock = hs.get_clock()
|
||||
|
@ -427,6 +428,17 @@ class DatabasePool:
|
|||
if isinstance(self.engine, Sqlite3Engine):
|
||||
self._unsafe_to_upsert_tables.add("user_directory_search")
|
||||
|
||||
# We store the connection info for later use when using postgres
|
||||
# (primarily to allow things like the state auto compressor to connect
|
||||
# to the DB).
|
||||
self.postgres_connection_info_parameters: Optional[Dict] = None
|
||||
if isinstance(self.engine, PostgresEngine):
|
||||
self.postgres_connection_info_parameters = dict(db_conn.info.dsn_parameters)
|
||||
# For some reason it doesn't always include the host, so explicitly
|
||||
# include the things we care about from the info object
|
||||
self.postgres_connection_info_parameters["host"] = db_conn.info.host
|
||||
self.postgres_connection_info_parameters["user"] = db_conn.info.user
|
||||
|
||||
if self.engine.can_native_upsert:
|
||||
# Check ASAP (and then later, every 1s) to see if we have finished
|
||||
# background updates of tables that aren't safe to update.
|
||||
|
|
|
@ -61,7 +61,7 @@ class Databases:
|
|||
databases=database_config.databases,
|
||||
)
|
||||
|
||||
database = DatabasePool(hs, database_config, engine)
|
||||
database = DatabasePool(hs, database_config, engine, db_conn)
|
||||
|
||||
if "main" in database_config.databases:
|
||||
logger.info(
|
||||
|
|
121
synapse/util/state_compressor.py
Normal file
121
synapse/util/state_compressor.py
Normal file
|
@ -0,0 +1,121 @@
|
|||
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from synapse.logging.context import defer_to_thread
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
|
||||
try:
|
||||
import synapse_auto_compressor as state_compressor
|
||||
except ImportError:
|
||||
state_compressor = None
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
# The postgres connection options that the rust library understands. See
|
||||
# https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html#keys
|
||||
_VALID_POSTGRES_CONN_ARGS = {
|
||||
"user",
|
||||
"password",
|
||||
"dbname",
|
||||
"options",
|
||||
"application_name",
|
||||
"sslmode",
|
||||
"host",
|
||||
"port",
|
||||
"connect_timeout",
|
||||
"keepalives",
|
||||
"keepalives_idle",
|
||||
"target_session_attrs",
|
||||
"channel_binding",
|
||||
}
|
||||
|
||||
|
||||
def setup_state_compressor(hs: "HomeServer"):
    """Schedules the state compressor to run regularly.

    Returns without scheduling anything if `synapse_auto_compressor` could
    not be imported, if this process does not run background tasks, if the
    compressor is not enabled in the config, or if the database holding the
    state tables is not using the `psycopg2` engine.

    Args:
        hs: the homeserver providing the config, datastores, reactor and clock.
    """

    # Return if cannot import synapse_auto_compressor, or if this process
    # isn't the one running background tasks.
    if not state_compressor or not hs.config.worker.run_background_tasks:
        return

    # Return if compressor isn't enabled
    compressor_config = hs.config.statecompressor
    if not compressor_config.compressor_enabled:
        return

    # Find the config of the database that holds the state tables.
    db_config = None
    for conf in hs.config.database.databases:
        if "state" in conf.databases:
            db_config = conf.config
            break

    # One of the databases should have the state tables in
    assert db_config is not None

    # Check that the database being used is postgres
    if db_config["name"] != "psycopg2":
        return

    # Tolerate a missing or null `args` section rather than raising
    # AttributeError on `None.get(...)`.
    password = (db_config.get("args") or {}).get("password")

    # Construct the database URL from the database config.
    #
    # This is a bit convoluted as the rust postgres library doesn't have a
    # default host/user, so we use the existing Synapse connections to look up
    # what parameters were used there. On the flip side, psycopg2 has some
    # parameters that rust doesn't understand, so we need to filter them out.
    #
    # Note: we need to connect to the *state* database.
    conn_info_params = (
        hs.get_datastores().state.db_pool.postgres_connection_info_parameters
    )
    assert conn_info_params is not None

    effective_db_args = {
        key: value
        for key, value in conn_info_params.items()
        if key in _VALID_POSTGRES_CONN_ARGS
    }

    # We cannot extract the password from the connection info, so use the
    # value extracted from synapse's config
    if password is not None:
        effective_db_args["password"] = password

    # psycopg2 has a handy util function for going from a dictionary to a DSN
    # (postgres connection string.)
    from psycopg2.extensions import make_dsn

    db_url = make_dsn("", **effective_db_args)

    # The method to be called periodically. The compressor is handed to
    # `defer_to_thread`, wrapped as a background process.
    def run_state_compressor():
        return run_as_background_process(
            desc="State Compressor",
            func=defer_to_thread,
            reactor=hs.get_reactor(),
            f=state_compressor.compress_state_events_table,
            db_url=db_url,
            chunk_size=compressor_config.compressor_chunk_size,
            default_levels=compressor_config.compressor_default_levels,
            number_of_chunks=compressor_config.compressor_number_of_chunks,
        )

    # Call the compressor every `time_between_runs` milliseconds
    clock = hs.get_clock()
    clock.looping_call(
        run_state_compressor,
        compressor_config.time_between_compressor_runs,
    )
|
Loading…
Reference in a new issue