Skip to content
Snippets Groups Projects
Commit 16bf96ed authored by Maria Maxouti's avatar Maria Maxouti
Browse files

add etl library

parent 8e936a29
No related branches found
No related tags found
No related merge requests found
etl.py 0 → 100644
from influxdb import InfluxDBClient
from influxdb import DataFrameClient
import pandas
from cern.nxcals.pyquery.builders import *
import numpy as np
from scipy.signal import savgol_filter
import os
import time
def timing(f):
    """Decorator that prints the wall-clock duration of every call to ``f``.

    The wrapped function's return value is passed through unchanged. The
    wrapper keeps the metadata (``__name__``, ``__doc__``, ...) of ``f`` so
    that decorated functions remain introspectable.
    """
    # Local import so this block does not depend on a file-level import.
    import functools

    # Fall back to repr() for callables without __name__ (e.g. partials).
    label = getattr(f, '__name__', repr(f))

    @functools.wraps(f)
    def wrap(*args, **kwargs):
        # perf_counter is monotonic, so the interval cannot go negative
        # when the system clock is adjusted during the call.
        started = time.perf_counter()
        ret = f(*args, **kwargs)
        elapsed = time.perf_counter() - started
        print('{:s} function took {:.3f} s'.format(label, elapsed))
        return ret
    return wrap
@timing
def build_dataset(measurements, start_date, end_date):
    """Build the WINCCOA dataset for the given measurements and time window.

    A ``DataQuery`` builder is created for system ``'WINCCOA'`` between
    ``start_date`` and ``end_date``; one entity keyed on ``variable_name`` is
    requested per item in ``measurements``.

    Returns the result of ``buildDataset()``, or ``None`` (after printing a
    message) when ``measurements`` yields no items.

    NOTE: ``spark`` is not defined in this function; it must be available in
    the enclosing module/session scope.
    """
    # Building the query and load data into spark dataframe
    winccoaData_builder = DataQuery.builder(spark).byEntities().system('WINCCOA')\
        .startTime(start_date).endTime(end_date)
    entityAliasStage = None
    for measurement in measurements:
        # NOTE(review): only the stage from the last iteration is kept;
        # presumably the builder accumulates the entities internally -
        # confirm against the NXCALS builder documentation.
        entityAliasStage = winccoaData_builder.entity().keyValue('variable_name', measurement)
    # Identity check: avoids dispatching __eq__ on a builder stage object.
    if entityAliasStage is None:
        print("No entity specified, we are not performing extraction")
        return None
    return entityAliasStage.buildDataset()
@timing
def select_and_sort(dataset):
    """Keep four columns of ``dataset`` and order the rows by ``timestamp``.

    The retained columns are ``nxcals_entity_id``, ``timestamp``, ``value``
    and ``variable_name``.
    """
    kept_columns = ("nxcals_entity_id", "timestamp", "value", "variable_name")
    projected = dataset.select(*kept_columns)
    return projected.sort("timestamp")
@timing
def filter(measurement, spark_dataframe ,filter_lambda=None):
    """Extract one measurement from a Spark dataframe as a pandas dataframe.

    Rows whose ``variable_name`` equals ``measurement`` are converted to
    pandas; the ``timestamp`` column (interpreted as nanoseconds) is removed
    and becomes the datetime index.

    If ``filter_lambda`` is given, the original ``value`` column is renamed
    to ``raw_value`` and a new ``value`` column is set to
    ``filter_lambda(raw_value)``.

    Returns a tuple ``(pandas_dataframe, field_columns)`` where
    ``field_columns`` is ``["value"]`` or ``["value", "raw_value"]``.

    NOTE: the function name shadows the builtin ``filter``; it is kept
    because callers use it.
    """
    # Compare through a Column expression instead of concatenating the name
    # into a SQL string, so names containing quotes cannot break or alter
    # the predicate.
    matches = spark_dataframe["variable_name"] == measurement
    p_dataframe = spark_dataframe.filter(matches).toPandas()
    p_dataframe.index = pandas.to_datetime(p_dataframe.pop('timestamp'), unit='ns')
    field_columns = ["value"]
    if filter_lambda is not None:
        p_dataframe.rename(columns={"value": "raw_value"}, inplace=True)
        p_dataframe["value"] = filter_lambda(p_dataframe["raw_value"])
        field_columns.append("raw_value")
    return (p_dataframe, field_columns)
@timing
def extraction(measurements, start_date, end_date, filter_lambda=None, tag_lookup_lambda = None):
    """Extract measurements for a time window and write them to InfluxDB.

    For every item in ``measurements`` the matching rows are pulled out of
    the dataset built for ``start_date``..``end_date``, optionally processed
    by ``filter_lambda``, and written to an InfluxDB measurement of the same
    name. ``tag_lookup_lambda``, when given, is called with the measurement
    name and must return a mapping that is used as the point tags.

    Connection settings are read from the environment variables
    ``INFLUXDB_HOST``, ``INFLUXDB_PORT``, ``INFLUXDB_USERNAME``,
    ``INFLUXDB_PASSWORD`` and ``INFLUXDB_DATABASE``.

    Returns ``None``. Nothing is written when no dataset could be built.
    """
    winccoaDataset = build_dataset(measurements, start_date, end_date)
    if winccoaDataset is None:
        # build_dataset returns None when no measurement was specified;
        # without this guard select_and_sort(None) fails with AttributeError.
        return None
    # NOTE(review): verify_ssl=False disables TLS certificate verification.
    # Left unchanged here; confirm this is intended for the target server.
    client = DataFrameClient(host=os.environ.get("INFLUXDB_HOST"), port=int(os.environ.get("INFLUXDB_PORT")),
                             username=os.environ.get("INFLUXDB_USERNAME"),
                             password=os.environ.get("INFLUXDB_PASSWORD"),
                             database=os.environ.get("INFLUXDB_DATABASE"), ssl=True, verify_ssl=False)
    winccoaData = select_and_sort(winccoaDataset)
    for measurement in measurements:
        my_dataframe, field_columns = filter(measurement, winccoaData, filter_lambda)
        print(my_dataframe.dtypes)
        tags_dictionary = {}
        if tag_lookup_lambda is not None:
            tags_dictionary.update(tag_lookup_lambda(measurement))
        #print("Sending dataframe to influxDB {} {}".format(measurement, my_dataframe.shape))
        client.write_points(dataframe=my_dataframe, measurement=measurement,
                            tags=tags_dictionary,
                            field_columns=field_columns,
                            protocol="line",
                            time_precision="n")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment