Add support for streamInvoke during update (#4990)

Previously, streamInvoke was only supported by
the query command. Copied the implementation
into the resource monitor, which will allow
streaming invoke commands to run during updates.

Also fixed a bug with cancellation of streaming
invokes. The check was comparing against a
hardcoded string, which did not match the actual
error string. Instead, we can rely on the error code.
This commit is contained in:
Levi Blackstone 2020-07-10 10:56:35 -06:00 committed by GitHub
parent 89f3a32db3
commit 736019f7ce
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 49 additions and 3 deletions

View file

@ -2,7 +2,9 @@ CHANGELOG
=========
## HEAD (Unreleased)
_(none)_
- Add support for streamInvoke during update
[#4990](https://github.com/pulumi/pulumi/pull/4990)
## 2.6.1 (2020-07-09)

View file

@ -582,7 +582,51 @@ func (rm *resmon) Invoke(ctx context.Context, req *pulumirpc.InvokeRequest) (*pu
func (rm *resmon) StreamInvoke(
req *pulumirpc.InvokeRequest, stream pulumirpc.ResourceMonitor_StreamInvokeServer) error {
return fmt.Errorf("the resource monitor does not implement streaming invokes")
tok := tokens.ModuleMember(req.GetTok())
label := fmt.Sprintf("ResourceMonitor.StreamInvoke(%s)", tok)
providerReq, err := parseProviderRequest(tok.Package(), req.GetVersion())
if err != nil {
return err
}
prov, err := getProviderFromSource(rm.providers, rm.defaultProviders, providerReq, req.GetProvider())
if err != nil {
return err
}
args, err := plugin.UnmarshalProperties(
req.GetArgs(), plugin.MarshalOptions{Label: label, KeepUnknowns: true})
if err != nil {
return errors.Wrapf(err, "failed to unmarshal %v args", tok)
}
// Synchronously do the StreamInvoke and then return the arguments. This will block until the
// streaming operation completes!
logging.V(5).Infof("ResourceMonitor.StreamInvoke received: tok=%v #args=%v", tok, len(args))
failures, err := prov.StreamInvoke(tok, args, func(event resource.PropertyMap) error {
mret, err := plugin.MarshalProperties(event, plugin.MarshalOptions{Label: label, KeepUnknowns: true})
if err != nil {
return errors.Wrapf(err, "failed to marshal return")
}
return stream.Send(&pulumirpc.InvokeResponse{Return: mret})
})
if err != nil {
return errors.Wrapf(err, "streaming invocation of %v returned an error", tok)
}
var chkfails []*pulumirpc.CheckFailure
for _, failure := range failures {
chkfails = append(chkfails, &pulumirpc.CheckFailure{
Property: string(failure.Property),
Reason: failure.Reason,
})
}
if len(chkfails) > 0 {
return stream.Send(&pulumirpc.InvokeResponse{Failures: chkfails})
}
return nil
}
// ReadResource reads the current state associated with a resource from its provider plugin.

View file

@ -104,7 +104,7 @@ export async function streamInvoke(
queue.push(live);
});
call.on("error", (err: any) => {
if (err.code === 1 && err.details === "Cancelled") {
if (err.code === 1) {
return;
}
throw err;