Commit 0a62a3dd authored by smetaj's avatar smetaj
Browse files

Make raw HDFS data transient: delete the raw folder from HDFS after copying it locally

parent ebb64123
......@@ -27,6 +27,7 @@ from adcern.algo_steps import read_window_dataset
from pyspark.sql.utils import AnalysisException
import shutil
import subprocess
def run_pipeline(spark, config_filepath):
......@@ -432,9 +433,18 @@ def materialize_locally(spark, config_filepath,
.write.format("parquet")\
.mode("overwrite")\
.save(hdfs_outfolder)
copy_to_local(hdfs_path=hdfs_outfolder,
local_path=local_outfolder)
raw_folder = config_dict["hdfs_out_folder"] + project_code
print("Deleting the raw data saved in %s ..." % raw_folder)
try:
subprocess.call(["hdfs", "dfs", "-rm", "-r", raw_folder])
except Exception as e_delete:
print('Error while deleting raw_folder directory: ', e_delete)
def get_normalization_path(spark, config_filepath):
"""Given the config dictionary path get the normalization path."""
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment