Create an API Gateway REST API that is protected with an AWS Lambda token authorizer and integrates with Amazon SQS
from aws_cdk import (
core as core,
aws_sqs as sqs,
aws_apigateway as apigateway,
aws_iam as iam,
aws_lambda as lambdafun
)
import os
import json
class ApigwSqsAuthStack(core.Stack):
def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Get config values from cdk.out.context
params = self.node.try_get_context("params")
sqs_queue_name = params["storage_sqs_queue_name"]
aws_region =params["aws_region_name"]
lambda_func_arn = params["authorizer_lambda_arn"]
# If Lambda function ARN is provided, create the function from ARN.
authorizer_function = lambdafun.Function.from_function_arn(self,
"Function",
function_arn = lambda_func_arn
)
# Create a SQS queue for storing message that APIGateway will receive.
sqs_queue = sqs.Queue(self,
'storagesqs',
#queue_name = os.getenv("storage_sqs_queue_name","apigwstoragequeue"),
queue_name = sqs_queue_name,
encryption = sqs.QueueEncryption.KMS_MANAGED
)
# Create an inline Policy document that grants permission to access SQS queue
sqs_inline_policy = iam.Policy(self, "sqs-policy-for-apigw",
statements=[iam.PolicyStatement(
actions=["sqs:SendMessage"],
resources=[sqs_queue.queue_arn]
)]
)
# Create an IAM apigateway execution role.
apigw_role = iam.Role(self, "apigw_role",
assumed_by=iam.CompositePrincipal(
iam.ServicePrincipal("apigateway.amazonaws.com"),
iam.ServicePrincipal("lambda.amazonaws.com")
),
description="This is the role for apigateway to access SQS queue."
)
# Attach policy to the role.
apigw_role.attach_inline_policy(sqs_inline_policy)
# Create new Integration Options that can for adding request paramaters and templates.
integration_options = apigateway.IntegrationOptions(
credentials_role = apigw_role,
request_parameters={
'integration.request.header.Content-Type' : "'application/x-www-form-urlencoded'"
},
passthrough_behavior=apigateway.PassthroughBehavior.NEVER,
request_templates={
"application/json": "Action=SendMessage&MessageBody=$input.body"
},
integration_responses=[apigateway.IntegrationResponse(status_code="200")],
)
# Create RequestAuthorizer
authorizer = apigateway.TokenAuthorizer(self, "sqsRequestAuthorizer",
authorizer_name = 'SQSRequestAuthorizer',
#identity_sources=[apigateway.IdentitySource.header("Authorization")],
handler = authorizer_function
)
# Create new apigateway rest api.
apigw = apigateway.RestApi(self, 'sqsapi-withauth',
description ='This api will insert data into SQS ' + sqs_queue.queue_name,
endpoint_types =[apigateway.EndpointType.REGIONAL]
)
# Create a apigateway resource method with authorizer needed
apigw_method = apigw.root.add_resource('sqs').add_method('POST',
apigateway.AwsIntegration(
region = aws_region,
service = 'sqs',
integration_http_method = 'POST',
path = sqs_queue.queue_name,
options = integration_options
),
method_responses=[apigateway.MethodResponse(status_code="200")],
authorization_type=apigateway.AuthorizationType.CUSTOM,
authorizer=authorizer
)
git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/cdk-apigw-authorizer-sqs