diff --git a/Database/EventIndex/EventIndexProducer/cmt/requirements b/Database/EventIndex/EventIndexProducer/cmt/requirements new file mode 100644 index 0000000000000000000000000000000000000000..b02ef308fe8658742788bf02ff7f946b870bf8bc --- /dev/null +++ b/Database/EventIndex/EventIndexProducer/cmt/requirements @@ -0,0 +1,12 @@ +package EventIndexProducer + +author Javier.Sanchez@ific.uv.es + +use AtlasPolicy AtlasPolicy-* +use PyJobTransformsCore PyJobTransformsCore-* Tools +use PyJobTransforms PyJobTransforms-* Tools + +apply_pattern declare_python_modules files="*.py" +apply_pattern declare_job_transforms tfs='*.py' jo='*.py' + + diff --git a/Database/EventIndex/EventIndexProducer/python/EI_Lib.py b/Database/EventIndex/EventIndexProducer/python/EI_Lib.py new file mode 100755 index 0000000000000000000000000000000000000000..348c25be16eeda1c2a9569270c43290a6fc46ab8 --- /dev/null +++ b/Database/EventIndex/EventIndexProducer/python/EI_Lib.py @@ -0,0 +1,81 @@ +#!/bin/env python + +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + +""" EI classes""" + + +class IOV(object): + + _iovs={} + _cached={} + + def __init__(self): + pass + + def add(self,key,value,interval): + if key not in self._iovs: + self._iovs[key]=[] + self._iovs[key].append((interval,value)) + + def dump(self): + for k,v in self._iovs.iteritems(): + print k,v + + def get(self,key,point): + (r,e)=point + if key not in self._iovs: + return None + if key in self._cached: + i,v = self._cached[key] + (r0,e0,r1,e1) = i + if r >= r0 and ( e >= e0 or e0 == 0 ) and \ + r <= r1 and ( e < e1 or e1 == 0 ): + return v + ivalues=self._iovs[key] + for i,v in ivalues: + (r0,e0,r1,e1) = i + if r >= r0 and ( e >= e0 or e0 == 0 ) and \ + r <= r1 and ( e < e1 or e1 == 0 ): + self._cached[key]=(i,v) + return v + + return None + + +class EIrecord(object): + + _version=1 + + _attrs=['RunNumber','EventNumber','LumiBlockN','EventTime','EventTimeNanoSec','EventWeight', + 'McChannelNumber', 'BunchId','Lvl1ID','IsSimulation','IsCalibration','IsTestBeam', + 'L1PassedTrigMask','L2PassedTrigMask','EFPassedTrigMask', 'SMK','HLTPSK','L1PSK', + 'Snam0','Snam1','Snam2','Snam3','Sref0','Sref1','Sref2','Sref3', + ] + + def __init__(self): + for key in self._attrs: + setattr(self,key,None) + pass + + def __setitem__(self, key, value): + if key not in self._attrs: + raise KeyError(key) + setattr(self,key,value) + + def getRec(self): + rec = [] + for attr in self._attrs: + rec.append(getattr(self, attr, None)) + return rec + + def getRecSchema(self): + rec = [] + for attr in self._attrs: + rec.append(attr) + return rec + + def getVersion(self): + return self._version + + diff --git a/Database/EventIndex/EventIndexProducer/python/POOL2EI_Lib.py b/Database/EventIndex/EventIndexProducer/python/POOL2EI_Lib.py new file mode 100755 index 0000000000000000000000000000000000000000..9242e89659e7a2a5f0efa7fbec1203d011c237aa --- /dev/null +++ b/Database/EventIndex/EventIndexProducer/python/POOL2EI_Lib.py @@ -0,0 +1,633 @@ +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + +# @file POOL2EI_Lib.py +# @purpose provide components to get EventIndex data from pool files +# @author Javier Sanchez +# @date February 2010 +# +# Some code borrowed from PyAthena.FilePeekerLib +# credits to Sebastien Binet + +__version__= "$Revision: 000001 $" +__author__ = "Javier Sanchez" +__doc__ = "provide components to POOL2EI" + + +### imports -------------------------------------------------------------------- +#import pdb +import 
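# --- Editor's note: a minimal, self-contained usage sketch of the IOV helper
# --- defined in EI_Lib.py above. The keys mirror those used later in this patch
# --- (SMK, L1PSK); the numeric values are hypothetical, not real trigger
# --- configuration data. A 0 bound in an interval means "open-ended", per
# --- IOV.get() above; SMK is queried with (run, event), the prescale keys
# --- with (run, lumi_block).
from EventIndexProducer.EI_Lib import IOV

iov = IOV()
# SMK 2215 valid for the whole of run 200804 (0 bounds are open-ended)
iov.add('SMK', 2215, (200804, 0, 200804, 0))
# L1PSK 1542 valid for lumi blocks 1-120 of the same run
iov.add('L1PSK', 1542, (200804, 1, 200804, 121))

assert iov.get('SMK', (200804, 31415926)) == 2215   # point is (run, event)
assert iov.get('L1PSK', (200804, 42)) == 1542       # point is (run, lumi_block)
assert iov.get('L1PSK', (200805, 1)) is None        # outside any interval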
AthenaPython.PyAthena as PyAthena +from compressB64 import compressB64 +from EI_Lib import EIrecord, IOV +#from pympler import summary +#from pympler import muppy + +import time +StatusCode = PyAthena.StatusCode + + +def _import_ROOT(): + import sys + sys.argv.insert(1, '-b') + import ROOT + del sys.argv[1] + return ROOT + +def toiter(beg,end): + while beg != end: + yield beg.__deref__() + beg.__preinc__() + return + +class sg_versioned_key(object): + def __init__(self, sgkey): + self.raw_key = sgkey + @property + def key(self): + return self.raw_key.split(";")[-1] + +class POOL2EI(PyAthena.Alg): + """Algorithm + """ + _iov = IOV() + _eif = None + _eif_entries = 0 + _eif_totentries = 0 + _eif_nfiles = 0 + + def __init__(self, name='POOL2EI', **kw): + ## init base class + super(POOL2EI, self).__init__(name, **kw) + + _info = self.msg.info + _info("POOL2EI::__init__") + + + + def initialize(self): + + import AthenaPython.PyAthena as PyAthena + _info = self.msg.info + _info("POOL2EI::initialize") + + _info("## DoProvenanceRef: %s" % self.DoProvenanceRef); + _info("## DoTriggerInfo: %s" % self.DoTriggerInfo); + + # token match regex + import re + self._re_pool_token = re.compile(r'[[]DB=(?P<db>.*?)[]]' \ + r'[[]CNT=(?P<cnt>.*?)[]]' \ + r'[[]CLID=(?P<clid>.*?)[]]' \ + r'[[]TECH=(?P<tech>.*?)[]]' \ + r'[[]OID=(?P<oid>.*?)[]]').match + + # load our pythonizations: + for cls_name in ('EventStreamInfo', 'EventType', 'PyEventType'): + cls = getattr(PyAthena, cls_name) + + _info("retrieving various stores...") + for store_name in ('evtStore', 'inputStore', + 'tagStore', 'metaStore'): + _info('retrieving [%s]...', store_name) + o = getattr(self, store_name) + _info('retrieving [%s]... [done]', store_name) + ## if store_name not in ('evtStore',): + ## _info('loading event proxies for [%s]...', store_name) + ## status = 'ok' if o.loadEventProxies().isSuccess() else 'err' + ## _info('loading event proxies for [%s]... (%s)', store_name, status) + _info("retrieving various stores... 
[done]") + + + ## open output pkl file + import os + oname = self.Out + oname = os.path.expanduser(os.path.expandvars(oname)) + _info('Opening EI file [%s]...', oname) + if os.path.exists(oname): + os.remove(oname) + + import PyUtils.dbsqlite as dbsqlite + + try: + self._eif = dbsqlite.open(oname,flags='w') + except: + pass + + if self._eif is None: + self.msg.fatal("Unable to open EI output file %s exapnded as %s" % (self.Out, oname)) + raise RuntimeError("Unable to open EI output file") + + # initial information + self._eif['StartProcTime'] = int(time.time() * 1000) + self._eif['Schema'] = EIrecord().getRecSchema() + self._eif['Version'] = EIrecord().getVersion() + self._eif['PandaID'] = os.getenv('PandaID', 0) + self._eif['PanDA_TaskID'] = os.getenv('PanDA_TaskID', 0) + + #processing options + self._eif['ProvenanceRef'] = self.DoProvenanceRef + self._eif['TriggerInfo'] = self.DoTriggerInfo + + + return StatusCode.Success + + @property + def evtStore(self): + import AthenaPython.PyAthena as PyAthena + return PyAthena.py_svc('StoreGateSvc/StoreGateSvc') + + @property + def metaStore(self): + import AthenaPython.PyAthena as PyAthena + return PyAthena.py_svc('StoreGateSvc/MetaDataStore') + + @property + def tagStore(self): + import AthenaPython.PyAthena as PyAthena + return PyAthena.py_svc('StoreGateSvc/TagMetaDataStore') + + @property + def inputStore(self): + import AthenaPython.PyAthena as PyAthena + return PyAthena.py_svc('StoreGateSvc/InputMetaDataStore') + + def process_metadata(self, store, metadata_name): + msg = self.msg + try: + obj = store[metadata_name] + except KeyError,err: + msg.warning('could not retrieve [%s]', metadata_name) + return ([],[]) + msg.info('processing container [%s]', obj.folderName()) + data = [] + iovs = [] + payloads = obj.payloadContainer() + payloads_sz = payloads.size() + if hasattr(payloads, 'at'): + # HACK for bug #77976 + _tmp = payloads + payloads = [] + for ii in range(payloads_sz): + payloads.append(_tmp.at(ii)) + pass + for ii,payload in zip(range(payloads_sz), payloads): + if not payload: + msg.info ("**error** null-pointer ?") + continue + # names + chan_names = [] + sz = payload.name_size() + #msg.info('==names== (sz: %s)', sz) + for idx in xrange(sz): + chan = payload.chanNum(idx) + chan_name = payload.chanName(chan) + #msg.info( '--> (%s, %s)', idx, chan_name) + chan_names.append(chan_name) + + # iovs + sz = payload.iov_size() + #msg.info('==iovs== (sz: %s)',sz) + for idx in xrange(sz): + chan = payload.chanNum(idx) + iov_range = payload.iovRange(chan) + iov_start = iov_range.start() + iov_stop = iov_range.stop() + msg.info( '(%s, %s) => (%s, %s) valid=%s runEvt=%s', + iov_start.run(), + iov_start.event(), + iov_stop.run(), + iov_stop.event(), + iov_start.isValid(), + iov_start.isRunEvent()) + iovs.append((iov_start.run(),iov_start.event(),iov_stop.run(),iov_stop.event(),iov_start.isValid(),iov_start.isRunEvent())) + + # attrs + attrs = [] # can't use a dict as spec.name() isn't unique + sz = payload.size() + #msg.info('==attrs== (sz: %s)', sz) + for idx in xrange(sz): + chan = payload.chanNum(idx) + #msg.info("idx: %i chan: %s", idx, chan) + attr_list = payload.attributeList(chan) + #pdb.set_trace() + attr_list = list(toiter(attr_list.begin(), + attr_list.end())) + attr_data = [] + for a in attr_list: + #msg.info((a,dir(a),type(a))) + spec = a.specification() + a_type = spec.typeName() + a_type = 'std::string' if a_type == 'string' else a_type + a_data = getattr(a,'data<%s>'%a_type)() + if a_type == 'std::string': + try: + a_data = 
eval(a_data,{},{}) + except Exception: + # swallow and keep as a string + pass + #msg.info("%s: %s", spec.name(), a_data) + attr_data.append((spec.name(), + #type(a_data), + a_data)) + attrs.append(dict(attr_data)) + # msg.info(attrs[-1]) + if len(attrs) == len(chan_names): + data.append(dict(zip(chan_names,attrs))) + else: + if len(attrs): + if len(attrs) == 1: + data.append(attrs[0]) + else: + data.append(attrs) + else: + data.append(chan_names) + pass # loop over payloads... + + return (data,iovs) + + ########################################## + # execute at begin of file + ########################################## + def beginFile(self): + import AthenaPython.PyAthena as PyAthena + _info = self.msg.info + _info("POOL2EI::beginFile") + + # entries for this input file + self._eif_entries = 0 + + # define a new IOV storage + self._iov = IOV() + + ## inputStore + #self.inputStore.dump() + store = self.inputStore + esi_keys = store.keys('EventStreamInfo') + nentries = None + ddt = None + if len(esi_keys) >= 1: + sg_key = esi_keys[-1] + nentries = 0 + stream_names = esi_keys[:] + for sg_key in esi_keys: + esi = store.retrieve('EventStreamInfo', sg_key) + _info('=== [EventStreamInfo#%s] ===', sg_key) + nentries += esi.getNumberOfEvents() + evt_types = PyAthena.EventStreamInfo.evt_types(esi) + #print 'nentries ', nentries + #print 'evt_types ', evt_types + + + ##/TRIGGER/HLT/HltConfigKeys + (hltck_info, hltck_iovs) = self.process_metadata(self.inputStore,'/TRIGGER/HLT/HltConfigKeys') + hltpsk_l = [ x['HltPrescaleConfigurationKey'] for x in hltck_info ] + for val, iov in zip(hltpsk_l,hltck_iovs): + self._iov.add('HLTPSK',val,iov[:4]) + smk_l = [ x['MasterConfigurationKey'] for x in hltck_info ] + for val, iov in zip(smk_l,hltck_iovs): + self._iov.add('SMK',val,iov[:4]) + + ##/TRIGGER/LVL1/Lvl1ConfigKey + (l1ck_info, l1ck_iovs) = self.process_metadata(self.inputStore,'/TRIGGER/LVL1/Lvl1ConfigKey') + l1ck_l = [ x['Lvl1PrescaleConfigurationKey'] for x in l1ck_info ] + for val, iov in zip(l1ck_l,l1ck_iovs): + self._iov.add('L1PSK',val,iov[:4]) + + + (tginfo, tgiovs) = self.process_metadata(self.inputStore,'/TagInfo') + #pdb.set_trace() + amitag=None + trigStream=None + projName=None + if len(tginfo) > 0: + for tgi in tginfo: + if 'AMITag' in tgi: + amitag = tgi['AMITag'] + _info("## AMITag: %s" %amitag) + if 'triggerStreamOfFile' in tgi: + trigStream = tgi['triggerStreamOfFile'] + _info("## triggerStreamOfFile: %s" % trigStream) + if 'project_name' in tgi: + projName = tgi['project_name'] + _info("## project_name: %s" % projName) + + + + if self._eif is not None: + nfile = self._eif_nfiles + self._eif['StartProcTime_%d' % nfile] = int(time.time() * 1000) + self._eif['AMITag_%d' % nfile] = amitag + self._eif['TrigStream_%d' % nfile] = trigStream + self._eif['ProjName_%d' % nfile] = projName + + self._eif_nfiles += 1 + + #self._iov.dump() + + return + + + ########################################## + # execute at end of file + ########################################## + def endFile(self): + import AthenaPython.PyAthena as PyAthena + _info = self.msg.info + _info("POOL2EI::endFile") + + nfile = self._eif_nfiles-1 + self._eif['Nentries_%d'%nfile] = self._eif_entries + self._eif['EndProcTime_%d'%nfile] = int(time.time() * 1000) + + ########################################## + # execute event by event + ########################################## + def execute(self): + import AthenaPython.PyAthena as PyAthena + _info = self.msg.info + _error = self.msg.error + + _info("POOL2EI::execute") + + eirec = 
EIrecord() + + # -- Get EventInfo data + store = self.evtStore + evt_info_keys = store.keys('EventInfo') + if len(evt_info_keys) != 1: + _info('more than one EventInfo: %s', evt_info_keys) + _info(' ==> we\'ll use [%s]', evt_info_keys[0]) + sg_key = evt_info_keys[0] + ei = store.retrieve('EventInfo', sg_key) + _info('=== [EventInfo#%s] ===', sg_key) + eid = ei.event_ID() + _info('## bunch_crossing_id: %d', eid.bunch_crossing_id()) + #_info('## detector_mask: %d', eid.detector_mask()) + #_info('## detector_mask0: %d', eid.detector_mask0()) + #_info('## detector_mask1: %d', eid.detector_mask1()) + _info('## event_number: %d', eid.event_number()) + _info('## lumi_block: %d', eid.lumi_block()) + _info('## run_number: %d', eid.run_number()) + _info('## time_stamp: %d', eid.time_stamp()) + _info('## time_stamp_ns_offset: %d', eid.time_stamp_ns_offset()) + + + # event type + eitype = ei.event_type() + bm = list(eitype.bit_mask) + if 'IS_SIMULATION' in bm: + eirec['IsSimulation']=1 # IS_SIMULATION + else: + eirec['IsSimulation']=0 # IS_DATA + + if 'IS_TESTBEAM' in bm: + eirec['IsTestBeam']=1 # IS_TESTBEAM + else: + eirec['IsTestBeam']=0 # IS_FROM_ATLAS_DET + + if 'IS_CALIBRATION' in bm: + eirec['IsCalibration']=1 # IS_CALIBRATION + else: + eirec['IsCalibration']=0 # IS_PHYSICS + + + + run_number=eid.run_number() + event_number=eid.event_number() + lumi_block=eid.lumi_block() + + eirec['RunNumber'] = run_number + eirec['EventNumber'] = event_number + eirec['LumiBlockN'] = lumi_block + eirec["BunchId"] = eid.bunch_crossing_id() + eirec['EventTime'] = eid.time_stamp() + eirec['EventTimeNanoSec'] = eid.time_stamp_ns_offset() + eirec['EventWeight'] = eitype.mc_event_weight() + eirec['McChannelNumber'] = eitype.mc_channel_number() + + _info('## EventWeight: %f', eitype.mc_event_weight()) + _info('## McChannelNumber: %d', eitype.mc_channel_number()) + + # -- trigger processing + if self.DoTriggerInfo: + eit = ei.trigger_info() + _info("## Lvl1ID %s" % eit.extendedLevel1ID()) + Lvl1ID = eit.extendedLevel1ID() + eirec['Lvl1ID'] = Lvl1ID + trigL1="" + trigL2="" + trigEF="" + n=0 + for v in eit.level1TriggerInfo(): + trigL1+="{0:032b}".format(v)[::-1] + if v != 0: + _info("L1PassedTrigMask%s%d: %d" % (['TBP','TAP','TAV'][n/8],n%8,v)) + n+=1 + n=0 + for v in eit.level2TriggerInfo(): + trigL2+="{0:032b}".format(v)[::-1] + if v != 0: + _info("L2PassedTrigMask%d: %d"%(n, v)) + n+=1 + n=0 + for v in eit.eventFilterInfo(): + trigEF+="{0:032b}".format(v)[::-1] + if v != 0: + _info("EFPassedTrigMask%d: %d"%(n, v)) + n+=1 + trigL1=compressB64(trigL1) + trigL2=compressB64(trigL2) + trigEF=compressB64(trigEF) + _info("## trigL1: %s"% trigL1) + _info("## trigL2: %s"% trigL2) + _info("## trigEF: %s"% trigEF) + + eirec['L1PassedTrigMask'] = trigL1 + eirec['L2PassedTrigMask'] = trigL2 + eirec['EFPassedTrigMask'] = trigEF + + + ## form Metadata + SMK = self._iov.get('SMK',(run_number,event_number)) + L1PSK = self._iov.get('L1PSK',(run_number,lumi_block)) + HLTPSK = self._iov.get('HLTPSK',(run_number,lumi_block)) + _info('## SMK: %s'% SMK) + _info('## L1PSK: %s'% L1PSK) + _info('## HLTPSK: %s'% HLTPSK) + + eirec['SMK'] = SMK + eirec['L1PSK'] = L1PSK + eirec['HLTPSK'] = HLTPSK + + + stream_refs = {} # sreeam reference + Pstream_refs = {} # provenance references + procTag = None + ## -- Stream references + dh = store.retrieve('DataHeader') + procTag = dh.getProcessTag() + _info("## ProcessTag: " + procTag) + + #pdb.set_trace() + ## provenance referencess + if self.DoProvenanceRef: + if dh.sizeProvenance() > 0: + prv = 
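# --- Editor's note: a standalone sketch of the trigger-mask encoding used in
# --- execute() above. Each 32-bit trigger word is rendered LSB-first, so that
# --- character k of the concatenated string is trigger bit k; the mostly-zero
# --- string is then run-length encoded with compressB64. The words below are
# --- hypothetical.
from EventIndexProducer.compressB64 import compressB64

words = [0x00000005, 0x00000000, 0x80000000]   # bits 0, 2 and 95 fired
mask = ""
for v in words:
    mask += "{0:032b}".format(v)[::-1]         # LSB-first per 32-bit word

assert mask[0] == mask[2] == mask[95] == '1'
print compressB64(mask)                        # prints '!B!Bc!'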
dh.beginProvenance() + for i in range(0,dh.sizeProvenance()): + try: + tk=prv.getToken().toString() + match = self._re_pool_token(tk) + except: + tk=prv.getToken() + match = self._re_pool_token(tk) + if not match: + msg.warning('Provenance token can not be parsed: %s', tk) + continue + d = match.groupdict() + key=prv.getKey() + # CNT is empty. Complete information + if key == "StreamRAW": + stk = "[DB=%s][CNT=00000000][CLID=%s][TECH=%s][OID=%s]" % \ + (d['db'],d['clid'],d['tech'],d['oid']) + elif key in ("StreamAOD","StreamESD","StreamRDO","StreamHITS", + "StreamEVGEN","EmbeddingStream"): + stk = "[DB=%s][CNT=POOLContainer(DataHeader)][CLID=%s][TECH=%s][OID=%s]" % \ + (d['db'],d['clid'],d['tech'],d['oid']) + else: + _info("provenance %s=%s" % (key,tk)) + _error('Unknown provenance stream: %s', key) + raise RuntimeError('Unknown provenance stream') + _info("## P"+ key+"_ref: "+stk) + Pstream_refs[key]=stk + prv += 1 + + ## stream references + for dhe in dh: + key = dhe.getKey() + if key.startswith('Stream'): + _info("## Stream: "+key) + if key in [ procTag, 'StreamAOD' ]: + try: + match = self._re_pool_token(dhe.getToken().toString()) + except: + match = self._re_pool_token(dhe.getToken()) + if not match: + continue + d = match.groupdict() + tk_guid = d['db'] + stk = "[DB=%s][CNT=POOLContainer(DataHeader)][CLID=%s][TECH=%s][OID=%s]" \ + % (tk_guid,d['clid'],d['tech'],d['oid']) + _info("## "+ key+"_ref: "+stk) + ##stream_refs[key+"_ref"]=stk + if "ProcTag_ref" in stream_refs: + _info("Already inserted key ProcTag_ref in stream_refs with value "+stream_refs["ProcTag_ref"]) + stream_refs["Sref0"]=stk + try: + # Update ref token to handle fast merged files. + stk = store.proxy(dh).address().par().c_str() + stream_refs["Sref0"]=stk + _info("Updated ref token "+stk) + except: + pass + #print "stream_refs: ", stream_refs + del dh + + if self._eif is not None: + for sr,v in stream_refs.iteritems(): + try: + eirec[sr] = v + except: + _info("Unable to insert " + sr + " in stream references with value "+v) + pass + idx=1 + for sr,v in Pstream_refs.iteritems(): + try: + eirec['Snam%d'%idx] = sr + eirec['Sref%d'%idx] = v + except: + _info("Unable to insert " + sr + " in provenance stream references with value "+v) + pass + idx += 1 + + + eirec['Snam0'] = procTag + self._eif['Entry_%d' % self._eif_totentries] = eirec.getRec() + self._eif_entries += 1 # for this input file + self._eif_totentries += 1 # for all input fies + + + """ + all_objects = muppy.get_objects() + sum1 = summary.summarize(all_objects) + summary.print_(sum1) + """ + + return StatusCode.Success + + def finalize(self): + _info = self.msg.info + _info("POOL2EI::finalize") + + if self._eif is not None: + self._eif['Nentries'] = self._eif_totentries + self._eif['Nfiles'] = self._eif_nfiles + self._eif['EndProcTime'] = int(time.time() * 1000) + self._eif.close() + + + return StatusCode.Success + + +class POOL2EISvc(PyAthena.Svc): + """Service + """ + + def __init__(self, name='POOL2EISvc', **kw): + super(POOL2EISvc, self).__init__(name, **kw) + _info = self.msg.info + _info("POOL2EISvc::__init__") + + #save algorithm to call on incident + if 'algo' in kw: + self.algo = kw['algo'] + + def initialize(self): + # register with the incident svc + _info = self.msg.info + _info("POOL2EISvc::initialize") + incsvc = PyAthena.py_svc('IncidentSvc', iface='IIncidentSvc') + if not incsvc: + self.msg.error('unable to get the incident svc') + return StatusCode.Failure + + incsvc.addListener(self, 'BeginInputFile') + incsvc.addListener(self, 'EndInputFile') 
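# --- Editor's note: a self-contained sketch of the POOL token parsing performed
# --- by self._re_pool_token in execute() above; the token below is hypothetical
# --- but follows the [DB=..][CNT=..][CLID=..][TECH=..][OID=..] layout handled
# --- by the provenance and stream-reference code.
import re

pool_token = re.compile(r'[[]DB=(?P<db>.*?)[]]'
                        r'[[]CNT=(?P<cnt>.*?)[]]'
                        r'[[]CLID=(?P<clid>.*?)[]]'
                        r'[[]TECH=(?P<tech>.*?)[]]'
                        r'[[]OID=(?P<oid>.*?)[]]').match

tk = ('[DB=01234567-89AB-CDEF-0123-456789ABCDEF]'
      '[CNT=POOLContainer(DataHeader)]'
      '[CLID=D82968A1-CF91-4320-B2DD-E0F739CBC7E6]'
      '[TECH=00000202][OID=00000123-00000456]')
d = pool_token(tk).groupdict()
print d['db'], d['cnt'], d['oid']   # file GUID, container name, offset pair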
+ incsvc.release() + + return StatusCode.Success + + + def finalize(self): + _info = self.msg.info + _info("POOL2EISvc::finalize") + return StatusCode.Success + + def handle(self, incident): + _info = self.msg.info + _info("POOL2EISvc::handle") + tp = incident.type() + if tp == 'EndEvent': + pass + elif tp == 'BeginInputFile': + self.msg.info('POOL2EISvc::handle BeginInputFile') + self.algo.beginFile() + pass + elif tp == 'EndInputFile': + self.msg.info('POOL2EISvc::handle EndInputFile') + self.algo.endFile() + pass + else: + pass + return + + pass # class POOL2EISvc + diff --git a/Database/EventIndex/EventIndexProducer/python/__init__.py b/Database/EventIndex/EventIndexProducer/python/__init__.py new file mode 100755 index 0000000000000000000000000000000000000000..74583d364ec2ca794156596c7254d9b234a940c6 --- /dev/null +++ b/Database/EventIndex/EventIndexProducer/python/__init__.py @@ -0,0 +1,2 @@ +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + diff --git a/Database/EventIndex/EventIndexProducer/python/compressB64.py b/Database/EventIndex/EventIndexProducer/python/compressB64.py new file mode 100755 index 0000000000000000000000000000000000000000..adc2a65e454da167769bb354b2800efe86abea79 --- /dev/null +++ b/Database/EventIndex/EventIndexProducer/python/compressB64.py @@ -0,0 +1,131 @@ +#!/bin/env python + +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + +""" Encodes a bit string composed mostly by zeroes""" + +_B64_alphabet = ( "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789+/") + +def _toB64(n): + s="" + while n>63: + s = _B64_alphabet[n%64] + s + n = n/64 + s = _B64_alphabet[n%64] + s + return s + + +def _fromB64(s): + n=0 + for i in range(0,len(s)): + n *= 64 + n += _B64_alphabet.index(s[i]) + return n + + +def compressB64(s): + """Encodes a string of zeroes and ones using a RLE method. + + s is the string to encode. + + Consecutive zeroes are encoded by its repetition count in Base64. + Ones are copied to output substituted by exclamation marks. + + Example: + input: 0001100001001101010000 + output: D!!E!C!!B!B!E + + Effective compression is only achieved when input is composed mostly + by zeroes. + + The encoded string is returned. + """ + + count = 0; + res="" + for k in range(0,len(s)): + c = s[k] + if c == '0': + count += 1 + else: + if count > 0: + res += _toB64(count) + res += "!" + count = 0 + + if count > 0: + res += _toB64(count) + + return res + +def decompressB64(s): + """Decodes a string coded by compressB(). + + s is the string to decode. + + The decoded string is returned. + """ + + res="" + count="" + for k in range(0,len(s)): + c = s[k] + if c == "!" 
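# --- Editor's note: a round-trip check of the RLE codec in compressB64.py,
# --- reusing the worked example from the compressB64 docstring above.
from EventIndexProducer.compressB64 import compressB64, decompressB64

bits = "0001100001001101010000"
enc = compressB64(bits)
assert enc == "D!!E!C!!B!B!E"      # zero runs in Base64, ones as '!'
assert decompressB64(enc) == bits  # decoding restores the original bit string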
or c == " ": + if len(count)>0: + res += "0" * _fromB64(count) + count="" + res += c.replace("!","1").replace(" ","0") + else: + count+=c + if len(count)>0: + res += "0" * _fromB64(count) + + return res + + +if __name__ == "__main__": + s = "000010000000000010000001000000000001000000000010000001" + c=compressB64(s) + d=decompressB64(c) + x=int(s, 2) + sx="%X"%x + + print "" + print "Original: >>"+s+"<<" + print "Compressed: >>"+c+"<<" + print "Hex repr: >>"+sx+"<<" + print "Decompressed: >>"+d+"<<" + + import random + for i in range(0,100): + # generate random bit string + s="" + for j in range(0,1000): + if random.random()>0.02: + s += '0' + else: + s += '1' + + # test compression and decompression + c=compressB64(s) + d=decompressB64(c) + x=int(s, 2) + sx="%X"%x + + if i == 0: + print "" + print "Original: >>"+s+"<<" + print "Compressed: >>"+c+"<<" + print "Hex repr: >>"+sx+"<<" + print "Decompressed: >>"+d+"<<" + + if s != d: + print "" + print "Decompression error" + print "Original: >>"+s+"<<" + print "Compressed: >>"+c+"<<" + print "Hex repr: >>"+sx+"<<" + print "Decompressed: >>"+d+"<<" + diff --git a/Database/EventIndex/EventIndexProducer/scripts/POOLtoEI_tf.py b/Database/EventIndex/EventIndexProducer/scripts/POOLtoEI_tf.py new file mode 100755 index 0000000000000000000000000000000000000000..b85ec9c9590de91a3a32109680b7fd3fd56a1910 --- /dev/null +++ b/Database/EventIndex/EventIndexProducer/scripts/POOLtoEI_tf.py @@ -0,0 +1,67 @@ +#! /usr/bin/env python + +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + +import argparse +import os.path +import sys +import time +import traceback + +import logging + +# Setup core logging here +from PyJobTransforms.trfLogger import msg +msg.info('logging set in %s' % sys.argv[0]) + +from PyJobTransforms.trfExitCodes import trfExit +from PyJobTransforms.transform import transform +from PyJobTransforms.trfExe import athenaExecutor +from PyJobTransforms.trfArgs import addAthenaArguments, addDetectorArguments +from PyJobTransforms.trfDecorators import stdTrfExceptionHandler, sigUsrStackTrace + +import PyJobTransforms.trfExceptions as trfExceptions +import PyJobTransforms.trfArgClasses as trfArgClasses + +@stdTrfExceptionHandler +@sigUsrStackTrace +def main(): + + msg.info('This is %s' % sys.argv[0]) + + trf = getTransform() + trf.parseCmdLineArgs(sys.argv[1:]) + trf.execute() + trf.generateReport() + + msg.info("%s stopped at %s, trf exit code %d" % (sys.argv[0], time.asctime(), trf.exitCode)) + sys.exit(trf.exitCode) + +## Get the base transform with all arguments added +def getTransform(): + trf = transform(executor = athenaExecutor(name = 'POOLtoEI', + skeletonFile = 'EventIndexProducer/skeleton.POOLtoEI_tf.py')) + addAthenaArguments(trf.parser) + addDetectorArguments(trf.parser) + addMyArgs(trf.parser) + return trf + + +def addMyArgs(parser): + # Use arggroup to get these arguments in their own sub-section (of --help) + parser.defineArgGroup('POOLtoEI_tf', 'POOL to EI specific options') + parser.add_argument('--inputPOOLFile', nargs='+', + type=trfArgClasses.argFactory(trfArgClasses.argPOOLFile, io='input'), + help='Input POOL file', group='POOLtoEI_tf') + parser.add_argument('--outputEIFile', + type=trfArgClasses.argFactory(trfArgClasses.argFile, io='output', type='misc'), + help='Output EI file', group='POOLtoEI_tf') + parser.add_argument("--trigger", + type=trfArgClasses.argFactory(trfArgClasses.argBool, runarg=True), + help="Include trigger information (default: false)") + parser.add_argument("--provenance", + 
type=trfArgClasses.argFactory(trfArgClasses.argBool, runarg=True), + help="Include provenance information (default: true)") + +if __name__ == '__main__': + main() diff --git a/Database/EventIndex/EventIndexProducer/scripts/RAW2EI.py b/Database/EventIndex/EventIndexProducer/scripts/RAW2EI.py new file mode 100755 index 0000000000000000000000000000000000000000..d0b3da52e4688e09cf7e80a1b3c07b24566ba03c --- /dev/null +++ b/Database/EventIndex/EventIndexProducer/scripts/RAW2EI.py @@ -0,0 +1,333 @@ +#!/usr/bin/env python + +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + +#import pdb +import eformat +import sys +import os +import time +import logging +from libpyeformat import FullEventFragment, convert_old +import libpyeformat_helper as helper +import libpyevent_storage as EventStorage + +import argparse +import PyUtils.dbsqlite as dbsqlite +from EventIndexProducer.compressB64 import compressB64 +from EventIndexProducer.EI_Lib import EIrecord, IOV + + +# logger +log=None + + +class blob_istream2(object): + """Returns useless blobs of data, as defined by the EventStorage i/f.""" + + def __init__(self, f): + + #we leave completely to the data reader to defined "valid files". + self.total_events = 0 + dr = EventStorage.pickDataReader(f) + if dr is None: + raise IOError("Invalid file or format at '%s'" % f) + self.total_events = dr.eventsInFile() + + self.file = f + + def __len__(self): + """Returns the number of events available in this stream""" + return self.total_events + + def __iter__(self): + dr = EventStorage.pickDataReader(self.file) + for k in range(dr.eventsInFile()): + yield (dr.getPosition(), dr.getData()) + + def __getitem__(self, key): + """Returns a specific blob in the stream. The 'key' parameter maybe + either a single valued integer or a slice object, in which case many + fragments are returned in one shot.""" + + if type(key) is int or type(key) is long: + if key >= len(self): + raise IndexError, "Index (%d) is bigger than my length (%d)" % (key, self.total_events) + if key < 0: + if (-key) > len(self): + raise IndexError, "Index (%d) is too small for my length (%d)" % (key, self.total_events) + key = len(self) + key + + f = None + for f in self: + key -= 1 + if key < 0: break + return f + + elif type(key) is slice: + (start, stop, stride) = key.indices(self.total_events) + valid = range(start, stop, stride) + retval = [] + counter = 0 + for f in self: + if counter in valid: + retval.append(f) + counter += 1 + return retval + +class istream2(blob_istream2): + """The istream class allows the user to read file streams. 
+ """ + + def __init__(self, f): + blob_istream2.__init__(self, f) + + def __iter__(self): + dr = EventStorage.pickDataReader(self.file) + for k in range(dr.eventsInFile()): + offset = dr.getPosition() + blob = dr.getData() + + # check for people trying old versions and convert it on the spot + fragment_version = helper.Version(blob[3]) + if fragment_version.major_version() != helper.MAJOR_DEFAULT_VERSION: + current_version = helper.Version() + logging.debug("Converting from version %s to %s" % \ + (fragment_version.human_major(), current_version.human_major())) + blob = convert_old(blob) + + if blob[0] == helper.HeaderMarker.FULL_EVENT: + yield (offset,FullEventFragment(blob)) + else: + raise SyntaxError, "Expecting event marker, not 0x%08x" % blob[0] + + def iter_raw(self): + dr = EventStorage.pickDataReader(self.file) + for k in range(dr.eventsInFile()): + yield (dr.getPosition(), dr.getData()) + + +def processRAW(input_file, eif, nfile, nfirst, evtmax): + + log.info("Opening data file: %s" % ( input_file)) + eif['StartProcTime_%d'%nfile] = int(time.time() * 1000) + + dr = EventStorage.pickDataReader(input_file) + if dr is None: + raise IOError("Invalid file or format at '%s'" % input_file) + + log.info("total_events: %s",dr.eventsInFile()) + log.debug("dataMB: %s", dr.dataMB_InFile()) + log.debug("LumiBlock: %s", dr.lumiblockNumber()) + log.debug("Stream: %s", dr.stream()) + log.debug("App Name: %s", dr.appName()) + log.debug("beamEnergy: %s", dr.beamEnergy()) + log.debug("beamType: %s", dr.beamType()) + log.debug("detectorMask: %s", dr.detectorMask()) + log.debug("Core Name: %s", dr.fileNameCore()) + log.debug("projectTag: %s", dr.projectTag()) + log.debug("GUID: %s", dr.GUID()) + log.debug("runNumber: %s", dr.runNumber()) + log.debug("stream: %s", dr.stream()) + log.debug("triggerType: %s", dr.triggerType()) + + GUID = dr.GUID() + + + eif['ProjName_%d'%nfile] = dr.projectTag() + eif['TrigStream_%d'%nfile] = dr.stream() + eif['AMITag_%d'%nfile] = "" # no tag for RAW data + + + + #input = eformat.istream([input_file]) + input = istream2(input_file) + + ################################################################### + + cntEvt=0 + cntEvtEI=nfirst + + for (offset,event) in input: + + if evtmax >0 and cntEvtEI >= evtmax : + break; + + eirec = EIrecord() + + L1TBP=event.lvl1_trigger_info()[0:8] + L1TAP=event.lvl1_trigger_info()[8:16] + L1TAV=event.lvl1_trigger_info()[16:24] + log.debug("--------------------------------") + log.debug('Event: %s',cntEvt) + log.debug('Offset: %s', offset) + log.debug('RunNumber: %s',event.run_no()) + log.debug("EventNumber: %s", event.global_id()) + log.debug('L1ID: %s',event.lvl1_id()) + log.debug("EventTime: %s", event.bc_time_seconds()) + log.debug("EventTimeNanoSec: %s", event.bc_time_nanoseconds()) + log.debug("LumiBlockN: %s",event.lumi_block()) + log.debug("BunchId: %s", event.bc_id()) + log.debug('L1 type: 0x%02x' % event.lvl1_trigger_type()) + log.debug('L1 Before Prescale: %s',L1TBP) + log.debug('L1 After Prescale: %s', L1TAP) + log.debug('L1 After Veto: %s',L1TAV) + log.debug("RunType: %s",event.run_type()) + + + eirec['RunNumber'] = event.run_no() + eirec['EventNumber'] = event.global_id() + eirec['LumiBlockN'] = event.lumi_block() + eirec["BunchId"] = event.bc_id() + eirec['EventTime'] = event.bc_time_seconds() + eirec['EventTimeNanoSec'] = event.bc_time_nanoseconds() + eirec['EventWeight'] = 1.0 + eirec['McChannelNumber'] = 0 + eirec['Lvl1ID'] = event.lvl1_id() + + + #Run Type Value + #Physics 0x00000000 + #Calibration 0x00000001 + #Cosmics 
0x00000002 + #Test 0x0000000f + #Simulation 0x80000000 + + runtype = event.run_type().__str__() + eirec['IsSimulation'] = 1 + eirec['IsTestBeam'] = 0 + eirec['IsCalibration'] = 0 + if "PHYSICS" in runtype: + eirec['IsSimulation'] = 0 + if "CALIBRATION" in runtype: + eirec['IsCalibration'] = 1 + if "TEST" in runtype: + eirec['IsTestBeam'] = 1 + + eirec['SMK'] = 0 + eirec['L1PSK'] = 0 + eirec['HLTPSK'] = 0 + + eirec['Snam0'] = "StreamRAW" + offset_str = "{0:016X}".format(offset) + offset_str1 = offset_str[:8] + offset_str2 = offset_str[8:] + tk_tmpl = "[DB=%s][CNT=00000000][CLID=00000000-0000-0000-0000-000000000000][TECH=00001000][OID=%s-%s]" + eirec['Sref0'] = tk_tmpl % (GUID,offset_str1,offset_str2) + + L1=event.lvl1_trigger_info() + trigL1="" + for l in L1: + trigL1+="{0:032b}".format(l)[::-1] + L2=event.lvl2_trigger_info() + trigL2="" + for l in L2: + trigL2+="{0:032b}".format(l)[::-1] + EF=event.event_filter_info() + trigEF="" + for l in EF: + trigEF+="{0:032b}".format(l)[::-1] + trigL1=compressB64(trigL1) + trigL2=compressB64(trigL2) + trigEF=compressB64(trigEF) + eirec['L1PassedTrigMask'] = trigL1 + eirec['L2PassedTrigMask'] = trigL2 + eirec['EFPassedTrigMask'] = trigEF + + # write to db + eif['Entry_%d' % cntEvtEI] = eirec.getRec() + cntEvt += 1 + cntEvtEI += 1 + + eif['Nentries_%d'%nfile] = cntEvt + eif['EndProcTime_%d'%nfile] = int(time.time() * 1000) + + return cntEvt + + +def options(argv): + + parser = argparse.ArgumentParser(description='RAW to EI processor') + parser.add_argument('-v','--verbose', action='count', help='Verbosity level') + parser.add_argument("-d","--debug", action='store_true', default=False,help="Debug messages") + parser.add_argument("-m","--evtmax", type=int,default=0, help="Max events to read") + parser.add_argument("input", help="Input RAW file") + parser.add_argument("output", help="Output EI file") + + opt=parser.parse_args() + + ifiles = opt.input.split(',') + opt.input=ifiles + + return opt + + +def main(): + + # logger + global log + log = logging.getLogger('RAW2EI.py') + + # get options + opt = options(sys.argv) + + if opt.verbose > 0: + log.setLevel(logging.INFO) + + if opt.debug > 0: + log.setLevel(logging.DEBUG) + + # open EI file + if os.path.exists(opt.output): + os.remove(opt.output) + eif = dbsqlite.open(opt.output,flags='w') + + eif['StartProcTime'] = int(time.time() * 1000) + eif['Schema'] = EIrecord().getRecSchema() + eif['Version'] = EIrecord().getVersion() + eif['PandaID'] = os.getenv('PandaID', 0) + eif['PanDA_TaskID'] = os.getenv('PanDA_TaskID', 0) + + #processing options + eif['ProvenanceRef'] = False + eif['TriggerInfo'] = True + + + ## process RAW files + if type(opt.input) is str: + filelist = [opt.input] + else: + filelist = opt.input + + + nfile=0; + nevents=0; + for f in filelist: + nevents += processRAW(f,eif,nfile,nevents,opt.evtmax); + nfile += 1; + if opt.evtmax >0 and nevents >= opt.evtmax : + break; + + + eif['Nentries'] = nevents + eif['Nfiles'] = nfile + eif['EndProcTime'] = int(time.time() * 1000) + eif.close() + + +if __name__ == '__main__': + + # logger setup + root = logging.getLogger() + ch = logging.StreamHandler(sys.stdout) + ch.setLevel(logging.DEBUG) + #formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + formatter = logging.Formatter('%(name)-15s %(levelname)9s %(message)s') + ch.setFormatter(formatter) + root.addHandler(ch) + + main() + + + + diff --git a/Database/EventIndex/EventIndexProducer/scripts/TAG2EI.py b/Database/EventIndex/EventIndexProducer/scripts/TAG2EI.py new 
file mode 100755 index 0000000000000000000000000000000000000000..795bbfa21848e9cd0627794598c7a1989bd775d5 --- /dev/null +++ b/Database/EventIndex/EventIndexProducer/scripts/TAG2EI.py @@ -0,0 +1,334 @@ +#!/bin/env python + +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + +import sys +import os +import time +import PyUtils.dbsqlite as dbsqlite +import re +import argparse +import PyUtils.RootUtils as ru +import PyCintex +#import pdb +from EventIndexProducer.compressB64 import compressB64 +from EventIndexProducer.EI_Lib import EIrecord, IOV + + +def options(argv): + + parser = argparse.ArgumentParser(description='Tag to EI processor') + parser.add_argument('-v','--verbose', action='count', help='Verbosity level') + parser.add_argument("-d","--debug", action='store_true', default=False,help="Debug messages") + parser.add_argument("-m","--evtmax", type=int,default=0, help="Max events to read") + parser.add_argument("input", help="Input TAG file") + parser.add_argument("output", help="Output EI file") + + return parser.parse_args() + + +def _import_ROOT(): + import sys + sys.argv.insert(1, '-b') + import ROOT + del sys.argv[1] + return ROOT + +def Tagfile(fname): + + #ROOT = ru.import_root() + ROOT = _import_ROOT() + PyCintex.Cintex.Enable() + + f = ROOT.TFile.Open (fname, "read") + assert not f.IsZombie() and f.IsOpen(), \ + "problem opening POOL file [%s]"%fname + + tree_name="CollectionMetadata" + + t = f.Get(tree_name) + assert isinstance(t, ROOT.TTree), \ + "could not retrieve tree [%s]" % tree_name + + CollectionMetadata={} + nentries = t.GetEntries() + for i in xrange(nentries): + t.GetEntry (i) + k=getattr(t,"Key") + v=getattr(t,"Value") + + # take string until \0 is found + n=k.find('\0') + k=k[:n] + + n=v.find('\0') + v=v[:n] + + #print k,v + CollectionMetadata[k]=v + + keys = [k.GetName() for k in f.GetListOfKeys()] + #print "keys" + #pprint (keys) + + tree_name="POOLCollectionTree" + if tree_name not in keys and "CollectionTree" not in keys: + err= "::: error: neither [%s] nor [CollectionTree] in file [%s]" % ( + tree_name, fname) + raise RuntimeError(err) + # try the backward compat. hack + if tree_name not in keys: + tree_name = "CollectionTree" + + t = f.Get(tree_name) + assert isinstance(t, ROOT.TTree), \ + "could not retrieve tree [%s]" % tree_name + + branches = [str(b.GetName()) for b in t.GetListOfBranches()] + #print "== branches" + #pprint(branches) + + + # get all leaves + leaves = t.GetListOfLeaves() + #print "== leaves" + #pprint (leaves) + + # disable everything... 
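# --- Editor's note: the SetBranchStatus calls just below follow the standard
# --- PyROOT selective-read pattern; here is a tiny self-contained sketch of it
# --- on a throwaway tree (file name and branch values are hypothetical).
import array
import ROOT

f = ROOT.TFile.Open("demo.root", "recreate")
t = ROOT.TTree("POOLCollectionTree", "demo")
run = array.array('i', [200804])
t.Branch("RunNumber", run, "RunNumber/I")
t.Fill()

t.SetBranchStatus("*", 0)          # deactivate every branch...
t.SetBranchStatus("RunNumber", 1)  # ...then pay read I/O only where needed
t.GetEntry(0)
f.Close()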
+ t.SetBranchStatus ("*", 0) + #activate only baskets wich are of interest + for ref in ( 'RunNumber', 'EventNumber', 'LumiBlockN', 'EventTime', 'EventTimeNanoSec', 'EventWeight', + 'BunchId', 'Token', 'StreamAOD_ref', 'StreamESD_ref', 'StreamRAW_ref','IsSimulation','IsCalibration','IsTestBeam' ): + t.SetBranchStatus (ref, 1) + # activate trigger baskets + tbranches = [ k for k in branches if k.find("PassedTrigMask") != -1] + for ref in tbranches: + t.SetBranchStatus (ref, 1) + + class PyListOfLeaves(dict) : + pass + + # create an istance + pyl = PyListOfLeaves() + + # add leaves as attributes + for i in range(0,leaves.GetEntries() ) : + leaf = leaves.At(i) + name = leaf.GetName() + #print "== leaf" + #pprint(name) + #print leaf.GetTypeName() + pyl.__setattr__(name,leaf) + #print "leaf: ",name,leaf.GetTypeName() + + + return (t,pyl,CollectionMetadata) + + +def main(): + + # get options + opt = options(sys.argv) + + if opt.input is None or opt.output is None: + pass + + # open tagfile + (tg, pyl, cm) = Tagfile(opt.input) + + nentries = tg.GetEntries() + + # open database + if os.path.exists(opt.output): + os.remove(opt.output) + eif = dbsqlite.open(opt.output,flags='w') + + eif['StartProcTime'] = int(time.time() * 1000) + eif['StartProcTime_0'] = int(time.time() * 1000) + eif['Schema'] = EIrecord().getRecSchema() + eif['Version'] = EIrecord().getVersion() + eif['PandaID'] = os.getenv('PandaID', 0) + eif['PanDA_TaskID'] = os.getenv('PanDA_TaskID', 0) + + #processing options + eif['ProvenanceRef'] = True + eif['TriggerInfo'] = True + + + ## process TAG file + + iov=IOV() + + # get HLTPSK,L1PSK,SMK from metadata + # [188921,1] - [188921,215] + iov_interval = re.compile(r'[[](?P<r0>[0-9]+?),(?P<e0>[0-9]+?)[]] - '\ + r'[[](?P<r1>[0-9]+?),(?P<e1>[0-9]+?)[]]').match + #pdb.set_trace() + Project = "-" + physicsShort = "-" + TAG = "-" + for k, v in cm.iteritems(): + n=k.find('|') + if n != -1: + k2=k[n+1:] + pfn=k[:n+1][4:] + else: + k2=k + pfn="" + #print k,v + if k2.startswith("Key@HLTPSK;") or k2.startswith("Key@L1PSK;") or k2.startswith("Key@SMK;"): + parts=k2.split(";") + k1=parts[0][4:].rstrip(";") + iov1=parts[1][5:].rstrip("}") + match = iov_interval(iov1) + if not match: + err="BAD IOV" + raise RuntimeError(err) + d = match.groupdict() + iov.add(k1,int(v),(int(d['r0']),int(d['e0']), + int(d['r1']),int(d['e1']))) + + # asume that all events in TAG file belongs to same Project,datasetNumber,physicsShort,prodStep,dataType,TAG + if k2 == 'POOLCollectionID' : + try: + Project,datasetNumber,physicsShort,prodStep,dataType,TAG = pfn.split('.')[:6] + iund=physicsShort.find('_') + physicsShort = physicsShort[iund+1:] + #print pfn.split('.')[:6] + except: + pass + + + #print iov.dump() + + eif['ProjName_0'] = Project + eif['TrigStream_0'] = physicsShort + eif['AMITag_0'] = TAG + + + pool_token = re.compile(r'[[]DB=(?P<db>.*?)[]]' \ + r'[[]CNT=(?P<cnt>.*?)[]]' \ + r'[[]CLID=(?P<clid>.*?)[]]' \ + r'[[]TECH=(?P<tech>.*?)[]]' \ + r'[[]OID=(?P<oid1>.*?)-(?P<oid2>.*?)[]]' \ + ).match + + + + # event loop + tnow0=int(time.time() * 1000) + tnow=tnow0 + for evt_idx in xrange(nentries): + + if opt.evtmax != 0 and evt_idx >= opt.evtmax: + break + + if opt.verbose > 0 and evt_idx%1000 == 0: + tnow1=int(time.time() * 1000) + sys.stderr.write("%8d %5d msecs\n"%(evt_idx, tnow1-tnow)) + tnow=tnow1 + + tg.GetEntry (evt_idx) + eirec = EIrecord() + + run_number=int(pyl.RunNumber.GetCnvValue()) + event_number=int(pyl.EventNumber.GetCnvValue()) + lumi_block = pyl.LumiBlockN.GetCnvValue() + #print "run, evt", 
run_number,event_number + + eirec['RunNumber'] = run_number + eirec['EventNumber'] = event_number + eirec['LumiBlockN'] = lumi_block + eirec["BunchId"] = pyl.BunchId.GetCnvValue() + eirec['EventTime'] = pyl.EventTime.GetCnvValue() + eirec['EventTimeNanoSec'] = pyl.EventTimeNanoSec.GetCnvValue() + eirec['EventWeight'] = pyl.EventWeight.GetCnvValue() + eirec['McChannelNumber'] = 0 + eirec['Lvl1ID'] = 0 + + eirec['IsSimulation'] = int(pyl.IsSimulation.GetValue()) + eirec['IsTestBeam'] = int(pyl.IsTestBeam.GetValue()) + eirec['IsCalibration'] = int(pyl.IsCalibration.GetValue()) + + + SMK = iov.get('SMK',(run_number,event_number)) + L1PSK = iov.get('L1PSK',(run_number,lumi_block)) + HLTPSK = iov.get('HLTPSK',(run_number,lumi_block)) + #print '## SMK ', SMK + #print '## L1PSK ', L1PSK + #print '## HLTPSK ', HLTPSK + eirec['SMK'] = SMK + eirec['L1PSK'] = L1PSK + eirec['HLTPSK'] = HLTPSK + + try: + eirec['Sref0'] = pyl.StreamAOD_ref.GetCnvValue().rstrip('\0') + eirec['Snam0'] = "StreamAOD" + except: + pass + try: + eirec['Sref0'] = pyl.Token.GetCnvValue().rstrip('\0') + eirec['Snam0'] = "StreamAOD" + except: + pass + try: + eirec['Sref1'] = pyl.StreamRAW_ref.GetCnvValue().rstrip('\0') + eirec['Snam1'] = "StreamRAW" + except: + pass + try: + eirec['Sref2'] = pyl.StreamESD_ref.GetCnvValue().rstrip('\0') + eirec['Snam2'] = "StreamESD" + except: + pass + + + trigL1="" + trigL2="" + trigEF="" + for k in range(0,8): + v=getattr(pyl,'L1PassedTrigMaskTBP%d'%k).GetCnvValue() + trigL1+="{0:032b}".format(v)[::-1] + for k in range(0,8): + v=getattr(pyl,'L1PassedTrigMaskTAP%d'%k).GetCnvValue() + trigL1+="{0:032b}".format(v)[::-1] + for k in range(0,8): + v=getattr(pyl,'L1PassedTrigMaskTAV%d'%k).GetCnvValue() + trigL1+="{0:032b}".format(v)[::-1] + for k in range(0,32): + v=getattr(pyl,'L2PassedTrigMask%d'%k).GetCnvValue() + trigL2+="{0:032b}".format(v)[::-1] + for k in range(0,32): + v=getattr(pyl,'EFPassedTrigMask%d'%k).GetCnvValue() + vs="{0:064b}".format(v) + ef_phys=vs[:32] + ef_inclusive=vs[32:] + trigEF+=ef_inclusive[::-1] + + trigL1=compressB64(trigL1) + trigL2=compressB64(trigL2) + trigEF=compressB64(trigEF) + eirec['L1PassedTrigMask'] = trigL1 + eirec['L2PassedTrigMask'] = trigL2 + eirec['EFPassedTrigMask'] = trigEF + + + # write to db + eif['Entry_%d' % evt_idx] = eirec.getRec() + + + eif['Nentries_0'] = evt_idx + eif['EndProcTime_0'] = int(time.time() * 1000) + eif['Nentries'] = evt_idx + eif['Nfiles'] = 1 + eif['EndProcTime'] = int(time.time() * 1000) + eif.close() + + tnow=int(time.time() * 1000) + sys.stderr.write("Processed %d events in %f seconds\n"%(evt_idx, (tnow-tnow0)/1000.)) + +if __name__ == '__main__': + main() + + + diff --git a/Database/EventIndex/EventIndexProducer/scripts/sendEI.py b/Database/EventIndex/EventIndexProducer/scripts/sendEI.py new file mode 100755 index 0000000000000000000000000000000000000000..07107b55ab43270a1181fc0d315e02297612994f --- /dev/null +++ b/Database/EventIndex/EventIndexProducer/scripts/sendEI.py @@ -0,0 +1,728 @@ +#!/bin/env python + +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + +import sys +import re +import PyUtils.dbsqlite as dbsqlite +from EventIndexProducer.compressB64 import decompressB64 +import json +import stomp +import argparse +import socket +import uuid +#import pdb +import time +import logging +from os import path, access, R_OK + + +class Stats(object): + pass +stats=Stats() + +# logger +log=None + +class EIFile(object): + + _db=None + _nentries=0 + + def __init__(self,fname): + try: + self._db = 
dbsqlite.open(fname,flags='r') + except: + log.error("Unable to open EI file %s" % fname) + raise + + try: + self._nentries=self._db['Nentries'] + except: + log.error("Unable to get nentries from EI file %s" % fname) + raise + + def __getitem__(self,key): + if isinstance( key, slice ) : + l=[] + start=key.start + stop=key.stop + if stop > self._nentries: + stop = self._nentries + if key.step is not None: + step=key.step + else: + step=1 + for i in range(start, stop, step): + l.append(self._db["Entry_%d"%i]) + return l + elif isinstance( key, int ) : + if key < 0 : #Handle negative indices + key += self._nentries + if key >= self._nentries : + raise IndexError, "The index (%d) is out of range."%key + return self._db["Entry_%d"%key] + else: + if key in self._db: + return self._db[key] + else: + raise TypeError(key) + + def keys(self): + return self._db.keys() + + def __contains__(self,key): + return self._db.__contains__(key) + + def __iter__(self): + return EIIterator(self._db) + + def iterE(self,max=None): + return EIIterator(self._db,max) + + def close(self): + self._db.close() + + +class EIIterator(object): + + def __init__(self, db, max=None): + self.db=db + self.current = 0 + if max is not None: + self.high = int(max) + if self.high > self.db['Nentries']: + self.high = self.db['Nentries'] + else: + self.high = self.db['Nentries'] + + def __iter__(self): + return self + + def next(self): + if self.current >= self.high: + self.current = 0 # prepare for anoter iteration + raise StopIteration + else: + #print "==", self.current + evt=self.db['Entry_%d'%self.current] + self.current += 1 + return evt + + def end(self): + self.current = self.high + +class EI5(object): + + _schema=[] + _evtlist=[] + _lastevt=[] + _ec={} + _ec_next=None + _inctrigger=False + _incprovenance=False + + def __init__(self,schema): + self._schema=schema + pass + + def _tk2components(self,tk): + pool_token = re.compile(r'[[]DB=(?P<db>.*?)[]]' \ + r'[[]CNT=(?P<cnt>.*?)[]]' \ + r'[[]CLID=(?P<clid>.*?)[]]' \ + r'[[]TECH=(?P<tech>.*?)[]]' \ + r'[[]OID=(?P<oid1>.*?)-(?P<oid2>.*?)[]]' \ + ).match + if tk is None: + return None + match = pool_token(tk) + if not match: + return None + d = match.groupdict() + r=[] + r.append(d['db'].replace('-','')) + r.append(d['cnt']) + r.append(d['clid'].replace('-','')) + r.append("%x"%int(d['tech'],16)) + r.append("%x"%int(d['oid1'],16)) + r.append("%x"%int(d['oid2'],16)) + return r + + + def setEvtCommon(self,ec): + ecnames=[('PandaID','a'),('PanDA_TaskID','b'),("AMITag",'c'),("ProjName",'d'),("TrigStream",'e')] + ecn = dict(ecnames) + self._ec_next={} # valid starting from next append + for k,v in ec.iteritems(): + self._ec_next[ecn[k]]=v + + + def append(self,e): + + if self._ec_next is not None: + self._ec = self._ec_next + self._ec_next = None + + evt=dict(zip(self._schema, e)) + + # fill and translate field names + e={} + e["r"]=int(evt['RunNumber']) + e["e"]=int(evt['EventNumber']) + e['l']=int(evt['LumiBlockN']) + e['t']=int(evt['EventTime']) + e['u']=int(evt['EventTimeNanoSec']) + e['w']=float(evt['EventWeight']) + e['j']=int(evt['McChannelNumber']) + e['i']=int(evt['BunchId']) + e['d']=int(evt['Lvl1ID']) + + e['i0']=int(evt['IsSimulation']) + e['i1']=int(evt['IsCalibration']) + e['i2']=int(evt['IsTestBeam']) + + # trigger + if self._inctrigger: + e["t1"]=evt['L1PassedTrigMask'] + e["t2"]=evt['L2PassedTrigMask'] + e["te"]=evt['EFPassedTrigMask'] + if 'SMK' in evt and evt['SMK'] is not None: + e['m']=int(evt['SMK']) + else: + e['m']=0 + if 'HLTPSK' in evt and evt['HLTPSK'] is not 
None: + e['n']=int(evt['HLTPSK']) + else: + e['n']=0 + if 'L1PSK' in evt and evt['L1PSK'] is not None: + e['o']=int(evt['L1PSK']) + else: + e['o']=0 + + # get refrences from SnamN, SrefN + for idx in range(4): + if idx == 0 or self._incprovenance: + ref="Sref%d"%idx + nam="Snam%d"%idx + if evt[ref] is not None: + tkc = self._tk2components(evt[ref]) + if tkc is None: + continue + for letter,component in zip (['a','b','c','d','e','f'],tkc): + e[letter+str(idx)] = component + e['x%d'%idx]=evt[nam] + + + + # substitute None by 0 + for k,v in e.iteritems(): + if v is None: + e[k]=0 + + # discard redundant fields + e2=e.copy() + for k,v in e.iteritems(): + if k in self._lastevt and v == self._lastevt[k] == v: + e2.pop(k,None) + self._lastevt=e + self._evtlist.append(e2) + + # return lenght + return len(json.dumps(self._evtlist,separators=(',', ':'))) + + + def setIncTrigger(self,trig): + self._inctrigger=trig + + def setIncProvenance(self,provenance): + self._incprovenance=provenance + + def getEvtBlock(self): + + if len(self._evtlist) == 0: + return None + + # block: header,options,data + # header: type=10,version=1 + # options: empty now + # data: evt_common, evt_list + + bh={'t':10,'v':1} # block header + bo={} # block options + bd={} # block data + + # fill data + bd["ec"]=self._ec + bd['el']=self._evtlist + + # build block + eb={} + eb['h']=bh # header + eb['o']=bo # options + eb['d']=bd # data + + self._evtlist=[] # empty list + self._lastevt=[] + return eb + + +class MSG(object): + + def __init__(self, opt): + + self.brokers = opt.endpoint + self.user = opt.user + self.passcode = opt.passcode + self.ssl = opt.ssl + self.keyfile = opt.keyfile + self.certfile = opt.certfile + self.queue = opt.queue + self.queue2 = opt.queue2 + self.verbose = opt.verbose + self.dummy = opt.dummy + + self.conn = None + self.seq = 0 + self.blocks = [] + self.transactionID = None + + def isConnected(self): + if self.conn is None: + return False + else: + return True + + def connect(self): + #print "MSG::connect" + + # prepare for connection + self.conn = None + self.blocks = [] + self.seqid = uuid.uuid4().hex # define a new identifier for this connection + + + if self.dummy: + return + + log.debug("Broker IPs and ports to connect:") + for broker in self.brokers: + log.debug(" %15s %s"%broker) + + conn=None + # build options + copts={'host_and_ports':self.brokers,'wait_on_receipt':True} + if self.user is not None and self.passcode is not None: + log.debug("Conecting broker using user and password") + copts['user'] = self.user + copts['passcode'] = self.passcode + if self.ssl: + log.debug("Conecting broker using SSL") + copts['use_ssl'] = True + copts['ssl_key_file'] = self.keyfile + copts['ssl_cert_file'] = self.certfile + + # connect + conn = stomp.Connection(**copts) + conn.set_listener('', MyListener()) + conn.start() + try: + conn.connect(wait=True) + except stomp.exception.ConnectFailedException, e: + # pass error to caller + raise + self.conn=conn + + + def close(self): + if self.conn is not None: + self.conn.stop() + self.conn = None + + def addBlock(self,block): + self.blocks.append(block) + + def sendMSGstats(self,msg): + + log.debug("Sending stats message: %s", msg) + if not self.dummy: + self.conn.send(message=msg, destination=self.queue2) + if self.verbose > 2: + print >> sys.stderr, msg + + def sendMSG(self,last=False): + + + ## build message + tnow=int(time.time() * 1000) + mh={} # header + mo={} # options + md=[] # data + + mh["v"] = 1 # version + mh["s"] = self.seq # header + mh["t"] = tnow # 
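# --- Editor's note: a minimal sketch of the redundant-field suppression done in
# --- EI5.append() above: each event dict only carries the keys whose values
# --- changed with respect to the previous event, so a consumer must accumulate.
# --- The field names and values below are hypothetical.
last = {}
packed = []
for evt in [{'r': 200804, 'e': 1, 'l': 7},
            {'r': 200804, 'e': 2, 'l': 7},       # only 'e' differs
            {'r': 200804, 'e': 3, 'l': 8}]:      # 'e' and 'l' differ
    delta = dict((k, v) for k, v in evt.iteritems() if last.get(k) != v)
    packed.append(delta)
    last = evt

print packed   # [{'r':200804,'e':1,'l':7}, {'e':2}, {'e':3,'l':8}] (key order may vary)

# A reader reconstructs each full record by merging the deltas in order:
state = {}
for delta in packed:
    state.update(delta)            # 'state' is now the complete event record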
time + mh["i"] = self.seqid # sequence id + if last: + mh['e'] = True # last message in sequence + + # data (payload) + md=self.blocks + + # message + oo = {"h":mh,"o":mo,"d":md} + msg = json.dumps(oo,separators=(',', ':')) + + + # send message + log.debug("Sending message: %s:%-5d len: %d",self.seqid,self.seq,len(msg)) + + if not self.dummy: + if self.seq == 0: + self.transactionID=self.conn.begin() # start transaction before first message is sent + self.conn.send(message=msg, destination=self.queue, transaction=self.transactionID, + JMSXGroupID=self.seq) + + if self.verbose > 2: + print >> sys.stderr, msg + + self.seq += 1 # increment sequence for nex message + + if last: + if self.conn is not None: + self.conn.commit(transaction=self.transactionID) # commit transaction + + self.blocks=[] + + return len(msg) + +########################### +# listener +class MyListener(object): + def on_error(self, headers, message): + print >> sys.stderr, 'received an error' + print >> sys.stderr, " headers: ", headers + print >> sys.stderr, " message: ", message + + def on_message(self, headers, message): + print >> sys.stderr, 'received a message' + print >> sys.stderr, " headers: ", headers + print >> sys.stderr, " message: ", message + + +def endpointV(endpoint): + # default port + dport=60013 + + # enpoint should be broker1[:port1],broker2:[port2],... + lbroker=endpoint.split(',') + result=[] + for b in lbroker: + try: + (host,port)=b.split(":") + except: + host=b + port=dport + try: + port=int(port) + except: + log.error("Invalid port %s",port) + continue + #raise Exception("Invalid port %s",port) + try: + (h, a, ip)=socket.gethostbyname_ex(host) + except: + log.error("Host can not be resolved %s",host) + #raise Exception("Invalid host %s",host) + continue + for addr in ip: + result.append((addr,port)) + + return result + + +def options(argv): + + parser = argparse.ArgumentParser(description='SendEI to STOMP broker.') + parser.add_argument('-e','--endpoint', default='localhost:61613', type=endpointV, + help="broker name and port") + parser.add_argument('-m','--evtmax', default=0, type=int, help='Max events to process') + parser.add_argument('-s','--msize', default=10000, type=int, help='message size') + parser.add_argument('-q','--queue', default='/queue/atlas.eventindex', help="broker queue name") + parser.add_argument( '--queue2', default=None, help="broker queue name for statistics") + parser.add_argument('-u','--user',default=None, help="Stomp user name") + parser.add_argument('-k','--passcode',default=None,help="Stomp passcode") + parser.add_argument('-v','--verbose', action='count', help='Verbosity level') + parser.add_argument('-d','--debug', action='count', help='Debug') + parser.add_argument("-t","--trigger", action='store_true', help="Include trigger information (default: false)") + parser.add_argument("-p","--provenance", action='store_true', help="Include provenance information (default: true)") + parser.add_argument("--no-trigger", action='store_false', dest='trigger', help="Do not include trigger information") + parser.add_argument("--no-provenance", action='store_false', dest='provenance', help="Do not include provenance information") + parser.add_argument("-n","--dummy", action='store_true', default=False, help="Do not send messages and do not connect") + parser.add_argument("--ssl", action='store_true', default=False,help="SLL connection") + parser.add_argument('--keyfile',default=None,help="SSL Private Key file") + parser.add_argument('--certfile',default=None,help="SSL Cert 
file") + + parser.add_argument('eifile', help="EventIndex file") + + # default for trigger and provenance + parser.set_defaults(trigger=False) + parser.set_defaults(provenance=True) + + # parse args + opt=parser.parse_args() + + if opt.queue2 is None: + opt.queue2 = opt.queue+"2" + + if opt.ssl: + if opt.keyfile is None or opt.certfile is None: + log.error("keyfile and certfile must be specified to use SSL") + sys.exit(1) + + if opt.user is None or opt.passcode is None: + if not(opt.user is None and opt.passcode is None): + log.error("Both, user and passcode must be specified or none") + sys.exit(1) + + return opt + + + +def main(): + + # logger + global log + log = logging.getLogger('sendEI.py') + + # analyze options + opt = options(sys.argv) + + if opt.verbose > 0: + log.setLevel(logging.INFO) + + if opt.debug > 0: + log.setLevel(logging.DEBUG) + + # open EI file + fname = opt.eifile + if not (path.isfile(fname) and access(fname, R_OK)): + log.error("Event Index file %s does not exists or is not readble"%fname) + sys.exit(1) + try: + eif = EIFile(fname) + except: + log.error("Unable to get info from EI file %s"%fname) + sys.exit(1) + + + # stats + stats.ntot=0 + stats.tot_size=0 + stats.start_time=0 + stats.end_time=0 + stats.nmsg=0 + + # check supported versions of EI file + if int(eif['Version']) > 1: + log.error("Unable to process EI file version %s" % eif['Version']) + sys.exit(1) + + + # dump info if verbose + if opt.verbose >0: + log.info("Event Index file contents") + log.info("-------------------------") + log.info(" Version: %s" % eif['Version']) + log.info(" Schema: %s" % eif['Schema']) + log.info(" #input files: %s" % eif['Nfiles']) + log.info(" Total number of events: %s" % eif['Nentries']) + log.info(" StartProcTime: %s" % eif['StartProcTime']) + log.info(" EndProcTime: %s" % eif['EndProcTime']) + log.info(" PandaID: %s" % eif['PandaID']) + log.info(" PanDA_TaskID: %s" % eif['PanDA_TaskID']) + log.info(" Includes Provenance: %s" % eif['ProvenanceRef']) + log.info(" Includes Trigger: %s" % eif['TriggerInfo']) + + + for i in xrange(eif['Nfiles']): + log.info("") + log.info("File %d"%i) + if 'Nentries_%d'%i in eif: + log.info(" Events in this file: %s" % eif['Nentries_%d'%i]) + log.info(" StartProcTime: %s" % eif['StartProcTime_%d'%i]) + if 'EndProcTime_%d'%i in eif: + log.info(" EndProcTime: %s" % eif['EndProcTime_%d'%i]) + else: + log.info(" EndProcTime: %s" % eif['EndProcTime']) + if "AMITag" in eif: + log.info(" AMITag: %s" % eif['AMITag_%d'%i]) + if "TrigStream_%d"%i in eif: + log.info(" TrigStream: %s" % eif['TrigStream_%d'%i]) + if "ProjName_%d"%i in eif: + log.info(" ProjName: %s" % eif['ProjName_%d'%i]) + + log.info("") + + + # check compatibility of options + if opt.provenance and not eif['ProvenanceRef']: + log.error("Unable to send provenance information since it is missing from EI file") + sys.exit(1) + if opt.trigger and not eif['TriggerInfo']: + log.error("Unable to send trigger information since it is missing from EI file") + sys.exit(1) + + + schema = eif['Schema'] + ei5=EI5(schema) + + #pass options + ei5.setIncTrigger(opt.trigger) + ei5.setIncProvenance(opt.provenance) + + # connect to broker + stats.start_time=int(time.time() * 1000) + mbroker=MSG(opt) + try: + mbroker.connect() + except stomp.exception.ConnectFailedException, e: + log.error("Unable to connect to stomp broker") + sys.exit(1) + + + nfiles = eif['Nfiles'] + # new event iterator + if opt.evtmax != 0: + eifiter=eif.iterE(opt.evtmax) # limit number of events to get + else: + eifiter=eif.iterE() # 
+
+
+def main():
+
+    # logger
+    global log
+    log = logging.getLogger('sendEI.py')
+
+    # analyze options
+    opt = options(sys.argv)
+
+    if opt.verbose > 0:
+        log.setLevel(logging.INFO)
+
+    if opt.debug > 0:
+        log.setLevel(logging.DEBUG)
+
+    # open EI file
+    fname = opt.eifile
+    if not (path.isfile(fname) and access(fname, R_OK)):
+        log.error("Event Index file %s does not exist or is not readable" % fname)
+        sys.exit(1)
+    try:
+        eif = EIFile(fname)
+    except Exception:
+        log.error("Unable to get info from EI file %s" % fname)
+        sys.exit(1)
+
+    # stats
+    stats.ntot = 0
+    stats.tot_size = 0
+    stats.start_time = 0
+    stats.end_time = 0
+    stats.nmsg = 0
+
+    # check supported versions of the EI file
+    if int(eif['Version']) > 1:
+        log.error("Unable to process EI file version %s" % eif['Version'])
+        sys.exit(1)
+
+    # dump info if verbose
+    if opt.verbose > 0:
+        log.info("Event Index file contents")
+        log.info("-------------------------")
+        log.info("  Version: %s" % eif['Version'])
+        log.info("  Schema: %s" % eif['Schema'])
+        log.info("  #input files: %s" % eif['Nfiles'])
+        log.info("  Total number of events: %s" % eif['Nentries'])
+        log.info("  StartProcTime: %s" % eif['StartProcTime'])
+        log.info("  EndProcTime: %s" % eif['EndProcTime'])
+        log.info("  PandaID: %s" % eif['PandaID'])
+        log.info("  PanDA_TaskID: %s" % eif['PanDA_TaskID'])
+        log.info("  Includes Provenance: %s" % eif['ProvenanceRef'])
+        log.info("  Includes Trigger: %s" % eif['TriggerInfo'])
+
+        for i in xrange(eif['Nfiles']):
+            log.info("")
+            log.info("File %d" % i)
+            if 'Nentries_%d' % i in eif:
+                log.info("  Events in this file: %s" % eif['Nentries_%d' % i])
+            log.info("  StartProcTime: %s" % eif['StartProcTime_%d' % i])
+            if 'EndProcTime_%d' % i in eif:
+                log.info("  EndProcTime: %s" % eif['EndProcTime_%d' % i])
+            else:
+                log.info("  EndProcTime: %s" % eif['EndProcTime'])
+            if 'AMITag_%d' % i in eif:
+                log.info("  AMITag: %s" % eif['AMITag_%d' % i])
+            if 'TrigStream_%d' % i in eif:
+                log.info("  TrigStream: %s" % eif['TrigStream_%d' % i])
+            if 'ProjName_%d' % i in eif:
+                log.info("  ProjName: %s" % eif['ProjName_%d' % i])
+
+        log.info("")
+
+    # check compatibility of options
+    if opt.provenance and not eif['ProvenanceRef']:
+        log.error("Unable to send provenance information since it is missing from the EI file")
+        sys.exit(1)
+    if opt.trigger and not eif['TriggerInfo']:
+        log.error("Unable to send trigger information since it is missing from the EI file")
+        sys.exit(1)
+
+    schema = eif['Schema']
+    ei5 = EI5(schema)
+
+    # pass options
+    ei5.setIncTrigger(opt.trigger)
+    ei5.setIncProvenance(opt.provenance)
+
+    # connect to broker
+    stats.start_time = int(time.time() * 1000)
+    mbroker = MSG(opt)
+    try:
+        mbroker.connect()
+    except stomp.exception.ConnectFailedException:
+        log.error("Unable to connect to stomp broker")
+        sys.exit(1)
+
+    nfiles = eif['Nfiles']
+    # new event iterator
+    if opt.evtmax != 0:
+        eifiter = eif.iterE(opt.evtmax)   # limit the number of events to get
+    else:
+        eifiter = eif.iterE()             # all events
+
+    events_tot = eif['Nentries']   # total number of events
+    proc_events_tot = 0            # total events processed
+    proc_events_bunch = 0          # events processed in a bunch
+
+    # get the list of files
+    fdata = []
+    for nf in xrange(nfiles):
+        if 'Nentries_%d' % nf in eif:
+            nevents = eif['Nentries_%d' % nf]
+        else:
+            nevents = events_tot
+
+        # new common values
+        evtcommon = {}
+        if 'AMITag_%d' % nf in eif:
+            evtcommon['AMITag'] = eif['AMITag_%d' % nf]
+        if 'ProjName_%d' % nf in eif:
+            evtcommon['ProjName'] = eif['ProjName_%d' % nf]
+        if 'TrigStream_%d' % nf in eif:
+            evtcommon['TrigStream'] = eif['TrigStream_%d' % nf]
+
+        evtcommon['PandaID'] = eif['PandaID']
+        evtcommon['PanDA_TaskID'] = eif['PanDA_TaskID']
+        fdata.append((nevents, evtcommon))
+
+    i = 0
+    nf = 0
+    log.info("Processing file %d" % nf)
+    (nevents, evtcommon) = fdata.pop(0)
+    ei5.setEvtCommon(evtcommon)
+    last_in_file = nevents
+    if opt.evtmax != 0:
+        last_event = opt.evtmax
+    else:
+        last_event = events_tot
+    if last_in_file > last_event:
+        last_in_file = last_event
+    force_send = False
+    while True:
+        # get event
+        try:
+            evt = eifiter.next()
+        except StopIteration:
+            break                         # no more events
+
+        i += 1
+        stats.ntot += 1
+
+        sz = ei5.append(evt)              # append event to block
+
+        # last event in the input file?
+        if i == last_in_file:
+            log.info("Last event in file %d" % nf)
+            nf += 1
+            # prepare for the next file
+            try:
+                (nevents, evtcommon) = fdata.pop(0)
+            except IndexError:
+                (nevents, evtcommon) = (0, {})
+            last_in_file += nevents
+            if last_in_file > last_event:
+                last_in_file = last_event
+            ei5.setEvtCommon(evtcommon)
+            force_send = True
+
+        # send the block if it is large enough or force_send is set
+        if sz >= opt.msize or force_send:
+            force_send = False
+            blk = ei5.getEvtBlock()
+            mbroker.addBlock(blk)
+            if i == last_event:
+                msz = mbroker.sendMSG(last=True)
+            else:
+                msz = mbroker.sendMSG()
+            stats.nmsg += 1
+            stats.tot_size += msz
+
+    # at this point the buffer should be empty
+    blk = ei5.getEvtBlock()
+    if blk is not None:
+        log.error("Buffer is not empty")
+    stats.end_time = int(time.time() * 1000)
+
+    # send stats to the alternate queue:
+    # PandaID, PanDA_TaskID, start_time, end_time, #evts, #msg, totsize
+    msgst = "EISTATS0;%s;%s;%s;%s;%s;%s;%s" % (
+        eif['PandaID'], eif['PanDA_TaskID'],
+        stats.start_time, stats.end_time,
+        stats.ntot, stats.nmsg, stats.tot_size)
+    mbroker.sendMSGstats(msgst)
+
+    mbroker.close()
+
+    if opt.verbose > 0:
+        log.info("======== summary")
+        log.info("  number of events:     %10d" % stats.ntot)
+        log.info("  sum of message sizes: %10d bytes" % stats.tot_size)
+        log.info("  mean size per evt:    %10.1f bytes" % (float(stats.tot_size) / int(stats.ntot)))
+        log.info("  number of messages:   %10d" % stats.nmsg)
+        log.info("  mean message size:    %10.2f Kbytes" % (float(stats.tot_size) / int(stats.nmsg) / 1000.))
+        log.info("  mean evts per msg:    %10.2f" % (float(stats.ntot) / int(stats.nmsg)))
+        dt = int(stats.end_time - stats.start_time)
+        log.info("  connected time: %d ms" % dt)
+        log.info("  BW %10.2f KB/s" % (float(stats.tot_size) / dt))
+        log.info("  BW %10.2f msg/s" % (float(stats.nmsg) / dt * 1000))
+        log.info("  BW %10.2f evt/s" % (float(stats.ntot) / dt * 1000))
+
+
+if __name__ == '__main__':
+
+    # logger setup
+    root = logging.getLogger()
+    ch = logging.StreamHandler(sys.stdout)
+    ch.setLevel(logging.DEBUG)
+    #formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    formatter = logging.Formatter('%(name)-15s %(levelname)9s %(message)s')
+    ch.setFormatter(formatter)
+    root.addHandler(ch)
+
+    main()
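The EISTATS0 record sent to the alternate queue is a plain semicolon-separated string. A minimal sketch of how a consumer might unpack it, with the field order taken from the format string in main() above (the helper name parse_eistats is ours, not part of the package):

    def parse_eistats(msg):
        # EISTATS0;PandaID;PanDA_TaskID;start_time;end_time;#evts;#msg;totsize
        fields = msg.split(';')
        if fields[0] != 'EISTATS0':
            raise ValueError("not an EISTATS0 record")
        keys = ('PandaID', 'PanDA_TaskID', 'start_time', 'end_time',
                'nevents', 'nmessages', 'total_size')
        return dict(zip(keys, fields[1:]))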
diff --git a/Database/EventIndex/EventIndexProducer/scripts/unpickEI.py b/Database/EventIndex/EventIndexProducer/scripts/unpickEI.py
new file mode 100755
index 0000000000000000000000000000000000000000..9aca343b6280fe04e7238ffb957f4b1510fc60ff
--- /dev/null
+++ b/Database/EventIndex/EventIndexProducer/scripts/unpickEI.py
@@ -0,0 +1,161 @@
+#!/bin/env python
+
+# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
+
+import sys
+import PyUtils.dbsqlite as dbsqlite
+from EventIndexProducer.compressB64 import decompressB64
+import argparse
+
+
+def options(argv):
+
+    parser = argparse.ArgumentParser(description='Unpick Event Index file')
+    parser.add_argument('-e', '--evtnumber', default=0, type=int, help='event number to dump')
+    parser.add_argument('-r', '--runnumber', default=0, type=int, help='run number to dump')
+    parser.add_argument('-m', '--evtmax', default=1, type=int, help='maximum number of events to dump')
+    parser.add_argument('--decodetrig0', default=0, action='count', help='decode trigger; print only bits fired')
+    parser.add_argument('--decodetrig1', default=0, action='count', help='decode trigger; print the full bit stream')
+    parser.add_argument('eifile', help="EventIndex file")
+
+    return parser.parse_args()
+
+
+def main():
+
+    # analyze options
+    opt = options(sys.argv)
+
+    fname = opt.eifile
+    try:
+        db = dbsqlite.open(fname, flags='r')
+    except Exception:
+        print >> sys.stderr, "Unable to open EI file %s" % fname
+        sys.exit(1)
+
+    try:
+        nentries = db['Nentries']
+    except KeyError:
+        print >> sys.stderr, "Unable to get Nentries from EI file %s" % fname
+        sys.exit(1)
+
+    #print "Stored keys: "
+    #print "  ", db.keys()
+
+    print ""
+    print " version: ", db['Version']
+    print " schema: ", db['Schema']
+    print " #input files: ", db['Nfiles']
+    print " number of events: ", nentries
+    print " StartProcTime: ", db['StartProcTime']
+    print " EndProcTime: ", db['EndProcTime']
+    print " PandaID: ", db['PandaID']
+    print " PanDA_TaskID: ", db['PanDA_TaskID']
+    print " Includes Provenance: ", db['ProvenanceRef']
+    print " Includes Trigger: ", db['TriggerInfo']
+
+    for i in xrange(db['Nfiles']):
+        print ""
+        print "File %d" % i
+        if 'Nentries_%d' % i in db:
+            print " Events in this file: %s" % db['Nentries_%d' % i]
+        print " StartProcTime: ", db['StartProcTime_%d' % i]
+        if 'EndProcTime_%d' % i in db:
+            print " EndProcTime: ", db['EndProcTime_%d' % i]
+        else:
+            print " EndProcTime: ", db['EndProcTime']
+        if 'AMITag_%d' % i in db:
+            print " AMITag: ", db['AMITag_%d' % i]
+        if 'TrigStream_%d' % i in db:
+            print " TrigStream: ", db['TrigStream_%d' % i]
+        if 'ProjName_%d' % i in db:
+            print " ProjName: ", db['ProjName_%d' % i]
+
+    print ""
+    print "Summary: (info for %d events max)" % opt.evtmax
+    print ""
+
+    schema = db['Schema']
+    evtnumber_idx = schema.index('EventNumber')
+    runnumber_idx = schema.index('RunNumber')
+
+    nevt_shown = 0
+    # with no event number given, dump from the first entry onwards
+    sw0 = (opt.evtnumber == 0)
+    for i in xrange(nentries):
+        evt = db['Entry_%d' % i]
+        data = zip(schema, evt)
+
+        if opt.evtnumber == evt[evtnumber_idx]:
+            if opt.runnumber == 0 or opt.runnumber == evt[runnumber_idx]:
+                sw0 = True
+
+        if not sw0:
+            continue
+
+        if nevt_shown >= opt.evtmax:
+            break
+
+        nevt_shown += 1
+
+        trigL1 = None
+        trigL2 = None
+        trigEF = None
+
+        print "%d ============================================================" % nevt_shown
+        for k, v in data:
+            print "    %-20s: %s" % (k, v)
+            if k in ('L1PassedTrigMask', 'L2PassedTrigMask', 'EFPassedTrigMask'):
+                ## trigger
+                if v is not None and v != '':
+                    v = decompressB64(v)
+                    if opt.decodetrig1 > 0:
+                        print "    %-20s: %s" % (k, v)
+                    if k == "L1PassedTrigMask":
+                        trigL1 = v
+                    if k == "L2PassedTrigMask":
+                        trigL2 = v
+                    if k == "EFPassedTrigMask":
+                        trigEF = v
+
+        # trigger bits fired
+        if trigL1 is not None:
+            # the L1 mask carries 24 32-bit words: 8 TBP, 8 TAP and 8 TAV
+            for i in xrange(8):
+                tw = trigL1[32*i:32*(i+1)][::-1]
+                v = int(tw, 2)
+                if v != 0 and opt.decodetrig0 > 0:
+                    print "     L1PassedTrigMaskTBP%d = %d (0x%08x)" % (i, v, v)
+            for i in xrange(8, 16):
+                tw = trigL1[32*i:32*(i+1)][::-1]
+                v = int(tw, 2)
+                if v != 0 and opt.decodetrig0 > 0:
+                    print "     L1PassedTrigMaskTAP%d = %d (0x%08x)" % (i - 8, v, v)
+            for i in xrange(16, 24):
+                tw = trigL1[32*i:32*(i+1)][::-1]
+                v = int(tw, 2)
+                if v != 0 and opt.decodetrig0 > 0:
+                    print "     L1PassedTrigMaskTAV%d = %d (0x%08x)" % (i - 16, v, v)
+
+        if trigL2 is not None:
+            for i in xrange(len(trigL2) / 32):
+                tw = trigL2[32*i:32*(i+1)][::-1]
+                v = int(tw, 2)
+                if v != 0 and opt.decodetrig0 > 0:
+                    print "     L2PassedTrigMask%d = %d (0x%08x)" % (i, v, v)
+
+        if trigEF is not None:
+            for i in xrange(len(trigEF) / 32):
+                tw = trigEF[32*i:32*(i+1)][::-1]
+                v = int(tw, 2)
+                if v != 0 and opt.decodetrig0 > 0:
+                    print "     EFPassedTrigMask%d = %d (0x%08x)" % (i, v, v)
+
+    db.close()
+
+
+if __name__ == '__main__':
+    main()
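The decoding in unpickEI.py relies on decompressB64 expanding each mask into a string of '0'/'1' characters, stored least-significant bit first within every 32-bit word. A minimal sketch of the per-word unpacking used by the loops above (the helper name words_of is ours):

    def words_of(bitstring):
        # split a '0'/'1' string into 32-bit words; each chunk is stored
        # LSB first, so reverse it before the base-2 conversion
        nwords = len(bitstring) / 32
        return [int(bitstring[32*i:32*(i+1)][::-1], 2) for i in xrange(nwords)]

For example, unpickEI.py -e 1234 -r 5678 --decodetrig0 myfile.ei.pkl would dump one matching event and print the fired trigger words (event and run numbers here are illustrative).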
diff --git a/Database/EventIndex/EventIndexProducer/share/POOL2EI.py b/Database/EventIndex/EventIndexProducer/share/POOL2EI.py
new file mode 100755
index 0000000000000000000000000000000000000000..2e3d298662e5619397758e1baf3755d33b135311
--- /dev/null
+++ b/Database/EventIndex/EventIndexProducer/share/POOL2EI.py
@@ -0,0 +1,39 @@
+#!/bin/env python
+
+import os
+# prevent athena-mp from running in child processes
+os.putenv('ATHENA_PROC_NUMBER', '0')
+
+# prevent athena from entering interactive mode (and freezing)
+if 'PYTHONINSPECT' in os.environ:
+    del os.environ['PYTHONINSPECT']
+
+try:
+    FNAME = In
+except NameError:
+    FNAME = ['../1/AOD.01234377._000003.pool.root.1']
+
+
+include("PyAnalysisCore/InitPyAnalysisCore.py")
+include('EventIndexProducer/POOL2EI_joboptions.py')
+
+try:
+    svcMgr.MessageSvc.OutputLevel = Level
+    job.pool2ei.OutputLevel = Level
+except NameError:
+    svcMgr.MessageSvc.OutputLevel = INFO
+    job.pool2ei.OutputLevel = INFO
+
+try:
+    theApp.EvtMax = EvtMax
+except NameError:
+    theApp.EvtMax = -1
+
+try:
+    job.pool2ei.Out = Out
+except NameError:
+    job.pool2ei.Out = 'pool2ei.ei.pkl'
+
+# this driver always produces provenance and trigger info
+job.pool2ei.DoProvenanceRef = True
+job.pool2ei.DoTriggerInfo = True
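This driver picks up In, Level, EvtMax and Out from the enclosing namespace when they are defined, so the values can be supplied on the athena command line via its standard -c option; a sketch (file names illustrative):

    athena -c "In=['AOD.01234377._000003.pool.root.1']; EvtMax=100; Out='my.ei.pkl'" EventIndexProducer/POOL2EI.py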
diff --git a/Database/EventIndex/EventIndexProducer/share/POOL2EI_joboptions.py b/Database/EventIndex/EventIndexProducer/share/POOL2EI_joboptions.py
new file mode 100644
index 0000000000000000000000000000000000000000..3cfbc911c10b8089771eb6183d00dad692a64974
--- /dev/null
+++ b/Database/EventIndex/EventIndexProducer/share/POOL2EI_joboptions.py
@@ -0,0 +1,83 @@
+## @file: POOL2EI_joboptions.py
+## @purpose: simple job options to convert AOD to EI
+## @date Feb 2014
+## @author Javier Sanchez <Javier.Sanchez@ific.uv.es>
+
+__version__ = "$Revision$"
+__author__ = "Javier Sanchez <Javier.Sanchez@ific.uv.es>"
+
+
+from AthenaCommon.AppMgr import theApp
+from AthenaCommon.AppMgr import ServiceMgr as svcMgr
+import AthenaCommon.Constants as Lvl   # needed for Lvl.INFO below
+
+from AthenaCommon.AlgSequence import AlgSequence
+job = AlgSequence()
+
+### Event selector
+import AthenaPoolCnvSvc.ReadAthenaPool
+
+# algorithm
+from EventIndexProducer.POOL2EI_Lib import POOL2EI
+pool2ei = POOL2EI('pool2ei', OutputLevel=Lvl.INFO)
+job += pool2ei
+
+# service
+from EventIndexProducer.POOL2EI_Lib import POOL2EISvc
+pool2eisvc = POOL2EISvc(algo=pool2ei)
+svcMgr += pool2eisvc
+theApp.CreateSvc += [pool2eisvc.getFullJobOptName()]
+
+
+#--------------------------------------------------------------
+# Message service output level threshold
+#  (1=VERBOSE, 2=DEBUG, 3=INFO, 4=WARNING, 5=ERROR, 6=FATAL)
+#--------------------------------------------------------------
+try:
+    svcMgr.MessageSvc.OutputLevel = Level
+except NameError:
+    svcMgr.MessageSvc.OutputLevel = INFO
+
+
+#--------------------------------------------------------------
+# Number of events to process
+#--------------------------------------------------------------
+try:
+    theApp.EvtMax = EvtMax
+except NameError:
+    theApp.EvtMax = -1
+
+
+#--------------------------------------------------------------
+# Input collection names
+#--------------------------------------------------------------
+try:
+    svcMgr.EventSelector.InputCollections = In
+except NameError:
+    svcMgr.EventSelector.InputCollections = ["test.root"]
+
+#--------------------------------------------------------------
+# Output Event Index file name
+#--------------------------------------------------------------
+try:
+    job.pool2ei.Out = Out
+except NameError:
+    import os
+    job.pool2ei.Out = 'pool2ei.%08i.pkl' % os.getpid()
+
+
+#--------------------------------------------------------------
+# Provenance references flag
+#--------------------------------------------------------------
+try:
+    job.pool2ei.DoProvenanceRef = DoProvenanceRef
+except NameError:
+    job.pool2ei.DoProvenanceRef = False
+
+#--------------------------------------------------------------
+# Trigger info flag
+#--------------------------------------------------------------
+try:
+    job.pool2ei.DoTriggerInfo = DoTriggerInfo
+except NameError:
+    job.pool2ei.DoTriggerInfo = False

diff --git a/Database/EventIndex/EventIndexProducer/share/skeleton.POOLtoEI_tf.py b/Database/EventIndex/EventIndexProducer/share/skeleton.POOLtoEI_tf.py
new file mode 100644
index 0000000000000000000000000000000000000000..60d398431fda32f9a41d8be53f31c9a2a285193c
--- /dev/null
+++ b/Database/EventIndex/EventIndexProducer/share/skeleton.POOLtoEI_tf.py
@@ -0,0 +1,67 @@
+###############################################################
+#
+#==============================================================
+
+import logging
+eiLog = logging.getLogger('pool_to_ei')
+eiLog.info('****************** STARTING POOL->EI MAKING *****************')
+
+
+## Input
+if hasattr(runArgs, "inputPOOLFile"):
+    In = runArgs.inputPOOLFile
+    if isinstance(In, str):
+        In = [In]
+
+## Output
+if hasattr(runArgs, "outputEIFile"):
+    Out = runArgs.outputEIFile
+
+## Options
+if hasattr(runArgs, "maxEvents"):
+    EvtMax = runArgs.maxEvents
+
+if hasattr(runArgs, "trigger"):
+    DoTriggerInfo = runArgs.trigger
+else:
+    DoTriggerInfo = False
+
+if hasattr(runArgs, "provenance"):
+    DoProvenanceRef = runArgs.provenance
+else:
+    DoProvenanceRef = True
+
+
+# load the job options
+#include("PyAnalysisCore/InitPyAnalysisCore.py")
+include('EventIndexProducer/POOL2EI_joboptions.py')
+
+
+## Pre-exec
+if hasattr(runArgs, "preExec"):
+    eiLog.info("transform pre-exec")
+    for cmd in runArgs.preExec:
+        eiLog.info(cmd)
+        exec(cmd)
+
+## Pre-include
+if hasattr(runArgs, "preInclude"):
+    for fragment in runArgs.preInclude:
+        include(fragment)
+
+## Post-include
+if hasattr(runArgs, "postInclude"):
+    for fragment in runArgs.postInclude:
+        include(fragment)
+
+## Post-exec
+if hasattr(runArgs, "postExec"):
+    eiLog.info("transform post-exec")
+    for cmd in runArgs.postExec:
+        eiLog.info(cmd)
+        exec(cmd)
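Assuming the declare_job_transforms pattern in the requirements file exposes this skeleton as a POOLtoEI_tf.py transform, an invocation could look like the following; this is a sketch only, with illustrative file names, and the argument names simply mirror the runArgs attributes tested above:

    POOLtoEI_tf.py --inputPOOLFile AOD.01234377._000003.pool.root.1 \
                   --outputEIFile out.ei.pkl --maxEvents 100 \
                   --trigger True --provenance True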