#!/usr/bin/env python import os import sys import time import errno import fcntl import re import select import signal import socket import logging import optparse import shutil from distutils.spawn import find_executable import xml.etree.ElementTree as ET import random # ########################################################################### # CSWNFB_FILES = [ "/store/mc/SAM/GenericTTbar/AODSIM/" + \ "CMSSW_9_2_6_91X_mcRun1_realistic_v2-v1/00000/" + \ "A64CCCF2-5C76-E711-B359-0CC47A78A3F8.root", "/store/mc/SAM/GenericTTbar/AODSIM/" + \ "CMSSW_9_2_6_91X_mcRun1_realistic_v2-v1/00000/" + \ "AE237916-5D76-E711-A48C-FA163EEEBFED.root", "/store/mc/SAM/GenericTTbar/AODSIM/" + \ "CMSSW_9_2_6_91X_mcRun1_realistic_v2-v1/00000/" + \ "CE860B10-5D76-E711-BCA8-FA163EAA761A.root" ] CSWNFB_SITES = ["T1_FR_CCIN2P3", "T2_US_Nebraska", "T1_RU_JINR", "T2_UK_London_Brunel", \ "T2_CN_Beijing", "T2_BE_IIHE", "T2_FR_GRIF_LLR", "T2_HU_Budapest", \ "T2_UK_London_IC", "T2_US_Wisconsin"] log = logging.getLogger() cms_file = """ import FWCore.ParameterSet.Config as cms process = cms.Process('XrootdTest') process.source = cms.Source('PoolSource', fileNames = cms.untracked.vstring("%s"), ) process.SiteLocalConfigService = cms.Service("SiteLocalConfigService", debugLevel = cms.untracked.uint32(%d), overrideSourceCacheHintDir = cms.untracked.string("application-only"), ) process.dump = cms.EDAnalyzer("EventContentAnalyzer", listContent=cms.untracked.bool(False), verboseForModuleLabels = cms.untracked.vstring("recoTracks_generalTracks"), getDataForModuleLabels=cms.untracked.vstring("recoTracks_generalTracks"), getData=cms.untracked.bool(True), ) process.load("FWCore.MessageService.MessageLogger_cfi") process.MessageLogger.cerr.FwkReport.reportEvery = 1 process.maxEvents = cms.untracked.PSet( input = cms.untracked.int32(1) ) process.p = cms.EndPath(process.dump) """ def configure_logging(lvl=logging.INFO): logger = logging.getLogger("cms.CE.xrootd-fallback") logger.setLevel(lvl) handler = logging.StreamHandler(sys.stdout) formatter = logging.Formatter(fmt="[%(process)d] %(asctime)s [%(levelname)07s]: %(message)s") handler.setFormatter(formatter) logger.addHandler(handler) global log log = logger def print_summary(summary, retval): if retval == 0: log.info(summary) else: log.error(summary) print "Summary: %s" % summary return retval def parse_opts(): parser = optparse.OptionParser() parser.add_option("-v", "--verbose", dest="verbose", help="Increase logging verbosity", action="store_true", default=False) parser.add_option("-H", "--host", dest="hostname", help="Hostname to use") parser.add_option("-t", "--timeout", dest="timeout", help="Test timeout in seconds; default is 300", default=300, type="int") opts, args = parser.parse_args() opts.starttime = time.time() opts.endtime = opts.starttime + opts.timeout if opts.verbose: configure_logging(logging.DEBUG) else: configure_logging() return args, opts def getExitCode(name, default): try: return int(os.environ.get(name, default)) except: return 0 def runCommandChild(cmd, args): try: try: os.execvpe(cmd, [cmd]+args, os.environ) except OSError, e: print "Error exec'ing %s: %s\n" % (cmd, str(e)) finally: os._exit(127) def runCommandParent(r, pid, opts, cmsruntimeout): flags = fcntl.fcntl(r, fcntl.F_GETFL) flags |= os.O_NONBLOCK fcntl.fcntl(r, fcntl.F_SETFL, flags) xlist = [] rlist = [r] wlist = [] if cmsruntimeout > 0: endtime = min(opts.endtime, time.time() + cmsruntimeout) else: endtime = opts.endtime timeout = endtime - time.time() stdout = "" exitCode = -1 while (timeout >= 0) and (r not in xlist): rlist, wlist, xlist = select.select(rlist, wlist, xlist, timeout) timeout = endtime - time.time() if r in rlist: newstr = os.read(r, 1024) stdout += newstr while newstr: try: newstr = os.read(r, 1024) except OSError, oe: if oe.errno == errno.EAGAIN: newstr = '' else: raise stdout += newstr rlist = [r] mypid, exitCode = os.waitpid(pid, os.WNOHANG) if mypid: break exitCode = -1 if (timeout < 0) and (exitCode < 0): os.kill(pid, signal.SIGKILL) log.error("Killed CMSSW child (pid %d) due to timeout." % pid) if exitCode < 0: pid, exitCode = os.waitpid(pid, 0) return stdout, exitCode def runCommand(cmd, args, opts, cmsruntimeout=0, combineStd=False): r, w = os.pipe() try: pid = os.fork() if pid: # parent os.close(w) return runCommandParent(r, pid, opts, cmsruntimeout) else: os.close(r) os.dup2(w, 1) if combineStd: os.dup2(w, 2) os.close(w) runCommandChild(cmd, args) finally: try: os.close(r) except OSError: pass try: os.close(w) except OSError: pass def sourceFile(filename, opts): if not os.path.exists(filename): log.warn("Trying to source a file (%s) which does not exist" % filename) return stdout, exitCode = runCommand("/bin/sh", ["-c", "source %s && env -0" % filename], opts) if exitCode: return exitCode for line in stdout.split('\0'): info = line.split("=", 1) if len(info) == 2: os.environ[info[0]] = info[1] return exitCode def evalCommand(cmd, args, opts): stdout, exitCode = runCommand('/bin/sh', ['-c', 'eval `%s %s` && env -0' % (cmd, ' '.join(args))], opts) if exitCode: return exitCode for line in stdout.split('\0'): info = line.split("=", 1) if len(info) == 2: os.environ[info[0]] = info[1] return exitCode def main(): # Change to test directory dirname = os.path.split(sys.argv[0])[0] if dirname: os.chdir(dirname) # Note we leak the file descriptor - this way, we hold the lock until # the probe finishes try: fd = os.open('cmssw_lock', os.O_RDWR|os.O_CREAT) fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.FD_CLOEXEC) fcntl.flock(fd, fcntl.LOCK_EX) except (OSError, IOError), oe: # Move on - might as well finish running. log.exception(oe) _, opts = parse_opts() # For now, never give failures - just warnings NAG_CRITICAL = getExitCode("SAME_ERROR", 50) # Maps to Nagios CRITICAL NAG_WARNING = getExitCode("SAME_WARNING", 40) NAG_OK = getExitCode("SAME_OK", 10) now = time.strftime("%x %X", time.gmtime()) currdir = os.path.abspath(os.curdir) host = socket.gethostname() pilot_user_info, exitCode = runCommand("/usr/bin/id", ["-u"], opts) if exitCode: return print_summary("Failed to run 'id -u' to get user information", NAG_CRITICAL) pilot_user_info = pilot_user_info.strip() log.info("Ran at %s on host %s, directory %s, as user %s" % (now, host, currdir, pilot_user_info)) pilot_uid = os.geteuid() pilot_proxy = os.environ.get("X509_USER_PROXY", "/tmp/x509up_u%d" % pilot_uid) cert_dir = os.environ.get("X509_CERT_DIR", "undefined") if not os.path.isfile(pilot_proxy): return print_summary("X509_USER_PROXY=%s is not a file" % pilot_proxy, NAG_CRITICAL) log.info("X509_USER_PROXY=%s" % pilot_proxy) log.info("X509_CERT_DIR=%s" % cert_dir) # Workaround to suppress voms errors on OSG os.environ["VOMS_PROXY_INFO_DONT_VERIFY_AC"] = "1" if 'OSG_GRID' in os.environ and not find_executable('gfal-copy'): osg_setup = os.path.join(os.environ['OSG_GRID'], 'setup.sh') if not os.path.isfile(osg_setup): log.warning("$OSG_GRID is defined (%s), but %s does not exist." % (os.environ['OSG_GRID'], osg_setup)) else: exitCode = sourceFile(osg_setup, opts) if exitCode: return print_summary("Failed to source %s." % osg_setup, NAG_CRITICAL) dn, exitCode = runCommand("voms-proxy-info", ["--identity"], opts) dn = dn.strip() if exitCode: log.warning("Unable to determine DN from voms-proxy-info") else: log.info("DN: %s" % dn) fqan, exitCode = runCommand("voms-proxy-info", ["--fqan"], opts) if fqan: fqan = fqan.splitlines()[0] if exitCode: log.warning("Unable to determine primary FQAN from voms-proxy-info") else: log.info("Primary FQAN: %s" % fqan) proxyTime, exitCode = runCommand("voms-proxy-info", ["--timeleft"], opts) if exitCode: log.warning("Unable to determine time left for proxy from voms-proxy-info") else: if proxyTime: proxyTime = int(proxyTime.splitlines()[0]) if proxyTime == 0: return print_summary("VOMS proxy has expired.", NAG_CRITICAL) # Set the CMS environment if 'VO_CMS_SW_DIR' in os.environ and not find_executable('gfal-copy'): sw_dir = os.environ['VO_CMS_SW_DIR'] elif 'OSG_APP' in os.environ and not find_executable('gfal-copy'): sw_dir = os.path.join(os.environ['OSG_APP'], 'cmssoft', 'cms') if not os.path.exists(sw_dir): sw_dir = os.environ['OSG_APP'] elif 'CVMFS' in os.environ: sw_dir = os.path.join(os.environ['CVMFS'], 'cms.cern.ch') elif os.path.isdir('/cvmfs/cms.cern.ch'): sw_dir = '/cvmfs/cms.cern.ch' else: return print_summary("None of $VO_CMS_SW_DIR, $OSG_APP, $CVMFS, or /cvmfs/cms.cern.ch available", NAG_CRITICAL) sw_dir = '/cvmfs/cms.cern.ch' log.info("Using software directory %s" % sw_dir) sw_setup_script = os.path.join(sw_dir, 'cmsset_default.sh') if not os.path.isfile(sw_setup_script): return print_summary("The software setup script (%s) is missing" % sw_setup_script, NAG_CRITICAL) log.info("CMS software setup script (%s) is present." % sw_setup_script) scram_arch, _ = runCommand("/cvmfs/cms.cern.ch/common/cmsarch", [], opts) if 'slc6' in scram_arch: scram_arch = 'slc6_amd64_gcc530' elif 'slc7' in scram_arch: scram_arch = 'slc7_amd64_gcc530' log.info("Using SCRAM_ARCH=%s" % scram_arch) exitCode = sourceFile(sw_setup_script, opts) if exitCode: return print_summary("Failed to source setup script %s." % sw_setup_script, NAG_CRITICAL) if 'CMS_PATH' not in os.environ: return print_summary("CMS_PATH not defined after sourcing %s" % sw_setup_script, NAG_CRITICAL) if not os.path.isdir(os.environ['CMS_PATH']): return print_summary("CMS_PATH %s is not a directory." % os.environ['CMS_PATH'], NAG_CRITICAL) # Parse the local config file and find site name job_config_dir = os.path.join(os.environ['CMS_PATH'], 'SITECONF', 'local', 'JobConfig') if not os.path.isdir(job_config_dir): return print_summary("JobConfig directory %s does not exist" % job_config_dir, NAG_CRITICAL) config_file = os.path.join(job_config_dir, 'site-local-config.xml') if not os.path.isfile(config_file): return print_summary("Local configuration file %s does not exist." % config_file, NAG_CRITICAL) log.info("Local configuration file: %s" % config_file) fd = open(config_file, 'r') tree = ET.parse(fd) root = tree.getroot() numCatalogs = 0 for siteinfo in root.findall('site'): siteName = siteinfo.get('name') log.info("Site name: %s" % siteName) for event in siteinfo.findall('event-data'): for catalog in event.findall('catalog'): numCatalogs = numCatalogs + 1 fd.close() if numCatalogs == 1: log.info("Only 1 catalog line in site-local-config.xml file -- may be wrong configuration.") # Setup CMSSW, run command rndm_file = random.randint(0, len(CSWNFB_FILES) - 1) log.info("Fallback file: %s" % CSWNFB_FILES[rndm_file]) # rndm_site = random.randint(0, len(CSWNFB_SITES) - 1) if ( CSWNFB_SITES[rndm_site] == siteName ): rndm_site = (rndm_site + 1) % len(CSWNFB_SITES) log.info("Fallback site: %s" % CSWNFB_SITES[rndm_site]) log.info("Xrootd file we will test: %s" % CSWNFB_FILES[rndm_file]) shutil.rmtree("xrootd-fallback", 1) try: os.mkdir("xrootd-fallback") except OSError, oe: return print_summary("Failure to create test directory 'xrootd-fallback': %s." % str(oe), NAG_CRITICAL) os.chdir("xrootd-fallback") cmssw_version="CMSSW_9_2_6" stdout, exitCode = runCommand("scramv1", ["-a", scram_arch, "p", "CMSSW", cmssw_version], opts, combineStd=True) if stdout.strip(): log.info("scramv1 p CMSSW %s output:" % cmssw_version) print stdout if exitCode: return print_summary("Cannot make %s release area (SCRAM_ARCH=%s)" % (cmssw_version, scram_arch), NAG_CRITICAL) log.info("scramv1 p CMSSW %s successful", cmssw_version) os.chdir(cmssw_version) level = 1 if opts.verbose: level = 2 exitCode = evalCommand("scramv1", ["runtime", "-sh"], opts) if exitCode: return print_summary("Failure when setting up scramv1 runtime (CMSSW %s, SCRAM_ARCH=%s)" % (cmssw_version, scram_arch), NAG_CRITICAL) no_trial = 0 while ( no_trial < 3 ): if (opts.endtime - time.time()) < 60: log.error("Timed out before reaching 3rd attempt") exitCode = 8015 break xrootd_file = "/store/test/xrootd/" + CSWNFB_SITES[rndm_site] + CSWNFB_FILES[rndm_file] log.info("Xrootd fullpath: %s" % xrootd_file) fd = open("test_xrootd.py", "w") fd.write(cms_file % (xrootd_file, level)) fd.close() if True: os.environ["XRD_LOGLEVEL"] = "Debug" stdout, exitCode = runCommand("cmsRun", ["test_xrootd.py"], opts, 150, combineStd=True) no_trial += 1 for line in stdout.split('\n'): if re.search('opened', line) or re.search('redirect', line) or re.search('Reading', line) or re.search('server', line): print line maxlen = 12*1024 if opts.verbose: maxlen = 50*1024 if len(stdout) > maxlen: stdout = "cmsRun output truncated - only last %d KB shown:\n" % (maxlen/1024) + stdout[-(maxlen):] if ( exitCode == 0 ): break log.error("Failed cmsRun output:") print stdout if (opts.endtime - time.time()) < 60: log.error("Not enough time left for another try") exitCode = 8015 break rndm_site = (rndm_site + 1) % len(CSWNFB_SITES) if ( CSWNFB_SITES[rndm_site] == siteName ): rndm_site = (rndm_site + 1) % len(CSWNFB_SITES) log.info("Retrying with fallback site: %s" % CSWNFB_SITES[rndm_site]) if exitCode: if exitCode == 8015: returnCode = NAG_WARNING return print_summary("Test reached timeout ; exit code %s" % exitCode, returnCode) returnCode = NAG_CRITICAL return print_summary("Failed cmsRun; exit code %d" % exitCode, returnCode) log.info("Successful cmsRun.") # Return the correct exit code. log.info("Success!") return NAG_OK if __name__ == '__main__': sys.exit(main())