mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-16 10:23:59 +01:00
Ignore depth when updating read-receipts
Order read receipts by stream ordering instead of depth
This commit is contained in:
parent
c2c3092cce
commit
857e6fd8b6
1 changed files with 37 additions and 30 deletions
|
@ -13,7 +13,7 @@
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
from synapse.api.errors import NotFoundError
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
from .util.id_generators import StreamIdGenerator
|
from .util.id_generators import StreamIdGenerator
|
||||||
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
|
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
|
||||||
|
@ -332,6 +332,41 @@ class ReceiptsStore(ReceiptsWorkerStore):
|
||||||
|
|
||||||
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
|
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
|
||||||
user_id, event_id, data, stream_id):
|
user_id, event_id, data, stream_id):
|
||||||
|
res = self._simple_select_one_txn(
|
||||||
|
txn,
|
||||||
|
table="events",
|
||||||
|
retcols=["topological_ordering", "stream_ordering"],
|
||||||
|
keyvalues={"event_id": event_id},
|
||||||
|
allow_none=True
|
||||||
|
)
|
||||||
|
|
||||||
|
if not res:
|
||||||
|
raise NotFoundError(
|
||||||
|
"Cannot set read receipt on unknown event %s" % (
|
||||||
|
event_id,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
stream_ordering = int(res["stream_ordering"])
|
||||||
|
|
||||||
|
# We don't want to clobber receipts for more recent events, so we
|
||||||
|
# have to compare orderings of existing receipts
|
||||||
|
sql = (
|
||||||
|
"SELECT stream_ordering, event_id FROM events"
|
||||||
|
" INNER JOIN receipts_linearized as r USING (event_id, room_id)"
|
||||||
|
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
|
||||||
|
)
|
||||||
|
txn.execute(sql, (room_id, receipt_type, user_id))
|
||||||
|
|
||||||
|
for so, eid in txn:
|
||||||
|
if int(so) >= stream_ordering:
|
||||||
|
logger.debug(
|
||||||
|
"Ignoring new receipt for %s in favour of existing "
|
||||||
|
"one for later event %s",
|
||||||
|
event_id, eid,
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
txn.call_after(
|
txn.call_after(
|
||||||
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
|
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
|
||||||
)
|
)
|
||||||
|
@ -355,34 +390,6 @@ class ReceiptsStore(ReceiptsWorkerStore):
|
||||||
(user_id, room_id, receipt_type)
|
(user_id, room_id, receipt_type)
|
||||||
)
|
)
|
||||||
|
|
||||||
res = self._simple_select_one_txn(
|
|
||||||
txn,
|
|
||||||
table="events",
|
|
||||||
retcols=["topological_ordering", "stream_ordering"],
|
|
||||||
keyvalues={"event_id": event_id},
|
|
||||||
allow_none=True
|
|
||||||
)
|
|
||||||
|
|
||||||
topological_ordering = int(res["topological_ordering"]) if res else None
|
|
||||||
stream_ordering = int(res["stream_ordering"]) if res else None
|
|
||||||
|
|
||||||
# We don't want to clobber receipts for more recent events, so we
|
|
||||||
# have to compare orderings of existing receipts
|
|
||||||
sql = (
|
|
||||||
"SELECT topological_ordering, stream_ordering, event_id FROM events"
|
|
||||||
" INNER JOIN receipts_linearized as r USING (event_id, room_id)"
|
|
||||||
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
|
|
||||||
)
|
|
||||||
|
|
||||||
txn.execute(sql, (room_id, receipt_type, user_id))
|
|
||||||
|
|
||||||
if topological_ordering:
|
|
||||||
for to, so, _ in txn:
|
|
||||||
if int(to) > topological_ordering:
|
|
||||||
return False
|
|
||||||
elif int(to) == topological_ordering and int(so) >= stream_ordering:
|
|
||||||
return False
|
|
||||||
|
|
||||||
self._simple_delete_txn(
|
self._simple_delete_txn(
|
||||||
txn,
|
txn,
|
||||||
table="receipts_linearized",
|
table="receipts_linearized",
|
||||||
|
@ -406,7 +413,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
if receipt_type == "m.read" and topological_ordering:
|
if receipt_type == "m.read":
|
||||||
self._remove_old_push_actions_before_txn(
|
self._remove_old_push_actions_before_txn(
|
||||||
txn,
|
txn,
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
|
|
Loading…
Reference in a new issue