From 6b8fd08cfba5d7053152859fb677a82edf569074 Mon Sep 17 00:00:00 2001 From: Daniele Dal Santo <dal.santo.daniele@cern.ch> Date: Fri, 22 Mar 2024 09:57:25 +0100 Subject: [PATCH 1/2] Queue clear --- .../api/openapi/openapi.yaml | 25 ++++++++++++++++++- api/itk_demo_optoboard/api/routes.py | 8 ++++-- api/itk_demo_optoboard/worker/celery.py | 2 +- api/itk_demo_optoboard/worker/celeryTasks.py | 6 ++++- 4 files changed, 36 insertions(+), 5 deletions(-) diff --git a/api/itk_demo_optoboard/api/openapi/openapi.yaml b/api/itk_demo_optoboard/api/openapi/openapi.yaml index 1b27710..e687736 100644 --- a/api/itk_demo_optoboard/api/openapi/openapi.yaml +++ b/api/itk_demo_optoboard/api/openapi/openapi.yaml @@ -12,6 +12,29 @@ servers: description: This is a dummy value describing the base URL for the API. paths: + + /clearQueuesTask_wrapper: + post: + tags: + - Default + summary: Purge Celery queue(s) + x-openapi-router-controller: itk_demo_optoboard.api.routes + operationId: clearQueuesTask_wrapper + requestBody: + description: Purge Celery queue(s) + content: + application/json: + schema: + type: object + responses: + 200: + description: Returns a message to confirm all queues cleared + content: + application/json: + schema: + type: string + example: "Queue(s) cleared" + /health: get: tags: @@ -122,7 +145,7 @@ paths: /initOptoList: post: - summary: Endpoint which defines the list of optoboards + summary: Endpoint which defines the list of optoboards and purges all queues x-openapi-router-controller: itk_demo_optoboard.api.routes operationId: initOptoList requestBody: diff --git a/api/itk_demo_optoboard/api/routes.py b/api/itk_demo_optoboard/api/routes.py index 365205c..9bdb963 100644 --- a/api/itk_demo_optoboard/api/routes.py +++ b/api/itk_demo_optoboard/api/routes.py @@ -1,3 +1,4 @@ + from connexion import request from time import sleep import logging @@ -8,7 +9,6 @@ logger.setLevel(logging.DEBUG) ch = logging.StreamHandler() logger.addHandler(ch) - def health(): return _error_catcher(lambda: {"status": 200}) @@ -37,6 +37,11 @@ def initOptoList(): ).wait ) +def clearQueuesTask_wrapper(): + payload = request.get_json() + return _error_catcher( + clearQueuesTask.apply_async(queue="itk_demo_optoboard_queue").wait + ) def getOptoGUIConf(): return _error_catcher( @@ -268,6 +273,5 @@ def readLog_wrapper(): readLogTasks.apply_async(queue="itk_demo_optoboard_queue").wait ) - # from itk_demo_optoboard.wsgi import celery from itk_demo_optoboard.worker.celeryTasks import * diff --git a/api/itk_demo_optoboard/worker/celery.py b/api/itk_demo_optoboard/worker/celery.py index ebb5c94..6cfbd9e 100644 --- a/api/itk_demo_optoboard/worker/celery.py +++ b/api/itk_demo_optoboard/worker/celery.py @@ -12,7 +12,7 @@ def make_celery(): for queue_name in queue_names: task_queues.append(Queue(queue_name, routing_key=queue_name)) celery.conf.task_queues = task_queues - celery.conf.task_default_queue = "itk_demo_optoboard_queue" + celery.conf.task_default_queue = "itk_demo_optoboard_queue_OB0" celery.conf.task_create_missing_queues = False celery.conf.worker_concurrency = len(queue_names) return celery diff --git a/api/itk_demo_optoboard/worker/celeryTasks.py b/api/itk_demo_optoboard/worker/celeryTasks.py index aee741f..7284150 100644 --- a/api/itk_demo_optoboard/worker/celeryTasks.py +++ b/api/itk_demo_optoboard/worker/celeryTasks.py @@ -66,7 +66,7 @@ def loadOptoListTask(): opto_info_dic = {} optoboard_dic, opto_info_dic, opto = InitOpto.InitOpto(optoListConfig=optoGUIConf[defaultConfig]) logger.info("Configuration loaded!") - if int(os.environ.get("AUTO_CONFIG_OPTOs")) == 1: + if os.environ.get("AUTO_CONFIG_OPTOs") == 1: logger.info("Configuring all optoboards...") try: configureAllTasks() @@ -206,6 +206,10 @@ def configureAllTasks(): opto.configure() return "Configuration completed" +@celery.task() +def clearQueuesTask(): + celery.control.purge() + return "Queue(s) cleared" ### task for BER test -- GitLab From bc97945c4a1d179ceb819c9db2ec3acb4d6dc0bb Mon Sep 17 00:00:00 2001 From: Daniele Dal Santo <dal.santo.daniele@cern.ch> Date: Fri, 22 Mar 2024 18:02:45 +0100 Subject: [PATCH 2/2] WIP: clearing active tasks --- .../api/openapi/openapi.yaml | 24 ++++++++++++++++++- api/itk_demo_optoboard/api/routes.py | 16 ++++++++----- api/itk_demo_optoboard/worker/celery.py | 2 +- api/itk_demo_optoboard/worker/celeryTasks.py | 19 +++++++++++++-- 4 files changed, 51 insertions(+), 10 deletions(-) diff --git a/api/itk_demo_optoboard/api/openapi/openapi.yaml b/api/itk_demo_optoboard/api/openapi/openapi.yaml index e687736..970081f 100644 --- a/api/itk_demo_optoboard/api/openapi/openapi.yaml +++ b/api/itk_demo_optoboard/api/openapi/openapi.yaml @@ -13,10 +13,32 @@ servers: paths: + /clearActiveTasks_wrapper: + post: + tags: + - default + summary: List all tasks and clear them (WIP - clear specific tasks) + x-openapi-router-controller: itk_demo_optoboard.api.routes + operationId: clearActiveTasks_wrapper + requestBody: + description: List active tasks and clear them + content: + application/json: + schema: + type: object + responses: + 200: + description: Returns message that tasks cleared + content: + application/json: + schema: + type: string + example: "Active tasks revoked" + /clearQueuesTask_wrapper: post: tags: - - Default + - default summary: Purge Celery queue(s) x-openapi-router-controller: itk_demo_optoboard.api.routes operationId: clearQueuesTask_wrapper diff --git a/api/itk_demo_optoboard/api/routes.py b/api/itk_demo_optoboard/api/routes.py index 9bdb963..24991bb 100644 --- a/api/itk_demo_optoboard/api/routes.py +++ b/api/itk_demo_optoboard/api/routes.py @@ -21,31 +21,35 @@ def healthCelery(): def longTaskCelery(): return _error_catcher( - longTaskCeleryTask.apply_async(queue="itk_demo_optoboard_queue").wait + longTaskCeleryTask.apply_async(queue="itk_demo_optoboard_queue_OB0").wait ) def loadOptoList(): return _error_catcher( - loadOptoListTask.apply_async(queue="itk_demo_optoboard_queue").wait + loadOptoListTask.apply_async(queue="itk_demo_optoboard_queue_OB0").wait ) def initOptoList(): payload = request.get_json() return _error_catcher( initOptoListTask.apply_async( - args=(payload,), queue="itk_demo_optoboard_queue" + args=(payload,), queue="itk_demo_optoboard_queue_OB0" ).wait ) def clearQueuesTask_wrapper(): - payload = request.get_json() return _error_catcher( clearQueuesTask.apply_async(queue="itk_demo_optoboard_queue").wait ) +def clearActiveTasks_wrapper(): + return _error_catcher( + clearActiveTasks.apply_async(queue="itk_demo_optoboard_queue").wait + ) + def getOptoGUIConf(): return _error_catcher( - getOptoGUIConfTasks.apply_async(queue="itk_demo_optoboard_queue").wait + getOptoGUIConfTasks.apply_async(queue="itk_demo_optoboard_queue_OB0").wait ) @@ -56,7 +60,7 @@ def modifyOptoGUIConf(): return _error_catcher( modifyOptoGUIConfTasks.apply_async( - (confKey, newConfiguration), queue="itk_demo_optoboard_queue" + (confKey, newConfiguration), queue="itk_demo_optoboard_queue_OB0" ).wait ) diff --git a/api/itk_demo_optoboard/worker/celery.py b/api/itk_demo_optoboard/worker/celery.py index 6cfbd9e..ebb5c94 100644 --- a/api/itk_demo_optoboard/worker/celery.py +++ b/api/itk_demo_optoboard/worker/celery.py @@ -12,7 +12,7 @@ def make_celery(): for queue_name in queue_names: task_queues.append(Queue(queue_name, routing_key=queue_name)) celery.conf.task_queues = task_queues - celery.conf.task_default_queue = "itk_demo_optoboard_queue_OB0" + celery.conf.task_default_queue = "itk_demo_optoboard_queue" celery.conf.task_create_missing_queues = False celery.conf.worker_concurrency = len(queue_names) return celery diff --git a/api/itk_demo_optoboard/worker/celeryTasks.py b/api/itk_demo_optoboard/worker/celeryTasks.py index 7284150..b1dd576 100644 --- a/api/itk_demo_optoboard/worker/celeryTasks.py +++ b/api/itk_demo_optoboard/worker/celeryTasks.py @@ -37,7 +37,7 @@ def healthCeleryTask(): def longTaskCeleryTask(): logger.info("Celery is running! The connection to the microservice backend works!") logger.info("Sleeping for 10 seconds...") - sleep(10) + sleep(20) return {"status": 200} @worker_ready.connect @@ -195,7 +195,8 @@ def statusCheckTasks(optoboardPosition): @celery.task() def configureTasks(optoboardPosition, activeLpgbt=None, activeGbcr=None): - logger.info("Starting configuration of optoboard in position " + optoboardPosition + "...") + logger.info("Starting configuration of optoboard in position " + optoboardPosition + "...") + sleep(10) optoboard_dic[optoboardPosition].configure(activeLpgbt, activeGbcr) return "Optoboard configuration completed" @@ -206,11 +207,25 @@ def configureAllTasks(): opto.configure() return "Configuration completed" +#Clearing queues, stopping active tasks + @celery.task() def clearQueuesTask(): celery.control.purge() return "Queue(s) cleared" +@celery.task() +def clearActiveTasks(): + device = celery.control.inspect().active() + taskList = device['celery@atlas01.aec.unibe.ch'] + logger.info(taskList) + if len(taskList) == 1: + return "No active tasks to clear" + for i in range(1,len(taskList)): + print(taskList[1]['id']) + celery.control.revoke(taskList[1]['id'], terminate=True, signal='SIGKILL') + return "Tasks cleared" + ### task for BER test -- GitLab