Commit 2ad982a6 authored by smetaj

Changes for notebook: folder_path + analysis print

parent d1e2cf93
@@ -281,9 +281,11 @@ class BaseOutlierAnalyser(ABC):
         if ts_second_window_start is None:
             ts_second_window_start = \
                 ts_second_window_end - (self.history_len * self.granularity_min * 60)  # noqa
+        counter = 0
         for (entity, score, percentile, rank) in \
                 zip(self.host_names, scores_selected, self.percentiles, self.ranks):  # noqa
-            if (rank <= top_k):
+            if (rank <= top_k) and (counter < top_k):
+                counter += 1
                 entity_without_cern_ch = entity.split(".")[0]
                 self.publish_anomaly(entity_without_cern_ch,
                                      score,
"""Genearl Utils for file, folder, config management."""
from datetime import datetime
from datetime import timedelta
import json
import pandas as pd
import re
from adcern.publisher import create_id_for_metrics_dict
from adcern.publisher import create_id_for_hostrgroups_and_plugins_start_end


def read_json_config(local_path):
    """Read the configuration file.

    Params
    ------
    local_path: str
        path for the JSON file with the following keys:
        - hdfs_out_folder: (str) where to save your files in hdfs
        - date_start: (str) day for the first date e.g. "2020-02-14"
        - date_end_excluded: (str) day for the last date (excluded)
        - overwrite_on_hdfs: (boolean) to decide whether to overwrite or not
          (default is false = no overwrite)
        - aggregate_every_n_minutes: (int) over how many minutes to
          aggregate the raw timeseries
        - history_steps: (int) nr of past lag timestamps in each window
        - future_steps: (int) nr of future timestamps in each window
        - hostgroups: (list of str) the hostgroups you are interested in.
          NB put them in square brackets
          e.g. ["cloud_compute/level2/batch/gva_project_014",
                "cloud_compute/level2/main/gva_shared_012"]
        - selected_plugins: (dict of dict)
          every key represents a signal, and that signal name will be used
          to identify that signal throughout. Attributes must be specified
          for every signal by using a dictionary with the following
          attributes:
          "plugin_instance", "type", "type_instance", "value_instance",
          "plugin_name", "scale"
          NB "scale" can be "log" to apply a logarithmic transformation TODO

    Return
    ------
    config_dict: dict
        containing all the above-mentioned keys
    """
    with open(local_path) as json_file:
        config_dict = json.load(json_file)
    return config_dict
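As a rough usage sketch (the file location and every value below are invented for illustration, not taken from the repository), a minimal configuration could be written and loaded like this:

# Illustrative only: values and the /tmp path are assumptions.
example_config = {
    "hdfs_out_folder": "/user/myuser/anomaly_windows",
    "date_start": "2020-02-14",
    "date_end_excluded": "2020-02-16",
    "overwrite_on_hdfs": False,
    "aggregate_every_n_minutes": 10,
    "history_steps": 48,
    "future_steps": 0,
    "hostgroups": ["cloud_compute/level2/batch/gva_project_014"],
    "selected_plugins": {
        "load_longterm": {"plugin_name": "load", "value_instance": "longterm"}
    },
    # assumed keys, used later by create_path_for_hostgroup
    "date_start_normalization": "2021-01-01",
    "date_end_normalization_excluded": "2021-01-10"
}
with open("/tmp/example_config.json", "w") as json_file:
    json.dump(example_config, json_file)
config_dict = read_json_config("/tmp/example_config.json")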


def create_path_for_hostgroup(config_dict, hostgroup):
    """Create all paths for the configuration dictionary.

    Params
    ------
    config_dict: dict with the following keys
        hdfs_out_folder (str), date_start (str), date_end_excluded (str),
        overwrite_on_hdfs (boolean), aggregate_every_n_minutes (int),
        history_steps (int), future_steps (int),
        hostgroups (list of str), selected_plugins (dict of dict)
    hostgroup: str
        hostgroup for which you want to construct the paths
    """
    base_path = config_dict["hdfs_out_folder"]
    # frequency
    freq_path = \
        "freq=" + str(config_dict["aggregate_every_n_minutes"]) + "min"
    # hostgroups
    escaped_hostgroup = convert_to_slash(hostgroup)
    hostgroup_path = \
        "hostgroup=" + escaped_hostgroup
    # plugins
    plugins = config_dict["selected_plugins"]
    id_plugins = create_id_for_metrics_dict(plugins)
    plugins_path = \
        "plg=" + id_plugins
    # normalization
    hostgroups = config_dict["hostgroups"]
    normalization_id = \
        create_id_for_hostrgroups_and_plugins_start_end(
            hostgroups=hostgroups,
            plugins=plugins,
            start=config_dict["date_start_normalization"],
            end=config_dict["date_end_normalization_excluded"])
    normalization_path = \
        "norm=" + normalization_id
    # history
    history_path = \
        "h=" + str(config_dict["history_steps"])
    # future
    future_path = \
        "f=" + str(config_dict["future_steps"])
    # create parts around the day
    first_part = \
        "/".join([base_path,
                  freq_path,
                  hostgroup_path])
    last_part = \
        "/".join([plugins_path,
                  normalization_path,
                  history_path,
                  future_path])
    # days
    days = get_date_between(config_dict["date_start"],
                            config_dict["date_end_excluded"])
    paths = []
    for (year, month, day) in days:
        str_year = "year=" + str(year)
        str_month = "month=" + str(month)
        # str_day = "day=" + str(day)
        new_path = \
            first_part + \
            "/" + str_year + "/" + str_month + "/" + \
            last_part
        # str_day + "/" +
        paths.append(new_path)
    print("Reading the following paths:")
    [print(x) for x in paths]
    return paths
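A minimal sketch of what this helper produces, assuming the hypothetical config_dict built in the previous example (the exact plg= and norm= identifiers come from adcern.publisher and are abbreviated here):

paths = create_path_for_hostgroup(config_dict, config_dict["hostgroups"][0])
# Each path follows a key=value layout, roughly:
# /user/myuser/anomaly_windows/freq=10min/
#     hostgroup=cloud_compute%2Flevel2%2Fbatch%2Fgva_project_014/
#     year=2020/month=2/plg=<id>/norm=<id>/h=48/f=0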


def delete_files(spark, config_dict, hostgroup):
    """Delete all saved files related to this hostgroup and interval.

    All the info is contained in the config dictionary.

    Params
    ------
    spark: spark context
    config_dict: dictionary with the following keys:
        hdfs_out_folder (str), date_start (str), date_end_excluded (str),
        overwrite_on_hdfs (boolean), aggregate_every_n_minutes (int),
        history_steps (int), future_steps (int),
        hostgroups (list of str), selected_plugins (dict of dict)
    hostgroup: str
        hostgroup that you want to delete; it must also be in the config
        file, as an element of the "hostgroups" list
    """
    paths_to_delete = create_path_for_hostgroup(config_dict, hostgroup)
    sc = spark.sparkContext
    hadoop = sc._jvm.org.apache.hadoop
    fs = hadoop.fs.FileSystem
    conf = hadoop.conf.Configuration()
    hdfs_conf = fs.get(conf)
    for path_str in paths_to_delete:
        path = hadoop.fs.Path(path_str)
        if hdfs_conf.exists(path):
            hdfs_conf.delete(path)
            print("Deleted: ", path_str)
        else:
            print("Not existing: ", path_str)
    print("Success")


def get_date_between(start_date: str, end_date: str):
    """Get the dates in the given range as tuples of int (year, month, day).

    NB: the end_date is excluded
    """
    list_of_dates = []
    sdate = datetime.strptime(start_date, "%Y-%m-%d")  # start date
    edate = datetime.strptime(end_date, "%Y-%m-%d")  # end date
    delta = edate - sdate  # as timedelta
    for i in range(delta.days):
        day = sdate + timedelta(days=i)
        day_tuple = (day.year, day.month, day.day)
        list_of_dates.append(day_tuple)
    print("Start: %s - End %s" % (list_of_dates[0], list_of_dates[-1]))
    return list_of_dates
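For example (the dates are arbitrary):

days = get_date_between("2021-01-08", "2021-01-11")
# -> [(2021, 1, 8), (2021, 1, 9), (2021, 1, 10)]  (end date excluded)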


def create_paths_for_collectd(plugins, days):
    """Create the paths to access the plugin metrics."""
    paths = []
    for k in plugins.keys():
        current_plg_dict = plugins[k]
        print(current_plg_dict)
        plg_folder = current_plg_dict["plugin_data_path"]
        for (year, month, day) in days:
            str_year = str(year)
            str_month = str(month).zfill(2)
            str_day = str(day).zfill(2)
            new_path = plg_folder + \
                "/" + str_year + "/" + str_month + "/" + str_day + "/"
            paths.append(new_path)
    return list(set(paths))
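Sketch of the expected output, reusing the collectd load folder from the sample configs further down in this commit (the plugin dict here only carries the one key this function reads):

plugins = {"load_longterm": {"plugin_data_path": "/project/monitoring/collectd/load"}}
days = get_date_between("2021-01-08", "2021-01-10")
print(create_paths_for_collectd(plugins, days))
# e.g. ['/project/monitoring/collectd/load/2021/01/08/',
#       '/project/monitoring/collectd/load/2021/01/09/']
# NB the order may vary because duplicates are removed with set()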


def keep_only_existing_paths(spark, paths):
    """Filter the paths and discard the non-existent ones.

    The non-existent paths are printed.

    Return
    ------
    existing_paths: list of str
        paths that actually exist
    """
    sc = spark.sparkContext
    hadoop = sc._jvm.org.apache.hadoop
    # https://stackoverflow.com/questions/52166841/spark-reading-data-frame-from-list-of-paths-with-empty-path
    # https://hadoop.apache.org/docs/r2.8.2/api/org/apache/hadoop/fs/FileSystem.html
    fs = hadoop.fs.FileSystem
    conf = hadoop.conf.Configuration()
    existing_paths = []
    non_existing = []
    for path_str in paths:
        path = hadoop.fs.Path(path_str)
        try:
            fs.get(conf).listStatus(path)
            existing_paths.append(path_str)
        except Exception:
            print("not found: ", path_str)
            # print(e)
            non_existing.append(path_str)
    if len(existing_paths) == 0:
        raise Exception("This configuration file references only data "
                        "that do not exist in HDFS")
    return existing_paths
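Typical use is to prune the candidate day folders before handing them to Spark; a sketch reusing the spark session and the plugins/days from the earlier examples:

candidate_paths = create_paths_for_collectd(plugins, days)
existing_paths = keep_only_existing_paths(spark, candidate_paths)
# only the folders that are actually on HDFS are passed on for reading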


def convert_to_slash(path):
    """Replace the slashes in the same way Spark does."""
    return path.replace("/", "%2F")
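For example:

convert_to_slash("cloud_compute/level2/main/gva_shared_012")
# -> "cloud_compute%2Flevel2%2Fmain%2Fgva_shared_012"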


def read_local_window_dataset(config_filepath=None,
                              path=None, nr_timeseries=None,
                              list_metrics_to_keep=None):
    """Read the window dataset from the local cache folder.

    Params
    ------
    config_filepath: path to json with the following keys:
        local_cache_folder (str), code_project_name (str),
        selected_plugins (dict of dict)
    path: str
        direct path of the parquet file
    nr_timeseries: int
        nr of plugins you expect to find (needed if path is passed)
    list_metrics_to_keep: list of str
        the names of the metrics you want to read. Use the same names as in
        the configuration file; those names will be used to filter columns.

    Return
    ------
    feature_dataset: Pandas DataFrame
        each row is a vector containing the values of the
        selected metrics for one host at a given timestamp:
        load_h0 | load_h1 | load_h2 | cpu_h0 | cpu_h1 | cpu_h2 | etc
        load_h<t> means the signal load t steps in the past.
        Refer to the history_steps (int) key in the dict.
    provenance_dataset: Pandas DataFrame
        hostname | timestamp
        to know where each row comes from in terms of host and
        timestamp (the current timestamp is part of the history)
    nr_timeseries: int
        number of plugins present
    """
    if config_filepath is None \
            and path is not None \
            and nr_timeseries is not None:
        local_path = path
        nr_timeseries = nr_timeseries
    else:
        config_dict = read_json_config(config_filepath)
        local_path = \
            config_dict["local_cache_folder"] + config_dict["code_project_name"]  # noqa
        nr_timeseries = len(config_dict["selected_plugins"])
    print("Opening in Pandas -> parquet file: ", local_path)
    pdf = pd.read_parquet(local_path)
    pdf.drop_duplicates(subset=["timestamp", "hostname"], inplace=True)
    regex_history = re.compile("_h[0-9]*$")
    h_columns = list(filter(regex_history.search, pdf.columns))
    # keep only history columns
    feature_dataset = pdf[h_columns]
    if list_metrics_to_keep is not None:
        columns_to_keep = []
        for metric in list_metrics_to_keep:
            regex_this_metric = re.compile(metric + "_h[0-9]*$")
            columns_this_metric = \
                list(filter(regex_this_metric.search, pdf.columns))
            columns_to_keep += columns_this_metric
        feature_dataset = feature_dataset[columns_to_keep]
    provenance_dataset = pdf[["hostname", "ts", "timestamp"]]
    return feature_dataset, provenance_dataset, nr_timeseries
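A hedged usage sketch via the direct-path route (the parquet location is hypothetical; the metric names match the sample configs in this commit):

features, provenance, n_signals = read_local_window_dataset(
    path="/tmp/local_cache/my_project",  # assumed parquet location
    nr_timeseries=5,
    list_metrics_to_keep=["load_longterm", "cpu__percent_idle"])
print(features.shape)                # (n_windows, n_kept_metrics * history length)
print(provenance.columns.tolist())   # ['hostname', 'ts', 'timestamp']
print(n_signals)                     # the nr_timeseries passed in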
@@ -19,32 +19,47 @@
     "date_end_normalization_excluded": "2021-01-10",
     "selected_plugins": {
         "load_longterm": {
-            "value_instance": "longterm",
-            "plugin_name": "load"
+            "plugin_data_path": "/project/monitoring/collectd/load",
+            "plugin_filters": {
+                "value_instance": "longterm",
+                "plugin_name": "load"
+            }
         },
         "cpu__percent_idle": {
-            "plugin_instance": "",
-            "type": "percent",
-            "type_instance": "idle",
-            "plugin_name": "cpu"
+            "plugin_data_path": "/project/monitoring/collectd/cpu",
+            "plugin_filters": {
+                "plugin_instance": "",
+                "type": "percent",
+                "type_instance": "idle",
+                "plugin_name": "cpu"
+            }
         },
         "memory__memory_free": {
-            "plugin_instance": "",
-            "type": "memory",
-            "type_instance": "free",
-            "plugin_name": "memory"
+            "plugin_data_path": "/project/monitoring/collectd/memory",
+            "plugin_filters": {
+                "plugin_instance": "",
+                "type": "memory",
+                "type_instance": "free",
+                "plugin_name": "memory"
+            }
         },
         "vmem__vmpage_io_memory_in": {
-            "plugin_instance": "",
-            "type": "vmpage_io",
-            "type_instance": "memory",
-            "value_instance": "in",
-            "plugin_name": "vmem"
+            "plugin_data_path": "/project/monitoring/collectd/vmem",
+            "plugin_filters": {
+                "plugin_instance": "",
+                "type": "vmpage_io",
+                "type_instance": "memory",
+                "value_instance": "in",
+                "plugin_name": "vmem"
+            }
         },
         "swap_swapfile_swap_free": {
-            "type": "swap",
-            "type_instance": "free",
-            "plugin_name": "swap"
+            "plugin_data_path": "/project/monitoring/collectd/swap",
+            "plugin_filters": {
+                "type": "swap",
+                "type_instance": "free",
+                "plugin_name": "swap"
+            }
         }
     }
 }
\ No newline at end of file
@@ -19,32 +19,47 @@
     "date_end_normalization_excluded": "2021-01-10",
     "selected_plugins": {
         "load_longterm": {
-            "value_instance": "longterm",
-            "plugin_name": "load"
+            "plugin_data_path": "/project/monitoring/collectd/load",
+            "plugin_filters": {
+                "value_instance": "longterm",
+                "plugin_name": "load"
+            }
         },
         "cpu__percent_idle": {
-            "plugin_instance": "",
-            "type": "percent",
-            "type_instance": "idle",
-            "plugin_name": "cpu"
+            "plugin_data_path": "/project/monitoring/collectd/cpu",
+            "plugin_filters": {
+                "plugin_instance": "",
+                "type": "percent",
+                "type_instance": "idle",
+                "plugin_name": "cpu"
+            }
         },
         "memory__memory_free": {
-            "plugin_instance": "",
-            "type": "memory",
-            "type_instance": "free",
-            "plugin_name": "memory"
+            "plugin_data_path": "/project/monitoring/collectd/memory",
+            "plugin_filters": {
+                "plugin_instance": "",
+                "type": "memory",
+                "type_instance": "free",
+                "plugin_name": "memory"
+            }
         },
         "vmem__vmpage_io_memory_in": {
-            "plugin_instance": "",
-            "type": "vmpage_io",
-            "type_instance": "memory",
-            "value_instance": "in",
-            "plugin_name": "vmem"
+            "plugin_data_path": "/project/monitoring/collectd/vmem",
+            "plugin_filters": {
+                "plugin_instance": "",
+                "type": "vmpage_io",
+                "type_instance": "memory",
+                "value_instance": "in",
+                "plugin_name": "vmem"
+            }
         },
         "swap_swapfile_swap_free": {
-            "type": "swap",
-            "type_instance": "free",
-            "plugin_name": "swap"
+            "plugin_data_path": "/project/monitoring/collectd/swap",
+            "plugin_filters": {
+                "type": "swap",
+                "type_instance": "free",
+                "plugin_name": "swap"
+            }
         }
     }
 }
\ No newline at end of file