From c2887568cf8e008bd17217ec9e88ff91c01887d5 Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 19 May 2020 14:11:28 +0200 Subject: [PATCH 01/25] first try for ipoc resampling (to be tested) --- src/spark/get_data.py | 46 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 75da720c..779ff369 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -13,7 +13,7 @@ from pyspark import SparkContext from pyspark.conf import SparkConf from pyspark.sql import Window, SparkSession from pyspark.sql.functions import udf, collect_list, first, last, asc, count, when, isnull -from pyspark.sql.types import DateType +from pyspark.sql.types import DateType, LongType, ArrayType from cern.nxcals.pyquery.builders import DevicePropertyQuery from scipy import signal @@ -122,13 +122,37 @@ class DataProcessor(ABC): start_df = start_df.drop('t') return start_df - def resample(self, timestamps): + @staticmethod + def date_range(t1, t2, step=1): + """ + Create range of timestamps with step_size step between t1 and t2 + :param t1: (DateType) lower bound of time-interval + :param t2: (DateType) upper bound of time-interval + :param step: (int) step size in seconds + """ + nr_of_ns_in_s = 1000000000 + step = step * nr_of_ns_in_s + return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)] + + def resample(self, timestamps, step=1): """ Resample data to certain timestamps Args: timestamps: timestamps which will be kept in DataFrame """ - self.data = self.data.where(f.col(_dfindex).isin(timestamps)) + # udf to generate upsampled timestamps + date_range_udf = f.udf(lambda x, y: self.date_range(x, y, step), ArrayType(LongType())) + # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp) + max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0] + # get next timestamp using sliding window + self.data = self.data.withColumn('next_ts', f.lead(f.col('acqStamp'), default=max_time) + .over(Window.orderBy("acqStamp"))) + # resample data to 'step' + self.data = self.data.withColumn("resampled_time", f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts')))) + self.data = self.data.drop('next_ts') + # join dataframes on timestamp index (rounded to nearest second) + div_udf = udf(lambda x: x // 1000000000) + self.data = self.data.where(div_udf(f.col(_dfindex)).isin(timestamps)) @staticmethod def handle_array(df, col_name): @@ -382,7 +406,9 @@ def write_to_hdfs(df, path, dformat='csv', repartition=True): repartition: repartition data to 1 large file or not """ f_path = "/user/" + getpass.getuser() + "/ml-data/" + path - print("{} - Writing out a total of {} columns/features and {} entries to {}".format(datetime.now(), len(df.columns)-1, df.count(), f_path)) + print("{} - Writing out a total of {} columns/features and {} entries to {}".format(datetime.now(), + len(df.columns) - 1, df.count(), + f_path)) if repartition: df = df.repartition(1) print("{} - Repartition finished, starting write..".format(datetime.now())) @@ -486,7 +512,8 @@ if __name__ == "__main__": spark = SparkSession(sc) parser = argparse.ArgumentParser(description="Pyspark script to fetch data from NXCALS; start with spark-submit") - parser.add_argument('-c', dest='configfn', default='config.yaml', help='config file for NXCALS queries and preprocessing') + parser.add_argument('-c', dest='configfn', default='config.yaml', + help='config file for NXCALS queries 
and preprocessing') args = parser.parse_args() # absolute path to the config folder @@ -513,7 +540,7 @@ if __name__ == "__main__": n_timestamps = len(time_intervals) for i in range(0, n_timestamps, 2): print("Getting data for time interval {}".format(i)) - time_interval = time_intervals[i], time_intervals[i+1] + time_interval = time_intervals[i], time_intervals[i + 1] print("Getting unique timestamps from {} device".format(tsq[0])) timestamps = get_segment_timestamps(time_interval, tsq, spark) @@ -532,13 +559,14 @@ if __name__ == "__main__": if not dfp: print("Warning - no Preprocessor could be created for {}".format(subs)) else: - print("{} - Updating {} column names and adding these to the common features dataset".format(datetime.now(), subs)) + print("{} - Updating {} column names and adding these to the common features dataset".format( + datetime.now(), subs)) # add prefix cause unique column names required by hdfs; # backticks needed cause of spaces in column names, replacing these for the new alias # _dfindex has to be kept cause it's used for joining dfp = dfp.select([f.col("`" + c + "`").alias(subs + "_" + c.replace(" ", "")) for c in dfp.columns]) \ .withColumnRenamed(subs + "_" + _dfindex, _dfindex) features = update_features(features, dfp) - suffix = '_part_' + str(i//2) if n_timestamps > 2 else '' + suffix = '_part_' + str(i // 2) if n_timestamps > 2 else '' print("Preprocessing done") - write_to_hdfs(features, 'features'+suffix, dformat) + write_to_hdfs(features, 'features' + suffix, dformat) -- GitLab From 92ec3e56f3e2b3190404a39471c4c4d2e78c1dda Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 19 May 2020 14:48:57 +0200 Subject: [PATCH 02/25] fixed small column name bug and used _dfindex instead of acqStamp --- src/spark/get_data.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 779ff369..f8beb351 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -145,10 +145,10 @@ class DataProcessor(ABC): # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp) max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0] # get next timestamp using sliding window - self.data = self.data.withColumn('next_ts', f.lead(f.col('acqStamp'), default=max_time) - .over(Window.orderBy("acqStamp"))) + self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time) + .over(Window.orderBy(_dfindex))) # resample data to 'step' - self.data = self.data.withColumn("resampled_time", f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts')))) + self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts')))) self.data = self.data.drop('next_ts') # join dataframes on timestamp index (rounded to nearest second) div_udf = udf(lambda x: x // 1000000000) -- GitLab From 09049fad1fde75e369dbe92292ec925abe290169 Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 19 May 2020 15:00:52 +0200 Subject: [PATCH 03/25] trying to fix bug caused by unserializable datatype --- src/spark/get_data.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index f8beb351..b457095d 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -122,18 +122,6 @@ class DataProcessor(ABC): start_df = start_df.drop('t') return start_df - @staticmethod - def date_range(t1, t2, step=1): 
- """ - Create range of timestamps with step_size step between t1 and t2 - :param t1: (DateType) lower bound of time-interval - :param t2: (DateType) upper bound of time-interval - :param step: (int) step size in seconds - """ - nr_of_ns_in_s = 1000000000 - step = step * nr_of_ns_in_s - return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)] - def resample(self, timestamps, step=1): """ Resample data to certain timestamps @@ -141,7 +129,7 @@ class DataProcessor(ABC): timestamps: timestamps which will be kept in DataFrame """ # udf to generate upsampled timestamps - date_range_udf = f.udf(lambda x, y: self.date_range(x, y, step), ArrayType(LongType())) + date_range_udf = f.udf(lambda x, y: date_range(x, y, step), ArrayType(LongType())) # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp) max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0] # get next timestamp using sliding window @@ -363,6 +351,18 @@ class BetsProcessor(DataProcessor): self.combine_sensor_data() +def date_range(t1, t2, step=1): + """ + Create range of timestamps with step_size step between t1 and t2 + :param t1: (DateType) lower bound of time-interval + :param t2: (DateType) upper bound of time-interval + :param step: (int) step size in seconds + """ + nr_of_ns_in_s = 1000000000 + step = step * nr_of_ns_in_s + return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)] + + def cmw_query(spark, startt, endt, devicel, propertyl): df = DevicePropertyQuery \ .builder(spark) \ -- GitLab From 3dd34a0225d73f5ee2663cee5160e01b12f0ec7d Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 19 May 2020 15:12:06 +0200 Subject: [PATCH 04/25] pickling error fixed, trying to fix error in count() in print tstatement after resample --- src/spark/get_data.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index b457095d..7a564ca4 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -133,8 +133,9 @@ class DataProcessor(ABC): # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp) max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0] # get next timestamp using sliding window + window = Window.partitionBy(_dfdev).orderBy(_dfindex) self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time) - .over(Window.orderBy(_dfindex))) + .over(window)) # resample data to 'step' self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts')))) self.data = self.data.drop('next_ts') @@ -463,6 +464,7 @@ def preprocess(df, subs, filter_list, time_interval, timestamps, store_data=Fals if timestamps: # not a real resample, more a filter, data has to be filled first (happens in preprocess above) proc.resample(timestamps) + print(proc.data.schema) print('{} - Resampled {}, {} entries remain'.format(datetime.now(), subs, proc.data.count())) if store_data: write_to_hdfs(proc.data, subs) -- GitLab From 692dc1c661cec4b185a72b11230d3c92f1f8744e Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 19 May 2020 15:16:41 +0200 Subject: [PATCH 05/25] used wrong index in window, fixed --- src/spark/get_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 7a564ca4..feb2084e 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -133,7 +133,7 @@ class 
DataProcessor(ABC): # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp) max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0] # get next timestamp using sliding window - window = Window.partitionBy(_dfdev).orderBy(_dfindex) + window = Window.partitionBy(_dfindex).orderBy(_dfindex) self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time) .over(window)) # resample data to 'step' -- GitLab From 810c920fb59b11f70dd799f8cffc9632e6c370e4 Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 19 May 2020 15:32:52 +0200 Subject: [PATCH 06/25] write dataframe to hdfs for debugging --- src/spark/get_data.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index feb2084e..42b5ceec 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -463,8 +463,7 @@ def preprocess(df, subs, filter_list, time_interval, timestamps, store_data=Fals print('Preprocessing {} finished, start resampling..'.format(subs)) if timestamps: # not a real resample, more a filter, data has to be filled first (happens in preprocess above) - proc.resample(timestamps) - print(proc.data.schema) + write_to_hdfs(proc.data,'debug','json') print('{} - Resampled {}, {} entries remain'.format(datetime.now(), subs, proc.data.count())) if store_data: write_to_hdfs(proc.data, subs) -- GitLab From d0c8c965a0a24d4958dac9274fc01b38d622a4dc Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 19 May 2020 15:44:07 +0200 Subject: [PATCH 07/25] added debug prints --- src/spark/get_data.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 42b5ceec..209a7cdd 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -142,6 +142,8 @@ class DataProcessor(ABC): # join dataframes on timestamp index (rounded to nearest second) div_udf = udf(lambda x: x // 1000000000) self.data = self.data.where(div_udf(f.col(_dfindex)).isin(timestamps)) + print('nr of rows', self.data.count()) + print('timestamps size', len(timestamps)) @staticmethod def handle_array(df, col_name): -- GitLab From 1f7b3523558f555f78a6aa66732b7f2c8549f88f Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 19 May 2020 16:33:37 +0200 Subject: [PATCH 08/25] trying to cast possible strings back to integers --- src/spark/get_data.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 209a7cdd..6b8d7a40 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -142,8 +142,6 @@ class DataProcessor(ABC): # join dataframes on timestamp index (rounded to nearest second) div_udf = udf(lambda x: x // 1000000000) self.data = self.data.where(div_udf(f.col(_dfindex)).isin(timestamps)) - print('nr of rows', self.data.count()) - print('timestamps size', len(timestamps)) @staticmethod def handle_array(df, col_name): @@ -361,6 +359,8 @@ def date_range(t1, t2, step=1): :param t2: (DateType) upper bound of time-interval :param step: (int) step size in seconds """ + t1 = int(t1) + t2 = int(t2) nr_of_ns_in_s = 1000000000 step = step * nr_of_ns_in_s return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)] @@ -465,7 +465,7 @@ def preprocess(df, subs, filter_list, time_interval, timestamps, store_data=Fals print('Preprocessing {} finished, start resampling..'.format(subs)) if timestamps: # not a real resample, more a 
filter, data has to be filled first (happens in preprocess above) - write_to_hdfs(proc.data,'debug','json') + proc.resample(timestamps) print('{} - Resampled {}, {} entries remain'.format(datetime.now(), subs, proc.data.count())) if store_data: write_to_hdfs(proc.data, subs) -- GitLab From 054bab170a6058fb874b28da7e51efd03f75541f Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 19 May 2020 16:53:43 +0200 Subject: [PATCH 09/25] string error is fixed, now checking why all columns are filtered when resmapling after upsample --- src/spark/get_data.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 6b8d7a40..8ce87b30 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -141,6 +141,7 @@ class DataProcessor(ABC): self.data = self.data.drop('next_ts') # join dataframes on timestamp index (rounded to nearest second) div_udf = udf(lambda x: x // 1000000000) + print(timestamps[0:10]) self.data = self.data.where(div_udf(f.col(_dfindex)).isin(timestamps)) @staticmethod -- GitLab From a9c151599dfa9028b0fcce28c0e885874792e192 Mon Sep 17 00:00:00 2001 From: Kyle <kyle.bringmans@cern.ch> Date: Tue, 19 May 2020 19:25:19 +0200 Subject: [PATCH 10/25] resampling seems to work --- src/config_files/config_mki.yaml | 4 ++-- src/spark/get_data.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/config_files/config_mki.yaml b/src/config_files/config_mki.yaml index b7b32cfc..7572176f 100644 --- a/src/config_files/config_mki.yaml +++ b/src/config_files/config_mki.yaml @@ -3,8 +3,8 @@ load_data: preprocess: task: MKI time_intervals: - - '2017-01-01 00:00:00.000' - - '2017-12-31 00:00:00.000' + - '2017-05-01 00:00:00.000' + - '2017-05-02 00:00:00.000' features: # json here cause of several arrays, csv takes less space but doesn't allow these dataformat: json diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 8ce87b30..e76b95d4 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -133,16 +133,16 @@ class DataProcessor(ABC): # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp) max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0] # get next timestamp using sliding window - window = Window.partitionBy(_dfindex).orderBy(_dfindex) + window = Window.orderBy(_dfindex) self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time) .over(window)) # resample data to 'step' + print(self.data.take(2)) self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts')))) self.data = self.data.drop('next_ts') # join dataframes on timestamp index (rounded to nearest second) div_udf = udf(lambda x: x // 1000000000) - print(timestamps[0:10]) - self.data = self.data.where(div_udf(f.col(_dfindex)).isin(timestamps)) + self.data = self.data.where(f.col(_dfindex).isin(timestamps)) @staticmethod def handle_array(df, col_name): @@ -362,7 +362,7 @@ def date_range(t1, t2, step=1): """ t1 = int(t1) t2 = int(t2) - nr_of_ns_in_s = 1000000000 + nr_of_ns_in_s = 1 step = step * nr_of_ns_in_s return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)] -- GitLab From d33d2ad26d7c4d477f355490612c1097ac394dc0 Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 19 May 2020 19:34:36 +0200 Subject: [PATCH 11/25] removed unnecessary prints and documented design choices in the code when necessary --- src/spark/get_data.py | 13 +++++++------ 1 file 
changed, 7 insertions(+), 6 deletions(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 8ce87b30..67806f3b 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -133,16 +133,17 @@ class DataProcessor(ABC): # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp) max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0] # get next timestamp using sliding window - window = Window.partitionBy(_dfindex).orderBy(_dfindex) + # cannot speedup by partitioning because unique timestamps in each subframe will get the default value of + # max_time for column next_ts which is incorrect + window = Window.orderBy(_dfindex) self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time) .over(window)) # resample data to 'step' self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts')))) self.data = self.data.drop('next_ts') + # join dataframes on timestamp index (rounded to nearest second) - div_udf = udf(lambda x: x // 1000000000) - print(timestamps[0:10]) - self.data = self.data.where(div_udf(f.col(_dfindex)).isin(timestamps)) + self.data = self.data.where(f.col(_dfindex).isin(timestamps)) @staticmethod def handle_array(df, col_name): @@ -353,6 +354,7 @@ class BetsProcessor(DataProcessor): self.combine_sensor_data() +# Not made static in DataProcessor because it breaks pickling in Spark def date_range(t1, t2, step=1): """ Create range of timestamps with step_size step between t1 and t2 @@ -360,10 +362,9 @@ def date_range(t1, t2, step=1): :param t2: (DateType) upper bound of time-interval :param step: (int) step size in seconds """ + # Ensure timestamps are integers t1 = int(t1) t2 = int(t2) - nr_of_ns_in_s = 1000000000 - step = step * nr_of_ns_in_s return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)] -- GitLab From 02becf83fd3900c89994e80bf9c7cc0974b6c08d Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <kyle.bringmans@cern.ch> Date: Tue, 19 May 2020 20:44:16 +0000 Subject: [PATCH 12/25] fixed problem with merge --- src/spark/get_data.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 42f1af9b..bf4e546a 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -362,8 +362,6 @@ def date_range(t1, t2, step=1): """ t1 = int(t1) t2 = int(t2) - nr_of_ns_in_s = 1000000000 - step = step * nr_of_ns_in_s return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)] -- GitLab From 3bca476f2b98f4b5f478a50705c67f3d9acce2f0 Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Wed, 20 May 2020 10:05:55 +0200 Subject: [PATCH 13/25] trying to fix increase in timestamps after resample --- src/spark/get_data.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index bf4e546a..cb2e4785 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -142,6 +142,7 @@ class DataProcessor(ABC): self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts')))) self.data = self.data.drop('next_ts') # join dataframes on timestamp index (rounded to nearest second) + self.data = self.data.dropDuplicates([_dfindex]) self.data = self.data.where(f.col(_dfindex).isin(timestamps)) @staticmethod -- GitLab From 83b15046765c768e6e5d6b37f199f4f8691bd01e Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Wed, 20 May 2020 13:49:22 +0200 Subject: [PATCH 14/25] added week partitioning --- 
src/config_files/config_lbds.yaml | 8 ++++---- src/spark/get_data.py | 7 ++++--- src/util.py | 1 + 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/config_files/config_lbds.yaml b/src/config_files/config_lbds.yaml index 83f054d7..1bfcb53d 100644 --- a/src/config_files/config_lbds.yaml +++ b/src/config_files/config_lbds.yaml @@ -1,7 +1,7 @@ load_data: type: JSON train_data_file_name: 'features_lbds_2017_b1-2.csv' -beam: 1 +beam: 2 preprocess: task: LBDS use_filters: True @@ -205,10 +205,10 @@ grid_search: feature_selection: 'all' scoring: 'auc' params: { - n_estimators: [50, 100, 200, 300, 400], - max_samples: [64, 128, 256, 512], + n_estimators: [50, 100, 200, 300,], + max_samples: [32, 64, 128, 256], contamination: [0.005, 0.01, 0.5], - max_features: [.6, .8, 1.] + max_features: [0.4, .6, .8, 1.] } lof: diff --git a/src/spark/get_data.py b/src/spark/get_data.py index cb2e4785..934cb294 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -132,14 +132,15 @@ class DataProcessor(ABC): date_range_udf = f.udf(lambda x, y: date_range(x, y, step), ArrayType(LongType())) # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp) max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0] + # Get week to of year to enable sliding window partitions + self.data = self.data.withColumn('week_of_year', f.weekofyear(f.from_unixtime(f.col(_dfindex)))) # get next timestamp using sliding window - # cannot speedup by partitioning because unique timestamps in each subframe will get the default value of - # max_time for column next_ts which is incorrect - window = Window.orderBy(_dfindex) + window = Window.partitionBy('week_of_year').orderBy(_dfindex) self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time) .over(window)) # resample data to 'step' self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts')))) + self.data = self.data.drop('week_of_year') self.data = self.data.drop('next_ts') # join dataframes on timestamp index (rounded to nearest second) self.data = self.data.dropDuplicates([_dfindex]) diff --git a/src/util.py b/src/util.py index 97e672ac..cda8f2cf 100644 --- a/src/util.py +++ b/src/util.py @@ -242,6 +242,7 @@ def get_lbds_data(filter_cols, start_time, end_time, cfg, use_case): try: fn += cfg['train_data_file_name'] beam = cfg['beam'] + print("Loading feature vectors for beam {}".format(beam)) except KeyError as e: print("Config file is missing configuration variable {}".format(e)) exit() -- GitLab From a58374e949c1233ff5e4b2664bc8f5c96dccd2ff Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Wed, 20 May 2020 13:52:23 +0200 Subject: [PATCH 15/25] Revert "added week partitioning" This reverts commit 83b15046765c768e6e5d6b37f199f4f8691bd01e. 
--- src/config_files/config_lbds.yaml | 8 ++++---- src/spark/get_data.py | 7 +++---- src/util.py | 1 - 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/config_files/config_lbds.yaml b/src/config_files/config_lbds.yaml index 1bfcb53d..83f054d7 100644 --- a/src/config_files/config_lbds.yaml +++ b/src/config_files/config_lbds.yaml @@ -1,7 +1,7 @@ load_data: type: JSON train_data_file_name: 'features_lbds_2017_b1-2.csv' -beam: 2 +beam: 1 preprocess: task: LBDS use_filters: True @@ -205,10 +205,10 @@ grid_search: feature_selection: 'all' scoring: 'auc' params: { - n_estimators: [50, 100, 200, 300,], - max_samples: [32, 64, 128, 256], + n_estimators: [50, 100, 200, 300, 400], + max_samples: [64, 128, 256, 512], contamination: [0.005, 0.01, 0.5], - max_features: [0.4, .6, .8, 1.] + max_features: [.6, .8, 1.] } lof: diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 934cb294..cb2e4785 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -132,15 +132,14 @@ class DataProcessor(ABC): date_range_udf = f.udf(lambda x, y: date_range(x, y, step), ArrayType(LongType())) # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp) max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0] - # Get week to of year to enable sliding window partitions - self.data = self.data.withColumn('week_of_year', f.weekofyear(f.from_unixtime(f.col(_dfindex)))) # get next timestamp using sliding window - window = Window.partitionBy('week_of_year').orderBy(_dfindex) + # cannot speedup by partitioning because unique timestamps in each subframe will get the default value of + # max_time for column next_ts which is incorrect + window = Window.orderBy(_dfindex) self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time) .over(window)) # resample data to 'step' self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts')))) - self.data = self.data.drop('week_of_year') self.data = self.data.drop('next_ts') # join dataframes on timestamp index (rounded to nearest second) self.data = self.data.dropDuplicates([_dfindex]) diff --git a/src/util.py b/src/util.py index cda8f2cf..97e672ac 100644 --- a/src/util.py +++ b/src/util.py @@ -242,7 +242,6 @@ def get_lbds_data(filter_cols, start_time, end_time, cfg, use_case): try: fn += cfg['train_data_file_name'] beam = cfg['beam'] - print("Loading feature vectors for beam {}".format(beam)) except KeyError as e: print("Config file is missing configuration variable {}".format(e)) exit() -- GitLab From b6416f9ab858278f834bcd559ba68d48edb9302f Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Wed, 20 May 2020 13:53:09 +0200 Subject: [PATCH 16/25] added partitioning by week --- src/spark/get_data.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index cb2e4785..934cb294 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -132,14 +132,15 @@ class DataProcessor(ABC): date_range_udf = f.udf(lambda x, y: date_range(x, y, step), ArrayType(LongType())) # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp) max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0] + # Get week to of year to enable sliding window partitions + self.data = self.data.withColumn('week_of_year', f.weekofyear(f.from_unixtime(f.col(_dfindex)))) # get next timestamp using sliding window - # cannot speedup by 
partitioning because unique timestamps in each subframe will get the default value of - # max_time for column next_ts which is incorrect - window = Window.orderBy(_dfindex) + window = Window.partitionBy('week_of_year').orderBy(_dfindex) self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time) .over(window)) # resample data to 'step' self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts')))) + self.data = self.data.drop('week_of_year') self.data = self.data.drop('next_ts') # join dataframes on timestamp index (rounded to nearest second) self.data = self.data.dropDuplicates([_dfindex]) -- GitLab From 39b2f9fad8543c1ce18577db428ab079d08bc2bc Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Wed, 20 May 2020 18:44:46 +0200 Subject: [PATCH 17/25] decreasing window size --- src/spark/get_data.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 934cb294..02e46e1a 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -133,9 +133,9 @@ class DataProcessor(ABC): # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp) max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0] # Get week to of year to enable sliding window partitions - self.data = self.data.withColumn('week_of_year', f.weekofyear(f.from_unixtime(f.col(_dfindex)))) + self.data = self.data.withColumn('day_of_year', f.dayofyear(f.from_unixtime(f.col(_dfindex)))) # get next timestamp using sliding window - window = Window.partitionBy('week_of_year').orderBy(_dfindex) + window = Window.partitionBy('day_of_year').orderBy(_dfindex) self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time) .over(window)) # resample data to 'step' -- GitLab From 4015568d65f5db0844cc98f95e48f1b625008866 Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Mon, 25 May 2020 14:42:56 +0200 Subject: [PATCH 18/25] added resampling directly to ipoc instead of seconds --- src/spark/get_data.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 02e46e1a..41346621 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -129,13 +129,11 @@ class DataProcessor(ABC): timestamps: timestamps which will be kept in DataFrame """ # udf to generate upsampled timestamps - date_range_udf = f.udf(lambda x, y: date_range(x, y, step), ArrayType(LongType())) + date_range_udf = udf(lambda x, y: date_range(x, y, timestamps, step), ArrayType(LongType())) # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp) max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0] - # Get week to of year to enable sliding window partitions - self.data = self.data.withColumn('day_of_year', f.dayofyear(f.from_unixtime(f.col(_dfindex)))) # get next timestamp using sliding window - window = Window.partitionBy('day_of_year').orderBy(_dfindex) + window = Window.orderBy(_dfindex) self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time) .over(window)) # resample data to 'step' @@ -144,6 +142,7 @@ class DataProcessor(ABC): self.data = self.data.drop('next_ts') # join dataframes on timestamp index (rounded to nearest second) self.data = self.data.dropDuplicates([_dfindex]) + # Remove old timestamps if they did not align with ipoc timestamps self.data = 
self.data.where(f.col(_dfindex).isin(timestamps)) @staticmethod @@ -355,7 +354,7 @@ class BetsProcessor(DataProcessor): self.combine_sensor_data() -def date_range(t1, t2, step=1): +def date_range(t1, t2, timestamps, step=1,): """ Create range of timestamps with step_size step between t1 and t2 :param t1: (DateType) lower bound of time-interval @@ -364,7 +363,10 @@ def date_range(t1, t2, step=1): """ t1 = int(t1) t2 = int(t2) - return [t1 + step * x for x in range(int((t2 - t1) / step) + 1)] + # resample to seconds + seconds = [t1 + step * x for x in range(int((t2 - t1) / step) + 1)] + # only keep ipoc timestamps + return [t for t in seconds if t in timestamps] def cmw_query(spark, startt, endt, devicel, propertyl): -- GitLab From d151009486f9f8373a0dd9b12ca2d573afabc82e Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Mon, 25 May 2020 17:25:36 +0200 Subject: [PATCH 19/25] changed helper function to inner function for easier overview --- src/spark/get_data.py | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 41346621..c6b74e4e 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -128,6 +128,14 @@ class DataProcessor(ABC): Args: timestamps: timestamps which will be kept in DataFrame """ + def date_range(t1, t2, ts, step=1, ): + t1 = int(t1) + t2 = int(t2) + # resample to seconds + seconds = [t1 + step * x for x in range(int((t2 - t1) / step) + 1)] + # only keep ipoc timestamps + return [t for t in seconds if t in ts] + # udf to generate upsampled timestamps date_range_udf = udf(lambda x, y: date_range(x, y, timestamps, step), ArrayType(LongType())) # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp) @@ -354,21 +362,6 @@ class BetsProcessor(DataProcessor): self.combine_sensor_data() -def date_range(t1, t2, timestamps, step=1,): - """ - Create range of timestamps with step_size step between t1 and t2 - :param t1: (DateType) lower bound of time-interval - :param t2: (DateType) upper bound of time-interval - :param step: (int) step size in seconds - """ - t1 = int(t1) - t2 = int(t2) - # resample to seconds - seconds = [t1 + step * x for x in range(int((t2 - t1) / step) + 1)] - # only keep ipoc timestamps - return [t for t in seconds if t in timestamps] - - def cmw_query(spark, startt, endt, devicel, propertyl): df = DevicePropertyQuery \ .builder(spark) \ -- GitLab From b518102b1cff2a614dbd1c3041f436ad940d1e15 Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 26 May 2020 13:30:09 +0200 Subject: [PATCH 20/25] skip all other preprocessing if resample timestamps are empty --- src/spark/get_data.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index c6b74e4e..c5f0e312 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -151,6 +151,7 @@ class DataProcessor(ABC): # join dataframes on timestamp index (rounded to nearest second) self.data = self.data.dropDuplicates([_dfindex]) # Remove old timestamps if they did not align with ipoc timestamps + self.data = self.data.where(f.col(_dfindex).isin(timestamps)) @staticmethod @@ -543,7 +544,8 @@ if __name__ == "__main__": print("Getting unique timestamps from {} device".format(tsq[0])) timestamps = get_segment_timestamps(time_interval, tsq, spark) - + if not len(timestamps) : + continue start_time, end_time = str(time_interval[0]), str(time_interval[1]) 
print("Reading data from {} to {}".format(start_time, end_time)) -- GitLab From 41d6c4c064cdc60b7c6a9219dde97a8404ebf5ca Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 26 May 2020 13:32:31 +0200 Subject: [PATCH 21/25] added print to warn user of choice --- src/spark/get_data.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index c5f0e312..5611b4e2 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -544,9 +544,14 @@ if __name__ == "__main__": print("Getting unique timestamps from {} device".format(tsq[0])) timestamps = get_segment_timestamps(time_interval, tsq, spark) - if not len(timestamps) : - continue + start_time, end_time = str(time_interval[0]), str(time_interval[1]) + + # If no timestamps to resample to, then no preprocessing can be done + if not len(timestamps): + print("No timestamps to resample to, skipping data retrieval for interval {} - {}".format(start_time, end_time)) + continue + print("Reading data from {} to {}".format(start_time, end_time)) features = None -- GitLab From 21d676430a1122300e0379239734896c23fe102c Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 26 May 2020 15:07:15 +0200 Subject: [PATCH 22/25] removed typo --- src/spark/get_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 5611b4e2..835e4f09 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -128,7 +128,7 @@ class DataProcessor(ABC): Args: timestamps: timestamps which will be kept in DataFrame """ - def date_range(t1, t2, ts, step=1, ): + def date_range(t1, t2, ts, step=1): t1 = int(t1) t2 = int(t2) # resample to seconds -- GitLab From 86467e1b2a0aaff2239c52bd8d20cedf9e523410 Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 26 May 2020 15:10:07 +0200 Subject: [PATCH 23/25] added debug line for udf --- src/spark/get_data.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 835e4f09..7e5b5a53 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -3,6 +3,7 @@ import argparse from abc import ABC import getpass from datetime import datetime +import sys import yaml import numpy as np @@ -128,13 +129,16 @@ class DataProcessor(ABC): Args: timestamps: timestamps which will be kept in DataFrame """ + def date_range(t1, t2, ts, step=1): t1 = int(t1) t2 = int(t2) # resample to seconds seconds = [t1 + step * x for x in range(int((t2 - t1) / step) + 1)] + time = [t for t in seconds if t in ts] + print(seconds, file=sys.stderr) # only keep ipoc timestamps - return [t for t in seconds if t in ts] + return time # udf to generate upsampled timestamps date_range_udf = udf(lambda x, y: date_range(x, y, timestamps, step), ArrayType(LongType())) @@ -549,7 +553,8 @@ if __name__ == "__main__": # If no timestamps to resample to, then no preprocessing can be done if not len(timestamps): - print("No timestamps to resample to, skipping data retrieval for interval {} - {}".format(start_time, end_time)) + print("No timestamps to resample to, skipping data retrieval for interval {} - {}".format(start_time, + end_time)) continue print("Reading data from {} to {}".format(start_time, end_time)) -- GitLab From 0112d2ab69bf56836644f821fa4e80a94204c1e7 Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 26 May 2020 15:55:23 +0200 Subject: 
[PATCH 24/25] removed bug with mismatching datatypes --- src/spark/get_data.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 7e5b5a53..5798eb64 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -136,10 +136,11 @@ class DataProcessor(ABC): # resample to seconds seconds = [t1 + step * x for x in range(int((t2 - t1) / step) + 1)] time = [t for t in seconds if t in ts] - print(seconds, file=sys.stderr) # only keep ipoc timestamps return time - + + # map timestamps to int because otherwise data_range filters all entries becuase int is not in list of strings + timestamps = [int(t) for t in timestamps] # udf to generate upsampled timestamps date_range_udf = udf(lambda x, y: date_range(x, y, timestamps, step), ArrayType(LongType())) # get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp) @@ -155,7 +156,6 @@ class DataProcessor(ABC): # join dataframes on timestamp index (rounded to nearest second) self.data = self.data.dropDuplicates([_dfindex]) # Remove old timestamps if they did not align with ipoc timestamps - self.data = self.data.where(f.col(_dfindex).isin(timestamps)) @staticmethod -- GitLab From 8dba5b5998185cde7650546bedaf056e31f3169c Mon Sep 17 00:00:00 2001 From: Kyle Bringmans <bringmans.kyle@gmail.com> Date: Tue, 26 May 2020 15:58:56 +0200 Subject: [PATCH 25/25] added clearer reason for line of code --- src/spark/get_data.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/spark/get_data.py b/src/spark/get_data.py index 5798eb64..a7473486 100644 --- a/src/spark/get_data.py +++ b/src/spark/get_data.py @@ -138,8 +138,9 @@ class DataProcessor(ABC): time = [t for t in seconds if t in ts] # only keep ipoc timestamps return time - - # map timestamps to int because otherwise data_range filters all entries becuase int is not in list of strings + + # map timestamps to int because otherwise data_range() resamples to [] + # because no timestamp from 'seconds' of type int is in a list of strings timestamps = [int(t) for t in timestamps] # udf to generate upsampled timestamps date_range_udf = udf(lambda x, y: date_range(x, y, timestamps, step), ArrayType(LongType())) -- GitLab
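
Taken together, the series converges on one resampling strategy: order the rows by timestamp, use a lead() window to attach each row's successor timestamp, expand every row into the reference (IPOC) timestamps that fall inside that interval, explode, deduplicate, and keep only the reference timestamps — with the timestamps cast to int so the membership test never compares ints against strings (the bug fixed in patch 24). Below is a minimal, self-contained sketch of that idea; the toy column names (ts, value), the example data, the one-unit step and the local SparkSession are illustrative assumptions, not the repository's DataProcessor.resample() code.

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, LongType

spark = SparkSession.builder.master("local[1]").appName("ipoc-resample-sketch").getOrCreate()

# Sparse, irregularly spaced source data: one reading per integer timestamp.
df = spark.createDataFrame([(100, 1.0), (103, 2.0), (107, 3.0)], ["ts", "value"])

# Reference timestamps to resample onto (standing in for the IPOC acquisition stamps).
reference_ts = [101, 102, 104, 106]


def make_date_range_udf(ref_ts, step=1):
    # Plain ints in a set: membership tests must compare int to int,
    # which is exactly the type mismatch patch 24 fixes.
    ref = {int(t) for t in ref_ts}

    def date_range(t1, t2):
        t1, t2 = int(t1), int(t2)
        # Candidate timestamps between this row and the next one (inclusive), ...
        candidates = range(t1, t2 + 1, step)
        # ... keeping only those that coincide with a reference timestamp.
        return [t for t in candidates if t in ref]

    return f.udf(date_range, ArrayType(LongType()))


date_range_udf = make_date_range_udf(reference_ts)

# Upper bound for the last row, which has no successor (the edge case handled in the patches).
max_ts = df.agg(f.max("ts")).collect()[0][0]

# A single, unpartitioned window, as in the final state of the series.
w = Window.orderBy("ts")

resampled = (
    df.withColumn("next_ts", f.lead("ts", 1, max_ts).over(w))
      # Forward-fill each row onto the matching reference timestamps.
      .withColumn("ts", f.explode(date_range_udf(f.col("ts"), f.col("next_ts"))))
      .drop("next_ts")
      # Adjacent ranges share an endpoint, so the same target timestamp can be emitted twice.
      .dropDuplicates(["ts"])
      # Final safety filter, mirroring resample(): keep reference timestamps only.
      .where(f.col("ts").isin(reference_ts))
)

resampled.show()
# Expected rows (order may vary): 101 -> 1.0, 102 -> 1.0, 104 -> 2.0, 106 -> 2.0
spark.stop()

The window is deliberately left unpartitioned, reflecting the outcome of patches 14-17: partitioning by week or day was reverted because the last row of every partition falls back to the global maximum timestamp as its next_ts, which over-extends the forward fill at partition boundaries.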