SQS to trigger Step Functionsusing EventBridge Pipe with CDK Python

Simple pattern that triggers Step Functionsexecution from SQS using Amazon EventBridge Pipe and CDK using Python.

SQSAmazon EventBridgeStep Functions
from aws_cdk import (
    CfnOutput,
    Stack,
    aws_iam as iam,
    aws_pipes as pipes,
    aws_sqs as sqs,
    aws_stepfunctions as sfn,
)
from constructs import Construct
import json

class EventbridgePipesSqsToStepfunctionsCdkPythonStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        source = sqs.Queue(self, 'sqs-queue')

        target = sfn.StateMachine(self, "state-machine",
            definition=sfn.Pass(self, "start-state")
        )

        source_policy = iam.PolicyStatement(
                actions=['sqs:ReceiveMessage', 'sqs:DeleteMessage', 'sqs:GetQueueAttributes'],
                resources=[source.queue_arn],
                effect=iam.Effect.ALLOW,
        )

        target_policy = iam.PolicyStatement(
                actions=['states:StartExecution'],
                resources=[target.state_machine_arn],
                effect=iam.Effect.ALLOW,
        )

        pipe_role = iam.Role(self, 'pipe-role',
            assumed_by=iam.ServicePrincipal('pipes.amazonaws.com'),
        )

        pipe_role.add_to_policy(source_policy)
        pipe_role.add_to_policy(target_policy)

        pipe = pipes.CfnPipe(self, "pipe",
            role_arn=pipe_role.role_arn,
            source=source.queue_arn,
            source_parameters=pipes.CfnPipe.PipeSourceParametersProperty(
                sqs_queue_parameters=pipes.CfnPipe.PipeSourceSqsQueueParametersProperty(
                    batch_size=1
                )
            ),
            target=target.state_machine_arn,
            target_parameters=pipes.CfnPipe.PipeTargetParametersProperty(
                step_function_state_machine_parameters=pipes.CfnPipe.PipeTargetStateMachineParametersProperty(
                    invocation_type="FIRE_AND_FORGET"
                ),
                input_template=json.dumps({
                    "orderId": "<$.body.orderId>",
                    "customerId": "<$.body.customerId>",
                })
            )
        )

        # Output
        CfnOutput(self, "QUEUE_URL", value=source.queue_url)
        CfnOutput(self, "PIPE_ARN", value=pipe.attr_arn)
        CfnOutput(self, "STATE_MACHINE_ARN", value=target.state_machine_arn)

Download

git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/eventbridge-pipes-sqs-to-stepfunctions-cdk-python

Pattern repository

View on GitHub

Last updated on 26 Dec 2024

Edit this page