CE-cms-xrootd-fallback 15.4 KB
Newer Older
1
2
3
4
5
6
7
#!/usr/bin/env python

import os
import sys
import time
import errno
import fcntl
8
import re
9
10
11
12
13
import select
import signal
import socket
import logging
import optparse
14
import shutil
Andrea Sciaba's avatar
Andrea Sciaba committed
15
from distutils.spawn import find_executable
16
import xml.etree.ElementTree as ET
Andrea Sciaba's avatar
Andrea Sciaba committed
17
import random
18
19
20
21
22
23
24
25
26
27
28
29
# ########################################################################### #


CSWNFB_FILES = [ "/store/mc/SAM/GenericTTbar/AODSIM/" + \
                    "CMSSW_9_2_6_91X_mcRun1_realistic_v2-v1/00000/" + \
                    "A64CCCF2-5C76-E711-B359-0CC47A78A3F8.root",
                 "/store/mc/SAM/GenericTTbar/AODSIM/" + \
                    "CMSSW_9_2_6_91X_mcRun1_realistic_v2-v1/00000/" + \
                    "AE237916-5D76-E711-A48C-FA163EEEBFED.root",
                 "/store/mc/SAM/GenericTTbar/AODSIM/" + \
                    "CMSSW_9_2_6_91X_mcRun1_realistic_v2-v1/00000/" + \
                    "CE860B10-5D76-E711-BCA8-FA163EAA761A.root" ]
30
31
32
CSWNFB_SITES = ["T1_FR_CCIN2P3", "T2_US_Nebraska", "T1_RU_JINR", "T2_UK_London_Brunel", \
                "T2_CN_Beijing", "T2_BE_IIHE", "T2_FR_GRIF_LLR", "T2_HU_Budapest", \
                "T2_UK_London_IC", "T2_US_Wisconsin"]
33

34

35
36
37
38

log = logging.getLogger()

cms_file = """
39
40
41
import FWCore.ParameterSet.Config as cms
process = cms.Process('XrootdTest')
process.source = cms.Source('PoolSource',
42
  fileNames = cms.untracked.vstring("%s"),
43
44
45
)

process.SiteLocalConfigService = cms.Service("SiteLocalConfigService",
46
  debugLevel = cms.untracked.uint32(%d),
47
  overrideSourceCacheHintDir = cms.untracked.string("application-only"),
48
49
)

50
51
52
53
54
55
process.dump = cms.EDAnalyzer("EventContentAnalyzer",
  listContent=cms.untracked.bool(False),
  verboseForModuleLabels = cms.untracked.vstring("recoTracks_generalTracks"),
  getDataForModuleLabels=cms.untracked.vstring("recoTracks_generalTracks"),
  getData=cms.untracked.bool(True),
)
56
process.load("FWCore.MessageService.MessageLogger_cfi")
57
process.MessageLogger.cerr.FwkReport.reportEvery = 1
58
59

process.maxEvents = cms.untracked.PSet(
60
    input = cms.untracked.int32(1)
61
62
)

63
process.p = cms.EndPath(process.dump)
64
65
66
"""

def configure_logging(lvl=logging.INFO):
67
    logger = logging.getLogger("cms.CE.xrootd-fallback")
68
69
70
71
72
73
74
75
76
77
78
79
80
    logger.setLevel(lvl)
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter(fmt="[%(process)d] %(asctime)s [%(levelname)07s]: %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    global log
    log = logger

def print_summary(summary, retval):
    if retval == 0:
        log.info(summary)
    else:
        log.error(summary)
81
    print "Summary: %s" % summary
82
83
84
85
86
87
    return retval

def parse_opts():
    parser = optparse.OptionParser()
    parser.add_option("-v", "--verbose", dest="verbose", help="Increase logging verbosity", action="store_true", default=False)
    parser.add_option("-H", "--host", dest="hostname", help="Hostname to use")
88
    parser.add_option("-t", "--timeout", dest="timeout", help="Test timeout in seconds; default is 300", default=300, type="int")
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115

    opts, args = parser.parse_args()

    opts.starttime = time.time()
    opts.endtime = opts.starttime + opts.timeout

    if opts.verbose:
        configure_logging(logging.DEBUG)
    else:
        configure_logging()
    return args, opts

def getExitCode(name, default):
    try:
        return int(os.environ.get(name, default))
    except:
        return 0

def runCommandChild(cmd, args):
    try:
        try:
            os.execvpe(cmd, [cmd]+args, os.environ)
        except OSError, e:
            print "Error exec'ing %s: %s\n" % (cmd, str(e))
    finally:
        os._exit(127)

116
def runCommandParent(r, pid, opts, cmsruntimeout):
117
118
119
120
121
122
    flags = fcntl.fcntl(r, fcntl.F_GETFL)
    flags |= os.O_NONBLOCK
    fcntl.fcntl(r, fcntl.F_SETFL, flags)
    xlist = []
    rlist = [r]
    wlist = []
123
124
125
126
127
    if cmsruntimeout > 0:
        endtime = min(opts.endtime, time.time() + cmsruntimeout)
    else:
        endtime = opts.endtime
    timeout = endtime - time.time()
128
129
130
131
    stdout = ""
    exitCode = -1
    while (timeout >= 0) and (r not in xlist):
        rlist, wlist, xlist = select.select(rlist, wlist, xlist, timeout)
132
        timeout = endtime - time.time()
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
        if r in rlist:
            newstr = os.read(r, 1024)
            stdout += newstr
            while newstr:
                try:
                    newstr = os.read(r, 1024)
                except OSError, oe:
                    if oe.errno == errno.EAGAIN:
                        newstr = ''
                    else:
                        raise
                stdout += newstr
        rlist = [r]
        mypid, exitCode = os.waitpid(pid, os.WNOHANG)
        if mypid:
            break
        exitCode = -1
    if (timeout < 0) and (exitCode < 0):
        os.kill(pid, signal.SIGKILL)
152
        log.error("Killed CMSSW child (pid %d) due to timeout." % pid)
153
154
155
156
    if exitCode < 0:
        pid, exitCode = os.waitpid(pid, 0)
    return stdout, exitCode

157
def runCommand(cmd, args, opts, cmsruntimeout=0, combineStd=False):
158
159
160
161
162
    r, w = os.pipe()
    try:
        pid = os.fork()
        if pid: # parent
            os.close(w)
163
            return runCommandParent(r, pid, opts, cmsruntimeout)
164
165
166
        else:
            os.close(r)
            os.dup2(w, 1)
167
168
            if combineStd:
                os.dup2(w, 2)
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
            os.close(w)
            runCommandChild(cmd, args)
    finally:
        try:
            os.close(r)
        except OSError:
            pass
        try:
            os.close(w)
        except OSError:
            pass

def sourceFile(filename, opts):
    if not os.path.exists(filename):
        log.warn("Trying to source a file (%s) which does not exist" % filename)
        return

186
    stdout, exitCode = runCommand("/bin/sh", ["-c", "source %s && env -0" % filename], opts)
187
188
    if exitCode:
        return exitCode
189
    for line in stdout.split('\0'):
190
191
192
193
194
195
        info = line.split("=", 1)
        if len(info) == 2:
            os.environ[info[0]] = info[1]
    return exitCode

def evalCommand(cmd, args, opts):
196
    stdout, exitCode = runCommand('/bin/sh', ['-c', 'eval `%s %s` && env -0' % (cmd, ' '.join(args))], opts)
197
198
    if exitCode:
        return exitCode
199
    for line in stdout.split('\0'):
200
201
202
203
204
205
206
207
208
209
210
        info = line.split("=", 1)
        if len(info) == 2:
            os.environ[info[0]] = info[1]
    return exitCode

def main():
    # Change to test directory
    dirname = os.path.split(sys.argv[0])[0]
    if dirname:
        os.chdir(dirname)

211
212
213
214
215
216
    # Note we leak the file descriptor - this way, we hold the lock until
    # the probe finishes
    try:
        fd = os.open('cmssw_lock', os.O_RDWR|os.O_CREAT)
        fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.FD_CLOEXEC)
        fcntl.flock(fd, fcntl.LOCK_EX)
Andrea Sciaba's avatar
Andrea Sciaba committed
217
    except (OSError, IOError), oe:
218
219
220
        # Move on - might as well finish running.
        log.exception(oe)

221
222
223
    _, opts = parse_opts()

    # For now, never give failures - just warnings
224
225
    NAG_CRITICAL = getExitCode("SAME_ERROR", 50)  # Maps to Nagios CRITICAL
    NAG_WARNING = getExitCode("SAME_WARNING", 40)
Andrea Sciaba's avatar
Andrea Sciaba committed
226
    NAG_OK = getExitCode("SAME_OK", 10)
227
228
229
230
231
232
233
234
235
236
237
238
239
240

    now = time.strftime("%x %X", time.gmtime())
    currdir = os.path.abspath(os.curdir)
    host = socket.gethostname()

    pilot_user_info, exitCode = runCommand("/usr/bin/id", ["-u"], opts)
    if exitCode:
        return print_summary("Failed to run 'id -u' to get user information", NAG_CRITICAL)
    pilot_user_info = pilot_user_info.strip()
    log.info("Ran at %s on host %s, directory %s, as user %s" % (now, host, currdir, pilot_user_info))

    pilot_uid = os.geteuid()

    pilot_proxy = os.environ.get("X509_USER_PROXY", "/tmp/x509up_u%d" % pilot_uid)
241
    cert_dir = os.environ.get("X509_CERT_DIR", "undefined")
242
243
244
245
    if not os.path.isfile(pilot_proxy):
        return print_summary("X509_USER_PROXY=%s is not a file" % pilot_proxy, NAG_CRITICAL)

    log.info("X509_USER_PROXY=%s" % pilot_proxy)
246
    log.info("X509_CERT_DIR=%s" % cert_dir)
247

248
# Workaround to suppress voms errors on OSG
249
250
    os.environ["VOMS_PROXY_INFO_DONT_VERIFY_AC"] = "1"

Andrea Sciaba's avatar
Andrea Sciaba committed
251
    if 'OSG_GRID' in os.environ and not find_executable('gfal-copy'):
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
        osg_setup = os.path.join(os.environ['OSG_GRID'], 'setup.sh')
        if not os.path.isfile(osg_setup):
            log.warning("$OSG_GRID is defined (%s), but %s does not exist." % (os.environ['OSG_GRID'], osg_setup))
        else:
            exitCode = sourceFile(osg_setup, opts)
            if exitCode:
                return print_summary("Failed to source %s." % osg_setup, NAG_CRITICAL)

    dn, exitCode = runCommand("voms-proxy-info", ["--identity"], opts)
    dn = dn.strip()
    if exitCode:
        log.warning("Unable to determine DN from voms-proxy-info")
    else:
        log.info("DN: %s" % dn)
    fqan, exitCode = runCommand("voms-proxy-info", ["--fqan"], opts)
    if fqan:
        fqan = fqan.splitlines()[0]
    if exitCode:
        log.warning("Unable to determine primary FQAN from voms-proxy-info")
    else:
        log.info("Primary FQAN: %s" % fqan)
273
274
275
    proxyTime, exitCode = runCommand("voms-proxy-info", ["--timeleft"], opts)
    if exitCode:
        log.warning("Unable to determine time left for proxy from voms-proxy-info")
276
277
278
279
280
    else:
        if proxyTime:
            proxyTime = int(proxyTime.splitlines()[0])
            if proxyTime == 0:
                return print_summary("VOMS proxy has expired.", NAG_CRITICAL)
281
282

    # Set the CMS environment
Andrea Sciaba's avatar
Andrea Sciaba committed
283
    if 'VO_CMS_SW_DIR' in os.environ and not find_executable('gfal-copy'):
284
        sw_dir = os.environ['VO_CMS_SW_DIR']
Andrea Sciaba's avatar
Andrea Sciaba committed
285
    elif 'OSG_APP' in os.environ and not find_executable('gfal-copy'):
286
        sw_dir = os.path.join(os.environ['OSG_APP'], 'cmssoft', 'cms')
287
288
        if not os.path.exists(sw_dir):
            sw_dir = os.environ['OSG_APP']
289
290
291
292
293
294
    elif 'CVMFS' in os.environ:
        sw_dir = os.path.join(os.environ['CVMFS'], 'cms.cern.ch')
    elif os.path.isdir('/cvmfs/cms.cern.ch'):
        sw_dir = '/cvmfs/cms.cern.ch'
    else:
        return print_summary("None of $VO_CMS_SW_DIR, $OSG_APP, $CVMFS, or /cvmfs/cms.cern.ch available", NAG_CRITICAL)
Andrea Sciaba's avatar
Andrea Sciaba committed
295
    sw_dir = '/cvmfs/cms.cern.ch'
296
297
298
299
300
301
    log.info("Using software directory %s" % sw_dir)

    sw_setup_script = os.path.join(sw_dir, 'cmsset_default.sh')
    if not os.path.isfile(sw_setup_script):
        return print_summary("The software setup script (%s) is missing" % sw_setup_script, NAG_CRITICAL)
    log.info("CMS software setup script (%s) is present." % sw_setup_script)
302
303
304
305
306
307

    scram_arch, _ = runCommand("/cvmfs/cms.cern.ch/common/cmsarch", [], opts)
    if 'slc6' in scram_arch:
        scram_arch = 'slc6_amd64_gcc530'
    elif 'slc7' in scram_arch:
        scram_arch = 'slc7_amd64_gcc530'
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
    log.info("Using SCRAM_ARCH=%s" % scram_arch)

    exitCode = sourceFile(sw_setup_script, opts)
    if exitCode:
        return print_summary("Failed to source setup script %s." % sw_setup_script, NAG_CRITICAL)

    if 'CMS_PATH' not in os.environ:
        return print_summary("CMS_PATH not defined after sourcing %s" % sw_setup_script, NAG_CRITICAL)
    if not os.path.isdir(os.environ['CMS_PATH']):
        return print_summary("CMS_PATH %s is not a directory." % os.environ['CMS_PATH'], NAG_CRITICAL)

    # Parse the local config file and find site name
    job_config_dir = os.path.join(os.environ['CMS_PATH'], 'SITECONF', 'local', 'JobConfig')
    if not os.path.isdir(job_config_dir):
        return print_summary("JobConfig directory %s does not exist" % job_config_dir, NAG_CRITICAL)

    config_file = os.path.join(job_config_dir, 'site-local-config.xml')
    if not os.path.isfile(config_file):
        return print_summary("Local configuration file %s does not exist." % config_file, NAG_CRITICAL)
    log.info("Local configuration file: %s" % config_file)

329
330
331
332
333
334
335
336
    fd = open(config_file, 'r')
    tree = ET.parse(fd)
    root = tree.getroot()
    numCatalogs = 0
    for siteinfo in root.findall('site'):
        siteName = siteinfo.get('name')
        log.info("Site name: %s" % siteName)
        for event in siteinfo.findall('event-data'):
Andrea Sciaba's avatar
Andrea Sciaba committed
337
338
            for catalog in event.findall('catalog'):
                numCatalogs = numCatalogs + 1
339
340
    fd.close()
    if numCatalogs == 1:
341
        log.info("Only 1 catalog line in site-local-config.xml file -- may be wrong configuration.")
342

343
344

    # Setup CMSSW, run command
345
346
347
348
349
350
351
352
    rndm_file = random.randint(0, len(CSWNFB_FILES) - 1)
    log.info("Fallback file: %s" % CSWNFB_FILES[rndm_file])
    #
    rndm_site = random.randint(0, len(CSWNFB_SITES) - 1)
    if ( CSWNFB_SITES[rndm_site] == siteName ):
        rndm_site = (rndm_site + 1) % len(CSWNFB_SITES)
    log.info("Fallback site: %s" % CSWNFB_SITES[rndm_site])

353
    log.info("Xrootd file we will test: %s" % CSWNFB_FILES[rndm_file])
354

355
    shutil.rmtree("xrootd-fallback", 1)
356
357
358
359
360
361
    try:
        os.mkdir("xrootd-fallback")
    except OSError, oe:
        return print_summary("Failure to create test directory 'xrootd-fallback': %s." % str(oe), NAG_CRITICAL)
    os.chdir("xrootd-fallback")

Andrea Sciaba's avatar
Andrea Sciaba committed
362
    cmssw_version="CMSSW_9_2_6"
363
    stdout, exitCode = runCommand("scramv1", ["-a", scram_arch, "p", "CMSSW", cmssw_version], opts, combineStd=True)
364
365
366
    if stdout.strip():
        log.info("scramv1 p CMSSW %s output:" % cmssw_version)
        print stdout
367
368
    if exitCode:
        return print_summary("Cannot make %s release area (SCRAM_ARCH=%s)" % (cmssw_version, scram_arch), NAG_CRITICAL)
369
    log.info("scramv1 p CMSSW %s successful", cmssw_version)
370
371
372

    os.chdir(cmssw_version)

373
374
375
376
    level = 1
    if opts.verbose:
        level = 2

377
378
379
380
    exitCode = evalCommand("scramv1", ["runtime", "-sh"], opts)
    if exitCode:
        return print_summary("Failure when setting up scramv1 runtime (CMSSW %s, SCRAM_ARCH=%s)" % (cmssw_version, scram_arch), NAG_CRITICAL)

381
382
    no_trial = 0
    while ( no_trial < 3 ):
383
        if (opts.endtime - time.time()) < 60:
384
            log.error("Timed out before reaching 3rd attempt")
385
386
387
            exitCode = 8015
            break

388
389
390
391
392
393
394
        xrootd_file = "/store/test/xrootd/" + CSWNFB_SITES[rndm_site] + CSWNFB_FILES[rndm_file]
        log.info("Xrootd fullpath: %s" % xrootd_file)

        fd = open("test_xrootd.py", "w")
        fd.write(cms_file % (xrootd_file, level))
        fd.close()

Andrea Sciaba's avatar
Andrea Sciaba committed
395
396
397
        if True:
            os.environ["XRD_LOGLEVEL"] = "Debug"

398
399
        stdout, exitCode = runCommand("cmsRun", ["test_xrootd.py"], opts, 150, combineStd=True)

400
401
        no_trial += 1
        for line in stdout.split('\n'):
402
            if re.search('opened', line) or re.search('redirect', line) or re.search('Reading', line) or re.search('server', line):
Andrea Sciaba's avatar
Andrea Sciaba committed
403
                print line
404

405
406
407
408
409
410
411
412
413
        maxlen = 12*1024
        if opts.verbose:
            maxlen = 50*1024

        if len(stdout) > maxlen:
            stdout = "cmsRun output truncated - only last %d KB shown:\n" % (maxlen/1024) + stdout[-(maxlen):]

        if ( exitCode == 0 ):
            break
414

415
416
417
        log.error("Failed cmsRun output:")
        print stdout

418
419
        if (opts.endtime - time.time()) < 60:
            log.error("Not enough time left for another try")
420
            exitCode = 8015
421
422
            break 

423
424
425
        rndm_site = (rndm_site + 1) % len(CSWNFB_SITES)
        if ( CSWNFB_SITES[rndm_site] == siteName ):
            rndm_site = (rndm_site + 1) % len(CSWNFB_SITES)
426
        log.info("Retrying with fallback site: %s" % CSWNFB_SITES[rndm_site])
427
428

    if exitCode:
429
430
        if exitCode == 8015:
            returnCode = NAG_WARNING
431
            return print_summary("Test reached timeout ; exit code %s" % exitCode, returnCode)
432
433
        returnCode = NAG_CRITICAL
        return print_summary("Failed cmsRun; exit code %d" % exitCode, returnCode)
434

435
    log.info("Successful cmsRun.")
436
437

    # Return the correct exit code.
Andrea Sciaba's avatar
Andrea Sciaba committed
438
439
    log.info("Success!")
    return NAG_OK
440
441
442
443

if __name__ == '__main__':
    sys.exit(main())