Skip to content
Snippets Groups Projects
Unverified Commit b9788ed1 authored by Samuel Guillemet's avatar Samuel Guillemet
Browse files

:package: NEW: Add sensor_data support

parent f9687783
No related branches found
No related tags found
1 merge request!7🚀 RELEASE: Bump to 0.2.0
""" Zoom Room Sensor Data Component"""
from .handler import sensor_data_handler
from .schema import ResponseWebhookSensorData
event_name = "zoomroom.sensor_data"
handler_function = sensor_data_handler
response_model = ResponseWebhookSensorData
import logging
from .schema import ResponseWebhookSensorData, SensorDataWebHook
logger = logging.getLogger("app.components.zoomroom.sensor_data")
def sensor_data_handler(body: dict):
sensor_data = SensorDataWebHook(**body)
logger.debug(
"New sensor data event in room %s [%s].",
sensor_data.payload.object.room_name,
sensor_data.payload.object.id,
)
try:
sensor_data.payload.object.sensor_data[0]
except IndexError:
logger.warning("No sensor data in the payload.")
return ResponseWebhookSensorData(message="No sensor data in the payload.")
# For the sensor data:
logger.info(
"%s | %s | %s | %s | %s | %s | %s | %s",
sensor_data.event,
sensor_data.event_ts,
sensor_data.payload.account_id,
sensor_data.payload.object.id,
sensor_data.payload.object.room_name,
sensor_data.payload.object.sensor_data[0].sensor_type.value,
sensor_data.payload.object.sensor_data[0].sensor_value,
sensor_data.payload.object.sensor_data[0].date_time,
)
# TODO: Add code here to handle the sensor_data event.
return ResponseWebhookSensorData(message="Sensor data event received.")
from datetime import datetime
from enum import Enum
from typing import List
from pydantic import BaseModel, Field
from app.core.base_webhook_event_schema import (
BaseResponseWebhookEvent,
BaseWebhookEvent,
)
class SensorType(Enum):
CO2 = "CO2"
TEMPERATURE = "TEMPERATURE"
REAL_TIME_PEOPLE_COUNT = "REAL_TIME_PEOPLE_COUNT"
HUMIDITY = "HUMIDITY"
VOC = "VOC"
class SensorData(BaseModel):
date_time: datetime = Field(
..., description="The time when the sensor data was reported."
)
sensor_type: SensorType = Field(..., description="The type of sensor.")
sensor_value: str = Field(..., description="The value of the sensor.")
class Object(BaseModel):
id: str = Field(..., description="The ID of the Zoom Room.")
room_name: str = Field(..., description="The name of the Zoom Room.")
device_id: str = Field(..., description="The ID of the device.")
sensor_data: List[SensorData] = Field(
..., description="The sensor data reported by the device."
)
class Payload(BaseModel):
account_id: str = Field(..., description="The account ID of the Zoom account.")
object: Object = Field(..., description="Information about the Zoom Room.")
class SensorDataWebHook(BaseWebhookEvent):
"""This is the schema for the request body of the webhook for the zoomroom.sensor_data event.
Args:
WebhookEvent (WebhookEvent): The base webhook model for the schema.
"""
payload: Payload = Field(
...,
description="Contains all the information about the Zoom Room that reported the sensor data.",
)
class ResponseWebhookSensorData(BaseResponseWebhookEvent):
"""This is the schema for the response for zoomroom.sensor_data webhook.
Args:
BaseResponseWebhookEvent (BaseResponseWebhookEvent): The base model for the schema.
"""
message: str = Field(..., description="The message of the response.")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment