from pyspark.sql import functions as F

from etl.spark_etl import cluster_utils


def myprint(astring):
    print("\n" + "{:_<50}".format(''))
    print(astring)
    print("{:_<50}".format('') + "\n")


def getSparkDF(inpath, outpath, spark, schemafile):
    """User-defined function for the Spark data ETL.

    This is the effective ETL.

    Input params:
      inpath: full path of the folders to be analysed (can include wildcards)
      outpath: full path of the folder where the derived data are stored
      spark: currently running Spark session
      schemafile: json file containing the schema
    """
    print('schema file used by getSparkDF is : ' + schemafile)
    logschema = cluster_utils.load_schema(schemafile)

    # load data
    full_df = spark.read.json(inpath, schema=logschema)

    # spark sql on a temporary table
    full_df.createOrReplaceTempView("tfull")
    filtered_df = spark.sql("""
        select metadata.timestamp as atime,
               metadata.hostgroup as hostgroup,
               metadata.environment,
               metadata.host as host,
               data.process as process,
               data.short_task as task,
               data.deployment as deployment,
               data.raw,
               data.source as source,
               data.log_source as log_source,
               metadata._id as uid
        from tfull
        where data.log_level == 'ERROR'
        """)

    # #######################################
    # Example of manipulation of strings
    # #######################################

    # extract raw substring
    rawsubstring_regex = \
        lambda x: F.regexp_replace(x,
                                   '.* (ERROR rallytester\.rallytester)\s\[[^\]]*\]\s\[[^\]]*\]\s(Task failed:\s*|)(.*)',  # noqa: W605
                                   '$3')

    # extract json
    jsonmessage_regex = \
        lambda x: F.regexp_replace(x,
                                   'Resource (.*)\'message\': u(\'|")(.*)(\'|"), u\'code\'(.*)',  # noqa: W605
                                   '$3')

    # replace https long strings
    httpreplace_regex = \
        lambda x: F.regexp_replace(x,
                                   '(.*)(https:\/\/(([^\s\/]*\/)))([^\s]*)(?<=\w)(?=\W)(.*)',  # noqa: W605
                                   '$1 $2**** $6')

    # replace ips
    ipreplace_regex = \
        lambda x: F.regexp_replace(x,
                                   '(.*)[^\d]([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})(.*)',  # noqa: W605
                                   '$1 xyz.xyz.xyz.xyz $3')

    # replace UIDs
    def idreplace_regex_1(x):
        return F.regexp_replace(x,
                                '(.*)rally-([\w-\:]{8,}[>]?)(.*)',  # noqa: W605
                                '$1 rally-xxxx $3')

    def idreplace_regex_2(x):
        return F.regexp_replace(x,
                                '(.*)(req-[\w-\:]{12,})(.*)',  # noqa: W605
                                '$1 xxxx $3')

    def idreplace_regex_3(x):
        return F.regexp_replace(x,
                                '(.*)([/|\s][\w-\:,]{12,}[/|\s])(.*)',  # noqa: W605
                                '$1 xxxx $3')

    def idreplace_regex_4(x):
        return F.regexp_replace(
            x, r'(.*)\s([\w-]{12,})(.*)', '$1 xxxx $3')

    idreplace_regex = \
        lambda x: idreplace_regex_4(idreplace_regex_3(
            idreplace_regex_2(idreplace_regex_1(x))))

    all_replaces = \
        lambda x: idreplace_regex(ipreplace_regex(
            httpreplace_regex(jsonmessage_regex(
                rawsubstring_regex(x)))))

    regex_df = filtered_df.withColumn('msg', all_replaces('raw'))
    output_df = regex_df

    # Let's now write to the output dir
    df_empty = (len(output_df.head(1)) == 0)
    print("Is the dataframe empty? %s" % df_empty)

    if not df_empty:
        print("Saving dataframe")
        cluster_utils.write_spark_df(output_df, outpath)

    return output_df


def run_tests():
    myprint("Start the spark session")
    sc, spark, conf = cluster_utils.set_spark()
    myprint(sc)
    myprint(spark)

    myprint("Test stopping spark session")
    cluster_utils.stop_spark(sc, spark)

    myprint("Start again the spark session")
    sc, spark, conf = cluster_utils.set_spark()

    myprint("Configuration PARAMS")
    # ################################
    # Define configuration parameters
    # ################################
    # output dir in user HDFS area
    outbasepath = "test_rally_errors_py"
    # input file path with data to process with spark
    inbasepath = "/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/0[1-2]"  # noqa: E501
    # schema file
    schemafile = 'rally_schema.json'
    print("outbasepath = " + outbasepath)
    print("inbasepath = " + inbasepath)
    print("schemafile = " + schemafile)

    myprint("Generate the data schema, using a single day example")
    # Needed if you do not have a schema file pre-defined.
    # In general, if enough data are collected in a single day,
    # they are representative of the whole data structure expected
    # in the other days.
    sdir = '/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01'  # noqa: E501
    cluster_utils.get_schema(spark, sdir, schemafile)

    myprint("Load the schema from the file")
    logschema = cluster_utils.load_schema(schemafile)
    # Print the schema
    myprint(logschema)

    # Discover the files in the input data pattern
    myprint('Discover the files in the input data pattern')
    print(inbasepath)
    dirList = cluster_utils.get_list_dirs(inbasepath)
    print(dirList)

    # Discover existing output
    myprint('Discover existing output')
    print("outbasepath is " + outbasepath)
    outDirList = cluster_utils.get_list_dirs("%s/*" % outbasepath)
    print("file content is %s" % outDirList)

    def getSparkDF_lambda(x, y):
        return getSparkDF(x, y, spark=spark, schemafile=schemafile)

    # Loop on the day folders, discover files,
    # skip days that have already been processed (or force rewrite),
    # run the filter (getSparkDF) day by day,
    # and store the filtered data in files in the outbasepath dir
    cluster_utils.loop_on_dates(getSparkDF_lambda, inbasepath,
                                outbasepath, force_rewrite=True, debug=True)

    # Discover existing output
    myprint("outbasepath is " + outbasepath)
    outDirList = cluster_utils.get_list_dirs("%s/*" % outbasepath)
    myprint("file content is %s" % outDirList)

    # loop_on_dates writes in the user HDFS area;
    # now let's read it back
    allinone = spark.read.parquet(outbasepath + "/*")
    allinone.show(n=3, truncate=False)


if __name__ == "__main__":
    myprint('Starting %s' % __file__)
    run_tests()