Commit b6e98f77 authored by Graeme Stewart's avatar Graeme Stewart
Browse files

* trf.py

      - Correct dbrelease message formatting
    * Tag as PyJobTransformsCore-00-09-42 (PyJobTransformsCore-00-09-42)
parent cdd8b5da
package PyJobTransformsCore
author Alvin Tan <clat@hep.ph.bham.ac.uk>
use AtlasPolicy AtlasPolicy-*
use AtlasPython AtlasPython-* External -no_auto_imports
# default directory in current package to look for trf's.
macro trfs_dir '../scripts'
# command used to expand filenames to include package name
macro expand_files_cmd expand_files.py
apply_pattern declare_python_modules files="*.py"
apply_pattern declare_scripts files="${expand_files_cmd} slimmetadata checklog.py find_*.py trf_ls -s=../python envutil.py"
apply_pattern generic_declare_for_link kind=runtime files='-s=../share *.db' prefix=share name=trf
# Pattern to declare python jobtransforms.
# Each jobtransform normally has 2 components:
# - The python script (*_trf.py) defining the trf
# - The corresponding skeleton joboptions file(s)
# The pattern takes 2 arguments:
# trfs = list of jobtransforms, by default taken from ../scripts
# They will be installed as python modules in InstallArea/python/<package>
# and as executable scripts in InstallArea/share/bin
# jo = list of skeleton joboptions files belonging to the jobtransforms (usually one).
# By default taken from ../share
# They will be installed in InstallArea/jobOptions/<package>
#
pattern declare_jobtransforms \
private ; \
apply_pattern generic_declare_for_link kind=trfs_exe files='-s=${trfs_dir} <trfs>' prefix=share/bin ; \
apply_pattern generic_declare_for_link kind=trfs_pyt files='-s=${trfs_dir} <trfs>' prefix=python/<package> ; \
apply_pattern generic_declare_for_link kind=trfs_jop files='-s=../share <jo>' prefix=jobOptions/<package> ; \
macro <package>_jobtransforms "`${expand_files_cmd} -r=$(<PACKAGE>ROOT) -d=<package> -s=${trfs_dir} <trfs>`" ; \
apply_pattern install_python_init ; \
macro_append <package>_python_init_dependencies " install_trfs_pyt " ; \
end_private ; \
macro_append all_jobtransforms " ${<package>_jobtransforms}"
This diff is collapsed.
This diff is collapsed.
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
def CVSkeywords( listOfKeyWords ):
    """Take as input a list of strings containing CVS keywords of the form: \"$<name>:<value>$\"
    (the \":<value>\" part is optional).
    It will return a dictionary with <name>,<value> as key,value pairs; name and value
    are stripped of surrounding whitespace and a missing value becomes the empty string.
    Strings without 2 '$' signs, or without a name, are reported with a WARNING
    on stdout and skipped.
    See cvs manual for possible keywords and their meaning."""
    kwDict = {}
    for kw in listOfKeyWords:
        # CVS keywords are embedded between 2 '$' signs
        dollar1 = kw.find('$')
        dollar2 = kw.find('$', dollar1 + 1)
        if dollar1 == -1 or dollar2 == -1:
            # single-argument print(...) behaves the same on python 2 and python 3
            print("WARNING: %s is not a CVS keyword (it should have 2 '$' signs)" % kw)
            continue
        # name is the part before the first colon, value is everything after it
        name, _, value = kw[dollar1 + 1:dollar2].partition(':')
        name = name.strip()
        if not name:
            print("WARNING: \"%s\" is not a CVS keyword (it should have a name after the first $)" % kw)
            continue
        kwDict[name] = value.strip()
    return kwDict
# Module bookkeeping: the version is the "Revision" CVS keyword embedded in this file
__author__ = "clat@hep.ph.bham.ac.uk"
# The keywords are parsed in place, so no temporary name is left in the module namespace
__version__ = CVSkeywords( [
    "$Id: CVSutil.py,v 1.2 2009-01-29 12:04:16 ctan Exp $",
    "$Name: not supported by cvs2svn $",
    "$Revision: 285339 $",
] )["Revision"]
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
# Module docstring, assigned explicitly
__doc__ = """FilePreStager for the castor tape system"""
# Empty export list: a 'from <this module> import *' brings in no names
__all__ = []
from FilePreStager import *
import os,re,time,commands
import rfio
from envutil import *
# Translation table: status word found in the stager_qry output -> FilePreStager status.
_stager_qry_status_words = {
    'STAGED' : FilePreStager.STAGED ,
    'CANBEMIGR' : FilePreStager.STAGED ,
    'STAGEIN' : FilePreStager.STAGEIN ,
    'INVALID_STATUS' : FilePreStager.INVALID ,
    'not in stager' : FilePreStager.ONTAPE ,
}
# Regular expression alternation of all known status words
_stager_qry_statusRE = '|'.join( _stager_qry_status_words )
# Line patterns for the stager_qry output. The status alternation is filled in here;
# the remaining '%s' (written as '%%s' below) is filled in later with the filename.
_stager_qry_output_patterns = [
    r'^%%s\s+\S+\s+(?P<status>%s)$' % _stager_qry_statusRE,
    r'^Error [0-9]+.*?\(File\s+%%s\s+(?P<status>%s)\s*\)$' % _stager_qry_statusRE,
]
class CastorPreStager(FilePreStager):
    """FilePreStager that drives Castor through the 'stager_qry' (status query)
    and 'stager_get' (stage request) command line tools."""

    def __init__(self,name=None,filenamePattern=r'^/castor/|^rfio:/castor/|^castor:', maxWaitingTime=120*60, pollingInterval=5*60):
        """<name>: name of this stager (passed on to FilePreStager).
        <filenamePattern>: regular expression matching the filenames handled by this stager.
        <maxWaitingTime>: default maximum waiting time in seconds (default: 2 hours).
        <pollingInterval>: default polling interval in seconds (default: 5 minutes)."""
        FilePreStager.__init__(self,filenamePattern,maxWaitingTime, pollingInterval, name)

    def _writeOutFileList(self, listOfFiles, filename='castorFiles.list'):
        '''Write list of files for castor to operate on to a file - prevents problems with excessive command line length.
        One filename per line, with the prefix up to the first ':' removed.
        Returns <filename>. Raises FilePreStageError if the file can not be written.'''
        try:
            # 'with' closes the file also when one of the writes fails
            with open(filename, 'w') as f:
                for castorFile in listOfFiles:
                    f.write( self.removePrefix(castorFile) + '\n' )
        except IOError as e:
            # strerror is not set when the IOError carries a single argument
            errMsg = e.strerror or str(e)
            raise FilePreStageError('Got IOError writing out list of files to stage as %s: %s' % (filename, errMsg))
        return filename

    def _runCastorCommand(self, castor_cmd, listOfFiles):
        """Run '<castor_cmd> -f <file with list of files>' and return its output.
        Raises FilePreStageError if the file list can not be written, if <castor_cmd>
        is not found in PATH or if the command returns a non-zero status."""
        # Write list of files to a file, to prevent an excessive command line length
        fileList = self._writeOutFileList(listOfFiles)
        if not find_executable(castor_cmd):
            raise FilePreStageError( "%s not found in PATH" % (castor_cmd) )
        cmd = '%s -f %s' % (castor_cmd, fileList)
        # execute command
        self.printDebug(cmd)
        stat,output = commands.getstatusoutput( cmd )
        self.printVerbose(output)
        if stat:
            raise FilePreStageError( "Error executing %s (output=%s)" % (cmd,output) )
        return output

    def getFilesStatus(self,listOfFiles):
        """Return a dictionary containing the status of each file in <listOfFiles>.
        The key is the filename and the value is the status.
        <listOfFiles> is a list of filenames or a single filename; an empty value
        gives an empty dictionary. Files for which no known line is found in the
        stager_qry output get status FilePreStager.UNKNOWN.
        Raises FilePreStageError if stager_qry can not be run successfully."""
        if not listOfFiles: return {}
        # compatibility with single filename
        if isinstance(listOfFiles, str):
            listOfFiles = [ listOfFiles ]
        output = self._runCastorCommand('stager_qry', listOfFiles)
        # analyse output
        statusDict = {}
        for f in listOfFiles:
            # the filename is matched literally: escape regular expression special characters
            escapedName = re.escape( self.removePrefix(f) )
            for pat in _stager_qry_output_patterns:
                match = re.search( pat % escapedName, output, re.MULTILINE )
                if match:
                    status = _stager_qry_status_words[match.group('status')]
                    break
            else:
                # none of the patterns matched
                status = FilePreStager.UNKNOWN
            statusDict[f] = status
        return statusDict

    def preStageFiles(self,listOfFiles):
        """Request staging of the files in <listOfFiles> with stager_get.
        <listOfFiles> is a list of filenames or a single filename; nothing is done
        if it is empty. Raises FilePreStageError if stager_get can not be run successfully."""
        if not listOfFiles: return
        # compatibility with single filename
        if isinstance(listOfFiles, str):
            listOfFiles = [ listOfFiles ]
        self._runCastorCommand('stager_get', listOfFiles)
#
# setup for castor2 for atlas using the env values for STAGE_HOST and STAGE_SVCCLASS
#
# single-argument print(...) behaves the same on python 2 and python 3
print("Setting up castor 2 for ATLAS ...")
castorEnv = {}
castorEnv['RFIO_USE_CASTOR_V2'] = 'YES'
# values already present in the environment win over the defaults
castorEnv['STAGE_HOST'] = os.environ.get('STAGE_HOST', 'castoratlas')
castorEnv['STAGE_SVCCLASS'] = os.environ.get('STAGE_SVCCLASS', 'default')
for n,v in castorEnv.items():
    print("%s=%s" % (n,v))
# export the settings to this process (and to the commands it starts)
os.environ.update(castorEnv)
# Module-level CastorPreStager instance, created with the default arguments when this module is imported.
# NOTE(review): the FilePreStager constructor presumably registers it with the file stager robot - confirm in FilePreStager
theCastorPreStager = CastorPreStager()
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
import os,sys, inspect
from PyJobTransformsCore.trfutil import *
from PyJobTransformsCore.trferr import *
#from AthenaCommon.AppMgr import NO_EVSEL,MONTECARLO
# Local definitions used instead of the import above;
# these are the values FakeAppMgr.setup() compares its argument against.
NO_EVSEL = -99
MONTECARLO = 1
# jobOptions filenames registered with exclude(); include() returns without executing them
ExcludedFiles = set()
def cleanUpEnv( envDict ):
    """Private variables, builtin objects and modules are removed from envDict.
    <envDict> is modified in place: every entry whose name starts with '_' or whose
    value is a builtin function or a module is deleted. Nothing is returned."""
    # iterate over a snapshot: entries are deleted from envDict inside the loop,
    # which is an error when iterating over a live dictionary view (python 3)
    for var, obj in list( envDict.items() ):
        if var.startswith( '_' ) or inspect.isbuiltin( obj ) or inspect.ismodule( obj ):
            del envDict[var]
def supplementEnv( envDict ):
    """Supplement envDict with some elements from this module and several AthenaCommon modules.
    Added from this module: theApp, Service, Algorithm, MONTECARLO, NO_EVSEL and the
    AuditorSvc / MessageSvc shortcuts. Added from AthenaCommon: the contents of
    SystemOfUnits, PhysicalConstants and Constants, except names starting with '__',
    builtin objects and modules. <envDict> is modified in place."""
    envDict.update( {
        'theApp' : theApp ,
        'Service' : Service ,
        'Algorithm' : Algorithm ,
        'MONTECARLO' : MONTECARLO ,
        'NO_EVSEL' : NO_EVSEL ,
        # callables that look up the service when invoked
        'AuditorSvc' : lambda : Service("AuditorSvc") ,
        'MessageSvc' : lambda : Service("MessageSvc") } )
    # add some more stuff to the environment
    for athmod in [ 'SystemOfUnits', 'PhysicalConstants', 'Constants' ]:
        # the non-empty fromlist makes __import__ return the submodule itself
        mod = __import__( 'AthenaCommon.' + athmod, globals(), locals(), ['*'] )
        # items() instead of iteritems(): available on both python 2 and python 3
        for var, obj in mod.__dict__.items():
            if var.startswith( '__' ) or inspect.isbuiltin( obj ) or inspect.ismodule( obj ):
                continue
            envDict[var] = obj
def Service(name):
    """Return the service called <name>, as given by theApp.service()."""
    service = theApp.service(name)
    return service
def Algorithm(name):
    """Return the algorithm called <name>, as given by theApp.algorithm()."""
    algorithm = theApp.algorithm(name)
    return algorithm
def include( filename ):
    """Execute the jobOptions file <filename> in the environment stored in include.env
    (a fresh empty dictionary if include.env has not been set).
    Files registered with exclude() are silently skipped. An absolute filename is used
    as is, any other filename is looked up with find_joboptions().
    Raises JobOptionsNotFoundError if the file is not found."""
    # excluded files are never executed
    if filename in ExcludedFiles:
        return
    full = filename if os.path.isabs( filename ) else find_joboptions( filename )
    if not full:
        raise JobOptionsNotFoundError( filename )
    # the execution environment is an attribute of this function
    _e = getattr( include, 'env', {} )
    execfile( full, _e )
def exclude( filename ):
    """Register <filename> in ExcludedFiles, so that include() will not execute it."""
    ExcludedFiles.update( [ filename ] )
# for backward compatibility: include.block() is the same function as exclude()
include.block = exclude
class FakeProperty:
    """Stand-in for a configurable object with a name and a type.

    Any attribute can be set on it. Reading an attribute that does not exist yet
    creates it as an empty list, so that 'prop.SomeList += [...]' works without
    prior assignment. Assigning 'AlgTools' additionally creates one FakeProperty
    of type 'AlgTool' per listed tool."""

    def __init__(self,name,type):
        """<name>: name of the property object.
        <type>: type string shown in front of the name by str()."""
        self.name = name
        self.type = type

    def __str__(self):
        """Return '<type>("<name>")' followed by one ' <attribute>=<value>' line
        for every other attribute that has been set."""
        me = [ '%s(\"%s\")' % (self.type,self.name) ]
        for n,v in self.__dict__.items():
            if not n.startswith('__') and n != 'name' and n != 'type':
                me.append( " %s=%s" % (n,v) )
        return os.linesep.join(me)

    def __setattr__(self,name,value):
        """Store <value> as attribute <name>."""
        self.__dict__[name] = value
        # for AlgTools add each algtool, as an attribute named after the last path component
        if name == 'AlgTools':
            for tool in value:
                toolname = os.path.basename(tool)
                self.__dict__[toolname] = FakeProperty(tool,'AlgTool')

    def __getattr__(self,name):
        """Called for attributes that do not exist: create <name> as an empty list.
        Raises AttributeError for special names of the form __xxx__."""
        # special names are looked up by python itself (hasattr, copy, pickle, repr, ...):
        # answering those with a list would break the corresponding protocol
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)
        # default is needed for += (only for lists, hopefully...)
        setattr(self,name,[])
        return getattr(self,name)
class FakeAppMgr(TransformLogger):
    """Fake application manager on which jobOptions can set attributes and
    request services and algorithms.

    Reading an attribute that does not exist yet creates it as an empty list.
    Services and algorithms are FakeProperty objects, kept in a repository
    keyed first by type ('Service' or 'Algorithm') and then by name."""

    def __init__(self, outputLevel = 3, jobOptions = None ):
        """<outputLevel> and <jobOptions> are accepted but not used."""
        TransformLogger.__init__(self,"FakeAppMgr")
        self.__dict__['_setupValue'] = None
        self.__dict__['_nEvents'] = 0
        self.__dict__['_propertyRepository'] = {}

    def __getattr__(self,name):
        """Called for attributes that do not exist: create <name> as an empty list.
        Raises AttributeError for special names of the form __xxx__."""
        # special names are looked up by python itself (hasattr, copy, pickle, ...):
        # answering those with a list would break the corresponding protocol
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)
        # catch += when member does not exist yet (only for lists)
        value = self.__dict__[name] = []
        return value

    def __str__(self):
        """Return one 'theApp.<name> = <value>' line for each public, non-callable
        attribute, followed by the lists of algorithm and service names."""
        me = [ "theApp.%s = %s" % (n,v) for n,v in self.__dict__.items()
               if not n.startswith('_') and not callable(v) ]
        # list(...) gives the same list-style output on python 2 and python 3
        me += [ 'theApp.Algorithms = %s' % list(self.algorithmDict()) ,
                'theApp.Services = %s' % list(self.serviceDict()) ]
        return os.linesep.join(me)

    def exeJobOptions( self, filename, env ):
        """Execute jobOptions file <filename> in environment <env>"""
        # prepare environment
        env[ 'include' ] = include
        supplementEnv( env )
        # include() reads its execution environment from this attribute
        include.env = env
        include( filename )
        cleanUpEnv( env )

    def setup(self,value=NO_EVSEL):
        """Store <value> as the setup value. The deprecated values NO_EVSEL and
        MONTECARLO are not stored; an error is logged for them instead."""
        if value == NO_EVSEL:
            self.logger().error( "use of theApp.setup( NO_EVSEL ) is deprecated" )
            self.logger().error( "use 'theApp.EvtSel = \"NONE\"' instead" )
        elif value == MONTECARLO:
            self.logger().error( "use of theApp.setup( MONTECARLO ) is deprecated" )
            self.logger().error( "use 'include( \"AthenaCommon/Atlas_Gen.UnixStandardJob.py\" )' instead" )
        else:
            self.__dict__['_setupValue'] = value

    def run(self,nEvents=None):
        """Record the requested number of events. Nothing is executed."""
        self._nEvents = nEvents

    def propertyDict(self,type):
        """Return the dictionary (key=name, value=FakeProperty) of all properties of
        type <type>. It is created on first use; later calls return the same dictionary."""
        # setdefault keeps an existing (possibly still empty) dictionary in place,
        # so that references handed out earlier stay valid
        return self._propertyRepository.setdefault( type, {} )

    def serviceDict(self):
        """Return the dictionary of all services."""
        return self.propertyDict('Service')

    def algorithmDict(self):
        """Return the dictionary of all algorithms."""
        return self.propertyDict('Algorithm')

    def property(self,name,type):
        """Return the FakeProperty with name <name> and type <type>, creating it if needed."""
        l = self.propertyDict(type)
        p = l.get(name)
        # identity test: the truth value of a FakeProperty must not be used here
        if p is None:
            p = FakeProperty(name,type)
            l[name] = p
        return p

    def service(self,name):
        """Return the service with name <name>, creating it if needed."""
        return self.property(name,"Service")

    def algorithm(self,name):
        """Return the algorithm with name <name>, creating it if needed."""
        return self.property(name,"Algorithm")
# Module-level FakeAppMgr instance; Service() and Algorithm() in this module delegate to it
theApp = FakeAppMgr()
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
__doc__ = """A base class for pre-staging files from tape to disk. Specific tape staging
systems need to derive from this class and implement member functions
getFilesStatus(), preStageFiles()."""
import time,re,os
import rfio
from PyJobTransformsCore.TransformLogger import TransformLogger,logging
class FilePreStageError(IOError):
    """IOError raised when something goes wrong while pre-staging files"""
    def __init__(self,*vargs):
        # all arguments are handed to IOError unchanged
        super(FilePreStageError, self).__init__(*vargs)
class FilePreStager(TransformLogger):
    """Base class for pre-staging files from tape to disk.

    Keeps a map of the files to pre-stage with their last known status.
    Derived classes must implement getFilesStatus() and preStageFiles()."""
    STAGED = 'On disk' # file is on disk
    STAGEIN = 'Being staged from tape to disk' # file is in the process of being put on disk
    ONTAPE = 'Not on disk (but on tape)' # file is not on disk
    NOTONTAPE = 'Not on tape' # file is not on tape
    INVALID = 'Invalid' # invalid status
    UNKNOWN = 'Unknown status' # unknown status
    CANBEMIGR = 'Waiting for tape migration (on the stager disk)'
    # list of all possible file status
    fileStatus = ( STAGED, STAGEIN, ONTAPE, NOTONTAPE, INVALID, UNKNOWN, CANBEMIGR)
    # list of problematic file status
    problemStatus = ( NOTONTAPE, INVALID, UNKNOWN )

    def __init__(self,filenamePattern,maxWaitingTime,pollingInterval,name=None):
        """<filenamePattern>: regular expression to match the filenames that are in this stager.
        <pollingInterval>: default polling interval (seconds) to check file status when files are being staged from tape to disk.
        <maxWaitingTime>: default maximum time (seconds) to wait for all files to be staged.
        <name>: name by which this stager will be known (default: the class name)"""
        if not name: name = self.__class__.__name__
        TransformLogger.__init__(self,name)
        self.__name = name
        # list of files to pre-stage and their last status (key=filename,value=status)
        self.__filesToPreStage = {}
        self.setFilenamePattern( filenamePattern )
        self.__maxWaitingTime = maxWaitingTime
        self.__pollingInterval = pollingInterval
        # add this stager to the module-level robot
        # (the name is only read here, so no 'global' statement is needed)
        theFileStagerRobot.addStager( self )

    def _notImplemented(self,funcName):
        """Raise a FilePreStageError saying that this class has not implemented <funcName>."""
        raise FilePreStageError('%s has not implemented function %s' % (self.__class__.__name__, funcName) )

    def getFilesStatus(self,listOfFiles):
        """Return a dictionary containing the status of each file in <listOfFiles>.
        The key is the filename and the value is the status (one of FilePreStager.fileStatus).
        <listOfFiles> is either a python list (or tuple) with filenames or a single filename.
        If <listOfFiles> is empty, return an empty dictionary (and do nothing else).
        Must be implemented in derived class."""
        self._notImplemented(FilePreStager.getFilesStatus.__name__)

    def preStageFiles(self,listOfFile):
        """Initiate staging of files from tape to disk. Function must return immediately
        and not wait until the files are staged. Raise a FilePreStageError exception in case of errors.
        <listOfFile> is either a python list (or tuple) with filenames or a single filename.
        If <listOfFile> is empty, do nothing.
        Must be implemented in derived class."""
        self._notImplemented(FilePreStager.preStageFiles.__name__)

    def name(self):
        """Name by which this stager is known"""
        return self.__name

    def fileExists(self,filename):
        """Check that the file exists in the tape system"""
        return rfio.exists(self.removePrefix(filename))

    def isFileInStager(self,filename):
        """Return boolean to indicate whether <filename> matches the filename pattern of this stager."""
        return self.__filenamePattern.search(filename) is not None

    def maxWaitingTime(self):
        """Default maximum waiting time (seconds) for stageAllFilesAndWait()"""
        return self.__maxWaitingTime

    def pollingInterval(self):
        """Default interval (seconds) for polling the status of the files being staged"""
        return self.__pollingInterval

    def filenamePattern(self):
        """The regular expression (string) that filenames in this stager must match"""
        return self.__filenamePattern.pattern

    def setFilenamePattern(self,pattern):
        """Set the regular expression that filenames in this stager must match"""
        self.__filenamePattern = re.compile(pattern)

    def setMaxWaitingTime(self,t):
        """Set the default maximum waiting time (seconds) for stageAllFilesAndWait()"""
        self.__maxWaitingTime = t

    def setPollingInterval(self,t):
        """Set the default interval (seconds) for polling the status of the files being staged"""
        self.__pollingInterval = t

    def removePrefix(self,filename):
        """Utility function. Remove prefix until first : (needed for certain commands).
        A filename without ':' is returned unchanged."""
        # find() gives -1 if there is no ':', so the slice then starts at 0 (full filename)
        return filename[filename.find(':') + 1:]

    def addFilesToPreStage(self,listOfFiles):
        """Add a list (or tuple) of files or a single file to the list of files to be pre-staged.
        <listOfFiles> is either a python list (or tuple) with filenames or a single filename (string).
        Each file is tested with self.isFileInStager() and self.fileExists(), and a
        FilePreStageError exception is raised if the file is not in the stager or does not exist.
        Added files start with status FilePreStager.UNKNOWN."""
        if not listOfFiles: return
        # compatibility with single filename
        if isinstance(listOfFiles, str): listOfFiles = [ listOfFiles ]
        for f in listOfFiles:
            if not self.isFileInStager(f):
                raise FilePreStageError( 'File %s does not seem to belong in %s' % (f,self.__class__.__name__) )
            if not self.fileExists(f):
                raise FilePreStageError( 'File %s does not exist in %s' % (f,self.__class__.__name__) )
            self.__filesToPreStage[f] = FilePreStager.UNKNOWN

    def updateStatus(self,printStatus='none'):
        """Update the status of all files. Print out status according to the value of <printStatus>:
        'none'    : Don't print anything.
        'changed' : Only print files with changed state
        'all'     : Print out all files
        """
        fileList = self.__filesToPreStage
        if not fileList: return
        # hand over a real list of filenames
        statusDict = self.getFilesStatus( list(fileList) )
        if printStatus == 'changed':
            self.printInfo( "Status of requested files with changed status on: %s" % time.asctime() )
        elif printStatus == 'all':
            self.printInfo( "Status of all requested files on: %s" % time.asctime() )
        nChanged = 0
        for filename,status in statusDict.items():
            if printStatus == 'all': self.printInfo( " %s: %s" % (filename,status) )
            if status != fileList[filename]:
                # print any changes
                if printStatus == 'changed': self.printInfo( " %s: %s" % (filename,status) )
                nChanged += 1
                # update state
                fileList[filename] = status
        if nChanged == 0 and printStatus == 'changed':
            self.printInfo( " (No changes since last check)" )

    def filesToPreStage(self):
        """Return the map (key=filename, value=last known status) of the files to pre-stage"""
        return self.__filesToPreStage

    def getFilesWithStatus(self,status,*vargs):
        """Return the list of files which have one of the status given in the argument list"""
        statusList = [ status ] + list(vargs)
        return [ f for f,s in self.__filesToPreStage.items() if s in statusList ]

    def getFilesNotWithStatus(self,status,*vargs):
        """Return a list of files which do NOT have any of the status given in the argument list"""
        statusList = [ status ] + list(vargs)
        return [ f for f,s in self.__filesToPreStage.items() if s not in statusList ]

    def stageAllFiles(self,needUpdate=True):
        """Initiate stage-in of files that are not already on disk. Return immediately,
        and do not wait until all files are staged.
        <needUpdate>: if True, update the status of all files first."""
        if not self.__filesToPreStage: return
        if needUpdate: self.updateStatus()
        # only files with status ONTAPE are requested
        toBeStaged = self.getFilesWithStatus( FilePreStager.ONTAPE )
        if toBeStaged:
            self.printInfo("Pre-staging file(s) %s" % ','.join(toBeStaged))
            self.preStageFiles( toBeStaged )
class FileStagerRobot(TransformLogger):
def __init__(self,name=None):
if not name: name = self.__class__.__name__
TransformLogger.__init__(self,name)
self.setLoggerLevel( logging.INFO )
self.__name = name
self.__stagerList = []
def name(self):
return self.__name
def setLoggerParentName(self,name):
"""Override from TransformLogger: propagate to all stagers"""
TransformLogger.setLoggerParentName(self,name)
for stager in self.__stagerList:
stager.setLoggerParentName(name)
def addStager(self,stager):
if not isinstance(stager,FilePreStager):
raise FilePreStageError('%s is not a FilePreStager' % stager.__class__.__name__)