Merge pull request #2237 from matrix-org/erikj/sync_key_count

Add count of one time keys to sync stream
This commit is contained in:
Erik Johnston 2017-05-23 11:18:13 +01:00 committed by GitHub
commit fbbc40f385
4 changed files with 34 additions and 15 deletions

View file

@ -117,6 +117,8 @@ class SyncResult(collections.namedtuple("SyncResult", [
"archived", # ArchivedSyncResult for each archived room. "archived", # ArchivedSyncResult for each archived room.
"to_device", # List of direct messages for the device. "to_device", # List of direct messages for the device.
"device_lists", # List of user_ids whose devices have chanegd "device_lists", # List of user_ids whose devices have chanegd
"device_one_time_keys_count", # Dict of algorithm to count for one time keys
# for this device
])): ])):
__slots__ = [] __slots__ = []
@ -550,6 +552,14 @@ class SyncHandler(object):
sync_result_builder sync_result_builder
) )
device_id = sync_config.device_id
one_time_key_counts = {}
if device_id:
user_id = sync_config.user.to_string()
one_time_key_counts = yield self.store.count_e2e_one_time_keys(
user_id, device_id
)
defer.returnValue(SyncResult( defer.returnValue(SyncResult(
presence=sync_result_builder.presence, presence=sync_result_builder.presence,
account_data=sync_result_builder.account_data, account_data=sync_result_builder.account_data,
@ -558,6 +568,7 @@ class SyncHandler(object):
archived=sync_result_builder.archived, archived=sync_result_builder.archived,
to_device=sync_result_builder.to_device, to_device=sync_result_builder.to_device,
device_lists=device_lists, device_lists=device_lists,
device_one_time_keys_count=one_time_key_counts,
next_batch=sync_result_builder.now_token, next_batch=sync_result_builder.now_token,
)) ))

View file

@ -16,6 +16,7 @@
from ._base import BaseSlavedStore from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker from ._slaved_id_tracker import SlavedIdTracker
from synapse.storage import DataStore from synapse.storage import DataStore
from synapse.storage.end_to_end_keys import EndToEndKeyStore
from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.stream_change_cache import StreamChangeCache
@ -45,6 +46,7 @@ class SlavedDeviceStore(BaseSlavedStore):
_mark_as_sent_devices_by_remote_txn = ( _mark_as_sent_devices_by_remote_txn = (
DataStore._mark_as_sent_devices_by_remote_txn.__func__ DataStore._mark_as_sent_devices_by_remote_txn.__func__
) )
count_e2e_one_time_keys = EndToEndKeyStore.__dict__["count_e2e_one_time_keys"]
def stream_positions(self): def stream_positions(self):
result = super(SlavedDeviceStore, self).stream_positions() result = super(SlavedDeviceStore, self).stream_positions()

View file

@ -192,6 +192,7 @@ class SyncRestServlet(RestServlet):
"invite": invited, "invite": invited,
"leave": archived, "leave": archived,
}, },
"device_one_time_keys_count": sync_result.device_one_time_keys_count,
"next_batch": sync_result.next_batch.to_string(), "next_batch": sync_result.next_batch.to_string(),
} }

View file

@ -185,8 +185,8 @@ class EndToEndKeyStore(SQLBaseStore):
for algorithm, key_id, json_bytes in new_keys for algorithm, key_id, json_bytes in new_keys
], ],
) )
txn.call_after( self._invalidate_cache_and_stream(
self.count_e2e_one_time_keys.invalidate, (user_id, device_id,) txn, self.count_e2e_one_time_keys, (user_id, device_id,)
) )
yield self.runInteraction( yield self.runInteraction(
"add_e2e_one_time_keys_insert", _add_e2e_one_time_keys "add_e2e_one_time_keys_insert", _add_e2e_one_time_keys
@ -237,24 +237,29 @@ class EndToEndKeyStore(SQLBaseStore):
) )
for user_id, device_id, algorithm, key_id in delete: for user_id, device_id, algorithm, key_id in delete:
txn.execute(sql, (user_id, device_id, algorithm, key_id)) txn.execute(sql, (user_id, device_id, algorithm, key_id))
txn.call_after( self._invalidate_cache_and_stream(
self.count_e2e_one_time_keys.invalidate, (user_id, device_id,) txn, self.count_e2e_one_time_keys, (user_id, device_id,)
) )
return result return result
return self.runInteraction( return self.runInteraction(
"claim_e2e_one_time_keys", _claim_e2e_one_time_keys "claim_e2e_one_time_keys", _claim_e2e_one_time_keys
) )
@defer.inlineCallbacks
def delete_e2e_keys_by_device(self, user_id, device_id): def delete_e2e_keys_by_device(self, user_id, device_id):
yield self._simple_delete( def delete_e2e_keys_by_device_txn(txn):
self._simple_delete_txn(
txn,
table="e2e_device_keys_json", table="e2e_device_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id}, keyvalues={"user_id": user_id, "device_id": device_id},
desc="delete_e2e_device_keys_by_device"
) )
yield self._simple_delete( self._simple_delete_txn(
txn,
table="e2e_one_time_keys_json", table="e2e_one_time_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id}, keyvalues={"user_id": user_id, "device_id": device_id},
desc="delete_e2e_one_time_keys_by_device"
) )
self.count_e2e_one_time_keys.invalidate((user_id, device_id,)) self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id,)
)
return self.runInteraction(
"delete_e2e_keys_by_device", delete_e2e_keys_by_device_txn
)