From 6b8fd08cfba5d7053152859fb677a82edf569074 Mon Sep 17 00:00:00 2001
From: Daniele Dal Santo <dal.santo.daniele@cern.ch>
Date: Fri, 22 Mar 2024 09:57:25 +0100
Subject: [PATCH 1/2] Queue clear

---
 .../api/openapi/openapi.yaml                  | 25 ++++++++++++++++++-
 api/itk_demo_optoboard/api/routes.py          |  8 ++++--
 api/itk_demo_optoboard/worker/celery.py       |  2 +-
 api/itk_demo_optoboard/worker/celeryTasks.py  |  6 ++++-
 4 files changed, 36 insertions(+), 5 deletions(-)

diff --git a/api/itk_demo_optoboard/api/openapi/openapi.yaml b/api/itk_demo_optoboard/api/openapi/openapi.yaml
index 1b27710..e687736 100644
--- a/api/itk_demo_optoboard/api/openapi/openapi.yaml
+++ b/api/itk_demo_optoboard/api/openapi/openapi.yaml
@@ -12,6 +12,29 @@ servers:
     description: This is a dummy value describing the base URL for the API.
 
 paths:
+
+  /clearQueuesTask_wrapper:
+    post:
+      tags:
+        - Default
+      summary: Purge Celery queue(s)
+      x-openapi-router-controller: itk_demo_optoboard.api.routes
+      operationId: clearQueuesTask_wrapper
+      requestBody:
+        description: Purge Celery queue(s)
+        content:
+          application/json:
+            schema:
+              type: object
+      responses:
+        200:
+          description: Returns a message to confirm all queues cleared
+          content:
+            application/json:
+              schema:
+                type: string
+                example: "Queue(s) cleared"
+
   /health:
     get:
       tags:
@@ -122,7 +145,7 @@ paths:
 
   /initOptoList:
     post:
-      summary: Endpoint which defines the list of optoboards
+      summary: Endpoint which defines the list of optoboards and purges all queues
       x-openapi-router-controller: itk_demo_optoboard.api.routes
       operationId: initOptoList
       requestBody:
diff --git a/api/itk_demo_optoboard/api/routes.py b/api/itk_demo_optoboard/api/routes.py
index 365205c..9bdb963 100644
--- a/api/itk_demo_optoboard/api/routes.py
+++ b/api/itk_demo_optoboard/api/routes.py
@@ -1,3 +1,4 @@
+
 from connexion import request
 from time import sleep
 import logging
@@ -8,7 +9,6 @@ logger.setLevel(logging.DEBUG)
 ch = logging.StreamHandler()
 logger.addHandler(ch)
 
-
 def health():
     return _error_catcher(lambda: {"status": 200})
 
@@ -37,6 +37,11 @@ def initOptoList():
         ).wait
     )
 
+def clearQueuesTask_wrapper():
+    payload = request.get_json()
+    return _error_catcher(
+        clearQueuesTask.apply_async(queue="itk_demo_optoboard_queue").wait
+    )
 
 def getOptoGUIConf():
     return _error_catcher(
@@ -268,6 +273,5 @@ def readLog_wrapper():
         readLogTasks.apply_async(queue="itk_demo_optoboard_queue").wait
     )
 
-
 # from itk_demo_optoboard.wsgi import celery
 from itk_demo_optoboard.worker.celeryTasks import *
diff --git a/api/itk_demo_optoboard/worker/celery.py b/api/itk_demo_optoboard/worker/celery.py
index ebb5c94..6cfbd9e 100644
--- a/api/itk_demo_optoboard/worker/celery.py
+++ b/api/itk_demo_optoboard/worker/celery.py
@@ -12,7 +12,7 @@ def make_celery():
     for queue_name in queue_names:
         task_queues.append(Queue(queue_name, routing_key=queue_name))
     celery.conf.task_queues = task_queues
-    celery.conf.task_default_queue = "itk_demo_optoboard_queue"
+    celery.conf.task_default_queue = "itk_demo_optoboard_queue_OB0"
     celery.conf.task_create_missing_queues = False
     celery.conf.worker_concurrency = len(queue_names)
     return celery
diff --git a/api/itk_demo_optoboard/worker/celeryTasks.py b/api/itk_demo_optoboard/worker/celeryTasks.py
index aee741f..7284150 100644
--- a/api/itk_demo_optoboard/worker/celeryTasks.py
+++ b/api/itk_demo_optoboard/worker/celeryTasks.py
@@ -66,7 +66,7 @@ def loadOptoListTask():
     opto_info_dic = {}
     optoboard_dic, opto_info_dic, opto = InitOpto.InitOpto(optoListConfig=optoGUIConf[defaultConfig])
     logger.info("Configuration loaded!")
-    if int(os.environ.get("AUTO_CONFIG_OPTOs")) == 1:
+    if os.environ.get("AUTO_CONFIG_OPTOs") == 1:
         logger.info("Configuring all optoboards...")
         try:
             configureAllTasks()
@@ -206,6 +206,10 @@ def configureAllTasks():
         opto.configure()
     return "Configuration completed"
 
+@celery.task()
+def clearQueuesTask():
+    celery.control.purge()
+    return "Queue(s) cleared"
 
 ### task for BER test
 
-- 
GitLab


From bc97945c4a1d179ceb819c9db2ec3acb4d6dc0bb Mon Sep 17 00:00:00 2001
From: Daniele Dal Santo <dal.santo.daniele@cern.ch>
Date: Fri, 22 Mar 2024 18:02:45 +0100
Subject: [PATCH 2/2] WIP: clearing active tasks

---
 .../api/openapi/openapi.yaml                  | 24 ++++++++++++++++++-
 api/itk_demo_optoboard/api/routes.py          | 16 ++++++++-----
 api/itk_demo_optoboard/worker/celery.py       |  2 +-
 api/itk_demo_optoboard/worker/celeryTasks.py  | 19 +++++++++++++--
 4 files changed, 51 insertions(+), 10 deletions(-)

diff --git a/api/itk_demo_optoboard/api/openapi/openapi.yaml b/api/itk_demo_optoboard/api/openapi/openapi.yaml
index e687736..970081f 100644
--- a/api/itk_demo_optoboard/api/openapi/openapi.yaml
+++ b/api/itk_demo_optoboard/api/openapi/openapi.yaml
@@ -13,10 +13,32 @@ servers:
 
 paths:
 
+  /clearActiveTasks_wrapper:
+    post:
+      tags:
+        - default
+      summary: List all tasks and clear them (WIP - clear specific tasks)
+      x-openapi-router-controller: itk_demo_optoboard.api.routes
+      operationId: clearActiveTasks_wrapper
+      requestBody:
+        description: List active tasks and clear them
+        content:
+          application/json:
+            schema:
+              type: object
+      responses:
+        200:
+          description: Returns message that tasks cleared
+          content:
+            application/json:
+              schema:
+                type: string
+                example: "Active tasks revoked"
+
   /clearQueuesTask_wrapper:
     post:
       tags:
-        - Default
+        - default
       summary: Purge Celery queue(s)
       x-openapi-router-controller: itk_demo_optoboard.api.routes
       operationId: clearQueuesTask_wrapper
diff --git a/api/itk_demo_optoboard/api/routes.py b/api/itk_demo_optoboard/api/routes.py
index 9bdb963..24991bb 100644
--- a/api/itk_demo_optoboard/api/routes.py
+++ b/api/itk_demo_optoboard/api/routes.py
@@ -21,31 +21,35 @@ def healthCelery():
 
 def longTaskCelery():
     return _error_catcher(
-        longTaskCeleryTask.apply_async(queue="itk_demo_optoboard_queue").wait
+        longTaskCeleryTask.apply_async(queue="itk_demo_optoboard_queue_OB0").wait
     )
 
 def loadOptoList():
     return _error_catcher(
-        loadOptoListTask.apply_async(queue="itk_demo_optoboard_queue").wait
+        loadOptoListTask.apply_async(queue="itk_demo_optoboard_queue_OB0").wait
     )
 
 def initOptoList():
     payload = request.get_json()
     return _error_catcher(
         initOptoListTask.apply_async(
-            args=(payload,), queue="itk_demo_optoboard_queue"
+            args=(payload,), queue="itk_demo_optoboard_queue_OB0"
         ).wait
     )
 
 def clearQueuesTask_wrapper():
-    payload = request.get_json()
     return _error_catcher(
         clearQueuesTask.apply_async(queue="itk_demo_optoboard_queue").wait
     )
 
+def clearActiveTasks_wrapper():
+    return _error_catcher(
+        clearActiveTasks.apply_async(queue="itk_demo_optoboard_queue").wait
+    )
+
 def getOptoGUIConf():
     return _error_catcher(
-        getOptoGUIConfTasks.apply_async(queue="itk_demo_optoboard_queue").wait
+        getOptoGUIConfTasks.apply_async(queue="itk_demo_optoboard_queue_OB0").wait
     )
 
 
@@ -56,7 +60,7 @@ def modifyOptoGUIConf():
 
     return _error_catcher(
         modifyOptoGUIConfTasks.apply_async(
-            (confKey, newConfiguration), queue="itk_demo_optoboard_queue"
+            (confKey, newConfiguration), queue="itk_demo_optoboard_queue_OB0"
         ).wait
     )
 
diff --git a/api/itk_demo_optoboard/worker/celery.py b/api/itk_demo_optoboard/worker/celery.py
index 6cfbd9e..ebb5c94 100644
--- a/api/itk_demo_optoboard/worker/celery.py
+++ b/api/itk_demo_optoboard/worker/celery.py
@@ -12,7 +12,7 @@ def make_celery():
     for queue_name in queue_names:
         task_queues.append(Queue(queue_name, routing_key=queue_name))
     celery.conf.task_queues = task_queues
-    celery.conf.task_default_queue = "itk_demo_optoboard_queue_OB0"
+    celery.conf.task_default_queue = "itk_demo_optoboard_queue"
     celery.conf.task_create_missing_queues = False
     celery.conf.worker_concurrency = len(queue_names)
     return celery
diff --git a/api/itk_demo_optoboard/worker/celeryTasks.py b/api/itk_demo_optoboard/worker/celeryTasks.py
index 7284150..b1dd576 100644
--- a/api/itk_demo_optoboard/worker/celeryTasks.py
+++ b/api/itk_demo_optoboard/worker/celeryTasks.py
@@ -37,7 +37,7 @@ def healthCeleryTask():
 def longTaskCeleryTask():
     logger.info("Celery is running! The connection to the microservice backend works!")
     logger.info("Sleeping for 10 seconds...")
-    sleep(10)
+    sleep(20)
     return {"status": 200}
 
 @worker_ready.connect
@@ -195,7 +195,8 @@ def statusCheckTasks(optoboardPosition):
 
 @celery.task()
 def configureTasks(optoboardPosition, activeLpgbt=None, activeGbcr=None):
-    logger.info("Starting configuration of optoboard in position " + optoboardPosition + "...")   
+    logger.info("Starting configuration of optoboard in position " + optoboardPosition + "...")  
+    sleep(10) 
     optoboard_dic[optoboardPosition].configure(activeLpgbt, activeGbcr)
     return "Optoboard configuration completed"
 
@@ -206,11 +207,25 @@ def configureAllTasks():
         opto.configure()
     return "Configuration completed"
 
+#Clearing queues, stopping active tasks
+
 @celery.task()
 def clearQueuesTask():
     celery.control.purge()
     return "Queue(s) cleared"
 
+@celery.task()
+def clearActiveTasks():
+    device = celery.control.inspect().active()
+    taskList = device['celery@atlas01.aec.unibe.ch']
+    logger.info(taskList)
+    if len(taskList) == 1:
+        return "No active tasks to clear"
+    for i in range(1,len(taskList)):
+        print(taskList[1]['id'])
+        celery.control.revoke(taskList[1]['id'], terminate=True, signal='SIGKILL')
+    return "Tasks cleared"
+
 ### task for BER test
 
 
-- 
GitLab