SQS to EventBridge Bus using EventBridge Pipes

This pattern uses SQS and triggers EventBridge events directly with optional filtering supported.

SQSEventBridge PipesEventBridge Bus
from aws_cdk import (
    CfnOutput,
    Stack,
    aws_events as events,
    aws_iam as iam,
    aws_pipes as pipes,
    aws_sqs as sqs,
)
from constructs import Construct
import json

class EventbridgePipesSqsToEventbridgeCdkPythonStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        source = sqs.Queue(self, 'sqs-queue')
        target = events.EventBus(self, 'event-bus')

        source_policy = iam.PolicyStatement(
                actions=['sqs:ReceiveMessage', 'sqs:DeleteMessage', 'sqs:GetQueueAttributes'],
                resources=[source.queue_arn],
                effect=iam.Effect.ALLOW,
        )

        target_policy = iam.PolicyStatement(
                actions=['events:PutEvents'],
                resources=[target.event_bus_arn],
                effect=iam.Effect.ALLOW,
        )

        pipe_role = iam.Role(self, 'pipe-role',
            assumed_by=iam.ServicePrincipal('pipes.amazonaws.com'),
        )

        pipe_role.add_to_policy(source_policy)
        pipe_role.add_to_policy(target_policy)

        pipe = pipes.CfnPipe(self, "pipe",
            role_arn=pipe_role.role_arn,
            source=source.queue_arn,
            source_parameters=pipes.CfnPipe.PipeSourceParametersProperty(
                sqs_queue_parameters=pipes.CfnPipe.PipeSourceSqsQueueParametersProperty(
                    batch_size=5,
                    maximum_batching_window_in_seconds=60
                )
            ),
            target=target.event_bus_arn,
            target_parameters=pipes.CfnPipe.PipeTargetParametersProperty(
                event_bridge_event_bus_parameters=pipes.CfnPipe.PipeTargetEventBridgeEventBusParametersProperty(
                    detail_type="OrderCreated",
                    source="myapp.orders",
                ),
                input_template=json.dumps({
                    "orderId": "<$.body.orderId>",
                    "customerId": "<$.body.customerId>",
                })
            )
        )
        
        # Output
        CfnOutput(self, "QUEUE_URL", value=source.queue_url)
        CfnOutput(self, "PIPE_ARN", value=pipe.attr_arn)

Download

git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/eventbridge-pipes-sqs-to-eventbridge-cdk-python

Pattern repository

View on GitHub

Last updated on 26 Dec 2024

Edit this page