0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-16 22:33:51 +01:00

Merge branch 'develop' into matthew/filter_members

This commit is contained in:
Matthew Hodgson 2018-06-04 00:09:17 +03:00
commit 28f09fcdd5
59 changed files with 496 additions and 181 deletions

View file

@ -16,7 +16,7 @@
print("I am a fish %s" % print("I am a fish %s" %
"moo") "moo")
and this:: and this::
print( print(
"I am a fish %s" % "I am a fish %s" %

View file

@ -1,25 +1,47 @@
How to monitor Synapse metrics using Prometheus How to monitor Synapse metrics using Prometheus
=============================================== ===============================================
1. Install prometheus: 1. Install Prometheus:
Follow instructions at http://prometheus.io/docs/introduction/install/ Follow instructions at http://prometheus.io/docs/introduction/install/
2. Enable synapse metrics: 2. Enable Synapse metrics:
Simply setting a (local) port number will enable it. Pick a port. There are two methods of enabling metrics in Synapse.
prometheus itself defaults to 9090, so starting just above that for
locally monitored services seems reasonable. E.g. 9092:
Add to homeserver.yaml:: The first serves the metrics as a part of the usual web server and can be
enabled by adding the "metrics" resource to the existing listener as such::
metrics_port: 9092 resources:
- names:
- client
- metrics
Also ensure that ``enable_metrics`` is set to ``True``. This provides a simple way of adding metrics to your Synapse installation,
and serves under ``/_synapse/metrics``. If you do not wish your metrics be
publicly exposed, you will need to either filter it out at your load
balancer, or use the second method.
Restart synapse. The second method runs the metrics server on a different port, in a
different thread to Synapse. This can make it more resilient to heavy load
meaning metrics cannot be retrieved, and can be exposed to just internal
networks easier. The served metrics are available over HTTP only, and will
be available at ``/``.
3. Add a prometheus target for synapse. Add a new listener to homeserver.yaml::
listeners:
- type: metrics
port: 9000
bind_addresses:
- '0.0.0.0'
For both options, you will need to ensure that ``enable_metrics`` is set to
``True``.
Restart Synapse.
3. Add a Prometheus target for Synapse.
It needs to set the ``metrics_path`` to a non-default value (under ``scrape_configs``):: It needs to set the ``metrics_path`` to a non-default value (under ``scrape_configs``)::
@ -31,7 +53,40 @@ How to monitor Synapse metrics using Prometheus
If your prometheus is older than 1.5.2, you will need to replace If your prometheus is older than 1.5.2, you will need to replace
``static_configs`` in the above with ``target_groups``. ``static_configs`` in the above with ``target_groups``.
Restart prometheus. Restart Prometheus.
Removal of deprecated metrics & time based counters becoming histograms in 0.31.0
---------------------------------------------------------------------------------
The duplicated metrics deprecated in Synapse 0.27.0 have been removed.
All time duration-based metrics have been changed to be seconds. This affects:
================================
msec -> sec metrics
================================
python_gc_time
python_twisted_reactor_tick_time
synapse_storage_query_time
synapse_storage_schedule_time
synapse_storage_transaction_time
================================
Several metrics have been changed to be histograms, which sort entries into
buckets and allow better analysis. The following metrics are now histograms:
=========================================
Altered metrics
=========================================
python_gc_time
python_twisted_reactor_pending_calls
python_twisted_reactor_tick_time
synapse_http_server_response_time_seconds
synapse_storage_query_time
synapse_storage_schedule_time
synapse_storage_transaction_time
=========================================
Block and response metrics renamed for 0.27.0 Block and response metrics renamed for 0.27.0

View file

@ -15,6 +15,8 @@
import logging import logging
from six import itervalues
import pymacaroons import pymacaroons
from twisted.internet import defer from twisted.internet import defer
@ -66,7 +68,7 @@ class Auth(object):
) )
auth_events = yield self.store.get_events(auth_events_ids) auth_events = yield self.store.get_events(auth_events_ids)
auth_events = { auth_events = {
(e.type, e.state_key): e for e in auth_events.values() (e.type, e.state_key): e for e in itervalues(auth_events)
} }
self.check(event, auth_events=auth_events, do_sig_check=do_sig_check) self.check(event, auth_events=auth_events, do_sig_check=do_sig_check)

View file

@ -417,7 +417,7 @@ class Filter(object):
return room_ids return room_ids
def filter(self, events): def filter(self, events):
return filter(self.check, events) return list(filter(self.check, events))
def limit(self): def limit(self):
return self.filter_json.get("limit", 10) return self.filter_json.get("limit", 10)

View file

@ -124,6 +124,19 @@ def quit_with_error(error_string):
sys.exit(1) sys.exit(1)
def listen_metrics(bind_addresses, port):
"""
Start Prometheus metrics server.
"""
from synapse.metrics import RegistryProxy
from prometheus_client import start_http_server
for host in bind_addresses:
reactor.callInThread(start_http_server, int(port),
addr=host, registry=RegistryProxy)
logger.info("Metrics now reporting on %s:%d", host, port)
def listen_tcp(bind_addresses, port, factory, backlog=50): def listen_tcp(bind_addresses, port, factory, backlog=50):
""" """
Create a TCP socket for a port and several addresses Create a TCP socket for a port and several addresses

View file

@ -94,6 +94,13 @@ class AppserviceServer(HomeServer):
globals={"hs": self}, globals={"hs": self},
) )
) )
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warn("Unrecognized listener type: %s", listener["type"])

View file

@ -25,6 +25,7 @@ from synapse.config.logger import setup_logging
from synapse.crypto import context_factory from synapse.crypto import context_factory
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@ -77,7 +78,7 @@ class ClientReaderServer(HomeServer):
for res in listener_config["resources"]: for res in listener_config["resources"]:
for name in res["names"]: for name in res["names"]:
if name == "metrics": if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self) resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client": elif name == "client":
resource = JsonResource(self, canonical_json=False) resource = JsonResource(self, canonical_json=False)
PublicRoomListRestServlet(self).register(resource) PublicRoomListRestServlet(self).register(resource)
@ -118,7 +119,13 @@ class ClientReaderServer(HomeServer):
globals={"hs": self}, globals={"hs": self},
) )
) )
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warn("Unrecognized listener type: %s", listener["type"])

View file

@ -25,6 +25,7 @@ from synapse.config.logger import setup_logging
from synapse.crypto import context_factory from synapse.crypto import context_factory
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
@ -90,7 +91,7 @@ class EventCreatorServer(HomeServer):
for res in listener_config["resources"]: for res in listener_config["resources"]:
for name in res["names"]: for name in res["names"]:
if name == "metrics": if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self) resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client": elif name == "client":
resource = JsonResource(self, canonical_json=False) resource = JsonResource(self, canonical_json=False)
RoomSendEventRestServlet(self).register(resource) RoomSendEventRestServlet(self).register(resource)
@ -134,6 +135,13 @@ class EventCreatorServer(HomeServer):
globals={"hs": self}, globals={"hs": self},
) )
) )
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warn("Unrecognized listener type: %s", listener["type"])

View file

@ -26,6 +26,7 @@ from synapse.config.logger import setup_logging
from synapse.crypto import context_factory from synapse.crypto import context_factory
from synapse.federation.transport.server import TransportLayerServer from synapse.federation.transport.server import TransportLayerServer
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.directory import DirectoryStore
@ -71,7 +72,7 @@ class FederationReaderServer(HomeServer):
for res in listener_config["resources"]: for res in listener_config["resources"]:
for name in res["names"]: for name in res["names"]:
if name == "metrics": if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self) resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "federation": elif name == "federation":
resources.update({ resources.update({
FEDERATION_PREFIX: TransportLayerServer(self), FEDERATION_PREFIX: TransportLayerServer(self),
@ -107,6 +108,13 @@ class FederationReaderServer(HomeServer):
globals={"hs": self}, globals={"hs": self},
) )
) )
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warn("Unrecognized listener type: %s", listener["type"])

View file

@ -25,6 +25,7 @@ from synapse.config.logger import setup_logging
from synapse.crypto import context_factory from synapse.crypto import context_factory
from synapse.federation import send_queue from synapse.federation import send_queue
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.devices import SlavedDeviceStore
@ -89,7 +90,7 @@ class FederationSenderServer(HomeServer):
for res in listener_config["resources"]: for res in listener_config["resources"]:
for name in res["names"]: for name in res["names"]:
if name == "metrics": if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self) resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
root_resource = create_resource_tree(resources, NoResource()) root_resource = create_resource_tree(resources, NoResource())
@ -121,6 +122,13 @@ class FederationSenderServer(HomeServer):
globals={"hs": self}, globals={"hs": self},
) )
) )
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warn("Unrecognized listener type: %s", listener["type"])

View file

@ -29,6 +29,7 @@ from synapse.http.servlet import (
RestServlet, parse_json_object_from_request, RestServlet, parse_json_object_from_request,
) )
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@ -131,7 +132,7 @@ class FrontendProxyServer(HomeServer):
for res in listener_config["resources"]: for res in listener_config["resources"]:
for name in res["names"]: for name in res["names"]:
if name == "metrics": if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self) resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client": elif name == "client":
resource = JsonResource(self, canonical_json=False) resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource) KeyUploadServlet(self).register(resource)
@ -172,6 +173,13 @@ class FrontendProxyServer(HomeServer):
globals={"hs": self}, globals={"hs": self},
) )
) )
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warn("Unrecognized listener type: %s", listener["type"])

View file

@ -35,7 +35,7 @@ from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import RootRedirect from synapse.http.server import RootRedirect
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \ from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
check_requirements check_requirements
from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX
@ -61,8 +61,6 @@ from twisted.web.resource import EncodingResourceWrapper, NoResource
from twisted.web.server import GzipEncoderFactory from twisted.web.server import GzipEncoderFactory
from twisted.web.static import File from twisted.web.static import File
from prometheus_client.twisted import MetricsResource
logger = logging.getLogger("synapse.app.homeserver") logger = logging.getLogger("synapse.app.homeserver")
@ -232,7 +230,7 @@ class SynapseHomeServer(HomeServer):
resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self) resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self)
if name == "metrics" and self.get_config().enable_metrics: if name == "metrics" and self.get_config().enable_metrics:
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy()) resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
if name == "replication": if name == "replication":
resources[REPLICATION_PREFIX] = ReplicationRestResource(self) resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
@ -265,6 +263,13 @@ class SynapseHomeServer(HomeServer):
reactor.addSystemEventTrigger( reactor.addSystemEventTrigger(
"before", "shutdown", server_listener.stopListening, "before", "shutdown", server_listener.stopListening,
) )
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warn("Unrecognized listener type: %s", listener["type"])
@ -434,6 +439,10 @@ def run(hs):
total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users() total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
stats["total_nonbridged_users"] = total_nonbridged_users stats["total_nonbridged_users"] = total_nonbridged_users
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
for name, count in daily_user_type_results.iteritems():
stats["daily_user_type_" + name] = count
room_count = yield hs.get_datastore().get_room_count() room_count = yield hs.get_datastore().get_room_count()
stats["total_room_count"] = room_count stats["total_room_count"] = room_count

View file

@ -27,6 +27,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging from synapse.config.logger import setup_logging
from synapse.crypto import context_factory from synapse.crypto import context_factory
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@ -73,7 +74,7 @@ class MediaRepositoryServer(HomeServer):
for res in listener_config["resources"]: for res in listener_config["resources"]:
for name in res["names"]: for name in res["names"]:
if name == "metrics": if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self) resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "media": elif name == "media":
media_repo = self.get_media_repository_resource() media_repo = self.get_media_repository_resource()
resources.update({ resources.update({
@ -114,6 +115,13 @@ class MediaRepositoryServer(HomeServer):
globals={"hs": self}, globals={"hs": self},
) )
) )
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warn("Unrecognized listener type: %s", listener["type"])

View file

@ -23,6 +23,7 @@ from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.events import SlavedEventStore
@ -92,7 +93,7 @@ class PusherServer(HomeServer):
for res in listener_config["resources"]: for res in listener_config["resources"]:
for name in res["names"]: for name in res["names"]:
if name == "metrics": if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self) resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
root_resource = create_resource_tree(resources, NoResource()) root_resource = create_resource_tree(resources, NoResource())
@ -124,6 +125,13 @@ class PusherServer(HomeServer):
globals={"hs": self}, globals={"hs": self},
) )
) )
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warn("Unrecognized listener type: %s", listener["type"])

View file

@ -26,6 +26,7 @@ from synapse.config.logger import setup_logging
from synapse.handlers.presence import PresenceHandler, get_interested_parties from synapse.handlers.presence import PresenceHandler, get_interested_parties
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
@ -257,7 +258,7 @@ class SynchrotronServer(HomeServer):
for res in listener_config["resources"]: for res in listener_config["resources"]:
for name in res["names"]: for name in res["names"]:
if name == "metrics": if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self) resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client": elif name == "client":
resource = JsonResource(self, canonical_json=False) resource = JsonResource(self, canonical_json=False)
sync.register_servlets(self, resource) sync.register_servlets(self, resource)
@ -301,6 +302,13 @@ class SynchrotronServer(HomeServer):
globals={"hs": self}, globals={"hs": self},
) )
) )
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warn("Unrecognized listener type: %s", listener["type"])

View file

@ -26,6 +26,7 @@ from synapse.config.logger import setup_logging
from synapse.crypto import context_factory from synapse.crypto import context_factory
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@ -105,7 +106,7 @@ class UserDirectoryServer(HomeServer):
for res in listener_config["resources"]: for res in listener_config["resources"]:
for name in res["names"]: for name in res["names"]:
if name == "metrics": if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self) resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client": elif name == "client":
resource = JsonResource(self, canonical_json=False) resource = JsonResource(self, canonical_json=False)
user_directory.register_servlets(self, resource) user_directory.register_servlets(self, resource)
@ -146,6 +147,13 @@ class UserDirectoryServer(HomeServer):
globals={"hs": self}, globals={"hs": self},
) )
) )
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
else: else:
logger.warn("Unrecognized listener type: %s", listener["type"]) logger.warn("Unrecognized listener type: %s", listener["type"])

View file

@ -250,6 +250,9 @@ class ContentRepositoryConfig(Config):
# - '192.168.0.0/16' # - '192.168.0.0/16'
# - '100.64.0.0/10' # - '100.64.0.0/10'
# - '169.254.0.0/16' # - '169.254.0.0/16'
# - '::1/128'
# - 'fe80::/64'
# - 'fc00::/7'
# #
# List of IP address CIDR ranges that the URL preview spider is allowed # List of IP address CIDR ranges that the URL preview spider is allowed
# to access even if they are specified in url_preview_ip_range_blacklist. # to access even if they are specified in url_preview_ip_range_blacklist.

View file

@ -14,8 +14,12 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging
from ._base import Config, ConfigError from ._base import Config, ConfigError
logger = logging.Logger(__name__)
class ServerConfig(Config): class ServerConfig(Config):
@ -138,6 +142,12 @@ class ServerConfig(Config):
metrics_port = config.get("metrics_port") metrics_port = config.get("metrics_port")
if metrics_port: if metrics_port:
logger.warn(
("The metrics_port configuration option is deprecated in Synapse 0.31 "
"in favour of a listener. Please see "
"http://github.com/matrix-org/synapse/blob/master/docs/metrics-howto.rst"
" on how to configure the new listener."))
self.listeners.append({ self.listeners.append({
"port": metrics_port, "port": metrics_port,
"bind_addresses": [config.get("metrics_bind_host", "127.0.0.1")], "bind_addresses": [config.get("metrics_bind_host", "127.0.0.1")],

View file

@ -471,14 +471,14 @@ def _check_power_levels(event, auth_events):
] ]
old_list = current_state.content.get("users", {}) old_list = current_state.content.get("users", {})
for user in set(old_list.keys() + user_list.keys()): for user in set(list(old_list) + list(user_list)):
levels_to_check.append( levels_to_check.append(
(user, "users") (user, "users")
) )
old_list = current_state.content.get("events", {}) old_list = current_state.content.get("events", {})
new_list = event.content.get("events", {}) new_list = event.content.get("events", {})
for ev_id in set(old_list.keys() + new_list.keys()): for ev_id in set(list(old_list) + list(new_list)):
levels_to_check.append( levels_to_check.append(
(ev_id, "events") (ev_id, "events")
) )

View file

@ -146,7 +146,7 @@ class EventBase(object):
return field in self._event_dict return field in self._event_dict
def items(self): def items(self):
return self._event_dict.items() return list(self._event_dict.items())
class FrozenEvent(EventBase): class FrozenEvent(EventBase):

View file

@ -391,7 +391,7 @@ class FederationClient(FederationBase):
""" """
if return_local: if return_local:
seen_events = yield self.store.get_events(event_ids, allow_rejected=True) seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
signed_events = seen_events.values() signed_events = list(seen_events.values())
else: else:
seen_events = yield self.store.have_seen_events(event_ids) seen_events = yield self.store.have_seen_events(event_ids)
signed_events = [] signed_events = []
@ -589,7 +589,7 @@ class FederationClient(FederationBase):
} }
valid_pdus = yield self._check_sigs_and_hash_and_fetch( valid_pdus = yield self._check_sigs_and_hash_and_fetch(
destination, pdus.values(), destination, list(pdus.values()),
outlier=True, outlier=True,
) )

View file

@ -197,7 +197,7 @@ class FederationRemoteSendQueue(object):
# We only want to send presence for our own users, so lets always just # We only want to send presence for our own users, so lets always just
# filter here just in case. # filter here just in case.
local_states = filter(lambda s: self.is_mine_id(s.user_id), states) local_states = list(filter(lambda s: self.is_mine_id(s.user_id), states))
self.presence_map.update({state.user_id: state for state in local_states}) self.presence_map.update({state.user_id: state for state in local_states})
self.presence_changed[pos] = [state.user_id for state in local_states] self.presence_changed[pos] = [state.user_id for state in local_states]

View file

@ -35,6 +35,8 @@ from synapse.metrics import (
from prometheus_client import Counter from prometheus_client import Counter
from six import itervalues
import logging import logging
@ -234,7 +236,7 @@ class TransactionQueue(object):
yield logcontext.make_deferred_yieldable(defer.gatherResults( yield logcontext.make_deferred_yieldable(defer.gatherResults(
[ [
logcontext.run_in_background(handle_room_events, evs) logcontext.run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues() for evs in itervalues(events_by_room)
], ],
consumeErrors=True consumeErrors=True
)) ))
@ -325,7 +327,7 @@ class TransactionQueue(object):
if not states_map: if not states_map:
break break
yield self._process_presence_inner(states_map.values()) yield self._process_presence_inner(list(states_map.values()))
except Exception: except Exception:
logger.exception("Error sending presence states to servers") logger.exception("Error sending presence states to servers")
finally: finally:

View file

@ -114,14 +114,14 @@ class BaseHandler(object):
if guest_access != "can_join": if guest_access != "can_join":
if context: if context:
current_state = yield self.store.get_events( current_state = yield self.store.get_events(
context.current_state_ids.values() list(context.current_state_ids.values())
) )
else: else:
current_state = yield self.state_handler.get_current_state( current_state = yield self.state_handler.get_current_state(
event.room_id event.room_id
) )
current_state = current_state.values() current_state = list(current_state.values())
logger.info("maybe_kick_guest_users %r", current_state) logger.info("maybe_kick_guest_users %r", current_state)
yield self.kick_guest_users(current_state) yield self.kick_guest_users(current_state)

View file

@ -15,6 +15,8 @@
from twisted.internet import defer from twisted.internet import defer
from six import itervalues
import synapse import synapse
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -119,7 +121,7 @@ class ApplicationServicesHandler(object):
yield make_deferred_yieldable(defer.gatherResults([ yield make_deferred_yieldable(defer.gatherResults([
run_in_background(handle_room_events, evs) run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues() for evs in itervalues(events_by_room)
], consumeErrors=True)) ], consumeErrors=True))
yield self.store.set_appservice_last_pos(upper_bound) yield self.store.set_appservice_last_pos(upper_bound)

View file

@ -249,7 +249,7 @@ class AuthHandler(BaseHandler):
errordict = e.error_dict() errordict = e.error_dict()
for f in flows: for f in flows:
if len(set(f) - set(creds.keys())) == 0: if len(set(f) - set(creds)) == 0:
# it's very useful to know what args are stored, but this can # it's very useful to know what args are stored, but this can
# include the password in the case of registering, so only log # include the password in the case of registering, so only log
# the keys (confusingly, clientdict may contain a password # the keys (confusingly, clientdict may contain a password
@ -257,12 +257,12 @@ class AuthHandler(BaseHandler):
# and is not sensitive). # and is not sensitive).
logger.info( logger.info(
"Auth completed with creds: %r. Client dict has keys: %r", "Auth completed with creds: %r. Client dict has keys: %r",
creds, clientdict.keys() creds, list(clientdict)
) )
defer.returnValue((creds, clientdict, session['id'])) defer.returnValue((creds, clientdict, session['id']))
ret = self._auth_dict_for_flows(flows, session) ret = self._auth_dict_for_flows(flows, session)
ret['completed'] = creds.keys() ret['completed'] = list(creds)
ret.update(errordict) ret.update(errordict)
raise InteractiveAuthIncompleteError( raise InteractiveAuthIncompleteError(
ret, ret,

View file

@ -114,7 +114,7 @@ class DeviceHandler(BaseHandler):
user_id, device_id=None user_id, device_id=None
) )
devices = device_map.values() devices = list(device_map.values())
for device in devices: for device in devices:
_update_device_from_client_ips(device, ips) _update_device_from_client_ips(device, ips)
@ -187,7 +187,7 @@ class DeviceHandler(BaseHandler):
defer.Deferred: defer.Deferred:
""" """
device_map = yield self.store.get_devices_by_user(user_id) device_map = yield self.store.get_devices_by_user(user_id)
device_ids = device_map.keys() device_ids = list(device_map)
if except_device_id is not None: if except_device_id is not None:
device_ids = [d for d in device_ids if d != except_device_id] device_ids = [d for d in device_ids if d != except_device_id]
yield self.delete_devices(user_id, device_ids) yield self.delete_devices(user_id, device_ids)

View file

@ -52,7 +52,6 @@ from synapse.util.retryutils import NotRetryingDestination
from synapse.util.distributor import user_joined_room from synapse.util.distributor import user_joined_room
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -480,8 +479,8 @@ class FederationHandler(BaseHandler):
# to get all state ids that we're interested in. # to get all state ids that we're interested in.
event_map = yield self.store.get_events([ event_map = yield self.store.get_events([
e_id e_id
for key_to_eid in event_to_state_ids.itervalues() for key_to_eid in list(event_to_state_ids.values())
for key, e_id in key_to_eid.iteritems() for key, e_id in key_to_eid.items()
if key[0] != EventTypes.Member or check_match(key[1]) if key[0] != EventTypes.Member or check_match(key[1])
]) ])
@ -1149,13 +1148,13 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key) user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id) yield user_joined_room(self.distributor, user, event.room_id)
state_ids = context.prev_state_ids.values() state_ids = list(context.prev_state_ids.values())
auth_chain = yield self.store.get_auth_chain(state_ids) auth_chain = yield self.store.get_auth_chain(state_ids)
state = yield self.store.get_events(context.prev_state_ids.values()) state = yield self.store.get_events(list(context.prev_state_ids.values()))
defer.returnValue({ defer.returnValue({
"state": state.values(), "state": list(state.values()),
"auth_chain": auth_chain, "auth_chain": auth_chain,
}) })
@ -1405,7 +1404,7 @@ class FederationHandler(BaseHandler):
else: else:
del results[(event.type, event.state_key)] del results[(event.type, event.state_key)]
res = results.values() res = list(results.values())
for event in res: for event in res:
# We sign these again because there was a bug where we # We sign these again because there was a bug where we
# incorrectly signed things the first time round # incorrectly signed things the first time round
@ -1446,7 +1445,7 @@ class FederationHandler(BaseHandler):
else: else:
results.pop((event.type, event.state_key), None) results.pop((event.type, event.state_key), None)
defer.returnValue(results.values()) defer.returnValue(list(results.values()))
else: else:
defer.returnValue([]) defer.returnValue([])
@ -1915,7 +1914,7 @@ class FederationHandler(BaseHandler):
}) })
new_state = self.state_handler.resolve_events( new_state = self.state_handler.resolve_events(
[local_view.values(), remote_view.values()], [list(local_view.values()), list(remote_view.values())],
event event
) )

View file

@ -572,6 +572,9 @@ class EventCreationHandler(object):
u = yield self.store.get_user_by_id(user_id) u = yield self.store.get_user_by_id(user_id)
assert u is not None assert u is not None
if u["appservice_id"] is not None:
# users registered by an appservice are exempt
return
if u["consent_version"] == self.config.user_consent_version: if u["consent_version"] == self.config.user_consent_version:
return return

View file

@ -325,7 +325,7 @@ class PresenceHandler(object):
if to_notify: if to_notify:
notified_presence_counter.inc(len(to_notify)) notified_presence_counter.inc(len(to_notify))
yield self._persist_and_notify(to_notify.values()) yield self._persist_and_notify(list(to_notify.values()))
self.unpersisted_users_changes |= set(s.user_id for s in new_states) self.unpersisted_users_changes |= set(s.user_id for s in new_states)
self.unpersisted_users_changes -= set(to_notify.keys()) self.unpersisted_users_changes -= set(to_notify.keys())
@ -687,7 +687,7 @@ class PresenceHandler(object):
""" """
updates = yield self.current_state_for_users(target_user_ids) updates = yield self.current_state_for_users(target_user_ids)
updates = updates.values() updates = list(updates.values())
for user_id in set(target_user_ids) - set(u.user_id for u in updates): for user_id in set(target_user_ids) - set(u.user_id for u in updates):
updates.append(UserPresenceState.default(user_id)) updates.append(UserPresenceState.default(user_id))
@ -753,11 +753,11 @@ class PresenceHandler(object):
self._push_to_remotes([state]) self._push_to_remotes([state])
else: else:
user_ids = yield self.store.get_users_in_room(room_id) user_ids = yield self.store.get_users_in_room(room_id)
user_ids = filter(self.is_mine_id, user_ids) user_ids = list(filter(self.is_mine_id, user_ids))
states = yield self.current_state_for_users(user_ids) states = yield self.current_state_for_users(user_ids)
self._push_to_remotes(states.values()) self._push_to_remotes(list(states.values()))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None): def get_presence_list(self, observer_user, accepted=None):
@ -1051,7 +1051,7 @@ class PresenceEventSource(object):
updates = yield presence.current_state_for_users(user_ids_changed) updates = yield presence.current_state_for_users(user_ids_changed)
if include_offline: if include_offline:
defer.returnValue((updates.values(), max_token)) defer.returnValue((list(updates.values()), max_token))
else: else:
defer.returnValue(([ defer.returnValue(([
s for s in itervalues(updates) s for s in itervalues(updates)
@ -1112,7 +1112,7 @@ def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
if new_state: if new_state:
changes[state.user_id] = new_state changes[state.user_id] = new_state
return changes.values() return list(changes.values())
def handle_timeout(state, is_mine, syncing_user_ids, now): def handle_timeout(state, is_mine, syncing_user_ids, now):

View file

@ -455,7 +455,7 @@ class RoomContextHandler(BaseHandler):
state = yield self.store.get_state_for_events( state = yield self.store.get_state_for_events(
[last_event_id], None [last_event_id], None
) )
results["state"] = state[last_event_id].values() results["state"] = list(state[last_event_id].values())
results["start"] = now_token.copy_and_replace( results["start"] = now_token.copy_and_replace(
"room_key", results["start"] "room_key", results["start"]

View file

@ -15,6 +15,7 @@
from twisted.internet import defer from twisted.internet import defer
from six import iteritems
from six.moves import range from six.moves import range
from ._base import BaseHandler from ._base import BaseHandler
@ -307,7 +308,7 @@ class RoomListHandler(BaseHandler):
) )
event_map = yield self.store.get_events([ event_map = yield self.store.get_events([
event_id for key, event_id in current_state_ids.iteritems() event_id for key, event_id in iteritems(current_state_ids)
if key[0] in ( if key[0] in (
EventTypes.JoinRules, EventTypes.JoinRules,
EventTypes.Name, EventTypes.Name,

View file

@ -348,7 +348,7 @@ class SearchHandler(BaseHandler):
rooms = set(e.room_id for e in allowed_events) rooms = set(e.room_id for e in allowed_events)
for room_id in rooms: for room_id in rooms:
state = yield self.state_handler.get_current_state(room_id) state = yield self.state_handler.get_current_state(room_id)
state_results[room_id] = state.values() state_results[room_id] = list(state.values())
state_results.values() state_results.values()

View file

@ -453,6 +453,10 @@ class SyncHandler(object):
Returns: Returns:
A Deferred map from ((type, state_key)->Event) A Deferred map from ((type, state_key)->Event)
""" """
# FIXME this claims to get the state at a stream position, but
# get_recent_events_for_room operates by topo ordering. This therefore
# does not reliably give you the state at the given stream position.
# (https://github.com/matrix-org/synapse/issues/3305)
last_events, _ = yield self.store.get_recent_events_for_room( last_events, _ = yield self.store.get_recent_events_for_room(
room_id, end_token=stream_position.room_key, limit=1, room_id, end_token=stream_position.room_key, limit=1,
) )
@ -608,11 +612,11 @@ class SyncHandler(object):
state = {} state = {}
if state_ids: if state_ids:
state = yield self.store.get_events(state_ids.values()) state = yield self.store.get_events(list(state_ids.values()))
defer.returnValue({ defer.returnValue({
(e.type, e.state_key): e (e.type, e.state_key): e
for e in sync_config.filter_collection.filter_room_state(state.values()) for e in sync_config.filter_collection.filter_room_state(list(state.values()))
}) })
@defer.inlineCallbacks @defer.inlineCallbacks
@ -961,7 +965,7 @@ class SyncHandler(object):
presence.extend(states) presence.extend(states)
# Deduplicate the presence entries so that there's at most one per user # Deduplicate the presence entries so that there's at most one per user
presence = {p.user_id: p for p in presence}.values() presence = list({p.user_id: p for p in presence}.values())
presence = sync_config.filter_collection.filter_presence( presence = sync_config.filter_collection.filter_presence(
presence presence
@ -1113,7 +1117,13 @@ class SyncHandler(object):
Returns: Returns:
Deferred(tuple): Returns a tuple of the form: Deferred(tuple): Returns a tuple of the form:
`([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)` `(room_entries, invited_rooms, newly_joined_rooms, newly_left_rooms)`
where:
room_entries is a list [RoomSyncResultBuilder]
invited_rooms is a list [InvitedSyncResult]
newly_joined rooms is a list[str] of room ids
newly_left_rooms is a list[str] of room ids
""" """
user_id = sync_result_builder.sync_config.user.to_string() user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token since_token = sync_result_builder.since_token

View file

@ -39,7 +39,8 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
class RegistryProxy(object): class RegistryProxy(object):
def collect(self): @staticmethod
def collect():
for metric in REGISTRY.collect(): for metric in REGISTRY.collect():
if not metric.name.startswith("__"): if not metric.name.startswith("__"):
yield metric yield metric

View file

@ -13,4 +13,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from prometheus_client.twisted import MetricsResource
METRICS_PREFIX = "/_synapse/metrics" METRICS_PREFIX = "/_synapse/metrics"
__all__ = ["MetricsResource", "METRICS_PREFIX"]

View file

@ -39,7 +39,7 @@ def list_with_base_rules(rawrules):
rawrules = [r for r in rawrules if r['priority_class'] >= 0] rawrules = [r for r in rawrules if r['priority_class'] >= 0]
# shove the server default rules for each kind onto the end of each # shove the server default rules for each kind onto the end of each
current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1] current_prio_class = list(PRIORITY_CLASS_INVERSE_MAP)[-1]
ruleslist.extend(make_base_prepend_rules( ruleslist.extend(make_base_prepend_rules(
PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules

View file

@ -229,7 +229,8 @@ class Mailer(object):
if room_vars['notifs'] and 'messages' in room_vars['notifs'][-1]: if room_vars['notifs'] and 'messages' in room_vars['notifs'][-1]:
prev_messages = room_vars['notifs'][-1]['messages'] prev_messages = room_vars['notifs'][-1]['messages']
for message in notifvars['messages']: for message in notifvars['messages']:
pm = filter(lambda pm: pm['id'] == message['id'], prev_messages) pm = list(filter(lambda pm: pm['id'] == message['id'],
prev_messages))
if pm: if pm:
if not message["is_historical"]: if not message["is_historical"]:
pm[0]["is_historical"] = False pm[0]["is_historical"] = False

View file

@ -113,7 +113,7 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True
# so find out who is in the room that isn't the user. # so find out who is in the room that isn't the user.
if "m.room.member" in room_state_bytype_ids: if "m.room.member" in room_state_bytype_ids:
member_events = yield store.get_events( member_events = yield store.get_events(
room_state_bytype_ids["m.room.member"].values() list(room_state_bytype_ids["m.room.member"].values())
) )
all_members = [ all_members = [
ev for ev in member_events.values() ev for ev in member_events.values()

View file

@ -104,7 +104,7 @@ class HttpTransactionCache(object):
def _cleanup(self): def _cleanup(self):
now = self.clock.time_msec() now = self.clock.time_msec()
for key in self.transactions.keys(): for key in list(self.transactions):
ts = self.transactions[key][1] ts = self.transactions[key][1]
if now > (ts + CLEANUP_PERIOD_MS): # after cleanup period if now > (ts + CLEANUP_PERIOD_MS): # after cleanup period
del self.transactions[key] del self.transactions[key]

View file

@ -132,7 +132,8 @@ class StateHandler(object):
defer.returnValue(event) defer.returnValue(event)
return return
state_map = yield self.store.get_events(state.values(), get_prev_content=False) state_map = yield self.store.get_events(list(state.values()),
get_prev_content=False)
state = { state = {
key: state_map[e_id] for key, e_id in iteritems(state) if e_id in state_map key: state_map[e_id] for key, e_id in iteritems(state) if e_id in state_map
} }

View file

@ -40,6 +40,9 @@ import synapse.metrics
from synapse.events import EventBase # noqa: F401 from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401
from six.moves import range
from six import itervalues, iteritems
from prometheus_client import Counter from prometheus_client import Counter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -245,7 +248,7 @@ class EventsStore(EventsWorkerStore):
partitioned.setdefault(event.room_id, []).append((event, ctx)) partitioned.setdefault(event.room_id, []).append((event, ctx))
deferreds = [] deferreds = []
for room_id, evs_ctxs in partitioned.iteritems(): for room_id, evs_ctxs in iteritems(partitioned):
d = self._event_persist_queue.add_to_queue( d = self._event_persist_queue.add_to_queue(
room_id, evs_ctxs, room_id, evs_ctxs,
backfilled=backfilled, backfilled=backfilled,
@ -330,7 +333,7 @@ class EventsStore(EventsWorkerStore):
chunks = [ chunks = [
events_and_contexts[x:x + 100] events_and_contexts[x:x + 100]
for x in xrange(0, len(events_and_contexts), 100) for x in range(0, len(events_and_contexts), 100)
] ]
for chunk in chunks: for chunk in chunks:
@ -364,7 +367,7 @@ class EventsStore(EventsWorkerStore):
(event, context) (event, context)
) )
for room_id, ev_ctx_rm in events_by_room.iteritems(): for room_id, ev_ctx_rm in iteritems(events_by_room):
# Work out new extremities by recursively adding and removing # Work out new extremities by recursively adding and removing
# the new events. # the new events.
latest_event_ids = yield self.get_latest_event_ids_in_room( latest_event_ids = yield self.get_latest_event_ids_in_room(
@ -459,12 +462,12 @@ class EventsStore(EventsWorkerStore):
event_counter.labels(event.type, origin_type, origin_entity).inc() event_counter.labels(event.type, origin_type, origin_entity).inc()
for room_id, new_state in current_state_for_room.iteritems(): for room_id, new_state in iteritems(current_state_for_room):
self.get_current_state_ids.prefill( self.get_current_state_ids.prefill(
(room_id, ), new_state (room_id, ), new_state
) )
for room_id, latest_event_ids in new_forward_extremeties.iteritems(): for room_id, latest_event_ids in iteritems(new_forward_extremeties):
self.get_latest_event_ids_in_room.prefill( self.get_latest_event_ids_in_room.prefill(
(room_id,), list(latest_event_ids) (room_id,), list(latest_event_ids)
) )
@ -641,20 +644,20 @@ class EventsStore(EventsWorkerStore):
""" """
existing_state = yield self.get_current_state_ids(room_id) existing_state = yield self.get_current_state_ids(room_id)
existing_events = set(existing_state.itervalues()) existing_events = set(itervalues(existing_state))
new_events = set(ev_id for ev_id in current_state.itervalues()) new_events = set(ev_id for ev_id in itervalues(current_state))
changed_events = existing_events ^ new_events changed_events = existing_events ^ new_events
if not changed_events: if not changed_events:
return return
to_delete = { to_delete = {
key: ev_id for key, ev_id in existing_state.iteritems() key: ev_id for key, ev_id in iteritems(existing_state)
if ev_id in changed_events if ev_id in changed_events
} }
events_to_insert = (new_events - existing_events) events_to_insert = (new_events - existing_events)
to_insert = { to_insert = {
key: ev_id for key, ev_id in current_state.iteritems() key: ev_id for key, ev_id in iteritems(current_state)
if ev_id in events_to_insert if ev_id in events_to_insert
} }
@ -757,11 +760,11 @@ class EventsStore(EventsWorkerStore):
) )
def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order): def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
for room_id, current_state_tuple in state_delta_by_room.iteritems(): for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple to_delete, to_insert = current_state_tuple
txn.executemany( txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?", "DELETE FROM current_state_events WHERE event_id = ?",
[(ev_id,) for ev_id in to_delete.itervalues()], [(ev_id,) for ev_id in itervalues(to_delete)],
) )
self._simple_insert_many_txn( self._simple_insert_many_txn(
@ -774,7 +777,7 @@ class EventsStore(EventsWorkerStore):
"type": key[0], "type": key[0],
"state_key": key[1], "state_key": key[1],
} }
for key, ev_id in to_insert.iteritems() for key, ev_id in iteritems(to_insert)
], ],
) )
@ -793,7 +796,7 @@ class EventsStore(EventsWorkerStore):
"event_id": ev_id, "event_id": ev_id,
"prev_event_id": to_delete.get(key, None), "prev_event_id": to_delete.get(key, None),
} }
for key, ev_id in state_deltas.iteritems() for key, ev_id in iteritems(state_deltas)
] ]
) )
@ -836,7 +839,7 @@ class EventsStore(EventsWorkerStore):
def _update_forward_extremities_txn(self, txn, new_forward_extremities, def _update_forward_extremities_txn(self, txn, new_forward_extremities,
max_stream_order): max_stream_order):
for room_id, new_extrem in new_forward_extremities.iteritems(): for room_id, new_extrem in iteritems(new_forward_extremities):
self._simple_delete_txn( self._simple_delete_txn(
txn, txn,
table="event_forward_extremities", table="event_forward_extremities",
@ -854,7 +857,7 @@ class EventsStore(EventsWorkerStore):
"event_id": ev_id, "event_id": ev_id,
"room_id": room_id, "room_id": room_id,
} }
for room_id, new_extrem in new_forward_extremities.iteritems() for room_id, new_extrem in iteritems(new_forward_extremities)
for ev_id in new_extrem for ev_id in new_extrem
], ],
) )
@ -871,7 +874,7 @@ class EventsStore(EventsWorkerStore):
"event_id": event_id, "event_id": event_id,
"stream_ordering": max_stream_order, "stream_ordering": max_stream_order,
} }
for room_id, new_extrem in new_forward_extremities.iteritems() for room_id, new_extrem in iteritems(new_forward_extremities)
for event_id in new_extrem for event_id in new_extrem
] ]
) )
@ -899,7 +902,7 @@ class EventsStore(EventsWorkerStore):
new_events_and_contexts[event.event_id] = (event, context) new_events_and_contexts[event.event_id] = (event, context)
else: else:
new_events_and_contexts[event.event_id] = (event, context) new_events_and_contexts[event.event_id] = (event, context)
return new_events_and_contexts.values() return list(new_events_and_contexts.values())
def _update_room_depths_txn(self, txn, events_and_contexts, backfilled): def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
"""Update min_depth for each room """Update min_depth for each room
@ -925,7 +928,7 @@ class EventsStore(EventsWorkerStore):
event.depth, depth_updates.get(event.room_id, event.depth) event.depth, depth_updates.get(event.room_id, event.depth)
) )
for room_id, depth in depth_updates.iteritems(): for room_id, depth in iteritems(depth_updates):
self._update_min_depth_for_room_txn(txn, room_id, depth) self._update_min_depth_for_room_txn(txn, room_id, depth)
def _update_outliers_txn(self, txn, events_and_contexts): def _update_outliers_txn(self, txn, events_and_contexts):
@ -1309,7 +1312,7 @@ class EventsStore(EventsWorkerStore):
" WHERE e.event_id IN (%s)" " WHERE e.event_id IN (%s)"
) % (",".join(["?"] * len(ev_map)),) ) % (",".join(["?"] * len(ev_map)),)
txn.execute(sql, ev_map.keys()) txn.execute(sql, list(ev_map))
rows = self.cursor_to_dict(txn) rows = self.cursor_to_dict(txn)
for row in rows: for row in rows:
event = ev_map[row["event_id"]] event = ev_map[row["event_id"]]
@ -1572,7 +1575,7 @@ class EventsStore(EventsWorkerStore):
chunks = [ chunks = [
event_ids[i:i + 100] event_ids[i:i + 100]
for i in xrange(0, len(event_ids), 100) for i in range(0, len(event_ids), 100)
] ]
for chunk in chunks: for chunk in chunks:
ev_rows = self._simple_select_many_txn( ev_rows = self._simple_select_many_txn(
@ -1986,7 +1989,7 @@ class EventsStore(EventsWorkerStore):
logger.info("[purge] finding state groups which depend on redundant" logger.info("[purge] finding state groups which depend on redundant"
" state groups") " state groups")
remaining_state_groups = [] remaining_state_groups = []
for i in xrange(0, len(state_rows), 100): for i in range(0, len(state_rows), 100):
chunk = [sg for sg, in state_rows[i:i + 100]] chunk = [sg for sg, in state_rows[i:i + 100]]
# look for state groups whose prev_state_group is one we are about # look for state groups whose prev_state_group is one we are about
# to delete # to delete
@ -2042,7 +2045,7 @@ class EventsStore(EventsWorkerStore):
"state_key": key[1], "state_key": key[1],
"event_id": state_id, "event_id": state_id,
} }
for key, state_id in curr_state.iteritems() for key, state_id in iteritems(curr_state)
], ],
) )

View file

@ -17,6 +17,7 @@ from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.descriptors import cachedInlineCallbacks
from twisted.internet import defer from twisted.internet import defer
import six
import OpenSSL import OpenSSL
from signedjson.key import decode_verify_key_bytes from signedjson.key import decode_verify_key_bytes
@ -26,6 +27,13 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
db_binary_type = buffer
else:
db_binary_type = memoryview
class KeyStore(SQLBaseStore): class KeyStore(SQLBaseStore):
"""Persistence for signature verification keys and tls X.509 certificates """Persistence for signature verification keys and tls X.509 certificates
@ -72,7 +80,7 @@ class KeyStore(SQLBaseStore):
values={ values={
"from_server": from_server, "from_server": from_server,
"ts_added_ms": time_now_ms, "ts_added_ms": time_now_ms,
"tls_certificate": buffer(tls_certificate_bytes), "tls_certificate": db_binary_type(tls_certificate_bytes),
}, },
desc="store_server_certificate", desc="store_server_certificate",
) )
@ -135,7 +143,7 @@ class KeyStore(SQLBaseStore):
values={ values={
"from_server": from_server, "from_server": from_server,
"ts_added_ms": time_now_ms, "ts_added_ms": time_now_ms,
"verify_key": buffer(verify_key.encode()), "verify_key": db_binary_type(verify_key.encode()),
}, },
) )
txn.call_after( txn.call_after(
@ -172,7 +180,7 @@ class KeyStore(SQLBaseStore):
"from_server": from_server, "from_server": from_server,
"ts_added_ms": ts_now_ms, "ts_added_ms": ts_now_ms,
"ts_valid_until_ms": ts_expires_ms, "ts_valid_until_ms": ts_expires_ms,
"key_json": buffer(key_json_bytes), "key_json": db_binary_type(key_json_bytes),
}, },
desc="store_server_keys_json", desc="store_server_keys_json",
) )

View file

@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database # Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts. # schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 49 SCHEMA_VERSION = 50
dir_path = os.path.abspath(os.path.dirname(__file__)) dir_path = os.path.abspath(os.path.dirname(__file__))

View file

@ -16,6 +16,7 @@
from ._base import SQLBaseStore from ._base import SQLBaseStore
from synapse.api.constants import PresenceState from synapse.api.constants import PresenceState
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from synapse.util import batch_iter
from collections import namedtuple from collections import namedtuple
from twisted.internet import defer from twisted.internet import defer
@ -115,11 +116,7 @@ class PresenceStore(SQLBaseStore):
" AND user_id IN (%s)" " AND user_id IN (%s)"
) )
batches = ( for states in batch_iter(presence_states, 50):
presence_states[i:i + 50]
for i in xrange(0, len(presence_states), 50)
)
for states in batches:
args = [stream_id] args = [stream_id]
args.extend(s.user_id for s in states) args.extend(s.user_id for s in states)
txn.execute( txn.execute(

View file

@ -332,6 +332,35 @@ class ReceiptsStore(ReceiptsWorkerStore):
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
user_id, event_id, data, stream_id): user_id, event_id, data, stream_id):
res = self._simple_select_one_txn(
txn,
table="events",
retcols=["topological_ordering", "stream_ordering"],
keyvalues={"event_id": event_id},
allow_none=True
)
stream_ordering = int(res["stream_ordering"]) if res else None
# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
if stream_ordering is not None:
sql = (
"SELECT stream_ordering, event_id FROM events"
" INNER JOIN receipts_linearized as r USING (event_id, room_id)"
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
)
txn.execute(sql, (room_id, receipt_type, user_id))
for so, eid in txn:
if int(so) >= stream_ordering:
logger.debug(
"Ignoring new receipt for %s in favour of existing "
"one for later event %s",
event_id, eid,
)
return False
txn.call_after( txn.call_after(
self.get_receipts_for_room.invalidate, (room_id, receipt_type) self.get_receipts_for_room.invalidate, (room_id, receipt_type)
) )
@ -355,34 +384,6 @@ class ReceiptsStore(ReceiptsWorkerStore):
(user_id, room_id, receipt_type) (user_id, room_id, receipt_type)
) )
res = self._simple_select_one_txn(
txn,
table="events",
retcols=["topological_ordering", "stream_ordering"],
keyvalues={"event_id": event_id},
allow_none=True
)
topological_ordering = int(res["topological_ordering"]) if res else None
stream_ordering = int(res["stream_ordering"]) if res else None
# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
sql = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" INNER JOIN receipts_linearized as r USING (event_id, room_id)"
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
)
txn.execute(sql, (room_id, receipt_type, user_id))
if topological_ordering:
for to, so, _ in txn:
if int(to) > topological_ordering:
return False
elif int(to) == topological_ordering and int(so) >= stream_ordering:
return False
self._simple_delete_txn( self._simple_delete_txn(
txn, txn,
table="receipts_linearized", table="receipts_linearized",
@ -406,7 +407,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
} }
) )
if receipt_type == "m.read" and topological_ordering: if receipt_type == "m.read" and stream_ordering is not None:
self._remove_old_push_actions_before_txn( self._remove_old_push_actions_before_txn(
txn, txn,
room_id=room_id, room_id=room_id,

View file

@ -36,6 +36,7 @@ class RegistrationWorkerStore(SQLBaseStore):
retcols=[ retcols=[
"name", "password_hash", "is_guest", "name", "password_hash", "is_guest",
"consent_version", "consent_server_notice_sent", "consent_version", "consent_server_notice_sent",
"appservice_id",
], ],
allow_none=True, allow_none=True,
desc="get_user_by_id", desc="get_user_by_id",
@ -101,6 +102,13 @@ class RegistrationStore(RegistrationWorkerStore,
columns=["user_id", "device_id"], columns=["user_id", "device_id"],
) )
self.register_background_index_update(
"users_creation_ts",
index_name="users_creation_ts",
table="users",
columns=["creation_ts"],
)
# we no longer use refresh tokens, but it's possible that some people # we no longer use refresh tokens, but it's possible that some people
# might have a background update queued to build this index. Just # might have a background update queued to build this index. Just
# clear the background update. # clear the background update.
@ -485,6 +493,35 @@ class RegistrationStore(RegistrationWorkerStore,
ret = yield self.runInteraction("count_users", _count_users) ret = yield self.runInteraction("count_users", _count_users)
defer.returnValue(ret) defer.returnValue(ret)
def count_daily_user_type(self):
"""
Counts 1) native non guest users
2) native guests users
3) bridged users
who registered on the homeserver in the past 24 hours
"""
def _count_daily_user_type(txn):
yesterday = int(self._clock.time()) - (60 * 60 * 24)
sql = """
SELECT user_type, COALESCE(count(*), 0) AS count FROM (
SELECT
CASE
WHEN is_guest=0 AND appservice_id IS NULL THEN 'native'
WHEN is_guest=1 AND appservice_id IS NULL THEN 'guest'
WHEN is_guest=0 AND appservice_id IS NOT NULL THEN 'bridged'
END AS user_type
FROM users
WHERE creation_ts > ?
) AS t GROUP BY user_type
"""
results = {'native': 0, 'guest': 0, 'bridged': 0}
txn.execute(sql, (yesterday,))
for row in txn:
results[row[0]] = row[1]
return results
return self.runInteraction("count_daily_user_type", _count_daily_user_type)
@defer.inlineCallbacks @defer.inlineCallbacks
def count_nonbridged_users(self): def count_nonbridged_users(self):
def _count_users(txn): def _count_users(txn):

View file

@ -0,0 +1,19 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
INSERT into background_updates (update_name, progress_json)
VALUES ('users_creation_ts', '{}');

View file

@ -18,13 +18,14 @@ import logging
import re import re
import simplejson as json import simplejson as json
from six import string_types
from twisted.internet import defer from twisted.internet import defer
from .background_updates import BackgroundUpdateStore from .background_updates import BackgroundUpdateStore
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.engines import PostgresEngine, Sqlite3Engine
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
SearchEntry = namedtuple('SearchEntry', [ SearchEntry = namedtuple('SearchEntry', [
@ -126,7 +127,7 @@ class SearchStore(BackgroundUpdateStore):
# skip over it. # skip over it.
continue continue
if not isinstance(value, basestring): if not isinstance(value, string_types):
# If the event body, name or topic isn't a string # If the event body, name or topic isn't a string
# then skip over it # then skip over it
continue continue
@ -447,7 +448,7 @@ class SearchStore(BackgroundUpdateStore):
"search_msgs", self.cursor_to_dict, sql, *args "search_msgs", self.cursor_to_dict, sql, *args
) )
results = filter(lambda row: row["room_id"] in room_ids, results) results = list(filter(lambda row: row["room_id"] in room_ids, results))
events = yield self._get_events([r["event_id"] for r in results]) events = yield self._get_events([r["event_id"] for r in results])
@ -602,7 +603,7 @@ class SearchStore(BackgroundUpdateStore):
"search_rooms", self.cursor_to_dict, sql, *args "search_rooms", self.cursor_to_dict, sql, *args
) )
results = filter(lambda row: row["room_id"] in room_ids, results) results = list(filter(lambda row: row["room_id"] in room_ids, results))
events = yield self._get_events([r["event_id"] for r in results]) events = yield self._get_events([r["event_id"] for r in results])

View file

@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
from twisted.internet import defer from twisted.internet import defer
import six
from ._base import SQLBaseStore from ._base import SQLBaseStore
@ -21,6 +22,13 @@ from unpaddedbase64 import encode_base64
from synapse.crypto.event_signing import compute_event_reference_hash from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.descriptors import cached, cachedList
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
db_binary_type = buffer
else:
db_binary_type = memoryview
class SignatureWorkerStore(SQLBaseStore): class SignatureWorkerStore(SQLBaseStore):
@cached() @cached()
@ -56,7 +64,7 @@ class SignatureWorkerStore(SQLBaseStore):
for e_id, h in hashes.items() for e_id, h in hashes.items()
} }
defer.returnValue(hashes.items()) defer.returnValue(list(hashes.items()))
def _get_event_reference_hashes_txn(self, txn, event_id): def _get_event_reference_hashes_txn(self, txn, event_id):
"""Get all the hashes for a given PDU. """Get all the hashes for a given PDU.
@ -91,7 +99,7 @@ class SignatureStore(SignatureWorkerStore):
vals.append({ vals.append({
"event_id": event.event_id, "event_id": event.event_id,
"algorithm": ref_alg, "algorithm": ref_alg,
"hash": buffer(ref_hash_bytes), "hash": db_binary_type(ref_hash_bytes),
}) })
self._simple_insert_many_txn( self._simple_insert_many_txn(

View file

@ -16,6 +16,9 @@
from collections import namedtuple from collections import namedtuple
import logging import logging
from six import iteritems, itervalues
from six.moves import range
from twisted.internet import defer from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.background_updates import BackgroundUpdateStore
@ -134,7 +137,7 @@ class StateGroupWorkerStore(SQLBaseStore):
event_ids, event_ids,
) )
groups = set(event_to_groups.itervalues()) groups = set(itervalues(event_to_groups))
group_to_state = yield self._get_state_for_groups(groups) group_to_state = yield self._get_state_for_groups(groups)
defer.returnValue(group_to_state) defer.returnValue(group_to_state)
@ -166,18 +169,18 @@ class StateGroupWorkerStore(SQLBaseStore):
state_event_map = yield self.get_events( state_event_map = yield self.get_events(
[ [
ev_id for group_ids in group_to_ids.itervalues() ev_id for group_ids in itervalues(group_to_ids)
for ev_id in group_ids.itervalues() for ev_id in itervalues(group_ids)
], ],
get_prev_content=False get_prev_content=False
) )
defer.returnValue({ defer.returnValue({
group: [ group: [
state_event_map[v] for v in event_id_map.itervalues() state_event_map[v] for v in itervalues(event_id_map)
if v in state_event_map if v in state_event_map
] ]
for group, event_id_map in group_to_ids.iteritems() for group, event_id_map in iteritems(group_to_ids)
}) })
@defer.inlineCallbacks @defer.inlineCallbacks
@ -198,7 +201,7 @@ class StateGroupWorkerStore(SQLBaseStore):
""" """
results = {} results = {}
chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)] chunks = [groups[i:i + 100] for i in range(0, len(groups), 100)]
for chunk in chunks: for chunk in chunks:
res = yield self.runInteraction( res = yield self.runInteraction(
"_get_state_groups_from_groups", "_get_state_groups_from_groups",
@ -390,21 +393,21 @@ class StateGroupWorkerStore(SQLBaseStore):
event_ids, event_ids,
) )
groups = set(event_to_groups.itervalues()) groups = set(itervalues(event_to_groups))
group_to_state = yield self._get_state_for_groups(groups, types) group_to_state = yield self._get_state_for_groups(groups, types)
state_event_map = yield self.get_events( state_event_map = yield self.get_events(
[ev_id for sd in group_to_state.itervalues() for ev_id in sd.itervalues()], [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
get_prev_content=False get_prev_content=False
) )
event_to_state = { event_to_state = {
event_id: { event_id: {
k: state_event_map[v] k: state_event_map[v]
for k, v in group_to_state[group].iteritems() for k, v in iteritems(group_to_state[group])
if v in state_event_map if v in state_event_map
} }
for event_id, group in event_to_groups.iteritems() for event_id, group in iteritems(event_to_groups)
} }
defer.returnValue({event: event_to_state[event] for event in event_ids}) defer.returnValue({event: event_to_state[event] for event in event_ids})
@ -429,12 +432,12 @@ class StateGroupWorkerStore(SQLBaseStore):
event_ids, event_ids,
) )
groups = set(event_to_groups.itervalues()) groups = set(itervalues(event_to_groups))
group_to_state = yield self._get_state_for_groups(groups, types) group_to_state = yield self._get_state_for_groups(groups, types)
event_to_state = { event_to_state = {
event_id: group_to_state[group] event_id: group_to_state[group]
for event_id, group in event_to_groups.iteritems() for event_id, group in iteritems(event_to_groups)
} }
defer.returnValue({event: event_to_state[event] for event in event_ids}) defer.returnValue({event: event_to_state[event] for event in event_ids})
@ -562,7 +565,7 @@ class StateGroupWorkerStore(SQLBaseStore):
got_all = is_all or not missing_types got_all = is_all or not missing_types
return { return {
k: v for k, v in state_dict_ids.iteritems() k: v for k, v in iteritems(state_dict_ids)
if include(k[0], k[1]) if include(k[0], k[1])
}, missing_types, got_all }, missing_types, got_all
@ -630,12 +633,12 @@ class StateGroupWorkerStore(SQLBaseStore):
# Now we want to update the cache with all the things we fetched # Now we want to update the cache with all the things we fetched
# from the database. # from the database.
for group, group_state_dict in group_to_state_dict.iteritems(): for group, group_state_dict in iteritems(group_to_state_dict):
state_dict = results[group] state_dict = results[group]
state_dict.update( state_dict.update(
((intern_string(k[0]), intern_string(k[1])), to_ascii(v)) ((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
for k, v in group_state_dict.iteritems() for k, v in iteritems(group_state_dict)
) )
self._state_group_cache.update( self._state_group_cache.update(
@ -722,7 +725,7 @@ class StateGroupWorkerStore(SQLBaseStore):
"state_key": key[1], "state_key": key[1],
"event_id": state_id, "event_id": state_id,
} }
for key, state_id in delta_ids.iteritems() for key, state_id in iteritems(delta_ids)
], ],
) )
else: else:
@ -737,7 +740,7 @@ class StateGroupWorkerStore(SQLBaseStore):
"state_key": key[1], "state_key": key[1],
"event_id": state_id, "event_id": state_id,
} }
for key, state_id in current_state_ids.iteritems() for key, state_id in iteritems(current_state_ids)
], ],
) )
@ -862,11 +865,11 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
"state_group": state_group_id, "state_group": state_group_id,
"event_id": event_id, "event_id": event_id,
} }
for event_id, state_group_id in state_groups.iteritems() for event_id, state_group_id in iteritems(state_groups)
], ],
) )
for event_id, state_group_id in state_groups.iteritems(): for event_id, state_group_id in iteritems(state_groups):
txn.call_after( txn.call_after(
self._get_state_group_for_event.prefill, self._get_state_group_for_event.prefill,
(event_id,), state_group_id (event_id,), state_group_id
@ -894,7 +897,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
def reindex_txn(txn): def reindex_txn(txn):
new_last_state_group = last_state_group new_last_state_group = last_state_group
for count in xrange(batch_size): for count in range(batch_size):
txn.execute( txn.execute(
"SELECT id, room_id FROM state_groups" "SELECT id, room_id FROM state_groups"
" WHERE ? < id AND id <= ?" " WHERE ? < id AND id <= ?"
@ -952,7 +955,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
# of keys # of keys
delta_state = { delta_state = {
key: value for key, value in curr_state.iteritems() key: value for key, value in iteritems(curr_state)
if prev_state.get(key, None) != value if prev_state.get(key, None) != value
} }
@ -992,7 +995,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
"state_key": key[1], "state_key": key[1],
"event_id": state_id, "event_id": state_id,
} }
for key, state_id in delta_state.iteritems() for key, state_id in iteritems(delta_state)
], ],
) )

View file

@ -17,6 +17,7 @@ from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached from synapse.util.caches.descriptors import cached
from twisted.internet import defer from twisted.internet import defer
import six
from canonicaljson import encode_canonical_json from canonicaljson import encode_canonical_json
@ -25,6 +26,13 @@ from collections import namedtuple
import logging import logging
import simplejson as json import simplejson as json
# py2 sqlite has buffer hardcoded as only binary type, so we must use it,
# despite being deprecated and removed in favor of memoryview
if six.PY2:
db_binary_type = buffer
else:
db_binary_type = memoryview
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -110,7 +118,7 @@ class TransactionStore(SQLBaseStore):
"transaction_id": transaction_id, "transaction_id": transaction_id,
"origin": origin, "origin": origin,
"response_code": code, "response_code": code,
"response_json": buffer(encode_canonical_json(response_dict)), "response_json": db_binary_type(encode_canonical_json(response_dict)),
"ts": self._clock.time_msec(), "ts": self._clock.time_msec(),
}, },
or_ignore=True, or_ignore=True,

View file

@ -22,6 +22,8 @@ from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import get_domain_from_id, get_localpart_from_id from synapse.types import get_domain_from_id, get_localpart_from_id
from six import iteritems
import re import re
import logging import logging
@ -100,7 +102,7 @@ class UserDirectoryStore(SQLBaseStore):
user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id), user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id),
profile.display_name, profile.display_name,
) )
for user_id, profile in users_with_profile.iteritems() for user_id, profile in iteritems(users_with_profile)
) )
elif isinstance(self.database_engine, Sqlite3Engine): elif isinstance(self.database_engine, Sqlite3Engine):
sql = """ sql = """
@ -112,7 +114,7 @@ class UserDirectoryStore(SQLBaseStore):
user_id, user_id,
"%s %s" % (user_id, p.display_name,) if p.display_name else user_id "%s %s" % (user_id, p.display_name,) if p.display_name else user_id
) )
for user_id, p in users_with_profile.iteritems() for user_id, p in iteritems(users_with_profile)
) )
else: else:
# This should be unreachable. # This should be unreachable.
@ -130,7 +132,7 @@ class UserDirectoryStore(SQLBaseStore):
"display_name": profile.display_name, "display_name": profile.display_name,
"avatar_url": profile.avatar_url, "avatar_url": profile.avatar_url,
} }
for user_id, profile in users_with_profile.iteritems() for user_id, profile in iteritems(users_with_profile)
] ]
) )
for user_id in users_with_profile: for user_id in users_with_profile:

View file

@ -31,6 +31,9 @@ import functools
import inspect import inspect
import threading import threading
from six import string_types, itervalues
import six
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -205,7 +208,7 @@ class Cache(object):
def invalidate_all(self): def invalidate_all(self):
self.check_thread() self.check_thread()
self.cache.clear() self.cache.clear()
for entry in self._pending_deferred_cache.itervalues(): for entry in itervalues(self._pending_deferred_cache):
entry.invalidate() entry.invalidate()
self._pending_deferred_cache.clear() self._pending_deferred_cache.clear()
@ -392,9 +395,10 @@ class CacheDescriptor(_CacheDescriptorBase):
ret.addErrback(onErr) ret.addErrback(onErr)
# If our cache_key is a string, try to convert to ascii to save # If our cache_key is a string on py2, try to convert to ascii
# a bit of space in large caches # to save a bit of space in large caches. Py3 does this
if isinstance(cache_key, basestring): # internally automatically.
if six.PY2 and isinstance(cache_key, string_types):
cache_key = to_ascii(cache_key) cache_key = to_ascii(cache_key)
result_d = ObservableDeferred(ret, consumeErrors=True) result_d = ObservableDeferred(ret, consumeErrors=True)
@ -565,7 +569,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
return results return results
return logcontext.make_deferred_yieldable(defer.gatherResults( return logcontext.make_deferred_yieldable(defer.gatherResults(
cached_defers.values(), list(cached_defers.values()),
consumeErrors=True, consumeErrors=True,
).addCallback(update_results_dict).addErrback( ).addCallback(update_results_dict).addErrback(
unwrapFirstError unwrapFirstError

View file

@ -1,3 +1,5 @@
from six import itervalues
SENTINEL = object() SENTINEL = object()
@ -49,7 +51,7 @@ class TreeCache(object):
if popped is SENTINEL: if popped is SENTINEL:
return default return default
node_and_keys = zip(nodes, key) node_and_keys = list(zip(nodes, key))
node_and_keys.reverse() node_and_keys.reverse()
node_and_keys.append((self.root, None)) node_and_keys.append((self.root, None))
@ -76,7 +78,7 @@ def iterate_tree_cache_entry(d):
can contain dicts. can contain dicts.
""" """
if isinstance(d, dict): if isinstance(d, dict):
for value_d in d.itervalues(): for value_d in itervalues(d):
for value in iterate_tree_cache_entry(value_d): for value in iterate_tree_cache_entry(value_d):
yield value yield value
else: else:

View file

@ -16,16 +16,17 @@
from frozendict import frozendict from frozendict import frozendict
import simplejson as json import simplejson as json
from six import string_types
def freeze(o): def freeze(o):
t = type(o) if isinstance(o, dict):
if t is dict:
return frozendict({k: freeze(v) for k, v in o.items()}) return frozendict({k: freeze(v) for k, v in o.items()})
if t is frozendict: if isinstance(o, frozendict):
return o return o
if t is str or t is unicode: if isinstance(o, string_types):
return o return o
try: try:
@ -37,11 +38,10 @@ def freeze(o):
def unfreeze(o): def unfreeze(o):
t = type(o) if isinstance(o, (dict, frozendict)):
if t is dict or t is frozendict:
return dict({k: unfreeze(v) for k, v in o.items()}) return dict({k: unfreeze(v) for k, v in o.items()})
if t is str or t is unicode: if isinstance(o, string_types):
return o return o
try: try:

View file

@ -49,6 +49,7 @@ class RegistrationStoreTestCase(unittest.TestCase):
"is_guest": 0, "is_guest": 0,
"consent_version": None, "consent_version": None,
"consent_server_notice_sent": None, "consent_server_notice_sent": None,
"appservice_id": None,
}, },
(yield self.store.get_user_by_id(self.user_id)) (yield self.store.get_user_by_id(self.user_id))
) )

View file

@ -33,7 +33,7 @@ class WheelTimerTestCase(unittest.TestCase):
self.assertListEqual(wheel.fetch(156), [obj]) self.assertListEqual(wheel.fetch(156), [obj])
self.assertListEqual(wheel.fetch(170), []) self.assertListEqual(wheel.fetch(170), [])
def test_mutli_insert(self): def test_multi_insert(self):
wheel = WheelTimer(bucket_size=5) wheel = WheelTimer(bucket_size=5)
obj1 = object() obj1 = object()
@ -58,7 +58,7 @@ class WheelTimerTestCase(unittest.TestCase):
wheel.insert(100, obj, 50) wheel.insert(100, obj, 50)
self.assertListEqual(wheel.fetch(120), [obj]) self.assertListEqual(wheel.fetch(120), [obj])
def test_insert_past_mutli(self): def test_insert_past_multi(self):
wheel = WheelTimer(bucket_size=5) wheel = WheelTimer(bucket_size=5)
obj1 = object() obj1 = object()

29
tox.ini
View file

@ -51,7 +51,34 @@ usedevelop=true
commands = commands =
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete /usr/bin/find "{toxinidir}" -name '*.pyc' -delete
coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \ coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \
"{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests/config} \ "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests/config \
tests/appservice/test_scheduler.py \
tests/handlers/test_auth.py \
tests/handlers/test_presence.py \
tests/handlers/test_register.py \
tests/storage/test_appservice.py \
tests/storage/test_base.py \
tests/storage/test_client_ips.py \
tests/storage/test_devices.py \
tests/storage/test_end_to_end_keys.py \
tests/storage/test_event_push_actions.py \
tests/storage/test_profile.py \
tests/storage/test_room.py \
tests/test_distributor.py \
tests/test_dns.py \
tests/test_preview.py \
tests/test_test_utils.py \
tests/test_types.py \
tests/util/test_dict_cache.py \
tests/util/test_expiring_cache.py \
tests/util/test_file_consumer.py \
tests/util/test_limiter.py \
tests/util/test_linearizer.py \
tests/util/test_logcontext.py \
tests/util/test_logformatter.py \
tests/util/test_rwlock.py \
tests/util/test_snapshot_cache.py \
tests/util/test_wheel_timer.py} \
{env:TOXSUFFIX:} {env:TOXSUFFIX:}
{env:DUMP_COVERAGE_COMMAND:coverage report -m} {env:DUMP_COVERAGE_COMMAND:coverage report -m}