Commit 03999575 authored by Domenico Giordano

improve verbosity of set_spark

parent e5a6e742
@@ -36,16 +36,27 @@ TIMESTAMP_TO_CONSIDER = 'event_timestamp'
# SPARK MANAGEMENT -----------------------------------------------------------
def set_spark(spark_conf=None, view_ui=False):
    """Set Spark.
    NB: kerberos file in /tmp is expected
def set_spark(spark_conf=None, view_ui=False, debug=False):
    """Set/Get SparkContext and Spark Object.
    If a SparkContext already exists, this is returned.
    Otherwise a new SparkContext is created using the defined SparkConf.
    Some parameters of the SparkConf are forced to be compatible with the
    CERN Spark Cluster (such as ports).
    """
    if spark_conf is None:
        spark_conf = SparkConf()
    # spark_conf.set('spark.driver.memory', '8g')
    # spark_conf.set('spark.executor.memory', '2g')
    # spark_conf.set('spark.python.worker.memory', '1g')
    # Prasanth ADVICE
    '''
    spark_conf.set('spark.executor.memory', '8g')
    spark_conf.set('spark.executor.cores', 4)
    spark_conf.set('spark.sql.shuffle.partitions', 200)
    '''
    spark_conf.set('spark.sql.shuffle.partitions', 200)
    spark_conf.set('spark.driver.maxResultSize', 0)
    # spark_conf.set('spark.dynamicAllocation.enabled', False)
@@ -89,48 +100,19 @@ def set_spark(spark_conf=None, view_ui=False):
    # spark_conf.set('spark.driver.extraClassPath', extra_class)
    # spark_conf.set('spark.driver.extraClassPath','/eos/project/s/swan/public/hadoop-mapreduce-client-core-2.6.0-cdh5.7.6.jar')
    # Prasanth ADVICE
    '''
    spark_conf.set('spark.executor.memory', '8g')
    spark_conf.set('spark.executor.cores', 4)
    spark_conf.set('spark.sql.shuffle.partitions', 200)
    '''
    # Empirically working for 5 days of data for cells
    # with more than ca 75 hosts
    # spark_conf.set('spark.driver.memory', '8g')
    # spark_conf.set('spark.executor.memory', '8g')
    # spark_conf.set('spark.python.worker.memory', '6g')
    # spark_conf.set('spark.sql.shuffle.partitions', 600)
    # spark_conf.set('spark.yarn.web-proxy.address', '172.17.0.1')
    # spark_conf.set('spark.web-proxy.address', '172.17.0.1')
    # spark_conf.set('spark.sql.shuffle.partitions', 30)
    # java empty result error (with 200 partitions and 1 week of data)
    # solution: decrease the number of partitions
    # For some strange reason there were too many partitions, so the job ended
    # up writing empty partitions, and that made it crash while trying to pop
    # from an empty queue. Lowering the number of partitions seems to work.
    # For 5-6 days of data -> 10 partitions. The exact relation is unclear,
    # because increasing the number of days then fails with an out-of-memory
    # error... the whole thing is a bit obscure.
    # spark_conf.set('spark.driver.maxResultSize', 0)
    # spark_conf.set('spark.storage.memoryFraction', 0.1)
    print(functools.reduce(lambda x, y:
                           "%s\n%s\t%s" % (x, y[0], y[1]),
                           spark_conf.getAll(),
                           "Configuration data:"))
    sc = SparkContext.getOrCreate(spark_conf)
    spark = SparkSession(sc)
    print(functools.reduce(lambda x, y:
                           "%s\n%s:\t%s" % (x, y[0], y[1]),
                           spark.sparkContext.getConf().getAll(),
                           "Configuration data after setup:"))
    print(spark.sparkContext.getConf().getAll())
    if debug:
        final_conf = spark.sparkContext.getConf().getAll()
        custom_conf = dict(spark_conf.getAll())
        print(functools.reduce(lambda x, y:
                               "%s\n%50s: %-150s | %-200s" %
                               (x, y[0], y[1], custom_conf[y[0]]
                                if (y[0] in custom_conf.keys() and custom_conf[y[0]] != y[1])
                                else ('same' if (y[0] in custom_conf.keys()) else '-')),
                               final_conf,
                               "\nSparkConf of this Spark session Vs Custom Conf passed (after `|`):\n"))
    return sc, spark, spark_conf
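
A minimal usage sketch of the updated function, assuming set_spark is imported from the module this diff touches (the exact import path is not shown here) and that pyspark is available:

from pyspark import SparkConf

# Optional custom configuration; calling set_spark() with no arguments builds a default SparkConf.
custom_conf = SparkConf()
custom_conf.set('spark.executor.memory', '4g')   # hypothetical tuning value
custom_conf.set('spark.executor.cores', 4)       # hypothetical tuning value

# Note: set_spark() still forces some parameters on the passed conf
# (e.g. spark.sql.shuffle.partitions and spark.driver.maxResultSize above).
# debug=True triggers the new verbose report comparing the effective SparkConf
# of the session against the custom conf passed in ('same', '-' or the differing value).
sc, spark, conf = set_spark(spark_conf=custom_conf, debug=True)

# The context is get-or-create: a second call returns the same SparkContext.
sc2, spark2, _ = set_spark()
assert sc2 is sc

spark.range(5).show()   # quick check that the session is usable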