Create a Step Functions workflow to query Amazon Athena.
from aws_cdk import (
Aws,
Stack,
CfnOutput,
Duration,
aws_s3 as s3,
aws_glue as glue,
aws_stepfunctions as sf,
aws_stepfunctions_tasks as tasks,
)
# Module-level aliases for the CDK `Aws` pseudo-parameter attributes.
# `account` is used below as the Glue catalog id.
# NOTE(review): presumably these are deploy-time tokens rather than concrete
# strings — confirm before using them in string comparisons.
account = Aws.ACCOUNT_ID
region = Aws.REGION
from constructs import Construct
class SfnAthenaCdkPythonStack(Stack):
    """Stack with a Step Functions workflow that queries CloudFront logs via Athena.

    Resources created:
      * an S3 bucket for CloudFront access logs and one for Athena results,
      * a Glue database and an external table describing the log format,
      * a state machine that starts an Athena query, pages through the
        results and runs a placeholder step for every data row.
    """

    # (column name, Glue type) pairs for the CloudFront access-log table,
    # in the order the fields appear in a log line.
    # https://docs.aws.amazon.com/athena/latest/ug/cloudfront-logs.html
    _CF_LOG_COLUMNS = (
        ('date', 'date'),
        ('time', 'string'),
        ('location', 'string'),
        ('bytes', 'bigint'),
        ('request_ip', 'string'),
        ('method', 'string'),
        ('host', 'string'),
        ('uri', 'string'),
        ('status', 'string'),
        ('referer', 'string'),
        ('user_agent', 'string'),
        ('query_string', 'string'),
        ('cookie', 'string'),
        ('result_type', 'string'),
        ('request_id', 'string'),
        ('host_header', 'string'),
        ('request_protocol', 'string'),
        ('request_bytes', 'bigint'),
        ('time_taken', 'float'),
        ('xforwarded_for', 'string'),
        ('ssl_protocol', 'string'),
        ('ssl_cipher', 'string'),
        ('response_result_type', 'string'),
        ('http_version', 'string'),
        ('fle_status', 'string'),
        ('fle_encrypted_fields', 'int'),
        ('c_port', 'int'),
        ('time_to_first_byte', 'float'),
        ('x_edge_detailed_result_type', 'string'),
        ('sc_content_type', 'string'),
        ('sc_content_len', 'string'),
        ('sc_range_start', 'bigint'),
        ('sc_range_end', 'bigint'),
    )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        """Define the buckets, Glue catalog objects, state machine and outputs.

        Args:
            scope: Parent construct of this stack.
            construct_id: Logical id of the stack within ``scope``.
            **kwargs: Forwarded unchanged to ``Stack.__init__``.
        """
        super().__init__(scope, construct_id, **kwargs)

        # the S3 bucket where CloudFront Access Logs will be stored
        cf_access_logs = s3.Bucket(self, "LogBucket")

        # S3 bucket where Athena will put the results
        athena_results = s3.Bucket(self, "AthenaResultsBucket")

        # create an Athena database
        glue_database_name = "serverlessland_database"
        glue_database = glue.CfnDatabase(
            self,
            id=glue_database_name,
            catalog_id=account,
            database_input=glue.CfnDatabase.DatabaseInputProperty(
                description=f"Glue database '{glue_database_name}'",
                name=glue_database_name,
            ),
        )

        # define a table with the structure of CloudFront Logs
        # https://docs.aws.amazon.com/athena/latest/ug/cloudfront-logs.html
        athena_table = glue.CfnTable(
            self,
            id='cfaccesslogs',
            catalog_id=account,
            database_name=glue_database_name,
            table_input=glue.CfnTable.TableInputProperty(
                name='cf_access_logs',
                description='CloudFront access logs',
                table_type='EXTERNAL_TABLE',
                parameters={
                    # every log file starts with two header lines
                    'skip.header.line.count': '2',
                },
                storage_descriptor=glue.CfnTable.StorageDescriptorProperty(
                    location="s3://" + cf_access_logs.bucket_name + "/",
                    input_format='org.apache.hadoop.mapred.TextInputFormat',
                    output_format='org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
                    compressed=False,
                    serde_info=glue.CfnTable.SerdeInfoProperty(
                        serialization_library='org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
                        parameters={
                            # CloudFront standard logs are tab-separated;
                            # a space delimiter would mis-split every line.
                            'field.delim': '\t',
                        },
                    ),
                    columns=[
                        glue.CfnTable.ColumnProperty(name=column_name, type=column_type)
                        for column_name, column_type in self._CF_LOG_COLUMNS
                    ],
                ),
            ),
        )
        # The table refers to the database by a plain string, which gives
        # CloudFormation no ordering hint; declare the dependency explicitly
        # so the table is never created before its database.
        athena_table.node.add_dependency(glue_database)

        # submit the query and wait for the results
        start_query_execution_job = tasks.AthenaStartQueryExecution(
            self, "Start Athena Query",
            query_string="SELECT uri FROM cf_access_logs limit 10",
            # RUN_JOB executes the command in SYNC mode
            integration_pattern=sf.IntegrationPattern.RUN_JOB,
            query_execution_context=tasks.QueryExecutionContext(
                database_name=glue_database_name,
            ),
            result_configuration=tasks.ResultConfiguration(
                output_location=s3.Location(
                    bucket_name=athena_results.bucket_name,
                    object_key="results",
                ),
            ),
        )

        # get the first page of results (up to 1000 rows can be retrieved per call)
        get_query_results_job = tasks.AthenaGetQueryResults(
            self, "Get Query Results",
            query_execution_id=sf.JsonPath.string_at("$.QueryExecution.QueryExecutionId"),
            result_path=sf.JsonPath.string_at("$.GetQueryResults"),
        )

        # get a follow-up page: same query id, plus the NextToken returned by
        # the previous page. The result overwrites $.GetQueryResults, so the
        # token disappears from the state once the last page has been read.
        get_next_results_job = tasks.AthenaGetQueryResults(
            self, "Get Next Query Results",
            query_execution_id=sf.JsonPath.string_at("$.QueryExecution.QueryExecutionId"),
            next_token=sf.JsonPath.string_at("$.GetQueryResults.NextToken"),
            result_path=sf.JsonPath.string_at("$.GetQueryResults"),
        )

        # do something with each result
        # Only the first page starts with the column-header row, so only the
        # first page skips Rows[0]; later pages are processed in full.
        first_page_rows = self._build_row_processor(
            "Map State", "DoSomething",
            "$.GetQueryResults.ResultSet.Rows[1:]",
        )
        next_page_rows = self._build_row_processor(
            "Map State Next Page", "DoSomething Next Page",
            "$.GetQueryResults.ResultSet.Rows",
        )

        # check to see if more results are available; if so fetch and process
        # the next page and come back to this check
        has_more_results = sf.Choice(self, "Has More Results?")
        has_more_results.when(
            sf.Condition.is_present("$.GetQueryResults.NextToken"),
            get_next_results_job.next(next_page_rows).next(has_more_results),
        )
        has_more_results.otherwise(sf.Succeed(self, "Done"))

        # Step function to orchestrate Athena query and retrieving the results
        workflow = sf.StateMachine(
            self, "AthenaQuery",
            definition=start_query_execution_job
            .next(get_query_results_job)
            .next(first_page_rows)
            .next(has_more_results),
            timeout=Duration.minutes(60),
        )

        CfnOutput(self, "Logs",
                  value=cf_access_logs.bucket_name, export_name='LogsBucket')
        CfnOutput(self, "SFName",
                  value=workflow.state_machine_name, export_name='SFName')
        CfnOutput(self, "SFArn",
                  value=workflow.state_machine_arn,
                  export_name='StepFunctionArn',
                  description='Step Function arn')

    def _build_row_processor(self, map_id: str, step_id: str, rows_path: str) -> sf.Map:
        """Build a Map state that runs the per-row step over one result page.

        Args:
            map_id: Construct id (and state name) of the Map state.
            step_id: Construct id (and state name) of the per-row step.
            rows_path: JSONPath selecting the array of rows to iterate.

        Returns:
            The configured Map state. Its own result is discarded, so the
            state input (including ``$.GetQueryResults``) flows on unchanged.
        """
        row_processor = sf.Map(
            self, map_id,
            max_concurrency=1,
            input_path=sf.JsonPath.string_at(rows_path),
            result_path=sf.JsonPath.DISCARD,
        )
        # here add your own logic
        row_processor.iterator(sf.Pass(self, step_id))
        return row_processor
git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/sfn-athena-cdk-python