mirror of
https://mau.dev/maunium/synapse.git
synced 2025-01-19 03:41:54 +01:00
Merge pull request #6629 from matrix-org/rav/kill_event_reference_hashes
Remove a bunch of unused code from event creation
This commit is contained in:
commit
1807db5e73
7 changed files with 57 additions and 105 deletions
1
changelog.d/6629.misc
Normal file
1
changelog.d/6629.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Simplify event creation code by removing redundant queries on the event_reference_hashes table.
|
|
@ -48,7 +48,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
|
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
|
||||||
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
|
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
|
||||||
from synapse.storage.state import StateFilter
|
from synapse.storage.state import StateFilter
|
||||||
from synapse.types import RoomAlias, UserID, create_requester
|
from synapse.types import Collection, RoomAlias, UserID, create_requester
|
||||||
from synapse.util.async_helpers import Linearizer
|
from synapse.util.async_helpers import Linearizer
|
||||||
from synapse.util.frozenutils import frozendict_json_encoder
|
from synapse.util.frozenutils import frozendict_json_encoder
|
||||||
from synapse.util.metrics import measure_func
|
from synapse.util.metrics import measure_func
|
||||||
|
@ -422,7 +422,7 @@ class EventCreationHandler(object):
|
||||||
event_dict,
|
event_dict,
|
||||||
token_id=None,
|
token_id=None,
|
||||||
txn_id=None,
|
txn_id=None,
|
||||||
prev_events_and_hashes=None,
|
prev_event_ids: Optional[Collection[str]] = None,
|
||||||
require_consent=True,
|
require_consent=True,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
|
@ -439,10 +439,9 @@ class EventCreationHandler(object):
|
||||||
token_id (str)
|
token_id (str)
|
||||||
txn_id (str)
|
txn_id (str)
|
||||||
|
|
||||||
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
|
prev_event_ids:
|
||||||
the forward extremities to use as the prev_events for the
|
the forward extremities to use as the prev_events for the
|
||||||
new event. For each event, a tuple of (event_id, hashes, depth)
|
new event.
|
||||||
where *hashes* is a map from algorithm to hash.
|
|
||||||
|
|
||||||
If None, they will be requested from the database.
|
If None, they will be requested from the database.
|
||||||
|
|
||||||
|
@ -498,9 +497,7 @@ class EventCreationHandler(object):
|
||||||
builder.internal_metadata.txn_id = txn_id
|
builder.internal_metadata.txn_id = txn_id
|
||||||
|
|
||||||
event, context = yield self.create_new_client_event(
|
event, context = yield self.create_new_client_event(
|
||||||
builder=builder,
|
builder=builder, requester=requester, prev_event_ids=prev_event_ids,
|
||||||
requester=requester,
|
|
||||||
prev_events_and_hashes=prev_events_and_hashes,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# In an ideal world we wouldn't need the second part of this condition. However,
|
# In an ideal world we wouldn't need the second part of this condition. However,
|
||||||
|
@ -714,7 +711,7 @@ class EventCreationHandler(object):
|
||||||
@measure_func("create_new_client_event")
|
@measure_func("create_new_client_event")
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def create_new_client_event(
|
def create_new_client_event(
|
||||||
self, builder, requester=None, prev_events_and_hashes=None
|
self, builder, requester=None, prev_event_ids: Optional[Collection[str]] = None
|
||||||
):
|
):
|
||||||
"""Create a new event for a local client
|
"""Create a new event for a local client
|
||||||
|
|
||||||
|
@ -723,10 +720,9 @@ class EventCreationHandler(object):
|
||||||
|
|
||||||
requester (synapse.types.Requester|None):
|
requester (synapse.types.Requester|None):
|
||||||
|
|
||||||
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
|
prev_event_ids:
|
||||||
the forward extremities to use as the prev_events for the
|
the forward extremities to use as the prev_events for the
|
||||||
new event. For each event, a tuple of (event_id, hashes, depth)
|
new event.
|
||||||
where *hashes* is a map from algorithm to hash.
|
|
||||||
|
|
||||||
If None, they will be requested from the database.
|
If None, they will be requested from the database.
|
||||||
|
|
||||||
|
@ -734,22 +730,15 @@ class EventCreationHandler(object):
|
||||||
Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
|
Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if prev_events_and_hashes is not None:
|
if prev_event_ids is not None:
|
||||||
assert len(prev_events_and_hashes) <= 10, (
|
assert len(prev_event_ids) <= 10, (
|
||||||
"Attempting to create an event with %i prev_events"
|
"Attempting to create an event with %i prev_events"
|
||||||
% (len(prev_events_and_hashes),)
|
% (len(prev_event_ids),)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
prev_events_and_hashes = yield self.store.get_prev_events_for_room(
|
prev_event_ids = yield self.store.get_prev_events_for_room(builder.room_id)
|
||||||
builder.room_id
|
|
||||||
)
|
|
||||||
|
|
||||||
prev_events = [
|
event = yield builder.build(prev_event_ids=prev_event_ids)
|
||||||
(event_id, prev_hashes)
|
|
||||||
for event_id, prev_hashes, _ in prev_events_and_hashes
|
|
||||||
]
|
|
||||||
|
|
||||||
event = yield builder.build(prev_event_ids=[p for p, _ in prev_events])
|
|
||||||
context = yield self.state.compute_event_context(event)
|
context = yield self.state.compute_event_context(event)
|
||||||
if requester:
|
if requester:
|
||||||
context.app_service = requester.app_service
|
context.app_service = requester.app_service
|
||||||
|
@ -1042,9 +1031,7 @@ class EventCreationHandler(object):
|
||||||
# For each room we need to find a joined member we can use to send
|
# For each room we need to find a joined member we can use to send
|
||||||
# the dummy event with.
|
# the dummy event with.
|
||||||
|
|
||||||
prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id)
|
latest_event_ids = yield self.store.get_prev_events_for_room(room_id)
|
||||||
|
|
||||||
latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes)
|
|
||||||
|
|
||||||
members = yield self.state.get_current_users_in_room(
|
members = yield self.state.get_current_users_in_room(
|
||||||
room_id, latest_event_ids=latest_event_ids
|
room_id, latest_event_ids=latest_event_ids
|
||||||
|
@ -1063,7 +1050,7 @@ class EventCreationHandler(object):
|
||||||
"room_id": room_id,
|
"room_id": room_id,
|
||||||
"sender": user_id,
|
"sender": user_id,
|
||||||
},
|
},
|
||||||
prev_events_and_hashes=prev_events_and_hashes,
|
prev_event_ids=latest_event_ids,
|
||||||
)
|
)
|
||||||
|
|
||||||
event.internal_metadata.proactively_send = False
|
event.internal_metadata.proactively_send = False
|
||||||
|
|
|
@ -25,7 +25,7 @@ from twisted.internet import defer
|
||||||
from synapse import types
|
from synapse import types
|
||||||
from synapse.api.constants import EventTypes, Membership
|
from synapse.api.constants import EventTypes, Membership
|
||||||
from synapse.api.errors import AuthError, Codes, SynapseError
|
from synapse.api.errors import AuthError, Codes, SynapseError
|
||||||
from synapse.types import RoomID, UserID
|
from synapse.types import Collection, RoomID, UserID
|
||||||
from synapse.util.async_helpers import Linearizer
|
from synapse.util.async_helpers import Linearizer
|
||||||
from synapse.util.distributor import user_joined_room, user_left_room
|
from synapse.util.distributor import user_joined_room, user_left_room
|
||||||
|
|
||||||
|
@ -149,7 +149,7 @@ class RoomMemberHandler(object):
|
||||||
target,
|
target,
|
||||||
room_id,
|
room_id,
|
||||||
membership,
|
membership,
|
||||||
prev_events_and_hashes,
|
prev_event_ids: Collection[str],
|
||||||
txn_id=None,
|
txn_id=None,
|
||||||
ratelimit=True,
|
ratelimit=True,
|
||||||
content=None,
|
content=None,
|
||||||
|
@ -177,7 +177,7 @@ class RoomMemberHandler(object):
|
||||||
},
|
},
|
||||||
token_id=requester.access_token_id,
|
token_id=requester.access_token_id,
|
||||||
txn_id=txn_id,
|
txn_id=txn_id,
|
||||||
prev_events_and_hashes=prev_events_and_hashes,
|
prev_event_ids=prev_event_ids,
|
||||||
require_consent=require_consent,
|
require_consent=require_consent,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -370,8 +370,7 @@ class RoomMemberHandler(object):
|
||||||
if block_invite:
|
if block_invite:
|
||||||
raise SynapseError(403, "Invites have been disabled on this server")
|
raise SynapseError(403, "Invites have been disabled on this server")
|
||||||
|
|
||||||
prev_events_and_hashes = yield self.store.get_prev_events_for_room(room_id)
|
latest_event_ids = yield self.store.get_prev_events_for_room(room_id)
|
||||||
latest_event_ids = (event_id for (event_id, _, _) in prev_events_and_hashes)
|
|
||||||
|
|
||||||
current_state_ids = yield self.state_handler.get_current_state_ids(
|
current_state_ids = yield self.state_handler.get_current_state_ids(
|
||||||
room_id, latest_event_ids=latest_event_ids
|
room_id, latest_event_ids=latest_event_ids
|
||||||
|
@ -485,7 +484,7 @@ class RoomMemberHandler(object):
|
||||||
membership=effective_membership_state,
|
membership=effective_membership_state,
|
||||||
txn_id=txn_id,
|
txn_id=txn_id,
|
||||||
ratelimit=ratelimit,
|
ratelimit=ratelimit,
|
||||||
prev_events_and_hashes=prev_events_and_hashes,
|
prev_event_ids=latest_event_ids,
|
||||||
content=content,
|
content=content,
|
||||||
require_consent=require_consent,
|
require_consent=require_consent,
|
||||||
)
|
)
|
||||||
|
|
|
@ -14,13 +14,10 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import itertools
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
import random
|
|
||||||
|
|
||||||
from six.moves import range
|
from six.moves import range
|
||||||
from six.moves.queue import Empty, PriorityQueue
|
from six.moves.queue import Empty, PriorityQueue
|
||||||
|
|
||||||
from unpaddedbase64 import encode_base64
|
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.errors import StoreError
|
from synapse.api.errors import StoreError
|
||||||
|
@ -148,8 +145,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
retcol="event_id",
|
retcol="event_id",
|
||||||
)
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
def get_prev_events_for_room(self, room_id: str):
|
||||||
def get_prev_events_for_room(self, room_id):
|
|
||||||
"""
|
"""
|
||||||
Gets a subset of the current forward extremities in the given room.
|
Gets a subset of the current forward extremities in the given room.
|
||||||
|
|
||||||
|
@ -160,41 +156,30 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
room_id (str): room_id
|
room_id (str): room_id
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred[list[(str, dict[str, str], int)]]
|
Deferred[List[str]]: the event ids of the forward extremites
|
||||||
for each event, a tuple of (event_id, hashes, depth)
|
|
||||||
where *hashes* is a map from algorithm to hash.
|
|
||||||
"""
|
|
||||||
res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
|
|
||||||
if len(res) > 10:
|
|
||||||
# Sort by reverse depth, so we point to the most recent.
|
|
||||||
res.sort(key=lambda a: -a[2])
|
|
||||||
|
|
||||||
# we use half of the limit for the actual most recent events, and
|
|
||||||
# the other half to randomly point to some of the older events, to
|
|
||||||
# make sure that we don't completely ignore the older events.
|
|
||||||
res = res[0:5] + random.sample(res[5:], 5)
|
|
||||||
|
|
||||||
return res
|
|
||||||
|
|
||||||
def get_latest_event_ids_and_hashes_in_room(self, room_id):
|
|
||||||
"""
|
|
||||||
Gets the current forward extremities in the given room
|
|
||||||
|
|
||||||
Args:
|
|
||||||
room_id (str): room_id
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Deferred[list[(str, dict[str, str], int)]]
|
|
||||||
for each event, a tuple of (event_id, hashes, depth)
|
|
||||||
where *hashes* is a map from algorithm to hash.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self.db.runInteraction(
|
return self.db.runInteraction(
|
||||||
"get_latest_event_ids_and_hashes_in_room",
|
"get_prev_events_for_room", self._get_prev_events_for_room_txn, room_id
|
||||||
self._get_latest_event_ids_and_hashes_in_room,
|
|
||||||
room_id,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _get_prev_events_for_room_txn(self, txn, room_id: str):
|
||||||
|
# we just use the 10 newest events. Older events will become
|
||||||
|
# prev_events of future events.
|
||||||
|
|
||||||
|
sql = """
|
||||||
|
SELECT e.event_id FROM event_forward_extremities AS f
|
||||||
|
INNER JOIN events AS e USING (event_id)
|
||||||
|
WHERE f.room_id = ?
|
||||||
|
ORDER BY e.depth DESC
|
||||||
|
LIMIT 10
|
||||||
|
"""
|
||||||
|
|
||||||
|
txn.execute(sql, (room_id,))
|
||||||
|
|
||||||
|
return [row[0] for row in txn]
|
||||||
|
|
||||||
def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter):
|
def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter):
|
||||||
"""Get the top rooms with at least N extremities.
|
"""Get the top rooms with at least N extremities.
|
||||||
|
|
||||||
|
@ -243,27 +228,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
desc="get_latest_event_ids_in_room",
|
desc="get_latest_event_ids_in_room",
|
||||||
)
|
)
|
||||||
|
|
||||||
def _get_latest_event_ids_and_hashes_in_room(self, txn, room_id):
|
|
||||||
sql = (
|
|
||||||
"SELECT e.event_id, e.depth FROM events as e "
|
|
||||||
"INNER JOIN event_forward_extremities as f "
|
|
||||||
"ON e.event_id = f.event_id "
|
|
||||||
"AND e.room_id = f.room_id "
|
|
||||||
"WHERE f.room_id = ?"
|
|
||||||
)
|
|
||||||
|
|
||||||
txn.execute(sql, (room_id,))
|
|
||||||
|
|
||||||
results = []
|
|
||||||
for event_id, depth in txn.fetchall():
|
|
||||||
hashes = self._get_event_reference_hashes_txn(txn, event_id)
|
|
||||||
prev_hashes = {
|
|
||||||
k: encode_base64(v) for k, v in hashes.items() if k == "sha256"
|
|
||||||
}
|
|
||||||
results.append((event_id, prev_hashes, depth))
|
|
||||||
|
|
||||||
return results
|
|
||||||
|
|
||||||
def get_min_depth(self, room_id):
|
def get_min_depth(self, room_id):
|
||||||
""" For hte given room, get the minimum depth we have seen for it.
|
""" For hte given room, get the minimum depth we have seen for it.
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import re
|
import re
|
||||||
import string
|
import string
|
||||||
|
import sys
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
@ -23,6 +24,17 @@ from unpaddedbase64 import decode_base64
|
||||||
|
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
|
|
||||||
|
# define a version of typing.Collection that works on python 3.5
|
||||||
|
if sys.version_info[:3] >= (3, 6, 0):
|
||||||
|
from typing import Collection
|
||||||
|
else:
|
||||||
|
from typing import Sized, Iterable, Container, TypeVar
|
||||||
|
|
||||||
|
T_co = TypeVar("T_co", covariant=True)
|
||||||
|
|
||||||
|
class Collection(Iterable[T_co], Container[T_co], Sized):
|
||||||
|
__slots__ = ()
|
||||||
|
|
||||||
|
|
||||||
class Requester(
|
class Requester(
|
||||||
namedtuple(
|
namedtuple(
|
||||||
|
|
|
@ -60,21 +60,14 @@ class EventFederationWorkerStoreTestCase(tests.unittest.TestCase):
|
||||||
(event_id, bytearray(b"ffff")),
|
(event_id, bytearray(b"ffff")),
|
||||||
)
|
)
|
||||||
|
|
||||||
for i in range(0, 11):
|
for i in range(0, 20):
|
||||||
yield self.store.db.runInteraction("insert", insert_event, i)
|
yield self.store.db.runInteraction("insert", insert_event, i)
|
||||||
|
|
||||||
# this should get the last five and five others
|
# this should get the last ten
|
||||||
r = yield self.store.get_prev_events_for_room(room_id)
|
r = yield self.store.get_prev_events_for_room(room_id)
|
||||||
self.assertEqual(10, len(r))
|
self.assertEqual(10, len(r))
|
||||||
for i in range(0, 5):
|
for i in range(0, 10):
|
||||||
el = r[i]
|
self.assertEqual("$event_%i:local" % (19 - i), r[i])
|
||||||
depth = el[2]
|
|
||||||
self.assertEqual(10 - i, depth)
|
|
||||||
|
|
||||||
for i in range(5, 5):
|
|
||||||
el = r[i]
|
|
||||||
depth = el[2]
|
|
||||||
self.assertLessEqual(5, depth)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_get_rooms_with_many_extremities(self):
|
def test_get_rooms_with_many_extremities(self):
|
||||||
|
|
|
@ -531,10 +531,6 @@ class HomeserverTestCase(TestCase):
|
||||||
secrets = self.hs.get_secrets()
|
secrets = self.hs.get_secrets()
|
||||||
requester = Requester(user, None, False, None, None)
|
requester = Requester(user, None, False, None, None)
|
||||||
|
|
||||||
prev_events_and_hashes = None
|
|
||||||
if prev_event_ids:
|
|
||||||
prev_events_and_hashes = [[p, {}, 0] for p in prev_event_ids]
|
|
||||||
|
|
||||||
event, context = self.get_success(
|
event, context = self.get_success(
|
||||||
event_creator.create_event(
|
event_creator.create_event(
|
||||||
requester,
|
requester,
|
||||||
|
@ -544,7 +540,7 @@ class HomeserverTestCase(TestCase):
|
||||||
"sender": user.to_string(),
|
"sender": user.to_string(),
|
||||||
"content": {"body": secrets.token_hex(), "msgtype": "m.text"},
|
"content": {"body": secrets.token_hex(), "msgtype": "m.text"},
|
||||||
},
|
},
|
||||||
prev_events_and_hashes=prev_events_and_hashes,
|
prev_event_ids=prev_event_ids,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue