Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • lhcb/MooreOnline
1 result
Show changes
Commits on Source (2)
Showing
with 354 additions and 126 deletions
......@@ -61,6 +61,7 @@ gaudi_add_executable(allen_read_mep
AllenOnlineLib
LHCb::MDFLib
Boost::program_options
fmt::fmt
)
gaudi_add_executable(allen_bench_mep_offsets
......@@ -93,6 +94,16 @@ gaudi_add_executable(allen_mpi_send
Boost::program_options
)
gaudi_add_executable(allen_change_mep
SOURCES
application/change_mep.cpp
LINK
AllenOnlineLib
LHCb::MDFLib
Boost::program_options
fmt::fmt
)
# MPI
if (TARGET MPI::MPI_CXX AND MPI_CXX_COMPILER AND TARGET PkgConfig::hwloc)
message(STATUS "Found MPI ${MPI_CXX_VERSION}: ${MPI_CXX_COMPILER}")
......
/*****************************************************************************\
* (c) Copyright 2018-2020 CERN for the benefit of the LHCb Collaboration *
\*****************************************************************************/
#include <cassert>
#include <cstring>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <map>
#include <string>
#include <unordered_set>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <fmt/format.h>
#include <boost/lexical_cast.hpp>
#include <boost/program_options.hpp>
#include <GaudiKernel/Bootstrap.h>
#include <GaudiKernel/IAppMgrUI.h>
#include <GaudiKernel/IProperty.h>
#include <GaudiKernel/IStateful.h>
#include <GaudiKernel/ISvcLocator.h>
#include <GaudiKernel/MsgStream.h>
#include <GaudiKernel/SmartIF.h>
#include <Event/ODIN.h>
#include <Event/RawBank.h>
#include <Allen/read_mdf.hpp>
#include <MDF/StreamDescriptor.h>
#include <EventBuilding/MEP_tools.hpp>
#include <EventBuilding/MFP_tools.hpp>
#include <AllenOnline/ReadMEP.h>
#include <AllenOnline/TransposeMEP.h>
using namespace std;
namespace po = boost::program_options;
int main( int argc, char* argv[] ) {
string filename, output_filename;
std::string tck_str;
ssize_t n_meps = 0;
unsigned run;
// Declare the supported options.
po::options_description desc( "Allowed options" );
// clang-format off
desc.add_options()
( "help,h", "produce help message" )
( "filename", po::value<string>( &filename ), "filename pattern" )
( "output", po::value<string>( &output_filename ), "output filename" )
( "n_meps,n", po::value<ssize_t>( &n_meps ), "number of MEPs" )
( "tck", po::value<std::string>( &tck_str ), "new TCK" )
( "run", po::value<unsigned>( &run ), "new run number" );
// clang-format on
po::positional_options_description p;
p.add( "filename", 1 );
p.add( "n_meps", 1 );
p.add( "output", 1 );
po::variables_map vm;
po::store( po::command_line_parser( argc, argv ).options( desc ).positional( p ).run(), vm );
po::notify( vm );
if ( vm.count( "help" ) ) {
std::cout << desc << "\n";
return 1;
}
unsigned tck = std::stoi( tck_str, nullptr, 16 );
SmartIF<IStateful> app = Gaudi::createApplicationMgr();
auto prop = app.as<IProperty>();
bool sc = prop->setProperty( "JobOptionsType", "\"NONE\"" ).isSuccess();
sc &= app->configure();
sc &= app->initialize();
sc &= app->start();
if ( !sc ) { return 1; }
SmartIF<ISvcLocator> sloc = app.as<ISvcLocator>();
auto msgSvc = sloc->service<IMessageSvc>( "MessageSvc" );
MsgStream info{msgSvc.get(), "allen_mep_read"};
info.activate();
auto gaudi_exit = [&app]( int code ) {
bool sc = app->stop().isSuccess();
sc &= app->finalize();
return code & !sc;
};
// Some storage for reading the events into
bool eof = false, success = false;
auto input = LHCb::StreamDescriptor::bind( filename );
if ( input.ioDesc != 0 ) {
info << "Opened " << filename << endmsg;
} else {
info << "Failed to open file " << filename << " " << strerror( errno ) << endmsg;
return gaudi_exit( 1 );
}
vector<char> data;
EventIDs event_ids;
MEP::Slices mep_slices( 1 );
auto& slice = mep_slices[0];
for ( ssize_t i_mep = 0; ( n_meps == -1 || i_mep < n_meps ) && !eof; ++i_mep ) {
std::tie( eof, success, slice.mep, slice.packing_factor, slice.mep_data ) = MEP::read_mep( input, data, info );
auto const* mep = slice.mep;
if ( !success ) {
return gaudi_exit( 1 );
} else {
std::cout << "Read mep with packing factor " << slice.packing_factor << " #MFPs: " << mep->header.n_MFPs << "\n";
}
if ( i_mep == 0 ) {
slice.blocks.resize( mep->header.n_MFPs, MEP::Blocks::value_type{} );
slice.offsets.resize( mep->header.n_MFPs );
for ( auto& offsets : slice.offsets ) { offsets.resize( slice.packing_factor + 1 ); }
}
std::function<void( size_t )> bad_mfp = []( size_t source_id ) {
auto const* sd = SourceId_sysstr( source_id );
std::cout << "ERROR: bad MFP for " << sd << " with source ID " << source_id << "\n";
};
auto odin_block_index = MEP::find_blocks( mep, slice.blocks, bad_mfp );
if ( !odin_block_index ) {
std::cout << "ERROR: No ODIN MFP"
<< "\n";
return gaudi_exit( 1 );
}
MEP::fragment_offsets( slice.blocks, slice.offsets );
auto const& odin_block = slice.blocks[*odin_block_index];
std::cout << "MEP with packing: " << std::setw( 4 ) << odin_block.header->n_banks << " event_id: " << std::setw( 6 )
<< odin_block.header->ev_id << "\n";
// Decode first ODIN
auto const& odin_offsets = slice.offsets[*odin_block_index];
auto decode_odin = [version = odin_block.header->block_version, &odin_block,
&odin_offsets]( unsigned event_number ) {
auto const* odin_data = reinterpret_cast<unsigned const*>( odin_block.payload + odin_offsets[event_number] );
LHCb::ODIN odin;
if ( version == 7 ) {
odin = LHCb::ODIN{{odin_data, 10}};
} else {
odin = LHCb::ODIN::from_version<6>( {odin_data, 10} );
}
return odin;
};
auto first_odin = decode_odin( 0 );
std::cout << "ODIN version: " << static_cast<unsigned>( first_odin.version() ) << " run: " << std::setw( 7 )
<< first_odin.runNumber() << " event: " << std::setw( 12 ) << first_odin.eventNumber() << "\n";
if ( !tck_str.empty() ) {
std::cout << "Changing TCK from " << fmt::format( "{:#010x}", first_odin.triggerConfigurationKey() ) << " to "
<< fmt::format( "{:#010x}", tck ) << "\n";
}
if ( run != 0 ) {
std::cout << "Changing run from " << std::to_string( first_odin.runNumber() ) << " to " << std::to_string( run )
<< "\n";
}
char* payload = const_cast<char*>( odin_block.payload );
for ( unsigned evt = 0; evt < slice.packing_factor; ++evt ) {
auto* odin_data = reinterpret_cast<unsigned*>( payload + odin_offsets[evt] );
using namespace LHCb::ODINImplementation;
if ( !tck_str.empty() ) {
details::set_bits<LHCb::ODIN::TriggerConfigurationKeySize, LHCb::ODIN::TriggerConfigurationKeyOffset>(
{odin_data, 10}, tck );
}
if ( run != 0 ) {
details::set_bits<LHCb::ODIN::RunNumberSize, LHCb::ODIN::RunNumberOffset>( {odin_data, 10}, run );
}
}
}
std::ofstream output_file{output_filename, std::ios::binary};
output_file.write( slice.mep_data.data(), slice.mep_data.size() );
output_file.close();
return gaudi_exit( 0 );
}
......@@ -15,6 +15,8 @@
#include <sys/types.h>
#include <unistd.h>
#include <fmt/format.h>
#include <boost/algorithm/string.hpp>
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
......@@ -80,19 +82,6 @@ namespace {
{{'P', 0x02, 0}, {'N', 0x04, 1}, {'L', 0x08, 2}, {'1', 0x10, 3}, {'2', 0x20, 4}}};
}
std::vector<char> contiguous_mfps( Allen::Slice const& mep_data ) {
// To make direct use of the offsets, the MFPs need to be copied
// into temporary storage
auto const& mfps = mep_data.fragments;
vector<char> mep_fragments( mep_data.fragments_mem_size, '\0' );
char* destination = &mep_fragments[0];
for ( gsl::span<char const> mfp : mfps ) {
::memcpy( destination, mfp.data(), mfp.size_bytes() );
destination += mfp.size_bytes();
}
return mep_fragments;
}
int main( int argc, char* argv[] ) {
string filename;
......@@ -102,12 +91,15 @@ int main( int argc, char* argv[] ) {
// Declare the supported options.
po::options_description desc( "Allowed options" );
desc.add_options()( "help,h", "produce help message" )( "filename", po::value<string>( &filename ),
"filename pattern" )(
"n_meps,n", po::value<ssize_t>( &n_meps ),
"number of events" )( "skip,s", po::value<size_t>( &n_skip )->default_value( 0 ), "number of events to skip" )(
"dump", po::value<string>( &dump ), "dump bank content (source_id_type,start_event[,end_event],bank_number" )(
"count-banks", "count raw banks by bank type" );
// clang-format off
desc.add_options()
( "help,h", "produce help message" )
( "filename", po::value<string>( &filename ), "filename pattern" )
( "n_meps,n", po::value<ssize_t>( &n_meps ), "number of MEPs" )
( "skip,s", po::value<size_t>( &n_skip )->default_value( 0 ), "number of events to skip" )
( "dump", po::value<string>( &dump ), "dump bank content (source_id_type,start_event[,end_event],bank_number" )
( "count-banks", "count raw banks by bank type" );
// clang-format on
po::positional_options_description p;
p.add( "filename", 1 );
......@@ -230,8 +222,6 @@ int main( int argc, char* argv[] ) {
for ( auto& offsets : slice.offsets ) { offsets.resize( slice.packing_factor + 1 ); }
}
if ( n_skip != 0 && n_skip-- > 0 ) continue;
std::function<void( size_t )> bad_mfp = []( size_t source_id ) {
auto const* sd = SourceId_sysstr( source_id );
std::cout << "ERROR: bad MFP for " << sd << " with source ID " << source_id << "\n";
......@@ -268,7 +258,8 @@ int main( int argc, char* argv[] ) {
// Decode first ODIN
auto const odin_index = to_integral( BankTypes::ODIN );
auto const& odin_slice = bank_slices[odin_index][0];
auto odin_banks = contiguous_mfps( odin_slice );
// There is always only a single ODIN MFP
auto odin_banks = odin_slice.fragments[0];
auto decode_odin = [& versions = banks_version, &odin_slice, &odin_banks]( unsigned event_number ) {
auto const& odin_offsets = odin_slice.offsets;
......@@ -285,8 +276,10 @@ int main( int argc, char* argv[] ) {
return odin;
};
auto first_odin = decode_odin( 0 );
std::cout << "ODIN version: " << first_odin.version() << " run: " << std::setw( 7 ) << first_odin.runNumber()
auto first_odin = decode_odin( 0 );
auto const tck = first_odin.triggerConfigurationKey();
std::cout << "ODIN version: " << static_cast<unsigned>( first_odin.version() ) << " TCK "
<< fmt::format( "{:#010x}", tck ) << " run: " << std::setw( 7 ) << first_odin.runNumber()
<< " event: " << std::setw( 12 ) << first_odin.eventNumber() << "\n";
// Print block information
......@@ -331,7 +324,10 @@ int main( int argc, char* argv[] ) {
<< padded_size << "\n";
}
size_t skip = n_skip;
for ( unsigned evt = 0; evt < slice.packing_factor; ++evt ) {
if ( skip != 0 && skip-- > 0 ) continue;
// Count bank types and DAQ errors per SD
auto bank_type = block.bank_types[evt];
if ( daq_errors.count( static_cast<LHCb::RawBank::BankType>( bank_type ) ) ) {
......@@ -342,6 +338,8 @@ int main( int argc, char* argv[] ) {
if ( dump_block ) {
for ( unsigned evt = dump_start; evt < ( dump_end == -1 ? slice.packing_factor : dump_end ); ++evt ) {
if ( n_skip != 0 && n_skip-- > 0 ) continue;
if ( dump_type == "ODIN" ) {
auto odin = decode_odin( evt );
......@@ -370,6 +368,8 @@ int main( int argc, char* argv[] ) {
MDF::dump_hex( block.payload + slice.offsets[i_block][evt], block.bank_sizes[evt] );
}
}
} else if ( n_skip != 0 ) {
n_skip -= std::min( n_skip, size_t{slice.packing_factor} );
}
}
}
......
......@@ -80,7 +80,7 @@ namespace MEP {
*/
std::tuple<bool, std::array<unsigned int, NBankTypes>, std::array<int, NBankTypes>> fill_counts( EB::MEP const* mep );
bool find_blocks( EB::MEP const* mep, Blocks& blocks, std::function<void( size_t )> const& badMFP );
std::optional<size_t> find_blocks( EB::MEP const* mep, Blocks& blocks, std::function<void( size_t )> const& badMFP );
void fragment_offsets( Blocks const& blocks, std::vector<std::vector<uint32_t>>& offsets );
......
......@@ -43,6 +43,7 @@ try:
dddb_tag = "run3/" + dddb_tag
conddb_tag = OnlineEnv.CondDBTag
initial_tck = getattr(OnlineEnv, "InitialTCK", 0)
ignore_odin_tck = int(os.getenv("IGNORE_ODIN_TCK", "0"))
except ImportError:
run_online = False
output_level = 3
......@@ -52,6 +53,7 @@ except ImportError:
dddb_tag = 'run3/trunk'
conddb_tag = 'master'
initial_tck = 0
ignore_odin_tck = 1
integration_test = False
......@@ -173,6 +175,7 @@ allen_conf.Partition = partition
allen_conf.PartitionBuffers = True
allen_conf.PartitionID = partition_id
allen_conf.EnableRunChanges = UseDD4Hep
allen_conf.TCKFromODIN = not ignore_odin_tck
if run_online:
from Configurables import Allen__MBMOutput as MBMOutput
......@@ -256,10 +259,9 @@ messageSvc.OutputLevel = 3
# Add the services that will produce the non-event-data
monSink = OnlMonitorSink(
CountersToPublish=[("Bursts", "IN"), ("Bursts", "OUT"), ("Events", "IN"),
("Events", "OUT"), ("Events", "MB_IN"),
("Events", "MB_OUT"), ("gather_selections",
"Hlt1.*Pass"),
CountersToPublish=[("Bursts", ".*"), ("Events", ".*"), ("MBMOutput", ".*"),
("EventLoop", ".*"), ("gather_selections",
"Hlt1.*Pass"),
("gather_selections", "Hlt1.*Rate"),
("velo_consolidate_tracks_ca09ac3f", "n_velo_tracks"),
("scifi_consolidate_seeds_7b68a248", "n_seed_tracks"),
......
......@@ -198,7 +198,7 @@ int AllenApplication::configureApplication() {
return Online::ONLINE_ERROR;
}
std::string const& sequence = m_allenConfig->sequence();
auto const& [sequence, source] = m_allenConfig->sequence();
if ( sequence.empty() ) {
m_logger->error( "Failed to obtain sequence" );
return Online::ONLINE_ERROR;
......@@ -211,6 +211,7 @@ int AllenApplication::configureApplication() {
{"params", m_allenConfig->paramDir},
{"device", m_allenConfig->device.value()},
{"s", std::to_string( m_nSlices )},
{"tck-from-odin", std::to_string( m_allenConfig->tckFromODIN.value() )},
{"disable-run-changes", std::to_string( !m_allenConfig->runChanges.value() )},
{"monitoring-filename", ""}};
......@@ -223,7 +224,7 @@ int AllenApplication::configureApplication() {
m_allenControl = m_zmqSvc->socket( zmq::PAIR );
m_allenControl->bind( m_controlConnection.c_str() );
m_allenThread = std::thread{&AllenApplication::allenLoop, this, std::move( sequence )};
m_allenThread = std::thread{&AllenApplication::allenLoop, this, sequence, source};
zmq::pollitem_t items[] = {{*m_allenControl, 0, zmq::POLLIN, 0}};
m_zmqSvc->poll( &items[0], 1, -1 );
......@@ -292,8 +293,9 @@ int AllenApplication::continueProcessing() {
return OnlineApplication::continueProcessing();
}
void AllenApplication::allenLoop( std::string config ) {
auto status = allen( m_options, config, this, m_provider, m_output, m_zmqSvc.get(), m_controlConnection );
void AllenApplication::allenLoop( std::string_view config, std::string_view config_source ) {
auto status =
allen( m_options, config, config_source, this, m_provider, m_output, m_zmqSvc.get(), m_controlConnection );
if ( status != 0 ) {
m_logger->error( "Allen event loop exited with error" );
error();
......
......@@ -61,7 +61,7 @@ public:
int continueProcessing() override;
// Main function running the Allen event loop
void allenLoop( std::string config );
void allenLoop( std::string_view config, std::string_view config_source );
bool initMPI();
......
......@@ -29,12 +29,15 @@ StatusCode AllenConfiguration::queryInterface( const InterfaceID& riid, void** p
AllenConfiguration::AllenConfiguration( std::string name, ISvcLocator* svcloc ) : Service( name, svcloc ) {}
std::string AllenConfiguration::sequence() const {
if ( !m_sequence.empty() ) { return m_sequence; }
std::tuple<std::string, std::string> AllenConfiguration::sequence() const {
if ( !m_sequence.empty() ) { return {m_sequence, "json"}; }
std::string source;
if ( !m_tck.empty() ) {
// Load from TCK
auto repo = m_json.value();
auto repo = m_json.value();
source = repo + ":" + m_tck;
LHCb::TCK::Info tck_info{};
std::tie( m_sequence, tck_info ) = Allen::sequence_from_git( repo, m_tck );
......@@ -57,9 +60,10 @@ std::string AllenConfiguration::sequence() const {
} else {
info() << "Configuring Allen from JSON file " << m_json << endmsg;
m_sequence = std::string{std::istreambuf_iterator<char>{sequence_file}, std::istreambuf_iterator<char>{}};
source = m_json.value();
}
}
return m_sequence;
return {m_sequence, source};
}
AllenConfiguration::~AllenConfiguration() {}
......@@ -12,6 +12,7 @@
#include <filesystem>
#include <Allen/Provider.h>
#include <GaudiKernel/Service.h>
class AllenConfiguration : public Service {
......@@ -29,7 +30,7 @@ public:
virtual ~AllenConfiguration();
std::string sequence() const;
std::tuple<std::string, std::string> sequence() const;
Gaudi::Property<float> stopTimeout{this, "StopTimeout", 5.};
Gaudi::Property<unsigned int> nThreads{this, "NThreads", 8};
......@@ -51,6 +52,7 @@ public:
Gaudi::Property<unsigned> partitionID{this, "PartitionID", 0};
Gaudi::Property<bool> partitionBuffers{this, "PartitionBuffers", true};
Gaudi::Property<std::string> partition{this, "Partition", ""};
Gaudi::Property<bool> tckFromODIN{this, "TCKFromODIN", true};
private:
static std::string resolveEnvVars( std::string s ) {
......@@ -70,12 +72,11 @@ private:
return;
}
auto const json_path = resolveEnvVars( m_json.value() );
std::regex tck_option{"([^:]+):(0x[a-fA-F0-9]{8})"};
std::smatch tck_match;
if ( std::regex_match( json_path, tck_match, tck_option ) ) {
std::filesystem::path j{tck_match.str( 1 )};
m_tck = tck_match.str( 2 );
auto const json_path = resolveEnvVars( m_json.value() );
auto [from_tck, repo, tck] = Allen::config_from_tck( json_path );
if ( from_tck ) {
std::filesystem::path j{repo};
m_tck = tck;
if ( !std::filesystem::exists( j ) || !std::filesystem::is_directory( j ) ) {
throw GaudiException{"Git repository " + json_path + " does not exist or is not a directory", name(),
StatusCode::FAILURE};
......
......@@ -58,7 +58,7 @@ DECLARE_COMPONENT( MEPProvider )
// allocated by the BufferManager is registered with the device
// runtime.
MEPProvider::MEPProvider( std::string name, ISvcLocator* loc ) : Service{name, loc}, m_mfp_count{0} {}
MEPProvider::MEPProvider( std::string name, ISvcLocator* loc ) : BaseClass{name, loc}, m_mfp_count{0} {}
EventIDs MEPProvider::event_ids( size_t slice_index, std::optional<size_t> first, std::optional<size_t> last ) const {
auto const& ids = m_event_ids[slice_index];
......@@ -270,7 +270,7 @@ void MEPProvider::copy_banks( size_t const slice_index, unsigned int const event
StatusCode MEPProvider::initialize() {
auto sc = Service::initialize();
auto sc = BaseClass::initialize();
if ( !sc.isSuccess() ) return sc;
// initialize bank version, needed for banks of subdetectors not present in input data
......@@ -291,15 +291,14 @@ StatusCode MEPProvider::initialize() {
}
m_allenConfig = config.get();
std::string const& sequence = m_allenConfig->sequence();
if ( sequence.empty() ) {
error() << "Failed to obtain sequence" << endmsg;
return StatusCode::FAILURE;
}
if ( m_all_bank_types.value() ) {
m_bank_types = DataBankTypes;
} else {
auto const& [sequence, source] = m_allenConfig->sequence();
if ( sequence.empty() ) {
error() << "Failed to obtain sequence" << endmsg;
return StatusCode::FAILURE;
}
const auto config_reader = ConfigurationReader( sequence );
m_bank_types = config_reader.configured_bank_types();
}
......@@ -397,7 +396,7 @@ StatusCode MEPProvider::initialize() {
StatusCode MEPProvider::start() {
if ( m_started ) { return StatusCode::SUCCESS; }
auto sc = Service::start();
auto sc = BaseClass::start();
if ( !sc.isSuccess() ) return sc;
std::unique_lock<std::mutex> lock{m_control_mutex};
......@@ -497,7 +496,7 @@ StatusCode MEPProvider::stop() {
m_input_started = 0;
}
return Service::stop();
return BaseClass::stop();
};
StatusCode MEPProvider::finalize() {
......@@ -507,7 +506,7 @@ StatusCode MEPProvider::finalize() {
m_control_cond.notify_all();
for ( auto& transpose_thread : m_transpose_threads ) { transpose_thread.join(); }
return Service::finalize();
return BaseClass::finalize();
}
bool MEPProvider::release_buffers() {
......@@ -528,20 +527,6 @@ bool MEPProvider::release_buffers() {
if ( bmid != MBM_INV_DESC ) { ::mbm_exclude( bmid ); }
m_bmIDs[b] = MBM_INV_DESC;
}
for ( auto& status : m_buffer_status ) {
status.writable = true;
status.work_counter = 0;
status.intervals.clear();
}
m_buffer_reading = m_buffer_status.begin();
for ( size_t i = 0; i < m_slice_free.size(); ++i ) m_slice_free[i] = true;
for ( size_t i = 0; i < m_odins.size(); ++i ) m_odins[i].fill( 0 );
for ( size_t i = 0; i < m_slice_to_buffer.size(); ++i ) m_slice_to_buffer[i] = {-1, 0, 0};
m_transposed.clear();
return true;
}
......
......@@ -56,6 +56,15 @@ namespace MEP {
class AllenConfiguration;
// backward compatibility with Gaudi < v39 where FSMCallbackHolder does not exist
#include <GAUDI_VERSION.h>
#if GAUDI_MAJOR_VERSION < 39
using BaseClass = Service;
#else
# include <Gaudi/FSMCallbackHolder.h>
using BaseClass = Gaudi::FSMCallbackHolder<Service>;
#endif
/**
* @brief Configuration parameters for the MEPProvider
*/
......@@ -82,7 +91,7 @@ class AllenConfiguration;
* @param Configuration struct
*
*/
class MEPProvider final : public Service, public InputProvider {
class MEPProvider final : public BaseClass, public InputProvider {
public:
MEPProvider( std::string name, ISvcLocator* loc );
......
......@@ -63,18 +63,19 @@ LHCb::ODIN MEP::decode_odin( char const* odin_data, unsigned const offset, unsig
}
}
bool MEP::find_blocks( EB::MEP const* mep, Blocks& blocks, std::function<void( size_t )> const& bad_mfp ) {
std::optional<size_t> MEP::find_blocks( EB::MEP const* mep, Blocks& blocks,
std::function<void( size_t )> const& bad_mfp ) {
// Fill blocks in temporary container
bool found_odin = false;
size_t n_good = 0;
std::optional<size_t> odin_block_index;
size_t n_good = 0;
blocks.resize( mep->header.n_MFPs );
for ( size_t i_block = 0; i_block < blocks.size(); ++i_block ) {
EB::MFP const* mfp = mep->at( i_block );
if ( mfp->is_header_valid() ) {
blocks[n_good] = Block{mfp};
if ( SourceId_sys( mfp->header.src_id ) == SourceIdSys::SourceIdSys_ODIN ) odin_block_index = n_good;
++n_good;
if ( SourceId_sys( mfp->header.src_id ) == SourceIdSys::SourceIdSys_ODIN ) found_odin = true;
} else {
bad_mfp( mep->header.src_ids()[i_block] );
}
......@@ -89,7 +90,7 @@ bool MEP::find_blocks( EB::MEP const* mep, Blocks& blocks, std::function<void( s
[]( size_t s, const MEP::Block& b ) { return s + b.header->bytes(); } );
assert( total_block_size <= mep->bytes() );
#endif
return found_odin;
return odin_block_index;
}
void MEP::fragment_offsets( MEP::Blocks const& blocks, MEP::SourceOffsets& offsets ) {
......
......@@ -47,11 +47,9 @@ if not UseDD4Hep:
app.DDDBtag = "upgrade/dddb-20221004"
app.CondDBtag = "upgrade/mu_VP_SciFi_macromicrosurvey_from20220923"
else:
app.DDDBtag = "trunk"
app.DDDBtag = "run3/trunk"
app.CondDBtag = "master"
# Decode VP, UT, FT and muons
check_seq = GaudiSequencer("CheckODINSeq")
unpack_raw = UnpackRawEvent(
......
......@@ -16,7 +16,7 @@
<argument name="args"><set>
<text>${BINARYDUMPERSROOT}/options/allen.py</text>
<text>--monitoring-filename</text><text>mep_lumi.root</text>
<text>-m</text><text>1000</text>
<text>-m</text><text>300</text>
<text>-t</text><text>1</text>
<text>--events-per-slice</text><text>1000</text>
<text>--tags</text><text>detdesc:upgrade/dddb-20221004,upgrade/mu_VP_SciFi_macromicrosurvey_from20220923|dd4hep:run3/trunk,master</text>
......@@ -24,6 +24,7 @@
<text>--mep</text><text>mdf:root://eoslhcb.cern.ch///eos/lhcb/wg/rta/samples/data/289232-LHCb-MEP/bu_289232_LHCb_ECEB01_BU_0.mep</text>
<text>--output-file</text><text>mep_lumi.mdf</text>
<text>--register-monitoring-counters</text><text>0</text>
<text>--real-data</text>
</set></argument>
<argument name="prerequisites"><set>
<tuple><text>generate_passthrough_sequence</text><enumeral>PASS</enumeral></tuple>
......@@ -33,7 +34,7 @@
</set></argument>
<argument name="use_temp_dir"><enumeral>true</enumeral></argument>
<argument name="reference"><text>../refs/mep_lumi.ref</text></argument>
<argument name="timeout"><integer>3000</integer></argument>
<argument name="timeout"><integer>100</integer></argument>
<argument name="validator"><text>
from Allen.qmtest.exclusions import preprocessor
......
......@@ -6,7 +6,7 @@
#include "$INFO_OPTIONS"
#include "$FARMCONFIGROOT/options/Logging.opts"
// The minimum buffer size that will fit a 2022 MEP is 1200000 kB
OnlineEnv.MBM_setup = "-s=1200000 -e=50 -u=50 -b=18 -t=1 -y -i=Events -f -c -s=100000 -e=50 -u=15 -b=12 -t=1 -y -i=Output -f -c";
OnlineEnv.MBM_setup = "-s=1600000 -e=50 -u=50 -b=18 -t=1 -y -i=Events -f -c -s=100000 -e=50 -u=15 -b=12 -t=1 -y -i=Output -f -c";
//
Manager.Setup = {"Dataflow_MBMServer/MEPManager"};
//
......
......@@ -450,36 +450,38 @@ async def tasks_measure_throughput(tasks,
return await measure_throughput(utgids, max_duration, print_throughput)
async def wait_for_output(utgids):
def diff(start, end):
return (end[1] - start[1])
async def hlt1_wait_for_output(tasks, prod_svc, proc_svc):
async with AsyncExitStack() as stack:
services = [
stack.enter_context(
asyncdim.DimService(u + "/Writer/EventsOut", "X"))
for u in utgids
]
# get the first data point per task
meas = [[(await s.get())[1]] for s in services]
log.debug(str(meas))
da = []
while len(da) < 4 or da[-1] > 0 or da[-2] > 0:
diffs = []
for s, m, utgid in zip(services, meas, utgids):
m.extend(await s.get_all())
log.debug(str(meas))
diffs.append(diff(m[-2], m[-1]))
log.info(
f"{utgid}: {diffs[-1]:d} events written ({m[-1][1]} events)"
)
da.append(sum(diffs))
task_services = {}
for t in tasks:
task_services[t.utgid] = [
stack.enter_context(
asyncdim.DimService(f"{t.utgid}/{s}", "X"))
for s in (prod_svc, proc_svc)
]
async def tasks_wait_for_output(tasks, type_pattern=r".*Writer.*"):
utgids = [t.utgid for t in tasks]
utgids = [u for u in utgids if re.match(type_pattern, u.split("_")[2])]
return await wait_for_output(utgids)
processed = []
meas = {u: [[] for s in svcs] for u, svcs in task_services.items()}
while True:
for u, svcs in task_services.items():
for i, s in enumerate(svcs):
meas[u][i].extend(
x[1] for x in await s.get_all() if x[1] is not None)
log.debug(str(meas))
done = []
diff = 0
for u, (prod_counts, proc_counts) in meas.items():
latest = min(len(prod_counts), len(proc_counts))
if latest >= 2:
done.append((prod_counts[latest - 1] -
proc_counts[latest - 1]) <= 0)
diff += (proc_counts[latest - 1] - proc_counts[latest - 2])
processed.append(diff)
if done and all(done) and len(processed) > 2 and processed[
-1] == 0 and processed[-2] == 0:
break
async def tasks_get_counter(tasks, counter_name):
......
......@@ -19,7 +19,7 @@ from MooreScripts.testbench.emulator import (
tasks_send_command,
tasks_wait_for_exit,
tasks_measure_throughput,
tasks_wait_for_output,
hlt1_wait_for_output,
tasks_wait_for_value,
async_input,
)
......@@ -105,20 +105,23 @@ async def run(tasks: List[emulator.Task], args, extra_argv):
n_events_produced = next(
v for ts, v in reversed(await dim_prod_out.get_all()) if v is not None)
if "HLT1" in main_tasks[0].utgid:
# if there is a writer, wait for the output rate to be 0
if any("Writer" in task.utgid for task in tasks):
await tasks_wait_for_output(tasks)
elif "HLT2" in main_tasks[0].utgid and not args.measure_throughput > 0:
log.info(f"Waiting to process all {n_events_produced} events")
n_events_processed = sum(await tasks_wait_for_value(
main_tasks,
"Events/OUT",
lambda vs: sum(vs) >= n_events_produced,
))
if n_events_processed > n_events_produced:
log.error(f"Produced {n_events_produced} but processed " +
f"more: {n_events_processed}")
if not args.measure_throughput > 0:
if "HLT1" in main_tasks[0].utgid:
log.info(f"Waiting until all events have been processed")
await hlt1_wait_for_output(main_tasks, "Events/OUT",
"MBMOutput/NProcessed")
elif "HLT2" in main_tasks[0].utgid:
log.info(
f"Waiting until all {n_events_produced} events have been processed"
)
n_events_processed = sum(await tasks_wait_for_value(
main_tasks,
"Events/OUT",
lambda vs: sum(vs) >= n_events_produced,
))
if n_events_processed > n_events_produced:
log.error(f"Produced {n_events_produced} but processed " +
f"more: {n_events_processed}")
await tasks_send_command([t for t in tasks if t not in prod_tasks], "stop")
await tasks_wait_for_status([t for t in tasks if t not in prod_tasks],
......
......@@ -129,6 +129,12 @@ parser.add_argument(
help=
"Enables writing of the encoding keys by setting env WRITE_ENCODING_KEYS=1.",
)
parser.add_argument(
"--tck-from-odin",
action="store_true",
help=
"Enables writing of the encoding keys by setting env WRITE_ENCODING_KEYS=1.",
)
args, unknown_argv = parser.parse_known_args()
args.data_dir = args.working_dir / args.data_dir
......@@ -168,6 +174,11 @@ if args.write_encoding_keys:
arch,
{"WRITE_ENCODING_KEYS": "1"},
)
if args.tck_from_odin:
arch = architecture.overwrite_dict_value(
arch,
{"IGNORE_ODIN_TCK": "0"},
)
task_instance_args = architecture.instance_args(arch, replacements)
emulator.check_for_orphans([a["args"][0] for a in task_instance_args])
......
......@@ -42,6 +42,7 @@
<fmcparam name="define" value="WORKING_DIR=${WORKING_DIR}" />
<fmcparam name="define" value="MBM_SETUP_OPTIONS=${MOORESCRIPTSROOT}/tests/options/HLT1/MBM_setup.opts" />
<fmcparam name="define" value="BIND_NUMA=1" />
<fmcparam name="define" value="IGNORE_ODIN_TCK=1" />
<timeout action="Any" value="20"/>
<timeout action="configure" value="70"/>
<timeout action="start" value="80"/>
......
......@@ -37,11 +37,12 @@
<command>${MOORESCRIPTSROOT}/job/runHLT1.sh</command>
<argument name="-type" value="${NAME}"/>
<argument name="-runinfo" value="${RUNINFO}"/>
<argument name="-numthreads" value="2" />
<argument name="-numthreads" value="4" />
<fmcparam name="utgid" value="${PARTITION}_${NODE}_${NAME}_${INSTANCE}"/>
<fmcparam name="define" value="BINARY_TAG=${BINARY_TAG}" />
<fmcparam name="define" value="WORKING_DIR=${WORKING_DIR}" />
<fmcparam name="define" value="MBM_SETUP_OPTIONS=${MOORESCRIPTSROOT}/tests/options/HLT1Slim/MBM_setup.opts" />
<fmcparam name="define" value="IGNORE_ODIN_TCK=1" />
<timeout action="Any" value="20"/>
<timeout action="configure" value="70"/>
<timeout action="start" value="80"/>
......