cloud-infrastructure / data-analytics

Commit 21dd2b3b, authored Apr 28, 2021 by smetaj
wrong files move adjusted
parent 2ad982a6

Changes: 4 files
etl/etl_steps.py
deleted 100644 → 0  (diff collapsed, not shown)
etl/spark_etl/etl_steps.py

...
@@ -93,8 +93,7 @@ def data_reading(spark, plugins, days, hostgroups):
     """
     plugins = copy.deepcopy(plugins)
     inpaths = \
-        create_paths_for_collectd(basepath="/project/monitoring/collectd",
-                                  plugins=plugins,
-                                  days=days)
+        create_paths_for_collectd(plugins=plugins,
+                                  days=days)
     existing_inpaths = keep_only_existing_paths(spark=spark, paths=inpaths)
     [print(p) for p in sorted(existing_inpaths)]
...
@@ -611,9 +610,10 @@ def _keep_only_these_plugins_and_hg(df, plugins, hostgroups):
         plugin_dict = plugins[k]
         # for all attributes of this plugin
         and_conditions = []
-        plugin_dict["plugin"] = plugin_dict.pop("plugin_name")
-        for sub_k in plugin_dict.keys():
-            value = plugin_dict[sub_k]
+        filters = plugin_dict['plugin_filters']
+        filters["plugin"] = filters.pop("plugin_name")
+        for sub_k in filters.keys():
+            value = filters[sub_k]
             and_conditions.append(sub_k + " == '" + value + "'")
         plg_condition = "( " + " and ".join(and_conditions) + " )"
         # print("plg_condition: ", plg_condition)
...
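For orientation, a small worked example of the condition string the new code builds, assuming a hypothetical plugin_filters entry (names and values invented for illustration; key order follows dict insertion order):

    filters = {"plugin_name": "load", "type": "load", "type_instance": "shortterm"}
    filters["plugin"] = filters.pop("plugin_name")
    # iterating filters.keys() then yields:
    # plg_condition == "( type == 'load' and type_instance == 'shortterm' and plugin == 'load' )"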
etl/spark_etl/utils.py

...
@@ -172,19 +172,18 @@ def get_date_between(start_date: str, end_date: str):
     return list_of_dates
 
 
-def create_paths_for_collectd(basepath, plugins, days):
+def create_paths_for_collectd(plugins, days):
     """Create the paths to access the plugin metrics."""
     paths = []
     for k in plugins.keys():
         current_plg_dict = plugins[k]
         print(current_plg_dict)
-        plg_folder = current_plg_dict["plugin_name"]
+        plg_folder = current_plg_dict["plugin_data_path"]
         for (year, month, day) in days:
             str_year = str(year)
             str_month = str(month).zfill(2)
             str_day = str(day).zfill(2)
-            new_path = basepath + \
-                "/" + plg_folder + \
+            new_path = plg_folder + \
                 "/" + str_year + "/" + str_month + "/" + str_day + "/"
             paths.append(new_path)
     return list(set(paths))
...
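A quick sketch of the new signature: since the basepath argument is gone, the per-plugin "plugin_data_path" entry now has to carry the full base path itself. Example values are hypothetical:

    plugins = {"load": {"plugin_data_path": "/project/monitoring/collectd/load"}}
    create_paths_for_collectd(plugins=plugins, days=[(2021, 4, 28)])
    # -> ['/project/monitoring/collectd/load/2021/04/28/']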
etl/utils.py
deleted 100644 → 0
"""Genearl Utils for file, folder, config management."""
from
datetime
import
datetime
from
datetime
import
timedelta
import
json
import
pandas
as
pd
import
re
from
adcern.publisher
import
create_id_for_metrics_dict
from
adcern.publisher
import
create_id_for_hostrgroups_and_plugins_start_end
def
read_json_config
(
local_path
):
"""Read the configuration file.
Params
------
local_path: str
path for the JSON file with the following keys:
- hdfs_out_folder: (str) where to save your files in hdfs
- date_start: (str) day for the first date e.g. "2020-02-14"
- date_end_excluded: (str) day for the last date (excluded)
- overwrite_on_hdfs: (boolean) to decide if overwrite or not (default
is false = no overwrite)
- aggregate_every_n_minutes: (int) how many min do you want to
aggregate the raw timeseries
- history_steps: (int) nr of past lag timestamp in each window
- future_steps: (int) nr of future timestamp in each window
- hostgroups: (list of str) the hostgroups you are interested in.
NB put them in square brachets
e.g. ["cloud_compute/level2/batch/gva_project_014",
"cloud_compute/level2/main/gva_shared_012"]
- selected_plugins: (dict of dict)
every key represents a signal, and that signal name will be used
to identify that signal all the time. Attributes must be specificed
for every signal by using a dictionary with the following
attributes:
"plugin_instance", "type", "type_instance", "value_instance",
"plugin_name", "scale"
NB "scale" can be "log" to have a logarithmic transformation TODO
Return
------
config_dict: dict
containing all the above mentioned keys
hostgroups: list of str
plugins: list of plugins
"""
with
open
(
local_path
)
as
json_file
:
config_dict
=
json
.
load
(
json_file
)
return
config_dict
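

# Illustrative example of a config file with the keys documented above
# (all values are hypothetical, for orientation only; the plugin attribute
# layout may differ in newer configs that use plugin_filters/plugin_data_path):
#
# {
#   "hdfs_out_folder": "/user/someuser/etl_out",
#   "date_start": "2020-02-14",
#   "date_end_excluded": "2020-02-17",
#   "overwrite_on_hdfs": false,
#   "aggregate_every_n_minutes": 10,
#   "history_steps": 3,
#   "future_steps": 1,
#   "hostgroups": ["cloud_compute/level2/main/gva_shared_012"],
#   "selected_plugins": {
#     "load": {"plugin_name": "load", "plugin_instance": "",
#              "type": "load", "type_instance": "shortterm",
#              "value_instance": "value", "scale": "log"}
#   }
# }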


def create_path_for_hostgroup(config_dict, hostgroup):
    """Create all paths for the configuration dictionary.

    Params
    ------
    config_dict: dict with the following keys
        hdfs_out_folder (str), date_start (str), date_end_excluded (str),
        overwrite_on_hdfs (boolean), aggregate_every_n_minutes (int),
        history_steps (int), future_steps (int),
        hostgroups (list of str), selected_plugins (dict of dict)
    hostgroup: str
        hostgroup that you want to construct
    """
    base_path = config_dict["hdfs_out_folder"]
    # frequency
    freq_path = \
        "freq=" + str(config_dict["aggregate_every_n_minutes"]) + "min"
    # hostgroups
    escaped_hostgroup = convert_to_slash(hostgroup)
    hostgroup_path = \
        "hostgroup=" + escaped_hostgroup
    # plugins
    plugins = config_dict["selected_plugins"]
    id_plugins = create_id_for_metrics_dict(plugins)
    plugins_path = \
        "plg=" + id_plugins
    # normalization
    hostgroups = config_dict["hostgroups"]
    normalization_id = \
        create_id_for_hostrgroups_and_plugins_start_end(
            hostgroups=hostgroups,
            plugins=plugins,
            start=config_dict["date_start_normalization"],
            end=config_dict["date_end_normalization_excluded"])
    normalization_path = \
        "norm=" + normalization_id
    # history
    history_path = \
        "h=" + str(config_dict["history_steps"])
    # future
    future_path = \
        "f=" + str(config_dict["future_steps"])
    # create parts around the day
    first_part = \
        "/".join([base_path, freq_path, hostgroup_path])
    last_part = \
        "/".join([plugins_path, normalization_path, history_path, future_path])
    # days
    days = get_date_between(config_dict["date_start"],
                            config_dict["date_end_excluded"])
    paths = []
    for (year, month, day) in days:
        str_year = "year=" + str(year)
        str_month = "month=" + str(month)
        # str_day = "day=" + str(day)
        new_path = \
            first_part + \
            "/" + str_year + "/" + str_month + "/" + \
            last_part
        # str_day + "/" +
        paths.append(new_path)
    print("Reading the following paths:")
    [print(x) for x in paths]
    return paths
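

# For orientation, with hypothetical config values the paths built above look
# roughly like the following single string per (year, month); the plg= and
# norm= parts are ids produced by the adcern.publisher helpers:
#
#   <hdfs_out_folder>/freq=10min/hostgroup=cloud_compute%2Flevel2%2Fmain%2Fgva_shared_012
#       /year=2020/month=2/plg=<id_plugins>/norm=<normalization_id>/h=3/f=1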


def delete_files(spark, config_dict, hostgroup):
    """Delete all saved files related to this hostgroup and interval.

    All the info is contained in the config dictionary.

    Params
    ------
    spark: spark context
    config_dict: dictionary with the following keys:
        hdfs_out_folder (str), date_start (str), date_end_excluded (str),
        overwrite_on_hdfs (boolean), aggregate_every_n_minutes (int),
        history_steps (int), future_steps (int),
        hostgroups (list of str), selected_plugins (dict of dict)
    hostgroup: str
        hostgroup that you want to get, it must be in the config file too, as
        an element in the list under the "hostgroups" key
    """
    paths_to_delete = create_path_for_hostgroup(config_dict, hostgroup)
    sc = spark.sparkContext
    hadoop = sc._jvm.org.apache.hadoop
    fs = hadoop.fs.FileSystem
    conf = hadoop.conf.Configuration()
    hdfs_conf = fs.get(conf)
    for path_str in paths_to_delete:
        path = hadoop.fs.Path(path_str)
        if hdfs_conf.exists(path):
            hdfs_conf.delete(path)
            print("Deleted: ", path_str)
        else:
            print("Not existing: ", path_str)
    print("Success")


def get_date_between(start_date: str, end_date: str):
    """Get the dates in the given range as tuples of int (year, month, day).

    NB: the end_date is excluded
    """
    list_of_dates = []
    sdate = datetime.strptime(start_date, "%Y-%m-%d")  # start date
    edate = datetime.strptime(end_date, "%Y-%m-%d")    # end date
    delta = edate - sdate  # as timedelta
    for i in range(delta.days):
        day = sdate + timedelta(days=i)
        day_tuple = (day.year, day.month, day.day)
        list_of_dates.append(day_tuple)
    print("Start: %s - End %s" % (list_of_dates[0], list_of_dates[-1]))
    return list_of_dates
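

# Example (follows directly from the code above; the end date is excluded):
#   get_date_between("2020-02-14", "2020-02-17")
#   -> [(2020, 2, 14), (2020, 2, 15), (2020, 2, 16)]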


def create_paths_for_collectd(plugins, days):
    """Create the paths to access the plugin metrics."""
    paths = []
    for k in plugins.keys():
        current_plg_dict = plugins[k]
        print(current_plg_dict)
        plg_folder = current_plg_dict["plugin_data_path"]
        for (year, month, day) in days:
            str_year = str(year)
            str_month = str(month).zfill(2)
            str_day = str(day).zfill(2)
            new_path = plg_folder + \
                "/" + str_year + "/" + str_month + "/" + str_day + "/"
            paths.append(new_path)
    return list(set(paths))


def keep_only_existing_paths(spark, paths):
    """Filter paths and discard the non-existent ones.

    The non-existent ones will be printed.

    Return
    ------
    existing_paths: list of str
        paths that actually exist
    """
    sc = spark.sparkContext
    hadoop = sc._jvm.org.apache.hadoop
    # https://stackoverflow.com/questions/52166841/spark-reading-data-frame-from-list-of-paths-with-empty-path
    # https://hadoop.apache.org/docs/r2.8.2/api/org/apache/hadoop/fs/FileSystem.html
    fs = hadoop.fs.FileSystem
    conf = hadoop.conf.Configuration()
    existing_paths = []
    non_existing = []
    for path_str in paths:
        path = hadoop.fs.Path(path_str)
        try:
            fs.get(conf).listStatus(path)
            existing_paths.append(path_str)
        except Exception:
            print("not found: ", path_str)
            # print(e)
            non_existing.append(path_str)
    if len(existing_paths) == 0:
        raise Exception("In this configuration file you inserted only data "
                        "that do not exist in HDFS")
    return existing_paths


def convert_to_slash(path):
    """Replace the slashes in the same way done by Spark."""
    return path.replace("/", "%2F")
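

# Example:
#   convert_to_slash("cloud_compute/level2/main/gva_shared_012")
#   -> "cloud_compute%2Flevel2%2Fmain%2Fgva_shared_012"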


def read_local_window_dataset(config_filepath=None,
                              path=None,
                              nr_timeseries=None,
                              list_metrics_to_keep=None):
    """Read the window dataset from the local cache folder.

    Params
    ------
    config_filepath: path to json with the following keys:
        local_cache_folder (str), code_project_name (str),
        selected_plugins (dict of dict)
    path: str
        direct path of the parquet file
    nr_timeseries: int
        nr of plugins you expect to find (needed if path is passed)
    list_metrics_to_keep: list of str
        the names of the metrics you want to read.
        Use the same names as in the configuration file; those names will be
        used to filter columns.

    Return
    ------
    feature_dataset: Pandas Dataframe
        each row is a vector containing the values of the
        selected metrics for one host at a given timestamp.
        load_h0 | load_h1 | load_h2 | cpu_h0 | cpu_h1 | cpu_h2 | etc
        load_ht means the signal load t steps in the past.
        Refer to the history_steps (int) key in the dict
    provenance_dataset: Pandas Dataframe
        hostname | timestamp
        to know where each row is coming from in
        terms of host and timestamp (the current timestamp is part of
        the history)
    nr_timeseries: int
        number of plugins present
    """
    if config_filepath is None \
            and path is not None \
            and nr_timeseries is not None:
        local_path = path
        nr_timeseries = nr_timeseries
    else:
        config_dict = read_json_config(config_filepath)
        local_path = \
            config_dict["local_cache_folder"] + config_dict["code_project_name"]  # noqa
        nr_timeseries = len(config_dict["selected_plugins"])
    print("Opening in Pandas -> parquet file: ", local_path)
    pdf = pd.read_parquet(local_path)
    pdf.drop_duplicates(subset=["timestamp", "hostname"], inplace=True)
    regex_history = re.compile("_h[0-9]*$")
    h_columns = list(filter(regex_history.search, pdf.columns))
    # keep only history columns
    feature_dataset = pdf[h_columns]
    if list_metrics_to_keep is not None:
        columns_to_keep = []
        for metric in list_metrics_to_keep:
            regex_this_metric = re.compile(metric + "_h[0-9]*$")
            columns_this_metric = \
                list(filter(regex_this_metric.search, pdf.columns))
            columns_to_keep += columns_this_metric
        feature_dataset = feature_dataset[columns_to_keep]
    provenance_dataset = pdf[["hostname", "ts", "timestamp"]]
    return feature_dataset, provenance_dataset, nr_timeseries
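A minimal usage sketch of the reader above (the parquet path and metric names are hypothetical; column filtering follows the regexes in the function):

    features, provenance, n_ts = read_local_window_dataset(
        config_filepath=None,
        path="local_cache/my_project.parquet",   # hypothetical parquet file
        nr_timeseries=2,
        list_metrics_to_keep=["load", "cpu"])    # keeps only load_h* and cpu_h* columns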