From b62edc7c9143c1b29dcc62c1b61d896ae5ce6f64 Mon Sep 17 00:00:00 2001 From: Tomasz Bold <tomasz.bold@gmail.com> Date: Fri, 20 Jul 2018 17:41:55 +0200 Subject: [PATCH] Implementation of BS streaming --- .../TrigOutputHandling/CMakeLists.txt | 3 +- .../src/HLTResultCreatorByteStream.cxx | 129 +++++++++++++++--- .../src/HLTResultCreatorByteStream.h | 55 ++++++-- .../components/TrigOutputHandling_entries.cxx | 4 +- .../TrigUpgradeTest/share/egamma.withViews.py | 9 +- 5 files changed, 163 insertions(+), 37 deletions(-) diff --git a/Trigger/TrigSteer/TrigOutputHandling/CMakeLists.txt b/Trigger/TrigSteer/TrigOutputHandling/CMakeLists.txt index e24b615d2c5..881c48f6664 100644 --- a/Trigger/TrigSteer/TrigOutputHandling/CMakeLists.txt +++ b/Trigger/TrigSteer/TrigOutputHandling/CMakeLists.txt @@ -18,6 +18,7 @@ atlas_depends_on_subdirs( PUBLIC Event/xAOD/xAODTrigEgamma Event/xAOD/xAODTrigger Event/xAOD/xAODTracking + Trigger/TrigDataAccess/TrigSerializeResult ) @@ -26,7 +27,7 @@ atlas_add_library( TrigOutputHandlingLib src/*.cxx PUBLIC_HEADERS TrigOutputHandling LINK_LIBRARIES GaudiKernel AthViews AthenaBaseComps TrigSteeringEvent - xAODTrigCalo xAODTrigEgamma xAODTrigger xAODTracking ) + xAODTrigCalo xAODTrigEgamma xAODTrigger xAODTracking TrigSerializeResult ) atlas_add_component( TrigOutputHandling src/components/*.cxx diff --git a/Trigger/TrigSteer/TrigOutputHandling/src/HLTResultCreatorByteStream.cxx b/Trigger/TrigSteer/TrigOutputHandling/src/HLTResultCreatorByteStream.cxx index 3c9d7880087..3a0991b66ce 100644 --- a/Trigger/TrigSteer/TrigOutputHandling/src/HLTResultCreatorByteStream.cxx +++ b/Trigger/TrigSteer/TrigOutputHandling/src/HLTResultCreatorByteStream.cxx @@ -1,44 +1,139 @@ /* Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration */ -// OutputHandling includes -#include "HLTByteStreamCreator.h" -// STL includes -// FrameWork includes -#include "GaudiKernel/IToolSvc.h" +#include <cstring> +#include "GaudiKernel/IToolSvc.h" +#include "AthenaKernel/StorableConversions.h" +#include "TrigSerializeResult/StringSerializer.h" +#include "HLTResultCreatorByteStream.h" -HLTByteStreamCreator::HLTByteStreamCreator( const std::string& type, +HLTResultCreatorByteStream::HLTResultCreatorByteStream( const std::string& type, const std::string& name, const IInterface* parent ) : - ::AthAlgTool ( type, name, parent ) -{ - //declareProperty( "Property", m_nProperty ); + base_class( type, name, parent ) { } -HLTByteStreamCreator::~HLTByteStreamCreator() -{} +HLTResultCreatorByteStream::~HLTResultCreatorByteStream() {} -StatusCode HLTByteStreamCreator::initialize() -{ +StatusCode HLTResultCreatorByteStream::initialize() { ATH_MSG_INFO ("Initializing " << name() << "..."); + + ATH_CHECK( m_hltResultKey.initialize() ); + ATH_CHECK( m_serializerSvc.retrieve() ); + ATH_CHECK( m_dictLoaderSvc.retrieve() ); + + for ( std::string typeAndKey: m_collectionsToSerialize ) { + const std::string type = typeAndKey.substr( 0, typeAndKey.find('#') ); + const std::string key = typeAndKey.substr( typeAndKey.find('#')+1 ); + CLID clid; + if ( m_clidSvc->getIDOfTypeName(type, clid).isFailure() ) { + ATH_MSG_ERROR( "Can not find CLID for " << type << " that is needed to stream " << key ); + return StatusCode::FAILURE; + } else { + ATH_MSG_DEBUG( "Type " << type << " known to ClassID srrvice, CLID: " << clid ); + m_toSerialize.push_back( Address{ type, clid, key } ); + } + if ( not m_dictLoaderSvc->load_type( type ) ) { + ATH_MSG_WARNING("Can not load dictionary for: " << type ); + } + + if ( not m_dictLoaderSvc->has_type( type ) ) { + ATH_MSG_ERROR("No type dictionary for: " << type ); + return StatusCode::FAILURE; + } + } + return StatusCode::SUCCESS; } -StatusCode HLTByteStreamCreator::createOutput(const EventContext& ) const { +void HLTResultCreatorByteStream::makeHeader(const Address& address, std::vector<uint32_t>& buffer ) const { + buffer.push_back(0); // fragment size placeholder + buffer.push_back( address.clid ); // type info via CLID + + std::vector<uint32_t> serializedLabel; + StringSerializer ss; + ss.serialize( address.key, serializedLabel ); + buffer.push_back( serializedLabel.size() ); + buffer.insert( buffer.end(), serializedLabel.begin(), serializedLabel.end() ); // plain SG key +} + + + +void HLTResultCreatorByteStream::fillPayload( void* data, size_t sz, std::vector<uint32_t>& buffer ) const { + buffer.push_back( sz ); // size in bytes + const size_t neededSize = sz/sizeof(uint32_t) + (sz%sizeof(uint32_t) ? 1 : 0); + // ideally we could use the vector<uint32_t> right away + auto intTempBuffer = std::make_unique<uint32_t[]>( neededSize ); + intTempBuffer[ neededSize-1 ] = 0; // empty last bytes + std::memcpy(data, intTempBuffer.get(), sz); + + // copy to buffer + buffer.insert( buffer.end(), intTempBuffer.get(), intTempBuffer.get()+neededSize ); +} + + + + +StatusCode HLTResultCreatorByteStream::createOutput(const EventContext& context ) const { + + auto result = std::make_unique<HLT::HLTResult>(); + + for ( const Address& address: m_toSerialize ) { + ATH_MSG_DEBUG( "Streaming " << address.type << "#" << address.key ); + // obtain object + DataObject* dObj = evtStore()->accessData( address.clid, address.key ); + if ( dObj == 0 ) { + ATH_MSG_DEBUG("Data Object with key " << address.key << " is missing"); + continue; + } + + const void* rawptr = SG::fromStorable( dObj, address.clid, nullptr, msgLvl(MSG::DEBUG) ); + if ( rawptr == nullptr ) { + ATH_MSG_DEBUG( "Data Object with key " << address.key << + " can not be converted to void* for streaming" ); + continue; + } + ATH_MSG_DEBUG("Obtained raw pointer " << rawptr ); + + const RootType rt = m_dictLoaderSvc->load_type(address.type); + + size_t sz=0; + void* mem = m_serializerSvc->serialize( rawptr, rt, sz ); + ATH_MSG_DEBUG( "Streamed to buffer at address " << mem << " of " << sz << " bytes" ); + + if ( mem == nullptr and sz == 0 ) { + ATH_MSG_WARNING( "Serialisation of " << address.type <<"#" << address.key << " unsuccessful" ); + continue; + } + + // prepare fragment + std::vector<uint32_t> fragment; + makeHeader( address, fragment ); + fillPayload( mem, sz, fragment ); + fragment[0] = fragment.size(); + + std::vector<uint32_t>& place = result->getNavigationResult(); + place.insert( place.end(), fragment.begin(), fragment.end() ); + + if ( mem ) delete [] static_cast<const char*>( mem ); + ATH_MSG_DEBUG( "Navigation size after inserting " << address.type << "#" << address.key << " " << place.size()*sizeof(uint32_t) << " bytes" ); + } + + auto resultHandle = SG::makeHandle( m_hltResultKey, context ); + ATH_CHECK( resultHandle.record( std::move(result) ) ); + return StatusCode::SUCCESS; } -StatusCode HLTByteStreamCreator::finalize() -{ +StatusCode HLTResultCreatorByteStream::finalize() { ATH_MSG_INFO ("Finalizing " << name() << "..."); - return StatusCode::SUCCESS; } diff --git a/Trigger/TrigSteer/TrigOutputHandling/src/HLTResultCreatorByteStream.h b/Trigger/TrigSteer/TrigOutputHandling/src/HLTResultCreatorByteStream.h index 6608ddf379d..04abbecaa9e 100644 --- a/Trigger/TrigSteer/TrigOutputHandling/src/HLTResultCreatorByteStream.h +++ b/Trigger/TrigSteer/TrigOutputHandling/src/HLTResultCreatorByteStream.h @@ -1,8 +1,8 @@ /* Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration */ -#ifndef TRIGOUTPUTHANDLING_HLTBYTESTREAMCREATOR_H -#define TRIGOUTPUTHANDLING_HLTBYTESTREAMCREATOR_H 1 +#ifndef TRIGOUTPUTHANDLING_HLTResultCreatorCreatorByteStream_H +#define TRIGOUTPUTHANDLING_HLTResultCreatorCreatorByteStream_H // STL includes #include <string> @@ -13,36 +13,63 @@ // OutputHandling includes #include "TrigOutputHandling/IHLTOutputTool.h" - - +#include "TrigSteeringEvent/HLTResult.h" +#include "AthenaKernel/IClassIDSvc.h" +#include "AthenaKernel/IAthenaSerializeSvc.h" +#include "AthenaKernel/IDictLoaderSvc.h" /** * @class $(klass)s * @brief **/ -class HLTByteStreamCreator - : virtual public ::IHLTOutputTool, - public ::AthAlgTool +class HLTResultCreatorByteStream: public extends<AthAlgTool, IHLTOutputTool> { - + public: - HLTByteStreamCreator( const std::string& type, + HLTResultCreatorByteStream( const std::string& type, const std::string& name, const IInterface* parent ); - virtual ~HLTByteStreamCreator(); + virtual ~HLTResultCreatorByteStream(); virtual StatusCode createOutput(const EventContext& context) const override; virtual StatusCode initialize() override; virtual StatusCode finalize() override; private: - - HLTByteStreamCreator(); - + Gaudi::Property<std::vector<std::string>> m_collectionsToSerialize{this, "CollectionsToSerialize", {}, "TYPE#SG key of collections to be streamed (like in StreamAOD)" }; + + SG::WriteHandleKey<HLT::HLTResult> m_hltResultKey{this, "HLTResult", "HLTResult", "Name of the HLTResult"}; + + // internal structure to keep configuration organised conveniently + struct Address { + std::string type; + CLID clid; + std::string key; + }; + + std::vector< Address > m_toSerialize; // postprocessed configuration info + + ServiceHandle<IClassIDSvc> m_clidSvc{ this, "ClassIDSvc", "ClassIDSvc", "Service to translate class name to CLID" }; + ServiceHandle<IAthenaSerializeSvc> m_serializerSvc{ this, "Serializer", "AthenaRootSerializeSvc", "Service that translates transient to persistent respresenation" }; + ServiceHandle<IDictLoaderSvc> m_dictLoaderSvc{ this, "DictLoaderSvc", "AthDictLoaderSvc", "Service managing ROOT dictionaries for EDM classes" }; + + /** + * Given the ID if the collection (in address arg) insert basic streaming info into the buffer. + */ + void makeHeader( const HLTResultCreatorByteStream::Address& address, std::vector<uint32_t>& buffer ) const; + + /** + * For copy bytest from the memory into the buffer converting from char[] to uint32_t[] + * This function is candidate to be made global function at some point + * and we will need also readPayload function + */ + void fillPayload( void* data, size_t sz, std::vector<uint32_t>& buffer ) const ; + + }; -#endif //> !TRIGOUTPUTHANDLING_HLTBYTESTREAMCREATOR_H +#endif //> !TRIGOUTPUTHANDLING_HLTResultCreatorCreatorByteStream_H diff --git a/Trigger/TrigSteer/TrigOutputHandling/src/components/TrigOutputHandling_entries.cxx b/Trigger/TrigSteer/TrigOutputHandling/src/components/TrigOutputHandling_entries.cxx index 47009e8495d..c96f7a78eea 100644 --- a/Trigger/TrigSteer/TrigOutputHandling/src/components/TrigOutputHandling_entries.cxx +++ b/Trigger/TrigSteer/TrigOutputHandling/src/components/TrigOutputHandling_entries.cxx @@ -1,6 +1,6 @@ -#include "../HLTByteStreamCreator.h" +#include "../HLTResultCreatorByteStream.h" #include "../HLTEDMCreator.h" -DECLARE_COMPONENT( HLTByteStreamCreator ) +DECLARE_COMPONENT( HLTResultCreatorByteStream ) DECLARE_COMPONENT( HLTEDMCreator ) diff --git a/Trigger/TrigValidation/TrigUpgradeTest/share/egamma.withViews.py b/Trigger/TrigValidation/TrigUpgradeTest/share/egamma.withViews.py index f5627867c56..3d3afa8cafe 100644 --- a/Trigger/TrigValidation/TrigUpgradeTest/share/egamma.withViews.py +++ b/Trigger/TrigValidation/TrigUpgradeTest/share/egamma.withViews.py @@ -213,7 +213,7 @@ summary = TriggerSummaryAlg( "TriggerSummaryAlg" ) summary.InputDecision = "HLTChains" summary.FinalDecisions = [ "ElectronL2Decisions", "MuonL2Decisions" ] -from TrigOutputHandling.TrigOutputHandlingConf import HLTEDMCreator +from TrigOutputHandling.TrigOutputHandlingConf import HLTEDMCreator, HLTResultCreatorByteStream egammaViewsMerger = HLTEDMCreator("egammaViewsMerger") egammaViewsMerger.TrigCompositeContainer = [ "L2ElectronLinks", "filterCaloRoIsAlg", "EgammaCaloDecisions","ElectronL2Decisions", "MuonL2Decisions", "EMRoIDecisions", "METRoIDecisions", "MURoIDecisions", "HLTChainsResult", "JRoIDecisions", "MonitoringSummaryStep1", "RerunEMRoIDecisions", "RerunMURoIDecisions", "TAURoIDecisions", "L2CaloLinks", "FilteredEMRoIDecisions", "FilteredEgammaCaloDecisions" ] @@ -232,9 +232,12 @@ egammaViewsMerger.TrigEMClusterContainer = [ clustersKey ] egammaViewsMerger.OutputLevel = VERBOSE -svcMgr.StoreGateSvc.OutputLevel = VERBOSE +svcMgr.StoreGateSvc.OutputLevel = INFO -summary.OutputTools = [ egammaViewsMerger ] +streamnigTool = HLTResultCreatorByteStream(OutputLevel=VERBOSE) + +streamnigTool.CollectionsToSerialize = [ "xAOD::TrigCompositeContainer#EgammaCaloDecisions", "xAOD::TrigElectronContainer#HLT_xAOD__TrigElectronContainer_L2ElectronFex", "xAOD::TrigElectronAuxContainer#HLT_xAOD__TrigElectronContainer_L2ElectronFexAux." ] +summary.OutputTools = [ egammaViewsMerger, streamnigTool ] summary.OutputLevel = DEBUG -- GitLab