From 30c1c2a2bf0eef6c164b47c23fec2dab8855031e Mon Sep 17 00:00:00 2001
From: Quentin Codelupi <quentin.codelupi@cern.ch>
Date: Fri, 26 Jul 2019 14:39:12 +0200
Subject: [PATCH] [add] finish pulling with pandas

---
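
The per-query "limit N" built from df.count() is gone: each column is pulled in
full and data['rows'] is taken from the resulting frame's shape[0].
uncompress_pandas now scatters each compressed row with np.put instead of
looping over the index array element by element; since the frames come back via
.toPandas(), every cell holds its list in column 0, hence the
_index_array[i][0] / _data_array[i][0] indexing. A minimal sketch of the
scatter on toy data (uncompress_rows is an illustrative name, not part of the
script):

    import numpy as np

    def uncompress_rows(data_rows, index_rows, size, nrows):
        # Accumulate sparse (values, positions) rows into one dense array.
        result = np.zeros(size, dtype=np.int64)
        for i in range(nrows):
            dense = np.zeros(size, dtype=np.int64)
            # scatter the i-th row's values at the i-th row's indices
            np.put(dense, index_rows[i][0], data_rows[i][0])
            result += dense
        return result.tolist()

    # toy rows shaped like .toPandas().values: each cell is a list in column 0
    data = [[[5, 7]], [[1]]]
    index = [[[2, 4]], [[2]]]
    print(uncompress_rows(data, index, 8, 2))   # [0, 0, 6, 0, 7, 0, 0, 0]
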
 property-pull.py | 183 ++++++++++++++++++++++++++++++-----------------
 1 file changed, 119 insertions(+), 64 deletions(-)
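
Scalar fields (threshold_*, zsthr_*, zsthrave_*, lsbcut_*, offset_*,
baselinesub, nturns) are now read with .values.tolist() plus an emptiness check
instead of .values[0].tolist(), so a "limit 1" query that returns no rows no
longer raises IndexError. The same guard on plain pandas, written once as a
hypothetical first_row_or_default() helper (the patch repeats it inline per
field):

    import pandas as pd

    def first_row_or_default(frame, default=None):
        # .values.tolist() is [] when no rows came back, so only take
        # element [0] after checking the length (hypothetical helper; the
        # patch inlines this guard for each "select ... limit 1" result).
        rows = frame.head(1).values.tolist()
        return rows[0] if rows else default

    print(first_row_or_default(pd.DataFrame({"threshold_hg": [3]})))  # [3]
    print(first_row_or_default(pd.DataFrame({"threshold_hg": []})))   # None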

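The integral and distribution accumulation loops now add row[0] (the list
itself rather than the one-element row object) and skip any record whose array
does not have the expected length (3564 buckets for the integrals, 2048 bins
for the distributions). A toy sketch of that guard, with a small frame standing
in for array_dataset_hg after .toPandas():

    import numpy as np
    import pandas as pd

    # one truncated record in the middle is simply skipped
    frame = pd.DataFrame({"elements": [list(range(2048)), [1, 2, 3], list(range(2048))]})

    total = np.zeros(2048, dtype=np.int64)
    for row in frame.values:        # row is a 1-element object array; row[0] is the list
        if len(row[0]) == 2048:     # skip truncated/empty records
            total = np.add(total, row[0])
    print(total[:5].tolist())       # [0, 2, 4, 6, 8]
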
diff --git a/property-pull.py b/property-pull.py
index 4c90716..9c37c93 100644
--- a/property-pull.py
+++ b/property-pull.py
@@ -15,7 +15,7 @@ from pyspark.sql.types import IntegerType
 import numpy as np
 import pandas as pd
 
-_dest_dir = '.'
+_dest_dir = './pulledData'
 _files = False
 
 # mysqlDB variables
@@ -75,13 +75,10 @@ def uncompress(_data_array, _index_array, _size, _nrows):
 
 def uncompress_pandas(_data_array, _index_array, _size, _nrows):
     print('uncompress start')
-    result = np.zeros(_size, dtype=np.int)
+    result = np.zeros(_size, dtype=np.int64)
     for i in range(_nrows):
         tmpresult = np.zeros(_size, dtype=np.int)
-        ind = 0
-        for index in _index_array[i]:
-            tmpresult[index] = _data_array[i][ind]
-            ind += 1
+        np.put(tmpresult, _index_array[i][0], _data_array[i][0])
         result = np.add(result, tmpresult)
     print("uncompress finished")
     return result.tolist()
@@ -101,36 +98,44 @@ def pull_histogram_pandas(_start_time, _end_time, _device):
                    .keyValue("property", _property)
                    .buildDataset(), sqlContext)
 
-    data['rows'] = df.count()
+    # data['rows'] = df.count()
     df.registerTempTable("pulled_data")
 
     # Process hg array data
-    array_dataset_hg = spark.sql("select histogram_hg_com.elements from pulled_data limit " + str(data['rows'] - 1)) \
+    array_dataset_hg = spark.sql("select histogram_hg_com.elements from pulled_data") \
         .toPandas()
 
-    array_index_hg = spark.sql("select histogram_hg_ind.elements from pulled_data limit " + str(data['rows'] - 1)) \
+    array_index_hg = spark.sql("select histogram_hg_ind.elements from pulled_data") \
         .toPandas()
 
     array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
 
     # Process lg array data
-    array_dataset_lg = spark.sql("select histogram_lg_com.elements from pulled_data limit " + str(data['rows'] - 1)) \
+    array_dataset_lg = spark.sql("select histogram_lg_com.elements from pulled_data") \
         .toPandas()
 
-    array_index_lg = spark.sql("select histogram_lg_ind.elements from pulled_data limit " + str(data['rows'] - 1)) \
+    array_index_lg = spark.sql("select histogram_lg_ind.elements from pulled_data") \
         .toPandas()
 
     array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
 
     # Datas #
+    data['rows'] = array_dataset_hg.shape[0]
+    print(data['rows'])
+
     data['startTime'] = _start_time
     data['endTime'] = _end_time
 
-    data['threshold_hg'] = spark.sql("select threshold_hg from pulled_data limit 1").toPandas().values[0].tolist()
-    data['threshold_lg'] = spark.sql("select threshold_lg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['threshold_hg'] = spark.sql("select threshold_hg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['threshold_hg']) > 0:
+        data['threshold_hg'] = data['threshold_hg'][0]
+
+    data['threshold_lg'] = spark.sql("select threshold_lg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['threshold_lg']) > 0:
+        data['threshold_lg'] = data['threshold_lg'][0]
 
     # Timestamp #
-    time_stamp = spark.sql("select __record_timestamp__ from pulled_data limit " + str(data['rows'] - 1)) \
+    time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
         .toPandas()
     data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
 
@@ -168,51 +173,65 @@ def pull_integral_pandas(_start_time, _end_time, _device):
                    .keyValue("property", _property)
                    .buildDataset(), sqlContext)
 
-    data['rows'] = df.count()
     df.registerTempTable("pulled_data")
 
     # Process hg Data #
-    array_dataset_hg = spark.sql("select integral_hg.elements from pulled_data limit " + str(data['rows'] - 1)) \
+    array_dataset_hg = spark.sql("select integral_hg.elements from pulled_data") \
         .toPandas()
 
     array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
 
     # Process lg Data #
-    array_dataset_lg = spark.sql("select integral_lg.elements from pulled_data limit " + str(data['rows'] - 1)) \
+    array_dataset_lg = spark.sql("select integral_lg.elements from pulled_data") \
         .toPandas()
 
     array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
 
     # DATA #
+    data['rows'] = array_dataset_hg.shape[0]
     data['startTime'] = _start_time
     data['endTime'] = _end_time
 
-    data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values[0].tolist()
-    data['zsthrave_hg'] = spark.sql("select zsthrave_hg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['zsthr_hg']) > 0:
+        data['zsthr_hg'] = data['zsthr_hg'][0]
 
-    data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values[0].tolist()
-    data['zsthrave_lg'] = spark.sql("select zsthrave_lg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['zsthrave_hg'] = spark.sql("select zsthrave_hg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['zsthrave_hg']) > 0:
+        data['zsthrave_hg'] = data['zsthrave_hg'][0]
 
-    data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values[0].tolist()
+    data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['zsthr_lg']) > 0:
+        data['zsthr_lg'] = data['zsthr_lg'][0]
+
+    data['zsthrave_lg'] = spark.sql("select zsthrave_lg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['zsthrave_lg']) > 0:
+        data['zsthrave_lg'] = data['zsthrave_lg'][0]
+
+    data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['baselinesub']) > 0:
+        data['baselinesub'] = data['baselinesub'][0]
 
     # Intime Data #
     data['integral_hg_intime'] = array_dataset_hg['Total'].values.tolist()
     data['integral_lg_intime'] = array_dataset_lg['Total'].values.tolist()
 
     # Timestamp #
-    time_stamp = spark.sql("select __record_timestamp__ from pulled_data limit " + str(data['rows'] - 1)) \
+    time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
         .toPandas()
     data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
 
     # Integral Data
     integral_hg = np.zeros(3564, dtype=np.int)
     for T in array_dataset_hg.values:
-        integral_hg = np.add(integral_hg, T)
+        if len(T[0]) == 3564:
+            integral_hg = np.add(integral_hg, T[0])
     data['integral_hg'] = integral_hg.tolist()
 
     integral_lg = np.zeros(3564, dtype=np.int)
     for T in array_dataset_lg.values:
-        integral_lg = np.add(integral_lg, T)
+        if len(T[0]) == 3564:
+            integral_lg = np.add(integral_lg, T[0])
     data['integral_lg'] = integral_lg.tolist()
 
     # Writing to db
@@ -236,39 +255,53 @@ def pull_raw_dist_pandas(_start_time, _end_time, _device):
                    .keyValue("property", _property)
                    .buildDataset(), sqlContext)
 
-    data['rows'] = df.count()
+    # data['rows'] = df.count()
+    df.registerTempTable("pulled_data")
 
     # Process hg Data #
-    array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data limit " + str(data['rows'] - 1)) \
+    array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \
         .toPandas()
 
     # Process lg Data #
-    array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data limit " + str(data['rows'] - 1)) \
+    array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \
         .toPandas()
 
     # DATA #
+    data['rows'] = array_dataset_hg.shape[0]
     data['startTime'] = _start_time
     data['endTime'] = _end_time
 
-    data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values[0].tolist()
-    data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['lsbcut_hg']) > 0:
+        data['lsbcut_hg'] = data['lsbcut_hg'][0]
+
+    data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['offset_hg']) > 0:
+        data['offset_hg'] = data['offset_hg'][0]
 
-    data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values[0].tolist()
-    data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['lsbcut_lg']) > 0:
+        data['lsbcut_lg'] = data['lsbcut_lg'][0]
+
+    data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['offset_lg']) > 0:
+        data['offset_lg'] = data['offset_lg'][0]
 
     # Array Data #
     result_distrib_hg = np.zeros(2048, dtype=np.int)
     for R in array_dataset_hg.values:
-        result_distrib_hg = np.add(result_distrib_hg, R)
+        if len(R[0]) == 2048:
+            result_distrib_hg = np.add(result_distrib_hg, R[0])
     data['distribution_hg'] = result_distrib_hg.tolist()
 
     result_distrib_lg = np.zeros(2048, dtype=np.int)
     for R in array_dataset_lg.values:
-        result_distrib_lg = np.add(result_distrib_lg, R)
+        if len(R[0]) == 2048:
+            result_distrib_lg = np.add(result_distrib_lg, R[0])
     data['distribution_lg'] = result_distrib_lg.tolist()
 
     # Timestamp #
-    time_stamp = spark.sql("select __record_timestamp__ from pulled_data limit " + str(data['rows'] - 1)) \
+    time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
         .toPandas()
     data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
 
@@ -293,42 +326,57 @@ def pull_integral_dist_pandas(_start_time, _end_time, _device):
                    .keyValue("property", _property)
                    .buildDataset(), sqlContext)
 
-    data['rows'] = df.count()
+    # data['rows'] = df.count()
+    df.registerTempTable("pulled_data")
 
     # Process hg Data #
-    array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data limit " + str(data['rows'] - 1)) \
+    array_dataset_hg = spark.sql("select distribution_hg.elements from pulled_data") \
         .toPandas()
 
     # Process lg Data #
-    array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data limit " + str(data['rows'] - 1)) \
+    array_dataset_lg = spark.sql("select distribution_lg.elements from pulled_data") \
         .toPandas()
 
     # DATA #
+    data['rows'] = array_dataset_hg.shape[0]
     data['startTime'] = _start_time
     data['endTime'] = _end_time
 
-    data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values[0].tolist()
-    data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['lsbcut_hg'] = spark.sql("select lsbcut_hg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['lsbcut_hg']) > 0:
+        data['lsbcut_hg'] = data['lsbcut_hg'][0]
+
+    data['offset_hg'] = spark.sql("select offset_hg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['offset_hg']) > 0:
+        data['offset_hg'] = data['offset_hg'][0]
+
+    data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['lsbcut_lg']) > 0:
+        data['lsbcut_lg'] = data['lsbcut_lg'][0]
 
-    data['lsbcut_lg'] = spark.sql("select lsbcut_lg from pulled_data limit 1").toPandas().values[0].tolist()
-    data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['offset_lg'] = spark.sql("select offset_lg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['offset_lg']) > 0:
+        data['offset_lg'] = data['offset_lg'][0]
 
-    data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values[0].tolist()
+    data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['nturns']) > 0:
+        data['nturns'] = data['nturns'][0]
 
     # Array Data #
     result_distrib_hg = np.zeros(2048, dtype=np.int)
     for R in array_dataset_hg.values:
-        result_distrib_hg = np.add(result_distrib_hg, R)
+        if len(R[0]) == 2048:
+            result_distrib_hg = np.add(result_distrib_hg, R[0])
     data['distribution_hg'] = result_distrib_hg.tolist()
 
     result_distrib_lg = np.zeros(2048, dtype=np.int)
     for R in array_dataset_lg.values:
-        if len(R) == 2048:
-            result_distrib_lg = np.add(result_distrib_lg, R)
+        if len(R[0]) == 2048:
+            result_distrib_lg = np.add(result_distrib_lg, R[0])
     data['distribution_lg'] = result_distrib_lg.tolist()
 
     # Timestamp #
-    time_stamp = spark.sql("select __record_timestamp__ from pulled_data limit " + str(data['rows'] - 1)) \
+    time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
         .toPandas()
     data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
 
@@ -353,38 +401,50 @@ def pull_turnloss_pandas(_start_time, _end_time, _device):
                    .keyValue("property", _property)
                    .buildDataset(), sqlContext)
 
-    data['rows'] = df.count()
+    # data['rows'] = df.count()
+    df.registerTempTable("pulled_data")
 
     # Process hg Data #
-    array_dataset_hg = spark.sql("select turnloss_hg_com.elements from pulled_data limit " + str(data['rows'] - 1)) \
+    array_dataset_hg = spark.sql("select turnloss_hg_com.elements from pulled_data") \
         .toPandas()
 
-    array_index_hg = spark.sql("select turnloss_hg_ind.elements from pulled_data limit " + str(data['rows'] - 1)) \
+    array_index_hg = spark.sql("select turnloss_hg_ind.elements from pulled_data") \
         .toPandas()
 
     array_dataset_hg['Total'] = array_dataset_hg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
 
     # Process lg Data #
-    array_dataset_lg = spark.sql("select turnloss_lg_com.elements from pulled_data limit " + str(data['rows'] - 1)) \
+    array_dataset_lg = spark.sql("select turnloss_lg_com.elements from pulled_data") \
         .toPandas()
 
-    array_index_lg = spark.sql("select turnloss_lg_ind.elements from pulled_data limit " + str(data['rows'] - 1)) \
+    array_index_lg = spark.sql("select turnloss_lg_ind.elements from pulled_data") \
         .toPandas()
 
     array_dataset_lg['Total'] = array_dataset_lg['elements'].apply(lambda x: 0 if len(x) == 0 else __builtins__.sum(x))
 
     # DATA #
+    data['rows'] = array_dataset_hg.shape[0]
     data['startTime'] = _start_time
     data['endTime'] = _end_time
 
-    data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values[0].tolist()
-    data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values[0].tolist()
+    data['zsthr_hg'] = spark.sql("select zsthr_hg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['zsthr_hg']) > 0:
+        data['zsthr_hg'] = data['zsthr_hg'][0]
+
+    data['zsthr_lg'] = spark.sql("select zsthr_lg from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['zsthr_lg']) > 0:
+        data['zsthr_lg'] = data['zsthr_lg'][0]
+
+    data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['baselinesub']) > 0:
+        data['baselinesub'] = data['baselinesub'][0]
 
-    data['baselinesub'] = spark.sql("select baselinesub from pulled_data limit 1").toPandas().values[0].tolist()
-    data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values[0].tolist()
+    data['nturns'] = spark.sql("select nturns from pulled_data limit 1").toPandas().values.tolist()
+    if len(data['nturns']) > 0:
+        data['nturns'] = data['nturns'][0]
 
     # Timestamp #
-    time_stamp = spark.sql("select __record_timestamp__ from pulled_data limit " + str(data['rows'] - 1)) \
+    time_stamp = spark.sql("select __record_timestamp__ from pulled_data") \
         .toPandas()
     data['timestamps'] = time_stamp['__record_timestamp__'].values.tolist()
 
@@ -395,11 +455,11 @@ def pull_turnloss_pandas(_start_time, _end_time, _device):
     # Turnloss Data
     data['turnloss_hg'] = uncompress_pandas(array_dataset_hg.values,
                                             array_index_hg.values,
-                                            data['nturns'], data['rows'] - 1)
+                                            data['nturns'], data['rows'])
 
     data['turnloss_lg'] = uncompress_pandas(array_dataset_lg.values,
                                             array_index_lg.values,
-                                            data['nturns'], data['rows'] - 1)
+                                            data['nturns'], data['rows'])
 
     # Writing to db
     db_cursor = mydb.cursor()
@@ -490,9 +550,6 @@ def pull_histogram(_start_time, _end_time, _device):
     write_file(_device, _start_time, _end_time, _property, data)
 
 
-# Java Heap Space realy common on realFill > 10h
-
-
 def pull_integral(_start_time, _end_time, _device):
     _property = "LoggingIntegral"
 
@@ -587,8 +644,6 @@ def pull_integral(_start_time, _end_time, _device):
     write_file(_device, _start_time, _end_time, _property, data)
 
 
-# Not Working with too mny dat (dump memory)
-# MB split the search (end / start )
 def pull_raw_dist(_start_time, _end_time, _device):
     _property = "LoggingRawDist"
 
-- 
GitLab