From 907ffd1a6697cd966735e64b3b59aa21a1237343 Mon Sep 17 00:00:00 2001 From: Marcin Nowak <marcin.nowak@cern.ch> Date: Thu, 7 May 2020 18:25:01 +0000 Subject: [PATCH] add prepareOutput(stream) and remove IgnoreInputFileBoundary flag IgnoreInputFileBoundary flag had no use and no plans for use. MetaDataSvc::prepareOutput(stream) is a new version of prepareOutput() that should work for parallel streams (metadata tools need to be adapted still) --- .../AthenaMP/share/AthenaMP_EventService.py | 1 - .../AthenaServices/src/AthenaOutputStream.cxx | 8 +++--- Control/AthenaServices/src/MetaDataSvc.cxx | 27 ++++++++++++++++--- Control/AthenaServices/src/MetaDataSvc.h | 5 +++- .../src/OutputStreamSequencerSvc.cxx | 5 ++-- .../src/OutputStreamSequencerSvc.h | 4 --- .../share/VP1_jobOptions.py | 1 - 7 files changed, 35 insertions(+), 16 deletions(-) diff --git a/Control/AthenaMP/share/AthenaMP_EventService.py b/Control/AthenaMP/share/AthenaMP_EventService.py index 5aba6394ca6..cd497963582 100644 --- a/Control/AthenaMP/share/AthenaMP_EventService.py +++ b/Control/AthenaMP/share/AthenaMP_EventService.py @@ -7,5 +7,4 @@ from AthenaCommon.AppMgr import ServiceMgr as svcMgr from AthenaServices.AthenaServicesConf import OutputStreamSequencerSvc outputStreamSequencerSvc = OutputStreamSequencerSvc() outputStreamSequencerSvc.SequenceIncidentName = "NextEventRange" -outputStreamSequencerSvc.IgnoreInputFileBoundary = True svcMgr += outputStreamSequencerSvc diff --git a/Control/AthenaServices/src/AthenaOutputStream.cxx b/Control/AthenaServices/src/AthenaOutputStream.cxx index c783303a0de..08f9607bc04 100644 --- a/Control/AthenaServices/src/AthenaOutputStream.cxx +++ b/Control/AthenaServices/src/AthenaOutputStream.cxx @@ -440,7 +440,7 @@ void AthenaOutputStream::writeMetaData(const std::string outputFN) throw GaudiException("Cannot finalize helper tool", name(), StatusCode::FAILURE); } } - if( m_metaDataSvc->prepareOutput().isFailure() ) { + if( m_metaDataSvc->prepareOutput(outputFN).isFailure() ) { throw GaudiException("Failed on MetaDataSvc prepareOutput", name(), StatusCode::FAILURE); } // Always force a final commit in stop - mainly applies to AthenaPool @@ -539,7 +539,7 @@ StatusCode AthenaOutputStream::write() { bool failed = false; EventContext::ContextID_t slot = Gaudi::Hive::currentContext().slot(); IAthenaOutputStreamTool* streamer = &*m_streamer; - std::string outputFN = m_outSeqSvc->buildSequenceFileName(m_outputName); + std::string outputFN; std::unique_lock<mutex_t> lock(m_mutex); @@ -561,8 +561,10 @@ StatusCode AthenaOutputStream::write() { } m_streamerMap[ outputFN ].reset( streamer ); } + } else { + outputFN = m_outSeqSvc->buildSequenceFileName(m_outputName); } - + // Clear any previously existing item list clearSelection(); collectAllObjects(); diff --git a/Control/AthenaServices/src/MetaDataSvc.cxx b/Control/AthenaServices/src/MetaDataSvc.cxx index df89cd9db69..186e6f9797c 100644 --- a/Control/AthenaServices/src/MetaDataSvc.cxx +++ b/Control/AthenaServices/src/MetaDataSvc.cxx @@ -321,6 +321,27 @@ StatusCode MetaDataSvc::prepareOutput() return rc; } +// like prepareOutput() but for parallel streams +StatusCode MetaDataSvc::prepareOutput(const std::string& outputName) +{ + // default to the serial implementation if no output name given + if( outputName.empty() ) return prepareOutput(); + ATH_MSG_DEBUG( "prepareOutput('" << outputName << "')" ); + + StatusCode rc = StatusCode::SUCCESS; + for (auto it = m_metaDataTools.begin(); it != m_metaDataTools.end(); ++it) { + ATH_MSG_DEBUG(" calling metaDataStop for " << (*it)->name()); + // planning to replace the call below with (*it)->prepareOutput(outputName) + if ( (*it)->metaDataStop().isFailure() ) { + ATH_MSG_ERROR("Unable to call metaDataStop for " << it->name()); + rc = StatusCode::FAILURE; + } + } + // MN: not releasing tools here - revisit when clear what happens on new file open + return rc; +} + + StatusCode MetaDataSvc::shmProxy(const std::string& filename) { if (!m_clearedInputDataStore) { @@ -374,9 +395,8 @@ void MetaDataSvc::handle(const Incident& inc) { } } //__________________________________________________________________________ -StatusCode MetaDataSvc::transitionMetaDataFile(bool ignoreInputFile) { - // Allow MetaDataStop only on Input file transitions - if (!m_allowMetaDataStop && !ignoreInputFile) { +StatusCode MetaDataSvc::transitionMetaDataFile() { + if( !m_allowMetaDataStop ) { return(StatusCode::FAILURE); } // Make sure metadata is ready for writing @@ -469,6 +489,7 @@ StatusCode MetaDataSvc::addProxyToInputMetaDataStore(const std::string& tokenStr } ToolHandle<IMetaDataTool> metadataTool(toolInstName); m_metaDataTools.push_back(metadataTool); + ATH_MSG_DEBUG("Added new MetadDataTool: " << metadataTool->name()); if (!metadataTool.retrieve().isSuccess()) { ATH_MSG_FATAL("Cannot get " << toolInstName); return(StatusCode::FAILURE); diff --git a/Control/AthenaServices/src/MetaDataSvc.h b/Control/AthenaServices/src/MetaDataSvc.h index 23c2a977861..24dbc206eb3 100644 --- a/Control/AthenaServices/src/MetaDataSvc.h +++ b/Control/AthenaServices/src/MetaDataSvc.h @@ -77,6 +77,9 @@ public: // Non-static members /// ready for output virtual StatusCode prepareOutput() override; + /// version of prepareOutput() for parallel streams + virtual StatusCode prepareOutput(const std::string& outputName); + virtual StatusCode shmProxy(const std::string& filename) override; virtual StatusCode queryInterface(const InterfaceID& riid, void** ppvInterface) override; @@ -98,7 +101,7 @@ public: // Non-static members virtual void handle(const Incident& incident) override; /// Transition output metadata file - fire MeteDataStop incident to transition OutputStream - StatusCode transitionMetaDataFile(bool ignoreInputFile = false); + StatusCode transitionMetaDataFile(); /// Callback method to reinitialize the internal state of the component for I/O purposes (e.g. upon @c fork(2)) virtual StatusCode io_reinit() override; diff --git a/Control/AthenaServices/src/OutputStreamSequencerSvc.cxx b/Control/AthenaServices/src/OutputStreamSequencerSvc.cxx index f7337541f65..f678820a9a3 100644 --- a/Control/AthenaServices/src/OutputStreamSequencerSvc.cxx +++ b/Control/AthenaServices/src/OutputStreamSequencerSvc.cxx @@ -23,7 +23,6 @@ OutputStreamSequencerSvc::OutputStreamSequencerSvc(const std::string& name, ISvc { // declare properties declareProperty("SequenceIncidentName", m_incidentName = ""); - declareProperty("IgnoreInputFileBoundary", m_ignoreInputFile = false); } //__________________________________________________________________________ OutputStreamSequencerSvc::~OutputStreamSequencerSvc() { @@ -101,7 +100,7 @@ void OutputStreamSequencerSvc::handle(const Incident& inc) // When processing events sequentially (threads<2) write metadata on the NextRange incident // but ignore the first incident because it only starts the first sequence ATH_MSG_DEBUG("MetaData transition"); - if (!m_metaDataSvc->transitionMetaDataFile( ignoringInputBoundary() ).isSuccess()) { + if (!m_metaDataSvc->transitionMetaDataFile().isSuccess()) { ATH_MSG_FATAL("Cannot transition MetaDataSvc."); } } @@ -118,7 +117,7 @@ void OutputStreamSequencerSvc::handle(const Incident& inc) n << "_" << std::setw(4) << std::setfill('0') << m_fileSequenceNumber; m_currentRangeID = n.str(); ATH_MSG_DEBUG("Default next event range filename extension: " << m_currentRangeID); - } + } } //__________________________________________________________________________ diff --git a/Control/AthenaServices/src/OutputStreamSequencerSvc.h b/Control/AthenaServices/src/OutputStreamSequencerSvc.h index 3880c52ac99..0b43715ba4d 100644 --- a/Control/AthenaServices/src/OutputStreamSequencerSvc.h +++ b/Control/AthenaServices/src/OutputStreamSequencerSvc.h @@ -63,8 +63,6 @@ public: // Non-static members /// The name of the incident that starts a new event sequence std::string incidentName() const { return m_incidentName.value(); } - bool ignoringInputBoundary() const { return m_ignoreInputFile.value(); } - /// Is the service in active use? (true after the first range incident is handled) bool inUse() const; @@ -83,8 +81,6 @@ private: // data private: // properties /// SequenceIncidentName, incident name for triggering file sequencing. StringProperty m_incidentName; - /// IgnoreInputFileBoundary, boolean whether to ignore the input file boundary requirement for file sequencing. - BooleanProperty m_ignoreInputFile; std::map<std::string,std::string> m_fnToRangeId; std::map<std::string,std::string>::iterator m_finishedRange; diff --git a/graphics/EventDisplaysOnline/share/VP1_jobOptions.py b/graphics/EventDisplaysOnline/share/VP1_jobOptions.py index 67251667c26..4b60d58f7fb 100644 --- a/graphics/EventDisplaysOnline/share/VP1_jobOptions.py +++ b/graphics/EventDisplaysOnline/share/VP1_jobOptions.py @@ -9,7 +9,6 @@ from AthenaServices.AthenaServicesConf import OutputStreamSequencerSvc outputStreamSequencerSvc = OutputStreamSequencerSvc() outputStreamSequencerSvc.SequenceIncidentName = "EndEvent" -outputStreamSequencerSvc.IgnoreInputFileBoundary = True svcMgr += outputStreamSequencerSvc ### Add the algorithm producing VP1 events -- GitLab