Apply the Firehose DynamicPartitioningConfiguration to parse each input message, extract its department value, and use that value as the S3 partition key.
using Amazon.CDK;
using Constructs;
using Amazon.CDK.AWS.Events;
using Amazon.CDK.AWS.KinesisFirehose;
using static Amazon.CDK.AWS.KinesisFirehose.CfnDeliveryStream;
using Amazon.CDK.AWS.S3;
using Amazon.CDK.AWS.IAM;
using Amazon.CDK.AWS.Events.Targets;
namespace EventBridgeFirehoseS3Stack
{
/*
This CDK code:
1. Creates an EventBridge bus
2. Creates an EventBridge rule
3. Creates a Firehose delivery stream
4. Adds an S3 destination to the Firehose delivery stream
5. Configures the Firehose MetadataExtraction processor
6. Adds the partition key to the target S3 prefix
*/
/// <summary>
/// Stack that forwards "SaveToS3" events from a custom EventBridge bus through a
/// Firehose delivery stream into an S3 bucket. Firehose dynamic partitioning is
/// enabled so that each record is written under an S3 prefix taken from the
/// DEPARTMENT value extracted from the record.
/// </summary>
public class CdkStack : Stack
{
    /// <summary>
    /// Defines the event bus, rule, target bucket, Firehose role, log group/stream,
    /// delivery stream, and the rule target that connects them.
    /// </summary>
    /// <param name="scope">Parent construct that owns this stack.</param>
    /// <param name="id">Construct id of this stack.</param>
    /// <param name="props">Stack properties passed through to <see cref="Stack"/>.</param>
    internal CdkStack(Construct scope, string id, IStackProps props) : base(scope, id, props)
    {
        // Create EventBridge bus.
        // Fully qualified: Amazon.CDK.AWS.Events.Targets (also imported by this file)
        // declares EventBus/EventBusProps as well, so the short names are ambiguous.
        var eventBridgeBus = new Amazon.CDK.AWS.Events.EventBus(this, "EventBridgeBus", new Amazon.CDK.AWS.Events.EventBusProps
        {
            EventBusName = "EventBridgeBus"
        });

        // Create EventBridge rule matching events whose detail-type is "SaveToS3".
        var eventBridgeRule = new Rule(this, "EventBridgeRule", new RuleProps
        {
            EventPattern = new EventPattern
            {
                DetailType = new[] { "SaveToS3" }
            },
            EventBus = eventBridgeBus
        });

        // Create target S3 bucket (versioned, all public access blocked).
        var targetBucket = new Bucket(
            this,
            "TargetBucket",
            new BucketProps { Versioned = true, BlockPublicAccess = BlockPublicAccess.BLOCK_ALL }
        );

        // Create role for the Firehose delivery stream.
        var firehoseDeliveryStreamRole = new Role(this, "FirehoseDeliveryStreamRole", new RoleProps
        {
            AssumedBy = new ServicePrincipal("firehose.amazonaws.com")
        });

        // Grant access to the S3 bucket.
        targetBucket.GrantWrite(firehoseDeliveryStreamRole);

        // Create the CloudWatch log group and stream that the delivery stream logs to.
        // Firehose does not create them when logging is configured through
        // CloudFormation, and the role needs permission to put log events.
        // Both use DESTROY so the fixed log group name does not block a redeploy
        // after the stack has been deleted.
        var firehoseLogGroup = new Amazon.CDK.AWS.Logs.LogGroup(this, "FirehoseLogGroup", new Amazon.CDK.AWS.Logs.LogGroupProps
        {
            LogGroupName = "FirehoseLogs",
            RemovalPolicy = Amazon.CDK.RemovalPolicy.DESTROY
        });
        var firehoseLogStream = new Amazon.CDK.AWS.Logs.LogStream(this, "FirehoseLogStream", new Amazon.CDK.AWS.Logs.LogStreamProps
        {
            LogGroup = firehoseLogGroup,
            LogStreamName = "DeliveryStreamLogs",
            RemovalPolicy = Amazon.CDK.RemovalPolicy.DESTROY
        });
        firehoseLogGroup.GrantWrite(firehoseDeliveryStreamRole);

        // Create Firehose delivery stream.
        var firehoseDeliveryStream = new CfnDeliveryStream(this, "DeliveryStream", new CfnDeliveryStreamProps
        {
            DeliveryStreamName = "DeliveryStream",
            DeliveryStreamType = "DirectPut",
            ExtendedS3DestinationConfiguration = new ExtendedS3DestinationConfigurationProperty
            {
                BucketArn = targetBucket.BucketArn,
                RoleArn = firehoseDeliveryStreamRole.RoleArn,
                DynamicPartitioningConfiguration = new DynamicPartitioningConfigurationProperty
                {
                    Enabled = true,
                },
                ProcessingConfiguration = new ProcessingConfigurationProperty
                {
                    Enabled = true,
                    Processors = new object[] {
                        // Extracts the partition key with a jq query: the record's
                        // top-level keys are upper-cased before DEPARTMENT is read,
                        // and the resulting value is upper-cased as well.
                        new ProcessorProperty(){
                            Type = "MetadataExtraction",
                            Parameters = new object[] {
                                new ProcessorParameterProperty(){
                                    ParameterName = "MetadataExtractionQuery",
                                    ParameterValue = "{DEPARTMENT: with_entries(.key|=ascii_upcase) .DEPARTMENT|ascii_upcase}"
                                },
                                new ProcessorParameterProperty(){
                                    ParameterName = "JsonParsingEngine",
                                    ParameterValue = "JQ-1.6"
                                }
                            }
                        },
                        // Appends a delimiter after each record. The value passed is
                        // the two-character sequence backslash + 'n', not a newline.
                        new ProcessorProperty(){
                            Type = "AppendDelimiterToRecord",
                            Parameters = new object[] {
                                new ProcessorParameterProperty{
                                    ParameterName = "Delimiter",
                                    ParameterValue = "\\n"
                                }
                            }
                        }
                    }
                },
                // Objects are written under a prefix named after the extracted DEPARTMENT key.
                Prefix = "!{partitionKeyFromQuery:DEPARTMENT}/",
                BufferingHints = new BufferingHintsProperty
                {
                    IntervalInSeconds = 60,
                    SizeInMBs = 64
                },
                // Reference the constructs rather than repeating the names, so the
                // delivery stream is created only after the group and stream exist.
                CloudWatchLoggingOptions = new CloudWatchLoggingOptionsProperty
                {
                    Enabled = true,
                    LogGroupName = firehoseLogGroup.LogGroupName,
                    LogStreamName = firehoseLogStream.LogStreamName
                },
                ErrorOutputPrefix = "FirehoseFailures/"
            }
        });

        // The grants above live in a policy resource separate from the role, while
        // the delivery stream only references the role ARN. Depend on the role
        // construct (which includes its policy) so CloudFormation does not create
        // the stream before the permissions are attached.
        firehoseDeliveryStream.Node.AddDependency(firehoseDeliveryStreamRole);

        // Send only the "detail" field of each matched EventBridge event to Firehose.
        eventBridgeRule.AddTarget(new KinesisFirehoseStream(firehoseDeliveryStream, new KinesisFirehoseStreamProps
        {
            Message = RuleTargetInput.FromEventPath("$.detail")
        }));

        // Expose the generated bucket name as a stack output.
        new CfnOutput(this, "S3BucketName", new CfnOutputProps
        {
            Value = targetBucket.BucketName
        });
    }
}
}
git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/eventbridge-firehose-s3-cdk