Update to app_id / app_instance_id (partially) and mangle to be PEP8 compliant.

This commit is contained in:
David Baker 2014-12-03 13:37:02 +00:00
parent bdc21e7282
commit 88af58d41d
7 changed files with 214 additions and 131 deletions

View file

@ -24,90 +24,127 @@ import logging
logger = logging.getLogger(__name__)
class Pusher(object):
INITIAL_BACKOFF = 1000
MAX_BACKOFF = 60 * 60 * 1000
GIVE_UP_AFTER = 24 * 60 * 60 * 1000
def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data,
def __init__(self, _hs, user_name, app_id, app_instance_id,
app_display_name, device_display_name, pushkey, data,
last_token, last_success, failing_since):
self.hs = _hs
self.evStreamHandler = self.hs.get_handlers().event_stream_handler
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
self.user_name = user_name
self.app = app
self.app_id = app_id
self.app_instance_id = app_instance_id
self.app_display_name = app_display_name
self.device_display_name = device_display_name
self.pushkey = pushkey
self.data = data
self.last_token = last_token
self.last_success = last_success # not actually used
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.failing_since = None
self.failing_since = failing_since
@defer.inlineCallbacks
def start(self):
if not self.last_token:
# First-time setup: get a token to start from (we can't just start from no token, ie. 'now'
# because we need the result to be reproduceable in case we fail to dispatch the push)
# First-time setup: get a token to start from (we can't
# just start from no token, ie. 'now'
# because we need the result to be reproduceable in case
# we fail to dispatch the push)
config = PaginationConfig(from_token=None, limit='1')
chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=0)
chunk = yield self.evStreamHandler.get_stream(
self.user_name, config, timeout=0)
self.last_token = chunk['end']
self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
self.store.update_pusher_last_token(
self.user_name, self.pushkey, self.last_token)
logger.info("Pusher %s for user %s starting from token %s",
self.pushkey, self.user_name, self.last_token)
while True:
from_tok = StreamToken.from_string(self.last_token)
config = PaginationConfig(from_token=from_tok, limit='1')
chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=100*365*24*60*60*1000)
chunk = yield self.evStreamHandler.get_stream(
self.user_name, config, timeout=100*365*24*60*60*1000)
# limiting to 1 may get 1 event plus 1 presence event, so pick out the actual event
singleEvent = None
# limiting to 1 may get 1 event plus 1 presence event, so
# pick out the actual event
single_event = None
for c in chunk['chunk']:
if 'event_id' in c: # Hmmm...
singleEvent = c
single_event = c
break
if not singleEvent:
if not single_event:
continue
ret = yield self.dispatchPush(singleEvent)
if (ret):
ret = yield self.dispatch_push(single_event)
if ret:
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
self.store.update_pusher_last_token_and_success(self.user_name, self.pushkey,
self.last_token, self.clock.time_msec())
self.store.update_pusher_last_token_and_success(
self.user_name,
self.pushkey,
self.last_token,
self.clock.time_msec()
)
if self.failing_since:
self.failing_since = None
self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
self.store.update_pusher_failing_since(
self.user_name,
self.pushkey,
self.failing_since)
else:
if not self.failing_since:
self.failing_since = self.clock.time_msec()
self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
self.store.update_pusher_failing_since(
self.user_name,
self.pushkey,
self.failing_since
)
if self.failing_since and self.failing_since < self.clock.time_msec() - Pusher.GIVE_UP_AFTER:
# we really only give up so that if the URL gets fixed, we don't suddenly deliver a load
if self.failing_since and \
self.failing_since < \
self.clock.time_msec() - Pusher.GIVE_UP_AFTER:
# we really only give up so that if the URL gets
# fixed, we don't suddenly deliver a load
# of old notifications.
logger.warn("Giving up on a notification to user %s, pushkey %s",
logger.warn("Giving up on a notification to user %s, "
"pushkey %s",
self.user_name, self.pushkey)
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
self.store.update_pusher_last_token(
self.user_name,
self.pushkey,
self.last_token
)
self.failing_since = None
self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
self.store.update_pusher_failing_since(
self.user_name,
self.pushkey,
self.failing_since
)
else:
logger.warn("Failed to dispatch push for user %s (failing for %dms)."
logger.warn("Failed to dispatch push for user %s "
"(failing for %dms)."
"Trying again in %dms",
self.user_name,
self.clock.time_msec() - self.failing_since,
self.backoff_delay
)
yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
self.backoff_delay *=2
self.backoff_delay *= 2
if self.backoff_delay > Pusher.MAX_BACKOFF:
self.backoff_delay = Pusher.MAX_BACKOFF
def dispatch_push(self, p):
pass
class PusherConfigException(Exception):
def __init__(self, msg):

View file

@ -22,21 +22,28 @@ import logging
logger = logging.getLogger(__name__)
class HttpPusher(Pusher):
def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data,
def __init__(self, _hs, user_name, app_id, app_instance_id,
app_display_name, device_display_name, pushkey, data,
last_token, last_success, failing_since):
super(HttpPusher, self).__init__(_hs,
super(HttpPusher, self).__init__(
_hs,
user_name,
app,
app_id,
app_instance_id,
app_display_name,
device_display_name,
pushkey,
data,
last_token,
last_success,
failing_since)
failing_since
)
if 'url' not in data:
raise PusherConfigException("'url' required in data for HTTP pusher")
raise PusherConfigException(
"'url' required in data for HTTP pusher"
)
self.url = data['url']
self.httpCli = SimpleHttpClient(self.hs)
self.data_minus_url = {}
@ -54,33 +61,35 @@ class HttpPusher(Pusher):
return {
'notification': {
'transition' : 'new', # everything is new for now: we don't have read receipts
'transition': 'new',
# everything is new for now: we don't have read receipts
'id': event['event_id'],
'type': event['type'],
'from': event['user_id'],
# we may have to fetch this over federation and we can't trust it anyway: is it worth it?
# we may have to fetch this over federation and we
# can't trust it anyway: is it worth it?
#'fromDisplayName': 'Steve Stevington'
},
#'counts': { -- we don't mark messages as read yet so we have no way of knowing
#'counts': { -- we don't mark messages as read yet so
# we have no way of knowing
# 'unread': 1,
# 'missedCalls': 2
# },
'devices': {
self.pushkey: {
'data' : self.data_minus_url
'data': self.data_minus_url
}
}
}
@defer.inlineCallbacks
def dispatchPush(self, event):
notificationDict = self._build_notification_dict(event)
if not notificationDict:
def dispatch_push(self, event):
notification_dict = self._build_notification_dict(event)
if not notification_dict:
defer.returnValue(True)
try:
yield self.httpCli.post_json_get_json(self.url, notificationDict)
yield self.httpCli.post_json_get_json(self.url, notification_dict)
except:
logger.exception("Failed to push %s ", self.url)
defer.returnValue(False)
defer.returnValue(True)

View file

@ -34,13 +34,17 @@ class PusherPool:
def start(self):
self._pushers_added()
def add_pusher(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data):
# we try to create the pusher just to validate the config: it will then get pulled out of the database,
# recreated, added and started: this means we have only one code path adding pushers.
def add_pusher(self, user_name, kind, app_id, app_instance_id,
app_display_name, device_display_name, pushkey, data):
# we try to create the pusher just to validate the config: it
# will then get pulled out of the database,
# recreated, added and started: this means we have only one
# code path adding pushers.
self._create_pusher({
"user_name": user_name,
"kind": kind,
"app": app,
"app_id": app_id,
"app_instance_id": app_instance_id,
"app_display_name": app_display_name,
"device_display_name": device_display_name,
"pushkey": pushkey,
@ -49,24 +53,33 @@ class PusherPool:
"last_success": None,
"failing_since": None
})
self._add_pusher_to_store(user_name, kind, app, app_display_name, device_display_name, pushkey, data)
self._add_pusher_to_store(user_name, kind, app_id, app_instance_id,
app_display_name, device_display_name,
pushkey, data)
@defer.inlineCallbacks
def _add_pusher_to_store(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data):
yield self.store.add_pusher(user_name=user_name,
def _add_pusher_to_store(self, user_name, kind, app_id, app_instance_id,
app_display_name, device_display_name,
pushkey, data):
yield self.store.add_pusher(
user_name=user_name,
kind=kind,
app=app,
app_id=app_id,
app_instance_id=app_instance_id,
app_display_name=app_display_name,
device_display_name=device_display_name,
pushkey=pushkey,
data=json.dumps(data))
data=json.dumps(data)
)
self._pushers_added()
def _create_pusher(self, pusherdict):
if pusherdict['kind'] == 'http':
return HttpPusher(self.hs,
return HttpPusher(
self.hs,
user_name=pusherdict['user_name'],
app=pusherdict['app'],
app_id=pusherdict['app_id'],
app_instance_id=pusherdict['app_instance_id'],
app_display_name=pusherdict['app_display_name'],
device_display_name=pusherdict['device_display_name'],
pushkey=pusherdict['pushkey'],
@ -76,15 +89,19 @@ class PusherPool:
failing_since=pusherdict['failing_since']
)
else:
raise PusherConfigException("Unknown pusher type '%s' for user %s" %
(pusherdict['kind'], pusherdict['user_name']))
raise PusherConfigException(
"Unknown pusher type '%s' for user %s" %
(pusherdict['kind'], pusherdict['user_name'])
)
@defer.inlineCallbacks
def _pushers_added(self):
pushers = yield self.store.get_all_pushers_after_id(self.last_pusher_started)
pushers = yield self.store.get_all_pushers_after_id(
self.last_pusher_started
)
for p in pushers:
p['data'] = json.loads(p['data'])
if (len(pushers)):
if len(pushers):
self.last_pusher_started = pushers[-1]['id']
self._start_pushers(pushers)

View file

@ -31,30 +31,37 @@ class PusherRestServlet(RestServlet):
content = _parse_json(request)
reqd = ['kind', 'app', 'app_display_name', 'device_display_name', 'data']
reqd = ['kind', 'app_id', 'app_instance_id', 'app_display_name',
'device_display_name', 'data']
missing = []
for i in reqd:
if i not in content:
missing.append(i)
if len(missing):
raise SynapseError(400, "Missing parameters: "+','.join(missing), errcode=Codes.MISSING_PARAM)
raise SynapseError(400, "Missing parameters: "+','.join(missing),
errcode=Codes.MISSING_PARAM)
pusher_pool = self.hs.get_pusherpool()
try:
pusher_pool.add_pusher(user_name=user.to_string(),
pusher_pool.add_pusher(
user_name=user.to_string(),
kind=content['kind'],
app=content['app'],
app_id=content['app_id'],
app_instance_id=content['app_instance_id'],
app_display_name=content['app_display_name'],
device_display_name=content['device_display_name'],
pushkey=pushkey,
data=content['data'])
data=content['data']
)
except PusherConfigException as pce:
raise SynapseError(400, "Config Error: "+pce.message, errcode=Codes.MISSING_PARAM)
raise SynapseError(400, "Config Error: "+pce.message,
errcode=Codes.MISSING_PARAM)
defer.returnValue((200, {}))
def on_OPTIONS(self, request):
return (200, {})
def on_OPTIONS(self, _):
return 200, {}
# XXX: C+ped from rest/room.py - surely this should be common?
def _parse_json(request):
@ -67,5 +74,6 @@ def _parse_json(request):
except ValueError:
raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
def register_servlets(hs, http_server):
PusherRestServlet(hs).register(http_server)

View file

@ -25,11 +25,13 @@ import logging
logger = logging.getLogger(__name__)
class PusherStore(SQLBaseStore):
@defer.inlineCallbacks
def get_all_pushers_after_id(self, min_id):
sql = (
"SELECT id, user_name, kind, app, app_display_name, device_display_name, pushkey, data, "
"SELECT id, user_name, kind, app_id, app_instance_id,"
"app_display_name, device_display_name, pushkey, data, "
"last_token, last_success, failing_since "
"FROM pushers "
"WHERE id > ?"
@ -42,14 +44,15 @@ class PusherStore(SQLBaseStore):
"id": r[0],
"user_name": r[1],
"kind": r[2],
"app": r[3],
"app_display_name": r[4],
"device_display_name": r[5],
"pushkey": r[6],
"data": r[7],
"last_token": r[8],
"last_success": r[9],
"failing_since": r[10]
"app_id": r[3],
"app_instance_id": r[4],
"app_display_name": r[5],
"device_display_name": r[6],
"pushkey": r[7],
"data": r[8],
"last_token": r[9],
"last_success": r[10],
"failing_since": r[11]
}
for r in rows
]
@ -57,12 +60,14 @@ class PusherStore(SQLBaseStore):
defer.returnValue(ret)
@defer.inlineCallbacks
def add_pusher(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data):
def add_pusher(self, user_name, kind, app_id, app_instance_id,
app_display_name, device_display_name, pushkey, data):
try:
yield self._simple_insert(PushersTable.table_name, dict(
user_name=user_name,
kind=kind,
app=app,
app_id=app_id,
app_instance_id=app_instance_id,
app_display_name=app_display_name,
device_display_name=device_display_name,
pushkey=pushkey,
@ -76,21 +81,25 @@ class PusherStore(SQLBaseStore):
@defer.inlineCallbacks
def update_pusher_last_token(self, user_name, pushkey, last_token):
yield self._simple_update_one(PushersTable.table_name,
yield self._simple_update_one(
PushersTable.table_name,
{'user_name': user_name, 'pushkey': pushkey},
{'last_token': last_token}
)
@defer.inlineCallbacks
def update_pusher_last_token_and_success(self, user_name, pushkey, last_token, last_success):
yield self._simple_update_one(PushersTable.table_name,
def update_pusher_last_token_and_success(self, user_name, pushkey,
last_token, last_success):
yield self._simple_update_one(
PushersTable.table_name,
{'user_name': user_name, 'pushkey': pushkey},
{'last_token': last_token, 'last_success': last_success}
)
@defer.inlineCallbacks
def update_pusher_failing_since(self, user_name, pushkey, failing_since):
yield self._simple_update_one(PushersTable.table_name,
yield self._simple_update_one(
PushersTable.table_name,
{'user_name': user_name, 'pushkey': pushkey},
{'failing_since': failing_since}
)
@ -103,7 +112,8 @@ class PushersTable(Table):
"id",
"user_name",
"kind",
"app"
"app_id",
"app_instance_id",
"app_display_name",
"device_display_name",
"pushkey",

View file

@ -17,11 +17,12 @@ CREATE TABLE IF NOT EXISTS pushers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_name TEXT NOT NULL,
kind varchar(8) NOT NULL,
app varchar(64) NOT NULL,
app_id varchar(64) NOT NULL,
app_instance_id varchar(64) NOT NULL,
app_display_name varchar(64) NOT NULL,
device_display_name varchar(128) NOT NULL,
pushkey blob NOT NULL,
data text,
data blob,
last_token TEXT,
last_success BIGINT,
failing_since BIGINT,

View file

@ -17,11 +17,12 @@ CREATE TABLE IF NOT EXISTS pushers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_name TEXT NOT NULL,
kind varchar(8) NOT NULL,
app varchar(64) NOT NULL,
app_id varchar(64) NOT NULL,
app_instance_id varchar(64) NOT NULL,
app_display_name varchar(64) NOT NULL,
device_display_name varchar(128) NOT NULL,
pushkey blob NOT NULL,
data text,
data blob,
last_token TEXT,
last_success BIGINT,
failing_since BIGINT,