Skip to content
Snippets Groups Projects
Commit 23ac55b4 authored by Marco Clemencic's avatar Marco Clemencic
Browse files

Made PrecedenceSvc recognize data loader algorithm (mr !473)

parents 765ceac7 054f4877
No related branches found
No related tags found
1 merge request!473Make runtime declaration of auto-loaded data inputs be recognized by the Precedence Service
Pipeline #
#!/usr/bin/env gaudirun.py
"""
Find and attribute unmet data inputs as outputs to a Data Loader algorithm.
"""
from Gaudi.Configuration import *
from Configurables import HiveWhiteBoard, HiveSlimEventLoopMgr, AvalancheSchedulerSvc, CPUCruncher
# metaconfig
evtslots = 1
evtMax = 3
algosInFlight = 1
whiteboard = HiveWhiteBoard("EventDataSvc",
EventSlots=evtslots,
OutputLevel=INFO)
slimeventloopmgr = HiveSlimEventLoopMgr(SchedulerName="AvalancheSchedulerSvc")
AvalancheSchedulerSvc(ThreadPoolSize=algosInFlight,
CheckDependencies=True,
DataLoaderAlg="AlgA")
# Assemble the data flow graph
a1 = CPUCruncher("AlgA", Loader=True, OutputLevel=VERBOSE)
a2 = CPUCruncher("AlgB", OutputLevel=VERBOSE)
a2.inpKeys = ['/Event/A1']
a3 = CPUCruncher("AlgC", OutputLevel=VERBOSE)
a3.inpKeys = ['/Event/A2']
for a in [a1, a2, a3]:
a.shortCalib = True
a.avgRuntime = .01
ApplicationMgr(EvtMax=evtMax,
EvtSel='NONE',
ExtSvc=[whiteboard],
EventLoop=slimeventloopmgr,
TopAlg=[a1, a2, a3],
MessageSvcType="InertMessageSvc",
OutputLevel=DEBUG)
......@@ -103,18 +103,6 @@ StatusCode AvalancheSchedulerSvc::initialize()
return StatusCode::FAILURE;
}
// Get the precedence service
m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
if ( !m_precSvc.isValid() ) {
fatal() << "Error retrieving PrecedenceSvc" << endmsg;
return StatusCode::FAILURE;
}
const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
if ( !precSvc ) {
fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
return StatusCode::FAILURE;
}
m_algExecStateSvc = serviceLocator()->service( "AlgExecStateSvc" );
if ( !m_algExecStateSvc.isValid() ) {
fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
......@@ -175,7 +163,7 @@ StatusCode AvalancheSchedulerSvc::initialize()
std::ostringstream ostdd;
ostdd << "Data Dependencies for Algorithms:";
std::vector<DataObjIDColl> m_algosDependencies;
std::unordered_map<std::string, DataObjIDColl> algosDependenciesMap;
for ( IAlgorithm* ialgoPtr : algos ) {
Algorithm* algoPtr = dynamic_cast<Algorithm*>( ialgoPtr );
if ( nullptr == algoPtr ) {
......@@ -220,49 +208,40 @@ StatusCode AvalancheSchedulerSvc::initialize()
} else {
ostdd << "\n none";
}
m_algosDependencies.emplace_back( algoDependencies );
algosDependenciesMap[algoPtr->name()] = algoDependencies;
}
if ( m_showDataDeps ) {
info() << ostdd.str() << endmsg;
}
// Fill the containers to convert algo names to index
m_algname_vect.resize( algsNumber );
IAlgorithm* dataLoaderAlg( nullptr );
for ( IAlgorithm* algo : algos ) {
const std::string& name = algo->name();
auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
m_algname_index_map[name] = index;
m_algname_vect.at( index ) = name;
if ( algo->name() == m_useDataLoader ) {
dataLoaderAlg = algo;
}
}
// Check if we have unmet global input dependencies
// Check if we have unmet global input dependencies, and, optionally, heal them
// WARNING: this step must be done BEFORE the Precedence Service is initialized
if ( m_checkDeps ) {
DataObjIDColl unmetDep;
for ( auto o : globalInp ) {
if ( globalOutp.find( o ) == globalOutp.end() ) {
unmetDep.insert( o );
}
}
for ( auto o : globalInp )
if ( globalOutp.find( o ) == globalOutp.end() ) unmetDep.insert( o );
if ( unmetDep.size() > 0 ) {
std::ostringstream ost;
for ( const DataObjID* o : sortedDataObjIDColl( unmetDep ) ) {
ost << "\n o " << *o << " required by Algorithm: ";
for ( size_t i = 0; i < m_algosDependencies.size(); ++i ) {
if ( m_algosDependencies[i].find( *o ) != m_algosDependencies[i].end() ) {
ost << "\n * " << m_algname_vect[i];
}
}
for ( const auto& p : algosDependenciesMap )
if ( p.second.find( *o ) != p.second.end() ) ost << "\n * " << p.first;
}
if ( m_useDataLoader != "" ) {
if ( !m_useDataLoader.empty() ) {
// Find the DataLoader Alg
IAlgorithm* dataLoaderAlg( nullptr );
for ( IAlgorithm* algo : algos )
if ( algo->name() == m_useDataLoader ) {
dataLoaderAlg = algo;
break;
}
if ( dataLoaderAlg == nullptr ) {
fatal() << "No DataLoader Algorithm \"" << m_useDataLoader.value()
<< "\" found, and unmet INPUT dependencies "
......@@ -298,6 +277,27 @@ StatusCode AvalancheSchedulerSvc::initialize()
}
}
// Get the precedence service
m_precSvc = serviceLocator()->service( "PrecedenceSvc" );
if ( !m_precSvc.isValid() ) {
fatal() << "Error retrieving PrecedenceSvc" << endmsg;
return StatusCode::FAILURE;
}
const PrecedenceSvc* precSvc = dynamic_cast<const PrecedenceSvc*>( m_precSvc.get() );
if ( !precSvc ) {
fatal() << "Unable to dcast PrecedenceSvc" << endmsg;
return StatusCode::FAILURE;
}
// Fill the containers to convert algo names to index
m_algname_vect.resize( algsNumber );
for ( IAlgorithm* algo : algos ) {
const std::string& name = algo->name();
auto index = precSvc->getRules()->getAlgorithmNode( name )->getAlgoIndex();
m_algname_index_map[name] = index;
m_algname_vect.at( index ) = name;
}
// Shortcut for the message service
SmartIF<IMessageSvc> messageSvc( serviceLocator() );
if ( !messageSvc.isValid() ) error() << "Error retrieving MessageSvc interface IMessageSvc." << endmsg;
......
......@@ -196,10 +196,29 @@ void CPUCruncher::findPrimes( const unsigned long int n_iterations )
delete[] primes;
}
//------------------------------------------------------------------------------
void CPUCruncher::declareRuntimeRequestedOutputs()
{
//
for ( const auto& k : outputDataObjs() ) {
auto outputHandle = new DataObjectHandle<DataObject>( k, Gaudi::DataHandle::Writer, this );
VERBOSE_MSG << "found late-attributed output: " << outputHandle->objKey() << endmsg;
m_outputHandles.push_back( outputHandle );
declareProperty( "dummy_out_" + outputHandle->objKey(), *( m_outputHandles.back() ) );
}
initDataHandleHolder();
m_declAugmented = true;
}
//------------------------------------------------------------------------------
StatusCode CPUCruncher::execute() // the execution of the algorithm
{
if ( m_loader && !m_declAugmented ) declareRuntimeRequestedOutputs();
float crunchtime;
if ( m_local_rndm_gen ) {
......
......@@ -52,6 +52,11 @@ private:
void calibrate();
long unsigned int getNCaliIters( double );
/// Pick up late-attributed data outputs
void declareRuntimeRequestedOutputs();
bool m_declAugmented{false};
Gaudi::Property<bool> m_loader{this, "Loader", false, "Declare the algorithm to be a data loader"};
Gaudi::Property<std::vector<std::string>> m_inpKeys{this, "inpKeys", {}, ""};
Gaudi::Property<std::vector<std::string>> m_outKeys{this, "outKeys", {}, ""};
......
<?xml version="1.0" ?><!DOCTYPE extension PUBLIC '-//QM/2.3/Extension//EN' 'http://www.codesourcery.com/qm/dtds/2.3/-//qm/2.3/extension//en.dtd'>
<extension class="GaudiTest.GaudiExeTest" kind="test">
<argument name="program"><text>gaudirun.py</text></argument>
<argument name="args"><set>
<text>-v</text>
<text>../../options/AutoLoadUnmetDataInputs.py</text>
</set></argument>
<argument name="use_temp_dir"><enumeral>true</enumeral></argument>
<argument name="timeout"><integer>120</integer></argument>
</extension>
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment