From 53ae25892a8e83c8b5a037f171f04a1dfd9e4d21 Mon Sep 17 00:00:00 2001 From: Carina Antunes <carina.oliveira.antunes@cern.ch> Date: Mon, 8 Aug 2022 12:39:01 +0200 Subject: [PATCH] [hotfix] reconnect loop improvements --- notifications_consumer/auditing.py | 5 ++ notifications_consumer/consumer.py | 4 ++ .../megabus/client_individual_consumer.py | 47 ++++++++++++++++--- .../megabus/client_individual_listener.py | 6 ++- .../processors/processor.py | 4 ++ 5 files changed, 58 insertions(+), 8 deletions(-) diff --git a/notifications_consumer/auditing.py b/notifications_consumer/auditing.py index 9a7e38b..1706c89 100644 --- a/notifications_consumer/auditing.py +++ b/notifications_consumer/auditing.py @@ -31,6 +31,11 @@ class Auditor: except Exception: logging.exception("Failed ETCD client init") + def client_disconnect(self): + """Disconnect the client if necessary.""" + if self.client: + self.client.close() + def retry_init_if_necessary(self): """Retry initialization if client is not there.""" if not self.client: diff --git a/notifications_consumer/consumer.py b/notifications_consumer/consumer.py index 8958d45..57762a7 100644 --- a/notifications_consumer/consumer.py +++ b/notifications_consumer/consumer.py @@ -50,3 +50,7 @@ class NotificationsConsumer(ClientIndividualListener): ClientIndividualListener.nack_message(self, message_id, subscription) except BrokenPipeError: logging.warning("Broken Pipe caused by Reconnect Loop/ ActiveMQ Disconnect %s", message_id) + + def teardown(self): + """Disconnect and cleanup.""" + self.processor.disconnect() diff --git a/notifications_consumer/megabus/client_individual_consumer.py b/notifications_consumer/megabus/client_individual_consumer.py index 66c97ea..2b4b9ee 100644 --- a/notifications_consumer/megabus/client_individual_consumer.py +++ b/notifications_consumer/megabus/client_individual_consumer.py @@ -1,4 +1,7 @@ """Client Individual Consumer definition.""" +import logging +import time + import stomp from megabus.common import get_hosts from megabus.consumer import Consumer, _StompListener @@ -7,6 +10,9 @@ from megabus.exceptions import ConnectFailedException from notifications_consumer.megabus.client_individual_listener import ClientIndividualListener +logger = logging.getLogger() + + class _ClientIndividualStompListener(_StompListener): """ Custom stomp listener that supports subscribe ack='client/client-individual'. @@ -49,6 +55,8 @@ class ClientIndividualConsumer(Consumer): self._external_listener = listener_class self._listener_kwargs = listener_kwargs + self._listeners = [] # type : List[ClientIndividualListener] + self._hosts = [] # type : List[str] # Auto connect must be called until _listener_kwargs are set if auto_connect: @@ -63,6 +71,35 @@ class ClientIndividualConsumer(Consumer): """Create a new instance of the Listener class.""" return self._external_listener(**self._listener_kwargs) + def _reconnection_loop(self): + """Reconnection loop. + + Reconnects only if host list changes. + """ + while self._is_listening: + time.sleep(self._reconnection_interval) + hosts = get_hosts(self._server, self._port, self._use_multiple_brokers) + if set(hosts) == set(self._hosts): + continue + + logger.debug("Host list change - reconnecting in order to do rebalancing") + self._disconnect() + try: + self._simple_connect() + except ConnectFailedException as e: + self._external_listener.on_exception("CONNECTION", e) + + def _disconnect(self): + """Disconnect and cleanup. + + Clear the _connections list for memory saving. + Trigger teardown on listeners. + """ + Consumer._disconnect(self) + [listener.teardown() for listener in self._listeners] + self._connections.clear() + self._listeners.clear() + def _simple_connect(self): """ Create a new Listener per connection in order to allow ack/nack to that specific connection. @@ -72,9 +109,10 @@ class ClientIndividualConsumer(Consumer): We don't reuse the Listener instance for the new connections because get_hosts is dynamic and its length might change. """ - hosts = get_hosts(self._server, self._port, self._use_multiple_brokers) - for host in hosts: + self._hosts = get_hosts(self._server, self._port, self._use_multiple_brokers) + for host in self._hosts: external_listener_instance = self._create_listener_instance() + self._listeners.append(external_listener_instance) try: try: connection = stomp.Connection([(host, self._port)]) @@ -103,8 +141,3 @@ class ClientIndividualConsumer(Consumer): raise ConnectFailedException("stomp.exception.ConnectionClosedException") except ConnectFailedException as e: external_listener_instance.on_exception("CONNECTION", e) - - def disconnect(self): - """Clear the _connections list for memory saving.""" - Consumer.disconnect(self) - self._connections.clear() diff --git a/notifications_consumer/megabus/client_individual_listener.py b/notifications_consumer/megabus/client_individual_listener.py index 339f921..6fa18c9 100644 --- a/notifications_consumer/megabus/client_individual_listener.py +++ b/notifications_consumer/megabus/client_individual_listener.py @@ -10,7 +10,7 @@ class ClientIndividualListener(Listener): Read more: https://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE - This implementation has access to its own connection, through set_connection method, in order to allow wasy ack/nack + This implementation has access to its own connection, through set_connection method, in order to allow easy ack/nack through the on_message method. This class should be used with ClientIndividualConsumer. @@ -41,6 +41,10 @@ class ClientIndividualListener(Listener): """Set the connection.""" self.connection = connection + def teardown(self): + """Disconnect and cleanup.""" + pass + def on_error(self, headers, body): """On error handler.""" logging.warning("Failed processing message. Error %s", body) diff --git a/notifications_consumer/processors/processor.py b/notifications_consumer/processors/processor.py index 80bf3ef..336cf1c 100644 --- a/notifications_consumer/processors/processor.py +++ b/notifications_consumer/processors/processor.py @@ -33,3 +33,7 @@ class Processor(ABC): def read_message(self, message: Dict): """Read the message.""" pass + + def disconnect(self): + """Disconnect resources.""" + self.auditor.client_disconnect() -- GitLab