Commit 0c90a509 authored by Andrea Sciaba's avatar Andrea Sciaba
Browse files

Updated mc test

parent ae49a1ee
......@@ -2,62 +2,165 @@
"""
_XRDCPImpl_
Implementation of StageOutImpl interface for RFIO in Castor-2
Implementation of StageOutImpl interface for xrdcp
Generic, will/should work with any site.
"""
from __future__ import print_function
import os
import argparse
from WMCore.Storage.Registry import registerStageOutImpl
from WMCore.Storage.StageOutImpl import StageOutImpl
from WMCore.Storage.Execute import runCommand
from WMCore.Storage.Execute import execute
class XRDCPImpl(StageOutImpl):
"""
_XRDCPImpl_
Implement interface for rfcp command
"""
run = staticmethod(runCommand)
def __init__(self, stagein=False):
StageOutImpl.__init__(self, stagein)
self.numRetries = 5
self.retryPause = 300
def createSourceName(self, protocol, pfn):
"""
_createSourceName_
uses pfn
"""
return pfn
def createOutputDirectory(self, targetPFN):
"""
return "%s" % pfn
_createOutputDirectory_
def createStageOutCommand(self, sourcePFN, targetPFN, options = None, checksums = None):
not needed since xrdcp does it automatically
"""
return
def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
"""
_createStageOutCommand_
Build an xrdcp command
Build the actual xrdcp stageout command
If adler32 checksum is provided, use it for the transfer
xrdcp options used:
--force : re-creates a file if it's already present
--nopbar : does not display the progress bar
"""
original_size = os.stat(sourcePFN)[6]
print "Local File Size is: %s" % original_size
result = ". /afs/cern.ch/user/c/cmsprod/scratch1/releases/CMSSW_1_8_0_pre0/src/runtime.sh ; xrdcp -d 3"
if options != None:
result += " %s " % options
result += " %s " % sourcePFN
result += " root://lxgate39.cern.ch/%s " % targetPFN
result += "; DEST_SIZE=`rfstat %s | grep Size | cut -f2 -d:` ; if [ $DEST_SIZE ] && [ '%s' == $DEST_SIZE ]; then exit 0; else echo \"Error: Size Mismatch between local and SE\"; exit 60311 ; fi " % (targetPFN,original_size)
return result
if not options:
options = ''
parser = argparse.ArgumentParser()
parser.add_argument('--cerncastor', action='store_true')
parser.add_argument('--old', action='store_true')
args, unknown = parser.parse_known_args(options.split())
copyCommandOptions = ' '.join(unknown)
copyCommand = ""
if self.stageIn:
remotePFN, localPFN = sourcePFN, targetPFN
else:
remotePFN, localPFN = targetPFN, sourcePFN
copyCommand += "LOCAL_SIZE=`stat -c%%s \"%s\"`\n" % localPFN
copyCommand += "echo \"Local File Size is: $LOCAL_SIZE\"\n"
if args.cerncastor:
targetPFN += "?svcClass=t0cms"
useChecksum = (checksums != None and 'adler32' in checksums and not self.stageIn)
xrdcpExec = "xrdcp"
if args.old:
xrdcpExec = "xrdcp-old"
# check if xrdcp(-old) and xrdfs are in path
# fallback to xrootd 4.0.4 from COMP externals if not
xrootdInPath = False
if any(os.access(os.path.join(path, xrdcpExec), os.X_OK) for path in os.environ["PATH"].split(os.pathsep)):
if any(os.access(os.path.join(path, "xrdfs"), os.X_OK) for path in os.environ["PATH"].split(os.pathsep)):
xrootdInPath = True
if not xrootdInPath:
# COMP software can be in many place, check all of them
cmsSoftDir = os.environ.get("VO_CMS_SW_DIR", None)
if not cmsSoftDir:
cmsSoftDir = os.environ.get("OSG_APP", None)
if cmsSoftDir:
cmsSoftDir = os.path.join(cmsSoftDir, "cmssoft/cms")
else:
cmsSoftDir = os.environ.get("CVMFS", None)
if cmsSoftDir:
initFiles = []
initFiles.append(os.path.join(cmsSoftDir, "COMP/slc6_amd64_gcc493/external/xrootd/4.0.4-comp/etc/profile.d/init.sh"))
initFiles.append(os.path.join(cmsSoftDir, "COMP/slc6_amd64_gcc493/external/libevent/2.0.22/etc/profile.d/init.sh"))
initFiles.append(os.path.join(cmsSoftDir, "COMP/slc6_amd64_gcc493/external/gcc/4.9.3/etc/profile.d/init.sh"))
if all(os.path.isfile(initFile) for initFile in initFiles):
for initFile in initFiles:
copyCommand += "source %s\n" % initFile
copyCommand += "%s --force --nopbar " % xrdcpExec
if copyCommandOptions:
copyCommand += "%s " % copyCommandOptions
if useChecksum:
checksums['adler32'] = "%08x" % int(checksums['adler32'], 16)
copyCommand += "--cksum adler32:%s " % checksums['adler32']
copyCommand += " \"%s\" " % sourcePFN
copyCommand += " \"%s\" \n" % targetPFN
if self.stageIn:
copyCommand += "LOCAL_SIZE=`stat -c%%s \"%s\"`\n" % localPFN
copyCommand += "echo \"Local File Size is: $LOCAL_SIZE\"\n"
(_, host, path, _) = self.splitPFN(remotePFN)
if self.stageIn:
removeCommand = ""
else:
removeCommand = "xrdfs %s rm %s" % (host, path)
copyCommand += "REMOTE_SIZE=`xrdfs '%s' stat '%s' | grep Size | sed -r 's/.*Size:[ ]*([0-9]+).*/\\1/'`\n" % (host, path)
copyCommand += "echo \"Remote File Size is: $REMOTE_SIZE\"\n"
if useChecksum:
copyCommand += "echo \"Local File Checksum is: %s\"\n" % checksums['adler32']
copyCommand += "REMOTE_XS=`xrdfs '%s' query checksum '%s' | grep adler32 | sed -r 's/.*adler32[ ]*([0-9a-fA-F]{8}).*/\\1/'`\n" % (host, path)
copyCommand += "echo \"Remote File Checksum is: $REMOTE_XS\"\n"
copyCommand += "if [ $REMOTE_SIZE ] && [ $REMOTE_XS ] && [ $LOCAL_SIZE == $REMOTE_SIZE ] && [ '%s' == $REMOTE_XS ]; then exit 0; " % checksums['adler32']
copyCommand += "else echo \"Error: Size or Checksum Mismatch between local and SE\"; %s ; exit 60311 ; fi" % removeCommand
else:
copyCommand += "if [ $REMOTE_SIZE ] && [ $LOCAL_SIZE == $REMOTE_SIZE ]; then exit 0; "
copyCommand += "else echo \"Error: Size Mismatch between local and SE\"; %s ; exit 60311 ; fi" % removeCommand
return copyCommand
def removeFile(self, pfnToRemove):
"""
_removeFile_
CleanUp pfn provided: specific for Castor-2
"""
command = "stager_rm -M %s ; nsrm %s" %(pfnToRemove,pfnToRemove)
self.executeCommand(command)
(_, host, path, _) = self.splitPFN(pfnToRemove)
command = "xrdfs %s rm %s" % (host, path)
execute(command)
return
registerStageOutImpl("xrdcp", XRDCPImpl)
......@@ -6,12 +6,14 @@ Interface for Stage Out Plugins. All stage out implementations should
inherit this object and implement the methods accordingly
"""
from __future__ import print_function
import time
import os
from WMCore.Storage.Execute import runCommand
from WMCore.Storage.StageOutError import StageOutError, StageOutInvalidPath
from WMCore.Storage.StageOutError import StageOutError
class StageOutImpl:
class StageOutImpl(object):
"""
_StageOutImpl_
......@@ -30,17 +32,43 @@ class StageOutImpl:
self.numRetries = 3
self.retryPause = 600
self.stageIn = stagein
# tuple of exit codes of copy when dest directory does not exist
self.directoryErrorCodes = tuple()
def deferDirectoryCreation(self):
"""
Can we defer directory creation, hoping it exists,
only to create on a given error condition
"""
return len(self.directoryErrorCodes) != 0
@staticmethod
def splitPFN(pfn):
"""
_splitPFN_
Generic function to split the PFN in smaller pieces, such as:
{ <protocol>, <host>, <path>, <opaque> }
"""
protocol = pfn.split(':')[0]
host = pfn.split('/')[2]
thisList = pfn.replace('%s://%s/' % (protocol, host), '').split('?')
path = thisList[0]
opaque = ""
# If we have any opaque info keep it
if len(thisList) == 2:
opaque = "?%s" % thisList[1]
# check for the path to actually be in the opaque information
if opaque.startswith("?path="):
elements = opaque.split('&')
path = elements[0].replace('?path=', '')
buildingOpaque = '?'
for element in elements[1:]:
buildingOpaque += element
buildingOpaque += '&'
opaque = buildingOpaque.rstrip('&')
elif opaque.find("&path=") != -1:
elements = opaque.split('&')
buildingOpaque = elements[0]
for element in elements[1:]:
if element.startswith('path='):
path = element.replace('path=', '')
else:
buildingOpaque += '&' + element
opaque = buildingOpaque
return protocol, host, path, opaque
def executeCommand(self, command):
"""
......@@ -52,20 +80,17 @@ class StageOutImpl:
"""
try:
exitCode = runCommand(command)
msg = "Command exited with status: %s" % (exitCode)
print msg
msg = "%s : Command exited with status: %s\n" % (time.strftime("%Y-%m-%dT%H:%M:%S"), exitCode)
print(msg)
except Exception as ex:
raise StageOutError(str(ex), Command = command, ExitCode = 60311)
if exitCode in self.directoryErrorCodes:
raise StageOutInvalidPath()
elif exitCode:
msg = "Command exited non-zero"
print "ERROR: Exception During Stage Out:\n"
print msg
raise StageOutError(msg, Command = command, ExitCode = exitCode)
raise StageOutError(str(ex), Command=command, ExitCode=60311)
if exitCode:
msg = "%s : Command exited non-zero" % time.strftime("%Y-%m-%dT%H:%M:%S")
print("ERROR: Exception During Stage Out:\n")
print(msg)
raise StageOutError(msg, Command=command, ExitCode=exitCode)
return
def createSourceName(self, protocol, pfn):
"""
_createSourceName_
......@@ -75,8 +100,7 @@ class StageOutImpl:
implementation uses.
"""
raise NotImplementedError, "StageOutImpl.createSourceName"
raise NotImplementedError("StageOutImpl.createSourceName")
def createTargetName(self, protocol, pfn):
"""
......@@ -93,7 +117,6 @@ class StageOutImpl:
"""
return self.createSourceName(protocol, pfn)
def createOutputDirectory(self, targetPFN):
"""
_createOutputDirectory_
......@@ -105,8 +128,7 @@ class StageOutImpl:
"""
pass
def createStageOutCommand(self, sourcePFN, targetPFN, options = None, checksums = None):
def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
"""
_createStageOutCommand_
......@@ -114,8 +136,7 @@ class StageOutImpl:
targetPFN using the options provided if necessary
"""
raise NotImplementedError, "StageOutImpl.createStageOutCommand"
raise NotImplementedError("StageOutImpl.createStageOutCommand")
def removeFile(self, pfnToRemove):
"""
......@@ -127,8 +148,7 @@ class StageOutImpl:
intermediate files upon successful completion of the merge job
"""
raise NotImplementedError, "StageOutImpl.removeFile"
raise NotImplementedError("StageOutImpl.removeFile")
def createRemoveFileCommand(self, pfn):
"""
......@@ -141,8 +161,7 @@ class StageOutImpl:
else:
return ""
def __call__(self, protocol, inputPFN, targetPFN, options = None, checksums = None):
def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None):
"""
_Operator()_
......@@ -153,7 +172,7 @@ class StageOutImpl:
"""
# //
# // Generate the source PFN from the plain PFN if needed
#//
# //
sourcePFN = self.createSourceName(protocol, inputPFN)
# destination may also need PFN changed
......@@ -162,55 +181,46 @@ class StageOutImpl:
# //
# // Create the output directory if implemented
#//
# //
for retryCount in range(1, self.numRetries + 1):
try:
# if we can detect directory problems later
# defer directory creation till then, only applies to stageOut
if not self.deferDirectoryCreation() or self.stageIn:
self.createOutputDirectory(targetPFN)
print("%s : Creating output directory..." % time.strftime("%Y-%m-%dT%H:%M:%S"))
self.createOutputDirectory(targetPFN)
break
except StageOutError as ex:
msg = "Attempted directory creation for stageout %s failed\n" % retryCount
msg = "Attempt %s to create a directory for stageout failed.\n" % retryCount
msg += "Automatically retrying in %s secs\n " % self.retryPause
msg += "Error details:\n%s\n" % str(ex)
if retryCount == self.numRetries :
print(msg)
if retryCount == self.numRetries:
# //
# // last retry, propagate exception
#//
# //
raise ex
time.sleep(self.retryPause)
# //
# //
# // Create the command to be used.
#//
# //
command = self.createStageOutCommand(sourcePFN, targetPFN, options, checksums)
# //
# // Run the command
#//
# //
for retryCount in range(1, self.numRetries + 1):
try:
try:
self.executeCommand(command)
except StageOutInvalidPath as ex:
# plugin indicated directory missing,create and retry
msg = "Copy failure indicates directory does not exist.\n"
msg += "Create now"
print msg
self.createOutputDirectory(targetPFN)
self.executeCommand(command)
return
print("%s : Running the stage out..." % time.strftime("%Y-%m-%dT%H:%M:%S"))
self.executeCommand(command)
break
except StageOutError as ex:
msg = "Attempted stage out %s failed\n" % retryCount
msg = "Attempt %s to stage out failed.\n" % retryCount
msg += "Automatically retrying in %s secs\n " % self.retryPause
msg += "Error details:\n%s\n" % str(ex)
if retryCount == self.numRetries :
print(msg)
if retryCount == self.numRetries:
# //
# // last retry, propagate exception
#//
# //
raise ex
time.sleep(self.retryPause)
......
......@@ -3,7 +3,7 @@
/usr/bin/id
# Source the CMS environment
export SCRAM_ARCH=slc5_amd64_gcc462
export SCRAM_ARCH=slc6_amd64_gcc493
if [ -n "$OSG_GRID" ] ; then
[ -f $OSG_GRID/setup.sh ] && source $OSG_GRID/setup.sh
if [ -d $OSG_APP/cmssoft/cms ] ;then
......@@ -37,26 +37,26 @@ if [ $result != 0 ]; then
fi
rm -f $tmpfile
if [ -e $VO_CMS_SW_DIR/COMP/slc5_amd64_gcc434/external/python/2.6.4/etc/profile.d/init.sh ]
if [ -e $VO_CMS_SW_DIR/COMP/slc6_amd64_gcc493/external/python/2.7.6/etc/profile.d/init.sh ]
then
. $VO_CMS_SW_DIR/COMP/slc5_amd64_gcc434/external/python/2.6.4/etc/profile.d/init.sh
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$VO_CMS_SW_DIR/COMP/slc5_amd64_gcc434/external/openssl/0.9.7m/lib:$VO_CMS_SW_DIR/COMP/slc5_amd64_gcc434/external/bz2lib/1.0.5/lib
elif [ -e $OSG_APP/cmssoft/cms/COMP/slc5_amd64_gcc434/external/python/2.6.4/etc/profile.d/init.sh ]
. $VO_CMS_SW_DIR/COMP/slc6_amd64_gcc493/external/python/2.7.6/etc/profile.d/init.sh
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$VO_CMS_SW_DIR/COMP/slc6_amd64_gcc493/external/openssl/1.0.1r/lib:$VO_CMS_SW_DIR/COMP/slc6_amd64_gcc493/external/bz2lib/1.0.6/lib
elif [ -e $OSG_APP/cmssoft/cms/COMP/slc6_amd64_gcc493/external/python/2.7.6/etc/profile.d/init.sh ]
then
. $OSG_APP/cmssoft/cms/COMP/slc5_amd64_gcc434/external/python/2.6.4/etc/profile.d/init.sh
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$OSG_APP/cmssoft/cms/COMP/slc5_amd64_gcc434/external/openssl/0.9.7m/lib:$OSG_APP/cmssoft/cms/COMP/slc5_amd64_gcc434/external/bz2lib/1.0.5/lib
. $OSG_APP/cmssoft/cms/COMP/slc6_amd64_gcc493/external/python/2.7.6/etc/profile.d/init.sh
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$OSG_APP/cmssoft/cms/COMP/slc6_amd64_gcc493/external/openssl/1.0.1r/lib:$OSG_APP/cmssoft/cms/COMP/slc6_amd64_gcc493/external/bz2lib/1.0.6/lib
fi
command -v python2.6 > /dev/null
command -v python2.7 > /dev/null
rc=$?
if [[ $rc != 0 ]]
then
echo "ERROR: Python2.6 isn't available on this worker node." >&2
echo "ERROR: WMCore/WMAgent REQUIRES python2.6" >&2
echo "summary: PYTHON_26_NOT_FOUND"
echo "ERROR: Python2.7 isn't available on this worker node." >&2
echo "ERROR: WMCore/WMAgent REQUIRES python2.7" >&2
echo "summary: PYTHON_27_NOT_FOUND"
exit $SAME_ERROR
else
echo "WMAgent found python2.6 at..."
echo `which python2.6`
echo "WMAgent found python2.7 at..."
echo `which python2.7`
echo
fi
......
......@@ -4,7 +4,7 @@
Summary: WLCG Compliant Probes from %{site}
Name: nagios-plugins-wlcg-org.cms
Version: 1.1.34
Version: 1.1.36
Release: 1%{?dist}
License: GPL
......@@ -49,6 +49,10 @@ install --directory %{buildroot}/etc/cron.d
/etc/cron.d/cms_glexec
%changelog
* Thu Jan 26 2016 Andrea Sciaba <Andrea.Sciaba@cern.ch> 1.1.36-1.
- updated mc test to use the latest xrootd stagout plugin
* Mon Dec 19 2016 Andrea Sciaba <Andrea.Sciaba@cern.ch> 1.1.35-1.
- SRM test now creates dest dir if missing
* Thu Dec 01 2016 Andrea Sciaba <Andrea.Sciaba@cern.ch> 1.1.34-1.
- added -p to gfal-copy for mc test
- removed check that sw area is writable from env test
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment