Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • rrabadan/LHCb
  • talin/LHCb
  • imjelde/LHCb
  • mstahl/LHCb
  • padeken/LHCb
  • mimazure/LHCb
  • roiser/LHCb
  • conrad/LHCb
  • kklimasz/LHCb
  • rcurrie/LHCb
  • wkrzemie/LHCb
  • fkeizer/LHCb
  • valassi/LHCb
  • hschrein/LHCb
  • anstahll/LHCb
  • jonrob/LHCb
  • graven/LHCb
  • clemenci/LHCb
  • chaen/LHCb
  • sstahl/LHCb
  • lhcb/LHCb
21 results
Show changes
Commits on Source (60)
Showing
with 760 additions and 639 deletions
......@@ -191,6 +191,7 @@ gaudi_install(CMAKE
cmake/FindVDT.cmake
# helpers to use in LHCb projects
cmake/FileContentMetadataRepository.cmake
cmake/FindDataPackage.cmake
cmake/LHCbConfigUtils.cmake
cmake/ProjectConfig.cmake.in
......
......@@ -124,28 +124,15 @@ for report in ["Dec", "Sel"]:
"OutputHlt" + report + "ReportsLocation":
hltname + "/" + report + "Reports"
},
properties={"SourceID": hltname},
properties={
"SourceID": hltname,
"DecoderMapping": "TCKANNSvc"
},
conf=DecoderDB)
if report == "Sel":
dec.Outputs[
"OutputHltObjectSummariesLocation"] = hltname + "/SelReports/Candidates"
#Vertex Decoder
for hltname in ["Hlt1", "Hlt2"]:
Decoder(
"HltVertexReportsDecoder/" + hltname + "VertexReportsDecoder",
active=True,
banks=["HltVertexReports"],
inputs={"RawEventLocations": None},
outputs={
"OutputHltVertexReportsLocations": [
hltname + "/VertexReports/PV3D",
hltname + "/VertexReports/ProtoPV3D"
]
},
properties={"SourceID": hltname},
conf=DecoderDB)
#is a Routing bits filter really a decoder? it doesn't create output...
Decoder(
"HltRoutingBitsFilter",
......
/***** User ApplicationOptions/ApplicationOptions **************************************************
|-auditors = [] (default: [])
|-buffer_events = 20000 (default: 20000)
|-callgrind_profile = False (default: False)
|-conddb_tag = 'upgrade/sim-20220705-vc-mu100' (default: '')
|-control_flow_file = '' (default: '')
|-data_flow_file = '' (default: '')
|-data_type = 'Upgrade' (default: 'Upgrade')
|-dddb_tag = 'upgrade/dddb-20220705' (default: '')
|-event_store = 'HiveWhiteBoard' (default: 'HiveWhiteBoard')
|-evt_max = 8 (default: -1)
|-first_evt = 0 (default: 0)
|-histo_file = '' (default: '')
|-input_files = ['root://eoslhcb.cern.ch//eos/lhcb/wg/rta/WP6/Allen/digi_input/upgrade-magup-sim10aU1-minbias/Digi-00151659_00000001_dddb-20220705_sim-20220705-vc-mu100.digi']
| (default: [])
|-input_raw_format = 0.5 (default: 0.5)
|-input_type = 'ROOT' (default: '')
|-lines_maker = None
|-memory_pool_size = 10485760 (default: 10485760)
|-monitoring_file = '' (default: '')
|-msg_svc_format = '% F%35W%S %7W%R%T %0W%M' (default: '% F%35W%S %7W%R%T %0W%M')
|-msg_svc_time_format = '%Y-%m-%d %H:%M:%S UTC' (default: '%Y-%m-%d %H:%M:%S UTC')
|-n_event_slots = 1 (default: -1)
|-n_threads = 1 (default: 1)
|-ntuple_file = '' (default: '')
|-output_file = '' (default: '')
|-output_level = 3 (default: 3)
|-output_type = '' (default: '')
|-phoenix_filename = '' (default: '')
|-print_freq = 10000 (default: 10000)
|-python_logging_level = 20 (default: 20)
|-scheduler_legacy_mode = True (default: True)
|-simulation = True (default: True)
|-tck = 0 (default: 0)
|-use_iosvc = False (default: False)
| (default: '')
|-append_decoding_keys_to_output_manifest = True (default: True)
|-auditors = [] (default: [])
|-buffer_events = 20000 (default: 20000)
|-callgrind_profile = False (default: False)
|-conddb_tag = 'upgrade/sim-20220705-vc-mu100' (default: '')
|-control_flow_file = '' (default: '')
|-data_flow_file = '' (default: '')
|-data_type = 'Upgrade' (default: 'Upgrade')
|-dddb_tag = 'upgrade/dddb-20220705' (default: '')
|-event_store = 'HiveWhiteBoard' (default: 'HiveWhiteBoard')
|-evt_max = 8 (default: -1)
|-first_evt = 0 (default: 0)
|-histo_file = '' (default: '')
|-input_files = ['root://eoslhcb.cern.ch//eos/lhcb/wg/rta/WP6/Allen/digi_input/upgrade-magup-sim10aU1-minbias/Digi-00151659_00000001_dddb-20220705_sim-20220705-vc-mu100.digi']
| (default: [])
|-input_manifest_file = '' (default: '')
|-input_raw_format = 0.5 (default: 0.5)
|-input_type = 'ROOT' (default: '')
|-lines_maker = None
|-memory_pool_size = 10485760 (default: 10485760)
|-monitoring_file = '' (default: '')
|-msg_svc_format = '% F%35W%S %7W%R%T %0W%M' (default: '% F%35W%S %7W%R%T %0W%M')
|-msg_svc_time_format = '%Y-%m-%d %H:%M:%S UTC' (default: '%Y-%m-%d %H:%M:%S UTC')
|-n_event_slots = 1 (default: -1)
|-n_threads = 1 (default: 1)
|-ntuple_file = '' (default: '')
|-output_file = '' (default: '')
|-output_level = 3 (default: 3)
|-output_manifest_file = '' (default: '')
|-output_type = '' (default: '')
|-phoenix_filename = '' (default: '')
|-print_freq = 10000 (default: 10000)
|-python_logging_level = 20 (default: 20)
|-scheduler_legacy_mode = True (default: True)
|-simulation = True (default: True)
|-use_iosvc = False (default: False)
| (default: '')
|-write_decoding_keys_to_git = True (default: True)
\----- (End of User ApplicationOptions/ApplicationOptions) -----------------------------------------
ApplicationMgr SUCCESS
====================================================================================================================================
......
......@@ -18,7 +18,6 @@
#include "Event/PackedCaloDigit.h"
#include "Event/PackedCaloHypo.h"
#include "Event/PackedCluster.h"
#include "Event/PackedData.h"
#include "Event/PackedDataBuffer.h"
#include "Event/PackedDecReport.h"
#include "Event/PackedFlavourTag.h"
......
......@@ -9,32 +9,12 @@
* or submit itself to any jurisdiction. *
\*****************************************************************************/
#pragma once
#include "Event/RawBank.h"
#include "GaudiKernel/StatusCode.h"
#include <iomanip>
#include <string>
namespace LHCb::Hlt::PackedData {
enum HeaderIDs { kVersionNumber = 2 };
/// Bit masks in the SourceID word
enum struct SourceIDMasks : uint16_t { PartID = 0x00FF, Reserved = 0x1F00, Compression = 0xE000 };
template <SourceIDMasks m>
uint16_t extract( uint16_t i ) {
auto s = __builtin_ctz( static_cast<unsigned>( m ) );
return ( i & static_cast<unsigned>( m ) ) >> s;
}
template <SourceIDMasks m>
uint16_t shift( uint16_t i ) {
auto s = __builtin_ctz( static_cast<unsigned>( m ) );
uint16_t v = ( i << s ) & static_cast<unsigned>( m );
assert( extract<m>( v ) == i );
return v;
}
/// Compression algorithms -- note: these values are written to the RawBank, so have to remain stable
/// hence we do _not_ use native ROOT values, which could change between ROOT versions...
enum struct Compression { NoCompression = 0, ZLIB = 1, LZMA = 2, LZ4 = 3, ZSTD = 4 };
......@@ -60,7 +40,7 @@ namespace LHCb::Hlt::PackedData {
inline StatusCode parse( Compression& c, std::string in ) {
auto sv = std::string_view( in );
if ( !sv.empty() && ( sv.front() == '\'' || sv.front() == '\"' ) && sv.front() == sv.back() ) {
if ( sv.size() > 1 && ( sv.front() == '\'' || sv.front() == '\"' ) && sv.front() == sv.back() ) {
sv.remove_prefix( 1 );
sv.remove_suffix( 1 );
};
......@@ -72,11 +52,4 @@ namespace LHCb::Hlt::PackedData {
return StatusCode::SUCCESS;
}
/// Return the part ID from the SourceID word
inline std::uint32_t partID( RawBank const& b ) { return extract<SourceIDMasks::PartID>( b.sourceID() ); }
/// Return the compression from the SourceID word
inline Compression compression( RawBank const& b ) {
return static_cast<Compression>( extract<SourceIDMasks::Compression>( b.sourceID() ) );
}
} // namespace LHCb::Hlt::PackedData
......@@ -11,6 +11,7 @@
#pragma once
#include "Compression.h"
#include "Event/PackedData.h"
#include "Kernel/STLExtensions.h"
#include "RVersion.h"
#include <algorithm>
#include <cstdint>
......@@ -68,6 +69,7 @@ namespace LHCb::Hlt::PackedData {
class ByteBuffer {
public:
using buffer_type = std::vector<std::byte>;
using buffer_view = span<std::byte const>;
/// Return the current position in the buffer.
std::size_t pos() const { return m_pos; }
......@@ -81,7 +83,7 @@ namespace LHCb::Hlt::PackedData {
/// Reset the position to zero without clearing the buffer
void reset() { m_pos = 0; }
/// Initialize from an existing buffer and reset position to zero.
bool init( const buffer_type& data, bool compressed = false );
bool init( buffer_view data, bool compressed = false );
/// Return the internal buffer.
const buffer_type& buffer() const { return m_buffer; }
/// Return the size of the internal buffer.
......@@ -143,11 +145,11 @@ namespace LHCb::Hlt::PackedData {
m_pos += x.size();
}
/// Take requested part of the buffer
LHCb::span<std::byte const> subspan( size_t offset, size_t size = gsl::dynamic_extent ) const {
ByteBuffer::buffer_view subspan( size_t offset, size_t size = gsl::dynamic_extent ) const {
return LHCb::span{m_buffer}.subspan( offset, size );
}
/// Add sub span of a buffer to a new buffer
void assign( LHCb::span<std::byte const> data ) {
void assign( ByteBuffer::buffer_view data ) {
m_buffer.assign( data.begin(), data.end() );
m_pos = 0;
}
......@@ -262,15 +264,18 @@ namespace LHCb::Hlt::PackedData {
*/
class PackedDataOutBuffer {
public:
PackedDataOutBuffer( std::uint32_t key ) : m_key{key} {}
/// Clear the internal byte buffer.
void clear() { m_buffer.clear(); }
/// Return a reference to the internal buffer.
const std::vector<std::byte>& buffer() const { return m_buffer.buffer(); }
const ByteBuffer::buffer_type& buffer() const { return m_buffer.buffer(); }
/// Compress the buffer
bool compress( Compression compression, int level, ByteBuffer::buffer_type& output ) const {
return m_buffer.compress( compression, level, output );
}
std::uint32_t key() const { return m_key; }
/// Reserve size for the buffer
void reserve( std::size_t size ) { m_buffer.reserve( size ); }
/// Function called by serializable objects' save method.
......@@ -301,9 +306,16 @@ namespace LHCb::Hlt::PackedData {
m_buffer.write( x, pos );
}
/// Add byte buffer of a packed data buffer to another one
void addBuffer( PackedDataOutBuffer const& x ) { m_buffer.writeBuffer( x.m_buffer ); }
/// Add a byte buffer to a packed data buffer
void addBuffer( ByteBuffer const& x ) { m_buffer.writeBuffer( x ); }
void addBuffer( PackedDataOutBuffer const& x ) {
// if empty, inherit value from x. If not empty, require to be the same
if ( x.m_key != m_key ) {
if ( m_key != 0 || m_buffer.size() != 0 || m_buffer.pos() != 0 ) {
throw std::runtime_error( "PackedDataInBuffer: merging buffers with distinct keys" );
}
m_key = x.m_key;
}
m_buffer.writeBuffer( x.m_buffer );
}
/// Save a size integer.
std::pair<std::size_t, std::size_t> saveSize( std::size_t x ) { return save<uint32_t>( x ); }
/// Save a vector of scalars.
......@@ -338,7 +350,8 @@ namespace LHCb::Hlt::PackedData {
}
private:
ByteBuffer m_buffer; ///< Internal byte buffer
std::uint32_t m_key = 0u; ///< key used to encode, and thus required, to decode stored strings
ByteBuffer m_buffer; ///< Internal byte buffer
};
/** @class PackedDataInBuffer PackedDataBuffer.h
......@@ -349,19 +362,10 @@ namespace LHCb::Hlt::PackedData {
*/
class PackedDataInBuffer {
public:
/// Return whether the end of buffer was reached.
bool eof() const { return m_buffer.eof(); }
/// Skip a number of bytes from the buffer.
void skip( std::size_t n ) { m_buffer.readNull( n ); }
/// Initialize from an existing byte buffer.
bool init( const ByteBuffer::buffer_type& data, bool compressed = false ) {
return m_buffer.init( data, compressed );
}
bool init( const ByteBuffer& buffer, bool compressed = false ) {
return m_buffer.init( buffer.buffer(), compressed );
}
PackedDataInBuffer( std::uint32_t key = 0 ) : m_key{key} {}
/// Initialize from an existing header and byte buffer.
void init( const ObjectHeader header, const ByteBuffer::buffer_type& data ) {
PackedDataInBuffer( const ObjectHeader header, ByteBuffer::buffer_view data, std::uint32_t key ) {
m_key = key;
m_buffer.clear();
m_buffer.write( header.classID );
m_buffer.write( header.locationID );
......@@ -370,8 +374,25 @@ namespace LHCb::Hlt::PackedData {
for ( auto d : data ) m_buffer.write( d );
m_buffer.reset();
}
/// init a byte buffer from a subset of another packed data buffer
PackedDataInBuffer( PackedDataInBuffer const& x, std::size_t pos, std::size_t size = -1 ) {
m_key = x.m_key;
m_buffer.assign(
x.m_buffer.subspan( pos, size == static_cast<std::size_t>( -1 ) ? x.m_buffer.pos() - pos : size ) );
}
/// Return whether the end of buffer was reached.
bool eof() const { return m_buffer.eof(); }
/// Skip a number of bytes from the buffer.
void skip( std::size_t n ) { m_buffer.readNull( n ); }
/// Initialize from an existing byte buffer.
bool init( ByteBuffer::buffer_view data, bool compressed = false ) { return m_buffer.init( data, compressed ); }
bool init( const ByteBuffer& buffer, bool compressed = false ) {
return m_buffer.init( buffer.buffer(), compressed );
}
std::uint32_t key() const { return m_key; }
/// Return a reference to the internal buffer.
const ByteBuffer& buffer() const { return m_buffer; }
const ByteBuffer& buffer() const& { return m_buffer; }
ByteBuffer buffer() && { return std::move( m_buffer ); }
/// Clear the internal byte buffer.
void clear() { m_buffer.clear(); }
/// Function called by serializable objects' load method.
......@@ -418,12 +439,6 @@ namespace LHCb::Hlt::PackedData {
return x;
}
/// Add a byte buffer of a packed data buffer to another one at a given position
void addBuffer( PackedDataInBuffer const& x, std::uint32_t p ) {
return m_buffer.assign( x.m_buffer.subspan( p, x.m_buffer.pos() - p ) );
}
/// Add a byte buffer of a packed data buffer to another one at a given position
void addBuffer( ByteBuffer const& x, std::uint32_t p ) { return m_buffer.assign( x.subspan( p, x.pos() - p ) ); }
/// Load a scalar from a given position and return it.
template <typename T>
T loadAt( std::size_t i ) const {
......@@ -468,7 +483,50 @@ namespace LHCb::Hlt::PackedData {
}
private:
ByteBuffer m_buffer; ///< Internal byte buffer
std::uint32_t m_key = 0; ///< key used to encode, and thus required to decode stored strings
ByteBuffer m_buffer; ///< Internal byte buffer
};
class MappedInBuffers {
std::unordered_map<std::int32_t, PackedDataInBuffer> m_map; // map of locationID -> buffer:
std::uint32_t m_key = 0;
std::uint32_t m_bankversion = -1;
public:
MappedInBuffers( std::uint32_t key = 0, std::uint32_t bankversion = -1 ) : m_key{key}, m_bankversion{bankversion} {}
std::uint32_t key() const { return m_key; }
std::uint32_t bankVersion() const { return m_bankversion; }
PackedDataInBuffer const* find( std::int32_t locationID ) const {
auto i = m_map.find( locationID );
return i == m_map.end() ? nullptr : &i->second;
}
bool empty() const { return m_map.empty(); }
auto size() const { return m_map.size(); }
auto begin() const { return m_map.begin(); }
auto end() const { return m_map.end(); }
template <typename... Args>
auto try_emplace( Args&&... args ) {
auto i = m_map.try_emplace( std::forward<Args>( args )... );
if ( i.second ) {
if ( !m_key ) {
m_key = i.first->second.key();
} else if ( i.first->second.key() != m_key ) {
throw std::runtime_error( "MappedInBuffers: tried to add buffer with inconsistent key" );
}
}
return i;
}
MappedInBuffers& setBankVersion( std::uint32_t version ) {
if ( version != m_bankversion && m_bankversion != (std::uint32_t)-1 ) {
throw std::runtime_error( "MappedInBuffers: tried to change bankversion which was already set" );
}
m_bankversion = version;
return *this;
}
};
} // namespace LHCb::Hlt::PackedData
/*****************************************************************************\
* (c) Copyright 2022 CERN for the benefit of the LHCb Collaboration *
* *
* This software is distributed under the terms of the GNU General Public *
* Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". *
* *
* In applying this licence, CERN does not waive the privileges and immunities *
* granted to it by virtue of its status as an Intergovernmental Organization *
* or submit itself to any jurisdiction. *
\*****************************************************************************/
#pragma once
#include "Kernel/IANNSvc.h"
#include "LHCbAlgs/Consumer.h"
#include "Event/Particle.h"
#include "Event/PackedPartToRelatedInfoRelation.h"
#include "Event/PackedRelations.h"
#include "Event/RelatedInfoMap.h"
#include "Relations/Relation1D.h"
#include "Event/PackedDataBuffer.h"
#include "Event/StandardPacker.h"
#include "RegistryWrapper.h"
/**
* Templated base algorithm for
* Particle to Int and Particle to info relation unpacking algorithms
*
* Note that the inheritance from Consumer and the void input are misleading.
* The algorithm is reading from and writing to TES, just via Handles so that
* it can deal with non existant input and with output failures, which is not
* authorized in the functional world.
* FIXME this should not be necessary, it's mainly due to misconfigurations in
* the tests
* Additionally packing requires the data objects to be in TES before one can add links to it
* https://gitlab.cern.ch/lhcb/LHCb/-/issues/180
**/
namespace DataPacking::Buffer {
// Relation types
using Part2IntRelations = LHCb::Relation1D<LHCb::Particle, int>;
using Part2InfoRelations = LHCb::Relation1D<LHCb::Particle, LHCb::RelatedInfoMap>;
template <typename RELATION, typename PRELATION, typename FROM>
class Rel1Unpack : public LHCb::Algorithm::Consumer<void( const LHCb::Hlt::PackedData::PackedDataInBuffer& )> {
using Buffer = LHCb::Hlt::PackedData::PackedDataInBuffer;
public:
Rel1Unpack( const std::string& name, ISvcLocator* pSvcLocator )
: LHCb::Algorithm::Consumer<void( const Buffer& )>( name, pSvcLocator,
KeyValue{"InputName", "/Event/PackedRelations"} ) {}
void operator()( const Buffer& buffer ) const override {
auto* rels = m_rels.put( std::make_unique<RELATION>() );
if ( !buffer.buffer().size() ) return;
// Sadly the pack structure expects an object with a valid RegistryEntry . To be improved
auto prels = RegistryWrapper<PRELATION>( m_rels.fullKey().key() + "_Packed" );
Buffer readBuffer;
readBuffer.init( buffer.buffer(), false );
// Do the actual loading of the objects
LHCb::Hlt::PackedData::ObjectHeader header;
while ( !readBuffer.eof() ) {
Packer::io( readBuffer, header );
auto nBytesRead = readBuffer.load( *prels );
if ( nBytesRead != header.storedSize ) {
this->fatal() << "Loading of object (CLID=" << header.classID << " locationID=" << header.locationID << ") "
<< " consumed " << nBytesRead << " bytes, "
<< " but " << header.storedSize << " were stored!" << endmsg;
}
if ( this->msgLevel( MSG::DEBUG ) ) {
this->debug() << "Loading of object (CLID=" << header.classID << " locationID=" << header.locationID << ") "
<< header.storedSize << " were stored" << endmsg;
}
}
static const Gaudi::StringKey PackedObjectLocations{"PackedObjectLocations"};
for ( auto id : header.linkLocationIDs ) {
auto location = m_hltANNSvc->value( PackedObjectLocations, id );
if ( location ) {
prels->linkMgr()->addLink( location.value().first, nullptr );
} else {
m_missingLinks++;
}
}
if ( prels->data().size() ) {
rels->setVersion( prels->version() );
unpack( *rels, *prels );
}
// Count packed output
m_unpackedData += rels->relations().size();
}
void unpack( Part2IntRelations& rels, const LHCb::PackedRelations& prels ) const {
for ( const auto& prel : prels.data() ) {
for ( int kk = prel.start; prel.end > kk; ++kk ) {
int srcLink( 0 );
int srcKey( 0 );
int destLink( 0 );
int destKey( 0 );
StandardPacker::indexAndKey64( prels.sources()[kk], srcLink, srcKey );
StandardPacker::indexAndKey64( prels.dests()[kk], destLink, destKey );
DataObject* fp = nullptr;
this->evtSvc()->retrieveObject( prels.linkMgr()->link( srcLink )->path(), fp ).ignore();
auto* from = static_cast<FROM*>( fp );
if ( !from ) {
this->warning() << "Source location " << prels.linkMgr()->link( srcLink )->path()
<< " not persisted, skip the relation." << endmsg;
continue;
}
typename RELATION::From f = from->object( srcKey );
typename RELATION::To t = (int)prels.dests()[kk];
StatusCode sc = rels.relate( f, t );
if ( !sc )
this->warning() << "Something went wrong with relation unpacking "
<< "sourceKey " << srcKey << " sourceLink " << srcLink << "destKey " << destKey
<< " destLink " << destLink << endmsg;
}
}
}
void unpack( Part2InfoRelations& rels, const LHCb::PackedRelatedInfoRelations& prels ) const {
for ( const auto& prel : prels.containers() ) {
for ( unsigned int kk = prel.first; prel.last > kk; ++kk ) {
const auto& rel = prels.relations()[kk];
int srcLink( 0 );
int srcKey( 0 );
StandardPacker::indexAndKey64( rel.reference, srcLink, srcKey );
DataObject* fp = nullptr;
this->evtSvc()->retrieveObject( prels.linkMgr()->link( srcLink )->path(), fp ).ignore();
auto* from = static_cast<FROM*>( fp );
if ( !from ) {
this->warning() << "Source location " << prels.linkMgr()->link( srcLink )->path()
<< " not persisted, skip the relation." << endmsg;
continue;
}
typename RELATION::From f = from->object( srcKey );
LHCb::RelatedInfoMap t;
t.reserve( rel.last - rel.first );
for ( const auto& jj : Packer::subrange( prels.info(), rel.first, rel.last ) ) { t.insert( jj ); }
StatusCode sc = rels.relate( f, t );
if ( !sc )
this->warning() << "Something went wrong with relation unpacking "
<< "sourceKey " << srcKey << " sourceLink " << srcLink << endmsg;
}
}
}
private:
ServiceHandle<IANNSvc> m_hltANNSvc{this, "ANNSvc", "HltANNSvc", "Service to retrieve DecReport IDs"};
DataObjectWriteHandle<RELATION> m_rels{this, "OutputName", ""};
mutable Gaudi::Accumulators::StatCounter<> m_unpackedData{this, "# PackedData"};
mutable Gaudi::Accumulators::StatCounter<> m_missingLinks{this, "# Missing Link Locations"};
};
} // namespace DataPacking::Buffer
......@@ -11,7 +11,7 @@
#pragma once
#include "Event/PackedDataBuffer.h"
#include "Kernel/IANNSvc.h"
#include "Kernel/IIndexedANNSvc.h"
#include "LHCbAlgs/Producer.h"
#include "RegistryWrapper.h"
......@@ -32,6 +32,7 @@
namespace DataPacking::Buffer {
using Producer = LHCb::Algorithm::Producer<LHCb::Hlt::PackedData::PackedDataOutBuffer()>;
inline const Gaudi::StringKey PackedObjectLocations{"PackedObjectLocations"};
template <class PACKER>
class Pack final : public Producer {
......@@ -45,14 +46,19 @@ namespace DataPacking::Buffer {
Buffer operator()() const override {
if ( !m_data.exist() ) {
auto const* data = m_data.getIfExists();
m_nbInputFound += ( data != nullptr );
if ( !data ) {
if ( this->msgLevel( MSG::DEBUG ) ) this->debug() << "Input location " << m_data << " doesn't exist." << endmsg;
return {};
return {0};
}
auto const* data = m_data.get();
if ( data->size() == 0 ) {
if ( this->msgLevel( MSG::DEBUG ) ) this->debug() << "Input location " << m_data << " empty." << endmsg;
return {};
// if ( data->size() == 0 ) {
// if ( this->msgLevel( MSG::DEBUG ) ) this->debug() << "Input location " << m_data << " empty." << endmsg;
// return {0};
//}
if ( m_encodingKey == 0 ) {
throw GaudiException( "encoding key is zero", __PRETTY_FUNCTION__, StatusCode::FAILURE );
++m_zero_key;
}
// Sadly the pack structure expects an object with a valid registry entry. To be improved
......@@ -69,36 +75,41 @@ namespace DataPacking::Buffer {
m_nbPackedData += pdata->data().size();
// reserve some space for data, this should be tuned
Buffer buffer;
Buffer buffer( m_encodingKey );
buffer.reserve( pdata->data().size() );
static const Gaudi::StringKey PackedObjectLocations{"PackedObjectLocations"};
auto locationID = m_hltANNSvc->value( PackedObjectLocations, outputLocation() );
auto classID = pdata->clID();
const auto& map = m_annsvc->s2i( m_encodingKey, PackedObjectLocations );
auto iloc = map.find( m_data.fullKey().key() );
if ( iloc == map.end() ) {
error() << "could not locate packedobjectlocation " << m_data.fullKey().key() << " in table with key "
<< m_encodingKey.value() << endmsg;
throw GaudiException( "unknown location", __PRETTY_FUNCTION__, StatusCode::FAILURE );
}
auto locationID = iloc->second;
buffer.save<uint32_t>( classID );
buffer.save<int32_t>( locationID->second );
buffer.save<uint32_t>( pdata->clID() );
buffer.save<int32_t>( locationID );
auto* linkMgr = pdata->linkMgr();
unsigned int nlinks = linkMgr->size();
if ( this->msgLevel( MSG::DEBUG ) ) {
this->debug() << "Packed version " << (unsigned int)pdata->version() << endmsg;
this->debug() << "packed data type " << System::typeinfoName( typeid( *pdata ) ) << endmsg;
this->debug() << "data location " << locationID->first << endmsg;
this->debug() << "data location " << outputLocation() << endmsg;
this->debug() << "packed data size " << pdata->data().size() << endmsg;
this->debug() << "classID " << classID << " locationID " << locationID->second << " nlinks " << nlinks
<< endmsg;
this->debug() << "classID " << pdata->clID() << " locationID " << locationID << " nlinks " << nlinks << endmsg;
}
buffer.saveSize( nlinks );
for ( unsigned int id = 0; id < nlinks; ++id ) {
auto location = linkMgr->link( id )->path();
if ( location[0] != '/' ) { location = "/Event/" + location; }
auto packedLocation = m_containerMap.find( location );
if ( packedLocation != end( m_containerMap ) ) { location = packedLocation->second; }
auto linkID = m_hltANNSvc->value( PackedObjectLocations, location );
buffer.save<int32_t>( linkID->second );
if ( !location.empty() && location.front() != '/' ) { location = "/Event/" + location; }
auto iloc = map.find( location );
if ( iloc == map.end() ) {
error() << "could not locate referred location " << location << " originally: " << linkMgr->link( id )->path()
<< " in table with key " << m_encodingKey.value() << endmsg;
throw GaudiException( "unknown location", __PRETTY_FUNCTION__, StatusCode::FAILURE );
}
buffer.save<int32_t>( iloc->second );
}
// Reserve bytes for the size of the object
......@@ -146,22 +157,27 @@ namespace DataPacking::Buffer {
}
private:
ServiceHandle<IANNSvc> m_hltANNSvc{this, "ANNSvc", "HltANNSvc", "Service to retrieve PackedObjectLocations"};
ServiceHandle<IIndexedANNSvc> m_annsvc{this, "ANNSvc", "HltANNSvc", "Service to retrieve PackedObjectLocations"};
DataObjectReadHandle<typename PACKER::DataVector> m_data{this, "InputName", PACKER::unpackedLocation()};
Gaudi::Property<unsigned int> m_encodingKey{this, "EncodingKey", 0u};
Gaudi::Property<unsigned short int> m_packingVersion{
this, "PackingVersion", (unsigned short int)PACKER::PackedDataVector::defaultPackingVersion(),
"Packing version number"};
Gaudi::Property<bool> m_enableCheck{this, "EnableCheck", false};
// Mapping of reconstruction object to their location after packing
Gaudi::Property<std::map<std::string, std::string>> m_containerMap{this, "ContainerMap"};
mutable Gaudi::Accumulators::BinomialCounter<> m_nbInputFound{this, "input found"};
mutable Gaudi::Accumulators::StatCounter<> m_nbPackedData{this, "# PackedData"};
mutable Gaudi::Accumulators::MsgCounter<MSG::ERROR> m_unregisterError{
this, "Problem unregistering data in PackerBaseAlg", 10};
mutable Gaudi::Accumulators::MsgCounter<MSG::WARNING> m_zero_key{
this,
"Encoding key is zero -- this implies explicit configuration of decoding will be required.. make sure you know "
"how to do this, and will have the required configuration setup",
10};
};
} // namespace DataPacking::Buffer
......@@ -66,15 +66,16 @@ namespace DataPacking::Buffer {
Buffer operator()() const override {
if ( !m_rels.exist() ) {
auto const* rels = m_rels.getIfExists();
m_nbInputFound += ( rels != nullptr );
if ( !rels ) {
if ( this->msgLevel( MSG::DEBUG ) ) this->debug() << "Input location " << m_rels << " doesn't exist." << endmsg;
return {};
return {0};
}
auto const* rels = m_rels.get();
if ( !rels->relations().size() ) {
if ( this->msgLevel( MSG::DEBUG ) ) this->debug() << "Relations empty " << endmsg;
return {};
return {0};
}
// FIXME This (temporary) TES usage should not be needed
......@@ -96,14 +97,22 @@ namespace DataPacking::Buffer {
}
// reserve some space for data, this should be tuned
Buffer buffer;
Buffer buffer{m_encodingKey};
buffer.reserve( prels->data().size() );
static const Gaudi::StringKey PackedObjectLocations{"PackedObjectLocations"};
auto locationID = m_hltANNSvc->value( PackedObjectLocations, outputLocation() );
auto classID = prels->clID();
if ( m_encodingKey == 0 ) ++m_zero_key;
const auto& map = m_annsvc->s2i( m_encodingKey, PackedObjectLocations );
auto iloc = map.find( m_rels.fullKey().key() );
if ( iloc == map.end() ) {
error() << "could not locate packedobjectlocation " << m_rels.fullKey().key() << " in table with key "
<< m_encodingKey.value() << endmsg;
throw GaudiException( "unknown location", __PRETTY_FUNCTION__, StatusCode::FAILURE );
}
auto locationID = iloc->second;
auto classID = prels->clID();
buffer.save<uint32_t>( classID );
buffer.save<int32_t>( locationID->second );
buffer.save<int32_t>( locationID );
auto* linkMgr = prels->linkMgr();
unsigned int nlinks = linkMgr->size();
......@@ -112,11 +121,13 @@ namespace DataPacking::Buffer {
for ( unsigned int id = 0; id < nlinks; ++id ) {
auto location = linkMgr->link( id )->path();
if ( location[0] != '/' ) { location = "/Event/" + location; }
auto packedLocation = m_containerMap.find( location );
if ( packedLocation != end( m_containerMap ) ) { location = packedLocation->second; }
auto linkID = m_hltANNSvc->value( PackedObjectLocations, location );
buffer.save<int32_t>( linkID->second );
auto iloc = map.find( location );
if ( iloc == map.end() ) {
error() << "could not locate referred location " << location << " originally: " << linkMgr->link( id )->path()
<< " in table with key " << m_encodingKey.value() << endmsg;
throw GaudiException( "unknown location", __PRETTY_FUNCTION__, StatusCode::FAILURE );
}
buffer.save<int32_t>( iloc->second );
}
// Reserve bytes for the size of the object
......@@ -280,7 +291,7 @@ namespace DataPacking::Buffer {
private:
DataObjectReadHandle<RELATION> m_rels{this, "InputName", "/Event/PackedRelations"};
ServiceHandle<IANNSvc> m_hltANNSvc{this, "ANNSvc", "HltANNSvc", "Service to retrieve PackdObjectLocations"};
ServiceHandle<IIndexedANNSvc> m_annsvc{this, "ANNSvc", "HltANNSvc", "Service to retrieve PackdObjectLocations"};
/// Related Info Packer
const LHCb::RelatedInfoRelationsPacker m_rInfoPacker{this};
......@@ -288,10 +299,15 @@ namespace DataPacking::Buffer {
// Should add some packing checks, this is here only to make configuration happy for now
Gaudi::Property<bool> m_enableCheck{this, "EnableCheck", false};
// Mapping of reconstruction object to their location after packing
Gaudi::Property<std::map<std::string, std::string>> m_containerMap{this, "ContainerMap"};
Gaudi::Property<unsigned int> m_encodingKey{this, "EncodingKey", 0u};
mutable Gaudi::Accumulators::StatCounter<> m_nbPackedData{this, "# PackedData"};
mutable Gaudi::Accumulators::BinomialCounter<> m_nbInputFound{this, "input found"};
mutable Gaudi::Accumulators::StatCounter<> m_nbPackedData{this, "# PackedData"};
mutable Gaudi::Accumulators::MsgCounter<MSG::WARNING> m_zero_key{
this,
"Encoding key is zero -- this implies explicit configuration of decoding will be required.. make sure you know "
"how to do this, and will have the required configuration setup",
10};
};
} // namespace DataPacking::Buffer
......@@ -10,7 +10,7 @@
\*****************************************************************************/
#pragma once
#include "Kernel/IANNSvc.h"
#include "Kernel/IIndexedANNSvc.h"
#include "LHCbAlgs/Consumer.h"
#include "Event/MCParticle.h"
......@@ -18,6 +18,9 @@
#include "Event/ProtoParticle.h"
#include "Event/RecVertex.h"
#include "Event/PackedPartToRelatedInfoRelation.h"
#include "Event/RelatedInfoMap.h"
#include "Event/PackedRelations.h"
#include "Relations/Relation1D.h"
#include "Relations/RelationWeighted1D.h"
......@@ -25,6 +28,7 @@
#include "Event/PackedDataBuffer.h"
#include "Event/StandardPacker.h"
#include "RawbankV2Compatibility.h"
#include "RegistryWrapper.h"
/**
......@@ -43,24 +47,50 @@
namespace DataPacking::Buffer {
template <typename RELATION, typename PRELATION, typename FROM, typename TO>
class Rel2Unpack : public LHCb::Algorithm::Consumer<void( const LHCb::Hlt::PackedData::PackedDataInBuffer& )> {
// Relation types
using Part2IntRelations = LHCb::Relation1D<LHCb::Particle, int>;
using Part2InfoRelations = LHCb::Relation1D<LHCb::Particle, LHCb::RelatedInfoMap>;
template <typename RELATION, typename PRELATION, typename FROM, typename TO = void>
class RelUnpack : public LHCb::Algorithm::Consumer<void( const LHCb::Hlt::PackedData::MappedInBuffers& )> {
using Buffer = LHCb::Hlt::PackedData::PackedDataInBuffer;
public:
Rel2Unpack( const std::string& name, ISvcLocator* pSvcLocator )
: LHCb::Algorithm::Consumer<void( const Buffer& )>( name, pSvcLocator,
KeyValue{"InputName", "/Event/PackedRelations"} ) {}
RelUnpack( const std::string& name, ISvcLocator* pSvcLocator )
: LHCb::Algorithm::Consumer<void( const LHCb::Hlt::PackedData::MappedInBuffers& )>(
name, pSvcLocator, KeyValue{"InputName", "/Event/DAQ/MappedDstData"} ) {}
void operator()( const Buffer& buffer ) const override {
void operator()( const LHCb::Hlt::PackedData::MappedInBuffers& buffers ) const override {
auto* rels = m_rels.put( std::make_unique<RELATION>() );
if ( buffers.empty() ) {
++m_no_buffers;
return;
}
if ( !buffer.buffer().size() ) return;
const Gaudi::StringKey PackedObjectLocations{"PackedObjectLocations"};
Buffer readBuffer;
readBuffer.init( buffer.buffer(), false );
const auto& s2i = m_annsvc->s2i( buffers.key(), PackedObjectLocations );
auto j = buffers.bankVersion() < 3
? std::find_if( s2i.begin(), s2i.end(),
v2_compatibility::match_first_with_missing_p_after_slash( m_rels.fullKey().key() ) )
: s2i.find( m_rels.fullKey().key() );
if ( j == s2i.end() ) {
// FIXME: requires ref update ++m_unknown;
return;
}
auto* rels = m_rels.put( std::make_unique<RELATION>() );
const auto* buffer = buffers.find( j->second );
if ( !buffer ) {
// FIXME: requires ref update ++m_absent;
return;
}
if ( buffer->buffer().size() == 0 ) return;
Buffer readBuffer{*buffer}; // TODO: avoid copying just because 'pos' gets updated -- allow a 'shallow' emphemeral
// version which allows reading
// Sadly the pack structure expects an object with a valid RegEntry. To be improved
auto prels = RegistryWrapper<PRELATION>( m_rels.fullKey().key() + "_Packed" );
......@@ -90,13 +120,12 @@ namespace DataPacking::Buffer {
<< endmsg;
}
}
static const Gaudi::StringKey PackedObjectLocations{"PackedObjectLocations"};
assert( buffers.key() == readBuffer.key() );
const auto& map = m_annsvc->i2s( readBuffer.key(), PackedObjectLocations );
for ( auto id : header.linkLocationIDs ) {
auto location = m_hltANNSvc->value( PackedObjectLocations, id );
if ( location ) {
prels->linkMgr()->addLink( location.value().first, nullptr );
auto location = map.find( id );
if ( location != end( map ) ) {
prels->linkMgr()->addLink( location->second, nullptr );
} else {
m_missingLinks++;
}
......@@ -104,14 +133,19 @@ namespace DataPacking::Buffer {
if ( prels->data().size() ) {
rels->setVersion( prels->version() );
unpack( *rels, *prels );
if constexpr ( std::is_void<TO>::value )
unpack1d( *rels, *prels );
else
unpack2d( *rels, *prels );
}
// Count packed output
m_unpackedData += rels->relations().size();
}
void unpack( RELATION& rels, const PRELATION& prels ) const {
// Relations between two containers
void unpack2d( RELATION& rels, const PRELATION& prels ) const {
for ( const auto& prel : prels.data() ) {
for ( int kk = prel.start; prel.end > kk; ++kk ) {
......@@ -143,15 +177,14 @@ namespace DataPacking::Buffer {
continue;
}
typename RELATION::From f = from->object( srcKey );
typename RELATION::To t = to->object( destKey );
auto f = from->object( srcKey );
auto t = to->object( destKey );
StatusCode sc;
// only weighted relations have weights
if constexpr ( std::is_same_v<PRELATION, LHCb::PackedWeightedRelations> ) {
typename RELATION::Weight wgt = prels.weights()[kk];
sc = rels.relate( f, t, wgt );
sc = rels.relate( f, t, prels.weights()[kk] );
} else {
sc = rels.relate( f, t );
}
......@@ -176,13 +209,95 @@ namespace DataPacking::Buffer {
}
}
// relation between particles and ints
void unpack1d( Part2IntRelations& rels, const LHCb::PackedRelations& prels ) const {
for ( const auto& prel : prels.data() ) {
for ( int kk = prel.start; prel.end > kk; ++kk ) {
int srcLink( 0 );
int srcKey( 0 );
int destLink( 0 );
int destKey( 0 );
StandardPacker::indexAndKey64( prels.sources()[kk], srcLink, srcKey );
StandardPacker::indexAndKey64( prels.dests()[kk], destLink, destKey );
DataObject* fp = nullptr;
this->evtSvc()->retrieveObject( prels.linkMgr()->link( srcLink )->path(), fp ).ignore();
auto* from = static_cast<FROM*>( fp );
if ( !from ) {
this->warning() << "Source location " << prels.linkMgr()->link( srcLink )->path()
<< " not persisted, skip the relation." << endmsg;
continue;
}
auto f = from->object( srcKey );
auto t = (int)prels.dests()[kk];
StatusCode sc = rels.relate( f, t );
if ( !sc )
this->warning() << "Something went wrong with relation unpacking "
<< "sourceKey " << srcKey << " sourceLink " << srcLink << "destKey " << destKey
<< " destLink " << destLink << endmsg;
}
}
}
// Relation between particles and related info
void unpack1d( Part2InfoRelations& rels, const LHCb::PackedRelatedInfoRelations& prels ) const {
for ( const auto& prel : prels.containers() ) {
for ( unsigned int kk = prel.first; prel.last > kk; ++kk ) {
const auto& rel = prels.relations()[kk];
int srcLink( 0 );
int srcKey( 0 );
StandardPacker::indexAndKey64( rel.reference, srcLink, srcKey );
DataObject* fp = nullptr;
this->evtSvc()->retrieveObject( prels.linkMgr()->link( srcLink )->path(), fp ).ignore();
auto* from = static_cast<FROM*>( fp );
if ( !from ) {
this->warning() << "Source location " << prels.linkMgr()->link( srcLink )->path()
<< " not persisted, skip the relation." << endmsg;
continue;
}
auto f = from->object( srcKey );
LHCb::RelatedInfoMap t;
t.reserve( rel.last - rel.first );
for ( const auto& jj : Packer::subrange( prels.info(), rel.first, rel.last ) ) { t.insert( jj ); }
StatusCode sc = rels.relate( f, t );
if ( !sc )
this->warning() << "Something went wrong with relation unpacking "
<< "sourceKey " << srcKey << " sourceLink " << srcLink << endmsg;
}
}
}
private:
ServiceHandle<IANNSvc> m_hltANNSvc{this, "ANNSvc", "HltANNSvc", "Service to retrieve DecReport IDs"};
ServiceHandle<IIndexedANNSvc> m_annsvc{this, "ANNSvc", "HltANNSvc", "Service to retrieve DecReport IDs"};
DataObjectWriteHandle<RELATION> m_rels{this, "OutputName", ""};
mutable Gaudi::Accumulators::StatCounter<> m_unpackedData{this, "# PackedData"};
mutable Gaudi::Accumulators::MsgCounter<MSG::ERROR> m_unknown{
this, "Requested output is not known for the given input stream - not creating output"};
mutable Gaudi::Accumulators::MsgCounter<MSG::WARNING> m_absent{
this, "Requested output is known, but not present in packed data - creating empty container"};
mutable Gaudi::Accumulators::MsgCounter<MSG::ERROR> m_no_buffers{
this, "Input has empty map of buffers -- can not do anything at this point. Verify HltPackedBufferDecoder "
"configuration"};
mutable Gaudi::Accumulators::StatCounter<> m_unpackedData{this, "# PackedData"};
mutable Gaudi::Accumulators::StatCounter<> m_missingLinks{this, "# Missing Link Locations"};
};
......
......@@ -11,8 +11,9 @@
#pragma once
#include "Event/PackedDataBuffer.h"
#include "Kernel/IANNSvc.h"
#include "Kernel/IIndexedANNSvc.h"
#include "LHCbAlgs/Consumer.h"
#include "RawbankV2Compatibility.h"
#include "RegistryWrapper.h"
/**
......@@ -30,24 +31,36 @@
**/
namespace DataPacking::Buffer {
namespace {
static const Gaudi::StringKey PackedObjectLocations{"PackedObjectLocations"};
}
template <class PACKER>
class Unpack final : public LHCb::Algorithm::Consumer<void()> {
class Unpack final : public LHCb::Algorithm::Consumer<void( LHCb::Hlt::PackedData::MappedInBuffers const& )> {
using Buffer = LHCb::Hlt::PackedData::PackedDataInBuffer;
public:
using Consumer::Consumer;
Unpack( const std::string& name, ISvcLocator* pSvcLocator )
: Consumer{name, pSvcLocator, {"InputName", "/Event/DAQ/MappedDstData"}} {}
StatusCode initialize() override;
void operator()() const override;
void operator()( LHCb::Hlt::PackedData::MappedInBuffers const& ) const override;
private:
ServiceHandle<IANNSvc> m_hltANNSvc{this, "ANNSvc", "HltANNSvc", "Service to retrieve DecReport IDs"};
DataObjectReadHandle<Buffer> m_buffer{this, "InputName", PACKER::packedLocation()};
ServiceHandle<IIndexedANNSvc> m_hltANNSvc{this, "ANNSvc", "HltANNSvc", "Service to resolve location IDs"};
DataObjectWriteHandle<typename PACKER::DataVector> m_data{this, "OutputName", PACKER::unpackedLocation()};
mutable Gaudi::Accumulators::MsgCounter<MSG::ERROR> m_inconsistentSize{this,
"Size read does not match size expected"};
mutable Gaudi::Accumulators::MsgCounter<MSG::ERROR> m_unknown{
this, "Requested output is not known for the given input stream - not creating output"};
mutable Gaudi::Accumulators::MsgCounter<MSG::WARNING> m_absent{
this, "Requested output is known, but not present in packed data - creating empty container"};
mutable Gaudi::Accumulators::MsgCounter<MSG::ERROR> m_no_buffers{
this, "Input has empty map of buffers -- can not do anything at this point. Verify HltPackedBufferDecoder "
"configuration"};
mutable Gaudi::Accumulators::StatCounter<> m_unpackedData{this, "# PackedData"};
mutable Gaudi::Accumulators::StatCounter<> m_missingLinks{this, "# Missing Link Locations"};
};
......@@ -56,25 +69,48 @@ namespace DataPacking::Buffer {
StatusCode Unpack<PACKER>::initialize() {
return Consumer::initialize().andThen( [&] {
if ( this->msgLevel( MSG::DEBUG ) )
this->debug() << "Input " << m_buffer.fullKey() << " Output " << m_data.fullKey() << "" << endmsg;
this->debug() << "Input " << inputLocation() << " Output " << m_data.fullKey() << "" << endmsg;
} );
}
template <class PACKER>
void Unpack<PACKER>::operator()() const {
void Unpack<PACKER>::operator()( LHCb::Hlt::PackedData::MappedInBuffers const& buffers ) const {
if ( buffers.empty() ) {
++m_no_buffers;
return;
}
const auto& s2i = m_hltANNSvc->s2i( buffers.key(), PackedObjectLocations );
auto j = buffers.bankVersion() < 3
? std::find_if( s2i.begin(), s2i.end(),
v2_compatibility::match_first_with_missing_p_after_slash( m_data.fullKey().key() ) )
: s2i.find( m_data.fullKey().key() );
if ( j == s2i.end() ) {
// FIXME: requires update of refs... ++m_unknown;
return;
}
// unfortunately, the output data must be on the TES prior to filling it with data
// as it needs a valid `registry entry` (which it obtains when it is added to the TES)
// for the unpacking to work...
auto* data = m_data.put( std::make_unique<typename PACKER::DataVector>() );
const auto* buffer = buffers.find( j->second );
if ( !buffer ) {
// FIXME: requires update of refs... ++m_absent;
return;
}
if ( buffer->buffer().size() == 0 ) return;
const PACKER packer( this );
const auto buffer = m_buffer.get();
if ( !buffer or !buffer->buffer().size() ) return;
// Sadly the pack structure expects data with valid Registry. To be improved
auto pdata = RegistryWrapper<typename PACKER::PackedDataVector>( m_data.fullKey().key() + "_Packed" );
LHCb::Hlt::PackedData::ObjectHeader header;
Buffer readBuffer;
readBuffer.init( buffer->buffer(), false );
Buffer readBuffer{*buffer}; // TODO: allow for emphemeral 'view' for reading without copying just to update 'pos'
while ( !readBuffer.eof() ) {
readBuffer.load( header.classID );
......@@ -85,6 +121,7 @@ namespace DataPacking::Buffer {
auto nBytesRead = readBuffer.load( *pdata );
if ( nBytesRead != header.storedSize ) {
++m_inconsistentSize;
this->fatal() << "Loading of object (CLID=" << header.classID << " locationID=" << header.locationID << ") "
<< " consumed " << nBytesRead << " bytes, "
<< " but " << header.storedSize << " were stored!" << endmsg;
......@@ -97,12 +134,12 @@ namespace DataPacking::Buffer {
}
}
static const Gaudi::StringKey PackedObjectLocations{"PackedObjectLocations"};
assert( buffers.key() == readBuffer.key() );
const auto& map = m_hltANNSvc->i2s( readBuffer.key(), PackedObjectLocations );
for ( auto id : header.linkLocationIDs ) {
auto location = m_hltANNSvc->value( PackedObjectLocations, id );
if ( location ) {
pdata->linkMgr()->addLink( location.value().first, nullptr );
auto location = map.find( id );
if ( location != end( map ) ) {
pdata->linkMgr()->addLink( location->second, nullptr );
} else {
m_missingLinks++;
}
......@@ -112,12 +149,13 @@ namespace DataPacking::Buffer {
data->setVersion( pdata->version() );
packer.unpack( *pdata, *data );
if ( this->msgLevel( MSG::DEBUG ) ) {
this->debug() << "Created " << data->size() << " data objects from " << m_buffer << endmsg;
this->debug() << "data type " << System::typeinfoName( typeid( data ) ) << endmsg;
this->debug() << "packed data type " << System::typeinfoName( typeid( pdata ) ) << endmsg;
this->debug() << " Packed Data Version = " << (unsigned int)pdata->version() << endmsg;
this->debug() << " Packed Packing Version = " << (unsigned int)pdata->packingVersion() << endmsg;
this->debug() << " Unpacked Data Version = " << (unsigned int)data->version() << endmsg;
this->debug() << "Created " << data->size() << " data objects for " << m_data.fullKey() << " from "
<< inputLocation() << endmsg;
this->debug() << " Output data type " << System::typeinfoName( typeid( *data ) ) << endmsg;
this->debug() << " Packed data type " << System::typeinfoName( typeid( *pdata ) ) << endmsg;
this->debug() << " Packed data Version = " << (unsigned int)pdata->version() << endmsg;
this->debug() << " Packed packing version = " << (unsigned int)pdata->packingVersion() << endmsg;
this->debug() << " Output data Version = " << (unsigned int)data->version() << endmsg;
}
}
......
......@@ -22,7 +22,8 @@
#include "GaudiKernel/DataObjectHandle.h"
#include "Event/PackedDataBuffer.h"
#include "Kernel/IANNSvc.h"
#include "Kernel/IIndexedANNSvc.h"
#include "RawbankV2Compatibility.h"
#include "RegistryWrapper.h"
namespace DataPacking::Buffer {
......@@ -32,43 +33,64 @@ namespace DataPacking::Buffer {
*
* Note that the inheritance from Consumer is misleading. The algorithm is
* writing to TES, just via a Handle so that it can do it at the begining of
* the operator(), as cross pointers are used in the TES and requires this.
* the operator(), as rebuilding pointers from persistency requires objects
* to be in the TES _prior_ to adding their content
*/
static const Gaudi::StringKey PackedObjectLocations{"PackedObjectLocations"};
struct ProtoParticleUnpacker
: LHCb::Algorithm::Consumer<void( DetectorElement const& ),
: LHCb::Algorithm::Consumer<void( LHCb::Hlt::PackedData::MappedInBuffers const&, DetectorElement const& ),
LHCb::DetDesc::usesBaseAndConditions<FixTESPath<Gaudi::Algorithm>, DetectorElement>> {
ProtoParticleUnpacker( const std::string& name, ISvcLocator* pSvcLocator )
: Consumer{name, pSvcLocator, {KeyValue{"StandardGeometryTop", LHCb::standard_geometry_top}}} {}
void operator()( DetectorElement const& ) const override;
: Consumer{name,
pSvcLocator,
{{"InputName", LHCb::PackedProtoParticleLocation::Charged},
{"StandardGeometryTop", LHCb::standard_geometry_top}}} {}
ServiceHandle<IANNSvc> m_hltANNSvc{this, "ANNSvc", "HltANNSvc", "Service to retrieve DecReport IDs"};
void operator()( LHCb::Hlt::PackedData::MappedInBuffers const&, DetectorElement const& ) const override;
DataObjectWriteHandle<LHCb::ProtoParticles> m_protos{this, "OutputName", LHCb::ProtoParticleLocation::Charged};
DataObjectReadHandle<LHCb::Hlt::PackedData::PackedDataInBuffer> m_buffer{
this, "InputName", LHCb::PackedProtoParticleLocation::Charged};
ServiceHandle<IIndexedANNSvc> m_annsvc{this, "ANNSvc", "HltANNSvc", "Service to retrieve DecReport IDs"};
ToolHandleArray<LHCb::Rec::Interfaces::IProtoParticles> m_addInfo{this, "AddInfo", {}};
};
void ProtoParticleUnpacker::operator()( DetectorElement const& lhcb ) const {
void ProtoParticleUnpacker::operator()( LHCb::Hlt::PackedData::MappedInBuffers const& buffers,
DetectorElement const& lhcb ) const {
if ( buffers.empty() ) {
error() << "empty buffer is an error" << endmsg;
// ++m_something;
return;
}
auto* data = m_protos.put( std::make_unique<LHCb::ProtoParticles>() );
const auto& s2i = m_annsvc->s2i( buffers.key(), PackedObjectLocations );
auto const buffer = m_buffer.get();
if ( !buffer or !buffer->buffer().size() ) return;
auto j = buffers.bankVersion() < 3
? std::find_if( s2i.begin(), s2i.end(),
v2_compatibility::match_first_with_missing_p_after_slash( m_protos.fullKey().key() ) )
: s2i.find( m_protos.fullKey().key() );
if ( j == s2i.end() ) {
throw GaudiException{"Could not find entry for requested output " + m_protos.fullKey().key(), __PRETTY_FUNCTION__,
StatusCode::FAILURE};
}
auto* data = m_protos.put( std::make_unique<LHCb::ProtoParticles>() );
const auto* buffer = buffers.find( j->second );
if ( !buffer || !buffer->buffer().size() ) return;
// Sadly the pack structure expects an object with a valid RegEntry. To be improved
auto pdata = RegistryWrapper<LHCb::PackedProtoParticles>( m_protos.fullKey().key() + "_Packed" );
if ( msgLevel( MSG::DEBUG ) )
this->debug() << "Unpacking ProtoParticles from " << m_buffer << " to " << m_protos << endmsg;
this->debug() << "Unpacking ProtoParticles from " << inputLocation() << " to " << m_protos << endmsg;
std::vector<int32_t> linkLocationIDs;
LHCb::Hlt::PackedData::PackedDataInBuffer readBuffer = *buffer;
LHCb::Hlt::PackedData::PackedDataInBuffer readBuffer{*buffer};
const LHCb::ProtoParticlePacker packer( this );
// Do the actual loading of the objects
......@@ -91,10 +113,13 @@ namespace DataPacking::Buffer {
<< "and " << header.linkLocationIDs.size() << " links were stored!" << endmsg;
}
assert( readBuffer.key() == buffers.key() );
const auto& map = m_annsvc->i2s( readBuffer.key(), PackedObjectLocations );
for ( auto id : header.linkLocationIDs ) {
auto location = m_hltANNSvc->value( PackedObjectLocations, id );
if ( location ) {
pdata->linkMgr()->addLink( location.value().first, nullptr );
auto location = map.find( id );
if ( location != end( map ) ) {
pdata->linkMgr()->addLink( location->second, nullptr );
} else {
this->warning() << " could not find the location for link id " << id << endmsg;
}
......
/*****************************************************************************\
* (c) Copyright 2000-2018 CERN for the benefit of the LHCb Collaboration *
* (c) Copyright 2022 CERN for the benefit of the LHCb Collaboration *
* *
* This software is distributed under the terms of the GNU General Public *
* Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". *
......@@ -8,38 +8,21 @@
* granted to it by virtue of its status as an Intergovernmental Organization *
* or submit itself to any jurisdiction. *
\*****************************************************************************/
#include "ANNSvc.h"
#include <functional>
#include <string>
#pragma once
#include <algorithm>
#include <iterator>
#include <string_view>
class HltANNSvc : public ANNSvc {
public:
HltANNSvc( const std::string& name, ISvcLocator* svcLocator )
: ANNSvc( name, svcLocator,
{{"Hlt1SelectionID"},
{"Hlt2SelectionID"},
{"SpruceSelectionID"},
{"InfoID"},
{"RoutingBits"},
{"RelInfoLocations"},
{"PackedObjectLocations"},
{"PackedObjectTypes"}} ) {}
using ANNSvc::handleUndefined;
std::optional<minor_value_type> handleUndefined( const major_key_type& major,
const std::string& minor ) const override;
private:
Gaudi::Property<bool> m_allowUndefined{this, "allowUndefined", true,
"do we allow undefined, on-demand generated, key/value pairs?"};
};
DECLARE_COMPONENT( HltANNSvc )
std::optional<IANNSvc::minor_value_type> HltANNSvc::handleUndefined( const major_key_type& major,
const std::string& minor ) const {
if ( !m_allowUndefined ) return {};
static const std::hash<std::string> hasher;
debug() << "handleUndefined called for " << major << " : " << minor << " --> " << hasher( minor ) << endmsg;
return minor_value_type( minor, hasher( minor ) );
}
namespace DataPacking::Buffer::v2_compatibility {
inline auto match_first_with_missing_p_after_slash( std::string_view k ) {
return [=]( const auto& i ) {
// for 'v2' banks, the id stored is the '/p' version of the name
// but here we want the actual (target) name...
if ( i.first.size() != k.size() + 1 ) return false;
auto [m, n] = std::mismatch( k.begin(), k.end(), i.first.begin() );
if ( n == i.first.end() || *n != 'p' ) return false;
if ( n == i.first.begin() || *std::prev( n ) != '/' ) return false;
return std::mismatch( m, k.end(), std::next( n ) ).first == k.end();
};
}
} // namespace DataPacking::Buffer::v2_compatibility
......@@ -9,8 +9,7 @@
* or submit itself to any jurisdiction. *
\*****************************************************************************/
#include "Buffer1RelationUnpackerBaseAlg.h"
#include "Buffer2RelationUnpackerBaseAlg.h"
#include "BufferRelationUnpackerBaseAlg.h"
#include "BufferUnpackerBaseAlg.h"
#include "UnpackerBaseAlg.h"
......@@ -89,20 +88,20 @@ DECLARE_COMPONENT_WITH_ID( DataPacking::Buffer::Unpack<LHCb::RecSummaryPacker>,
// ProtoParticleUnpacker is not templated, because it needs an additional function.
// Relation packers
using P2VRELATION = DataPacking::Buffer::Rel2Unpack<LHCb::Relation1D<LHCb::Particle, LHCb::VertexBase>,
LHCb::PackedRelations, LHCb::Particles, LHCb::RecVertices>;
using P2MCPRELATION = DataPacking::Buffer::Rel2Unpack<LHCb::Relation1D<LHCb::Particle, LHCb::MCParticle>,
LHCb::PackedRelations, LHCb::Particles, LHCb::MCParticles>;
using P2VRELATION = DataPacking::Buffer::RelUnpack<LHCb::Relation1D<LHCb::Particle, LHCb::VertexBase>,
LHCb::PackedRelations, LHCb::Particles, LHCb::RecVertices>;
using P2MCPRELATION = DataPacking::Buffer::RelUnpack<LHCb::Relation1D<LHCb::Particle, LHCb::MCParticle>,
LHCb::PackedRelations, LHCb::Particles, LHCb::MCParticles>;
using PP2MCPRELATION =
DataPacking::Buffer::Rel2Unpack<LHCb::RelationWeighted1D<LHCb::ProtoParticle, LHCb::MCParticle, double>,
LHCb::PackedWeightedRelations, LHCb::ProtoParticles, LHCb::MCParticles>;
DataPacking::Buffer::RelUnpack<LHCb::RelationWeighted1D<LHCb::ProtoParticle, LHCb::MCParticle, double>,
LHCb::PackedWeightedRelations, LHCb::ProtoParticles, LHCb::MCParticles>;
using P2IntRELATION =
DataPacking::Buffer::Rel1Unpack<LHCb::Relation1D<LHCb::Particle, int>, LHCb::PackedRelations, LHCb::Particles>;
DataPacking::Buffer::RelUnpack<LHCb::Relation1D<LHCb::Particle, int>, LHCb::PackedRelations, LHCb::Particles>;
using P2InfoRELATION = DataPacking::Buffer::Rel1Unpack<LHCb::Relation1D<LHCb::Particle, LHCb::RelatedInfoMap>,
LHCb::PackedRelatedInfoRelations, LHCb::Particles>;
using P2InfoRELATION = DataPacking::Buffer::RelUnpack<LHCb::Relation1D<LHCb::Particle, LHCb::RelatedInfoMap>,
LHCb::PackedRelatedInfoRelations, LHCb::Particles>;
DECLARE_COMPONENT_WITH_ID( P2VRELATION, "P2VRelationUnpacker" )
DECLARE_COMPONENT_WITH_ID( P2MCPRELATION, "P2MCPRelationUnpacker" )
......
......@@ -35,10 +35,11 @@ namespace LHCb::Hlt::PackedData {
return output_size > 0;
}
bool ByteBuffer::init( const buffer_type& data, bool compressed ) {
bool ByteBuffer::init( buffer_view data, bool compressed ) {
m_pos = 0;
if ( !compressed ) {
m_buffer = data;
assert( m_buffer.data() != data.data() );
m_buffer.assign( data.begin(), data.end() );
return true;
}
if ( data.size() < 9 ) {
......
......@@ -48,6 +48,7 @@ class Options(BaseModel):
input_files: list[str] = []
input_type: FileFormats = FileFormats.NONE
input_raw_format: float = 0.5
input_manifest_file: Optional[str] = None
xml_file_catalog: Optional[str] = None
evt_max: int = -1
first_evt: int = 0
......@@ -56,6 +57,9 @@ class Options(BaseModel):
"""Output"""
output_file: Optional[str] = None
output_type: FileFormats = FileFormats.ROOT
output_manifest_file: Optional[str] = None
append_decoding_keys_to_output_manifest: bool = True
write_decoding_keys_to_git: bool = True
compression: Optional[constr(regex=r'^(ZLIB|LZMA|LZ4|ZSTD):\d+$')] = None
histo_file: Optional[str] = None
ntuple_file: Optional[str] = None
......
......@@ -12,6 +12,7 @@
import os
from PyConf import configurable
from PyConf.Algorithms import RecVertexPacker, RecVertexUnpacker
from PyConf.Algorithms import VertexPacker, VertexUnpacker
from PyConf.Algorithms import RichPIDPacker, RichPIDUnpacker
......@@ -35,7 +36,7 @@ from PyConf.Algorithms import PP2MCPRelationPacker, PP2MCPRelationUnpacker
from PyConf.application import make_data_with_FetchDataFromFile
from PyConf.components import get_output, force_location
from PyConf.dataflow import dataflow_config
from PyConf.location_prefix import prefix, packed_prefix, unpacked_prefix
from PyConf.location_prefix import prefix, packed_prefix
import logging
#from Gaudi.Configuration import DEBUG as WARNING
......@@ -210,6 +211,36 @@ standardUnpacked = {
standardPacked = {}
_default_persisted_locations = {
'NeutralProtos': '/Event/Rec/ProtoP/Neutrals',
'ChargedProtos': '/Event/Rec/ProtoP/Charged',
'Tracks': '/Event/Rec/Track/Best',
'PVs': '/Event/Rec/Vertex/Primary',
'MuonPIDs': '/Event/Rec/Muon/MuonPID',
'MuonPIDTracks': '/Event/Rec/Muon/MuonTracks',
'RichPIDs': '/Event/Rec/Rich/PIDs',
'CaloElectrons': '/Event/Rec/Calo/Electrons',
'CaloPhotons': '/Event/Rec/Calo/Photons',
'CaloMergedPi0s': '/Event/Rec/Calo/MergedPi0s',
'CaloSplitPhotons': '/Event/Rec/Calo/SplitPhotons',
'RecSummary': '/Event/Rec/Summary',
}
@configurable
def default_persisted_locations(locations=_default_persisted_locations,
prefix=None):
if not prefix: return locations
return {
k: v.replace('/Event', '/Event/{}'.format(prefix))
for k, v in locations.items()
}
@configurable
def persisted_location(k, force=True, locations=default_persisted_locations):
return force_location(locations()[k]) if force else None
#for unpackers: input with p (ie, pRec), output without(Rec). For packers, it's other way around.
class PersistRecoPacking(object):
......@@ -219,14 +250,13 @@ class PersistRecoPacking(object):
"""
def __init__(self,
annsvc="HltANNSvc",
stream="/Event/HLT2",
reco_stream="",
data_type="Upgrade",
mc={},
descriptors=standardDescriptors,
packed=standardPacked,
unpacked=standardUnpacked):
packed=None,
unpacked=None):
"""Collection of packed object descriptors.
Args:
......@@ -236,23 +266,32 @@ class PersistRecoPacking(object):
unpacked: Dict of the form {descriptor_name: unpacked_location list}.
prefix: TES location stream in the form /Event/Spruce
recoprefix: TES location stream in the form /Event/HLT2 for reconstruction objects
annsvc: Name of the ANNService
mc: in case of simulation, used in unpacking relations to MC particles
"""
self._descriptors = descriptors[data_type]
# if we are only unpacking/packing standard reco locations
# attach the stream to the locations
if packed == standardPacked:
if unpacked is None:
unpacked = standardUnpacked
for k, vals in unpacked.items():
if reco_stream not in stream:
vals = [prefix(l, reco_stream) for l in vals]
packed[k] = [packed_prefix(l, stream) for l in vals]
if unpacked == standardUnpacked:
unpacked[k] = [
prefix(
l.removeprefix(stream + '/'),
stream + '/' + reco_stream) for l in vals
]
else:
unpacked[k] = [
prefix(l.removeprefix(stream + '/'), stream)
for l in vals
]
if packed is None:
packed = standardPacked
for k, vals in unpacked.items():
if reco_stream not in stream:
vals = [prefix(l, reco_stream) for l in vals]
unpacked[k] = [prefix(l, stream) for l in vals]
packed[k] = [packed_prefix(l, stream) for l in vals]
# This is to avoid complaints about missing keys
for name, d in self._descriptors.items():
......@@ -265,7 +304,6 @@ class PersistRecoPacking(object):
self.unpacked = unpacked
self.prefix = stream
self.recoprefix = reco_stream
self.annsvc = annsvc
self.mc = mc
def printPacking(self, log):
......@@ -274,7 +312,6 @@ class PersistRecoPacking(object):
log.debug("Unpacked locations == ", self.unpacked)
log.debug("Stream == ", self.prefix)
log.debug("Reco stream == ", self.recoprefix)
log.debug("ANN service name ==", self.annsvc)
log.debug("Is simulation ==", self.mc)
def packedLocations(self):
......@@ -299,7 +336,7 @@ class PersistRecoPacking(object):
"""Return the dict with object type => unpacked object locations."""
return self.unpacked
def unpackers_by_key(self, configurables=False, output_level=WARNING):
def unpackers_by_key(self, output_level=WARNING):
"""Return the dictionary of unpacking algorithms."""
algs = {}
......@@ -308,30 +345,27 @@ class PersistRecoPacking(object):
algs[name] = []
for name, d in self._descriptors.items():
zipped = zip(self.unpacked[name], self.packed[name])
for loc, ploc in zipped:
for loc, inputName in zip(self.unpacked[name], self.packed[name]):
#unpack only locations in the requested stream
if (ploc.startswith(self.prefix)):
if loc.startswith(self.prefix):
# craete a list of extra inputs for each type
extra_inputs = []
for dl in d.extra_inputs:
extra_inputs += self.unpacked[dl]
# create a list of extra inputs for each type
extra_inputs = [self.unpacked[dl] for dl in d.extra_inputs]
# Relations are special, deal with others first
#if not name.endswith("Relations"):
alg = d.unpacker(
InputName=force_location(ploc),
OutputLevel=output_level,
ANNSvc=self.annsvc,
InputName=force_location(
getattr(
inputName, 'location', inputName
) # check for either location or fullKey().key()
), # BUG / FIXME: why is force_location needed here??? inputName is a data handle!!!
outputs={'OutputName': force_location(loc)},
ExtraOutputs=[force_location(loc)],
ExtraInputs=[force_location(l) for l in extra_inputs])
algs[name] += [alg]
if configurables:
algs = self.as_configurables(algs)
return algs
def unpackers(self, configurables=False, output_level=WARNING):
......@@ -346,34 +380,24 @@ class PersistRecoPacking(object):
def packers_by_key(self,
configurables=False,
output_level=WARNING,
encoding_key=0,
enable_check=False):
"""Return the dict of packing algorithms for the inputs."""
algs = {}
map = {}
for name in self._descriptors.keys():
algs[name] = []
for name in self._descriptors.keys():
zipped = zip(self.unpacked[name], self.packed[name])
for loc, ploc in zipped:
if self.recoprefix in loc:
if loc != unpacked_prefix(ploc, self.prefix):
map[loc] = unpacked_prefix(ploc, self.prefix)
for name, d in self._descriptors.items():
zipped = zip(self.unpacked[name], self.packed[name])
for loc, ploc in zipped:
for loc, ploc in zip(self.unpacked[name], self.packed[name]):
if (loc.startswith(
self.prefix)): #only location in the requested stream
alg = d.packer(
InputName=force_location(loc),
OutputLevel=output_level,
EnableCheck=enable_check,
InputName=force_location(loc),
outputs={'OutputName': force_location(ploc)},
ExtraOutputs=[force_location(ploc)],
ANNSvc=self.annsvc,
ContainerMap=map)
EncodingKey=encoding_key)
algs[name] += [alg]
if configurables:
algs = self.as_configurables(algs)
......@@ -382,11 +406,14 @@ class PersistRecoPacking(object):
def packers(self,
configurables=False,
output_level=WARNING,
encoding_key=0,
enable_check=False):
"""Return the list of packing algorithms."""
algs = []
for p in self.packers_by_key(
output_level=output_level, enable_check=enable_check).values():
output_level=output_level,
enable_check=enable_check,
encoding_key=encoding_key).values():
algs += p
if configurables:
algs = self.as_configurables(algs)
......@@ -399,16 +426,6 @@ class PersistRecoPacking(object):
m[name] = []
return m
def unpackedToPackedLocationMap(self):
"""Return the dict {unpacked_location: packed_location}.
Packed Location is the location of the packed data objects to test the unpacking
"""
m = {}
for name, d in self._descriptors.items():
for loc in self.unpacked[name]:
m[loc] = packed_prefix(loc, self.prefix)
return m
@staticmethod
def as_configurables(algs):
"""Convert a list of PyConf Algorithm objects to Configurables.
......
......@@ -10,12 +10,12 @@
###############################################################################
"""Helpers for configuring an application to read Moore HLT2 output."""
from __future__ import absolute_import
import os, json, XRootD.client
import functools
import os, json
from PyConf.utils import load_file
from .PersistRecoConf import PersistRecoPacking
from PyConf.location_prefix import unpacked_prefix
from PyConf.object_types import inv_classid_map
from PyConf.application import (
default_raw_event, make_data_with_FetchDataFromFile, ComponentConfig)
......@@ -38,6 +38,121 @@ RawBanks = {
}
# These are the known types that we can persist
def type_map():
type_map = {
"KeyedContainer<LHCb::Particle,Containers::KeyedObjectManager<Containers::hashmap> >":
"Particles",
"KeyedContainer<LHCb::ProtoParticle,Containers::KeyedObjectManager<Containers::hashmap> >":
"ProtoParticles",
"KeyedContainer<LHCb::Event::v1::Track,Containers::KeyedObjectManager<Containers::hashmap> >":
"Tracks",
"KeyedContainer<LHCb::RichPID,Containers::KeyedObjectManager<Containers::hashmap> >":
"RichPIDs",
"KeyedContainer<LHCb::MuonPID,Containers::KeyedObjectManager<Containers::hashmap> >":
"MuonPIDs",
"KeyedContainer<LHCb::RecVertex,Containers::KeyedObjectManager<Containers::hashmap> >":
"PVs",
"KeyedContainer<LHCb::Vertex,Containers::KeyedObjectManager<Containers::hashmap> >":
"Vertices",
#"KeyedContainer<LHCb::TwoProngVertex,Containers::KeyedObjectManager<Containers::hashmap> >":
#"TwoProngVertices",
"KeyedContainer<LHCb::CaloHypo,Containers::KeyedObjectManager<Containers::hashmap> >":
"CaloHypos",
#"KeyedContainer<LHCb::CaloCluster,Containers::KeyedObjectManager<Containers::hashmap> >":
#"CaloClusters",
#"KeyedContainer<LHCb::CaloDigit,Containers::KeyedObjectManager<Containers::hashmap> >":
#"CaloDigits",
#"KeyedContainer<LHCb::CaloAdc,Containers::KeyedObjectManager<Containers::hashmap> >":
#"CaloAdcs",
"KeyedContainer<LHCb::FlavourTag,Containers::KeyedObjectManager<Containers::hashmap> >":
"FlavourTags",
#"KeyedContainer<LHCb::WeightsVector,Containers::KeyedObjectManager<Containers::hashmap> >":
#"WeightsVectors",
"LHCb::Relation1D<LHCb::Particle,LHCb::VertexBase>":
"P2VRelations",
"LHCb::Relation1D<LHCb::Particle,LHCb::MCParticle>":
"P2MCPRelations",
"LHCb::Relation1D<LHCb::Particle,int>":
"P2IntRelations",
"LHCb::Relation1D<LHCb::Particle,LHCb::RelatedInfoMap>":
"P2InfoRelations",
"LHCb::RelationWeighted1D<LHCb::ProtoParticle,LHCb::MCParticle,double>":
"PP2MCPRelations",
"LHCb::RecSummary":
"RecSummary",
}
return type_map
def load_manifest(fname):
# needed for backwards compatibility with old style .tck.json files
__lookup = {
1006:
"LHCb::RecSummary",
1541:
"KeyedContainer<LHCb::CaloCluster,Containers::KeyedObjectManager<Containers::hashmap> >",
1542:
"KeyedContainer<LHCb::CaloDigit,Containers::KeyedObjectManager<Containers::hashmap> >",
1543:
"KeyedContainer<LHCb::CaloAdc,Containers::KeyedObjectManager<Containers::hashmap> >",
1550:
"KeyedContainer<LHCb::Event::v1::Track,Containers::KeyedObjectManager<Containers::hashmap> >",
1551:
"KeyedContainer<LHCb::CaloHypo,Containers::KeyedObjectManager<Containers::hashmap> >",
1552:
"KeyedContainer<LHCb::ProtoParticle,Containers::KeyedObjectManager<Containers::hashmap> >",
1553:
"KeyedContainer<LHCb::RecVertex,Containers::KeyedObjectManager<Containers::hashmap> >",
1554:
"KeyedContainer<LHCb::TwoProngVertex,Containers::KeyedObjectManager<Containers::hashmap> >",
1555:
"KeyedContainer<LHCb::WeightsVector,Containers::KeyedObjectManager<Containers::hashmap> >",
1560:
"LHCb::Relation1D<LHCb::Particle,LHCb::VertexBase>",
1561:
"KeyedContainer<LHCb::RichPID,Containers::KeyedObjectManager<Containers::hashmap> >",
1562:
"LHCb::RelationWeighted1D<LHCb::ProtoParticle,LHCb::MCParticle,double>",
1571:
"KeyedContainer<LHCb::MuonPID,Containers::KeyedObjectManager<Containers::hashmap> >",
1581:
"KeyedContainer<LHCb::Particle,Containers::KeyedObjectManager<Containers::hashmap> >",
1582:
"KeyedContainer<LHCb::Vertex,Containers::KeyedObjectManager<Containers::hashmap> >",
1583:
"KeyedContainer<LHCb::FlavourTag,Containers::KeyedObjectManager<Containers::hashmap> >",
1584:
"LHCb::Relation1D<LHCb::Particle,LHCb::RelatedInfoMap>",
1591:
"LHCb::Relation1D<LHCb::Particle,int>",
1660:
"LHCb::Relation1D<LHCb::Particle,LHCb::MCParticle>",
}
d = json.loads(load_file(fname))
# check for old-style .tck.json file (note: this actually has _nothing_ to do with the ANNSvc...)
ann = d.get("HltANNSvc/HltANNSvc", None)
if ann is not None:
# this is an old style .tck.json file -- adapt it on the fly
fix_prefix = lambda x, prefix: prefix + x.removeprefix(prefix + 'p') if x.startswith(prefix + 'p') else x
manifest = {
'PackedLocations':
[(functools.reduce(fix_prefix, ('/Event/Spruce/', '/Event/HLT2/'),
loc), __lookup[t % 10000])
for loc, t in ann['PackedObjectTypes'].items()]
}
else:
manifest = d
return manifest
def mc_unpackers(process='Hlt2',
filtered_mc=True,
configurables=True,
......@@ -117,12 +232,12 @@ def unpack_rawevent(bank_types=["DstData", "HltDecReports"],
if process == "Spruce" or process == "Turbo":
bank_location = make_data_with_FetchDataFromFile(stream)
else:
# FIXME: this assumes all `bank_types` co-exist in the same raw-event as `DstData` - which may, or may not, be true...
bank_location = default_raw_event(["DstData"],
raw_event_format=raw_event_format)
for rb in bank_types:
if rb not in RawBanks.keys():
RawBanks[rb] = "/Event/DAQ/RawBanks/" + rb
for rb in set(bank_types).difference(RawBanks.keys()):
RawBanks[rb] = "/Event/DAQ/RawBanks/" + rb
if configurables:
from Configurables import LHCb__UnpackRawEvent
......@@ -146,7 +261,8 @@ def unpack_rawevent(bank_types=["DstData", "HltDecReports"],
def unpackers(locations,
ann,
cfg,
mappedBuffers,
process='Hlt2',
data_type='Upgrade',
configurables=True,
......@@ -160,7 +276,7 @@ def unpackers(locations,
process (str): 'Turbo' or 'Spruce' or 'Hlt2'.
data_type (str): The data type to configure PersistRecoPacking (only option is Upgrade atm)
mc: MCParticle and MCVertices locations
ann: ANNSvc component, needed for finding linker locations
cfg: configuration needed for finding linker locations
locations: list of packed object locations to unpack
configurables (bool): set to False to use PyConf Algorithm.
output_level (int): Level of verbosity 1-8.
......@@ -174,11 +290,7 @@ def unpackers(locations,
else:
TES_ROOT = '/Event/HLT2'
inputs = make_dict(locations, TES_ROOT, ann)
outputs = {}
for k, v in inputs.items():
outputs[k] = [unpacked_prefix(l, TES_ROOT) for l in v]
outputs = make_dict(locations, TES_ROOT, cfg)
mc_loc = {}
if len(mc) > 1:
if configurables:
......@@ -193,11 +305,12 @@ def unpackers(locations,
}
prpacking = PersistRecoPacking(
annsvc=ann.getFullName(),
stream=TES_ROOT,
reco_stream=RECO,
data_type=data_type,
packed=inputs,
packed={k: [mappedBuffers] * len(v)
for k, v in outputs.items()
}, # the input for _each_ output is 'the one map of buffers'
unpacked=outputs,
mc=mc_loc)
......@@ -207,9 +320,7 @@ def unpackers(locations,
return unpack_persistreco
def decoder(locations,
ann,
process='Hlt2',
def decoder(process='Hlt2',
stream='default',
raw_event_format=0.5,
data_type='Upgrade',
......@@ -223,7 +334,6 @@ def decoder(locations,
- drives `RawEventLocation` only.
raw_event_format (float):
data_type (str): The data type to configure PersistRecoPacking.
ann: ANNSvc component, needed for finding linker locations
locations: list of packed object locations to unpack
configurables (bool): set to False to use PyConf Algorithm.
output_level (int): Level of verbosity 1-8.
......@@ -244,83 +354,58 @@ def decoder(locations,
from Configurables import HltPackedBufferDecoder
bank_location = bank_location.location
decoder = HltPackedBufferDecoder(
SourceID='Hlt2',
ANNSvc=ann.getFullName(),
SourceID=process.replace("Turbo", "Hlt2"),
RawEventLocations=bank_location,
RawBanks=RawBanks["DstData"],
DecReports=RawBanks["HltDecReports"],
OutputBuffers=locations,
OutputLevel=output_level)
else:
from PyConf.Algorithms import HltPackedBufferDecoder
# Decoder is a splitting transformer
# PyConf Algorithms expect a single output location
# so following workaorund is needed to have a list as output
import collections
location_map = collections.OrderedDict()
for l in range(len(locations)):
location_map["Buffer" + str(l)] = locations[l]
def decoder_transform(**outputs):
dict = {
location_map[k]: tes_location
for k, tes_location in outputs.items()
}
return {"OutputBuffers": locations}
decoder = HltPackedBufferDecoder(
SourceID='Hlt2',
ANNSvc=ann.getFullName(),
SourceID=process.replace("Turbo", "Hlt2"),
RawEventLocations=bank_location,
RawBanks=force_location(RawBanks["DstData"]),
DecReports=force_location(RawBanks["HltDecReports"]),
outputs={prop: None
for prop in location_map},
output_transform=decoder_transform,
OutputLevel=output_level)
return decoder
def make_dict(locations, stream, ann):
def make_dict(locations, stream, manifest):
"""Return a dictionary of object type: location list
This is needed to decide which unpacker to use for a given object
Args:
stream (str): TES location prefix
ann: ANNSvc component, needed for finding object types from json file
cfg: configuration needed for finding object types from json file
locations: list of packed object locations to unpack
"""
dict = PersistRecoPacking(stream=stream).dictionary()
ptype = ann.PackedObjectTypes
if ptype:
classids = inv_classid_map()
for loc in locations:
if loc in ptype.keys():
# ANN service doesn't allow duplicate keys/values
# When an object type is registered for a location, type number is increased by i x 10^4
# Here we undo that operation
number = ptype[loc] % 10000
type = classids[number]
dict[type] += [loc]
return dict
def make_locations(locations, stream):
dct = PersistRecoPacking(stream=stream).dictionary()
present = {loc: tpe for (loc, tpe) in manifest['PackedLocations']}
for loc in locations:
typ = present.get(loc)
if typ:
alias = type_map().get(typ)
if alias: typ = alias
if typ not in dct.keys():
print("Warning: unknown type - ", present.get(loc),
" at location ", loc, " -- skipping for now")
else:
dct[typ] += [loc]
return dct
def make_locations(manifest, stream):
"""Return a location list to unpack and decode
Args:
stream (str): TES location prefix
locations: PackedObjectLocations dict from json file
manifest: dict from json file
"""
locations_to_decode = []
for key in locations.keys():
if key.startswith(stream + "/p") or key.startswith(stream + "p"):
locations_to_decode.append(force_location(key))
for (l, t) in manifest['PackedLocations']:
# if l.startswith(stream + "/p") or l.startswith(stream + "p"):
if l.startswith(stream) and not l.startswith(stream + "/MC"):
locations_to_decode.append(force_location(l))
return locations_to_decode
......@@ -375,7 +460,7 @@ def hlt_decisions(process='Hlt2',
return decode_hlt
def do_unpacking(annsvc,
def do_unpacking(cfg,
stream="default",
locations=[],
process='Hlt2',
......@@ -390,7 +475,7 @@ def do_unpacking(annsvc,
stream (str): needed post-sprucing as RawEvent is then dependent on stream name
- drives `RawEventLocation` only.
raw_event_format (float):
annsvc: ANNSvc component, needed for finding linker locations and packed locations
cfg: configuration needed for finding linker locations and packed locations
locations: list of packed object locations to unpack.
if none is given, all locations in PackedObjectLocation are taken
"""
......@@ -400,11 +485,11 @@ def do_unpacking(annsvc,
if len(locations) == 0:
# no locations are specified, so try to unpack everything in PackedObjectLocations
locations = make_locations(annsvc.PackedObjectLocations, TES_ROOT)
locations = make_locations(cfg, TES_ROOT)
assert len(
locations
) > 0, 'Nothing to be decoded, check the locations or ANNSvc file'
) > 0, 'Nothing to be decoded, check the locations or configuration file'
# First we unpack rawbanks
algs = [
......@@ -439,11 +524,9 @@ def do_unpacking(annsvc,
output_loc="/Event/Spruce/DecReports")
]
# decode raw banks to packed data buffers
# decode raw banks to map of packed data buffers
packed_decoder = decoder(
locations=locations,
stream=stream,
ann=annsvc,
raw_event_format=raw_event_format,
process=process,
output_level=output_level,
......@@ -462,94 +545,10 @@ def do_unpacking(annsvc,
# Finally unpack packed data buffers to objects
algs += unpackers(
locations,
annsvc,
cfg,
packed_decoder.OutputBuffers,
process=process,
output_level=output_level,
configurables=configurables,
mc=mc_algs)
return algs
def get_hltAnn_dict(annsvc_json_config):
"""
Extract the HLT ANN dictionary of HLT2/Spruce locations from a .json file.
Args:
annsvc_json_config (str): path to the .json file containing the HltAnnSvc configuration.
Examples:
1. local path: path/to/json_file
2. eos path: root://eoslhcb.cern.ch//path/to/json_file
Returns:
Dict with all the HLT2/Spruce locations.
"""
tck = {}
if "root://eoslhcb.cern.ch//" in str(annsvc_json_config):
with XRootD.client.File() as f:
status, _ = f.open(str(annsvc_json_config))
if not status.ok:
raise RuntimeError(
f"could not open {annsvc_json_config}: {status.message}")
status, data = f.read()
if not status.ok:
raise RuntimeError(
f"could not read {annsvc_json_config}: {status.message}")
tck = json.loads(data.decode('utf-8'))
elif annsvc_json_config:
with open(os.path.expandvars(annsvc_json_config)) as f:
tck = json.load(f)
return tck
def set_hltAnn_svc(annsvc_json_config):
"""
Configures the Hlt ANN service to read correctly the spruced locations using the HltAnnSvc
configuration of the HLT2 application.
Args:
annsvc_json_config (str): path to the .json file containing the HltAnnSvc configuration.
Examples:
1. local path: path/to/json_file
2. eos path: root://eoslhcb.cern.ch//path/to/json_file
"""
config = ComponentConfig()
tck = get_hltAnn_dict(annsvc_json_config)
if tck:
ann_config = tck["HltANNSvc/HltANNSvc"]
hlt1_sel_ids = {
str(k): v
for k, v in ann_config["Hlt1SelectionID"].items()
}
hlt2_sel_ids = {
str(k): v
for k, v in ann_config["Hlt2SelectionID"].items()
}
spruce_sel_ids = {
str(k): v
for k, v in ann_config["SpruceSelectionID"].items()
}
packed_object_locs = {
str(k): v
for k, v in ann_config["PackedObjectLocations"].items()
}
packed_object_types = {}
if "PackedObjectTypes" in ann_config.keys():
packed_object_types = {
str(k): v
for k, v in ann_config["PackedObjectTypes"].items()
}
config.add(
setup_component(
"HltANNSvc",
Hlt1SelectionID=hlt1_sel_ids,
Hlt2SelectionID=hlt2_sel_ids,
SpruceSelectionID=spruce_sel_ids,
PackedObjectLocations=packed_object_locs,
PackedObjectTypes=packed_object_types))
else:
config.add(setup_component("HltANNSvc"))
return config
......@@ -14,12 +14,19 @@
#include "GaudiKernel/EventContext.h"
#include "GaudiKernel/StatusCode.h"
#include "Kernel/EventContextExt.h"
#include "Kernel/IANNSvc.h"
#include "Kernel/IIndexedANNSvc.h"
#include "Kernel/ISchedulerConfiguration.h"
#include <string>
#include <type_traits>
#include <vector>
namespace {
bool ends_with( std::string_view s, std::string_view suffix ) {
return s.size() >= suffix.size() && s.substr( s.size() - suffix.size() ).compare( suffix ) == 0;
}
} // namespace
/** @brief Write DecReport objects based on the status of the execution nodes in the scheduler.
*
* Each execution node of the HltControlFlowMgr is converted to a DecReport
......@@ -41,14 +48,22 @@ private:
ServiceHandle<LHCb::Interfaces::ISchedulerConfiguration> m_scheduler{this, "Scheduler", "HLTControlFlowMgr"};
/// ANNSvc for translating selection names to int selection IDs
ServiceHandle<IANNSvc> m_hltANNSvc{this, "ANNSvc", "HltANNSvc", "Service to retrieve DecReport IDs"};
ServiceHandle<IIndexedANNSvc> m_annsvc{this, "IndexedANNSvc", "HltANNSvc", "Service to retrieve DecReport IDs"};
/// Map from line name to index in the scheduler's execution node list and
/// number assigned by the HltAnnSvc
std::map<std::string, std::pair<int, IANNSvc::minor_mapped_type>> m_name_indices{};
std::map<std::string, std::pair<int, int>> m_name_indices{};
Gaudi::Property<std::vector<std::string>> m_line_names{this, "Persist", {}, "Specify the nodes to be written to TES"};
Gaudi::Property<std::string> m_ann_key{this, "ANNSvcKey", "", "Key from the ANN service to query."};
Gaudi::Property<std::string> m_ann_key{
this, "ANNSvcKey", "",
[this]( const auto& ) {
if ( m_ann_key.value().empty() || ends_with( m_ann_key.value(), "SelectionID" ) ) return;
throw GaudiException( "ANNSvcKey " + m_ann_key.value() + " does not end in \"SelectionID\"",
__PRETTY_FUNCTION__, StatusCode::FAILURE );
},
"Key used to query the ANN service"};
Gaudi::Property<unsigned int> m_key{this, "TCK", 0u};
};
DECLARE_COMPONENT( ExecutionReportsWriter )
......@@ -58,7 +73,6 @@ StatusCode ExecutionReportsWriter::start() {
if ( !sc ) return sc;
const auto& scheduler_items = m_scheduler->getNodeNamesWithIndices();
const auto& ann_items = m_hltANNSvc->items( m_ann_key.value() );
for ( const auto& name : m_line_names ) {
// The scheduler stores execution nodes as a flat list. We can cache the
// index of the execution node for each line now, and retrieve the node from
......@@ -76,7 +90,8 @@ StatusCode ExecutionReportsWriter::start() {
// Translate each decision name to an int, which will be written to the DecReport
// If the decision name isn't known to the ANNSvc we can't translate it
auto ann_idx = std::find_if( ann_items.begin(), ann_items.end(),
const auto& ann_items = m_annsvc->s2i( m_key.value(), m_ann_key.value() );
auto ann_idx = std::find_if( ann_items.begin(), ann_items.end(),
[&]( const auto& p ) { return std::get<0>( p ) == decision_name; } );
if ( ann_idx == ann_items.end() ) {
error() << "Decision name not known to ANNSvc: " << decision_name << endmsg;
......@@ -94,6 +109,7 @@ LHCb::HltDecReports ExecutionReportsWriter::operator()( EventContext const& evtC
auto const& state = lhcbExt.getSchedulerExtension<LHCb::Interfaces::ISchedulerConfiguration::State>();
LHCb::HltDecReports reports{};
reports.setConfiguredTCK( m_key );
reports.reserve( m_name_indices.size() );
for ( const auto& [i, item] : LHCb::range::enumerate( m_name_indices ) ) {
const auto& [decision_name, idx] = item;
......
......@@ -16,8 +16,10 @@ from PyConf.Algorithms import (
)
from PyConf.application import ApplicationOptions, configure, configure_input
from PyConf.control_flow import CompositeNode, NodeLogic
from Configurables import HltANNSvc
from PyConf.filecontent_metadata import key_registry
from Configurables import GitANNSvc
from Gaudi.Configuration import DEBUG
import json
prescale2 = ConfigurableDummy(name="Prescale2", CFD=2)
prescale3 = ConfigurableDummy(name="Prescale3", CFD=3)
......@@ -38,17 +40,22 @@ lines = [
CompositeNode("line3", [prescale3, a4], force_order=True),
CompositeNode("line4", [a1, a4, a5], force_order=True),
]
key_registry.add(
'deadbeef',
json.dumps({
"Hlt1SelectionID":
dict((i, n.name + 'Decision') for i, n in enumerate(lines, 1))
}))
decision = CompositeNode(
"decision", lines, combine_logic=NodeLogic.NONLAZY_OR, force_order=False)
reports_writer = ExecutionReportsWriter(
OutputLevel=DEBUG,
Persist=[line.name for line in lines],
TCK=0xdeadbeef,
ANNSvcKey="Hlt1SelectionID")
HltANNSvc().Hlt1SelectionID = {
name + 'Decision': i + 1
for i, name in enumerate(reports_writer.properties['Persist'])
}
dec_reports_monitor = HltDecReportsMonitor(
Input=reports_writer.DecReportsLocation)
exec_reports_monitor = ExecutionReportsMonitor()
......@@ -69,6 +76,7 @@ options.histo_file = "histograms.root"
options.input_type = "NONE"
options.dddb_tag = "dummy"
options.conddb_tag = "dummy"
options.write_decoding_keys_to_git = False
config2 = configure_input(options)
config = configure(options, moore)