Skip to content
Snippets Groups Projects

Error handler for Celery tasks

Merged Marianna Glazewska requested to merge mglazews-task-error-catcher into master
3 files
+ 51
47
Compare changes
  • Side-by-side
  • Inline
Files
3
@@ -7,10 +7,11 @@ from collections import OrderedDict
from requests import get
from requests.exceptions import ConnectionError
from itk_demo_optoboard.exceptions import task_error_catcher
from itk_demo_optoboard.worker.celery import celery
from celery.signals import worker_ready, worker_init
# sys.path.append(os.getcwd() + "itk_demo_optoboard/optoboard_felix")
from optoboard_felix.additional_driver.DB_access import (
checkAvailableDatasetInDB,
@@ -26,7 +27,7 @@ from optoboard_felix.driver.Lpgbt import Lpgbt
from optoboard_felix.driver.Gbcr import Gbcr
from optoboard_felix.InitOpto import InitOpto
### check celery status
### check celery status
@celery.task()
def healthCeleryTask():
@@ -47,6 +48,7 @@ def run_startup_task(**kwargs):
loadOptoListTask.apply_async(queue="itk_demo_optoboard_queue")
logger.info("run_startup_task executed!")
## initialize optoboard object after config file modification/startup
@celery.task()
@@ -167,9 +169,8 @@ def readTasks(optoboardPosition, device, register):
except:
subregisterName = None
replyTasks = eval('optoboard_dic["' + optoboardPosition + '"].' + device).read(
registerName, subregisterName
)
replyTasks = task_error_catcher(eval('optoboard_dic["' + optoboardPosition + '"].' + device).read,
registerName, subregisterName)
logger.info("Value for " + register + " : " + str(replyTasks))
return replyTasks
@@ -180,8 +181,8 @@ def readTasks(optoboardPosition, device, register):
@celery.task()
def optoStatusTask(optoboardPosition):
full_dic = optoboard_dic[optoboardPosition].opto_status()
status = {"lpgbt1": 0, "lpgbt2": 0, "lpgbt3": 0, "lpgbt4": 0, "full_log": full_dic}
full_dic = task_error_catcher(optoboard_dic[optoboardPosition].opto_status())
status = {"lpgbt1": 0, "lpgbt1": 0, "lpgbt1": 0, "lpgbt1": 0}
for key, device_dic in full_dic.items():
if key[:-1] == "lpgbt":
if device_dic["PUSM_ready"] is True:
@@ -192,7 +193,7 @@ def optoStatusTask(optoboardPosition):
@celery.task()
def resetSlaveClockTask(optoboardPosition, device, slave_name):
eval('optoboard_dic["' + optoboardPosition + '"].' + device).reset_slave_clock(slave_name)
task_error_catcher(eval('optoboard_dic["' + optoboardPosition + '"].' + device).reset_slave_clock,slave_name)
if device == "lpgbt1" and slave_name != "gbcr":
return "Reset of " + slave_name + " clock successful"
elif device == "lpgbt1" and slave_name == "gbcr":
@@ -200,21 +201,19 @@ def resetSlaveClockTask(optoboardPosition, device, slave_name):
else:
return "Reset of " + slave_name + " clock of " + device + " successful"
### configuration tasks
@celery.task()
def configureTasks(optoboardPosition, activeLpgbt=None, activeGbcr=None):
logger.info("Starting configuration of optoboard in position " + optoboardPosition + "...")
optoboard_dic[optoboardPosition].configure(activeLpgbt, activeGbcr)
return "Optoboard configuration completed"
logger.info("Starting configuration of optoboard in position " + optoboardPosition + "...")
task_error_catcher(optoboard_dic[optoboardPosition].configure, activeLpgbt, activeGbcr)
@celery.task()
def configureAllTasks():
for pos, opto in optoboard_dic.items():
opto.configure()
task_error_catcher(opto.configure())
return "Configuration completed"
### task for BER test
@@ -222,29 +221,28 @@ def configureAllTasks():
@celery.task()
def BERT_Tasks(optoboardPosition, device, channel, meastime):
BERT_result = eval('optoboard_dic["' + optoboardPosition + '"].' + device).bert(
channel + 1, 6, meastime
)
BERT_result = task_error_catcher(eval('optoboard_dic["' + optoboardPosition + '"].' + device).bert,
channel + 1, 6, meastime)
return BERT_result
@celery.task()
def swapPolarityTask(optoboardPosition, device, TxRx, link):
if TxRx == "Tx":
eval('optoboard_dic["' + optoboardPosition + '"].' + device).tx_polarity_swap(link)
task_error_catcher(eval('optoboard_dic["' + optoboardPosition + '"].' + device).tx_polarity_swap,link)
return "Tx polarity swapped"
if TxRx == "Rx":
eval('optoboard_dic["' + optoboardPosition + '"].' + device).rx_polarity_swap(link)
task_error_catcher(eval('optoboard_dic["' + optoboardPosition + '"].' + device).rx_polarity_swap,link)
return "Rx polarity swapped"
@celery.task()
def setGCBREqualizationTask(optoboardPosition, device, channel, mfreq, hfreq):
eval('optoboard_dic["' + optoboardPosition + '"].' + device).set_equalizer(channel, mfreq, hfreq)
task_error_catcher(eval('optoboard_dic["' + optoboardPosition + '"].' + device).set_equalizer,channel, mfreq, hfreq)
return "GBCR equalization updated"
@celery.task()
def setLpgbtPhaseTask(optoboardPosition, device, phaseMode, group, phase):
eval('optoboard_dic["' + optoboardPosition + '"].' + device).set_phase_mode(phaseMode, group, phase)
task_error_catcher(eval('optoboard_dic["' + optoboardPosition + '"].' + device).set_phase_mode,phaseMode, group, phase)
return "Phase mode updated"
@celery.task()
@@ -260,5 +258,5 @@ def readLogTasks():
@celery.task()
def configureI2CControllerTask(optoboardPosition, device):
eval('optoboard_dic["' + optoboardPosition + '"].' + device).configure_I2C_controller()
task_error_catcher(eval('optoboard_dic["' + optoboardPosition + '"].' + device).configure_I2C_controller())
return "Configuration of I2C controller completed"
Loading