forked from MirrorHub/synapse
Fix remaining scheduler bugs. Add more informative logging.
This commit is contained in:
parent
7e0bba555c
commit
db1fbc6c6f
3 changed files with 28 additions and 37 deletions
|
@ -83,9 +83,8 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||||
|
|
||||||
uri = service.url + ("/transactions/%s" %
|
uri = service.url + ("/transactions/%s" %
|
||||||
urllib.quote(txn_id))
|
urllib.quote(txn_id))
|
||||||
response = None
|
|
||||||
try:
|
try:
|
||||||
response = yield self.put_json(
|
yield self.put_json(
|
||||||
uri=uri,
|
uri=uri,
|
||||||
json_body={
|
json_body={
|
||||||
"events": events
|
"events": events
|
||||||
|
@ -93,9 +92,8 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||||
args={
|
args={
|
||||||
"access_token": service.hs_token
|
"access_token": service.hs_token
|
||||||
})
|
})
|
||||||
if response: # just an empty json object
|
defer.returnValue(True)
|
||||||
# TODO: Mark txn as sent successfully
|
return
|
||||||
defer.returnValue(True)
|
|
||||||
except CodeMessageException as e:
|
except CodeMessageException as e:
|
||||||
logger.warning("push_bulk to %s received %s", uri, e.code)
|
logger.warning("push_bulk to %s received %s", uri, e.code)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
|
|
|
@ -77,6 +77,7 @@ class AppServiceScheduler(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def start(self):
|
def start(self):
|
||||||
|
logger.info("Starting appservice scheduler")
|
||||||
# check for any DOWN ASes and start recoverers for them.
|
# check for any DOWN ASes and start recoverers for them.
|
||||||
recoverers = yield _Recoverer.start(
|
recoverers = yield _Recoverer.start(
|
||||||
self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
|
self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
|
||||||
|
@ -137,40 +138,33 @@ class _TransactionController(object):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_recovered(self, recoverer):
|
def on_recovered(self, recoverer):
|
||||||
self.recoverers.remove(recoverer)
|
self.recoverers.remove(recoverer)
|
||||||
logger.info("Successfully recovered application service: %s",
|
logger.info("Successfully recovered application service AS ID %s",
|
||||||
recoverer.service)
|
recoverer.service.id)
|
||||||
logger.info("Active recoverers: %s", len(self.recoverers))
|
logger.info("Remaining active recoverers: %s", len(self.recoverers))
|
||||||
applied_state = yield self.store.set_appservice_state(
|
yield self.store.set_appservice_state(
|
||||||
recoverer.service,
|
recoverer.service,
|
||||||
ApplicationServiceState.UP
|
ApplicationServiceState.UP
|
||||||
)
|
)
|
||||||
if not applied_state:
|
|
||||||
logger.error("Failed to apply appservice state UP to service %s",
|
|
||||||
recoverer.service)
|
|
||||||
|
|
||||||
def add_recoverers(self, recoverers):
|
def add_recoverers(self, recoverers):
|
||||||
for r in recoverers:
|
for r in recoverers:
|
||||||
self.recoverers.append(r)
|
self.recoverers.append(r)
|
||||||
if len(recoverers) > 0:
|
if len(recoverers) > 0:
|
||||||
logger.info("Active recoverers: %s", len(self.recoverers))
|
logger.info("New active recoverers: %s", len(self.recoverers))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _start_recoverer(self, service):
|
def _start_recoverer(self, service):
|
||||||
applied_state = yield self.store.set_appservice_state(
|
yield self.store.set_appservice_state(
|
||||||
service,
|
service,
|
||||||
ApplicationServiceState.DOWN
|
ApplicationServiceState.DOWN
|
||||||
)
|
)
|
||||||
if applied_state:
|
logger.info(
|
||||||
logger.info(
|
"Application service falling behind. Starting recoverer. AS ID %s",
|
||||||
"Application service falling behind. Starting recoverer. %s",
|
service.id
|
||||||
service
|
)
|
||||||
)
|
recoverer = self.recoverer_fn(service, self.on_recovered)
|
||||||
recoverer = self.recoverer_fn(service, self.on_recovered)
|
self.add_recoverers([recoverer])
|
||||||
self.add_recoverers([recoverer])
|
recoverer.recover()
|
||||||
recoverer.recover()
|
|
||||||
else:
|
|
||||||
logger.error("Failed to apply appservice state DOWN to service %s",
|
|
||||||
service)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _is_service_up(self, service):
|
def _is_service_up(self, service):
|
||||||
|
@ -190,6 +184,8 @@ class _Recoverer(object):
|
||||||
_Recoverer(clock, store, as_api, s, callback) for s in services
|
_Recoverer(clock, store, as_api, s, callback) for s in services
|
||||||
]
|
]
|
||||||
for r in recoverers:
|
for r in recoverers:
|
||||||
|
logger.info("Starting recoverer for AS ID %s which was marked as "
|
||||||
|
"DOWN", r.service.id)
|
||||||
r.recover()
|
r.recover()
|
||||||
defer.returnValue(recoverers)
|
defer.returnValue(recoverers)
|
||||||
|
|
||||||
|
@ -206,12 +202,13 @@ class _Recoverer(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def retry(self):
|
def retry(self):
|
||||||
txn = yield self._get_oldest_txn()
|
txn = yield self.store.get_oldest_unsent_txn(self.service)
|
||||||
if txn:
|
if txn:
|
||||||
logger.info("Retrying transaction %s for service %s",
|
logger.info("Retrying transaction %s for AS ID %s",
|
||||||
txn.id, txn.service)
|
txn.id, txn.service.id)
|
||||||
if txn.send(self.as_api):
|
sent = yield txn.send(self.as_api)
|
||||||
txn.complete(self.store)
|
if sent:
|
||||||
|
yield txn.complete(self.store)
|
||||||
# reset the backoff counter and retry immediately
|
# reset the backoff counter and retry immediately
|
||||||
self.backoff_counter = 1
|
self.backoff_counter = 1
|
||||||
yield self.retry()
|
yield self.retry()
|
||||||
|
@ -225,8 +222,3 @@ class _Recoverer(object):
|
||||||
|
|
||||||
def _set_service_recovered(self):
|
def _set_service_recovered(self):
|
||||||
self.callback(self)
|
self.callback(self)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def _get_oldest_txn(self):
|
|
||||||
txn = yield self.store.get_oldest_unsent_txn(self.service)
|
|
||||||
defer.returnValue(txn)
|
|
||||||
|
|
|
@ -293,6 +293,8 @@ class ApplicationServiceStore(SQLBaseStore):
|
||||||
services = {}
|
services = {}
|
||||||
for res in results:
|
for res in results:
|
||||||
as_token = res["token"]
|
as_token = res["token"]
|
||||||
|
if as_token is None:
|
||||||
|
continue
|
||||||
if as_token not in services:
|
if as_token not in services:
|
||||||
# add the service
|
# add the service
|
||||||
services[as_token] = {
|
services[as_token] = {
|
||||||
|
@ -516,11 +518,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
|
||||||
# Monotonically increasing txn ids, so just select the smallest
|
# Monotonically increasing txn ids, so just select the smallest
|
||||||
# one in the txns table (we delete them when they are sent)
|
# one in the txns table (we delete them when they are sent)
|
||||||
result = txn.execute(
|
result = txn.execute(
|
||||||
"SELECT *,MIN(txn_id) FROM application_services_txns WHERE as_id=?",
|
"SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?",
|
||||||
(service.id,)
|
(service.id,)
|
||||||
)
|
)
|
||||||
entry = self.cursor_to_dict(result)[0]
|
entry = self.cursor_to_dict(result)[0]
|
||||||
|
|
||||||
if not entry or entry["txn_id"] is None:
|
if not entry or entry["txn_id"] is None:
|
||||||
# the min(txn_id) part will force a row, so entry may not be None
|
# the min(txn_id) part will force a row, so entry may not be None
|
||||||
return None
|
return None
|
||||||
|
|
Loading…
Reference in a new issue