Commit b68dc019 authored by Quentin Codelupi

[add] check for duplicates + property_pull function for packaging

parent 30c1c2a2
@@ -31,14 +31,16 @@ mydb = mysql.connector.connect(
data_id = -1
# Sparks Variable
conf = ''
conf = SparkConf()
conf.setMaster('yarn')
conf.setAppName('property_pull.py')
sc = ''
sqlContext = ''
spark = ''
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)
spark = SparkSession(sc)
# sumUDF
sums_cols = udf(lambda arr: 0 if arr == [] else __builtins__.sum(arr), IntegerType())
sums_cols = udf(lambda arr: 0 if arr == [] else sum(arr), IntegerType())
'''
#TODO -
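The hunk above moves the Spark setup to module level and drops the __builtins__.sum workaround in the sums_cols UDF. That workaround is typically needed when "from pyspark.sql.functions import *" shadows Python's built-in sum; once only the required names are imported, the plain built-in is visible again. A minimal sketch of the shadowing issue and the two usual fixes (assumed context, not code from this repository):

from pyspark.sql.functions import *      # also binds pyspark's sum(), hiding the Python builtin

data = [1, 2, 3]
# sum(data) would now resolve to pyspark.sql.functions.sum, which expects a Column,
# not a plain Python list.

# Fix 1: address the builtin explicitly (portable across modules).
import builtins
total = builtins.sum(data)               # -> 6

# Fix 2 (what the new code relies on): import only the names that are needed,
# e.g. `from pyspark.sql.functions import udf`, so the builtin sum() stays visible
# and `sum(arr)` inside the UDF adds up the list as intended.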
@@ -85,7 +87,7 @@ def uncompress_pandas(_data_array, _index_array, _size, _nrows):
# noinspection SqlResolve
def pull_histogram_pandas(_start_time, _end_time, _device):
def pull_histogram_pandas(_fill_name, _mode, _device, _start_time, _end_time):
_property = 'LoggingHistogram'
data = {}
@@ -96,7 +98,7 @@ def pull_histogram_pandas(_start_time, _end_time, _device):
.entity()
.keyValue("device", _device)
.keyValue("property", _property)
.buildDataset(), sqlContext)
.buildDataset(), sql_context)
# data['rows'] = df.count()
df.registerTempTable("pulled_data")
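Throughout these pull_* functions the same pattern recurs: the NXCALS dataset is wrapped in a Spark DataFrame, registered as the temp table pulled_data, queried with spark.sql, and converted to pandas. A self-contained sketch of that pattern, with a hypothetical in-memory DataFrame standing in for the NXCALS dataset (the DataQuery builder itself is not reproduced here):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("sketch").getOrCreate()

# Stand-in for the dataset returned by the NXCALS DataQuery builder.
df = spark.createDataFrame([([1, 2, 3],), ([],)], ["elements"])
df.createOrReplaceTempView("pulled_data")   # the diff uses the older registerTempTable alias

pdf = spark.sql("select elements from pulled_data").toPandas()
pdf["Total"] = pdf["elements"].apply(lambda x: 0 if len(x) == 0 else sum(x))
print(pdf)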
@@ -108,7 +110,7 @@ def pull_histogram_pandas(_start_time, _end_time, _device):
array_index_hg = spark.sql("select histogram_hg_ind.elements from pulled_data") \
.toPandas()
array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
# Process lg array data
array_dataset_lg = spark.sql("select histogram_lg_com.elements from pulled_data") \
@@ -117,11 +119,10 @@ def pull_histogram_pandas(_start_time, _end_time, _device):
array_index_lg = spark.sql("select histogram_lg_ind.elements from pulled_data") \
.toPandas()
array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
# Datas #
data['rows'] = array_dataset_hg.shape[0]
print(data['rows'])
data['startTime'] = _start_time
data['endTime'] = _end_time
@@ -134,6 +135,10 @@ def pull_histogram_pandas(_start_time, _end_time, _device):
if len(data['threshold_lg']) > 0:
data['threshold_lg'] = data['threshold_lg'][0]
data['nsamples'] = spark.sql("select nsamples from pulled_data limit 1").toPandas().values.tolist()
if len(data['nsamples']) > 0:
data['nsamples'] = data['nsamples'][0]
# Timestamp #
time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
.toPandas()
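The four nsamples lines added above follow the file's idiom for pulling a single scalar out of the temp table: query with limit 1, convert to pandas, flatten with values.tolist(), and unwrap the first row if one exists. A tiny sketch of that unwrapping step on a dummy value (illustrative only):

# Stand-in for spark.sql("select nsamples from pulled_data limit 1").toPandas().values.tolist()
nsamples = [[42]]                # dummy value
if len(nsamples) > 0:
    nsamples = nsamples[0]
print(nsamples)                  # [42] -- a one-element row, matching the threshold_hg/lg handling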
@@ -160,7 +165,7 @@ def pull_histogram_pandas(_start_time, _end_time, _device):
# noinspection SqlResolve
def pull_integral_pandas(_start_time, _end_time, _device):
def pull_integral_pandas(_fill_name, _mode, _device, _start_time, _end_time):
_property = "LoggingIntegral"
data = {}
@@ -171,7 +176,7 @@ def pull_integral_pandas(_start_time, _end_time, _device):
.entity()
.keyValue("device", _device)
.keyValue("property", _property)
.buildDataset(), sqlContext)
.buildDataset(), sql_context)
df.registerTempTable("pulled_data")
@@ -179,13 +184,13 @@ def pull_integral_pandas(_start_time, _end_time, _device):
array_dataset_hg = spark.sql("select integral_hg.elements from pulled_data") \
.toPandas()
array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
# Process lg Data #
array_dataset_lg = spark.sql("select integral_lg.elements from pulled_data") \
.toPandas()
array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
# DATA #
data['rows'] = array_dataset_hg.shape[0]
@@ -242,7 +247,7 @@ def pull_integral_pandas(_start_time, _end_time, _device):
# noinspection SqlResolve
def pull_raw_dist_pandas(_start_time, _end_time, _device):
def pull_raw_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time):
_property = "LoggingRawDist"
data = {}
@@ -253,7 +258,7 @@ def pull_raw_dist_pandas(_start_time, _end_time, _device):
.entity()
.keyValue("device", _device)
.keyValue("property", _property)
.buildDataset(), sqlContext)
.buildDataset(), sql_context)
# data['rows'] = df.count()
df.registerTempTable("pulled_data")
@@ -313,7 +318,7 @@ def pull_raw_dist_pandas(_start_time, _end_time, _device):
# noinspection SqlResolve
def pull_integral_dist_pandas(_start_time, _end_time, _device):
def pull_integral_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time):
_property = "LoggingIntegralDist"
data = {}
@@ -324,7 +329,7 @@ def pull_integral_dist_pandas(_start_time, _end_time, _device):
.entity()
.keyValue("device", _device)
.keyValue("property", _property)
.buildDataset(), sqlContext)
.buildDataset(), sql_context)
# data['rows'] = df.count()
df.registerTempTable("pulled_data")
@@ -388,7 +393,7 @@ def pull_integral_dist_pandas(_start_time, _end_time, _device):
# noinspection SqlResolve
def pull_turnloss_pandas(_start_time, _end_time, _device):
def pull_turnloss_pandas(_fill_name, _mode, _device, _start_time, _end_time):
_property = "LoggingTurnLoss"
data = {}
@@ -399,7 +404,7 @@ def pull_turnloss_pandas(_start_time, _end_time, _device):
.entity()
.keyValue("device", _device)
.keyValue("property", _property)
.buildDataset(), sqlContext)
.buildDataset(), sql_context)
# data['rows'] = df.count()
df.registerTempTable("pulled_data")
@@ -411,7 +416,7 @@ def pull_turnloss_pandas(_start_time, _end_time, _device):
array_index_hg = spark.sql("select turnloss_hg_ind.elements from pulled_data") \
.toPandas()
array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
# Process lg Data #
array_dataset_lg = spark.sql("select turnloss_lg_com.elements from pulled_data") \
@@ -420,7 +425,7 @@ def pull_turnloss_pandas(_start_time, _end_time, _device):
array_index_lg = spark.sql("select turnloss_lg_ind.elements from pulled_data") \
.toPandas()
array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else sum(x))
# DATA #
data['rows'] = array_dataset_hg.shape[0]
@@ -883,9 +888,44 @@ def pull_turnloss(_start_time, _end_time, _device):
write_file(_device, _start_time, _end_time, _property, data)
if __name__ == '__main__':
def property_pull(_fill_name, _mode, _device, _start, _end):
gcursor = mydb.cursor()
gcursor.execute('SELECT id FROM device where name=%s limit 1', (_device,))
device_id = gcursor.fetchone()[0]
request = "select count(*) from fill where name=" + str(_fill_name) + " and device_id=" + str(device_id) +\
" and mode='" + _mode + "' and start='" + _start + "' and end='" + _end + "';"
gcursor.execute(request)
already_done = gcursor.fetchone()[0]
if already_done != 0:
print('Data for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end + ' already pulled')
return -1
print('Pulling data for fill ' + str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end)
global data_id
gcursor.execute("insert into nxcals_blm_data.data(histogram, integral, integral_dist, raw_dist, turnloss) "
"VALUES ( '{}', '{}', '{}', '{}', '{}');")
data_id = gcursor.lastrowid
# print(data_id)
gcursor.execute('INSERT INTO fill(name, device_id, mode, data_id, start, end)'
'VALUES (%s, %s, %s, %s, %s, %s )',
(_fill_name, device_id, _mode, data_id, _start, _end,))
pull_histogram_pandas(_fill_name, _mode, _device, _start, _end)
pull_integral_pandas(_fill_name, _mode, _device, _start, _end)
pull_raw_dist_pandas(_fill_name, _mode, _device, _start, _end)
pull_integral_dist_pandas(_fill_name, _mode, _device, _start, _end)
pull_turnloss_pandas(_fill_name, _mode, _device, _start, _end)
mydb.commit()
print('Pulled done, data pushed on DB')
_pandas = True
if __name__ == '__main__':
_fill_name = "7340"
_mode = "STABLE"
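The new property_pull entry point first checks whether a row for the same fill, device, mode and time range already exists in the fill table (the "check for duplicates" from the commit message). Unlike the other statements in the function, that query is assembled by string concatenation; a parameterized form in the style already used for the device lookup would look roughly like this (a sketch, not the repository's code; table and column names are taken from the diff):

def already_pulled(cursor, fill_name, device_id, mode, start, end):
    # Same duplicate check as above, but with placeholders instead of string concatenation.
    cursor.execute(
        "SELECT count(*) FROM fill "
        "WHERE name = %s AND device_id = %s AND mode = %s AND start = %s AND end = %s",
        (fill_name, device_id, mode, start, end),
    )
    return cursor.fetchone()[0] != 0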
@@ -910,10 +950,10 @@ if __name__ == '__main__':
" format: 'yyyy-mm-dd hh:mm:ss.ms' or 'yyyy-mm-dd_hh:mm:ss.ms'")
sys.exit(0)
elif len(sys.argv) < 6:
print("usage: python property-pull.py <fillName> <device> <mode> <start> <end> [-f] \n"
print("usage: python property_pull.py <fillName> <device> <mode> <start> <end> [-f] \n"
"where : \n"
" -f also generate .dat files \n"
"type 'python property-pull.py -h' for more infos")
"type 'python property_pull.py -h' for more infos")
sys.exit(1)
else:
_fill_name = sys.argv[1]
@@ -924,46 +964,4 @@ if __name__ == '__main__':
if len(sys.argv) > 6:
_files = sys.argv[6] == "-f"
print(_files)
print('Pulling data for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end)
gcursor = mydb.cursor()
gcursor.execute('SELECT id FROM device where name=%s', (_device,))
device_id = gcursor.fetchone()[0]
# print(device_id)
gcursor.execute("insert into nxcals_blm_data.data(histogram, integral, integral_dist, raw_dist, turnloss) "
"VALUES ( '{}', '{}', '{}', '{}', '{}');")
data_id = gcursor.lastrowid
# print(data_id)
gcursor.execute('INSERT INTO fill(name, device_id, mode, data_id, start, end)'
'VALUES (%s, %s, %s, %s, %s, %s )',
(_fill_name, device_id, _mode, data_id, _start, _end,))
conf = SparkConf()
conf.setMaster('yarn')
conf.setAppName('property-pull.py')
conf.set('driver.memory', '20g')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = SparkSession(sc)
if _pandas:
pull_histogram_pandas(_start, _end, _device)
pull_integral_pandas(_start, _end, _device)
pull_raw_dist_pandas(_start, _end, _device)
pull_integral_dist_pandas(_start, _end, _device)
pull_turnloss_pandas(_start, _end, _device)
else:
pull_histogram(_start, _end, _device)
pull_integral(_start, _end, _device)
pull_raw_dist(_start, _end, _device)
pull_integral_dist(_start, _end, _device)
# pull_turnloss(_start, _end, _device)
mydb.commit()
print('Pulled done, data pushed on DB')
property_pull(_fill_name, _mode, _device, _start, _end)
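With the main body factored into property_pull() (the "for packaging" part of the commit message), the module can also be driven from other code rather than only via the command line shown in the usage string. A sketch of a programmatic call, with a placeholder device name and time range (only the fill number 7340 and mode STABLE appear in the diff itself):

import property_pull                      # module-level code connects to MySQL and creates the Spark session

property_pull.property_pull(
    _fill_name="7340",                    # default fill used in __main__ above
    _mode="STABLE",
    _device="SOME.BLM.DEVICE",            # hypothetical device name
    _start="2018-10-22 21:18:00.000",     # placeholder timestamps in the documented format
    _end="2018-10-23 11:18:00.000",
)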