From 43f7210b00b2cff3b9ee6a2747f044a1bbad8451 Mon Sep 17 00:00:00 2001
From: Carina Antunes <carina.oliveira.antunes@cern.ch>
Date: Fri, 5 Aug 2022 17:12:03 +0200
Subject: [PATCH] High load improvements

* Add ActiveMQ Docker logging config for development
* Add ClientIndividualConsumer and ClientIndividualListener. These classes allow:
  * create a listener per connection, to avoid one listener instance shared among all threads
  * each listener to call ack/nack in their connection
  * cleanup connections list (was growing forever, every 5 minutes appending new connections)
* Catch BrokenPipeError, until stomp.py has a proper fix to the race condition. Provided an [MR](https://github.com/jasonrbriggs/stomp.py/issues/393).
* Upgrade from centos stream 8 to centos stream 9
* Bump python 3.6 (EOL was 23 Dec 2021) to 3.9 (EOL 05 Oct 2025)
  * Replace `pycrypto` (unmaintained, obsolete, and contains security vulnerabilities) with `pycryptodome`
* Log thread_id in all logs
* Make Auditing a class instance, property of the processor instance
* Share SQLAlchemy engine and connection pooler (set as a class property, protected by a lock) to avoid multiple calls to automap_base which would throw a SA warning
* Retry etcd client on external disconnect/ failure to create client
* Fix typo: Uppercase priority coming in email gateway
---
 .env                                          |   6 +-
 .env.safaripush                               |   6 +-
 .isort.cfg                                    |   2 +-
 .pre-commit-config.yaml                       |   2 +-
 Dockerfile                                    |   3 +-
 Dockerfile-base                               |  12 +-
 Makefile                                      |  12 +-
 README.md                                     |  14 ++
 docker-compose.yml                            |   4 +-
 docker/activemq/conf/activemq.xml             |   5 +-
 docker/activemq/conf/log4j.properties         |  88 ++++++++
 logging.yaml                                  |   8 +-
 notifications_consumer/app.py                 |  36 ++--
 notifications_consumer/auditing.py            | 195 +++++++++++-------
 notifications_consumer/config.py              |   2 +-
 notifications_consumer/consumer.py            |  41 +++-
 .../postgres/postgres_data_source.py          |  26 ++-
 .../megabus/client_individual_consumer.py     | 110 ++++++++++
 .../megabus/client_individual_listener.py     |  46 +++++
 .../processors/email/aescipher.py             |   2 +-
 .../processors/email/processor.py             |  14 +-
 .../processors/email_feed/processor.py        |  20 +-
 .../processors/email_gateway/processor.py     |   2 +-
 .../processors/mattermost/processor.py        |  17 +-
 .../processors/processor.py                   |   3 +
 notifications_consumer/processors/registry.py |   8 +-
 .../processors/safaripush/processor.py        |  13 +-
 .../processors/webpush/processor.py           |  15 +-
 poetry.lock                                   |  76 ++++++-
 pyproject.toml                                |   5 +-
 scripts/activemq_messages/safaripush.json     |  18 +-
 scripts/send-email.py                         |  15 +-
 tests/integration/conftest.py                 |   2 -
 tests/integration/test_etcd.py                |   9 +-
 34 files changed, 638 insertions(+), 199 deletions(-)
 create mode 100644 docker/activemq/conf/log4j.properties
 create mode 100644 notifications_consumer/megabus/client_individual_consumer.py
 create mode 100644 notifications_consumer/megabus/client_individual_listener.py

diff --git a/.env b/.env
index 3c54915..36a5678 100644
--- a/.env
+++ b/.env
@@ -39,6 +39,8 @@ CERN_OIDC_CLIENT_SECRET=fill-me
 # Auditing
 ETCD_HOST=etcd
 ETCD_PORT=2379
-AUDITING=true
+AUDITING=True
 # ETCD_USER=audituser
-# ETCD_PASSWORD=fill-me
+# ETCD_PASSWORD=xxx
+# Default 5 minutes
+# ETCD_AUTH_TOKEN_TTL=60
diff --git a/.env.safaripush b/.env.safaripush
index 9924625..e57916e 100644
--- a/.env.safaripush
+++ b/.env.safaripush
@@ -5,4 +5,8 @@ PUBLISHER_NAME=safaripush_publisher
 
 # SAFARIPUSH Config
 APPLE_SAFARI_PUSH_WEBSITEPUSHID=web.ch.cern.notifications
-APPLE_SAFARI_PUSH_CERT_KEY_FILE_PATH=/etc/cern.notifications.privatekey.pem
+APPLE_SAFARI_PUSH_CERT_KEY_FILE_PATH=safaripush.pem
+
+AUDITING=True
+ETCD_HOST=etcd
+ETCD_PORT=2379
\ No newline at end of file
diff --git a/.isort.cfg b/.isort.cfg
index 310e558..e6663ff 100644
--- a/.isort.cfg
+++ b/.isort.cfg
@@ -4,4 +4,4 @@ multi_line_output=3
 include_trailing_comma=True
 lines_after_imports=2
 not_skip=__init__.py
-known_third_party = Crypto,apns2,cachetools,etcd3,jinja2,mattermostdriver,megabus,pytest,pywebpush,requests,smail,sqlalchemy,stomp,yaml
+known_third_party = Crypto,apns2,bs4,cachetools,etcd3,jinja2,lxml,markdownify,mattermostdriver,megabus,pytest,pywebpush,requests,smail,sqlalchemy,stomp,yaml
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 92b112a..b319a2f 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -16,7 +16,7 @@ repos:
     hooks:
       - id: isort
   - repo: https://github.com/psf/black
-    rev: 20.8b1
+    rev: 22.3.0
     hooks:
       - id: black
   - repo: https://gitlab.com/pycqa/flake8
diff --git a/Dockerfile b/Dockerfile
index 7787604..9e5a8c2 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,5 @@
-FROM gitlab-registry.cern.ch/push-notifications/notifications-consumer/notifications-consumer-base:c484c6ec
+FROM gitlab-registry.cern.ch/push-notifications/notifications-consumer/notifications-consumer-base:1a33f01a
+# FROM notifications-consumer-base:latest
 ARG build_env
 
 WORKDIR /opt/
diff --git a/Dockerfile-base b/Dockerfile-base
index 3c26cee..bfebd3a 100644
--- a/Dockerfile-base
+++ b/Dockerfile-base
@@ -1,21 +1,21 @@
-FROM cern/cs8-base:latest
+FROM cern/cs9-base:latest
 
 RUN dnf install -y gcc \
-        python36 \
-        python36-devel \
+        python39 \
+        python3.9-devel \
         dnf-utils \
         epel-release \
         which \
         git \
         make && \
-    dnf config-manager --add-repo "http://linuxsoft.cern.ch/internal/repos/config8-stable/x86_64/os/" && \
+    dnf config-manager --add-repo "http://linuxsoft.cern.ch/internal/repos/config9-stable/x86_64/os/" && \
     dnf install -y python-megabus && \
     dnf update -y && \
     dnf clean all
 
 # Symlink python
-RUN ln -nsf /usr/bin/python3.6 /usr/bin/python && ln -nsf /usr/bin/pip3.6 /usr/bin/pip && \
-    ln -nsf /usr/bin/python3.6 /usr/bin/python3 && ln -nsf /usr/bin/pip3.6 /usr/bin/pip3
+RUN ln -nsf /usr/bin/python3.9 /usr/bin/python && ln -nsf /usr/bin/pip3.9 /usr/bin/pip && \
+    ln -nsf /usr/bin/python3.9 /usr/bin/python3 && ln -nsf /usr/bin/pip3.9 /usr/bin/pip3
 
 ENV LANG en_US.utf8
 ENV LC_ALL en_US.utf8
diff --git a/Makefile b/Makefile
index 72d29c9..d8925b8 100644
--- a/Makefile
+++ b/Makefile
@@ -47,9 +47,13 @@ ci-test: docker-build-test pytest
 test: stop-test ci-test
 .PHONY: test
 
-docker-build-env:
+env:
 	docker-compose up --remove-orphans
-.PHONY: docker-build-env
+.PHONY: env
+
+logs:
+	docker-compose logs -f
+.PHONY: logs
 
 docker-build-full:
 	docker-compose -f docker-compose.full.yml up --remove-orphans
@@ -69,10 +73,10 @@ docker-shell-env:
 	docker-compose exec notifications-consumer /bin/bash
 .PHONY: docker-shell-env
 
-docker-stop:
+stop:
 	docker-compose down --volumes
 	docker-compose rm -f
-.PHONY: docker-stop
+.PHONY: stop
 
 stop-test:
 	docker-compose -f docker-compose.test.yml down --volumes
diff --git a/README.md b/README.md
index 6e7203b..eb3dcee 100644
--- a/README.md
+++ b/README.md
@@ -178,3 +178,17 @@ Requirements:
 
 ### VAPID Keys for webpush
 See README: https://gitlab.cern.ch/push-notifications/backend
+
+### Developing with ETCD auth
+
+1. Double check current instructions match to instructions in https://notifications-internal.docs.cern.ch/operations/etcd-setup/
+2. Inside the etcd pod run: (for password use `xxx`)
+```
+etcdctl user add root
+etcdctl auth enable
+etcdctl --user=root:xxx role add auditrole
+etcdctl --user=root:xxx user add audituser
+etcdctl --user=root:xxx user grant-role audituser auditrole
+etcdctl --user=root:xxx role grant-permission auditrole readwrite --prefix=true /
+```
+3. Uncomment etcd credentials in `.env` and restart the consumer
diff --git a/docker-compose.yml b/docker-compose.yml
index d82d229..5ed0176 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -71,6 +71,7 @@ services:
     build: ./docker/activemq/5.16.0-alpine
     volumes:
       - './docker/activemq/conf/activemq.xml:/opt/apache-activemq-5.16.0/conf/activemq.xml'
+      - './docker/activemq/conf/log4j.properties:/opt/apache-activemq-5.16.0/conf/log4j.properties'
     ports:
       - 61613:61613
       - 8161:8161
@@ -96,7 +97,8 @@ services:
     ports:
       - 2379:2379
       - 2380:2380
-
+    env_file:
+      - .env
 volumes:
   pgsql-data:
       name: pgsql-data
diff --git a/docker/activemq/conf/activemq.xml b/docker/activemq/conf/activemq.xml
index 5893573..400b910 100644
--- a/docker/activemq/conf/activemq.xml
+++ b/docker/activemq/conf/activemq.xml
@@ -22,6 +22,9 @@
     <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
         <!-- IMPORTANT added plugins section -->
         <plugins>
+            <!-- lets enable detailed logging in the broker but ignore ConnectionEvents -->
+            <!-- <loggingBrokerPlugin logAll="true"/> -->
+
             <redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
                 <redeliveryPolicyMap>
                     <redeliveryPolicyMap>
@@ -271,7 +274,7 @@
             <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
             <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
-            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
+            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;trace=true"/>
             <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
             <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
         </transportConnectors>
diff --git a/docker/activemq/conf/log4j.properties b/docker/activemq/conf/log4j.properties
new file mode 100644
index 0000000..2cee15e
--- /dev/null
+++ b/docker/activemq/conf/log4j.properties
@@ -0,0 +1,88 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# This file controls most of the logging in ActiveMQ which is mainly based around
+# the commons logging API.
+#
+#log4j.rootLogger=DEBUG, console, logfile
+log4j.logger.org.apache.activemq.spring=WARN
+log4j.logger.org.apache.activemq.web.handler=WARN
+log4j.logger.org.springframework=WARN
+log4j.logger.org.apache.xbean=WARN
+log4j.logger.org.apache.camel=INFO
+log4j.logger.org.eclipse.jetty=WARN
+
+# When debugging or reporting problems to the ActiveMQ team,
+# comment out the above lines and uncomment the next.
+
+log4j.rootLogger=WARN, console
+log4j.logger.org.apache.activemq.transport.stomp=WARN
+log4j.logger.org.apache.activemq=WARN
+#log4j.logger.org.apache.camel=DEBUG
+
+# Console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%5p | %m%n
+log4j.appender.console.threshold=WARN
+
+# File appender
+log4j.appender.logfile=org.apache.log4j.RollingFileAppender
+log4j.appender.logfile.file=${activemq.data}/activemq.log
+log4j.appender.logfile.maxFileSize=1024KB
+log4j.appender.logfile.maxBackupIndex=5
+log4j.appender.logfile.append=true
+log4j.appender.logfile.layout=org.apache.log4j.EnhancedPatternLayout
+log4j.appender.logfile.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n%throwable{full}
+
+# you can control the rendering of exception in the ConversionPattern
+# by default, we display the full stack trace
+# if you want to display short form of the exception, you can use
+#
+# log4j.appender.logfile.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n%throwable{short}
+#
+# a classic issue with filebeat/logstash is about multiline exception. The following pattern
+# allows to work smoothly with filebeat/logstash
+#
+# log4j.appender.logfile.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n%replace(%throwable){\n}{ }
+#
+
+# use some of the following patterns to see MDC logging data
+#
+# %X{activemq.broker}
+# %X{activemq.connector}
+# %X{activemq.destination}
+#
+# e.g.
+#
+# log4j.appender.logfile.layout.ConversionPattern=%d | %-20.20X{activemq.connector} | %-5p | %m | %c | %t%n
+
+###########
+# Audit log
+###########
+
+log4j.additivity.org.apache.activemq.audit=false
+log4j.logger.org.apache.activemq.audit=INFO, audit
+
+log4j.appender.audit=org.apache.log4j.RollingFileAppender
+log4j.appender.audit.file=${activemq.data}/audit.log
+log4j.appender.audit.maxFileSize=1024KB
+log4j.appender.audit.maxBackupIndex=5
+log4j.appender.audit.append=true
+log4j.appender.audit.layout=org.apache.log4j.PatternLayout
+log4j.appender.audit.layout.ConversionPattern=%-5p | %m | %t%n
diff --git a/logging.yaml b/logging.yaml
index df1f0ad..79d7459 100644
--- a/logging.yaml
+++ b/logging.yaml
@@ -3,7 +3,7 @@ disable_existing_loggers: False
 
 formatters:
   simple:
-    format: '%(asctime)s %(levelname)s %(name)s %(message)s'
+    format: 'Thread:%(thread_id)d %(asctime)s %(levelname)s %(name)s %(message)s'
 
 handlers:
   console:
@@ -17,7 +17,7 @@ handlers:
 
 loggers:
   megabus:
-    level: DEBUG
+    level: WARNING
     handlers: []
     propagate: True
   stomp:
@@ -34,9 +34,9 @@ loggers:
     propagate: True
   sqlalchemy:
     level: WARNING
-    handlers: [ ]
+    handlers: []
     propagate: True
 
 root:
-  level: DEBUG
+  level: INFO
   handlers: [ sentry, console ]
diff --git a/notifications_consumer/app.py b/notifications_consumer/app.py
index e01d99b..0dd8a69 100644
--- a/notifications_consumer/app.py
+++ b/notifications_consumer/app.py
@@ -2,37 +2,31 @@
 import asyncio
 import logging
 import logging.config
+import threading
 
 import yaml
-from megabus import Consumer
 
-from notifications_consumer.config import load_config
+from notifications_consumer.config import Config, load_config
 from notifications_consumer.consumer import NotificationsConsumer
-from notifications_consumer.processors.registry import ProcessorRegistry
+from notifications_consumer.megabus.client_individual_consumer import ClientIndividualConsumer
 
 
 class App:
     """Notifications consumer app."""
 
-    def __init__(self, config):
+    def __init__(self, config: Config):
         """Initialize the App."""
         self.config = config
 
     async def connect(self):
         """Init the consumer and connect to activeMQ."""
-        processor = ProcessorRegistry.registry[self.config.PROCESSOR]
-        listener = NotificationsConsumer(config=self.config, processor=processor)
-
-        cons = Consumer(
-            name=processor.consumer_name,
-            auto_connect=False,
-            listener=listener,
+        ClientIndividualConsumer(
+            name=self.config.CONSUMER_NAME,
+            listener_class=NotificationsConsumer,
+            listener_kwargs={"config": self.config},
         )
-        listener.set_connections(cons._connections)
-        cons.connect()
 
-        logging.info("Finished initialising - Waiting for messages...")
-        logging.info(self.config.PROCESSOR)
+        logging.info("Finished initialising %s processor - Waiting for messages...", self.config.PROCESSOR)
 
     def run(self):
         """Run the app in a loop."""
@@ -47,15 +41,27 @@ def configure_logging(config):
         config = yaml.safe_load(file.read())
         logging.config.dictConfig(config)
 
+    old_factory = logging.getLogRecordFactory()
+
+    def record_factory(*args, **kwargs):
+        record = old_factory(*args, **kwargs)
+        record.thread_id = threading.get_native_id()
+        return record
+
+    logging.setLogRecordFactory(record_factory)
+
 
 def create_app():
     """Create a new App."""
     config = load_config()
     if config.SENTRY_DSN:
         import sentry_sdk
+        from sentry_sdk import set_tag
 
         sentry_sdk.init(dsn=config.SENTRY_DSN)
 
+        set_tag("consumer", config.PROCESSOR)
+
     configure_logging(config)
 
     return App(config)
diff --git a/notifications_consumer/auditing.py b/notifications_consumer/auditing.py
index 5934114..9a7e38b 100644
--- a/notifications_consumer/auditing.py
+++ b/notifications_consumer/auditing.py
@@ -1,72 +1,123 @@
-"""Auditing definition."""
-import json
-import logging
-import uuid
-from datetime import datetime
-
-from etcd3 import Client
-from etcd3.errors import ErrInvalidAuthToken
-
-from notifications_consumer.config import Config
-
-
-client = None
-if Config.AUDITING:
-    client = Client(host=Config.ETCD_HOST, port=Config.ETCD_PORT)
-    if Config.ETCD_USER:
-        client.auth(username=Config.ETCD_USER, password=Config.ETCD_PASSWORD)
-
-
-def audit_notification(notification_id, value, key=None, user_id=None):
-    """Put audit notification information into audit DB."""
-    if Config.AUDITING is False:
-        logging.info("Audit disabled")
-        return
-
-    def put():
-        client.put(
-            (
-                f"/notifications/{notification_id}/{Config.AUDIT_ID}/{Config.PROCESSOR}"
-                f"/{'targets/' + user_id + '/' if user_id else ''}{key}"
-            ),
-            json.dumps({"date": datetime.now().strftime("%d/%m/%Y %H:%M:%S"), **value}),
-        )
-
-    if not key:
-        key = uuid.uuid4()
-    try:
-        put()
-    except ErrInvalidAuthToken:
-        logging.debug("refresh etcd token")
-        client.auth(username=Config.ETCD_USER, password=Config.ETCD_PASSWORD)
-        put()
-    except Exception:
-        logging.exception("Error auditing to etcd3:")
-
-
-def get_audit_notification(notification_id, key, user_id=None):
-    """Get audit notification information from audit DB."""
-    if Config.AUDITING is False:
-        logging.info("Audit disabled")
-        return None
-
-    def get():
-        kvs = client.range(
-            (
-                f"/notifications/{notification_id}/{Config.AUDIT_ID}/{Config.PROCESSOR}"
-                f"/{'targets/' + user_id + '/' if user_id else ''}{key}"
-            )
-        ).kvs
-        if kvs:
-            return json.loads(kvs[0].value)
-        return None
-
-    try:
-        return get()
-    except ErrInvalidAuthToken:
-        logging.debug("refresh etcd token")
-        client.auth(username=Config.ETCD_USER, password=Config.ETCD_PASSWORD)
-        return get()
-    except Exception:
-        logging.exception("Error getting from etcd")
-        return None
+"""Auditing definition."""
+import json
+import logging
+import uuid
+from datetime import datetime
+from typing import Optional
+
+from etcd3 import Client
+from etcd3.errors import ErrInvalidAuthToken
+
+from notifications_consumer.config import Config
+
+
+class Auditor:
+    """Global audit client, thread safe."""
+
+    def __init__(self):
+        """Init global audit client."""
+        self.client = None  # type: Optional[Client]
+
+        if Config.AUDITING:
+            self.client_init()
+
+    def client_init(self):
+        """Initialize the client and authenticate if necessary."""
+        try:
+            self.client = Client(host=Config.ETCD_HOST, port=Config.ETCD_PORT)
+
+            if Config.ETCD_USER:
+                self.auth()
+        except Exception:
+            logging.exception("Failed ETCD client init")
+
+    def retry_init_if_necessary(self):
+        """Retry initialization if client is not there."""
+        if not self.client:
+            self.client_init()
+
+    def auth(self):
+        """Authenticate."""
+        try:
+            self.client.auth(username=Config.ETCD_USER, password=Config.ETCD_PASSWORD)
+        except Exception:
+            logging.exception("Failed ETCD authentication")
+
+    def _put(self, key, value):
+        self.client.put(key, json.dumps({"date": datetime.now().strftime("%d/%m/%Y %H:%M:%S"), **value}))
+
+    def put(self, key, value):
+        """Put key/value."""
+        self.retry_init_if_necessary()
+        if not self.client:
+            logging.error("Error auditing to etcd3")
+            return
+
+        try:
+            self._put(key, value)
+        except ErrInvalidAuthToken:
+            logging.debug("refresh etcd token")
+            self.auth()
+            self._put(key, value)
+        except Exception:
+            logging.exception("Error auditing to etcd3:")
+
+    def _get(self, key):
+        logging.debug("getting key %s", key)
+        return self.client.range(key).kvs
+
+    def get(self, key):
+        """Get key."""
+        self.retry_init_if_necessary()
+        if not self.client:
+            logging.error("Error getting from etcd3")
+            return
+
+        kvs = None
+        try:
+            kvs = self._get(key)
+        except ErrInvalidAuthToken:
+            logging.debug("refresh etcd token")
+            self.auth()
+            kvs = self._get(key)
+        except Exception:
+            logging.exception("Error getting from etcd")
+            return None
+
+        if kvs:
+            return json.loads(kvs[0].value)
+        return None
+
+
+class NotificationAuditor(Auditor):
+    """Notifications Auditor Class."""
+
+    def audit_notification(self, notification_id, value, key=None, user_id=None):
+        """Put audit notification information into audit DB."""
+        if Config.AUDITING is False:
+            logging.debug("Audit disabled")
+            return
+
+        if not key:
+            key = uuid.uuid4()
+
+        self.put(
+            (
+                f"/notifications/{notification_id}/{Config.AUDIT_ID}/{Config.PROCESSOR}"
+                f"/{'targets/' + user_id + '/' if user_id else ''}{key}"
+            ),
+            value,
+        )
+
+    def get_audit_notification(self, notification_id, key, user_id=None):
+        """Get audit notification information from audit DB."""
+        if Config.AUDITING is False:
+            logging.debug("Audit disabled")
+            return None
+
+        return self.get(
+            (
+                f"/notifications/{notification_id}/{Config.AUDIT_ID}/{Config.PROCESSOR}"
+                f"/{'targets/' + user_id + '/' if user_id else ''}{key}"
+            )
+        )
diff --git a/notifications_consumer/config.py b/notifications_consumer/config.py
index 3edfa6c..9306fec 100644
--- a/notifications_consumer/config.py
+++ b/notifications_consumer/config.py
@@ -138,7 +138,7 @@ class Config:
     # Etcd auditing
     ETCD_HOST = os.getenv("ETCD_HOST", "localhost")
     ETCD_PORT = os.getenv("ETCD_PORT", 2379)
-    AUDITING = os.getenv("AUDITING", False)
+    AUDITING = ast.literal_eval(os.getenv("AUDITING", "False"))
     ETCD_USER = os.getenv("ETCD_USER", None)
     ETCD_PASSWORD = os.getenv("ETCD_PASSWORD", None)
     AUDIT_ID = os.getenv("AUDIT_SERVICE_NAME", "consumer")
diff --git a/notifications_consumer/consumer.py b/notifications_consumer/consumer.py
index 5440576..8958d45 100644
--- a/notifications_consumer/consumer.py
+++ b/notifications_consumer/consumer.py
@@ -2,28 +2,51 @@
 import json
 import logging
 
-import megabus
+from notifications_consumer.config import Config
+from notifications_consumer.megabus.client_individual_listener import ClientIndividualListener
+from notifications_consumer.processors.registry import ProcessorRegistry
 
-from notifications_consumer.processors.processor import Processor
 
-
-class NotificationsConsumer(megabus.Listener):
+class NotificationsConsumer(ClientIndividualListener):
     """Base consumer class."""
 
-    def __init__(self, processor: Processor, **kwargs):
+    def __init__(self, config: Config, **kwargs):
         """Initialize the Notifications consumer."""
-        self.processor = processor
+        ClientIndividualListener.__init__(self)
+        self.config = config
+        self.processor = ProcessorRegistry.processor(self.config.PROCESSOR, config)
         self.kwargs = kwargs
 
     def on_message(self, message, headers):
         """Process a message."""
         logging.info("Received message - %s", headers["message-id"])
-        logging.debug("Message Data: message:%s, headers:%s", message, headers)
+        logging.debug("Message Data %s: message:%s, headers:%s", headers["message-id"], message, headers)
 
         try:
             self.processor.process(**self.processor.read_message(json.loads(message)), headers=headers, **self.kwargs)
-            self.ack_message(headers["message-id"])
+            self.ack_message(headers["message-id"], headers["subscription"])
         except Exception:
             logging.exception("An exception happened while processing the message")
+            self.nack_message(headers["message-id"], headers["subscription"])
+
+    def ack_message(self, message_id, subscription):
+        """Ack a message.
+
+        Handles BrokenPipeError caused by a race condition on stomp.py.
+        Read more: https://github.com/jasonrbriggs/stomp.py/issues/393
+        """
+        try:
+            ClientIndividualListener.ack_message(self, message_id, subscription)
+        except BrokenPipeError:
+            logging.warning("Broken Pipe caused by Reconnect Loop/ ActiveMQ Disconnect %s", message_id)
+
+    def nack_message(self, message_id, subscription):
+        """Nack a message.
 
-            self.nack_message(headers["message-id"])
+        Handles BrokenPipeError caused by a race condition on stomp.py.
+        Read more: https://github.com/jasonrbriggs/stomp.py/issues/393
+        """
+        try:
+            ClientIndividualListener.nack_message(self, message_id, subscription)
+        except BrokenPipeError:
+            logging.warning("Broken Pipe caused by Reconnect Loop/ ActiveMQ Disconnect %s", message_id)
diff --git a/notifications_consumer/data_source/postgres/postgres_data_source.py b/notifications_consumer/data_source/postgres/postgres_data_source.py
index 6e9f7bd..b2ab747 100644
--- a/notifications_consumer/data_source/postgres/postgres_data_source.py
+++ b/notifications_consumer/data_source/postgres/postgres_data_source.py
@@ -1,6 +1,7 @@
 """Postgres Data Source Implementation."""
 import enum
 import logging
+import threading
 import uuid
 from contextlib import contextmanager
 from typing import Dict, List
@@ -29,18 +30,31 @@ class SubmissionByEmail(enum.Enum):
 class PostgresDataSource(DataSource):
     """Implements methods from DataSource interface."""
 
+    lock = threading.Lock()
+
     Base = automap_base(
         metadata=MetaData(
             schema=Config.DB_SCHEMA,
         )
     )
 
+    Session = None
+
+    @classmethod
+    def _init_db_engine(cls):
+        engine = create_engine(Config.SQLALCHEMY_DATABASE_URI)
+        cls.Base.prepare(engine)
+        cls.Session = sessionmaker(engine)
+
     def __init__(self):
-        """Initialize Data Source."""
-        logging.debug("Init PostgresDataSource")
-        self.__engine = create_engine(Config.SQLALCHEMY_DATABASE_URI)
-        self.__session = sessionmaker(self.__engine)
-        self.Base.prepare(self.__engine)
+        """
+        Initialize Data Source.
+
+        Thread safe implementation that keeps Session maker as a class property.
+        """
+        with self.lock:
+            if not self.Session:
+                self._init_db_engine()
 
     @staticmethod
     def __build_channel(channel: "Channel") -> object:
@@ -75,7 +89,7 @@ class PostgresDataSource(DataSource):
     @contextmanager
     def session(self):
         """Open Session With Database."""
-        session = self.__session()
+        session = self.Session()
         session.expire_on_commit = False
 
         try:
diff --git a/notifications_consumer/megabus/client_individual_consumer.py b/notifications_consumer/megabus/client_individual_consumer.py
new file mode 100644
index 0000000..66c97ea
--- /dev/null
+++ b/notifications_consumer/megabus/client_individual_consumer.py
@@ -0,0 +1,110 @@
+"""Client Individual Consumer definition."""
+import stomp
+from megabus.common import get_hosts
+from megabus.consumer import Consumer, _StompListener
+from megabus.exceptions import ConnectFailedException
+
+from notifications_consumer.megabus.client_individual_listener import ClientIndividualListener
+
+
+class _ClientIndividualStompListener(_StompListener):
+    """
+    Custom stomp listener that supports subscribe ack='client/client-individual'.
+
+    Read more: https://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE
+
+    Sets a connection inside the Listener class,
+    in order for on_message implementations be capable of ack/nack in a specific connection.
+    """
+
+    def __init__(self, listener: ClientIndividualListener, connection):
+        _StompListener.__init__(self, listener)
+        listener.set_connection(connection)
+
+    def on_error(self, headers, body):
+        self._listener.on_error(headers, body)
+
+
+class ClientIndividualConsumer(Consumer):
+    """
+    Custom consumer that supports subscribe ack='client/client-individual'.
+
+    Read more: https://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE
+
+    Instead of receiving a Listener class instance, to be shared among all connections, it receives a class reference
+    and the instance kwargs to be passed to the Listener instantiation,
+    eg
+        ClientIndividualConsumer(
+            listener_class=MyConsumer
+            listener_kwargs={'kwarg': value})
+
+    This class differs from its base in the reconnection loop. Instead of reusing the Listener for every
+    stomp.Connection, it creates an individual Listener per connection and it sets the connection inside the
+    listener as well, to allow easy ack/nack implementations.
+    """
+
+    def __init__(self, listener_class, listener_kwargs, auto_connect=True, **kwargs):
+        """Initialize the custom consumer."""
+        Consumer.__init__(self, listener=None, auto_connect=False, **kwargs)
+
+        self._external_listener = listener_class
+        self._listener_kwargs = listener_kwargs
+
+        # Auto connect must not be called until _listener_kwargs are set
+        if auto_connect:
+            self.connect()
+
+    def _set_listener(self, listener):
+        pass
+
+    def _create_listener_instance(
+        self,
+    ):
+        """Create a new instance of the Listener class."""
+        return self._external_listener(**self._listener_kwargs)
+
+    def _simple_connect(self):
+        """
+        Create a new Listener per connection in order to allow ack/nack to that specific connection.
+
+        Sets the connection inside the listener.
+
+        We don't reuse the Listener instance for the new connections because get_hosts is dynamic and its length
+        might change.
+        """
+        hosts = get_hosts(self._server, self._port, self._use_multiple_brokers)
+        for host in hosts:
+            external_listener_instance = self._create_listener_instance()
+            try:
+                try:
+                    connection = stomp.Connection([(host, self._port)])
+                    listener = _ClientIndividualStompListener(external_listener_instance, connection)
+
+                    if self._auth_method == "password":
+                        connection.set_listener("", listener)
+                        connection.connect(wait=True, username=self._user, passcode=self._pass)
+                    else:
+                        connection.set_ssl(
+                            for_hosts=[(host, self._port)],
+                            key_file=self._ssl_key_file,
+                            cert_file=self._ssl_cert_file,
+                            ssl_version=2,
+                        )
+                        connection.set_listener("", listener)
+                        connection.connect(wait=True)
+                    connection.subscribe(destination=self._full_destination, headers=self._headers, ack=self._ack, id=1)
+                    self._connections.append(connection)
+
+                except stomp.exception.ConnectFailedException:
+                    raise ConnectFailedException("stomp.exception.ConnectFailedException")
+                except stomp.exception.NotConnectedException:
+                    raise ConnectFailedException("stomp.exception.NotConnectedException")
+                except stomp.exception.ConnectionClosedException:
+                    raise ConnectFailedException("stomp.exception.ConnectionClosedException")
+            except ConnectFailedException as e:
+                external_listener_instance.on_exception("CONNECTION", e)
+
+    def disconnect(self):
+        """Clear the _connections list for memory saving."""
+        Consumer.disconnect(self)
+        self._connections.clear()
diff --git a/notifications_consumer/megabus/client_individual_listener.py b/notifications_consumer/megabus/client_individual_listener.py
new file mode 100644
index 0000000..339f921
--- /dev/null
+++ b/notifications_consumer/megabus/client_individual_listener.py
@@ -0,0 +1,46 @@
+"""Client Individual Listener definition."""
+import logging
+
+from megabus.listener import Listener
+
+
+class ClientIndividualListener(Listener):
+    """
+    Custom Listener that supports subscribe ack='client/client-individual'.
+
+    Read more: https://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE
+
+    Each instance holds a reference to its own connection (set via set_connection), allowing easy ack/nack
+    of messages on the exact connection that delivered them.
+
+    This class should be used with ClientIndividualConsumer.
+    """
+
+    def __init__(self):
+        """Initialize the listener with no connection attached yet."""
+        Listener.__init__(self)
+        self.connection = None  # attached later via set_connection()
+
+    def ack_message(self, message_id, subscription):
+        """Acknowledge message_id on this listener's own connection, skipping if disconnected."""
+        logging.info("Ack message message_id:%s", message_id)
+        if self.connection.is_connected():
+            self.connection.ack(message_id, subscription)
+        else:
+            logging.warning("%s %s", "ack_message: not connected", message_id)
+
+    def nack_message(self, message_id, subscription):
+        """Negatively acknowledge message_id on this listener's own connection, skipping if disconnected."""
+        logging.debug("Nack message: message_id:%s", message_id)
+        if self.connection.is_connected():
+            self.connection.nack(message_id, subscription)
+        else:
+            logging.debug("%s %s", "nack_message: not connected", message_id)
+
+    def set_connection(self, connection):
+        """Attach the stomp connection used for ack/nack."""
+        self.connection = connection
+
+    def on_error(self, headers, body):
+        """Log the body of broker ERROR frames."""
+        logging.warning("Failed processing message. Error %s", body)
diff --git a/notifications_consumer/processors/email/aescipher.py b/notifications_consumer/processors/email/aescipher.py
index 0638451..21d85cf 100644
--- a/notifications_consumer/processors/email/aescipher.py
+++ b/notifications_consumer/processors/email/aescipher.py
@@ -18,7 +18,7 @@ class AESCipher:
 
     def __init__(self, key, blk_sz=16):
         """Cipher AES Init."""
-        self.key = key
+        self.key = key.encode()
         self.blk_sz = blk_sz
 
     def encrypt(self, raw):
diff --git a/notifications_consumer/processors/email/processor.py b/notifications_consumer/processors/email/processor.py
index b78c8d3..0da99e7 100644
--- a/notifications_consumer/processors/email/processor.py
+++ b/notifications_consumer/processors/email/processor.py
@@ -3,7 +3,6 @@ import logging
 from datetime import datetime
 from typing import Dict
 
-from notifications_consumer.auditing import audit_notification, get_audit_notification
 from notifications_consumer.config import ENV_DEV, Config
 from notifications_consumer.email_whitelist import get_email_whitelist
 from notifications_consumer.processors.email.utils import create_email, send_emails
@@ -40,13 +39,14 @@ class EmailProcessor(Processor):
                     recipient_email,
                     Config.EMAIL_WHITELIST_GROUP_ID,
                 )
-                audit_notification(kwargs["notification_id"], {"event": "Skipped, not in whitelist"}, recipient_email)
+                self.auditor.audit_notification(
+                    kwargs["notification_id"], {"event": "Skipped, not in whitelist"}, recipient_email
+                )
                 return
 
         created_at = kwargs.get("created_at", "")
         try:
             created_at = datetime.strptime(created_at, "%m/%d/%Y, %H:%M:%S").strftime("%d/%m/%Y, %H:%M:%S")
-
         except Exception:
             logging.exception("Failed to process created at date")
 
@@ -78,14 +78,14 @@ class EmailProcessor(Processor):
             context,
         )
 
-        if get_audit_notification(kwargs["notification_id"], recipient_email):
-            logging.warning(
-                "%s is already sent to %s according ot etcd, skipping", kwargs["notification_id"], recipient_email
+        if self.auditor.get_audit_notification(kwargs["notification_id"], recipient_email):
+            logging.info(
+                "%s is already sent to %s according to etcd, skipping", kwargs["notification_id"], recipient_email
             )
             return
 
         ret = send_emails([email])
-        audit_notification(kwargs["notification_id"], {"event": "Sent"}, recipient_email)
+        self.auditor.audit_notification(kwargs["notification_id"], {"event": "Sent"}, recipient_email)
         return ret
 
     def read_message(self, message: Dict):
diff --git a/notifications_consumer/processors/email_feed/processor.py b/notifications_consumer/processors/email_feed/processor.py
index 37cb86d..69dde9a 100644
--- a/notifications_consumer/processors/email_feed/processor.py
+++ b/notifications_consumer/processors/email_feed/processor.py
@@ -4,6 +4,8 @@ import logging
 from datetime import datetime
 from typing import Dict
 
+from bs4 import BeautifulSoup
+
 from notifications_consumer.config import ENV_DEV, Config
 from notifications_consumer.email_whitelist import get_email_whitelist
 from notifications_consumer.processors.email.utils import create_email, send_emails
@@ -11,8 +13,6 @@ from notifications_consumer.processors.processor import Processor
 from notifications_consumer.processors.registry import ProcessorRegistry
 from notifications_consumer.utils import NotificationPriority
 
-from bs4 import BeautifulSoup
-
 
 @ProcessorRegistry.register
 class EmailFeedProcessor(Processor):
@@ -48,7 +48,8 @@ class EmailFeedProcessor(Processor):
                     recipient_email,
                     Config.EMAIL_WHITELIST_GROUP_ID,
                 )
-                # audit_notification(kwargs["notification_id"], {"event": "Skipped, not in whitelist"}, recipient_email)
+                # self.auditor.audit_notification(kwargs["notification_id"], {"event": "Skipped, not in whitelist"},
+                # recipient_email)
                 return
 
         title = f'{Config.FEED_TITLE} Summary - {datetime.now().strftime("%d %B %Y")}'
@@ -57,15 +58,12 @@ class EmailFeedProcessor(Processor):
         notification_ids = kwargs["notifications"]
         notifications = self.data_source.get_channel_notifications(notification_ids)
         grouped_notifications = itertools.groupby(
-            notifications,
-            lambda n: (n['targetId'], n['channel']['name'], n['channel']['slug'])
-            )
+            notifications, lambda n: (n["targetId"], n["channel"]["name"], n["channel"]["slug"])
+        )
 
         def prepare_notifications(notifications):
             for notification in notifications:
-                notification.update(
-                    {('body', BeautifulSoup(notification['body'], "lxml").get_text(" "))}
-                )
+                notification.update({("body", BeautifulSoup(notification["body"], "lxml").get_text(" "))})
             return notifications
 
         context = {
@@ -87,14 +85,14 @@ class EmailFeedProcessor(Processor):
             context,
         )
 
-        # if get_audit_notification(kwargs["notification_id"], recipient_email):
+        # if self.auditor.get_audit_notification(kwargs["notification_id"], recipient_email):
         #     logging.warning(
         #         "%s is already sent to %s according ot etcd, skipping", kwargs["notification_id"], recipient_email
         #     )
         #     return
 
         ret = send_emails([email])
-        # audit_notification(kwargs["notification_id"], {"event": "Sent"}, recipient_email)
+        # self.auditor.audit_notification(kwargs["notification_id"], {"event": "Sent"}, recipient_email)
         return ret
 
     def read_message(self, message: Dict):
diff --git a/notifications_consumer/processors/email_gateway/processor.py b/notifications_consumer/processors/email_gateway/processor.py
index 71a0dec..7fb3860 100644
--- a/notifications_consumer/processors/email_gateway/processor.py
+++ b/notifications_consumer/processors/email_gateway/processor.py
@@ -85,7 +85,7 @@ class MailGatewayProcessor(Processor):
             "body": content,
             "summary": subject,
             "target": channel_id,
-            "priority": priority,
+            "priority": priority.upper(),
             "sender": sender,
             "source": Config.NOTIFICATION_SOURCE,
         }
diff --git a/notifications_consumer/processors/mattermost/processor.py b/notifications_consumer/processors/mattermost/processor.py
index b9a696d..c571038 100644
--- a/notifications_consumer/processors/mattermost/processor.py
+++ b/notifications_consumer/processors/mattermost/processor.py
@@ -2,15 +2,14 @@
 import logging
 from typing import Dict
 
-from notifications_consumer.auditing import audit_notification, get_audit_notification
+from bs4 import BeautifulSoup
+from lxml.html.clean import Cleaner
+from markdownify import MarkdownConverter
+
 from notifications_consumer.processors.mattermost.utils import create_message, send_message
 from notifications_consumer.processors.processor import Processor
 from notifications_consumer.processors.registry import ProcessorRegistry
 
-from bs4 import BeautifulSoup
-from markdownify import MarkdownConverter
-from lxml.html.clean import Cleaner
-
 
 @ProcessorRegistry.register
 class MattermostProcessor(Processor):
@@ -45,14 +44,16 @@ class MattermostProcessor(Processor):
             kwargs["img_url"],
         )
 
-        if get_audit_notification(kwargs["notification_id"], device_id, email):
-            logging.warning(
+        if self.auditor.get_audit_notification(kwargs["notification_id"], device_id, email):
+            logging.info(
                 "%s is already sent to %s according ot etcd, skipping", kwargs["notification_id"], device_token
             )
             return
 
         ret = send_message(wpmessage, device_token)
-        audit_notification(kwargs["notification_id"], {"event": "Sent", "device_token": device_token}, device_id, email)
+        self.auditor.audit_notification(
+            kwargs["notification_id"], {"event": "Sent", "device_token": device_token}, device_id, email
+        )
         return ret
 
     def read_message(self, message: Dict):
diff --git a/notifications_consumer/processors/processor.py b/notifications_consumer/processors/processor.py
index 0c85511..80bf3ef 100644
--- a/notifications_consumer/processors/processor.py
+++ b/notifications_consumer/processors/processor.py
@@ -2,6 +2,8 @@
 from abc import ABC, abstractmethod
 from typing import Dict
 
+from notifications_consumer.auditing import NotificationAuditor
+
 
 class Processor(ABC):
     """Notifications processor base class."""
@@ -10,6 +12,7 @@ class Processor(ABC):
         """Initialize the Processor."""
         super(Processor, self).__init__()
         self.__consumer_name = kwargs["config"].CONSUMER_NAME
+        self.auditor = NotificationAuditor()
 
     @classmethod
     def id(cls):
diff --git a/notifications_consumer/processors/registry.py b/notifications_consumer/processors/registry.py
index e231b94..2f11ae3 100644
--- a/notifications_consumer/processors/registry.py
+++ b/notifications_consumer/processors/registry.py
@@ -19,10 +19,16 @@ class ProcessorRegistry:
 
         if processor_cls.id() == config.PROCESSOR:
             logging.debug("Register processor: %s", processor_cls.id())
-            cls.registry[processor_cls.id()] = processor_cls(**build_kwargs(config))
+            cls.registry[processor_cls.id()] = processor_cls
 
         return processor_cls
 
+    @classmethod
+    def processor(cls, processor_id, config):
+        """Return a new processor instance."""
+        processor_cls = cls.registry[processor_id]
+        return processor_cls(**build_kwargs(config))
+
 
 def build_kwargs(config: Config) -> dict:
     """Build processor kwargs."""
diff --git a/notifications_consumer/processors/safaripush/processor.py b/notifications_consumer/processors/safaripush/processor.py
index 01e0587..8123d60 100644
--- a/notifications_consumer/processors/safaripush/processor.py
+++ b/notifications_consumer/processors/safaripush/processor.py
@@ -2,13 +2,12 @@
 import logging
 from typing import Dict
 
-from notifications_consumer.auditing import audit_notification, get_audit_notification
+from bs4 import BeautifulSoup
+
 from notifications_consumer.processors.processor import Processor
 from notifications_consumer.processors.registry import ProcessorRegistry
 from notifications_consumer.processors.safaripush.utils import create_message, send_message
 
-from bs4 import BeautifulSoup
-
 
 @ProcessorRegistry.register
 class SafariPushProcessor(Processor):
@@ -42,14 +41,16 @@ class SafariPushProcessor(Processor):
             kwargs["img_url"],
         )
 
-        if get_audit_notification(kwargs["notification_id"], device_id, email):
-            logging.warning(
+        if self.auditor.get_audit_notification(kwargs["notification_id"], device_id, email):
+            logging.info(
                 "%s is already sent to %s according ot etcd, skipping", kwargs["notification_id"], device_token
             )
             return
 
         ret = send_message(spmessage, device_token)
-        audit_notification(kwargs["notification_id"], {"event": "Sent", "device_token": device_token}, device_id, email)
+        self.auditor.audit_notification(
+            kwargs["notification_id"], {"event": "Sent", "device_token": device_token}, device_id, email
+        )
         return ret
 
     def read_message(self, message: Dict):
diff --git a/notifications_consumer/processors/webpush/processor.py b/notifications_consumer/processors/webpush/processor.py
index 7ce14a7..954f51d 100644
--- a/notifications_consumer/processors/webpush/processor.py
+++ b/notifications_consumer/processors/webpush/processor.py
@@ -2,13 +2,12 @@
 import logging
 from typing import Dict
 
-from notifications_consumer.auditing import audit_notification, get_audit_notification
+from bs4 import BeautifulSoup
+
 from notifications_consumer.processors.processor import Processor
 from notifications_consumer.processors.registry import ProcessorRegistry
 from notifications_consumer.processors.webpush.utils import create_message, send_message
 
-from bs4 import BeautifulSoup
-
 
 @ProcessorRegistry.register
 class WebPushProcessor(Processor):
@@ -43,14 +42,14 @@ class WebPushProcessor(Processor):
             kwargs["img_url"],
         )
 
-        if get_audit_notification(kwargs["notification_id"], device_id, email):
-            logging.warning(
-                "%s is already sent to %s according ot etcd, skipping", kwargs["notification_id"], device_id
-            )
+        if self.auditor.get_audit_notification(kwargs["notification_id"], device_id, email):
+            logging.info("%s is already sent to %s according ot etcd, skipping", kwargs["notification_id"], device_id)
             return
 
         ret = send_message(wpmessage, device_token, encoding)
-        audit_notification(kwargs["notification_id"], {"event": "Sent", "device_token": device_token}, device_id, email)
+        self.auditor.audit_notification(
+            kwargs["notification_id"], {"event": "Sent", "device_token": device_token}, device_id, email
+        )
         return ret
 
     def read_message(self, message: Dict):
diff --git a/poetry.lock b/poetry.lock
index f836f0a..072f634 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -558,6 +558,14 @@ category = "dev"
 optional = false
 python-versions = "*"
 
+[[package]]
+name = "objprint"
+version = "0.2.2"
+description = "A library that can print Python objects in human readable format"
+category = "main"
+optional = false
+python-versions = ">=3.6"
+
 [[package]]
 name = "oscrypto"
 version = "1.3.0"
@@ -718,12 +726,12 @@ optional = false
 python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
 
 [[package]]
-name = "pycrypto"
-version = "2.6.1"
-description = "Cryptographic modules for Python."
+name = "pycryptodome"
+version = "3.15.0"
+description = "Cryptographic library for Python"
 category = "main"
 optional = false
-python-versions = "*"
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
 
 [[package]]
 name = "pydocstyle"
@@ -958,7 +966,7 @@ pymysql = ["pymysql"]
 
 [[package]]
 name = "stomp.py"
-version = "6.1.0"
+version = "7.0.0"
 description = "Python STOMP client, supporting versions 1.0, 1.1 and 1.2 of the protocol"
 category = "main"
 optional = false
@@ -1040,6 +1048,17 @@ six = ">=1.9.0,<2"
 docs = ["proselint (>=0.10.2)", "sphinx (>=3)", "sphinx-argparse (>=0.2.5)", "sphinx-rtd-theme (>=0.4.3)", "towncrier (>=21.3)"]
 testing = ["coverage (>=4)", "coverage-enable-subprocess (>=1)", "flaky (>=3)", "pytest (>=4)", "pytest-env (>=0.6.2)", "pytest-freezegun (>=0.4.1)", "pytest-mock (>=2)", "pytest-randomly (>=1)", "pytest-timeout (>=1)", "packaging (>=20.0)"]
 
+[[package]]
+name = "watchpoints"
+version = "0.2.5"
+description = "watchpoints monitors read and write on variables"
+category = "main"
+optional = false
+python-versions = ">=3.6"
+
+[package.dependencies]
+objprint = ">=0.1.3"
+
 [[package]]
 name = "wcwidth"
 version = "0.2.5"
@@ -1084,7 +1103,7 @@ testing = ["pytest (>=4.6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytes
 [metadata]
 lock-version = "1.1"
 python-versions = "^3.6.1"
-content-hash = "3e06ca5304ac420d9d2634ad56cbcf605eff18f3ff5902455f9d37631c8dd877"
+content-hash = "3e248119a43c3556cb8a5dbd4f3db0329a617d9a1291aff7d7ff5d1da2e22e15"
 
 [metadata.files]
 aiohttp = [
@@ -1683,6 +1702,10 @@ nodeenv = [
     {file = "nodeenv-1.6.0-py2.py3-none-any.whl", hash = "sha256:621e6b7076565ddcacd2db0294c0381e01fd28945ab36bcf00f41c5daf63bef7"},
     {file = "nodeenv-1.6.0.tar.gz", hash = "sha256:3ef13ff90291ba2a4a7a4ff9a979b63ffdd00a464dbe04acf0ea6471517a4c2b"},
 ]
+objprint = [
+    {file = "objprint-0.2.2-py3-none-any.whl", hash = "sha256:9f50bb3b7cbe95b6d22c79e4f08a5273e3f7e5ff5ce35e146f6e7854abd276a0"},
+    {file = "objprint-0.2.2.tar.gz", hash = "sha256:90f2f3c19dfda1b5eb50d87d82ccbc20511ca4f02c4878553c56edc711e39689"},
+]
 oscrypto = [
     {file = "oscrypto-1.3.0-py2.py3-none-any.whl", hash = "sha256:2b2f1d2d42ec152ca90ccb5682f3e051fb55986e1b170ebde472b133713e7085"},
     {file = "oscrypto-1.3.0.tar.gz", hash = "sha256:6f5fef59cb5b3708321db7cca56aed8ad7e662853351e7991fcf60ec606d47a4"},
@@ -1796,8 +1819,37 @@ pycparser = [
     {file = "pycparser-2.21-py2.py3-none-any.whl", hash = "sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9"},
     {file = "pycparser-2.21.tar.gz", hash = "sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206"},
 ]
-pycrypto = [
-    {file = "pycrypto-2.6.1.tar.gz", hash = "sha256:f2ce1e989b272cfcb677616763e0a2e7ec659effa67a88aa92b3a65528f60a3c"},
+pycryptodome = [
+    {file = "pycryptodome-3.15.0-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:ff7ae90e36c1715a54446e7872b76102baa5c63aa980917f4aa45e8c78d1a3ec"},
+    {file = "pycryptodome-3.15.0-cp27-cp27m-manylinux1_i686.whl", hash = "sha256:2ffd8b31561455453ca9f62cb4c24e6b8d119d6d531087af5f14b64bee2c23e6"},
+    {file = "pycryptodome-3.15.0-cp27-cp27m-manylinux1_x86_64.whl", hash = "sha256:2ea63d46157386c5053cfebcdd9bd8e0c8b7b0ac4a0507a027f5174929403884"},
+    {file = "pycryptodome-3.15.0-cp27-cp27m-manylinux2010_i686.whl", hash = "sha256:7c9ed8aa31c146bef65d89a1b655f5f4eab5e1120f55fc297713c89c9e56ff0b"},
+    {file = "pycryptodome-3.15.0-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:5099c9ca345b2f252f0c28e96904643153bae9258647585e5e6f649bb7a1844a"},
+    {file = "pycryptodome-3.15.0-cp27-cp27m-manylinux2014_aarch64.whl", hash = "sha256:2ec709b0a58b539a4f9d33fb8508264c3678d7edb33a68b8906ba914f71e8c13"},
+    {file = "pycryptodome-3.15.0-cp27-cp27m-win32.whl", hash = "sha256:fd2184aae6ee2a944aaa49113e6f5787cdc5e4db1eb8edb1aea914bd75f33a0c"},
+    {file = "pycryptodome-3.15.0-cp27-cp27m-win_amd64.whl", hash = "sha256:7e3a8f6ee405b3bd1c4da371b93c31f7027944b2bcce0697022801db93120d83"},
+    {file = "pycryptodome-3.15.0-cp27-cp27mu-manylinux1_i686.whl", hash = "sha256:b9c5b1a1977491533dfd31e01550ee36ae0249d78aae7f632590db833a5012b8"},
+    {file = "pycryptodome-3.15.0-cp27-cp27mu-manylinux1_x86_64.whl", hash = "sha256:0926f7cc3735033061ef3cf27ed16faad6544b14666410727b31fea85a5b16eb"},
+    {file = "pycryptodome-3.15.0-cp27-cp27mu-manylinux2010_i686.whl", hash = "sha256:2aa55aae81f935a08d5a3c2042eb81741a43e044bd8a81ea7239448ad751f763"},
+    {file = "pycryptodome-3.15.0-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:c3640deff4197fa064295aaac10ab49a0d55ef3d6a54ae1499c40d646655c89f"},
+    {file = "pycryptodome-3.15.0-cp27-cp27mu-manylinux2014_aarch64.whl", hash = "sha256:045d75527241d17e6ef13636d845a12e54660aa82e823b3b3341bcf5af03fa79"},
+    {file = "pycryptodome-3.15.0-cp35-abi3-macosx_10_9_x86_64.whl", hash = "sha256:9ee40e2168f1348ae476676a2e938ca80a2f57b14a249d8fe0d3cdf803e5a676"},
+    {file = "pycryptodome-3.15.0-cp35-abi3-manylinux1_i686.whl", hash = "sha256:4c3ccad74eeb7b001f3538643c4225eac398c77d617ebb3e57571a897943c667"},
+    {file = "pycryptodome-3.15.0-cp35-abi3-manylinux1_x86_64.whl", hash = "sha256:1b22bcd9ec55e9c74927f6b1f69843cb256fb5a465088ce62837f793d9ffea88"},
+    {file = "pycryptodome-3.15.0-cp35-abi3-manylinux2010_i686.whl", hash = "sha256:57f565acd2f0cf6fb3e1ba553d0cb1f33405ec1f9c5ded9b9a0a5320f2c0bd3d"},
+    {file = "pycryptodome-3.15.0-cp35-abi3-manylinux2010_x86_64.whl", hash = "sha256:4b52cb18b0ad46087caeb37a15e08040f3b4c2d444d58371b6f5d786d95534c2"},
+    {file = "pycryptodome-3.15.0-cp35-abi3-manylinux2014_aarch64.whl", hash = "sha256:092a26e78b73f2530b8bd6b3898e7453ab2f36e42fd85097d705d6aba2ec3e5e"},
+    {file = "pycryptodome-3.15.0-cp35-abi3-win32.whl", hash = "sha256:e244ab85c422260de91cda6379e8e986405b4f13dc97d2876497178707f87fc1"},
+    {file = "pycryptodome-3.15.0-cp35-abi3-win_amd64.whl", hash = "sha256:c77126899c4b9c9827ddf50565e93955cb3996813c18900c16b2ea0474e130e9"},
+    {file = "pycryptodome-3.15.0-pp27-pypy_73-macosx_10_9_x86_64.whl", hash = "sha256:9eaadc058106344a566dc51d3d3a758ab07f8edde013712bc8d22032a86b264f"},
+    {file = "pycryptodome-3.15.0-pp27-pypy_73-manylinux1_x86_64.whl", hash = "sha256:ff287bcba9fbeb4f1cccc1f2e90a08d691480735a611ee83c80a7d74ad72b9d9"},
+    {file = "pycryptodome-3.15.0-pp27-pypy_73-manylinux2010_x86_64.whl", hash = "sha256:60b4faae330c3624cc5a546ba9cfd7b8273995a15de94ee4538130d74953ec2e"},
+    {file = "pycryptodome-3.15.0-pp27-pypy_73-win32.whl", hash = "sha256:a8f06611e691c2ce45ca09bbf983e2ff2f8f4f87313609d80c125aff9fad6e7f"},
+    {file = "pycryptodome-3.15.0-pp36-pypy36_pp73-macosx_10_9_x86_64.whl", hash = "sha256:b9cc96e274b253e47ad33ae1fccc36ea386f5251a823ccb50593a935db47fdd2"},
+    {file = "pycryptodome-3.15.0-pp36-pypy36_pp73-manylinux1_x86_64.whl", hash = "sha256:ecaaef2d21b365d9c5ca8427ffc10cebed9d9102749fd502218c23cb9a05feb5"},
+    {file = "pycryptodome-3.15.0-pp36-pypy36_pp73-manylinux2010_x86_64.whl", hash = "sha256:d2a39a66057ab191e5c27211a7daf8f0737f23acbf6b3562b25a62df65ffcb7b"},
+    {file = "pycryptodome-3.15.0-pp36-pypy36_pp73-win32.whl", hash = "sha256:9c772c485b27967514d0df1458b56875f4b6d025566bf27399d0c239ff1b369f"},
+    {file = "pycryptodome-3.15.0.tar.gz", hash = "sha256:9135dddad504592bcc18b0d2d95ce86c3a5ea87ec6447ef25cfedea12d6018b8"},
 ]
 pydocstyle = [
     {file = "pydocstyle-6.1.1-py3-none-any.whl", hash = "sha256:6987826d6775056839940041beef5c08cc7e3d71d63149b48e36727f70144dc4"},
@@ -1930,8 +1982,8 @@ sqlalchemy = [
     {file = "SQLAlchemy-1.3.20.tar.gz", hash = "sha256:d2f25c7f410338d31666d7ddedfa67570900e248b940d186b48461bd4e5569a1"},
 ]
 "stomp.py" = [
-    {file = "stomp.py-6.1.0-py3-none-any.whl", hash = "sha256:8a1ed68cd8b12f41ba56a8dfeff995e7866d1d47ed7f53aaa78da3bea44696b8"},
-    {file = "stomp.py-6.1.0.tar.gz", hash = "sha256:1f6c7e1e5089b1d8a75161e66533cabb9895de5121cc3900cb7e12c41c1bda18"},
+    {file = "stomp.py-7.0.0-py3-none-any.whl", hash = "sha256:6e1d93f2b2a7c63301f3e09e7ffa82ea80affec59164cd8c9b7807af4fe0e732"},
+    {file = "stomp.py-7.0.0.tar.gz", hash = "sha256:fb301f338292b1b95089c6f1d3a38a9dd8353a5ff3f921e389dfa5f9413b5a8a"},
 ]
 toml = [
     {file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"},
@@ -1956,6 +2008,10 @@ virtualenv = [
     {file = "virtualenv-20.14.1-py2.py3-none-any.whl", hash = "sha256:e617f16e25b42eb4f6e74096b9c9e37713cf10bf30168fb4a739f3fa8f898a3a"},
     {file = "virtualenv-20.14.1.tar.gz", hash = "sha256:ef589a79795589aada0c1c5b319486797c03b67ac3984c48c669c0e4f50df3a5"},
 ]
+watchpoints = [
+    {file = "watchpoints-0.2.5-py3-none-any.whl", hash = "sha256:4c5b9e94b281939c66183579f8c1b7900caf6cd16a10a95d3646814f12bad8d5"},
+    {file = "watchpoints-0.2.5.tar.gz", hash = "sha256:779aac67aa7263b1ec110b4eee1abafbd49ad14aaa52c64e1e0867b62b35383c"},
+]
 wcwidth = [
     {file = "wcwidth-0.2.5-py2.py3-none-any.whl", hash = "sha256:beb4802a9cebb9144e99086eff703a642a13d6a0052920003a230f3294bbe784"},
     {file = "wcwidth-0.2.5.tar.gz", hash = "sha256:c4d647b99872929fdb7bdcaa4fbe7f01413ed3d98077df798530e5b04f116c83"},
diff --git a/pyproject.toml b/pyproject.toml
index 0727966..f0823d5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,7 +7,7 @@ license = "MIT"
 
 [tool.poetry.dependencies]
 python = "^3.6.1"
-"stomp.py" = "6.1.0"
+"stomp.py" = "7.0.0"
 Jinja2 = "^2.11.2"
 sentry-sdk = "^0.19.2"
 six = "^1.15.0"
@@ -18,7 +18,6 @@ pywebpush = "^1.11.0"
 apns2 = "^0.7.2"
 pytest = "^6.2.3"
 unittest = "^0.0"
-pycrypto = "^2.6.1"
 python-smail = "^0.9.0"
 mattermostdriver = "^7.3.1"
 cachetools = "^4.2.4"
@@ -26,6 +25,8 @@ etcd3-py = "^0.1.6"
 beautifulsoup4 = "^4.11.1"
 lxml = "^4.8.0"
 markdownify = "^0.11.2"
+watchpoints = "^0.2.5"
+pycryptodome = "^3.15.0"
 
 [tool.poetry.dev-dependencies]
 pre-commit = "~2.9.2"
diff --git a/scripts/activemq_messages/safaripush.json b/scripts/activemq_messages/safaripush.json
index 84febb3..a152a66 100644
--- a/scripts/activemq_messages/safaripush.json
+++ b/scripts/activemq_messages/safaripush.json
@@ -1,9 +1,13 @@
 {
-    "channel_name": "The Best Notifications",
-    "message_body": "<p>This is an <a href=\"http://example.com/\">example link</a>.</p><!-- This is a comment --><code>/home/\nhicodeline2\ncodeline3</code>\n<img src=\"img_girl.jpg\" alt=\"Girl in a jacket\" width=\"500\" height=\"600\">\n<h1>Hello</h1>\n<h2>Hello</h2>",
-    "summary": "sub test EO",
-    "link": "http://cds.cern.ch/record/2687667",
-    "img_url": "http://cds.cern.ch/record/2687667/files/CLICtd.png?subformat=icon-640",
-    "device_token": "FD276F2A142090371ECEAAA368FB99C675577194DAA3F2968DBEB3F10FA4D60F",
-    "email": "emmanuel.ormancey@cern.ch",
+    "notification_id": "c5530f38-286e-40ee-8d5c-b5b41cc1fbb4",
+    "channel_id": "b6048b90-e55f-4274-bedf-42a10ba8908e",
+    "channel_name": "IT Status Board",
+    "message_body": "<b><a href=\"https://cern.service-now.com/service-portal?id=outage&amp;n=OTG0072193\">OTG0072193</a> - FactoryLogix update</b><br><p>This Outage is not public. In order to view the detailed description, please go to the <a href=\"https://cern.service-now.com/service-portal?id=outage&amp;n=OTG0072193\">Outage in the Service Status Board</a> and log in.</p><b>Type:</b> Planned Intervention<br> <b>Service element:</b> Electronics Design Software Service<br><b>Functional element</b>: Electronics Design Automation tools<br> <b>Begin:</b> 28-07-2022 09:00:00<br><b>End:</b> 28-07-2022 12:30:00<br>",
+    "summary": "FactoryLogix update",
+    "link": "https://cern.service-now.com/service-portal?id=outage&n=OTG0072193",
+    "img_url": "None",
+    "device_token": "E44E6B08ACCCA89570A50C222C611078329EE7645EF55A14EFC284A3F89D039F",
+    "device_id": "ade0d932-c795-44dd-bab2-559e50bfd7ae",
+    "email": "carina.antunes@cern.ch",
+    "encoding": "None"
 }
diff --git a/scripts/send-email.py b/scripts/send-email.py
index fc65b17..b43ab16 100644
--- a/scripts/send-email.py
+++ b/scripts/send-email.py
@@ -6,15 +6,18 @@ import stomp
 conn = stomp.Connection([("localhost", 61613)])
 conn.connect("admin", "admin", wait=True)
 message_body = r"""{
-"content":"<p>This is an <a href=\"http://example.com/\">example link</a>.</p><!-- This comment --><code>/home/\nhi"
-    "codeline2\ncodeline3</code>\n<img src=\"img_girl.jpg\" alt=\"Girl"
-    " in a jacket\" width=\"500\" height=\"600\">\n<h1>Hello</h1>\n<h2>Hello</h2>",
+"message_body":"<p>This is an notification</p>",
 "summary":"[IT Status Board] [Cloud Infrastructure] Instance creation impacted by DNS incident",
-"email":"user@cern.ch",
+"email":"carina.oliveira.antunes@cern.ch",
 "link":"http://toto.cern.ch",
 "created_at": "12/01/2021, 13:59:40",
 "channel_name":"zename",
-"channel_id":"123"
+"channel_id":"123",
+"notification_id":"1234"
 }"""
-conn.send(body=message_body, destination="/queue/np.email", headers={"persistent": "true"})
+trans_id = conn.begin()
+for n in range(100):
+    conn.send(body=message_body, destination="/queue/np.email", headers={"persistent": "true"}, transaction=trans_id)
+conn.commit(transaction=trans_id)
+
 conn.disconnect()
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index ed8f2ff..bdfd14f 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -1,5 +1,4 @@
 """Package's fixtures."""
-
 import pytest
 from sqlalchemy import create_engine
 from sqlalchemy.schema import CreateSchema
@@ -36,7 +35,6 @@ def data_source():
     if not engine.dialect.has_schema(engine, Config.DB_SCHEMA):
         engine.execute(CreateSchema(Config.DB_SCHEMA))
 
-    # PostgresDataSource.Base.prepare(engine)
     PostgresDataSource.Base.metadata.create_all(engine)
 
     db = PostgresDataSource()
diff --git a/tests/integration/test_etcd.py b/tests/integration/test_etcd.py
index ae59f4a..39a164c 100644
--- a/tests/integration/test_etcd.py
+++ b/tests/integration/test_etcd.py
@@ -1,16 +1,17 @@
 """Integration Tests for ETCD."""
-from notifications_consumer.auditing import audit_notification, client, get_audit_notification
+from notifications_consumer.auditing import NotificationAuditor
 
 
 def test_etcd(appctx):
     """Test audit."""
-    assert client
+    auditor = NotificationAuditor()
+    assert auditor.client
 
     expected = {"event": "test"}
     id = "notification-id"
     key = "some-key"
 
-    audit_notification(id, expected, key=key)
-    value = get_audit_notification(id, key)
+    auditor.audit_notification(id, expected, key=key)
+    value = auditor.get_audit_notification(id, key)
     assert value["event"] == expected["event"]
     assert value["date"]
-- 
GitLab