Skip to content
Snippets Groups Projects

Merging full workflow into Tanay's HiggsDNA

Open Sergi Castells requested to merge castells/higgs-dna-4-gamma-tanays-copy:master into master
Compare and Show latest version
1 file
+ 1
1
Compare changes
  • Side-by-side
  • Inline
+ 268
0
#!/usr/bin/env python
"""Resubmit failed HiggsDNA HTCondor jobs, or prepare local re-run scripts.

Examples:
python3 resubmit_vanilla_lxplus.py .higgs_dna_vanilla_lxplus/<submission-directory>
Use -l for running locally.
Combine -l and -c to copy the files locally and configure local_resubmit_<sample-type>.sh to run over those local files.
Use -t to check the finished jobs without resubmitting. Similar to tail.
"""
import subprocess
import os
import argparse
import glob
import sys
import json
import shlex

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("directory", type=str)
    parser.add_argument("-t", "--tail", action="store_true", required=False, default=False, help="Check finished jobs without resubmitting.")
    parser.add_argument("-l", "--prepLocal", action="store_true", required=False, default=False, help="Prepares local scripts instead of submitting jobs.")
    parser.add_argument("-c", "--copySamples", action="store_true", required=False, default=False, help="Copy samples using xrdcp to local for processing. Required to use prepLocal when specifying this option.")
    args = parser.parse_args()
    if args.copySamples and not args.prepLocal:
        parser.error("Required to use prepLocal when using copySamples!")

    ## Get directory of this script; all output paths are built relative to it
    cwd = os.path.dirname(os.path.abspath(__file__))

    ## Check if jobs are signal or data/bkg
    # NOTE(review): strip("/") also removes a *leading* slash, so an absolute
    # directory argument would silently become relative — pass relative paths.
    d = args.directory.strip("/")
    if "signal" in d.lower():
        signal = True
    elif "data" in d.lower() or "bkg" in d.lower():
        signal = False
    else:
        raise SystemExit("Naming scheme must match signal/bkg/data in run JSON for this script to work properly.")
    print(f"Checking: {d}/")

    ## Determine the year/era tag from the job file names (data/bkg only;
    ## signal job files carry no year tag)
    year = None
    if not signal:
        EvtMix = ""
        for fp in os.listdir(f"{d}/jobs/"):
            for y in [f"RunIIData{EvtMix}", "Run2022", "Run2023", "RunEvtMix2022", "RunEvtMix2023"]:
                if fp.find(y) != -1:
                    year = y
                    break
        # Guard: previously a missing match caused a NameError further down.
        if year is None:
            raise SystemExit(f"Could not determine year/era tag from any file in {d}/jobs/")
    else:
        year = ""

    ## Condor .sub template; {0} = era key, {1} = queue spec (filled via .format)
    submit_file = f"""executable = {cwd}/{d}/jobs/AN-{year}{{0}}_resubmit.sh
arguments = $(Item)
output = {cwd}/{d}/jobs/AN-{year}{{0}}.$(ClusterId).$(Item).out
error = {cwd}/{d}/jobs/AN-{year}{{0}}.$(ClusterId).$(Item).err
log = {cwd}/{d}/jobs/AN-{year}{{0}}.$(ClusterId).log
request_memory = 10GB
getenv = True
+JobFlavour = "espresso"
on_exit_remove = (ExitBySignal == False) && (ExitCode == 0)
max_retries = 10
requirements = Machine =!= LastRemoteHost
queue {{1}}
"""
    ## Header of the condor executable; per-job blocks are appended below
    sh_file = """#!/bin/sh
unset PYTHONPATH
export X509_USER_PROXY=/afs/crc.nd.edu/user/s/scastel2/.x509up_u240439
#export XrdSecDEBUG=2
"""
    # Optional XRootD tuning knobs, kept for reference:
    #export XRD_CPTPCTIMEOUT=3000
    #export XRD_DATASERVERTTL=500
    #export XRD_LOADBALANCERTTL=1500
    #export XRD_REDIRCTLIMIT=22
    #export XRD_REQUESTTIMEOUT=3000
    #export XRD_TCPKEEPALIVE=1

    ## Get list of jobs to resubmit
    # resubmit[era] = [resubmit set, sub file, sh file, queue #]
    resubmit = {}
    sub_list = [fp for fp in os.listdir(f"{d}/jobs/") if fp.endswith(".sub") and "resubmit" not in fp]
    for file in sub_list:
        path = os.path.join(f"{d}/jobs/", file)
        with open(path, "r") as f:
            # Locate the era key embedded in the file name
            if not signal:
                era_idx = file.find(year) + len(year)
                era_end = era_idx + 1  # data/bkg era keys are a single character
            else:
                era_idx = file.find("Signal")
                era_end = file.find(".s")
            f.readline()  # skip the first line of the .sub file
            for line in f.readlines():
                if "queue" in line:
                    q = int(line[5:])  # "queue N" -> N
                    # Start by assuming every job id needs resubmission;
                    # finished jobs are removed in the next pass.
                    resubmit[file[era_idx:era_end]] = [set(range(q)), submit_file, sh_file, q]
                    break

    ## Remove jobs that are finished from resubmit list
    finished = ["Processing", "100%"]
    out_list = [fp for fp in os.listdir(f"{d}/jobs/") if fp.endswith(".out")]
    for file in out_list:
        path = os.path.join(f"{d}/jobs/", file)
        with open(path, "r") as f:
            if not signal:
                era_idx = file.find(year) + len(year)
                era_end = era_idx + 1
            else:
                era_idx = file.find("Signal")
                era_end = file.find(".")
            for line in f.readlines():
                if finished[0] in line and finished[1] in line:
                    # Remove job from resubmit list
                    try:
                        resubmit[file[era_idx:era_end]][0].remove(int(file.split(".")[-2]))
                    except KeyError:
                        pass  # Means there are multiple .out files that are finished. This is fine since outputs naming is consistent.
                    break

    # Remove err files for easier debugging
    if not args.tail:
        err_list = [fp for fp in os.listdir(f"{d}/jobs/") if fp.endswith(".err")]
        for file in err_list:
            path = os.path.join(f"{d}/jobs/", file)
            if os.path.exists(path):
                os.remove(path)

    ## Print out totals
    for era_str in resubmit:
        print(f"Resubmit Needed {era_str}:\t {len(resubmit[era_str][0]):>4} \t\t Total Jobs: {resubmit[era_str][3]:>4}")
    if sum(len(v[0]) for v in resubmit.values()) == 0:
        print("No jobs to resubmit!")
        sys.exit(0)

    ## Send out environment details to Condor
    stat, out = subprocess.getstatusoutput("voms-proxy-info -p")
    out = out.strip().split("\n")[-1]
    # stat is 0 when the proxy is valid
    if stat != 0:
        raise RuntimeError("No valid proxy found. Please create one.")
    proxy = out
    env_extra = [
        "export XRD_RUNFORKHANDLER=1",
        f"export X509_USER_PROXY={proxy}",
        "export PYTHONPATH=$PYTHONPATH:/afs/crc.nd.edu/user/s/scastel2/miniforge3/envs/higgs-dna/bin/python",
    ]
    try:
        env_extra.append(f'export X509_CERT_DIR={os.environ["X509_CERT_DIR"]}')
    except KeyError:
        pass
    # Mirror the "export VAR=value" strings into this process's environment.
    # split("=", 1) keeps values that themselves contain "=" intact.
    for cmd_str in env_extra:
        g_var = cmd_str.split()[-1]
        var, var_value = g_var.split("=", 1)
        os.environ[var] = var_value

    ## Start resubmission
    # Prepare (truncate/create) the local runner script; appended to per job below
    if args.prepLocal:
        rfile = "local_resubmit_"
        if "signal" in args.directory.lower():
            rfile += "signal"
        elif "data" in args.directory.lower():
            rfile += "data"
        elif "bkg" in args.directory.lower():
            rfile += "bkg"
        else:
            raise SystemExit("Directory name must contain signal/data/bkg.")
        with open(f"{rfile}.sh", "w") as _:
            pass

    # Prepare resubmission
    for key in resubmit:
        if len(resubmit[key][0]) == 0:
            continue
        ## Collect the per-job blocks from the original executable into the new one
        with open(f"{d}/jobs/AN-{year}{key}.sh", "r") as f:
            line = f.readline()
            while line:
                if "if [ $1 -eq" in line:
                    job = int(line[line.find("-eq") + 3:line.find("];")].strip())
                    if job in resubmit[key][0]:
                        # Copy the three-line job block: "if ..." / command / "fi"
                        resubmit[key][2] += line
                        command = f.readline()
                        resubmit[key][2] += command
                        resubmit[key][2] += f.readline()
                        if args.copySamples:
                            # Download sample and update to local analysis json
                            # NOTE: If things get deleted, just re-run HiggsDNA and copy JSONs from the new submission directory. They are consistent.
                            # You will need to run "find .higgs_dna_vanilla_lxplus -type f -exec sed -i 's/wrong_dir/right_dir/g' {} \;" to fix naming in JSONs
                            analysis_json = command[command.find("--json-analysis") + len("--json-analysis"):command.find("--dump")].strip()
                            with open(analysis_json, "r") as aj:
                                an_js = json.load(aj)
                            # Make paths if necessary
                            full_key = key
                            if len(key) == 1:
                                full_key = year + key
                            if not os.path.exists(f"{cwd}/2022_local_samples/"):
                                os.mkdir(f"{cwd}/2022_local_samples/")
                            if not os.path.exists(f"{cwd}/2022_local_samples/{full_key}"):
                                os.mkdir(f"{cwd}/2022_local_samples/{full_key}")
                            # Copy and update json
                            sample_json = an_js["samplejson"]
                            with open(sample_json, "r") as sj:
                                sm_js = json.load(sj)
                            assert len(sm_js[full_key]) == 1
                            local_path = f"{cwd}/2022_local_samples/{full_key}/{sm_js[full_key][0].split('/')[-1]}"
                            try:
                                # Copy sample via xrootd. If this returns 0B then just run it again with xrdcp -f [source] [dest] manually until it works!
                                # BUG FIX: the existence check used {key} while the copy
                                # destination used {full_key}, so data/bkg samples were
                                # re-copied on every run.
                                if "root://" in sm_js[full_key][0] and not os.path.exists(local_path):
                                    try:
                                        subprocess.run(
                                            shlex.split(f"xrdcp {sm_js[full_key][0]} {cwd}/2022_local_samples/{full_key}/"),
                                            stderr=subprocess.STDOUT,
                                            universal_newlines=True,
                                            timeout=30,
                                        )
                                    except subprocess.TimeoutExpired:
                                        print(f"xrdcp timeout for {full_key} job #{job}!")
                                assert os.path.exists(local_path), local_path
                                assert os.stat(local_path).st_size != 0
                                with open(sample_json.replace(".json", "_local.json"), "w+") as sj:
                                    # BUG FIX: was sm_js[key] — for data/bkg that wrote a new
                                    # short key and left sm_js[full_key] pointing at the
                                    # remote file, so the local copy was never used.
                                    sm_js[full_key] = [local_path]
                                    json.dump(sm_js, sj)
                            except Exception as exc:
                                print(f"Failed to update for {full_key} job #{job}!", sm_js[full_key][0], exc)
                        if args.prepLocal:
                            # Prepare local script
                            new_command = command[command.find("run_analysis.py"):].strip() + " | tee " + os.path.join(d, "jobs", f"AN-{year}{key}.local.{job}.out")
                            if args.copySamples:
                                new_command = new_command.replace(sample_json, sample_json.replace(".json", "_local.json"))
                            with open(f"{rfile}.sh", "a") as rf:
                                rf.write(f"{new_command} || true\n")
                line = f.readline()

        ## Fix permissions and resubmit jobs
        if not args.tail and not args.prepLocal and not args.copySamples:
            with open(f"{d}/jobs/AN-{year}{key}_resubmit.sub", "w") as f:
                queue = "in (" + ", ".join(str(q) for q in sorted(resubmit[key][0])) + ")"
                f.write(resubmit[key][1].format(key, queue))
            with open(f"{d}/jobs/AN-{year}{key}_resubmit.sh", "w") as f:
                f.write(resubmit[key][2])
            assert os.stat(f"{d}/jobs/AN-{year}{key}_resubmit.sub").st_size != 0 and os.stat(f"{d}/jobs/AN-{year}{key}_resubmit.sh").st_size != 0, "Submission and executable must exist in order to submit!"
            # os.chmod instead of shelling out to chmod
            os.chmod(f"{d}/jobs/AN-{year}{key}_resubmit.sh", 0o775)
            #subprocess.run(["condor_submit", "-dry-run", f"test_{year}{key}.txt", f"{d}/jobs/AN-{year}{key}_resubmit.sub"])
            subprocess.run(["condor_submit", f"{d}/jobs/AN-{year}{key}_resubmit.sub"])
Loading