Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions build/terraform/aws/dynamodb/_variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ variable "attributes" {
}))
}

# change data capture via Streams is enabled by default for the table
# https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html
#
# stream_view_type is passed to the table's stream_view_type argument and
# determines which item images are written to the stream. the Substation
# DynamoDB handler skips records whose view type is KEYS_ONLY and fills the
# "before" / "after" fields from the old / new images, so the default of
# NEW_AND_OLD_IMAGES provides both fields.
variable "stream_view_type" {
  type = string
  default = "NEW_AND_OLD_IMAGES"
}

variable "tags" {
type = map(any)
default = {}
Expand Down
5 changes: 5 additions & 0 deletions build/terraform/aws/dynamodb/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ resource "aws_dynamodb_table" "table" {
ignore_changes = [read_capacity, write_capacity]
}

# Streams are charged only for read operations, and reads made by AWS Lambda are free
# https://aws.amazon.com/dynamodb/pricing/
stream_enabled = true
stream_view_type = var.stream_view_type

dynamic "attribute" {
for_each = var.attributes

Expand Down
14 changes: 14 additions & 0 deletions build/terraform/aws/iam/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ data "aws_iam_policy_document" "dynamodb_write" {
}
}

# read-only access to DynamoDB Streams: allows a principal to list and describe
# streams, obtain shard iterators, and read records from the shards. no table
# read or write actions are granted by this document.
# NOTE(review): var.resources presumably needs to contain stream ARNs
# (table/<name>/stream/*) rather than table ARNs -- confirm against callers.
# https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/iam-policy-read-stream-only.html
data "aws_iam_policy_document" "dynamodb_stream_read" {
  statement {
    effect = "Allow"
    actions = [
      "dynamodb:GetRecords",
      "dynamodb:GetShardIterator",
      "dynamodb:DescribeStream",
      "dynamodb:ListStreams",
    ]
    resources = var.resources
  }
}

data "aws_iam_policy_document" "kms_read" {
statement {
effect = "Allow"
Expand Down
199 changes: 199 additions & 0 deletions cmd/aws/lambda/substation/dynamodb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package main

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/brexhq/substation/cmd"
"github.com/brexhq/substation/config"
"github.com/brexhq/substation/internal/aws/dynamodb"
"golang.org/x/sync/errgroup"
)

// dynamodbMetadata is the metadata that dynamodbHandler attaches to every
// capsule created from a DynamoDB Stream record.
type dynamodbMetadata struct {
	// ApproximateCreationDateTime is the record's approximate creation time
	// as reported by the stream.
	ApproximateCreationDateTime time.Time `json:"approximateCreationDateTime"`
	// EventSourceArn is the ARN of the stream that produced the record.
	EventSourceArn string `json:"eventSourceArn"`
	// SequenceNumber is the sequence number of the stream record.
	SequenceNumber string `json:"sequenceNumber"`
	// SizeBytes is the size of the stream record as reported by the stream.
	SizeBytes int64 `json:"sizeBytes"`
	// StreamViewType is the view type of the stream record
	// (e.g., NEW_AND_OLD_IMAGES).
	StreamViewType string `json:"streamViewType"`
}

// dynamodbHandler processes a batch of DynamoDB Stream records. It loads the
// Substation configuration, starts the sink and transform goroutines, converts
// every record that carries image data into a capsule, sends the capsule into
// the pipeline, and blocks until the pipeline completes.
//
// Records are converted to an object modeled similarly to schemas used in
// Debezium (https://debezium.io/). If the View Type on the Stream is
// OLD_IMAGE, then the "after" field is always null; if the View Type is
// NEW_IMAGE, then the "before" field is always null. Setting the View Type to
// NEW_AND_OLD_IMAGES is recommended. Records from KEYS_ONLY streams are
// skipped because they contain no image data.
//
// For more information see these examples from the Debezium documentation:
//   - https://debezium.io/documentation/reference/1.2/connectors/mysql.html#mysql-change-event-value
//   - https://debezium.io/documentation/reference/1.2/connectors/postgresql.html#postgresql-change-event-value
//   - https://debezium.io/documentation/reference/1.2/connectors/sqlserver.html#sqlserver-change-event-value
//
// Records are converted to this format:
//
//	{
//		"source": {
//			"ts_ms": 0,
//			"table": "table",
//			"connector": "dynamodb"
//		},
//		"ts_ms": 0,
//		"op": "c",
//		"before": { ... },
//		"after": { ... }
//	}
//
// An empty batch is processed as a no-op, and a record whose event source ARN
// does not contain a table name results in an error instead of a panic.
//
//nolint: gocognit // ignore cognitive complexity
func dynamodbHandler(ctx context.Context, event events.DynamoDBEvent) error {
	sub := cmd.New()

	// retrieve and load configuration
	cfg, err := getConfig(ctx)
	if err != nil {
		return fmt.Errorf("dynamodb handler: %w", err)
	}

	if err := sub.SetConfig(cfg); err != nil {
		return fmt.Errorf("dynamodb handler: %w", err)
	}

	// maintains app state
	group, ctx := errgroup.WithContext(ctx)

	// load
	var sinkWg sync.WaitGroup
	sinkWg.Add(1)
	group.Go(func() error {
		return sub.Sink(ctx, &sinkWg)
	})

	// transform
	var transformWg sync.WaitGroup
	for w := 0; w < sub.Concurrency(); w++ {
		transformWg.Add(1)
		group.Go(func() error {
			return sub.Transform(ctx, &transformWg)
		})
	}

	// ingest
	group.Go(func() error {
		for _, record := range event.Records {
			// stop ingesting as soon as the group context is cancelled.
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}

			// only records that contain image data (changes) are supported.
			if record.Change.StreamViewType == "KEYS_ONLY" {
				continue
			}

			// the table name is derived per record so that an empty batch
			// never indexes into event.Records.
			table, err := dynamodbTableName(record.EventSourceArn)
			if err != nil {
				return fmt.Errorf("dynamodb handler: %w", err)
			}

			capsule := config.NewCapsule()

			if err := capsule.Set("source.ts_ms", record.Change.ApproximateCreationDateTime.Time.UnixMilli()); err != nil {
				return fmt.Errorf("dynamodb handler: %w", err)
			}

			if err := capsule.Set("source.table", table); err != nil {
				return fmt.Errorf("dynamodb handler: %w", err)
			}

			if err := capsule.Set("source.connector", "dynamodb"); err != nil {
				return fmt.Errorf("dynamodb handler: %w", err)
			}

			if err := capsule.Set("ts_ms", time.Now().UnixMilli()); err != nil {
				return fmt.Errorf("dynamodb handler: %w", err)
			}

			// event names without a Debezium equivalent leave "op" unset.
			if op, ok := dynamodbOperation(record.EventName); ok {
				if err := capsule.Set("op", op); err != nil {
					return fmt.Errorf("dynamodb handler: %w", err)
				}
			}

			// if either image is missing, then the value is set to null.
			before, err := dynamodbImage(record.Change.OldImage)
			if err != nil {
				return fmt.Errorf("dynamodb handler: %w", err)
			}

			if err := capsule.Set("before", before); err != nil {
				return fmt.Errorf("dynamodb handler: %w", err)
			}

			after, err := dynamodbImage(record.Change.NewImage)
			if err != nil {
				return fmt.Errorf("dynamodb handler: %w", err)
			}

			if err := capsule.Set("after", after); err != nil {
				return fmt.Errorf("dynamodb handler: %w", err)
			}

			if _, err := capsule.SetMetadata(dynamodbMetadata{
				ApproximateCreationDateTime: record.Change.ApproximateCreationDateTime.Time,
				EventSourceArn:              record.EventSourceArn,
				SequenceNumber:              record.Change.SequenceNumber,
				SizeBytes:                   record.Change.SizeBytes,
				StreamViewType:              record.Change.StreamViewType,
			}); err != nil {
				return fmt.Errorf("dynamodb handler: %w", err)
			}

			sub.Send(capsule)
		}

		sub.WaitTransform(&transformWg)
		sub.WaitSink(&sinkWg)

		return nil
	})

	// block until ITL is complete
	if err := sub.Block(ctx, group); err != nil {
		return fmt.Errorf("dynamodb handler: %w", err)
	}

	return nil
}

// dynamodbTableName returns the table name from a DynamoDB Stream ARN. The
// table name is the second element of the slash-delimited ARN, e.g.
// arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291
// yields "TestTable". An error is returned if the ARN has no second element.
func dynamodbTableName(arn string) (string, error) {
	parts := strings.Split(arn, "/")
	if len(parts) < 2 {
		return "", fmt.Errorf("event source arn %q does not contain a table name", arn)
	}

	return parts[1], nil
}

// dynamodbOperation maps the type of data modification to a Debezium
// operation string. Debezium operations that are relevant to DynamoDB are:
//   - c: create (INSERT)
//   - u: update (MODIFY)
//   - d: delete (REMOVE)
//
// The boolean result is false if the event name has no mapping.
func dynamodbOperation(eventName string) (string, bool) {
	switch eventName {
	case "INSERT":
		return "c", true
	case "MODIFY":
		return "u", true
	case "REMOVE":
		return "d", true
	default:
		return "", false
	}
}

// dynamodbImage decodes a Stream record image into a generic map. A nil image
// yields an untyped nil so that the capsule field is set to null.
func dynamodbImage(image map[string]events.DynamoDBAttributeValue) (interface{}, error) {
	if image == nil {
		return nil, nil
	}

	var item map[string]interface{}
	if err := dynamodbattribute.UnmarshalMap(
		dynamodb.ConvertEventsAttributeValueMap(image),
		&item,
	); err != nil {
		return nil, err
	}

	return item, nil
}
2 changes: 2 additions & 0 deletions cmd/aws/lambda/substation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func main() {
switch h := handler; h {
case "AWS_API_GATEWAY":
lambda.Start(gatewayHandler)
case "AWS_DYNAMODB":
lambda.Start(dynamodbHandler)
case "AWS_KINESIS":
lambda.Start(kinesisHandler)
case "AWS_LAMBDA_ASYNC":
Expand Down
77 changes: 77 additions & 0 deletions internal/aws/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"strconv"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
Expand Down Expand Up @@ -127,3 +128,79 @@ func (a *API) GetItem(ctx aws.Context, table string, attributes map[string]inter

return resp, nil
}

// ConvertEventsAttributeValue converts events.DynamoDBAttributeValue to dynamodb.AttributeValue.
//
// Lists and maps are converted recursively. Lists, number sets, and string
// sets are always returned with a non-nil slice, so an empty collection keeps
// its type in the result (matching how maps are handled) instead of producing
// an AttributeValue with no field set. Unrecognized data types return nil.
func ConvertEventsAttributeValue(v events.DynamoDBAttributeValue) *dynamodb.AttributeValue {
	switch v.DataType() {
	case events.DataTypeBinary:
		return &dynamodb.AttributeValue{
			B: v.Binary(),
		}
	case events.DataTypeBinarySet:
		return &dynamodb.AttributeValue{
			BS: v.BinarySet(),
		}
	case events.DataTypeNumber:
		return &dynamodb.AttributeValue{
			N: aws.String(v.Number()),
		}
	case events.DataTypeNumberSet:
		numbers := v.NumberSet()
		ns := make([]*string, 0, len(numbers))

		for _, n := range numbers {
			ns = append(ns, aws.String(n))
		}

		return &dynamodb.AttributeValue{
			NS: ns,
		}
	case events.DataTypeString:
		return &dynamodb.AttributeValue{
			S: aws.String(v.String()),
		}
	case events.DataTypeStringSet:
		strs := v.StringSet()
		ss := make([]*string, 0, len(strs))

		for _, s := range strs {
			ss = append(ss, aws.String(s))
		}

		return &dynamodb.AttributeValue{
			SS: ss,
		}
	case events.DataTypeList:
		items := v.List()
		// a non-nil slice preserves the list type when the list is empty.
		l := make([]*dynamodb.AttributeValue, 0, len(items))

		for _, item := range items {
			l = append(l, ConvertEventsAttributeValue(item))
		}

		return &dynamodb.AttributeValue{
			L: l,
		}
	case events.DataTypeMap:
		fields := v.Map()
		m := make(map[string]*dynamodb.AttributeValue, len(fields))

		for k, field := range fields {
			m[k] = ConvertEventsAttributeValue(field)
		}

		return &dynamodb.AttributeValue{
			M: m,
		}
	case events.DataTypeNull:
		return &dynamodb.AttributeValue{
			NULL: aws.Bool(true),
		}
	case events.DataTypeBoolean:
		return &dynamodb.AttributeValue{
			BOOL: aws.Bool(v.Boolean()),
		}
	default:
		return nil
	}
}

// ConvertEventsAttributeValueMap converts a map of events.DynamoDBAttributeValue to a map of dynamodb.AttributeValue.
//
// Every value is converted with ConvertEventsAttributeValue and stored under
// its original key. The returned map is never nil, even when m is nil or empty.
func ConvertEventsAttributeValueMap(m map[string]events.DynamoDBAttributeValue) map[string]*dynamodb.AttributeValue {
	converted := make(map[string]*dynamodb.AttributeValue, len(m))

	for name := range m {
		converted[name] = ConvertEventsAttributeValue(m[name])
	}

	return converted
}