From f9b1d1479eca72c788e9a8ead1d6321c79496e04 Mon Sep 17 00:00:00 2001
From: Dimitra Chatzichrysou <dimitra.chatzichrysou@cern.ch>
Date: Thu, 17 Jun 2021 14:54:22 +0000
Subject: [PATCH] [#46] Implement monthly and weekly job

---
 .env-job-daily-feed                           |   3 +
 .env-job-monthly-feed                         |   5 +
 .env-job-weekly-feed                          |   5 +
 .isort.cfg                                    |   2 +-
 Dockerfile                                    |   6 +-
 docker-compose.job.yml                        |   4 +
 docker/activemq/conf/activemq.xml             |  26 +++++
 docker/activemq/email_monthly_consumer.conf   |  14 +++
 docker/activemq/email_monthly_publisher.conf  |  12 ++
 docker/activemq/email_weekly_consumer.conf    |  14 +++
 docker/activemq/email_weekly_publisher.conf   |  12 ++
 notifications_routing/config.py               |   9 +-
 .../data_source/data_source.py                |  44 +++++--
 .../postgres/postgres_data_source.py          | 109 ++++++++++++------
 notifications_routing/jobs/__init__.py        |   2 +-
 notifications_routing/jobs/daily_feed/job.py  |  63 ----------
 notifications_routing/jobs/feed/job.py        |  67 +++++++++++
 notifications_routing/preferences.py          |  37 +++---
 notifications_routing/utils.py                |  29 ++---
 tests/unit/test_utils.py                      |  36 ------
 20 files changed, 314 insertions(+), 185 deletions(-)
 create mode 100644 .env-job-monthly-feed
 create mode 100644 .env-job-weekly-feed
 create mode 100644 docker/activemq/email_monthly_consumer.conf
 create mode 100644 docker/activemq/email_monthly_publisher.conf
 create mode 100644 docker/activemq/email_weekly_consumer.conf
 create mode 100644 docker/activemq/email_weekly_publisher.conf
 delete mode 100644 notifications_routing/jobs/daily_feed/job.py
 create mode 100644 notifications_routing/jobs/feed/job.py

diff --git a/.env-job-daily-feed b/.env-job-daily-feed
index dec47bb..cd84e3d 100644
--- a/.env-job-daily-feed
+++ b/.env-job-daily-feed
@@ -1,2 +1,5 @@
 PUBLISHER_NAME=email_daily_publisher
 JOB=daily-feed
+SCHEDULED_TIME = 1620198000
+FREQUENCY_TYPE=DAILY
+FEED_QUEUE=email-daily
diff --git a/.env-job-monthly-feed b/.env-job-monthly-feed
new file mode 100644
index 0000000..e3e9dd3
--- /dev/null
+++ b/.env-job-monthly-feed
@@ -0,0 +1,5 @@
+PUBLISHER_NAME=email_monthly_publisher
+JOB=monthly-feed
+SCHEDULED_TIME = 1620198000
+FREQUENCY_TYPE=MONTHLY
+FEED_QUEUE=email-monthly
diff --git a/.env-job-weekly-feed b/.env-job-weekly-feed
new file mode 100644
index 0000000..fa0f1e7
--- /dev/null
+++ b/.env-job-weekly-feed
@@ -0,0 +1,5 @@
+PUBLISHER_NAME=email_weekly_publisher
+JOB=weekly-feed
+SCHEDULED_TIME = 1623769200
+FREQUENCY_TYPE=WEEKLY
+FEED_QUEUE=email-weekly
diff --git a/.isort.cfg b/.isort.cfg
index f4d0ce0..a05ae59 100644
--- a/.isort.cfg
+++ b/.isort.cfg
@@ -4,4 +4,4 @@ multi_line_output=3
 include_trailing_comma=True
 lines_after_imports=2
 not_skip=__init__.py
-known_third_party=dateutil,freezegun,megabus,psycopg2,pytest,requests,sqlalchemy,stomp,yaml
+known_third_party=dateutil,megabus,psycopg2,pytest,requests,sqlalchemy,stomp,yaml
diff --git a/Dockerfile b/Dockerfile
index 19414c0..c27cb00 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -3,7 +3,11 @@ ARG build_env
 
 COPY ./ ./
 
-RUN python -V && poetry config --list
+RUN python -V && \
+    pip -V && \
+    pip install --upgrade pip && \
+    pip install poetry && \
+    poetry config virtualenvs.create false
 
 RUN if [ "$build_env" == "development" ]; \
     then poetry install; \
diff --git a/docker-compose.job.yml b/docker-compose.job.yml
index f6f2ed9..377d3f7 100644
--- a/docker-compose.job.yml
+++ b/docker-compose.job.yml
@@ -14,6 +14,10 @@ services:
       - '.:/opt:delegated'
       - './docker/activemq/email_daily_publisher.conf:/etc/activemq-publisher-email_daily_publisher.conf'
       - './docker/activemq/email_daily_consumer.conf:/etc/activemq-consumer-email_daily_consumer.conf'
+      - './docker/activemq/email_weekly_publisher.conf:/etc/activemq-publisher-email_weekly_publisher.conf'
+      - './docker/activemq/email_weekly_consumer.conf:/etc/activemq-consumer-email_weekly_consumer.conf'
+      - './docker/activemq/email_monthly_publisher.conf:/etc/activemq-publisher-email_monthly_publisher.conf'
+      - './docker/activemq/email_monthly_consumer.conf:/etc/activemq-consumer-email_monthly_consumer.conf'
     env_file:
       - .env
       - .env-job-daily-feed
diff --git a/docker/activemq/conf/activemq.xml b/docker/activemq/conf/activemq.xml
index 13d7e58..7afa1e0 100644
--- a/docker/activemq/conf/activemq.xml
+++ b/docker/activemq/conf/activemq.xml
@@ -75,6 +75,22 @@
                                     maximumRedeliveryDelay="-1"
                                     useExponentialBackOff="true"
                             />
+                            <redeliveryPolicy
+                                    queue="np.email-weekly"
+                                    maximumRedeliveries="6"
+                                    initialRedeliveryDelay="10000"
+                                    backOffMultiplier="3"
+                                    maximumRedeliveryDelay="-1"
+                                    useExponentialBackOff="true"
+                            />
+                            <redeliveryPolicy
+                                    queue="np.email-monthly"
+                                    maximumRedeliveries="6"
+                                    initialRedeliveryDelay="10000"
+                                    backOffMultiplier="3"
+                                    maximumRedeliveryDelay="-1"
+                                    useExponentialBackOff="true"
+                            />
                         </redeliveryPolicyEntries>
                     </redeliveryPolicyMap>
                 </redeliveryPolicyMap>
@@ -126,6 +142,16 @@
                     <individualDeadLetterStrategy queuePrefix="" queueSuffix=".dlq" useQueueForQueueMessages="true"/>
                   </deadLetterStrategy>
                 </policyEntry>
+                <policyEntry queue="np.email-weekly">
+                  <deadLetterStrategy>
+                    <individualDeadLetterStrategy queuePrefix="" queueSuffix=".dlq" useQueueForQueueMessages="true"/>
+                  </deadLetterStrategy>
+                </policyEntry>
+                <policyEntry queue="np.email-monthly">
+                  <deadLetterStrategy>
+                    <individualDeadLetterStrategy queuePrefix="" queueSuffix=".dlq" useQueueForQueueMessages="true"/>
+                  </deadLetterStrategy>
+                </policyEntry>
               </policyEntries>
             </policyMap>
         </destinationPolicy>
diff --git a/docker/activemq/email_monthly_consumer.conf b/docker/activemq/email_monthly_consumer.conf
new file mode 100644
index 0000000..bc35895
--- /dev/null
+++ b/docker/activemq/email_monthly_consumer.conf
@@ -0,0 +1,14 @@
+[client]
+server:activemq
+auth_method:password
+port:61613
+destination:np
+destination_type:queue
+hostgroup_selector:
+host_selector:
+top_hostgroup:
+user:admin
+pass:admin
+use_multiple_brokers:False
+extension:email-monthly
+ack:client-individual
diff --git a/docker/activemq/email_monthly_publisher.conf b/docker/activemq/email_monthly_publisher.conf
new file mode 100644
index 0000000..7fab7ab
--- /dev/null
+++ b/docker/activemq/email_monthly_publisher.conf
@@ -0,0 +1,12 @@
+[client]
+server:activemq
+auth_method:password
+port:61613
+destination:np
+destination_type:queue
+hostgroup_selector:
+host_selector:
+top_hostgroup:
+user:admin
+pass:admin
+use_multiple_brokers:False
diff --git a/docker/activemq/email_weekly_consumer.conf b/docker/activemq/email_weekly_consumer.conf
new file mode 100644
index 0000000..ff071e4
--- /dev/null
+++ b/docker/activemq/email_weekly_consumer.conf
@@ -0,0 +1,14 @@
+[client]
+server:activemq
+auth_method:password
+port:61613
+destination:np
+destination_type:queue
+hostgroup_selector:
+host_selector:
+top_hostgroup:
+user:admin
+pass:admin
+use_multiple_brokers:False
+extension:email-weekly
+ack:client-individual
diff --git a/docker/activemq/email_weekly_publisher.conf b/docker/activemq/email_weekly_publisher.conf
new file mode 100644
index 0000000..7fab7ab
--- /dev/null
+++ b/docker/activemq/email_weekly_publisher.conf
@@ -0,0 +1,12 @@
+[client]
+server:activemq
+auth_method:password
+port:61613
+destination:np
+destination_type:queue
+hostgroup_selector:
+host_selector:
+top_hostgroup:
+user:admin
+pass:admin
+use_multiple_brokers:False
diff --git a/notifications_routing/config.py b/notifications_routing/config.py
index 9196d50..3ec4e7c 100644
--- a/notifications_routing/config.py
+++ b/notifications_routing/config.py
@@ -1,7 +1,6 @@
 """Configuration definitions."""
 import ast
 import os
-from datetime import datetime
 
 
 ENV_DEV = "development"
@@ -53,13 +52,13 @@ class Config:
     TTL = int(os.getenv("TTL", 172800))
 
     # Push Notifications backend
-    DELIVERY_METHODS = {"LIVE", "DAILY"}
+    FEED_METHODS = {"DAILY", "WEEKLY", "MONTHLY"}
     CRITICAL_PRIORITY = "CRITICAL"
 
     # ----- Jobs ----- #
-    JOB = os.getenv("JOB", "daily-feed")
-    DAILY_FEED_TIME = os.getenv("DAILY_FEED_TIME", datetime.now().strftime("%H:%M"))
-    DEFAULT_DAILY_FEED_TIME = os.getenv("DEFAULT_DAILY_FEED_TIME", "09:00")
+    SCHEDULED_TIME = os.getenv("SCHEDULED_TIME")
+    FREQUENCY_TYPE = os.getenv("FREQUENCY_TYPE", "DAILY")
+    FEED_QUEUE = os.getenv("FEED_QUEUE", "email-daily")
 
 
 class DevelopmentConfig(Config):
diff --git a/notifications_routing/data_source/data_source.py b/notifications_routing/data_source/data_source.py
index ce4bba3..7b62a2b 100644
--- a/notifications_routing/data_source/data_source.py
+++ b/notifications_routing/data_source/data_source.py
@@ -67,33 +67,55 @@ class DataSource(ABC):
         pass
 
     @abstractmethod
-    def create_daily_user_notification(self, user_daily_notification: object) -> None:
-        """Specific implementation of save daily user notification.
+    def create_feed_user_notification(self, user_feed_notification: object) -> None:
+        """Specific implementation of save feed user notification.
 
-        :param user_daily_notification: UserDailyNotification object
+        :param user_feed_notification: UserFeedNotification object
         """
         pass
 
     @abstractmethod
-    def get_unique_users_from_daily_notifications(self, delivery_datetime: datetime) -> List[str]:
-        """Specific implementation of get unique users from daily notifications."""
+    def get_unique_users_from_feed_notifications(
+        self, runtime_datetime: datetime, runtime_time: datetime, runtime_day_of_week: int, frequency_type: str
+    ) -> List[str]:
+        """Specific implementation of get unique users from feed notifications for specific datetime and frequency."""
         pass
 
     @abstractmethod
-    def get_user_daily_notifications_ids(self, user_id: str, delivery_datetime: datetime) -> List[str]:
-        """Specific implementation of get user daily notifications ids for specific delivery datetime.
+    def get_user_feed_notifications_ids(
+        self,
+        user_id: str,
+        runtime_datetime: datetime,
+        runtime_time: datetime,
+        runtime_day_of_week: int,
+        frequency_type: str,
+    ) -> List[str]:
+        """Specific implementation of get user feed notifications ids for specific delivery datetime and frequency.
 
         :param user_id: User id
-        :param delivery_datetime: datetime
+        :param runtime_datetime: datetime
+        :param runtime_time: datetime
+        :param runtime_day_of_week: day of the week
+        :param frequency_type: frequency type
         """
         pass
 
     @abstractmethod
-    def delete_user_daily_notifications(self, user_id: str, delivery_datetime: datetime) -> None:
-        """Specific implementation of delete user daily notifications.
+    def delete_user_feed_notifications(
+        self,
+        user_id: str,
+        runtime_datetime: datetime,
+        runtime_time: datetime,
+        runtime_day_of_week: int,
+        frequency_type: str,
+    ) -> None:
+        """Specific implementation of delete user feed notifications.
 
         :param user_id: User id
-        :param delivery_datetime: datetime
+        :param runtime_datetime: datetime
+        :param runtime_time: datetime
+        :param runtime_day_of_week: day of the week
+        :param frequency_type: frequency type
         """
         pass
 
diff --git a/notifications_routing/data_source/postgres/postgres_data_source.py b/notifications_routing/data_source/postgres/postgres_data_source.py
index e6b0026..d115de9 100644
--- a/notifications_routing/data_source/postgres/postgres_data_source.py
+++ b/notifications_routing/data_source/postgres/postgres_data_source.py
@@ -12,6 +12,7 @@ from sqlalchemy import (
     Date,
     DateTime,
     ForeignKey,
+    Integer,
     MetaData,
     String,
     Table,
@@ -30,6 +31,7 @@ from notifications_routing.authorization_service import get_group_users_api
 from notifications_routing.config import Config
 from notifications_routing.data_source.data_source import DataSource
 from notifications_routing.exceptions import DuplicatedError, MultipleResultsFoundError, NotFoundDataSourceError
+from notifications_routing.utils import FeedFrequency
 
 
 class PostgresDataSource(DataSource):
@@ -203,51 +205,68 @@ class PostgresDataSource(DataSource):
                 .exists()
             ).scalar()
 
-    def create_daily_user_notification(self, user_daily_notification):
-        """Save a DailyUserNotification Object to Database."""
+    def create_feed_user_notification(self, user_feed_notification):
+        """Save a FeedUserNotification Object to Database."""
         try:
             with self.session() as session:
-                session.add(user_daily_notification)
+                session.add(user_feed_notification)
         except IntegrityError as e:
             if isinstance(e.orig, errors.UniqueViolation):
-                raise DuplicatedError(UserDailyNotification.__tablename__, e)
+                raise DuplicatedError(UserFeedNotification.__tablename__, e)
             else:
                 raise
 
-    def get_unique_users_from_daily_notifications(self, delivery_datetime: datetime) -> List[UUID]:
-        """Get unique users from daily notifications for specific delivery datetime."""
+    def get_unique_users_from_feed_notifications(
+        self, runtime_datetime, runtime_time, runtime_day_of_week, frequency_type
+    ) -> List[UUID]:
+        """Get unique users from weekly notifications for specific delivery datetime and frequency."""
         with self.session() as session:
-            return (
-                session.query(UserDailyNotification.userId)
-                .filter(
-                    UserDailyNotification.datetime <= delivery_datetime,
-                )
-                .distinct()
-                .yield_per(Config.QUERY_BATCH_SIZE)
+            user_ids = session.query(UserFeedNotification.userId).filter(
+                UserFeedNotification.frequencyType == frequency_type,
+                UserFeedNotification.creationDatetime <= runtime_datetime,
+                UserFeedNotification.deliveryTime == runtime_time,
             )
 
-    def get_user_daily_notifications_ids(self, user_id: UUID, delivery_datetime: datetime) -> List[str]:
-        """Get user daily notifications ids for specific delivery datetime."""
+            if not frequency_type == FeedFrequency.DAILY:
+                user_ids = user_ids.filter(UserFeedNotification.deliveryDayOfTheWeek == runtime_day_of_week)
+
+            return user_ids.distinct().yield_per(Config.QUERY_BATCH_SIZE)
+
+    def get_user_feed_notifications_ids(
+        self, user_id: UUID, runtime_datetime, runtime_time, runtime_day_of_week, frequency_type
+    ) -> List[str]:
+        """Get user feed notifications ids for specific delivery datetime and frequency."""
         with self.session() as session:
-            ids = session.query(UserDailyNotification.notificationId).filter(
-                UserDailyNotification.userId == user_id,
-                UserDailyNotification.datetime <= delivery_datetime,
+            ids = session.query(UserFeedNotification.notificationId).filter(
+                UserFeedNotification.userId == user_id,
+                UserFeedNotification.frequencyType == frequency_type,
+                UserFeedNotification.creationDatetime <= runtime_datetime,
+                UserFeedNotification.deliveryTime == runtime_time,
             )
 
+            if not frequency_type == FeedFrequency.DAILY:
+                ids = ids.filter(UserFeedNotification.deliveryDayOfTheWeek == runtime_day_of_week)
+
             return [str(notificationId) for notificationId, in ids]
 
-    def delete_user_daily_notifications(self, user_id: UUID, delivery_datetime: datetime) -> None:
-        """Delete user daily notifications."""
+    def delete_user_feed_notifications(
+        self, user_id: UUID, runtime_datetime, runtime_time, runtime_day_of_week, frequency_type
+    ) -> None:
+        """Delete user feed notifications."""
         with self.session() as session:
-            return (
-                session.query(UserDailyNotification)
-                .filter(
-                    UserDailyNotification.userId == user_id,
-                    UserDailyNotification.datetime <= delivery_datetime,
-                )
-                .delete()
+            user_feed_notifications = session.query(UserFeedNotification).filter(
+                UserFeedNotification.frequencyType == frequency_type,
+                UserFeedNotification.creationDatetime <= runtime_datetime,
+                UserFeedNotification.deliveryTime == runtime_time,
             )
 
+            if not frequency_type == FeedFrequency.DAILY:
+                user_feed_notifications = user_feed_notifications.filter(
+                    UserFeedNotification.deliveryDayOfTheWeek == runtime_day_of_week
+                )
+
+            return user_feed_notifications.delete()
+
     def get_system_user(self, username: str, **kwargs) -> Dict[str, str]:
         """Return a dict of user_id and email of user."""
         with self.session() as session:
@@ -342,6 +361,7 @@ class Preference(PostgresDataSource.Base):
     rangeStart = Column(Time)
     rangeEnd = Column(Time)
     scheduledTime = Column(Time)
+    scheduledDay = Column(Integer)
     devices = relationship("Device", secondary=preferences_devices, lazy="subquery")
     disabledChannels = relationship("Channel", secondary=preferences_disabled_channels, lazy="subquery")
 
@@ -382,18 +402,39 @@ class Group(PostgresDataSource.Base):
     groupIdentifier = Column(String)
 
 
-class UserDailyNotification(PostgresDataSource.Base):
+class Notification(PostgresDataSource.Base):
+    """Notification Model."""
+
+    __tablename__ = "Notifications"
+    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
+
+
+class UserFeedNotification(PostgresDataSource.Base):
     """UserDailyNotification Model."""
 
-    __tablename__ = "UserDailyNotifications"
+    __tablename__ = "UserFeedNotifications"
 
     userId = Column(UUID(as_uuid=True), ForeignKey("Users.id"), primary_key=True)
-    notificationId = Column(UUID(as_uuid=True), ForeignKey("Users.id"), primary_key=True)
-    datetime = Column(DateTime, index=True)
-
-    def __init__(self, user_id, notification_id, notification_datetime) -> None:
+    notificationId = Column(UUID(as_uuid=True), ForeignKey("Notifications.id"), primary_key=True)
+    frequencyType = Column(String, primary_key=True)
+    creationDatetime = Column(DateTime, index=True)
+    deliveryTime = Column(Time, nullable=True)
+    deliveryDayOfTheWeek = Column(Integer, nullable=True)
+
+    def __init__(
+        self,
+        user_id,
+        notification_id,
+        notification_frequency_type,
+        notification_creation_datetime,
+        notification_delivery_time,
+        notification_delivery_day_of_the_week,
+    ) -> None:
         """Initialize UserDailyNotification."""
         super().__init__()
         self.userId = user_id
         self.notificationId = notification_id
-        self.datetime = notification_datetime
+        self.frequencyType = notification_frequency_type
+        self.creationDatetime = notification_creation_datetime
+        self.deliveryTime = notification_delivery_time
+        self.deliveryDayOfTheWeek = notification_delivery_day_of_the_week
diff --git a/notifications_routing/jobs/__init__.py b/notifications_routing/jobs/__init__.py
index e6ef714..9937245 100644
--- a/notifications_routing/jobs/__init__.py
+++ b/notifications_routing/jobs/__init__.py
@@ -3,4 +3,4 @@
 Required to auto register the job: force interpreter to load them.
 """
 
-from notifications_routing.jobs.daily_feed.job import DailyFeedJob
+from notifications_routing.jobs.feed.job import FeedJob
diff --git a/notifications_routing/jobs/daily_feed/job.py b/notifications_routing/jobs/daily_feed/job.py
deleted file mode 100644
index c33a23a..0000000
--- a/notifications_routing/jobs/daily_feed/job.py
+++ /dev/null
@@ -1,63 +0,0 @@
-"""Daily Feed Job implementation."""
-import json
-import logging
-from datetime import datetime
-
-from notifications_routing.config import Config
-from notifications_routing.data_source.data_source import DataSource
-from notifications_routing.jobs.job import Job
-from notifications_routing.jobs.registry import JobRegistry
-
-
-@JobRegistry.register
-class DailyFeedJob(Job):
-    """Prepare daily feed email notifications."""
-
-    __id = "daily-feed"
-
-    def __init__(self, **kwargs):
-        """Initialize the job."""
-        super().__init__(**kwargs)
-        self.publisher = kwargs["publisher"]
-        self.data_source = kwargs["data_source"]  # type: DataSource
-        self.config = kwargs["config"]
-
-    def __str__(self):
-        """Return string representation."""
-        return f"Job:{self.id()}:{self.config.DAILY_FEED_TIME}"
-
-    @classmethod
-    def id(cls):
-        """Return the job id."""
-        return cls.__id
-
-    def run(self, **kwargs):
-        """Prepare daily feed and cleanup."""
-        super().run(**kwargs)
-
-        delivery_time = datetime.strptime(self.config.DAILY_FEED_TIME, "%H:%M").time()
-        delivery_datetime = datetime.now().replace(
-            hour=delivery_time.hour, minute=delivery_time.minute, second=0, microsecond=0
-        )
-
-        logging.debug("Delivery Datetime: %s", delivery_datetime)
-
-        for (user_id,) in self.data_source.get_unique_users_from_daily_notifications(delivery_datetime):
-            notifications = self.data_source.get_user_daily_notifications_ids(user_id, delivery_datetime)
-            logging.debug("User: %s -  Notifications: %s", user_id, notifications)
-
-            if not notifications:
-                continue
-
-            self.send_message(message={"user_id": str(user_id), "notifications": notifications})
-            self.data_source.delete_user_daily_notifications(user_id, delivery_datetime)
-
-    def send_message(self, message: dict) -> None:
-        """Send message to queue.
-
-        :param message: message object
-        """
-        logging.debug("Sending to queue: %s", message)
-        self.publisher.send(
-            json.dumps(message), extension="email-daily", ttl=Config.TTL, headers={"persistent": "true"}
-        )
diff --git a/notifications_routing/jobs/feed/job.py b/notifications_routing/jobs/feed/job.py
new file mode 100644
index 0000000..7afe389
--- /dev/null
+++ b/notifications_routing/jobs/feed/job.py
@@ -0,0 +1,67 @@
+"""Feed Job implementation."""
+import json
+import logging
+from datetime import datetime
+
+from notifications_routing.config import Config
+from notifications_routing.data_source.data_source import DataSource
+from notifications_routing.jobs.job import Job
+from notifications_routing.jobs.registry import JobRegistry
+
+
+@JobRegistry.register
+class FeedJob(Job):
+    """Prepare daily feed email notifications."""
+
+    __id = "feed"
+
+    def __init__(self, **kwargs):
+        """Initialize the job."""
+        super().__init__(**kwargs)
+        self.publisher = kwargs["publisher"]
+        self.data_source = kwargs["data_source"]  # type: DataSource
+        self.config = kwargs["config"]
+
+    def __str__(self):
+        """Return string representation."""
+        return f"Job:{self.id()}:{self.config.SCHEDULED_TIME}"
+
+    @classmethod
+    def id(cls):
+        """Return the job id."""
+        return cls.__id
+
+    def run(self, **kwargs):
+        """Prepare feed and cleanup."""
+        super().run(**kwargs)
+
+        runtime_datetime = datetime.fromtimestamp(int(Config.SCHEDULED_TIME))
+        runtime_time = runtime_datetime.time()
+        runtime_day_of_week = runtime_datetime.weekday()
+
+        logging.debug("Delivery Datetime: %s", runtime_datetime)
+
+        for (user_id,) in self.data_source.get_unique_users_from_feed_notifications(
+            runtime_datetime, runtime_time, runtime_day_of_week, Config.FREQUENCY_TYPE
+        ):
+            notifications = self.data_source.get_user_feed_notifications_ids(
+                user_id, runtime_datetime, runtime_time, runtime_day_of_week, Config.FREQUENCY_TYPE
+            )
+            logging.debug("User: %s -  Notifications: %s", user_id, notifications)
+            if not notifications:
+                continue
+
+            self.send_message(
+                message={"user_id": str(user_id), "notifications": notifications}, queue=Config.FEED_QUEUE
+            )
+            self.data_source.delete_user_feed_notifications(
+                user_id, runtime_datetime, runtime_time, runtime_day_of_week, Config.FREQUENCY_TYPE
+            )
+
+    def send_message(self, message: dict, queue: str) -> None:
+        """Send message to queue.
+
+        :param message: message object
+        """
+        logging.debug("Sending to queue: %s", message)
+        self.publisher.send(json.dumps(message), extension=queue, headers={"persistent": "true"})
diff --git a/notifications_routing/preferences.py b/notifications_routing/preferences.py
index 0427d4a..009a969 100644
--- a/notifications_routing/preferences.py
+++ b/notifications_routing/preferences.py
@@ -7,7 +7,7 @@ import megabus
 
 from notifications_routing.config import Config
 from notifications_routing.data_source.data_source import DataSource
-from notifications_routing.data_source.postgres.postgres_data_source import UserDailyNotification
+from notifications_routing.data_source.postgres.postgres_data_source import UserFeedNotification
 from notifications_routing.exceptions import DuplicatedError
 
 from .publisher import Publisher
@@ -15,7 +15,6 @@ from .utils import (
     OutputMessageKeys,
     convert_notification_email_to_json_string,
     convert_notification_push_to_json_string,
-    generate_daily_preference_delivery_datetime,
     is_created_time_between_allowed_range,
 )
 
@@ -26,11 +25,12 @@ class TargetPreference:
     Holds method and devices together.
     """
 
-    def __init__(self, method, devices, scheduledTime):
+    def __init__(self, method, devices, scheduledTime, scheduledDay):
         """Initialize the Target Preference."""
         self.method = method
         self.devices = devices
         self.scheduledTime = scheduledTime
+        self.scheduledDay = scheduledDay
 
 
 def get_delivery_methods_and_targets(
@@ -63,27 +63,36 @@ def get_delivery_methods_and_targets(
             logging.debug("\tUser Preferences devices: %s", device.name)
 
         delivery_methods_and_targets.add(
-            TargetPreference(preference.type, preference.devices, preference.scheduledTime)
+            TargetPreference(preference.type, preference.devices, preference.scheduledTime, preference.scheduledDay)
         )
 
     return delivery_methods_and_targets
 
 
-def create_daily_notification(data_source: DataSource, message: Dict, user: Dict, scheduled_time: datetime):
-    """Create a daily notification for the user and handle duplicated preferences from users.
+def create_feed_notification(
+    data_source: DataSource, message: Dict, user: Dict, frequency_type: str, delivery_time: datetime, delivery_day: int
+):
+    """Create a feed notification for the user and handle duplicated preferences from users.
 
     :param data_source: Data source
     :param message: message object
     :param user: User data dict
-    :param scheduled_time: datetime object
+    :param frequency_type: delivery method string
+    :param delivery_time: datetime object
+    :param delivery_day: delivery day int
     """
-    delivery_datetime = generate_daily_preference_delivery_datetime(scheduled_time)
-    user_daily_notification = UserDailyNotification(
-        user[data_source.USER_ID], message[OutputMessageKeys.ID], delivery_datetime
+    logging.info("Delivery Method %s", frequency_type)
+    user_feed_notification = UserFeedNotification(
+        user[data_source.USER_ID],
+        message[OutputMessageKeys.ID],
+        frequency_type,
+        message[OutputMessageKeys.CREATED_TIMESTAMP],
+        delivery_time,
+        delivery_day,
     )
 
     try:
-        data_source.create_daily_user_notification(user_daily_notification)
+        data_source.create_feed_user_notification(user_feed_notification)
     except DuplicatedError:
         logging.info("Skipping duplicated daily feed preference for user %s", user[data_source.USER_ID])
 
@@ -105,8 +114,10 @@ def apply_user_preferences(
     """
     for preference in delivery_methods_and_targets:
         logging.debug("Applying User Preferences delivery method: %s", preference.method)
-        if preference.method == "DAILY":
-            create_daily_notification(data_source, message, user, preference.scheduledTime)
+        if preference.method in Config.FEED_METHODS:
+            create_feed_notification(
+                data_source, message, user, preference.method, preference.scheduledTime, preference.scheduledDay
+            )
             continue
 
         if preference.method == "LIVE":
diff --git a/notifications_routing/utils.py b/notifications_routing/utils.py
index e345bee..7fa7a60 100644
--- a/notifications_routing/utils.py
+++ b/notifications_routing/utils.py
@@ -1,12 +1,10 @@
 """Generic utils."""
 import json
-from datetime import datetime, timedelta
+from datetime import datetime
 from enum import Enum
 
 from dateutil import tz
 
-from notifications_routing.config import Config
-
 
 class StrEnum(str, Enum):
     """Enum where members are also (and must be) strings."""
@@ -46,6 +44,14 @@ class OutputMessageKeys(StrEnum):
     ID = "id"
 
 
+class FeedFrequency(StrEnum):
+    """Feed frequency types."""
+
+    DAILY = "DAILY"
+    WEEKLY = "WEEKLY"
+    MONTHLY = "MONTHLY"
+
+
 def is_time_between(time, start_range, end_range):
     """Check if time is between a range.
 
@@ -122,23 +128,6 @@ def is_created_time_between_allowed_range(created_timestamp, range_start, range_
     return is_time_between(created_time, range_start, range_end)
 
 
-def generate_daily_preference_delivery_datetime(daily_feed_time: datetime or None):
-    """Generate a delivery date for notification based on daily feed time."""
-    if not daily_feed_time:
-        daily_feed_time = datetime.strptime(Config.DEFAULT_DAILY_FEED_TIME, "%H:%M")
-
-    delivery_datetime = datetime.now()
-
-    daily_feed_datetime = delivery_datetime.replace(
-        hour=daily_feed_time.hour, minute=daily_feed_time.minute, second=0, microsecond=0
-    )
-
-    if delivery_datetime > daily_feed_datetime:
-        daily_feed_datetime += timedelta(days=1)
-
-    return daily_feed_datetime
-
-
 def convert_timestamp_to_local_timezone(created_timestamp):
     """Convert timestamp from utc to local timezone."""
     created_timestamp_at_utc = datetime.strptime(created_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=tz.tzutc())
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index e3293f1..f0e0012 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -1,48 +1,12 @@
 """Unit Tests for Utils."""
 import unittest
-from datetime import datetime, timedelta
-from unittest import mock
-
-import pytest
-from freezegun import freeze_time
 
 import notifications_routing.utils as utils
-from notifications_routing.config import Config
 
 
 class TestUtils(unittest.TestCase):
     """TestCase Class for Utils Unit Tests."""
 
-    @freeze_time("2012-01-01")
-    def test_generate_daily_preference_delivery_date_before_now(self):
-        """Test that notification processing datetime is before deliver datetime."""
-        test_time = datetime.strptime("13:00", "%H:%M")
-        now = datetime(2012, 1, 1, 1, 0)
-        delivery_date = utils.generate_daily_preference_delivery_datetime(test_time)
-        self.assertTrue(delivery_date - now > timedelta(seconds=1))
-
-    @freeze_time("2012-01-01 13:05")
-    def test_generate_daily_preference_delivery_date_after_now(self):
-        """Test that notification processing datetime is after deliver datetime."""
-        test_time = datetime.strptime("13:00", "%H:%M")
-        now = datetime(2012, 1, 1, 1, 0)
-        delivery_date = utils.generate_daily_preference_delivery_datetime(test_time)
-        self.assertTrue(delivery_date - now > timedelta(seconds=1))
-
-    @freeze_time("2012-01-01 13:05")
-    def test_generate_daily_preference_delivery_date_none_input(self):
-        """Test null input for generate_daily_preference_delivery_datetime."""
-        test_time = None
-        now = datetime(2012, 1, 1, 1, 0)
-        delivery_date = utils.generate_daily_preference_delivery_datetime(test_time)
-        self.assertTrue(delivery_date - now > timedelta(seconds=1))
-
-    @mock.patch.object(Config, "DEFAULT_DAILY_FEED_TIME", "assignment", "99:00")
-    def test_generate_daily_preference_delivery_date_Wrong_format_input(self):
-        """Test wrong input format for generate_daily_preference_delivery_datetime."""
-        with pytest.raises(ValueError):
-            utils.generate_daily_preference_delivery_datetime(None)
-
     def test_convert_timestamp_to_local_timezone(self):
         """Test that timestamp is converted from utc to local timezone."""
         self.assertEqual(
-- 
GitLab