#!/usr/bin/env python

import os
import sys
import time
import errno
import fcntl
8
import re
9
import select
10
import shutil
11
12
13
14
import signal
import socket
import logging
import optparse
15
import subprocess
16
17
18
19
import xml.etree.ElementTree as ET

################
# List of sites that should receive Critical errors rather than Warnings
# (regular expressions, applied to the site name with re.match)
criticalSites = ['T0_.*', 'T1_.*', 'T2_.*']

# Root logger until configure_logging() swaps in the probe's own logger.
log = logging.getLogger()

# CMSSW configuration template written to test_xrootd.py.  It takes two
# %-substitutions, in order: the file URL to read (%s) and the
# SiteLocalConfigService debug level (%d).
cms_file = """
import FWCore.ParameterSet.Config as cms
process = cms.Process('XrootdTest')
process.source = cms.Source('PoolSource',
  fileNames = cms.untracked.vstring("%s"),
)

process.SiteLocalConfigService = cms.Service("SiteLocalConfigService",
  debugLevel = cms.untracked.uint32(%d),
  overrideSourceCacheHintDir = cms.untracked.string("application-only"),
)

process.dump = cms.EDAnalyzer("EventContentAnalyzer", listContent=cms.untracked.bool(False), getData=cms.untracked.bool(True))
process.load("FWCore.MessageService.MessageLogger_cfi")
process.MessageLogger.cerr.FwkReport.reportEvery = 1

process.maxEvents = cms.untracked.PSet(
    input = cms.untracked.int32(10)
)

process.p = cms.EndPath(process.dump)
"""

def configure_logging(lvl=logging.INFO):
    """Point the module-level ``log`` at the probe's logger, writing to stdout.

    lvl is the logging threshold applied to the "cms.CE.xrootd-access"
    logger.  Each call attaches one more stdout handler to that logger.
    """
    global log

    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(
        logging.Formatter(fmt="[%(process)d] %(asctime)s [%(levelname)07s]: %(message)s"))

    log = logging.getLogger("cms.CE.xrootd-access")
    log.setLevel(lvl)
    log.addHandler(stream_handler)

def print_summary(summary, retval):
    """Log and print the one-line probe summary, then hand back retval.

    The summary is logged at INFO level when retval is 0 and at ERROR level
    otherwise, and is always echoed on stdout as "summary: <text>".
    """
    report = log.info if retval == 0 else log.error
    report(summary)
    print("summary: %s" % summary)
    return retval

def parse_opts():
    """Parse the command line and set up logging.

    Returns (args, opts).  opts carries the parsed options plus two added
    attributes: starttime (time of parsing) and endtime (starttime plus the
    --timeout value), which the command runners use as their deadline.
    """
    parser = optparse.OptionParser()
    parser.add_option("-v", "--verbose", dest="verbose", help="Increase logging verbosity", action="store_true", default=False)
    parser.add_option("-H", "--host", dest="hostname", help="Hostname to use")
    parser.add_option("-t", "--timeout", dest="timeout", help="Test timeout in seconds; default is 240", default=240, type="int")
    opts, args = parser.parse_args()

    started = time.time()
    opts.starttime = started
    opts.endtime = started + opts.timeout

    # Verbose mode lowers the threshold to DEBUG; INFO is the normal level.
    configure_logging(logging.DEBUG if opts.verbose else logging.INFO)
    return args, opts

def getExitCode(name, default):
    """Return the integer exit code configured in environment variable *name*.

    Falls back to *default* when the variable is unset or does not hold an
    integer, so that a malformed setting cannot silently turn an error code
    into 0.  Returns 0 only if *default* itself is not convertible to int.
    """
    for candidate in (os.environ.get(name), default):
        try:
            return int(candidate)
        except (TypeError, ValueError):
            # Unset (None) or not an integer: try the next candidate.
            continue
    return 0

def runCommandChild(cmd, args):
    """Replace the current (forked) process with *cmd*; never returns.

    cmd is looked up with os.execvpe and receives [cmd] + args as its
    argument vector and the current os.environ as its environment.  If the
    exec fails, a diagnostic is written to file descriptor 1 and the process
    exits with status 127.
    """
    try:
        try:
            os.execvpe(cmd, [cmd] + args, os.environ)
        except OSError as e:
            # Write straight to the descriptor: os._exit() below does not
            # flush Python's buffered sys.stdout, so a print here could be
            # lost.
            message = "Error exec'ing %s: %s\n\n" % (cmd, str(e))
            if not isinstance(message, bytes):
                message = message.encode("utf-8", "replace")
            os.write(1, message)
    finally:
        os._exit(127)

def _drainPipe(fd, chunks):
    """Append everything currently readable from non-blocking *fd* to *chunks*.

    Returns True once end-of-file has been seen and False when the pipe is
    only empty for the moment (EAGAIN).  Other read errors propagate.
    """
    while True:
        try:
            data = os.read(fd, 1024)
        except OSError as oe:
            if oe.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                return False
            raise
        if not data:
            return True
        chunks.append(data)


def runCommandParent(r, pid, opts):
    """Collect a child's output until it exits or the deadline passes.

    r    -- read end of the pipe the child writes to (switched to
            non-blocking mode here)
    pid  -- process id of the child
    opts -- object whose ``endtime`` attribute is the absolute deadline
            (seconds since the epoch)

    Returns (output, status): output is everything read from the pipe as a
    byte string (a plain str on Python 2) and status is the wait status
    reported by os.waitpid.  If the deadline passes first, the child is
    sent SIGKILL and then reaped.
    """
    flags = fcntl.fcntl(r, fcntl.F_GETFL)
    fcntl.fcntl(r, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    chunks = []
    atEof = False
    exitCode = -1
    timeout = opts.endtime - time.time()
    while timeout >= 0:
        if atEof:
            # The pipe is closed but the child is still running.  select()
            # would report it readable forever, so just poll for the exit.
            time.sleep(min(timeout, 0.1))
        else:
            readable = select.select([r], [], [], timeout)[0]
            if readable:
                atEof = _drainPipe(r, chunks)
        mypid, status = os.waitpid(pid, os.WNOHANG)
        if mypid:
            exitCode = status
            break
        timeout = opts.endtime - time.time()

    if exitCode < 0:
        # Deadline passed without the child exiting.
        os.kill(pid, signal.SIGKILL)
        print("Killed CMSSW child (pid %d) due to timeout." % pid)
        pid, exitCode = os.waitpid(pid, 0)

    # Pick up anything written between the last read and the child's exit.
    # The descriptor is non-blocking, so this cannot hang even if another
    # process still holds the write end open.
    if not atEof:
        _drainPipe(r, chunks)
    return b"".join(chunks), exitCode

136
def runCommand(cmd, args, opts, combineStd=False):
    """Run *cmd* with *args* in a child process and capture its stdout.

    cmd, args  -- program and argument list handed to runCommandChild
    opts       -- options object; its ``endtime`` is the deadline enforced
                  by runCommandParent
    combineStd -- when true, the child's stderr is captured as well

    Returns the (output, status) pair produced by runCommandParent.
    Raises OSError if the pipe or the fork cannot be created.
    """
    r, w = os.pipe()
    try:
        pid = os.fork()
    except OSError:
        os.close(r)
        os.close(w)
        raise

    if pid:  # parent
        os.close(w)
        try:
            return runCommandParent(r, pid, opts)
        finally:
            os.close(r)

    # Child: whatever happens below, never fall back into the caller's code.
    try:
        os.close(r)
        os.dup2(w, 1)
        if combineStd:
            os.dup2(w, 2)
        # Only drop the original descriptor if it is not one of the
        # standard streams we just set up.
        if w not in (1, 2):
            os.close(w)
        runCommandChild(cmd, args)
    finally:
        os._exit(127)

def sourceFile(filename, opts):
    """Source a shell script and copy the resulting environment into os.environ.

    The script is sourced by /bin/sh, which then dumps its environment with
    ``env -0``; every NAME=value entry is stored in os.environ.

    Returns None (after logging a warning) when *filename* does not exist,
    otherwise the status returned by runCommand (0 on success).  On a
    non-zero status os.environ is left untouched.
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote

    if not os.path.exists(filename):
        log.warning("Trying to source a file (%s) which does not exist" % filename)
        return None

    # Quote the path so spaces or shell metacharacters cannot break (or
    # inject into) the command line.
    script = "source %s && env -0" % quote(filename)
    stdout, exitCode = runCommand("/bin/sh", ["-c", script], opts)
    if exitCode:
        return exitCode

    for entry in stdout.split('\0'):
        name, sep, value = entry.partition("=")
        if sep:
            os.environ[name] = value
    return exitCode

def evalCommand(cmd, args, opts):
    """Eval the shell code printed by a command and import the environment.

    /bin/sh runs ``cmd args...``, evals what it prints, and then dumps its
    environment with ``env -0``; every NAME=value entry is stored in
    os.environ.

    Returns the status from runCommand (0 on success).  On a non-zero
    status os.environ is left untouched.
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote

    # Quote each word, and use $(...) rather than backticks so the quoting
    # is not re-interpreted by the command substitution.
    command = ' '.join(quote(word) for word in [cmd] + list(args))
    stdout, exitCode = runCommand('/bin/sh', ['-c', 'eval $(%s) && env -0' % command], opts)
    if exitCode:
        return exitCode

    for entry in stdout.split('\0'):
        name, sep, value = entry.partition("=")
        if sep:
            os.environ[name] = value
    return exitCode

def main():
    """Run the xrootd-access probe and return its exit code.

    Steps, in order: take an exclusive lock on ./cmssw_lock, parse options,
    check the proxy file, source the OSG and CMS software setup scripts,
    read the site name from site-local-config.xml, create a CMSSW release
    area, check the global redirector with xrdcp, and finally read a test
    file through cmsRun (two attempts).

    Exit codes come from the SAME_OK / SAME_WARNING / SAME_ERROR environment
    variables (defaults 0 / 40 / 50).  Failures from the release-area step
    onwards return the error code for sites matching criticalSites and the
    warning code for other sites; earlier failures always return the error
    code.  A failing global redirector check returns the OK code.
    """
    # Change to test directory
    dirname = os.path.split(sys.argv[0])[0]
    if dirname:
        os.chdir(dirname)

    # Note we leak the file descriptor - this way, we hold the lock until
    # the probe finishes
    try:
        fd = os.open('cmssw_lock', os.O_RDWR|os.O_CREAT)
        # FD_CLOEXEC is a descriptor flag, so it has to be set with F_SETFD
        # (not F_SETFL); otherwise child processes inherit the descriptor.
        fcntl.fcntl(fd, fcntl.F_SETFD, fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
        fcntl.flock(fd, fcntl.LOCK_EX)
    except (OSError, IOError) as oe:
        # Move on - might as well finish running.
        log.exception(oe)

    _, opts = parse_opts()

    NAG_CRITICAL = getExitCode("SAME_ERROR", 50)  # Maps to Nagios CRITICAL
    NAG_WARNING = getExitCode("SAME_WARNING", 40)
    NAG_OK = getExitCode("SAME_OK", 0)

    now = time.strftime("%x %X", time.gmtime())
    currdir = os.path.abspath(os.curdir)
    host = socket.gethostname()

    pilot_user_info, exitCode = runCommand("/usr/bin/id", ["-u"], opts)
    if exitCode:
        return print_summary("Failed to run 'id -u' to get user information", NAG_CRITICAL)
    pilot_user_info = pilot_user_info.strip()
    log.info("Ran at %s on host %s, directory %s, as user %s" % (now, host, currdir, pilot_user_info))

    pilot_uid = os.geteuid()

    pilot_proxy = os.environ.get("X509_USER_PROXY", "/tmp/x509up_u%d" % pilot_uid)
    if not os.path.isfile(pilot_proxy):
        return print_summary("X509_USER_PROXY=%s is not a file" % pilot_proxy, NAG_CRITICAL)

    log.info("X509_USER_PROXY=%s" % pilot_proxy)

    # Workaround to suppress voms errors on OSG
    os.environ["VOMS_PROXY_INFO_DONT_VERIFY_AC"] = "1"

    if 'OSG_GRID' in os.environ:
        osg_setup = os.path.join(os.environ['OSG_GRID'], 'setup.sh')
        if not os.path.isfile(osg_setup):
            log.warning("$OSG_GRID is defined (%s), but %s does not exist." % (os.environ['OSG_GRID'], osg_setup))
        else:
            exitCode = sourceFile(osg_setup, opts)
            if exitCode:
                return print_summary("Failed to source %s." % osg_setup, NAG_CRITICAL)

    dn, exitCode = runCommand("voms-proxy-info", ["--identity"], opts)
    dn = dn.strip()
    if exitCode:
        log.warning("Unable to determine DN from voms-proxy-info")
    else:
        log.info("DN: %s" % dn)
    fqan, exitCode = runCommand("voms-proxy-info", ["--fqan"], opts)
    if fqan:
        fqan = fqan.splitlines()[0]
    if exitCode:
        log.warning("Unable to determine primary FQAN from voms-proxy-info")
    else:
        log.info("Primary FQAN: %s" % fqan)

    # Set the CMS environment
    if 'VO_CMS_SW_DIR' in os.environ:
        sw_dir = os.environ['VO_CMS_SW_DIR']
    elif 'OSG_APP' in os.environ:
        sw_dir = os.path.join(os.environ['OSG_APP'], 'cmssoft', 'cms')
        if not os.path.exists(sw_dir):
            sw_dir = os.environ['OSG_APP']
    elif 'CVMFS' in os.environ:
        sw_dir = os.path.join(os.environ['CVMFS'], 'cms.cern.ch')
    elif os.path.isdir('/cvmfs/cms.cern.ch'):
        sw_dir = '/cvmfs/cms.cern.ch'
    else:
        return print_summary("None of $VO_CMS_SW_DIR, $OSG_APP, $CVMFS, or /cvmfs/cms.cern.ch available", NAG_CRITICAL)
    log.info("Using software directory %s" % sw_dir)

    sw_setup_script = os.path.join(sw_dir, 'cmsset_default.sh')
    if not os.path.isfile(sw_setup_script):
        return print_summary("The software setup script (%s) is missing" % sw_setup_script, NAG_CRITICAL)
    log.info("CMS software setup script (%s) is present." % sw_setup_script)

    scram_arch, _ = runCommand("/cvmfs/cms.cern.ch/common/cmsarch", [], opts)
    # Drop the trailing newline so an unmapped value is still usable as a
    # command-line argument below.
    scram_arch = scram_arch.strip()
    if 'slc6' in scram_arch:
        scram_arch = 'slc6_amd64_gcc530'
    elif 'slc7' in scram_arch:
        scram_arch = 'slc7_amd64_gcc530'
    log.info("Using SCRAM_ARCH=%s" % scram_arch)

    exitCode = sourceFile(sw_setup_script, opts)
    if exitCode:
        return print_summary("Failed to source setup script %s." % sw_setup_script, NAG_CRITICAL)

    if 'CMS_PATH' not in os.environ:
        return print_summary("CMS_PATH not defined after sourcing %s" % sw_setup_script, NAG_CRITICAL)
    if not os.path.isdir(os.environ['CMS_PATH']):
        return print_summary("CMS_PATH %s is not a directory." % os.environ['CMS_PATH'], NAG_CRITICAL)

    # Parse the local config file and find site name
    job_config_dir = os.path.join(os.environ['CMS_PATH'], 'SITECONF', 'local', 'JobConfig')
    if not os.path.isdir(job_config_dir):
        return print_summary("JobConfig directory %s does not exist" % job_config_dir, NAG_CRITICAL)

    config_file = os.path.join(job_config_dir, 'site-local-config.xml')
    if not os.path.isfile(config_file):
        return print_summary("Local configuration file %s does not exist." % config_file, NAG_CRITICAL)
    log.info("Local configuration file: %s" % config_file)

    with open(config_file, 'r') as config_fd:
        root = ET.parse(config_fd).getroot()
    siteName = None
    for siteinfo in root.findall('site'):
        siteName = siteinfo.get('name')
        if siteName is None:
            return print_summary("Missing site name in configuration file %s" % config_file, NAG_CRITICAL)
        log.info("Site name: %s" % siteName)
    if siteName is None:
        # No <site> element at all: report it instead of failing later with
        # an unbound name.
        return print_summary("Missing site name in configuration file %s" % config_file, NAG_CRITICAL)

    siteIsCritical = False
    for sitePattern in criticalSites:
        if re.match(sitePattern, siteName) is not None:
            siteIsCritical = True
            break

    returnCode = NAG_CRITICAL
    if not siteIsCritical:   # Site not ready to be critical service
        returnCode = NAG_WARNING

    # Setup CMSSW, run command
    cmssw_file="/store/mc/SAM/GenericTTbar/AODSIM/CMSSW_9_0_0_90X_mcRun1_realistic_v4-v1/10000/28B9D1FB-8B31-E711-AA4E-0025905B85B2.root"
    xrootd_file="root://cms-xrd-global.cern.ch//store/test/xrootd/%s/%s" % (siteName, cmssw_file)
    log.info("Xrootd file we will test: %s" % xrootd_file)

    # Start from an empty scratch directory; a missing one is not an error.
    shutil.rmtree("xrootd-access", ignore_errors=True)
    try:
        os.mkdir("xrootd-access")
    except OSError as oe:
        return print_summary("Failure to create test directory 'xrootd-access': %s." % str(oe), returnCode)
    os.chdir("xrootd-access")

    cmssw_version="CMSSW_9_0_0"
    stdout, exitCode = runCommand("scramv1", ["-a", scram_arch, "p", "CMSSW", cmssw_version], opts, combineStd=True)
    if stdout.strip():
        log.info("scramv1 p CMSSW %s output:" % cmssw_version)
        print(stdout)
    if exitCode:
        return print_summary("Cannot make %s release area (SCRAM_ARCH=%s)" % (cmssw_version, scram_arch), returnCode)
    log.info("scramv1 p CMSSW %s successful", cmssw_version)

    os.chdir(cmssw_version)

    level = 1
    if opts.verbose:
        level = 2
    with open("test_xrootd.py", "w") as cfg_fd:
        cfg_fd.write(cms_file % (xrootd_file, level))

    exitCode = evalCommand("scramv1", ["runtime", "-sh"], opts)
    if exitCode:
        return print_summary("Failure when setting up scramv1 runtime (CMSSW %s, SCRAM_ARCH=%s)" % (cmssw_version, scram_arch), returnCode)

    stdout, exitCode = runCommand("xrdcp", ["-d 1", "-f", "root://cms-xrd-global.cern.ch:1094//tmp/xrootd-testfile.xrdcmsglobal01.cern.ch", "/dev/null"], opts, combineStd=True)
    if exitCode:
        print(stdout)
        # Reported with the OK code.
        # NOTE(review): presumably so a redirector outage is not blamed on
        # the site - confirm.
        return print_summary("Global redirector error", NAG_OK)
    log.info("Global redirector check OK")

    maxlen = 12*1024
    if opts.verbose:
        maxlen = 50*1024

    for attempt in (1, 2):
        stdout, exitCode = runCommand("cmsRun", ["test_xrootd.py"], opts, combineStd=True)
        stdout = '\n'.join([line for line in stdout.splitlines()
                            if line.find("10 occurrences of key") < 0])

        if len(stdout) > maxlen:
            stdout = "cmsRun output truncated - only last %d KB shown:\n" % (maxlen // 1024) + stdout[-(maxlen):]
        if exitCode == 0:
            log.info("Successful cmsRun.  Output:")
            print(stdout)
            log.info("Success!")
            return NAG_OK

        log.error("Failed cmsRun on attempt %d. Output:" % attempt)
        print(stdout)
        # Extend allowed time for the next try
        opts.endtime = opts.endtime + opts.timeout

    # exitCode is a raw wait status; decode it so the summary shows the real
    # exit code (or the signal that ended the process).
    if os.WIFEXITED(exitCode):
        failure = "Failed cmsRun: exit code %d" % os.WEXITSTATUS(exitCode)
    else:
        failure = "Failed cmsRun: terminated by signal %d" % os.WTERMSIG(exitCode)
    return print_summary(failure, returnCode)
393
394
395

if __name__ == '__main__':
    sys.exit(main())