From 55ec3b5cbcbbe1dd8070aea683de9b103242b7ea Mon Sep 17 00:00:00 2001
From: Vakho Tsulaia <vakhtang.tsulaia@cern.ch>
Date: Thu, 25 Feb 2021 07:55:25 +0100
Subject: [PATCH] Concurrent processing of events in hybrid MP+MT workers

Modified `AthenaMPTools/SharedHiveEvtQueueConsumer` so that it can process
events concurrently. To achieve this, the tool now uses the
`AthenaMtesEventLoopMgr` as its event processor instead of the previously
used `AthenaHiveEventLoopMgr`.

Also introduced an abstract helper interface
`AthenaKernel/IHybridProcessorHelper`, which allows the
`SharedHiveEvtQueueConsumer` tool to call certain methods of the
`AthenaMtesEventLoopMgr`.
---
 .../AthenaCommon/python/AtlasThreadedJob.py   |   7 +-
 .../AthenaKernel/IHybridProcessorHelper.h     |  23 ++
 Control/AthenaMP/python/PyComps.py            |  10 +-
 .../src/SharedHiveEvtQueueConsumer.cxx        | 326 ++++++------
 .../src/SharedHiveEvtQueueConsumer.h          |  12 +-
 .../src/AthenaMtesEventLoopMgr.cxx            |  56 ++-
 .../src/AthenaMtesEventLoopMgr.h              |  22 +-
 7 files changed, 190 insertions(+), 266 deletions(-)
 create mode 100644 Control/AthenaKernel/AthenaKernel/IHybridProcessorHelper.h

diff --git a/Control/AthenaCommon/python/AtlasThreadedJob.py b/Control/AthenaCommon/python/AtlasThreadedJob.py
index 050a048ae0ff..7bc20c431ff3 100644
--- a/Control/AthenaCommon/python/AtlasThreadedJob.py
+++ b/Control/AthenaCommon/python/AtlasThreadedJob.py
@@ -48,15 +48,16 @@ def _setupAtlasThreadedJob():
     topSequence += SGInputLoader (FailIfNoProxy = False)
     AlgScheduler.setDataLoaderAlg ('SGInputLoader' )
 
-    if theApp._opts.mtes :
-        # Multi-threaded Event Service
+    if (theApp._opts.mtes or jps.ConcurrencyFlags.NumProcs()>0):
+        # Either multi-threaded Event Service or hybrid MP+MT
         from AthenaServices.AthenaServicesConf import AthenaMtesEventLoopMgr
 
         svcMgr += AthenaMtesEventLoopMgr()
         svcMgr.AthenaMtesEventLoopMgr.WhiteboardSvc = "EventDataSvc"
         svcMgr.AthenaMtesEventLoopMgr.SchedulerSvc = AlgScheduler.getScheduler().getName()
+
+        if theApp._opts.mtes:
             svcMgr.AthenaMtesEventLoopMgr.EventRangeChannel = theApp._opts.mtes_channel
-
         theApp.EventLoop = "AthenaMtesEventLoopMgr"
     else:
         from AthenaServices.AthenaServicesConf import AthenaHiveEventLoopMgr
diff --git a/Control/AthenaKernel/AthenaKernel/IHybridProcessorHelper.h b/Control/AthenaKernel/AthenaKernel/IHybridProcessorHelper.h
new file mode 100644
index 000000000000..6232f854ff0b
--- /dev/null
+++ b/Control/AthenaKernel/AthenaKernel/IHybridProcessorHelper.h
@@ -0,0 +1,23 @@
+/*
+  Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
+*/
+
+#ifndef ATHENAKERNEL_IHYBRIDPROCESSORHELPER_H
+#define ATHENAKERNEL_IHYBRIDPROCESSORHELPER_H
+
+/**
+ * @file IHybridProcessorHelper.h
+ * @class IHybridProcessorHelper
+ * @brief Helper interface for implementing hybrid MP+MT.
+ *        Used by the Hybrid Shared Event Queue Consumer MP tool.
+ **/
+
+class IHybridProcessorHelper {
+ public:
+  virtual void resetAppReturnCode() = 0;
+  virtual void setCurrentEventNum(int num) = 0;
+  virtual bool terminateLoop() = 0;
+  virtual int drainScheduler(int& finishedEvts, bool report) = 0;
+};
+
+#endif
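
Note on how this interface is meant to be driven: the tool holds its event
processor only as an IEventProcessor, so it reaches these methods through a
dynamic_cast, as exec_func() does further down in this patch. The following
self-contained toy illustrates that calling pattern (a simplified sketch for
orientation with a stand-in loop manager, not code from this patch):

    #include <iostream>

    // Stand-in copy of the interface declared above.
    class IHybridProcessorHelper {
     public:
      virtual ~IHybridProcessorHelper() = default;
      virtual void resetAppReturnCode() = 0;
      virtual void setCurrentEventNum(int num) = 0;
      virtual bool terminateLoop() = 0;
      virtual int drainScheduler(int& finishedEvts, bool report) = 0;
    };

    // Toy processor standing in for AthenaMtesEventLoopMgr.
    class ToyLoopMgr : public IHybridProcessorHelper {
      int m_current = 0;
     public:
      void resetAppReturnCode() override {}
      void setCurrentEventNum(int num) override { m_current = num; }
      bool terminateLoop() override { return false; }
      int drainScheduler(int& finishedEvts, bool) override {
        ++finishedEvts;  // pretend one event finished
        return 0;        // 0 = scheduler empty, <0 = error, >0 = keep draining
      }
    };

    int main() {
      ToyLoopMgr mgr;
      IHybridProcessorHelper* helper = &mgr;  // the tool obtains this via dynamic_cast
      helper->resetAppReturnCode();           // clear any stale application return code
      helper->setCurrentEventNum(42);         // tell the loop manager which event comes next
      int finished = 0;
      int ir = helper->drainScheduler(finished, /*report=*/true);
      std::cout << "ir=" << ir << " finished=" << finished << '\n';
      return 0;
    }

The return-code convention for drainScheduler() matches the comments in the
exec_func() loop below: negative for an error, zero when the scheduler is
empty, positive to keep draining.
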
diff --git a/Control/AthenaMP/python/PyComps.py b/Control/AthenaMP/python/PyComps.py
index 0a35b431740f..7d14be6f0052 100644
--- a/Control/AthenaMP/python/PyComps.py
+++ b/Control/AthenaMP/python/PyComps.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
+# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
 
 #-----Python imports---#
 import os, sys, shutil
@@ -118,12 +118,10 @@ class MpEvtLoopMgr(AthMpEvtLoopMgr):
                                               ChunkSize=chunk_size) ]
 
         if (self.nThreads >= 1):
+            if(pileup):
+                raise Exception('Running pileup digitization in mixed MP+MT currently not supported')
             from AthenaMPTools.AthenaMPToolsConf import SharedHiveEvtQueueConsumer
-            self.Tools += [ SharedHiveEvtQueueConsumer(UseSharedReader=use_shared_reader,
-                                                       UseSharedWriter=use_shared_writer,
-                                                       IsPileup=pileup,
-                                                       IsRoundRobin=(strategy=='RoundRobin'),
-                                                       EventsBeforeFork=events_before_fork,
+            self.Tools += [ SharedHiveEvtQueueConsumer(EventsBeforeFork=events_before_fork,
                                                        Debug=debug_worker) ]
         else:
             from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueConsumer
diff --git a/Control/AthenaMPTools/src/SharedHiveEvtQueueConsumer.cxx b/Control/AthenaMPTools/src/SharedHiveEvtQueueConsumer.cxx
index e8b3b51551db..e04f24293abd 100644
--- a/Control/AthenaMPTools/src/SharedHiveEvtQueueConsumer.cxx
+++ b/Control/AthenaMPTools/src/SharedHiveEvtQueueConsumer.cxx
@@ -1,5 +1,5 @@
 /*
-  Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
+  Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
 */
 
 #include "SharedHiveEvtQueueConsumer.h"
@@ -7,10 +7,8 @@
 #include "AthenaInterprocess/ProcessGroup.h"
 #include "AthenaInterprocess/Incidents.h"
 
-#include "AthenaKernel/IEventSeek.h"
 #include "AthenaKernel/IEvtSelectorSeek.h"
-#include "AthenaKernel/IEventShare.h"
-#include "AthenaKernel/IDataShare.h"
+#include "AthenaKernel/IHybridProcessorHelper.h"
 #include "GaudiKernel/IEvtSelector.h"
 #include "GaudiKernel/IIoComponentMgr.h"
 #include "GaudiKernel/IFileMgr.h"
@@ -40,31 +38,20 @@ namespace SharedHiveEvtQueueConsumer_d {
 }
 
 SharedHiveEvtQueueConsumer::SharedHiveEvtQueueConsumer(const std::string& type
-						       , const std::string& name
-						       , const IInterface* parent)
+                                                       , const std::string& name
+                                                       , const IInterface* parent)
   : AthenaMPToolBase(type,name,parent)
-  , m_useSharedReader(false)
-  , m_useSharedWriter(false)
-  , m_isPileup(false)
-  , m_isRoundRobin(false)
  , m_nEventsBeforeFork(0)
   , m_debug(false)
   , m_rankId(-1)
   , m_chronoStatSvc("ChronoStatSvc", name)
-  , m_evtSeek(0)
   , m_evtSelSeek(0)
   , m_evtContext(0)
-  , m_evtShare(0)
-  , m_dataShare(0)
   , m_sharedEventQueue(0)
   , m_sharedRankQueue(0)
 {
   declareInterface<IAthenaMPTool>(this);
 
-  declareProperty("UseSharedReader",m_useSharedReader);
-  declareProperty("UseSharedWriter",m_useSharedWriter);
-  declareProperty("IsPileup",m_isPileup);
-  declareProperty("IsRoundRobin",m_isRoundRobin);
   declareProperty("EventsBeforeFork",m_nEventsBeforeFork);
   declareProperty("Debug", m_debug);
 
@@ -82,60 +69,19 @@ SharedHiveEvtQueueConsumer::~SharedHiveEvtQueueConsumer()
 StatusCode SharedHiveEvtQueueConsumer::initialize()
 {
   ATH_MSG_DEBUG("In initialize");
-  if(m_isPileup) {
-    m_evtProcessor = ServiceHandle<IEventProcessor>("PileUpEventLoopMgr",name());
-    ATH_MSG_INFO("The job running in pileup mode");
-  }
-  else {
-    ATH_MSG_INFO("The job running in non-pileup mode");
-  }
 
   StatusCode sc = AthenaMPToolBase::initialize();
   if(!sc.isSuccess()) return sc;
 
-  // For pile-up jobs use event loop manager for seeking
-  // otherwise use event selector
-  if(m_isPileup) {
-    m_evtSeek = dynamic_cast<IEventSeek*>(m_evtProcessor.operator->());
-    if(!m_evtSeek) {
-      ATH_MSG_ERROR("Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
-      return StatusCode::FAILURE;
-    }
-  }
-  else {
-    sc = serviceLocator()->service(m_evtSelName,m_evtSelSeek);
-    if(sc.isFailure() || m_evtSelSeek==0) {
-      ATH_MSG_ERROR("Error retrieving IEvtSelectorSeek");
-      return StatusCode::FAILURE;
-    }
-    ATH_CHECK( evtSelector()->createContext (m_evtContext) );
-  }
-
-  sc = serviceLocator()->service(m_evtSelName,m_evtShare);
-  if(sc.isFailure() || m_evtShare==0) {
-    if(m_useSharedReader) {
-      ATH_MSG_ERROR("Error retrieving IEventShare");
-      return StatusCode::FAILURE;
-    }
-    msg(MSG::INFO) << "Could not retrieve IEventShare" << endmsg;
-  }
-
-  IConversionSvc* cnvSvc = 0;
-  sc = serviceLocator()->service("AthenaPoolCnvSvc",cnvSvc);
-  m_dataShare = dynamic_cast<IDataShare*>(cnvSvc);
-  if(sc.isFailure() || m_dataShare==0) {
-    if(m_useSharedWriter) {
-      msg(MSG::ERROR) << "Error retrieving AthenaPoolCnvSvc " << cnvSvc << endmsg;
-      return StatusCode::FAILURE;
-    }
-  }
-
-  sc = m_chronoStatSvc.retrieve();
-  if (!sc.isSuccess()) {
-    ATH_MSG_ERROR("Cannot get ChronoStatSvc.");
+  sc = serviceLocator()->service(m_evtSelName,m_evtSelSeek);
+  if(sc.isFailure() || m_evtSelSeek==0) {
+    ATH_MSG_ERROR("Error retrieving IEvtSelectorSeek");
     return StatusCode::FAILURE;
   }
+  ATH_CHECK( evtSelector()->createContext (m_evtContext) );
+
+  ATH_CHECK(m_chronoStatSvc.retrieve());
 
   return StatusCode::SUCCESS;
 }
@@ -285,7 +231,6 @@ SharedHiveEvtQueueConsumer::subProcessLogs(std::vector<std::string>& filenames)
 std::unique_ptr<AthenaInterprocess::ScheduledWork>
 SharedHiveEvtQueueConsumer::bootstrap_func()
 {
-
   if (m_debug) {
     ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
     sigset_t mask, oldmask;
@@ -369,28 +314,7 @@ SharedHiveEvtQueueConsumer::bootstrap_func()
     return outwork;
   ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
-
-
-  // ________________________ Make Shared Reader/Writer Client ________________________
-  if(m_useSharedReader && m_evtShare) {
-    if(!m_evtShare->makeClient(m_rankId).isSuccess()) {
-      ATH_MSG_ERROR("Failed to make the event selector a share client");
-      return outwork;
-    } else {
-      ATH_MSG_DEBUG("Successfully made the event selector a share client");
-    }
-  }
-
-  if(m_useSharedWriter && m_dataShare) {
-    IProperty* propertyServer = dynamic_cast<IProperty*>(m_dataShare);
-    if (propertyServer==0 || propertyServer->setProperty("MakeStreamingToolClient", m_rankId + 1).isFailure()) {
-      ATH_MSG_ERROR("Could not change AthenaPoolCnvSvc MakeClient Property");
-      return outwork;
-    } else {
-      ATH_MSG_DEBUG("Successfully made the conversion service a share client");
-    }
-  }
-
   // ________________________ I/O reinit ________________________
   if(!m_ioMgr->io_reinitialize().isSuccess()) {
     ATH_MSG_ERROR("Failed to reinitialize I/O");
@@ -412,24 +336,6 @@ SharedHiveEvtQueueConsumer::bootstrap_func()
     ATH_MSG_DEBUG("Successfully restarted the event selector");
   }
 
-  // ________________________ Restart background event selectors in pileup jobs ________________________
-  if(m_isPileup) {
-    const std::list<IService*>& service_list = serviceLocator()->getServices();
-    std::list<IService*>::const_iterator itSvc = service_list.begin(),
-      itSvcLast = service_list.end();
-    for(;itSvc!=itSvcLast;++itSvc) {
-      IEvtSelector* evtsel = dynamic_cast<IEvtSelector*>(*itSvc);
-      if(evtsel && (evtsel != m_evtSelector)) {
-        if((*itSvc)->start().isSuccess())
-          ATH_MSG_DEBUG("Restarted event selector " << (*itSvc)->name());
-        else {
-          ATH_MSG_ERROR("Failed to restart event selector " << (*itSvc)->name());
-          return outwork;
-        }
-      }
-    }
-  }
-
   // ________________________ Worker dir: chdir ________________________
   if(chdir(worker_rundir.string().c_str())==-1) {
     ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
@@ -446,14 +352,13 @@ SharedHiveEvtQueueConsumer::bootstrap_func()
 
 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
 
-std::unique_ptr<AthenaInterprocess::ScheduledWork>
-SharedHiveEvtQueueConsumer::exec_func()
+std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedHiveEvtQueueConsumer::exec_func()
 {
   ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
 
   bool all_ok(true);
 
-  if (! initHive().isSuccess()) {
+  if (!initHive().isSuccess()) {
     ATH_MSG_FATAL("unable to initialize Hive");
     all_ok = false;
   }
@@ -476,141 +381,118 @@ SharedHiveEvtQueueConsumer::exec_func()
     }
   }
 
-
-  // ________________________ This is needed only for PileUp jobs __________________________________
-  // **
-  // If either EventsBeforeFork or SkipEvents is nonzero, first we need to advance the event selector
-  // by EventsBeforeFork+SkipEvents and only after that start seeking on the PileUpEventLoopMgr
-  // **
-  if(m_isPileup && all_ok) {
-    if (!m_evtSelSeek) {
-      StatusCode sc = serviceLocator()->service(m_evtSelName,m_evtSelSeek);
-      if(sc.isFailure() || m_evtSelSeek==0) {
-        ATH_MSG_ERROR("Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
-        all_ok = false;
-      }
-      if (evtSelector()->createContext (m_evtContext).isFailure()) {
-        ATH_MSG_ERROR("Error creating IEventSelector context.");
-        all_ok = false;
-      }
-    }
-    if (all_ok) {
-      if((m_nEventsBeforeFork+skipEvents)
-         && m_evtSelSeek->seek(*m_evtContext, m_nEventsBeforeFork+skipEvents).isFailure()) {
-        ATH_MSG_ERROR("Unable to seek to " << m_nEventsBeforeFork+skipEvents);
-        all_ok = false;
-      }
-    }
+  IHybridProcessorHelper* hybridHelper = dynamic_cast<IHybridProcessorHelper*>(m_evtProcessor.get());
+  if(!hybridHelper) {
+    ATH_MSG_FATAL("Failed to acquire IHybridProcessorHelper interface");
+    all_ok = false;
   }
-  // ________________________ This is needed only for PileUp jobs __________________________________
-
+  // Reset the application return code.
+  hybridHelper->resetAppReturnCode();
+  int finishedEvts =0;
+  int createdEvts =0;
   long intmask = pow(0x100,sizeof(int))-1; // Mask for decoding event number from the value posted to the queue
-  int nEvt(m_nEventsBeforeFork);
-  int nEventsProcessed(0);
   long evtnumAndChunk(0);
-
-  unsigned evtCounter(0);
+// unsigned evtCounter(0);
   int evtnum(0), chunkSize(1);
 
-  // For the round robin we need to know the maximum number of events for this job
-  if(m_isRoundRobin) {
-    evtnumAndChunk = 1;
-    while(evtnumAndChunk>0) {
-      if(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
-        usleep(1000);
-      }
-    }
-    evtnumAndChunk *= -1;
+  ATH_MSG_INFO("Starting loop on events");
+
+  StatusCode sc(StatusCode::SUCCESS,true);
+
+  while(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
+    ATH_MSG_DEBUG("Event queue is empty");
+    usleep(1000);
   }
-
-  if(all_ok) {
-    while(true) {
-      if(m_isRoundRobin) {
-        evtnum = skipEvents + m_nprocs*evtCounter + m_rankId;
-        if(evtnum>=evtnumAndChunk+skipEvents) {
-          break;
-        }
-        evtCounter++;
+  bool loop_ended = (evtnumAndChunk<0);
+  if(!loop_ended) {
+    ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
+    chunkSize = evtnumAndChunk >> (sizeof(int)*8);
+    evtnum = evtnumAndChunk & intmask;
+    ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
+    hybridHelper->setCurrentEventNum(++evtnum);
+  }
+
+  bool no_more_events = false;
+
+  while(!loop_ended) {
+    ATH_MSG_DEBUG(" -> createdEvts: " << createdEvts);
+
+    if(!hybridHelper->terminateLoop()       // No scheduled loop termination
+       && !no_more_events                   // We are not yet done getting events
+       && m_schedulerSvc->freeSlots()>0) {  // There are still free slots in the scheduler
+      ATH_MSG_DEBUG("createdEvts: " << createdEvts << ", freeslots: " << m_schedulerSvc->freeSlots());
+
+      auto ctx = m_evtProcessor->createEventContext();
+      if(!ctx.valid()) {
+        sc = StatusCode::FAILURE;
       }
       else {
-        if(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
-          // The event queue is empty, but we should check whether there are more events to come or not
-          ATH_MSG_DEBUG("Event queue is empty");
-          usleep(1000);
-          continue;
-        }
-        if(evtnumAndChunk<=0) {
-          evtnumAndChunk *= -1;
-          ATH_MSG_DEBUG("No more events are expected. The total number of events for this job = " << evtnumAndChunk);
-          break;
-        }
-        while (m_schedulerSvc->freeSlots() < 1) {
-          ATH_MSG_DEBUG("waiting for a free scheduler slot");
-          usleep(1000000);
-        }
+        sc = m_evtProcessor->executeEvent(std::move(ctx));
+      }
 
-        ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
-        chunkSize = evtnumAndChunk >> (sizeof(int)*8);
-        evtnum = evtnumAndChunk & intmask;
-        ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
+      if (sc.isFailure()) {
+        ATH_MSG_ERROR("Terminating event processing loop due to errors");
+        loop_ended = true;
       }
-      nEvt+=chunkSize;
-      StatusCode sc;
-      if(m_useSharedReader) {
-        sc = m_evtShare->share(evtnum);
-        if(sc.isFailure()){
-          ATH_MSG_ERROR("Unable to share " << evtnum);
-          all_ok=false;
-          break;
-        } else {
-          ATH_MSG_INFO("Share of " << evtnum << " succeeded");
+      else {
+        ++createdEvts;
+        if(--chunkSize==0) {
+          // Fetch next chunk
+          while(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
+            ATH_MSG_DEBUG("Event queue is empty");
+            usleep(1000);
+          }
+          if(evtnumAndChunk<0) {
+            no_more_events = true;
+            evtnumAndChunk *= -1;
+            ATH_MSG_DEBUG("No more events are expected. The total number of events for this job = " << evtnumAndChunk);
+          }
+          else {
+            ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
+            chunkSize = evtnumAndChunk >> (sizeof(int)*8);
+            evtnum = evtnumAndChunk & intmask;
+            ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
+          }
+        }
-      } else {
-        m_chronoStatSvc->chronoStart("AthenaMP_seek");
-        if (m_evtSeek) {
-          sc=m_evtSeek->seek(evtnum);
-        }
-        else {
-          sc=m_evtSelSeek->seek(*m_evtContext, evtnum);
-        }
-        if(sc.isFailure()){
-          ATH_MSG_ERROR("Unable to seek to " << evtnum);
-          all_ok=false;
-          break;
-        } else {
-          ATH_MSG_INFO("Seek to " << evtnum << " succeeded");
+        // Advance to the next event
+        if(!no_more_events) {
+          hybridHelper->setCurrentEventNum(++evtnum);
         }
-        m_chronoStatSvc->chronoStop("AthenaMP_seek");
       }
-      m_chronoStatSvc->chronoStart("AthenaMP_nextEvent");
-      sc = m_evtProcessor->nextEvent(chunkSize);
-      nEventsProcessed += chunkSize;
-      if(sc.isFailure()){
-        if(chunkSize==1)
-          ATH_MSG_ERROR("Unable to process event " << evtnum);
-        else
-          ATH_MSG_ERROR("Unable to process the chunk (" << evtnum << "," << evtnum+chunkSize-1 << ")");
-        all_ok=false;
-        break;
+    }
+    else {
+      // all the events were created but not all finished or the slots were
+      // all busy: the scheduler should finish its job
+      ATH_MSG_DEBUG("Draining the scheduler");
+
+      // Pull out of the scheduler the finished events
+      int ir = hybridHelper->drainScheduler(finishedEvts,true);
+      if(ir < 0) {
+        // some sort of error draining scheduler;
+        loop_ended = true;
+        sc = StatusCode::FAILURE;
+      }
+      else if(ir == 0) {
+        // no more events in scheduler
+        if(no_more_events) {
+          // We are done
+          loop_ended = true;
+          sc = StatusCode::SUCCESS;
+        }
+      }
+      else {
+        // keep going!
       }
-      m_chronoStatSvc->chronoStop("AthenaMP_nextEvent");
     }
-  }
+  } // end main loop on finished events
 
   if(all_ok) {
     if(m_evtProcessor->executeRun(0).isFailure()) {
       ATH_MSG_ERROR("Could not finalize the Run");
       all_ok=false;
     } else {
-      StatusCode sc;
-      if (m_evtSeek) {
-        sc = m_evtSeek->seek(evtnumAndChunk+skipEvents);
-      }
-      else {
-        sc = m_evtSelSeek->seek(*m_evtContext, evtnumAndChunk+skipEvents);
-      }
-      if(sc.isFailure()) {
+      if(m_evtSelSeek->seek(*m_evtContext, evtnumAndChunk+skipEvents).isFailure()) {
         ATH_MSG_DEBUG("Seek past maxevt to " << evtnumAndChunk+skipEvents << " returned failure. As expected...");
       }
     }
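
Aside on the queue protocol decoded in the loop above: the long received from
the shared event queue packs the chunk size into the bits above the low
sizeof(int)*8 bits and the first event number into the low bits; a negative
value signals that no more events are coming, its absolute value being the
total event count for the job. A minimal standalone illustration of the
encoding this loop decodes (the producer side is not shown in this patch, so
the packing below is an assumption inferred from the decoding logic):

    #include <iostream>

    int main() {
      const long intmask = (1L << (sizeof(int)*8)) - 1;  // same mask as pow(0x100,sizeof(int))-1

      // Hypothetical producer side: pack a chunk of 4 events starting at event 1000
      int evtnum = 1000, chunkSize = 4;
      long evtnumAndChunk = ((long)chunkSize << (sizeof(int)*8)) | evtnum;

      // Consumer side, as in exec_func(): unpack the two fields
      int chunk = evtnumAndChunk >> (sizeof(int)*8);
      int evt   = evtnumAndChunk & intmask;
      std::cout << "event=" << evt << " chunk=" << chunk << '\n';  // event=1000 chunk=4
      return 0;
    }
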
@@ -624,7 +506,7 @@ SharedHiveEvtQueueConsumer::exec_func()
   *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
   AthenaMPToolBase::Func_Flag func = AthenaMPToolBase::FUNC_EXEC;
   memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
-  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
+  memcpy((char*)outdata+sizeof(int)+sizeof(func),&createdEvts,sizeof(int));
 
   outwork->data = outdata;
   outwork->size = outsize;
@@ -753,7 +635,7 @@ SharedHiveEvtQueueConsumer::initHive() {
     }
   }
 
-  m_evtProcessor = ServiceHandle<IEventProcessor>("AthenaHiveEventLoopMgr",name());
+  m_evtProcessor = ServiceHandle<IEventProcessor>("AthenaMtesEventLoopMgr",name());
 
   if (m_evtProcessor.retrieve().isFailure()) {
     ATH_MSG_ERROR("could not setup " << m_evtProcessor.typeAndName());
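
For reference, the worker's exec result crosses the process boundary as a flat
byte blob laid out as [int error code][Func_Flag][int created events]; see the
memcpy calls in the hunk above. A hypothetical round-trip of that layout (the
local Func_Flag enum values are a stand-in for AthenaMPToolBase::Func_Flag;
the real decoding lives in decodeProcessResult()):

    #include <cstring>
    #include <iostream>

    enum Func_Flag { FUNC_BOOTSTRAP, FUNC_EXEC, FUNC_FIN };  // stand-in values

    void decodeExecResult(const void* data, int& errcode, int& createdEvts) {
      std::memcpy(&errcode, data, sizeof(int));  // 0 = success, 1 = failure
      // Skip over the Func_Flag field that follows the error code
      const char* p = static_cast<const char*>(data) + sizeof(int) + sizeof(Func_Flag);
      std::memcpy(&createdEvts, p, sizeof(int));
    }

    int main() {
      // Build a blob the way exec_func() does: [errcode][Func_Flag][createdEvts]
      char blob[2*sizeof(int)+sizeof(Func_Flag)];
      int ok = 0, nEvts = 7;
      Func_Flag f = FUNC_EXEC;
      std::memcpy(blob, &ok, sizeof(int));
      std::memcpy(blob+sizeof(int), &f, sizeof(f));
      std::memcpy(blob+sizeof(int)+sizeof(f), &nEvts, sizeof(int));

      int err = 0, created = 0;
      decodeExecResult(blob, err, created);
      std::cout << "err=" << err << " created=" << created << '\n';  // err=0 created=7
      return 0;
    }
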
diff --git a/Control/AthenaMPTools/src/SharedHiveEvtQueueConsumer.h b/Control/AthenaMPTools/src/SharedHiveEvtQueueConsumer.h
index fbce215993bb..7a4bc164fab1 100644
--- a/Control/AthenaMPTools/src/SharedHiveEvtQueueConsumer.h
+++ b/Control/AthenaMPTools/src/SharedHiveEvtQueueConsumer.h
@@ -1,5 +1,5 @@
 /*
-  Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration
+  Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
 */
 
 #ifndef ATHENAMPTOOLS_SHAREDHIVEEVTQUEUECONSUMER_H
@@ -12,10 +12,7 @@
 #include "GaudiKernel/IScheduler.h"
 #include "GaudiKernel/IEvtSelector.h"
 
-class IEventSeek;
 class IEvtSelectorSeek;
-class IEventShare;
-class IDataShare;
 class IChronoStatSvc;
 
 class SharedHiveEvtQueueConsumer final : public AthenaMPToolBase
@@ -56,21 +53,14 @@ class SharedHiveEvtQueueConsumer final : public AthenaMPToolBase
   int decodeProcessResult(const AthenaInterprocess::ProcessResult* presult, bool doFinalize);
 
   // Properties
-  bool m_useSharedReader;          // Work in pair with a SharedReader
-  bool m_useSharedWriter;          // Work in pair with a SharedWriter
-  bool m_isPileup;                 // Are we doing pile-up digitization?
-  bool m_isRoundRobin;             // Are we running in the "reproducible mode"?
   int  m_nEventsBeforeFork;
   bool m_debug;
 
   int  m_rankId;           // Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
 
   ServiceHandle<IChronoStatSvc>  m_chronoStatSvc;
-  IEventSeek*                    m_evtSeek;
   IEvtSelectorSeek*              m_evtSelSeek;
   IEvtSelector::Context*         m_evtContext;
-  IEventShare*                   m_evtShare;
-  IDataShare*                    m_dataShare;
 
   AthenaInterprocess::SharedQueue*  m_sharedEventQueue;
   AthenaInterprocess::SharedQueue*  m_sharedRankQueue;
diff --git a/Control/AthenaServices/src/AthenaMtesEventLoopMgr.cxx b/Control/AthenaServices/src/AthenaMtesEventLoopMgr.cxx
index ebb677ea7ee9..543c6aae3569 100644
--- a/Control/AthenaServices/src/AthenaMtesEventLoopMgr.cxx
+++ b/Control/AthenaServices/src/AthenaMtesEventLoopMgr.cxx
@@ -717,14 +717,16 @@ StatusCode AthenaMtesEventLoopMgr::stop()
 }
 
 
-StatusCode AthenaMtesEventLoopMgr::nextEvent(int /*maxevt*/)
+StatusCode AthenaMtesEventLoopMgr::nextEvent(int maxevt)
 {
+  if(maxevt==0) return StatusCode::SUCCESS;
+
   yampl::ISocketFactory* socketFactory = new yampl::SocketFactory();
   // Create a socket to communicate with the Pilot
-  yampl::ISocket* socket2Pilot = socketFactory->createClientSocket(yampl::Channel(m_eventRangeChannel.value(),yampl::LOCAL),yampl::MOVE_DATA);
+  m_socket = socketFactory->createClientSocket(yampl::Channel(m_eventRangeChannel.value(),yampl::LOCAL),yampl::MOVE_DATA);
 
   // Reset the application return code.
-  Gaudi::setAppReturnCode(m_appMgrProperty, Gaudi::ReturnCode::Success, true).ignore();
+  resetAppReturnCode();
 
   int finishedEvts =0;
   int createdEvts =0;
@@ -740,7 +742,7 @@ StatusCode AthenaMtesEventLoopMgr::nextEvent(int /*maxevt*/)
 
   std::unique_ptr<RangeStruct> range;
   while(!range) {
-    range = getNextRange(socket2Pilot);
+    range = getNextRange(m_socket);
     usleep(1000);
   }
 
@@ -782,7 +784,7 @@ StatusCode AthenaMtesEventLoopMgr::nextEvent(int /*maxevt*/)
       // Fetch next event range
       range.reset();
       while(!range) {
-        range = getNextRange(socket2Pilot);
+        range = getNextRange(m_socket);
         usleep(1000);
       }
       if(range->eventRangeID.empty()) {
@@ -803,7 +805,7 @@ StatusCode AthenaMtesEventLoopMgr::nextEvent(int /*maxevt*/)
       debug() << "Draining the scheduler" << endmsg;
 
       // Pull out of the scheduler the finished events
-      int ir = drainScheduler(finishedEvts,socket2Pilot);
+      int ir = drainScheduler(finishedEvts,true);
       if(ir < 0) {
         // some sort of error draining scheduler;
         loop_ended = true;
@@ -825,7 +827,8 @@ StatusCode AthenaMtesEventLoopMgr::nextEvent(int /*maxevt*/)
 
   info() << "---> Loop Finished (seconds): " << secsFromStart() <<endmsg;
 
-  delete socket2Pilot;
+  delete m_socket;
+  m_socket=nullptr;
   delete socketFactory;
   return sc;
 }
@@ -1204,10 +1207,22 @@ EventContext AthenaMtesEventLoopMgr::createEventContext() {
   return ctx;
 }
 
+void AthenaMtesEventLoopMgr::resetAppReturnCode()
+{
+  Gaudi::setAppReturnCode(m_appMgrProperty, Gaudi::ReturnCode::Success, true).ignore();
+}
+
+void AthenaMtesEventLoopMgr::setCurrentEventNum(int num) {
+  m_currentEvntNum = num;
+}
+
+bool AthenaMtesEventLoopMgr::terminateLoop() {
+  return m_terminateLoop;
+}
 
 //---------------------------------------------------------------------------
 
 int
-AthenaMtesEventLoopMgr::drainScheduler(int& finishedEvts,yampl::ISocket* socket){
+AthenaMtesEventLoopMgr::drainScheduler(int& finishedEvts,bool report){
 
   StatusCode sc(StatusCode::SUCCESS);
 
@@ -1273,20 +1288,23 @@ AthenaMtesEventLoopMgr::drainScheduler(int& finishedEvts,yampl::ISocket* socket)
 
     // Some code still needs global context in addition to that passed in the incident
     Gaudi::Hive::setCurrentContext( *thisFinishedEvtContext );
+    info() << "Firing EndProcessing" << endmsg;
     m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndProcessing, *thisFinishedEvtContext ));
 
-    // If we completed an event range, then report it to the pilot
-    OutputStreamSequencerSvc::RangeReport_ptr rangeReport = m_outSeqSvc->getRangeReport();
-    if(rangeReport) {
-      std::string outputFileReport = rangeReport->second + std::string(",ID:")
-        + rangeReport->first + std::string(",CPU:N/A,WALL:N/A");
-      if( not m_inTestMode ) {
-        // In standalone test mode there is no pilot to talk to
-        void* message2pilot = malloc(outputFileReport.size());
-        memcpy(message2pilot,outputFileReport.data(),outputFileReport.size());
-        socket->send(message2pilot,outputFileReport.size());
+    if(report) {
+      // If we completed an event range, then report it to the pilot
+      OutputStreamSequencerSvc::RangeReport_ptr rangeReport = m_outSeqSvc->getRangeReport();
+      if(rangeReport) {
+        std::string outputFileReport = rangeReport->second + std::string(",ID:")
+          + rangeReport->first + std::string(",CPU:N/A,WALL:N/A");
+        if( not m_inTestMode ) {
+          // In standalone test mode there is no pilot to talk to
+          void* message2pilot = malloc(outputFileReport.size());
+          memcpy(message2pilot,outputFileReport.data(),outputFileReport.size());
+          m_socket->send(message2pilot,outputFileReport.size());
+        }
+        info() << "Reported the output " << outputFileReport << endmsg;
       }
-      info() << "Reported the output " << outputFileReport << endmsg;
     }
 
     debug() << "Clearing slot " << thisFinishedEvtContext->slot()
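
The pilot communication retained above boils down to a yampl client socket on
a node-local channel: nextEvent() creates it, getNextRange() reads event
ranges from it, and drainScheduler() sends back range reports of the form
"<file>,ID:<rangeID>,CPU:N/A,WALL:N/A". A standalone sketch of that pattern
(the channel name is hypothetical and error handling is omitted):

    #include <yampl/SocketFactory.h>
    #include <cstdlib>
    #include <cstring>
    #include <string>

    int main() {
      yampl::ISocketFactory* factory = new yampl::SocketFactory();
      // LOCAL = same-node IPC; MOVE_DATA hands buffer ownership to yampl
      yampl::ISocket* socket = factory->createClientSocket(
          yampl::Channel("EventService_EventRanges", yampl::LOCAL), yampl::MOVE_DATA);

      std::string report = "out.pool.root,ID:Range-001,CPU:N/A,WALL:N/A";
      void* msg = malloc(report.size());  // freed by yampl because of MOVE_DATA
      memcpy(msg, report.data(), report.size());
      socket->send(msg, report.size());

      delete socket;
      delete factory;
      return 0;
    }
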
diff --git a/Control/AthenaServices/src/AthenaMtesEventLoopMgr.h b/Control/AthenaServices/src/AthenaMtesEventLoopMgr.h
index c709563c0f0d..b7e559ea0276 100644
--- a/Control/AthenaServices/src/AthenaMtesEventLoopMgr.h
+++ b/Control/AthenaServices/src/AthenaMtesEventLoopMgr.h
@@ -1,7 +1,7 @@
 // -*- C++ -*-
 
 /*
-  Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
+  Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
 */
 
 #ifndef ATHENASERVICES_ATHENAMTESEVENTLOOPMGR_H
@@ -23,6 +23,7 @@
 #include "AthenaKernel/IEventSeek.h"
 #include "AthenaKernel/ICollectionSize.h"
 #include "AthenaKernel/IConditionsCleanerSvc.h"
+#include "AthenaKernel/IHybridProcessorHelper.h"
 #include "StoreGate/ActiveStoreSvc.h"
 
 #include <memory>
@@ -52,6 +53,7 @@ class AthenaMtesEventLoopMgr
   : virtual public IEventSeek,
     virtual public ICollectionSize,
     virtual public IIncidentListener,
+    virtual public IHybridProcessorHelper,
     public MinimalEventLoopMgr,
     public Athena::TimeoutMaster
 {
@@ -167,10 +169,6 @@ protected:
   StatusCode clearWBSlot(int evtSlot);
   /// Declare the root address of the event
   int declareEventRootAddress(EventContext&);
-  /// Create event context
-  virtual EventContext createEventContext() override;
-  /// Drain the scheduler from all actions that may be queued
-  int drainScheduler(int& finishedEvents,yampl::ISocket* socket);
   /// Instance of the incident listener waiting for AbortEvent.
   SmartIF< IIncidentListener > m_abortEventListener;
   /// Name of the scheduler to be used
@@ -201,6 +199,8 @@ public:
   virtual StatusCode finalize() override;
   /// implementation of IAppMgrUI::nextEvent. maxevt==0 returns immediately
   virtual StatusCode nextEvent(int maxevt) override;
+  /// implementation of IEventProcessor::createEventContext()
+  virtual EventContext createEventContext() override;
   /// implementation of IEventProcessor::executeEvent(void* par)
   virtual StatusCode executeEvent( EventContext&& ctx ) override;
   /// implementation of IEventProcessor::executeRun(int maxevt)
@@ -220,6 +220,15 @@ public:
   /// IIncidentListenet interfaces
   virtual void handle(const Incident& inc) override;
 
+  /// Reset the application return code
+  virtual void resetAppReturnCode() override;
+
+  virtual void setCurrentEventNum(int num) override;
+  virtual bool terminateLoop() override;
+
+  /// Drain the scheduler from all actions that may be queued
+  virtual int drainScheduler(int& finishedEvents, bool report) override;
+
   /// interface dispatcher
   virtual StatusCode queryInterface( const InterfaceID& riid, void** ppvInterface ) override;
 
@@ -294,6 +303,9 @@ private:
   // Hopefully a temporary measurement. For the time being we cannot
   // support event ranges from different input files.
   std::string m_pfn{""};
+
+  // For the event service running:
+  yampl::ISocket* m_socket{nullptr};
 };
 
 #endif // ATHENASERVICES_ATHENAHIVEEVENTLOOPMGR_H
-- 
GitLab