Skip to content
Snippets Groups Projects
Commit 9fdcf724 authored by Illya Shapoval's avatar Illya Shapoval
Browse files

migrate AvalancheSchedulerSvc to tbb::task_group to address deprecation of tbb::task API

parent e4424a93
No related branches found
No related tags found
1 merge request!1067Migrate from TBB Task API
......@@ -33,30 +33,25 @@ namespace Gaudi {
}
} // namespace Gaudi
template <class T>
class AlgTask : public T {
class AlgTask {
public:
AlgTask( AvalancheSchedulerSvc* scheduler, ISvcLocator* svcLocator, IAlgExecStateSvc* aem )
: m_scheduler( scheduler ), m_aess( aem ), m_serviceLocator( svcLocator ){};
AlgTask( AvalancheSchedulerSvc* scheduler, ISvcLocator* svcLocator, IAlgExecStateSvc* aem, bool blocking = false )
: m_scheduler( scheduler ), m_aess( aem ), m_serviceLocator( svcLocator ), m_blocking( blocking ){};
AlgTask( AvalancheSchedulerSvc::TaskSpec&& ts, AvalancheSchedulerSvc* scheduler, ISvcLocator* svcLocator,
IAlgExecStateSvc* aem )
: m_ts( std::move( ts ) ), m_scheduler( scheduler ), m_aess( aem ), m_serviceLocator( svcLocator ){};
T* execute() override {
void operator()() const {
SmartIF<IMessageSvc> messageSvc( m_serviceLocator );
MsgStream log( messageSvc, "AlgTask" );
// Get task specification dynamically if it was not provided statically
if ( !m_ts.algPtr )
if ( !m_scheduler->m_scheduledQueue.try_pop( m_ts ) ) {
log << MSG::WARNING << "Specification not complete or void while task is running" << endmsg;
return nullptr;
}
AvalancheSchedulerSvc::TaskSpec ts;
if ( !m_scheduler->next( ts, m_blocking ) ) {
log << MSG::WARNING << "Missing specification while task is running" << endmsg;
return;
}
EventContext& evtCtx = *( m_ts.contextPtr );
IAlgorithm*& iAlgoPtr = m_ts.algPtr;
EventContext& evtCtx = *( ts.contextPtr );
IAlgorithm*& iAlgoPtr = ts.algPtr;
Gaudi::Algorithm* this_algo = dynamic_cast<Gaudi::Algorithm*>( iAlgoPtr );
if ( !this_algo ) { throw GaudiException( "Cast to Algorithm failed!", "AlgTask", StatusCode::FAILURE ); }
......@@ -84,21 +79,21 @@ public:
RetCodeGuard rcg( appmgr, Gaudi::ReturnCode::UnhandledException );
if ( UNLIKELY( iAlgoPtr->sysExecute( evtCtx ).isFailure() ) ) {
log << MSG::WARNING << "Execution of algorithm " << m_ts.algName << " failed" << endmsg;
log << MSG::WARNING << "Execution of algorithm " << ts.algName << " failed" << endmsg;
eventfailed = true;
}
rcg.ignore(); // disarm the guard
} catch ( const GaudiException& Exception ) {
log << MSG::FATAL << ".executeEvent(): Exception with tag=" << Exception.tag() << " thrown by " << m_ts.algName
log << MSG::FATAL << ".executeEvent(): Exception with tag=" << Exception.tag() << " thrown by " << ts.algName
<< endmsg;
log << MSG::ERROR << Exception << endmsg;
eventfailed = true;
} catch ( const std::exception& Exception ) {
log << MSG::FATAL << ".executeEvent(): Standard std::exception thrown by " << m_ts.algName << endmsg;
log << MSG::FATAL << ".executeEvent(): Standard std::exception thrown by " << ts.algName << endmsg;
log << MSG::ERROR << Exception.what() << endmsg;
eventfailed = true;
} catch ( ... ) {
log << MSG::FATAL << ".executeEvent(): UNKNOWN Exception thrown by " << m_ts.algName << endmsg;
log << MSG::FATAL << ".executeEvent(): UNKNOWN Exception thrown by " << ts.algName << endmsg;
eventfailed = true;
}
......@@ -106,24 +101,22 @@ public:
m_aess->updateEventStatus( eventfailed, evtCtx );
// Release algorithm
m_scheduler->m_algResourcePool->releaseAlgorithm( m_ts.algName, iAlgoPtr ).ignore();
m_scheduler->m_algResourcePool->releaseAlgorithm( ts.algName, iAlgoPtr ).ignore();
// schedule a sign-off of the Algorithm execution
m_scheduler->m_actionsQueue.push(
[schdlr = this->m_scheduler, ts = std::move( this->m_ts )]() { return schdlr->signoff( ts ); } );
[schdlr = this->m_scheduler, ts = std::move( ts )]() { return schdlr->signoff( ts ); } );
Gaudi::Hive::setCurrentContextEvt( -1 );
return nullptr;
}
void operator()() { execute(); };
private:
AvalancheSchedulerSvc::TaskSpec m_ts;
AvalancheSchedulerSvc* m_scheduler;
IAlgExecStateSvc* m_aess;
SmartIF<ISvcLocator> m_serviceLocator;
// Shortcuts to services
AvalancheSchedulerSvc* m_scheduler;
IAlgExecStateSvc* m_aess;
SmartIF<ISvcLocator> m_serviceLocator;
// Marks the task as CPU-blocking or not
bool m_blocking{false};
};
#endif
......@@ -16,7 +16,6 @@
#include "GaudiKernel/DataHandleHolderVisitor.h"
#include "GaudiKernel/IAlgorithm.h"
#include "GaudiKernel/IDataManagerSvc.h"
#include "GaudiKernel/ITask.h"
#include "GaudiKernel/ThreadLocalContext.h"
#include <Gaudi/Algorithm.h> // can be removed ASA dynamic casts to Algorithm are removed
......@@ -38,7 +37,6 @@
#if TBB_INTERFACE_VERSION_MAJOR < 12
# include "tbb/task_scheduler_init.h"
#endif // TBB_INTERFACE_VERSION_MAJOR < 12
#include "tbb/task.h"
// Instantiation of a static factory class used by clients to create instances of this service
DECLARE_COMPONENT( AvalancheSchedulerSvc )
......@@ -937,22 +935,16 @@ StatusCode AvalancheSchedulerSvc::schedule( TaskSpec&& ts ) {
// Add the algorithm to the scheduled queue
m_scheduledQueue.push( std::move( ts ) );
// Prepare a TBB task that will execute the Algorithm according to the above queued specs
auto algoTask =
new ( tbb::task::allocate_root() ) AlgTask<tbb::task>( this, serviceLocator(), m_algExecStateSvc );
// schedule the task
tbb::task::enqueue( *algoTask );
m_taskGroup.run( AlgTask( this, serviceLocator(), m_algExecStateSvc ) );
++m_algosInFlight;
} else { // schedule blocking algorithm in independent thread
// Prepare Gaudi task that will execute the Algorithm according to the above queued specs
auto algoTask = AlgTask<ITask>( std::move( ts ), this, serviceLocator(), m_algExecStateSvc );
m_scheduledBlockingQueue.push( std::move( ts ) );
// Schedule the blocking task in an independent thread
++m_blockingAlgosInFlight;
std::thread _t( std::move( algoTask ) );
std::thread _t( AlgTask( this, serviceLocator(), m_algExecStateSvc, true ) );
_t.detach();
} // end scheduling blocking Algorithm
......@@ -968,7 +960,7 @@ StatusCode AvalancheSchedulerSvc::schedule( TaskSpec&& ts ) {
<< endmsg;
} else { // Avoid scheduling via TBB if the pool size is -100. Instead, run here in the scheduler's control thread
AlgTask<ITask> theTask( std::move( ts ), this, serviceLocator(), m_algExecStateSvc );
AlgTask theTask( this, serviceLocator(), m_algExecStateSvc, false );
++m_algosInFlight;
sc = revise( ts.algIndex, ts.contextPtr, AState::SCHEDULED );
theTask();
......
......@@ -17,7 +17,6 @@
#include "PrecedenceSvc.h"
// Framework include files
#include "GaudiKernel/IAccelerator.h"
#include "GaudiKernel/IAlgExecStateSvc.h"
#include "GaudiKernel/IAlgResourcePool.h"
#include "GaudiKernel/ICondSvc.h"
......@@ -39,7 +38,7 @@
// External libs
#include "tbb/concurrent_priority_queue.h"
#include "tbb/concurrent_queue.h"
#include "tbb/task.h"
#include "tbb/task_group.h"
class IAlgorithm;
......@@ -112,15 +111,15 @@ class IAlgorithm;
*/
class AvalancheSchedulerSvc : public extends<Service, IScheduler> {
template <class T>
friend class AlgTask;
public:
/// Constructor
using extends::extends;
/// Destructor
~AvalancheSchedulerSvc() override = default;
/// Destructor. Need to enforce noexcept specification as otherwise the noexcept(false) destructor of the
/// tbb::task_group member violates the contract
~AvalancheSchedulerSvc() noexcept override {}
/// Initialise
StatusCode initialize() override;
......@@ -321,6 +320,7 @@ private:
/// Queues for scheduled algorithms
tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> m_scheduledQueue;
tbb::concurrent_priority_queue<TaskSpec, AlgQueueSort> m_scheduledBlockingQueue;
std::queue<TaskSpec> m_retryQueue;
// Prompt the scheduler to call updateStates
......@@ -332,6 +332,16 @@ private:
SmartIF<IThreadPoolSvc> m_threadPoolSvc;
size_t m_maxEventsInFlight{0};
size_t m_maxAlgosInFlight{1};
// Task management --------------------------------------------------------
tbb::task_group m_taskGroup;
public:
// get next schedule-able TaskSpec
bool next( TaskSpec& ts, bool blocking = false ) {
return blocking ? m_scheduledBlockingQueue.try_pop( ts ) : m_scheduledQueue.try_pop( ts );
};
};
#endif // GAUDIHIVE_AVALANCHESCHEDULERSVC_H
/***********************************************************************************\
* (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
* *
* This software is distributed under the terms of the Apache version 2 licence, *
* copied verbatim in the file "LICENSE". *
* *
* In applying this licence, CERN does not waive the privileges and immunities *
* granted to it by virtue of its status as an Intergovernmental Organization *
* or submit itself to any jurisdiction. *
\***********************************************************************************/
#ifndef GAUDIKERNEL_IACCELERATOR_H
#define GAUDIKERNEL_IACCELERATOR_H
#include <functional>
#include <vector>
// Framework include files
#include "GaudiKernel/IInterface.h"
#include "GaudiKernel/ITask.h"
/**@class IAccelerator IAccelerator.h GaudiKernel/IAccelerator.h
*
* General interface for an accelerator-based algorithm scheduler.
*
* @author Illya Shapoval
* @version 1.0
*/
class GAUDI_API IAccelerator : virtual public IInterface {
public:
/// InterfaceID
DeclareInterfaceID( IAccelerator, 1, 0 );
virtual StatusCode push( ITask& task ) = 0;
};
#endif
/***********************************************************************************\
* (c) Copyright 1998-2019 CERN for the benefit of the LHCb and ATLAS collaborations *
* *
* This software is distributed under the terms of the Apache version 2 licence, *
* copied verbatim in the file "LICENSE". *
* *
* In applying this licence, CERN does not waive the privileges and immunities *
* granted to it by virtue of its status as an Intergovernmental Organization *
* or submit itself to any jurisdiction. *
\***********************************************************************************/
#ifndef GAUDIKERNEL_GAUDIKERNEL_ITASK_H_
#define GAUDIKERNEL_GAUDIKERNEL_ITASK_H_
/**@class ITask ITask.h GaudiKernel/ITask.h
*
* General interface for a Gaudi task.
*
* @author Illya Shapoval
* @version 1.0
*/
class ITask {
public:
virtual ~ITask() = default;
virtual ITask* execute() = 0;
};
#endif /* GAUDIKERNEL_GAUDIKERNEL_ITASK_H_ */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment