From 682dced40b1f9d8857ba9fd6d1bd9160a7cc3c2e Mon Sep 17 00:00:00 2001 From: Pat Gavlin Date: Fri, 28 Feb 2020 17:22:50 -0800 Subject: [PATCH] Mock resource monitor (#3738) These changes add support for mocking the resource monitor to the NodeJS and Python SDKs. The proposed mock interface is a simplified version of the standard resource monitor that allows an end-user to replace the usual implementations of ReadResource/RegisterResource and Invoke with their own. This can be used in unit tests to allow for precise control of resource outputs and invoke results. --- CHANGELOG.md | 4 + sdk/nodejs/runtime/index.ts | 1 + sdk/nodejs/runtime/mocks.ts | 127 +++++++++++++++ sdk/nodejs/runtime/settings.ts | 24 ++- sdk/nodejs/tsconfig.json | 1 + sdk/python/lib/pulumi/runtime/__init__.py | 6 + sdk/python/lib/pulumi/runtime/invoke.py | 72 +-------- sdk/python/lib/pulumi/runtime/mocks.py | 164 ++++++++++++++++++++ sdk/python/lib/pulumi/runtime/settings.py | 27 ++-- sdk/python/lib/pulumi/runtime/stack.py | 19 ++- sdk/python/lib/pulumi/runtime/sync_await.py | 83 ++++++++++ 11 files changed, 438 insertions(+), 90 deletions(-) create mode 100644 sdk/nodejs/runtime/mocks.ts create mode 100644 sdk/python/lib/pulumi/runtime/mocks.py create mode 100644 sdk/python/lib/pulumi/runtime/sync_await.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 0500696d4..677868f28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,13 @@ CHANGELOG ========= +## HEAD (unreleased) - Fix missing module import on Windows platform. [#3983](https://github.com/pulumi/pulumi/pull/3983) +- Add support for mocking the resource monitor to the NodeJS and Python SDKs. + [#3738](https://github.com/pulumi/pulumi/pull/3738/files) + ## 1.11.1 (2020-02-26) - Fix a regression for CustomTimeouts in Python SDK. [#3964](https://github.com/pulumi/pulumi/pull/3964) diff --git a/sdk/nodejs/runtime/index.ts b/sdk/nodejs/runtime/index.ts index 508026883..ba46ae605 100644 --- a/sdk/nodejs/runtime/index.ts +++ b/sdk/nodejs/runtime/index.ts @@ -20,6 +20,7 @@ export { } from "./closure/serializeClosure"; export { CodePathOptions, computeCodePaths } from "./closure/codePaths"; +export { Mocks, setMocks } from "./mocks"; export * from "./config"; export * from "./invoke"; diff --git a/sdk/nodejs/runtime/mocks.ts b/sdk/nodejs/runtime/mocks.ts new file mode 100644 index 000000000..79857c7c2 --- /dev/null +++ b/sdk/nodejs/runtime/mocks.ts @@ -0,0 +1,127 @@ +// Copyright 2016-2018, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { deserializeProperties, serializeProperties } from "./rpc"; +import { getProject, getStack, setMockOptions } from "./settings"; + +const provproto = require("../proto/provider_pb.js"); +const resproto = require("../proto/resource_pb.js"); +const structproto = require("google-protobuf/google/protobuf/struct_pb.js"); + +/** + * Mocks is an abstract class that allows subclasses to replace operations normally implemented by the Pulumi engine with + * their own implementations. This can be used during testing to ensure that calls to provider functions and resource constructors + * return predictable values. + */ +export interface Mocks { + /** + * call mocks provider-implemented function calls (e.g. aws.get_availability_zones). + * + * @param token: The token that indicates which function is being called. This token is of the form "package:module:function". + * @param args: The arguments provided to the function call. + * @param provider: If provided, the identifier of the provider instance being used to make the call. + */ + call(token: string, args: any, provider?: string): Record; + + /** + * new_resource mocks resource construction calls. This function should return the physical identifier and the output properties + * for the resource being constructed. + * + * @param type_: The token that indicates which resource type is being constructed. This token is of the form "package:module:type". + * @param name: The logical name of the resource instance. + * @param inputs: The inputs for the resource. + * @param provider: If provided, the identifier of the provider instnace being used to manage this resource. + * @param id_: If provided, the physical identifier of an existing resource to read or import. + */ + newResource(type: string, name: string, inputs: any, provider?: string, id?: string): { id: string, state: Record }; +} + +export class MockMonitor { + mocks: Mocks; + + constructor(mocks: Mocks) { + this.mocks = mocks; + } + + private newUrn(parent: string, type: string, name: string): string { + if (parent) { + const qualifiedType = parent.split("::")[2]; + const parentType = qualifiedType.split("$").pop(); + type = parentType + "$" + type; + } + return "urn:pulumi:" + [getStack(), getProject(), type, name].join("::"); + } + + public async invoke(req: any, callback: (err: any, innerResponse: any) => void) { + try { + const result = this.mocks.call(req.getTok(), deserializeProperties(req.getArgs()), req.getProvider()); + const response = new provproto.InvokeResponse(); + response.setReturn(structproto.Struct.fromJavaScript(await serializeProperties("", result))); + callback(null, response); + } catch (err) { + callback(err, undefined); + } + } + + public async readResource(req: any, callback: (err: any, innterResponse: any) => void) { + try { + const result = this.mocks.newResource( + req.getType(), + req.getName(), + deserializeProperties(req.getProperties()), + req.getProvider(), + req.getId()); + const response = new resproto.ReadResourceResponse(); + response.setUrn(this.newUrn(req.getParent(), req.getType(), req.getName())); + response.setProperties(structproto.Struct.fromJavaScript(await serializeProperties("", result.state))); + callback(null, response); + } catch (err) { + callback(err, undefined); + } + } + + public async registerResource(req: any, callback: (err: any, innerResponse: any) => void) { + try { + const result = this.mocks.newResource( + req.getType(), + req.getName(), + deserializeProperties(req.getObject()), + req.getProvider(), + req.getImportid()); + const response = new resproto.RegisterResourceResponse(); + response.setUrn(this.newUrn(req.getParent(), req.getType(), req.getName())); + response.setId(result.id); + response.setObject(structproto.Struct.fromJavaScript(await serializeProperties("", result.state))); + callback(null, response); + } catch (err) { + callback(err, undefined); + } + } + + public registerResourceOutputs(req: any, callback: (err: any, innerResponse: any) => void) { + callback(null, {}); + } +} + +/** + * setMocks configures the Pulumi runtime to use the given mocks for testing. + * + * @param mocks: The mocks to use for calls to provider functions and resource consrtuction. + * @param project: If provided, the name of the Pulumi project. Defaults to "project". + * @param stack: If provided, the name of the Pulumi stack. Defaults to "stack". + * @param preview: If provided, indicates whether or not the program is running a preview. Defaults to false. + */ +export function setMocks(mocks: Mocks, project?: string, stack?: string, preview?: boolean) { + setMockOptions(new MockMonitor(mocks), project, stack, preview); +} diff --git a/sdk/nodejs/runtime/settings.ts b/sdk/nodejs/runtime/settings.ts index 844d92200..75e54ff74 100644 --- a/sdk/nodejs/runtime/settings.ts +++ b/sdk/nodejs/runtime/settings.ts @@ -20,8 +20,10 @@ import { debuggablePromise } from "./debuggable"; const engrpc = require("../proto/engine_grpc_pb.js"); const engproto = require("../proto/engine_pb.js"); +const provproto = require("../proto/provider_pb.js"); const resrpc = require("../proto/resource_grpc_pb.js"); const resproto = require("../proto/resource_pb.js"); +const structproto = require("google-protobuf/google/protobuf/struct_pb.js"); /** * excessiveDebugOutput enables, well, pretty excessive debug output pertaining to resources and properties. @@ -51,7 +53,25 @@ export interface Options { /** * options are the current deployment options being used for this entire session. */ -const options = loadOptions(); +let options = loadOptions(); + + +export function setMockOptions(mockMonitor: any, project?: string, stack?: string, preview?: boolean) { + options = { + project: project || options.project || "project", + stack: stack || options.stack || "stack", + dryRun: preview, + queryMode: options.queryMode, + parallel: options.parallel, + monitorAddr: options.monitorAddr, + engineAddr: options.engineAddr, + testModeEnabled: true, + legacyApply: options.legacyApply, + syncDir: options.syncDir, + }; + + monitor = mockMonitor; +} /** @internal Used only for testing purposes. */ export function _setIsDryRun(val: boolean) { @@ -64,7 +84,7 @@ export function _setIsDryRun(val: boolean) { * and therefore certain output properties will never be resolved. */ export function isDryRun(): boolean { - return options.dryRun === true || isTestModeEnabled(); + return options.dryRun === true; } /** @internal Used only for testing purposes */ diff --git a/sdk/nodejs/tsconfig.json b/sdk/nodejs/tsconfig.json index f82f531da..89c76229c 100644 --- a/sdk/nodejs/tsconfig.json +++ b/sdk/nodejs/tsconfig.json @@ -51,6 +51,7 @@ "runtime/config.ts", "runtime/debuggable.ts", "runtime/invoke.ts", + "runtime/mocks.ts", "runtime/resource.ts", "runtime/rpc.ts", "runtime/settings.ts", diff --git a/sdk/python/lib/pulumi/runtime/__init__.py b/sdk/python/lib/pulumi/runtime/__init__.py index 877444782..39240a9c2 100644 --- a/sdk/python/lib/pulumi/runtime/__init__.py +++ b/sdk/python/lib/pulumi/runtime/__init__.py @@ -23,6 +23,12 @@ from .config import ( get_config_env_key, ) +from .mocks import ( + Mocks, + set_mocks, + test, +) + from .settings import ( Settings, configure, diff --git a/sdk/python/lib/pulumi/runtime/invoke.py b/sdk/python/lib/pulumi/runtime/invoke.py index 13eefa5f1..e78f15026 100644 --- a/sdk/python/lib/pulumi/runtime/invoke.py +++ b/sdk/python/lib/pulumi/runtime/invoke.py @@ -16,13 +16,14 @@ import sys from typing import Any, Awaitable import grpc +from .. import log from ..output import Inputs from ..invoke import InvokeOptions -from .. import log -from .settings import get_monitor from ..runtime.proto import provider_pb2 from . import rpc from .rpc_manager import RPC_MANAGER +from .settings import get_monitor +from .sync_await import _sync_await # This setting overrides a hardcoded maximum protobuf size in the python protobuf bindings. This avoids deserialization # exceptions on large gRPC payloads, but makes it possible to use enough memory to cause an OOM error instead [1]. @@ -39,73 +40,6 @@ if not sys.platform.startswith('win32'): from google.protobuf.pyext._message import SetAllowOversizeProtos # pylint: disable-msg=E0611 SetAllowOversizeProtos(True) -# If we are not running on Python 3.7 or later, we need to swap the Python implementation of Task in for the C -# implementation in order to support synchronous invokes. -if sys.version_info[0] == 3 and sys.version_info[1] < 7: - asyncio.Task = asyncio.tasks._PyTask - asyncio.tasks.Task = asyncio.tasks._PyTask - - def enter_task(loop, task): - task.__class__._current_tasks[loop] = task - - def leave_task(loop, task): - task.__class__._current_tasks.pop(loop) - - _enter_task = enter_task - _leave_task = leave_task -else: - _enter_task = asyncio.tasks._enter_task # type: ignore - _leave_task = asyncio.tasks._leave_task # type: ignore - - -def _sync_await(awaitable: Awaitable[Any]) -> Any: - """ - _sync_await waits for the given future to complete by effectively yielding the current task and pumping the event - loop. - """ - - # Fetch the current event loop and ensure a future. - loop = asyncio.get_event_loop() - fut = asyncio.ensure_future(awaitable) - - # If the loop is not running, we can just use run_until_complete. Without this, we would need to duplicate a fair - # amount of bookkeeping logic around loop startup and shutdown. - if not loop.is_running(): - return loop.run_until_complete(fut) - - # If we are executing inside a task, pretend we've returned from its current callback--effectively yielding to - # the event loop--by calling _leave_task. - task = asyncio.Task.current_task(loop) - if task is not None: - _leave_task(loop, task) - - # Pump the event loop until the future is complete. This is the kernel of BaseEventLoop.run_forever, and may not - # work with alternative event loop implementations. - # - # In order to make this reentrant with respect to _run_once, we keep track of the number of event handles on the - # ready list and ensure that there are exactly that many handles on the list once we are finished. - # - # See https://github.com/python/cpython/blob/3.6/Lib/asyncio/base_events.py#L1428-L1452 for the details of the - # _run_once kernel with which we need to cooperate. - ntodo = len(loop._ready) # type: ignore - while not fut.done() and not fut.cancelled(): - loop._run_once() # type: ignore - if loop._stopping: # type: ignore - break - # If we drained the ready list past what a calling _run_once would have expected, fix things up by pushing - # cancelled handles onto the list. - while len(loop._ready) < ntodo: # type: ignore - handle = asyncio.Handle(lambda: None, [], loop) - handle._cancelled = True - loop._ready.append(handle) # type: ignore - - # If we were executing inside a task, restore its context and continue on. - if task is not None: - _enter_task(loop, task) - - # Return the result of the future. - return fut.result() - class InvokeResult: """ InvokeResult is a helper type that wraps a prompt value in an Awaitable. diff --git a/sdk/python/lib/pulumi/runtime/mocks.py b/sdk/python/lib/pulumi/runtime/mocks.py new file mode 100644 index 000000000..015e0bc0a --- /dev/null +++ b/sdk/python/lib/pulumi/runtime/mocks.py @@ -0,0 +1,164 @@ +# Copyright 2016-2018, Pulumi Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Mocks for testing. +""" +import asyncio +import logging +from abc import ABC, abstractmethod +from typing import Optional, Awaitable, Tuple, Union, Any, TYPE_CHECKING + +import grpc +from google.protobuf import empty_pb2 +from . import rpc +from .settings import Settings, configure, get_stack, get_project +from .sync_await import _sync_await +from ..runtime.proto import engine_pb2, engine_pb2_grpc, provider_pb2, resource_pb2, resource_pb2_grpc +from ..runtime.stack import run_pulumi_func +from ..output import Output + +if TYPE_CHECKING: + from ..resource import Resource + + +loop = None + + +def test(fn): + def wrapper(*args, **kwargs): + asyncio.set_event_loop(loop) + _sync_await(run_pulumi_func(lambda: _sync_await(Output.from_input(fn(*args, **kwargs)).future()))) + return wrapper + +class Mocks(ABC): + """ + Mocks is an abstract class that allows subclasses to replace operations normally implemented by the Pulumi engine with + their own implementations. This can be used during testing to ensure that calls to provider functions and resource constructors + return predictable values. + """ + @abstractmethod + def call(self, token: str, args: dict, provider: Optional[str]) -> dict: + """ + call mocks provider-implemented function calls (e.g. aws.get_availability_zones). + + :param str token: The token that indicates which function is being called. This token is of the form "package:module:function". + :param dict args: The arguments provided to the function call. + :param Optional[str] provider: If provided, the identifier of the provider instance being used to make the call. + """ + return {} + + @abstractmethod + def new_resource(self, type_: str, name: str, inputs: dict, provider: Optional[str], id_: Optional[str]) -> Tuple[str, dict]: + """ + new_resource mocks resource construction calls. This function should return the physical identifier and the output properties + for the resource being constructed. + + :param str type_: The token that indicates which resource type is being constructed. This token is of the form "package:module:type". + :param str name: The logical name of the resource instance. + :param dict inputs: The inputs for the resource. + :param Optional[str] provider: If provided, the identifier of the provider instnace being used to manage this resource. + :param Optional[str] id_: If provided, the physical identifier of an existing resource to read or import. + """ + return ("", {}) + + +class MockMonitor: + mocks: Mocks + + def __init__(self, mocks: Mocks): + self.mocks = mocks + + def make_urn(self, parent: str, type_: str, name: str) -> str: + if parent != "": + qualifiedType = parent.split("::")[2] + parentType = qualifiedType.split("$").pop() + type_ = parentType + "$" + type_ + + return "urn:pulumi:" + "::".join([get_stack(), get_project(), type_, name]) + + def Invoke(self, request): + args = rpc.deserialize_properties(request.args) + + ret = self.mocks.call(request.tok, args, request.provider) + + asyncio.set_event_loop(loop) + ret_proto = _sync_await(asyncio.ensure_future(rpc.serialize_properties(ret, {}))) + + fields = {"failures": None, "return": ret_proto} + return provider_pb2.InvokeResponse(**fields) + + def ReadResource(self, request): + state = rpc.deserialize_properties(request.properties) + + _, state = self.mocks.new_resource(request.type, request.name, state, request.provider, request.id) + + asyncio.set_event_loop(loop) + props_proto = _sync_await(asyncio.ensure_future(rpc.serialize_properties(state, {}))) + + urn = self.make_urn(request.parent, request.type, request.name) + return resource_pb2.ReadResourceResponse(urn=urn, properties=props_proto) + + def RegisterResource(self, request): + inputs = rpc.deserialize_properties(request.object) + + id_, state = self.mocks.new_resource(request.type, request.name, inputs, request.provider, request.importId) + + asyncio.set_event_loop(loop) + obj_proto = _sync_await(rpc.serialize_properties(state, {})) + + urn = self.make_urn(request.parent, request.type, request.name) + return resource_pb2.RegisterResourceResponse(urn=urn, id=id_, object=obj_proto) + + def RegisterResourceOutputs(self, request): + #pylint: disable=unused-argument + return empty_pb2.Empty() + + +class MockEngine: + logger: logging.Logger + + def __init__(self, logger: Optional[logging.Logger]): + self.logger = logger if logger is not None else logging.getLogger() + + def Log(self, request): + if request.severity == engine_pb2.DEBUG: + self.logger.debug(request.message) + elif request.severity == engine_pb2.INFO: + self.logger.info(request.message) + elif request.severity == engine_pb2.WARNING: + self.logger.warning(request.message) + elif request.severity == engine_pb2.ERROR: + self.logger.error(request.message) + + +def set_mocks(mocks: Mocks, + project: Optional[str] = None, + stack: Optional[str] = None, + preview: Optional[bool] = None, + logger: Optional[logging.Logger] = None): + """ + set_mocks configures the Pulumi runtime to use the given mocks for testing. + """ + settings = Settings(monitor=MockMonitor(mocks), + engine=MockEngine(logger), + project=project if project is not None else 'project', + stack=stack if stack is not None else 'stack', + dry_run=preview, + test_mode_enabled=True) + configure(settings) + + # Make sure we have an event loop. + global loop + loop = asyncio.get_event_loop() diff --git a/sdk/python/lib/pulumi/runtime/settings.py b/sdk/python/lib/pulumi/runtime/settings.py index 4810a9b7d..62ee77a3e 100644 --- a/sdk/python/lib/pulumi/runtime/settings.py +++ b/sdk/python/lib/pulumi/runtime/settings.py @@ -18,7 +18,7 @@ Runtime settings and configuration. import asyncio import os import sys -from typing import Optional, Awaitable, TYPE_CHECKING +from typing import Optional, Awaitable, Union, Any, TYPE_CHECKING import grpc from ..runtime.proto import engine_pb2_grpc, resource_pb2, resource_pb2_grpc @@ -27,10 +27,9 @@ from ..errors import RunError if TYPE_CHECKING: from ..resource import Resource - class Settings: - monitor: Optional[resource_pb2_grpc.ResourceMonitorStub] - engine: Optional[engine_pb2_grpc.EngineStub] + monitor: Optional[Union[resource_pb2_grpc.ResourceMonitorStub, Any]] + engine: Optional[Union[engine_pb2_grpc.EngineStub, Any]] project: Optional[str] stack: Optional[str] parallel: Optional[str] @@ -42,8 +41,8 @@ class Settings: A bag of properties for configuring the Pulumi Python language runtime. """ def __init__(self, - monitor: Optional[str] = None, - engine: Optional[str] = None, + monitor: Optional[Union[str, Any]] = None, + engine: Optional[Union[str, Any]] = None, project: Optional[str] = None, stack: Optional[str] = None, parallel: Optional[str] = None, @@ -65,12 +64,18 @@ class Settings: self.legacy_apply_enabled = os.getenv("PULUMI_ENABLE_LEGACY_APPLY", "false") == "true" # Actually connect to the monitor/engine over gRPC. - if monitor: - self.monitor = resource_pb2_grpc.ResourceMonitorStub(grpc.insecure_channel(monitor)) + if monitor is not None: + if isinstance(monitor, str): + self.monitor = resource_pb2_grpc.ResourceMonitorStub(grpc.insecure_channel(monitor)) + else: + self.monitor = monitor else: self.monitor = None if engine: - self.engine = engine_pb2_grpc.EngineStub(grpc.insecure_channel(engine)) + if isinstance(engine, str): + self.engine = engine_pb2_grpc.EngineStub(grpc.insecure_channel(engine)) + else: + self.engine = engine else: self.engine = None @@ -153,7 +158,7 @@ def _set_stack(v: Optional[str]): SETTINGS.stack = v -def get_monitor() -> Optional[resource_pb2_grpc.ResourceMonitorStub]: +def get_monitor() -> Optional[Union[resource_pb2_grpc.ResourceMonitorStub, Any]]: """ Returns the current resource monitoring service client for RPC communications. """ @@ -163,7 +168,7 @@ def get_monitor() -> Optional[resource_pb2_grpc.ResourceMonitorStub]: return monitor -def get_engine() -> Optional[engine_pb2_grpc.EngineStub]: +def get_engine() -> Optional[Union[engine_pb2_grpc.EngineStub, Any]]: """ Returns the current engine service client for RPC communications. """ diff --git a/sdk/python/lib/pulumi/runtime/stack.py b/sdk/python/lib/pulumi/runtime/stack.py index c42d0ca1a..24d756547 100644 --- a/sdk/python/lib/pulumi/runtime/stack.py +++ b/sdk/python/lib/pulumi/runtime/stack.py @@ -28,14 +28,9 @@ from . import known_types from ..output import Output -async def run_in_stack(func: Callable): - """ - Run the given function inside of a new stack resource. This ensures that any stack export calls - will end up as output properties on the resulting stack component in the checkpoint file. This - is meant for internal runtime use only and is used by the Python SDK entrypoint program. - """ +async def run_pulumi_func(func: Callable): try: - Stack(func) + func() finally: log.debug("Waiting for outstanding RPCs to complete") @@ -71,11 +66,19 @@ async def run_in_stack(func: Callable): await asyncio.sleep(0) # Once we get scheduled again, all tasks have exited and we're good to go. - log.debug("run_in_stack completed") + log.debug("run_pulumi_func completed") if RPC_MANAGER.unhandled_exception is not None: raise RPC_MANAGER.unhandled_exception.with_traceback(RPC_MANAGER.exception_traceback) +async def run_in_stack(func: Callable): + """ + Run the given function inside of a new stack resource. This ensures that any stack export calls + will end up as output properties on the resulting stack component in the checkpoint file. This + is meant for internal runtime use only and is used by the Python SDK entrypoint program. + """ + await run_pulumi_func(lambda: Stack(func)) + @known_types.stack class Stack(ComponentResource): """ diff --git a/sdk/python/lib/pulumi/runtime/sync_await.py b/sdk/python/lib/pulumi/runtime/sync_await.py new file mode 100644 index 000000000..b45a9b4a2 --- /dev/null +++ b/sdk/python/lib/pulumi/runtime/sync_await.py @@ -0,0 +1,83 @@ +# Copyright 2016-2018, Pulumi Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +import sys +from typing import Any, Awaitable + +# If we are not running on Python 3.7 or later, we need to swap the Python implementation of Task in for the C +# implementation in order to support synchronous invokes. +if sys.version_info[0] == 3 and sys.version_info[1] < 7: + asyncio.Task = asyncio.tasks._PyTask + asyncio.tasks.Task = asyncio.tasks._PyTask + + def enter_task(loop, task): + task.__class__._current_tasks[loop] = task + + def leave_task(loop, task): + task.__class__._current_tasks.pop(loop) + + _enter_task = enter_task + _leave_task = leave_task +else: + _enter_task = asyncio.tasks._enter_task # type: ignore + _leave_task = asyncio.tasks._leave_task # type: ignore + + +def _sync_await(awaitable: Awaitable[Any]) -> Any: + """ + _sync_await waits for the given future to complete by effectively yielding the current task and pumping the event + loop. + """ + + # Fetch the current event loop and ensure a future. + loop = asyncio.get_event_loop() + fut = asyncio.ensure_future(awaitable) + + # If the loop is not running, we can just use run_until_complete. Without this, we would need to duplicate a fair + # amount of bookkeeping logic around loop startup and shutdown. + if not loop.is_running(): + return loop.run_until_complete(fut) + + # If we are executing inside a task, pretend we've returned from its current callback--effectively yielding to + # the event loop--by calling _leave_task. + task = asyncio.Task.current_task(loop) + if task is not None: + _leave_task(loop, task) + + # Pump the event loop until the future is complete. This is the kernel of BaseEventLoop.run_forever, and may not + # work with alternative event loop implementations. + # + # In order to make this reentrant with respect to _run_once, we keep track of the number of event handles on the + # ready list and ensure that there are exactly that many handles on the list once we are finished. + # + # See https://github.com/python/cpython/blob/3.6/Lib/asyncio/base_events.py#L1428-L1452 for the details of the + # _run_once kernel with which we need to cooperate. + ntodo = len(loop._ready) # type: ignore + while not fut.done() and not fut.cancelled(): + loop._run_once() # type: ignore + if loop._stopping: # type: ignore + break + # If we drained the ready list past what a calling _run_once would have expected, fix things up by pushing + # cancelled handles onto the list. + while len(loop._ready) < ntodo: # type: ignore + handle = asyncio.Handle(lambda: None, [], loop) + handle._cancelled = True + loop._ready.append(handle) # type: ignore + + # If we were executing inside a task, restore its context and continue on. + if task is not None: + _enter_task(loop, task) + + # Return the result of the future. + return fut.result()