AWS CloudTrail to AWS Lambda to Amazon OpenSearch Serverless

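Create an Amazon OpenSearch Serverless collection, reachable through a VPC endpoint, that receives AWS CloudTrail events delivered to CloudWatch Logs and forwarded by an AWS Lambda function.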

AWS CloudTrail, AWS Lambda, Amazon OpenSearch
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

import json

import boto3
from aws_cdk import (
    Duration,
    Fn,
    RemovalPolicy,
    Stack,
    aws_cloudtrail as ct,
    aws_ec2 as ec2,
    aws_iam as iam,
    aws_lambda as lambda_,
    aws_logs as cwl,
    aws_logs_destinations as cwl_destinations,
    aws_opensearchserverless as opensearchserverless,
)
from aws_cdk.aws_s3_assets import Asset
from constructs import Construct

## Constants
INDEX_NAME = "cwl"
LOG_GROUP_NAME = "SvlCTCWL/svl_cloudtrail_logs"
COLLECTION_NAME = "ctcollection"
CWL_RETENTION = cwl.RetentionDays.THREE_DAYS

# ARN of an additional IAM role granted full data access to the collection
# (e.g. for OpenSearch Dashboards), for example
#   "arn:aws:iam::<AWS_ACCOUNT_NUMBER>:role/os-dashboard-role"
# The role itself needs IAM permission for the `aoss:*` actions on `arn:aws:aoss:*`.
# TODO: replace the placeholder below with your role ARN. While it still starts
# with "#TODO", the role is left out of the data access policy (see below).
ARN_IAM_ROLE = "#TODO:"

# Encryption policy: encrypt the collection with an AWS owned key.
# Compact separators keep the serialized policy free of insignificant whitespace.
ENCRYPTIONPOLICY = json.dumps(
    {
        "Rules": [
            {
                "ResourceType": "collection",
                "Resource": [f"collection/{COLLECTION_NAME}"],
            }
        ],
        "AWSOwnedKey": True,
    },
    separators=(",", ":"),
)

# Network policy: the collection API is only reachable through the VPC endpoint
# whose id is substituted for the "VPCENDPOINTID" marker by the stack, while the
# Dashboards endpoint is public.
NETWORKPOLICY = json.dumps(
    [
        {
            "Description": "Endpoint access for Lambda and for random querying",
            "SourceVPCEs": ["VPCENDPOINTID"],
            "Rules": [
                {
                    "ResourceType": "collection",
                    "Resource": [f"collection/{COLLECTION_NAME}"],
                }
            ],
            "AllowFromPublic": False,
        },
        {
            "Description": "Dashboards access",
            "AllowFromPublic": True,
            "Rules": [
                {
                    "ResourceType": "dashboard",
                    "Resource": [f"collection/{COLLECTION_NAME}"],
                }
            ],
        },
    ],
    separators=(",", ":"),
)

# Principals of the data access policy. The unset "#TODO" placeholder is not a
# valid principal, so it is omitted rather than producing a policy that fails to
# deploy; the "LAMBDAROLEARN" marker is substituted with the ingest Lambda's role
# ARN by the stack.
_DATA_POLICY_PRINCIPALS = (
    [ARN_IAM_ROLE] if ARN_IAM_ROLE and not ARN_IAM_ROLE.startswith("#TODO") else []
) + ["LAMBDAROLEARN"]

# Data access policy: full `aoss:*` permissions on the collection and on every
# index whose name starts with INDEX_NAME.
DATAPOLICY = json.dumps(
    [
        {
            "Description": "Endpoint access for Lambda and for random querying",
            "Rules": [
                {
                    "ResourceType": "collection",
                    "Resource": [f"collection/{COLLECTION_NAME}"],
                    "Permission": ["aoss:*"],
                },
                {
                    "ResourceType": "index",
                    "Resource": [f"index/{COLLECTION_NAME}/{INDEX_NAME}*"],
                    "Permission": ["aoss:*"],
                },
            ],
            "Principal": _DATA_POLICY_PRINCIPALS,
        }
    ],
    indent=2,
)

class OpensearchServerlessStack(Stack):
    """Stream CloudTrail events into an Amazon OpenSearch Serverless collection.

    Resources created by this stack:

    * a VPC, a security group, and an OpenSearch Serverless VPC endpoint placed
      in the VPC's public subnets;
    * network, encryption, and data access policies plus a TIMESERIES
      collection named ``COLLECTION_NAME``;
    * a VPC-attached Lambda function (handler
      ``build_os_client_and_bulk_ingest_logevents_handler``) that receives the
      collection endpoint host, region, and ``INDEX_NAME`` as environment
      variables;
    * a CloudWatch Logs group fed by a CloudTrail trail, and a subscription
      filter that forwards every event in that group to the Lambda.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ################################################################################
        # VPC
        vpc = ec2.Vpc(self, "SvlCTCWLVpc")
        es_sec_grp = ec2.SecurityGroup(
            self,
            "SvlCTCWLOpenSearchSecGrp",
            vpc=vpc,
            allow_all_outbound=True,
            security_group_name="SvlCTCWLSecGrp",
        )
        # This security group fronts the VPC endpoint. The only client this stack
        # creates for it is the VPC-attached Lambda below, so admit traffic from
        # the VPC's own CIDR instead of from any IPv4 address. Widen this peer if
        # peered networks or VPN clients also need to query the collection.
        vpc_peer = ec2.Peer.ipv4(vpc.vpc_cidr_block)
        es_sec_grp.add_ingress_rule(vpc_peer, ec2.Port.tcp(80))
        es_sec_grp.add_ingress_rule(vpc_peer, ec2.Port.tcp(443))

        endpoint = opensearchserverless.CfnVpcEndpoint(
            self,
            "SvlCTCWLEndpoint",
            name="svlctcwlendpoint",
            vpc_id=vpc.vpc_id,
            security_group_ids=[es_sec_grp.security_group_id],
            subnet_ids=[s.subnet_id for s in vpc.public_subnets],
        )

        ###############################################################################
        # Amazon OpenSearch Serverless collection
        # endpoint.attr_id is a CDK token; embedding it in a plain string is fine
        # because CDK renders the string as an Fn::Join resolved at deploy time.
        network_policy = NETWORKPOLICY.replace("VPCENDPOINTID", endpoint.attr_id)
        net = opensearchserverless.CfnSecurityPolicy(
            self,
            "SvlCTCWLNetwork",
            name="svlctcwlnetwork",
            description=f"Open access for {COLLECTION_NAME}",
            type="network",
            policy=network_policy,
        )
        print("Network Policy attached to OpenSearch Collection", net.name)
        sec = opensearchserverless.CfnSecurityPolicy(
            self,
            "SvlCTCWLEncryption",
            name="svlctcwlencryption",
            description=f"AWS Owned key policy for {COLLECTION_NAME}",
            type="encryption",
            policy=ENCRYPTIONPOLICY,
        )

        col = opensearchserverless.CfnCollection(
            self, COLLECTION_NAME, name=COLLECTION_NAME, type="TIMESERIES"
        )
        # Create both security policies before the collection: the encryption
        # policy must exist for the collection to be created, and ordering the
        # network policy first means the collection never exists without one.
        col.add_dependency(sec)
        col.add_dependency(net)

        ################################################################################
        # Lambda for subscription filter
        # NOTE(review): python3.9 is an old Lambda runtime; consider upgrading once
        # the handler's bundled dependencies have been verified on a newer one.
        subscription_filter_lambda = lambda_.Function(
            self,
            "build_os_client_and_bulk_ingest_logevents_handler",
            function_name="build_os_client_and_bulk_ingest_logevents_handler",
            runtime=lambda_.Runtime.PYTHON_3_9,
            code=lambda_.Code.from_asset(
                "lambda/build_os_client_and_bulk_ingest_logevents_handler"
            ),
            handler="index.handler",
            vpc=vpc,
            memory_size=1024,
            timeout=Duration.minutes(5),
        )

        # Pass the collection endpoint host to the Lambda. attr_collection_endpoint
        # is an unresolved token at synth time, so Python's str.replace() cannot
        # strip the scheme from it; split it in CloudFormation instead:
        # "https://<host>" -> ["", "<host>"] -> "<host>".
        collection_endpoint = Fn.select(
            1, Fn.split("https://", col.attr_collection_endpoint)
        )
        print(f"\n\nCollection endpoint: {collection_endpoint}\n")
        subscription_filter_lambda.add_environment(
            "COLLECTION_ENDPOINT", collection_endpoint
        )
        subscription_filter_lambda.add_environment("REGION", self.region)
        # NOTE(review): both grants below are account-wide wildcards; consider
        # scoping aoss to col.attr_arn and dropping logs:* once the handler's
        # actual API usage is confirmed.
        subscription_filter_lambda.add_to_role_policy(
            iam.PolicyStatement(actions=["aoss:*"], resources=["*"])
        )
        subscription_filter_lambda.add_to_role_policy(
            iam.PolicyStatement(actions=["logs:*"], resources=["*"])
        )
        subscription_filter_lambda.add_environment("INDEX_NAME", INDEX_NAME)

        #################################################################################
        # The data access policy needs the lambda role ARN to allow writing.
        dap = DATAPOLICY.replace(
            "LAMBDAROLEARN", subscription_filter_lambda.role.role_arn
        )
        dat = opensearchserverless.CfnAccessPolicy(
            self,
            "SvlCTCWLData",
            name="svlctcwldata",
            type="data",
            description=f"Data access for {COLLECTION_NAME}",
            policy=dap,
        )
        print("Data access for collection is created", dat.name)

        ################################################################################
        # CWL Log Group
        log_group = cwl.LogGroup(
            self,
            "SvlCTCWLLogGroup",
            log_group_name=LOG_GROUP_NAME,
            retention=CWL_RETENTION,
            removal_policy=RemovalPolicy.DESTROY,
        )

        ################################################################################
        # CloudTrail trail delivering events into the log group above
        trail = ct.Trail(
            self,
            "SvlCTCWLTrail",
            send_to_cloud_watch_logs=True,
            cloud_watch_log_group=log_group,
        )
        # node.path is public API; _physical_name is a protected CDK member.
        print("CloudTrail is created", trail.node.path)

        ################################################################################
        # Set up subscription filter: forward every log event to the Lambda
        subscription_filter = cwl.SubscriptionFilter(
            self,
            "SvlCTCWLSubFilter",
            log_group=log_group,
            destination=cwl_destinations.LambdaDestination(subscription_filter_lambda),
            filter_pattern=cwl.FilterPattern.all_events(),
        )
        print(
            "Subscription Filter for CloudTrail is created",
            subscription_filter.node.path,
        )

Download

git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/cloudtrail-streaming-to-opensearch-serverless-cdk

Pattern repository

View on GitHub

Last updated on 26 Dec 2024

Edit this page