Commit 68b005a2 authored by Domenico Giordano's avatar Domenico Giordano
parents 98b521e9 f859632c
@@ -12,6 +12,7 @@
#
# In order to run the same script manually, assuming only Docker is available, run
#
# CI_PROJECT_DIR=`pwd | sed -e 's@/tests/spark_etl@@'`
# docker run --rm -e CI_USER=$CI_USER -e CI_USER_PASSWD=$CI_USER_PASSWD -e CI_PROJECT_DIR=${CI_PROJECT_DIR} -v /tmp:/tmp -v /builds:/builds -v `pwd`:/work -v /var/run/docker.sock:/var/run/docker.sock gitlab-registry.cern.ch/cloud-infrastructure/data-analytics/compose:qa /work/tests/spark_etl/ci_test_script.sh
#
# Consider opening the Spark connection ports in iptables, e.g. as sketched below
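#
# A possible sketch (assumption: the Spark driver/blockManager/UI ports used here
# fall in the 5001-5300 range reported in the notebook output below):
#
# iptables -I INPUT -p tcp --dport 5001:5300 -j ACCEPT
#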
......
%% Cell type:markdown id: tags:
# Test access to Spark via PySpark
%% Cell type:markdown id: tags:
This notebook installs the data-analytics package
and tests its basic functionality.
In order to run it in SWAN, follow these steps:
1) pass your Kerberos credentials
2) install the package, using a specific tag (qa in this example)
3) run the notebook
%% Cell type:markdown id: tags:
## Kerberos credentials
%% Cell type:code id: tags:
``` python
import getpass
import os, sys
print("Please enter your kerberos password")
ret = os.system("echo \"%s\" | kinit" % getpass.getpass())
if ret == 0: print("Credentials created successfully")
else: sys.stderr.write('Error creating credentials, return code: %s\n' % ret)
```
%% Output
Please enter your password
········
Credentials created successfully
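%% Cell type:markdown id: tags:
As an optional sanity check (a small sketch, not part of the original flow), the ticket just obtained can be listed with the standard `klist` command:
%% Cell type:code id: tags:
``` python
# Optional: verify that a Kerberos ticket was actually obtained
!klist
```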
%% Cell type:markdown id: tags:
## Install the package (if not done already)
%% Cell type:code id: tags:
``` python
!pip install --user git+https://:@gitlab.cern.ch:8443/cloud-infrastructure/data-analytics.git@feature/spark
```
%% Cell type:markdown id: tags:
# Test package
%% Cell type:code id: tags:
``` python
from etl.spark_etl import cluster_utils
```
%% Cell type:code id: tags:
``` python
swan_spark_conf.getAll()
```
%% Output
dict_items([('spark.extraListeners', 'sparkmonitor.listener.JupyterSparkMonitorListener'), ('spark.driver.extraClassPath', '/usr/local/lib/swan/extensions/sparkmonitor/listener.jar')])
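%% Cell type:markdown id: tags:
`swan_spark_conf` is injected by the SWAN Spark connector, so it is not defined outside SWAN. A minimal hand-built equivalent (a sketch; the master, memory and logConf values are taken from the configuration dump shown below) could be passed to `cluster_utils.set_spark` in its place:
%% Cell type:code id: tags:
``` python
# Sketch: build a SparkConf by hand when swan_spark_conf is not provided by SWAN
from pyspark import SparkConf

spark_conf = SparkConf() \
    .setMaster('yarn') \
    .set('spark.driver.memory', '8g') \
    .set('spark.logConf', 'True')
spark_conf.getAll()
```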
%% Cell type:code id: tags:
``` python
# Test connection to Spark
sc, spark, conf = cluster_utils.set_spark(swan_spark_conf)
```
%% Output
dict_items([('spark.extraListeners', 'sparkmonitor.listener.JupyterSparkMonitorListener'), ('spark.driver.extraClassPath', '/usr/local/lib/swan/extensions/sparkmonitor/listener.jar:/eos/project/s/swan/public/hadoop-mapreduce-client-core-2.6.0-cdh5.7.6.jar'), ('spark.master', 'yarn'), ('spark.authenticate', 'True'), ('spark.network.crypto.enabled', 'True'), ('spark.authenticate.enableSaslEncryption', 'True'), ('spark.logConf', 'True'), ('spark.driver.host', 'swan006.cern.ch'), ('spark.driver.port', '5026'), ('spark.blockManager.port', '5244'), ('spark.ui.port', '5196'), ('spark.executorEnv.PYTHONPATH', '/usr/local/lib/swan/extensions/:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib/python3.6/site-packages'), ('spark.executorEnv.LD_LIBRARY_PATH', '/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib/python3.6/site-packages/tensorflow:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib/python3.6/site-packages/tensorflow/contrib/tensor_forest:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib/python3.6/site-packages/tensorflow/python/framework:/cvmfs/sft.cern.ch/lcg/releases/java/8u91-ae32f/x86_64-centos7-gcc8-opt/jre/lib/amd64:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib64:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib:/cvmfs/sft.cern.ch/lcg/releases/gcc/8.3.0-cebb0/x86_64-centos7/lib:/cvmfs/sft.cern.ch/lcg/releases/gcc/8.3.0-cebb0/x86_64-centos7/lib64:/cvmfs/sft.cern.ch/lcg/releases/binutils/2.30-e5b21/x86_64-centos7/lib:/usr/local/lib/:/cvmfs/sft.cern.ch/lcg/releases/R/3.5.3-883db/x86_64-centos7-gcc8-opt/lib64/R/library/readr/rcon'), ('spark.executorEnv.JAVA_HOME', '/cvmfs/sft.cern.ch/lcg/releases/java/8u91-ae32f/x86_64-centos7-gcc8-opt'), ('spark.executorEnv.SPARK_HOME', '/cvmfs/sft.cern.ch/lcg/releases/spark/2.4.3-cern2-82741/x86_64-centos7-gcc8-opt'), ('spark.executorEnv.SPARK_EXTRA_CLASSPATH', '/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/etc/hadoop:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/common/lib/*:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/common/*:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/hdfs:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/hdfs/lib/*:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/hdfs/*:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/yarn/lib/*:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/yarn/*:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/mapreduce/lib/*:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/mapreduce/*:/cvmfs/sft.cern.ch/lcg/releases/hadoop_xrootd/1.0.4-5a566/x86_64-centos7-gcc8-opt/lib/hadoop-xrootd.jar'), ('spark.driver.memory', '8g')])
%% Cell type:code id: tags:
``` python
sc
```
%% Output
<SparkContext master=yarn appName=pyspark-shell>
%% Cell type:code id: tags:
``` python
spark
```
%% Output
<pyspark.sql.session.SparkSession at 0x7f0cae0615c0>
%% Cell type:code id: tags:
``` python
# Test stopping spark session
cluster_utils.stop_spark(sc,spark)
```
%% Cell type:code id: tags:
``` python
# Start again the spark session
sc, spark, conf = cluster_utils.set_spark(swan_spark_conf)
```
%% Output
dict_items([('spark.extraListeners', 'sparkmonitor.listener.JupyterSparkMonitorListener'), ('spark.driver.extraClassPath', '/usr/local/lib/swan/extensions/sparkmonitor/listener.jar:/eos/project/s/swan/public/hadoop-mapreduce-client-core-2.6.0-cdh5.7.6.jar:/eos/project/s/swan/public/hadoop-mapreduce-client-core-2.6.0-cdh5.7.6.jar'), ('spark.master', 'yarn'), ('spark.authenticate', 'True'), ('spark.network.crypto.enabled', 'True'), ('spark.authenticate.enableSaslEncryption', 'True'), ('spark.logConf', 'True'), ('spark.driver.host', 'swan006.cern.ch'), ('spark.driver.port', '5026'), ('spark.blockManager.port', '5244'), ('spark.ui.port', '5196'), ('spark.executorEnv.PYTHONPATH', '/usr/local/lib/swan/extensions/:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib/python3.6/site-packages'), ('spark.executorEnv.LD_LIBRARY_PATH', '/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib/python3.6/site-packages/tensorflow:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib/python3.6/site-packages/tensorflow/contrib/tensor_forest:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib/python3.6/site-packages/tensorflow/python/framework:/cvmfs/sft.cern.ch/lcg/releases/java/8u91-ae32f/x86_64-centos7-gcc8-opt/jre/lib/amd64:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib64:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib:/cvmfs/sft.cern.ch/lcg/releases/gcc/8.3.0-cebb0/x86_64-centos7/lib:/cvmfs/sft.cern.ch/lcg/releases/gcc/8.3.0-cebb0/x86_64-centos7/lib64:/cvmfs/sft.cern.ch/lcg/releases/binutils/2.30-e5b21/x86_64-centos7/lib:/usr/local/lib/:/cvmfs/sft.cern.ch/lcg/releases/R/3.5.3-883db/x86_64-centos7-gcc8-opt/lib64/R/library/readr/rcon'), ('spark.executorEnv.JAVA_HOME', '/cvmfs/sft.cern.ch/lcg/releases/java/8u91-ae32f/x86_64-centos7-gcc8-opt'), ('spark.executorEnv.SPARK_HOME', '/cvmfs/sft.cern.ch/lcg/releases/spark/2.4.3-cern2-82741/x86_64-centos7-gcc8-opt'), ('spark.executorEnv.SPARK_EXTRA_CLASSPATH', '/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/etc/hadoop:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/common/lib/*:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/common/*:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/hdfs:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/hdfs/lib/*:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/hdfs/*:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/yarn/lib/*:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/yarn/*:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/mapreduce/lib/*:/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/share/hadoop/mapreduce/*:/cvmfs/sft.cern.ch/lcg/releases/hadoop_xrootd/1.0.4-5a566/x86_64-centos7-gcc8-opt/lib/hadoop-xrootd.jar'), ('spark.driver.memory', '8g')])
%% Cell type:markdown id: tags:
# Test data extraction
%% Cell type:markdown id: tags:
In this example we access the Rally data, extract a subset of it,
and store the results in a separate output path (one file per day)
%% Cell type:code id: tags:
``` python
#Configuration PARAMS
#Define configuration parameters
#output dir in user HDFS area
outbasepath="test_rally_errors"
#output base file name
outbasefile="rally_errors.parquet"
#input file path with data to process with spark
inbasepath="/project/monitoring/archive/openstack/logs/generic/rallytester/2020/0*/01"
inbasepath="/project/monitoring/archive/openstack/logs/generic/rallytester/2021/0*/01"
#schema file
schemafile='rally_schema.json'
```
%% Cell type:code id: tags:
``` python
# Generate the data schema, using a single day example
# in general, if enough data are collected in a single day
# it is representative of the whole data structure expected
# in the other days
spark_df = cluster_utils.get_schema(spark,
'/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01',
schemafile) #<<<< If you do not have a schema file pre-defined
```
%% Cell type:code id: tags:
``` python
# Load the schema from the file
logschema = cluster_utils.load_schema(schemafile)
```
%% Cell type:code id: tags:
``` python
# Print schema
logschema
```
%% Output
StructType(List(StructField(data,StructType(List(StructField(component,StringType,true),StructField(deployment,StringType,true),StructField(duration,StringType,true),StructField(file,StringType,true),StructField(filename,StringType,true),StructField(log_level,StringType,true),StructField(log_source,StringType,true),StructField(process,StringType,true),StructField(raw,StringType,true),StructField(scenarios,StringType,true),StructField(short_msg,StringType,true),StructField(short_task,StringType,true),StructField(source,StringType,true),StructField(succeed,StringType,true),StructField(task,StringType,true))),true),StructField(metadata,StructType(List(StructField(_id,StringType,true),StructField(environment,StringType,true),StructField(host,StringType,true),StructField(hostgroup,StringType,true),StructField(json,StringType,true),StructField(kafka_timestamp,LongType,true),StructField(partition,StringType,true),StructField(path,StringType,true),StructField(producer,StringType,true),StructField(timestamp,LongType,true),StructField(topic,StringType,true),StructField(toplevel_hostgroup,StringType,true),StructField(type,StringType,true),StructField(type_prefix,StringType,true))),true)))
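%% Cell type:markdown id: tags:
The one-line `StructType` representation above is hard to read; as a small sketch, the standard PySpark `StructType` attributes can print it field by field:
%% Cell type:code id: tags:
``` python
# Print the top-level fields and their nested sub-fields in a readable form
for field in logschema.fields:
    print(field.name)
    for subfield in field.dataType.fields:
        print("   ", subfield.name, subfield.dataType.simpleString())
```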
%% Cell type:code id: tags:
``` python
# Discover the files in the input data pattern
print(inbasepath)
dirList = cluster_utils.get_list_dirs(inbasepath)
print(dirList)
```
%% Output
/project/monitoring/archive/openstack/logs/generic/rallytester/2020/0*/01
['/project/monitoring/archive/openstack/logs/generic/rallytester/2020/01/01', '/project/monitoring/archive/openstack/logs/generic/rallytester/2020/02/01', '/project/monitoring/archive/openstack/logs/generic/rallytester/2020/03/01']
%% Cell type:code id: tags:
``` python
# Discover existing output
print("outbasepath is " + outbasepath)
outDirList = cluster_utils.get_list_dirs("%s/*"%outbasepath)
print("file content is %s" % outDirList)
```
%% Output
outbasepath is test_rally_errors
file content is ['test_rally_errors/2020_01_01', 'test_rally_errors/2020_02_01', 'test_rally_errors/2020_03_01']
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
# User-defined function to extract and manipulate the data
# This is the effective ETL
from pyspark.sql import functions as F
from pyspark.sql.window import Window
def getSparkDF(inpath,outpath):
global spark
global schemafile
#load the schema
logschema = cluster_utils.load_schema(schemafile)
#load data
full_df = spark.read.json(inpath,schema=logschema)
#prepare for sql
full_df.createOrReplaceTempView("tfull")
#
filtered_df = spark.sql("select metadata.timestamp as atime, metadata.hostgroup as hostgroup,\
metadata.environment, metadata.host as host, data.process as process,\
data.short_task as task, data.deployment as deployment,\
data.raw,\
data.source as source, data.log_source as log_source, metadata._id as uid\
from tfull where data.log_level == 'ERROR' and data.log_source == 'rallytester.rallytester' ")
##############
### Example of manipulation of strings
##############
#extract raw substring
# rawsubstring_regex = lambda x: F.regexp_replace(x,'.* (ERROR rallytester\.rallytester)\s\[[^\]]*\]\s\[[^\]]*\]\s(Task failed:\s*|)(.*)','$3')
# #extract json
# jsonmessage_regex = lambda x: F.regexp_replace(x,'Resource (.*)\'message\': u(\'|")(.*)(\'|"), u\'code\'(.*)','$3')
# #replace https long strings
# httpreplace_regex = lambda x : F.regexp_replace(x,'(.*)(https:\/\/(([^\s\/]*\/)))([^\s]*)(?<=\w)(?=\W)(.*)','$1 $2**** $6')
# #replace ips
# ipreplace_regex = lambda x : F.regexp_replace(x,'(.*)[^\d]([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})(.*)','$1 xyz.xyz.xyz.xyz $3')
# #replace UIDs
# idreplace_regex_1 = lambda x : F.regexp_replace(x,'(.*)rally-([\w-\:]{8,}[>]?)(.*)','$1 rally-xxxx $3')
# idreplace_regex_2 = lambda x : F.regexp_replace(x,'(.*)(req-[\w-\:]{12,})(.*)','$1 xxxx $3')
# idreplace_regex_3 = lambda x : F.regexp_replace(x,'(.*)([/|\s][\w-\:,]{12,}[/|\s])(.*)','$1 xxxx $3')
# idreplace_regex_4 = lambda x : F.regexp_replace(x,'(.*)\s([\w-]{12,})(.*)','$1 xxxx $3')
# idreplace_regex = lambda x : idreplace_regex_4(idreplace_regex_3(idreplace_regex_2(idreplace_regex_2(idreplace_regex_1(x)))))
# all_replaces = lambda x:idreplace_regex(ipreplace_regex(httpreplace_regex(jsonmessage_regex(rawsubstring_regex(x)))))
# regex_df = filtered_df.filter(
# ~filtered_df.raw.contains("Error executing rally task. Check the deployment is correctly configured.")
# ).withColumn('msg',all_replaces('raw'))
#regex_df = filtered_df.withColumn('msg',all_replaces('raw'))
#regex_df.show()
#output_df = regex_df
#moved at python level
output_df = filtered_df
cluster_utils.write_spark_df(output_df,outpath)
return output_df
```
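%% Cell type:markdown id: tags:
Before looping over all dates, the ETL can be tried by hand on a single day (a sketch reusing the paths defined above; the output subdirectory name is only an example):
%% Cell type:code id: tags:
``` python
# Run the ETL on one day folder and inspect a few of the filtered rows
single_day_df = getSparkDF(
    '/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01',
    outbasepath + '/2021_02_01_manual_test')
single_day_df.show(n=3, truncate=False)
```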
%% Cell type:code id: tags:
``` python
# Loop over the day folders, discover files, and skip days that have already been processed (unless forced to rewrite);
# run the filter (getSparkDF) day by day and store the filtered data in files under the outbasepath dir
cluster_utils.loop_on_dates(getSparkDF, inbasepath ,outbasepath, forceRewrite=True, debug=True)
```
%% Output
---------- checking date 2020/01/01
"---------- the output dir is test_rally_errors/2020_01_01
"---------- the input dirs are ['/project/monitoring/archive/openstack/logs/generic/rallytester/2020/01/01']
---------- checking date 2020/02/01
"---------- the output dir is test_rally_errors/2020_02_01
"---------- the input dirs are ['/project/monitoring/archive/openstack/logs/generic/rallytester/2020/02/01']
---------- checking date 2020/03/01
"---------- the output dir is test_rally_errors/2020_03_01
"---------- the input dirs are ['/project/monitoring/archive/openstack/logs/generic/rallytester/2020/03/01']
%% Cell type:code id: tags:
``` python
# Discover existing output
print("outbasepath is " + outbasepath)
outDirList = cluster_utils.get_list_dirs("%s/*"%outbasepath)
print("file content is %s" % outDirList)
```
%% Output
outbasepath is test_rally_errors
file content is ['test_rally_errors/2020_01_01', 'test_rally_errors/2020_02_01', 'test_rally_errors/2020_03_01']
%% Cell type:code id: tags:
``` python
# loop_on_dates writes to the user HDFS area;
# now let's read the output back
allinone = spark.read.parquet(outbasepath+"/*")
```
%% Cell type:code id: tags:
``` python
allinone.show(n=3,truncate=False)
```
%% Output
+-------------+----------------------+------------+----------------------------+-------+------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-----------------------+------------------------------------+
|atime |hostgroup |environment |host |process|task |deployment |raw |source |log_source |uid |
+-------------+----------------------+------------+----------------------------+-------+------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-----------------------+------------------------------------+
|1583061322620|cloud_monitoring/rally|train_master|cci-rally-ff92280f7b.cern.ch|21644 |reboot-linux|pt8_project_009|2020-03-01 12:15:22.620 21644 ERROR rallytester.rallytester [-] [nova reboot-linux pt8_project_009] Task failed: Resource <Server: rally-570d-Wnj6> has ERROR status. Fault: {u'message': u'internal error: qemu unexpectedly closed the monitor: ioctl(KVM_CREATE_VM) failed: 12 Cannot allocate memory\n2020-03-01T11:15:03.823527Z qemu-kvm: failed to initialize KVM: Cannot allocate memory', u'code': 500, u'created': u'2020-03-01T11:15:07Z'} <sep-rally-sep> internal error: qemu unexpectedly closed the monitor: ioctl(KVM_CREATE_VM) failed: 12 Cannot alloca |rallytester|rallytester.rallytester|91a5c588-84e7-81fd-3de9-f73cc5d185b5|
|1583061354570|cloud_monitoring/rally|train_master|cci-rally-74af4dadb3.cern.ch|13817 |reboot-linux|gva_project_049|2020-03-01 12:15:54.570 13817 ERROR rallytester.rallytester [-] [nova reboot-linux gva_project_049] Task failed: Resource <Server: rally-5650-fMkh> has ERROR status. Fault: {u'message': u'internal error: process exited while connecting to monitor: ioctl(KVM_CREATE_VM) failed: 12 Cannot allocate memory\n2020-03-01T11:15:37.733637Z qemu-kvm: failed to initialize KVM: Cannot allocate memory', u'code': 500, u'created': u'2020-03-01T11:15:46Z'} <sep-rally-sep> internal error: process exited while connecting to monitor: ioctl(KVM_CREATE_VM) failed: 12 Cannot |rallytester|rallytester.rallytester|ab653166-8060-f003-0176-e37b9f9a5776|
|1583061424318|cloud_monitoring/rally|train_master|cci-rally-ik2bmv72j4.cern.ch|25129 |boot-linux |pt8_project_005|2020-03-01 12:17:04.318 25129 ERROR rallytester.rallytester [-] [nova boot-linux pt8_project_005] Task failed: Resource <Server: rally-301b-XRvu> has ERROR status. Fault: {u'message': u'internal error: process exited while connecting to monitor: ioctl(KVM_CREATE_VM) failed: 12 Cannot allocate memory\n2020-03-01T11:16:40.316622Z qemu-kvm: failed to initialize KVM: Cannot allocate memory', u'code': 500, u'created': u'2020-03-01T11:16:52Z'} <sep-rally-sep> internal error: process exited while connecting to monitor: ioctl(KVM_CREATE_VM) failed: 12 Cannot |rallytester|rallytester.rallytester|a0ee1ad7-1448-47fe-6bc3-3688cc4abc1e|
+-------------+----------------------+------------+----------------------------+-------+------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-----------------------+------------------------------------+
only showing top 3 rows
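%% Cell type:markdown id: tags:
A quick sanity check on the read-back data (a sketch using only columns produced by the ETL above), counting error records per deployment:
%% Cell type:code id: tags:
``` python
# Count the extracted error records per deployment, most affected first
allinone.groupBy('deployment').count().orderBy('count', ascending=False).show(5, truncate=False)
```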
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
......
%% Cell type:markdown id: tags:
# Test access to Spark via PySpark
%% Cell type:markdown id: tags:
This notebook installs the data-analytics package
and tests its basic functionality.
In order to run it in SWAN, follow these steps:
1) pass your Kerberos credentials
2) install the package, using a specific tag (qa in this example)
3) run the notebook
%% Cell type:code id: tags:
``` python
import os, sys
os.environ['PYTHONPATH']=os.environ['HOME']+'/.local/lib/python3.6/site-packages/:'+os.environ['PYTHONPATH']
```
%% Cell type:markdown id: tags:
# Test package
%% Cell type:code id: tags:
``` python
from etl.spark_etl import cluster_utils
```
%% Cell type:code id: tags:
``` python
# Start the spark session
sc, spark, conf = cluster_utils.set_spark()
```
%% Cell type:markdown id: tags:
# Test data extraction
%% Cell type:markdown id: tags:
In this example we access the Rally data, extract a subset of it,
and store the results in a separate output path (one file per day)
%% Cell type:code id: tags:
``` python
#Configuration PARAMS
#Define configuration parameters
#output dir in user HDFS area
outbasepath="test_rally_errors"
#output base file name
outbasefile="rally_errors.parquet"
#input file path with data to process with spark
inbasepath="/project/monitoring/archive/openstack/logs/generic/rallytester/2020/0*/01"
inbasepath="/project/monitoring/archive/openstack/logs/generic/rallytester/2021/0*/01"
#schema file
schemafile='rally_schema.json'
```
%% Cell type:code id: tags:
``` python
!hdfs dfs -ls /project/monitoring/archive/openstack/logs/generic/rallytester/2020/02/01
!hdfs dfs -ls /project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01
```
%% Cell type:code id: tags:
``` python
cluster_utils.get_list_dirs('/project/monitoring/archive/openstack/logs/generic/rallytester/2020/02/01')
cluster_utils.get_list_dirs('/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01')
```
%% Cell type:code id: tags:
``` python
full_df = spark.read.json('/project/monitoring/archive/openstack/logs/generic/rallytester/2020/02/01')
full_df = spark.read.json('/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01')
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
# Generate the data schema, using a single day example
# in general, if enough data are collected in a single day
# it is representative of the whole data structure expected
# in the other days
spark_df = cluster_utils.get_schema(spark,
'/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01',
schemafile) #<<<< If you do not have a schema file pre-defined
```
%% Cell type:code id: tags:
``` python
# Load the schema from the file
logschema = cluster_utils.load_schema(schemafile)
```
%% Cell type:code id: tags:
``` python
# Print schema
logschema
```
%% Cell type:code id: tags:
``` python
# Discover the files in the input data pattern
print(inbasepath)
dirList = cluster_utils.get_list_dirs(inbasepath)
print(dirList)
```
%% Cell type:code id: tags:
``` python
# Discover existing output
print("outbasepath is " + outbasepath)
outDirList = cluster_utils.get_list_dirs("%s/*"%outbasepath)
print("file content is %s" % outDirList)
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
# User-defined function to extract and manipulate the data
# This is the effective ETL
from pyspark.sql import functions as F
from pyspark.sql.window import Window
def getSparkDF(inpath,outpath):
global spark
global schemafile
#load the schema
logschema = cluster_utils.load_schema(schemafile)
#load data
full_df = spark.read.json(inpath,schema=logschema)
#prepare for sql
full_df.createOrReplaceTempView("tfull")
#
filtered_df = spark.sql("select metadata.timestamp as atime, metadata.hostgroup as hostgroup,\
metadata.environment, metadata.host as host, data.process as process,\
data.short_task as task, data.deployment as deployment,\
data.raw,\
data.source as source, data.log_source as log_source, metadata._id as uid\
from tfull where data.log_level == 'ERROR' and data.log_source == 'rallytester.rallytester' ")
##############
### Example of manipulation of strings
##############
#extract raw substring
# rawsubstring_regex = lambda x: F.regexp_replace(x,'.* (ERROR rallytester\.rallytester)\s\[[^\]]*\]\s\[[^\]]*\]\s(Task failed:\s*|)(.*)','$3')
# #extract json
# jsonmessage_regex = lambda x: F.regexp_replace(x,'Resource (.*)\'message\': u(\'|")(.*)(\'|"), u\'code\'(.*)','$3')
# #replace https long strings
# httpreplace_regex = lambda x : F.regexp_replace(x,'(.*)(https:\/\/(([^\s\/]*\/)))([^\s]*)(?<=\w)(?=\W)(.*)','$1 $2**** $6')
# #replace ips
# ipreplace_regex = lambda x : F.regexp_replace(x,'(.*)[^\d]([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})(.*)','$1 xyz.xyz.xyz.xyz $3')
# #replace UIDs
# idreplace_regex_1 = lambda x : F.regexp_replace(x,'(.*)rally-([\w-\:]{8,}[>]?)(.*)','$1 rally-xxxx $3')
# idreplace_regex_2 = lambda x : F.regexp_replace(x,'(.*)(req-[\w-\:]{12,})(.*)','$1 xxxx $3')
# idreplace_regex_3 = lambda x : F.regexp_replace(x,'(.*)([/|\s][\w-\:,]{12,}[/|\s])(.*)','$1 xxxx $3')
# idreplace_regex_4 = lambda x : F.regexp_replace(x,'(.*)\s([\w-]{12,})(.*)','$1 xxxx $3')
# idreplace_regex = lambda x : idreplace_regex_4(idreplace_regex_3(idreplace_regex_2(idreplace_regex_2(idreplace_regex_1(x)))))
# all_replaces = lambda x:idreplace_regex(ipreplace_regex(httpreplace_regex(jsonmessage_regex(rawsubstring_regex(x)))))
# regex_df = filtered_df.filter(
# ~filtered_df.raw.contains("Error executing rally task. Check the deployment is correctly configured.")
# ).withColumn('msg',all_replaces('raw'))
#regex_df = filtered_df.withColumn('msg',all_replaces('raw'))
#regex_df.show()
#output_df = regex_df
#moved at python level
output_df = filtered_df
cluster_utils.write_spark_df(output_df,outpath)
return output_df
```
%% Cell type:code id: tags:
``` python
# Loop over the day folders, discover files, and skip days that have already been processed (unless forced to rewrite);
# run the filter (getSparkDF) day by day and store the filtered data in files under the outbasepath dir
cluster_utils.loop_on_dates(getSparkDF, inbasepath ,outbasepath, force_rewrite=True, debug=True)
```
%% Cell type:code id: tags:
``` python
# Discover existing output
print("outbasepath is " + outbasepath)
outDirList = cluster_utils.get_list_dirs("%s/*"%outbasepath)
print("file content is %s" % outDirList)
```
%% Cell type:code id: tags:
``` python
# loop_on_dates writes to the user HDFS area;
# now let's read the output back
allinone = spark.read.parquet(outbasepath+"/*")
```
%% Cell type:code id: tags:
``` python
allinone.show(n=3,truncate=False)
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
......
@@ -124,7 +124,7 @@ def run_tests():
# output dir in user HDFS area
outbasepath = "test_rally_errors"
# input file path with data to process with spark
inbasepath = "/project/monitoring/archive/openstack/logs/generic/rallytester/2020/0*/0[1-2]" # noqa: E501
inbasepath = "/project/monitoring/archive/openstack/logs/generic/rallytester/2021/0*/0[1-2]" # noqa: E501
# schema file
schemafile = 'rally_schema.json'
@@ -138,7 +138,7 @@ def run_tests():
# in general, if enough data are collected in a single day
# it is representative of the whole data structure expected
# in the other days
sdir = '/project/monitoring/archive/openstack/logs/generic/rallytester/2020/02/01' # noqa: E501
sdir = '/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01' # noqa: E501
cluster_utils.get_schema(spark, sdir, schemafile)
myprint("Load the schema from the file")
......
%% Cell type:markdown id: tags:
# Test access to Spark via PySpark
%% Cell type:markdown id: tags:
This notebook installs the data-analytics package
and tests its basic functionality.
In order to run it in SWAN, follow these steps:
1) pass your Kerberos credentials
2) install the package, using a specific tag (qa in this example)
3) run the notebook
%% Cell type:code id: tags:
``` python
import getpass
import os, sys
os.environ['PYTHONPATH']=os.environ['HOME']+'/.local/lib/python3.6/site-packages/:'+os.environ['PYTHONPATH']
```
%% Cell type:markdown id: tags:
## Start Spark (click the "star")
%% Cell type:markdown id: tags:
## Install the package (if not done already)
%% Cell type:code id: tags:
``` python
%%bash
#install_branch=qa
#pip3 install --user git+https://:@gitlab.cern.ch:8443/cloud-infrastructure/data-analytics.git@${install_branch}
```
%% Cell type:markdown id: tags:
# Test package
%% Cell type:code id: tags:
``` python
from etl.spark_etl import cluster_utils
```
%% Cell type:code id: tags:
``` python
swan_spark_conf.getAll()
```
%% Cell type:code id: tags:
``` python
# Test connection to Spark
#sc, spark, conf = cluster_utils.set_spark(swan_spark_conf)
```
%% Cell type:code id: tags:
``` python
sc
```
%% Cell type:code id: tags:
``` python
spark
```
%% Cell type:code id: tags:
``` python
# Test stopping spark session
#cluster_utils.stop_spark(sc,spark)
```
%% Cell type:code id: tags:
``` python
# Start again the spark session
#sc, spark, conf = cluster_utils.set_spark()
```
%% Cell type:markdown id: tags:
# Test data extraction
%% Cell type:markdown id: tags:
In this example we access the Rally data, extract a subset of it,
and store the results in a separate output path (one file per day)
%% Cell type:code id: tags:
``` python
#Configuration PARAMS
#Define configuration parameters
#output dir in user HDFS area
outbasepath="test_rally_errors"
#output base file name
outbasefile="rally_errors.parquet"
#input file path with data to process with spark
inbasepath="/project/monitoring/archive/openstack/logs/generic/rallytester/2020/0*/01"
inbasepath="/project/monitoring/archive/openstack/logs/generic/rallytester/2021/0*/01"
#schema file
schemafile='rally_schema.json'
```
%% Cell type:code id: tags:
``` python
!hdfs dfs -ls /project/monitoring/archive/openstack/logs/generic/rallytester/2020/02/01
!hdfs dfs -ls /project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01
```
%% Cell type:code id: tags:
``` python
cluster_utils.get_list_dirs('/project/monitoring/archive/openstack/logs/generic/rallytester/2020/02/01')
cluster_utils.get_list_dirs('/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01')
```
%% Cell type:code id: tags:
``` python
full_df = spark.read.json('/project/monitoring/archive/openstack/logs/generic/rallytester/2020/02/01')
full_df = spark.read.json('/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01')
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
# Generate the data schema, using a single day example
# in general, if enough data are collected in a single day
# it is representative of the whole data structure expected
# in the other days
spark_df = cluster_utils.get_schema(spark,
'/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01',
schemafile) #<<<< If you do not have a schema file pre-defined
```
%% Cell type:code id: tags:
``` python
# Load the schema from the file
logschema = cluster_utils.load_schema(schemafile)
```
%% Cell type:code id: tags:
``` python
# Print schema
logschema
```
%% Cell type:code id: tags:
``` python
# Discover the files in the input data pattern
print(inbasepath)
dirList = cluster_utils.get_list_dirs(inbasepath)
print(dirList)
```
%% Cell type:code id: tags:
``` python
# Discover existing output
print("outbasepath is " + outbasepath)
outDirList = cluster_utils.get_list_dirs("%s/*"%outbasepath)
print("file content is %s" % outDirList)
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
# User-defined function to extract and manipulate the data
# This is the effective ETL
from pyspark.sql import functions as F
from pyspark.sql.window import Window
def getSparkDF(inpath,outpath):
global spark
global schemafile
#load the schema
logschema = cluster_utils.load_schema(schemafile)
#load data
full_df = spark.read.json(inpath,schema=logschema)
#prepare for sql
full_df.createOrReplaceTempView("tfull")
#
filtered_df = spark.sql("select metadata.timestamp as atime, metadata.hostgroup as hostgroup,\
metadata.environment, metadata.host as host, data.process as process,\
data.short_task as task, data.deployment as deployment,\
data.raw,\
data.source as source, data.log_source as log_source, metadata._id as uid\
from tfull where data.log_level == 'ERROR' and data.log_source == 'rallytester.rallytester' ")
##############
### Example of manipulation of strings
##############
#extract raw substring
# rawsubstring_regex = lambda x: F.regexp_replace(x,'.* (ERROR rallytester\.rallytester)\s\[[^\]]*\]\s\[[^\]]*\]\s(Task failed:\s*|)(.*)','$3')
# #extract json
# jsonmessage_regex = lambda x: F.regexp_replace(x,'Resource (.*)\'message\': u(\'|")(.*)(\'|"), u\'code\'(.*)','$3')
# #replace https long strings
# httpreplace_regex = lambda x : F.regexp_replace(x,'(.*)(https:\/\/(([^\s\/]*\/)))([^\s]*)(?<=\w)(?=\W)(.*)','$1 $2**** $6')
# #replace ips
# ipreplace_regex = lambda x : F.regexp_replace(x,'(.*)[^\d]([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})(.*)','$1 xyz.xyz.xyz.xyz $3')
# #replace UIDs
# idreplace_regex_1 = lambda x : F.regexp_replace(x,'(.*)rally-([\w-\:]{8,}[>]?)(.*)','$1 rally-xxxx $3')
# idreplace_regex_2 = lambda x : F.regexp_replace(x,'(.*)(req-[\w-\:]{12,})(.*)','$1 xxxx $3')
# idreplace_regex_3 = lambda x : F.regexp_replace(x,'(.*)([/|\s][\w-\:,]{12,}[/|\s])(.*)','$1 xxxx $3')
# idreplace_regex_4 = lambda x : F.regexp_replace(x,'(.*)\s([\w-]{12,})(.*)','$1 xxxx $3')
# idreplace_regex = lambda x : idreplace_regex_4(idreplace_regex_3(idreplace_regex_2(idreplace_regex_2(idreplace_regex_1(x)))))
# all_replaces = lambda x:idreplace_regex(ipreplace_regex(httpreplace_regex(jsonmessage_regex(rawsubstring_regex(x)))))
# regex_df = filtered_df.filter(
# ~filtered_df.raw.contains("Error executing rally task. Check the deployment is correctly configured.")
# ).withColumn('msg',all_replaces('raw'))
#regex_df = filtered_df.withColumn('msg',all_replaces('raw'))
#regex_df.show()
#output_df = regex_df
#moved at python level
output_df = filtered_df
cluster_utils.write_spark_df(output_df,outpath)
return output_df
```
%% Cell type:code id: tags:
``` python
# Loop over the day folders, discover files, and skip days that have already been processed (unless forced to rewrite);
# run the filter (getSparkDF) day by day and store the filtered data in files under the outbasepath dir
cluster_utils.loop_on_dates(getSparkDF, inbasepath ,outbasepath, force_rewrite=True, debug=True)
```
%% Cell type:code id: tags:
``` python
# Discover existing output
print("outbasepath is " + outbasepath)
outDirList = cluster_utils.get_list_dirs("%s/*"%outbasepath)
print("file content is %s" % outDirList)
```
%% Cell type:code id: tags:
``` python
# loop_on_dates writes to the user HDFS area;
# now let's read the output back
allinone = spark.read.parquet(outbasepath+"/*")
```
%% Cell type:code id: tags:
``` python
allinone.show(n=3,truncate=False)
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
......