Commit a1a8b4f7 authored by Charles Leggett's avatar Charles Leggett
Browse files

Introduce a generic IDataStoreAgent implementation

See merge request gaudi/Gaudi!661
parents 7d1f4368 649749a0
......@@ -17,7 +17,6 @@
#include "GaudiKernel/strcasecmp.h"
#include "OutputStream.h"
#include "OutputStreamAgent.h"
#include <set>
......@@ -27,8 +26,7 @@ DECLARE_COMPONENT( OutputStream )
#define ON_DEBUG if ( msgLevel( MSG::DEBUG ) )
// Standard Constructor
OutputStream::OutputStream( const std::string& name, ISvcLocator* pSvcLocator )
: Algorithm( name, pSvcLocator ), m_agent{new OutputStreamAgent( this )}
OutputStream::OutputStream( const std::string& name, ISvcLocator* pSvcLocator ) : Algorithm( name, pSvcLocator )
{
// Associate action handlers with the AcceptAlgs, RequireAlgs and VetoAlgs.
m_acceptNames.declareUpdateHandler( &OutputStream::acceptAlgsHandler, this );
......@@ -242,7 +240,7 @@ StatusCode OutputStream::collectObjects()
m_currentItem = i;
StatusCode iret = m_pDataProvider->retrieveObject( m_currentItem->path(), obj );
if ( iret.isSuccess() ) {
iret = m_pDataManager->traverseSubTree( obj, m_agent.get() );
iret = collectFromSubTree( obj );
if ( !iret.isSuccess() ) status = iret;
} else {
error() << "Cannot write mandatory object(s) (Not found) " << m_currentItem->path() << endmsg;
......@@ -252,12 +250,10 @@ StatusCode OutputStream::collectObjects()
// Traverse the tree and collect the requested objects (tolerate missing items here)
for ( auto& i : m_optItemList ) {
DataObject* obj = nullptr;
m_currentItem = i;
StatusCode iret = m_pDataProvider->retrieveObject( m_currentItem->path(), obj );
if ( iret.isSuccess() ) {
iret = m_pDataManager->traverseSubTree( obj, m_agent.get() );
}
DataObject* obj = nullptr;
m_currentItem = i;
StatusCode iret = m_pDataProvider->retrieveObject( m_currentItem->path(), obj );
if ( iret.isSuccess() ) iret = collectFromSubTree( obj );
if ( !iret.isSuccess() ) {
ON_DEBUG
debug() << "Ignore request to write non-mandatory object(s) " << m_currentItem->path() << endmsg;
......@@ -276,7 +272,7 @@ StatusCode OutputStream::collectObjects()
m_currentItem = i;
StatusCode iret = m_pDataProvider->retrieveObject( m_currentItem->path(), obj );
if ( iret.isSuccess() ) {
iret = m_pDataManager->traverseSubTree( obj, m_agent.get() );
iret = collectFromSubTree( obj );
if ( !iret.isSuccess() ) status = iret;
} else {
error() << "Cannot write mandatory (algorithm dependent) object(s) (Not found) " << m_currentItem->path()
......@@ -562,3 +558,9 @@ bool OutputStream::hasInput() const
{
return !( m_itemNames.empty() && m_optItemNames.empty() && m_algDependentItemList.empty() );
}
StatusCode OutputStream::collectFromSubTree( DataObject* pObj )
{
return m_pDataManager->traverseSubTree( pObj,
[this]( IRegistry* r, int level ) { return this->collect( r, level ); } );
}
......@@ -6,7 +6,6 @@
#include "GaudiKernel/IDataSelector.h"
#include "GaudiKernel/IIncidentSvc.h"
#include "GaudiKernel/Property.h"
#include "OutputStreamAgent.h"
// STL include files
#include <memory>
......@@ -73,8 +72,6 @@ protected:
/// Output type: NEW(NEW,CREATE,WRITE,RECREATE), UPDATE)
std::string m_outputType = "UPDATE";
/// Keep reference of agent
std::unique_ptr<OutputStreamAgent> m_agent;
/// Keep reference to the data provider service
SmartIF<IDataProviderSvc> m_pDataProvider;
/// Keep reference to the data manager service
......@@ -157,6 +154,8 @@ private:
void addItem( Items& itms, const std::string& descriptor );
/// Return the list of selected objects
IDataSelector* selectedObjects() { return &m_objects; }
StatusCode collectFromSubTree( DataObject* );
};
#endif // GAUDISVC_PERSISTENCYSVC_OUTPUTSTREAM_H
//====================================================================
// OutputStreamAgent.cpp
//--------------------------------------------------------------------
//
// Package : (The LHCb PersistencySvc service)
//
// Description: Implementation of the OutputStream Agent
//
// Author : M.Frank
// Created : 13/1/99
// Changes :
//
//====================================================================
#define PERSISTENCYSVC_OUTPUTSTREAMAGENT_CPP
// Framework includes
#include "OutputStreamAgent.h"
#include "GaudiKernel/IDataManagerSvc.h"
#include "OutputStream.h"
/// Standard Constructor
OutputStreamAgent::OutputStreamAgent( OutputStream* OutputStream ) : m_OutputStream( OutputStream ) {}
/// Analysis callback
bool OutputStreamAgent::analyse( IRegistry* pRegistry, int level )
{
return m_OutputStream->collect( pRegistry, level );
}
//====================================================================
// OutputStreamAgent.h
//--------------------------------------------------------------------
//
// Package : (The LHCb PersistencySvc service)
//
// Description: Definition the OutputStream Agent
//
// Author : M.Frank
// Created : 13/1/99
// Changes :
//
//====================================================================
#ifndef PERSISTENCYSVC_OUTPUTSTREAMAGENT_H
#define PERSISTENCYSVC_OUTPUTSTREAMAGENT_H
// Framework includes
#include "GaudiKernel/IDataStoreAgent.h"
// Foreward declarations
class IRegistry;
class OutputStream;
/** @name The OutputStreamAgent class.
Data store Agent to traverse data store trees and select all
items to be written to the output file.
@author Markus Frank
*/
class OutputStreamAgent : virtual public IDataStoreAgent
{
/// Reference to data writer
OutputStream* m_OutputStream = nullptr;
public:
/// Standard Constructor
OutputStreamAgent( OutputStream* OutputStream );
/// Analysis callback
bool analyse( IRegistry* dir, int level ) override;
};
#endif // PERSISTENCYSVC_OUTPUTSTREAMAGENT_H
......@@ -7,13 +7,13 @@
#include "GaudiKernel/IDataProviderSvc.h"
#include "GaudiKernel/IEvtSelector.h"
#include "GaudiKernel/IIncidentSvc.h"
#include "GaudiKernel/IRegistry.h"
#include "GaudiKernel/Incident.h"
#include "GaudiKernel/MsgStream.h"
#include <chrono>
#include "EventLoopMgr.h"
#include "HistogramAgent.h"
// Instantiation of a static factory class used by clients to create instances of this service
DECLARE_COMPONENT( EventLoopMgr )
......@@ -63,7 +63,7 @@ StatusCode EventLoopMgr::initialize()
// We do not expect a Event Selector necessarily being declared
setProperty( m_appMgrProperty->getProperty( "EvtSel" ) ).ignore();
if ( m_evtsel != "NONE" || m_evtsel.length() == 0 ) {
if ( m_evtsel != "NONE" || m_evtsel.empty() ) {
m_evtSelector = serviceLocator()->service( "EventSelector" );
if ( m_evtSelector ) {
// Setup Event Selector
......@@ -182,10 +182,8 @@ StatusCode EventLoopMgr::stop()
//--------------------------------------------------------------------------------------------
StatusCode EventLoopMgr::finalize()
{
StatusCode sc;
// Finalize base class
sc = MinimalEventLoopMgr::finalize();
StatusCode sc = MinimalEventLoopMgr::finalize();
if ( !sc.isSuccess() ) {
error() << "Error finalizing base class" << endmsg;
return sc;
......@@ -193,27 +191,27 @@ StatusCode EventLoopMgr::finalize()
// Save Histograms Now
if ( m_histoPersSvc ) {
HistogramAgent agent;
sc = m_histoDataMgrSvc->traverseTree( &agent );
std::vector<DataObject*> objects;
sc = m_histoDataMgrSvc->traverseTree( [&objects]( IRegistry* reg, int ) {
DataObject* obj = reg->object();
if ( !obj || obj->clID() == CLID_StatisticsFile ) return false;
objects.push_back( obj );
return true;
} );
if ( sc.isSuccess() ) {
IDataSelector* objects = agent.selectedObjects();
// skip /stat entry!
if ( objects->size() > 0 ) {
for ( auto& i : *objects ) {
IOpaqueAddress* pAddr = nullptr;
StatusCode iret = m_histoPersSvc->createRep( i, pAddr );
if ( iret.isSuccess() ) {
i->registry()->setAddress( pAddr );
} else {
sc = iret;
}
}
for ( auto& i : *objects ) {
IRegistry* reg = i->registry();
StatusCode iret = m_histoPersSvc->fillRepRefs( reg->address(), i );
if ( !iret.isSuccess() ) sc = iret;
}
}
sc = std::accumulate( begin( objects ), end( objects ), sc, [&]( StatusCode isc, auto& i ) {
IOpaqueAddress* pAddr = nullptr;
StatusCode iret = m_histoPersSvc->createRep( i, pAddr );
if ( iret.isFailure() ) return iret;
i->registry()->setAddress( pAddr );
return isc;
} );
sc = std::accumulate( begin( objects ), end( objects ), sc, [&]( StatusCode isc, auto& i ) {
IRegistry* reg = i->registry();
StatusCode iret = m_histoPersSvc->fillRepRefs( reg->address(), i );
return iret.isFailure() ? iret : isc;
} );
if ( sc.isSuccess() ) {
info() << "Histograms converted successfully according to request." << endmsg;
} else {
......@@ -355,13 +353,13 @@ StatusCode EventLoopMgr::nextEvent( int maxevt )
return sc;
}
}
time_point end_time = Clock::now();
if ( UNLIKELY( outputLevel() <= MSG::DEBUG ) )
debug() << "---> Loop Finished - "
<< " WSS " << System::mappedMemory( System::MemoryUnit::kByte ) * oneOver1024
<< " | total time (skipping 1st evt) "
<< std::chrono::duration_cast<std::chrono::nanoseconds>( end_time - start_time ).count() << " ns" << endmsg;
<< std::chrono::duration_cast<std::chrono::nanoseconds>( Clock::now() - start_time ).count() << " ns"
<< endmsg;
return StatusCode::SUCCESS;
}
......@@ -378,9 +376,7 @@ StatusCode EventLoopMgr::getEventRoot( IOpaqueAddress*& refpAddr )
sc = m_evtSelector->next( *m_evtContext );
if ( sc.isSuccess() ) {
sc = m_evtSelector->createAddress( *m_evtContext, refpAddr );
if ( !sc.isSuccess() ) {
warning() << "Error creating IOpaqueAddress." << endmsg;
}
if ( !sc.isSuccess() ) warning() << "Error creating IOpaqueAddress." << endmsg;
}
}
return sc;
......
#ifndef GAUDIKERNEL_HISTOGRAMAGENT_H
#define GAUDIKERNEL_HISTOGRAMAGENT_H
#include "GaudiKernel/ClassID.h"
#include "GaudiKernel/IDataSelector.h"
#include "GaudiKernel/IDataStoreAgent.h"
#include "GaudiKernel/IRegistry.h"
/** @class HistogramAgent HistogramAgent.h GaudiKernel/HistogramAgent.h
HistogramAgent base in charge of collecting all the refereces to
DataObjects in a transient store that passes some selection criteria. The
default behaviour is to collect all entries.
@author Markus Frank
*/
class HistogramAgent : virtual public IDataStoreAgent
{
protected:
IDataSelector m_objects;
public:
/// Default creator
HistogramAgent() = default;
/// Return the set of selected DataObjects
IDataSelector* selectedObjects() { return &m_objects; }
/// Analyses a given directory entry
bool analyse( IRegistry* pRegistry, int ) override
{
DataObject* obj = pRegistry->object();
if ( obj && obj->clID() != CLID_StatisticsFile ) {
m_objects.push_back( obj );
return true;
}
return false;
}
};
#endif // GAUDIKERNEL_HISTOGRAMAGENT_H
......@@ -89,20 +89,6 @@ namespace
{
std::for_each( begin( c ), end( c ), with_lock( std::forward<Fun>( f ) ) );
}
class DataAgent : virtual public IDataStoreAgent
{
DataObjIDColl& m_dataObjects;
public:
DataAgent( DataObjIDColl& objs ) : m_dataObjects( objs ) {}
bool analyse( IRegistry* pReg, int ) override
{
if ( !pReg->object() ) return false;
m_dataObjects.insert( DataObjID( pReg->identifier() ) );
return true;
}
};
}
TTHREAD_TLS( Synced<Partition>* ) s_current = nullptr;
......@@ -340,8 +326,11 @@ public:
{
return s_current->with_lock( []( Partition& p ) {
StatusCode sc = p.dataProvider->preLoad();
DataAgent da( p.newDataObjects );
p.dataManager->traverseTree( &da );
p.dataManager->traverseTree( [&p]( IRegistry* pReg, int ) {
if ( !pReg->object() ) return false;
p.newDataObjects.insert( DataObjID( pReg->identifier() ) );
return true;
} );
return sc;
} );
}
......
#ifndef GAUDIKERNEL_DATASELECTIONAGENT_H
#define GAUDIKERNEL_DATASELECTIONAGENT_H
#include "GaudiKernel/IDataSelector.h"
#include "GaudiKernel/IDataStoreAgent.h"
#include "GaudiKernel/IRegistry.h"
/** @class DataSelectionAgent DataSelectionAgent.h GaudiKernel/DataSelectionAgent.h
DataSelectionAgent base in charge of collecting all the refereces to
DataObjects in a transient store that passes some selection criteria. The
default behaviour is to collect all entries.
@author Markus Frank
*/
class DataSelectionAgent : virtual public IDataStoreAgent
{
protected:
IDataSelector m_objects;
public:
/// Default creator
DataSelectionAgent() {}
/// Destructor
virtual ~DataSelectionAgent() {}
/// Return the set of selected DataObjects
IDataSelector* selectedObjects() { return &m_objects; }
/// Analyses a given directory entry
bool analyse( IRegistry* pRegistry, int ) override
{
DataObject* obj = pRegistry->object();
if ( 0 != obj ) m_objects.push_back( obj );
return true;
}
};
#endif // GAUDIKERNEL_DATASELECTIONAGENT_H
......@@ -3,6 +3,7 @@
// Include files
#include "GaudiKernel/ClassID.h"
#include "GaudiKernel/IDataStoreAgent.h"
#include "GaudiKernel/IInterface.h"
#include "boost/utility/string_ref.hpp"
#include <string>
......@@ -13,8 +14,6 @@
class DataObject;
// Interface to persistency service
class IConversionSvc;
// Data agent
class IDataStoreAgent;
// Opaque addresses
class IOpaqueAddress;
// Registry entry definition
......@@ -121,6 +120,19 @@ struct GAUDI_API IDataManagerSvc : extend_interfaces<IInterface> {
*/
virtual StatusCode traverseSubTree( boost::string_ref sub_path, IDataStoreAgent* pAgent ) = 0;
/** Analyse by traversing all data objects below the sub tree identified by its full path name.
@param sub_path [IN] Path to sub-tree node.
@param f [IN] callable which will be called for each data object
it should have the signature bool(IRegistry*,int level)
@return Status code indicating success or failure.
*/
template <typename F, typename = std::enable_if_t<!std::is_convertible<F, IDataStoreAgent*>::value>>
StatusCode traverseSubTree( boost::string_ref sub_path, F&& f )
{
auto agent = makeDataStoreAgent( std::forward<F>( f ) );
return traverseSubTree( sub_path, &agent );
}
/** Analyse by traversing all data objects below the sub tree
identified by the object. The object itself is removed as well.
@param pObject [IN] Pointer to object
......@@ -129,11 +141,37 @@ struct GAUDI_API IDataManagerSvc : extend_interfaces<IInterface> {
*/
virtual StatusCode traverseSubTree( DataObject* pObject, IDataStoreAgent* pAgent ) = 0;
/** Analyse by traversing all data objects below the sub tree
identified by the object. The object itself is removed as well.
@param pObject [IN] Pointer to object
@param f [IN] Callable which will be called on each data object
it should have the signature bool(IRegistry*,int level)
@return Status code indicating success or failure
*/
template <typename F, typename = std::enable_if_t<!std::is_convertible<F, IDataStoreAgent*>::value>>
StatusCode traverseSubTree( DataObject* pObject, F&& f )
{
auto agent = makeDataStoreAgent( std::forward<F>( f ) );
return traverseSubTree( pObject, &agent );
}
/** Analyse by traversing all data objects in the data store.
@return Status code indicating success or failure
*/
virtual StatusCode traverseTree( IDataStoreAgent* pAgent ) = 0;
/** Analyse by traversing all data objects in the data store.
@param f [IN] callable which will be called for each data object
it should have the signature bool(IRegistry*,int level)
@return Status code indicating success or failure
*/
template <typename F, typename = std::enable_if_t<!std::is_convertible<F, IDataStoreAgent*>::value>>
StatusCode traverseTree( F&& f )
{
auto agent = makeDataStoreAgent( std::forward<F>( f ) );
return traverseTree( &agent );
}
/** Initialize data store for new event by giving new event path.
Implicitly this clears the entire data store.
@param root_name [IN] String containing root path name
......
......@@ -3,6 +3,7 @@
// Framework include files
#include "GaudiKernel/Kernel.h"
#include "GaudiKernel/invoke.h"
// Forward declarations:
class IRegistry;
......@@ -24,4 +25,28 @@ public:
*/
virtual bool analyse( IRegistry* pObject, int level ) = 0;
};
namespace details
{
template <typename F>
class GenericDataStoreAgent final : public IDataStoreAgent
{
F f;
public:
template <typename G>
GenericDataStoreAgent( G&& g ) : f{std::forward<G>( g )}
{
}
bool analyse( IRegistry* pObj, int level ) override { return Gaudi::invoke( f, pObj, level ); }
};
}
template <typename F>
::details::GenericDataStoreAgent<F> makeDataStoreAgent( F&& f )
{
return {std::forward<F>( f )};
}
#endif // GAUDIKERNEL_IDATASTOREAGENT_H
......@@ -3,7 +3,6 @@
// From Gaudi
#include "GaudiKernel/IAlgManager.h"
#include "GaudiKernel/IDataManagerSvc.h"
#include "GaudiKernel/IDataStoreAgent.h"
// local
#include "RecordOutputStream.h"
#include "ReplayOutputStream.h"
......@@ -116,20 +115,6 @@ StatusCode ReplayOutputStream::start()
return i_outStreamTransition<Gaudi::StateMachine::START>();
}
namespace
{
/// Helper class to collect the names of the subnodes of an entry in the
/// Transient Event Store.
struct OutputStreamsCollector : public IDataStoreAgent {
std::list<std::string> names;
bool analyse( IRegistry* pRegistry, int lvl ) override
{
if ( lvl > 0 ) names.push_back( pRegistry->name() );
return true;
}
};
}
// ============================================================================
// Main execution
// ============================================================================
......@@ -137,10 +122,24 @@ StatusCode ReplayOutputStream::execute()
{
if ( msgLevel( MSG::DEBUG ) ) debug() << "==> Execute" << endmsg;
OutputStreamsCollector collector;
m_evtMgr->traverseSubTree( RecordOutputStream::locationRoot(), &collector );
std::for_each( collector.names.begin(), collector.names.end(), OutStreamTrigger( this ) );
std::vector<std::string> names;
m_evtMgr->traverseSubTree( RecordOutputStream::locationRoot(), [&names]( IRegistry* pReg, int lvl ) {
if ( lvl > 0 ) names.push_back( pReg->name() );
return true;
} );
std::for_each( names.begin(), names.end(), [this]( const std::string& name ) {
SmartIF<IAlgorithm>& alg = this->m_outputStreams[name];
if ( alg ) {
if ( !alg->isExecuted() ) {
alg->sysExecute( Gaudi::Hive::currentContext() );
} else {
this->warning() << name << " already executed for the current event" << endmsg;
}
} else {
this->warning() << "invalid OuputStream " << name << endmsg;
}
} );
return StatusCode::SUCCESS;
}
......
......@@ -46,29 +46,6 @@ private:
ReplayOutputStream* m_ptr;
};
/// Helper class to call the required OutputStream.
class OutStreamTrigger
{
public:
OutStreamTrigger( ReplayOutputStream* ptr ) : m_ptr( ptr ) {}
inline void operator()( const std::string& name ) const
{
SmartIF<IAlgorithm>& alg = m_ptr->m_outputStreams[name];
if ( alg ) {
if ( !alg->isExecuted() ) {
alg->sysExecute( Gaudi::Hive::currentContext() );
} else {
m_ptr->warning() << name << " already executed for the current event" << endmsg;
}
} else {
m_ptr->warning() << "invalid OuputStream " << name << endmsg;
}
}
private:
ReplayOutputStream* m_ptr;
};
/// Helper function to call the transition on the contained OutputStreams.
/// Returns StatusCode::FAILURE if any of the OutputStreams returned a failure.
template <Gaudi::StateMachine::Transition TR>
......
......@@ -30,13 +30,14 @@
#include "GaudiKernel/IDataSelector.h"
#include "GaudiKernel/IProperty.h"
#include "GaudiKernel/IRegistry.h"
#include "GaudiKernel/ISvcLocator.h"
#include "GaudiKernel/ConversionSvc.h"
#include "GaudiKernel/DataSelectionAgent.h"
#include "GaudiKernel/NTupleImplementation.h"
#include "GaudiKernel/Property.h"
#include "GaudiKernel/Selector.h"
#include "GaudiKernel/reverse.h"
#include "NTupleSvc.h"
......@@ -47,18 +48,6 @@ DECLARE_COMPONENT( NTupleSvc )
// Selector factory instantiation
DECLARE_NAMESPACE_OBJECT_FACTORY( NTuple, Selector )
#include <sstream>
namespace
{
inline std::string toStr( long id )