Commit 6d585d31 authored by Chris Burr

Merge branch 'use-lbprodrun-and-lbapi' into 'master'

Use LbProdRun and LbAPI

See merge request !44
parents 68b60256 65358999
Pipeline #3598378 passed in 5 minutes and 16 seconds
[report]
show_missing = True
omit =
src/LbAPLocal/cern_sso.py
@@ -47,13 +47,11 @@ run_pytest:
deploy-packages:
stage: deploy
only:
- tags
dependencies: []
image: gitlab-registry.cern.ch/lhcb-docker/python-deployment:python-3.7
- tags@lhcb-dpa/analysis-productions/LbAPLocal
image: registry.cern.ch/docker.io/library/python:3.9
before_script:
- pip install build twine
script:
- python setup.py sdist --dist-dir public/
- python setup.py bdist_wheel --dist-dir public/
- python -m build
- if [ -z "$TWINE_PASSWORD" ] ; then echo "Set TWINE_PASSWORD in CI variables" ; exit 1 ; fi
- twine upload -u __token__ public/*
before_script: []
after_script: []
- twine upload -u __token__ dist/*
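
The deploy job now builds with the PEP 517 frontend (python -m build) instead of calling setup.py directly, and uploads everything from dist/. A rough local equivalent, sketched in Python purely for illustration (in CI these run as plain shell commands):

# Sketch of the updated packaging flow, assuming "build" and "twine" are
# installed (pip install build twine) and TWINE_PASSWORD is set.
import glob
import os
import subprocess
import sys

if not os.environ.get("TWINE_PASSWORD"):
    sys.exit("Set TWINE_PASSWORD in CI variables")
# python -m build creates both the sdist and the wheel under dist/
subprocess.run([sys.executable, "-m", "build"], check=True)
# upload everything that was just built, authenticating with an API token
subprocess.run(["twine", "upload", "-u", "__token__", *glob.glob("dist/*")], check=True)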
@@ -3,7 +3,7 @@
exclude: ^(tests/data)
default_language_version:
python: python3
python: python3.9
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
@@ -4,7 +4,7 @@ channels:
- conda-forge
dependencies:
- python =3.8
- python =3.9
- pip
# Runtime dependencies
- click
@@ -23,3 +23,4 @@ dependencies:
- pytest-cov
- pytest-mock
- pytest-timeout
- pytest-recording
@@ -4,7 +4,7 @@ extend-exclude =
tests/data/data-pkg-repo/,
[tool:pytest]
addopts = --cov=LbAPLocal --cov-report=term-missing
addopts = -v --cov=LbAPLocal --cov-report=term-missing --record-mode=none
[isort]
profile = black
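
The new --record-mode=none flag comes from the pytest-recording plugin added to the test dependencies: HTTP interactions are replayed from pre-recorded cassettes and unexpected network access fails the test, so CI stays offline and deterministic. A minimal sketch of how such a test might look, assuming standard plugin usage (the URL and assertion are illustrative, not taken from the test suite):

import pytest
import requests

# pytest-recording exposes the "vcr" marker; with --record-mode=none (as set
# in addopts above) the request below is served from a cassette rather than
# hitting the network.
@pytest.mark.vcr
def test_lbapi_root_endpoint():
    response = requests.get("https://lbap.app.cern.ch/")
    assert response.status_code == 200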
@@ -33,23 +33,24 @@ setup(
"Development Status :: 4 - Beta",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
],
keywords="LHCb AnalysisProductions DIRAC",
packages=find_packages("src"),
package_dir={"": "src"},
python_requires=">=3.8",
python_requires=">=3.9",
setup_requires=["setuptools_scm"],
install_requires=[
"click",
"consolemd",
"LbAPCommon~=0.5.9",
"LbAPCommon~=0.6.0",
"LbEnv",
"LbDiracWrappers",
"requests",
"requests_kerberos",
"setuptools",
"mplhep",
"LbProdRun~=1.1",
],
extras_require={
"testing": [
@@ -57,6 +58,7 @@ setup(
"pytest-cov",
"pytest-mock",
"pytest-timeout",
"pytest-recording",
]
},
package_data={"LbAPLocal": package_data},
@@ -18,12 +18,7 @@ from pkg_resources import DistributionNotFound, get_distribution
from .checks import checks_exist, perform_checks
from .log_parsing import show_log_advice
from .testing import (
do_options_parsing,
enter_debugging,
prepare_reproduce,
prepare_test,
)
from .testing import enter_debugging, prepare_reproduce, prepare_test
from .utils import (
available_productions,
check_production,
@@ -204,6 +199,7 @@ def validate(production_name):
@main.command()
@click.argument("production_name", type=production_name_type, nargs=1)
@click.argument("job_name", type=str, nargs=1)
@click.option("--validate/--no-validate", default=True)
@click.option(
"-i",
"--dependent-input",
@@ -211,28 +207,27 @@ def validate(production_name):
nargs=1,
help="Run the test on a specific input file by passing either an LFN or a path to a local file",
)
def test(production_name, job_name, dependent_input):
def test(production_name, job_name, dependent_input, validate):
"""Execute a job locally"""
validate_environment()
if validate:
validate_environment()
inside_ap_datapkg()
out_dir, env_cmd, gaudi_cmd = prepare_test(
out_dir, prodrun_config_path = prepare_test(
production_name, job_name, dependent_input
)
cmd = env_cmd + gaudi_cmd
click.secho(f"Starting lb-run with: {shlex.join(cmd)}", fg="green")
cmd = ["lb-prod-run", prodrun_config_path, "--verbose"]
click.secho(f"Starting application with: {shlex.join(cmd)}", fg="green")
result = logging_subprocess_run(cmd, cwd=out_dir)
with open(join(out_dir, "stdout.log"), "wb") as fp:
fp.write(result.stdout)
with open(join(out_dir, "stderr.log"), "wb") as fp:
fp.write(result.stderr)
(out_dir / "stdout.log").write_bytes(result.stdout)
(out_dir / "stderr.log").write_bytes(result.stderr)
click.secho("Summary of log messages:", fg="green")
show_log_advice(result.stdout.decode() + "\n" + result.stderr.decode())
show_log_advice(
b"\n".join([result.stdout, result.stderr]).decode(errors="backslashreplace")
)
if result.returncode != 0:
raise click.ClickException("Execution failed, see above for details")
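
Two behavioural details in this hunk are easy to miss: environment validation can now be skipped with --no-validate, and the combined log is decoded with errors="backslashreplace" so a stray non-UTF-8 byte in the application output no longer aborts the summary. A small illustration of the decoding choice (the byte strings are made up):

# Made-up log fragments containing a byte sequence that is not valid UTF-8.
stdout = b"Application Manager Terminated successfully"
stderr = b"WARNING bad byte follows: \xff"

combined = b"\n".join([stdout, stderr])
# .decode() alone would raise UnicodeDecodeError on \xff; backslashreplace
# keeps the offending byte visible as an escape sequence instead of failing.
print(combined.decode(errors="backslashreplace"))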
@@ -248,41 +243,43 @@ def test(production_name, job_name, dependent_input):
raise click.ClickException(
f"Job {job_name} is not found for production {production_name}!"
)
output_file = job_data["output"].pop()
output_file = job_data["output"][0]
# Validating Gaudi options
test_ntuple_path = join(out_dir, f"00012345_00006789_1.{output_file}")
test_ntuple_path = out_dir / f"00012345_00006789_1.{output_file}"
errors, warnings = [], []
if os.path.isfile(test_ntuple_path):
if ".root" in test_ntuple_path.lower():
errors, warnings = do_options_parsing(
env_cmd,
out_dir,
join(out_dir, "output.pkl"),
test_ntuple_path,
job_name,
prod_data,
)
# TODO: This hasn't been updated to work with lb-prod-run
click.secho("Options parsing is currently unavailable", fg="yellow")
# if ".root" in test_ntuple_path.lower():
# errors, warnings = do_options_parsing(
# # TODO
# env_cmd,
# out_dir,
# join(out_dir, "output.pkl"),
# test_ntuple_path,
# job_name,
# prod_data,
# )
elif any(
str(test_ntuple_path).lower() == join(out_dir, name).lower()
for name in out_dir.iterdir()
):
warnings = [
"The output file has a different capitalisation to what is expected!"
" If this is a stripping job it could be due to a custom_stream"
" name not being written in all capital letters."
]
else:
if any(
test_ntuple_path.lower() == join(out_dir, name).lower()
for name in os.listdir(out_dir)
):
warnings = [
"The output file has a different capitalisation to what is expected!"
" If this is a stripping job it could be due to a custom_stream"
" name not being written in all capital letters."
]
else:
raise click.ClickException(
"ERROR: The expected output file does not exist!"
)
raise click.ClickException(
"ERROR: The expected output file does not exist!\n"
f"Expected: {test_ntuple_path}"
)
warnings.extend(yaml_warnings)
if warnings:
for warning in warnings:
click.secho(f"WARNING: {warning}", fg="yellow")
for warning in warnings:
click.secho(f"WARNING: {warning}", fg="yellow")
if errors:
raise click.ClickException(
@@ -311,9 +308,11 @@ def test(production_name, job_name, dependent_input):
@click.argument("job_name", type=str, nargs=1)
@click.argument("test_ntuple_path", type=click.Path(exists=True), nargs=1)
@click.argument("checks_output_dir", type=click.Path(), default="", nargs=1)
def check(production_name, job_name, test_ntuple_path, checks_output_dir):
@click.option("--validate/--no-validate", default=True)
def check(production_name, job_name, test_ntuple_path, checks_output_dir, validate):
"""Run checks for a production"""
validate_environment()
if validate:
validate_environment()
inside_ap_datapkg()
# Obtain the right output file name
@@ -343,6 +342,7 @@ def check(production_name, job_name, test_ntuple_path, checks_output_dir):
@main.command()
@click.argument("production_name", type=production_name_type, nargs=1)
@click.argument("job_name", type=str, nargs=1)
@click.option("--validate/--no-validate", default=True)
@click.option(
"-i",
"--dependent-input",
@@ -350,9 +350,10 @@ def check(production_name, job_name, test_ntuple_path, checks_output_dir):
nargs=1,
help="Run the test on a specific input file by passing either an LFN or a path to a local file",
)
def debug(production_name, job_name, dependent_input):
def debug(production_name, job_name, dependent_input, validate):
"""Start an interactive session inside the job's environment"""
validate_environment()
if validate:
validate_environment()
inside_ap_datapkg()
enter_debugging(*prepare_test(production_name, job_name, dependent_input))
@@ -363,9 +364,11 @@ def debug(production_name, job_name, dependent_input):
@click.argument("production_name", type=production_name_type, nargs=1)
@click.argument("job_name", type=str, nargs=1)
@click.argument("test_id", type=str, nargs=1, default="latest")
def reproduce(pipeline_id, production_name, job_name, test_id):
@click.option("--validate/--no-validate", default=True)
def reproduce(pipeline_id, production_name, job_name, test_id, validate):
"""Reproduce an existing online test locally"""
validate_environment()
if validate:
validate_environment()
enter_debugging(*prepare_reproduce(pipeline_id, production_name, job_name, test_id))
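
Across the reworked commands, data that used to be fetched from the Analysis Productions web pages with SSO cookies is now requested from the LbAPI REST service with a bearer token. A minimal sketch of the request pattern used in the module below (endpoint path, header, and field names taken from the diff; the pipeline id and production name are illustrative, and error handling is reduced to a status check):

import os
import requests
from LbAPCommon import get_sso_token

LBAPI_HOST = os.environ.get("LBAPI_HOST", "https://lbap.app.cern.ch")
LBAPI_CLIENT_ID = os.environ.get("LBAPI_CLIENT_ID", "lhcb-analysis-productions")

# Obtain a token once and pass it as a bearer token on every LbAPI call.
token = get_sso_token(LBAPI_HOST, LBAPI_CLIENT_ID)
response = requests.get(
    f"{LBAPI_HOST}/pipelines/3598378/MyAnalysis/",
    headers={"Authorization": f"Bearer {token}"},
)
response.raise_for_status()
prod_info = response.json()
print(prod_info["repo_url"], prod_info["commit"])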
@@ -9,34 +9,39 @@
# or submit itself to any jurisdiction. #
###############################################################################
import importlib
import io
import json
import os
import shlex
import shutil
import subprocess
import tempfile
import xml.etree.ElementTree as ET
from os.path import dirname, join
from urllib.parse import urlencode, urlparse, urlunparse
import zipfile
from os.path import join
import click
import requests
from LbAPCommon import cern_sso, parse_yaml, render_yaml, validate_yaml
from LbAPCommon import get_sso_token, parse_yaml, render_yaml, validate_yaml
from LbAPCommon.hacks import project_uses_cmt
from LbAPCommon.options_parsing import validate_options
from .utils import (
check_production,
check_status_code,
create_output_dir,
pool_xml_catalog,
recursively_create_input,
resolve_input_overrides,
)
ANA_PROD_WEB_URL = urlparse(cern_sso.ANA_PROD_HOST)[:2] + ("/dynamic/test-locally/",)
LBAPI_HOST = os.environ.get("LBAPI_HOST", "https://lbap.app.cern.ch")
LBAPI_CLIENT_ID = os.environ.get("LBAPI_CLIENT_ID", "lhcb-analysis-productions")
def prepare_test(production_name, job_name, dependent_input=None):
"""Run a local test job for a specific production job"""
token = get_sso_token(LBAPI_HOST, LBAPI_CLIENT_ID)
# Check if production exists
check_production(production_name)
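
The hunk below drops the generated ProdConf Python snippet in favour of a JSON configuration handed to lb-prod-run. A sketch of what the written prodConf_DaVinci_00012345_00006789_1.json might contain, using only keys that appear in the diff (the application version, input LFN, options file, and output type are placeholders):

import json

# Keys mirror the prodrun_conf dictionary assembled in prepare_test below;
# concrete values are illustrative only.
prodrun_conf = {
    "spec_version": 1,
    "application": {
        "name": "DaVinci",
        "version": "v45r8",
        "data_pkgs": ["AnalysisProductions v999999999999", "ProdConf"],
    },
    "options": {"format": "WGProd", "files": ["$ANALYSIS_PRODUCTIONS_BASE/MyAnalysis/options.py"]},
    "input": {
        "files": ["LFN:/lhcb/LHCb/Collision18/SOME.DST/00012345/0000/00012345_00000001_1.some.dst"],
        "xml_summary_file": "summaryDaVinci_00012345_00006789_1.xml",
        "xml_file_catalog": "pool_xml_catalog.xml",
    },
    "output": {"prefix": "00012345_00006789_1", "types": ["MYSTREAM.ROOT"]},
}
with open("prodConf_DaVinci_00012345_00006789_1.json", "w") as fp:
    json.dump(prodrun_conf, fp, indent=2)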
@@ -53,187 +58,169 @@ def prepare_test(production_name, job_name, dependent_input=None):
)
application_name, application_version = job_data["application"].rsplit("/", 1)
options = [*job_data["options"]]
require_cmt = project_uses_cmt(application_name, application_version)
params = {
"application_name": application_name,
"application_version": application_version,
prodrun_conf = {
"spec_version": 1,
"application": {
"name": application_name,
"version": application_version,
"data_pkgs": ["AnalysisProductions v999999999999", "ProdConf"],
},
"options": {"format": "WGProd", "files": job_data["options"]},
"input": {"xml_summary_file": "summaryDaVinci_00012345_00006789_1.xml"},
"output": {"prefix": "00012345_00006789_1"},
}
if "bk_query" in job_data["input"]:
params["bk_query"] = job_data["input"]["bk_query"]
elif "transform_ids" in job_data["input"]:
params["transform_ids"] = job_data["input"]["transform_ids"]
params["filetype"] = job_data["input"]["filetype"]
elif "job_name" in job_data["input"]:
dependent_job = job_data["input"]["job_name"]
# check we only need to handle one output
if len(prod_data[dependent_job]["output"]) != 1:
raise NotImplementedError(
"Testing jobs that take input from jobs with multiple outputs is not yet supported"
)
# figure out the location of the input we need
if dependent_input is None:
prod_data[dependent_job]["job_name"] = dependent_job
dependent_input = recursively_create_input(
production_name, prod_data[dependent_job], job_name
)
# check the input we need exists at the location we think it should
if not os.path.exists(dependent_input):
raise OSError(
f"Local input file not found for {job_name}, please check you have provided the correct path."
)
if "bk_query" in job_data["input"]:
params["bk_query"] = prod_data[dependent_job]["input"]["bk_query"]
elif "transform_ids" in job_data["input"]:
params["transform_ids"] = job_data[dependent_job]["transform_ids"]
params["filetype"] = job_data[dependent_job]["filetype"]
else:
raise NotImplementedError(
"Input requires either a bookkeeping location or a previous job name"
)
params["override_output_filetype"] = prod_data[dependent_job]["output"][0]
else:
raise NotImplementedError(
"Input requires either a bookkeeping location or a previous job name"
override_input, output_filetype = resolve_input_overrides(prod_data, job_name)
if "job_name" in job_data["input"] and dependent_input is None:
dependent_input = recursively_create_input(
production_name, job_data["input"]["job_name"], prod_data
)
# only create output directories if this is the job that's about to be run
# i.e. there are no dependent jobs that need to be run first
dynamic_dir, out_dir = create_output_dir(production_name, require_cmt)
if "turbo" in job_data and job_data["turbo"]:
params["turbo"] = job_data["turbo"]
if "root_in_tes" in job_data:
params["root_in_tes"] = job_data["root_in_tes"]
try:
data = cern_sso.get_with_cookies(
urlunparse(ANA_PROD_WEB_URL + ("", urlencode(params), ""))
if job_data["automatically_configure"]:
params = {}
if output_filetype:
params["override-output-filetype"] = output_filetype
response = requests.post(
f"{LBAPI_HOST}/pipelines/autoconf-options/",
headers={"Authorization": f"Bearer {token}"},
json={**job_data, **({"input": override_input} if override_input else {})},
params=params,
)
except cern_sso.SSOException as e:
raise click.ClickException(str(e))
check_status_code(response)
if job_data["automatically_configure"]:
config_fn = job_name + "_autoconf.py"
config_path = join(dynamic_dir, production_name, config_fn)
os.makedirs(dirname(config_path))
with open(config_path, "wt") as f:
f.write(data["dynamic_options"]["local_autoconf.py"])
options.insert(
0, join("$ANALYSIS_PRODUCTIONS_DYNAMIC", production_name, config_fn)
config_path = dynamic_dir / production_name / f"{job_name}_autoconf.py"
config_path.parent.mkdir(parents=True)
config_path.write_text(response.text)
prodrun_conf["options"]["files"].insert(
0, join("$ANALYSIS_PRODUCTIONS_DYNAMIC", production_name, config_path.name)
)
prod_conf_fn = "prodConf_DaVinci_00012345_00006789_1.py"
output_pkl = "--output=output.pkl"
gaudi_cmd = ["gaudirun.py", "-T", *options, prod_conf_fn, output_pkl]
prodrun_config_path = out_dir / "prodConf_DaVinci_00012345_00006789_1.json"
if len(job_data["output"]) != 1:
raise NotImplementedError()
output_file_type = job_data["output"].pop()
prodrun_conf["output"]["types"] = job_data["output"]
# force to use the file chosen by the user
if "bk_query" in job_data["input"]:
if "bk_query" in job_data["input"] or "transform_ids" in job_data["input"]:
if dependent_input is not None:
lfns = [dependent_input]
prodrun_conf["input"]["files"] = [dependent_input]
else:
lfns = json.dumps([f"LFN:{lfn}" for lfn in data["lfns"]])
response = requests.post(
f"{LBAPI_HOST}/pipelines/lfns/",
headers={"Authorization": f"Bearer {token}"},
json=job_data["input"],
)
lfns = check_status_code(response).json()
# only need a catalog if we're using Dirac data
with open(join(out_dir, "pool_xml_catalog.xml"), "wt") as fp:
fp.write(pool_xml_catalog(data["lfns"]))
prodrun_conf["input"]["files"] = [f"LFN:{lfn}" for lfn in lfns]
(out_dir / "pool_xml_catalog.xml").write_text(pool_xml_catalog(lfns))
prodrun_conf["input"]["xml_file_catalog"] = "pool_xml_catalog.xml"
elif "job_name" in job_data["input"]:
lfns = [dependent_input]
with open(join(out_dir, prod_conf_fn), "wt") as fp:
fp.write(
"\n".join(
[
"from ProdConf import ProdConf",
"ProdConf(",
" NOfEvents=-1,",
f" AppVersion='{application_version}',",
" OptionFormat='WGProd',",
" XMLSummaryFile='summaryDaVinci_00012345_00006789_1.xml',",
f" Application='{application_name}',",
" OutputFilePrefix='00012345_00006789_1',",
" XMLFileCatalog='pool_xml_catalog.xml',",
f" InputFiles={lfns},",
f" OutputFileTypes=['{output_file_type}'],",
")",
]
)
)
prodrun_conf["input"]["files"] = [dependent_input]
prodrun_config_path.write_text(json.dumps(prodrun_conf))
return out_dir, prodrun_config_path.name
return out_dir, data["env-command"], gaudi_cmd
def prepare_reproduce(pipeline_id, production_name, job_name, test_id):
token = get_sso_token(LBAPI_HOST, LBAPI_CLIENT_ID)
def prepare_reproduce(pipeline_id, production_name, job_name, test_id="latest"):
click.secho(
f"Reproducing test for test {pipeline_id} {production_name} {job_name}",
fg="green",
)
try:
data = cern_sso.get_with_cookies(
f"{cern_sso.ANA_PROD_HOST}/dynamic/{pipeline_id}/{production_name}/"
f"{job_name}/{test_id}/reproduce_locally.json"
)
except cern_sso.SSOException as e:
raise click.ClickException(str(e))
prod_url = f"{LBAPI_HOST}/pipelines/{pipeline_id}/{production_name}/"
response = requests.get(prod_url, headers={"Authorization": f"Bearer {token}"})
prod_info = check_status_code(response).json()
job_url = f"{prod_url}jobs/{job_name}/"
if test_id != "latest":
job_url += f"tests/{test_id}"
response = requests.get(job_url, headers={"Authorization": f"Bearer {token}"})
job_info = check_status_code(response).json()
if test_id == "latest":
job_url += f"tests/{job_info['test']['attempt']}"
tmp_dir = tempfile.mkdtemp()
click.secho(f"Cloning {data['git_repo']}", fg="green")
subprocess.check_call(["git", "clone", data["git_repo"], tmp_dir])
click.secho(f"Cloning {prod_info['repo_url']}", fg="green")
subprocess.check_call(["git", "clone", prod_info["repo_url"], tmp_dir])
click.secho(f"Running test in {tmp_dir}", fg="green")
os.chdir(tmp_dir)
click.secho(f"Checking out {data['revision']}", fg="green")
subprocess.check_call(["git", "checkout", data["revision"]])
click.secho(f"Checking out {prod_info['commit']}", fg="green")
subprocess.check_call(["git", "checkout", prod_info["commit"]])
check_production(production_name)
app_name, app_version = data["env-command"][-1].rsplit("/", 1)
require_cmt = project_uses_cmt(app_name, app_version)
require_cmt = project_uses_cmt(
job_info["application_name"], job_info["application_version"]
)
dynamic_dir, out_dir = create_output_dir(production_name, require_cmt)
# Write the dynamic option files
for filename, filecontent in data["dynamic_options"].items():
filename = join(dynamic_dir, filename)
os.makedirs(dirname(filename), exist_ok=True)
with open(filename, "wt") as f:
f.write(filecontent)
# Download the jobs output sandbox
response = requests.get(
f"{job_url}/zip", headers={"Authorization": f"Bearer {token}"}
)
zip_data = io.BytesIO(check_status_code(response).content)
with zipfile.ZipFile(zip_data) as zf:
zf.extractall(out_dir)
prodrun_config_path = max(
out_dir.glob("prodConf*.json"), key=lambda x: x.name.split("_")[-1]
)
# Write automatic configuration if requested
if job_info["automatically_configure"]:
filename = dynamic_dir / job_info["dynamic_options_path"]
# Prevent directory traversal
if str(filename.relative_to(dynamic_dir)).startswith("."):
raise NotImplementedError(dynamic_dir, filename)
filename.parent.mkdir(parents=True)
filename.write_text(job_info["autoconf_options"])
# Download the job input
for filename, url in data["download_files"].items():
click.secho(f"Downloading {filename}", fg="green")
filename = join(out_dir, filename)
os.makedirs(dirname(filename), exist_ok=True)
input_paths = [x["path"] for x in job_info["test"]["input_files"]]
if all("/" in x for x in input_paths):
# It's an LFN so write the pool_xml_catalog.xml
(out_dir / "pool_xml_catalog.xml").write_text(pool_xml_catalog(input_paths))
elif all("/" not in x for x in input_paths):
if len(input_paths) != 1:
raise NotImplementedError(input_paths)
filename = out_dir / input_paths[0]
# It's from another job so download from S3
response = requests.get(
f"{prod_url}jobs/{job_info['input']['job_name']}/tests/"
f"{job_info['test']['attempt']}/files/{filename.name}/url",
headers={"Authorization": f"Bearer {token}"},
)
url = check_status_code(response).json()["url"]
click.secho(f"Downloading {filename.name}", fg="green")
with requests.get(url, stream=True) as resp:
if not resp.ok:
click.secho(resp.text, fg="red")
raise click.ClickException("Network request for job file failed")
with open(filename, "wb") as fp: