forked from MirrorHub/synapse
Merge pull request #516 from matrix-org/erikj/push_perf
Reduce number of calls to get_unread_event_push_actions_by_room
This commit is contained in:
commit
82b46f556d
2 changed files with 17 additions and 5 deletions
|
@ -125,9 +125,6 @@ class Pusher(object):
|
||||||
from_tok = StreamToken.from_string(self.last_token)
|
from_tok = StreamToken.from_string(self.last_token)
|
||||||
config = PaginationConfig(from_token=from_tok, limit='1')
|
config = PaginationConfig(from_token=from_tok, limit='1')
|
||||||
timeout = (300 + random.randint(-60, 60)) * 1000
|
timeout = (300 + random.randint(-60, 60)) * 1000
|
||||||
# note that we need to get read receipts down the stream as we need to
|
|
||||||
# wake up when one arrives. we don't need to explicitly look for
|
|
||||||
# them though.
|
|
||||||
chunk = yield self.evStreamHandler.get_stream(
|
chunk = yield self.evStreamHandler.get_stream(
|
||||||
self.user_id, config, timeout=timeout, affect_presence=False
|
self.user_id, config, timeout=timeout, affect_presence=False
|
||||||
)
|
)
|
||||||
|
@ -135,11 +132,22 @@ class Pusher(object):
|
||||||
# limiting to 1 may get 1 event plus 1 presence event, so
|
# limiting to 1 may get 1 event plus 1 presence event, so
|
||||||
# pick out the actual event
|
# pick out the actual event
|
||||||
single_event = None
|
single_event = None
|
||||||
|
read_receipt = None
|
||||||
for c in chunk['chunk']:
|
for c in chunk['chunk']:
|
||||||
if 'event_id' in c: # Hmmm...
|
if 'event_id' in c: # Hmmm...
|
||||||
single_event = c
|
single_event = c
|
||||||
|
elif c['type'] == 'm.receipt':
|
||||||
|
read_receipt = c
|
||||||
|
|
||||||
|
have_updated_badge = False
|
||||||
|
if read_receipt:
|
||||||
|
for receipt_part in read_receipt['content'].values():
|
||||||
|
if 'm.read' in receipt_part:
|
||||||
|
if self.user_id in receipt_part['m.read'].keys():
|
||||||
|
have_updated_badge = True
|
||||||
|
|
||||||
if not single_event:
|
if not single_event:
|
||||||
|
if have_updated_badge:
|
||||||
yield self.update_badge()
|
yield self.update_badge()
|
||||||
self.last_token = chunk['end']
|
self.last_token = chunk['end']
|
||||||
yield self.store.update_pusher_last_token(
|
yield self.store.update_pusher_last_token(
|
||||||
|
@ -185,6 +193,9 @@ class Pusher(object):
|
||||||
yield self.hs.get_pusherpool().remove_pusher(
|
yield self.hs.get_pusherpool().remove_pusher(
|
||||||
self.app_id, pk, self.user_id
|
self.app_id, pk, self.user_id
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
if have_updated_badge:
|
||||||
|
yield self.update_badge()
|
||||||
processed = True
|
processed = True
|
||||||
|
|
||||||
if not self.alive:
|
if not self.alive:
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import ujson as json
|
import ujson as json
|
||||||
|
@ -46,7 +47,7 @@ class EventPushActionsStore(SQLBaseStore):
|
||||||
values
|
values
|
||||||
)
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@cachedInlineCallbacks(num_args=3)
|
||||||
def get_unread_event_push_actions_by_room_for_user(
|
def get_unread_event_push_actions_by_room_for_user(
|
||||||
self, room_id, user_id, last_read_event_id
|
self, room_id, user_id, last_read_event_id
|
||||||
):
|
):
|
||||||
|
|
Loading…
Reference in a new issue