Commit 76e89d84 authored by Andrea Sciaba's avatar Andrea Sciaba
Browse files

mc and remotestageout tests updated for new WMCore code

parent e606eafc
......@@ -117,16 +117,16 @@ class StageOutDiagnostic:
self.summary['SiteConf'] = "Failed: Cannot load SiteConf"
raise RuntimeError, msg
if self.siteConf.localStageOut['command'] == None:
if self.siteConf.localStageOutCommand() == None:
msg = "LocalStageOut Command is not set"
self.summary['SiteConf'] = \
"Failed: local-stage-out command not set"
raise RuntimeError, msg
if self.siteConf.localStageOut['se-name'] == None:
msg = "LocalStageOut SE Name is not set"
if self.siteConf.localStageOutPNN() == None:
msg = "LocalStageOut PhEDEx node is not set"
self.summary['SiteConf'] = \
"Failed: local-stage-out se-name not set"
"Failed: local-stage-out phedex-node not set"
raise RuntimeError, msg
if self.siteConf.localStageOut['catalog'] == None:
......@@ -167,9 +167,7 @@ class StageOutDiagnostic:
raise RuntimeError, msg
msg = "TFC test successful:\n"
msg += "Mapped LFN: %s\n To PFN: %s\n" % (sampleLFN, samplePFN)
# msg += "Using Catalog Rules:\n"
# msg += str(self.tfc)
msg += "Mapped LFN: %s\n To PFN: %s\n" % (sampleLFN, samplePFN)
print msg
return
......@@ -191,8 +189,8 @@ class StageOutDiagnostic:
sourcePFN = os.path.join(os.getcwd(), "TEST-FILE")
seName = self.siteConf.localStageOut['se-name']
command = self.siteConf.localStageOut['command']
seName = self.siteConf.localStageOutPNN()
command = self.siteConf.localStageOutCommand()
options = self.siteConf.localStageOut.get('option', None)
protocol = self.tfc.preferredProtocol
......@@ -205,13 +203,14 @@ class StageOutDiagnostic:
except Exception, ex:
msg += "Unable to retrieve impl for local stage out:\n"
msg += "Error retrieving StageOutImpl for command named: %s\n" % (
command,)
command)
self.summary['LocalStageOut'] = \
"Failure: Cant retrieve StageOut Impl"
raise RuntimeError, msg
try:
impl.retryPause = 15
impl.numRetries = 2
impl(protocol, sourcePFN, targetPFN, options)
wasSuccessful = True
except Exception, ex:
......@@ -228,7 +227,7 @@ class StageOutDiagnostic:
### FALLBACK ###
### there are N fallbacks in a list called fallbackStageOut ###
for fallbackCount in range(len(self.siteConf.fallbackStageOut)):
seName = self.siteConf.fallbackStageOut[fallbackCount]['se-name']
seName = self.siteConf.fallbackStageOut[fallbackCount]['phedex-node']
command = self.siteConf.fallbackStageOut[fallbackCount]['command']
options = self.siteConf.fallbackStageOut[fallbackCount].get('option', None)
try:
......@@ -273,7 +272,7 @@ class StageOutDiagnostic:
"""
os.remove( "TEST-FILE" )
commandList = [ self.siteConf.localStageOut[ 'command' ] ]
commandList = [ self.siteConf.localStageOutCommand() ]
pfnList = [ self.tfc.matchLFN(self.tfc.preferredProtocol, self.testLFN) ]
for fallback in self.siteConf.fallbackStageOut:
......
......@@ -7,19 +7,14 @@ independent python structure
"""
import inspect
import pickle
import StringIO
import imp
import inspect
import json
import pickle
import sys
from functools import reduce
#py2.6 compatibility
try:
import json
except ImportError as ex:
import simplejson as json
class PSetHolder(object):
"""
......@@ -145,7 +140,7 @@ class JSONiser:
for param in params:
self.parameters["%s.%s" % (queue, param)] = dictionary[param]
for key, value in dictionary.items():
if type(value) == type(dict()):
if isinstance(value, dict):
self.queue.append(key)
self.dejson(dictionary[key])
self.queue.pop(-1)
......@@ -180,7 +175,7 @@ class PSetTweak:
"""
currentPSet = None
paramList = attrName.split(".")
for i in range(0, len(paramList)):
for _ in range(0, len(paramList)):
param = paramList.pop(0)
if param == "process":
currentPSet = self.process
......@@ -204,7 +199,7 @@ class PSetTweak:
if not paramName.startswith("process"):
msg = "Invalid Parameter Name: %s\n" % paramName
msg += "Parameter must start with process"
raise RuntimeError, msg
raise RuntimeError(msg)
return recursiveGetattr(self, paramName)
......@@ -261,7 +256,7 @@ class PSetTweak:
current = None
last = None
psets = psetPath.split(".")
for i in range(0, len(psets)):
for _ in range(0, len(psets)):
pset = psets.pop(0)
last = current
if current == None:
......@@ -293,8 +288,7 @@ class PSetTweak:
setattrCalls = {}
for pset in self.psets():
setattrCalls.update(self.setattrCalls(pset))
order = setattrCalls.keys()
order.sort()
order = sorted(setattrCalls.keys())
for call in order:
if call == "process": continue
result += "%s\n" % setattrCalls[call]
......@@ -303,7 +297,7 @@ class PSetTweak:
for param, value in self:
psetName = param.rsplit(".", 1)[0]
paramName = param.rsplit(".", 1)[1]
if type(value) == type("string"):
if isinstance(value, basestring):
value = "\"%s\"" % value
result += "setattr(%s, \"%s\", %s)\n" % (
psetName, paramName, value)
......@@ -336,33 +330,33 @@ class PSetTweak:
return jsoniser.json
def persist(self, filename, format = "python"):
def persist(self, filename, formatting="python"):
"""
_persist_
Save this object as either python, json or pickle
"""
if format not in ("python", "json", "pickle"):
msg = "Unsupported Format: %s" % format
raise RuntimeError, msg
if formatting not in ("python", "json", "pickle"):
msg = "Unsupported Format: %s" % formatting
raise RuntimeError(msg)
if format == "python":
if formatting == "python":
handle = open(filename, 'w')
handle.write(self.pythonise())
handle.close()
if format == "json":
if formatting == "json":
handle = open(filename, "w")
handle.write(self.jsonise())
handle.close()
if format == "pickle":
if formatting == "pickle":
handle = open(filename, "w")
pickle.dump(self, handle)
handle.close()
return
def unpersist(self, filename, format = None):
def unpersist(self, filename, formatting=None):
"""
_unpersist_
......@@ -370,26 +364,26 @@ class PSetTweak:
it based on file extension
"""
if format == None:
if formatting == None:
fileSuffix = filename.rsplit(".", 1)[1]
if fileSuffix == "py":
format = "python"
formatting = "python"
if fileSuffix == "pkl":
format = "pickle"
formatting = "pickle"
if fileSuffix == "json":
format = "json"
formatting = "json"
if format not in ("python", "json", "pickle"):
msg = "Unsupported Format: %s" % format
raise RuntimeError, msg
if formatting not in ("python", "json", "pickle"):
msg = "Unsupported Format: %s" % formatting
raise RuntimeError(msg)
if format == "pickle":
if formatting == "pickle":
handle = open(filename, 'r')
unpickle = pickle.load(handle)
handle.close()
self.process.__dict__.update(unpickle.__dict__)
if format == "python":
if formatting == "python":
modRef = imp.load_source('tempTweak', filename)
lister = PSetLister()
lister(modRef.process)
......@@ -399,7 +393,7 @@ class PSetTweak:
del modRef, sys.modules['tempTweak']
if format == "json":
if formatting == "json":
handle = open(filename, 'r')
jsonContent = handle.read()
handle.close()
......
......@@ -7,8 +7,11 @@ Note: This can be used within the CMSSW environment to act on a
process/config but does not depend on any CMSSW libraries. It needs to stay like this.
"""
from __future__ import print_function
import logging
import pickle
import traceback
import os
from PSetTweaks.PSetTweak import PSetTweak
from PSetTweaks.PSetTweak import parameterIterator, psetIterator
......@@ -25,14 +28,13 @@ _TweakOutputModules = [
"fastCloning",
"sortBaskets",
"dropMetaData",
#"outputCommands", #this is just a huge pile of stuff which we probably shouldnt be setting anyways
# "outputCommands", #this is just a huge pile of stuff which we probably shouldnt be setting anyways
"SelectEvents.SelectEvents",
"dataset.dataTier",
"dataset.filterName",
# TODO: support dataset.* here
]
]
_TweakParams = [
# options
......@@ -46,13 +48,11 @@ _TweakParams = [
"process.options.FailModule",
"process.options.IgnoreCompletely",
#config metadata
# config metadata
"process.configurationMetadata.name",
"process.configurationMetadata.version",
"process.configurationMetadata.annotation",
# source
"process.source.maxEvents",
"process.source.skipEvents",
......@@ -98,17 +98,19 @@ _TweakParams = [
"process.RandomNumberGeneratorService.*.initialSeed",
"process.GlobalTag.globaltag",
]
]
class WMTweakMaskError(Exception):
def __init__(self, mask = None, msg = "Cannot set process from job mask"):
class WMTweakMaskError(Exception):
def __init__(self, mask=None, msg="Cannot set process from job mask"):
super(WMTweakMaskError, self).__init__()
self.mask = mask
self.message = msg
def __str__(self):
return "Error: %s \n Mask: %s" % (self.message, str(self.mask))
def lfnGroup(job):
"""
_lfnGroup_
......@@ -121,7 +123,8 @@ def lfnGroup(job):
lfnGroup = modifier + str(job.get("counter", 0) / 1000).zfill(4)
return lfnGroup
def hasParameter(pset, param, nopop = False):
def hasParameter(pset, param, nopop=False):
"""
_hasParameter_
......@@ -135,7 +138,7 @@ def hasParameter(pset, param, nopop = False):
"""
params = param.split(".")
if not nopop:
params.pop(0) # first param is the pset we have the reference to
params.pop(0) # first param is the pset we have the reference to
lastParam = pset
for param in params:
lastParam = getattr(lastParam, param, None)
......@@ -145,7 +148,8 @@ def hasParameter(pset, param, nopop = False):
return True
return False
def getParameter(pset, param, nopop = False):
def getParameter(pset, param, nopop=False):
"""
_getParameter_
......@@ -157,7 +161,7 @@ def getParameter(pset, param, nopop = False):
"""
params = param.split(".")
if not nopop:
params.pop(0) # first param is the pset we have the reference to
params.pop(0) # first param is the pset we have the reference to
lastParam = pset
for param in params:
lastParam = getattr(lastParam, param, None)
......@@ -165,6 +169,7 @@ def getParameter(pset, param, nopop = False):
return None
return lastParam.value()
def setParameter(process, param, value):
"""
_setParameter_
......@@ -179,22 +184,20 @@ def setParameter(process, param, value):
"""
params = param.split('.')
params.pop(0) # first is process object
params.pop(0) # first is process object
lastPSet = process
for pset in params:
lastPSet = getattr(lastPSet, pset, None)
if lastPSet == None:
msg = "Cannot find attribute named: %s\n" % pset
msg += "Cannot set value: %s" % param
print msg
print(msg)
return
lastPSet.setValue(value)
return
def expandParameter(process, param):
"""
_expandParameter_
......@@ -205,9 +208,9 @@ def expandParameter(process, param):
"""
params = param.split('.')
params.pop(0)
lastResults = {"process" : process}
lastResults = {"process": process}
finalResults = {}
for i in range(0, len(params)):
for _ in range(0, len(params)):
pset = params.pop(0)
if pset == "*":
newResults = {}
......@@ -235,11 +238,11 @@ def expandParameter(process, param):
lastResults = newResults
return finalResults
return finalResults
listParams = lambda x: [y for y in x.parameters_()]
listParams = lambda x: [ y for y in x.parameters_() ]
class TweakMaker:
"""
......@@ -251,8 +254,9 @@ class TweakMaker:
within the output modules
"""
def __init__(self, processParams = _TweakParams,
outmodParams = _TweakOutputModules):
def __init__(self, processParams=_TweakParams,
outmodParams=_TweakOutputModules):
self.processLevel = processParams
self.outModLevel = outmodParams
......@@ -261,12 +265,11 @@ class TweakMaker:
tweak = PSetTweak()
# handle process parameters
processParams = []
[ processParams.extend( expandParameter(process, param).keys())
for param in self.processLevel]
[processParams.extend(expandParameter(process, param).keys())
for param in self.processLevel]
[ tweak.addParameter(param, getParameter(process, param))
for param in processParams if hasParameter(process, param) ]
[tweak.addParameter(param, getParameter(process, param))
for param in processParams if hasParameter(process, param)]
# output modules
tweak.addParameter('process.outputModules_', [])
......@@ -282,9 +285,9 @@ class TweakMaker:
param,
True))
return tweak
def makeTweak(process):
"""
_makeTweak_
......@@ -298,8 +301,7 @@ def makeTweak(process):
return maker(process)
def applyTweak(process, tweak, fixup = None):
def applyTweak(process, tweak, fixup=None):
"""
_applyTweak_
......@@ -318,8 +320,8 @@ def applyTweak(process, tweak, fixup = None):
setParameter(process, param, value)
childParameters = lambda p, x: [ i for i in x._internal_settings if i not in x._internal_children]
childSections = lambda s : [ getattr(s, x) for x in s._internal_children ]
childParameters = lambda p, x: [i for i in x._internal_settings if i not in x._internal_children]
childSections = lambda s: [getattr(s, x) for x in s._internal_children]
class ConfigSectionDecomposer:
......@@ -332,12 +334,12 @@ class ConfigSectionDecomposer:
May turn out to be generally useful for ConfigSections
"""
def __init__(self):
self.configSects = []
self.parameters = {}
self.queue = []
def __call__(self, configSect):
"""
_operator(configSect)_
......@@ -355,7 +357,6 @@ class ConfigSectionDecomposer:
paramVal = getattr(configSect, par)
self.parameters[paramName] = paramVal
map(self, childSections(configSect))
self.queue.pop(-1)
......@@ -373,6 +374,7 @@ def decomposeConfigSection(csect):
return decomposer.parameters
def makeTaskTweak(stepSection):
"""
_makeTaskTweak_
......@@ -393,6 +395,7 @@ def makeTaskTweak(stepSection):
return result
def makeJobTweak(job):
"""
_makeJobTweak_
......@@ -417,7 +420,7 @@ def makeJobTweak(job):
result.addParameter("process.source.firstLuminosityBlock",
job['mask']['FirstLumi'])
else:
#We don't have lumi information in the mask, raise an exception
# We don't have lumi information in the mask, raise an exception
raise WMTweakMaskError(job['mask'],
"No first lumi information provided")
continue
......@@ -431,18 +434,18 @@ def makeJobTweak(job):
if len(secondaryFiles) > 0:
result.addParameter("process.source.secondaryFileNames", secondaryFiles)
elif not lheInput:
#First event parameter should be set from whatever the mask says,
#That should have the added protection of not going over 2^32 - 1
#If there is nothing in the mask, then we fallback to the counter method
if job['mask'].get('FirstEvent',None) != None:
# First event parameter should be set from whatever the mask says,
# That should have the added protection of not going over 2^32 - 1
# If there is nothing in the mask, then we fallback to the counter method
if job['mask'].get('FirstEvent', None) != None:
result.addParameter("process.source.firstEvent",
job['mask']['FirstEvent'])
else:
#No first event information in the mask, raise and error
# No first event information in the mask, raise and error
raise WMTweakMaskError(job['mask'],
"No first event information provided in the mask")
mask = job['mask']
mask = job['mask']
# event limits
maxEvents = mask.getMaxEvents()
......@@ -462,7 +465,7 @@ def makeJobTweak(job):
if firstRun != None:
result.addParameter("process.source.firstRun", firstRun)
elif not len(primaryFiles):
#Then we have a MC job, we need to set firstRun to 1
# Then we have a MC job, we need to set firstRun to 1
logging.debug("MCFakeFile initiated without job FirstRun - using one.")
result.addParameter("process.source.firstRun", 1)
......@@ -485,14 +488,12 @@ def makeJobTweak(job):
return result
baggageParams = decomposeConfigSection(procSection)
for k,v in baggageParams.items():
result.addParameter(k,v)
for k, v in baggageParams.items():
result.addParameter(k, v)
return result
def makeOutputTweak(outMod, job):
"""
_makeOutputTweak_
......@@ -512,8 +513,98 @@ def makeOutputTweak(outMod, job):
lfn = "%s/%s/%s.root" % (lfnBase, lfnGroup(job), modName)
result.addParameter("process.%s.logicalFileName" % modName, lfn)
#TODO: Nice standard way to meddle with the other parameters in the
# TODO: Nice standard way to meddle with the other parameters in the
# output module based on the settings in the section
return result
def readAdValues(attrs, adname, castInt=False):
"""
A very simple parser for the ads available at runtime. Returns
a dictionary containing
- attrs: A list of string keys to look for.
- adname: Which ad to parse; "job" for the $_CONDOR_JOB_AD or
"machine" for $_CONDOR_MACHINE_AD
- castInt: Set to True to force the values to be integer literals.
Otherwise, this will return the values as a string representation
of the ClassAd expression.
Note this is not a ClassAd parser - will not handle new-style ads
or any expressions.
Will return a dictionary containing the key/value pairs that were
present in the ad and parseable.
On error, returns an empty dictionary.
"""
retval = {}
adfile = None
if adname == 'job':
adfile = os.environ.get("_CONDOR_JOB_AD")
elif adname == 'machine':
adfile = os.environ.get("_CONDOR_MACHINE_AD")