-
Notifications
You must be signed in to change notification settings - Fork 30
feat: Add Sync and Async AWS Lambda Ingest #72
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
35d036a
feat: add lambda invoke ingest
jshlbrd 7a948d6
docs: README
jshlbrd 4ed258d
chore: add async source lambda example
jshlbrd dda28f8
feat: microservice example
jshlbrd 20d8ff1
chore: add Lambda URL to example
jshlbrd e345c48
refactor: json raw message
jshlbrd File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "sync" | ||
|
|
||
| "github.com/aws/aws-lambda-go/events" | ||
| "github.com/brexhq/substation/cmd" | ||
| "github.com/brexhq/substation/config" | ||
| "golang.org/x/sync/errgroup" | ||
| ) | ||
|
|
||
| type gatewayMetadata struct { | ||
| Resource string `json:"resource"` | ||
| Path string `json:"path"` | ||
| Headers map[string]string `json:"headers"` | ||
| } | ||
|
|
||
| func gatewayHandler(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) { | ||
| sub := cmd.New() | ||
|
|
||
| // retrieve and load configuration | ||
| cfg, err := getConfig(ctx) | ||
| if err != nil { | ||
| return events.APIGatewayProxyResponse{StatusCode: 500}, fmt.Errorf("gateway: %v", err) | ||
| } | ||
|
|
||
| if err := sub.SetConfig(cfg); err != nil { | ||
| return events.APIGatewayProxyResponse{StatusCode: 500}, fmt.Errorf("gateway: %v", err) | ||
| } | ||
|
|
||
| // maintains app state | ||
| group, ctx := errgroup.WithContext(ctx) | ||
|
|
||
| // load | ||
| var sinkWg sync.WaitGroup | ||
| sinkWg.Add(1) | ||
| group.Go(func() error { | ||
| return sub.Sink(ctx, &sinkWg) | ||
| }) | ||
|
|
||
| // transform | ||
| var transformWg sync.WaitGroup | ||
| for w := 0; w < sub.Concurrency(); w++ { | ||
| transformWg.Add(1) | ||
| group.Go(func() error { | ||
| return sub.Transform(ctx, &transformWg) | ||
| }) | ||
| } | ||
|
|
||
| // ingest | ||
| group.Go(func() error { | ||
| if len(request.Body) != 0 { | ||
| capsule := config.NewCapsule() | ||
| capsule.SetData([]byte(request.Body)) | ||
| if _, err := capsule.SetMetadata(gatewayMetadata{ | ||
| request.Resource, | ||
| request.Path, | ||
| request.Headers, | ||
| }); err != nil { | ||
| return fmt.Errorf("gateway handler: %v", err) | ||
| } | ||
|
|
||
| sub.Send(capsule) | ||
| } | ||
|
|
||
| sub.WaitTransform(&transformWg) | ||
| sub.WaitSink(&sinkWg) | ||
|
|
||
| return nil | ||
| }) | ||
|
|
||
| // block until ITL is complete | ||
| if err := sub.Block(ctx, group); err != nil { | ||
| return events.APIGatewayProxyResponse{StatusCode: 500}, fmt.Errorf("gateway: %v", err) | ||
| } | ||
|
|
||
| return events.APIGatewayProxyResponse{StatusCode: 200}, nil | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/aws/aws-lambda-go/events" | ||
| "github.com/awslabs/kinesis-aggregation/go/deaggregator" | ||
| "github.com/brexhq/substation/cmd" | ||
| "github.com/brexhq/substation/config" | ||
| "github.com/brexhq/substation/internal/aws/kinesis" | ||
| "golang.org/x/sync/errgroup" | ||
| ) | ||
|
|
||
| type kinesisMetadata struct { | ||
| ApproximateArrivalTimestamp time.Time `json:"approximateArrivalTimestamp"` | ||
| EventSourceArn string `json:"eventSourceArn"` | ||
| PartitionKey string `json:"partitionKey"` | ||
| SequenceNumber string `json:"sequenceNumber"` | ||
| } | ||
|
|
||
| func kinesisHandler(ctx context.Context, event events.KinesisEvent) error { | ||
| sub := cmd.New() | ||
|
|
||
| // retrieve and load configuration | ||
| cfg, err := getConfig(ctx) | ||
| if err != nil { | ||
| return fmt.Errorf("kinesis handler: %v", err) | ||
| } | ||
|
|
||
| if err := sub.SetConfig(cfg); err != nil { | ||
| return fmt.Errorf("kinesis handler: %v", err) | ||
| } | ||
|
|
||
| // maintains app state | ||
| group, ctx := errgroup.WithContext(ctx) | ||
|
|
||
| // load | ||
| var sinkWg sync.WaitGroup | ||
| sinkWg.Add(1) | ||
| group.Go(func() error { | ||
| return sub.Sink(ctx, &sinkWg) | ||
| }) | ||
|
|
||
| // transform | ||
| var transformWg sync.WaitGroup | ||
| for w := 0; w < sub.Concurrency(); w++ { | ||
| transformWg.Add(1) | ||
| group.Go(func() error { | ||
| return sub.Transform(ctx, &transformWg) | ||
| }) | ||
| } | ||
|
|
||
| // ingest | ||
| eventSourceArn := event.Records[len(event.Records)-1].EventSourceArn | ||
|
|
||
| group.Go(func() error { | ||
| converted := kinesis.ConvertEventsRecords(event.Records) | ||
| deaggregated, err := deaggregator.DeaggregateRecords(converted) | ||
| if err != nil { | ||
| return fmt.Errorf("kinesis handler: %v", err) | ||
| } | ||
|
|
||
| for _, record := range deaggregated { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return ctx.Err() | ||
| default: | ||
| capsule := config.NewCapsule() | ||
| capsule.SetData(record.Data) | ||
| if _, err := capsule.SetMetadata(kinesisMetadata{ | ||
| *record.ApproximateArrivalTimestamp, | ||
| eventSourceArn, | ||
| *record.PartitionKey, | ||
| *record.SequenceNumber, | ||
| }); err != nil { | ||
| return fmt.Errorf("kinesis handler: %v", err) | ||
| } | ||
|
|
||
| sub.Send(capsule) | ||
| } | ||
| } | ||
|
|
||
| sub.WaitTransform(&transformWg) | ||
| sub.WaitSink(&sinkWg) | ||
|
|
||
| return nil | ||
| }) | ||
|
|
||
| // block until ITL is complete | ||
| if err := sub.Block(ctx, group); err != nil { | ||
| return fmt.Errorf("kinesis handler: %v", err) | ||
| } | ||
|
|
||
| return nil | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,182 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "sync" | ||
|
|
||
| "github.com/brexhq/substation/cmd" | ||
| "github.com/brexhq/substation/config" | ||
| "github.com/brexhq/substation/internal/errors" | ||
| "github.com/brexhq/substation/internal/service" | ||
| "golang.org/x/sync/errgroup" | ||
| ) | ||
|
|
||
| // lambdaAsyncHandler implements ITL that is triggered by an asynchronous invocation | ||
| // of the Lambda. Read more about synchronous invocation here: | ||
| // https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html. | ||
| // | ||
| // This implementation of Substation only supports the object data handling pattern | ||
| // -- if the payload sent to the Lambda is not JSON, then the invocation will fail. | ||
| func lambdaAsyncHandler(ctx context.Context, event json.RawMessage) error { | ||
| evt, err := json.Marshal(event) | ||
| if err != nil { | ||
| return fmt.Errorf("lambda async: %v", err) | ||
| } | ||
|
|
||
| sub := cmd.New() | ||
|
|
||
| // retrieve and load configuration | ||
| cfg, err := getConfig(ctx) | ||
| if err != nil { | ||
| return fmt.Errorf("lambda async: %v", err) | ||
| } | ||
|
|
||
| if err := sub.SetConfig(cfg); err != nil { | ||
| return fmt.Errorf("lambda async: %v", err) | ||
| } | ||
|
|
||
| // maintains app state | ||
| group, ctx := errgroup.WithContext(ctx) | ||
|
|
||
| // load | ||
| var sinkWg sync.WaitGroup | ||
| sinkWg.Add(1) | ||
| group.Go(func() error { | ||
| return sub.Sink(ctx, &sinkWg) | ||
| }) | ||
|
|
||
| // transform | ||
| var transformWg sync.WaitGroup | ||
| for w := 0; w < sub.Concurrency(); w++ { | ||
| transformWg.Add(1) | ||
| group.Go(func() error { | ||
| return sub.Transform(ctx, &transformWg) | ||
| }) | ||
| } | ||
|
|
||
| // ingest | ||
| group.Go(func() error { | ||
| capsule := config.NewCapsule() | ||
| capsule.SetData(evt) | ||
|
|
||
| // do not add metadata -- there is no metadata worth adding from the invocation | ||
| sub.Send(capsule) | ||
|
|
||
| sub.WaitTransform(&transformWg) | ||
| sub.WaitSink(&sinkWg) | ||
|
|
||
| return nil | ||
| }) | ||
|
|
||
| // block until ITL is complete | ||
| if err := sub.Block(ctx, group); err != nil { | ||
| panic(err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // errLambdaSyncMultipleItems is returned when an invocation of the lambdaSync handler | ||
| // produces multiple items, which cannot be returned. | ||
| const errLambdaSyncMultipleItems = errors.Error("transformed data into multiple items") | ||
|
|
||
| // lambdaSyncHandler implements ITL using a request-reply service that is triggered | ||
| // by synchronous invocation of the Lambda. Read more about synchronous invocation here: | ||
| // https://docs.aws.amazon.com/lambda/latest/dg/invocation-sync.html. | ||
| // | ||
| // This implementation of Substation has some limitations and requirements: | ||
| // | ||
| // - Only supports the object data handling pattern -- if the payload sent to the Lambda | ||
| // and the result are not JSON, then the invocation will fail | ||
| // | ||
| // - Only returns a single object -- if many objects may be returned, then they should be | ||
| // aggregated into one object using the Aggregate processor | ||
| // | ||
| // - Must use the gRPC sink configured to send data to localhost:50051 -- data is routed | ||
| // from the sink to the handler using the Substation gRPC Sink service | ||
| func lambdaSyncHandler(ctx context.Context, event json.RawMessage) (json.RawMessage, error) { | ||
| evt, err := json.Marshal(event) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("lambda sync: %v", err) | ||
| } | ||
|
|
||
| sub := cmd.New() | ||
|
|
||
| // retrieve and load configuration | ||
| cfg, err := getConfig(ctx) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("lambda sync: %v", err) | ||
| } | ||
|
|
||
| if err := sub.SetConfig(cfg); err != nil { | ||
| return nil, fmt.Errorf("lambda sync: %v", err) | ||
| } | ||
|
|
||
| // maintains app state | ||
| group, ctx := errgroup.WithContext(ctx) | ||
|
|
||
| // gRPC service, required for catching results from the sink | ||
| server := service.Server{} | ||
| server.Setup() | ||
|
|
||
| // deferring guarantees that the gRPC server will shutdown | ||
| defer server.Stop() | ||
|
|
||
| srv := &service.Sink{} | ||
| server.RegisterSink(srv) | ||
|
|
||
| // gRPC server runs in a goroutine to prevent blocking main | ||
| group.Go(func() error { | ||
| return server.Start("localhost:50051") | ||
| }) | ||
|
|
||
| // load | ||
| var sinkWg sync.WaitGroup | ||
| sinkWg.Add(1) | ||
| group.Go(func() error { | ||
| return sub.Sink(ctx, &sinkWg) | ||
| }) | ||
|
|
||
| // transform | ||
| var transformWg sync.WaitGroup | ||
| for w := 0; w < sub.Concurrency(); w++ { | ||
| transformWg.Add(1) | ||
| group.Go(func() error { | ||
| return sub.Transform(ctx, &transformWg) | ||
| }) | ||
| } | ||
|
|
||
| // ingest | ||
| group.Go(func() error { | ||
| capsule := config.NewCapsule() | ||
| capsule.SetData(evt) | ||
|
|
||
| // do not add metadata -- there is no metadata worth adding from the invocation | ||
| sub.Send(capsule) | ||
|
|
||
| sub.WaitTransform(&transformWg) | ||
| sub.WaitSink(&sinkWg) | ||
|
|
||
| return nil | ||
| }) | ||
|
|
||
| // block until ITL is complete and the gRPC stream is closed | ||
| if err := sub.Block(ctx, group); err != nil { | ||
| panic(err) | ||
| } | ||
| srv.Block() | ||
|
|
||
| if len(srv.Capsules) > 1 { | ||
| return nil, fmt.Errorf("lambda sync: %v", errLambdaSyncMultipleItems) | ||
| } | ||
|
|
||
| capsule := srv.Capsules[0] | ||
| var output json.RawMessage | ||
| if err := json.Unmarshal(capsule.Data(), &output); err != nil { | ||
| return nil, fmt.Errorf("lambda sync: %v", err) | ||
| } | ||
|
|
||
| return output, nil | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.