Mock resource monitor (#3738)

These changes add support for mocking the resource monitor to the NodeJS
and Python SDKs. The proposed mock interface is a simplified version of
the standard resource monitor that allows an end-user to replace the
usual implementations of ReadResource/RegisterResource and Invoke with
their own. This can be used in unit tests to allow for precise control
of resource outputs and invoke results.
This commit is contained in:
Pat Gavlin 2020-02-28 17:22:50 -08:00 committed by GitHub
parent 9d1edad65c
commit 682dced40b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 438 additions and 90 deletions

View file

@ -1,9 +1,13 @@
CHANGELOG
=========
## HEAD (unreleased)
- Fix missing module import on Windows platform.
[#3983](https://github.com/pulumi/pulumi/pull/3983)
- Add support for mocking the resource monitor to the NodeJS and Python SDKs.
[#3738](https://github.com/pulumi/pulumi/pull/3738/files)
## 1.11.1 (2020-02-26)
- Fix a regression for CustomTimeouts in Python SDK.
[#3964](https://github.com/pulumi/pulumi/pull/3964)

View file

@ -20,6 +20,7 @@ export {
} from "./closure/serializeClosure";
export { CodePathOptions, computeCodePaths } from "./closure/codePaths";
export { Mocks, setMocks } from "./mocks";
export * from "./config";
export * from "./invoke";

127
sdk/nodejs/runtime/mocks.ts Normal file
View file

@ -0,0 +1,127 @@
// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { deserializeProperties, serializeProperties } from "./rpc";
import { getProject, getStack, setMockOptions } from "./settings";
const provproto = require("../proto/provider_pb.js");
const resproto = require("../proto/resource_pb.js");
const structproto = require("google-protobuf/google/protobuf/struct_pb.js");
/**
* Mocks is an abstract class that allows subclasses to replace operations normally implemented by the Pulumi engine with
* their own implementations. This can be used during testing to ensure that calls to provider functions and resource constructors
* return predictable values.
*/
export interface Mocks {
/**
* call mocks provider-implemented function calls (e.g. aws.get_availability_zones).
*
* @param token: The token that indicates which function is being called. This token is of the form "package:module:function".
* @param args: The arguments provided to the function call.
* @param provider: If provided, the identifier of the provider instance being used to make the call.
*/
call(token: string, args: any, provider?: string): Record<string, any>;
/**
* new_resource mocks resource construction calls. This function should return the physical identifier and the output properties
* for the resource being constructed.
*
* @param type_: The token that indicates which resource type is being constructed. This token is of the form "package:module:type".
* @param name: The logical name of the resource instance.
* @param inputs: The inputs for the resource.
* @param provider: If provided, the identifier of the provider instnace being used to manage this resource.
* @param id_: If provided, the physical identifier of an existing resource to read or import.
*/
newResource(type: string, name: string, inputs: any, provider?: string, id?: string): { id: string, state: Record<string, any> };
}
export class MockMonitor {
mocks: Mocks;
constructor(mocks: Mocks) {
this.mocks = mocks;
}
private newUrn(parent: string, type: string, name: string): string {
if (parent) {
const qualifiedType = parent.split("::")[2];
const parentType = qualifiedType.split("$").pop();
type = parentType + "$" + type;
}
return "urn:pulumi:" + [getStack(), getProject(), type, name].join("::");
}
public async invoke(req: any, callback: (err: any, innerResponse: any) => void) {
try {
const result = this.mocks.call(req.getTok(), deserializeProperties(req.getArgs()), req.getProvider());
const response = new provproto.InvokeResponse();
response.setReturn(structproto.Struct.fromJavaScript(await serializeProperties("", result)));
callback(null, response);
} catch (err) {
callback(err, undefined);
}
}
public async readResource(req: any, callback: (err: any, innterResponse: any) => void) {
try {
const result = this.mocks.newResource(
req.getType(),
req.getName(),
deserializeProperties(req.getProperties()),
req.getProvider(),
req.getId());
const response = new resproto.ReadResourceResponse();
response.setUrn(this.newUrn(req.getParent(), req.getType(), req.getName()));
response.setProperties(structproto.Struct.fromJavaScript(await serializeProperties("", result.state)));
callback(null, response);
} catch (err) {
callback(err, undefined);
}
}
public async registerResource(req: any, callback: (err: any, innerResponse: any) => void) {
try {
const result = this.mocks.newResource(
req.getType(),
req.getName(),
deserializeProperties(req.getObject()),
req.getProvider(),
req.getImportid());
const response = new resproto.RegisterResourceResponse();
response.setUrn(this.newUrn(req.getParent(), req.getType(), req.getName()));
response.setId(result.id);
response.setObject(structproto.Struct.fromJavaScript(await serializeProperties("", result.state)));
callback(null, response);
} catch (err) {
callback(err, undefined);
}
}
public registerResourceOutputs(req: any, callback: (err: any, innerResponse: any) => void) {
callback(null, {});
}
}
/**
* setMocks configures the Pulumi runtime to use the given mocks for testing.
*
* @param mocks: The mocks to use for calls to provider functions and resource consrtuction.
* @param project: If provided, the name of the Pulumi project. Defaults to "project".
* @param stack: If provided, the name of the Pulumi stack. Defaults to "stack".
* @param preview: If provided, indicates whether or not the program is running a preview. Defaults to false.
*/
export function setMocks(mocks: Mocks, project?: string, stack?: string, preview?: boolean) {
setMockOptions(new MockMonitor(mocks), project, stack, preview);
}

View file

@ -20,8 +20,10 @@ import { debuggablePromise } from "./debuggable";
const engrpc = require("../proto/engine_grpc_pb.js");
const engproto = require("../proto/engine_pb.js");
const provproto = require("../proto/provider_pb.js");
const resrpc = require("../proto/resource_grpc_pb.js");
const resproto = require("../proto/resource_pb.js");
const structproto = require("google-protobuf/google/protobuf/struct_pb.js");
/**
* excessiveDebugOutput enables, well, pretty excessive debug output pertaining to resources and properties.
@ -51,7 +53,25 @@ export interface Options {
/**
* options are the current deployment options being used for this entire session.
*/
const options = loadOptions();
let options = loadOptions();
export function setMockOptions(mockMonitor: any, project?: string, stack?: string, preview?: boolean) {
options = {
project: project || options.project || "project",
stack: stack || options.stack || "stack",
dryRun: preview,
queryMode: options.queryMode,
parallel: options.parallel,
monitorAddr: options.monitorAddr,
engineAddr: options.engineAddr,
testModeEnabled: true,
legacyApply: options.legacyApply,
syncDir: options.syncDir,
};
monitor = mockMonitor;
}
/** @internal Used only for testing purposes. */
export function _setIsDryRun(val: boolean) {
@ -64,7 +84,7 @@ export function _setIsDryRun(val: boolean) {
* and therefore certain output properties will never be resolved.
*/
export function isDryRun(): boolean {
return options.dryRun === true || isTestModeEnabled();
return options.dryRun === true;
}
/** @internal Used only for testing purposes */

View file

@ -51,6 +51,7 @@
"runtime/config.ts",
"runtime/debuggable.ts",
"runtime/invoke.ts",
"runtime/mocks.ts",
"runtime/resource.ts",
"runtime/rpc.ts",
"runtime/settings.ts",

View file

@ -23,6 +23,12 @@ from .config import (
get_config_env_key,
)
from .mocks import (
Mocks,
set_mocks,
test,
)
from .settings import (
Settings,
configure,

View file

@ -16,13 +16,14 @@ import sys
from typing import Any, Awaitable
import grpc
from .. import log
from ..output import Inputs
from ..invoke import InvokeOptions
from .. import log
from .settings import get_monitor
from ..runtime.proto import provider_pb2
from . import rpc
from .rpc_manager import RPC_MANAGER
from .settings import get_monitor
from .sync_await import _sync_await
# This setting overrides a hardcoded maximum protobuf size in the python protobuf bindings. This avoids deserialization
# exceptions on large gRPC payloads, but makes it possible to use enough memory to cause an OOM error instead [1].
@ -39,73 +40,6 @@ if not sys.platform.startswith('win32'):
from google.protobuf.pyext._message import SetAllowOversizeProtos # pylint: disable-msg=E0611
SetAllowOversizeProtos(True)
# If we are not running on Python 3.7 or later, we need to swap the Python implementation of Task in for the C
# implementation in order to support synchronous invokes.
if sys.version_info[0] == 3 and sys.version_info[1] < 7:
asyncio.Task = asyncio.tasks._PyTask
asyncio.tasks.Task = asyncio.tasks._PyTask
def enter_task(loop, task):
task.__class__._current_tasks[loop] = task
def leave_task(loop, task):
task.__class__._current_tasks.pop(loop)
_enter_task = enter_task
_leave_task = leave_task
else:
_enter_task = asyncio.tasks._enter_task # type: ignore
_leave_task = asyncio.tasks._leave_task # type: ignore
def _sync_await(awaitable: Awaitable[Any]) -> Any:
"""
_sync_await waits for the given future to complete by effectively yielding the current task and pumping the event
loop.
"""
# Fetch the current event loop and ensure a future.
loop = asyncio.get_event_loop()
fut = asyncio.ensure_future(awaitable)
# If the loop is not running, we can just use run_until_complete. Without this, we would need to duplicate a fair
# amount of bookkeeping logic around loop startup and shutdown.
if not loop.is_running():
return loop.run_until_complete(fut)
# If we are executing inside a task, pretend we've returned from its current callback--effectively yielding to
# the event loop--by calling _leave_task.
task = asyncio.Task.current_task(loop)
if task is not None:
_leave_task(loop, task)
# Pump the event loop until the future is complete. This is the kernel of BaseEventLoop.run_forever, and may not
# work with alternative event loop implementations.
#
# In order to make this reentrant with respect to _run_once, we keep track of the number of event handles on the
# ready list and ensure that there are exactly that many handles on the list once we are finished.
#
# See https://github.com/python/cpython/blob/3.6/Lib/asyncio/base_events.py#L1428-L1452 for the details of the
# _run_once kernel with which we need to cooperate.
ntodo = len(loop._ready) # type: ignore
while not fut.done() and not fut.cancelled():
loop._run_once() # type: ignore
if loop._stopping: # type: ignore
break
# If we drained the ready list past what a calling _run_once would have expected, fix things up by pushing
# cancelled handles onto the list.
while len(loop._ready) < ntodo: # type: ignore
handle = asyncio.Handle(lambda: None, [], loop)
handle._cancelled = True
loop._ready.append(handle) # type: ignore
# If we were executing inside a task, restore its context and continue on.
if task is not None:
_enter_task(loop, task)
# Return the result of the future.
return fut.result()
class InvokeResult:
"""
InvokeResult is a helper type that wraps a prompt value in an Awaitable.

View file

@ -0,0 +1,164 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Mocks for testing.
"""
import asyncio
import logging
from abc import ABC, abstractmethod
from typing import Optional, Awaitable, Tuple, Union, Any, TYPE_CHECKING
import grpc
from google.protobuf import empty_pb2
from . import rpc
from .settings import Settings, configure, get_stack, get_project
from .sync_await import _sync_await
from ..runtime.proto import engine_pb2, engine_pb2_grpc, provider_pb2, resource_pb2, resource_pb2_grpc
from ..runtime.stack import run_pulumi_func
from ..output import Output
if TYPE_CHECKING:
from ..resource import Resource
loop = None
def test(fn):
def wrapper(*args, **kwargs):
asyncio.set_event_loop(loop)
_sync_await(run_pulumi_func(lambda: _sync_await(Output.from_input(fn(*args, **kwargs)).future())))
return wrapper
class Mocks(ABC):
"""
Mocks is an abstract class that allows subclasses to replace operations normally implemented by the Pulumi engine with
their own implementations. This can be used during testing to ensure that calls to provider functions and resource constructors
return predictable values.
"""
@abstractmethod
def call(self, token: str, args: dict, provider: Optional[str]) -> dict:
"""
call mocks provider-implemented function calls (e.g. aws.get_availability_zones).
:param str token: The token that indicates which function is being called. This token is of the form "package:module:function".
:param dict args: The arguments provided to the function call.
:param Optional[str] provider: If provided, the identifier of the provider instance being used to make the call.
"""
return {}
@abstractmethod
def new_resource(self, type_: str, name: str, inputs: dict, provider: Optional[str], id_: Optional[str]) -> Tuple[str, dict]:
"""
new_resource mocks resource construction calls. This function should return the physical identifier and the output properties
for the resource being constructed.
:param str type_: The token that indicates which resource type is being constructed. This token is of the form "package:module:type".
:param str name: The logical name of the resource instance.
:param dict inputs: The inputs for the resource.
:param Optional[str] provider: If provided, the identifier of the provider instnace being used to manage this resource.
:param Optional[str] id_: If provided, the physical identifier of an existing resource to read or import.
"""
return ("", {})
class MockMonitor:
mocks: Mocks
def __init__(self, mocks: Mocks):
self.mocks = mocks
def make_urn(self, parent: str, type_: str, name: str) -> str:
if parent != "":
qualifiedType = parent.split("::")[2]
parentType = qualifiedType.split("$").pop()
type_ = parentType + "$" + type_
return "urn:pulumi:" + "::".join([get_stack(), get_project(), type_, name])
def Invoke(self, request):
args = rpc.deserialize_properties(request.args)
ret = self.mocks.call(request.tok, args, request.provider)
asyncio.set_event_loop(loop)
ret_proto = _sync_await(asyncio.ensure_future(rpc.serialize_properties(ret, {})))
fields = {"failures": None, "return": ret_proto}
return provider_pb2.InvokeResponse(**fields)
def ReadResource(self, request):
state = rpc.deserialize_properties(request.properties)
_, state = self.mocks.new_resource(request.type, request.name, state, request.provider, request.id)
asyncio.set_event_loop(loop)
props_proto = _sync_await(asyncio.ensure_future(rpc.serialize_properties(state, {})))
urn = self.make_urn(request.parent, request.type, request.name)
return resource_pb2.ReadResourceResponse(urn=urn, properties=props_proto)
def RegisterResource(self, request):
inputs = rpc.deserialize_properties(request.object)
id_, state = self.mocks.new_resource(request.type, request.name, inputs, request.provider, request.importId)
asyncio.set_event_loop(loop)
obj_proto = _sync_await(rpc.serialize_properties(state, {}))
urn = self.make_urn(request.parent, request.type, request.name)
return resource_pb2.RegisterResourceResponse(urn=urn, id=id_, object=obj_proto)
def RegisterResourceOutputs(self, request):
#pylint: disable=unused-argument
return empty_pb2.Empty()
class MockEngine:
logger: logging.Logger
def __init__(self, logger: Optional[logging.Logger]):
self.logger = logger if logger is not None else logging.getLogger()
def Log(self, request):
if request.severity == engine_pb2.DEBUG:
self.logger.debug(request.message)
elif request.severity == engine_pb2.INFO:
self.logger.info(request.message)
elif request.severity == engine_pb2.WARNING:
self.logger.warning(request.message)
elif request.severity == engine_pb2.ERROR:
self.logger.error(request.message)
def set_mocks(mocks: Mocks,
project: Optional[str] = None,
stack: Optional[str] = None,
preview: Optional[bool] = None,
logger: Optional[logging.Logger] = None):
"""
set_mocks configures the Pulumi runtime to use the given mocks for testing.
"""
settings = Settings(monitor=MockMonitor(mocks),
engine=MockEngine(logger),
project=project if project is not None else 'project',
stack=stack if stack is not None else 'stack',
dry_run=preview,
test_mode_enabled=True)
configure(settings)
# Make sure we have an event loop.
global loop
loop = asyncio.get_event_loop()

View file

@ -18,7 +18,7 @@ Runtime settings and configuration.
import asyncio
import os
import sys
from typing import Optional, Awaitable, TYPE_CHECKING
from typing import Optional, Awaitable, Union, Any, TYPE_CHECKING
import grpc
from ..runtime.proto import engine_pb2_grpc, resource_pb2, resource_pb2_grpc
@ -27,10 +27,9 @@ from ..errors import RunError
if TYPE_CHECKING:
from ..resource import Resource
class Settings:
monitor: Optional[resource_pb2_grpc.ResourceMonitorStub]
engine: Optional[engine_pb2_grpc.EngineStub]
monitor: Optional[Union[resource_pb2_grpc.ResourceMonitorStub, Any]]
engine: Optional[Union[engine_pb2_grpc.EngineStub, Any]]
project: Optional[str]
stack: Optional[str]
parallel: Optional[str]
@ -42,8 +41,8 @@ class Settings:
A bag of properties for configuring the Pulumi Python language runtime.
"""
def __init__(self,
monitor: Optional[str] = None,
engine: Optional[str] = None,
monitor: Optional[Union[str, Any]] = None,
engine: Optional[Union[str, Any]] = None,
project: Optional[str] = None,
stack: Optional[str] = None,
parallel: Optional[str] = None,
@ -65,12 +64,18 @@ class Settings:
self.legacy_apply_enabled = os.getenv("PULUMI_ENABLE_LEGACY_APPLY", "false") == "true"
# Actually connect to the monitor/engine over gRPC.
if monitor:
self.monitor = resource_pb2_grpc.ResourceMonitorStub(grpc.insecure_channel(monitor))
if monitor is not None:
if isinstance(monitor, str):
self.monitor = resource_pb2_grpc.ResourceMonitorStub(grpc.insecure_channel(monitor))
else:
self.monitor = monitor
else:
self.monitor = None
if engine:
self.engine = engine_pb2_grpc.EngineStub(grpc.insecure_channel(engine))
if isinstance(engine, str):
self.engine = engine_pb2_grpc.EngineStub(grpc.insecure_channel(engine))
else:
self.engine = engine
else:
self.engine = None
@ -153,7 +158,7 @@ def _set_stack(v: Optional[str]):
SETTINGS.stack = v
def get_monitor() -> Optional[resource_pb2_grpc.ResourceMonitorStub]:
def get_monitor() -> Optional[Union[resource_pb2_grpc.ResourceMonitorStub, Any]]:
"""
Returns the current resource monitoring service client for RPC communications.
"""
@ -163,7 +168,7 @@ def get_monitor() -> Optional[resource_pb2_grpc.ResourceMonitorStub]:
return monitor
def get_engine() -> Optional[engine_pb2_grpc.EngineStub]:
def get_engine() -> Optional[Union[engine_pb2_grpc.EngineStub, Any]]:
"""
Returns the current engine service client for RPC communications.
"""

View file

@ -28,14 +28,9 @@ from . import known_types
from ..output import Output
async def run_in_stack(func: Callable):
"""
Run the given function inside of a new stack resource. This ensures that any stack export calls
will end up as output properties on the resulting stack component in the checkpoint file. This
is meant for internal runtime use only and is used by the Python SDK entrypoint program.
"""
async def run_pulumi_func(func: Callable):
try:
Stack(func)
func()
finally:
log.debug("Waiting for outstanding RPCs to complete")
@ -71,11 +66,19 @@ async def run_in_stack(func: Callable):
await asyncio.sleep(0)
# Once we get scheduled again, all tasks have exited and we're good to go.
log.debug("run_in_stack completed")
log.debug("run_pulumi_func completed")
if RPC_MANAGER.unhandled_exception is not None:
raise RPC_MANAGER.unhandled_exception.with_traceback(RPC_MANAGER.exception_traceback)
async def run_in_stack(func: Callable):
"""
Run the given function inside of a new stack resource. This ensures that any stack export calls
will end up as output properties on the resulting stack component in the checkpoint file. This
is meant for internal runtime use only and is used by the Python SDK entrypoint program.
"""
await run_pulumi_func(lambda: Stack(func))
@known_types.stack
class Stack(ComponentResource):
"""

View file

@ -0,0 +1,83 @@
# Copyright 2016-2018, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import sys
from typing import Any, Awaitable
# If we are not running on Python 3.7 or later, we need to swap the Python implementation of Task in for the C
# implementation in order to support synchronous invokes.
if sys.version_info[0] == 3 and sys.version_info[1] < 7:
asyncio.Task = asyncio.tasks._PyTask
asyncio.tasks.Task = asyncio.tasks._PyTask
def enter_task(loop, task):
task.__class__._current_tasks[loop] = task
def leave_task(loop, task):
task.__class__._current_tasks.pop(loop)
_enter_task = enter_task
_leave_task = leave_task
else:
_enter_task = asyncio.tasks._enter_task # type: ignore
_leave_task = asyncio.tasks._leave_task # type: ignore
def _sync_await(awaitable: Awaitable[Any]) -> Any:
"""
_sync_await waits for the given future to complete by effectively yielding the current task and pumping the event
loop.
"""
# Fetch the current event loop and ensure a future.
loop = asyncio.get_event_loop()
fut = asyncio.ensure_future(awaitable)
# If the loop is not running, we can just use run_until_complete. Without this, we would need to duplicate a fair
# amount of bookkeeping logic around loop startup and shutdown.
if not loop.is_running():
return loop.run_until_complete(fut)
# If we are executing inside a task, pretend we've returned from its current callback--effectively yielding to
# the event loop--by calling _leave_task.
task = asyncio.Task.current_task(loop)
if task is not None:
_leave_task(loop, task)
# Pump the event loop until the future is complete. This is the kernel of BaseEventLoop.run_forever, and may not
# work with alternative event loop implementations.
#
# In order to make this reentrant with respect to _run_once, we keep track of the number of event handles on the
# ready list and ensure that there are exactly that many handles on the list once we are finished.
#
# See https://github.com/python/cpython/blob/3.6/Lib/asyncio/base_events.py#L1428-L1452 for the details of the
# _run_once kernel with which we need to cooperate.
ntodo = len(loop._ready) # type: ignore
while not fut.done() and not fut.cancelled():
loop._run_once() # type: ignore
if loop._stopping: # type: ignore
break
# If we drained the ready list past what a calling _run_once would have expected, fix things up by pushing
# cancelled handles onto the list.
while len(loop._ready) < ntodo: # type: ignore
handle = asyncio.Handle(lambda: None, [], loop)
handle._cancelled = True
loop._ready.append(handle) # type: ignore
# If we were executing inside a task, restore its context and continue on.
if task is not None:
_enter_task(loop, task)
# Return the result of the future.
return fut.result()