From 3f0d511cd80c9e3505b4a2214fc0b31c3e7f0d4e Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Fri, 17 Nov 2017 14:30:02 -0800 Subject: [PATCH 01/18] Simplify log collection --- pkg/pulumiframework/awsConnection.go | 99 ++++++++++------------------ pkg/pulumiframework/resources.go | 27 +++++--- 2 files changed, 53 insertions(+), 73 deletions(-) diff --git a/pkg/pulumiframework/awsConnection.go b/pkg/pulumiframework/awsConnection.go index 96084588c..de2b92232 100644 --- a/pkg/pulumiframework/awsConnection.go +++ b/pkg/pulumiframework/awsConnection.go @@ -3,12 +3,12 @@ package pulumiframework import ( "regexp" - "github.com/aws/aws-sdk-go/service/cloudwatch" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/golang/glog" + "github.com/pulumi/pulumi/pkg/component" ) @@ -28,71 +28,42 @@ func newAWSConnection(sess *session.Session) *awsConnection { var logRegexp = regexp.MustCompile(".*Z\t[a-g0-9\\-]*\t(.*)") -func (p *awsConnection) getLogsForFunctionsConcurrently(functions []string) []component.LogEntry { - var logs []component.LogEntry - ch := make(chan []component.LogEntry) - for _, functionName := range functions { - go func(functionName string) { - ch <- p.getLogsForFunction(functionName) - }(functionName) - } - for i := 0; i < len(functions); i++ { - logs = append(logs, <-ch...) - } - return logs -} +func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroups []string) []component.LogEntry { -func (p *awsConnection) getLogsForFunction(functionName string) []component.LogEntry { - logGroupName := "/aws/lambda/" + functionName - resp, err := p.logSvc.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{ - LogGroupName: aws.String(logGroupName), - }) - if err != nil { - glog.V(5).Infof("[getLogs] Error getting logs: %v %v\n", logGroupName, err) - return nil - } - glog.V(5).Infof("[getLogs] Log streams: %v\n", resp) - logResult := p.getLogsForFunctionNameStreamsConcurrently(functionName, resp.LogStreams) - return logResult -} + // Create a channel for collecting log event outputs + ch := make(chan []*cloudwatchlogs.FilteredLogEvent) -func (p *awsConnection) getLogsForFunctionNameStreamsConcurrently(functionName string, - logStreams []*cloudwatchlogs.LogStream) []component.LogEntry { - var logs []component.LogEntry - ch := make(chan []component.LogEntry) - for _, logStream := range logStreams { - go func(logStreamName *string) { - ch <- p.getLogsForFunctionNameStream(functionName, logStreamName) - }(logStream.LogStreamName) - } - for i := 0; i < len(logStreams); i++ { - logs = append(logs, <-ch...) - } - return logs -} - -func (p *awsConnection) getLogsForFunctionNameStream(functionName string, logStreamName *string) []component.LogEntry { - var logResult []component.LogEntry - logGroupName := "/aws/lambda/" + functionName - logsResp, err := p.logSvc.GetLogEvents(&cloudwatchlogs.GetLogEventsInput{ - LogGroupName: aws.String(logGroupName), - LogStreamName: logStreamName, - StartFromHead: aws.Bool(true), - }) - if err != nil { - glog.V(5).Infof("[getLogs] Error getting logs: %v %v\n", logStreamName, err) - } - glog.V(5).Infof("[getLogs] Log events: %v\n", logsResp) - for _, event := range logsResp.Events { - innerMatches := logRegexp.FindAllStringSubmatch(aws.StringValue(event.Message), -1) - glog.V(5).Infof("[getLogs] Inner matches: %v\n", innerMatches) - if len(innerMatches) > 0 { - logResult = append(logResult, component.LogEntry{ - ID: functionName, - Message: innerMatches[0][1], - Timestamp: aws.Int64Value(event.Timestamp), + // Run FilterLogEvents for each log group in parallel + for _, logGroup := range logGroups { + go func(logGroup string) { + resp, err := p.logSvc.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{ + LogGroupName: aws.String(logGroup), }) + if err != nil { + glog.V(5).Infof("[getLogs] Error getting logs: %v %v\n", logGroup, err) + } + ch <- resp.Events + }(logGroup) + } + + // Collect responses on the channel and append logs into combined log array + var logs []component.LogEntry + for i := 0; i < len(logGroups); i++ { + logEvents := <-ch + if logEvents != nil { + for _, event := range logEvents { + innerMatches := logRegexp.FindAllStringSubmatch(aws.StringValue(event.Message), -1) + glog.V(5).Infof("[getLogs] Inner matches: %v\n", innerMatches) + if len(innerMatches) > 0 { + logs = append(logs, component.LogEntry{ + ID: names[i], + Message: innerMatches[0][1], + Timestamp: aws.Int64Value(event.Timestamp), + }) + } + } } } - return logResult + + return logs } diff --git a/pkg/pulumiframework/resources.go b/pkg/pulumiframework/resources.go index e7adb6ef8..0160b31d8 100644 --- a/pkg/pulumiframework/resources.go +++ b/pkg/pulumiframework/resources.go @@ -187,7 +187,7 @@ func (ops *componentOpsProvider) GetLogs(query component.LogQuery) ([]component. switch ops.component.Type { case pulumiFunctionType: functionName := ops.component.Resources["function"].Outputs["name"].StringValue() - logResult := ops.awsConnection.getLogsForFunction(functionName) + logResult := ops.awsConnection.getLogsForLogGroupsConcurrently([]string{functionName}, []string{"/aws/lambda/" + functionName}) sort.SliceStable(logResult, func(i, j int) bool { return logResult[i].Timestamp < logResult[j].Timestamp }) return logResult, nil default: @@ -295,19 +295,28 @@ type componentsOpsProvider struct { var _ component.OperationsProvider = (*componentsOpsProvider)(nil) -// GetLogs for a collection of Components returns combined logs from all Pulumi Function -// components in the collection. +// GetLogs returns combined logs for all Pulumi compute components. func (ops *componentsOpsProvider) GetLogs(query component.LogQuery) ([]component.LogEntry, error) { if query.StartTime != nil || query.EndTime != nil || query.Query != nil { contract.Failf("not yet implemented - StartTime, Endtime, Query") } - var functionNames []string - functionComponents := ops.components.FilterByType(pulumiFunctionType) - for _, v := range functionComponents { - functionName := v.Resources["function"].Outputs["name"].StringValue() - functionNames = append(functionNames, functionName) + + // Collect names and log groups that we want to query + var resourceNames []string + var logGroupNames []string + for _, v := range ops.components { + switch v.Type { + case pulumiFunctionType: + functionName := v.Resources["function"].Outputs["name"].StringValue() + resourceNames = append(resourceNames, functionName) + logGroupNames = append(logGroupNames, "/aws/lambda/"+functionName) + } } - logResults := ops.awsConnection.getLogsForFunctionsConcurrently(functionNames) + + // Concurrently read logs from CloudWatch + logResults := ops.awsConnection.getLogsForLogGroupsConcurrently(resourceNames, logGroupNames) + + // Sort and return log results sort.SliceStable(logResults, func(i, j int) bool { return logResults[i].Timestamp < logResults[j].Timestamp }) return logResults, nil } From 0079a38ee07261133f6857c26cb0e5bd07571a18 Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Sun, 19 Nov 2017 17:41:11 -0800 Subject: [PATCH 02/18] Transition to resource tree --- cmd/backend_local.go | 7 +- pkg/pulumiframework/awsConnection.go | 20 +- pkg/pulumiframework/resources.go | 385 ++++++++++++-------------- pkg/pulumiframework/resources_test.go | 99 +++---- 4 files changed, 227 insertions(+), 284 deletions(-) diff --git a/cmd/backend_local.go b/cmd/backend_local.go index e7d08010b..3ec69f075 100644 --- a/cmd/backend_local.go +++ b/cmd/backend_local.go @@ -159,11 +159,8 @@ func (b *localPulumiBackend) GetLogs(stackName tokens.QName, query component.Log contract.Assert(target != nil) // TODO[pulumi/pulumi#54]: replace this with a call into a generalized operations provider. - components := pulumiframework.GetComponents(snap.Resources) - ops, err := pulumiframework.OperationsProviderForComponents(target.Config, components) - if err != nil { - return nil, err - } + components := pulumiframework.NewResource(snap.Resources) + ops := pulumiframework.NewResourceOperations(target.Config, components) return ops.GetLogs(query) } diff --git a/pkg/pulumiframework/awsConnection.go b/pkg/pulumiframework/awsConnection.go index de2b92232..a0533ae4b 100644 --- a/pkg/pulumiframework/awsConnection.go +++ b/pkg/pulumiframework/awsConnection.go @@ -50,17 +50,15 @@ func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroup var logs []component.LogEntry for i := 0; i < len(logGroups); i++ { logEvents := <-ch - if logEvents != nil { - for _, event := range logEvents { - innerMatches := logRegexp.FindAllStringSubmatch(aws.StringValue(event.Message), -1) - glog.V(5).Infof("[getLogs] Inner matches: %v\n", innerMatches) - if len(innerMatches) > 0 { - logs = append(logs, component.LogEntry{ - ID: names[i], - Message: innerMatches[0][1], - Timestamp: aws.Int64Value(event.Timestamp), - }) - } + for _, event := range logEvents { + innerMatches := logRegexp.FindAllStringSubmatch(aws.StringValue(event.Message), -1) + glog.V(5).Infof("[getLogs] Inner matches: %v\n", innerMatches) + if len(innerMatches) > 0 { + logs = append(logs, component.LogEntry{ + ID: names[i], + Message: innerMatches[0][1], + Timestamp: aws.Int64Value(event.Timestamp), + }) } } } diff --git a/pkg/pulumiframework/resources.go b/pkg/pulumiframework/resources.go index 0160b31d8..7de88a781 100644 --- a/pkg/pulumiframework/resources.go +++ b/pkg/pulumiframework/resources.go @@ -3,11 +3,9 @@ package pulumiframework import ( "fmt" "sort" - "strings" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/pkg/errors" "github.com/pulumi/pulumi/pkg/component" @@ -19,88 +17,6 @@ import ( // This file contains the implementation of the component.Components interface for the // AWS implementation of the Pulumi Framework defined in this repo. -// GetComponents extracts the Pulumi Framework components from a checkpoint -// file, based on the raw resources created by the implementation of the Pulumi Framework -// in this repo. -func GetComponents(source []*resource.State) component.Components { - sourceMap := makeIDLookup(source) - components := make(component.Components) - for _, res := range source { - name := string(res.URN.Name()) - if res.Type == stageType { - stage := res - deployment := lookup(sourceMap, deploymentType, stage.Inputs["deployment"].StringValue()) - restAPI := lookup(sourceMap, restAPIType, stage.Inputs["restApi"].StringValue()) - baseURL := deployment.Outputs["invokeUrl"].StringValue() + stage.Inputs["stageName"].StringValue() + "/" - restAPIName := restAPI.URN.Name() - urn := newPulumiFrameworkURN(res.URN, pulumiEndpointType, restAPIName) - components[urn] = &component.Component{ - Type: pulumiEndpointType, - Properties: resource.NewPropertyMapFromMap(map[string]interface{}{ - "url": baseURL, - }), - Resources: map[string]*resource.State{ - "restapi": restAPI, - "deployment": deployment, - "stage": stage, - }, - } - } else if res.Type == eventRuleType { - urn := newPulumiFrameworkURN(res.URN, pulumiTimerType, tokens.QName(name)) - components[urn] = &component.Component{ - Type: pulumiTimerType, - Properties: resource.NewPropertyMapFromMap(map[string]interface{}{ - "schedule": res.Inputs["scheduleExpression"].StringValue(), - }), - Resources: map[string]*resource.State{ - "rule": res, - "target": nil, - "permission": nil, - }, - } - } else if res.Type == tableType { - urn := newPulumiFrameworkURN(res.URN, pulumiTableType, tokens.QName(name)) - components[urn] = &component.Component{ - Type: pulumiTableType, - Properties: resource.NewPropertyMapFromMap(map[string]interface{}{ - "primaryKey": res.Inputs["hashKey"].StringValue(), - }), - Resources: map[string]*resource.State{ - "table": res, - }, - } - } else if res.Type == topicType { - if !strings.HasSuffix(name, "unhandled-error-topic") { - urn := newPulumiFrameworkURN(res.URN, pulumiTopicType, tokens.QName(name)) - components[urn] = &component.Component{ - Type: pulumiTopicType, - Properties: resource.NewPropertyMapFromMap(map[string]interface{}{}), - Resources: map[string]*resource.State{ - "topic": res, - }, - } - } - } else if res.Type == functionType { - if !strings.HasSuffix(name, "-log-collector") { - urn := newPulumiFrameworkURN(res.URN, pulumiFunctionType, tokens.QName(name)) - components[urn] = &component.Component{ - Type: pulumiFunctionType, - Properties: resource.NewPropertyMapFromMap(map[string]interface{}{}), - Resources: map[string]*resource.State{ - "function": res, - "role": nil, - "roleAttachment": nil, - "logGroup": nil, - "logSubscriptionFilter": nil, - "permission": nil, - }, - } - } - } - } - return components -} - // This function grovels through the given configuration bag, extracts the bits necessary to create an AWS session // (currently just the AWS region to target), and creates and returns the session. If the bag does not contain the // necessary properties or if session creation fails, this function returns `nil, error`. @@ -119,7 +35,7 @@ func createSessionFromConfig(config map[tokens.ModuleMember]string) (*session.Se // operational queries based on the underlying resources of the AWS Pulumi Framework implementation. func OperationsProviderForComponent( config map[tokens.ModuleMember]string, - component *component.Component) (component.OperationsProvider, error) { + component *Resource) (component.OperationsProvider, error) { sess, err := createSessionFromConfig(config) if err != nil { @@ -135,7 +51,7 @@ func OperationsProviderForComponent( type componentOpsProvider struct { awsConnection *awsConnection - component *component.Component + component *Resource } var _ component.OperationsProvider = (*componentOpsProvider)(nil) @@ -144,15 +60,6 @@ const ( // AWS config keys regionKey = "aws:config:region" - // AWS Resource Types - stageType = "aws:apigateway/stage:Stage" - deploymentType = "aws:apigateway/deployment:Deployment" - restAPIType = "aws:apigateway/restApi:RestApi" - eventRuleType = "aws:cloudwatch/eventRule:EventRule" - tableType = "aws:dynamodb/table:Table" - topicType = "aws:sns/topic:Topic" - functionType = "aws:lambda/function:Function" - // Pulumi Framework "virtual" types pulumiEndpointType = tokens.Type("pulumi:framework:Endpoint") pulumiTopicType = tokens.Type("pulumi:framework:Topic") @@ -184,19 +91,23 @@ func (ops *componentOpsProvider) GetLogs(query component.LogQuery) ([]component. if query.StartTime != nil || query.EndTime != nil || query.Query != nil { contract.Failf("not yet implemented - StartTime, Endtime, Query") } - switch ops.component.Type { + fmt.Printf("[GetLogs] type = %v", ops.component.state.Type) + switch ops.component.state.Type { case pulumiFunctionType: - functionName := ops.component.Resources["function"].Outputs["name"].StringValue() + urn := ops.component.state.URN + serverlessFunction := ops.component.GetChild("aws:serverless:Function", string(urn.Name())) + awsFunction := serverlessFunction.GetChild("aws:lambda/function:Function", string(urn.Name())) + functionName := awsFunction.state.Outputs["name"].StringValue() logResult := ops.awsConnection.getLogsForLogGroupsConcurrently([]string{functionName}, []string{"/aws/lambda/" + functionName}) sort.SliceStable(logResult, func(i, j int) bool { return logResult[i].Timestamp < logResult[j].Timestamp }) return logResult, nil default: - return nil, fmt.Errorf("Logs not supported for component type: %s", ops.component.Type) + return nil, fmt.Errorf("Logs not supported for component type: %s", ops.component.state.Type) } } func (ops *componentOpsProvider) ListMetrics() []component.MetricName { - switch ops.component.Type { + switch ops.component.state.Type { case pulumiFunctionType: // Don't include these which are internal implementation metrics: DLQ delivery errors return []component.MetricName{functionInvocations, functionDuration, functionErrors, functionThrottles} @@ -219,135 +130,187 @@ func (ops *componentOpsProvider) ListMetrics() []component.MetricName { func (ops *componentOpsProvider) GetMetricStatistics(metric component.MetricRequest) ([]component.MetricDataPoint, error) { - var dimensions []*cloudwatch.Dimension - var namespace string + return nil, fmt.Errorf("Not yet implmeneted: GetMetricStatistics") - switch ops.component.Type { - case pulumiFunctionType: - dimensions = append(dimensions, &cloudwatch.Dimension{ - Name: aws.String("FunctionName"), - Value: aws.String(string(ops.component.Resources["function"].ID)), - }) - namespace = "AWS/Lambda" - case pulumiEndpointType: - contract.Failf("not yet implemented") - case pulumiTopicType: - contract.Failf("not yet implemented") - case pulumiTimerType: - contract.Failf("not yet implemented") - case pulumiTableType: - contract.Failf("not yet implemented") - default: - contract.Failf("invalid component type") - } + // var dimensions []*cloudwatch.Dimension + // var namespace string - resp, err := ops.awsConnection.metricSvc.GetMetricStatistics(&cloudwatch.GetMetricStatisticsInput{ - Namespace: aws.String(namespace), - MetricName: aws.String(metric.Name), - Dimensions: dimensions, - Statistics: []*string{ - aws.String("Sum"), aws.String("SampleCount"), aws.String("Average"), - aws.String("Maximum"), aws.String("Minimum"), - }, - }) - if err != nil { - return nil, err - } + // switch ops.component.state.Type { + // case pulumiFunctionType: + // dimensions = append(dimensions, &cloudwatch.Dimension{ + // Name: aws.String("FunctionName"), + // Value: aws.String(string(ops.component.Resources["function"].ID)), + // }) + // namespace = "AWS/Lambda" + // case pulumiEndpointType: + // contract.Failf("not yet implemented") + // case pulumiTopicType: + // contract.Failf("not yet implemented") + // case pulumiTimerType: + // contract.Failf("not yet implemented") + // case pulumiTableType: + // contract.Failf("not yet implemented") + // default: + // contract.Failf("invalid component type") + // } - var metrics []component.MetricDataPoint - for _, datapoint := range resp.Datapoints { - metrics = append(metrics, component.MetricDataPoint{ - Timestamp: aws.TimeValue(datapoint.Timestamp), - Unit: aws.StringValue(datapoint.Unit), - Sum: aws.Float64Value(datapoint.Sum), - SampleCount: aws.Float64Value(datapoint.SampleCount), - Average: aws.Float64Value(datapoint.Average), - Maximum: aws.Float64Value(datapoint.Maximum), - Minimum: aws.Float64Value(datapoint.Minimum), - }) - } - return metrics, nil + // resp, err := ops.awsConnection.metricSvc.GetMetricStatistics(&cloudwatch.GetMetricStatisticsInput{ + // Namespace: aws.String(namespace), + // MetricName: aws.String(metric.Name), + // Dimensions: dimensions, + // Statistics: []*string{ + // aws.String("Sum"), aws.String("SampleCount"), aws.String("Average"), + // aws.String("Maximum"), aws.String("Minimum"), + // }, + // }) + // if err != nil { + // return nil, err + // } + + // var metrics []component.MetricDataPoint + // for _, datapoint := range resp.Datapoints { + // metrics = append(metrics, component.MetricDataPoint{ + // Timestamp: aws.TimeValue(datapoint.Timestamp), + // Unit: aws.StringValue(datapoint.Unit), + // Sum: aws.Float64Value(datapoint.Sum), + // SampleCount: aws.Float64Value(datapoint.SampleCount), + // Average: aws.Float64Value(datapoint.Average), + // Maximum: aws.Float64Value(datapoint.Maximum), + // Minimum: aws.Float64Value(datapoint.Minimum), + // }) + // } + // return metrics, nil } -// OperationsProviderForComponents creates an OperationsProvider capable of answering -// operational queries about a collection of Pulumi Framework Components based on the -// underlying resources of the AWS Pulumi Framework implementation. -func OperationsProviderForComponents( - config map[tokens.ModuleMember]string, - components component.Components) (component.OperationsProvider, error) { - - sess, err := createSessionFromConfig(config) - if err != nil { - return nil, errors.Wrap(err, "failed to create AWS session") - } - - prov := &componentsOpsProvider{ - awsConnection: newAWSConnection(sess), - components: components, - } - return prov, nil +// Resource is a tree representation of a resource/component hierarchy +type Resource struct { + ns tokens.QName + alloc tokens.PackageName + state *resource.State + parent *Resource + children map[resource.URN]*Resource } -type componentsOpsProvider struct { - awsConnection *awsConnection - components component.Components -} +// NewResource constructs a tree representation of a resource/component hierarchy +func NewResource(source []*resource.State) *Resource { + treeNodes := map[resource.URN]*Resource{} + var ns tokens.QName + var alloc tokens.PackageName -var _ component.OperationsProvider = (*componentsOpsProvider)(nil) - -// GetLogs returns combined logs for all Pulumi compute components. -func (ops *componentsOpsProvider) GetLogs(query component.LogQuery) ([]component.LogEntry, error) { - if query.StartTime != nil || query.EndTime != nil || query.Query != nil { - contract.Failf("not yet implemented - StartTime, Endtime, Query") + // Walk the ordered resource list and build tree nodes based on child relationships + for _, state := range source { + ns = state.URN.Namespace() + alloc = state.URN.Alloc() + newTree := &Resource{ + ns: ns, + alloc: alloc, + state: state, + parent: nil, + children: map[resource.URN]*Resource{}, + } + for _, childURN := range state.Children { + childTree, ok := treeNodes[childURN] + contract.Assertf(ok, "Expected children to be before parents in resource checkpoint") + childTree.parent = newTree + newTree.children[childTree.state.URN] = childTree + } + treeNodes[state.URN] = newTree } - // Collect names and log groups that we want to query - var resourceNames []string - var logGroupNames []string - for _, v := range ops.components { - switch v.Type { - case pulumiFunctionType: - functionName := v.Resources["function"].Outputs["name"].StringValue() - resourceNames = append(resourceNames, functionName) - logGroupNames = append(logGroupNames, "/aws/lambda/"+functionName) + // Create a single root node which is the parent of all unparented nodes + root := &Resource{ + ns: ns, + alloc: alloc, + state: nil, + parent: nil, + children: map[resource.URN]*Resource{}, + } + for _, node := range treeNodes { + if node.parent == nil { + root.children[node.state.URN] = node + node.parent = root } } - // Concurrently read logs from CloudWatch - logResults := ops.awsConnection.getLogsForLogGroupsConcurrently(resourceNames, logGroupNames) - - // Sort and return log results - sort.SliceStable(logResults, func(i, j int) bool { return logResults[i].Timestamp < logResults[j].Timestamp }) - return logResults, nil + // Return the root node + return root } -func (ops *componentsOpsProvider) ListMetrics() []component.MetricName { +// GetChild find a child with the given type and name or returns `nil`. +func (r *Resource) GetChild(typ string, name string) *Resource { + childURN := resource.NewURN(r.ns, r.alloc, tokens.Type(typ), tokens.QName(name)) + return r.children[childURN] +} + +func getOperationsProvider(resource *Resource, config map[tokens.ModuleMember]string) (component.OperationsProvider, error) { + if resource == nil || resource.state == nil { + return nil, nil + } + switch resource.state.Type { + case "cloud:function:Function": + return cloudFunctionOperationsProvider(resource, config) + default: + return nil, nil + } +} + +func cloudFunctionOperationsProvider(resource *Resource, config map[tokens.ModuleMember]string) (component.OperationsProvider, error) { + return OperationsProviderForComponent(config, resource) +} + +// ResourceOperations is an OperationsProvider for Resources +type ResourceOperations struct { + resource *Resource + config map[tokens.ModuleMember]string +} + +var _ component.OperationsProvider = (*ResourceOperations)(nil) + +// NewResourceOperations constructs an OperationsProvider for a resource and configuration options +func NewResourceOperations(config map[tokens.ModuleMember]string, resource *Resource) *ResourceOperations { + return &ResourceOperations{ + resource: resource, + config: config, + } +} + +// GetLogs gets logs for a Resource +func (ops *ResourceOperations) GetLogs(query component.LogQuery) ([]component.LogEntry, error) { + fmt.Printf("[ResourceOperations.GetLogs]: %v\n", query) + opsProvider, err := getOperationsProvider(ops.resource, ops.config) + if err != nil { + return nil, err + } + if opsProvider != nil { + // If this resource has an operations provider - use it and don't recur into children. It is the responsibility + // of it's GetLogs implementation to aggregate all logs from children, either by passing them through or by + // filtering specific content out. + // + // Note: We should also allow it to have a resource provider but to elect not to + // handle GetLogs. + return opsProvider.GetLogs(query) + } + var logs []component.LogEntry + for _, child := range ops.resource.children { + childOps := &ResourceOperations{ + resource: child, + config: ops.config, + } + childLogs, err := childOps.GetLogs(query) + if err != nil { + return logs, err + } + logs = append(logs, childLogs...) + } + return logs, nil +} + +// ListMetrics lists metrics for a Resource +func (ops *ResourceOperations) ListMetrics() []component.MetricName { return []component.MetricName{} } -func (ops *componentsOpsProvider) GetMetricStatistics(metric component.MetricRequest) ([]component.MetricDataPoint, error) { +// GetMetricStatistics gets metric statistics for a Resource +func (ops *ResourceOperations) GetMetricStatistics(metric component.MetricRequest) ([]component.MetricDataPoint, error) { return nil, fmt.Errorf("not yet implemented") } - -type typeid struct { - Type tokens.Type - ID resource.ID -} - -func makeIDLookup(source []*resource.State) map[typeid]*resource.State { - ret := make(map[typeid]*resource.State) - for _, state := range source { - tid := typeid{Type: state.Type, ID: state.ID} - ret[tid] = state - } - return ret -} - -func lookup(m map[typeid]*resource.State, t string, id string) *resource.State { - return m[typeid{Type: tokens.Type(t), ID: resource.ID(id)}] -} - -func newPulumiFrameworkURN(resourceURN resource.URN, t tokens.Type, name tokens.QName) resource.URN { - namespace := resourceURN.Namespace() - return resource.NewURN(namespace, resourceURN.Alloc(), t, name) -} diff --git a/pkg/pulumiframework/resources_test.go b/pkg/pulumiframework/resources_test.go index aa133f0a0..3225f0ba4 100644 --- a/pkg/pulumiframework/resources_test.go +++ b/pkg/pulumiframework/resources_test.go @@ -5,92 +5,77 @@ import ( "io/ioutil" "testing" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" "github.com/davecgh/go-spew/spew" "github.com/stretchr/testify/assert" - "github.com/pulumi/pulumi/pkg/component" - "github.com/pulumi/pulumi/pkg/resource" "github.com/pulumi/pulumi/pkg/resource/stack" - "github.com/pulumi/pulumi/pkg/tokens" ) -var sess *session.Session - -func init() { - var err error - config := aws.NewConfig() - config.Region = aws.String("eu-west-1") - sess, err = session.NewSession(config) - if err != nil { - panic("Could not create AWS session") - } -} - -func getPulumiResources(t *testing.T, path string) (component.Components, tokens.QName) { +func getPulumiResources(t *testing.T, path string) *Resource { var checkpoint stack.Checkpoint byts, err := ioutil.ReadFile(path) assert.NoError(t, err) err = json.Unmarshal(byts, &checkpoint) assert.NoError(t, err) - name, _, snapshot, err := stack.DeserializeCheckpoint(&checkpoint) + _, _, snapshot, err := stack.DeserializeCheckpoint(&checkpoint) assert.NoError(t, err) - resources := GetComponents(snapshot.Resources) + resources := NewResource(snapshot.Resources) spew.Dump(resources) - return resources, name + return resources } func TestTodo(t *testing.T) { - components, targetName := getPulumiResources(t, "testdata/todo.json") - assert.Equal(t, 5, len(components)) + components := getPulumiResources(t, "testdata/todo.json") + assert.Equal(t, 4, len(components.children)) - rawURN := resource.NewURN(targetName, "todo", "aws:dynamodb/table:Table:", "todo") - - tableArn := newPulumiFrameworkURN(rawURN, tokens.Type(pulumiTableType), tokens.QName("todo")) - table, ok := components[tableArn] - if !assert.True(t, ok) { + // Table child + table := components.GetChild("cloud:table:Table", "todo") + if !assert.NotNil(t, table) { return } - assert.Equal(t, 1, len(table.Properties)) - assert.Equal(t, "id", table.Properties[resource.PropertyKey("primaryKey")].StringValue()) - assert.Equal(t, 1, len(table.Resources)) - assert.Equal(t, pulumiTableType, table.Type) + assert.Equal(t, 2, len(table.state.Inputs)) + assert.Equal(t, "id", table.state.Inputs["primaryKey"].StringValue()) + assert.Equal(t, 1, len(table.children)) + assert.NotNil(t, table.GetChild("aws:dynamodb/table:Table", "todo")) - endpointArn := newPulumiFrameworkURN(rawURN, tokens.Type(pulumiEndpointType), tokens.QName("todo")) - endpoint, ok := components[endpointArn] - if !assert.True(t, ok) { + // Endpoint child + endpoint := components.GetChild("cloud:http:HttpEndpoint", "todo") + if !assert.NotNil(t, endpoint) { return } - assert.Equal(t, 1, len(endpoint.Properties)) - assert.Equal(t, "https://eupwl7wu4i.execute-api.us-east-2.amazonaws.com/stage/", - endpoint.Properties[resource.PropertyKey("url")].StringValue()) - assert.Equal(t, 3, len(endpoint.Resources)) - assert.Equal(t, pulumiEndpointType, endpoint.Type) + assert.Equal(t, 5, len(endpoint.state.Inputs)) + assert.Equal(t, "https://eupwl7wu4i.execute-api.us-east-2.amazonaws.com/", endpoint.state.Inputs["url"].StringValue()) + assert.Equal(t, 14, len(endpoint.children)) + assert.NotNil(t, endpoint.GetChild("aws:apigateway/restApi:RestApi", "todo")) } func TestCrawler(t *testing.T) { - components, targetName := getPulumiResources(t, "testdata/crawler.json") - assert.Equal(t, 4, len(components)) + components := getPulumiResources(t, "testdata/crawler.json") + assert.Equal(t, 7, len(components.children)) - rawURN := resource.NewURN(targetName, "countdown", "aws:sns/topic:Topic", "countDown") - - countDownArn := newPulumiFrameworkURN(rawURN, tokens.Type(pulumiTopicType), tokens.QName("countDown")) - countDown, ok := components[countDownArn] - if !assert.True(t, ok) { + // Topic child + topic := components.GetChild("cloud:topic:Topic", "countDown") + if !assert.NotNil(t, topic) { return } - assert.Equal(t, 0, len(countDown.Properties)) - assert.Equal(t, 1, len(countDown.Resources)) - assert.Equal(t, pulumiTopicType, countDown.Type) + assert.Equal(t, 0, len(topic.state.Inputs)) + assert.Equal(t, 1, len(topic.children)) + assert.NotNil(t, topic.GetChild("aws:sns/topic:Topic", "countDown")) - heartbeatArn := newPulumiFrameworkURN(rawURN, tokens.Type(pulumiTimerType), tokens.QName("heartbeat")) - heartbeat, ok := components[heartbeatArn] - if !assert.True(t, ok) { + // Timer child + heartbeat := components.GetChild("cloud:timer:Timer", "heartbeat") + if !assert.NotNil(t, heartbeat) { return } - assert.Equal(t, 1, len(heartbeat.Properties)) - assert.Equal(t, "rate(5 minutes)", heartbeat.Properties[resource.PropertyKey("schedule")].StringValue()) - assert.Equal(t, 3, len(heartbeat.Resources)) - assert.Equal(t, pulumiTimerType, heartbeat.Type) + assert.Equal(t, 1, len(heartbeat.state.Inputs)) + assert.Equal(t, "rate(5 minutes)", heartbeat.state.Inputs["scheduleExpression"].StringValue()) + assert.Equal(t, 4, len(heartbeat.children)) + + // Function child of timer + function := heartbeat.GetChild("cloud:function:Function", "heartbeat") + if !assert.NotNil(t, function) { + return + } + assert.Equal(t, 1, len(function.state.Inputs)) + assert.Equal(t, 3, len(function.children)) } From 06f0559849f5e0a59f8d2899a2e983ffe1b4863f Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Sun, 19 Nov 2017 22:28:49 -0800 Subject: [PATCH 03/18] Refactoring of OperationsProvider code --- cmd/backend.go | 4 +- cmd/backend_cloud.go | 8 +- cmd/backend_local.go | 21 +- cmd/logs.go | 4 +- pkg/component/component.go | 82 ----- .../awsConnection.go | 10 +- pkg/operations/operations.go | 50 +++ pkg/operations/resources.go | 142 ++++++++ pkg/operations/resources_cloud.go | 87 +++++ .../resources_test.go | 2 +- .../testdata/crawler.json | 0 .../testdata/todo.json | 0 pkg/pulumiframework/resources.go | 316 ------------------ 13 files changed, 303 insertions(+), 423 deletions(-) delete mode 100644 pkg/component/component.go rename pkg/{pulumiframework => operations}/awsConnection.go (89%) create mode 100644 pkg/operations/operations.go create mode 100644 pkg/operations/resources.go create mode 100644 pkg/operations/resources_cloud.go rename pkg/{pulumiframework => operations}/resources_test.go (99%) rename pkg/{pulumiframework => operations}/testdata/crawler.json (100%) rename pkg/{pulumiframework => operations}/testdata/todo.json (100%) delete mode 100644 pkg/pulumiframework/resources.go diff --git a/cmd/backend.go b/cmd/backend.go index 155fb7768..18c2db200 100644 --- a/cmd/backend.go +++ b/cmd/backend.go @@ -4,8 +4,8 @@ package cmd import ( "github.com/pkg/errors" - "github.com/pulumi/pulumi/pkg/component" "github.com/pulumi/pulumi/pkg/engine" + "github.com/pulumi/pulumi/pkg/operations" "github.com/pulumi/pulumi/pkg/tokens" ) @@ -31,5 +31,5 @@ type pulumiBackend interface { Update(stackName tokens.QName, debug bool, opts engine.DeployOptions) error Destroy(stackName tokens.QName, debug bool, opts engine.DestroyOptions) error - GetLogs(stackName tokens.QName, query component.LogQuery) ([]component.LogEntry, error) + GetLogs(stackName tokens.QName, query operations.LogQuery) ([]operations.LogEntry, error) } diff --git a/cmd/backend_cloud.go b/cmd/backend_cloud.go index fe0111ff1..42b8a05e1 100644 --- a/cmd/backend_cloud.go +++ b/cmd/backend_cloud.go @@ -14,9 +14,9 @@ import ( "github.com/pkg/errors" "github.com/pulumi/pulumi/pkg/apitype" - "github.com/pulumi/pulumi/pkg/component" "github.com/pulumi/pulumi/pkg/diag/colors" "github.com/pulumi/pulumi/pkg/engine" + "github.com/pulumi/pulumi/pkg/operations" "github.com/pulumi/pulumi/pkg/pack" "github.com/pulumi/pulumi/pkg/resource/config" "github.com/pulumi/pulumi/pkg/util/archive" @@ -225,7 +225,7 @@ func uploadProgram(uploadURL string, printSize bool) error { return nil } -func (b *pulumiCloudPulumiBackend) GetLogs(stackName tokens.QName, query component.LogQuery) ([]component.LogEntry, error) { +func (b *pulumiCloudPulumiBackend) GetLogs(stackName tokens.QName, query operations.LogQuery) ([]operations.LogEntry, error) { // TODO[pulumi/pulumi-service#227]: Relax these conditions once the service can take these arguments. if query.StartTime != nil || query.EndTime != nil || query.Query != nil { return nil, errors.New("not implemented") @@ -242,9 +242,9 @@ func (b *pulumiCloudPulumiBackend) GetLogs(stackName tokens.QName, query compone return nil, err } - logs := make([]component.LogEntry, 0, len(response.Logs)) + logs := make([]operations.LogEntry, 0, len(response.Logs)) for _, entry := range response.Logs { - logs = append(logs, component.LogEntry(entry)) + logs = append(logs, operations.LogEntry(entry)) } return logs, nil diff --git a/cmd/backend_local.go b/cmd/backend_local.go index 3ec69f075..4534e1697 100644 --- a/cmd/backend_local.go +++ b/cmd/backend_local.go @@ -9,15 +9,13 @@ import ( "strconv" "github.com/pkg/errors" - "github.com/pulumi/pulumi/pkg/component" + "github.com/pulumi/pulumi/pkg/encoding" - "github.com/pulumi/pulumi/pkg/pulumiframework" - "github.com/pulumi/pulumi/pkg/util/contract" - "github.com/pulumi/pulumi/pkg/engine" + "github.com/pulumi/pulumi/pkg/operations" "github.com/pulumi/pulumi/pkg/resource/config" - "github.com/pulumi/pulumi/pkg/tokens" + "github.com/pulumi/pulumi/pkg/util/contract" ) type localPulumiBackend struct { @@ -139,7 +137,7 @@ func (b *localPulumiBackend) Destroy(stackName tokens.QName, debug bool, opts en return nil } -func (b *localPulumiBackend) GetLogs(stackName tokens.QName, query component.LogQuery) ([]component.LogEntry, error) { +func (b *localPulumiBackend) GetLogs(stackName tokens.QName, query operations.LogQuery) ([]operations.LogEntry, error) { pulumiEngine, err := b.getEngine(stackName) if err != nil { return nil, err @@ -159,10 +157,13 @@ func (b *localPulumiBackend) GetLogs(stackName tokens.QName, query component.Log contract.Assert(target != nil) // TODO[pulumi/pulumi#54]: replace this with a call into a generalized operations provider. - components := pulumiframework.NewResource(snap.Resources) - ops := pulumiframework.NewResourceOperations(target.Config, components) - - return ops.GetLogs(query) + components := operations.NewResource(snap.Resources) + ops := components.OperationsProvider(target.Config) + logs, err := ops.GetLogs(query) + if logs == nil { + return nil, err + } + return *logs, err } func (b *localPulumiBackend) getEngine(stackName tokens.QName) (engine.Engine, error) { diff --git a/cmd/logs.go b/cmd/logs.go index 9786c85ee..44bf60bbc 100644 --- a/cmd/logs.go +++ b/cmd/logs.go @@ -8,7 +8,7 @@ import ( "github.com/spf13/cobra" - "github.com/pulumi/pulumi/pkg/component" + "github.com/pulumi/pulumi/pkg/operations" "github.com/pulumi/pulumi/pkg/util/cmdutil" ) @@ -29,7 +29,7 @@ func newLogsCmd() *cobra.Command { highestTimeSeen := time.Unix(0, 0) for { - logs, err := backend.GetLogs(stackName, component.LogQuery{}) + logs, err := backend.GetLogs(stackName, operations.LogQuery{}) if err != nil { return err } diff --git a/pkg/component/component.go b/pkg/component/component.go deleted file mode 100644 index c8f7f186a..000000000 --- a/pkg/component/component.go +++ /dev/null @@ -1,82 +0,0 @@ -package component - -import ( - "time" - - "github.com/pulumi/pulumi/pkg/resource" - "github.com/pulumi/pulumi/pkg/tokens" -) - -// Components is a map of URN to component -type Components map[resource.URN]*Component - -// Component is a serializable virtual node in a resource graph -type Component struct { - Type tokens.Type `json:"type"` // this components's full type token. - Properties resource.PropertyMap `json:"props,omitempty"` // the properties of this component. - Resources map[string]*resource.State `json:"resources,omitempty"` // the resources owned by this component. -} - -// LogEntry is a row in the logs for a running compute service -type LogEntry struct { - ID string - Timestamp int64 - Message string -} - -// LogQuery represents the parameters to a log query operation. -// All fields are optional, leaving them off returns all logs. -type LogQuery struct { - StartTime *time.Time - EndTime *time.Time - Query *string -} - -// MetricName is a handle to a metric supported by a Pulumi Framework resources -type MetricName string - -type MetricRequest struct { - Name string -} - -type MetricDataPoint struct { - Timestamp time.Time - Unit string - Sum float64 - SampleCount float64 - Average float64 - Maximum float64 - Minimum float64 -} - -// OperationsProvider is the interface for making operational requests about the -// state of a Component (or Components) -type OperationsProvider interface { - // GetLogs returns logs matching a query - GetLogs(query LogQuery) ([]LogEntry, error) - // ListMetrics returns the list of supported metrics for the requested component type - ListMetrics() []MetricName - // GetMetricStatistics provides metrics data for a given metric request - GetMetricStatistics(metric MetricRequest) ([]MetricDataPoint, error) -} - -// FilterByType returns only components matching the requested type -func (c Components) FilterByType(t tokens.Type) Components { - ret := make(Components) - for k, v := range c { - if v.Type == t { - ret[k] = v - } - } - return ret -} - -// GetByTypeAndName returns the component with the requested type and name -func (c Components) GetByTypeAndName(t tokens.Type, name tokens.QName) *Component { - for k, v := range c { - if k.Type() == t && k.Name() == name { - return v - } - } - return nil -} diff --git a/pkg/pulumiframework/awsConnection.go b/pkg/operations/awsConnection.go similarity index 89% rename from pkg/pulumiframework/awsConnection.go rename to pkg/operations/awsConnection.go index a0533ae4b..c6feb3a6c 100644 --- a/pkg/pulumiframework/awsConnection.go +++ b/pkg/operations/awsConnection.go @@ -1,4 +1,4 @@ -package pulumiframework +package operations import ( "regexp" @@ -8,8 +8,6 @@ import ( "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/golang/glog" - - "github.com/pulumi/pulumi/pkg/component" ) type awsConnection struct { @@ -28,7 +26,7 @@ func newAWSConnection(sess *session.Session) *awsConnection { var logRegexp = regexp.MustCompile(".*Z\t[a-g0-9\\-]*\t(.*)") -func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroups []string) []component.LogEntry { +func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroups []string) []LogEntry { // Create a channel for collecting log event outputs ch := make(chan []*cloudwatchlogs.FilteredLogEvent) @@ -47,14 +45,14 @@ func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroup } // Collect responses on the channel and append logs into combined log array - var logs []component.LogEntry + var logs []LogEntry for i := 0; i < len(logGroups); i++ { logEvents := <-ch for _, event := range logEvents { innerMatches := logRegexp.FindAllStringSubmatch(aws.StringValue(event.Message), -1) glog.V(5).Infof("[getLogs] Inner matches: %v\n", innerMatches) if len(innerMatches) > 0 { - logs = append(logs, component.LogEntry{ + logs = append(logs, LogEntry{ ID: names[i], Message: innerMatches[0][1], Timestamp: aws.Int64Value(event.Timestamp), diff --git a/pkg/operations/operations.go b/pkg/operations/operations.go new file mode 100644 index 000000000..c31ae299f --- /dev/null +++ b/pkg/operations/operations.go @@ -0,0 +1,50 @@ +package operations + +import ( + "time" +) + +// LogEntry is a row in the logs for a running compute service +type LogEntry struct { + ID string + Timestamp int64 + Message string +} + +// LogQuery represents the parameters to a log query operation. +// All fields are optional, leaving them off returns all logs. +type LogQuery struct { + StartTime *time.Time + EndTime *time.Time + Query *string +} + +// MetricName is a handle to a metric supported by a Pulumi Framework resources +type MetricName string + +// MetricRequest is a request for a metric name +type MetricRequest struct { + Name string +} + +// MetricDataPoint is a data point returned from a metric. +type MetricDataPoint struct { + Timestamp time.Time + Unit string + Sum float64 + SampleCount float64 + Average float64 + Maximum float64 + Minimum float64 +} + +// Provider is the interface for making operational requests about the +// state of a Component (or Components) +type Provider interface { + // GetLogs returns logs matching a query + GetLogs(query LogQuery) (*[]LogEntry, error) + // ListMetrics returns the list of supported metrics for the requested component type + ListMetrics() []MetricName + // GetMetricStatistics provides metrics data for a given metric request + GetMetricStatistics(metric MetricRequest) ([]MetricDataPoint, error) +} diff --git a/pkg/operations/resources.go b/pkg/operations/resources.go new file mode 100644 index 000000000..c887ef71e --- /dev/null +++ b/pkg/operations/resources.go @@ -0,0 +1,142 @@ +package operations + +import ( + "fmt" + + "github.com/pulumi/pulumi/pkg/resource" + "github.com/pulumi/pulumi/pkg/tokens" + "github.com/pulumi/pulumi/pkg/util/contract" +) + +// Resource is a tree representation of a resource/component hierarchy +type Resource struct { + ns tokens.QName + alloc tokens.PackageName + state *resource.State + parent *Resource + children map[resource.URN]*Resource +} + +// NewResource constructs a tree representation of a resource/component hierarchy +func NewResource(source []*resource.State) *Resource { + treeNodes := map[resource.URN]*Resource{} + var ns tokens.QName + var alloc tokens.PackageName + + // Walk the ordered resource list and build tree nodes based on child relationships + for _, state := range source { + ns = state.URN.Namespace() + alloc = state.URN.Alloc() + newTree := &Resource{ + ns: ns, + alloc: alloc, + state: state, + parent: nil, + children: map[resource.URN]*Resource{}, + } + for _, childURN := range state.Children { + childTree, ok := treeNodes[childURN] + contract.Assertf(ok, "Expected children to be before parents in resource checkpoint") + childTree.parent = newTree + newTree.children[childTree.state.URN] = childTree + } + treeNodes[state.URN] = newTree + } + + // Create a single root node which is the parent of all unparented nodes + root := &Resource{ + ns: ns, + alloc: alloc, + state: nil, + parent: nil, + children: map[resource.URN]*Resource{}, + } + for _, node := range treeNodes { + if node.parent == nil { + root.children[node.state.URN] = node + node.parent = root + } + } + + // Return the root node + return root +} + +// GetChild find a child with the given type and name or returns `nil`. +func (r *Resource) GetChild(typ string, name string) *Resource { + childURN := resource.NewURN(r.ns, r.alloc, tokens.Type(typ), tokens.QName(name)) + return r.children[childURN] +} + +// OperationsProvider gets an OperationsProvider for this resource. +func (r *Resource) OperationsProvider(config map[tokens.ModuleMember]string) Provider { + return &resourceOperations{ + resource: r, + config: config, + } +} + +func getOperationsProvider(resource *Resource, config map[tokens.ModuleMember]string) (Provider, error) { + if resource == nil || resource.state == nil { + return nil, nil + } + switch resource.state.Type.Package() { + case "cloud": + return CloudOperationsProvider(config, resource) + default: + return nil, nil + } +} + +// ResourceOperations is an OperationsProvider for Resources +type resourceOperations struct { + resource *Resource + config map[tokens.ModuleMember]string +} + +var _ Provider = (*resourceOperations)(nil) + +// GetLogs gets logs for a Resource +func (ops *resourceOperations) GetLogs(query LogQuery) (*[]LogEntry, error) { + opsProvider, err := getOperationsProvider(ops.resource, ops.config) + if err != nil { + return nil, err + } + if opsProvider != nil { + // If this resource has an operations provider - use it and don't recur into children. It is the responsibility + // of it's GetLogs implementation to aggregate all logs from children, either by passing them through or by + // filtering specific content out. + logsResult, err := opsProvider.GetLogs(query) + if err != nil { + return logsResult, err + } + if logsResult != nil { + return logsResult, nil + } + } + var logs []LogEntry + for _, child := range ops.resource.children { + childOps := &resourceOperations{ + resource: child, + config: ops.config, + } + childLogs, err := childOps.GetLogs(query) + if err != nil { + return &logs, err + } + if childLogs != nil { + logs = append(logs, *childLogs...) + } + } + return &logs, nil +} + +// ListMetrics lists metrics for a Resource +func (ops *resourceOperations) ListMetrics() []MetricName { + return []MetricName{} +} + +// GetMetricStatistics gets metric statistics for a Resource +func (ops *resourceOperations) GetMetricStatistics(metric MetricRequest) ([]MetricDataPoint, error) { + return nil, fmt.Errorf("not yet implemented") +} diff --git a/pkg/operations/resources_cloud.go b/pkg/operations/resources_cloud.go new file mode 100644 index 000000000..f15f35730 --- /dev/null +++ b/pkg/operations/resources_cloud.go @@ -0,0 +1,87 @@ +package operations + +import ( + "fmt" + "sort" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/pkg/errors" + + "github.com/pulumi/pulumi/pkg/tokens" + "github.com/pulumi/pulumi/pkg/util/contract" +) + +// CloudOperationsProvider creates an OperationsProvider capable of answering operational queries based on the +// underlying resources of the `@pulumi/cloud-aws` implementation. +func CloudOperationsProvider( + config map[tokens.ModuleMember]string, + component *Resource) (Provider, error) { + + sess, err := createSessionFromConfig(config) + if err != nil { + return nil, errors.Wrap(err, "failed to create AWS session") + } + + prov := &cloudOpsProvider{ + awsConnection: newAWSConnection(sess), + component: component, + } + return prov, nil +} + +// This function grovels through the given configuration bag, extracts the bits necessary to create an AWS session +// (currently just the AWS region to target), and creates and returns the session. If the bag does not contain the +// necessary properties or if session creation fails, this function returns `nil, error`. +func createSessionFromConfig(config map[tokens.ModuleMember]string) (*session.Session, error) { + awsRegion, ok := config[regionKey] + if !ok { + return nil, errors.New("no AWS region found") + } + + awsConfig := aws.NewConfig() + awsConfig.Region = aws.String(awsRegion) + return session.NewSession(awsConfig) +} + +type cloudOpsProvider struct { + awsConnection *awsConnection + component *Resource +} + +var _ Provider = (*cloudOpsProvider)(nil) + +const ( + // AWS config keys + regionKey = "aws:config:region" + + // Pulumi Framework component types + pulumiFunctionType = tokens.Type("cloud:function:Function") +) + +func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { + if query.StartTime != nil || query.EndTime != nil || query.Query != nil { + contract.Failf("not yet implemented - StartTime, Endtime, Query") + } + switch ops.component.state.Type { + case pulumiFunctionType: + urn := ops.component.state.URN + serverlessFunction := ops.component.GetChild("aws:serverless:Function", string(urn.Name())) + awsFunction := serverlessFunction.GetChild("aws:lambda/function:Function", string(urn.Name())) + functionName := awsFunction.state.Outputs["name"].StringValue() + logResult := ops.awsConnection.getLogsForLogGroupsConcurrently([]string{functionName}, []string{"/aws/lambda/" + functionName}) + sort.SliceStable(logResult, func(i, j int) bool { return logResult[i].Timestamp < logResult[j].Timestamp }) + return &logResult, nil + default: + // Else this resource kind does not produce any logs. + return nil, nil + } +} + +func (ops *cloudOpsProvider) ListMetrics() []MetricName { + return nil +} + +func (ops *cloudOpsProvider) GetMetricStatistics(metric MetricRequest) ([]MetricDataPoint, error) { + return nil, fmt.Errorf("Not yet implmeneted: GetMetricStatistics") +} diff --git a/pkg/pulumiframework/resources_test.go b/pkg/operations/resources_test.go similarity index 99% rename from pkg/pulumiframework/resources_test.go rename to pkg/operations/resources_test.go index 3225f0ba4..fce91421d 100644 --- a/pkg/pulumiframework/resources_test.go +++ b/pkg/operations/resources_test.go @@ -1,4 +1,4 @@ -package pulumiframework +package operations import ( "encoding/json" diff --git a/pkg/pulumiframework/testdata/crawler.json b/pkg/operations/testdata/crawler.json similarity index 100% rename from pkg/pulumiframework/testdata/crawler.json rename to pkg/operations/testdata/crawler.json diff --git a/pkg/pulumiframework/testdata/todo.json b/pkg/operations/testdata/todo.json similarity index 100% rename from pkg/pulumiframework/testdata/todo.json rename to pkg/operations/testdata/todo.json diff --git a/pkg/pulumiframework/resources.go b/pkg/pulumiframework/resources.go deleted file mode 100644 index 7de88a781..000000000 --- a/pkg/pulumiframework/resources.go +++ /dev/null @@ -1,316 +0,0 @@ -package pulumiframework - -import ( - "fmt" - "sort" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/pkg/errors" - - "github.com/pulumi/pulumi/pkg/component" - "github.com/pulumi/pulumi/pkg/resource" - "github.com/pulumi/pulumi/pkg/tokens" - "github.com/pulumi/pulumi/pkg/util/contract" -) - -// This file contains the implementation of the component.Components interface for the -// AWS implementation of the Pulumi Framework defined in this repo. - -// This function grovels through the given configuration bag, extracts the bits necessary to create an AWS session -// (currently just the AWS region to target), and creates and returns the session. If the bag does not contain the -// necessary properties or if session creation fails, this function returns `nil, error`. -func createSessionFromConfig(config map[tokens.ModuleMember]string) (*session.Session, error) { - awsRegion, ok := config[regionKey] - if !ok { - return nil, errors.New("no AWS region found") - } - - awsConfig := aws.NewConfig() - awsConfig.Region = aws.String(awsRegion) - return session.NewSession(awsConfig) -} - -// OperationsProviderForComponent creates an OperationsProvider capable of answering -// operational queries based on the underlying resources of the AWS Pulumi Framework implementation. -func OperationsProviderForComponent( - config map[tokens.ModuleMember]string, - component *Resource) (component.OperationsProvider, error) { - - sess, err := createSessionFromConfig(config) - if err != nil { - return nil, errors.Wrap(err, "failed to create AWS session") - } - - prov := &componentOpsProvider{ - awsConnection: newAWSConnection(sess), - component: component, - } - return prov, nil -} - -type componentOpsProvider struct { - awsConnection *awsConnection - component *Resource -} - -var _ component.OperationsProvider = (*componentOpsProvider)(nil) - -const ( - // AWS config keys - regionKey = "aws:config:region" - - // Pulumi Framework "virtual" types - pulumiEndpointType = tokens.Type("pulumi:framework:Endpoint") - pulumiTopicType = tokens.Type("pulumi:framework:Topic") - pulumiTimerType = tokens.Type("pulumi:framework:Timer") - pulumiTableType = tokens.Type("pulumi:framework:Table") - pulumiFunctionType = tokens.Type("pulumi:framework:Function") - - // Operational metric names for Pulumi Framework components - functionInvocations component.MetricName = "Invocation" - functionDuration component.MetricName = "Duration" - functionErrors component.MetricName = "Errors" - functionThrottles component.MetricName = "Throttles" - endpoint4xxError component.MetricName = "4xxerror" - endpoint5xxError component.MetricName = "5xxerror" - endpointCount component.MetricName = "count" - endpointLatency component.MetricName = "latency" - topicPulished component.MetricName = "published" - topicPublishSize component.MetricName = "publishsize" - topicDelivered component.MetricName = "delivered" - topicFailed component.MetricName = "failed" - timerInvocations component.MetricName = "invocations" - timerFailedInvocations component.MetricName = "failedinvocations" - tableConsumedReadCapacity component.MetricName = "consumedreadcapacity" - tableConsumedWriteCapacity component.MetricName = "consumerwritecapacity" - tableThrottles component.MetricName = "throttles" -) - -func (ops *componentOpsProvider) GetLogs(query component.LogQuery) ([]component.LogEntry, error) { - if query.StartTime != nil || query.EndTime != nil || query.Query != nil { - contract.Failf("not yet implemented - StartTime, Endtime, Query") - } - fmt.Printf("[GetLogs] type = %v", ops.component.state.Type) - switch ops.component.state.Type { - case pulumiFunctionType: - urn := ops.component.state.URN - serverlessFunction := ops.component.GetChild("aws:serverless:Function", string(urn.Name())) - awsFunction := serverlessFunction.GetChild("aws:lambda/function:Function", string(urn.Name())) - functionName := awsFunction.state.Outputs["name"].StringValue() - logResult := ops.awsConnection.getLogsForLogGroupsConcurrently([]string{functionName}, []string{"/aws/lambda/" + functionName}) - sort.SliceStable(logResult, func(i, j int) bool { return logResult[i].Timestamp < logResult[j].Timestamp }) - return logResult, nil - default: - return nil, fmt.Errorf("Logs not supported for component type: %s", ops.component.state.Type) - } -} - -func (ops *componentOpsProvider) ListMetrics() []component.MetricName { - switch ops.component.state.Type { - case pulumiFunctionType: - // Don't include these which are internal implementation metrics: DLQ delivery errors - return []component.MetricName{functionInvocations, functionDuration, functionErrors, functionThrottles} - case pulumiEndpointType: - return []component.MetricName{endpoint4xxError, endpoint5xxError, endpointCount, endpointLatency} - case pulumiTopicType: - return []component.MetricName{topicPulished, topicPublishSize, topicDelivered, topicFailed} - case pulumiTimerType: - return []component.MetricName{timerInvocations, timerFailedInvocations} - case pulumiTableType: - // Internal only: "provisionedreadcapacity", "provisionedwritecapacity", "usererrors", "timetolivedeleted", - // "systemerrors", "succesfulrequestlatency", "returnedrecordscount", "returenditemcount", "returnedbytes", - // "onlineindex*", "conditionalcheckfailed" - return []component.MetricName{tableConsumedReadCapacity, tableConsumedWriteCapacity, tableThrottles} - default: - contract.Failf("invalid component type") - return nil - } -} - -func (ops *componentOpsProvider) GetMetricStatistics(metric component.MetricRequest) ([]component.MetricDataPoint, error) { - - return nil, fmt.Errorf("Not yet implmeneted: GetMetricStatistics") - - // var dimensions []*cloudwatch.Dimension - // var namespace string - - // switch ops.component.state.Type { - // case pulumiFunctionType: - // dimensions = append(dimensions, &cloudwatch.Dimension{ - // Name: aws.String("FunctionName"), - // Value: aws.String(string(ops.component.Resources["function"].ID)), - // }) - // namespace = "AWS/Lambda" - // case pulumiEndpointType: - // contract.Failf("not yet implemented") - // case pulumiTopicType: - // contract.Failf("not yet implemented") - // case pulumiTimerType: - // contract.Failf("not yet implemented") - // case pulumiTableType: - // contract.Failf("not yet implemented") - // default: - // contract.Failf("invalid component type") - // } - - // resp, err := ops.awsConnection.metricSvc.GetMetricStatistics(&cloudwatch.GetMetricStatisticsInput{ - // Namespace: aws.String(namespace), - // MetricName: aws.String(metric.Name), - // Dimensions: dimensions, - // Statistics: []*string{ - // aws.String("Sum"), aws.String("SampleCount"), aws.String("Average"), - // aws.String("Maximum"), aws.String("Minimum"), - // }, - // }) - // if err != nil { - // return nil, err - // } - - // var metrics []component.MetricDataPoint - // for _, datapoint := range resp.Datapoints { - // metrics = append(metrics, component.MetricDataPoint{ - // Timestamp: aws.TimeValue(datapoint.Timestamp), - // Unit: aws.StringValue(datapoint.Unit), - // Sum: aws.Float64Value(datapoint.Sum), - // SampleCount: aws.Float64Value(datapoint.SampleCount), - // Average: aws.Float64Value(datapoint.Average), - // Maximum: aws.Float64Value(datapoint.Maximum), - // Minimum: aws.Float64Value(datapoint.Minimum), - // }) - // } - // return metrics, nil -} - -// Resource is a tree representation of a resource/component hierarchy -type Resource struct { - ns tokens.QName - alloc tokens.PackageName - state *resource.State - parent *Resource - children map[resource.URN]*Resource -} - -// NewResource constructs a tree representation of a resource/component hierarchy -func NewResource(source []*resource.State) *Resource { - treeNodes := map[resource.URN]*Resource{} - var ns tokens.QName - var alloc tokens.PackageName - - // Walk the ordered resource list and build tree nodes based on child relationships - for _, state := range source { - ns = state.URN.Namespace() - alloc = state.URN.Alloc() - newTree := &Resource{ - ns: ns, - alloc: alloc, - state: state, - parent: nil, - children: map[resource.URN]*Resource{}, - } - for _, childURN := range state.Children { - childTree, ok := treeNodes[childURN] - contract.Assertf(ok, "Expected children to be before parents in resource checkpoint") - childTree.parent = newTree - newTree.children[childTree.state.URN] = childTree - } - treeNodes[state.URN] = newTree - } - - // Create a single root node which is the parent of all unparented nodes - root := &Resource{ - ns: ns, - alloc: alloc, - state: nil, - parent: nil, - children: map[resource.URN]*Resource{}, - } - for _, node := range treeNodes { - if node.parent == nil { - root.children[node.state.URN] = node - node.parent = root - } - } - - // Return the root node - return root -} - -// GetChild find a child with the given type and name or returns `nil`. -func (r *Resource) GetChild(typ string, name string) *Resource { - childURN := resource.NewURN(r.ns, r.alloc, tokens.Type(typ), tokens.QName(name)) - return r.children[childURN] -} - -func getOperationsProvider(resource *Resource, config map[tokens.ModuleMember]string) (component.OperationsProvider, error) { - if resource == nil || resource.state == nil { - return nil, nil - } - switch resource.state.Type { - case "cloud:function:Function": - return cloudFunctionOperationsProvider(resource, config) - default: - return nil, nil - } -} - -func cloudFunctionOperationsProvider(resource *Resource, config map[tokens.ModuleMember]string) (component.OperationsProvider, error) { - return OperationsProviderForComponent(config, resource) -} - -// ResourceOperations is an OperationsProvider for Resources -type ResourceOperations struct { - resource *Resource - config map[tokens.ModuleMember]string -} - -var _ component.OperationsProvider = (*ResourceOperations)(nil) - -// NewResourceOperations constructs an OperationsProvider for a resource and configuration options -func NewResourceOperations(config map[tokens.ModuleMember]string, resource *Resource) *ResourceOperations { - return &ResourceOperations{ - resource: resource, - config: config, - } -} - -// GetLogs gets logs for a Resource -func (ops *ResourceOperations) GetLogs(query component.LogQuery) ([]component.LogEntry, error) { - fmt.Printf("[ResourceOperations.GetLogs]: %v\n", query) - opsProvider, err := getOperationsProvider(ops.resource, ops.config) - if err != nil { - return nil, err - } - if opsProvider != nil { - // If this resource has an operations provider - use it and don't recur into children. It is the responsibility - // of it's GetLogs implementation to aggregate all logs from children, either by passing them through or by - // filtering specific content out. - // - // Note: We should also allow it to have a resource provider but to elect not to - // handle GetLogs. - return opsProvider.GetLogs(query) - } - var logs []component.LogEntry - for _, child := range ops.resource.children { - childOps := &ResourceOperations{ - resource: child, - config: ops.config, - } - childLogs, err := childOps.GetLogs(query) - if err != nil { - return logs, err - } - logs = append(logs, childLogs...) - } - return logs, nil -} - -// ListMetrics lists metrics for a Resource -func (ops *ResourceOperations) ListMetrics() []component.MetricName { - return []component.MetricName{} -} - -// GetMetricStatistics gets metric statistics for a Resource -func (ops *ResourceOperations) GetMetricStatistics(metric component.MetricRequest) ([]component.MetricDataPoint, error) { - return nil, fmt.Errorf("not yet implemented") -} From 329d70fba276f2d9363a74c97f15fbdd95e62a60 Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Sun, 19 Nov 2017 22:31:30 -0800 Subject: [PATCH 04/18] Rename cloud operations file --- pkg/operations/{resources_cloud.go => operations_cloud.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename pkg/operations/{resources_cloud.go => operations_cloud.go} (100%) diff --git a/pkg/operations/resources_cloud.go b/pkg/operations/operations_cloud.go similarity index 100% rename from pkg/operations/resources_cloud.go rename to pkg/operations/operations_cloud.go From 16ccc67654a9fb0147e67406ea41c0b66943c900 Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Mon, 20 Nov 2017 13:31:20 -0800 Subject: [PATCH 05/18] Incorporate logs from logCollector Also allow AWS Lambda Function logs to be projected in raw form, but filtered/formatted by higher level layers. --- pkg/operations/awsConnection.go | 18 +-- ...{operations_cloud.go => operations_aws.go} | 28 ++-- pkg/operations/operations_cloud_aws.go | 139 ++++++++++++++++++ pkg/operations/operations_cloud_aws_test.go | 19 +++ pkg/operations/resources.go | 44 ++++-- 5 files changed, 205 insertions(+), 43 deletions(-) rename pkg/operations/{operations_cloud.go => operations_aws.go} (68%) create mode 100644 pkg/operations/operations_cloud_aws.go create mode 100644 pkg/operations/operations_cloud_aws_test.go diff --git a/pkg/operations/awsConnection.go b/pkg/operations/awsConnection.go index c6feb3a6c..11eb341c2 100644 --- a/pkg/operations/awsConnection.go +++ b/pkg/operations/awsConnection.go @@ -1,8 +1,6 @@ package operations import ( - "regexp" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/cloudwatch" @@ -24,8 +22,6 @@ func newAWSConnection(sess *session.Session) *awsConnection { } } -var logRegexp = regexp.MustCompile(".*Z\t[a-g0-9\\-]*\t(.*)") - func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroups []string) []LogEntry { // Create a channel for collecting log event outputs @@ -49,15 +45,11 @@ func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroup for i := 0; i < len(logGroups); i++ { logEvents := <-ch for _, event := range logEvents { - innerMatches := logRegexp.FindAllStringSubmatch(aws.StringValue(event.Message), -1) - glog.V(5).Infof("[getLogs] Inner matches: %v\n", innerMatches) - if len(innerMatches) > 0 { - logs = append(logs, LogEntry{ - ID: names[i], - Message: innerMatches[0][1], - Timestamp: aws.Int64Value(event.Timestamp), - }) - } + logs = append(logs, LogEntry{ + ID: names[i], + Message: aws.StringValue(event.Message), + Timestamp: aws.Int64Value(event.Timestamp), + }) } } diff --git a/pkg/operations/operations_cloud.go b/pkg/operations/operations_aws.go similarity index 68% rename from pkg/operations/operations_cloud.go rename to pkg/operations/operations_aws.go index f15f35730..6b05e3109 100644 --- a/pkg/operations/operations_cloud.go +++ b/pkg/operations/operations_aws.go @@ -7,14 +7,13 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/pkg/errors" - "github.com/pulumi/pulumi/pkg/tokens" "github.com/pulumi/pulumi/pkg/util/contract" ) -// CloudOperationsProvider creates an OperationsProvider capable of answering operational queries based on the +// AWSOperationsProvider creates an OperationsProvider capable of answering operational queries based on the // underlying resources of the `@pulumi/cloud-aws` implementation. -func CloudOperationsProvider( +func AWSOperationsProvider( config map[tokens.ModuleMember]string, component *Resource) (Provider, error) { @@ -23,7 +22,7 @@ func CloudOperationsProvider( return nil, errors.Wrap(err, "failed to create AWS session") } - prov := &cloudOpsProvider{ + prov := &awsOpsProvider{ awsConnection: newAWSConnection(sess), component: component, } @@ -44,31 +43,28 @@ func createSessionFromConfig(config map[tokens.ModuleMember]string) (*session.Se return session.NewSession(awsConfig) } -type cloudOpsProvider struct { +type awsOpsProvider struct { awsConnection *awsConnection component *Resource } -var _ Provider = (*cloudOpsProvider)(nil) +var _ Provider = (*awsOpsProvider)(nil) const ( // AWS config keys regionKey = "aws:config:region" - // Pulumi Framework component types - pulumiFunctionType = tokens.Type("cloud:function:Function") + // AWS resource types + awsFunctionType = tokens.Type("aws:lambda/function:Function") ) -func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { +func (ops *awsOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { if query.StartTime != nil || query.EndTime != nil || query.Query != nil { contract.Failf("not yet implemented - StartTime, Endtime, Query") } switch ops.component.state.Type { - case pulumiFunctionType: - urn := ops.component.state.URN - serverlessFunction := ops.component.GetChild("aws:serverless:Function", string(urn.Name())) - awsFunction := serverlessFunction.GetChild("aws:lambda/function:Function", string(urn.Name())) - functionName := awsFunction.state.Outputs["name"].StringValue() + case awsFunctionType: + functionName := ops.component.state.Outputs["name"].StringValue() logResult := ops.awsConnection.getLogsForLogGroupsConcurrently([]string{functionName}, []string{"/aws/lambda/" + functionName}) sort.SliceStable(logResult, func(i, j int) bool { return logResult[i].Timestamp < logResult[j].Timestamp }) return &logResult, nil @@ -78,10 +74,10 @@ func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { } } -func (ops *cloudOpsProvider) ListMetrics() []MetricName { +func (ops *awsOpsProvider) ListMetrics() []MetricName { return nil } -func (ops *cloudOpsProvider) GetMetricStatistics(metric MetricRequest) ([]MetricDataPoint, error) { +func (ops *awsOpsProvider) GetMetricStatistics(metric MetricRequest) ([]MetricDataPoint, error) { return nil, fmt.Errorf("Not yet implmeneted: GetMetricStatistics") } diff --git a/pkg/operations/operations_cloud_aws.go b/pkg/operations/operations_cloud_aws.go new file mode 100644 index 000000000..30f5c0e55 --- /dev/null +++ b/pkg/operations/operations_cloud_aws.go @@ -0,0 +1,139 @@ +package operations + +import ( + "encoding/json" + "fmt" + "regexp" + "time" + + "github.com/pulumi/pulumi/pkg/tokens" + "github.com/pulumi/pulumi/pkg/util/contract" +) + +// CloudOperationsProvider creates an OperationsProvider capable of answering operational queries based on the +// underlying resources of the `@pulumi/cloud-aws` implementation. +func CloudOperationsProvider(config map[tokens.ModuleMember]string, component *Resource) (Provider, error) { + prov := &cloudOpsProvider{ + config: config, + component: component, + } + return prov, nil +} + +type cloudOpsProvider struct { + config map[tokens.ModuleMember]string + component *Resource +} + +var _ Provider = (*cloudOpsProvider)(nil) + +const ( + // Pulumi Framework component types + pulumiFunctionType = tokens.Type("cloud:function:Function") + logCollectorType = tokens.Type("cloud:logCollector:LogCollector") + + // AWS resource types + serverlessFunctionType = "aws:serverless:Function" +) + +func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { + if query.StartTime != nil || query.EndTime != nil || query.Query != nil { + contract.Failf("not yet implemented - StartTime, Endtime, Query") + } + switch ops.component.state.Type { + case pulumiFunctionType: + urn := ops.component.state.URN + serverlessFunction := ops.component.GetChild(serverlessFunctionType, string(urn.Name())) + rawLogs, err := serverlessFunction.OperationsProvider(ops.config).GetLogs(query) + if err != nil { + return nil, err + } + contract.Assertf(rawLogs != nil, "expect aws:serverless:Function to provide logs") + var logs []LogEntry + for _, rawLog := range *rawLogs { + extractedLog := extractLambdaLogMessage(rawLog.Message) + if extractedLog != nil { + logs = append(logs, *extractedLog) + } + } + return &logs, nil + case logCollectorType: + urn := ops.component.state.URN + serverlessFunction := ops.component.GetChild(serverlessFunctionType, string(urn.Name())) + rawLogs, err := serverlessFunction.OperationsProvider(ops.config).GetLogs(query) + if err != nil { + return nil, err + } + contract.Assertf(rawLogs != nil, "expect aws:serverless:Function to provide logs") + // Extract out the encoded and batched logs + var logs []LogEntry + for _, rawLog := range *rawLogs { + var logMessage encodedLogMessage + extractedLog := extractLambdaLogMessage(rawLog.Message) + if extractedLog != nil { + err := json.Unmarshal([]byte(extractedLog.Message), &logMessage) + if err != nil { + return nil, err + } + for _, logEvent := range logMessage.LogEvents { + if extracted := extractLambdaLogMessage(logEvent.Message); extracted != nil { + logs = append(logs, *extracted) + } + } + } + } + return &logs, nil + default: + // Else this resource kind does not produce any logs. + return nil, nil + } +} + +func (ops *cloudOpsProvider) ListMetrics() []MetricName { + return nil +} + +func (ops *cloudOpsProvider) GetMetricStatistics(metric MetricRequest) ([]MetricDataPoint, error) { + return nil, fmt.Errorf("Not yet implmeneted: GetMetricStatistics") +} + +type encodedLogEvent struct { + ID string `json:"id"` + Timestamp int64 `json:"timestamp"` + Message string `json:"message"` +} + +type encodedLogMessage struct { + MessageType string `json:"messageType"` + Owner string `json:"owner"` + LogGroup string `json:"logGroup"` + LogStream string `json:"logStream"` + SubscriptionFilters []string `json:"subscriptionFilters"` + LogEvents []encodedLogEvent `json:"logEvents"` +} + +var logRegexp = regexp.MustCompile("(.*Z)\t[a-g0-9\\-]*\t(.*)") + +// extractLambdaLogMessage extracts out only the log messages associated with user logs, skipping Lambda-specific metadata. +// In particular, only the second line below is extracter, and it is extracted with the recorded timestamp. +// +// ``` +// START RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Version: $LATEST +// 2017-11-17T20:30:27.736Z 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 GET /todo +// END RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 +// REPORT RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Duration: 222.92 ms Billed Duration: 300 ms Memory Size: 128 MB Max Memory Used: 33 MB +// ``` +func extractLambdaLogMessage(message string) *LogEntry { + innerMatches := logRegexp.FindAllStringSubmatch(message, -1) + if len(innerMatches) > 0 { + contract.Assertf(len(innerMatches[0]) >= 3, "expected log regexp to always produce at least two capture groups") + timestamp, err := time.Parse(time.RFC3339Nano, innerMatches[0][1]) + contract.Assertf(err == nil, "expected to be able to parse timestamp") + return &LogEntry{ + ID: "hmm", + Message: innerMatches[0][2], + Timestamp: timestamp.UnixNano() / 1000000, // milliseconds + } + } + return nil +} diff --git a/pkg/operations/operations_cloud_aws_test.go b/pkg/operations/operations_cloud_aws_test.go new file mode 100644 index 000000000..9e64c093e --- /dev/null +++ b/pkg/operations/operations_cloud_aws_test.go @@ -0,0 +1,19 @@ +package operations + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_extractLambdaLogMessage(t *testing.T) { + res := extractLambdaLogMessage("START RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Version: $LATEST") + assert.Nil(t, res) + res = extractLambdaLogMessage("2017-11-17T20:30:27.736Z 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 GET /todo") + assert.NotNil(t, res) + assert.Equal(t, "GET /todo", res.Message) + res = extractLambdaLogMessage("END RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723") + assert.Nil(t, res) + res = extractLambdaLogMessage("REPORT RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Duration: 222.92 ms Billed Duration: 300 ms Memory Size: 128 MB Max Memory Used: 33 MB") + assert.Nil(t, res) +} diff --git a/pkg/operations/resources.go b/pkg/operations/resources.go index c887ef71e..a8b8583c6 100644 --- a/pkg/operations/resources.go +++ b/pkg/operations/resources.go @@ -2,6 +2,7 @@ package operations import ( "fmt" + "sort" "github.com/pulumi/pulumi/pkg/resource" "github.com/pulumi/pulumi/pkg/tokens" @@ -76,18 +77,6 @@ func (r *Resource) OperationsProvider(config map[tokens.ModuleMember]string) Pro } } -func getOperationsProvider(resource *Resource, config map[tokens.ModuleMember]string) (Provider, error) { - if resource == nil || resource.state == nil { - return nil, nil - } - switch resource.state.Type.Package() { - case "cloud": - return CloudOperationsProvider(config, resource) - default: - return nil, nil - } -} - // ResourceOperations is an OperationsProvider for Resources type resourceOperations struct { resource *Resource @@ -98,7 +87,7 @@ var _ Provider = (*resourceOperations)(nil) // GetLogs gets logs for a Resource func (ops *resourceOperations) GetLogs(query LogQuery) (*[]LogEntry, error) { - opsProvider, err := getOperationsProvider(ops.resource, ops.config) + opsProvider, err := ops.getOperationsProvider() if err != nil { return nil, err } @@ -120,6 +109,7 @@ func (ops *resourceOperations) GetLogs(query LogQuery) (*[]LogEntry, error) { resource: child, config: ops.config, } + // TODO: Parallelize these calls to child GetLogs childLogs, err := childOps.GetLogs(query) if err != nil { return &logs, err @@ -128,7 +118,19 @@ func (ops *resourceOperations) GetLogs(query LogQuery) (*[]LogEntry, error) { logs = append(logs, *childLogs...) } } - return &logs, nil + // Sort + sort.SliceStable(logs, func(i, j int) bool { return logs[i].Timestamp < logs[j].Timestamp }) + // Remove duplicates + var retLogs []LogEntry + var lastLog LogEntry + for _, log := range logs { + if log.Message == lastLog.Message && log.Timestamp == lastLog.Timestamp { + continue + } + lastLog = log + retLogs = append(retLogs, log) + } + return &retLogs, nil } // ListMetrics lists metrics for a Resource @@ -140,3 +142,17 @@ func (ops *resourceOperations) ListMetrics() []MetricName { func (ops *resourceOperations) GetMetricStatistics(metric MetricRequest) ([]MetricDataPoint, error) { return nil, fmt.Errorf("not yet implemented") } + +func (ops *resourceOperations) getOperationsProvider() (Provider, error) { + if ops.resource == nil || ops.resource.state == nil { + return nil, nil + } + switch ops.resource.state.Type.Package() { + case "cloud": + return CloudOperationsProvider(ops.config, ops.resource) + case "aws": + return AWSOperationsProvider(ops.config, ops.resource) + default: + return nil, nil + } +} From 7555eed3d5aec00e91520f722eba782410427c6b Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Mon, 20 Nov 2017 13:44:23 -0800 Subject: [PATCH 06/18] Consolidate AWS operations --- pkg/operations/awsConnection.go | 57 ------------------- pkg/operations/operations_aws.go | 94 +++++++++++++++++++++++++------- 2 files changed, 75 insertions(+), 76 deletions(-) delete mode 100644 pkg/operations/awsConnection.go diff --git a/pkg/operations/awsConnection.go b/pkg/operations/awsConnection.go deleted file mode 100644 index 11eb341c2..000000000 --- a/pkg/operations/awsConnection.go +++ /dev/null @@ -1,57 +0,0 @@ -package operations - -import ( - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/cloudwatch" - "github.com/aws/aws-sdk-go/service/cloudwatchlogs" - "github.com/golang/glog" -) - -type awsConnection struct { - sess *session.Session - logSvc *cloudwatchlogs.CloudWatchLogs - metricSvc *cloudwatch.CloudWatch -} - -func newAWSConnection(sess *session.Session) *awsConnection { - return &awsConnection{ - sess: sess, - logSvc: cloudwatchlogs.New(sess), - metricSvc: cloudwatch.New(sess), - } -} - -func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroups []string) []LogEntry { - - // Create a channel for collecting log event outputs - ch := make(chan []*cloudwatchlogs.FilteredLogEvent) - - // Run FilterLogEvents for each log group in parallel - for _, logGroup := range logGroups { - go func(logGroup string) { - resp, err := p.logSvc.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{ - LogGroupName: aws.String(logGroup), - }) - if err != nil { - glog.V(5).Infof("[getLogs] Error getting logs: %v %v\n", logGroup, err) - } - ch <- resp.Events - }(logGroup) - } - - // Collect responses on the channel and append logs into combined log array - var logs []LogEntry - for i := 0; i < len(logGroups); i++ { - logEvents := <-ch - for _, event := range logEvents { - logs = append(logs, LogEntry{ - ID: names[i], - Message: aws.StringValue(event.Message), - Timestamp: aws.Int64Value(event.Timestamp), - }) - } - } - - return logs -} diff --git a/pkg/operations/operations_aws.go b/pkg/operations/operations_aws.go index 6b05e3109..5630a9329 100644 --- a/pkg/operations/operations_aws.go +++ b/pkg/operations/operations_aws.go @@ -6,7 +6,11 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/cloudwatch" + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + "github.com/golang/glog" "github.com/pkg/errors" + "github.com/pulumi/pulumi/pkg/tokens" "github.com/pulumi/pulumi/pkg/util/contract" ) @@ -17,30 +21,21 @@ func AWSOperationsProvider( config map[tokens.ModuleMember]string, component *Resource) (Provider, error) { - sess, err := createSessionFromConfig(config) - if err != nil { - return nil, errors.Wrap(err, "failed to create AWS session") - } - - prov := &awsOpsProvider{ - awsConnection: newAWSConnection(sess), - component: component, - } - return prov, nil -} - -// This function grovels through the given configuration bag, extracts the bits necessary to create an AWS session -// (currently just the AWS region to target), and creates and returns the session. If the bag does not contain the -// necessary properties or if session creation fails, this function returns `nil, error`. -func createSessionFromConfig(config map[tokens.ModuleMember]string) (*session.Session, error) { awsRegion, ok := config[regionKey] if !ok { return nil, errors.New("no AWS region found") } - awsConfig := aws.NewConfig() - awsConfig.Region = aws.String(awsRegion) - return session.NewSession(awsConfig) + awsConnection, err := getAWSConnection(awsRegion) + if err != nil { + return nil, err + } + + prov := &awsOpsProvider{ + awsConnection: awsConnection, + component: component, + } + return prov, nil } type awsOpsProvider struct { @@ -81,3 +76,64 @@ func (ops *awsOpsProvider) ListMetrics() []MetricName { func (ops *awsOpsProvider) GetMetricStatistics(metric MetricRequest) ([]MetricDataPoint, error) { return nil, fmt.Errorf("Not yet implmeneted: GetMetricStatistics") } + +type awsConnection struct { + sess *session.Session + logSvc *cloudwatchlogs.CloudWatchLogs + metricSvc *cloudwatch.CloudWatch +} + +var awsConnectionCache = map[string]*awsConnection{} + +func getAWSConnection(awsRegion string) (*awsConnection, error) { + connection, ok := awsConnectionCache[awsRegion] + if !ok { + awsConfig := aws.NewConfig() + awsConfig.Region = aws.String(awsRegion) + sess, err := session.NewSession(awsConfig) + if err != nil { + return nil, errors.Wrap(err, "failed to create AWS session") + } + connection = &awsConnection{ + sess: sess, + logSvc: cloudwatchlogs.New(sess), + metricSvc: cloudwatch.New(sess), + } + awsConnectionCache[awsRegion] = connection + } + return connection, nil +} + +func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroups []string) []LogEntry { + + // Create a channel for collecting log event outputs + ch := make(chan []*cloudwatchlogs.FilteredLogEvent) + + // Run FilterLogEvents for each log group in parallel + for _, logGroup := range logGroups { + go func(logGroup string) { + resp, err := p.logSvc.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{ + LogGroupName: aws.String(logGroup), + }) + if err != nil { + glog.V(5).Infof("[getLogs] Error getting logs: %v %v\n", logGroup, err) + } + ch <- resp.Events + }(logGroup) + } + + // Collect responses on the channel and append logs into combined log array + var logs []LogEntry + for i := 0; i < len(logGroups); i++ { + logEvents := <-ch + for _, event := range logEvents { + logs = append(logs, LogEntry{ + ID: names[i], + Message: aws.StringValue(event.Message), + Timestamp: aws.Int64Value(event.Timestamp), + }) + } + } + + return logs +} From af34e5cb8338aafc1ff5f578a64fb8b4a98a20d6 Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Mon, 20 Nov 2017 14:55:09 -0800 Subject: [PATCH 07/18] Add container logging support Also fix bug in de-duping. --- cmd/logs.go | 3 +- pkg/operations/operations_aws.go | 6 +++ pkg/operations/operations_cloud_aws.go | 51 +++++++++++++++++++------- pkg/operations/resources.go | 19 ++++++++-- 4 files changed, 60 insertions(+), 19 deletions(-) diff --git a/cmd/logs.go b/cmd/logs.go index 44bf60bbc..2c81813d9 100644 --- a/cmd/logs.go +++ b/cmd/logs.go @@ -37,9 +37,8 @@ func newLogsCmd() *cobra.Command { for _, logEntry := range logs { eventTime := time.Unix(0, logEntry.Timestamp*1000000) if eventTime.After(sinceTime) { - fmt.Printf("[%v] %v\n", eventTime, logEntry.Message) + fmt.Printf("%29v[%25v] %v\n", eventTime.Format(time.RFC3339Nano), logEntry.ID, logEntry.Message) } - if eventTime.After(highestTimeSeen) { highestTimeSeen = eventTime } diff --git a/pkg/operations/operations_aws.go b/pkg/operations/operations_aws.go index 5630a9329..ff6627485 100644 --- a/pkg/operations/operations_aws.go +++ b/pkg/operations/operations_aws.go @@ -51,6 +51,7 @@ const ( // AWS resource types awsFunctionType = tokens.Type("aws:lambda/function:Function") + awsLogGroupType = tokens.Type("aws:cloudwatch/logGroup:LogGroup") ) func (ops *awsOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { @@ -63,6 +64,11 @@ func (ops *awsOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { logResult := ops.awsConnection.getLogsForLogGroupsConcurrently([]string{functionName}, []string{"/aws/lambda/" + functionName}) sort.SliceStable(logResult, func(i, j int) bool { return logResult[i].Timestamp < logResult[j].Timestamp }) return &logResult, nil + case awsLogGroupType: + name := ops.component.state.Outputs["name"].StringValue() + logResult := ops.awsConnection.getLogsForLogGroupsConcurrently([]string{name}, []string{name}) + sort.SliceStable(logResult, func(i, j int) bool { return logResult[i].Timestamp < logResult[j].Timestamp }) + return &logResult, nil default: // Else this resource kind does not produce any logs. return nil, nil diff --git a/pkg/operations/operations_cloud_aws.go b/pkg/operations/operations_cloud_aws.go index 30f5c0e55..e86dc6b17 100644 --- a/pkg/operations/operations_cloud_aws.go +++ b/pkg/operations/operations_cloud_aws.go @@ -29,11 +29,14 @@ var _ Provider = (*cloudOpsProvider)(nil) const ( // Pulumi Framework component types - pulumiFunctionType = tokens.Type("cloud:function:Function") - logCollectorType = tokens.Type("cloud:logCollector:LogCollector") + cloudFunctionType = tokens.Type("cloud:function:Function") + cloudLogCollectorType = tokens.Type("cloud:logCollector:LogCollector") + cloudServiceType = tokens.Type("cloud:service:Service") + cloudTaskType = tokens.Type("cloud:task:Task") // AWS resource types - serverlessFunctionType = "aws:serverless:Function" + awsServerlessFunctionTypeName = "aws:serverless:Function" + awsLogGroupTypeName = "aws:cloudwatch/logGroup:LogGroup" ) func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { @@ -41,9 +44,11 @@ func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { contract.Failf("not yet implemented - StartTime, Endtime, Query") } switch ops.component.state.Type { - case pulumiFunctionType: - urn := ops.component.state.URN - serverlessFunction := ops.component.GetChild(serverlessFunctionType, string(urn.Name())) + case cloudFunctionType: + // We get the aws:serverless:Function child and request it's logs, parsing out the user-visible content from + // those logs to project into our own log output, but leaving out explicit Lambda metadata. + name := string(ops.component.state.URN.Name()) + serverlessFunction := ops.component.GetChild(awsServerlessFunctionTypeName, name) rawLogs, err := serverlessFunction.OperationsProvider(ops.config).GetLogs(query) if err != nil { return nil, err @@ -51,15 +56,23 @@ func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { contract.Assertf(rawLogs != nil, "expect aws:serverless:Function to provide logs") var logs []LogEntry for _, rawLog := range *rawLogs { - extractedLog := extractLambdaLogMessage(rawLog.Message) + extractedLog := extractLambdaLogMessage(rawLog.Message, name) if extractedLog != nil { logs = append(logs, *extractedLog) } } return &logs, nil - case logCollectorType: - urn := ops.component.state.URN - serverlessFunction := ops.component.GetChild(serverlessFunctionType, string(urn.Name())) + case cloudLogCollectorType: + // A LogCollector has an aws:serverless:Function which is wired up to receive logs from all other compute in the + // program. These logs are batched and then console.log'd into the log collector lambdas own logs, so we must + // get those logs and then decode through two layers of Lambda logging to extract the original messages. These + // logs are delayed somewhat more than raw lambda logs, but can survive even after the source lambda is deleted. + // In addition, we set the Lambda logs to automatically delete after 24 hours, which is safe because we have + // centrally archived into the log collector. As a result, we will combine reading these logs with reading the + // live Lambda logs from individual functions, de-duplicating the results, to piece together the full set of + // logs. + name := string(ops.component.state.URN.Name()) + serverlessFunction := ops.component.GetChild(awsServerlessFunctionTypeName, name) rawLogs, err := serverlessFunction.OperationsProvider(ops.config).GetLogs(query) if err != nil { return nil, err @@ -69,20 +82,30 @@ func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { var logs []LogEntry for _, rawLog := range *rawLogs { var logMessage encodedLogMessage - extractedLog := extractLambdaLogMessage(rawLog.Message) + extractedLog := extractLambdaLogMessage(rawLog.Message, name) if extractedLog != nil { err := json.Unmarshal([]byte(extractedLog.Message), &logMessage) if err != nil { return nil, err } for _, logEvent := range logMessage.LogEvents { - if extracted := extractLambdaLogMessage(logEvent.Message); extracted != nil { + if extracted := extractLambdaLogMessage(logEvent.Message, name); extracted != nil { logs = append(logs, *extracted) } } } } return &logs, nil + case cloudServiceType, cloudTaskType: + // Both Services and Tasks track a log group, which we can directly query for logs. These logs are only populated by user + // code withing containers, so we can safely project these logs back unmodified. + urn := ops.component.state.URN + logGroup := ops.component.GetChild(awsLogGroupTypeName, string(urn.Name()+"-task-logs")) + logs, err := logGroup.OperationsProvider(ops.config).GetLogs(query) + if err != nil { + return nil, err + } + return logs, nil default: // Else this resource kind does not produce any logs. return nil, nil @@ -123,14 +146,14 @@ var logRegexp = regexp.MustCompile("(.*Z)\t[a-g0-9\\-]*\t(.*)") // END RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 // REPORT RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Duration: 222.92 ms Billed Duration: 300 ms Memory Size: 128 MB Max Memory Used: 33 MB // ``` -func extractLambdaLogMessage(message string) *LogEntry { +func extractLambdaLogMessage(message string, id string) *LogEntry { innerMatches := logRegexp.FindAllStringSubmatch(message, -1) if len(innerMatches) > 0 { contract.Assertf(len(innerMatches[0]) >= 3, "expected log regexp to always produce at least two capture groups") timestamp, err := time.Parse(time.RFC3339Nano, innerMatches[0][1]) contract.Assertf(err == nil, "expected to be able to parse timestamp") return &LogEntry{ - ID: "hmm", + ID: id, Message: innerMatches[0][2], Timestamp: timestamp.UnixNano() / 1000000, // milliseconds } diff --git a/pkg/operations/resources.go b/pkg/operations/resources.go index a8b8583c6..cc519c992 100644 --- a/pkg/operations/resources.go +++ b/pkg/operations/resources.go @@ -122,12 +122,25 @@ func (ops *resourceOperations) GetLogs(query LogQuery) (*[]LogEntry, error) { sort.SliceStable(logs, func(i, j int) bool { return logs[i].Timestamp < logs[j].Timestamp }) // Remove duplicates var retLogs []LogEntry - var lastLog LogEntry + var lastLogTimestamp int64 + var lastLogs []LogEntry for _, log := range logs { - if log.Message == lastLog.Message && log.Timestamp == lastLog.Timestamp { + shouldContinue := false + if log.Timestamp == lastLogTimestamp { + for _, lastLog := range lastLogs { + if log.Message == lastLog.Message { + shouldContinue = true + break + } + } + } else { + lastLogs = nil + } + if shouldContinue { continue } - lastLog = log + lastLogs = append(lastLogs, log) + lastLogTimestamp = log.Timestamp retLogs = append(retLogs, log) } return &retLogs, nil From c6aac7df89722cefc5ae03f5c80d7e0830624843 Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Mon, 20 Nov 2017 16:37:41 -0800 Subject: [PATCH 08/18] Support `--since` on `pulumi logs` Adds a `since` flag which gets only longs since some relative offset in the past. --- cmd/logs.go | 53 +++++++++++++++++++++++++- pkg/operations/operations_aws.go | 32 +++++++++++++--- pkg/operations/operations_cloud_aws.go | 3 -- 3 files changed, 79 insertions(+), 9 deletions(-) diff --git a/cmd/logs.go b/cmd/logs.go index 2c81813d9..29f5db3ff 100644 --- a/cmd/logs.go +++ b/cmd/logs.go @@ -4,6 +4,8 @@ package cmd import ( "fmt" + "regexp" + "strconv" "time" "github.com/spf13/cobra" @@ -15,6 +17,7 @@ import ( func newLogsCmd() *cobra.Command { var stack string var follow bool + var since string logsCmd := &cobra.Command{ Use: "logs", @@ -25,11 +28,15 @@ func newLogsCmd() *cobra.Command { return err } + startTime := parseRelativeDuration(since) + sinceTime := time.Unix(0, 0) highestTimeSeen := time.Unix(0, 0) for { - logs, err := backend.GetLogs(stackName, operations.LogQuery{}) + logs, err := backend.GetLogs(stackName, operations.LogQuery{ + StartTime: startTime, + }) if err != nil { return err } @@ -60,6 +67,50 @@ func newLogsCmd() *cobra.Command { logsCmd.PersistentFlags().BoolVarP( &follow, "follow", "f", false, "Follow the log stream in real time (like tail -f)") + logsCmd.PersistentFlags().StringVar( + &since, "since", "", + "Only return logs newer than a relative duration ('5s', '2m', '3h'). Defaults to returning all logs.") return logsCmd } + +var durationRegexp = regexp.MustCompile(`(\d+)([y|w|d|h|m|s])`) + +// parseRelativeDuration extracts a time.Time previous to now by the a relative duration in the format '5s', '2m', '3h'. +func parseRelativeDuration(duration string) *time.Time { + now := time.Now() + if duration == "" { + return nil + } + parts := durationRegexp.FindStringSubmatch(duration) + if parts == nil { + fmt.Printf("Err: %v\n", duration) + return nil + } + num, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil { + fmt.Printf("Err: %v\n", err) + return nil + } + d := time.Duration(-num) + switch parts[2] { + case "y": + d *= time.Hour * 24 * 365 + case "w": + d *= time.Hour * 24 * 7 + case "d": + d *= time.Hour * 24 + case "h": + d *= time.Hour + case "m": + d *= time.Minute + case "s": + d *= time.Second + default: + return nil + } + // fmt.Printf("Duration: %v\n", d) + ret := now.Add(d) + fmt.Printf("Since: %v\n", ret.Format(time.RFC3339Nano)) + return &ret +} diff --git a/pkg/operations/operations_aws.go b/pkg/operations/operations_aws.go index ff6627485..7dd2b3cbf 100644 --- a/pkg/operations/operations_aws.go +++ b/pkg/operations/operations_aws.go @@ -3,6 +3,7 @@ package operations import ( "fmt" "sort" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" @@ -55,18 +56,28 @@ const ( ) func (ops *awsOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { - if query.StartTime != nil || query.EndTime != nil || query.Query != nil { - contract.Failf("not yet implemented - StartTime, Endtime, Query") + if query.Query != nil { + contract.Failf("not yet implemented - Query") } switch ops.component.state.Type { case awsFunctionType: functionName := ops.component.state.Outputs["name"].StringValue() - logResult := ops.awsConnection.getLogsForLogGroupsConcurrently([]string{functionName}, []string{"/aws/lambda/" + functionName}) + logResult := ops.awsConnection.getLogsForLogGroupsConcurrently( + []string{functionName}, + []string{"/aws/lambda/" + functionName}, + query.StartTime, + query.EndTime, + ) sort.SliceStable(logResult, func(i, j int) bool { return logResult[i].Timestamp < logResult[j].Timestamp }) return &logResult, nil case awsLogGroupType: name := ops.component.state.Outputs["name"].StringValue() - logResult := ops.awsConnection.getLogsForLogGroupsConcurrently([]string{name}, []string{name}) + logResult := ops.awsConnection.getLogsForLogGroupsConcurrently( + []string{name}, + []string{name}, + query.StartTime, + query.EndTime, + ) sort.SliceStable(logResult, func(i, j int) bool { return logResult[i].Timestamp < logResult[j].Timestamp }) return &logResult, nil default: @@ -110,16 +121,27 @@ func getAWSConnection(awsRegion string) (*awsConnection, error) { return connection, nil } -func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroups []string) []LogEntry { +func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroups []string, startTime *time.Time, endTime *time.Time) []LogEntry { // Create a channel for collecting log event outputs ch := make(chan []*cloudwatchlogs.FilteredLogEvent) + var startMilli *int64 + if startTime != nil { + startMilli = aws.Int64(aws.TimeUnixMilli(*startTime)) + } + var endMilli *int64 + if endTime != nil { + endMilli = aws.Int64(aws.TimeUnixMilli(*endTime)) + } + // Run FilterLogEvents for each log group in parallel for _, logGroup := range logGroups { go func(logGroup string) { resp, err := p.logSvc.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{ LogGroupName: aws.String(logGroup), + StartTime: startMilli, + EndTime: endMilli, }) if err != nil { glog.V(5).Infof("[getLogs] Error getting logs: %v %v\n", logGroup, err) diff --git a/pkg/operations/operations_cloud_aws.go b/pkg/operations/operations_cloud_aws.go index e86dc6b17..5dac96551 100644 --- a/pkg/operations/operations_cloud_aws.go +++ b/pkg/operations/operations_cloud_aws.go @@ -40,9 +40,6 @@ const ( ) func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { - if query.StartTime != nil || query.EndTime != nil || query.Query != nil { - contract.Failf("not yet implemented - StartTime, Endtime, Query") - } switch ops.component.state.Type { case cloudFunctionType: // We get the aws:serverless:Function child and request it's logs, parsing out the user-visible content from From bee1b55b0bd0b2ca2ba5f2b25c96f4b924434183 Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Mon, 20 Nov 2017 23:18:47 -0800 Subject: [PATCH 09/18] Get all pages of results --- cmd/logs.go | 9 ++++----- pkg/operations/operations_aws.go | 28 ++++++++++++++++++++-------- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/cmd/logs.go b/cmd/logs.go index 29f5db3ff..916f20c90 100644 --- a/cmd/logs.go +++ b/cmd/logs.go @@ -44,7 +44,7 @@ func newLogsCmd() *cobra.Command { for _, logEntry := range logs { eventTime := time.Unix(0, logEntry.Timestamp*1000000) if eventTime.After(sinceTime) { - fmt.Printf("%29v[%25v] %v\n", eventTime.Format(time.RFC3339Nano), logEntry.ID, logEntry.Message) + fmt.Printf("%30.30s[%30.30s] %v\n", eventTime.Format(time.RFC3339Nano), logEntry.ID, logEntry.Message) } if eventTime.After(highestTimeSeen) { highestTimeSeen = eventTime @@ -84,12 +84,12 @@ func parseRelativeDuration(duration string) *time.Time { } parts := durationRegexp.FindStringSubmatch(duration) if parts == nil { - fmt.Printf("Err: %v\n", duration) + fmt.Printf("Warning: duration could not be parsed: '%v'\n", duration) return nil } num, err := strconv.ParseInt(parts[1], 10, 64) if err != nil { - fmt.Printf("Err: %v\n", err) + fmt.Printf("Warning: duration could not be parsed: '%v'\n", duration) return nil } d := time.Duration(-num) @@ -107,10 +107,9 @@ func parseRelativeDuration(duration string) *time.Time { case "s": d *= time.Second default: + fmt.Printf("Warning: duration could not be parsed: '%v'\n", duration) return nil } - // fmt.Printf("Duration: %v\n", d) ret := now.Add(d) - fmt.Printf("Since: %v\n", ret.Format(time.RFC3339Nano)) return &ret } diff --git a/pkg/operations/operations_aws.go b/pkg/operations/operations_aws.go index 7dd2b3cbf..1259beb78 100644 --- a/pkg/operations/operations_aws.go +++ b/pkg/operations/operations_aws.go @@ -138,15 +138,27 @@ func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroup // Run FilterLogEvents for each log group in parallel for _, logGroup := range logGroups { go func(logGroup string) { - resp, err := p.logSvc.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{ - LogGroupName: aws.String(logGroup), - StartTime: startMilli, - EndTime: endMilli, - }) - if err != nil { - glog.V(5).Infof("[getLogs] Error getting logs: %v %v\n", logGroup, err) + var nextToken *string + var ret []*cloudwatchlogs.FilteredLogEvent + for { + resp, err := p.logSvc.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{ + LogGroupName: aws.String(logGroup), + StartTime: startMilli, + EndTime: endMilli, + NextToken: nextToken, + }) + if err != nil { + glog.V(5).Infof("[getLogs] Error getting logs: %v %v\n", logGroup, err) + } + ret = append(ret, resp.Events...) + nextToken = resp.NextToken + if resp.NextToken == nil { + break + } else { + fmt.Printf("Getting more logs for %v...\n", logGroup) + } } - ch <- resp.Events + ch <- ret }(logGroup) } From 512044b407058ac33a6794fa951565be35164a17 Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Mon, 20 Nov 2017 23:23:11 -0800 Subject: [PATCH 10/18] Simplify paging FilterLogEvents --- pkg/operations/operations_aws.go | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/pkg/operations/operations_aws.go b/pkg/operations/operations_aws.go index 1259beb78..884f22856 100644 --- a/pkg/operations/operations_aws.go +++ b/pkg/operations/operations_aws.go @@ -138,25 +138,20 @@ func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroup // Run FilterLogEvents for each log group in parallel for _, logGroup := range logGroups { go func(logGroup string) { - var nextToken *string var ret []*cloudwatchlogs.FilteredLogEvent - for { - resp, err := p.logSvc.FilterLogEvents(&cloudwatchlogs.FilterLogEventsInput{ - LogGroupName: aws.String(logGroup), - StartTime: startMilli, - EndTime: endMilli, - NextToken: nextToken, - }) - if err != nil { - glog.V(5).Infof("[getLogs] Error getting logs: %v %v\n", logGroup, err) - } + err := p.logSvc.FilterLogEventsPages(&cloudwatchlogs.FilterLogEventsInput{ + LogGroupName: aws.String(logGroup), + StartTime: startMilli, + EndTime: endMilli, + }, func(resp *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) bool { ret = append(ret, resp.Events...) - nextToken = resp.NextToken - if resp.NextToken == nil { - break - } else { + if !lastPage { fmt.Printf("Getting more logs for %v...\n", logGroup) } + return true + }) + if err != nil { + glog.V(5).Infof("[getLogs] Error getting logs: %v %v\n", logGroup, err) } ch <- ret }(logGroup) From 098c90419a507fe37391b265af6b8773f1581705 Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Wed, 22 Nov 2017 12:47:40 -0800 Subject: [PATCH 11/18] Use correct source ID for logs from LogCollector We have to reverse engineer the name from the soruce LogGroup information since that is all we got at runtime, but luckily that is sufficient given current name generation approach. This kind of code is *very* sensitive to any changes to automatic name generation - but that is likely inevitable at this layer. --- pkg/operations/operations_cloud_aws.go | 19 ++++++++++++++++--- pkg/operations/operations_cloud_aws_test.go | 14 ++++++++++---- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/pkg/operations/operations_cloud_aws.go b/pkg/operations/operations_cloud_aws.go index 5dac96551..8986978eb 100644 --- a/pkg/operations/operations_cloud_aws.go +++ b/pkg/operations/operations_cloud_aws.go @@ -78,15 +78,23 @@ func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { // Extract out the encoded and batched logs var logs []LogEntry for _, rawLog := range *rawLogs { - var logMessage encodedLogMessage extractedLog := extractLambdaLogMessage(rawLog.Message, name) if extractedLog != nil { + // Decode the JSON blog of data from within the log entries, which will itself be a nested log entry. + var logMessage encodedLogMessage err := json.Unmarshal([]byte(extractedLog.Message), &logMessage) if err != nil { return nil, err } + // Reverse engineer the name of the function that was the source of this message from the LogGroup name. + logName := logMessage.LogGroup + match := functionNameFromLogGroupNameRegExp.FindStringSubmatch(logMessage.LogGroup) + if len(match) == 2 { + logName = match[1] + } + // Extract out each individual log event and add them to our array of logs. for _, logEvent := range logMessage.LogEvents { - if extracted := extractLambdaLogMessage(logEvent.Message, name); extracted != nil { + if extracted := extractLambdaLogMessage(logEvent.Message, logName); extracted != nil { logs = append(logs, *extracted) } } @@ -132,7 +140,12 @@ type encodedLogMessage struct { LogEvents []encodedLogEvent `json:"logEvents"` } -var logRegexp = regexp.MustCompile("(.*Z)\t[a-g0-9\\-]*\t(.*)") +var ( + // Extract function name from LogGroup name + functionNameFromLogGroupNameRegExp = regexp.MustCompile(`^/aws/lambda/(.*)\-[0-9A-Fa-f]+$`) + // Extract Lambda log parts from Lambda log format + logRegexp = regexp.MustCompile("(.*Z)\t[a-g0-9\\-]*\t(.*)") +) // extractLambdaLogMessage extracts out only the log messages associated with user logs, skipping Lambda-specific metadata. // In particular, only the second line below is extracter, and it is extracted with the recorded timestamp. diff --git a/pkg/operations/operations_cloud_aws_test.go b/pkg/operations/operations_cloud_aws_test.go index 9e64c093e..6b1d7bfbb 100644 --- a/pkg/operations/operations_cloud_aws_test.go +++ b/pkg/operations/operations_cloud_aws_test.go @@ -7,13 +7,19 @@ import ( ) func Test_extractLambdaLogMessage(t *testing.T) { - res := extractLambdaLogMessage("START RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Version: $LATEST") + res := extractLambdaLogMessage("START RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Version: $LATEST", "foo") assert.Nil(t, res) - res = extractLambdaLogMessage("2017-11-17T20:30:27.736Z 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 GET /todo") + res = extractLambdaLogMessage("2017-11-17T20:30:27.736Z 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 GET /todo", "foo") assert.NotNil(t, res) assert.Equal(t, "GET /todo", res.Message) - res = extractLambdaLogMessage("END RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723") + res = extractLambdaLogMessage("END RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723", "foo") assert.Nil(t, res) - res = extractLambdaLogMessage("REPORT RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Duration: 222.92 ms Billed Duration: 300 ms Memory Size: 128 MB Max Memory Used: 33 MB") + res = extractLambdaLogMessage("REPORT RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Duration: 222.92 ms Billed Duration: 300 ms Memory Size: 128 MB Max Memory Used: 33 MB", "foo") assert.Nil(t, res) } + +func Test_functionNameFromLogGroupNameRegExp(t *testing.T) { + match := functionNameFromLogGroupNameRegExp.FindStringSubmatch("/aws/lambda/examples-todoc57917fa-023a27bc") + assert.Len(t, match, 2) + assert.Equal(t, "examples-todoc57917fa", match[1]) +} From e8a8bfb2c5ba602ad3c22b10e5fc5ecee2c4e065 Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Wed, 22 Nov 2017 13:08:19 -0800 Subject: [PATCH 12/18] Fix lint errors --- pkg/operations/operations_aws.go | 6 +++++- pkg/operations/operations_cloud_aws.go | 4 ++-- pkg/operations/operations_cloud_aws_test.go | 2 -- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/operations/operations_aws.go b/pkg/operations/operations_aws.go index 884f22856..829d35acf 100644 --- a/pkg/operations/operations_aws.go +++ b/pkg/operations/operations_aws.go @@ -121,7 +121,11 @@ func getAWSConnection(awsRegion string) (*awsConnection, error) { return connection, nil } -func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroups []string, startTime *time.Time, endTime *time.Time) []LogEntry { +func (p *awsConnection) getLogsForLogGroupsConcurrently( + names []string, + logGroups []string, + startTime *time.Time, + endTime *time.Time) []LogEntry { // Create a channel for collecting log event outputs ch := make(chan []*cloudwatchlogs.FilteredLogEvent) diff --git a/pkg/operations/operations_cloud_aws.go b/pkg/operations/operations_cloud_aws.go index 8986978eb..244eea10a 100644 --- a/pkg/operations/operations_cloud_aws.go +++ b/pkg/operations/operations_cloud_aws.go @@ -103,7 +103,7 @@ func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { return &logs, nil case cloudServiceType, cloudTaskType: // Both Services and Tasks track a log group, which we can directly query for logs. These logs are only populated by user - // code withing containers, so we can safely project these logs back unmodified. + // code within containers, so we can safely project these logs back unmodified. urn := ops.component.state.URN logGroup := ops.component.GetChild(awsLogGroupTypeName, string(urn.Name()+"-task-logs")) logs, err := logGroup.OperationsProvider(ops.config).GetLogs(query) @@ -154,7 +154,7 @@ var ( // START RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Version: $LATEST // 2017-11-17T20:30:27.736Z 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 GET /todo // END RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 -// REPORT RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Duration: 222.92 ms Billed Duration: 300 ms Memory Size: 128 MB Max Memory Used: 33 MB +// REPORT RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Duration: 222.92 ms Billed Duration: 300 ms // ``` func extractLambdaLogMessage(message string, id string) *LogEntry { innerMatches := logRegexp.FindAllStringSubmatch(message, -1) diff --git a/pkg/operations/operations_cloud_aws_test.go b/pkg/operations/operations_cloud_aws_test.go index 6b1d7bfbb..94d4a0730 100644 --- a/pkg/operations/operations_cloud_aws_test.go +++ b/pkg/operations/operations_cloud_aws_test.go @@ -14,8 +14,6 @@ func Test_extractLambdaLogMessage(t *testing.T) { assert.Equal(t, "GET /todo", res.Message) res = extractLambdaLogMessage("END RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723", "foo") assert.Nil(t, res) - res = extractLambdaLogMessage("REPORT RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Duration: 222.92 ms Billed Duration: 300 ms Memory Size: 128 MB Max Memory Used: 33 MB", "foo") - assert.Nil(t, res) } func Test_functionNameFromLogGroupNameRegExp(t *testing.T) { From 8fd11f26b80ff33ae04b2c28e1ceb039e5c93554 Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Wed, 22 Nov 2017 17:18:32 -0800 Subject: [PATCH 13/18] Ensure we don't skip logs in `--follow` mode --- cmd/logs.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/cmd/logs.go b/cmd/logs.go index 916f20c90..fd5e2cd26 100644 --- a/cmd/logs.go +++ b/cmd/logs.go @@ -30,8 +30,13 @@ func newLogsCmd() *cobra.Command { startTime := parseRelativeDuration(since) - sinceTime := time.Unix(0, 0) - highestTimeSeen := time.Unix(0, 0) + // IDEA: This map will grow forever as new log entries are found. We may need to do a more approximate + // approach here to ensure we don't grow memory unboundedly while following logs. + // + // Note: Just tracking latest log date is not sufficient - as stale logs may show up which should have been + // displayed before previously rendered log entries, but weren't available at the time, so still need to be + // rendered now even though they are technically out of order. + shown := map[operations.LogEntry]bool{} for { logs, err := backend.GetLogs(stackName, operations.LogQuery{ @@ -42,12 +47,10 @@ func newLogsCmd() *cobra.Command { } for _, logEntry := range logs { - eventTime := time.Unix(0, logEntry.Timestamp*1000000) - if eventTime.After(sinceTime) { + if _, shownAlready := shown[logEntry]; !shownAlready { + eventTime := time.Unix(0, logEntry.Timestamp*1000000) fmt.Printf("%30.30s[%30.30s] %v\n", eventTime.Format(time.RFC3339Nano), logEntry.ID, logEntry.Message) - } - if eventTime.After(highestTimeSeen) { - highestTimeSeen = eventTime + shown[logEntry] = true } } @@ -55,7 +58,6 @@ func newLogsCmd() *cobra.Command { return nil } - sinceTime = highestTimeSeen time.Sleep(time.Second) } }), From 9648444b05b3201a04b3acf50fbddeef8164f894 Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Wed, 22 Nov 2017 20:58:46 -0800 Subject: [PATCH 14/18] Support for filtering logs by resource --- cmd/logs.go | 10 +++++++ pkg/operations/operations.go | 23 +++++++++++++-- pkg/operations/resources.go | 56 +++++++++++++++++++++++++++--------- 3 files changed, 74 insertions(+), 15 deletions(-) diff --git a/cmd/logs.go b/cmd/logs.go index fd5e2cd26..22f6ab585 100644 --- a/cmd/logs.go +++ b/cmd/logs.go @@ -18,6 +18,7 @@ func newLogsCmd() *cobra.Command { var stack string var follow bool var since string + var resource string logsCmd := &cobra.Command{ Use: "logs", @@ -29,6 +30,11 @@ func newLogsCmd() *cobra.Command { } startTime := parseRelativeDuration(since) + var resourceFilter *operations.ResourceFilter + if resource != "" { + var rf = operations.ResourceFilter(resource) + resourceFilter = &rf + } // IDEA: This map will grow forever as new log entries are found. We may need to do a more approximate // approach here to ensure we don't grow memory unboundedly while following logs. @@ -41,6 +47,7 @@ func newLogsCmd() *cobra.Command { for { logs, err := backend.GetLogs(stackName, operations.LogQuery{ StartTime: startTime, + Resource: resourceFilter, }) if err != nil { return err @@ -72,6 +79,9 @@ func newLogsCmd() *cobra.Command { logsCmd.PersistentFlags().StringVar( &since, "since", "", "Only return logs newer than a relative duration ('5s', '2m', '3h'). Defaults to returning all logs.") + logsCmd.PersistentFlags().StringVarP( + &resource, "resource", "r", "", + "Only return logs for the requested resource ('name', 'type::name' or full URN). Defaults to returning all logs.") return logsCmd } diff --git a/pkg/operations/operations.go b/pkg/operations/operations.go index c31ae299f..246220791 100644 --- a/pkg/operations/operations.go +++ b/pkg/operations/operations.go @@ -11,12 +11,31 @@ type LogEntry struct { Message string } +// ResourceFilter specifies a specific resource or subset of resources. It can be provided in three formats: +// - Full URN: "::::::" +// - Type + Name: "::" +// - Name: "" +type ResourceFilter string + +// Query is a filter on logs, in the format '=,=>val>' where is one of the following: +// - `URN` +// - `QualifiedName` +// - `Name` +// - `Type` +// - `Message` +type Query string + // LogQuery represents the parameters to a log query operation. // All fields are optional, leaving them off returns all logs. type LogQuery struct { + // StartTime is an optional time indiciating that only logs from after this time should be produced. StartTime *time.Time - EndTime *time.Time - Query *string + // EndTime is an optional time indiciating that only logs from before this time should be produced. + EndTime *time.Time + // Query is a string indicating a filter to apply to the logs - query syntax TBD + Query *string + // Resource is a string indicating that logs should be limited toa resource of resoruces + Resource *ResourceFilter } // MetricName is a handle to a metric supported by a Pulumi Framework resources diff --git a/pkg/operations/resources.go b/pkg/operations/resources.go index cc519c992..09aa1a6e3 100644 --- a/pkg/operations/resources.go +++ b/pkg/operations/resources.go @@ -87,29 +87,34 @@ var _ Provider = (*resourceOperations)(nil) // GetLogs gets logs for a Resource func (ops *resourceOperations) GetLogs(query LogQuery) (*[]LogEntry, error) { - opsProvider, err := ops.getOperationsProvider() - if err != nil { - return nil, err - } - if opsProvider != nil { - // If this resource has an operations provider - use it and don't recur into children. It is the responsibility - // of it's GetLogs implementation to aggregate all logs from children, either by passing them through or by - // filtering specific content out. - logsResult, err := opsProvider.GetLogs(query) + // Only get logs for this resource if it matches the resource filter query + if ops.matchesResourceFilter(query.Resource) { + // Try to get an operations provider for this resource, it may be `nil` + opsProvider, err := ops.getOperationsProvider() if err != nil { - return logsResult, err + return nil, err } - if logsResult != nil { - return logsResult, nil + if opsProvider != nil { + // If this resource has an operations provider - use it and don't recur into children. It is the + // responsibility of it's GetLogs implementation to aggregate all logs from children, either by passing them + // through or by filtering specific content out. + logsResult, err := opsProvider.GetLogs(query) + if err != nil { + return logsResult, err + } + if logsResult != nil { + return logsResult, nil + } } } + // If this resource did not choose to provide it's own logs, recur into children and collect + aggregate their logs. var logs []LogEntry for _, child := range ops.resource.children { childOps := &resourceOperations{ resource: child, config: ops.config, } - // TODO: Parallelize these calls to child GetLogs + // IDEA: Parallelize these calls to child GetLogs childLogs, err := childOps.GetLogs(query) if err != nil { return &logs, err @@ -146,6 +151,31 @@ func (ops *resourceOperations) GetLogs(query LogQuery) (*[]LogEntry, error) { return &retLogs, nil } +// matchesResourceFilter determines whether this resource matches the provided resource filter. +func (ops *resourceOperations) matchesResourceFilter(filter *ResourceFilter) bool { + if filter == nil { + // No filter, all resources match it. + return true + } + if ops.resource == nil || ops.resource.state == nil { + return false + } + urn := ops.resource.state.URN + if resource.URN(*filter) == urn { + // The filter matched the full URN + return true + } + if string(*filter) == string(urn.Type())+"::"+string(urn.Name()) { + // The filter matched the '::' part of the URN + return true + } + if tokens.QName(*filter) == urn.Name() { + // The filter matched the '' part of the URN + return true + } + return false +} + // ListMetrics lists metrics for a Resource func (ops *resourceOperations) ListMetrics() []MetricName { return []MetricName{} From 88452ff8da51ed19818867c06f32c24e1e322642 Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Wed, 22 Nov 2017 21:09:13 -0800 Subject: [PATCH 15/18] Include all children of resources that match resource filter in logging We need to clear the resource filter during the resource tree walk to ensure that logs from children of matched resources are collected and aggregated. --- pkg/operations/operations.go | 8 -------- pkg/operations/resources.go | 8 ++++++++ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/operations/operations.go b/pkg/operations/operations.go index 246220791..8deca15e1 100644 --- a/pkg/operations/operations.go +++ b/pkg/operations/operations.go @@ -17,14 +17,6 @@ type LogEntry struct { // - Name: "" type ResourceFilter string -// Query is a filter on logs, in the format '=,=>val>' where is one of the following: -// - `URN` -// - `QualifiedName` -// - `Name` -// - `Type` -// - `Message` -type Query string - // LogQuery represents the parameters to a log query operation. // All fields are optional, leaving them off returns all logs. type LogQuery struct { diff --git a/pkg/operations/resources.go b/pkg/operations/resources.go index 09aa1a6e3..49e7b81b4 100644 --- a/pkg/operations/resources.go +++ b/pkg/operations/resources.go @@ -89,6 +89,14 @@ var _ Provider = (*resourceOperations)(nil) func (ops *resourceOperations) GetLogs(query LogQuery) (*[]LogEntry, error) { // Only get logs for this resource if it matches the resource filter query if ops.matchesResourceFilter(query.Resource) { + // Set query to be a new query with `resource` nil so that we don't filter out logs from any children of this + // resource since this resource did match the resource filter. + query = LogQuery{ + StartTime: query.StartTime, + EndTime: query.EndTime, + Query: query.Query, + Resource: nil, + } // Try to get an operations provider for this resource, it may be `nil` opsProvider, err := ops.getOperationsProvider() if err != nil { From 75f74165244b1eaf6a5e5037e38061836fac684c Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Wed, 22 Nov 2017 21:33:36 -0800 Subject: [PATCH 16/18] Collect logs in parallel Parallelize collection of logs at each layer of the resource tree walk. Since almost all cost of log collection is at leaf nodes, this should allow the total time for log collection to be close to the time taken for the single longest leaf node, instead of the sum of all leaf nodes. --- cmd/logs.go | 4 ++-- pkg/operations/operations.go | 4 ++-- pkg/operations/operations_aws.go | 6 ++++++ pkg/operations/resources.go | 29 ++++++++++++++++++++--------- 4 files changed, 30 insertions(+), 13 deletions(-) diff --git a/cmd/logs.go b/cmd/logs.go index 22f6ab585..8f3f613c8 100644 --- a/cmd/logs.go +++ b/cmd/logs.go @@ -46,8 +46,8 @@ func newLogsCmd() *cobra.Command { for { logs, err := backend.GetLogs(stackName, operations.LogQuery{ - StartTime: startTime, - Resource: resourceFilter, + StartTime: startTime, + ResourceFilter: resourceFilter, }) if err != nil { return err diff --git a/pkg/operations/operations.go b/pkg/operations/operations.go index 8deca15e1..1ff89347b 100644 --- a/pkg/operations/operations.go +++ b/pkg/operations/operations.go @@ -26,8 +26,8 @@ type LogQuery struct { EndTime *time.Time // Query is a string indicating a filter to apply to the logs - query syntax TBD Query *string - // Resource is a string indicating that logs should be limited toa resource of resoruces - Resource *ResourceFilter + // ResourceFilter is a string indicating that logs should be limited to a resource or resources + ResourceFilter *ResourceFilter } // MetricName is a handle to a metric supported by a Pulumi Framework resources diff --git a/pkg/operations/operations_aws.go b/pkg/operations/operations_aws.go index 829d35acf..4538e6620 100644 --- a/pkg/operations/operations_aws.go +++ b/pkg/operations/operations_aws.go @@ -3,6 +3,7 @@ package operations import ( "fmt" "sort" + "sync" "time" "github.com/aws/aws-sdk-go/aws" @@ -101,9 +102,12 @@ type awsConnection struct { } var awsConnectionCache = map[string]*awsConnection{} +var awsConnectionCacheMutex = sync.RWMutex{} func getAWSConnection(awsRegion string) (*awsConnection, error) { + awsConnectionCacheMutex.RLock() connection, ok := awsConnectionCache[awsRegion] + awsConnectionCacheMutex.RUnlock() if !ok { awsConfig := aws.NewConfig() awsConfig.Region = aws.String(awsRegion) @@ -116,7 +120,9 @@ func getAWSConnection(awsRegion string) (*awsConnection, error) { logSvc: cloudwatchlogs.New(sess), metricSvc: cloudwatch.New(sess), } + awsConnectionCacheMutex.Lock() awsConnectionCache[awsRegion] = connection + awsConnectionCacheMutex.Unlock() } return connection, nil } diff --git a/pkg/operations/resources.go b/pkg/operations/resources.go index 49e7b81b4..bb7b9a1f3 100644 --- a/pkg/operations/resources.go +++ b/pkg/operations/resources.go @@ -88,14 +88,14 @@ var _ Provider = (*resourceOperations)(nil) // GetLogs gets logs for a Resource func (ops *resourceOperations) GetLogs(query LogQuery) (*[]LogEntry, error) { // Only get logs for this resource if it matches the resource filter query - if ops.matchesResourceFilter(query.Resource) { - // Set query to be a new query with `resource` nil so that we don't filter out logs from any children of this - // resource since this resource did match the resource filter. + if ops.matchesResourceFilter(query.ResourceFilter) { + // Set query to be a new query with `ResourceFilter` nil so that we don't filter out logs from any children of + // this resource since this resource did match the resource filter. query = LogQuery{ - StartTime: query.StartTime, - EndTime: query.EndTime, - Query: query.Query, - Resource: nil, + StartTime: query.StartTime, + EndTime: query.EndTime, + Query: query.Query, + ResourceFilter: nil, } // Try to get an operations provider for this resource, it may be `nil` opsProvider, err := ops.getOperationsProvider() @@ -117,13 +117,24 @@ func (ops *resourceOperations) GetLogs(query LogQuery) (*[]LogEntry, error) { } // If this resource did not choose to provide it's own logs, recur into children and collect + aggregate their logs. var logs []LogEntry + // Kick off GetLogs on all children in parallel, writing results to shared channels + ch := make(chan *[]LogEntry) + errch := make(chan error) for _, child := range ops.resource.children { childOps := &resourceOperations{ resource: child, config: ops.config, } - // IDEA: Parallelize these calls to child GetLogs - childLogs, err := childOps.GetLogs(query) + go func() { + childLogs, err := childOps.GetLogs(query) + ch <- childLogs + errch <- err + }() + } + // Handle results from GetLogs calls as they complete + for range ops.resource.children { + childLogs := <-ch + err := <-errch if err != nil { return &logs, err } From 6c694dd9594ae83f46f217562f49a588b1a98175 Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Wed, 22 Nov 2017 21:46:50 -0800 Subject: [PATCH 17/18] Use better names for container logs Use the URN name instead of the log group name for logs reported up by `cloud:service:Service` and `cloud:task:Task` components. --- pkg/operations/operations_cloud_aws.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/operations/operations_cloud_aws.go b/pkg/operations/operations_cloud_aws.go index 244eea10a..dd617277d 100644 --- a/pkg/operations/operations_cloud_aws.go +++ b/pkg/operations/operations_cloud_aws.go @@ -105,12 +105,22 @@ func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) { // Both Services and Tasks track a log group, which we can directly query for logs. These logs are only populated by user // code within containers, so we can safely project these logs back unmodified. urn := ops.component.state.URN - logGroup := ops.component.GetChild(awsLogGroupTypeName, string(urn.Name()+"-task-logs")) - logs, err := logGroup.OperationsProvider(ops.config).GetLogs(query) + name := string(urn.Name()) + logGroup := ops.component.GetChild(awsLogGroupTypeName, name+"-task-logs") + rawLogs, err := logGroup.OperationsProvider(ops.config).GetLogs(query) if err != nil { return nil, err } - return logs, nil + contract.Assertf(rawLogs != nil, "expect aws:cloudwatch/logGroup:LogGroup to provide logs") + var logs []LogEntry + for _, rawLog := range *rawLogs { + logs = append(logs, LogEntry{ + ID: name, + Message: rawLog.Message, + Timestamp: rawLog.Timestamp, + }) + } + return &logs, nil default: // Else this resource kind does not produce any logs. return nil, nil From a79b3ab50fa99b12d06d4e460d88f8cc037c7db9 Mon Sep 17 00:00:00 2001 From: Luke Hoban Date: Sun, 26 Nov 2017 09:57:41 -0800 Subject: [PATCH 18/18] TODO and comment fixup --- cmd/backend_cloud.go | 2 +- cmd/backend_local.go | 1 - pkg/operations/operations_aws.go | 5 ++++- pkg/operations/operations_cloud_aws.go | 3 +++ 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cmd/backend_cloud.go b/cmd/backend_cloud.go index 42b8a05e1..99daa7a67 100644 --- a/cmd/backend_cloud.go +++ b/cmd/backend_cloud.go @@ -228,7 +228,7 @@ func uploadProgram(uploadURL string, printSize bool) error { func (b *pulumiCloudPulumiBackend) GetLogs(stackName tokens.QName, query operations.LogQuery) ([]operations.LogEntry, error) { // TODO[pulumi/pulumi-service#227]: Relax these conditions once the service can take these arguments. if query.StartTime != nil || query.EndTime != nil || query.Query != nil { - return nil, errors.New("not implemented") + return nil, errors.New("cloud backend does not (yet) support filtering logs by start time, end time or message contents") } projID, err := getCloudProjectIdentifier() diff --git a/cmd/backend_local.go b/cmd/backend_local.go index 4534e1697..0251626c6 100644 --- a/cmd/backend_local.go +++ b/cmd/backend_local.go @@ -156,7 +156,6 @@ func (b *localPulumiBackend) GetLogs(stackName tokens.QName, query operations.Lo contract.Assert(snap != nil) contract.Assert(target != nil) - // TODO[pulumi/pulumi#54]: replace this with a call into a generalized operations provider. components := operations.NewResource(snap.Resources) ops := components.OperationsProvider(target.Config) logs, err := ops.GetLogs(query) diff --git a/pkg/operations/operations_aws.go b/pkg/operations/operations_aws.go index 4538e6620..9cd75006a 100644 --- a/pkg/operations/operations_aws.go +++ b/pkg/operations/operations_aws.go @@ -17,8 +17,11 @@ import ( "github.com/pulumi/pulumi/pkg/util/contract" ) +// TODO[pulumi/pulumi#54] This should be factored out behind an OperationsProvider RPC interface and versioned with the +// `pulumi-aws` repo instead of statically linked into the engine. + // AWSOperationsProvider creates an OperationsProvider capable of answering operational queries based on the -// underlying resources of the `@pulumi/cloud-aws` implementation. +// underlying resources of the `@pulumi/aws` implementation. func AWSOperationsProvider( config map[tokens.ModuleMember]string, component *Resource) (Provider, error) { diff --git a/pkg/operations/operations_cloud_aws.go b/pkg/operations/operations_cloud_aws.go index dd617277d..7ca1ca1e5 100644 --- a/pkg/operations/operations_cloud_aws.go +++ b/pkg/operations/operations_cloud_aws.go @@ -10,6 +10,9 @@ import ( "github.com/pulumi/pulumi/pkg/util/contract" ) +// TODO[pulumi/pulumi#54] This should be factored out behind an OperationsProvider RPC interface and versioned with the +// `pulumi-cloud` repo instead of statically linked into the engine. + // CloudOperationsProvider creates an OperationsProvider capable of answering operational queries based on the // underlying resources of the `@pulumi/cloud-aws` implementation. func CloudOperationsProvider(config map[tokens.ModuleMember]string, component *Resource) (Provider, error) {