Step Functions to Athena

Create a Step Functions workflow to query Amazon Athena.

Step Functions, Amazon Athena
from aws_cdk import (
    Aws,
    Stack,
    CfnOutput,
    Duration,
    aws_s3 as s3,
    aws_glue as glue,
    aws_stepfunctions as sf,
    aws_stepfunctions_tasks as tasks,
)
# Region of the stack being deployed. Not referenced anywhere in the visible
# part of this file.
# NOTE(review): Aws.REGION / Aws.ACCOUNT_ID are presumably deploy-time
# placeholders rather than concrete strings - confirm before using them in
# string comparisons or other synth-time logic.
region = Aws.REGION
# Account ID, passed as the Glue Data Catalog ID (catalog_id) when the stack
# below creates its Glue database and table.
account = Aws.ACCOUNT_ID

from constructs import Construct

class SfnAthenaCdkPythonStack(Stack):
    """Stack that queries CloudFront access logs with Athena from Step Functions.

    Resources created:

    * an S3 bucket intended to hold CloudFront access logs,
    * an S3 bucket that receives Athena query results,
    * a Glue database and an external table describing the log format,
    * a Step Functions state machine that starts an Athena query, waits for
      it to finish, and then walks through every page of results.

    The bucket name, state machine name and state machine ARN are exported
    as CloudFormation outputs.
    """

    _GLUE_DATABASE_NAME = "serverlessland_database"

    # State paths shared by several states of the workflow.
    _QUERY_EXECUTION_ID_PATH = "$.QueryExecution.QueryExecutionId"
    _RESULTS_PATH = "$.GetQueryResults"
    _NEXT_TOKEN_PATH = "$.GetQueryResults.NextToken"

    # (column name, Glue type) pairs in CloudFront standard log field order, see
    # https://docs.aws.amazon.com/athena/latest/ug/cloudfront-logs.html
    _CF_LOG_COLUMNS = (
        ("date", "date"),
        ("time", "string"),
        ("location", "string"),
        ("bytes", "bigint"),
        ("request_ip", "string"),
        ("method", "string"),
        ("host", "string"),
        ("uri", "string"),
        ("status", "string"),
        ("referer", "string"),
        ("user_agent", "string"),
        ("query_string", "string"),
        ("cookie", "string"),
        ("result_type", "string"),
        ("request_id", "string"),
        ("host_header", "string"),
        ("request_protocol", "string"),
        ("request_bytes", "bigint"),
        ("time_taken", "float"),
        ("xforwarded_for", "string"),
        ("ssl_protocol", "string"),
        ("ssl_cipher", "string"),
        ("response_result_type", "string"),
        ("http_version", "string"),
        ("fle_status", "string"),
        ("fle_encrypted_fields", "int"),
        ("c_port", "int"),
        ("time_to_first_byte", "float"),
        ("x_edge_detailed_result_type", "string"),
        ("sc_content_type", "string"),
        ("sc_content_len", "string"),
        ("sc_range_start", "bigint"),
        ("sc_range_end", "bigint"),
    )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        """Define all resources of the stack.

        Args:
            scope: Parent construct.
            construct_id: Logical ID of this stack within ``scope``.
            **kwargs: Forwarded unchanged to ``Stack.__init__``.
        """
        super().__init__(scope, construct_id, **kwargs)

        # The S3 bucket where CloudFront access logs will be stored.
        cf_access_logs = s3.Bucket(self, "LogBucket")

        # The S3 bucket where Athena will put the query results.
        athena_results = s3.Bucket(self, "AthenaResultsBucket")

        self._create_access_log_table(cf_access_logs)
        workflow = self._create_workflow(athena_results)

        CfnOutput(
            self,
            "Logs",
            value=cf_access_logs.bucket_name,
            export_name="LogsBucket",
        )
        CfnOutput(
            self,
            "SFName",
            value=workflow.state_machine_name,
            export_name="SFName",
        )
        CfnOutput(
            self,
            "SFArn",
            value=workflow.state_machine_arn,
            export_name="StepFunctionArn",
            description="Step Function arn",
        )

    def _create_access_log_table(self, log_bucket):
        """Create the Glue database and the external table over ``log_bucket``.

        Returns the ``glue.CfnTable`` that describes the CloudFront logs.
        """
        database_name = self._GLUE_DATABASE_NAME
        glue_database = glue.CfnDatabase(
            self,
            database_name,
            catalog_id=account,
            database_input=glue.CfnDatabase.DatabaseInputProperty(
                description=f"Glue database '{database_name}'",
                name=database_name,
            ),
        )

        columns = [
            glue.CfnTable.ColumnProperty(name=column_name, type=column_type)
            for column_name, column_type in self._CF_LOG_COLUMNS
        ]

        return glue.CfnTable(
            self,
            "cfaccesslogs",
            catalog_id=account,
            # Ref of AWS::Glue::Database is the database name. Using it
            # (instead of the plain string) makes CloudFormation create the
            # database before the table.
            database_name=glue_database.ref,
            table_input=glue.CfnTable.TableInputProperty(
                name="cf_access_logs",
                description="CloudFront access logs",
                table_type="EXTERNAL_TABLE",
                # CloudFront log files start with two header lines.
                parameters={
                    "skip.header.line.count": "2",
                },
                storage_descriptor=glue.CfnTable.StorageDescriptorProperty(
                    location=f"s3://{log_bucket.bucket_name}/",
                    input_format="org.apache.hadoop.mapred.TextInputFormat",
                    output_format=(
                        "org.apache.hadoop.hive.ql.io."
                        "HiveIgnoreKeyTextOutputFormat"
                    ),
                    compressed=False,
                    serde_info=glue.CfnTable.SerdeInfoProperty(
                        serialization_library=(
                            "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
                        ),
                        # Fields are tab separated; written as an escape so
                        # the delimiter survives editors that expand tabs.
                        parameters={
                            "field.delim": "\t",
                        },
                    ),
                    columns=columns,
                ),
            ),
        )

    def _create_row_processor(self, map_id, step_id, rows_path):
        """Build a Map state that runs a placeholder step for every row.

        ``rows_path`` selects the rows to iterate over. The Map result is
        discarded so the state input (query id, current page and its
        NextToken) is passed on unchanged to the following state.
        """
        row_processor = sf.Map(
            self,
            map_id,
            max_concurrency=1,
            input_path=rows_path,
            result_path=sf.JsonPath.DISCARD,
        )
        # Do something with each result - replace the Pass state with your
        # own logic.
        row_processor.iterator(sf.Pass(self, step_id))
        return row_processor

    def _create_workflow(self, results_bucket):
        """Build the state machine that runs the query and pages the results.

        Flow: start query (synchronously) -> get first page -> process rows
        -> while a NextToken is present: get next page -> process rows.

        Returns the ``sf.StateMachine``.
        """
        # Submit the query and wait for it to complete.
        start_query = tasks.AthenaStartQueryExecution(
            self,
            "Start Athena Query",
            query_string="SELECT uri FROM cf_access_logs limit 10",
            # RUN_JOB executes the command in SYNC mode.
            integration_pattern=sf.IntegrationPattern.RUN_JOB,
            query_execution_context=tasks.QueryExecutionContext(
                database_name=self._GLUE_DATABASE_NAME,
            ),
            result_configuration=tasks.ResultConfiguration(
                output_location=s3.Location(
                    bucket_name=results_bucket.bucket_name,
                    object_key="results",
                ),
            ),
        )

        # First page of results. No NextToken exists yet, so none is sent.
        get_first_page = tasks.AthenaGetQueryResults(
            self,
            "Get Query Results",
            query_execution_id=sf.JsonPath.string_at(
                self._QUERY_EXECUTION_ID_PATH
            ),
            result_path=self._RESULTS_PATH,
        )

        # Following pages: pass the NextToken of the page fetched before and
        # overwrite that page with the new one, so the token always belongs
        # to the most recent page (and disappears after the last page).
        get_next_page = tasks.AthenaGetQueryResults(
            self,
            "Get Next Query Results",
            query_execution_id=sf.JsonPath.string_at(
                self._QUERY_EXECUTION_ID_PATH
            ),
            next_token=sf.JsonPath.string_at(self._NEXT_TOKEN_PATH),
            result_path=self._RESULTS_PATH,
        )

        # Only the first page starts with the column-header row, so only
        # there the first row is skipped.
        process_first_page = self._create_row_processor(
            "Map State",
            "DoSomething",
            "$.GetQueryResults.ResultSet.Rows[1:]",
        )
        process_next_page = self._create_row_processor(
            "Map Next Page State",
            "DoSomethingNextPage",
            "$.GetQueryResults.ResultSet.Rows",
        )

        # Check whether more results are available and loop until the
        # response no longer carries a NextToken.
        has_more_results = sf.Choice(self, "Has More Results?")
        has_more_results.when(
            sf.Condition.is_present(self._NEXT_TOKEN_PATH),
            get_next_page.next(process_next_page).next(has_more_results),
        )
        has_more_results.otherwise(sf.Succeed(self, "Done"))

        # Step function to orchestrate the Athena query and the retrieval of
        # its results.
        definition = (
            start_query
            .next(get_first_page)
            .next(process_first_page)
            .next(has_more_results)
        )
        return sf.StateMachine(
            self,
            "AthenaQuery",
            definition=definition,
            timeout=Duration.minutes(60),
        )

Download

git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/sfn-athena-cdk-python

Pattern repository

View on GitHub

Last updated on 26 Dec 2024

Edit this page