Amazon API Gateway HTTP API to Amazon EventBridge

Create an Amazon API Gateway HTTP API connected to Amazon EventBridge.

API Gateway HTTP APIAmazon EventBridge
from aws_cdk import (
    Stack,
    aws_events as events,
    aws_logs as logs,
    aws_events_targets as cloudw,
    aws_apigatewayv2_alpha as apigw,
    CfnOutput
)
from aws_cdk.aws_apigateway import (
    IntegrationType
)
from aws_cdk.aws_apigatewayv2 import (
    CfnIntegration,
    CfnRoute,
)
from aws_cdk.aws_iam import (
    Role,
    ServicePrincipal,
    PolicyStatement,
    Effect
)
from constructs import Construct


class ApigwHttpApiEventbridgeStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Creating an event bus. Change `event_bus_name` with your own name
        event_bus = events.EventBus(
            scope=self,
            id='MyEventBus',
            event_bus_name='MyEventBus'
        )

        # Match the events to log to certain region
        event_logger_rule = events.Rule(
            scope=self,
            id="EventLoggerRule",
            description="Log all events",
            event_pattern=events.EventPattern(
                region=['app-southeast-2']
            ),
            event_bus=event_bus
        )

        # Creating log group
        log_group = logs.LogGroup(
            scope=self,
            id='EventLogGroup',
            log_group_name=f'/aws/events/{event_bus.event_bus_name}'
        )

        event_logger_rule.add_target(
            cloudw.CloudWatchLogGroup(log_group=log_group)
        )

        # Creating the HTTP Api. Change `api_name` with your own name
        http_api = apigw.HttpApi(
            scope=self,
            id='MyRestApi',
            api_name='MyRestApi'
        )

        api_role = Role(
            scope=self,
            id='EventBridgeIntegrationRole',
            assumed_by=ServicePrincipal('apigateway.amazonaws.com')
        )

        api_role.add_to_policy(
            PolicyStatement(
                effect=Effect.ALLOW,
                resources=[event_bus.event_bus_arn],
                actions=['events:PutEvents']
            )
        )

        event_bridge_integration = CfnIntegration(
            scope=self,
            id='EventBridgeIntegration',
            integration_type=str(IntegrationType.AWS_PROXY),
            integration_subtype='EventBridge-PutEvents',
            credentials_arn=api_role.role_arn,
            api_id=http_api.http_api_id,
            request_parameters={
                'Source': 'WebApp',
                'DetailType': 'MyDetailType',
                'Detail': '$request.body',
                'EventBusName': event_bus.event_bus_arn
            },
            payload_format_version='1.0',
            timeout_in_millis=10000
        )

        CfnRoute(
            scope=self,
            id='EventRoute',
            api_id=http_api.http_api_id,
            route_key='POST /',
            target=f'integrations/{event_bridge_integration.ref}'
        )

        CfnOutput(
            scope=self,
            id='apiUrl',
            value=http_api.url,
            description='HTTP API endpoint URL'
        )

Download

git clone https://github.com/aws-samples/serverless-patterns
cd serverless-patterns/apigw-http-api-eventbridge-python

Pattern repository

View on GitHub

Last updated on 26 Dec 2024

Edit this page