from pyspark.sql import functions as F
from etl.spark_etl import cluster_utils


def myprint(astring):
    """Print a string framed by two horizontal rules."""
    print("\n" + "{:_<50}".format(''))
    print(astring)
    print("{:_<50}".format('') + "\n")


def getSparkDF(inpath, outpath, spark, schemafile):
    """User-defined function for the Spark data ETL.

    This is the effective ETL step.
    Input params:
    inpath: full path of the folders to be analysed (can include wildcards)
    outpath: full path of the folder where the derived data are stored
    spark: currently running Spark session
    schemafile: JSON file containing the schema

    Returns the filtered and normalized Spark DataFrame, which is also
    written to outpath when it is not empty.
    """
    print('schema file used by getSparkDF is : ' + schemafile)
    logschema = cluster_utils.load_schema(schemafile)
    # load data
    full_df = spark.read.json(inpath, schema=logschema)

    # Spark SQL on a temporary table
    full_df.createOrReplaceTempView("tfull")
    filtered_df = spark.sql("select metadata.timestamp as atime,\
                            metadata.hostgroup as hostgroup,\
                            metadata.environment, metadata.host as host,\
                            data.process as process,\
                            data.short_task as task,\
                            data.deployment as deployment,\
                            data.raw,\
                            data.source as source,\
                            data.log_source as log_source,\
                            metadata._id as uid\
                            from tfull\
                            where\
                            data.log_level == 'ERROR' \
                            ")

    # #######################################
    #  Examples of string manipulation
    # #######################################

    # extract raw substring
    rawsubstring_regex =\
        lambda x: F.regexp_replace(x,
                                   '.* (ERROR rallytester\.rallytester)\s\[[^\]]*\]\s\[[^\]]*\]\s(Task failed:\s*|)(.*)',  # noqa: W605
                                   '$3')
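    # Keeps only capture group $3: the message text after the optional
    # 'Task failed:' prefix of the rallytester ERROR lines.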

    # extract json
    jsonmessage_regex =\
        lambda x: F.regexp_replace(x,
                                   'Resource (.*)\'message\': u(\'|")(.*)(\'|"), u\'code\'(.*)',  # noqa: W605
                                   '$3')

    # replace https long strings
    httpreplace_regex =\
        lambda x: F.regexp_replace(x,
                                    '(.*)(https:\/\/(([^\s\/]*\/)))([^\s]*)(?<=\w)(?=\W)(.*)',  # noqa: W605
                                    '$1 $2**** $6')
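    # Only the scheme and host ($2) are kept; the rest of the URL is
    # replaced by ****.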

    # replace ips
    ipreplace_regex =\
        lambda x: F.regexp_replace(x,
                                    '(.*)[^\d]([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})(.*)',  # noqa: W605
                                    '$1 xyz.xyz.xyz.xyz $3')

    # replace UIDs
    def idreplace_regex_1(x): return F.regexp_replace(x,
                                                   '(.*)rally-([\w-\:]{8,}[>]?)(.*)',  # noqa: W605
                                                   '$1 rally-xxxx $3')

    def idreplace_regex_2(x): return F.regexp_replace(x,
                                                    '(.*)(req-[\w-\:]{12,})(.*)',  # noqa: W605
                                                    '$1 xxxx $3')

    def idreplace_regex_3(x): return F.regexp_replace(x,
                                                      '(.*)([/|\s][\w-\:,]{12,}[/|\s])(.*)',  # noqa: W605
                                                      '$1 xxxx $3')

    def idreplace_regex_4(x): return F.regexp_replace(
        x, r'(.*)\s([\w-]{12,})(.*)', '$1 xxxx $3')

    idreplace_regex =\
        lambda x: idreplace_regex_4(idreplace_regex_3(
            idreplace_regex_2(idreplace_regex_1(x))))

    all_replaces =\
        lambda x: idreplace_regex(ipreplace_regex(
            httpreplace_regex(jsonmessage_regex(
                rawsubstring_regex(x)))))
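    # all_replaces chains everything, innermost first: extract the raw
    # substring, strip the embedded JSON message, then mask URLs,
    # IP addresses and the remaining identifiers.  It is applied below to
    # the 'raw' column, with the cleaned text stored in a new 'msg' column.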

    regex_df = filtered_df.withColumn('msg', all_replaces('raw'))
    output_df = regex_df

    # Let's now write to the output dir
    df_empty = (len(output_df.head(1)) == 0)
    print("Is the dataframe empty?  %s" % df_empty)
    if not df_empty:
        print("Saving dataframe")
        cluster_utils.write_spark_df(output_df, outpath)
    return output_df


def run_tests():
    myprint("Start the spark session")

    sc, spark, conf = cluster_utils.set_spark()

    myprint(sc)
    myprint(spark)

    myprint("Test stopping spark session")
    cluster_utils.stop_spark(sc, spark)

    myprint("Start again the spark session")
    sc, spark, conf = cluster_utils.set_spark()

    myprint("Configuration PARAMS")

    # ################################
    # Define configuration parameters
    # ################################
    # output dir in user HDFS area
    outbasepath = "test_rally_errors_py"
    # input file path with data to process with spark
    inbasepath = "/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/0[1-2]"  # noqa: E501
    # schema file
    schemafile = 'rally_schema.json'

    print("outbasepath = " + outbasepath)
    print("inbasepath = " + inbasepath)
    print("schemafile = " + schemafile)

    myprint("Generate the data schema, using a single day example")

    # Needed if you do not have a pre-defined schema file.
    # In general, if enough data are collected in a single day,
    # they are representative of the data structure expected
    # on the other days.
    sdir = '/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01'  # noqa: E501
    cluster_utils.get_schema(spark, sdir, schemafile)
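    # get_schema is expected to infer the schema from the sample day in sdir
    # and to save it to schemafile, which is reloaded just below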

    myprint("Load the schema from the file")
    logschema = cluster_utils.load_schema(schemafile)

    # Print the schema
    myprint(logschema)

    # Discover the files in the input data pattern
    myprint('Discover the files in the input data pattern')
    print(inbasepath)
    dirList = cluster_utils.get_list_dirs(inbasepath)
    print(dirList)

    # Discover existing output
    myprint('Discover existing output')
    print("outbasepath is " + outbasepath)
    outDirList = cluster_utils.get_list_dirs("%s/*" % outbasepath)
    print("file content is %s" % outDirList)

    def getSparkDF_lambda(x, y): return getSparkDF(x, y,
                                                   spark=spark,
                                                   schemafile=schemafile)
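    # The wrapper binds the current Spark session and the schema file, so
    # loop_on_dates only has to supply the per-day input and output paths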

    # Loop on the day folders, discover files,
    # skip days that have been already processed (or force rewrite)
    # run day per day the filter (getSparkDF),
    # and store the filtered data in files in the outputbasepath dir
    cluster_utils.loop_on_dates(getSparkDF_lambda,
                                inbasepath, outbasepath,
                                force_rewrite=True, debug=True)

    # Discover existing output
    myprint("outbasepath is " + outbasepath)
    outDirList = cluster_utils.get_list_dirs("%s/*" % outbasepath)
    myprint("file content is %s" % outDirList)

    # loop_on_dates writes in the user HDFS area
    # now let's read it back
    allinone = spark.read.parquet(outbasepath + "/*")
    allinone.show(n=3, truncate=False)
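
    # Illustrative sanity check (assumes write_spark_df keeps all columns):
    # the 'msg' column added by getSparkDF should be present after read-back
    print("'msg' column present: %s" % ('msg' in allinone.columns))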


if __name__ == "__main__":
    myprint('Starting %s' % __file__)
    run_tests()