pulumi/pkg/operations/operations_cloud_aws.go
Luke Hoban 098c90419a Use correct source ID for logs from LogCollector
We have to reverse engineer the name from the source LogGroup information since that is all we have at runtime, but luckily that is sufficient given the current name generation approach.

This kind of code is *very* sensitive to any changes to automatic name generation - but that is likely inevitable at this layer.
2017-11-22 12:47:40 -08:00


package operations

import (
	"encoding/json"
	"fmt"
	"regexp"
	"time"

	"github.com/pulumi/pulumi/pkg/tokens"
	"github.com/pulumi/pulumi/pkg/util/contract"
)

// CloudOperationsProvider creates an OperationsProvider capable of answering operational queries based on the
// underlying resources of the `@pulumi/cloud-aws` implementation.
func CloudOperationsProvider(config map[tokens.ModuleMember]string, component *Resource) (Provider, error) {
	prov := &cloudOpsProvider{
		config:    config,
		component: component,
	}
	return prov, nil
}

type cloudOpsProvider struct {
	config    map[tokens.ModuleMember]string
	component *Resource
}

var _ Provider = (*cloudOpsProvider)(nil)

const (
	// Pulumi Framework component types
	cloudFunctionType     = tokens.Type("cloud:function:Function")
	cloudLogCollectorType = tokens.Type("cloud:logCollector:LogCollector")
	cloudServiceType      = tokens.Type("cloud:service:Service")
	cloudTaskType         = tokens.Type("cloud:task:Task")

	// AWS resource types
	awsServerlessFunctionTypeName = "aws:serverless:Function"
	awsLogGroupTypeName           = "aws:cloudwatch/logGroup:LogGroup"
)

func (ops *cloudOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) {
	switch ops.component.state.Type {
	case cloudFunctionType:
		// We get the aws:serverless:Function child and request its logs, parsing out the user-visible content from
		// those logs to project into our own log output, but leaving out explicit Lambda metadata.
		name := string(ops.component.state.URN.Name())
		serverlessFunction := ops.component.GetChild(awsServerlessFunctionTypeName, name)
		rawLogs, err := serverlessFunction.OperationsProvider(ops.config).GetLogs(query)
		if err != nil {
			return nil, err
		}
		contract.Assertf(rawLogs != nil, "expect aws:serverless:Function to provide logs")
		var logs []LogEntry
		for _, rawLog := range *rawLogs {
			extractedLog := extractLambdaLogMessage(rawLog.Message, name)
			if extractedLog != nil {
				logs = append(logs, *extractedLog)
			}
		}
		return &logs, nil
	case cloudLogCollectorType:
		// A LogCollector has an aws:serverless:Function which is wired up to receive logs from all other compute in the
		// program. These logs are batched and then console.log'd into the log collector lambda's own logs, so we must
		// get those logs and then decode through two layers of Lambda logging to extract the original messages. These
		// logs are delayed somewhat more than raw lambda logs, but can survive even after the source lambda is deleted.
		// In addition, we set the Lambda logs to automatically delete after 24 hours, which is safe because they have
		// been archived centrally in the log collector. As a result, we will combine reading these logs with reading the
		// live Lambda logs from individual functions, de-duplicating the results, to piece together the full set of
		// logs.
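		//
		// Illustrative sketch of the two layers of decoding (hypothetical, tab-separated values): a raw log
		// collector line such as
		//   "2017-11-22T01:02:03.456Z	<requestId>	{\"messageType\":\"DATA_MESSAGE\", ... \"logEvents\":[...]}"
		// first has its outer Lambda prefix stripped by extractLambdaLogMessage; the remaining JSON payload is
		// unmarshaled into an encodedLogMessage (defined below), and each of its logEvents messages is itself a
		// Lambda-formatted line that is run through extractLambdaLogMessage again to recover the user message.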
		name := string(ops.component.state.URN.Name())
		serverlessFunction := ops.component.GetChild(awsServerlessFunctionTypeName, name)
		rawLogs, err := serverlessFunction.OperationsProvider(ops.config).GetLogs(query)
		if err != nil {
			return nil, err
		}
		contract.Assertf(rawLogs != nil, "expect aws:serverless:Function to provide logs")
		// Extract out the encoded and batched logs
		var logs []LogEntry
		for _, rawLog := range *rawLogs {
			extractedLog := extractLambdaLogMessage(rawLog.Message, name)
			if extractedLog != nil {
				// Decode the JSON blob of data from within the log entries, which will itself be a nested log entry.
				var logMessage encodedLogMessage
				err := json.Unmarshal([]byte(extractedLog.Message), &logMessage)
				if err != nil {
					return nil, err
				}
				// Reverse engineer the name of the function that was the source of this message from the LogGroup name.
				logName := logMessage.LogGroup
				match := functionNameFromLogGroupNameRegExp.FindStringSubmatch(logMessage.LogGroup)
				if len(match) == 2 {
					logName = match[1]
				}
				// Extract out each individual log event and add them to our array of logs.
				for _, logEvent := range logMessage.LogEvents {
					if extracted := extractLambdaLogMessage(logEvent.Message, logName); extracted != nil {
						logs = append(logs, *extracted)
					}
				}
			}
		}
		return &logs, nil
	case cloudServiceType, cloudTaskType:
		// Both Services and Tasks track a log group, which we can directly query for logs. These logs are only
		// populated by user code within containers, so we can safely project these logs back unmodified.
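		// For example (hypothetical name), a Service whose URN name is "nginx" owns a LogGroup child registered
		// as "nginx-task-logs", which is what the lookup below resolves.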
		urn := ops.component.state.URN
		logGroup := ops.component.GetChild(awsLogGroupTypeName, string(urn.Name()+"-task-logs"))
		logs, err := logGroup.OperationsProvider(ops.config).GetLogs(query)
		if err != nil {
			return nil, err
		}
		return logs, nil
	default:
		// Else this resource kind does not produce any logs.
		return nil, nil
	}
}

func (ops *cloudOpsProvider) ListMetrics() []MetricName {
	return nil
}

func (ops *cloudOpsProvider) GetMetricStatistics(metric MetricRequest) ([]MetricDataPoint, error) {
	return nil, fmt.Errorf("Not yet implemented: GetMetricStatistics")
}

type encodedLogEvent struct {
	ID        string `json:"id"`
	Timestamp int64  `json:"timestamp"`
	Message   string `json:"message"`
}

type encodedLogMessage struct {
	MessageType         string            `json:"messageType"`
	Owner               string            `json:"owner"`
	LogGroup            string            `json:"logGroup"`
	LogStream           string            `json:"logStream"`
	SubscriptionFilters []string          `json:"subscriptionFilters"`
	LogEvents           []encodedLogEvent `json:"logEvents"`
}
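
// Illustrative example (hypothetical values) of a decoded payload as logged by the log collector, matching the
// encodedLogMessage shape above:
//
//   {
//     "messageType": "DATA_MESSAGE",
//     "owner": "123456789012",
//     "logGroup": "/aws/lambda/examplestack-examplefunction-0123ABCD",
//     "logStream": "2017/11/22/[$LATEST]0123456789abcdef",
//     "subscriptionFilters": ["examplestack-examplefilter"],
//     "logEvents": [
//       {
//         "id": "0",
//         "timestamp": 1510950627736,
//         "message": "2017-11-17T20:30:27.736Z\t25e0d1e0-cbd6-11e7-9808-c7085dfe5723\tGET /todo"
//       }
//     ]
//   }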

var (
	// Extract function name from LogGroup name
	functionNameFromLogGroupNameRegExp = regexp.MustCompile(`^/aws/lambda/(.*)\-[0-9A-Fa-f]+$`)
	// Extract Lambda log parts from Lambda log format
	logRegexp = regexp.MustCompile("(.*Z)\t[a-g0-9\\-]*\t(.*)")
)
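
// For example (hypothetical names), functionNameFromLogGroupNameRegExp maps the LogGroup name
//   /aws/lambda/examplestack-examplefunction-0123ABCD
// back to the source function name "examplestack-examplefunction" by stripping the "/aws/lambda/" prefix and the
// trailing hex suffix appended by automatic name generation.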

// extractLambdaLogMessage extracts out only the log messages associated with user logs, skipping Lambda-specific metadata.
// In particular, only the second line below is extracted, and it is extracted with the recorded timestamp.
//
// ```
// START RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Version: $LATEST
// 2017-11-17T20:30:27.736Z 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 GET /todo
// END RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723
// REPORT RequestId: 25e0d1e0-cbd6-11e7-9808-c7085dfe5723 Duration: 222.92 ms Billed Duration: 300 ms Memory Size: 128 MB Max Memory Used: 33 MB
// ```
func extractLambdaLogMessage(message string, id string) *LogEntry {
	innerMatches := logRegexp.FindAllStringSubmatch(message, -1)
	if len(innerMatches) > 0 {
		contract.Assertf(len(innerMatches[0]) >= 3, "expected log regexp to always produce at least two capture groups")
		timestamp, err := time.Parse(time.RFC3339Nano, innerMatches[0][1])
		contract.Assertf(err == nil, "expected to be able to parse timestamp")
		return &LogEntry{
			ID:        id,
			Message:   innerMatches[0][2],
			Timestamp: timestamp.UnixNano() / 1000000, // milliseconds
		}
	}
	return nil
}