Newer
Older

Joerg Stelzer
committed
#!/usr/bin/env python
from __future__ import print_function
# Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration
from __future__ import print_function

Stewart Martin-Haugh
committed
from future import standard_library
standard_library.install_aliases()
from builtins import map
from builtins import str
from builtins import zip
from builtins import object

Joerg Stelzer
committed
from AthenaCommon.Logging import logging
import time

Joerg Stelzer
committed
# useFrontierClient
# True: one uses the python bindings of frontier_client from the TrigConfDBConnection package
# False: one uses a purely python-based implementation
useFrontierClient = False
def getServerUrls(frontier_servers):
"""
turns
'(serverurl=http://atlasfrontier-local.cern.ch:8000/atlr)(serverurl=http://atlasfrontier-ai.cern.ch:8000/atlr)'
into
['http://atlasfrontier-local.cern.ch:8000/atlr','http://atlasfrontier-ai.cern.ch:8000/atlr']
"""
from re import findall
return findall(r'\(serverurl=(.*?)\)',frontier_servers)
def testUrl(url):
import urllib.request, urllib.error, urllib.parse

Joerg Stelzer
committed
try:
urllib.request.urlopen(url)
except urllib.error.URLError:
return False
return True
def resolveUrl(url):
"""
Expects input string to be a URL or $FRONTIER_SERVER
Returns an accessible URL or None"""
import re
if re.match("http://",url): # simple URL specification http://...
return url if testUrl(url) else None
if re.match(r'\(serverurl=(.*?)\)',url): # syntax of FRONTIER_SERVER
for url in getServerUrls(url):
if testUrl(url):
return url
return None
def getFrontierCursor(url, schema, loglevel = logging.INFO):
log = logging.getLogger( "TrigConfFrontier.py" )
log.setLevel(loglevel)
url = resolveUrl(url)
if url is None:
log.warning("Cannot find a valid frontier connection, will not return a Frontier cursor")
return None
else:
log.info(f"Will use Frontier server at {url}")
if useFrontierClient:
log.info("Using frontier_client from TrigConfDBConnection")

Joerg Stelzer
committed
return FrontierCursor2( url = url, schema = schema)
else:
log.info("Using a pure python implementation of frontier")

Joerg Stelzer
committed
return FrontierCursor( url = url, schema = schema)
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# used by FrontierCursor2
def resolvebindvars(query, bindvars):
"""Replaces the bound variables :xyz with a ? in the query and
adding the value behind a : at the end"""
log = logging.getLogger( "TrigConfFrontier.py" )
log.info("Query: %s", query)
log.info("bound variables: %r", bindvars)
import re
varsextract = re.findall(':([A-z0-9]*)',query)
values = list(map(bindvars.get, varsextract))
log.debug("Resolving bound variable %r with %r", varsextract,values)
appendix = ":".join([str(v) for v in values])
queryWithQuestionMarks = re.sub(':[A-z0-9]*','?', query)
query = queryWithQuestionMarks + ':' + appendix
log.info("Resolved query new style: %s", query)
return query
# used by FrontierCursor
def replacebindvars(query, bindvars):
"""Replaces the bound variables with the specified values,
disables variable binding
"""
log = logging.getLogger( "TrigConfFrontier.py" )
from builtins import int
for var,val in list(bindvars.items()):
if query.find(":%s" % var)<0:
raise NameError("variable '%s' is not a bound variable in this query: %s" % (var, query) )
if isinstance (val, int):
query = query.replace(":%s" % var,"%s" % val)
else:
query = query.replace(":%s" % var,"%r" % val)
log.debug("Resolving bound variable '%s' with %r", var,val)
log.debug("Resolved query: %s", query)
return query

Joerg Stelzer
committed

Stewart Martin-Haugh
committed
class FrontierCursor2(object):

Joerg Stelzer
committed
def __init__(self, url, schema, refreshFlag=False):
log = logging.getLogger( "TrigConfFrontier.py" )
self.url = url
self.schema = schema
self.refreshFlag = refreshFlag
from TrigConfDBConnection import frontier_client as fc
fc.init("PyFrontier","debug")
log.debug("Frontier URL : %s", self.url)
log.debug("Schema : %s", self.schema)
log.debug("Refresh cache : %s", self.refreshFlag)

Joerg Stelzer
committed
def execute(self, query, bindvars={}):
if len(bindvars)>0:
query = resolvebindvars(query,bindvars)

Joerg Stelzer
committed
from TrigConfDBConnection import frontier_client as fc
log = logging.getLogger( "TrigConfFrontier.py" )
log.debug("Executing query : %s", query)

Joerg Stelzer
committed
conn = fc.Connection(self.url)
session = fc.Session(conn)
doReload = self.refreshFlag
conn.setReload(doReload)
queryStart = time.localtime()
log.debug("Query started: %s", time.strftime("%m/%d/%y %H:%M:%S %Z", queryStart))

Joerg Stelzer
committed
t1 = time.time()
req = fc.Request("frontier_request:1:DEFAULT", fc.encoding_t.BLOB)
param = fc.Request.encodeParam(query)
req.addKey("p1",param)
session.getData([req])
t2 = time.time()
#session.printHeader()
#nfield = session.getNumberOfFields()
#print ("\nNumber of fields:", nfield, "\n")

Joerg Stelzer
committed
#nrec = session.getNumberOfRecords()
#print ("\nResult contains", nrec, "objects.\n")

Joerg Stelzer
committed
#session.printRecords2()
queryEnd = time.localtime()
self.result = [r for r in session.getRecords2()]
log.debug("Query ended: %s", time.strftime("%m/%d/%y %H:%M:%S %Z", queryEnd))
log.debug("Query time: %s seconds", (t2-t1))
log.debug("Result size: %i entries", len(self.result))

Joerg Stelzer
committed
def fetchall(self):
return self.result
def __str__(self):
s = "FrontierCursor2:\n"
s += "Using Frontier URL: %s\n" % self.url
s += "Schema: %s\n" % self.schema
s += "Refresh cache: %s" % self.refreshFlag
return s

Joerg Stelzer
committed

Stewart Martin-Haugh
committed
class FrontierCursor(object):

Joerg Stelzer
committed
def __init__(self, url, schema, refreshFlag=False, doDecode=True, retrieveZiplevel="zip"):
self.url = url + "/Frontier"

Joerg Stelzer
committed
self.schema = schema
self.refreshFlag = refreshFlag
self.retrieveZiplevel = retrieveZiplevel
self.doDecode = doDecode
def __str__(self):
s = "Using Frontier URL: %s\n" % self.url
s += "Schema: %s\n" % self.schema
s += "Refresh cache: %s" % self.refreshFlag
return s

Joerg Stelzer
committed
def execute(self, query, bindvars={}):
if len(bindvars)>0:
query = replacebindvars(query,bindvars)

Joerg Stelzer
committed
log = logging.getLogger( "TrigConfFrontier.py" )
log.debug("Frontier URL : %s", self.url)
log.debug("Refresh cache : %s", self.refreshFlag)
log.debug("Query : %s", query)

Joerg Stelzer
committed

Stewart Martin-Haugh
committed
import base64, zlib, urllib.request, urllib.error, urllib.parse, time

Joerg Stelzer
committed
self.result = None

Stewart Martin-Haugh
committed
compQuery = zlib.compress(query.encode("utf-8"),9)
base64Query = base64.binascii.b2a_base64(compQuery).decode("utf-8")
encQuery = base64Query.replace("+", ".").replace("\n","").replace("/","-").replace("=","_")

Joerg Stelzer
committed
frontierRequest="%s/type=frontier_request:1:DEFAULT&encoding=BLOB%s&p1=%s" % (self.url, self.retrieveZiplevel, encQuery)

Stewart Martin-Haugh
committed
request = urllib.request.Request(frontierRequest)

Joerg Stelzer
committed
if self.refreshFlag:
request.add_header("pragma", "no-cache")
frontierId = "TrigConfFrontier 1.0"
request.add_header("X-Frontier-Id", frontierId)
queryStart = time.localtime()
log.debug("Query started: %s", time.strftime("%m/%d/%y %H:%M:%S %Z", queryStart))

Joerg Stelzer
committed
t1 = time.time()

Stewart Martin-Haugh
committed
result = urllib.request.urlopen(request,None,10).read().decode()

Joerg Stelzer
committed
t2 = time.time()
queryEnd = time.localtime()
log.debug("Query ended: %s", time.strftime("%m/%d/%y %H:%M:%S %Z", queryEnd))
log.debug("Query time: %s [seconds]", (t2-t1))
log.debug("Result size: %i [seconds]", len(result))

Joerg Stelzer
committed
self.result = result
def fetchall(self):
if self.doDecode: self.decodeResult()
return self.result
def decodeResult(self):
log = logging.getLogger( "TrigConfFrontier.py" )
from xml.dom.minidom import parseString
import base64, zlib, curses.ascii
#print ("Query result:\n", self.result)

Joerg Stelzer
committed
dom = parseString(self.result)
dataList = dom.getElementsByTagName("data")
keepalives = 0
# Control characters represent records, but I won't bother with that now,
# and will simply replace those by space.
for data in dataList:
for node in data.childNodes:
# <keepalive /> elements may be present, combined with whitespace text
if node.nodeName == "keepalive":
# this is of type Element
keepalives += 1
continue
# else assume of type Text
if node.data.strip() == "":
continue
if keepalives > 0:
print (keepalives, "keepalives received\n")

Joerg Stelzer
committed
keepalives = 0

Stewart Martin-Haugh
committed
row = base64.decodebytes(node.data.encode())

Joerg Stelzer
committed
if self.retrieveZiplevel != "":

Stewart Martin-Haugh
committed
row = zlib.decompress(row).decode("utf-8")
#Hack to get these lines to work in python 2
if sys.version_info[0] < 3:
row = row.encode('ascii', 'xmlcharrefreplace')

Joerg Stelzer
committed
endFirstRow = row.find('\x07')
firstRow = row[:endFirstRow]
for c in firstRow:
if curses.ascii.isctrl(c):
firstRow = firstRow.replace(c, ' ')
fields = [x for i,x in enumerate(firstRow.split()) if i%2==0]
types = [x for i,x in enumerate(firstRow.split()) if i%2==1]
ptypes = []
for t in types:
if t.startswith("NUMBER"):
if ",0" in t:
ptypes.append(int)
else:
ptypes.append(float)
else:
ptypes.append(str)
log.debug("Fields : %r", fields)
log.debug("DB Types : %r", types)
log.debug("Python Types: %r", ptypes)

Joerg Stelzer
committed

Stewart Martin-Haugh
committed
row = row[endFirstRow+1:]

Joerg Stelzer
committed
row_h = row.rstrip('\x07')

Joerg Stelzer
committed
import re
row_h = row_h.replace("\x07\x06",'.nNn.\x06')
# pattern = re.compile("\x06\x00\x00\x00.",flags=re.S)
#replace pattern above more restrictive version, as longerstrings in the results
#have a size variable in the column separate that becomes visible if the string
#is large enough - this then broke the prevous decoding
pattern = re.compile("\x06\x00\x00..",flags=re.S)

Joerg Stelzer
committed
row_h = pattern.sub('.xXx.',row_h)
row_h = row_h.replace("\x86", '.xXx.')
row_h = row_h.split('.nNn.')
row_h = [r.split('.xXx.') for r in row_h]
result = []
for r in row_h:
if r[0]=='': r[0:1]=[]
r = tuple([t(v) for t,v in zip(ptypes,r)])
result.append( r )
self.result = result
def testQuery(query, bindvars):
log = logging.getLogger( "TrigConfFrontier.py" )

Joerg Stelzer
committed
from TriggerJobOpts.TriggerFlags import TriggerFlags as tf
tf.triggerUseFrontier = True
from TrigConfigSvc.TrigConfigSvcUtils import interpretConnection

Stewart Martin-Haugh
committed
connectionParameters = interpretConnection("TRIGGERDBMC")
cursor = getFrontierCursor( url = connectionParameters['url'], schema = connectionParameters['schema'])
cursor.execute(query, bindvars)
log.info("Raw response:")
print(cursor.result)
cursor.decodeResult()
log.info("Decoded response:")
log.info(cursor.result[0][0])
if cursor.result[0][0] != 'MC_pp_v7':
return 1

Joerg Stelzer
committed
return 0
def testBindVarResolution(query, bindvars):
resolvebindvars(query, bindvars)

Stewart Martin-Haugh
committed
return 0

Joerg Stelzer
committed
if __name__=="__main__":
log = logging.getLogger( "TrigConfFrontier.py" )
log.setLevel(logging.DEBUG)
dbalias = "TRIGGERDBMC"
query = "select distinct HPS.HPS_NAME from ATLAS_CONF_TRIGGER_RUN2_MC.HLT_PRESCALE_SET HPS where HPS.HPS_ID = :psk"
bindvars = { "psk": 260 }
res = testBindVarResolution(query, bindvars) # query resolution for c++ frontier client
res = max(res, testQuery(query, bindvars)) # pure python frontier query