Skip to content
Snippets Groups Projects
Commit d1510094 authored by Kyle Bringmans's avatar Kyle Bringmans
Browse files

changed helper function to inner function for easier overview

parent 4015568d
No related branches found
No related tags found
1 merge request!24Add ipoc resampling
......@@ -128,6 +128,14 @@ class DataProcessor(ABC):
Args:
timestamps: timestamps which will be kept in DataFrame
"""
def date_range(t1, t2, ts, step=1, ):
t1 = int(t1)
t2 = int(t2)
# resample to seconds
seconds = [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
# only keep ipoc timestamps
return [t for t in seconds if t in ts]
# udf to generate upsampled timestamps
date_range_udf = udf(lambda x, y: date_range(x, y, timestamps, step), ArrayType(LongType()))
# get upper limit of timestamps for final timestamp in dataframe (handles edge case for last timestamp)
......@@ -354,21 +362,6 @@ class BetsProcessor(DataProcessor):
self.combine_sensor_data()
def date_range(t1, t2, timestamps, step=1,):
"""
Create range of timestamps with step_size step between t1 and t2
:param t1: (DateType) lower bound of time-interval
:param t2: (DateType) upper bound of time-interval
:param step: (int) step size in seconds
"""
t1 = int(t1)
t2 = int(t2)
# resample to seconds
seconds = [t1 + step * x for x in range(int((t2 - t1) / step) + 1)]
# only keep ipoc timestamps
return [t for t in seconds if t in timestamps]
def cmw_query(spark, startt, endt, devicel, propertyl):
df = DevicePropertyQuery \
.builder(spark) \
......
  • Contributor

    testing on 1 month of data for speed (22min for 2.677.746 entries)

    Edited by Kyle Bringmans
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment