forked from MirrorHub/synapse
Add type hints for ObservableDeferred
attributes (#12159)
Signed-off-by: Sean Quah <seanq@element.io>
This commit is contained in:
parent
158e0937eb
commit
75574726a7
2 changed files with 12 additions and 3 deletions
1
changelog.d/12159.misc
Normal file
1
changelog.d/12159.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add type hints for `ObservableDeferred` attributes.
|
|
@ -40,7 +40,7 @@ from typing import (
|
||||||
)
|
)
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
from typing_extensions import ContextManager
|
from typing_extensions import ContextManager, Literal
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from twisted.internet.defer import CancelledError
|
from twisted.internet.defer import CancelledError
|
||||||
|
@ -96,6 +96,10 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
|
||||||
|
|
||||||
__slots__ = ["_deferred", "_observers", "_result"]
|
__slots__ = ["_deferred", "_observers", "_result"]
|
||||||
|
|
||||||
|
_deferred: "defer.Deferred[_T]"
|
||||||
|
_observers: Union[List["defer.Deferred[_T]"], Tuple[()]]
|
||||||
|
_result: Union[None, Tuple[Literal[True], _T], Tuple[Literal[False], Failure]]
|
||||||
|
|
||||||
def __init__(self, deferred: "defer.Deferred[_T]", consumeErrors: bool = False):
|
def __init__(self, deferred: "defer.Deferred[_T]", consumeErrors: bool = False):
|
||||||
object.__setattr__(self, "_deferred", deferred)
|
object.__setattr__(self, "_deferred", deferred)
|
||||||
object.__setattr__(self, "_result", None)
|
object.__setattr__(self, "_result", None)
|
||||||
|
@ -158,12 +162,14 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
|
||||||
effect the underlying deferred.
|
effect the underlying deferred.
|
||||||
"""
|
"""
|
||||||
if not self._result:
|
if not self._result:
|
||||||
|
assert isinstance(self._observers, list)
|
||||||
d: "defer.Deferred[_T]" = defer.Deferred()
|
d: "defer.Deferred[_T]" = defer.Deferred()
|
||||||
self._observers.append(d)
|
self._observers.append(d)
|
||||||
return d
|
return d
|
||||||
|
elif self._result[0]:
|
||||||
|
return defer.succeed(self._result[1])
|
||||||
else:
|
else:
|
||||||
success, res = self._result
|
return defer.fail(self._result[1])
|
||||||
return defer.succeed(res) if success else defer.fail(res)
|
|
||||||
|
|
||||||
def observers(self) -> "Collection[defer.Deferred[_T]]":
|
def observers(self) -> "Collection[defer.Deferred[_T]]":
|
||||||
return self._observers
|
return self._observers
|
||||||
|
@ -175,6 +181,8 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
|
||||||
return self._result is not None and self._result[0] is True
|
return self._result is not None and self._result[0] is True
|
||||||
|
|
||||||
def get_result(self) -> Union[_T, Failure]:
|
def get_result(self) -> Union[_T, Failure]:
|
||||||
|
if self._result is None:
|
||||||
|
raise ValueError(f"{self!r} has no result yet")
|
||||||
return self._result[1]
|
return self._result[1]
|
||||||
|
|
||||||
def __getattr__(self, name: str) -> Any:
|
def __getattr__(self, name: str) -> Any:
|
||||||
|
|
Loading…
Reference in a new issue