forked from MirrorHub/synapse
Merge pull request #705 from matrix-org/dbkr/pushers_use_event_actions
Change pushers to use the event_actions table
This commit is contained in:
commit
2547dffccc
19 changed files with 680 additions and 628 deletions
|
@ -21,7 +21,7 @@ from synapse.api.constants import Membership, EventTypes
|
|||
from synapse.types import UserID, RoomAlias, Requester
|
||||
from synapse.push.action_generator import ActionGenerator
|
||||
|
||||
from synapse.util.logcontext import PreserveLoggingContext
|
||||
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
|
||||
|
||||
import logging
|
||||
|
||||
|
@ -406,6 +406,12 @@ class BaseHandler(object):
|
|||
event, context=context
|
||||
)
|
||||
|
||||
# this intentionally does not yield: we don't care about the result
|
||||
# and don't need to wait for it.
|
||||
preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
|
||||
event_stream_id, max_stream_id
|
||||
)
|
||||
|
||||
destinations = set()
|
||||
for k, s in context.current_state.items():
|
||||
try:
|
||||
|
|
|
@ -26,7 +26,7 @@ from synapse.api.errors import (
|
|||
from synapse.api.constants import EventTypes, Membership, RejectedReason
|
||||
from synapse.events.validator import EventValidator
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.logcontext import PreserveLoggingContext
|
||||
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.async import run_on_reactor
|
||||
from synapse.util.frozenutils import unfreeze
|
||||
|
@ -1094,6 +1094,12 @@ class FederationHandler(BaseHandler):
|
|||
context=context,
|
||||
)
|
||||
|
||||
# this intentionally does not yield: we don't care about the result
|
||||
# and don't need to wait for it.
|
||||
preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
|
||||
event_stream_id, max_stream_id
|
||||
)
|
||||
|
||||
defer.returnValue((context, event_stream_id, max_stream_id))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
|
@ -80,6 +80,9 @@ class ReceiptsHandler(BaseHandler):
|
|||
def _handle_new_receipts(self, receipts):
|
||||
"""Takes a list of receipts, stores them and informs the notifier.
|
||||
"""
|
||||
min_batch_id = None
|
||||
max_batch_id = None
|
||||
|
||||
for receipt in receipts:
|
||||
room_id = receipt["room_id"]
|
||||
receipt_type = receipt["receipt_type"]
|
||||
|
@ -97,10 +100,21 @@ class ReceiptsHandler(BaseHandler):
|
|||
|
||||
stream_id, max_persisted_id = res
|
||||
|
||||
with PreserveLoggingContext():
|
||||
self.notifier.on_new_event(
|
||||
"receipt_key", max_persisted_id, rooms=[room_id]
|
||||
)
|
||||
if min_batch_id is None or stream_id < min_batch_id:
|
||||
min_batch_id = stream_id
|
||||
if max_batch_id is None or max_persisted_id > max_batch_id:
|
||||
max_batch_id = max_persisted_id
|
||||
|
||||
affected_room_ids = list(set([r["room_id"] for r in receipts]))
|
||||
|
||||
with PreserveLoggingContext():
|
||||
self.notifier.on_new_event(
|
||||
"receipt_key", max_batch_id, rooms=affected_room_ids
|
||||
)
|
||||
# Note that the min here shouldn't be relied upon to be accurate.
|
||||
self.hs.get_pusherpool().on_new_receipts(
|
||||
min_batch_id, max_batch_id, affected_room_ids
|
||||
)
|
||||
|
||||
defer.returnValue(True)
|
||||
|
||||
|
|
|
@ -13,333 +13,6 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import StreamToken
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
import synapse.util.async
|
||||
from .push_rule_evaluator import evaluator_for_user_id
|
||||
|
||||
import logging
|
||||
import random
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_NEXT_ID = 1
|
||||
|
||||
|
||||
def _get_next_id():
|
||||
global _NEXT_ID
|
||||
_id = _NEXT_ID
|
||||
_NEXT_ID += 1
|
||||
return _id
|
||||
|
||||
|
||||
# Pushers could now be moved to pull out of the event_push_actions table instead
|
||||
# of listening on the event stream: this would avoid them having to run the
|
||||
# rules again.
|
||||
class Pusher(object):
|
||||
INITIAL_BACKOFF = 1000
|
||||
MAX_BACKOFF = 60 * 60 * 1000
|
||||
GIVE_UP_AFTER = 24 * 60 * 60 * 1000
|
||||
|
||||
def __init__(self, _hs, user_id, app_id,
|
||||
app_display_name, device_display_name, pushkey, pushkey_ts,
|
||||
data, last_token, last_success, failing_since):
|
||||
self.hs = _hs
|
||||
self.evStreamHandler = self.hs.get_handlers().event_stream_handler
|
||||
self.store = self.hs.get_datastore()
|
||||
self.clock = self.hs.get_clock()
|
||||
self.user_id = user_id
|
||||
self.app_id = app_id
|
||||
self.app_display_name = app_display_name
|
||||
self.device_display_name = device_display_name
|
||||
self.pushkey = pushkey
|
||||
self.pushkey_ts = pushkey_ts
|
||||
self.data = data
|
||||
self.last_token = last_token
|
||||
self.last_success = last_success # not actually used
|
||||
self.backoff_delay = Pusher.INITIAL_BACKOFF
|
||||
self.failing_since = failing_since
|
||||
self.alive = True
|
||||
self.badge = None
|
||||
|
||||
self.name = "Pusher-%d" % (_get_next_id(),)
|
||||
|
||||
# The last value of last_active_time that we saw
|
||||
self.last_last_active_time = 0
|
||||
self.has_unread = True
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_context_for_event(self, ev):
|
||||
name_aliases = yield self.store.get_room_name_and_aliases(
|
||||
ev['room_id']
|
||||
)
|
||||
|
||||
ctx = {'aliases': name_aliases[1]}
|
||||
if name_aliases[0] is not None:
|
||||
ctx['name'] = name_aliases[0]
|
||||
|
||||
their_member_events_for_room = yield self.store.get_current_state(
|
||||
room_id=ev['room_id'],
|
||||
event_type='m.room.member',
|
||||
state_key=ev['user_id']
|
||||
)
|
||||
for mev in their_member_events_for_room:
|
||||
if mev.content['membership'] == 'join' and 'displayname' in mev.content:
|
||||
dn = mev.content['displayname']
|
||||
if dn is not None:
|
||||
ctx['sender_display_name'] = dn
|
||||
|
||||
defer.returnValue(ctx)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def start(self):
|
||||
with LoggingContext(self.name):
|
||||
if not self.last_token:
|
||||
# First-time setup: get a token to start from (we can't
|
||||
# just start from no token, ie. 'now'
|
||||
# because we need the result to be reproduceable in case
|
||||
# we fail to dispatch the push)
|
||||
config = PaginationConfig(from_token=None, limit='1')
|
||||
chunk = yield self.evStreamHandler.get_stream(
|
||||
self.user_id, config, timeout=0, affect_presence=False
|
||||
)
|
||||
self.last_token = chunk['end']
|
||||
yield self.store.update_pusher_last_token(
|
||||
self.app_id, self.pushkey, self.user_id, self.last_token
|
||||
)
|
||||
logger.info("New pusher %s for user %s starting from token %s",
|
||||
self.pushkey, self.user_id, self.last_token)
|
||||
|
||||
else:
|
||||
logger.info(
|
||||
"Old pusher %s for user %s starting",
|
||||
self.pushkey, self.user_id,
|
||||
)
|
||||
|
||||
wait = 0
|
||||
while self.alive:
|
||||
try:
|
||||
if wait > 0:
|
||||
yield synapse.util.async.sleep(wait)
|
||||
with Measure(self.clock, "push"):
|
||||
yield self.get_and_dispatch()
|
||||
wait = 0
|
||||
except:
|
||||
if wait == 0:
|
||||
wait = 1
|
||||
else:
|
||||
wait = min(wait * 2, 1800)
|
||||
logger.exception(
|
||||
"Exception in pusher loop for pushkey %s. Pausing for %ds",
|
||||
self.pushkey, wait
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_and_dispatch(self):
|
||||
from_tok = StreamToken.from_string(self.last_token)
|
||||
config = PaginationConfig(from_token=from_tok, limit='1')
|
||||
timeout = (300 + random.randint(-60, 60)) * 1000
|
||||
chunk = yield self.evStreamHandler.get_stream(
|
||||
self.user_id, config, timeout=timeout, affect_presence=False,
|
||||
only_keys=("room", "receipt",),
|
||||
)
|
||||
|
||||
# limiting to 1 may get 1 event plus 1 presence event, so
|
||||
# pick out the actual event
|
||||
single_event = None
|
||||
read_receipt = None
|
||||
for c in chunk['chunk']:
|
||||
if 'event_id' in c: # Hmmm...
|
||||
single_event = c
|
||||
elif c['type'] == 'm.receipt':
|
||||
read_receipt = c
|
||||
|
||||
have_updated_badge = False
|
||||
if read_receipt:
|
||||
for receipt_part in read_receipt['content'].values():
|
||||
if 'm.read' in receipt_part:
|
||||
if self.user_id in receipt_part['m.read'].keys():
|
||||
have_updated_badge = True
|
||||
|
||||
if not single_event:
|
||||
if have_updated_badge:
|
||||
yield self.update_badge()
|
||||
self.last_token = chunk['end']
|
||||
yield self.store.update_pusher_last_token(
|
||||
self.app_id,
|
||||
self.pushkey,
|
||||
self.user_id,
|
||||
self.last_token
|
||||
)
|
||||
return
|
||||
|
||||
if not self.alive:
|
||||
return
|
||||
|
||||
processed = False
|
||||
|
||||
rule_evaluator = yield \
|
||||
evaluator_for_user_id(
|
||||
self.user_id, single_event['room_id'], self.store
|
||||
)
|
||||
|
||||
actions = yield rule_evaluator.actions_for_event(single_event)
|
||||
tweaks = rule_evaluator.tweaks_for_actions(actions)
|
||||
|
||||
if 'notify' in actions:
|
||||
self.badge = yield self._get_badge_count()
|
||||
rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
|
||||
self.has_unread = True
|
||||
if isinstance(rejected, list) or isinstance(rejected, tuple):
|
||||
processed = True
|
||||
for pk in rejected:
|
||||
if pk != self.pushkey:
|
||||
# for sanity, we only remove the pushkey if it
|
||||
# was the one we actually sent...
|
||||
logger.warn(
|
||||
("Ignoring rejected pushkey %s because we"
|
||||
" didn't send it"), pk
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
"Pushkey %s was rejected: removing",
|
||||
pk
|
||||
)
|
||||
yield self.hs.get_pusherpool().remove_pusher(
|
||||
self.app_id, pk, self.user_id
|
||||
)
|
||||
else:
|
||||
if have_updated_badge:
|
||||
yield self.update_badge()
|
||||
processed = True
|
||||
|
||||
if not self.alive:
|
||||
return
|
||||
|
||||
if processed:
|
||||
self.backoff_delay = Pusher.INITIAL_BACKOFF
|
||||
self.last_token = chunk['end']
|
||||
yield self.store.update_pusher_last_token_and_success(
|
||||
self.app_id,
|
||||
self.pushkey,
|
||||
self.user_id,
|
||||
self.last_token,
|
||||
self.clock.time_msec()
|
||||
)
|
||||
if self.failing_since:
|
||||
self.failing_since = None
|
||||
yield self.store.update_pusher_failing_since(
|
||||
self.app_id,
|
||||
self.pushkey,
|
||||
self.user_id,
|
||||
self.failing_since)
|
||||
else:
|
||||
if not self.failing_since:
|
||||
self.failing_since = self.clock.time_msec()
|
||||
yield self.store.update_pusher_failing_since(
|
||||
self.app_id,
|
||||
self.pushkey,
|
||||
self.user_id,
|
||||
self.failing_since
|
||||
)
|
||||
|
||||
if (self.failing_since and
|
||||
self.failing_since <
|
||||
self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
|
||||
# we really only give up so that if the URL gets
|
||||
# fixed, we don't suddenly deliver a load
|
||||
# of old notifications.
|
||||
logger.warn("Giving up on a notification to user %s, "
|
||||
"pushkey %s",
|
||||
self.user_id, self.pushkey)
|
||||
self.backoff_delay = Pusher.INITIAL_BACKOFF
|
||||
self.last_token = chunk['end']
|
||||
yield self.store.update_pusher_last_token(
|
||||
self.app_id,
|
||||
self.pushkey,
|
||||
self.user_id,
|
||||
self.last_token
|
||||
)
|
||||
|
||||
self.failing_since = None
|
||||
yield self.store.update_pusher_failing_since(
|
||||
self.app_id,
|
||||
self.pushkey,
|
||||
self.user_id,
|
||||
self.failing_since
|
||||
)
|
||||
else:
|
||||
logger.warn("Failed to dispatch push for user %s "
|
||||
"(failing for %dms)."
|
||||
"Trying again in %dms",
|
||||
self.user_id,
|
||||
self.clock.time_msec() - self.failing_since,
|
||||
self.backoff_delay)
|
||||
yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
|
||||
self.backoff_delay *= 2
|
||||
if self.backoff_delay > Pusher.MAX_BACKOFF:
|
||||
self.backoff_delay = Pusher.MAX_BACKOFF
|
||||
|
||||
def stop(self):
|
||||
self.alive = False
|
||||
|
||||
def dispatch_push(self, p, tweaks, badge):
|
||||
"""
|
||||
Overridden by implementing classes to actually deliver the notification
|
||||
Args:
|
||||
p: The event to notify for as a single event from the event stream
|
||||
Returns: If the notification was delivered, an array containing any
|
||||
pushkeys that were rejected by the push gateway.
|
||||
False if the notification could not be delivered (ie.
|
||||
should be retried).
|
||||
"""
|
||||
pass
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_badge(self):
|
||||
new_badge = yield self._get_badge_count()
|
||||
if self.badge != new_badge:
|
||||
self.badge = new_badge
|
||||
yield self.send_badge(self.badge)
|
||||
|
||||
def send_badge(self, badge):
|
||||
"""
|
||||
Overridden by implementing classes to send an updated badge count
|
||||
"""
|
||||
pass
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_badge_count(self):
|
||||
invites, joins = yield defer.gatherResults([
|
||||
self.store.get_invited_rooms_for_user(self.user_id),
|
||||
self.store.get_rooms_for_user(self.user_id),
|
||||
], consumeErrors=True)
|
||||
|
||||
my_receipts_by_room = yield self.store.get_receipts_for_user(
|
||||
self.user_id,
|
||||
"m.read",
|
||||
)
|
||||
|
||||
badge = len(invites)
|
||||
|
||||
for r in joins:
|
||||
if r.room_id in my_receipts_by_room:
|
||||
last_unread_event_id = my_receipts_by_room[r.room_id]
|
||||
|
||||
notifs = yield (
|
||||
self.store.get_unread_event_push_actions_by_room_for_user(
|
||||
r.room_id, self.user_id, last_unread_event_id
|
||||
)
|
||||
)
|
||||
badge += notifs["notify_count"]
|
||||
defer.returnValue(badge)
|
||||
|
||||
|
||||
class PusherConfigException(Exception):
|
||||
def __init__(self, msg):
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
from twisted.internet import defer
|
||||
|
||||
from .bulk_push_rule_evaluator import evaluator_for_room_id
|
||||
from .bulk_push_rule_evaluator import evaluator_for_event
|
||||
|
||||
import logging
|
||||
|
||||
|
@ -35,8 +35,8 @@ class ActionGenerator:
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def handle_push_actions_for_event(self, event, context, handler):
|
||||
bulk_evaluator = yield evaluator_for_room_id(
|
||||
event.room_id, self.hs, self.store
|
||||
bulk_evaluator = yield evaluator_for_event(
|
||||
event, self.hs, self.store
|
||||
)
|
||||
|
||||
actions_by_user = yield bulk_evaluator.action_for_event_by_user(
|
||||
|
|
|
@ -79,7 +79,7 @@ def make_base_append_rules(kind, modified_base_rules):
|
|||
rules = []
|
||||
|
||||
if kind == 'override':
|
||||
rules = BASE_APPEND_OVRRIDE_RULES
|
||||
rules = BASE_APPEND_OVERRIDE_RULES
|
||||
elif kind == 'underride':
|
||||
rules = BASE_APPEND_UNDERRIDE_RULES
|
||||
elif kind == 'content':
|
||||
|
@ -148,7 +148,7 @@ BASE_PREPEND_OVERRIDE_RULES = [
|
|||
]
|
||||
|
||||
|
||||
BASE_APPEND_OVRRIDE_RULES = [
|
||||
BASE_APPEND_OVERRIDE_RULES = [
|
||||
{
|
||||
'rule_id': 'global/override/.m.rule.suppress_notices',
|
||||
'conditions': [
|
||||
|
@ -163,6 +163,40 @@ BASE_APPEND_OVRRIDE_RULES = [
|
|||
'dont_notify',
|
||||
]
|
||||
},
|
||||
# NB. .m.rule.invite_for_me must be higher prio than .m.rule.member_event
|
||||
# otherwise invites will be matched by .m.rule.member_event
|
||||
{
|
||||
'rule_id': 'global/underride/.m.rule.invite_for_me',
|
||||
'conditions': [
|
||||
{
|
||||
'kind': 'event_match',
|
||||
'key': 'type',
|
||||
'pattern': 'm.room.member',
|
||||
'_id': '_member',
|
||||
},
|
||||
{
|
||||
'kind': 'event_match',
|
||||
'key': 'content.membership',
|
||||
'pattern': 'invite',
|
||||
'_id': '_invite_member',
|
||||
},
|
||||
{
|
||||
'kind': 'event_match',
|
||||
'key': 'state_key',
|
||||
'pattern_type': 'user_id'
|
||||
},
|
||||
],
|
||||
'actions': [
|
||||
'notify',
|
||||
{
|
||||
'set_tweak': 'sound',
|
||||
'value': 'default'
|
||||
}, {
|
||||
'set_tweak': 'highlight',
|
||||
'value': False
|
||||
}
|
||||
]
|
||||
},
|
||||
# Will we sometimes want to know about people joining and leaving?
|
||||
# Perhaps: if so, this could be expanded upon. Seems the most usual case
|
||||
# is that we don't though. We add this override rule so that even if
|
||||
|
@ -251,38 +285,6 @@ BASE_APPEND_UNDERRIDE_RULES = [
|
|||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
'rule_id': 'global/underride/.m.rule.invite_for_me',
|
||||
'conditions': [
|
||||
{
|
||||
'kind': 'event_match',
|
||||
'key': 'type',
|
||||
'pattern': 'm.room.member',
|
||||
'_id': '_member',
|
||||
},
|
||||
{
|
||||
'kind': 'event_match',
|
||||
'key': 'content.membership',
|
||||
'pattern': 'invite',
|
||||
'_id': '_invite_member',
|
||||
},
|
||||
{
|
||||
'kind': 'event_match',
|
||||
'key': 'state_key',
|
||||
'pattern_type': 'user_id'
|
||||
},
|
||||
],
|
||||
'actions': [
|
||||
'notify',
|
||||
{
|
||||
'set_tweak': 'sound',
|
||||
'value': 'default'
|
||||
}, {
|
||||
'set_tweak': 'highlight',
|
||||
'value': False
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
'rule_id': 'global/underride/.m.rule.message',
|
||||
'conditions': [
|
||||
|
@ -315,7 +317,7 @@ for r in BASE_PREPEND_OVERRIDE_RULES:
|
|||
r['default'] = True
|
||||
BASE_RULE_IDS.add(r['rule_id'])
|
||||
|
||||
for r in BASE_APPEND_OVRRIDE_RULES:
|
||||
for r in BASE_APPEND_OVERRIDE_RULES:
|
||||
r['priority_class'] = PRIORITY_CLASS_MAP['override']
|
||||
r['default'] = True
|
||||
BASE_RULE_IDS.add(r['rule_id'])
|
||||
|
|
|
@ -69,12 +69,28 @@ def _get_rules(room_id, user_ids, store):
|
|||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def evaluator_for_room_id(room_id, hs, store):
|
||||
results = yield store.get_receipts_for_room(room_id, "m.read")
|
||||
user_ids = [
|
||||
row["user_id"] for row in results
|
||||
if hs.is_mine_id(row["user_id"])
|
||||
]
|
||||
def evaluator_for_event(event, hs, store):
|
||||
room_id = event.room_id
|
||||
users_with_pushers = yield store.get_users_with_pushers_in_room(room_id)
|
||||
receipts = yield store.get_receipts_for_room(room_id, "m.read")
|
||||
|
||||
# any users with pushers must be ours: they have pushers
|
||||
user_ids = set(users_with_pushers)
|
||||
for r in receipts:
|
||||
if hs.is_mine_id(r['user_id']):
|
||||
user_ids.add(r['user_id'])
|
||||
|
||||
# if this event is an invite event, we may need to run rules for the user
|
||||
# who's been invited, otherwise they won't get told they've been invited
|
||||
if event.type == 'm.room.member' and event.content['membership'] == 'invite':
|
||||
invited_user = event.state_key
|
||||
if invited_user and hs.is_mine_id(invited_user):
|
||||
has_pusher = yield store.user_has_pusher(invited_user)
|
||||
if has_pusher:
|
||||
user_ids.add(invited_user)
|
||||
|
||||
user_ids = list(user_ids)
|
||||
|
||||
rules_by_user = yield _get_rules(room_id, user_ids, store)
|
||||
|
||||
defer.returnValue(BulkPushRuleEvaluator(
|
||||
|
@ -101,10 +117,15 @@ class BulkPushRuleEvaluator:
|
|||
def action_for_event_by_user(self, event, handler, current_state):
|
||||
actions_by_user = {}
|
||||
|
||||
users_dict = yield self.store.are_guests(self.rules_by_user.keys())
|
||||
# None of these users can be peeking since this list of users comes
|
||||
# from the set of users in the room, so we know for sure they're all
|
||||
# actually in the room.
|
||||
user_tuples = [
|
||||
(u, False) for u in self.rules_by_user.keys()
|
||||
]
|
||||
|
||||
filtered_by_user = yield handler.filter_events_for_clients(
|
||||
users_dict.items(), [event], {event.event_id: current_state}
|
||||
user_tuples, [event], {event.event_id: current_state}
|
||||
)
|
||||
|
||||
room_members = yield self.store.get_users_in_room(self.room_id)
|
||||
|
|
|
@ -13,60 +13,236 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.push import Pusher, PusherConfigException
|
||||
from synapse.push import PusherConfigException
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet import defer, reactor
|
||||
|
||||
import logging
|
||||
import push_rule_evaluator
|
||||
import push_tools
|
||||
|
||||
from synapse.util.metrics import Measure
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HttpPusher(Pusher):
|
||||
def __init__(self, _hs, user_id, app_id,
|
||||
app_display_name, device_display_name, pushkey, pushkey_ts,
|
||||
data, last_token, last_success, failing_since):
|
||||
super(HttpPusher, self).__init__(
|
||||
_hs,
|
||||
user_id,
|
||||
app_id,
|
||||
app_display_name,
|
||||
device_display_name,
|
||||
pushkey,
|
||||
pushkey_ts,
|
||||
data,
|
||||
last_token,
|
||||
last_success,
|
||||
failing_since
|
||||
class HttpPusher(object):
|
||||
INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes
|
||||
MAX_BACKOFF_SEC = 60 * 60
|
||||
|
||||
# This one's in ms because we compare it against the clock
|
||||
GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000
|
||||
|
||||
def __init__(self, hs, pusherdict):
|
||||
self.hs = hs
|
||||
self.store = self.hs.get_datastore()
|
||||
self.clock = self.hs.get_clock()
|
||||
self.user_id = pusherdict['user_name']
|
||||
self.app_id = pusherdict['app_id']
|
||||
self.app_display_name = pusherdict['app_display_name']
|
||||
self.device_display_name = pusherdict['device_display_name']
|
||||
self.pushkey = pusherdict['pushkey']
|
||||
self.pushkey_ts = pusherdict['ts']
|
||||
self.data = pusherdict['data']
|
||||
self.last_stream_ordering = pusherdict['last_stream_ordering']
|
||||
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
|
||||
self.failing_since = pusherdict['failing_since']
|
||||
self.timed_call = None
|
||||
self.processing = False
|
||||
|
||||
# This is the highest stream ordering we know it's safe to process.
|
||||
# When new events arrive, we'll be given a window of new events: we
|
||||
# should honour this rather than just looking for anything higher
|
||||
# because of potential out-of-order event serialisation. This starts
|
||||
# off as None though as we don't know any better.
|
||||
self.max_stream_ordering = None
|
||||
|
||||
if 'data' not in pusherdict:
|
||||
raise PusherConfigException(
|
||||
"No 'data' key for HTTP pusher"
|
||||
)
|
||||
self.data = pusherdict['data']
|
||||
|
||||
self.name = "%s/%s/%s" % (
|
||||
pusherdict['user_name'],
|
||||
pusherdict['app_id'],
|
||||
pusherdict['pushkey'],
|
||||
)
|
||||
if 'url' not in data:
|
||||
|
||||
if 'url' not in self.data:
|
||||
raise PusherConfigException(
|
||||
"'url' required in data for HTTP pusher"
|
||||
)
|
||||
self.url = data['url']
|
||||
self.http_client = _hs.get_simple_http_client()
|
||||
self.url = self.data['url']
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.data_minus_url = {}
|
||||
self.data_minus_url.update(self.data)
|
||||
del self.data_minus_url['url']
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _build_notification_dict(self, event, tweaks, badge):
|
||||
# we probably do not want to push for every presence update
|
||||
# (we may want to be able to set up notifications when specific
|
||||
# people sign in, but we'd want to only deliver the pertinent ones)
|
||||
# Actually, presence events will not get this far now because we
|
||||
# need to filter them out in the main Pusher code.
|
||||
if 'event_id' not in event:
|
||||
defer.returnValue(None)
|
||||
def on_started(self):
|
||||
yield self._process()
|
||||
|
||||
ctx = yield self.get_context_for_event(event)
|
||||
@defer.inlineCallbacks
|
||||
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
|
||||
with Measure(self.clock, "push.on_new_notifications"):
|
||||
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
|
||||
yield self._process()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_new_receipts(self, min_stream_id, max_stream_id):
|
||||
# Note that the min here shouldn't be relied upon to be accurate.
|
||||
|
||||
# We could check the receipts are actually m.read receipts here,
|
||||
# but currently that's the only type of receipt anyway...
|
||||
with Measure(self.clock, "push.on_new_receipts"):
|
||||
badge = yield push_tools.get_badge_count(
|
||||
self.hs.get_datastore(), self.user_id
|
||||
)
|
||||
yield self.send_badge(badge)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_timer(self):
|
||||
with Measure(self.clock, "push.on_timer"):
|
||||
yield self._process()
|
||||
|
||||
def on_stop(self):
|
||||
if self.timed_call:
|
||||
self.timed_call.cancel()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _process(self):
|
||||
if self.processing:
|
||||
return
|
||||
try:
|
||||
self.processing = True
|
||||
# if the max ordering changes while we're running _unsafe_process,
|
||||
# call it again, and so on until we've caught up.
|
||||
while True:
|
||||
starting_max_ordering = self.max_stream_ordering
|
||||
try:
|
||||
yield self._unsafe_process()
|
||||
except:
|
||||
logger.exception("Exception processing notifs")
|
||||
if self.max_stream_ordering == starting_max_ordering:
|
||||
break
|
||||
finally:
|
||||
self.processing = False
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _unsafe_process(self):
|
||||
"""
|
||||
Looks for unset notifications and dispatch them, in order
|
||||
Never call this directly: use _process which will only allow this to
|
||||
run once per pusher.
|
||||
"""
|
||||
|
||||
unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
|
||||
self.user_id, self.last_stream_ordering, self.max_stream_ordering
|
||||
)
|
||||
|
||||
for push_action in unprocessed:
|
||||
processed = yield self._process_one(push_action)
|
||||
if processed:
|
||||
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
|
||||
self.last_stream_ordering = push_action['stream_ordering']
|
||||
self.store.update_pusher_last_stream_ordering_and_success(
|
||||
self.app_id, self.pushkey, self.user_id,
|
||||
self.last_stream_ordering,
|
||||
self.clock.time_msec()
|
||||
)
|
||||
if self.failing_since:
|
||||
self.failing_since = None
|
||||
yield self.store.update_pusher_failing_since(
|
||||
self.app_id, self.pushkey, self.user_id,
|
||||
self.failing_since
|
||||
)
|
||||
else:
|
||||
if not self.failing_since:
|
||||
self.failing_since = self.clock.time_msec()
|
||||
yield self.store.update_pusher_failing_since(
|
||||
self.app_id, self.pushkey, self.user_id,
|
||||
self.failing_since
|
||||
)
|
||||
|
||||
if (
|
||||
self.failing_since and
|
||||
self.failing_since <
|
||||
self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS
|
||||
):
|
||||
# we really only give up so that if the URL gets
|
||||
# fixed, we don't suddenly deliver a load
|
||||
# of old notifications.
|
||||
logger.warn("Giving up on a notification to user %s, "
|
||||
"pushkey %s",
|
||||
self.user_id, self.pushkey)
|
||||
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
|
||||
self.last_stream_ordering = push_action['stream_ordering']
|
||||
yield self.store.update_pusher_last_stream_ordering(
|
||||
self.app_id,
|
||||
self.pushkey,
|
||||
self.user_id,
|
||||
self.last_stream_ordering
|
||||
)
|
||||
|
||||
self.failing_since = None
|
||||
yield self.store.update_pusher_failing_since(
|
||||
self.app_id,
|
||||
self.pushkey,
|
||||
self.user_id,
|
||||
self.failing_since
|
||||
)
|
||||
else:
|
||||
logger.info("Push failed: delaying for %ds", self.backoff_delay)
|
||||
self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
|
||||
self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
|
||||
break
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _process_one(self, push_action):
|
||||
if 'notify' not in push_action['actions']:
|
||||
defer.returnValue(True)
|
||||
|
||||
tweaks = push_rule_evaluator.tweaks_for_actions(push_action['actions'])
|
||||
badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
|
||||
|
||||
event = yield self.store.get_event(push_action['event_id'], allow_none=True)
|
||||
if event is None:
|
||||
defer.returnValue(True) # It's been redacted
|
||||
rejected = yield self.dispatch_push(event, tweaks, badge)
|
||||
if rejected is False:
|
||||
defer.returnValue(False)
|
||||
|
||||
if isinstance(rejected, list) or isinstance(rejected, tuple):
|
||||
for pk in rejected:
|
||||
if pk != self.pushkey:
|
||||
# for sanity, we only remove the pushkey if it
|
||||
# was the one we actually sent...
|
||||
logger.warn(
|
||||
("Ignoring rejected pushkey %s because we"
|
||||
" didn't send it"), pk
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
"Pushkey %s was rejected: removing",
|
||||
pk
|
||||
)
|
||||
yield self.hs.get_pusherpool().remove_pusher(
|
||||
self.app_id, pk, self.user_id
|
||||
)
|
||||
defer.returnValue(True)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _build_notification_dict(self, event, tweaks, badge):
|
||||
ctx = yield push_tools.get_context_for_event(self.hs.get_datastore(), event)
|
||||
|
||||
d = {
|
||||
'notification': {
|
||||
'id': event['event_id'],
|
||||
'room_id': event['room_id'],
|
||||
'type': event['type'],
|
||||
'sender': event['user_id'],
|
||||
'id': event.event_id, # deprecated: remove soon
|
||||
'event_id': event.event_id,
|
||||
'room_id': event.room_id,
|
||||
'type': event.type,
|
||||
'sender': event.user_id,
|
||||
'counts': { # -- we don't mark messages as read yet so
|
||||
# we have no way of knowing
|
||||
# Just set the badge to 1 until we have read receipts
|
||||
|
@ -84,11 +260,11 @@ class HttpPusher(Pusher):
|
|||
]
|
||||
}
|
||||
}
|
||||
if event['type'] == 'm.room.member':
|
||||
d['notification']['membership'] = event['content']['membership']
|
||||
d['notification']['user_is_target'] = event['state_key'] == self.user_id
|
||||
if event.type == 'm.room.member':
|
||||
d['notification']['membership'] = event.content['membership']
|
||||
d['notification']['user_is_target'] = event.state_key == self.user_id
|
||||
if 'content' in event:
|
||||
d['notification']['content'] = event['content']
|
||||
d['notification']['content'] = event.content
|
||||
|
||||
if len(ctx['aliases']):
|
||||
d['notification']['room_alias'] = ctx['aliases'][0]
|
||||
|
|
|
@ -13,12 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from .baserules import list_with_base_rules
|
||||
|
||||
import logging
|
||||
import simplejson as json
|
||||
import re
|
||||
|
||||
from synapse.types import UserID
|
||||
|
@ -32,22 +27,6 @@ IS_GLOB = re.compile(r'[\?\*\[\]]')
|
|||
INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def evaluator_for_user_id(user_id, room_id, store):
|
||||
rawrules = yield store.get_push_rules_for_user(user_id)
|
||||
enabled_map = yield store.get_push_rules_enabled_for_user(user_id)
|
||||
our_member_event = yield store.get_current_state(
|
||||
room_id=room_id,
|
||||
event_type='m.room.member',
|
||||
state_key=user_id,
|
||||
)
|
||||
|
||||
defer.returnValue(PushRuleEvaluator(
|
||||
user_id, rawrules, enabled_map,
|
||||
room_id, our_member_event, store
|
||||
))
|
||||
|
||||
|
||||
def _room_member_count(ev, condition, room_member_count):
|
||||
if 'is' not in condition:
|
||||
return False
|
||||
|
@ -74,111 +53,14 @@ def _room_member_count(ev, condition, room_member_count):
|
|||
return False
|
||||
|
||||
|
||||
class PushRuleEvaluator:
|
||||
DEFAULT_ACTIONS = []
|
||||
|
||||
def __init__(self, user_id, raw_rules, enabled_map, room_id,
|
||||
our_member_event, store):
|
||||
self.user_id = user_id
|
||||
self.room_id = room_id
|
||||
self.our_member_event = our_member_event
|
||||
self.store = store
|
||||
|
||||
rules = []
|
||||
for raw_rule in raw_rules:
|
||||
rule = dict(raw_rule)
|
||||
rule['conditions'] = json.loads(raw_rule['conditions'])
|
||||
rule['actions'] = json.loads(raw_rule['actions'])
|
||||
rules.append(rule)
|
||||
|
||||
self.rules = list_with_base_rules(rules)
|
||||
|
||||
self.enabled_map = enabled_map
|
||||
|
||||
@staticmethod
|
||||
def tweaks_for_actions(actions):
|
||||
tweaks = {}
|
||||
for a in actions:
|
||||
if not isinstance(a, dict):
|
||||
continue
|
||||
if 'set_tweak' in a and 'value' in a:
|
||||
tweaks[a['set_tweak']] = a['value']
|
||||
return tweaks
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def actions_for_event(self, ev):
|
||||
"""
|
||||
This should take into account notification settings that the user
|
||||
has configured both globally and per-room when we have the ability
|
||||
to do such things.
|
||||
"""
|
||||
if ev['user_id'] == self.user_id:
|
||||
# let's assume you probably know about messages you sent yourself
|
||||
defer.returnValue([])
|
||||
|
||||
room_id = ev['room_id']
|
||||
|
||||
# get *our* member event for display name matching
|
||||
my_display_name = None
|
||||
|
||||
if self.our_member_event:
|
||||
my_display_name = self.our_member_event[0].content.get("displayname")
|
||||
|
||||
room_members = yield self.store.get_users_in_room(room_id)
|
||||
room_member_count = len(room_members)
|
||||
|
||||
evaluator = PushRuleEvaluatorForEvent(ev, room_member_count)
|
||||
|
||||
for r in self.rules:
|
||||
enabled = self.enabled_map.get(r['rule_id'], None)
|
||||
if enabled is not None and not enabled:
|
||||
continue
|
||||
elif enabled is None and not r.get("enabled", True):
|
||||
# if no override, check enabled on the rule itself
|
||||
# (may have come from a base rule)
|
||||
continue
|
||||
|
||||
conditions = r['conditions']
|
||||
actions = r['actions']
|
||||
|
||||
# ignore rules with no actions (we have an explict 'dont_notify')
|
||||
if len(actions) == 0:
|
||||
logger.warn(
|
||||
"Ignoring rule id %s with no actions for user %s",
|
||||
r['rule_id'], self.user_id
|
||||
)
|
||||
continue
|
||||
|
||||
matches = True
|
||||
for c in conditions:
|
||||
matches = evaluator.matches(
|
||||
c, self.user_id, my_display_name
|
||||
)
|
||||
if not matches:
|
||||
break
|
||||
|
||||
logger.debug(
|
||||
"Rule %s %s",
|
||||
r['rule_id'], "matches" if matches else "doesn't match"
|
||||
)
|
||||
|
||||
if matches:
|
||||
logger.debug(
|
||||
"%s matches for user %s, event %s",
|
||||
r['rule_id'], self.user_id, ev['event_id']
|
||||
)
|
||||
|
||||
# filter out dont_notify as we treat an empty actions list
|
||||
# as dont_notify, and this doesn't take up a row in our database
|
||||
actions = [x for x in actions if x != 'dont_notify']
|
||||
|
||||
defer.returnValue(actions)
|
||||
|
||||
logger.debug(
|
||||
"No rules match for user %s, event %s",
|
||||
self.user_id, ev['event_id']
|
||||
)
|
||||
defer.returnValue(PushRuleEvaluator.DEFAULT_ACTIONS)
|
||||
def tweaks_for_actions(actions):
|
||||
tweaks = {}
|
||||
for a in actions:
|
||||
if not isinstance(a, dict):
|
||||
continue
|
||||
if 'set_tweak' in a and 'value' in a:
|
||||
tweaks[a['set_tweak']] = a['value']
|
||||
return tweaks
|
||||
|
||||
|
||||
class PushRuleEvaluatorForEvent(object):
|
||||
|
|
66
synapse/push/push_tools.py
Normal file
66
synapse/push/push_tools.py
Normal file
|
@ -0,0 +1,66 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2015, 2016 OpenMarket Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_badge_count(store, user_id):
|
||||
invites, joins = yield defer.gatherResults([
|
||||
store.get_invited_rooms_for_user(user_id),
|
||||
store.get_rooms_for_user(user_id),
|
||||
], consumeErrors=True)
|
||||
|
||||
my_receipts_by_room = yield store.get_receipts_for_user(
|
||||
user_id, "m.read",
|
||||
)
|
||||
|
||||
badge = len(invites)
|
||||
|
||||
for r in joins:
|
||||
if r.room_id in my_receipts_by_room:
|
||||
last_unread_event_id = my_receipts_by_room[r.room_id]
|
||||
|
||||
notifs = yield (
|
||||
store.get_unread_event_push_actions_by_room_for_user(
|
||||
r.room_id, user_id, last_unread_event_id
|
||||
)
|
||||
)
|
||||
badge += notifs["notify_count"]
|
||||
defer.returnValue(badge)
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_context_for_event(store, ev):
|
||||
name_aliases = yield store.get_room_name_and_aliases(
|
||||
ev.room_id
|
||||
)
|
||||
|
||||
ctx = {'aliases': name_aliases[1]}
|
||||
if name_aliases[0] is not None:
|
||||
ctx['name'] = name_aliases[0]
|
||||
|
||||
their_member_events_for_room = yield store.get_current_state(
|
||||
room_id=ev.room_id,
|
||||
event_type='m.room.member',
|
||||
state_key=ev.user_id
|
||||
)
|
||||
for mev in their_member_events_for_room:
|
||||
if mev.content['membership'] == 'join' and 'displayname' in mev.content:
|
||||
dn = mev.content['displayname']
|
||||
if dn is not None:
|
||||
ctx['sender_display_name'] = dn
|
||||
|
||||
defer.returnValue(ctx)
|
10
synapse/push/pusher.py
Normal file
10
synapse/push/pusher.py
Normal file
|
@ -0,0 +1,10 @@
|
|||
from httppusher import HttpPusher
|
||||
|
||||
PUSHER_TYPES = {
|
||||
'http': HttpPusher
|
||||
}
|
||||
|
||||
|
||||
def create_pusher(hs, pusherdict):
|
||||
if pusherdict['kind'] in PUSHER_TYPES:
|
||||
return PUSHER_TYPES[pusherdict['kind']](hs, pusherdict)
|
|
@ -16,9 +16,10 @@
|
|||
|
||||
from twisted.internet import defer
|
||||
|
||||
from .httppusher import HttpPusher
|
||||
import pusher
|
||||
from synapse.push import PusherConfigException
|
||||
from synapse.util.logcontext import preserve_fn
|
||||
from synapse.util.async import run_on_reactor
|
||||
|
||||
import logging
|
||||
|
||||
|
@ -48,7 +49,7 @@ class PusherPool:
|
|||
# will then get pulled out of the database,
|
||||
# recreated, added and started: this means we have only one
|
||||
# code path adding pushers.
|
||||
self._create_pusher({
|
||||
pusher.create_pusher(self.hs, {
|
||||
"user_name": user_id,
|
||||
"kind": kind,
|
||||
"app_id": app_id,
|
||||
|
@ -58,10 +59,18 @@ class PusherPool:
|
|||
"ts": time_now_msec,
|
||||
"lang": lang,
|
||||
"data": data,
|
||||
"last_token": None,
|
||||
"last_stream_ordering": None,
|
||||
"last_success": None,
|
||||
"failing_since": None
|
||||
})
|
||||
|
||||
# create the pusher setting last_stream_ordering to the current maximum
|
||||
# stream ordering in event_push_actions, so it will process
|
||||
# pushes from this point onwards.
|
||||
last_stream_ordering = (
|
||||
yield self.store.get_latest_push_action_stream_ordering()
|
||||
)
|
||||
|
||||
yield self.store.add_pusher(
|
||||
user_id=user_id,
|
||||
access_token=access_token,
|
||||
|
@ -73,6 +82,7 @@ class PusherPool:
|
|||
pushkey_ts=time_now_msec,
|
||||
lang=lang,
|
||||
data=data,
|
||||
last_stream_ordering=last_stream_ordering,
|
||||
profile_tag=profile_tag,
|
||||
)
|
||||
yield self._refresh_pusher(app_id, pushkey, user_id)
|
||||
|
@ -106,26 +116,51 @@ class PusherPool:
|
|||
)
|
||||
yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
|
||||
|
||||
def _create_pusher(self, pusherdict):
|
||||
if pusherdict['kind'] == 'http':
|
||||
return HttpPusher(
|
||||
self.hs,
|
||||
user_id=pusherdict['user_name'],
|
||||
app_id=pusherdict['app_id'],
|
||||
app_display_name=pusherdict['app_display_name'],
|
||||
device_display_name=pusherdict['device_display_name'],
|
||||
pushkey=pusherdict['pushkey'],
|
||||
pushkey_ts=pusherdict['ts'],
|
||||
data=pusherdict['data'],
|
||||
last_token=pusherdict['last_token'],
|
||||
last_success=pusherdict['last_success'],
|
||||
failing_since=pusherdict['failing_since']
|
||||
@defer.inlineCallbacks
|
||||
def on_new_notifications(self, min_stream_id, max_stream_id):
|
||||
yield run_on_reactor()
|
||||
try:
|
||||
users_affected = yield self.store.get_push_action_users_in_range(
|
||||
min_stream_id, max_stream_id
|
||||
)
|
||||
else:
|
||||
raise PusherConfigException(
|
||||
"Unknown pusher type '%s' for user %s" %
|
||||
(pusherdict['kind'], pusherdict['user_name'])
|
||||
|
||||
deferreds = []
|
||||
|
||||
for u in users_affected:
|
||||
if u in self.pushers:
|
||||
for p in self.pushers[u].values():
|
||||
deferreds.append(
|
||||
p.on_new_notifications(min_stream_id, max_stream_id)
|
||||
)
|
||||
|
||||
yield defer.gatherResults(deferreds)
|
||||
except:
|
||||
logger.exception("Exception in pusher on_new_notifications")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
|
||||
yield run_on_reactor()
|
||||
try:
|
||||
# Need to subtract 1 from the minimum because the lower bound here
|
||||
# is not inclusive
|
||||
updated_receipts = yield self.store.get_all_updated_receipts(
|
||||
min_stream_id - 1, max_stream_id
|
||||
)
|
||||
# This returns a tuple, user_id is at index 3
|
||||
users_affected = set([r[3] for r in updated_receipts])
|
||||
|
||||
deferreds = []
|
||||
|
||||
for u in users_affected:
|
||||
if u in self.pushers:
|
||||
for p in self.pushers[u].values():
|
||||
deferreds.append(
|
||||
p.on_new_receipts(min_stream_id, max_stream_id)
|
||||
)
|
||||
|
||||
yield defer.gatherResults(deferreds)
|
||||
except:
|
||||
logger.exception("Exception in pusher on_new_receipts")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _refresh_pusher(self, app_id, pushkey, user_id):
|
||||
|
@ -146,30 +181,34 @@ class PusherPool:
|
|||
logger.info("Starting %d pushers", len(pushers))
|
||||
for pusherdict in pushers:
|
||||
try:
|
||||
p = self._create_pusher(pusherdict)
|
||||
p = pusher.create_pusher(self.hs, pusherdict)
|
||||
except PusherConfigException:
|
||||
logger.exception("Couldn't start a pusher: caught PusherConfigException")
|
||||
continue
|
||||
if p:
|
||||
fullid = "%s:%s:%s" % (
|
||||
appid_pushkey = "%s:%s" % (
|
||||
pusherdict['app_id'],
|
||||
pusherdict['pushkey'],
|
||||
pusherdict['user_name']
|
||||
)
|
||||
if fullid in self.pushers:
|
||||
self.pushers[fullid].stop()
|
||||
self.pushers[fullid] = p
|
||||
preserve_fn(p.start)()
|
||||
byuser = self.pushers.setdefault(pusherdict['user_name'], {})
|
||||
|
||||
if appid_pushkey in byuser:
|
||||
byuser[appid_pushkey].on_stop()
|
||||
byuser[appid_pushkey] = p
|
||||
preserve_fn(p.on_started)()
|
||||
|
||||
logger.info("Started pushers")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def remove_pusher(self, app_id, pushkey, user_id):
|
||||
fullid = "%s:%s:%s" % (app_id, pushkey, user_id)
|
||||
if fullid in self.pushers:
|
||||
logger.info("Stopping pusher %s", fullid)
|
||||
self.pushers[fullid].stop()
|
||||
del self.pushers[fullid]
|
||||
appid_pushkey = "%s:%s" % (app_id, pushkey)
|
||||
|
||||
byuser = self.pushers.get(user_id, {})
|
||||
|
||||
if appid_pushkey in byuser:
|
||||
logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
|
||||
byuser[appid_pushkey].on_stop()
|
||||
del byuser[appid_pushkey]
|
||||
yield self.store.delete_pusher_by_app_id_pushkey_user_id(
|
||||
app_id, pushkey, user_id
|
||||
)
|
||||
|
|
|
@ -100,6 +100,54 @@ class EventPushActionsStore(SQLBaseStore):
|
|||
)
|
||||
defer.returnValue(ret)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
|
||||
def f(txn):
|
||||
sql = (
|
||||
"SELECT DISTINCT(user_id) FROM event_push_actions WHERE"
|
||||
" stream_ordering >= ? AND stream_ordering <= ?"
|
||||
)
|
||||
txn.execute(sql, (min_stream_ordering, max_stream_ordering))
|
||||
return [r[0] for r in txn.fetchall()]
|
||||
ret = yield self.runInteraction("get_push_action_users_in_range", f)
|
||||
defer.returnValue(ret)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_unread_push_actions_for_user_in_range(self, user_id,
|
||||
min_stream_ordering,
|
||||
max_stream_ordering=None):
|
||||
def f(txn):
|
||||
sql = (
|
||||
"SELECT event_id, stream_ordering, actions"
|
||||
" FROM event_push_actions"
|
||||
" WHERE user_id = ? AND stream_ordering > ?"
|
||||
)
|
||||
args = [user_id, min_stream_ordering]
|
||||
if max_stream_ordering is not None:
|
||||
sql += " AND stream_ordering <= ?"
|
||||
args.append(max_stream_ordering)
|
||||
sql += " ORDER BY stream_ordering ASC"
|
||||
txn.execute(sql, args)
|
||||
return txn.fetchall()
|
||||
ret = yield self.runInteraction("get_unread_push_actions_for_user_in_range", f)
|
||||
defer.returnValue([
|
||||
{
|
||||
"event_id": row[0],
|
||||
"stream_ordering": row[1],
|
||||
"actions": json.loads(row[2]),
|
||||
} for row in ret
|
||||
])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_latest_push_action_stream_ordering(self):
|
||||
def f(txn):
|
||||
txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
|
||||
return txn.fetchone()
|
||||
result = yield self.runInteraction(
|
||||
"get_latest_push_action_stream_ordering", f
|
||||
)
|
||||
defer.returnValue(result[0] or 0)
|
||||
|
||||
def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
|
||||
# Sad that we have to blow away the cache for the whole room here
|
||||
txn.call_after(
|
||||
|
|
|
@ -61,6 +61,17 @@ class EventsStore(SQLBaseStore):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def persist_events(self, events_and_contexts, backfilled=False):
|
||||
"""
|
||||
Write events to the database
|
||||
Args:
|
||||
events_and_contexts: list of tuples of (event, context)
|
||||
backfilled: ?
|
||||
|
||||
Returns: Tuple of stream_orderings where the first is the minimum and
|
||||
last is the maximum stream ordering assigned to the events when
|
||||
persisting.
|
||||
|
||||
"""
|
||||
if not events_and_contexts:
|
||||
return
|
||||
|
||||
|
@ -191,6 +202,9 @@ class EventsStore(SQLBaseStore):
|
|||
txn.call_after(self._get_current_state_for_key.invalidate_all)
|
||||
txn.call_after(self.get_rooms_for_user.invalidate_all)
|
||||
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
|
||||
txn.call_after(
|
||||
self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
|
||||
)
|
||||
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
|
||||
txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
|
||||
|
||||
|
|
|
@ -18,6 +18,8 @@ from twisted.internet import defer
|
|||
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
|
||||
import logging
|
||||
import simplejson as json
|
||||
import types
|
||||
|
@ -47,6 +49,13 @@ class PusherStore(SQLBaseStore):
|
|||
|
||||
return rows
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def user_has_pusher(self, user_id):
|
||||
ret = yield self._simple_select_one_onecol(
|
||||
"pushers", {"user_name": user_id}, "id", allow_none=True
|
||||
)
|
||||
defer.returnValue(ret is not None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
|
||||
def r(txn):
|
||||
|
@ -107,31 +116,46 @@ class PusherStore(SQLBaseStore):
|
|||
"get_all_updated_pushers", get_all_updated_pushers_txn
|
||||
)
|
||||
|
||||
@cachedInlineCallbacks(num_args=1)
|
||||
def get_users_with_pushers_in_room(self, room_id):
|
||||
users = yield self.get_users_in_room(room_id)
|
||||
|
||||
result = yield self._simple_select_many_batch(
|
||||
'pushers', 'user_name', users, ['user_name']
|
||||
)
|
||||
|
||||
defer.returnValue([r['user_name'] for r in result])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_pusher(self, user_id, access_token, kind, app_id,
|
||||
app_display_name, device_display_name,
|
||||
pushkey, pushkey_ts, lang, data, profile_tag=""):
|
||||
pushkey, pushkey_ts, lang, data, last_stream_ordering,
|
||||
profile_tag=""):
|
||||
with self._pushers_id_gen.get_next() as stream_id:
|
||||
yield self._simple_upsert(
|
||||
"pushers",
|
||||
dict(
|
||||
app_id=app_id,
|
||||
pushkey=pushkey,
|
||||
user_name=user_id,
|
||||
),
|
||||
dict(
|
||||
access_token=access_token,
|
||||
kind=kind,
|
||||
app_display_name=app_display_name,
|
||||
device_display_name=device_display_name,
|
||||
ts=pushkey_ts,
|
||||
lang=lang,
|
||||
data=encode_canonical_json(data),
|
||||
profile_tag=profile_tag,
|
||||
id=stream_id,
|
||||
),
|
||||
desc="add_pusher",
|
||||
)
|
||||
def f(txn):
|
||||
txn.call_after(self.get_users_with_pushers_in_room.invalidate_all)
|
||||
return self._simple_upsert_txn(
|
||||
txn,
|
||||
"pushers",
|
||||
{
|
||||
"app_id": app_id,
|
||||
"pushkey": pushkey,
|
||||
"user_name": user_id,
|
||||
},
|
||||
{
|
||||
"access_token": access_token,
|
||||
"kind": kind,
|
||||
"app_display_name": app_display_name,
|
||||
"device_display_name": device_display_name,
|
||||
"ts": pushkey_ts,
|
||||
"lang": lang,
|
||||
"data": encode_canonical_json(data),
|
||||
"last_stream_ordering": last_stream_ordering,
|
||||
"profile_tag": profile_tag,
|
||||
"id": stream_id,
|
||||
},
|
||||
)
|
||||
defer.returnValue((yield self.runInteraction("add_pusher", f)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
|
||||
|
@ -153,22 +177,28 @@ class PusherStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_pusher_last_token(self, app_id, pushkey, user_id, last_token):
|
||||
def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id,
|
||||
last_stream_ordering):
|
||||
yield self._simple_update_one(
|
||||
"pushers",
|
||||
{'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
|
||||
{'last_token': last_token},
|
||||
desc="update_pusher_last_token",
|
||||
{'last_stream_ordering': last_stream_ordering},
|
||||
desc="update_pusher_last_stream_ordering",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def update_pusher_last_token_and_success(self, app_id, pushkey, user_id,
|
||||
last_token, last_success):
|
||||
def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey,
|
||||
user_id,
|
||||
last_stream_ordering,
|
||||
last_success):
|
||||
yield self._simple_update_one(
|
||||
"pushers",
|
||||
{'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
|
||||
{'last_token': last_token, 'last_success': last_success},
|
||||
desc="update_pusher_last_token_and_success",
|
||||
{
|
||||
'last_stream_ordering': last_stream_ordering,
|
||||
'last_success': last_success
|
||||
},
|
||||
desc="update_pusher_last_stream_ordering_and_success",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
|
@ -390,16 +390,19 @@ class ReceiptsStore(SQLBaseStore):
|
|||
}
|
||||
)
|
||||
|
||||
def get_all_updated_receipts(self, last_id, current_id, limit):
|
||||
def get_all_updated_receipts(self, last_id, current_id, limit=None):
|
||||
def get_all_updated_receipts_txn(txn):
|
||||
sql = (
|
||||
"SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
|
||||
" FROM receipts_linearized"
|
||||
" WHERE ? < stream_id AND stream_id <= ?"
|
||||
" ORDER BY stream_id ASC"
|
||||
" LIMIT ?"
|
||||
)
|
||||
txn.execute(sql, (last_id, current_id, limit))
|
||||
args = [last_id, current_id]
|
||||
if limit is not None:
|
||||
sql += " LIMIT ?"
|
||||
args.append(limit)
|
||||
txn.execute(sql, args)
|
||||
|
||||
return txn.fetchall()
|
||||
return self.runInteraction(
|
||||
|
|
|
@ -20,7 +20,7 @@ from twisted.internet import defer
|
|||
from synapse.api.errors import StoreError, Codes
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
|
||||
|
||||
class RegistrationStore(SQLBaseStore):
|
||||
|
@ -319,26 +319,6 @@ class RegistrationStore(SQLBaseStore):
|
|||
|
||||
defer.returnValue(res if res else False)
|
||||
|
||||
@cachedList(cached_method_name="is_guest", list_name="user_ids", num_args=1,
|
||||
inlineCallbacks=True)
|
||||
def are_guests(self, user_ids):
|
||||
sql = "SELECT name, is_guest FROM users WHERE name IN (%s)" % (
|
||||
",".join("?" for _ in user_ids),
|
||||
)
|
||||
|
||||
rows = yield self._execute(
|
||||
"are_guests", self.cursor_to_dict, sql, *user_ids
|
||||
)
|
||||
|
||||
result = {user_id: False for user_id in user_ids}
|
||||
|
||||
result.update({
|
||||
row["name"]: bool(row["is_guest"])
|
||||
for row in rows
|
||||
})
|
||||
|
||||
defer.returnValue(result)
|
||||
|
||||
def _query_for_auth(self, txn, token):
|
||||
sql = (
|
||||
"SELECT users.name, users.is_guest, access_tokens.id as token_id"
|
||||
|
|
|
@ -58,6 +58,9 @@ class RoomMemberStore(SQLBaseStore):
|
|||
txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,))
|
||||
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
|
||||
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
|
||||
txn.call_after(
|
||||
self.get_users_with_pushers_in_room.invalidate, (event.room_id,)
|
||||
)
|
||||
txn.call_after(
|
||||
self._membership_stream_cache.entity_has_changed,
|
||||
event.state_key, event.internal_metadata.stream_ordering
|
||||
|
|
79
synapse/storage/schema/delta/31/pushers.py
Normal file
79
synapse/storage/schema/delta/31/pushers.py
Normal file
|
@ -0,0 +1,79 @@
|
|||
# Copyright 2016 OpenMarket Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
# Change the last_token to last_stream_ordering now that pushers no longer
|
||||
# listen on an event stream but instead select out of the event_push_actions
|
||||
# table.
|
||||
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def token_to_stream_ordering(token):
|
||||
return int(token[1:].split('_')[0])
|
||||
|
||||
|
||||
def run_create(cur, database_engine, *args, **kwargs):
|
||||
logger.info("Porting pushers table, delta 31...")
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS pushers2 (
|
||||
id BIGINT PRIMARY KEY,
|
||||
user_name TEXT NOT NULL,
|
||||
access_token BIGINT DEFAULT NULL,
|
||||
profile_tag VARCHAR(32) NOT NULL,
|
||||
kind VARCHAR(8) NOT NULL,
|
||||
app_id VARCHAR(64) NOT NULL,
|
||||
app_display_name VARCHAR(64) NOT NULL,
|
||||
device_display_name VARCHAR(128) NOT NULL,
|
||||
pushkey TEXT NOT NULL,
|
||||
ts BIGINT NOT NULL,
|
||||
lang VARCHAR(8),
|
||||
data TEXT,
|
||||
last_stream_ordering INTEGER,
|
||||
last_success BIGINT,
|
||||
failing_since BIGINT,
|
||||
UNIQUE (app_id, pushkey, user_name)
|
||||
)
|
||||
""")
|
||||
cur.execute("""SELECT
|
||||
id, user_name, access_token, profile_tag, kind,
|
||||
app_id, app_display_name, device_display_name,
|
||||
pushkey, ts, lang, data, last_token, last_success,
|
||||
failing_since
|
||||
FROM pushers
|
||||
""")
|
||||
count = 0
|
||||
for row in cur.fetchall():
|
||||
row = list(row)
|
||||
row[12] = token_to_stream_ordering(row[12])
|
||||
cur.execute(database_engine.convert_param_style("""
|
||||
INSERT into pushers2 (
|
||||
id, user_name, access_token, profile_tag, kind,
|
||||
app_id, app_display_name, device_display_name,
|
||||
pushkey, ts, lang, data, last_stream_ordering, last_success,
|
||||
failing_since
|
||||
) values (%s)""" % (','.join(['?' for _ in range(len(row))]))),
|
||||
row
|
||||
)
|
||||
count += 1
|
||||
cur.execute("DROP TABLE pushers")
|
||||
cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
|
||||
logger.info("Moved %d pushers to new table", count)
|
||||
|
||||
|
||||
def run_upgrade(cur, database_engine, *args, **kwargs):
|
||||
pass
|
Loading…
Reference in a new issue