From 3fdd4bbee08bbb63df3ae58ae312aaae5630c210 Mon Sep 17 00:00:00 2001 From: Carina Antunes <carina.oliveira.antunes@cern.ch> Date: Mon, 20 Dec 2021 15:44:34 +0100 Subject: [PATCH] Revert "[#61] Use cs8-base as base image and update code for python-megabus-1.5.1-1" This reverts commit e65882544545451dc4ad8fbcecb381f071f923d1 --- Dockerfile | 2 +- Dockerfile-base | 2 +- .../activemq-publisher-routing_publisher.conf | 1 - docker/activemq/email_daily_publisher.conf | 1 - docker/activemq/email_monthly_publisher.conf | 1 - docker/activemq/email_weekly_publisher.conf | 1 - logging.yaml | 2 +- notifications_routing/app.py | 3 +- notifications_routing/jobs/registry.py | 3 +- notifications_routing/preferences.py | 2 +- notifications_routing/publisher.py | 63 +++++++++++++++++++ notifications_routing/router.py | 19 ++++++ scripts/docker-send.py | 7 +-- 13 files changed, 91 insertions(+), 16 deletions(-) create mode 100644 notifications_routing/publisher.py diff --git a/Dockerfile b/Dockerfile index 19c7bb7..c27cb00 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM gitlab-registry.cern.ch/push-notifications/notifications-routing/notifications-routing-base:9ae84809 +FROM gitlab-registry.cern.ch/push-notifications/notifications-routing/notifications-routing-base:77c9480e ARG build_env COPY ./ ./ diff --git a/Dockerfile-base b/Dockerfile-base index ac1ce45..3517ce1 100644 --- a/Dockerfile-base +++ b/Dockerfile-base @@ -1,4 +1,4 @@ -FROM cern/cs8-base:latest +FROM cern/c8-base:latest ENV LANG=en_US.utf8 LC_ALL=en_US.utf8 diff --git a/docker/activemq/activemq-publisher-routing_publisher.conf b/docker/activemq/activemq-publisher-routing_publisher.conf index 7b726ea..7fab7ab 100644 --- a/docker/activemq/activemq-publisher-routing_publisher.conf +++ b/docker/activemq/activemq-publisher-routing_publisher.conf @@ -10,4 +10,3 @@ top_hostgroup: user:admin pass:admin use_multiple_brokers:False -stomp_connection_timeout: 5 diff --git a/docker/activemq/email_daily_publisher.conf b/docker/activemq/email_daily_publisher.conf index 7b726ea..7fab7ab 100644 --- a/docker/activemq/email_daily_publisher.conf +++ b/docker/activemq/email_daily_publisher.conf @@ -10,4 +10,3 @@ top_hostgroup: user:admin pass:admin use_multiple_brokers:False -stomp_connection_timeout: 5 diff --git a/docker/activemq/email_monthly_publisher.conf b/docker/activemq/email_monthly_publisher.conf index 7b726ea..7fab7ab 100644 --- a/docker/activemq/email_monthly_publisher.conf +++ b/docker/activemq/email_monthly_publisher.conf @@ -10,4 +10,3 @@ top_hostgroup: user:admin pass:admin use_multiple_brokers:False -stomp_connection_timeout: 5 diff --git a/docker/activemq/email_weekly_publisher.conf b/docker/activemq/email_weekly_publisher.conf index 7b726ea..7fab7ab 100644 --- a/docker/activemq/email_weekly_publisher.conf +++ b/docker/activemq/email_weekly_publisher.conf @@ -10,4 +10,3 @@ top_hostgroup: user:admin pass:admin use_multiple_brokers:False -stomp_connection_timeout: 5 diff --git a/logging.yaml b/logging.yaml index 21986ee..ecffc39 100644 --- a/logging.yaml +++ b/logging.yaml @@ -21,7 +21,7 @@ loggers: handlers: [] propagate: True megabus: - level: DEBUG + level: WARNING handlers: [] propagate: True stomp: diff --git a/notifications_routing/app.py b/notifications_routing/app.py index cd6312b..352be59 100644 --- a/notifications_routing/app.py +++ b/notifications_routing/app.py @@ -4,10 +4,11 @@ import logging import logging.config import yaml -from megabus import Consumer, Publisher +from megabus import Consumer from notifications_routing.config import load_config from notifications_routing.data_source.postgres.postgres_data_source import PostgresDataSource +from notifications_routing.publisher import Publisher from notifications_routing.router import Router diff --git a/notifications_routing/jobs/registry.py b/notifications_routing/jobs/registry.py index 0733ed5..fcb41e7 100644 --- a/notifications_routing/jobs/registry.py +++ b/notifications_routing/jobs/registry.py @@ -1,10 +1,9 @@ """Processor automatic registry.""" import logging -from megabus import Publisher - from notifications_routing.config import Config, load_config from notifications_routing.data_source.postgres.postgres_data_source import PostgresDataSource +from notifications_routing.publisher import Publisher class JobRegistry: diff --git a/notifications_routing/preferences.py b/notifications_routing/preferences.py index 7ce9cf9..64dd9c4 100644 --- a/notifications_routing/preferences.py +++ b/notifications_routing/preferences.py @@ -4,13 +4,13 @@ from datetime import datetime from typing import Dict, List, Set import megabus -from megabus import Publisher from notifications_routing.config import Config from notifications_routing.data_source.data_source import DataSource from notifications_routing.data_source.postgres.postgres_data_source import UserFeedNotification from notifications_routing.exceptions import DuplicatedError +from .publisher import Publisher from .utils import ( OutputMessageKeys, convert_notification_email_to_json_string, diff --git a/notifications_routing/publisher.py b/notifications_routing/publisher.py new file mode 100644 index 0000000..bc2be68 --- /dev/null +++ b/notifications_routing/publisher.py @@ -0,0 +1,63 @@ +"""Publisher extended send method. + +Copied from python-megabus @ https://gitlab.cern.ch/ai-config-team/python-megabus +Adds support for headers in the method send. +""" +import time + +import megabus +import stomp + + +class Publisher(megabus.Publisher): + """Inherit megabus.Publisher.""" + + def send(self, message, extension=None, hostgroup=None, hostname=None, ttl=None, headers=None): # noqa: C901 + """Extend megabus.Publisher.send method to receive headers parameter.""" + if not headers: + headers = {} + + destination = "/%s/%s" % (self._destination_type, self._destination) + hosts = megabus.common.get_hosts(self._server, self._port, self._use_multiple_brokers) + + if extension: + destination += "." + extension + if hostgroup: + headers["hostgroup"] = hostgroup + if hostname: + headers["hostname"] = hostname + if not ttl: + ttl = self._ttl + if ttl: + headers["expires"] = (int(time.time()) + int(ttl)) * 1000 + + for host in hosts: + try: + connection = None + if self._auth_method == "password": + connection = stomp.Connection([(host, self._port)]) + connection.connect(wait=True, username=self._user, passcode=self._pass) + else: + connection = stomp.Connection( + [(host, self._port)], + use_ssl=True, + ssl_key_file=self._ssl_key_file, + ssl_cert_file=self._ssl_cert_file, + ssl_version=2, + ) + connection.connect(wait=True) + connection.send( + body=message, + destination=destination, + headers=headers, + ) + if connection.is_connected(): + connection.disconnect() + except AttributeError: + raise AttributeError("Unexpected NoneType object error in connection") + except stomp.exception.ConnectFailedException: + raise megabus.exceptions.ConnectFailedException("stomp.exception.ConnectFailedException") + except stomp.exception.NotConnectedException: + raise megabus.exceptions.ConnectFailedException("stomp.exception.NotConnectedException") + except stomp.exception.ConnectionClosedException: + raise megabus.exceptions.ConnectFailedException("stomp.exception.ConnectionClosedException") diff --git a/notifications_routing/router.py b/notifications_routing/router.py index 3d40b2c..c8a69f2 100644 --- a/notifications_routing/router.py +++ b/notifications_routing/router.py @@ -28,6 +28,25 @@ class Router(megabus.Listener): """Initialize the Router consumer.""" self.publisher = publisher self.data_source = data_source + self.connections = [] + + def set_connections(self, connections): + """Set stomp connections obtained from megabus.""" + self.connections = connections + + def ack_message(self, message_id): + """Acknowledge a message.""" + logging.debug("Ack message message_id:%s", message_id) + for connection in self.connections: + if connection.is_connected(): + connection.ack(message_id, 1) + + def nack_message(self, message_id): + """Send a message back to the queue.""" + logging.debug("Nack message: message_id:%s", message_id) + for connection in self.connections: + if connection.is_connected(): + connection.nack(message_id, 1) def on_message(self, message, headers): """Process a message.""" diff --git a/scripts/docker-send.py b/scripts/docker-send.py index 3680eb8..365224e 100644 --- a/scripts/docker-send.py +++ b/scripts/docker-send.py @@ -9,7 +9,7 @@ parser = argparse.ArgumentParser(description="Send a test email notification.") parser.add_argument("-l", action="store_true", default=False, help="use to connect to localhost instead of activemq") parser.add_argument("-c", action="store_true", default=False, help="use to send an email with Category") parser.add_argument("-e", default="user", help="use to set a target email username") -parser.add_argument("-ci", default="bb5f841a-5554-4038-98c7-4eacd19acb61", help="use to set a target channel id") +parser.add_argument("-ci", default="EF46ACA0-E73B-4794-8043-FFE974247CC8", help="use to set a target channel id") parser.add_argument("-ni", default="D19EEA4-9DCA-48D9-A577-5DE9D2BF374A", help="use to set a target notification id") args = parser.parse_args() @@ -26,7 +26,7 @@ conn.connect("admin", "admin", wait=True) message = { "id": "D19EEA4-9DCA-48D9-A577-5DE9D2BF374A", "target": { - "id": "bb5f841a-5554-4038-98c7-4eacd19acb61", + "id": "EF46ACA0-E73B-4794-8043-FFE974247CC8", "slug": "test slug", "name": "Test name", "description": "", @@ -49,9 +49,6 @@ message = { "priority": "important", "tags": None, "contentType": None, - "private": False, - "targetUsers": False, - "targetGroups": False, } message["email"] = email + "@cern.ch" -- GitLab