Commits on Source (8)
......@@ -61,6 +61,7 @@ gaudi_add_executable(allen_read_mep
......@@ -93,6 +94,16 @@ gaudi_add_executable(allen_mpi_send
* (c) Copyright 2018-2020 CERN for the benefit of the LHCb Collaboration *
#include <cassert>
#include <cstring>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <map>
#include <string>
#include <unordered_set>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <fmt/format.h>
#include <boost/algorithm/string.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/program_options.hpp>
#include <GaudiKernel/Bootstrap.h>
#include <GaudiKernel/IAppMgrUI.h>
#include <GaudiKernel/IProperty.h>
#include <GaudiKernel/IStateful.h>
#include <GaudiKernel/ISvcLocator.h>
#include <GaudiKernel/MsgStream.h>
#include <GaudiKernel/SmartIF.h>
#include <Event/ODIN.h>
#include <Event/RawBank.h>
#include <Allen/read_mdf.hpp>
#include <MDF/StreamDescriptor.h>
#include <EventBuilding/MEP_tools.hpp>
#include <EventBuilding/MFP_tools.hpp>
#include <AllenOnline/ReadMEP.h>
#include <AllenOnline/TransposeMEP.h>
using namespace std;
namespace po = boost::program_options;
namespace ba = boost::algorithm;
struct CaloRawBank {
uint32_t source_id = 0;
uint32_t const* data = nullptr;
uint32_t const* end = nullptr;
// For MEP format
CaloRawBank( const uint32_t sid, const char* fragment, const uint16_t s )
: source_id{sid}
, data{reinterpret_cast<uint32_t const*>( fragment )}
, end{reinterpret_cast<uint32_t const*>( fragment + s )} {
assert( s % sizeof( uint32_t ) == 0 );
struct ODINRawBank {
uint32_t const* data = nullptr;
uint16_t size = 0;
/// Constructor from MEP layout
ODINRawBank( const uint32_t, const char* fragment, uint16_t s ) {
data = reinterpret_cast<uint32_t const*>( fragment );
size = s;
namespace {
const std::array<std::tuple<char, std::uint16_t, short>, 5> event_type_chars = {
{{'P', 0x02, 0}, {'N', 0x04, 1}, {'L', 0x08, 2}, {'1', 0x10, 3}, {'2', 0x20, 4}}};
int main( int argc, char* argv[] ) {
string filename, output_filename;
std::string tck_str;
ssize_t n_meps = 0;
unsigned run;
// Declare the supported options.
po::options_description desc( "Allowed options" );
// clang-format off
( "help,h", "produce help message" )
( "filename", po::value<string>( &filename ), "filename pattern" )
( "output", po::value<string>( &output_filename ), "output filename" )
( "n_meps,n", po::value<ssize_t>( &n_meps ), "number of MEPs" )
( "tck", po::value<std::string>( &tck_str ), "new TCK" )
( "run", po::value<unsigned>( &run ), "new run number" );
// clang-format on
po::positional_options_description p;
p.add( "filename", 1 );
p.add( "n_meps", 1 );
p.add( "output", 1 );
po::variables_map vm;
po::store( po::command_line_parser( argc, argv ).options( desc ).positional( p ).run(), vm );
po::notify( vm );
if ( vm.count( "help" ) ) {
std::cout << desc << "\n";
return 1;
unsigned tck = std::stoi(tck_str, nullptr, 16);
SmartIF<IStateful> app = Gaudi::createApplicationMgr();
auto prop =<IProperty>();
bool sc = prop->setProperty( "JobOptionsType", "\"NONE\"" ).isSuccess();
sc &= app->configure();
sc &= app->initialize();
sc &= app->start();
if ( !sc ) { return 1; }
SmartIF<ISvcLocator> sloc =<ISvcLocator>();
auto msgSvc = sloc->service<IMessageSvc>( "MessageSvc" );
MsgStream info{msgSvc.get(), "allen_mep_read"};
auto gaudi_exit = [&app]( int code ) {
bool sc = app->stop().isSuccess();
sc &= app->finalize();
return code & !sc;
// Some storage for reading the events into
bool eof = false, success = false;
auto input = LHCb::StreamDescriptor::bind( filename );
if ( input.ioDesc != 0 ) {
info << "Opened " << filename << endmsg;
} else {
info << "Failed to open file " << filename << " " << strerror( errno ) << endmsg;
return gaudi_exit( 1 );
vector<char> data;
EventIDs event_ids;
MEP::Slices mep_slices( 1 );
auto& slice = mep_slices[0];
for ( ssize_t i_mep = 0; ( n_meps == -1 || i_mep < n_meps ) && !eof; ++i_mep ) {
std::tie( eof, success, slice.mep, slice.packing_factor, slice.mep_data ) = MEP::read_mep( input, data, info );
auto const* mep = slice.mep;
if ( !success ) {
return gaudi_exit( 1 );
} else {
std::cout << "Read mep with packing factor " << slice.packing_factor << " #MFPs: " << mep->header.n_MFPs << "\n";
if ( i_mep == 0 ) {
slice.blocks.resize( mep->header.n_MFPs, MEP::Blocks::value_type{} );
slice.offsets.resize( mep->header.n_MFPs );
for ( auto& offsets : slice.offsets ) { offsets.resize( slice.packing_factor + 1 ); }
std::function<void( size_t )> bad_mfp = []( size_t source_id ) {
auto const* sd = SourceId_sysstr( source_id );
std::cout << "ERROR: bad MFP for " << sd << " with source ID " << source_id << "\n";
auto odin_block_index = MEP::find_blocks( mep, slice.blocks, bad_mfp );
if (!odin_block_index) {
std::cout << "ERROR: No ODIN MFP" << "\n";
return gaudi_exit( 1 );
MEP::fragment_offsets( slice.blocks, slice.offsets );
auto const& odin_block = slice.blocks[*odin_block_index];
std::cout << "MEP with packing: " << std::setw( 4 ) << odin_block.header->n_banks
<< " event_id: " << std::setw( 6 ) << odin_block.header->ev_id << "\n";
// Decode first ODIN
auto const& odin_offsets = slice.offsets[*odin_block_index];
auto decode_odin = [version = odin_block.header->block_version, &odin_block, &odin_offsets]( unsigned event_number ) {
auto const* odin_data = reinterpret_cast<unsigned const*>(odin_block.payload + odin_offsets[event_number]);
LHCb::ODIN odin;
if ( version == 7 ) {
odin = LHCb::ODIN{ {odin_data, 10} };
} else {
odin = LHCb::ODIN::from_version<6>( {odin_data, 10} );
return odin;
auto first_odin = decode_odin( 0 );
std::cout << "ODIN version: " << static_cast<unsigned>(first_odin.version()) << " run: " << std::setw( 7 ) << first_odin.runNumber()
<< " event: " << std::setw( 12 ) << first_odin.eventNumber() << "\n";
if (!tck_str.empty()) {
std::cout << "Changing TCK from " << fmt::format("{:#010x}", first_odin.triggerConfigurationKey()) << " to " << fmt::format("{:#010x}", tck) << "\n";
if (run != 0) {
std::cout << "Changing run from " << std::to_string(first_odin.runNumber()) << " to " << std::to_string(run) << "\n";
char* payload = const_cast<char*>(odin_block.payload);
for ( unsigned evt = 0; evt < slice.packing_factor; ++evt ) {
auto* odin_data = reinterpret_cast<unsigned*>(payload + odin_offsets[evt]);
using namespace LHCb::ODINImplementation;
if (!tck_str.empty()) {
details::set_bits<LHCb::ODIN::TriggerConfigurationKeySize, LHCb::ODIN::TriggerConfigurationKeyOffset>( {odin_data, 10}, tck );
if (run != 0) {
details::set_bits<LHCb::ODIN::RunNumberSize, LHCb::ODIN::RunNumberOffset>( {odin_data, 10}, run );
std::ofstream output_file{output_filename, std::ios::binary};
output_file.write(, slice.mep_data.size());
return gaudi_exit( 0 );
......@@ -15,6 +15,8 @@
#include <sys/types.h>
#include <unistd.h>
#include <fmt/format.h>
#include <boost/algorithm/string.hpp>
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
......@@ -80,19 +82,6 @@ namespace {
{{'P', 0x02, 0}, {'N', 0x04, 1}, {'L', 0x08, 2}, {'1', 0x10, 3}, {'2', 0x20, 4}}};
std::vector<char> contiguous_mfps( Allen::Slice const& mep_data ) {
// To make direct use of the offsets, the MFPs need to be copied
// into temporary storage
auto const& mfps = mep_data.fragments;
vector<char> mep_fragments( mep_data.fragments_mem_size, '\0' );
char* destination = &mep_fragments[0];
for ( gsl::span<char const> mfp : mfps ) {
::memcpy( destination,, mfp.size_bytes() );
destination += mfp.size_bytes();
return mep_fragments;
int main( int argc, char* argv[] ) {
string filename;
......@@ -102,12 +91,15 @@ int main( int argc, char* argv[] ) {
// Declare the supported options.
po::options_description desc( "Allowed options" );
desc.add_options()( "help,h", "produce help message" )( "filename", po::value<string>( &filename ),
"filename pattern" )(
"n_meps,n", po::value<ssize_t>( &n_meps ),
"number of events" )( "skip,s", po::value<size_t>( &n_skip )->default_value( 0 ), "number of events to skip" )(
"dump", po::value<string>( &dump ), "dump bank content (source_id_type,start_event[,end_event],bank_number" )(
"count-banks", "count raw banks by bank type" );
// clang-format off
( "help,h", "produce help message" )
( "filename", po::value<string>( &filename ), "filename pattern" )
( "n_meps,n", po::value<ssize_t>( &n_meps ), "number of MEPs" )
( "skip,s", po::value<size_t>( &n_skip )->default_value( 0 ), "number of events to skip" )
( "dump", po::value<string>( &dump ), "dump bank content (source_id_type,start_event[,end_event],bank_number" )
( "count-banks", "count raw banks by bank type" );
// clang-format on
po::positional_options_description p;
p.add( "filename", 1 );
......@@ -230,8 +222,6 @@ int main( int argc, char* argv[] ) {
for ( auto& offsets : slice.offsets ) { offsets.resize( slice.packing_factor + 1 ); }
if ( n_skip != 0 && n_skip-- > 0 ) continue;
std::function<void( size_t )> bad_mfp = []( size_t source_id ) {
auto const* sd = SourceId_sysstr( source_id );
std::cout << "ERROR: bad MFP for " << sd << " with source ID " << source_id << "\n";
......@@ -268,7 +258,8 @@ int main( int argc, char* argv[] ) {
// Decode first ODIN
auto const odin_index = to_integral( BankTypes::ODIN );
auto const& odin_slice = bank_slices[odin_index][0];
auto odin_banks = contiguous_mfps( odin_slice );
// There is always only a single ODIN MFP
auto odin_banks = odin_slice.fragments[0];
auto decode_odin = [& versions = banks_version, &odin_slice, &odin_banks]( unsigned event_number ) {
auto const& odin_offsets = odin_slice.offsets;
......@@ -286,7 +277,10 @@ int main( int argc, char* argv[] ) {
auto first_odin = decode_odin( 0 );
std::cout << "ODIN version: " << first_odin.version() << " run: " << std::setw( 7 ) << first_odin.runNumber()
auto const tck = first_odin.triggerConfigurationKey();
std::cout << "ODIN version: " << static_cast<unsigned>(first_odin.version())
<< " TCK " << fmt::format( "{:#010x}", tck )
<< " run: " << std::setw( 7 ) << first_odin.runNumber()
<< " event: " << std::setw( 12 ) << first_odin.eventNumber() << "\n";
// Print block information
......@@ -331,7 +325,10 @@ int main( int argc, char* argv[] ) {
<< padded_size << "\n";
size_t skip = n_skip;
for ( unsigned evt = 0; evt < slice.packing_factor; ++evt ) {
if ( skip != 0 && skip-- > 0 ) continue;
// Count bank types and DAQ errors per SD
auto bank_type = block.bank_types[evt];
if ( daq_errors.count( static_cast<LHCb::RawBank::BankType>( bank_type ) ) ) {
......@@ -342,6 +339,8 @@ int main( int argc, char* argv[] ) {
if ( dump_block ) {
for ( unsigned evt = dump_start; evt < ( dump_end == -1 ? slice.packing_factor : dump_end ); ++evt ) {
if ( n_skip != 0 && n_skip-- > 0 ) continue;
if ( dump_type == "ODIN" ) {
auto odin = decode_odin( evt );
......@@ -371,6 +370,9 @@ int main( int argc, char* argv[] ) {
else if ( n_skip != 0 ) {
n_skip -= std::min(n_skip, size_t{slice.packing_factor});
......@@ -80,7 +80,7 @@ namespace MEP {
std::tuple<bool, std::array<unsigned int, NBankTypes>, std::array<int, NBankTypes>> fill_counts( EB::MEP const* mep );
bool find_blocks( EB::MEP const* mep, Blocks& blocks, std::function<void( size_t )> const& badMFP );
std::optional<size_t> find_blocks( EB::MEP const* mep, Blocks& blocks, std::function<void( size_t )> const& badMFP );
void fragment_offsets( Blocks const& blocks, std::vector<std::vector<uint32_t>>& offsets );
......@@ -43,6 +43,7 @@ try:
dddb_tag = "run3/" + dddb_tag
conddb_tag = OnlineEnv.CondDBTag
initial_tck = getattr(OnlineEnv, "InitialTCK", 0)
ignore_odin_tck = int(os.getenv("IGNORE_ODIN_TCK", "0"))
except ImportError:
run_online = False
output_level = 3
......@@ -52,6 +53,7 @@ except ImportError:
dddb_tag = 'run3/trunk'
conddb_tag = 'master'
initial_tck = 0
ignore_odin_tck = 1
integration_test = False
......@@ -173,6 +175,7 @@ allen_conf.Partition = partition
allen_conf.PartitionBuffers = True
allen_conf.PartitionID = partition_id
allen_conf.EnableRunChanges = UseDD4Hep
allen_conf.TCKFromODIN = not ignore_odin_tck
if run_online:
from Configurables import Allen__MBMOutput as MBMOutput
......@@ -256,10 +259,8 @@ messageSvc.OutputLevel = 3
# Add the services that will produce the non-event-data
monSink = OnlMonitorSink(
CountersToPublish=[("Bursts", "IN"), ("Bursts", "OUT"), ("Events", "IN"),
("Events", "OUT"), ("Events", "MB_IN"),
("Events", "MB_OUT"), ("gather_selections",
CountersToPublish=[("Bursts", ".*"), ("Events", ".*"), ("MBMOutput", ".*"),
("gather_selections", "Hlt1.*Pass"),
("gather_selections", "Hlt1.*Rate"),
("velo_consolidate_tracks_ca09ac3f", "n_velo_tracks"),
("scifi_consolidate_seeds_7b68a248", "n_seed_tracks"),
......@@ -198,7 +198,7 @@ int AllenApplication::configureApplication() {
return Online::ONLINE_ERROR;
std::string const& sequence = m_allenConfig->sequence();
auto const& [sequence, source] = m_allenConfig->sequence();
if ( sequence.empty() ) {
m_logger->error( "Failed to obtain sequence" );
return Online::ONLINE_ERROR;
......@@ -211,6 +211,7 @@ int AllenApplication::configureApplication() {
{"params", m_allenConfig->paramDir},
{"device", m_allenConfig->device.value()},
{"s", std::to_string( m_nSlices )},
{"tck-from-odin", std::to_string( m_allenConfig->tckFromODIN.value() )},
{"disable-run-changes", std::to_string( !m_allenConfig->runChanges.value() )},
{"monitoring-filename", ""}};
......@@ -223,7 +224,7 @@ int AllenApplication::configureApplication() {
m_allenControl = m_zmqSvc->socket( zmq::PAIR );
m_allenControl->bind( m_controlConnection.c_str() );
m_allenThread = std::thread{&AllenApplication::allenLoop, this, std::move( sequence )};
m_allenThread = std::thread{&AllenApplication::allenLoop, this, sequence, source};
zmq::pollitem_t items[] = {{*m_allenControl, 0, zmq::POLLIN, 0}};
m_zmqSvc->poll( &items[0], 1, -1 );
......@@ -292,8 +293,8 @@ int AllenApplication::continueProcessing() {
return OnlineApplication::continueProcessing();
void AllenApplication::allenLoop( std::string config ) {
auto status = allen( m_options, config, this, m_provider, m_output, m_zmqSvc.get(), m_controlConnection );
void AllenApplication::allenLoop( std::string_view config, std::string_view config_source ) {
auto status = allen( m_options, config, config_source, this, m_provider, m_output, m_zmqSvc.get(), m_controlConnection );
if ( status != 0 ) {
m_logger->error( "Allen event loop exited with error" );
......@@ -61,7 +61,7 @@ public:
int continueProcessing() override;
// Main function running the Allen event loop
void allenLoop( std::string config );
void allenLoop( std::string_view config, std::string_view config_source );
bool initMPI();
......@@ -29,12 +29,15 @@ StatusCode AllenConfiguration::queryInterface( const InterfaceID& riid, void** p
AllenConfiguration::AllenConfiguration( std::string name, ISvcLocator* svcloc ) : Service( name, svcloc ) {}
std::string AllenConfiguration::sequence() const {
if ( !m_sequence.empty() ) { return m_sequence; }
std::tuple<std::string, std::string> AllenConfiguration::sequence() const {
if ( !m_sequence.empty() ) { return {m_sequence, "json"}; }
std::string source;
if ( !m_tck.empty() ) {
// Load from TCK
auto repo = m_json.value();
source = repo + ":" + m_tck;
LHCb::TCK::Info tck_info{};
std::tie( m_sequence, tck_info ) = Allen::sequence_from_git( repo, m_tck );
......@@ -57,9 +60,10 @@ std::string AllenConfiguration::sequence() const {
} else {
info() << "Configuring Allen from JSON file " << m_json << endmsg;
m_sequence = std::string{std::istreambuf_iterator<char>{sequence_file}, std::istreambuf_iterator<char>{}};
source = m_json.value();
return m_sequence;
return {m_sequence, source};
AllenConfiguration::~AllenConfiguration() {}
......@@ -13,6 +13,7 @@
#include <filesystem>
#include <GaudiKernel/Service.h>
#include <Allen/Provider.h>
class AllenConfiguration : public Service {
......@@ -29,7 +30,7 @@ public:
virtual ~AllenConfiguration();
std::string sequence() const;
std::tuple<std::string, std::string> sequence() const;
Gaudi::Property<float> stopTimeout{this, "StopTimeout", 5.};
Gaudi::Property<unsigned int> nThreads{this, "NThreads", 8};
......@@ -51,6 +52,7 @@ public:
Gaudi::Property<unsigned> partitionID{this, "PartitionID", 0};
Gaudi::Property<bool> partitionBuffers{this, "PartitionBuffers", true};
Gaudi::Property<std::string> partition{this, "Partition", ""};
Gaudi::Property<bool> tckFromODIN{this, "TCKFromODIN", true};
static std::string resolveEnvVars( std::string s ) {
......@@ -70,12 +72,11 @@ private:
auto const json_path = resolveEnvVars( m_json.value() );
std::regex tck_option{"([^:]+):(0x[a-fA-F0-9]{8})"};
std::smatch tck_match;
if ( std::regex_match( json_path, tck_match, tck_option ) ) {
std::filesystem::path j{tck_match.str( 1 )};
m_tck = tck_match.str( 2 );
auto const json_path = resolveEnvVars( m_json.value() );
auto [from_tck, repo, tck] = Allen::config_from_tck( json_path );
if ( from_tck ) {
std::filesystem::path j{repo};
m_tck = tck;
if ( !std::filesystem::exists( j ) || !std::filesystem::is_directory( j ) ) {
throw GaudiException{"Git repository " + json_path + " does not exist or is not a directory", name(),
......@@ -291,15 +291,14 @@ StatusCode MEPProvider::initialize() {
m_allenConfig = config.get();
std::string const& sequence = m_allenConfig->sequence();
if ( sequence.empty() ) {
error() << "Failed to obtain sequence" << endmsg;
return StatusCode::FAILURE;
if ( m_all_bank_types.value() ) {
m_bank_types = DataBankTypes;
} else {
auto const& [sequence, source] = m_allenConfig->sequence();
if ( sequence.empty() ) {
error() << "Failed to obtain sequence" << endmsg;
return StatusCode::FAILURE;
const auto config_reader = ConfigurationReader( sequence );
m_bank_types = config_reader.configured_bank_types();
......@@ -528,20 +527,6 @@ bool MEPProvider::release_buffers() {
if ( bmid != MBM_INV_DESC ) { ::mbm_exclude( bmid ); }
m_bmIDs[b] = MBM_INV_DESC;
for ( auto& status : m_buffer_status ) {
status.writable = true;
status.work_counter = 0;
m_buffer_reading = m_buffer_status.begin();
for ( size_t i = 0; i < m_slice_free.size(); ++i ) m_slice_free[i] = true;
for ( size_t i = 0; i < m_odins.size(); ++i ) m_odins[i].fill( 0 );
for ( size_t i = 0; i < m_slice_to_buffer.size(); ++i ) m_slice_to_buffer[i] = {-1, 0, 0};
return true;
......@@ -63,9 +63,9 @@ LHCb::ODIN MEP::decode_odin( char const* odin_data, unsigned const offset, unsig
bool MEP::find_blocks( EB::MEP const* mep, Blocks& blocks, std::function<void( size_t )> const& bad_mfp ) {
std::optional<size_t> MEP::find_blocks( EB::MEP const* mep, Blocks& blocks, std::function<void( size_t )> const& bad_mfp ) {
// Fill blocks in temporary container
bool found_odin = false;
std::optional<size_t> odin_block_index;
size_t n_good = 0;
blocks.resize( mep->header.n_MFPs );
......@@ -73,8 +73,8 @@ bool MEP::find_blocks( EB::MEP const* mep, Blocks& blocks, std::function<void( s
EB::MFP const* mfp = mep->at( i_block );
if ( mfp->is_header_valid() ) {
blocks[n_good] = Block{mfp};
if ( SourceId_sys( mfp->header.src_id ) == SourceIdSys::SourceIdSys_ODIN ) odin_block_index = n_good;
if ( SourceId_sys( mfp->header.src_id ) == SourceIdSys::SourceIdSys_ODIN ) found_odin = true;
} else {
bad_mfp( mep->header.src_ids()[i_block] );
......@@ -89,7 +89,7 @@ bool MEP::find_blocks( EB::MEP const* mep, Blocks& blocks, std::function<void( s
[]( size_t s, const MEP::Block& b ) { return s + b.header->bytes(); } );
assert( total_block_size <= mep->bytes() );
return found_odin;
return odin_block_index;
void MEP::fragment_offsets( MEP::Blocks const& blocks, MEP::SourceOffsets& offsets ) {
......@@ -47,11 +47,9 @@ if not UseDD4Hep:
app.DDDBtag = "upgrade/dddb-20221004"
app.CondDBtag = "upgrade/mu_VP_SciFi_macromicrosurvey_from20220923"
app.DDDBtag = "trunk"
app.DDDBtag = "run3/trunk"
app.CondDBtag = "master"
# Decode VP, UT, FT and muons
check_seq = GaudiSequencer("CheckODINSeq")
unpack_raw = UnpackRawEvent(