diff --git a/AtlasTest/TestTools/share/post.sh b/AtlasTest/TestTools/share/post.sh
index 8bb3819c6011c305d4131f86596449ae284af24b..c3576034da789e2dcfb2f1ca64e72d3d626477c4 100755
--- a/AtlasTest/TestTools/share/post.sh
+++ b/AtlasTest/TestTools/share/post.sh
@@ -132,7 +132,6 @@ PP="$PP"'|//GP: '
 #ignore which malloc we are using
 PP="$PP"'|^Preloading tcmalloc'
 PP="$PP"'|^WARNING: TCMALLOCDIR not defined'
-PP="$PP"'|^Py:AthFile .*shutting down athfile-server'
 PP="$PP"'|^HistogramPersis... INFO *.CnvServices.:'
 PP="$PP"'|^HistogramPersis.*Histograms saving not required.'
 PP="$PP"'|^StatusCodeSvc'
diff --git a/Control/AthenaPython/python/FilePeekerLib.py b/Control/AthenaPython/python/FilePeekerLib.py
index a165ead9e083bb1f81a77ca31fd33246b80452cd..9d51d8ccda8c7a9a65e010e71f9b6e14374edb06 100644
--- a/Control/AthenaPython/python/FilePeekerLib.py
+++ b/Control/AthenaPython/python/FilePeekerLib.py
@@ -32,15 +32,9 @@ def toiter(beg,end):
     return
 
 def _create_file_infos():
-    """simple helper function to create consistent dicts for the
-    fileinfos attribute of AthFile
+    """simple helper function to create consistent dicts for in-file metadata
     """
     d = {
-        #'file_md5sum': None, # provided by AthFile.impl
-        #'file_name': None, # ditto
-        #'file_type': None, # ditto
-        #'file_guid': None, # ditto
-
         'nentries' : 0, # to handle empty files
         'run_number': [],
         'run_type': ['N/A'],
diff --git a/Control/AthenaPython/share/athfile_peeker.py b/Control/AthenaPython/share/athfile_peeker.py
deleted file mode 100644
index 7a6771d9a1fd74964240eb1a5ace9a9cfb8d7dbc..0000000000000000000000000000000000000000
--- a/Control/AthenaPython/share/athfile_peeker.py
+++ /dev/null
@@ -1,37 +0,0 @@
-## @file: AthenaPython/athfile_peeker.py
-## @purpose: simple joboptions to inspect a POOL file's content
-## @date May 2009
-## @author Sebastien Binet <binet@cern.ch>
-
-__version__ = "$Revision$"
-__author__ = "Sebastien Binet <binet@cern.ch>"
-
-## imports
-import os
-
-## usual Athena POOL drudegery
-import AthenaPoolCnvSvc.ReadAthenaPool
-from AthenaCommon.AppMgr import ServiceMgr as svcMgr
-FNAME = vars().get('FNAME', 'esd.pool')
-if isinstance(FNAME, str):
-    FNAMES = [FNAME]
-elif isinstance(FNAME, list):
-    FNAMES = FNAME[:]
-svcMgr.EventSelector.InputCollections = FNAMES
-
-## configuring top-sequence
-from AthenaCommon.AlgSequence import AlgSequence
-job = AlgSequence()
-from AthenaPython.FilePeekerLib import FilePeeker
-import AthenaCommon.Constants as Lvl
-job += FilePeeker('peeker', OutputLevel=Lvl.INFO)
-job.peeker.outfname = 'peeker.%08i.pkl' % os.getpid()
-job.peeker.infname = FNAMES[0]
-
-## metadata + taginfo configuration
-import IOVDbSvc.IOVDb
-
-## evt-max
-theApp.EvtMax = vars().get('EVTMAX', 1)
-
-svcMgr.MessageSvc.OutputLevel = Lvl.ERROR
diff --git a/Generators/FlowAfterburner/share/clean.sh b/Generators/FlowAfterburner/share/clean.sh
index a9872759a7fb9317b501bea7999dde331fe3682a..d689c9cec9923d443cc45c598c59052c3577e18a 100644
--- a/Generators/FlowAfterburner/share/clean.sh
+++ b/Generators/FlowAfterburner/share/clean.sh
@@ -1,2 +1 @@
-#rm *.root *.txt *.xml* openedfiles* AtRndmGenSvc.out athfile-cache.ascii
-rm *.txt *.xml* openedfiles* AtRndmGenSvc.out athfile-cache.ascii
+rm *.txt *.xml* openedfiles* AtRndmGenSvc.out
diff --git a/Generators/FlowAfterburner/share/grid_submit/clean.sh b/Generators/FlowAfterburner/share/grid_submit/clean.sh
index a9872759a7fb9317b501bea7999dde331fe3682a..d689c9cec9923d443cc45c598c59052c3577e18a 100644
--- a/Generators/FlowAfterburner/share/grid_submit/clean.sh
+++ b/Generators/FlowAfterburner/share/grid_submit/clean.sh
@@ -1,2 +1 @@
-#rm *.root *.txt *.xml* openedfiles* AtRndmGenSvc.out athfile-cache.ascii
-rm *.txt *.xml* openedfiles* AtRndmGenSvc.out athfile-cache.ascii
+rm *.txt *.xml* openedfiles* AtRndmGenSvc.out
diff --git a/Generators/Sherpa_i/python/sherpaTarCreator/jobDefinitions.py b/Generators/Sherpa_i/python/sherpaTarCreator/jobDefinitions.py
index 065c26ad9486067c13ab0cd70396d234d98802d3..b8a5a35cb742e70da58e74845735fbd93d7892b5 100644
--- a/Generators/Sherpa_i/python/sherpaTarCreator/jobDefinitions.py
+++ b/Generators/Sherpa_i/python/sherpaTarCreator/jobDefinitions.py
@@ -98,7 +98,7 @@ def mkCreateLibsJob(options, prevJob):
         job.cmds += [" exit 1"]
     job.cmds += ["fi"]
 
-    job.cmds += ["rm -rf ${outputEVNTFile} _joproxy* AtRndmGenSvc.out AthenaSummary_Generate.txt Generate_messageSvc_jobOptions.py Generate_runathena PoolFileCatalog.xml PoolFileCatalog.xml.BAK TransformTimer_Generate.pickle athfile-cache.ascii.gz config.pickle dmesg_trf.txt hostnamelookup.tmp inputDictionary.pickle jobInfo.xml jobInfo_Generate.xml jobReport* last.Generate last.runargs.gpickle runargs.Generate.gpickle runargs.Generate.py metadata_Generate.xml metadata.xml Sherpa_References.tex ntuple.pmon.stream setupevprod.sh share ntuple.pmon.gz testHepMC.root events.py Bdecays0.dat Bs2Jpsiphi.DEC DECAY.DEC G4particle_acceptlist.txt PDGTABLE.MeV pdt.table runargs.generate.py runwrapper.generate.sh eventLoopHeartBeat.txt susyParticlePdgid.txt TestHepMC.root log.generate mem.full.generate mem.summary.generate.json env.txt Run.dat Sherpa.yaml"]
+    job.cmds += ["rm -rf ${outputEVNTFile} _joproxy* AtRndmGenSvc.out AthenaSummary_Generate.txt Generate_messageSvc_jobOptions.py Generate_runathena PoolFileCatalog.xml PoolFileCatalog.xml.BAK TransformTimer_Generate.pickle config.pickle dmesg_trf.txt hostnamelookup.tmp inputDictionary.pickle jobInfo.xml jobInfo_Generate.xml jobReport* last.Generate last.runargs.gpickle runargs.Generate.gpickle runargs.Generate.py metadata_Generate.xml metadata.xml Sherpa_References.tex ntuple.pmon.stream setupevprod.sh share ntuple.pmon.gz testHepMC.root events.py Bdecays0.dat Bs2Jpsiphi.DEC DECAY.DEC G4particle_acceptlist.txt PDGTABLE.MeV pdt.table runargs.generate.py runwrapper.generate.sh eventLoopHeartBeat.txt susyParticlePdgid.txt TestHepMC.root log.generate mem.full.generate mem.summary.generate.json env.txt Run.dat Sherpa.yaml"]
 
     job.write()
     job.submit(dryRun=options.dryRun)
diff --git a/PhysicsAnalysis/SUSYPhys/LongLivedParticleDPDMaker/run/readGridFiles.py b/PhysicsAnalysis/SUSYPhys/LongLivedParticleDPDMaker/run/readGridFiles.py
index 1811fd0e0e3f08d565e07553294b52e669ff4593..616e216c17fd1a7dfc67fac523c8d92b13997864 100644
--- a/PhysicsAnalysis/SUSYPhys/LongLivedParticleDPDMaker/run/readGridFiles.py
+++ b/PhysicsAnalysis/SUSYPhys/LongLivedParticleDPDMaker/run/readGridFiles.py
@@ -37,9 +37,6 @@ for x in tarballs:
         if "log.RAWtoALL" in f:
             openFile = open(x+'/'+f)
             for line in openFile:
-                # find input data file for lumiblock info
-                if re.match('.*Py:AthFile\s*INFO opening \[\S*\]...',line):
-                    data_file = line[line.index("[")+1:line.rindex("]")]
                 # find RPVLL filter information
                 if re.match('.*RPVLL.*Events',line) or re.match('.*BSESOutputSvcStreamDRAW_RPVLL.*events',line):
                     # write input data file + RPVLL info to output file
diff --git a/Tools/PyJobTransforms/test/test_trfArgClassesATLAS.py b/Tools/PyJobTransforms/test/test_trfArgClassesATLAS.py
index ee929f0da38fe09b4d2f36e91d150b5fce091638..f5e9db3573b09aafaad159b7ba54ece76e23d5ed 100755
---
a/Tools/PyJobTransforms/test/test_trfArgClassesATLAS.py +++ b/Tools/PyJobTransforms/test/test_trfArgClassesATLAS.py @@ -35,11 +35,7 @@ class argFileEOSTests(unittest.TestCase): class argPOOLFiles(unittest.TestCase): def tearDown(self): - for f in 'athfile-cache.ascii.gz', 'athfile-infos.ascii': - try: - os.unlink(f) - except OSError: - pass + return def test_argPOOLFileMetadata_ESD(self): try: @@ -73,11 +69,7 @@ class argPOOLFiles(unittest.TestCase): class argBSFiles(unittest.TestCase): def tearDown(self): - for f in 'athfile-cache.ascii.gz', 'athfile-infos.ascii': - try: - os.unlink(f) - except OSError: - pass + return def test_argBSFileMetadata(self): try: diff --git a/Tools/PyUtils/CMakeLists.txt b/Tools/PyUtils/CMakeLists.txt index 5d518ecde1188a3df5f71df8aa538168e25c478d..cc99a7256fd4402015e29805824047063e87bc9f 100644 --- a/Tools/PyUtils/CMakeLists.txt +++ b/Tools/PyUtils/CMakeLists.txt @@ -33,7 +33,7 @@ else() find_package( six ) # Install files from the package: - atlas_install_python_modules( python/*.py python/scripts python/AthFile POST_BUILD_CMD ${ATLAS_FLAKE8} ) + atlas_install_python_modules( python/*.py python/scripts POST_BUILD_CMD ${ATLAS_FLAKE8} ) atlas_install_scripts( bin/acmd.py bin/checkFile.py bin/checkPlugins.py bin/checkSG.py bin/checkMetaSG.py bin/checkTP.py bin/checkxAOD.py bin/diff-jobo-cfg.py bin/diffConfigs.py diff --git a/Tools/PyUtils/python/AthFile/__init__.py b/Tools/PyUtils/python/AthFile/__init__.py deleted file mode 100644 index 0be996f426b2635068c40e683de1880eba24e16b..0000000000000000000000000000000000000000 --- a/Tools/PyUtils/python/AthFile/__init__.py +++ /dev/null @@ -1,150 +0,0 @@ -# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration - -# @file PyUtils/python/AthFile/__init__.py -# @purpose a simple abstraction of a file to retrieve informations out of it -# @author Sebastien Binet <binet@cern.ch> -# @date October 2008 - -__doc__ = "a simple abstraction of a file to retrieve informations out of it" -__author__ = "Sebastien Binet <binet@cern.ch>" - -__all__ = [] -__pseudo_all__ = [ - 'AthFile', - 'ftype', - 'fopen', - 'exists', - 'server', - ] - -import functools -import PyUtils.Decorators as _decos -from . 
import impl as _impl -AthFile = _impl.AthFile - -def _update_cache(fct): - @functools.wraps(fct) - def wrapper(*args): - res = fct(*args) - import PyUtils.AthFile as af - if af.server._do_pers_cache: - try: - af.server.load_cache() - except Exception: - pass - return res - return wrapper - -### classes ------------------------------------------------------------------- -import types -class ModuleFacade(types.ModuleType): - """a helper class to manage the instantiation of the ``AthFileMgr`` and - ``AthFileServer`` objects and allow attribute-like access to methods - (stolen from PyRoot) - """ - def __init__( self, module ): - types.ModuleType.__init__(self, module.__name__) - self.__dict__['module'] = module - self.__dict__[ '__doc__' ] = module.__doc__ - self.__dict__[ '__name__' ] = module.__name__ - self.__dict__[ '__file__' ] = module.__file__ - - self.__dict__['_impl'] = _impl - - self.__dict__['server'] = _impl.g_server - - import atexit - atexit.register(self.shutdown) - del atexit - - def __getattr__(self, k): - if k in self.__dict__: - return self.__dict__.get(k) - if k.startswith('__'): - return types.ModuleType.__getattribute__(self, k) - return object.__getattribute__(self, k) - - def restart_server(self): - return - - def shutdown(self): - #self.server._cleanup_pyroot() - return - - @property - def msg(self): - return self.server.msg() - - @property - def cache(self): - return self.server.cache() - - @property - def save_cache(self): - return self.server.save_cache - - @property - def load_cache(self): - return self.server.load_cache - - @property - def flush_cache(self): - return self.server.flush_cache - - @_decos.forking - def ftype(self, fname): - return self.server.ftype(fname) - - @_decos.forking - def fname(self, fname): - return self.server.fname(fname) - - @_decos.forking - def exists(self, fname): - return self.server.exists(fname) - - @_update_cache # also decorate with _update_cache to pick-up the changes - @_decos.forking # from the forked athfile server... - def fopen(self, fnames, evtmax=1): - """ - helper function to create @c AthFile instances - @param `fnames` name of the file (or a list of names of files) to inspect - @param `nentries` number of entries to process (for each file) - - Note that if `fnames` is a list of filenames, then `fopen` returns a list - of @c AthFile instances. - """ - if isinstance(fnames, (list, tuple)): - infos = [] - for fname in fnames: - info = self.server.fopen(fname, evtmax) - infos.append(info) - pass - return infos - return self.server.fopen(fnames, evtmax) - - @_update_cache # also decorate with _update_cache to pick-up the changes - @_decos.forking # from the forked athfile server... - def pfopen(self, fnames, evtmax=1): - """ - helper function to create @c AthFile instances - @param `fnames` name of the file (or a list of names of files) to inspect - @param `nentries` number of entries to process (for each file) - - Note that if `fnames` is a list of filenames, then `fopen` returns a list - of @c AthFile instances. - - This is a parallel (multi-threaded) version of ``fopen``. 
- """ - return self.server.pfopen(fnames, evtmax) - - ## def __del__(self): - ## self._mgr.shutdown() - ## return super(ModuleFacade, self).__del__() - - pass # class ModuleFacade - -### exec at import ------------------------------------------------------------ -import sys -sys.modules[ __name__ ] = ModuleFacade( sys.modules[ __name__ ] ) -del ModuleFacade diff --git a/Tools/PyUtils/python/AthFile/impl.py b/Tools/PyUtils/python/AthFile/impl.py deleted file mode 100644 index e850cf6dee5622ed1b9579d429edea7da3636f70..0000000000000000000000000000000000000000 --- a/Tools/PyUtils/python/AthFile/impl.py +++ /dev/null @@ -1,1384 +0,0 @@ -# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration - -# @file PyUtils/python/AthFile/impl.py -# @purpose a simple abstraction of a file to retrieve informations out of it -# @author Sebastien Binet <binet@cern.ch> -# @date November 2009 - -from __future__ import with_statement, print_function - -__author__ = "Sebastien Binet" -__doc__ = "implementation of AthFile-server behind a set of proxies to isolate environments" - -import errno -import os - -import PyUtils.Helpers as H -from .timerdecorator import timelimit, TimeoutError - -# see bug #95942 for the excruciating details -try: - from AthenaCommon.Include import excludeTracePattern - excludeTracePattern.append("*cache.ascii.gz") - del excludeTracePattern -except Exception: - pass - -### globals ------------------------------------------------------------------- -DEFAULT_AF_RUN = os.environ.get('DEFAULT_AF_RUN', False) -'''Revert to old file peeking via Athena sub-process if True.''' - -DEFAULT_AF_CACHE_FNAME = os.environ.get('DEFAULT_AF_CACHE_FNAME', - 'athfile-cache.ascii.gz') - -DEFAULT_AF_TIMEOUT = 20 -'''Default timeout for commands to be completed.''' - -### utils ---------------------------------------------------------------------- - -def _get_real_ext(fname): - """little helper to get the 'real' extension of a filename, handling 'fake' extensions (e.g. foo.ascii.gz -> .ascii)""" - se = os.path.splitext - f,ext = se(fname) - if ext in ('.gz',): - _,ext = se(f) - return ext - -def _my_open(name, mode='r', bufsiz=-1): - """helper method to handle gzipped or not files. - if `name` ends with '.gz' the correct gzip.open function will be called. - """ - f,ext = os.path.splitext(name) - if ext in ('.gz',): - import gzip - def gzip_exit(self, type, value, traceback): - return self.close() - def gzip_enter(self): - return self - gzip.GzipFile.__exit__ = gzip_exit - gzip.GzipFile.__enter__= gzip_enter - return gzip.open(name, mode + 't') - else: - return open(name, mode, bufsiz) - -def _find_file(filename, pathlist, access): - """Find <filename> with rights <access> through <pathlist>.""" - # special case for those filenames that already contain a path - if os.path.dirname(filename): - if os.access(filename, access): - return filename - - # test the file name in all possible paths until first found - for path in pathlist: - f = os.path.join(path, filename) - if os.access(f, access): - return f - - # no such accessible file avalailable - return None - -def _setup_ssl(msg, root): - x509_proxy = os.environ.get('X509_USER_PROXY', '') - if x509_proxy: - # setup proper credentials - root.TSSLSocket.SetUpSSL( - x509_proxy, - "/etc/grid-security/certificates", - x509_proxy, - x509_proxy) - else: - msg.warning("protocol https is requested but no X509_USER_PROXY was found! 
(opening the file might fail.)") - pass - return - -def _create_file_infos(): - """simple helper function to create consistent dicts for the - fileinfos attribute of AthFile - """ - d = { - 'file_md5sum': None, - 'file_name': None, - 'file_size': -1, - 'file_type': None, - 'file_guid': None, - 'nentries' : 0, # to handle empty files - 'run_number': [], - 'run_type': [], - 'evt_type': [], - 'evt_number': [], - 'lumi_block': [], - 'beam_energy': [], - 'beam_type': [], - 'stream_tags': [], - 'mc_channel_number': [], - 'metadata_items': None, - 'eventdata_items': None, - 'stream_names': None, - 'geometry': None, - 'conditions_tag': None, - 'det_descr_tags': None, - 'metadata': None, - 'tag_info': None, - } - return d - -def ami_dsinfos(dsname): - """a helper function to query AMI for informations about a dataset name. - `dsname` can be either a logical dataset name (a bag of files) or a - logical filename. - """ - import PyUtils.AmiLib as A - import PyUtils.xmldict as _x - import xml.etree.cElementTree as ET - - # keep order of tokens ! - for token in ('ami://', '//', '/'): - if dsname.startswith(token): - dsname = dsname[len(token):] - pass - pass - - ami = A.Client() - try: - res = ami.exec_cmd(cmd="GetDatasetInfo", logicalFileName=dsname) - except A.PyAmi.AMI_Error: - # maybe a logical dataset name then ? - res = ami.exec_cmd(cmd="GetDatasetInfo", logicalDatasetName=dsname) - res = _x.xml2dict(ET.fromstring(res.transform('xml'))) - data = res['AMIMessage']['Result'] - - # get only interesting informations... - rowset = data['rowset'] - if isinstance(rowset, list): - fields = rowset[-1]['row']['field'] - else: - fields = rowset['row']['field'] - - # translate into athfile-infos format - af_infos = _create_file_infos() - for i in fields: - if not ('name' in i and '_text' in i): - continue - k = i['name'] - v = i['_text'] - - if v.lower() == 'none': - v = None - - if k == 'logicalDatasetName': - af_infos['file_name'] = 'ami://'+v - - elif k == 'totalEvents': - af_infos['nentries'] = int(v) - - elif k == 'runNumber': - af_infos['run_number'] = [int(v)] - - elif k == 'geometryVersion': - af_infos['geometry'] = v - - elif k == 'conditionsTag': - af_infos['conditions_tag'] = v - - elif k == 'beamType': - af_infos['beam_type'] = [v] - - elif k == 'dataType': - af_infos['file_type'] = 'bs' if v.lower() == 'raw' else 'pool' - stream_name = 'Stream' + v.upper() - af_infos['stream_names'] = [stream_name] - - elif k == 'streamName': - stream_type,stream_name = v.split('_') - af_infos['stream_tags'] = [ - {'obeys_lbk': None, - 'stream_type': stream_type, - 'stream_name': stream_name} - ] - # FIXME !! - af_infos['file_guid'] = af_infos['file_name'] - # FIXME !! - if not af_infos['run_number']: - dsname = af_infos['file_name'] - idx = [i for i,j in enumerate(fields) - if j['name']=='contained_dataset'] - if len(idx)==1: - #try to extract run-number from the name of the first dataset - dsname = fields[idx[0]]['_text'].split(';')[0] - try: - # an AMI dsname is of the form: - # (project_name).(run_nbr).[...] - run_number = dsname.split('.')[1] - af_infos['run_number'] = [int(run_number)] - except ValueError: - pass - - else: - try: - # an AMI dsname is of the form: - # (project_name).(run_nbr).[...] 
- run_number = dsname.split('.')[1] - af_infos['run_number'] = [int(run_number)] - except ValueError: - pass - pass - - return af_infos - - -### classes ------------------------------------------------------------------- -class AthFile (object): - """A handle to an athena file (POOL,ROOT or ByteStream) - """ - __slots__ = ('fileinfos',) - - @staticmethod - def from_infos(infos): - o = AthFile() - o.fileinfos = _create_file_infos() # ensure basic layout - o.fileinfos.update(infos.copy()) - return o - - @staticmethod - def from_fname(fname): - import PyUtils.AthFile as af - return af.fopen(fname) - - @property - def name(self): - return self.fileinfos['file_name'] - - @property - def nentries(self): - return self.fileinfos['nentries'] - - @property - def infos(self): - return self.fileinfos - - @property - def run_number (self): - """return the list of unique run-numbers the @c AthFile contains""" - return list(set(self.infos['run_number'])) - # backward compatibility - run_numbers = run_number - - @property - def mc_channel_number (self): - """return the list of unique mc_channel-numbers the @c AthFile contains""" - return list(set(self.infos['mc_channel_number'])) - # ATEAM-168: requested for derivations - mc_channel_numbers = mc_channel_number - - @property - def evt_number (self): - """return the list of unique evt-numbers the @c AthFile contains""" - return list(set(self.infos['evt_number'])) - - @property - def lumi_block (self): - """return the list of unique lumi-block nbrs the @c AthFile contains - """ - return list(set(self.infos['lumi_block'])) - - @property - def run_type (self): - """return the list of unique run-types the @c AthFile contains""" - return list(set(self.infos['run_type'])) - - @property - def beam_type (self): - """return the list of unique beam-types the @c AthFile contains""" - return list(set(self.infos['beam_type'])) - - @property - def beam_energy (self): - """return the list of unique beam-energies the @c AthFile contains""" - return list(set(self.infos['beam_energy'])) - - pass # AthFile class - -class AthFileServer(object): - """the object serving AthFile requests - """ - - def __init__(self): - - import PyUtils.Logging as _L - self._msg = _L.logging.getLogger("AthFile") - self.set_msg_lvl(_L.logging.INFO) - - if os.environ.get('ATHFILE_DEBUG', '0') == '1': - import PyUtils.Logging as _L - self.set_msg_lvl(_L.logging.VERBOSE) - pass - - self.msg().debug('importing ROOT...') - import PyUtils.RootUtils as ru - self.pyroot = ru.import_root() - try: - ru._pythonize_tfile() - except Exception as err: - self.msg().warning('problem during TFile pythonization:\n%s', err) - - self.msg().debug('importing ROOT... 
[done]') - - # a cache of already processed requests - self._cache = {} - self._do_pers_cache = True - self.enable_pers_cache() - return - - # make the _peeker on-demand to get an up-to-date os.environ - @property - def _peeker(self): - return FilePeeker(self) - - def _cleanup_pyroot(self): - import PyUtils.RootUtils as ru - root = ru.import_root() - tfiles = root.gROOT.GetListOfFiles()[:] - for i,f in enumerate(tfiles): - try: - if f: - f.Close() - del f - except Exception as err: - self._msg.info('could not close a TFile:\n%s', err) - pass - tfiles[:] = [] - - def msg(self): - return self._msg - - def set_msg_lvl(self, lvl): - self.msg().setLevel(lvl) - - def _md5_for_file(self, f, block_size=2**20, do_fast_md5=True): - """helper function to calculate a MD5 checksum - ``f`` can be filename, an open python file or an open TFile - """ - import hashlib - md5 = hashlib.md5() - do_close = False - if isinstance(f, str): - protocol,fname = self.fname(f) - f = self._root_open(fname) - do_close = True - if f is None or not f: - raise IOError(errno.ENOENT, - "No such file or directory", - fname) - - assert hasattr(f, 'read'), \ - "'f' must be a file-like object. (f=%r, type=%s)"%( - f,type(f), - ) - orig_pos = f.tell() - f.seek(0) - try: - while True: - data = f.read(block_size) - if not data: - break - md5.update(data) - if do_fast_md5: - break - finally: - f.seek(orig_pos) - if do_close: - f.Close() - return md5.hexdigest() - - def _root_open(self, fname, raw=True): - root = self.pyroot - import re - with H.ShutUp(filters=[ - re.compile('TClass::TClass:0: RuntimeWarning: no dictionary for class.*') ]): - root.gSystem.Load('libRootCollection') - root_open = root.TFile.Open - - # we need to get back the protocol b/c of the special - # case of secure-http which needs to open TFiles as TWebFiles... - protocol, _ = self.fname(fname) - if protocol == 'https': - _setup_ssl(self.msg(), root) - root_open = root.TWebFile.Open - if raw: - if protocol == 'https' and '?' in fname: - # append filetype to existing parameters - f = root_open(fname+'&filetype=raw', 'READ') - else: - f = root_open(fname+'?filetype=raw', 'READ') - else: - f = root_open(fname, 'READ') - if f is None or not f: - raise IOError(errno.ENOENT, - 'No such file or directory',fname) - return f - return - - def pfopen(self, fnames, evtmax=1): - if isinstance(fnames, (list, tuple)): - self.msg().debug("using mp.pool... 
(files=%s)", len(fnames)) - fct = _do_fopen - do_pers_cache = self._do_pers_cache - self.disable_pers_cache() - import multiprocessing as mp - from multiprocessing.pool import ThreadPool - # Never run more than 4 parallel instances - pool_sz = min(mp.cpu_count(), 4) - pool = ThreadPool(pool_sz) - - infos = None - try: - setattr(self, '_evtmax', evtmax) - infos = pool.map(fct, fnames) - finally: - delattr(self, '_evtmax') - if do_pers_cache: - self.enable_pers_cache() - pass - pass - # collect back infos into ourself - for f in infos: - fname = f.infos['file_name'] - self._cache[fname] = f - pass - # synchronize once - try: - self._sync_pers_cache() - except Exception as err: - self.msg().info('could not synchronize the persistent cache:\n%s', err) - pass - - return infos - return self._fopen_file(fnames, evtmax) - - def fopen(self, fnames, evtmax=1): - if isinstance(fnames, (list, tuple)): - infos = [] - for fname in fnames: - info = self._fopen_file(fname, evtmax) - infos.append(info) - return infos - return self._fopen_file(fnames, evtmax) - - def _fopen_stateless(self, fname, evtmax): - msg = self.msg() - cache = dict(self._cache) - fids = [] - for k,v in cache.items(): - v = v.infos - fid = v.get('file_md5sum', v['file_guid']) - if fid: - fids.append((fid,k)) - pass - for v in fids: - fid, k = v - cache[fid] = k - pass - - protocol, fname = self.fname(fname) - if protocol in ('fid', 'lfn'): - protocol, fname = self.fname(fname) - - use_cache = False - sync_cache = True - if protocol in ('', 'file') : - fid = self.md5sum(fname) - fid_in_cache = fid in cache - # also check the cached name in case 2 identical files - # are named differently or under different paths - fid_match_fname = cache.get(fid,None) == fname - if fid_in_cache and fid_match_fname: - use_cache = True - sync_cache = False - msg.debug('fetched [%s] from cache (md5sum is a match)', fname) - f = cache[cache[fid]] - elif protocol in ('ami',): - use_cache = True - sync_cache = True # yes, we want to update the pers. cache - # take data from AMI - infos = ami_dsinfos(fname[len('ami://'):]) - msg.debug('fetched [%s] from cache', fname) - fid = infos.get('file_md5sum', infos['file_guid']) - cache[fid] = fname - f = AthFile.from_infos(infos) - cache[fname] = f - # hysteresis... - cache[infos['file_name']] = f - else: - # use the cache indexed by name rather than md5sums to - # skip one TFile.Open... - # Note: we assume files on mass storage systems do not - # change very often. - if fname in self._cache: - use_cache = True - sync_cache = False - msg.debug('fetched [%s] from cache', fname) - f = cache[fname] - - if not use_cache: - msg.info("opening [%s]...", fname) - infos = self._peeker(fname, evtmax) - f = AthFile.from_infos(infos) - cache[fname] = f - # hysteresis... - cache[infos['file_name']] = f - sync_cache = True - pass - - # remove the fids we added... 
- for v in fids: - fid, k = v - # in case there were duplicate fids - try: del cache[fid] - except KeyError: pass - pass - - return (fname, cache, sync_cache) - - def _fopen_file(self, fname, evtmax): - msg = self.msg() - fname, cache, sync_cache = self._fopen_stateless(fname, evtmax) - if sync_cache: - try: - self._cache = cache - self._sync_pers_cache() - except Exception as err: - msg.info('could not synchronize the persistent cache:\n%s', err) - pass - return self._cache[fname] - - def md5sum(self, fname): - """return the md5 checksum of file ``fname`` - """ - if isinstance(fname, str): - protocol,fname = self.fname(fname) - - md5 = self._md5_for_file(fname) - return md5 - - @timelimit(timeout=DEFAULT_AF_TIMEOUT) - def fname(self, fname): - """take a file name, return the pair (protocol, 'real' file name) - """ - fname = os.path.expanduser(os.path.expandvars(fname)) - - msg = self.msg() - - def _normalize_uri(uri): - if uri.startswith('/'): - return 'file:'+uri - return uri - - from urllib.parse import urlsplit - url = urlsplit(_normalize_uri(fname)) - protocol = url.scheme - def _normalize(fname): - from posixpath import normpath - fname = normpath(fname) - if fname.startswith('//'): fname = fname[1:] - return fname - - if protocol in ('', 'file', 'pfn'): - protocol = '' - fname = _normalize(url.path) - - ## hack for '/castor/cern.ch/...' paths - if fname.startswith('/castor/'): - protocol = 'rfio' - fname = protocol + ':' + fname - - elif protocol in ('rfio', 'castor'): - protocol = 'rfio' - fname = _normalize(url.path) - fname = protocol+':'+fname - - elif protocol in ('root','dcap', 'dcache', 'http', 'https', 'dav', 'davs'): - pass - - elif protocol in ('gsidcap',): - protocol = 'gfal:gsidcap' - pass - - elif protocol in ('lfn','fid',): - # percolate through the PoolFileCatalog - from PyUtils.PoolFile import PoolFileCatalog as pfc - fname = pfc().pfn(protocol+':'+url.path) - pass - - elif protocol in ('ami',): - # !! keep order of tokens ! - for token in ('ami:', '//', '/'): - if fname.startswith(token): - fname = fname[len(token):] - fname = 'ami://' + fname - pass - - else: - msg.warning('unknown protocol [%s]. we\'ll just return our input', - protocol) - pass - - return (protocol, fname) - - def cache(self): - return self._cache - - def enable_pers_cache(self): - """configure the file server to write out the persistent cache - of inspected files. - """ - # first disable previous cache, if any, to prevent hysteresis... - self.disable_pers_cache() - msg = self.msg() - self._do_pers_cache = True - - fname = DEFAULT_AF_CACHE_FNAME - if (fname and - os.path.exists(fname) and - os.access(fname, os.R_OK)): - msg.info('loading cache from [%s]...', fname) - try: - self.load_cache(fname) - msg.info('loading cache from [%s]... [done]', fname) - except TimeoutError: - msg.info('loading cache timed out!') - return - - def disable_pers_cache(self): - """configure the file server to NOT write out the persistent cache - of inspected files. - if the persistent cache wasn't enabled, this is a no-op. - """ - self._do_pers_cache = False - return - - def _sync_pers_cache(self): - if not self._do_pers_cache: - return - msg = self.msg() - fname = DEFAULT_AF_CACHE_FNAME - if not fname: - # protect against empty or invalid (None) cache file names - return - import uuid - pid = str(os.getpid())+'-'+str(uuid.uuid4()) - fname_,fname_ext = os.path.splitext(fname) - if fname_ext in ('.gz',): - fname_,fname_ext = os.path.splitext(fname_) - pid_fname = fname_ + '.' 
+ pid + fname_ext + ".gz" - else: - pid_fname = fname_ + '.' + pid + fname_ext - msg.debug('synch-ing cache to [%s]...', fname) - try: - msg.debug('writing to [%s]...', pid_fname) - self.save_cache(pid_fname) - if os.path.exists(pid_fname): - # should be atomic on most FS... - os.rename(pid_fname, fname) - else: - msg.warning("could not save to [%s]", pid_fname) - msg.debug('synch-ing cache to [%s]... [done]', fname) - except Exception as err: - msg.debug('synch-ing cache to [%s]... [failed]', fname) - msg.debug('reason:\n%s', err) - pass - return - - # dead-lock on self.msg (I think!)... - #@timelimit(timeout=DEFAULT_AF_TIMEOUT) - def load_cache(self, fname=DEFAULT_AF_CACHE_FNAME): - """load file informations from a cache file. - the back-end (JSON, ASCII, pickle, ...) is inferred from the - extension of the `fname` parameter. - defaults to py-ASCII. - """ - msg = self.msg() - - ext = _get_real_ext(os.path.basename(fname)) - if len(ext) == 0: - # illegal file... - msg.info('load_cache: invalid file [%s]', fname) - return - - ext = ext[1:] if ext[0]=='.' else ext - try: - loader = getattr(self, '_load_%s_cache'%ext) - except AttributeError: - msg.info('load_cache: could not find a suitable backend for ' - 'extension [.%s] => using [ascii]', ext) - loader = self._load_ascii_cache - - try: - search_path = os.environ.get('DATAPATH',os.getcwd()) - search_path = search_path.split(os.pathsep) - fname = _find_file(os.path.expanduser(os.path.expandvars(fname)), - search_path, - os.R_OK) or fname - except ImportError: - # not enough karma... tough luck! - pass - - # ensure one can read that file... - with open(fname, 'r'): - pass - - msg.debug('loading cache from [%s]...', fname) - cache = {} - try: - cache = loader(fname) - except Exception as err: - msg.info("problem loading cache from [%s]!", fname) - msg.info(repr(err)) - pass - - self._cache.update(cache) - msg.debug('loading cache from [%s]... [done]', fname) - - def save_cache(self, fname=DEFAULT_AF_CACHE_FNAME): - """save file informations into a cache file. - the back-end (JSON, ASCII, pickle, ...) is inferred from the - extension of the `fname` parameter. - falls back to py-ASCII. 
- """ - msg = self.msg() - if os.path.exists(fname): - os.rename(fname, fname+'.bak') - ext = _get_real_ext(fname) - ext = ext[1:] # drop the dot - try: - saver = getattr(self, '_save_%s_cache'%ext) - except AttributeError: - msg.info('save_cache: could not find a suitable backend for ' - 'extension [.%s] => using [ascii]', ext) - saver = self._save_ascii_cache - try: - saver(fname) - except IOError as err: - import errno - if err.errno != errno.EACCES: - raise - else: - msg.info('could not save cache in [%s]', fname) - except Exception as err: - msg.warning('could not save cache into [%s]:\n%s', fname, err) - return - - def _load_pkl_cache(self, fname): - """load file informations from pickle/shelve 'fname'""" - try: import cPickle as pickle - except ImportError: import pickle - import shelve - db = shelve.open(fname, protocol=pickle.HIGHEST_PROTOCOL) - return db['fileinfos_cache'].copy() - - def _save_pkl_cache(self, fname): - """save file informations into pickle/shelve 'fname'""" - try: import cPickle as pickle - except ImportError: import pickle - import shelve - db = shelve.open(fname, protocol=pickle.HIGHEST_PROTOCOL) - db['fileinfos_cache'] = self._cache.copy() - db.close() - return - - def _load_json_cache(self, fname): - """load file informations from a JSON file""" - import json - with _my_open(fname) as fd: - cache = json.load(fd) - return dict((k,AthFile.from_infos(v)) for k,v in cache) - - def _save_json_cache(self, fname): - """save file informations using JSON""" - import json - cache = self._cache - with _my_open(fname, 'w') as fd: - json.dump([(k, cache[k].fileinfos) for k in cache], - fd, - indent=2, - sort_keys=True) - return - - def _load_ascii_cache(self, fname): - """load file informations from a pretty-printed python code""" - dct = {} - ast = compile(_my_open(fname).read(), fname, 'exec') - exec (ast, dct,dct) - del ast - try: - cache = dct['fileinfos'] - except Exception: - raise - finally: - del dct - return dict((k,AthFile.from_infos(v)) for k,v in cache) - - def _save_ascii_cache(self, fname): - """save file informations into pretty-printed python code""" - from pprint import pprint - cache = self._cache - with _my_open(fname, 'w') as fd: - print ("# this is -*- python -*-", file=fd) - print ("# this file has been automatically generated.", file=fd) - print ("fileinfos = [", file=fd) - fd.flush() - for k in cache: - print ("\n## new-entry", file=fd) - pprint((k, cache[k].fileinfos), - stream=fd, - width=120) - fd.flush() - print (", ", file=fd) - print ("]", file=fd) - print ("### EOF ###", file=fd) - fd.flush() - return - - def _load_db_cache(self, fname): - """load file informations from a sqlite file""" - import PyUtils.dbsqlite as dbsqlite - cache = dbsqlite.open(fname) - d = {} - for k,v in cache.items(): - d[k] = AthFile.from_infos(v) - return d - - def _save_db_cache(self, fname): - """save file informations using sqlite""" - import PyUtils.dbsqlite as dbsqlite - db = dbsqlite.open(fname,flags='w') - cache = self._cache - for k in cache: - db[k] = cache[k].fileinfos - db.close() - return - - def flush_cache(self): - self._cache = {} - return - - @timelimit(timeout=DEFAULT_AF_TIMEOUT) - def ftype(self, fname): - """ - returns the type of a file ('pool' or 'bs') together with its - canonical name. `fname` can be a string or a `ROOT.TFile` handle. 
- - example: - >>> import PyUtils.AthFile as af - >>> af.ftype ('castor:/castor/cern.ch/foo.pool') - ('pool', 'rfio:/castor/cern.ch/foo.pool') - - >>> af.ftype ('LFN:ttbar.pool') - ('pool', '/afs/cern.ch/somewhere/ttbar.pool') - - >>> af.ftype ('rfio:/castor/cern.ch/bs.data') - ('bs', 'rfio:/castor/cern.ch/bs.data') - - >>> af.ftype ('rfio:/castor/cern.ch/bs.data') - ('bs', 'rfio:/castor/cern.ch/bs.data') - """ - - _is_root_file = None - do_close = True - if isinstance(fname, str): - if not self.exists(fname): - import errno - raise IOError( - errno.ENOENT, - 'No such file or directory', - fname - ) - protocol,fname = self.fname(fname) - if protocol == 'ami': - # FIXME: what (else) can we do ? - ami_infos = self.fopen(fname).infos - return ami_infos['file_type'], fname - - f = self._root_open(fname) - else: - do_close = False - f = fname - - _is_root_file= bool(f and f.IsOpen() and b'root' in f.read(10)) - if f and do_close: - f.Close() - del f - - ftype = 'pool' if _is_root_file else 'bs' - return (ftype, fname) - - @timelimit(timeout=DEFAULT_AF_TIMEOUT) - def exists(self, fname): - """helper function to test if a fiven `fname` exists. - - handles local filesystems as well as RFIO. - usage example: - >>> import PyUtils.AthFile as af - >>> af.exists('/castor/cern.ch/user/b/binet/reffiles/14.1.0.x/AllBasicSamples.AOD.pool.root') - False - >>> af.exists('rfio:/castor/cern.ch/user/b/binet/reffiles/14.1.0.x/AllBasicSamples.AOD.pool.root') - True - >>> af.exists('castor:/castor/cern.ch/user/b/binet/reffiles/14.1.0.x/AllBasicSamples.AOD.pool.root') - True - >>> # you need a valid PoolFileCatalog.xml file for this to work: - >>> af.exists('LFN:top_CSC-01-02-00_RDO_extract.pool') - True - >>> af.exists('/afs/cern.ch/atlas/offline/ReleaseData/v2/testfile/calib1_csc11.005200.T1_McAtNlo_Jimmy.digit.RDO.v12000301_tid003138._00016_extract_10evt.pool.root') - True - """ - - def _root_exists(fname): - exists = False - f = None - try: - f = self._root_open(fname) - exists = f and f.IsOpen() - except Exception: - # swallow... - pass - finally: - if f: - f.Close() - del f - return bool(exists) - - protocol,fname = self.fname(fname) - - if protocol in ('fid', 'lfn'): - return self.exists(fname) - - elif protocol in ('ami',): - # FIXME: what else can we do ? - try: - ami_dsinfos(fname) - return True - except Exception: - return False - - else: - return _root_exists(fname) - # un-reachable - return False - - pass # class AthFileServer - -class FilePeeker(object): - def __init__(self, server): - self.server= server - self.msg = server.msg - self.pyroot= server.pyroot - self._sub_env = dict(os.environ) - # prevent ROOT from looking into $HOME for .rootrc files - # we carefully (?) 
set this environment variable *only* in the - # subprocess to not stomp on the toes of our parent one which is - # user-driven (and might need user-customized macros or configurations) - self._sub_env['ROOTENV_NO_HOME'] = '1' - - # prevent from running athena in interactive mode (and freeze) - if 'PYTHONINSPECT' in self._sub_env: - del self._sub_env['PYTHONINSPECT'] - - # prevent from running athena with igprof - for k in ('LD_PRELOAD', 'IGPROF'): - if k in self._sub_env: - del self._sub_env[k] - - def _root_open(self, fname, raw=False): - return self.server._root_open(fname, raw) - - def _is_tag_file(self, fname, evtmax): - is_tag = False - tag_ref= None - tag_guid=None - nentries = 0 - runs=[] - evts=[] - do_close = True - if isinstance(fname, str): - f = self._root_open(fname, raw=False) - else: - f = fname - do_close = False - schema = f.Get('Schema') if f else None - if schema: - is_tag = True - # note: we used to use .rstrip('\0') b/c of the change in - # semantics in PyROOT (char[] and const char* may not mean - # the same thing) - # see https://savannah.cern.ch/bugs/?100920 for the gory details - # but in the end, we use ctypes... - # see https://savannah.cern.ch/bugs/?101200 for the gory details - import ctypes - tag_ref = str(ctypes.c_char_p(schema.m_eventRefColumnName).value) - del schema - metadata= f.Get('CollectionMetadata') if f else None - if metadata: - metadata.GetEntry(0) - # note: we used to use .rstrip('\0') b/c of the change in - # semantics in PyROOT (char[] and const char* may not mean - # the same thing) - # see https://savannah.cern.ch/bugs/?100920 for the gory details - # but in the end, we use ctypes... - # see https://savannah.cern.ch/bugs/?101200 for the gory details - # - # make sure it is what we think it is - import ctypes - key_name = str(ctypes.c_char_p(metadata.Key).value) - assert key_name == 'POOLCollectionID' - tag_guid = str(ctypes.c_char_p(metadata.Value).value) - del metadata - coll_tree = f.Get('POOLCollectionTree') if f else None - if coll_tree: - nentries = coll_tree.GetEntries() - if evtmax in (-1, None): - evtmax = nentries - evtmax = int(evtmax) - for row in range(evtmax): - if coll_tree.GetEntry(row) < 0: - break - # With root 5.34.22, trying to access leaves of a - # fundamental type like this gives an error: - # TypeError: attempt to bind ROOT object w/o class - # Rewrite like this for now to work around the problem. 
- #runnbr = coll_tree.RunNumber - runnbr = coll_tree.GetBranch('RunNumber').GetListOfLeaves()[0].GetValueLong64() - runs.append(runnbr) - #evtnbr = coll_tree.EventNumber - evtnbr = coll_tree.GetBranch('EventNumber').GetListOfLeaves()[0].GetValueLong64() - evts.append(evtnbr) - del coll_tree - if f and do_close: - f.Close() - del f - return (is_tag, tag_ref, tag_guid, nentries, runs, evts) - - def _is_empty_pool_file(self, fname): - is_empty = False - do_close = True - if isinstance(fname, str): - f = self._root_open(fname, raw=False) - else: - f = fname - do_close = False - payload = f.Get('CollectionTree') if f else None - if payload: - is_empty = False - else: - is_empty = True - del payload - - if f and do_close: - f.Close() - del f - return is_empty - - def _process_call(self, fname, evtmax, projects=['AtlasCore']): - msg = self.msg() - f = _create_file_infos() - protocol, _ = self.server.fname(fname) - f_raw = self._root_open(fname, raw=True) - if f_raw is None or not f_raw: - raise IOError( - errno.ENOENT, - 'No such file or directory', - fname) - f_root = f_raw - try: - file_type, file_name = self.server.ftype(f_raw) - - protocol,file_name = self.server.fname(fname) - f['file_md5sum'] = self.server.md5sum(f_raw) - f['file_name'] = file_name - f['file_type'] = file_type - f['file_size'] = f_raw.GetSize() - if file_type == 'pool': - f_root = self._root_open(fname, raw=False) - # POOL files are most nutritious when known to PoolFileCatalog.xml - # FIXME: best would be to do that in athfile_peeker.py but - # athena.py closes sys.stdin when in batch, which confuses - # PyUtils:subprocess.getstatusoutput - # - # ATEAM-192: avoid the PoolFileCatalog.xml conflict - #cmd = ['pool_insertFileToCatalog.py', - # file_name,] - #subprocess.call(cmd, env=self._sub_env) - # - if True: - is_tag, tag_ref, tag_guid, nentries, runs, evts = self._is_tag_file(f_root, evtmax) - if is_tag: - f['stream_names'] = ['TAG'] - f['file_guid'] = tag_guid - f['nentries'] = nentries - f['run_number'] = runs - f['evt_number'] = evts - else: - import tempfile - fd_pkl,out_pkl_fname = tempfile.mkstemp(suffix='.pkl') - os.close(fd_pkl) - if os.path.exists(out_pkl_fname): - os.remove(out_pkl_fname) - print ("\n --------- running Athena peeker") - print (os.environ.get('CMTPATH','')) - - import AthenaCommon.ChapPy as api - app = api.AthenaApp(cmdlineargs=["--nprocs=0"]) - app << """ - FNAME = %s - """ % str([file_name]) - app << """ - import os - # prevent from running athena in interactive mode (and freeze) - if 'PYTHONINSPECT' in os.environ: - del os.environ['PYTHONINSPECT'] - - - include('AthenaPython/athfile_peeker.py') - from AthenaCommon.AlgSequence import AlgSequence - job = AlgSequence() - # we don't really need this... 
- job.peeker.outfname='%(outfname)s' - job.peeker.infname='%(infname)s' - - # metadata + taginfo - import IOVDbSvc.IOVDb - - # evt-max - theApp.EvtMax = %(evtmax)i - """ % { - 'infname' : file_name, - 'outfname': out_pkl_fname, - 'evtmax': evtmax, - } - import uuid - stdout_fname = ( - 'athfile-%i-%s.log.txt' % - (os.getpid(), uuid.uuid4()) - ) - stdout = open(stdout_fname, "w") - print ("="*80, file=stdout) - print (self._sub_env, file=stdout) - print ("="*80, file=stdout) - stdout.flush() - if DEFAULT_AF_RUN: - sc = app.run(stdout=stdout, env=self._sub_env) - else: - import PyUtils.FilePeekerTool as fpt - fp = fpt.FilePeekerTool(f_root) - sc, fp_pkl_fname = fp.run() - # revert to athena sub-process in case of file with old schema - if sc == 0: - out_pkl_fname = fp_pkl_fname - else: - sc = app.run(stdout=stdout, env=self._sub_env) - stdout.flush() - stdout.close() - import AthenaCommon.ExitCodes as ath_codes - if sc == 0: - #import shelve - import PyUtils.dbsqlite as dbsqlite - msg.info('extracting infos from [%s]...', - out_pkl_fname) - db = dbsqlite.open(out_pkl_fname) - msg.info('keys: %s',db.keys()) - f.update(db['fileinfos']) - db.close() - msg.info('extracting infos from [%s]... [ok]', - out_pkl_fname) - os.remove(stdout.name) - else: - # maybe an empty file - # trust but verify - if not self._is_empty_pool_file(f_root): - # actually a problem in athena ! - from textwrap import dedent - err = dedent(""" - %s - problem running chappy! - code: [%s (%s)] - what: [%s] - => corrupted input file ? - %s - logfile: [%s] - """% (":"*25, - sc,errno.errorcode.get(sc,sc), - ath_codes.codes.get(sc,sc), - ":"*25, - stdout.name - )) - msg.error(err) - raise IOError(sc, err) - msg.info('athena failed to initialize.') - msg.info('=> probably an empty input POOL file') - # TAG-file - else: # bytestream - bs_fileinfos = self._process_bs_file(file_name, - evtmax=evtmax, - full_details=False) - del bs_fileinfos['file_name'] - del bs_fileinfos['file_size'] - del bs_fileinfos['file_type'] - del bs_fileinfos['file_md5sum'] - f.update(bs_fileinfos) - finally: - try: - f_raw.Close() - f_root.Close() - del f_raw - del f_root - except Exception as err: - msg.warning( - 'problem while closing raw and root file handles:\n%s', - err - ) - return f - - def __call__(self, fname, evtmax): - import re - import PyUtils.Helpers as H - with H.ShutUp(filters=[re.compile('.*')]): - f = self._process_call(fname, evtmax, projects=None) - - return f - - def _process_bs_file (self, fname, evtmax=1, full_details=True): - msg = self.msg() - import eformat as ef - - data_reader = ef.EventStorage.pickDataReader(fname) - assert data_reader, \ - 'problem picking a data reader for file [%s]'%fname - - beam_type = '<beam-type N/A>' - try: - beam_type = data_reader.beamType() - except Exception: - msg.warning ("problem while extracting beam-type information") - pass - - beam_energy = '<beam-energy N/A>' - try: - beam_energy = data_reader.beamEnergy() - except Exception: - msg.warning ("problem while extracting beam-type information") - pass - - bs = ef.istream(fname) - - file_infos = _create_file_infos() - nentries = bs.total_events - file_infos['nentries'] = nentries - import uuid - def _uuid(): - return str(uuid.uuid4()).upper() - bs_metadata = {} - for md in data_reader.freeMetaDataStrings(): - if md.startswith('Event type:'): - k = 'evt_type' - v = [] - if 'is sim' in md: v.append('IS_SIMULATION') - else: v.append('IS_DATA') - if 'is atlas' in md: v.append('IS_ATLAS') - else: v.append('IS_TESTBEAM') - if 'is physics' in md: 
v.append('IS_PHYSICS') - else: v.append('IS_CALIBRATION') - bs_metadata[k] = tuple(v) - elif md.startswith('GeoAtlas:'): - k = 'geometry' - v = md.split('GeoAtlas:')[1].strip() - bs_metadata[k] = v - elif md.startswith('IOVDbGlobalTag:'): - k = 'conditions_tag' - v = md.split('IOVDbGlobalTag:')[1].strip() - bs_metadata[k] = v - elif '=' in md: - k,v = md.split('=') - bs_metadata[k] = v - - # for bwd/fwd compat... - # see: https://savannah.cern.ch/bugs/?73208 - for key_name,fct_name in ( - ('GUID','GUID'), - ('Stream','stream'), - ('Project', 'projectTag'), - ('LumiBlock', 'lumiblockNumber'), - ('run_number', 'runNumber'), - ): - if key_name in bs_metadata: - # no need: already in bs metadata dict - continue - if hasattr(data_reader, fct_name): - v = getattr(data_reader, fct_name)() - bs_metadata[key_name] = v - # for bwd/fwd compat... -- END - - # fix for ATEAM-122 - if len(bs_metadata.get('evt_type','')) == 0 : # see: ATMETADATA-6 - evt_type = ['IS_DATA', 'IS_ATLAS'] - if bs_metadata.get('Stream', '').startswith('physics_'): - evt_type.append('IS_PHYSICS') - elif bs_metadata.get('Stream', '').startswith('calibration_'): - evt_type.append('IS_CALIBRATION') - elif bs_metadata.get('Project', '').endswith('_calib'): - evt_type.append('IS_CALIBRATION') - else: - evt_type.append('Unknown') - bs_metadata['evt_type'] = evt_type - - file_infos['file_guid'] = bs_metadata.get('GUID', _uuid()) - file_infos['evt_type'] = bs_metadata.get('evt_type', []) - file_infos['geometry'] = bs_metadata.get('geometry', None) - file_infos['conditions_tag'] = bs_metadata.get('conditions_tag', None) - file_infos['bs_metadata'] = bs_metadata - - if not data_reader.good(): - # event-less file... - file_infos['run_number'].append(bs_metadata.get('run_number', 0)) - file_infos['lumi_block'].append(bs_metadata.get('LumiBlock', 0)) - # FIXME: not sure how to do that... 
- return file_infos - - if evtmax == -1: - evtmax = nentries - - ievt = iter(bs) - for i in range(evtmax): - try: - evt = next(ievt) - evt.check() # may raise a RuntimeError - stream_tags = [dict(stream_type=tag.type, - stream_name=tag.name, - obeys_lbk=bool(tag.obeys_lumiblock)) - for tag in evt.stream_tag()] - file_infos['run_number'].append(evt.run_no()) - file_infos['evt_number'].append(evt.global_id()) - file_infos['lumi_block'].append(evt.lumi_block()) - file_infos['run_type'].append(ef.helper.run_type2string(evt.run_type())) - file_infos['beam_type'].append(beam_type) - file_infos['beam_energy'].append(beam_energy) - file_infos['stream_tags'].extend(stream_tags) - - except RuntimeError as err: - print ("** WARNING ** detected a corrupted bs-file:\n",err) - """ - detailed dump how-to: - --------------------- - import eformat as ef - import eformat.dump as edump - edump.event_callback.append (('.+', edump.fullevent_handler)) - edump.dump (stream=ef.istream(fname), skip=0, total=0) - """ - return file_infos - - pass # class FilePeeker - -### globals -g_server = AthFileServer() - -def _do_fopen(fname): - self = g_server - evtmax= getattr(g_server, '_evtmax', 1) - return self._fopen_file(fname, evtmax) diff --git a/Tools/PyUtils/python/AthFile/timerdecorator.py b/Tools/PyUtils/python/AthFile/timerdecorator.py deleted file mode 100644 index 1b4e0f02cb37e81666e26238156f804fcd800d50..0000000000000000000000000000000000000000 --- a/Tools/PyUtils/python/AthFile/timerdecorator.py +++ /dev/null @@ -1,61 +0,0 @@ -# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration - -# @file timerdecorator.py -# @purpose decorate functions which will have a limited alloted time to finish execution -# @date February 2011 -# ripped off from: -# http://code.activestate.com/recipes/483752/ - -import sys -import os -import threading - -if 'linux' in sys.platform.lower(): - def _run_from_valgrind(): - """ - helper function to detect if one runs under valgrind or not - """ - for l in open('/proc/self/maps'): - if '/valgrind' in l: - return True - return False - -else: # mac-os - def _run_from_valgrind(): - """ - helper function to detect if one runs under valgrind or not - """ - return 'VALGRIND_STARTUP_PWD' in os.environ - -class TimeoutError(Exception): - pass - -def timelimit(timeout): - def internal(function): - def internal2(*args, **kw): - class Calculator(threading.Thread): - def __init__(self): - threading.Thread.__init__(self) - self.result = None - self.error = None - - def run(self): - try: - self.result = function(*args, **kw) - except BaseException: - self.error = sys.exc_info()[0] - - c = Calculator() - c.start() - if _run_from_valgrind(): - # don't set any timeout under valgrind... 
-                c.join()
-            else:
-                c.join(timeout)
-                if c.is_alive():
-                    raise TimeoutError
-            if c.error:
-                raise c.error
-            return c.result
-        return internal2
-    return internal
diff --git a/Tools/PyUtils/python/AthFileLite.py b/Tools/PyUtils/python/AthFileLite.py
deleted file mode 100644
index 8740c9f0930698c4f68c6f32b6dde39a2ed6fd81..0000000000000000000000000000000000000000
--- a/Tools/PyUtils/python/AthFileLite.py
+++ /dev/null
@@ -1,440 +0,0 @@
-# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
-
-# Lightweight and simplified version of AthFile
-# As the transform knows which files are bytestream and which are
-# POOL files we just have two simple classes and definately avoid
-# doing anything fancy here
-
-import os
-import os.path
-import re
-import subprocess
-import sys
-import uuid
-
-import PyUtils.dbsqlite as dbsqlite
-from PyUtils.Logging import msg
-
-def _create_file_info_template():
-    """simple helper function to create consistent dicts for the
-    fileinfos attribute of AthFile
-    """
-    d = {
-        'file_md5sum': None,
-        'file_name': None,
-        'file_size': None,
-        'file_type': None,
-        'file_guid': None,
-        'nentries' : 0, # to handle empty files
-        'run_number': [],
-        'run_type': [],
-        'evt_type': [],
-        'evt_number': [],
-        'lumi_block': [],
-        'beam_energy': [],
-        'beam_type': [],
-        'stream_tags': [],
-        'metadata_items': None,
-        'eventdata_items': None,
-        'stream_names': None,
-        'geometry': None,
-        'conditions_tag': None,
-        'det_descr_tags': None,
-        ##
-        'metadata': None,
-        'tag_info': None,
-        }
-    return d
-
-
-def _urlType(filename):
-    if filename.startswith('dcap:'):
-        return 'dcap'
-    if filename.startswith('root:'):
-        return 'root'
-    if filename.startswith('https:'):
-        return 'https'
-    if filename.startswith('rfio:'):
-        return 'rfio'
-    if filename.startswith('file:'):
-        return 'posix'
-    if filename.startswith('davs:'):
-        return 'root'
-    return 'posix'
-
-
-def _get_file_size(filename):
-    if _urlType(filename) == 'posix':
-        try:
-            fsize = os.stat(filename)[6]
-        except IOError:
-            fsize = None
-    else:
-        from PyUtils.RootUtils import import_root
-        root = import_root()
-        try:
-            msg.debug('Calling TFile.Open for {0}'.format(filename))
-            pos = filename.find("?")
-            if pos>=0:
-                extraparam = '&filetype=raw'
-            else:
-                extraparam = '?filetype=raw'
-
-            file = root.TFile.Open(filename + extraparam, 'READ')
-            fsize = file.GetSize()
-            msg.debug('Got size {0} from TFile.GetSize'.format(fsize))
-        except ReferenceError:
-            msg.error('Failed to get size of {0}'.format(filename))
-            fsize = None
-
-        file.Close()
-        del root
-    return fsize
-
-
-class AthPoolFile(object):
-    def __init__(self, filename):
-        self._filename = filename
-        if self._filename:
-            self._stub = os.path.basename(self._filename) + '-' + str(uuid.uuid4())
-        else:
-            self._stub = str(uuid.uuid4())
-        self._jobOptionsFile = self._stub + '-miniAthFile.py'
-        self._infoOutputFile = self._stub + '-miniAthFile.db'
-        self._logFile = self._stub + '-miniAthFile.log'
-
-        self._metadata = _create_file_info_template()
-        self._error = False
-        self.fopen()
-
-
-    def fopen(self):
-        self._writeMiniJobOptions()
-        self._runMiniAthena()
-        self._loadFileInfo()
-        self._metadata['file_type'] = 'pool'
-        self._metadata['file_size'] = _get_file_size(self._filename)
-
-
-    @property
-    def fileinfo(self):
-        return self._metadata
-
-    def _writeMiniJobOptions(self):
-        try:
-            jo = open(self._jobOptionsFile, "w")
-
-            print(os.linesep.join(("FNAME=['{filename}']",
-                                   "import os",
-                                   "os.environ.pop('PYTHONINSPECT', None)",
-                                   "include('AthenaPython/athfile_peeker.py')",
-                                   "from AthenaCommon.AlgSequence import AlgSequence",
-                                   "job = AlgSequence()",
-                                   "job.peeker.outfname='{picklename}'",
-                                   "job.peeker.infname=FNAME[0]",
-                                   "import IOVDbSvc.IOVDb",
-                                   "theApp.EvtMax = 1")).format(filename=self._filename, picklename=self._infoOutputFile), file=jo)
-
-        except Exception as e:
-            print("Exception raised when writing JO file: {0}".format(e), file=sys.stderr)
-            self._error = True
-            raise
-
-    def _runMiniAthena(self):
-        out = open(self._logFile, 'wb')
-        try:
-            athenv = os.environ.copy()
-            subprocess.check_call(['athena.py', self._jobOptionsFile], stdout=out, stderr=out, env=athenv)
-        except subprocess.CalledProcessError:
-            # Don't delete log files if errors occured
-            self._error = True
-            raise
-
-
-    def _loadFileInfo(self):
-        db = dbsqlite.open(self._infoOutputFile)
-        self._metadata = db['fileinfos']
-
-    def _getSize(self):
-        # FIXME Probably need to use ROOT for non-posix fs
-        try:
-            self._metadata['file_size'] = os.stat(self._filename)[6]
-        except IOError:
-            self._metadata['file_size'] = None
-
-    def __del__(self):
-        if ('AFDEBUG' not in os.environ) and (not self._error):
-            for fname in (self._jobOptionsFile, self._infoOutputFile, self._logFile):
-                try:
-                    os.unlink(fname)
-                except (OSError, IOError):
-                    pass
-
-
-class AthBSFile(object):
-    def __init__(self, filename):
-        self._filename = filename
-        self._metadata = _create_file_info_template()
-        self.fopen()
-
-    def fopen(self):
-        self._process_bs_file(self._filename)
-        self._metadata['file_type'] = 'bs'
-        self._metadata['file_size'] = _get_file_size(self._filename)
-
-    @property
-    def fileinfo(self):
-        return self._metadata
-
-
-    def _process_bs_file (self, fname, evtmax=1, full_details=True):
-        import eformat as ef
-
-        data_reader = ef.EventStorage.pickDataReader(fname)
-        assert data_reader, \
-               'problem picking a data reader for file [%s]'%fname
-
-        beam_type = '<beam-type N/A>'
-        try:
-            beam_type = data_reader.beamType()
-        except Exception:
-            msg.warning ("problem while extracting beam-type information")
-
-        beam_energy = '<beam-energy N/A>'
-        try:
-            beam_energy = data_reader.beamEnergy()
-        except Exception:
-            msg.warning ("problem while extracting beam-type information")
-
-        bs = ef.istream(fname)
-
-        self._metadata['nentries'] = bs.total_events
-
-        bs_metadata = {}
-
-        for md in data_reader.freeMetaDataStrings():
-            if md.startswith('Event type:'):
-                k = 'evt_type'
-                v = []
-                if 'is sim' in md: v.append('IS_SIMULATION')
-                else: v.append('IS_DATA')
-                if 'is atlas' in md: v.append('IS_ATLAS')
-                else: v.append('IS_TESTBEAM')
-                if 'is physics' in md: v.append('IS_PHYSICS')
-                else: v.append('IS_CALIBRATION')
-                bs_metadata[k] = tuple(v)
-            elif md.startswith('GeoAtlas:'):
-                k = 'geometry'
-                v = md.split('GeoAtlas:')[1].strip()
-                bs_metadata[k] = v
-            elif md.startswith('IOVDbGlobalTag:'):
-                k = 'conditions_tag'
-                v = md.split('IOVDbGlobalTag:')[1].strip()
-                bs_metadata[k] = v
-            elif '=' in md:
-                k,v = md.split('=')
-                bs_metadata[k] = v
-
-        # for bwd/fwd compat...
-        # see: https://savannah.cern.ch/bugs/?73208
-        # needed for very old BS
-        for key_name,fn_name in (
-            ('GUID','GUID'),
-            ('Stream','stream'),
-            ('Project', 'projectTag'),
-            ('LumiBlock', 'lumiblockNumber'),
-            ('run_number', 'runNumber'),
-            ):
-            if key_name in bs_metadata:
-                # no need: already in bs metadata dict
-                continue
-            if hasattr(data_reader, fn_name):
-                bs_metadata[key_name] = getattr(data_reader, fn_name)()
-
-        self._metadata['file_guid'] = bs_metadata.get('GUID', None)
-        self._metadata['evt_type'] = bs_metadata.get('evt_type', [])
-        self._metadata['geometry'] = bs_metadata.get('geometry', None)
-        self._metadata['conditions_tag'] = bs_metadata.get('conditions_tag', None)
-        self._metadata['bs_metadata'] = bs_metadata
-
-        if not data_reader.good():
-            # event-less file...
-            self._metadata['run_number'].append(bs_metadata.get('run_number', 0))
-            self._metadata['lumi_block'].append(bs_metadata.get('LumiBlock', 0))
-            return
-
-        if evtmax == -1:
-            evtmax = bs.total_events
-
-        ievt = iter(bs)
-        for i in range(evtmax):
-            try:
-                evt = next(ievt)
-                evt.check() # may raise a RuntimeError
-                stream_tags = [dict(stream_type=tag.type,
-                                    stream_name=tag.name,
-                                    obeys_lbk=bool(tag.obeys_lumiblock))
-                               for tag in evt.stream_tag()]
-                self._metadata['run_number'].append(evt.run_no())
-                self._metadata['evt_number'].append(evt.global_id())
-                self._metadata['lumi_block'].append(evt.lumi_block())
-                self._metadata['run_type'].append(ef.helper.run_type2string(evt.run_type()))
-                self._metadata['beam_type'].append(beam_type)
-                self._metadata['beam_energy'].append(beam_energy)
-                self._metadata['stream_tags'].extend(stream_tags)
-
-            except RuntimeError as err:
-                print("** WARNING ** detected a corrupted bs-file:\n",err)
-
-
-class AthTagFile(object):
-    def __init__(self, filename):
-        self._filename = filename
-        self._metadata = _create_file_info_template()
-
-        self.fopen()
-
-    def fopen(self):
-        self._process_tag_file()
-        self._metadata['file_type'] = 'tag'
-        self._metadata['file_size'] = _get_file_size(self._filename)
-
-    @property
-    def fileinfo(self):
-        return self._metadata
-
-    def _process_tag_file(self, evtmax=1):
-        tag_guid=None
-        nentries = 0
-        runs=[]
-        evts=[]
-
-        try:
-            from PyUtils.RootUtils import import_root
-            root = import_root()
-            f = root.TFile.Open(self._filename, 'READ')
-
-            metadata= f.Get('CollectionMetadata') if f else None
-            if metadata:
-                metadata.GetEntry(0)
-                # note: we used to use .rstrip('\0') b/c of the change in
-                # semantics in PyROOT (char[] and const char* may not mean
-                # the same thing)
-                # see https://savannah.cern.ch/bugs/?100920 for the gory details
-                # but in the end, we use ctypes...
-                # see https://savannah.cern.ch/bugs/?101200 for the gory details
-                #
-                # make sure it is what we think it is
-                import ctypes
-                key_name = str(ctypes.c_char_p(metadata.Key).value)
-                assert key_name == 'POOLCollectionID'
-                tag_guid = str(ctypes.c_char_p(metadata.Value).value)
-            del metadata
-            coll_tree = f.Get('POOLCollectionTree') if f else None
-            if coll_tree:
-                nentries = coll_tree.GetEntries()
-                if evtmax in (-1, None):
-                    evtmax = nentries
-                evtmax = int(evtmax)
-                for row in range(evtmax):
-                    if coll_tree.GetEntry(row) < 0:
-                        break
-                    runnbr = coll_tree.RunNumber
-                    runs.append(runnbr)
-                    evtnbr = coll_tree.EventNumber
-                    evts.append(evtnbr)
-            del coll_tree
-            f.Close()
-            del f
-
-            self._metadata['stream_names'] = ['TAG']
-            self._metadata['file_guid'] = tag_guid
-            self._metadata['nentries'] = nentries
-            self._metadata['run_number'] = runs
-            self._metadata['evt_number'] = evts
-        except Exception as e:
-            print("Exception raised when processing TAG file {0}: {1}".format(self._filename, e), file=sys.stderr)
-            raise
-
-    def _getSize(self):
-        # FIXME Probably need to use ROOT for non-posix fs
-        try:
-            self._metadata['file_size'] = os.stat(self._filename)[6]
-        except IOError:
-            self._metadata['file_size'] = None
-
-
-class AthInpFile(object):
-    def __init__(self, filename):
-        self._filename = filename
-        self._metadata = _create_file_info_template()
-
-        self.fopen()
-
-    def fopen(self):
-        self._process_inp_file()
-        self._metadata['file_type'] = 'pool'
-        self._metadata['file_size'] = _get_file_size(self._filename)
-
-    @property
-    def fileinfo(self):
-        return self._metadata
-
-    def _process_inp_file(self):
-        pool_guid = None
-        nentries = 0
-        try:
-            from PyUtils.RootUtils import import_root
-            root = import_root()
-            f = root.TFile.Open(self._filename, 'READ')
-
-            if f:
-                # Get the number of entries in the file
-                pool_cont_token = re.compile(f'{root.APRDefaults.TTreeNames.DataHeader}(?!Form)|{root.APRDefaults.RNTupleNames.DataHeader}').match
-
-                for key in f.GetListOfKeys():
-                    key_name = key.GetName()
-                    match = pool_cont_token(key_name)
-                    if not match:
-                        continue
-                    obj = f.Get(key_name)
-                    if isinstance(obj, root.TTree):
-                        nentries = obj.GetEntriesFast()
-                        break
-                    elif isinstance(obj, root.Experimental.RNTuple):
-                        reader = root.Experimental.RNTupleReader.Open(obj)
-                        nentries = reader.GetNEntries()
-                        break
-
-                pool = f.Get('##Params')
-                if not isinstance(pool, root.TTree):
-                    raise NotImplementedError(f"Cannot extract ##Params from object of type {type(pool)!r}")
-                if pool:
-                    pool_token = re.compile(r'\[NAME=(?P<name>.*?)\]'
-                                            r'\[VALUE=(?P<value>.*?)\]').match
-                    params = []
-                    for entry in pool:
-                        param = entry.GetLeaf('db_string').GetValueString()
-                        match = pool_token(param)
-                        if not match:
-                            continue
-                        d = match.groupdict()
-                        params.append((d['name'], d['value']))
-                        if d['name'].lower() == 'fid':
-                            pool_guid = d['value']
-                    del pool
-                f.Close()
-                del f
-
-            self._metadata['file_guid'] = pool_guid
-            self._metadata['nentries'] = nentries
-        except Exception as e:
-            print("Exception raised when processing POOL file {0}: {1}".format(self._filename, e), file=sys.stderr)
-            raise
-
-    def _getSize(self):
-        # FIXME Probably need to use ROOT for non-posix fs
-        try:
-            self._metadata['file_size'] = os.stat(self._filename)[6]
-        except IOError:
-            self._metadata['file_size'] = None
diff --git a/Trigger/TrigValidation/TrigValTools/python/TrigValSteering/CheckSteps.py b/Trigger/TrigValidation/TrigValTools/python/TrigValSteering/CheckSteps.py
index 7707dd81f1f78e4de44cec50173ddba3f505c122..6ca70cdd73567f3bc428391453c9967ca4d53a81 100644
--- a/Trigger/TrigValidation/TrigValTools/python/TrigValSteering/CheckSteps.py
+++ b/Trigger/TrigValidation/TrigValTools/python/TrigValSteering/CheckSteps.py
@@ -765,7 +765,6 @@ def default_check_steps(test):
     if not get_step_from_list('LogMerge', check_steps):
         for step in reco_tf_steps:
             reco_tf_logmerge.log_files.append(step.get_log_file_name())
-        reco_tf_logmerge.extra_log_regex = r'athfile-.*\.log\.txt'
         reco_tf_logmerge.merged_name = 'athena.merged.log'
         log_to_zip = reco_tf_logmerge.merged_name
     if log_to_check is not None: