Commit 37357379 authored by Marco Clemencic's avatar Marco Clemencic
Browse files

Adding concurrent incident support and example utilization.

`GaudiExamples/options/AsyncIncidents.py` demonstrates example utilization of asynchronous incident mechanism.

Fixes GAUDI-1213

See merge request !181
parents 7ef9a38c 23a78f71
#include "GaudiKernel/IIncidentSvc.h"
#include "GaudiKernel/IIncidentListener.h"
#include "GaudiKernel/Incident.h"
#include "IncidentProcAlg.h"
DECLARE_COMPONENT(IncidentProcAlg)
#define ON_DEBUG if (msgLevel(MSG::DEBUG))
#define ON_VERBOSE if (msgLevel(MSG::VERBOSE))
#define DEBMSG ON_DEBUG debug()
#define VERMSG ON_VERBOSE verbose()
namespace
{
// ==========================================================================
static const std::string s_unknown = "<unknown>" ;
// Helper to get the name of the listener
inline const std::string& getListenerName ( IIncidentListener* lis )
{
SmartIF<INamedInterface> iNamed(lis);
return iNamed ? iNamed->name() : s_unknown ;
}
// ==========================================================================
}
IncidentProcAlg::IncidentProcAlg(const std::string& name ,
ISvcLocator* pSvcLocator )
:Algorithm(name,pSvcLocator)
{
}
StatusCode IncidentProcAlg::initialize() {
StatusCode sc = Algorithm::initialize();
if (sc.isFailure()) return sc;
m_incSvc = service("IncidentSvc",true);
return StatusCode::SUCCESS;
}
//=============================================================================
StatusCode IncidentProcAlg::execute() {
auto incPack=m_incSvc->getIncidents(getContext());
MsgStream log(msgSvc(), name());
log<<MSG::DEBUG<<" Number of Incidents to process = "<<incPack.incidents.size()<<" Context= "<<getContext()<<endmsg;
while(incPack.incidents.size()){
if(incPack.incidents.size()!=incPack.listeners.size()){
log<<MSG::WARNING<<" Size of fired incidents and listeners do not match!"<<endmsg;
}
for(size_t t=0;t<incPack.incidents.size();t++){
auto &inc=incPack.incidents.at(t);
auto &lis=incPack.listeners.at(t);
for( auto& l:lis){
log<<MSG::DEBUG << "Calling '" << getListenerName(l)
<< "' for incident [" << inc->type() << "]" << endmsg;
// handle exceptions if they occur
try {
l->handle(*inc);
}
catch( const GaudiException& exc ) {
error() << "Exception with tag=" << exc.tag() << " is caught"
" handling incident" << inc->type() << endmsg;
error() << exc << endmsg;
}
catch( const std::exception& exc ) {
error() << "Standard std::exception is caught"
" handling incident" << inc->type() << endmsg;
error() << exc.what() << endmsg;
}
catch(...) {
error() << "UNKNOWN Exception is caught handling incident" << inc->type() << endmsg;
}
// check wheter one of the listeners is singleShot
}
}
incPack=std::move(m_incSvc->getIncidents(getContext()));
}
return StatusCode::SUCCESS;
}
//=============================================================================
StatusCode IncidentProcAlg::finalize() {
info() << "Finalize" << endmsg;
return Algorithm::finalize();
}
#ifndef __INCIDENT_PROC_ALG_H
#define __INCIDENT_PROC_ALG_H
#include "GaudiKernel/Algorithm.h"
#include "GaudiKernel/Property.h"
#include "GaudiKernel/MsgStream.h"
#include "GaudiKernel/IIncidentSvc.h"
class GAUDI_API IncidentProcAlg: public Algorithm {
public:
/**
** Constructor(s)
**/
IncidentProcAlg( const std::string& name, ISvcLocator* pSvcLocator );
/**
** Destructor
**/
virtual ~IncidentProcAlg( ){};
/*****************************
** Public Function Members **
*****************************/
StatusCode initialize();
StatusCode execute();
StatusCode finalize();
private:
SmartIF<IIncidentSvc> m_incSvc;
/**************************
** Private Data Members **
**************************/
};
#endif
......@@ -255,8 +255,17 @@ void IncidentSvc::fireIncident( const Incident& incident )
i_fireIncident(incident, "ALL");
}
}
// ============================================================================
void IncidentSvc::fireIncident( std::unique_ptr<Incident> incident )
{
DEBMSG<<"Async incident '"<<incident->type()<<"' fired on context "<<incident->context()<<endmsg;
auto ctx=incident->context();
auto res=m_firedIncidents.insert(std::make_pair(ctx,IncQueue_t()));
res.first->second.push(incident.release());
}
// ============================================================================
void
IncidentSvc::getListeners(std::vector<IIncidentListener*>& l,
const std::string& type) const
......@@ -276,6 +285,26 @@ IncidentSvc::getListeners(std::vector<IIncidentListener*>& l,
}
}
// ============================================================================
IIncidentSvc::IncidentPack IncidentSvc::getIncidents(const EventContext* ctx){
IIncidentSvc::IncidentPack p;
if(ctx){
auto incs=m_firedIncidents.find(*ctx);
if(incs!=m_firedIncidents.end()){
Incident* inc(0);
DEBMSG<<"Collecting listeners fired on context "<<*ctx<<endmsg;
while(incs->second.try_pop(inc)){
std::vector<IIncidentListener*> ls;
getListeners(ls,inc->type());
p.incidents.emplace_back(std::move(inc));
p.listeners.emplace_back(std::move(ls));
}
}
}
return std::move(p);
}
// ============================================================================
// The END
// ============================================================================
......@@ -17,6 +17,11 @@
#include "GaudiKernel/HashMap.h"
#include "GaudiKernel/ChronoEntity.h"
// ============================================================================
// TBB
// ============================================================================
#include "tbb/concurrent_queue.h"
#include "tbb/concurrent_unordered_map.h"
// ============================================================================
/**
* @class IncidentSvc
* @brief Default implementation of the IIncidentSvc interface.
......@@ -68,6 +73,7 @@ public:
void removeListener( IIncidentListener* l, const std::string& type = "" ) override;
void fireIncident( const Incident& incident) override;
void fireIncident( std::unique_ptr<Incident> incident) override;
//TODO: return by value instead...
void getListeners (std::vector<IIncidentListener*>& lis,
const std::string& type = "") const override;
......@@ -76,6 +82,7 @@ public:
IncidentSvc( const std::string& name, ISvcLocator* svc );
// Destructor.
~IncidentSvc() override;
IIncidentSvc::IncidentPack getIncidents(const EventContext* ctx);
private:
ListenerMap::iterator removeListenerFromList(ListenerMap::iterator,
......@@ -85,6 +92,7 @@ private:
/// Internal function to allow incidents listening to all events
void i_fireIncident(const Incident& incident, const std::string& type);
/// List of auditor names
ListenerMap m_listenerMap;
......@@ -99,6 +107,10 @@ private:
mutable ChronoEntity m_timer ;
mutable bool m_timerLock = false ;
// ==========================================================================
// When TBB supports unique_ptrs in concurrent queue typedef should be changed
//typedef tbb::concurrent_queue<std::unique_ptr<Incident>> IncQueue_t;
typedef tbb::concurrent_queue<Incident*> IncQueue_t;
tbb::concurrent_unordered_map<EventContext,IncQueue_t,EventContextHash,EventContextHash> m_firedIncidents;
};
// ============================================================================
......
#!/usr/bin/env gaudirun.py
#
# Copied from ForwardSchedulerControlFlowTest.py to setup asynchronous
# incident service test and example
#
#
from Gaudi.Configuration import *
from Configurables import HiveWhiteBoard, HiveSlimEventLoopMgr, ForwardSchedulerSvc, CPUCruncher,AlgResourcePool,IncidentProcAlg,IncidentSvc,IncidentAsyncTestSvc,IncidentAsyncTestAlg
from Configurables import GaudiSequencer
msgFmt = "% F%40W%S%4W%s%e%15W%X%7W%R%T %0W%M"
msgSvc = InertMessageSvc("MessageSvc",OutputLevel=INFO)
msgSvc.Format = msgFmt
ApplicationMgr().SvcMapping.append(msgSvc)
IncidentSvc(OutputLevel=DEBUG)
# metaconfig
evtslots = 5
evtMax = 20
cardinality=10
algosInFlight=10
whiteboard = HiveWhiteBoard("EventDataSvc",
EventSlots = evtslots)
slimeventloopmgr = HiveSlimEventLoopMgr(OutputLevel=INFO)
scheduler = ForwardSchedulerSvc(MaxEventsInFlight = evtslots,
MaxAlgosInFlight = algosInFlight,
OutputLevel=DEBUG)
AlgResourcePool(OutputLevel=DEBUG)
# Async Incident svc processing algs to be added at the beginning and
# at the end of event graph to process incidents fired in the context
# of the given event
EventLoopInitProcAlg=IncidentProcAlg("EventLoopInitProcessingAlg",
OutputLevel=DEBUG)
EventLoopFinalProcAlg=IncidentProcAlg("EventLoopFinalProcessingAlg",
OutputLevel=DEBUG)
# add two dummy incident aware services that register
AITestSvc1=IncidentAsyncTestSvc("IncidentAwareService1",
FileOffset=10000000,
EventMultiplier=1000,
IncidentNames=['BeginEvent','EndEvent','BeginRun','EndRun','AbortEvent'],
Priority=1000)
AITestSvc2=IncidentAsyncTestSvc("IncidentAwareService2",
FileOffset=10000000*100,
EventMultiplier=1000*1000,
IncidentNames=['BeginEvent','EndEvent'],
Priority=1)
AITestAlg1=IncidentAsyncTestAlg("IncidentAwareTestAlg1",
ServiceName="IncidentAwareService1",
IsClonable=True,
Cardinality=5,
inpKeys=['/Event/DAQ/ODIN']
)
AITestAlg2=IncidentAsyncTestAlg("IncidentAwareTestAlg2",
ServiceName="IncidentAwareService2",
IsClonable=False,
Cardinality=1,
inpKeys=['/Event/Hlt/DecReports']
)
FakeInput = CPUCruncher("FakeInput",
outKeys = ['/Event/DAQ/ODIN','/Event/DAQ/RawEvent','/Event/Hlt/LumiSummary'],
shortCalib=True,
varRuntime=.1,
avgRuntime=.1 )
BrunelInit = CPUCruncher("BrunelInit",
inpKeys = ['/Event/DAQ/ODIN','/Event/DAQ/RawEvent'],
outKeys = ['/Event/Rec/Status', '/Event/Rec/Header'],
shortCalib=True)
PhysFilter = CPUCruncher("PhysFilter",
shortCalib=True,
inpKeys = ['/Event/Hlt/LumiSummary'])
HltDecReportsDecoder = CPUCruncher("HltDecReportsDecoder",
shortCalib=True,
inpKeys = ['/Event/DAQ/RawEvent'],
outKeys = ['/Event/Hlt/DecReports'])
HltErrorFilter = CPUCruncher("HltErrorFilter",
shortCalib=True,
inpKeys = ['/Event/Hlt/DecReports'])
sequence0 = GaudiSequencer("Sequence0")
sequence0.ModeOR = False
sequence0.ShortCircuit = False # whether the evaluation is lazy or not!
sequence0.Members += [EventLoopInitProcAlg]
sequencex = GaudiSequencer("SequenceX")
sequencex.ModeOR = False
sequencex.ShortCircuit = False # whether the evaluation is lazy or not!
sequencex.Members += [EventLoopFinalProcAlg]
sequence1 = GaudiSequencer("Sequence1")
sequence1.Members += [FakeInput,BrunelInit,PhysFilter,HltDecReportsDecoder,AITestAlg1,AITestAlg2]
sequence1.ModeOR = False
sequence1.ShortCircuit = False # whether the evaluation is lazy or not!
sequence2 = GaudiSequencer("Sequence2")
sequence2.Members += [sequence0,sequence1, HltErrorFilter,sequencex]
ApplicationMgr( EvtMax = evtMax,
EvtSel = 'NONE',
ExtSvc =[whiteboard,AITestSvc1,AITestSvc2],
EventLoop = slimeventloopmgr,
TopAlg = [sequence2],
MessageSvcType="InertMessageSvc")
#ifndef GAUDIEXAMPLES_IINCIDENTASYNCTESTSVC_H_
#define GAUDIEXAMPLES_IINCIDENTASYNCTESTSVC_H_
#include <mutex>
#include "GaudiKernel/IInterface.h"
class EventContext;
/** @class IncidentRegistryTestListener IncidentListenerTest.h
*
*/
class GAUDI_API IIncidentAsyncTestSvc: virtual public IInterface {
public:
DeclareInterfaceID(IIncidentAsyncTestSvc,1,0);
virtual void getData(uint64_t* data,EventContext* ctx=0)const =0;
};
#endif /*GAUDIEXAMPLES_IINCIDENTASYNCTESTSVC_H_*/
#include "IncidentAsyncTestAlg.h"
#include "IIncidentAsyncTestSvc.h"
#include "GaudiKernel/IIncidentSvc.h"
#include "GaudiKernel/Incident.h"
#include "GaudiKernel/DataObjectHandle.h"
#include "GaudiKernel/DataObject.h"
// Static Factory declaration
DECLARE_COMPONENT(IncidentAsyncTestAlg)
//=============================================================================
IncidentAsyncTestAlg::IncidentAsyncTestAlg(const std::string& name ,
ISvcLocator* pSvcLocator )
:Algorithm(name,pSvcLocator)
{
declareProperty("ServiceName", m_serviceName="IncTestSvc" );
declareProperty("inpKeys", m_inpKeys);
declareProperty("outKeys", m_outKeys);
}
//=============================================================================
StatusCode IncidentAsyncTestAlg::initialize() {
StatusCode sc = Algorithm::initialize();
if (sc.isFailure()) return sc;
//get service containing event data
m_service = service(m_serviceName,true);
//Copied from CPUCruncher.cpp
int i=0;
for (auto k: m_inpKeys) {
debug() << "adding input key " << k << endmsg;
m_inputObjHandles.push_back( new DataObjectHandle<DataObject>( k, Gaudi::DataHandle::Reader, this ));
declareInput(m_inputObjHandles.back());
i++;
}
i = 0;
for (auto k: m_outKeys) {
debug() << "adding output key " << k << endmsg;
m_outputObjHandles.push_back( new DataObjectHandle<DataObject>( k, Gaudi::DataHandle::Writer, this ));
declareOutput(m_outputObjHandles.back() );
i++;
}
return StatusCode::SUCCESS;
}
//=============================================================================
StatusCode IncidentAsyncTestAlg::execute() {
uint64_t data=0;
MsgStream logstream(msgSvc(), name());
for (auto & inputHandle: m_inputObjHandles){
if(!inputHandle->isValid())
continue;
DataObject* obj = nullptr;
obj = inputHandle->get();
if (obj == nullptr)
logstream << MSG::ERROR << "A read object was a null pointer." << endmsg;
}
m_service->getData(&data);
for (auto & outputHandle: m_outputObjHandles){
if(!outputHandle->isValid())
continue;
outputHandle->put(new DataObject());
}
info() << "Read data "<<data << endmsg;
return StatusCode::SUCCESS;
}
//=============================================================================
StatusCode IncidentAsyncTestAlg::finalize() {
info() << "Finalizing " << endmsg;
return Algorithm::finalize();
}
IncidentAsyncTestAlg::~IncidentAsyncTestAlg(){
for (uint i = 0; i < m_inputObjHandles.size(); ++i) {
delete m_inputObjHandles[i];
}
for (uint i = 0; i < m_outputObjHandles.size(); ++i) {
delete m_outputObjHandles[i];
}
}
#ifndef GAUDIEXAMPLES_INCIDENTASYNCTESTALG_H_
#define GAUDIEXAMPLES_INCIDENTASYNCTESTALG_H_
#include "GaudiKernel/Algorithm.h"
#include <memory>
class IIncidentSvc;
class IncidentListener;
class IIncidentAsyncTestSvc;
class IncidentAsyncTestAlg: public Algorithm
{
public:
IncidentAsyncTestAlg(const std::string& name ,
ISvcLocator* pSvcLocator );
~IncidentAsyncTestAlg() override;
StatusCode initialize() override;
StatusCode execute() override;
StatusCode finalize() override;
static std::string &incident();
private:
std::string m_serviceName;
SmartIF<IIncidentAsyncTestSvc> m_service;
std::vector<std::string> m_inpKeys, m_outKeys;
std::vector<DataObjectHandle<DataObject> *> m_inputObjHandles;
std::vector<DataObjectHandle<DataObject> *> m_outputObjHandles;
};
#endif /*GAUDIEXAMPLES_INCIDENTREGISTRYTESTALG_H_*/
#include "IncidentAsyncTestSvc.h"
#include "GaudiKernel/IService.h"
#include "GaudiKernel/ISvcLocator.h"
#include "GaudiKernel/IMessageSvc.h"
#include "GaudiKernel/IIncidentSvc.h"
#include "GaudiKernel/MsgStream.h"
#include "GaudiKernel/GaudiException.h"
#include "GaudiKernel/MsgStream.h"
#include "GaudiKernel/ISvcLocator.h"
#include "GaudiKernel/ThreadLocalContext.h"
DECLARE_COMPONENT(IncidentAsyncTestSvc)
#define ON_DEBUG if (msgLevel(MSG::DEBUG))
#define ON_VERBOSE if (msgLevel(MSG::VERBOSE))
#define DEBMSG ON_DEBUG debug()
#define VERMSG ON_VERBOSE verbose()
//=============================================================================
IncidentAsyncTestSvc::IncidentAsyncTestSvc( const std::string& name,
ISvcLocator* svcloc):base_class(name,svcloc)
{
declareProperty("FileOffset", m_fileOffset = 100000000 );
declareProperty("EventMultiplier", m_eventMultiplier = 1000 );
declareProperty("IncidentNames", m_incidentNames );
declareProperty("Priority",m_prio=0);
m_name=name;
}
IncidentAsyncTestSvc::~IncidentAsyncTestSvc(){
}
StatusCode IncidentAsyncTestSvc::initialize(){
auto sc=Service::initialize();
if(sc.isFailure())return sc;
sc=setProperties();
if(sc.isFailure()){
error() <<"Error setting properties!"<<endmsg;
return sc;
}
m_incSvc = service("IncidentSvc",true);
if (!m_incSvc) throw GaudiException("Cannot find IncidentSvc",m_name,StatusCode::FAILURE);
m_msgSvc=msgSvc();
if(m_incidentNames.value().size()==0){
std::vector<std::string> incNames;
incNames.push_back(IncidentType::BeginEvent);
incNames.push_back(IncidentType::EndEvent);
m_incidentNames.setValue(incNames);
}
auto& incNames=m_incidentNames.value();
for(auto& i:incNames){
m_incSvc->addListener(this,i,m_prio);
}
return sc;
}
StatusCode IncidentAsyncTestSvc::finalize(){
return Service::finalize();
}
//=============================================================================
void IncidentAsyncTestSvc::handle(const Incident &incident) {
MsgStream log( m_msgSvc, m_name );
if(incident.type()==IncidentType::BeginEvent){
auto res=m_ctxData.insert(std::make_pair(incident.context(),incident.context().evt()*m_eventMultiplier+m_fileOffset));
if(!res.second){
log << MSG::WARNING << m_name<<" Context already exists for '" << incident.type()
<< "' event="<<incident.context().evt() << endmsg;
}
}else if(incident.type()==IncidentType::EndEvent){
{
std::unique_lock<decltype(m_eraseMutex)>(m_eraseMutex);
auto res=m_ctxData.unsafe_erase(incident.context());
if(res==0){
log << MSG::WARNING << m_name<<" Context is missing for '" << incident.type()
<< "' event="<<incident.context().evt() << endmsg;
}
}
log << MSG::INFO <<m_name<< " Cleaned up context store for event =" <<incident.context().evt()
<< " for incident='"<<incident.type() <<"'"<<endmsg;
}
log << MSG::INFO << m_name<<" Handling incident '" << incident.type() << "' at ctx="<<incident.context() << endmsg;
}
void IncidentAsyncTestSvc::getData(uint64_t* data,EventContext* ctx)const {
MsgStream log( m_msgSvc, m_name );
log<<MSG::DEBUG<<"Asked for data with context "<<*ctx<<endmsg;
if(ctx){
auto cit=m_ctxData.find(*ctx);
if(cit==m_ctxData.end()){
log<<MSG::FATAL<<" data for event "<<ctx->evt()<<" is not initialized yet!. This shouldn't happen!"<<endmsg;
return;
}
*data=cit->second;
}else{
const auto& ct=Gaudi::Hive::currentContext();
auto cit=m_ctxData.find(ct);
if(cit==m_ctxData.end()){
log<<MSG::FATAL<<" data for event "<<ct.evt()<<" is not initialized yet!. This shouldn't happen!"<<endmsg;
return;
}
*data=cit->second;
}
}
#ifndef GAUDIEXAMPLES_INCIDENTASYNCTESTSVC_H_