Commit e02877d8 authored by Matteo Paltenghi

plotoutlier color theme + load dataset from config

parent f0b23ea7
......@@ -7,6 +7,9 @@ grafana_token.txt
#*#
*.py[cod]
# LOGS for TENSORBOARD
logs
# C extensions
*.so
......
......@@ -2,6 +2,7 @@ import os
import hashlib
from datetime import datetime
from datetime import timedelta
import json
import numpy as np
import pandas as pd
......@@ -16,8 +17,7 @@ from pyspark.sql.utils import AnalysisException
class CollectdDataset(object):
"""Dataset containig metrics data of CERN Data Centre machines."""
def __init__(self, spark, cell,
):
def __init__(self, spark=None, cell=None):
"""Initialize with a Spark context to access the cluster.
spark: spark context
......@@ -29,6 +29,43 @@ class CollectdDataset(object):
self.files_per_single_write = 1
self.entire_pandas_df = None
def from_config(self, spark=None, json_config_file='resources.json',
position_cell_to_read=0,
verbose=False):
"""Initialize you dataset form a json config file.
Params:
spark: (Spark Context) Deafult is None
position_cell_to_read: (int) which cellto consider
json_config_file: (str) path to the file with config
The field of the JSON should be the following:
"output_folder": (str) path in local folder
"date_start": (str) YYYY-MM-DD "2020-05-10" start date of the analysis
"date_end": (str) YYYY-MM-DD "2020-05-15" end date of the analysis
"minutes": (int) granularity of the dataset
"chunks": (int) nr of chnunks for the topandas method on Spark
"cell_names": (list of str) only the element specified by
position_cell_to_read will be considered
"selected_plugins": (nested json) it represent the plugin dictionary
as explained in the add_plugin function
"""
with open(json_config_file) as json_file:
data = json.load(json_file)
if verbose:
print(json.dumps(data, indent=4, sort_keys=True))
self.cell = data["cell_names"][position_cell_to_read]
self.local_folder = data["local_folder"]
self.hdfs_folder = data["hdfs_folder"]
self.start = data["date_start"]
self.end = data["date_end"]
self.n_minutes = data["minutes"]
self.num_chunks_pandas = data["chunks"]
self.plugins = data["selected_plugins"]
self.override_on_hdfs = data["override_on_hdfs"]
self.acquire_data()
def add_plugins(self, plugins, override=False):
"""Add info about the plugin you want in the dataset.
......@@ -48,6 +85,31 @@ class CollectdDataset(object):
else:
self.plugins.update(plugins)
def set_download_options(self, hdfs_folder: str, local_folder: str,
override_on_hdfs=False, num_chunks_pandas=5):
"""Set options for download.
hdfs_folder: the name of the folder on HDFS in your private space. The
file scraped from the main db will be stored in this folder of your
private space.
local_folder: the name of the folder in your local environment. Here we
will save the hdf5 file representing the aggregate you are
analyzing.
override_on_hdfs: if False (default behaviour) the method will reuse
previously computed aggregates for the selected plugins, datasources
and downsampling granularity. This is recommended to avoid useless
recomputation: if the data haven't changed, there is no need to
recompute. If True, override the data already collected.
num_chunks_pandas: increase this if you hit an out-of-memory error.
It is the number of chunks into which the Spark-to-pandas conversion
is partitioned, to avoid out-of-memory errors (due to the limited
memory of the driver).
"""
self.hdfs_folder = hdfs_folder
self.local_folder = local_folder
self.num_chunks_pandas = num_chunks_pandas
self.override_on_hdfs = override_on_hdfs
def set_file_per_single_write(self, nr_files):
"""Change the nr of files that are saved for each day."""
self.files_per_single_write = nr_files
......@@ -104,41 +166,24 @@ class CollectdDataset(object):
"""
return self.metadata
def acquire_data(self, hdfs_folder: str, local_folder: str,
override_on_hdfs=False, num_chunks_pandas=5):
def acquire_data(self):
"""Get the data from HDFS (downsample query if needed).
hdfs_folder: the name of the folder on HDFS in your private space. The
file scraped from the main db will be stored in this folder of your
private space.
local_folder: the name of the folder in your local environment. Here we
will save the hdf5 file representing the aggregate you are
analyzing.
override_on_hdfs: if False (default behaviour) the method will use
previously computed aggregates for the selected plugin, datasource
and granularity of downsampling. This is recommended to avoid
useful computation, if data aren't changed, there is no need to
recompute. If True override the data already collected.
num_chunks_pandas: increase this if obtaining out of memory error.
it represents the number of chunks to partition the conversion
from spark to pands to avoid out of memory error (due to l
imitation in the memory of the driver).
It uses the settings decided in the set_download_options function.
"""
print("-----------------------------------------------------")
print("------------ PROCEDURE TO GET DATA ------------------")
self.hdfs_folder = hdfs_folder
self.local_folder = local_folder
local_file_name = self.cell + "_" + str(self.n_minutes) + "min_" + \
"_" + self.start + "_" + self.end + ".hdf5"
# scraping HDFS
if not os.path.isfile(self.local_folder + "/" + local_file_name) or \
override_on_hdfs:
self.override_on_hdfs:
print("Local hdf5 File not found: long mining procedure started."
"This will take a while.")
# scraping HDFS
self._create_in_HDFS(override_on_hdfs)
self._create_in_HDFS(self.override_on_hdfs)
self._transfer_locally(hdf5_filename=local_file_name,
num_chunks=num_chunks_pandas)
num_chunks=self.num_chunks_pandas)
self.from_local_file(local_file_name)
print("Dataset stored here "
"(save the name for fast reuse): %s" % local_file_name)
......
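For reference, a minimal usage sketch of the new from_config entry point, assuming a live SparkSession named spark and that the class is importable as dataset.CollectdDataset (as in the scripts further down); the exact import path is an assumption:

import dataset  # hypothetical import; the scripts below refer to dataset.CollectdDataset

clds = dataset.CollectdDataset(spark=spark)
clds.from_config(json_config_file="resources.json",
                 position_cell_to_read=0,
                 verbose=True)
# from_config() ends by calling acquire_data(), so after this call the
# local hdf5 snapshot is already available (or reused from a previous run).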
......@@ -311,6 +311,7 @@ class Explorer(object):
window_start=None,
window_end=None,
nr_machines_in_legend=4,
color_outlier="red",
short=1, **pltargs):
"""Plot the selected machines, color indicates anomaly score.
......@@ -323,13 +324,33 @@ class Explorer(object):
scores: the anomaly scores (HIGHER is MORE CRITICAL/RED). They must
be in the same order as the machines.
short: a multiplier from 0 to 1 to shrink the height of your plots.
color_outlier: color theme for the outlier timeseries (default "red").
Supported values: "red", "green", "blue", "orange", "purple".
**pltargs: arguments that will be passed to the pandas plot function
of the DataFrame object
return: list of machines ordered from most to least critical and the
list of their scores.
"""
dict_cmap = {}
dict_cmap["red"] = matplotlib.cm.Reds
dict_cmap["blue"] = matplotlib.cm.Blues
dict_cmap["green"] = matplotlib.cm.Greens
dict_cmap["orange"] = matplotlib.cm.Oranges
dict_cmap["purple"] = matplotlib.cm.Purples
on_machines = list(on_machines)
# Outlier preprocessing
# reorder the anomalies:
# plot normal data first (it goes to the background)
# plot outliers at the end (on top of the others)
ordered_index = np.argsort(scores)
ordered_score = [scores[x] for x in ordered_index]
ordered_machines = [on_machines[x] for x in ordered_index]
print("Candidates outliers:")
print(ordered_machines[-nr_machines_in_legend:])
# print(on_machines)
if on_metrics is None:
on_metrics = list(self.snapshot.plugin.unique())
......@@ -348,17 +369,16 @@ class Explorer(object):
norm = matplotlib.colors.Normalize(vmin=min(scores),
vmax=max(scores))
cmap = matplotlib.cm.ScalarMappable(norm=norm,
cmap=matplotlib.cm.Reds)
cmap=dict_cmap[color_outlier])
cmap.set_array([])
height = 10 * short
fig, ax = plt.subplots(dpi=100,
figsize=(self.img_width, height))
# reorder the anomalies:
# plot first normal data (go to background)
# plot outlier at the end (on top of the others)
ordered_index = np.argsort(scores)
ordered_score = [scores[i] for i in ordered_index]
ordered_machines = [on_machines[i] for i in ordered_index]
fig, axes = plt.subplots(ncols=1, nrows=2, dpi=100,
figsize=(self.img_width, height))
# plot score distribution
sns.distplot(ordered_score, ax=axes[0],
color=color_outlier,
kde=False)
i = 0
for score, host in zip(ordered_score, ordered_machines):
# plot current timeseries
......@@ -366,36 +386,38 @@ class Explorer(object):
# print legend only for a subset
if (len(ordered_machines) - i) <= nr_machines_in_legend:
lebel_plus_score = ("(%.2f) - %s" % (score, host))
ax = df_ts.plot(c=cmap.to_rgba(score), ax=ax,
label=lebel_plus_score,
**pltargs)
axes[1] = df_ts.plot(c=cmap.to_rgba(score),
ax=axes[1],
label=lebel_plus_score,
**pltargs)
else:
ax = df_ts.plot(c=cmap.to_rgba(score), ax=ax,
label='_nolegend_',
**pltargs)
axes[1] = df_ts.plot(c=cmap.to_rgba(score),
ax=axes[1],
label='_nolegend_',
**pltargs)
i += 1
print("Candidates outliers:")
print(ordered_machines[-nr_machines_in_legend:])
ax.set_title(plugin_name, fontsize=20)
axes[1].set_title(plugin_name, fontsize=20)
if window_start is not None:
ax.axvline(pd.to_datetime(window_start), color='red',
linestyle='--')
axes[1].axvline(pd.to_datetime(window_start), color='red',
linestyle='--')
if window_end is not None:
ax.axvline(pd.to_datetime(window_end), color='blue',
linestyle='--')
axes[1].axvline(pd.to_datetime(window_end), color='blue',
linestyle='--')
if window_end is not None and window_start is not None:
ax.axvspan(pd.to_datetime(window_start),
pd.to_datetime(window_end),
alpha=0.5, color='yellow')
axes[1].axvspan(pd.to_datetime(window_start),
pd.to_datetime(window_end),
alpha=0.5, color='yellow')
fig.colorbar(cmap)
# move the legend outside
# and reverse the order (most critical first)
handles, labels = ax.get_legend_handles_labels()
handles, labels = axes[1].get_legend_handles_labels()
plt.legend(handles[::-1], labels[::-1],
title='(Score) - Most anomalous machines',
bbox_to_anchor=(-0.03, 1), loc="upper right")
plt.show()
return ax
plt.show()
return axes
def plot_clusters(self,
on_machines,
......@@ -422,6 +444,13 @@ class Explorer(object):
**pltargs: arguments that will be passed to the pandas plot function
of the DataFrame object
"""
# Cluster preprocessing
# count how many samples fall in each cluster
counter = collections.Counter(labels)
labels_freq_big_first = counter.most_common()
# list of tuples like -> ("label", frequency)
print("Cluster partitioning: %s" % labels_freq_big_first)
on_machines = list(on_machines)
# print(on_machines)
if on_metrics is None:
......@@ -435,14 +464,11 @@ class Explorer(object):
if on_machines is not None:
pdf_swarm = pdf_swarm[[h for h in pdf_swarm.columns
if h in on_machines]]
# count how many sample in each cluster
counter = collections.Counter(labels)
labels_freq_big_first = counter.most_common()
# list of tuples like -> ("label", frequency)
height = 10 * short
fig, ax = plt.subplots(dpi=100,
figsize=(self.img_width, height))
ax.set_title(plugin_name, fontsize=20)
NUM_COLORS = len(labels_freq_big_first)
if smaller_most_critical:
......@@ -453,7 +479,6 @@ class Explorer(object):
i = 0
# get the list of clusters from the biggest to the smallest
for (label, freq) in labels_freq_big_first:
print("Cluster %s with %i elements" % (label, freq))
machines_in_cluster = [on_machines[x]
for x in range(len(labels))
if labels[x] == label]
......
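The color handling in plot_machines boils down to picking a matplotlib colormap family by name and drawing the machines in increasing score order, so the most anomalous lines end up on top. A self-contained sketch of that pattern, with made-up scores and machine names:

import matplotlib.cm
import matplotlib.colors
import numpy as np

# map the color_outlier argument to a matplotlib colormap family
dict_cmap = {"red": matplotlib.cm.Reds,
             "blue": matplotlib.cm.Blues,
             "green": matplotlib.cm.Greens,
             "orange": matplotlib.cm.Oranges,
             "purple": matplotlib.cm.Purples}

scores = [0.1, 0.9, 0.4]                 # example anomaly scores
machines = ["vm-a", "vm-b", "vm-c"]      # same order as the scores

norm = matplotlib.colors.Normalize(vmin=min(scores), vmax=max(scores))
cmap = matplotlib.cm.ScalarMappable(norm=norm, cmap=dict_cmap["red"])
cmap.set_array([])

# sort so the most anomalous machine is processed (and plotted) last
for idx in np.argsort(scores):
    print(machines[idx], cmap.to_rgba(scores[idx]))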
......@@ -14,9 +14,12 @@ def myprint(astring):
def run_scarping(data):
cell_names = data["cell_names"]
output_folder = data["output_folder"]
stat_date = data["date_start"]
local_folder = data["local_folder"]
hdfs_folder = data["hdfs_folder"]
start_date = data["date_start"]
override_on_hdfs = data["override_on_hdfs"]
end_date = data["date_end"]
minutes = data["minutes"]
chunks = data["chunks"]
......@@ -29,11 +32,13 @@ def run_scarping(data):
for c in cell_names:
myprint("Mining Cell: %s" % c)
clds = dataset.CollectdDataset(spark, c)
clds.set_range(stat_date, end_date, minutes)
clds.set_range(start_date, end_date, minutes)
clds.add_plugins(selected_plugins)
clds.acquire_data(local_folder=output_folder,
hdfs_folder="hdfslake",
num_chunks_pandas=chunks)
clds.set_download_options(local_folder=local_folder,
hdfs_folder=hdfs_folder,
num_chunks_pandas=chunks,
override_on_hdfs=override_on_hdfs)
clds.acquire_data()
clds = None
......
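The scraping function above is presumably driven by the JSON file shown next; a minimal, hypothetical driver (the real __main__ block is not part of this diff) could look like this:

import json

# hypothetical entry point, assuming it lives in the same module as run_scarping
if __name__ == "__main__":
    with open("resources.json") as fp:
        data = json.load(fp)
    run_scarping(data)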
{
"output_folder": "/tmp/vm-datalake/",
"date_start": "2020-04-25",
"date_end": "2020-04-30",
"minutes": 60,
"chunks": 10,
"local_folder": "/tmp/vm-datalake/",
"hdfs_folder": "hdfslake",
"date_start": "2020-05-10",
"date_end": "2020-05-15",
"override_on_hdfs": false,
"minutes": 10,
"chunks": 40,
"cell_names": [
"gva_project_014"
],
"selected_plugins": {
"vmem__vmpage_io_memory_in": {"plugin_instance" : "",
"type" : "vmpage_io",
"type_instance" : "memory",
"swap_swapfile_percent_used": {
"plugin_instance" : "swapfile",
"type" : "percent",
"type_instance" : "used",
"plugin_name": "swap"
},
"df_root_percent_bytes_used": {
"plugin_instance" : "root",
"type" : "percent_bytes",
"type_instance" : "used",
"plugin_name": "df"
},
"df_var_percent_bytes_used": {
"plugin_instance" : "var",
"type" : "percent_bytes",
"type_instance" : "used",
"plugin_name": "df"
},
"vmem__vmpage_io_memory_in": {"plugin_instance" : "",
"type" : "vmpage_io",
"type_instance" : "memory",
"value_instance" : "in",
"plugin_name": "vmem",
"scale":"log"},
"vmem__vmpage_io_memory_out": {"plugin_instance" : "",
"type" : "vmpage_io",
"type_instance" : "memory",
"vmem__vmpage_io_memory_out": {"plugin_instance" : "",
"type" : "vmpage_io",
"type_instance" : "memory",
"value_instance" : "out",
"plugin_name": "vmem",
"scale":"log"},
"cpu__percent_user": {"plugin_instance" : "",
"type" : "percent",
"type_instance" : "user",
"cpu__percent_user": {"plugin_instance" : "",
"type" : "percent",
"type_instance" : "user",
"plugin_name": "cpu"},
"cpu__percent_idle": {"plugin_instance" : "",
"type" : "percent",
"type_instance" : "idle",
"cpu__percent_idle": {"plugin_instance" : "",
"type" : "percent",
"type_instance" : "idle",
"plugin_name": "cpu"},
"cpu__percent_system": {"plugin_instance" : "",
"type" : "percent",
"type_instance" : "system",
"cpu__percent_system": {"plugin_instance" : "",
"type" : "percent",
"type_instance" : "system",
"plugin_name": "cpu"},
"load_longterm": {"value_instance" : "longterm",
"load_longterm": {"value_instance" : "longterm",
"plugin_name": "load"},
"memory__memory_free": {"plugin_instance" : "",
"type" : "memory",
"type_instance" : "free",
"memory__memory_free": {"plugin_instance" : "",
"type" : "memory",
"type_instance" : "free",
"plugin_name": "memory"},
"interface__if_octets__tx": {"type" : "if_octets",
"type_instance" : "",
"value_instance" : "tx",
"interface__if_octets__tx": {"type" : "if_octets",
"type_instance" : "",
"value_instance" : "tx",
"plugin_name": "interface"},
"interface__if_octets__rx": {"type" : "if_octets",
"type_instance" : "",
"value_instance" : "rx",
"interface__if_octets__rx": {"type" : "if_octets",
"type_instance" : "",
"value_instance" : "rx",
"plugin_name": "interface"}
}
}
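Before launching a run, it can help to check that a resources.json carries every key that from_config reads; a small sketch (the key list is taken from the code above, not from an official schema):

import json

# keys read by CollectdDataset.from_config
REQUIRED_KEYS = ["cell_names", "local_folder", "hdfs_folder",
                 "date_start", "date_end", "override_on_hdfs",
                 "minutes", "chunks", "selected_plugins"]

with open("resources.json") as fp:
    config = json.load(fp)

missing = [key for key in REQUIRED_KEYS if key not in config]
if missing:
    raise KeyError("resources.json is missing keys: %s" % missing)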
......@@ -17,8 +17,10 @@ def myprint(astring):
def run_analysis(data, algo):
cell_names = data["cell_names"]
output_folder = data["output_folder"]
stat_date = data["date_start"]
local_folder = data["local_folder"]
hdfs_folder = data["hdfs_folder"]
start_date = data["date_start"]
override_on_hdfs = data["override_on_hdfs"]
end_date = data["date_end"]
minutes = data["minutes"]
chunks = data["chunks"]
......@@ -38,11 +40,13 @@ def run_analysis(data, algo):
for cell_name in cell_names:
myprint("Load Dataset Cell: %s" % cell_name)
clds = dataset.CollectdDataset(spark, cell_name)
clds.set_range(stat_date, end_date, minutes)
clds.set_range(start_date, end_date, minutes)
clds.add_plugins(selected_plugins)
clds.acquire_data(local_folder=output_folder,
hdfs_folder="hdfslake",
num_chunks_pandas=chunks)
clds.set_download_options(local_folder=local_folder,
hdfs_folder=hdfs_folder,
num_chunks_pandas=chunks,
override_on_hdfs=override_on_hdfs)
clds.acquire_data()
wd_test = dataprep.WindowDispatcher(clds)
X_test, y_test, prov_test = \
......