#!/usr/bin/env python
# CE-cms-xrootd-fallback

import os
import sys
import time
import errno
import fcntl
8
import re
9
10
11
12
13
import select
import signal
import socket
import logging
import optparse
14
import shutil
15
16
import xml.etree.ElementTree as ET

17
18
19
20

# Module-wide logger handle.  It starts out as the root logger and is rebound
# to the "cms.CE.xrootd-access" logger by configure_logging().
log = logging.getLogger()

# Template for the CMSSW configuration written to test_xrootd.py by main().
# It takes two %-format arguments, in this order:
#   %s - the file name handed to PoolSource.fileNames
#   %d - the SiteLocalConfigService debugLevel
cms_file = """
import FWCore.ParameterSet.Config as cms
process = cms.Process('XrootdTest')
process.source = cms.Source('PoolSource',
  fileNames = cms.untracked.vstring("%s"),
)

process.SiteLocalConfigService = cms.Service("SiteLocalConfigService",
  debugLevel = cms.untracked.uint32(%d),
  overrideSourceCacheHintDir = cms.untracked.string("application-only"),
)

process.dump = cms.EDAnalyzer("EventContentAnalyzer", listContent=cms.untracked.bool(False), getData=cms.untracked.bool(True))
process.load("FWCore.MessageService.MessageLogger_cfi")
process.MessageLogger.cerr.FwkReport.reportEvery = 1

process.maxEvents = cms.untracked.PSet(
    input = cms.untracked.int32(10)
)

process.p = cms.EndPath(process.dump)
"""

def configure_logging(lvl=logging.INFO):
    """Set up the probe logger and make it the module-wide ``log``.

    The "cms.CE.xrootd-access" logger is set to level ``lvl`` and given a
    new stream handler that writes to ``sys.stdout`` with a
    "[pid] time [level]: message" layout.  Every call adds one more handler.
    """
    global log

    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(
        logging.Formatter(fmt="[%(process)d] %(asctime)s [%(levelname)07s]: %(message)s")
    )

    probe_logger = logging.getLogger("cms.CE.xrootd-access")
    probe_logger.setLevel(lvl)
    probe_logger.addHandler(stream_handler)

    log = probe_logger

def print_summary(summary, retval):
    """Report the probe outcome and hand ``retval`` back to the caller.

    ``summary`` is logged at INFO level when ``retval`` is 0 and at ERROR
    level otherwise, then printed on stdout as a "summary: ..." line.
    Returns ``retval`` unchanged so callers can ``return print_summary(...)``.
    """
    emit = log.info if retval == 0 else log.error
    emit(summary)
    print("summary: %s" % summary)
    return retval

def parse_opts():
    """Parse the command line and configure logging.

    Recognised options are -v/--verbose, -H/--host and -t/--timeout
    (seconds, default 240).  Two attributes are added to the parsed options:
    ``starttime`` (the current time) and ``endtime`` (``starttime`` plus the
    timeout).  Logging is configured at DEBUG level when --verbose is given,
    INFO otherwise.

    Returns a ``(args, opts)`` tuple: positional arguments first, then the
    options object.
    """
    parser = optparse.OptionParser()
    parser.add_option("-v", "--verbose", action="store_true", dest="verbose",
                      default=False, help="Increase logging verbosity")
    parser.add_option("-H", "--host", dest="hostname", help="Hostname to use")
    parser.add_option("-t", "--timeout", type="int", dest="timeout", default=240,
                      help="Test timeout in seconds; default is 240")

    opts, args = parser.parse_args()

    started = time.time()
    opts.starttime = started
    opts.endtime = started + opts.timeout

    configure_logging(logging.DEBUG if opts.verbose else logging.INFO)
    return args, opts

def getExitCode(name, default):
    """Return the integer exit code stored in environment variable ``name``.

    ``default`` is used when the variable is unset, and also when its value
    is not a valid integer, so that a malformed override does not silently
    turn the code into 0.  0 is returned only if ``default`` itself cannot
    be converted to an integer.
    """
    try:
        return int(os.environ.get(name, default))
    except (TypeError, ValueError):
        pass
    try:
        return int(default)
    except (TypeError, ValueError):
        return 0

def runCommandChild(cmd, args):
    """Replace the current (forked) process with ``cmd``; never returns.

    ``cmd`` is looked up on PATH and executed with argument vector
    ``[cmd] + args`` and the current ``os.environ``.  If the exec fails, an
    error message is written to file descriptor 1 and the process exits with
    status 127.
    """
    try:
        os.execvpe(cmd, [cmd] + args, os.environ)
    except OSError as e:
        # Write straight to fd 1 instead of using print: os._exit() below
        # does not flush stdio buffers, so a buffered message would be lost.
        # The trailing "\n" reproduces the newline print used to append.
        message = "Error exec'ing %s: %s\n" % (cmd, str(e)) + "\n"
        if not isinstance(message, bytes):
            message = message.encode("utf-8", "replace")
        os.write(1, message)
    finally:
        os._exit(127)

def runCommandParent(r, pid, opts):
    """Collect a child's output until it exits or the deadline passes.

    Parameters:
        r    -- read end of the pipe connected to the child's output; it is
                switched to non-blocking mode here.
        pid  -- process id of the child to wait for.
        opts -- object whose ``endtime`` attribute is the absolute deadline
                (seconds since the epoch, as returned by ``time.time()``).

    If the deadline passes before the child exits, the child is sent SIGKILL
    and then reaped.

    Returns ``(stdout, exitCode)``: everything read from ``r`` and the raw
    status value reported by ``os.waitpid`` (not a decoded exit code).
    """
    flags = fcntl.fcntl(r, fcntl.F_GETFL)
    fcntl.fcntl(r, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    chunks = []

    def drain():
        # Read whatever is available right now; stop at EOF or when the
        # non-blocking pipe has nothing more to give (EAGAIN).
        while True:
            try:
                data = os.read(r, 1024)
            except OSError as oe:
                if oe.errno == errno.EAGAIN:
                    return
                raise
            if not data:
                return
            chunks.append(data)

    exitCode = -1
    timeout = opts.endtime - time.time()
    while timeout >= 0:
        readable, _, _ = select.select([r], [], [], timeout)
        timeout = opts.endtime - time.time()
        if readable:
            drain()
        mypid, status = os.waitpid(pid, os.WNOHANG)
        if mypid:
            exitCode = status
            break

    if (timeout < 0) and (exitCode < 0):
        os.kill(pid, signal.SIGKILL)
        print("Killed CMSSW child (pid %d) due to timeout." % pid)
    if exitCode < 0:
        pid, exitCode = os.waitpid(pid, 0)

    # The child may have written more between the last read and its exit;
    # pick that up so the tail of the output is not lost.
    drain()
    return b"".join(chunks), exitCode

131
def runCommand(cmd, args, opts, combineStd=False):
    """Run ``cmd`` with ``args`` in a child process and capture its output.

    Parameters:
        cmd        -- program to execute (passed to runCommandChild).
        args       -- list of arguments following the program name.
        opts       -- options object; passed to runCommandParent, which reads
                      its ``endtime`` deadline.
        combineStd -- when true, the child's stderr is captured together
                      with its stdout.

    Returns the ``(stdout, exitCode)`` pair produced by runCommandParent.
    """
    r, w = os.pipe()
    try:
        pid = os.fork()
        if pid:  # parent
            os.close(w)
            w = None  # already closed; keep the cleanup below from closing it again
            return runCommandParent(r, pid, opts)

        # child: wire the pipe to stdout (and optionally stderr), then exec.
        try:
            os.close(r)
            os.dup2(w, 1)
            if combineStd:
                os.dup2(w, 2)
            os.close(w)
            runCommandChild(cmd, args)
        finally:
            # The child must never fall through into the caller's code, even
            # if the descriptor setup above raised.
            os._exit(127)
    finally:
        for fd in (r, w):
            if fd is None:
                continue
            try:
                os.close(fd)
            except OSError:
                pass

def sourceFile(filename, opts):
    """Source a shell script and import the resulting environment.

    ``/bin/sh -c`` sources ``filename`` and then dumps its environment with
    ``env -0``; every NAME=VALUE entry in that dump is copied into
    ``os.environ``.  Variables the script unsets are not removed from
    ``os.environ``.

    Parameters:
        filename -- path of the script to source.
        opts     -- options object passed through to runCommand.

    Returns None (after logging a warning) if ``filename`` does not exist,
    otherwise the status returned by runCommand; a non-zero status means
    the environment was left untouched.
    """
    if not os.path.exists(filename):
        log.warning("Trying to source a file (%s) which does not exist" % filename)
        return None

    # Single-quote the path so that spaces or shell metacharacters in it are
    # not interpreted by the shell.
    quoted = "'" + filename.replace("'", "'\\''") + "'"
    stdout, exitCode = runCommand("/bin/sh", ["-c", "source %s && env -0" % quoted], opts)
    if exitCode:
        return exitCode

    for entry in stdout.split('\0'):
        name, sep, value = entry.partition("=")
        if sep and name:
            os.environ[name] = value
    return exitCode

def evalCommand(cmd, args, opts):
    """Eval a command's output in a shell and import the resulting environment.

    ``/bin/sh -c`` runs ``cmd`` with ``args``, evals whatever it prints, and
    then dumps the environment with ``env -0``; every NAME=VALUE entry in
    that dump is copied into ``os.environ``.

    NOTE(review): ``cmd`` and ``args`` are interpolated into the shell
    command line unquoted, so they must be trusted, shell-safe words.

    Parameters:
        cmd  -- command whose output is eval'ed.
        args -- list of arguments for ``cmd``.
        opts -- options object passed through to runCommand.

    Returns the status returned by runCommand; a non-zero status means the
    environment was left untouched.
    """
    script = 'eval `%s %s` && env -0' % (cmd, ' '.join(args))
    stdout, exitCode = runCommand('/bin/sh', ['-c', script], opts)
    if exitCode:
        return exitCode

    for entry in stdout.split('\0'):
        name, sep, value = entry.partition("=")
        if sep and name:
            os.environ[name] = value
    return exitCode

def main():
    """Run the xrootd-fallback probe and return its exit code.

    Steps, in order: change to the script's directory and take an exclusive
    lock on 'cmssw_lock'; parse options; report host, user and proxy
    information; source the OSG and CMS software setup scripts; inspect
    site-local-config.xml; create a CMSSW release area under
    'xrootd-fallback'; write the cmsRun configuration from ``cms_file``;
    set up the scram runtime; run cmsRun on the test file.

    Exit codes are taken from the SAME_OK / SAME_WARNING / SAME_ERROR
    environment variables (defaults 10 / 40 / 50).  Any setup failure
    returns the error code.  A cmsRun failure returns the warning code when
    site-local-config.xml lists more than one catalog and the error code
    otherwise.  Success returns the OK code.
    """
    # Change to test directory
    dirname = os.path.split(sys.argv[0])[0]
    if dirname:
        os.chdir(dirname)

    # Note we leak the file descriptor - this way, we hold the lock until
    # the probe finishes
    try:
        lock_fd = os.open('cmssw_lock', os.O_RDWR | os.O_CREAT)
        # FD_CLOEXEC is a file *descriptor* flag, so it has to be set with
        # F_SETFD (not F_SETFL) to keep exec'ed commands from inheriting
        # the lock descriptor.
        fcntl.fcntl(lock_fd, fcntl.F_SETFD,
                    fcntl.fcntl(lock_fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
        fcntl.flock(lock_fd, fcntl.LOCK_EX)
    except (OSError, IOError) as oe:
        # Move on - might as well finish running.
        log.exception(oe)

    _, opts = parse_opts()

    # Exit codes understood by the calling framework; overridable through
    # the environment.
    NAG_CRITICAL = getExitCode("SAME_ERROR", 50)  # Maps to Nagios CRITICAL
    NAG_WARNING = getExitCode("SAME_WARNING", 40)
    NAG_OK = getExitCode("SAME_OK", 10)

    # Default to successful job
    exitcode = NAG_OK

    now = time.strftime("%x %X", time.gmtime())
    currdir = os.path.abspath(os.curdir)
    host = socket.gethostname()

    pilot_user_info, exitCode = runCommand("/usr/bin/id", ["-u"], opts)
    if exitCode:
        return print_summary("Failed to run 'id -u' to get user information", NAG_CRITICAL)
    pilot_user_info = pilot_user_info.strip()
    log.info("Ran at %s on host %s, directory %s, as user %s" % (now, host, currdir, pilot_user_info))

    pilot_uid = os.geteuid()

    pilot_proxy = os.environ.get("X509_USER_PROXY", "/tmp/x509up_u%d" % pilot_uid)
    if not os.path.isfile(pilot_proxy):
        return print_summary("X509_USER_PROXY=%s is not a file" % pilot_proxy, NAG_CRITICAL)

    log.info("X509_USER_PROXY=%s" % pilot_proxy)

    # Workaround to suppress voms errors on OSG
    os.environ["VOMS_PROXY_INFO_DONT_VERIFY_AC"] = "1"

    if 'OSG_GRID' in os.environ:
        osg_setup = os.path.join(os.environ['OSG_GRID'], 'setup.sh')
        if not os.path.isfile(osg_setup):
            log.warning("$OSG_GRID is defined (%s), but %s does not exist." % (os.environ['OSG_GRID'], osg_setup))
        else:
            exitCode = sourceFile(osg_setup, opts)
            if exitCode:
                return print_summary("Failed to source %s." % osg_setup, NAG_CRITICAL)

    # Proxy details: failures here are only warnings, except an expired proxy.
    dn, exitCode = runCommand("voms-proxy-info", ["--identity"], opts)
    dn = dn.strip()
    if exitCode:
        log.warning("Unable to determine DN from voms-proxy-info")
    else:
        log.info("DN: %s" % dn)

    fqan, exitCode = runCommand("voms-proxy-info", ["--fqan"], opts)
    if fqan:
        fqan = fqan.splitlines()[0]
    if exitCode:
        log.warning("Unable to determine primary FQAN from voms-proxy-info")
    else:
        log.info("Primary FQAN: %s" % fqan)

    proxyTime, exitCode = runCommand("voms-proxy-info", ["--timeleft"], opts)
    if exitCode:
        log.warning("Unable to determine time left for proxy from voms-proxy-info")
    elif proxyTime:
        try:
            proxyTime = int(proxyTime.splitlines()[0])
        except ValueError:
            # Unexpected output should not abort the probe with a traceback.
            log.warning("Unable to parse time left for proxy from voms-proxy-info output")
        else:
            if proxyTime == 0:
                return print_summary("VOMS proxy has expired.", NAG_CRITICAL)

    # Set the CMS environment
    if 'VO_CMS_SW_DIR' in os.environ:
        sw_dir = os.environ['VO_CMS_SW_DIR']
    elif 'OSG_APP' in os.environ:
        sw_dir = os.path.join(os.environ['OSG_APP'], 'cmssoft', 'cms')
        if not os.path.exists(sw_dir):
            sw_dir = os.environ['OSG_APP']
    elif 'CVMFS' in os.environ:
        sw_dir = os.path.join(os.environ['CVMFS'], 'cms.cern.ch')
    elif os.path.isdir('/cvmfs/cms.cern.ch'):
        sw_dir = '/cvmfs/cms.cern.ch'
    else:
        return print_summary("None of $VO_CMS_SW_DIR, $OSG_APP, $CVMFS, or /cvmfs/cms.cern.ch available", NAG_CRITICAL)
    log.info("Using software directory %s" % sw_dir)

    sw_setup_script = os.path.join(sw_dir, 'cmsset_default.sh')
    if not os.path.isfile(sw_setup_script):
        return print_summary("The software setup script (%s) is missing" % sw_setup_script, NAG_CRITICAL)
    log.info("CMS software setup script (%s) is present." % sw_setup_script)

    # Map the detected OS flavour onto the architecture used for the test
    # release; anything else is used as reported, minus surrounding
    # whitespace such as the trailing newline.
    scram_arch, _ = runCommand("/cvmfs/cms.cern.ch/common/cmsarch", [], opts)
    if 'slc6' in scram_arch:
        scram_arch = 'slc6_amd64_gcc530'
    elif 'slc7' in scram_arch:
        scram_arch = 'slc7_amd64_gcc530'
    else:
        scram_arch = scram_arch.strip()
    log.info("Using SCRAM_ARCH=%s" % scram_arch)

    exitCode = sourceFile(sw_setup_script, opts)
    if exitCode:
        return print_summary("Failed to source setup script %s." % sw_setup_script, NAG_CRITICAL)

    if 'CMS_PATH' not in os.environ:
        return print_summary("CMS_PATH not defined after sourcing %s" % sw_setup_script, NAG_CRITICAL)
    if not os.path.isdir(os.environ['CMS_PATH']):
        return print_summary("CMS_PATH %s is not a directory." % os.environ['CMS_PATH'], NAG_CRITICAL)

    # Parse the local config file and find site name
    job_config_dir = os.path.join(os.environ['CMS_PATH'], 'SITECONF', 'local', 'JobConfig')
    if not os.path.isdir(job_config_dir):
        return print_summary("JobConfig directory %s does not exist" % job_config_dir, NAG_CRITICAL)

    config_file = os.path.join(job_config_dir, 'site-local-config.xml')
    if not os.path.isfile(config_file):
        return print_summary("Local configuration file %s does not exist." % config_file, NAG_CRITICAL)
    log.info("Local configuration file: %s" % config_file)

    with open(config_file, 'r') as config_fd:
        root = ET.parse(config_fd).getroot()

    # Count the <catalog> entries under site/event-data; more than one is
    # taken below to mean that a fallback is configured.
    numCatalogs = 0
    for siteinfo in root.findall('site'):
        siteName = siteinfo.get('name')
        log.info("Site name: %s" % siteName)
        for event in siteinfo.findall('event-data'):
            numCatalogs += len(event.findall('catalog'))
    if numCatalogs == 1:
        log.info("Only 1 catalog line in site-local-config.xml file -- may be wrong configuration.")

    # Setup CMSSW, run command
    cmssw_file = "/store/mc/SAM/GenericTTbar/AODSIM/CMSSW_9_0_0_90X_mcRun1_realistic_v4-v1/10000/28B9D1FB-8B31-E711-AA4E-0025905B85B2.root"
    xrootd_file = "/store/test/xrootd/CMSSAM/%s" % cmssw_file
    log.info("Xrootd file we will test: %s" % xrootd_file)

    # Start from a fresh working directory; a missing one is not an error.
    shutil.rmtree("xrootd-fallback", ignore_errors=True)
    try:
        os.mkdir("xrootd-fallback")
    except OSError as oe:
        return print_summary("Failure to create test directory 'xrootd-fallback': %s." % str(oe), NAG_CRITICAL)
    os.chdir("xrootd-fallback")

    cmssw_version = "CMSSW_9_0_0"
    stdout, exitCode = runCommand("scramv1", ["-a", scram_arch, "p", "CMSSW", cmssw_version], opts, combineStd=True)
    if stdout.strip():
        log.info("scramv1 p CMSSW %s output:" % cmssw_version)
        print(stdout)
    if exitCode:
        return print_summary("Cannot make %s release area (SCRAM_ARCH=%s)" % (cmssw_version, scram_arch), NAG_CRITICAL)
    log.info("scramv1 p CMSSW %s successful", cmssw_version)

    os.chdir(cmssw_version)

    # SiteLocalConfigService debug level written into the configuration.
    level = 1
    if opts.verbose:
        level = 2

    with open("test_xrootd.py", "w") as cfg_fd:
        cfg_fd.write(cms_file % (xrootd_file, level))

    exitCode = evalCommand("scramv1", ["runtime", "-sh"], opts)
    if exitCode:
        return print_summary("Failure when setting up scramv1 runtime (CMSSW %s, SCRAM_ARCH=%s)" % (cmssw_version, scram_arch), NAG_CRITICAL)

    stdout, exitCode = runCommand("cmsRun", ["test_xrootd.py"], opts, combineStd=True)

    # Always show the lines that mention file opens or redirections.
    for line in stdout.split('\n'):
        if 'opened' in line or 'redirect' in line:
            print(line)

    # Cap the amount of cmsRun output echoed into the report.
    maxlen = 12 * 1024
    if opts.verbose:
        maxlen = 50 * 1024

    if len(stdout) > maxlen:
        stdout = "cmsRun output truncated - only last %d KB shown:\n" % (maxlen // 1024) + stdout[-maxlen:]

    if exitCode:
        log.error("Failed cmsRun.  Output:")
        print(stdout)
        returnCode = NAG_CRITICAL
        if numCatalogs > 1:   # Fallback correctly configured, so only WARN
            returnCode = NAG_WARNING
        return print_summary("Failed cmsRun; exit code %d" % exitCode, returnCode)
    log.info("Successful cmsRun.")

    # Return the correct exit code.
    # NOTE(review): exitcode is never changed after its initial assignment,
    # so the warning branch below is currently not reachable.
    if exitcode != NAG_OK:
        log.info("Output:")
        print(stdout)
        log.warning("Execution contains warnings")
    else:
        log.info("Success!")
    return exitcode

# Script entry point: the value returned by main() becomes the process exit
# status.
if __name__ == '__main__':
    sys.exit(main())