Amazon API Gateway to SQS to Lambda

Create an HTTP API that routes to SQS then Lambda

API Gateway HTTP APISQSAWS Lambda
import os
from aws_cdk import (
    Duration,
    Stack,
    CfnOutput,
    aws_sqs as _sqs,
    aws_apigatewayv2 as _apigwv2,
    aws_lambda as _lambda,
    aws_iam as _iam,
)

from aws_cdk.aws_lambda_event_sources import SqsEventSource

from constructs import Construct


class ApigwHttpApiSqsLambdaStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        
        self._account_id = os.environ["CDK_DEFAULT_ACCOUNT"]
        self._region = os.environ["CDK_DEFAULT_REGION"]

        self._queue = _sqs.Queue(
            self, "ApigwV2SqsLambdaQueue",
            visibility_timeout=Duration.seconds(300),
        )

        self._sqs_event_source = SqsEventSource(self._queue)

        self._fn = _lambda.Function(self, 'SqsMessageHandler',
            runtime=_lambda.Runtime.PYTHON_3_8,
            handler='app.handler',
            code=_lambda.Code.from_asset(
                path='src'
            ),
            timeout=Duration.minutes(3),
            memory_size=128,
            environment={
                'REGION': self._region,
                'ACCOUNT_ID': self._account_id
            },
        )

        self._fn.add_event_source(self._sqs_event_source)

        self._http_api = self._create_apigw_v2()

        self._integration_role = self._create_apigw_to_sqs_role()

        self._send_msg_route = self._create_sqs_send_msg_route()

        # Enable Auto Deploy
        self._stage = self._create_stage()

        # Outputs
        CfnOutput(self, "API Endpoint", 
            description="API Endpoint", 
            value=self._http_api.attr_api_endpoint
        )
    
    def _create_apigw_to_sqs_role(self):
        _role = _iam.Role(
            self, 'ApiGwV2ToSqsRole',
            assumed_by=_iam.ServicePrincipal('apigateway.amazonaws.com')
        )

        _role.add_managed_policy(
            _iam.ManagedPolicy.from_managed_policy_arn(
                self, 'ApiGwPushCwPolicy',
                'arn:aws:iam::aws:policy/service-role/'\
                'AmazonAPIGatewayPushToCloudWatchLogs'
            )
        )
        _role.attach_inline_policy(
            _iam.Policy(
                self,
                'ApiGwV2ToSqsInlinePolicy',
                statements=[
                    _iam.PolicyStatement(
                        actions=[
                            'sqs:SendMessage',
                            'sqs:ReceiveMessage',
                            'sqs:PurgeQueue',
                            'sqs:DeleteMessage',
                        ],
                        resources=[self._queue.queue_arn]
                    )
                ]
            )
        )
        return _role

    def _create_apigw_v2(self):
        return _apigwv2.CfnApi(
            self,'HttpToSqs',
            cors_configuration=_apigwv2.CfnApi.CorsProperty(
                allow_credentials=False,
                allow_headers=["*"],
                allow_methods=['GET', 'POST', 'PUT', 'DELETE'],
                allow_origins=["*"],
                max_age=43200,
            ),
            name='HttpToSqs',
            protocol_type='HTTP'
        )

    def _create_stage(self):
        _stage = _apigwv2.CfnStage(self, 'HttpToSqsStage',
            api_id=self._http_api.ref,
            stage_name='$default',
            auto_deploy=True
        )
        return _stage

    def _create_sqs_send_msg_route(self):
        _integ =_apigwv2.CfnIntegration(
            self, 'HttpApiIntegSqsSendMessage',
            api_id=self._http_api.ref,
            integration_type='AWS_PROXY',
            integration_subtype='SQS-SendMessage',
            payload_format_version='1.0',
            request_parameters={
                'QueueUrl': self._queue.queue_url,
                'MessageBody': '$request.body.MessageBody',
            },
            credentials_arn=self._integration_role.role_arn,
        )
        return _apigwv2.CfnRoute(
            self, 'HttpApiRouteSqsSendMsg',
            api_id=self._http_api.ref,
            route_key='POST /send',
            target='/'.join(['integrations',_integ.ref]),
        )

Download

git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/apigw-http-api-sqs-lambda-cdk

Pattern repository

View on GitHub

Last updated on 26 Dec 2024

Edit this page