Skip to content

Commit 86938bd

Browse files
committed
feat: add Step Functions enrichiment for pipes
1 parent 2ce263b commit 86938bd

16 files changed

Lines changed: 33700 additions & 0 deletions

packages/@aws-cdk/aws-pipes-enrichments-alpha/README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,24 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
4646
target: new SomeTarget(targetQueue),
4747
});
4848
```
49+
50+
### Step Functions statemachine
51+
52+
A Step Functions statemachine can be used to enrich events of a pipe.
53+
Express workflows are only supported and can only be invoked synchronously.
54+
For more information, see [Amazon EventBridge Pipes event enrichment](https://docs.aws.amazon.com/eventbridge/latest/userguide/pipes-enrichment.html).
55+
56+
```ts
57+
declare const sourceQueue: sqs.Queue;
58+
declare const targetQueue: sqs.Queue;
59+
60+
declare const enrichmentStateMachine: stepfunctions.StateMachine;
61+
62+
const enrichment = new enrichments.StepFunctionsEnrichment(enrichmentStateMachine);
63+
64+
const pipe = new pipes.Pipe(this, 'Pipe', {
65+
source: new SomeSource(sourceQueue),
66+
enrichment,
67+
target: new SomeTarget(targetQueue),
68+
});
69+
```
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
export * from './lambda';
2+
export * from './stepfunctions';
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import { EnrichmentParametersConfig, IEnrichment, IPipe, InputTransformation } from '@aws-cdk/aws-pipes-alpha';
2+
import { IRole } from 'aws-cdk-lib/aws-iam';
3+
import { IStateMachine, StateMachine, StateMachineType } from 'aws-cdk-lib/aws-stepfunctions';
4+
5+
/**
6+
* Properties for a StepFunctionsEnrichment
7+
*/
8+
export interface StepFunctionsEnrichmentProps {
9+
/**
10+
* The input transformation for the enrichment
11+
* @see https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-input-transformation.html
12+
* @default - None
13+
*/
14+
readonly inputTransformation?: InputTransformation;
15+
}
16+
17+
/**
18+
* A StepFunctions enrichment for a pipe
19+
*/
20+
export class StepFunctionsEnrichment implements IEnrichment {
21+
public readonly enrichmentArn: string;
22+
23+
private readonly inputTransformation?: InputTransformation;
24+
private readonly stateMachine: IStateMachine;
25+
26+
constructor(stateMachine: IStateMachine, props?: StepFunctionsEnrichmentProps) {
27+
this.stateMachine=stateMachine;
28+
if (this.stateMachine instanceof StateMachine
29+
&& (this.stateMachine.stateMachineType === StateMachineType.STANDARD)
30+
) {
31+
throw new Error('Enrichiment does not support STANDARD state machine workflows. Use EXPRESS instead.');
32+
}
33+
this.enrichmentArn = stateMachine.stateMachineArn;
34+
this.inputTransformation = props?.inputTransformation;
35+
}
36+
37+
bind(pipe: IPipe): EnrichmentParametersConfig {
38+
return {
39+
enrichmentParameters: {
40+
inputTemplate: this.inputTransformation?.bind(pipe).inputTemplate,
41+
},
42+
};
43+
}
44+
45+
grantInvoke(pipeRole: IRole): void {
46+
this.stateMachine.grantStartSyncExecution(pipeRole);
47+
}
48+
}
49+

packages/@aws-cdk/aws-pipes-enrichments-alpha/rosetta/default.ts-fixture

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import * as cdk from 'aws-cdk-lib';
33
import * as sqs from 'aws-cdk-lib/aws-sqs';
44
import * as lambda from 'aws-cdk-lib/aws-lambda';
5+
import * as stepfunctions from 'aws-cdk-lib/aws-stepfunctions';
56
import { Construct } from 'constructs';
67
import * as pipes from '@aws-cdk/aws-pipes-alpha';
78
import * as enrichments from '@aws-cdk/aws-pipes-enrichments-alpha';
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Jest Snapshot v1, https://goo.gl/fbAQLP
2+
3+
exports[`stepfunctions should grant pipe role invoke access 1`] = `
4+
{
5+
"EnrichmentStateMachineRoleDE810FCA": {
6+
"Properties": {
7+
"AssumeRolePolicyDocument": {
8+
"Statement": [
9+
{
10+
"Action": "sts:AssumeRole",
11+
"Effect": "Allow",
12+
"Principal": {
13+
"Service": {
14+
"Fn::FindInMap": [
15+
"ServiceprincipalMap",
16+
{
17+
"Ref": "AWS::Region",
18+
},
19+
"states",
20+
],
21+
},
22+
},
23+
},
24+
],
25+
"Version": "2012-10-17",
26+
},
27+
},
28+
"Type": "AWS::IAM::Role",
29+
},
30+
"MyPipeRoleCBC8E9AB": {
31+
"Properties": {
32+
"AssumeRolePolicyDocument": {
33+
"Statement": [
34+
{
35+
"Action": "sts:AssumeRole",
36+
"Effect": "Allow",
37+
"Principal": {
38+
"Service": "pipes.amazonaws.com",
39+
},
40+
},
41+
],
42+
"Version": "2012-10-17",
43+
},
44+
},
45+
"Type": "AWS::IAM::Role",
46+
},
47+
}
48+
`;
49+
50+
exports[`stepfunctions should grant pipe role invoke access 2`] = `
51+
{
52+
"MyPipeRoleDefaultPolicy31387C20": {
53+
"Properties": {
54+
"PolicyDocument": {
55+
"Statement": [
56+
{
57+
"Action": "states:StartSyncExecution",
58+
"Effect": "Allow",
59+
"Resource": {
60+
"Ref": "EnrichmentStateMachine8BED6C4E",
61+
},
62+
},
63+
],
64+
"Version": "2012-10-17",
65+
},
66+
"PolicyName": "MyPipeRoleDefaultPolicy31387C20",
67+
"Roles": [
68+
{
69+
"Ref": "MyPipeRoleCBC8E9AB",
70+
},
71+
],
72+
},
73+
"Type": "AWS::IAM::Policy",
74+
},
75+
}
76+
`;

0 commit comments

Comments
 (0)