Incorporate logs from logCollector
Also allow AWS Lambda Function logs to be projected in raw form, but filtered/formatted by higher level layers.
This commit is contained in:
parent
329d70fba2
commit
16ccc67654
|
@ -1,8 +1,6 @@
|
||||||
package operations
|
package operations
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"regexp"
|
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
||||||
|
@ -24,8 +22,6 @@ func newAWSConnection(sess *session.Session) *awsConnection {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var logRegexp = regexp.MustCompile(".*Z\t[a-g0-9\\-]*\t(.*)")
|
|
||||||
|
|
||||||
func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroups []string) []LogEntry {
|
func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroups []string) []LogEntry {
|
||||||
|
|
||||||
// Create a channel for collecting log event outputs
|
// Create a channel for collecting log event outputs
|
||||||
|
@ -49,15 +45,11 @@ func (p *awsConnection) getLogsForLogGroupsConcurrently(names []string, logGroup
|
||||||
for i := 0; i < len(logGroups); i++ {
|
for i := 0; i < len(logGroups); i++ {
|
||||||
logEvents := <-ch
|
logEvents := <-ch
|
||||||
for _, event := range logEvents {
|
for _, event := range logEvents {
|
||||||
innerMatches := logRegexp.FindAllStringSubmatch(aws.StringValue(event.Message), -1)
|
logs = append(logs, LogEntry{
|
||||||
glog.V(5).Infof("[getLogs] Inner matches: %v\n", innerMatches)
|
ID: names[i],
|
||||||
if len(innerMatches) > 0 {
|
Message: aws.StringValue(event.Message),
|
||||||
logs = append(logs, LogEntry{
|
Timestamp: aws.Int64Value(event.Timestamp),
|
||||||
ID: names[i],
|
})
|
||||||
Message: innerMatches[0][1],
|
|
||||||
Timestamp: aws.Int64Value(event.Timestamp),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,14 +7,13 @@ import (
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/pulumi/pulumi/pkg/tokens"
|
"github.com/pulumi/pulumi/pkg/tokens"
|
||||||
"github.com/pulumi/pulumi/pkg/util/contract"
|
"github.com/pulumi/pulumi/pkg/util/contract"
|
||||||
)
|
)
|
||||||
|
|
||||||
// CloudOperationsProvider creates an OperationsProvider capable of answering operational queries based on the
|
// AWSOperationsProvider creates an OperationsProvider capable of answering operational queries based on the
|
||||||
// underlying resources of the `@pulumi/cloud-aws` implementation.
|
// underlying resources of the `@pulumi/cloud-aws` implementation.
|
||||||
func CloudOperationsProvider(
|
func AWSOperationsProvider(
|
||||||
config map[tokens.ModuleMember]string,
|
config map[tokens.ModuleMember]string,
|
||||||
component *Resource) (Provider, error) {
|
component *Resource) (Provider, error) {
|
||||||
|
|
||||||
|
@ -23,7 +22,7 @@ func CloudOperationsProvider(
|
||||||
return nil, errors.Wrap(err, "failed to create AWS session")
|
return nil, errors.Wrap(err, "failed to create AWS session")
|
||||||
}
|
}
|
||||||
|
|
||||||
prov := &cloudOpsProvider{
|
prov := &awsOpsProvider{
|
||||||
awsConnection: newAWSConnection(sess),
|
awsConnection: newAWSConnection(sess),
|
||||||
component: component,
|
component: component,
|
||||||
}
|
}
|
||||||
|
@ -44,31 +43,28 @@ func createSessionFromConfig(config map[tokens.ModuleMember]string) (*session.Se
|
||||||
return session.NewSession(awsConfig)
|
return session.NewSession(awsConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
type cloudOpsProvider struct {
|
type awsOpsProvider struct {
|
||||||
awsConnection *awsConnection
|
awsConnection *awsConnection
|
||||||
component *Resource
|
component *Resource
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Provider = (*cloudOpsProvider)(nil)
|
var _ Provider = (*awsOpsProvider)(nil)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// AWS config keys
|
// AWS config keys
|
||||||
regionKey = "aws:config:region"
|
regionKey = "aws:config:region"
|
||||||
|
|
||||||
// Pulumi Framework component types
|
// AWS resource types
|
||||||
pulumiFunctionType = tokens.Type("cloud:function:Function")
|
awsFunctionType = tokens.Type("aws:lambda/function:Function")
|
||||||
)
|
)
|
||||||
|
|
||||||
func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) {
|
func (ops *awsOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) {
|
||||||
if query.StartTime != nil || query.EndTime != nil || query.Query != nil {
|
if query.StartTime != nil || query.EndTime != nil || query.Query != nil {
|
||||||
contract.Failf("not yet implemented - StartTime, Endtime, Query")
|
contract.Failf("not yet implemented - StartTime, Endtime, Query")
|
||||||
}
|
}
|
||||||
switch ops.component.state.Type {
|
switch ops.component.state.Type {
|
||||||
case pulumiFunctionType:
|
case awsFunctionType:
|
||||||
urn := ops.component.state.URN
|
functionName := ops.component.state.Outputs["name"].StringValue()
|
||||||
serverlessFunction := ops.component.GetChild("aws:serverless:Function", string(urn.Name()))
|
|
||||||
awsFunction := serverlessFunction.GetChild("aws:lambda/function:Function", string(urn.Name()))
|
|
||||||
functionName := awsFunction.state.Outputs["name"].StringValue()
|
|
||||||
logResult := ops.awsConnection.getLogsForLogGroupsConcurrently([]string{functionName}, []string{"/aws/lambda/" + functionName})
|
logResult := ops.awsConnection.getLogsForLogGroupsConcurrently([]string{functionName}, []string{"/aws/lambda/" + functionName})
|
||||||
sort.SliceStable(logResult, func(i, j int) bool { return logResult[i].Timestamp < logResult[j].Timestamp })
|
sort.SliceStable(logResult, func(i, j int) bool { return logResult[i].Timestamp < logResult[j].Timestamp })
|
||||||
return &logResult, nil
|
return &logResult, nil
|
||||||
|
@ -78,10 +74,10 @@ func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ops *cloudOpsProvider) ListMetrics() []MetricName {
|
func (ops *awsOpsProvider) ListMetrics() []MetricName {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ops *cloudOpsProvider) GetMetricStatistics(metric MetricRequest) ([]MetricDataPoint, error) {
|
func (ops *awsOpsProvider) GetMetricStatistics(metric MetricRequest) ([]MetricDataPoint, error) {
|
||||||
return nil, fmt.Errorf("Not yet implmeneted: GetMetricStatistics")
|
return nil, fmt.Errorf("Not yet implmeneted: GetMetricStatistics")
|
||||||
}
|
}
|
139
pkg/operations/operations_cloud_aws.go
Normal file
139
pkg/operations/operations_cloud_aws.go
Normal file
|
@ -0,0 +1,139 @@
|
||||||
|
package operations
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"regexp"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pulumi/pulumi/pkg/tokens"
|
||||||
|
"github.com/pulumi/pulumi/pkg/util/contract"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CloudOperationsProvider creates an OperationsProvider capable of answering operational queries based on the
|
||||||
|
// underlying resources of the `@pulumi/cloud-aws` implementation.
|
||||||
|
func CloudOperationsProvider(config map[tokens.ModuleMember]string, component *Resource) (Provider, error) {
|
||||||
|
prov := &cloudOpsProvider{
|
||||||
|
config: config,
|
||||||
|
component: component,
|
||||||
|
}
|
||||||
|
return prov, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type cloudOpsProvider struct {
|
||||||
|
config map[tokens.ModuleMember]string
|
||||||
|
component *Resource
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Provider = (*cloudOpsProvider)(nil)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Pulumi Framework component types
|
||||||
|
pulumiFunctionType = tokens.Type("cloud:function:Function")
|
||||||
|
logCollectorType = tokens.Type("cloud:logCollector:LogCollector")
|
||||||
|
|
||||||
|
// AWS resource types
|
||||||
|
serverlessFunctionType = "aws:serverless:Function"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) {
|
||||||
|
if query.StartTime != nil || query.EndTime != nil || query.Query != nil {
|
||||||
|
contract.Failf("not yet implemented - StartTime, Endtime, Query")
|
||||||
|
}
|
||||||
|
switch ops.component.state.Type {
|
||||||
|
case pulumiFunctionType:
|
||||||
|
urn := ops.component.state.URN
|
||||||
|
serverlessFunction := ops.component.GetChild(serverlessFunctionType, string(urn.Name()))
|
||||||
|
rawLogs, err := serverlessFunction.OperationsProvider(ops.config).GetLogs(query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
contract.Assertf(rawLogs != nil, "expect aws:serverless:Function to provide logs")
|
||||||
|
var logs []LogEntry
|
||||||
|
for _, rawLog := range *rawLogs {
|
||||||
|
extractedLog := extractLambdaLogMessage(rawLog.Message)
|
||||||
|
if extractedLog != nil {
|
||||||
|
logs = append(logs, *extractedLog)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &logs, nil
|
||||||
|
case logCollectorType:
|
||||||
|
urn := ops.component.state.URN
|
||||||
|
serverlessFunction := ops.component.GetChild(serverlessFunctionType, string(urn.Name()))
|
||||||
|
rawLogs, err := serverlessFunction.OperationsProvider(ops.config).GetLogs(query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
contract.Assertf(rawLogs != nil, "expect aws:serverless:Function to provide logs")
|
||||||
|
// Extract out the encoded and batched logs
|
||||||
|
var logs []LogEntry
|
||||||
|
for _, rawLog := range *rawLogs {
|
||||||
|
var logMessage encodedLogMessage
|
||||||
|
extractedLog := extractLambdaLogMessage(rawLog.Message)
|
||||||
|
if extractedLog != nil {
|
||||||
|
err := json.Unmarshal([]byte(extractedLog.Message), &logMessage)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, logEvent := range logMessage.LogEvents {
|
||||||
|
if extracted := extractLambdaLogMessage(logEvent.Message); extracted != nil {
|
||||||
|
logs = append(logs, *extracted)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &logs, nil
|
||||||
|
default:
|
||||||
|
// Else this resource kind does not produce any logs.
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ops *cloudOpsProvider) ListMetrics() []MetricName {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ops *cloudOpsProvider) GetMetricStatistics(metric MetricRequest) ([]MetricDataPoint, error) {
|
||||||
|
return nil, fmt.Errorf("Not yet implmeneted: GetMetricStatistics")
|
||||||
|
}
|
||||||
|
|
||||||
|
type encodedLogEvent struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Timestamp int64 `json:"timestamp"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type encodedLogMessage struct {
|
||||||
|
MessageType string `json:"messageType"`
|
||||||
|
Owner string `json:"owner"`
|
||||||
|
LogGroup string `json:"logGroup"`
|
||||||
|
LogStream string `json:"logStream"`
|
||||||
|
SubscriptionFilters []string `json:"subscriptionFilters"`
|
||||||
|
LogEvents []encodedLogEvent `json:"logEvents"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var logRegexp = regexp.MustCompile("(.*Z)\t[a-g0-9\\-]*\t(.*)")
|
||||||
|
|
||||||
|
// extractLambdaLogMessage extracts out only the log messages associated with user logs, skipping Lambda-specific metadata.
|
||||||
|
// In particular, only the second line below is extracter, and it is extracted with the recorded timestamp.
|
||||||
|
//
|
||||||
|
// ```
|
||||||
|
// START RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Version: $LATEST
|
||||||
|
// 2017-11-17T20:30:27.736Z 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 GET /todo
|
||||||
|
// END RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723
|
||||||
|
// REPORT RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Duration: 222.92 ms Billed Duration: 300 ms Memory Size: 128 MB Max Memory Used: 33 MB
|
||||||
|
// ```
|
||||||
|
func extractLambdaLogMessage(message string) *LogEntry {
|
||||||
|
innerMatches := logRegexp.FindAllStringSubmatch(message, -1)
|
||||||
|
if len(innerMatches) > 0 {
|
||||||
|
contract.Assertf(len(innerMatches[0]) >= 3, "expected log regexp to always produce at least two capture groups")
|
||||||
|
timestamp, err := time.Parse(time.RFC3339Nano, innerMatches[0][1])
|
||||||
|
contract.Assertf(err == nil, "expected to be able to parse timestamp")
|
||||||
|
return &LogEntry{
|
||||||
|
ID: "hmm",
|
||||||
|
Message: innerMatches[0][2],
|
||||||
|
Timestamp: timestamp.UnixNano() / 1000000, // milliseconds
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
19
pkg/operations/operations_cloud_aws_test.go
Normal file
19
pkg/operations/operations_cloud_aws_test.go
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
package operations
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_extractLambdaLogMessage(t *testing.T) {
|
||||||
|
res := extractLambdaLogMessage("START RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Version: $LATEST")
|
||||||
|
assert.Nil(t, res)
|
||||||
|
res = extractLambdaLogMessage("2017-11-17T20:30:27.736Z 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 GET /todo")
|
||||||
|
assert.NotNil(t, res)
|
||||||
|
assert.Equal(t, "GET /todo", res.Message)
|
||||||
|
res = extractLambdaLogMessage("END RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723")
|
||||||
|
assert.Nil(t, res)
|
||||||
|
res = extractLambdaLogMessage("REPORT RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Duration: 222.92 ms Billed Duration: 300 ms Memory Size: 128 MB Max Memory Used: 33 MB")
|
||||||
|
assert.Nil(t, res)
|
||||||
|
}
|
|
@ -2,6 +2,7 @@ package operations
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
|
||||||
"github.com/pulumi/pulumi/pkg/resource"
|
"github.com/pulumi/pulumi/pkg/resource"
|
||||||
"github.com/pulumi/pulumi/pkg/tokens"
|
"github.com/pulumi/pulumi/pkg/tokens"
|
||||||
|
@ -76,18 +77,6 @@ func (r *Resource) OperationsProvider(config map[tokens.ModuleMember]string) Pro
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getOperationsProvider(resource *Resource, config map[tokens.ModuleMember]string) (Provider, error) {
|
|
||||||
if resource == nil || resource.state == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
switch resource.state.Type.Package() {
|
|
||||||
case "cloud":
|
|
||||||
return CloudOperationsProvider(config, resource)
|
|
||||||
default:
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ResourceOperations is an OperationsProvider for Resources
|
// ResourceOperations is an OperationsProvider for Resources
|
||||||
type resourceOperations struct {
|
type resourceOperations struct {
|
||||||
resource *Resource
|
resource *Resource
|
||||||
|
@ -98,7 +87,7 @@ var _ Provider = (*resourceOperations)(nil)
|
||||||
|
|
||||||
// GetLogs gets logs for a Resource
|
// GetLogs gets logs for a Resource
|
||||||
func (ops *resourceOperations) GetLogs(query LogQuery) (*[]LogEntry, error) {
|
func (ops *resourceOperations) GetLogs(query LogQuery) (*[]LogEntry, error) {
|
||||||
opsProvider, err := getOperationsProvider(ops.resource, ops.config)
|
opsProvider, err := ops.getOperationsProvider()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -120,6 +109,7 @@ func (ops *resourceOperations) GetLogs(query LogQuery) (*[]LogEntry, error) {
|
||||||
resource: child,
|
resource: child,
|
||||||
config: ops.config,
|
config: ops.config,
|
||||||
}
|
}
|
||||||
|
// TODO: Parallelize these calls to child GetLogs
|
||||||
childLogs, err := childOps.GetLogs(query)
|
childLogs, err := childOps.GetLogs(query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &logs, err
|
return &logs, err
|
||||||
|
@ -128,7 +118,19 @@ func (ops *resourceOperations) GetLogs(query LogQuery) (*[]LogEntry, error) {
|
||||||
logs = append(logs, *childLogs...)
|
logs = append(logs, *childLogs...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &logs, nil
|
// Sort
|
||||||
|
sort.SliceStable(logs, func(i, j int) bool { return logs[i].Timestamp < logs[j].Timestamp })
|
||||||
|
// Remove duplicates
|
||||||
|
var retLogs []LogEntry
|
||||||
|
var lastLog LogEntry
|
||||||
|
for _, log := range logs {
|
||||||
|
if log.Message == lastLog.Message && log.Timestamp == lastLog.Timestamp {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
lastLog = log
|
||||||
|
retLogs = append(retLogs, log)
|
||||||
|
}
|
||||||
|
return &retLogs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListMetrics lists metrics for a Resource
|
// ListMetrics lists metrics for a Resource
|
||||||
|
@ -140,3 +142,17 @@ func (ops *resourceOperations) ListMetrics() []MetricName {
|
||||||
func (ops *resourceOperations) GetMetricStatistics(metric MetricRequest) ([]MetricDataPoint, error) {
|
func (ops *resourceOperations) GetMetricStatistics(metric MetricRequest) ([]MetricDataPoint, error) {
|
||||||
return nil, fmt.Errorf("not yet implemented")
|
return nil, fmt.Errorf("not yet implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ops *resourceOperations) getOperationsProvider() (Provider, error) {
|
||||||
|
if ops.resource == nil || ops.resource.state == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
switch ops.resource.state.Type.Package() {
|
||||||
|
case "cloud":
|
||||||
|
return CloudOperationsProvider(ops.config, ops.resource)
|
||||||
|
case "aws":
|
||||||
|
return AWSOperationsProvider(ops.config, ops.resource)
|
||||||
|
default:
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue