diff --git a/Control/AthenaMP/CMakeLists.txt b/Control/AthenaMP/CMakeLists.txt index d397cf3ce15169df758002d8f5e253f9238625ce..be9335c7c542c578dae38929bec9cd90872011ad 100644 --- a/Control/AthenaMP/CMakeLists.txt +++ b/Control/AthenaMP/CMakeLists.txt @@ -17,14 +17,13 @@ atlas_depends_on_subdirs( PRIVATE find_package( Boost COMPONENTS filesystem thread system ) find_package( ROOT COMPONENTS Core PyROOT Tree MathCore Hist RIO pthread ) find_package( Threads ) -find_package( dmtcp ) # Component(s) in the package: atlas_add_component( AthenaMP src/*.cxx src/components/*.cxx src/memory-profiler/getPss.cc - INCLUDE_DIRS ${Boost_INCLUDE_DIRS} ${ROOT_INCLUDE_DIRS} ${DMTCP_INCLUDE_DIRS} + INCLUDE_DIRS ${Boost_INCLUDE_DIRS} ${ROOT_INCLUDE_DIRS} LINK_LIBRARIES ${Boost_LIBRARIES} ${ROOT_LIBRARIES} AthenaBaseComps AthenaInterprocess StoreGateLib SGtests GaudiKernel ) atlas_add_executable( getSharedMemory diff --git a/Control/AthenaMP/src/AthMpEvtLoopMgr.cxx b/Control/AthenaMP/src/AthMpEvtLoopMgr.cxx index 8d087f9765be96bdfe0c1f917855c379e6008d47..381c1064f7eaf0557eef9c8f4e59026c7909cbc0 100644 --- a/Control/AthenaMP/src/AthMpEvtLoopMgr.cxx +++ b/Control/AthenaMP/src/AthMpEvtLoopMgr.cxx @@ -31,8 +31,6 @@ #include <boost/filesystem.hpp> #include <boost/algorithm/string.hpp> -#include "dmtcp.h" - namespace athenaMP_MemHelper { int getPss(pid_t, unsigned long&, unsigned long&, unsigned long&, unsigned long&, bool verbose=false); @@ -248,33 +246,6 @@ StatusCode AthMpEvtLoopMgr::executeRun(int maxevt) int maxEvents(maxevt); // This can be modified after restart - int dmtcp_enabled = dmtcp_is_enabled(); - if(dmtcp_enabled) { - ATH_MSG_INFO("DMTCP is enabled. Preparing to checkpoint ..."); - unsigned original_generation = dmtcp_get_generation(); - int retval = dmtcp_checkpoint(); - if(retval == DMTCP_AFTER_CHECKPOINT){ - // Wait long enough for checkpoint request to be written out - while(dmtcp_get_generation() == original_generation){ - usleep(1000000); - } - // We are done at this point. Exit - ATH_MSG_INFO("Done checkpointing. Exiting ..."); - exit(0); - } - else if(retval == DMTCP_AFTER_RESTART) { - ATH_MSG_INFO("AthenaMP restarted from the checkpoint image"); - ATH_CHECK(afterRestart(maxEvents)); - } - else if(retval == DMTCP_NOT_PRESENT) { - ATH_MSG_WARNING("Attempted to checkpoint, but DMTCP is not running. Skipping checkpoint ..."); - } - - } - else { - ATH_MSG_INFO("DMTCP is not enabled. Proceeding with forking the workers ..."); - } - ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(), itLast = m_tools.end(); @@ -508,8 +479,6 @@ boost::shared_ptr<AthenaInterprocess::FdsRegistry> AthMpEvtLoopMgr::extractFds() ,"/root/include/" ,"/var/tmp/" ,"/var/lock/" - ,"/tmp/dmtcp" - ,"/dmtcp-" }; path fdPath("/proc/self/fd"); @@ -554,162 +523,6 @@ boost::shared_ptr<AthenaInterprocess::FdsRegistry> AthMpEvtLoopMgr::extractFds() return registry; } -StatusCode AthMpEvtLoopMgr::afterRestart(int& maxevt) -{ - // In this function we parse runargs.* script and update several configuration parameters: - // 1. Number of Workers - - // ___________________________________ Get the runargs.* file _____________________________________ - std::string runargsFileName(""); - for(boost::filesystem::directory_iterator fdIt(boost::filesystem::current_path()); fdIt!=boost::filesystem::directory_iterator(); fdIt++) { - if(fdIt->path().string().find("runargs.")!=std::string::npos) { - runargsFileName = fdIt->path().string(); - break; - } - } - - if(runargsFileName.empty()) { - ATH_MSG_WARNING("No file named runargs.* in the run directory. AthenaMP configuration will not be updated"); - return StatusCode::SUCCESS; - } - - // ___________________________________ Parse the runargs.* file _____________________________________ - std::map<std::string,std::string> tokens; - tokens["nprocs"]=std::string(""); - tokens["skipEvents"]=std::string(""); - tokens["maxEvents"]=std::string(""); - tokens["inputEVNTFile"]=std::string(""); - tokens["outputHITSFile"]=std::string(""); - - std::fstream runargsFile(runargsFileName.c_str(),std::fstream::in); - std::string line; - while(!runargsFile.eof()) { - std::getline(runargsFile,line); - for(auto it=tokens.cbegin(); it!=tokens.cend(); ++it) { - if(line.find(it->first+std::string(" ="))!=std::string::npos - || line.find(it->first+std::string("="))!=std::string::npos) { - // Get token value - std::string tokenVal(""); - size_t eqpos = line.rfind("="); - if(eqpos!=std::string::npos) { - tokenVal=line.substr(eqpos+1); - boost::trim(tokenVal); - } - if(!tokenVal.empty()){ - tokens[it->first]=tokenVal; - } - break; - } - } - } - - ATH_MSG_INFO("Read new configurations from the runargs file:"); - for(auto it=tokens.cbegin(); it!=tokens.cend(); ++it) { - ATH_MSG_INFO(it->first<<"="<<it->second); - } - - // _______________________ Update m_nWorkers ___________________________ - const std::string& nprocs = tokens["nprocs"]; - if(!nprocs.empty()) { - int nWorkers = atoi(nprocs.c_str()); - if(nWorkers!=0) { - ATH_MSG_INFO("Retrieved number of workers from runargs.*: " << nWorkers); - m_nWorkers = nWorkers; - } - else { - ATH_MSG_WARNING("Unable to retrieve non-zero number of workers from runargs.*"); - } - } - - ATH_MSG_INFO("AthenaMP will continue by forking " << m_nWorkers << " workers"); - - // _______________________ Update Input File(s) ___________________________ - const std::string& inputEvntFile = tokens["inputEVNTFile"]; - - // Parse the token value - std::vector<std::string> inpFiles; - size_t commapos = inputEvntFile.find(","); - size_t startpos = 0; - while(commapos!=std::string::npos) { - inpFiles.push_back(inputEvntFile.substr(startpos,commapos-startpos)); - startpos = commapos+1; - commapos = inputEvntFile.find(",",startpos); - } - inpFiles.push_back(inputEvntFile.substr(startpos)); - - // Trim the strings. Remove '[', ']', '\"', '\'' and spaces - for(std::string& inp : inpFiles) { - auto pos1 = std::find_if(inp.begin(),inp.end(),[](char ch){return (ch!=' ' && ch!='[' && ch!=']' && ch!='\'' && ch!='\"');}); - inp.erase(inp.begin(),pos1); - auto pos2 = std::find_if(inp.rbegin(),inp.rend(),[](char ch){return (ch!=' ' && ch!='[' && ch!=']' && ch!='\'' && ch!='\"');}); - inp.erase(pos2.base(),inp.end()); - } - - ATH_MSG_INFO("Retrieved new list of input files:"); - for(const std::string& inp : inpFiles) { - if(!inp.empty()) { - ATH_MSG_INFO(" ... " << inp); - } - } - - // Change the InputCollections property of the event selector - IProperty* propertyServer = dynamic_cast<IProperty*>(m_evtSelector); - if(!propertyServer) { - ATH_MSG_ERROR("Unable to dyn-cast the event selector to IProperty"); - return StatusCode::FAILURE; - } - - std::string propertyName("InputCollections"); - StringArrayProperty newInputFileList(propertyName, inpFiles); - if(propertyServer->setProperty(newInputFileList).isFailure()) { - ATH_MSG_ERROR("Unable to update " << newInputFileList.name() << " property on the Event Selector"); - return StatusCode::FAILURE; - } - ATH_MSG_INFO("Updated the InputCollections property of the event selector"); - - // Register new input files with the I/O component manager - IIoComponent* iocomp = dynamic_cast<IIoComponent*>(m_evtSelector); - if(iocomp==nullptr) { - ATH_MSG_FATAL("Unable to dyn-cast Event Selector to IIoComponent"); - return StatusCode::FAILURE; - } - ServiceHandle<IIoComponentMgr> ioMgr("IoComponentMgr",name()); - ATH_CHECK(ioMgr.retrieve()); - for(const std::string& inp :inpFiles) { - if(inp.empty()) continue; - if(!ioMgr->io_register(iocomp, IIoComponentMgr::IoMode::READ,inp,inp).isSuccess()) { - ATH_MSG_FATAL("Unable to register " << inp << " with IoComponentMgr"); - return StatusCode::FAILURE; - } - } - - ATH_MSG_INFO("Successfully registered new input with IoComponentMgr"); - - // _______________________ Update Output File ___________________________ - std::string outputHitsFile = tokens["outputHITSFile"]; - - // Trim the strings. Remove '\"', '\'' and spaces - { - auto pos1 = std::find_if(outputHitsFile.begin(),outputHitsFile.end(),[](char ch){return (ch!=' ' && ch!='\'' && ch!='\"');}); - outputHitsFile.erase(outputHitsFile.begin(),pos1); - auto pos2 = std::find_if(outputHitsFile.rbegin(),outputHitsFile.rend(),[](char ch){return (ch!=' ' && ch!='\'' && ch!='\"');}); - outputHitsFile.erase(pos2.base(),outputHitsFile.end()); - ATH_MSG_INFO("Retrieved new name for the output file: " << outputHitsFile); - } - - // Fire incident, such that AthenaOutputStream can update its output file name - ServiceHandle<IIncidentSvc> incSvc("IncidentSvc",name()); - ATH_CHECK(incSvc.retrieve()); - incSvc->fireIncident(FileIncident(name(),"UpdateOutputFile",outputHitsFile)); - - // _______________________ Update Max Events ___________________________ - maxevt = std::atoi(tokens["maxEvents"].c_str()); - - // _______________________ Update Skip Events ___________________________ - int skipEvents = std::atoi(tokens["skipEvents"].c_str()); - return updateSkipEvents(skipEvents); -} - StatusCode AthMpEvtLoopMgr::updateSkipEvents(int skipEvents) { IProperty* propertyServer = dynamic_cast<IProperty*>(m_evtSelector); diff --git a/Control/AthenaMP/src/AthMpEvtLoopMgr.h b/Control/AthenaMP/src/AthMpEvtLoopMgr.h index c42b5c7e8050b119de8b47b80381ad7e16ff15bb..6f9eb6ca1dbab3e19730ae4d307aec8eda60f125 100644 --- a/Control/AthenaMP/src/AthMpEvtLoopMgr.h +++ b/Control/AthenaMP/src/AthMpEvtLoopMgr.h @@ -65,7 +65,6 @@ class AthMpEvtLoopMgr StatusCode wait(); StatusCode generateOutputReport(); boost::shared_ptr<AthenaInterprocess::FdsRegistry> extractFds(); - StatusCode afterRestart(int& maxevt); StatusCode updateSkipEvents(int skipEvents); }; diff --git a/Tools/PyJobTransforms/python/trfArgs.py b/Tools/PyJobTransforms/python/trfArgs.py index 7279206420cf0a35d04dd2897c8afaf8bd79c099..a89decf30920c7e54c46d0e0d9697a4ad2994ed6 100644 --- a/Tools/PyJobTransforms/python/trfArgs.py +++ b/Tools/PyJobTransforms/python/trfArgs.py @@ -106,12 +106,6 @@ def addAthenaArguments(parser, maxEventsDefaultSubstep='first', addValgrind=True metavar='N', group='Athena', help='Set AthenaMP to fork after processing N events (default is to fork immediately after ' 'initialisation') - parser.add_argument('--checkpoint', type=trfArgClasses.argFactory(trfArgClasses.argBool, runarg = False), - metavar='BOOL', group='Athena', - help='Checkpoint mode active') - parser.add_argument('--restart', type=trfArgClasses.argFactory(trfArgClasses.argString, runarg = False), - group='Athena', - help='Full path to the checkpoint image') parser.add_argument('--sharedWriter', type=trfArgClasses.argFactory(trfArgClasses.argBool, runarg=False), metavar='BOOL', group='Athena', help='SharedWriter mode active') diff --git a/Tools/PyJobTransforms/python/trfExe.py b/Tools/PyJobTransforms/python/trfExe.py index 58f7de9e1766e3df46c2a9e091fe0f7e4ba6b621..d8bfa657f4d1ab94fff99aca97e448032f12d15a 100755 --- a/Tools/PyJobTransforms/python/trfExe.py +++ b/Tools/PyJobTransforms/python/trfExe.py @@ -1315,22 +1315,7 @@ class athenaExecutor(scriptExecutor): else: msg.info('Valgrind not engaged') # run Athena command - if 'checkpoint' in self.conf.argdict and self.conf._argdict['checkpoint'].value is True: - for port in range(7770,7790): - if bind_port("127.0.0.1",port)==0: - break - msg.info("Using port %s for dmtcp_launch."%port) - print >>wrapper,'dmtcp_launch -p %s'%port, ' '.join(self._cmd) - elif 'restart' in self.conf.argdict and self.conf._argdict['restart'].value is not None and 'MergeAthenaMP' not in self.name: - restartTarball = self.conf._argdict['restart'].value - print >>wrapper, 'tar -xf %s -C .' % restartTarball - for port in range(7770,7790): - if bind_port("127.0.0.1",port)==0: - break - msg.info("Using port %s for dmtcp_launch."%port) - print >>wrapper, './dmtcp_restart_script.sh -p %s -h 127.0.0.1'%port - else: - print >>wrapper, ' '.join(self._cmd) + print >>wrapper, ' '.join(self._cmd) os.chmod(self._wrapperFile, 0755) except (IOError, OSError) as e: errMsg = 'error writing athena wrapper {fileName}: {error}'.format(