diff --git a/.gitignore b/.gitignore
index db57f0e79a948bbd737e232d305675be292d62bf..e6d0ca6c5e3b155affde8a3b4a0703194ac00de0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
 pyconf.py
+pulledData/
+plotOutput/
+prop_pull_api/
\ No newline at end of file
diff --git a/property_pull.py b/property_pull.py
index 56507b1dde8b34e7941c86941494134d2b9de002..d236b5723b6963f1c60f9be0794ea1ae6e33713b 100644
--- a/property_pull.py
+++ b/property_pull.py
@@ -1,7 +1,8 @@
 #!spark-submit --master yarn
-
 import json
 import sys
+import requests
+from requests.exceptions import HTTPError
 import mysql.connector
 import pyconf as cfg
@@ -10,7 +11,6 @@ from cern.nxcals.pyquery.builders import *
 from pyspark import SparkConf
 from pyspark import SparkContext
 from pyspark.sql import *
-from pyspark.accumulators import AccumulatorParam
 from pyspark.sql.functions import udf, col
 from pyspark.sql.types import IntegerType
 from pyspark.sql.utils import AnalysisException
@@ -20,6 +20,8 @@ from multiprocessing import Process
 import numpy as np
 import pandas as pd
 
+API_HOST = 'http://test-nxdblm-api2.web.cern.ch/'
+
 _dest_dir = './pulledData'
 
 # mysqlDB variables
@@ -31,13 +33,10 @@ mydb = mysql.connector.connect(
     database=cfg.mysql['database']
 )
 
-# id of the sql data's row for this filling
-data_id = -1
-
 # Sparks Variable
 conf = SparkConf()
 conf.setMaster('yarn')
-conf.setAppName('property_pull.py')
+conf.setAppName('cern_nxcals_dblm_property_pull.py')
 
 sc = SparkContext(conf=conf)
 sql_context = SQLContext(sc)
@@ -64,6 +63,17 @@ def write_file(_device, _start_time, _end_time, _property, data, _files_write):
           + _end_time.replace(' ', '_') + '_' + _property + '.dat' + " Pulled")
 
 
+''' uncompress_pandas
+Apply the decompression process described in the NXCALS compression documentation:
+zeros are removed from the data array; the indices of the non-zero values are kept in a separate array.
+Input :
+    _data_array : data retrieved from NXCALS for a compressed data type
+    _index_array : indices of the non-zero data
+    _size : size of the final array
+    _nrows : number of rows given by NXCALS
+'''
+
+
 def uncompress_pandas(_data_array, _index_array, _size, _nrows):
     print('uncompress start')
     result = np.zeros(_size, dtype=np.int64)
@@ -75,10 +85,119 @@ def uncompress_pandas(_data_array, _index_array, _size, _nrows):
     return result.tolist()
 
 
+''' check_db
+Check several conditions against the database:
+    - the device & mode are registered in the database
+    - whether the measurement has already been done
+    - if already done, check that the data is complete
+    - if not already done,
+        - check whether the fill & fill_mode already exist (create them if not)
+        - init & create a new measure
+'''
+
+
+def check_db(_fill_name, _mode, _device, _start, _end):
+    try:
+        response = requests.get(API_HOST + 'mode',
+                                params={
+                                    'name': _mode
+                                })
+        if response.status_code == 204:
+            print('Mode ' + _mode + ' unknown')
+            return False, -1
+
+        response = requests.get(API_HOST + 'device',
+                                params={
+                                    'name': _device
+                                })
+        if response.status_code == 204:
+            print('Device ' + _device + ' unknown')
+            return False, -1
+        device_id = response.json()['id']
+
+        response = requests.get(API_HOST + 'measure',
+                                params={
+                                    'device_id': device_id,
+                                    'mode_name': _mode,
+                                    'fill_name': _fill_name,
+                                    'start': _start
+                                })
+
+        if response.status_code != 204:
+            data_id = response.json()['data_id']
+            response = requests.get(API_HOST + 'data_complete', params={'id': data_id})
+            if response.status_code == 200:
+                if response.json()['complete']:
+                    print('Data already pulled for fill ' + str(_fill_name) +
+                          ' for device ' + _device +
+                          ' mode ' + _mode +
+                          ' from ' + _start + ' to ' + _end)
+                    return False, -1
+                else:
+                    return True, data_id
+            else:
+                print('error during data completeness check')
+                return False, -1
+
+        else:
+            response = requests.post(API_HOST + 'new_data')
+            data_id = response.json()['id']
+
+            response = requests.get(API_HOST + 'fill/metadata/name', params={'name': _fill_name})
+            if response.status_code == 204:
+                requests.post(API_HOST + 'new_fill',
+                              params={
+                                  'name': _fill_name,
+                                  'start': _start,
+                                  'end': _end
+                              })
+
+            response = requests.get(API_HOST + 'fill_mode',
+                                    params={
+                                        'mode_name': _mode,
+                                        'fill_name': _fill_name,
+                                        'start': _start,
+                                        'end': _end,
+                                    })
+            if response.status_code == 204:
+                response = requests.post(API_HOST + 'new_fill_mode',
+                                         params={
+                                             'fill_name': _fill_name,
+                                             'mode_name': _mode,
+                                             'start': _start,
+                                             'end': _end,
+                                         })
+                fill_mode_id = response.json()['id']
+            else:
+                fill_mode_id = response.json()['id']
+
+            requests.post(API_HOST + 'new_measure',
+                          params={
+                              'fill_mode_id': fill_mode_id,
+                              'data_id': data_id,
+                              'device_id': device_id
+                          })
+            return True, data_id
+    except HTTPError as err:
+        print('HTTPError: ' + str(err))
+        return False, -1
+
+
 # noinspection SqlResolve
-def pull_histogram_pandas(_fill_name, _mode, _device, _start_time, _end_time, _files_write=False, _db_write=True):
+def pull_histogram_pandas(_fill_name, _mode,
+                          _device,
+                          _start_time,
+                          _end_time,
+                          _files_write=False,
+                          _db_write=True,
+                          _data_id=None):
     _property = 'LoggingHistogram'
 
+    if (_data_id is None or _data_id == -1) and _db_write:
+        ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
+        if not ok or _data_id is None or _data_id == -1:
+            return -1
+
     data = {}
 
     df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
@@ -184,8 +303,11 @@ def pull_histogram_pandas(_fill_name, _mode, _device, _start_time, _end_time, _f
 
     # Writing to db
     if _db_write:
-        db_cursor = mydb.cursor()
-        db_cursor.execute('UPDATE Data SET histogram = %s where id = %s', (json.dumps(data), data_id,))
+        requests.put(API_HOST + 'histogram',
+                     params={
+                         'id': _data_id
+                     },
+                     json=data)
 
     write_file(_device, _start_time, _end_time, _property, data, _files_write)
 
@@ -193,9 +315,21 @@ def pull_histogram_pandas(_fill_name, _mode, _device, _start_time, _end_time, _f
 
 # noinspection SqlResolve
-def pull_integral_pandas(_fill_name, _mode, _device, _start_time, _end_time, _files_write=False, _db_write=True):
+def pull_integral_pandas(_fill_name,
+                         _mode,
+                         _device,
+                         _start_time,
+                         _end_time,
+                         _files_write=False,
+                         _db_write=True,
+                         _data_id=None):
     _property = "LoggingIntegral"
 
+    if (_data_id is None or _data_id == -1) and _db_write:
+        ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
+        if not ok or _data_id is None or _data_id == -1:
+            return -1
+
     data = {}
 
     df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
@@ -301,8 +435,11 @@ def pull_integral_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
 
     # Writing to db
     if _db_write:
-        db_cursor = mydb.cursor()
-        db_cursor.execute('UPDATE Data SET integral = %s where id = %s', (json.dumps(data), data_id,))
+        requests.put(API_HOST + 'integral',
+                     params={
+                         'id': _data_id
+                     },
+                     json=data)
 
     write_file(_device, _start_time, _end_time, _property, data, _files_write)
 
@@ -310,9 +447,21 @@ def pull_integral_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
 
 # noinspection SqlResolve
-def pull_raw_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time, _files_write=False, _db_write=True):
+def pull_raw_dist_pandas(_fill_name,
+                         _mode,
+                         _device,
+                         _start_time,
+                         _end_time,
+                         _files_write=False,
+                         _db_write=True,
+                         _data_id=None):
     _property = "LoggingRawDist"
 
+    if (_data_id is None or _data_id == -1) and _db_write:
+        ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
+        if not ok or _data_id is None or _data_id == -1:
+            return -1
+
     data = {}
 
     df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
@@ -401,8 +550,11 @@ def pull_raw_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
 
     # Writing to db
     if _db_write:
-        db_cursor = mydb.cursor()
-        db_cursor.execute('UPDATE Data SET raw_dist = %s where id = %s', (json.dumps(data), data_id,))
+        requests.put(API_HOST + 'raw_dist',
+                     params={
+                         'id': _data_id
+                     },
+                     json=data)
 
     write_file(_device, _start_time, _end_time, _property, data, _files_write)
 
@@ -410,9 +562,21 @@ def pull_raw_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
 
 # noinspection SqlResolve
-def pull_integral_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time, _files_write=False, _db_write=True):
+def pull_integral_dist_pandas(_fill_name,
+                              _mode,
+                              _device,
+                              _start_time,
+                              _end_time,
+                              _files_write=False,
+                              _db_write=True,
+                              _data_id=None):
     _property = "LoggingIntegralDist"
 
+    if (_data_id is None or _data_id == -1) and _db_write:
+        ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
+        if not ok or _data_id is None or _data_id == -1:
+            return -1
+
     data = {}
 
     df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
@@ -509,8 +673,11 @@ def pull_integral_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time
 
     # Writing to db
     if _db_write:
-        db_cursor = mydb.cursor()
-        db_cursor.execute('UPDATE Data SET integral_dist = %s where id = %s', (json.dumps(data), data_id,))
+        requests.put(API_HOST + 'integral_dist',
+                     params={
+                         'id': _data_id
+                     },
+                     json=data)
 
     write_file(_device, _start_time, _end_time, _property, data, _files_write)
 
@@ -518,9 +685,21 @@ def pull_integral_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time
 
 # noinspection SqlResolve
-def pull_turnloss_pandas(_fill_name, _mode, _device, _start_time, _end_time, _files_write=False, _db_write=True):
+def pull_turnloss_pandas(_fill_name,
+                         _mode,
+                         _device,
+                         _start_time,
+                         _end_time,
+                         _files_write=False,
+                         _db_write=True,
+                         _data_id=None):
     _property = "LoggingTurnLoss"
 
+    if (_data_id is None or _data_id == -1) and _db_write:
+        ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
+        if not ok or _data_id is None or _data_id == -1:
+            return -1
+
     data = {}
 
     df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
@@ -633,8 +812,11 @@ def pull_turnloss_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
 
     # Writing to db
     if _db_write:
-        db_cursor = mydb.cursor()
-        db_cursor.execute('UPDATE Data SET turnloss = %s where id = %s', (json.dumps(data), data_id,))
+        requests.put(API_HOST + 'turnloss',
+                     params={
+                         'id': _data_id
+                     },
+                     json=data)
 
     write_file(_device, _start_time, _end_time, _property, data, _files_write)
 
@@ -642,74 +824,24 @@ def pull_turnloss_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
 
 
 def property_pull(_fill_name, _mode, _device, _start, _end, _files_write=False, _db_write=True):
-    gcursor = mydb.cursor()
-
-    gcursor.execute('select count(*) from Mode where name=%s', (_mode,))
-    mode_exists = gcursor.fetchone()[0]
-    if mode_exists == 0:
-        print('Mode ' + _mode + ' unknown ')
-        return -1
-
-    gcursor.execute('SELECT id FROM Device where name=%s limit 1', (_device,))
-    device_id = gcursor.fetchone()[0]
-    if device_id is None:
-        print('Device ' + _device + ' unknown')
-        return -1
-
-    gcursor.execute("select count(*) from Measure where device_id=%s "
-                    "and fill_mode_id=( select id from FillMode "
-                    "where mode_id=(select id from Mode where name=%s) "
-                    "and fill_id=(select id from Fill where name=%s)"
-                    "and start=%s)", (device_id, _mode, _fill_name, _start))
-
-    already_done = gcursor.fetchone()[0]
-    if already_done != 0:
-        print('Data for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end + ' already pulled')
+    ok, data_id = check_db(_fill_name, _mode, _device, _start, _end)
+    if not ok or data_id is None or data_id == -1:
         return -1
 
     print('Pulling data for fill ' + str(_fill_name)
           + ' for device ' + _device
          + ' mode ' + _mode
          + ' from ' + _start + ' to ' + _end)
 
-    global data_id
-
-    gcursor.execute("insert into Data(histogram, integral, integral_dist, raw_dist, turnloss) "
-                    "VALUES ( '{}', '{}', '{}', '{}', '{}');")
-    data_id = gcursor.lastrowid
-
-    gcursor.execute("select count(*) from Fill where name=%s", (_fill_name,))
-    if gcursor.fetchone()[0] == 0:
-        gcursor.execute("insert into Fill(name, start, end) values (%s, %s, %s)", (_fill_name, _start, _end))
-    # print(data_id)
-
-    gcursor.execute("select count(*) from FillMode "
-                    "where mode_id=(select id from Mode where name=%s) "
-                    "and fill_id=(select id from Fill where name=%s) "
-                    "and start=%s and end=%s",
-                    (_mode, _fill_name, _start, _end))
-    if gcursor.fetchone()[0] == 0:
-        gcursor.execute("insert into FillMode(fill_id, mode_id, start, end) "
-                        "values ((select id from Fill where name=%s),(select id from Mode where name=%s), %s, %s)",
-                        (_fill_name, _mode, _start, _end))
-        fill_mode_id = gcursor.lastrowid
-    else:
-        gcursor.execute("select id from FillMode where mode_id=(select id from Mode where name=%s) "
-                        "and fill_id=(select id from Fill where name=%s) "
-                        "and start=%s and end=%s",
-                        (_mode, _fill_name, _start, _end))
-        fill_mode_id = gcursor.fetchone()[0]
-
-    gcursor.execute('INSERT INTO Measure(fill_mode_id, data_id, device_id)'
-                    'VALUES ( %s, %s, %s )',
-                    (fill_mode_id, data_id, device_id))
-
-    pull_histogram_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
-    pull_integral_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
-    pull_raw_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
-    pull_integral_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
-    pull_turnloss_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
-
-    mydb.commit()
+
+    pull_histogram_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
+    pull_integral_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
+    pull_raw_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
+    pull_integral_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
+    pull_turnloss_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
+
     print('Pulled done, data pushed on DB for fill ' + str(_fill_name)
           + ' for device ' + _device
           + ' mode ' + _mode
          + ' from ' + _start + ' to ' + _end)
 
+    return 0
+
 
 if __name__ == '__main__':
@@ -736,10 +868,10 @@ if __name__ == '__main__':
               " format: 'yyyy-mm-dd hh:mm:ss.ms' or 'yyyy-mm-dd_hh:mm:ss.ms'")
         sys.exit(0)
     elif len(sys.argv) < 6:
-        print("usage: python property_pull.py <fillName> <device> <mode> <start> <end> [-f] \n"
+        print("usage: python cern_nxcals_dblm_property_pull.py <fillName> <device> <mode> <start> <end> [-f] \n"
              "where : \n"
              " -f also generate .dat files \n"
-              "type 'python property_pull.py -h' for more infos")
+              "type 'python cern_nxcals_dblm_property_pull.py -h' for more info")
         sys.exit(1)
     else:
         _fill_name = sys.argv[1]
diff --git a/routine_pull.py b/routine_pull.py
index 54e35f0f1c0056f70b01dcce0716fe80d33c630e..42b01df4f80100e684b561aa6a37a82ed20cca43 100644
--- a/routine_pull.py
+++ b/routine_pull.py
@@ -1,7 +1,9 @@
-import mysql.connector
 import time
-import property_pull
+import mysql.connector
+import requests
+
+from property_pull import property_pull
 import pytimber as pytimber
 import datetime as dt
@@ -18,6 +20,8 @@ mydb = mysql.connector.connect(
     database=cfg.mysql['database']
 )
 
+API_HOST = 'http://test-nxdblm-api2.web.cern.ch/'
+
 
 def start_pull(_fill_infos, _devices=None):
     if _devices is None:
@@ -29,18 +33,18 @@ def start_pull(_fill_infos, _devices=None):
 
     for iDevice in _devices:
-        property_pull.property_pull(_fill_infos['fillNumber'], 'ALL', iDevice,
-                                    _fill_infos['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
-                                    _fill_infos['endTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
-                                    _files_write=False)
+        property_pull(_fill_infos['fillNumber'], 'ALL', iDevice,
+                      _fill_infos['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
+                      _fill_infos['endTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
+                      _files_write=False)
 
         for iMode in _fill_infos["beamModes"]:
             print(iMode['mode'])
-            property_pull.property_pull(_fill_infos['fillNumber'], iMode['mode'], iDevice,
-                                        iMode['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
-                                        iMode['endTime'].strftime("%Y-%m-%d %H:%M:%S.000"))
+            property_pull(_fill_infos['fillNumber'], iMode['mode'], iDevice,
+                          iMode['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
+                          iMode['endTime'].strftime("%Y-%m-%d %H:%M:%S.000"))
 
-            '''p = Process(target=property_pull.property_pull,
+            '''p = Process(target=cern_nxcals_dblm_property_pull.cern_nxcals_dblm_property_pull,
                     args=(fill_infos['fillNumber'], iMode['mode'], 'BLMED.06L7.B1B30_TCPA.A6L7.B1',
                           iMode['startTime'].strftime("%y-%m-%d %H:%M:%S.000"),
                           iMode['endTime'].strftime("%y-%m-%d %H:%M:%S.000")))
@@ -51,17 +55,16 @@ def routine_pull(_start_fill=None, _end_fill=None):
     ldb = pytimber.LoggingDB()
-    gcursor = mydb.cursor()
-    _start_fill = None
-    _end_fill = None
 
     if _start_fill is None and _end_fill is None:
         res = ldb.get('HX:FILLN', dt.datetime.now())
         new_fill_number = int(res['HX:FILLN'][1][0])
         print(new_fill_number)
 
-        gcursor.execute('select max(name) from Fill limit 1;')
-        last_fill_pulled = gcursor.fetchone()[0]
+        # gcursor.execute('select max(name) from Fill limit 1;')
+        # last_fill_pulled = gcursor.fetchone()[0]
+        response = requests.get(API_HOST + 'fill/last')
+        last_fill_pulled = int(response.json()['name'])
         print(last_fill_pulled)
 
         if new_fill_number == last_fill_pulled:
@@ -69,24 +72,19 @@
             return -1
     else:
         new_fill_number = _end_fill
-        last_fll_pulled = _start_fill - 1
+        last_fill_pulled = _start_fill
 
-    gcursor.execute('select name from Device')
-    devices_request = gcursor.fetchall()
+    response = requests.get(API_HOST + 'devices')
     devices = []
-    for device in devices_request:
-        devices.append(device[0])
+    for device in response.json()['devices']:
+        devices.append(device['name'])
 
-    gcursor.close()
-    mydb.close()
+    # fill_number = 7444
+    # fill_infos = ldb.getLHCFillData(fill_number, False)
+    # start_pull(fill_infos, devices)
 
-    fill_number = 7443
-    fill_infos = ldb.getLHCFillData(fill_number, False)
-    start_pull(fill_infos, devices)
-
-    '''
-    for iFill in range(last_fill_pulled + 1, new_fill_number):
-        fill_infos = ldb.getLHCFillData(iFill, False)
+    for iFill in range(int(last_fill_pulled), int(new_fill_number)):
+        fill_infos = ldb.getLHCFillData(str(iFill), False)
 
         # check if the fill is finished
         if fill_infos['endTime'] is None:
@@ -94,15 +92,11 @@
 
         # check if data is too new
         midnight_stamp = dt.datetime.now().replace(second=0, minute=0, hour=0) - dt.timedelta(days=1)
-        if time.mktime(midnight_stamp.timetuple()) < fill_infos['endTime']:
+        if midnight_stamp < fill_infos['endTime']:
             continue
 
         start_pull(fill_infos, devices)
-    '''
 
 
 if __name__ == '__main__':
-    routine_pull()
-
-
-
+    routine_pull(_start_fill=7440, _end_fill=7494)
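
Note on uncompress_pandas: the NXCALS zero-suppression it reverses stores only the non-zero values plus the indices they occupy, so reconstruction is a scatter into a zero-filled array. A minimal standalone sketch of that step (toy values, numpy only; this is not the script's full row-by-row loop):

    import numpy as np

    # Toy zero-suppressed payload: the non-zero values and the indices they occupy.
    values = np.array([7, 3, 12], dtype=np.int64)
    indices = np.array([2, 5, 9])
    size = 12

    # Scatter the values back into a zero-filled array of the final size,
    # as the uncompress_pandas docstring describes.
    restored = np.zeros(size, dtype=np.int64)
    restored[indices] = values

    print(restored.tolist())
    # [0, 0, 7, 0, 0, 3, 0, 0, 0, 12, 0, 0]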
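A caveat on the error handling introduced in check_db: requests only raises HTTPError when raise_for_status() is called on a response; a plain requests.get() returns 4xx/5xx responses without raising, so as written the except HTTPError branch will not fire on HTTP status errors. A sketch of a helper that would make that handler effective (api_get is a hypothetical name, not part of this patch):

    import requests
    from requests.exceptions import HTTPError

    API_HOST = 'http://test-nxdblm-api2.web.cern.ch/'


    def api_get(endpoint, **params):
        # raise_for_status() turns 4xx/5xx responses into HTTPError;
        # 2xx responses (including the 204 "not found" convention this
        # API relies on) pass through untouched.
        response = requests.get(API_HOST + endpoint, params=params)
        response.raise_for_status()
        return response


    try:
        response = api_get('mode', name='ALL')
        print(response.status_code)
    except HTTPError as err:
        print('HTTPError: ' + str(err))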
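For reference, a usage sketch of the reworked entry point, based on the argument format documented in the __main__ usage text; the fill number, device, and time window below are illustrative only:

    from property_pull import property_pull

    # Pulls all five properties for one fill/mode/device window and PUTs
    # them to the API; timestamps follow the documented
    # 'yyyy-mm-dd hh:mm:ss.ms' format.
    property_pull('7443', 'ALL', 'BLMED.06L7.B1B30_TCPA.A6L7.B1',
                  '2018-10-22 04:00:00.000', '2018-10-22 16:00:00.000',
                  _files_write=False, _db_write=True)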