Skip to content
Snippets Groups Projects
Commit 709be0fd authored by Sebastien Ponce's avatar Sebastien Ponce
Browse files

Optimized AvalanchScheduler by avoiding the use of algo names and prefering algo indexes

Credits to Hadrien for spotting that possibility.
Also dropped a couple of dynamic_casts that were simply not useful and moved the waiting list from map to unordered_map (credits to Hadrien again)
parent afb87507
No related branches found
No related tags found
1 merge request!482Optimized AvalancheScheduler by avoiding the use of algo names and prefering algo indexes
......@@ -622,7 +622,7 @@ StatusCode AvalancheSchedulerSvc::eventFailed( EventContext* eventContext )
* * No algorithms have been signed off by the data flow
* * No algorithms have been scheduled
*/
StatusCode AvalancheSchedulerSvc::updateStates( int si, const std::string& algo_name )
StatusCode AvalancheSchedulerSvc::updateStates( int si, const int algo_index )
{
StatusCode global_sc( StatusCode::SUCCESS );
......@@ -652,8 +652,8 @@ StatusCode AvalancheSchedulerSvc::updateStates( int si, const std::string& algo_
AlgsExecutionStates& thisAlgsStates = thisSlot.algsStates;
// Perform the I->CR->DR transitions
if ( !algo_name.empty() ) {
Cause cs = {Cause::source::Task, algo_name};
if ( algo_index >= 0 ) {
Cause cs = {Cause::source::Task, index2algname( algo_index )};
if ( m_precSvc->iterate( thisSlot, cs ).isFailure() ) {
error() << "Failed to call IPrecedenceSvc::iterate for slot " << iSlot << endmsg;
global_sc = StatusCode::FAILURE;
......@@ -672,33 +672,6 @@ StatusCode AvalancheSchedulerSvc::updateStates( int si, const std::string& algo_
for ( auto it = thisAlgsStates.begin( AlgsExecutionStates::State::DATAREADY );
it != thisAlgsStates.end( AlgsExecutionStates::State::DATAREADY ); ++it )
buffer.push( *it );
/*std::stringstream s;
auto buffer2 = buffer;
while (!buffer2.empty()) {
s << m_precSvc->getPriority(index2algname(buffer2.top())) << ", ";
buffer2.pop();
}
info() << "DRBuffer is: [ " << s.str() << " ] <--" << algo_name << " executed" << endmsg;*/
/*while (!buffer.empty()) {
partial_sc = promoteToScheduled(buffer.top(), iSlot);
if (partial_sc.isFailure()) {
if (msgLevel(MSG::VERBOSE))
verbose() << "Could not apply transition from "
<< AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
<< " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot << endmsg;
if (m_useIOBoundAlgScheduler) {
partial_sc = promoteToAsyncScheduled(buffer.top(), iSlot);
if (msgLevel(MSG::VERBOSE))
if (partial_sc.isFailure())
verbose() << "[Asynchronous] Could not apply transition from "
<< AlgsExecutionStates::stateNames[AlgsExecutionStates::State::DATAREADY]
<< " for algorithm " << index2algname(buffer.top()) << " on processing slot " << iSlot <<
endmsg;
}
}
buffer.pop();
}*/
while ( !buffer.empty() ) {
bool IOBound = false;
if ( m_useIOBoundAlgScheduler ) IOBound = m_precSvc->isBlocking( index2algname( buffer.top() ) );
......@@ -740,7 +713,7 @@ StatusCode AvalancheSchedulerSvc::updateStates( int si, const std::string& algo_
if ( m_dumpIntraEventDynamics ) {
std::stringstream s;
s << algo_name << ", " << thisAlgsStates.sizeOfSubset( State::CONTROLREADY ) << ", "
s << index2algname( algo_index ) << ", " << thisAlgsStates.sizeOfSubset( State::CONTROLREADY ) << ", "
<< thisAlgsStates.sizeOfSubset( State::DATAREADY ) << ", " << thisAlgsStates.sizeOfSubset( State::SCHEDULED )
<< ", " << std::chrono::high_resolution_clock::now().time_since_epoch().count() << "\n";
auto threads = ( m_threadPoolSize != -1 ) ? std::to_string( m_threadPoolSize )
......@@ -904,8 +877,7 @@ StatusCode AvalancheSchedulerSvc::promoteToScheduled( unsigned int iAlgo, int si
if ( msgLevel( MSG::VERBOSE ) ) dumpSchedulerState( -1 );
if ( updateSc.isSuccess() )
if ( msgLevel( MSG::VERBOSE ) )
verbose() << "Promoting " << index2algname( iAlgo ) << " to SCHEDULED on slot " << si << endmsg;
if ( msgLevel( MSG::VERBOSE ) ) verbose() << "Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
return updateSc;
} else {
if ( msgLevel( MSG::DEBUG ) )
......@@ -950,7 +922,7 @@ StatusCode AvalancheSchedulerSvc::promoteToAsyncScheduled( unsigned int iAlgo, i
if ( updateSc.isSuccess() )
if ( msgLevel( MSG::VERBOSE ) )
verbose() << "[Asynchronous] Promoting " << index2algname( iAlgo ) << " to SCHEDULED on slot " << si << endmsg;
verbose() << "[Asynchronous] Promoting " << algName << " to SCHEDULED on slot " << si << endmsg;
return updateSc;
} else {
if ( msgLevel( MSG::DEBUG ) )
......@@ -967,11 +939,6 @@ StatusCode AvalancheSchedulerSvc::promoteToAsyncScheduled( unsigned int iAlgo, i
StatusCode AvalancheSchedulerSvc::promoteToExecuted( unsigned int iAlgo, int si, IAlgorithm* algo,
EventContext* eventContext )
{
// Put back the instance
Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
if ( !castedAlgo ) fatal() << "The casting did not succeed!" << endmsg;
// EventContext* eventContext = castedAlgo->getContext();
// Check if the execution failed
if ( m_algExecStateSvc->eventStatus( *eventContext ) != EventStatus::Success ) eventFailed( eventContext ).ignore();
......@@ -989,7 +956,7 @@ StatusCode AvalancheSchedulerSvc::promoteToExecuted( unsigned int iAlgo, int si,
EventSlot& thisSlot = m_eventSlots[si];
if ( msgLevel( MSG::DEBUG ) )
debug() << "Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si << endmsg;
debug() << "Trying to handle execution result of " << algo->name() << " on slot " << si << endmsg;
State state;
if ( algo->filterPassed() ) {
state = State::EVTACCEPTED;
......@@ -1001,15 +968,15 @@ StatusCode AvalancheSchedulerSvc::promoteToExecuted( unsigned int iAlgo, int si,
if ( sc.isSuccess() )
if ( msgLevel( MSG::VERBOSE ) )
verbose() << "Promoting " << index2algname( iAlgo ) << " on slot " << si << " to "
<< AlgsExecutionStates::stateNames[state] << endmsg;
verbose() << "Promoting " << algo->name() << " on slot " << si << " to " << AlgsExecutionStates::stateNames[state]
<< endmsg;
if ( msgLevel( MSG::DEBUG ) )
debug() << "Algorithm " << algo->name() << " executed in slot " << si << ". Algorithms scheduled are "
<< m_algosInFlight << endmsg;
// Schedule an update of the status of the algorithms
m_actionsQueue.push( [ this, name = algo->name() ]() { return this->updateStates( -1, name ); } );
m_actionsQueue.push( [this, iAlgo]() { return this->updateStates( -1, iAlgo ); } );
return sc;
}
......@@ -1021,11 +988,6 @@ StatusCode AvalancheSchedulerSvc::promoteToExecuted( unsigned int iAlgo, int si,
StatusCode AvalancheSchedulerSvc::promoteToAsyncExecuted( unsigned int iAlgo, int si, IAlgorithm* algo,
EventContext* eventContext )
{
// Put back the instance
Algorithm* castedAlgo = dynamic_cast<Algorithm*>( algo ); // DP: expose context getter in IAlgo?
if ( !castedAlgo ) fatal() << "[Asynchronous] The casting did not succeed!" << endmsg;
// EventContext* eventContext = castedAlgo->getContext();
// Check if the execution failed
if ( m_algExecStateSvc->eventStatus( *eventContext ) != EventStatus::Success ) eventFailed( eventContext ).ignore();
......@@ -1042,8 +1004,7 @@ StatusCode AvalancheSchedulerSvc::promoteToAsyncExecuted( unsigned int iAlgo, in
EventSlot& thisSlot = m_eventSlots[si];
if ( msgLevel( MSG::DEBUG ) )
debug() << "[Asynchronous] Trying to handle execution result of " << index2algname( iAlgo ) << " on slot " << si
<< endmsg;
debug() << "[Asynchronous] Trying to handle execution result of " << algo->name() << " on slot " << si << endmsg;
State state;
if ( algo->filterPassed() ) {
state = State::EVTACCEPTED;
......@@ -1055,7 +1016,7 @@ StatusCode AvalancheSchedulerSvc::promoteToAsyncExecuted( unsigned int iAlgo, in
if ( sc.isSuccess() )
if ( msgLevel( MSG::VERBOSE ) )
verbose() << "[Asynchronous] Promoting " << index2algname( iAlgo ) << " on slot " << si << " to "
verbose() << "[Asynchronous] Promoting " << algo->name() << " on slot " << si << " to "
<< AlgsExecutionStates::stateNames[state] << endmsg;
if ( msgLevel( MSG::DEBUG ) )
......@@ -1063,7 +1024,7 @@ StatusCode AvalancheSchedulerSvc::promoteToAsyncExecuted( unsigned int iAlgo, in
<< ". Algorithms scheduled are " << m_IOBoundAlgosInFlight << endmsg;
// Schedule an update of the status of the algorithms
m_actionsQueue.push( [ this, name = algo->name() ]() { return this->updateStates( -1, name ); } );
m_actionsQueue.push( [this, iAlgo]() { return this->updateStates( -1, iAlgo ); } );
return sc;
}
......
......@@ -227,8 +227,8 @@ private:
unsigned int m_IOBoundAlgosInFlight = 0;
/// Loop on algorithm in the slots and promote them to successive states
/// (-1 means all slots, while empty string means skipping an update of the Control Flow state)
StatusCode updateStates( int si = -1, const std::string& algo_name = std::string() );
/// (-1 for algo_index means skipping an update of the Control Flow state)
StatusCode updateStates( int si = -1, int algo_index = -1 );
/// Algorithm promotion
StatusCode promoteToScheduled( unsigned int iAlgo, int si );
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment