import json
import sys

import requests
from requests.exceptions import HTTPError

import numpy as np
import pandas as pd

from cern.nxcals.pyquery.builders import *
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import *
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
from pyspark.sql.utils import AnalysisException
API_HOST = 'http://test-nxdblm-api2.web.cern.ch/'
conf = SparkConf()
conf.setMaster('yarn')
conf.setAppName('cern_nxcals_dblm_property_pull.py')
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)
spark = SparkSession(sc)
sums_cols = udf(lambda arr: 0 if arr == [] else sum(arr), IntegerType())
# Reduce ( UDAF ? )
# Scala UDF ? (working with python's lambda for now )
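# Hypothetical usage sketch for the UDF above (not exercised in this script, which sums with
# pandas .apply() after .toPandas() instead):
#   df.withColumn('Total', sums_cols(col('histogram_hg_com.elements')))
# would compute the per-record sum directly on the Spark DataFrame.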
def write_file(_device, _start_time, _end_time, _property, data, _files_write):
    if _files_write:
        # Write the pulled data as JSON into a .dat file; _dest_dir is expected to be defined at module level
        filename = (_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to'
                    + _end_time.replace(' ', '_') + '_' + _property + '.dat')
        with open(filename, 'w') as outfile:
            json.dump(data, outfile)
        print(filename + " Pulled")
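# Illustrative call (values taken from the defaults in __main__; assumes _dest_dir is set):
#   write_file('BLMED.06L7.B1B30_TCPA.A6L7.B1', '2018-10-22 21:18:00.000',
#              '2018-10-23 11:18:00.000', 'LoggingHistogram', data, True)
# writes <_dest_dir>/BLMED.06L7.B1B30_TCPA.A6L7.B1_2018-10-22_21:18:00.000to2018-10-23_11:18:00.000_LoggingHistogram.dat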
''' uncompress_pandas
Apply the decompression process described in the NXCALS compression documentation:
zeros are removed from the data array, and the indexes of the non-zero values are stored in a separate index array.
Input:
    _data_array : data pulled from NXCALS for a compressed data type
    _index_array : indexes of the non-zero values
    _size : size of the final (uncompressed) array
    _nrows : number of rows given by NXCALS
'''
def uncompress_pandas(_data_array, _index_array, _size, _nrows):
print('uncompress start')
result = np.zeros(_size, dtype=np.int64)
for i in range(_nrows):
tmpresult = np.zeros(_size, dtype=np.int64)
np.put(tmpresult, _index_array[i][0], _data_array[i][0])
result = np.add(result, tmpresult)
print("uncompress finished")
return result.tolist()
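# Worked example with hypothetical values, to illustrate the decompression:
#   _data_array  = [[[5, 7]], [[2]]]   # non-zero values, one list per row
#   _index_array = [[[0, 3]], [[1]]]   # positions of those values in the uncompressed array
#   uncompress_pandas(_data_array, _index_array, _size=4, _nrows=2)  ->  [5, 2, 0, 7]
# row 0 expands to [5, 0, 0, 7], row 1 to [0, 2, 0, 0]; the rows are summed element-wise.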
''' check_db
Check some database information:
- the device & mode are registered in the database
- whether the measurement has already been done
    - if already done, check the data completion
    - if not already done:
        - check if the fill & fill_mode already exist (create them if not)
        - init & create a new measure
'''
def check_db(_fill_name, _mode, _device, _start, _end):
try:
response = requests.get(API_HOST + 'mode',
params={
'name': _mode
})
if response.status_code == 204:
print('Mode ' + _mode + ' unknown ')
return False, -1
response = requests.get(API_HOST + 'device',
params={
'name': _device
})
if response.status_code == 204:
print('Device ' + _device + ' unknown')
return False, -1
device_id = response.json()['id']
response = requests.get(API_HOST + 'measure',
params={
'device_id': device_id,
'mode_name': _mode,
'fill_name': _fill_name,
'start': _start
})
if response.status_code != 204:
data_id = response.json()['data_id']
response = requests.get(API_HOST + 'data_complete', params={'id': data_id})
if response.status_code == 200:
if response.json()['complete']:
print('data are already pulled for fill ' + str(_fill_name) +
' for device ' + _device +
' mode ' + _mode +
' from ' + _start + ' to ' + _end)
return False, -1
else:
return True, data_id
else:
print('error during dataCheck')
return False, -1
else:
response = requests.post(API_HOST + 'new_data')
data_id = response.json()['id']
response = requests.get(API_HOST + 'fill/metadata/name', params={'name': _fill_name})
if response.status_code == 204:
requests.post(API_HOST + 'new_fill',
params={
'name': _fill_name,
'start': _start,
'end': _end
})
response = requests.get(API_HOST + 'fill_mode',
params={
'mode_name': _mode,
'fill_name': _fill_name,
'start': _start,
'end': _end,
})
if response.status_code == 204:
response = requests.post(API_HOST + 'new_fill_mode',
params={
'fill_name': _fill_name,
'mode_name': _mode,
'start': _start,
'end': _end,
})
fill_mode_id = response.json()['id']
else:
fill_mode_id = response.json()['id']
requests.post(API_HOST + 'new_measure',
params={
'fill_mode_id': fill_mode_id,
'data_id': data_id,
'device_id': device_id
})
return True, data_id
except HTTPError as err:
print('HTTPError' + str(err))
return False, -1
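# Usage sketch (illustrative fill name): check_db returns (ok, data_id); ok is False when the mode or
# device is unknown, or when the data for this measure is already complete.
#   ok, data_id = check_db('7340', 'STABLE', 'BLMED.06L7.B1B30_TCPA.A6L7.B1',
#                          '2018-10-22 21:18:00.000', '2018-10-23 11:18:00.000')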
# noinspection SqlResolve
def pull_histogram_pandas(_fill_name, _mode,
_device,
_start_time,
_end_time,
_files_write=False,
_db_write=True,
_data_id=None):
_property = 'LoggingHistogram'
if (_data_id is None or _data_id == -1) and _db_write:
ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
if not ok or _data_id is None or _data_id == -1:
return -1
data = {}
df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
.startTime(_start_time)
.endTime(_end_time)
.entity()
.keyValue("device", _device)
.keyValue("property", _property)
.buildDataset(), sql_context)
df.registerTempTable("pulled_data")
try:
array_dataset_hg = spark.sql("select histogram_hg_com.elements from pulled_data") \
.toPandas()
array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
data['histogram_hg_intime'] = array_dataset_hg['Total'].values.tolist()
except AnalysisException:
print('histogram_hg not known for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device)
array_dataset_hg = None
data['histogram_hg_intime'] = []
try:
array_index_hg = spark.sql("select histogram_hg_ind.elements from pulled_data") \
.toPandas()
except AnalysisException:
print('histogram_hg_ind not known for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device)
array_index_hg = None
# Process lg array data
try:
array_dataset_lg = spark.sql("select histogram_lg_com.elements from pulled_data") \
.toPandas()
array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
data['histogram_lg_intime'] = array_dataset_lg['Total'].values.tolist()
except AnalysisException:
print('histogram_lg not known for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device)
array_dataset_lg = None
data['histogram_lg_intime'] = []
try:
array_index_lg = spark.sql("select histogram_lg_ind.elements from pulled_data") \
.toPandas()
except AnalysisException:
print('histogram_lg_ind not known for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device)
array_index_lg = None
# Metadata
if array_dataset_hg is not None:
data['rows'] = array_dataset_hg.shape[0]
elif array_dataset_lg is not None:
data['rows'] = array_dataset_lg.shape[0]
else:
data['rows'] = 0
data['startTime'] = _start_time
data['endTime'] = _end_time
try:
data['threshold_hg'] = spark.sql("select threshold_hg from pulled_data limit 1").toPandas().values.tolist()
if len(data['threshold_hg']) > 0:
data['threshold_hg'] = data['threshold_hg'][0]
except AnalysisException:
data['threshold_hg'] = 0
try:
data['threshold_lg'] = spark.sql("select threshold_lg from pulled_data limit 1").toPandas().values.tolist()
if len(data['threshold_lg']) > 0:
data['threshold_lg'] = data['threshold_lg'][0]
except AnalysisException:
data['threshold_lg'] = 0
try:
data['nsamples'] = spark.sql("select nsamples from pulled_data limit 1").toPandas().values.tolist()
if len(data['nsamples']) > 0:
data['nsamples'] = data['nsamples'][0]
except AnalysisException:
data['nsamples'] = 0
# Timestamp #
try:
time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
.toPandas()
data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
except AnalysisException:
data['timestamps'] = []
# Histogram data #
if array_dataset_hg is None or array_index_hg is None:
data['histogram_hg'] = []
else:
data['histogram_hg'] = uncompress_pandas(array_dataset_hg.values,
array_index_hg.values,
57802, data['rows'] - 1)
if array_dataset_lg is None or array_index_lg is None:
data['histogram_lg'] = []
else:
data['histogram_lg'] = uncompress_pandas(array_dataset_lg.values,
array_index_lg.values,
57802, data['rows'] - 1)
# Writing to db
db_put_response = requests.put(API_HOST + 'histogram',
params={
'id': _data_id
},
json=data)
write_file(_device, _start_time, _end_time, _property, data, _files_write)
return data
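# For reference, the dict returned above contains: 'histogram_hg'/'histogram_lg' (uncompressed
# histograms), 'histogram_hg_intime'/'histogram_lg_intime' (per-record sums), 'threshold_hg',
# 'threshold_lg', 'nsamples', 'timestamps', 'rows', 'startTime' and 'endTime'.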
# noinspection SqlResolve
def pull_integral_pandas(_fill_name,
_mode,
_device,
_start_time,
_end_time,
_files_write=False,
_db_write=True,
_data_id=None):
_property = "LoggingIntegral"
if (_data_id is None or _data_id == -1) and _db_write:
ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
if not ok or _data_id is None or _data_id == -1:
return -1
data = {}
df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
.startTime(_start_time)
.endTime(_end_time)
.entity()
.keyValue("device", _device)
.keyValue("property", _property)
.buildDataset(), sql_context)
df.registerTempTable("pulled_data")
try:
array_dataset_hg = spark.sql("select integral_hg.elements from pulled_data") \
.toPandas()
array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
data['integral_hg_intime'] = array_dataset_hg['Total'].values.tolist()
integral_hg = np.zeros(3564, dtype=np.int64)
for T in array_dataset_hg.values:
if len(T[0]) == 3564:
integral_hg = np.add(integral_hg, T[0])
data['integral_hg'] = integral_hg.tolist()
except AnalysisException:
print('integral_hg not known for fill ' + str(_fill_name) + ' mode ' + _mode + ' device ' + _device)
array_dataset_hg = None
data['integral_hg_intime'] = []
data['integral_hg'] = []
try:
array_dataset_lg = spark.sql("select integral_lg.elements from pulled_data") \
.toPandas()
array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
data['integral_lg_intime'] = array_dataset_lg['Total'].values.tolist()
integral_lg = np.zeros(3564, dtype=np.int64)
for T in array_dataset_lg.values:
if len(T[0]) == 3564:
integral_lg = np.add(integral_lg, T[0])
data['integral_lg'] = integral_lg.tolist()
except AnalysisException:
array_dataset_lg = None
data['integral_lg_intime'] = []
data['integral_lg'] = []
# Metadata
if array_dataset_hg is not None:
data['rows'] = array_dataset_hg.shape[0]
elif array_dataset_lg is not None:
data['rows'] = array_dataset_lg.shape[0]
else:
data['rows'] = 0
data['startTime'] = _start_time
data['endTime'] = _end_time
try:
data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist()
if len(data['zsthr_hg']) > 0:
data['zsthr_hg'] = data['zsthr_hg'][0]
except AnalysisException:
data['zsthr_hg'] = 0
try:
data['zsthrave_hg'] = spark.sql("select zsthrave_hg from pulled_data limit 1").toPandas().values.tolist()
if len(data['zsthrave_hg']) > 0:
data['zsthrave_hg'] = data['zsthrave_hg'][0]
except AnalysisException:
data['zsthrave_hg'] = 0
try:
data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist()
if len(data['zsthr_lg']) > 0:
data['zsthr_lg'] = data['zsthr_lg'][0]
except AnalysisException:
data['zsthr_lg'] = 0
try:
data['zsthrave_lg'] = spark.sql("select zsthrave_lg from pulled_data limit 1").toPandas().values.tolist()
if len(data['zsthrave_lg']) > 0:
data['zsthrave_lg'] = data['zsthrave_lg'][0]
except AnalysisException:
data['zsthrave_lg'] = -1
try:
data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist()
if len(data['baselinesub']) > 0:
data['baselinesub'] = data['baselinesub'][0]
except AnalysisException:
data['baselinesub'] = 0
# Timestamp #
try:
time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
.toPandas()
data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
except AnalysisException:
data['timestamps'] = []
# Writing to db
db_put_response = requests.put(API_HOST + 'integral',
params={
'id': _data_id
},
json=data)
write_file(_device, _start_time, _end_time, _property, data, _files_write)
return data
# noinspection SqlResolve
def pull_raw_dist_pandas(_fill_name,
_mode,
_device,
_start_time,
_end_time,
_files_write=False,
_db_write=True,
_data_id=None):
_property = "LoggingRawDist"
if (_data_id is None or _data_id == -1) and _db_write:
ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
if not ok or _data_id is None or _data_id == -1:
return -1
data = {}
df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
.startTime(_start_time)
.endTime(_end_time)
.entity()
.keyValue("device", _device)
.keyValue("property", _property)
.buildDataset(), sql_context)
df.registerTempTable("pulled_data")
# Process hg Data #
try:
array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \
.toPandas()
result_distrib_hg = np.zeros(2048, dtype=np.int64)
for R in array_dataset_hg.values:
if len(R[0]) == 2048:
result_distrib_hg = np.add(result_distrib_hg, R[0])
data['distribution_hg'] = result_distrib_hg.tolist()
except AnalysisException:
array_dataset_hg = None
data['distribution_hg'] = []
# Process lg Data #
try:
array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \
.toPandas()
result_distrib_lg = np.zeros(2048, dtype=np.int64)
for R in array_dataset_lg.values:
if len(R[0]) == 2048:
result_distrib_lg = np.add(result_distrib_lg, R[0])
data['distribution_lg'] = result_distrib_lg.tolist()
except AnalysisException:
array_dataset_lg = None
data['distribution_lg'] = []
# Metadata
if array_dataset_hg is not None:
data['rows'] = array_dataset_hg.shape[0]
elif array_dataset_lg is not None:
data['rows'] = array_dataset_lg.shape[0]
else:
data['rows'] = 0
data['startTime'] = _start_time
data['endTime'] = _end_time
try:
data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values.tolist()
if len(data['lsbcut_hg']) > 0:
data['lsbcut_hg'] = data['lsbcut_hg'][0]
except AnalysisException:
data['lsbcut_hg'] = 0
try:
data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist()
if len(data['offset_hg']) > 0:
data['offset_hg'] = data['offset_hg'][0]
except AnalysisException:
data['offset_hg'] = 0
try:
data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist()
if len(data['lsbcut_lg']) > 0:
data['lsbcut_lg'] = data['lsbcut_lg'][0]
except AnalysisException:
data['lsbcut_lg'] = 0
try:
data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist()
if len(data['offset_lg']) > 0:
data['offset_lg'] = data['offset_lg'][0]
except AnalysisException:
data['offset_lg'] = 0
# Timestamp #
try:
time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
.toPandas()
data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
except AnalysisException:
data['timestamps'] = []
# Writing to db
db_put_response = requests.put(API_HOST + 'raw_dist',
params={
'id': _data_id
},
json=data)
write_file(_device, _start_time, _end_time, _property, data, _files_write)
return data
# noinspection SqlResolve
def pull_integral_dist_pandas(_fill_name,
_mode,
_device,
_start_time,
_end_time,
_files_write=False,
_db_write=True,
_data_id=None):
_property = "LoggingIntegralDist"
if (_data_id is None or _data_id == -1) and _db_write:
ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
if not ok or _data_id is None or _data_id == -1:
return -1
data = {}
df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
.startTime(_start_time)
.endTime(_end_time)
.entity()
.keyValue("device", _device)
.keyValue("property", _property)
.buildDataset(), sql_context)
df.registerTempTable("pulled_data")
# Process hg Data #
try:
array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \
.toPandas()
result_distrib_hg = np.zeros(2048, dtype=np.int64)
for R in array_dataset_hg.values:
if len(R[0]) == 2048:
result_distrib_hg = np.add(result_distrib_hg, R[0])
data['distribution_hg'] = result_distrib_hg.tolist()
except AnalysisException:
data['distribution_hg'] = []
array_dataset_hg = None
# Process lg Data #
try:
array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \
.toPandas()
result_distrib_lg = np.zeros(2048, dtype=np.int64)
for R in array_dataset_lg.values:
if len(R[0]) == 2048:
result_distrib_lg = np.add(result_distrib_lg, R[0])
data['distribution_lg'] = result_distrib_lg.tolist()
except AnalysisException:
array_dataset_lg = None
data['distribution_lg'] = []
# Metadata #
if array_dataset_hg is not None:
data['rows'] = array_dataset_hg.shape[0]
elif array_dataset_lg is not None:
data['rows'] = array_dataset_lg.shape[0]
else:
data['rows'] = 0
data['startTime'] = _start_time
data['endTime'] = _end_time
try:
data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values.tolist()
if len(data['lsbcut_hg']) > 0:
data['lsbcut_hg'] = data['lsbcut_hg'][0]
except AnalysisException:
data['lsbcut_hg'] = 0
try:
data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist()
if len(data['offset_hg']) > 0:
data['offset_hg'] = data['offset_hg'][0]
except AnalysisException:
data['offset_hg'] = 0
try:
data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist()
if len(data['lsbcut_lg']) > 0:
data['lsbcut_lg'] = data['lsbcut_lg'][0]
except AnalysisException:
data['lsbcut_lg'] = 0
try:
data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist()
if len(data['offset_lg']) > 0:
data['offset_lg'] = data['offset_lg'][0]
except AnalysisException:
data['offset_lg'] = 0
try:
data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist()
if len(data['nturns']) > 0:
data['nturns'] = data['nturns'][0]
except AnalysisException:
data['nturns'] = 0
# Timestamp #
try:
time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
.toPandas()
data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
except AnalysisException:
data['timestamps'] = []
# Writing to db
requests.put(API_HOST + 'integral_dist',
params={
'id': _data_id
},
json=data)
write_file(_device, _start_time, _end_time, _property, data, _files_write)
return data
# noinspection SqlResolve
def pull_turnloss_pandas(_fill_name,
_mode,
_device,
_start_time,
_end_time,
_files_write=False,
_db_write=True,
_data_id=None):
_property = "LoggingTurnLoss"
if (_data_id is None or _data_id == -1) and _db_write:
ok, _data_id = check_db(_fill_name, _mode, _device, _start_time, _end_time)
if not ok or _data_id is None or _data_id == -1:
return -1
data = {}
df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
.startTime(_start_time)
.endTime(_end_time)
.entity()
.keyValue("device", _device)
.keyValue("property", _property)
.buildDataset(), sql_context)
df.registerTempTable("pulled_data")
# Process hg Data #
try:
array_dataset_hg = spark.sql("select turnloss_hg_com.elements from pulled_data") \
.toPandas()
array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
data['turnloss_hg_intime'] = array_dataset_hg['Total'].values.tolist()
except AnalysisException:
array_dataset_hg = None
data['turnloss_hg_intime'] = []
try:
array_index_hg = spark.sql("select turnloss_hg_ind.elements from pulled_data") \
.toPandas()
except AnalysisException:
array_index_hg = None
# Process lg Data #
try:
array_dataset_lg = spark.sql("select turnloss_lg_com.elements from pulled_data") \
.toPandas()
array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
data['turnloss_lg_intime'] = array_dataset_lg['Total'].values.tolist()
except AnalysisException:
array_dataset_lg = None
data['turnloss_lg_intime'] = []
try:
array_index_lg = spark.sql("select turnloss_lg_ind.elements from pulled_data") \
.toPandas()
except AnalysisException:
array_index_lg = None
# Metadata #
if array_dataset_hg is not None:
data['rows'] = array_dataset_hg.shape[0]
elif array_dataset_lg is not None:
data['rows'] = array_dataset_lg.shape[0]
else:
data['rows'] = 0
data['startTime'] = _start_time
data['endTime'] = _end_time
try:
data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist()
if len(data['zsthr_hg']) > 0:
data['zsthr_hg'] = data['zsthr_hg'][0]
except AnalysisException:
data['zsthr_hg'] = 0
try:
data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist()
if len(data['zsthr_lg']) > 0:
data['zsthr_lg'] = data['zsthr_lg'][0]
except AnalysisException:
data['zsthr_lg'] = 0
try:
data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist()
if len(data['baselinesub']) > 0:
data['baselinesub'] = data['baselinesub'][0]
except AnalysisException:
data['baselinesub'] = 0
try:
data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist()
if len(data['nturns']) > 0:
data['nturns'] = data['nturns'][0]
except AnalysisException:
data['nturns'] = 0
# Timestamp #
try:
time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
.toPandas()
data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
except AnalysisException:
data['timestamps'] = []
# Turnloss Data
if array_dataset_hg is None or array_index_hg is None:
data['turnloss_hg'] = []
else:
data['turnloss_hg'] = uncompress_pandas(array_dataset_hg.values,
array_index_hg.values,
data['nturns'], data['rows'])
if array_dataset_lg is None or array_index_lg is None:
data['turnloss_lg'] = []
else:
data['turnloss_lg'] = uncompress_pandas(array_dataset_lg.values,
array_index_lg.values,
data['nturns'], data['rows'])
# Writing to db
requests.put(API_HOST + 'turnloss',
params={
'id': _data_id
},
json=data)
write_file(_device, _start_time, _end_time, _property, data, _files_write)
return data
def property_pull(_fill_name, _mode, _device, _start, _end, _files_write=False, _db_write=True):
ok, data_id = check_db(_fill_name, _mode, _device, _start, _end)
if not ok or data_id is None or data_id == -1:
return -1
print('Pulling data for fill '
+ str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end)
pull_histogram_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
pull_integral_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
pull_raw_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
pull_integral_dist_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
pull_turnloss_pandas(_fill_name, _mode, _device, _start, _end, _files_write, _db_write, data_id)
print('Pull done, data pushed to DB for fill '
+ str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end)
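# Usage sketch (illustrative fill name): pull every property for one device over one time window
# and push the results to the API:
#   property_pull('7340', 'STABLE', 'BLMED.06L7.B1B30_TCPA.A6L7.B1',
#                 '2018-10-22 21:18:00.000', '2018-10-23 11:18:00.000', _files_write=False)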
if __name__ == '__main__':
    # Default values (handy for interactive testing); overridden by the command-line arguments below
    _mode = "STABLE"
    _start = "2018-10-22 21:18:00.000"
    _end = "2018-10-23 11:18:00.000"
    _device = "BLMED.06L7.B1B30_TCPA.A6L7.B1"
    if len(sys.argv) > 1 and sys.argv[1] == '-h':
        print(" -- HELP -- \n"
              " -- TODO -- \n"
              "-args- \n"
              " <fillName> : the name of the fill you are pulling; a fill is a complete experiment. "
              "One fill contains multiple devices, modes, time measurements ... "
              "so you will need to do multiple pulls for the same fill \n"
              " <device> : the name of the device you want to pull. To pull multiple devices at once,"
              " enter the devices in quotes, separated by spaces \n"
              " ex: '<device1> <device2> <device3> ...' \n\n"
              " <mode> : the name of the mode you want to pull. \n"
              " ex: STABLE, ALL, START, BEAMDUMP ... \n\n"
              " <start> : the start time of the pull you want to do. \n"
              " format: 'yyyy-mm-dd hh:mm:ss.ms' or 'yyyy-mm-dd_hh:mm:ss.ms'")
        sys.exit(0)
    elif len(sys.argv) < 6:
        print("usage: python cern_nxcals_dblm_property_pull.py <fillName> <device> <mode> <start> <end> [-f] \n"
              "where : \n"
              " -f also generate .dat files \n"
              "type 'python cern_nxcals_dblm_property_pull.py -h' for more infos")
        sys.exit(1)
    _fill_name = sys.argv[1]
    _device = sys.argv[2]
    _mode = sys.argv[3]
    _start = sys.argv[4]
    _end = sys.argv[5]
    _files = False
    if len(sys.argv) > 6:
        _files = sys.argv[6] == "-f"
    property_pull(_fill_name, _mode, _device, _start, _end, _files)
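# Example command line (illustrative fill name), matching the usage string above:
#   python cern_nxcals_dblm_property_pull.py 7340 'BLMED.06L7.B1B30_TCPA.A6L7.B1' STABLE \
#       '2018-10-22 21:18:00.000' '2018-10-23 11:18:00.000' -f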