From f2a79b01bea84a7b10b66afd350bd7da7a446f9b Mon Sep 17 00:00:00 2001
From: Vakho Tsulaia <vakhtang.tsulaia@cern.ch>
Date: Thu, 24 Dec 2020 08:37:17 +0100
Subject: [PATCH] Bugfix for the Event Service

Bugfix in the mechanism for handling failed MP workers in the Event Service.
This patch covers the case where a worker fails to transition from the event
processing state into the finalization state after receiving the "No more
events" message from the range scatterer. Instead of releasing a fixed number
of workers (nprocs), the scatterer now keeps releasing workers until it is
signalled that all of them have finished.
---
 .../AthenaMPTools/src/EvtRangeProcessor.cxx   | 13 ++-
 Control/AthenaMPTools/src/EvtRangeProcessor.h |  3 +-
 .../AthenaMPTools/src/EvtRangeScatterer.cxx   | 79 ++++++++++++-------
 Control/AthenaMPTools/src/EvtRangeScatterer.h |  8 +-
 4 files changed, 67 insertions(+), 36 deletions(-)

diff --git a/Control/AthenaMPTools/src/EvtRangeProcessor.cxx b/Control/AthenaMPTools/src/EvtRangeProcessor.cxx
index d9d8f01cabd8..c931f4f75b3d 100644
--- a/Control/AthenaMPTools/src/EvtRangeProcessor.cxx
+++ b/Control/AthenaMPTools/src/EvtRangeProcessor.cxx
@@ -1,5 +1,5 @@
 /*
-  Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
+  Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
 */
 
 #include "EvtRangeProcessor.h"
@@ -47,6 +47,7 @@ EvtRangeProcessor::EvtRangeProcessor(const std::string& type
   , m_isPileup(false)
   , m_rankId(-1)
   , m_nEventsBeforeFork(0)
+  , m_activeWorkers(0)
   , m_inpFile("")
   , m_chronoStatSvc("ChronoStatSvc", name)
   , m_incidentSvc("IncidentSvc", name)
@@ -120,6 +121,7 @@ int EvtRangeProcessor::makePool(int, int nprocs, const std::string& topdir)
   }
 
   m_nprocs = (nprocs==-1?sysconf(_SC_NPROCESSORS_ONLN):nprocs);
+  m_activeWorkers = m_nprocs;
   m_subprocTopDir = topdir;
 
   // Create rank queue and fill it
@@ -236,6 +238,15 @@ StatusCode EvtRangeProcessor::wait_once(pid_t& pid)
 	return StatusCode::FAILURE;
       }
     }
+    else {
+      // The worker finished successfully. If it was the last active worker, release the Event Range Scatterer
+      if(--m_activeWorkers==0
+	 && !m_sharedFailedPidQueue->send_basic<pid_t>(-1)) {
+	// To Do: how to report this error to the pilot?
+	ATH_MSG_ERROR("Failed to release the Event Range Scatterer");
+	return StatusCode::FAILURE;
+      }
+    }
 
     // Erase the pid from m_procStates map
     m_procStates.erase(itProcState);
diff --git a/Control/AthenaMPTools/src/EvtRangeProcessor.h b/Control/AthenaMPTools/src/EvtRangeProcessor.h
index 82b8268489d2..ea997db12123 100644
--- a/Control/AthenaMPTools/src/EvtRangeProcessor.h
+++ b/Control/AthenaMPTools/src/EvtRangeProcessor.h
@@ -1,5 +1,5 @@
 /*
-  Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration
+  Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
 */
 
 #ifndef ATHENAMPTOOLS_EVTRANGEPROCESSOR_H
@@ -66,6 +66,7 @@ class EvtRangeProcessor final : public AthenaMPToolBase
   bool m_isPileup;        // Are we doing pile-up digitization?
   int  m_rankId;          // Each worker has its own unique RankID from the range (0,...,m_nprocs-1) 
   int  m_nEventsBeforeFork;
+  int  m_activeWorkers;   // Keep track of the number of workers
   std::string m_inpFile;  // Cached name of the input file. To avoid reopening
 
   ServiceHandle<IChronoStatSvc>     m_chronoStatSvc;
diff --git a/Control/AthenaMPTools/src/EvtRangeScatterer.cxx b/Control/AthenaMPTools/src/EvtRangeScatterer.cxx
index 0c3da82ad342..e40a15b132da 100644
--- a/Control/AthenaMPTools/src/EvtRangeScatterer.cxx
+++ b/Control/AthenaMPTools/src/EvtRangeScatterer.cxx
@@ -1,5 +1,5 @@
 /*
-  Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
+  Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
 */
 
 #include "EvtRangeScatterer.h"
@@ -23,8 +23,8 @@
 #include <cstdlib>
 
 EvtRangeScatterer::EvtRangeScatterer(const std::string& type
-			       , const std::string& name
-			       , const IInterface* parent)
+				     , const std::string& name
+				     , const IInterface* parent)
   : AthenaMPToolBase(type,name,parent)
   , m_processorChannel("")
   , m_eventRangeChannel("")
@@ -212,6 +212,7 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
   std::string strReady("Ready for events");
   std::string strStopProcessing("No more events");
   std::string processorWaitRequest("");
+  int workerPid{-1};
 
   ATH_MSG_INFO("Starting main loop");
 
@@ -225,6 +226,12 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
 	usleep(1000);
       }
       ATH_MSG_INFO("One of the processors is ready for the next range");
+      // Get the worker PID from the request and remove any existing entry for it from m_pid2RangeID
+      workerPid = std::atoi(processorWaitRequest.c_str());
+      auto it = m_pid2RangeID.find(workerPid);
+      if(it!=m_pid2RangeID.end()) {
+	m_pid2RangeID.erase(it);
+      }
     }    
 
     // Signal the Pilot that AthenaMP is ready for event processing
@@ -239,7 +246,7 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
       eventRange = eventRange.substr(0,carRet);
 
     // Break the loop if no more ranges are expected
-    if(eventRange.compare(strStopProcessing)==0) {
+    if(eventRange.find(strStopProcessing)!=std::string::npos) {
       ATH_MSG_INFO("Stopped the loop. Last message from the Event Range Channel: " << eventRange);
       break;
     }
@@ -335,6 +342,12 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
 	usleep(1000);
       }
       ATH_MSG_INFO("One of the processors is ready for the next range");
+      // Get the worker PID from the request and remove any existing entry for it from m_pid2RangeID
+      workerPid = std::atoi(processorWaitRequest.c_str());
+      auto it = m_pid2RangeID.find(workerPid);
+      if(it!=m_pid2RangeID.end()) {
+	m_pid2RangeID.erase(it);
+      }
     }
     
     // Send to the Processor: RangeID,evtToken[,evtToken] 
@@ -344,8 +357,7 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
     procReportPending++;
 
     // Get PID from the request and Update m_pid2RangeID
-    int pid = std::atoi(processorWaitRequest.c_str());
-    m_pid2RangeID[pid] = rangeID;
+    m_pid2RangeID[workerPid] = rangeID;
     processorWaitRequest.clear();
 
     ATH_MSG_INFO("Sent response to the processor : " << message2ProcessorStr);
@@ -360,29 +372,32 @@ std::unique_ptr<AthenaInterprocess::ScheduledWork> EvtRangeScatterer::exec_func(
       // We already have one processor waiting for the answer
       emptyMess4Processor = malloc(1);
       socket2Processor->send(emptyMess4Processor,1);
-      ATH_MSG_INFO("Set one processor free");
+      ATH_MSG_INFO("Set worker PID=" << workerPid << " free");
+      processorWaitRequest.clear();
     }
-    for(int i(0); i<(processorWaitRequest.empty()?m_nprocs:m_nprocs-1); ++i) {
+    bool endLoop{false};
+    while(true) {
       ATH_MSG_DEBUG("Going to set another processor free");
-      while(getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending).empty()) {
-	pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending);
+      while(processorWaitRequest.empty()) {
+	processorWaitRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
+	if(pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending)==-1) {
+	  endLoop = true;
+	  break;
+	}
 	usleep(1000);
       }
+      if(endLoop) break;
+      // Remove worker from m_pid2RangeID
+      workerPid = std::atoi(processorWaitRequest.c_str());
+      auto it = m_pid2RangeID.find(workerPid);
+      if(it!=m_pid2RangeID.end()) {
+	m_pid2RangeID.erase(it);
+      }
       emptyMess4Processor = malloc(1);
       socket2Processor->send(emptyMess4Processor,1);
-      ATH_MSG_INFO("Set one processor free");
-    }
-
-    ATH_MSG_INFO("Still " << procReportPending << " pending reports");
-    
-    // Final round of colecting output file names from processors
-    while(procReportPending>0) {
-      std::string strProcessorRequest = getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending);
-      if(!strProcessorRequest.empty()) {
-	ATH_MSG_WARNING("Unexpected message received from a processor at this stage : " << strProcessorRequest);
-      }
-      pollFailedPidQueue(sharedFailedPidQueue,socket2Pilot,procReportPending);
-      usleep(1000);
+      ATH_MSG_INFO("Set worker PID=" << workerPid << " free");
+      ATH_MSG_INFO("Still " << procReportPending << " pending reports");
+      processorWaitRequest.clear();
     }
   }
 
@@ -509,22 +524,26 @@ std::string EvtRangeScatterer::getNewRangeRequest(yampl::ISocket* socket2Process
   return strProcessorRequest;
 }
 
-void EvtRangeScatterer::pollFailedPidQueue(AthenaInterprocess::SharedQueue*  sharedFailedPidQueue
-					, yampl::ISocket* socket2Pilot
-					, int& procReportPending)
+pid_t EvtRangeScatterer::pollFailedPidQueue(AthenaInterprocess::SharedQueue* sharedFailedPidQueue
+					    , yampl::ISocket* socket2Pilot
+					    , int& procReportPending)
 {
-  pid_t pid;
-  if(sharedFailedPidQueue->try_receive_basic<pid_t>(pid)) {
+  pid_t pid{0};
+  if(sharedFailedPidQueue->try_receive_basic<pid_t>(pid)
+     && pid!=-1) {
     ATH_MSG_INFO("Procesor with PID=" << pid << " has failed!");
-    if(m_pid2RangeID.find(pid)!=m_pid2RangeID.end()) {
+    auto itPid = m_pid2RangeID.find(pid);
+    if(itPid!=m_pid2RangeID.end()) {
       ATH_MSG_WARNING("The failed RangeID = " << m_pid2RangeID[pid] << " will be reported to Pilot");
 
       std::string errorStr("ERR_ATHENAMP_PROCESS " + m_pid2RangeID[pid] + ": Failed to process event range");
       void* errorMessage = malloc(errorStr.size());
       memcpy(errorMessage,errorStr.data(),errorStr.size());
       socket2Pilot->send(errorMessage,errorStr.size());
+      --procReportPending;
+      m_pid2RangeID.erase(pid);
     }
-    procReportPending--;
     ATH_MSG_INFO("Reports pending: " << procReportPending);
   } 
+  return pid;
 }
diff --git a/Control/AthenaMPTools/src/EvtRangeScatterer.h b/Control/AthenaMPTools/src/EvtRangeScatterer.h
index b7b40e03f13d..2b8a617e6dc3 100644
--- a/Control/AthenaMPTools/src/EvtRangeScatterer.h
+++ b/Control/AthenaMPTools/src/EvtRangeScatterer.h
@@ -1,5 +1,5 @@
 /*
-  Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration
+  Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
 */
 
 #ifndef ATHENAMPTOOLS_TOKENSCATTERER_H
@@ -56,9 +56,9 @@ class EvtRangeScatterer final : public AthenaMPToolBase
 				 , int& procReportPending);
 
   // Poll the failed PID queue to see if any of the Processors has failed
-  void pollFailedPidQueue(AthenaInterprocess::SharedQueue*  sharedFailedPidQueue
-			  , yampl::ISocket* socket2Pilot
-			  , int& procReportPending);
+  pid_t pollFailedPidQueue(AthenaInterprocess::SharedQueue*  sharedFailedPidQueue
+			   , yampl::ISocket* socket2Pilot
+			   , int& procReportPending);
 
   StringProperty           m_processorChannel;
   StringProperty           m_eventRangeChannel;
-- 
GitLab