diff --git a/Dockerfile b/Dockerfile index c27cb003ca864f5655912102c1c5db43ae44c8e3..19c7bb760d31837a40b7e1f2a9428c94a00a82ce 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM gitlab-registry.cern.ch/push-notifications/notifications-routing/notifications-routing-base:77c9480e +FROM gitlab-registry.cern.ch/push-notifications/notifications-routing/notifications-routing-base:9ae84809 ARG build_env COPY ./ ./ diff --git a/Dockerfile-base b/Dockerfile-base index 3517ce1e3739da82f5494f673002b5f252b5cf77..ac1ce45ae3e667f647e2e8adc08faee482e878d3 100644 --- a/Dockerfile-base +++ b/Dockerfile-base @@ -1,4 +1,4 @@ -FROM cern/c8-base:latest +FROM cern/cs8-base:latest ENV LANG=en_US.utf8 LC_ALL=en_US.utf8 diff --git a/docker/activemq/activemq-publisher-routing_publisher.conf b/docker/activemq/activemq-publisher-routing_publisher.conf index 7fab7ab2fc9b9b290e6e4b6ef56fe645e2f0c1a9..7b726eaac66b8d0de1fd61006442f471efcf14de 100644 --- a/docker/activemq/activemq-publisher-routing_publisher.conf +++ b/docker/activemq/activemq-publisher-routing_publisher.conf @@ -10,3 +10,4 @@ top_hostgroup: user:admin pass:admin use_multiple_brokers:False +stomp_connection_timeout: 5 diff --git a/docker/activemq/email_daily_publisher.conf b/docker/activemq/email_daily_publisher.conf index 7fab7ab2fc9b9b290e6e4b6ef56fe645e2f0c1a9..7b726eaac66b8d0de1fd61006442f471efcf14de 100644 --- a/docker/activemq/email_daily_publisher.conf +++ b/docker/activemq/email_daily_publisher.conf @@ -10,3 +10,4 @@ top_hostgroup: user:admin pass:admin use_multiple_brokers:False +stomp_connection_timeout: 5 diff --git a/docker/activemq/email_monthly_publisher.conf b/docker/activemq/email_monthly_publisher.conf index 7fab7ab2fc9b9b290e6e4b6ef56fe645e2f0c1a9..7b726eaac66b8d0de1fd61006442f471efcf14de 100644 --- a/docker/activemq/email_monthly_publisher.conf +++ b/docker/activemq/email_monthly_publisher.conf @@ -10,3 +10,4 @@ top_hostgroup: user:admin pass:admin use_multiple_brokers:False +stomp_connection_timeout: 5 diff --git a/docker/activemq/email_weekly_publisher.conf b/docker/activemq/email_weekly_publisher.conf index 7fab7ab2fc9b9b290e6e4b6ef56fe645e2f0c1a9..7b726eaac66b8d0de1fd61006442f471efcf14de 100644 --- a/docker/activemq/email_weekly_publisher.conf +++ b/docker/activemq/email_weekly_publisher.conf @@ -10,3 +10,4 @@ top_hostgroup: user:admin pass:admin use_multiple_brokers:False +stomp_connection_timeout: 5 diff --git a/logging.yaml b/logging.yaml index ecffc39f63992e0f0b4afe39c4562a19156161b4..21986ee1d60a607fbe732019bb7fc3c1ba92ff54 100644 --- a/logging.yaml +++ b/logging.yaml @@ -21,7 +21,7 @@ loggers: handlers: [] propagate: True megabus: - level: WARNING + level: DEBUG handlers: [] propagate: True stomp: diff --git a/notifications_routing/app.py b/notifications_routing/app.py index 352be590b996224c20454eb478c8e0ffe4b3ebd1..cd6312bb7b1138ab4a128f1b581fbe7acac49327 100644 --- a/notifications_routing/app.py +++ b/notifications_routing/app.py @@ -4,11 +4,10 @@ import logging import logging.config import yaml -from megabus import Consumer +from megabus import Consumer, Publisher from notifications_routing.config import load_config from notifications_routing.data_source.postgres.postgres_data_source import PostgresDataSource -from notifications_routing.publisher import Publisher from notifications_routing.router import Router diff --git a/notifications_routing/jobs/registry.py b/notifications_routing/jobs/registry.py index fcb41e748b45df67b3d13ca532393008a28363f5..0733ed56ee5ebfe8720d95ae3ee386ed46d22888 100644 --- a/notifications_routing/jobs/registry.py +++ b/notifications_routing/jobs/registry.py @@ -1,9 +1,10 @@ """Processor automatic registry.""" import logging +from megabus import Publisher + from notifications_routing.config import Config, load_config from notifications_routing.data_source.postgres.postgres_data_source import PostgresDataSource -from notifications_routing.publisher import Publisher class JobRegistry: diff --git a/notifications_routing/preferences.py b/notifications_routing/preferences.py index 64dd9c41ae5520b90521f043fdc857c8f1af51e7..7ce9cf950a20c06f907e45248dcca50d7addf3a0 100644 --- a/notifications_routing/preferences.py +++ b/notifications_routing/preferences.py @@ -4,13 +4,13 @@ from datetime import datetime from typing import Dict, List, Set import megabus +from megabus import Publisher from notifications_routing.config import Config from notifications_routing.data_source.data_source import DataSource from notifications_routing.data_source.postgres.postgres_data_source import UserFeedNotification from notifications_routing.exceptions import DuplicatedError -from .publisher import Publisher from .utils import ( OutputMessageKeys, convert_notification_email_to_json_string, diff --git a/notifications_routing/publisher.py b/notifications_routing/publisher.py deleted file mode 100644 index bc2be68b4029d10e9a1f33333696161c2b9f405c..0000000000000000000000000000000000000000 --- a/notifications_routing/publisher.py +++ /dev/null @@ -1,63 +0,0 @@ -"""Publisher extended send method. - -Copied from python-megabus @ https://gitlab.cern.ch/ai-config-team/python-megabus -Adds support for headers in the method send. -""" -import time - -import megabus -import stomp - - -class Publisher(megabus.Publisher): - """Inherit megabus.Publisher.""" - - def send(self, message, extension=None, hostgroup=None, hostname=None, ttl=None, headers=None): # noqa: C901 - """Extend megabus.Publisher.send method to receive headers parameter.""" - if not headers: - headers = {} - - destination = "/%s/%s" % (self._destination_type, self._destination) - hosts = megabus.common.get_hosts(self._server, self._port, self._use_multiple_brokers) - - if extension: - destination += "." + extension - if hostgroup: - headers["hostgroup"] = hostgroup - if hostname: - headers["hostname"] = hostname - if not ttl: - ttl = self._ttl - if ttl: - headers["expires"] = (int(time.time()) + int(ttl)) * 1000 - - for host in hosts: - try: - connection = None - if self._auth_method == "password": - connection = stomp.Connection([(host, self._port)]) - connection.connect(wait=True, username=self._user, passcode=self._pass) - else: - connection = stomp.Connection( - [(host, self._port)], - use_ssl=True, - ssl_key_file=self._ssl_key_file, - ssl_cert_file=self._ssl_cert_file, - ssl_version=2, - ) - connection.connect(wait=True) - connection.send( - body=message, - destination=destination, - headers=headers, - ) - if connection.is_connected(): - connection.disconnect() - except AttributeError: - raise AttributeError("Unexpected NoneType object error in connection") - except stomp.exception.ConnectFailedException: - raise megabus.exceptions.ConnectFailedException("stomp.exception.ConnectFailedException") - except stomp.exception.NotConnectedException: - raise megabus.exceptions.ConnectFailedException("stomp.exception.NotConnectedException") - except stomp.exception.ConnectionClosedException: - raise megabus.exceptions.ConnectFailedException("stomp.exception.ConnectionClosedException") diff --git a/notifications_routing/router.py b/notifications_routing/router.py index c8a69f2fe62777edad6e12b698593aca55cfae33..3d40b2c7681bb23339f85a013ec7342850629169 100644 --- a/notifications_routing/router.py +++ b/notifications_routing/router.py @@ -28,25 +28,6 @@ class Router(megabus.Listener): """Initialize the Router consumer.""" self.publisher = publisher self.data_source = data_source - self.connections = [] - - def set_connections(self, connections): - """Set stomp connections obtained from megabus.""" - self.connections = connections - - def ack_message(self, message_id): - """Acknowledge a message.""" - logging.debug("Ack message message_id:%s", message_id) - for connection in self.connections: - if connection.is_connected(): - connection.ack(message_id, 1) - - def nack_message(self, message_id): - """Send a message back to the queue.""" - logging.debug("Nack message: message_id:%s", message_id) - for connection in self.connections: - if connection.is_connected(): - connection.nack(message_id, 1) def on_message(self, message, headers): """Process a message.""" diff --git a/scripts/docker-send.py b/scripts/docker-send.py index 365224eadd07668e08a4ea058264009655b094a3..3680eb8dd26be0f2d3fba4715ac093635a47dba1 100644 --- a/scripts/docker-send.py +++ b/scripts/docker-send.py @@ -9,7 +9,7 @@ parser = argparse.ArgumentParser(description="Send a test email notification.") parser.add_argument("-l", action="store_true", default=False, help="use to connect to localhost instead of activemq") parser.add_argument("-c", action="store_true", default=False, help="use to send an email with Category") parser.add_argument("-e", default="user", help="use to set a target email username") -parser.add_argument("-ci", default="EF46ACA0-E73B-4794-8043-FFE974247CC8", help="use to set a target channel id") +parser.add_argument("-ci", default="bb5f841a-5554-4038-98c7-4eacd19acb61", help="use to set a target channel id") parser.add_argument("-ni", default="D19EEA4-9DCA-48D9-A577-5DE9D2BF374A", help="use to set a target notification id") args = parser.parse_args() @@ -26,7 +26,7 @@ conn.connect("admin", "admin", wait=True) message = { "id": "D19EEA4-9DCA-48D9-A577-5DE9D2BF374A", "target": { - "id": "EF46ACA0-E73B-4794-8043-FFE974247CC8", + "id": "bb5f841a-5554-4038-98c7-4eacd19acb61", "slug": "test slug", "name": "Test name", "description": "", @@ -49,6 +49,9 @@ message = { "priority": "important", "tags": None, "contentType": None, + "private": False, + "targetUsers": False, + "targetGroups": False, } message["email"] = email + "@cern.ch"