cloud-infrastructure / data-analytics · Commits
Commit 5f3879ee authored May 03, 2021 by Domenico Giordano
reduce size of dataset
parent 229c115a
Changes 2
tests/spark_etl/integration/test_notebook_spark_connector.ipynb
%% Cell type:markdown id: tags:
# Test access to Spark via PySpark
%% Cell type:markdown id: tags:
This notebook installs the data-analytics package
and tests its basic functionality.
In order to run it in SWAN, follow these steps:
1) pass your Kerberos credentials
2) install the package, using a specific tag (qa in this example), as sketched below
3) run the notebook
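A minimal sketch of step 2, assuming a user-space pip install directly from this repository (the repository URL below is an assumption and qa is the example tag mentioned above; adapt both to your environment):

```
python
# Sketch only: install the package in user space at a given tag.
# The repository URL is an assumption; qa is the example tag from the steps above.
!pip install --user git+https://gitlab.cern.ch/cloud-infrastructure/data-analytics.git@qa
```

A user-space install of this kind is the reason the next cell prepends ~/.local/lib/python3.6/site-packages to PYTHONPATH.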
%% Cell type:code id: tags:
```
python
import os, sys
os.environ['PYTHONPATH'] = os.environ['HOME'] + '/.local/lib/python3.6/site-packages/:' + os.environ['PYTHONPATH']
```
%% Cell type:markdown id: tags:
# Test package
%% Cell type:code id: tags:
```
python
from etl.spark_etl import cluster_utils
```
%% Cell type:code id: tags:
```
python
# Start the spark session
sc, spark, conf = cluster_utils.set_spark()
```
%% Cell type:markdown id: tags:
# Test data extraction
%% Cell type:markdown id: tags:
In this example we access the rally data, extract a subset of the data,
and store the results in a different output path (one file per day).
%% Cell type:code id: tags:
```
python
# Configuration parameters
# Output dir in the user HDFS area
outbasepath = "test_rally_errors_ipynb"
# Output base file name
outbasefile = "rally_errors.parquet"
# Input file path with the data to process with Spark
inbasepath = "/project/monitoring/archive/openstack/logs/generic/rallytester/2021/0*/01"
inbasepath = "/project/monitoring/archive/openstack/logs/generic/rallytester/2021/0[2-3]/01"
# Schema file
schemafile = 'rally_schema.json'
```
%% Cell type:code id: tags:
```
python
!hdfs dfs -ls /project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01
```
%% Cell type:code id: tags:
```
python
cluster_utils.get_list_dirs('/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01')
```
%% Cell type:code id: tags:
```
python
full_df = spark.read.json('/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01')
```
%% Cell type:code id: tags:
```
python
```
%% Cell type:code id: tags:
```
python
# Generate the data schema, using a single day as an example:
# in general, if enough data are collected in a single day,
# they are representative of the whole data structure expected
# on the other days
spark_df = cluster_utils.get_schema(spark,
                                    '/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01',
                                    schemafile)  # <<<< run this if you do not have a pre-defined schema file
```
%% Cell type:code id: tags:
```
python
# Load the schema from the file
logschema = cluster_utils.load_schema(schemafile)
```
%% Cell type:code id: tags:
```
python
# Print schema
logschema
```
%% Cell type:code id: tags:
```
python
# Discover the files in the input data pattern
print(inbasepath)
dirList = cluster_utils.get_list_dirs(inbasepath)
print(dirList)
```
%% Cell type:code id: tags:
```
python
# Discover existing output
print("outbasepath is " + outbasepath)
outDirList = cluster_utils.get_list_dirs("%s/*" % outbasepath)
print("file content is %s" % outDirList)
```
%% Cell type:code id: tags:
```
python
```
%% Cell type:code id: tags:
```
python
# User-defined function to extract the data and manipulate it
# This is the effective ETL
from pyspark.sql import functions as F
from pyspark.sql.window import Window
def getSparkDF(inpath,outpath):
global spark
global schemafile
#load the schema
logschema = cluster_utils.load_schema(schemafile)
#load data
full_df = spark.read.json(inpath,schema=logschema)
#prepare for sql
full_df.createOrReplaceTempView("tfull")
#
filtered_df = spark.sql("select metadata.timestamp as atime, metadata.hostgroup as hostgroup,\
metadata.environment, metadata.host as host, data.process as process,\
data.short_task as task, data.deployment as deployment,\
data.raw,\
data.source as source, data.log_source as log_source, metadata._id as uid\
from tfull where data.log_level == 'ERROR' ")
##############
### Example of manipulation of strings
##############
#extract raw substring
# rawsubstring_regex = lambda x: F.regexp_replace(x,'.* (ERROR rallytester\.rallytester)\s\[[^\]]*\]\s\[[^\]]*\]\s(Task failed:\s*|)(.*)','$3')
# #extract json
# jsonmessage_regex = lambda x: F.regexp_replace(x,'Resource (.*)\'message\': u(\'|")(.*)(\'|"), u\'code\'(.*)','$3')
# #replace https long strings
# httpreplace_regex = lambda x : F.regexp_replace(x,'(.*)(https:\/\/(([^\s\/]*\/)))([^\s]*)(?<=\w)(?=\W)(.*)','$1 $2**** $6')
# #replace ips
# ipreplace_regex = lambda x : F.regexp_replace(x,'(.*)[^\d]([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})(.*)','$1 xyz.xyz.xyz.xyz $3')
# #replace UIDs
# idreplace_regex_1 = lambda x : F.regexp_replace(x,'(.*)rally-([\w-\:]{8,}[>]?)(.*)','$1 rally-xxxx $3')
# idreplace_regex_2 = lambda x : F.regexp_replace(x,'(.*)(req-[\w-\:]{12,})(.*)','$1 xxxx $3')
# idreplace_regex_3 = lambda x : F.regexp_replace(x,'(.*)([/|\s][\w-\:,]{12,}[/|\s])(.*)','$1 xxxx $3')
# idreplace_regex_4 = lambda x : F.regexp_replace(x,'(.*)\s([\w-]{12,})(.*)','$1 xxxx $3')
# idreplace_regex = lambda x : idreplace_regex_4(idreplace_regex_3(idreplace_regex_2(idreplace_regex_2(idreplace_regex_1(x)))))
# all_replaces = lambda x:idreplace_regex(ipreplace_regex(httpreplace_regex(jsonmessage_regex(rawsubstring_regex(x)))))
# regex_df = filtered_df.filter(
# ~filtered_df.raw.contains("Error executing rally task. Check the deployment is correctly configured.")
# ).withColumn('msg',all_replaces('raw'))
#regex_df = filtered_df.withColumn('msg',all_replaces('raw'))
#regex_df.show()
#output_df = regex_df
#moved at python level
output_df = filtered_df
cluster_utils.write_spark_df(output_df,outpath)
return output_df
```
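Before launching the full loop in the next cell, getSparkDF can be spot-checked on a single day. This is an illustrative sketch: the input path is the single-day folder used earlier, while the output subdirectory name is a hypothetical choice for the check, not necessarily the layout that loop_on_dates produces.

```
python
# Illustrative single-day run of the ETL function defined above.
# 'single_day_check' is a hypothetical output subdirectory for this test.
sample_df = getSparkDF(
    '/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01',
    outbasepath + '/single_day_check')
sample_df.show(n=3, truncate=False)
```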
%% Cell type:code id: tags:
```
python
# Loop over the day folders, discover the files, skip days that have already been processed (or force a rewrite),
# run the filter (getSparkDF) day by day, and store the filtered data in files under the outbasepath dir
cluster_utils.loop_on_dates(getSparkDF, inbasepath, outbasepath, force_rewrite=True, debug=True)
```
%% Cell type:code id: tags:
```
python
# Discover existing output
print("outbasepath is " + outbasepath)
outDirList = cluster_utils.get_list_dirs("%s/*" % outbasepath)
print("file content is %s" % outDirList)
```
%% Cell type:code id: tags:
```
python
# loop_on_dates writes to the user HDFS area
# now let's read it back
allinone = spark.read.parquet(outbasepath + "/*")
```
%% Cell type:code id: tags:
```
python
allinone.show(n=3,truncate=False)
```
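As a small usage sketch of the data just read back (the column names come from the select statement in getSparkDF above), the extracted error records can be aggregated, for example per hostgroup:

```
python
# Usage sketch: count the extracted error records per hostgroup
# (the 'hostgroup' column is defined in the select inside getSparkDF).
allinone.groupBy('hostgroup').count().orderBy('count', ascending=False).show(truncate=False)
```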
%% Cell type:code id: tags:
```
python
```
%% Cell type:code id: tags:
```
python
```
%% Cell type:code id: tags:
```
python
```
%% Cell type:code id: tags:
```
python
```
%% Cell type:code id: tags:
```
python
```
%% Cell type:code id: tags:
```
python
```
...
...
tests/spark_etl/integration/test_spark_connector.py
...
...
@@ -127,7 +127,7 @@ def run_tests():
     # output dir in user HDFS area
     outbasepath = "test_rally_errors_py"
     # input file path with data to process with spark
-    inbasepath = "/project/monitoring/archive/openstack/logs/generic/rallytester/2021/0*/0[1-2]"  # noqa: E501
+    inbasepath = "/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/0[1-2]"  # noqa: E501
     # schema file
     schemafile = 'rally_schema.json'
...
...