From addf0ff627e30684d9630b1dda37fcc7b9a1f9b8 Mon Sep 17 00:00:00 2001 From: Carina Antunes <carina.oliveira.antunes@cern.ch> Date: Wed, 26 Apr 2023 16:22:55 +0200 Subject: [PATCH] Add stomp heartbeats --- .env | 3 +++ notifications_consumer/app.py | 1 + notifications_consumer/config.py | 5 +++++ .../megabus/client_individual_consumer.py | 7 ++++--- 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/.env b/.env index 36a5678..55ca29f 100644 --- a/.env +++ b/.env @@ -44,3 +44,6 @@ AUDITING=True # ETCD_PASSWORD=xxx # Default 5 minutes # ETCD_AUTH_TOKEN_TTL=60 + +STOMP_HEARTBEATS_OUTGOING_MS = 5000 +STOMP_HEARTBEATS_INCOMING_MS = 5000 \ No newline at end of file diff --git a/notifications_consumer/app.py b/notifications_consumer/app.py index 0dd8a69..f7abfd7 100644 --- a/notifications_consumer/app.py +++ b/notifications_consumer/app.py @@ -24,6 +24,7 @@ class App: name=self.config.CONSUMER_NAME, listener_class=NotificationsConsumer, listener_kwargs={"config": self.config}, + heartbeats=self.config.STOMP_HEARTBEATS, ) logging.info("Finished initialising %s processor - Waiting for messages...", self.config.PROCESSOR) diff --git a/notifications_consumer/config.py b/notifications_consumer/config.py index 9306fec..68ffd3d 100644 --- a/notifications_consumer/config.py +++ b/notifications_consumer/config.py @@ -93,6 +93,11 @@ class Config: # Service SERVICE_NAME = os.getenv("SERVICE_NAME", "CERN Notifications") + # STOMP + STOMP_HEARTBEATS = ( + int(os.getenv("STOMP_HEARTBEATS_OUTGOING_MS", 0)), + int(os.getenv("STOMP_HEARTBEATS_INCOMING_MS", 0))) + TEMPLATES = Environment( loader=PrefixLoader( { diff --git a/notifications_consumer/megabus/client_individual_consumer.py b/notifications_consumer/megabus/client_individual_consumer.py index 446c7f8..00aa6a0 100644 --- a/notifications_consumer/megabus/client_individual_consumer.py +++ b/notifications_consumer/megabus/client_individual_consumer.py @@ -56,7 +56,7 @@ class ClientIndividualConsumer(Consumer): listener as well, to allow easy ack/nack implementations. """ - def __init__(self, listener_class, listener_kwargs, auto_connect=True, **kwargs): + def __init__(self, listener_class, listener_kwargs, auto_connect=True, heartbeats=(0, 0), **kwargs): """Initialize the custom consumer.""" Consumer.__init__(self, listener=None, auto_connect=False, **kwargs) @@ -66,6 +66,7 @@ class ClientIndividualConsumer(Consumer): self._hosts = [] # type : List[str] self._on_disconnect_evt = threading.Event() self._asyncio_loop = asyncio.get_event_loop() + self.heartbeats = heartbeats # Auto connect must be called until _listener_kwargs are set if auto_connect: @@ -75,7 +76,7 @@ class ClientIndividualConsumer(Consumer): pass def _create_listener_instance( - self, + self, ): """Create a new instance of the Listener class.""" return self._external_listener(**self._listener_kwargs, asyncio_loop=self._asyncio_loop) @@ -131,7 +132,7 @@ class ClientIndividualConsumer(Consumer): self._listeners.append(external_listener_instance) try: try: - connection = stomp.Connection([(host, self._port)], keepalive=True) + connection = stomp.Connection([(host, self._port)], keepalive=True, heartbeats=self.heartbeats) listener = _ClientIndividualStompListener( external_listener_instance, connection, self._on_disconnect_evt ) -- GitLab