2362d45a5c
Despite our good progress moving towards having an apitype package, where our exchange types live and can be shared among the engine and our services, there were a few major types that were still duplciated. Resource was the biggest example -- and indeed, the apitype varirant was missing the new Dependencies property -- but there were others, like Manfiest, PluginInfo, etc. These too had semi-random omissions. This change merges all of these types into the apitype package. This not only cleans up the redundancy and missing properties, but will "force the issue" with respect to keeping them in sync and properly versioning the information in a backwards compatible way. The resource/stack package still exists as a simple marshaling layer to and from the engine's core data types. Finally, I've made the controversial change to share the actual Deployment data structure at the apitype layer also. This will force us to confront differences in that data structure similarly, and will allow us to leverage the strong typing throughout to catch issues.
244 lines
7 KiB
Go
244 lines
7 KiB
Go
// Copyright 2016-2017, Pulumi Corporation. All rights reserved.
|
|
|
|
package operations
|
|
|
|
import (
|
|
"sort"
|
|
|
|
"github.com/hashicorp/go-multierror"
|
|
|
|
"github.com/pulumi/pulumi/pkg/resource"
|
|
"github.com/pulumi/pulumi/pkg/tokens"
|
|
"github.com/pulumi/pulumi/pkg/util/contract"
|
|
)
|
|
|
|
// Resource is a tree representation of a resource/component hierarchy
|
|
type Resource struct {
|
|
Stack tokens.QName
|
|
Project tokens.PackageName
|
|
State *resource.State
|
|
Parent *Resource
|
|
Children map[resource.URN]*Resource
|
|
}
|
|
|
|
// NewResourceMap constructs a map of resources with parent/child relations, indexed by URN.
|
|
func NewResourceMap(source []*resource.State) map[resource.URN]*Resource {
|
|
_, resources := makeResourceTreeMap(source)
|
|
return resources
|
|
}
|
|
|
|
// NewResourceTree constructs a tree representation of a resource/component hierarchy
|
|
func NewResourceTree(source []*resource.State) *Resource {
|
|
root, _ := makeResourceTreeMap(source)
|
|
return root
|
|
}
|
|
|
|
// makeResourceTreeMap is a helper used by the two above functions to construct a resource hierarchy.
|
|
func makeResourceTreeMap(source []*resource.State) (*Resource, map[resource.URN]*Resource) {
|
|
resources := make(map[resource.URN]*Resource)
|
|
|
|
var stack tokens.QName
|
|
var proj tokens.PackageName
|
|
|
|
// First create a list of resource nodes, without parent/child relations hooked up.
|
|
for _, state := range source {
|
|
stack = state.URN.Stack()
|
|
proj = state.URN.Project()
|
|
if !state.Delete {
|
|
// Only include resources which are not marked as pending-deletion.
|
|
contract.Assertf(resources[state.URN] == nil, "Unexpected duplicate resource %s", state.URN)
|
|
resources[state.URN] = &Resource{
|
|
Stack: stack,
|
|
Project: proj,
|
|
State: state,
|
|
Children: make(map[resource.URN]*Resource),
|
|
}
|
|
}
|
|
}
|
|
|
|
// Next, walk the list of resources, and wire up parents and children. We do this in a second pass so
|
|
// that the creation of the tree isn't order dependent.
|
|
for _, child := range resources {
|
|
if parurn := child.State.Parent; parurn != "" {
|
|
parent, ok := resources[parurn]
|
|
contract.Assertf(ok, "Expected to find parent node '%v' in checkpoint tree nodes", parurn)
|
|
child.Parent = parent
|
|
parent.Children[child.State.URN] = child
|
|
}
|
|
}
|
|
|
|
// Create a single root node which is the parent of all unparented nodes
|
|
root := &Resource{
|
|
Stack: stack,
|
|
Project: proj,
|
|
State: nil,
|
|
Parent: nil,
|
|
Children: make(map[resource.URN]*Resource),
|
|
}
|
|
for _, node := range resources {
|
|
if node.Parent == nil {
|
|
root.Children[node.State.URN] = node
|
|
node.Parent = root
|
|
}
|
|
}
|
|
|
|
// Return the root node and map of children.
|
|
return root, resources
|
|
}
|
|
|
|
// GetChild find a child with the given type and name or returns `nil`.
|
|
func (r *Resource) GetChild(typ string, name string) *Resource {
|
|
for childURN, childResource := range r.Children {
|
|
if childURN.Stack() == r.Stack &&
|
|
childURN.Project() == r.Project &&
|
|
childURN.Type() == tokens.Type(typ) &&
|
|
childURN.Name() == tokens.QName(name) {
|
|
return childResource
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// OperationsProvider gets an OperationsProvider for this resource.
|
|
func (r *Resource) OperationsProvider(config map[tokens.ModuleMember]string) Provider {
|
|
return &resourceOperations{
|
|
resource: r,
|
|
config: config,
|
|
}
|
|
}
|
|
|
|
// ResourceOperations is an OperationsProvider for Resources
|
|
type resourceOperations struct {
|
|
resource *Resource
|
|
config map[tokens.ModuleMember]string
|
|
}
|
|
|
|
var _ Provider = (*resourceOperations)(nil)
|
|
|
|
// GetLogs gets logs for a Resource
|
|
func (ops *resourceOperations) GetLogs(query LogQuery) (*[]LogEntry, error) {
|
|
// Only get logs for this resource if it matches the resource filter query
|
|
if ops.matchesResourceFilter(query.ResourceFilter) {
|
|
// Set query to be a new query with `ResourceFilter` nil so that we don't filter out logs from any children of
|
|
// this resource since this resource did match the resource filter.
|
|
query = LogQuery{
|
|
StartTime: query.StartTime,
|
|
EndTime: query.EndTime,
|
|
ResourceFilter: nil,
|
|
}
|
|
// Try to get an operations provider for this resource, it may be `nil`
|
|
opsProvider, err := ops.getOperationsProvider()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if opsProvider != nil {
|
|
// If this resource has an operations provider - use it and don't recur into children. It is the
|
|
// responsibility of it's GetLogs implementation to aggregate all logs from children, either by passing them
|
|
// through or by filtering specific content out.
|
|
logsResult, err := opsProvider.GetLogs(query)
|
|
if err != nil {
|
|
return logsResult, err
|
|
}
|
|
if logsResult != nil {
|
|
return logsResult, nil
|
|
}
|
|
}
|
|
}
|
|
// If this resource did not choose to provide it's own logs, recur into children and collect + aggregate their logs.
|
|
var logs []LogEntry
|
|
// Kick off GetLogs on all children in parallel, writing results to shared channels
|
|
ch := make(chan *[]LogEntry)
|
|
errch := make(chan error)
|
|
for _, child := range ops.resource.Children {
|
|
childOps := &resourceOperations{
|
|
resource: child,
|
|
config: ops.config,
|
|
}
|
|
go func() {
|
|
childLogs, err := childOps.GetLogs(query)
|
|
ch <- childLogs
|
|
errch <- err
|
|
}()
|
|
}
|
|
// Handle results from GetLogs calls as they complete
|
|
var err error
|
|
for range ops.resource.Children {
|
|
childLogs := <-ch
|
|
childErr := <-errch
|
|
if childErr != nil {
|
|
err = multierror.Append(err, childErr)
|
|
}
|
|
if childLogs != nil {
|
|
logs = append(logs, *childLogs...)
|
|
}
|
|
}
|
|
if err != nil {
|
|
return &logs, err
|
|
}
|
|
// Sort
|
|
sort.SliceStable(logs, func(i, j int) bool { return logs[i].Timestamp < logs[j].Timestamp })
|
|
// Remove duplicates
|
|
var retLogs []LogEntry
|
|
var lastLogTimestamp int64
|
|
var lastLogs []LogEntry
|
|
for _, log := range logs {
|
|
shouldContinue := false
|
|
if log.Timestamp == lastLogTimestamp {
|
|
for _, lastLog := range lastLogs {
|
|
if log.Message == lastLog.Message {
|
|
shouldContinue = true
|
|
break
|
|
}
|
|
}
|
|
} else {
|
|
lastLogs = nil
|
|
}
|
|
if shouldContinue {
|
|
continue
|
|
}
|
|
lastLogs = append(lastLogs, log)
|
|
lastLogTimestamp = log.Timestamp
|
|
retLogs = append(retLogs, log)
|
|
}
|
|
return &retLogs, nil
|
|
}
|
|
|
|
// matchesResourceFilter determines whether this resource matches the provided resource filter.
|
|
func (ops *resourceOperations) matchesResourceFilter(filter *ResourceFilter) bool {
|
|
if filter == nil {
|
|
// No filter, all resources match it.
|
|
return true
|
|
}
|
|
if ops.resource == nil || ops.resource.State == nil {
|
|
return false
|
|
}
|
|
urn := ops.resource.State.URN
|
|
if resource.URN(*filter) == urn {
|
|
// The filter matched the full URN
|
|
return true
|
|
}
|
|
if string(*filter) == string(urn.Type())+"::"+string(urn.Name()) {
|
|
// The filter matched the '<type>::<name>' part of the URN
|
|
return true
|
|
}
|
|
if tokens.QName(*filter) == urn.Name() {
|
|
// The filter matched the '<name>' part of the URN
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (ops *resourceOperations) getOperationsProvider() (Provider, error) {
|
|
if ops.resource == nil || ops.resource.State == nil {
|
|
return nil, nil
|
|
}
|
|
switch ops.resource.State.Type.Package() {
|
|
case "cloud":
|
|
return CloudOperationsProvider(ops.config, ops.resource)
|
|
case "aws":
|
|
return AWSOperationsProvider(ops.config, ops.resource)
|
|
default:
|
|
return nil, nil
|
|
}
|
|
}
|