mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-15 14:03:53 +01:00
Rename MSC2716 things from chunk
to batch
to match /batch_send
endpoint (#10838)
See https://github.com/matrix-org/matrix-doc/pull/2716#discussion_r684574497 Dropping support for older MSC2716 room versions so we don't have to worry about supporting both chunk and batch events.
This commit is contained in:
parent
4054dfa409
commit
51e2db3598
13 changed files with 162 additions and 117 deletions
1
changelog.d/10838.misc
Normal file
1
changelog.d/10838.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Rename [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) fields and event types from `chunk` to `batch` to match the `/batch_send` endpoint.
|
|
@ -121,7 +121,7 @@ class EventTypes:
|
||||||
SpaceParent = "m.space.parent"
|
SpaceParent = "m.space.parent"
|
||||||
|
|
||||||
MSC2716_INSERTION = "org.matrix.msc2716.insertion"
|
MSC2716_INSERTION = "org.matrix.msc2716.insertion"
|
||||||
MSC2716_CHUNK = "org.matrix.msc2716.chunk"
|
MSC2716_BATCH = "org.matrix.msc2716.batch"
|
||||||
MSC2716_MARKER = "org.matrix.msc2716.marker"
|
MSC2716_MARKER = "org.matrix.msc2716.marker"
|
||||||
|
|
||||||
|
|
||||||
|
@ -209,11 +209,11 @@ class EventContentFields:
|
||||||
|
|
||||||
# Used on normal messages to indicate they were historically imported after the fact
|
# Used on normal messages to indicate they were historically imported after the fact
|
||||||
MSC2716_HISTORICAL = "org.matrix.msc2716.historical"
|
MSC2716_HISTORICAL = "org.matrix.msc2716.historical"
|
||||||
# For "insertion" events to indicate what the next chunk ID should be in
|
# For "insertion" events to indicate what the next batch ID should be in
|
||||||
# order to connect to it
|
# order to connect to it
|
||||||
MSC2716_NEXT_CHUNK_ID = "org.matrix.msc2716.next_chunk_id"
|
MSC2716_NEXT_BATCH_ID = "org.matrix.msc2716.next_batch_id"
|
||||||
# Used on "chunk" events to indicate which insertion event it connects to
|
# Used on "batch" events to indicate which insertion event it connects to
|
||||||
MSC2716_CHUNK_ID = "org.matrix.msc2716.chunk_id"
|
MSC2716_BATCH_ID = "org.matrix.msc2716.batch_id"
|
||||||
# For "marker" events
|
# For "marker" events
|
||||||
MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"
|
MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"
|
||||||
|
|
||||||
|
|
|
@ -244,24 +244,8 @@ class RoomVersions:
|
||||||
msc2716_historical=False,
|
msc2716_historical=False,
|
||||||
msc2716_redactions=False,
|
msc2716_redactions=False,
|
||||||
)
|
)
|
||||||
MSC2716 = RoomVersion(
|
MSC2716v3 = RoomVersion(
|
||||||
"org.matrix.msc2716",
|
"org.matrix.msc2716v3",
|
||||||
RoomDisposition.UNSTABLE,
|
|
||||||
EventFormatVersions.V3,
|
|
||||||
StateResolutionVersions.V2,
|
|
||||||
enforce_key_validity=True,
|
|
||||||
special_case_aliases_auth=False,
|
|
||||||
strict_canonicaljson=True,
|
|
||||||
limit_notifications_power_levels=True,
|
|
||||||
msc2176_redaction_rules=False,
|
|
||||||
msc3083_join_rules=False,
|
|
||||||
msc3375_redaction_rules=False,
|
|
||||||
msc2403_knocking=True,
|
|
||||||
msc2716_historical=True,
|
|
||||||
msc2716_redactions=False,
|
|
||||||
)
|
|
||||||
MSC2716v2 = RoomVersion(
|
|
||||||
"org.matrix.msc2716v2",
|
|
||||||
RoomDisposition.UNSTABLE,
|
RoomDisposition.UNSTABLE,
|
||||||
EventFormatVersions.V3,
|
EventFormatVersions.V3,
|
||||||
StateResolutionVersions.V2,
|
StateResolutionVersions.V2,
|
||||||
|
@ -289,9 +273,9 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
|
||||||
RoomVersions.V6,
|
RoomVersions.V6,
|
||||||
RoomVersions.MSC2176,
|
RoomVersions.MSC2176,
|
||||||
RoomVersions.V7,
|
RoomVersions.V7,
|
||||||
RoomVersions.MSC2716,
|
|
||||||
RoomVersions.V8,
|
RoomVersions.V8,
|
||||||
RoomVersions.V9,
|
RoomVersions.V9,
|
||||||
|
RoomVersions.MSC2716v3,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -213,7 +213,7 @@ def check(
|
||||||
|
|
||||||
if (
|
if (
|
||||||
event.type == EventTypes.MSC2716_INSERTION
|
event.type == EventTypes.MSC2716_INSERTION
|
||||||
or event.type == EventTypes.MSC2716_CHUNK
|
or event.type == EventTypes.MSC2716_BATCH
|
||||||
or event.type == EventTypes.MSC2716_MARKER
|
or event.type == EventTypes.MSC2716_MARKER
|
||||||
):
|
):
|
||||||
check_historical(room_version_obj, event, auth_events)
|
check_historical(room_version_obj, event, auth_events)
|
||||||
|
@ -552,14 +552,14 @@ def check_historical(
|
||||||
auth_events: StateMap[EventBase],
|
auth_events: StateMap[EventBase],
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Check whether the event sender is allowed to send historical related
|
"""Check whether the event sender is allowed to send historical related
|
||||||
events like "insertion", "chunk", and "marker".
|
events like "insertion", "batch", and "marker".
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
None
|
None
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
AuthError if the event sender is not allowed to send historical related events
|
AuthError if the event sender is not allowed to send historical related events
|
||||||
("insertion", "chunk", and "marker").
|
("insertion", "batch", and "marker").
|
||||||
"""
|
"""
|
||||||
# Ignore the auth checks in room versions that do not support historical
|
# Ignore the auth checks in room versions that do not support historical
|
||||||
# events
|
# events
|
||||||
|
@ -573,7 +573,7 @@ def check_historical(
|
||||||
if user_level < historical_level:
|
if user_level < historical_level:
|
||||||
raise AuthError(
|
raise AuthError(
|
||||||
403,
|
403,
|
||||||
'You don\'t have permission to send send historical related events ("insertion", "chunk", and "marker")',
|
'You don\'t have permission to send send historical related events ("insertion", "batch", and "marker")',
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -141,9 +141,9 @@ def prune_event_dict(room_version: RoomVersion, event_dict: dict) -> dict:
|
||||||
elif event_type == EventTypes.Redaction and room_version.msc2176_redaction_rules:
|
elif event_type == EventTypes.Redaction and room_version.msc2176_redaction_rules:
|
||||||
add_fields("redacts")
|
add_fields("redacts")
|
||||||
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_INSERTION:
|
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_INSERTION:
|
||||||
add_fields(EventContentFields.MSC2716_NEXT_CHUNK_ID)
|
add_fields(EventContentFields.MSC2716_NEXT_BATCH_ID)
|
||||||
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_CHUNK:
|
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_BATCH:
|
||||||
add_fields(EventContentFields.MSC2716_CHUNK_ID)
|
add_fields(EventContentFields.MSC2716_BATCH_ID)
|
||||||
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_MARKER:
|
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_MARKER:
|
||||||
add_fields(EventContentFields.MSC2716_MARKER_INSERTION)
|
add_fields(EventContentFields.MSC2716_MARKER_INSERTION)
|
||||||
|
|
||||||
|
|
|
@ -1425,7 +1425,7 @@ class EventCreationHandler:
|
||||||
# structural protocol level).
|
# structural protocol level).
|
||||||
is_msc2716_event = (
|
is_msc2716_event = (
|
||||||
original_event.type == EventTypes.MSC2716_INSERTION
|
original_event.type == EventTypes.MSC2716_INSERTION
|
||||||
or original_event.type == EventTypes.MSC2716_CHUNK
|
or original_event.type == EventTypes.MSC2716_BATCH
|
||||||
or original_event.type == EventTypes.MSC2716_MARKER
|
or original_event.type == EventTypes.MSC2716_MARKER
|
||||||
)
|
)
|
||||||
if not room_version_obj.msc2716_historical and is_msc2716_event:
|
if not room_version_obj.msc2716_historical and is_msc2716_event:
|
||||||
|
|
|
@ -43,25 +43,25 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class RoomBatchSendEventRestServlet(RestServlet):
|
class RoomBatchSendEventRestServlet(RestServlet):
|
||||||
"""
|
"""
|
||||||
API endpoint which can insert a chunk of events historically back in time
|
API endpoint which can insert a batch of events historically back in time
|
||||||
next to the given `prev_event`.
|
next to the given `prev_event`.
|
||||||
|
|
||||||
`chunk_id` comes from `next_chunk_id `in the response of the batch send
|
`batch_id` comes from `next_batch_id `in the response of the batch send
|
||||||
endpoint and is derived from the "insertion" events added to each chunk.
|
endpoint and is derived from the "insertion" events added to each batch.
|
||||||
It's not required for the first batch send.
|
It's not required for the first batch send.
|
||||||
|
|
||||||
`state_events_at_start` is used to define the historical state events
|
`state_events_at_start` is used to define the historical state events
|
||||||
needed to auth the events like join events. These events will float
|
needed to auth the events like join events. These events will float
|
||||||
outside of the normal DAG as outlier's and won't be visible in the chat
|
outside of the normal DAG as outlier's and won't be visible in the chat
|
||||||
history which also allows us to insert multiple chunks without having a bunch
|
history which also allows us to insert multiple batches without having a bunch
|
||||||
of `@mxid joined the room` noise between each chunk.
|
of `@mxid joined the room` noise between each batch.
|
||||||
|
|
||||||
`events` is chronological chunk/list of events you want to insert.
|
`events` is chronological list of events you want to insert.
|
||||||
There is a reverse-chronological constraint on chunks so once you insert
|
There is a reverse-chronological constraint on batches so once you insert
|
||||||
some messages, you can only insert older ones after that.
|
some messages, you can only insert older ones after that.
|
||||||
tldr; Insert chunks from your most recent history -> oldest history.
|
tldr; Insert batches from your most recent history -> oldest history.
|
||||||
|
|
||||||
POST /_matrix/client/unstable/org.matrix.msc2716/rooms/<roomID>/batch_send?prev_event_id=<eventID>&chunk_id=<chunkID>
|
POST /_matrix/client/unstable/org.matrix.msc2716/rooms/<roomID>/batch_send?prev_event_id=<eventID>&batch_id=<batchID>
|
||||||
{
|
{
|
||||||
"events": [ ... ],
|
"events": [ ... ],
|
||||||
"state_events_at_start": [ ... ]
|
"state_events_at_start": [ ... ]
|
||||||
|
@ -129,7 +129,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
|
||||||
self, sender: str, room_id: str, origin_server_ts: int
|
self, sender: str, room_id: str, origin_server_ts: int
|
||||||
) -> JsonDict:
|
) -> JsonDict:
|
||||||
"""Creates an event dict for an "insertion" event with the proper fields
|
"""Creates an event dict for an "insertion" event with the proper fields
|
||||||
and a random chunk ID.
|
and a random batch ID.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
sender: The event author MXID
|
sender: The event author MXID
|
||||||
|
@ -140,13 +140,13 @@ class RoomBatchSendEventRestServlet(RestServlet):
|
||||||
The new event dictionary to insert.
|
The new event dictionary to insert.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
next_chunk_id = random_string(8)
|
next_batch_id = random_string(8)
|
||||||
insertion_event = {
|
insertion_event = {
|
||||||
"type": EventTypes.MSC2716_INSERTION,
|
"type": EventTypes.MSC2716_INSERTION,
|
||||||
"sender": sender,
|
"sender": sender,
|
||||||
"room_id": room_id,
|
"room_id": room_id,
|
||||||
"content": {
|
"content": {
|
||||||
EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id,
|
EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id,
|
||||||
EventContentFields.MSC2716_HISTORICAL: True,
|
EventContentFields.MSC2716_HISTORICAL: True,
|
||||||
},
|
},
|
||||||
"origin_server_ts": origin_server_ts,
|
"origin_server_ts": origin_server_ts,
|
||||||
|
@ -191,7 +191,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
|
||||||
prev_event_ids_from_query = parse_strings_from_args(
|
prev_event_ids_from_query = parse_strings_from_args(
|
||||||
request.args, "prev_event_id"
|
request.args, "prev_event_id"
|
||||||
)
|
)
|
||||||
chunk_id_from_query = parse_string(request, "chunk_id")
|
batch_id_from_query = parse_string(request, "batch_id")
|
||||||
|
|
||||||
if prev_event_ids_from_query is None:
|
if prev_event_ids_from_query is None:
|
||||||
raise SynapseError(
|
raise SynapseError(
|
||||||
|
@ -291,27 +291,27 @@ class RoomBatchSendEventRestServlet(RestServlet):
|
||||||
prev_event_ids_from_query
|
prev_event_ids_from_query
|
||||||
)
|
)
|
||||||
|
|
||||||
# Figure out which chunk to connect to. If they passed in
|
# Figure out which batch to connect to. If they passed in
|
||||||
# chunk_id_from_query let's use it. The chunk ID passed in comes
|
# batch_id_from_query let's use it. The batch ID passed in comes
|
||||||
# from the chunk_id in the "insertion" event from the previous chunk.
|
# from the batch_id in the "insertion" event from the previous batch.
|
||||||
last_event_in_chunk = events_to_create[-1]
|
last_event_in_batch = events_to_create[-1]
|
||||||
chunk_id_to_connect_to = chunk_id_from_query
|
batch_id_to_connect_to = batch_id_from_query
|
||||||
base_insertion_event = None
|
base_insertion_event = None
|
||||||
if chunk_id_from_query:
|
if batch_id_from_query:
|
||||||
# All but the first base insertion event should point at a fake
|
# All but the first base insertion event should point at a fake
|
||||||
# event, which causes the HS to ask for the state at the start of
|
# event, which causes the HS to ask for the state at the start of
|
||||||
# the chunk later.
|
# the batch later.
|
||||||
prev_event_ids = [fake_prev_event_id]
|
prev_event_ids = [fake_prev_event_id]
|
||||||
|
|
||||||
# Verify the chunk_id_from_query corresponds to an actual insertion event
|
# Verify the batch_id_from_query corresponds to an actual insertion event
|
||||||
# and have the chunk connected.
|
# and have the batch connected.
|
||||||
corresponding_insertion_event_id = (
|
corresponding_insertion_event_id = (
|
||||||
await self.store.get_insertion_event_by_chunk_id(chunk_id_from_query)
|
await self.store.get_insertion_event_by_batch_id(batch_id_from_query)
|
||||||
)
|
)
|
||||||
if corresponding_insertion_event_id is None:
|
if corresponding_insertion_event_id is None:
|
||||||
raise SynapseError(
|
raise SynapseError(
|
||||||
400,
|
400,
|
||||||
"No insertion event corresponds to the given ?chunk_id",
|
"No insertion event corresponds to the given ?batch_id",
|
||||||
errcode=Codes.INVALID_PARAM,
|
errcode=Codes.INVALID_PARAM,
|
||||||
)
|
)
|
||||||
pass
|
pass
|
||||||
|
@ -328,7 +328,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
|
||||||
base_insertion_event_dict = self._create_insertion_event_dict(
|
base_insertion_event_dict = self._create_insertion_event_dict(
|
||||||
sender=requester.user.to_string(),
|
sender=requester.user.to_string(),
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
origin_server_ts=last_event_in_chunk["origin_server_ts"],
|
origin_server_ts=last_event_in_batch["origin_server_ts"],
|
||||||
)
|
)
|
||||||
base_insertion_event_dict["prev_events"] = prev_event_ids.copy()
|
base_insertion_event_dict["prev_events"] = prev_event_ids.copy()
|
||||||
|
|
||||||
|
@ -347,38 +347,38 @@ class RoomBatchSendEventRestServlet(RestServlet):
|
||||||
depth=inherited_depth,
|
depth=inherited_depth,
|
||||||
)
|
)
|
||||||
|
|
||||||
chunk_id_to_connect_to = base_insertion_event["content"][
|
batch_id_to_connect_to = base_insertion_event["content"][
|
||||||
EventContentFields.MSC2716_NEXT_CHUNK_ID
|
EventContentFields.MSC2716_NEXT_BATCH_ID
|
||||||
]
|
]
|
||||||
|
|
||||||
# Connect this current chunk to the insertion event from the previous chunk
|
# Connect this current batch to the insertion event from the previous batch
|
||||||
chunk_event = {
|
batch_event = {
|
||||||
"type": EventTypes.MSC2716_CHUNK,
|
"type": EventTypes.MSC2716_BATCH,
|
||||||
"sender": requester.user.to_string(),
|
"sender": requester.user.to_string(),
|
||||||
"room_id": room_id,
|
"room_id": room_id,
|
||||||
"content": {
|
"content": {
|
||||||
EventContentFields.MSC2716_CHUNK_ID: chunk_id_to_connect_to,
|
EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to,
|
||||||
EventContentFields.MSC2716_HISTORICAL: True,
|
EventContentFields.MSC2716_HISTORICAL: True,
|
||||||
},
|
},
|
||||||
# Since the chunk event is put at the end of the chunk,
|
# Since the batch event is put at the end of the batch,
|
||||||
# where the newest-in-time event is, copy the origin_server_ts from
|
# where the newest-in-time event is, copy the origin_server_ts from
|
||||||
# the last event we're inserting
|
# the last event we're inserting
|
||||||
"origin_server_ts": last_event_in_chunk["origin_server_ts"],
|
"origin_server_ts": last_event_in_batch["origin_server_ts"],
|
||||||
}
|
}
|
||||||
# Add the chunk event to the end of the chunk (newest-in-time)
|
# Add the batch event to the end of the batch (newest-in-time)
|
||||||
events_to_create.append(chunk_event)
|
events_to_create.append(batch_event)
|
||||||
|
|
||||||
# Add an "insertion" event to the start of each chunk (next to the oldest-in-time
|
# Add an "insertion" event to the start of each batch (next to the oldest-in-time
|
||||||
# event in the chunk) so the next chunk can be connected to this one.
|
# event in the batch) so the next batch can be connected to this one.
|
||||||
insertion_event = self._create_insertion_event_dict(
|
insertion_event = self._create_insertion_event_dict(
|
||||||
sender=requester.user.to_string(),
|
sender=requester.user.to_string(),
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
# Since the insertion event is put at the start of the chunk,
|
# Since the insertion event is put at the start of the batch,
|
||||||
# where the oldest-in-time event is, copy the origin_server_ts from
|
# where the oldest-in-time event is, copy the origin_server_ts from
|
||||||
# the first event we're inserting
|
# the first event we're inserting
|
||||||
origin_server_ts=events_to_create[0]["origin_server_ts"],
|
origin_server_ts=events_to_create[0]["origin_server_ts"],
|
||||||
)
|
)
|
||||||
# Prepend the insertion event to the start of the chunk (oldest-in-time)
|
# Prepend the insertion event to the start of the batch (oldest-in-time)
|
||||||
events_to_create = [insertion_event] + events_to_create
|
events_to_create = [insertion_event] + events_to_create
|
||||||
|
|
||||||
event_ids = []
|
event_ids = []
|
||||||
|
@ -439,17 +439,17 @@ class RoomBatchSendEventRestServlet(RestServlet):
|
||||||
)
|
)
|
||||||
|
|
||||||
insertion_event_id = event_ids[0]
|
insertion_event_id = event_ids[0]
|
||||||
chunk_event_id = event_ids[-1]
|
batch_event_id = event_ids[-1]
|
||||||
historical_event_ids = event_ids[1:-1]
|
historical_event_ids = event_ids[1:-1]
|
||||||
|
|
||||||
response_dict = {
|
response_dict = {
|
||||||
"state_event_ids": state_event_ids_at_start,
|
"state_event_ids": state_event_ids_at_start,
|
||||||
"event_ids": historical_event_ids,
|
"event_ids": historical_event_ids,
|
||||||
"next_chunk_id": insertion_event["content"][
|
"next_batch_id": insertion_event["content"][
|
||||||
EventContentFields.MSC2716_NEXT_CHUNK_ID
|
EventContentFields.MSC2716_NEXT_BATCH_ID
|
||||||
],
|
],
|
||||||
"insertion_event_id": insertion_event_id,
|
"insertion_event_id": insertion_event_id,
|
||||||
"chunk_event_id": chunk_event_id,
|
"batch_event_id": batch_event_id,
|
||||||
}
|
}
|
||||||
if base_insertion_event is not None:
|
if base_insertion_event is not None:
|
||||||
response_dict["base_insertion_event_id"] = base_insertion_event.event_id
|
response_dict["base_insertion_event_id"] = base_insertion_event.event_id
|
||||||
|
|
|
@ -1034,13 +1034,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
LIMIT ?
|
LIMIT ?
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Find any chunk connections of a given insertion event
|
# Find any batch connections of a given insertion event
|
||||||
chunk_connection_query = """
|
batch_connection_query = """
|
||||||
SELECT e.depth, c.event_id FROM insertion_events AS i
|
SELECT e.depth, c.event_id FROM insertion_events AS i
|
||||||
/* Find the chunk that connects to the given insertion event */
|
/* Find the batch that connects to the given insertion event */
|
||||||
INNER JOIN chunk_events AS c
|
INNER JOIN batch_events AS c
|
||||||
ON i.next_chunk_id = c.chunk_id
|
ON i.next_batch_id = c.batch_id
|
||||||
/* Get the depth of the chunk start event from the events table */
|
/* Get the depth of the batch start event from the events table */
|
||||||
INNER JOIN events AS e USING (event_id)
|
INNER JOIN events AS e USING (event_id)
|
||||||
/* Find an insertion event which matches the given event_id */
|
/* Find an insertion event which matches the given event_id */
|
||||||
WHERE i.event_id = ?
|
WHERE i.event_id = ?
|
||||||
|
@ -1077,12 +1077,12 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
|
|
||||||
event_results.add(event_id)
|
event_results.add(event_id)
|
||||||
|
|
||||||
# Try and find any potential historical chunks of message history.
|
# Try and find any potential historical batches of message history.
|
||||||
#
|
#
|
||||||
# First we look for an insertion event connected to the current
|
# First we look for an insertion event connected to the current
|
||||||
# event (by prev_event). If we find any, we need to go and try to
|
# event (by prev_event). If we find any, we need to go and try to
|
||||||
# find any chunk events connected to the insertion event (by
|
# find any batch events connected to the insertion event (by
|
||||||
# chunk_id). If we find any, we'll add them to the queue and
|
# batch_id). If we find any, we'll add them to the queue and
|
||||||
# navigate up the DAG like normal in the next iteration of the loop.
|
# navigate up the DAG like normal in the next iteration of the loop.
|
||||||
txn.execute(
|
txn.execute(
|
||||||
connected_insertion_event_query, (event_id, limit - len(event_results))
|
connected_insertion_event_query, (event_id, limit - len(event_results))
|
||||||
|
@ -1097,17 +1097,17 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
connected_insertion_event = row[1]
|
connected_insertion_event = row[1]
|
||||||
queue.put((-connected_insertion_event_depth, connected_insertion_event))
|
queue.put((-connected_insertion_event_depth, connected_insertion_event))
|
||||||
|
|
||||||
# Find any chunk connections for the given insertion event
|
# Find any batch connections for the given insertion event
|
||||||
txn.execute(
|
txn.execute(
|
||||||
chunk_connection_query,
|
batch_connection_query,
|
||||||
(connected_insertion_event, limit - len(event_results)),
|
(connected_insertion_event, limit - len(event_results)),
|
||||||
)
|
)
|
||||||
chunk_start_event_id_results = txn.fetchall()
|
batch_start_event_id_results = txn.fetchall()
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"_get_backfill_events: chunk_start_event_id_results %s",
|
"_get_backfill_events: batch_start_event_id_results %s",
|
||||||
chunk_start_event_id_results,
|
batch_start_event_id_results,
|
||||||
)
|
)
|
||||||
for row in chunk_start_event_id_results:
|
for row in batch_start_event_id_results:
|
||||||
if row[1] not in event_results:
|
if row[1] not in event_results:
|
||||||
queue.put((-row[0], row[1]))
|
queue.put((-row[0], row[1]))
|
||||||
|
|
||||||
|
|
|
@ -1509,7 +1509,7 @@ class PersistEventsStore:
|
||||||
self._handle_event_relations(txn, event)
|
self._handle_event_relations(txn, event)
|
||||||
|
|
||||||
self._handle_insertion_event(txn, event)
|
self._handle_insertion_event(txn, event)
|
||||||
self._handle_chunk_event(txn, event)
|
self._handle_batch_event(txn, event)
|
||||||
|
|
||||||
# Store the labels for this event.
|
# Store the labels for this event.
|
||||||
labels = event.content.get(EventContentFields.LABELS)
|
labels = event.content.get(EventContentFields.LABELS)
|
||||||
|
@ -1790,23 +1790,23 @@ class PersistEventsStore:
|
||||||
):
|
):
|
||||||
return
|
return
|
||||||
|
|
||||||
next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
|
next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID)
|
||||||
if next_chunk_id is None:
|
if next_batch_id is None:
|
||||||
# Invalid insertion event without next chunk ID
|
# Invalid insertion event without next batch ID
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
|
"_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event
|
||||||
)
|
)
|
||||||
|
|
||||||
# Keep track of the insertion event and the chunk ID
|
# Keep track of the insertion event and the batch ID
|
||||||
self.db_pool.simple_insert_txn(
|
self.db_pool.simple_insert_txn(
|
||||||
txn,
|
txn,
|
||||||
table="insertion_events",
|
table="insertion_events",
|
||||||
values={
|
values={
|
||||||
"event_id": event.event_id,
|
"event_id": event.event_id,
|
||||||
"room_id": event.room_id,
|
"room_id": event.room_id,
|
||||||
"next_chunk_id": next_chunk_id,
|
"next_batch_id": next_batch_id,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1822,8 +1822,8 @@ class PersistEventsStore:
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
|
def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase):
|
||||||
"""Handles inserting the chunk edges/connections between the chunk event
|
"""Handles inserting the batch edges/connections between the batch event
|
||||||
and an insertion event. Part of MSC2716.
|
and an insertion event. Part of MSC2716.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -1831,11 +1831,11 @@ class PersistEventsStore:
|
||||||
event: The event to process
|
event: The event to process
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if event.type != EventTypes.MSC2716_CHUNK:
|
if event.type != EventTypes.MSC2716_BATCH:
|
||||||
# Not a chunk event
|
# Not a batch event
|
||||||
return
|
return
|
||||||
|
|
||||||
# Skip processing a chunk event if the room version doesn't
|
# Skip processing a batch event if the room version doesn't
|
||||||
# support it or the event is not from the room creator.
|
# support it or the event is not from the room creator.
|
||||||
room_version = self.store.get_room_version_txn(txn, event.room_id)
|
room_version = self.store.get_room_version_txn(txn, event.room_id)
|
||||||
room_creator = self.db_pool.simple_select_one_onecol_txn(
|
room_creator = self.db_pool.simple_select_one_onecol_txn(
|
||||||
|
@ -1852,35 +1852,35 @@ class PersistEventsStore:
|
||||||
):
|
):
|
||||||
return
|
return
|
||||||
|
|
||||||
chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
|
batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID)
|
||||||
if chunk_id is None:
|
if batch_id is None:
|
||||||
# Invalid chunk event without a chunk ID
|
# Invalid batch event without a batch ID
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
|
logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event)
|
||||||
|
|
||||||
# Keep track of the insertion event and the chunk ID
|
# Keep track of the insertion event and the batch ID
|
||||||
self.db_pool.simple_insert_txn(
|
self.db_pool.simple_insert_txn(
|
||||||
txn,
|
txn,
|
||||||
table="chunk_events",
|
table="batch_events",
|
||||||
values={
|
values={
|
||||||
"event_id": event.event_id,
|
"event_id": event.event_id,
|
||||||
"room_id": event.room_id,
|
"room_id": event.room_id,
|
||||||
"chunk_id": chunk_id,
|
"batch_id": batch_id,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
# When we receive an event with a `chunk_id` referencing the
|
# When we receive an event with a `batch_id` referencing the
|
||||||
# `next_chunk_id` of the insertion event, we can remove it from the
|
# `next_batch_id` of the insertion event, we can remove it from the
|
||||||
# `insertion_event_extremities` table.
|
# `insertion_event_extremities` table.
|
||||||
sql = """
|
sql = """
|
||||||
DELETE FROM insertion_event_extremities WHERE event_id IN (
|
DELETE FROM insertion_event_extremities WHERE event_id IN (
|
||||||
SELECT event_id FROM insertion_events
|
SELECT event_id FROM insertion_events
|
||||||
WHERE next_chunk_id = ?
|
WHERE next_batch_id = ?
|
||||||
)
|
)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
txn.execute(sql, (chunk_id,))
|
txn.execute(sql, (batch_id,))
|
||||||
|
|
||||||
def _handle_redaction(self, txn, redacted_event_id):
|
def _handle_redaction(self, txn, redacted_event_id):
|
||||||
"""Handles receiving a redaction and checking whether we need to remove
|
"""Handles receiving a redaction and checking whether we need to remove
|
||||||
|
|
|
@ -18,11 +18,11 @@ from synapse.storage._base import SQLBaseStore
|
||||||
|
|
||||||
|
|
||||||
class RoomBatchStore(SQLBaseStore):
|
class RoomBatchStore(SQLBaseStore):
|
||||||
async def get_insertion_event_by_chunk_id(self, chunk_id: str) -> Optional[str]:
|
async def get_insertion_event_by_batch_id(self, batch_id: str) -> Optional[str]:
|
||||||
"""Retrieve a insertion event ID.
|
"""Retrieve a insertion event ID.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
chunk_id: The chunk ID of the insertion event to retrieve.
|
batch_id: The batch ID of the insertion event to retrieve.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The event_id of an insertion event, or None if there is no known
|
The event_id of an insertion event, or None if there is no known
|
||||||
|
@ -30,7 +30,7 @@ class RoomBatchStore(SQLBaseStore):
|
||||||
"""
|
"""
|
||||||
return await self.db_pool.simple_select_one_onecol(
|
return await self.db_pool.simple_select_one_onecol(
|
||||||
table="insertion_events",
|
table="insertion_events",
|
||||||
keyvalues={"next_chunk_id": chunk_id},
|
keyvalues={"next_batch_id": batch_id},
|
||||||
retcol="event_id",
|
retcol="event_id",
|
||||||
allow_none=True,
|
allow_none=True,
|
||||||
)
|
)
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
|
|
||||||
# When updating these values, please leave a short summary of the changes below.
|
# When updating these values, please leave a short summary of the changes below.
|
||||||
|
|
||||||
SCHEMA_VERSION = 63
|
SCHEMA_VERSION = 64
|
||||||
"""Represents the expectations made by the codebase about the database schema
|
"""Represents the expectations made by the codebase about the database schema
|
||||||
|
|
||||||
This should be incremented whenever the codebase changes its requirements on the
|
This should be incremented whenever the codebase changes its requirements on the
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
/* Copyright 2021 The Matrix.org Foundation C.I.C
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
ALTER TABLE insertion_events RENAME COLUMN next_chunk_id TO next_batch_id;
|
||||||
|
DROP INDEX insertion_events_next_chunk_id;
|
||||||
|
CREATE INDEX IF NOT EXISTS insertion_events_next_batch_id ON insertion_events(next_batch_id);
|
||||||
|
|
||||||
|
ALTER TABLE chunk_events RENAME TO batch_events;
|
||||||
|
ALTER TABLE batch_events RENAME COLUMN chunk_id TO batch_id;
|
||||||
|
DROP INDEX chunk_events_chunk_id;
|
||||||
|
CREATE INDEX IF NOT EXISTS batch_events_batch_id ON batch_events(batch_id);
|
|
@ -0,0 +1,37 @@
|
||||||
|
/* Copyright 2021 The Matrix.org Foundation C.I.C
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
-- Re-create the insertion_events table since SQLite doesn't support better
|
||||||
|
-- renames for columns (next_chunk_id -> next_batch_id)
|
||||||
|
DROP TABLE insertion_events;
|
||||||
|
CREATE TABLE IF NOT EXISTS insertion_events(
|
||||||
|
event_id TEXT NOT NULL,
|
||||||
|
room_id TEXT NOT NULL,
|
||||||
|
next_batch_id TEXT NOT NULL
|
||||||
|
);
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS insertion_events_event_id ON insertion_events(event_id);
|
||||||
|
CREATE INDEX IF NOT EXISTS insertion_events_next_batch_id ON insertion_events(next_batch_id);
|
||||||
|
|
||||||
|
-- Re-create the chunk_events table since SQLite doesn't support better renames
|
||||||
|
-- for columns (chunk_id -> batch_id)
|
||||||
|
DROP TABLE chunk_events;
|
||||||
|
CREATE TABLE IF NOT EXISTS batch_events(
|
||||||
|
event_id TEXT NOT NULL,
|
||||||
|
room_id TEXT NOT NULL,
|
||||||
|
batch_id TEXT NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS batch_events_event_id ON batch_events(event_id);
|
||||||
|
CREATE INDEX IF NOT EXISTS batch_events_batch_id ON batch_events(batch_id);
|
Loading…
Reference in a new issue