Allow query on local backend stack snapshots

This commit is contained in:
Alex Clemmer 2019-10-23 15:15:04 -07:00
parent 8fdb2ab20b
commit 9e4110904c
5 changed files with 97 additions and 32 deletions

View file

@ -26,6 +26,7 @@ import (
"strings"
"time"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"gocloud.dev/blob"
_ "gocloud.dev/blob/azureblob" // driver for azblob://
@ -550,11 +551,58 @@ func (b *localBackend) apply(
// query executes a query program against the resource outputs of a locally hosted stack.
func (b *localBackend) query(ctx context.Context, op backend.QueryOperation,
events chan<- engine.Event) result.Result {
callerEventsOpt chan<- engine.Event) result.Result {
// TODO: Consider implementing this for local backend. We left it out for the initial cut
// because we weren't sure we wanted to commit to it.
return result.Error("Local backend does not support querying over the state")
q, err := b.newQuery(ctx, op)
if err != nil {
return result.FromError(err)
}
// Render query output to CLI.
displayEvents := make(chan engine.Event)
displayDone := make(chan bool)
go display.ShowQueryEvents("running query", displayEvents, displayDone, op.Opts.Display)
// The engineEvents channel receives all events from the engine, which we then forward onto other
// channels for actual processing. (displayEvents and callerEventsOpt.)
engineEvents := make(chan engine.Event)
eventsDone := make(chan bool)
go func() {
for e := range engineEvents {
displayEvents <- e
if callerEventsOpt != nil {
callerEventsOpt <- e
}
}
close(eventsDone)
}()
// Depending on the action, kick off the relevant engine activity. Note that we don't immediately check and
// return error conditions, because we will do so below after waiting for the display channels to close.
cancellationScope := op.Scopes.NewScope(engineEvents, true /*dryRun*/)
engineCtx := &engine.Context{
Cancel: cancellationScope.Context(),
Events: engineEvents,
BackendClient: backend.NewBackendClient(b),
}
if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
engineCtx.ParentSpan = parentSpan.Context()
}
res := engine.Query(engineCtx, q, op.Opts.Engine)
// Wait for dependent channels to finish processing engineEvents before closing.
<-displayDone
cancellationScope.Close() // Don't take any cancellations anymore, we're shutting down.
close(engineEvents)
// Make sure that the goroutine writing to displayEvents and callerEventsOpt
// has exited before proceeding
<-eventsDone
close(displayEvents)
return res
}
func (b *localBackend) GetHistory(ctx context.Context, stackRef backend.StackReference) ([]backend.UpdateInfo, error) {

View file

@ -49,6 +49,19 @@ const DisableCheckpointBackupsEnvVar = "PULUMI_DISABLE_CHECKPOINT_BACKUPS"
// be used as a last resort when a command absolutely must be run.
var DisableIntegrityChecking bool
type cloudQuery struct {
root string
proj *workspace.Project
}
func (q *cloudQuery) GetRoot() string {
return q.root
}
func (q *cloudQuery) GetProject() *workspace.Project {
return q.proj
}
// update is an implementation of engine.Update backed by local state.
type update struct {
root string
@ -69,6 +82,12 @@ func (u *update) GetTarget() *deploy.Target {
return u.target
}
func (b *localBackend) newQuery(ctx context.Context,
op backend.QueryOperation) (*cloudQuery, error) {
return &cloudQuery{root: op.Root, proj: op.Proj}, nil
}
func (b *localBackend) newUpdate(stackName tokens.QName, op backend.UpdateOperation) (*update, error) {
contract.Require(stackName != "", "stackName")

View file

@ -130,7 +130,7 @@ func query(ctx *Context, q QueryInfo, opts QueryOptions) result.Result {
if res.IsBail() {
return res
}
return result.Error("an error occurred while running the query")
return result.Errorf("an error occurred while running the query: %v", res.Error())
}
return nil
}

View file

@ -4,34 +4,32 @@ package ints
import (
"testing"
"github.com/pulumi/pulumi/pkg/testing/integration"
)
// TestQuery creates a stack and runs a query over the stack's resource ouptputs.
func TestQuery(t *testing.T) {
//
// TODO[hausdorff, #3396]: Enable once we allow query against local backend stacks.
//
// integration.ProgramTest(t, &integration.ProgramTestOptions{
// // Create Pulumi resources.
// Dir: "step1",
// StackName: "query-stack",
// Dependencies: []string{"@pulumi/pulumi"},
// EditDirs: []integration.EditDir{
// // Try to create resources during `pulumi query`. This should fail.
// {
// Dir: "step2",
// Additive: true,
// QueryMode: true,
// ExpectFailure: true,
// },
// // Run a query during `pulumi query`. Should succeed.
// {
// Dir: "step3",
// Additive: true,
// QueryMode: true,
// ExpectFailure: false,
// },
// },
// })
integration.ProgramTest(t, &integration.ProgramTestOptions{
// Create Pulumi resources.
Dir: "step1",
StackName: "query-stack",
Dependencies: []string{"@pulumi/pulumi"},
EditDirs: []integration.EditDir{
// Try to create resources during `pulumi query`. This should fail.
{
Dir: "step2",
Additive: true,
QueryMode: true,
ExpectFailure: true,
},
// Run a query during `pulumi query`. Should succeed.
{
Dir: "step3",
Additive: true,
QueryMode: true,
ExpectFailure: false,
},
},
})
}

View file

@ -4,7 +4,7 @@ import * as pulumi from "@pulumi/pulumi";
// Step 3: Run a query during `pulumi query`.
pulumi.runtime
.listResourceOutputs(undefined, `moolumi/${pulumi.runtime.getStack()}`)
.listResourceOutputs(undefined, "moolumi/query-stack")
.groupBy<string, pulumi.Resource>(r => (<any>r).__pulumiType)
.all(async function(group) {
const count = await group.count();