Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/aws/lambda/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ We recommend using one autoscaling Lambda for an entire Substation deployment, b
This app handles ingest, transform, and load (ITL) for data from these AWS services:
* [API Gateway](https://docs.aws.amazon.com/lambda/latest/dg/services-apigateway.html)
* [Kinesis Data Streams](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html)
* [Asynchronous Invocation (Lambda)](https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html)
* [Synchronous Invocation (Lambda)](https://docs.aws.amazon.com/lambda/latest/dg/invocation-sync.html)
* [S3](https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html)
* [S3 via SNS](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html)
* [SNS](https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html)
Expand Down
80 changes: 80 additions & 0 deletions cmd/aws/lambda/substation/api_gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"context"
"fmt"
"sync"

"github.com/aws/aws-lambda-go/events"
"github.com/brexhq/substation/cmd"
"github.com/brexhq/substation/config"
"golang.org/x/sync/errgroup"
)

type gatewayMetadata struct {
Resource string `json:"resource"`
Path string `json:"path"`
Headers map[string]string `json:"headers"`
}

func gatewayHandler(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
sub := cmd.New()

// retrieve and load configuration
cfg, err := getConfig(ctx)
if err != nil {
return events.APIGatewayProxyResponse{StatusCode: 500}, fmt.Errorf("gateway: %v", err)
}

if err := sub.SetConfig(cfg); err != nil {
return events.APIGatewayProxyResponse{StatusCode: 500}, fmt.Errorf("gateway: %v", err)
}

// maintains app state
group, ctx := errgroup.WithContext(ctx)

// load
var sinkWg sync.WaitGroup
sinkWg.Add(1)
group.Go(func() error {
return sub.Sink(ctx, &sinkWg)
})

// transform
var transformWg sync.WaitGroup
for w := 0; w < sub.Concurrency(); w++ {
transformWg.Add(1)
group.Go(func() error {
return sub.Transform(ctx, &transformWg)
})
}

// ingest
group.Go(func() error {
if len(request.Body) != 0 {
capsule := config.NewCapsule()
capsule.SetData([]byte(request.Body))
if _, err := capsule.SetMetadata(gatewayMetadata{
request.Resource,
request.Path,
request.Headers,
}); err != nil {
return fmt.Errorf("gateway handler: %v", err)
}

sub.Send(capsule)
}

sub.WaitTransform(&transformWg)
sub.WaitSink(&sinkWg)

return nil
})

// block until ITL is complete
if err := sub.Block(ctx, group); err != nil {
return events.APIGatewayProxyResponse{StatusCode: 500}, fmt.Errorf("gateway: %v", err)
}

return events.APIGatewayProxyResponse{StatusCode: 200}, nil
}
98 changes: 98 additions & 0 deletions cmd/aws/lambda/substation/kinesis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package main

import (
"context"
"fmt"
"sync"
"time"

"github.com/aws/aws-lambda-go/events"
"github.com/awslabs/kinesis-aggregation/go/deaggregator"
"github.com/brexhq/substation/cmd"
"github.com/brexhq/substation/config"
"github.com/brexhq/substation/internal/aws/kinesis"
"golang.org/x/sync/errgroup"
)

type kinesisMetadata struct {
ApproximateArrivalTimestamp time.Time `json:"approximateArrivalTimestamp"`
EventSourceArn string `json:"eventSourceArn"`
PartitionKey string `json:"partitionKey"`
SequenceNumber string `json:"sequenceNumber"`
}

func kinesisHandler(ctx context.Context, event events.KinesisEvent) error {
sub := cmd.New()

// retrieve and load configuration
cfg, err := getConfig(ctx)
if err != nil {
return fmt.Errorf("kinesis handler: %v", err)
}

if err := sub.SetConfig(cfg); err != nil {
return fmt.Errorf("kinesis handler: %v", err)
}

// maintains app state
group, ctx := errgroup.WithContext(ctx)

// load
var sinkWg sync.WaitGroup
sinkWg.Add(1)
group.Go(func() error {
return sub.Sink(ctx, &sinkWg)
})

// transform
var transformWg sync.WaitGroup
for w := 0; w < sub.Concurrency(); w++ {
transformWg.Add(1)
group.Go(func() error {
return sub.Transform(ctx, &transformWg)
})
}

// ingest
eventSourceArn := event.Records[len(event.Records)-1].EventSourceArn

group.Go(func() error {
converted := kinesis.ConvertEventsRecords(event.Records)
deaggregated, err := deaggregator.DeaggregateRecords(converted)
if err != nil {
return fmt.Errorf("kinesis handler: %v", err)
}

for _, record := range deaggregated {
select {
case <-ctx.Done():
return ctx.Err()
default:
capsule := config.NewCapsule()
capsule.SetData(record.Data)
if _, err := capsule.SetMetadata(kinesisMetadata{
*record.ApproximateArrivalTimestamp,
eventSourceArn,
*record.PartitionKey,
*record.SequenceNumber,
}); err != nil {
return fmt.Errorf("kinesis handler: %v", err)
}

sub.Send(capsule)
}
}

sub.WaitTransform(&transformWg)
sub.WaitSink(&sinkWg)

return nil
})

// block until ITL is complete
if err := sub.Block(ctx, group); err != nil {
return fmt.Errorf("kinesis handler: %v", err)
}

return nil
}
182 changes: 182 additions & 0 deletions cmd/aws/lambda/substation/lambda.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package main

import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/brexhq/substation/cmd"
"github.com/brexhq/substation/config"
"github.com/brexhq/substation/internal/errors"
"github.com/brexhq/substation/internal/service"
"golang.org/x/sync/errgroup"
)

// lambdaAsyncHandler implements ITL that is triggered by an asynchronous invocation
// of the Lambda. Read more about synchronous invocation here:
// https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html.
//
// This implementation of Substation only supports the object data handling pattern
// -- if the payload sent to the Lambda is not JSON, then the invocation will fail.
func lambdaAsyncHandler(ctx context.Context, event json.RawMessage) error {
evt, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("lambda async: %v", err)
}

sub := cmd.New()

// retrieve and load configuration
cfg, err := getConfig(ctx)
if err != nil {
return fmt.Errorf("lambda async: %v", err)
}

if err := sub.SetConfig(cfg); err != nil {
return fmt.Errorf("lambda async: %v", err)
}

// maintains app state
group, ctx := errgroup.WithContext(ctx)

// load
var sinkWg sync.WaitGroup
sinkWg.Add(1)
group.Go(func() error {
return sub.Sink(ctx, &sinkWg)
})

// transform
var transformWg sync.WaitGroup
for w := 0; w < sub.Concurrency(); w++ {
transformWg.Add(1)
group.Go(func() error {
return sub.Transform(ctx, &transformWg)
})
}

// ingest
group.Go(func() error {
capsule := config.NewCapsule()
capsule.SetData(evt)

// do not add metadata -- there is no metadata worth adding from the invocation
sub.Send(capsule)

sub.WaitTransform(&transformWg)
sub.WaitSink(&sinkWg)

return nil
})

// block until ITL is complete
if err := sub.Block(ctx, group); err != nil {
panic(err)
}

return nil
}

// errLambdaSyncMultipleItems is returned when an invocation of the lambdaSync handler
// produces multiple items, which cannot be returned.
const errLambdaSyncMultipleItems = errors.Error("transformed data into multiple items")

// lambdaSyncHandler implements ITL using a request-reply service that is triggered
// by synchronous invocation of the Lambda. Read more about synchronous invocation here:
// https://docs.aws.amazon.com/lambda/latest/dg/invocation-sync.html.
//
// This implementation of Substation has some limitations and requirements:
//
// - Only supports the object data handling pattern -- if the payload sent to the Lambda
// and the result are not JSON, then the invocation will fail
//
// - Only returns a single object -- if many objects may be returned, then they should be
// aggregated into one object using the Aggregate processor
//
// - Must use the gRPC sink configured to send data to localhost:50051 -- data is routed
// from the sink to the handler using the Substation gRPC Sink service
func lambdaSyncHandler(ctx context.Context, event json.RawMessage) (json.RawMessage, error) {
evt, err := json.Marshal(event)
if err != nil {
return nil, fmt.Errorf("lambda sync: %v", err)
}

sub := cmd.New()

// retrieve and load configuration
cfg, err := getConfig(ctx)
if err != nil {
return nil, fmt.Errorf("lambda sync: %v", err)
}

if err := sub.SetConfig(cfg); err != nil {
return nil, fmt.Errorf("lambda sync: %v", err)
}

// maintains app state
group, ctx := errgroup.WithContext(ctx)

// gRPC service, required for catching results from the sink
server := service.Server{}
server.Setup()

// deferring guarantees that the gRPC server will shutdown
defer server.Stop()

srv := &service.Sink{}
server.RegisterSink(srv)

// gRPC server runs in a goroutine to prevent blocking main
group.Go(func() error {
return server.Start("localhost:50051")
})

// load
var sinkWg sync.WaitGroup
sinkWg.Add(1)
group.Go(func() error {
return sub.Sink(ctx, &sinkWg)
})

// transform
var transformWg sync.WaitGroup
for w := 0; w < sub.Concurrency(); w++ {
transformWg.Add(1)
group.Go(func() error {
return sub.Transform(ctx, &transformWg)
})
}

// ingest
group.Go(func() error {
capsule := config.NewCapsule()
capsule.SetData(evt)

// do not add metadata -- there is no metadata worth adding from the invocation
sub.Send(capsule)

sub.WaitTransform(&transformWg)
sub.WaitSink(&sinkWg)

return nil
})

// block until ITL is complete and the gRPC stream is closed
if err := sub.Block(ctx, group); err != nil {
panic(err)
}
srv.Block()

if len(srv.Capsules) > 1 {
return nil, fmt.Errorf("lambda sync: %v", errLambdaSyncMultipleItems)
}

capsule := srv.Capsules[0]
var output json.RawMessage
if err := json.Unmarshal(capsule.Data(), &output); err != nil {
return nil, fmt.Errorf("lambda sync: %v", err)
}

return output, nil
}
Loading