Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions build/config/substation.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,20 @@
type: 'meta_for_each',
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
kv_store: {
lock(settings={}): {
local default = {
object: $.config.object { lock_key: null, ttl_key: null },
transform: null,
kv_store: null,
prefix: null,
ttl_offset: "0s",
},

type: 'meta_kv_store_lock',
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
},
metric: {
duration(settings={}): {
local default = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// This example shows how to use the `meta_kv_store_lock` transform to
// create an "exactly once" semantic for a pipeline consumer.
local sub = import '../../../../../build/config/substation.libsonnet';

// In production environments a distributed KV store should be used.
local kv = sub.kv_store.memory();

{
transforms: [
// If a message acquires a lock, then it is tagged for inspection.
sub.tf.meta.kv_store.lock(settings={
kv_store: kv,
prefix: 'eo_consumer',
ttl_offset: '1m',
transform: sub.tf.obj.insert({ object: { target_key: 'meta eo_consumer' }, value: 'locked' }),
}),
// Messages that are not locked are dropped from the pipeline.
sub.tf.meta.switch({ cases: [
{
condition: sub.cnd.none([
sub.cnd.str.eq({ object: { source_key: 'meta eo_consumer' }, value: 'locked' }),
]),
transform: sub.tf.utility.drop(),
},
] }),
// At this point only locked messages exist in the pipeline.
sub.tf.send.stdout(),
],
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"a":"b"}
{"a":"b"}
{"c":"d"}
{"a":"b"}
{"c":"d"}
{"c":"d"}
{"e":"f"}
{"a":"b"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// This example shows how to use the `meta_kv_store_lock` transform to
// create an "exactly once" semantic for a pipeline producer.
local sub = import '../../../../../build/config/substation.libsonnet';

// In production environments a distributed KV store should be used.
local kv = sub.kv_store.memory();

{
transforms: [
// This only prints messages that acquire a lock. Any message
// that fails to acquire a lock will be skipped. An error in the
// sub-transform will cause all previously locked messages to be
// unlocked.
sub.tf.meta.err({ transform: sub.tf.meta.kv_store.lock(settings={
kv_store: kv,
prefix: 'eo_producer',
ttl_offset: '1m',
transform: sub.tf.send.stdout(),
}) }),
],
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"a":"b"}
{"a":"b"}
{"c":"d"}
{"a":"b"}
{"c":"d"}
{"c":"d"}
{"e":"f"}
{"a":"b"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// This example shows how to use the `meta_kv_store_lock` transform to
// create an "exactly once" semantic for an entire pipeline system.
local sub = import '../../../../../build/config/substation.libsonnet';

// In production environments a distributed KV store should be used.
local kv = sub.kv_store.memory();

{
transforms: [
// All messages are locked before being sent through other transform
// functions, ensuring that the message is processed only once.
// An error in any sub-transform will cause all previously locked
// messages to be unlocked.
sub.tf.meta.err({ transform: sub.tf.meta.kv_store.lock(settings={
kv_store: kv,
prefix: 'eo_system',
ttl_offset: '1m',
transform: sub.tf.meta.pipeline({ transforms: [
sub.tf.obj.insert({ object: { target_key: 'processed' }, value: true }),
sub.tf.send.stdout(),
] }),
}) }),
],
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"a":"b"}
{"a":"b"}
{"c":"d"}
{"a":"b"}
{"c":"d"}
{"c":"d"}
{"e":"f"}
{"a":"b"}
4 changes: 4 additions & 0 deletions examples/terraform/aws/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ flowchart LR
end
```

## Distributed Lock

Deploys a data pipeline that implements a distributed lock pattern using DynamoDB. This pattern can be used to add "exactly-once" semantics to services that otherwise do not support it. For similar examples, see the "exactly once" configurations [here](/examples/config/transform/meta/).

## Telephone

Deploys a data pipeline that implements a "telephone" pattern by sharing data as context between multiple Lambda functions using a DynamoDB table. This pattern can be used to enrich events across unique data sources.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
local sub = import '../../../../../../../build/config/substation.libsonnet';

local kv = sub.kv_store.aws_dynamodb({
table_name: 'substation',
attributes: { partition_key: 'PK', ttl: 'ttl' },
});

{
transforms: [
// All messages are locked before they are sent through other
// transform functions, ensuring that the message is processed
// exactly once.
//
// An error in any sub-transform will cause all previously locked
// messages to be unlocked; this only applies to messages that have
// not yet been flushed by a control message. Use the `utility_control`
// transform to manage how often messages are flushed.
sub.tf.meta.kv_store.lock(settings={
kv_store: kv,
prefix: 'distributed_lock',
ttl_offset: '1m',
transform: sub.tf.meta.pipeline({ transforms: [
// Delaying and simulating an error makes it possible to
// test message unlocking in real-time (view changes using
// the DynamoDB console). Uncomment the lines below to see
// how it works.
//
// sub.tf.utility.delay({ duration: '10s' }),
// sub.pattern.transform.conditional(
// condition=sub.cnd.utility.random(),
// transform=sub.tf.utility.err({ message: 'simulating error to trigger unlock' }),
// ),
//
// Messages are printed to the console. After this, they are locked
// and will not be printed again until the lock expires.
sub.tf.send.stdout(),
] }),
}),
],
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
data "aws_caller_identity" "caller" {}

module "appconfig" {
source = "../../../../../../build/terraform/aws/appconfig"

config = {
name = "substation"
environments = [{ name = "example" }]
}
}

module "ecr" {
source = "../../../../../../build/terraform/aws/ecr"

config = {
name = "substation"
force_delete = true
}
}

module "dynamodb" {
source = "../../../../../../build/terraform/aws/dynamodb"

config = {
name = "substation"
hash_key = "PK"
ttl = "ttl"

attributes = [
{
name = "PK"
type = "S"
}
]
}

access = [
module.node.role.name,
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
module "node" {
source = "../../../../../../build/terraform/aws/lambda"
appconfig = module.appconfig

config = {
name = "node"
description = "Substation node that transforms data exactly-once using a distributed lock"
image_uri = "${module.ecr.url}:v1.3.0"
image_arm = true
env = {
"SUBSTATION_CONFIG" : "http://localhost:2772/applications/substation/environments/example/configurations/node"
"SUBSTATION_LAMBDA_HANDLER" : "AWS_API_GATEWAY"
"SUBSTATION_DEBUG" : true
}
}

depends_on = [
module.appconfig.name,
module.ecr.url,
]
}

resource "aws_lambda_function_url" "node" {
function_name = module.node.name
authorization_type = "NONE"
}
36 changes: 34 additions & 2 deletions internal/aws/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ func (a *API) IsEnabled() bool {
return a.Client != nil
}

func (a *API) DeleteItem(ctx aws.Context, table string, key map[string]*dynamodb.AttributeValue) (resp *dynamodb.DeleteItemOutput, err error) {
ctx = context.WithoutCancel(ctx)
resp, err = a.Client.DeleteItemWithContext(
ctx,
&dynamodb.DeleteItemInput{
TableName: aws.String(table),
Key: key,
},
)
if err != nil {
return nil, fmt.Errorf("deleteitem table %s: %v", table, err)
}

return resp, nil
}

// BatchPutItem is a convenience wrapper for putting multiple items into a DynamoDB table.
func (a *API) BatchPutItem(ctx aws.Context, table string, items []map[string]*dynamodb.AttributeValue) (resp *dynamodb.BatchWriteItemOutput, err error) {
var requests []*dynamodb.WriteRequest
Expand All @@ -62,7 +78,6 @@ func (a *API) BatchPutItem(ctx aws.Context, table string, items []map[string]*dy
},
},
)

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
Expand Down Expand Up @@ -96,14 +111,31 @@ func (a *API) PutItem(ctx aws.Context, table string, item map[string]*dynamodb.A
TableName: aws.String(table),
Item: item,
})

if err != nil {
return nil, fmt.Errorf("putitem table %s: %v", table, err)
}

return resp, nil
}

func (a *API) PutItemWithCondition(ctx aws.Context, table string, item map[string]*dynamodb.AttributeValue, conditionExpression string, expressionAttributeNames map[string]*string, expressionAttributeValues map[string]*dynamodb.AttributeValue) (resp *dynamodb.PutItemOutput, err error) {
input := &dynamodb.PutItemInput{
TableName: aws.String(table),
ConditionExpression: aws.String(conditionExpression),
ExpressionAttributeNames: expressionAttributeNames,
Item: item,
ExpressionAttributeValues: expressionAttributeValues,
ReturnValues: aws.String("ALL_OLD"),
}

resp, err = a.Client.PutItemWithContext(ctx, input)
if err != nil {
return resp, err
}

return resp, nil
}

/*
Query is a convenience wrapper for querying a DynamoDB table. The paritition and sort keys are always referenced in the key condition expression as ":PK" and ":SK". Refer to the DynamoDB documentation for the Query operation's request syntax and key condition expression patterns:

Expand Down
69 changes: 69 additions & 0 deletions internal/kv/aws_dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package kv
import (
"context"
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/brexhq/substation/config"
"github.com/brexhq/substation/internal/aws"
Expand Down Expand Up @@ -66,6 +68,73 @@ func (store *kvAWSDynamoDB) String() string {
return toString(store)
}

// Lock adds an item to the DynamoDB table with a conditional check.
func (kv *kvAWSDynamoDB) Lock(ctx context.Context, key string, ttl int64) error {
attr := map[string]interface{}{
kv.Attributes.PartitionKey: key,
kv.Attributes.TTL: ttl,
}

if kv.Attributes.SortKey != "" {
attr[kv.Attributes.SortKey] = "substation:kv_store"
}

// Since the sort key is optional and static, it is not included in the check.
exp := "attribute_not_exists(#pk) OR #ttl <= :now"
expAttrNames := map[string]*string{
"#pk": &kv.Attributes.PartitionKey,
"#ttl": &kv.Attributes.TTL,
}
expAttrVals := map[string]interface{}{
":now": time.Now().Unix(),
}

a, err := dynamodbattribute.MarshalMap(attr)
if err != nil {
return err
}

v, err := dynamodbattribute.MarshalMap(expAttrVals)
if err != nil {
return err
}

// If the item already exists and the TTL has not expired, then this returns ErrNoLock. The
// caller is expected to handle this error and retry the call if necessary.
if _, err := kv.client.PutItemWithCondition(ctx, kv.TableName, a, exp, expAttrNames, v); err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "ConditionalCheckFailedException" {
return ErrNoLock
}
}

return err
}

return nil
}

func (store *kvAWSDynamoDB) Unlock(ctx context.Context, key string) error {
m := map[string]interface{}{
store.Attributes.PartitionKey: key,
}

if store.Attributes.SortKey != "" {
m[store.Attributes.SortKey] = "substation:kv_store"
}

item, err := dynamodbattribute.MarshalMap(m)
if err != nil {
return err
}

if _, err := store.client.DeleteItem(ctx, store.TableName, item); err != nil {
return err
}

return nil
}

// Get retrieves an item from the DynamoDB table. If the item had a time-to-live (TTL)
// configured when it was added and the TTL has passed, then nothing is returned.
//
Expand Down
Loading