From 92e31e2859445b6da18724ac0f17d0a577b40647 Mon Sep 17 00:00:00 2001 From: Benedikt Hegner <hegner@cern.ch> Date: Sun, 25 Sep 2016 10:16:25 +0200 Subject: [PATCH 1/4] remove deprecated copy of state machine interface from IAlgManager This removal has been checked grep'ing the git repo of ATLAS --- GaudiKernel/GaudiKernel/IAlgManager.h | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/GaudiKernel/GaudiKernel/IAlgManager.h b/GaudiKernel/GaudiKernel/IAlgManager.h index 995a1323f..f6f40e6be 100644 --- a/GaudiKernel/GaudiKernel/IAlgManager.h +++ b/GaudiKernel/GaudiKernel/IAlgManager.h @@ -59,26 +59,6 @@ public: /// Return the list of Algorithms virtual const std::vector<IAlgorithm*>& getAlgorithms() const = 0; -#if !defined(GAUDI_V22_API) || defined(G22_NEW_SVCLOCATOR) - /// Initializes the list of "managed" algorithms - virtual StatusCode initializeAlgorithms() { return initialize(); } - - /// Starts the list of "managed" algorithms - virtual StatusCode startAlgorithms() { return start(); } - - /// Stops the list of "managed" algorithms - virtual StatusCode stopAlgorithms() { return stop(); } - - /// Finalizes the list of "managed" algorithms - virtual StatusCode finalizeAlgorithms() { return finalize(); } - - /// Initializes the list of "managed" algorithms - virtual StatusCode reinitializeAlgorithms() { return reinitialize(); } - - /// Starts the list of "managed" algorithms - virtual StatusCode restartAlgorithms() { return restart(); } -#endif - /// Returns a smart pointer to a service. virtual SmartIF<IAlgorithm> &algorithm(const Gaudi::Utils::TypeNameString &typeName, const bool createIf = true) = 0; -- GitLab From a870c796bf30bf2627905caf00660e12fba6eff4 Mon Sep 17 00:00:00 2001 From: Benedikt Hegner <hegner@cern.ch> Date: Wed, 28 Sep 2016 09:48:04 +0200 Subject: [PATCH 2/4] remove deprecated RoundRobin, ParallelSequential and Sequential schedulers --- GaudiHive/CMakeLists.txt | 4 - .../ParallelSequentialSchedulerSimpleTest.py | 50 --- .../options/RoundRobinSchedulerSimpleTest.py | 47 --- .../options/SequentialSchedulerSimpleTest.py | 51 --- .../src/ParallelSequentialSchedulerSvc.cpp | 384 ------------------ .../src/ParallelSequentialSchedulerSvc.h | 144 ------- GaudiHive/src/RoundRobinSchedulerSvc.cpp | 297 -------------- GaudiHive/src/RoundRobinSchedulerSvc.h | 106 ----- GaudiHive/src/SequentialSchedulerSvc.cpp | 186 --------- 9 files changed, 1269 deletions(-) delete mode 100644 GaudiHive/options/ParallelSequentialSchedulerSimpleTest.py delete mode 100644 GaudiHive/options/RoundRobinSchedulerSimpleTest.py delete mode 100644 GaudiHive/options/SequentialSchedulerSimpleTest.py delete mode 100644 GaudiHive/src/ParallelSequentialSchedulerSvc.cpp delete mode 100644 GaudiHive/src/ParallelSequentialSchedulerSvc.h delete mode 100644 GaudiHive/src/RoundRobinSchedulerSvc.cpp delete mode 100644 GaudiHive/src/RoundRobinSchedulerSvc.h delete mode 100644 GaudiHive/src/SequentialSchedulerSvc.cpp diff --git a/GaudiHive/CMakeLists.txt b/GaudiHive/CMakeLists.txt index c2af1ef61..226a0e1f4 100644 --- a/GaudiHive/CMakeLists.txt +++ b/GaudiHive/CMakeLists.txt @@ -58,10 +58,6 @@ gaudi_add_test(ForwardSchedulerStall PASSREGEX "Stall detected" TIMEOUT 120) -gaudi_add_test(SequentialSchedulerSimpleTest - FRAMEWORK options/SequentialSchedulerSimpleTest.py - TIMEOUT 120) - gaudi_add_test(GraphSchedulerSimpleTest FRAMEWORK options/GraphSchedulerSimpleTest.py TIMEOUT 120) diff --git a/GaudiHive/options/ParallelSequentialSchedulerSimpleTest.py b/GaudiHive/options/ParallelSequentialSchedulerSimpleTest.py deleted file mode 100644 index c75546741..000000000 --- a/GaudiHive/options/ParallelSequentialSchedulerSimpleTest.py +++ /dev/null @@ -1,50 +0,0 @@ -#!/usr/bin/env gaudirun.py - -from Gaudi.Configuration import * -from Configurables import HiveWhiteBoard, HiveSlimEventLoopMgr, ParallelSequentialSchedulerSvc, CPUCruncher,AlgResourcePool - -InertMessageSvc(OutputLevel=DEBUG) - -# metaconfig -evtMax = 10 -evtInFlight = 10 - -scheduler = ParallelSequentialSchedulerSvc(OutputLevel=DEBUG) - -slimeventloopmgr = HiveSlimEventLoopMgr(SchedulerName="ParallelSequentialSchedulerSvc", - OutputLevel=DEBUG) - -whiteboard = HiveWhiteBoard("EventDataSvc", - EventSlots = evtInFlight) - - -algResPool = AlgResourcePool(OutputLevel=DEBUG) - -a1 = CPUCruncher("A1", - DataOutputs = ['/Event/a1'], - shortCalib=True, - varRuntime=.1, - avgRuntime=.5 ) -a2 = CPUCruncher("A2", - shortCalib=True, - DataInputs = ['/Event/a1'], - DataOutputs = ['/Event/a2']) -a3 = CPUCruncher("A3", - shortCalib=True, - DataInputs = ['/Event/a1'], - DataOutputs = ['/Event/a3']) -a4 = CPUCruncher("A4", - shortCalib=True, - DataInputs = ['/Event/a2','/Event/a3'], - DataOutputs = ['/Event/a4']) - -for algo in [a1,a2,a3,a4]: - algo.OutputLevel=INFO - algo.Cardinality=1 - -ApplicationMgr( EvtMax = evtMax, - EvtSel = 'NONE', - ExtSvc =[whiteboard,algResPool], - EventLoop = slimeventloopmgr, - TopAlg = [a1,a2,a3,a4], - MessageSvcType="InertMessageSvc") diff --git a/GaudiHive/options/RoundRobinSchedulerSimpleTest.py b/GaudiHive/options/RoundRobinSchedulerSimpleTest.py deleted file mode 100644 index da47ac591..000000000 --- a/GaudiHive/options/RoundRobinSchedulerSimpleTest.py +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env gaudirun.py - -from Gaudi.Configuration import * -from Configurables import HiveWhiteBoard, HiveSlimEventLoopMgr, RoundRobinSchedulerSvc, CPUCruncher,AlgResourcePool - -InertMessageSvc(OutputLevel=INFO) - -# metaconfig -evtMax = 10 - -scheduler = RoundRobinSchedulerSvc(OutputLevel=DEBUG) - -slimeventloopmgr = HiveSlimEventLoopMgr(SchedulerName="RoundRobinSchedulerSvc", - OutputLevel=DEBUG) - -whiteboard = HiveWhiteBoard("EventDataSvc") - - -algResPool=AlgResourcePool(OutputLevel=DEBUG) - -a1 = CPUCruncher("A1", - DataOutputs = ['/Event/a1'], - shortCalib=True, - varRuntime=.1, - avgRuntime=.5 ) -a2 = CPUCruncher("A2", - shortCalib=True, - DataInputs = ['/Event/a1'], - DataOutputs = ['/Event/a2']) -a3 = CPUCruncher("A3", - shortCalib=True, - DataInputs = ['/Event/a1'], - DataOutputs = ['/Event/a3']) -a4 = CPUCruncher("A4", - shortCalib=True, - DataInputs = ['/Event/a2','/Event/a3'], - DataOutputs = ['/Event/a4']) - -for algo in [a1,a2,a3,a4]: - algo.OutputLevel=INFO - -ApplicationMgr( EvtMax = evtMax, - EvtSel = 'NONE', - ExtSvc =[whiteboard,algResPool], - EventLoop = slimeventloopmgr, - TopAlg = [a1,a2,a3,a4], - MessageSvcType="InertMessageSvc") diff --git a/GaudiHive/options/SequentialSchedulerSimpleTest.py b/GaudiHive/options/SequentialSchedulerSimpleTest.py deleted file mode 100644 index 9412c3ecf..000000000 --- a/GaudiHive/options/SequentialSchedulerSimpleTest.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env gaudirun.py - -from Gaudi.Configuration import * -from Configurables import HiveWhiteBoard, HiveSlimEventLoopMgr, SequentialSchedulerSvc, CPUCruncher,AlgResourcePool - -InertMessageSvc(OutputLevel=INFO) - -# metaconfig -evtMax = 10 - -scheduler = SequentialSchedulerSvc(OutputLevel=DEBUG) - -slimeventloopmgr = HiveSlimEventLoopMgr(SchedulerName="SequentialSchedulerSvc", - OutputLevel=DEBUG) - -whiteboard = HiveWhiteBoard("EventDataSvc") - - -algResPool=AlgResourcePool(OutputLevel=DEBUG) - -a1 = CPUCruncher("A1", - shortCalib=True, - varRuntime=.1, - avgRuntime=.5 ) -a1.outKeys = ['/Event/a1'] - -a2 = CPUCruncher("A2", - shortCalib=True) -a2.inpKeys = ['/Event/a1'] -a2.outKeys = ['/Event/a2'] - -a3 = CPUCruncher("A3", - shortCalib=True) -a3.inpKeys = ['/Event/a1'] -a3.outKeys = ['/Event/a3'] - -a4 = CPUCruncher("A4", - shortCalib=True) -a4.inpKeys = ['/Event/a2','/Event/a3'] -a4.outKeys = ['/Event/a4'] - - -for algo in [a1,a2,a3,a4]: - algo.OutputLevel=INFO - -ApplicationMgr( EvtMax = evtMax, - EvtSel = 'NONE', - ExtSvc =[whiteboard,algResPool], - EventLoop = slimeventloopmgr, - TopAlg = [a1,a2,a3,a4], - MessageSvcType="InertMessageSvc") diff --git a/GaudiHive/src/ParallelSequentialSchedulerSvc.cpp b/GaudiHive/src/ParallelSequentialSchedulerSvc.cpp deleted file mode 100644 index 8fd47d8a7..000000000 --- a/GaudiHive/src/ParallelSequentialSchedulerSvc.cpp +++ /dev/null @@ -1,384 +0,0 @@ -// Framework includes -#include "GaudiKernel/SvcFactory.h" -#include "GaudiKernel/IAlgorithm.h" -#include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface -#include "GaudiKernel/IProperty.h" -#include "GaudiKernel/AppReturnCode.h" -#include "GaudiKernel/CommonMessaging.h" -#include "GaudiKernel/IDataManagerSvc.h" - -#include "GaudiKernel/ThreadLocalContext.h" -#include "GaudiKernel/DataHandleHolderVisitor.h" - -// C++ -#include <list> -#include <thread> - -// Local -#include "ParallelSequentialSchedulerSvc.h" -#include "AlgResourcePool.h" -#include "RetCodeGuard.h" - -// Instantiation of a static factory class used by clients to create instances of this service -DECLARE_SERVICE_FACTORY(ParallelSequentialSchedulerSvc) - -//=========================================================================== -// Infrastructure methods - -ParallelSequentialSchedulerSvc::ParallelSequentialSchedulerSvc(const std::string& name, ISvcLocator* svcLoc): - base_class(name,svcLoc) { - - declareProperty("UseTopAlgList", m_useTopAlgList = false); - declareProperty("ThreadPoolSize", m_threadPoolSize = -1); - declareProperty("WhiteboardSvc", m_whiteboardSvcName = "EventDataSvc"); - -} - -//--------------------------------------------------------------------------- -ParallelSequentialSchedulerSvc::~ParallelSequentialSchedulerSvc(){} -//--------------------------------------------------------------------------- - -StatusCode ParallelSequentialSchedulerSvc::initialize(){ - - // Initialise mother class (read properties, ...) - StatusCode sc(Service::initialize()); - if (!sc.isSuccess()) - warning () << "Base class could not be initialized" << endmsg; - - // Get the algo resource pool - m_algResourcePool = serviceLocator()->service("AlgResourcePool"); - if (!m_algResourcePool.isValid()){ - error() << "Error retrieving AlgResourcePool" << endmsg; - return StatusCode::FAILURE; - } - - // Get the list of algorithms - m_algList = m_useTopAlgList ? m_algResourcePool->getTopAlgList() : m_algResourcePool->getFlatAlgList(); - info() << "Found " << m_algList.size() << " algorithms" << endmsg; - - // Get Whiteboard - m_whiteboard = serviceLocator()->service(m_whiteboardSvcName); - if (!m_whiteboard.isValid()) - fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg; - - // Check the MaxEventsInFlight parameters and react - // Deprecated for the moment - size_t numberOfWBSlots = m_whiteboard->getNumberOfStores(); - - // Set the number of free slots - m_freeSlots=numberOfWBSlots; - - info() << "Allowing " << m_freeSlots << " events in flight" << endmsg; - - if(m_threadPoolSize == -1) - m_threadPoolSize = numberOfWBSlots; - - debug() << "Initialising a TBB thread pool of size " << m_threadPoolSize << endmsg; - m_tbb_sched.reset(new tbb::task_scheduler_init(m_threadPoolSize)); - - // Fill the containers to convert algo names to index - m_algname_index_map.reserve(m_algList.size()); - m_algname_vect.reserve(m_algList.size()); - unsigned int index=0; - for (IAlgorithm* algo : m_algList){ - const std::string& name = algo->name(); - m_algname_index_map[name]=index; - m_algname_vect.emplace_back(name); - index++; - } - - //initialize control flow manager - const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_algResourcePool.get()); - - m_controlFlow.initialize(algPool->getExecutionFlowGraph(), m_algname_index_map); - - const unsigned int algosDependenciesSize=0; - info() << "Algodependecies size is " << algosDependenciesSize << endmsg; - - //get algorithm dependencies - /* Dependencies - 0) Read deps from config file - 1) Look for handles in algo, if none - 2) Assume none are required - */ - if (algosDependenciesSize == 0){ - // Get the event root from the IDataManagerSvc interface of the WhiteBoard - SmartIF<IDataManagerSvc> dataMgrSvc (m_whiteboard); - std::string rootInTESName(dataMgrSvc->rootName()); - if ("" != rootInTESName && '/'!=rootInTESName[rootInTESName.size()-1]){ - rootInTESName = rootInTESName+"/"; - } - - for (IAlgorithm* ialgoPtr : m_algList){ - Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr); - if (nullptr == algoPtr){ - fatal() << "Could not convert IAlgorithm into Algorithm: this will result in a crash." << endmsg; - } - - std::vector<Gaudi::DataHandle*> algoHandles(algoPtr->inputHandles()); - DataObjIDColl algoDependencies; - if (!algoHandles.empty()){ - info() << "Algorithm " << algoPtr->name() << " data dependencies:" << endmsg; - - DataObjIDColl inputObjs, outputObjs; - DHHVisitor avis(inputObjs, outputObjs); - - algoPtr->acceptDHVisitor( &avis ); - - for (auto id : inputObjs) { - const std::string& productName = rootInTESName + id.key(); - info() << " o Input dep for " << productName << endmsg; - algoDependencies.insert(id); - } - - - } else { - info() << "Algorithm " << algoPtr->name() << " has no data dependencies." - << endmsg; - } - - } - } - - m_aess = serviceLocator()->service("AlgExecStateSvc"); - if( !m_aess.isValid() ) { - fatal() << "Error retrieving AlgExecStateSvc" << endmsg; - return StatusCode::FAILURE; - } - - return StatusCode::SUCCESS; - -} -//--------------------------------------------------------------------------- - -StatusCode ParallelSequentialSchedulerSvc::finalize(){ - m_tbb_sched.reset(); - - StatusCode sc(Service::finalize()); - if (!sc.isSuccess()) - warning () << "Base class could not be finalized" << endmsg; - return sc; -} - -/** Make an event available to the scheduler. Immediately the algortihms are - * executed. - */ -StatusCode ParallelSequentialSchedulerSvc::pushNewEvent(EventContext* eventContext){ - std::vector<EventContext*> eventContexts; - eventContexts.push_back(eventContext); - m_aess->reset(*eventContext); - return pushNewEvents(eventContexts); -} - -StatusCode ParallelSequentialSchedulerSvc::pushNewEvents(std::vector<EventContext*>& eventContexts){ - - for(auto evt : eventContexts){ - if(m_freeSlots.load() > 0){ - //only one thread executes scheduler --> m_freeSlots can only grow if other thread finishes - m_freeSlots--; - - debug() << "Enqueuing event " << evt->evt() << " @ " << evt->slot() << endmsg; - - tbb::task* t = new( tbb::task::allocate_root() ) - SequentialTask(serviceLocator(), evt, this, m_algResourcePool, m_aess); - tbb::task::enqueue( *t); - } else { - return StatusCode::FAILURE; - } - } - - return StatusCode::SUCCESS; - -} - -//--------------------------------------------------------------------------- -/** - * Get a finished event or block until one becomes available. - */ -StatusCode ParallelSequentialSchedulerSvc::popFinishedEvent(EventContext*& eventContext){ - - m_finishedEvents.pop(eventContext); - debug() << "Popped slot " << eventContext->slot() << "(event " - << eventContext->evt() << ")" << endmsg; - m_freeSlots++; - return StatusCode::SUCCESS; -} - -//--------------------------------------------------------------------------- -/** - * Try to get a finished event, if not available just return a failure - */ -StatusCode ParallelSequentialSchedulerSvc::tryPopFinishedEvent(EventContext*& eventContext){ - if (m_finishedEvents.try_pop(eventContext)){ - debug() << "Try Pop successful slot " << eventContext->slot() - << "(event " << eventContext->evt() << ")" << endmsg; - m_freeSlots++; - return StatusCode::SUCCESS; - } - return StatusCode::FAILURE; -} - -//--------------------------------------------------------------------------- - -/** Get free slots number. Given that the scheduler is sequential and its - * methods non reentrant, this is always 1. - */ -unsigned int ParallelSequentialSchedulerSvc::freeSlots(){return m_freeSlots;} - -//--------------------------------------------------------------------------- - -tbb::task* SequentialTask::execute() { - - // Get the IProperty interface of the ApplicationMgr to pass it to RetCodeGuard - const SmartIF<IProperty> appmgr(m_serviceLocator); - SmartIF<IMessageSvc> messageSvc (m_serviceLocator); - MsgStream log(messageSvc, "SequentialAlgoExecutionTask"); - log.activate(); - - StatusCode sc; - - //initialize control algorithm states and decisions - AlgsExecutionStates algStates(m_scheduler->m_algList.size(), messageSvc); - const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_scheduler->m_algResourcePool.get()); - std::vector<int> nodeDecisions(algPool->getExecutionFlowGraph()->getControlFlowNodeCounter(), -1); - - m_scheduler->m_controlFlow.updateEventState(algStates, nodeDecisions); - m_scheduler->m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions); - - //initialize data flow manager - //DataFlowManager dataFlow(m_scheduler->m_algosDependencies); - - //intitialize context - // m_eventContext->m_thread_id = pthread_self(); - bool eventFailed = false; - Gaudi::Hive::setCurrentContextId(m_eventContext->slot()); - - // loop while algorithms are controlFlowReady and event has not failed - while(!eventFailed && algStates.algsPresent(AlgsExecutionStates::State::CONTROLREADY) ){ - - //std::cout << "[" << m_eventContext->evt() << "] algorithms left" << std::endl; - - //std::for_each(m_scheduler->m_algList.begin(), m_scheduler->m_algList.end(), - - //[&] (IAlgorithm* ialgorithm) { - for(auto it = algStates.begin(AlgsExecutionStates::State::CONTROLREADY); it != algStates.end(AlgsExecutionStates::State::CONTROLREADY); ++it){ - - uint algIndex = *it; - - std::string algName = m_scheduler->m_algname_vect[algIndex]; - - //promote algorithm to data ready - algStates.updateState(algIndex,AlgsExecutionStates::DATAREADY); - - //std::cout << "Running algorithm [" << algIndex << "] " << ialgorithm->name() << " for event " << m_eventContext->evt() << std::endl; - log << MSG::DEBUG << "Running algorithm [" << algIndex << "] " << algName << " for event " << m_eventContext->evt() << endmsg; - - IAlgorithm* ialgoPtr=nullptr; - sc = m_algPool->acquireAlgorithm(algName,ialgoPtr, true); //blocking call - - if(sc.isFailure() || ialgoPtr == nullptr){ - log << MSG::ERROR << "Could not acquire algorithm " << algName << endmsg; - m_aess->setEventStatus(EventStatus::Other, *m_eventContext); - } else { // we got an algorithm - - //promote algorithm to scheduled - algStates.updateState(algIndex,AlgsExecutionStates::SCHEDULED); - - Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr); // DP: expose the setter of the context? - algoPtr->setContext(m_eventContext); - - // Call the execute() method - try { - RetCodeGuard rcg(appmgr, Gaudi::ReturnCode::UnhandledException); - sc = algoPtr->sysExecute(); - if (UNLIKELY(!sc.isSuccess())) { - log << MSG::WARNING << "Execution of algorithm " << algName << " failed" << endmsg; - eventFailed = true; - } - rcg.ignore(); // disarm the guard - } catch ( const GaudiException& Exception ) { - log << MSG::ERROR << ".executeEvent(): Exception with tag=" << Exception.tag() - << " thrown by " << algName << endmsg; - log << MSG::ERROR << Exception << endmsg; - } catch ( const std::exception& Exception ) { - log << MSG::FATAL << ".executeEvent(): Standard std::exception thrown by " - << algName << endmsg; - log << MSG::ERROR << Exception.what() << endmsg; - } catch(...) { - log << MSG::FATAL << ".executeEvent(): UNKNOWN Exception thrown by " - << algName << endmsg; - } - - if(sc.isFailure()){ - eventFailed = true; - m_aess->algExecState(ialgoPtr,*m_eventContext).setExecStatus(sc); - } - - //std::cout << "Algorithm [" << algIndex << "] " << ialgorithm->name() << " for event " << m_eventContext->evt() - // << (eventFailed ? " failed" : " succeeded") << std::endl; - log << MSG::DEBUG << "Algorithm [" << algIndex << "] " << algName << " for event " << m_eventContext->evt() - << (eventFailed ? " failed" : " succeded") << endmsg; - - AlgsExecutionStates::State state; - if(m_aess->algExecState(ialgoPtr,*m_eventContext).filterPassed()) { - state = AlgsExecutionStates::State::EVTACCEPTED; - } else { - state = AlgsExecutionStates::State::EVTREJECTED; - } - - log << MSG::DEBUG << "Algorithm [" << algIndex << "] " << algName - << " for event " << m_eventContext->evt() - << (m_aess->algExecState(ialgoPtr,*m_eventContext).filterPassed() ? - " passed" : " rejected") - << endmsg; - - sc = m_algPool->releaseAlgorithm(algName,ialgoPtr); - - algStates.updateState(algIndex,state); - - //just for debug: look at products -- not thread safe - // Update the catalog: some new products may be there - /*m_scheduler->m_whiteboard->selectStore(m_eventContext->slot()).ignore(); - - // update prods in the dataflow - // DP: Handles could be used. Just update what the algo wrote - std::vector<std::string> new_products; - m_scheduler->m_whiteboard->getNewDataObjects(new_products).ignore(); - for (const auto& new_product : new_products) - std::cout << "Found in WB: " << new_product << std::endl; - //dataFlow.updateDataObjectsCatalog(new_products);*/ - } - - } - //); - - m_scheduler->m_controlFlow.updateEventState(algStates, nodeDecisions); - m_scheduler->m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions); - - m_aess->updateEventStatus(eventFailed, *m_eventContext); - - if(eventFailed){ - break; - } - - if(!algStates.algsPresent(AlgsExecutionStates::State::CONTROLREADY) && !algStates.allAlgsExecuted()){ - //std::cout << "WARNING: " << " not all algorithms executed for event " << m_eventContext->evt() << std::endl; - - /*std::for_each(m_scheduler->m_algList.begin(), m_scheduler->m_algList.end(), - - [&] (IAlgorithm* ialgorithm) { - uint algIndex = m_scheduler->m_algname_index_map[ialgorithm->name()]; - - if(AlgsExecutionStates::State::SCHEDULED >= algStates.algorithmState(algIndex)) - std::cout << "Event [" << m_eventContext->evt() << "] algorithm " << ialgorithm->name() - << " NOT executed" << std::endl; - - });*/ - } - } - - m_scheduler->m_finishedEvents.push(m_eventContext); - - return nullptr; - -} diff --git a/GaudiHive/src/ParallelSequentialSchedulerSvc.h b/GaudiHive/src/ParallelSequentialSchedulerSvc.h deleted file mode 100644 index 683710ab8..000000000 --- a/GaudiHive/src/ParallelSequentialSchedulerSvc.h +++ /dev/null @@ -1,144 +0,0 @@ -#ifndef GAUDIHIVE_PARALLELSEQUENTIALSCHEDULERSVC_H -#define GAUDIHIVE_PARALLELSEQUENTIALSCHEDULERSVC_H - -// Framework include files -#include "GaudiKernel/IScheduler.h" -#include "GaudiKernel/IRunable.h" -#include "GaudiKernel/Service.h" -#include "GaudiKernel/IAlgResourcePool.h" -#include "GaudiKernel/CommonMessaging.h" -#include "GaudiKernel/EventContext.h" -#include "GaudiKernel/IAlgExecStateSvc.h" - -#include "AlgResourcePool.h" -#include "ExecutionFlowManager.h" -#include "DataFlowManager.h" - -// C++ include files -#include <vector> -#include <string> -#include <unordered_map> -#include <functional> -#include <thread> - -// External libs -#include "tbb/concurrent_queue.h" -#include "tbb/task.h" -#include "tbb/task_scheduler_init.h" - -class SequentialTask; //forward declaration - - -//--------------------------------------------------------------------------- - -/**@class ParallelSequentialSchedulerSvc ParallelSequentialSchedulerSvc.h - * - * This SchedulerSvc implements the IScheduler interface. - * It executes all the algorithms in sequence for several events in flight. - * It pulls the algorithms from the AlgResourcePool - * @author Daniel Funke - * @version 0.1 - */ -class ParallelSequentialSchedulerSvc: public extends<Service, - IScheduler> { -public: - /// Constructor - ParallelSequentialSchedulerSvc( const std::string& name, ISvcLocator* svc ); - - /// Destructor - ~ParallelSequentialSchedulerSvc() override; - - /// Initialise - StatusCode initialize() override; - - /// Finalise - StatusCode finalize() override; - - /// Make an event available to the scheduler - StatusCode pushNewEvent(EventContext* eventContext) override; - - // Make multiple events available to the scheduler - StatusCode pushNewEvents(std::vector<EventContext*>& eventContexts) override; - - /// Blocks until an event is availble - StatusCode popFinishedEvent(EventContext*& eventContext) override; - - /// Try to fetch an event from the scheduler - StatusCode tryPopFinishedEvent(EventContext*& eventContext) override; - - /// Get free slots number - unsigned int freeSlots() override; - - -private: - - /// Decide if the top alglist or its flat version has to be used - bool m_useTopAlgList; - - /// Cache the list of algs to be executed - std::list<IAlgorithm*> m_algList; - - /// Queue of finished events - tbb::concurrent_bounded_queue<EventContext*> m_finishedEvents; - - /// Atomic to account for asyncronous updates by the scheduler wrt the rest - // number of events in flight - std::atomic_int m_freeSlots; - - /// A shortcut to the whiteboard - SmartIF<IHiveWhiteBoard> m_whiteboard; - - /// The whiteboard name - std::string m_whiteboardSvcName; - - /// Cache for the algorithm resource pool - SmartIF<IAlgResourcePool> m_algResourcePool; - - /// Algorithm Execution State manager - SmartIF<IAlgExecStateSvc> m_aess; - - /// Size of the threadpool initialised by TBB; a value of -1 gives TBB the freedom to choose - int m_threadPoolSize; - - //TBB scheduler - std::unique_ptr<tbb::task_scheduler_init> m_tbb_sched; - - //control flow manager - concurrency::ExecutionFlowManager m_controlFlow; - - /// Vector to bookkeep the information necessary to the index2name conversion - std::vector<std::string> m_algname_vect; - - /// Map to bookkeep the information necessary to the name2index conversion - std::unordered_map<std::string,unsigned int> m_algname_index_map; - - // Needed to queue actions on algorithm finishing and decrement algos in flight - friend class SequentialTask; - -}; - -class SequentialTask: public tbb::task { -public: - SequentialTask(ISvcLocator* svcLocator, - EventContext* eventContext, - ParallelSequentialSchedulerSvc* scheduler, - IAlgResourcePool* algPool, - IAlgExecStateSvc* aem): - - m_serviceLocator(svcLocator), - m_eventContext(eventContext), - m_scheduler(scheduler), - m_algPool(algPool), - m_aess(aem) { - - }; - tbb::task* execute() override; -private: - SmartIF<ISvcLocator> m_serviceLocator; - EventContext* m_eventContext; - SmartIF<ParallelSequentialSchedulerSvc> m_scheduler; - SmartIF<IAlgResourcePool> m_algPool; - SmartIF<IAlgExecStateSvc> m_aess; -}; - -#endif // GAUDIHIVE_PARALLELSEQUENTIALSCHEDULERSVC_H diff --git a/GaudiHive/src/RoundRobinSchedulerSvc.cpp b/GaudiHive/src/RoundRobinSchedulerSvc.cpp deleted file mode 100644 index ab0d49d20..000000000 --- a/GaudiHive/src/RoundRobinSchedulerSvc.cpp +++ /dev/null @@ -1,297 +0,0 @@ -// Framework includes -#include "GaudiKernel/SvcFactory.h" -#include "GaudiKernel/IAlgorithm.h" -#include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface -#include "GaudiKernel/IProperty.h" -#include "GaudiKernel/AppReturnCode.h" - -#include "GaudiKernel/ThreadLocalContext.h" - -// C++ -#include <list> -#include <thread> -#include <csignal> - -// Local -#include "RoundRobinSchedulerSvc.h" -#include "AlgResourcePool.h" -#include "RetCodeGuard.h" - -// Instantiation of a static factory class used by clients to create instances of this service -DECLARE_SERVICE_FACTORY(RoundRobinSchedulerSvc) - -//=========================================================================== -// Infrastructure methods - -RoundRobinSchedulerSvc::RoundRobinSchedulerSvc( const std::string& name, ISvcLocator* svcLoc ): -base_class(name,svcLoc){ - declareProperty("UseTopAlgList", m_useTopAlgList=true); - declareProperty("SimultaneousEvents", m_freeSlots=1); -} - -//--------------------------------------------------------------------------- -RoundRobinSchedulerSvc::~RoundRobinSchedulerSvc(){} -//--------------------------------------------------------------------------- - -StatusCode RoundRobinSchedulerSvc::initialize(){ - - // Initialise mother class (read properties, ...) - StatusCode sc(Service::initialize()); - if (!sc.isSuccess()) - warning () << "Base class could not be initialized" << endmsg; - - // Get the algo resource pool - m_algResourcePool = serviceLocator()->service("AlgResourcePool"); - if (!m_algResourcePool.isValid()){ - error() << "Error retrieving AlgResourcePool" << endmsg; - return StatusCode::FAILURE; - } - - // Get the list of algorithms - m_algList = m_useTopAlgList ? m_algResourcePool->getTopAlgList() : m_algResourcePool->getFlatAlgList(); - info() << "Found " << m_algList.size() << " algorithms" << endmsg; - - // Fill the containers to convert algo names to index - m_algname_index_map.reserve(m_algList.size()); - m_algname_vect.reserve(m_algList.size()); - unsigned int index=0; - for (IAlgorithm* algo : m_algList){ - const std::string& name = algo->name(); - m_algname_index_map[name]=index; - m_algname_vect.emplace_back(name); - index++; - } - - //initialize control flow manager - const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_algResourcePool.get()); - - m_controlFlow.initialize(algPool->getExecutionFlowGraph(), m_algname_index_map); - - m_aess = serviceLocator()->service("AlgExecStateSvc"); - if( !m_aess.isValid() ) { - fatal() << "Error retrieving AlgExecStateSvc" << endmsg; - return StatusCode::FAILURE; - } - - return StatusCode::SUCCESS; - - // prepare the event slots - // TODO ! - -} -//--------------------------------------------------------------------------- - -StatusCode RoundRobinSchedulerSvc::finalize(){ - StatusCode sc(Service::finalize()); - if (!sc.isSuccess()) - warning () << "Base class could not be finalized" << endmsg; - return sc; -} - -//--------------------------------------------------------------------------- - -/** Make an event available to the scheduler. Immediately the algortihms are - * executed. - */ -StatusCode RoundRobinSchedulerSvc::pushNewEvent(EventContext* eventContext){ - - // consistency check - if (!m_freeSlots) { - fatal() << "More contexts than slots provided" << m_freeSlots << endmsg; - return StatusCode::FAILURE; - } - - --m_freeSlots; - m_evtCtx_buffer.push_back(eventContext); - m_aess->reset(*eventContext); - - return m_freeSlots > 0 ? StatusCode::SUCCESS : processEvents(); -} - -StatusCode RoundRobinSchedulerSvc::pushNewEvents(std::vector<EventContext*>& eventContexts){ - // consistency check - if (eventContexts.size() > m_freeSlots) { - fatal() << "More contexts than slots provided" << m_freeSlots << endmsg; - return StatusCode::FAILURE; - } - m_freeSlots -= eventContexts.size(); - - m_evtCtx_buffer.insert(m_evtCtx_buffer.end(), eventContexts.begin(), eventContexts.end()); - - return m_freeSlots > 0 ? StatusCode::SUCCESS : processEvents(); -} - -//--------------------------------------------------------------------------- -StatusCode RoundRobinSchedulerSvc::processEvents(){ - StatusCode sc(StatusCode::SUCCESS); - - // Get the IProperty interface of the ApplicationMgr to pass it to RetCodeGuard - const SmartIF<IProperty> appmgr(serviceLocator()); - SmartIF<IMessageSvc> messageSvc (serviceLocator()); - - //initialize control algorithm states and decisions - AlgsExecutionStates algStates(m_algList.size(), messageSvc); - const AlgResourcePool* algPool = dynamic_cast<const AlgResourcePool*>(m_algResourcePool.get()); - std::vector<int> nodeDecisions(algPool->getExecutionFlowGraph()->getControlFlowNodeCounter(), -1); - - - m_controlFlow.updateEventState(algStates, nodeDecisions); - m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions); - - //initialize data flow manager - //DataFlowManager dataFlow(m_scheduler->m_algosDependencies); - - info() << "Got " << m_evtCtx_buffer.size() << " events, starting loop" << endmsg; - - while(algStates.algsPresent(AlgsExecutionStates::State::CONTROLREADY) ){ - - debug() << "algorithms left" << endmsg; - - //std::for_each(algStates.begin(AlgsExecutionStates::State::CONTROLREADY), algStates.end(AlgsExecutionStates::State::CONTROLREADY), - - //[&] (uint algIndex) { - for(auto it = algStates.begin(AlgsExecutionStates::State::CONTROLREADY); it != algStates.end(AlgsExecutionStates::State::CONTROLREADY); ++it){ - - uint algIndex = *it; - - std::string algName = m_algname_vect[algIndex]; - - debug() << "Running algorithm [" << algIndex << "] " << algName << endmsg; - - std::vector<AlgsExecutionStates::State> algResults(m_evtCtx_buffer.size()); - - //promote algorithm to data ready - algStates.updateState(algIndex,AlgsExecutionStates::DATAREADY); - - IAlgorithm* ialgoPtr=nullptr; - m_algResourcePool->acquireAlgorithm(algName, ialgoPtr); - //promote algorithm to scheduled - algStates.updateState(algIndex,AlgsExecutionStates::SCHEDULED); - - Algorithm* algoPtr = dynamic_cast<Algorithm*> (ialgoPtr); // DP: expose the setter of the context? - algoPtr->resetExecuted(); - - for (uint i = 0; i < m_evtCtx_buffer.size(); ++i) { - if (m_aess->eventStatus(*m_evtCtx_buffer[i]) == EventStatus::Success || - m_aess->eventStatus(*m_evtCtx_buffer[i]) == EventStatus::Invalid ) { - bool eventfailed=false; - - // m_evtCtx_buffer[i]->m_thread_id = pthread_self(); - algoPtr->resetExecuted(); - algoPtr->setContext(m_evtCtx_buffer[i]); - Gaudi::Hive::setCurrentContextId(m_evtCtx_buffer[i]->slot()); - - sc = StatusCode::FAILURE; - - // Call the execute() method - try { - RetCodeGuard rcg(appmgr, Gaudi::ReturnCode::UnhandledException); - sc = ialgoPtr->sysExecute(); - if (UNLIKELY(!sc.isSuccess())) { - warning() << "Execution of algorithm " << algName << " failed for event " - << m_evtCtx_buffer[i]->evt() << endmsg; - eventfailed = true; - } - rcg.ignore(); // disarm the guard - } catch ( const GaudiException& Exception ) { - error() << ".executeEvent(): Exception with tag=" << Exception.tag() - << " thrown by " << algName << endmsg; - error() << Exception << endmsg; - eventfailed = true; - } catch ( const std::exception& Exception ) { - fatal() << ".executeEvent(): Standard std::exception thrown by " - << algName << endmsg; - error() << Exception.what() << endmsg; - eventfailed = true; - } catch(...) { - fatal() << ".executeEvent(): UNKNOWN Exception thrown by " - << algName << endmsg; - eventfailed = true; - } - m_aess->algExecState(ialgoPtr, *m_evtCtx_buffer[i]).setExecStatus( sc ); - m_aess->updateEventStatus(eventfailed,*m_evtCtx_buffer[i]); - } - - if (m_aess->algExecState(ialgoPtr,*m_evtCtx_buffer[i]).filterPassed()) { - algResults[i] = AlgsExecutionStates::State::EVTACCEPTED; - } else { - algResults[i] = AlgsExecutionStates::State::EVTREJECTED; - } - - } - - m_algResourcePool->releaseAlgorithm(algName,ialgoPtr); - - AlgsExecutionStates::State result = algResults[0]; - bool unanimous = true; - for(uint i = 1; i < algResults.size(); ++i) - if(result != algResults[i]) - unanimous = false; - - if(unanimous) - algStates.updateState(algIndex,result); - else{ - fatal() << "divergent algorithm execution" << endmsg; - fatal() << "Algorithm results: "; - for(uint i =0; i < algResults.size(); ++i){ - fatal() << i << ": " << (algResults[i] == AlgsExecutionStates::State::EVTACCEPTED ? "A" : "R") << "\t"; - if(algResults[i] == AlgsExecutionStates::State::EVTREJECTED){ - //std::cerr << m_evtCtx_buffer[i]->m_evt_num << std::endl; - } - } - fatal() << endmsg; - - sc = StatusCode::FAILURE; - } - } - //}); - - if(sc.isFailure()) - break; //abort execution of events, something went wrong - - m_controlFlow.updateEventState(algStates, nodeDecisions); - m_controlFlow.promoteToControlReadyState(algStates, nodeDecisions); - } - for (EventContext* eventContext : m_evtCtx_buffer) { - m_finishedEvents.push(eventContext); - } - - m_evtCtx_buffer.clear(); - - return sc; //TODO: define proper return value -} - -//--------------------------------------------------------------------------- -/// Blocks until an event is availble -StatusCode RoundRobinSchedulerSvc::popFinishedEvent(EventContext*& eventContext){ - - if(m_finishedEvents.empty() && !m_evtCtx_buffer.empty()) - processEvents(); - - m_finishedEvents.pop(eventContext); - m_freeSlots++; - debug() << "Popped slot " << eventContext->slot() << "(event " - << eventContext->evt() << ")" << endmsg; - return StatusCode::SUCCESS; -} - -//--------------------------------------------------------------------------- -/// Try to get a finished event, if not available just return a failure -StatusCode RoundRobinSchedulerSvc::tryPopFinishedEvent(EventContext*& eventContext){ - if (m_finishedEvents.try_pop(eventContext)){ - debug() << "Try Pop successful slot " << eventContext->slot() - << "(event " << eventContext->evt() << ")" << endmsg; - m_freeSlots++; - return StatusCode::SUCCESS; - } - return StatusCode::FAILURE; - -} -//--------------------------------------------------------------------------- - -/** Get free slots number. Given that the scheduler is sequential and its - * methods non reentrant, this is always 1. - */ -unsigned int RoundRobinSchedulerSvc::freeSlots(){return m_freeSlots;} - -//--------------------------------------------------------------------------- diff --git a/GaudiHive/src/RoundRobinSchedulerSvc.h b/GaudiHive/src/RoundRobinSchedulerSvc.h deleted file mode 100644 index bf2a86469..000000000 --- a/GaudiHive/src/RoundRobinSchedulerSvc.h +++ /dev/null @@ -1,106 +0,0 @@ -#ifndef GAUDIHIVE_ROUNDROBINSCHEDULERSVC_H -#define GAUDIHIVE_ROUNDROBINSCHEDULERSVC_H - -// Framework include files -#include "GaudiKernel/IScheduler.h" -#include "GaudiKernel/IRunable.h" -#include "GaudiKernel/Service.h" -#include "GaudiKernel/IAlgResourcePool.h" -#include "GaudiKernel/IAlgExecStateSvc.h" - -#include "AlgResourcePool.h" -#include "ExecutionFlowManager.h" -#include "DataFlowManager.h" - - -// C++ include files -#include <vector> -#include <string> -#include <unordered_map> -#include <functional> -#include <thread> - -// External libs -#include "tbb/concurrent_queue.h" - - -//--------------------------------------------------------------------------- - -/**@class RoundRobinSchedulerSvc RoundRobinSchedulerSvc.h - * - * The RoundRobinSchedulerSvc implements the IScheduler interface. - * It deals with multiple events and tries to handle all events - * algorithm type by algorithm type, using one single thread. - * It serves as simple implementation against the concurrent state machine - * and provides a test for instruction cache locality - * - * @author Benedikt Hegner - * @version 1.0 - */ -class RoundRobinSchedulerSvc: public extends<Service, - IScheduler> { -public: - /// Constructor - RoundRobinSchedulerSvc( const std::string& name, ISvcLocator* svc ); - - /// Destructor - ~RoundRobinSchedulerSvc() override; - - /// Initialise - StatusCode initialize() override; - - /// Finalise - StatusCode finalize() override; - - /// Make an event available to the scheduler - StatusCode pushNewEvent(EventContext* eventContext) override; - - // Make multiple events available to the scheduler - StatusCode pushNewEvents(std::vector<EventContext*>& eventContexts) override; - - /// Blocks until an event is availble - StatusCode popFinishedEvent(EventContext*& eventContext) override; - - /// Try to fetch an event from the scheduler - StatusCode tryPopFinishedEvent(EventContext*& eventContext) override; - - /// Get free slots number - unsigned int freeSlots() override; - - -private: - - StatusCode processEvents(); - - /// Decide if the top alglist or its flat version has to be used - bool m_useTopAlgList; - SmartIF<IAlgResourcePool> m_algResourcePool; - - /// Algorithm Execution State manager - SmartIF<IAlgExecStateSvc> m_aess; - - //control flow manager - concurrency::ExecutionFlowManager m_controlFlow; - - /// Vector to bookkeep the information necessary to the index2name conversion - std::vector<std::string> m_algname_vect; - - /// Map to bookkeep the information necessary to the name2index conversion - std::unordered_map<std::string,unsigned int> m_algname_index_map; - - /// Ugly, will disappear when the deps are declared only within the C++ code of the algos. - std::vector<std::vector<std::string>> m_algosDependencies; - - /// Cache the list of algs to be executed - std::list<IAlgorithm*> m_algList; - - /// Queue of finished events - tbb::concurrent_bounded_queue<EventContext*> m_finishedEvents; - - /// The number of free slots (0 or 1) - unsigned int m_freeSlots; - std::vector<EventContext*> m_evtCtx_buffer; - -}; - -#endif // GAUDIHIVE_ROUNDROBINSCHEDULERSVC_H diff --git a/GaudiHive/src/SequentialSchedulerSvc.cpp b/GaudiHive/src/SequentialSchedulerSvc.cpp deleted file mode 100644 index 772efbda8..000000000 --- a/GaudiHive/src/SequentialSchedulerSvc.cpp +++ /dev/null @@ -1,186 +0,0 @@ -// Framework includes -#include "GaudiKernel/SvcFactory.h" -#include "GaudiKernel/IAlgorithm.h" -#include "GaudiKernel/Algorithm.h" // will be IAlgorithm if context getter promoted to interface -#include "GaudiKernel/IProperty.h" -#include "GaudiKernel/AppReturnCode.h" - -// C++ -#include <list> -#include <thread> - -// Local -#include "SequentialSchedulerSvc.h" -#include "AlgResourcePool.h" -#include "RetCodeGuard.h" - -// Instantiation of a static factory class used by clients to create instances of this service -DECLARE_SERVICE_FACTORY(SequentialSchedulerSvc) - -//=========================================================================== -// Infrastructure methods - -SequentialSchedulerSvc::SequentialSchedulerSvc( const std::string& name, ISvcLocator* svcLoc ): - base_class(name,svcLoc), - m_eventContext(nullptr), - m_freeSlots(1){ - declareProperty("UseTopAlgList", m_useTopAlgList=true); -} - -//--------------------------------------------------------------------------- -SequentialSchedulerSvc::~SequentialSchedulerSvc(){} -//--------------------------------------------------------------------------- - -StatusCode SequentialSchedulerSvc::initialize(){ - - // Initialise mother class (read properties, ...) - StatusCode sc(Service::initialize()); - if (!sc.isSuccess()) - warning () << "Base class could not be initialized" << endmsg; - - // Get the algo resource pool - SmartIF<IAlgResourcePool> algResourcePool (serviceLocator()->service("AlgResourcePool")); - if (!algResourcePool.isValid()){ - error() << "Error retrieving AlgResourcePool" << endmsg; - return StatusCode::FAILURE; - } - - m_aess = serviceLocator()->service("AlgExecStateSvc"); - if( !m_aess.isValid() ) { - fatal() << "Error retrieving AlgExecStateSvc" << endmsg; - return StatusCode::FAILURE; - } - - // Get the list of algorithms - m_algList = m_useTopAlgList ? algResourcePool->getTopAlgList() : algResourcePool->getFlatAlgList(); - info() << "Found " << m_algList.size() << " algorithms" << endmsg; - - return StatusCode::SUCCESS; - -} -//--------------------------------------------------------------------------- - -StatusCode SequentialSchedulerSvc::finalize(){ - StatusCode sc(Service::finalize()); - if (!sc.isSuccess()) - warning () << "Base class could not be finalized" << endmsg; - return sc; -} - -//--------------------------------------------------------------------------- - -/** Make an event available to the scheduler. Immediately the algortihms are - * executed. - */ -StatusCode SequentialSchedulerSvc::pushNewEvent(EventContext* eventContext){ - - m_freeSlots--; - - debug() << "[pushNewEvent] Free slots are now: " << m_freeSlots << endmsg; - - // Call the resetExecuted() method of ALL "known" algorithms - // (From the MinimalEventLoopMgr) - SmartIF<IAlgManager> algMan(serviceLocator()); - if (LIKELY(algMan.isValid())) { - for( IAlgorithm* ialg : algMan->getAlgorithms() ) { - if (LIKELY(nullptr != ialg)) ialg->resetExecuted(); - } - } - - m_eventContext= eventContext; - bool eventfailed=false; - - for (IAlgorithm* ialgorithm : m_algList){ - - Algorithm* this_algo = dynamic_cast<Algorithm*>(ialgorithm); - if (!this_algo){ - throw GaudiException ("Cast to Algorithm failed!", - "SequentialSchedulerSvc", - StatusCode::FAILURE); - } - - debug() << "Running algorithm " << this_algo->name() << endmsg; - - // m_eventContext->m_thread_id = pthread_self(); - this_algo->setContext(m_eventContext); - - // Get the IProperty interface of the ApplicationMgr to pass it to RetCodeGuard - const SmartIF<IProperty> appmgr(serviceLocator()); - - // Call the execute() method of all top algorithms - StatusCode sc(StatusCode::FAILURE); - try { - RetCodeGuard rcg(appmgr, Gaudi::ReturnCode::UnhandledException); - sc = ialgorithm->sysExecute(); - if (UNLIKELY(!sc.isSuccess())) { - warning() << "Execution of algorithm " << ialgorithm->name() << " failed" << endmsg; - eventfailed = true; - } - rcg.ignore(); // disarm the guard - } catch ( const GaudiException& Exception ) { - error() << ".executeEvent(): Exception with tag=" << Exception.tag() - << " thrown by " << ialgorithm->name() << endmsg; - error() << Exception << endmsg; - eventfailed = true; - } catch ( const std::exception& Exception ) { - fatal() << ".executeEvent(): Standard std::exception thrown by " - << ialgorithm->name() << endmsg; - error() << Exception.what() << endmsg; - eventfailed = true; - } catch(...) { - fatal() << ".executeEvent(): UNKNOWN Exception thrown by " - << ialgorithm->name() << endmsg; - eventfailed = true; - } - - m_aess->algExecState(ialgorithm, *m_eventContext).setExecStatus( sc ); - - debug() << "Algorithm " << this_algo->name() << (eventfailed ? " failed" : " succeeded") << endmsg; - debug() << "Algorithm " << this_algo->name() << (this_algo->filterPassed() ? " passed" : " rejected") << endmsg; - - // DP it is important to propagate the failure of an event. - // We need to stop execution when this happens so that execute run can - // then receive the FAILURE - m_aess->updateEventStatus(eventfailed, *m_eventContext); - - if (eventfailed) - return StatusCode::FAILURE; - } - return StatusCode::SUCCESS; - -} - -//--------------------------------------------------------------------------- -StatusCode SequentialSchedulerSvc::pushNewEvents(std::vector<EventContext*>& eventContexts){ - StatusCode sc; - for (auto context : eventContexts){ - sc = pushNewEvent(context); - if (sc != StatusCode::SUCCESS) return sc; - } - return sc; -} - -//--------------------------------------------------------------------------- -/// Blocks until an event is availble -StatusCode SequentialSchedulerSvc::popFinishedEvent(EventContext*& eventContext){ - m_freeSlots++; - eventContext = m_eventContext; - debug() << "[popFinishedEvent] Free slots are now: " << m_freeSlots << endmsg; - return StatusCode::SUCCESS; -} - -//--------------------------------------------------------------------------- -/** The scheduler is sequential. Therefore pop and try/pop are factually the - * same. - */ -StatusCode SequentialSchedulerSvc::tryPopFinishedEvent(EventContext*& eventContext){ - return m_freeSlots == 1 ? StatusCode::FAILURE : popFinishedEvent(eventContext); -} -//--------------------------------------------------------------------------- - -/** Get free slots number. Given that the scheduler is sequential and its - * methods non reentrant, this is always 1. - */ -unsigned int SequentialSchedulerSvc::freeSlots(){return m_freeSlots;} - -//--------------------------------------------------------------------------- -- GitLab From de0766fb17bd85f1c26588029abfa56e0fa8ab39 Mon Sep 17 00:00:00 2001 From: Benedikt Hegner <hegner@cern.ch> Date: Wed, 28 Sep 2016 17:38:22 +0200 Subject: [PATCH 3/4] remove HiveEventLoopMgr; fix broken test for tool cloning --- .../ROOT_IO/WriteAndReadHandleWhiteBoard.py | 15 +- GaudiHive/CMakeLists.txt | 4 - GaudiHive/GaudiHive/HiveEventLoopMgr.h | 127 --- GaudiHive/GaudiHive/HiveTestAlgorithm.h | 3 + GaudiHive/options/BrunelScenario.py | 153 ---- GaudiHive/options/CMSSWScenario.py | 86 -- GaudiHive/options/CPUCruncher.opts | 49 -- GaudiHive/options/CPUCruncher.py | 38 - GaudiHive/options/StressHive.py | 46 - GaudiHive/options/testWhiteBoard.py | 17 +- GaudiHive/profiling/plotBacklogPyRoot.py | 2 +- GaudiHive/src/HiveEventLoopMgr.cpp | 831 ------------------ GaudiHive/src/HiveTestAlgorithm.cpp | 26 +- 13 files changed, 41 insertions(+), 1356 deletions(-) delete mode 100644 GaudiHive/GaudiHive/HiveEventLoopMgr.h delete mode 100755 GaudiHive/options/BrunelScenario.py delete mode 100644 GaudiHive/options/CMSSWScenario.py delete mode 100644 GaudiHive/options/CPUCruncher.opts delete mode 100644 GaudiHive/options/CPUCruncher.py delete mode 100644 GaudiHive/options/StressHive.py delete mode 100644 GaudiHive/src/HiveEventLoopMgr.cpp diff --git a/GaudiExamples/options/ROOT_IO/WriteAndReadHandleWhiteBoard.py b/GaudiExamples/options/ROOT_IO/WriteAndReadHandleWhiteBoard.py index bc3d1a390..5c5ec7da9 100644 --- a/GaudiExamples/options/ROOT_IO/WriteAndReadHandleWhiteBoard.py +++ b/GaudiExamples/options/ROOT_IO/WriteAndReadHandleWhiteBoard.py @@ -4,7 +4,8 @@ from Gaudi.Configuration import * from Configurables import Gaudi__RootCnvSvc as RootCnvSvc, GaudiPersistency -from Configurables import WriteHandleAlg, ReadHandleAlg, HiveWhiteBoard, HiveEventLoopMgr +from Configurables import WriteHandleAlg, ReadHandleAlg, HiveWhiteBoard, HiveSlimEventLoopMgr +from Configurables import ForwardSchedulerSvc # Output setup # - DST @@ -56,13 +57,11 @@ algoparallel = 10 whiteboard = HiveWhiteBoard("EventDataSvc", EventSlots = evtslots) -eventloopmgr = HiveEventLoopMgr(MaxEventsParallel = evtslots, - MaxAlgosParallel = algoparallel, - CloneAlgorithms = True, - DumpQueues = True, - NumThreads = algoparallel, - AlgosDependencies = [[],[product_name],[product_name]]) +slimeventloopmgr = HiveSlimEventLoopMgr() +scheduler = ForwardSchedulerSvc(MaxAlgosInFlight = algoparallel, + ThreadPoolSize = algoparallel, + OutputLevel=WARNING) # Application setup app = ApplicationMgr() @@ -80,4 +79,4 @@ app.EvtMax = 50 app.EvtSel = "NONE" # do not use any event input app.HistogramPersistency = "NONE" app.ExtSvc = [whiteboard] -app.EventLoop = eventloopmgr +app.EventLoop = slimeventloopmgr diff --git a/GaudiHive/CMakeLists.txt b/GaudiHive/CMakeLists.txt index 226a0e1f4..8ca66a5e2 100644 --- a/GaudiHive/CMakeLists.txt +++ b/GaudiHive/CMakeLists.txt @@ -27,10 +27,6 @@ gaudi_add_test(WhiteBoard FRAMEWORK options/testWhiteBoard.py TIMEOUT 120) -gaudi_add_test(CPUCruncher - FRAMEWORK options/CPUCruncher.py - TIMEOUT 120) - gaudi_add_test(WriteWhiteBoard FRAMEWORK options/WriteWhiteBoard.py TIMEOUT 120) diff --git a/GaudiHive/GaudiHive/HiveEventLoopMgr.h b/GaudiHive/GaudiHive/HiveEventLoopMgr.h deleted file mode 100644 index aacc19221..000000000 --- a/GaudiHive/GaudiHive/HiveEventLoopMgr.h +++ /dev/null @@ -1,127 +0,0 @@ -#ifndef GAUDIHIVE_HIVEEVENTLOOPMGR_H -#define GAUDIHIVE_HIVEEVENTLOOPMGR_H 1 - -// Framework include files -#include "GaudiKernel/IAlgResourcePool.h" -#include "GaudiKernel/IEvtSelector.h" -#include "GaudiKernel/IHiveWhiteBoard.h" -#include "GaudiKernel/MinimalEventLoopMgr.h" -#include "GaudiKernel/IAlgExecStateSvc.h" - -// std includes -#include <atomic> - -//include boost -#include <boost/dynamic_bitset.hpp> - -// include tbb -#include "tbb/concurrent_vector.h" -#include "tbb/concurrent_queue.h" - -// typedef for the event and algo state -typedef boost::dynamic_bitset<> state_type; - -// Forward declarations -class IIncidentSvc; -class IDataManagerSvc; -class IDataProviderSvc; - -namespace tbb { - class task_scheduler_init; -} - -class HiveEventLoopMgr : public MinimalEventLoopMgr { -public: - -protected: - /// Reference to the Event Data Service's IDataManagerSvc interface - SmartIF<IDataManagerSvc> m_evtDataMgrSvc; - /// Reference to the Event Data Service's IDataProviderSvc interface - SmartIF<IDataProviderSvc> m_evtDataSvc; - /// Reference to the Event Selector - SmartIF<IEvtSelector> m_evtSelector; - /// Event Iterator - IEvtSelector::Context* m_evtSelContext; - /// Event selector - std::string m_evtsel; - /// Reference to the Histogram Data Service - SmartIF<IDataManagerSvc> m_histoDataMgrSvc; - /// Reference to the Histogram Persistency Service - SmartIF<IConversionSvc> m_histoPersSvc; - /// Reference to the Histogram Persistency Service - SmartIF<IHiveWhiteBoard> m_whiteboard; - /// Reference to the Algorithm resource pool - SmartIF<IAlgResourcePool> m_algResourcePool; - /// Name of the Hist Pers type - std::string m_histPersName; - /// Property interface of ApplicationMgr - SmartIF<IProperty> m_appMgrProperty; - /// Algorithm Execution State Mgr - SmartIF<IAlgExecStateSvc> m_aess; - /// Flag to avoid to fire the EnvEvent incident twice in a row - /// (and also not before the first event) - bool m_endEventFired; - /// Flag to disable warning messages when using external input - bool m_warnings; - - // Variables for the concurrency - /// Maximum number of parallel running algorithms - unsigned int m_max_parallel; - /// Pointer to tbb task scheduler - tbb::task_scheduler_init* m_tbb_scheduler_init; - /// Get the input and output collections - void find_dependencies(); - /// The termination requirement - state_type m_termination_requirement; - /// All requirements - std::vector<state_type> m_all_requirements; - /// Register of input products - std::map<DataObjID,unsigned int> m_product_indices; - /// Total number of algos in flight across all events - std::atomic_uint m_total_algos_in_flight; - /// Total number of algos - unsigned int m_numberOfAlgos; - /// Dump the algorithm queues - bool m_DumpQueues; - /// Number of events in parallel - unsigned int m_evts_parallel; - /// Total numbers of threads - unsigned int m_num_threads; - /// Clone algorithms to run them simultaneously - bool m_CloneAlgorithms; - /// Algorithms Inputs - // keep room for a class hashing strings instead of strings - typedef std::vector<std::vector<std::string>> algosDependenciesCollection; - // We just need the dependencies and not the algo names. - algosDependenciesCollection m_AlgosDependencies; - // Number of products to deal with - unsigned int m_nProducts; - -public: - /// Standard Constructor - HiveEventLoopMgr(const std::string& nam, ISvcLocator* svcLoc); - /// Standard Destructor - ~HiveEventLoopMgr() override; - /// Create event address using event selector - StatusCode getEventRoot(IOpaqueAddress*& refpAddr); - - /// implementation of IService::initialize - StatusCode initialize() override; - /// implementation of IService::reinitialize - StatusCode reinitialize() override; - /// implementation of IService::stop - StatusCode stop() override; - /// implementation of IService::finalize - StatusCode finalize() override; - /// implementation of IService::nextEvent - StatusCode nextEvent(int maxevt) override; - /// implementation of IEventProcessor::executeEvent(void* par) - StatusCode executeEvent(void* par) override; - /// implementation of IEventProcessor::executeRun() - StatusCode executeRun(int maxevt) override; - - /// Decrement the number of algos in flight and put algo back in manager - maybe private - void taskFinished(IAlgorithm*& algo); - -}; -#endif // GAUDIHIVE_HIVEEVENTLOOPMGR_H diff --git a/GaudiHive/GaudiHive/HiveTestAlgorithm.h b/GaudiHive/GaudiHive/HiveTestAlgorithm.h index a2f24fde0..3a48a541f 100644 --- a/GaudiHive/GaudiHive/HiveTestAlgorithm.h +++ b/GaudiHive/GaudiHive/HiveTestAlgorithm.h @@ -47,5 +47,8 @@ class GAUDI_API HiveTestAlgorithm: public GaudiAlgorithm { std::vector<std::string> m_inputs; std::vector<std::string> m_outputs; + + std::vector<DataObjectHandle<DataObject> *> m_inputHandles; + std::vector<DataObjectHandle<DataObject> *> m_outputHandles; }; diff --git a/GaudiHive/options/BrunelScenario.py b/GaudiHive/options/BrunelScenario.py deleted file mode 100755 index 392eb9673..000000000 --- a/GaudiHive/options/BrunelScenario.py +++ /dev/null @@ -1,153 +0,0 @@ -from Gaudi.Configuration import * -# ============================================================================ -from Configurables import GaudiExamplesCommonConf, CPUCruncher,HiveEventLoopMgr, HiveWhiteBoard - -import json - -#GaudiExamplesCommonConf() -# ============================================================================ - -#------------------------------------------------------------------------------- -# Metaconfig - -NUMBEROFEVENTS = 100 -NUMBEROFEVENTSINFLIGHT = 10 -NUMBEROFALGOSINFLIGHT = 100 -NUMBEROFTHREADS = 10 -CLONEALGOS = True -DUMPQUEUES = False -SCALE = .1 -VERBOSITY = 5 - - -NumberOfEvents = NUMBEROFEVENTS -NumberOfEventsInFlight = NUMBEROFEVENTSINFLIGHT -NumberOfAlgosInFlight = NUMBEROFALGOSINFLIGHT -NumberOfThreads = NUMBEROFTHREADS -CloneAlgos = CLONEALGOS -DumpQueues = DUMPQUEUES -Scale = SCALE -Verbosity = VERBOSITY - - - -#------------------------------------------------------------------------------- - - -def load_brunel_scenario(filename): - algs = {} - timing = {} - objs = [] - curr = None - order = 0 - nodes = ('/Event', '/Event/Rec', '/Event/DAQ') - for l in open(filename).readlines(): - if l.find('StoreTracer') == 0: - if l.find('Executing Algorithm') != -1: - alg = l.split()[-1] - if alg not in algs.keys() : algs[alg] = (order, set(),set()) - curr = alg - order += 1 - elif l.find('Done with Algorithm') != -1: - curr = None - elif l.find('[EventDataSvc]') != -1 and curr: - obj = l.split()[-1] - if obj in nodes : continue - if obj.find('/Event/') == 0 : obj = obj[7:] - obj = obj.replace('/','_') - if obj not in objs : objs.append(obj) - talg = algs[curr] - if l.find('RETRIEVE') != -1: - if obj not in talg[1] : talg[1].add(obj) - elif l.find('REGOBJ') != -1: - if obj not in talg[2] : talg[2].add(obj) - if l.find("TimingAuditor") != -1: - algo = l.split()[2]#.rstrip("|") - index = 13 - if algo.endswith("|"): - index = 12 - algo = algo.rstrip("|") - if algo in algs.keys(): - timing[algo] = l.split()[index] - else: - for name in algs.keys(): - if name.startswith(algo): - timing[name] = l.split()[index] - - all_inputs = set() - all_outputs = set() - all_algos = [] - all_algos_inputs = [] - - sTiming = json.dumps(timing) - - f = open("algTimings.json", 'w') - print >> f, sTiming - - #Scale all algo timings if needed - if Scale!=-1: - for alg in timing.keys(): - old_timing = float(timing[alg]) - new_timing = old_timing*Scale; - #print "Algorithm %s: %f --> %f" %(alg, old_timing, new_timing) - timing[alg]=new_timing - - for i, (alg,deps) in enumerate(algs.items()): - if alg in ["PatPVOffline","PrsADCs"]: continue - if deps[1] or deps[2] : - inputs = [] - inputs = [item for item in deps[1] if item not in ("DAQ_ODIN","DAQ_RawEvent") and item not in deps[2]] - outputs = [item for item in deps[2]] - new_algo = CPUCruncher(alg, - avgRuntime=float(timing[alg]), - DataInputs=inputs, - DataOutputs=outputs, - OutputLevel = 6 - ) - for item in deps[1]: - all_inputs.add(item) - for item in deps[2]: - all_outputs.add(item) - all_algos.append(new_algo) - all_algos_inputs.append(inputs) - #look for the objects that haven't been provided within the job. Assume this needs to come via input - new_algo = CPUCruncher("input", - avgRuntime=1, - DataInputs=[], - DataOutputs=[item for item in all_inputs.difference(all_outputs)], - OutputLevel = 6 - ) - all_algos.append(new_algo) - all_algos_inputs.append([]) - return all_algos,all_algos_inputs - - -# Set output level threshold 2=DEBUG, 3=INFO, 4=WARNING, 5=ERROR, 6=FATAL ) -ms = MessageSvc() -ms.OutputLevel = Verbosity - -crunchers,inputs = load_brunel_scenario("Brunel.TES.trace.log") - -whiteboard = HiveWhiteBoard("EventDataSvc", - EventSlots = NumberOfEventsInFlight) - - -# Setup the Event Loop Manager -evtloop = HiveEventLoopMgr() -evtloop.MaxAlgosParallel = NumberOfAlgosInFlight -evtloop.MaxEventsParallel = NumberOfEventsInFlight -evtloop.NumThreads = NumberOfThreads -evtloop.CloneAlgorithms = CloneAlgos -evtloop.DumpQueues = DumpQueues -evtloop.AlgosDependencies = inputs - -# And the Application Manager -app = ApplicationMgr() -app.TopAlg = crunchers -app.EvtSel = "NONE" # do not use any event input -app.EvtMax = NumberOfEvents -app.EventLoop = evtloop -app.ExtSvc =[whiteboard] -#app.MessageSvcType = "TBBMessageSvc" - - diff --git a/GaudiHive/options/CMSSWScenario.py b/GaudiHive/options/CMSSWScenario.py deleted file mode 100644 index dc404e9b1..000000000 --- a/GaudiHive/options/CMSSWScenario.py +++ /dev/null @@ -1,86 +0,0 @@ -import json -from Gaudi.Configuration import * -# ============================================================================ -from Configurables import GaudiExamplesCommonConf, CPUCruncher,HiveEventLoopMgr -#GaudiExamplesCommonConf() -# ============================================================================ - -#------------------------------------------------------------------------------- -# Metaconfig - -NUMBEROFEVENTS = 1 -NUMBEROFEVENTSINFLIGHT = 1 -NUMBEROFALGOSINFLIGHT = 1000 -NUMBEROFTHREADS = 1 -CLONEALGOS = False -DUMPQUEUES = False -VERBOSITY = 3 - - -NumberOfEvents = NUMBEROFEVENTS -NumberOfEventsInFlight = NUMBEROFEVENTSINFLIGHT -NumberOfAlgosInFlight = NUMBEROFALGOSINFLIGHT -NumberOfThreads = NUMBEROFTHREADS -CloneAlgos = CLONEALGOS -DumpQueues = DUMPQUEUES -Verbosity = VERBOSITY - - -def load_CMSSW_scenario(filename): - data = open(filename).read() - workflow = eval(data) - cpu_cruncher_algos = [] - cpu_cruncher_algos_inputs = [] - all_outputs = set() - all_inputs = set() - for algo in workflow["process"]["producers"]: - # in the next two lines replace the slash as that is a reserved character in the data store - inputs = [ item["label"].replace("/","_") for item in algo["toGet"] ] - outputs = [algo["@label"].replace("/","_"),] - new_algo = CPUCruncher(algo["@label"], - avgRuntime=float(algo["eventTimes"][0]), - DataInputs = inputs, - DataOutputs = outputs - ) - - cpu_cruncher_algos.append(new_algo) - all_outputs.update(outputs) - all_inputs.update(inputs) - cpu_cruncher_algos_inputs.append(inputs) - - #look for the objects that haven't been provided within the job. Assume this needs to come via input - new_algo = CPUCruncher("input", - avgRuntime=1, - DataInputs=[], - DataOutputs=[item for item in all_inputs.difference(all_outputs)] - ) - cpu_cruncher_algos.append(new_algo) - cpu_cruncher_algos_inputs.append([]) - - print [item for item in all_inputs.difference(all_outputs)] - return cpu_cruncher_algos,cpu_cruncher_algos_inputs - -# Set output level threshold 2=DEBUG, 3=INFO, 4=WARNING, 5=ERROR, 6=FATAL ) -ms = MessageSvc() -ms.OutputLevel = Verbosity - -crunchers,inputs = load_CMSSW_scenario("CMS_multijet.json") - -# Setup the Event Loop Manager -evtloop = HiveEventLoopMgr() -evtloop.MaxAlgosParallel = NumberOfAlgosInFlight -evtloop.MaxEventsParallel = NumberOfEventsInFlight -evtloop.NumThreads = NumberOfThreads -evtloop.CloneAlgorithms = CloneAlgos -evtloop.DumpQueues = DumpQueues -evtloop.AlgosDependencies = inputs - -# And the Application Manager -app = ApplicationMgr() -app.TopAlg = crunchers -app.EvtSel = "NONE" # do not use any event input -app.EvtMax = NumberOfEvents -app.EventLoop = evtloop -#app.MessageSvcType = "TBBMessageSvc" - - diff --git a/GaudiHive/options/CPUCruncher.opts b/GaudiHive/options/CPUCruncher.opts deleted file mode 100644 index 728ea7270..000000000 --- a/GaudiHive/options/CPUCruncher.opts +++ /dev/null @@ -1,49 +0,0 @@ -//////////////////////////////////////////////////////////////// -// Example options file. -//////////////////////////////////////////////////////////////// - -AuditorSvc.Auditors = { "ChronoAuditor" }; - -// Set up the transient data store -EventDataSvc.ForceLeaves = true; -EventDataSvc.RootCLID = 1; - -//-------------------------------------------------------------- -// Private Application Configuration options -//-------------------------------------------------------------- - -// Set output level threshold 2=DEBUG, 3=INFO, 4=WARNING, 5=ERROR, 6=FATAL ) -MessageSvc.OutputLevel = 3; - -ApplicationMgr.TopAlg += { "CPUCruncher/Test1", - "CPUCruncher/Test2" }; - -Test1.varRuntime=.5; -Test1.avgRuntime=1; - -Test2.varRuntime=.1; -Test2.avgRuntime=.1; - -Test1.Inputs={}; -Test1.Outputs={"collA","collB"}; - -Test2.Inputs={"collA"}; -Test2.Outputs={"collC"}; - - - -//-------------------------------------------------------------- -// Event related parameters -//-------------------------------------------------------------- -ApplicationMgr.EvtMax = 15; // events to be processed (default is 10) -ApplicationMgr.EvtSel = "NONE"; // do not use any event input -ApplicationMgr.HistogramPersistency = "NONE"; -ApplicationMgr.StatusCodeCheck = false ; -ApplicationMgr.EventLoop = "HiveEventLoopMgr"; - -HiveEventLoopMgr.MaxAlgosParallel = 1; -HiveEventLoopMgr.MaxEventsParallel = 1; -HiveEventLoopMgr.NumThreads = 2; -HiveEventLoopMgr.CloneAlgorithms = true; -HiveEventLoopMgr.DumpQueues = false; - diff --git a/GaudiHive/options/CPUCruncher.py b/GaudiHive/options/CPUCruncher.py deleted file mode 100644 index f74722037..000000000 --- a/GaudiHive/options/CPUCruncher.py +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/env gaudirun.py - -from Gaudi.Configuration import * -from Configurables import HiveWhiteBoard, HiveEventLoopMgr, CPUCruncher - -evtslots = 10 - -whiteboard = HiveWhiteBoard("EventDataSvc", - EventSlots = evtslots) - -eventloopmgr = HiveEventLoopMgr(MaxEventsParallel = evtslots, - MaxAlgosParallel = 20, - NumThreads = 8, - AlgosDependencies = [[],['a1'],['a1'],['a2','a3']]) - -a1 = CPUCruncher("A1", - varRuntime=.1, - avgRuntime=.5, - shortCalib = True ) -a1.outKeys = ['/Event/a1'] - -a2 = CPUCruncher("A2") -a2.inpKeys = ['/Event/a1'] -a2.outKeys = ['/Event/a2'] - -a3 = CPUCruncher("A3") -a3.inpKeys = ['/Event/a1'] -a3.outKeys = ['/Event/a3'] - -a4 = CPUCruncher("A4") -a4.inpKeys = ['/Event/a2','/Event/a3'] -a4.outKeys = ['/Event/a4'] - -ApplicationMgr( EvtMax = 10, - EvtSel = 'NONE', - ExtSvc =[whiteboard], - EventLoop = eventloopmgr, - TopAlg = [a1,a2,a3,a4] ) diff --git a/GaudiHive/options/StressHive.py b/GaudiHive/options/StressHive.py deleted file mode 100644 index ecd9b5db3..000000000 --- a/GaudiHive/options/StressHive.py +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/env gaudirun.py - -from Gaudi.Configuration import * -# ============================================================================ -from Configurables import GaudiExamplesCommonConf, CPUCruncher,HiveEventLoopMgr_v2 -#GaudiExamplesCommonConf() -# ============================================================================ - -number_of_algos = 20 -number_of_threads = 10 -mean_time = 1 - -# ============================================================================ -# Create number_of_algos independent algos and run them -import random -auto_crunchers=[] -for i in xrange(number_of_algos): - auto_cruncher = CPUCruncher("cruncher_%s"%i, - avgRuntime=random.uniform(mean_time+.1,mean_time+.5), - DataInputs=[], - DataOutputs=["coll_%s"%i]) - auto_crunchers.append(auto_cruncher) - -# ============================================================================ - -# Setup the Event Loop Manager -evtloop = HiveEventLoopMgr_v2() -evtloop.MaxEventsParallel = number_of_threads -evtloop.NumThreads = number_of_threads -evtloop.CloneAlgorithms = True -evtloop.DumpQueues = False - -# And the Application Manager - -app = ApplicationMgr() -app.TopAlg = auto_crunchers -app.EvtSel = "NONE" # do not use any event input -app.EvtMax = 10 -app.EventLoop = evtloop; -app.MessageSvcType = "TBBMessageSvc"; - - - -# ============================================================================ -# The END -# ============================================================================ diff --git a/GaudiHive/options/testWhiteBoard.py b/GaudiHive/options/testWhiteBoard.py index 0090680fd..45f905b7a 100644 --- a/GaudiHive/options/testWhiteBoard.py +++ b/GaudiHive/options/testWhiteBoard.py @@ -1,14 +1,16 @@ from Gaudi.Configuration import * -from Configurables import HiveWhiteBoard, HiveEventLoopMgr, HiveTestAlgorithm +from Configurables import HiveWhiteBoard, HiveSlimEventLoopMgr, HiveTestAlgorithm, ForwardSchedulerSvc evtslots = 10 whiteboard = HiveWhiteBoard("EventDataSvc", EventSlots = evtslots) -eventloopmgr = HiveEventLoopMgr(MaxEventsParallel = evtslots, - MaxAlgosParallel = 20, - NumThreads = 8, - AlgosDependencies = [[],['a1'],['a1'],['a2','a3']]) + +slimeventloopmgr = HiveSlimEventLoopMgr(OutputLevel=DEBUG) + +scheduler = ForwardSchedulerSvc(MaxAlgosInFlight = 20, + ThreadPoolSize = 8, + OutputLevel=WARNING) a1 = HiveTestAlgorithm("A1", Output = ['/Event/a1']) a2 = HiveTestAlgorithm("A2", Input = ['/Event/a1'], @@ -21,5 +23,6 @@ a4 = HiveTestAlgorithm("A4", Input = ['/Event/a2','/Event/a3'], ApplicationMgr( EvtMax = 100, EvtSel = 'NONE', ExtSvc =[whiteboard], - EventLoop = eventloopmgr, - TopAlg = [a1,a2,a3,a4] ) + EventLoop = slimeventloopmgr, + TopAlg = [a1, a2, a3, a4], + MessageSvcType="InertMessageSvc") diff --git a/GaudiHive/profiling/plotBacklogPyRoot.py b/GaudiHive/profiling/plotBacklogPyRoot.py index ab616b5af..9ca9b1169 100644 --- a/GaudiHive/profiling/plotBacklogPyRoot.py +++ b/GaudiHive/profiling/plotBacklogPyRoot.py @@ -27,7 +27,7 @@ LegendDrawOpts="lp" def parseLog(logfilename): # a line looks like - #"HiveEventLoopMgr SUCCESS Event backlog (max= 3, min= 0 ) = 3" + #"HiveSlimEventLoopMgr SUCCESS Event backlog (max= 3, min= 0 ) = 3" global NEventsInFlight global NThreads ifile = open(logfilename,"r") diff --git a/GaudiHive/src/HiveEventLoopMgr.cpp b/GaudiHive/src/HiveEventLoopMgr.cpp deleted file mode 100644 index b2baede31..000000000 --- a/GaudiHive/src/HiveEventLoopMgr.cpp +++ /dev/null @@ -1,831 +0,0 @@ -#include <algorithm> -#include <tuple> - -#include "GaudiKernel/SmartIF.h" -#include "GaudiKernel/Incident.h" -#include "GaudiKernel/MsgStream.h" -#include "GaudiKernel/SvcFactory.h" -#include "GaudiKernel/DataObject.h" -#include "GaudiKernel/IAlgManager.h" -#include "GaudiKernel/IIncidentSvc.h" -#include "GaudiKernel/IEvtSelector.h" -#include "GaudiKernel/IDataManagerSvc.h" -#include "GaudiKernel/IDataProviderSvc.h" -#include "GaudiKernel/IConversionSvc.h" -#include "GaudiKernel/AppReturnCode.h" -#include "GaudiKernel/DataSvc.h" -#include "GaudiKernel/IAlgExecStateSvc.h" - -#include "HistogramAgent.h" - -// For concurrency -#include "GaudiHive/HiveEventLoopMgr.h" -#include "GaudiHive/EventSchedulingState.h" - -#include "tbb/task_scheduler_init.h" -#include "tbb/task.h" -#include "tbb/tick_count.h" - -#include "GaudiKernel/EventContext.h" -#include "GaudiKernel/Algorithm.h" - -#include "GaudiKernel/GaudiException.h" - -#include <pthread.h> // only for the tID! - - -#include <sys/resource.h> -#include <sys/times.h> - -// Instantiation of a static factory class used by clients to create instances of this service -DECLARE_SERVICE_FACTORY(HiveEventLoopMgr) - - -#define ON_DEBUG if (UNLIKELY(outputLevel() <= MSG::DEBUG)) -#define ON_VERBOSE if (UNLIKELY(outputLevel() <= MSG::VERBOSE)) - -#define DEBMSG ON_DEBUG debug() -#define VERMSG ON_VERBOSE verbose() - -///////////////////////////////////////////////// -/// *dirty* place for adding an AlgoTask wrapper -class HiveAlgoTask : public tbb::task { -public: - HiveAlgoTask(IAlgorithm* algorithm, - EventSchedulingState* scheduler, - HiveEventLoopMgr* eventloopmanager, - EventContext* ctx, - IAlgExecStateSvc* aem): m_algorithm(algorithm), - m_scheduler(scheduler), - m_eventloopmanager(eventloopmanager), - m_ctx(ctx), - m_aess(aem){}; - tbb::task* execute(); - IAlgorithm* m_algorithm; - EventSchedulingState* m_scheduler; - HiveEventLoopMgr* m_eventloopmanager; - EventContext* m_ctx; - IAlgExecStateSvc* m_aess; -}; - -tbb::task* HiveAlgoTask::execute() { - // Algorithm* this_algo = dynamic_cast<Algorithm*>(m_algorithm); - // this_algo->getContext()->m_thread_id = pthread_self(); - m_algorithm->setContext(m_ctx); - bool eventfailed(false); - StatusCode sc (StatusCode::FAILURE); - - try { - sc = m_algorithm->sysExecute(); - } catch ( ... ) { - eventfailed = true; - } - - if (sc.isFailure()) { - eventfailed = true; - } - - m_aess->algExecState(m_algorithm, *m_ctx).setExecuted(true); - m_aess->algExecState(m_algorithm, *m_ctx).setExecStatus(sc); - m_aess->updateEventStatus(eventfailed, *m_ctx); - - m_scheduler->algoFinished(); - // put back the algo into the hive algorithm manager - m_eventloopmanager->taskFinished(m_algorithm); // TODO do this with index: put index in context? - return NULL; -} - -///////////////////////////////////////////////// - -//-------------------------------------------------------------------------------------------- -// Standard Constructor -//-------------------------------------------------------------------------------------------- -HiveEventLoopMgr::HiveEventLoopMgr(const std::string& nam, ISvcLocator* svcLoc) - : MinimalEventLoopMgr(nam, svcLoc) -{ - m_histoDataMgrSvc = 0; - m_histoPersSvc = 0; - m_evtDataMgrSvc = 0; - m_evtDataSvc = 0; - m_evtSelector = 0; - m_evtSelContext = 0; - m_algResourcePool = 0; - m_endEventFired = true; - m_total_algos_in_flight=0; - m_max_parallel = 1; - m_evts_parallel = 1; - m_num_threads = 1; - m_DumpQueues = true; - m_nProducts = 0; - - // Declare properties - declareProperty("HistogramPersistency", m_histPersName = ""); - declareProperty("EvtSel", m_evtsel ); - declareProperty("Warnings",m_warnings=true, - "Set this property to false to suppress warning messages"); - declareProperty("MaxAlgosParallel", m_max_parallel ); - declareProperty("MaxEventsParallel", m_evts_parallel); - declareProperty("NumThreads", m_num_threads); - declareProperty("DumpQueues", m_DumpQueues= false); - declareProperty("CloneAlgorithms", m_CloneAlgorithms= false); - declareProperty("AlgosDependencies", m_AlgosDependencies); -} - -//-------------------------------------------------------------------------------------------- -// Standard Destructor -//-------------------------------------------------------------------------------------------- -HiveEventLoopMgr::~HiveEventLoopMgr() { - if( m_histoDataMgrSvc ) m_histoDataMgrSvc->release(); - if( m_histoPersSvc ) m_histoPersSvc->release(); - if( m_evtDataMgrSvc ) m_evtDataMgrSvc->release(); - if( m_evtDataSvc ) m_evtDataSvc->release(); - if( m_evtSelector ) m_evtSelector->release(); - if( m_evtSelContext ) delete m_evtSelContext; -} - -//-------------------------------------------------------------------------------------------- -// implementation of IAppMgrUI::initialize -//-------------------------------------------------------------------------------------------- -StatusCode HiveEventLoopMgr::initialize() { - // Initialize the base class - StatusCode sc = MinimalEventLoopMgr::initialize(); - if( !sc.isSuccess() ) { - DEBMSG << "Error Initializing base class MinimalEventLoopMgr." << endmsg; - return sc; - } - - find_dependencies(); - - // Setup access to event data services - m_evtDataMgrSvc = serviceLocator()->service("EventDataSvc"); - if( !m_evtDataMgrSvc.isValid() ) { - fatal() << "Error retrieving EventDataSvc interface IDataManagerSvc." << endmsg; - return StatusCode::FAILURE; - } - m_evtDataSvc = serviceLocator()->service("EventDataSvc"); - if( !m_evtDataSvc.isValid() ) { - fatal() << "Error retrieving EventDataSvc interface IDataProviderSvc." << endmsg; - return StatusCode::FAILURE; - } - m_whiteboard = serviceLocator()->service("EventDataSvc"); - if( !m_whiteboard.isValid() ) { - fatal() << "Error retrieving EventDataSvc interface IHiveWhiteBoard." << endmsg; - return StatusCode::FAILURE; - } - m_whiteboard->setNumberOfStores(m_evts_parallel).ignore(); - - // Obtain the IProperty of the ApplicationMgr - m_appMgrProperty = serviceLocator(); - if ( ! m_appMgrProperty.isValid() ) { - fatal() << "IProperty interface not found in ApplicationMgr." << endmsg; - return StatusCode::FAILURE; - } - - // We do not expect a Event Selector necessarily being declared - setProperty(m_appMgrProperty->getProperty("EvtSel")).ignore(); - - if( m_evtsel != "NONE" || m_evtsel.length() == 0) { - m_evtSelector = serviceLocator()->service("EventSelector"); - if( m_evtSelector.isValid() ) { - // Setup Event Selector - sc=m_evtSelector->createContext(m_evtSelContext); - if( !sc.isSuccess() ) { - fatal() << "Can not create the event selector Context." << endmsg; - return sc; - } - } - else { - fatal() << "EventSelector not found." << endmsg; - return sc; - } - } - else { - m_evtSelector = 0; - m_evtSelContext = 0; - if ( m_warnings ) { - warning() << "Unable to locate service \"EventSelector\" " << endmsg; - warning() << "No events will be processed from external input." << endmsg; - } - } - - // Setup access to histogramming services - m_histoDataMgrSvc = serviceLocator()->service("HistogramDataSvc"); - if( !m_histoDataMgrSvc.isValid() ) { - fatal() << "Error retrieving HistogramDataSvc." << endmsg; - return sc; - } - // Setup histogram persistency - m_histoPersSvc = serviceLocator()->service("HistogramPersistencySvc"); - if( !m_histoPersSvc.isValid() ) { - warning() << "Histograms cannot not be saved - though required." << endmsg; - return sc; - } - - // Setup algorithm resource pool - m_algResourcePool = serviceLocator()->service("AlgResourcePool"); - if( !m_algResourcePool.isValid() ) { - fatal() << "Error retrieving AlgResourcePool" << endmsg; - return StatusCode::FAILURE; - } - - m_aess = serviceLocator()->service("AlgExecStateSvc"); - if( !m_aess.isValid() ) { - fatal() << "Error retrieving AlgExecStateSvc" << endmsg; - return StatusCode::FAILURE; - } - - // Setup tbb task scheduler - // TODO: shouldn't be in this case - // One more for the current thread - m_tbb_scheduler_init = new tbb::task_scheduler_init(m_num_threads+1); - - return StatusCode::SUCCESS; -} -//-------------------------------------------------------------------------------------------- -// implementation of IService::reinitialize -//-------------------------------------------------------------------------------------------- -StatusCode HiveEventLoopMgr::reinitialize() { - - // Initialize the base class - StatusCode sc = MinimalEventLoopMgr::reinitialize(); - if( !sc.isSuccess() ) { - DEBMSG << "Error Initializing base class MinimalEventLoopMgr." << endmsg; - return sc; - } - - // Check to see whether a new Event Selector has been specified - setProperty(m_appMgrProperty->getProperty("EvtSel")); - if( m_evtsel != "NONE" || m_evtsel.length() == 0) { - SmartIF<IService> theSvc(serviceLocator()->service("EventSelector")); - SmartIF<IEvtSelector> theEvtSel(theSvc); - if( theEvtSel.isValid() && ( theEvtSel.get() != m_evtSelector.get() ) ) { - // Setup Event Selector - if ( m_evtSelector.get() && m_evtSelContext ) { - // Need to release context before switching to new event selector - m_evtSelector->releaseContext(m_evtSelContext); - m_evtSelContext = 0; - } - m_evtSelector = theEvtSel; - if (theSvc->FSMState() == Gaudi::StateMachine::INITIALIZED) { - sc = theSvc->reinitialize(); - if( !sc.isSuccess() ) { - error() << "Failure Reinitializing EventSelector " - << theSvc->name( ) << endmsg; - return sc; - } - } - else { - sc = theSvc->sysInitialize(); - if( !sc.isSuccess() ) { - error() << "Failure Initializing EventSelector " - << theSvc->name( ) << endmsg; - return sc; - } - } - sc = m_evtSelector->createContext(m_evtSelContext); - if( !sc.isSuccess() ) { - error() << "Can not create Context " << theSvc->name( ) << endmsg; - return sc; - } - info() << "EventSelector service changed to " - << theSvc->name( ) << endmsg; - } - else if ( m_evtSelector.isValid() ) { - if ( m_evtSelContext ) { - m_evtSelector->releaseContext(m_evtSelContext); - m_evtSelContext = 0; - } - sc = m_evtSelector->createContext(m_evtSelContext); - if( !sc.isSuccess() ) { - error() << "Can not create Context " << theSvc->name( ) << endmsg; - return sc; - } - } - } - else if ( m_evtSelector.isValid() && m_evtSelContext ) { - m_evtSelector->releaseContext(m_evtSelContext); - m_evtSelector = 0; - m_evtSelContext = 0; - } - return StatusCode::SUCCESS; -} - - -//-------------------------------------------------------------------------------------------- -// implementation of IService::stop -//-------------------------------------------------------------------------------------------- -StatusCode HiveEventLoopMgr::stop() { - if ( ! m_endEventFired ) { - // Fire pending EndEvent incident - m_incidentSvc->fireIncident(Incident(name(),IncidentType::EndEvent)); - m_endEventFired = true; - } - return MinimalEventLoopMgr::stop(); -} - -//-------------------------------------------------------------------------------------------- -// implementation of IAppMgrUI::finalize -//-------------------------------------------------------------------------------------------- -StatusCode HiveEventLoopMgr::finalize() { - StatusCode sc; - - // Finalize base class - sc = MinimalEventLoopMgr::finalize(); - if (! sc.isSuccess()) { - error() << "Error finalizing base class" << endmsg; - return sc; - } - - // Save Histograms Now - if ( m_histoPersSvc != 0 ) { - HistogramAgent agent; - sc = m_histoDataMgrSvc->traverseTree( &agent ); - if( sc.isSuccess() ) { - IDataSelector* objects = agent.selectedObjects(); - // skip /stat entry! - if ( objects->size() > 0 ) { - IDataSelector::iterator i; - for ( i = objects->begin(); i != objects->end(); i++ ) { - IOpaqueAddress* pAddr = 0; - StatusCode iret = m_histoPersSvc->createRep(*i, pAddr); - if ( iret.isSuccess() ) { - (*i)->registry()->setAddress(pAddr); - } - else { - sc = iret; - } - } - for ( i = objects->begin(); i != objects->end(); i++ ) { - IRegistry* reg = (*i)->registry(); - StatusCode iret = m_histoPersSvc->fillRepRefs(reg->address(), *i); - if ( !iret.isSuccess() ) { - sc = iret; - } - } - } - if ( sc.isSuccess() ) { - info() << "Histograms converted successfully according to request." << endmsg; - } - else { - error() << "Error while saving Histograms." << endmsg; - } - } - else { - error() << "Error while traversing Histogram data store" << endmsg; - } - } - - // Release event selector context - if ( m_evtSelector && m_evtSelContext ) { - m_evtSelector->releaseContext(m_evtSelContext).ignore(); - m_evtSelContext = 0; - } - - // Release all interfaces... - m_histoDataMgrSvc = 0; - m_histoPersSvc = 0; - - m_evtSelector = 0; - m_evtDataSvc = 0; - m_evtDataMgrSvc = 0; - - delete m_tbb_scheduler_init; - - return StatusCode::SUCCESS; -} - -//-------------------------------------------------------------------------------------------- -// implementation of executeEvent(void* par) -//-------------------------------------------------------------------------------------------- -StatusCode HiveEventLoopMgr::executeEvent(void* /*par*/) { - - // FIXME : does this ever get called??? - - // Fire BeginEvent "Incident" - m_incidentSvc->fireIncident(Incident(name(),IncidentType::BeginEvent)); - // An incident may schedule a stop, in which case is better to exit before the actual execution. - if ( m_scheduledStop ) { - always() << "Terminating event processing loop due to a stop scheduled by an incident listener" << endmsg; - return StatusCode::SUCCESS; - } - - // Execute Algorithms - m_incidentSvc->fireIncident(Incident(name(), IncidentType::BeginProcessing)); - - // Prepare the event context for concurrency - - - // Call the resetExecuted() method of ALL "known" algorithms - // (before we were reseting only the topalgs) - SmartIF<IAlgManager> algMan(serviceLocator()); - if (LIKELY(algMan.isValid())) { - for(auto ialg: algMan->getAlgorithms()) { - if (LIKELY(0 != ialg)) ialg->resetExecuted(); - } - } - - bool eventfailed = false;//run_parallel(); - - // ensure that the abortEvent flag is cleared before the next event - if (UNLIKELY(m_abortEvent)) { - DEBMSG << "AbortEvent incident fired by " - << m_abortEventSource << endmsg; - m_abortEvent = false; - } - - // Call the execute() method of all output streams - for (ListAlg::iterator ito = m_outStreamList.begin(); - ito != m_outStreamList.end(); ito++ ) { - (*ito)->resetExecuted(); - StatusCode sc; - sc = (*ito)->sysExecute(); - if (UNLIKELY(!sc.isSuccess())) { - warning() << "Execution of output stream " << (*ito)->name() - << " failed" << endmsg; - eventfailed = true; - } - } - - m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndProcessing)); - - // Check if there was an error processing current event - if (UNLIKELY(eventfailed)){ - error() << "Error processing event loop." << endmsg; - return StatusCode(StatusCode::FAILURE,true); - } - return StatusCode(StatusCode::SUCCESS,true); - -} - -//-------------------------------------------------------------------------------------------- -// implementation of IEventProcessing::executeRun -//-------------------------------------------------------------------------------------------- -StatusCode HiveEventLoopMgr::executeRun( int maxevt ) { - StatusCode sc; - // initialize the base class - sc = MinimalEventLoopMgr::executeRun(maxevt); - return sc; -} - -//-------------------------------------------------------------------------------------------- -// implementation of IAppMgrUI::nextEvent -//-------------------------------------------------------------------------------------------- -// Here the loop on the events takes place. -// This is also the natural place to put the preparation of the algorithms -// contexts, which contain the event specific data. - -StatusCode HiveEventLoopMgr::nextEvent(int maxevt) { - // Collapse executeEvent and run_parallel in the same method - // TODO _very_ sporty on conditions and checks!! - - auto start_time = tbb::tick_count::now(); - auto secsFromStart = [&start_time]()->double{ - return (tbb::tick_count::now()-start_time).seconds(); - }; - - typedef std::tuple<EventContext*,EventSchedulingState*> contextSchedState_tuple; - - MsgStream log(msgSvc(), name()); - - - // Reset the application return code. - Gaudi::setAppReturnCode(m_appMgrProperty, Gaudi::ReturnCode::Success, true).ignore(); - - // Lambda to check if an event has finished - auto has_finished = [] // acquire nothing - (contextSchedState_tuple evtContext_evtstate) // argument is a tuple - { return std::get<1>(evtContext_evtstate)->hasFinished();}; // true if finished - - // Useful for the Logs - always() << "Running with " - << m_evts_parallel << " parallel events, " - << m_max_parallel << " max concurrent algorithms, " - << m_num_threads << " threads." - << endmsg; - - int n_processed_events = 0; - bool eof = false; - StatusCode sc; - - // Events in flight - std::list<contextSchedState_tuple> events_in_flight; - - // Loop until no more evts are there - - while( maxevt == -1 ? !eof : n_processed_events < maxevt ){// TODO Fix the condition in case of -1 - - const unsigned int n_events_in_flight = events_in_flight.size(); - const unsigned int n_evts_to_process = maxevt - n_processed_events - n_events_in_flight; - - unsigned int n_acquirable_events = m_evts_parallel - n_events_in_flight ; - if (n_acquirable_events > n_evts_to_process) - n_acquirable_events = n_evts_to_process; - - log << MSG::INFO << "Evts in flight: " << n_events_in_flight << endmsg; - log << MSG::INFO << "Evts processed: " << n_processed_events<< endmsg; - log << MSG::INFO << "Evts parallel: " << m_evts_parallel << endmsg; - log << MSG::INFO << "Acquirable Events are " << n_acquirable_events << endmsg; - - // Initialisation section ------------------------------------------------ - - // Loop on events to be initialised - for (unsigned int offset=0; offset< n_acquirable_events; ++offset){ - - EventContext* evtContext(new EventContext); - const int evt_num = n_processed_events + offset + n_events_in_flight; - evtContext->set(evt_num, m_whiteboard->allocateStore(evt_num) ); - m_whiteboard->selectStore(evtContext->slot()).ignore(); - - if( m_evtSelContext ) { - //---This is the "event iterator" context from EventSelector - IOpaqueAddress* pAddr = 0; - sc = getEventRoot(pAddr); - if( !sc.isSuccess() ) { - info() << "No more events in event selection " << endmsg; - eof = true; - maxevt = evt_num; // Set the maxevt to the determined maximum - break; - } - sc = m_evtDataMgrSvc->setRoot ("/Event", pAddr); - if( !sc.isSuccess() ) { - warning() << "Error declaring event root address." << endmsg; - } - } - else { - //---In case of no event selector---------------- - sc = m_evtDataMgrSvc->setRoot ("/Event", new DataObject()); - if( !sc.isSuccess() ) { - warning() << "Error declaring event root DataObject" << endmsg; - } - } - - EventSchedulingState* event_state = new EventSchedulingState(m_topAlgList.size(),m_nProducts); - events_in_flight.push_back(std::make_tuple(evtContext,event_state)); - info() << "Started event " << evt_num << " at " << secsFromStart() << endmsg; - - }// End initialisation loop on acquired events - - // End initialisation section -------------------------------------------- - - // Scheduling section ---------------------------------------------------- - auto in_flight_end = events_in_flight.end(); - auto in_flight_begin = events_in_flight.begin(); - // loop until at least one evt finished - while (in_flight_end == find_if(in_flight_begin, in_flight_end ,has_finished)){ - bool no_algo_can_run = true; - for (auto& evtContext_evtstate : events_in_flight){ // loop on evts - - EventContext* event_Context = std::get<0>(evtContext_evtstate); - EventSchedulingState* event_state = std::get<1>(evtContext_evtstate); - - // reset the execution status - m_aess->reset( *event_Context ); - - for (unsigned int algo_counter=0; algo_counter<m_topAlgList.size(); algo_counter++) { // loop on algos - // check whether all requirements/dependencies for the algorithm are fulfilled... - const state_type& algo_requirements = m_all_requirements[algo_counter]; - // Very verbose! - // log << MSG::VERBOSE << "Checking dependencies for algo " << algo_counter << ":\n" - // << " o Requirements: " << algo_requirements << std::endl - // << " o State: " << event_state->state() << endmsg; - - // ...and whether the algorithm was already started and if it can be started - bool algo_not_started_and_dependencies_there = (algo_requirements.is_subset_of(event_state->state()) && - (event_state->hasStarted(algo_counter) ) == false); - - // It could run, just the maximum number of algos in flight has been reached - if (algo_not_started_and_dependencies_there) - no_algo_can_run = false; - if (algo_not_started_and_dependencies_there && - (m_total_algos_in_flight < m_max_parallel )) { - // Pick the algorithm if available and if not and requested create one - IAlgorithm* ialgo=NULL; - // To be transferred to the algomanager, this is inefficient - ListAlg::iterator algoIt = m_topAlgList.begin(); - std::advance(algoIt, algo_counter); - if(m_algResourcePool->acquireAlgorithm(algoIt->get()->name(),ialgo)){ - log << MSG::INFO << "Launching algo " << algo_counter<< " on event " << event_Context->evt() << endmsg; - // Attach context to the algo - Algorithm* algo = dynamic_cast<Algorithm*> (ialgo); - algo->setContext(event_Context); - - tbb::task* t = new( tbb::task::allocate_root() ) - HiveAlgoTask(ialgo, event_state, this, event_Context, m_aess); - tbb::task::enqueue( *t); - - event_state->algoStarts(algo_counter); - ++m_total_algos_in_flight; - - log << MSG::INFO << "Algos in flight: " << m_total_algos_in_flight << endmsg; - } - } // End scheduling if block - - }// end loop on algo indices - - // update the event state with what has been put into the DataSvc - // std::vector<std::string> new_products; - DataObjIDColl new_products; - m_whiteboard->selectStore(event_Context->slot()).ignore(); - sc = m_whiteboard->getNewDataObjects(new_products); - if( !sc.isSuccess() ){ - warning() << "Error getting recent new products (since last time called)" << endmsg; - } - for (const auto& newProduct : new_products) { - log << MSG::DEBUG << "New Product: " << newProduct << " in the store." << endmsg; - if (m_product_indices.count( newProduct ) == 1) { // only products with dependencies upon need to be announced to other algos - log << MSG::DEBUG << " - Used as input by some algorithm. Updating the event state." << endmsg; - event_state->update_state(m_product_indices[newProduct]); - } - } - - - /* Check if we stall on the current event - * One should check if: - * - Nothing can run - * - Nothing is running - * - No new product is available - * - The event is not finished - * At this point one could claim a stall. - * The implementation poses a challenge though, which resides in the - * asyncronous termination of algorithm and potential writing in the - * store. Therefore one checks the 4 aforementioned conditions. - * Then, the store is again checked (without removing the new - * elements). If something new is there the stall is not sure - * anymore. - * Another possibility could be to check if any algo terminated - * during the checks made to the wb probably. - */ - if (no_algo_can_run && // nothing could run - m_total_algos_in_flight==0 && // nothing is running - new_products.size() == 0 && // no new product available - ! event_state->hasFinished() ){ // the event is not finished - - // Check if something arrived on the wb meanwhile - if(!m_whiteboard->newDataObjectsPresent()){ - - std::string errorMessage("No algorithm can run, " - "no algorithm in flight, " - "no new products in the store, " - "event not complete: this is a stall."); - fatal() << errorMessage << std::endl - << "Algorithms that ran for event " << event_Context->evt() << std::endl; - unsigned int algo_counter=0; - for (auto& algo : m_topAlgList){ - bool has_started = event_state->hasStarted(algo_counter); - if (has_started) - fatal() << " o " << algo->name() << " could run" << std::endl; - else - fatal() << " o " << algo->name() << " could NOT run" << std::endl; - algo_counter++; - } // End ofloop on algos - fatal() << endmsg; - throw GaudiException (errorMessage,"HiveEventLoopMgr",StatusCode::FAILURE); - } - } - }// end loop on evts in flight - }// end loop until at least one evt in flight finished - - // Remove from the in flight events the finished ones - std::list<contextSchedState_tuple>::iterator it=events_in_flight.begin(); - - while (it!=events_in_flight.end()){ - // Now proceed to deletion - if (std::get<1>(*it)->hasFinished()){ - const unsigned int evt_num = std::get<0>(*it)->evt(); - const unsigned int evt_slot = std::get<0>(*it)->slot(); - log << MSG::INFO << "Event "<< evt_num << " finished. Events in flight are " - << events_in_flight.size() << ". Processed events are " - << n_processed_events << endmsg; - info() << "Event "<< evt_num << " finished. now is " << secsFromStart() << endmsg; - - // Calculate min and max event num - unsigned int min_event_num=0xFFFFFFFF; - unsigned int max_event_num=0; - - for (auto& evtContext_evtstate : events_in_flight){ - const unsigned int evt_num = std::get<0>(evtContext_evtstate)->evt(); - // Update min and max for backlog calculation - if (evt_num > max_event_num) max_event_num=evt_num; - if (evt_num < min_event_num) min_event_num=evt_num; - } - unsigned int evt_backlog=max_event_num-min_event_num; - info() << "Event backlog (max= " << max_event_num << ", min= " - << min_event_num<<" ) = " << evt_backlog << endmsg; - - - // Output - // Call the execute() method of all output streams - for (ListAlg::iterator ito = m_outStreamList.begin(); ito != m_outStreamList.end(); ito++ ) { - (*ito)->resetExecuted(); - StatusCode sc; - sc = (*ito)->sysExecute(); - if (UNLIKELY(!sc.isSuccess())) { - warning() << "Execution of output stream " << (*ito)->name() << " failed" << endmsg; - } - } - - sc = m_whiteboard->clearStore(evt_slot); - if( !sc.isSuccess() ) { - warning() << "Clear of Event data store failed" << endmsg; - } - else { - info() << "Cleared store " << evt_slot << endmsg; - } - m_whiteboard->freeStore(evt_slot).ignore(); - - delete std::get<0>(*it); - delete std::get<1>(*it); - it=events_in_flight.erase(it) ; - - n_processed_events++; - - } else{ - ++it; - } - } - // End scheduling session ------------------------------------------------ - - } // End while loop on events - - always() << "---> Loop Finished (seconds): " << secsFromStart() <<endmsg; - - return StatusCode::SUCCESS; -} - -/// Create event address using event selector -StatusCode HiveEventLoopMgr::getEventRoot(IOpaqueAddress*& refpAddr) { - refpAddr = 0; - StatusCode sc = m_evtSelector->next(*m_evtSelContext); - if ( !sc.isSuccess() ) { - return sc; - } - // Create root address and assign address to data service - sc = m_evtSelector->createAddress(*m_evtSelContext,refpAddr); - if( !sc.isSuccess() ) { - sc = m_evtSelector->next(*m_evtSelContext); - if ( sc.isSuccess() ) { - sc = m_evtSelector->createAddress(*m_evtSelContext,refpAddr); - if ( !sc.isSuccess() ) { - warning() << "Error creating IOpaqueAddress." << endmsg; - } - } - } - return sc; -} - - -// Here because temporary -#include <iostream> - - -/// Compute dependencies between the algorithms -void -HiveEventLoopMgr::find_dependencies() { - - // Count how many products are actually requested - for (auto& thisAlgoDependencies : m_AlgosDependencies){ - m_nProducts += thisAlgoDependencies.size(); - } - const unsigned int n_algos = m_topAlgList.size(); - std::vector<state_type> all_requirements(n_algos,state_type(m_nProducts)); - - unsigned int algo_counter=0; - unsigned int input_counter=0; - - MsgStream log(msgSvc(), name()); - // loop on the dependencies - for (const auto& algoDependencies : m_AlgosDependencies){ // loop on algo dependencies lists - state_type requirements(m_nProducts); - log << MSG::DEBUG << "Algorithm " << algo_counter << " dependencies: " << endmsg; - for (const auto& dependency : algoDependencies){ // loop on dependencies - log << MSG::DEBUG << " - " << dependency << endmsg; - auto ret = m_product_indices.insert(std::pair<std::string, unsigned int>("/Event/"+dependency,input_counter)); - // insert successful means == wasn't known before. So increment counter - if (ret.second==true) ++input_counter; - // in any case the return value holds the proper product index - requirements[ret.first->second] = true; - log << MSG::DEBUG << " - Requirements now: " << requirements[ret.first->second] << endmsg; - }// end loop on single dependencies - - all_requirements[algo_counter] = requirements; - ++algo_counter; - } // end loop on algo dependencies lists - - // Loop on the product indices - log << MSG::DEBUG << "Product indices:" << endmsg; - for (auto& prod_index: m_product_indices) - log << MSG::DEBUG << " - " << prod_index.first << " " << prod_index.second << endmsg; - - m_numberOfAlgos = algo_counter; - m_all_requirements = all_requirements; - -} - -//--------------------------------------------------------------------------- - -void HiveEventLoopMgr::taskFinished(IAlgorithm*& algo){ - m_algResourcePool->releaseAlgorithm(algo->name(),algo); - --m_total_algos_in_flight; - MsgStream log(msgSvc(), name()); - log << MSG::DEBUG << "[taskFinished] Algos in flight: " << m_total_algos_in_flight << endmsg; -} diff --git a/GaudiHive/src/HiveTestAlgorithm.cpp b/GaudiHive/src/HiveTestAlgorithm.cpp index e5bf4410a..fa86006bf 100644 --- a/GaudiHive/src/HiveTestAlgorithm.cpp +++ b/GaudiHive/src/HiveTestAlgorithm.cpp @@ -47,6 +47,22 @@ StatusCode HiveTestAlgorithm::initialize() { info() << ":HiveTestAlgorithm::initialize " << endmsg; + + int i=0; + for (auto k: m_inputs) { + debug() << "adding input key " << k << endmsg; + m_inputHandles.push_back( new DataObjectHandle<DataObject>( k, Gaudi::DataHandle::Reader, this )); + declareProperty("dummy_in_" + std::to_string(i), *(m_inputHandles.back()) ); + i++; + } + + i = 0; + for (auto k: m_outputs) { + debug() << "adding output key " << k << endmsg; + m_outputHandles.push_back( new DataObjectHandle<DataObject>( k, Gaudi::DataHandle::Writer, this )); + declareProperty("dummy_out_" + std::to_string(i), *(m_outputHandles.back()) ); + i++; + } return StatusCode::SUCCESS; } @@ -59,16 +75,14 @@ HiveTestAlgorithm::execute() info() << ":HiveTestAlgorithm::getting inputs... " << evt << endmsg; - for(vector<string>::iterator i = m_inputs.begin(); i != m_inputs.end(); i++) { - MyObject* obj = get<MyObject>(*i); - info() << "Got data " << *i << " with value " << obj->getData() << endmsg; + for(auto& handle : m_inputHandles) { + auto obj = dynamic_cast<MyObject*>(handle->get()); + info() << "Got data with value " << obj->getData() << endmsg; } info() << ":HiveTestAlgorithm::registering outputs... " << evt << endmsg; - for(vector<string>::iterator i = m_outputs.begin(); i != m_outputs.end(); i++) { - put(new MyObject(1000+evt), *i); - } + for (auto & outputHandle: m_outputHandles){ outputHandle->put(new MyObject(1000+evt)); } return StatusCode::SUCCESS; } -- GitLab From e909988d2610d58666b1b929a9334974c52ad144 Mon Sep 17 00:00:00 2001 From: Charles Leggett <leggett@cern.ch> Date: Wed, 26 Oct 2016 19:28:29 +0200 Subject: [PATCH 4/4] fixed MaxAlgosInFlight default in ForwardScheduler --- GaudiHive/src/ForwardSchedulerSvc.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/GaudiHive/src/ForwardSchedulerSvc.cpp b/GaudiHive/src/ForwardSchedulerSvc.cpp index 493899c43..5470caa50 100644 --- a/GaudiHive/src/ForwardSchedulerSvc.cpp +++ b/GaudiHive/src/ForwardSchedulerSvc.cpp @@ -58,7 +58,7 @@ ForwardSchedulerSvc::ForwardSchedulerSvc( const std::string& name, ISvcLocator* declareProperty("ThreadPoolSize", m_threadPoolSize = -1 ); declareProperty("WhiteboardSvc", m_whiteboardSvcName = "EventDataSvc" ); declareProperty("IOBoundAlgSchedulerSvc", m_IOBoundAlgSchedulerSvcName = "IOBoundAlgSchedulerSvc" ); - declareProperty("MaxAlgosInFlight", m_maxAlgosInFlight = 0, "Taken from the whiteboard. Deprecated" ); + declareProperty("MaxAlgosInFlight", m_maxAlgosInFlight = 1, "Taken from the whiteboard. Deprecated" ); declareProperty("MaxIOBoundAlgosInFlight", m_maxIOBoundAlgosInFlight = 0); // XXX: CF tests. Temporary property to switch between ControlFlow implementations declareProperty("useGraphFlowManagement", m_CFNext = false ); -- GitLab