Skip to content
Snippets Groups Projects
Commit 31216d3a authored by Johannes Elmsheuser
Browse files

Merge branch 'master-mpes' into 'master'

Bugfix for the Event Service

See merge request atlas/athena!39410
parents 01df9603 f2a79b01
No related branches found
No related tags found
No related merge requests found
/*
Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
*/
#include "EvtRangeProcessor.h"
......@@ -47,6 +47,7 @@ EvtRangeProcessor::EvtRangeProcessor(const std::string& type
, m_isPileup(false)
, m_rankId(-1)
, m_nEventsBeforeFork(0)
, m_activeWorkers(0)
, m_inpFile("")
, m_chronoStatSvc("ChronoStatSvc", name)
, m_incidentSvc("IncidentSvc", name)
......@@ -120,6 +121,7 @@ int EvtRangeProcessor::makePool(int, int nprocs, const std::string& topdir)
}
m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
m_activeWorkers = m_nprocs;
m_subprocTopDir = topdir;
// Create rank queue and fill it
......@@ -236,6 +238,15 @@ StatusCode EvtRangeProcessor::wait_once(pid_t& pid)
return StatusCode::FAILURE;
}
}
else {
// The worker finished successfully. If it was the last active worker, release the Event Range Scatterer
if(--m_activeWorkers==0
&& !m_sharedFailedPidQueue->send_basic<pid_t>(-1)) {
// To Do: how to report this error to the pilot?
ATH_MSG_ERROR("Failed to release the Event Range Scatterer");
return StatusCode::FAILURE;
}
}
// Erase the pid from m_procStates map
m_procStates.erase(itProcState);
......
/*
Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration
Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
*/
#ifndef ATHENAMPTOOLS_EVTRANGEPROCESSOR_H
......@@ -66,6 +66,7 @@ class EvtRangeProcessor final : public AthenaMPToolBase
bool m_isPileup; // Are we doing pile-up digitization?
int m_rankId; // Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
int m_nEventsBeforeFork;
int m_activeWorkers; // Keep track of the number of workers
std::string m_inpFile; // Cached name of the input file. To avoid reopening
ServiceHandle<IChronoStatSvc> m_chronoStatSvc;
......
/*
Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
*/
#include "EvtRangeScatterer.h"
......@@ -23,8 +23,8 @@
#include <cstdlib>
EvtRangeScatterer::EvtRangeScatterer(const std::string& type
, const std::string& name
, const IInterface* parent)
, const std::string& name
, const IInterface* parent)
: AthenaMPToolBase(type,name,parent)
, m_processorChannel("")
, m_eventRangeChannel("")
......@@ -212,6 +212,7 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
std::string strReady("Ready for events");
std::string strStopProcessing("No more events");
std::string processorWaitRequest("");
int workerPid{-1};
ATH_MSG_INFO("Starting main loop");
......@@ -225,6 +226,12 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
usleep(1000);
}
ATH_MSG_INFO("One of the processors is ready for the next range");
// Get PID from the request and Update m_pid2RangeID
workerPid = std::atoi(processorWaitRequest.c_str());
auto it = m_pid2RangeID.find(workerPid);
if(it!=m_pid2RangeID.end()) {
m_pid2RangeID.erase(it);
}
}
// Signal the Pilot that AthenaMP is ready for event processing
......@@ -239,7 +246,7 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
eventRange = eventRange.substr(0,carRet);
// Break the loop if no more ranges are expected
if(eventRange.compare(strStopProcessing)==0) {
if(eventRange.find(strStopProcessing)!=std::string::npos) {
ATH_MSG_INFO("Stopped the loop. Last message from the Event Range Channel: " << eventRange);
break;
}
......@@ -335,6 +342,12 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
usleep(1000);
}
ATH_MSG_INFO("One of the processors is ready for the next range");
// Get PID from the request and Update m_pid2RangeID
workerPid = std::atoi(processorWaitRequest.c_str());
auto it = m_pid2RangeID.find(workerPid);
if(it!=m_pid2RangeID.end()) {
m_pid2RangeID.erase(it);
}
}
// Send to the Processor: RangeID,evtToken[,evtToken]
......@@ -344,8 +357,7 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
procReportPending++;
// Get PID from the request and Update m_pid2RangeID
int pid = std::atoi(processorWaitRequest.c_str());
m_pid2RangeID[pid] = rangeID;
m_pid2RangeID[workerPid] = rangeID;
processorWaitRequest.clear();
ATH_MSG_INFO("Sent response to the processor : " << message2ProcessorStr);
......@@ -360,29 +372,32 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
// We already have one processor waiting for the answer
emptyMess4Processor = malloc(1);
socket2Processor->send(emptyMess4Processor,1);
ATH_MSG_INFO("Set one processor free");
ATH_MSG_INFO("Set worker PID=" << workerPid << " free");
processorWaitRequest.clear();
}
for(int i(0); i<(processorWaitRequest.empty()?m_nprocs:m_nprocs-1); ++i) {
bool endLoop{false};
while(true) {
ATH_MSG_DEBUG("Going to set another processor free");
while(getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending).empty()) {
pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending);
while(processorWaitRequest.empty()) {
processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
if(pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending)==-1) {
endLoop = true;
break;
}
usleep(1000);
}
if(endLoop) break;
// Remove worker from m_pid2RangeID
workerPid = std::atoi(processorWaitRequest.c_str());
auto it = m_pid2RangeID.find(workerPid);
if(it!=m_pid2RangeID.end()) {
m_pid2RangeID.erase(it);
}
emptyMess4Processor = malloc(1);
socket2Processor->send(emptyMess4Processor,1);
ATH_MSG_INFO("Set one processor free");
}
ATH_MSG_INFO("Still " << procReportPending << " pending reports");
// Final round of collecting output file names from processors
while(procReportPending>0) {
std::string strProcessorRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
if(!strProcessorRequest.empty()) {
ATH_MSG_WARNING("Unexpected message received from a processor at this stage : " << strProcessorRequest);
}
pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending);
usleep(1000);
ATH_MSG_INFO("Set worker PID=" << workerPid << " free");
ATH_MSG_INFO("Still " << procReportPending << " pending reports");
processorWaitRequest.clear();
}
}
......@@ -509,22 +524,26 @@ std::string EvtRangeScatterer::getNewRangeRequest(yampl::ISocket* socket2Process
return strProcessorRequest;
}
void EvtRangeScatterer::pollFailedPidQueue(AthenaInterprocess::SharedQueue* sharedFailedPidQueue
, yampl::ISocket* socket2Pilot
, int& procReportPending)
pid_t EvtRangeScatterer::pollFailedPidQueue(AthenaInterprocess::SharedQueue* sharedFailedPidQueue
, yampl::ISocket* socket2Pilot
, int& procReportPending)
{
pid_t pid;
if(sharedFailedPidQueue->try_receive_basic<pid_t>(pid)) {
pid_t pid{0};
if(sharedFailedPidQueue->try_receive_basic<pid_t>(pid)
&& pid!=-1) {
ATH_MSG_INFO("Procesor with PID=" << pid << " has failed!");
if(m_pid2RangeID.find(pid)!=m_pid2RangeID.end()) {
auto itPid = m_pid2RangeID.find(pid);
if(itPid!=m_pid2RangeID.end()) {
ATH_MSG_WARNING("The failed RangeID = " << m_pid2RangeID[pid] << " will be reported to Pilot");
std::string errorStr("ERR_ATHENAMP_PROCESS " + m_pid2RangeID[pid] + ": Failed to process event range");
void* errorMessage = malloc(errorStr.size());
memcpy(errorMessage,errorStr.data(),errorStr.size());
socket2Pilot->send(errorMessage,errorStr.size());
--procReportPending;
m_pid2RangeID.erase(pid);
}
procReportPending--;
ATH_MSG_INFO("Reports pending: " << procReportPending);
}
return pid;
}
/*
Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration
Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
*/
#ifndef ATHENAMPTOOLS_TOKENSCATTERER_H
......@@ -56,9 +56,9 @@ class EvtRangeScatterer final : public AthenaMPToolBase
, int& procReportPending);
// Poll the failed PID queue to see if any of the Processors has failed
void pollFailedPidQueue(AthenaInterprocess::SharedQueue* sharedFailedPidQueue
, yampl::ISocket* socket2Pilot
, int& procReportPending);
pid_t pollFailedPidQueue(AthenaInterprocess::SharedQueue* sharedFailedPidQueue
, yampl::ISocket* socket2Pilot
, int& procReportPending);
StringProperty m_processorChannel;
StringProperty m_eventRangeChannel;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment