cloud-infrastructure / data-analytics / Commits / c7a92926

Commit c7a92926 authored Apr 22, 2021 by Domenico Giordano
improve example

parent 85a66929
Changes 1
examples/example_spark_etl_via_swan.ipynb
...
...
@@ -2,12 +2,7 @@
"cells": [
{
"cell_type": "markdown",
"metadata": {
"ExecuteTime": {
"end_time": "2020-03-24T10:29:29.255273Z",
"start_time": "2020-03-24T10:29:20.050008Z"
}
},
"metadata": {},
"source": [
"# Test access of Spark via pySPark"
]
...
...
@@ -31,12 +26,7 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T16:06:05.526971Z",
"start_time": "2021-04-21T16:06:05.521784Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"import os, sys"
...
...
@@ -53,26 +43,17 @@
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T14:11:38.753712Z",
"start_time": "2021-04-21T13:58:40.523132Z"
},
"scrolled": false
"scrolled": true
},
"outputs": [],
"source": [
"!pip install --user git+https://:@gitlab.cern.ch:8443/cloud-infrastructure/data-analytics.git
@v0.4rc5
"
"!pip
3
install --user git+https://:@gitlab.cern.ch:8443/cloud-infrastructure/data-analytics.git"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:41:59.371942Z",
"start_time": "2021-04-21T15:41:59.363074Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"# Extend PATH\n",
...
...
@@ -83,12 +64,7 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:46:10.327991Z",
"start_time": "2021-04-21T15:46:08.978928Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"# Let's verify that the version is still 3.8\n",
...
...
@@ -98,15 +74,10 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:46:13.440145Z",
"start_time": "2021-04-21T15:46:13.430523Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"os.environ['PYTHONPATH']=os.environ['HOME']+'.local/lib/python3.8/site-packages
/
:'+os.environ['PYTHONPATH']\n",
"os.environ['PYTHONPATH']=os.environ['HOME']+'.local/lib/python3.8/site-packages:'+os.environ['PYTHONPATH']\n",
"# Extend PYTHONPATH\n",
"os.environ['PYTHONPATH']"
]
...
...
@@ -121,26 +92,19 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:46:17.002624Z",
"start_time": "2021-04-21T15:46:16.997738Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"# NB: if the library doesn't get loaded \n",
"# at the first time after installation\n",
"# please restart your notebook\n",
"from etl.spark_etl import cluster_utils"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:46:17.736316Z",
"start_time": "2021-04-21T15:46:17.729069Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"# This spark config comes from Swan (did you start the cluster pushing on the Star button?!)\n",
...
...
@@ -151,11 +115,7 @@
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:46:21.109939Z",
"start_time": "2021-04-21T15:46:20.981701Z"
},
"scrolled": false
"scrolled": true
},
"outputs": [],
"source": [
...
...
@@ -166,12 +126,7 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:46:21.866064Z",
"start_time": "2021-04-21T15:46:21.858048Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"sc"
...
...
@@ -180,12 +135,7 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:46:25.670331Z",
"start_time": "2021-04-21T15:46:25.661446Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"spark"
...
...
@@ -209,12 +159,7 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:46:29.727191Z",
"start_time": "2021-04-21T15:46:29.721742Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"#Configuration PARAMS\n",
...
...
@@ -225,7 +170,7 @@
"#output base file name\n",
"outbasefile=\"rally_errors.parquet\"\n",
"#input file path with data to process with spark\n",
"inbasepath=\"/project/monitoring/archive/openstack/logs/generic/rallytester/2021/
0
*/01\"\n",
"inbasepath=\"/project/monitoring/archive/openstack/logs/generic/rallytester/2021/*/01\"\n",
"#schema file\n",
"schemafile='rally_schema.json'\n"
]
...
...
@@ -233,12 +178,16 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:46:39.921840Z",
"start_time": "2021-04-21T15:46:30.881232Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"! hdfs dfs -test -d $outbasepath; [[ \"$?\" -eq \"0\" ]] && echo \"have to remove folder $outbasepath\" && hdfs dfs -rm -r $outbasepath"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Generate the data schema, using a single day example\n",
...
...
@@ -254,12 +203,7 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:46:39.953608Z",
"start_time": "2021-04-21T15:46:39.927522Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"# Load the schema from the file\n",
...
...
@@ -269,12 +213,7 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:46:39.963432Z",
"start_time": "2021-04-21T15:46:39.957760Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"# Print schema\n",
...
...
@@ -284,12 +223,7 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:46:44.828522Z",
"start_time": "2021-04-21T15:46:39.968052Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"# Discover the files in the input data pattern\n",
...
...
@@ -301,12 +235,7 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:46:49.987376Z",
"start_time": "2021-04-21T15:46:44.835683Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"# Discover existing output \n",
...
...
@@ -317,21 +246,9 @@
},
{
"cell_type": "code",
"execution_count":
null
,
"execution_count":
71
,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:46:50.004596Z",
"start_time": "2021-04-21T15:46:49.992947Z"
}
},
"outputs": [],
"source": [
"# User defined function to extract data and manipulate it\n",
"# This is the effecive ETL\n",
...
...
@@ -342,7 +259,10 @@
" global spark\n",
" global schemafile \n",
" #load the schema\n",
" logschema = cluster_utils.load_schema(schemafile)\n",
" nschema = json.loads(open(schemafile, 'r').read())\n",
" logschema = StructType.fromJson(json.loads(nschema))\n",
"\n",
" #logschema = cluster_utils.load_schema(schemafile)\n",
" #load data\n",
" full_df = spark.read.json(inpath,schema=logschema)\n",
" #prepare for sql\n",
...
...
@@ -350,51 +270,18 @@
" full_df.createOrReplaceTempView(\"tfull\")\n",
"\n",
" #\n",
"
filtered
_df = spark.sql(\"select metadata.timestamp as atime, metadata.hostgroup as hostgroup,\\\n",
"
output
_df = spark.sql(\"select metadata.timestamp as atime, metadata.hostgroup as hostgroup,\\\n",
" metadata.environment, metadata.host as host, data.process as process,\\\n",
" data.short_task as task, data.deployment as deployment,\\\n",
" data.raw,\\\n",
" data.source as source, data.log_source as log_source, metadata._id as uid\\\n",
" from tfull where data.log_level == 'ERROR' and data.log_source == 'rallytester.rallytester' \")\n",
"##############\n",
"### Example of manipulation of strings\n",
"##############\n",
" \n",
" #extract raw substring\n",
"# rawsubstring_regex = lambda x: F.regexp_replace(x,'.* (ERROR rallytester\\.rallytester)\\s\\[[^\\]]*\\]\\s\\[[^\\]]*\\]\\s(Task failed:\\s*|)(.*)','$3')\n",
" \n",
"# #extract json\n",
"# jsonmessage_regex = lambda x: F.regexp_replace(x,'Resource (.*)\\'message\\': u(\\'|\")(.*)(\\'|\"), u\\'code\\'(.*)','$3')\n",
" \n",
"# #replace https long strings\n",
"# httpreplace_regex = lambda x : F.regexp_replace(x,'(.*)(https:\\/\\/(([^\\s\\/]*\\/)))([^\\s]*)(?<=\\w)(?=\\W)(.*)','$1 $2**** $6')\n",
" \n",
"# #replace ips\n",
"# ipreplace_regex = lambda x : F.regexp_replace(x,'(.*)[^\\d]([0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3})(.*)','$1 xyz.xyz.xyz.xyz $3')\n",
" \n",
"# #replace UIDs\n",
"# idreplace_regex_1 = lambda x : F.regexp_replace(x,'(.*)rally-([\\w-\\:]{8,}[>]?)(.*)','$1 rally-xxxx $3')\n",
"# idreplace_regex_2 = lambda x : F.regexp_replace(x,'(.*)(req-[\\w-\\:]{12,})(.*)','$1 xxxx $3')\n",
"# idreplace_regex_3 = lambda x : F.regexp_replace(x,'(.*)([/|\\s][\\w-\\:,]{12,}[/|\\s])(.*)','$1 xxxx $3')\n",
"# idreplace_regex_4 = lambda x : F.regexp_replace(x,'(.*)\\s([\\w-]{12,})(.*)','$1 xxxx $3')\n",
"\n",
"# idreplace_regex = lambda x : idreplace_regex_4(idreplace_regex_3(idreplace_regex_2(idreplace_regex_2(idreplace_regex_1(x)))))\n",
"# all_replaces = lambda x:idreplace_regex(ipreplace_regex(httpreplace_regex(jsonmessage_regex(rawsubstring_regex(x)))))\n",
" \n",
"# regex_df = filtered_df.filter(\n",
"# ~filtered_df.raw.contains(\"Error executing rally task. Check the deployment is correctly configured.\")\n",
"# ).withColumn('msg',all_replaces('raw'))\n",
" \n",
" #regex_df = filtered_df.withColumn('msg',all_replaces('raw')) \n",
" \n",
"\n",
" #regex_df.show()\n",
" #output_df = regex_df\n",
" from tfull where data.log_level == 'ERROR' \")\n",
"\n",
" #moved at python level\n",
" output_df = filtered_df\n",
"\n",
" cluster_utils.write_spark_df(output_df,outpath)\n",
" df_empty = (len(output_df.head(1))==0)\n",
" print(\"Is the dataframe empty? %s\" % df_empty)\n",
" if not df_empty:\n",
" print(\"Saving dataframe\")\n",
" cluster_utils.write_spark_df(output_df,outpath)\n",
" return output_df"
]
},
...
...
@@ -402,10 +289,7 @@
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:46:59.552083Z",
"start_time": "2021-04-21T15:46:50.008553Z"
}
"scrolled": false
},
"outputs": [],
"source": [
...
...
@@ -417,12 +301,7 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:47:04.218033Z",
"start_time": "2021-04-21T15:46:59.558034Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"# Discover existing output \n",
...
...
@@ -434,12 +313,7 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:47:05.108155Z",
"start_time": "2021-04-21T15:47:04.222612Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"#loopOnDates writes in the user HDFS area\n",
...
...
@@ -450,12 +324,7 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"ExecuteTime": {
"end_time": "2021-04-21T15:47:05.705789Z",
"start_time": "2021-04-21T15:47:05.112517Z"
}
},
"metadata": {},
"outputs": [],
"source": [
"allinone.show(n=3,truncate=False)"
...
...
%% Cell type:markdown id: tags:
# Test access of Spark via PySpark
%% Cell type:markdown id: tags:
This notebook installs the data-analytics package
and tests the basic functionality.
In order to run it in SWAN, follow these steps:
<br>
1. Start the SWAN-Spark cluster connection
   - the Star icon in the toolbar
1. Install the data-analytics package, using a specific tag of the repository (in this example v0.4rc5)
   - NB: this can take up to 10 minutes, be patient. It needs to be done only once, if not already done
1. Run the rest of the notebook
   - to extract data from HDFS MONIT and build a derived dataframe
%% Cell type:code id: tags:
```python
import os, sys
```
%% Cell type:markdown id: tags:
## Install the package (if not done already)
%% Cell type:code id: tags:
```python
!pip install --user git+https://:@gitlab.cern.ch:8443/cloud-infrastructure/data-analytics.git@v0.4rc5
!pip3 install --user git+https://:@gitlab.cern.ch:8443/cloud-infrastructure/data-analytics.git
```
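Since the install only needs to be done once, a hedged way to check beforehand whether the package is already present is sketched below; the top-level module name `etl` is taken from the import used later in this notebook.

```python
# Hedged sketch: check whether the data-analytics package is already installed
# for this user before re-running the pip cell above. The top-level module name
# 'etl' comes from the `from etl.spark_etl import cluster_utils` import below.
import importlib.util

if importlib.util.find_spec("etl") is None:
    print("data-analytics not found: run the pip install cell above")
else:
    print("data-analytics already installed: you can skip the install")
```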
%% Cell type:code id: tags:
```python
# Extend PATH
os.environ['PATH']=os.environ['HOME']+'.local/bin:'+os.environ['PATH']
os.environ['PATH']
```
%% Cell type:code id: tags:
```python
# Let's verify that the version is still 3.8
!python -V
```
%% Cell type:code id: tags:
```python
os.environ['PYTHONPATH']=os.environ['HOME']+'.local/lib/python3.8/site-packages/:'+os.environ['PYTHONPATH']
os.environ['PYTHONPATH']=os.environ['HOME']+'.local/lib/python3.8/site-packages:'+os.environ['PYTHONPATH']
# Extend PYTHONPATH
os.environ['PYTHONPATH']
```
%% Cell type:markdown id: tags:
# Access Spark using the cluster_utils lib
%% Cell type:code id: tags:
```python
# NB: if the library doesn't get loaded
# the first time after installation,
# please restart your notebook
from etl.spark_etl import cluster_utils
```
%% Cell type:code id: tags:
```python
# This spark config comes from SWAN (did you start the cluster by pushing the Star button?!)
swan_spark_conf.getAll()
```
%% Cell type:code id: tags:
```python
# Let's access Spark, spark context and configuration (the one above)
sc, spark, conf = cluster_utils.set_spark(swan_spark_conf)
```
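Outside SWAN there is no `swan_spark_conf` object injected into the notebook. As a hedged sketch (not part of this example), a plain local SparkSession for testing could be created like this; the app name and local master are illustrative assumptions.

```python
# Hedged sketch: a plain local SparkSession for testing outside SWAN, where no
# swan_spark_conf is injected. App name and local master are illustrative.
from pyspark.sql import SparkSession

spark_local = (SparkSession.builder
               .appName("data-analytics-local-test")
               .master("local[*]")
               .getOrCreate())
sc_local = spark_local.sparkContext
```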
%% Cell type:code id: tags:
```python
sc
```
%% Cell type:code id: tags:
```python
spark
```
%% Cell type:markdown id: tags:
# Test data extraction
%% Cell type:markdown id: tags:
In this example we access the rally data, extract a subset of the data,
and store the results in a different outpath (one file per day).
%% Cell type:code id: tags:
```python
#Configuration PARAMS
#Define configuration parameters
#output dir in user HDFS area
outbasepath="test_rally_errors"
#output base file name
outbasefile="rally_errors.parquet"
#input file path with data to process with spark
inbasepath="/project/monitoring/archive/openstack/logs/generic/rallytester/2021/0*/01"
inbasepath="/project/monitoring/archive/openstack/logs/generic/rallytester/2021/*/01"
#schema file
schemafile='rally_schema.json'
```
%% Cell type:code id: tags:
```python
! hdfs dfs -test -d $outbasepath; [[ "$?" -eq "0" ]] && echo "have to remove folder $outbasepath" && hdfs dfs -rm -r $outbasepath
```
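The same cleanup can also be expressed from Python instead of a shell one-liner; a hedged sketch using the `hdfs` CLI through `subprocess` (still assumes the CLI is on the PATH):

```python
# Hedged sketch: remove a previous output folder from Python instead of the
# shell one-liner above, still relying on the hdfs CLI being available.
import subprocess

if subprocess.run(['hdfs', 'dfs', '-test', '-d', outbasepath]).returncode == 0:
    print("have to remove folder %s" % outbasepath)
    subprocess.run(['hdfs', 'dfs', '-rm', '-r', outbasepath])
```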
%% Cell type:code id: tags:
```python
# Generate the data schema, using a single day example
# in general, if enough data are collected in a single day
# it is representative of the whole data structure expected
# in the other days
spark_df = cluster_utils.get_schema(spark, '/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01', schemafile)  #<<<< If you do not have a schema file pre-defined
```
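For context, a hedged sketch of what this schema-derivation step can look like with the standard PySpark API alone; this is not the `cluster_utils.get_schema` implementation, just the underlying idea.

```python
# Hedged sketch: infer the schema from one day of data with plain PySpark and
# persist it as JSON. Not the cluster_utils.get_schema implementation.
sample_day = '/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01'
sample_df = spark.read.json(sample_day)      # let Spark infer the schema
with open(schemafile, 'w') as f:
    f.write(sample_df.schema.json())         # save it for later runs
```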
%% Cell type:code id: tags:
```python
# Load the schema from the file
logschema = cluster_utils.load_schema(schemafile)
```
%% Cell type:code id: tags:
```python
# Print schema
logschema
```
%% Cell type:code id: tags:
```python
# Discover the files in the input data pattern
print(inbasepath)
dirList = cluster_utils.get_list_dirs(inbasepath)
print(dirList)
```
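As a hedged alternative sketch, the same directory discovery can be done by shelling out to the `hdfs` CLI (raw listing only, no parsing); this is not how `cluster_utils.get_list_dirs` is implemented.

```python
# Hedged sketch: list the directories matching the input pattern with the hdfs
# CLI instead of cluster_utils.get_list_dirs (raw listing, no parsing).
import subprocess

listing = subprocess.run(['hdfs', 'dfs', '-ls', '-d', inbasepath],
                         capture_output=True, text=True)
print(listing.stdout)
```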
%% Cell type:code id: tags:
```python
# Discover existing output
print("outbasepath is " + outbasepath)
outDirList = cluster_utils.get_list_dirs("%s/*" % outbasepath)
print("file content is %s" % outDirList)
```
%% Cell type:code id: tags:
```python
```

%% Cell type:code id: tags:
```python
# User defined function to extract data and manipulate it
# This is the effective ETL
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def getSparkDF(inpath, outpath):
    global spark
    global schemafile
    #load the schema
    logschema = cluster_utils.load_schema(schemafile)
    nschema = json.loads(open(schemafile, 'r').read())
    logschema = StructType.fromJson(json.loads(nschema))

    #logschema = cluster_utils.load_schema(schemafile)
    #load data
    full_df = spark.read.json(inpath, schema=logschema)
    #prepare for sql
    full_df.createOrReplaceTempView("tfull")

    #
    filtered_df = spark.sql("select metadata.timestamp as atime, metadata.hostgroup as hostgroup,\
    output_df = spark.sql("select metadata.timestamp as atime, metadata.hostgroup as hostgroup,\
                metadata.environment, metadata.host as host, data.process as process,\
                data.short_task as task, data.deployment as deployment,\
                data.raw,\
                data.source as source, data.log_source as log_source, metadata._id as uid\
                from tfull where data.log_level == 'ERROR' and data.log_source == 'rallytester.rallytester' ")
    ##############
    ### Example of manipulation of strings
    ##############

    #extract raw substring
    # rawsubstring_regex = lambda x: F.regexp_replace(x,'.* (ERROR rallytester\.rallytester)\s\[[^\]]*\]\s\[[^\]]*\]\s(Task failed:\s*|)(.*)','$3')

    # #extract json
    # jsonmessage_regex = lambda x: F.regexp_replace(x,'Resource (.*)\'message\': u(\'|")(.*)(\'|"), u\'code\'(.*)','$3')

    # #replace https long strings
    # httpreplace_regex = lambda x : F.regexp_replace(x,'(.*)(https:\/\/(([^\s\/]*\/)))([^\s]*)(?<=\w)(?=\W)(.*)','$1 $2**** $6')

    # #replace ips
    # ipreplace_regex = lambda x : F.regexp_replace(x,'(.*)[^\d]([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})(.*)','$1 xyz.xyz.xyz.xyz $3')

    # #replace UIDs
    # idreplace_regex_1 = lambda x : F.regexp_replace(x,'(.*)rally-([\w-\:]{8,}[>]?)(.*)','$1 rally-xxxx $3')
    # idreplace_regex_2 = lambda x : F.regexp_replace(x,'(.*)(req-[\w-\:]{12,})(.*)','$1 xxxx $3')
    # idreplace_regex_3 = lambda x : F.regexp_replace(x,'(.*)([/|\s][\w-\:,]{12,}[/|\s])(.*)','$1 xxxx $3')
    # idreplace_regex_4 = lambda x : F.regexp_replace(x,'(.*)\s([\w-]{12,})(.*)','$1 xxxx $3')

    # idreplace_regex = lambda x : idreplace_regex_4(idreplace_regex_3(idreplace_regex_2(idreplace_regex_2(idreplace_regex_1(x)))))
    # all_replaces = lambda x:idreplace_regex(ipreplace_regex(httpreplace_regex(jsonmessage_regex(rawsubstring_regex(x)))))

    # regex_df = filtered_df.filter(
    #     ~filtered_df.raw.contains("Error executing rally task. Check the deployment is correctly configured.")
    # ).withColumn('msg',all_replaces('raw'))

    #regex_df = filtered_df.withColumn('msg',all_replaces('raw'))

    #regex_df.show()
    #output_df = regex_df
    #moved at python level
    output_df = filtered_df
                from tfull where data.log_level == 'ERROR' ")

    cluster_utils.write_spark_df(output_df,outpath)
    df_empty = (len(output_df.head(1))==0)
    print("Is the dataframe empty? %s" % df_empty)
    if not df_empty:
        print("Saving dataframe")
        cluster_utils.write_spark_df(output_df,outpath)
    return output_df
```
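As a concrete illustration of the commented-out string manipulations above, a hedged sketch that applies only the IP-masking `regexp_replace`; `error_df` is a placeholder for any dataframe with a `raw` column (for example the one returned by `getSparkDF`), and the `msg` column name is illustrative.

```python
# Hedged sketch: apply one of the commented-out manipulations above, masking
# IPv4 addresses in the 'raw' column into a new 'msg' column. 'error_df' stands
# for any dataframe with a 'raw' column, e.g. the one returned by getSparkDF.
from pyspark.sql import functions as F

ip_pattern = r'(.*)[^\d]([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})(.*)'
masked_df = error_df.withColumn('msg', F.regexp_replace('raw', ip_pattern, '$1 xyz.xyz.xyz.xyz $3'))
masked_df.select('raw', 'msg').show(n=3, truncate=False)
```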
%% Cell type:code id: tags:
```python
# Loop on the day folders, discover files, skip days that have been already processed (or force rewrite)
# run day per day the filter (getSparkDF), and store the filtered data in files in the outputbasepath dir
cluster_utils.loop_on_dates(getSparkDF, inbasepath, outbasepath, force_rewrite=True, debug=True)
```
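For readers who want to understand the flow, a rough hedged sketch of the idea behind `loop_on_dates` follows; this is not the actual `cluster_utils` code, and the per-day output naming is an assumption. Only the behaviour described in the comment above (process day by day, skip existing outputs unless a rewrite is forced) is taken from this notebook.

```python
# Hedged sketch of the idea behind loop_on_dates, NOT the cluster_utils code:
# run the ETL once per matching input day folder, skipping days whose output
# already exists unless a rewrite is forced. Output naming is an assumption.
def loop_on_dates_sketch(etl_func, inbase, outbase, force_rewrite=False, debug=False):
    existing = cluster_utils.get_list_dirs("%s/*" % outbase)
    for day_path in cluster_utils.get_list_dirs(inbase):
        outpath = "%s/%s" % (outbase, day_path.strip('/').replace('/', '_'))
        if outpath in existing and not force_rewrite:
            if debug:
                print("skipping already processed day %s" % day_path)
            continue
        if debug:
            print("processing %s -> %s" % (day_path, outpath))
        etl_func(day_path, outpath)
```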
%% Cell type:code id: tags:
```python
# Discover existing output
print("outbasepath is " + outbasepath)
outDirList = cluster_utils.get_list_dirs("%s/*" % outbasepath)
print("file content is %s" % outDirList)
```
%% Cell type:code id: tags:
```python
#loopOnDates writes in the user HDFS area
#now let's read it back
allinone = spark.read.parquet(outbasepath + "/*")
```
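A couple of standard PySpark calls are handy to sanity-check what was read back, for example:

```python
# Standard PySpark sanity checks on the data read back from HDFS.
allinone.printSchema()
print("number of rows: %d" % allinone.count())
```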
%% Cell type:code id: tags:
```python
allinone.show(n=3, truncate=False)
```
%% Cell type:code id: tags:
```python
```

%% Cell type:code id: tags:

```python
```
...
...