Create a CloudWatch Logs subscription to a Lambda function, publish to an SNS topic, and deliver the logs to an SQS queue.
from constructs import Construct
from aws_cdk import (
Aspects,
Duration,
Stack,
CfnOutput,
aws_kms as kms,
aws_lambda as _lambda,
aws_logs as logs,
aws_sns as sns,
aws_sqs as sqs,
aws_sns_subscriptions as sns_sub,
aws_iam as iam,
aws_logs_destinations as log_destinations
)
import cdk_nag
class CwlogsLambdaSnsSqsStack(Stack):
    """CDK stack: CloudWatch Logs subscription -> Lambda, plus SNS topic -> SQS queue.

    Resources defined here:

    * a customer managed KMS key (with alias) used as the SNS topic master key,
    * an SNS topic,
    * a Lambda function with a custom execution role, subscribed to a
      CloudWatch log group through a subscription filter on the term "ERROR",
    * an SQS queue (with a dead letter queue) subscribed to the SNS topic,
    * cdk-nag ``AwsSolutionsChecks`` applied to the whole stack.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        """Define all resources of the stack.

        Args:
            scope: Parent construct of this stack.
            construct_id: Logical identifier of this stack within ``scope``.
            **kwargs: Forwarded unchanged to ``Stack.__init__``.
        """
        super().__init__(scope, construct_id, **kwargs)

        # Create KMS customer managed key for SNS server side encryption.
        key = kms.Key(
            self,
            "Key",
            enable_key_rotation=True,
            pending_window=Duration.days(7),
        )
        alias = key.add_alias("alias/queuecustomkey")

        # Create an SNS topic encrypted with the key (referenced via its alias).
        topic = sns.Topic(self, 'LambdaTopic', master_key=alias)

        # Create the Lambda execution role and the Lambda function.
        lambda_execution_role = iam.Role(
            self,
            "Custom Lambda Role",
            assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
        )
        subscribed_lambda = _lambda.Function(
            self,
            "SubscribedFunction",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("src/lambda"),
            handler="index.lambda_handler",
            timeout=Duration.seconds(2),
            # The topic ARN is handed to the function code via its environment.
            environment={'TOPIC_ARN': topic.topic_arn},
            role=lambda_execution_role,
        )

        # Create a CloudWatch log group and a log stream inside it.
        log_group = logs.LogGroup(self, "LogGroup")
        log_stream = logs.LogStream(
            self,
            "LogStream",
            log_group=log_group,
        )

        # Allow the function to create log groups/streams and write log events.
        subscribed_lambda.add_to_role_policy(iam.PolicyStatement(
            effect=iam.Effect.ALLOW,
            actions=[
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
            ],
            resources=["*"],
        ))

        # CDK Nag rule suppression for the wildcard permission above.
        # cdk-nag expects regex suppressions written as "/pattern/flags"; the
        # opening "/" delimiter was missing from this literal.
        # NOTE(review): the pattern targets log ARNs ending in ":*", while the
        # statement above uses a bare "*" resource - confirm the suppression
        # actually covers the finding cdk-nag reports.
        cdk_nag.NagSuppressions.add_resource_suppressions(
            lambda_execution_role,
            [{
                "id": "AwsSolutions-IAM5",
                "reason": "To create custom Lambda execution role to write to CloudWatch.",
                "applies_to": [{
                    "regex": "/Resource::arn:aws:logs:(.*):\\*$/g"
                }],
            }],
            apply_to_children=True,
        )

        # Allow the function to put log events into the log group created above.
        subscribed_lambda.add_to_role_policy(iam.PolicyStatement(
            effect=iam.Effect.ALLOW,
            actions=[
                "logs:PutLogEvents",
            ],
            resources=[log_group.log_group_arn],
        ))

        # Allow the function to publish to the SNS topic.
        subscribed_lambda.add_to_role_policy(iam.PolicyStatement(
            effect=iam.Effect.ALLOW,
            actions=[
                "sns:Publish",
            ],
            resources=[topic.topic_arn],
        ))

        # Allow the function to use the KMS key that encrypts the topic.
        subscribed_lambda.add_to_role_policy(iam.PolicyStatement(
            effect=iam.Effect.ALLOW,
            actions=[
                "kms:GenerateDataKey",
                "kms:Decrypt",
            ],
            resources=[key.key_arn],
        ))

        # Add the Lambda function as destination of a log subscription filter
        # that selects events containing the term "ERROR".
        log_group.add_subscription_filter(
            id="lambdaSubscription",
            destination=log_destinations.LambdaDestination(fn=subscribed_lambda),
            filter_pattern=logs.FilterPattern.all_terms("ERROR"),
        )

        # Create an SQS dead letter queue.
        dead_letter_queue = sqs.Queue(
            self,
            "DeadLetterQueue",
            enforce_ssl=True,
        )

        # CDK Nag rule suppression for the dead letter queue itself.
        cdk_nag.NagSuppressions.add_resource_suppressions(
            dead_letter_queue,
            [{
                "id": "AwsSolutions-SQS3",
                "reason": "This is a dead letter queue for the main queue.",
            }],
        )

        # Create the main SQS queue; messages received more than twice without
        # being deleted are moved to the dead letter queue.
        main_queue = sqs.Queue(
            self,
            "MainQueue",
            enforce_ssl=True,
            dead_letter_queue=sqs.DeadLetterQueue(
                max_receive_count=2,
                queue=dead_letter_queue,
            ),
        )

        # Subscribe the SQS queue to the SNS topic with raw message delivery.
        topic.add_subscription(sns_sub.SqsSubscription(
            main_queue,
            raw_message_delivery=True,
        ))

        # Run the cdk-nag AwsSolutions rule pack against every construct in the stack.
        Aspects.of(self).add(cdk_nag.AwsSolutionsChecks())

        # Stack outputs.
        CfnOutput(self, "LOG GROUP NAME", value=log_group.log_group_name)
        CfnOutput(self, "LOG STREAM NAME", value=log_stream.log_stream_name)
        CfnOutput(self, "QUEUE URL", value=main_queue.queue_url)
git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/cwlogs-lambda-sns-sqs-cdk