EventBridge to Kinesis Firehose to S3

Forward custom events to Amazon Simple Storage Service (Amazon S3) using EventBridge rules and Amazon Kinesis Data Firehose.

EventBridgeKinesis FirehoseS3
from aws_cdk import (
    App,
    Stack,
    CfnOutput,
    RemovalPolicy,
    aws_iam as iam,
    aws_s3 as s3,
    aws_kms as kms,
    aws_events as events,
    aws_kinesisfirehose as firehose,
    aws_events_targets as targets,
    aws_logs as logs
)
from constructs import Construct


class EventBridgeKinesisFirehoseStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        #KMS Key to encrypt firehose delivery stream
        firehose_kms_key = kms.Key(self, 'FirehoseKMSKey')

        #S3 bucket to be used as firehose destination
        destination_bucket = s3.Bucket(
            self,
            "Destination-Bucket",
            removal_policy=RemovalPolicy.DESTROY,
            auto_delete_objects=True,
            encryption=s3.BucketEncryption.KMS
        )

        #firehose role
        firehose_role = iam.Role(self, "firehose-role", assumed_by=iam.ServicePrincipal("firehose.amazonaws.com"))
        firehose_role_arn = firehose_role.role_arn

        #add s3 permissions to role
        firehose_role.add_to_policy(iam.PolicyStatement(
            effect=iam.Effect.ALLOW,
            resources=[destination_bucket.bucket_arn, destination_bucket.bucket_arn + "/*"],
            actions=["s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject"],
        ))

        
        #add KMS permission to role
        firehose_role.add_to_policy(iam.PolicyStatement(
            effect=iam.Effect.ALLOW,
            resources=[firehose_kms_key.key_arn],
            actions=["kms:Decrypt",
               "kms:GenerateDataKey"],
        ))

        #Kinesis Firehose
        firehose_delivery_stream = firehose.CfnDeliveryStream(
            self, "firehose-delivery-stream",
            s3_destination_configuration=firehose.CfnDeliveryStream.S3DestinationConfigurationProperty(
                bucket_arn=destination_bucket.bucket_arn,
                buffering_hints=firehose.CfnDeliveryStream.BufferingHintsProperty(
                    interval_in_seconds=60
                ),
                encryption_configuration=firehose.CfnDeliveryStream.EncryptionConfigurationProperty(
                    kms_encryption_config=firehose.CfnDeliveryStream.KMSEncryptionConfigProperty(
                        awskms_key_arn=firehose_kms_key.key_arn
                    )
                ),
                compression_format="UNCOMPRESSED",
                role_arn=firehose_role_arn
            )
        )

        # Custom EventBridge Bus
        custom_bus = events.EventBus(
            self, "bus",
            event_bus_name="test-bus-cdk"
        )

        # EventBridge Rule
        rule = events.Rule(
            self, "rule",
            event_bus=custom_bus,

        )

        # Event Pattern to filter events
        rule.add_event_pattern(
            source=["my-application"],
            detail_type=["message"]
        )

        # Kinesis Firehose as target for Eventbridge Rue
        rule.add_target(targets.KinesisFirehoseStream(firehose_delivery_stream))

        #Output
        CfnOutput(self, "S3 Destination Bucket Name", description="S3 Destination Bucket Name", value=destination_bucket.bucket_name)


       
app = App()
EventBridgeKinesisFirehoseStack(app, "EventBridgeKinesisFirehoseExample")
app.synth()

Download

git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/eventbridge-firehose-cdk

Pattern repository

View on GitHub

Last updated on 26 Dec 2024

Edit this page