diff --git a/.gitignore b/.gitignore index db57f0e79a948bbd737e232d305675be292d62bf..e6d0ca6c5e3b155affde8a3b4a0703194ac00de0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ pyconf.py + pulledData/ +plotOutput/ +prop_pull_api/ \ No newline at end of file diff --git a/property_pull.py b/property_pull.py index d82725ce7d9e7156baebb46c22376e689c5e0d52..d236b5723b6963f1c60f9be0794ea1ae6e33713b 100644 --- a/property_pull.py +++ b/property_pull.py @@ -1,5 +1,8 @@ +#!spark-submit --master yarn import json import sys +import requests +from requests.exceptions import HTTPError import mysql.connector import pyconf as cfg @@ -8,16 +11,18 @@ from cern.nxcals.pyquery.builders import * from pyspark import SparkConf from pyspark import SparkContext from pyspark.sql import * -from pyspark.accumulators import AccumulatorParam from pyspark.sql.functions import udf, col from pyspark.sql.types import IntegerType +from pyspark.sql.utils import AnalysisException + +from multiprocessing import Process import numpy as np import pandas as pd +API_HOST = 'http://test-nxdblm-api2.web.cern.ch/' + _dest_dir = './pulledData' -_files = False -_db_write = True # mysqlDB variables mydb = mysql.connector.connect( @@ -28,13 +33,10 @@ mydb = mysql.connector.connect( database=cfg.mysql['database'] ) -# id of the sql data's row for this filling -data_id = -1 - # Sparks Variable conf = SparkConf() conf.setMaster('yarn') -conf.setAppName('property_pull.py') +conf.setAppName('cern_nxcals_dblm_property_pull.py') sc = SparkContext(conf=conf) sql_context = SQLContext(sc) @@ -50,8 +52,8 @@ sums_cols = udf(lambda arr: 0 if arr == [] else sum(arr), IntegerType()) ''' -def write_file(_device, _start_time, _end_time, _property, data): - if _files: +def write_file(_device, _start_time, _end_time, _property, data, _files_write): + if _files_write: # Writing output # with open(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to' + _end_time.replace(' ', '_') + '_' + _property + '.dat', 
'w') as outfile: @@ -61,6 +63,17 @@ def write_file(_device, _start_time, _end_time, _property, data): + _end_time.replace(' ', '_') + '_' + _property + '.dat' + " Pulled") +''' uncompress_pandas +Apply a decompression process accordingly to NXCALS compression documentation : +zeros a removed from data_array index of non zeros data are stocked into an another array +Input : + _data_array : datas get from NXCALS for a compressed type of data + _index_array : indexes for non-zero data + _size : size of the final array + _nrows : number of rows given by NXCALS +''' + + def uncompress_pandas(_data_array, _index_array, _size, _nrows): print('uncompress start') result = np.zeros(_size, dtype=np.int64) @@ -72,10 +85,119 @@ def uncompress_pandas(_data_array, _index_array, _size, _nrows): return result.tolist() +''' check_db +Check some databases infos : + - device & mode are registered into the database. + - the measurement are already been done + - if already done, check the data completion + - if not already done, + - check if fill & fill_mode already exist (create if not) + - init & create a new measure +''' + + +def check_db(_fill_name, _mode, _device, _start, _end): + try: + response = requests.get(API_HOST + 'mode', + params={ + 'name': _mode + }) + if response.status_code == 204: + print('Mode ' + _mode + ' unknown ') + return False, -1 + + response = requests.get(API_HOST + 'device', + params={ + 'name': _device + }) + if response.status_code == 204: + print('Device ' + _device + ' unknown') + return False, -1 + device_id = response.json()['id'] + + response = requests.get(API_HOST + 'measure', + params={ + 'device_id': device_id, + 'mode_name': _mode, + 'fill_name': _fill_name, + 'start': _start + }) + + if response.status_code != 204: + data_id = response.json()['data_id'] + response = requests.get(API_HOST + 'data_complete', params={id: data_id}) + if response.status_code == 200: + if response.json()['complete']: + print('data are already pulled for fill ' + 
str(_fill_name) + + ' for device ' + _device + + ' mode ' + _mode + + ' from ' + _start + ' to ' + _end) + return False, -1 + else: + return True, data_id + else: + print('error during dataCheck') + return False, -1 + + else: + response = requests.post(API_HOST + 'new_data') + data_id = response.json()['id'] + + response = requests.get(API_HOST + 'fill/metadata/name', params={'name': _fill_name}) + if response.status_code == 204: + requests.post(API_HOST + 'new_fill', + params={ + 'name': _fill_name, + 'start': _start, + 'end': _end + }) + + response = requests.get(API_HOST + 'fill_mode', + params={ + 'mode_name': _mode, + 'fill_name': _fill_name, + 'start': _start, + 'end': _end, + }) + if response.status_code == 204: + response = requests.post(API_HOST + 'new_fill_mode', + params={ + 'fill_name': _fill_name, + 'mode_name': _mode, + 'start': _start, + 'end': _end, + }) + fill_mode_id = response.json()['id'] + else: + fill_mode_id = response.json()['id'] + + requests.post(API_HOST + 'new_measure', + params={ + 'fill_mode_id': fill_mode_id, + 'data_id': data_id, + 'device_id': device_id + }) + return True, data_id + except HTTPError as err: + print('HTTPError' + str(err)) + return False, -1 + + # noinspection SqlResolve -def pull_histogram_pandas(_fill_name, _mode, _device, _start_time, _end_time): +def pull_histogram_pandas(_fill_name, _mode, + _device, + _start_time, + _end_time, + _files_write=False, + _db_write=True, + _data_id=None): _property = 'LoggingHistogram' + if (_data_id is None or _data_id == -1) and _db_write: + ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time) + if not ok or _data_id is None or _data_id == -1: + return -1 + data = {} df = DataFrame(KeyValuesQuery.builder(spark).system("CMW") @@ -86,75 +208,128 @@ def pull_histogram_pandas(_fill_name, _mode, _device, _start_time, _end_time): .keyValue("property", _property) .buildDataset(), sql_context) - # data['rows'] = df.count() df.registerTempTable("pulled_data") - # 
Process hg array data - array_dataset_hg = spark.sql("select histogram_hg_com.elements from pulled_data") \ - .toPandas() + try: + array_dataset_hg = spark.sql("select histogram_hg_com.elements from pulled_data") \ + .toPandas() - array_index_hg = spark.sql("select histogram_hg_ind.elements from pulled_data") \ - .toPandas() + array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + data['histogram_hg_intime'] = array_dataset_hg['Total'].values.tolist() + except AnalysisException: + print('histogram_hg not know for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device) + array_dataset_hg = None + data['histogram_hg_intime'] = [] - array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + try: + array_index_hg = spark.sql("select histogram_hg_ind.elements from pulled_data") \ + .toPandas() + except AnalysisException: + print('histogram_hg not know for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device) + array_index_hg = None # Process lg array data - array_dataset_lg = spark.sql("select histogram_lg_com.elements from pulled_data") \ - .toPandas() - - array_index_lg = spark.sql("select histogram_lg_ind.elements from pulled_data") \ - .toPandas() - - array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) - - # Datas # - data['rows'] = array_dataset_hg.shape[0] + try: + array_dataset_lg = spark.sql("select histogram_lg_com.elements from pulled_data") \ + .toPandas() + + array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + data['histogram_lg_intime'] = array_dataset_lg['Total'].values.tolist() + except AnalysisException: + print('histogram_lg not know for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device) + array_dataset_lg = None + data['histogram_lg_intime'] = [] + + try: + array_index_lg = spark.sql("select 
histogram_lg_ind.elements from pulled_data") \ + .toPandas() + except AnalysisException: + print('histogram_hg not know for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device) + array_index_lg = None + + # Metadata + if array_dataset_hg is not None: + data['rows'] = array_dataset_hg.shape[0] + elif array_dataset_lg is not None: + data['rows'] = array_dataset_lg.shape[0] + else: + data['rows'] = 0 data['startTime'] = _start_time data['endTime'] = _end_time - data['threshold_hg'] = spark.sql("select threshold_hg from pulled_data limit 1").toPandas().values.tolist() - if len(data['threshold_hg']) > 0: - data['threshold_hg'] = data['threshold_hg'][0] - - data['threshold_lg'] = spark.sql("select threshold_lg from pulled_data limit 1").toPandas().values.tolist() - if len(data['threshold_lg']) > 0: - data['threshold_lg'] = data['threshold_lg'][0] - - data['nsamples'] = spark.sql("select nsamples from pulled_data limit 1").toPandas().values.tolist() - if len(data['nsamples']) > 0: - data['nsamples'] = data['nsamples'][0] + try: + data['threshold_hg'] = spark.sql("select threshold_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['threshold_hg']) > 0: + data['threshold_hg'] = data['threshold_hg'][0] + except AnalysisException: + data['threshold_hg'] = 0 + + try: + data['threshold_lg'] = spark.sql("select threshold_lg from pulled_data limit 1").toPandas().values.tolist() + if len(data['threshold_lg']) > 0: + data['threshold_lg'] = data['threshold_lg'][0] + except AnalysisException: + data['threshold_lg'] = 0 + + try: + data['nsamples'] = spark.sql("select nsamples from pulled_data limit 1").toPandas().values.tolist() + if len(data['nsamples']) > 0: + data['nsamples'] = data['nsamples'][0] + except AnalysisException: + data['nsamples'] = 0 # Timestamp # - time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \ - .toPandas() - data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() - - # Intime Data # - 
data['histogram_hg_intime'] = array_dataset_hg['Total'].values.tolist() - data['histogram_lg_intime'] = array_dataset_lg['Total'].values.tolist() + try: + time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \ + .toPandas() + data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() + except AnalysisException: + data['timestamps'] = [] # Histogram data # - data['histogram_hg'] = uncompress_pandas(array_dataset_hg.values, - array_index_hg.values, - 57802, data['rows'] - 1) - - data['histogram_lg'] = uncompress_pandas(array_dataset_lg.values, - array_index_lg.values, - 57802, data['rows'] - 1) + if array_dataset_hg is None or array_index_hg is None: + data['histogram_hg'] = [] + else: + data['histogram_hg'] = uncompress_pandas(array_dataset_hg.values, + array_index_hg.values, + 57802, data['rows'] - 1) + if array_dataset_lg is None or array_index_lg is None: + data['histogram_lg'] = [] + else: + data['histogram_lg'] = uncompress_pandas(array_dataset_lg.values, + array_index_lg.values, + 57802, data['rows'] - 1) # Writing to db if _db_write: - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET histogram = %s where id = %s', (json.dumps(data), data_id,)) + db_put_response = requests.put(API_HOST + 'histogram', + params={ + 'id': _data_id + }, + json=data) + + write_file(_device, _start_time, _end_time, _property, data, _files_write) - write_file(_device, _start_time, _end_time, _property, data) + return data # noinspection SqlResolve -def pull_integral_pandas(_fill_name, _mode, _device, _start_time, _end_time): +def pull_integral_pandas(_fill_name, + _mode, + _device, + _start_time, + _end_time, + _files_write=False, + _db_write=True, + _data_id=None): _property = "LoggingIntegral" + if (_data_id is None or _data_id == -1) and _db_write: + ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time) + if not ok or _data_id is None or _data_id == -1: + return -1 + data = {} df = 
DataFrame(KeyValuesQuery.builder(spark).system("CMW") @@ -167,77 +342,126 @@ def pull_integral_pandas(_fill_name, _mode, _device, _start_time, _end_time): df.registerTempTable("pulled_data") - # Process hg Data # - array_dataset_hg = spark.sql("select integral_hg.elements from pulled_data") \ - .toPandas() - - array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) - - # Process lg Data # - array_dataset_lg = spark.sql("select integral_lg.elements from pulled_data") \ - .toPandas() - - array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + try: + array_dataset_hg = spark.sql("select integral_hg.elements from pulled_data") \ + .toPandas() + + array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + data['integral_hg_intime'] = array_dataset_hg['Total'].values.tolist() + + integral_hg = np.zeros(3564, dtype=np.int) + for T in array_dataset_hg.values: + if len(T[0]) == 3564: + integral_hg = np.add(integral_hg, T[0]) + data['integral_hg'] = integral_hg.tolist() + + except AnalysisException: + print('integral_hg not know for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device) + array_dataset_hg = None + data['integral_hg_intime'] = [] + data['integral_hg'] = [] + + try: + array_dataset_lg = spark.sql("select integral_lg.elements from pulled_data") \ + .toPandas() + + array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + + data['integral_lg_intime'] = array_dataset_lg['Total'].values.tolist() + + integral_lg = np.zeros(3564, dtype=np.int) + for T in array_dataset_lg.values: + if len(T[0]) == 3564: + integral_lg = np.add(integral_lg, T[0]) + data['integral_lg'] = integral_lg.tolist() + except AnalysisException: + array_dataset_lg = None + data['integral_lg_intime'] = [] + data['integral_lg'] = [] + + # Metadata + if array_dataset_hg is not None: + data['rows'] = 
array_dataset_hg.shape[0] + elif array_dataset_lg is not None: + data['rows'] = array_dataset_lg.shape[0] + else: + data['rows'] = 0 - # DATA # - data['rows'] = array_dataset_hg.shape[0] data['startTime'] = _start_time data['endTime'] = _end_time - data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist() - if len(data['zsthr_hg']) > 0: - data['zsthr_hg'] = data['zsthr_hg'][0] - - data['zsthrave_hg'] = spark.sql("select zsthrave_hg from pulled_data limit 1").toPandas().values.tolist() - if len(data['zsthrave_hg']) > 0: - data['zsthrave_hg'] = data['zsthrave_hg'][0] - - data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist() - if len(data['zsthr_lg']) > 0: - data['zsthr_lg'] = data['zsthr_lg'][0] - - data['zsthrave_lg'] = spark.sql("select zsthrave_lg from pulled_data limit 1").toPandas().values.tolist() - if len(data['zsthrave_lg']) > 0: - data['zsthrave_lg'] = data['zsthrave_lg'][0] - - data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist() - if len(data['baselinesub']) > 0: - data['baselinesub'] = data['baselinesub'][0] - - # Intime Data # - data['integral_hg_intime'] = array_dataset_hg['Total'].values.tolist() - data['integral_lg_intime'] = array_dataset_lg['Total'].values.tolist() + try: + data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['zsthr_hg']) > 0: + data['zsthr_hg'] = data['zsthr_hg'][0] + except AnalysisException: + data['zsthr_hg'] = 0 + + try: + data['zsthrave_hg'] = spark.sql("select zsthrave_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['zsthrave_hg']) > 0: + data['zsthrave_hg'] = data['zsthrave_hg'][0] + except AnalysisException: + data['zsthrave_hg'] = 0 + + try: + data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist() + if len(data['zsthr_lg']) > 0: + data['zsthr_lg'] = 
data['zsthr_lg'][0] + except AnalysisException: + data['zsthr_lg'] = 0 + + try: + data['zsthrave_lg'] = spark.sql("select zsthrave_lg from pulled_data limit 1").toPandas().values.tolist() + if len(data['zsthrave_lg']) > 0: + data['zsthrave_lg'] = data['zsthrave_lg'][0] + except AnalysisException: + data['zsthrave_lg'] = -1 + + try: + data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist() + if len(data['baselinesub']) > 0: + data['baselinesub'] = data['baselinesub'][0] + except AnalysisException: + data['baselinesub'] = 0 # Timestamp # - time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \ - .toPandas() - data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() - - # Integral Data - integral_hg = np.zeros(3564, dtype=np.int) - for T in array_dataset_hg.values: - if len(T[0]) == 3564: - integral_hg = np.add(integral_hg, T[0]) - data['integral_hg'] = integral_hg.tolist() - - integral_lg = np.zeros(3564, dtype=np.int) - for T in array_dataset_lg.values: - if len(T[0]) == 3564: - integral_lg = np.add(integral_lg, T[0]) - data['integral_lg'] = integral_lg.tolist() + try: + time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \ + .toPandas() + data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() + except AnalysisException: + data['timestamps'] = [] # Writing to db if _db_write: - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET integral = %s where id = %s', (json.dumps(data), data_id,)) + db_put_response = requests.put(API_HOST + 'integral', + params={ + 'id': _data_id + }, + json=data) + + write_file(_device, _start_time, _end_time, _property, data, _files_write) - write_file(_device, _start_time, _end_time, _property, data) + return data # noinspection SqlResolve -def pull_raw_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time): +def pull_raw_dist_pandas(_fill_name, + _mode, + _device, + _start_time, + _end_time, + 
_files_write=False, + _db_write=True, + _data_id=None): _property = "LoggingRawDist" + if (_data_id is None or _data_id == -1) and _db_write: + ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time) + if not ok or _data_id is None or _data_id == -1: + return -1 + data = {} df = DataFrame(KeyValuesQuery.builder(spark).system("CMW") @@ -248,68 +472,111 @@ def pull_raw_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time): .keyValue("property", _property) .buildDataset(), sql_context) - # data['rows'] = df.count() df.registerTempTable("pulled_data") # Process hg Data # - array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \ - .toPandas() + try: + array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \ + .toPandas() + + result_distrib_hg = np.zeros(2048, dtype=np.int) + for R in array_dataset_hg.values: + if len(R[0]) == 2048: + result_distrib_hg = np.add(result_distrib_hg, R[0]) + data['distribution_hg'] = result_distrib_hg.tolist() + except AnalysisException: + array_dataset_hg = None + data['distribution_hg'] = [] # Process lg Data # - array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \ - .toPandas() + try: + array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \ + .toPandas() + result_distrib_lg = np.zeros(2048, dtype=np.int) + for R in array_dataset_lg.values: + if len(R[0]) == 2048: + result_distrib_lg = np.add(result_distrib_lg, R[0]) + data['distribution_lg'] = result_distrib_lg.tolist() + except AnalysisException: + array_dataset_lg = None + data['distribution_lg'] = [] + + # Metadata + if array_dataset_hg is not None: + data['rows'] = array_dataset_hg.shape[0] + elif array_dataset_lg is not None: + data['rows'] = array_dataset_lg.shape[0] + else: + data['rows'] = 0 - # DATA # - data['rows'] = array_dataset_hg.shape[0] data['startTime'] = _start_time data['endTime'] = _end_time - data['lsbcut_hg'] = spark.sql("select 
lsbcut_hg from pulled_data limit 1").toPandas().values.tolist() - if len(data['lsbcut_hg']) > 0: - data['lsbcut_hg'] = data['lsbcut_hg'][0] - - data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist() - if len(data['offset_hg']) > 0: - data['offset_hg'] = data['offset_hg'][0] - - data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist() - if len(data['lsbcut_lg']) > 0: - data['lsbcut_lg'] = data['lsbcut_lg'][0] - - data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist() - if len(data['offset_lg']) > 0: - data['offset_lg'] = data['offset_lg'][0] - - # Array Data # - result_distrib_hg = np.zeros(2048, dtype=np.int) - for R in array_dataset_hg.values: - if len(R[0]) == 2048: - result_distrib_hg = np.add(result_distrib_hg, R[0]) - data['distribution_hg'] = result_distrib_hg.tolist() - - result_distrib_lg = np.zeros(2048, dtype=np.int) - for R in array_dataset_lg.values: - if len(R[0]) == 2048: - result_distrib_lg = np.add(result_distrib_lg, R[0]) - data['distribution_lg'] = result_distrib_lg.tolist() + try: + data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['lsbcut_hg']) > 0: + data['lsbcut_hg'] = data['lsbcut_hg'][0] + except AnalysisException: + data['lsbcut_hg'] = 0 + + try: + data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['offset_hg']) > 0: + data['offset_hg'] = data['offset_hg'][0] + except AnalysisException: + data['offset_hg'] = 0 + + try: + data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist() + if len(data['lsbcut_lg']) > 0: + data['lsbcut_lg'] = data['lsbcut_lg'][0] + except AnalysisException: + data['lsbcut_lg'] = 0 + + try: + data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist() + if 
len(data['offset_lg']) > 0: + data['offset_lg'] = data['offset_lg'][0] + except AnalysisException: + data['offset_lg'] = 0 # Timestamp # - time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \ - .toPandas() - data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() + try: + time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \ + .toPandas() + data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() + except AnalysisException: + data['timestamps'] = [] # Writing to db if _db_write: - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET raw_dist = %s where id = %s', (json.dumps(data), data_id,)) + db_put_response = requests.put(API_HOST + 'raw_dist', + params={ + 'id': _data_id + }, + json=data) - write_file(_device, _start_time, _end_time, _property, data) + write_file(_device, _start_time, _end_time, _property, data, _files_write) + + return data # noinspection SqlResolve -def pull_integral_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time): +def pull_integral_dist_pandas(_fill_name, + _mode, + _device, + _start_time, + _end_time, + _files_write=False, + _db_write=True, + _data_id=None): _property = "LoggingIntegralDist" + if (_data_id is None or _data_id == -1) and _db_write: + ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time) + if not ok or _data_id is None or _data_id == -1: + return -1 + data = {} df = DataFrame(KeyValuesQuery.builder(spark).system("CMW") @@ -320,72 +587,119 @@ def pull_integral_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time .keyValue("property", _property) .buildDataset(), sql_context) - # data['rows'] = df.count() df.registerTempTable("pulled_data") # Process hg Data # - array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \ - .toPandas() + try: + array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \ + .toPandas() + + result_distrib_hg = np.zeros(2048, 
dtype=np.int) + for R in array_dataset_hg.values: + if len(R[0]) == 2048: + result_distrib_hg = np.add(result_distrib_hg, R[0]) + data['distribution_hg'] = result_distrib_hg.tolist() + except AnalysisException: + data['distribution_hg'] = [] + array_dataset_hg = None # Process lg Data # - array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \ - .toPandas() + try: + array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \ + .toPandas() + + result_distrib_lg = np.zeros(2048, dtype=np.int) + for R in array_dataset_lg.values: + if len(R[0]) == 2048: + result_distrib_lg = np.add(result_distrib_lg, R[0]) + data['distribution_lg'] = result_distrib_lg.tolist() + except AnalysisException: + array_dataset_lg = None + data['distribution_lg'] = [] + + # Metadata # + if array_dataset_hg is not None: + data['rows'] = array_dataset_hg.shape[0] + elif array_dataset_lg is not None: + data['rows'] = array_dataset_lg.shape[0] + else: + data['rows'] = 0 - # DATA # - data['rows'] = array_dataset_hg.shape[0] data['startTime'] = _start_time data['endTime'] = _end_time - data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values.tolist() - if len(data['lsbcut_hg']) > 0: - data['lsbcut_hg'] = data['lsbcut_hg'][0] - - data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist() - if len(data['offset_hg']) > 0: - data['offset_hg'] = data['offset_hg'][0] - - data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist() - if len(data['lsbcut_lg']) > 0: - data['lsbcut_lg'] = data['lsbcut_lg'][0] - - data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist() - if len(data['offset_lg']) > 0: - data['offset_lg'] = data['offset_lg'][0] - - data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist() - if len(data['nturns']) > 0: - data['nturns'] = 
data['nturns'][0] - - # Array Data # - result_distrib_hg = np.zeros(2048, dtype=np.int) - for R in array_dataset_hg.values: - if len(R[0]) == 2048: - result_distrib_hg = np.add(result_distrib_hg, R[0]) - data['distribution_hg'] = result_distrib_hg.tolist() - - result_distrib_lg = np.zeros(2048, dtype=np.int) - for R in array_dataset_lg.values: - if len(R[0]) == 2048: - result_distrib_lg = np.add(result_distrib_lg, R[0]) - data['distribution_lg'] = result_distrib_lg.tolist() + try: + data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['lsbcut_hg']) > 0: + data['lsbcut_hg'] = data['lsbcut_hg'][0] + except AnalysisException: + data['lsbcut_hg'] = 0 + + try: + data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['offset_hg']) > 0: + data['offset_hg'] = data['offset_hg'][0] + except AnalysisException: + data['offset_hg'] = 0 + + try: + data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist() + if len(data['lsbcut_lg']) > 0: + data['lsbcut_lg'] = data['lsbcut_lg'][0] + except AnalysisException: + data['lsbcut_lg'] = 0 + + try: + data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist() + if len(data['offset_lg']) > 0: + data['offset_lg'] = data['offset_lg'][0] + except AnalysisException: + data['offset_lg'] = 0 + + try: + data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist() + if len(data['nturns']) > 0: + data['nturns'] = data['nturns'][0] + except AnalysisException: + data['nturns'] = 0 # Timestamp # - time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \ - .toPandas() - data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() + try: + time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \ + .toPandas() + data['timestamps'] = 
time_stamp['__record_timestamp__'].values.tolist() + except AnalysisException: + data['timestamps'] = [] # Writing to db if _db_write: - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET integral_dist = %s where id = %s', (json.dumps(data), data_id,)) + requests.put(API_HOST + 'integral_dist', + params={ + 'id': _data_id + }, + json=data) - write_file(_device, _start_time, _end_time, _property, data) + write_file(_device, _start_time, _end_time, _property, data, _files_write) + + return data # noinspection SqlResolve -def pull_turnloss_pandas(_fill_name, _mode, _device, _start_time, _end_time): +def pull_turnloss_pandas(_fill_name, + _mode, + _device, + _start_time, + _end_time, + _files_write=False, + _db_write=True, + _data_id=None): _property = "LoggingTurnLoss" + if (_data_id is None or _data_id == -1) and _db_write: + ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time) + if not ok or _data_id is None or _data_id == -1: + return -1 + data = {} df = DataFrame(KeyValuesQuery.builder(spark).system("CMW") @@ -396,123 +710,138 @@ def pull_turnloss_pandas(_fill_name, _mode, _device, _start_time, _end_time): .keyValue("property", _property) .buildDataset(), sql_context) - # data['rows'] = df.count() df.registerTempTable("pulled_data") # Process hg Data # - array_dataset_hg = spark.sql("select turnloss_hg_com.elements from pulled_data") \ - .toPandas() + try: + array_dataset_hg = spark.sql("select turnloss_hg_com.elements from pulled_data") \ + .toPandas() - array_index_hg = spark.sql("select turnloss_hg_ind.elements from pulled_data") \ - .toPandas() + array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + data['turnloss_hg_intime'] = array_dataset_hg['Total'].values.tolist() - array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + except AnalysisException: + array_dataset_hg = None + data['turnloss_hg_intime'] = [] - # Process lg 
Data # - array_dataset_lg = spark.sql("select turnloss_lg_com.elements from pulled_data") \ - .toPandas() - - array_index_lg = spark.sql("select turnloss_lg_ind.elements from pulled_data") \ - .toPandas() + try: + array_index_hg = spark.sql("select turnloss_hg_ind.elements from pulled_data") \ + .toPandas() + except AnalysisException: + array_index_hg = None - array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + # Process lg Data # + try: + array_dataset_lg = spark.sql("select turnloss_lg_com.elements from pulled_data") \ + .toPandas() + + array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x)) + data['turnloss_lg_intime'] = array_dataset_lg['Total'].values.tolist() + + except AnalysisException: + array_dataset_lg = None + data['turnloss_lg_intime'] = [] + + try: + array_index_lg = spark.sql("select turnloss_lg_ind.elements from pulled_data") \ + .toPandas() + except AnalysisException: + array_index_lg = None + + # Metadata # + if array_dataset_hg is not None: + data['rows'] = array_dataset_hg.shape[0] + elif array_dataset_lg is not None: + data['rows'] = array_dataset_lg.shape[0] + else: + data['rows'] = 0 - # DATA # - data['rows'] = array_dataset_hg.shape[0] data['startTime'] = _start_time data['endTime'] = _end_time - data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist() - if len(data['zsthr_hg']) > 0: - data['zsthr_hg'] = data['zsthr_hg'][0] - - data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist() - if len(data['zsthr_lg']) > 0: - data['zsthr_lg'] = data['zsthr_lg'][0] - - data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist() - if len(data['baselinesub']) > 0: - data['baselinesub'] = data['baselinesub'][0] - - data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist() - if 
len(data['nturns']) > 0: - data['nturns'] = data['nturns'][0] + try: + data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist() + if len(data['zsthr_hg']) > 0: + data['zsthr_hg'] = data['zsthr_hg'][0] + except AnalysisException: + data['zsthr_hg'] = 0 + + try: + data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist() + if len(data['zsthr_lg']) > 0: + data['zsthr_lg'] = data['zsthr_lg'][0] + except AnalysisException: + data['zsthr_lg'] = 0 + + try: + data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist() + if len(data['baselinesub']) > 0: + data['baselinesub'] = data['baselinesub'][0] + except AnalysisException: + data['baselinesub'] = 0 + + try: + data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist() + if len(data['nturns']) > 0: + data['nturns'] = data['nturns'][0] + except AnalysisException: + data['nturns'] = 0 # Timestamp # - time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \ - .toPandas() - data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() - - # Intime Data # - data['turnloss_hg_intime'] = array_dataset_hg['Total'].values.tolist() - data['turnloss_lg_intime'] = array_dataset_lg['Total'].values.tolist() + try: + time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \ + .toPandas() + data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() + except AnalysisException: + data['timestamps'] = [] # Turnloss Data - data['turnloss_hg'] = uncompress_pandas(array_dataset_hg.values, - array_index_hg.values, - data['nturns'], data['rows']) + if array_dataset_hg is None or array_index_hg is None: + data['turnloss_hg'] = [] + else: + data['turnloss_hg'] = uncompress_pandas(array_dataset_hg.values, + array_index_hg.values, + data['nturns'], data['rows']) - data['turnloss_lg'] = uncompress_pandas(array_dataset_lg.values, - 
def property_pull(_fill_name, _mode, _device, _start, _end, _files_write=False, _db_write=True):
    """Pull every NXCALS property for one fill / mode / device time window.

    First consults the REST API (via ``check_db``) to verify that the mode and
    device are known and that this measurement has not already been completed;
    aborts early otherwise.  On success, each property (histogram, integral,
    raw distribution, integral distribution, turnloss) is pulled and pushed
    under the same ``data_id``.

    :param _fill_name: fill identifier (name/number) the data belongs to
    :param _mode: beam-mode name for the window (e.g. 'ALL')
    :param _device: BLM device name to pull
    :param _start: window start, format 'yyyy-mm-dd hh:mm:ss.ms'
    :param _end: window end, same format
    :param _files_write: when True, also write .dat output files
    :param _db_write: when True, push the pulled data to the API/DB
    :return: 0 on success, -1 when the pull is refused by check_db
    """
    ok, data_id = check_db(_fill_name, _mode, _device, _start, _end)
    # check_db returns (False, -1) for an unknown mode/device or an already
    # completed measurement; without a usable data_id there is nothing to do.
    if not ok or data_id is None or data_id == -1:
        return -1

    print('Pulling data for fill '
          + str(_fill_name)
          + ' for device ' + _device
          + ' mode ' + _mode
          + ' from ' + _start + ' to ' + _end)

    # All pull_* helpers share the same signature; run them sequentially so
    # every property is stored against the same data_id.
    pullers = (pull_histogram_pandas,
               pull_integral_pandas,
               pull_raw_dist_pandas,
               pull_integral_dist_pandas,
               pull_turnloss_pandas)
    for puller in pullers:
        puller(_fill_name, _mode, _device, _start, _end,
               _files_write, _db_write, data_id)

    print('Pulled done, data pushed on DB for fill '
          + str(_fill_name)
          + ' for device ' + _device
          + ' mode ' + _mode
          + ' from ' + _start + ' to ' + _end)
    return 0
sys.argv[1] diff --git a/routine_pull.py b/routine_pull.py index 528c10493002a393012d4a5dcdf35901fcf606e2..42b01df4f80100e684b561aa6a37a82ed20cca43 100644 --- a/routine_pull.py +++ b/routine_pull.py @@ -1,7 +1,9 @@ -import mysql.connector import time -import property_pull +import mysql.connector +import requests + +from property_pull import property_pull import pytimber as pytimber import datetime as dt @@ -18,6 +20,8 @@ mydb = mysql.connector.connect( database=cfg.mysql['database'] ) +API_HOST = 'http://test-nxdblm-api2.web.cern.ch/' + def start_pull(_fill_infos, _devices=None): if _devices is None: @@ -29,17 +33,18 @@ def start_pull(_fill_infos, _devices=None): for iDevice in _devices: - property_pull.property_pull(_fill_infos['fillNumber'], 'ALL', iDevice, - _fill_infos['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"), - _fill_infos['endTime'].strftime("%Y-%m-%d %H:%M:%S.000")) + property_pull(_fill_infos['fillNumber'], 'ALL', iDevice, + _fill_infos['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"), + _fill_infos['endTime'].strftime("%Y-%m-%d %H:%M:%S.000"), + _files_write=False) for iMode in _fill_infos["beamModes"]: print(iMode['mode']) - property_pull.property_pull(_fill_infos['fillNumber'], iMode['mode'], iDevice, - iMode['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"), - iMode['endTime'].strftime("%Y-%m-%d %H:%M:%S.000")) + property_pull(_fill_infos['fillNumber'], iMode['mode'], iDevice, + iMode['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"), + iMode['endTime'].strftime("%Y-%m-%d %H:%M:%S.000")) - '''p = Process(target=property_pull.property_pull, + '''p = Process(target=cern_nxcals_dblm_property_pull.cern_nxcals_dblm_property_pull, args=(fill_infos['fillNumber'], iMode['mode'], 'BLMED.06L7.B1B30_TCPA.A6L7.B1', iMode['startTime'].strftime("%y-%m-%d %H:%M:%S.000"), iMode['endTime'].strftime("%y-%m-%d %H:%M:%S.000"))) @@ -50,17 +55,16 @@ def start_pull(_fill_infos, _devices=None): def routine_pull(_start_fill=None, _end_fill=None): ldb = pytimber.LoggingDB() - 
gcursor = mydb.cursor() - _start_fill = None - _end_fill = None if _start_fill is None and _end_fill is None: res = ldb.get('HX:FILLN', dt.datetime.now()) new_fill_number = int(res['HX:FILLN'][1][0]) print(new_fill_number) - gcursor.execute('select max(name) from fill limit 1;') - last_fill_pulled = gcursor.fetchone()[0] + # gcursor.execute('select max(name) from Fill limit 1;') + # last_fill_pulled = gcursor.fetchone()[0] + response = requests.get(API_HOST + 'fill/last') + last_fill_pulled = response.json()['name'] print(last_fill_pulled) if new_fill_number == last_fill_pulled: @@ -68,25 +72,19 @@ def routine_pull(_start_fill=None, _end_fill=None): return -1 else: new_fill_number = _end_fill - last_fll_pulled = _start_fill - 1 + last_fill_pulled = _start_fill - # Select only the first 6 devices Cause: not all device works properly with NXCALS yet - gcursor.execute('select name from device limit 6') - devices_request = gcursor.fetchall() + response = requests.get(API_HOST + 'devices') devices = [] - for device in devices_request: - devices.append(device[0]) + for device in response.json()['devices']: + devices.append(device['name']) - gcursor.close() - mydb.close() + # fill_number = 7444 + # fill_infos = ldb.getLHCFillData(fill_number, False) + # start_pull(fill_infos, devices) - fill_number = 7472 - fill_infos = ldb.getLHCFillData(fill_number, False) - start_pull(fill_infos, devices) - - ''' - for iFill in range(last_fill_pulled + 1, new_fill_number): - fill_infos = ldb.getLHCFillData(iFill, False) + for iFill in range(int(last_fill_pulled), int(new_fill_number)): + fill_infos = ldb.getLHCFillData(str(iFill), False) # check if the fill is finished if fill_infos['endTime'] is None: @@ -94,15 +92,11 @@ def routine_pull(_start_fill=None, _end_fill=None): # check if data is too new midnight_stamp = dt.datetime.now().replace(second=0, minute=0, hour=0) - dt.timedelta(days=1) - if time.mktime(midnight_stamp.timetuple()) < fill_infos['endTime']: + if midnight_stamp < 
fill_infos['endTime']: continue start_pull(fill_infos, devices) - ''' if __name__ == '__main__': - routine_pull() - - - + routine_pull(_start_fill=7440, _end_fill=7494)