Invoke a Step Functions workflow through a Lambda resolver from Appsync
from os import path
from aws_cdk import (
Stack,
aws_appsync as appsync,
aws_dynamodb as dynamodb,
aws_iam as role,
aws_stepfunctions as sf,
aws_stepfunctions_tasks as sf_tasks,
aws_lambda as lambda_, CfnOutput, Duration
)
from constructs import Construct
dirname = path.dirname(__file__)
class CdkMomoStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
with open(path.join(dirname, "schema.txt"), 'r') as file:
data_schema = file.read().replace('\n', '')
# dynamodb service role
general_role = role.Role(self, 'dynamodbRole',
assumed_by=role.ServicePrincipal("appsync.amazonaws.com"))
states_role = role.Role(self, 'statesRole',
assumed_by=role.ServicePrincipal("states.amazonaws.com"))
general_role.add_to_policy(
role.PolicyStatement( # Restrict to listing and describing tables
actions=["states:StartSyncExecution"],
resources=["*"])
)
general_role.add_managed_policy(role.ManagedPolicy.from_aws_managed_policy_name("AmazonDynamoDBFullAccess"))
states_role.add_managed_policy(role.ManagedPolicy.from_aws_managed_policy_name("AmazonDynamoDBFullAccess"))
general_role.add_managed_policy(role.ManagedPolicy.from_aws_managed_policy_name("AWSLambda_FullAccess"))
general_role \
.add_managed_policy(role.ManagedPolicy
.from_aws_managed_policy_name("service-role/AWSAppSyncPushToCloudWatchLogs"))
api = appsync.CfnGraphQLApi(
self, "cdkMomoApi", name="cdkMomoApi",
authentication_type='API_KEY',
log_config=appsync.CfnGraphQLApi.LogConfigProperty(
cloud_watch_logs_role_arn=general_role.role_arn,
exclude_verbose_content=False,
field_log_level='ALL'
),
xray_enabled=True
)
graphql_schema = appsync.CfnGraphQLSchema(self, "CdkMomoGraphQLSchema",
api_id=api.attr_api_id,
definition=data_schema
)
cdk_momo_table = dynamodb.Table(self, "CdkMomoTable",
table_name="CdkMomoTable",
partition_key=dynamodb.Attribute(
name='id',
type=dynamodb.AttributeType.STRING,
),
billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
)
fail_step = sf.Fail(self, 'Fail', cause="Failed to Update Apartment Status", error="ConditionalFailedException")
# Define Step function tasks
change_apartment_status = sf_tasks.DynamoUpdateItem(
self, "Change Apartment Status",
key={
'id': sf_tasks.DynamoAttributeValue.from_string(sf.JsonPath.string_at("$.details.accountId")),
},
table=cdk_momo_table,
condition_expression="attribute_exists(id)",
update_expression="SET bookedStatus = :bookedStatus",
expression_attribute_values={
":bookedStatus": sf_tasks.DynamoAttributeValue.from_string('Booked')
},
result_path="$.updateResult",
).add_catch(handler=fail_step)
wait_step = sf.Wait(self, 'Wait', time=sf.WaitTime.duration(Duration.seconds(30)))
get_status = sf_tasks.DynamoGetItem(
self, "Get Booking Status",
table=cdk_momo_table,
key={
'id': sf_tasks.DynamoAttributeValue.from_string(sf.JsonPath.string_at("$.details.accountId")),
},
result_path='$.getItem'
).add_catch(handler=fail_step)
apartment_not_paid = sf_tasks.DynamoUpdateItem(
self, 'Not Paid(Revert Apartment Status)',
key={
'id': sf_tasks.DynamoAttributeValue.from_string(sf.JsonPath.string_at("$.getItem.Item.id.S")),
},
table=cdk_momo_table,
condition_expression="attribute_exists(id)",
update_expression="SET bookedStatus = :bookedStatus",
expression_attribute_values={
":bookedStatus": sf_tasks.DynamoAttributeValue.from_string('Pending')
},
result_path="$.notPaid",
)
apartment_paid = sf.Pass(self, 'Apartment Paid', comment="Apartment Paid")
definition = change_apartment_status.next(wait_step) \
.next(get_status) \
.next(sf.Choice(self, "Has the Apartment been Paid ?", comment="Has the Apartment been Paid ?")
.when(sf.Condition.string_equals(sf.JsonPath.string_at("$.getItem.Item.id.S"), '1234567') and
sf.Condition.string_equals(sf.JsonPath.string_at("$.getItem.Item.bookedStatus.S"), 'Paid'),
apartment_paid
)
.otherwise(apartment_not_paid))
step = sf.StateMachine(self, 'MomoStateMachine',
definition=definition,
state_machine_name="MomoStateMachine",
state_machine_type=sf.StateMachineType.STANDARD
)
# Create the AWS Lambda function to subscribe to Amazon SQS queue
# The source code is in './lambda' directory
lambda_function = lambda_.Function(
self, "LambdaFunction",
runtime=lambda_.Runtime.PYTHON_3_9,
handler="start_step_function.handler",
code=lambda_.Code.from_asset(path.join(dirname, "lambda")),
)
cdk_momo_data_source = appsync.CfnDataSource(self, "CDKMOMODatasource", api_id=api.attr_api_id,
name="CdkMomoDataSource", type='AWS_LAMBDA',
lambda_config=appsync.CfnDataSource.LambdaConfigProperty(
lambda_function_arn=lambda_function.function_arn
),
service_role_arn=general_role.role_arn)
add_demo_resolver = appsync.CfnResolver(
self,
"AddStepFunctionsExecutionResolver",
api_id=api.attr_api_id,
type_name="Mutation",
field_name="addStepFunctionExecution",
data_source_name=cdk_momo_data_source.attr_name
)
step.grant_start_execution(lambda_function)
add_demo_resolver.add_depends_on(graphql_schema)
lambda_function.add_environment('STEP_FNS_ARN', step.state_machine_arn)
cdk_momo_table.grant_full_access(lambda_function)
CfnOutput(self, "LambdaFunctionName",
value=lambda_function.function_name,
export_name='FunctionName',
description='Function name')
CfnOutput(self, "AppSync Url",
value=api.attr_graph_ql_url,
export_name='AppsyncUrl',
description='AppsyncUrl')
CfnOutput(self, "database arn",
value=cdk_momo_table.table_arn,
export_name='DynamoDbArn',
description='DynamoDBArn')
CfnOutput(self, "step functions arn",
value=step.state_machine_arn,
export_name='StepFunctionArn',
description='StepFunctionArn')
git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/appsync-lambda-sfn-cdk