0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-11-03 05:08:59 +01:00

Simplify token replication logic

This commit is contained in:
Andrew Morgan 2019-03-05 13:58:30 +00:00
parent fe7bd23a85
commit b9f6163092

View file

@ -478,7 +478,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
# Now we can send any updates that came in while we were subscribing # Now we can send any updates that came in while we were subscribing
pending_rdata = self.pending_rdata.pop(stream_name, []) pending_rdata = self.pending_rdata.pop(stream_name, [])
batch_updates = [] updates = []
for token, update in pending_rdata: for token, update in pending_rdata:
# If the token is null, it is part of a batch update. Batches # If the token is null, it is part of a batch update. Batches
# are multiple updates that share a single token. To denote # are multiple updates that share a single token. To denote
@ -489,33 +489,24 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
# final token. # final token.
if token is None: if token is None:
# Store this update as part of a batch # Store this update as part of a batch
batch_updates.append(update) updates.append(update)
continue continue
if len(batch_updates) > 0: if token <= current_token:
# There is an ongoing batch and this is the end # This update or batch of updates is older than
if current_token <= current_token: # current_token, dismiss it
# This batch is older than current_token, dismiss it updates = []
batch_updates = [] continue
else:
# This is the end of the batch. Append final update of updates.append(update)
# this batch before sending
batch_updates.append(update)
# Send all updates that are part of this batch with the # Send all updates that are part of this batch with the
# found token # found token
for update in batch_updates: for update in updates:
self.send_command(RdataCommand(stream_name, token, update)) self.send_command(RdataCommand(stream_name, token, update))
# Clear saved batch updates # Clear stored updates
batch_updates = [] updates = []
continue
# This is an update that's not part of a batch.
#
# Only send updates newer than the current token
if token > current_token:
self.send_command(RdataCommand(stream_name, token, update))
# They're now fully subscribed # They're now fully subscribed
self.replication_streams.add(stream_name) self.replication_streams.add(stream_name)