Split SQS Messages into multiple SQS queus using EventBridge Pipe with CDK Python

Simple pattern that split messages in a SQS queue into multiple SQS queues using EventBridge Pipe and Event Bus. Implemented using CDK using Python.

SQSeventbridge-pipesAWS LambdaSQS
from aws_cdk import (
    CfnOutput,
    Stack,
    aws_events as events,
    aws_events_targets as event_target,
    aws_iam as iam,
    aws_lambda as _lambda,
    aws_pipes as pipes,
    aws_sqs as sqs,
)
from constructs import Construct

class EventbridgePipesSqsToMultipleSqsStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create queues
        source_queue = sqs.Queue(self, 'source-sqs-queue')
        target_queue_order_created = sqs.Queue(self, 'target-sqs-queue-order-created')
        target_queue_order_updated = sqs.Queue(self, 'target-sqs-queue-order-updated')

        # Create event bus
        event_bus = events.EventBus(self, 'event-bus')

        # Add event bus rule and target for order created queue
        event_rule_order_created = events.Rule(self, 'event-rule-order-created',
            event_bus=event_bus,
            event_pattern=events.EventPattern(
                source=['myapp.orders'],
                detail={
                        'type': ['OrderCreated']
                    },
            ),
        );

        event_rule_order_created.add_target(event_target.SqsQueue(target_queue_order_created, message=events.RuleTargetInput.from_event_path('$.detail')))

        # Add event bus rule and target for order updated queue
        event_rule_order_updated = events.Rule(self, 'event-rule-order-updated',
            event_bus=event_bus,
            event_pattern=events.EventPattern(
                source=['myapp.orders'],
                detail={
                        'type': ['OrderUpdated']
                    },
            ),
        );

        event_rule_order_updated.add_target(event_target.SqsQueue(target_queue_order_updated, message=events.RuleTargetInput.from_event_path('$.detail')))

        # Create Pipe enrichment Lambda function
        enrichment_lambda = _lambda.Function(
            self, 'enrichment-lambda',
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset('lambda'),
            handler='index.handler',
            retry_attempts=0,
        )

        # Create Pipe policies and role
        pipe_source_policy = iam.PolicyStatement(
                actions=['sqs:ReceiveMessage', 'sqs:DeleteMessage', 'sqs:GetQueueAttributes'],
                resources=[source_queue.queue_arn],
                effect=iam.Effect.ALLOW,
        )

        pipe_enrichment_policy = iam.PolicyStatement(
                actions=['lambda:InvokeFunction'],
                resources=[enrichment_lambda.function_arn],
                effect=iam.Effect.ALLOW,
        )

        pipe_target_policy = iam.PolicyStatement(
                actions=['events:PutEvents'],
                resources=[event_bus.event_bus_arn],
                effect=iam.Effect.ALLOW,
        )

        pipe_role = iam.Role(self, 'pipe-role',
            assumed_by=iam.ServicePrincipal('pipes.amazonaws.com'),
        )

        pipe_role.add_to_policy(pipe_source_policy)
        pipe_role.add_to_policy(pipe_target_policy)
        pipe_role.add_to_policy(pipe_enrichment_policy)

        # Create Pipe
        pipe = pipes.CfnPipe(self, "pipe",
            role_arn=pipe_role.role_arn,
            source=source_queue.queue_arn,
            source_parameters=pipes.CfnPipe.PipeSourceParametersProperty(
                sqs_queue_parameters=pipes.CfnPipe.PipeSourceSqsQueueParametersProperty(
                    batch_size=1,
                )
            ),
            enrichment=enrichment_lambda.function_arn,
            target=event_bus.event_bus_arn,
            target_parameters=pipes.CfnPipe.PipeTargetParametersProperty(
                event_bridge_event_bus_parameters=pipes.CfnPipe.PipeTargetEventBridgeEventBusParametersProperty(
                    source="myapp.orders",
                )
            )
        )

        # Output
        CfnOutput(self, "SourceQueueUrl", value=source_queue.queue_url)
        CfnOutput(self, "TargetQueueOrderCreated", value=target_queue_order_created.queue_url)
        CfnOutput(self, "TargetQueueOrderUpdated", value=target_queue_order_updated.queue_url)
        CfnOutput(self, "PIPE_ARN", value=pipe.attr_arn)

Download

git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/eventbridge-pipes-sqs-to-multiple-sqs

Pattern repository

View on GitHub

Last updated on 26 Dec 2024

Edit this page