Commit f4991b88 authored by Domenico Giordano

Merge branch 'stiv_newer_merge' into qa-v0.4_stiven

parent 4187230c
......@@ -281,9 +281,11 @@ class BaseOutlierAnalyser(ABC):
if ts_second_window_start is None:
ts_second_window_start = \
ts_second_window_end - (self.history_len * self.granularity_min * 60) # noqa
counter = 0
for (entity, score, percentile, rank) in \
zip(self.host_names, scores_selected, self.percentiles, self.ranks): # noqa
if (rank <= top_k):
if (rank <= top_k) and (counter < top_k):
counter += 1
entity_without_cern_ch = entity.split(".")[0]
self.publish_anomaly(entity_without_cern_ch,
score,
......
# Modules
The repository implements ETL (Extract, Transform, Loa) modules for:
The repository implements ETL (Extract, Transform, Load) modules for:
1. Grafana ETL
1. Spark ETL
......
......@@ -3,4 +3,53 @@
This folder contains the library necessary to download the HDFS data:
1. cluster_utils.py + utils.py: methods to access the Spark context and prepare HDFS paths and folders.
1. etl_steps.py: low-level operations that define how to aggregate the time series data and with which granularity; the hard part of the Spark processing is here. Note that the normalization strategy is also here.
1. etl_pipeline.py: how to combine the basic steps into the final ETL pipeline, from the definition of what you want down to the results stored in the desired way in HDFS under the desired path.
\ No newline at end of file
1. etl_pipeline.py: how to combine the basic steps into the final ETL pipeline, from the definition of what you want down to the results stored in the desired way in HDFS under the desired path.
<hr>
Let's describe in more detail the most important functions that you can find in the different files inside this folder (keep in mind that, especially for atomic operations, you can find documentation in the code about the functions and their expected parameters).
## etl_steps.py
- **data_reading**(spark, plugins, days, hostgroups)
Read all plugins and all days.
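A minimal sketch of what this reading step could look like, assuming the collectd metrics are laid out under per-plugin, per-day HDFS paths (as built by create_paths_for_collectd) and stored as JSON; all names here are assumptions, not the actual implementation:
```python
# Hypothetical sketch of data_reading: one path per (plugin, day), read in one go.
def read_all_plugins(spark, plugins, days):
    paths = []
    for plugin_conf in plugins.values():
        base = plugin_conf["plugin_data_path"]  # e.g. /project/monitoring/collectd/cpu
        for (year, month, day) in days:
            paths.append("%s/%d/%02d/%02d/" % (base, year, month, day))
    # JSON is an assumption; the real on-disk format depends on the collectd dumps.
    return spark.read.json(list(set(paths)))
```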
- **downsampling_and_aggregate**(spark, df_all_plugins, every_n_minutes)
Create the aggregate every x minutes (e.g. with a 10 min granularity, data between 15:10 and 15:20 are summarized with the mean statistic and get the timestamp 15:20).
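A minimal sketch of this kind of downsampling in PySpark, assuming a long-format dataframe with timestamp (in seconds), host, plugin and value columns (all column names are assumptions):
```python
from pyspark.sql import functions as F

def downsample(df_all_plugins, every_n_minutes=10):
    # Hypothetical sketch: assign every row to the end of its N-minute bucket
    # (e.g. 15:10-15:20 -> 15:20) and summarize each bucket with the mean.
    bucket_sec = every_n_minutes * 60
    ts_bucket = F.ceil(F.col("timestamp") / bucket_sec) * bucket_sec
    return (df_all_plugins
            .withColumn("ts_bucket", ts_bucket)
            .groupBy("host", "plugin", "ts_bucket")
            .agg(F.mean("value").alias("value")))
```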
- **normalize**(spark, df, df_normalization)
Subtract the mean from the value column and divide it by the standard deviation.
- **compute_the_coefficients**(spark, df_aggregated)
Compute the normalization coefficients (mean and standard deviation) from each of the plugin columns. These coefficients are needed for the normalization step.
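A minimal sketch of these two steps together, assuming the aggregated dataframe has one column per plugin plus a hostgroup column, and that the coefficients are kept per (hostgroup, plugin) as described for pipeline_preparation_norm_coeff below (all names are assumptions):
```python
from pyspark.sql import functions as F

def compute_coefficients(df_aggregated, plugin_cols):
    # One (mean, stddev) pair per plugin column, per hostgroup.
    aggs = []
    for c in plugin_cols:
        aggs += [F.mean(c).alias(c + "_mean"), F.stddev(c).alias(c + "_stddev")]
    return df_aggregated.groupBy("hostgroup").agg(*aggs)

def normalize(df, df_normalization, plugin_cols):
    # Join the coefficients and rescale every plugin column: (x - mean) / stddev.
    out = df.join(F.broadcast(df_normalization), on="hostgroup")
    for c in plugin_cols:
        out = out.withColumn(c, (F.col(c) - F.col(c + "_mean")) / F.col(c + "_stddev"))
    return out.drop(*[c + s for c in plugin_cols for s in ("_mean", "_stddev")])
```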
- **create_window**(spark, df_plugins_as_col, start_date, end_date, history_steps, future_steps, every_n_minutes)
Create a window with the timesteps for history and future. Create the lagged timesteps for each column (aka plugin), both for the history steps and for the future steps. Note that, beforehand, all the missing timesteps have to be replaced with a null value.
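A minimal sketch of how the lagged history/future columns could be built with PySpark window functions, assuming one row per (host, timestep) with one column per plugin and missing timesteps already filled with nulls (the partition key and column names are assumptions):
```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def add_lagged_columns(df_plugins_as_col, plugin_cols, history_steps, future_steps):
    w = Window.partitionBy("host").orderBy("ts_bucket")
    for c in plugin_cols:
        # History: value h steps in the past; future: value f steps ahead.
        for h in range(1, history_steps + 1):
            df_plugins_as_col = df_plugins_as_col.withColumn(
                "%s_h%d" % (c, h), F.lag(c, h).over(w))
        for f in range(1, future_steps + 1):
            df_plugins_as_col = df_plugins_as_col.withColumn(
                "%s_f%d" % (c, f), F.lead(c, f).over(w))
    return df_plugins_as_col
```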
- **data_writing**(spark, df_window, outfolder, every_n_minutes, history_steps, future_steps, id_plugins, id_normalization, mode='overwrite')
Save the windows partitioned in the outfolder. It saves every window in a folder path like the following:
```
<outpath>/10min/gva_project_014/2020/02/14/
```
and a filename like the following:
```
<id_plugins>_h<history_steps>_f<future_steps>_<normalization_id>
```
Note that an id to identify that group of plugins will be created.
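A small illustrative sketch of how such a folder path, filename and plugin-group id could be assembled (the hashing choice is an assumption, not the actual implementation):
```python
import hashlib

def window_output_path(outfolder, every_n_minutes, hostgroup, year, month, day,
                       id_plugins, history_steps, future_steps, id_normalization):
    # e.g. <outpath>/10min/gva_project_014/2020/02/14/
    folder = "%s/%dmin/%s/%d/%02d/%02d/" % (
        outfolder, every_n_minutes, hostgroup, year, month, day)
    # e.g. <id_plugins>_h<history_steps>_f<future_steps>_<normalization_id>
    filename = "%s_h%d_f%d_%s" % (
        id_plugins, history_steps, future_steps, id_normalization)
    return folder + filename

def plugins_id(selected_plugins):
    # The id of the group of plugins could simply be a hash of their configuration.
    return hashlib.md5(str(sorted(selected_plugins)).encode()).hexdigest()[:10]
```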
## etl_pipeline.py
- **run_pipeline_all_in_one**(spark, config_filepath)
A single function, called in the ETL Airflow pipeline, that produces the window datasets in HDFS, divided by day and hostgroup. (It uses almost every main function of etl_steps.py.)
- **pipeline_preparation_norm_coeff**(spark, config_filepath)
Run the pipeline to get the coefficients and create a normalization df. It produces normalization datasets in HDFS with the normalization coefficients (e.g. mean and stddev) for every pair of (hostgroup, plugin).
(This, for example, is not used in the "all_in_one" function above, but it is used in a dedicated Airflow task for preparing the normalization coefficients.)
- ...
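A minimal usage sketch of these two entry points, assuming a SparkSession is available and that the JSON configs have the structure shown in the configuration files of this commit (the config file names and the import path are assumptions):
```python
from pyspark.sql import SparkSession
# Import path is an assumption; the functions live in etl_pipeline.py.
from etl_pipeline import (run_pipeline_all_in_one,
                          pipeline_preparation_norm_coeff)

spark = SparkSession.builder.appName("etl_example").getOrCreate()

# 1. Prepare the normalization coefficients (mean/stddev per hostgroup, plugin).
pipeline_preparation_norm_coeff(spark, "config_normalization.json")

# 2. Produce the window datasets in HDFS, divided by day and hostgroup.
run_pipeline_all_in_one(spark, "config_windows.json")
```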
......@@ -27,6 +27,7 @@ from adcern.algo_steps import read_window_dataset
from pyspark.sql.utils import AnalysisException
import shutil
import subprocess
def run_pipeline(spark, config_filepath):
......@@ -432,9 +433,18 @@ def materialize_locally(spark, config_filepath,
.write.format("parquet")\
.mode("overwrite")\
.save(hdfs_outfolder)
copy_to_local(hdfs_path=hdfs_outfolder,
local_path=local_outfolder)
raw_folder = config_dict["hdfs_out_folder"] + project_code
print("Deleting the raw data saved in %s ..." % raw_folder)
try:
subprocess.call(["hdfs", "dfs", "-rm", "-r", raw_folder])
except Exception as e_delete:
print('Error while deleting raw_folder directory: ', e_delete)
def get_normalization_path(spark, config_filepath):
"""Given the config dictionary path get the normalization path."""
......
......@@ -93,8 +93,7 @@ def data_reading(spark, plugins, days, hostgroups):
"""
plugins = copy.deepcopy(plugins)
inpaths = \
create_paths_for_collectd(basepath="/project/monitoring/collectd",
plugins=plugins, days=days)
create_paths_for_collectd(plugins=plugins, days=days)
existing_inpaths = keep_only_existing_paths(spark=spark, paths=inpaths)
[print(p) for p in sorted(existing_inpaths)]
......@@ -609,16 +608,9 @@ def _keep_only_these_plugins_and_hg(df, plugins, hostgroups):
# filter all the plugins individually
for k in plugins.keys():
plugin_dict = plugins[k]
# for all attributes of this plugin
and_conditions = []
plugin_dict["plugin"] = plugin_dict.pop("plugin_name")
for sub_k in plugin_dict.keys():
value = plugin_dict[sub_k]
and_conditions.append(sub_k + " == '" + value + "'")
plg_condition = "( " + " and ".join(and_conditions) + " )"
# print("plg_condition: ", plg_condition)
or_conditions.append(plg_condition)
# print("or_conditions: ", or_conditions)
plg_filter = "( " + plugin_dict['plugin_filter'] + " )"
or_conditions.append(plg_filter)
# keep only the lines of interest
filter_str = " or ".join(or_conditions)
......
......@@ -172,19 +172,18 @@ def get_date_between(start_date: str, end_date: str):
return list_of_dates
def create_paths_for_collectd(basepath, plugins, days):
def create_paths_for_collectd(plugins, days):
"""Create the paths to access the plugin metrics."""
paths = []
for k in plugins.keys():
current_plg_dict = plugins[k]
print(current_plg_dict)
plg_folder = current_plg_dict["plugin_name"]
plg_folder = current_plg_dict["plugin_data_path"]
for (year, month, day) in days:
str_year = str(year)
str_month = str(month).zfill(2)
str_day = str(day).zfill(2)
new_path = basepath + \
"/" + plg_folder + \
new_path = plg_folder + \
"/" + str_year + "/" + str_month + "/" + str_day + "/"
paths.append(new_path)
return list(set(paths))
......
......@@ -19,32 +19,24 @@
"date_end_normalization_excluded": "2021-01-10",
"selected_plugins": {
"load_longterm": {
"value_instance": "longterm",
"plugin_name": "load"
"plugin_data_path": "/project/monitoring/collectd/load",
"plugin_filter": "value_instance == 'longterm' and plugin == 'load'"
},
"cpu__percent_idle": {
"plugin_instance": "",
"type": "percent",
"type_instance": "idle",
"plugin_name": "cpu"
"plugin_data_path": "/project/monitoring/collectd/cpu",
"plugin_filter": "type == 'percent' and type_instance == 'idle' and plugin == 'cpu'"
},
"memory__memory_free": {
"plugin_instance": "",
"type": "memory",
"type_instance": "free",
"plugin_name": "memory"
"plugin_data_path": "/project/monitoring/collectd/memory",
"plugin_filter": "type == 'memory' and type_instance == 'free' and plugin == 'memory'"
},
"vmem__vmpage_io_memory_in": {
"plugin_instance": "",
"type": "vmpage_io",
"type_instance": "memory",
"value_instance": "in",
"plugin_name": "vmem"
"plugin_data_path": "/project/monitoring/collectd/vmem",
"plugin_filter": "type == 'vmpage_io' and type_instance == 'memory' and value_instance == 'in' and plugin == 'vmem'"
},
"swap_swapfile_swap_free": {
"type": "swap",
"type_instance": "free",
"plugin_name": "swap"
"plugin_data_path": "/project/monitoring/collectd/swap",
"plugin_filter": "type == 'swap' and type_instance == 'free' and plugin == 'swap'"
}
}
}
\ No newline at end of file
......@@ -19,32 +19,24 @@
"date_end_normalization_excluded": "2021-01-10",
"selected_plugins": {
"load_longterm": {
"value_instance": "longterm",
"plugin_name": "load"
"plugin_data_path": "/project/monitoring/collectd/load",
"plugin_filter": "value_instance == 'longterm' and plugin == 'load'"
},
"cpu__percent_idle": {
"plugin_instance": "",
"type": "percent",
"type_instance": "idle",
"plugin_name": "cpu"
"plugin_data_path": "/project/monitoring/collectd/cpu",
"plugin_filter": "type == 'percent' and type_instance == 'idle' and plugin == 'cpu'"
},
"memory__memory_free": {
"plugin_instance": "",
"type": "memory",
"type_instance": "free",
"plugin_name": "memory"
"plugin_data_path": "/project/monitoring/collectd/memory",
"plugin_filter": "type == 'memory' and type_instance == 'free' and plugin == 'memory'"
},
"vmem__vmpage_io_memory_in": {
"plugin_instance": "",
"type": "vmpage_io",
"type_instance": "memory",
"value_instance": "in",
"plugin_name": "vmem"
"plugin_data_path": "/project/monitoring/collectd/vmem",
"plugin_filter": "type == 'vmpage_io' and type_instance == 'memory' and value_instance == 'in' and plugin == 'vmem'"
},
"swap_swapfile_swap_free": {
"type": "swap",
"type_instance": "free",
"plugin_name": "swap"
"plugin_data_path": "/project/monitoring/collectd/swap",
"plugin_filter": "type == 'swap' and type_instance == 'free' and plugin == 'swap'"
}
}
}
\ No newline at end of file
}