Commit a5f5522c authored by Vakhtang Tsulaia, committed by Graeme Stewart
Browse files

Introducing event chunks in the Shared Event Queue (AthenaMPTools-00-02-29)

	* Introducing event chunks in the Shared Event Queue

2014-11-01  Vakho Tsulaia  <tsulaia@cern.ch>

	* Fire UpdateAfterFork incident in SharedEvtQueueConsumer

2014-10-30  Vakho Tsulaia  <tsulaia@cern.ch>

	* Serialize finalization of processes in TokenProcessor

2014-10-29  Vakho Tsulaia  <tsulaia@cern.ch>

	* Introducing DoCaching flag in the TokenScatterer
parent 0ceaff33
......@@ -5,6 +5,7 @@
#include "SharedEvtQueueConsumer.h"
#include "copy_file_icc_hack.h"
#include "AthenaInterprocess/ProcessGroup.h"
#include "AthenaInterprocess/Incidents.h"
#include "AthenaKernel/IEventSeek.h"
#include "AthenaKernel/IEventShare.h"
......@@ -13,6 +14,7 @@
#include "GaudiKernel/IFileMgr.h"
#include "GaudiKernel/IChronoStatSvc.h"
#include "GaudiKernel/ISvcLocator.h"
#include "GaudiKernel/IIncidentSvc.h"
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
......@@ -24,6 +26,7 @@
#include <stdio.h>
#include <stdint.h>
#include <stdexcept>
#include <cmath> // For pow
SharedEvtQueueConsumer::SharedEvtQueueConsumer(const std::string& type
, const std::string& name
......@@ -31,8 +34,8 @@ SharedEvtQueueConsumer::SharedEvtQueueConsumer(const std::string& type
: AthenaMPToolBase(type,name,parent)
, m_useSharedReader(false)
, m_isPileup(false)
, m_rankId(-1)
, m_nEventsBeforeFork(0)
, m_rankId(-1)
, m_chronoStatSvc("ChronoStatSvc", name)
, m_evtSeek(0)
, m_evtShare(0)
......@@ -335,6 +338,14 @@ AthenaInterprocess::ScheduledWork* SharedEvtQueueConsumer::bootstrap_func()
return outwork;
}
// ___________________ Fire UpdateAfterFork incident _________________
IIncidentSvc* p_incidentSvc(0);
if(!serviceLocator()->service("IncidentSvc", p_incidentSvc).isSuccess()) {
msg(MSG::ERROR) << "Unable to retrieve IncidentSvc" << endreq;
return outwork;
}
p_incidentSvc->fireIncident(AthenaInterprocess::UpdateAfterFork(m_rankId,getpid(),name()));
// Declare success and return
*errcode = 0;
return outwork;
......@@ -387,9 +398,10 @@ AthenaInterprocess::ScheduledWork* SharedEvtQueueConsumer::exec_func()
// ________________________ This is needed only for PileUp jobs __________________________________
int nEvt(1+m_nEventsBeforeFork);
long intmask = pow(0x100,sizeof(int))-1; // Mask for decoding event number from the value posted to the queue
int nEvt(m_nEventsBeforeFork);
int nEventsProcessed(0);
int evtnum(0);
long evtnumAndChunk(0);
std::string shmemName("/athmp-shmem-"+m_randStr);
boost::interprocess::shared_memory_object shmemSegment(boost::interprocess::open_only
, shmemName.c_str()
......@@ -402,7 +414,7 @@ AthenaInterprocess::ScheduledWork* SharedEvtQueueConsumer::exec_func()
m_chronoStatSvc->chronoStart("AthenaMP_getEvent");
if(all_ok) {
while(true) {
if(!m_sharedEventQueue->try_receive_basic<int>(evtnum)) {
if(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
// The event queue is empty, but we should check whether there are more events to come or not
msg(MSG::DEBUG) << "Event queue is empty";
if(*shmemCountFinal) {
......@@ -415,9 +427,12 @@ AthenaInterprocess::ScheduledWork* SharedEvtQueueConsumer::exec_func()
continue;
}
}
msg(MSG::DEBUG) << "Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec << endreq;
m_chronoStatSvc->chronoStop("AthenaMP_getEvent");
msg(MSG::INFO) << "Received event num " << evtnum << endreq;
int chunkSize = evtnumAndChunk >> (sizeof(int)*8);
int evtnum = evtnumAndChunk & intmask;
msg(MSG::INFO) << "Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize << endreq;
nEvt+=chunkSize;
StatusCode sc;
if(m_useSharedReader) {
sc = m_evtShare->share(evtnum);
......@@ -444,10 +459,13 @@ AthenaInterprocess::ScheduledWork* SharedEvtQueueConsumer::exec_func()
m_chronoStatSvc->chronoStop("AthenaMP_seek");
}
m_chronoStatSvc->chronoStart("AthenaMP_nextEvent");
sc = m_evtProcessor->nextEvent(nEvt++);
nEventsProcessed++;
sc = m_evtProcessor->nextEvent(nEvt);
nEventsProcessed += chunkSize;
if(sc.isFailure()){
msg(MSG::ERROR) << "Unable to process " << evtnum << endreq;
if(chunkSize==1)
msg(MSG::ERROR) << "Unable to process event " << evtnum << endreq;
else
msg(MSG::ERROR) << "Unable to process the chunk (" << evtnum << "," << evtnum+chunkSize-1 << ")" << endreq;
all_ok=false;
break;
}
......
......@@ -49,11 +49,13 @@ class SharedEvtQueueConsumer : public AthenaMPToolBase
// 2. If doFinalize flag is set then serialize process finalizations
int decodeProcessResult(const AthenaInterprocess::ProcessResult* presult, bool doFinalize);
// Properties
bool m_useSharedReader; // Work in pair with a SharedReader
bool m_isPileup; // Are we doing pile-up digitization?
int m_rankId; // Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
int m_nEventsBeforeFork;
int m_rankId; // Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
ServiceHandle<IChronoStatSvc> m_chronoStatSvc;
IEventSeek* m_evtSeek;
IEventShare* m_evtShare;
......
......@@ -29,13 +29,14 @@ SharedEvtQueueProvider::SharedEvtQueueProvider(const std::string& type
, const IInterface* parent)
: AthenaMPToolBase(type,name,parent)
, m_isPileup(false)
, m_preCountedEvents(-1)
, m_nEventsBeforeFork(0)
, m_chunkSize(1)
, m_nEvtRequested(-1)
, m_skipEvents(0)
, m_nEvtCounted(0)
, m_nEvtAddPending(0)
, m_preCountedEvents(-1)
, m_needCountEvents(false)
, m_nEventsBeforeFork(0)
, m_nEventsInInpFiles(0)
, m_sharedEventQueue(0)
{
......@@ -44,6 +45,7 @@ SharedEvtQueueProvider::SharedEvtQueueProvider(const std::string& type
declareProperty("IsPileup",m_isPileup);
declareProperty("PreCountedEvents",m_preCountedEvents);
declareProperty("EventsBeforeFork",m_nEventsBeforeFork);
declareProperty("ChunkSize",m_chunkSize);
m_subprocDirPrefix = "evt_counter";
}
......@@ -307,7 +309,8 @@ AthenaInterprocess::ScheduledWork* SharedEvtQueueProvider::exec_func()
}
}
msg(MSG::INFO) << "Done counting events and populating shared queue. Total number of events to be processed: " << std::max(m_nEvtCounted - m_nEventsBeforeFork,0) << endreq;
msg(MSG::INFO) << "Done counting events and populating shared queue. Total number of events to be processed: " << std::max(m_nEvtCounted - m_nEventsBeforeFork,0)
<< ", Event Chunk size in the queue is " << m_chunkSize << endreq;
}
if(all_ok) {
......@@ -430,14 +433,25 @@ int SharedEvtQueueProvider::addEventsToQueue()
{
msg(MSG::DEBUG) << "in addEventsToQueue" << endreq;
long chunkSize(0);
long chunkSizeCounter(0);
// Add events to the queue
while(m_nEvtAddPending>0) {
if(chunkSizeCounter==0) {
chunkSize = (m_nEvtAddPending >= m_chunkSize ? m_chunkSize : m_nEvtAddPending);
chunkSizeCounter = chunkSize;
}
msg(MSG::DEBUG) << "Chunk Size " << chunkSize << endreq;
// Don't add those events which have already been processed by the master before forking
assert(m_nEvtRequested>m_nEvtCounted);
if(m_nEvtCounted<m_nEventsBeforeFork
|| m_sharedEventQueue->try_send_basic<int>(m_nEvtCounted+m_skipEvents)) {
|| chunkSizeCounter < chunkSize
|| m_sharedEventQueue->try_send_basic<long>((chunkSize<<(sizeof(int)*8))|(m_nEvtCounted+m_skipEvents))) {
if(m_nEvtCounted>=m_nEventsBeforeFork && chunkSizeCounter==chunkSize)
msg(MSG::DEBUG) << "Sent to the queue 0x" << std::hex << ((chunkSize<<(sizeof(int)*8))|(m_nEvtCounted+m_skipEvents)) << std::dec << endreq;
m_nEvtCounted++;
m_nEvtAddPending--;
chunkSizeCounter--;
}
else {
// The queue reached maximum capacity
......
......@@ -42,15 +42,17 @@ class SharedEvtQueueProvider : public AthenaMPToolBase
SharedEvtQueueProvider(const SharedEvtQueueProvider&);
SharedEvtQueueProvider& operator= (const SharedEvtQueueProvider&);
// Properties
bool m_isPileup; // Are we doing pile-up digitization?
int m_preCountedEvents; // Somebody (TF) has already counted the events, no need to do that again
int m_nEventsBeforeFork;
int m_chunkSize;
int m_nEvtRequested; // Max event received from AppMgr
int m_skipEvents; // SkipEvent property value of the Event Selectors
int m_nEvtCounted; // The number of events this tool has counted itself in the input files
int m_nEvtAddPending; // Number of pending events to be added to the queue
int m_preCountedEvents; // Somebody (TF) has already counted the events, no need to do that again
bool m_needCountEvents; // Flag indicating whether or not it is necessary to keep counting events
int m_nEventsBeforeFork;
int m_nEventsInInpFiles;// Total number of events in the input files opened so far
AthenaInterprocess::SharedQueue* m_sharedEventQueue;
......
......@@ -50,7 +50,8 @@ TokenProcessor::TokenProcessor(const std::string& type
, m_channel2Scatterer("")
, m_channel2EvtSel("")
, m_sharedRankQueue(0)
, m_sharedFinQueue(0)
, m_socketFactory(0)
, m_socket2Scatterer(0)
{
declareInterface<IAthenaMPTool>(this);
......@@ -99,7 +100,6 @@ StatusCode TokenProcessor::initialize()
StatusCode TokenProcessor::finalize()
{
delete m_sharedRankQueue;
delete m_sharedFinQueue;
return StatusCode::SUCCESS;
}
......@@ -130,16 +130,6 @@ int TokenProcessor::makePool(int, int nprocs, const std::string& topdir)
return -1;
}
// Create finalization scheduling queue
std::ostringstream finQueueName;
finQueueName << "TokenProcessor_FinQueue_" << getpid();
m_sharedFinQueue = new AthenaInterprocess::SharedQueue(finQueueName.str(),m_nprocs,sizeof(int));
for(int i=0; i<m_nprocs; ++i)
if(!m_sharedFinQueue->send_basic<int>(i*3)) { // TO DO: this '3' could be made configurable
msg(MSG::ERROR) << "Unable to send int to the finalization queue!" << endreq;
return -1;
}
// Create the process group and map_async bootstrap
m_processGroup = new AthenaInterprocess::ProcessGroup(m_nprocs);
msg(MSG::INFO) << "Created Pool of " << m_nprocs << " worker processes" << endreq;
......@@ -158,11 +148,6 @@ StatusCode TokenProcessor::exec()
return StatusCode::FAILURE;
msg(MSG::INFO) << "Workers started processing events" << endreq;
// Map exit flag on children
if(m_processGroup->map_async(0,0)){
msg(MSG::ERROR) << "Unable to set exit to the workers" << endreq;
return StatusCode::FAILURE;
}
return StatusCode::SUCCESS;
}
......@@ -188,10 +173,12 @@ StatusCode TokenProcessor::wait_once(int& numFinishedProc)
else {
// Pull one result and decode it if necessary
presult = m_processGroup->pullOneResult();
int res(0);
if(presult && (unsigned)(presult->output.size)>sizeof(int))
decodeProcessResult(presult);
res = decodeProcessResult(presult,true);
if(presult) free(presult->output.data);
delete presult;
if(res) return StatusCode::FAILURE;
}
return sc;
}
......@@ -362,13 +349,13 @@ AthenaInterprocess::ScheduledWork* TokenProcessor::exec_func()
std::queue<std::string> queueTokens;
// Get the yampl connection channels
yampl::ISocketFactory* socketFactory = new yampl::SocketFactory();
yampl::ISocket* socket2Scatterer = socketFactory->createClientSocket(yampl::Channel(m_channel2Scatterer.value(),yampl::LOCAL_PIPE),yampl::MOVE_DATA);
m_socketFactory = new yampl::SocketFactory();
m_socket2Scatterer = m_socketFactory->createClientSocket(yampl::Channel(m_channel2Scatterer.value(),yampl::LOCAL_PIPE),yampl::MOVE_DATA);
msg(MSG::DEBUG) << "Created CLIENT socket to the Scatterer: " << m_channel2Scatterer.value() << endreq;
std::ostringstream pidstr;
pidstr << getpid();
std::string socket2EvtSelName = m_channel2EvtSel.value() + std::string("_") + pidstr.str();
yampl::ISocket* socket2EvtSel = socketFactory->createClientSocket(yampl::Channel(socket2EvtSelName,yampl::LOCAL_PIPE),yampl::COPY_DATA);
yampl::ISocket* socket2EvtSel = m_socketFactory->createClientSocket(yampl::Channel(socket2EvtSelName,yampl::LOCAL_PIPE),yampl::COPY_DATA);
msg(MSG::DEBUG) << "Created CLIENT socket to the Tool: " << socket2EvtSelName << endreq;
// Get the IncidentSvc
......@@ -382,16 +369,14 @@ AthenaInterprocess::ScheduledWork* TokenProcessor::exec_func()
std::string ping = pidstr.str() + std::string(" ready for event processing");
void* message2scatterer = malloc(ping.size());
memcpy(message2scatterer,ping.data(),ping.size());
socket2Scatterer->send(message2scatterer,ping.size());
m_socket2Scatterer->send(message2scatterer,ping.size());
msg(MSG::DEBUG) << "Sent a welcome message to the Scatterer" << endreq;
std::string outputFileReport("");
while(all_ok) {
// Get the response - list of tokens - from the scatterer.
// The format of the response: | ResponseSize | RangeID, | evtToken[,evtToken] |
char *responseBuffer(0);
ssize_t responseSize = socket2Scatterer->recv(responseBuffer);
ssize_t responseSize = m_socket2Scatterer->recv(responseBuffer);
// If response size is 0 then break the loop
if(responseSize==1) {
msg(MSG::DEBUG) << "Empty range received. Terminating the loop" << endreq;
......@@ -421,11 +406,11 @@ AthenaInterprocess::ScheduledWork* TokenProcessor::exec_func()
p_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange",rangeID));
// Time to report the previous output
if(!outputFileReport.empty()) {
message2scatterer = malloc(outputFileReport.size());
memcpy(message2scatterer,outputFileReport.data(),outputFileReport.size());
socket2Scatterer->send(message2scatterer,outputFileReport.size());
outputFileReport.clear();
if(!m_outputFileReport.empty()) {
message2scatterer = malloc(m_outputFileReport.size());
memcpy(message2scatterer,m_outputFileReport.data(),m_outputFileReport.size());
m_socket2Scatterer->send(message2scatterer,m_outputFileReport.size());
m_outputFileReport.clear();
}
}
......@@ -504,13 +489,13 @@ AthenaInterprocess::ScheduledWork* TokenProcessor::exec_func()
outputReportStream << strOutpFile << "," << rangeID
<< ",CPU:" << time_delta.cpuTime<System::Sec>()
<< ",WALL:" << time_delta.elapsedTime<System::Sec>();
outputFileReport = outputReportStream.str();
m_outputFileReport = outputReportStream.str();
}
// Request the next available range
message2scatterer = malloc(ping.size());
memcpy(message2scatterer,ping.data(),ping.size());
socket2Scatterer->send(message2scatterer,ping.size());
m_socket2Scatterer->send(message2scatterer,ping.size());
msg(MSG::DEBUG) << "Sent a message to the scatterer: " << ping << endreq;
} // Main "event loop"
......@@ -521,14 +506,37 @@ AthenaInterprocess::ScheduledWork* TokenProcessor::exec_func()
}
}
// Schedule finalization
int waittime(-1);
if(!m_sharedFinQueue->receive_basic<int>(waittime)) {
msg(MSG::ERROR) << "Unable to value from the finalization queue" << endreq;
all_ok = false;
}
else
usleep(waittime*1000000);
int errcode = (all_ok?0:1); // For now use 0 success, 1 failure
AthenaMPToolBase::Func_Flag func = AthenaMPToolBase::FUNC_EXEC;
// Return value: "ERRCODE|Func_Flag|NEvt"
int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
void* outdata = malloc(outsize);
memcpy(outdata,&errcode,sizeof(int));
memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
AthenaInterprocess::ScheduledWork* outwork = new AthenaInterprocess::ScheduledWork;
outwork->data = outdata;
outwork->size = outsize;
// ...
// (possible) TODO: extend outwork with some error message, which will be eventually
// reported in the master proces
// ...
delete socket2EvtSel;
return outwork;
}
AthenaInterprocess::ScheduledWork* TokenProcessor::fin_func()
{
msg(MSG::INFO) << "Fin function in the AthenaMP worker PID=" << getpid() << endreq;
// We are not able to use private data members after the appMgr has been finalized
yampl::ISocket* socket2Scatterer(m_socket2Scatterer);
yampl::ISocketFactory* socketFactory(m_socketFactory);
std::string outputFileReport(m_outputFileReport);
bool all_ok(true);
if(m_appMgr->stop().isFailure()) {
msg(MSG::ERROR) << "Unable to stop AppMgr" << endreq;
......@@ -543,57 +551,92 @@ AthenaInterprocess::ScheduledWork* TokenProcessor::exec_func()
// Report the last output file
if(!outputFileReport.empty()) {
message2scatterer = malloc(outputFileReport.size());
void* message2scatterer = malloc(outputFileReport.size());
memcpy(message2scatterer,outputFileReport.data(),outputFileReport.size());
socket2Scatterer->send(message2scatterer,outputFileReport.size());
}
int errcode = (all_ok?0:1); // For now use 0 success, 1 failure
AthenaMPToolBase::Func_Flag func = AthenaMPToolBase::FUNC_EXEC;
// Return value: "ERRCODE|Func_Flag|NEvt"
int nEvt = -1;
AthenaMPToolBase::Func_Flag func = AthenaMPToolBase::FUNC_FIN;
// Return value: "ERRCODE|Func_Flag|NEvt" (Here NEvt=-1)
int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
void* outdata = malloc(outsize);
memcpy(outdata,&errcode,sizeof(int));
memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
AthenaInterprocess::ScheduledWork* outwork = new AthenaInterprocess::ScheduledWork;
outwork->data = outdata;
outwork->size = outsize;
// ...
// (possible) TODO: extend outwork with some error message, which will be eventually
// reported in the master proces
// ...
delete socket2Scatterer;
delete socket2EvtSel;
delete socketFactory;
return outwork;
}
AthenaInterprocess::ScheduledWork* TokenProcessor::fin_func()
int TokenProcessor::decodeProcessResult(const AthenaInterprocess::ProcessResult* presult, bool doFinalize)
{
// Dummy
int* errcode = new int(0);
AthenaInterprocess::ScheduledWork* outwork = new AthenaInterprocess::ScheduledWork;
outwork->data = (void*)errcode;
outwork->size = sizeof(int);
return outwork;
}
void TokenProcessor::decodeProcessResult(const AthenaInterprocess::ProcessResult* presult)
{
if(!presult) return;
if(!presult) return 0;
const AthenaInterprocess::ScheduledWork& output = presult->output;
if(output.size!=2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)) return;
msg(MSG::DEBUG) << "Decoding the output of PID=" << presult->pid << " with the size=" << output.size << endreq;
if(output.size!=2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)) return 0;
AthenaMPToolBase::Func_Flag func;
memcpy(&func,(char*)output.data+sizeof(int),sizeof(func));
if(func==AthenaMPToolBase::FUNC_EXEC) {
// Store the number of processed events
int nevt(0);
memcpy(&nevt,(char*)output.data+sizeof(int)+sizeof(func),sizeof(int));
m_nProcessedEvents[presult->pid]=nevt;
msg(MSG::DEBUG) << "PID=" << presult->pid << " processed " << nevt << " events" << endreq;
if(doFinalize) {
// Add PID to the finalization queue
m_finQueue.push(presult->pid);
msg(MSG::DEBUG) << "Added PID=" << presult->pid << " to the finalization queue" << endreq;
// If this is the only element in the queue then start its finalization
// Otherwise it has to wait its turn until all previous processes have been finalized
if(m_finQueue.size()==1) {
if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,presult->pid)
|| m_processGroup->map_async(0,0,presult->pid)) {
msg(MSG::ERROR) << "Problem scheduling finalization on PID=" << presult->pid << endreq;
return 1;
}
else {
msg(MSG::DEBUG) << "Scheduled finalization of PID=" << presult->pid << endreq;
}
}
}
}
else if(doFinalize && func==AthenaMPToolBase::FUNC_FIN) {
msg(MSG::DEBUG) << "Finished finalization of PID=" << presult->pid << endreq;
pid_t pid = m_finQueue.front();
if(pid==presult->pid) {
// pid received as expected. Remove it from the queue
m_finQueue.pop();
msg(MSG::DEBUG) << "PID=" << presult->pid << " removed from the queue" << endreq;
// Schedule finalization of the next processe in the queue
if(m_finQueue.size()) {
if(mapAsyncFlag(AthenaMPToolBase::FUNC_FIN,m_finQueue.front())
|| m_processGroup->map_async(0,0,m_finQueue.front())) {
msg(MSG::ERROR) << "Problem scheduling finalization on PID=" << m_finQueue.front() << endreq;
return 1;
}
else {
msg(MSG::DEBUG) << "Scheduled finalization of PID=" << m_finQueue.front() << endreq;
}
}
}
else {
// Error: unexpected pid received from presult
msg(MSG::ERROR) << "Finalized PID=" << presult->pid << " while PID=" << pid << " was expected" << endreq;
return 1;
}
}
return 0;
}
StatusCode TokenProcessor::startProcess()
......
......@@ -11,9 +11,14 @@
#include "yampl/Exceptions.h"
#include "boost/shared_ptr.hpp"
#include <queue>
class IEventShare;
class IChronoStatSvc;
namespace yampl {
class ISocketFactory;
class ISocket;
}
class TokenProcessor : public AthenaMPToolBase
{
......@@ -46,23 +51,30 @@ class TokenProcessor : public AthenaMPToolBase
TokenProcessor(const TokenProcessor&);
TokenProcessor& operator= (const TokenProcessor&);
void decodeProcessResult(const AthenaInterprocess::ProcessResult* presult);
// Decode process results
// 1. Store number of processed events for FUNC_EXEC
// 2. If doFinalize flag is set then serialize process finalizations
int decodeProcessResult(const AthenaInterprocess::ProcessResult* presult, bool doFinalize);
StatusCode startProcess();
bool m_isPileup; // Are we doing pile-up digitization?
int m_rankId; // Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
int m_nEventsBeforeFork;
ServiceHandle<IChronoStatSvc> m_chronoStatSvc;
IEventShare* m_evtShare;
ServiceHandle<IChronoStatSvc> m_chronoStatSvc;
IEventShare* m_evtShare;
StringProperty m_channel2Scatterer;
StringProperty m_channel2EvtSel;
StringProperty m_channel2Scatterer;
StringProperty m_channel2EvtSel;
AthenaInterprocess::SharedQueue* m_sharedRankQueue;
AthenaInterprocess::SharedQueue* m_sharedFinQueue; // Shared queue for scheduling worker finalizations (fighting memory spikes)
std::map<pid_t,int> m_nProcessedEvents; // Number of processed events by PID
std::queue<pid_t> m_finQueue; // PIDs of processes queued for finalization
yampl::ISocketFactory* m_socketFactory;
yampl::ISocket* m_socket2Scatterer;
std::string m_outputFileReport;
};
#endif
......@@ -27,10 +27,12 @@ TokenScatterer::TokenScatterer(const std::string& type
: AthenaMPToolBase(type,name,parent)
, m_processorChannel("")
, m_eventRangeChannel("")
, m_doCaching(false)
{
m_subprocDirPrefix = "token_scatterer";
declareProperty("ProcessorChannel", m_processorChannel);
declareProperty("EventRangeChannel", m_eventRangeChannel);
declareProperty("DoCaching",m_doCaching);
}
TokenScatterer::~TokenScatterer()
......@@ -204,19 +206,31 @@ AthenaInterprocess::ScheduledWork* TokenScatterer::exec_func()
std::string strReady("Ready for events");
std::string strStopProcessing("No more events");
// Signal the Pilot that AthenaMP is ready for event processing
void* ready_message = malloc(strReady.size());
memcpy(ready_message,strReady.data(),strReady.size());
socket2Pilot->send(ready_message,strReady.size());
void* eventRangeMessage;
ssize_t eventRangeSize = socket2Pilot->recv(eventRangeMessage);
std::string eventRange((const char*)eventRangeMessage,eventRangeSize);
size_t carRet = eventRange.find('\n');
if(carRet!=std::string::npos)
eventRange = eventRange.substr(0,carRet);
while(true) {
// NO CACHING MODE: first get a request from one of the processors and only after that request the next event range from the pilot
if(!m_doCaching) {
msg(MSG::DEBUG) << "Start waiting for event range request from one of the processors" << endreq;
while(getNewRangeRequest(socket2Processor,socket2Pilot,procReportPending).empty()) {}
msg(MSG::DEBUG) << "One of the processors is ready for the next range" << endreq;
}
while(eventRange.compare(strStopProcessing)!=0) {
msg(MSG::DEBUG) << "Got Event Range: " << eventRange << endreq;
// Signal the Pilot that AthenaMP is ready for event processing
void* ready_message = malloc(strReady.size());
memcpy(ready_message,strReady.data(),strReady.size());
socket2Pilot->send(ready_message,strReady.size());
void* eventRangeMessage;
ssize_t eventRangeSize = socket2Pilot->recv(eventRangeMessage);
std::string eventRange((const char*)eventRangeMessage,eventRangeSize);
size_t carRet = eventRange.find('\n');
if(carRet!=std::string::npos)
eventRange = eventRange.substr(0,carRet);
// Break the loop if no more ranges are expected
if(eventRange.compare(strStopProcessing)==0) {
msg(MSG::DEBUG) << "Stopped the loop. Last message from the Event Range Channel: " << eventRange << endreq;