From 3c0e1c229f64090399cb00614a39ca50abb28e86 Mon Sep 17 00:00:00 2001
From: Carina Antunes <carina.oliveira.antunes@cern.ch>
Date: Thu, 11 Mar 2021 12:58:41 +0000
Subject: [PATCH] [#31] Daily feed job

---
 .env-job-daily-feed                           |  2 +
 .flake8                                       |  3 +
 docker-compose.fedora.yml                     |  4 +-
 docker-compose.job.yml                        | 65 +++++++++++++++++++
 docker/activemq/conf/activemq.xml             | 13 ++++
 docker/activemq/email_daily_consumer.conf     | 14 ++++
 docker/activemq/email_daily_publisher.conf    | 12 ++++
 notifications_routing/config.py               |  4 ++
 .../data_source/data_source.py                | 26 ++++++++
 .../postgres/postgres_data_source.py          | 40 ++++++++++--
 notifications_routing/job.py                  | 50 ++++++++++++++
 notifications_routing/jobs/__init__.py        |  6 ++
 notifications_routing/jobs/daily_feed/job.py  | 57 ++++++++++++++++
 notifications_routing/jobs/job.py             | 33 ++++++++++
 notifications_routing/jobs/registry.py        | 35 ++++++++++
 notifications_routing/preferences.py          |  3 +-
 pyproject.toml                                |  3 +
 test/test_utils.py                            |  4 +-
 18 files changed, 365 insertions(+), 9 deletions(-)
 create mode 100644 .env-job-daily-feed
 create mode 100644 docker-compose.job.yml
 create mode 100644 docker/activemq/email_daily_consumer.conf
 create mode 100644 docker/activemq/email_daily_publisher.conf
 create mode 100644 notifications_routing/job.py
 create mode 100644 notifications_routing/jobs/__init__.py
 create mode 100644 notifications_routing/jobs/daily_feed/job.py
 create mode 100644 notifications_routing/jobs/job.py
 create mode 100644 notifications_routing/jobs/registry.py

diff --git a/.env-job-daily-feed b/.env-job-daily-feed
new file mode 100644
index 0000000..dec47bb
--- /dev/null
+++ b/.env-job-daily-feed
@@ -0,0 +1,2 @@
+PUBLISHER_NAME=email_daily_publisher
+JOB=daily-feed
diff --git a/.flake8 b/.flake8
index fe33ae6..3212a6f 100644
--- a/.flake8
+++ b/.flake8
@@ -2,3 +2,6 @@
 max-line-length = 120
 max-complexity = 10
 enable-extensions=G
+per-file-ignores =
+    # Imported but unused
+    notifications_routing/jobs/__init__.py: F401
diff --git a/docker-compose.fedora.yml b/docker-compose.fedora.yml
index a84ffa2..7506e11 100644
--- a/docker-compose.fedora.yml
+++ b/docker-compose.fedora.yml
@@ -28,7 +28,7 @@ services:
     image: postgres
     volumes:
       - pgsql-data:/var/lib/pgsql/data:rw
-      - ./scripts/push_dev.sql:/docker-entrypoint-initdb.d/init.sql
+      - ./scripts/dump.sql:/docker-entrypoint-initdb.d/init.sql
     networks:
       - default
     ports:
@@ -38,7 +38,7 @@ services:
       - POSTGRES_PASSWORD
       - POSTGRES_DB
     healthcheck:
-      test: ["CMD-SHELL", "pg_isready -U test -d push_dev"]
+      test: ["CMD-SHELL", "pg_isready -U admin -d push_dev"]
       interval: 10s
       timeout: 5s
       retries: 5
diff --git a/docker-compose.job.yml b/docker-compose.job.yml
new file mode 100644
index 0000000..f6f2ed9
--- /dev/null
+++ b/docker-compose.job.yml
@@ -0,0 +1,65 @@
+version: '3.7'
+
+services:
+  notifications-job-daily-feed:
+    container_name: notifications-job
+    build:
+      context: .
+      dockerfile: Dockerfile
+      args:
+        build_env: development
+    networks:
+      - default
+    volumes:
+      - '.:/opt:delegated'
+      - './docker/activemq/email_daily_publisher.conf:/etc/activemq-publisher-email_daily_publisher.conf'
+      - './docker/activemq/email_daily_consumer.conf:/etc/activemq-consumer-email_daily_consumer.conf'
+    env_file:
+      - .env
+      - .env-job-daily-feed
+    depends_on:
+      pg_db:
+        condition: service_healthy
+      activemq:
+        condition: service_healthy
+    command: "poetry run job"
+
+  pg_db:
+    image: postgres
+    volumes:
+      - pgsql-data:/var/lib/pgsql/data:rw
+      - ./scripts/dump.sql:/docker-entrypoint-initdb.d/init.sql
+    networks:
+      - default
+    ports:
+      - 5432:5432
+    environment:
+      - POSTGRES_USER
+      - POSTGRES_PASSWORD
+      - POSTGRES_DB
+    healthcheck:
+      test: ["CMD-SHELL", "pg_isready -U admin -d push_dev"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  activemq:
+    build: ./docker/activemq/5.16.0-alpine
+    volumes:
+      - './docker/activemq/conf/activemq.xml:/opt/apache-activemq-5.16.0/conf/activemq.xml'
+    ports:
+      - 61613:61613
+      - 8161:8161
+    networks:
+      - default
+    healthcheck:
+      test: ["CMD", "nc", "-vz", "localhost", "8161"]
+      interval: 5s
+      timeout: 5s
+      retries: 5
+
+networks:
+  default:
+
+volumes:
+  pgsql-data:
diff --git a/docker/activemq/conf/activemq.xml b/docker/activemq/conf/activemq.xml
index 80f9e76..13d7e58 100644
--- a/docker/activemq/conf/activemq.xml
+++ b/docker/activemq/conf/activemq.xml
@@ -67,6 +67,14 @@
                                     maximumRedeliveryDelay="-1"
                                     useExponentialBackOff="true"
                             />
+                            <redeliveryPolicy
+                                    queue="np.email-daily"
+                                    maximumRedeliveries="6"
+                                    initialRedeliveryDelay="10000"
+                                    backOffMultiplier="3"
+                                    maximumRedeliveryDelay="-1"
+                                    useExponentialBackOff="true"
+                            />
                         </redeliveryPolicyEntries>
                     </redeliveryPolicyMap>
                 </redeliveryPolicyMap>
@@ -113,6 +121,11 @@
                     <individualDeadLetterStrategy queuePrefix="" queueSuffix=".dlq" useQueueForQueueMessages="true"/>
                   </deadLetterStrategy>
                 </policyEntry>
+                <policyEntry queue="np.email-daily">
+                  <deadLetterStrategy>
+                    <individualDeadLetterStrategy queuePrefix="" queueSuffix=".dlq" useQueueForQueueMessages="true"/>
+                  </deadLetterStrategy>
+                </policyEntry>
               </policyEntries>
             </policyMap>
         </destinationPolicy>
diff --git a/docker/activemq/email_daily_consumer.conf b/docker/activemq/email_daily_consumer.conf
new file mode 100644
index 0000000..3a1a367
--- /dev/null
+++ b/docker/activemq/email_daily_consumer.conf
@@ -0,0 +1,14 @@
+[client]
+server:activemq
+auth_method:password
+port:61613
+destination:np
+destination_type:queue
+hostgroup_selector:
+host_selector:
+top_hostgroup:
+user:admin
+pass:admin
+use_multiple_brokers:False
+extension:email-daily
+ack:client-individual
diff --git a/docker/activemq/email_daily_publisher.conf b/docker/activemq/email_daily_publisher.conf
new file mode 100644
index 0000000..7fab7ab
--- /dev/null
+++ b/docker/activemq/email_daily_publisher.conf
@@ -0,0 +1,12 @@
+[client]
+server:activemq
+auth_method:password
+port:61613
+destination:np
+destination_type:queue
+hostgroup_selector:
+host_selector:
+top_hostgroup:
+user:admin
+pass:admin
+use_multiple_brokers:False
diff --git a/notifications_routing/config.py b/notifications_routing/config.py
index b14b276..ec61c70 100644
--- a/notifications_routing/config.py
+++ b/notifications_routing/config.py
@@ -43,6 +43,7 @@ class Config:
         database=os.getenv("DB_NAME"),
     )
     SQLALCHEMY_TRACK_MODIFICATIONS = ast.literal_eval(os.getenv("SQLALCHEMY_TRACK_MODIFICATIONS", "False"))
+    QUERY_BATCH_SIZE = int(os.getenv("QUERY_BATCH_SIZE", 100))
 
     # ActiveMQ
     CONSUMER_NAME = os.getenv("CONSUMER_NAME", "routing_consumer")
@@ -51,6 +52,9 @@ class Config:
     # Push Notifications backend
     DELIVERY_METHODS = {"LIVE", "DAILY"}
     CRITICAL_PRIORITY = "CRITICAL"
+
+    # ----- Jobs ----- #
+    JOB = os.getenv("JOB", "daily-feed")
     DAILY_FEED_TIME = os.getenv("DAILY_FEED_TIME", "09:00")
 
 
diff --git a/notifications_routing/data_source/data_source.py b/notifications_routing/data_source/data_source.py
index 6557327..a284c06 100644
--- a/notifications_routing/data_source/data_source.py
+++ b/notifications_routing/data_source/data_source.py
@@ -1,5 +1,6 @@
 """Abstract Data Source."""
 from abc import ABC, abstractmethod
+from datetime import date, time
 from typing import Dict, List
 
 
@@ -62,3 +63,28 @@ class DataSource(ABC):
         :param user_daily_notification: UserDailyNotificaion object
         """
         pass
+
+    @abstractmethod
+    def get_unique_users_from_daily_notifications(self) -> List[str]:
+        """Specific implementation of get unique users from daily notifications."""
+        pass
+
+    @abstractmethod
+    def get_user_daily_notifications_ids(self, user_id: str, delivery_date: date, delivery_time: time) -> List[str]:
+        """Specific implementation of get user daily notifications ids for specific delivery date and time.
+
+        :param user_id: User id
+        :param delivery_date: date
+        :param delivery_time: time
+        """
+        pass
+
+    @abstractmethod
+    def delete_user_daily_notifications(self, user_id: str, delivery_date: date, delivery_time: time) -> None:
+        """Specific implementation of delete user daily notifications.
+
+        :param user_id: User id
+        :param delivery_date: date
+        :param delivery_time: time
+        """
+        pass
diff --git a/notifications_routing/data_source/postgres/postgres_data_source.py b/notifications_routing/data_source/postgres/postgres_data_source.py
index e6f8fb6..708839c 100644
--- a/notifications_routing/data_source/postgres/postgres_data_source.py
+++ b/notifications_routing/data_source/postgres/postgres_data_source.py
@@ -2,6 +2,7 @@
 import logging
 import uuid
 from contextlib import contextmanager
+from datetime import date, time
 from typing import Dict, List
 
 from sqlalchemy import Boolean, Column, Date, ForeignKey, String, Table, Time, create_engine, or_
@@ -145,6 +146,35 @@ class PostgresDataSource(DataSource):
         with self.session() as session:
             session.add(user_daily_notification)
 
+    def get_unique_users_from_daily_notifications(self) -> List[UUID]:
+        """Get unique users from daily notifications (materialized before the session closes)."""
+        with self.session() as session:
+            return list(session.query(UserDailyNotification.userId).distinct().yield_per(Config.QUERY_BATCH_SIZE))
+
+    def get_user_daily_notifications_ids(self, user_id: UUID, delivery_date: date, delivery_time: time) -> List[str]:
+        """Get user daily notifications ids for specific delivery date and time."""
+        with self.session() as session:
+            ids = session.query(UserDailyNotification.notificationId).filter(
+                UserDailyNotification.userId == user_id,
+                UserDailyNotification.date == delivery_date,
+                UserDailyNotification.time == delivery_time,
+            )
+
+            return [str(notificationId) for notificationId, in ids]
+
+    def delete_user_daily_notifications(self, user_id: UUID, delivery_date: date, delivery_time: time) -> None:
+        """Delete user daily notifications."""
+        with self.session() as session:
+            return (
+                session.query(UserDailyNotification)
+                .filter(
+                    UserDailyNotification.userId == user_id,
+                    UserDailyNotification.date == delivery_date,
+                    UserDailyNotification.time == delivery_time,
+                )
+                .delete()
+            )
+
 
 class User(PostgresDataSource.Base):
     """User Model."""
@@ -256,10 +286,10 @@ class UserDailyNotification(PostgresDataSource.Base):
     date = Column(Date, primary_key=True)
     time = Column(Time, primary_key=True)
 
-    def __init__(self, userId, notificationId, date, time) -> None:
+    def __init__(self, user_id, notification_id, notification_date, notification_time) -> None:
         """Initialize UserDailyNotification."""
         super().__init__()
-        self.userId = userId
-        self.notificationId = notificationId
-        self.date = date
-        self.time = time
+        self.userId = user_id
+        self.notificationId = notification_id
+        self.date = notification_date
+        self.time = notification_time
diff --git a/notifications_routing/job.py b/notifications_routing/job.py
new file mode 100644
index 0000000..c7429e4
--- /dev/null
+++ b/notifications_routing/job.py
@@ -0,0 +1,50 @@
+"""Notifications Generic Job Runner definition and creation."""
+import logging
+import logging.config
+
+import yaml
+
+from notifications_routing.config import load_config
+from notifications_routing.jobs.registry import JobRegistry
+
+
+class JobRunner:
+    """Notifications Job Runner."""
+
+    def __init__(self, config):
+        """Initialize the job runner class."""
+        self.config = config
+
+    def run(self):
+        """Init all dependencies and run the job."""
+        job = JobRegistry.registry[self.config.JOB]
+
+        logging.info("Finished initialising")
+        job.run()
+        logging.info("Finished running")
+
+
+def configure_logging(config):
+    """Configure logs."""
+    with open(config.LOGGING_CONFIG_FILE, "r") as file:
+        logging_config = yaml.safe_load(file.read())
+        logging.config.dictConfig(logging_config)
+
+
+def create_job():
+    """Create a new JobRunner Class."""
+    config = load_config()
+    if config.SENTRY_DSN:
+        import sentry_sdk
+
+        sentry_sdk.init(dsn=config.SENTRY_DSN)
+
+    configure_logging(config)
+
+    return JobRunner(config)
+
+
+def run():
+    """Run job."""
+    app = create_job()
+    app.run()
diff --git a/notifications_routing/jobs/__init__.py b/notifications_routing/jobs/__init__.py
new file mode 100644
index 0000000..e6ef714
--- /dev/null
+++ b/notifications_routing/jobs/__init__.py
@@ -0,0 +1,6 @@
+"""Import jobs.
+
+Required to auto register the job: force interpreter to load them.
+"""
+
+from notifications_routing.jobs.daily_feed.job import DailyFeedJob
diff --git a/notifications_routing/jobs/daily_feed/job.py b/notifications_routing/jobs/daily_feed/job.py
new file mode 100644
index 0000000..17b53a7
--- /dev/null
+++ b/notifications_routing/jobs/daily_feed/job.py
@@ -0,0 +1,57 @@
+"""Daily Feed Job implementation."""
+import json
+import logging
+from datetime import datetime
+
+from notifications_routing.data_source.data_source import DataSource
+from notifications_routing.jobs.job import Job
+from notifications_routing.jobs.registry import JobRegistry
+
+
+@JobRegistry.register
+class DailyFeedJob(Job):
+    """Prepare daily feed email notifications."""
+
+    __id = "daily-feed"
+
+    def __init__(self, **kwargs):
+        """Initialize the job."""
+        super().__init__(**kwargs)
+        self.publisher = kwargs["publisher"]
+        self.data_source = kwargs["data_source"]  # type: DataSource
+        self.config = kwargs["config"]
+
+    def __str__(self):
+        """Return string representation."""
+        return f"Job:{self.id()}:{self.config.DAILY_FEED_TIME}"
+
+    @classmethod
+    def id(cls):
+        """Return the job id."""
+        return cls.__id
+
+    def run(self, **kwargs):
+        """Prepare daily feed and cleanup."""
+        super().run(**kwargs)
+
+        delivery_date = datetime.now().date()
+        delivery_time = datetime.strptime(self.config.DAILY_FEED_TIME, "%H:%M").time()
+        logging.debug("Delivery Date: %s - Time: %s", delivery_date, delivery_time)
+
+        for (user_id,) in self.data_source.get_unique_users_from_daily_notifications():
+            notifications = self.data_source.get_user_daily_notifications_ids(user_id, delivery_date, delivery_time)
+            logging.debug("User: %s -  Notifications: %s", user_id, notifications)
+
+            if not notifications:
+                continue
+
+            self.send_message(message={"user_id": str(user_id), "notifications": notifications})
+            self.data_source.delete_user_daily_notifications(user_id, delivery_date, delivery_time)
+
+    def send_message(self, message: dict) -> None:
+        """Send message to queue.
+
+        :param message: message object
+        """
+        logging.debug("Sending to queue: %s", message)
+        self.publisher.send(json.dumps(message), extension="email-daily", headers={"persistent": "true"})
diff --git a/notifications_routing/jobs/job.py b/notifications_routing/jobs/job.py
new file mode 100644
index 0000000..8c12c5c
--- /dev/null
+++ b/notifications_routing/jobs/job.py
@@ -0,0 +1,33 @@
+"""Notifications routing job."""
+import logging
+from abc import ABC, abstractmethod
+
+
+class Job(ABC):
+    """Notifications routing job base class."""
+
+    def __init__(self, **kwargs):
+        """Initialize the Job."""
+        super(Job, self).__init__()
+        self.__consumer_name = kwargs["config"].JOB
+
+    def __str__(self):
+        """Return string representation."""
+        return f"Job:{self.id()}"
+
+    @classmethod
+    @abstractmethod
+    def id(cls):
+        """Return the job id."""
+        pass
+
+    @property
+    def consumer_name(self):
+        """Return activeMQ consumer's name."""
+        return self.__consumer_name
+
+    @abstractmethod
+    def run(self, **kwargs):
+        """Run the job."""
+        logging.debug("%s - status:running - kwargs:%r", self, kwargs)
+        pass
diff --git a/notifications_routing/jobs/registry.py b/notifications_routing/jobs/registry.py
new file mode 100644
index 0000000..fcb41e7
--- /dev/null
+++ b/notifications_routing/jobs/registry.py
@@ -0,0 +1,35 @@
+"""Job automatic registry."""
+import logging
+
+from notifications_routing.config import Config, load_config
+from notifications_routing.data_source.postgres.postgres_data_source import PostgresDataSource
+from notifications_routing.publisher import Publisher
+
+
+class JobRegistry:
+    """Decorator to auto register jobs."""
+
+    registry = {}
+
+    @classmethod
+    def register(cls, register_cls):
+        """Register a job."""
+        config = load_config()
+
+        if register_cls.id() == config.JOB:
+            logging.debug("Register class: %s", register_cls.id())
+            cls.registry[register_cls.id()] = register_cls(**build_kwargs(config))
+
+        return register_cls
+
+
+def build_kwargs(config: Config) -> dict:
+    """Build job kwargs."""
+    kwargs = dict(config=config)
+    if config.PUBLISHER_NAME:
+        kwargs["publisher"] = Publisher(config.PUBLISHER_NAME)
+
+    if config.SQLALCHEMY_DATABASE_URI:
+        kwargs["data_source"] = PostgresDataSource()
+
+    return kwargs
diff --git a/notifications_routing/preferences.py b/notifications_routing/preferences.py
index d5a02ee..59dd1c6 100644
--- a/notifications_routing/preferences.py
+++ b/notifications_routing/preferences.py
@@ -74,9 +74,10 @@ def apply_user_preferences(
     """Specific implementation of apply user's preferences.
 
     :param publisher: Publisher object used to publish messages
+    :param data_source: Data source
     :param delivery_methods_and_targets: Deliver methods and their targets
     :param message: message object
-    :param email: e-mail
+    :param user: User data dict
     """
     for preference in delivery_methods_and_targets:
         logging.debug("Applying User Preferences delivery method: %s", preference.method)
diff --git a/pyproject.toml b/pyproject.toml
index 93d9013..a42dde8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -44,3 +44,6 @@ freezegun = "^1.1.0"
 [build-system]
 requires = ["poetry-core>=1.0.0"]
 build-backend = "poetry.core.masonry.api"
+
+[tool.poetry.scripts]
+job = 'notifications_routing.job:run'
diff --git a/test/test_utils.py b/test/test_utils.py
index 926ac2f..1b6fddf 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -1,8 +1,10 @@
 """Unit Tests for Utils."""
-import unittest
 import datetime
+import unittest
+
 import pytest
 from freezegun import freeze_time
+
 import notifications_routing.utils as utils
 
 
-- 
GitLab