forked from MirrorHub/synapse
Scattergather the call out to ASes; validate received results
This commit is contained in:
parent
434bbf2cb5
commit
80f4740c8f
1 changed files with 34 additions and 7 deletions
|
@ -34,6 +34,26 @@ def log_failure(failure):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _is_valid_3pu_result(r):
|
||||||
|
if not isinstance(r, dict):
|
||||||
|
return False
|
||||||
|
|
||||||
|
for k in ("userid", "protocol"):
|
||||||
|
if k not in r:
|
||||||
|
return False
|
||||||
|
if not isinstance(r[k], str):
|
||||||
|
return False
|
||||||
|
|
||||||
|
if "fields" not in r:
|
||||||
|
return False
|
||||||
|
fields = r["fields"]
|
||||||
|
if not isinstance(fields, dict):
|
||||||
|
return False
|
||||||
|
for k in fields.keys():
|
||||||
|
if not isinstance(fields[k], str):
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
class ApplicationServicesHandler(object):
|
class ApplicationServicesHandler(object):
|
||||||
|
|
||||||
|
@ -150,16 +170,23 @@ class ApplicationServicesHandler(object):
|
||||||
def query_3pu(self, protocol, fields):
|
def query_3pu(self, protocol, fields):
|
||||||
services = yield self._get_services_for_3pn(protocol)
|
services = yield self._get_services_for_3pn(protocol)
|
||||||
|
|
||||||
# TODO(paul): scattergather
|
deferreds = []
|
||||||
results = []
|
|
||||||
for service in services:
|
for service in services:
|
||||||
result = yield self.appservice_api.query_3pu(
|
deferreds.append(self.appservice_api.query_3pu(
|
||||||
service, protocol, fields
|
service, protocol, fields
|
||||||
)
|
))
|
||||||
if result:
|
|
||||||
results.extend(result)
|
|
||||||
|
|
||||||
defer.returnValue(results)
|
results = yield defer.DeferredList(deferreds, consumeErrors=True)
|
||||||
|
|
||||||
|
ret = []
|
||||||
|
for (success, result) in results:
|
||||||
|
if not success:
|
||||||
|
continue
|
||||||
|
if not isinstance(result, list):
|
||||||
|
continue
|
||||||
|
ret.extend(r for r in result if _is_valid_3pu_result(r))
|
||||||
|
|
||||||
|
defer.returnValue(ret)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_services_for_event(self, event):
|
def _get_services_for_event(self, event):
|
||||||
|
|
Loading…
Reference in a new issue