diff --git a/.env b/.env index 36a5678b92dc333f5085a1a5833031e1590bdde6..55ca29fdf7f800937e1033b30e744d5998e1d0c0 100644 --- a/.env +++ b/.env @@ -44,3 +44,6 @@ AUDITING=True # ETCD_PASSWORD=xxx # Default 5 minutes # ETCD_AUTH_TOKEN_TTL=60 + +STOMP_HEARTBEATS_OUTGOING_MS = 5000 +STOMP_HEARTBEATS_INCOMING_MS = 5000 \ No newline at end of file diff --git a/notifications_consumer/app.py b/notifications_consumer/app.py index 0dd8a692acc30c96627449ec3d80f1a7729e8b1d..f7abfd7b6b54257095777233a29da523624c89fc 100644 --- a/notifications_consumer/app.py +++ b/notifications_consumer/app.py @@ -24,6 +24,7 @@ class App: name=self.config.CONSUMER_NAME, listener_class=NotificationsConsumer, listener_kwargs={"config": self.config}, + heartbeats=self.config.STOMP_HEARTBEATS, ) logging.info("Finished initialising %s processor - Waiting for messages...", self.config.PROCESSOR) diff --git a/notifications_consumer/config.py b/notifications_consumer/config.py index 9306fecb797f84cddb097446ab26fcf838cbd391..68ffd3d321de3c43ce88704c482c7a2a46c06af8 100644 --- a/notifications_consumer/config.py +++ b/notifications_consumer/config.py @@ -93,6 +93,11 @@ class Config: # Service SERVICE_NAME = os.getenv("SERVICE_NAME", "CERN Notifications") + # STOMP + STOMP_HEARTBEATS = ( + int(os.getenv("STOMP_HEARTBEATS_OUTGOING_MS", 0)), + int(os.getenv("STOMP_HEARTBEATS_INCOMING_MS", 0))) + TEMPLATES = Environment( loader=PrefixLoader( { diff --git a/notifications_consumer/megabus/client_individual_consumer.py b/notifications_consumer/megabus/client_individual_consumer.py index 446c7f897f55a9f8ba0b8b6dad2e8718786e7e44..00aa6a08ad5ada882714d6a2cd75e8c4d529e0fa 100644 --- a/notifications_consumer/megabus/client_individual_consumer.py +++ b/notifications_consumer/megabus/client_individual_consumer.py @@ -56,7 +56,7 @@ class ClientIndividualConsumer(Consumer): listener as well, to allow easy ack/nack implementations. """ - def __init__(self, listener_class, listener_kwargs, auto_connect=True, **kwargs): + def __init__(self, listener_class, listener_kwargs, auto_connect=True, heartbeats=(0, 0), **kwargs): """Initialize the custom consumer.""" Consumer.__init__(self, listener=None, auto_connect=False, **kwargs) @@ -66,6 +66,7 @@ class ClientIndividualConsumer(Consumer): self._hosts = [] # type : List[str] self._on_disconnect_evt = threading.Event() self._asyncio_loop = asyncio.get_event_loop() + self.heartbeats = heartbeats # Auto connect must be called until _listener_kwargs are set if auto_connect: @@ -75,7 +76,7 @@ class ClientIndividualConsumer(Consumer): pass def _create_listener_instance( - self, + self, ): """Create a new instance of the Listener class.""" return self._external_listener(**self._listener_kwargs, asyncio_loop=self._asyncio_loop) @@ -131,7 +132,7 @@ class ClientIndividualConsumer(Consumer): self._listeners.append(external_listener_instance) try: try: - connection = stomp.Connection([(host, self._port)], keepalive=True) + connection = stomp.Connection([(host, self._port)], keepalive=True, heartbeats=self.heartbeats) listener = _ClientIndividualStompListener( external_listener_instance, connection, self._on_disconnect_evt )