This pattern shows how to implement the claim check pattern using Amazon EventBridge, Amazon SQS, AWS Lambda and AWS Step Functions.
using System.Collections.Generic;
using Amazon.CDK;
using Amazon.CDK.AWS.DynamoDB;
using Amazon.CDK.AWS.Events;
using Amazon.CDK.AWS.Events.Targets;
using Amazon.CDK.AWS.IAM;
using Amazon.CDK.AWS.Lambda;
using Amazon.CDK.AWS.Logs;
using Amazon.CDK.AWS.Pipes;
using Amazon.CDK.AWS.SQS;
using Amazon.CDK.AWS.StepFunctions;
using Constructs;
using EventBus = Amazon.CDK.AWS.Events.EventBus;
using EventBusProps = Amazon.CDK.AWS.Events.EventBusProps;
using LogGroupProps = Amazon.CDK.AWS.Logs.LogGroupProps;
namespace Cdk
{
    /*
     * This stack contains the resources used to demo a Serverless .NET implementation of the Claim Check Pattern.
     *
     * From a high level, this stack creates resources to fulfill the following demo:
     *
     * -> A) Generation of large payloads.
     *       A Lambda function generates "large" messages and puts them on a queue for processing.
     *
     * -> B) We want to use these messages in our Event-Driven Architecture, but we don't want to send all data via our Event Bus.
     *       To solve this, an Event Bridge Pipe consumes the incoming messages and uses an enrichment step to split messages
     *       into claim checks, stored in DynamoDB. When done, the smaller claim check message is published on the Event Bus.
     *
     * -> C) Now, we want to process a claim check in a Step Function workflow. However, the workflow needs the full message
     *       in order to do its processing. To solve this, we send the claim check to another SQS queue, which is consumed
     *       by an Event Bridge Pipe. The Pipe uses a Lambda enrichment step to resolve the full message based on the claim check,
     *       before sending the full message as input to the target workflow.
     */
    public class CdkStack : Stack
    {
        internal CdkStack(Construct scope, string id, IStackProps props = null) : base(scope, id, props)
        {
            // Storage
            var claimCheckTable = CreateTable();
            // Queues
            var (sampleDataWriteQueue, sampleProcessorInputQueue) = CreateQueues();
            // Workflows
            var targetWorkflow = CreateWorkflow();
            // Lambda functions
            var (claimCheckSplitLambda, claimCheckRetrievalLambda) = CreateFunctions(sampleDataWriteQueue, claimCheckTable);
            // Event Bus and Rules
            var claimCheckApplicationBus = CreateEventBus();
            CreateRules(claimCheckApplicationBus, sampleProcessorInputQueue);
            // Event Bridge Pipes
            CreatePipes(
                sampleProcessorInputQueue,
                sampleDataWriteQueue,
                claimCheckApplicationBus,
                targetWorkflow,
                claimCheckRetrievalLambda,
                claimCheckSplitLambda);
        }
        /// <summary>
        /// Creates the DynamoDB table used to store the mapping between claim checks and full messages.
        /// </summary>
        private Table CreateTable()
        {
            var claimCheckTable = new Table(this, "ClaimCheckTable", new TableProps
            {
                BillingMode = BillingMode.PAY_PER_REQUEST,
                TableName = "ClaimCheckTable",
                PartitionKey = new Attribute
                {
                    Name = "id",
                    Type = AttributeType.STRING
                },
                RemovalPolicy = RemovalPolicy.DESTROY
            });
            return claimCheckTable;
        }
        /// <summary>
        /// Creates the Step Functions workflow that is the end target of the full message.
        /// </summary>
        private StateMachine CreateWorkflow()
        {
            var workflow = new StateMachine(this, "ClaimCheckTargetWorkflow", new StateMachineProps
            {
                TracingEnabled = true,
                Logs = new LogOptions
                {
                    Destination = new LogGroup(this, "ClaimCheckTargetWorkflowLogGroup", new LogGroupProps
                    {
                        LogGroupName = "/aws/stepFunctions/ClaimCheckWorkflow",
                        RemovalPolicy = RemovalPolicy.DESTROY,
                        Retention = RetentionDays.ONE_WEEK
                    }),
                    Level = LogLevel.ALL
                },
                StateMachineName = "ClaimCheckTargetWorkflow",
                DefinitionBody = new ChainDefinitionBody(new Pass(this, "Process Message"))
            });
            return workflow;
        }
        /// <summary>
        /// Creates the rules for our custom event bus.
        /// </summary>
        private void CreateRules(IEventBus claimCheckApplicationBus, IQueue sampleProcessorInputQueue)
        {
            // This rule logs all messages to demonstrate that only claim checks are passed on the event bus.
            _ = new Rule(this, "ClaimCheckApplicationBusRule", new RuleProps
            {
                RuleName = "ClaimCheckApplicationBusRule",
                EventBus = claimCheckApplicationBus,
                EventPattern = new EventPattern
                {
                    // Match all events
                    Source = Match.Prefix("")
                },
                Targets = new IRuleTarget[]
                {
                    new CloudWatchLogGroup(new LogGroup(this, "ClaimTargetLog", new LogGroupProps
                    {
                        LogGroupName = "/aws/eventBus/rules/targets/ClaimCheckTargetLog",
                        RemovalPolicy = RemovalPolicy.DESTROY,
                        Retention = RetentionDays.ONE_WEEK
                    }))
                }
            });
            // This rule sends all events to the input processing queue.
            _ = new Rule(this, "SampleProcessorInputQueueRule", new RuleProps
            {
                RuleName = "SampleProcessorInputQueueRule",
                EventBus = claimCheckApplicationBus,
                EventPattern = new EventPattern
                {
                    // Match all events
                    Source = Match.Prefix("")
                },
                Targets = new IRuleTarget[]
                {
                    new SqsQueue(sampleProcessorInputQueue)
                }
            });
        }
        /// <summary>
        /// Creates the pipes for claim check split and retrieval.
        /// </summary>
        private void CreatePipes(
            IQueue sampleProcessorInputQueue,
            IQueue sampleDataWriteQueue,
            IEventBus claimCheckApplicationBus,
            IStateMachine targetWorkflow,
            IFunction claimCheckRetrievalLambda,
            IFunction claimCheckSplitLambda)
        {
            /*
             * Pipe: Claim Check Split
             *
             * -> 1) Source....: SQS sample data write queue (full messages).
             * -> 2) Enrichment: Full message sent to Lambda for claim check split (uses DynamoDB as storage).
             * -> 3) Target....: Claim check sent to Event Bus.
             */
            var claimCheckSplitPipeRole = new Role(this, "ClaimCheckSplitPipeRole", new RoleProps
            {
                AssumedBy = new ServicePrincipal("pipes.amazonaws.com"),
            });
            _ = new CfnPipe(this, "ClaimCheckSplitPipe", new CfnPipeProps
            {
                RoleArn = claimCheckSplitPipeRole.RoleArn,
                Source = sampleDataWriteQueue.QueueArn,
                Target = claimCheckApplicationBus.EventBusArn,
                Enrichment = claimCheckSplitLambda.FunctionArn,
                SourceParameters = new CfnPipe.PipeSourceParametersProperty
                {
                    SqsQueueParameters = new CfnPipe.PipeSourceSqsQueueParametersProperty
                    {
                        BatchSize = 1
                    }
                },
                Name = "ClaimCheckSplitPipe"
            });
            sampleDataWriteQueue.GrantConsumeMessages(claimCheckSplitPipeRole);
            claimCheckSplitLambda.GrantInvoke(claimCheckSplitPipeRole);
            claimCheckApplicationBus.GrantPutEventsTo(claimCheckSplitPipeRole);
            /*
             * Pipe: Claim Check Retrieval
             *
             * -> 1) Source....: SQS input processing queue (claim check message).
             * -> 2) Enrichment: Claim check sent to Lambda for enrichment.
             * -> 3) Target....: Full message sent to workflow.
             */
            var claimCheckEnrichmentPipeRole = new Role(this, "ClaimCheckEnrichmentPipeRole", new RoleProps
            {
                AssumedBy = new ServicePrincipal("pipes.amazonaws.com"),
            });
            _ = new CfnPipe(this, "ClaimCheckEnrichmentPipe", new CfnPipeProps
            {
                RoleArn = claimCheckEnrichmentPipeRole.RoleArn,
                Source = sampleProcessorInputQueue.QueueArn,
                Target = targetWorkflow.StateMachineArn,
                Enrichment = claimCheckRetrievalLambda.FunctionArn,
                SourceParameters = new CfnPipe.PipeSourceParametersProperty
                {
                    SqsQueueParameters = new CfnPipe.PipeSourceSqsQueueParametersProperty
                    {
                        BatchSize = 1
                    }
                },
                TargetParameters = new CfnPipe.PipeTargetParametersProperty
                {
                    StepFunctionStateMachineParameters = new CfnPipe.PipeTargetStateMachineParametersProperty
                    {
                        InvocationType = $"{ServiceIntegrationPattern.FIRE_AND_FORGET}"
                    }
                },
                Name = "ClaimCheckEnrichmentPipe"
            });
            sampleProcessorInputQueue.GrantConsumeMessages(claimCheckEnrichmentPipeRole);
            targetWorkflow.GrantStartExecution(claimCheckEnrichmentPipeRole);
            claimCheckRetrievalLambda.GrantInvoke(claimCheckEnrichmentPipeRole);
        }
        /// <summary>
        /// Create the SQS queues used for processing of data.
        /// </summary>
        private (IQueue sampleDataWriteQueue, IQueue sampleProcessorInputQueue) CreateQueues()
        {
            // Queue for sample data (full messages) to processed.
            var sampleDataWriteQueueDlq = new Queue(this, "SampleDataWriteQueueDLQ", new QueueProps
            {
                EnforceSSL = true,
                QueueName = "SampleDataWriteQueueDLQ"
            });
            var sampleDataWriteQueue = new Queue(this, "SampleDataWriteQueue", new QueueProps
            {
                EnforceSSL = true,
                DeadLetterQueue = new DeadLetterQueue
                {
                    MaxReceiveCount = 1,
                    Queue = sampleDataWriteQueueDlq
                },
                QueueName = "SampleDataWriteQueue"
            });
            // Queue for claim checks to be processed.
            var sampleProcessorInputQueueDlq = new Queue(this, "SampleProcessorInputQueueDLQ", new QueueProps
            {
                EnforceSSL = true,
                QueueName = "SampleProcessorInputQueueDLQ"
            });
            var sampleProcessorInputQueue = new Queue(this, "SampleProcessorInputQueue", new QueueProps
            {
                EnforceSSL = true,
                DeadLetterQueue = new DeadLetterQueue
                {
                    MaxReceiveCount = 1,
                    Queue = sampleProcessorInputQueueDlq,
                },
                QueueName = "SampleProcessorInputQueue"
            });
            return (sampleDataWriteQueue, sampleProcessorInputQueue);
        }
        /// <summary>
        /// Create the custom Event Bus.
        /// </summary>
        private IEventBus CreateEventBus()
        {
            return new EventBus(this, "ClaimCheckApplicationBus", new EventBusProps
            {
                EventBusName = "ClaimCheckApplicationBus"
            });
        }
        /// <summary>
        /// Create the .NET Lambda functions used for
        /// 1/ generating sample data,
        /// 2/ splitting full messages into claim checks and for
        /// 3/ retrieving a full message based on a claim check.
        /// </summary>
        private (IFunction claimCheckSplitLambda, IFunction claimCheckRetrievalLambda) CreateFunctions(IQueue sampleDataWriteQueue, ITable claimCheckTable)
        {
            const string lambdaBinPath = "../lambda/bin/Debug/net6.0";
            // Lambda for generation of sample data
            var claimCheckSampleDataCreatorLambda = new Function(this, "ClaimCheckSampleDataCreatorLambda", new FunctionProps
            {
                Runtime = Runtime.DOTNET_6,
                Code = Code.FromAsset(lambdaBinPath),
                Handler = "ClaimCheckPattern::ClaimCheckPattern.ClaimCheckDataCreator::FunctionHandler",
                Environment = new Dictionary<string, string>(1)
                {
                    {"QUEUE_URL", sampleDataWriteQueue.QueueUrl}
                },
                Timeout = Duration.Seconds(15),
                FunctionName = "ClaimCheckSampleDataCreatorLambda",
                LogRetention = RetentionDays.ONE_WEEK
            });
            sampleDataWriteQueue.GrantSendMessages(claimCheckSampleDataCreatorLambda);
            // Lambda for splitting a full message into a claim check
            var claimCheckSplitLambda = new Function(this, "ClaimCheckSplitLambda", new FunctionProps
            {
                Runtime = Runtime.DOTNET_6,
                Code = Code.FromAsset(lambdaBinPath),
                Handler = "ClaimCheckPattern::ClaimCheckPattern.ClaimCheckSplitter::FunctionHandler",
                Environment = new Dictionary<string, string>(1)
                {
                    {"CLAIM_CHECK_TABLE", claimCheckTable.TableName}
                },
                Timeout = Duration.Seconds(15),
                FunctionName = "ClaimCheckSplitLambda",
                LogRetention = RetentionDays.ONE_WEEK
            });
            claimCheckTable.GrantWriteData(claimCheckSplitLambda);
            // Lambda for retrieving a full message from a claim check
            var claimCheckRetrievalLambda = new Function(this, "ClaimCheckRetrievalLambda", new FunctionProps
            {
                Runtime = Runtime.DOTNET_6,
                Code = Code.FromAsset(lambdaBinPath),
                Handler = "ClaimCheckPattern::ClaimCheckPattern.ClaimCheckRetriever::FunctionHandler",
                Environment = new Dictionary<string, string>(1)
                {
                    {"CLAIM_CHECK_TABLE", claimCheckTable.TableName}
                },
                Timeout = Duration.Seconds(15),
                FunctionName = "ClaimCheckRetrievalLambda",
                LogRetention = RetentionDays.ONE_WEEK
            });
            claimCheckTable.GrantReadData(claimCheckRetrievalLambda);
            return new(claimCheckSplitLambda, claimCheckRetrievalLambda);
        }
    }
}
git clone https://github.com/aws-samples/serverless-patternscd serverless-patterns/claim-check-pattern-dotnet-cdk