From b1332fb87be2eecdcdee97c72afe23e0dcce3e6d Mon Sep 17 00:00:00 2001
From: Quentin Codelupi <quentin.codelupi@cern.ch>
Date: Thu, 25 Jul 2019 23:00:15 +0200
Subject: [PATCH] [add] pandas versions of the pull functions to compare efficiency

---
 .idea/inspectionProfiles/Project_Default.xml |   7 +
 property-pull.py                             | 404 +++++++++++++++++--
 2 files changed, 383 insertions(+), 28 deletions(-)

diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
index c2d4450..ab875d7 100644
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -6,5 +6,12 @@
         <language minSize="146" name="Python" />
       </Languages>
     </inspection_tool>
+    <inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
+      <option name="ignoredIdentifiers">
+        <list>
+          <option value="numpy.*" />
+        </list>
+      </option>
+    </inspection_tool>
   </profile>
 </component>
\ No newline at end of file
diff --git a/property-pull.py b/property-pull.py
index dc3714b..4c90716 100644
--- a/property-pull.py
+++ b/property-pull.py
@@ -13,6 +13,7 @@ from pyspark.sql.functions import udf, col
 from pyspark.sql.types import IntegerType
 
 import numpy as np
+import pandas as pd
 
 _dest_dir = '.'
 _files = False
@@ -48,7 +49,14 @@ sums_cols = udf(lambda arr: 0 if arr == [] else __builtins__.sum(arr), IntegerTy
 
 
 def write_file(_device, _start_time, _end_time, _property, data):
-    write_file(_device, _start_time, _end_time, _property, data)
+    if _files:
+        # Writing output #
+        with open(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to'
+                  + _end_time.replace(' ', '_') + '_' + _property + '.dat', 'w') as outfile:
+            json.dump(data, outfile)
+
+        print(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to'
+              + _end_time.replace(' ', '_') + '_' + _property + '.dat' + " Pulled")
 
 
 def uncompress(_data_array, _index_array, _size, _nrows):
@@ -65,6 +73,341 @@ def uncompress(_data_array, _index_array, _size, _nrows):
     return result.tolist()
 
 
+def uncompress_pandas(_data_array, _index_array, _size, _nrows):
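+    # Rebuild a dense array of length _size from each compressed row
+    # (values in _data_array, positions in _index_array), then sum the
+    # densified rows element-wise over the first _nrows records.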
+    print('uncompress start')
+    result = np.zeros(_size, dtype=np.int)
+    for i in range(_nrows):
+        tmpresult = np.zeros(_size, dtype=np.int)
+        ind = 0
+        for index in _index_array[i]:
+            tmpresult[index] = _data_array[i][ind]
+            ind += 1
+        result = np.add(result, tmpresult)
+    print("uncompress finished")
+    return result.tolist()
+
+
+# noinspection SqlResolve
+def pull_histogram_pandas(_start_time, _end_time, _device):
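+    # Pandas variant of pull_histogram: pulls the LoggingHistogram property of
+    # _device over [_start_time, _end_time], processes it via toPandas(), and
+    # stores the result as JSON in the database (and on disk when _files is set).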
+    _property = 'LoggingHistogram'
+
+    data = {}
+
+    df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
+                   .startTime(_start_time)
+                   .endTime(_end_time)
+                   .entity()
+                   .keyValue("device", _device)
+                   .keyValue("property", _property)
+                   .buildDataset(), sqlContext)
+
+    data['rows'] = df.count()
+    df.registerTempTable("pulled_data")
+
+    # Process hg array data
+    array_dataset_hg = spark.sql("select histogram_hg_com.elements from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+
+    array_index_hg = spark.sql("select histogram_hg_ind.elements from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+
+    array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
+
+    # Process lg array data
+    array_dataset_lg = spark.sql("select histogram_lg_com.elements from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+
+    array_index_lg = spark.sql("select histogram_lg_ind.elements from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+
+    array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
+
+    # DATA #
+    data['startTime'] = _start_time
+    data['endTime'] = _end_time
+
+    data['threshold_hg'] = spark.sql("select threshold_hg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['threshold_lg'] = spark.sql("select threshold_lg from pulled_data limit 1").toPandas().values[0].tolist()
+
+    # Timestamp #
+    time_stamp = spark.sql("select __record_timestamp__ from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+    data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
+
+    # Intime Data #
+    data['histogram_hg_intime'] = array_dataset_hg['Total'].values.tolist()
+    data['histogram_lg_intime'] = array_dataset_lg['Total'].values.tolist()
+
+    # Histogram data #
+    data['histogram_hg'] = uncompress_pandas(array_dataset_hg.values,
+                                             array_index_hg.values,
+                                             57802, data['rows'] - 1)
+
+    data['histogram_lg'] = uncompress_pandas(array_dataset_lg.values,
+                                             array_index_lg.values,
+                                             57802, data['rows'] - 1)
+
+    # Writing to db
+    db_cursor = mydb.cursor()
+    db_cursor.execute('UPDATE data SET histogram = %s where id = %s', (json.dumps(data), data_id,))
+
+    write_file(_device, _start_time, _end_time, _property, data)
+
+
+# noinspection SqlResolve
+def pull_integral_pandas(_start_time, _end_time, _device):
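+    # Pandas variant of pull_integral: same query/processing pattern as
+    # pull_histogram_pandas, applied to the LoggingIntegral property.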
+    _property = "LoggingIntegral"
+
+    data = {}
+
+    df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
+                   .startTime(_start_time)
+                   .endTime(_end_time)
+                   .entity()
+                   .keyValue("device", _device)
+                   .keyValue("property", _property)
+                   .buildDataset(), sqlContext)
+
+    data['rows'] = df.count()
+    df.registerTempTable("pulled_data")
+
+    # Process hg Data #
+    array_dataset_hg = spark.sql("select integral_hg.elements from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+
+    array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
+
+    # Process lg Data #
+    array_dataset_lg = spark.sql("select integral_lg.elements from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+
+    array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
+
+    # DATA #
+    data['startTime'] = _start_time
+    data['endTime'] = _end_time
+
+    data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['zsthrave_hg'] = spark.sql("select zsthrave_hg from pulled_data limit 1").toPandas().values[0].tolist()
+
+    data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['zsthrave_lg'] = spark.sql("select zsthrave_lg from pulled_data limit 1").toPandas().values[0].tolist()
+
+    data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values[0].tolist()
+
+    # Intime Data #
+    data['integral_hg_intime'] = array_dataset_hg['Total'].values.tolist()
+    data['integral_lg_intime'] = array_dataset_lg['Total'].values.tolist()
+
+    # Timestamp #
+    time_stamp = spark.sql("select __record_timestamp__ from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+    data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
+
+    # Integral Data
+    integral_hg = np.zeros(3564, dtype=np.int)
+    for T in array_dataset_hg['elements'].values:
+        integral_hg = np.add(integral_hg, T)
+    data['integral_hg'] = integral_hg.tolist()
+
+    integral_lg = np.zeros(3564, dtype=np.int)
+    for T in array_dataset_lg['elements'].values:
+        integral_lg = np.add(integral_lg, T)
+    data['integral_lg'] = integral_lg.tolist()
+
+    # Writing to db
+    db_cursor = mydb.cursor()
+    db_cursor.execute('UPDATE data SET integral = %s where id = %s', (json.dumps(data), data_id,))
+
+    write_file(_device, _start_time, _end_time, _property, data)
+
+
+# noinspection SqlResolve
+def pull_raw_dist_pandas(_start_time, _end_time, _device):
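+    # Pandas variant of pull_raw_dist: per-record distribution arrays of the
+    # LoggingRawDist property are summed element-wise over the time range.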
+    _property = "LoggingRawDist"
+
+    data = {}
+
+    df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
+                   .startTime(_start_time)
+                   .endTime(_end_time)
+                   .entity()
+                   .keyValue("device", _device)
+                   .keyValue("property", _property)
+                   .buildDataset(), sqlContext)
+
+    data['rows'] = df.count()
+    df.registerTempTable("pulled_data")
+
+    # Process hg Data #
+    array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+
+    # Process lg Data #
+    array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+
+    # DATA #
+    data['startTime'] = _start_time
+    data['endTime'] = _end_time
+
+    data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values[0].tolist()
+
+    data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values[0].tolist()
+
+    # Array Data #
+    result_distrib_hg = np.zeros(2048, dtype=np.int)
+    for R in array_dataset_hg['elements'].values:
+        result_distrib_hg = np.add(result_distrib_hg, R)
+    data['distribution_hg'] = result_distrib_hg.tolist()
+
+    result_distrib_lg = np.zeros(2048, dtype=np.int)
+    for R in array_dataset_lg['elements'].values:
+        result_distrib_lg = np.add(result_distrib_lg, R)
+    data['distribution_lg'] = result_distrib_lg.tolist()
+
+    # Timestamp #
+    time_stamp = spark.sql("select __record_timestamp__ from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+    data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
+
+    # Writing to db
+    db_cursor = mydb.cursor()
+    db_cursor.execute('UPDATE data SET raw_dist = %s where id = %s', (json.dumps(data), data_id,))
+
+    write_file(_device, _start_time, _end_time, _property, data)
+
+
+# noinspection SqlResolve
+def pull_integral_dist_pandas(_start_time, _end_time, _device):
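+    # Pandas variant of pull_integral_dist: same processing as
+    # pull_raw_dist_pandas, applied to the LoggingIntegralDist property.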
+    _property = "LoggingIntegralDist"
+
+    data = {}
+
+    df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
+                   .startTime(_start_time)
+                   .endTime(_end_time)
+                   .entity()
+                   .keyValue("device", _device)
+                   .keyValue("property", _property)
+                   .buildDataset(), sqlContext)
+
+    data['rows'] = df.count()
+    df.registerTempTable("pulled_data")
+
+    # Process hg Data #
+    array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+
+    # Process lg Data #
+    array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+
+    # DATA #
+    data['startTime'] = _start_time
+    data['endTime'] = _end_time
+
+    data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values[0].tolist()
+
+    data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values[0].tolist()
+
+    data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values[0].tolist()
+
+    # Array Data #
+    result_distrib_hg = np.zeros(2048, dtype=np.int)
+    for R in array_dataset_hg['elements'].values:
+        result_distrib_hg = np.add(result_distrib_hg, R)
+    data['distribution_hg'] = result_distrib_hg.tolist()
+
+    result_distrib_lg = np.zeros(2048, dtype=np.int)
+    for R in array_dataset_lg['elements'].values:
+        if len(R) == 2048:
+            result_distrib_lg = np.add(result_distrib_lg, R)
+    data['distribution_lg'] = result_distrib_lg.tolist()
+
+    # Timestamp #
+    time_stamp = spark.sql("select __record_timestamp__ from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+    data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
+
+    # Writing to db
+    db_cursor = mydb.cursor()
+    db_cursor.execute('UPDATE data SET integral_dist = %s where id = %s', (json.dumps(data), data_id,))
+
+    write_file(_device, _start_time, _end_time, _property, data)
+
+
+# noinspection SqlResolve
+def pull_turnloss_pandas(_start_time, _end_time, _device):
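+    # Pandas variant of pull_turnloss: compressed per-turn loss arrays of the
+    # LoggingTurnLoss property are rebuilt and summed with uncompress_pandas.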
+    _property = "LoggingTurnLoss"
+
+    data = {}
+
+    df = DataFrame(KeyValuesQuery.builder(spark).system("CMW")
+                   .startTime(_start_time)
+                   .endTime(_end_time)
+                   .entity()
+                   .keyValue("device", _device)
+                   .keyValue("property", _property)
+                   .buildDataset(), sqlContext)
+
+    data['rows'] = df.count()
+    df.registerTempTable("pulled_data")
+
+    # Process hg Data #
+    array_dataset_hg = spark.sql("select turnloss_hg_com.elements from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+
+    array_index_hg = spark.sql("select turnloss_hg_ind.elements from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+
+    array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
+
+    # Process lg Data #
+    array_dataset_lg = spark.sql("select turnloss_lg_com.elements from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+
+    array_index_lg = spark.sql("select turnloss_lg_ind.elements from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+
+    array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
+
+    # DATA #
+    data['startTime'] = _start_time
+    data['endTime'] = _end_time
+
+    data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values[0].tolist()
+
+    data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values[0].tolist()
+    data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values[0].tolist()
+
+    # Timestamp #
+    time_stamp = spark.sql("select __record_timestamp__ from pulled_data limit " + str(data['rows'] - 1)) \
+        .toPandas()
+    data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
+
+    # Intime Data #
+    data['turnloss_hg_intime'] = array_dataset_hg['Total'].values.tolist()
+    data['turnloss_lg_intime'] = array_dataset_lg['Total'].values.tolist()
+
+    # Turnloss Data
+    data['turnloss_hg'] = uncompress_pandas(array_dataset_hg.values,
+                                            array_index_hg.values,
+                                            data['nturns'], data['rows'] - 1)
+
+    data['turnloss_lg'] = uncompress_pandas(array_dataset_lg.values,
+                                            array_index_lg.values,
+                                            data['nturns'], data['rows'] - 1)
+
+    # Writing to db
+    db_cursor = mydb.cursor()
+    db_cursor.execute('UPDATE data SET turnloss = %s where id = %s', (json.dumps(data), data_id,))
+
+    write_file(_device, _start_time, _end_time, _property, data)
+
+
 def pull_histogram(_start_time, _end_time, _device):
     _property = 'LoggingHistogram'
 
@@ -105,13 +448,13 @@ def pull_histogram(_start_time, _end_time, _device):
     data['startTime'] = _start_time
     data['endTime'] = _end_time
 
-    data['treshold_hg'] = df.select('threshold_hg').collect()
-    if len(data['treshold_hg']) > 0:
-        data['treshold_hg'] = data['treshold_hg'][0]
+    data['threshold_hg'] = df.select('threshold_hg').collect()
+    if len(data['threshold_hg']) > 0:
+        data['threshold_hg'] = data['threshold_hg'][0]
 
-    data['treshold_lg'] = df.select('threshold_lg').collect()
-    if len(data['treshold_lg']) > 0:
-        data['treshold_lg'] = data['treshold_lg'][0]
+    data['threshold_lg'] = df.select('threshold_lg').collect()
+    if len(data['threshold_lg']) > 0:
+        data['threshold_lg'] = data['threshold_lg'][0]
 
     # Timestamp #
     time_stamp = df.select("__record_timestamp__")
@@ -144,18 +487,12 @@ def pull_histogram(_start_time, _end_time, _device):
     db_cursor = mydb.cursor()
     db_cursor.execute('UPDATE data SET histogram = %s where id = %s', (json.dumps(data), data_id,))
 
-    if _files:
-        # Writing output
-        with open(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to'
-                  + _end_time.replace(' ', '_') + '_' + _property + '.dat', 'w') as outfile:
-            json.dump(data, outfile)
-
-        print(_dest_dir + '/' + _device + '_' + _start_time.replace(' ', '_') + 'to'
-              + _end_time.replace(' ', '_') + '_' + _property + '.dat' + " Pulled")
+    write_file(_device, _start_time, _end_time, _property, data)
 
 
 # Java Heap Space errors are really common on realFill > 10h
 
+
 def pull_integral(_start_time, _end_time, _device):
     _property = "LoggingIntegral"
 
@@ -170,23 +507,23 @@ def pull_integral(_start_time, _end_time, _device):
         .buildDataset()
 
     df.printSchema()
+    data['rows'] = df.count()
 
     # Process hg Data #
-    array_dataset_hg = df.select("integral_hg.elements")
+    array_dataset_hg = df.select("integral_hg.elements").limit(data['rows'] - 1)
     array_dataset_hg.printSchema()
 
     intime_array_hg = array_dataset_hg.withColumn('Total', sums_cols(col('elements'))).select('Total')
     intime_array_hg.printSchema()
 
     # Process lg Data #
-    array_dataset_lg = df.select("integral_lg.elements")
+    array_dataset_lg = df.select("integral_lg.elements").limit(data['rows'] - 1)
     array_dataset_lg.printSchema()
 
     intime_array_lg = array_dataset_lg.withColumn('Total', sums_cols(col('elements'))).select('Total')
     intime_array_lg.printSchema()
 
     # DATA #
-    data['rows'] = df.count()
     data['startTime'] = _start_time
     data['endTime'] = _end_time
 
@@ -266,13 +603,14 @@ def pull_raw_dist(_start_time, _end_time, _device):
         .buildDataset()
 
     df.printSchema()
+    data['rows'] = df.count()
 
     # Process hg Data #
-    array_dataset_hg = df.select("distribution_hg.elements")
+    array_dataset_hg = df.select("distribution_hg.elements").limit(data['rows'] - 1)
     array_dataset_hg.printSchema()
 
     # Process lg Data #
-    array_dataset_lg = df.select("distribution_lg.elements")
+    array_dataset_lg = df.select("distribution_lg.elements").limit(data['rows'] - 1)
     array_dataset_lg.printSchema()
 
     # DATA #
@@ -337,17 +675,17 @@ def pull_integral_dist(_start_time, _end_time, _device):
         .buildDataset()
 
     df.printSchema()
+    data['rows'] = df.count()
 
     # Process hg Data #
-    array_dataset_hg = df.select("distribution_hg.elements").limit(1)
+    array_dataset_hg = df.select("distribution_hg.elements").limit(data['rows'] - 1)
     array_dataset_hg.printSchema()
 
     # Process lg Data #
-    array_dataset_lg = df.select("distribution_lg.elements").limit(1)
+    array_dataset_lg = df.select("distribution_lg.elements").limit(data['rows'] - 1)
     array_dataset_lg.printSchema()
 
     # DATA #
-    data['rows'] = df.count()
     data['startTime'] = _start_time
     data['endTime'] = _end_time
 
@@ -492,6 +830,8 @@ def pull_turnloss(_start_time, _end_time, _device):
 
 if __name__ == '__main__':
 
+    _pandas = True
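+    # _pandas toggles between the new pandas-based pull functions and the
+    # original Spark-only implementations below.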
+
     _fill_name = "7340"
     _mode = "STABLE"
     _start = "2018-10-22 21:18:00.000"
@@ -551,16 +891,24 @@ if __name__ == '__main__':
     conf = SparkConf()
     conf.setMaster('yarn')
     conf.setAppName('property-pull.py')
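+    # Note: spark.driver.memory normally has to be set before the driver JVM
+    # starts (e.g. via spark-submit); setting it here may have no effect.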
+    conf.set('spark.driver.memory', '20g')
 
     sc = SparkContext(conf=conf)
     sqlContext = SQLContext(sc)
     spark = SparkSession(sc)
 
-    pull_histogram(_start, _end, _device)
-    pull_integral(_start, _end, _device)
-    pull_raw_dist(_start, _end, _device)
-    pull_integral_dist(_start, _end, _device)
-    # pull_turnloss(_start, _end, _device)
+    if _pandas:
+        pull_histogram_pandas(_start, _end, _device)
+        pull_integral_pandas(_start, _end, _device)
+        pull_raw_dist_pandas(_start, _end, _device)
+        pull_integral_dist_pandas(_start, _end, _device)
+        pull_turnloss_pandas(_start, _end, _device)
+    else:
+        pull_histogram(_start, _end, _device)
+        pull_integral(_start, _end, _device)
+        pull_raw_dist(_start, _end, _device)
+        pull_integral_dist(_start, _end, _device)
+        # pull_turnloss(_start, _end, _device)
 
     mydb.commit()
     print('Pulled done, data pushed on DB')
-- 
GitLab