From c2887568cf8e008bd17217ec9e88ff91c01887d5 Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 19 May 2020 14:11:28 +0200
Subject: [PATCH 01/25] first try for ipoc resampling (to be tested)

---
 src/spark/get_data.py | 46 ++++++++++++++++++++++++++++++++++---------
 1 file changed, 37 insertions(+), 9 deletions(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 75da720c..779ff369 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -13,7 +13,7 @@ from pyspark import SparkContext
 from pyspark.conf import SparkConf
 from pyspark.sql import Window, SparkSession
 from pyspark.sql.functions import udf, collect_list, first, last, asc, count, when, isnull
-from pyspark.sql.types import DateType
+from pyspark.sql.types import DateType, LongType, ArrayType
 from cern.nxcals.pyquery.builders import DevicePropertyQuery
 from scipy import signal
 
@@ -122,13 +122,37 @@ class DataProcessor(ABC):
         start_df = start_df.drop('t')
         return start_df
 
-    def resample(self, timestamps):
+    @staticmethod
+    def date_range(t1, t2, step=1):
+        """
+        Create range of timestamps with step_size step between t1 and t2
+        :param t1: (DateType) lower bound of time-interval
+        :param t2: (DateType) upper bound of time-interval
+        :param step: (int) step size in seconds
+        """
+        nr_of_ns_in_s = 1000000000
+        step = step * nr_of_ns_in_s
+        return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
+
+    def resample(self, timestamps, step=1):
         """
         Resample data to certain timestamps
         Args:
             timestamps: timestamps which will be kept in DataFrame
         """
-        self.data = self.data.where(f.col(_dfindex).isin(timestamps))
+        # udf to generate upsampled timestamps
+        date_range_udf = f.udf(lambda x, y: self.date_range(x, y, step), ArrayType(LongType()))
+        # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp)
+        max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0]
+        # get next timestamp using sliding window
+        self.data = self.data.withColumn('next_ts', f.lead(f.col('acqStamp'), default=max_time)
+                                         .over(Window.orderBy("acqStamp")))
+        # resample data to 'step'
+        self.data = self.data.withColumn("resampled_time", f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts'))))
+        self.data = self.data.drop('next_ts')
+        # join dataframes on timestamp index (rounded to nearest second)
+        div_udf = udf(lambda x: x // 1000000000)
+        self.data = self.data.where(div_udf(f.col(_dfindex)).isin(timestamps))
 
     @staticmethod
     def handle_array(df, col_name):
@@ -382,7 +406,9 @@ def write_to_hdfs(df, path, dformat='csv', repartition=True):
         repartition: repartition data to 1 large file or not
     """
     f_path = "/user/" + getpass.getuser() + "/ml-data/" + path
-    print("{} - Writing out a total of {} columns/features and {} entries to {}".format(datetime.now(), len(df.columns)-1, df.count(), f_path))
+    print("{} - Writing out a total of {} columns/features and {} entries to {}".format(datetime.now(),
+                                                                                        len(df.columns) - 1, df.count(),
+                                                                                        f_path))
     if repartition:
         df = df.repartition(1)
         print("{} - Repartition finished, starting write..".format(datetime.now()))
@@ -486,7 +512,8 @@ if __name__ == "__main__":
     spark = SparkSession(sc)
 
     parser = argparse.ArgumentParser(description="Pyspark script to fetch data from NXCALS; start with spark-submit")
-    parser.add_argument('-c', dest='configfn', default='config.yaml', help='config file for NXCALS queries and preprocessing')
+    parser.add_argument('-c', dest='configfn', default='config.yaml',
+                        help='config file for NXCALS queries and preprocessing')
     args = parser.parse_args()
 
     # absolute path to the config folder
@@ -513,7 +540,7 @@ if __name__ == "__main__":
     n_timestamps = len(time_intervals)
     for i in range(0, n_timestamps, 2):
         print("Getting data for time interval {}".format(i))
-        time_interval = time_intervals[i], time_intervals[i+1]
+        time_interval = time_intervals[i], time_intervals[i + 1]
 
         print("Getting unique timestamps from {} device".format(tsq[0]))
         timestamps = get_segment_timestamps(time_interval, tsq, spark)
@@ -532,13 +559,14 @@ if __name__ == "__main__":
             if not dfp:
                 print("Warning - no Preprocessor could be created for {}".format(subs))
             else:
-                print("{} - Updating {} column names and adding these to the common features dataset".format(datetime.now(), subs))
+                print("{} - Updating {} column names and adding these to the common features dataset".format(
+                    datetime.now(), subs))
                 # add prefix cause unique column names required by hdfs;
                 # backticks needed cause of spaces in column names, replacing these for the new alias
                 # _dfindex has to be kept cause it's used for joining
                 dfp = dfp.select([f.col("`" + c + "`").alias(subs + "_" + c.replace(" ", "")) for c in dfp.columns]) \
                     .withColumnRenamed(subs + "_" + _dfindex, _dfindex)
                 features = update_features(features, dfp)
-        suffix = '_part_' + str(i//2) if n_timestamps > 2 else ''
+        suffix = '_part_' + str(i // 2) if n_timestamps > 2 else ''
         print("Preprocessing done")
-        write_to_hdfs(features, 'features'+suffix, dformat)
+        write_to_hdfs(features, 'features' + suffix, dformat)
-- 
GitLab
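
The resample() added above follows a lead-window-plus-explode pattern: f.lead() over a time-ordered window attaches each row's successor timestamp, a UDF expands every (current, next) pair into the intermediate per-second timestamps, and f.explode() turns that array into rows before the final isin() filter. The following is a minimal standalone sketch of that pattern; the toy DataFrame and its values are illustrative, not data from this repository.

    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as f
    from pyspark.sql.types import ArrayType, LongType

    spark = SparkSession.builder.getOrCreate()
    NS_PER_S = 1_000_000_000

    # sparse acquisitions at 1 s, 4 s and 9 s, stored as nanosecond epoch values
    df = spark.createDataFrame([(1 * NS_PER_S,), (4 * NS_PER_S,), (9 * NS_PER_S,)],
                               ['acqStamp'])

    def date_range(t1, t2, step_ns=NS_PER_S):
        # inclusive range of timestamps from t1 to t2 with a fixed step
        return [int(t1) + step_ns * x
                for x in range(int((int(t2) - int(t1)) / step_ns) + 1)]

    date_range_udf = f.udf(date_range, ArrayType(LongType()))

    # the last row has no successor, so reuse the global maximum as the default
    max_time = df.agg({'acqStamp': 'max'}).collect()[0][0]
    w = Window.orderBy('acqStamp')  # one global ordering, no partitions
    df = df.withColumn('next_ts', f.lead('acqStamp', default=max_time).over(w))

    # explode each (current, next) interval into one row per second
    df = (df.withColumn('acqStamp', f.explode(date_range_udf('acqStamp', 'next_ts')))
            .drop('next_ts'))
    df.orderBy('acqStamp').show()

Note that boundary timestamps (4 s and 9 s here) appear twice in this sketch; that duplication is the problem a later patch in this series addresses with dropDuplicates.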


From 92ec3e56f3e2b3190404a39471c4c4d2e78c1dda Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 19 May 2020 14:48:57 +0200
Subject: [PATCH 02/25] fixed small column name bug and used _dfindex instead
 of acqStamp

---
 src/spark/get_data.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 779ff369..f8beb351 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -145,10 +145,10 @@ class DataProcessor(ABC):
         # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp)
         max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0]
         # get next timestamp using sliding window
-        self.data = self.data.withColumn('next_ts', f.lead(f.col('acqStamp'), default=max_time)
-                                         .over(Window.orderBy("acqStamp")))
+        self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time)
+                                         .over(Window.orderBy(_dfindex)))
         # resample data to 'step'
-        self.data = self.data.withColumn("resampled_time", f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts'))))
+        self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts'))))
         self.data = self.data.drop('next_ts')
         # join dataframes on timestamp index (rounded to nearest second)
         div_udf = udf(lambda x: x // 1000000000)
-- 
GitLab


From 09049fad1fde75e369dbe92292ec925abe290169 Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 19 May 2020 15:00:52 +0200
Subject: [PATCH 03/25] trying to fix bug caused by unserializable datatype

---
 src/spark/get_data.py | 26 +++++++++++++-------------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index f8beb351..b457095d 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -122,18 +122,6 @@ class DataProcessor(ABC):
         start_df = start_df.drop('t')
         return start_df
 
-    @staticmethod
-    def date_range(t1, t2, step=1):
-        """
-        Create range of timestamps with step_size step between t1 and t2
-        :param t1: (DateType) lower bound of time-interval
-        :param t2: (DateType) upper bound of time-interval
-        :param step: (int) step size in seconds
-        """
-        nr_of_ns_in_s = 1000000000
-        step = step * nr_of_ns_in_s
-        return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
-
     def resample(self, timestamps, step=1):
         """
         Resample data to certain timestamps
@@ -141,7 +129,7 @@ class DataProcessor(ABC):
             timestamps: timestamps which will be kept in DataFrame
         """
         # udf to generate upsampled timestamps
-        date_range_udf = f.udf(lambda x, y: self.date_range(x, y, step), ArrayType(LongType()))
+        date_range_udf = f.udf(lambda x, y: date_range(x, y, step), ArrayType(LongType()))
         # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp)
         max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0]
         # get next timestamp using sliding window
@@ -363,6 +351,18 @@ class BetsProcessor(DataProcessor):
         self.combine_sensor_data()
 
 
+def date_range(t1, t2, step=1):
+    """
+    Create range of timestamps with step_size step between t1 and t2
+    :param t1: (DateType) lower bound of time-interval
+    :param t2: (DateType) upper bound of time-interval
+    :param step: (int) step size in seconds
+    """
+    nr_of_ns_in_s = 1000000000
+    step = step * nr_of_ns_in_s
+    return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
+
+
 def cmw_query(spark, startt, endt, devicel, propertyl):
     df = DevicePropertyQuery \
         .builder(spark) \
-- 
GitLab
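
Moving date_range() to module level is the standard fix for this kind of Spark serialization error: a UDF whose lambda calls self.date_range drags the whole DataProcessor instance into the pickled closure, and the instance holds a DataFrame (and through it the SparkSession), which cannot be pickled. A small sketch of the distinction, with the class and column names chosen for illustration only:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, LongType

    def date_range(t1, t2, step=1):
        # module-level helper: only this function ends up in the UDF closure
        t1, t2 = int(t1), int(t2)
        return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]

    class Processor:
        def __init__(self, data):
            # the DataFrame (and through it the SparkSession) is not picklable
            self.data = data

        def resample(self, step=1):
            # the lambda closes over 'date_range' and 'step' only; writing
            # 'self.date_range(...)' here instead would pull 'self', and with it
            # self.data, into the closure that Spark tries to pickle
            rng = udf(lambda a, b: date_range(a, b, step), ArrayType(LongType()))
            return self.data.withColumn('ts', rng('acqStamp', 'next_ts'))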


From 3dd34a0225d73f5ee2663cee5160e01b12f0ec7d Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 19 May 2020 15:12:06 +0200
Subject: [PATCH 04/25] pickling error fixed, trying to fix error in count() in
 print statement after resample

---
 src/spark/get_data.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index b457095d..7a564ca4 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -133,8 +133,9 @@ class DataProcessor(ABC):
         # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp)
         max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0]
         # get next timestamp using sliding window
+        window = Window.partitionBy(_dfdev).orderBy(_dfindex)
         self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time)
-                                         .over(Window.orderBy(_dfindex)))
+                                         .over(window))
         # resample data to 'step'
         self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts'))))
         self.data = self.data.drop('next_ts')
@@ -463,6 +464,7 @@ def preprocess(df, subs, filter_list, time_interval, timestamps, store_data=Fals
     if timestamps:
         # not a real resample, more a filter, data has to be filled first (happens in preprocess above)
         proc.resample(timestamps)
+        print(proc.data.schema)
         print('{} - Resampled {}, {} entries remain'.format(datetime.now(), subs, proc.data.count()))
     if store_data:
         write_to_hdfs(proc.data, subs)
-- 
GitLab


From 692dc1c661cec4b185a72b11230d3c92f1f8744e Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 19 May 2020 15:16:41 +0200
Subject: [PATCH 05/25] used wrong index in window, fixed

---
 src/spark/get_data.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 7a564ca4..feb2084e 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -133,7 +133,7 @@ class DataProcessor(ABC):
         # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp)
         max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0]
         # get next timestamp using sliding window
-        window = Window.partitionBy(_dfdev).orderBy(_dfindex)
+        window = Window.partitionBy(_dfindex).orderBy(_dfindex)
         self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time)
                                          .over(window))
         # resample data to 'step'
-- 
GitLab


From 810c920fb59b11f70dd799f8cffc9632e6c370e4 Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 19 May 2020 15:32:52 +0200
Subject: [PATCH 06/25] write dataframe to hdfs for debugging

---
 src/spark/get_data.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index feb2084e..42b5ceec 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -463,8 +463,7 @@ def preprocess(df, subs, filter_list, time_interval, timestamps, store_data=Fals
     print('Preprocessing {} finished, start resampling..'.format(subs))
     if timestamps:
         # not a real resample, more a filter, data has to be filled first (happens in preprocess above)
-        proc.resample(timestamps)
-        print(proc.data.schema)
+        write_to_hdfs(proc.data,'debug','json')
         print('{} - Resampled {}, {} entries remain'.format(datetime.now(), subs, proc.data.count()))
     if store_data:
         write_to_hdfs(proc.data, subs)
-- 
GitLab


From d0c8c965a0a24d4958dac9274fc01b38d622a4dc Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 19 May 2020 15:44:07 +0200
Subject: [PATCH 07/25] added debug prints

---
 src/spark/get_data.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 42b5ceec..209a7cdd 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -142,6 +142,8 @@ class DataProcessor(ABC):
         # join dataframes on timestamp index (rounded to nearest second)
         div_udf = udf(lambda x: x // 1000000000)
         self.data = self.data.where(div_udf(f.col(_dfindex)).isin(timestamps))
+        print('nr of rows', self.data.count())
+        print('timestamps size', len(timestamps))
 
     @staticmethod
     def handle_array(df, col_name):
-- 
GitLab


From 1f7b3523558f555f78a6aa66732b7f2c8549f88f Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 19 May 2020 16:33:37 +0200
Subject: [PATCH 08/25] trying to cast possible strings back to integers

---
 src/spark/get_data.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 209a7cdd..6b8d7a40 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -142,8 +142,6 @@ class DataProcessor(ABC):
         # join dataframes on timestamp index (rounded to nearest second)
         div_udf = udf(lambda x: x // 1000000000)
         self.data = self.data.where(div_udf(f.col(_dfindex)).isin(timestamps))
-        print('nr of rows', self.data.count())
-        print('timestamps size', len(timestamps))
 
     @staticmethod
     def handle_array(df, col_name):
@@ -361,6 +359,8 @@ def date_range(t1, t2, step=1):
     :param t2: (DateType) upper bound of time-interval
     :param step: (int) step size in seconds
     """
+    t1 = int(t1)
+    t2 = int(t2)
     nr_of_ns_in_s = 1000000000
     step = step * nr_of_ns_in_s
     return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
@@ -465,7 +465,7 @@ def preprocess(df, subs, filter_list, time_interval, timestamps, store_data=Fals
     print('Preprocessing {} finished, start resampling..'.format(subs))
     if timestamps:
         # not a real resample, more a filter, data has to be filled first (happens in preprocess above)
-        write_to_hdfs(proc.data,'debug','json')
+        proc.resample(timestamps)
         print('{} - Resampled {}, {} entries remain'.format(datetime.now(), subs, proc.data.count()))
     if store_data:
         write_to_hdfs(proc.data, subs)
-- 
GitLab


From 054bab170a6058fb874b28da7e51efd03f75541f Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 19 May 2020 16:53:43 +0200
Subject: [PATCH 09/25] string error is fixed, now checking why all columns are
 filtered when resampling after upsample

---
 src/spark/get_data.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 6b8d7a40..8ce87b30 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -141,6 +141,7 @@ class DataProcessor(ABC):
         self.data = self.data.drop('next_ts')
         # join dataframes on timestamp index (rounded to nearest second)
         div_udf = udf(lambda x: x // 1000000000)
+        print(timestamps[0:10])
         self.data = self.data.where(div_udf(f.col(_dfindex)).isin(timestamps))
 
     @staticmethod
-- 
GitLab


From a9c151599dfa9028b0fcce28c0e885874792e192 Mon Sep 17 00:00:00 2001
From: Kyle <kyle.bringmans@cern.ch>
Date: Tue, 19 May 2020 19:25:19 +0200
Subject: [PATCH 10/25] resampling seems to work

---
 src/config_files/config_mki.yaml | 4 ++--
 src/spark/get_data.py            | 8 ++++----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/config_files/config_mki.yaml b/src/config_files/config_mki.yaml
index b7b32cfc..7572176f 100644
--- a/src/config_files/config_mki.yaml
+++ b/src/config_files/config_mki.yaml
@@ -3,8 +3,8 @@ load_data:
 preprocess:
   task: MKI
 time_intervals:
-  - '2017-01-01 00:00:00.000'
-  - '2017-12-31 00:00:00.000'
+  - '2017-05-01 00:00:00.000'
+  - '2017-05-02 00:00:00.000'
 features:
   # json here cause of several arrays, csv takes less space but doesn't allow these
   dataformat: json
diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 8ce87b30..e76b95d4 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -133,16 +133,16 @@ class DataProcessor(ABC):
         # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp)
         max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0]
         # get next timestamp using sliding window
-        window = Window.partitionBy(_dfindex).orderBy(_dfindex)
+        window = Window.orderBy(_dfindex)
         self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time)
                                          .over(window))
         # resample data to 'step'
+        print(self.data.take(2))
         self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts'))))
         self.data = self.data.drop('next_ts')
         # join dataframes on timestamp index (rounded to nearest second)
         div_udf = udf(lambda x: x // 1000000000)
-        print(timestamps[0:10])
-        self.data = self.data.where(div_udf(f.col(_dfindex)).isin(timestamps))
+        self.data = self.data.where(f.col(_dfindex).isin(timestamps))
 
     @staticmethod
     def handle_array(df, col_name):
@@ -362,7 +362,7 @@ def date_range(t1, t2, step=1):
     """
     t1 = int(t1)
     t2 = int(t2)
-    nr_of_ns_in_s = 1000000000
+    nr_of_ns_in_s = 1
     step = step * nr_of_ns_in_s
     return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
 
-- 
GitLab


From d33d2ad26d7c4d477f355490612c1097ac394dc0 Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 19 May 2020 19:34:36 +0200
Subject: [PATCH 11/25] removed unnecessary prints and documented design
 choices in the code when necessary

---
 src/spark/get_data.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 8ce87b30..67806f3b 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -133,16 +133,17 @@ class DataProcessor(ABC):
         # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp)
         max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0]
         # get next timestamp using sliding window
-        window = Window.partitionBy(_dfindex).orderBy(_dfindex)
+        # cannot speedup by partitioning because unique timestamps in each subframe will get the default value of
+        # max_time for column next_ts which is incorrect
+        window = Window.orderBy(_dfindex)
         self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time)
                                          .over(window))
         # resample data to 'step'
         self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts'))))
         self.data = self.data.drop('next_ts')
+
         # join dataframes on timestamp index (rounded to nearest second)
-        div_udf = udf(lambda x: x // 1000000000)
-        print(timestamps[0:10])
-        self.data = self.data.where(div_udf(f.col(_dfindex)).isin(timestamps))
+        self.data = self.data.where(f.col(_dfindex).isin(timestamps))
 
     @staticmethod
     def handle_array(df, col_name):
@@ -353,6 +354,7 @@ class BetsProcessor(DataProcessor):
         self.combine_sensor_data()
 
 
+# Not made static in DataProcessor because it breaks pickling in Spark
 def date_range(t1, t2, step=1):
     """
     Create range of timestamps with step_size step between t1 and t2
@@ -360,10 +362,9 @@ def date_range(t1, t2, step=1):
     :param t2: (DateType) upper bound of time-interval
     :param step: (int) step size in seconds
     """
+    # Ensure timestamps are integers
     t1 = int(t1)
     t2 = int(t2)
-    nr_of_ns_in_s = 1000000000
-    step = step * nr_of_ns_in_s
     return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
 
 
-- 
GitLab


From 02becf83fd3900c89994e80bf9c7cc0974b6c08d Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <kyle.bringmans@cern.ch>
Date: Tue, 19 May 2020 20:44:16 +0000
Subject: [PATCH 12/25] fixed problem with merge

---
 src/spark/get_data.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 42f1af9b..bf4e546a 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -362,8 +362,6 @@ def date_range(t1, t2, step=1):
     """
     t1 = int(t1)
     t2 = int(t2)
-    nr_of_ns_in_s = 1000000000
-    step = step * nr_of_ns_in_s
     return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
 
 
-- 
GitLab


From 3bca476f2b98f4b5f478a50705c67f3d9acce2f0 Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Wed, 20 May 2020 10:05:55 +0200
Subject: [PATCH 13/25] trying to fix increase in timestamps after resample

---
 src/spark/get_data.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index bf4e546a..cb2e4785 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -142,6 +142,7 @@ class DataProcessor(ABC):
         self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts'))))
         self.data = self.data.drop('next_ts')
         # join dataframes on timestamp index (rounded to nearest second)
+        self.data = self.data.dropDuplicates([_dfindex])
         self.data = self.data.where(f.col(_dfindex).isin(timestamps))
 
     @staticmethod
-- 
GitLab
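
One source of the row increase that this patch addresses is that the generated ranges are inclusive at both ends: a row's last generated timestamp coincides with the next row's first, so after f.explode() every interior acquisition time shows up twice, and dropDuplicates on the index removes the copies. A plain-Python illustration with made-up values:

    def date_range(t1, t2, step=1):
        return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]

    print(date_range(100, 103))  # [100, 101, 102, 103]
    print(date_range(103, 105))  # [103, 104, 105]  (103 is produced by both rows)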


From 83b15046765c768e6e5d6b37f199f4f8691bd01e Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Wed, 20 May 2020 13:49:22 +0200
Subject: [PATCH 14/25] added week partitioning

---
 src/config_files/config_lbds.yaml | 8 ++++----
 src/spark/get_data.py             | 7 ++++---
 src/util.py                       | 1 +
 3 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/src/config_files/config_lbds.yaml b/src/config_files/config_lbds.yaml
index 83f054d7..1bfcb53d 100644
--- a/src/config_files/config_lbds.yaml
+++ b/src/config_files/config_lbds.yaml
@@ -1,7 +1,7 @@
 load_data:
   type: JSON
 train_data_file_name: 'features_lbds_2017_b1-2.csv'
-beam: 1
+beam: 2
 preprocess:
   task: LBDS
   use_filters: True
@@ -205,10 +205,10 @@ grid_search:
       feature_selection: 'all'
       scoring: 'auc'
       params: {
-        n_estimators: [50, 100, 200, 300, 400],
-        max_samples: [64, 128, 256, 512],
+        n_estimators: [50, 100, 200, 300,],
+        max_samples: [32, 64, 128, 256],
         contamination: [0.005, 0.01, 0.5],
-        max_features: [.6, .8, 1.]
+        max_features: [0.4, .6, .8, 1.]
       }
 
     lof:
diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index cb2e4785..934cb294 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -132,14 +132,15 @@ class DataProcessor(ABC):
         date_range_udf = f.udf(lambda x, y: date_range(x, y, step), ArrayType(LongType()))
         # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp)
         max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0]
+        # Get week of year to enable sliding window partitions
+        self.data = self.data.withColumn('week_of_year', f.weekofyear(f.from_unixtime(f.col(_dfindex))))
         # get next timestamp using sliding window
-        # cannot speedup by partitioning because unique timestamps in each subframe will get the default value of
-        # max_time for column next_ts which is incorrect
-        window = Window.orderBy(_dfindex)
+        window = Window.partitionBy('week_of_year').orderBy(_dfindex)
         self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time)
                                          .over(window))
         # resample data to 'step'
         self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts'))))
+        self.data = self.data.drop('week_of_year')
         self.data = self.data.drop('next_ts')
         # join dataframes on timestamp index (rounded to nearest second)
         self.data = self.data.dropDuplicates([_dfindex])
diff --git a/src/util.py b/src/util.py
index 97e672ac..cda8f2cf 100644
--- a/src/util.py
+++ b/src/util.py
@@ -242,6 +242,7 @@ def get_lbds_data(filter_cols, start_time, end_time, cfg, use_case):
         try:
             fn += cfg['train_data_file_name']
             beam = cfg['beam']
+            print("Loading feature vectors for beam {}".format(beam))
         except KeyError as e:
             print("Config file is missing configuration variable {}".format(e))
             exit()
-- 
GitLab


From a58374e949c1233ff5e4b2664bc8f5c96dccd2ff Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Wed, 20 May 2020 13:52:23 +0200
Subject: [PATCH 15/25] Revert "added week partitioning"

This reverts commit 83b15046765c768e6e5d6b37f199f4f8691bd01e.
---
 src/config_files/config_lbds.yaml | 8 ++++----
 src/spark/get_data.py             | 7 +++----
 src/util.py                       | 1 -
 3 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/src/config_files/config_lbds.yaml b/src/config_files/config_lbds.yaml
index 1bfcb53d..83f054d7 100644
--- a/src/config_files/config_lbds.yaml
+++ b/src/config_files/config_lbds.yaml
@@ -1,7 +1,7 @@
 load_data:
   type: JSON
 train_data_file_name: 'features_lbds_2017_b1-2.csv'
-beam: 2
+beam: 1
 preprocess:
   task: LBDS
   use_filters: True
@@ -205,10 +205,10 @@ grid_search:
       feature_selection: 'all'
       scoring: 'auc'
       params: {
-        n_estimators: [50, 100, 200, 300,],
-        max_samples: [32, 64, 128, 256],
+        n_estimators: [50, 100, 200, 300, 400],
+        max_samples: [64, 128, 256, 512],
         contamination: [0.005, 0.01, 0.5],
-        max_features: [0.4, .6, .8, 1.]
+        max_features: [.6, .8, 1.]
       }
 
     lof:
diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 934cb294..cb2e4785 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -132,15 +132,14 @@ class DataProcessor(ABC):
         date_range_udf = f.udf(lambda x, y: date_range(x, y, step), ArrayType(LongType()))
         # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp)
         max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0]
-        # Get week of year to enable sliding window partitions
-        self.data = self.data.withColumn('week_of_year', f.weekofyear(f.from_unixtime(f.col(_dfindex))))
         # get next timestamp using sliding window
-        window = Window.partitionBy('week_of_year').orderBy(_dfindex)
+        # cannot speedup by partitioning because unique timestamps in each subframe will get the default value of
+        # max_time for column next_ts which is incorrect
+        window = Window.orderBy(_dfindex)
         self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time)
                                          .over(window))
         # resample data to 'step'
         self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts'))))
-        self.data = self.data.drop('week_of_year')
         self.data = self.data.drop('next_ts')
         # join dataframes on timestamp index (rounded to nearest second)
         self.data = self.data.dropDuplicates([_dfindex])
diff --git a/src/util.py b/src/util.py
index cda8f2cf..97e672ac 100644
--- a/src/util.py
+++ b/src/util.py
@@ -242,7 +242,6 @@ def get_lbds_data(filter_cols, start_time, end_time, cfg, use_case):
         try:
             fn += cfg['train_data_file_name']
             beam = cfg['beam']
-            print("Loading feature vectors for beam {}".format(beam))
         except KeyError as e:
             print("Config file is missing configuration variable {}".format(e))
             exit()
-- 
GitLab


From b6416f9ab858278f834bcd559ba68d48edb9302f Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Wed, 20 May 2020 13:53:09 +0200
Subject: [PATCH 16/25] added partitioning by week

---
 src/spark/get_data.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index cb2e4785..934cb294 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -132,14 +132,15 @@ class DataProcessor(ABC):
         date_range_udf = f.udf(lambda x, y: date_range(x, y, step), ArrayType(LongType()))
         # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp)
         max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0]
+        # Get week of year to enable sliding window partitions
+        self.data = self.data.withColumn('week_of_year', f.weekofyear(f.from_unixtime(f.col(_dfindex))))
         # get next timestamp using sliding window
-        # cannot speedup by partitioning because unique timestamps in each subframe will get the default value of
-        # max_time for column next_ts which is incorrect
-        window = Window.orderBy(_dfindex)
+        window = Window.partitionBy('week_of_year').orderBy(_dfindex)
         self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time)
                                          .over(window))
         # resample data to 'step'
         self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts'))))
+        self.data = self.data.drop('week_of_year')
         self.data = self.data.drop('next_ts')
         # join dataframes on timestamp index (rounded to nearest second)
         self.data = self.data.dropDuplicates([_dfindex])
-- 
GitLab


From 39b2f9fad8543c1ce18577db428ab079d08bc2bc Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Wed, 20 May 2020 18:44:46 +0200
Subject: [PATCH 17/25] decreasing window size

---
 src/spark/get_data.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 934cb294..02e46e1a 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -133,9 +133,9 @@ class DataProcessor(ABC):
         # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp)
         max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0]
         # Get week of year to enable sliding window partitions
-        self.data = self.data.withColumn('week_of_year', f.weekofyear(f.from_unixtime(f.col(_dfindex))))
+        self.data = self.data.withColumn('day_of_year', f.dayofyear(f.from_unixtime(f.col(_dfindex))))
         # get next timestamp using sliding window
-        window = Window.partitionBy('week_of_year').orderBy(_dfindex)
+        window = Window.partitionBy('day_of_year').orderBy(_dfindex)
         self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time)
                                          .over(window))
         # resample data to 'step'
-- 
GitLab
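
Partitioning the window, as in the two patches above, lets Spark sort each calendar bucket independently instead of pulling the whole interval through a single ordered partition; the trade-off flagged in the earlier revert is that the final row of every bucket falls back to the max_time default for next_ts. A condensed sketch of the day-partitioned variant, assuming for simplicity a seconds-resolution acqStamp index (the real script's unit handling differs):

    from pyspark.sql import Window
    from pyspark.sql import functions as f

    def with_next_ts(df, max_time):
        # bucket rows by calendar day so each window only sorts one day of data
        df = df.withColumn('day_of_year',
                           f.dayofyear(f.from_unixtime(f.col('acqStamp'))))
        w = Window.partitionBy('day_of_year').orderBy('acqStamp')
        # the last row of every day gets max_time as its successor timestamp
        df = df.withColumn('next_ts',
                           f.lead(f.col('acqStamp'), default=max_time).over(w))
        return df.drop('day_of_year')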


From 4015568d65f5db0844cc98f95e48f1b625008866 Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Mon, 25 May 2020 14:42:56 +0200
Subject: [PATCH 18/25] added resampling directly to ipoc instead of seconds

---
 src/spark/get_data.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 02e46e1a..41346621 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -129,13 +129,11 @@ class DataProcessor(ABC):
             timestamps: timestamps which will be kept in DataFrame
         """
         # udf to generate upsampled timestamps
-        date_range_udf = f.udf(lambda x, y: date_range(x, y, step), ArrayType(LongType()))
+        date_range_udf = udf(lambda x, y: date_range(x, y, timestamps, step), ArrayType(LongType()))
         # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp)
         max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0]
-        # Get week to of year to enable sliding window partitions
-        self.data = self.data.withColumn('day_of_year', f.dayofyear(f.from_unixtime(f.col(_dfindex))))
         # get next timestamp using sliding window
-        window = Window.partitionBy('day_of_year').orderBy(_dfindex)
+        window = Window.orderBy(_dfindex)
         self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time)
                                          .over(window))
         # resample data to 'step'
@@ -144,6 +142,7 @@ class DataProcessor(ABC):
         self.data = self.data.drop('next_ts')
         # join dataframes on timestamp index (rounded to nearest second)
         self.data = self.data.dropDuplicates([_dfindex])
+        # Remove old timestamps if they did not align with ipoc timestamps
         self.data = self.data.where(f.col(_dfindex).isin(timestamps))
 
     @staticmethod
@@ -355,7 +354,7 @@ class BetsProcessor(DataProcessor):
         self.combine_sensor_data()
 
 
-def date_range(t1, t2, step=1):
+def date_range(t1, t2, timestamps, step=1,):
     """
     Create range of timestamps with step_size step between t1 and t2
     :param t1: (DateType) lower bound of time-interval
@@ -364,7 +363,10 @@ def date_range(t1, t2, step=1):
     """
     t1 = int(t1)
     t2 = int(t2)
-    return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
+    # resample to seconds
+    seconds =  [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
+    # only keep ipoc timestamps
+    return [t for t in seconds if t in timestamps]
 
 
 def cmw_query(spark, startt, endt, devicel, propertyl):
-- 
GitLab


From d151009486f9f8373a0dd9b12ca2d573afabc82e Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Mon, 25 May 2020 17:25:36 +0200
Subject: [PATCH 19/25] changed helper function to inner function for easier
 overview

---
 src/spark/get_data.py | 23 ++++++++---------------
 1 file changed, 8 insertions(+), 15 deletions(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 41346621..c6b74e4e 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -128,6 +128,14 @@ class DataProcessor(ABC):
         Args:
             timestamps: timestamps which will be kept in DataFrame
         """
+        def date_range(t1, t2, ts, step=1, ):
+            t1 = int(t1)
+            t2 = int(t2)
+            # resample to seconds
+            seconds = [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
+            # only keep ipoc timestamps
+            return [t for t in seconds if t in ts]
+
         # udf to generate upsampled timestamps
         date_range_udf = udf(lambda x, y: date_range(x, y, timestamps, step), ArrayType(LongType()))
         # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp)
@@ -354,21 +362,6 @@ class BetsProcessor(DataProcessor):
         self.combine_sensor_data()
 
 
-def date_range(t1, t2, timestamps, step=1,):
-    """
-    Create range of timestamps with step_size step between t1 and t2
-    :param t1: (DateType) lower bound of time-interval
-    :param t2: (DateType) upper bound of time-interval
-    :param step: (int) step size in seconds
-    """
-    t1 = int(t1)
-    t2 = int(t2)
-    # resample to seconds
-    seconds =  [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
-    # only keep ipoc timestamps
-    return [t for t in seconds if t in timestamps]
-
-
 def cmw_query(spark, startt, endt, devicel, propertyl):
     df = DevicePropertyQuery \
         .builder(spark) \
-- 
GitLab


From b518102b1cff2a614dbd1c3041f436ad940d1e15 Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 26 May 2020 13:30:09 +0200
Subject: [PATCH 20/25] skip all other preprocessing if resample timestamps are
 empty

---
 src/spark/get_data.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index c6b74e4e..c5f0e312 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -151,6 +151,7 @@ class DataProcessor(ABC):
         # join dataframes on timestamp index (rounded to nearest second)
         self.data = self.data.dropDuplicates([_dfindex])
         # Remove old timestamps if they did not align with ipoc timestamps
+
         self.data = self.data.where(f.col(_dfindex).isin(timestamps))
 
     @staticmethod
@@ -543,7 +544,8 @@ if __name__ == "__main__":
 
         print("Getting unique timestamps from {} device".format(tsq[0]))
         timestamps = get_segment_timestamps(time_interval, tsq, spark)
-
+        if not len(timestamps) :
+            continue
         start_time, end_time = str(time_interval[0]), str(time_interval[1])
         print("Reading data from {} to {}".format(start_time, end_time))
 
-- 
GitLab


From 41d6c4c064cdc60b7c6a9219dde97a8404ebf5ca Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 26 May 2020 13:32:31 +0200
Subject: [PATCH 21/25] added print to warn user of choice

---
 src/spark/get_data.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index c5f0e312..5611b4e2 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -544,9 +544,14 @@ if __name__ == "__main__":
 
         print("Getting unique timestamps from {} device".format(tsq[0]))
         timestamps = get_segment_timestamps(time_interval, tsq, spark)
-        if not len(timestamps) :
-            continue
+
         start_time, end_time = str(time_interval[0]), str(time_interval[1])
+
+        # If no timestamps to resample to, then no preprocessing can be done
+        if not len(timestamps):
+            print("No timestamps to resample to, skipping data retrieval for interval {} - {}".format(start_time, end_time))
+            continue
+
         print("Reading data from {} to {}".format(start_time, end_time))
 
         features = None
-- 
GitLab


From 21d676430a1122300e0379239734896c23fe102c Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 26 May 2020 15:07:15 +0200
Subject: [PATCH 22/25] removed typo

---
 src/spark/get_data.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 5611b4e2..835e4f09 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -128,7 +128,7 @@ class DataProcessor(ABC):
         Args:
             timestamps: timestamps which will be kept in DataFrame
         """
-        def date_range(t1, t2, ts, step=1, ):
+        def date_range(t1, t2, ts, step=1):
             t1 = int(t1)
             t2 = int(t2)
             # resample to seconds
-- 
GitLab


From 86467e1b2a0aaff2239c52bd8d20cedf9e523410 Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 26 May 2020 15:10:07 +0200
Subject: [PATCH 23/25] added debug line for udf

---
 src/spark/get_data.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 835e4f09..7e5b5a53 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -3,6 +3,7 @@ import argparse
 from abc import ABC
 import getpass
 from datetime import datetime
+import sys
 
 import yaml
 import numpy as np
@@ -128,13 +129,16 @@ class DataProcessor(ABC):
         Args:
             timestamps: timestamps which will be kept in DataFrame
         """
+
         def date_range(t1, t2, ts, step=1):
             t1 = int(t1)
             t2 = int(t2)
             # resample to seconds
             seconds = [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
+            time = [t for t in seconds if t in ts]
+            print(seconds, file=sys.stderr)
             # only keep ipoc timestamps
-            return [t for t in seconds if t in ts]
+            return time
 
         # udf to generate upsampled timestamps
         date_range_udf = udf(lambda x, y: date_range(x, y, timestamps, step), ArrayType(LongType()))
@@ -549,7 +553,8 @@ if __name__ == "__main__":
 
         # If no timestamps to resample to, then no preprocessing can be done
         if not len(timestamps):
-            print("No timestamps to resample to, skipping data retrieval for interval {} - {}".format(start_time, end_time))
+            print("No timestamps to resample to, skipping data retrieval for interval {} - {}".format(start_time,
+                                                                                                      end_time))
             continue
 
         print("Reading data from {} to {}".format(start_time, end_time))
-- 
GitLab


From 0112d2ab69bf56836644f821fa4e80a94204c1e7 Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 26 May 2020 15:55:23 +0200
Subject: [PATCH 24/25] fixed bug with mismatched datatypes

---
 src/spark/get_data.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 7e5b5a53..5798eb64 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -136,10 +136,11 @@ class DataProcessor(ABC):
             # resample to seconds
             seconds = [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
             time = [t for t in seconds if t in ts]
-            print(seconds, file=sys.stderr)
             # only keep ipoc timestamps
             return time
-
+        
+        # map timestamps to int because otherwise data_range filters all entries because int is not in list of strings
+        timestamps = [int(t) for t in timestamps]
         # udf to generate upsampled timestamps
         date_range_udf = udf(lambda x, y: date_range(x, y, timestamps, step), ArrayType(LongType()))
         # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp)
@@ -155,7 +156,6 @@ class DataProcessor(ABC):
         # join dataframes on timestamp index (rounded to nearest second)
         self.data = self.data.dropDuplicates([_dfindex])
         # Remove old timestamps if they did not align with ipoc timestamps
-
         self.data = self.data.where(f.col(_dfindex).isin(timestamps))
 
     @staticmethod
-- 
GitLab
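
The int() cast added above matters because, as the comment notes, the timestamps handed to resample() are strings while the values generated inside date_range() are Python ints; membership tests between the two are always False, so the inner filter silently returned an empty list for every row. A minimal reproduction with made-up values (casting once up front fixes it, and holding the cast timestamps in a set would also make each 't in ts' lookup O(1), although the patch keeps a list):

    def date_range(t1, t2, ts, step=1):
        t1, t2 = int(t1), int(t2)
        seconds = [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
        return [t for t in seconds if t in ts]

    raw_ts = ['100', '101', '105']        # string timestamps, as returned upstream
    print(date_range(100, 103, raw_ts))   # [] since the int 100 never equals '100'

    ipoc_ts = {int(t) for t in raw_ts}    # cast once; a set also speeds up lookups
    print(date_range(100, 103, ipoc_ts))  # [100, 101]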


From 8dba5b5998185cde7650546bedaf056e31f3169c Mon Sep 17 00:00:00 2001
From: Kyle Bringmans <bringmans.kyle@gmail.com>
Date: Tue, 26 May 2020 15:58:56 +0200
Subject: [PATCH 25/25] added clearer reason for line of code

---
 src/spark/get_data.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/spark/get_data.py b/src/spark/get_data.py
index 5798eb64..a7473486 100644
--- a/src/spark/get_data.py
+++ b/src/spark/get_data.py
@@ -138,8 +138,9 @@ class DataProcessor(ABC):
             time = [t for t in seconds if t in ts]
             # only keep ipoc timestamps
             return time
-        
-        # map timestamps to int because otherwise data_range filters all entries because int is not in list of strings
+
+        # map timestamps to int because otherwise data_range() resamples to []
+        # because no timestamp from 'seconds' of type int is in a list of strings
         timestamps = [int(t) for t in timestamps]
         # udf to generate upsampled timestamps
         date_range_udf = udf(lambda x, y: date_range(x, y, timestamps, step), ArrayType(LongType()))
-- 
GitLab