diff --git a/Trigger/TrigConfiguration/TrigConfigSvc/python/TrigConfFrontier.py b/Trigger/TrigConfiguration/TrigConfigSvc/python/TrigConfFrontier.py index 3d4f3270f5225ad6682798363fe653d338c7b77a..15c0a7a7f93d80c6f9c4216396d2feac4ad82e81 100755 --- a/Trigger/TrigConfiguration/TrigConfigSvc/python/TrigConfFrontier.py +++ b/Trigger/TrigConfiguration/TrigConfigSvc/python/TrigConfFrontier.py @@ -16,16 +16,78 @@ from AthenaCommon.Logging import logging import time import sys -def getFrontierCursor(url, schema, loglevel = logging.INFO): - log = logging.getLogger( "TrigConfFrontier.py" ) - log.setLevel(loglevel) + +# useFrontierClient +# True: one uses the python bindings of frontier_client from the TrigConfDBConnection package +# False: one uses a purely python-based implementation +useFrontierClient = False + + +def getServerUrls(frontier_servers): + """ + turns + '(serverurl=http://atlasfrontier-local.cern.ch:8000/atlr)(serverurl=http://atlasfrontier-ai.cern.ch:8000/atlr)' + into + ['http://atlasfrontier-local.cern.ch:8000/atlr','http://atlasfrontier-ai.cern.ch:8000/atlr'] + """ + from re import findall + return findall(r'\(serverurl=(.*?)\)',frontier_servers) + + +def testUrl(url): + import urllib.request, urllib.error, urllib.parse try: - from TrigConfDBConnection import frontier_client # noqa: F401 + urllib.request.urlopen(url) + except urllib.error.URLError: + return False + return True + +def getFrontierCursor(url, schema): + if useFrontierClient: + log.info("Using frontier_client from TrigConfDBConnection") return FrontierCursor2( url = url, schema = schema) - except Exception: - log.warning("Couldn't import frontier_client from TrigConfDBConnection, falling back to pure python implementation without proper url resolution") + else: + log.info("Using a pure python implementation of frontier") return FrontierCursor( url = url, schema = schema) +# used by FrontierCursor2 +def resolvebindvars(query, bindvars): + """Replaces the bound variables :xyz with a ? in the query and + adding the value behind a : at the end""" + log = logging.getLogger( "TrigConfFrontier.py" ) + log.info("Query: %s", query) + log.info("bound variables: %r", bindvars) + import re + varsextract = re.findall(':([A-z0-9]*)',query) + values = list(map(bindvars.get, varsextract)) + log.debug("Resolving bound variable %r with %r", varsextract,values) + appendix = ":".join([str(v) for v in values]) + queryWithQuestionMarks = re.sub(':[A-z0-9]*','?', query) + query = queryWithQuestionMarks + ':' + appendix + log.info("Resolved query new style: %s", query) + return query + +# used by FrontierCursor +def replacebindvars(query, bindvars): + """Replaces the bound variables with the specified values, + disables variable binding + """ + log = logging.getLogger( "TrigConfFrontier.py" ) + from builtins import int + for var,val in list(bindvars.items()): + if query.find(":%s" % var)<0: + raise NameError("variable '%s' is not a bound variable in this query: %s" % (var, query) ) + if isinstance (val, int): + query = query.replace(":%s" % var,"%s" % val) + else: + query = query.replace(":%s" % var,"%r" % val) + log.debug("Resolving bound variable '%s' with %r", var,val) + log.debug("Resolved query: %s", query) + return query + + + + class FrontierCursor2(object): def __init__(self, url, schema, refreshFlag=False): @@ -39,41 +101,10 @@ class FrontierCursor2(object): log.debug("Schema : %s", self.schema) log.debug("Refresh cache : %s", self.refreshFlag) - @classmethod - def resolvebindvars(cls, query, bindvars): - """Replaces the bound variables :xyz with a ? in the query and - adding the value behind a : at the end""" - log = logging.getLogger( "TrigConfFrontier.py" ) - import re - varsextract = re.findall(':([A-z0-9]*)',query) - values = list(map(bindvars.get, varsextract)) - log.debug("Resolving bound variable %r with %r", varsextract,values) - appendix = ":".join([str(v) for v in values]) - queryWithQuestionMarks = re.sub(':[A-z0-9]*','?', query) - query = queryWithQuestionMarks + ':' + appendix - return query - - @classmethod - def replacebindvars(cls, query, bindvars): - """Replaces the bound variables with the specified values, - disables variable binding - """ - log = logging.getLogger( "TrigConfFrontier.py" ) - from builtins import int - for var,val in list(bindvars.items()): - if query.find(":%s" % var)<0: - raise NameError("variable '%s' is not a bound variable in this query: %s" % (var, query) ) - if isinstance (val, int): - query = query.replace(":%s" % var,"%s" % val) - else: - query = query.replace(":%s" % var,"%r" % val) - log.debug("Resolving bound variable '%s' with %r", var,val) - log.debug("Resolved query: %s", query) - return query def execute(self, query, bindvars={}): if len(bindvars)>0: - query = FrontierCursor2.resolvebindvars(query,bindvars) + query = resolvebindvars(query,bindvars) from TrigConfDBConnection import frontier_client as fc log = logging.getLogger( "TrigConfFrontier.py" ) @@ -116,54 +147,37 @@ class FrontierCursor2(object): return self.result def __str__(self): - return """FrontierCursor2: -Using Frontier URL: %s -Schema: %s -Refresh cache: %s""" % (self.url, self.schema, self.refreshFlag) + s = "FrontierCursor2:\n" + s += "Using Frontier URL: %s\n" % self.url + s += "Schema: %s\n" % self.schema + s += "Refresh cache: %s" % self.refreshFlag + return s class FrontierCursor(object): def __init__(self, url, schema, refreshFlag=False, doDecode=True, retrieveZiplevel="zip"): - if url.startswith('('): - self.servertype, self.url = FrontierCursor.getServerUrls(url)[0] - self.url += "/Frontier" - else: - self.url = url + self.url = url + "/Frontier" self.schema = schema self.refreshFlag = refreshFlag self.retrieveZiplevel = retrieveZiplevel self.doDecode = doDecode - # connstr='frontier://ATLF/%s;schema=%s;dbname=TRIGCONF' % (connectionParameters['url'],connectionParameters["schema"]) def __str__(self): - return """FrontierCursor: -Using Frontier URL: %s -Refresh cache: %s""" % (self.url, self.refreshFlag) - - @classmethod - def getServerUrls(cls, frontier_servers): - from re import findall - return findall(r'\((serverurl)=(.*?)\)',frontier_servers) - - @classmethod - def testUrl(cls, url): - import urllib.request, urllib.error, urllib.parse - try: - urllib.request.urlopen(url) - except urllib.error.URLError: - import traceback - traceback.print_exc() - + s = "Using Frontier URL: %s\n" % self.url + s += "Schema: %s\n" % self.schema + s += "Refresh cache: %s" % self.refreshFlag + return s + def execute(self, query, bindvars={}): if len(bindvars)>0: - query = FrontierCursor2.replacebindvars(query,bindvars) + query = replacebindvars(query,bindvars) log = logging.getLogger( "TrigConfFrontier.py" ) - log.debug("Using Frontier URL: %s", self.url) - log.debug("Refresh cache : %s", self.refreshFlag) - log.debug("Query : %s", query) + log.debug("Frontier URL : %s", self.url) + log.debug("Refresh cache : %s", self.refreshFlag) + log.debug("Query : %s", query) import base64, zlib, urllib.request, urllib.error, urllib.parse, time @@ -185,6 +199,8 @@ Refresh cache: %s""" % (self.url, self.refreshFlag) queryStart = time.localtime() log.debug("Query started: %s", time.strftime("%m/%d/%y %H:%M:%S %Z", queryStart)) + print(request) + t1 = time.time() result = urllib.request.urlopen(request,None,10).read().decode() t2 = time.time() @@ -261,10 +277,10 @@ Refresh cache: %s""" % (self.url, self.refreshFlag) import re row_h = row_h.replace("\x07\x06",'.nNn.\x06') -# pattern = re.compile("\x06\x00\x00\x00.",flags=re.S) -#replace pattern above more restrictive version, as longerstrings in the results -#have a size variable in the column separate that becomes visible if the string -#is large enough - this then broke the prevous decoding + # pattern = re.compile("\x06\x00\x00\x00.",flags=re.S) + #replace pattern above more restrictive version, as longerstrings in the results + #have a size variable in the column separate that becomes visible if the string + #is large enough - this then broke the prevous decoding pattern = re.compile("\x06\x00\x00..",flags=re.S) row_h = pattern.sub('.xXx.',row_h) row_h = row_h.replace("\x86", '.xXx.') @@ -281,40 +297,46 @@ Refresh cache: %s""" % (self.url, self.refreshFlag) self.result = result -def testConnection(): - log = logging.getLogger( "TrigConfFrontier.py::testConnection()" ) - log.setLevel(logging.DEBUG) + +def testQuery(query, bindvars): + log = logging.getLogger( "TrigConfFrontier.py" ) from TriggerJobOpts.TriggerFlags import TriggerFlags as tf tf.triggerUseFrontier = True from TrigConfigSvc.TrigConfigSvcUtils import interpretConnection connectionParameters = interpretConnection("TRIGGERDBMC") - - cursor = FrontierCursor( url = connectionParameters['url'], schema = connectionParameters['schema']) - - query = "select distinct HPS.HPS_NAME from ATLAS_CONF_TRIGGER_RUN2_MC.HLT_PRESCALE_SET HPS where HPS.HPS_ID = '260'" - - cursor.execute(query) - print(cursor.result) - cursor.decodeResult() - print(cursor.result[0][0]) - assert cursor.result[0][0] == 'MC_pp_v7' - + for url in getServerUrls( connectionParameters['url'] ): + if not testUrl(url): + log.info("Skipping %s (failing connection test)", url) + continue + log.info("Testing %s", url) + cursor = getFrontierCursor( url = url, schema = connectionParameters['schema']) + cursor.execute(query, bindvars) + log.info("Raw response:") + print(cursor.result) + cursor.decodeResult() + log.info("Decoded response:") + log.info(cursor.result[0][0]) + if cursor.result[0][0] != 'MC_pp_v7': + return 1 + break return 0 -def testBindVarResolution(): - query = "SELECT :bar WHERE :foo = :bar sort by :ups asc, :foo" - bindvars = {"foo": 500, "bar": 8, "ups": 42 } - print("Query") - print(query) - print("is translated to") - print(FrontierCursor2.resolvebindvars(query, bindvars)) +def testBindVarResolution(query, bindvars): + resolvebindvars(query, bindvars) return 0 if __name__=="__main__": - res = testBindVarResolution() - res = max(res, testConnection()) + log = logging.getLogger( "TrigConfFrontier.py" ) + log.setLevel(logging.DEBUG) + + dbalias = "TRIGGERDBMC" + query = "select distinct HPS.HPS_NAME from ATLAS_CONF_TRIGGER_RUN2_MC.HLT_PRESCALE_SET HPS where HPS.HPS_ID = :psk" + bindvars = { "psk": 260 } + + res = testBindVarResolution(query, bindvars) # query resolution for c++ frontier client + res = max(res, testQuery(query, bindvars)) # pure python frontier query sys.exit(res)