Amazon SQS FIFO queue with controlled delay

Amazon SQS FIFO queue with delay using AWS Lambda and Amazon DynamoDB

Amazon SQSAWS LambdaAmazon SQS
from aws_cdk import (
    Duration,
    Stack,
    aws_sqs as sqs,
    aws_lambda as lambda_,
    aws_iam as iam,
    aws_cloudwatch as cloudwatch,
    aws_cloudwatch_actions as cloudwatch_actions,
    aws_dynamodb as dynamodb,
    aws_sns as sns,
    aws_sns_subscriptions as subscriptions,
    CfnOutput as cfnoutput 
)
from constructs import Construct

class DelayFifoQueueTestStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)    
        
        # create a dead letter queue called primary_queue_dlq
        primary_queue_dlq = sqs.Queue(self, "DelayFifoQueueDlq",
                                      visibility_timeout=Duration.seconds(60),
                                      fifo=True,
                                      content_based_deduplication=True
                                      )

        # create an initial primary SQS FIFO queue with a visibility timeout of 60 seconds
        primary_queue = sqs.Queue(self, "DelayFifoQueue",
                          visibility_timeout=Duration.seconds(60),
                          fifo=True,
                          content_based_deduplication=True,
                          dead_letter_queue=sqs.DeadLetterQueue(
                            max_receive_count=5,
                            queue=primary_queue_dlq
                          )
        )
        
        # create a downstream SQS FIFO queue with a visibility timeout of 60 seconds
        downstream_queue = sqs.Queue(self, "DelayFifoQueueDownstream",
                          visibility_timeout=Duration.seconds(60),
                          fifo=True,
                          content_based_deduplication=True
                          )
        
        # create a dynamodb table to store customer id and created timestamp
        customer_table = dynamodb.Table(self, "CustomerTable",
                                        table_name="DelayFifoQueueCustomerTable",
                                        partition_key=dynamodb.Attribute(name="customer_id", type=dynamodb.AttributeType.STRING),
                                        time_to_live_attribute="ttl"
                                        )
        
        # create a Lambda function to process messages from the queue
        process_queue_function = lambda_.Function(self, "ProcessMessageLambda",
                                                    runtime=lambda_.Runtime.PYTHON_3_9,
                                                    code=lambda_.Code.from_asset("lambda"),
                                                    handler="process_message.handler",
                                                    environment={
                                                        "QUEUE_URL": downstream_queue.queue_url,
                                                        "TABLE_NAME": customer_table.table_name
                                                    })

        # create an SNS topic to send notifications when primary_queue_dlq is not empty
        dlq_size_sns_topic = sns.Topic(self, "PrimaryQueueDqlSizeAlertTopic")
        dlq_size_sns_topic.add_subscription(subscriptions.EmailSubscription("[email protected]"))

        # create a CloudWatch alarm if primary_queue_dlq is not empty
        dlq_size_alarm = cloudwatch.Alarm(self, "PrimaryQueueDqlSizeAlert",
                                            metric=cloudwatch.Metric(metric_name="ApproximateNumberOfMessagesVisible",
                                                                    namespace="AWS/SQS",
                                                                    dimensions_map={
                                                                        "QueueName": primary_queue_dlq.queue_name
                                                                        },
                                                                    statistic="Sum",
                                                                    period=Duration.seconds(60)
                                                                    ),
                                            evaluation_periods=1,
                                            threshold=0,
                                            comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD,
                                            treat_missing_data=cloudwatch.TreatMissingData.NOT_BREACHING
                                            )
        dlq_size_alarm.add_alarm_action(
            cloudwatch_actions.SnsAction(
                topic = dlq_size_sns_topic
            )
        )


        # create Lambda execution role that has access to receive messages from primary_queue queue
        process_queue_function.add_to_role_policy(iam.PolicyStatement(
            actions=["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "sqs:GetQueueUrl"],
            resources=[primary_queue.queue_arn]
        ))

        # add to Lambda execution role policy to send messages to the downstream_queue queue
        process_queue_function.add_to_role_policy(iam.PolicyStatement(
            actions=["sqs:SendMessage"],
            resources=[downstream_queue.queue_arn]
        ))
                
        lambda_.EventSourceMapping(self, "ProcessMessageLambdaEventSourceMapping",
                                    event_source_arn=primary_queue.queue_arn,
                                    target=process_queue_function,
                                    batch_size=10,
                                    report_batch_item_failures=True
        )
      
        # give permissions for the  function to read and write to the dynamodb table
        customer_table.grant_read_write_data(process_queue_function)

        cfnoutput(self, "DelayFifoQueueURL", value=primary_queue.queue_url)

Download

git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/sqs-fifo-delayed-queue-dynamodb

Pattern repository

View on GitHub

Last updated on 26 Dec 2024

Edit this page