Create an HTTP API that routes requests to SQS and then to Lambda
import os
from aws_cdk import (
Duration,
Stack,
CfnOutput,
aws_sqs as _sqs,
aws_apigatewayv2 as _apigwv2,
aws_lambda as _lambda,
aws_iam as _iam,
)
from aws_cdk.aws_lambda_event_sources import SqsEventSource
from constructs import Construct
class ApigwHttpApiSqsLambdaStack(Stack):
    """Stack wiring an API Gateway HTTP API to an SQS queue consumed by Lambda.

    ``POST /send`` on the HTTP API is forwarded to the queue through the
    ``SQS-SendMessage`` service integration, and a Lambda function is attached
    to the queue through an SQS event source.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        """Create the queue, the consumer function, the HTTP API and the output.

        Args:
            scope: Parent construct of this stack.
            construct_id: Identifier of this stack within ``scope``.
            **kwargs: Forwarded unchanged to ``Stack``.
        """
        super().__init__(scope, construct_id, **kwargs)

        # Take account/region from the stack itself rather than from
        # os.environ["CDK_DEFAULT_*"]: the environment lookup raises KeyError
        # whenever the app is instantiated outside the CDK CLI and ignores an
        # ``env=`` passed through kwargs. For environment-agnostic stacks these
        # are tokens that resolve to the deployment account/region.
        self._account_id = self.account
        self._region = self.region

        self._queue = _sqs.Queue(
            self, "ApigwV2SqsLambdaQueue",
            # 300 s, longer than the 3-minute timeout of the handler below.
            visibility_timeout=Duration.seconds(300),
        )
        self._sqs_event_source = SqsEventSource(self._queue)

        # NOTE(review): Python 3.8 has reached end of support on AWS Lambda;
        # consider moving to a newer runtime available in the installed
        # aws-cdk-lib version -- confirm against the handler code in ``src``.
        self._fn = _lambda.Function(
            self, 'SqsMessageHandler',
            runtime=_lambda.Runtime.PYTHON_3_8,
            handler='app.handler',
            code=_lambda.Code.from_asset(path='src'),
            timeout=Duration.minutes(3),
            memory_size=128,
            environment={
                'REGION': self._region,
                'ACCOUNT_ID': self._account_id,
            },
        )
        self._fn.add_event_source(self._sqs_event_source)

        # Order matters: the role needs the queue, and the route needs both
        # the API and the role.
        self._http_api = self._create_apigw_v2()
        self._integration_role = self._create_apigw_to_sqs_role()
        self._send_msg_route = self._create_sqs_send_msg_route()

        # Enable Auto Deploy
        self._stage = self._create_stage()

        # Outputs
        CfnOutput(
            self, "API Endpoint",
            description="API Endpoint",
            value=self._http_api.attr_api_endpoint,
        )

    def _create_apigw_to_sqs_role(self) -> _iam.Role:
        """Create the role API Gateway assumes to send messages to the queue.

        Returns:
            The IAM role used as ``credentials_arn`` of the SQS integration.
        """
        _role = _iam.Role(
            self, 'ApiGwV2ToSqsRole',
            assumed_by=_iam.ServicePrincipal('apigateway.amazonaws.com'),
        )
        _role.add_managed_policy(
            _iam.ManagedPolicy.from_managed_policy_arn(
                self, 'ApiGwPushCwPolicy',
                'arn:aws:iam::aws:policy/service-role/'
                'AmazonAPIGatewayPushToCloudWatchLogs',
            )
        )
        _role.attach_inline_policy(
            _iam.Policy(
                self,
                'ApiGwV2ToSqsInlinePolicy',
                statements=[
                    _iam.PolicyStatement(
                        # Least privilege: the only integration using this
                        # role is SQS-SendMessage, so receive/delete/purge
                        # permissions are not granted to the public-facing API.
                        actions=['sqs:SendMessage'],
                        resources=[self._queue.queue_arn],
                    )
                ],
            )
        )
        return _role

    def _create_apigw_v2(self) -> _apigwv2.CfnApi:
        """Create the HTTP API with a permissive CORS configuration.

        Returns:
            The ``CfnApi`` resource named ``HttpToSqs``.
        """
        return _apigwv2.CfnApi(
            self, 'HttpToSqs',
            cors_configuration=_apigwv2.CfnApi.CorsProperty(
                allow_credentials=False,
                allow_headers=["*"],
                allow_methods=['GET', 'POST', 'PUT', 'DELETE'],
                allow_origins=["*"],
                max_age=43200,
            ),
            name='HttpToSqs',
            protocol_type='HTTP',
        )

    def _create_stage(self) -> _apigwv2.CfnStage:
        """Create the ``$default`` stage of the HTTP API with auto-deploy on.

        Returns:
            The ``CfnStage`` resource.
        """
        _stage = _apigwv2.CfnStage(
            self, 'HttpToSqsStage',
            api_id=self._http_api.ref,
            stage_name='$default',
            auto_deploy=True,
        )
        return _stage

    def _create_sqs_send_msg_route(self) -> _apigwv2.CfnRoute:
        """Create the ``POST /send`` route and its SQS-SendMessage integration.

        Returns:
            The ``CfnRoute`` resource targeting the new integration.
        """
        _integ = _apigwv2.CfnIntegration(
            self, 'HttpApiIntegSqsSendMessage',
            api_id=self._http_api.ref,
            integration_type='AWS_PROXY',
            integration_subtype='SQS-SendMessage',
            payload_format_version='1.0',
            request_parameters={
                'QueueUrl': self._queue.queue_url,
                # Message body is mapped from the request body's
                # ``MessageBody`` field.
                'MessageBody': '$request.body.MessageBody',
            },
            credentials_arn=self._integration_role.role_arn,
        )
        return _apigwv2.CfnRoute(
            self, 'HttpApiRouteSqsSendMsg',
            api_id=self._http_api.ref,
            route_key='POST /send',
            target=f'integrations/{_integ.ref}',
        )
git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/apigw-http-api-sqs-lambda-cdk