Skip to content
Snippets Groups Projects
Commit f9dcaab2 authored by Vakhtang Tsulaia's avatar Vakhtang Tsulaia
Browse files

Merge branch 'filter.provenance' into 'main'

Added property to AthenaOutputStream to list provenance tags to keep ATEAM-1050

See merge request atlas/athena!78609
parents 1efb3727 bd7193e9
No related branches found
No related tags found
1 merge request!78609Added property to AthenaOutputStream to list provenance tags to keep ATEAM-1050
// Dear emacs, this is -*- C++ -*-
/*
Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
*/
#ifndef ATHENASERVICES_ATHENAOUTPUTSTREAM_H
......@@ -92,6 +92,9 @@ protected:
StringArrayProperty m_metadataItemList{this,"MetadataItemList",{},"List of metadata items to write","OutputStreamItemList"};
/// Vector of item names
StringArrayProperty m_excludeList{this,"ExcludeList",{},"List of metadata items to write","OrderedSet<std::string>"};
StringProperty m_keepProvenances { this, "KeepProvenanceTagsRegEx", {".*"},
"RegEx pattern to select processing tags for which DataHeader should retain provenances"};
/// Vector of item names
StringArrayProperty m_compressionListHigh;
/// Vector of item names
......
/*
Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
*/
/** @file AthenaOutputStreamTool.cxx
......@@ -26,6 +26,7 @@
#include "PersistentDataModel/DataHeader.h"
#include "PersistentDataModel/TokenAddress.h"
namespace {
/// Check to see if a DataHeader has been marked as input
......@@ -157,6 +158,12 @@ StatusCode AthenaOutputStreamTool::connectServices(const std::string& dataStore,
}
}
m_extendProvenanceRecord = extendProvenenceRecord;
auto pprop = dynamic_cast<const IProperty*>(parent());
auto keep = dynamic_cast<const StringProperty&>( pprop->getProperty("KeepProvenanceTagsRegEx") );
m_keepProvenancesStr = keep.value();
// create RegEx pattern from the property value, specify extended grammar
m_keepProvenancesRE = std::regex(m_keepProvenancesStr, std::regex::extended);
return(connectServices());
}
//__________________________________________________________________________
......@@ -224,45 +231,8 @@ StatusCode AthenaOutputStreamTool::connectOutput(const std::string& outputName)
if (m_store->retrieve(dh, dhKey).isFailure()) {
ATH_MSG_DEBUG("Unable to retrieve the DataHeader with key " << dhKey);
}
SG::DataProxy* dhProxy = m_store->proxy(dh);
if (dh->isInput() || hasInputAlias (*dhProxy) || primaryDH) {
// Add DataHeader token to new DataHeader
if (m_extendProvenanceRecord) {
std::string pTag;
SG::TransientAddress* dhTransAddr = 0;
for (const DataHeaderElement& dhe : *dh) {
if (dhe.getPrimaryClassID() == ClassID_traits<DataHeader>::ID()) {
pTag = dhe.getKey();
delete dhTransAddr; dhTransAddr = dhe.getAddress(0);
}
}
// Update dhTransAddr to handle fast merged files.
if (dhProxy != 0 && dhProxy->address() != 0) {
delete dhTransAddr; dhTransAddr = 0;
m_dataHeader->insertProvenance(DataHeaderElement(dhProxy,
dhProxy->address(),
pTag));
}
else if (dhTransAddr != nullptr) {
m_dataHeader->insertProvenance(DataHeaderElement(dhTransAddr,
dhTransAddr->address(),
pTag));
delete dhTransAddr; dhTransAddr = 0;
}
}
// Each stream tag is written only once in the provenance record
// In files where there are multiple entries per stream tag
// the record is in reverse, i.e., the latest appears first.
// Therefore, only keep the first entry if there are multiple
// matches so that we retain the latest one.
std::set<std::string> insertedTags{};
for(auto iter=dh->beginProvenance(), iEnd=dh->endProvenance(); iter != iEnd; ++iter) {
const auto & currentKey = (*iter).getKey();
if(!insertedTags.contains(currentKey)) {
insertedTags.insert(currentKey);
m_dataHeader->insertProvenance(*iter);
}
}
if (dh->isInput() || hasInputAlias(*m_store->proxy(dh)) || primaryDH) {
propagateProvenance( *dh );
}
}
......@@ -331,6 +301,62 @@ StatusCode AthenaOutputStreamTool::connectOutput(const std::string& outputName)
m_connectionOpen = true;
return(StatusCode::SUCCESS);
}
//__________________________________________________________________________
void AthenaOutputStreamTool::propagateProvenance( const DataHeader& src_dh )
{
// keep track of provenance entries inserted into the new DataHeader
std::set<std::string> insertedTags{};
// Add DataHeader token to the new DataHeader
if (m_extendProvenanceRecord) {
std::string pTag;
std::unique_ptr<SG::TransientAddress> dhTransAddr;
for (const DataHeaderElement& dhe : src_dh) {
if (dhe.getPrimaryClassID() == ClassID_traits<DataHeader>::ID()) {
pTag = dhe.getKey();
dhTransAddr.reset( dhe.getAddress(0) );
}
}
// Update dhTransAddr to handle fast merged files.
if( auto dhProxy=m_store->proxy(&src_dh); dhProxy && dhProxy->address() ) {
DataHeaderElement dhe(dhProxy, dhProxy->address(), pTag);
m_dataHeader->insertProvenance(dhe);
insertedTags.insert(pTag);
}
else if( dhTransAddr ) {
DataHeaderElement dhe(dhTransAddr.get(), dhTransAddr->address(), pTag);
m_dataHeader->insertProvenance(dhe);
insertedTags.insert(pTag);
}
}
// empty regexpr means do not keep any provenance
if( !m_keepProvenancesStr.empty() ) {
// Each stream tag is written only once in the provenance record
// In files where there are multiple entries per stream tag
// the record is in reverse, i.e., the latest appears first.
// Therefore, only keep the first entry if there are multiple
// matches so that we retain the latest one.
for(auto iter=src_dh.beginProvenance(), iEnd=src_dh.endProvenance(); iter != iEnd; ++iter) {
const auto & currentKey = (*iter).getKey();
if( insertedTags.insert(currentKey).second ) {
// first prov with that tag. Now check if we want to keep that tag
bool keep = false;
auto it = m_keepProvenanceMatch.find( currentKey );
if( it != m_keepProvenanceMatch.end() ) {
keep = it->second;
} else {
keep = std::regex_search(currentKey, m_keepProvenancesRE);
m_keepProvenanceMatch[currentKey] = keep;
}
if( keep ) {
m_dataHeader->insertProvenance(*iter);
}
}
}
}
}
//__________________________________________________________________________
StatusCode AthenaOutputStreamTool::commitOutput(bool doCommit) {
ATH_MSG_DEBUG("In commitOutput");
......
/*
Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
*/
#ifndef ATHENAOUTPUTSTREAMTOOL_H
......@@ -15,6 +15,8 @@
#include "AthenaBaseComps/AthAlgTool.h"
#include <string>
#include <map>
#include <regex>
class IClassIDSvc;
class IDecisionSvc;
......@@ -81,6 +83,8 @@ public:
private:
/// Do the real connection to services
virtual StatusCode connectServices();
/// copy provenance records when creating new DataHeaders
void propagateProvenance( const DataHeader& src_dh );
private:
StringProperty m_outputName{ this, "OutputFile", "", "name of the output db name"};
......@@ -106,12 +110,20 @@ private:
/// Ref to DecisionSvc
ServiceHandle<IDecisionSvc> m_decSvc;
/// Current DataHeader for streamed objects
DataHeader* m_dataHeader;
DataHeader* m_dataHeader;
/// Flag to tell whether connectOutput has been called
bool m_connectionOpen;
bool m_connectionOpen;
/// Flag as to whether to extend provenance via the DataHeader
bool m_extendProvenanceRecord;
/// Flag to extend attribute list with stream flags from DecisionSvc
bool m_extendProvenanceRecord;
/// RegEx string to match provenance tags to keep in the output DataHeader. Retrieved from an OutputStream property
std::string m_keepProvenancesStr;
/// RegEx pattern created from m_keepProvenancesStr
std::regex m_keepProvenancesRE;
/// Cache provenance RegEx matching result in a map
std::map<std::string, bool> m_keepProvenanceMatch;
/// Flag to extend attribute list with stream flags from DecisionSvc
bool m_extend;
/// set of skipped item keys, because of missing CLID
......
# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator, ConfigurationError
from AthenaConfiguration.ComponentFactory import CompFactory
......@@ -9,9 +9,13 @@ from AthenaCommon.Logging import logging
def outputStreamName(streamName):
return f"Stream{streamName}"
# keepProvenanceTagsRegEx - RegEx string used to match processing tags in the Event provenance. Only matching tags
# are copied to the new DataHeader. An empty string rejects all tags. Direct provenance is not affected
# (see extendProvenanceRecord).
def OutputStreamCfg(flags, streamName, ItemList=[], MetadataItemList=[],
disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False,
extendProvenanceRecord=True, AcceptAlgs=[], HelperTools=[]):
extendProvenanceRecord=True, keepProvenanceTagsRegEx=None, AcceptAlgs=[], HelperTools=[]):
eventInfoKey = "EventInfo"
if flags.Common.ProductionStep in [ProductionStep.PileUpPresampling, ProductionStep.PileUpPretracking, ProductionStep.MinbiasPreprocessing]:
eventInfoKey = f"{flags.Overlay.BkgPrefix}EventInfo"
......@@ -80,6 +84,9 @@ def OutputStreamCfg(flags, streamName, ItemList=[], MetadataItemList=[],
# Treat this similar to takeItemsFromInput
# (C++ default in this case is True)
outputStream.ExtendProvenanceRecord = False
if keepProvenanceTagsRegEx is not None:
# C++ defaults to '.*' which means all. Overwrite only on request.
outputStream.KeepProvenanceTagsRegEx = keepProvenanceTagsRegEx
outputStream.AcceptAlgs += AcceptAlgs
outputStream.ExtraOutputs.add(("DataHeader", f"StoreGateSvc+{outputStreamName(streamName)}"))
if flags.Scheduler.CheckOutputUsage and flags.Concurrency.NumThreads > 0:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment