diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
index ab875d702baa42c4863a5fee5989d01b4d728dbd..393f99def5847b50d6e4a53302507e2b7767976a 100644
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -3,7 +3,7 @@
     <option name="myName" value="Project Default" />
     <inspection_tool class="DuplicatedCode" enabled="true" level="WEAK WARNING" enabled_by_default="true">
       <Languages>
-        <language minSize="146" name="Python" />
+        <language minSize="213" name="Python" />
       </Languages>
     </inspection_tool>
     <inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
diff --git a/README.md b/README.md
index c36b31beaa26974b1056a48190dab0f3e87c6b95..860348fa044b01175a06af8c8719ed20bd93ecb1 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,24 @@
-# BiBl NXCALS Property Pull
+# BLM NXCALS Property Pull
 
 Property pull is a Python project for the Cern's BLM section.
 It get precise data property for diamond devices by NXCALS 
 and store it in lighter format on a MYSQL DB. 
 In order to be used in other application. 
 
+A routine script has been written to get the beamMode data
+from pytimber and pull the data accordingly.
+
+The final goal is for it to run daily on a server via a cron job.
+
+An API may be developed in the future for precise data pulls without DB writing,
+meaning an instant return (JSON format) of the requested data.
+
 ## Installation
 
 For now, it only work on linux OS and Windows Professional, and not tested on MacOS.
+
+At the time of writing, `python3.6` is required to run the CERN's
+Spark bundle properly.
 
 In order to run it, you have to Download the CERN's Spark bundle and 
 set up a working CERN's NXCALS environment. 
@@ -15,12 +26,13 @@ See on <a href="nxcals-docs.web.cern.ch"> NXCALS Documentation</a>.
 
 You need then to export 2 environment variables for Spark & Python. 
 
-For linux (Ubuntu)
+For Linux (tested on Ubuntu)
 ``` bash
 export SPARK_HOME=<location_of_the_bundle>/spark-<version>-bin-hadoop<version>
 export PYTHONPATH=$SPARK_HOME/nxcals-python3-env/lib/python3.6/site-packages
 ```
-For windows into shell with admin rights
+For Windows
+<br> In a shell with admin rights
 ```shell script
 setx SPARK_HOME <location_of_the_bundle>/spark-<version>-bin-hadoop<version>
 setx PYTHON_PATH %SPARK_HOME%/nxcals-python3-env/lib/python3.6/site-packages
@@ -38,7 +50,7 @@ the nxcals-hdoop-pro-config<version>.jar that you can find at
 Know that all the links and directory path are up to date at the time of writing. 
 It may have been some updates since. 
 
-After that you need to create a 'pyconf.py' file in order to create
+After that you need to create a `pyconf.py` file to hold the
 mysql configuration. The template is : 
 ```python
 mysql = {
diff --git a/property_pull.py b/property_pull.py
index 9c995331eb958fc2ae66500722c58d5931ec987f..d82725ce7d9e7156baebb46c22376e689c5e0d52 100644
--- a/property_pull.py
+++ b/property_pull.py
@@ -17,6 +17,7 @@ import pandas as pd
 
 _dest_dir = './pulledData'
 _files = False
+_db_write = True
 
 # mysqlDB variables
 mydb = mysql.connector.connect(
@@ -44,7 +45,6 @@ sums_cols = udf(lambda arr: 0 if arr == [] else sum(arr), IntegerType())
 
 '''
 #TODO - 
-# Check compression ? Done => MB optimisation using numpy
 # Reduce ( UDAF ? )
 # Scala UDF ? 
(working with python's lambda for now ) ''' @@ -61,20 +61,6 @@ def write_file(_device, _start_time, _end_time, _property, data): + _end_time.replace(' ', '_') + '_' + _property + '.dat' + " Pulled") -def uncompress(_data_array, _index_array, _size, _nrows): - print('uncompress start') - result = np.zeros(_size, dtype=np.int) - for i in range(_nrows): - tmpresult = np.zeros(_size, dtype=np.int) - ind = 0 - for index in _index_array[i]['elements']: - tmpresult[index] = _data_array[i]['elements'][ind] - ind += 1 - result = np.add(result, tmpresult) - print("uncompress finished") - return result.tolist() - - def uncompress_pandas(_data_array, _index_array, _size, _nrows): print('uncompress start') result = np.zeros(_size, dtype=np.int64) @@ -158,8 +144,9 @@ def pull_histogram_pandas(_fill_name, _mode, _device, _start_time, _end_time): 57802, data['rows'] - 1) # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET histogram = %s where id = %s', (json.dumps(data), data_id,)) + if _db_write: + db_cursor = mydb.cursor() + db_cursor.execute('UPDATE data SET histogram = %s where id = %s', (json.dumps(data), data_id,)) write_file(_device, _start_time, _end_time, _property, data) @@ -240,8 +227,9 @@ def pull_integral_pandas(_fill_name, _mode, _device, _start_time, _end_time): data['integral_lg'] = integral_lg.tolist() # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET integral = %s where id = %s', (json.dumps(data), data_id,)) + if _db_write: + db_cursor = mydb.cursor() + db_cursor.execute('UPDATE data SET integral = %s where id = %s', (json.dumps(data), data_id,)) write_file(_device, _start_time, _end_time, _property, data) @@ -311,8 +299,9 @@ def pull_raw_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time): data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET raw_dist = %s where id = %s', (json.dumps(data), data_id,)) + if _db_write: + db_cursor = mydb.cursor() + db_cursor.execute('UPDATE data SET raw_dist = %s where id = %s', (json.dumps(data), data_id,)) write_file(_device, _start_time, _end_time, _property, data) @@ -386,8 +375,9 @@ def pull_integral_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET integral_dist = %s where id = %s', (json.dumps(data), data_id,)) + if _db_write: + db_cursor = mydb.cursor() + db_cursor.execute('UPDATE data SET integral_dist = %s where id = %s', (json.dumps(data), data_id,)) write_file(_device, _start_time, _end_time, _property, data) @@ -467,430 +457,27 @@ def pull_turnloss_pandas(_fill_name, _mode, _device, _start_time, _end_time): data['nturns'], data['rows']) # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET turnloss = %s where id = %s', (json.dumps(data), data_id,)) - - write_file(_device, _start_time, _end_time, _property, data) - - -def pull_histogram(_start_time, _end_time, _device): - _property = 'LoggingHistogram' - - data = {} - - df = KeyValuesQuery.builder(spark).system("CMW") \ - .startTime(_start_time) \ - .endTime(_end_time) \ - .entity() \ - .keyValue("device", _device) \ - .keyValue("property", _property) \ - .buildDataset() - - df.printSchema() - - # df_columns = df.columns - - # Process hg array data - array_dataset_hg = df.select("histogram_hg_com.elements") - 
array_dataset_hg.printSchema() - - array_index_hg = df.select("histogram_hg_ind.elements") - - intime_array_hg = array_dataset_hg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_hg.printSchema() - - # Process lg array data - array_dataset_lg = df.select("histogram_lg_com.elements") - array_dataset_lg.printSchema() - - array_index_lg = df.select("histogram_lg_ind.elements") - - intime_array_lg = array_dataset_lg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_lg.printSchema() - - # Datas # - data['rows'] = array_dataset_hg.count() - data['startTime'] = _start_time - data['endTime'] = _end_time - - data['threshold_hg'] = df.select('threshold_hg').collect() - if len(data['threshold_hg']) > 0: - data['threshold_hg'] = data['threshold_hg'][0] - - data['threshold_lg'] = df.select('threshold_lg').collect() - if len(data['threshold_lg']) > 0: - data['threshold_lg'] = data['threshold_lg'][0] - - # Timestamp # - time_stamp = df.select("__record_timestamp__") - result_time_stamp = time_stamp.collect() - data['timestamps'] = [] - for TS in result_time_stamp: - data['timestamps'].append(TS["__record_timestamp__"]) - - # Intime Data # - result_intime_hg = intime_array_hg.collect() - data['histogram_hg_intime'] = [] - for IT in result_intime_hg: - data['histogram_hg_intime'].append(IT["Total"]) - - result_intime_lg = intime_array_lg.collect() - data['histogram_lg_intime'] = [] - for IT in result_intime_lg: - data['histogram_lg_intime'].append(IT['Total']) - - # Histogram data # - result_array_dataset_hg = array_dataset_hg.collect() - result_array_index_hg = array_index_hg.collect() - data['histogram_hg'] = uncompress(result_array_dataset_hg, result_array_index_hg, 57802, data['rows']) - - result_array_dataset_lg = array_dataset_lg.collect() - result_array_index_lg = array_index_lg.collect() - data['histogram_lg'] = uncompress(result_array_dataset_lg, result_array_index_lg, 57802, data['rows']) - - # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET histogram = %s where id = %s', (json.dumps(data), data_id,)) - - write_file(_device, _start_time, _end_time, _property, data) - - -def pull_integral(_start_time, _end_time, _device): - _property = "LoggingIntegral" - - data = {} - - df = KeyValuesQuery.builder(spark).system("CMW") \ - .startTime(_start_time) \ - .endTime(_end_time) \ - .entity() \ - .keyValue("device", _device) \ - .keyValue("property", _property) \ - .buildDataset() - - df.printSchema() - data['rows'] = df.count() - - # Process hg Data # - array_dataset_hg = df.select("integral_hg.elements").limit(data['rows'] - 1) - array_dataset_hg.printSchema() - - intime_array_hg = array_dataset_hg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_hg.printSchema() - - # Process lg Data # - array_dataset_lg = df.select("integral_lg.elements").limit(data['rows'] - 1) - array_dataset_lg.printSchema() - - intime_array_lg = array_dataset_lg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_lg.printSchema() - - # DATA # - data['startTime'] = _start_time - data['endTime'] = _end_time - - data['zsthr_hg'] = df.select('zsthr_hg').collect() - if len(data['zsthr_hg']) > 0: - data['zsthr_hg'] = data['zsthr_hg'][0] - - data['zsthrave_hg'] = df.select('zsthrave_hg').collect() - if len(data['zsthrave_hg']) > 0: - data['zsthrave_hg'] = data['zsthrave_hg'][0] - - data['zsthr_lg'] = df.select('zsthr_lg').collect() - if len(data['zsthr_lg']) > 0: - data['zsthr_lg'] = 
data['zsthr_lg'][0] - - data['zsthrave_lg'] = df.select('zsthrave_lg').collect() - if len(data['zsthrave_lg']) > 0: - data['zsthrave_lg'] = data['zsthrave_lg'][0] - - data['baselinesub'] = df.select('baselinesub').collect() - if len(data['baselinesub']) > 0: - data['baselinesub'] = data['baselinesub'][0] - - # Intime Data # - result_intime_hg = intime_array_hg.collect() - data['integral_hg_intime'] = [] - for IT in result_intime_hg: - data['integral_hg_intime'].append(IT["Total"]) - - result_intime_lg = intime_array_lg.collect() - data['integral_lg_intime'] = [] - for IT in result_intime_lg: - data['integral_lg_intime'].append(IT['Total']) - - # Timestamp # - time_stamp = df.select("__record_timestamp__") - result_time_stamp = time_stamp.collect() - data['timestamps'] = [] - for TS in result_time_stamp: - data['timestamps'].append(TS["__record_timestamp__"]) - - # Integral Data - integral_hg = np.zeros(3564, dtype=np.int) - result_array_integral_hg = array_dataset_hg.collect() - for T in result_array_integral_hg: - if len(T) == 3564: - integral_hg = np.add(integral_hg, T['elements']) - data['integral_hg'] = integral_hg.tolist() - - integral_lg = np.zeros(3564, dtype=np.int) - result_array_integral_lg = array_dataset_lg.collect() - for T in result_array_integral_lg: - if len(T) == 3564: - integral_lg = np.add(integral_lg, T['elements']) - data['integral_lg'] = integral_lg.tolist() - - # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET integral = %s where id = %s', (json.dumps(data), data_id,)) - - write_file(_device, _start_time, _end_time, _property, data) - - -def pull_raw_dist(_start_time, _end_time, _device): - _property = "LoggingRawDist" - - data = {} - - df = KeyValuesQuery.builder(spark).system("CMW") \ - .startTime(_start_time) \ - .endTime(_end_time) \ - .entity() \ - .keyValue("device", _device) \ - .keyValue("property", _property) \ - .buildDataset() - - df.printSchema() - data['rows'] = df.count() - - # Process hg Data # - array_dataset_hg = df.select("distribution_hg.elements").limit(data['rows'] - 1) - array_dataset_hg.printSchema() - - # Process lg Data # - array_dataset_lg = df.select("distribution_lg.elements").limit(data['rows'] - 1) - array_dataset_lg.printSchema() - - # DATA # - data['rows'] = df.count() - data['startTime'] = _start_time - data['endTime'] = _end_time - - data['lsbcut_hg'] = df.select('lsbcut_hg').collect() - if len(data['lsbcut_hg']) > 0: - data['lsbcut_hg'] = data['lsbcut_hg'][0] - - data['offset_hg'] = df.select('offset_hg').collect() - if len(data['offset_hg']) > 0: - data['offset_hg'] = data['offset_hg'][0] - - data['lsbcut_lg'] = df.select('lsbcut_lg').collect() - if len(data['lsbcut_lg']) > 0: - data['lsbcut_lg'] = data['lsbcut_lg'][0] - - data['offset_lg'] = df.select('offset_lg').collect() - if len(data['offset_lg']) > 0: - data['offset_lg'] = data['offset_lg'][0] - - # Array Data # - result_hg = array_dataset_hg.collect() - result_distrib_hg = np.zeros(2048, dtype=np.int) - for R in result_hg: - result_distrib_hg = np.add(result_distrib_hg, R['elements']) - data['distribution_hg'] = result_distrib_hg.tolist() - - result_lg = array_dataset_hg.collect() - result_distrib_lg = np.zeros(2048, dtype=np.int) - for R in result_lg: - result_distrib_lg = np.add(result_distrib_lg, R['elements']) - data['distribution_lg'] = result_distrib_lg.tolist() - - # Timestamp # - time_stamp = df.select("__record_timestamp__") - result_time_stamp = time_stamp.collect() - data['timestamps'] = [] - for TS in result_time_stamp: - 
data['timestamps'].append(TS["__record_timestamp__"]) - - # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET raw_dist = %s where id = %s', (json.dumps(data), data_id,)) - - write_file(_device, _start_time, _end_time, _property, data) - - -def pull_integral_dist(_start_time, _end_time, _device): - _property = "LoggingIntegralDist" - - data = {} - - df = KeyValuesQuery.builder(spark).system("CMW") \ - .startTime(_start_time) \ - .endTime(_end_time) \ - .entity() \ - .keyValue("device", _device) \ - .keyValue("property", _property) \ - .buildDataset() - - df.printSchema() - data['rows'] = df.count() - - # Process hg Data # - array_dataset_hg = df.select("distribution_hg.elements").limit(data['rows'] - 1) - array_dataset_hg.printSchema() - - # Process lg Data # - array_dataset_lg = df.select("distribution_lg.elements").limit(data['rows'] - 1) - array_dataset_lg.printSchema() - - # DATA # - data['startTime'] = _start_time - data['endTime'] = _end_time - - data['lsbcut_hg'] = df.select('lsbcut_hg').collect() - if len(data['lsbcut_hg']) > 0: - data['lsbcut_hg'] = data['lsbcut_hg'][0] - - data['offset_hg'] = df.select('offset_hg').collect() - if len(data['offset_hg']) > 0: - data['offset_hg'] = data['offset_hg'][0] - - data['lsbcut_lg'] = df.select('lsbcut_lg').collect() - if len(data['lsbcut_lg']) > 0: - data['lsbcut_lg'] = data['lsbcut_lg'][0] - - data['offset_lg'] = df.select('offset_lg').collect() - if len(data['offset_lg']) > 0: - data['offset_lg'] = data['offset_lg'][0] - - data['nturns'] = df.select('nturns').collect() - if len(data['nturns']) > 0: - data['nturns'] = data['nturns'][0] - - # Array Data # - result_hg = array_dataset_hg.collect() - result_distrib_hg = np.zeros(2048, dtype=np.int) - for R in result_hg: - result_distrib_hg = np.add(result_distrib_hg, R['elements']) - data['distribution_hg'] = result_distrib_hg.tolist() - - result_lg = array_dataset_lg.collect() - result_distrib_lg = np.zeros(2048, dtype=np.int) - for R in result_lg: - if len(R) == 2048: - result_distrib_lg = np.add(result_distrib_lg, R['elements']) - data['distribution_lg'] = result_distrib_lg.tolist() - - # Timestamp # - time_stamp = df.select("__record_timestamp__") - result_time_stamp = time_stamp.collect() - data['timestamps'] = [] - for TS in result_time_stamp: - data['timestamps'].append(TS["__record_timestamp__"]) - - # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET integral_dist = %s where id = %s', (json.dumps(data), data_id,)) + if _db_write: + db_cursor = mydb.cursor() + db_cursor.execute('UPDATE data SET turnloss = %s where id = %s', (json.dumps(data), data_id,)) write_file(_device, _start_time, _end_time, _property, data) -def pull_turnloss(_start_time, _end_time, _device): - _property = "LoggingTurnLoss" - - data = {} - - df = KeyValuesQuery.builder(spark).system("CMW") \ - .startTime(_start_time) \ - .endTime(_end_time) \ - .entity() \ - .keyValue("device", _device) \ - .keyValue("property", _property) \ - .buildDataset() - - df.printSchema() - data['rows'] = df.count() - print(data['rows']) - - # Process hg Data # - array_dataset_hg = df.select("turnloss_hg_com.elements") - array_dataset_hg.printSchema() - - array_dataset_hg_com1 = df.select("turnloss_hg_com.elements").limit(int(data['rows'] / 4)) - # array_dataset_hg_com2 = df.select("turnloss_hg_com.elements").limit(int(data['rows'] / 2)) - array_index_hg = df.select("turnloss_hg_ind.elements") - - intime_array_hg = array_dataset_hg.withColumn('Total', 
sums_cols(col('elements'))).select('Total') - intime_array_hg.printSchema() - - # Process lg Data # - array_dataset_lg = df.select("turnloss_lg_com.elements") - array_dataset_lg.printSchema() - - array_index_lg = df.select("turnloss_lg_ind.elements") - - intime_array_lg = array_dataset_lg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_lg.printSchema() - - # DATA # - data['startTime'] = _start_time - data['endTime'] = _end_time - data['nturns'] = df.select('nturns').collect() - if len(data['nturns']) > 0: - data['nturns'] = data['nturns'][0] - - data['zsthr_hg'] = df.select('zsthr_hg').collect() - if len(data['zsthr_hg']) > 0: - data['zsthr_hg'] = data['zsthr_hg'][0] +def property_pull(_fill_name, _mode, _device, _start, _end, _files_input=False, _db_write_input=True): + global _files + _files = _files_input - data['zsthr_lg'] = df.select('zsthr_lg').collect() - if len(data['zsthr_lg']) > 0: - data['zsthr_lg'] = data['zsthr_lg'][0] + global _db_write + _db_write = _db_write_input - data['baselinesub'] = df.select('baselinesub').collect() - if len(data['baselinesub']) > 0: - data['baselinesub'] = data['baselinesub'][0] - - # Timestamp # - time_stamp = df.select("__record_timestamp__") - result_time_stamp = time_stamp.collect() - data['timestamps'] = [] - for TS in result_time_stamp: - data['timestamps'].append(TS["__record_timestamp__"]) - - # Intime Data # - result_hg = intime_array_hg.collect() - data['turnloss_hg_intime'] = [] - for IT in result_hg: - data['turnloss_hg_intime'].append(IT['Total']) - - result_lg = intime_array_lg.collect() - data['turnloss_lg_intime'] = [] - for IT in result_lg: - data['turnloss_lg_intime'].append(IT['Total']) - - # Turnloss Data - result_array_dataset_hg = array_dataset_hg_com1.collect() - result_array_index_hg = array_index_hg.collect() - data['turnloss_hg'] = uncompress(result_array_dataset_hg, result_array_index_hg, data['nturns'], data['rows']) - - ''' - result_array_dataset_lg = array_dataset_lg.collect() - result_array_index_lg = array_index_lg.collect() - data['turnloss_lg'] = uncompress(result_array_dataset_lg, result_array_index_lg, 57802, data['rows']) - ''' - # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET turnloss = %s where id = %s', (json.dumps(data), data_id,)) - - write_file(_device, _start_time, _end_time, _property, data) - - -def property_pull(_fill_name, _mode, _device, _start, _end): gcursor = mydb.cursor() + gcursor.execute('select count(*) from mode where name=%s', (_mode, )) + mode_exists = gcursor.fetchone()[0] + if mode_exists == 0: + print('Mode ' + _mode + ' ') + gcursor.execute('SELECT id FROM device where name=%s limit 1', (_device,)) device_id = gcursor.fetchone()[0] @@ -903,7 +490,8 @@ def property_pull(_fill_name, _mode, _device, _start, _end): print('Data for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end + ' already pulled') return -1 - print('Pulling data for fill ' + str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end) + print('Pulling data for fill ' + + str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end) global data_id gcursor.execute("insert into nxcals_blm_data.data(histogram, integral, integral_dist, raw_dist, turnloss) " @@ -922,7 +510,8 @@ def property_pull(_fill_name, _mode, _device, _start, _end): pull_turnloss_pandas(_fill_name, _mode, _device, _start, _end) mydb.commit() - print('Pulled done, data pushed on DB') + 
print('Pull done, data pushed on DB for fill '
+          + str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end)
 
 
 if __name__ == '__main__':
diff --git a/routine_pull.py b/routine_pull.py
new file mode 100644
index 0000000000000000000000000000000000000000..528c10493002a393012d4a5dcdf35901fcf606e2
--- /dev/null
+++ b/routine_pull.py
@@ -0,0 +1,108 @@
+import mysql.connector
+import time
+
+import property_pull
+import pytimber as pytimber
+import datetime as dt
+
+# from multiprocessing import Process
+
+# mysqlDB variables
+import pyconf as cfg
+
+mydb = mysql.connector.connect(
+    host=cfg.mysql['host'],
+    port=cfg.mysql['port'],
+    user=cfg.mysql['user'],
+    passwd=cfg.mysql['passwd'],
+    database=cfg.mysql['database']
+)
+
+
+def start_pull(_fill_infos, _devices=None):
+    if _devices is None:
+        _devices = ['BLMED.06L7.B1B30_TCPA.A6L7.B1']
+
+    print(_fill_infos)
+    if _fill_infos["endTime"] is None:
+        return -1
+
+    for iDevice in _devices:
+
+        property_pull.property_pull(_fill_infos['fillNumber'], 'ALL', iDevice,
+                                     _fill_infos['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
+                                     _fill_infos['endTime'].strftime("%Y-%m-%d %H:%M:%S.000"))
+
+        for iMode in _fill_infos["beamModes"]:
+            print(iMode['mode'])
+            property_pull.property_pull(_fill_infos['fillNumber'], iMode['mode'], iDevice,
+                                         iMode['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
+                                         iMode['endTime'].strftime("%Y-%m-%d %H:%M:%S.000"))
+
+        '''p = Process(target=property_pull.property_pull,
+                    args=(fill_infos['fillNumber'], iMode['mode'], 'BLMED.06L7.B1B30_TCPA.A6L7.B1',
+                          iMode['startTime'].strftime("%y-%m-%d %H:%M:%S.000"),
+                          iMode['endTime'].strftime("%y-%m-%d %H:%M:%S.000")))
+        p.start()
+        p.join()
+        '''
+
+
+def routine_pull(_start_fill=None, _end_fill=None):
+    ldb = pytimber.LoggingDB()
+    gcursor = mydb.cursor()
+    _start_fill = None
+    _end_fill = None
+
+    if _start_fill is None and _end_fill is None:
+        res = ldb.get('HX:FILLN', dt.datetime.now())
+        new_fill_number = int(res['HX:FILLN'][1][0])
+        print(new_fill_number)
+
+        gcursor.execute('select max(name) from fill limit 1;')
+        last_fill_pulled = gcursor.fetchone()[0]
+        print(last_fill_pulled)
+
+        if new_fill_number == last_fill_pulled:
+            print('already pulled last fill')
+            return -1
+    else:
+        new_fill_number = _end_fill
+        last_fill_pulled = _start_fill - 1
+
+    # Select only the first 6 devices because not all devices work properly with NXCALS yet
+    gcursor.execute('select name from device limit 6')
+    devices_request = gcursor.fetchall()
+    devices = []
+    for device in devices_request:
+        devices.append(device[0])
+
+    gcursor.close()
+    mydb.close()
+
+    fill_number = 7472
+    fill_infos = ldb.getLHCFillData(fill_number, False)
+    start_pull(fill_infos, devices)
+
+    '''
+    for iFill in range(last_fill_pulled + 1, new_fill_number):
+        fill_infos = ldb.getLHCFillData(iFill, False)
+
+        # check if the fill is finished
+        if fill_infos['endTime'] is None:
+            break
+
+        # check if data is too new
+        midnight_stamp = dt.datetime.now().replace(second=0, minute=0, hour=0) - dt.timedelta(days=1)
+        if time.mktime(midnight_stamp.timetuple()) < fill_infos['endTime']:
+            continue
+
+        start_pull(fill_infos, devices)
+    '''
+
+
+if __name__ == '__main__':
+    routine_pull()
+
+
+
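Below is a minimal usage sketch (not part of the diff above) showing how the entry points introduced here could be driven, for example by the daily cron job mentioned in the README. It assumes a filled-in `pyconf.py`, the MySQL schema that `property_pull.py` queries (`fill`, `device`, `mode`, `data` tables) and a working NXCALS/Spark environment; the fill number, device name and time window are illustrative only.

```python
# Sketch only: drive the pull either per fill/mode or through the routine.
import property_pull
import routine_pull

# Explicit pull of one fill and mode for one device.
# The new keyword flags request local .dat files and skip the MySQL UPDATEs.
property_pull.property_pull(
    7472,                               # fill number (illustrative)
    'ALL',                              # beam mode label
    'BLMED.06L7.B1B30_TCPA.A6L7.B1',    # default device from routine_pull.py
    '2018-10-22 21:18:00.000',          # start time (illustrative)
    '2018-10-23 11:18:00.000',          # end time (illustrative)
    _files_input=True,
    _db_write_input=False)

# Or let the routine look up the latest finished fill through pytimber;
# this is the call a daily cron entry would run.
routine_pull.routine_pull()
```

A scheduler entry along the lines of `0 6 * * * python3.6 /path/to/routine_pull.py` (standard crontab syntax, path and time being placeholders) would cover the "run daily on a server" part; that piece is not included in this diff.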