Commit 53c4cec8 authored by Domenico Giordano's avatar Domenico Giordano

Merge branch 'qa' into 'master'

Qa

See merge request cloud-infrastructure/data-analytics!25
parents fd600bb3 2329c39d
......@@ -67,4 +67,9 @@ cover
*.DS_Store
# Atom
.remote-sync.json
\ No newline at end of file
.remote-sync.json
# VSCODE
.vscode/settings.json
.vscode/.ropeproject/config.py
.vscode/.ropeproject/objectdb
......@@ -13,7 +13,7 @@ stages:
# Build and push the image from the Dockerfile at the root of the project.
# To push to a specific docker tag, amend the --destination parameter, e.g. --destination $CI_REGISTRY_IMAGE:$CI_BUILD_REF_NAME
# See https://docs.gitlab.com/ee/ci/variables/predefined_variables.html#variables-reference for available variables
- /kaniko/executor --context $CONTEXT --dockerfile $DOCKERFILE $DESTINATIONS
- /kaniko/executor --context $CONTEXT --dockerfile $DOCKERFILE $DESTINATIONS
# Build image to be used in tox tests
job_build_tox_image:
......@@ -31,14 +31,14 @@ job_build_tox_image:
#-------------------------------------------------------------------------------------
# Build image that runs jupyter notebook with the data-analytics libraries installed
# The same image can be used interactively to start a jupyter notebook
# The same image can be used interactively to start a jupyter notebook
job_build_jupyter_test_image:
stage: build-images
before_script:
- export DOCKERFILE=$CI_PROJECT_DIR/docker-images/jupyter/Dockerfile
- export CONTEXT=$CI_PROJECT_DIR
- export DESTINATIONS="--destination $CI_REGISTRY_IMAGE/jupyter:qa"
- export DESTINATIONS="--destination $CI_REGISTRY_IMAGE/jupyter:qa --destination $CI_REGISTRY_IMAGE/jupyter:ciqa-${CI_COMMIT_SHA:0:8}"
<<: *template_build_image
only:
refs:
......@@ -49,7 +49,7 @@ job_build_jupyter_prod_image:
before_script:
- export DOCKERFILE=$CI_PROJECT_DIR/docker-images/jupyter/Dockerfile
- export CONTEXT=$CI_PROJECT_DIR
- export DESTINATIONS="--destination $CI_REGISTRY_IMAGE/jupyter:latest --destination $CI_REGISTRY_IMAGE/jupyter:${CI_COMMIT_SHA:0:8}"
- export DESTINATIONS="--destination $CI_REGISTRY_IMAGE/jupyter:latest --destination $CI_REGISTRY_IMAGE/jupyter:cimaster-${CI_COMMIT_SHA:0:8}"
<<: *template_build_image
only:
refs:
......@@ -64,7 +64,7 @@ job_build_spark_test_image:
before_script:
- export DOCKERFILE=$CI_PROJECT_DIR/docker-images/sparknotebook/Dockerfile
- export CONTEXT=$CI_PROJECT_DIR
- export DESTINATIONS="--destination $CI_REGISTRY_IMAGE/sparknotebook:qa"
- export DESTINATIONS="--destination $CI_REGISTRY_IMAGE/sparknotebook:qa --destination $CI_REGISTRY_IMAGE/sparknotebook:ciqa-${CI_COMMIT_SHA:0:8}"
<<: *template_build_image
only:
refs:
......@@ -75,7 +75,7 @@ job_build_spark_prod_image:
before_script:
- export DOCKERFILE=$CI_PROJECT_DIR/docker-images/sparknotebook/Dockerfile
- export CONTEXT=$CI_PROJECT_DIR
- export DESTINATIONS="--destination $CI_REGISTRY_IMAGE/sparknotebook:latest --destination $CI_REGISTRY_IMAGE/sparknotebook:${CI_COMMIT_SHA:0:8}"
- export DESTINATIONS="--destination $CI_REGISTRY_IMAGE/sparknotebook:latest --destination $CI_REGISTRY_IMAGE/sparknotebook:cimaster-${CI_COMMIT_SHA:0:8}"
<<: *template_build_image
only:
refs:
......@@ -105,10 +105,10 @@ coverage:
- cover
# -----------------------------------------
#
#
job_run_notebook_grafana_etl:
stage: test
image:
image:
name: gitlab-registry.cern.ch/cloud-infrastructure/data-analytics/jupyter:qa
entrypoint: [""]
only:
......@@ -121,19 +121,19 @@ job_run_notebook_grafana_etl:
- cd $CI_PROJECT_DIR/grafana_etl/tests/
- jupyter nbconvert --to notebook --execute test_ETL.ipynb
artifacts:
paths:
- $CI_PROJECT_DIR/grafana_etl/tests/test_ETL.nbconvert.ipynb
expire_in: 1 week
when: always
paths:
- $CI_PROJECT_DIR/grafana_etl/tests/test_ETL.nbconvert.ipynb
expire_in: 1 week
when: always
# -----------------------------------------
#
#
job_run_spark_etl_test:
stage: test
tags:
- data-analytics-spark-ci # for private runner
image:
name: gitlab-registry.cern.ch/cloud-infrastructure/data-analytics/sparknotebook:qa
- data-analytics-spark-ci # for private runner
image:
name: gitlab-registry.cern.ch/cloud-infrastructure/data-analytics/sparknotebook:qa
entrypoint: [""]
only:
refs:
......@@ -142,3 +142,8 @@ job_run_spark_etl_test:
- export KRBUSER=$CI_USER
- export KRBPASSWD=$CI_USER_PASSWD
- $CI_PROJECT_DIR/spark_etl/tests/test_spark_connector.sh
artifacts:
paths:
- $CI_PROJECT_DIR/spark_etl/tests/*
expire_in: 1 week
when: always
\ No newline at end of file
# data-analytics
# Data Analytics
ETL
======
| qa | master |
| :-: | :-: |
|[![pipeline status qa](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/badges/qa/pipeline.svg)](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/-/commits/qa) |[![pipeline status master](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/badges/master/pipeline.svg)](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/-/commits/master)|
|[![coverage report qa](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/badges/qa/coverage.svg)](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/-/commits/qa)|[![coverage report master](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/badges/master/coverage.svg)](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/-/commits/master)|
![Build status](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/badges/testing/build.svg)
![coverage](https://gitlab.cern.ch/giordano/cloud-infrastructure/data-analytics/testing/coverage.svg?job=coverage)
The project contains a suite of tools (and respective packages) to:
1. extract time series data from CERN databases: InfluxDB, ElasticSearch, Spark cluster.
2. analyze time series data.
3. test algorithms for ETL.
This time series data can come from:
- metrics measured for each hypervisor in the Data Centre.
- time series derived from log file analysis.
A central part of this project is the package that will contain the different algorithms used to analyze the time series data and raise alarms if needed.
## Getting Started
You can use this module in two environments:
1. in a SWAN notebook
1. on your local machine
### Prerequisites
You need to:
1. be able to authenticate via Kerberos for Spark access:
```
kinit username@CERN.CH
```
2. have a Grafana token to access the InfluxDB and ElasticSearch data (one way to handle it is sketched below).
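For instance, you can read the token interactively and keep it in an environment variable instead of hard-coding it in a notebook. This is only an illustrative sketch: the variable name `GRAFANA_TOKEN` is an assumption, not an API of the grafana_etl package.
``` python
import getpass
import os

# Prompt for the Grafana token instead of writing it in the notebook.
# GRAFANA_TOKEN is a hypothetical variable name, used here only as an example.
os.environ["GRAFANA_TOKEN"] = getpass.getpass("Grafana token: ")
```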
### Installing - SWAN (https://swan.web.cern.ch/)
1. Make sure you start SWAN connecting to the right Spark cluster, which in general is the `General Purpose (Analytix)` one.
2. Paste this in your notebook:
``` python
# necessary to authenticate and create the Kerberos credentials
import getpass
import os, sys
print("Please enter your kerberos password")
ret = os.system("echo \"%s\" | kinit" % getpass.getpass())
if ret == 0: print("Credentials created successfully")
else: sys.stderr.write('Error creating credentials, return code: %s\n' % ret)
```
3. Execute the cell and insert your password. Now you are logged in.
4. Paste this and execute:
```
! pip install --user git+https://:@gitlab.cern.ch:8443/cloud-infrastructure/data-analytics.git
```
5. Import the module that you want, e.g.:
``` python
from spark_etl import cluster_utils
```
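A minimal sketch of how these utilities might be chained in a notebook, assuming the helper functions defined in `spark_etl/cluster_utils.py` (`set_spark`, `get_list_dirs`, `stop_spark`); the HDFS path is only an example:
``` python
from spark_etl import cluster_utils

# Create the Spark context and session (a valid Kerberos ticket is expected)
sc, spark, conf = cluster_utils.set_spark()

# List the sub-directories of an HDFS path (hypothetical path, for illustration)
dirs = cluster_utils.get_list_dirs("/project/monitoring/collectd/cpu/2020/*")
print(dirs)

# Stop the context and session when done
cluster_utils.stop_spark(sc, spark)
```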
### Installing - Local machine
1. Log in directly from your shell with the command:
```
kinit username@CERN.CH
```
2. Insert your password.
3. Install the package directly from GitLab:
```
pip install --user git+https://:@gitlab.cern.ch:8443/cloud-infrastructure/data-analytics.git
```
## Possible use cases
![use-case-diagram](documentation/diagrams/use-case-data-analytics.png)
## Example Notebooks
You can refer to the [notebook folder](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/-/tree/master/notebooks) to explore some examples of usage.
## Authors
This project is created and maintained by the IT-CM-RPS team.
In particular, the main active contributors are: Domenico, Patrycja, Matteo.
# Modules
The project is composed of the following ETL (Extract, Transform, Load) modules:
1. Grafana ETL
1. Spark ETL
## Grafana ETL
Here is the UML class diagram of the module.
![use-case-diagram](diagrams/grafana_etl_package.png)
## Spark ETL
(Work in progress)
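Although the module is work in progress, the `cluster_utils` helpers already support a day-by-day ETL loop. The sketch below assumes the package is installed as described above; the `my_etl` function and the HDFS paths are hypothetical:
``` python
from spark_etl import cluster_utils

sc, spark, conf = cluster_utils.set_spark()

def my_etl(input_dirs, out_path):
    # Read the daily JSON documents and write a compacted parquet copy
    df = spark.read.json(input_dirs)
    cluster_utils.write_spark_df(df, out_path)

# Hypothetical input/output locations, for illustration only
cluster_utils.loop_on_dates(my_etl,
                            inpath="/project/monitoring/collectd/cpu/2020/*/*",
                            out_base_path="/user/myuser/collectd_cpu_daily",
                            force_rewrite=False,
                            debug=True)

cluster_utils.stop_spark(sc, spark)
```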
"""Utility functions for PySpark usage.
SPARK + HDFS + JSON
You have functions:
- to set up your Spark variables needed to access the cluster.
- to get information about the HDFS folder structure (especially
in terms of the nested folders that contain data about months
and days)
- to handle JSON files inside your cluster
"""
from collections import defaultdict
import functools
import json
import os
import random
import re
import subprocess
import pandas as pd
from pyspark import SparkConf # noqa: F401
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
import random
import subprocess
# import pyspark.sql.functions as F
# from pyspark.sql.window import Window
# import sys
from pyspark.sql.types import StructType
MACRO_HOSTGROUP = 'cloud_compute/level2/'
TIMESTAMP_TO_CONSIDER = 'event_timestamp'
def stopSpark(sc, spark):
"""Manage stop of Spark context and session, if existing."""
try:
sc.stop()
except Exception as e:
print("Cannot stop sc")
print(e)
try:
spark.stop()
except Exception as e:
print("Cannot stop spark")
print(e)
# SPARK MANAGEMENT -----------------------------------------------------------
def setSpark(spark_conf=SparkConf()):
def set_spark(spark_conf=SparkConf()):
"""Set Spark.
NB: kerberos file in /tmp is expected
......@@ -80,36 +86,65 @@ def setSpark(spark_conf=SparkConf()):
return sc, spark, spark_conf
def getSchema(spark, inpath, jsonschema):
"""Autodiscover schema from file inspections.
In order to generate a schema of the data,
use this utility with a subset of the data
(1 day for instance)
spark: spark session
inpath: input path of data to discover the schema
jsonschema: output file where to store the schema in json format
"""
full_df = spark.read.json(inpath)
f = open(jsonschema, 'w')
f.write(json.dumps(full_df.schema.json()))
f.close()
return full_df
def stop_spark(sc, spark):
"""Manage stop of Spark context and session, if existing."""
try:
sc.stop()
except Exception as ex:
print("Cannot stop sc")
print(ex)
try:
spark.stop()
except Exception as ex:
print("Cannot stop spark")
print(ex)
def loadSchema(jsonschema):
"""Load schema from json file."""
nschema = json.loads(open(jsonschema, 'r').read())
logschema = StructType.fromJson(json.loads(nschema))
return logschema
def write_spark_df(df, out_path='', num_files=100):
if out_path != '':
df.coalesce(num_files).write.mode('overwrite').parquet(out_path)
def writeSparkDF(df, outpath='', numFiles=100):
if outpath != '':
df.coalesce(numFiles).write.mode('overwrite').parquet(outpath)
def download_locally_as_csv(spark, hdfs_path,
csv_name="pandas_full",
csv_folder="datalake"):
"""Read a parquet file on HDFS and save a Pandas dataframe locally.
def getListDirs(inpath):
spark: spark context
hdfs_path: where your parquet files are located on the cluster
csv_name: the name you want for your output pandas DataFrame in csv format
csv_folder: the folder where you want to output your csv file ("datalake"
by default)
"""
print("Reading from HDFS... " + hdfs_path)
df_on_hdfs = spark.read.parquet(hdfs_path + "/*")
df_on_hdfs.cache()
print("COLUMNS: %s" % df_on_hdfs.schema.names)
if "plugin" in df_on_hdfs.schema.names:
plugins_presents = df_on_hdfs.select('plugin').distinct().collect()
print("Plugins in this DataFrame: %s" % plugins_presents)
if "host" in df_on_hdfs.schema.names:
different_hosts = df_on_hdfs.select('host').distinct().collect()
nr_of_different_hosts = len(different_hosts)
print("Nr of Different Hosts present: %s" % nr_of_different_hosts)
print("Conversion to pandas DataFrame... (this may take a while)")
pdf = df_on_hdfs.orderBy('n_min_group').toPandas()
# create time series
pdf["temporal_index"] = pd.to_datetime(pdf["event_timestamp"])
pdf.set_index("temporal_index", inplace=True)
pdf.drop(columns=['n_min_group'], inplace=True)
# save csv locally
filename = csv_name + ".csv"
print("Saving ... %s in %s " % (filename, csv_folder))
pdf.to_csv(csv_folder + "/" + filename)
return pdf
# HDFS INSPECTION ------------------------------------------------------------
def get_list_dirs(inpath):
"""get list of subdirs from hdfs."""
result = subprocess.run(['hdfs', 'dfs', '-ls', '-d', inpath],
stdout=subprocess.PIPE)
......@@ -122,15 +157,84 @@ def getListDirs(inpath):
return output
def getDate(astring):
def get_elements_in_dir(path, return_absolute_path=True,
do_not_modify_path=False):
"""Wrapper for the HDFS cmd 'hdfs dfs -ls -d {path}'.
Get the list of elements in HDFS at the given path.
E.g. if you want the documents in the 2019 folder,
the command expects something like:
"/project/monitoring/collectd/cpu/2019"
With the return_absolute_path param you can decide if
you want the full absolute paths or the relative paths
(aka the names of the subfolders).
With do_not_modify_path=True you query exactly the
path you entered; with False the path is modified to
always list the subfolders (i.e. "/*" is appended).
"""
if not do_not_modify_path:
if path[-1] != "/":
path += "/"
path += "*"
cmd = 'hdfs dfs -ls -d {}'.format(path)
# my_cmd_as_bytes = str.encode(cmd)
try:
raw_console_output = str(subprocess.check_output(cmd, shell=True))
except subprocess.CalledProcessError:
raw_console_output = ""
# https://stackoverflow.com/questions/35750614/pyspark-get-list-of-files-directories-on-hdfs-path
directories = re.findall(r"\/(.*?)\\n", raw_console_output)
# https://stackoverflow.com/questions/2013124/regex-matching-up-to-the-first-occurrence-of-a-character
# print(raw_console_output)
if return_absolute_path:
directories = ["/" + x for x in directories]
else:
directories = [x.split("/")[-1] for x in directories]
return directories
def scan_plugin_folder(plugin_name,
base_collectd_path="/project/monitoring/collectd/"):
"""Collect all the folder paths in a plugin folder.
It scans the plugin folder and returns a list of
absolute paths sorted alphabetically. Each
path represents a month.
The list will be sorted such that the first
path is the one with the oldest data.
"""
plugin_folder = base_collectd_path + plugin_name
paths = []
years_folders = get_elements_in_dir(plugin_folder,
return_absolute_path=True)
print("Year folders for this plugin: %s" % years_folders)
# return the list only if this plugin is active in 2020
no_2020 = True
for f in years_folders:
if f.split("/")[-1] == "2020":
no_2020 = False
if no_2020:
return []
for year_folder in years_folders:
# year_path = "/project/monitoring/collectd/"
# + plugin_name+"/"+year+"/*/*"
# print("Inspecting: %s " % year_folder)
months_folders = get_elements_in_dir(year_folder,
return_absolute_path=True)
# print("months_folders: %s" % months_folders)
paths += months_folders
return sorted(paths)
def get_date(a_string):
"""extract the date structure."""
import re
prog = re.compile("20[1-2][0-9]/[0-9][0-9]/[0-9][0-9]")
return prog.search(astring).group(0)
return prog.search(a_string).group(0)
def loopOnDates(getSparkDF, inpath, outbasepath,
forceRewrite=False, debug=False):
def loop_on_dates(get_spark_DF, inpath, out_base_path,
force_rewrite=False, debug=False):
"""Iterate on daily based data folders and extract/filter/transform data.
Following the directory patterns of input data,
......@@ -138,56 +242,83 @@ def loopOnDates(getSparkDF, inpath, outbasepath,
day by day, and store the results in daily smaller files
in local area.
Avoids running again on daily datasets already processed,
except if forceRewrite is True.
except if force_rewrite is True.
Arguments:
getSparkDF: function that specifies the ETL
get_spark_DF: function that specifies the ETL
inpath: input path of the data to process (a string; wildcards allowed)
outbasepath: directory where to store the extracted daily based data
forceRewrite: overwrite the already processed days, and rerun on the whole
out_base_path: directory where to store the extracted daily based data
force_rewrite: overwrite the already processed days, and rerun on the whole
input dataset
debug: verbosity
"""
dirList = getListDirs(inpath)
outDirList = getListDirs("%s/*" % outbasepath)
if forceRewrite:
outDirList = []
dir_list = get_list_dirs(inpath)
out_dir_list = get_list_dirs("%s/*" % out_base_path)
if force_rewrite:
out_dir_list = []
if debug:
for v in outDirList:
for v in out_dir_list:
print('Existing out dirs %s' % v)
dateList = defaultdict(list)
for x in dirList:
dateList[getDate(x)].append(x)
date_list = defaultdict(list)
for x in dir_list:
date_list[get_date(x)].append(x)
'''loop on dates'''
for adate, somedirs in dateList.items():
"""loop on dates"""
for adate, somedirs in date_list.items():
if debug:
print("---------- checking date ", adate)
outpath = "%s/%s" % (outbasepath, adate.replace("/", "_"))
outpath = "%s/%s" % (out_base_path, adate.replace("/", "_"))
# the tmp data go in another dir
outpathtmp = "%s/tmp_%s" % (outbasepath, adate.replace("/", "_"))
outpathtmp = "%s/tmp_%s" % (out_base_path, adate.replace("/", "_"))
# removing and refreshing the tmp
# given that the tmp is going to evolve, running on tmp in any case
if outpathtmp in outDirList:
if outpathtmp in out_dir_list:
if debug:
print("\tremoving %s " % outpathtmp)
subprocess.run(['hdfs', 'dfs', '-rm',
'-r', '-skipTrash',
outpathtmp], stdout=subprocess.PIPE)
isTMP = False
is_tmp = False
if somedirs[0].find(".tmp") > 0:
outpath = outpathtmp
isTMP = True
is_tmp = True
if outpath not in outDirList or isTMP:
if outpath not in out_dir_list or is_tmp:
if debug:
print("\"---------- the output dir is ", outpath)
print("\"---------- the input dirs are %s" % somedirs)
try:
getSparkDF(somedirs, outpath)
except Exception as e:
get_spark_DF(somedirs, outpath)
except Exception as ex:
print("problem?!")
print(e)
print(ex)
# JSON SPECIFIC FUNCTIONS ----------------------------------------------------
def get_schema(spark, in_path, json_schema):
"""Autodiscover schema from file inspections.
In order to generate a schema of the data,
use this utility with a subset of the data
(1 day for instance)
spark: spark session
in_path: input path of data to discover the schema
json_schema: output file where to store the schema in json format
"""
full_df = spark.read.json(in_path)
file = open(json_schema, 'w')
file.write(json.dumps(full_df.schema.json()))
file.close()
return full_df
def load_schema(json_schema):
"""Load schema from json file."""
nschema = json.loads(open(json_schema, 'r').read())
logschema = StructType.fromJson(json.loads(nschema))
return logschema
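# Example usage (sketch; the HDFS paths below are hypothetical):
#
#   sc, spark, conf = set_spark()
#   get_schema(spark, "/project/monitoring/collectd/cpu/2020/01/01",
#              "cpu_schema.json")
#   schema = load_schema("cpu_schema.json")
#   df = spark.read.schema(schema).json("/project/monitoring/collectd/cpu/2020/*/*")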
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"ExecuteTime": {
"end_time": "2020-03-24T10:29:29.255273Z",
"start_time": "2020-03-24T10:29:20.050008Z"
}
},
"source": [
"# Test access of Spark via pySPark"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This notebooks installs the data-analytics package\n",
"and tests the basic functionalities\n",
"\n",
"In order to run it in Swan, follow those steps\n",
"1) pass your kerberos credentials\n",
"2) install the package, using a specific tag (in the example is qa)\n",
"3) run "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2020-04-06T16:56:27.806751Z",
"start_time": "2020-04-06T16:56:27.801090Z"
}
},
"outputs": [],
"source": [
"import os, sys \n",
"\n",
"os.environ['PYTHONPATH']=os.environ['HOME']+'.local/lib/python3.6/site-packages/:'+os.environ['PYTHONPATH']\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Test package"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2020-04-06T16:57:54.952772Z",
"start_time": "2020-04-06T16:57:54.946536Z"
}
},
"outputs": [],