Skip to content
Snippets Groups Projects

:rocket: RELEASE: Bump to 0.1.1

Merged Samuel Guillemet requested to merge develop into main
9 files
+ 83
180
Compare changes
  • Side-by-side
  • Inline
Files
9
""" Webhook endpoint. """
import logging
from typing import Union
from typing import Callable, Union
from fastapi import APIRouter, Depends, HTTPException
from app.components import (
endpoint_url_validation,
zoom_room_checked_in,
zoom_room_checked_out,
from app import components
from app.core.base_webhook_event_schema import (
BaseResponseWebhookEvent,
BaseWebhookEvent,
ErrorModel,
)
from app.core.base_webhook_event_schema import BaseWebhookEvent, ErrorModel
from app.core.decorators import webhook_handler
from app.core.dependencies import verify_webhook_signature
from app.utils.load_submodules import load_submodules
logger = logging.getLogger("app.api.v1.webhook")
@@ -24,32 +24,28 @@ responses = {
501: {"description": "Webhook event not supported.", "model": ErrorModel},
}
# Union of all response models for the webhook handlers.
ResponseModel = Union[
endpoint_url_validation.response_model,
zoom_room_checked_in.response_model,
zoom_room_checked_out.response_model,
# Load all webhook components and create a list of tuples containing
# the event name, handler function, and response model.
components_tuple: list[
tuple[str, Callable[..., BaseResponseWebhookEvent], BaseResponseWebhookEvent]
] = [
(
getattr(component, "event_name"),
getattr(component, "handler_function"),
getattr(component, "response_model"),
)
for component in load_submodules(components)
]
@router.post(
"", # Will be /webhook
dependencies=[Depends(verify_webhook_signature)],
response_model=ResponseModel,
response_model=Union[
tuple(response_model for _, _, response_model in components_tuple) # type: ignore
],
responses={**responses},
)
@webhook_handler(
event_name=endpoint_url_validation.event_name,
handler_function=endpoint_url_validation.handler_function,
)
@webhook_handler(
event_name=zoom_room_checked_in.event_name,
handler_function=zoom_room_checked_in.handler_function,
)
@webhook_handler(
event_name=zoom_room_checked_out.event_name,
handler_function=zoom_room_checked_out.handler_function,
)
async def webhook_route(webhook_event: BaseWebhookEvent):
"""
Handle incoming webhook events.
@@ -60,6 +56,10 @@ async def webhook_route(webhook_event: BaseWebhookEvent):
Raises:
HTTPException: If the webhook event is not supported.
"""
for event_name, handler_function, _ in components_tuple:
if webhook_event.event == event_name:
return handler_function(webhook_event.model_dump())
logger.warning("Unhandled webhook event: %s", webhook_event.event)
raise HTTPException(
detail=f"Webhook event {webhook_event.event} not supported.",
Loading