Commit 4ce1ff65 authored by Quentin Codelupi

[update] routine final

parent 4e8ecd87
pyconf.py
pulledData/
plotOutput/
prop_pull_api/
\ No newline at end of file
#!spark-submit --master yarn
import json
import sys
import requests
from requests.exceptions import HTTPError
import mysql.connector
import pyconf as cfg
@@ -10,7 +11,6 @@ from cern.nxcals.pyquery.builders import *
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import *
from pyspark.accumulators import AccumulatorParam
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
from pyspark.sql.utils import AnalysisException
@@ -20,6 +20,8 @@ from multiprocessing import Process
import numpy as np
import pandas as pd
API_HOST = 'http://test-nxdblm-api2.web.cern.ch/'
_dest_dir = './pulledData'
# mysqlDB variables
@@ -31,13 +33,10 @@ mydb = mysql.connector.connect(
database=cfg.mysql['database']
)
-# id of the sql data's row for this filling
-data_id = -1
# Spark variables
conf = SparkConf()
conf.setMaster('yarn')
-conf.setAppName('property_pull.py')
+conf.setAppName('cern_nxcals_dblm_property_pull.py')
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)
@@ -64,6 +63,17 @@ def write_file(_device, _start_time, _end_time, _property, data, _files_write):
+ _end_time.replace(' ', '_') + '_' + _property + '.dat' + " Pulled")
''' uncompress_pandas
Apply a decompression process according to the NXCALS compression documentation:
zeros are removed from the data array, and the indexes of the non-zero values are stored in a separate array.
Input :
    _data_array : data retrieved from NXCALS for a compressed data type
    _index_array : indexes of the non-zero data
    _size : size of the final array
    _nrows : number of rows given by NXCALS
'''
def uncompress_pandas(_data_array, _index_array, _size, _nrows):
print('uncompress start')
result = np.zeros(_size, dtype=np.int64)
@@ -75,10 +85,119 @@ def uncompress_pandas(_data_array, _index_array, _size, _nrows):
return result.tolist()
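For orientation, here is a toy run of the decompression described above. The values are made up, and the vectorised scatter is a sketch of the elided loop body, not the verbatim implementation:

import numpy as np

# Hypothetical compressed payload: non-zero values plus their target indexes.
data_array = np.array([7, 2, 5], dtype=np.int64)
index_array = np.array([1, 4, 8])

result = np.zeros(10, dtype=np.int64)  # _size = 10
result[index_array] = data_array       # scatter each non-zero value back into place
print(result.tolist())                 # [0, 7, 0, 0, 2, 0, 0, 0, 5, 0]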
''' check_db
Check several things in the database:
    - the device & mode are registered in the database
    - whether the measurement has already been done
        - if already done, check the data completion
        - if not already done:
            - check that the fill & fill_mode already exist (create them if not)
            - init & create a new measure
'''
def check_db(_fill_name, _mode, _device, _start, _end):
try:
response = requests.get(API_HOST + 'mode',
params={
'name': _mode
})
if response.status_code == 204:
print('Mode ' + _mode + ' unknown')
return False, -1
response = requests.get(API_HOST + 'device',
params={
'name': _device
})
if response.status_code == 204:
print('Device ' + _device + ' unknown')
return False, -1
device_id = response.json()['id']
response = requests.get(API_HOST + 'measure',
params={
'device_id': device_id,
'mode_name': _mode,
'fill_name': _fill_name,
'start': _start
})
if response.status_code != 204:
data_id = response.json()['data_id']
response = requests.get(API_HOST + 'data_complete', params={'id': data_id})
if response.status_code == 200:
if response.json()['complete']:
print('data are already pulled for fill ' + str(_fill_name) +
' for device ' + _device +
' mode ' + _mode +
' from ' + _start + ' to ' + _end)
return False, -1
else:
return True, data_id
else:
print('error during dataCheck')
return False, -1
else:
response = requests.post(API_HOST + 'new_data')
data_id = response.json()['id']
response = requests.get(API_HOST + 'fill/metadata/name', params={'name': _fill_name})
if response.status_code == 204:
requests.post(API_HOST + 'new_fill',
params={
'name': _fill_name,
'start': _start,
'end': _end
})
response = requests.get(API_HOST + 'fill_mode',
params={
'mode_name': _mode,
'fill_name': _fill_name,
'start': _start,
'end': _end,
})
if response.status_code == 204:
response = requests.post(API_HOST + 'new_fill_mode',
params={
'fill_name': _fill_name,
'mode_name': _mode,
'start': _start,
'end': _end,
})
fill_mode_id = response.json()['id']
else:
fill_mode_id = response.json()['id']
requests.post(API_HOST + 'new_measure',
params={
'fill_mode_id': fill_mode_id,
'data_id': data_id,
'device_id': device_id
})
return True, data_id
except HTTPError as err:
print('HTTPError: ' + str(err))
return False, -1
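A minimal sketch of the calling contract (fill name, mode, device, and timestamps below are illustrative placeholders, not values from this commit):

ok, data_id = check_db('7443', 'ALL', 'BLMED.06L7.B1B30_TCPA.A6L7.B1',
                       '2018-10-29 10:00:00.000', '2018-10-29 12:00:00.000')
if ok:
    print('pull may proceed; Data row id =', data_id)
else:
    print('nothing to pull: unknown mode/device, or data already complete')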
# noinspection SqlResolve
-def pull_histogram_pandas(_fill_name, _mode, _device, _start_time, _end_time, _files_write=False, _db_write=True):
+def pull_histogram_pandas(_fill_name, _mode,
+                          _device,
+                          _start_time,
+                          _end_time,
+                          _files_write=False,
+                          _db_write=True,
+                          _data_id=None):
_property = 'LoggingHistogram'
if (_data_id is None or _data_id == -1) and _db_write:
ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
if not ok or _data_id is None or _data_id == -1:
return -1
data = {}
df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
@@ -184,8 +303,11 @@ def pull_histogram_pandas(_fill_name, _mode, _device, _start_time, _end_time, _f
# Writing to db
if _db_write:
-        db_cursor = mydb.cursor()
-        db_cursor.execute('UPDATE Data SET histogram = %s where id = %s', (json.dumps(data), data_id,))
+        db_put_response = requests.put(API_HOST + 'histogram',
+                                       params={
+                                           'id': _data_id
+                                       },
+                                       json=data)
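        # Editor's note (hypothetical addition, not part of this commit):
        # db_put_response is captured but never inspected. A guard such as
        #     if db_put_response.status_code != 200:
        #         print('histogram PUT failed:', db_put_response.status_code)
        # would surface failed writes; the same applies to the other pull_*
        # writers below.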
write_file(_device, _start_time, _end_time, _property, data, _files_write)
@@ -193,9 +315,21 @@ def pull_histogram_pandas(_fill_name, _mode, _device, _start_time, _end_time, _f
# noinspection SqlResolve
-def pull_integral_pandas(_fill_name, _mode, _device, _start_time, _end_time, _files_write=False, _db_write=True):
+def pull_integral_pandas(_fill_name,
+                         _mode,
+                         _device,
+                         _start_time,
+                         _end_time,
+                         _files_write=False,
+                         _db_write=True,
+                         _data_id=None):
_property = "LoggingIntegral"
if (_data_id is None or _data_id == -1) and _db_write:
ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
if not ok or _data_id is None or _data_id == -1:
return -1
data = {}
df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
@@ -301,8 +435,11 @@ def pull_integral_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
# Writing to db
if _db_write:
-        db_cursor = mydb.cursor()
-        db_cursor.execute('UPDATE Data SET integral = %s where id = %s', (json.dumps(data), data_id,))
+        db_put_response = requests.put(API_HOST + 'integral',
+                                       params={
+                                           'id': _data_id
+                                       },
+                                       json=data)
write_file(_device, _start_time, _end_time, _property, data, _files_write)
@@ -310,9 +447,21 @@ def pull_integral_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
# noinspection SqlResolve
-def pull_raw_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time, _files_write=False, _db_write=True):
+def pull_raw_dist_pandas(_fill_name,
+                         _mode,
+                         _device,
+                         _start_time,
+                         _end_time,
+                         _files_write=False,
+                         _db_write=True,
+                         _data_id=None):
_property = "LoggingRawDist"
if (_data_id is None or _data_id == -1) and _db_write:
ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
if not ok or _data_id is None or _data_id == -1:
return -1
data = {}
df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
@@ -401,8 +550,11 @@ def pull_raw_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
# Writing to db
if _db_write:
-        db_cursor = mydb.cursor()
-        db_cursor.execute('UPDATE Data SET raw_dist = %s where id = %s', (json.dumps(data), data_id,))
+        db_put_response = requests.put(API_HOST + 'raw_dist',
+                                       params={
+                                           'id': _data_id
+                                       },
+                                       json=data)
write_file(_device, _start_time, _end_time, _property, data, _files_write)
@@ -410,9 +562,21 @@ def pull_raw_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
# noinspection SqlResolve
-def pull_integral_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time, _files_write=False, _db_write=True):
+def pull_integral_dist_pandas(_fill_name,
+                              _mode,
+                              _device,
+                              _start_time,
+                              _end_time,
+                              _files_write=False,
+                              _db_write=True,
+                              _data_id=None):
_property = "LoggingIntegralDist"
if (_data_id is None or _data_id == -1) and _db_write:
ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
if not ok or _data_id is None or _data_id == -1:
return -1
data = {}
df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
@@ -509,8 +673,11 @@ def pull_integral_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time
# Writing to db
if _db_write:
-        db_cursor = mydb.cursor()
-        db_cursor.execute('UPDATE Data SET integral_dist = %s where id = %s', (json.dumps(data), data_id,))
+        requests.put(API_HOST + 'integral_dist',
+                     params={
+                         'id': _data_id
+                     },
+                     json=data)
write_file(_device, _start_time, _end_time, _property, data, _files_write)
@@ -518,9 +685,21 @@ def pull_integral_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time
# noinspection SqlResolve
-def pull_turnloss_pandas(_fill_name, _mode, _device, _start_time, _end_time, _files_write=False, _db_write=True):
+def pull_turnloss_pandas(_fill_name,
+                         _mode,
+                         _device,
+                         _start_time,
+                         _end_time,
+                         _files_write=False,
+                         _db_write=True,
+                         _data_id=None):
_property = "LoggingTurnLoss"
if (_data_id is None or _data_id == -1) and _db_write:
ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
if not ok or _data_id is None or _data_id == -1:
return -1
data = {}
df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
@@ -633,8 +812,11 @@ def pull_turnloss_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
# Writing to db
if _db_write:
-        db_cursor = mydb.cursor()
-        db_cursor.execute('UPDATE Data SET turnloss = %s where id = %s', (json.dumps(data), data_id,))
+        requests.put(API_HOST + 'turnloss',
+                     params={
+                         'id': _data_id
+                     },
+                     json=data)
write_file(_device, _start_time, _end_time, _property, data, _files_write)
@@ -642,74 +824,24 @@ def pull_turnloss_pandas(_fill_name, _mode, _device, _start_time, _end_time, _fi
def property_pull(_fill_name, _mode, _device, _start, _end, _files_write=False, _db_write=True):
-    gcursor = mydb.cursor()
-    gcursor.execute('select count(*) from Mode where name=%s', (_mode,))
-    mode_exists = gcursor.fetchone()[0]
-    if mode_exists == 0:
-        print('Mode ' + _mode + ' unknown ')
-        return -1
-    gcursor.execute('SELECT id FROM Device where name=%s limit 1', (_device,))
-    device_id = gcursor.fetchone()[0]
-    if device_id is None:
-        print('Device ' + _device + ' unknown')
-        return -1
-    gcursor.execute("select count(*) from Measure where device_id=%s "
-                    "and fill_mode_id=( select id from FillMode "
-                    "where mode_id=(select id from Mode where name=%s) "
-                    "and fill_id=(select id from Fill where name=%s)"
-                    "and start=%s)", (device_id, _mode, _fill_name, _start))
-    already_done = gcursor.fetchone()[0]
-    if already_done != 0:
-        print('Data for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end + ' already pulled')
+    ok, data_id = check_db(_fill_name, _mode, _device, _start, _end)
+    if not ok or data_id is None or data_id == -1:
return -1
print('Pulling data for fill '
+ str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end)
-    global data_id
-    gcursor.execute("insert into Data(histogram, integral, integral_dist, raw_dist, turnloss) "
-                    "VALUES ( '{}', '{}', '{}', '{}', '{}');")
-    data_id = gcursor.lastrowid
-    gcursor.execute("select count(*) from Fill where name=%s", (_fill_name,))
-    if gcursor.fetchone()[0] == 0:
-        gcursor.execute("insert into Fill(name, start, end) values (%s, %s, %s)", (_fill_name, _start, _end))
-    # print(data_id)
-    gcursor.execute("select count(*) from FillMode "
-                    "where mode_id=(select id from Mode where name=%s) "
-                    "and fill_id=(select id from Fill where name=%s) "
-                    "and start=%s and end=%s",
-                    (_mode, _fill_name, _start, _end))
-    if gcursor.fetchone()[0] == 0:
-        gcursor.execute("insert into FillMode(fill_id, mode_id, start, end) "
-                        "values ((select id from Fill where name=%s),(select id from Mode where name=%s), %s, %s)",
-                        (_fill_name, _mode, _start, _end))
-        fill_mode_id = gcursor.lastrowid
-    else:
-        gcursor.execute("select id from FillMode where mode_id=(select id from Mode where name=%s) "
-                        "and fill_id=(select id from Fill where name=%s) "
-                        "and start=%s and end=%s",
-                        (_mode, _fill_name, _start, _end))
-        fill_mode_id = gcursor.fetchone()[0]
-    gcursor.execute('INSERT INTO Measure(fill_mode_id, data_id, device_id)'
-                    'VALUES ( %s, %s, %s )',
-                    (fill_mode_id, data_id, device_id))
-    pull_histogram_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
-    pull_integral_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
-    pull_raw_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
-    pull_integral_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
-    pull_turnloss_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write)
-    mydb.commit()
+    pull_histogram_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
+    pull_integral_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
+    pull_raw_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
+    pull_integral_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
+    pull_turnloss_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
    print('Pull done, data pushed to DB for fill '
          + str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end)
return 0
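A direct-call sketch of the entry point above, equivalent to the command-line usage parsed in __main__ below (timestamps are placeholders; the device name is the one used elsewhere in this repository):

property_pull('7443', 'ALL', 'BLMED.06L7.B1B30_TCPA.A6L7.B1',
              '2018-10-29 10:00:00.000', '2018-10-29 12:00:00.000',
              _files_write=True, _db_write=True)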
if __name__ == '__main__':
@@ -736,10 +868,10 @@ if __name__ == '__main__':
" format: 'yyyy-mm-dd hh:mm:ss.ms' or 'yyyy-mm-dd_hh:mm:ss.ms'")
sys.exit(0)
elif len(sys.argv) < 6:
print("usage: python property_pull.py <fillName> <device> <mode> <start> <end> [-f] \n"
print("usage: python cern_nxcals_dblm_property_pull.py <fillName> <device> <mode> <start> <end> [-f] \n"
"where : \n"
" -f also generate .dat files \n"
"type 'python property_pull.py -h' for more infos")
"type 'python cern_nxcals_dblm_property_pull.py -h' for more infos")
sys.exit(1)
else:
_fill_name = sys.argv[1]
......
-import mysql.connector
import time
-import property_pull
+import mysql.connector
+import requests
+from property_pull import property_pull
import pytimber as pytimber
import datetime as dt
@@ -18,6 +20,8 @@ mydb = mysql.connector.connect(
database=cfg.mysql['database']
)
API_HOST = 'http://test-nxdblm-api2.web.cern.ch/'
def start_pull(_fill_infos, _devices=None):
if _devices is None:
@@ -29,18 +33,18 @@ def start_pull(_fill_infos, _devices=None):
for iDevice in _devices:
-        property_pull.property_pull(_fill_infos['fillNumber'], 'ALL', iDevice,
-                                    _fill_infos['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
-                                    _fill_infos['endTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
-                                    _files_write=False)
+        property_pull(_fill_infos['fillNumber'], 'ALL', iDevice,
+                      _fill_infos['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
+                      _fill_infos['endTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
+                      _files_write=False)
for iMode in _fill_infos["beamModes"]:
print(iMode['mode'])
-            property_pull.property_pull(_fill_infos['fillNumber'], iMode['mode'], iDevice,
-                                        iMode['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
-                                        iMode['endTime'].strftime("%Y-%m-%d %H:%M:%S.000"))
+            property_pull(_fill_infos['fillNumber'], iMode['mode'], iDevice,
+                          iMode['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
+                          iMode['endTime'].strftime("%Y-%m-%d %H:%M:%S.000"))
-            '''p = Process(target=property_pull.property_pull,
+            '''p = Process(target=cern_nxcals_dblm_property_pull.cern_nxcals_dblm_property_pull,
args=(fill_infos['fillNumber'], iMode['mode'], 'BLMED.06L7.B1B30_TCPA.A6L7.B1',
iMode['startTime'].strftime("%y-%m-%d %H:%M:%S.000"),
iMode['endTime'].strftime("%y-%m-%d %H:%M:%S.000")))
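The commented-out Process block above suggests the per-mode pulls were meant to run in parallel; a minimal sketch of that idea, assuming property_pull is safe to call from a child process (which this commit does not establish):

from multiprocessing import Process

p = Process(target=property_pull,
            args=(_fill_infos['fillNumber'], iMode['mode'], iDevice,
                  iMode['startTime'].strftime("%Y-%m-%d %H:%M:%S.000"),
                  iMode['endTime'].strftime("%Y-%m-%d %H:%M:%S.000")))
p.start()
p.join()  # join after the loop instead to overlap the pulls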
@@ -51,17 +55,16 @@ def start_pull(_fill_infos, _devices=None):
def routine_pull(_start_fill=None, _end_fill=None):
ldb = pytimber.LoggingDB()
gcursor = mydb.cursor()
-    _start_fill = None
-    _end_fill = None
if _start_fill is None and _end_fill is None:
res = ldb.get('HX:FILLN', dt.datetime.now())
new_fill_number = int(res['HX:FILLN'][1][0])
print(new_fill_number)
-        gcursor.execute('select max(name) from Fill limit 1;')
-        last_fill_pulled = gcursor.fetchone()[0]
+        # gcursor.execute('select max(name) from Fill limit 1;')
+        # last_fill_pulled = gcursor.fetchone()[0]
+        response = requests.get(API_HOST + 'fill/last')
+        last_fill_pulled = response.json()['name']
print(last_fill_pulled)
if new_fill_number == last_fill_pulled:
@@ -69,24 +72,19 @@
return -1
else:
new_fill_number = _end_fill
-        last_fll_pulled = _start_fill - 1
+        last_fill_pulled = _start_fill
-    gcursor.execute('select name from Device')
-    devices_request = gcursor.fetchall()
+    response = requests.get(API_HOST + 'devices')
    devices = []
-    for device in devices_request:
-        devices.append(device[0])
+    for device in response.json()['devices']:
+        devices.append(device['name'])
gcursor.close()
mydb.close()
# fill_number = 7444
# fill_infos = ldb.getLHCFillData(fill_number, False)
# start_pull(fill_infos, devices)
-    fill_number = 7443
-    fill_infos = ldb.getLHCFillData(fill_number, False)
-    start_pull(fill_infos, devices)
-    '''
-    for iFill in range(last_fill_pulled + 1, new_fill_number):
-        fill_infos = ldb.getLHCFillData(iFill, False)
+    for iFill in range(int(last_fill_pulled), int(new_fill_number)):
+        fill_infos = ldb.getLHCFillData(str(iFill), False)
# check if the fill is finished
if fill_infos['endTime'] is None:
@@ -94,15 +92,11 @@
# check if data is too new
midnight_stamp = dt.datetime.now().replace(second=0, minute=0, hour=0) - dt.timedelta(days=1)
-        if time.mktime(midnight_stamp.timetuple()) < fill_infos['endTime']:
+        if midnight_stamp < fill_infos['endTime']:
continue
start_pull(fill_infos, devices)
-    '''
if __name__ == '__main__':
-    routine_pull()
+    routine_pull(_start_fill=7440, _end_fill=7494)