Merge pull request #227 from matrix-org/erikj/receipts_take2

Re-enable receipts API.
This commit is contained in:
Erik Johnston 2015-08-18 16:30:04 +01:00
commit 38a965b816
4 changed files with 91 additions and 17 deletions

View file

@ -171,7 +171,6 @@ class ReceiptEventSource(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_new_events_for_user(self, user, from_key, limit): def get_new_events_for_user(self, user, from_key, limit):
defer.returnValue(([], from_key))
from_key = int(from_key) from_key = int(from_key)
to_key = yield self.get_current_key() to_key = yield self.get_current_key()
@ -194,7 +193,6 @@ class ReceiptEventSource(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_pagination_rows(self, user, config, key): def get_pagination_rows(self, user, config, key):
to_key = int(config.from_key) to_key = int(config.from_key)
defer.returnValue(([], to_key))
if config.to_key: if config.to_key:
from_key = int(config.to_key) from_key = int(config.to_key)

View file

@ -54,7 +54,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database # Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts. # schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 21 SCHEMA_VERSION = 22
dir_path = os.path.abspath(os.path.dirname(__file__)) dir_path = os.path.abspath(os.path.dirname(__file__))

View file

@ -14,12 +14,11 @@
# limitations under the License. # limitations under the License.
from ._base import SQLBaseStore from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
from synapse.util.caches import cache_counter, caches_by_name
from twisted.internet import defer from twisted.internet import defer
from synapse.util import unwrapFirstError
from blist import sorteddict from blist import sorteddict
import logging import logging
import ujson as json import ujson as json
@ -54,19 +53,13 @@ class ReceiptsStore(SQLBaseStore):
self, room_ids, from_key self, room_ids, from_key
) )
results = yield defer.gatherResults( results = yield self._get_linearized_receipts_for_rooms(
[ room_ids, to_key, from_key=from_key
self.get_linearized_receipts_for_room( )
room_id, to_key, from_key=from_key
)
for room_id in room_ids
],
consumeErrors=True,
).addErrback(unwrapFirstError)
defer.returnValue([ev for res in results for ev in res]) defer.returnValue([ev for res in results.values() for ev in res])
@defer.inlineCallbacks @cachedInlineCallbacks(num_args=3, max_entries=5000)
def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
"""Get receipts for a single room for sending to clients. """Get receipts for a single room for sending to clients.
@ -126,6 +119,66 @@ class ReceiptsStore(SQLBaseStore):
"content": content, "content": content,
}]) }])
@cachedList(cache=get_linearized_receipts_for_room.cache, list_name="room_ids",
num_args=3, inlineCallbacks=True)
def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
if not room_ids:
defer.returnValue({})
def f(txn):
if from_key:
sql = (
"SELECT * FROM receipts_linearized WHERE"
" room_id IN (%s) AND stream_id > ? AND stream_id <= ?"
) % (
",".join(["?"] * len(room_ids))
)
args = list(room_ids)
args.extend([from_key, to_key])
txn.execute(sql, args)
else:
sql = (
"SELECT * FROM receipts_linearized WHERE"
" room_id IN (%s) AND stream_id <= ?"
) % (
",".join(["?"] * len(room_ids))
)
args = list(room_ids)
args.append(to_key)
txn.execute(sql, args)
return self.cursor_to_dict(txn)
txn_results = yield self.runInteraction(
"_get_linearized_receipts_for_rooms", f
)
results = {}
for row in txn_results:
# We want a single event per room, since we want to batch the
# receipts by room, event and type.
room_event = results.setdefault(row["room_id"], {
"type": "m.receipt",
"room_id": row["room_id"],
"content": {},
})
# The content is of the form:
# {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
event_entry = room_event["content"].setdefault(row["event_id"], {})
receipt_type = event_entry.setdefault(row["receipt_type"], {})
receipt_type[row["user_id"]] = json.loads(row["data"])
results = {
room_id: [results[room_id]] if room_id in results else []
for room_id in room_ids
}
defer.returnValue(results)
def get_max_receipt_stream_id(self): def get_max_receipt_stream_id(self):
return self._receipts_id_gen.get_max_token(self) return self._receipts_id_gen.get_max_token(self)
@ -305,6 +358,8 @@ class _RoomStreamChangeCache(object):
self._room_to_key = {} self._room_to_key = {}
self._cache = sorteddict() self._cache = sorteddict()
self._earliest_key = None self._earliest_key = None
self.name = "ReceiptsRoomChangeCache"
caches_by_name[self.name] = self._cache
@defer.inlineCallbacks @defer.inlineCallbacks
def get_rooms_changed(self, store, room_ids, key): def get_rooms_changed(self, store, room_ids, key):
@ -318,8 +373,11 @@ class _RoomStreamChangeCache(object):
result = set( result = set(
self._cache[k] for k in keys[i:] self._cache[k] for k in keys[i:]
).intersection(room_ids) ).intersection(room_ids)
cache_counter.inc_hits(self.name)
else: else:
result = room_ids result = room_ids
cache_counter.inc_misses(self.name)
defer.returnValue(result) defer.returnValue(result)

View file

@ -0,0 +1,18 @@
/* Copyright 2015 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE INDEX receipts_linearized_room_stream ON receipts_linearized(
room_id, stream_id
);