Commit 34dfaa70 authored by Kyle Bringmans

Merge branch 'add-ipoc-resampling' into 'proposed-master'

Add ipoc resampling

See merge request te-abt-ec/anomaly-detection-lbds-2020!24
parents 3df29dcd db781be9
@@ -154,4 +154,4 @@ pipeline:
     contamination: 0.5,
     max_features: 1.,
     verbose: 1 # 0, 1, or 2
-}
\ No newline at end of file
+}
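For context, the keys in this config block line up with the constructor arguments of scikit-learn's IsolationForest, which the pipeline appears to parameterize. A minimal sketch of how such a block could be consumed, assuming a hypothetical pipeline.yaml path and a top-level 'pipeline' key (both assumptions, not shown in the diff):

import yaml
from sklearn.ensemble import IsolationForest

# 'pipeline.yaml' and the 'pipeline' key are illustrative assumptions
with open('pipeline.yaml') as fh:
    cfg = yaml.safe_load(fh)['pipeline']

model = IsolationForest(contamination=cfg['contamination'],  # 0.5
                        max_features=cfg['max_features'],    # 1.
                        verbose=cfg['verbose'])              # 0, 1, or 2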
@@ -3,6 +3,7 @@ import argparse
 from abc import ABC
 import getpass
 from datetime import datetime
+import sys
 import yaml
 import numpy as np
@@ -13,7 +14,7 @@ from pyspark import SparkContext
 from pyspark.conf import SparkConf
 from pyspark.sql import Window, SparkSession
 from pyspark.sql.functions import udf, collect_list, first, last, asc, count, when, isnull
-from pyspark.sql.types import DateType
+from pyspark.sql.types import DateType, LongType, ArrayType
 from cern.nxcals.pyquery.builders import DevicePropertyQuery
 from scipy import signal
@@ -123,12 +124,40 @@ class DataProcessor(ABC):
         start_df = start_df.drop('t')
         return start_df

-    def resample(self, timestamps):
+    def resample(self, timestamps, step=1):
         """
         Resample data to certain timestamps

         Args:
             timestamps: timestamps which will be kept in the DataFrame
+            step: spacing in seconds of the resampling grid
         """
+        def date_range(t1, t2, ts, step=1):
+            t1 = int(t1)
+            t2 = int(t2)
+            # build a 'step'-spaced grid of seconds between t1 and t2
+            seconds = [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
+            # only keep ipoc timestamps
+            time = [t for t in seconds if t in ts]
+            return time
+
+        # map timestamps to int, otherwise date_range() resamples to []
+        # because no int timestamp from 'seconds' is found in a list of strings
+        timestamps = [int(t) for t in timestamps]
+        # udf to generate upsampled timestamps
+        date_range_udf = udf(lambda x, y: date_range(x, y, timestamps, step), ArrayType(LongType()))
+        # upper limit of the timestamps, used as the lead() default (handles edge case for the last row)
+        max_time = self.data.agg({_dfindex: 'max'}).collect()[0][0]
+        # get the next timestamp using a sliding window
+        window = Window.orderBy(_dfindex)
+        self.data = self.data.withColumn('next_ts', f.lead(f.col(_dfindex), default=max_time)
+                                         .over(window))
+        # resample data to 'step'
+        self.data = self.data.withColumn(_dfindex, f.explode(date_range_udf(f.col(_dfindex), f.col('next_ts'))))
+        self.data = self.data.drop('week_of_year')
+        self.data = self.data.drop('next_ts')
+        # adjacent windows can emit the same grid timestamp twice, so deduplicate on the index
+        self.data = self.data.dropDuplicates([_dfindex])
+        # remove old timestamps if they did not align with ipoc timestamps
+        self.data = self.data.where(f.col(_dfindex).isin(timestamps))

     @staticmethod
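To make the mechanics above concrete, here is a self-contained sketch (not part of the commit) of the same pattern on a toy DataFrame: lead() pairs each row with its successor's timestamp, the UDF intersects the per-row grid with the ipoc timestamps, and explode() emits one row per surviving grid point. The column names ('ts', 'value') and the ipoc list are illustrative only; the real code uses the module-level _dfindex column and NXCALS data.

from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as f
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, LongType

spark = SparkSession.builder.master('local[1]').getOrCreate()

ipoc = [100, 103, 104, 110]

def date_range(t1, t2, ts, step=1):
    # 'step'-spaced grid between t1 and t2, intersected with the ipoc timestamps
    t1, t2 = int(t1), int(t2)
    seconds = [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
    return [t for t in seconds if t in ts]

date_range_udf = udf(lambda a, b: date_range(a, b, ipoc), ArrayType(LongType()))

df = spark.createDataFrame([(100, 1.0), (103, 2.0), (108, 3.0)], ['ts', 'value'])
max_time = df.agg({'ts': 'max'}).collect()[0][0]

# pair each row with the next row's timestamp, as resample() does
w = Window.orderBy('ts')
df = df.withColumn('next_ts', f.lead(f.col('ts'), default=max_time).over(w))

# one output row per grid timestamp that coincides with an ipoc timestamp
df = (df.withColumn('ts', f.explode(date_range_udf(f.col('ts'), f.col('next_ts'))))
        .drop('next_ts')
        .dropDuplicates(['ts'])
        .where(f.col('ts').isin(ipoc)))
df.show()  # rows for ts = 100, 103, 104; ts = 103 appears in two windows and is deduplicated

One performance note: 't in ts' scans a Python list once per grid point inside the UDF. Since the timestamps are captured in the closure, passing them in as a set would cut the per-row cost without changing the result.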
@@ -530,6 +559,13 @@ if __name__ == "__main__":
         timestamps = get_segment_timestamps(time_interval, tsq, spark)
         start_time, end_time = str(time_interval[0]), str(time_interval[1])

+        # If there are no timestamps to resample to, then no preprocessing can be done
+        if not len(timestamps):
+            print("No timestamps to resample to, skipping data retrieval for interval {} - {}".format(start_time,
+                                                                                                       end_time))
+            continue
+
         print("Reading data from {} to {}".format(start_time, end_time))

         features = None
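This guard matters because resample() intersects every per-row grid with the timestamps list: with an empty list each date_range() call returns [], explode() then yields no rows, and the interval's data would be silently dropped downstream. Skipping up front makes that failure mode explicit. A toy illustration, reusing the date_range() logic from the method above:

def date_range(t1, t2, ts, step=1):
    # same grid-then-intersect logic as in resample()
    seconds = [int(t1) + step * x for x in range(int((int(t2) - int(t1)) / step) + 1)]
    return [t for t in seconds if t in ts]

print(date_range(100, 105, []))  # [] -> explode() would emit no rows at all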