########################################################################
# File:   CoolTestExtensions.py
# Author: Marco Clemencic CERN/PH-LBC
# Date:   16/11/2005
#
# Contents:
#   TemporaryEnvironment: class to change the environment and revert to
#                         the original one
#
#   DBPreparer: handle the details for temporary test databases
#
#   StandardTest: extension to ShellCommandTest that finds Python
#                 scripts in the PATH and runs them through the
#                 interpreter
#
#   DatabaseTest: run a test that needs a database
#
#   SourceTargetTest: run a test that needs 2 databases
#                     (source and target)
#
# Copyright (c) 2003 by CodeSourcery, LLC.  All rights reserved.
#
########################################################################
__author__ = 'Marco Clemencic CERN/PH-LBC'
__version__ = "$Revision: 1.153 $"
__tag__ = "$Name: not supported by cvs2svn $"

########################################################################
# Imports
########################################################################
#import dircache
import os
import qm
import sys
import platform
import re
import tempfile
import shutil
import string
from subprocess import Popen, PIPE, STDOUT
from qm.test.classes.command import ExecTestBase, ShellCommandTest
from time import ctime, time, sleep

########################################################################
# Classes
########################################################################
class TemporaryEnvironment:
    """
    Class to change the environment temporarily.

    Every variable that is set or deleted through this object has its
    original value recorded (None meaning "was not defined") so that
    restore() can put the wrapped mapping back in its initial state.
    """
    def __init__(self, orig = os.environ):
        """
        Create a temporary environment on top of the one specified
        (it can be another TemporaryEnvironment instance).
        """
        # key -> value the variable had before the first change
        # (None if the variable was not defined)
        self.changes = {}
        self.env = orig

    def __setitem__(self,key,value):
        """
        Set an environment variable recording the previous value.
        """
        if key not in self.changes :
            if key in self.env :
                self.changes[key] = self.env[key]
            else:
                self.changes[key] = None
        self.env[key] = value

    def __getitem__(self,key):
        """
        Get an environment variable.
        Needed to provide the same interface as os.environ.
        """
        return self.env[key]

    def __delitem__(self,key):
        """
        Unset an environment variable.
        Needed to provide the same interface as os.environ.

        Raises KeyError if the variable is not defined.
        """
        if key not in self.env :
            raise KeyError(key)
        # Record only the first change: if the variable was already
        # modified through this object, self.changes holds the true
        # original value and must not be replaced by the temporary one.
        if key not in self.changes :
            self.changes[key] = self.env[key]
        del self.env[key]

    def keys(self):
        """
        Return the list of defined environment variables.
        Needed to provide the same interface as os.environ.
        """
        return self.env.keys()

    def items(self):
        """
        Return the list of (name,value) pairs for the defined
        environment variables.
        Needed to provide the same interface as os.environ.
        """
        return self.env.items()

    def __contains__(self,key):
        """
        Operator 'in'.
        Needed to provide the same interface as os.environ.
        """
        return key in self.env

    def restore(self):
        """
        Revert all the changes done to the original environment.
        """
        for key,value in self.changes.items():
            if value is None:
                # The variable did not exist originally; it may already
                # have been removed again, so only delete it if present.
                if key in self.env:
                    del self.env[key]
            else:
                self.env[key] = value
        self.changes = {}

    def __del__(self):
        """
        Revert the changes on destruction.
        """
        self.restore()

    def gen_script(self,shell_type):
        """
        Generate a shell script to reproduce the changes in the environment.

        'shell_type' -- One of 'csh', 'sh' or 'bat'.

        returns -- A string with one command per changed variable.

        Raises RuntimeError if 'shell_type' is not supported.
        """
        shells = [ 'csh', 'sh', 'bat' ]
        if shell_type not in shells:
            raise RuntimeError("Shell type '%s' unknown. Available: %s"%(shell_type,shells))
        lines = []
        for key in self.changes.keys():
            if key not in self.env:
                # unset variable
                if shell_type == 'csh':
                    lines.append('unsetenv %s\n'%key)
                elif shell_type == 'sh':
                    lines.append('unset %s\n'%key)
                elif shell_type == 'bat':
                    lines.append('set %s=\n'%key)
            else:
                # set variable
                if shell_type == 'csh':
                    lines.append('setenv %s "%s"\n'%(key,self.env[key]))
                elif shell_type == 'sh':
                    lines.append('export %s="%s"\n'%(key,self.env[key]))
                elif shell_type == 'bat':
                    lines.append('set %s=%s\n'%(key,self.env[key]))
        return "".join(lines)

class CoolTestError(RuntimeError):
    """
    Base class of the errors raised by the COOL test extensions.

    The text of the exception is the class attribute '_message',
    followed by the optional 'msg' in parentheses.
    """
    _message = ""
    def __init__(self,msg=None):
        _msg = self._message
        if msg is not None:
            _msg += " (%s)"%msg
        RuntimeError.__init__(self,_msg)

class DropDBError(CoolTestError):
    """coolDropDB exited with a non-zero return code."""
    _message = "Errors while dropping the DB"

class DropDBTimeOutError(CoolTestError):
    """coolDropDB did not complete within the allowed time."""
    _message = "Time-out exceeded when dropping the DB"

class PrepareEnvError(CoolTestError):
    """The environment needed by the test could not be set up."""
    _message = "Cannot prepare the environment"

class DBPreparer:
    """
    Handle the details for temporary test databases: build the
    connection string, publish it in an environment variable and drop
    the database on request.
    """
    package_names =[ "RelationalCool", "PyCool", "PyCoolUtilities",
                     "CoolKernel", "CoolApplication" ]
    backend_names = [ "Oracle", "SQLite", "MySQL",
                      "Frontier", "FrontierCache",
                      "CoralServer-Oracle", "CoralServer-MySQL" ]
    _DropDBTimeOut = 1200 # max. number of seconds a coolDropDb can run

    def __init__(self, package, backend, name, env, variable_name):
        """
        'package' -- Name of the package (one of 'package_names').

        'backend' -- Database back-end (one of 'backend_names').

        'name' -- Name of the database.

        'env' -- Environment mapping (e.g. a TemporaryEnvironment).

        'variable_name' -- Name of the variable of 'env' that is set to
        the connection string.

        Raises RuntimeError if 'package' or 'backend' are unknown.
        """
        self._initialized = False
        self.name = name
        self.tempdir = None
        self.env = env
        self.variable_name = variable_name
        if package in self.package_names:
            self.package = package
        else:
            raise RuntimeError("Package '%s' unknown"%package)
        if backend in self.backend_names:
            self.backend = backend
        else:
            raise RuntimeError("Backend type '%s' unknown"%backend)
        self._initialized = True
        self.env[variable_name] = self.ConnectionString()
        ## Not needed any more (task #3442)
        ##if self.backend == "Frontier":
        ##    self.env[variable_name+"_R"] = self.ConnectionString().replace(self.backend,self.backend + "R")
        # NB: DropDB() is not called here any more, it has been
        # MOVED TO DatabaseTest.Run and SourceTargetTest.Run

    def SQLiteFilename(self):
        """Return the path of the SQLite file, creating the temporary
        directory that hosts it on first use."""
        if self.tempdir is None:
            self.tempdir = tempfile.mkdtemp()
        return os.path.join(self.tempdir,"sqliteTest-%s.db"%(self.name))

    def ConnectionString(self):
        """Generate the connection string to use to connect to the DB"""
        if self.backend == "SQLite":
            return "sqlite://;schema=%s;dbname=%s"%(self.SQLiteFilename(),self.name)
            # With COOL >= 2.0.0 we can use the following
            #return "sqlite_file:%s/%s"%(self.SQLiteFilename(),self.name)
        else:
            # Override default by $COOL_QMTEST_USER if defined (CORALCOOL-2888)
            if "COOL_QMTEST_USER" in self.env:
                user=self.env["COOL_QMTEST_USER"]
            # AV Fix for bug #28349 (use USERNAME instead of USER on Windows)
            elif "USER" in self.env:
                user=self.env["USER"]
            else:
                user=self.env["USERNAME"]
            return "COOL-%s-%s/%s"%(self.backend,user,self.name)

    def DropDB(self):
        """
        Remove the test database.

        For SQLite the database file is deleted; for the other
        back-ends the 'coolDropDB' command is executed through a shell.

        returns -- A short text describing the action that was taken.

        Raises DropDBTimeOutError if coolDropDB runs for more than
        '_DropDBTimeOut' seconds and DropDBError (including the output
        of the command) if it returns a non-zero exit code.
        """
        message = ""
        if self._initialized: # remove pending databases
            if self.backend == "SQLite":
                filename = self.SQLiteFilename()
                message = "DropDB (remove file %s)"%filename
                if os.path.exists(filename):
                    os.remove(filename)
            else:
                connection = self.ConnectionString()
                message = "DropDB (coolDropDB %s)"%connection
                popen_options = { "stderr": STDOUT, "shell": True }
                if platform.system() == 'Darwin':
                    # Workaround for SIP on MacOSX (CORALCOOL-2884)
                    # Use own copy of /bin/bash instead of default /bin/sh
                    popen_options["executable"] = "bash"
                # The output is sent to a temporary file instead of a
                # pipe: a pipe that is not read while we poll for the
                # time-out blocks the child as soon as its buffer is full.
                output = tempfile.TemporaryFile()
                try:
                    dropDB = Popen("coolDropDB %s"%connection,
                                   stdout=output, **popen_options)
                    start_time = time()
                    while dropDB.poll() is None:
                        if (time()-start_time) > self._DropDBTimeOut:
                            dropDB.kill()
                            dropDB.wait()
                            raise DropDBTimeOutError("%ds, %s"%(self._DropDBTimeOut,
                                                                connection))
                        sleep(1)
                    if dropDB.wait() != 0:
                        # AV dump output if it fails (bug #91006)
                        output.seek(0)
                        text = output.read()
                        if not isinstance(text, str):
                            text = text.decode('utf-8', 'replace')
                        raise DropDBError( "ERROR! coolDropDB returned an error code\n" + text )
                finally:
                    output.close()
        return "\n"+message+"\n"

    def __del__(self):
        """Remove the temporary directory, if one was created."""
        # NB: the database is not dropped here (failures of DropDB on
        # exit are less important); see DatabaseTest.Run.
        if self.tempdir is not None:
            try:
                shutil.rmtree(self.tempdir)
            except Exception:
                # best effort: just report the problem
                print("WARNING: Could not remove temp dir '%s'"%self.tempdir)
            self.tempdir = None

class StandardTest(ShellCommandTest):
    """Extend standard QMTest shell command test to run python tests as
    well as normal tests.
    Modify ValidateOutput to avoid the stdout check."""
    # extra arguments
    arguments = [
        qm.fields.TextField(
            name = "unsupported_platforms",
            title = "Unsupported Platforms",
            description = """Comma-separated list of regular expression
            matching the platforms on which the test cannot be executed."""
            ),
        ]

    def SupportedPlatform(self, context, result):
        """Return True if the test can run on the current platform.

        If the platform matches one of the 'unsupported_platforms'
        expressions, 'result' is marked as UNTESTED and False is
        returned."""
        platform_id = self.GetPlatform()
        unsupported = [ re.compile(x.strip())
                        for x in str(self.unsupported_platforms).split(',')
                        if x.strip() ]
        for p_re in unsupported:
            if p_re.search(platform_id):
                result.SetOutcome(result.UNTESTED)
                result[result.CAUSE] = 'Platform not supported.'
                return False
        return True

    def GetPlatform(self):
        """
        Return the platform Id defined in BINARY_TAG
        (the string "None" if the variable is not set).
        """
        arch = "None"
        # check architecture name
        if "BINARY_TAG" in os.environ:
            arch = os.environ["BINARY_TAG"]
        return arch

    def Run(self, context, result):
        """Check if we want to run a python test.

        The command is adapted (python interpreter, valgrind or
        debugger), os.environ is prepared for the test and the actual
        execution is delegated to ShellCommandTest.Run."""
        # check if the test can be run on the current platform
        if not self.SupportedPlatform(context, result):
            return
        original_command = self.command
        is_python_script = False
        if self.command.endswith(".py"):
            is_python_script = True
            # We need to run a python test and not a standard executable
            for d in os.environ["PATH"].split(os.path.pathsep):
                if os.path.isdir(d) and self.command in os.listdir(d):
                    # we found the script in the path
                    self.command = 'python "%s"'%os.path.join(d,self.command)
                    break
        # Test in valgrind if CORAL_TESTSUITE_VALGRIND is set (task #12670)
        if 'CORAL_TESTSUITE_VALGRIND' in os.environ:
            # Add the valgrind suppression file from CORAL
            valgrindSupp = ''
            if 'CORAL_TESTSUITE_VALGRIND_SUPP' in os.environ:
                valgrindSupp = ' --suppressions=' + os.path.abspath( os.environ['CORAL_TESTSUITE_VALGRIND_SUPP'] )
            # Define the path for the output valgrind log
            if 'CORAL_TESTSUITE_VALGRIND_LOGDIR' in os.environ:
                valgrindLog = os.environ['CORAL_TESTSUITE_VALGRIND_LOGDIR']
            else:
                valgrindLog = '../../logs/qmtestCoral/valgrind' # Old default
            # Option 1 - allow a single platform for valgrind tests
            ###if os.environ['CMTCONFIG'] != 'x86_64-slc5-gcc43-dbg':
            ###    result.SetOutcome(result.UNTESTED)
            ###    result[result.CAUSE] = 'Use another platform for valgrind.'
            ###    return
            # Option 2 - use a different log directory per platform
            # (fall back to the BINARY_TAG platform if CMTCONFIG is not set)
            valgrindLog = valgrindLog+'/'+os.environ.get('CMTCONFIG', self.GetPlatform())
            valgrindLog = os.path.abspath( valgrindLog )
            try:
                # makedirs: the parent directories may not exist either
                os.makedirs( valgrindLog )
            except OSError:
                pass # typically the directory exists already
            valgrindLog = valgrindLog+'/valgrind.'+context[context.ID_CONTEXT_PROPERTY]+'.txt'
            # Add valgrind suppressions for ROOT if found (see ROOT-7510)
            if 'ROOTSYS' in os.environ:
                rootSupp = os.environ['ROOTSYS'] + '/etc/valgrind-root.supp'
                if os.path.isfile( rootSupp ):
                    valgrindSupp += ' --suppressions=' + rootSupp
            # Redefine the valgrind command
            # [NB ROOT team (ROOT-7510) recommend --show-reachable=no instead]
            self.command = 'valgrind -v --leak-check=full --show-reachable=yes --error-limit=no --log-file='+valgrindLog+valgrindSupp+' --gen-suppressions=all --num-callers=50 --track-origins=yes '+self.command
        # Else debug through gdb if COOL_DEBUG is set
        elif 'COOL_DEBUG' in os.environ \
                 and os.environ['COOL_DEBUG'].upper() in ['1','Y','YES','ON','GDB'] \
                 and not sys.platform.startswith("darwin"):
            if sys.platform.startswith("win"):
                dbg_window = "cmd "
            else:
                dbg_window = "xterm -e "
            if is_python_script:
                if os.environ['COOL_DEBUG'].upper() != 'GDB':
                    # Replace only the interpreter name (first occurrence):
                    # the path of the script may contain "python" too
                    self.command = self.command.replace("python", "python -m pdb", 1)
                else:
                    print("Command to execute: %s"%self.command)
                    self.command = 'gdb python'
            elif sys.platform.startswith("linux"):
                self.command = 'gdb %s'%self.command
            elif sys.platform.startswith("win"):
                # On vc7: 'devenv /debug %s'
                # On vc9: 'vcexpress /debug %s'
                self.command = 'devenv /debug %s'%self.command
            self.command = dbg_window + self.command
            self.timeout = -1
        # We can use the environment variable COOL_IGNORE_TIMEOUT
        # to ignore all the time-outs for manual tests.
        # See discussion in bug #28400.
        # (Note: the time-out on coolDropDB is always active)
        if 'COOL_IGNORE_TIMEOUT' in os.environ:
            self.timeout = -1
        # sometimes (nightlies) the env variable "OS" is not set
        if "OS" not in os.environ:
            if sys.platform.startswith("win"):
                os.environ["OS"] = 'Windows_NT'
            elif sys.platform.startswith("linux"):
                os.environ["OS"] = 'Linux'
            elif sys.platform.startswith("darwin"):
                os.environ["OS"] = 'Darwin'
            else: # assume linux for unknown platform IDs
                os.environ["OS"] = 'Linux'
        # @todo: This is a work-around for bug #27650
        # module resource is available only on Unix
        if not sys.platform.startswith("win"):
            from resource import setrlimit,RLIMIT_NOFILE
            setrlimit(RLIMIT_NOFILE,(1024,1024))
        # unset special DEBUG environment variables
        for v in [ 'COOL_TIMINGREPORT',
                   'COOL_COOLCHRONO_PROCMEMORY',
                   'COOL_210ITERATOR',
                   'COOL_PDBSTRESSTEST_NBYTES',
                   'CORAL_TESTSUITE_SLEEPFOR01466',
                   'CORAL_TESTSUITE_SLEEPFOR01466_PREFIX']:
            if v in os.environ :
                del os.environ[v]
        # unset COOLTESTDB_R environment variable (task #3442)
        for v in [ 'COOLTESTDB_R' ]:
            if v in os.environ :
                del os.environ[v]
        # unset special Oracle environment variables: this should
        # prevent errors like 'SP2-0310 unable to access glogin.sql'
        for v in [ 'ORACLE_HOME' ]:
            if v in os.environ :
                del os.environ[v]
        # Workaround for ORA-01466 on Oracle 11g (bug #89735)
        if isinstance( self, SourceTargetTest ):
            os.environ[ 'CORAL_TESTSUITE_SLEEPFOR01466' ] = '1'
            os.environ[ 'CORAL_TESTSUITE_SLEEPFOR01466_PREFIX' ] = self.DBName(DatabaseTest.TARGET_DB)
        elif isinstance( self, DatabaseTest ):
            os.environ[ 'CORAL_TESTSUITE_SLEEPFOR01466' ] = '1'
            os.environ[ 'CORAL_TESTSUITE_SLEEPFOR01466_PREFIX' ] = self.DBName(DatabaseTest.DEFAULT_DB)
        else:
            pass # no need to set CORAL_TESTSUITE_SLEEPFOR01466xxx
        # Workaround for ORA-04031 on Oracle 11g (bug #94270)
        os.environ[ 'CORAL_ORA_DISABLE_OPT_DYN_SAMP' ] = '1'
        # Workaround for slow SQLite commits (CORALCOOL-2922)
        os.environ[ 'CORAL_SQLITE_SYNCHRONOUS_OFF' ] = '1'
        # Bypass tests that are expected to fail with ROOT6 (CORALCOOL-2741)
        # [These are now set in config/cmt/requirements]
        ###os.environ[ 'COOL_PYCOOLTEST_SKIP_EXCEPTIONS' ] = '1'
        ###os.environ[ 'COOL_PYCOOLTEST_SKIP_ROOT6927' ] = '1'
        # dump software versions and other useful env on a test by test basis
        # [NB CORALSYS and COOLSYS _only_ exist for this reason!]
        for vname in "QMTEST_CLASS_PATH", "COOLSYS", "CORALSYS", "ROOTSYS", :
            if vname in os.environ:
                result['CoolTest.%s'%vname] = os.path.abspath(os.environ[vname])
        for vname in "COOL_QMTEST_USER", :
            if vname in os.environ:
                result['CoolTest.%s'%vname] = os.environ[vname]
        # Workaround for System Integrity Protection (CORALCOOL-2884)
        # Use own copy of /bin/bash to bypass SIP and keep DYLD_LIBRARY_PATH
        if platform.system() == 'Darwin':
            context["ShellCommandTest.command_shell"] = "bash -norc -noprofile -c"
        result['CoolTest.command'] = self.command
        ShellCommandTest.Run(self,context,result)
        # In some cases on win the test succeed, but the exit code is not 0
        if sys.platform.startswith("win"):
            if result['ExecTest.exit_code'] != '0':
                # check for the oval
                m = re.search(r'\[OVAL\] Cppunit-result =([0-9])',result['ExecTest.stdout'])
                if m:
                    if m.group(1) == '0':
                        result['ExecTest.original_exit_code'] = result['ExecTest.exit_code']
                        result['ExecTest.exit_code'] = '0'
                        result.SetOutcome(result.PASS)
                else:
                    # check for an OK alone on a line (python unittest)
                    m = re.search('\r\nOK\r\n',result['ExecTest.stdout'])
                    if m:
                        result['ExecTest.original_exit_code'] = result['ExecTest.exit_code']
                        result['ExecTest.exit_code'] = '0'
                        result.SetOutcome(result.PASS)
        self.command = original_command
        if result.GetOutcome() not in [ result.PASS ]:
            self.DumpEnvironment(result)

    def DumpEnvironment(self,result):
        """Annotate 'result' with the sorted list of the variables
        currently defined in os.environ."""
        names = sorted(os.environ.keys())
        result['COOLTest.environment'] = \
            result.Quote('\n'.join(["%s=%s"%(v,os.environ[v]) for v in names]))

    # Convert binary characters on stdout to utf8 (fix bug #54882)
    # http://www.pyzine.com/Issue008/Section_Articles/article_Encodings.html
    # Do not attempt to encode all chars: just try to avoid UnicodeEncodeError!
    def utf8OrSkip(self,text):
        """Return 'text' encoded character by character in utf8,
        replacing with "X" the characters that cannot be converted."""
        pieces = []
        for i in text:
            try:
                pieces.append(i.encode('utf8'))
            except UnicodeError:
                pieces.append("X")
        return "".join(pieces)

    # Remove lines matching a given pattern
    def grepNoMatch(self,text,pattern):
        """Return the non-empty lines of 'text' that do not match the
        regular expression 'pattern', each terminated by a newline."""
        matcher = re.compile(pattern)
        kept = []
        for line in text.split('\n'): # Fix bug #77517 ('\n' instead of '\r\n')
            if line != "" and not matcher.search(line):
                kept.append(line + '\n')
        return "".join(kept)

    def RunProgram(self, program, arguments, context, result):
        """Run the 'program'.

        'program' -- The path to the program to run.

        'arguments' -- A list of the arguments to the program.  This
        list must contain a first argument corresponding to 'argv[0]'.

        'context' -- A 'Context' giving run-time parameters to the
        test.

        'result' -- A 'Result' object.  The outcome will be
        'Result.PASS' when this method is called.  The 'result' may be
        modified by this method to indicate outcomes other than
        'Result.PASS' or to add annotations.

        @attention: This method has been copied from command.ExecTestBase
                    (QMTest 2.3.0) and modified to keep stdout and stderr
                    for tests that have been terminated by a signal.
                    (Fundamental for debugging in the Application Area)
        """
        # Initialise result['ExecTest.stdout'] so that we can append to it
        try:
            dummy = result['ExecTest.stdout']
        except Exception:
            result['ExecTest.stdout'] = ""
        # Construct the environment.
        environment = self.MakeEnvironment(context)
        # Create the executable.
        if self.timeout >= 0:
            # Hardcode a minimum timeout of 10 minutes
            timeout = max(600,self.timeout)
        else:
            # If no timeout was specified, we still run this process in a
            # separate process group and kill the entire process group
            # when the child is done executing.  That means that
            # orphaned child processes created by the test will be
            # cleaned up.
            timeout = -2
        e = qm.executable.Filter(self.stdin, timeout)
        # Run it.
        exit_status = e.Run(arguments, environment, path = program)
        # If the process terminated normally, check the outputs.
        if sys.platform.startswith("win") or os.WIFEXITED(exit_status):
            # There are no causes of failure yet.
            causes = []
            # The target program terminated normally.  Extract the
            # exit code, if this test checks it.
            if self.exit_code is None:
                exit_code = None
            elif sys.platform.startswith("win"):
                exit_code = exit_status
            else:
                exit_code = os.WEXITSTATUS(exit_status)
            # Get the output generated by the program.
            stdout = e.stdout
            stderr = e.stderr
            # Remove expected lines from stderr
            # -> Ignore warnings from the ROOT signal handler (see task #6919)
            pattern = "from LD_PRELOAD cannot be preloaded: ignored."
            stderr = self.grepNoMatch(stderr,pattern)
            # Record the results.
            result["ExecTest.exit_code"] = str(exit_code)
            result["ExecTest.stdout"] += self.utf8OrSkip(result.Quote(stdout))
            result["ExecTest.stderr"] = result.Quote(stderr)
            # Check to see if the exit code matches.
            if exit_code != self.exit_code:
                causes.append("exit_code")
                result["ExecTest.expected_exit_code"] = str(self.exit_code)
            # Validate the output.
            causes += self.ValidateOutput(stdout, stderr, result)
            # If anything went wrong, the test failed.
            if causes:
                result.Fail("Unexpected %s." % ", ".join(causes))
        elif os.WIFSIGNALED(exit_status):
            # The target program terminated with a signal.  Construe
            # that as a test failure.
            signal_number = str(os.WTERMSIG(exit_status))
            result.Fail("Program terminated by signal.")
            result["ExecTest.signal_number"] = signal_number
            result["ExecTest.stdout"] += result.Quote(e.stdout)
            result["ExecTest.stderr"] = result.Quote(e.stderr)
        elif os.WIFSTOPPED(exit_status):
            # The target program was stopped.  Construe that as a
            # test failure.
            signal_number = str(os.WSTOPSIG(exit_status))
            result.Fail("Program stopped by signal.")
            result["ExecTest.signal_number"] = signal_number
            result["ExecTest.stdout"] += result.Quote(e.stdout)
            result["ExecTest.stderr"] = result.Quote(e.stderr)
        else:
            # The target program terminated abnormally in some other
            # manner.  (This shouldn't normally happen...)
            result.Fail("Program did not terminate normally.")

    def ValidateOutput(self, stdout, stderr, result):
        """Copy of ExecTestBase.ValidateOutput, modified to skip stdout check.
        Replaces previous version overloading base class (see CORALCOOL-2114).
        Needed to call private __CompareText for Mac 10.11 (CORALCOOL-2889).

        Validate the output of the program.

        'stdout' -- A string containing the data written to the standard output
        stream.

        'stderr' -- A string containing the data written to the standard error
        stream.

        'result' -- A 'Result' object. It may be used to annotate
        the outcome according to the content of stderr.

        returns -- A list of strings giving causes of failure."""
        causes = []
        # Check to see if the standard output matches.
        ###if not self.__CompareText(stdout, self.stdout):
        ###    causes.append("standard output")
        ###    result["ExecTest.expected_stdout"] = result.Quote(self.stdout)
        # Check to see if the standard error matches.
        if not self.__CompareText(stderr, self.stderr):
            causes.append("standard error")
            result["ExecTest.expected_stderr"] = result.Quote(self.stderr)
        return causes

    def __CompareText(self, s1, s2):
        # Called by ValidateOutput
        """Compare 's1' and 's2', ignoring line endings.

        's1' -- A string.

        's2' -- A string.

        returns -- True if 's1' and 's2' are the same, ignoring
        differences in line endings.

        This is the standard mode in which QMTest works
        """
        # The "splitlines" method works independently of the line ending
        # convention in use.
        if platform.system() != 'Darwin':
            return s1.splitlines() == s2.splitlines()
        # Workaround for CFPasteboardRef issues on Mac (CORALCOOL-2889):
        # drop the lines with the spurious message before comparing
        noise = 'CFPasteboardRef CFPasteboardCreate(CFAllocatorRef, CFStringRef) : failed to create global data'
        s1s = [ l for l in s1.splitlines() if noise not in l ]
        s2s = [ l for l in s2.splitlines() if noise not in l ]
        return s1s == s2s

class DatabaseTest(StandardTest):
    """
    This class adds to the standard QMTest shell command test the possibility
    of setting the environment needed to run COOL tests.
    """
    # extra arguments
    arguments = [
        qm.fields.EnumerationField(
            name = "db_type",
            title = "Database type",
            default_value = "SQLite",
            enumerals = DBPreparer.backend_names,
            description = """The type of database the test will access"""
            ),
        qm.fields.EnumerationField(
            name = "package_name",
            title = "Name of the package being tested",
            enumerals = DBPreparer.package_names,
            default_value = "RelationalCool",
            description = """The name of the package being tested"""
            )
        ]

    # database type of names
    DEFAULT_DB = 0
    SOURCE_DB = 1
    TARGET_DB = 2

    def DBName(self, type = 0):
        """Generate the conventional db name for tests.

        'type' -- One of DEFAULT_DB, SOURCE_DB or TARGET_DB; it selects
        the first letter of the name ('C', 'S' or 'T').
        """
        # Special naming conventions for nightlies: hash of (path+config)
        # - tests use different names for different slots
        #   (slot is in path, eg /build/jenkins/workspace/lcg_ext_dev4/...)
        # - tests use different names for different platforms
        #   (platform is also in path for lcgcmake, but was not when using CMT)
        location = os.getcwd() + self.GetPlatform()
        # The first letter is because a DBNAME cannot start with a number
        if type == DatabaseTest.SOURCE_DB:
            name = 'S'
        elif type == DatabaseTest.TARGET_DB:
            name = 'T'
        else:
            name = 'C'
        name += hex(abs(hash(location)))[-7:].upper()
        return name

    def PrepareEnv(self,env):
        """Set in 'env' the variables needed to connect to the test
        database (CORAL paths, TNS_ADMIN, signal handler settings).

        Raises RuntimeError on Windows and PrepareEnvError if
        CORAL_AUTH_PATH is needed by the back-end but cannot be set.
        """
        # Environment for the nightlies (user sftnight) - START
        # Drop old user lcgnight, keep only sftnight (see CORALCOOL-2191)
        if sys.platform.startswith("win") :
            raise RuntimeError("Windows is no longer supported!")
        elif sys.platform.startswith("darwin") and "USER" in env and \
                 env["USER"] == "sftnight" :
            # Use credentials from $HOME/private on Mac (CORALCOOL-1083)
            env["CORAL_AUTH_PATH"] = os.path.join(env["HOME"],"private")
            env["CORAL_DBLOOKUP_PATH"] = env["CORAL_AUTH_PATH"]
        elif platform.machine() == "aarch64" and "USER" in env and \
                 env["USER"] == "sftnight" :
            # Use credentials from $HOME/private on ARM (CORALCOOL-2857)
            env["CORAL_AUTH_PATH"] = os.path.join(env["HOME"],"private")
            env["CORAL_DBLOOKUP_PATH"] = env["CORAL_AUTH_PATH"]
        elif sys.platform.startswith("linux") and "USER" in env and \
                 env["USER"] == "sftnight" :
            # Use credentials from POOL AFS area on Linux (CORALCOOL-1083)
            env["CORAL_AUTH_PATH"] = "/afs/cern.ch/sw/lcg/app/pool/db"
            env["CORAL_DBLOOKUP_PATH"] = env["CORAL_AUTH_PATH"]
        # Environment for the nightlies (user sftnight) - END
        # Path to authentication.xml (CORAL_AUTH_PATH)
        if "CORAL_AUTH_PATH" not in env:
            # On Win32, variable HOME is not set, so we set AFSHOME ourselves.
            possible_home_vars = ["AFSHOME", "HOME"]
            for n in possible_home_vars:
                if n in env:
                    env["CORAL_AUTH_PATH"] = os.path.join(env[n],"private")
                    break
        # We didn't manage to set CORAL_AUTH_PATH: do we really need it?
        if "CORAL_AUTH_PATH" not in env:
            if self.db_type not in [ "SQLite", "Frontier", "FrontierCache" ]:
                # All back-ends different from SQLite and Frontier need it
                raise PrepareEnvError("Cannot set CORAL_AUTH_PATH")
        # Path to dblookup.xml (CORAL_DBLOOKUP_PATH)
        # (nothing to copy if CORAL_AUTH_PATH could not be set, which is
        # tolerated above for SQLite and Frontier)
        if "CORAL_DBLOOKUP_PATH" not in env and "CORAL_AUTH_PATH" in env:
            env["CORAL_DBLOOKUP_PATH"] = env["CORAL_AUTH_PATH"]
        # Path to tnsnames.ora - this should not be needed? (CORALCOOL-2756)
        if self.db_type in [ "Oracle", "Frontier", "FrontierCache" ] :
            if "TNS_ADMIN" not in env:
                env["TNS_ADMIN"] = "/afs/cern.ch/sw/lcg/app/releases/CORAL/internal/oracle/admin"
        # Test Oracle threads on slc5
        #if self.GetPlatform().find("i686-slc5-gcc34") >= 0 :
        #    env["CORAL_ORA_NO_OCI_THREADED"] = "1"
        #elif self.GetPlatform().find("i686-slc5-gcc43") >= 0 :
        #    env["CORAL_ORA_OCI_NO_MUTEX"] = "1"
        # Signal handler (either COOL or ROOT - not both!)
        useRootSignalHandler = False
        if sys.platform.startswith("linux") or \
               sys.platform.startswith("darwin") :
            useRootSignalHandler = True # ROOT signal handle on linux/darwin
        if not useRootSignalHandler:
            env["COOL_ENABLE_COOLSIGNALHANDLER"] = "1"
            if "LD_PRELOAD" in env:
                del env["LD_PRELOAD"]
        else:
            if "COOL_ENABLE_COOLSIGNALHANDLER" in env:
                del env["COOL_ENABLE_COOLSIGNALHANDLER"]
            env["LD_PRELOAD"] = "libCore.so:libdl.so"

    def Run(self, context, result):
        """Prepare the environment for the test and run it.

        The database is dropped before and after the test; a
        CoolTestError in any of the steps sets the outcome to ERROR."""
        # check if the test can be run on the current platform
        if not self.SupportedPlatform(context, result):
            return
        env = TemporaryEnvironment()
        try:
            self.PrepareEnv(env)
            db = DBPreparer(self.package_name,self.db_type,self.DBName(),env,"COOLTESTDB")
            result['COOLTest.connection_string'] = db.ConnectionString()
            result['ExecTest.stdout'] = db.DropDB()
            StandardTest.Run(self,context,result)
            result['ExecTest.stdout'] += db.DropDB()
        except CoolTestError as x:
            result.SetOutcome(result.ERROR)
            result[result.CAUSE] = str(x)
            self.DumpEnvironment(result)
        finally:
            # Do not rely on the garbage collector to undo the changes
            env.restore()

class SourceTargetTest(DatabaseTest):
    """
    This class extends COOLTest to use two databases instead of one.
    """
    def Run(self, context, result):
        """Prepare the environment for the test and run it.

        Both the source and the target database are dropped before and
        after the test; a CoolTestError in any of the steps sets the
        outcome to ERROR."""
        # check if the test can be run on the current platform
        if not self.SupportedPlatform(context, result):
            return
        env = TemporaryEnvironment()
        try:
            self.PrepareEnv(env)
            db_src = DBPreparer(self.package_name,self.db_type,
                                self.DBName(DatabaseTest.SOURCE_DB),
                                env,"COOLTESTDB_SOURCE")
            db_tgt = DBPreparer(self.package_name,self.db_type,
                                self.DBName(DatabaseTest.TARGET_DB),
                                env,"COOLTESTDB_TARGET")
            result['ExecTest.stdout'] = db_src.DropDB()
            result['ExecTest.stdout'] += db_tgt.DropDB()
            result['COOLTest.src_connection_string'] = db_src.ConnectionString()
            result['COOLTest.tgt_connection_string'] = db_tgt.ConnectionString()
            StandardTest.Run(self,context,result)
            result['ExecTest.stdout'] += db_src.DropDB()
            result['ExecTest.stdout'] += db_tgt.DropDB()
        except CoolTestError as x:
            result.SetOutcome(result.ERROR)
            result[result.CAUSE] = str(x)
            self.DumpEnvironment(result)
        finally:
            # Do not rely on the garbage collector to undo the changes
            env.restore()