Verified Commit 7e274d5a authored by Tadej Novak's avatar Tadej Novak
Browse files

Add support for independent skipping of secondary events in DoubleEventSelector

parent 800582b0
Pipeline #2972425 passed with stage
in 0 seconds
......@@ -98,7 +98,8 @@ else:
theApp.EvtMax = jps.AthenaCommonFlags.EvtMax()
if jps.AthenaCommonFlags.SkipEvents.statusOn:
if hasattr(svcMgr,"EventSelector"):
svcMgr.EventSelector.SkipEvents = jps.AthenaCommonFlags.SkipEvents()
if not hasattr(svcMgr, "SecondaryEventSelector"):
svcMgr.EventSelector.SkipEvents = jps.AthenaCommonFlags.SkipEvents()
else:
_msg.warning('No EventSelector in svcMgr, not skipping events')
......
/*
Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
*/
/**
......@@ -31,8 +31,8 @@ public:
/// Handle file transition at the next iteration
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context& ctxt) const = 0;
/// Sync event count
virtual void syncEventCount(int count) const = 0;
/// Go to next event and skip if necessary
virtual StatusCode nextWithSkip(IEvtSelector::Context& ctxt) const = 0;
/// Record AttributeList in StoreGate
virtual StatusCode recordAttributeList() const = 0;
/// Fill AttributeList with specific items from the selector and a suffix
......
# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
from AthenaConfiguration.ComponentFactory import CompFactory
......@@ -76,13 +76,19 @@ def PoolReadCfg(configFlags):
result.addService(StoreGateSvc("MetaDataStore"))
if filenamesSecondary:
skipEventsPrimary=configFlags.Exec.SkipEvents
skipEventsSecondary=configFlags.Exec.SkipEvents
if configFlags.Overlay.SkipSecondaryEvents >= 0:
skipEventsSecondary = configFlags.Overlay.SkipSecondaryEvents
# Create DoubleEventSelector (universal for any seconday input type)
evSel = DoubleEventSelectorAthenaPool("EventSelector",
InputCollections=filenames,
SkipEvents=configFlags.Exec.SkipEvents)
InputCollections=filenames)
if configFlags.Overlay.DataOverlay:
# In case of data overlay HITS are primary input
evSel.SkipEvents = skipEventsPrimary
# We have to check if we're running data overlay - BS is needed in this case
from ByteStreamCnvSvc.ByteStreamConfig import ByteStreamReadCfg
result.merge(ByteStreamReadCfg(configFlags))
......@@ -96,7 +102,11 @@ def PoolReadCfg(configFlags):
apapsPrimary.getFullJobOptName(),
])) #No service handle yet???
else:
# In case of MC overlay RDOs are primary input
evSel.SkipEvents = skipEventsSecondary
# Do not process secondary input metadata
evSel.ProcessMetadata = False
# We have primary and secondary pool inputs, create two address providers
apapsPrimary = AthenaPoolAddressProviderSvc("AthenaPoolAddressProviderSvcPrimary")
apapsPrimary.DataHeaderKey = "EventSelector"
......@@ -113,7 +123,8 @@ def PoolReadCfg(configFlags):
secondarySel = EventSelectorAthenaPool("SecondaryEventSelector",
IsSecondary=True,
InputCollections=filenamesSecondary)
InputCollections=filenamesSecondary,
SkipEvents=skipEventsPrimary)
result.addService(secondarySel)
result.addService(evSel)
else:
......
......@@ -60,72 +60,40 @@ StatusCode DoubleEventSelectorAthenaPool::next(IEvtSelector::Context& ctxt) cons
}
for (;;) {
// Check if we're at the end of primary file
if (!m_primaryFileTransition) {
StatusCode sc = nextHandleFileTransition(ctxt);
if (sc.isRecoverable()) {
continue; // handles empty files
}
if (sc.isFailure()) {
return StatusCode::FAILURE;
}
m_primaryFileTransition = true;
}
// Move in the primary file (with skipping)
ATH_CHECK(nextWithSkip(ctxt));
// Check if we're at the end of secondary file
StatusCode sc = m_secondarySelector->nextHandleFileTransition(ctxt);
if (sc.isRecoverable()) {
continue; // handles empty files
}
if (sc.isFailure()) {
return StatusCode::FAILURE;
}
// Both files are OK, set the flag back to false
m_primaryFileTransition = false;
ATH_CHECK(m_secondarySelector->nextWithSkip(ctxt));
// Increase event count
m_secondarySelector->syncEventCount(++m_evtCount);
if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
ATH_MSG_WARNING("Failed to preNext() CounterTool.");
// Record the attribute list
if (!recordAttributeList().isSuccess()) {
ATH_MSG_ERROR("Failed to record AttributeList.");
return(StatusCode::FAILURE);
}
if( m_evtCount > m_skipEvents
&& (m_skipEventRanges.empty() || m_evtCount < m_skipEventRanges.front().first))
{
if (!recordAttributeList().isSuccess()) {
ATH_MSG_ERROR("Failed to record AttributeList.");
return(StatusCode::FAILURE);
}
StatusCode status = StatusCode::SUCCESS;
for (const auto& tool : m_helperTools) {
StatusCode toolStatus = tool->postNext();
if (toolStatus.isRecoverable()) {
ATH_MSG_INFO("Request skipping event from: " << tool->name());
if (status.isSuccess()) {
status = StatusCode::RECOVERABLE;
}
} else if (toolStatus.isFailure()) {
ATH_MSG_WARNING("Failed to postNext() " << tool->name());
status = StatusCode::FAILURE;
}
}
if (status.isRecoverable()) {
ATH_MSG_INFO("skipping event " << m_evtCount);
} else if (status.isFailure()) {
ATH_MSG_WARNING("Failed to postNext() HelperTool.");
} else {
if (!m_counterTool.empty() && !m_counterTool->postNext().isSuccess()) {
ATH_MSG_WARNING("Failed to postNext() CounterTool.");
}
break;
StatusCode status = StatusCode::SUCCESS;
for (const auto& tool : m_helperTools) {
StatusCode toolStatus = tool->postNext();
if (toolStatus.isRecoverable()) {
ATH_MSG_INFO("Request skipping event from: " << tool->name());
if (status.isSuccess()) {
status = StatusCode::RECOVERABLE;
}
} else if (toolStatus.isFailure()) {
ATH_MSG_WARNING("Failed to postNext() " << tool->name());
status = StatusCode::FAILURE;
}
}
if (status.isRecoverable()) {
ATH_MSG_INFO("skipping event " << m_evtCount);
} else if (status.isFailure()) {
ATH_MSG_WARNING("Failed to postNext() HelperTool.");
} else {
while( !m_skipEventRanges.empty() && m_evtCount >= m_skipEventRanges.front().second ) {
m_skipEventRanges.erase(m_skipEventRanges.begin());
if (!m_counterTool.empty() && !m_counterTool->postNext().isSuccess()) {
ATH_MSG_WARNING("Failed to postNext() CounterTool.");
}
ATH_MSG_INFO("skipping event " << m_evtCount);
break;
}
}
......
......@@ -69,9 +69,6 @@ private:
// Cache if secondary selector is ByteStream
bool m_secondaryByteStream{};
// Caches for file transitions
mutable bool m_primaryFileTransition{}; // protected by a mutex, used in one place
SG::SlotSpecificObj<SG::SourceID> m_sourceID1;
SG::SlotSpecificObj<SG::SourceID> m_sourceID2;
};
......
......@@ -693,6 +693,44 @@ StatusCode EventSelectorAthenaPool::nextHandleFileTransition(IEvtSelector::Conte
return StatusCode::SUCCESS;
}
//________________________________________________________________________________
StatusCode EventSelectorAthenaPool::nextWithSkip(IEvtSelector::Context& ctxt) const {
ATH_MSG_DEBUG("EventSelectorAthenaPool::nextWithSkip");
for (;;) {
// Check if we're at the end of file
StatusCode sc = nextHandleFileTransition(ctxt);
if (sc.isRecoverable()) {
continue; // handles empty files
}
if (sc.isFailure()) {
return StatusCode::FAILURE;
}
// Increase event count
++m_evtCount;
if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
ATH_MSG_WARNING("Failed to preNext() CounterTool.");
}
if( m_evtCount > m_skipEvents
&& (m_skipEventRanges.empty() || m_evtCount < m_skipEventRanges.front().first))
{
return StatusCode::SUCCESS;
} else {
while( !m_skipEventRanges.empty() && m_evtCount >= m_skipEventRanges.front().second ) {
m_skipEventRanges.erase(m_skipEventRanges.begin());
}
if (m_isSecondary.value()) {
ATH_MSG_INFO("skipping secondary event " << m_evtCount);
} else {
ATH_MSG_INFO("skipping event " << m_evtCount);
}
}
}
return StatusCode::SUCCESS;
}
//________________________________________________________________________________
StatusCode EventSelectorAthenaPool::previous(IEvtSelector::Context& /*ctxt*/) const {
ATH_MSG_ERROR("previous() not implemented");
return(StatusCode::FAILURE);
......@@ -1188,9 +1226,3 @@ bool EventSelectorAthenaPool::disconnectIfFinished( const SG::SourceID &fid ) co
}
return false;
}
//__________________________________________________________________________
void EventSelectorAthenaPool::syncEventCount(int count) const
{
m_evtCount = count;
}
......@@ -145,8 +145,8 @@ protected:
// ISecondaryEventSelector
/// Handle file transition at the next iteration
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context& ctxt) const override;
/// Sync event count
virtual void syncEventCount(int count) const override;
/// Go to next event and skip if necessary
virtual StatusCode nextWithSkip(IEvtSelector::Context& ctxt) const override;
/// Record AttributeList in StoreGate
virtual StatusCode recordAttributeList() const override;
/// Fill AttributeList with specific items from the selector and a suffix
......
#!/usr/bin/env python
# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
"""Set up to read and/or write bytestream files.
This module configures the Athena components required to read from
......@@ -61,6 +61,7 @@ def ByteStreamReadCfg(flags, type_names=None):
name="SecondaryEventSelector",
IsSecondary=True,
Input=flags.Input.SecondaryFiles,
SkipEvents=flags.Exec.SkipEvents if flags.Overlay.SkipSecondaryEvents >= 0 else flags.Exec.SkipEvents,
ByteStreamInputSvc=bytestream_input.name,
)
result.addService(event_selector)
......
......@@ -507,12 +507,12 @@ EventSelectorByteStream::nextImpl(IEvtSelector::Context& ctxt,
}
//________________________________________________________________________________
StatusCode EventSelectorByteStream::nextHandleFileTransition(IEvtSelector::Context& it) const
StatusCode EventSelectorByteStream::nextHandleFileTransition(IEvtSelector::Context& ctxt) const
{
lock_t lock (m_mutex);
return nextHandleFileTransitionImpl (it, lock);
return nextHandleFileTransitionImpl (ctxt, lock);
}
StatusCode EventSelectorByteStream::nextHandleFileTransitionImpl(IEvtSelector::Context& it,
StatusCode EventSelectorByteStream::nextHandleFileTransitionImpl(IEvtSelector::Context& ctxt,
lock_t& lock) const
{
const RawEvent* pre{};
......@@ -544,7 +544,7 @@ StatusCode EventSelectorByteStream::nextHandleFileTransitionImpl(IEvtSelector::C
}
// Check whether a RawEvent has actually been provided
if (pre == nullptr) {
it = *m_endIter;
ctxt = *m_endIter;
return StatusCode::FAILURE;
}
......@@ -562,7 +562,49 @@ StatusCode EventSelectorByteStream::nextHandleFileTransitionImpl(IEvtSelector::C
return StatusCode::SUCCESS;
}
//________________________________________________________________________________
StatusCode EventSelectorByteStream::nextWithSkip(IEvtSelector::Context& ctxt) const
{
lock_t lock (m_mutex);
return nextWithSkipImpl (ctxt, lock);
}
StatusCode EventSelectorByteStream::nextWithSkipImpl(IEvtSelector::Context& ctxt,
lock_t& lock) const {
ATH_MSG_DEBUG("EventSelectorByteStream::nextWithSkip");
for (;;) {
// Check if we're at the end of file
StatusCode sc = nextHandleFileTransitionImpl(ctxt, lock);
if (sc.isRecoverable()) {
continue; // handles empty files
}
if (sc.isFailure()) {
return StatusCode::FAILURE;
}
// Increase event count
++m_NumEvents;
if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
ATH_MSG_WARNING("Failed to preNext() CounterTool.");
}
if ( m_NumEvents > m_skipEvents.value() &&
(m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
return StatusCode::SUCCESS;
} else {
if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
m_skipEventSequence.erase(m_skipEventSequence.begin());
}
if (m_isSecondary.value()) {
ATH_MSG_INFO("skipping secondary event " << m_NumEvents);
} else {
ATH_MSG_INFO("skipping event " << m_NumEvents);
}
}
}
return StatusCode::SUCCESS;
}
//________________________________________________________________________________
StatusCode EventSelectorByteStream::previous(IEvtSelector::Context& ctxt) const
{
......@@ -1075,13 +1117,6 @@ StatusCode EventSelectorByteStream::io_reinit() {
return(this->reinit(lock));
}
//__________________________________________________________________________
void EventSelectorByteStream::syncEventCount(int count) const
{
lock_t lock (m_mutex);
m_NumEvents = count;
}
//__________________________________________________________________________
bool EventSelectorByteStream::disconnectIfFinished(const SG::SourceID &/* fid */) const
{
......
/*
Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
*/
#ifndef EVENTSELECTORBYTESTREAM_H
......@@ -132,9 +132,9 @@ protected:
//-------------------------------------------------
// ISecondaryEventSelector
/// Handle file transition at the next iteration
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context& it) const override;
/// Sync event count
virtual void syncEventCount(int count) const override;
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context& ctxt) const override;
/// Go to next event and skip if necessary
virtual StatusCode nextWithSkip(IEvtSelector::Context& ctxt) const override;
/// Record AttributeList in StoreGate
virtual StatusCode recordAttributeList() const override;
/// Fill AttributeList with specific items from the selector and a suffix
......@@ -147,8 +147,10 @@ private: // internal member functions
StatusCode nextImpl(Context& it, int jump, lock_t& lock) const;
StatusCode previousImpl(Context& it, lock_t& lock) const;
StatusCode previousImpl(Context& it, int jump, lock_t& lock) const;
StatusCode nextHandleFileTransitionImpl(IEvtSelector::Context& it,
StatusCode nextHandleFileTransitionImpl(IEvtSelector::Context& ctxt,
lock_t& lock) const;
StatusCode nextWithSkipImpl(IEvtSelector::Context& ctxt,
lock_t& lock) const;
StatusCode recordAttributeListImpl(lock_t& lock) const;
StatusCode fillAttributeListImpl(coral::AttributeList *attrList, const std::string &suffix, bool copySource,
lock_t& lock) const;
......
# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
include.block("EventOverlayJobTransforms/ConfiguredOverlayMT_jobOptions.py")
......@@ -48,16 +48,26 @@ if not overlayFlags.isDataOverlay():
import AthenaPoolCnvSvc.ReadAthenaPoolDouble
from AthenaCommon.AppMgr import ServiceMgr
from AthenaCommon.AthenaCommonFlags import athenaCommonFlags
skipPrimary=0
skipSecondary=0
if athenaCommonFlags.SkipEvents.statusOn:
skipPrimary = athenaCommonFlags.SkipEvents()
skipSecondary = athenaCommonFlags.SkipEvents()
if overlayFlags.SkipSecondaryEvents.statusOn and overlayFlags.SkipSecondaryEvents() >= 0:
skipSecondary = overlayFlags.SkipSecondaryEvents()
if overlayFlags.isDataOverlay():
ServiceMgr.EventSelector.InputCollections = athenaCommonFlags.PoolHitsInput()
ServiceMgr.EventSelector.SkipEvents = skipPrimary
ServiceMgr.SecondaryEventSelector.Input = athenaCommonFlags.FilesInput()
ServiceMgr.SecondaryEventSelector.ProcessBadEvent = True
ServiceMgr.SecondaryEventSelector.SkipEvents = skipSecondary
else:
ServiceMgr.EventSelector.ProcessMetadata = False
ServiceMgr.EventSelector.InputCollections = athenaCommonFlags.PoolRDOInput()
ServiceMgr.EventSelector.SkipEvents = skipSecondary
ServiceMgr.SecondaryEventSelector.InputCollections = athenaCommonFlags.PoolHitsInput()
if athenaCommonFlags.SkipEvents.statusOn:
ServiceMgr.EventSelector.SkipEvents = athenaCommonFlags.SkipEvents()
ServiceMgr.SecondaryEventSelector.SkipEvents = skipPrimary
# Properly generate event context
if nThreads > 0:
......
......@@ -27,6 +27,14 @@ class isDataOverlay(JobProperty):
allowedTypes=['bool']
StoredValue = True
class SkipSecondaryEvents(JobProperty):
"""Number of secondary input events to skip when reading an input POOL file. This should
be given to the EventSelector service.
"""
statusOn = False
allowedTypes = ['int']
StoredValue = -1
class EventIDTextFile(JobProperty):
"""Name of the Event ID Text file"""
statusOn=True
......
......@@ -11,6 +11,8 @@ def createOverlayConfigFlags():
flags = AthConfigFlags()
# Data overlay flag
flags.addFlag("Overlay.DataOverlay", False)
# Overlay skip secondary events
flags.addFlag("Overlay.SkipSecondaryEvents", -1)
# Overlay background StoreGate key prefix
flags.addFlag("Overlay.BkgPrefix", "Bkg_")
# Overlay signal StoreGate key prefix
......
......@@ -49,6 +49,9 @@ def fromRunArgs(runArgs):
if not hasRDO_BKGInput and not hasBS_SKIMInput:
raise RuntimeError('Define one of RDO_BKG and BS_SKIM file types')
if hasattr(runArgs, 'skipSecondaryEvents'):
ConfigFlags.Overlay.SkipSecondaryEvents = runArgs.skipSecondaryEvents
from AthenaConfiguration.Enums import ProductionStep
ConfigFlags.Common.ProductionStep = ProductionStep.Overlay
......
......@@ -3,7 +3,7 @@
Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
"""
from PyJobTransforms.trfArgClasses import argBSFile, argFactory, argList, argRDOFile, argSubstep
from PyJobTransforms.trfArgClasses import argBSFile, argFactory, argList, argRDOFile, argSubstep, argSubstepInt
from PyJobTransforms.trfExe import athenaExecutor
......@@ -14,6 +14,10 @@ def addOverlayTrfArgs(parser):
type=argFactory(argList),
help='Detectors autoconfiguration string',
group='Overlay')
parser.add_argument('--skipSecondaryEvents', nargs='+',
type=argFactory(argSubstepInt, defaultSubstep='first'),
help='Number of secondary input events to skip over in the first processing step (skipping substep can be overridden)',
group='Overlay')
parser.add_argument('--outputRDO_SGNLFile', nargs='+',
type=argFactory(argRDOFile, io='output'),
help='The output RDO file of the MC signal alone',
......
......@@ -65,6 +65,8 @@ else:
# Common athena flags
if hasattr(overlayArgs, 'skipEvents'):
athenaCommonFlags.SkipEvents.set_Value_and_Lock(overlayArgs.skipEvents)
if hasattr(overlayArgs, 'skipSecondaryEvents'):
overlayFlags.SkipSecondaryEvents.set_Value_and_Lock(overlayArgs.skipSecondaryEvents)
if hasattr(overlayArgs, 'maxEvents'):
athenaCommonFlags.EvtMax.set_Value_and_Lock(overlayArgs.maxEvents)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment