diff --git a/.gitignore b/.gitignore
index db08384a02fc7f82f2e64190fe0b22def6ca7206..e6d0ca6c5e3b155affde8a3b4a0703194ac00de0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,6 @@
-pyconf.py
\ No newline at end of file
+pyconf.py
+
+pulledData/
+plotOutput/
+prop_pull_api/
\ No newline at end of file
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..0ed6303a9dea2723de409e333ac17d3ca4e586ab
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,6 @@
+# Default ignored files
+/workspace.xml
+
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
\ No newline at end of file
diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
new file mode 100644
index 0000000000000000000000000000000000000000..393f99def5847b50d6e4a53302507e2b7767976a
--- /dev/null
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -0,0 +1,17 @@
+<component name="InspectionProjectProfileManager">
+  <profile version="1.0">
+    <option name="myName" value="Project Default" />
+    <inspection_tool class="DuplicatedCode" enabled="true" level="WEAK WARNING" enabled_by_default="true">
+      <Languages>
+        <language minSize="213" name="Python" />
+      </Languages>
+    </inspection_tool>
+    <inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
+      <option name="ignoredIdentifiers">
+        <list>
+          <option value="numpy.*" />
+        </list>
+      </option>
+    </inspection_tool>
+  </profile>
+</component>
\ No newline at end of file
diff --git a/README.md b/README.md
index 0e95fc8961c590b2295e9f6afe84856536519a94..7c2baed62d27429d66a99ee052b51907776910bc 100644
--- a/README.md
+++ b/README.md
@@ -1,11 +1,58 @@
-# Property Pull
+# BLM NXCALS Property Pull
 
 Property pull is a Python project for the Cern's BLM section.
 It get precise data property for diamond devices by NXCALS 
 and store it in lighter format on a MYSQL DB. 
 In order to be used in other application.
 
+A routine script retrieves the beamMode data from pytimber
+and pulls the device data accordingly.
+
+The end goal is to run it daily on a server via a cron job.
+
+An API may be developed in the future for precise data pulls without any DB writing,
+i.e. an immediate return (JSON format) of the requested data.
+
 ## Installation
 
+For now, it works only on Linux and Windows Professional;
+it has not been tested on macOS.
+
+At the time of writing, `python3.6` is required to run CERN's
+Spark bundle properly.
+
+To run it, download CERN's Spark bundle
+and set up a working NXCALS environment.
+See the <a href="nxcals-docs.web.cern.ch">NXCALS documentation</a>.
+
+You then need to export two environment variables for Spark & Python.
+
+For Linux (tested on Ubuntu):
+``` bash
+export SPARK_HOME=<location_of_the_bundle>/spark-<version>-bin-hadoop<version>
+export PYTHONPATH=$SPARK_HOME/nxcals-python3-env/lib/python3.6/site-packages
+```
+For Windows, in a shell with admin rights:
+```shell script
+setx SPARK_HOME <location_of_the_bundle>/spark-<version>-bin-hadoop<version>
+setx PYTHONPATH %SPARK_HOME%/nxcals-python3-env/lib/python3.6/site-packages
+```
+Then make sure the `pyspark` package is available to Python.
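+A quick way to check whether `pyspark` is already importable from the bundle
+(a minimal check, assuming the environment variables above are exported; run it with `python3.6`):
+``` python
+# quick sanity check: prints the bundled pyspark version,
+# an ImportError means pyspark still has to be installed
+import pyspark
+print(pyspark.__version__)
+```
+If the import fails, install it with pip: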
+```shell script
+pip install pyspark
+```
+
+You may also have to replace the nxcals-jars directory with the
+one found in the bundle at `<bundle_path>/nxcals-jars`, as well as
+the nxcals-hadoop-pro-config jar found at
+`<bundle_path>/jars/nxcals-hadoop-pro-config-<version>.jar`.
+
+Note that all links and directory paths were up to date at the time of writing;
+there may have been updates since.
+
+You can request this information by creating an issue on this GitLab repository or by sending
+a mail to a manager of this project with your name, section and the reason you want
+to use the project.
 
 ## Usage
 
diff --git a/property-pull.py b/property-pull.py
deleted file mode 100644
index c4eb892aa5f5712ccf399a958a61c61a20168797..0000000000000000000000000000000000000000
--- a/property-pull.py
+++ /dev/null
@@ -1,505 +0,0 @@
-import json
-import sys
-
-import mysql.connector
-import pyconf as cfg
-
-from cern.nxcals.pyquery.builders import *
-from pyspark import SparkConf
-from pyspark import SparkContext
-from pyspark.sql import *
-from pyspark.accumulators import AccumulatorParam
-from pyspark.sql.functions import udf, col
-from pyspark.sql.types import IntegerType
-
-import numpy as np
-
-# deprecated, used for file storing
-_dest_dir = '.'
-
-# mysqlDB variables
-mydb = mysql.connector.connect(
-    host=cfg.mysql['host'],
-    port=cfg.mysql['port'],
-    user=cfg.mysql['user'],
-    passwd=cfg.mysql['passwd'],
-    database=cfg.mysql['database']
-)
-
-# id of the sql data's row for this filling
-data_id = -1
-
-# Sparks Variable
-conf = ''
-
-sc = ''
-sqlContext = ''
-spark = ''
-
-# sumUDF
-sums_cols = udf(lambda arr: 0 if arr == [] else __builtins__.sum(arr), IntegerType())
-
-'''
-#TODO
-
-# Check compression ? Done => MB optimisation using numpy
-# Reduce ( UDAF ? )
-# Scala UDF ?
(working with python's lambda for now ) -''' - - -def uncompress(_data_array, _index_array, _size, _nrows): - print('uncompress start') - result = np.zeros(_size, dtype=np.int) - for i in range(_nrows): - tmpresult = np.zeros(_size, dtype=np.int) - ind = 0 - for index in _index_array[i]['elements']: - tmpresult[index] = _data_array[i]['elements'][ind] - ind += 1 - result = np.add(result, tmpresult) - print("uncompress finished") - return result.tolist() - - -def pull_histogram(_start_time, _end_time, _device): - _property = 'LoggingHistogram' - - data = {} - - df = KeyValuesQuery.builder(spark).system("CMW") \ - .startTime(_start_time) \ - .endTime(_end_time) \ - .entity() \ - .keyValue("device", _device) \ - .keyValue("property", _property) \ - .buildDataset() - - df.printSchema() - - df_columns = df.columns - print(df_columns) - - # Process hg array data - array_dataset_hg = df.select("histogram_hg_com.elements") - array_dataset_hg.printSchema() - - array_index_hg = df.select("histogram_hg_ind.elements") - - intime_array_hg = array_dataset_hg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_hg.printSchema() - - # Process lg array data - array_dataset_lg = df.select("histogram_lg_com.elements") - array_dataset_lg.printSchema() - - array_index_lg = df.select("histogram_lg_ind.elements") - - intime_array_lg = array_dataset_lg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_lg.printSchema() - - # Datas # - data['rows'] = array_dataset_hg.count() - data['startTime'] = _start_time - data['endTime'] = _end_time - data['treshold_hg'] = df.select('threshold_hg').collect()[0] - data['treshold_lg'] = df.select('threshold_lg').collect()[0] - - # Timestamp # - time_stamp = df.select("__record_timestamp__") - result_time_stamp = time_stamp.collect() - data['timestamps'] = [] - for TS in result_time_stamp: - data['timestamps'].append(TS["__record_timestamp__"]) - - # Intime Data # - result_intime_hg = intime_array_hg.collect() - data['histogram_hg_intime'] = [] - for IT in result_intime_hg: - data['histogram_hg_intime'].append(IT["Total"]) - - result_intime_lg = intime_array_lg.collect() - data['histogram_lg_intime'] = [] - for IT in result_intime_lg: - data['histogram_lg_intime'].append(IT['Total']) - - # Histogram data # - result_array_dataset_hg = array_dataset_hg.collect() - result_array_index_hg = array_index_hg.collect() - data['histogram_hg'] = uncompress(result_array_dataset_hg, result_array_index_hg, 57802, data['rows']) - - result_array_dataset_lg = array_dataset_lg.collect() - result_array_index_lg = array_index_lg.collect() - data['histogram_lg'] = uncompress(result_array_dataset_lg, result_array_index_lg, 57802, data['rows']) - - # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET histogram = %s where id = %s', (json.dumps(data), data_id,)) - - # Writing output - with open(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to' - + _end_time.replace(' ', '_') + '_' + _property + '.dat', 'w') as outfile: - json.dump(data, outfile) - - print(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to' - + _end_time.replace(' ', '_') + '_' + _property + '.dat' + " Pulled") - - -def pull_integral(_start_time, _end_time, _device): - _property = "LoggingIntegral" - - data = {} - - df = KeyValuesQuery.builder(spark).system("CMW") \ - .startTime(_start_time) \ - .endTime(_end_time) \ - .entity() \ - .keyValue("device", _device) \ - .keyValue("property", _property) \ - .buildDataset() 
- - df.printSchema() - - # Process hg Data # - array_dataset_hg = df.select("integral_hg.elements") - array_dataset_hg.printSchema() - - intime_array_hg = array_dataset_hg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_hg.printSchema() - - # Process lg Data # - array_dataset_lg = df.select("integral_lg.elements") - array_dataset_lg.printSchema() - - intime_array_lg = array_dataset_lg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_lg.printSchema() - - # DATA # - data['rows'] = df.count() - data['startTime'] = _start_time - data['endTime'] = _end_time - data['zsthr_hg'] = df.select('zsthr_hg').collect()[0] - data['zsthrave_hg'] = df.select('zsthrave_hg').collect()[0] - data['zsthr_lg'] = df.select('zsthr_lg').collect()[0] - data['zsthrave_lg'] = df.select('zsthrave_lg').collect()[0] - data['baselinesub'] = df.select('baselinesub').collect()[0] - - # Intime Data # - result_intime_hg = intime_array_hg.collect() - data['integral_hg_intime'] = [] - for IT in result_intime_hg: - data['integral_hg_intime'].append(IT["Total"]) - - result_intime_lg = intime_array_lg.collect() - data['integral_lg_intime'] = [] - for IT in result_intime_lg: - data['integral_lg_intime'].append(IT['Total']) - - # Timestamp # - time_stamp = df.select("__record_timestamp__") - result_time_stamp = time_stamp.collect() - data['timestamps'] = [] - for TS in result_time_stamp: - data['timestamps'].append(TS["__record_timestamp__"]) - - # Integral Data - integral_hg = np.zeros(3564, dtype=np.int) - result_array_integral_hg = array_dataset_hg.collect() - for T in result_array_integral_hg: - if len(T) == 3564: - integral_hg = np.add(integral_hg, T['elements']) - data['integral_hg'] = integral_hg.tolist() - - integral_lg = np.zeros(3564, dtype=np.int) - result_array_integral_lg = array_dataset_lg.collect() - for T in result_array_integral_lg: - if len(T) == 3564: - integral_lg = np.add(integral_lg, T['elements']) - data['integral_lg'] = integral_lg.tolist() - - # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET integral = %s where id = %s', (json.dumps(data), data_id,)) - - # Writing output # - with open(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to' - + _end_time.replace(' ', '_') + '_' + _property + '.dat', 'w') as outfile: - json.dump(data, outfile) - - print(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to' - + _end_time.replace(' ', '_') + '_' + _property + '.dat' + " Pulled") - - -# Not Working with too mny dat (dump memory) -# MB split the search (end / start ) -def pull_raw_dist(_start_time, _end_time, _device): - _property = "LoggingRawDist" - - data = {} - - df = KeyValuesQuery.builder(spark).system("CMW") \ - .startTime(_start_time) \ - .endTime(_end_time) \ - .entity() \ - .keyValue("device", _device) \ - .keyValue("property", _property) \ - .buildDataset() - - df.printSchema() - - # Process hg Data # - array_dataset_hg = df.select("distribution_hg.elements") - array_dataset_hg.printSchema() - - # Process lg Data # - array_dataset_lg = df.select("distribution_lg.elements") - array_dataset_lg.printSchema() - - # DATA # - data['rows'] = df.count() - data['startTime'] = _start_time - data['endTime'] = _end_time - data['lsbcut_hg'] = df.select('lsbcut_hg').collect()[0] - data['offset_hg'] = df.select('offset_hg').collect()[0] - data['lsbcut_lg'] = df.select('lsbcut_lg').collect()[0] - data['offset_lg'] = df.select('offset_lg').collect()[0] - - # Array Data # - result_hg = 
array_dataset_hg.collect() - result_distrib_hg = np.zeros(2048, dtype=np.int) - for R in result_hg: - result_distrib_hg = np.add(result_distrib_hg, R['elements']) - data['distribution_hg'] = result_distrib_hg.tolist() - - result_lg = array_dataset_hg.collect() - result_distrib_lg = np.zeros(2048, dtype=np.int) - for R in result_lg: - result_distrib_lg = np.add(result_distrib_lg, R['elements']) - data['distribution_lg'] = result_distrib_lg.tolist() - - # Timestamp # - time_stamp = df.select("__record_timestamp__") - result_time_stamp = time_stamp.collect() - data['timestamps'] = [] - for TS in result_time_stamp: - data['timestamps'].append(TS["__record_timestamp__"]) - - # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET raw_dist = %s where id = %s', (json.dumps(data), data_id,)) - - # Writing output # - with open(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to' - + _end_time.replace(' ', '_') + '_' + _property + '.dat', 'w') as outfile: - json.dump(data, outfile) - - print(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to' - + _end_time.replace(' ', '_') + '_' + _property + '.dat' + " Pulled") - - -def pull_integral_dist(_start_time, _end_time, _device): - _property = "LoggingIntegralDist" - - data = {} - - df = KeyValuesQuery.builder(spark).system("CMW") \ - .startTime(_start_time) \ - .endTime(_end_time) \ - .entity() \ - .keyValue("device", _device) \ - .keyValue("property", _property) \ - .buildDataset() - - df.printSchema() - - # Process hg Data # - array_dataset_hg = df.select("distribution_hg.elements").limit(1) - array_dataset_hg.printSchema() - - # Process lg Data # - array_dataset_lg = df.select("distribution_lg.elements").limit(1) - array_dataset_lg.printSchema() - - # DATA # - data['rows'] = df.count() - data['startTime'] = _start_time - data['endTime'] = _end_time - data['lsbcut_hg'] = df.select('lsbcut_hg').collect()[0] - data['offset_hg'] = df.select('offset_hg').collect()[0] - data['lsbcut_lg'] = df.select('lsbcut_lg').collect()[0] - data['offset_lg'] = df.select('offset_lg').collect()[0] - data['nturns'] = df.select('nturns').collect()[0] - - # Array Data # - result_hg = array_dataset_hg.collect() - result_distrib_hg = np.zeros(2048, dtype=np.int) - for R in result_hg: - result_distrib_hg = np.add(result_distrib_hg, R['elements']) - data['distribution_hg'] = result_distrib_hg.tolist() - - result_lg = array_dataset_lg.collect() - result_distrib_lg = np.zeros(2048, dtype=np.int) - for R in result_lg: - if len(R) == 2048: - result_distrib_lg = np.add(result_distrib_lg, R['elements']) - data['distribution_lg'] = result_distrib_lg.tolist() - - # Timestamp # - time_stamp = df.select("__record_timestamp__") - result_time_stamp = time_stamp.collect() - data['timestamps'] = [] - for TS in result_time_stamp: - data['timestamps'].append(TS["__record_timestamp__"]) - - # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET integral_dist = %s where id = %s', (json.dumps(data), data_id,)) - - # Writing output # - with open(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to' - + _end_time.replace(' ', '_') + '_' + _property + '.dat', 'w') as outfile: - json.dump(data, outfile) - - print(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to' - + _end_time.replace(' ', '_') + '_' + _property + '.dat' + " Pulled") - - -def pull_turnloss(_start_time, _end_time, _device): - _property = "LoggingTurnLoss" - - data = {} - - df = 
KeyValuesQuery.builder(spark).system("CMW") \ - .startTime(_start_time) \ - .endTime(_end_time) \ - .entity() \ - .keyValue("device", _device) \ - .keyValue("property", _property) \ - .buildDataset() - - df.printSchema() - data['rows'] = df.count() - print(data['rows']) - - # Process hg Data # - array_dataset_hg = df.select("turnloss_hg_com.elements") - array_dataset_hg.printSchema() - - array_dataset_hg_com1 = df.select("turnloss_hg_com.elements").limit(int(data['rows'] / 4)) - # array_dataset_hg_com2 = df.select("turnloss_hg_com.elements").limit(int(data['rows'] / 2)) - array_index_hg = df.select("turnloss_hg_ind.elements") - - intime_array_hg = array_dataset_hg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_hg.printSchema() - - # Process lg Data # - array_dataset_lg = df.select("turnloss_lg_com.elements") - array_dataset_lg.printSchema() - - array_index_lg = df.select("turnloss_lg_ind.elements") - - intime_array_lg = array_dataset_lg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_lg.printSchema() - - # DATA # - data['startTime'] = _start_time - data['endTime'] = _end_time - data['nturns'] = df.select('nturns').collect()[0] - data['zsthr_hg'] = df.select('zsthr_hg').collect()[0] - data['zsthr_lg'] = df.select('zsthr_lg').collect()[0] - data['baselinesub'] = df.select('baselinesub').collect()[0] - - # Timestamp # - time_stamp = df.select("__record_timestamp__") - result_time_stamp = time_stamp.collect() - data['timestamps'] = [] - for TS in result_time_stamp: - data['timestamps'].append(TS["__record_timestamp__"]) - - # Intime Data # - result_hg = intime_array_hg.collect() - data['turnloss_hg_intime'] = [] - for IT in result_hg: - data['turnloss_hg_intime'].append(IT['Total']) - - result_lg = intime_array_lg.collect() - data['turnloss_lg_intime'] = [] - for IT in result_lg: - data['turnloss_lg_intime'].append(IT['Total']) - - # Turnloss Data - result_array_dataset_hg = array_dataset_hg_com1.collect() - result_array_index_hg = array_index_hg.collect() - data['turnloss_hg'] = uncompress(result_array_dataset_hg, result_array_index_hg, data['nturns'], data['rows']) - - ''' - result_array_dataset_lg = array_dataset_lg.collect() - result_array_index_lg = array_index_lg.collect() - data['turnloss_lg'] = uncompress(result_array_dataset_lg, result_array_index_lg, 57802, data['rows']) - ''' - # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET turnloss = %s where id = %s', (json.dumps(data), data_id,)) - - # Writing output # - with open(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to' - + _end_time.replace(' ', '_') + '_' + _property + '.dat', 'w') as outfile: - json.dump(data, outfile) - - print(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to' - + _end_time.replace(' ', '_') + '_' + _property + '.dat' + " Pulled") - - -if __name__ == '__main__': - - _mode = "STABLE" - _start = "2018-10-22 21:18:00.000" - _end = "2018-10-23 11:18:00.000" - _device = "BLMED.06L7.B1B30_TCPA.A6L7.B1" - - if len(sys.argv) > 1 and sys.argv[1] == '-h': - print("-- HELP -- \n" - "-- TODO -- \n" - "- ------ -") - sys.exit(0) - elif len(sys.argv) < 4: - print("usage: python property-pull.py <device> <mode> <start> <end> \n" - "type 'python property-pull.py -h' for more infos") - sys.exit(1) - else: - _device = sys.argv[1] - _mode = sys.argv[2] - _start = sys.argv[3] - _end = sys.argv[4] - - conf = SparkConf() - conf.setMaster('yarn') - conf.setAppName('property-pull.py') - - sc = 
SparkContext(conf=conf) - sqlContext = SQLContext(sc) - spark = SparkSession(sc) - - print('Pulling data for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end) - - gcursor = mydb.cursor() - gcursor.execute('SELECT id FROM device where name=%s', (_device,)) - device_id = gcursor.fetchone()[0] - # print(device_id) - - gcursor.execute("insert into nxcals_blm_data.data(histogram, integral, integral_dist, raw_dist, turnloss) " - "VALUES ( '{}', '{}', '{}', '{}', '{}');") - data_id = gcursor.lastrowid - # print(data_id) - - gcursor.execute('INSERT INTO fill(name, device_id, mode, data_id, start, end)' - 'VALUES (%s, %s, %s, %s, %s, %s )', - (_device + _start + 'to' + _end, device_id, _mode, data_id, _start, _end,)) - - pull_histogram(_start, _end, _device) - pull_integral(_start, _end, _device) - pull_raw_dist(_start, _end, _device) - pull_integral_dist(_start, _end, _device) - # pull_turnloss(_start, _end, _device) - - mydb.commit() - print('Pulled done, data pushed on DB') diff --git a/property_pull.py b/property_pull.py new file mode 100644 index 0000000000000000000000000000000000000000..d236b5723b6963f1c60f9be0794ea1ae6e33713b --- /dev/null +++ b/property_pull.py @@ -0,0 +1,885 @@ +#!spark-submit --master yarn +import json +import sys +import requests +from requests.exceptions import HTTPError + +import mysql.connector +import pyconf as cfg + +from cern.nxcals.pyquery.builders import * +from pyspark import SparkConf +from pyspark import SparkContext +from pyspark.sql import * +from pyspark.sql.functions import udf, col +from pyspark.sql.types import IntegerType +from pyspark.sql.utils import AnalysisException + +from multiprocessing import Process + +import numpy as np +import pandas as pd + +API_HOST = 'http://test-nxdblm-api2.web.cern.ch/' + +_dest_dir = './pulledData' + +# mysqlDB variables +mydb = mysql.connector.connect( + host=cfg.mysql['host'], + port=cfg.mysql['port'], + user=cfg.mysql['user'], + passwd=cfg.mysql['passwd'], + database=cfg.mysql['database'] +) + +# Sparks Variable +conf = SparkConf() +conf.setMaster('yarn') +conf.setAppName('cern_nxcals_dblm_property_pull.py') + +sc = SparkContext(conf=conf) +sql_context = SQLContext(sc) +spark = SparkSession(sc) + +# sumUDF +sums_cols = udf(lambda arr: 0 if arr == [] else sum(arr), IntegerType()) + +''' +#TODO - +# Reduce ( UDAF ? ) +# Scala UDF ? 
(working with python's lambda for now ) +''' + + +def write_file(_device, _start_time, _end_time, _property, data, _files_write): + if _files_write: + # Writing output # + with open(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to' + + _end_time.replace(' ', '_') + '_' + _property + '.dat', 'w') as outfile: + json.dump(data, outfile) + + print(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to' + + _end_time.replace(' ', '_') + '_' + _property + '.dat' + " Pulled") + + +''' uncompress_pandas +Apply a decompression process accordingly to NXCALS compression documentation : +zeros a removed from data_array index of non zeros data are stocked into an another array +Input : + _data_array : datas get from NXCALS for a compressed type of data + _index_array : indexes for non-zero data + _size : size of the final array + _nrows : number of rows given by NXCALS +''' + + +def uncompress_pandas(_data_array, _index_array, _size, _nrows): + print('uncompress start') + result = np.zeros(_size, dtype=np.int64) + for i in range(_nrows): + tmpresult = np.zeros(_size, dtype=np.int) + np.put(tmpresult, _index_array[i][0], _data_array[i][0]) + result = np.add(result, tmpresult) + print("uncompress finished") + return result.tolist() + + +''' check_db +Check some databases infos : + - device & mode are registered into the database. + - the measurement are already been done + - if already done, check the data completion + - if not already done, + - check if fill & fill_mode already exist (create if not) + - init & create a new measure +''' + + +def check_db(_fill_name, _mode, _device, _start, _end): + try: + response = requests.get(API_HOST + 'mode', + params={ + 'name': _mode + }) + if response.status_code == 204: + print('Mode ' + _mode + ' unknown ') + return False, -1 + + response = requests.get(API_HOST + 'device', + params={ + 'name': _device + }) + if response.status_code == 204: + print('Device ' + _device + ' unknown') + return False, -1 + device_id = response.json()['id'] + + response = requests.get(API_HOST + 'measure', + params={ + 'device_id': device_id, + 'mode_name': _mode, + 'fill_name': _fill_name, + 'start': _start + }) + + if response.status_code != 204: + data_id = response.json()['data_id'] + response = requests.get(API_HOST + 'data_complete', params={id: data_id}) + if response.status_code == 200: + if response.json()['complete']: + print('data are already pulled for fill ' + str(_fill_name) + + ' for device ' + _device + + ' mode ' + _mode + + ' from ' + _start + ' to ' + _end) + return False, -1 + else: + return True, data_id + else: + print('error during dataCheck') + return False, -1 + + else: + response = requests.post(API_HOST + 'new_data') + data_id = response.json()['id'] + + response = requests.get(API_HOST + 'fill/metadata/name', params={'name': _fill_name}) + if response.status_code == 204: + requests.post(API_HOST + 'new_fill', + params={ + 'name': _fill_name, + 'start': _start, + 'end': _end + }) + + response = requests.get(API_HOST + 'fill_mode', + params={ + 'mode_name': _mode, + 'fill_name': _fill_name, + 'start': _start, + 'end': _end, + }) + if response.status_code == 204: + response = requests.post(API_HOST + 'new_fill_mode', + params={ + 'fill_name': _fill_name, + 'mode_name': _mode, + 'start': _start, + 'end': _end, + }) + fill_mode_id = response.json()['id'] + else: + fill_mode_id = response.json()['id'] + + requests.post(API_HOST + 'new_measure', + params={ + 'fill_mode_id': fill_mode_id, + 'data_id': data_id, + 'device_id': 
device_id + }) + return True, data_id + except HTTPError as err: + print('HTTPError' + str(err)) + return False, -1 + + +# noinspection SqlResolve +def pull_histogram_pandas(_fill_name, _mode, + _device, + _start_time, + _end_time, + _files_write=False, + _db_write=True, + _data_id=None): + _property = 'LoggingHistogram' + + if (_data_id is None or _data_id == -1) and _db_write: + ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time) + if not ok or _data_id is None or _data_id == -1: + return -1 + + data = {} + + df = DataFrame(KeyValuesQuery.builder(spark).system("CMW") + .startTime(_start_time) + .endTime(_end_time) + .entity() + .keyValue("device", _device) + .keyValue("property", _property) + .buildDataset(), sql_context) + + df.registerTempTable("pulled_data") + + try: + array_dataset_hg = spark.sql("select histogram_hg_com.elements from pulled_data") \ + .toPandas() + + array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + data['histogram_hg_intime'] = array_dataset_hg['Total'].values.tolist() + except AnalysisException: + print('histogram_hg not know for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device) + array_dataset_hg = None + data['histogram_hg_intime'] = [] + + try: + array_index_hg = spark.sql("select histogram_hg_ind.elements from pulled_data") \ + .toPandas() + except AnalysisException: + print('histogram_hg not know for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device) + array_index_hg = None + + # Process lg array data + try: + array_dataset_lg = spark.sql("select histogram_lg_com.elements from pulled_data") \ + .toPandas() + + array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + data['histogram_lg_intime'] = array_dataset_lg['Total'].values.tolist() + except AnalysisException: + print('histogram_lg not know for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device) + array_dataset_lg = None + data['histogram_lg_intime'] = [] + + try: + array_index_lg = spark.sql("select histogram_lg_ind.elements from pulled_data") \ + .toPandas() + except AnalysisException: + print('histogram_hg not know for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device) + array_index_lg = None + + # Metadata + if array_dataset_hg is not None: + data['rows'] = array_dataset_hg.shape[0] + elif array_dataset_lg is not None: + data['rows'] = array_dataset_lg.shape[0] + else: + data['rows'] = 0 + + data['startTime'] = _start_time + data['endTime'] = _end_time + + try: + data['threshold_hg'] = spark.sql("select threshold_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['threshold_hg']) > 0: + data['threshold_hg'] = data['threshold_hg'][0] + except AnalysisException: + data['threshold_hg'] = 0 + + try: + data['threshold_lg'] = spark.sql("select threshold_lg from pulled_data limit 1").toPandas().values.tolist() + if len(data['threshold_lg']) > 0: + data['threshold_lg'] = data['threshold_lg'][0] + except AnalysisException: + data['threshold_lg'] = 0 + + try: + data['nsamples'] = spark.sql("select nsamples from pulled_data limit 1").toPandas().values.tolist() + if len(data['nsamples']) > 0: + data['nsamples'] = data['nsamples'][0] + except AnalysisException: + data['nsamples'] = 0 + + # Timestamp # + try: + time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \ + .toPandas() + data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() + except AnalysisException: + 
data['timestamps'] = [] + + # Histogram data # + if array_dataset_hg is None or array_index_hg is None: + data['histogram_hg'] = [] + else: + data['histogram_hg'] = uncompress_pandas(array_dataset_hg.values, + array_index_hg.values, + 57802, data['rows'] - 1) + if array_dataset_lg is None or array_index_lg is None: + data['histogram_lg'] = [] + else: + data['histogram_lg'] = uncompress_pandas(array_dataset_lg.values, + array_index_lg.values, + 57802, data['rows'] - 1) + + # Writing to db + if _db_write: + db_put_response = requests.put(API_HOST + 'histogram', + params={ + 'id': _data_id + }, + json=data) + + write_file(_device, _start_time, _end_time, _property, data, _files_write) + + return data + + +# noinspection SqlResolve +def pull_integral_pandas(_fill_name, + _mode, + _device, + _start_time, + _end_time, + _files_write=False, + _db_write=True, + _data_id=None): + _property = "LoggingIntegral" + + if (_data_id is None or _data_id == -1) and _db_write: + ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time) + if not ok or _data_id is None or _data_id == -1: + return -1 + + data = {} + + df = DataFrame(KeyValuesQuery.builder(spark).system("CMW") + .startTime(_start_time) + .endTime(_end_time) + .entity() + .keyValue("device", _device) + .keyValue("property", _property) + .buildDataset(), sql_context) + + df.registerTempTable("pulled_data") + + try: + array_dataset_hg = spark.sql("select integral_hg.elements from pulled_data") \ + .toPandas() + + array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + data['integral_hg_intime'] = array_dataset_hg['Total'].values.tolist() + + integral_hg = np.zeros(3564, dtype=np.int) + for T in array_dataset_hg.values: + if len(T[0]) == 3564: + integral_hg = np.add(integral_hg, T[0]) + data['integral_hg'] = integral_hg.tolist() + + except AnalysisException: + print('integral_hg not know for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device) + array_dataset_hg = None + data['integral_hg_intime'] = [] + data['integral_hg'] = [] + + try: + array_dataset_lg = spark.sql("select integral_lg.elements from pulled_data") \ + .toPandas() + + array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + + data['integral_lg_intime'] = array_dataset_lg['Total'].values.tolist() + + integral_lg = np.zeros(3564, dtype=np.int) + for T in array_dataset_lg.values: + if len(T[0]) == 3564: + integral_lg = np.add(integral_lg, T[0]) + data['integral_lg'] = integral_lg.tolist() + except AnalysisException: + array_dataset_lg = None + data['integral_lg_intime'] = [] + data['integral_lg'] = [] + + # Metadata + if array_dataset_hg is not None: + data['rows'] = array_dataset_hg.shape[0] + elif array_dataset_lg is not None: + data['rows'] = array_dataset_lg.shape[0] + else: + data['rows'] = 0 + + data['startTime'] = _start_time + data['endTime'] = _end_time + + try: + data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['zsthr_hg']) > 0: + data['zsthr_hg'] = data['zsthr_hg'][0] + except AnalysisException: + data['zsthr_hg'] = 0 + + try: + data['zsthrave_hg'] = spark.sql("select zsthrave_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['zsthrave_hg']) > 0: + data['zsthrave_hg'] = data['zsthrave_hg'][0] + except AnalysisException: + data['zsthrave_hg'] = 0 + + try: + data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist() + if 
len(data['zsthr_lg']) > 0: + data['zsthr_lg'] = data['zsthr_lg'][0] + except AnalysisException: + data['zsthr_lg'] = 0 + + try: + data['zsthrave_lg'] = spark.sql("select zsthrave_lg from pulled_data limit 1").toPandas().values.tolist() + if len(data['zsthrave_lg']) > 0: + data['zsthrave_lg'] = data['zsthrave_lg'][0] + except AnalysisException: + data['zsthrave_lg'] = -1 + + try: + data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist() + if len(data['baselinesub']) > 0: + data['baselinesub'] = data['baselinesub'][0] + except AnalysisException: + data['baselinesub'] = 0 + + # Timestamp # + try: + time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \ + .toPandas() + data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() + except AnalysisException: + data['timestamps'] = [] + + # Writing to db + if _db_write: + db_put_response = requests.put(API_HOST + 'integral', + params={ + 'id': _data_id + }, + json=data) + + write_file(_device, _start_time, _end_time, _property, data, _files_write) + + return data + + +# noinspection SqlResolve +def pull_raw_dist_pandas(_fill_name, + _mode, + _device, + _start_time, + _end_time, + _files_write=False, + _db_write=True, + _data_id=None): + _property = "LoggingRawDist" + + if (_data_id is None or _data_id == -1) and _db_write: + ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time) + if not ok or _data_id is None or _data_id == -1: + return -1 + + data = {} + + df = DataFrame(KeyValuesQuery.builder(spark).system("CMW") + .startTime(_start_time) + .endTime(_end_time) + .entity() + .keyValue("device", _device) + .keyValue("property", _property) + .buildDataset(), sql_context) + + df.registerTempTable("pulled_data") + + # Process hg Data # + try: + array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \ + .toPandas() + + result_distrib_hg = np.zeros(2048, dtype=np.int) + for R in array_dataset_hg.values: + if len(R[0]) == 2048: + result_distrib_hg = np.add(result_distrib_hg, R[0]) + data['distribution_hg'] = result_distrib_hg.tolist() + except AnalysisException: + array_dataset_hg = None + data['distribution_hg'] = [] + + # Process lg Data # + try: + array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \ + .toPandas() + result_distrib_lg = np.zeros(2048, dtype=np.int) + for R in array_dataset_lg.values: + if len(R[0]) == 2048: + result_distrib_lg = np.add(result_distrib_lg, R[0]) + data['distribution_lg'] = result_distrib_lg.tolist() + except AnalysisException: + array_dataset_lg = None + data['distribution_lg'] = [] + + # Metadata + if array_dataset_hg is not None: + data['rows'] = array_dataset_hg.shape[0] + elif array_dataset_lg is not None: + data['rows'] = array_dataset_lg.shape[0] + else: + data['rows'] = 0 + + data['startTime'] = _start_time + data['endTime'] = _end_time + + try: + data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['lsbcut_hg']) > 0: + data['lsbcut_hg'] = data['lsbcut_hg'][0] + except AnalysisException: + data['lsbcut_hg'] = 0 + + try: + data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['offset_hg']) > 0: + data['offset_hg'] = data['offset_hg'][0] + except AnalysisException: + data['offset_hg'] = 0 + + try: + data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist() + if len(data['lsbcut_lg']) > 0: + 
data['lsbcut_lg'] = data['lsbcut_lg'][0] + except AnalysisException: + data['lsbcut_lg'] = 0 + + try: + data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist() + if len(data['offset_lg']) > 0: + data['offset_lg'] = data['offset_lg'][0] + except AnalysisException: + data['offset_lg'] = 0 + + # Timestamp # + try: + time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \ + .toPandas() + data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() + except AnalysisException: + data['timestamps'] = [] + + # Writing to db + if _db_write: + db_put_response = requests.put(API_HOST + 'raw_dist', + params={ + 'id': _data_id + }, + json=data) + + write_file(_device, _start_time, _end_time, _property, data, _files_write) + + return data + + +# noinspection SqlResolve +def pull_integral_dist_pandas(_fill_name, + _mode, + _device, + _start_time, + _end_time, + _files_write=False, + _db_write=True, + _data_id=None): + _property = "LoggingIntegralDist" + + if (_data_id is None or _data_id == -1) and _db_write: + ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time) + if not ok or _data_id is None or _data_id == -1: + return -1 + + data = {} + + df = DataFrame(KeyValuesQuery.builder(spark).system("CMW") + .startTime(_start_time) + .endTime(_end_time) + .entity() + .keyValue("device", _device) + .keyValue("property", _property) + .buildDataset(), sql_context) + + df.registerTempTable("pulled_data") + + # Process hg Data # + try: + array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \ + .toPandas() + + result_distrib_hg = np.zeros(2048, dtype=np.int) + for R in array_dataset_hg.values: + if len(R[0]) == 2048: + result_distrib_hg = np.add(result_distrib_hg, R[0]) + data['distribution_hg'] = result_distrib_hg.tolist() + except AnalysisException: + data['distribution_hg'] = [] + array_dataset_hg = None + + # Process lg Data # + try: + array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \ + .toPandas() + + result_distrib_lg = np.zeros(2048, dtype=np.int) + for R in array_dataset_lg.values: + if len(R[0]) == 2048: + result_distrib_lg = np.add(result_distrib_lg, R[0]) + data['distribution_lg'] = result_distrib_lg.tolist() + except AnalysisException: + array_dataset_lg = None + data['distribution_lg'] = [] + + # Metadata # + if array_dataset_hg is not None: + data['rows'] = array_dataset_hg.shape[0] + elif array_dataset_lg is not None: + data['rows'] = array_dataset_lg.shape[0] + else: + data['rows'] = 0 + + data['startTime'] = _start_time + data['endTime'] = _end_time + + try: + data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['lsbcut_hg']) > 0: + data['lsbcut_hg'] = data['lsbcut_hg'][0] + except AnalysisException: + data['lsbcut_hg'] = 0 + + try: + data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['offset_hg']) > 0: + data['offset_hg'] = data['offset_hg'][0] + except AnalysisException: + data['offset_hg'] = 0 + + try: + data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist() + if len(data['lsbcut_lg']) > 0: + data['lsbcut_lg'] = data['lsbcut_lg'][0] + except AnalysisException: + data['lsbcut_lg'] = 0 + + try: + data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist() + if len(data['offset_lg']) > 0: + data['offset_lg'] = data['offset_lg'][0] 
+ except AnalysisException: + data['offset_lg'] = 0 + + try: + data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist() + if len(data['nturns']) > 0: + data['nturns'] = data['nturns'][0] + except AnalysisException: + data['nturns'] = 0 + + # Timestamp # + try: + time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \ + .toPandas() + data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() + except AnalysisException: + data['timestamps'] = [] + + # Writing to db + if _db_write: + requests.put(API_HOST + 'integral_dist', + params={ + 'id': _data_id + }, + json=data) + + write_file(_device, _start_time, _end_time, _property, data, _files_write) + + return data + + +# noinspection SqlResolve +def pull_turnloss_pandas(_fill_name, + _mode, + _device, + _start_time, + _end_time, + _files_write=False, + _db_write=True, + _data_id=None): + _property = "LoggingTurnLoss" + + if (_data_id is None or _data_id == -1) and _db_write: + ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time) + if not ok or _data_id is None or _data_id == -1: + return -1 + + data = {} + + df = DataFrame(KeyValuesQuery.builder(spark).system("CMW") + .startTime(_start_time) + .endTime(_end_time) + .entity() + .keyValue("device", _device) + .keyValue("property", _property) + .buildDataset(), sql_context) + + df.registerTempTable("pulled_data") + + # Process hg Data # + try: + array_dataset_hg = spark.sql("select turnloss_hg_com.elements from pulled_data") \ + .toPandas() + + array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + data['turnloss_hg_intime'] = array_dataset_hg['Total'].values.tolist() + + except AnalysisException: + array_dataset_hg = None + data['turnloss_hg_intime'] = [] + + try: + array_index_hg = spark.sql("select turnloss_hg_ind.elements from pulled_data") \ + .toPandas() + except AnalysisException: + array_index_hg = None + + # Process lg Data # + try: + array_dataset_lg = spark.sql("select turnloss_lg_com.elements from pulled_data") \ + .toPandas() + + array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + data['turnloss_lg_intime'] = array_dataset_lg['Total'].values.tolist() + + except AnalysisException: + array_dataset_lg = None + data['turnloss_lg_intime'] = [] + + try: + array_index_lg = spark.sql("select turnloss_lg_ind.elements from pulled_data") \ + .toPandas() + except AnalysisException: + array_index_lg = None + + # Metadata # + if array_dataset_hg is not None: + data['rows'] = array_dataset_hg.shape[0] + elif array_dataset_lg is not None: + data['rows'] = array_dataset_lg.shape[0] + else: + data['rows'] = 0 + + data['startTime'] = _start_time + data['endTime'] = _end_time + + try: + data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['zsthr_hg']) > 0: + data['zsthr_hg'] = data['zsthr_hg'][0] + except AnalysisException: + data['zsthr_hg'] = 0 + + try: + data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist() + if len(data['zsthr_lg']) > 0: + data['zsthr_lg'] = data['zsthr_lg'][0] + except AnalysisException: + data['zsthr_lg'] = 0 + + try: + data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist() + if len(data['baselinesub']) > 0: + data['baselinesub'] = data['baselinesub'][0] + except AnalysisException: + data['baselinesub'] = 0 + + try: + data['nturns'] = 
spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist() + if len(data['nturns']) > 0: + data['nturns'] = data['nturns'][0] + except AnalysisException: + data['nturns'] = 0 + + # Timestamp # + try: + time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \ + .toPandas() + data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() + except AnalysisException: + data['timestamps'] = [] + + # Turnloss Data + if array_dataset_hg is None or array_index_hg is None: + data['turnloss_hg'] = [] + else: + data['turnloss_hg'] = uncompress_pandas(array_dataset_hg.values, + array_index_hg.values, + data['nturns'], data['rows']) + + if array_dataset_lg is None or array_index_lg is None: + data['turnloss_lg'] = [] + else: + data['turnloss_lg'] = uncompress_pandas(array_dataset_lg.values, + array_index_lg.values, + data['nturns'], data['rows']) + + # Writing to db + if _db_write: + requests.put(API_HOST + 'turnloss', + params={ + 'id': _data_id + }, + json=data) + + write_file(_device, _start_time, _end_time, _property, data, _files_write) + + return data + + +def property_pull(_fill_name, _mode, _device, _start, _end, _files_write=False, _db_write=True): + ok, data_id = check_db(_fill_name, _mode, _device, _start, _end) + if not ok or data_id is None or data_id == -1: + return -1 + + print('Pulling data for fill ' + + str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end) + + pull_histogram_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id) + pull_integral_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id) + pull_raw_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id) + pull_integral_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id) + pull_turnloss_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id) + + print('Pulled done, data pushed on DB for fill ' + + str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end) + + return 0 + + +if __name__ == '__main__': + + _fill_name = "7340" + _mode = "STABLE" + _start = "2018-10-22 21:18:00.000" + _end = "2018-10-23 11:18:00.000" + _device = "BLMED.06L7.B1B30_TCPA.A6L7.B1" + _files = False + + if len(sys.argv) > 1 and sys.argv[1] == '-h': + print(" -- HELP -- \n" + " -- TODO -- \n" + "-args- \n" + " <fillName> : the name of the fill you are pulling, a fill is a complete experience. " + "In one fill there is multiples device, modes, time measurements ... " + "So you'll need to do multiples pulling for the same fill \n" + " <device> : the name of the device you want to pull. If you want to pull multiples device at once," + " enter devices in quotes, separated by spaces \n" + " ex: '<device1> <device2> <device3> ...' \n\n" + " <mode> : the name of the mode you want to pull. \n" + " ex: STABLE, ALL, START, BEAMDUMP ... \n\n" + " <start> : the start time of the pull you want to do. 
\n" + " format: 'yyyy-mm-dd hh:mm:ss.ms' or 'yyyy-mm-dd_hh:mm:ss.ms'") + sys.exit(0) + elif len(sys.argv) < 6: + print("usage: python cern_nxcals_dblm_property_pull.py <fillName> <device> <mode> <start> <end> [-f] \n" + "where : \n" + " -f also generate .dat files \n" + "type 'python cern_nxcals_dblm_property_pull.py -h' for more infos") + sys.exit(1) + else: + _fill_name = sys.argv[1] + _device = sys.argv[2] + _mode = sys.argv[3] + _start = sys.argv[4] + _end = sys.argv[5] + if len(sys.argv) > 6: + _files = sys.argv[6] == "-f" + + property_pull(_fill_name, _mode, _device, _start, _end) diff --git a/routine_pull.py b/routine_pull.py new file mode 100644 index 0000000000000000000000000000000000000000..42b01df4f80100e684b561aa6a37a82ed20cca43 --- /dev/null +++ b/routine_pull.py @@ -0,0 +1,102 @@ +import time + +import mysql.connector +import requests + +from property_pull import property_pull +import pytimber as pytimber +import datetime as dt + +# from multiprocessing import Process + +# mysqlDB variables +import pyconf as cfg + +mydb = mysql.connector.connect( + host=cfg.mysql['host'], + port=cfg.mysql['port'], + user=cfg.mysql['user'], + passwd=cfg.mysql['passwd'], + database=cfg.mysql['database'] +) + +API_HOST = 'http://test-nxdblm-api2.web.cern.ch/' + + +def start_pull(_fill_infos, _devices=None): + if _devices is None: + _devices = ['BLMED.06L7.B1B30_TCPA.A6L7.B1'] + + print(_fill_infos) + if _fill_infos["endTime"] is None: + return -1 + + for iDevice in _devices: + + property_pull(_fill_infos['fillNumber'], 'ALL', iDevice, + _fill_infos['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"), + _fill_infos['endTime'].strftime("%Y-%m-%d %H:%M:%S.000"), + _files_write=False) + + for iMode in _fill_infos["beamModes"]: + print(iMode['mode']) + property_pull(_fill_infos['fillNumber'], iMode['mode'], iDevice, + iMode['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"), + iMode['endTime'].strftime("%Y-%m-%d %H:%M:%S.000")) + + '''p = Process(target=cern_nxcals_dblm_property_pull.cern_nxcals_dblm_property_pull, + args=(fill_infos['fillNumber'], iMode['mode'], 'BLMED.06L7.B1B30_TCPA.A6L7.B1', + iMode['startTime'].strftime("%y-%m-%d %H:%M:%S.000"), + iMode['endTime'].strftime("%y-%m-%d %H:%M:%S.000"))) + p.start() + p.join() + ''' + + +def routine_pull(_start_fill=None, _end_fill=None): + ldb = pytimber.LoggingDB() + + if _start_fill is None and _end_fill is None: + res = ldb.get('HX:FILLN', dt.datetime.now()) + new_fill_number = int(res['HX:FILLN'][1][0]) + print(new_fill_number) + + # gcursor.execute('select max(name) from Fill limit 1;') + # last_fill_pulled = gcursor.fetchone()[0] + response = requests.get(API_HOST + 'fill/last') + last_fill_pulled = response.json()['name'] + print(last_fill_pulled) + + if new_fill_number == last_fill_pulled: + print('already pulled last fill') + return -1 + else: + new_fill_number = _end_fill + last_fill_pulled = _start_fill + + response = requests.get(API_HOST + 'devices') + devices = [] + for device in response.json()['devices']: + devices.append(device['name']) + + # fill_number = 7444 + # fill_infos = ldb.getLHCFillData(fill_number, False) + # start_pull(fill_infos, devices) + + for iFill in range(int(last_fill_pulled), int(new_fill_number)): + fill_infos = ldb.getLHCFillData(str(iFill), False) + + # check if the fill is finished + if fill_infos['endTime'] is None: + break + + # check if data is too new + midnight_stamp = dt.datetime.now().replace(second=0, minute=0, hour=0) - dt.timedelta(days=1) + if midnight_stamp < fill_infos['endTime']: + continue + + 
start_pull(fill_infos, devices) + + +if __name__ == '__main__': + routine_pull(_start_fill=7440, _end_fill=7494)
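As a usage note for the new entry point: the pull can also be driven directly from Python instead of the command line. The sketch below is illustrative only, reusing the example values from the `__main__` block of `property_pull.py`; importing the module starts the module-level SparkContext and MySQL connection, so it only runs inside a configured NXCALS/Spark environment.

```python
# Illustrative sketch: pull a single fill/mode/device combination,
# reusing the example values from property_pull.py's __main__ block.
from property_pull import property_pull

property_pull(
    _fill_name="7340",
    _mode="STABLE",
    _device="BLMED.06L7.B1B30_TCPA.A6L7.B1",
    _start="2018-10-22 21:18:00.000",
    _end="2018-10-23 11:18:00.000",
    _files_write=False,  # True also dumps .dat files into ./pulledData
    _db_write=True,      # results are pushed through the nxdblm API (API_HOST)
)
```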