Commit 2da98fe2 authored by Domenico Giordano

change destination hdfs folder

parent 5f3879ee
%% Cell type:markdown id: tags:
# Test access to Spark via PySpark
%% Cell type:markdown id: tags:
This notebook installs the data-analytics package
and tests its basic functionality.
In order to run it in SWAN, follow these steps:
1) pass your Kerberos credentials
2) install the package, using a specific tag (qa in this example; a hypothetical install command is sketched below)
3) run the notebook
%% Cell type:code id: tags:
``` python
import os, sys
# Make the user-installed packages visible to Python (note the leading '/' after HOME)
os.environ['PYTHONPATH'] = os.environ['HOME'] + '/.local/lib/python3.6/site-packages/:' + os.environ.get('PYTHONPATH', '')
```
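%% Cell type:markdown id: tags:
The package itself can be installed in the user area with pip. The cell below is a minimal sketch, assuming the data-analytics package is distributed from a Git repository: the repository path is a placeholder, only the tag (qa) is the one mentioned above.
%% Cell type:code id: tags:
``` python
# Hypothetical install command: the repository path is a placeholder, 'qa' is the tag to install
!pip install --user git+https://gitlab.cern.ch/<repository-path>/data-analytics.git@qa
```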
%% Cell type:markdown id: tags:
# Test package
%% Cell type:code id: tags:
``` python
from etl.spark_etl import cluster_utils
```
%% Cell type:code id: tags:
``` python
# Start the spark session
sc, spark, conf = cluster_utils.set_spark()
```
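%% Cell type:markdown id: tags:
cluster_utils.set_spark() is expected to return a SparkContext, a SparkSession and the underlying SparkConf. The cell below is a rough sketch of an equivalent plain-PySpark setup; the configuration values are illustrative, not the ones chosen by the package.
%% Cell type:code id: tags:
``` python
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Illustrative configuration only; the real options are set by cluster_utils.set_spark()
conf = SparkConf().setAppName("rally-etl-test")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
```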
%% Cell type:markdown id: tags:
# Test data extraction
%% Cell type:markdown id: tags:
In this example we access the rally data, extract a subset of it
and store the results in a different output path (one file per day)
%% Cell type:code id: tags:
``` python
#Configuration PARAMS
#Define configuration parameters
#output dir in user HDFS area
outbasepath="/project/it_cloud_data_analytics/CI/test_rally_errors_ipynb"
#output base file name
outbasefile="rally_errors.parquet"
#input file path with data to process with spark
inbasepath="/project/monitoring/archive/openstack/logs/generic/rallytester/2021/0[2-3]/01"
#schema file
schemafile='rally_schema.json'
```
%% Cell type:code id: tags:
``` python
!hdfs dfs -ls /project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01
```
%% Cell type:code id: tags:
``` python
cluster_utils.get_list_dirs('/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01')
```
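%% Cell type:markdown id: tags:
cluster_utils.get_list_dirs is expected to expand an HDFS path pattern into the list of matching directories. The cell below is a hedged sketch of one way to do this with plain PySpark through the JVM Hadoop FileSystem API; the helper name and approach are illustrative, not necessarily what the package does.
%% Cell type:code id: tags:
``` python
def list_hdfs_dirs(spark, pattern):
    # Illustrative helper: expand an HDFS glob pattern via the Hadoop FileSystem API
    jvm = spark.sparkContext._jvm
    jsc = spark.sparkContext._jsc
    fs = jvm.org.apache.hadoop.fs.FileSystem.get(jsc.hadoopConfiguration())
    statuses = fs.globStatus(jvm.org.apache.hadoop.fs.Path(pattern))
    return [] if statuses is None else [str(s.getPath()) for s in statuses]

# Hypothetical usage, with the same pattern used above
# list_hdfs_dirs(spark, '/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01')
```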
%% Cell type:code id: tags:
``` python
full_df = spark.read.json('/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01')
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
# Generate the data schema from a single day of data:
# in general, if enough data is collected in a single day,
# it is representative of the data structure expected
# on the other days
spark_df = cluster_utils.get_schema(spark,
                '/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01',
                schemafile) #<<<< use this if you do not have a pre-defined schema file
```
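%% Cell type:markdown id: tags:
A hedged sketch of what get_schema presumably does under the hood: infer the schema from one day of JSON data and persist it as JSON for reuse. The helper name and file handling are assumptions; only the PySpark calls are standard.
%% Cell type:code id: tags:
``` python
import json

def infer_and_save_schema(spark, inpath, schemafile):
    # Infer the schema from a sample day and save its JSON representation (illustrative only)
    df = spark.read.json(inpath)
    with open(schemafile, 'w') as f:
        json.dump(df.schema.jsonValue(), f)
    return df
```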
%% Cell type:code id: tags:
``` python
# Load the schema from the file
logschema = cluster_utils.load_schema(schemafile)
```
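%% Cell type:markdown id: tags:
Correspondingly, a minimal sketch of loading the saved schema back into a StructType, assuming the file contains the JSON representation produced above; the helper name is illustrative.
%% Cell type:code id: tags:
``` python
import json
from pyspark.sql.types import StructType

def load_schema_from_file(schemafile):
    # Rebuild a StructType from the JSON saved earlier (illustrative only)
    with open(schemafile) as f:
        return StructType.fromJson(json.load(f))
```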
%% Cell type:code id: tags:
``` python
# Print schema
logschema
```
%% Cell type:code id: tags:
``` python
# Discover the files in the input data pattern
print(inbasepath)
dirList = cluster_utils.get_list_dirs(inbasepath)
print(dirList)
```
%% Cell type:code id: tags:
``` python
# Discover existing output
print("outbasepath is " + outbasepath)
outDirList = cluster_utils.get_list_dirs("%s/*"%outbasepath)
print("file content is %s" % outDirList)
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
# User defined function to extract the data and manipulate it
# This is the effective ETL
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def getSparkDF(inpath, outpath):
    global spark
    global schemafile
    # load the schema
    logschema = cluster_utils.load_schema(schemafile)
    # load data
    full_df = spark.read.json(inpath, schema=logschema)
    # prepare for sql
    full_df.createOrReplaceTempView("tfull")
    #
    filtered_df = spark.sql("select metadata.timestamp as atime, metadata.hostgroup as hostgroup,\
                metadata.environment, metadata.host as host, data.process as process,\
                data.short_task as task, data.deployment as deployment,\
                data.raw,\
                data.source as source, data.log_source as log_source, metadata._id as uid\
                from tfull where data.log_level == 'ERROR' ")
    ##############
    ### Example of manipulation of strings
    ##############
    # extract raw substring
    # rawsubstring_regex = lambda x: F.regexp_replace(x,'.* (ERROR rallytester\.rallytester)\s\[[^\]]*\]\s\[[^\]]*\]\s(Task failed:\s*|)(.*)','$3')
    # extract json
    # jsonmessage_regex = lambda x: F.regexp_replace(x,'Resource (.*)\'message\': u(\'|")(.*)(\'|"), u\'code\'(.*)','$3')
    # replace https long strings
    # httpreplace_regex = lambda x : F.regexp_replace(x,'(.*)(https:\/\/(([^\s\/]*\/)))([^\s]*)(?<=\w)(?=\W)(.*)','$1 $2**** $6')
    # replace ips
    # ipreplace_regex = lambda x : F.regexp_replace(x,'(.*)[^\d]([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})(.*)','$1 xyz.xyz.xyz.xyz $3')
    # replace UIDs
    # idreplace_regex_1 = lambda x : F.regexp_replace(x,'(.*)rally-([\w-\:]{8,}[>]?)(.*)','$1 rally-xxxx $3')
    # idreplace_regex_2 = lambda x : F.regexp_replace(x,'(.*)(req-[\w-\:]{12,})(.*)','$1 xxxx $3')
    # idreplace_regex_3 = lambda x : F.regexp_replace(x,'(.*)([/|\s][\w-\:,]{12,}[/|\s])(.*)','$1 xxxx $3')
    # idreplace_regex_4 = lambda x : F.regexp_replace(x,'(.*)\s([\w-]{12,})(.*)','$1 xxxx $3')
    # idreplace_regex = lambda x : idreplace_regex_4(idreplace_regex_3(idreplace_regex_2(idreplace_regex_2(idreplace_regex_1(x)))))
    # all_replaces = lambda x: idreplace_regex(ipreplace_regex(httpreplace_regex(jsonmessage_regex(rawsubstring_regex(x)))))
    # regex_df = filtered_df.filter(
    #     ~filtered_df.raw.contains("Error executing rally task. Check the deployment is correctly configured.")
    # ).withColumn('msg',all_replaces('raw'))
    # regex_df = filtered_df.withColumn('msg',all_replaces('raw'))
    # regex_df.show()
    # output_df = regex_df
    # string manipulation moved to the python level
    output_df = filtered_df
    cluster_utils.write_spark_df(output_df, outpath)
    return output_df
```
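%% Cell type:markdown id: tags:
As a quick check, the ETL function can also be run directly on a single day before looping over all dates. The output sub-folder name below is illustrative.
%% Cell type:code id: tags:
``` python
# Illustrative one-day run of the ETL function; the output sub-folder name is arbitrary
test_df = getSparkDF('/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01',
                     outbasepath + '/single_day_test')
test_df.printSchema()
```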
%% Cell type:code id: tags:
``` python
# Loop over the day folders, discover the files, skip days that have already been processed (or force a rewrite),
# run the filter (getSparkDF) day by day, and store the filtered data in files under the outbasepath dir
cluster_utils.loop_on_dates(getSparkDF, inbasepath, outbasepath, force_rewrite=True, debug=True)
```
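%% Cell type:markdown id: tags:
loop_on_dates is expected to iterate over the day folders matching inbasepath and call the user function on each one, skipping days whose output already exists unless force_rewrite is set. The cell below is a hedged sketch of that behaviour; the helper name and the output naming scheme are assumptions.
%% Cell type:code id: tags:
``` python
import os

def loop_on_dates_sketch(user_func, inbasepath, outbasepath, force_rewrite=False, debug=False):
    # Illustrative only: expand the input pattern, derive one output folder per day
    # (the real naming scheme may differ), and skip days already processed
    existing = set(cluster_utils.get_list_dirs("%s/*" % outbasepath))
    for inpath in cluster_utils.get_list_dirs(inbasepath):
        outpath = os.path.join(outbasepath, os.path.basename(inpath.rstrip('/')))
        if outpath in existing and not force_rewrite:
            if debug:
                print("skipping already processed day %s" % inpath)
            continue
        if debug:
            print("processing %s -> %s" % (inpath, outpath))
        user_func(inpath, outpath)
```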
%% Cell type:code id: tags:
``` python
# Discover existing output
print("outbasepath is " + outbasepath)
outDirList = cluster_utils.get_list_dirs("%s/*"%outbasepath)
print("file content is %s" % outDirList)
```
%% Cell type:code id: tags:
``` python
# loop_on_dates writes into the HDFS output area
# now let's read the data back
allinone = spark.read.parquet(outbasepath+"/*")
```
%% Cell type:code id: tags:
``` python
allinone.show(n=3,truncate=False)
```
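%% Cell type:markdown id: tags:
A couple of optional sanity checks on the data read back (illustrative only):
%% Cell type:code id: tags:
``` python
# Illustrative sanity checks on the filtered error records
print("number of error records: %d" % allinone.count())
allinone.groupBy("hostgroup").count().show(truncate=False)
```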
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
```
@@ -125,7 +125,7 @@ def run_tests():
# Define configuration parameters
# ################################
# output dir in user HDFS area
outbasepath = "/project/it_cloud_data_analytics/CI/test_rally_errors_py"
# input file path with data to process with spark
inbasepath = "/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/0[1-2]"  # noqa: E501
# schema file
@@ -47,7 +47,7 @@ hdfs dfs -ls /project/monitoring || fail 'test access to hdfs'
echo -e "\n@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
echo -e "\ntest python pyspark module\n"
echo -e "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
outbasepath="/project/it_cloud_data_analytics/CI/test_rally_errors_py"
hdfs dfs -test -d $outbasepath; [[ "$?" -eq "0" ]] && echo "have to remove folder $outbasepath" && hdfs dfs -rm -r $outbasepath
python3 ${WORK_DIR}/test_spark_connector.py
@@ -56,6 +56,6 @@ python3 ${WORK_DIR}/test_spark_connector.py
#echo -e "\ntest jupyter notebook\n"
jupyter nbconvert --version
#echo -e "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
outbasepath="/project/it_cloud_data_analytics/CI/test_rally_errors_ipynb"
hdfs dfs -test -d $outbasepath; [[ "$?" -eq "0" ]] && echo "have to remove folder $outbasepath" && hdfs dfs -rm -r $outbasepath
jupyter nbconvert --ExecutePreprocessor.timeout=600 --to notebook --execute ${WORK_DIR}/test_notebook_spark_connector.ipynb