pulumi/pkg/operations/operations_aws.go

// Copyright 2016-2018, Pulumi Corporation. All rights reserved.

package operations

import (
    "sort"
    "sync"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
    "github.com/golang/glog"
    "github.com/pkg/errors"

    "github.com/pulumi/pulumi/pkg/resource/config"
    "github.com/pulumi/pulumi/pkg/tokens"
)

// TODO[pulumi/pulumi#54] This should be factored out behind an OperationsProvider RPC interface and versioned with the
// `pulumi-aws` repo instead of statically linked into the engine.

// AWSOperationsProvider creates an OperationsProvider capable of answering operational queries based on the
// underlying resources of the `@pulumi/aws` implementation.
func AWSOperationsProvider(
    config map[config.Key]string,
    component *Resource) (Provider, error) {

    awsRegion, ok := config[regionKey]
    if !ok {
        return nil, errors.New("no AWS region found")
    }

    // If provided, also pass along the access and secret keys so that we have permission to access operational data on
    // resources in the target account.
    //
    // [pulumi/pulumi#608]: We are only approximating the actual logic that the AWS provider (via
    // terraform-provider-aws) uses to turn config into a valid AWS connection. We should find some way to unify these
    // as part of moving this code into a separate process on the other side of an RPC boundary.
    awsAccessKey := config[accessKey]
    awsSecretKey := config[secretKey]
    awsToken := config[token]
    sess, err := getAWSSession(awsRegion, awsAccessKey, awsSecretKey, awsToken)
    if err != nil {
        return nil, err
    }

    connection := &awsConnection{
        logSvc: cloudwatchlogs.New(sess),
    }

    prov := &awsOpsProvider{
        awsConnection: connection,
        component:     component,
    }
    return prov, nil
}

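// A minimal usage sketch (illustrative only; the *Resource argument is assumed
// to come from the checkpoint's resource tree, and the region value is
// hypothetical):
//
//     cfg := map[config.Key]string{regionKey: "us-west-2"}
//     prov, err := AWSOperationsProvider(cfg, component)
//     if err != nil {
//         return err
//     }
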
type awsOpsProvider struct {
    awsConnection *awsConnection
    component     *Resource
}

var _ Provider = (*awsOpsProvider)(nil)

var (
    // AWS config keys
    regionKey = config.MustMakeKey("aws", "region")
    accessKey = config.MustMakeKey("aws", "accessKey")
    secretKey = config.MustMakeKey("aws", "secretKey")
    token     = config.MustMakeKey("aws", "token")
)

const (
    // AWS resource types
    awsFunctionType = tokens.Type("aws:lambda/function:Function")
    awsLogGroupType = tokens.Type("aws:cloudwatch/logGroup:LogGroup")
)

func (ops *awsOpsProvider) GetLogs(query LogQuery) (*[]LogEntry, error) {
    state := ops.component.State
    glog.V(6).Infof("GetLogs[%v]", state.URN)
    switch state.Type {
    case awsFunctionType:
        // Lambda logs land in a log group named after the function: /aws/lambda/<function name>.
        functionName := state.Outputs["name"].StringValue()
        logResult := ops.awsConnection.getLogsForLogGroupsConcurrently(
            []string{functionName},
            []string{"/aws/lambda/" + functionName},
            query.StartTime,
            query.EndTime,
        )
        sort.SliceStable(logResult, func(i, j int) bool { return logResult[i].Timestamp < logResult[j].Timestamp })
        glog.V(5).Infof("GetLogs[%v] returning %d logs", state.URN, len(logResult))
        return &logResult, nil
    case awsLogGroupType:
        // A log group's name is both its display name and the CloudWatch log group to read from.
        name := state.Outputs["name"].StringValue()
        logResult := ops.awsConnection.getLogsForLogGroupsConcurrently(
            []string{name},
            []string{name},
            query.StartTime,
            query.EndTime,
        )
        sort.SliceStable(logResult, func(i, j int) bool { return logResult[i].Timestamp < logResult[j].Timestamp })
        glog.V(5).Infof("GetLogs[%v] returning %d logs", state.URN, len(logResult))
        return &logResult, nil
    default:
        // This resource kind does not produce any logs.
        glog.V(6).Infof("GetLogs[%v] does not produce logs", state.URN)
        return nil, nil
    }
}
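
// Query sketch (hypothetical window): LogQuery's time bounds are optional
// pointers, so leaving EndTime nil asks CloudWatch for everything up to now:
//
//     start := time.Now().Add(-1 * time.Hour)
//     entries, err := prov.GetLogs(LogQuery{StartTime: &start})
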
type awsConnection struct {
    logSvc *cloudwatchlogs.CloudWatchLogs
}

var awsDefaultSession *session.Session
var awsDefaultSessionMutex sync.Mutex

func getAWSSession(awsRegion, awsAccessKey, awsSecretKey, token string) (*session.Session, error) {
    // Per the AWS SDK for Go documentation, "Sessions should be cached when possible".
    // We keep a default session around and then make cheap copies of it.
    awsDefaultSessionMutex.Lock()
    defer awsDefaultSessionMutex.Unlock()

    if awsDefaultSession == nil {
        sess, err := session.NewSession()
        if err != nil {
            return nil, errors.Wrap(err, "failed to create AWS session")
        }
        awsDefaultSession = sess
    }

    extraConfig := aws.NewConfig()
    extraConfig.Region = aws.String(awsRegion)
    if awsAccessKey != "" || awsSecretKey != "" || token != "" {
        extraConfig.Credentials = credentials.NewStaticCredentials(awsAccessKey, awsSecretKey, token)
    }
    return awsDefaultSession.Copy(extraConfig), nil
}
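
// Copy semantics sketch (illustrative): repeated calls share the one cached
// base session and differ only in their per-copy config, so copies are cheap:
//
//     east, _ := getAWSSession("us-east-1", "", "", "")
//     west, _ := getAWSSession("us-west-2", "", "", "")
//     // Both are copies of awsDefaultSession, each with its own Region.
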
func (p *awsConnection) getLogsForLogGroupsConcurrently(
    names []string,
    logGroups []string,
    startTime *time.Time,
    endTime *time.Time) []LogEntry {

    // Tag each log group's events with its index so that entries are attributed to the right entry in
    // `names` regardless of the order in which the goroutines complete.
    type logGroupResult struct {
        index  int
        events []*cloudwatchlogs.FilteredLogEvent
    }

    // Create a channel for collecting log event outputs; it is buffered so that no goroutine blocks on send.
    ch := make(chan logGroupResult, len(logGroups))

    // CloudWatch Logs expects timestamps as milliseconds since the Unix epoch.
    var startMilli *int64
    if startTime != nil {
        startMilli = aws.Int64(aws.TimeUnixMilli(*startTime))
    }
    var endMilli *int64
    if endTime != nil {
        endMilli = aws.Int64(aws.TimeUnixMilli(*endTime))
    }

    // Run FilterLogEvents for each log group in parallel, following pagination to collect all events.
    for i, logGroup := range logGroups {
        go func(i int, logGroup string) {
            var ret []*cloudwatchlogs.FilteredLogEvent
            err := p.logSvc.FilterLogEventsPages(&cloudwatchlogs.FilterLogEventsInput{
                LogGroupName: aws.String(logGroup),
                StartTime:    startMilli,
                EndTime:      endMilli,
            }, func(resp *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) bool {
                ret = append(ret, resp.Events...)
                if !lastPage {
                    glog.V(5).Infof("[getLogs] Getting more logs for %v...\n", logGroup)
                }
                return true
            })
            if err != nil {
                glog.V(5).Infof("[getLogs] Error getting logs: %v %v\n", logGroup, err)
            }
            ch <- logGroupResult{index: i, events: ret}
        }(i, logGroup)
    }

    // Collect responses on the channel and append the logs into one combined array.
    var logs []LogEntry
    for range logGroups {
        result := <-ch
        for _, event := range result.events {
            logs = append(logs, LogEntry{
                ID:        names[result.index],
                Message:   aws.StringValue(event.Message),
                Timestamp: aws.Int64Value(event.Timestamp),
            })
        }
    }
    return logs
}
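
// Usage sketch (hypothetical names and groups): fetch the last hour of logs
// for two Lambda log groups at once. Results are merged but unsorted; callers
// such as GetLogs above sort by timestamp afterwards:
//
//     start := time.Now().Add(-1 * time.Hour)
//     entries := conn.getLogsForLogGroupsConcurrently(
//         []string{"api", "worker"},
//         []string{"/aws/lambda/api", "/aws/lambda/worker"},
//         &start, nil)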