EventBridge to Firehose Delivery Stream to S3

Apply Firehose DynamicPartitioningConfiguration to parse the input message to extract department value to use it to create S3 partition

EventBridgeKinesis FirehoseS3
using Amazon.CDK;
using Constructs;
using Amazon.CDK.AWS.Events;
using Amazon.CDK.AWS.KinesisFirehose;
using static Amazon.CDK.AWS.KinesisFirehose.CfnDeliveryStream;
using Amazon.CDK.AWS.S3;
using Amazon.CDK.AWS.IAM;
using Amazon.CDK.AWS.Events.Targets;

namespace EventBridgeFirehoseS3Stack
{
    /*
    CDK Code For -
    1. Create EventBridge Bus
    2. Create EventBridge Rule
    3. Create Firehose delivery stream
    4. Add S3 target to Firehose delivery stream
    5. Configure Firehose processor MetadataExtraction property
    6. Add partition key to target S3 path 
    */
    public class CdkStack : Stack
    {
        internal CdkStack(Construct scope, string id, IStackProps props) : base(scope, id, props)
        {
            //Create EventBridge Bus
            var eventBridgeBus = new Amazon.CDK.AWS.Events.EventBus(this, "EventBridgeBus", new Amazon.CDK.AWS.Events.EventBusProps
            {
                EventBusName = "EventBridgeBus"
            });

            //Create EventBridge rule
            var eventBridgeRule = new Rule(this, "EventBridgeRule", new RuleProps
            {
                EventPattern = new EventPattern
                {
                    DetailType = new[] { "SaveToS3" }
                },
                EventBus = eventBridgeBus
            });

            //Create target S3 bucket
            var targetBucket = new Bucket(
                this,
                "TargetBucket",
                new BucketProps { Versioned = true, BlockPublicAccess = BlockPublicAccess.BLOCK_ALL }
            );

            //Create role for Firehose Delivery Stream
            var firehoseDeliveryStreamRole = new Role(this, "FirehoseDeliveryStreamRole", new RoleProps
            {
                AssumedBy = new ServicePrincipal("firehose.amazonaws.com")
            });

            //Grant access to S3 bucket
            targetBucket.GrantWrite(firehoseDeliveryStreamRole);

            //Create Firehose delivery stream
            var firehoseDeliveryStream = new CfnDeliveryStream(this, "DeliveryStream", new CfnDeliveryStreamProps
            {
                DeliveryStreamName = "DeliveryStream",
                DeliveryStreamType = "DirectPut",
                ExtendedS3DestinationConfiguration = new ExtendedS3DestinationConfigurationProperty
                {
                    BucketArn = targetBucket.BucketArn,
                    RoleArn = firehoseDeliveryStreamRole.RoleArn,
                    DynamicPartitioningConfiguration = new DynamicPartitioningConfigurationProperty
                    {
                        Enabled = true,
                    },
                    ProcessingConfiguration = new ProcessingConfigurationProperty
                    {
                        Enabled = true,
                        Processors = new object[] {
                            new ProcessorProperty(){
                                Type = "MetadataExtraction",
                                Parameters = new object[] {
                                    new ProcessorParameterProperty(){
                                        ParameterName = "MetadataExtractionQuery",
                                        ParameterValue = "{DEPARTMENT: with_entries(.key|=ascii_upcase) .DEPARTMENT|ascii_upcase}"
                                    },
                                    new ProcessorParameterProperty(){
                                        ParameterName = "JsonParsingEngine",
                                        ParameterValue = "JQ-1.6"
                                    }
                                }
                            },
                            new ProcessorProperty(){
                                Type = "AppendDelimiterToRecord",
                                Parameters = new object[] {
                                    new ProcessorParameterProperty{
                                        ParameterName = "Delimiter",
                                        ParameterValue = "\\n"
                                    }
                                }
                            }
                        }
                    },
                    Prefix = "!{partitionKeyFromQuery:DEPARTMENT}/",
                    BufferingHints = new BufferingHintsProperty
                    {
                        IntervalInSeconds = 60,
                        SizeInMBs = 64
                    },
                    CloudWatchLoggingOptions = new CloudWatchLoggingOptionsProperty
                    {
                        Enabled = true,
                        LogGroupName = "FirehoseLogs",
                        LogStreamName = "DliveryStreamLogs"
                    },
                    ErrorOutputPrefix = "FirehoseFailures/"
                }
            });

            //Filter Detail field from EventBridge event
            eventBridgeRule.AddTarget(new KinesisFirehoseStream(firehoseDeliveryStream, new KinesisFirehoseStreamProps
            {
                Message = RuleTargetInput.FromEventPath("$.detail")
            }));

            new CfnOutput(this, "S3BucketName", new CfnOutputProps
            {
                Value = targetBucket.BucketName
            });
        }
    }
}

Download

git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/eventbridge-firehose-s3-cdk

Pattern repository

View on GitHub

Last updated on 26 Dec 2024

Edit this page