diff --git a/Tools/ART/CMakeLists.txt b/Tools/ART/CMakeLists.txt index b512a3b480401e17a73fce77be40da767abd2136..04346a74bffff9b1583132998cf5cfae15c75ec4 100644 --- a/Tools/ART/CMakeLists.txt +++ b/Tools/ART/CMakeLists.txt @@ -10,7 +10,7 @@ atlas_depends_on_subdirs( PRIVATE TestPolicy ) # Install files from the package: -#atlas_install_python_modules( python/*.py ) +atlas_install_python_modules( python/*.py ) atlas_install_scripts( scripts/*.py scripts/*.sh ) diff --git a/Tools/ART/LICENSE-MIT b/Tools/ART/LICENSE-MIT new file mode 100644 index 0000000000000000000000000000000000000000..501495f7f14cfd8c882c3b70d4219b0a344985db --- /dev/null +++ b/Tools/ART/LICENSE-MIT @@ -0,0 +1,21 @@ +Permission is hereby granted, free of charge, to any person +obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the Software +without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, +and/or sell copies of the Software, and to permit persons to +whom the Software is furnished to do so, subject to the +following conditions: + +The above copyright notice and this permission notice shall +be included in all copies or substantial portions of the +Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY +KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE +WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR +PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
diff --git a/Tools/ART/python/art_base.py b/Tools/ART/python/art_base.py new file mode 100755 index 0000000000000000000000000000000000000000..72658267b2b89f89226912b2f01b569b6f40c28c --- /dev/null +++ b/Tools/ART/python/art_base.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +"""TBD.""" +__author__ = "Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>" + +import fnmatch +import inspect +import os +import re +import sys +import yaml + +from art_misc import check, run_command + + +class ArtBase(object): + """TBD.""" + + def __init__(self): + """TBD.""" + pass + + def task_list(self, type, sequence_tag): + """TBD.""" + self.not_implemented() + + def task(self, package, type, sequence_tag): + """TBD.""" + self.not_implemented() + + def job(self, package, type, sequence_tag, index, out): + """TBD.""" + self.not_implemented() + + def compare(self, package, test_name, days, file_names): + """TBD.""" + self.not_implemented() + + def list(self, package, type): + """TBD.""" + self.not_implemented() + + def log(self, package, test_name): + """TBD.""" + self.not_implemented() + + def output(self, package, test_name, file_name): + """TBD.""" + self.not_implemented() + + def included(self): + """TBD.""" + self.not_implemented() + + def wait_for(self): + """TBD.""" + self.not_implemented() + + # + # Default implementations + # + def compare_ref(self, file_name, ref_file, entries=-1): + """TBD.""" + out = check(run_command("acmd.py diff-root " + file_name + " " + ref_file + " --error-mode resilient --ignore-leaves RecoTimingObj_p1_HITStoRDO_timings RecoTimingObj_p1_RAWtoESD_mems RecoTimingObj_p1_RAWtoESD_timings RAWtoESD_mems RAWtoESD_timings ESDtoAOD_mems ESDtoAOD_timings HITStoRDO_timings RAWtoALL_mems RAWtoALL_timings RecoTimingObj_p1_RAWtoALL_mems RecoTimingObj_p1_RAWtoALL_timings --entries " + str(entries))) + print out + sys.stdout.flush() + + # + # Protected Methods + # + def get_config(self): + """Retrieve 
dictionary of ART configuration file.""" + config_file = open("art-configuration.yml", "r") + config = yaml.load(config_file) + config_file.close() + return config + + def get_art_headers(self, filename): + """Return dictionary with art headers.""" + result = {} + for line in open(filename, "r"): + line_match = re.match(r'#\s*art-(\w+):\s+(.+)$', line) + if line_match: + result[line_match.group(1)] = line_match.group(2) + return result + + def get_files(self, directory, type): + """Return a list of all test files matching 'test_*.sh' of given 'queue'.""" + result = [] + if directory is not None: + files = os.listdir(directory) + files.sort() + for fname in files: + if fnmatch.fnmatch(fname, 'test_*.sh') or fnmatch.fnmatch(fname, 'test_*.py'): + headers = self.get_art_headers(os.path.join(directory, fname)) + if 'type' in headers and headers['type'] == type: + result.append(fname) + return result + + def get_type(self, directory, test_name): + """Return the 'type' of a test.""" + headers = self.get_art_headers(os.path.join(directory, test_name)) + return None if 'type' not in headers else headers['type'] + + def get_test_directories(self, directory): + """ + Search from '<directory>...' for '<package>/test' directories. 
+ + A dictionary key=<package>, value=<directory> is returned + """ + result = {} + for root, dirs, files in os.walk(directory): + if root.endswith('/test'): + package = os.path.basename(os.path.dirname(root)) + result[package] = root + return result + + def get_list(self, directory, package, type): + """Return a list of tests for a particular package.""" + test_directories = self.get_test_directories(directory) + test_dir = test_directories[package] + return self.get_files(test_dir, type) + + # + # Private Methods + # + def not_implemented(self): + """TBD.""" + raise NotImplementedError("Class %s doesn't implement method: %s(...)" % (self.__class__.__name__, inspect.stack()[1][3])) diff --git a/Tools/ART/python/art_grid.py b/Tools/ART/python/art_grid.py new file mode 100644 index 0000000000000000000000000000000000000000..9cc0166f013e44859a0378c7e7e806eae2b0c4ef --- /dev/null +++ b/Tools/ART/python/art_grid.py @@ -0,0 +1,325 @@ +#!/usr/bin/env python +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +"""TBD.""" + +__author__ = "Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>" + +import datetime +import os +import re +import shutil +import sys +import tarfile +import time + +from art_base import ArtBase +from art_misc import mkdir_p, make_executable, check, run_command, count_string_occurrence + + +class ArtGrid(ArtBase): + """TBD.""" + + def __init__(self, nightly_release, project, platform, nightly_tag): + """TBD.""" + self.nightly_release = nightly_release + self.project = project + self.platform = platform + self.nightly_tag = nightly_tag + self.cvmfs_directory = '/cvmfs/atlas-nightlies.cern.ch/repo/sw' + self.script_directory = None + + def get_script_directory(self): + """On demand script directory, only to be called if directory exists.""" + if self.script_directory is None: + self.script_directory = self.cvmfs_directory + self.script_directory = os.path.join(self.script_directory, self.nightly_release, self.nightly_tag, self.project) + 
self.script_directory = os.path.join(self.script_directory, os.listdir(self.script_directory)[0]) # e.g. 21.0.3 + self.script_directory = os.path.join(self.script_directory, os.listdir(self.script_directory)[0], self.platform) # InstallArea/x86_64-slc6-gcc62-opt + return self.script_directory + + def task_list(self, type, sequence_tag): + """TBD.""" + # get the path of the art.py script + art_dir = os.path.dirname(os.path.realpath(sys.argv[0])) + + # job will be submitted from tmp directory + submit_directory = 'tmp' + + # make sure tmp is removed + if os.path.exists(submit_directory): + shutil.rmtree(submit_directory) + + config = self.get_config() + + # make sure script directory exist + if not os.path.isdir(self.get_script_directory()): + print 'ERROR: script directory does not exist: %s' % self.get_script_directory() + print 'art-status: error' + sys.stdout.flush() + exit(1) + + # get the test_*.sh from the test directory + test_directories = self.get_test_directories(self.get_script_directory()) + if not test_directories: + print 'No tests found in directories ending in "test"' + sys.stdout.flush() + + for package, root in test_directories.items(): + if self.excluded(config, package): + print 'Excluding ' + package + ' for ' + self.nightly_release + ' project ' + self.project + ' on ' + self.platform + print 'art-package: ' + package + print 'art-status: excluded' + sys.stdout.flush() + else: + shell_files = self.get_files(root, type) + number_of_tests = len(shell_files) + if number_of_tests > 0: + print 'art-package: ' + package + print 'art-status: included' + print 'root' + root + print 'Handling ' + package + ' for ' + self.nightly_release + ' project ' + self.project + ' on ' + self.platform + print "Number of tests: " + str(number_of_tests) + sys.stdout.flush() + submit_dir = os.path.join(submit_directory, package) + run = os.path.join(submit_dir, "run") + mkdir_p(run) + + shutil.copy(os.path.join(art_dir, 'art.py'), run) + 
shutil.copy(os.path.join(art_dir, 'art-internal.py'), run) + shutil.copy(os.path.join(art_dir, 'art_base.py'), run) + shutil.copy(os.path.join(art_dir, 'art_local.py'), run) + shutil.copy(os.path.join(art_dir, 'art_grid.py'), run) + shutil.copy(os.path.join(art_dir, 'art_batch.py'), run) + shutil.copy(os.path.join(art_dir, 'art_misc.py'), run) + shutil.copy(os.path.join(art_dir, 'serialScheduler.py'), run) + shutil.copy(os.path.join(art_dir, 'parallelScheduler.py'), run) + shutil.copy(os.path.join(art_dir, 'docopt.py'), run) + shutil.copy(os.path.join(art_dir, 'docopt_dispatch.py'), run) + + make_executable(os.path.join(run, 'art.py')) + make_executable(os.path.join(run, 'art-internal.py')) + + command = os.path.join(art_dir, 'art-internal.py') + ' task grid ' + package + ' ' + type + ' ' + sequence_tag + ' ' + self.nightly_release + ' ' + self.project + ' ' + self.platform + ' ' + self.nightly_tag + print command + sys.stdout.flush() + out = check(run_command(command)) + print out + sys.stdout.flush() + + def task(self, package, type, sequence_tag): + """TBD.""" + print 'Running art task' + sys.stdout.flush() + + # get the path of the art.py script + art_dir = os.path.dirname(os.path.realpath(sys.argv[0])) + + number_of_tests = len(self.get_list(self.get_script_directory(), package, type)) + + print self.nightly_release + " " + self.project + " " + self.platform + " " + self.nightly_tag + " " + sequence_tag + " " + package + " " + type + " " + str(number_of_tests) + sys.stdout.flush() + + # run task from Bash Script as is needed in ATLAS setup + # FIXME we need to parse the output + out = check(run_command(os.path.join(art_dir, 'art-task-grid.sh') + " " + package + " " + type + " " + sequence_tag + " " + str(number_of_tests) + " " + self.nightly_release + " " + self.project + " " + self.platform + " " + self.nightly_tag)) + print out + sys.stdout.flush() + + def job(self, package, type, sequence_tag, index, out): + """TBD.""" + print 'Running art job grid' + 
sys.stdout.flush() + + index = int(index) + + print self.nightly_release + " " + self.project + " " + self.platform + " " + self.nightly_tag + " " + package + " " + type + " " + str(index) + " " + out + sys.stdout.flush() + + test_directories = self.get_test_directories(self.get_script_directory()) + test_dir = test_directories[package] + test_list = self.get_files(test_dir, type) + + # minus one for grid + test_name = test_list[index - 1] + test_file = os.path.join(test_dir, test_name) + com = '%s %s %s %s %s %s %s %s %s' % (test_file, self.get_script_directory(), package, type, test_name, self.nightly_release, self.project, self.platform, self.nightly_tag) + + print test_name + print test_dir + print com + sys.stdout.flush() + + # run the test + print check(run_command(com)) + sys.stdout.flush() + + # pick up the output + # FIXME for other outputs + tar_file = tarfile.open(out, mode='w') + with open(test_file, "r") as f: + for line in f: + for word in line.split(): + out_name = re.findall("--output.*=(.*)", word) + if (out_name): + if os.path.exists(out_name[0]): + tar_file.add(out_name[0]) + tar_file.close() + + def list(self, package, type): + """TBD.""" + jobs = self.get_list(self.get_script_directory(), package, type) + i = 1 + for job in jobs: + print str(i) + ' ' + job + sys.stdout.flush() + i += 1 + + def log(self, package, test_name): + """TBD.""" + tar = self.get_tar(package, test_name, '.log') + + for name in tar.getnames(): + if 'athena_stdout.txt' in name: + f = tar.extractfile(name) + content = f.read() + print content + break + tar.close() + + def output(self, package, test_name, file_name): + """TBD.""" + tar = self.get_tar(package, test_name, '_EXT0') + + for member in tar.getmembers(): + if file_name in member.name: + tar.extractall(path='.', members=[member]) + break + tar.close() + + def included(self): + """TBD.""" + package_name = "__name_never_used__" + + if self.excluded(self.get_config(), package_name): + print 'Excluding ' + 'all' + ' for 
' + self.nightly_release + ' project ' + self.project + ' on ' + self.platform + print 'art-status: excluded' + sys.stdout.flush() + return 1 + else: + print 'art-status: included' + return 0 + + def wait_for(self): + """TBD.""" + directory = os.path.join(self.cvmfs_directory, self.nightly_release, self.nightly_tag) + path = os.path.join(directory, self.nightly_release + "__" + self.project + "__" + self.platform + "*" + self.nightly_tag + "__*.ayum.log") + + count = 0 + needed = 1 + value = count_string_occurrence(path, "Install looks to have been successful") + print "art-status: waiting" + print path + print "count: " + str(value) + " mins: " + str(count) + sys.stdout.flush() + while (value < needed) and (count < 30): + time.sleep(60) + count += 1 + value = count_string_occurrence(path, "Install looks to have been successful") + print "count: " + str(value) + " mins: " + str(count) + sys.stdout.flush() + + if value < needed: + print "art-status: no release" + sys.stdout.flush() + return -2 + + print "art-status: setup" + sys.stdout.flush() + return 0 + + def compare(self, package, test_name, days, file_names): + """TBD.""" + previous_nightly_tag = self.get_previous_nightly_tag(days) + + ref_dir = os.path.join('.', 'ref-' + previous_nightly_tag) + mkdir_p(ref_dir) + + tar = self.get_tar(package, test_name, '_EXT0', previous_nightly_tag) + for member in tar.getmembers(): + if member.name in file_names: + tar.extractall(path=ref_dir, members=[member]) + tar.close() + + for file_name in file_names: + print "art-compare: " + previous_nightly_tag + " " + file_name + ref_file = os.path.join(ref_dir, file_name) + + self.compare_ref(file_name, ref_file, 10) + + # + # Protected Methods + # + def excluded(self, config, package): + """Based on config, decide if a release is excluded from testing.""" + if self.nightly_release not in config.keys(): + return True + + if self.project not in config[self.nightly_release].keys(): + return True + + if self.platform not in 
config[self.nightly_release][self.project].keys(): + return True + + excludes = config[self.nightly_release][self.project][self.platform] + if excludes is not None and package in excludes: + return True + + return False + + def get_tar(self, package, test_name, extension, nightly_tag=None): + """Open tar file for particular release.""" + if nightly_tag is None: + nightly_tag = self.nightly_tag + + type = self.get_type(self.get_test_directories(self.get_script_directory())[package], test_name) + index = self.get_list(self.get_script_directory(), package, type).index(test_name) + # Grid counts from 1 + index += 1 + + container = 'user.artprod.' + self.nightly_release + '.' + self.project + '.' + self.platform + '.' + nightly_tag + '.*.' + package + extension + print container + + out = check(run_command("rucio list-dids " + container + " --filter type=container | grep " + nightly_tag + " | sort -r | cut -d ' ' -f 2 | head -n 1")) + print out + + out = check(run_command("rucio list-files --csv " + out + " | grep " + "{0:0>6}".format(index))) + print out + + tar_name = out.split(',')[0] + out = check(run_command("rucio download " + tar_name)) + print out + + return tarfile.open(tar_name.replace(':', '/', 1)) + + def get_previous_nightly_tag(self, days): + """TBD. 21:00 is cutoff time.""" + directory = os.path.join(self.cvmfs_directory, self.nightly_release) + tags = os.listdir(directory) + tags.sort(reverse=True) + tags = [x for x in tags if re.match(r'\d{4}-\d{2}-\d{2}T\d{2}\d{2}', x)] + # print tags + found = False + for tag in tags: + if tag == self.nightly_tag: + found = True + elif found: + # check this is within days... 
(cutoff is 21:00, just move by 3 hours to get full days) + fmt = '%Y-%m-%dT%H%M' + offset = datetime.timedelta(hours=3) + nightly_tag_dt = datetime.datetime.strptime(self.nightly_tag, fmt) + offset + from_dt = nightly_tag_dt.replace(hour=0, minute=0, second=0, microsecond=0) - datetime.timedelta(days=days) + to_dt = from_dt + datetime.timedelta(days=1) + tag_dt = datetime.datetime.strptime(tag, fmt) + offset + if from_dt <= tag_dt and tag_dt < to_dt and os.path.isdir(os.path.join(directory, tag, self.project)): + return tag + return None diff --git a/Tools/ART/python/art_local.py b/Tools/ART/python/art_local.py new file mode 100644 index 0000000000000000000000000000000000000000..4a0a723d86ffb1eb2cec545c37a2a39bccf61eeb --- /dev/null +++ b/Tools/ART/python/art_local.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +"""TBD.""" + +__author__ = "Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>" + +import os +import sys +import multiprocessing + +from art_misc import run_command, verify, redirect, mkdir_p +from art_base import ArtBase + +from parallelScheduler import ParallelScheduler + + +def run_job(sequence_tag, script_directory, package, type, index, test_name): + """TBD.""" + print "run_job", sequence_tag, script_directory, package, type, index, test_name + art_dir = os.path.dirname(os.path.realpath(sys.argv[0])) + verify(run_command(os.path.join(art_dir, './art-internal.py') + " job local " + script_directory + " " + package + " " + type + " " + sequence_tag + " " + str(index) + " " + "out")) + # print out + + +class ArtLocal(ArtBase): + """TBD.""" + + def __init__(self, script_directory, max_jobs=0): + """TBD.""" + print "ArtLocal", script_directory, max_jobs + self.script_directory = script_directory + self.max_jobs = multiprocessing.cpu_count() if max_jobs <= 0 else max_jobs + + def task_list(self, type, sequence_tag): + """TBD.""" + print "task_list", type, sequence_tag + test_directories = 
self.get_test_directories(self.script_directory) + for package, root in test_directories.items(): + self.task(package, type, sequence_tag) + + def task(self, package, type, sequence_tag): + """TBD.""" + print "task", package, type, sequence_tag + test_names = self.get_list(self.script_directory, package, type) + scheduler = ParallelScheduler(self.max_jobs + 1) + + index = 0 + for test_name in test_names: + scheduler.add_task(task_name="t" + str(index), dependencies=[], description="d", target_function=run_job, function_kwargs={'sequence_tag': sequence_tag, 'script_directory': self.script_directory, 'package': package, 'type': type, 'index': index, 'test_name': test_name}) + index += 1 + + scheduler.run() + + def job(self, package, type, sequence_tag, index, out): + """TBD.""" + print "job", package, type, sequence_tag, index, out + test_directories = self.get_test_directories(self.script_directory) + test_directory = os.path.abspath(test_directories[package]) + test_name = self.get_files(test_directory, type)[int(index)] + + work_directory = os.path.join(sequence_tag, package, os.path.splitext(test_name)[0]) + mkdir_p(work_directory) + + output = redirect(run_command(os.path.join(test_directory, test_name) + ' ' + '.' 
+ ' ' + package + ' ' + type + ' ' + test_name, dir=work_directory, redirect=True)) + print output diff --git a/Tools/ART/python/art_misc.py b/Tools/ART/python/art_misc.py new file mode 100644 index 0000000000000000000000000000000000000000..e32aabee0fe33ad5d5763db19f110fba99133c65 --- /dev/null +++ b/Tools/ART/python/art_misc.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +"""TBD.""" + +__author__ = "Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>" + +import errno +import glob +import os +import shlex +import subprocess +import sys + + +def run_command(cmd, dir=None, shell=False, redirect=False): + """Run the given command locally and returns the output, err and exit_code.""" + print "Execute: " + cmd + if "|" in cmd: + cmd_parts = cmd.split('|') + else: + cmd_parts = [] + cmd_parts.append(cmd) + i = 0 + p = {} + for cmd_part in cmd_parts: + cmd_part = cmd_part.strip() + if i == 0: + p[i] = subprocess.Popen(shlex.split(cmd_part), stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=dir, shell=shell) + else: + p[i] = subprocess.Popen(shlex.split(cmd_part), stdin=p[i - 1].stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=dir, shell=shell) + i = i + 1 + (output, err) = p[i - 1].communicate() + exit_code = p[0].wait() + + if redirect: + with open(os.path.join(dir, "stdout.txt"), "w") as text_file: + text_file.write(str(output)) + with open(os.path.join(dir, "stderr.txt"), "w") as text_file: + text_file.write(str(err)) + + return exit_code, str(output), str(err) + + +def check((exitcode, out, err)): + """Check exitcode and print statement and exit if needed.""" + if exitcode == 0: + print err + return out + + print "Error:", exitcode + print "StdOut:", out + print "StdErr:", err + + print 'art-status: error' + + exit(exitcode) + + +def verify((exitcode, out, err)): + """Check exitcode and print statement.""" + if exitcode == 0: + print out + return out + + print "Error:", exitcode 
+ print "StdOut:", out + print "StdErr:", err + + print 'art-status: error' + + return exitcode + + +def redirect((exitcode, out, err)): + """Check exitcode.""" + return exitcode + + +def make_executable(path): + """Make file executable (chmod +x).""" + mode = os.stat(path).st_mode + mode |= (mode & 0o444) >> 2 # copy R bits to X + os.chmod(path, mode) + + +def mkdir_p(path): + """Make (missing) directories.""" + try: + os.makedirs(path) + except OSError as exc: # Python >2.5 + if exc.errno == errno.EEXIST and os.path.isdir(path): + pass + else: + raise + + +def which(program): + """TBD.""" + import os + + def is_exe(fpath): + """TBD.""" + return os.path.isfile(fpath) and os.access(fpath, os.X_OK) + + fpath, fname = os.path.split(program) + if fpath: + if is_exe(program): + return program + else: + for path in os.environ["PATH"].split(os.pathsep): + path = path.strip('"') + exe_file = os.path.join(path, program) + if is_exe(exe_file): + return exe_file + + return None + + +def count_string_occurrence(path, string): + """Count number of occurences of 'string' inside file 'path'. Returns count or 0.""" + for file in glob.iglob(path): + print file + sys.stdout.flush() + f = open(file) + contents = f.read() + f.close() + return contents.count(string) + return 0 diff --git a/Tools/ART/python/docopt.py b/Tools/ART/python/docopt.py new file mode 100644 index 0000000000000000000000000000000000000000..7c6a52df58a5561b0e491fda4499c548d262e36e --- /dev/null +++ b/Tools/ART/python/docopt.py @@ -0,0 +1,581 @@ +"""Pythonic command-line interface parser that will make you smile. 
+ + * http://docopt.org + * Repository and issue-tracker: https://github.com/docopt/docopt + * Licensed under terms of MIT license (see LICENSE-MIT) + * Copyright (c) 2013 Vladimir Keleshev, vladimir@keleshev.com + +""" +import sys +import re + + +__all__ = ['docopt'] +__version__ = '0.6.2' + + +class DocoptLanguageError(Exception): + + """Error in construction of usage-message by developer.""" + + +class DocoptExit(SystemExit): + + """Exit in case user invoked program with incorrect arguments.""" + + usage = '' + + def __init__(self, message=''): + SystemExit.__init__(self, (message + '\n' + self.usage).strip()) + + +class Pattern(object): + + def __eq__(self, other): + return repr(self) == repr(other) + + def __hash__(self): + return hash(repr(self)) + + def fix(self): + self.fix_identities() + self.fix_repeating_arguments() + return self + + def fix_identities(self, uniq=None): + """Make pattern-tree tips point to same object if they are equal.""" + if not hasattr(self, 'children'): + return self + uniq = list(set(self.flat())) if uniq is None else uniq + for i, child in enumerate(self.children): + if not hasattr(child, 'children'): + assert child in uniq + self.children[i] = uniq[uniq.index(child)] + else: + child.fix_identities(uniq) + + def fix_repeating_arguments(self): + """Fix elements that should accumulate/increment values.""" + either = [list(child.children) for child in transform(self).children] + for case in either: + for e in [child for child in case if case.count(child) > 1]: + if type(e) is Argument or type(e) is Option and e.argcount: + if e.value is None: + e.value = [] + elif type(e.value) is not list: + e.value = e.value.split() + if type(e) is Command or type(e) is Option and e.argcount == 0: + e.value = 0 + return self + + +def transform(pattern): + """Expand pattern into an (almost) equivalent one, but with single Either. + + Example: ((-a | -b) (-c | -d)) => (-a -c | -a -d | -b -c | -b -d) + Quirks: [-a] => (-a), (-a...) 
=> (-a -a) + + """ + result = [] + groups = [[pattern]] + while groups: + children = groups.pop(0) + parents = [Required, Optional, OptionsShortcut, Either, OneOrMore] + if any(t in map(type, children) for t in parents): + child = [c for c in children if type(c) in parents][0] + children.remove(child) + if type(child) is Either: + for c in child.children: + groups.append([c] + children) + elif type(child) is OneOrMore: + groups.append(child.children * 2 + children) + else: + groups.append(child.children + children) + else: + result.append(children) + return Either(*[Required(*e) for e in result]) + + +class LeafPattern(Pattern): + + """Leaf/terminal node of a pattern tree.""" + + def __init__(self, name, value=None): + self.name, self.value = name, value + + def __repr__(self): + return '%s(%r, %r)' % (self.__class__.__name__, self.name, self.value) + + def flat(self, *types): + return [self] if not types or type(self) in types else [] + + def match(self, left, collected=None): + collected = [] if collected is None else collected + pos, match = self.single_match(left) + if match is None: + return False, left, collected + left_ = left[:pos] + left[pos + 1:] + same_name = [a for a in collected if a.name == self.name] + if type(self.value) in (int, list): + if type(self.value) is int: + increment = 1 + else: + increment = ([match.value] if type(match.value) is str + else match.value) + if not same_name: + match.value = increment + return True, left_, collected + [match] + same_name[0].value += increment + return True, left_, collected + return True, left_, collected + [match] + + +class BranchPattern(Pattern): + + """Branch/inner node of a pattern tree.""" + + def __init__(self, *children): + self.children = list(children) + + def __repr__(self): + return '%s(%s)' % (self.__class__.__name__, + ', '.join(repr(a) for a in self.children)) + + def flat(self, *types): + if type(self) in types: + return [self] + return sum([child.flat(*types) for child in self.children], 
[]) + + +class Argument(LeafPattern): + + def single_match(self, left): + for n, pattern in enumerate(left): + if type(pattern) is Argument: + return n, Argument(self.name, pattern.value) + return None, None + + @classmethod + def parse(class_, source): + name = re.findall('(<\S*?>)', source)[0] + value = re.findall('\[default: (.*)\]', source, flags=re.I) + return class_(name, value[0] if value else None) + + +class Command(Argument): + + def __init__(self, name, value=False): + self.name, self.value = name, value + + def single_match(self, left): + for n, pattern in enumerate(left): + if type(pattern) is Argument: + if pattern.value == self.name: + return n, Command(self.name, True) + else: + break + return None, None + + +class Option(LeafPattern): + + def __init__(self, short=None, long=None, argcount=0, value=False): + assert argcount in (0, 1) + self.short, self.long, self.argcount = short, long, argcount + self.value = None if value is False and argcount else value + + @classmethod + def parse(class_, option_description): + short, long, argcount, value = None, None, 0, False + options, _, description = option_description.strip().partition(' ') + options = options.replace(',', ' ').replace('=', ' ') + for s in options.split(): + if s.startswith('--'): + long = s + elif s.startswith('-'): + short = s + else: + argcount = 1 + if argcount: + matched = re.findall('\[default: (.*)\]', description, flags=re.I) + value = matched[0] if matched else None + return class_(short, long, argcount, value) + + def single_match(self, left): + for n, pattern in enumerate(left): + if self.name == pattern.name: + return n, pattern + return None, None + + @property + def name(self): + return self.long or self.short + + def __repr__(self): + return 'Option(%r, %r, %r, %r)' % (self.short, self.long, + self.argcount, self.value) + + +class Required(BranchPattern): + + def match(self, left, collected=None): + collected = [] if collected is None else collected + l = left + c = 
collected + for pattern in self.children: + matched, l, c = pattern.match(l, c) + if not matched: + return False, left, collected + return True, l, c + + +class Optional(BranchPattern): + + def match(self, left, collected=None): + collected = [] if collected is None else collected + for pattern in self.children: + m, left, collected = pattern.match(left, collected) + return True, left, collected + + +class OptionsShortcut(Optional): + + """Marker/placeholder for [options] shortcut.""" + + +class OneOrMore(BranchPattern): + + def match(self, left, collected=None): + assert len(self.children) == 1 + collected = [] if collected is None else collected + l = left + c = collected + l_ = None + matched = True + times = 0 + while matched: + # could it be that something didn't match but changed l or c? + matched, l, c = self.children[0].match(l, c) + times += 1 if matched else 0 + if l_ == l: + break + l_ = l + if times >= 1: + return True, l, c + return False, left, collected + + +class Either(BranchPattern): + + def match(self, left, collected=None): + collected = [] if collected is None else collected + outcomes = [] + for pattern in self.children: + matched, _, _ = outcome = pattern.match(left, collected) + if matched: + outcomes.append(outcome) + if outcomes: + return min(outcomes, key=lambda outcome: len(outcome[1])) + return False, left, collected + + +class Tokens(list): + + def __init__(self, source, error=DocoptExit): + self += source.split() if hasattr(source, 'split') else source + self.error = error + + @staticmethod + def from_pattern(source): + source = re.sub(r'([\[\]\(\)\|]|\.\.\.)', r' \1 ', source) + source = [s for s in re.split('\s+|(\S*<.*?>)', source) if s] + return Tokens(source, error=DocoptLanguageError) + + def move(self): + return self.pop(0) if len(self) else None + + def current(self): + return self[0] if len(self) else None + + +def parse_long(tokens, options): + """long ::= '--' chars [ ( ' ' | '=' ) chars ] ;""" + long, eq, value = 
tokens.move().partition('=') + assert long.startswith('--') + value = None if eq == value == '' else value + similar = [o for o in options if o.long == long] + if tokens.error is DocoptExit and similar == []: # if no exact match + similar = [o for o in options if o.long and o.long.startswith(long)] + if len(similar) > 1: # might be simply specified ambiguously 2+ times? + raise tokens.error('%s is not a unique prefix: %s?' % + (long, ', '.join(o.long for o in similar))) + elif len(similar) < 1: + argcount = 1 if eq == '=' else 0 + o = Option(None, long, argcount) + options.append(o) + if tokens.error is DocoptExit: + o = Option(None, long, argcount, value if argcount else True) + else: + o = Option(similar[0].short, similar[0].long, + similar[0].argcount, similar[0].value) + if o.argcount == 0: + if value is not None: + raise tokens.error('%s must not have an argument' % o.long) + else: + if value is None: + if tokens.current() in [None, '--']: + raise tokens.error('%s requires argument' % o.long) + value = tokens.move() + if tokens.error is DocoptExit: + o.value = value if value is not None else True + return [o] + + +def parse_shorts(tokens, options): + """shorts ::= '-' ( chars )* [ [ ' ' ] chars ] ;""" + token = tokens.move() + assert token.startswith('-') and not token.startswith('--') + left = token.lstrip('-') + parsed = [] + while left != '': + short, left = '-' + left[0], left[1:] + similar = [o for o in options if o.short == short] + if len(similar) > 1: + raise tokens.error('%s is specified ambiguously %d times' % + (short, len(similar))) + elif len(similar) < 1: + o = Option(short, None, 0) + options.append(o) + if tokens.error is DocoptExit: + o = Option(short, None, 0, True) + else: # why copying is necessary here? 
def parse_pattern(source, options):
    """Parse a formal usage pattern into a Required pattern tree."""
    tokens = Tokens.from_pattern(source)
    children = parse_expr(tokens, options)
    if tokens.current() is not None:
        raise tokens.error('unexpected ending: %r' % ' '.join(tokens))
    return Required(*children)


def parse_expr(tokens, options):
    """expr ::= seq ( '|' seq )* ;"""
    seq = parse_seq(tokens, options)
    if tokens.current() != '|':
        return seq
    # Wrap multi-element sequences so alternatives stay grouped.
    alternatives = [Required(*seq)] if len(seq) > 1 else seq
    while tokens.current() == '|':
        tokens.move()
        seq = parse_seq(tokens, options)
        alternatives += [Required(*seq)] if len(seq) > 1 else seq
    return [Either(*alternatives)] if len(alternatives) > 1 else alternatives
def parse_seq(tokens, options):
    """seq ::= ( atom [ '...' ] )* ;"""
    result = []
    while tokens.current() not in [None, ']', ')', '|']:
        atom = parse_atom(tokens, options)
        if tokens.current() == '...':
            # A trailing ellipsis makes the preceding atom repeatable.
            atom = [OneOrMore(*atom)]
            tokens.move()
        result += atom
    return result


def parse_atom(tokens, options):
    """atom ::= '(' expr ')' | '[' expr ']' | 'options'
             | long | shorts | argument | command ;
    """
    token = tokens.current()
    if token in '([':
        tokens.move()
        closing, wrapper = {'(': [')', Required], '[': [']', Optional]}[token]
        group = wrapper(*parse_expr(tokens, options))
        if tokens.move() != closing:
            raise tokens.error("unmatched '%s'" % token)
        return [group]
    if token == 'options':
        tokens.move()
        return [OptionsShortcut()]
    if token.startswith('--') and token != '--':
        return parse_long(tokens, options)
    if token.startswith('-') and token not in ('-', '--'):
        return parse_shorts(tokens, options)
    # <angle-bracketed> or ALL-CAPS words are positional arguments.
    if token.startswith('<') and token.endswith('>') or token.isupper():
        return [Argument(tokens.move())]
    return [Command(tokens.move())]
def parse_argv(tokens, options, options_first=False):
    """Parse command-line argument vector.

    If options_first:
        argv ::= [ long | shorts ]* [ argument ]* [ '--' [ argument ]* ] ;
    else:
        argv ::= [ long | shorts | argument ]* [ '--' [ argument ]* ] ;

    """
    parsed = []
    while tokens.current() is not None:
        token = tokens.current()
        if token == '--':
            # Everything after a bare '--' is positional, verbatim.
            return parsed + [Argument(None, v) for v in tokens]
        if token.startswith('--'):
            parsed += parse_long(tokens, options)
        elif token.startswith('-') and token != '-':
            parsed += parse_shorts(tokens, options)
        elif options_first:
            return parsed + [Argument(None, v) for v in tokens]
        else:
            parsed.append(Argument(None, tokens.move()))
    return parsed


def parse_defaults(doc):
    """Collect Option objects from every 'options:' section of *doc*."""
    defaults = []
    for section in parse_section('options:', doc):
        # FIXME corner case "bla: options: --foo"
        _, _, body = section.partition(':')  # get rid of "options:"
        pieces = re.split('\n[ \t]*(-\S+?)', '\n' + body)[1:]
        merged = [a + b for a, b in zip(pieces[::2], pieces[1::2])]
        defaults += [Option.parse(s) for s in merged if s.startswith('-')]
    return defaults


def parse_section(name, source):
    """Return every section of *source* whose header line contains *name*."""
    pattern = re.compile('^([^\n]*' + name + '[^\n]*\n?(?:[ \t].*?(?:\n|$))*)',
                         re.IGNORECASE | re.MULTILINE)
    return [s.strip() for s in pattern.findall(source)]


def formal_usage(section):
    """Rewrite a usage section as one '( ... ) | ( ... )' formal pattern."""
    _, _, section = section.partition(':')  # drop "usage:"
    words = section.split()
    program = words[0]
    # Each re-occurrence of the program name starts a new alternative.
    alternatives = (') | (' if word == program else word for word in words[1:])
    return '( ' + ' '.join(alternatives) + ' )'


def extras(help, version, options, doc):
    """Handle --help/--version: print the requested text and exit."""
    wants_help = any(o.name in ('-h', '--help') and o.value for o in options)
    if help and wants_help:
        print(doc.strip("\n"))
        sys.exit()
    if version and any(o.name == '--version' and o.value for o in options):
        print(version)
        sys.exit()


class Dict(dict):

    """dict with a deterministic, sorted, one-entry-per-line repr."""

    def __repr__(self):
        entries = ('%r: %r' % pair for pair in sorted(self.items()))
        return '{%s}' % ',\n '.join(entries)
def docopt(doc, argv=None, help=True, version=None, options_first=False):
    """Parse `argv` based on command-line interface described in `doc`.

    `docopt` creates your command-line interface based on its
    description that you pass as `doc`. Such description can contain
    --options, <positional-argument>, commands, which could be
    [optional], (required), (mutually | exclusive) or repeated...

    Parameters
    ----------
    doc : str
        Description of your command-line interface.
    argv : list of str, optional
        Argument vector to be parsed. sys.argv[1:] is used if not
        provided.
    help : bool (default: True)
        Set to False to disable automatic help on -h or --help
        options.
    version : any object
        If passed, the object will be printed if --version is in
        `argv`.
    options_first : bool (default: False)
        Set to True to require options precede positional arguments,
        i.e. to forbid options and positional arguments intermix.

    Returns
    -------
    args : dict
        A dictionary, where keys are names of command-line elements
        such as e.g. "--verbose" and "<path>", and values are the
        parsed values of those elements.

    See also
    --------
    * For video introduction see http://docopt.org
    * Full documentation is available in README.rst as well as online
      at https://github.com/docopt/docopt#readme

    """
    argv = sys.argv[1:] if argv is None else argv

    usage_sections = parse_section('usage:', doc)
    if len(usage_sections) == 0:
        raise DocoptLanguageError('"usage:" (case-insensitive) not found.')
    if len(usage_sections) > 1:
        raise DocoptLanguageError('More than one "usage:" (case-insensitive).')
    DocoptExit.usage = usage_sections[0]

    options = parse_defaults(doc)
    pattern = parse_pattern(formal_usage(DocoptExit.usage), options)
    # [default] syntax for argument is disabled
    argv = parse_argv(Tokens(argv), list(options), options_first)
    pattern_options = set(pattern.flat(Option))
    for options_shortcut in pattern.flat(OptionsShortcut):
        # [options] expands to every documented option that is not
        # already spelled out explicitly in the usage pattern.
        doc_options = parse_defaults(doc)
        options_shortcut.children = list(set(doc_options) - pattern_options)
    extras(help, version, argv, doc)
    matched, left, collected = pattern.fix().match(argv)
    if matched and left == []:  # better error message if left?
        return Dict((a.name, a.value) for a in (pattern.flat() + collected))
    raise DocoptExit()
"""Dispatch from command-line arguments to functions."""
import re
from collections import OrderedDict


__all__ = ('dispatch', 'DispatchError')
__author__ = 'Vladimir Keleshev <vladimir@keleshev.com>'
__version__ = '0.0.2'
__license__ = 'LICENSE-MIT'
__keywords__ = 'docopt dispatch function adapter kwargs'
__url__ = 'https://github.com/halst/docopt-dispatch'


class DispatchError(Exception):
    """Raised when no registered pattern combination is satisfied."""

    pass


class Dispatch(object):
    """Registry mapping docopt argument patterns to handler functions."""

    def __init__(self):
        # Insertion order matters: first satisfied registration wins.
        self._functions = OrderedDict()

    def on(self, *patterns):
        """Decorator: register the wrapped function for *patterns*."""
        def decorator(function):
            self._functions[patterns] = function
            return function
        return decorator

    def __call__(self, *args, **kwargs):
        """Parse argv with docopt and invoke the first matching handler."""
        from docopt import docopt  # deferred: only needed when dispatching
        arguments = docopt(*args, **kwargs)
        for patterns, handler in self._functions.items():
            if all(arguments[pattern] for pattern in patterns):
                handler(**self._kwargify(arguments))
                return
        raise DispatchError('None of dispatch conditions %s is triggered'
                            % self._formated_patterns)

    @property
    def _formated_patterns(self):
        """Human-readable list of all registered pattern combinations."""
        return ', '.join(' '.join(patterns)
                         for patterns in self._functions.keys())

    @staticmethod
    def _kwargify(arguments):
        """Turn docopt keys ('--foo-bar', '<baz>') into identifiers."""
        def to_identifier(text):
            return re.sub(r'\W', '_', text).strip('_')
        return dict((to_identifier(key), value)
                    for key, value in arguments.items())


dispatch = Dispatch()
'''
Created on 16/05/2012

 * Repository : https://github.com/victor-gil-sepulveda/pyScheduler
 * Licensed under the MIT license (see LICENSE-MIT)
 * Copyright (C) 2013 Víctor Alejandro Gil Sepúlveda

@author: victor
'''
import multiprocessing
from serialScheduler import SerialScheduler
import sys

def printnflush(*args):
    """
    Prints and flushes the things passed as arguments.
    @param args: The data we want to print.
    """
    if False:  # hard-coded debug switch: set to True to enable trace output
        print args
        sys.stdout.flush()

def run_task(process_name, tasks, pipe_end):
    """
    Helper function to run tasks inside a process. It implements an infinite loop controlled by the messages
    received from 'pipe_end'. Messages from the pipe are (message_type, value) tuples. This is the currently
    implemented protocol:
    - "EXECUTE": Runs the task with id == value.
      -> Sends a "TASK FINISHED" message with value = (task_id, task_result)
    - "FINISH": Ends the loop so that process can end and free its resources.
    @param process_name: Unique id of the process executing this function.
    @param tasks: The dictionary of all tasks (we want to execute in this scheduling) indexed by their id.
    @param pipe_end: A process pipe used to send/receive messages from/to the master.
    """
    task_ended = False
    try:
        while not task_ended:
            # Blocks until it receives a message
            message_type, value = pipe_end.recv()

            if message_type == "EXECUTE":
                result = tasks[value].run()
                pipe_end.send(("TASK FINISHED", (value, result)))

            elif message_type == "FINISH":
                printnflush( "Communication successfully closed for",process_name)
                task_ended = True
            else:
                # Unknown message: bail out rather than risk a stuck worker.
                printnflush("Unexpected message: %s"%message_type)
                task_ended = True

    except EOFError:
        printnflush("Communication closed due to remote closing of the pipe in process %s"%process_name)

    except Exception, msg:
        printnflush("Communication closed due to unexpected exception: %s"%msg)

    pipe_end.close()
    printnflush( "Task reached end")

class TaskRunner(object):
    """
    Helper class that encapsulates a process used to execute a subset of the tasks list.
    """
    def __init__(self, process_name, target_function, tasks):
        """
        Creates the process that will be in charge of executing the tasks and a pipe to communicate
        with the main process.
        @param process_name: Unique id for this task executor.
        @param target_function: Is the function the process will execute. In the case of ProcessParallelScheduler
        the function used is 'run_task', however it can use any function that receives the same parameters that
        'run_task' needs.
        @param tasks: The dictionary of all tasks.
        """
        self.pipe_start, self.pipe_end = multiprocessing.Pipe()
        printnflush ("Process started: %s"%process_name)
        self.process = multiprocessing.Process(group=None,
                                               target=target_function,
                                               name=process_name,
                                               args = (process_name, tasks, self.pipe_end))
        self.busy = False

    def run(self):
        """
        Starts the inner process (and therefore the defined function that is going to be used to control the
        messages).
        """
        self.process.start()

    def execute_task(self, task_name):
        """
        Sends the process an "EXECUTE" task message to run the task named 'task_name'.
        @param task_name: Name of the task to be executed.
        """
        self.busy = True
        self.pipe_start.send(("EXECUTE",task_name))

    def set_task_finished(self):
        """
        Clears the 'busy' flag in order to mark this task executor as idle (its associated process
        is free to perform another task).
        """
        self.busy = False

    def finalize(self):
        """
        Sends a finalization message (forces the associated process to break the loop and end).
        """
        self.busy = False
        self.pipe_start.send(("FINISH",None))
        self.process.join()
        if self.process.is_alive():
            self.process.terminate()

    def has_an_incomming_message(self):
        """
        True if this task runner has received a message from its associated process.
        NOTE(review): polls with a 1 second timeout, so this call can block briefly.
        """
        return self.pipe_start.poll(1)

    def get_message(self):
        """
        Returns the message the associated process sent (using the 'run_task' function it can only be a
        "TASK FINISHED" message)
        """
        return self.pipe_start.recv()

class ParallelScheduler(SerialScheduler):
    """
    Scheduler type that works by creating a limited number of processes and distributing the tasks between them.
    """

    def __init__(self, max_processes, functions = {}):
        """
        Creates the scheduler.
        @param max_processes: Indeed is the total number of processes that will be used for the scheduling parallelization
        plus one (which is representing the current process).
        @param functions: @see SerialScheduler
        """
        SerialScheduler.__init__(self,functions)
        self.number_of_processes = max_processes - 1
        self.running = []

    def run(self):
        """
        Like in the SerialScheduler, this function tries to run all the tasks, checking their dependencies. In this case
        some processes will be spawned so that they can share the work of executing the tasks.
        This run function acts as the real scheduler, telling the 'task executor' objects which task to run. This kind
        of dynamic scheduling fosters an efficient use of the resources (every time a 'task executor' ends a task, it is
        told to run another one, so that load is balanced).
        This is a simple implementation of a master-slave pattern (where slaves are the task runners).
        @return: An array with the results of task calculations.
        """
        self.function_exec('scheduling_started', {"number_of_tasks":len(self.not_completed)})

        # Create processes
        available_workers = self.number_of_processes
        task_runners = []
        for i in range(available_workers):
            process_name = "TaskExecutor"+str(i)
            runner = TaskRunner(process_name, run_task, self.tasks)
            runner.run()
            task_runners.append(runner)

        # Execute all tasks
        while not len(self.finished) == len(self.tasks):
            cannot_choose_a_task = False

            # Choose an available process
            task_name = self.choose_runnable_task()

            # Try to execute it
            if task_name is not None:
                # If we can still execute a task we find a free task runner to do it
                for task_runner in task_runners:
                    if not task_runner.busy:
                        self.function_exec('task_started', {"task_name":task_name})
                        task_runner.execute_task(task_name)
                        self.lock_task(task_name) # Ensure that it can't be selected again until task is finished
                        self.running.append(task_name)
                        break
            else:
                cannot_choose_a_task = True

            if cannot_choose_a_task or len(self.running) == available_workers:
                # If there is not an available task (so all remaining tasks have dependencies) or
                # we do not have any available worker, it's time to block until we receive results.

                # We start polling busy runners pipes to wait for a result and add this result to the
                # results list
                task_finished = False
                while not task_finished:
                    for task_runner in task_runners:
                        if task_runner.busy and task_runner.has_an_incomming_message():
                            message, value = task_runner.get_message()
                            if message == "TASK FINISHED":
                                task_name, result = value
                                self.function_exec('task_ended', {"task_name":task_name, "finished":len(self.finished)})
                                self.running.remove(task_name)
                                self.complete_task(task_name)
                                self.remove_from_dependencies(task_name)
                                task_runner.set_task_finished()
                                self.results.append(result)
                            else:
                                printnflush ( "Unexpected message: %s"%message)
                                exit()
                            task_finished = True

        printnflush ("Sending processes termination message.")

        for task_runner in task_runners:
            task_runner.finalize()

        self.function_exec('scheduling_ended')

        return self.results
class Task(object):
    """
    Representation of a task.
    """
    def __init__(self, function, name, kwargs, description = ""):
        """
        Creates a Task object.
        @param function: A callable object that will perform the real work of the task.
        @param name: The name of the task (an identifier).
        @param kwargs: Parameters for the callable.
        @param description: A short description of what the task does. Can be empty.
        """
        # NOTE(review): 'description' is accepted but never stored here;
        # SerialScheduler.add_task() patches task.description after construction.
        # Confirm whether it should be stored in this constructor instead.
        self.function = function
        self.name = name
        self.kwargs = kwargs
        self.result = None

    def run(self):
        """
        Runs the task's associated callable and returns its result.
        @return: The result of the callable execution.
        """
        self.result = self.function(**(self.kwargs))
        return self.result

class SerialScheduler(object):
    """
    Base scheduling class. It ensures that no task is executed before its dependencies (without building a
    dependency tree).
    It allows to define some functions that will be executed when the scheduler reaches some strategic points.
    TODO: In all scheduler types the dependencies must be checked, to avoid cycles for instance.
    """

    def __init__(self, functions = {}):
        """
        Constructor. Initializes needed variables.

        @param functions: A dictionary containing 3 possible keys. Each key defines another dictionary of two
        entries ('function' and 'kwargs') with a callable and its arguments. The possible keys are:
            'task_started' -> Defines the action performed after each task is started.
            'task_ended' -> Defines the action performed when a task is finished.
            'scheduling_started' -> Defines the action performed when the scheduler starts to run tasks.
            'scheduling_ended' -> Defines the action performed when the scheduler has finished to run all tasks.
        """
        self.functions = functions
        self.tasks = {}
        self.dependencies = {}
        self.not_completed = []
        self.finished = []
        self.results = []

    def function_exec(self, function_type, info = None):
        """
        Execute one of the predefined functions if defined.

        @param function_type: Type of the function to check and run (proper types should be 'task_started',
        'task_ended', 'scheduling_started' and 'scheduling_ended', each defining 'function' and 'kwargs' entries).
        @param info: Extra data passed to the callback through its 'info' keyword argument.
        """
        if function_type in self.functions:
            self.functions[function_type]['kwargs']['info'] = info
            self.functions[function_type]['function'](**(self.functions[function_type]['kwargs']))

    def run(self):
        """
        Runs all the tasks in a way that tasks are not executed before their dependencies are
        cleared.

        @return: An array with the results of task calculations.
        """
        self.function_exec('scheduling_started', {"number_of_tasks":len(self.not_completed)})

        ordered_tasks = self.get_ordered_tasks()

        for task in ordered_tasks:
            self.function_exec('task_started', {"task_name":task.name})
            self.results.append(task.run())
            self.function_exec('task_ended', {"task_name":task.name, "finished":len(self.finished)})

        self.function_exec('scheduling_ended')

        return self.results

    def get_ordered_tasks(self):
        """
        Returns a list of task names so that any task name will have an index bigger than the tasks it depends on.

        @return: A list of task names.
        """
        ordered_tasks = []
        while len( self.not_completed) > 0:
            #Choose an available process
            task_name = self.choose_runnable_task()

            if task_name is None:
                print "It was impossible to pick a suitable task for running. Check dependencies."
                return []
            else:
                # Run a process
                ordered_tasks.append(self.tasks[task_name])
                self.lock_task(task_name)
                self.complete_task(task_name)
                self.remove_from_dependencies(task_name)
        return ordered_tasks

    def choose_runnable_task(self):
        """
        Returns a task name which dependencies have already been fulfilled.

        @return: The task name.
        """
        for task_name in self.not_completed:
            if len(self.dependencies[task_name]) == 0: # This process has no dependencies
                return task_name;
        return None # All task have dependencies (circular dependencies for instance)


    def lock_task(self, task_name):
        """
        Removes a task from the 'not complete list' making it unavailable for further selections.

        @param task_name: The name of the task to lock.
        """
        # Remove it from the not_completed list
        self.not_completed.remove(task_name)

    def complete_task(self, task_name):
        """
        Adds a task to the list of completed tasks.

        @param task_name: The name of the task to complete.
        """
        self.finished.append(task_name)

    def remove_from_dependencies(self, task_name):
        """
        Removes a task from the dependencies of all other incomplete tasks. At the end of execution, all dependency
        lists must be empty.

        @param task_name: The name of the task to remove from dependencies.
        """
        for tn in self.dependencies:
            if task_name in self.dependencies[tn]:
                self.dependencies[tn].remove(task_name)

    def add_task(self, task_name, dependencies, target_function, function_kwargs, description):
        """
        Adds a task to the scheduler. The task will be executed along with the other tasks when the 'run' function is called.

        @param task_name: Unique identifier for this task.
        @param dependencies: A list with the task_names of the tasks that must be fulfilled before executing this other task.
        Example of dependencies dictionary:
            {"task_C":["dep_task_A", "dep_task_B"]}
        This dependencies dict. means that task C cannot be run until task B and A are cleared.
        @param target_function: The function executed by this task.
        @param function_kwargs: Its arguments.
        @param description: A brief description of the task.
        """

        if not task_name in self.tasks:
            task = Task( name = task_name, description = description, function = target_function, kwargs=function_kwargs)
            task.description = description
            self.tasks[task_name] = task
            self.not_completed.append(task_name)
            self.dependencies[task_name] = dependencies
        else:
            print "[Error SerialScheduler::add_task] Task %s already exists. Task name must be unique."%task_name
            exit()
Task name must be unique."%task_name + exit() diff --git a/Tools/ART/scripts/art-internal.py b/Tools/ART/scripts/art-internal.py new file mode 100755 index 0000000000000000000000000000000000000000..4100948da0ddb67351879aa33ca7c3a0fc1aa129 --- /dev/null +++ b/Tools/ART/scripts/art-internal.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +""" +ART-internal - ATLAS Release Tester (internal command). + +Usage: + art.py included [-v] <nightly_release> <project> <platform> + art.py job local [-v] <script_directory> <package> <job_type> <sequence_tag> <index> <out> + art.py job (grid|batch) [-v] <package> <job_type> <sequence_tag> <index> <out> <nightly_release> <project> <platform> <nightly_tag> + art.py task local [-v] <script_directory> <package> <job_type> <sequence_tag> + art.py task (grid|batch) [-v] <package> <job_type> <sequence_tag> <nightly_release> <project> <platform> <nightly_tag> + art.py wait_for [-v] <nightly_release> <project> <platform> <nightly_tag> + +Options: + -h --help Show this screen. + --version Show version. + -v, --verbose Show details. + +Sub-commands: + included Check if a release and platform is included + job Runs a single job, given a particular index + task Runs a single task, consisting of given number of jobs + wait_for Wait for the release to be available + +Arguments: + index Index of the test inside the package + nightly_release Name of the nightly release (e.g. 21.0) + nightly_tag Nightly tag (e.g. 2017-02-26T2119) + out Tar filename used for the output of the job + package Package of the test (e.g. Tier0ChainTests) + platform Platform (e.g. x86_64-slc6-gcc62-opt) + project Name of the project (e.g. Athena) + script_directory Directory containing the packages with tests + sequence_tag Sequence tag (e.g. 0 or PIPELINE_ID) + job_type Type of job (e.g. 
grid, ci, build) +""" + +__author__ = "Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>" + +import os +import sys +from docopt_dispatch import dispatch + +from art_local import ArtLocal +from art_grid import ArtGrid +from art_batch import ArtBatch + + +@dispatch.on('included') +def included(nightly_release, project, platform, **kwargs): + """TBD.""" + sys.exit(ArtGrid(nightly_release, project, platform, None).included()) + + +@dispatch.on('job', 'local') +def job_local(script_directory, package, job_type, sequence_tag, index, out, **kwargs): + """TBD. + + Tests are called with the following parameters: + SCRIPT_DIRECTORY, PACKAGE, TYPE, TEST_NAME + """ + print "job_local", script_directory, package, job_type, sequence_tag, index, out, kwargs + ArtLocal(script_directory).job(package, job_type, sequence_tag, index, out) + + +@dispatch.on('job', 'grid') +def job_grid(package, job_type, sequence_tag, index, out, nightly_release, project, platform, nightly_tag, **kwargs): + """TBD. + + Tests are called with the following parameters: + SCRIPT_DIRECTORY, PACKAGE, TYPE, TEST_NAME, NIGHTLY_RELEASE, PROJECT, PLATFORM, NIGHTLY_TAG + """ + ArtGrid(nightly_release, project, platform, nightly_tag).job(package, job_type, sequence_tag, index, out) + + +@dispatch.on('job', 'batch') +def job_batch(package, job_type, sequence_tag, index, out, nightly_release, project, platform, nightly_tag, **kwargs): + """TBD. 
+ + Tests are called with the following parameters: + SCRIPT_DIRECTORY, PACKAGE, TYPE, TEST_NAME, NIGHTLY_RELEASE, PROJECT, PLATFORM, NIGHTLY_TAG + """ + ArtBatch(nightly_release, project, platform, nightly_tag).job(package, job_type, sequence_tag, index, out) + + +@dispatch.on('task', 'local') +def task_local(script_directory, job_type, sequence_tag, **kwargs): + """TBD.""" + ArtLocal(script_directory).task(job_type, sequence_tag) + + +@dispatch.on('task', 'grid') +def task_grid(package, job_type, sequence_tag, nightly_release, project, platform, nightly_tag, **kwargs): + """TBD.""" + ArtGrid(nightly_release, project, platform, nightly_tag).task(package, job_type, sequence_tag) + + +@dispatch.on('task', 'batch') +def task_batch(package, job_type, sequence_tag, nightly_release, project, platform, nightly_tag, **kwargs): + """TBD.""" + ArtBatch(nightly_release, project, platform, nightly_tag).task(package, sequence_tag) + + +@dispatch.on('wait_for') +def wait_for(nightly_release, project, platform, nightly_tag, **kwargs): + """TBD.""" + sys.exit(ArtGrid(nightly_release, project, platform, nightly_tag).wait_for()) + + +if __name__ == '__main__': + # NOTE: import should be here, to keep the order of the decorators (module first, art last and unused) + from art import __version__ + dispatch(__doc__, version=os.path.splitext(os.path.basename(__file__))[0] + ' ' + __version__) diff --git a/Tools/ART/scripts/art-task-grid.sh b/Tools/ART/scripts/art-task-grid.sh index 0a6581184382cd3105ee4a2df350b5a55d69a8af..8fe0d302c2c0bd093134dfe65be330da96ed8e20 100755 --- a/Tools/ART/scripts/art-task-grid.sh +++ b/Tools/ART/scripts/art-task-grid.sh @@ -1,20 +1,24 @@ #!/bin/bash +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration # NOTE do NOT run with /bin/bash -x as the output is too big for gitlab-ci -# arguments: NIGHTLY_RELEASE, PROJECT, PLATFORM, NIGHTLY_TAG, SEQUENCE_TAG, PACKAGE, NUMBER_OF_TESTS +# arguments: PACKAGE, SEQUENCE_TAG, NUMBER_OF_TESTS, 
NIGHTLY_RELEASE, PROJECT, PLATFORM, NIGHTLY_TAG # -# example: 21.0 Athena x86_64-slc6-gcc62-opt 2017-02-26T2119 316236 Tier0ChainTests 32 +# example: Tier0ChainTests grid 316236 32 21.0 Athena x86_64-slc6-gcc62-opt 2017-02-26T2119 #set -e +USER=artprod + whoami date -NIGHTLY_RELEASE=$1 -PROJECT=$2 -PLATFORM=$3 -NIGHTLY_TAG=$4 -SEQUENCE_TAG=$5 -PACKAGE=$6 -NUMBER_OF_TESTS=$7 +PACKAGE=$1 +TYPE=$2 +SEQUENCE_TAG=$3 +NUMBER_OF_TESTS=$4 +NIGHTLY_RELEASE=$5 +PROJECT=$6 +PLATFORM=$7 +NIGHTLY_TAG=$8 export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase source $ATLAS_LOCAL_ROOT_BASE/user/atlasLocalSetup.sh @@ -23,14 +27,14 @@ lsetup panda voms-proxy-init --rfc -noregen -cert ./grid.proxy -voms atlas -# change -VAL-Prod and others into -VAL -NIGHTLY_RELEASE_SHORT=${NIGHTLY_RELEASE/-VAL-*/-VAL} +# change -VAL-Prod and others into -VAL +NIGHTLY_RELEASE_SHORT=${NIGHTLY_RELEASE/-VAL-*/-VAL} asetup --platform=${PLATFORM} ${NIGHTLY_RELEASE_SHORT},${NIGHTLY_TAG},${PROJECT} cd ./tmp/${PACKAGE}/run -OUTFILE="user.tcuhadar.atlas.art.${NIGHTLY_RELEASE_SHORT}.${PROJECT}.${PLATFORM}.${NIGHTLY_TAG}.${SEQUENCE_TAG}.${PACKAGE}" -CMD="pathena --noBuild --skipScout --trf \"./art.py job ${NIGHTLY_RELEASE_SHORT} ${PROJECT} ${PLATFORM} ${NIGHTLY_TAG} ${PACKAGE} %RNDM:0 %OUT.tar\" --split ${NUMBER_OF_TESTS} --outDS ${OUTFILE}" +OUTFILE="user.${USER}.atlas.${NIGHTLY_RELEASE_SHORT}.${PROJECT}.${PLATFORM}.${NIGHTLY_TAG}.${SEQUENCE_TAG}.${PACKAGE}" +CMD="pathena --noBuild --skipScout --trf \"./art-internal.py job grid ${PACKAGE} ${TYPE} ${SEQUENCE_TAG} %RNDM:0 %OUT.tar ${NIGHTLY_RELEASE_SHORT} ${PROJECT} ${PLATFORM} ${NIGHTLY_TAG}\" --split ${NUMBER_OF_TESTS} --outDS ${OUTFILE}" #--site=ANALY_NIKHEF-ELPROD_SHORT,ANALY_NIKHEF-ELPROD" #--site=ANALY_FZK,ANALY_BNL,ANALY_RAL" echo ${CMD} diff --git a/Tools/ART/scripts/art.py b/Tools/ART/scripts/art.py index c1a928fde02a7b9b50ca9506d803af0970d5e4ea..dd6de771080d805fa298477dadda7d67527e9d2a 100755 --- a/Tools/ART/scripts/art.py +++ 
b/Tools/ART/scripts/art.py @@ -1,628 +1,161 @@ #!/usr/bin/env python -"""TBD.""" +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +""" +ART - ATLAS Release Tester. + +Usage: + art.py run [-v --type=<T> --max-jobs=<N>] <script_directory> <sequence_tag> + art.py submit [-v --type=<T>] <sequence_tag> <nightly_release> <project> <platform> <nightly_tag> + art.py compare grid [-v --days=<D>] <nightly_release> <project> <platform> <nightly_tag> <package> <test_name> <file_name>... + art.py compare ref [-v] <file_name> <ref_file> + art.py list grid [-v] <package> <job_type> <nightly_release> <project> <platform> <nightly_tag> + art.py log grid [-v] <package> <test_name> <nightly_release> <project> <platform> <nightly_tag> + art.py output grid [-v] <package> <test_name> <file_name> <nightly_release> <project> <platform> <nightly_tag> + +Options: + -h --help Show this screen. + --version Show version. + -v, --verbose Show details. + --type=<T> Type of job (e.g. grid, ci, build) + --days=<D> Number of days ago to pick up reference for compare [default: 1] + --max-jobs=<N> Maximum number of concurrent jobs to run + +Sub-commands: + run Run tests from a package locally + submit Submit tests to the grid + compare Compare the output of a job + list Lists the jobs of a package + log Show the log of a job + output Get the output of a job + +Arguments: + file_name Filename to save the output to + index Index of the test inside the package + nightly_release Name of the nightly release (e.g. 21.0) + nightly_tag Nightly tag (e.g. 2017-02-26T2119) + out Tar filename used for the output of the job + package Package of the test (e.g. Tier0ChainTests) + platform Platform (e.g. x86_64-slc6-gcc62-opt) + project Name of the project (e.g. Athena) + script_directory Directory containing the packages with tests + sequence_tag Sequence tag (e.g. 0 or PIPELINE_ID) + test_name Name of the test inside the package (e.g. test_q322.sh) + job_type Type of job (e.g. 
grid, ci, build) +""" __author__ = "Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>" -__doc__ = """ART - ATLAS Nightly Release Tester script""" +__version__ = '0.0.3' -import argparse -import datetime -import errno -import fnmatch -import glob -import os import re import requests -# import scandir -import shlex -import shutil -import subprocess +import os import sys -import tarfile -import time -import yaml - - -def run_command(cmd, dir=None, shell=False): - """Run the given command locally and returns the output, err and exit_code.""" - print "Execute: " + cmd - if "|" in cmd: - cmd_parts = cmd.split('|') - else: - cmd_parts = [] - cmd_parts.append(cmd) - i = 0 - p = {} - for cmd_part in cmd_parts: - cmd_part = cmd_part.strip() - if i == 0: - p[i] = subprocess.Popen(shlex.split(cmd_part), stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=dir, shell=shell) - else: - p[i] = subprocess.Popen(shlex.split(cmd_part), stdin=p[i - 1].stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=dir, shell=shell) - i = i + 1 - (output, err) = p[i - 1].communicate() - exit_code = p[0].wait() - - return exit_code, str(output), str(err) - - -def check((exitcode, out, err)): - """Check exitcode and print statement and exit if needed.""" - if exitcode == 0: - print err - return out - - print "Error:", exitcode - print "StdOut:", out - print "StdErr:", err - - print 'art-status: error' - - exit(exitcode) - - -def make_executable(path): - """TBD.""" - mode = os.stat(path).st_mode - mode |= (mode & 0o444) >> 2 # copy R bits to X - os.chmod(path, mode) -def mkdir_p(path): - """TBD.""" - try: - os.makedirs(path) - except OSError as exc: # Python >2.5 - if exc.errno == errno.EEXIST and os.path.isdir(path): - pass - else: - raise +from docopt_dispatch import dispatch +from art_base import ArtBase +from art_local import ArtLocal +from art_grid import ArtGrid -def count_string_occurrence(path, string): +@dispatch.on('submit') +def submit_grid(sequence_tag, nightly_release, 
project, platform, nightly_tag, **kwargs): """TBD.""" - for file in glob.iglob(path): - print file - sys.stdout.flush() - f = open(file) - contents = f.read() - f.close() - return contents.count(string) - return 0 + type = 'grid' if kwargs['type'] is None else kwargs['type'] + ArtGrid(nightly_release, project, platform, nightly_tag).task_list(type, sequence_tag) -class Art(object): +@dispatch.on('run') +def run(script_directory, sequence_tag, **kwargs): """TBD.""" + type = 'build' if kwargs['type'] is None else kwargs['type'] + ArtLocal(script_directory, max_jobs=kwargs['max_jobs']).task_list(type, sequence_tag) - SCRIPT_DIRECTORY = '/cvmfs/atlas-nightlies.cern.ch/repo/sw' - NIGHTLY_RELEASE_HELP = "name of the nightly tag (e.g. 21.0)" - PROJECT_HELP = "name of the project (e.g. Athena)" - PLATFORM_HELP = "platform (e.g. x86_64-slc6-gcc62-opt)" - NIGHTLY_TAG_HELP = "nightly tag (e.g. 2017-02-26T2119)" - SEQUENCE_TAG_HELP = "sequence tag (e.g. 0 or JOB_ID)" - PACKAGE_HELP = "package of the test" - - # use epilog - def __init__(self): - """TBD.""" - parser = argparse.ArgumentParser( - description='ART - ATLAS Release Tester', - usage='''art <command> [<args>] - -Possible art commands are: - submit Submit tests to the grid - retrieve Wait for tests to finish and retrieve the results - included Check if a release and platform is included - wait_for Wait for the release to be available - task Runs a single task, consisting of given number of jobs - job Runs a single job, given a particular index - list Lists the jobs of a package - log Show the log of a job - output Get the output of a job - compare Compare the output of a job -''') - parser.add_argument("-v", "--verbose", help="increase output verbosity", action="store_true") - parser.add_argument('command', help='Subcommand to run') - # parse_args defaults to [1:] for args, but you need to - # exclude the rest of the args too, or validation will fail - args = parser.parse_args(sys.argv[1:2]) - if not hasattr(self, 
args.command): - print 'ERROR: Unrecognized command \'' + args.command + '\'' - parser.print_help() - sys.stdout.flush() - exit(1) - # use dispatch pattern to invoke method with same name - getattr(self, args.command)() - - def get_config(self): - """TBD.""" - config_file = open("art-configuration.yml", "r") - config = yaml.load(config_file) - config_file.close() - return config - - def excluded(self, config, nightly_release, project, platform, package): - """TBD.""" - if nightly_release not in config.keys(): - return True - - if project not in config[nightly_release].keys(): - return True - - if platform not in config[nightly_release][project].keys(): - return True - - excludes = config[nightly_release][project][platform] - if excludes is not None and package in excludes: - return True - - return False - - def included(self): - """TBD.""" - parser = argparse.ArgumentParser( - description='Check if a release and platform is included') - parser.add_argument('nightly_release', help=Art.NIGHTLY_RELEASE_HELP) - parser.add_argument('project', help=Art.PROJECT_HELP) - parser.add_argument('platform', help=Art.PLATFORM_HELP) - parser.add_argument('test_name', nargs='?', help="name of the test", default="__name_never_used__") - # now that we're inside a subcommand, ignore the first two args - args = parser.parse_args(sys.argv[2:]) - print 'Running art included' - sys.stdout.flush() - - if self.excluded(self.get_config(), args.nightly_release, args.project, args.platform, args.test_name): - print 'Excluding ' + 'all' + ' for ' + args.nightly_release + ' project ' + args.project + ' on ' + args.platform - print 'art-status: excluded' - sys.stdout.flush() - exit(1) - else: - print 'art-status: included' - exit(0) - - # @profile - def get_test_directories(self, nightly_release, project, platform, nightly_tag): - """ - Search from 'SCRIPT_DIRECTORY/<nightly_release>/<nightly_tag>/<project>' for '<package>/test' directories. 
- - The directories need to have the <platform> in their name. - A dictionary key=<package>, value=<directory> is returned - """ - result = {} - search = os.path.join(Art.SCRIPT_DIRECTORY, nightly_release, nightly_tag, project) - search = os.path.join(search, os.listdir(search)[0]) # e.g. 21.0.3 - search = os.path.join(search, os.listdir(search)[0], platform) # InstallArea/x86_64-slc6-gcc62-opt - for root, dirs, files in os.walk(search): - if root.endswith('/test'): - package = os.path.basename(os.path.dirname(root)) - result[package] = root - return result - - def get_files(self, directory, queue): - """Return a list of all test files matching 'test_*.sh' of given 'queue'.""" - result = list() - if directory is not None: - files = os.listdir(directory) - files.sort() - for fname in files: - if fnmatch.fnmatch(fname, 'test_*.sh'): - full_name = os.path.join(directory, fname) - for line in open(full_name, "r"): - line_match = re.match(r'#art-(\w+):\s+(.+)$', line) - if line_match and line_match.group(1) == 'queue': - # accept tests with long queue - if line_match.group(2) == queue: - result.append(fname) - return result - - def wait_for(self): - """TBD.""" - parser = argparse.ArgumentParser( - description='Wait for the release to be available') - parser.add_argument('nightly_release', help=Art.NIGHTLY_RELEASE_HELP) - parser.add_argument('project', help=Art.PROJECT_HELP) - parser.add_argument('platform', help=Art.PLATFORM_HELP) - parser.add_argument('nightly_tag', help=Art.NIGHTLY_TAG_HELP) - # now that we're inside a subcommand, ignore the first two args - args = parser.parse_args(sys.argv[2:]) - print 'Running art wait_for' - sys.stdout.flush() - directory = os.path.join(Art.SCRIPT_DIRECTORY, args.nightly_release, args.nightly_tag) - path = os.path.join(directory, args.nightly_release + "__" + args.project + "__" + args.platform + "*" + args.nightly_tag + "__*.ayum.log") +@dispatch.on('compare', 'ref') +def compare_local(file_name, ref_file, **kwargs): + """TBD.""" 
+ ArtBase().compare_ref(file_name, ref_file) - count = 0 - needed = 1 - value = count_string_occurrence(path, "Install looks to have been successful") - print "art-status: waiting" - print path - print "count: " + str(value) + " mins: " + str(count) - sys.stdout.flush() - while (value < needed) and (count < 30): - time.sleep(60) - count += 1 - value = count_string_occurrence(path, "Install looks to have been successful") - print "count: " + str(value) + " mins: " + str(count) - sys.stdout.flush() - - if value < needed: - print "art-status: no release" - sys.stdout.flush() - sys.exit(-2) - - print "art-status: setup" - sys.stdout.flush() - sys.exit(0) - - def submit(self): - """TBD.""" - parser = argparse.ArgumentParser( - description='Submit tests to the grid') - parser.add_argument('nightly_release', help=Art.NIGHTLY_RELEASE_HELP) - parser.add_argument('project', help=Art.PROJECT_HELP) - parser.add_argument('platform', help=Art.PLATFORM_HELP) - parser.add_argument('nightly_tag', help=Art.NIGHTLY_TAG_HELP) - parser.add_argument('sequence_tag', help=Art.SEQUENCE_TAG_HELP) - # now that we're inside a subcommand, ignore the first two args - args = parser.parse_args(sys.argv[2:]) - print 'Running art submit' - sys.stdout.flush() - # get the path of the art.py script - art_dir = os.path.dirname(os.path.realpath(sys.argv[0])) - - # job will be submitted from tmp directory - submit_directory = 'tmp' - - # make sure tmp is removed - if os.path.exists(submit_directory): - shutil.rmtree(submit_directory) - - config = self.get_config() - - # make sure script directory exist - if not os.path.isdir(Art.SCRIPT_DIRECTORY): - print 'ERROR: script directory does not exist: %s' % Art.SCRIPT_DIRECTORY - print 'art-status: error' - sys.stdout.flush() - exit(1) - - # get the test_*.sh from the test directory - test_directories = self.get_test_directories(args.nightly_release, args.project, args.platform, args.nightly_tag) - if not test_directories: - print 'No tests found in 
directories ending in "test"' - sys.stdout.flush() - - for package, root in test_directories.items(): - if self.excluded(config, args.nightly_release, args.project, args.platform, package): - print 'Excluding ' + package + ' for ' + args.nightly_release + ' project ' + args.project + ' on ' + args.platform - print 'art-package: ' + package - print 'art-status: excluded' - sys.stdout.flush() - else: - shell_files = self.get_files(root, 'long') - number_of_tests = len(shell_files) - if number_of_tests > 0: - print 'art-package: ' + package - print 'art-status: included' - print 'root' + root - print 'Handling ' + package + ' for ' + args.nightly_release + ' project ' + args.project + ' on ' + args.platform - print "Number of tests: " + str(number_of_tests) - sys.stdout.flush() - submit_dir = os.path.join(submit_directory, package) - run = os.path.join(submit_dir, "run") - mkdir_p(run) - - shutil.copy(os.path.join(art_dir, 'art.py'), run) - make_executable(os.path.join(run, 'art.py')) - - command = os.path.join(art_dir, 'art.py') + ' task ' + args.nightly_release + ' ' + args.project + ' ' + args.platform + ' ' + args.nightly_tag + ' ' + args.sequence_tag + ' ' + package + ' ' + str(number_of_tests) - print command - sys.stdout.flush() - out = check(run_command(command)) - print out - sys.stdout.flush() - - def retrieve(self): - """TBD.""" - parser = argparse.ArgumentParser( - description='Wait for tests to finish and retrieve the results') - parser.add_argument('job_id', help="id of the job to retrieve") - # now that we're inside a subcommand, ignore the first two args - args = parser.parse_args(sys.argv[2:]) - job_id = args.job_id - - url = 'http://bigpanda.cern.ch/task/' + job_id + '/?json' - print 'Running art retrieve on ' + url - sys.stdout.flush() +@dispatch.on('compare', 'grid') +def compare_grid(package, test_name, nightly_release, project, platform, nightly_tag, **kwargs): + """TBD.""" + days = int(kwargs['days']) + file_names = kwargs['file_name'] + 
ArtGrid(nightly_release, project, platform, nightly_tag).compare(package, test_name, days, file_names) - response = requests.get(url) - if response.status_code != 200: - print 'ERROR http status code ' + str(response.status_code) - sys.stdout.flush() - exit(1) - - data = response.json() - - task = data['taskparams']['taskName'] - task_match = re.match(r'^(\w+)\.(\w+)\.(\w+)\.(\w+).([\w\.-]+)\.([\w-]+)\.([\w-]+)\.(\d+)/', task) - if not task_match: - print 'ERROR cannot decode task: ' + task - sys.stdout.flush() - exit(1) - - # job_type = task_match.group(1) - # user = task_match.group(2) - # experiment = task_match.group(3) - # job = task_match.group(4) - # nightly_release = task_match.group(5) - # platform = task_match.group(6) - # nightly_release = task_match.group(7) - # build_id = task_match.group(8) - - status = data['task']['status'] - dsinfo = data['task']['dsinfo'] - print 'Total / Done / Failed: ' + str(dsinfo['nfiles']) + ' / ' + str(dsinfo['nfilesfinished']) + ' / ' + str(dsinfo['nfilesfailed']) - sys.stdout.flush() - # Add other final states here - print 'Status: ' + status - sys.stdout.flush() +@dispatch.on('list', 'grid') +def list(package, job_type, nightly_release, project, platform, nightly_tag, **kwargs): + """TBD.""" + ArtGrid(nightly_release, project, platform, nightly_tag).list(package, job_type) - if status in ['done']: - exit(0) - - if status in ['finished', 'failed']: - exit(1) - - if status in ['broken', 'aborted']: - exit(2) - - # please re-call later - exit(-1) - - def task(self): - """TBD. 
- - arguments: SCRIPT_DIRECTORY, NIGHTLY_RELEASE, PROJECT, PLATFORM, NIGHTLY_TAG, SEQUENCE_TAG, PACKAGE, NUMBER_OF_TESTS - - example: /cvmfs/atlas-nightlies.cern.ch/repo/sw/21.0/2017-02-26T2119 21.0 Athena x86_64-slc6-gcc62-opt 2017-02-26T2119 316236 Tier0ChainTests 32 - """ - parser = argparse.ArgumentParser( - description='Runs a single task, consisting of given number of jobs') - parser.add_argument('nightly_release', help=Art.NIGHTLY_RELEASE_HELP) - parser.add_argument('project', help=Art.PROJECT_HELP) - parser.add_argument('platform', help=Art.PLATFORM_HELP) - parser.add_argument('nightly_tag', help=Art.NIGHTLY_TAG_HELP) - parser.add_argument('sequence_tag', help=Art.SEQUENCE_TAG_HELP) - parser.add_argument('package', help=Art.PACKAGE_HELP) - parser.add_argument('number_of_tests', help="number of tests") - # now that we're inside a subcommand, ignore the first two args - args = parser.parse_args(sys.argv[2:]) - print 'Running art task' - sys.stdout.flush() - # get the path of the art.py script - art_dir = os.path.dirname(os.path.realpath(sys.argv[0])) +@dispatch.on('log', 'grid') +def log(package, test_name, nightly_release, project, platform, nightly_tag, **kwargs): + """TBD.""" + ArtGrid(nightly_release, project, platform, nightly_tag).log(package, test_name) - print args.nightly_release + " " + args.project + " " + args.platform + " " + args.nightly_tag + " " + args.sequence_tag + " " + args.package + " " + str(args.number_of_tests) - sys.stdout.flush() - # run task from Bash Script as is needed in ATLAS setup - # FIXME we need to parse the output - out = check(run_command(os.path.join(art_dir, 'art-task-grid.sh') + " " + args.nightly_release + " " + args.project + " " + args.platform + " " + args.nightly_tag + " " + args.sequence_tag + " " + args.package + " " + str(args.number_of_tests))) - print out - sys.stdout.flush() +@dispatch.on('output', 'grid') +def output(package, test_name, file_name, nightly_release, project, platform, nightly_tag, **kwargs): + 
"""TBD.""" + ArtGrid(nightly_release, project, platform, nightly_tag).output(package, test_name, file_name) - def job(self): - """TBD. - - arguments: NIGHTLY_RELEASE, PROJECT, PLATFORM, NIGHTLY_TAG, PACKAGE, INDEX, OUTPUT - - Tests are called with the following parameters: - SCRIPT_DIRECTORY, NIGHTLY_RELEASE, PROJECT, PLATFORM, NIGHTLY_TAG, PACKAGE, TEST_NAME - """ - parser = argparse.ArgumentParser( - description='Runs a single job, given an index') - parser.add_argument('nightly_release', help=Art.NIGHTLY_RELEASE_HELP) - parser.add_argument('project', help=Art.PROJECT_HELP) - parser.add_argument('platform', help=Art.PLATFORM_HELP) - parser.add_argument('nightly_tag', help=Art.NIGHTLY_TAG_HELP) - parser.add_argument('package', help=Art.PACKAGE_HELP) - parser.add_argument('index', help="index of the test, for grid starting from 1") - parser.add_argument('output', help="output filename") - # now that we're inside a subcommand, ignore the first two args - args = parser.parse_args(sys.argv[2:]) - print 'Running art job' - sys.stdout.flush() - index = int(args.index) +@dispatch.on('retrieve') +def retrieve(job_id, **kwargs): + """TBD.""" + url = 'http://bigpanda.cern.ch/task/' + job_id + '/?json' + print 'Running art retrieve on ' + url + sys.stdout.flush() - print args.nightly_release + " " + args.project + " " + args.platform + " " + args.nightly_tag + " " + args.package + " " + str(index) + " " + args.output + response = requests.get(url) + if response.status_code != 200: + print 'ERROR http status code ' + str(response.status_code) sys.stdout.flush() + exit(1) - test_directories = self.get_test_directories(args.nightly_release, args.project, args.platform, args.nightly_tag) - test_dir = test_directories[args.package] - test_list = self.get_files(test_dir, 'long') + data = response.json() - # minus one for grid - test_name = test_list[index - 1] - test_file = os.path.join(test_dir, test_name) - com = '%s %s %s %s %s %s %s %s' % (test_file, Art.SCRIPT_DIRECTORY, 
args.nightly_release, args.project, args.platform, args.nightly_tag, args.package, test_name) - - print test_name - print test_dir - print com + task = data['taskparams']['taskName'] + task_match = re.match(r'^(\w+)\.(\w+)\.(\w+)\.(\w+).([\w\.-]+)\.([\w-]+)\.([\w-]+)\.(\d+)/', task) + if not task_match: + print 'ERROR cannot decode task: ' + task sys.stdout.flush() + exit(1) - # run the test - out = check(run_command(com)) - print out - sys.stdout.flush() + # job_type = task_match.group(1) + # user = task_match.group(2) + # experiment = task_match.group(3) + # job = task_match.group(4) + # nightly_release = task_match.group(5) + # platform = task_match.group(6) + # nightly_release = task_match.group(7) + # build_id = task_match.group(8) - # pick up the output - # FIXME for other outputs - tar_file = tarfile.open(args.output, mode='w') - with open(test_file, "r") as f: - for line in f: - for word in line.split(): - out_name = re.findall("--output.*=(.*)", word) - if (out_name): - if os.path.exists(out_name[0]): - tar_file.add(out_name[0]) - tar_file.close() - - def list(self): - """TBD.""" - parser = argparse.ArgumentParser( - description='Lists the jobs of a package') - parser.add_argument('nightly_release', help=Art.NIGHTLY_RELEASE_HELP) - parser.add_argument('project', help=Art.PROJECT_HELP) - parser.add_argument('platform', help=Art.PLATFORM_HELP) - parser.add_argument('nightly_tag', help=Art.NIGHTLY_TAG_HELP) - parser.add_argument('package', help=Art.PACKAGE_HELP) - args = parser.parse_args(sys.argv[2:]) - print 'Running art list' - sys.stdout.flush() + status = data['task']['status'] + dsinfo = data['task']['dsinfo'] + print 'Total / Done / Failed: ' + str(dsinfo['nfiles']) + ' / ' + str(dsinfo['nfilesfinished']) + ' / ' + str(dsinfo['nfilesfailed']) + sys.stdout.flush() - jobs = self.get_list(args.nightly_release, args.project, args.platform, args.nightly_tag, args.package) - i = 1 - for job in jobs: - print str(i) + ' ' + job - sys.stdout.flush() - i += 1 - 
- def get_tar(self, nightly_release, project, platform, nightly_tag, package, test_name, extension): - """TBD.""" - index = self.get_list(nightly_release, project, platform, nightly_tag, package).index(test_name) - # Grid counts from 1 - index += 1 - - container = 'user.tcuhadar.atlas.art.' + nightly_release + '.' + project + '.' + platform + '.' + nightly_tag + '.*.' + package + extension - print container - - out = check(run_command("rucio list-dids " + container + " --filter type=container | grep " + nightly_tag + " | sort -r | cut -d ' ' -f 2 | head -n 1")) - print out - - out = check(run_command("rucio list-files --csv " + out + " | grep " + "{0:0>6}".format(index))) - print out - - tar_name = out.split(',')[0] - out = check(run_command("rucio download " + tar_name)) - print out - - return tarfile.open(tar_name.replace(':', '/', 1)) - - def log(self): - """TBD.""" - parser = argparse.ArgumentParser( - description='Show the log of a job') - parser.add_argument('nightly_release', help=Art.NIGHTLY_RELEASE_HELP) - parser.add_argument('project', help=Art.PROJECT_HELP) - parser.add_argument('platform', help=Art.PLATFORM_HELP) - parser.add_argument('nightly_tag', help=Art.NIGHTLY_TAG_HELP) - parser.add_argument('package', help=Art.PACKAGE_HELP) - parser.add_argument('test_name', help='name of the test') - args = parser.parse_args(sys.argv[2:]) - print 'Running art log' - sys.stdout.flush() + # Add other final states here + print 'Status: ' + status + sys.stdout.flush() - tar = self.get_tar(args.nightly_release, args.project, args.platform, args.nightly_tag, args.package, args.test_name, '.log') - - for name in tar.getnames(): - if 'athena_stdout.txt' in name: - f = tar.extractfile(name) - content = f.read() - print content - break - tar.close() - - def output(self): - """TBD.""" - parser = argparse.ArgumentParser( - description='Get the output of a job') - parser.add_argument('nightly_release', help=Art.NIGHTLY_RELEASE_HELP) - parser.add_argument('project', 
help=Art.PROJECT_HELP) - parser.add_argument('platform', help=Art.PLATFORM_HELP) - parser.add_argument('nightly_tag', help=Art.NIGHTLY_TAG_HELP) - parser.add_argument('package', help=Art.PACKAGE_HELP) - parser.add_argument('test_name', help='name of the test') - parser.add_argument('file_name', help='name of the output file') - args = parser.parse_args(sys.argv[2:]) - print 'Running art output' - sys.stdout.flush() + if status in ['done']: + exit(0) - tar = self.get_tar(args.nightly_release, args.project, args.platform, args.nightly_tag, args.package, args.test_name, '_EXT0') - - for member in tar.getmembers(): - if args.file_name in member.name: - tar.extractall(path='.', members=[member]) - break - tar.close() - - def compare(self): - """TBD.""" - parser = argparse.ArgumentParser( - description='Compare the output of a job') - parser.add_argument('nightly_release', help=Art.NIGHTLY_RELEASE_HELP) - parser.add_argument('project', help=Art.PROJECT_HELP) - parser.add_argument('platform', help=Art.PLATFORM_HELP) - parser.add_argument('nightly_tag', help=Art.NIGHTLY_TAG_HELP) - parser.add_argument('package', help=Art.PACKAGE_HELP) - parser.add_argument('test_name', help='name of the test') - parser.add_argument('days', help='number of days back for reference file') - parser.add_argument('file_names', nargs='+', help='name of the output file') - args = parser.parse_args(sys.argv[2:]) - print 'Running art compare' - sys.stdout.flush() + if status in ['finished', 'failed']: + exit(1) + + if status in ['broken', 'aborted']: + exit(2) - previous_nightly_tag = self.get_previous_nightly_tag(args.nightly_release, args.nightly_tag, args.project, int(args.days)) - - ref_dir = os.path.join('.', 'ref-' + previous_nightly_tag) - mkdir_p(ref_dir) - - tar = self.get_tar(args.nightly_release, args.project, args.platform, previous_nightly_tag, args.package, args.test_name, '_EXT0') - for member in tar.getmembers(): - if member.name in args.file_names: - tar.extractall(path=ref_dir, 
members=[member]) - tar.close() - - for file_name in args.file_names: - print "art-compare: " + previous_nightly_tag + " " + file_name - ref_file = os.path.join(ref_dir, file_name) - - out = check(run_command("acmd.py diff-root " + file_name + " " + ref_file + " --error-mode resilient --ignore-leaves RecoTimingObj_p1_HITStoRDO_timings RecoTimingObj_p1_RAWtoESD_mems RecoTimingObj_p1_RAWtoESD_timings RAWtoESD_mems RAWtoESD_timings ESDtoAOD_mems ESDtoAOD_timings HITStoRDO_timings RAWtoALL_mems RAWtoALL_timings RecoTimingObj_p1_RAWtoALL_mems RecoTimingObj_p1_RAWtoALL_timings --entries 10")) - print out - sys.stdout.flush() - - def get_previous_nightly_tag(self, nightly_release, nightly_tag, project, days): - """TBD. 21:00 is cutoff time.""" - directory = os.path.join(Art.SCRIPT_DIRECTORY, nightly_release) - tags = os.listdir(directory) - tags.sort(reverse=True) - tags = [x for x in tags if re.match(r'\d{4}-\d{2}-\d{2}T\d{2}\d{2}', x)] - # print tags - found = False - for tag in tags: - if tag == nightly_tag: - found = True - elif found: - # check this is within days... 
(cutoff is 21:00, just move by 3 hours to get full days) - fmt = '%Y-%m-%dT%H%M' - offset = datetime.timedelta(hours=3) - nightly_tag_dt = datetime.datetime.strptime(nightly_tag, fmt) + offset - from_dt = nightly_tag_dt.replace(hour=0, minute=0, second=0, microsecond=0) - datetime.timedelta(days=days) - to_dt = from_dt + datetime.timedelta(days=1) - tag_dt = datetime.datetime.strptime(tag, fmt) + offset - if from_dt <= tag_dt and tag_dt < to_dt and os.path.isdir(os.path.join(directory, tag, project)): - return tag - return None - - def get_list(self, nightly_release, project, platform, nightly_tag, package): - """TBD.""" - test_directories = self.get_test_directories(nightly_release, project, platform, nightly_tag) - test_dir = test_directories[package] - return self.get_files(test_dir, 'long') + # please re-call later + exit(-1) if __name__ == '__main__': - Art() + dispatch(__doc__, version=os.path.splitext(os.path.basename(__file__))[0] + ' ' + __version__)