Support for AWS DynamoDB Table GlobalSecondaryIndexes
Adds support for global secondary indexes on DynamoDB Tables. Also adds a HashSet API to the AWS provider library. This handles part of #178, providing a standard way for AWS provider implementations to compute set-based diffs. This new API is used in both aws.dynamodb.Table and aws.elasticbeanstalk.Environment currently.
This commit is contained in:
parent
7f8b1e59c1
commit
8bbf48bf87
|
@ -861,7 +861,7 @@ export class Transformer {
|
|||
}
|
||||
|
||||
// Finally, if we got here, it's not a type we support yet; issue an error and return `dynamic`.
|
||||
this.diagnostics.push(this.dctx.newInvalidTypeError(node, simple));
|
||||
this.diagnostics.push(this.dctx.newInvalidTypeWarning(node, simple));
|
||||
return tokens.dynamicType;
|
||||
}
|
||||
|
||||
|
|
|
@ -286,7 +286,7 @@ export class Context {
|
|||
};
|
||||
}
|
||||
|
||||
public newInvalidTypeError(node: ts.Node, ty: ts.Type): Diagnostic {
|
||||
public newInvalidTypeWarning(node: ts.Node, ty: ts.Type): Diagnostic {
|
||||
let name: string;
|
||||
if (ty.symbol) {
|
||||
name = `'${ty.symbol.name}' `;
|
||||
|
@ -295,9 +295,9 @@ export class Context {
|
|||
name = "";
|
||||
}
|
||||
return {
|
||||
category: DiagnosticCategory.Error,
|
||||
category: DiagnosticCategory.Warning,
|
||||
code: 501,
|
||||
message: `Type ${name}(kind ${ts.TypeFlags[ty.flags]}) is not supported in LumiJS`,
|
||||
message: `Type ${name}(kind ${ts.TypeFlags[ty.flags]}) is not supported in LumiJS, emitting 'dynamic'`,
|
||||
loc: this.locationFrom(node),
|
||||
};
|
||||
}
|
||||
|
|
|
@ -20,11 +20,33 @@ let music = new aws.dynamodb.Table("music", {
|
|||
attributes: [
|
||||
{ name: "Album", type: "S" },
|
||||
{ name: "Artist", type: "S" },
|
||||
{ name: "NumberOfSongs", type: "N" },
|
||||
{ name: "Sales", type: "N" },
|
||||
],
|
||||
hashKey: "Album",
|
||||
rangeKey: "Artist",
|
||||
readCapacity: 1,
|
||||
writeCapacity: 1
|
||||
writeCapacity: 1,
|
||||
globalSecondaryIndexes: [
|
||||
{
|
||||
indexName: "myGSI",
|
||||
hashKey: "Sales",
|
||||
rangeKey: "Artist",
|
||||
readCapacity: 1,
|
||||
writeCapacity: 1,
|
||||
nonKeyAttributes: ["Album", "NumberOfSongs"],
|
||||
projectionType: "INCLUDE",
|
||||
},
|
||||
{
|
||||
indexName: "myGSI2",
|
||||
hashKey: "NumberOfSongs",
|
||||
rangeKey: "Sales",
|
||||
nonKeyAttributes: ["Album", "Artist"],
|
||||
projectionType: "INCLUDE",
|
||||
readCapacity: 2,
|
||||
writeCapacity: 2,
|
||||
},
|
||||
],
|
||||
})
|
||||
|
||||
// TODO[pulumi/lumi#174] Until we have global definitions available in Lumi for these APIs that are expected
|
||||
|
|
|
@ -23,16 +23,16 @@ import (
|
|||
// http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/.
|
||||
type Table struct {
|
||||
idl.NamedResource
|
||||
HashKey string `lumi:"hashKey,replaces"`
|
||||
Attributes []Attribute `lumi:"attributes"`
|
||||
ReadCapacity float64 `lumi:"readCapacity"`
|
||||
WriteCapacity float64 `lumi:"writeCapacity"`
|
||||
RangeKey *string `lumi:"rangeKey,optional,replaces"`
|
||||
TableName *string `lumi:"tableName,optional,replaces"`
|
||||
HashKey string `lumi:"hashKey,replaces"`
|
||||
Attributes []Attribute `lumi:"attributes"`
|
||||
ReadCapacity float64 `lumi:"readCapacity"`
|
||||
WriteCapacity float64 `lumi:"writeCapacity"`
|
||||
RangeKey *string `lumi:"rangeKey,optional,replaces"`
|
||||
TableName *string `lumi:"tableName,optional,replaces"`
|
||||
GlobalSecondaryIndexes *[]GlobalSecondaryIndex `lumi:"globalSecondaryIndexes,optional"`
|
||||
|
||||
// TODO:
|
||||
// LocalSecondaryIndexes
|
||||
// GlobalSecondaryIndexes
|
||||
// StreamSpecification
|
||||
}
|
||||
|
||||
|
@ -52,3 +52,23 @@ const (
|
|||
NumberAttribute AttributeType = "N"
|
||||
BinaryAttribute AttributeType = "B"
|
||||
)
|
||||
|
||||
// A GlobalSecondaryIndex represents an alternative index at DynamoDB Table
|
||||
type GlobalSecondaryIndex struct {
|
||||
IndexName string `lumi:"indexName"`
|
||||
HashKey string `lumi:"hashKey"`
|
||||
RangeKey *string `lumi:"rangeKey,optional"`
|
||||
ReadCapacity float64 `lumi:"readCapacity"`
|
||||
WriteCapacity float64 `lumi:"writeCapacity"`
|
||||
NonKeyAttributes []string `lumi:"nonKeyAttributes"`
|
||||
ProjectionType ProjectionType `lumi:"projectionType"`
|
||||
}
|
||||
|
||||
// ProjectionType represents the types of DynamoDB Table Attributes.
|
||||
type ProjectionType string
|
||||
|
||||
const (
|
||||
KeysOnlyProjection ProjectionType = "KEYS_ONLY"
|
||||
IncludeProjection ProjectionType = "INCLUDE"
|
||||
AllProjection ProjectionType = "ALL"
|
||||
)
|
||||
|
|
|
@ -3,7 +3,10 @@
|
|||
|
||||
import * as lumi from "@lumi/lumi";
|
||||
|
||||
export let AllProjection: ProjectionType = "ALL";
|
||||
export let BinaryAttribute: AttributeType = "B";
|
||||
export let IncludeProjection: ProjectionType = "INCLUDE";
|
||||
export let KeysOnlyProjection: ProjectionType = "KEYS_ONLY";
|
||||
export let NumberAttribute: AttributeType = "N";
|
||||
export let StringAttribute: AttributeType = "S";
|
||||
|
||||
|
@ -17,6 +20,21 @@ export type AttributeType =
|
|||
"N" |
|
||||
"S";
|
||||
|
||||
export interface GlobalSecondaryIndex {
|
||||
indexName: string;
|
||||
hashKey: string;
|
||||
rangeKey?: string;
|
||||
readCapacity: number;
|
||||
writeCapacity: number;
|
||||
nonKeyAttributes: string[];
|
||||
projectionType: ProjectionType;
|
||||
}
|
||||
|
||||
export type ProjectionType =
|
||||
"ALL" |
|
||||
"INCLUDE" |
|
||||
"KEYS_ONLY";
|
||||
|
||||
export class Table extends lumi.Resource implements TableArgs {
|
||||
public readonly name: string;
|
||||
public readonly hashKey: string;
|
||||
|
@ -25,6 +43,7 @@ export class Table extends lumi.Resource implements TableArgs {
|
|||
public writeCapacity: number;
|
||||
public readonly rangeKey?: string;
|
||||
public readonly tableName?: string;
|
||||
public globalSecondaryIndexes?: GlobalSecondaryIndex[];
|
||||
|
||||
constructor(name: string, args: TableArgs) {
|
||||
super();
|
||||
|
@ -50,6 +69,7 @@ export class Table extends lumi.Resource implements TableArgs {
|
|||
this.writeCapacity = args.writeCapacity;
|
||||
this.rangeKey = args.rangeKey;
|
||||
this.tableName = args.tableName;
|
||||
this.globalSecondaryIndexes = args.globalSecondaryIndexes;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -60,6 +80,7 @@ export interface TableArgs {
|
|||
writeCapacity: number;
|
||||
readonly rangeKey?: string;
|
||||
readonly tableName?: string;
|
||||
globalSecondaryIndexes?: GlobalSecondaryIndex[];
|
||||
}
|
||||
|
||||
|
||||
|
|
90
lib/aws/provider/awsctx/hashset.go
Normal file
90
lib/aws/provider/awsctx/hashset.go
Normal file
|
@ -0,0 +1,90 @@
|
|||
// Licensed to Pulumi Corporation ("Pulumi") under one or more
|
||||
// contributor license agreements. See the NOTICE file distributed with
|
||||
// this work for additional information regarding copyright ownership.
|
||||
// Pulumi licenses this file to You under the Apache License, Version 2.0
|
||||
// (the "License"); you may not use this file except in compliance with
|
||||
// the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package awsctx
|
||||
|
||||
type Hash string
|
||||
|
||||
type Hashable interface {
|
||||
HashKey() Hash
|
||||
HashValue() Hash
|
||||
}
|
||||
|
||||
type HashSet struct {
|
||||
items map[Hash]Hashable
|
||||
}
|
||||
|
||||
func (set *HashSet) Add(item Hashable) *Hash {
|
||||
key := item.HashKey()
|
||||
_, existed := set.items[key]
|
||||
set.items[key] = item
|
||||
if existed {
|
||||
return &key
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (set *HashSet) Length() int { return len(set.items) }
|
||||
func (old *HashSet) Changes(new *HashSet) *HashSetDiff { return newHashSetDiff(old, new) }
|
||||
|
||||
func NewHashSet() *HashSet { return &HashSet{map[Hash]Hashable{}} }
|
||||
|
||||
type HashSetDiff struct {
|
||||
adds []Hashable
|
||||
updates []Hashable
|
||||
deletes []Hashable
|
||||
}
|
||||
|
||||
// Adds returns the items added b
|
||||
func (diff *HashSetDiff) Adds() []Hashable { return diff.adds }
|
||||
func (diff *HashSetDiff) Updates() []Hashable { return diff.updates }
|
||||
func (diff *HashSetDiff) Deletes() []Hashable { return diff.deletes }
|
||||
func (diff *HashSetDiff) AddOrUpdates() []Hashable {
|
||||
newArr := []Hashable{}
|
||||
for _, update := range diff.updates {
|
||||
newArr = append(newArr, update)
|
||||
}
|
||||
for _, add := range diff.adds {
|
||||
newArr = append(newArr, add)
|
||||
}
|
||||
return newArr
|
||||
}
|
||||
|
||||
func newHashSetDiff(old *HashSet, new *HashSet) *HashSetDiff {
|
||||
hashSetDiff := HashSetDiff{
|
||||
adds: []Hashable{},
|
||||
updates: []Hashable{},
|
||||
deletes: []Hashable{},
|
||||
}
|
||||
for key, val := range new.items {
|
||||
oldVal, exists := old.items[key]
|
||||
if exists {
|
||||
if oldVal.HashValue() != val.HashValue() {
|
||||
// If it exists in both, but with different values, it's an update
|
||||
hashSetDiff.updates = append(hashSetDiff.updates, val)
|
||||
}
|
||||
} else {
|
||||
// If it exists in new but not in old, its an add
|
||||
hashSetDiff.adds = append(hashSetDiff.adds, val)
|
||||
}
|
||||
}
|
||||
for key, val := range old.items {
|
||||
_, exists := new.items[key]
|
||||
if !exists {
|
||||
// If it exists in old but not in new, its a delete
|
||||
hashSetDiff.deletes = append(hashSetDiff.deletes, val)
|
||||
}
|
||||
}
|
||||
return &hashSetDiff
|
||||
}
|
185
lib/aws/provider/awsctx/hashset_test.go
Normal file
185
lib/aws/provider/awsctx/hashset_test.go
Normal file
|
@ -0,0 +1,185 @@
|
|||
// Licensed to Pulumi Corporation ("Pulumi") under one or more
|
||||
// contributor license agreements. See the NOTICE file distributed with
|
||||
// this work for additional information regarding copyright ownership.
|
||||
// Pulumi licenses this file to You under the Apache License, Version 2.0
|
||||
// (the "License"); you may not use this file except in compliance with
|
||||
// the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package awsctx
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type Setting struct {
|
||||
Namespace string
|
||||
Name string
|
||||
Value string
|
||||
}
|
||||
|
||||
var _ Hashable = Setting{}
|
||||
|
||||
func (s Setting) HashKey() Hash {
|
||||
return Hash(s.Namespace + ":" + s.Name)
|
||||
}
|
||||
func (s Setting) HashValue() Hash {
|
||||
return Hash(s.Namespace + ":" + s.Name + ":" + s.Value)
|
||||
}
|
||||
func NewSettingHashSet(options *[]Setting) *HashSet {
|
||||
set := NewHashSet()
|
||||
if options == nil {
|
||||
return set
|
||||
}
|
||||
for _, option := range *options {
|
||||
set.Add(option)
|
||||
}
|
||||
return set
|
||||
}
|
||||
|
||||
func TestHashSetSimple(t *testing.T) {
|
||||
items := &[]Setting{
|
||||
{
|
||||
Namespace: "a",
|
||||
Name: "b",
|
||||
Value: "x",
|
||||
},
|
||||
{
|
||||
Namespace: "a",
|
||||
Name: "c",
|
||||
Value: "y",
|
||||
},
|
||||
}
|
||||
|
||||
set := NewSettingHashSet(items)
|
||||
assert.Equal(t, 2, set.Length(), "expected length is 2")
|
||||
}
|
||||
|
||||
func TestHashSetConflicts(t *testing.T) {
|
||||
items := &[]Setting{
|
||||
{
|
||||
Namespace: "a",
|
||||
Name: "b",
|
||||
Value: "x",
|
||||
},
|
||||
{
|
||||
Namespace: "a",
|
||||
Name: "b",
|
||||
Value: "y",
|
||||
},
|
||||
}
|
||||
|
||||
set := NewSettingHashSet(items)
|
||||
assert.Equal(t, 1, set.Length(), "expected length is 1")
|
||||
}
|
||||
|
||||
func TestHashSetDiffReorder(t *testing.T) {
|
||||
itemsOld := &[]Setting{
|
||||
{
|
||||
Namespace: "a",
|
||||
Name: "b",
|
||||
Value: "x",
|
||||
},
|
||||
{
|
||||
Namespace: "a",
|
||||
Name: "c",
|
||||
Value: "y",
|
||||
},
|
||||
}
|
||||
itemsNew := &[]Setting{
|
||||
{
|
||||
Namespace: "a",
|
||||
Name: "c",
|
||||
Value: "y",
|
||||
},
|
||||
{
|
||||
Namespace: "a",
|
||||
Name: "b",
|
||||
Value: "x",
|
||||
},
|
||||
}
|
||||
|
||||
oldSet := NewSettingHashSet(itemsOld)
|
||||
newSet := NewSettingHashSet(itemsNew)
|
||||
d := oldSet.Changes(newSet)
|
||||
assert.Equal(t, 0, len(d.Deletes()), "expected no deletes")
|
||||
assert.Equal(t, 0, len(d.Adds()), "expected no adds")
|
||||
assert.Equal(t, 0, len(d.Updates()), "expected no updates")
|
||||
}
|
||||
|
||||
func TestHashSetDiffUpdate(t *testing.T) {
|
||||
itemsOld := &[]Setting{
|
||||
{
|
||||
Namespace: "a",
|
||||
Name: "b",
|
||||
Value: "x",
|
||||
},
|
||||
{
|
||||
Namespace: "a",
|
||||
Name: "c",
|
||||
Value: "y",
|
||||
},
|
||||
}
|
||||
itemsNew := &[]Setting{
|
||||
{
|
||||
Namespace: "a",
|
||||
Name: "c",
|
||||
Value: "z",
|
||||
},
|
||||
{
|
||||
Namespace: "a",
|
||||
Name: "b",
|
||||
Value: "x",
|
||||
},
|
||||
}
|
||||
|
||||
oldSet := NewSettingHashSet(itemsOld)
|
||||
newSet := NewSettingHashSet(itemsNew)
|
||||
d := oldSet.Changes(newSet)
|
||||
assert.Equal(t, 0, len(d.Deletes()), "expected no deletes")
|
||||
assert.Equal(t, 0, len(d.Adds()), "expected no adds")
|
||||
assert.Equal(t, 1, len(d.Updates()), "expected 1 update")
|
||||
}
|
||||
|
||||
func TestHashSetDiffUpdateDeleteAndAdd(t *testing.T) {
|
||||
itemsOld := &[]Setting{
|
||||
{ // this is deleted
|
||||
Namespace: "a",
|
||||
Name: "b",
|
||||
Value: "x",
|
||||
},
|
||||
{ // this is updated
|
||||
Namespace: "a",
|
||||
Name: "c",
|
||||
Value: "y",
|
||||
},
|
||||
}
|
||||
itemsNew := &[]Setting{
|
||||
{
|
||||
Namespace: "a",
|
||||
Name: "c",
|
||||
Value: "z",
|
||||
},
|
||||
{ // This is added
|
||||
Namespace: "b",
|
||||
Name: "a",
|
||||
Value: "x",
|
||||
},
|
||||
}
|
||||
|
||||
oldSet := NewSettingHashSet(itemsOld)
|
||||
newSet := NewSettingHashSet(itemsNew)
|
||||
d := oldSet.Changes(newSet)
|
||||
assert.Equal(t, 1, len(d.Deletes()), "expected 1 delete")
|
||||
assert.Equal(t, 1, len(d.Adds()), "expected 1 add1")
|
||||
assert.Equal(t, 1, len(d.Updates()), "expected 1 update")
|
||||
}
|
|
@ -37,12 +37,13 @@ const TableToken = dynamodb.TableToken
|
|||
|
||||
// constants for the various table limits.
|
||||
const (
|
||||
minTableName = 3
|
||||
maxTableName = 255
|
||||
minTableAttributeName = 1
|
||||
maxTableAttributeName = 255
|
||||
minReadCapacity = 1
|
||||
minWriteCapacity = 1
|
||||
minTableName = 3
|
||||
maxTableName = 255
|
||||
minTableAttributeName = 1
|
||||
maxTableAttributeName = 255
|
||||
minReadCapacity = 1
|
||||
minWriteCapacity = 1
|
||||
maxGlobalSecondaryIndexes = 5
|
||||
)
|
||||
|
||||
// NewTableProvider creates a provider that handles DynamoDB Table operations.
|
||||
|
@ -105,6 +106,38 @@ func (p *tableProvider) Check(ctx context.Context, obj *dynamodb.Table) ([]mappe
|
|||
}
|
||||
}
|
||||
|
||||
if obj.GlobalSecondaryIndexes != nil {
|
||||
gsis := *obj.GlobalSecondaryIndexes
|
||||
if len(gsis) > maxGlobalSecondaryIndexes {
|
||||
failures = append(failures,
|
||||
mapper.NewFieldErr(reflect.TypeOf(obj), dynamodb.Table_GlobalSecondaryIndexes,
|
||||
fmt.Errorf("more than %v global secondary indexes requested", maxGlobalSecondaryIndexes)))
|
||||
}
|
||||
for _, gsi := range gsis {
|
||||
name := gsi.IndexName
|
||||
if len(name) < minTableName {
|
||||
failures = append(failures,
|
||||
mapper.NewFieldErr(reflect.TypeOf(gsi), dynamodb.GlobalSecondaryIndex_IndexName,
|
||||
fmt.Errorf("less than minimum length of %v", minTableName)))
|
||||
}
|
||||
if len(name) > maxTableName {
|
||||
failures = append(failures,
|
||||
mapper.NewFieldErr(reflect.TypeOf(gsi), dynamodb.GlobalSecondaryIndex_IndexName,
|
||||
fmt.Errorf("exceeded maximum length of %v", maxTableName)))
|
||||
}
|
||||
if gsi.ReadCapacity < minReadCapacity {
|
||||
failures = append(failures,
|
||||
mapper.NewFieldErr(reflect.TypeOf(gsi), dynamodb.GlobalSecondaryIndex_ReadCapacity,
|
||||
fmt.Errorf("less than minimum of %v", minReadCapacity)))
|
||||
}
|
||||
if gsi.WriteCapacity < minWriteCapacity {
|
||||
failures = append(failures,
|
||||
mapper.NewFieldErr(reflect.TypeOf(gsi), dynamodb.GlobalSecondaryIndex_WriteCapacity,
|
||||
fmt.Errorf("less than minimum of %v", minWriteCapacity)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return failures, nil
|
||||
}
|
||||
|
||||
|
@ -151,6 +184,36 @@ func (p *tableProvider) Create(ctx context.Context, obj *dynamodb.Table) (resour
|
|||
WriteCapacityUnits: aws.Int64(int64(obj.WriteCapacity)),
|
||||
},
|
||||
}
|
||||
if obj.GlobalSecondaryIndexes != nil {
|
||||
var gsis []*awsdynamodb.GlobalSecondaryIndex
|
||||
for _, gsi := range *obj.GlobalSecondaryIndexes {
|
||||
keySchema := []*awsdynamodb.KeySchemaElement{
|
||||
{
|
||||
AttributeName: aws.String(gsi.HashKey),
|
||||
KeyType: aws.String("HASH"),
|
||||
},
|
||||
}
|
||||
if gsi.RangeKey != nil {
|
||||
keySchema = append(keySchema, &awsdynamodb.KeySchemaElement{
|
||||
AttributeName: gsi.RangeKey,
|
||||
KeyType: aws.String("RANGE"),
|
||||
})
|
||||
}
|
||||
gsis = append(gsis, &awsdynamodb.GlobalSecondaryIndex{
|
||||
IndexName: aws.String(gsi.IndexName),
|
||||
KeySchema: keySchema,
|
||||
ProvisionedThroughput: &awsdynamodb.ProvisionedThroughput{
|
||||
ReadCapacityUnits: aws.Int64(int64(gsi.ReadCapacity)),
|
||||
WriteCapacityUnits: aws.Int64(int64(gsi.WriteCapacity)),
|
||||
},
|
||||
Projection: &awsdynamodb.Projection{
|
||||
NonKeyAttributes: aws.StringSlice(gsi.NonKeyAttributes),
|
||||
ProjectionType: aws.String(string(gsi.ProjectionType)),
|
||||
},
|
||||
})
|
||||
}
|
||||
create.GlobalSecondaryIndexes = gsis
|
||||
}
|
||||
|
||||
// Now go ahead and perform the action.
|
||||
if _, err := p.ctx.DynamoDB().CreateTable(create); err != nil {
|
||||
|
@ -180,9 +243,23 @@ func (p *tableProvider) InspectChange(ctx context.Context, id resource.ID,
|
|||
// to new values. The resource ID is returned and may be different if the resource had to be recreated.
|
||||
func (p *tableProvider) Update(ctx context.Context, id resource.ID,
|
||||
old *dynamodb.Table, new *dynamodb.Table, diff *resource.ObjectDiff) error {
|
||||
if diff.Changed(dynamodb.Table_Attributes) {
|
||||
return errors.New("Not yet implemented - update of Attributes property")
|
||||
}
|
||||
|
||||
// Note: Changing dynamodb.Table_Attributes alone does not trigger an update on the resource, it must be changed
|
||||
// along with using the new attributes in an index. The latter will process the update.
|
||||
|
||||
// Per DynamoDB documention at http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_UpdateTable.html:
|
||||
|
||||
// You can only perform one of the following operations at once:
|
||||
// * Modify the provisioned throughput settings of the table.
|
||||
// * Enable or disable Streams on the table.
|
||||
// * Remove a global secondary index from the table.
|
||||
// * Create a new global secondary index on the table. Once the index begins backfilling, you can use
|
||||
// UpdateTable to perform other operations.
|
||||
|
||||
// So we have to serialize each of the requested updates and potentially make multiple calls to UpdateTable, waiting
|
||||
// for the Table to reach the ready state between calls.
|
||||
|
||||
// First modify provisioned throughput if needed.
|
||||
if diff.Changed(dynamodb.Table_ReadCapacity) || diff.Changed(dynamodb.Table_WriteCapacity) {
|
||||
fmt.Printf("Updating provisioned capacity for DynamoDB Table %v\n", id.String())
|
||||
update := &awsdynamodb.UpdateTableInput{
|
||||
|
@ -192,11 +269,106 @@ func (p *tableProvider) Update(ctx context.Context, id resource.ID,
|
|||
WriteCapacityUnits: aws.Int64(int64(new.WriteCapacity)),
|
||||
},
|
||||
}
|
||||
if _, err := p.ctx.DynamoDB().UpdateTable(update); err != nil {
|
||||
err := p.updateTable(id, update)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := p.waitForTableState(id, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Next, delete and create global secondary indexes.
|
||||
if diff.Changed(dynamodb.Table_GlobalSecondaryIndexes) {
|
||||
newGlobalSecondaryIndexes := newGlobalSecondaryIndexHashSet(new.GlobalSecondaryIndexes)
|
||||
oldGlobalSecondaryIndexes := newGlobalSecondaryIndexHashSet(old.GlobalSecondaryIndexes)
|
||||
d := oldGlobalSecondaryIndexes.Changes(newGlobalSecondaryIndexes)
|
||||
// First, add any new indexes
|
||||
for _, o := range d.Adds() {
|
||||
gsi := o.(globalSecondaryIndexHash).item
|
||||
fmt.Printf("Adding new global secondary index %v for DynamoDB Table %v\n", gsi.IndexName, id.String())
|
||||
keySchema := []*awsdynamodb.KeySchemaElement{
|
||||
{
|
||||
AttributeName: aws.String(gsi.HashKey),
|
||||
KeyType: aws.String("HASH"),
|
||||
},
|
||||
}
|
||||
if gsi.RangeKey != nil {
|
||||
keySchema = append(keySchema, &awsdynamodb.KeySchemaElement{
|
||||
AttributeName: gsi.RangeKey,
|
||||
KeyType: aws.String("RANGE"),
|
||||
})
|
||||
}
|
||||
var attributeDefinitions []*awsdynamodb.AttributeDefinition
|
||||
for _, attr := range new.Attributes {
|
||||
attributeDefinitions = append(attributeDefinitions, &awsdynamodb.AttributeDefinition{
|
||||
AttributeName: aws.String(attr.Name),
|
||||
AttributeType: aws.String(string(attr.Type)),
|
||||
})
|
||||
}
|
||||
update := &awsdynamodb.UpdateTableInput{
|
||||
TableName: aws.String(id.String()),
|
||||
AttributeDefinitions: attributeDefinitions,
|
||||
GlobalSecondaryIndexUpdates: []*awsdynamodb.GlobalSecondaryIndexUpdate{
|
||||
{
|
||||
Create: &awsdynamodb.CreateGlobalSecondaryIndexAction{
|
||||
IndexName: aws.String(gsi.IndexName),
|
||||
KeySchema: keySchema,
|
||||
ProvisionedThroughput: &awsdynamodb.ProvisionedThroughput{
|
||||
ReadCapacityUnits: aws.Int64(int64(gsi.ReadCapacity)),
|
||||
WriteCapacityUnits: aws.Int64(int64(gsi.WriteCapacity)),
|
||||
},
|
||||
Projection: &awsdynamodb.Projection{
|
||||
NonKeyAttributes: aws.StringSlice(gsi.NonKeyAttributes),
|
||||
ProjectionType: aws.String(string(gsi.ProjectionType)),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err := p.updateTable(id, update)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Next, modify provisioned throughput on any updated indexes
|
||||
for _, o := range d.Updates() {
|
||||
gsi := o.(globalSecondaryIndexHash).item
|
||||
fmt.Printf("Updating capacity for global secondary index %v for DynamoDB Table %v\n", gsi.IndexName, id.String())
|
||||
update := &awsdynamodb.UpdateTableInput{
|
||||
TableName: aws.String(id.String()),
|
||||
GlobalSecondaryIndexUpdates: []*awsdynamodb.GlobalSecondaryIndexUpdate{
|
||||
{
|
||||
Update: &awsdynamodb.UpdateGlobalSecondaryIndexAction{
|
||||
IndexName: aws.String(gsi.IndexName),
|
||||
ProvisionedThroughput: &awsdynamodb.ProvisionedThroughput{
|
||||
ReadCapacityUnits: aws.Int64(int64(gsi.ReadCapacity)),
|
||||
WriteCapacityUnits: aws.Int64(int64(gsi.WriteCapacity)),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err := p.updateTable(id, update)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Finally, delete and removed indexes
|
||||
for _, o := range d.Deletes() {
|
||||
gsi := o.(globalSecondaryIndexHash).item
|
||||
fmt.Printf("Deleting global secondary index %v for DynamoDB Table %v\n", gsi.IndexName, id.String())
|
||||
update := &awsdynamodb.UpdateTableInput{
|
||||
TableName: aws.String(id.String()),
|
||||
GlobalSecondaryIndexUpdates: []*awsdynamodb.GlobalSecondaryIndexUpdate{
|
||||
{
|
||||
Delete: &awsdynamodb.DeleteGlobalSecondaryIndexAction{
|
||||
IndexName: aws.String(gsi.IndexName),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err := p.updateTable(id, update)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -238,6 +410,35 @@ func (p *tableProvider) Delete(ctx context.Context, id resource.ID) error {
|
|||
return p.waitForTableState(id, false)
|
||||
}
|
||||
|
||||
func (p *tableProvider) updateTable(id resource.ID, update *awsdynamodb.UpdateTableInput) error {
|
||||
succ, err := awsctx.RetryUntil(
|
||||
p.ctx,
|
||||
func() (bool, error) {
|
||||
_, err := p.ctx.DynamoDB().UpdateTable(update)
|
||||
if err != nil {
|
||||
if erraws, iserraws := err.(awserr.Error); iserraws {
|
||||
if erraws.Code() == "ResourceNotFoundException" || erraws.Code() == "ResourceInUseException" {
|
||||
fmt.Printf("Waiting to update resource '%v': %v", id, erraws.Message())
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return false, err // anything else is a real error; propagate it.
|
||||
}
|
||||
return true, nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !succ {
|
||||
return fmt.Errorf("DynamoDB table '%v' could not be updated", id)
|
||||
}
|
||||
if err := p.waitForTableState(id, true); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *tableProvider) waitForTableState(id resource.ID, exist bool) error {
|
||||
succ, err := awsctx.RetryUntilLong(
|
||||
p.ctx,
|
||||
|
@ -277,3 +478,26 @@ func (p *tableProvider) waitForTableState(id resource.ID, exist bool) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type globalSecondaryIndexHash struct {
|
||||
item dynamodb.GlobalSecondaryIndex
|
||||
}
|
||||
|
||||
var _ awsctx.Hashable = globalSecondaryIndexHash{}
|
||||
|
||||
func (option globalSecondaryIndexHash) HashKey() awsctx.Hash {
|
||||
return awsctx.Hash(option.item.IndexName)
|
||||
}
|
||||
func (option globalSecondaryIndexHash) HashValue() awsctx.Hash {
|
||||
return awsctx.Hash(string(int(option.item.ReadCapacity)) + ":" + string(int(option.item.WriteCapacity)))
|
||||
}
|
||||
func newGlobalSecondaryIndexHashSet(options *[]dynamodb.GlobalSecondaryIndex) *awsctx.HashSet {
|
||||
set := awsctx.NewHashSet()
|
||||
if options == nil {
|
||||
return set
|
||||
}
|
||||
for _, option := range *options {
|
||||
set.Add(globalSecondaryIndexHash{option})
|
||||
}
|
||||
return set
|
||||
}
|
||||
|
|
|
@ -153,41 +153,19 @@ func (p *environmentProvider) Update(ctx context.Context, id resource.ID,
|
|||
envUpdate.SolutionStackName = new.SolutionStackName
|
||||
}
|
||||
if diff.Changed(elasticbeanstalk.Environment_OptionSettings) {
|
||||
newOptions := []elasticbeanstalk.OptionSetting{}
|
||||
if new.OptionSettings != nil {
|
||||
newOptions = *new.OptionSettings
|
||||
}
|
||||
oldOptions := []elasticbeanstalk.OptionSetting{}
|
||||
if old.OptionSettings != nil {
|
||||
oldOptions = *old.OptionSettings
|
||||
}
|
||||
optionSettingAddOrUpdates := map[string]*elasticbeanstalk.OptionSetting{}
|
||||
optionSettingDeletes := map[string]*elasticbeanstalk.OptionSetting{}
|
||||
for _, newOption := range newOptions {
|
||||
key := newOption.Namespace + ":" + newOption.OptionName
|
||||
// This setting appears in the new so is a candidate for add/update
|
||||
optionSettingAddOrUpdates[key] = &newOption
|
||||
}
|
||||
for _, oldOption := range oldOptions {
|
||||
key := oldOption.Namespace + ":" + oldOption.OptionName
|
||||
if update, ok := optionSettingAddOrUpdates[key]; !ok {
|
||||
// This setting appears in old but not new, it should be deleted
|
||||
optionSettingDeletes[key] = &oldOption
|
||||
} else {
|
||||
if update.Value == oldOption.Value {
|
||||
// This setting appears in old and new and has the same value, remove from candidates for add/update
|
||||
delete(optionSettingAddOrUpdates, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, option := range optionSettingAddOrUpdates {
|
||||
newOptionsSet := newOptionSettingHashSet(new.OptionSettings)
|
||||
oldOptionsSet := newOptionSettingHashSet(old.OptionSettings)
|
||||
d := oldOptionsSet.Changes(newOptionsSet)
|
||||
for _, o := range d.AddOrUpdates() {
|
||||
option := o.(optionSettingHash).item
|
||||
envUpdate.OptionSettings = append(envUpdate.OptionSettings, &awselasticbeanstalk.ConfigurationOptionSetting{
|
||||
Namespace: aws.String(option.Namespace),
|
||||
OptionName: aws.String(option.OptionName),
|
||||
Value: aws.String(option.Value),
|
||||
})
|
||||
}
|
||||
for _, option := range optionSettingDeletes {
|
||||
for _, o := range d.Deletes() {
|
||||
option := o.(optionSettingHash).item
|
||||
envUpdate.OptionsToRemove = append(envUpdate.OptionsToRemove, &awselasticbeanstalk.OptionSpecification{
|
||||
Namespace: aws.String(option.Namespace),
|
||||
OptionName: aws.String(option.OptionName),
|
||||
|
@ -265,3 +243,26 @@ func (p *environmentProvider) getEnvironment(name *string) (*awselasticbeanstalk
|
|||
environment := environments[0]
|
||||
return environment, nil
|
||||
}
|
||||
|
||||
type optionSettingHash struct {
|
||||
item elasticbeanstalk.OptionSetting
|
||||
}
|
||||
|
||||
var _ awsctx.Hashable = optionSettingHash{}
|
||||
|
||||
func (option optionSettingHash) HashKey() awsctx.Hash {
|
||||
return awsctx.Hash(option.item.Namespace + ":" + option.item.OptionName)
|
||||
}
|
||||
func (option optionSettingHash) HashValue() awsctx.Hash {
|
||||
return awsctx.Hash(option.item.Namespace + ":" + option.item.OptionName + ":" + option.item.Value)
|
||||
}
|
||||
func newOptionSettingHashSet(options *[]elasticbeanstalk.OptionSetting) *awsctx.HashSet {
|
||||
set := awsctx.NewHashSet()
|
||||
if options == nil {
|
||||
return set
|
||||
}
|
||||
for _, option := range *options {
|
||||
set.Add(optionSettingHash{option})
|
||||
}
|
||||
return set
|
||||
}
|
||||
|
|
|
@ -31,6 +31,30 @@ const (
|
|||
Attribute_Type = "type"
|
||||
)
|
||||
|
||||
/* Marshalable GlobalSecondaryIndex structure(s) */
|
||||
|
||||
// GlobalSecondaryIndex is a marshalable representation of its corresponding IDL type.
|
||||
type GlobalSecondaryIndex struct {
|
||||
IndexName string `json:"indexName"`
|
||||
HashKey string `json:"hashKey"`
|
||||
RangeKey *string `json:"rangeKey,omitempty"`
|
||||
ReadCapacity float64 `json:"readCapacity"`
|
||||
WriteCapacity float64 `json:"writeCapacity"`
|
||||
NonKeyAttributes []string `json:"nonKeyAttributes"`
|
||||
ProjectionType ProjectionType `json:"projectionType"`
|
||||
}
|
||||
|
||||
// GlobalSecondaryIndex's properties have constants to make dealing with diffs and property bags easier.
|
||||
const (
|
||||
GlobalSecondaryIndex_IndexName = "indexName"
|
||||
GlobalSecondaryIndex_HashKey = "hashKey"
|
||||
GlobalSecondaryIndex_RangeKey = "rangeKey"
|
||||
GlobalSecondaryIndex_ReadCapacity = "readCapacity"
|
||||
GlobalSecondaryIndex_WriteCapacity = "writeCapacity"
|
||||
GlobalSecondaryIndex_NonKeyAttributes = "nonKeyAttributes"
|
||||
GlobalSecondaryIndex_ProjectionType = "projectionType"
|
||||
)
|
||||
|
||||
/* RPC stubs for Table resource provider */
|
||||
|
||||
// TableToken is the type token corresponding to the Table package type.
|
||||
|
@ -203,6 +227,7 @@ type Table struct {
|
|||
WriteCapacity float64 `json:"writeCapacity"`
|
||||
RangeKey *string `json:"rangeKey,omitempty"`
|
||||
TableName *string `json:"tableName,omitempty"`
|
||||
GlobalSecondaryIndexes *[]GlobalSecondaryIndex `json:"globalSecondaryIndexes,omitempty"`
|
||||
}
|
||||
|
||||
// Table's properties have constants to make dealing with diffs and property bags easier.
|
||||
|
@ -214,18 +239,23 @@ const (
|
|||
Table_WriteCapacity = "writeCapacity"
|
||||
Table_RangeKey = "rangeKey"
|
||||
Table_TableName = "tableName"
|
||||
Table_GlobalSecondaryIndexes = "globalSecondaryIndexes"
|
||||
)
|
||||
|
||||
/* Typedefs */
|
||||
|
||||
type (
|
||||
AttributeType string
|
||||
ProjectionType string
|
||||
)
|
||||
|
||||
/* Constants */
|
||||
|
||||
const (
|
||||
AllProjection ProjectionType = "ALL"
|
||||
BinaryAttribute AttributeType = "B"
|
||||
IncludeProjection ProjectionType = "INCLUDE"
|
||||
KeysOnlyProjection ProjectionType = "KEYS_ONLY"
|
||||
NumberAttribute AttributeType = "N"
|
||||
StringAttribute AttributeType = "S"
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue