From 1a4af2c51c67903856305c9aaf84e5a83e6d80bb Mon Sep 17 00:00:00 2001
From: Emmanuel Ormancey <emmanuel.ormancey@cern.ch>
Date: Wed, 3 Nov 2021 17:10:21 +0100
Subject: [PATCH] [#58] Targeted Notifications

---
 .../data_source/data_source.py                | 10 +++
 .../postgres/postgres_data_source.py          | 44 ++++++++++++
 notifications_routing/router.py               | 70 ++++++++++++++-----
 notifications_routing/utils.py                |  8 +++
 scripts/send-targeted.py                      | 20 ++++++
 5 files changed, 135 insertions(+), 17 deletions(-)
 create mode 100644 scripts/send-targeted.py

diff --git a/notifications_routing/data_source/data_source.py b/notifications_routing/data_source/data_source.py
index 7b62a2b..8a22331 100644
--- a/notifications_routing/data_source/data_source.py
+++ b/notifications_routing/data_source/data_source.py
@@ -134,3 +134,13 @@ class DataSource(ABC):
         :param channel_id: Channel ID
         """
         pass
+
+    @abstractmethod
+    def get_target_users(self, channel_id: str, specified_target_users: List[str], **kwargs) -> List[Dict[str, str]]:
+        """Return a list of dictionaries with user_ids and emails of channel_users."""
+        pass
+
+    @abstractmethod
+    def get_target_groups(self, channel_id: str, specified_target_groups: List[str], **kwargs) -> List[Dict[str, str]]:
+        """Return a list of dictionaries with group_ids of a channel."""
+        pass
diff --git a/notifications_routing/data_source/postgres/postgres_data_source.py b/notifications_routing/data_source/postgres/postgres_data_source.py
index 48b8bed..6b8a090 100644
--- a/notifications_routing/data_source/postgres/postgres_data_source.py
+++ b/notifications_routing/data_source/postgres/postgres_data_source.py
@@ -91,6 +91,32 @@ class PostgresDataSource(DataSource):
             unsubscribed_ids = [user.id for user in channel.unsubscribed]
             return [build_user(member) for member in channel.members if member.id not in unsubscribed_ids]
 
+    def get_target_users(self, channel_id: str, specified_target_users: List[str], **kwargs) -> List[Dict[str, str]]:
+        """Return a list of dictionaries with user_ids and emails of channel_users."""
+
+        def build_user(user):
+            return {
+                DataSource.USER_ID: user.id,
+                DataSource.USERNAME: user.username,
+                DataSource.EMAIL: user.email,
+                DataSource.LAST_LOGIN: user.lastLogin,
+            }
+
+        with self.session() as session:
+            channel = self.__get_scalar(session, Channel, id=channel_id, deleteDate=None)
+            if not channel:
+                raise NotFoundDataSourceError(Channel, id=channel_id)
+
+            unsubscribed_ids = [user.id for user in channel.unsubscribed]
+            # Comes as string from the queue, convert to UUID for proper DB comparison
+            targeted_ids = [uuid.UUID(user_id) for user_id in specified_target_users]
+
+            return [
+                build_user(member)
+                for member in channel.members
+                if (member.id in targeted_ids and member.id not in unsubscribed_ids)
+            ]
+
     def get_channel_unsubscribed_users(self, channel_id: str, **kwargs) -> List[str]:
         """Return a list of string with usernames of channel unsubscribed users."""
         channel_unsubscribed_users = []
@@ -119,6 +145,24 @@ class PostgresDataSource(DataSource):
 
         return groups
 
+    def get_target_groups(self, channel_id: str, specified_target_groups: List[str], **kwargs) -> List[Dict[str, str]]:
+        """Return a list of dictionaries with group_ids of a channel."""
+        groups = []
+
+        with self.session() as session:
+            channel = self.__get_scalar(session, Channel, id=channel_id, deleteDate=None)
+            if not channel:
+                raise NotFoundDataSourceError(Channel, id=channel_id)
+
+            # Comes as string from the queue, convert to UUID for proper DB comparison
+            targeted_group_ids = [uuid.UUID(group_id) for group_id in specified_target_groups]
+
+            for group in channel.groups:
+                if group.id in targeted_group_ids:
+                    groups.append({DataSource.GROUP_ID: group.id})
+
+        return groups
+
     def get_group_users(self, group_id: str, **kwargs) -> List[Dict[str, str]]:
         """Return a list of dictionaries with user_ids and emails of group_users."""
         group_users = []
diff --git a/notifications_routing/router.py b/notifications_routing/router.py
index e6a0696..49a1e52 100644
--- a/notifications_routing/router.py
+++ b/notifications_routing/router.py
@@ -1,6 +1,7 @@
 """Notifications Router definition."""
 import json
 import logging
+from typing import Dict, List
 
 import megabus
 
@@ -86,24 +87,23 @@ class Router(megabus.Listener):
             str(OutputMessageKeys.LINK): message_json[str(InputMessageKeys.LINK)],
             str(OutputMessageKeys.IMGURL): message_json[str(InputMessageKeys.IMGURL)],
             str(OutputMessageKeys.CHANNEL_CATEGORY): category_name,
+            str(OutputMessageKeys.PRIVATE): message_json[str(InputMessageKeys.PRIVATE)],
+            str(OutputMessageKeys.TARGETUSERS): message_json[str(InputMessageKeys.TARGETUSERS)],
+            str(OutputMessageKeys.TARGETGROUPS): message_json[str(InputMessageKeys.TARGETGROUPS)],
         }
 
         return notification
 
-    def get_channel_users(self, channel_id):
-        """Join users from our data source and the grappa system to return a unique users list."""
-        channel_users = self.data_source.get_channel_users(channel_id)
-        unique_usernames = [channel_user[self.data_source.USERNAME] for channel_user in channel_users]
+    def add_users_from_groups(self, channel_id: str, users: List[Dict[str, str]], groups: List[Dict[str, str]]):
+        """Add users from groups to users list."""
+        unique_usernames = [channel_user[self.data_source.USERNAME] for channel_user in users]
         logging.debug("channel %s usernames: %s", channel_id, unique_usernames)
 
-        channel_groups = self.data_source.get_channel_groups(channel_id)
-        logging.debug("channel %s groups %s", channel_id, channel_groups)
-
-        if channel_groups:
+        if groups:
             channel_unsubscribed_users = self.data_source.get_channel_unsubscribed_users(channel_id)
             logging.debug("channel %s unsubscribed users %s", channel_id, channel_unsubscribed_users)
 
-            for group in channel_groups:
+            for group in groups:
                 for group_id in group.values():
                     group_users = self.data_source.get_group_users(group_id)
                     logging.debug("channel %s groups %s", channel_id, group_users)
@@ -117,24 +117,60 @@ class Router(megabus.Listener):
 
                         try:
                             system_user = self.data_source.get_system_user(user[self.data_source.USERNAME])
-                            channel_users.append(system_user)
+                            users.append(system_user)
                             unique_usernames.append(self.data_source.USERNAME)
                         except NotFoundDataSourceError:
-                            channel_users.append(user)
+                            users.append(user)
                             unique_usernames.append(self.data_source.USERNAME)
 
+        logging.debug("channel %s final users %s", channel_id, users)
+
+    def get_channel_users(self, channel_id):
+        """Join users from our data source and the grappa system to return a unique users list."""
+        channel_users = self.data_source.get_channel_users(channel_id)
+        channel_groups = self.data_source.get_channel_groups(channel_id)
+        logging.debug("channel %s groups %s", channel_id, channel_groups)
+
+        self.add_users_from_groups(channel_id, channel_users, channel_groups)
+
         logging.debug("channel %s final users %s", channel_id, channel_users)
 
         return channel_users
 
+    def get_target_users(self, channel_id, specified_target_users, specified_target_groups):  # noqa: C901
+        """Join users from our data source and the grappa system to return a unique users list."""
+        target_users = []
+        if specified_target_users:
+            target_users = self.data_source.get_target_users(channel_id, specified_target_users)
+
+        if specified_target_groups:
+            target_groups = self.data_source.get_target_groups(channel_id, specified_target_groups)
+            logging.debug("channel %s targeted groups %s", channel_id, target_groups)
+
+            self.add_users_from_groups(channel_id, target_users, target_groups)
+
+        return target_users
+
     def process_message(self, message):
         """Process a message according to user and default preferences and sends to available delivery channels."""
-        channel_users = self.get_channel_users(message[OutputMessageKeys.CHANNEL_ID])
-        if not channel_users:
-            logging.debug("no channel_users for channel %s", message[OutputMessageKeys.CHANNEL_ID])
-            return
-
-        for user in channel_users:
+        if message.get(OutputMessageKeys.PRIVATE) is True:
+            logging.debug("Processing direct notification %s", message[OutputMessageKeys.ID])
+            target_users = self.get_target_users(
+                message[OutputMessageKeys.CHANNEL_ID],
+                message[OutputMessageKeys.TARGETUSERS],
+                message[OutputMessageKeys.TARGETGROUPS],
+            )
+            if not target_users:
+                logging.debug("no target_users found for notification %s", message[OutputMessageKeys.ID])
+                return
+        else:
+            logging.debug("Processing general notification %s", message[OutputMessageKeys.ID])
+            target_users = self.get_channel_users(message[OutputMessageKeys.CHANNEL_ID])
+            if not target_users:
+                logging.debug("no channel_users for channel %s", message[OutputMessageKeys.CHANNEL_ID])
+                return
+
+        for user in target_users:
             # Never logged in users, apply default preferences
             has_logged_in = self.data_source.LAST_LOGIN in user and user.get(self.data_source.LAST_LOGIN)
             if self.data_source.USER_ID not in user or not has_logged_in:
diff --git a/notifications_routing/utils.py b/notifications_routing/utils.py
index 813666a..2191740 100644
--- a/notifications_routing/utils.py
+++ b/notifications_routing/utils.py
@@ -28,6 +28,9 @@ class InputMessageKeys(StrEnum):
     SUMMARY = "summary"
     LINK = "link"
     IMGURL = "imgUrl"
+    PRIVATE = "private"
+    TARGETUSERS = "targetUsers"
+    TARGETGROUPS = "targetGroups"
 
 
 class OutputMessageKeys(StrEnum):
@@ -44,6 +47,9 @@ class OutputMessageKeys(StrEnum):
     LINK = "link"
     IMGURL = "img_url"
     ID = "id"
+    PRIVATE = "private"
+    TARGETUSERS = "target_users"
+    TARGETGROUPS = "target_groups"
 
 
 class FeedFrequency(StrEnum):
@@ -86,6 +92,7 @@ def convert_notification_email_to_json_string(message, email):
         "notification_id": message[OutputMessageKeys.ID],
         "created_at": message[OutputMessageKeys.CREATED_TIMESTAMP],
         "category_name": message[OutputMessageKeys.CHANNEL_CATEGORY],
+        "private": message.get(OutputMessageKeys.PRIVATE) or False,
     }
 
     return json.dumps(notif)
@@ -109,6 +116,7 @@ def convert_notification_push_to_json_string(message, devicetoken, encoding=None
             "priority": message[OutputMessageKeys.PRIORITY],
             "notification_id": message[OutputMessageKeys.ID],
             "created_at": message[OutputMessageKeys.CREATED_TIMESTAMP],
+            "private": message.get(OutputMessageKeys.PRIVATE) or False,
         }
     )
 
diff --git a/scripts/send-targeted.py b/scripts/send-targeted.py
new file mode 100644
index 0000000..e211d0b
--- /dev/null
+++ b/scripts/send-targeted.py
@@ -0,0 +1,20 @@
+"""Utility to send one targeted message."""
+import stomp
+
+
+conn = stomp.Connection([("localhost", 61613)])
+conn.connect("admin", "admin", wait=True)
+# noqa
+message_body = r"""{"id":"bd19eea4-9dca-48d9-a577-5de9d2bf374a",
+"target":{"id":"50b1faf5-ae0d-46a6-8c62-7072fe5d466b","slug":"test","name":"Test","description":"",
+"visibility":"RESTRICTED","subscriptionPolicy":"DYNAMIC","archive":false,"APIKey":null,
+"creationDate":"2021-02-26T12:58:42.131Z","lastActivityDate":"2021-02-26T12:58:42.131Z","incomingEmail":null,
+"deleteDate":null}, "body":"<p>test</p>\n", "sendAt":null,"sentAt":"2021-02-26T13:59:40.754Z","users":[],
+"link":"https://test-notifications.web.cern.ch/main/channels/a4db5752-8f58-4144-a4f7-a2b6454273f5/notifications/de213d1c-4438-4f24-9bc3-58c0373db527",
+"summary":"test","imgUrl":null,"priority":"LOW","tags":null,"contentType":null,
+"private":true,
+"targetUsers":["79e92fe3-97a3-4970-a8dd-181caa7b5a10"],
+"targetGroups":[]} """
+
+conn.send(body=message_body, destination="/queue/np.routing", headers={"persistent": "true"})
+conn.disconnect()
-- 
GitLab