diff --git a/Control/AthenaServices/src/AthenaOutputStream.h b/Control/AthenaServices/src/AthenaOutputStream.h index dce922e3b3a7bc34f84f65f6b582b5c6cd7027ed..eae9a1daf9ecae742e6255ea7780a536dfdb610e 100644 --- a/Control/AthenaServices/src/AthenaOutputStream.h +++ b/Control/AthenaServices/src/AthenaOutputStream.h @@ -1,7 +1,7 @@ // Dear emacs, this is -*- C++ -*- /* - Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration */ #ifndef ATHENASERVICES_ATHENAOUTPUTSTREAM_H @@ -92,6 +92,9 @@ protected: StringArrayProperty m_metadataItemList{this,"MetadataItemList",{},"List of metadata items to write","OutputStreamItemList"}; /// Vector of item names StringArrayProperty m_excludeList{this,"ExcludeList",{},"List of metadata items to write","OrderedSet<std::string>"}; + + StringProperty m_keepProvenances { this, "KeepProvenanceTagsRegEx", {".*"}, + "RegEx pattern to select processing tags for which DataHeader should retain provenances"}; /// Vector of item names StringArrayProperty m_compressionListHigh; /// Vector of item names diff --git a/Control/AthenaServices/src/AthenaOutputStreamTool.cxx b/Control/AthenaServices/src/AthenaOutputStreamTool.cxx index 7c5695525bea6959b2287dd757cbdb31f2851b77..3e9b5a0dcd3b4b38fcf85b56576a09bccabfd485 100644 --- a/Control/AthenaServices/src/AthenaOutputStreamTool.cxx +++ b/Control/AthenaServices/src/AthenaOutputStreamTool.cxx @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration */ /** @file AthenaOutputStreamTool.cxx @@ -26,6 +26,7 @@ #include "PersistentDataModel/DataHeader.h" #include "PersistentDataModel/TokenAddress.h" + namespace { /// Check to see if a DataHeader has been marked as input @@ -157,6 +158,12 @@ StatusCode AthenaOutputStreamTool::connectServices(const std::string& dataStore, } } m_extendProvenanceRecord = 
extendProvenenceRecord; + auto pprop = dynamic_cast<const IProperty*>(parent()); + auto keep = dynamic_cast<const StringProperty&>( pprop->getProperty("KeepProvenanceTagsRegEx") ); + m_keepProvenancesStr = keep.value(); + // create RegEx pattern from the property value, specify extended grammar + m_keepProvenancesRE = std::regex(m_keepProvenancesStr, std::regex::extended); + return(connectServices()); } //__________________________________________________________________________ @@ -224,45 +231,8 @@ StatusCode AthenaOutputStreamTool::connectOutput(const std::string& outputName) if (m_store->retrieve(dh, dhKey).isFailure()) { ATH_MSG_DEBUG("Unable to retrieve the DataHeader with key " << dhKey); } - SG::DataProxy* dhProxy = m_store->proxy(dh); - if (dh->isInput() || hasInputAlias (*dhProxy) || primaryDH) { - // Add DataHeader token to new DataHeader - if (m_extendProvenanceRecord) { - std::string pTag; - SG::TransientAddress* dhTransAddr = 0; - for (const DataHeaderElement& dhe : *dh) { - if (dhe.getPrimaryClassID() == ClassID_traits<DataHeader>::ID()) { - pTag = dhe.getKey(); - delete dhTransAddr; dhTransAddr = dhe.getAddress(0); - } - } - // Update dhTransAddr to handle fast merged files. - if (dhProxy != 0 && dhProxy->address() != 0) { - delete dhTransAddr; dhTransAddr = 0; - m_dataHeader->insertProvenance(DataHeaderElement(dhProxy, - dhProxy->address(), - pTag)); - } - else if (dhTransAddr != nullptr) { - m_dataHeader->insertProvenance(DataHeaderElement(dhTransAddr, - dhTransAddr->address(), - pTag)); - delete dhTransAddr; dhTransAddr = 0; - } - } - // Each stream tag is written only once in the provenance record - // In files where there are multiple entries per stream tag - // the record is in reverse, i.e., the latest appears first. - // Therefore, only keep the first entry if there are multiple - // matches so that we retain the latest one. 
- std::set<std::string> insertedTags{}; - for(auto iter=dh->beginProvenance(), iEnd=dh->endProvenance(); iter != iEnd; ++iter) { - const auto & currentKey = (*iter).getKey(); - if(!insertedTags.contains(currentKey)) { - insertedTags.insert(currentKey); - m_dataHeader->insertProvenance(*iter); - } - } + if (dh->isInput() || hasInputAlias(*m_store->proxy(dh)) || primaryDH) { + propagateProvenance( *dh ); } } @@ -331,6 +301,62 @@ StatusCode AthenaOutputStreamTool::connectOutput(const std::string& outputName) m_connectionOpen = true; return(StatusCode::SUCCESS); } + +//__________________________________________________________________________ +void AthenaOutputStreamTool::propagateProvenance( const DataHeader& src_dh ) +{ + // keep track of provenance tags already inserted into the new DataHeader, so each tag is written only once + std::set<std::string> insertedTags{}; + // Add the token of the source DataHeader itself to the new DataHeader (direct provenance) + if (m_extendProvenanceRecord) { + std::string pTag; + std::unique_ptr<SG::TransientAddress> dhTransAddr; + for (const DataHeaderElement& dhe : src_dh) { + if (dhe.getPrimaryClassID() == ClassID_traits<DataHeader>::ID()) { + pTag = dhe.getKey(); + dhTransAddr.reset( dhe.getAddress(0) ); + } + } + // Prefer the address held by the m_store proxy over dhTransAddr, to handle fast merged files. + if( auto dhProxy=m_store->proxy(&src_dh); dhProxy && dhProxy->address() ) { + DataHeaderElement dhe(dhProxy, dhProxy->address(), pTag); + m_dataHeader->insertProvenance(dhe); + insertedTags.insert(pTag); + } + else if( dhTransAddr ) { + DataHeaderElement dhe(dhTransAddr.get(), dhTransAddr->address(), pTag); + m_dataHeader->insertProvenance(dhe); + insertedTags.insert(pTag); + } + } + + // empty regexpr means do not copy any provenance entries from src_dh (the direct entry added above is not affected) + if( !m_keepProvenancesStr.empty() ) { + // Each stream tag is written only once in the provenance record + // In files where there are multiple entries per stream tag + // the record is in reverse, i.e., the latest appears first. 
+ // Therefore, only keep the first entry if there are multiple + // matches so that we retain the latest one. + for(auto iter=src_dh.beginProvenance(), iEnd=src_dh.endProvenance(); iter != iEnd; ++iter) { + const auto & currentKey = (*iter).getKey(); + if( insertedTags.insert(currentKey).second ) { + // first prov with that tag. Now check if we want to keep that tag (regex_search result is cached per tag; the pattern may match any part of the tag) + bool keep = false; + auto it = m_keepProvenanceMatch.find( currentKey ); + if( it != m_keepProvenanceMatch.end() ) { + keep = it->second; + } else { + keep = std::regex_search(currentKey, m_keepProvenancesRE); + m_keepProvenanceMatch[currentKey] = keep; + } + if( keep ) { + m_dataHeader->insertProvenance(*iter); + } + } + } + } +} + //__________________________________________________________________________ StatusCode AthenaOutputStreamTool::commitOutput(bool doCommit) { ATH_MSG_DEBUG("In commitOutput"); diff --git a/Control/AthenaServices/src/AthenaOutputStreamTool.h b/Control/AthenaServices/src/AthenaOutputStreamTool.h index 239915466bc99b55b9e069a6889d19c66b2db8a7..824b9f44017e15c787ba6c33fc3c4d3c7eab6fd9 100644 --- a/Control/AthenaServices/src/AthenaOutputStreamTool.h +++ b/Control/AthenaServices/src/AthenaOutputStreamTool.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration + Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration */ #ifndef ATHENAOUTPUTSTREAMTOOL_H @@ -15,6 +15,8 @@ #include "AthenaBaseComps/AthAlgTool.h" #include <string> +#include <map> +#include <regex> class IClassIDSvc; class IDecisionSvc; @@ -81,6 +83,8 @@ public: private: /// Do the real connection to services virtual StatusCode connectServices(); + /// copy provenance records from src_dh into m_dataHeader when creating new DataHeaders + void propagateProvenance( const DataHeader& src_dh ); private: StringProperty m_outputName{ this, "OutputFile", "", "name of the output db name"}; @@ -106,12 +110,20 @@ private: /// Ref to DecisionSvc ServiceHandle<IDecisionSvc> m_decSvc; /// Current 
DataHeader for streamed objects - DataHeader* m_dataHeader; + DataHeader* m_dataHeader; /// Flag to tell whether connectOutput has been called - bool m_connectionOpen; + bool m_connectionOpen; + /// Flag as to whether to extend provenance via the DataHeader - bool m_extendProvenanceRecord; - /// Flag to extend attribute list with stream flags from DecisionSvc + bool m_extendProvenanceRecord; + /// RegEx string to match provenance tags to keep in the output DataHeader. Retrieved from an OutputStream property + std::string m_keepProvenancesStr; + /// RegEx pattern created from m_keepProvenancesStr + std::regex m_keepProvenancesRE; + /// Cache provenance RegEx matching result in a map + std::map<std::string, bool> m_keepProvenanceMatch; + + /// Flag to extend attribute list with stream flags from DecisionSvc bool m_extend; /// set of skipped item keys, because of missing CLID diff --git a/Database/AthenaPOOL/OutputStreamAthenaPool/python/OutputStreamConfig.py b/Database/AthenaPOOL/OutputStreamAthenaPool/python/OutputStreamConfig.py index d61aa9081ea037a3e7940e6a6a0c21772dfe74ec..8faf8f32d44097d9ae2f5941cad5935ba74e964c 100644 --- a/Database/AthenaPOOL/OutputStreamAthenaPool/python/OutputStreamConfig.py +++ b/Database/AthenaPOOL/OutputStreamAthenaPool/python/OutputStreamConfig.py @@ -1,4 +1,4 @@ -# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator, ConfigurationError from AthenaConfiguration.ComponentFactory import CompFactory @@ -9,9 +9,13 @@ from AthenaCommon.Logging import logging def outputStreamName(streamName): return f"Stream{streamName}" + +# keepProvenanceTagsRegEx - RegEx string to match processing tags in the Event provenance. Only matching tags will be copied +# to the new DataHeader. Empty string rejects all tags. 
Direct provenance is not affected +# (see extendProvenanceRecord) def OutputStreamCfg(flags, streamName, ItemList=[], MetadataItemList=[], disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False, - extendProvenanceRecord=True, AcceptAlgs=[], HelperTools=[]): + extendProvenanceRecord=True, keepProvenanceTagsRegEx=None, AcceptAlgs=[], HelperTools=[]): eventInfoKey = "EventInfo" if flags.Common.ProductionStep in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking, ProductionStep.MinbiasPreprocessing]: eventInfoKey = f"{flags.Overlay.BkgPrefix}EventInfo" @@ -80,6 +84,9 @@ def OutputStreamCfg(flags, streamName, ItemList=[], MetadataItemList=[], # Treat this similar to takeItemsFromInput # (C++ default in this case is True) outputStream.ExtendProvenanceRecord = False + if keepProvenanceTagsRegEx is not None: + # C++ defaults to '.*' which means all. Overwrite only on request. + outputStream.KeepProvenanceTagsRegEx = keepProvenanceTagsRegEx outputStream.AcceptAlgs += AcceptAlgs outputStream.ExtraOutputs.add(("DataHeader", f"StoreGateSvc+{outputStreamName(streamName)}")) if flags.Scheduler.CheckOutputUsage and flags.Concurrency.NumThreads > 0: