diff --git a/notifications_routing/router.py b/notifications_routing/router.py index beae0fb9c9e5cf0fa6e3f9cdeb868919d34eb54b..d70bf9befefeed547d76bac4085486957e3a7f21 100644 --- a/notifications_routing/router.py +++ b/notifications_routing/router.py @@ -114,11 +114,11 @@ class Router(megabus.Listener): groups = self.data_source.get_channel_groups(channel_id) logging.debug("channel %s groups %s", users, groups) - if groups: - unsubscribed_users = self.data_source.get_channel_unsubscribed_users(channel_id) - logging.debug("channel %s unsubscribed users %s", channel_id, unsubscribed_users) - audit_notification(notification_id, {"event": "Unsubscribed users", "targets": unsubscribed_users}) + unsubscribed_users = self.data_source.get_channel_unsubscribed_users(channel_id) + logging.debug("channel %s unsubscribed users %s", channel_id, unsubscribed_users) + audit_notification(notification_id, {"event": "Unsubscribed users", "targets": unsubscribed_users}) + if groups: self.add_users_from_groups(notification_id, channel_id, users, groups, unsubscribed_users) logging.debug("channel %s final users %s", channel_id, users) @@ -150,8 +150,13 @@ class Router(megabus.Listener): return subscribed_target_users - def process_users(self, message): - """Process unique users list, if targeted, intersection or standard post.""" + def process_users(self, message) -> List[Dict[str, str]]: + """Process unique users list, if targeted, intersection or standard post. + + :param message: Dict with keys from OutputMessageKeys + :return: A list of user data in the format of dicts containing user_id, username, email and last_login + :raises: NotFoundDataSourceError, MultipleResultsFoundError + """ if OutputMessageKeys.PRIVATE in message and message[OutputMessageKeys.PRIVATE]: audit_notification(message[OutputMessageKeys.ID], {"event": "Start processing targeted notification"}) logging.debug("Processing direct notification %s", message[OutputMessageKeys.ID]) @@ -159,17 +164,27 @@ class Router(megabus.Listener): message[OutputMessageKeys.ID], message[OutputMessageKeys.CHANNEL_ID], ) + if not target_users: logging.debug("no target_users found for notification %s", message[OutputMessageKeys.ID]) - return + return [] if OutputMessageKeys.INTERSECTION in message and message[OutputMessageKeys.INTERSECTION]: logging.debug("Processing intersection target users for %s", message[OutputMessageKeys.ID]) channel_users = self.get_channel_users( message[OutputMessageKeys.ID], message[OutputMessageKeys.CHANNEL_ID] ) + + audit_notification( + message[OutputMessageKeys.ID], + {"event": "Target users", "total": len(target_users), "users": target_users}, + ) + audit_notification( + message[OutputMessageKeys.ID], + {"event": "Channel users for intersection", "total": len(channel_users), "users": channel_users}, + ) if not channel_users: logging.debug("no channel_users to intersect for channel %s", message[OutputMessageKeys.CHANNEL_ID]) - return + return [] target_users = [x for x in target_users if x in channel_users] return target_users @@ -179,20 +194,21 @@ class Router(megabus.Listener): target_users = self.get_channel_users(message[OutputMessageKeys.ID], message[OutputMessageKeys.CHANNEL_ID]) if not target_users: logging.debug("no channel_users for channel %s", message[OutputMessageKeys.CHANNEL_ID]) - return + return [] return target_users def process_message(self, message): """Process a message according to user and default preferences and sends to available delivery channels.""" target_users = self.process_users(message) - if not target_users: - return - audit_notification( message[OutputMessageKeys.ID], {"event": "Expanded recipients", "total": len(target_users), "targets": target_users}, ) + + if not target_users: + return + for user in target_users: # Never logged in users, apply default preferences has_logged_in = self.data_source.LAST_LOGIN in user and user.get(self.data_source.LAST_LOGIN)