Emit reads for external resources when refreshing (#1749)
* Emit reads for external resources when refreshing Fixes pulumi/pulumi#1744. This commit educates the refresh source about external resources. If a refresh source encounters a resource with the External bit set, it'll send a Read event to the engine and the engine will process it accordingly. * CR: save last event channel instead of last event, style fixes
This commit is contained in:
parent
80569fb5f8
commit
623e5f1be2
|
@ -119,6 +119,11 @@ func (j *Journal) Snap(base *deploy.Snapshot) *deploy.Snapshot {
|
|||
dones[e.Step.Old()] = true
|
||||
case deploy.OpReplace:
|
||||
// do nothing.
|
||||
case deploy.OpRead, deploy.OpReadReplacement:
|
||||
resources = append(resources, e.Step.New())
|
||||
if e.Step.Old() != nil {
|
||||
dones[e.Step.Old()] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -982,3 +987,45 @@ func TestParallelRefresh(t *testing.T) {
|
|||
assert.Equal(t, string(snap.Resources[3].URN.Name()), "resC")
|
||||
assert.Equal(t, string(snap.Resources[4].URN.Name()), "resD")
|
||||
}
|
||||
|
||||
func TestExternalRefresh(t *testing.T) {
|
||||
loaders := []*deploytest.ProviderLoader{
|
||||
deploytest.NewProviderLoader("pkgA", semver.MustParse("1.0.0"), func() (plugin.Provider, error) {
|
||||
return &deploytest.Provider{}, nil
|
||||
}),
|
||||
}
|
||||
|
||||
// Our program reads a resource and exits.
|
||||
program := deploytest.NewLanguageRuntime(func(_ plugin.RunInfo, monitor *deploytest.ResourceMonitor) error {
|
||||
_, _, err := monitor.ReadResource("pkgA:m:typA", "resA", "resA-some-id", "", resource.PropertyMap{}, "")
|
||||
if !assert.NoError(t, err) {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
host := deploytest.NewPluginHost(nil, program, loaders...)
|
||||
p := &TestPlan{
|
||||
Options: UpdateOptions{host: host},
|
||||
Steps: []TestStep{{Op: Update}},
|
||||
}
|
||||
|
||||
// The read should place "resA" in the snapshot with the "External" bit set.
|
||||
snap := p.Run(t, nil)
|
||||
assert.Len(t, snap.Resources, 2)
|
||||
assert.Equal(t, string(snap.Resources[0].URN.Name()), "default") // provider
|
||||
assert.Equal(t, string(snap.Resources[1].URN.Name()), "resA")
|
||||
assert.True(t, snap.Resources[1].External)
|
||||
|
||||
p = &TestPlan{
|
||||
Options: UpdateOptions{host: host},
|
||||
Steps: []TestStep{{Op: Refresh}},
|
||||
}
|
||||
|
||||
snap = p.Run(t, snap)
|
||||
// A refresh should leave "resA" as it is in the snapshot. The External bit should still be set.
|
||||
assert.Len(t, snap.Resources, 2)
|
||||
assert.Equal(t, string(snap.Resources[0].URN.Name()), "default") // provider
|
||||
assert.Equal(t, string(snap.Resources[1].URN.Name()), "resA")
|
||||
assert.True(t, snap.Resources[1].External)
|
||||
}
|
||||
|
|
|
@ -71,13 +71,13 @@ func (src *refreshSource) Iterate(ctx context.Context, opts Options, provs Provi
|
|||
|
||||
// refreshSourceIterator returns state from an existing snapshot, augmented by consulting the resource provider.
|
||||
type refreshSourceIterator struct {
|
||||
ctx context.Context // cancellation context for this source.
|
||||
plugctx *plugin.Context
|
||||
target *Target
|
||||
providers ProviderSource
|
||||
states []*resource.State
|
||||
current int
|
||||
lastEvent *refreshSourceEvent // the last event that we emitted, or nil if no such event exists.
|
||||
ctx context.Context // cancellation context for this source.
|
||||
plugctx *plugin.Context
|
||||
target *Target
|
||||
providers ProviderSource
|
||||
states []*resource.State
|
||||
current int
|
||||
lastEventDone chan struct{} // completion channel for the last event that we sent, or nil if we haven't emitted any
|
||||
}
|
||||
|
||||
func (iter *refreshSourceIterator) Close() error {
|
||||
|
@ -94,10 +94,11 @@ func (iter *refreshSourceIterator) Next() (SourceEvent, error) {
|
|||
// The simplest way to guarantee this property is to serialize every event such that the next event isn't
|
||||
// sent until the previous event retires. This isn't fast, but it works. We should come up with a more
|
||||
// performant method at some point.
|
||||
if iter.lastEvent != nil {
|
||||
if iter.lastEventDone != nil {
|
||||
logging.V(7).Infof("refreshSourceIterator.Next(): waiting for previous event to retire")
|
||||
|
||||
select {
|
||||
case <-iter.lastEvent.done:
|
||||
case <-iter.lastEventDone:
|
||||
case <-iter.ctx.Done():
|
||||
logging.V(7).Infof("refreshSourceIterator.Next(): cancelled, exiting")
|
||||
return nil, nil
|
||||
|
@ -110,13 +111,30 @@ func (iter *refreshSourceIterator) Next() (SourceEvent, error) {
|
|||
logging.V(7).Infof("refreshSourceIterator.Next(): no more goal states")
|
||||
return nil, nil
|
||||
}
|
||||
goal, err := iter.newRefreshGoal(iter.states[iter.current])
|
||||
|
||||
current := iter.states[iter.current]
|
||||
if current.External {
|
||||
event := &refreshReadEvent{
|
||||
id: current.ID,
|
||||
name: current.URN.Name(),
|
||||
baseType: current.Type,
|
||||
provider: current.Provider,
|
||||
parent: current.Parent,
|
||||
props: current.Inputs,
|
||||
dependencies: current.Dependencies,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
iter.lastEventDone = event.done
|
||||
return event, nil
|
||||
}
|
||||
goal, err := iter.newRefreshGoal(current)
|
||||
if err != nil {
|
||||
logging.V(7).Infof("refreshSourceIterator.Next(): error: %s", err.Error())
|
||||
return nil, err
|
||||
} else if goal != nil {
|
||||
iter.lastEvent = &refreshSourceEvent{goal: goal, done: make(chan struct{})}
|
||||
return iter.lastEvent, nil
|
||||
event := &refreshSourceEvent{goal: goal, done: make(chan struct{})}
|
||||
iter.lastEventDone = event.done
|
||||
return event, nil
|
||||
}
|
||||
// If the goal was nil, it means the resource was deleted, and we should keep going.
|
||||
}
|
||||
|
@ -160,3 +178,28 @@ func (rse *refreshSourceEvent) Goal() *resource.Goal { return rse.goal }
|
|||
func (rse *refreshSourceEvent) Done(result *RegisterResult) {
|
||||
rse.done <- struct{}{}
|
||||
}
|
||||
|
||||
type refreshReadEvent struct {
|
||||
id resource.ID
|
||||
name tokens.QName
|
||||
baseType tokens.Type
|
||||
provider string
|
||||
parent resource.URN
|
||||
props resource.PropertyMap
|
||||
dependencies []resource.URN
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
var _ ReadResourceEvent = (*refreshReadEvent)(nil)
|
||||
|
||||
func (g *refreshReadEvent) event() {}
|
||||
func (g *refreshReadEvent) ID() resource.ID { return g.id }
|
||||
func (g *refreshReadEvent) Name() tokens.QName { return g.name }
|
||||
func (g *refreshReadEvent) Type() tokens.Type { return g.baseType }
|
||||
func (g *refreshReadEvent) Provider() string { return g.provider }
|
||||
func (g *refreshReadEvent) Parent() resource.URN { return g.parent }
|
||||
func (g *refreshReadEvent) Properties() resource.PropertyMap { return g.props }
|
||||
func (g *refreshReadEvent) Dependencies() []resource.URN { return g.dependencies }
|
||||
func (g *refreshReadEvent) Done(_ *ReadResult) {
|
||||
g.done <- struct{}{}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue