From b57020bf8e27926b66ab2c4252c7d4363d97330f Mon Sep 17 00:00:00 2001 From: Quentin Codelupi <quentin.codelupi@cern.ch> Date: Tue, 30 Jul 2019 15:48:05 +0200 Subject: [PATCH] [add] check data before start pulling --- property_pull.py | 471 +++-------------------------------------------- 1 file changed, 30 insertions(+), 441 deletions(-) diff --git a/property_pull.py b/property_pull.py index 9c99533..d82725c 100644 --- a/property_pull.py +++ b/property_pull.py @@ -17,6 +17,7 @@ import pandas as pd _dest_dir = './pulledData' _files = False +_db_write = True # mysqlDB variables mydb = mysql.connector.connect( @@ -44,7 +45,6 @@ sums_cols = udf(lambda arr: 0 if arr == [] else sum(arr), IntegerType()) ''' #TODO - -# Check compression ? Done => MB optimisation using numpy # Reduce ( UDAF ? ) # Scala UDF ? (working with python's lambda for now ) ''' @@ -61,20 +61,6 @@ def write_file(_device, _start_time, _end_time, _property, data): + _end_time.replace(' ', '_') + '_' + _property + '.dat' + " Pulled") -def uncompress(_data_array, _index_array, _size, _nrows): - print('uncompress start') - result = np.zeros(_size, dtype=np.int) - for i in range(_nrows): - tmpresult = np.zeros(_size, dtype=np.int) - ind = 0 - for index in _index_array[i]['elements']: - tmpresult[index] = _data_array[i]['elements'][ind] - ind += 1 - result = np.add(result, tmpresult) - print("uncompress finished") - return result.tolist() - - def uncompress_pandas(_data_array, _index_array, _size, _nrows): print('uncompress start') result = np.zeros(_size, dtype=np.int64) @@ -158,8 +144,9 @@ def pull_histogram_pandas(_fill_name, _mode, _device, _start_time, _end_time): 57802, data['rows'] - 1) # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET histogram = %s where id = %s', (json.dumps(data), data_id,)) + if _db_write: + db_cursor = mydb.cursor() + db_cursor.execute('UPDATE data SET histogram = %s where id = %s', (json.dumps(data), data_id,)) write_file(_device, _start_time, _end_time, _property, data) @@ -240,8 +227,9 @@ def pull_integral_pandas(_fill_name, _mode, _device, _start_time, _end_time): data['integral_lg'] = integral_lg.tolist() # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET integral = %s where id = %s', (json.dumps(data), data_id,)) + if _db_write: + db_cursor = mydb.cursor() + db_cursor.execute('UPDATE data SET integral = %s where id = %s', (json.dumps(data), data_id,)) write_file(_device, _start_time, _end_time, _property, data) @@ -311,8 +299,9 @@ def pull_raw_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time): data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET raw_dist = %s where id = %s', (json.dumps(data), data_id,)) + if _db_write: + db_cursor = mydb.cursor() + db_cursor.execute('UPDATE data SET raw_dist = %s where id = %s', (json.dumps(data), data_id,)) write_file(_device, _start_time, _end_time, _property, data) @@ -386,8 +375,9 @@ def pull_integral_dist_pandas(_fill_name, _mode, _device, _start_time, _end_time data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist() # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET integral_dist = %s where id = %s', (json.dumps(data), data_id,)) + if _db_write: + db_cursor = mydb.cursor() + db_cursor.execute('UPDATE data SET integral_dist = %s where id = %s', (json.dumps(data), data_id,)) write_file(_device, _start_time, _end_time, _property, data) @@ 
-467,430 +457,27 @@ def pull_turnloss_pandas(_fill_name, _mode, _device, _start_time, _end_time): data['nturns'], data['rows']) # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET turnloss = %s where id = %s', (json.dumps(data), data_id,)) - - write_file(_device, _start_time, _end_time, _property, data) - - -def pull_histogram(_start_time, _end_time, _device): - _property = 'LoggingHistogram' - - data = {} - - df = KeyValuesQuery.builder(spark).system("CMW") \ - .startTime(_start_time) \ - .endTime(_end_time) \ - .entity() \ - .keyValue("device", _device) \ - .keyValue("property", _property) \ - .buildDataset() - - df.printSchema() - - # df_columns = df.columns - - # Process hg array data - array_dataset_hg = df.select("histogram_hg_com.elements") - array_dataset_hg.printSchema() - - array_index_hg = df.select("histogram_hg_ind.elements") - - intime_array_hg = array_dataset_hg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_hg.printSchema() - - # Process lg array data - array_dataset_lg = df.select("histogram_lg_com.elements") - array_dataset_lg.printSchema() - - array_index_lg = df.select("histogram_lg_ind.elements") - - intime_array_lg = array_dataset_lg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_lg.printSchema() - - # Datas # - data['rows'] = array_dataset_hg.count() - data['startTime'] = _start_time - data['endTime'] = _end_time - - data['threshold_hg'] = df.select('threshold_hg').collect() - if len(data['threshold_hg']) > 0: - data['threshold_hg'] = data['threshold_hg'][0] - - data['threshold_lg'] = df.select('threshold_lg').collect() - if len(data['threshold_lg']) > 0: - data['threshold_lg'] = data['threshold_lg'][0] - - # Timestamp # - time_stamp = df.select("__record_timestamp__") - result_time_stamp = time_stamp.collect() - data['timestamps'] = [] - for TS in result_time_stamp: - data['timestamps'].append(TS["__record_timestamp__"]) - - # Intime Data # - result_intime_hg = intime_array_hg.collect() - data['histogram_hg_intime'] = [] - for IT in result_intime_hg: - data['histogram_hg_intime'].append(IT["Total"]) - - result_intime_lg = intime_array_lg.collect() - data['histogram_lg_intime'] = [] - for IT in result_intime_lg: - data['histogram_lg_intime'].append(IT['Total']) - - # Histogram data # - result_array_dataset_hg = array_dataset_hg.collect() - result_array_index_hg = array_index_hg.collect() - data['histogram_hg'] = uncompress(result_array_dataset_hg, result_array_index_hg, 57802, data['rows']) - - result_array_dataset_lg = array_dataset_lg.collect() - result_array_index_lg = array_index_lg.collect() - data['histogram_lg'] = uncompress(result_array_dataset_lg, result_array_index_lg, 57802, data['rows']) - - # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET histogram = %s where id = %s', (json.dumps(data), data_id,)) - - write_file(_device, _start_time, _end_time, _property, data) - - -def pull_integral(_start_time, _end_time, _device): - _property = "LoggingIntegral" - - data = {} - - df = KeyValuesQuery.builder(spark).system("CMW") \ - .startTime(_start_time) \ - .endTime(_end_time) \ - .entity() \ - .keyValue("device", _device) \ - .keyValue("property", _property) \ - .buildDataset() - - df.printSchema() - data['rows'] = df.count() - - # Process hg Data # - array_dataset_hg = df.select("integral_hg.elements").limit(data['rows'] - 1) - array_dataset_hg.printSchema() - - intime_array_hg = array_dataset_hg.withColumn('Total', 
sums_cols(col('elements'))).select('Total') - intime_array_hg.printSchema() - - # Process lg Data # - array_dataset_lg = df.select("integral_lg.elements").limit(data['rows'] - 1) - array_dataset_lg.printSchema() - - intime_array_lg = array_dataset_lg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_lg.printSchema() - - # DATA # - data['startTime'] = _start_time - data['endTime'] = _end_time - - data['zsthr_hg'] = df.select('zsthr_hg').collect() - if len(data['zsthr_hg']) > 0: - data['zsthr_hg'] = data['zsthr_hg'][0] - - data['zsthrave_hg'] = df.select('zsthrave_hg').collect() - if len(data['zsthrave_hg']) > 0: - data['zsthrave_hg'] = data['zsthrave_hg'][0] - - data['zsthr_lg'] = df.select('zsthr_lg').collect() - if len(data['zsthr_lg']) > 0: - data['zsthr_lg'] = data['zsthr_lg'][0] - - data['zsthrave_lg'] = df.select('zsthrave_lg').collect() - if len(data['zsthrave_lg']) > 0: - data['zsthrave_lg'] = data['zsthrave_lg'][0] - - data['baselinesub'] = df.select('baselinesub').collect() - if len(data['baselinesub']) > 0: - data['baselinesub'] = data['baselinesub'][0] - - # Intime Data # - result_intime_hg = intime_array_hg.collect() - data['integral_hg_intime'] = [] - for IT in result_intime_hg: - data['integral_hg_intime'].append(IT["Total"]) - - result_intime_lg = intime_array_lg.collect() - data['integral_lg_intime'] = [] - for IT in result_intime_lg: - data['integral_lg_intime'].append(IT['Total']) - - # Timestamp # - time_stamp = df.select("__record_timestamp__") - result_time_stamp = time_stamp.collect() - data['timestamps'] = [] - for TS in result_time_stamp: - data['timestamps'].append(TS["__record_timestamp__"]) - - # Integral Data - integral_hg = np.zeros(3564, dtype=np.int) - result_array_integral_hg = array_dataset_hg.collect() - for T in result_array_integral_hg: - if len(T) == 3564: - integral_hg = np.add(integral_hg, T['elements']) - data['integral_hg'] = integral_hg.tolist() - - integral_lg = np.zeros(3564, dtype=np.int) - result_array_integral_lg = array_dataset_lg.collect() - for T in result_array_integral_lg: - if len(T) == 3564: - integral_lg = np.add(integral_lg, T['elements']) - data['integral_lg'] = integral_lg.tolist() - - # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET integral = %s where id = %s', (json.dumps(data), data_id,)) - - write_file(_device, _start_time, _end_time, _property, data) - - -def pull_raw_dist(_start_time, _end_time, _device): - _property = "LoggingRawDist" - - data = {} - - df = KeyValuesQuery.builder(spark).system("CMW") \ - .startTime(_start_time) \ - .endTime(_end_time) \ - .entity() \ - .keyValue("device", _device) \ - .keyValue("property", _property) \ - .buildDataset() - - df.printSchema() - data['rows'] = df.count() - - # Process hg Data # - array_dataset_hg = df.select("distribution_hg.elements").limit(data['rows'] - 1) - array_dataset_hg.printSchema() - - # Process lg Data # - array_dataset_lg = df.select("distribution_lg.elements").limit(data['rows'] - 1) - array_dataset_lg.printSchema() - - # DATA # - data['rows'] = df.count() - data['startTime'] = _start_time - data['endTime'] = _end_time - - data['lsbcut_hg'] = df.select('lsbcut_hg').collect() - if len(data['lsbcut_hg']) > 0: - data['lsbcut_hg'] = data['lsbcut_hg'][0] - - data['offset_hg'] = df.select('offset_hg').collect() - if len(data['offset_hg']) > 0: - data['offset_hg'] = data['offset_hg'][0] - - data['lsbcut_lg'] = df.select('lsbcut_lg').collect() - if len(data['lsbcut_lg']) > 0: - data['lsbcut_lg'] = 
data['lsbcut_lg'][0] - - data['offset_lg'] = df.select('offset_lg').collect() - if len(data['offset_lg']) > 0: - data['offset_lg'] = data['offset_lg'][0] - - # Array Data # - result_hg = array_dataset_hg.collect() - result_distrib_hg = np.zeros(2048, dtype=np.int) - for R in result_hg: - result_distrib_hg = np.add(result_distrib_hg, R['elements']) - data['distribution_hg'] = result_distrib_hg.tolist() - - result_lg = array_dataset_hg.collect() - result_distrib_lg = np.zeros(2048, dtype=np.int) - for R in result_lg: - result_distrib_lg = np.add(result_distrib_lg, R['elements']) - data['distribution_lg'] = result_distrib_lg.tolist() - - # Timestamp # - time_stamp = df.select("__record_timestamp__") - result_time_stamp = time_stamp.collect() - data['timestamps'] = [] - for TS in result_time_stamp: - data['timestamps'].append(TS["__record_timestamp__"]) - - # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET raw_dist = %s where id = %s', (json.dumps(data), data_id,)) - - write_file(_device, _start_time, _end_time, _property, data) - - -def pull_integral_dist(_start_time, _end_time, _device): - _property = "LoggingIntegralDist" - - data = {} - - df = KeyValuesQuery.builder(spark).system("CMW") \ - .startTime(_start_time) \ - .endTime(_end_time) \ - .entity() \ - .keyValue("device", _device) \ - .keyValue("property", _property) \ - .buildDataset() - - df.printSchema() - data['rows'] = df.count() - - # Process hg Data # - array_dataset_hg = df.select("distribution_hg.elements").limit(data['rows'] - 1) - array_dataset_hg.printSchema() - - # Process lg Data # - array_dataset_lg = df.select("distribution_lg.elements").limit(data['rows'] - 1) - array_dataset_lg.printSchema() - - # DATA # - data['startTime'] = _start_time - data['endTime'] = _end_time - - data['lsbcut_hg'] = df.select('lsbcut_hg').collect() - if len(data['lsbcut_hg']) > 0: - data['lsbcut_hg'] = data['lsbcut_hg'][0] - - data['offset_hg'] = df.select('offset_hg').collect() - if len(data['offset_hg']) > 0: - data['offset_hg'] = data['offset_hg'][0] - - data['lsbcut_lg'] = df.select('lsbcut_lg').collect() - if len(data['lsbcut_lg']) > 0: - data['lsbcut_lg'] = data['lsbcut_lg'][0] - - data['offset_lg'] = df.select('offset_lg').collect() - if len(data['offset_lg']) > 0: - data['offset_lg'] = data['offset_lg'][0] - - data['nturns'] = df.select('nturns').collect() - if len(data['nturns']) > 0: - data['nturns'] = data['nturns'][0] - - # Array Data # - result_hg = array_dataset_hg.collect() - result_distrib_hg = np.zeros(2048, dtype=np.int) - for R in result_hg: - result_distrib_hg = np.add(result_distrib_hg, R['elements']) - data['distribution_hg'] = result_distrib_hg.tolist() - - result_lg = array_dataset_lg.collect() - result_distrib_lg = np.zeros(2048, dtype=np.int) - for R in result_lg: - if len(R) == 2048: - result_distrib_lg = np.add(result_distrib_lg, R['elements']) - data['distribution_lg'] = result_distrib_lg.tolist() - - # Timestamp # - time_stamp = df.select("__record_timestamp__") - result_time_stamp = time_stamp.collect() - data['timestamps'] = [] - for TS in result_time_stamp: - data['timestamps'].append(TS["__record_timestamp__"]) - - # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET integral_dist = %s where id = %s', (json.dumps(data), data_id,)) + if _db_write: + db_cursor = mydb.cursor() + db_cursor.execute('UPDATE data SET turnloss = %s where id = %s', (json.dumps(data), data_id,)) write_file(_device, _start_time, _end_time, _property, data) -def 
pull_turnloss(_start_time, _end_time, _device): - _property = "LoggingTurnLoss" - - data = {} - - df = KeyValuesQuery.builder(spark).system("CMW") \ - .startTime(_start_time) \ - .endTime(_end_time) \ - .entity() \ - .keyValue("device", _device) \ - .keyValue("property", _property) \ - .buildDataset() - - df.printSchema() - data['rows'] = df.count() - print(data['rows']) - - # Process hg Data # - array_dataset_hg = df.select("turnloss_hg_com.elements") - array_dataset_hg.printSchema() - - array_dataset_hg_com1 = df.select("turnloss_hg_com.elements").limit(int(data['rows'] / 4)) - # array_dataset_hg_com2 = df.select("turnloss_hg_com.elements").limit(int(data['rows'] / 2)) - array_index_hg = df.select("turnloss_hg_ind.elements") - - intime_array_hg = array_dataset_hg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_hg.printSchema() - - # Process lg Data # - array_dataset_lg = df.select("turnloss_lg_com.elements") - array_dataset_lg.printSchema() - - array_index_lg = df.select("turnloss_lg_ind.elements") - - intime_array_lg = array_dataset_lg.withColumn('Total', sums_cols(col('elements'))).select('Total') - intime_array_lg.printSchema() - - # DATA # - data['startTime'] = _start_time - data['endTime'] = _end_time - data['nturns'] = df.select('nturns').collect() - if len(data['nturns']) > 0: - data['nturns'] = data['nturns'][0] - - data['zsthr_hg'] = df.select('zsthr_hg').collect() - if len(data['zsthr_hg']) > 0: - data['zsthr_hg'] = data['zsthr_hg'][0] +def property_pull(_fill_name, _mode, _device, _start, _end, _files_input=False, _db_write_input=True): + global _files + _files = _files_input - data['zsthr_lg'] = df.select('zsthr_lg').collect() - if len(data['zsthr_lg']) > 0: - data['zsthr_lg'] = data['zsthr_lg'][0] + global _db_write + _db_write = _db_write_input - data['baselinesub'] = df.select('baselinesub').collect() - if len(data['baselinesub']) > 0: - data['baselinesub'] = data['baselinesub'][0] - - # Timestamp # - time_stamp = df.select("__record_timestamp__") - result_time_stamp = time_stamp.collect() - data['timestamps'] = [] - for TS in result_time_stamp: - data['timestamps'].append(TS["__record_timestamp__"]) - - # Intime Data # - result_hg = intime_array_hg.collect() - data['turnloss_hg_intime'] = [] - for IT in result_hg: - data['turnloss_hg_intime'].append(IT['Total']) - - result_lg = intime_array_lg.collect() - data['turnloss_lg_intime'] = [] - for IT in result_lg: - data['turnloss_lg_intime'].append(IT['Total']) - - # Turnloss Data - result_array_dataset_hg = array_dataset_hg_com1.collect() - result_array_index_hg = array_index_hg.collect() - data['turnloss_hg'] = uncompress(result_array_dataset_hg, result_array_index_hg, data['nturns'], data['rows']) - - ''' - result_array_dataset_lg = array_dataset_lg.collect() - result_array_index_lg = array_index_lg.collect() - data['turnloss_lg'] = uncompress(result_array_dataset_lg, result_array_index_lg, 57802, data['rows']) - ''' - # Writing to db - db_cursor = mydb.cursor() - db_cursor.execute('UPDATE data SET turnloss = %s where id = %s', (json.dumps(data), data_id,)) - - write_file(_device, _start_time, _end_time, _property, data) - - -def property_pull(_fill_name, _mode, _device, _start, _end): gcursor = mydb.cursor() + gcursor.execute('select count(*) from mode where name=%s', (_mode, )) + mode_exists = gcursor.fetchone()[0] + if mode_exists == 0: + print('Mode ' + _mode + ' does not exist in the database') + gcursor.execute('SELECT id FROM device where name=%s limit 1', (_device,)) device_id = gcursor.fetchone()[0] @@ -903,7
+490,8 @@ def property_pull(_fill_name, _mode, _device, _start, _end): print('Data for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end + ' already pulled') return -1 - print('Pulling data for fill ' + str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end) + print('Pulling data for fill ' + + str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end) global data_id gcursor.execute("insert into nxcals_blm_data.data(histogram, integral, integral_dist, raw_dist, turnloss) " @@ -922,7 +510,8 @@ def property_pull(_fill_name, _mode, _device, _start, _end): pull_turnloss_pandas(_fill_name, _mode, _device, _start, _end) mydb.commit() - print('Pulled done, data pushed on DB') + print('Pull done, data pushed to DB for fill ' + + str(_fill_name) + ' for device ' + _device + ' mode ' + _mode + ' from ' + _start + ' to ' + _end) if __name__ == '__main__': -- GitLab
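Editor's note: the hunks above change the property_pull() entry point to accept two optional flags, _files_input (dump the pulled data to .dat files via write_file) and _db_write_input (gate every MySQL UPDATE behind the new _db_write module flag). Below is a minimal, hypothetical calling sketch; the fill number, mode, device name and timestamps are illustrative placeholders, not values taken from this patch.

# Hedged usage sketch for the new property_pull() signature; argument values are examples only.
import property_pull

# Dry run: keep the local .dat dump but skip every 'UPDATE data SET ...' statement.
property_pull.property_pull(
    7340,                              # fill number (illustrative)
    'STABLE',                          # mode name, now checked against the `mode` table
    'BLMED.06L7.B1B30_TCPA.A6L7.B1',   # device name (illustrative)
    '2018-10-22 15:57:00.000',         # start time (illustrative)
    '2018-10-22 21:18:00.000',         # end time (illustrative)
    _files_input=True,                 # write files under ./pulledData
    _db_write_input=False,             # leave the MySQL database untouched
)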
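Editor's note: the "check data before starting the pull" guard added by this commit amounts to counting the requested mode in the `mode` table before a new data row is inserted. A standalone sketch of that check follows; the helper name check_mode_exists and the early-return behaviour are assumptions made for illustration, not part of the patch.

# Sketch of the pre-pull mode check; table and column names (`mode`, `name`) mirror
# the query added to property_pull(). check_mode_exists is a hypothetical helper.
def check_mode_exists(db, mode_name):
    cursor = db.cursor()
    cursor.execute('SELECT count(*) FROM mode WHERE name = %s', (mode_name,))
    return cursor.fetchone()[0] > 0

# Possible use inside property_pull(), assuming an unknown mode should abort the pull:
#     if not check_mode_exists(mydb, _mode):
#         print('Mode ' + _mode + ' does not exist in the database')
#         return -1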