Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
This commit is contained in:
commit
ebb3cc4ab6
1
changelog.d/3822.misc
Normal file
1
changelog.d/3822.misc
Normal file
|
@ -0,0 +1 @@
|
|||
crypto/ is now ported to Python 3.
|
1
changelog.d/3827.misc
Normal file
1
changelog.d/3827.misc
Normal file
|
@ -0,0 +1 @@
|
|||
speed up lazy loading by 2-3x
|
1
changelog.d/3840.misc
Normal file
1
changelog.d/3840.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Disable lazy loading for incremental syncs for now
|
1
changelog.d/3845.bugfix
Normal file
1
changelog.d/3845.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix outbound requests occasionally wedging, which can result in federation breaking between servers.
|
|
@ -123,6 +123,6 @@ class ClientTLSOptionsFactory(object):
|
|||
|
||||
def get_options(self, host):
|
||||
return ClientTLSOptions(
|
||||
host.decode('utf-8'),
|
||||
host,
|
||||
CertificateOptions(verify=False).getContext()
|
||||
)
|
||||
|
|
|
@ -50,7 +50,7 @@ def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
|
|||
defer.returnValue((server_response, server_certificate))
|
||||
except SynapseKeyClientError as e:
|
||||
logger.warn("Error getting key for %r: %s", server_name, e)
|
||||
if e.status.startswith("4"):
|
||||
if e.status.startswith(b"4"):
|
||||
# Don't retry for 4xx responses.
|
||||
raise IOError("Cannot get key for %r" % server_name)
|
||||
except (ConnectError, DomainError) as e:
|
||||
|
@ -82,6 +82,12 @@ class SynapseKeyClientProtocol(HTTPClient):
|
|||
self._peer = self.transport.getPeer()
|
||||
logger.debug("Connected to %s", self._peer)
|
||||
|
||||
if not isinstance(self.path, bytes):
|
||||
self.path = self.path.encode('ascii')
|
||||
|
||||
if not isinstance(self.host, bytes):
|
||||
self.host = self.host.encode('ascii')
|
||||
|
||||
self.sendCommand(b"GET", self.path)
|
||||
if self.host:
|
||||
self.sendHeader(b"Host", self.host)
|
||||
|
|
|
@ -16,9 +16,10 @@
|
|||
|
||||
import hashlib
|
||||
import logging
|
||||
import urllib
|
||||
from collections import namedtuple
|
||||
|
||||
from six.moves import urllib
|
||||
|
||||
from signedjson.key import (
|
||||
decode_verify_key_bytes,
|
||||
encode_verify_key_base64,
|
||||
|
@ -432,7 +433,7 @@ class Keyring(object):
|
|||
# an incoming request.
|
||||
query_response = yield self.client.post_json(
|
||||
destination=perspective_name,
|
||||
path=b"/_matrix/key/v2/query",
|
||||
path="/_matrix/key/v2/query",
|
||||
data={
|
||||
u"server_keys": {
|
||||
server_name: {
|
||||
|
@ -513,8 +514,8 @@ class Keyring(object):
|
|||
|
||||
(response, tls_certificate) = yield fetch_server_key(
|
||||
server_name, self.hs.tls_client_options_factory,
|
||||
path=(b"/_matrix/key/v2/server/%s" % (
|
||||
urllib.quote(requested_key_id),
|
||||
path=("/_matrix/key/v2/server/%s" % (
|
||||
urllib.parse.quote(requested_key_id),
|
||||
)).encode("ascii"),
|
||||
)
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ from twisted.internet import defer
|
|||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.push.clientformat import format_push_rules_for_user
|
||||
from synapse.storage.roommember import MemberSummary
|
||||
from synapse.types import RoomStreamToken
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
|
@ -529,6 +530,8 @@ class SyncHandler(object):
|
|||
A deferred dict describing the room summary
|
||||
"""
|
||||
|
||||
# FIXME: we could/should get this from room_stats when matthew/stats lands
|
||||
|
||||
# FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
|
||||
last_events, _ = yield self.store.get_recent_event_ids_for_room(
|
||||
room_id, end_token=now_token.room_key, limit=1,
|
||||
|
@ -541,44 +544,54 @@ class SyncHandler(object):
|
|||
last_event = last_events[-1]
|
||||
state_ids = yield self.store.get_state_ids_for_event(
|
||||
last_event.event_id, [
|
||||
(EventTypes.Member, None),
|
||||
(EventTypes.Name, ''),
|
||||
(EventTypes.CanonicalAlias, ''),
|
||||
]
|
||||
)
|
||||
|
||||
member_ids = {
|
||||
state_key: event_id
|
||||
for (t, state_key), event_id in iteritems(state_ids)
|
||||
if t == EventTypes.Member
|
||||
}
|
||||
# this is heavily cached, thus: fast.
|
||||
details = yield self.store.get_room_summary(room_id)
|
||||
|
||||
name_id = state_ids.get((EventTypes.Name, ''))
|
||||
canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
|
||||
|
||||
summary = {}
|
||||
|
||||
# FIXME: it feels very heavy to load up every single membership event
|
||||
# just to calculate the counts.
|
||||
member_events = yield self.store.get_events(member_ids.values())
|
||||
|
||||
joined_user_ids = []
|
||||
invited_user_ids = []
|
||||
|
||||
for ev in member_events.values():
|
||||
if ev.content.get("membership") == Membership.JOIN:
|
||||
joined_user_ids.append(ev.state_key)
|
||||
elif ev.content.get("membership") == Membership.INVITE:
|
||||
invited_user_ids.append(ev.state_key)
|
||||
empty_ms = MemberSummary([], 0)
|
||||
|
||||
# TODO: only send these when they change.
|
||||
summary["m.joined_member_count"] = len(joined_user_ids)
|
||||
summary["m.invited_member_count"] = len(invited_user_ids)
|
||||
summary["m.joined_member_count"] = (
|
||||
details.get(Membership.JOIN, empty_ms).count
|
||||
)
|
||||
summary["m.invited_member_count"] = (
|
||||
details.get(Membership.INVITE, empty_ms).count
|
||||
)
|
||||
|
||||
if name_id or canonical_alias_id:
|
||||
defer.returnValue(summary)
|
||||
|
||||
# FIXME: order by stream ordering, not alphabetic
|
||||
joined_user_ids = [
|
||||
r[0] for r in details.get(Membership.JOIN, empty_ms).members
|
||||
]
|
||||
invited_user_ids = [
|
||||
r[0] for r in details.get(Membership.INVITE, empty_ms).members
|
||||
]
|
||||
gone_user_ids = (
|
||||
[r[0] for r in details.get(Membership.LEAVE, empty_ms).members] +
|
||||
[r[0] for r in details.get(Membership.BAN, empty_ms).members]
|
||||
)
|
||||
|
||||
# FIXME: only build up a member_ids list for our heroes
|
||||
member_ids = {}
|
||||
for membership in (
|
||||
Membership.JOIN,
|
||||
Membership.INVITE,
|
||||
Membership.LEAVE,
|
||||
Membership.BAN
|
||||
):
|
||||
for user_id, event_id in details.get(membership, empty_ms).members:
|
||||
member_ids[user_id] = event_id
|
||||
|
||||
# FIXME: order by stream ordering rather than as returned by SQL
|
||||
me = sync_config.user.to_string()
|
||||
if (joined_user_ids or invited_user_ids):
|
||||
summary['m.heroes'] = sorted(
|
||||
|
@ -590,7 +603,11 @@ class SyncHandler(object):
|
|||
)[0:5]
|
||||
else:
|
||||
summary['m.heroes'] = sorted(
|
||||
[user_id for user_id in member_ids.keys() if user_id != me]
|
||||
[
|
||||
user_id
|
||||
for user_id in gone_user_ids
|
||||
if user_id != me
|
||||
]
|
||||
)[0:5]
|
||||
|
||||
if not sync_config.filter_collection.lazy_load_members():
|
||||
|
@ -723,6 +740,26 @@ class SyncHandler(object):
|
|||
lazy_load_members=lazy_load_members,
|
||||
)
|
||||
elif batch.limited:
|
||||
state_at_timeline_start = yield self.store.get_state_ids_for_event(
|
||||
batch.events[0].event_id, types=types,
|
||||
filtered_types=filtered_types,
|
||||
)
|
||||
|
||||
# for now, we disable LL for gappy syncs - see
|
||||
# https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
|
||||
# N.B. this slows down incr syncs as we are now processing way
|
||||
# more state in the server than if we were LLing.
|
||||
#
|
||||
# We still have to filter timeline_start to LL entries (above) in order
|
||||
# for _calculate_state's LL logic to work, as we have to include LL
|
||||
# members for timeline senders in case they weren't loaded in the initial
|
||||
# sync. We do this by (counterintuitively) by filtering timeline_start
|
||||
# members to just be ones which were timeline senders, which then ensures
|
||||
# all of the rest get included in the state block (if we need to know
|
||||
# about them).
|
||||
types = None
|
||||
filtered_types = None
|
||||
|
||||
state_at_previous_sync = yield self.get_state_at(
|
||||
room_id, stream_position=since_token, types=types,
|
||||
filtered_types=filtered_types,
|
||||
|
@ -733,24 +770,21 @@ class SyncHandler(object):
|
|||
filtered_types=filtered_types,
|
||||
)
|
||||
|
||||
state_at_timeline_start = yield self.store.get_state_ids_for_event(
|
||||
batch.events[0].event_id, types=types,
|
||||
filtered_types=filtered_types,
|
||||
)
|
||||
|
||||
state_ids = _calculate_state(
|
||||
timeline_contains=timeline_state,
|
||||
timeline_start=state_at_timeline_start,
|
||||
previous=state_at_previous_sync,
|
||||
current=current_state_ids,
|
||||
# we have to include LL members in case LL initial sync missed them
|
||||
lazy_load_members=lazy_load_members,
|
||||
)
|
||||
else:
|
||||
state_ids = {}
|
||||
if lazy_load_members:
|
||||
if types:
|
||||
# We're returning an incremental sync, with no "gap" since
|
||||
# the previous sync, so normally there would be no state to return
|
||||
# We're returning an incremental sync, with no
|
||||
# "gap" since the previous sync, so normally there would be
|
||||
# no state to return.
|
||||
# But we're lazy-loading, so the client might need some more
|
||||
# member events to understand the events in this timeline.
|
||||
# So we fish out all the member events corresponding to the
|
||||
|
@ -1620,10 +1654,24 @@ class SyncHandler(object):
|
|||
)
|
||||
|
||||
summary = {}
|
||||
|
||||
# we include a summary in room responses when we're lazy loading
|
||||
# members (as the client otherwise doesn't have enough info to form
|
||||
# the name itself).
|
||||
if (
|
||||
sync_config.filter_collection.lazy_load_members() and
|
||||
(
|
||||
# we recalulate the summary:
|
||||
# if there are membership changes in the timeline, or
|
||||
# if membership has changed during a gappy sync, or
|
||||
# if this is an initial sync.
|
||||
any(ev.type == EventTypes.Member for ev in batch.events) or
|
||||
(
|
||||
# XXX: this may include false positives in the form of LL
|
||||
# members which have snuck into state
|
||||
batch.limited and
|
||||
any(t == EventTypes.Member for (t, k) in state)
|
||||
) or
|
||||
since_token is None
|
||||
)
|
||||
):
|
||||
|
@ -1653,6 +1701,16 @@ class SyncHandler(object):
|
|||
unread_notifications["highlight_count"] = notifs["highlight_count"]
|
||||
|
||||
sync_result_builder.joined.append(room_sync)
|
||||
|
||||
if batch.limited:
|
||||
user_id = sync_result_builder.sync_config.user.to_string()
|
||||
logger.info(
|
||||
"Incremental syncing room %s for user %s with %d state events" % (
|
||||
room_id,
|
||||
user_id,
|
||||
len(state),
|
||||
)
|
||||
)
|
||||
elif room_builder.rtype == "archived":
|
||||
room_sync = ArchivedSyncResult(
|
||||
room_id=room_id,
|
||||
|
|
|
@ -280,7 +280,10 @@ class MatrixFederationHttpClient(object):
|
|||
# :'(
|
||||
# Update transactions table?
|
||||
with logcontext.PreserveLoggingContext():
|
||||
body = yield treq.content(response)
|
||||
body = yield self._timeout_deferred(
|
||||
treq.content(response),
|
||||
timeout,
|
||||
)
|
||||
raise HttpResponseException(
|
||||
response.code, response.phrase, body
|
||||
)
|
||||
|
@ -394,7 +397,10 @@ class MatrixFederationHttpClient(object):
|
|||
check_content_type_is_json(response.headers)
|
||||
|
||||
with logcontext.PreserveLoggingContext():
|
||||
body = yield treq.json_content(response)
|
||||
body = yield self._timeout_deferred(
|
||||
treq.json_content(response),
|
||||
timeout,
|
||||
)
|
||||
defer.returnValue(body)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -444,7 +450,10 @@ class MatrixFederationHttpClient(object):
|
|||
check_content_type_is_json(response.headers)
|
||||
|
||||
with logcontext.PreserveLoggingContext():
|
||||
body = yield treq.json_content(response)
|
||||
body = yield self._timeout_deferred(
|
||||
treq.json_content(response),
|
||||
timeout,
|
||||
)
|
||||
|
||||
defer.returnValue(body)
|
||||
|
||||
|
@ -496,7 +505,10 @@ class MatrixFederationHttpClient(object):
|
|||
check_content_type_is_json(response.headers)
|
||||
|
||||
with logcontext.PreserveLoggingContext():
|
||||
body = yield treq.json_content(response)
|
||||
body = yield self._timeout_deferred(
|
||||
treq.json_content(response),
|
||||
timeout,
|
||||
)
|
||||
|
||||
defer.returnValue(body)
|
||||
|
||||
|
@ -543,7 +555,10 @@ class MatrixFederationHttpClient(object):
|
|||
check_content_type_is_json(response.headers)
|
||||
|
||||
with logcontext.PreserveLoggingContext():
|
||||
body = yield treq.json_content(response)
|
||||
body = yield self._timeout_deferred(
|
||||
treq.json_content(response),
|
||||
timeout,
|
||||
)
|
||||
|
||||
defer.returnValue(body)
|
||||
|
||||
|
@ -585,8 +600,10 @@ class MatrixFederationHttpClient(object):
|
|||
|
||||
try:
|
||||
with logcontext.PreserveLoggingContext():
|
||||
length = yield _readBodyToFile(
|
||||
response, output_stream, max_size
|
||||
length = yield self._timeout_deferred(
|
||||
_readBodyToFile(
|
||||
response, output_stream, max_size
|
||||
),
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to download body")
|
||||
|
@ -594,6 +611,27 @@ class MatrixFederationHttpClient(object):
|
|||
|
||||
defer.returnValue((length, headers))
|
||||
|
||||
def _timeout_deferred(self, deferred, timeout_ms=None):
|
||||
"""Times the deferred out after `timeout_ms` ms
|
||||
|
||||
Args:
|
||||
deferred (Deferred)
|
||||
timeout_ms (int|None): Timeout in milliseconds. If None defaults
|
||||
to 60 seconds.
|
||||
|
||||
Returns:
|
||||
Deferred
|
||||
"""
|
||||
|
||||
add_timeout_to_deferred(
|
||||
deferred,
|
||||
timeout_ms / 1000. if timeout_ms else 60,
|
||||
self.hs.get_reactor(),
|
||||
cancelled_to_request_timed_out_error,
|
||||
)
|
||||
|
||||
return deferred
|
||||
|
||||
|
||||
class _ReadBodyToFileProtocol(protocol.Protocol):
|
||||
def __init__(self, stream, deferred, max_size):
|
||||
|
|
|
@ -929,6 +929,10 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
|
|||
txn, self.get_users_in_room, (room_id,)
|
||||
)
|
||||
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_room_summary, (room_id,)
|
||||
)
|
||||
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_current_state_ids, (room_id,)
|
||||
)
|
||||
|
|
|
@ -51,6 +51,12 @@ ProfileInfo = namedtuple(
|
|||
"ProfileInfo", ("avatar_url", "display_name")
|
||||
)
|
||||
|
||||
# "members" points to a truncated list of (user_id, event_id) tuples for users of
|
||||
# a given membership type, suitable for use in calculating heroes for a room.
|
||||
# "count" points to the total numberr of users of a given membership type.
|
||||
MemberSummary = namedtuple(
|
||||
"MemberSummary", ("members", "count")
|
||||
)
|
||||
|
||||
_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
|
||||
|
||||
|
@ -89,6 +95,65 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
)
|
||||
defer.returnValue(result)
|
||||
|
||||
@cached(max_entries=100000)
|
||||
def get_room_summary(self, room_id):
|
||||
""" Get the details of a room roughly suitable for use by the room
|
||||
summary extension to /sync. Useful when lazy loading room members.
|
||||
Args:
|
||||
room_id (str): The room ID to query
|
||||
Returns:
|
||||
Deferred[dict[str, MemberSummary]:
|
||||
dict of membership states, pointing to a MemberSummary named tuple.
|
||||
"""
|
||||
|
||||
def _get_room_summary_txn(txn):
|
||||
# first get counts.
|
||||
# We do this all in one transaction to keep the cache small.
|
||||
# FIXME: get rid of this when we have room_stats
|
||||
sql = """
|
||||
SELECT count(*), m.membership FROM room_memberships as m
|
||||
INNER JOIN current_state_events as c
|
||||
ON m.event_id = c.event_id
|
||||
AND m.room_id = c.room_id
|
||||
AND m.user_id = c.state_key
|
||||
WHERE c.type = 'm.room.member' AND c.room_id = ?
|
||||
GROUP BY m.membership
|
||||
"""
|
||||
|
||||
txn.execute(sql, (room_id,))
|
||||
res = {}
|
||||
for count, membership in txn:
|
||||
summary = res.setdefault(to_ascii(membership), MemberSummary([], count))
|
||||
|
||||
# we order by membership and then fairly arbitrarily by event_id so
|
||||
# heroes are consistent
|
||||
sql = """
|
||||
SELECT m.user_id, m.membership, m.event_id
|
||||
FROM room_memberships as m
|
||||
INNER JOIN current_state_events as c
|
||||
ON m.event_id = c.event_id
|
||||
AND m.room_id = c.room_id
|
||||
AND m.user_id = c.state_key
|
||||
WHERE c.type = 'm.room.member' AND c.room_id = ?
|
||||
ORDER BY
|
||||
CASE m.membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
|
||||
m.event_id ASC
|
||||
LIMIT ?
|
||||
"""
|
||||
|
||||
# 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
|
||||
txn.execute(sql, (room_id, Membership.JOIN, Membership.INVITE, 6))
|
||||
for user_id, membership, event_id in txn:
|
||||
summary = res[to_ascii(membership)]
|
||||
# we will always have a summary for this membership type at this
|
||||
# point given the summary currently contains the counts.
|
||||
members = summary.members
|
||||
members.append((to_ascii(user_id), to_ascii(event_id)))
|
||||
|
||||
return res
|
||||
|
||||
return self.runInteraction("get_room_summary", _get_room_summary_txn)
|
||||
|
||||
@cached()
|
||||
def get_invited_rooms_for_user(self, user_id):
|
||||
""" Get all the rooms the user is invited to
|
||||
|
|
Loading…
Reference in a new issue