Kinesis Firehose Data transformation with Lambda

Transform incoming source data and deliver the transformed data to destinations.

Amazon Kinesis Firehose, AWS Lambda, Amazon S3
from xml.etree.ElementTree import ProcessingInstruction
from aws_cdk import (
    App,
    Stack,
    CfnOutput,
    RemovalPolicy,
    aws_iam as iam,
    aws_s3 as s3,
    aws_kms as kms,
    aws_events as events,
    aws_kinesisfirehose as firehose,
    aws_events_targets as targets,
    aws_lambda as lambda_,
    Duration
)
from constructs import Construct

class FirehoseTransformationCdkStack(Stack):
    """Firehose delivery stream that transforms records with Lambda and lands them in S3.

    Resources defined by this stack:

    * an IAM role plus a Python 3.9 Lambda function (code loaded from the
      ``src`` asset directory) used as the Firehose record processor;
    * a KMS key used both for Firehose's S3 object encryption and as the
      destination bucket's default encryption key;
    * the destination S3 bucket (destroyed, with its objects, on stack removal);
    * an IAM role assumed by Firehose with S3, KMS and Lambda permissions;
    * the ``AWS::KinesisFirehose::DeliveryStream`` itself.

    Two CloudFormation outputs expose the bucket name and the delivery
    stream name.
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        """Define all resources of the stack.

        Args:
            scope: Parent construct (normally the CDK ``App``).
            construct_id: Logical identifier of this stack within ``scope``.
            **kwargs: Forwarded unchanged to ``Stack.__init__``.
        """
        super().__init__(scope, construct_id, **kwargs)

        # Execution role for the transformation Lambda.
        # NOTE(review): the fixed role_name means this stack can only be
        # deployed once per account, since IAM role names are account-wide.
        lambda_role = iam.Role(scope=self, id='cdk-lambda-role',
            assumed_by=iam.ServicePrincipal('lambda.amazonaws.com'),
            role_name='cdk-lambda-role',
            managed_policies=[
                iam.ManagedPolicy.from_aws_managed_policy_name(
                    'service-role/AWSLambdaBasicExecutionRole')
            ]
        )

        # NOTE(review): these Firehose actions are granted on every stream
        # ("*"). The handler in `src` is not visible from this file, so it is
        # unclear whether the function needs them at all; confirm and scope down.
        lambda_role.add_to_policy(iam.PolicyStatement(
            effect=iam.Effect.ALLOW,
            resources=["*"],
            actions=[
                "firehose:DescribeDeliveryStream",
                "firehose:PutRecord",
                "firehose:StartDeliveryStreamEncryption",
                "firehose:PutRecordBatch",
                "firehose:ListDeliveryStreams"
            ]
        ))

        # Transformation Lambda function invoked by Firehose for each batch.
        transformation_lambda = lambda_.Function(
            self,
            id='TransformationLambdaCdk',
            runtime=lambda_.Runtime.PYTHON_3_9,
            code=lambda_.Code.from_asset('src'),
            handler='lambda_function.lambda_handler',
            role=lambda_role,
            timeout=Duration.seconds(90)
        )

        # KMS key Firehose uses to encrypt the objects it delivers to S3.
        firehose_kms_key = kms.Key(self, 'FirehoseKMSKey')

        # S3 bucket used as the Firehose destination. The bucket's default
        # encryption reuses the Firehose key; without an explicit key CDK
        # would create a second KMS key that Firehose never uses.
        destination_bucket = s3.Bucket(
            self,
            "Destination-Bucket",
            removal_policy=RemovalPolicy.DESTROY,
            auto_delete_objects=True,
            encryption=s3.BucketEncryption.KMS,
            encryption_key=firehose_kms_key
        )

        # Role that the Firehose service assumes to deliver and transform data.
        firehose_role = iam.Role(
            self,
            "firehose-role",
            assumed_by=iam.ServicePrincipal("firehose.amazonaws.com")
        )

        # S3 permissions on the destination bucket and its objects.
        firehose_role.add_to_policy(iam.PolicyStatement(
            effect=iam.Effect.ALLOW,
            resources=[destination_bucket.bucket_arn, destination_bucket.bucket_arn + "/*"],
            actions=["s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject"],
        ))

        # KMS permissions so Firehose can encrypt delivered objects.
        firehose_role.add_to_policy(iam.PolicyStatement(
            effect=iam.Effect.ALLOW,
            resources=[firehose_kms_key.key_arn],
            actions=["kms:Decrypt",
               "kms:GenerateDataKey"],
        ))

        # Lambda permissions so Firehose can call the transformation function.
        firehose_role.add_to_policy(iam.PolicyStatement(
            effect=iam.Effect.ALLOW,
            resources=[transformation_lambda.function_arn],
            actions=["lambda:InvokeFunction",
               "lambda:GetFunctionConfiguration"],
        ))

        # Kinesis Firehose delivery stream: buffer for 60 s, run the Lambda
        # processor, then write uncompressed, KMS-encrypted objects to S3.
        firehose_delivery_stream = firehose.CfnDeliveryStream(
            self, "firehose-delivery-stream",
            extended_s3_destination_configuration=firehose.CfnDeliveryStream.ExtendedS3DestinationConfigurationProperty(
                bucket_arn=destination_bucket.bucket_arn,
                buffering_hints=firehose.CfnDeliveryStream.BufferingHintsProperty(
                    interval_in_seconds=60
                ),
                encryption_configuration=firehose.CfnDeliveryStream.EncryptionConfigurationProperty(
                    kms_encryption_config=firehose.CfnDeliveryStream.KMSEncryptionConfigProperty(
                        awskms_key_arn=firehose_kms_key.key_arn
                    )
                ),
                compression_format="UNCOMPRESSED",
                role_arn=firehose_role.role_arn,
                processing_configuration=firehose.CfnDeliveryStream.ProcessingConfigurationProperty(
                    enabled=True,
                    processors=[firehose.CfnDeliveryStream.ProcessorProperty(
                        type="Lambda",
                        parameters=[firehose.CfnDeliveryStream.ProcessorParameterProperty(
                            parameter_name="LambdaArn",
                            parameter_value=transformation_lambda.function_arn
                        )]
                    )]
                )
            )
        )

        # The stream only references the role ARN, which does not order it
        # after the role's inline policy. Depend on the whole role construct so
        # the S3/KMS/Lambda permissions exist before the stream is created.
        firehose_delivery_stream.node.add_dependency(firehose_role)

        # Outputs
        CfnOutput(self, "S3 Destination Bucket Name", description="S3 Destination Bucket Name", value=destination_bucket.bucket_name)
        # `delivery_stream_name` on the L1 construct only echoes the (unset)
        # input property; `ref` resolves to the generated stream name.
        CfnOutput(self, "Kinesis Firehose Delivery Stream Name", description="Kinesis Firehose Delivery Stream Name", value=firehose_delivery_stream.ref)


# CDK entry point: build the app, attach the single stack, and synthesize
# the CloudFormation template.
app = App()
FirehoseTransformationCdkStack(
    scope=app,
    construct_id="FirehoseTransformationCdkStack",
)
app.synth()

Download

git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/firehose-transformation-cdk

Pattern repository

View on GitHub

Last updated on 26 Dec 2024

Edit this page