diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 053ed8480..8a052f071 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -36,6 +36,7 @@ class ReceiptsHandler(BaseHandler): self.federation.register_edu_handler( "m.receipt", self._received_remote_receipt ) + self.clock = self.hs.get_clock() self._receipt_cache = None @@ -51,6 +52,9 @@ class ReceiptsHandler(BaseHandler): "receipt_type": receipt_type, "user_id": user_id, "event_ids": [event_id], + "data": { + "ts": self.clock.time_msec() + } } is_new = yield self._handle_new_receipts([receipt]) @@ -65,12 +69,12 @@ class ReceiptsHandler(BaseHandler): "room_id": room_id, "receipt_type": receipt_type, "user_id": user_id, - "event_ids": [event_id], + "event_ids": user_values["event_ids"], + "data": user_values.get("data", {}), } for room_id, room_values in content.items() - for event_id, ev_values in room_values.items() - for receipt_type, users in ev_values.items() - for user_id in users + for receipt_type, users in room_values.items() + for user_id, user_values in users.items() ] yield self._handle_new_receipts(receipts) @@ -82,9 +86,10 @@ class ReceiptsHandler(BaseHandler): receipt_type = receipt["receipt_type"] user_id = receipt["user_id"] event_ids = receipt["event_ids"] + data = receipt["data"] res = yield self.store.insert_receipt( - room_id, receipt_type, user_id, event_ids, + room_id, receipt_type, user_id, event_ids, data ) if not res: @@ -108,6 +113,7 @@ class ReceiptsHandler(BaseHandler): receipt_type = receipt["receipt_type"] user_id = receipt["user_id"] event_ids = receipt["event_ids"] + data = receipt["data"] remotedomains = set() @@ -124,10 +130,12 @@ class ReceiptsHandler(BaseHandler): edu_type="m.receipt", content={ room_id: { - event_id: { - receipt_type: [user_id] + receipt_type: { + user_id: { + "event_ids": event_ids, + "data": data, + } } - for event_id in event_ids }, }, ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 503f68f85..c4e6b02bd 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -21,6 +21,7 @@ from synapse.util import unwrapFirstError from blist import sorteddict import logging +import ujson as json logger = logging.getLogger(__name__) @@ -91,8 +92,8 @@ class ReceiptsStore(SQLBaseStore): content.setdefault( row["event_id"], {} ).setdefault( - row["receipt_type"], [] - ).append(row["user_id"]) + row["receipt_type"], {} + )[row["user_id"]] = json.loads(row["data"]) defer.returnValue([{ "type": "m.receipt", @@ -124,7 +125,7 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue(result) def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, - user_id, event_id, stream_id): + user_id, event_id, data, stream_id): # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts @@ -172,13 +173,14 @@ class ReceiptsStore(SQLBaseStore): "receipt_type": receipt_type, "user_id": user_id, "event_id": event_id, + "data": json.dumps(data), } ) return True @defer.inlineCallbacks - def insert_receipt(self, room_id, receipt_type, user_id, event_ids): + def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data): if not event_ids: return @@ -214,6 +216,7 @@ class ReceiptsStore(SQLBaseStore): "insert_linearized_receipt", self.insert_linearized_receipt_txn, room_id, receipt_type, user_id, linearized_event_id, + data, stream_id=stream_id, ) @@ -221,22 +224,22 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue(None) yield self.insert_graph_receipt( - room_id, receipt_type, user_id, event_ids + room_id, receipt_type, user_id, event_ids, data ) max_persisted_id = yield self._stream_id_gen.get_max_token(self) defer.returnValue((stream_id, max_persisted_id)) - def insert_graph_receipt(self, room_id, receipt_type, - user_id, event_ids): + def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids, + data): return self.runInteraction( "insert_graph_receipt", self.insert_graph_receipt_txn, - room_id, receipt_type, user_id, event_ids, + room_id, receipt_type, user_id, event_ids, data ) def insert_graph_receipt_txn(self, txn, room_id, receipt_type, - user_id, event_ids): + user_id, event_ids, data): self._simple_delete_txn( txn, table="receipts_graph", @@ -246,18 +249,16 @@ class ReceiptsStore(SQLBaseStore): "user_id": user_id, } ) - self._simple_insert_many_txn( + self._simple_insert_txn( txn, table="receipts_graph", - values=[ - { - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, - "event_id": event_id, - } - for event_id in event_ids - ], + values={ + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": json.dumps(event_ids), + "data": json.dumps(data), + } ) diff --git a/synapse/storage/schema/delta/21/receipts.sql b/synapse/storage/schema/delta/21/receipts.sql index ac7738e37..2f64d609f 100644 --- a/synapse/storage/schema/delta/21/receipts.sql +++ b/synapse/storage/schema/delta/21/receipts.sql @@ -18,11 +18,9 @@ CREATE TABLE IF NOT EXISTS receipts_graph( room_id TEXT NOT NULL, receipt_type TEXT NOT NULL, user_id TEXT NOT NULL, - event_id TEXT NOT NULL -); - -CREATE INDEX receipts_graph_room_tuple ON receipts_graph( - room_id, receipt_type, user_id + event_ids TEXT NOT NULL, + data TEXT NOT NULL, + CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id) ); CREATE TABLE IF NOT EXISTS receipts_linearized ( @@ -30,11 +28,9 @@ CREATE TABLE IF NOT EXISTS receipts_linearized ( room_id TEXT NOT NULL, receipt_type TEXT NOT NULL, user_id TEXT NOT NULL, - event_id TEXT NOT NULL -); - -CREATE INDEX receipts_linearized_room_tuple ON receipts_linearized( - room_id, receipt_type, user_id + event_id TEXT NOT NULL, + data TEXT NOT NULL, + CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id) ); CREATE INDEX receipts_linearized_id ON receipts_linearized(