Commit 12acfdae authored by Chris Burr's avatar Chris Burr
Browse files

Merge branch 'input-from-multiple-outputs' into 'master'

Support getting input from a job with multiple outputs

See merge request !58
parents 309dd7c2 8f72b916
Pipeline #3650936 passed with stages
in 3 minutes and 27 seconds
......@@ -64,7 +64,7 @@ INPUT_SCHEMAS = {
"bk_query": Map(
{"bk_query": Str(), Opt("n_test_lfns"): Int(), Opt("dq_flags"): DQ_FLAGS_SCHEMA}
),
"job_name": Map({"job_name": Str()}),
"job_name": Map({"job_name": Str(), Opt("filetype"): Regex(RE_OUTPUT_FILE_TYPE)}),
"transform_ids": Map(
{
"transform_ids": Seq(Int()),
......@@ -476,6 +476,7 @@ def validate_yaml(jobs_data, checks_data, repo_root, prod_name):
_validate_job_data(repo_root, prod_name, job_name, job_data, checks_data)
except Exception as e:
raise ValueError(f"Failed to validate {job_name!r} with error {e!r}") from e
if "bk_query" in job_data["input"]:
if job_data["input"]["bk_query"].lower() in bk_query_to_job:
warnings.append(
......@@ -483,6 +484,31 @@ def validate_yaml(jobs_data, checks_data, repo_root, prod_name):
)
bk_query_to_job[job_data["input"]["bk_query"].lower()].add(job_name.lower())
elif "job_name" in job_data["input"]:
# Ensire that the job name and output filetype are unambiguously defined
if job_data["input"]["job_name"] not in jobs_data:
raise ValueError(
f"Unrecognised job name in input: {job_data['input']['job_name']}"
)
input_job_data = jobs_data[job_data["input"]["job_name"]]
input_filetype = job_data["input"].get("filetype", "").upper()
if len(input_job_data["output"]) == 1:
if input_filetype not in [""] + input_job_data["output"]:
raise ValueError(
f"Unrecognised {input_filetype=} for {job_name=} input, "
f"expected one of: {input_job_data['output']}"
)
elif input_filetype == "":
raise ValueError(
f"{job_name} gets it's input from a job with multiple outputs. "
"The 'filetype' key must be specified in the 'input' section."
)
elif input_filetype.upper() not in input_job_data["output"]:
raise ValueError(
f"Unrecognised {input_filetype=} for {job_name=} input, "
f"expected one of: {input_job_data['output']}"
)
for query, job_set in bk_query_to_job.items():
for job_name in job_set:
warnings.extend(_check_name_magnet_polarity(query, job_name))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment