+# Python
+# IDE
+# Others
+# Env
+# stress-testing Python script
+Tool to stress test Notification service.
+### Prerequisites for LXPLUS usage
+Install a local auth-get-sso-cookie compatible with python3
+git clone https://gitlab.cern.ch/authzsvc/tools/auth-get-sso-cookie.git
+cd auth-get-sso-cookie
+python3 setup.py install --user
+### Steps
+- Needs https://gitlab.cern.ch/authzsvc/tools/auth-get-sso-cookie/
+    - Works on LXPLUS for example
+- kinit  
+- Edit ```clientapp_name``` and ```audience``` in ```get_api_token.py``` depending on your target (dev, qa, prod)
+python stress_testing.py -c <countchannels> -n <count notifications>
+    -c <countchannels> : number of test channels to create
+    -n <countnotifications> : number of test notifications to send per channel
\ No newline at end of file
+# API Library early stage
+- channel client
+- notification client
+- tag client
\ No newline at end of file
+import requests
+import json, os, re
+from requests.api import delete
+from .config import Config
+import sys
+# Create new Channel
+def create_channel(name, admingroup, description, verbose=False):
+    if verbose:
+        print('Creating Channel:', name)
+    data = {'channel': {
+        'name': name,
+        'slug': re.sub('[^0-9a-z-_]', '-', name.lower()),
+        'description': description,
+        'adminGroup': { 'groupIdentifier': admingroup },
+        'visibility': 'RESTRICTED',
+        'submissionByForm': [ 'ADMINISTRATORS' ],
+        #'submissionByEmail': [ 'EGROUP' ],
+        #'incomingEgroup': egroup + '@cern.ch',
+    }}
+    #print(data)
+    r = requests.post(Config.BACKEND_URL + '/channels/', json=data, headers=Config.HEADER(), verify=Config.VERIFY)
+    if r.status_code != requests.codes.ok:
+        print('error creating channel', r.json())
+        sys.exit(2)
+    new_channel = r.json()
+    #print(new_channel)
+    return new_channel['id']
+# Delete Channel
+def delete_channel(channel_id, verbose=False):
+    if verbose:
+        print('Deleting Channel', channel_id)
+    r = requests.delete(Config.BACKEND_URL + '/channels/' + channel_id, headers=Config.HEADER(), verify=Config.VERIFY)
+    if r.status_code != requests.codes.ok:
+        print('error deleting channel', r.json())
+        sys.exit(2)
+    return
+# Add egroup as Channel Member
+def add_user_to_channel(channel_id, username, verbose=False):
+    if verbose:
+        print('Adding user to Channel members', username)
+    data = { 'username': username }
+    r = requests.put(Config.BACKEND_URL + '/channels/' + channel_id + '/members', json=data, headers=Config.HEADER(), verify=Config.VERIFY)
+    if r.status_code != requests.codes.ok:
+        if r.json()["name"] == 'ForbiddenError':
+            raise FileExistsError("add_user_to_channel", r.json())
+        else:
+            print('error updating channel', r.json()["name"])
+            sys.exit(2)
+    updated_channel = r.json()
+    return updated_channel['id']
+# Add group as Channel Member
+def add_group_to_channel(channel_id, group, verbose=False):
+    if verbose:
+        print('Adding group to Channel members', group)
+    data = { 'group': { 'groupIdentifier': group } }
+    r = requests.put(Config.BACKEND_URL + '/channels/' + channel_id + '/groups', json=data, headers=Config.HEADER(), verify=Config.VERIFY)
+    if r.status_code != requests.codes.ok:
+        if r.json()["name"] == 'ForbiddenError':
+            raise FileExistsError("add_group_to_channel", r.json())
+        else:
+            print('error updating channel', r.json()["name"])
+            sys.exit(2)
+    updated_channel = r.json()
+    return updated_channel['id']
+# Remove ME from Members
+def remove_me_from_channel(channel_id, verbose=False):
+    if verbose:
+        print('Removing ME from Channel members', channel_id)
+    r = requests.get(Config.BACKEND_URL + '/usersettings', headers=Config.HEADER(), verify=Config.VERIFY)
+    if r.status_code != requests.codes.ok:
+        print('error removing ME from channel', r.json())
+        sys.exit(2)
+    me = r.json()
+    if not me['userId']:
+        print('error retrieving ME', me)
+        sys.exit(2)
+    data = { 'userId': me['userId'] }
+    r = requests.delete(Config.BACKEND_URL + '/channels/' + channel_id + '/members', json=data, headers=Config.HEADER(), verify=Config.VERIFY)
+    if r.status_code != requests.codes.ok:
+        print('error removing ME from channel members', r.json())
+        sys.exit(2)
+    updated_channel = r.json()
+    return updated_channel['id']
+# Change Channel owner
+def set_channel_owner(channel_id, username, verbose=False):
+    if verbose:
+        print('Setting Channel owner to', username)
+    data = { 'username': username }
+    r = requests.put(Config.BACKEND_URL + '/channels/' + channel_id + '/owner', json=data, headers=Config.HEADER(), verify=Config.VERIFY)
+    if r.status_code != requests.codes.ok:
+        print('error setting channel owner', r.json())
+        sys.exit(2)
+    updated_channel = r.json()
+    return updated_channel['id']
+def get_channels_by_prefix(prefix, verbose=False):
+    if verbose:
+        print('Get Channel By Prefix:', prefix)
+    data = { 'searchText': prefix, 'skip': 0, 'take': 10000 }
+    r = requests.get(Config.BACKEND_URL + '/channels', params=data, headers=Config.HEADER(), verify=Config.VERIFY)
+    if r.status_code != requests.codes.ok:
+        print('error getting channels by prefix', r.json())
+        sys.exit(2)
+    found_channels = r.json()
+    #print(found_channels)
+    #print(found_channels["channels"])
+    return found_channels["channels"]
+# Set Channel Tags
+def set_channel_tags(channel_id, tags, verbose=False):
+    if verbose:
+        print('Setting Channel tags', channel_id)
+    data = { 'tags': tags }
+    r = requests.put(Config.BACKEND_URL + '/channels/' + channel_id + '/tags', json=data, headers=Config.HEADER(), verify=Config.VERIFY)
+    if r.status_code != requests.codes.ok:
+        print('error setting channel tags', r.json())
+        sys.exit(2)
+    updated_channel = r.json()
+    return updated_channel['id']
+from .get_api_token import get_api_token
+import subprocess
+import datetime
+from multiprocessing import Lock
+# Comment this out on a kerberize machine and use import above
+# def get_api_token():
+#     print("Requesting a new token via ssh")
+#     result = subprocess.run(['sshpass -f ~/password ssh ormancey@lxplus.cern.ch "python notificationApiClient/get-api-token.py"'], stdout=subprocess.PIPE, shell=True)
+#     return result.stdout.decode().replace("\n", "")
+class Config:
+    """App configuration."""
+    ACCESS_TOKEN_TIME=datetime.datetime.min
+    lock = Lock()
+    @staticmethod
+    def renew():
+        time_delta = (datetime.datetime.now() - Config.ACCESS_TOKEN_TIME)
+        if (time_delta.total_seconds() / 60) > 8:
+            with Config.lock:
+                time_delta = (datetime.datetime.now() - Config.ACCESS_TOKEN_TIME)
+                if (time_delta.total_seconds() / 60) > 8:
+                    print("Bearer token older than 8 minutes, renewing...")
+                    Config.ACCESS_TOKEN=get_api_token()
+                    Config.ACCESS_TOKEN_TIME = datetime.datetime.now()
+        return
+    @staticmethod
+    def HEADER():
+        Config.renew()
+        return {"Authorization": "Bearer " + Config.ACCESS_TOKEN}
+    # BACKEND_URL='https://api-notifications-dev.app.cern.ch'
+    # Drop SSL warnings with: export PYTHONWARNINGS="ignore:Unverified HTTPS request"
+    BACKEND_URL = "https://localhost:8080"
+    ACCESS_TOKEN = '' #get_api_token()
+    #HEADER = {"Authorization": "Bearer " + ACCESS_TOKEN}
+    VERIFY = False  # Verify SSL certificate for requests
+#!/usr/bin/env python
+from auth_get_sso_cookie import cern_sso
+import subprocess
+import requests
+AUTH_HOSTNAME = "auth.cern.ch"
+AUTH_REALM = "cern"
+################# CONFIGURATION ##################
+# The Client application (application portal, option my app cannot keep a secret)
+#clientapp_name = "tmp-push-notifications-clientscript"
+clientapp_name = "notifications-dev-clientapi"
+# clientapp_name = "notifications-qa-clientapi"
+# clientapp_name = "notifications-clientapi"
+# Standard localhost uri for this virtual app
+clientapp_uri = "https://localhost"
+# The target application (the backend API), with granted permissions to client application for token exchange
+#audience = "tmp-push-notifications"
+audience = "notifications-dev"
+# audience = "notifications-qa"
+# audience = "notifications"
+#if __name__ == "__main__":
+def get_api_token():
+    # Get Token for the clientscript application
+    # Using https://gitlab.cern.ch/authzsvc/tools/auth-get-sso-cookie/
+    # Run with parameters for the clientscript application 
+    # clientapi.py -u https://localhost -c tmp-push-notifications-clientscript
+    #token = command_line_tools.auth_get_sso_token()
+    # proc = subprocess.Popen(
+    #     ["auth-get-sso-token", "-u", clientapp_uri, "-c", clientapp_name], 
+    #     stdout=subprocess.PIPE, 
+    #     stderr=subprocess.STDOUT)
+    # token = proc.communicate()[0].rstrip()
+    token = cern_sso.get_sso_token(clientapp_uri, clientapp_name, True, AUTH_HOSTNAME, AUTH_REALM)
+    #print("TOKEN to exchange retrieved")
+    #print(token)
+    # Do Token Exchange for the Backend API application
+    # https://auth.docs.cern.ch/user-documentation/oidc/exchange-for-api/
+    r = requests.post(
+            "https://auth.cern.ch/auth/realms/cern/protocol/openid-connect/token",
+            data={
+                "client_id": clientapp_name,
+                "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
+                "subject_token": token,
+                "requested_token_type": "urn:ietf:params:oauth:token-type:refresh_token",
+                "audience": audience,
+            },
+        )
+    if not r.ok:
+        print(
+            "The token response was not successful: {}".format(r.json()))
+        r.raise_for_status()
+    token_response = r.json()
+    access_token = token_response["access_token"]
+    #print("access_token retrieved")
+    #print(access_token)
+    return access_token
+    # Then calls to the backend can be performed with this access token
+    # ACCESS_TOKEN=$(python get-api-token.py)
+    # curl -X GET "https://api-notifications-dev.app.cern.ch/channels/" -H  "authorization: Bearer $ACCESS_TOKEN"
+import requests
+import json, os, re
+from requests.api import delete
+from .config import Config
+import sys
+# Send Notification
+def send_notification(channel_id, summary, verbose=False):
+    if verbose:
+        print('Sending notification to:', channel_id, summary)
+    data = {'notification': {
+        'summary': summary,
+        'body': Config.NOTIFICATION_BODY,
+        'target': channel_id,
+    }}
+    #print(data)
+    r = requests.post(Config.BACKEND_URL + '/notifications', json=data, headers=Config.HEADER(), verify=Config.VERIFY)
+    if r.status_code != requests.codes.ok:
+        print('error sending notification', r.json())
+        sys.exit(2)
+    new_notification = r.json()
+    #print(new_notification)
+    return new_notification['id']
+import requests
+from .config import Config
+import sys
+# Create Tag
+def create_tag(tag, verbose=False):
+    if verbose:
+        print('Creating new Tag:', tag)
+    data = {'tag': {
+        'name': tag,
+    }}
+    #print(data)
+    r = requests.post(Config.BACKEND_URL + '/tags', json=data, headers=Config.HEADER(), verify=Config.VERIFY)
+    if r.status_code != requests.codes.ok:
+        if r.json()["name"] == 'ForbiddenError':
+            raise FileExistsError("create_tag", r.json())
+        else:
+            print('error create_tag', r.json()["name"])
+            sys.exit(2)
+    new_tag = r.json()
+    return new_tag['id']
+def get_tags(verbose=False):
+    if verbose:
+        print('Get Tags:')
+    r = requests.get(Config.BACKEND_URL + '/tags', headers=Config.HEADER(), verify=Config.VERIFY)
+    if r.status_code != requests.codes.ok:
+        print('error getting tags', r.json())
+        sys.exit(2)
+    found_tags = r.json()
+    #print(found_tags)
+    if verbose:
+        print(found_tags["tags"])
+    return found_tags["tags"]
\ No newline at end of file
+import datetime
+class Config:
+    """App configuration."""
+    CHANNEL_NAME = "Stress Test Channel "
+    ADMIN_GROUP = "notifications-service-admins"
+    NOTIFICATION_SUMMARY = "Stress Test Notification "
+        "<p><h3>Stress Test Notification</h3>This is a stress test notification</p>"
+    )
+    # 50 notiftestxx users were created
+    NOTIFTEST_USERS = ['notiftest' + f"{i:02d}" for i in range(1, 51)]
+    # 5 Grappa groups created, containing 10 notiftestxx users each
+    NOTIFTEST_GROUPS = ['group-notiftest' + f"{i:02d}" for i in range(1, 6)]
+    # Random 'unverified email users' to add more members
+    UNVERIFIED_USERS = ['bill.gates.vaccines.5G.' + f"{i:05d}" + '@ffp.club' for i in range(1, 40000)]
+    # Tags
+    TAGS = ['tag.' + f"{i:02d}" for i in range(1, 41)]
+    PROBE_USERS = [
+        "probe000@cern.ch",
+        "probe001@cern.ch",
+        "probe002@cern.ch",
+        "probe003@cern.ch",
+        "probe004@cern.ch",
+        "probe005@cern.ch",
+        "probe006@cern.ch",
+        "probe007@cern.ch",
+        "probe008@cern.ch",
+        "probe009@cern.ch",
+        "probe010@cern.ch",
+        "probe011@cern.ch",
+        "probe012@cern.ch",
+        "probe013@cern.ch",
+        "probe014@cern.ch",
+        "probe100@cern.ch",
+        "probe101@cern.ch",
+        "probe102@cern.ch",
+        "probe103@cern.ch",
+        "probe104@cern.ch",
+        "probe105@cern.ch",
+        "probe106@cern.ch",
+        "probe107@cern.ch",
+        "probe108@cern.ch",
+        "probe109@cern.ch",
+        "probe110@cern.ch",
+        "probe111@cern.ch",
+        "probe112@cern.ch",
+        "probe113@cern.ch",
+        "probe114@cern.ch",
+        "probe200@cern.ch",
+        "probe201@cern.ch",
+        "probe202@cern.ch",
+        "probe203@cern.ch",
+        "probe204@cern.ch",
+        "probe205@cern.ch",
+        "probe206@cern.ch",
+        "probe207@cern.ch",
+        "probe208@cern.ch",
+        "probe209@cern.ch",
+        "probe210@cern.ch",
+        "probe211@cern.ch",
+        "probe212@cern.ch",
+        "probe213@cern.ch",
+        "probe214@cern.ch",
+        "probe300@cern.ch",
+        "probe301@cern.ch",
+        "probe302@cern.ch",
+        "probe303@cern.ch",
+        "probe304@cern.ch",
+        "probe305@cern.ch",
+        "probe306@cern.ch",
+        "probe307@cern.ch",
+        "probe308@cern.ch",
+        "probe309@cern.ch",
+        "probe310@cern.ch",
+        "probe311@cern.ch",
+        "probe312@cern.ch",
+        "probe313@cern.ch",
+        "probe314@cern.ch",
+        "probe900@cern.ch",
+        "probe901@cern.ch",
+    ]
+import json, os, re
+import sys, getopt
+from config import Config
+from api_library.channel import (
+    create_channel,
+    add_group_to_channel,
+    remove_me_from_channel,
+    set_channel_owner,
+    add_user_to_channel,
+    delete_channel,
+    get_channels_by_prefix,
+    set_channel_tags,
+from api_library.notification import send_notification
+from api_library.tag import create_tag, get_tags
+import concurrent.futures
+def usage():
+    print(
+        "stress_testing.py -c <count channels> [-n <count notifications> -u <count users> -d]"
+    )
+    print("\t-c|--channels <count channels> : number of test channels to create")
+    print(
+        "\t-n|--notifications <count notifications> : number of test notifications to send per channel"
+    )
+    print(
+        "\t-u|--users <count users> : number of test users to add per channel, default and max =",
+        len(Config.NOTIFTEST_USERS),
+    )
+    print(
+        "\t--startatcountusers : continuating add of test users per channel, starting at count"
+    )
+    print(
+        "\t--startatcountchannels : continuating add of test users at channel starting at count"
+    )
+    print(
+        "\t--tags <count tags> : number of tags to add per channel, default and max =",
+        len(Config.TAGS),
+    )
+    print(
+        "\t--threads <max threads> : max number of threads to run for adding channel members. Default = 1",
+        len(Config.TAGS),
+    )
+    print(
+        "\t-d|--delete : find and delete all previously created test Channels prefixed",
+        Config.CHANNEL_NAME,
+    )
+def add_users_to_channel(ucnl, channel_id, verbose, startatcountusers, countusers):
+    print("Processing channel", ucnl, channel_id)
+    # if (ucnl % 100) == 99:
+    #     print("================= Processed", ucnl, "channels =================")
+    # Add one grappa gourp as member
+    try:
+        add_group_to_channel(
+            channel_id,
+            Config.NOTIFTEST_GROUPS[ucnl % len(Config.NOTIFTEST_GROUPS)],
+            verbose
+        )
+    except FileExistsError as ex:
+        if verbose:
+            print("\tGroup already exists")
+        pass
+    # Add notiftest users as member, except the ones in the group added above
+    for ucpt in range(startatcountusers, countusers):
+        try:
+            if ucpt < len(Config.NOTIFTEST_USERS):
+                if (int(ucpt / 10)) != (ucnl % len(Config.NOTIFTEST_GROUPS)):
+                    add_user_to_channel(
+                        channel_id, Config.NOTIFTEST_USERS[ucpt], verbose
+                    )
+            else:
+                add_user_to_channel(
+                    channel_id,
+                    Config.UNVERIFIED_USERS[ucpt - len(Config.NOTIFTEST_USERS)], 
+                    verbose
+                )
+        except FileExistsError as ex:
+            if verbose:
+                print("\tUser already exists")
+            pass
+# Main
+def main(argv):
+    countchannels = 0
+    countnotifications = 0
+    countusers = 0 #len(Config.NOTIFTEST_USERS)
+    counttags = 0
+    startatcountusers = 0
+    startatcountchannels = 0
+    adminGroup = Config.ADMIN_GROUP
+    delete_test_channels = False
+    verbose = False
+    maxthreads = 1
+    try:
+        opts, args = getopt.getopt(
+            argv,
+            "vhdc:n:u:",
+            [
+                "verbose",
+                "help",
+                "channels=",
+                "notifications=",
+                "users=",
+                "startatcountusers=",
+                "startatcountchannels=",
+                "delete",
+                "tags=",
+                "threads="
+            ],
+        )
+    except getopt.GetoptError:
+        print("GetoptError")
+        usage()
+        sys.exit(2)
+    for opt, arg in opts:
+        if opt in ("-h", "--help"):
+            usage()
+            sys.exit()
+        elif opt in ("-v", "--verbose"):
+            verbose = True
+        elif opt in ("-c", "--channels"):
+            countchannels = int(arg)
+        elif opt in ("-n", "--notifications"):
+            countnotifications = int(arg)
+        elif opt in ("-u", "--users"):
+            countusers = int(arg)
+            if countusers > (len(Config.NOTIFTEST_USERS) + len(Config.UNVERIFIED_USERS)):
+                countusers = len(Config.NOTIFTEST_USERS) + len(Config.UNVERIFIED_USERS)
+        elif opt in ("--startatcountusers"):
+            startatcountusers = int(arg)
+            if startatcountusers > countusers:
+                startatcountusers = countusers - 1
+        elif opt in ("--startatcountchannels"):
+            startatcountchannels = int(arg)
+            if startatcountchannels > countchannels:
+                startatcountchannels = countchannels - 1
+        elif opt in ("--tags"):
+            counttags = int(arg)
+            if counttags > len(Config.TAGS):
+                counttags = len(Config.TAGS)
+        elif opt in ("-d", "--delete"):
+            delete_test_channels = True
+        elif opt in ("--threads"):
+            maxthreads = int(arg)
+            print("Running with multithreads", maxthreads)
+    print("Stress testing Notifications")
+    print("Retrieving existing stress test channels if any")
+    channels_reloaded = get_channels_by_prefix(Config.CHANNEL_NAME, verbose)
+    if channels_reloaded:
+        channel_ids = [channel["id"] for channel in channels_reloaded]
+        print("\tfound " + str(len(channel_ids)) + " test channels...")
+        if countchannels < len(channel_ids):
+            channel_ids = channel_ids[0:countchannels]
+        print("\tand continuing with " + str(len(channel_ids)) + " test channels...")
+    else:
+        channel_ids = []
+    # Delete all test channels
+    if delete_test_channels is True:
+        print("Deleting all test channels")
+        for channel_id in channel_ids:
+            delete_channel(channel_id, verbose)
+        sys.exit()
+    # Create Channels
+    print("Creating ", countchannels, " test channels")
+    for i in range(len(channel_ids), countchannels):
+        channelname = description = Config.CHANNEL_NAME + str(i)
+        channel_id = create_channel(channelname, adminGroup, description, verbose)
+        if channel_id:
+            channel_ids.append(channel_id)
+            # Remove ME as member
+            remove_me_from_channel(channel_id, verbose)
+            # Set new owner to one notiftestXX
+            set_channel_owner(channel_id, Config.NOTIFTEST_USERS[i % 50], verbose)
+        else:
+            print("Error creating channel ", channelname)
+    # Create or retrieve Tags and assign to channels
+    if counttags > 0:
+        print("Retrieving existing tags if any")
+        tags = get_tags(verbose)
+        print("Creating ", counttags, " tags if required")
+        for tcpt in range(len(tags), counttags):
+            try:
+                tags.append(create_tag(Config.TAGS[tcpt], verbose))
+            except FileExistsError as ex:
+                if verbose: 
+                    print("\tTags already exists")
+                pass                
+        print ("Applying tags to channels", tags)
+        for ucnl in range(startatcountchannels, len(channel_ids)):
+            channel_id = channel_ids[ucnl]
+            if verbose:
+                print("Setting tags to channel", channel_id)
+            set_channel_tags(channel_id, tags, verbose)
+    # Add users to channel
+    if countusers > 0:
+        print("Populating channel members ", countusers)
+        # We can use a with statement to ensure threads are cleaned up promptly
+        with concurrent.futures.ThreadPoolExecutor(max_workers=maxthreads) as executor:
+            for ucnl in range(startatcountchannels, len(channel_ids)):
+                channel_id = channel_ids[ucnl]
+                #add_users_to_channel(ucnl, channel_id, verbose, startatcountusers, countusers)
+                executor.submit(add_users_to_channel, ucnl, channel_id, verbose, startatcountusers, countusers)
+    # Send Notifications
+    if countnotifications > 0:
+        print("Sending notifications ", countnotifications)
+        cnt = 0
+        for channel_id in channel_ids:
+            if (cnt % 1000) == 999:
+                print("Processed", cnt, "channels")
+            for i in range(0, countnotifications):
+                send_notification(channel_id, Config.NOTIFICATION_SUMMARY + str(i), verbose)
+            cnt = cnt + 1
+    # Wait for checks to be done, and notifications to be delivered before exit and cleanup
+    # print('Please wait for notifications to be delivered, and check the system is stable before continuing.')
+    # text = input("Press Enter to remove all test channels and exit, or CTRL+C to exit directly...")
+    # Cleanup by deleting channels
+    # for channel_id in channel_ids:
+    #    delete_channel(channel_id)
+if __name__ == "__main__":
+    main(sys.argv[1:])