Skip to content
Snippets Groups Projects
Commit e3d9f0e5 authored by Marcin Nowak's avatar Marcin Nowak :radioactive: Committed by Julien Maurer
Browse files

Cherry-pick !5d1b2cdc from master: Add flag to OutputStream to write metadata...

Cherry-pick  !5d1b2cdc from master: Add flag to OutputStream to write metadata and close after next Event.  ATEAM-832

Merge branch 'Sequencer.change.transition.SP' into 'master'

Add flag to OutputStream to write metadata and close after next Event. ATEAM-832

See merge request atlas/athena!53619

(cherry picked from commit b4fc1ea8)

5d1b2cdc Add flag to OutputStream to write metadata and close after next Event
parent 2115e4f8
No related branches found
No related tags found
No related merge requests found
......@@ -378,10 +378,17 @@ void AthenaOutputStream::handle(const Incident& inc)
std::unique_lock<mutex_t> lock(m_mutex);
if( inc.type() == "MetaDataStop" ) {
if( m_outSeqSvc->inUse() and m_outSeqSvc->inConcurrentEventsMode() ) {
// all substreams should be closed by this point
ATH_MSG_DEBUG("Ignoring MetaDataStop incident in ES mode");
return;
if( m_outSeqSvc->inUse() ) {
if( m_outSeqSvc->inConcurrentEventsMode() ) {
// EventService MT - all substreams should be closed by this point
ATH_MSG_DEBUG("Ignoring MetaDataStop incident in ES/MT mode");
return;
}
if( m_outSeqSvc->lastIncident() == "EndEvent" ) {
// in r22 EndEvent comes before output writing - queue metadata writing and disconnect for after Event write
m_writeMetadataAndDisconnect = true;
return;
}
}
// not in Event Service
writeMetaData();
......@@ -531,6 +538,14 @@ StatusCode AthenaOutputStream::execute() {
failed = true;
}
}
if( m_writeMetadataAndDisconnect ) {
writeMetaData();
m_writeMetadataAndDisconnect = false;
// finalize will disconnect output
if( !finalize().isSuccess() ) {
failed = true;
}
}
if (failed) {
return(StatusCode::FAILURE);
}
......
......@@ -142,13 +142,15 @@ protected:
/// map to record number of writes per object
typedef std::map<std::string, unsigned int> CounterMapType;
CounterMapType m_objectWriteCounter;
/// Vector of names of AlgTools that are executed by this stream
/// pointer to AthenaOutputStreamTool
ToolHandle<IAthenaOutputStreamTool> m_streamer;
/// vector of AlgTools that that are executed by this stream
ToolHandleArray<IAthenaOutputTool> m_helperTools;
// flag set by MetaDataStop if OutputSequencer is used with EndEvent
bool m_writeMetadataAndDisconnect = false;
// ------- Event Ranges handling in MT -------
/// map of filenames assigned to active slots
std::map< unsigned, std::string > m_slotRangeMap;
......
......@@ -372,7 +372,7 @@ void MetaDataSvc::handle(const Incident& inc) {
//__________________________________________________________________________
// This method is currently called only from OutputStreamSequencerSvc
StatusCode MetaDataSvc::transitionMetaDataFile(const std::string& outputConn)
StatusCode MetaDataSvc::transitionMetaDataFile(const std::string& outputConn, bool disconnect)
{
ATH_MSG_DEBUG("transitionMetaDataFile: " << outputConn );
......@@ -380,21 +380,20 @@ StatusCode MetaDataSvc::transitionMetaDataFile(const std::string& outputConn)
FileIncident inc("transitionMetaDataFile", "EndInputFile", "dummyMetaInputFileName", "");
ATH_CHECK(retireMetadataSource(inc));
// Make sure metadata is ready for writing
// MN TODO: this call is redundant due to AthenaOutputStream.cxx:447 - remove later
ATH_CHECK(this->prepareOutput());
// Reset flag to allow calling prepareOutput again at next transition
m_outputPreprared = false;
Incident metaDataStopIncident(name(), "MetaDataStop");
m_incSvc->fireIncident(metaDataStopIncident);
AthCnvSvc* cnvSvc = dynamic_cast<AthCnvSvc*>(m_addrCrtr.operator->());
if (cnvSvc) {
if (!cnvSvc->disconnectOutput(outputConn).isSuccess()) {
ATH_MSG_WARNING("Cannot get disconnect Output Files");
if( disconnect ) {
AthCnvSvc* cnvSvc = dynamic_cast<AthCnvSvc*>(m_addrCrtr.operator->());
if (cnvSvc) {
if (!cnvSvc->disconnectOutput(outputConn).isSuccess()) {
ATH_MSG_WARNING("Cannot get disconnect Output Files");
}
}
}
// Reset flag to allow calling prepareOutput again at next transition
m_outputPreprared = false;
return(StatusCode::SUCCESS);
}
......
......@@ -232,8 +232,8 @@ class MetaDataSvc : public ::AthService,
virtual void handle(const Incident& incident) override;
/// Transition output metadata file - fire MeteDataStop incident to transition
/// OutputStream
StatusCode transitionMetaDataFile(const std::string& outputConn);
/// OutputStream and disconnect now if requested
StatusCode transitionMetaDataFile(const std::string& outputConn, bool disconnect);
/** Implements IIoComponent interface
* sets m_outputPreprared to false and prints some information.
......
......@@ -41,6 +41,11 @@ StatusCode OutputStreamSequencerSvc::initialize() {
incsvc->addListener(this, IncidentType::BeginProcessing, 100);
ATH_MSG_DEBUG("Listening to " << incidentName() << " incidents" );
ATH_MSG_DEBUG("Reporting is " << (m_reportingOn.value()? "ON" : "OFF") );
// Retrieve MetaDataSvc
if( !m_metaDataSvc.isValid() and !m_metaDataSvc.retrieve().isSuccess() ) {
ATH_MSG_ERROR("Cannot get MetaDataSvc");
return StatusCode::FAILURE;
}
}
if( inConcurrentEventsMode() ) {
......@@ -92,6 +97,7 @@ void OutputStreamSequencerSvc::handle(const Incident& inc)
auto slot = Gaudi::Hive::currentContext().slot();
if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
m_lastIncident = inc.type();
if( inc.type() == incidentName() ) { // NextEventRange
std::string rangeID;
......@@ -100,23 +106,18 @@ void OutputStreamSequencerSvc::handle(const Incident& inc)
rangeID = fileInc->fileName();
ATH_MSG_DEBUG("Requested (through incident) Next Event Range filename extension: " << rangeID);
}
if( not inConcurrentEventsMode() ) {
// finish the previous Range here only in SEQUENTIAL (threads<2) event processing
if( rangeID=="dummy" or // for EventService MP
( rangeID=="" and m_fileSequenceNumber>=0 ) ) { // for Athena SP Event 1+
// Write metadata on the incident finishing a Range (filename=="dummy") in ES
// or on non-file incident (filename=="") in regular LoopMgr (skip first incident)
if( rangeID == "dummy" ) {
if( not inConcurrentEventsMode() ) {
// finish the previous Range here only in SEQUENTIAL (threads<2) event processing
// Write metadata on the incident finishing a Range (filename=="dummy") in ES MP
ATH_MSG_DEBUG("MetaData transition");
// Retrieve MetaDataSvc
if( !m_metaDataSvc.isValid() and !m_metaDataSvc.retrieve().isSuccess() ) {
throw GaudiException("Cannot get MetaDataSvc", name(), StatusCode::FAILURE);
}
if( !m_metaDataSvc->transitionMetaDataFile(m_lastFileName).isSuccess() ) {
// immediate write and disconnect for ES, otherwise do it after Event write is done
bool disconnect { true };
if( !m_metaDataSvc->transitionMetaDataFile( m_lastFileName, disconnect ).isSuccess() ) {
throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
}
}
}
if( rangeID=="dummy" ) {
// exit now, wait for the next (real) incident that will start the next range
return;
}
......@@ -136,6 +137,16 @@ void OutputStreamSequencerSvc::handle(const Incident& inc)
m_rangeIDinSlot[ slot ] = rangeID;
// remember range ID for next events in the same range
m_currentRangeID = rangeID;
if( not inConcurrentEventsMode() ) {
// non-file incident case (filename=="") in regular SP LoopMgr
ATH_MSG_DEBUG("MetaData transition");
bool disconnect { false };
// MN: may not know the full filename yet, but that is only needed for disconnect==true
if( !m_metaDataSvc->transitionMetaDataFile( "" /*m_lastFileName*/, disconnect ).isSuccess() ) {
throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
}
}
}
else if( inc.type() == IncidentType::BeginProcessing ) {
// new event start - assing current rangeId to its slot
......
......@@ -73,6 +73,9 @@ public: // Non-static members
/// Are there concurrent events? (threads>1)
static bool inConcurrentEventsMode();
/// Last incident type that was handled
const std::string& lastIncident() { return m_lastIncident; }
private: // data
ServiceHandle<MetaDataSvc> m_metaDataSvc;
......@@ -85,6 +88,9 @@ private: // data
/// Recently constructed full file name (useful in single threaded processing)
std::string m_lastFileName;
/// Last incident type that was handled
std::string m_lastIncident;
/// EventRange ID for all slots
std::vector<std::string> m_rangeIDinSlot;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment