diff --git a/api/itk_demo_optoboard/api/openapi/openapi.yaml b/api/itk_demo_optoboard/api/openapi/openapi.yaml index 1b27710be0d10384f562916d8e7688de912d2a92..970081f69cdf92635b622eacead5556fe05a00a1 100644 --- a/api/itk_demo_optoboard/api/openapi/openapi.yaml +++ b/api/itk_demo_optoboard/api/openapi/openapi.yaml @@ -12,6 +12,51 @@ servers: description: This is a dummy value describing the base URL for the API. paths: + + /clearActiveTasks_wrapper: + post: + tags: + - default + summary: List all tasks and clear them (WIP - clear specific tasks) + x-openapi-router-controller: itk_demo_optoboard.api.routes + operationId: clearActiveTasks_wrapper + requestBody: + description: List active tasks and clear them + content: + application/json: + schema: + type: object + responses: + 200: + description: Returns message that tasks cleared + content: + application/json: + schema: + type: string + example: "Active tasks revoked" + + /clearQueuesTask_wrapper: + post: + tags: + - default + summary: Purge Celery queue(s) + x-openapi-router-controller: itk_demo_optoboard.api.routes + operationId: clearQueuesTask_wrapper + requestBody: + description: Purge Celery queue(s) + content: + application/json: + schema: + type: object + responses: + 200: + description: Returns a message to confirm all queues cleared + content: + application/json: + schema: + type: string + example: "Queue(s) cleared" + /health: get: tags: @@ -122,7 +167,7 @@ paths: /initOptoList: post: - summary: Endpoint which defines the list of optoboards + summary: Endpoint which defines the list of optoboards and purges all queues x-openapi-router-controller: itk_demo_optoboard.api.routes operationId: initOptoList requestBody: diff --git a/api/itk_demo_optoboard/api/routes.py b/api/itk_demo_optoboard/api/routes.py index 365205c23190992bc12df2e475b31318dbbb5752..24991bb155f073105610edf90de50b0a34643f83 100644 --- a/api/itk_demo_optoboard/api/routes.py +++ b/api/itk_demo_optoboard/api/routes.py @@ -1,3 +1,4 @@ + from connexion import request from time import sleep import logging @@ -8,7 +9,6 @@ logger.setLevel(logging.DEBUG) ch = logging.StreamHandler() logger.addHandler(ch) - def health(): return _error_catcher(lambda: {"status": 200}) @@ -21,26 +21,35 @@ def healthCelery(): def longTaskCelery(): return _error_catcher( - longTaskCeleryTask.apply_async(queue="itk_demo_optoboard_queue").wait + longTaskCeleryTask.apply_async(queue="itk_demo_optoboard_queue_OB0").wait ) def loadOptoList(): return _error_catcher( - loadOptoListTask.apply_async(queue="itk_demo_optoboard_queue").wait + loadOptoListTask.apply_async(queue="itk_demo_optoboard_queue_OB0").wait ) def initOptoList(): payload = request.get_json() return _error_catcher( initOptoListTask.apply_async( - args=(payload,), queue="itk_demo_optoboard_queue" + args=(payload,), queue="itk_demo_optoboard_queue_OB0" ).wait ) +def clearQueuesTask_wrapper(): + return _error_catcher( + clearQueuesTask.apply_async(queue="itk_demo_optoboard_queue").wait + ) + +def clearActiveTasks_wrapper(): + return _error_catcher( + clearActiveTasks.apply_async(queue="itk_demo_optoboard_queue").wait + ) def getOptoGUIConf(): return _error_catcher( - getOptoGUIConfTasks.apply_async(queue="itk_demo_optoboard_queue").wait + getOptoGUIConfTasks.apply_async(queue="itk_demo_optoboard_queue_OB0").wait ) @@ -51,7 +60,7 @@ def modifyOptoGUIConf(): return _error_catcher( modifyOptoGUIConfTasks.apply_async( - (confKey, newConfiguration), queue="itk_demo_optoboard_queue" + (confKey, newConfiguration), queue="itk_demo_optoboard_queue_OB0" ).wait ) @@ -268,6 +277,5 @@ def readLog_wrapper(): readLogTasks.apply_async(queue="itk_demo_optoboard_queue").wait ) - # from itk_demo_optoboard.wsgi import celery from itk_demo_optoboard.worker.celeryTasks import * diff --git a/api/itk_demo_optoboard/worker/celeryTasks.py b/api/itk_demo_optoboard/worker/celeryTasks.py index aee741f69f7bc643df666f9c86a4439a900fe391..b1dd576a880a180b62ca4c97cb9067edd742c4aa 100644 --- a/api/itk_demo_optoboard/worker/celeryTasks.py +++ b/api/itk_demo_optoboard/worker/celeryTasks.py @@ -37,7 +37,7 @@ def healthCeleryTask(): def longTaskCeleryTask(): logger.info("Celery is running! The connection to the microservice backend works!") logger.info("Sleeping for 10 seconds...") - sleep(10) + sleep(20) return {"status": 200} @worker_ready.connect @@ -66,7 +66,7 @@ def loadOptoListTask(): opto_info_dic = {} optoboard_dic, opto_info_dic, opto = InitOpto.InitOpto(optoListConfig=optoGUIConf[defaultConfig]) logger.info("Configuration loaded!") - if int(os.environ.get("AUTO_CONFIG_OPTOs")) == 1: + if os.environ.get("AUTO_CONFIG_OPTOs") == 1: logger.info("Configuring all optoboards...") try: configureAllTasks() @@ -195,7 +195,8 @@ def statusCheckTasks(optoboardPosition): @celery.task() def configureTasks(optoboardPosition, activeLpgbt=None, activeGbcr=None): - logger.info("Starting configuration of optoboard in position " + optoboardPosition + "...") + logger.info("Starting configuration of optoboard in position " + optoboardPosition + "...") + sleep(10) optoboard_dic[optoboardPosition].configure(activeLpgbt, activeGbcr) return "Optoboard configuration completed" @@ -206,6 +207,24 @@ def configureAllTasks(): opto.configure() return "Configuration completed" +#Clearing queues, stopping active tasks + +@celery.task() +def clearQueuesTask(): + celery.control.purge() + return "Queue(s) cleared" + +@celery.task() +def clearActiveTasks(): + device = celery.control.inspect().active() + taskList = device['celery@atlas01.aec.unibe.ch'] + logger.info(taskList) + if len(taskList) == 1: + return "No active tasks to clear" + for i in range(1,len(taskList)): + print(taskList[1]['id']) + celery.control.revoke(taskList[1]['id'], terminate=True, signal='SIGKILL') + return "Tasks cleared" ### task for BER test