Skip to content
Snippets Groups Projects
Commit 50682b78 authored by bcopy's avatar bcopy
Browse files

Add support for NXCALS System parameter

parent aa281afd
No related branches found
No related tags found
1 merge request!2Add support for NXCALS System parameter
......@@ -43,18 +43,18 @@ def timing(f):
return wrap
@timing
def build_dataset(measurements, start_date, end_date):
def build_dataset(measurements, start_date, end_date, nxcals_system):
global spark
# Building the query and load data into spark dataframe
winccoaData_builder = DataQuery.builder(spark).byEntities().system('WINCCOA') \
query_builder = DataQuery.builder(spark).byEntities().system(nxcals_system) \
.startTime(start_date).endTime(end_date)
entityAliasStage = None
entity_alias = None
for measurement in measurements:
entityAliasStage=winccoaData_builder.entity().keyValue('variable_name', measurement)
if entityAliasStage == None:
entity_alias=query_builder.entity().keyValue('variable_name', measurement)
if entity_alias == None:
logging.warning("No entity specified, not performing extraction")
return None
return entityAliasStage.buildDataset()
return entity_alias.buildDataset()
@timing
def select_and_sort(dataset):
......@@ -72,19 +72,19 @@ def filter(measurement, spark_dataframe ,filter_lambda=None):
return (p_dataframe, field_columns)
@timing
def extraction(measurements, start_date, end_date, filter_lambda=None, tag_lookup_lambda = None, measurement_lookup_lambda = ( lambda x: x )):
winccoaDataset = build_dataset(measurements, start_date, end_date)
def extraction(measurements, start_date, end_date, filter_lambda=None, tag_lookup_lambda = None, measurement_lookup_lambda = ( lambda x: x ), nxcals_system = "WINCCOA"):
dataSet = build_dataset(measurements, start_date, end_date, nxcals_system)
logging.warn("Connecting to INFLUXDB {}@{}:{} DB : {}".format(os.environ.get("INFLUXDB_USERNAME"),os.environ.get("INFLUXDB_HOST"), os.environ.get("INFLUXDB_PORT"), os.environ.get("INFLUXDB_DATABASE")))
client = DataFrameClient(host=os.environ.get("INFLUXDB_HOST"), port=int(os.environ.get("INFLUXDB_PORT")),
username=os.environ.get("INFLUXDB_USERNAME"),
password=os.environ.get("INFLUXDB_PASSWORD"),
database=os.environ.get("INFLUXDB_DATABASE"), ssl=True, verify_ssl=False)
winccoaData = select_and_sort(winccoaDataset)
data = select_and_sort(dataSet)
for measurement in measurements:
filter_result=filter(measurement, winccoaData,filter_lambda)
filter_result=filter(measurement, data,filter_lambda)
my_dataframe = filter_result[0]
print(my_dataframe.dtypes)
tags_dictionary = {}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment