Skip to content
Snippets Groups Projects
Commit f2a79b01 authored by Vakhtang Tsulaia
Browse files

Bugfix for the Event Service

Bugfix in the mechanism of handling failed MP workers in the Event Service.
This patch covers the case when a worker fails to transition from the event
processing state into the finalization state after receiving the "No more events"
message from the range scatterer. The idea here is that instead of releasing
a fixed number of workers (nprocs), the scatterer will keep releasing the
workers until it gets a signal that all of them have finished.
parent 99521457
No related branches found
No related tags found
6 merge requests: !58791 DataQualityConfigurations: Modify L1Calo config for web display; !46784 MuonCondInterface: Enable thread-safety checking.; !46776 Updated LArMonitoring config file for WD to match new files produced using MT; !45405 updated ART test cron job; !42417 Draft: DIRE and VINCIA Base Fragments for Pythia 8.3; !39410 Bugfix for the Event Service
/* /*
Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
*/ */
#include "EvtRangeProcessor.h" #include "EvtRangeProcessor.h"
...@@ -47,6 +47,7 @@ EvtRangeProcessor::EvtRangeProcessor(const std::string& type ...@@ -47,6 +47,7 @@ EvtRangeProcessor::EvtRangeProcessor(const std::string& type
, m_isPileup(false) , m_isPileup(false)
, m_rankId(-1) , m_rankId(-1)
, m_nEventsBeforeFork(0) , m_nEventsBeforeFork(0)
, m_activeWorkers(0)
, m_inpFile("") , m_inpFile("")
, m_chronoStatSvc("ChronoStatSvc", name) , m_chronoStatSvc("ChronoStatSvc", name)
, m_incidentSvc("IncidentSvc", name) , m_incidentSvc("IncidentSvc", name)
...@@ -120,6 +121,7 @@ int EvtRangeProcessor::makePool(int, int nprocs, const std::string& topdir) ...@@ -120,6 +121,7 @@ int EvtRangeProcessor::makePool(int, int nprocs, const std::string& topdir)
} }
m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs); m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
m_activeWorkers = m_nprocs;
m_subprocTopDir = topdir; m_subprocTopDir = topdir;
// Create rank queue and fill it // Create rank queue and fill it
...@@ -236,6 +238,15 @@ StatusCode EvtRangeProcessor::wait_once(pid_t& pid) ...@@ -236,6 +238,15 @@ StatusCode EvtRangeProcessor::wait_once(pid_t& pid)
return StatusCode::FAILURE; return StatusCode::FAILURE;
} }
} }
else {
// The worker finished successfully and it was the last worker. Release the Event Range Scatterer
if(--m_activeWorkers==0
&& !m_sharedFailedPidQueue->send_basic<pid_t>(-1)) {
// To Do: how to report this error to the pilot?
ATH_MSG_ERROR("Failed to release the Event Range Scatterer");
return StatusCode::FAILURE;
}
}
// Erase the pid from m_procStates map // Erase the pid from m_procStates map
m_procStates.erase(itProcState); m_procStates.erase(itProcState);
......
/* /*
Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
*/ */
#ifndef ATHENAMPTOOLS_EVTRANGEPROCESSOR_H #ifndef ATHENAMPTOOLS_EVTRANGEPROCESSOR_H
...@@ -66,6 +66,7 @@ class EvtRangeProcessor final : public AthenaMPToolBase ...@@ -66,6 +66,7 @@ class EvtRangeProcessor final : public AthenaMPToolBase
bool m_isPileup; // Are we doing pile-up digitization? bool m_isPileup; // Are we doing pile-up digitization?
int m_rankId; // Each worker has its own unique RankID from the range (0,...,m_nprocs-1) int m_rankId; // Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
int m_nEventsBeforeFork; int m_nEventsBeforeFork;
int m_activeWorkers; // Keep track of the number of workers
std::string m_inpFile; // Cached name of the input file. To avoid reopening std::string m_inpFile; // Cached name of the input file. To avoid reopening
ServiceHandle<IChronoStatSvc> m_chronoStatSvc; ServiceHandle<IChronoStatSvc> m_chronoStatSvc;
......
/* /*
Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
*/ */
#include "EvtRangeScatterer.h" #include "EvtRangeScatterer.h"
...@@ -23,8 +23,8 @@ ...@@ -23,8 +23,8 @@
#include <cstdlib> #include <cstdlib>
EvtRangeScatterer::EvtRangeScatterer(const std::string& type EvtRangeScatterer::EvtRangeScatterer(const std::string& type
, const std::string& name , const std::string& name
, const IInterface* parent) , const IInterface* parent)
: AthenaMPToolBase(type,name,parent) : AthenaMPToolBase(type,name,parent)
, m_processorChannel("") , m_processorChannel("")
, m_eventRangeChannel("") , m_eventRangeChannel("")
...@@ -212,6 +212,7 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func( ...@@ -212,6 +212,7 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
std::string strReady("Ready for events"); std::string strReady("Ready for events");
std::string strStopProcessing("No more events"); std::string strStopProcessing("No more events");
std::string processorWaitRequest(""); std::string processorWaitRequest("");
int workerPid{-1};
ATH_MSG_INFO("Starting main loop"); ATH_MSG_INFO("Starting main loop");
...@@ -225,6 +226,12 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func( ...@@ -225,6 +226,12 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
usleep(1000); usleep(1000);
} }
ATH_MSG_INFO("One of the processors is ready for the next range"); ATH_MSG_INFO("One of the processors is ready for the next range");
// Get PID from the request and Update m_pid2RangeID
workerPid = std::atoi(processorWaitRequest.c_str());
auto it = m_pid2RangeID.find(workerPid);
if(it!=m_pid2RangeID.end()) {
m_pid2RangeID.erase(it);
}
} }
// Signal the Pilot that AthenaMP is ready for event processing // Signal the Pilot that AthenaMP is ready for event processing
...@@ -239,7 +246,7 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func( ...@@ -239,7 +246,7 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
eventRange = eventRange.substr(0,carRet); eventRange = eventRange.substr(0,carRet);
// Break the loop if no more ranges are expected // Break the loop if no more ranges are expected
if(eventRange.compare(strStopProcessing)==0) { if(eventRange.find(strStopProcessing)!=std::string::npos) {
ATH_MSG_INFO("Stopped the loop. Last message from the Event Range Channel: " << eventRange); ATH_MSG_INFO("Stopped the loop. Last message from the Event Range Channel: " << eventRange);
break; break;
} }
...@@ -335,6 +342,12 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func( ...@@ -335,6 +342,12 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
usleep(1000); usleep(1000);
} }
ATH_MSG_INFO("One of the processors is ready for the next range"); ATH_MSG_INFO("One of the processors is ready for the next range");
// Get PID from the request and Update m_pid2RangeID
workerPid = std::atoi(processorWaitRequest.c_str());
auto it = m_pid2RangeID.find(workerPid);
if(it!=m_pid2RangeID.end()) {
m_pid2RangeID.erase(it);
}
} }
// Send to the Processor: RangeID,evtToken[,evtToken] // Send to the Processor: RangeID,evtToken[,evtToken]
...@@ -344,8 +357,7 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func( ...@@ -344,8 +357,7 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
procReportPending++; procReportPending++;
// Get PID from the request and Update m_pid2RangeID // Get PID from the request and Update m_pid2RangeID
int pid = std::atoi(processorWaitRequest.c_str()); m_pid2RangeID[workerPid] = rangeID;
m_pid2RangeID[pid] = rangeID;
processorWaitRequest.clear(); processorWaitRequest.clear();
ATH_MSG_INFO("Sent response to the processor : " << message2ProcessorStr); ATH_MSG_INFO("Sent response to the processor : " << message2ProcessorStr);
...@@ -360,29 +372,32 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func( ...@@ -360,29 +372,32 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
// We already have one processor waiting for the answer // We already have one processor waiting for the answer
emptyMess4Processor = malloc(1); emptyMess4Processor = malloc(1);
socket2Processor->send(emptyMess4Processor,1); socket2Processor->send(emptyMess4Processor,1);
ATH_MSG_INFO("Set one processor free"); ATH_MSG_INFO("Set worker PID=" << workerPid << " free");
processorWaitRequest.clear();
} }
for(int i(0); i<(processorWaitRequest.empty()?m_nprocs:m_nprocs-1); ++i) { bool endLoop{false};
while(true) {
ATH_MSG_DEBUG("Going to set another processor free"); ATH_MSG_DEBUG("Going to set another processor free");
while(getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending).empty()) { while(processorWaitRequest.empty()) {
pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending); processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
if(pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending)==-1) {
endLoop = true;
break;
}
usleep(1000); usleep(1000);
} }
if(endLoop) break;
// Remove worker from m_pid2RangeID
workerPid = std::atoi(processorWaitRequest.c_str());
auto it = m_pid2RangeID.find(workerPid);
if(it!=m_pid2RangeID.end()) {
m_pid2RangeID.erase(it);
}
emptyMess4Processor = malloc(1); emptyMess4Processor = malloc(1);
socket2Processor->send(emptyMess4Processor,1); socket2Processor->send(emptyMess4Processor,1);
ATH_MSG_INFO("Set one processor free"); ATH_MSG_INFO("Set worker PID=" << workerPid << " free");
} ATH_MSG_INFO("Still " << procReportPending << " pending reports");
processorWaitRequest.clear();
ATH_MSG_INFO("Still " << procReportPending << " pending reports");
// Final round of colecting output file names from processors
while(procReportPending>0) {
std::string strProcessorRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
if(!strProcessorRequest.empty()) {
ATH_MSG_WARNING("Unexpected message received from a processor at this stage : " << strProcessorRequest);
}
pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending);
usleep(1000);
} }
} }
...@@ -509,22 +524,26 @@ std::string EvtRangeScatterer::getNewRangeRequest(yampl::ISocket* socket2Process ...@@ -509,22 +524,26 @@ std::string EvtRangeScatterer::getNewRangeRequest(yampl::ISocket* socket2Process
return strProcessorRequest; return strProcessorRequest;
} }
void EvtRangeScatterer::pollFailedPidQueue(AthenaInterprocess::SharedQueue* sharedFailedPidQueue pid_t EvtRangeScatterer::pollFailedPidQueue(AthenaInterprocess::SharedQueue* sharedFailedPidQueue
, yampl::ISocket* socket2Pilot , yampl::ISocket* socket2Pilot
, int& procReportPending) , int& procReportPending)
{ {
pid_t pid; pid_t pid{0};
if(sharedFailedPidQueue->try_receive_basic<pid_t>(pid)) { if(sharedFailedPidQueue->try_receive_basic<pid_t>(pid)
&& pid!=-1) {
ATH_MSG_INFO("Procesor with PID=" << pid << " has failed!"); ATH_MSG_INFO("Procesor with PID=" << pid << " has failed!");
if(m_pid2RangeID.find(pid)!=m_pid2RangeID.end()) { auto itPid = m_pid2RangeID.find(pid);
if(itPid!=m_pid2RangeID.end()) {
ATH_MSG_WARNING("The failed RangeID = " << m_pid2RangeID[pid] << " will be reported to Pilot"); ATH_MSG_WARNING("The failed RangeID = " << m_pid2RangeID[pid] << " will be reported to Pilot");
std::string errorStr("ERR_ATHENAMP_PROCESS " + m_pid2RangeID[pid] + ": Failed to process event range"); std::string errorStr("ERR_ATHENAMP_PROCESS " + m_pid2RangeID[pid] + ": Failed to process event range");
void* errorMessage = malloc(errorStr.size()); void* errorMessage = malloc(errorStr.size());
memcpy(errorMessage,errorStr.data(),errorStr.size()); memcpy(errorMessage,errorStr.data(),errorStr.size());
socket2Pilot->send(errorMessage,errorStr.size()); socket2Pilot->send(errorMessage,errorStr.size());
--procReportPending;
m_pid2RangeID.erase(pid);
} }
procReportPending--;
ATH_MSG_INFO("Reports pending: " << procReportPending); ATH_MSG_INFO("Reports pending: " << procReportPending);
} }
return pid;
} }
/* /*
Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
*/ */
#ifndef ATHENAMPTOOLS_TOKENSCATTERER_H #ifndef ATHENAMPTOOLS_TOKENSCATTERER_H
...@@ -56,9 +56,9 @@ class EvtRangeScatterer final : public AthenaMPToolBase ...@@ -56,9 +56,9 @@ class EvtRangeScatterer final : public AthenaMPToolBase
, int& procReportPending); , int& procReportPending);
// Poll the failed PID queue to see if any of the Processors has failed // Poll the failed PID queue to see if any of the Processors has failed
void pollFailedPidQueue(AthenaInterprocess::SharedQueue* sharedFailedPidQueue pid_t pollFailedPidQueue(AthenaInterprocess::SharedQueue* sharedFailedPidQueue
, yampl::ISocket* socket2Pilot , yampl::ISocket* socket2Pilot
, int& procReportPending); , int& procReportPending);
StringProperty m_processorChannel; StringProperty m_processorChannel;
StringProperty m_eventRangeChannel; StringProperty m_eventRangeChannel;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment