diff --git a/Control/AthenaMPTools/src/EvtRangeProcessor.cxx b/Control/AthenaMPTools/src/EvtRangeProcessor.cxx index d9d8f01cabd84c8626dafea7d8a4e607701a9c51..c931f4f75b3defd948bba1ebd9a9728c810f2eb2 100644 --- a/Control/AthenaMPTools/src/EvtRangeProcessor.cxx +++ b/Control/AthenaMPTools/src/EvtRangeProcessor.cxx @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ #include "EvtRangeProcessor.h" @@ -47,6 +47,7 @@ EvtRangeProcessor::EvtRangeProcessor(const std::string& type , m_isPileup(false) , m_rankId(-1) , m_nEventsBeforeFork(0) + , m_activeWorkers(0) , m_inpFile("") , m_chronoStatSvc("ChronoStatSvc", name) , m_incidentSvc("IncidentSvc", name) @@ -120,6 +121,7 @@ int EvtRangeProcessor::makePool(int, int nprocs, const std::string& topdir) } m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs); + m_activeWorkers = m_nprocs; m_subprocTopDir = topdir; // Create rank queue and fill it @@ -236,6 +238,15 @@ StatusCode EvtRangeProcessor::wait_once(pid_t& pid) return StatusCode::FAILURE; } } + else { + // The worker finished successfully and it was the last worker. Release the Event Range Scatterer + if(--m_activeWorkers==0 + && !m_sharedFailedPidQueue->send_basic<pid_t>(-1)) { + // To Do: how to report this error to the pilot? + ATH_MSG_ERROR("Failed to release the Event Range Scatterer"); + return StatusCode::FAILURE; + } + } // Erase the pid from m_procStates map m_procStates.erase(itProcState); diff --git a/Control/AthenaMPTools/src/EvtRangeProcessor.h b/Control/AthenaMPTools/src/EvtRangeProcessor.h index 82b8268489d2fa780bf2968fdd25662379bbff50..ea997db121234623251205a380860d7d6ef76266 100644 --- a/Control/AthenaMPTools/src/EvtRangeProcessor.h +++ b/Control/AthenaMPTools/src/EvtRangeProcessor.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ #ifndef ATHENAMPTOOLS_EVTRANGEPROCESSOR_H @@ -66,6 +66,7 @@ class EvtRangeProcessor final : public AthenaMPToolBase bool m_isPileup; // Are we doing pile-up digitization? int m_rankId; // Each worker has its own unique RankID from the range (0,...,m_nprocs-1) int m_nEventsBeforeFork; + int m_activeWorkers; // Keep track of the number of workers std::string m_inpFile; // Cached name of the input file. To avoid reopening ServiceHandle<IChronoStatSvc> m_chronoStatSvc; diff --git a/Control/AthenaMPTools/src/EvtRangeScatterer.cxx b/Control/AthenaMPTools/src/EvtRangeScatterer.cxx index 0c3da82ad342ddadc5605c62fe4293011c84b817..e40a15b132da6e9ebd54ea19741b681e11280d68 100644 --- a/Control/AthenaMPTools/src/EvtRangeScatterer.cxx +++ b/Control/AthenaMPTools/src/EvtRangeScatterer.cxx @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ #include "EvtRangeScatterer.h" @@ -23,8 +23,8 @@ #include <cstdlib> EvtRangeScatterer::EvtRangeScatterer(const std::string& type - , const std::string& name - , const IInterface* parent) + , const std::string& name + , const IInterface* parent) : AthenaMPToolBase(type,name,parent) , m_processorChannel("") , m_eventRangeChannel("") @@ -212,6 +212,7 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func( std::string strReady("Ready for events"); std::string strStopProcessing("No more events"); std::string processorWaitRequest(""); + int workerPid{-1}; ATH_MSG_INFO("Starting main loop"); @@ -225,6 +226,12 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func( usleep(1000); } ATH_MSG_INFO("One of the processors is ready for the next range"); + // Get PID from the request and Update m_pid2RangeID + workerPid = std::atoi(processorWaitRequest.c_str()); + auto it = m_pid2RangeID.find(workerPid); + if(it!=m_pid2RangeID.end()) { + m_pid2RangeID.erase(it); + } } // Signal the Pilot that AthenaMP is ready for event processing @@ -239,7 +246,7 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func( eventRange = eventRange.substr(0,carRet); // Break the loop if no more ranges are expected - if(eventRange.compare(strStopProcessing)==0) { + if(eventRange.find(strStopProcessing)!=std::string::npos) { ATH_MSG_INFO("Stopped the loop. Last message from the Event Range Channel: " << eventRange); break; } @@ -335,6 +342,12 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func( usleep(1000); } ATH_MSG_INFO("One of the processors is ready for the next range"); + // Get PID from the request and Update m_pid2RangeID + workerPid = std::atoi(processorWaitRequest.c_str()); + auto it = m_pid2RangeID.find(workerPid); + if(it!=m_pid2RangeID.end()) { + m_pid2RangeID.erase(it); + } } // Send to the Processor: RangeID,evtToken[,evtToken] @@ -344,8 +357,7 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func( procReportPending++; // Get PID from the request and Update m_pid2RangeID - int pid = std::atoi(processorWaitRequest.c_str()); - m_pid2RangeID[pid] = rangeID; + m_pid2RangeID[workerPid] = rangeID; processorWaitRequest.clear(); ATH_MSG_INFO("Sent response to the processor : " << message2ProcessorStr); @@ -360,29 +372,32 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func( // We already have one processor waiting for the answer emptyMess4Processor = malloc(1); socket2Processor->send(emptyMess4Processor,1); - ATH_MSG_INFO("Set one processor free"); + ATH_MSG_INFO("Set worker PID=" << workerPid << " free"); + processorWaitRequest.clear(); } - for(int i(0); i<(processorWaitRequest.empty()?m_nprocs:m_nprocs-1); ++i) { + bool endLoop{false}; + while(true) { ATH_MSG_DEBUG("Going to set another processor free"); - while(getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending).empty()) { - pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending); + while(processorWaitRequest.empty()) { + processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending); + if(pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending)==-1) { + endLoop = true; + break; + } usleep(1000); } + if(endLoop) break; + // Remove worker from m_pid2RangeID + workerPid = std::atoi(processorWaitRequest.c_str()); + auto it = m_pid2RangeID.find(workerPid); + if(it!=m_pid2RangeID.end()) { + m_pid2RangeID.erase(it); + } emptyMess4Processor = malloc(1); socket2Processor->send(emptyMess4Processor,1); - ATH_MSG_INFO("Set one processor free"); - } - - ATH_MSG_INFO("Still " << procReportPending << " pending reports"); - - // Final round of colecting output file names from processors - while(procReportPending>0) { - std::string strProcessorRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending); - if(!strProcessorRequest.empty()) { - ATH_MSG_WARNING("Unexpected message received from a processor at this stage : " << strProcessorRequest); - } - pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending); - usleep(1000); + ATH_MSG_INFO("Set worker PID=" << workerPid << " free"); + ATH_MSG_INFO("Still " << procReportPending << " pending reports"); + processorWaitRequest.clear(); } } @@ -509,22 +524,26 @@ std::string EvtRangeScatterer::getNewRangeRequest(yampl::ISocket* socket2Process return strProcessorRequest; } -void EvtRangeScatterer::pollFailedPidQueue(AthenaInterprocess::SharedQueue* sharedFailedPidQueue - , yampl::ISocket* socket2Pilot - , int& procReportPending) +pid_t EvtRangeScatterer::pollFailedPidQueue(AthenaInterprocess::SharedQueue* sharedFailedPidQueue + , yampl::ISocket* socket2Pilot + , int& procReportPending) { - pid_t pid; - if(sharedFailedPidQueue->try_receive_basic<pid_t>(pid)) { + pid_t pid{0}; + if(sharedFailedPidQueue->try_receive_basic<pid_t>(pid) + && pid!=-1) { ATH_MSG_INFO("Procesor with PID=" << pid << " has failed!"); - if(m_pid2RangeID.find(pid)!=m_pid2RangeID.end()) { + auto itPid = m_pid2RangeID.find(pid); + if(itPid!=m_pid2RangeID.end()) { ATH_MSG_WARNING("The failed RangeID = " << m_pid2RangeID[pid] << " will be reported to Pilot"); std::string errorStr("ERR_ATHENAMP_PROCESS " + m_pid2RangeID[pid] + ": Failed to process event range"); void* errorMessage = malloc(errorStr.size()); memcpy(errorMessage,errorStr.data(),errorStr.size()); socket2Pilot->send(errorMessage,errorStr.size()); + --procReportPending; + m_pid2RangeID.erase(pid); } - procReportPending--; ATH_MSG_INFO("Reports pending: " << procReportPending); } + return pid; } diff --git a/Control/AthenaMPTools/src/EvtRangeScatterer.h b/Control/AthenaMPTools/src/EvtRangeScatterer.h index b7b40e03f13db840ddbb7cca59005f2a15a9dca7..2b8a617e6dc3c5db5ae2d8d6a452f77660fa66c2 100644 --- a/Control/AthenaMPTools/src/EvtRangeScatterer.h +++ b/Control/AthenaMPTools/src/EvtRangeScatterer.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration */ #ifndef ATHENAMPTOOLS_TOKENSCATTERER_H @@ -56,9 +56,9 @@ class EvtRangeScatterer final : public AthenaMPToolBase , int& procReportPending); // Poll the failed PID queue to see if any of the Processors has failed - void pollFailedPidQueue(AthenaInterprocess::SharedQueue* sharedFailedPidQueue - , yampl::ISocket* socket2Pilot - , int& procReportPending); + pid_t pollFailedPidQueue(AthenaInterprocess::SharedQueue* sharedFailedPidQueue + , yampl::ISocket* socket2Pilot + , int& procReportPending); StringProperty m_processorChannel; StringProperty m_eventRangeChannel;