Claim Check Pattern using .NET

This pattern shows how to implement the claim check pattern using Amazon EventBridge, Amazon SQS, AWS Lambda and AWS Step Functions.

Amazon EventbridgeAmazon SQSAWS LambdaAWS Step Functions
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as pipes from 'aws-cdk-lib/aws-pipes';
import * as logs from 'aws-cdk-lib/aws-logs';
import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';

export class ClaimCheckStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Prerequisites that are not specific to this pattern
    const lambdaRuntime = lambda.Runtime.NODEJS_18_X;
    const deadLetterQueue1 = new sqs.Queue(this, 'ClaimCheckDeadLetterQueue1', {
      enforceSSL: true,
    });
    const deadLetterQueue2 = new sqs.Queue(this, 'ClaimCheckDeadLetterQueue2', {
      enforceSSL: true,
    });

    // Step 1: Create sample data of a "large" payload and put it in an SQS queue
    // Implementation: an AWS Lambda function ("claimCheckSampleDataCreatorLambda") creates this sample data and puts it in the SQS queue ("sampleDataWriteQueue")
    const sampleDataWriteQueue = new sqs.Queue(this, 'SampleDataWriteQueue', {
      enforceSSL: true,
      deadLetterQueue: {
        maxReceiveCount: 1,
        queue: deadLetterQueue1,
      }
    });

    const claimCheckSampleDataCreatorLambda = new lambda.Function(this, 'ClaimCheckSampleDataCreatorLambda', {
      runtime: lambdaRuntime,
      code: lambda.Code.fromAsset('lib/lambda'),
      handler: 'claimCheckSampleDataCreator.handler',
      environment: {
        QUEUE_URL: sampleDataWriteQueue.queueUrl,
      }
    });
    sampleDataWriteQueue.grantSendMessages(claimCheckSampleDataCreatorLambda);

    // Step 2: We want to put this event on our application bus. However, we don't want to put the entire payload on the bus, only the claim check.
    // Implementation: we use an EventBridge Pipe to split the payload.
    // Internally, this Pipe uses an AWS Lambda function ("claimCheckSplitLambda") as enrichment, putting the payload into DynamoDB.
    
    const claimCheckApplicationBus = new events.EventBus(this, 'ClaimCheckApplicationBus', {
      eventBusName: 'ClaimCheckApplicationBus'
    });

    const claimCheckTable = new dynamodb.Table(this, 'ClaimCheckTable', {
      partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING },
      pointInTimeRecovery: true,
      removalPolicy: cdk.RemovalPolicy.DESTROY
    });

    const claimCheckSplitLambda = new lambda.Function(this, 'ClaimCheckSplitLambda', {
      runtime: lambdaRuntime,
      code: lambda.Code.fromAsset('lib/lambda'),
      handler: 'claimCheckSplit.handler',
      environment: {
        CLAIM_CHECK_TABLE: claimCheckTable.tableName,
      }
    });
    claimCheckTable.grantWriteData(claimCheckSplitLambda);

    const splitPipeRole = new iam.Role(this, 'ClaimCheckSplitRole', {
      assumedBy: new iam.ServicePrincipal('pipes.amazonaws.com'),
    });
    sampleDataWriteQueue.grantConsumeMessages(splitPipeRole);
    claimCheckApplicationBus.grantPutEventsTo(splitPipeRole);
    claimCheckSplitLambda.grantInvoke(splitPipeRole);

    const claimCheckSplitPipe = new pipes.CfnPipe(this, 'ClaimCheckSplitPipe', {
      roleArn: splitPipeRole.roleArn,
      source: sampleDataWriteQueue.queueArn,
      target: claimCheckApplicationBus.eventBusArn,
      enrichment: claimCheckSplitLambda.functionArn,
      sourceParameters: {
        sqsQueueParameters: {
          batchSize: 1,
        },
      },
    });

    // Step 3: We want to process the event from our application bus using AWS Step Functions.
    // However, we don't have all information we need in the event. We use an EventBridge Pipe to retrieve the additional information.
    // Internally, this Pipe uses an AWS Lambda function ("ClaimCheckRetrievalLambda") as enrichment, retrieving the payload from DynamoDB.

    const sampleProcessorInputQueue = new sqs.Queue(this, 'SampleProcessorInputQueue', {
      enforceSSL: true,
      deadLetterQueue: {
        maxReceiveCount: 1,
        queue: deadLetterQueue2,
      }
    });

    const sampleProcessorInputQueueRule = new events.Rule(this, 'SampleProcessorInputQueueRule', {
      eventBus: claimCheckApplicationBus,
      eventPattern: {
        source: events.Match.prefix(''),
      },
      targets: [new targets.SqsQueue(sampleProcessorInputQueue)],
    });

    const claimCheckRetrievalLambda = new lambda.Function(this, 'ClaimCheckRetrievalLambda', {
      runtime: lambdaRuntime,
      code: lambda.Code.fromAsset('lib/lambda'),
      handler: 'claimCheckRetrieval.handler',
      environment: {
        CLAIM_CHECK_TABLE: claimCheckTable.tableName
      }
    });
    claimCheckTable.grantReadData(claimCheckRetrievalLambda);

    const targetWorkflow = new sfn.StateMachine(this, 'ClaimCheckTargetWorkflow', {
      definition: sfn.Chain.start(new sfn.Pass(this, 'Process Message')),
      tracingEnabled: true,
      logs: {
        destination: new logs.LogGroup(this, 'ClaimCheckTargetWorkflowLogGroup', {
          removalPolicy: cdk.RemovalPolicy.DESTROY,
        }),
        level: sfn.LogLevel.ALL,
      },
    });

    const retrievalPipeRole = new iam.Role(this, 'ClaimCheckRetrievalRole', {
      assumedBy: new iam.ServicePrincipal('pipes.amazonaws.com'),
    });
    sampleProcessorInputQueue.grantConsumeMessages(retrievalPipeRole);
    targetWorkflow.grantStartExecution(retrievalPipeRole);
    claimCheckRetrievalLambda.grantInvoke(retrievalPipeRole);

    const claimCheckEnrichmentPipe = new pipes.CfnPipe(this, 'ClaimCheckEnrichmentPipe', {
      roleArn: retrievalPipeRole.roleArn,
      source: sampleProcessorInputQueue.queueArn,
      target: targetWorkflow.stateMachineArn,
      enrichment: claimCheckRetrievalLambda.functionArn,
      sourceParameters: {
        sqsQueueParameters: {
          batchSize: 1,
        },
      },
      targetParameters: {
        stepFunctionStateMachineParameters: {
          invocationType: 'FIRE_AND_FORGET',
        },
      }
    });

    // Send all events on claimCheckApplicationBusRule to CloudWatch Logs to demonstrate only id's are passed on bus        
    const claimCheckApplicationBusRule = new events.Rule(this, 'ClaimCheckApplicationBusRule', {
      ruleName: 'claimCheckApplicationBusRule',
      eventBus: claimCheckApplicationBus,
      eventPattern: {
        source: events.Match.prefix(''),
      },
      targets: [new targets.CloudWatchLogGroup(new logs.LogGroup(this, 'ClaimTargetLog', {
        logGroupName: '/aws/events/claimTargetLog',
        removalPolicy: cdk.RemovalPolicy.DESTROY,
      }))],
    });

    // Relevant outputs so that the user can trigger this pattern and watch it in action.
    new cdk.CfnOutput(this, "ClaimCheckSampleDataCreatorLambdaArn", {
      value: claimCheckSampleDataCreatorLambda.functionArn,
      exportName: "ClaimCheckSampleDataCreatorLambdaArn",
    });
    new cdk.CfnOutput(this, "ClaimCheckTableName", {
      value: claimCheckTable.tableName,
      exportName: "ClaimCheckTableName",
    });
    new cdk.CfnOutput(this, "ClaimCheckTargetWorkflowArn", {
      value: targetWorkflow.stateMachineArn,
      exportName: "ClaimCheckTargetWorkflowArn",
    });
  }
}

Download

git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/claim-check-pattern-cdk

Pattern repository

View on GitHub

Last updated on 26 Dec 2024

Edit this page