Skip to content
Snippets Groups Projects
Commit 5a80ab89 authored by Frank Winklmeier's avatar Frank Winklmeier
Browse files

Merge branch 'thinning2.AthenaServices-20200630' into 'master'

AthenaServices: Remove ThinningSvc.

See merge request atlas/athena!34307
parents 60aa3245 4ddc14c0
No related branches found
No related tags found
No related merge requests found
......@@ -7,205 +7,9 @@
from __future__ import print_function
## import the automatically generated Configurables
from AthenaServices.AthenaServicesConf import ThinningSvc as _ThinningSvc
from AthenaServices.AthenaServicesConf import CoreDumpSvc as _CoreDumpSvc
from AthenaServices.AthenaServicesConf import EvtIdModifierSvc as _EvtIdModifierSvc
## import configurables module facade
from AthenaCommon import CfgMgr
import six
class ThinningSvc( _ThinningSvc ):
__slots__ = ( ) # enforce no new properties
def __init__(self, name = "ThinningSvc", **kwargs ):
# have to call base init
kwargs['name'] = name
super( ThinningSvc, self ).__init__( **kwargs )
# initialize the 'Streams' property with the default value
# to prevent people messing up with the 'Streams' array
if 'Streams' not in kwargs:
self.Streams = self.getDefaultProperty('Streams')[:]
return
def setDefaults( cls, handle ):
## continue, only if it is our Configurable
if not isinstance(handle, ThinningSvc):
return
from AthenaCommon.AlgSequence import AlgSequence,AthSequencer
from AthenaCommon.AppMgr import ServiceMgr as svcMgr
from AthenaCommon.Logging import logging
msg = logging.getLogger( handle.name() )
from CLIDComps.clidGenerator import clidGenerator
cliddb = clidGenerator(db=None)
def _clid_from_string(s):
"""return the clid (integer) from a string (which can either
contain the integer or the clid-name associated with it.)
"""
try:
clid = int(s)
except ValueError:
clid = cliddb.getClidFromName(s)
if clid is None or clid == 'None':
msg.warning('could not infer clid-name for: "%s"', s)
clid = None
return clid
from PyUtils.Decorators import memoize
@memoize
def _retrieve_items_from_input():
items = []
from AthenaCommon.AppMgr import ServiceMgr as svcMgr
import PyUtils.PoolFile as _pf
# only inspect the first input file
# that should be enough as we don't really support varying schema
# shapes anymore (as ROOT doesn't)
items = _pf.extract_items(svcMgr.EventSelector.InputCollections[0])
input_items = []
for item in items:
clid = _clid_from_string(item[0])
if clid is None:
msg.warning('could not infer clid for: "%s"', item[0])
clid = item[0] # put back the original string, then.
input_items.append([clid, item[1]])
return input_items
def _doScheduleTool(outStream, streamNames):
outStreamName = outStream.name()
return outStreamName in streamNames
def _build_proxy_list(outStream):
"""helper to access the 'ForceRead' and 'TakeItemsFromInput'
properties of the output stream to build a list of pairs (clid,key)
the thinning tool will forcingly read during the pre-execute step.
"""
from collections import defaultdict
proxies = defaultdict(list)
from_input = outStream.properties()['TakeItemsFromInput']
if from_input == outStream.propertyNoValue:
from_input = outStream.getDefaultProperty('TakeItemsFromInput')
## fetch the items to be written out
itemlist = outStream.properties()['ItemList'][:]
for item in itemlist:
clid_or_name, sg_key = item.split('#')
clid_or_name = clid_or_name.strip()
if clid_or_name.endswith('!'): # Strip exact flag.
clid_or_name = clid_or_name[:-1]
sg_key = sg_key.strip()
clid_or_name = _clid_from_string(clid_or_name)
if clid_or_name is None:
msg.warning('could not infer clid for: "%s"', item)
else:
if len(sg_key) == 0:
msg.warning('invalid key for item: "%s"', item)
else:
proxies[clid_or_name].append(sg_key)
if from_input:
input_items = _retrieve_items_from_input()
for clid,sg_key in input_items:
proxies[clid].append(sg_key)
d = proxies
proxies = []
for clid,sg_keys in six.iteritems (d):
sg_keys = list(set(sg_keys))
proxies.extend([[str(clid),sg_key] for sg_key in sg_keys])
return proxies
# filter out duplicates
streams = [o for o in set(handle.Streams[:])]
## list of streams we know we will apply thinning on
outstreams = []
AthenaOutputStream = CfgMgr.AthenaOutputStream
## first loop over TopAlg (as output stream can be located
## into that sequence)
for o in AlgSequence("TopAlg"):
if (isinstance(o, AthenaOutputStream) and
hasattr(o, 'HelperTools') and
_doScheduleTool(o, streams)):
outstreams.append (o)
pass
## then loop over OutputStream sequence
for o in AthSequencer("AthOutSeq"):
if (isinstance(o, AthenaOutputStream) and
hasattr(o, 'HelperTools') and
_doScheduleTool(o, streams)):
outstreams.append (o)
pass
## then loop over OutStream sequence
if 'Streams' in AlgSequence.configurables:
for o in AlgSequence("Streams"):
if (isinstance(o, AthenaOutputStream) and
hasattr(o, 'HelperTools') and
_doScheduleTool(o, streams)):
outstreams.append (o)
pass
## set the streams we found
setattr(handle, 'Streams', [o.name() for o in outstreams])
if len(outstreams) <= 0:
handle.Streams = []
msg.warning("No output stream will be thinned !")
msg.warning("Check your jobOptions configuration")
msg.warning("...performing hara-kiri...")
delattr (svcMgr, handle.name())
del handle
return
_lvl = handle.properties()['OutputLevel']
if _lvl == handle.propertyNoValue:
_lvl = handle.getDefaultProperty('OutputLevel')
pass
return
pass # class ThinningSvc
def createThinningSvc(svcName = "ThinningSvc", outStreams = []):
"""Helper method to create a completely configured ThinningSvc.
Note that 'outStreams' elements have to be AthenaOutputStreams !
"""
from AthenaCommon.Logging import logging
msg = logging.getLogger( svcName )
if len(outStreams) == 0:
msg.warning("Empty list of (to be thinned) output streams !")
msg.warning("Check your jobOptions configuration")
return
# filter out duplicates
outStreams = [ o for o in set(outStreams) ]
allGood = True
AthenaOutputStream = CfgMgr.AthenaOutputStream
for i,o in enumerate(outStreams):
if not isinstance(o, AthenaOutputStream):
msg.error("output stream #%i (n='%s') is not an instance of type "
"AthenaOutputStream [type: %r]", i, o.name(), type(o))
msg.error("check your configuration !")
allGood = False
if not allGood:
return
del allGood
outStreams = [o.name() for o in outStreams]
svc = ThinningSvc(svcName, Streams=outStreams)
return svc
class CoreDumpSvc( _CoreDumpSvc ):
__slots__ = () # enforce no new properties
......
///////////////////////// -*- C++ -*- /////////////////////////////
/*
Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
*/
// ThinningSvc.cxx
// Implementation file for class ThinningSvc
// Author: S.Binet<binet@cern.ch>
///////////////////////////////////////////////////////////////////
// STL includes
#include <set>
// FrameWork includes
#include "GaudiKernel/Property.h"
#include "GaudiKernel/IIncidentSvc.h"
#include "GaudiKernel/Incident.h"
#include "GaudiKernel/System.h"
// StoreGate includes
#include "StoreGate/StoreGateSvc.h"
// AthenaServices includes
#include "ThinningSvc.h"
using Athena::IThinningHdlr;
using Athena::ISlimmingHdlr;
///////////////////////////////////////////////////////////////////
// Public methods:
///////////////////////////////////////////////////////////////////
// Constructors
////////////////
ThinningSvc::ThinningSvc( const std::string& name,
ISvcLocator* pSvcLocator ) :
AthService(name, pSvcLocator),
m_slimmingStore ( ),
m_thinningStore ( ),
m_thinningOccurred(false),
m_storeGate ("StoreGateSvc", name)
{
//
// Property declaration
//
//declareProperty( "Property", m_nProperty, "descr" );
declareProperty( "StoreGate",
m_storeGate = StoreGateSvc_t( "StoreGateSvc", name ),
"Handle to the StoreGateSvc instance holding containers "
"which will be thinned in the course of the job" );
declareProperty( "Streams",
m_thinnedOutStreamNames,
"The names of output stream(s) we want to apply thinning on."
" If empty, then thinning will be applied on all streams "
"found during the job configuration." );
// defaults:
{
std::vector<std::string> defaultValue(0);
m_thinnedOutStreamNames.set( defaultValue );
}
// for SvcHandle ? someday ?
//declareInterface<IThinningSvc>(this);
}
// Destructor
///////////////
ThinningSvc::~ThinningSvc()
{
cleanupStore();
}
// Athena Algorithm's Hooks
////////////////////////////
StatusCode ThinningSvc::initialize()
{
ATH_MSG_INFO ("Initializing " << name() << "...");
// Set to be listener for beginning of event
IIncidentSvc* incSvc = nullptr;
if ( !service("IncidentSvc",incSvc).isSuccess() ||
nullptr == incSvc ) {
ATH_MSG_ERROR ("Unable to get the IncidentSvc");
return StatusCode::FAILURE;
}
incSvc->addListener( this, IncidentType::BeginEvent );
// retrieve pointer to @c StoreGateSvc
if ( !m_storeGate.retrieve().isSuccess() ) {
ATH_MSG_ERROR ("Could not retrieve pointer to StoreGateSvc !!");
return StatusCode::FAILURE;
}
// display configuration
{
std::ostringstream streams;
if ( m_thinnedOutStreamNames.value().empty() ) {
streams << "<none> ";
} else {
const std::set<std::string> streamset
( m_thinnedOutStreamNames.value().begin(),
m_thinnedOutStreamNames.value().end() );
std::copy( streamset.begin(), streamset.end(),
std::ostream_iterator<std::string>( streams, " " ) );
}
ATH_MSG_INFO ("Thinning streams: [ " << streams.str() << "]");
}
return StatusCode::SUCCESS;
}
StatusCode ThinningSvc::finalize()
{
ATH_MSG_INFO ("Finalizing " << name() << "...");
cleanupStore();
SlimmingStore_t().swap (m_slimmingStore);
ThinningStore_t().swap (m_thinningStore);
HandlerSet_t().swap (m_ownedHandlers);
return StatusCode::SUCCESS;
}
// Query the interfaces.
// Input: riid, Requested interface ID
// ppvInterface, Pointer to requested interface
// Return: StatusCode indicating SUCCESS or FAILURE.
// N.B. Don't forget to release the interface after use!!!
StatusCode
ThinningSvc::queryInterface(const InterfaceID& riid, void** ppvInterface)
{
if ( IThinningSvc::interfaceID().versionMatch(riid) ) {
*ppvInterface = dynamic_cast<IThinningSvc*>(this);
} else if ( IProxyDict::interfaceID().versionMatch(riid) ) {
*ppvInterface = dynamic_cast<IProxyDict*>(this);
} else {
// Interface is not directly available : try out a base class
return AthService::queryInterface(riid, ppvInterface);
}
addRef();
return StatusCode::SUCCESS;
}
SG::DataProxy*
ThinningSvc::proxy( const void* const pTransient ) const
{ return m_storeGate->proxy( pTransient ); }
SG::DataProxy*
ThinningSvc::proxy( const CLID& id, const std::string& key ) const
{ return m_storeGate->proxy( id, key ); }
SG::DataProxy*
ThinningSvc::proxy_exact (SG::sgkey_t sgkey) const
{ return m_storeGate->proxy_exact( sgkey ); }
std::vector<const SG::DataProxy*>
ThinningSvc::proxies() const
{ return m_storeGate->proxies(); }
StatusCode ThinningSvc::addToStore (CLID id, SG::DataProxy* proxy)
{
return m_storeGate->addToStore (id, proxy);
}
/**
* @brief Record an object in the store.
* @param obj The data object to store.
* @param key The key as which it should be stored.
* @param allowMods If false, the object will be recorded as const.
* @param returnExisting If true, return proxy if this key already exists.
*
* Full-blown record. @c obj should usually be something
* deriving from @c SG::DataBucket.
*
* Returns the proxy for the recorded object; nullptr on failure.
* If the requested CLID/key combination already exists in the store,
* the behavior is controlled by @c returnExisting. If true, then
* the existing proxy is returned; otherwise, nullptr is returned.
* In either case, @c obj is destroyed.
*/
SG::DataProxy*
ThinningSvc::recordObject (SG::DataObjectSharedPtr<DataObject> obj,
const std::string& key,
bool allowMods,
bool returnExisting)
{
return m_storeGate->recordObject (obj, key, allowMods, returnExisting);
}
///////////////////////////////////////////////////////////////////
// Const methods:
///////////////////////////////////////////////////////////////////
std::size_t
ThinningSvc::index_impl( const SG::DataProxy* cproxy,
std::size_t idx ) const
{
if ( nullptr == cproxy ) {
// this is now demoted to ::DEBUG as it may very well happen (even in
// sound cases) that e.g. an ElementLink has no proxy (e.g. it points to
// another file and BackNavigation was disabled)
ATH_MSG_DEBUG ("Received a null DataProxy. Probably would have required "
"BackNavigation (but irrelevant to thinning)");
return idx;
}
SG::DataProxy *proxy = const_cast<SG::DataProxy*>(cproxy);
const bool dbg = msgLvl(MSG::VERBOSE);
if ( dbg ) {
msg(MSG::VERBOSE)
<< "index_impl(" << proxy->name()
<< ", clid=[" << proxy->clID() << "]"
<< ", idx="
<< idx << ") -> [";
}
if ( !m_thinningOccurred ) {
if ( dbg ) {
msg(MSG::VERBOSE)
<< idx << "] (no-thinning-occured)"
<< endmsg;
}
return idx;
}
ThinningStore_t::const_iterator iEntry = m_thinningStore.find( proxy );
if ( iEntry == m_thinningStore.end() ) {
// no such object has been registered: not thinned
// old index holds
if ( dbg ) {
msg(MSG::VERBOSE)
<< idx << "] (obj-not-thinned)"
<< endmsg;
}
return idx;
}
const IndexMap_t& indexMap = iEntry->second.get<2>();
IndexMap_t::const_iterator i = indexMap.find( idx );
if ( i == indexMap.end() ) {
// element has been thinned away...
if ( dbg ) {
msg(MSG::VERBOSE) << "<removed>" << "]" << endmsg;
}
return IThinningSvc::RemovedIdx;
}
if ( dbg ) {
if (i->second != IThinningSvc::RemovedIdx) {
msg(MSG::VERBOSE) << i->second << "] (thinned)";
} else {
msg(MSG::VERBOSE) << "<removed>]";
}
msg(MSG::VERBOSE) << endmsg;
}
return i->second;
}
/** @brief test if a container is thinned
*/
bool
ThinningSvc::is_thinned_impl(const SG::DataProxy* p) const
{
// no thinning in that event occurred: return early
if ( !m_thinningOccurred ) {
return false;
}
if (nullptr == p) {
// this is now demoted to ::DEBUG as it may very well happen (even in
// sound cases) that e.g. an ElementLink has no proxy (e.g. it points to
// another file and BackNavigation was disabled)
ATH_MSG_DEBUG ("Received a null DataProxy. Probably would have required "
"BackNavigation (but irrelevant to thinning)");
return false;
}
SG::DataProxy *proxy = const_cast<SG::DataProxy*>(p);
return m_thinningStore.find( proxy ) != m_thinningStore.end();
}
///////////////////////////////////////////////////////////////////
// Non-const methods:
///////////////////////////////////////////////////////////////////
void ThinningSvc::handle( const Incident& inc )
{
ATH_MSG_DEBUG ("Entering handle(): " << endmsg
<< " Incidence type: " << inc.type() << endmsg
<< " from: " << inc.source());
// Only clearing the store for BeginEvent incident
if ( inc.type() == IncidentType::BeginEvent ) {
cleanupStore();
ATH_MSG_DEBUG ("[" << IncidentType::BeginEvent << "] handled");
return;
}
return;
}
StatusCode
ThinningSvc::commit()
{
bool allOk = true;
ATH_MSG_DEBUG
("applying registered slimmers (" << m_slimmingStore.size() << ")...");
for ( SlimmingStore_t::iterator
iStore = m_slimmingStore.begin(),
iStoreEnd = m_slimmingStore.end();
iStore != iStoreEnd;
++iStore ) {
iStore->second->commit();
}
ATH_MSG_DEBUG
("applying registered slimmers (" << m_slimmingStore.size() << ")...[ok]");
for ( ThinningStore_t::iterator
iStore = m_thinningStore.begin(),
iStoreEnd = m_thinningStore.end();
iStore != iStoreEnd;
++iStore ) {
if ( !this->commit( iStore, Policy::Delete ).isSuccess() ) {
allOk = false;
}
}
return allOk ? StatusCode::SUCCESS : StatusCode::FAILURE;
}
StatusCode
ThinningSvc::commit( ThinningStore_t::iterator iStore,
ThinningSvc::Policy::Type deletePolicy )
{
ATH_MSG_VERBOSE ("commit(iStore, "
<< ((deletePolicy==Policy::Delete) ? "delete" : "no del.")
<< ")");
Filter_t& keep = iStore->second.get<0>();
IThinningHdlr* handler = iStore->second.get<1>();
IndexMap_t& indexMap = iStore->second.get<2>();
const bool isMapping = handler->isMapping();
std::size_t thinnedIdx = 0;
for ( Filter_t::const_iterator i = keep.begin(), iEnd = keep.end();
i!= iEnd;
++i ) {
std::size_t idx = i->first;
bool do_keep = i->second;
if ( !do_keep ) {
indexMap[idx] = IThinningSvc::RemovedIdx;
if ( deletePolicy == Policy::Delete ) {
handler->remove(idx);
}
} else {
// if the underlying container uses a mapping-like indexing
// we don't want to screw its indexing: return the original index
indexMap[idx] = isMapping ? idx : thinnedIdx++;
}
}
if ( deletePolicy == Policy::Delete ) {
// we removed all thinned-elements: now we can pack the vector
// (and hope for the best...)
handler->commit();
}
return StatusCode::SUCCESS;
}
/** @brief Rollback the thinning actions:
* - restore the requested to-be-thinned elements
*/
StatusCode
ThinningSvc::rollback()
{
ATH_MSG_VERBOSE ("rollback()");
bool allOk = true;
// -- thinning first --------------------------------------------------------
ATH_MSG_DEBUG ("rolling-back thinning (" <<m_thinningStore.size()<< ")...");
for ( ThinningStore_t::iterator
iStore = m_thinningStore.begin(),
iStoreEnd = m_thinningStore.end();
iStore != iStoreEnd;
++iStore ) {
IThinningHdlr* handler = iStore->second.get<1>();
if ( handler ) {
handler->rollback();
} else {
allOk = false;
}
}
if ( !allOk ) {
ATH_MSG_WARNING ("some problem occurred while rolling-back thinning !"
<< endmsg
<< " (trying nonetheless to revert slimming)...");
} else {
ATH_MSG_DEBUG
("rolling-back thinning (" << m_thinningStore.size() << ")... [done]");
}
// -- then slimming ---------------------------------------------------------
ATH_MSG_DEBUG("rolling-back slimmers (" << m_slimmingStore.size() << ")...");
for ( SlimmingStore_t::iterator
iStore = m_slimmingStore.begin(),
iStoreEnd = m_slimmingStore.end();
iStore != iStoreEnd;
++iStore ) {
ISlimmingHdlr* handler = iStore->second;
if ( handler ) {
handler->rollback();
} else {
allOk = false;
}
}
if ( !allOk ) {
ATH_MSG_WARNING ("some problem occurred while rolling-back slimming !"
<< endmsg
<< " (trying nonetheless to revert slimming)...");
} else {
ATH_MSG_DEBUG
("rolling-back slimmers (" << m_slimmingStore.size() << ")... [done]");
}
return allOk ? StatusCode::SUCCESS : StatusCode::FAILURE;
}
IThinningHdlr*
ThinningSvc::handler( SG::DataProxy* proxy )
{
ThinningStore_t::iterator i = m_thinningStore.find( proxy );
if ( i == m_thinningStore.end() ) {
return nullptr;
}
return i->second.get<1>();
}
StatusCode
ThinningSvc::filter_impl( IThinningHdlr* handler, SG::DataProxy* proxy,
const Filter_t& filter,
const IThinningSvc::Operator::Type op )
{
if ( nullptr == proxy ) {
msg(MSG::WARNING)
<< "Received a null DataProxy ! "
<< "No thinning will occur for that 'object' !"
<< endmsg;
if (m_ownedHandlers.find(handler) == m_ownedHandlers.end())
delete handler;
return StatusCode::RECOVERABLE;
}
ThinningStore_t::iterator i = m_thinningStore.find( proxy );
if ( i == m_thinningStore.end() ) {
m_thinningStore[proxy] = boost::make_tuple( filter,
handler,
ThinningSvc::IndexMap_t() );
m_ownedHandlers.insert (handler);
i = m_thinningStore.find( proxy );
} else {
if (m_ownedHandlers.find(handler) == m_ownedHandlers.end())
delete handler;
typedef Filter_t::const_iterator Iter_t;
Filter_t& m = i->second.get<0>();
if ( op == IThinningSvc::Operator::And ) {
for ( Iter_t itr = filter.begin(), iEnd = filter.end();
itr != iEnd;
++itr ) {
Filter_t::iterator mitr = m.find (itr->first);
if (mitr==m.end()) {
ATH_MSG_ERROR ("could not apply filter for proxy [" << proxy->name()
<< "]"
<< endmsg
<< "indices do not match !");
return StatusCode::FAILURE;
}
mitr->second = mitr->second && itr->second;
}
} else if ( op == IThinningSvc::Operator::Or ) {
for ( Iter_t itr = filter.begin(), iEnd = filter.end();
itr != iEnd;
++itr ) {
Filter_t::iterator mitr = m.find (itr->first);
if (mitr==m.end()) {
ATH_MSG_ERROR ("could not apply filter for proxy [" << proxy->name()
<< "]"
<< endmsg
<< "indices do not match !");
return StatusCode::FAILURE;
}
mitr->second = mitr->second || itr->second;
}
} else {
msg(MSG::ERROR)
<< "Unknown operator being passed for merging of filters !!"
<< endmsg;
return StatusCode::FAILURE;
}
}
m_thinningOccurred = true;
return this->commit( i, Policy::DontDelete );
}
/** Helper method to clean-up thinning handlers and store
*/
void
ThinningSvc::cleanupStore()
{
// cleanup slimming store
for ( SlimmingStore_t::iterator
iStore = m_slimmingStore.begin(),
iStoreEnd = m_slimmingStore.end();
iStore != iStoreEnd;
++iStore ) {
delete iStore->second; iStore->second = nullptr;
}
m_slimmingStore.clear();
// cleanup thinning store
for ( ThinningStore_t::iterator
iStore = m_thinningStore.begin(),
iStoreEnd = m_thinningStore.end();
iStore != iStoreEnd;
++iStore ) {
IThinningHdlr* hdlr = iStore->second.get<1>();
delete hdlr; hdlr = nullptr;
}
m_thinningStore.clear();
m_ownedHandlers.clear();
m_thinningOccurred = false;
}
/** @brief register a slimming handler with the @c IThinningSvc
* Registering a @c ISlimmingHdlr allows to slim an object and keep the
* side effects of slimming it 'guarded' within one output stream.
* The original state (before slimming) of the object will be restored after
* to-disk serialization took place.
* Note: @c IThinningSvc will take ownership of the handler.
*/
StatusCode
ThinningSvc::register_slimmer (ISlimmingHdlr *handler)
{
if (nullptr==handler) {
msg (MSG::ERROR)
<< "received null pointer to ISlimmingHdlr !"
<< endmsg;
return StatusCode::FAILURE;
}
SlimmedObj_t obj = handler->object();
SlimmingStore_t::iterator itr = m_slimmingStore.find (obj);
if ( itr != m_slimmingStore.end() ) {
msg (MSG::ERROR)
<< "A slimmer was already registered for object at [" << obj << "] of "
<< "type [" << System::typeinfoName (handler->type_id()) << "] !"
<< endmsg
<< "Previous slimmer was registered by ["
<< itr->second->requester()->name() << "]"
<< endmsg
<< "Current slimmer is being registered by ["
<< handler->requester()->name() << "]"
<< endmsg
<< "sorry: multi-pass slimming isn't supported !"
<< endmsg
<< " --- now deleting your slim-handler ---"
<< endmsg;
delete handler; handler = nullptr;
return StatusCode::FAILURE;
}
// note that we re-use this notifier so as to not overly complicate the API.
// time will tell if that was a good idea...
m_thinningOccurred = true;
m_slimmingStore[obj] = handler;
return StatusCode::SUCCESS;
}
/**
* @brief Find the key for a string/CLID pair.
* @param str The string to look up.
* @param clid The CLID associated with the string.
* @return A key identifying the string.
* A given string will always return the same key.
* Will abort in case of a hash collision!
*/
SG::sgkey_t ThinningSvc::stringToKey (const std::string& str, CLID clid)
{
return m_storeGate->stringToKey (str, clid);
}
/**
* @brief Find the string corresponding to a given key.
* @param key The key to look up.
* @return Pointer to the string found, or null.
* We can find keys as long as the corresponding string
* was given to either @c stringToKey() or @c registerKey().
*/
const std::string* ThinningSvc::keyToString (sgkey_t key) const
{
return m_storeGate->keyToString (key);
}
/**
* @brief Find the string and CLID corresponding to a given key.
* @param key The key to look up.
* @param clid[out] The found CLID.
* @return Pointer to the string found, or null.
* We can find keys as long as the corresponding string
* was given to either @c stringToKey() or @c registerKey().
*/
const std::string* ThinningSvc::keyToString (sgkey_t key,
CLID& clid) const
{
return m_storeGate->keyToString (key, clid);
}
/**
* @brief Remember an additional mapping from key to string/CLID.
* @param key The key to enter.
* @param str The string to enter.
* @param clid The CLID associated with the string.
* @return True if successful; false if the @c key already
* corresponds to a different string.
*
* This registers an additional mapping from a key to a string;
* it can be found later through @c lookup() on the string.
* Logs an error if @c key already corresponds to a different string.
*/
void ThinningSvc::registerKey (sgkey_t key,
const std::string& str,
CLID clid)
{
m_storeGate->registerKey (key, str, clid);
}
///////////////////////// -*- C++ -*- /////////////////////////////
/*
Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
*/
// ThinningSvc.h
// Header file for class ThinningSvc
// Author: S.Binet<binet@cern.ch>
///////////////////////////////////////////////////////////////////
#ifndef ATHENASERVICES_THINNINGSVC_H
#define ATHENASERVICES_THINNINGSVC_H
/** @file ThinningSvc.h
* @brief This service allows to thin
* (remove elements from) a DataVector-like container while the
* container is being written out. See AthExThinning for an example of usage.
* $Id: ThinningSvc.h,v 1.12 2008-09-08 17:10:19 binet Exp $
* @author S.Binet<binet@cern.ch>
*/
// STL includes
#include <string>
#include <vector>
// boost includes
#include <boost/tuple/tuple.hpp>
// FrameWork includes
#include "GaudiKernel/ServiceHandle.h"
#include "GaudiKernel/IIncidentListener.h"
#include "AthenaBaseComps/AthService.h"
// AthenaKernel includes
#include "AthenaKernel/IThinningHdlr.h"
#include "AthenaKernel/ISlimmingHdlr.h"
#include "AthenaKernel/IThinningSvc.h"
#include <unordered_map>
#include <unordered_set>
// Forward declaration
class DataBucketBase;
class ISvcLocator;
class StoreGateSvc;
template <class TYPE> class SvcFactory;
/**
* @class ThinningSvc
* @brief This service allows to thin
* (remove elements from) a DataVector-like container while the
* container is being written out. See AthExThinning for an example of usage.
*/
class ThinningSvc : virtual public IThinningSvc,
virtual public IIncidentListener,
public AthService
{
protected:
friend class SvcFactory<ThinningSvc>;
///////////////////////////////////////////////////////////////////
// Public typedefs:
///////////////////////////////////////////////////////////////////
public:
/// the type of the map holding the index-to-index conversion
/// (that is the index before and after the thinning took place)
typedef std::unordered_map<std::size_t, std::size_t> IndexMap_t;
///////////////////////////////////////////////////////////////////
// Public methods:
///////////////////////////////////////////////////////////////////
public:
// Copy constructor:
/// Constructor with parameters:
ThinningSvc( const std::string& name, ISvcLocator* pSvcLocator );
/// Destructor:
virtual ~ThinningSvc() override;
/// Gaudi Service Implementation
//@{
virtual StatusCode initialize() override;
virtual StatusCode finalize() override;
virtual StatusCode queryInterface( const InterfaceID& riid,
void** ppvInterface ) override;
//@}
/// get proxy for a given data object address in memory
virtual SG::DataProxy* proxy( const void* const pTransient ) const override final;
/// get proxy with given id and key. Returns 0 to flag failure
virtual SG::DataProxy* proxy( const CLID& id, const std::string& key ) const override final;
/// Get proxy given a hashed key+clid.
/// Find an exact match; no handling of aliases, etc.
/// Returns 0 to flag failure.
virtual SG::DataProxy* proxy_exact (SG::sgkey_t sgkey) const override final;
/// return the list of all current proxies in store
virtual std::vector<const SG::DataProxy*> proxies() const override final;
/// Add a new proxy to the store.
virtual StatusCode addToStore (CLID id, SG::DataProxy* proxy) override final;
/**
* @brief Record an object in the store.
* @param obj The data object to store.
* @param key The key as which it should be stored.
* @param allowMods If false, the object will be recorded as const.
* @param returnExisting If true, return proxy if this key already exists.
*
* Full-blown record. @c obj should usually be something
* deriving from @c SG::DataBucket.
*
* Returns the proxy for the recorded object; nullptr on failure.
* If the requested CLID/key combination already exists in the store,
* the behavior is controlled by @c returnExisting. If true, then
* the existing proxy is returned; otherwise, nullptr is returned.
* In either case, @c obj is destroyed.
*/
virtual
SG::DataProxy* recordObject (SG::DataObjectSharedPtr<DataObject> obj,
const std::string& key,
bool allowMods,
bool returnExisting) override final;
///@}
/// IStringPool implementation.
//@{
/**
* @brief Find the key for a string/CLID pair.
* @param str The string to look up.
* @param clid The CLID associated with the string.
* @return A key identifying the string.
* A given string will always return the same key.
* Will abort in case of a hash collision!
*/
virtual
sgkey_t stringToKey (const std::string& str, CLID clid) override final;
/**
* @brief Find the string corresponding to a given key.
* @param key The key to look up.
* @return Pointer to the string found, or null.
* We can find keys as long as the corresponding string
* was given to either @c stringToKey() or @c registerKey().
*/
virtual
const std::string* keyToString (sgkey_t key) const override final;
/**
* @brief Find the string and CLID corresponding to a given key.
* @param key The key to look up.
* @param clid[out] The found CLID.
* @return Pointer to the string found, or null.
* We can find keys as long as the corresponding string
* was given to either @c stringToKey() or @c registerKey().
*/
virtual
const std::string* keyToString (sgkey_t key,
CLID& clid) const override final;
/**
* @brief Remember an additional mapping from key to string/CLID.
* @param key The key to enter.
* @param str The string to enter.
* @param clid The CLID associated with the string.
* @return True if successful; false if the @c key already
* corresponds to a different string.
*
* This registers an additional mapping from a key to a string;
* it can be found later through @c lookup() on the string.
* Logs an error if @c key already corresponds to a different string.
*/
virtual
void registerKey (sgkey_t key,
const std::string& str,
CLID clid) override final;
///@}
///////////////////////////////////////////////////////////////////
// Const methods:
///////////////////////////////////////////////////////////////////
/** @brief Tell clients if any thinning occurred during the event processing
*/
virtual
bool thinningOccurred() const override
{ return m_thinningOccurred; }
///////////////////////////////////////////////////////////////////
// Non-const methods:
///////////////////////////////////////////////////////////////////
static const InterfaceID& interfaceID();
///////////////////////////////////////////////////////////////////
// Protected types and enums:
///////////////////////////////////////////////////////////////////
protected:
struct Policy {
enum Type {
Delete = 0,
DontDelete = 1
};
};
typedef boost::tuple<IThinningSvc::Filter_t,
Athena::IThinningHdlr*,
ThinningSvc::IndexMap_t> ThinningStoreValue_t;
typedef std::unordered_map< SG::DataProxy*,
ThinningStoreValue_t > ThinningStore_t;
typedef void* SlimmedObj_t;
typedef std::unordered_map< SlimmedObj_t,
Athena::ISlimmingHdlr* > SlimmingStore_t;
///////////////////////////////////////////////////////////////////
// Protected methods:
///////////////////////////////////////////////////////////////////
protected:
/** incident service handle for {Begin,End}Event
*/
virtual void handle( const Incident& incident ) override;
/** Register the map of index-before-thinning/index-after-thinning
* for all containers
*/
virtual StatusCode commit() override;
/** Register the map of index-before-thinning/index-after-thinning
* for a given container.
* It will also remove the 'to-be-thinned' elements, if asked for.
*/
StatusCode commit( ThinningStore_t::iterator iStore,
ThinningSvc::Policy::Type deletePolicy );
/** Rollback the thinning actions:
* - restore the requested to-be-thinned elements
*/
virtual StatusCode rollback() override;
/** Get the index after thinning of a given container, providing
* the old index.
* Returns IThinningSvc::RemovedIdx if the element asked-for has been
* removed during thinning.
*/
virtual
std::size_t index_impl(const SG::DataProxy* proxy, std::size_t idx ) const
override;
/** @brief test if a container is thinned
*/
virtual
bool is_thinned_impl(const SG::DataProxy* p) const override;
/** @brief Retrieve the handler (if any) to thin a @c DataProxy
*/
virtual
Athena::IThinningHdlr* handler( SG::DataProxy* proxy ) override;
/** Build the 'db' of elements to be kept for each container
*/
virtual
StatusCode filter_impl( Athena::IThinningHdlr* handler,
SG::DataProxy* proxy,
const Filter_t& filter,
const IThinningSvc::Operator::Type op ) override;
/** Helper method to clean-up thinning handlers and store
*/
void cleanupStore();
/** @brief register a slimming handler with the @c IThinningSvc
* Registering a @c ISlimmingHdlr allows to slim an object and keep the
* side effects of slimming it 'guarded' within one output stream.
* The original state (before slimming) of the object will be restored after
* to-disk serialization took place.
* Note: @c IThinningSvc will take ownership of the handler.
*/
virtual
StatusCode register_slimmer (Athena::ISlimmingHdlr *handler) override;
/// Default constructor:
ThinningSvc();
///////////////////////////////////////////////////////////////////
// Protected data:
///////////////////////////////////////////////////////////////////
protected:
/// the store of slimmers
SlimmingStore_t m_slimmingStore;
/// the store of everything.
/// It contains the associations:
/// [originalProxy] : [filter, deleter,index]
ThinningStore_t m_thinningStore;
/// keep track of thinning state: if any thinning occured
/// (ie: @c ThinningSvc::filter_impl was called) this variable should be true
bool m_thinningOccurred;
typedef ServiceHandle<StoreGateSvc> StoreGateSvc_t;
/// Pointer to @c StoreGateSvc
StoreGateSvc_t m_storeGate;
/** The names of output stream(s) we want to apply thinning on
*/
StringArrayProperty m_thinnedOutStreamNames;
/// Set of owned handlers.
typedef std::unordered_set<Athena::IThinningHdlr*> HandlerSet_t;
HandlerSet_t m_ownedHandlers;
};
/// I/O operators
//////////////////////
///////////////////////////////////////////////////////////////////
/// Inline methods:
///////////////////////////////////////////////////////////////////
inline const InterfaceID& ThinningSvc::interfaceID()
{
return IThinningSvc::interfaceID();
}
#endif //> ATHENASERVICES_THINNINGSVC_H
......@@ -6,7 +6,6 @@
#include "../MultipleEventLoopMgr.h"
#include "../SimplePOSIXTimeKeeperSvc.h"
#include "../MixingEventSelector.h"
#include "../ThinningSvc.h"
#include "../ThinningCacheTool.h"
//#include "../EventDumperSvc.h"
#include "../MemoryRescueSvc.h"
......@@ -47,7 +46,6 @@ DECLARE_COMPONENT( AthenaMtesEventLoopMgr )
DECLARE_COMPONENT( PyAthenaEventLoopMgr )
DECLARE_COMPONENT( SimplePOSIXTimeKeeperSvc )
DECLARE_COMPONENT( MixingEventSelector )
DECLARE_COMPONENT( ThinningSvc )
DECLARE_COMPONENT( MemoryRescueSvc )
DECLARE_COMPONENT( FPEControlSvc )
DECLARE_COMPONENT( JobIDSvc )
......
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
# This is the job options that does AOD based skimming and thinning
###############################################################
#
# Job options file
#
# Authors: Davide Costanzo, Stathes Paganis, Sven Menke
# Anastopoulos Christos, Sebastien Binet
#==============================================================
import AthenaCommon.Constants as Lvl
from AthenaCommon.AppMgr import ServiceMgr as svcMgr
from AthenaCommon.AppMgr import theApp
## load POOL support
import AthenaPoolCnvSvc.ReadAthenaPool
svcMgr.AthenaSealSvc.OutputLevel = Lvl.ERROR
include( "ParticleBuilderOptions/AOD_PoolCnv_jobOptions.py")
include( "ParticleBuilderOptions/McAOD_PoolCnv_jobOptions.py")
include( "ParticleBuilderOptions/ESD_PoolCnv_jobOptions.py" )
## general job configuration
from AthenaCommon.AlgSequence import AlgSequence
topSequence = AlgSequence()
###############################
# Load thinning service
###############################
from AthenaServices.Configurables import ThinningSvc
svcMgr += ThinningSvc()
# the list of output streams to thin
svcMgr.ThinningSvc.Streams += ['StreamDPD']
#Needed for lxplus ?
include("RecExCond/AllDet_detDescr.py")
#--------------------------------------------------------------
# Event related parameters
#--------------------------------------------------------------
theApp.EvtMax = 10
#--------------------------------------------------------------
# Set output level threshold (2=DEBUG, 3=INFO, 4=WARNING, 5=ERROR, 6=FATAL )
#--------------------------------------------------------------
MessageSvc = svcMgr.MessageSvc
MessageSvc.OutputLevel = Lvl.INFO
svcMgr.EventSelector.InputCollections = [ "rfio:/castor/cern.ch/grid/atlas/tzero/prod2/perm/fdr08_run2/physics_Egamma/0052301/fdr08_run2.0052301.physics_Egamma.recon.ESD.o3_f8/fdr08_run2.0052301.physics_Egamma.recon.ESD.o3_f8._lb0071._0001.1" ]
#svcMgr.EventSelector.InputCollections = [ "/afs/cern.ch/atlas/offline/ProdData/13.0.X/DC3.007218.singlepart_mu20_ATLAS-CSC-01-00-00_DEFAULT.esd.pool.root" ]
#svcMgr.EventSelector.InputCollections =inFileName
from JetRec.JetRecFlags import jetFlags
jetFlags.inputFileType = "AOD" #
from JetRec.JetGetters import *
Kt4alg = make_StandardJetGetter('Kt', 0.4, 'LCTopo').jetAlgorithmHandle()
Kt4alg.OutputLevel = 2
Kt4talg = make_StandardJetGetter('Kt', 0.4, 'Truth').jetAlgorithmHandle()
Kt4talg.OutputLevel = 2
#from DPDUtils.DpdLib import ttbarFilter
#topSequence += ttbarFilter()
#topSequence.ttbarFilter.jetsName = 'Kt4LCTopoJets' # use new LC topo jets
#topSequence.ttbarFilter.maxDeltaRcx = 100.4 # effectively switch off
# cluster thinning
#print topSequence
#==============================================================
#
# End of job options file
#
###############################################################
#--------------------------------------------------------------
#--- Secondary Write portion ----- Don't change it !!!
#--------------------------------------------------------------
from AthenaPoolCnvSvc.WriteAthenaPool import AthenaPoolOutputStream
StreamDPD = AthenaPoolOutputStream("StreamDPD")
StreamDPD.TakeItemsFromInput = True
#StreamDPD.ItemList += ['CaloClusterContainer#CaloCalTopoCluster']
#StreamDPD.ItemList += ['JetCollection#Cone4H1TowerJets']
#StreamDPD.ItemList += ['JetKeyDescriptor#JetKeyMap']
#StreamDPD.ItemList += ['Rec::TrackParticleContainer#TrackParticleCandidate']
#StreamDPD.ItemList += ['Rec::TrackParticleContainer#StacoTrackParticles']
#StreamDPD.ItemList += ['TruthParticleContainer#SpclMC']
#StreamDPD.ItemList += ['McEventCollection#GEN_AOD']
StreamDPD.ExcludeList =['CaloClusterContainer#CaloCalTopoCluster']
StreamDPD.ExcludeList+=['TrigRoiDescriptor#HLTAutoKey*']
StreamDPD.ExcludeList+=['98849495#ConvertedMBoySegments']
StreamDPD.ExcludeList+=['107385089#*']
StreamDPD.ExcludeList+=['Rec::TrackParticleContainer#TrackParticleCandidate']
StreamDPD.OutputLevel = DEBUG
StreamDPD.OutputFile = "SkimmedThin.AOD.pool.root"
#StreamDPD.OutputFile = outFileName
StreamDPD.AcceptAlgs=["ttbarFilter"]
ChronoStatSvc = Service( "ChronoStatSvc" )
ChronoStatSvc.ChronoDestinationCout = True
ChronoStatSvc.PrintUserTime = True
ChronoStatSvc.PrintSystemTime = True
ChronoStatSvc.PrintEllapsedTime = True
AthenaPoolCnvSvc = Service( "AthenaPoolCnvSvc" )
AthenaPoolCnvSvc.UseDetailChronoStat = True
#MessageSvc = Service("MessageSvc")
#MessageSvc.OutputLevel = DEBUG
#MessageSvc.OutputLevel = ERROR
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment