Kinesis Firehose Data Transformation with Lambda

Transform incoming source data and deliver the transformed data to S3.

Kinesis Data Firehose → Lambda → Kinesis Data Firehose → S3
package com.myorg;

import software.amazon.awscdk.*;
import software.amazon.awscdk.services.kinesisfirehose.alpha.DeliveryStream;
import software.amazon.awscdk.services.kinesisfirehose.alpha.LambdaFunctionProcessor;
import software.amazon.awscdk.services.kinesisfirehose.destinations.alpha.S3Bucket;
import software.amazon.awscdk.services.lambda.Code;
import software.amazon.awscdk.services.lambda.Function;
import software.amazon.awscdk.services.lambda.Runtime;
import software.amazon.awscdk.services.s3.Bucket;
import software.amazon.awscdk.services.s3.BucketProps;
import software.amazon.awscdk.services.s3.assets.AssetOptions;
import software.constructs.Construct;

import java.util.Arrays;
import java.util.List;

import static java.util.Collections.singletonList;
import static software.amazon.awscdk.BundlingOutput.ARCHIVED;

/**
 * CDK stack wiring a Kinesis Data Firehose delivery stream to an S3 bucket, with a
 * Java 17 Lambda function attached as the record processor of the S3 destination.
 *
 * <p>The Lambda code is built from the {@code ../software/} asset directory by running
 * Maven inside the Java 17 bundling image at synth time.
 */
public class CdkFirehoseLambdaS3JavaStack extends Stack {

    /**
     * Creates the stack with no explicit {@link StackProps}.
     *
     * @param parent the CDK app that owns this stack
     * @param id     the construct id of the stack
     */
    public CdkFirehoseLambdaS3JavaStack(final App parent, final String id) {
        this(parent, id, null);
    }

    /**
     * Creates the stack and all of its resources: the destination bucket, the
     * transformation Lambda, the Firehose Lambda processor and the delivery stream.
     *
     * @param parent the scope in which this stack is defined
     * @param id     the construct id of the stack
     * @param props  stack properties; may be {@code null}
     */
    public CdkFirehoseLambdaS3JavaStack(final Construct parent, final String id, final StackProps props) {
        super(parent, id, props);

        // Destination bucket for the transformed records.
        Bucket bucket = new Bucket(this, "firehose-lambda-processor", new BucketProps.Builder()
                .versioned(false)
                .build());

        // Transformation function; its code asset is produced by the Maven bundling step.
        Function lambdaFunction = Function.Builder.create(this, "fh-lambda-transformer")
                .runtime(Runtime.JAVA_17)
                .code(Code.fromAsset("../software/", AssetOptions.builder()
                        .bundling(lambdaBundlingOptions())
                        .build()))
                .handler("com.myorg.kinesis.client.App")
                .memorySize(1024)
                .functionName("KinesisFHLambdaProcessor")
                .reservedConcurrentExecutions(1)
                .timeout(Duration.seconds(10))
                .build();

        // Processor settings handed to Firehose: buffer interval, buffer size and retry count.
        LambdaFunctionProcessor lambdaProcessor = LambdaFunctionProcessor.Builder.create(lambdaFunction)
                .bufferInterval(Duration.minutes(5))
                .bufferSize(Size.mebibytes(3))
                .retries(5)
                .build();

        S3Bucket s3Destination = S3Bucket.Builder.create(bucket)
                .processor(lambdaProcessor)
                .build();

        DeliveryStream.Builder.create(this, "Delivery Stream")
                .destinations(List.of(s3Destination))
                .build();
    }

    /**
     * Builds the bundling options used to package the transformation Lambda.
     *
     * <p>The bundling command changes into {@code FirehoseTransformationLambda}, runs
     * {@code mvn clean install}, and copies the resulting {@code KinesisLambdaClient.jar}
     * from {@code /asset-input} to {@code /asset-output}. The output type is declared as
     * {@code ARCHIVED}.
     *
     * @return the fully built bundling options
     */
    private static BundlingOptions lambdaBundlingOptions() {
        List<String> packagingCommand = List.of(
                "/bin/sh",
                "-c",
                "cd FirehoseTransformationLambda " +
                        "&& mvn clean install " +
                        "&& cp /asset-input/FirehoseTransformationLambda/target/KinesisLambdaClient.jar /asset-output/"
        );

        // Mount the local .m2 repository so dependencies are not downloaded again inside the container.
        DockerVolume mavenRepository = DockerVolume.builder()
                .hostPath(System.getProperty("user.home") + "/.m2/")
                .containerPath("/root/.m2/")
                .build();

        return BundlingOptions.builder()
                .command(packagingCommand)
                .image(Runtime.JAVA_17.getBundlingImage())
                .volumes(List.of(mavenRepository))
                // Runs as root; the Maven repository is mounted under /root/.m2/.
                .user("root")
                .outputType(ARCHIVED)
                .build();
    }
}

Download

git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/cdk-firehose-s3-transform-java

Pattern repository

View on GitHub

Last updated on 26 Dec 2024

Edit this page