Commit 5a0d3e41 authored by Andrea Sciaba's avatar Andrea Sciaba
Browse files

Updated WMCore, adapted remotestageout and improved CE-cms-env

parent 8a234625
......@@ -16,7 +16,7 @@ import sys
#py2.6 compatibility
try:
import json
except ImportError, ex:
except ImportError as ex:
import simplejson as json
......@@ -425,4 +425,3 @@ def makeTweakFromJSON(jsonDictionary):
for param, value in jsoniser.parameters.items():
tweak.addParameter(param , value)
return tweak
......@@ -82,7 +82,6 @@ _TweakParams = [
"process.source.inputCommands",
"process.source.dropDescendantsOfDroppedBranches",
# maxevents
"process.maxEvents.input",
"process.maxEvents.output",
......@@ -97,6 +96,7 @@ _TweakParams = [
# random seeds
"process.RandomNumberGeneratorService.*.initialSeed",
"process.GlobalTag.globaltag",
]
......@@ -312,7 +312,7 @@ def applyTweak(process, tweak, fixup = None):
making sure all the necessary PSets and configuration values exist).
"""
for param, value in tweak:
if fixup and fixup.has_key(param):
if fixup and param in fixup:
fixup[param](process)
setParameter(process, param, value)
......@@ -386,9 +386,9 @@ def makeTaskTweak(stepSection):
if hasattr(stepSection.application, "configuration"):
if hasattr(stepSection.application.configuration, "pickledarguments"):
args = pickle.loads(stepSection.application.configuration.pickledarguments)
if args.has_key('globalTag'):
if 'globalTag' in args:
result.addParameter("process.GlobalTag.globaltag", args['globalTag'])
if args.has_key('globalTagTransaction'):
if 'globalTagTransaction' in args:
result.addParameter("process.GlobalTag.DBParameters.transactionId", args['globalTagTransaction'])
return result
......@@ -401,6 +401,10 @@ def makeJobTweak(job):
that can be used to modify a CMSSW process.
"""
result = PSetTweak()
baggage = job.getBaggage()
# Check in the baggage if we are processing .lhe files
lheInput = getattr(baggage, "lheInputFiles", False)
# Input files and secondary input files.
primaryFiles = []
......@@ -426,7 +430,7 @@ def makeJobTweak(job):
result.addParameter("process.source.fileNames", primaryFiles)
if len(secondaryFiles) > 0:
result.addParameter("process.source.secondaryFileNames", secondaryFiles)
else:
elif not lheInput:
#First event parameter should be set from whatever the mask says,
#That should have the added protection of not going over 2^32 - 1
#If there is nothing in the mask, then we fallback to the counter method
......@@ -448,8 +452,11 @@ def makeJobTweak(job):
# We don't want to set skip events for MonteCarlo jobs which have
# no input files.
firstEvent = mask['FirstEvent']
if firstEvent != None and firstEvent >= 0 and len(primaryFiles) > 0:
result.addParameter("process.source.skipEvents", firstEvent)
if firstEvent != None and firstEvent >= 0 and (len(primaryFiles) > 0 or lheInput):
if lheInput:
result.addParameter("process.source.skipEvents", firstEvent - 1)
else:
result.addParameter("process.source.skipEvents", firstEvent)
firstRun = mask['FirstRun']
if firstRun != None:
......@@ -473,8 +480,6 @@ def makeJobTweak(job):
result.addParameter("process.source.lumisToProcess", lumisToProcess)
# install any settings from the per job baggage
baggage = job.getBaggage()
procSection = getattr(baggage, "process", None)
if procSection == None:
return result
......@@ -512,10 +517,3 @@ def makeOutputTweak(outMod, job):
# output module based on the settings in the section
return result
'''
A bunch of functions that check the permissions on a file are what they should
be, or more restrictive.
A bunch of functions that check the permissions on a file are what they should
be, or more restrictive.
'''
......@@ -10,17 +10,17 @@ import stat
def check_permissions(filehandle, permission, pass_stronger = False):
info = os.stat(filehandle)
filepermission = oct(info[stat.ST_MODE] & 0777)
filepermission = oct(info[stat.ST_MODE] & 0o777)
if pass_stronger:
assert filepermission <= permission, "file's permissions are too weak"
else:
assert filepermission == permission, "file does not have the correct permissions"
def owner_readonly(file):
check_permissions(file, oct(0400))
check_permissions(file, oct(0o400))
def owner_readwrite(file):
check_permissions(file, oct(0600))
check_permissions(file, oct(0o600))
def owner_readwriteexec(file):
check_permissions(file, oct(0700))
\ No newline at end of file
check_permissions(file, oct(0o700))
#!/usr/bin/env python
# pylint: disable-msg=C0321,C0103
# pylint: disable=C0321,C0103
"""
_Configuration_
......@@ -26,13 +26,15 @@ _SimpleTypes = [
types.IntType,
]
_SupportedTypes = [
_ComplexTypes = [
types.DictType,
types.ListType,
types.TupleType,
]
_SupportedTypes = []
_SupportedTypes.extend(_SimpleTypes)
_SupportedTypes.extend(_ComplexTypes)
def format(value):
......@@ -93,12 +95,30 @@ class ConfigSection(object):
else:
return (id(self) == id(other))
def _complexTypeCheck(self, name, value):
if type(value) in _SimpleTypes:
return
elif type(value) in _ComplexTypes:
vallist = value
if type(value) == types.DictType:
vallist = value.values()
for val in vallist:
self._complexTypeCheck(name, val)
else:
msg = "Not supported type in sequence:"
msg += "%s\n" % type(value)
msg += "for name: %s and value: %s\n" % (name, value)
msg += "Added to WMAgent Configuration"
raise RuntimeError, msg
def __setattr__(self, name, value):
if name.startswith("_internal_"):
# skip test for internal setting
object.__setattr__(self, name, value)
return
if isinstance(value, ConfigSection):
# child ConfigSection
self._internal_children.add(name)
......@@ -107,23 +127,11 @@ class ConfigSection(object):
object.__setattr__(self, name, value)
return
if type(value) not in _SupportedTypes:
msg = "Unsupported Type: %s\n" % type(value)
msg += "Added to WMAgent Configuration"
raise RuntimeError, msg
if type(value) == types.UnicodeType:
value = str(value)
if type(value) in (types.ListType, types.TupleType, types.DictType):
vallist = value
if type(value) == types.DictType:
vallist = value.values()
for val in vallist:
if type(val) not in _SimpleTypes:
msg = "Complex Value type in sequence:"
msg += "%s\n" % type(val)
msg += "for name: %s and value: %s\n" % (name, value)
msg += "Added to WMAgent Configuration"
raise RuntimeError, msg
self._complexTypeCheck(name, value)
object.__setattr__(self, name, value)
self._internal_settings.add(name)
return
......@@ -177,7 +185,7 @@ class ConfigSection(object):
returns a ConfigSection instance
"""
if self.__dict__.has_key(sectionName):
if sectionName in self.__dict__:
return self.__dict__[sectionName]
newSection = ConfigSection(sectionName)
self.__setattr__(sectionName, newSection)
......@@ -221,7 +229,7 @@ class ConfigSection(object):
result.extend(getattr(self, attr).pythonise_(
document = document, comment = comment, prefix = myName))
continue
if self._internal_docstrings.has_key(attr):
if attr in self._internal_docstrings:
if comment:
result.append("# %s.%s: %s" % (
myName, attr,
......@@ -232,7 +240,7 @@ class ConfigSection(object):
attr, format(getattr(self, attr))
))
if self._internal_docstrings.has_key(attr):
if attr in self._internal_docstrings:
if document:
result.append(
"%s.document_(\"\"\"%s\"\"\", \'%s\')" % (
......@@ -246,32 +254,32 @@ class ConfigSection(object):
_dictionary_
Create a dictionary representation of this object.
This method does not take into account possible ConfigSections
as attributes of self (i.e. sub-ConfigSections) as the
dictionary_whole_tree_() method does.
The reason for this method to stay is that WebTools.Root.py
depends on a few places to check itself like:
if isinstance(param_value, ConfigSection) ...
"""
result = {}
[ result.__setitem__(x, getattr(self, x))
for x in self._internal_settings ]
return result
def dictionary_whole_tree_(self):
"""
Create a dictionary representation of this object.
ConfigSection.dictionary_() method needs to expand possible
items that are ConfigSection instances (those which appear
in the _internal_children set).
Also these sub-ConfigSections have to be made dictionaries
rather than putting e.g.
'Task1': <WMCore.Configuration.ConfigSection object at 0x104ccb50>
"""
result = {}
for x in self._internal_settings:
......@@ -395,7 +403,7 @@ class Configuration(object):
self._internal_webapps.remove(name)
object.__delattr__(self, name)
return
@staticmethod
def getInstance():
return getattr(Configuration, "_instance", None)
......@@ -438,7 +446,7 @@ class Configuration(object):
returns a ConfigSection instance
"""
if self.__dict__.has_key(sectionName):
if sectionName in self.__dict__:
return self.__dict__[sectionName]
newSection = ConfigSection(sectionName)
self.__setattr__(sectionName, newSection)
......@@ -497,7 +505,7 @@ class Configuration(object):
elif sectionName in self._internal_webapps:
result += "config.webapp_(\'%s\')\n" % sectionName
else:
result += "config.section_(\'%s\')\n" % sectionName
result += "config.section_(\'%s\')\n" % sectionName
sectionRef = getattr(self, sectionName)
......@@ -550,7 +558,7 @@ def loadConfigurationFile(filename):
try:
modRef = imp.load_module(cfgBaseName, modPath[0],
modPath[1], modPath[2])
except Exception, ex:
except Exception as ex:
msg = "Unable to load Configuration File:\n"
msg += "%s\n" % filename
msg += "Due to error:\n"
......
......@@ -40,4 +40,4 @@ class DASDocument(dict):
"""
return True
\ No newline at end of file
return True
......@@ -17,7 +17,7 @@ class File(WMObject, dict):
_File_
Data object that contains details for a single file
"""
def __init__(self, lfn = "", size = 0, events = 0, checksums = {},
def __init__(self, lfn = "", size = 0, events = 0, checksums = {},
parents = None, locations = None, merged = False):
dict.__init__(self)
self.setdefault("lfn", lfn)
......@@ -37,7 +37,7 @@ class File(WMObject, dict):
if parents == None:
self.setdefault("parents", set())
else:
self.setdefault("parents", parents)
self.setdefault("parents", parents)
def addRun(self, run):
"""
......@@ -54,15 +54,15 @@ class File(WMObject, dict):
if not isinstance(run, Run):
msg = "addRun argument must be of type WMCore.DataStructs.Run"
raise RuntimeError, msg
addFlag = False
addFlag = False
for runMember in self['runs']:
if runMember.run == run.run:
# this rely on Run object overwrite __add__ to update self
# this rely on Run object overwrite __add__ to update self
runMember + run
addFlag = True
if not addFlag:
if not addFlag:
self['runs'].add(run)
return
......@@ -90,7 +90,7 @@ class File(WMObject, dict):
"""
#if self['run'] == rhs['run']:
# return cmp(self['lumi'], rhs['lumi'])
return self.__eq__(rhs)
def __eq__(self, rhs):
......@@ -144,7 +144,7 @@ class File(WMObject, dict):
runDict = {"run_number": run.run,
"lumis": run.lumis}
fileDict["runs"].append(runDict)
return fileDict
def __to_json__(self, thunker = None):
......
......@@ -8,7 +8,7 @@ Data object that contains a set of files
__all__ = []
from WMCore.DataStructs.WMObject import WMObject
from WMCore.DataStructs.WMObject import WMObject
class Fileset(WMObject):
"""
......@@ -21,12 +21,12 @@ class Fileset(WMObject):
"""
self.name = name
self.files = set()
if files == None:
self.newfiles = set()
else:
self.newfiles = files
# assume that the fileset is open at first
self.open = True
......@@ -45,32 +45,32 @@ class Fileset(WMObject):
def addFile(self, file):
"""
Add a (set of) file(s) to the fileset
If the file is already in self.files update that entry
If the file is already in self.files update that entry
e.g. to handle updated location
If the file is already in self.newfiles update that entry
If the file is already in self.newfiles update that entry
e.g. to handle updated location
Else add the file to self.newfiles
"""
file = self.makeset(file)
new = file - self.getFiles(type='set')
self.newfiles = self.makeset(self.newfiles) | new
updated = self.makeset(file) & self.getFiles(type='set')
"updated contains the original location information for updated files"
self.files = self.files.union(updated)
def getFiles(self, type='list'):
if type == 'list':
"""
List all files in the fileset - returns a set of file objects
List all files in the fileset - returns a set of file objects
sorted by lfn.
"""
files = list(self.getFiles(type='set'))
try:
files.sort(lambda x, y: cmp(x['lfn'], y['lfn']))
except Exception, e:
except Exception as e:
print 'Problem with listFiles for fileset:', self.name
print files.pop()
raise e
......@@ -79,7 +79,7 @@ class Fileset(WMObject):
return self.makeset(self.files) | self.makeset(self.newfiles)
elif type == 'lfn':
"""
All the lfn's for files in the filesets
All the lfn's for files in the filesets
"""
def getLFN(file):
return file["lfn"]
......@@ -87,34 +87,34 @@ class Fileset(WMObject):
return files
elif type == 'id':
"""
All the id's for files in the filesets
All the id's for files in the filesets
"""
def getID(file):
return file["id"]
files = map(getID, self.getFiles(type='list'))
return files
def listNewFiles(self):
def listNewFiles(self):
"""
List all files in the fileset that are new - e.g. not in the DB - returns a set
"""
"""
return self.newfiles
def commit(self):
"""
Add contents of self.newfiles to self, empty self.newfiles
"""
self.files = self.makeset(self.files) | self.makeset(self.newfiles)
self.newfiles = set()
def __len__(self):
return len(self.getFiles(type='set'))
def __iter__(self):
for file in self.getFiles():
yield file
def markOpen(self, isOpen):
"""
_markOpen_
......
......@@ -49,6 +49,9 @@ class Job(WMObject, dict):
self["fwjr_path"] = None
self["workflow"] = None
self["owner"] = None
self["estimatedJobTime"] = None
self["estimatedMemoryUsage"] = None
self["estimatedDiskUsage"] = None
return
# //
......@@ -126,6 +129,29 @@ class Job(WMObject, dict):
self["outcome"] = jobOutcome
return
def addResourceEstimates(self, jobTime = None, memory = None, disk = None):
"""
_addResourceEstimates_
Add to the current resource estimates, if None then initialize them
to the given value. Each value can be set independently.
"""
# Update time
if self["estimatedJobTime"] is None:
self["estimatedJobTime"] = jobTime
elif jobTime is not None:
self["estimatedJobTime"] += jobTime
# Update memory
if self["estimatedMemoryUsage"] is None:
self["estimatedMemoryUsage"] = memory
elif memory is not None:
self["estimatedMemoryUsage"] += memory
# Update disk
if self["estimatedDiskUsage"] is None:
self["estimatedDiskUsage"] = disk
elif disk is not None:
self["estimatedDiskUsage"] += disk
def getBaggage(self):
"""
_getBaggage_
......@@ -156,5 +182,3 @@ class Job(WMObject, dict):
currentPSet = getattr(currentPSet, param)
else:
setattr(currentPSet, param, value)
......@@ -5,7 +5,7 @@ _JobGroup_
Definition of JobGroup:
Set of jobs running on same input file for same Workflow
Set of jobs for a single subscription
Required for certain job splitting Algo's (.g. event split to make complete
Required for certain job splitting Algo's (.g. event split to make complete
lumi)
Subscription:JobGroup == 1:N
JobGroup:Jobs = 1:N
......@@ -17,14 +17,14 @@ A JobGroup is a set of jobs and a Fileset that contains their output.
JobGroup knows the Subscription and passes the Workflow to Jobs in the group.
Jobs know their status (active, failed, complete) and know the files they run
on but don't know the group. They do know their subscription and corresponding
workflow. This means Jobs can update their state in the database without
Jobs know their status (active, failed, complete) and know the files they run
on but don't know the group. They do know their subscription and corresponding
workflow. This means Jobs can update their state in the database without
talking to the group, and WMBS JobGroups can calculate status from the database
instead of the in memory objects.
instead of the in memory objects.
The group has a status call which goes through the jobs and updates the db for
state changes and then returns the status of the group (active, failed,
The group has a status call which goes through the jobs and updates the db for