Invoke a Step functions workflow from Appsync

Invoke a Step Functions workflow through a Lambda resolver from Appsync

AWS AppSyncAWS LambdaAWS Step Functions
from os import path

from aws_cdk import (
    Stack,
    aws_appsync as appsync,
    aws_dynamodb as dynamodb,
    aws_iam as role,
    aws_stepfunctions as sf,
    aws_stepfunctions_tasks as sf_tasks,

    aws_lambda as lambda_, CfnOutput, Duration

)

from constructs import Construct

dirname = path.dirname(__file__)


class CdkMomoStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        with open(path.join(dirname, "schema.txt"), 'r') as file:
            data_schema = file.read().replace('\n', '')

        # dynamodb service role

        general_role = role.Role(self, 'dynamodbRole',
                                 assumed_by=role.ServicePrincipal("appsync.amazonaws.com"))
        states_role = role.Role(self, 'statesRole',
                                assumed_by=role.ServicePrincipal("states.amazonaws.com"))
        general_role.add_to_policy(
            role.PolicyStatement(  # Restrict to listing and describing tables
                actions=["states:StartSyncExecution"],
                resources=["*"])

        )

        general_role.add_managed_policy(role.ManagedPolicy.from_aws_managed_policy_name("AmazonDynamoDBFullAccess"))
        states_role.add_managed_policy(role.ManagedPolicy.from_aws_managed_policy_name("AmazonDynamoDBFullAccess"))
        general_role.add_managed_policy(role.ManagedPolicy.from_aws_managed_policy_name("AWSLambda_FullAccess"))
        general_role \
            .add_managed_policy(role.ManagedPolicy
                                .from_aws_managed_policy_name("service-role/AWSAppSyncPushToCloudWatchLogs"))

        api = appsync.CfnGraphQLApi(
            self, "cdkMomoApi", name="cdkMomoApi",
            authentication_type='API_KEY',
            log_config=appsync.CfnGraphQLApi.LogConfigProperty(
                cloud_watch_logs_role_arn=general_role.role_arn,
                exclude_verbose_content=False,
                field_log_level='ALL'

            ),

            xray_enabled=True
        )

        graphql_schema = appsync.CfnGraphQLSchema(self, "CdkMomoGraphQLSchema",
                                                  api_id=api.attr_api_id,

                                                  definition=data_schema
                                                  )

        cdk_momo_table = dynamodb.Table(self, "CdkMomoTable",
                                        table_name="CdkMomoTable",
                                        partition_key=dynamodb.Attribute(
                                            name='id',
                                            type=dynamodb.AttributeType.STRING,

                                        ),

                                        billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,

                                        )
        fail_step = sf.Fail(self, 'Fail', cause="Failed to Update Apartment Status", error="ConditionalFailedException")

        # Define Step function tasks
        change_apartment_status = sf_tasks.DynamoUpdateItem(
            self, "Change Apartment Status",
            key={
                'id': sf_tasks.DynamoAttributeValue.from_string(sf.JsonPath.string_at("$.details.accountId")),

            },
            table=cdk_momo_table,
            condition_expression="attribute_exists(id)",
            update_expression="SET bookedStatus = :bookedStatus",
            expression_attribute_values={
                ":bookedStatus": sf_tasks.DynamoAttributeValue.from_string('Booked')
            },
            result_path="$.updateResult",

        ).add_catch(handler=fail_step)

        wait_step = sf.Wait(self, 'Wait', time=sf.WaitTime.duration(Duration.seconds(30)))

        get_status = sf_tasks.DynamoGetItem(
            self, "Get Booking Status",
            table=cdk_momo_table,
            key={
                'id': sf_tasks.DynamoAttributeValue.from_string(sf.JsonPath.string_at("$.details.accountId")),

            },
            result_path='$.getItem'
        ).add_catch(handler=fail_step)
        apartment_not_paid = sf_tasks.DynamoUpdateItem(
            self, 'Not Paid(Revert Apartment Status)',
            key={
                'id': sf_tasks.DynamoAttributeValue.from_string(sf.JsonPath.string_at("$.getItem.Item.id.S")),

            },
            table=cdk_momo_table,
            condition_expression="attribute_exists(id)",
            update_expression="SET bookedStatus = :bookedStatus",
            expression_attribute_values={
                ":bookedStatus": sf_tasks.DynamoAttributeValue.from_string('Pending')
            },
            result_path="$.notPaid",

        )
        apartment_paid = sf.Pass(self, 'Apartment Paid', comment="Apartment Paid")

        definition = change_apartment_status.next(wait_step) \
            .next(get_status) \
            .next(sf.Choice(self, "Has the Apartment been Paid ?", comment="Has the Apartment been Paid ?")
                  .when(sf.Condition.string_equals(sf.JsonPath.string_at("$.getItem.Item.id.S"), '1234567') and
                        sf.Condition.string_equals(sf.JsonPath.string_at("$.getItem.Item.bookedStatus.S"), 'Paid'),
                        apartment_paid
                        )
                  .otherwise(apartment_not_paid))

        step = sf.StateMachine(self, 'MomoStateMachine',

                               definition=definition,

                               state_machine_name="MomoStateMachine",
                               state_machine_type=sf.StateMachineType.STANDARD
                               )

        # Create the AWS Lambda function to subscribe to Amazon SQS queue
        # The source code is in './lambda' directory
        lambda_function = lambda_.Function(
            self, "LambdaFunction",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="start_step_function.handler",
            code=lambda_.Code.from_asset(path.join(dirname, "lambda")),
        )

        cdk_momo_data_source = appsync.CfnDataSource(self, "CDKMOMODatasource", api_id=api.attr_api_id,
                                                     name="CdkMomoDataSource", type='AWS_LAMBDA',
                                                     lambda_config=appsync.CfnDataSource.LambdaConfigProperty(
                                                         lambda_function_arn=lambda_function.function_arn
                                                     ),
                                                     service_role_arn=general_role.role_arn)

        add_demo_resolver = appsync.CfnResolver(
            self,
            "AddStepFunctionsExecutionResolver",
            api_id=api.attr_api_id,
            type_name="Mutation",
            field_name="addStepFunctionExecution",
            data_source_name=cdk_momo_data_source.attr_name

        )

        step.grant_start_execution(lambda_function)
        add_demo_resolver.add_depends_on(graphql_schema)
        lambda_function.add_environment('STEP_FNS_ARN', step.state_machine_arn)
        cdk_momo_table.grant_full_access(lambda_function)
        CfnOutput(self, "LambdaFunctionName",
                  value=lambda_function.function_name,
                  export_name='FunctionName',
                  description='Function name')
        CfnOutput(self, "AppSync Url",
                  value=api.attr_graph_ql_url,
                  export_name='AppsyncUrl',
                  description='AppsyncUrl')

        CfnOutput(self, "database arn",
                  value=cdk_momo_table.table_arn,
                  export_name='DynamoDbArn',
                  description='DynamoDBArn')

        CfnOutput(self, "step functions arn",
                  value=step.state_machine_arn,
                  export_name='StepFunctionArn',
                  description='StepFunctionArn')

Download

git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/appsync-lambda-sfn-cdk

Pattern repository

View on GitHub

Last updated on 26 Dec 2024

Edit this page