forked from MirrorHub/synapse
Merge pull request #3505 from matrix-org/erikj/receipts_cahce
Use stream cache in get_linearized_receipts_for_room
This commit is contained in:
commit
0456e05977
3 changed files with 21 additions and 7 deletions
1
changelog.d/3505.feature
Normal file
1
changelog.d/3505.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Reduce database consumption when processing large numbers of receipts
|
|
@ -49,7 +49,7 @@ class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
|
|||
|
||||
def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
|
||||
self.get_receipts_for_user.invalidate((user_id, receipt_type))
|
||||
self.get_linearized_receipts_for_room.invalidate_many((room_id,))
|
||||
self._get_linearized_receipts_for_room.invalidate_many((room_id,))
|
||||
self.get_last_receipt_event_id_for_user.invalidate(
|
||||
(user_id, room_id, receipt_type)
|
||||
)
|
||||
|
|
|
@ -140,7 +140,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
"""
|
||||
room_ids = set(room_ids)
|
||||
|
||||
if from_key:
|
||||
if from_key is not None:
|
||||
# Only ask the database about rooms where there have been new
|
||||
# receipts added since `from_key`
|
||||
room_ids = yield self._receipts_stream_cache.get_entities_changed(
|
||||
room_ids, from_key
|
||||
)
|
||||
|
@ -151,7 +153,6 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
|
||||
defer.returnValue([ev for res in results.values() for ev in res])
|
||||
|
||||
@cachedInlineCallbacks(num_args=3, tree=True)
|
||||
def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
|
||||
"""Get receipts for a single room for sending to clients.
|
||||
|
||||
|
@ -162,7 +163,19 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
from the start.
|
||||
|
||||
Returns:
|
||||
list: A list of receipts.
|
||||
Deferred[list]: A list of receipts.
|
||||
"""
|
||||
if from_key is not None:
|
||||
# Check the cache first to see if any new receipts have been added
|
||||
# since`from_key`. If not we can no-op.
|
||||
if not self._receipts_stream_cache.has_entity_changed(room_id, from_key):
|
||||
defer.succeed([])
|
||||
|
||||
return self._get_linearized_receipts_for_room(room_id, to_key, from_key)
|
||||
|
||||
@cachedInlineCallbacks(num_args=3, tree=True)
|
||||
def _get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
|
||||
"""See get_linearized_receipts_for_room
|
||||
"""
|
||||
def f(txn):
|
||||
if from_key:
|
||||
|
@ -211,7 +224,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
"content": content,
|
||||
}])
|
||||
|
||||
@cachedList(cached_method_name="get_linearized_receipts_for_room",
|
||||
@cachedList(cached_method_name="_get_linearized_receipts_for_room",
|
||||
list_name="room_ids", num_args=3, inlineCallbacks=True)
|
||||
def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
|
||||
if not room_ids:
|
||||
|
@ -373,7 +386,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
|
|||
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
|
||||
)
|
||||
# FIXME: This shouldn't invalidate the whole cache
|
||||
txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
|
||||
txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))
|
||||
|
||||
txn.call_after(
|
||||
self._receipts_stream_cache.entity_has_changed,
|
||||
|
@ -493,7 +506,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
|
|||
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
|
||||
)
|
||||
# FIXME: This shouldn't invalidate the whole cache
|
||||
txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
|
||||
txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))
|
||||
|
||||
self._simple_delete_txn(
|
||||
txn,
|
||||
|
|
Loading…
Reference in a new issue