From d50eabd0bc94adde7b9af4c5fcad9a609291e473 Mon Sep 17 00:00:00 2001
From: ormancey <emmanuel.ormancey@cern.ch>
Date: Wed, 20 Oct 2021 14:46:47 +0200
Subject: [PATCH] added multithreading

---
 Python/stress-testing/api_library/channel.py  |  41 +++---
 Python/stress-testing/api_library/config.py   |  12 +-
 .../api_library/notification.py               |   5 +-
 Python/stress-testing/api_library/tag.py      |  15 ++-
 Python/stress-testing/stress_testing.py       | 124 +++++++++++-------
 5 files changed, 119 insertions(+), 78 deletions(-)

diff --git a/Python/stress-testing/api_library/channel.py b/Python/stress-testing/api_library/channel.py
index 427642c..ee4e08e 100644
--- a/Python/stress-testing/api_library/channel.py
+++ b/Python/stress-testing/api_library/channel.py
@@ -6,8 +6,9 @@ from .config import Config
 import sys
 
 # Create new Channel
-def create_channel(name, admingroup, description):
-    print('Creating Channel:', name)
+def create_channel(name, admingroup, description, verbose=False):
+    if verbose:
+        print('Creating Channel:', name)
     data = {'channel': {
         'name': name,
         'slug': re.sub('[^0-9a-z-_]', '-', name.lower()),
@@ -29,8 +30,9 @@ def create_channel(name, admingroup, description):
     return new_channel['id']
 
 # Delete Channel
-def delete_channel(channel_id):
-    print('Deleting Channel', channel_id)
+def delete_channel(channel_id, verbose=False):
+    if verbose:
+        print('Deleting Channel', channel_id)
     r = requests.delete(Config.BACKEND_URL + '/channels/' + channel_id, headers=Config.HEADER(), verify=Config.VERIFY)
     if r.status_code != requests.codes.ok:
         print('error deleting channel', r.json())
@@ -38,8 +40,9 @@ def delete_channel(channel_id):
     return
 
 # Add egroup as Channel Member
-def add_user_to_channel(channel_id, username):
-    print('Adding user to Channel members', username)
+def add_user_to_channel(channel_id, username, verbose=False):
+    if verbose:
+        print('Adding user to Channel members', username)
     data = { 'username': username }
     r = requests.put(Config.BACKEND_URL + '/channels/' + channel_id + '/members', json=data, headers=Config.HEADER(), verify=Config.VERIFY)
     if r.status_code != requests.codes.ok:
@@ -53,8 +56,9 @@ def add_user_to_channel(channel_id, username):
     return updated_channel['id']
 
 # Add group as Channel Member
-def add_group_to_channel(channel_id, group):
-    print('Adding group to Channel members', group)
+def add_group_to_channel(channel_id, group, verbose=False):
+    if verbose:
+        print('Adding group to Channel members', group)
     data = { 'group': { 'groupIdentifier': group } }
     r = requests.put(Config.BACKEND_URL + '/channels/' + channel_id + '/groups', json=data, headers=Config.HEADER(), verify=Config.VERIFY)
     if r.status_code != requests.codes.ok:
@@ -68,8 +72,9 @@ def add_group_to_channel(channel_id, group):
     return updated_channel['id']
 
 # Remove ME from Members
-def remove_me_from_channel(channel_id):
-    print('Removing ME from Channel members')
+def remove_me_from_channel(channel_id, verbose=False):
+    if verbose:
+        print('Removing ME from Channel members', channel_id)
     r = requests.get(Config.BACKEND_URL + '/me', headers=Config.HEADER(), verify=Config.VERIFY)
     if r.status_code != requests.codes.ok:
         print('error removing ME from channel', r.json())
@@ -78,7 +83,6 @@ def remove_me_from_channel(channel_id):
     if not me['userId']:
         print('error retrieving ME', me)
         sys.exit(2)
-
     data = { 'userId': me['userId'] }
     r = requests.delete(Config.BACKEND_URL + '/channels/' + channel_id + '/members', json=data, headers=Config.HEADER(), verify=Config.VERIFY)
     if r.status_code != requests.codes.ok:
@@ -89,8 +93,9 @@ def remove_me_from_channel(channel_id):
     return updated_channel['id']
 
 # Change Channel owner
-def set_channel_owner(channel_id, username):
-    print('Setting Channel owner to', username)
+def set_channel_owner(channel_id, username, verbose=False):
+    if verbose:
+        print('Setting Channel owner to', username)
     data = { 'username': username }
     r = requests.put(Config.BACKEND_URL + '/channels/' + channel_id + '/owner', json=data, headers=Config.HEADER(), verify=Config.VERIFY)
     if r.status_code != requests.codes.ok:
@@ -100,8 +105,9 @@ def set_channel_owner(channel_id, username):
 
     return updated_channel['id']
 
-def get_channels_by_prefix(prefix):
-    print('Get Channel By Prefix:', prefix)
+def get_channels_by_prefix(prefix, verbose=False):
+    if verbose:
+        print('Get Channel By Prefix:', prefix)
     data = { 'searchText': prefix, 'skip': 0, 'take': 10000 }
     r = requests.get(Config.BACKEND_URL + '/channels', params=data, headers=Config.HEADER(), verify=Config.VERIFY)
     if r.status_code != requests.codes.ok:
@@ -113,8 +119,9 @@ def get_channels_by_prefix(prefix):
     return found_channels["channels"]
 
 # Set Channel Tags
-def set_channel_tags(channel_id, tags):
-    print('Setting Channel tags')
+def set_channel_tags(channel_id, tags, verbose=False):
+    if verbose:
+        print('Setting Channel tags', channel_id)
     data = { 'tags': tags }
     r = requests.put(Config.BACKEND_URL + '/channels/' + channel_id + '/tags', json=data, headers=Config.HEADER(), verify=Config.VERIFY)
     if r.status_code != requests.codes.ok:
diff --git a/Python/stress-testing/api_library/config.py b/Python/stress-testing/api_library/config.py
index 46c6699..6b3327d 100644
--- a/Python/stress-testing/api_library/config.py
+++ b/Python/stress-testing/api_library/config.py
@@ -1,6 +1,7 @@
 #from get_api_token import get_api_token
 import subprocess
 import datetime
+from multiprocessing import Lock
 
 # Comment this out on a kerberize machine and use import above
 def get_api_token():
@@ -11,14 +12,19 @@ def get_api_token():
 class Config:
     """App configuration."""
     ACCESS_TOKEN_TIME=datetime.datetime.min
+    
+    lock = Lock()
 
     @staticmethod
     def renew():
         time_delta = (datetime.datetime.now() - Config.ACCESS_TOKEN_TIME)
         if (time_delta.total_seconds() / 60) > 8:
-            print("Bearer token older than 8 minutes, renewing...")
-            Config.ACCESS_TOKEN=get_api_token()
-            Config.ACCESS_TOKEN_TIME = datetime.datetime.now()
+            with Config.lock:
+                time_delta = (datetime.datetime.now() - Config.ACCESS_TOKEN_TIME)
+                if (time_delta.total_seconds() / 60) > 8:
+                    print("Bearer token older than 8 minutes, renewing...")
+                    Config.ACCESS_TOKEN=get_api_token()
+                    Config.ACCESS_TOKEN_TIME = datetime.datetime.now()
         return
 
     @staticmethod
diff --git a/Python/stress-testing/api_library/notification.py b/Python/stress-testing/api_library/notification.py
index bd51512..609e8b1 100644
--- a/Python/stress-testing/api_library/notification.py
+++ b/Python/stress-testing/api_library/notification.py
@@ -7,8 +7,9 @@ import sys
 
 
 # Send Notification
-def send_notification(channel_id, summary):
-    print('Sending notification to:', channel_id, summary)
+def send_notification(channel_id, summary, verbose=False):
+    if verbose:
+        print('Sending notification to:', channel_id, summary)
     data = {'notification': {
         'summary': summary,
         'body': Config.NOTIFICATION_BODY,
diff --git a/Python/stress-testing/api_library/tag.py b/Python/stress-testing/api_library/tag.py
index 1c16c70..5682155 100644
--- a/Python/stress-testing/api_library/tag.py
+++ b/Python/stress-testing/api_library/tag.py
@@ -5,8 +5,9 @@ import sys
 
 
 # Create Tag
-def create_tag(tag):
-    print('Creating new Tag:', tag)
+def create_tag(tag, verbose=False):
+    if verbose:
+        print('Creating new Tag:', tag)
     data = {'tag': {
         'name': tag,
     }}
@@ -22,13 +23,15 @@ def create_tag(tag):
 
     return new_tag['id']
 
-def get_tags():
-    print('Get Tags:')
+def get_tags(verbose=False):
+    if verbose:
+        print('Get Tags:')
     r = requests.get(Config.BACKEND_URL + '/tags', headers=Config.HEADER(), verify=Config.VERIFY)
     if r.status_code != requests.codes.ok:
         print('error getting tags', r.json())
         sys.exit(2)
     found_tags = r.json()
-    print(found_tags)
-    print(found_tags["tags"])
+    #print(found_tags)
+    if verbose:
+        print(found_tags["tags"])
     return found_tags["tags"]
\ No newline at end of file
diff --git a/Python/stress-testing/stress_testing.py b/Python/stress-testing/stress_testing.py
index cc380c9..190ab83 100644
--- a/Python/stress-testing/stress_testing.py
+++ b/Python/stress-testing/stress_testing.py
@@ -15,6 +15,7 @@ from api_library.channel import (
 )
 from api_library.notification import send_notification
 from api_library.tag import create_tag, get_tags
+import concurrent.futures
 
 
 def usage():
@@ -39,12 +40,51 @@ def usage():
         "\t--tags <count tags> : number of tags to add per channel, default and max =",
         len(Config.TAGS),
     )
+    print(
+        "\t--threads <max threads> : max number of threads to run for adding channel members. Default = 1",
+        len(Config.TAGS),
+    )
 
     print(
         "\t-d|--delete : find and delete all previously created test Channels prefixed",
         Config.CHANNEL_NAME,
     )
 
+def add_users_to_channel(ucnl, channel_id, verbose, startatcountusers, countusers):
+    print("Processing channel", ucnl, channel_id)
+    # if (ucnl % 100) == 99:
+    #     print("================= Processed", ucnl, "channels =================")
+    # Add one grappa gourp as member
+    try:
+        add_group_to_channel(
+            channel_id,
+            Config.NOTIFTEST_GROUPS[ucnl % len(Config.NOTIFTEST_GROUPS)],
+            verbose
+        )
+    except FileExistsError as ex:
+        if verbose:
+            print("\tGroup already exists")
+        pass
+
+    # Add notiftest users as member, except the ones in the group added above
+    for ucpt in range(startatcountusers, countusers):
+        try:
+            if ucpt < len(Config.NOTIFTEST_USERS):
+                if (int(ucpt / 10)) != (ucnl % len(Config.NOTIFTEST_GROUPS)):
+                    add_user_to_channel(
+                        channel_id, Config.NOTIFTEST_USERS[ucpt], verbose
+                    )
+            else:
+                add_user_to_channel(
+                    channel_id,
+                    Config.UNVERIFIED_USERS[ucpt - len(Config.NOTIFTEST_USERS)], 
+                    verbose
+                )
+        except FileExistsError as ex:
+            if verbose:
+                print("\tUser already exists")
+            pass
+
 
 # Main
 def main(argv):
@@ -56,11 +96,14 @@ def main(argv):
     startatcountchannels = 0
     adminGroup = Config.ADMIN_GROUP
     delete_test_channels = False
+    verbose = False
+    maxthreads = 1
     try:
         opts, args = getopt.getopt(
             argv,
-            "hdc:n:u:",
+            "vhdc:n:u:",
             [
+                "verbose",
                 "help",
                 "channels=",
                 "notifications=",
@@ -69,6 +112,7 @@ def main(argv):
                 "startatcountchannels=",
                 "delete",
                 "tags=",
+                "threads="
             ],
         )
     except getopt.GetoptError:
@@ -79,6 +123,8 @@ def main(argv):
         if opt in ("-h", "--help"):
             usage()
             sys.exit()
+        elif opt in ("-v", "--verbose"):
+            verbose = True
         elif opt in ("-c", "--channels"):
             countchannels = int(arg)
         elif opt in ("-n", "--notifications"):
@@ -87,11 +133,11 @@ def main(argv):
             countusers = int(arg)
             if countusers > (len(Config.NOTIFTEST_USERS) + len(Config.UNVERIFIED_USERS)):
                 countusers = len(Config.NOTIFTEST_USERS) + len(Config.UNVERIFIED_USERS)
-        elif opt in ("-s", "--startatcountusers"):
+        elif opt in ("--startatcountusers"):
             startatcountusers = int(arg)
             if startatcountusers > countusers:
                 startatcountusers = countusers - 1
-        elif opt in ("-t", "--startatcountchannels"):
+        elif opt in ("--startatcountchannels"):
             startatcountchannels = int(arg)
             if startatcountchannels > countchannels:
                 startatcountchannels = countchannels - 1
@@ -101,11 +147,14 @@ def main(argv):
                 counttags = len(Config.TAGS)
         elif opt in ("-d", "--delete"):
             delete_test_channels = True
+        elif opt in ("--threads"):
+            maxthreads = int(arg)
+            print("Running with multithreads", maxthreads)
 
     print("Stress testing Notifications")
 
     print("Retrieving existing stress test channels if any")
-    channels_reloaded = get_channels_by_prefix(Config.CHANNEL_NAME)
+    channels_reloaded = get_channels_by_prefix(Config.CHANNEL_NAME, verbose)
     if channels_reloaded:
         channel_ids = [channel["id"] for channel in channels_reloaded]
         print("\tfound " + str(len(channel_ids)) + " test channels...")
@@ -119,86 +168,61 @@ def main(argv):
     if delete_test_channels is True:
         print("Deleting all test channels")
         for channel_id in channel_ids:
-            delete_channel(channel_id)
+            delete_channel(channel_id, verbose)
         sys.exit()
 
     # Create Channels
     print("Creating ", countchannels, " test channels")
     for i in range(len(channel_ids), countchannels):
         channelname = description = Config.CHANNEL_NAME + str(i)
-        channel_id = create_channel(channelname, adminGroup, description)
+        channel_id = create_channel(channelname, adminGroup, description, verbose)
         if channel_id:
             channel_ids.append(channel_id)
             # Remove ME as member
-            remove_me_from_channel(channel_id)
+            remove_me_from_channel(channel_id, verbose)
             # Set new owner to one notiftestXX
-            set_channel_owner(channel_id, Config.NOTIFTEST_USERS[i % 50])
+            set_channel_owner(channel_id, Config.NOTIFTEST_USERS[i % 50], verbose)
         else:
             print("Error creating channel ", channelname)
     
     # Create or retrieve Tags and assign to channels
     if counttags > 0:
         print("Retrieving existing tags if any")
-        tags = get_tags()
+        tags = get_tags(verbose)
         print("Creating ", counttags, " tags if required")
         for tcpt in range(len(tags), counttags):
             try:
-                tags.append(create_tag(Config.TAGS[tcpt]))
+                tags.append(create_tag(Config.TAGS[tcpt], verbose))
             except FileExistsError as ex:
-                print("\tTags already exists")
+                if verbose: 
+                    print("\tTags already exists")
                 pass                
-        print ("tags", tags)
+        print ("Applying tags to channels", tags)
         for ucnl in range(startatcountchannels, len(channel_ids)):
             channel_id = channel_ids[ucnl]
-            print("Setting tags to channel", channel_id)
-            set_channel_tags(channel_id,tags)
+            if verbose:
+                print("Setting tags to channel", channel_id)
+            set_channel_tags(channel_id, tags, verbose)
 
     # Add users to channel
     if countusers > 0:
-        for ucnl in range(startatcountchannels, len(channel_ids)):
-            channel_id = channel_ids[ucnl]
-            print("Processing channel", channel_id)
-            if (ucnl % 100) == 99:
-                print("================= Processed", ucnl, "channels =================")
-            # Add one grappa gourp as member
-            try:
-                add_group_to_channel(
-                    channel_id,
-                    Config.NOTIFTEST_GROUPS[ucnl % len(Config.NOTIFTEST_GROUPS)],
-                )
-            except FileExistsError as ex:
-                print("\tGroup already exists")
-                pass
-
-            # Add notiftest users as member, except the ones in the group added above
-            for ucpt in range(startatcountusers, countusers):
-                try:
-                    if ucpt < len(Config.NOTIFTEST_USERS):
-                        if (int(ucpt / 10)) != (ucnl % len(Config.NOTIFTEST_GROUPS)):
-                            add_user_to_channel(
-                                channel_id, Config.NOTIFTEST_USERS[ucpt]
-                            )
-                    else:
-                        add_user_to_channel(
-                            channel_id,
-                            Config.UNVERIFIED_USERS[ucpt - len(Config.NOTIFTEST_USERS)],
-                        )
-                except FileExistsError as ex:
-                    print("\tUser already exists")
-                    pass
-
-            # Add probeXXX users as member
-            # for username in Config.PROBE_USERS:
-            #   add_user_to_channel(channel_id, username)
+        print("Populating channel members ", countusers)
+        # We can use a with statement to ensure threads are cleaned up promptly
+        with concurrent.futures.ThreadPoolExecutor(max_workers=maxthreads) as executor:
+            for ucnl in range(startatcountchannels, len(channel_ids)):
+                channel_id = channel_ids[ucnl]
+                #add_users_to_channel(ucnl, channel_id, verbose, startatcountusers, countusers)
+                executor.submit(add_users_to_channel, ucnl, channel_id, verbose, startatcountusers, countusers)
 
     # Send Notifications
     if countnotifications > 0:
+        print("Sending notifications ", countnotifications)
         cnt = 0
         for channel_id in channel_ids:
             if (cnt % 1000) == 999:
                 print("Processed", cnt, "channels")
             for i in range(0, countnotifications):
-                send_notification(channel_id, Config.NOTIFICATION_SUMMARY + str(i))
+                send_notification(channel_id, Config.NOTIFICATION_SUMMARY + str(i), verbose)
             cnt = cnt + 1
 
     # Wait for checks to be done, and notifications to be delivered before exit and cleanup
-- 
GitLab