cloud-infrastructure / data-analytics · Commits · e5a6e742

Commit e5a6e742 authored Apr 22, 2021 by Domenico Giordano

make sure out folders are empty

parent c7a92926
Changes 3
tests/spark_etl/integration/test_notebook_spark_connector.ipynb  View file @ e5a6e742
...
...
@@ -107,7 +107,7 @@
"#Define configuration parameters \n",
"\n",
"#output dir in user HDFS area\n",
"outbasepath=\"test_rally_errors\"\n",
"outbasepath=\"test_rally_errors
_ipynb
\"\n",
"#output base file name\n",
"outbasefile=\"rally_errors.parquet\"\n",
"#input file path with data to process with spark\n",
...
...
%% Cell type:markdown id: tags:
# Test access to Spark via pySpark
%% Cell type:markdown id: tags:
This notebook installs the data-analytics package
and tests its basic functionalities.
In order to run it in SWAN, follow these steps:
1) pass your Kerberos credentials
2) install the package, using a specific tag (in this example, qa; see the sketch below)
3) run the notebook
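A minimal sketch of step 2, assuming the package is installed with pip from this GitLab project and pinned to the tag; the repository URL and the `--user` install target are assumptions, not shown in the original notebook:
```
python
# Hypothetical install cell for step 2: pin a specific git tag (here 'qa').
# The repository URL is assumed from the project path of this commit.
import sys
!{sys.executable} -m pip install --user git+https://gitlab.cern.ch/cloud-infrastructure/data-analytics.git@qa
```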
%% Cell type:code id: tags:
```
python
import os, sys
os.environ['PYTHONPATH'] = os.environ['HOME'] + '.local/lib/python3.6/site-packages/:' + os.environ['PYTHONPATH']
```
%% Cell type:markdown id: tags:
# Test package
%% Cell type:code id: tags:
```
python
from etl.spark_etl import cluster_utils
```
%% Cell type:code id: tags:
```
python
# Start the spark session
sc, spark, conf = cluster_utils.set_spark()
```
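An optional sanity check, not part of the original notebook, to confirm that the session handed back by set_spark() is usable:
```
python
# Print the Spark version and the master this SparkContext is attached to
# (assumes sc is a SparkContext and spark a SparkSession, as the names suggest)
print(spark.version)
print(sc.master)
```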
%% Cell type:markdown id: tags:
# Test data extraction
%% Cell type:markdown id: tags:
In this example we access the Rally data, extract a subset of the data,
and store the results in a different output path (one file per day).
%% Cell type:code id: tags:
```
python
#Configuration PARAMS
#Define configuration parameters
#output dir in user HDFS area
outbasepath="test_rally_errors"
outbasepath="test_rally_errors_ipynb"
#output base file name
outbasefile="rally_errors.parquet"
#input file path with data to process with spark
inbasepath="/project/monitoring/archive/openstack/logs/generic/rallytester/2021/0*/01"
#schema file
schemafile='rally_schema.json'
```
%% Cell type:code id: tags:
```
python
!hdfs dfs -ls /project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01
```
%% Cell type:code id: tags:
```
python
cluster_utils.get_list_dirs('/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01')
```
%% Cell type:code id: tags:
```
python
full_df = spark.read.json('/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01')
```
%% Cell type:code id: tags:
```
python
```
%% Cell type:code id: tags:
```
python
# Generate the data schema, using a single day as example;
# in general, if enough data are collected in a single day,
# they are representative of the data structure expected
# on the other days
spark_df = cluster_utils.get_schema(spark,
                                    '/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01',
                                    schemafile)  # <<<< use this if you do not have a pre-defined schema file
```
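cluster_utils.get_schema and load_schema belong to the package under test, so their exact behaviour is not shown here. A minimal sketch of the underlying idea with plain PySpark, assuming the schema file simply stores the StructType as JSON (an assumption, not the package's documented format):
```
python
# Sketch only: infer a schema from one sample day and persist it as JSON
import json
from pyspark.sql.types import StructType

sample_df = spark.read.json('/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01')
with open('rally_schema.json', 'w') as f:
    json.dump(sample_df.schema.jsonValue(), f)

# Reading it back yields a StructType usable in spark.read.json(..., schema=...)
with open('rally_schema.json') as f:
    loaded_schema = StructType.fromJson(json.load(f))
```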
%% Cell type:code id: tags:
```
python
# Load the schema from the file
logschema = cluster_utils.load_schema(schemafile)
```
%% Cell type:code id: tags:
```
python
# Print schema
logschema
```
%% Cell type:code id: tags:
```
python
# Discover the files in the input data pattern
print(inbasepath)
dirList = cluster_utils.get_list_dirs(inbasepath)
print(dirList)
```
%% Cell type:code id: tags:
```
python
# Discover existing output
print("outbasepath is " + outbasepath)
outDirList = cluster_utils.get_list_dirs("%s/*"%outbasepath)
print("file content is %s" % outDirList)
```
%% Cell type:code id: tags:
```
python
```
%% Cell type:code id: tags:
```
python
# User defined function to extract data and manipulate it
# This is the effective ETL
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def getSparkDF(inpath, outpath):
    global spark
    global schemafile
    #load the schema
    logschema = cluster_utils.load_schema(schemafile)
    #load data
    full_df = spark.read.json(inpath, schema=logschema)
    #prepare for sql
    full_df.createOrReplaceTempView("tfull")
    #keep only the rallytester ERROR records, with the metadata needed downstream
    filtered_df = spark.sql("select metadata.timestamp as atime, metadata.hostgroup as hostgroup,\
        metadata.environment, metadata.host as host, data.process as process,\
        data.short_task as task, data.deployment as deployment,\
        data.raw,\
        data.source as source, data.log_source as log_source, metadata._id as uid\
        from tfull where data.log_level == 'ERROR' and data.log_source == 'rallytester.rallytester' ")
    ##############
    ### Example of manipulation of strings
    ##############
    #extract raw substring
    # rawsubstring_regex = lambda x: F.regexp_replace(x,'.* (ERROR rallytester\.rallytester)\s\[[^\]]*\]\s\[[^\]]*\]\s(Task failed:\s*|)(.*)','$3')
    # #extract json
    # jsonmessage_regex = lambda x: F.regexp_replace(x,'Resource (.*)\'message\': u(\'|")(.*)(\'|"), u\'code\'(.*)','$3')
    # #replace https long strings
    # httpreplace_regex = lambda x : F.regexp_replace(x,'(.*)(https:\/\/(([^\s\/]*\/)))([^\s]*)(?<=\w)(?=\W)(.*)','$1 $2**** $6')
    # #replace ips
    # ipreplace_regex = lambda x : F.regexp_replace(x,'(.*)[^\d]([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})(.*)','$1 xyz.xyz.xyz.xyz $3')
    # #replace UIDs
    # idreplace_regex_1 = lambda x : F.regexp_replace(x,'(.*)rally-([\w-\:]{8,}[>]?)(.*)','$1 rally-xxxx $3')
    # idreplace_regex_2 = lambda x : F.regexp_replace(x,'(.*)(req-[\w-\:]{12,})(.*)','$1 xxxx $3')
    # idreplace_regex_3 = lambda x : F.regexp_replace(x,'(.*)([/|\s][\w-\:,]{12,}[/|\s])(.*)','$1 xxxx $3')
    # idreplace_regex_4 = lambda x : F.regexp_replace(x,'(.*)\s([\w-]{12,})(.*)','$1 xxxx $3')
    # idreplace_regex = lambda x : idreplace_regex_4(idreplace_regex_3(idreplace_regex_2(idreplace_regex_2(idreplace_regex_1(x)))))
    # all_replaces = lambda x:idreplace_regex(ipreplace_regex(httpreplace_regex(jsonmessage_regex(rawsubstring_regex(x)))))
    # regex_df = filtered_df.filter(
    #     ~filtered_df.raw.contains("Error executing rally task. Check the deployment is correctly configured.")
    # ).withColumn('msg',all_replaces('raw'))
    #regex_df = filtered_df.withColumn('msg',all_replaces('raw'))
    #regex_df.show()
    #output_df = regex_df
    #string manipulation moved to the python level
    output_df = filtered_df
    cluster_utils.write_spark_df(output_df, outpath)
    return output_df
```
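The commented-out block above chains several regexp_replace lambdas to normalise the raw error messages. As a minimal, hedged illustration of that pattern (one simplified regular expression, not the author's exact chain):
```
python
# Illustration only: mask IPv4 addresses in the 'raw' column with a single regexp_replace
from pyspark.sql import functions as F

def mask_ips(df):
    return df.withColumn(
        'msg', F.regexp_replace('raw', r'\b(\d{1,3}\.){3}\d{1,3}\b', 'xyz.xyz.xyz.xyz'))

# e.g. output_df = mask_ips(filtered_df) inside getSparkDF
```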
%% Cell type:code id: tags:
```
python
# Loop over the day folders, discover files, skip days that have already been processed (or force a rewrite),
# run the filter (getSparkDF) day by day, and store the filtered data in files under the outbasepath dir
cluster_utils.loop_on_dates(getSparkDF, inbasepath, outbasepath, force_rewrite=True, debug=True)
```
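loop_on_dates also belongs to the package under test, so its exact implementation is not visible here. A hedged sketch of the skip-or-rewrite logic it presumably applies, built only on the helpers used elsewhere in this notebook (the folder naming convention is a guess):
```
python
# Sketch only: pair each discovered input day folder with an output folder,
# skip days whose output already exists unless force_rewrite is set
def loop_on_dates_sketch(process, inbasepath, outbasepath, force_rewrite=False, debug=False):
    existing = cluster_utils.get_list_dirs("%s/*" % outbasepath)
    for inpath in cluster_utils.get_list_dirs(inbasepath):
        day = inpath.rstrip('/').split('/')[-1]   # hypothetical naming convention
        outpath = "%s/%s" % (outbasepath, day)
        if any(e.rstrip('/').endswith('/' + day) for e in existing) and not force_rewrite:
            if debug:
                print("skipping already processed folder %s" % outpath)
            continue
        process(inpath, outpath)
```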
%% Cell type:code id: tags:
```
python
# Discover existing output
print("outbasepath is " + outbasepath)
outDirList = cluster_utils.get_list_dirs("%s/*"%outbasepath)
print("file content is %s" % outDirList)
```
%% Cell type:code id: tags:
```
python
#loop_on_dates writes in the user HDFS area;
#now let's read it back
allinone = spark.read.parquet(outbasepath + "/*")
```
%% Cell type:code id: tags:
```
python
allinone.show(n=3,truncate=False)
```
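A possible follow-up, not in the original notebook, to check what was collected (the column names come from the SQL select in getSparkDF):
```
python
# Count the stored error records per hostgroup, largest first
allinone.groupBy('hostgroup').count().orderBy('count', ascending=False).show(10, truncate=False)
```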
%% Cell type:code id: tags:
```
python
```
%% Cell type:code id: tags:
```
python
```
%% Cell type:code id: tags:
```
python
```
%% Cell type:code id: tags:
```
python
```
%% Cell type:code id: tags:
```
python
```
%% Cell type:code id: tags:
```
python
```
...
...
tests/spark_etl/integration/test_spark_connector.py  View file @ e5a6e742
...
...
@@ -122,7 +122,7 @@ def run_tests():
# Define configuration parameters
# ################################
# output dir in user HDFS area
-outbasepath = "test_rally_errors"
+outbasepath = "test_rally_errors_py"
# input file path with data to process with spark
inbasepath = "/project/monitoring/archive/openstack/logs/generic/rallytester/2021/0*/0[1-2]"  # noqa: E501
# schema file
...
...
tests/spark_etl/integration/test_spark_connector.sh  View file @ e5a6e742
...
...
@@ -47,6 +47,8 @@ hdfs dfs -ls /project/monitoring || fail 'test access to hdfs'
echo -e "\n@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
echo -e "\ntest python pyspark module\n"
echo -e "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
+outbasepath="test_rally_errors_py"
+hdfs dfs -test -d $outbasepath; [[ "$?" -eq "0" ]] && echo "have to remove folder $outbasepath" && hdfs dfs -rm -r $outbasepath
python3 ${WORK_DIR}/test_spark_connector.py
# This still does not work in the swan image (issue with nbconvert libraries)
...
...
@@ -54,4 +56,6 @@ python3 ${WORK_DIR}/test_spark_connector.py
#echo -e "\ntest jupyter notebook\n"
jupyter nbconvert --version
#echo -e "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
+outbasepath="test_rally_errors_ipynb"
+hdfs dfs -test -d $outbasepath; [[ "$?" -eq "0" ]] && echo "have to remove folder $outbasepath" && hdfs dfs -rm -r $outbasepath
jupyter nbconvert --ExecutePreprocessor.timeout=600 --to notebook --execute ${WORK_DIR}/test_notebook_spark_connector.ipynb