Mirror of https://mau.dev/maunium/synapse.git (synced 2024-12-14 16:33:53 +01:00)

Allow background tasks to be run on a separate worker. (#8369)

parent 462e681c79
commit 62894673e6

19 changed files with 537 additions and 449 deletions
changelog.d/8369.feature (new file, 1 line)

@@ -0,0 +1 @@
+Allow running background tasks in a separate worker process.
@@ -2504,6 +2504,11 @@ opentracing:
 #  events: worker1
 #  typing: worker1
 
+# The worker that is used to run background tasks (e.g. cleaning up expired
+# data). If not provided this defaults to the main process.
+#
+#run_background_tasks_on: worker1
+
 # Configuration for Redis when using workers. This *must* be enabled when
 # using workers (unless using old style direct TCP configuration).
@@ -319,6 +319,23 @@ stream_writers:
     events: event_persister1
 ```
 
+#### Background tasks
+
+There is also *experimental* support for moving background tasks to a separate
+worker. Background tasks are run periodically or started via replication. Exactly
+which tasks are configured to run depends on your Synapse configuration (e.g. if
+stats is enabled).
+
+To enable this, the worker must have a `worker_name` and can be configured to run
+background tasks. For example, to move background tasks to a dedicated worker,
+the shared configuration would include:
+
+```yaml
+run_background_tasks_on: background_worker
+```
+
+You might also wish to investigate the `update_user_directory` and
+`media_instance_running_background_jobs` settings.
+
 ### `synapse.app.pusher`
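The shared-config snippet above only nominates the worker. For completeness, a minimal sketch of what the dedicated worker's own config file might look like; only `worker_name` (matching `run_background_tasks_on`) is required by this feature, and the listener/replication keys below are illustrative assumptions following the generic-worker conventions described earlier in this document, not part of this commit:

```yaml
# background_worker.yaml -- hypothetical worker config, a sketch only.
worker_app: synapse.app.generic_worker
worker_name: background_worker

# Connect to the main process for replication (placeholder values).
worker_replication_host: 127.0.0.1
worker_replication_http_port: 9093

worker_log_config: /etc/matrix-synapse/background-worker-log.yaml
```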
@@ -28,6 +28,7 @@ from twisted.protocols.tls import TLSMemoryBIOFactory
 
 import synapse
 from synapse.app import check_bind_error
+from synapse.app.phone_stats_home import start_phone_stats_home
 from synapse.config.server import ListenerConfig
 from synapse.crypto import context_factory
 from synapse.logging.context import PreserveLoggingContext
@@ -274,6 +275,11 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
     setup_sentry(hs)
     setup_sdnotify(hs)
 
+    # If background tasks are running on the main process, start collecting the
+    # phone home stats.
+    if hs.config.run_background_tasks:
+        start_phone_stats_home(hs)
+
     # We now freeze all allocated objects in the hopes that (almost)
     # everything currently allocated are things that will be used for the
     # rest of time. Doing so means less work each GC (hopefully).
@@ -208,6 +208,7 @@ def start(config_options):
 
     # Explicitly disable background processes
     config.update_user_directory = False
+    config.run_background_tasks = False
    config.start_pushers = False
    config.send_federation = False
@@ -128,11 +128,13 @@ from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.server import HomeServer, cache_in_self
 from synapse.storage.databases.main.censor_events import CensorEventsStore
 from synapse.storage.databases.main.media_repository import MediaRepositoryStore
+from synapse.storage.databases.main.metrics import ServerMetricsStore
 from synapse.storage.databases.main.monthly_active_users import (
     MonthlyActiveUsersWorkerStore,
 )
 from synapse.storage.databases.main.presence import UserPresenceState
 from synapse.storage.databases.main.search import SearchWorkerStore
+from synapse.storage.databases.main.stats import StatsStore
 from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
 from synapse.storage.databases.main.user_directory import UserDirectoryStore
 from synapse.types import ReadReceipt
@@ -454,6 +456,7 @@ class GenericWorkerSlavedStore(
     # FIXME(#3714): We need to add UserDirectoryStore as we write directly
     # rather than going via the correct worker.
     UserDirectoryStore,
+    StatsStore,
     UIAuthWorkerStore,
     SlavedDeviceInboxStore,
     SlavedDeviceStore,
@@ -476,6 +479,7 @@ class GenericWorkerSlavedStore(
     SlavedFilteringStore,
     MonthlyActiveUsersWorkerStore,
     MediaRepositoryStore,
+    ServerMetricsStore,
     SearchWorkerStore,
     BaseSlavedStore,
 ):
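The two new store mixins slot into `GenericWorkerSlavedStore`'s long base-class list, where position matters because Python's MRO resolves method lookups left to right. A toy illustration of that mixin pattern (hypothetical classes, not Synapse's real ones):

```python
class BaseSlavedStoreToy:
    def count_things(self):
        return 0  # fallback behaviour at the end of the MRO

class StatsStoreToy:
    # Listed earlier in the base-class tuple, so the MRO picks this one.
    def count_things(self):
        return 42

class WorkerStoreToy(StatsStoreToy, BaseSlavedStoreToy):
    pass

assert WorkerStoreToy().count_things() == 42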
@@ -17,14 +17,10 @@
 
 import gc
 import logging
-import math
 import os
-import resource
 import sys
 from typing import Iterable
 
-from prometheus_client import Gauge
-
 from twisted.application import service
 from twisted.internet import defer, reactor
 from twisted.python.failure import Failure
@@ -60,7 +56,6 @@ from synapse.http.server import (
 from synapse.http.site import SynapseSite
 from synapse.logging.context import LoggingContext
 from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
-from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.module_api import ModuleApi
 from synapse.python_dependencies import check_requirements
 from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
@@ -334,20 +329,6 @@ class SynapseHomeServer(HomeServer):
             logger.warning("Unrecognized listener type: %s", listener.type)
 
 
-# Gauges to expose monthly active user control metrics
-current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
-current_mau_by_service_gauge = Gauge(
-    "synapse_admin_mau_current_mau_by_service",
-    "Current MAU by service",
-    ["app_service"],
-)
-max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
-registered_reserved_users_mau_gauge = Gauge(
-    "synapse_admin_mau:registered_reserved_users",
-    "Registered users with reserved threepids",
-)
-
-
 def setup(config_options):
     """
     Args:
@@ -389,8 +370,6 @@ def setup(config_options):
     except UpgradeDatabaseException as e:
         quit_with_error("Failed to upgrade database: %s" % (e,))
 
-    hs.setup_master()
-
     async def do_acme() -> bool:
         """
         Reprovision an ACME certificate, if it's required.
@@ -486,92 +465,6 @@ class SynapseService(service.Service):
         return self._port.stopListening()
 
 
-# Contains the list of processes we will be monitoring
-# currently either 0 or 1
-_stats_process = []
-
-
-async def phone_stats_home(hs, stats, stats_process=_stats_process):
-    logger.info("Gathering stats for reporting")
-    now = int(hs.get_clock().time())
-    uptime = int(now - hs.start_time)
-    if uptime < 0:
-        uptime = 0
-
-    #
-    # Performance statistics. Keep this early in the function to maintain reliability of `test_performance_100` test.
-    #
-    old = stats_process[0]
-    new = (now, resource.getrusage(resource.RUSAGE_SELF))
-    stats_process[0] = new
-
-    # Get RSS in bytes
-    stats["memory_rss"] = new[1].ru_maxrss
-
-    # Get CPU time in % of a single core, not % of all cores
-    used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - (
-        old[1].ru_utime + old[1].ru_stime
-    )
-    if used_cpu_time == 0 or new[0] == old[0]:
-        stats["cpu_average"] = 0
-    else:
-        stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100)
-
-    #
-    # General statistics
-    #
-
-    stats["homeserver"] = hs.config.server_name
-    stats["server_context"] = hs.config.server_context
-    stats["timestamp"] = now
-    stats["uptime_seconds"] = uptime
-    version = sys.version_info
-    stats["python_version"] = "{}.{}.{}".format(
-        version.major, version.minor, version.micro
-    )
-    stats["total_users"] = await hs.get_datastore().count_all_users()
-
-    total_nonbridged_users = await hs.get_datastore().count_nonbridged_users()
-    stats["total_nonbridged_users"] = total_nonbridged_users
-
-    daily_user_type_results = await hs.get_datastore().count_daily_user_type()
-    for name, count in daily_user_type_results.items():
-        stats["daily_user_type_" + name] = count
-
-    room_count = await hs.get_datastore().get_room_count()
-    stats["total_room_count"] = room_count
-
-    stats["daily_active_users"] = await hs.get_datastore().count_daily_users()
-    stats["monthly_active_users"] = await hs.get_datastore().count_monthly_users()
-    stats["daily_active_rooms"] = await hs.get_datastore().count_daily_active_rooms()
-    stats["daily_messages"] = await hs.get_datastore().count_daily_messages()
-
-    r30_results = await hs.get_datastore().count_r30_users()
-    for name, count in r30_results.items():
-        stats["r30_users_" + name] = count
-
-    daily_sent_messages = await hs.get_datastore().count_daily_sent_messages()
-    stats["daily_sent_messages"] = daily_sent_messages
-    stats["cache_factor"] = hs.config.caches.global_factor
-    stats["event_cache_size"] = hs.config.caches.event_cache_size
-
-    #
-    # Database version
-    #
-
-    # This only reports info about the *main* database.
-    stats["database_engine"] = hs.get_datastore().db_pool.engine.module.__name__
-    stats["database_server_version"] = hs.get_datastore().db_pool.engine.server_version
-
-    logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats))
-    try:
-        await hs.get_proxied_http_client().put_json(
-            hs.config.report_stats_endpoint, stats
-        )
-    except Exception as e:
-        logger.warning("Error reporting stats: %s", e)
-
-
 def run(hs):
     PROFILE_SYNAPSE = False
     if PROFILE_SYNAPSE:
@@ -597,81 +490,6 @@ def run(hs):
         ThreadPool._worker = profile(ThreadPool._worker)
         reactor.run = profile(reactor.run)
 
-    clock = hs.get_clock()
-
-    stats = {}
-
-    def performance_stats_init():
-        _stats_process.clear()
-        _stats_process.append(
-            (int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF))
-        )
-
-    def start_phone_stats_home():
-        return run_as_background_process(
-            "phone_stats_home", phone_stats_home, hs, stats
-        )
-
-    def generate_user_daily_visit_stats():
-        return run_as_background_process(
-            "generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits
-        )
-
-    # Rather than update on per session basis, batch up the requests.
-    # If you increase the loop period, the accuracy of user_daily_visits
-    # table will decrease
-    clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
-
-    # monthly active user limiting functionality
-    def reap_monthly_active_users():
-        return run_as_background_process(
-            "reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users
-        )
-
-    clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
-    reap_monthly_active_users()
-
-    async def generate_monthly_active_users():
-        current_mau_count = 0
-        current_mau_count_by_service = {}
-        reserved_users = ()
-        store = hs.get_datastore()
-        if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
-            current_mau_count = await store.get_monthly_active_count()
-            current_mau_count_by_service = (
-                await store.get_monthly_active_count_by_service()
-            )
-            reserved_users = await store.get_registered_reserved_users()
-        current_mau_gauge.set(float(current_mau_count))
-
-        for app_service, count in current_mau_count_by_service.items():
-            current_mau_by_service_gauge.labels(app_service).set(float(count))
-
-        registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
-        max_mau_gauge.set(float(hs.config.max_mau_value))
-
-    def start_generate_monthly_active_users():
-        return run_as_background_process(
-            "generate_monthly_active_users", generate_monthly_active_users
-        )
-
-    start_generate_monthly_active_users()
-    if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
-        clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
-    # End of monthly active user settings
-
-    if hs.config.report_stats:
-        logger.info("Scheduling stats reporting for 3 hour intervals")
-        clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
-
-        # We need to defer this init for the cases that we daemonize
-        # otherwise the process ID we get is that of the non-daemon process
-        clock.call_later(0, performance_stats_init)
-
-        # We wait 5 minutes to send the first set of stats as the server can
-        # be quite busy the first few minutes
-        clock.call_later(5 * 60, start_phone_stats_home)
-
     _base.start_reactor(
         "synapse-homeserver",
         soft_file_limit=hs.config.soft_file_limit,
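All of the helpers removed above wrap their work in `run_as_background_process(desc, func, *args)`, which runs the task under its own logging context and records per-task metrics. A simplified model of that wrapper's role, a sketch only and not Synapse's real implementation:

```python
import asyncio

async def run_as_background_process_sketch(desc, func, *args):
    # Simplified stand-in: run func(*args), bracketing it with bookkeeping
    # (the real wrapper also manages logging contexts and Prometheus metrics).
    print(f"[background:{desc}] starting")
    try:
        return await func(*args)
    finally:
        print(f"[background:{desc}] finished")

async def main():
    async def sample_task(x):
        return x * 2

    result = await run_as_background_process_sketch("sample", sample_task, 21)
    assert result == 42

asyncio.run(main())
```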
synapse/app/phone_stats_home.py (new file, 202 lines)

@@ -0,0 +1,202 @@
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import math
+import resource
+import sys
+
+from prometheus_client import Gauge
+
+from synapse.metrics.background_process_metrics import run_as_background_process
+
+logger = logging.getLogger("synapse.app.homeserver")
+
+# Contains the list of processes we will be monitoring
+# currently either 0 or 1
+_stats_process = []
+
+# Gauges to expose monthly active user control metrics
+current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
+current_mau_by_service_gauge = Gauge(
+    "synapse_admin_mau_current_mau_by_service",
+    "Current MAU by service",
+    ["app_service"],
+)
+max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
+registered_reserved_users_mau_gauge = Gauge(
+    "synapse_admin_mau:registered_reserved_users",
+    "Registered users with reserved threepids",
+)
+
+
+async def phone_stats_home(hs, stats, stats_process=_stats_process):
+    logger.info("Gathering stats for reporting")
+    now = int(hs.get_clock().time())
+    uptime = int(now - hs.start_time)
+    if uptime < 0:
+        uptime = 0
+
+    #
+    # Performance statistics. Keep this early in the function to maintain reliability of `test_performance_100` test.
+    #
+    old = stats_process[0]
+    new = (now, resource.getrusage(resource.RUSAGE_SELF))
+    stats_process[0] = new
+
+    # Get RSS in bytes
+    stats["memory_rss"] = new[1].ru_maxrss
+
+    # Get CPU time in % of a single core, not % of all cores
+    used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - (
+        old[1].ru_utime + old[1].ru_stime
+    )
+    if used_cpu_time == 0 or new[0] == old[0]:
+        stats["cpu_average"] = 0
+    else:
+        stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100)
+
+    #
+    # General statistics
+    #
+
+    stats["homeserver"] = hs.config.server_name
+    stats["server_context"] = hs.config.server_context
+    stats["timestamp"] = now
+    stats["uptime_seconds"] = uptime
+    version = sys.version_info
+    stats["python_version"] = "{}.{}.{}".format(
+        version.major, version.minor, version.micro
+    )
+    stats["total_users"] = await hs.get_datastore().count_all_users()
+
+    total_nonbridged_users = await hs.get_datastore().count_nonbridged_users()
+    stats["total_nonbridged_users"] = total_nonbridged_users
+
+    daily_user_type_results = await hs.get_datastore().count_daily_user_type()
+    for name, count in daily_user_type_results.items():
+        stats["daily_user_type_" + name] = count
+
+    room_count = await hs.get_datastore().get_room_count()
+    stats["total_room_count"] = room_count
+
+    stats["daily_active_users"] = await hs.get_datastore().count_daily_users()
+    stats["monthly_active_users"] = await hs.get_datastore().count_monthly_users()
+    stats["daily_active_rooms"] = await hs.get_datastore().count_daily_active_rooms()
+    stats["daily_messages"] = await hs.get_datastore().count_daily_messages()
+
+    r30_results = await hs.get_datastore().count_r30_users()
+    for name, count in r30_results.items():
+        stats["r30_users_" + name] = count
+
+    daily_sent_messages = await hs.get_datastore().count_daily_sent_messages()
+    stats["daily_sent_messages"] = daily_sent_messages
+    stats["cache_factor"] = hs.config.caches.global_factor
+    stats["event_cache_size"] = hs.config.caches.event_cache_size
+
+    #
+    # Database version
+    #
+
+    # This only reports info about the *main* database.
+    stats["database_engine"] = hs.get_datastore().db_pool.engine.module.__name__
+    stats["database_server_version"] = hs.get_datastore().db_pool.engine.server_version
+
+    logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats))
+    try:
+        await hs.get_proxied_http_client().put_json(
+            hs.config.report_stats_endpoint, stats
+        )
+    except Exception as e:
+        logger.warning("Error reporting stats: %s", e)
+
+
+def start_phone_stats_home(hs):
+    """
+    Start the background tasks which report phone home stats.
+    """
+    clock = hs.get_clock()
+
+    stats = {}
+
+    def performance_stats_init():
+        _stats_process.clear()
+        _stats_process.append(
+            (int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF))
+        )
+
+    def start_phone_stats_home():
+        return run_as_background_process(
+            "phone_stats_home", phone_stats_home, hs, stats
+        )
+
+    def generate_user_daily_visit_stats():
+        return run_as_background_process(
+            "generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits
+        )
+
+    # Rather than update on per session basis, batch up the requests.
+    # If you increase the loop period, the accuracy of user_daily_visits
+    # table will decrease
+    clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
+
+    # monthly active user limiting functionality
+    def reap_monthly_active_users():
+        return run_as_background_process(
+            "reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users
+        )
+
+    clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
+    reap_monthly_active_users()
+
+    async def generate_monthly_active_users():
+        current_mau_count = 0
+        current_mau_count_by_service = {}
+        reserved_users = ()
+        store = hs.get_datastore()
+        if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
+            current_mau_count = await store.get_monthly_active_count()
+            current_mau_count_by_service = (
+                await store.get_monthly_active_count_by_service()
+            )
+            reserved_users = await store.get_registered_reserved_users()
+        current_mau_gauge.set(float(current_mau_count))
+
+        for app_service, count in current_mau_count_by_service.items():
+            current_mau_by_service_gauge.labels(app_service).set(float(count))
+
+        registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
+        max_mau_gauge.set(float(hs.config.max_mau_value))
+
+    def start_generate_monthly_active_users():
+        return run_as_background_process(
+            "generate_monthly_active_users", generate_monthly_active_users
+        )
+
+    if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
+        start_generate_monthly_active_users()
+        clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
+    # End of monthly active user settings
+
+    if hs.config.report_stats:
+        logger.info("Scheduling stats reporting for 3 hour intervals")
+        clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
+
+        # We need to defer this init for the cases that we daemonize
+        # otherwise the process ID we get is that of the non-daemon process
+        clock.call_later(0, performance_stats_init)
+
+        # We wait 5 minutes to send the first set of stats as the server can
+        # be quite busy the first few minutes
+        clock.call_later(5 * 60, start_phone_stats_home)
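The `cpu_average` computation above derives percent-of-one-core usage from two `getrusage` snapshots taken a reporting interval apart. A self-contained illustration of the same arithmetic with made-up numbers:

```python
import math

# Two (wall_clock_seconds, cpu_seconds_used) snapshots, 180 s apart, during
# which the process consumed 4.5 s of user+system CPU time (invented values).
old_wall, old_cpu = 1_000_000, 120.0
new_wall, new_cpu = 1_000_180, 124.5

used_cpu_time = new_cpu - old_cpu
if used_cpu_time == 0 or new_wall == old_wall:
    cpu_average = 0
else:
    # 4.5 CPU-seconds over 180 wall-seconds = 2.5% of one core, floored to 2.
    cpu_average = math.floor(used_cpu_time / (new_wall - old_wall) * 100)

assert cpu_average == 2
```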
@@ -132,6 +132,19 @@ class WorkerConfig(Config):
 
         self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
 
+        # Whether this worker should run background tasks or not.
+        #
+        # As a note for developers, the background tasks guarded by this should
+        # be able to run on only a single instance (meaning that they don't
+        # depend on any in-memory state of a particular worker).
+        #
+        # No effort is made to ensure only a single instance of these tasks is
+        # running.
+        background_tasks_instance = config.get("run_background_tasks_on") or "master"
+        self.run_background_tasks = (
+            self.worker_name is None and background_tasks_instance == "master"
+        ) or self.worker_name == background_tasks_instance
+
     def generate_config_section(self, config_dir_path, server_name, **kwargs):
         return """\
        ## Workers ##
@@ -167,6 +180,11 @@ class WorkerConfig(Config):
        #stream_writers:
        # events: worker1
        # typing: worker1
+
+       # The worker that is used to run background tasks (e.g. cleaning up expired
+       # data). If not provided this defaults to the main process.
+       #
+       #run_background_tasks_on: worker1
        """
 
     def read_arguments(self, args):
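The `run_background_tasks` expression above encodes a small decision table: the main process runs the tasks only when no instance was nominated, and a worker runs them only when it is the nominated instance. A standalone sketch (hypothetical helper, not part of the commit) that mirrors the logic:

```python
def should_run_background_tasks(worker_name, configured_instance):
    """Mirror of the WorkerConfig.run_background_tasks expression above.

    worker_name: this process's worker_name, or None for the main process.
    configured_instance: the run_background_tasks_on value, or None if unset.
    """
    instance = configured_instance or "master"
    return (worker_name is None and instance == "master") or worker_name == instance

assert should_run_background_tasks(None, None) is True
assert should_run_background_tasks(None, "background_worker") is False
assert should_run_background_tasks("background_worker", "background_worker") is True
assert should_run_background_tasks("other_worker", "background_worker") is False
```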
@@ -212,7 +212,7 @@ class AuthHandler(BaseHandler):
         self._clock = self.hs.get_clock()
 
         # Expire old UI auth sessions after a period of time.
-        if hs.config.worker_app is None:
+        if hs.config.run_background_tasks:
            self._clock.looping_call(
                run_as_background_process,
                5 * 60 * 1000,
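With this change the session-expiry loop follows the background-tasks setting instead of running whenever the process is the main one. For context, a rough Twisted-level sketch of the looping-call pattern used here (simplified; Synapse's `Clock.looping_call` takes milliseconds and the real task is wrapped in `run_as_background_process`):

```python
from twisted.internet import task

def expire_old_sessions():
    # Stand-in for the real expiry task; hypothetical body.
    print("expiring old UI auth sessions")

# Same shape as self._clock.looping_call(..., 5 * 60 * 1000, ...):
loop = task.LoopingCall(expire_old_sessions)
loop.start(5 * 60)  # Twisted takes seconds; repeats need a running reactor
```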
@@ -49,7 +49,7 @@ class StatsHandler:
         # Guard to ensure we only process deltas one at a time
         self._is_processing = False
 
-        if hs.config.stats_enabled:
+        if self.stats_enabled and hs.config.run_background_tasks:
             self.notifier.add_replication_callback(self.notify_new_event)
 
             # We kick this off so that we don't have to wait for a change before
@@ -185,7 +185,10 @@ class HomeServer(metaclass=abc.ABCMeta):
     we are listening on to provide HTTP services.
     """
 
-    REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"]
+    REQUIRED_ON_BACKGROUND_TASK_STARTUP = [
+        "auth",
+        "stats",
+    ]
 
     # This is overridden in derived application classes
     # (such as synapse.app.homeserver.SynapseHomeServer) and gives the class to be
@@ -251,14 +254,20 @@ class HomeServer(metaclass=abc.ABCMeta):
         self.datastores = Databases(self.DATASTORE_CLASS, self)
         logger.info("Finished setting up.")
 
-    def setup_master(self) -> None:
+        # Register background tasks required by this server. This must be done
+        # somewhat manually due to the background tasks not being registered
+        # unless handlers are instantiated.
+        if self.config.run_background_tasks:
+            self.setup_background_tasks()
+
+    def setup_background_tasks(self) -> None:
         """
         Some handlers have side effects on instantiation (like registering
         background updates). This function causes them to be fetched, and
         therefore instantiated, to run those side effects.
         """
-        for i in self.REQUIRED_ON_MASTER_STARTUP:
-            getattr(self, "get_" + i)()
+        for i in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP:
+            getattr(self, "get_" + i + "_handler")()
 
     def get_reactor(self) -> twisted.internet.base.ReactorBase:
         """
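The renamed `setup_background_tasks` relies on the `get_<name>_handler` accessor convention: instantiating a handler is enough to register its background work, so the loop discards the return values. A minimal toy illustration of that getattr pattern (hypothetical classes, not Synapse's real ones):

```python
class ToyHomeServer:
    # Handlers whose constructors register background work; building them
    # is the whole point, so the results are intentionally ignored.
    REQUIRED_ON_BACKGROUND_TASK_STARTUP = ["auth", "stats"]

    def get_auth_handler(self):
        print("auth handler built (looping calls registered)")

    def get_stats_handler(self):
        print("stats handler built (replication callback registered)")

    def setup_background_tasks(self):
        for name in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP:
            getattr(self, "get_" + name + "_handler")()

ToyHomeServer().setup_background_tasks()
```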
@@ -15,9 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import calendar
 import logging
-import time
 from typing import Any, Dict, List, Optional, Tuple
 
 from synapse.api.constants import PresenceState
@@ -268,9 +266,6 @@ class DataStore(
         self._stream_order_on_start = self.get_room_max_stream_ordering()
         self._min_stream_order_on_start = self.get_room_min_stream_ordering()
 
-        # Used in _generate_user_daily_visits to keep track of progress
-        self._last_user_visit_update = self._get_start_of_day()
-
     def get_device_stream_token(self) -> int:
         return self._device_list_id_gen.get_current_token()
 
@@ -301,192 +296,6 @@ class DataStore(
 
         return [UserPresenceState(**row) for row in rows]
 
-    async def count_daily_users(self) -> int:
-        """
-        Counts the number of users who used this homeserver in the last 24 hours.
-        """
-        yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
-        return await self.db_pool.runInteraction(
-            "count_daily_users", self._count_users, yesterday
-        )
-
-    async def count_monthly_users(self) -> int:
-        """
-        Counts the number of users who used this homeserver in the last 30 days.
-        Note this method is intended for phonehome metrics only and is different
-        from the mau figure in synapse.storage.monthly_active_users which,
-        amongst other things, includes a 3 day grace period before a user counts.
-        """
-        thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
-        return await self.db_pool.runInteraction(
-            "count_monthly_users", self._count_users, thirty_days_ago
-        )
-
-    def _count_users(self, txn, time_from):
-        """
-        Returns number of users seen in the past time_from period
-        """
-        sql = """
-            SELECT COALESCE(count(*), 0) FROM (
-                SELECT user_id FROM user_ips
-                WHERE last_seen > ?
-                GROUP BY user_id
-            ) u
-        """
-        txn.execute(sql, (time_from,))
-        (count,) = txn.fetchone()
-        return count
-
-    async def count_r30_users(self) -> Dict[str, int]:
-        """
-        Counts the number of 30 day retained users, defined as:-
-         * Users who have created their accounts more than 30 days ago
-         * Where last seen at most 30 days ago
-         * Where account creation and last_seen are > 30 days apart
-
-        Returns:
-            A mapping of counts globally as well as broken out by platform.
-        """
-
-        def _count_r30_users(txn):
-            thirty_days_in_secs = 86400 * 30
-            now = int(self._clock.time())
-            thirty_days_ago_in_secs = now - thirty_days_in_secs
-
-            sql = """
-                SELECT platform, COALESCE(count(*), 0) FROM (
-                    SELECT
-                        users.name, platform, users.creation_ts * 1000,
-                        MAX(uip.last_seen)
-                    FROM users
-                    INNER JOIN (
-                        SELECT
-                        user_id,
-                        last_seen,
-                        CASE
-                            WHEN user_agent LIKE '%%Android%%' THEN 'android'
-                            WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
-                            WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
-                            WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
-                            WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
-                            ELSE 'unknown'
-                        END
-                        AS platform
-                        FROM user_ips
-                    ) uip
-                    ON users.name = uip.user_id
-                    AND users.appservice_id is NULL
-                    AND users.creation_ts < ?
-                    AND uip.last_seen/1000 > ?
-                    AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
-                    GROUP BY users.name, platform, users.creation_ts
-                ) u GROUP BY platform
-            """
-
-            results = {}
-            txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
-
-            for row in txn:
-                if row[0] == "unknown":
-                    pass
-                results[row[0]] = row[1]
-
-            sql = """
-                SELECT COALESCE(count(*), 0) FROM (
-                    SELECT users.name, users.creation_ts * 1000,
-                        MAX(uip.last_seen)
-                    FROM users
-                    INNER JOIN (
-                        SELECT
-                        user_id,
-                        last_seen
-                        FROM user_ips
-                    ) uip
-                    ON users.name = uip.user_id
-                    AND appservice_id is NULL
-                    AND users.creation_ts < ?
-                    AND uip.last_seen/1000 > ?
-                    AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
-                    GROUP BY users.name, users.creation_ts
-                ) u
-            """
-
-            txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
-            (count,) = txn.fetchone()
-            results["all"] = count
-
-            return results
-
-        return await self.db_pool.runInteraction("count_r30_users", _count_r30_users)
-
-    def _get_start_of_day(self):
-        """
-        Returns millisecond unixtime for start of UTC day.
-        """
-        now = time.gmtime()
-        today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
-        return today_start * 1000
-
-    async def generate_user_daily_visits(self) -> None:
-        """
-        Generates daily visit data for use in cohort/ retention analysis
-        """
-
-        def _generate_user_daily_visits(txn):
-            logger.info("Calling _generate_user_daily_visits")
-            today_start = self._get_start_of_day()
-            a_day_in_milliseconds = 24 * 60 * 60 * 1000
-            now = self.clock.time_msec()
-
-            sql = """
-                INSERT INTO user_daily_visits (user_id, device_id, timestamp)
-                    SELECT u.user_id, u.device_id, ?
-                    FROM user_ips AS u
-                    LEFT JOIN (
-                        SELECT user_id, device_id, timestamp FROM user_daily_visits
-                        WHERE timestamp = ?
-                    ) udv
-                    ON u.user_id = udv.user_id AND u.device_id=udv.device_id
-                    INNER JOIN users ON users.name=u.user_id
-                    WHERE last_seen > ? AND last_seen <= ?
-                    AND udv.timestamp IS NULL AND users.is_guest=0
-                    AND users.appservice_id IS NULL
-                    GROUP BY u.user_id, u.device_id
-            """
-
-            # This means that the day has rolled over but there could still
-            # be entries from the previous day. There is an edge case
-            # where if the user logs in at 23:59 and overwrites their
-            # last_seen at 00:01 then they will not be counted in the
-            # previous day's stats - it is important that the query is run
-            # often to minimise this case.
-            if today_start > self._last_user_visit_update:
-                yesterday_start = today_start - a_day_in_milliseconds
-                txn.execute(
-                    sql,
-                    (
-                        yesterday_start,
-                        yesterday_start,
-                        self._last_user_visit_update,
-                        today_start,
-                    ),
-                )
-                self._last_user_visit_update = today_start
-
-            txn.execute(
-                sql, (today_start, today_start, self._last_user_visit_update, now)
-            )
-            # Update _last_user_visit_update to now. The reason to do this
-            # rather just clamping to the beginning of the day is to limit
-            # the size of the join - meaning that the query can be run more
-            # frequently
-            self._last_user_visit_update = now
-
-        await self.db_pool.runInteraction(
-            "generate_user_daily_visits", _generate_user_daily_visits
-        )
-
     async def get_users(self) -> List[Dict[str, Any]]:
         """Function to retrieve a list of users in users table.
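The `_get_start_of_day` helper removed here (and re-added to the metrics store below) truncates the current UTC time to midnight, in milliseconds. A standalone check of that calendar/time recipe, using only what is visible in the code above:

```python
import calendar
import time

now = time.gmtime()
today_start_ms = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0)) * 1000

# The result lands exactly on a UTC day boundary and is never ahead of "now".
assert today_start_ms % (24 * 60 * 60 * 1000) == 0
assert today_start_ms <= int(time.time() * 1000)
```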
@@ -12,6 +12,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import calendar
+import logging
+import time
+from typing import Dict
 
 from synapse.metrics import GaugeBucketCollector
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -21,6 +25,8 @@ from synapse.storage.databases.main.event_push_actions import (
     EventPushActionsWorkerStore,
 )
 
+logger = logging.getLogger(__name__)
+
 # Collect metrics on the number of forward extremities that exist.
 _extremities_collecter = GaugeBucketCollector(
     "synapse_forward_extremities",
@@ -60,6 +66,9 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
 
         hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)
 
+        # Used in _generate_user_daily_visits to keep track of progress
+        self._last_user_visit_update = self._get_start_of_day()
+
     async def _read_forward_extremities(self):
         def fetch(txn):
             txn.execute(
@@ -137,3 +146,189 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
             return count
 
         return await self.db_pool.runInteraction("count_daily_active_rooms", _count)
+
+    async def count_daily_users(self) -> int:
+        """
+        Counts the number of users who used this homeserver in the last 24 hours.
+        """
+        yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
+        return await self.db_pool.runInteraction(
+            "count_daily_users", self._count_users, yesterday
+        )
+
+    async def count_monthly_users(self) -> int:
+        """
+        Counts the number of users who used this homeserver in the last 30 days.
+        Note this method is intended for phonehome metrics only and is different
+        from the mau figure in synapse.storage.monthly_active_users which,
+        amongst other things, includes a 3 day grace period before a user counts.
+        """
+        thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
+        return await self.db_pool.runInteraction(
+            "count_monthly_users", self._count_users, thirty_days_ago
+        )
+
+    def _count_users(self, txn, time_from):
+        """
+        Returns number of users seen in the past time_from period
+        """
+        sql = """
+            SELECT COALESCE(count(*), 0) FROM (
+                SELECT user_id FROM user_ips
+                WHERE last_seen > ?
+                GROUP BY user_id
+            ) u
+        """
+        txn.execute(sql, (time_from,))
+        (count,) = txn.fetchone()
+        return count
+
+    async def count_r30_users(self) -> Dict[str, int]:
+        """
+        Counts the number of 30 day retained users, defined as:-
+         * Users who have created their accounts more than 30 days ago
+         * Where last seen at most 30 days ago
+         * Where account creation and last_seen are > 30 days apart
+
+        Returns:
+            A mapping of counts globally as well as broken out by platform.
+        """
+
+        def _count_r30_users(txn):
+            thirty_days_in_secs = 86400 * 30
+            now = int(self._clock.time())
+            thirty_days_ago_in_secs = now - thirty_days_in_secs
+
+            sql = """
+                SELECT platform, COALESCE(count(*), 0) FROM (
+                    SELECT
+                        users.name, platform, users.creation_ts * 1000,
+                        MAX(uip.last_seen)
+                    FROM users
+                    INNER JOIN (
+                        SELECT
+                        user_id,
+                        last_seen,
+                        CASE
+                            WHEN user_agent LIKE '%%Android%%' THEN 'android'
+                            WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
+                            WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
+                            WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
+                            WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
+                            ELSE 'unknown'
+                        END
+                        AS platform
+                        FROM user_ips
+                    ) uip
+                    ON users.name = uip.user_id
+                    AND users.appservice_id is NULL
+                    AND users.creation_ts < ?
+                    AND uip.last_seen/1000 > ?
+                    AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
+                    GROUP BY users.name, platform, users.creation_ts
+                ) u GROUP BY platform
+            """
+
+            results = {}
+            txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
+
+            for row in txn:
+                if row[0] == "unknown":
+                    pass
+                results[row[0]] = row[1]
+
+            sql = """
+                SELECT COALESCE(count(*), 0) FROM (
+                    SELECT users.name, users.creation_ts * 1000,
+                        MAX(uip.last_seen)
+                    FROM users
+                    INNER JOIN (
+                        SELECT
+                        user_id,
+                        last_seen
+                        FROM user_ips
+                    ) uip
+                    ON users.name = uip.user_id
+                    AND appservice_id is NULL
+                    AND users.creation_ts < ?
+                    AND uip.last_seen/1000 > ?
+                    AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
+                    GROUP BY users.name, users.creation_ts
+                ) u
+            """
+
+            txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
+            (count,) = txn.fetchone()
+            results["all"] = count
+
+            return results
+
+        return await self.db_pool.runInteraction("count_r30_users", _count_r30_users)
+
+    def _get_start_of_day(self):
+        """
+        Returns millisecond unixtime for start of UTC day.
+        """
+        now = time.gmtime()
+        today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
+        return today_start * 1000
+
+    async def generate_user_daily_visits(self) -> None:
+        """
+        Generates daily visit data for use in cohort/ retention analysis
+        """
+
+        def _generate_user_daily_visits(txn):
+            logger.info("Calling _generate_user_daily_visits")
+            today_start = self._get_start_of_day()
+            a_day_in_milliseconds = 24 * 60 * 60 * 1000
+            now = self._clock.time_msec()
+
+            sql = """
+                INSERT INTO user_daily_visits (user_id, device_id, timestamp)
+                    SELECT u.user_id, u.device_id, ?
+                    FROM user_ips AS u
+                    LEFT JOIN (
+                        SELECT user_id, device_id, timestamp FROM user_daily_visits
+                        WHERE timestamp = ?
+                    ) udv
+                    ON u.user_id = udv.user_id AND u.device_id=udv.device_id
+                    INNER JOIN users ON users.name=u.user_id
+                    WHERE last_seen > ? AND last_seen <= ?
+                    AND udv.timestamp IS NULL AND users.is_guest=0
+                    AND users.appservice_id IS NULL
+                    GROUP BY u.user_id, u.device_id
+            """
+
+            # This means that the day has rolled over but there could still
+            # be entries from the previous day. There is an edge case
+            # where if the user logs in at 23:59 and overwrites their
+            # last_seen at 00:01 then they will not be counted in the
+            # previous day's stats - it is important that the query is run
+            # often to minimise this case.
+            if today_start > self._last_user_visit_update:
+                yesterday_start = today_start - a_day_in_milliseconds
+                txn.execute(
+                    sql,
+                    (
+                        yesterday_start,
+                        yesterday_start,
+                        self._last_user_visit_update,
+                        today_start,
+                    ),
+                )
+                self._last_user_visit_update = today_start
+
+            txn.execute(
+                sql, (today_start, today_start, self._last_user_visit_update, now)
+            )
+            # Update _last_user_visit_update to now. The reason to do this
+            # rather just clamping to the beginning of the day is to limit
+            # the size of the join - meaning that the query can be run more
+            # frequently
+            self._last_user_visit_update = now
+
+        await self.db_pool.runInteraction(
+            "generate_user_daily_visits", _generate_user_daily_visits
+        )
@@ -32,6 +32,9 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
         self._clock = hs.get_clock()
         self.hs = hs
 
+        self._limit_usage_by_mau = hs.config.limit_usage_by_mau
+        self._max_mau_value = hs.config.max_mau_value
+
     @cached(num_args=0)
     async def get_monthly_active_count(self) -> int:
         """Generates current count of monthly active users
@@ -124,60 +127,6 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
             desc="user_last_seen_monthly_active",
         )
 
-
-class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
-        super().__init__(database, db_conn, hs)
-
-        self._limit_usage_by_mau = hs.config.limit_usage_by_mau
-        self._mau_stats_only = hs.config.mau_stats_only
-        self._max_mau_value = hs.config.max_mau_value
-
-        # Do not add more reserved users than the total allowable number
-        # cur = LoggingTransaction(
-        self.db_pool.new_transaction(
-            db_conn,
-            "initialise_mau_threepids",
-            [],
-            [],
-            self._initialise_reserved_users,
-            hs.config.mau_limits_reserved_threepids[: self._max_mau_value],
-        )
-
-    def _initialise_reserved_users(self, txn, threepids):
-        """Ensures that reserved threepids are accounted for in the MAU table, should
-        be called on start up.
-
-        Args:
-            txn (cursor):
-            threepids (list[dict]): List of threepid dicts to reserve
-        """
-
-        # XXX what is this function trying to achieve? It upserts into
-        # monthly_active_users for each *registered* reserved mau user, but why?
-        #
-        #  - shouldn't there already be an entry for each reserved user (at least
-        #    if they have been active recently)?
-        #
-        #  - if it's important that the timestamp is kept up to date, why do we only
-        #    run this at startup?
-
-        for tp in threepids:
-            user_id = self.get_user_id_by_threepid_txn(txn, tp["medium"], tp["address"])
-
-            if user_id:
-                is_support = self.is_support_user_txn(txn, user_id)
-                if not is_support:
-                    # We do this manually here to avoid hitting #6791
-                    self.db_pool.simple_upsert_txn(
-                        txn,
-                        table="monthly_active_users",
-                        keyvalues={"user_id": user_id},
-                        values={"timestamp": int(self._clock.time_msec())},
-                    )
-            else:
-                logger.warning("mau limit reserved threepid %s not found in db" % tp)
-
     async def reap_monthly_active_users(self):
         """Cleans out monthly active user table to ensure that no stale
         entries exist.
@@ -257,6 +206,58 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
             "reap_monthly_active_users", _reap_users, reserved_users
         )
 
+
+class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
+    def __init__(self, database: DatabasePool, db_conn, hs):
+        super().__init__(database, db_conn, hs)
+
+        self._mau_stats_only = hs.config.mau_stats_only
+
+        # Do not add more reserved users than the total allowable number
+        # cur = LoggingTransaction(
+        self.db_pool.new_transaction(
+            db_conn,
+            "initialise_mau_threepids",
+            [],
+            [],
+            self._initialise_reserved_users,
+            hs.config.mau_limits_reserved_threepids[: self._max_mau_value],
+        )
+
+    def _initialise_reserved_users(self, txn, threepids):
+        """Ensures that reserved threepids are accounted for in the MAU table, should
+        be called on start up.
+
+        Args:
+            txn (cursor):
+            threepids (list[dict]): List of threepid dicts to reserve
+        """
+
+        # XXX what is this function trying to achieve? It upserts into
+        # monthly_active_users for each *registered* reserved mau user, but why?
+        #
+        #  - shouldn't there already be an entry for each reserved user (at least
+        #    if they have been active recently)?
+        #
+        #  - if it's important that the timestamp is kept up to date, why do we only
+        #    run this at startup?
+
+        for tp in threepids:
+            user_id = self.get_user_id_by_threepid_txn(txn, tp["medium"], tp["address"])
+
+            if user_id:
+                is_support = self.is_support_user_txn(txn, user_id)
+                if not is_support:
+                    # We do this manually here to avoid hitting #6791
+                    self.db_pool.simple_upsert_txn(
+                        txn,
+                        table="monthly_active_users",
+                        keyvalues={"user_id": user_id},
+                        values={"timestamp": int(self._clock.time_msec())},
+                    )
+            else:
+                logger.warning("mau limit reserved threepid %s not found in db" % tp)
+
     async def upsert_monthly_active_user(self, user_id: str) -> None:
         """Updates or inserts the user into the monthly active user table, which
         is used to track the current MAU usage of the server
@@ -192,6 +192,18 @@ class RoomWorkerStore(SQLBaseStore):
             "count_public_rooms", _count_public_rooms_txn
         )
 
+    async def get_room_count(self) -> int:
+        """Retrieve the total number of rooms.
+        """
+
+        def f(txn):
+            sql = "SELECT count(*) FROM rooms"
+            txn.execute(sql)
+            row = txn.fetchone()
+            return row[0] or 0
+
+        return await self.db_pool.runInteraction("get_rooms", f)
+
     async def get_largest_public_rooms(
         self,
         network_tuple: Optional[ThirdPartyInstanceID],
@@ -1292,18 +1304,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
         )
         self.hs.get_notifier().on_new_replication_data()
 
-    async def get_room_count(self) -> int:
-        """Retrieve the total number of rooms.
-        """
-
-        def f(txn):
-            sql = "SELECT count(*) FROM rooms"
-            txn.execute(sql)
-            row = txn.fetchone()
-            return row[0] or 0
-
-        return await self.db_pool.runInteraction("get_rooms", f)
-
     async def add_event_report(
         self,
         room_id: str,
@@ -288,8 +288,6 @@ class UIAuthWorkerStore(SQLBaseStore):
         )
         return [(row["user_agent"], row["ip"]) for row in rows]
 
-
-class UIAuthStore(UIAuthWorkerStore):
     async def delete_old_ui_auth_sessions(self, expiration_time: int) -> None:
         """
         Remove sessions which were last used earlier than the expiration time.
@@ -339,3 +337,7 @@ class UIAuthStore(UIAuthWorkerStore):
             iterable=session_ids,
             keyvalues={},
         )
+
+
+class UIAuthStore(UIAuthWorkerStore):
+    pass
@@ -17,7 +17,7 @@ import resource
 
 import mock
 
-from synapse.app.homeserver import phone_stats_home
+from synapse.app.phone_stats_home import phone_stats_home
 
 from tests.unittest import HomeserverTestCase
@@ -276,7 +276,7 @@ def setup_test_homeserver(
 
     hs.setup()
     if homeserverToUse.__name__ == "TestHomeServer":
-        hs.setup_master()
+        hs.setup_background_tasks()
 
     if isinstance(db_engine, PostgresEngine):
        database = hs.get_datastores().databases[0]