diff --git a/Tools/ART/python/ART/__init__.py b/Tools/ART/python/ART/__init__.py index 01a987debda707b593da70242fd983d5734b7e06..83f3adc32ad22eff9c7d9f08203397dea1528c7d 100644 --- a/Tools/ART/python/ART/__init__.py +++ b/Tools/ART/python/ART/__init__.py @@ -1,4 +1,4 @@ -# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration """ Import default Classes. diff --git a/Tools/ART/python/ART/art_base.py b/Tools/ART/python/ART/art_base.py index 927db00e7a7854c2a7201eb415756e8cde06acc1..f397d513661f3ff64944990993416e936c5b654d 100755 --- a/Tools/ART/python/ART/art_base.py +++ b/Tools/ART/python/ART/art_base.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration """Base class for grid and (local) build submits.""" __author__ = "Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>" @@ -103,10 +103,6 @@ class ArtBase(object): log.info("%s %s", key, config.get(nightly_release, project, platform, package, key)) return 0 - def download(self, input_file): - """Download input_file from RUCIO.""" - return self.get_input(input_file) - # # Default implementations # @@ -160,9 +156,9 @@ class ArtBase(object): """ Return a list of all test files matching 'test_*.sh' of given 'job_type', 'index_type' and nightly/project/platform. - 'index_type' can be 'all', 'batch' or 'single'. + 'job_type' can be 'grid' or 'build', given by the test - If "given" is None, all files are returned. + 'index_type' can be 'all', 'batch' or 'single'. Only the filenames are returned. """ @@ -243,20 +239,6 @@ class ArtBase(object): return True return False - def get_input(self, input_name): - """Download input file from rucio. Retuns path of inputfile.""" - work_dir = '.' 
- - # run in correct environment - env = os.environ.copy() - env['PATH'] = '.:' + env['PATH'] - - (code, out, err) = run_command(os.path.join(self.art_directory, "art-get-input.sh") + " " + input_name, dir=work_dir, env=env) - if code == 0 and out != '': - return os.path.join(work_dir, input_name.replace(':', '/', 1)) - - return None - # # Private Methods # diff --git a/Tools/ART/python/ART/art_build.py b/Tools/ART/python/ART/art_build.py index b77d18664044fdd3e25ab6235284a5b511f06019..28c449b1b73cdbc6ff96eac0c26897c336cf1952 100644 --- a/Tools/ART/python/ART/art_build.py +++ b/Tools/ART/python/ART/art_build.py @@ -1,22 +1,23 @@ #!/usr/bin/env python -# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration """Class for (local) build submits.""" __author__ = "Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>" import collections +import concurrent.futures import fnmatch import json import logging import multiprocessing import os +from datetime import datetime + from art_misc import run_command, mkdir_p from art_base import ArtBase from art_header import ArtHeader -from parallelScheduler import ParallelScheduler - MODULE = "art.build" @@ -29,11 +30,13 @@ def run_job(art_directory, sequence_tag, script_directory, package, job_type, jo """ # <script_directory> <sequence_tag> <package> <outfile> <job_type> <job_index> log = logging.getLogger(MODULE) + start_time = datetime.now() log.info("job started %s %s %s %s %s %d %s", art_directory, sequence_tag, script_directory, package, job_type, job_index, test_name) (exit_code, out, err) = run_command(' '.join((os.path.join(art_directory, './art-internal.py'), "build", "job", script_directory, sequence_tag, package, "out", job_type, str(job_index)))) log.info("job ended %s %s %s %s %s %d %s", art_directory, sequence_tag, script_directory, package, job_type, job_index, test_name) + end_time = datetime.now() - return (test_name, exit_code, 
out, err) + return (package, test_name, exit_code, out, err, start_time, end_time) class ArtBuild(ArtBase): @@ -61,29 +64,56 @@ class ArtBuild(ArtBase): if not test_directories: log.warning('No tests found in directories ending in "test"') - status = collections.defaultdict(lambda: collections.defaultdict(lambda: collections.defaultdict())) + log.info("Executor started with %d threads", self.max_jobs) + executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_jobs) + future_set = [] for package, directory in test_directories.items(): + future_set.extend(self.task(executor, package, job_type, sequence_tag)) + + # Create status of all packages + status = collections.defaultdict(lambda: collections.defaultdict(lambda: collections.defaultdict())) + + # Some release information + status['release_info']['nightly_release'] = self.nightly_release + status['release_info']['nightly_tag'] = self.nightly_tag + status['release_info']['project'] = self.project + status['release_info']['platform'] = self.platform + + # Package information with all tests in each package + for future in concurrent.futures.as_completed(future_set): + (package, test_name, exit_code, out, err, start_time, end_time) = future.result() + log.debug("Handling job for %s %s", package, test_name) + status[package][test_name]['exit_code'] = exit_code + # Removed, seem to give empty lines + # status[package][test_name]['out'] = out + # status[package][test_name]['err'] = err + status[package][test_name]['start_time'] = start_time.strftime('%Y-%m-%dT%H:%M:%S') + status[package][test_name]['end_time'] = end_time.strftime('%Y-%m-%dT%H:%M:%S') + status[package][test_name]['start_epoch'] = start_time.strftime('%s') + status[package][test_name]['end_epoch'] = end_time.strftime('%s') + test_directory = os.path.abspath(test_directories[package]) - job_results = self.task(package, job_type, sequence_tag) - for job_result in job_results: - test_name = job_result[0] - status[package][test_name]['exit_code'] 
= job_result[1] - # Removed, seem to give empty lines - # status[package][test_name]['out'] = job_result[2] - # status[package][test_name]['err'] = job_result[3] - fname = os.path.join(test_directory, test_name) + fname = os.path.join(test_directory, test_name) + if os.path.exists(fname): status[package][test_name]['description'] = ArtHeader(fname).get(ArtHeader.ART_DESCRIPTION) - status[package][test_name]['test_directory'] = test_directory - - # gather results - result = [] - log.debug("Looking for results for test %s", test_name) - with open(os.path.join(sequence_tag, package, os.path.splitext(test_name)[0], 'stdout.txt'), 'r') as f: + else: + log.warning("Test file cannot be opened to get description: %s", fname) + status[package][test_name]['description'] = "" + status[package][test_name]['test_directory'] = test_directory + + # gather results + result = [] + stdout_path = os.path.join(sequence_tag, package, os.path.splitext(test_name)[0], 'stdout.txt') + log.debug("Looking for results in %s", stdout_path) + if os.path.exists(stdout_path): + with open(stdout_path, 'r') as f: output = f.read() result = ArtBase.get_art_results(output) + else: + log.warning("Output file does not exist: %s", stdout_path) - status[package][test_name]['result'] = result + status[package][test_name]['result'] = result mkdir_p(sequence_tag) with open(os.path.join(sequence_tag, "status.json"), 'w') as outfile: @@ -91,16 +121,18 @@ class ArtBuild(ArtBase): return 0 - def task(self, package, job_type, sequence_tag): + def task(self, executor, package, job_type, sequence_tag): """Run tests of a single package.""" log = logging.getLogger(MODULE) log.debug("task %s %s %s", package, job_type, sequence_tag) test_directories = self.get_test_directories(self.script_directory) test_directory = os.path.abspath(test_directories[package]) test_names = self.get_files(test_directory, job_type, "all", self.nightly_release, self.project, self.platform) - scheduler = ParallelScheduler(self.max_jobs + 1) 
+ if not test_names: + log.debug("No tests found for package %s and job_type %s", package, job_type) - index = 0 + future_set = [] + job_index = 0 for test_name in test_names: schedule_test = False fname = os.path.join(test_directory, test_name) @@ -119,11 +151,10 @@ class ArtBuild(ArtBase): log.warning("job skipped, file not executable: %s", fname) if schedule_test: - scheduler.add_task(task_name="t" + str(index), dependencies=[], description="d", target_function=run_job, function_kwargs={'art_directory': self.art_directory, 'sequence_tag': sequence_tag, 'script_directory': self.script_directory, 'package': package, 'job_type': job_type, 'job_index': index, 'test_name': test_name}) - index += 1 + future_set.append(executor.submit(run_job, self.art_directory, sequence_tag, self.script_directory, package, job_type, job_index, test_name)) + job_index += 1 - result = scheduler.run() - return result + return future_set def job(self, sequence_tag, package, out, job_type, job_index): """Run a single test.""" @@ -135,6 +166,7 @@ class ArtBuild(ArtBase): work_directory = os.path.join(sequence_tag, package, os.path.splitext(test_name)[0]) mkdir_p(work_directory) + log.debug("Work dir %s", work_directory) # Tests are called with arguments: PACKAGE TEST_NAME SCRIPT_DIRECTORY TYPE script_directory = '.' 
@@ -147,8 +179,10 @@ class ArtBuild(ArtBase): (exit_code, output, err) = run_command(cmd, dir=work_directory, env=env) with open(os.path.join(work_directory, "stdout.txt"), "w") as text_file: + log.debug("Copying stdout into %s", work_directory) text_file.write(output) with open(os.path.join(work_directory, "stderr.txt"), "w") as text_file: + log.debug("Copying stderr into %s", work_directory) text_file.write(err) return exit_code diff --git a/Tools/ART/python/ART/art_configuration.py b/Tools/ART/python/ART/art_configuration.py index 1e8910b62dde194c49a7bd0a1b67c13791bf55ff..b87a3790be7d6dcdf8708a7c88678bacdecd127e 100644 --- a/Tools/ART/python/ART/art_configuration.py +++ b/Tools/ART/python/ART/art_configuration.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration """Interface to the general ART configuration.""" __author__ = "Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>" diff --git a/Tools/ART/python/ART/art_grid.py b/Tools/ART/python/ART/art_grid.py index a00e499f54bf468cb42e42d8d717b53d57192bfa..b9b6dc86637d53c7eb12a395461a99262fd79e2c 100644 --- a/Tools/ART/python/ART/art_grid.py +++ b/Tools/ART/python/ART/art_grid.py @@ -1,14 +1,15 @@ #!/usr/bin/env python -# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration """Class for grid submission.""" __author__ = "Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>" import atexit -import datetime +import concurrent.futures import glob import json import logging +import multiprocessing import os import re import shutil @@ -18,6 +19,9 @@ import tempfile import time import urllib2 +from datetime import datetime +from datetime import timedelta + from art_base import ArtBase from art_configuration import ArtConfiguration from art_header import ArtHeader @@ -27,6 +31,27 @@ from art_misc import 
mkdir_p, make_executable, run_command MODULE = "art.grid" +def copy_job(art_directory, indexed_package, dst): + """ + Copy job to be run by executor. + + Needs to be defined outside a class. + Names of arguments are important, see call to scheduler. + """ + log = logging.getLogger(MODULE) + start_time = datetime.now() + log.info("job started %s %s %s", art_directory, indexed_package, dst) + (exit_code, out, err) = run_command(' '.join((os.path.join(art_directory, './art.py'), "copy", "--dst=" + dst, indexed_package))) + log.info("job ended %s %s %s", art_directory, indexed_package, dst) + end_time = datetime.now() + + print "Exit Code:", exit_code + print "Out: ", out + print "Err: ", err + + return (indexed_package, exit_code, out, err, start_time, end_time) + + class ArtGrid(ArtBase): """Class for grid submission.""" @@ -39,18 +64,20 @@ class ArtGrid(ArtBase): JOB_REPORT_ART_KEY = 'art' RESULT_WAIT_INTERVAL = 5 * 60 - def __init__(self, art_directory, nightly_release, project, platform, nightly_tag, script_directory=None, skip_setup=False, submit_directory=None): + def __init__(self, art_directory, nightly_release, project, platform, nightly_tag, script_directory=None, skip_setup=False, submit_directory=None, max_jobs=0): """Keep arguments.""" super(ArtGrid, self).__init__(art_directory) self.nightly_release = nightly_release + self.nightly_release_short = re.sub(r"-VAL-.*", "-VAL", self.nightly_release) self.project = project self.platform = platform self.nightly_tag = nightly_tag self.script_directory = script_directory self.skip_setup = skip_setup self.submit_directory = submit_directory + self.max_jobs = multiprocessing.cpu_count() if max_jobs <= 0 else max_jobs - self.rucio = ArtRucio() + self.rucio = ArtRucio(self.art_directory, self.nightly_release_short, project, platform, nightly_tag) def status(self, status): """Print status for usage in gitlab-ci.""" @@ -102,9 +129,9 @@ class ArtGrid(ArtBase): shutil.copy(os.path.join(self.art_directory, 'art.py'), 
run_dir) shutil.copy(os.path.join(self.art_directory, 'art-diff.py'), run_dir) - shutil.copy(os.path.join(self.art_directory, 'art-get-input.sh'), run_dir) shutil.copy(os.path.join(self.art_directory, 'art-internal.py'), run_dir) shutil.copy(os.path.join(self.art_directory, 'art-task-grid.sh'), run_dir) + shutil.copy(os.path.join(self.art_directory, 'art-download.sh'), run_dir) shutil.copy(os.path.join(art_python_directory, '__init__.py'), ART) shutil.copy(os.path.join(art_python_directory, 'art_base.py'), ART) shutil.copy(os.path.join(art_python_directory, 'art_build.py'), ART) @@ -115,14 +142,12 @@ class ArtGrid(ArtBase): shutil.copy(os.path.join(art_python_directory, 'art_rucio.py'), ART) shutil.copy(os.path.join(art_python_directory, 'docopt.py'), ART) shutil.copy(os.path.join(art_python_directory, 'docopt_dispatch.py'), ART) - shutil.copy(os.path.join(art_python_directory, 'parallelScheduler.py'), ART) - shutil.copy(os.path.join(art_python_directory, 'serialScheduler.py'), ART) make_executable(os.path.join(run_dir, 'art.py')) make_executable(os.path.join(run_dir, 'art-diff.py')) - make_executable(os.path.join(run_dir, 'art-get-input.sh')) make_executable(os.path.join(run_dir, 'art-internal.py')) make_executable(os.path.join(run_dir, 'art-task-grid.sh')) + make_executable(os.path.join(run_dir, 'art-download.sh')) script_directory = self.get_script_directory() @@ -140,24 +165,12 @@ class ArtGrid(ArtBase): match = re.search(r"jediTaskID=(\d+)", text) return match.group(1) if match else -1 - def get_nightly_release_short(self): - """Return a short version of the nightly release.""" - return re.sub(r"-VAL-.*", "-VAL", self.nightly_release) - - def copy(self, package, dst=None, user=None): + def copy(self, indexed_package, dst=None, user=None): """Copy output from scratch area to eos area.""" log = logging.getLogger(MODULE) - real_user = os.getenv('USER', ArtGrid.ARTPROD) - user = real_user if user is None else user - default_dst = ArtGrid.EOS_OUTPUT_DIR if 
real_user == ArtGrid.ARTPROD else '.' - dst = default_dst if dst is None else dst - - if package is not None: - log.info("Copy %s", package) - outfile = self.rucio.get_outfile(user, package, self.get_nightly_release_short(), self.project, self.platform, self.nightly_tag) - log.info("Copying from %s", outfile) - return self.copy_output(outfile, dst) + if indexed_package is not None: + return self.copy_package(indexed_package, dst, user) # make sure script directory exist self.exit_if_no_script_directory() @@ -169,108 +182,59 @@ class ArtGrid(ArtBase): # copy results for all packages result = 0 - for package, root in test_directories.items(): + for indexed_package, root in test_directories.items(): number_of_tests = len(self.get_files(root, "grid", "all", self.nightly_release, self.project, self.platform)) if number_of_tests > 0: - log.info("Copy %s", package) - outfile = self.rucio.get_outfile(user, package, self.get_nightly_release_short(), self.project, self.platform, self.nightly_tag) - log.info("Copying from %s", outfile) - - result |= self.copy_output(outfile, dst) + result |= self.copy_package(indexed_package, dst, user) return result - def copy_output(self, outfile, dst): - """Copy outfile to dst.""" + def copy_package(self, indexed_package, dst, user): + """Copy package to dst.""" log = logging.getLogger(MODULE) + real_user = os.getenv('USER', ArtGrid.ARTPROD) + user = real_user if user is None else user + default_dst = ArtGrid.EOS_OUTPUT_DIR if real_user == ArtGrid.ARTPROD else '.' 
+ dst = default_dst if dst is None else dst - cleanup = False + # for debugging + cleanup = True result = 0 - outfile_pattern = r"([^\.]+)\.([^\.]+)\.([^\.]+)\.(.+)\.([^\.]+)\.([^\.]+)\.([^\.]+)\.([^\.]+)\.([^\.\n]+)" - match = re.search(outfile_pattern, outfile) - if not match: - log.error("%s does not match pattern", outfile) - return 1 - (user_type, user, experiment, nightly_release, project, platform, nightly_tag, sequence_tag, package) = match.groups() - dst_dir = os.path.join(dst, nightly_release, nightly_tag, project, platform, package) - log.info("%s", dst_dir) + + package = indexed_package.split('.')[0] + dst_dir = os.path.join(dst, self.nightly_release, self.project, self.platform, self.nightly_tag, package) + log.info("dst_dir %s", dst_dir) tmp_dir = tempfile.mkdtemp() if cleanup: - atexit.register(shutil.rmtree, tmp_dir) - - tmp_json = os.path.join(tmp_dir, ArtRucio.ART_JOB) - tmp_log = os.path.join(tmp_dir, ArtRucio.LOG_TGZ) - tmp_tar = os.path.join(tmp_dir, ArtRucio.JOB_TAR) + atexit.register(shutil.rmtree, tmp_dir, ignore_errors=True) - for index in self.rucio.get_indices(user, outfile + ArtRucio.OUTPUT): + for entry in self.rucio.get_table(user, indexed_package): + index = entry['grid_index'] + log.debug("Index %d", index) # get the test name - test_name = self.rucio.get_job_name(user, index, package, sequence_tag, nightly_release, project, platform, nightly_tag) + test_name = entry['job_name'] if test_name is None: log.error("JSON Lookup Error for test %d", index) result = 1 continue + log.debug("Test_name %s", test_name) # create tmp test directory test_dir = os.path.join(tmp_dir, test_name) mkdir_p(test_dir) - # copy art-job.json, ignore error - log.info("Copying JSON: %d %s", index, outfile + ArtRucio.JSON) - if self.rucio.xrdcp(self.rucio.get_rucio_name(user, outfile + ArtRucio.JSON, index), tmp_json, force=True) == 0: - shutil.copyfile(tmp_json, os.path.join(test_dir, ArtRucio.ART_JOB)) + # copy art-job.json + result |= 
self.copy_json(os.path.join(tempfile.gettempdir(), entry['outfile'] + "_EXT0", self.__get_rucio_name(user, entry, 'json')), test_dir) # copy and unpack log - log.info("Copying LOG: %d %s", index, outfile + ArtRucio.LOG) - if self.rucio.xrdcp(self.rucio.get_rucio_name(user, outfile + ArtRucio.LOG, index), tmp_log, force=True) != 0: - log.error("Log Unpack Error") - result = 1 - else: - log.info("Unpacking LOG: %s %s", index, test_dir) - tar = tarfile.open(tmp_log) - for member in tar.getmembers(): - tar.extract(member, path=test_dir) - # does not work: tar.extractall() - tar.close() - - log.info("Copying TAR: %d %s", index, outfile + ArtRucio.OUTPUT) + result |= self.copy_log(user, package, test_name, test_dir) # copy results and unpack - if self.rucio.xrdcp(self.rucio.get_rucio_name(user, outfile + ArtRucio.OUTPUT, index), tmp_tar, force=True) != 0: - log.error("TAR Error") - result = 1 - else: - log.info("Unpacking TAR: %d %s to %s", index, tmp_tar, test_dir) - tar = tarfile.open(tmp_tar) - tar.extractall(path=test_dir) - tar.close() + result |= self.copy_results(user, package, test_name, test_dir) # copy to eos - dst_target = os.path.join(dst_dir, test_name) - if dst_target.startswith('/eos'): - # mkdir_cmd = 'eos ' + ArtGrid.EOS_MGM_URL + ' mkdir -p' - mkdir_cmd = None - xrdcp_target = ArtGrid.EOS_MGM_URL + dst_target + '/' - else: - mkdir_cmd = 'mkdir -p' - xrdcp_target = dst_target - log.info("Copying to DST: %d %s", index, xrdcp_target) - - if mkdir_cmd is not None: - (exit_code, out, err) = run_command(' '.join((mkdir_cmd, dst_target))) - if exit_code != 0: - log.error("Mkdir Error: %d %s %s", exit_code, out, err) - result = 1 - - cmd = ' '.join(('xrdcp -N -r -p -v', test_dir, xrdcp_target)) - log.info("using: %s", cmd) - (exit_code, out, err) = run_command(cmd) - if exit_code not in [0, 51, 54]: - # 0 all is ok - # 51 File exists - # 54 is already copied - log.error("XRDCP to EOS Error: %d %s %s", exit_code, out, err) - result = 1 + result |= 
self.copy_to_eos(index, test_name, test_dir, dst_dir) # cleanup if cleanup: @@ -278,6 +242,70 @@ class ArtGrid(ArtBase): return result + def copy_json(self, json_file, test_dir): + """Copy json.""" + log = logging.getLogger(MODULE) + log.info("Copying JSON: %s", json_file) + shutil.copyfile(json_file, os.path.join(test_dir, ArtRucio.ART_JOB)) + return 0 + + def copy_log(self, user, package, test_name, test_dir): + """Copy and unpack log file.""" + log = logging.getLogger(MODULE) + log.info("Copying LOG: %s %s", package, test_name) + + tar = self.__open_tar(user, package, test_name, tar=False) + if tar is not None: + log.info("Unpacking LOG: %s", test_dir) + for member in tar.getmembers(): + tar.extract(member, path=test_dir) + # does not work: tar.extractall() + tar.close() + return 0 + + def copy_results(self, user, package, test_name, test_dir): + """Copy results and unpack.""" + log = logging.getLogger(MODULE) + log.info("Copying TAR: %s %s", package, test_name) + + tar = self.__open_tar(user, package, test_name) + if tar is not None: + log.info("Unpacking TAR: %s", test_dir) + tar.extractall(path=test_dir) + tar.close() + return 0 + + def copy_to_eos(self, index, test_name, test_dir, dst_dir): + """Copy to eos.""" + log = logging.getLogger(MODULE) + dst_target = os.path.join(dst_dir, test_name) + if dst_target.startswith('/eos'): + # mkdir_cmd = 'eos ' + ArtGrid.EOS_MGM_URL + ' mkdir -p' + mkdir_cmd = None + xrdcp_target = ArtGrid.EOS_MGM_URL + dst_target + '/' + else: + mkdir_cmd = 'mkdir -p' + xrdcp_target = dst_target + log.info("Copying to DST: %d %s", index, xrdcp_target) + + if mkdir_cmd is not None: + (exit_code, out, err) = run_command(' '.join((mkdir_cmd, dst_target))) + if exit_code != 0: + log.error("Mkdir Error: %d %s %s", exit_code, out, err) + return 1 + + cmd = ' '.join(('xrdcp -N -r -p -v', test_dir, xrdcp_target)) + log.info("using: %s", cmd) + (exit_code, out, err) = run_command(cmd) + if exit_code not in [0, 50, 51, 54]: + # 0 all is ok + # 
50 File exists + # 51 File exists + # 54 is already copied + log.error("XRDCP to EOS Error: %d %s %s", exit_code, out, err) + return 1 + return 0 + def task_package(self, root, package, job_type, sequence_tag, no_action, config_file): """Submit a single package.""" log = logging.getLogger(MODULE) @@ -299,66 +327,102 @@ class ArtGrid(ArtBase): def task_list(self, job_type, sequence_tag, package=None, no_action=False, wait_and_copy=True, config_file=None): """Submit a list of packages.""" log = logging.getLogger(MODULE) - # job will be submitted from tmp directory - self.submit_directory = tempfile.mkdtemp(dir='.') - # make sure tmp is removed afterwards - atexit.register(shutil.rmtree, self.submit_directory) + test_copy = False - # make sure script directory exist - self.exit_if_no_script_directory() + if test_copy: + all_results = {} + all_results[0] = ('TrigAnalysisTest', "xxx", "yyy", 0) - # get the test_*.sh from the test directory - test_directories = self.get_test_directories(self.get_script_directory()) - if not test_directories: - log.warning('No tests found in directories ending in "test"') + else: + # job will be submitted from tmp directory + self.submit_directory = tempfile.mkdtemp(dir='.') - configuration = None if self.skip_setup else ArtConfiguration(config_file) + # make sure tmp is removed afterwards + atexit.register(shutil.rmtree, self.submit_directory, ignore_errors=True) - all_results = {} + # make sure script directory exist + self.exit_if_no_script_directory() - if package is None: - # submit tasks for all packages - for package, root in test_directories.items(): - if configuration is not None and configuration.get(self.nightly_release, self.project, self.platform, package, 'exclude', False): - log.warning("Package %s is excluded", package) - else: - all_results.update(self.task_package(root, package, job_type, sequence_tag, no_action, config_file)) - else: - # Submit single package - root = test_directories[package] - 
all_results.update(self.task_package(root, package, job_type, sequence_tag, no_action, config_file)) + # get the test_*.sh from the test directory + test_directories = self.get_test_directories(self.get_script_directory()) + if not test_directories: + log.warning('No tests found in directories ending in "test"') - if no_action: - log.info("--no-action specified, so not waiting for results") - return 0 + configuration = None if self.skip_setup else ArtConfiguration(config_file) - if len(all_results) == 0: - log.warning('No tests found, nothing to submit.') - return 0 + all_results = {} + + if package is None: + # submit tasks for all packages + for package, root in test_directories.items(): + if configuration is not None and configuration.get(self.nightly_release, self.project, self.platform, package, 'exclude', False): + log.warning("Package %s is excluded", package) + else: + all_results.update(self.task_package(root, package, job_type, sequence_tag, no_action, config_file)) + else: + # Submit single package + root = test_directories[package] + all_results.update(self.task_package(root, package, job_type, sequence_tag, no_action, config_file)) + + if no_action: + log.info("--no-action specified, so not waiting for results") + return 0 + + if len(all_results) == 0: + log.warning('No tests found, nothing to submit.') + return 0 # wait for all results if wait_and_copy: configuration = ArtConfiguration(config_file) + + log.info("Executor started with %d threads", self.max_jobs) + executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.max_jobs) + future_set = [] + while len(all_results) > 0: - time.sleep(ArtGrid.RESULT_WAIT_INTERVAL) + log.debug("No of Results %d", len(all_results)) + log.debug("Waiting...") + if not test_copy: + time.sleep(ArtGrid.RESULT_WAIT_INTERVAL) + log.debug("Done Waiting") + # force a copy of all_results since we are modifying all_results for jedi_id in list(all_results): package = all_results[jedi_id][0] # skip packages without copy 
if not configuration.get(self.nightly_release, self.project, self.platform, package, "copy"): + log.info("Copy not configured - skipped") del all_results[jedi_id] continue + log.debug("Checking package %s for %s", package, str(jedi_id)) status = self.task_status(jedi_id) if status is not None: log.info("JediID %s finished with status %s", str(jedi_id), status) if status in ['finished', 'done']: + # job_name = all_results[jedi_id][1] + # outfile = all_results[jedi_id][2] + index = all_results[jedi_id][3] dst = configuration.get(self.nightly_release, self.project, self.platform, package, "dst", ArtGrid.EOS_OUTPUT_DIR) - log.info("Copy %s to %s", package, dst) - self.copy(package, dst) + indexed_package = package + ('.' + str(index) if index > 0 else '') + log.info("Copy %s to %s", indexed_package, dst) + future_set.append(executor.submit(copy_job, self.art_directory, indexed_package, dst)) del all_results[jedi_id] + # wait for all copy jobs to finish + log.info("Waiting for copy jobs to finish...") + for future in concurrent.futures.as_completed(future_set): + (indexed_package, exit_code, out, err, start_time, end_time) = future.result() + if exit_code == 0: + log.info("Copied %s exit_code: %d", indexed_package, exit_code) + log.info(" starting %s until %s", start_time.strftime('%Y-%m-%dT%H:%M:%S'), end_time.strftime('%Y-%m-%dT%H:%M:%S')) + else: + log.error("Failed to copy: %s exit_code: %d", indexed_package, exit_code) + print err + print out + return 0 def task_status(self, jedi_id): @@ -374,14 +438,18 @@ class ArtGrid(ArtBase): return "done" try: - r = urllib2.urlopen('https://bigpanda.cern.ch/task/' + str(jedi_id) + '?json=true') + url = 'https://bigpanda.cern.ch/task/' + str(jedi_id) + '?json=true' + r = urllib2.urlopen(url) s = json.load(r) - status = s['task']['superstatus'] - if status in ["done", "finished", "failed", "aborted", "broken"]: - log.info("Task: %s %s", str(jedi_id), str(status)) - return status + if (s is not None) and ('task' in s): + task = 
s['task'] + if (task is not None) and ('status' in task): + status = task['status'] + if status in ["done", "finished", "failed", "aborted", "broken"]: + log.info("Task: %s %s", str(jedi_id), str(status)) + return status except urllib2.HTTPError, e: - log.error('%s for %s status', str(e.code), str(jedi_id)) + log.error('%s for %s status: %s', str(e.code), str(jedi_id), url) return None def task_job(self, grid_options, sub_cmd, script_directory, sequence_tag, package, outfile, job_type='', number_of_tests=0, split=0, job_name='', inds='', n_files=0, in_file=False, no_action=False): @@ -390,14 +458,13 @@ class ArtGrid(ArtBase): Returns jedi_id or 0 if submission failed. - # art-task-grid.sh [--no-action --skip-setup] batch <submit_directory> <script_directory> <sequence_tag> <package> <outfile> <job_type> <number_of_tests> + # art-task-grid.sh [--no-action] batch <submit_directory> <script_directory> <sequence_tag> <package> <outfile> <job_type> <number_of_tests> # - # art-task-grid.sh [--no-action --skip-setup] single [--inds <input_file> --n-files <number_of_files> --split <split> --in] <submit_directory> <script_directory> <sequence_tag> <package> <outfile> <job_name> + # art-task-grid.sh [--no-action] single [--inds <input_file> --n-files <number_of_files> --split <split> --in] <submit_directory> <script_directory> <sequence_tag> <package> <outfile> <job_name> """ log = logging.getLogger(MODULE) cmd = ' '.join((os.path.join(self.art_directory, 'art-task-grid.sh'), '--no-action' if no_action else '', - '--skip-setup' if self.skip_setup else '', sub_cmd)) if sub_cmd == 'single': @@ -428,7 +495,6 @@ class ArtGrid(ArtBase): log.info("cmd: %s", cmd) # run task from Bash Script as is needed in ATLAS setup - # FIXME we need to parse the output log.info("Grid_options: %s", grid_options) env = os.environ.copy() env['PATH'] = '.:' + env['PATH'] @@ -478,7 +544,7 @@ class ArtGrid(ArtBase): number_of_batch_tests = len(self.get_files(test_directory, job_type, "batch", 
self.nightly_release, self.project, self.platform)) user = os.getenv('USER', 'artprod') if self.skip_setup else ArtGrid.ARTPROD - outfile = self.rucio.get_outfile(user, package, self.get_nightly_release_short(), self.project, self.platform, self.nightly_tag, sequence_tag) + outfile = self.rucio.get_outfile_name(user, package, sequence_tag) result = {} @@ -490,7 +556,7 @@ class ArtGrid(ArtBase): log.info("Batch") jedi_id = self.task_job(grid_options, "batch", script_directory, sequence_tag, package, outfile, job_type=job_type, number_of_tests=number_of_batch_tests, no_action=no_action) if jedi_id > 0: - result[jedi_id] = (package, "", outfile) + result[jedi_id] = (package, "", outfile, 0) # submit single tests index = 1 @@ -501,7 +567,7 @@ class ArtGrid(ArtBase): n_files = header.get(ArtHeader.ART_INPUT_NFILES) split = header.get(ArtHeader.ART_INPUT_SPLIT) - outfile_test = self.rucio.get_outfile(user, package, self.get_nightly_release_short(), self.project, self.platform, self.nightly_tag, sequence_tag, str(index)) + outfile_test = self.rucio.get_outfile_name(user, package, sequence_tag, str(index)) self.exit_if_outfile_too_long(outfile_test) # Single @@ -509,7 +575,7 @@ class ArtGrid(ArtBase): jedi_id = self.task_job(grid_options, "single", script_directory, sequence_tag, package, outfile_test, split=split, job_name=job_name, inds=inds, n_files=n_files, in_file=True, no_action=no_action) if jedi_id > 0: - result[jedi_id] = (package, job_name, outfile_test) + result[jedi_id] = (package, job_name, outfile_test, index) index += 1 @@ -526,7 +592,7 @@ class ArtGrid(ArtBase): test_list = self.get_files(test_directory, job_type, "batch", self.nightly_release, self.project, self.platform) - # FIXME ??? 
minus one for grid + # NOTE: grid counts from 1 index = int(job_index) job_name = test_list[index - 1] @@ -548,7 +614,7 @@ class ArtGrid(ArtBase): return self.job(test_directory, package, job_name, job_type, out, in_file) def job(self, test_directory, package, job_name, job_type, out, in_file): - """Run a single job.""" + """Run a job.""" log = logging.getLogger(MODULE) log.info("art-job-name: %s", job_name) test_file = os.path.join(test_directory, job_name) @@ -633,22 +699,23 @@ class ArtGrid(ArtBase): # Always return 0 return 0 - def list(self, package, job_type, index_type, json_format, user, nogrid): + def list(self, package, job_type, index_type, json_format, user): """List all jobs available.""" - log = logging.getLogger(MODULE) user = ArtGrid.ARTPROD if user is None else user # make sure script directory exist self.exit_if_no_script_directory() - log.info("Getting test names...") - test_names = self.get_list(self.get_script_directory(), package, job_type, index_type) json_array = [] - for test_name in test_names: - job_name = os.path.splitext(test_name)[0] + for entry in self.rucio.get_table(user, package): + # print entry json_array.append({ - 'name': job_name, - 'grid_index': str(self.rucio.get_index(user, '*', package, job_name, self.get_nightly_release_short(), self.project, self.platform, self.nightly_tag)) if not nogrid else '-1' + 'name': entry['job_name'], + 'grid_index': entry['grid_index'], + 'job_index': entry['job_index'], + 'single_index': entry['single_index'], + 'file_index': entry['file_index'], + 'outfile': entry['outfile'] }) if json_format: @@ -656,16 +723,27 @@ class ArtGrid(ArtBase): return 0 i = 0 + print "Example FileName: user.artprod.atlas.21.0.Athena.x86_64-slc6-gcc62-opt.2018-02-25T2154.314889.TrigInDetValidation.<Single>" + print "Example OutputName: user.artprod.<Job>.EXT1._<Grid>.tar.<File>" + print + print '{:-^5}'.format('Index'), \ + '{:-^60}'.format('Name'), \ + '{:-^6}'.format('Grid'), \ + '{:-^9}'.format('Job'), \ + 
'{:-^6}'.format('Single'), \ + '{:-^4}'.format('File'), \ + '{:-^80}'.format('FileName') + for entry in json_array: - print str(i) + ' ' + entry['name'] + (' ' + entry['grid_index']) + print '{:5d}'.format(i), \ + '{:60}'.format('None' if entry['name'] is None else entry['name']), \ + '{:06d}'.format(entry['grid_index']), \ + '{:9d}'.format(entry['job_index']), \ + '{:6d}'.format(entry['single_index']), \ + '{:4d}'.format(entry['file_index']), \ + '{:80}'.format(entry['outfile']) i += 1 - # print warnings - if not nogrid: - for entry in json_array: - if entry['grid_index'] < 0: - log.warning('test %s could not be found in json or log', entry['name']) - return 0 def log(self, package, test_name, user): @@ -676,7 +754,7 @@ class ArtGrid(ArtBase): # make sure script directory exist self.exit_if_no_script_directory() - tar = self.open_tar(user, package, test_name, ArtRucio.LOG) + tar = self.__open_tar(user, package, test_name, tar=False) if tar is None: log.error("No log tar file found") return 1 @@ -691,18 +769,22 @@ class ArtGrid(ArtBase): return 0 def output(self, package, test_name, user): - """Download the putput of a job.""" + """Download the output of a job.""" log = logging.getLogger(MODULE) user = ArtGrid.ARTPROD if user is None else user # make sure script directory exist self.exit_if_no_script_directory() - outfile = self.rucio.get_outfile(user, package, self.get_nightly_release_short(), self.project, self.platform, self.nightly_tag) - tar_dir = os.path.join(tempfile.gettempdir(), outfile + ArtRucio.OUTPUT) + outfile = self.rucio.get_outfiles(user, package)[0] + if not outfile.endswith(package): + # remove .13 + outfile = os.path.splitext(outfile)[0] + job_name = os.path.splitext(test_name)[0] + tar_dir = os.path.join(tempfile.gettempdir(), outfile, job_name) mkdir_p(tar_dir) - tar = self.open_tar(user, package, test_name, ArtRucio.OUTPUT) + tar = self.__open_tar(user, package, test_name) if tar is None: log.error("No output tar file found") return 1 @@ 
-710,9 +792,10 @@ class ArtGrid(ArtBase): tar.extractall(path=tar_dir) tar.close() print "Output extracted in", tar_dir + return 0 - def compare(self, package, test_name, days, user, entries=-1): + def compare(self, package, test_name, days, user, entries=-1, shell=False): """Compare current output against a job of certain days ago.""" log = logging.getLogger(MODULE) user = ArtGrid.ARTPROD if user is None else user @@ -727,7 +810,8 @@ class ArtGrid(ArtBase): ref_dir = os.path.join('.', 'ref-' + previous_nightly_tag) mkdir_p(ref_dir) - tar = self.open_tar(user, package, test_name, ArtRucio.OUTPUT, previous_nightly_tag) + log.info("Shell = %s", shell) + tar = self.__open_tar(user, package, test_name, nightly_tag=previous_nightly_tag, shell=shell) if tar is None: log.error("No comparison tar file found") return 1 @@ -738,38 +822,46 @@ class ArtGrid(ArtBase): return self.compare_ref('.', ref_dir, entries) - def open_tar(self, user, package, test_name, extension, nightly_tag=None): + def __open_tar(self, user, package, test_name, tar=True, nightly_tag=None, shell=False): """Open tar file for particular release.""" log = logging.getLogger(MODULE) + log.info("Tar: %s", tar) + nightly_tag = self.nightly_tag if nightly_tag is None else nightly_tag job_name = os.path.splitext(test_name)[0] - if nightly_tag is None: - nightly_tag = self.nightly_tag - grid_index = self.rucio.get_index(user, '*', package, job_name, self.get_nightly_release_short(), self.project, self.platform, nightly_tag) - if grid_index < 0: - log.error("No log or tar found for package %s or test %s", package, test_name) - return None + for entry in self.rucio.get_table(user, package, nightly_tag, shell): + if entry['job_name'] == job_name: - log.info("Grid Index: %d", grid_index) + rucio_name = self.__get_rucio_name(user, entry, 'tar' if tar else 'log') - outfile = self.rucio.get_outfile(user, package, self.get_nightly_release_short(), self.project, self.platform, nightly_tag) + log.info("RUCIO: %s", 
rucio_name) - rucio_name = self.rucio.get_rucio_name(user, outfile + extension, grid_index) - if rucio_name is None: - log.error("No rucio_name for %d", grid_index) - return None - log.info("RUCIO: %s", rucio_name) + # tmp_dir = tempfile.gettempdir() + tmp_dir = tempfile.mkdtemp() + atexit.register(shutil.rmtree, tmp_dir, ignore_errors=True) - tmp_dir = tempfile.mkdtemp() - atexit.register(shutil.rmtree, tmp_dir) + log.info("Shell = %s", shell) + exit_code = self.rucio.download(rucio_name, tmp_dir, shell) + if exit_code == 0: + tmp_tar = os.path.join(tmp_dir, 'user.' + user, rucio_name) + return tarfile.open(tmp_tar) + + log.error("No log or tar found for package %s or test %s", package, test_name) + return None - tmp_tar = os.path.join(tmp_dir, os.path.basename(rucio_name)) + def __get_rucio_name(self, user, entry, file_type): + rucio_name = None + if file_type == 'json': + rucio_name = '.'.join(('user', user, str(entry['job_index']), 'EXT0', '_{0:06d}'.format(entry['grid_index']), 'art-job', 'json')) + elif file_type == 'tar': + rucio_name = '.'.join(('user', user, str(entry['job_index']), 'EXT1', '_{0:06d}'.format(entry['grid_index']), 'tar')) + else: + rucio_name = '.'.join((entry['outfile'], 'log', str(entry['job_index']), '{0:06d}'.format(entry['grid_index']), 'log.tgz')) - if self.rucio.xrdcp(rucio_name, tmp_tar) != 0: - log.error("TAR Error: %s", rucio_name) - return None + if entry['file_index'] > 0: + rucio_name = '.'.join((rucio_name, str(entry['file_index']))) - return tarfile.open(tmp_tar) + return rucio_name def get_previous_nightly_tag(self, days): """ @@ -788,11 +880,11 @@ class ArtGrid(ArtBase): elif found: # check this is within days... 
(cutoff is 21:00, just move by 3 hours to get full days) fmt = '%Y-%m-%dT%H%M' - offset = datetime.timedelta(hours=3) - nightly_tag_dt = datetime.datetime.strptime(self.nightly_tag, fmt) + offset - from_dt = nightly_tag_dt.replace(hour=0, minute=0, second=0, microsecond=0) - datetime.timedelta(days=days) - to_dt = from_dt + datetime.timedelta(days=1) - tag_dt = datetime.datetime.strptime(tag, fmt) + offset + offset = timedelta(hours=3) + nightly_tag_dt = datetime.strptime(self.nightly_tag, fmt) + offset + from_dt = nightly_tag_dt.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=days) + to_dt = from_dt + timedelta(days=1) + tag_dt = datetime.strptime(tag, fmt) + offset within_days = from_dt <= tag_dt and tag_dt < to_dt target_exists = len(glob.glob(os.path.join(directory, tag, self.project, '*', 'InstallArea', self.platform))) > 0 if within_days and target_exists: diff --git a/Tools/ART/python/ART/art_header.py b/Tools/ART/python/ART/art_header.py index 61346f2cf2cc6205b27a4410dbc501f7629921e2..516af0f6308e7a3aff0e984b18b8984b15c0f275 100644 --- a/Tools/ART/python/ART/art_header.py +++ b/Tools/ART/python/ART/art_header.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration """Class to handle art-headers.""" __author__ = "Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>" diff --git a/Tools/ART/python/ART/art_misc.py b/Tools/ART/python/ART/art_misc.py index 13661e7c94885f6b43a7060e79aa3db30415222b..0b844ea0cb9b1a183e2fae9e0364340554308982 100644 --- a/Tools/ART/python/ART/art_misc.py +++ b/Tools/ART/python/ART/art_misc.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration """Miscellaneous functions.""" __author__ = "Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>" diff --git 
a/Tools/ART/python/ART/art_rucio.py b/Tools/ART/python/ART/art_rucio.py index 4cfcdef9e5d86678a1ad32230126eeb6a6fe3a4b..9b8a5275cde02ef7d43786b037f18ce0bd8f9fa3 100755 --- a/Tools/ART/python/ART/art_rucio.py +++ b/Tools/ART/python/ART/art_rucio.py @@ -1,16 +1,13 @@ #!/usr/bin/env python -# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration """Class to interact with RUCIO.""" __author__ = "Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>" -import atexit import json import logging import os import re -import shutil -import tarfile import tempfile try: @@ -29,15 +26,17 @@ class ArtRucio(object): ART_JOB = 'art-job.json' ATHENA_STDOUT = 'athena_stdout.txt' - JOB_TAR = 'job.tar' JSON = '_EXT0' - LOG = '.log' - LOG_TGZ = 'log.tgz' - OUTPUT = '_EXT1' - def __init__(self): + def __init__(self, art_directory, nightly_release, project, platform, nightly_tag): """Keep arguments.""" - pass + self.art_directory = art_directory + self.nightly_release = nightly_release + self.project = project + self.platform = platform + self.nightly_tag = nightly_tag + + self.table = None def exit_if_no_rucio(self): """Exit if RUCIO is not available.""" @@ -50,13 +49,21 @@ class ArtRucio(object): """Return scope.""" return '.'.join(('user', user)) - def download(self, did, dst_dir): + def download(self, did, dst_dir, shell=False): """Download did into temp directory.""" log = logging.getLogger(MODULE) self.exit_if_no_rucio() # rucio downloads cache properly - (exit_code, out, err) = run_command("rucio download --dir " + dst_dir + " " + did) + log.info("Shell = %s", shell) + env = os.environ.copy() + if shell: + cmd = ' '.join((os.path.join(self.art_directory, 'art-download.sh'), did, dst_dir)) + env['PATH'] = '.:' + env['PATH'] + else: + cmd = ' '.join(('rucio', 'download', '--dir', dst_dir, did)) + + (exit_code, out, err) = run_command(cmd, env=env) if (exit_code != 0): log.error(err) log.info(out) @@ 
-76,142 +83,180 @@ class ArtRucio(object): # log.info(out) return exit_code - def get_outfile(self, user, package, nightly_release, project, platform, nightly_tag, sequence_tag='*', test_name=None): - """Create outfile from parameters.""" - if nightly_tag is None: - nightly_tag = self.nightly_tag - - outfile = '.'.join(('user', user, 'atlas', nightly_release, project, platform, nightly_tag, sequence_tag, package)) - if sequence_tag == '*': - self.exit_if_no_rucio() - rucio_client = rucio.client.Client() - for out in rucio_client.list_dids(self.get_scope(user), {'name': '.'.join((outfile, 'log'))}): - outfile = os.path.splitext(out)[0] + def __parse_outfile(self, outfile): + """Parse outfile and return tuple (sequence_tag, single_index) or None.""" + # + # Matching: user.artprod.atlas.master.Athena.x86_64-slc6-gcc62-opt.2018-01-21T2301.284099.MuonRecRTT.6.log.13062437.000001.log.tgz + # user.artprod.atlas.master.Athena.x86_64-slc6-gcc62-opt.2018-01-21T2301.284099.MuonRecRTT.6 + # user.artprod.atlas.master.Athena.x86_64-slc6-gcc62-opt.2018-01-19T2301.283573.TrigAnalysisTest + # + PATTERN = r"user\.([^\.]+)\.([^\.]+)\." + self.nightly_release + "\." + self.project + "\." + self.platform + "\." 
+ self.nightly_tag + "\.(.+)" + match = re.search(PATTERN, outfile) + if not match: + return None + + (user, experiment, rest) = match.groups() + + items = rest.split(".") + sequence_tag = items[0] if len(items) > 0 else -1 + try: + single_index = int(items[2]) if len(items) > 2 else -1 + except ValueError: + single_index = -1 + + if single_index < 0: + grid_index = int(items[4]) if len(items) > 4 else -1 + else: + grid_index = int(items[5]) if len(items) > 5 else -1 + + # print outfile, sequence_tag, single_index, grid_index + + return (sequence_tag, single_index, grid_index) + + def get_sequence_tag(self, outfile): + """Return sequence tag or None.""" + result = self.__parse_outfile(outfile) + return result[0] if result is not None else None + + def get_single_index(self, outfile): + """Return single index or -1.""" + result = self.__parse_outfile(outfile) + return result[1] if result is not None else -1 + + def get_grid_index(self, outfile): + """Return grid index or -1.""" + result = self.__parse_outfile(outfile) + return result[2] if result is not None else -1 + + def get_outfile_name(self, user, package, sequence_tag, test_name=None, nightly_tag=None): + """Create outfile name based on parameters.""" + nightly_tag = self.nightly_tag if nightly_tag is None else nightly_tag + outfile = '.'.join(('user', user, 'atlas', self.nightly_release, self.project, self.platform, nightly_tag, sequence_tag, package)) return outfile if test_name is None else '.'.join((outfile, test_name)) - # private - def get_rucio_map(self, user, outfile): - """Return map of entries by grid_index into { source, rucio_name }.""" + def get_outfiles(self, user, package, nightly_tag=None): + """ + Create list of outfiles from parameters. 
+ + example: ['user.artprod.atlas.master.Athena.x86_64-slc6-gcc62-opt.2018-01-21T2301.284099.MuonRecRTT.3'] + """ log = logging.getLogger(MODULE) - log.debug("Looking for %s", outfile) + nightly_tag = self.nightly_tag if nightly_tag is None else nightly_tag + self.exit_if_no_rucio() + rucio_client = rucio.client.Client() - CERN = 'CERN-PROD_SCRATCHDISK' + result = [] - LOG_PATTERN = r"\.(\d{6})\.log\.tgz" - JSON_PATTERN = r"\._(\d{6})\.art-job\.json" - OUTPUT_PATTERN = r"\._(\d{6})\.tar" - table = {} - rucio_client = rucio.client.Client() - for rep in rucio_client.list_replicas([{'scope': self.get_scope(user), 'name': outfile}], schemes=['root']): - source = None - rucio_name = None - log.debug("Found in %s", rep['states'].keys()) - # first look at CERN - if CERN in rep['states'].keys() and rep['states'][CERN] == 'AVAILABLE': - source = CERN - rucio_name = rep['rses'][CERN][0] - else: - for rse in rep['states'].keys(): - if rep['states'][rse] == 'AVAILABLE' and len(rep['rses'][rse]) >= 1: - source = rse - rucio_name = rep['rses'][rse][0] - break - - # maybe not found at all - if rucio_name is not None: - log.debug("Found rucio name %s in %s", rucio_name, source) - pattern = JSON_PATTERN if outfile.endswith(ArtRucio.JSON) else LOG_PATTERN if outfile.endswith(ArtRucio.LOG) else OUTPUT_PATTERN - match = re.search(pattern, rucio_name) - if match: - number = int(match.group(1)) - else: - log.warning("%s does not contain test number using pattern %s skipped...", rucio_name, pattern) - continue - - table[number] = {'source': source, 'rucio_name': rucio_name} - - if not table: - log.warning("Outfile %s not found or empty", outfile) - return table + # look for "batch" outfile, and take latest (by sequence tag) + pattern = self.get_outfile_name(user, package, '*', None, nightly_tag) + outfile = None + sequence = None + for out in rucio_client.list_dids(self.get_scope(user), {'name': '.'.join((pattern, 'log'))}): + sequence_tag = self.get_sequence_tag(out) + if sequence is 
None or sequence_tag > sequence: + outfile = os.path.splitext(out)[0] + sequence = sequence_tag + + if outfile is not None: + log.debug("Adding 'batch': %s", outfile) + result.append(outfile) + + # look for "single" outfile, deduce sequence_tag + pattern = self.get_outfile_name(user, package, '*', '*', nightly_tag) + log.debug("Trying pattern %s", pattern) + outfile = None + sequence = None + for out in rucio_client.list_dids(self.get_scope(user), {'name': '.'.join((pattern, 'log'))}): + sequence_tag = self.get_sequence_tag(out) + if sequence is None or sequence_tag > sequence: + outfile = os.path.splitext(out)[0] + sequence = sequence_tag + + if outfile is not None: + log.debug("Found %s", outfile) + sequence_tag = self.get_sequence_tag(outfile) + if sequence_tag is not None: + # found sequence_tag, find all 'single' outfiles + pattern = self.get_outfile_name(user, package, sequence_tag, '*', nightly_tag) + for out in rucio_client.list_dids(self.get_scope(user), {'name': '.'.join((pattern, 'log'))}): + outfile = os.path.splitext(out)[0] + log.debug("Adding 'single': %s", outfile) + result.append(outfile) - def get_index_map(self, user, sequence_tag, package, nightly_release, project, platform, nightly_tag): - """Return grid map of job_name to index.""" - outfile = self.get_outfile(user, package, nightly_release, project, platform, nightly_tag, sequence_tag) + return result - # if outfile in self.index_map_cache: - # return self.index_map_cache[outfile] + def get_table(self, user, package, nightly_tag=None, shell=False): + """Get full table with grid_index, single_index and test_name for particular package and nightly_tag.""" + log = logging.getLogger(MODULE) - result = {} - for index in self.get_indices(user, outfile + ArtRucio.LOG): - test_name = self.get_job_name(user, index, package, sequence_tag, nightly_release, project, platform, nightly_tag) - if test_name is None: - # log.warning("JSON Lookup failed for test %s", rucio_log_name if rucio_name is None else 
rucio_name) - continue + if self.table is not None: + return self.table - result[test_name] = int(index) + self.exit_if_no_rucio() - # self.index_map_cache[outfile] = result - return result + table = [] - def get_rucio_name(self, user, outfile, index): - """Return rucio name for given outfile and index.""" - rucio_map = self.get_rucio_map(user, outfile) - return rucio_map[index]['rucio_name'] if index in rucio_map else None + nightly_tag = self.nightly_tag if nightly_tag is None else nightly_tag - def get_indices(self, user, outfile): - """Return list of indices.""" - return self.get_rucio_map(user, outfile).keys() + outfiles = self.get_outfiles(user, package, nightly_tag) - def get_job_name(self, user, index, package, sequence_tag, nightly_release, project, platform, nightly_tag): - """ - Return job name for index. + outfiles_str = [x + ArtRucio.JSON for x in outfiles] + outfiles_str = ' '.join(outfiles_str) - job_name is without .sh or .py - """ - log = logging.getLogger(MODULE) - self.exit_if_no_rucio() + tmp_dir = tempfile.gettempdir() + dst_dir = tmp_dir + + log.info("Shell = %s", shell) + exit_code = self.download(outfiles_str, dst_dir, shell) + if exit_code != 0: + log.error("Failed to execute rucio download %d", exit_code) + return table + + for outfile in outfiles: + single_index = self.get_single_index(outfile) + + json_directory = os.path.join(dst_dir, outfile + ArtRucio.JSON) + if not os.path.isdir(json_directory): + # print single_index, rucio_name + table.append({ + 'single_index': single_index, + 'grid_index': -1, + 'file_index': -1, + 'job_index': -1, + 'outfile': outfile, + 'job_name': None + }) + continue - outfile = self.get_outfile(user, package, nightly_release, project, platform, nightly_tag, sequence_tag) - log.debug("outfile %s", outfile) - - tmp_dir = tempfile.mkdtemp() - atexit.register(shutil.rmtree, tmp_dir) - - tmp_json = os.path.join(tmp_dir, ArtRucio.ART_JOB) - rucio_name = self.get_rucio_name(user, outfile + ArtRucio.JSON, index) - 
if self.xrdcp(rucio_name, tmp_json, force=True) == 0: - log.debug("copied json %s", rucio_name) - with open(tmp_json) as json_file: - info = json.load(json_file) - job_name = os.path.splitext(info['name'])[0] - return job_name - - tmp_log = os.path.join(tmp_dir, ArtRucio.LOG_TGZ) - rucio_log_name = self.get_rucio_name(user, outfile + ArtRucio.LOG, index) - if self.xrdcp(rucio_log_name, tmp_log, force=True) == 0: - log.debug("copied log %s %s", rucio_log_name, tmp_log) - tar = tarfile.open(tmp_log) - for name in tar.getnames(): - if ArtRucio.ATHENA_STDOUT in name: - log.debug("Found %s", ArtRucio.ATHENA_STDOUT) - info = tar.extractfile(name).read() - # try art-job-name - match = re.search(r"art-job-name:\s(\S+)", info) - if match: - log.debug("Found 'art-job-name'") - return os.path.splitext(match.group(1))[0] - - # try Job Name - match = re.search(r"Job Name:\s(\S+)", info) - if match: - log.debug("Found 'Job Name:'") - return os.path.splitext(match.group(1))[0] - - log.debug("Cannot retrieve job_name from art-job.json or logfile") - return None - - def get_index(self, user, sequence_tag, package, job_name, nightly_release, project, platform, nightly_tag): - """Return index for job_name.""" - index_map = self.get_index_map(user, sequence_tag, package, nightly_release, project, platform, nightly_tag) - return index_map[job_name] if job_name in index_map else -1 + for json_file in os.listdir(json_directory): + json_path = os.path.join(json_directory, json_file) + if os.path.isfile(json_path): + with open(json_path) as json_fd: + info = json.load(json_fd) + job_name = os.path.splitext(info['name'])[0] + + # Match: user.artprod.13199077.EXT0._000002.art-job.json + # Match: user.artprod.13199077.EXT0._000003.art-job.json.4 + # job_index = 13199077, grid_index = 3, file_index = 4 + match = re.search(r"user\.([^\.]+)\.(\d+)\.EXT0\._(\d+)\.art-job.json(?:\.(\d+))?", json_file) + if match: + job_index = int(match.group(2)) + grid_index = int(match.group(3)) + file_index = 
-1 if match.group(4) is None else int(match.group(4)) + else: + job_index = -1 + grid_index = -1 + file_index = -1 + + table.append({ + 'single_index': single_index, + 'grid_index': grid_index, + 'file_index': file_index, + 'job_index': job_index, + 'outfile': outfile, + 'job_name': job_name + }) + + self.table = table + return table diff --git a/Tools/ART/python/ART/parallelScheduler.py b/Tools/ART/python/ART/parallelScheduler.py deleted file mode 100644 index 42a1d1cfc42a5f7f7a4e1a2c8cea3c05d36293c7..0000000000000000000000000000000000000000 --- a/Tools/ART/python/ART/parallelScheduler.py +++ /dev/null @@ -1,215 +0,0 @@ -''' -Created on 16/05/2012 - - * Repository : https://github.com/victor-gil-sepulveda/pyScheduler - * Licensed under the MIT license (see LICENSE-MIT) - * Copyright (C) 2013 Victor Alejandro Gil Sepulveda - -@author: victor -''' -import multiprocessing -from serialScheduler import SerialScheduler -import sys - -def printnflush(*args): - """ - Prints and flushes the things passes as arguments. - @param args: The data we want to print. - """ - if False: - print args - sys.stdout.flush() - -def run_task(process_name, tasks, pipe_end): - """ - Helper function to run tasks inside a process. It implements an infinite loop controlled by the messages - received from 'pipe_end'. - Messages from the pipe are (message_type, value) tuples. Thsi is the currently implemented protocol: - - "EXECUTE": Runs the task with id == value. - -> Sends a "TASK FINISHED" message with value = (task_id, task_result) - - "FINISH": Ends the loop so that process can end and free its resources. - @param process_name: Unique id of the process executing this function. - @param tasks: The dictionary of all tasks (we want to execute in this scheduling) indexed by their id . - @param pipe_end: A process pipe used to send/receive messages from/to the master. 
- """ - task_ended = False - try: - while not task_ended: - # Blocks until it receives a message - message_type, value = pipe_end.recv() - - if message_type == "EXECUTE": - result = tasks[value].run() - pipe_end.send(("TASK FINISHED", (value, result))) - - elif message_type == "FINISH": - printnflush( "Communication successfully closed for",process_name) - task_ended = True - else: - printnflush("Unexpected message: %s"%message_type) - task_ended = True - - except EOFError: - printnflush("Communication closed due to remote closing of the pipe in process %s"%process_name) - - except Exception, msg: - printnflush("Communication closed due to unexpected exception: %s"%msg) - - pipe_end.close() - printnflush( "Task reached end") - -class TaskRunner(object): - """ - Helper class that encapsulates a process used to execute a subset of the tasks list. - """ - def __init__(self, process_name, target_function, tasks): - """ - Creates the process that will be in charge of executing the tasks and a pipe to communicate - with the main process. - @param process_name: Unique id for this task executor. - @param target_function: Is the function the process will execute. In the case of ProcessParallelScheduler - the function used is 'run_task', however it can use any function that receives the same parameters that - 'run_task' needs. - @param tasks: The dictionary of all tasks. - """ - self.pipe_start, self.pipe_end = multiprocessing.Pipe() - printnflush ("Process started: %s"%process_name) - self.process = multiprocessing.Process(group=None, - target=target_function, - name=process_name, - args = (process_name, tasks, self.pipe_end)) - self.busy = False - - def run(self): - """ - Starts the inner process (and therefore the defined function that is going to be used to control the - messages). - """ - self.process.start() - - def execute_task(self, task_name): - """ - Sends the process an "EXECUTE" task message to run the task named 'task_name'. 
- @param task_name: Name of the task to be executed. - """ - self.busy = True - self.pipe_start.send(("EXECUTE",task_name)) - - def set_task_finished(self): - """ - Sets the 'busy' flag in order to mark this task executor as busy (its associated process is - performing a task) - """ - self.busy = False - - def finalize(self): - """ - Sends a finalization message (forces the associated process to break the loop and end)- - """ - self.busy = False - self.pipe_start.send(("FINISH",None)) - self.process.join() - if self.process.is_alive(): - self.process.terminate() - - def has_an_incomming_message(self): - """ - True if this task runner has received a message from its associated process. - """ - return self.pipe_start.poll(1) - - def get_message(self): - """ - Returns the message the associated process sent (using the 'run_task' function it can only be a - "TASK FINISHED" message) - """ - return self.pipe_start.recv() - -class ParallelScheduler(SerialScheduler): - """ - Scheduler type that works by creating a limited number of processes and distributing the tasks between them. - """ - - def __init__(self, max_processes, functions = {}): - """ - Creates the scheduler. - @param max_processes: Indeed is the total number of processes that will be used for the scheduling parallelization - plus one (which is representing the current process). - @param functions: @see SerialScheduler - """ - SerialScheduler.__init__(self,functions) - self.number_of_processes = max_processes - 1 - self.running = [] - - def run(self): - """ - Like in the SerialScheduler, this function tries to run all the tasks, checking their dependencies. In this case - some processes will be spawned so that they can share the work of executing the tasks. - This run function acts as the real scheduler, telling the 'task executor' objects which task to run. 
This kind - of dynamic scheduling fosters an efficient use of the resources (every time a 'task executor' ends a task, it is - told to run another one, so that load is balanced). - This is a simple implementation of a master-slave pattern (where slaves are the task runners). - """ - self.function_exec('scheduling_started', {"number_of_tasks":len(self.not_completed)}) - - # Create processes - available_workers = self.number_of_processes - task_runners = [] - for i in range(available_workers): - process_name = "TaskExecutor"+str(i) - runner = TaskRunner(process_name, run_task, self.tasks) - runner.run() - task_runners.append(runner) - - # Execute all tasks - while not len(self.finished) == len(self.tasks): - cannot_choose_a_task = False - - # Choose an available process - task_name = self.choose_runnable_task() - - # Try to execute it - if task_name is not None: - # If we can still execute a task we find a free task runner to do it - for task_runner in task_runners: - if not task_runner.busy: - self.function_exec('task_started', {"task_name":task_name}) - task_runner.execute_task(task_name) - self.lock_task(task_name) # Ensure that it can't be selected again until task is finished - self.running.append(task_name) - break - else: - cannot_choose_a_task = True - - if cannot_choose_a_task or len(self.running) == available_workers: - # If there is not an available task (so all remaining tasks have dependencies) or - # we do not have any available worker, it's time to block until we receive results. 
- - # We start polling busy runners pipes to wait for a result and add this result to the - # results list - task_finished = False - while not task_finished: - for task_runner in task_runners: - if task_runner.busy and task_runner.has_an_incomming_message(): - message, value = task_runner.get_message() - if message == "TASK FINISHED": - task_name, result = value - self.function_exec('task_ended', {"task_name":task_name, "finished":len(self.finished)}) - self.running.remove(task_name) - self.complete_task(task_name) - self.remove_from_dependencies(task_name) - task_runner.set_task_finished() - self.results.append(result) - else: - printnflush ( "Unexpected message: %s"%message) - exit() - task_finished = True - - printnflush ("Sending processes termination message.") - - for task_runner in task_runners: - task_runner.finalize() - - self.function_exec('scheduling_ended') - - return self.results diff --git a/Tools/ART/python/ART/serialScheduler.py b/Tools/ART/python/ART/serialScheduler.py deleted file mode 100644 index 9272f8eff6f325652d262d0f60d81795369e581a..0000000000000000000000000000000000000000 --- a/Tools/ART/python/ART/serialScheduler.py +++ /dev/null @@ -1,177 +0,0 @@ -''' -Created on 16/08/2012 - - * Repository : https://github.com/victor-gil-sepulveda/pyScheduler - * Licensed under the MIT license (see LICENSE-MIT) - * Copyright (C) 2013 Victor Alejandro Gil Sepulveda - -@author: victor -''' -class Task(object): - """ - Representation of a task. - """ - def __init__(self, function, name, kwargs, description = ""): - """ - Creates a Task object. - @param function: A callable object that will perform the real work of the task. - @param name: The name of the task (an identifier). - @param kkwargs: Parameters for the callable. - @param description: A short description of what the task does. Can be empty. 
- """ - self.function = function - self.name = name - self.kwargs = kwargs - self.result = None - - def run(self): - """ - Runs the task's associated callable and returns its result. - @return: The result of the callable execution. - """ - self.result = self.function(**(self.kwargs)) - return self.result - -class SerialScheduler(object): - """ - Base scheduling class. It ensures that no task is executed before its dependencies (without building a - dependency tree). - It allows to define some functions that will be executed when the scheduler reaches some strategic points. - TODO: In all scheduler types a dependencies must be checked to avoid cycles for instance. - """ - - def __init__(self, functions = {}): - """ - Constructor. Initializes needed variables. - - @param fucntions: A dictionary containing 3 possible keys. Each key defines another dictionary of two - entries ('function' and 'kwargs') with a callable and its arguments. The possible keys are: - 'task_started' -> Were an action performed after each task is called is defined. - 'task_ended' -> Defines the action performed when a task is finished. - 'scheduling_started' -> Defines the action performed when the scheduler starts to run tasks. - 'scheduling_ended' -> Defines the action performed when the scheduler has finished to run all tasks. - """ - self.functions = functions - self.tasks = {} - self.dependencies = {} - self.not_completed = [] - self.finished = [] - self.results = [] - - def function_exec(self, function_type, info = None): - """ - Execute one of the predefined functions if defined. - - @param function_type: Type of the function to check and run (proper types should be 'task_start','task_end' - and 'scheduling_end', each defining 'function' and 'kwargs' entries. 
- - """ - if function_type in self.functions: - self.functions[function_type]['kwargs']['info'] = info - self.functions[function_type]['function'](**(self.functions[function_type]['kwargs'])) - - def run(self): - """ - Runs all the tasks in a way that tasks are not executed before their dependencies are - cleared. - - @return: An array with the results of task calculations. - """ - self.function_exec('scheduling_started', {"number_of_tasks":len(self.not_completed)}) - - ordered_tasks = self.get_ordered_tasks() - - for task in ordered_tasks: - self.function_exec('task_started', {"task_name":task.name}) - self.results.append(task.run()) - self.function_exec('task_ended', {"task_name":task.name, "finished":len(self.finished)}) - - self.function_exec('scheduling_ended') - - return self.results - - def get_ordered_tasks(self): - """ - Returns a list of task names so that any task name will have an index bigger than the tasks it depends on. - - @return: A list of task names. - """ - ordered_tasks = [] - while len( self.not_completed) > 0: - #Choose an available process - task_name = self.choose_runnable_task() - - if task_name is None: - print "It was impossible to pick a suitable task for running. Check dependencies." - return [] - else: - # Run a process - ordered_tasks.append(self.tasks[task_name]) - self.lock_task(task_name) - self.complete_task(task_name) - self.remove_from_dependencies(task_name) - return ordered_tasks - - def choose_runnable_task(self): - """ - Returns a task name which dependencies have already been fulfilled. - - @return: The task name. - """ - for task_name in self.not_completed: - if len(self.dependencies[task_name]) == 0: # This process has no dependencies - return task_name; - return None # All task have dependencies (circular dependencies for instance) - - - def lock_task(self, task_name): - """ - Removes a task from the 'not complete list' making it unavailable for further selections. - - @param task_name: The name of the task to lock. 
- """ - # Remove it from the not_completed list - self.not_completed.remove(task_name) - - def complete_task(self, task_name): - """ - Adds a task to the list of completed tasks. - - @param task_name: The name of the task to complete. - """ - self.finished.append(task_name) - - def remove_from_dependencies(self, task_name): - """ - Removes a task from the dependencies of all other uncomplete tasks. At the end of execution, all dependency - lists must be empty. - - @param task_name: The name of the task to remove from dependencies. - """ - for tn in self.dependencies: - if task_name in self.dependencies[tn]: - self.dependencies[tn].remove(task_name) - - def add_task(self, task_name, dependencies, target_function, function_kwargs, description): - """ - Adds a task to the scheduler. The task will be executed along with the other tasks when the 'run' function is called. - - @param task_name: - @param dependencies: A list with the task_names of the tasks that must be fulfilled before executing this other task. - Example of dependencies dictionary: - {"task_C":["dep_task_A", "dep_task_B"]} - This dependencies dict. means that task C cannot be run until task B and A are cleared. - @param target_function: The function executed by this task. - @param function_kwargs: Its arguments. - @param description: A brief description of the task. - """ - - if not task_name in self.tasks: - task = Task( name = task_name, description = description, function = target_function, kwargs=function_kwargs) - task.description = description - self.tasks[task_name] = task - self.not_completed.append(task_name) - self.dependencies[task_name] = dependencies - else: - print "[Error SerialScheduler::add_task] Task %s already exists. 
Task name must be unique."%task_name - exit() diff --git a/Tools/ART/scripts/art-diff.py b/Tools/ART/scripts/art-diff.py index ebce80e5877f3790b2cdd3d550a47377ff039ed2..380e296d8ff45dc09beacf92c0290900dcc6e1a5 100755 --- a/Tools/ART/scripts/art-diff.py +++ b/Tools/ART/scripts/art-diff.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration """ ART - ATLAS Release Tester - Diff. diff --git a/Tools/ART/scripts/art-download.sh b/Tools/ART/scripts/art-download.sh new file mode 100755 index 0000000000000000000000000000000000000000..90f8c4dc268f58a01c72d74c3974f1c67bfbf455 --- /dev/null +++ b/Tools/ART/scripts/art-download.sh @@ -0,0 +1,34 @@ +#!/bin/bash +# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration +# +# NOTE do NOT run with /bin/bash -x as the output is too big for gitlab-ci +# arguments: NAME DIRECTORY +# +# author : Tulay Cuhadar Donszelmann <tcuhadar@cern.ch> +# +# example: art-download NAME DIRECTORY

+if [ $# -ne 2 ]; then
+    echo 'Usage: art-download.sh NAME DIRECTORY'
+    exit 1
+fi

+NAME=$1
+shift
+DIRECTORY=$1
+shift

+export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
+source $ATLAS_LOCAL_ROOT_BASE/user/atlasLocalSetup.sh

+unset ALRB_noGridMW

+lsetup -f rucio

+echo "Name: ${NAME}"
+echo "Directory: ${DIRECTORY}"

+# Do not use: rucio delivers warnings as exit code 127
+#set -e

+rucio download --dir ${DIRECTORY} ${NAME} diff --git a/Tools/ART/scripts/art-internal.py b/Tools/ART/scripts/art-internal.py index ca83d03937a04357ddc09d41fa5ab322bb92520a..7eb30f6791318a0397ab4512a401bbbd9c82114d 100755 --- a/Tools/ART/scripts/art-internal.py +++ b/Tools/ART/scripts/art-internal.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration """ 
ART-internal - ATLAS Release Tester (internal command). diff --git a/Tools/ART/scripts/art-share.py b/Tools/ART/scripts/art-share.py index 9d776fc777d0fb5eb0f2b99d0f80552fb527b070..46732ebc00f8a2393022bca5ae4e41c02cf5a32f 100755 --- a/Tools/ART/scripts/art-share.py +++ b/Tools/ART/scripts/art-share.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration """ ART - ATLAS Release Tester - Share. diff --git a/Tools/ART/scripts/art-task-build.sh b/Tools/ART/scripts/art-task-build.sh index 3899d37b1e13e8f946741b6c16956b28b9b293a4..86b6693752b25ef590f86b987a8ed73de16e2d89 100755 --- a/Tools/ART/scripts/art-task-build.sh +++ b/Tools/ART/scripts/art-task-build.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration # arguments: RELEASE_BASE, PROJECT, PLATFORM, DATESTAMP # author : Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>, Emil Obreshkov <Emil.Obreshkov@cern.ch> diff --git a/Tools/ART/scripts/art-task-grid.sh b/Tools/ART/scripts/art-task-grid.sh index 807cbe821096e6b7ffe21bbf28e77dce6b00ecb3..2b0207205d56e920dbf614d4d25709af58e69973 100755 --- a/Tools/ART/scripts/art-task-grid.sh +++ b/Tools/ART/scripts/art-task-grid.sh @@ -1,13 +1,13 @@ #!/bin/bash -# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration # # NOTE do NOT run with /bin/bash -x as the output is too big for gitlab-ci # # Example command lines for three types: # -# art-task-grid.sh [--no-action --skip-setup] batch <submit_directory> <script_directory> <sequence_tag> <package> <outfile> <job_type> <number_of_tests> +# art-task-grid.sh [--no-action] batch <submit_directory> <script_directory> <sequence_tag> <package> <outfile> <job_type> <number_of_tests> # -# 
art-task-grid.sh [--no-action --skip-setup] single [--inds <input_file> --n-files <number_of_files> --split <split>] <submit_directory> <script_directory> <sequence_tag> <package> <outfile> <job_name> +# art-task-grid.sh [--no-action] single [--inds <input_file> --n-files <number_of_files> --split <split>] <submit_directory> <script_directory> <sequence_tag> <package> <outfile> <job_name> # # env: ART_GRID_OPTIONS # @@ -15,7 +15,7 @@ # # options have to be in-order, and at the correct place # -# example: [--skip-setup --test-name TestName --inDS user.tcuhadar.SingleMuon... --nFiles 3 --in] tmp /cvmfs/atlas-nightlies.cern.ch/sw/... Tier0ChainTests grid 316236 3 user.${USER}.atlas.${NIGHTLY_RELEASE_SHORT}.${PROJECT}.${PLATFORM}.${NIGHTLY_TAG}.${SEQUENCE_TAG}.${PACKAGE}[.${TEST_NUMBER}] +# example: [--test-name TestName --inDS user.tcuhadar.SingleMuon... --nFiles 3 --in] tmp /cvmfs/atlas-nightlies.cern.ch/sw/... Tier0ChainTests grid 316236 3 user.${USER}.atlas.${NIGHTLY_RELEASE_SHORT}.${PROJECT}.${PLATFORM}.${NIGHTLY_TAG}.${SEQUENCE_TAG}.${PACKAGE}[.${TEST_NUMBER}] set -e echo "art-task-grid.sh executed by $(whoami) on $(date)" @@ -26,12 +26,6 @@ if [ $1 == "--no-action" ]; then shift echo "NO_ACTION=${NO_ACTION}" fi -SKIP_SETUP=0 -if [ $1 == "--skip-setup" ]; then - SKIP_SETUP=1 - shift - echo "SKIP_SETUP=${SKIP_SETUP}" -fi TYPE=$1 shift @@ -130,26 +124,6 @@ GRID_OPTIONS=$ART_GRID_OPTIONS echo "GRID_OPTIONS=${GRID_OPTIONS}" -if [ ${SKIP_SETUP} -eq 0 ]; then - # maybe not necessary - PLATFORM=${AtlasProject}_PLATFORM - echo "Setting up release: ${!PLATFORM} ${AtlasBuildBranch} ${AtlasBuildStamp} ${AtlasProject} " - USER=artprod - - export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase - source $ATLAS_LOCAL_ROOT_BASE/user/atlasLocalSetup.sh || true - - export RUCIO_ACCOUNT=artprod - - echo "Setting up panda and release" - lsetup panda "asetup --platform=${!PLATFORM} ${AtlasBuildBranch},${AtlasBuildStamp},${AtlasProject}" || true - echo "Setting up 
panda and release done" - - voms-proxy-init --rfc -noregen -cert ./grid.proxy -voms atlas --valid 24:00 || true - echo "Setting up proxy done" - -fi - case ${TYPE} in 'batch') diff --git a/Tools/ART/scripts/art.py b/Tools/ART/scripts/art.py index 7cc3486297d01ace0fa55f556ad7281739d63ffe..5d348306ce3a52dabd5ce2b7d71caf1ad3f73548 100755 --- a/Tools/ART/scripts/art.py +++ b/Tools/ART/scripts/art.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration """ ART - ATLAS Release Tester. @@ -7,15 +7,14 @@ You need to setup for an ATLAS release before using ART. Usage: art.py run [-v -q --type=<T> --max-jobs=<N> --ci] <script_directory> <sequence_tag> - art.py grid [-v -q --type=<T> -n] <script_directory> <sequence_tag> - art.py submit [-v -q --type=<T> -n --config=<file>] <sequence_tag> [<package>] - art.py copy [-v -q --user=<user> --dst=<dir>] <package> + art.py grid [-v -q --type=<T> --max-jobs=<N> --config=<file> --copy -n] <script_directory> <sequence_tag> + art.py submit [-v -q --type=<T> --max-jobs=<N> --config=<file> -n] <sequence_tag> [<package>] + art.py copy [-v -q --user=<user> --dst=<dir>] <indexed_package> art.py validate [-v -q] <script_directory> art.py included [-v -q --type=<T> --test-type=<TT>] <script_directory> art.py compare grid [-v -q --days=<D> --user=<user> --entries=<entries>] <package> <test_name> art.py compare ref [-v -q --entries=<entries>] <path> <ref_path> - art.py download [-v -q] <input_file> - art.py list grid [-v -q --user=<user> --json --type=<T> --test-type=<TT> --nogrid] <package> + art.py list grid [-v -q --user=<user> --json --test-type=<TT>] <package> art.py log grid [-v -q --user=<user>] <package> <test_name> art.py output grid [-v -q --user=<user>] <package> <test_name> art.py config [-v -q --config=<file>] <package> @@ -23,6 +22,7 @@ Usage: Options: --ci Run Continuous Integration tests only 
(using env: AtlasBuildBranch) --config=<file> Use specific config file [default: art-configuration.yml] + --copy Run the copy after running the jobs --days=<D> Number of days ago to pick up reference for compare [default: 1] --dst=<dir> Destination directory for downloaded files --entries=<entries> Number of entries to compare [default: 10] @@ -30,7 +30,6 @@ Options: --json Output in json format --max-jobs=<N> Maximum number of concurrent jobs to run [default: 0] -n --no-action No real submit will be done - --nogrid Do not retrieve grid indices -q --quiet Show less information, only warnings and errors --test-type=<TT> Type of test (e.g. all, batch or single) [default: all] --type=<T> Type of job (e.g. grid, build) @@ -46,14 +45,13 @@ Sub-commands: validate Check headers in tests included Show list of files which will be included for art submit/art grid compare Compare the output of a job - download Download a file from rucio list List the jobs of a package log Show the log of a job output Get the output of a job config Show configuration Arguments: - input_file Input file to download (e.g. CONTAINER_ID:ENTRY_NAME) + indexed_package Package of the test or indexed package (e.g. MooPerformance.4) package Package of the test (e.g. 
Tier0ChainTests) path Directory or File to compare ref_path Directory or File to compare to @@ -73,7 +71,7 @@ Tests are called with: """ __author__ = "Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>" -__version__ = '0.7.21' +__version__ = '0.8.20' import logging import os @@ -110,7 +108,7 @@ def compare_grid(package, test_name, **kwargs): days = int(kwargs['days']) entries = kwargs['entries'] user = kwargs['user'] - exit(ArtGrid(art_directory, nightly_release, project, platform, nightly_tag).compare(package, test_name, days, user, entries)) + exit(ArtGrid(art_directory, nightly_release, project, platform, nightly_tag).compare(package, test_name, days, user, entries=entries, shell=True)) @dispatch.on('list', 'grid') @@ -119,12 +117,11 @@ def list(package, **kwargs): set_log(kwargs) art_directory = os.path.dirname(os.path.realpath(sys.argv[0])) (nightly_release, project, platform, nightly_tag) = get_atlas_env() - job_type = 'grid' if kwargs['type'] is None else kwargs['type'] + job_type = 'grid' index_type = kwargs['test_type'] json_format = kwargs['json'] user = kwargs['user'] - nogrid = kwargs['nogrid'] - exit(ArtGrid(art_directory, nightly_release, project, platform, nightly_tag).list(package, job_type, index_type, json_format, user, nogrid)) + exit(ArtGrid(art_directory, nightly_release, project, platform, nightly_tag).list(package, job_type, index_type, json_format, user)) @dispatch.on('log', 'grid') @@ -158,7 +155,7 @@ def submit(sequence_tag, **kwargs): config = kwargs['config'] no_action = kwargs['no_action'] wait_and_copy = True - exit(ArtGrid(art_directory, nightly_release, project, platform, nightly_tag).task_list(job_type, sequence_tag, package, no_action, wait_and_copy, config)) + exit(ArtGrid(art_directory, nightly_release, project, platform, nightly_tag, max_jobs=int(kwargs['max_jobs'])).task_list(job_type, sequence_tag, package, no_action, wait_and_copy, config)) @dispatch.on('grid') @@ -169,10 +166,10 @@ def grid(script_directory, sequence_tag, 
**kwargs): (nightly_release, project, platform, nightly_tag) = get_atlas_env() job_type = 'grid' if kwargs['type'] is None else kwargs['type'] package = None + config = kwargs['config'] no_action = kwargs['no_action'] - wait_and_copy = False - config = None - exit(ArtGrid(art_directory, nightly_release, project, platform, nightly_tag, script_directory, True).task_list(job_type, sequence_tag, package, no_action, wait_and_copy, config)) + wait_and_copy = kwargs['copy'] + exit(ArtGrid(art_directory, nightly_release, project, platform, nightly_tag, script_directory=script_directory, skip_setup=True, max_jobs=int(kwargs['max_jobs'])).task_list(job_type, sequence_tag, package, no_action, wait_and_copy, config)) @dispatch.on('run') @@ -186,7 +183,7 @@ def run(script_directory, sequence_tag, **kwargs): @dispatch.on('copy') -def copy(package, **kwargs): +def copy(indexed_package, **kwargs): """Copy outputs to eos area.""" set_log(kwargs) art_directory = os.path.dirname(os.path.realpath(sys.argv[0])) @@ -194,7 +191,7 @@ def copy(package, **kwargs): # NOTE: default depends on USER, not set it here but in ArtGrid.copy dst = kwargs['dst'] user = kwargs['user'] - exit(ArtGrid(art_directory, nightly_release, project, platform, nightly_tag).copy(package, dst, user)) + exit(ArtGrid(art_directory, nightly_release, project, platform, nightly_tag).copy(indexed_package, dst=dst, user=user)) @dispatch.on('validate') @@ -211,9 +208,9 @@ def included(script_directory, **kwargs): set_log(kwargs) art_directory = os.path.dirname(os.path.realpath(sys.argv[0])) (nightly_release, project, platform, nightly_tag) = get_atlas_env() - art_type = 'grid' if kwargs['type'] is None else kwargs['type'] + job_type = kwargs['type'] # None will list all types index_type = kwargs['test_type'] - exit(ArtBase(art_directory).included(script_directory, art_type, index_type, nightly_release, project, platform)) + exit(ArtBase(art_directory).included(script_directory, job_type, index_type, nightly_release, 
project, platform)) @dispatch.on('config') @@ -226,14 +223,6 @@ def config(package, **kwargs): exit(ArtBase(art_directory).config(package, nightly_release, project, platform, config)) -@dispatch.on('download') -def download(input_file, **kwargs): - """Download a file from rucio.""" - set_log(kwargs) - art_directory = os.path.dirname(os.path.realpath(sys.argv[0])) - exit(ArtBase(art_directory).download(input_file)) - - if __name__ == '__main__': if sys.version_info < (2, 7, 0): sys.stderr.write("You need python 2.7 or later to run this script\n")