diff --git a/notifications_consumer/auditing.py b/notifications_consumer/auditing.py index 9a7e38b873f6edcb7352db5b9d1b9a9991d652d4..1706c89a211d3940a6973eea65a881b8cf9139f0 100644 --- a/notifications_consumer/auditing.py +++ b/notifications_consumer/auditing.py @@ -31,6 +31,11 @@ class Auditor: except Exception: logging.exception("Failed ETCD client init") + def client_disconnect(self): + """Disconnect the client if necessary.""" + if self.client: + self.client.close() + def retry_init_if_necessary(self): """Retry initialization if client is not there.""" if not self.client: diff --git a/notifications_consumer/consumer.py b/notifications_consumer/consumer.py index 8958d451d1de55ec879258acf3b69ef751f9d683..57762a70c631dcd2bbde4f08c2d228d282fe5004 100644 --- a/notifications_consumer/consumer.py +++ b/notifications_consumer/consumer.py @@ -50,3 +50,7 @@ class NotificationsConsumer(ClientIndividualListener): ClientIndividualListener.nack_message(self, message_id, subscription) except BrokenPipeError: logging.warning("Broken Pipe caused by Reconnect Loop/ ActiveMQ Disconnect %s", message_id) + + def teardown(self): + """Disconnect and cleanup.""" + self.processor.disconnect() diff --git a/notifications_consumer/megabus/client_individual_consumer.py b/notifications_consumer/megabus/client_individual_consumer.py index 66c97ea41bad1fe036b75320c1646247c1213d63..2b4b9ee30ae30c81b1de92736ec521ebf1d114b8 100644 --- a/notifications_consumer/megabus/client_individual_consumer.py +++ b/notifications_consumer/megabus/client_individual_consumer.py @@ -1,4 +1,7 @@ """Client Individual Consumer definition.""" +import logging +import time + import stomp from megabus.common import get_hosts from megabus.consumer import Consumer, _StompListener @@ -7,6 +10,9 @@ from megabus.exceptions import ConnectFailedException from notifications_consumer.megabus.client_individual_listener import ClientIndividualListener +logger = logging.getLogger() + + class _ClientIndividualStompListener(_StompListener): """ Custom stomp listener that supports subscribe ack='client/client-individual'. @@ -49,6 +55,8 @@ class ClientIndividualConsumer(Consumer): self._external_listener = listener_class self._listener_kwargs = listener_kwargs + self._listeners = [] # type : List[ClientIndividualListener] + self._hosts = [] # type : List[str] # Auto connect must be called until _listener_kwargs are set if auto_connect: @@ -63,6 +71,35 @@ class ClientIndividualConsumer(Consumer): """Create a new instance of the Listener class.""" return self._external_listener(**self._listener_kwargs) + def _reconnection_loop(self): + """Reconnection loop. + + Reconnects only if host list changes. + """ + while self._is_listening: + time.sleep(self._reconnection_interval) + hosts = get_hosts(self._server, self._port, self._use_multiple_brokers) + if set(hosts) == set(self._hosts): + continue + + logger.debug("Host list change - reconnecting in order to do rebalancing") + self._disconnect() + try: + self._simple_connect() + except ConnectFailedException as e: + self._external_listener.on_exception("CONNECTION", e) + + def _disconnect(self): + """Disconnect and cleanup. + + Clear the _connections list for memory saving. + Trigger teardown on listeners. + """ + Consumer._disconnect(self) + [listener.teardown() for listener in self._listeners] + self._connections.clear() + self._listeners.clear() + def _simple_connect(self): """ Create a new Listener per connection in order to allow ack/nack to that specific connection. @@ -72,9 +109,10 @@ class ClientIndividualConsumer(Consumer): We don't reuse the Listener instance for the new connections because get_hosts is dynamic and its length might change. """ - hosts = get_hosts(self._server, self._port, self._use_multiple_brokers) - for host in hosts: + self._hosts = get_hosts(self._server, self._port, self._use_multiple_brokers) + for host in self._hosts: external_listener_instance = self._create_listener_instance() + self._listeners.append(external_listener_instance) try: try: connection = stomp.Connection([(host, self._port)]) @@ -103,8 +141,3 @@ class ClientIndividualConsumer(Consumer): raise ConnectFailedException("stomp.exception.ConnectionClosedException") except ConnectFailedException as e: external_listener_instance.on_exception("CONNECTION", e) - - def disconnect(self): - """Clear the _connections list for memory saving.""" - Consumer.disconnect(self) - self._connections.clear() diff --git a/notifications_consumer/megabus/client_individual_listener.py b/notifications_consumer/megabus/client_individual_listener.py index 339f9211b9f4b5bf237af1698d8c87c597810da9..6fa18c9a3dcd4fed463c84f8b3a55a740b4fb831 100644 --- a/notifications_consumer/megabus/client_individual_listener.py +++ b/notifications_consumer/megabus/client_individual_listener.py @@ -10,7 +10,7 @@ class ClientIndividualListener(Listener): Read more: https://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE - This implementation has access to its own connection, through set_connection method, in order to allow wasy ack/nack + This implementation has access to its own connection, through set_connection method, in order to allow easy ack/nack through the on_message method. This class should be used with ClientIndividualConsumer. @@ -41,6 +41,10 @@ class ClientIndividualListener(Listener): """Set the connection.""" self.connection = connection + def teardown(self): + """Disconnect and cleanup.""" + pass + def on_error(self, headers, body): """On error handler.""" logging.warning("Failed processing message. Error %s", body) diff --git a/notifications_consumer/processors/processor.py b/notifications_consumer/processors/processor.py index 80bf3ef1a9c76854adedf0258cf952226325f31c..336cf1c0d765d505542e71751c98916b8ca5bbfc 100644 --- a/notifications_consumer/processors/processor.py +++ b/notifications_consumer/processors/processor.py @@ -33,3 +33,7 @@ class Processor(ABC): def read_message(self, message: Dict): """Read the message.""" pass + + def disconnect(self): + """Disconnect resources.""" + self.auditor.client_disconnect()