diff --git a/Control/AthenaInterprocess/AthenaInterprocess/IMPRunStop.h b/Control/AthenaInterprocess/AthenaInterprocess/IMPRunStop.h new file mode 100644 index 0000000000000000000000000000000000000000..2174c249f2ce5ad626c41eabdd38d400c7b56afd --- /dev/null +++ b/Control/AthenaInterprocess/AthenaInterprocess/IMPRunStop.h @@ -0,0 +1,19 @@ +/* + Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration +*/ + +#ifndef ATHENAINTERPROCESS_IMPRUNSTOP_H +#define ATHENAINTERPROCESS_IMPRUNSTOP_H +#include <GaudiKernel/IInterface.h> +namespace AthenaInterprocess { + class IMPRunStop : virtual public IInterface + { + public: + /// InterfaceID + DeclareInterfaceID( IMPRunStop, 1, 0 ); + + virtual bool stopScheduled() const = 0; + }; +} + +#endif diff --git a/Control/AthenaMP/src/AthMpEvtLoopMgr.cxx b/Control/AthenaMP/src/AthMpEvtLoopMgr.cxx index 6a71bb297b76033d04dd5a79c17c5fe466adcb76..23c36a603b9bf50bb67a80f2ad7acc816ebcdefa 100644 --- a/Control/AthenaMP/src/AthMpEvtLoopMgr.cxx +++ b/Control/AthenaMP/src/AthMpEvtLoopMgr.cxx @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration */ #include "AthMpEvtLoopMgr.h" @@ -320,6 +320,8 @@ StatusCode AthMpEvtLoopMgr::executeRun(int maxevt) if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue; (*it)->useFdsRegistry(registry); (*it)->setRandString(randStream.str()); + (*it)->setMaxEvt(maxevt); + (*it)->setMPRunStop(this); if(it==m_tools.begin()) { incSvc->fireIncident(Incident(name(),"PreFork")); // Do it only once } @@ -386,6 +388,7 @@ StatusCode AthMpEvtLoopMgr::executeRun(int maxevt) StatusCode AthMpEvtLoopMgr::stopRun() { + m_scheduledStop = true; return m_evtProcessor->stopRun(); } diff --git a/Control/AthenaMP/src/AthMpEvtLoopMgr.h b/Control/AthenaMP/src/AthMpEvtLoopMgr.h index a136948686a5e51716024310cc64c1279fed10d0..db584042e0795c7efbdfe22438bc463fb2388715 100644 --- a/Control/AthenaMP/src/AthMpEvtLoopMgr.h +++ b/Control/AthenaMP/src/AthMpEvtLoopMgr.h @@ -9,13 +9,15 @@ #include "AthenaBaseComps/AthService.h" #include "GaudiKernel/ToolHandle.h" #include "AthenaInterprocess/FdsRegistry.h" +#include "AthenaInterprocess/IMPRunStop.h" #include <memory> class IAthenaMPTool; class ISvcLocator; class ATLAS_NOT_THREAD_SAFE AthMpEvtLoopMgr : public extends<AthService, - IEventProcessor> + IEventProcessor, + AthenaInterprocess::IMPRunStop> { public: AthMpEvtLoopMgr(const std::string& name, ISvcLocator* svcLocator); @@ -31,6 +33,8 @@ class ATLAS_NOT_THREAD_SAFE AthMpEvtLoopMgr : public extends<AthService, virtual EventContext createEventContext() override; + virtual bool stopScheduled() const override {return m_scheduledStop;}; + private: ServiceHandle<IEventProcessor> m_evtProcessor; SmartIF<IService> m_evtSelector; @@ -48,6 +52,7 @@ class ATLAS_NOT_THREAD_SAFE AthMpEvtLoopMgr : public extends<AthService, unsigned int m_eventPrintoutInterval; StringArrayProperty m_execAtPreFork; pid_t m_masterPid; + bool m_scheduledStop{false}; // vectors for collecting memory samples std::vector<unsigned long> m_samplesRss; diff --git a/Control/AthenaMPTools/AthenaMPTools/IAthenaMPTool.h b/Control/AthenaMPTools/AthenaMPTools/IAthenaMPTool.h index 487c658e87f2f40ff344c3ee0b6d53801f97b2c5..adce10bd273600a2656ef7fe76f08a7e09b3124c 100644 --- a/Control/AthenaMPTools/AthenaMPTools/IAthenaMPTool.h +++ b/Control/AthenaMPTools/AthenaMPTools/IAthenaMPTool.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration */ #ifndef ATHENAMPTOOLS_IATHENAMPTOOL_H @@ -13,6 +13,10 @@ #include <vector> #include <map> +namespace AthenaInterprocess { + class IMPRunStop; +} + namespace AthenaMP { struct WorkerOutput{ std::string filename; @@ -49,6 +53,8 @@ class IAthenaMPTool : virtual public IAlgTool virtual void useFdsRegistry(std::shared_ptr<AthenaInterprocess::FdsRegistry>) = 0; virtual void setRandString(const std::string& randStr) = 0; + virtual void setMaxEvt(int maxEvt) = 0; + virtual void setMPRunStop(const AthenaInterprocess::IMPRunStop* runStop) = 0; // Brute force: kill all children virtual void killChildren() = 0; diff --git a/Control/AthenaMPTools/src/AthenaMPToolBase.cxx b/Control/AthenaMPTools/src/AthenaMPToolBase.cxx index d6fbaaf57278dbeffc0ddc57c4e090c6f2fd8ee6..ecedfaabac6cdc3e19459cd9fb4a0edaddcfe274 100644 --- a/Control/AthenaMPTools/src/AthenaMPToolBase.cxx +++ b/Control/AthenaMPTools/src/AthenaMPToolBase.cxx @@ -34,6 +34,7 @@ AthenaMPToolBase::AthenaMPToolBase(const std::string& type , const IInterface* parent) : AthAlgTool(type,name,parent) , m_nprocs(-1) + , m_maxEvt(-1) , m_subprocTopDir("") , m_subprocDirPrefix("") // To be set in the derived classes , m_evtSelName("") diff --git a/Control/AthenaMPTools/src/AthenaMPToolBase.h b/Control/AthenaMPTools/src/AthenaMPToolBase.h index 33a467c58450db56b993778500b97e8c4907789b..25b81e8bf37501919179a132d87fe396387a06c4 100644 --- a/Control/AthenaMPTools/src/AthenaMPToolBase.h +++ b/Control/AthenaMPTools/src/AthenaMPToolBase.h @@ -14,6 +14,7 @@ #include "AthenaInterprocess/ProcessGroup.h" #include "AthenaInterprocess/IMessageDecoder.h" +#include "AthenaInterprocess/IMPRunStop.h" #include <filesystem> @@ -41,6 +42,8 @@ class AthenaMPToolBase : public AthAlgTool virtual void useFdsRegistry(std::shared_ptr<AthenaInterprocess::FdsRegistry>) override; virtual void setRandString(const std::string& randStr) override; + virtual void setMaxEvt(int maxEvt) override {m_maxEvt=maxEvt;} + virtual void setMPRunStop(const AthenaInterprocess::IMPRunStop* runStop) override {m_mpRunStop=runStop;} virtual void killChildren() override; @@ -81,11 +84,13 @@ class AthenaMPToolBase : public AthAlgTool IEvtSelector* evtSelector() { return m_evtSelector; } int m_nprocs; // Number of workers spawned by the master process + int m_maxEvt; // Maximum number of events assigned to the job std::string m_subprocTopDir; // Top run directory for subprocesses std::string m_subprocDirPrefix; // For ex. "worker__" std::string m_evtSelName; // Name of the event selector AthenaInterprocess::ProcessGroup* m_processGroup; + const AthenaInterprocess::IMPRunStop* m_mpRunStop{nullptr}; ServiceHandle<IEventProcessor> m_evtProcessor; ServiceHandle<IAppMgrUI> m_appMgr; diff --git a/Control/AthenaMPTools/src/SharedEvtQueueConsumer.cxx b/Control/AthenaMPTools/src/SharedEvtQueueConsumer.cxx index cef8043aa93fdcba465c481050dba74d053feb87..54cdaf05f5d849da666688bcc0fc2594082b3c15 100644 --- a/Control/AthenaMPTools/src/SharedEvtQueueConsumer.cxx +++ b/Control/AthenaMPTools/src/SharedEvtQueueConsumer.cxx @@ -659,7 +659,11 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> SharedEvtQueueConsumer::exec_ all_ok=false; break; } - m_chronoStatSvc->chronoStop("AthenaMP_nextEvent"); + m_chronoStatSvc->chronoStop("AthenaMP_nextEvent"); + if(m_mpRunStop->stopScheduled()) { + ATH_MSG_INFO("Scheduled stop"); + break; + } } fs.close(); } diff --git a/Generators/EvgenProdTools/src/CountHepMC.cxx b/Generators/EvgenProdTools/src/CountHepMC.cxx index 331cb448f8bdc524d5fe305f754a19ad7680581c..43b6c37d1fbb5757e8b66e996082d079d1b63039 100644 --- a/Generators/EvgenProdTools/src/CountHepMC.cxx +++ b/Generators/EvgenProdTools/src/CountHepMC.cxx @@ -266,12 +266,19 @@ StatusCode CountHepMC::execute() { if (m_nPass == m_nCount) { ATH_MSG_INFO("Stopping the event processing...." << m_nPass << "/" << m_nCount); - SmartIF<IEventProcessor> apm(serviceLocator()->service("AthenaEventLoopMgr", /*createIf*/false)); - if (apm) { + // Try the MP ELM first + SmartIF<IEventProcessor> apm(serviceLocator()->service("AthMpEvtLoopMgr", /*createIf*/false)); + if(apm) { ATH_CHECK(apm->stopRun()); } else { - ATH_MSG_WARNING("No EventLoop Manager found "); + apm = serviceLocator()->service("AthenaEventLoopMgr", /*createIf*/false); + if (apm) { + ATH_CHECK(apm->stopRun()); + } + else { + ATH_MSG_WARNING("No EventLoop Manager found "); + } } }