Skip to content
Snippets Groups Projects
Commit 07ee389c authored by Vakhtang Tsulaia's avatar Vakhtang Tsulaia
Browse files

Merge branch 'sim.metadata' into 'master'

AthenaOutputStream: added writeMetaData(), not calling metaFileTransition in ES mode

See merge request atlas/athena!32448
parents 11b2dc58 40d9bf0c
No related branches found
No related tags found
No related merge requests found
......@@ -369,92 +369,39 @@ void AthenaOutputStream::handle(const Incident& inc)
{
EventContext::ContextID_t slot = inc.context().slot();
ATH_MSG_DEBUG("slot " << slot << " handle() incident type: " << inc.type());
// mutex shared with write() which is called from writeMetaData
std::unique_lock<mutex_t> lock(m_mutex);
if( inc.type() == "MetaDataStop" ) {
const std::string outputFN = m_slotRangeMap[ slot ];
IAthenaOutputStreamTool* streamer = &*m_streamer;
if( m_outSeqSvc->inUse() and m_outSeqSvc->inConcurrentEventsMode() ) {
if( slot == EventContext::INVALID_CONTEXT_ID ) {
// slot is invalid during application stop, but all ranges are closed by that time
ATH_MSG_DEBUG("Ignoring MetaDataStop incident with invalid slot");
return;
}
streamer = m_streamerMap[outputFN].get();
}
// Moved preFinalize of helper tools to stop - want to optimize the
// output file in finalize RDS 12/2009
for (std::vector<ToolHandle<IAthenaOutputTool> >::iterator iter = m_helperTools.begin();
iter != m_helperTools.end(); iter++) {
if (!(*iter)->preFinalize().isSuccess()) {
ATH_MSG_ERROR("Cannot finalize helper tool");
}
}
if( m_metaDataSvc->prepareOutput().isFailure() ) {
ATH_MSG_ERROR("Failed on MetaDataSvc prepareOutput");
}
// Always force a final commit in stop - mainly applies to AthenaPool
if (m_writeOnFinalize) {
if (write().isFailure()) { // true mean write AND commit
ATH_MSG_ERROR("Cannot write on finalize");
}
ATH_MSG_INFO("Records written: " << m_events);
}
if (!m_metadataItemList.value().empty()) {
m_currentStore = &m_metadataStore;
StatusCode status = streamer->connectServices(m_metadataStore.type(), m_persName, false);
if (status.isFailure()) {
throw GaudiException("Unable to connect metadata services", name(), StatusCode::FAILURE);
}
m_checkNumberOfWrites = false;
m_outputAttributes = "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData][AttributeListKey=]";
m_p2BWritten->clear();
IProperty *pAsIProp(nullptr);
if ((m_p2BWritten.retrieve()).isFailure() ||
nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_p2BWritten)) ||
(pAsIProp->setProperty("ItemList", m_metadataItemList.toString())).isFailure()) {
throw GaudiException("Folder property [metadataItemList] not found", name(), StatusCode::FAILURE);
}
if (write().isFailure()) { // true mean write AND commit
ATH_MSG_ERROR("Cannot write metadata");
}
m_outputAttributes.clear();
m_currentStore = &m_dataStore;
status = streamer->connectServices(m_dataStore.type(), m_persName, m_extendProvenanceRecord);
if (status.isFailure()) {
throw GaudiException("Unable to re-connect services", name(), StatusCode::FAILURE);
}
m_p2BWritten->clear();
if ((pAsIProp->setProperty(m_itemList)).isFailure()) {
throw GaudiException("Folder property [itemList] not found", name(), StatusCode::FAILURE);
}
ATH_MSG_INFO("Records written: " << m_events);
}
}
// Handle Event Ranges
// Handle Event Ranges for Event Service
if( m_outSeqSvc->inUse() and m_outSeqSvc->inConcurrentEventsMode() )
{
if( inc.type() == "MetaDataStop" ) {
// all substreams should be closed by this point
ATH_MSG_DEBUG("Ignoring MetaDataStop incident in ES mode");
return;
}
if( slot == EventContext::INVALID_CONTEXT_ID ) {
throw GaudiException("Received Incident with invalid slot in ES mode", name(), StatusCode::FAILURE);
}
if( inc.type() == IncidentType::BeginProcessing ) {
// remember in which output filename this event should be stored
m_slotRangeMap[ slot ] = m_outSeqSvc->buildSequenceFileName(m_outputName);
ATH_MSG_DEBUG("slot " << slot << " assigned filename: " << m_slotRangeMap[ slot ] );
return;
}
if( inc.type() == IncidentType::EndProcessing ) {
} else if( inc.type() == IncidentType::EndProcessing ) {
std::string rangeFN = m_slotRangeMap[ slot ];
if( !rangeFN.empty() ) {
// check how many events there are still for the given range
int n = 0;
for( auto& elem : m_slotRangeMap ) {
if( elem.second == rangeFN ) n++;
}
if( n == 1 ) {
// this was the last event in this range, finalize it
ATH_MSG_DEBUG("slot " << slot << " starting transition MetaData for " << rangeFN);
if( !m_metaDataSvc->transitionMetaDataFile( m_outSeqSvc->ignoringInputBoundary() ).isSuccess() ) {
ATH_MSG_FATAL("Cannot transition MetaDataSvc");
}
ATH_MSG_INFO("Finished writing event sequence to " << rangeFN );
ATH_MSG_DEBUG("slot " << slot << " writing MetaData to " << rangeFN);
// MN: not calling StopMetaDataIncident here - OK for Sim, check others
// metadata tools like CutFlowSvc are not able to handle this yet
writeMetaData( rangeFN );
ATH_MSG_INFO("Finished writing Event Sequence to " << rangeFN );
auto strm_iter = m_streamerMap.find( rangeFN );
strm_iter->second->finalizeOutput().ignore();
strm_iter->second->finalize().ignore();
......@@ -463,14 +410,79 @@ void AthenaOutputStream::handle(const Incident& inc)
}
m_slotRangeMap[ slot ].clear();
} else {
ATH_MSG_ERROR("Failed to handle EndProcessing incident");
throw GaudiException("Failed to handle EndProcessing incident - range filename not found",
name(), StatusCode::FAILURE);
}
}
} else {
// not in Event Service
if( inc.type() == "MetaDataStop" ) {
writeMetaData();
}
}
ATH_MSG_DEBUG("Leaving incident handler for " << inc.type());
}
// method to write MetaData for this stream
// in ES mode the range substream is determined by the current Event slot
// called from the incident handler - returns void and throws GaudiExceptions on errors
void AthenaOutputStream::writeMetaData(const std::string outputFN)
{
// use main stream tool by default, or per outputFile in ES mode
IAthenaOutputStreamTool* streamer = outputFN.empty()? &*m_streamer : m_streamerMap[outputFN].get();
// Moved preFinalize of helper tools to stop - want to optimize the
// output file in finalize RDS 12/2009
for (std::vector<ToolHandle<IAthenaOutputTool> >::iterator iter = m_helperTools.begin();
iter != m_helperTools.end(); iter++) {
if (!(*iter)->preFinalize().isSuccess()) {
throw GaudiException("Cannot finalize helper tool", name(), StatusCode::FAILURE);
}
}
if( m_metaDataSvc->prepareOutput().isFailure() ) {
throw GaudiException("Failed on MetaDataSvc prepareOutput", name(), StatusCode::FAILURE);
}
// Always force a final commit in stop - mainly applies to AthenaPool
if (m_writeOnFinalize) {
if (write().isFailure()) { // true mean write AND commit
throw GaudiException("Cannot write on finalize", name(), StatusCode::FAILURE);
}
ATH_MSG_INFO("Records written: " << m_events);
}
ATH_MSG_DEBUG("metadataItemList: " << m_metadataItemList.value() );
if (!m_metadataItemList.value().empty()) {
m_currentStore = &m_metadataStore;
StatusCode status = streamer->connectServices(m_metadataStore.type(), m_persName, false);
if (status.isFailure()) {
throw GaudiException("Unable to connect metadata services", name(), StatusCode::FAILURE);
}
m_checkNumberOfWrites = false;
m_outputAttributes = "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData][AttributeListKey=]";
m_p2BWritten->clear();
IProperty *pAsIProp(nullptr);
if ((m_p2BWritten.retrieve()).isFailure() ||
nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_p2BWritten)) ||
(pAsIProp->setProperty("ItemList", m_metadataItemList.toString())).isFailure()) {
throw GaudiException("Folder property [metadataItemList] not found", name(), StatusCode::FAILURE);
}
if (write().isFailure()) { // true mean write AND commit
throw GaudiException("Cannot write metadata", name(), StatusCode::FAILURE);
}
m_outputAttributes.clear();
m_currentStore = &m_dataStore;
status = streamer->connectServices(m_dataStore.type(), m_persName, m_extendProvenanceRecord);
if (status.isFailure()) {
throw GaudiException("Unable to re-connect services", name(), StatusCode::FAILURE);
}
m_p2BWritten->clear();
if ((pAsIProp->setProperty(m_itemList)).isFailure()) {
throw GaudiException("Folder property [itemList] not found", name(), StatusCode::FAILURE);
}
ATH_MSG_INFO("Records written: " << m_events);
}
}
// terminate data writer
StatusCode AthenaOutputStream::finalize() {
bool failed = false;
......
......@@ -197,6 +197,9 @@ private:
/// Try to match a DataProxy to a vector of strings
bool matchKey(const std::vector<std::string>& key, const SG::DataProxy* proxy) const;
/// Write MetaData for this stream (by default) or for a substream outputFN (in ES mode)
void writeMetaData( const std::string outputFN="" );
};
#endif // ATHENASERVICES_OUTPUTSTREAM_H
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment