diff --git a/ansible/roles/python-wheels-upload/templates/config/spark-env.sh b/ansible/roles/python-wheels-upload/templates/config/spark-env.sh
index fcb99ffdf46e34846b953ffe5fff1c30d2c86e3e..a5a22f3c699dedcf8ab14eeaa2d51fbc21e98e17 100644
--- a/ansible/roles/python-wheels-upload/templates/config/spark-env.sh
+++ b/ansible/roles/python-wheels-upload/templates/config/spark-env.sh
@@ -72,6 +72,23 @@
 # In order to determine that we check the spark-default.conf file and script input arguments ($@).
 # If you use spark-submit and set this inside the Python script we are unable to determine that. (jwozniak)
 
+NXCALS_VERSION="{{nxcals_version}}"
+
+#Global error handling
+handle_error() {
+    echo "An error occurred on line $1"
+    exit 1
+}
+trap 'handle_error $LINENO' ERR
+
+if command -v flock &> /dev/null; then
+    FLOCK="flock -x 100"
+else
+    echo "flock is not installed, race condition possible if running multiple scripts for a single venv in parallel!"
+    FLOCK=
+fi
+
+
 function md5_cmd {
     if [[ $(uname) == "Darwin" ]]; then
         md5
@@ -80,7 +97,7 @@ function md5_cmd {
     fi
 }
 
-NXCALS_VERSION="{{nxcals_version}}"
+
 function get_writable_dir() {
     dir=$(dirname "$(mktemp -u)")
@@ -97,6 +114,43 @@
 if [ ! "$NXCALS_WORKSPACE_TEMP_DIR" ]; then
     get_writable_dir
 fi
 
+function pack_venv() {
+    if [ ! "$NXCALS_PACK_ALL_PACKAGES" ]; then
+        venv-pack --python-prefix "$PYTHON_PREFIX" --output "$PACKED_VENV_FILE" \
+            --exclude nxcals-bundle/nxcals_jars/\* --exclude nxcals-bundle/jars/\* --exclude nxcals-bundle/examples/\* \
+            --exclude \*/pyspark/jars/\* --exclude \*/pyspark/examples/\*
+    else
+        venv-pack --python-prefix "$PYTHON_PREFIX" --output "$PACKED_VENV_FILE"
+    fi
+}
+
+function fix_venv() {
+    echo "Extracting packed venv to fix symlink to exec..."
+    # Fix packed venv - symlinks to python exec may be broken
+    FIXED_VENV_DIR="$NXCALS_WORKSPACE_TEMP_DIR/venv"
+    mkdir -p $FIXED_VENV_DIR
+    tar -xzf $NXCALS_WORKSPACE_TEMP_DIR/nxcals-python3-env.tar.gz -C $FIXED_VENV_DIR
+
+    echo "Fixing symlink to exec in venv..."
+
+    for file in $FIXED_VENV_DIR/bin/python*; do
+        if [ -L "$file" ]; then
+            newTarget=$PYTHON_PREFIX/bin/python$PYTHON_VERSION
+            rm $file
+            ln -s $newTarget $file
+        fi
+    done
+
+    echo "Packing venv again..."
+    OLD_PWD=`pwd`
+    rm $NXCALS_WORKSPACE_TEMP_DIR/nxcals-python3-env.tar.gz
+    cd $FIXED_VENV_DIR
+    tar -czf $NXCALS_WORKSPACE_TEMP_DIR/nxcals-python3-env.tar.gz ./*
+    cd $OLD_PWD
+    rm -r $FIXED_VENV_DIR
+}
+
+
 #A must as the pySpark is using those 2 variables to set the python on the executor. Both vars must be set. The driver uses what is the current python3 and the executor must use the LCG.
 #Exlusion is only for the jupyter setting - it shouldn't be overwritten
 JUPYTER_PYSPARK_REGEX='^\s*jupyter(\s.*|$)'
@@ -112,6 +166,7 @@ OLD_SPARK_CONF_DIR="$SPARK_CONF_DIR"
 export SPARK_CONF_DIR="$NXCALS_WORKSPACE_TEMP_DIR/conf"
 
 PACKED_VENV_FILE="${NXCALS_WORKSPACE_TEMP_DIR}"/{{spark_packed_venv_name}}
+LOCK="${NXCALS_WORKSPACE_TEMP_DIR}"/.lock
 
 echo "ENVIRONMENT:"
 echo "NXCALS_VERSION=${NXCALS_VERSION}"
@@ -124,6 +179,7 @@ echo "PACKED_VENV_FILE=${PACKED_VENV_FILE}"
 echo "PYSPARK_PYTHON=${PYSPARK_PYTHON}"
 echo "PYSPARK_DRIVER_PYTHON=${PYSPARK_DRIVER_PYTHON}"
 echo "PYSPARK_DRIVER_PYTHON_OPTS=${PYSPARK_DRIVER_PYTHON_OPTS}"
+echo "LOCK=${LOCK}"
 echo
 echo "IMPORTANT:"
 echo "Rebuilding of the packed venv is required in cases there are new or modified packages provided by a user. In order to recreate the packed venv please:"
@@ -132,39 +188,41 @@ echo " - execute your script again (rebuild will be performed during the
 echo "Target directory can be set with an env variable NXCALS_WORKSPACE_TEMP_DIR (if not set a temp dir will be used)."
 echo "Adding NXCALS related files to packed venv can be enabled by setting NXCALS_PACK_ALL_PACKAGES with any value."
 
-if [ ! -e "$NXCALS_WORKSPACE_TEMP_DIR/conf/spark-defaults.conf" ]; then
-    echo "Copying $SPARK_DEFAULTS to $SPARK_CONF_DIR ..."
-    mkdir -p "$SPARK_CONF_DIR"
+(
+    $FLOCK
+    if [ ! -e "$NXCALS_WORKSPACE_TEMP_DIR/conf/spark-defaults.conf" ]; then
+        echo "Copying $SPARK_DEFAULTS to $SPARK_CONF_DIR ..."
+        mkdir -p "$SPARK_CONF_DIR"
 
-    if [ $? -ne 0 ]; then
-        echo "ERROR: could not create $NXCALS_WORKSPACE_TEMP_DIR/conf. Aborting."
-        exit 1
-    fi
+        cp "$OLD_SPARK_CONF_DIR"/spark-defaults.conf "$SPARK_CONF_DIR"
+        cp "$OLD_SPARK_CONF_DIR"/log4j2.properties "$SPARK_CONF_DIR"
+        NEW_SPARK_CONF="$SPARK_CONF_DIR/spark-defaults.conf"
 
-    cp "$OLD_SPARK_CONF_DIR"/* "$SPARK_CONF_DIR"
-    NEW_SPARK_CONF="$SPARK_CONF_DIR/spark-defaults.conf"
+        # make the spark.jars path absolute otherwise they are relative to the current working directory
+        # Mac OSX requires escaping brackets
 
-    # make the spark.jars path absolute otherwise they are relative to the current working directory
-    # Mac OSX requires escaping brackets
-    if [[ $(uname) == "Darwin" ]]; then
-        sed -i -r 's,\([^/]\)nxcals_jars/\([^,]*\),\1'"$SPARK_HOME"'/nxcals_jars/\2,g' "$NEW_SPARK_CONF"
-    else
-        sed -i -r 's,([^/])nxcals_jars/([^,]*),\1'"$SPARK_HOME"'/nxcals_jars/\2,g' "$NEW_SPARK_CONF"
-    fi
-    # Replace the placeholder for the virtual_env path in spark-defaults.conf
-    # The archive is specified with '#environment' because that is how spark
-    # knows where to unzip it on the executors under a new directory called environment.
-    # For further information: http://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html
+        if [[ $(uname) == "Darwin" ]]; then
+            sed -i -r 's,\([^/]\)nxcals_jars/\([^,]*\),\1'"$SPARK_HOME"'/nxcals_jars/\2,g' "$NEW_SPARK_CONF"
+        else
+            sed -i -r 's,([^/])nxcals_jars/([^,]*),\1'"$SPARK_HOME"'/nxcals_jars/\2,g' "$NEW_SPARK_CONF"
+        fi
 
-    sed -i -r 's@spark.yarn.dist.archives.*@spark.yarn.dist.archives '"$NXCALS_WORKSPACE_TEMP_DIR"'/'{{spark_packed_venv_name}}'#'{{spark_bundle_pyspark_venv_name}}'@g' "$NEW_SPARK_CONF"
-fi
+        # Replace the placeholder for the virtual_env path in spark-defaults.conf
+        # The archive is specified with '#environment' because that is how spark
+        # knows where to unzip it on the executors under a new directory called environment.
+        # For further information: http://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html
+        sed -i -r 's@spark.yarn.dist.archives.*@spark.yarn.dist.archives '"$NXCALS_WORKSPACE_TEMP_DIR"'/'{{spark_packed_venv_name}}'#'{{spark_bundle_pyspark_venv_name}}'@g' "$NEW_SPARK_CONF"
+    fi
+) 100>$LOCK
 
 echo "Trying to determine YARN usage to make Python work correctly (conf/spark-env.sh)..."
-grep -q -e "^\s*spark.master\s*yarn" "${SPARK_CONF_DIR}/spark-defaults.conf"
-if [[ $? == '0' || $@ =~ .*master.*yarn.* ]]; then
+exit_code=0
+grep -q -e "^\s*spark.master\s*yarn" "${SPARK_CONF_DIR}/spark-defaults.conf" || exit_code=$?
+
+if [[ "${exit_code}" == "0" || $@ =~ .*master.*yarn.* ]]; then
     echo "Using YARN"
     # Normally $PYSPARK_PYTHON is set in the spark_session_builder.get_or_create().
     # But when user calls pyspark directly, this option is required.
@@ -180,48 +238,17 @@ if [[ $? == '0' || $@ =~ .*master.*yarn.* ]]; then
         echo "ERROR: YARN cluster doesn't support Python in version $PYTHON_VERSION. Supported versions are either 3.9 or 3.11"
         exit 1
     else
-        echo "Creating packed venv..."
-
-        ERROR=0
-
-        if [ ! "$NXCALS_PACK_ALL_PACKAGES" ]; then
-            venv-pack --python-prefix "$PYTHON_PREFIX" --output "$PACKED_VENV_FILE" \
-                --exclude nxcals-bundle/nxcals_jars/\* --exclude nxcals-bundle/jars/\* --exclude nxcals-bundle/examples/\* \
-                --exclude \*/pyspark/jars/\* --exclude \*/pyspark/examples/\* || ERROR=1
-        else
-            venv-pack --python-prefix "$PYTHON_PREFIX" --output "$PACKED_VENV_FILE" || ERROR=1
-        fi
-
-        echo "INFO: Extracing packed venv to fix symlink to exec..."
-        # Fix packed venv - symlinks to python exec may be broken
-        FIXED_VENV_DIR="$NXCALS_WORKSPACE_TEMP_DIR/venv"
-        mkdir $FIXED_VENV_DIR
-        tar -xzf $NXCALS_WORKSPACE_TEMP_DIR/nxcals-python3-env.tar.gz -C $FIXED_VENV_DIR || ERROR=1
-
-        echo "INFO: Fixing symlink to exec in venv..."
-
-        for file in $FIXED_VENV_DIR/bin/python*; do
-            if [ -L "$file" ]; then
-                newTarget=$PYTHON_PREFIX/bin/python$PYTHON_VERSION
-                rm $file || ERROR=1
-                ln -s $newTarget $file || ERROR=1
+        (
+            $FLOCK
+            if [[ ! -e "$PACKED_VENV_FILE" ]]; then
+                echo "Creating packed venv..."
+                pack_venv
+                fix_venv
+                echo -e "Packed venv created.\n...done!"
+            else
+                echo "Packed venv already built..."
             fi
-        done
-
-        echo "INFO: Packing again venv..."
-        OLD_PWD=`pwd`
-        rm $NXCALS_WORKSPACE_TEMP_DIR/nxcals-python3-env.tar.gz
-        cd $FIXED_VENV_DIR
-        tar -czf $NXCALS_WORKSPACE_TEMP_DIR/nxcals-python3-env.tar.gz ./* || ERROR=1
-        cd $OLD_PWD
-        rm -r $FIXED_VENV_DIR
-
-        if [ $ERROR -ne 0 ]; then
-            echo "ERROR: could not create packed venv. Aborting."
-            exit 3
-        else
-            echo -e "Packed venv created.\n...done!"
-        fi
+        ) 100>$LOCK
     fi
 fi
 echo "IMPORTANT:"
diff --git a/integration-tests/pythonTasks.gradle b/integration-tests/pythonTasks.gradle
index 042b8b2c2c18f3dfa091c96375c68521ff8e59d2..02c59242fd555ed1feffb2ebcb5c2fb6fd47560f 100644
--- a/integration-tests/pythonTasks.gradle
+++ b/integration-tests/pythonTasks.gradle
@@ -17,7 +17,7 @@ class PythonIntegrationTest extends Exec {
     PythonIntegrationTest() {
         environment("SERVICE_URL", System.getProperty("service.url"))
         environment("KAFKA_SERVERS_URL", System.getProperty("kafka.producer.bootstrap.servers"))
-        environment("NXCALS_WORKSPACE_TEMP_DIR", "${project.buildDir}/nxcals_workspace")
+        environment("NXCALS_WORKSPACE_TEMP_DIR", "${project.buildDir}")
        executable "../run_in_venv.sh"
        workingDir project.parent.projectDir
    }
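
Note on the global error handling added to spark-env.sh: the ERR trap fires whenever a simple command exits with a non-zero status outside a conditional context. Commands tested by "if", or placed on the left side of "||" / "&&", do not trigger it, which is why the patch rewrites the bare grep as "grep ... || exit_code=$?" instead of inspecting "$?" on the following line. A minimal standalone sketch of the behaviour (the script below is illustrative, not repository code):

    #!/usr/bin/env bash
    # Sketch of the ERR-trap pattern introduced by the patch.
    handle_error() {
        echo "An error occurred on line $1"
        exit 1
    }
    trap 'handle_error $LINENO' ERR

    grep -q needle /dev/null || true   # guarded by ||: a miss does not fire the trap
    false                              # unguarded non-zero status: trap fires, script exits 1
    echo "never reached"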
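
Note on the locking: both critical sections (the spark-defaults.conf preparation and the venv packing) run in a subshell whose file descriptor 100 is opened on $LOCK via "100>$LOCK"; when flock is available, "flock -x 100" takes an exclusive lock on that descriptor, so concurrent script runs sharing one workspace are serialized, and the lock is released automatically when the subshell exits. A minimal standalone sketch of the same pattern (lock path and payload are made up for illustration):

    #!/usr/bin/env bash
    # Sketch of the fd-based flock pattern used by the patch.
    LOCK=/tmp/example.lock             # illustrative lock file, not the patch's path

    if command -v flock &> /dev/null; then
        FLOCK="flock -x 100"           # exclusive lock on fd 100
    else
        FLOCK=                         # flock missing: proceed unlocked
    fi

    (
        $FLOCK                         # blocks here until the lock is acquired
        echo "in critical section (pid $$)"
        sleep 2                        # stand-in for the real work
    ) 100>"$LOCK"                      # open fd 100 on the lock file for the subshell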
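
Note on fix_venv: per the patch's own comment, the bin/python* symlinks inside the venv-pack archive may be broken, so the function unpacks the archive, repoints every such symlink at the interpreter under $PYTHON_PREFIX, and repacks. A standalone sketch of just the repair step (directory, prefix and version below are assumptions for illustration):

    #!/usr/bin/env bash
    # Sketch of the symlink repair from fix_venv; paths are examples only.
    VENV_BIN=/tmp/venv/bin             # assumed unpacked venv bin directory
    PYTHON_PREFIX=/usr/local           # assumed interpreter prefix
    PYTHON_VERSION=3.11                # assumed interpreter version

    for file in "$VENV_BIN"/python*; do
        if [ -L "$file" ]; then        # only rewrite symlinks, keep regular files
            rm "$file"
            ln -s "$PYTHON_PREFIX/bin/python$PYTHON_VERSION" "$file"
        fi
    done

The rm-then-ln pair mirrors the patch; "ln -sfn" would replace the link in a single call.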