Commit 778b4082 authored by Matteo Paltenghi

fix bug + aggregate and plot pipeline

parent ab6a843d
......@@ -21,6 +21,7 @@ import random
import re
import subprocess
import pandas as pd
from pyspark import SparkConf # noqa: F401
from pyspark import SparkContext
from pyspark.sql import SparkSession
......@@ -104,6 +105,42 @@ def write_spark_df(df, out_path='', num_files=100):
if out_path != '':
df.coalesce(num_files).write.mode('overwrite').parquet(out_path)
def download_locally_as_csv(spark, hdfs_path,
csv_name="pandas_full",
csv_folder="datalake"):
"""Read a parquet file on HDFS and save a Pandas dataframe locally.
spark: the SparkSession used to read from the cluster
hdfs_path: where your parquet files are located on the cluster
csv_name: the name you want for your output pandas DataFrame in csv format
csv_folder: the folder where you want to output your csv file ("datalake"
by default)
"""
print("Reading from HDFS... " + hdfs_path)
df_on_hdfs = spark.read.parquet(hdfs_path + "/*")
df_on_hdfs.cache()
print("COLUMNS: %s" % df_on_hdfs.schema.names)
if "plugin" in df_on_hdfs.schema.names:
plugins_presents = df_on_hdfs.select('plugin').distinct().collect()
print("Plugins in this DataFrame: %s" % plugins_presents)
if "host" in df_on_hdfs.schema.names:
different_hosts = df_on_hdfs.select('host').distinct().collect()
nr_of_different_hosts = len(different_hosts)
print("Nr of Different Hosts present: %s" % nr_of_different_hosts)
print("Conversion to pandas DataFrame... (this may take a while)")
pdf = df_on_hdfs.orderBy('n_min_group').toPandas()
# create time series
pdf["temporal_index"] = pd.to_datetime(pdf["event_timestamp"])
pdf.set_index("temporal_index", inplace=True)
pdf.drop(columns=['n_min_group'], inplace=True)
# save csv locally
filename = csv_name + ".csv"
print("Saving ... %s in %s " % (filename, csv_folder))
pdf.to_csv(csv_folder + "/" + filename)
return pdf
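For illustration, a minimal usage sketch of the helper above; the SparkSession setup, the HDFS path and the output names are assumptions, not values taken from this repository:

# Sketch only: app name, hdfs_path and csv_name below are hypothetical,
# and the csv_folder directory is assumed to already exist locally.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("download-example").getOrCreate()
pdf = download_locally_as_csv(
    spark,
    hdfs_path="collectd_result/collectd_load.parquet/2020/02/14",  # hypothetical
    csv_name="load_2020_02_14",
    csv_folder="datalake")
print(pdf.head())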
# HDFS INSPECTION ------------------------------------------------------------
......
......@@ -18,6 +18,7 @@ from datetime import datetime
import json
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.utils import AnalysisException
from spark_etl.cluster_utils import get_elements_in_dir
......@@ -299,3 +300,19 @@ def inspect_collected_metrics(swan_folder="statistics-about-plugins",
with open(swan_folder + "/" + file_name + ".json") as f:
data = json.load(f)
print(json.dumps(data, indent=4, sort_keys=True))
# PLOTTING -------------------------------------------------------------------
def plot_each_plugin(local_file_path):
"""Plot the timeseries in the csv, one plot per plugin."""
pdf_all_data = pd.read_csv(local_file_path)
for plugin_name in list(pdf_all_data.plugin.unique()):
pdf_plugin = pdf_all_data[pdf_all_data["plugin"] == plugin_name]
pdf_swarm = pdf_plugin.pivot(columns="host",
index="temporal_index",
values="mean_value")
ax = pdf_swarm.plot(figsize=(22, 10))
ax.set_title(plugin_name)
plt.show()
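A possible call, assuming the CSV was produced earlier by download_locally_as_csv and therefore contains the plugin, host, temporal_index and mean_value columns (the file name is illustrative):

# Sketch only: the path follows the csv_folder/csv_name convention used above.
plot_each_plugin("datalake/load_2020_02_14.csv")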
......@@ -25,6 +25,7 @@ from pyspark.sql.types import FloatType
from spark_etl.cluster_utils import get_date
from spark_etl.cluster_utils import get_elements_in_dir
from spark_etl.cluster_utils import write_spark_df
from spark_etl.cluster_utils import MACRO_HOSTGROUP
......@@ -33,6 +34,7 @@ from spark_etl.cluster_utils import MACRO_HOSTGROUP
def create_aggregate_specific_path(spark, inpath, outpath,
n_minutes, selected_cell,
attributes,
timestamp_column='event_timestamp',
file_per_day=1):
"""It reads a parquet file at the inpath location.
......@@ -60,6 +62,22 @@ def create_aggregate_specific_path(spark, inpath, outpath,
filtered_df = filtered_df.filter(
(filtered_df.submitter_hostgroup.contains(selected_cell[0])))
# TODO(Domenico): allow a list of possible cells  # noqa
# filter all the attributes - a stands for attribute
(a_plugin_instance, a_type, a_type_instance, a_value_instance) = attributes
# filter plugin instance
if a_plugin_instance is not None:
filtered_df = filtered_df.filter(
(filtered_df.plugin_instance == a_plugin_instance))
if a_type is not None:
filtered_df = filtered_df.filter(
(filtered_df.type == a_type))
if a_type_instance is not None:
filtered_df = filtered_df.filter(
(filtered_df.type_instance == a_type_instance))
if a_value_instance is not None:
filtered_df = filtered_df.filter(
(filtered_df.value_instance == a_value_instance))
filtered_df.show(3)
# keep only interesting column
df_projection = filtered_df \
.withColumn(
......@@ -86,6 +104,7 @@ def create_aggregate_specific_path(spark, inpath, outpath,
FloatType())
# create aggregate columns
df_projection.cache()
statistics_df = df_projection\
.groupBy([
'submitter_hostgroup',
......@@ -115,13 +134,13 @@ def create_aggregate_specific_path(spark, inpath, outpath,
# https://danvatterott.com/blog/2018/09/06/python-aggregate-udfs-in-pyspark/
output_df = timestamp_readded_df
output_df.coalesce(file_per_day).write.mode('overwrite').parquet(outpath)
write_spark_df(output_df, out_path=outpath, num_files=file_per_day)
return output_df
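For clarity, a hedged sketch of a direct call to create_aggregate_specific_path; the paths, cell name and attribute values are illustrative assumptions, only the argument structure follows the signature above:

# attributes is the 4-tuple (plugin_instance, type, type_instance, value_instance);
# None means "do not filter on that column". All values below are hypothetical.
cpu_attributes = (None, "percent", "user", "value")
create_aggregate_specific_path(
    spark,
    inpath="collectd/cpu/2020/02/14",                            # hypothetical input path
    outpath="collectd_result/collectd_cpu.parquet/2020/02/14",   # hypothetical output path
    n_minutes=15,
    selected_cell=["some_cell"],   # hypothetical hostgroup substring
    attributes=cpu_attributes,
    timestamp_column="event_timestamp",
    file_per_day=1)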
def create_aggregates_for_each_date(spark, dir_list, out_dir_list, outbasepath,
outfile, n_minutes,
attributes,
selected_cell=[],
timestamp_column='event_timestamp',
file_per_day=1):
......@@ -161,24 +180,46 @@ def create_aggregates_for_each_date(spark, dir_list, out_dir_list, outbasepath,
print("|---------- the input dirs are %s" % somedirs)
try:
# get_spark_df(somedirs,outpath,nschema)
create_aggregate_specific_path(spark, somedirs, out_path,
create_aggregate_specific_path(spark, somedirs, out_path, # noqa
n_minutes, selected_cell,
timestamp_column,
file_per_day)
timestamp_column=timestamp_column, # noqa
file_per_day=file_per_day,
attributes=attributes)
except Exception as ex:
print("problem?!")
print(ex)
def create_aggregates_for_plugins(spark, out_base_path="collectd_result",
pugins_names=["load", "swap"], n_minutes=15,
plugins=[], n_minutes=15,
selected_cell=[],
year="2020", month="02", day="14",
file_per_day=1):
"""Create statistics for every mentioned plugins."""
"""Create statistics for every mentioned plugins.
plugins: it is a list of tuple that is explaining what you want to
aggregate with the following pattern:
(plugin_name, plugin_instance, type, type_instance, value_instance)
all the elements should be string.
- plugin_name: determines the table you will look at
- all the others: determine with which value to filter each column
before the aggregation process in bin of n minutes (if you want
to retain every line regardless of those column content you have
to put None)
NB you always have to specify a tuple with all the element for each
plugin.
"""
# perform checks on the params
if len(plugins) == 0:
raise Exception("Empty plugin list")
if not all(len(x) == 5 for x in plugins):
raise Exception("Some plugin doesn't have 5 params "
"(read the docstring)")
# output dir in user HDFS area
# out_base_path="collectd_result"
for p in pugins_names:
for p, *attributes in plugins:
print(p)
print("(plugin_instance, type, type_instance, value_instance) = ", attributes) # noqa
# output base file name
out_base_file = "collectd_" + p + ".parquet"
# input file path with data to process with spark
......@@ -188,5 +229,6 @@ def create_aggregates_for_plugins(spark, out_base_path="collectd_result",
out_dir_list = get_elements_in_dir(out_base_path)
create_aggregates_for_each_date(spark, dir_list, out_dir_list,
out_base_path, out_base_file,
n_minutes, selected_cell,
file_per_day)
n_minutes, selected_cell=selected_cell,
file_per_day=file_per_day,
attributes=attributes)
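Finally, a hedged end-to-end sketch of the new entry point, assuming an existing SparkSession named spark; the plugin tuples below are illustrative, not values prescribed by this commit:

# Each tuple: (plugin_name, plugin_instance, type, type_instance, value_instance);
# None keeps every row for that column. All selections here are hypothetical.
plugins = [
    ("load", None, None, None, "shortterm"),
    ("swap", None, "swap", "used", "value"),
]
create_aggregates_for_plugins(
    spark,
    out_base_path="collectd_result",
    plugins=plugins,
    n_minutes=15,
    selected_cell=["some_cell"],   # hypothetical hostgroup substring
    year="2020", month="02", day="14",
    file_per_day=1)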