Skip to content
Snippets Groups Projects
Commit 907ffd1a authored by Marcin Nowak's avatar Marcin Nowak :radioactive: Committed by Walter Lampl
Browse files

add prepareOutput(stream) and remove IgnoreInputFileBoundary flag

IgnoreInputFileBoundary flag had no use and no plans for use.
MetaDataSvc::prepareOutput(stream) is a new version of prepareOutput()
that should work for parallel streams (metadata tools need to be adapted
still)
parent 46198016
No related branches found
No related tags found
No related merge requests found
...@@ -7,5 +7,4 @@ from AthenaCommon.AppMgr import ServiceMgr as svcMgr ...@@ -7,5 +7,4 @@ from AthenaCommon.AppMgr import ServiceMgr as svcMgr
from AthenaServices.AthenaServicesConf import OutputStreamSequencerSvc from AthenaServices.AthenaServicesConf import OutputStreamSequencerSvc
outputStreamSequencerSvc = OutputStreamSequencerSvc() outputStreamSequencerSvc = OutputStreamSequencerSvc()
outputStreamSequencerSvc.SequenceIncidentName = "NextEventRange" outputStreamSequencerSvc.SequenceIncidentName = "NextEventRange"
outputStreamSequencerSvc.IgnoreInputFileBoundary = True
svcMgr += outputStreamSequencerSvc svcMgr += outputStreamSequencerSvc
...@@ -440,7 +440,7 @@ void AthenaOutputStream::writeMetaData(const std::string outputFN) ...@@ -440,7 +440,7 @@ void AthenaOutputStream::writeMetaData(const std::string outputFN)
throw GaudiException("Cannot finalize helper tool", name(), StatusCode::FAILURE); throw GaudiException("Cannot finalize helper tool", name(), StatusCode::FAILURE);
} }
} }
if( m_metaDataSvc->prepareOutput().isFailure() ) { if( m_metaDataSvc->prepareOutput(outputFN).isFailure() ) {
throw GaudiException("Failed on MetaDataSvc prepareOutput", name(), StatusCode::FAILURE); throw GaudiException("Failed on MetaDataSvc prepareOutput", name(), StatusCode::FAILURE);
} }
// Always force a final commit in stop - mainly applies to AthenaPool // Always force a final commit in stop - mainly applies to AthenaPool
...@@ -539,7 +539,7 @@ StatusCode AthenaOutputStream::write() { ...@@ -539,7 +539,7 @@ StatusCode AthenaOutputStream::write() {
bool failed = false; bool failed = false;
EventContext::ContextID_t slot = Gaudi::Hive::currentContext().slot(); EventContext::ContextID_t slot = Gaudi::Hive::currentContext().slot();
IAthenaOutputStreamTool* streamer = &*m_streamer; IAthenaOutputStreamTool* streamer = &*m_streamer;
std::string outputFN = m_outSeqSvc->buildSequenceFileName(m_outputName); std::string outputFN;
std::unique_lock<mutex_t> lock(m_mutex); std::unique_lock<mutex_t> lock(m_mutex);
...@@ -561,8 +561,10 @@ StatusCode AthenaOutputStream::write() { ...@@ -561,8 +561,10 @@ StatusCode AthenaOutputStream::write() {
} }
m_streamerMap[ outputFN ].reset( streamer ); m_streamerMap[ outputFN ].reset( streamer );
} }
} else {
outputFN = m_outSeqSvc->buildSequenceFileName(m_outputName);
} }
// Clear any previously existing item list // Clear any previously existing item list
clearSelection(); clearSelection();
collectAllObjects(); collectAllObjects();
......
...@@ -321,6 +321,27 @@ StatusCode MetaDataSvc::prepareOutput() ...@@ -321,6 +321,27 @@ StatusCode MetaDataSvc::prepareOutput()
return rc; return rc;
} }
// like prepareOutput() but for parallel streams
StatusCode MetaDataSvc::prepareOutput(const std::string& outputName)
{
// default to the serial implementation if no output name given
if( outputName.empty() ) return prepareOutput();
ATH_MSG_DEBUG( "prepareOutput('" << outputName << "')" );
StatusCode rc = StatusCode::SUCCESS;
for (auto it = m_metaDataTools.begin(); it != m_metaDataTools.end(); ++it) {
ATH_MSG_DEBUG(" calling metaDataStop for " << (*it)->name());
// planning to replace the call below with (*it)->prepareOutput(outputName)
if ( (*it)->metaDataStop().isFailure() ) {
ATH_MSG_ERROR("Unable to call metaDataStop for " << it->name());
rc = StatusCode::FAILURE;
}
}
// MN: not releasing tools here - revisit when clear what happens on new file open
return rc;
}
StatusCode MetaDataSvc::shmProxy(const std::string& filename) StatusCode MetaDataSvc::shmProxy(const std::string& filename)
{ {
if (!m_clearedInputDataStore) { if (!m_clearedInputDataStore) {
...@@ -374,9 +395,8 @@ void MetaDataSvc::handle(const Incident& inc) { ...@@ -374,9 +395,8 @@ void MetaDataSvc::handle(const Incident& inc) {
} }
} }
//__________________________________________________________________________ //__________________________________________________________________________
StatusCode MetaDataSvc::transitionMetaDataFile(bool ignoreInputFile) { StatusCode MetaDataSvc::transitionMetaDataFile() {
// Allow MetaDataStop only on Input file transitions if( !m_allowMetaDataStop ) {
if (!m_allowMetaDataStop && !ignoreInputFile) {
return(StatusCode::FAILURE); return(StatusCode::FAILURE);
} }
// Make sure metadata is ready for writing // Make sure metadata is ready for writing
...@@ -469,6 +489,7 @@ StatusCode MetaDataSvc::addProxyToInputMetaDataStore(const std::string& tokenStr ...@@ -469,6 +489,7 @@ StatusCode MetaDataSvc::addProxyToInputMetaDataStore(const std::string& tokenStr
} }
ToolHandle<IMetaDataTool> metadataTool(toolInstName); ToolHandle<IMetaDataTool> metadataTool(toolInstName);
m_metaDataTools.push_back(metadataTool); m_metaDataTools.push_back(metadataTool);
ATH_MSG_DEBUG("Added new MetadDataTool: " << metadataTool->name());
if (!metadataTool.retrieve().isSuccess()) { if (!metadataTool.retrieve().isSuccess()) {
ATH_MSG_FATAL("Cannot get " << toolInstName); ATH_MSG_FATAL("Cannot get " << toolInstName);
return(StatusCode::FAILURE); return(StatusCode::FAILURE);
......
...@@ -77,6 +77,9 @@ public: // Non-static members ...@@ -77,6 +77,9 @@ public: // Non-static members
/// ready for output /// ready for output
virtual StatusCode prepareOutput() override; virtual StatusCode prepareOutput() override;
/// version of prepareOutput() for parallel streams
virtual StatusCode prepareOutput(const std::string& outputName);
virtual StatusCode shmProxy(const std::string& filename) override; virtual StatusCode shmProxy(const std::string& filename) override;
virtual StatusCode queryInterface(const InterfaceID& riid, void** ppvInterface) override; virtual StatusCode queryInterface(const InterfaceID& riid, void** ppvInterface) override;
...@@ -98,7 +101,7 @@ public: // Non-static members ...@@ -98,7 +101,7 @@ public: // Non-static members
virtual void handle(const Incident& incident) override; virtual void handle(const Incident& incident) override;
/// Transition output metadata file - fire MeteDataStop incident to transition OutputStream /// Transition output metadata file - fire MeteDataStop incident to transition OutputStream
StatusCode transitionMetaDataFile(bool ignoreInputFile = false); StatusCode transitionMetaDataFile();
/// Callback method to reinitialize the internal state of the component for I/O purposes (e.g. upon @c fork(2)) /// Callback method to reinitialize the internal state of the component for I/O purposes (e.g. upon @c fork(2))
virtual StatusCode io_reinit() override; virtual StatusCode io_reinit() override;
......
...@@ -23,7 +23,6 @@ OutputStreamSequencerSvc::OutputStreamSequencerSvc(const std::string& name, ISvc ...@@ -23,7 +23,6 @@ OutputStreamSequencerSvc::OutputStreamSequencerSvc(const std::string& name, ISvc
{ {
// declare properties // declare properties
declareProperty("SequenceIncidentName", m_incidentName = ""); declareProperty("SequenceIncidentName", m_incidentName = "");
declareProperty("IgnoreInputFileBoundary", m_ignoreInputFile = false);
} }
//__________________________________________________________________________ //__________________________________________________________________________
OutputStreamSequencerSvc::~OutputStreamSequencerSvc() { OutputStreamSequencerSvc::~OutputStreamSequencerSvc() {
...@@ -101,7 +100,7 @@ void OutputStreamSequencerSvc::handle(const Incident& inc) ...@@ -101,7 +100,7 @@ void OutputStreamSequencerSvc::handle(const Incident& inc)
// When processing events sequentially (threads<2) write metadata on the NextRange incident // When processing events sequentially (threads<2) write metadata on the NextRange incident
// but ignore the first incident because it only starts the first sequence // but ignore the first incident because it only starts the first sequence
ATH_MSG_DEBUG("MetaData transition"); ATH_MSG_DEBUG("MetaData transition");
if (!m_metaDataSvc->transitionMetaDataFile( ignoringInputBoundary() ).isSuccess()) { if (!m_metaDataSvc->transitionMetaDataFile().isSuccess()) {
ATH_MSG_FATAL("Cannot transition MetaDataSvc."); ATH_MSG_FATAL("Cannot transition MetaDataSvc.");
} }
} }
...@@ -118,7 +117,7 @@ void OutputStreamSequencerSvc::handle(const Incident& inc) ...@@ -118,7 +117,7 @@ void OutputStreamSequencerSvc::handle(const Incident& inc)
n << "_" << std::setw(4) << std::setfill('0') << m_fileSequenceNumber; n << "_" << std::setw(4) << std::setfill('0') << m_fileSequenceNumber;
m_currentRangeID = n.str(); m_currentRangeID = n.str();
ATH_MSG_DEBUG("Default next event range filename extension: " << m_currentRangeID); ATH_MSG_DEBUG("Default next event range filename extension: " << m_currentRangeID);
} }
} }
//__________________________________________________________________________ //__________________________________________________________________________
......
...@@ -63,8 +63,6 @@ public: // Non-static members ...@@ -63,8 +63,6 @@ public: // Non-static members
/// The name of the incident that starts a new event sequence /// The name of the incident that starts a new event sequence
std::string incidentName() const { return m_incidentName.value(); } std::string incidentName() const { return m_incidentName.value(); }
bool ignoringInputBoundary() const { return m_ignoreInputFile.value(); }
/// Is the service in active use? (true after the first range incident is handled) /// Is the service in active use? (true after the first range incident is handled)
bool inUse() const; bool inUse() const;
...@@ -83,8 +81,6 @@ private: // data ...@@ -83,8 +81,6 @@ private: // data
private: // properties private: // properties
/// SequenceIncidentName, incident name for triggering file sequencing. /// SequenceIncidentName, incident name for triggering file sequencing.
StringProperty m_incidentName; StringProperty m_incidentName;
/// IgnoreInputFileBoundary, boolean whether to ignore the input file boundary requirement for file sequencing.
BooleanProperty m_ignoreInputFile;
std::map<std::string,std::string> m_fnToRangeId; std::map<std::string,std::string> m_fnToRangeId;
std::map<std::string,std::string>::iterator m_finishedRange; std::map<std::string,std::string>::iterator m_finishedRange;
......
...@@ -9,7 +9,6 @@ from AthenaServices.AthenaServicesConf import OutputStreamSequencerSvc ...@@ -9,7 +9,6 @@ from AthenaServices.AthenaServicesConf import OutputStreamSequencerSvc
outputStreamSequencerSvc = OutputStreamSequencerSvc() outputStreamSequencerSvc = OutputStreamSequencerSvc()
outputStreamSequencerSvc.SequenceIncidentName = "EndEvent" outputStreamSequencerSvc.SequenceIncidentName = "EndEvent"
outputStreamSequencerSvc.IgnoreInputFileBoundary = True
svcMgr += outputStreamSequencerSvc svcMgr += outputStreamSequencerSvc
### Add the algorithm producing VP1 events ### Add the algorithm producing VP1 events
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment