From 6fec2d035f5584335af7c1b115ca48a13a8da5fa Mon Sep 17 00:00:00 2001
From: Erik Johnston <erikj@matrix.org>
Date: Fri, 17 Nov 2023 14:14:29 +0000
Subject: [PATCH] Also discard 'caches' and 'backfill' stream POSITIONS
 (#16655)

Follow on from #16640
---
 changelog.d/16655.misc                   |  1 +
 synapse/replication/tcp/streams/_base.py | 16 ++++++++++++++++
 2 files changed, 17 insertions(+)
 create mode 100644 changelog.d/16655.misc

diff --git a/changelog.d/16655.misc b/changelog.d/16655.misc
new file mode 100644
index 000000000..3b1cc2185
--- /dev/null
+++ b/changelog.d/16655.misc
@@ -0,0 +1 @@
+More efficiently handle no-op `POSITION` over replication.
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index cc34dfb32..1f6402c2d 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -305,6 +305,14 @@ class BackfillStream(Stream):
         # which means we need to negate it.
         return -self.store._backfill_id_gen.get_minimal_local_current_token()
 
+    def can_discard_position(
+        self, instance_name: str, prev_token: int, new_token: int
+    ) -> bool:
+        # Backfill stream can't go backwards, so we know we can ignore any
+        # positions where the tokens are from before the current token.
+
+        return new_token <= self.current_token(instance_name)
+
 
 class PresenceStream(_StreamFromIdGen):
     @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -519,6 +527,14 @@ class CachesStream(Stream):
             return self.store._cache_id_gen.get_minimal_local_current_token()
         return self.current_token(self.local_instance_name)
 
+    def can_discard_position(
+        self, instance_name: str, prev_token: int, new_token: int
+    ) -> bool:
+        # Caches streams can't go backwards, so we know we can ignore any
+        # positions where the tokens are from before the current token.
+
+        return new_token <= self.current_token(instance_name)
+
 
 class DeviceListsStream(_StreamFromIdGen):
     """Either a user has updated their devices or a remote server needs to be