diff --git a/AllenOnline/application/read_mep.cpp b/AllenOnline/application/read_mep.cpp index f2dda47c7545369f24cc59d2fd46a12e719b3992..a589627482f0c5c2477184ec6eba53dbd4687829 100644 --- a/AllenOnline/application/read_mep.cpp +++ b/AllenOnline/application/read_mep.cpp @@ -213,7 +213,11 @@ int main( int argc, char* argv[] ) { if ( n_skip != 0 && n_skip-- > 0 ) continue; - MEP::find_blocks( mep, slice.blocks ); + std::function<void( size_t )> bad_mfp = []( size_t source_id ) { + auto const* sd = SourceId_sysstr( source_id ); + std::cout << "ERROR: bad MFP for " << sd << " with source ID " << source_id << "\n"; + }; + MEP::find_blocks( mep, slice.blocks, bad_mfp ); MEP::fragment_offsets( slice.blocks, slice.offsets ); diff --git a/AllenOnline/include/AllenOnline/TransposeMEP.h b/AllenOnline/include/AllenOnline/TransposeMEP.h index f120d9a3afd37357ab18f1d9e20fb5bba55f78cb..de306857aa2b6a9d3b974332b715ab09e5f76d63 100644 --- a/AllenOnline/include/AllenOnline/TransposeMEP.h +++ b/AllenOnline/include/AllenOnline/TransposeMEP.h @@ -70,17 +70,14 @@ namespace MEP { /** * @brief Fill the array the contains the number of banks per type * - * @details detailed description + * @param MEP * - * @param EB::Header for a MEP - * @param span of the block data in the MEP - * @param LHCb::RawBank::BankType to Allen bank type mapping - * - * @return (success, number of banks per bank type; 0 if the bank is not needed, bank version per type) + * @return (success, number of banks per bank type; 0 if the bank is not needed, bank version per type, map of + * source ID to bin label) */ std::tuple<bool, std::array<unsigned int, NBankTypes>, std::array<int, NBankTypes>> fill_counts( EB::MEP const* mep ); - void find_blocks( EB::MEP const* mep, Blocks& blocks ); + bool find_blocks( EB::MEP const* mep, Blocks& blocks, std::function<void( size_t )> const& badMFP ); void fragment_offsets( Blocks const& blocks, std::vector<std::vector<uint32_t>>& offsets ); diff --git 
a/AllenOnline/src/MEPProvider.cpp b/AllenOnline/src/MEPProvider.cpp index 6e3480069c865f9fc6d0f497c512c0236cce7df8..118b02db80d439edc459b860ed0da62c19fb0a71 100755 --- a/AllenOnline/src/MEPProvider.cpp +++ b/AllenOnline/src/MEPProvider.cpp @@ -695,8 +695,8 @@ size_t MEPProvider::count_writable() const { []( size_t s, BufferStatus const& stat ) { return s + stat.writable; } ); } -bool MEPProvider::allocate_storage( size_t i_read ) { - if ( m_sizes_known ) return true; +std::tuple<bool, bool> MEPProvider::allocate_storage( size_t i_read ) { + if ( m_sizes_known ) return {true, false}; // Count number of banks per flavour bool count_success = false; @@ -725,6 +725,44 @@ bool MEPProvider::allocate_storage( size_t i_read ) { } std::tie( count_success, m_mfp_count, m_banks_version ) = MEP::fill_counts( mep ); + if ( !count_success ) { + ++m_alloc_tries; + if ( m_alloc_tries < m_allocate_retries.value() ) { + return {false, false}; + } else { + error() << "Memory allocation failed" << endmsg; + error() << "Failed to obtain good MEP with all MFPs valid after " << m_allocate_retries.value() + << " tries; giving up." << endmsg; + return {false, true}; + } + } + + // Allocate histogram that keeps track of how many bad MFPs there + // are per source ID. We know that the MEP we got has all MFPs + // valid. 
+ std::vector<std::string> bin_labels; + bin_labels.reserve( mep->header.n_MFPs ); + std::vector<EB::MFP const*> mfps( mep->header.n_MFPs ); + + // Sort the MFPs by source ID + for ( size_t i = 0; i < mep->header.n_MFPs; ++i ) { mfps[i] = mep->at( i ); } + std::sort( mfps.begin(), mfps.end(), + []( EB::MFP const* a, EB::MFP const* b ) { return a->header.src_id < b->header.src_id; } ); + + // Loop over the sorted MFPs and fill both the mapping from source + // ID to bin and the bin labels + int n_bins = std::accumulate( mfps.begin(), mfps.end(), 0, [this, &bin_labels]( int s, EB::MFP const* mfp ) { + auto const source_id = mfp->header.src_id; + m_mfp_bin_mapping.emplace( source_id, s ); + auto const* sd = SourceId_sysstr( source_id ); + bin_labels.emplace_back( std::to_string( source_id ) + " (" + sd + " " + + std::to_string( SourceId_num( source_id ) ) + ")" ); + return s + 1; + } ); + m_bad_mfps = std::make_unique<Histo1U>( + this, "BadMFPs", "Bad MFPs per source ID", + Gaudi::Accumulators::Axis<float>{ + static_cast<unsigned>( n_bins ), -0.5f, n_bins - 0.5f, {}, std::move( bin_labels )} ); // Allocate slice memory that will contain transposed banks ready // for processing by the Allen kernels @@ -761,13 +799,8 @@ bool MEPProvider::allocate_storage( size_t i_read ) { m_slice_to_buffer = std::vector<std::tuple<int, size_t, size_t>>( m_nslices, std::tuple{-1, 0ul, 0ul} ); - if ( !count_success ) { - error() << "Failed to determine bank counts" << endmsg; - return false; - } else { - m_sizes_known = true; - return true; - } + m_sizes_known = true; + return {true, false}; } bool MEPProvider::open_file() const { @@ -829,8 +862,19 @@ bool MEPProvider::prepare_mep( size_t i_buffer, size_t n_events ) { return false; } - // Fill blocks - MEP::find_blocks( slice.mep, slice.blocks ); + // Find blocks + std::function<void( size_t )> bad_mfp = [this]( size_t source_id ) { + auto const sd_index = SourceId_sys( source_id ); + ++m_bad_mfps_sd[sd_index]; + auto it = 
m_mfp_bin_mapping.find( source_id ); + if ( it == m_mfp_bin_mapping.end() ) { + warning() << "Failed to find bin in bad MFP histogram for source ID " << source_id << endmsg; + } else { + ++( *m_bad_mfps )[it->second]; + } + }; + + if ( !MEP::find_blocks( slice.mep, slice.blocks, bad_mfp ) ) { return false; } // Fill fragment offsets MEP::fragment_offsets( slice.blocks, slice.offsets ); @@ -994,32 +1038,58 @@ void MEPProvider::mep_read() { } } - if ( m_read_error || ( !m_sizes_known && !allocate_storage( i_buffer ) ) ) { - read_error(); + if ( m_read_error ) { break; + } else if ( !m_sizes_known ) { + auto [success, give_up] = allocate_storage( i_buffer ); + if ( !success ) { + if ( !give_up ) { + m_buffer_reading->writable = false; + if ( preloading ) { + preloaded_buffer[i_buffer] = false; + --preloaded; + } + continue; + } else { + read_error(); + break; + } + } } // Notify a transpose thread that a new buffer of events is // ready. If prefetching is done, wake up all threads if ( !m_read_error ) { - { - - auto const n_events = ( to_read ? to_publish : size_t{slice.packing_factor} ); + auto const n_events = ( to_read ? 
to_publish : size_t{slice.packing_factor} ); - if ( !eof && to_publish != 0 ) { + if ( !eof && to_publish != 0 ) { + // Set intervals for offset calculation/transposition + auto has_odin = prepare_mep( i_buffer, n_events ); + if ( !has_odin ) { + ++m_mep_without_odin; + std::unique_lock<std::mutex> lock{m_buffer_mutex}; + m_buffer_status[i_buffer].writable = true; + if ( preloading ) { + preloaded_buffer[i_buffer] = false; + --preloaded; + } + continue; + } else { // Monitor this MEP ( *m_mepsInput ) += 1; ( *m_eventsInput ) += n_events; ( *m_mbInput ) += ( 2 * slice.mep->bytes() + 1 ) / ( 2 * 1024 * 1024 ); - - // Set intervals for offset calculation/transposition - if ( !prepare_mep( i_buffer, n_events ) ) break; - } else { - // We didn't read anything, so free the buffer we got again - std::unique_lock<std::mutex> lock{m_buffer_mutex}; - m_buffer_status[i_buffer].writable = true; + } + } else { + // We didn't read anything, so free the buffer we got again + std::unique_lock<std::mutex> lock{m_buffer_mutex}; + m_buffer_status[i_buffer].writable = true; + if ( preloading ) { + preloaded_buffer[i_buffer] = false; + --preloaded; } } + if ( receive_done ) { m_input_done = receive_done; if ( msgLevel( MSG::DEBUG ) ) debug() << "Prefetch notifying all" << endmsg; @@ -1175,61 +1245,75 @@ void MEPProvider::mpi_read() { auto const* mfp = mep->at( 0 ); slice.packing_factor = mfp->header.n_banks; - if ( !m_sizes_known && !allocate_storage( i_buffer ) ) { - read_error(); - break; + if ( !m_sizes_known ) { auto [success, give_up] = allocate_storage( i_buffer ); + if ( !success ) { + if ( !give_up ) { + m_buffer_reading->writable = false; + continue; + } else { + read_error(); + break; + } } + } - auto& [meps_received, bytes_received] = data_received[receiver]; - bytes_received += mep_size; - meps_received += 1; - - ( *m_mepsInput ) += 1; - ( *m_eventsInput ) += slice.packing_factor; - ( *m_mbInput ) += ( 2 * mep_size + 1 ) / ( 2 * 1024 * 1024 ); - - if ( t.get_elapsed_time() >=
reporting_period ) { - const auto seconds = t.get_elapsed_time(); - auto total_rate = 0.; - auto total_bandwidth = 0.; - for ( size_t i_rec = 0; i_rec < m_receivers.size(); ++i_rec ) { - auto& [mr, br] = data_received[i_rec]; - auto [rec_rank, rec_node] = m_domains[i_rec]; - - const double rate = (double)mr / seconds; - const double bandwidth = ( (double)( br * 8 ) ) / ( 1024 * 1024 * 1024 * seconds ); - total_rate += rate; - total_bandwidth += bandwidth; - printf( "[%lf, %lf] Throughput: %lf MEP/s, %lf Gb/s; Domain %2i; Rank %2i\n", t_origin.get_elapsed_time(), - seconds, rate, bandwidth, rec_node, rec_rank ); - - br = 0; - mr = 0; - } - if ( m_receivers.size() > 1 ) { - printf( "[%lf, %lf] Throughput: %lf MEP/s, %lf Gb/s\n", t_origin.get_elapsed_time(), seconds, total_rate, - total_bandwidth ); - } - t.restart(); + auto& [meps_received, bytes_received] = data_received[receiver]; + bytes_received += mep_size; + meps_received += 1; + + if ( t.get_elapsed_time() >= reporting_period ) { + const auto seconds = t.get_elapsed_time(); + auto total_rate = 0.; + auto total_bandwidth = 0.; + for ( size_t i_rec = 0; i_rec < m_receivers.size(); ++i_rec ) { + auto& [mr, br] = data_received[i_rec]; + auto [rec_rank, rec_node] = m_domains[i_rec]; + + const double rate = (double)mr / seconds; + const double bandwidth = ( (double)( br * 8 ) ) / ( 1024 * 1024 * 1024 * seconds ); + total_rate += rate; + total_bandwidth += bandwidth; + printf( "[%lf, %lf] Throughput: %lf MEP/s, %lf Gb/s; Domain %2i; Rank %2i\n", t_origin.get_elapsed_time(), + seconds, rate, bandwidth, rec_node, rec_rank ); + + br = 0; + mr = 0; + } + if ( m_receivers.size() > 1 ) { + printf( "[%lf, %lf] Throughput: %lf MEP/s, %lf Gb/s\n", t_origin.get_elapsed_time(), seconds, total_rate, + total_bandwidth ); } + t.restart(); + } - // Notify a transpose thread that a new buffer of events is - // ready. 
If prefetching is done, wake up all threads - if ( !mpi_error ) { - if ( !prepare_mep( i_buffer, size_t{slice.packing_factor} ) ) break; + // Notify a transpose thread that a new buffer of events is + // ready. If prefetching is done, wake up all threads + if ( !mpi_error ) { + auto has_odin = prepare_mep( i_buffer, size_t{slice.packing_factor} ); + if ( !has_odin ) { + ++m_mep_without_odin; + std::unique_lock<std::mutex> lock{m_buffer_mutex}; + m_buffer_status[i_buffer].writable = true; + continue; + } else { + // Monitor this MEP + ( *m_mepsInput ) += 1; + ( *m_eventsInput ) += slice.packing_factor; + ( *m_mbInput ) += ( 2 * slice.mep->bytes() + 1 ) / ( 2 * 1024 * 1024 ); if ( msgLevel( MSG::DEBUG ) ) debug() << "Prefetch notifying one" << endmsg; m_transpose_cond.notify_one(); } - - current_mep++; } - if ( !m_done ) { - m_done = true; - if ( msgLevel( MSG::DEBUG ) ) debug() << "Prefetch notifying all" << endmsg; - m_transpose_cond.notify_all(); - } + current_mep++; +} + +if ( !m_done ) { + m_done = true; + if ( msgLevel( MSG::DEBUG ) ) debug() << "Prefetch notifying all" << endmsg; + m_transpose_cond.notify_all(); +} #endif } @@ -1313,13 +1397,27 @@ void MEPProvider::bm_read( const std::string& buffer_name ) { #endif auto sc = ::mbm_get_event( m_bmIDs[i_buffer], &ev_data, &ev_len, &ev_type, trmask, partitionID ); if ( sc == MBM_NORMAL ) { - { + auto const* mep = reinterpret_cast<EB::MEP const*>( ev_data ); + + // Check incoming MEP + if ( mep == nullptr || !mep->is_magic_valid() || mep->header.n_MFPs == 0 ) { + if ( mep == nullptr ) + ++m_mep_null; + else if ( !mep->is_magic_valid() ) + ++m_mep_bad_magic; + else + ++m_mep_empty; + + ::mbm_free_event( m_bmIDs[i_buffer] ); + buffer_reading->writable = true; + + continue; + } else { std::unique_lock<std::mutex> lock{m_mbm_mutex}; m_buffer_event[i_buffer] = true; } - // info() << "Got MEP " << i_buffer << endmsg; - slice.mep = reinterpret_cast<EB::MEP const*>( ev_data ); + slice.mep = mep; slice.mep_data =
{reinterpret_cast<char const*>( ev_data ), slice.mep->bytes()}; slice.slice_size = static_cast<size_t>( ev_len ); auto const* mfp = slice.mep->at( 0 ); @@ -1338,9 +1436,16 @@ void MEPProvider::bm_read( const std::string& buffer_name ) { } if ( !m_sizes_known && !cancelled ) { - if ( !allocate_storage( i_buffer ) ) { - read_error(); - break; + auto [success, give_up] = allocate_storage( i_buffer ); + if ( !success ) { + if ( !give_up ) { + ::mbm_free_event( m_bmIDs[i_buffer] ); + buffer_reading->writable = true; + continue; + } else { + read_error(); + break; + } } else { m_control_cond.notify_all(); } @@ -1350,12 +1455,19 @@ void MEPProvider::bm_read( const std::string& buffer_name ) { // Notify a transpose thread that a new buffer of events is // ready. If prefetching is done, wake up all threads - if ( !cancelled && !prepare_mep( i_buffer, size_t{slice.packing_factor} ) ) { - break; - } else if ( !cancelled ) { - debug() << "Prefetch notifying transpose threads" << endmsg; - m_transpose_cond.notify_all(); - } else if ( cancelled ) { + if ( !cancelled ) { + auto has_odin = prepare_mep( i_buffer, size_t{slice.packing_factor} ); + if ( !has_odin ) { + ++m_mep_without_odin; + std::unique_lock<std::mutex> lock{m_buffer_mutex}; + ::mbm_free_event( m_bmIDs[i_buffer] ); + m_buffer_status[i_buffer].writable = true; + continue; + } else { + debug() << "Prefetch notifying transpose threads" << endmsg; + m_transpose_cond.notify_all(); + } + } else { break; } } @@ -1487,6 +1599,9 @@ void MEPProvider::transpose( int thread_id ) { } } + m_odin_error_bank += std::accumulate( event_mask.begin(), event_mask.end(), 0, + []( size_t s, const char entry ) { return s + ( entry == 0 ); } ); + // Look for an ODIN bank that is not an error bank and decode the first one we find auto ib = to_integral( BankTypes::ODIN ); auto const& odin_slice = m_slices[ib][*slice_index]; @@ -1494,6 +1609,7 @@ void MEPProvider::transpose( int thread_id ) { auto const* odin_offsets = 
odin_slice.offsets.data(); auto const interval_size = std::get<1>( interval ) - std::get<0>( interval ); + bool have_odin = false; LHCb::RawBank::BankType odin_type; for ( size_t i_event = 0; i_event < interval_size; ++i_event ) { odin_type = @@ -1508,8 +1624,12 @@ void MEPProvider::transpose( int thread_id ) { offset = odin_offsets[MEP::offset_index( MEP::number_of_banks( odin_offsets ), i_event, 0 )]; size = MEP::bank_size( odin_block, odin_slice.sizes.data(), i_event, 0 ); } - m_odins[*slice_index] = MEP::decode_odin( odin_block, offset, size, m_banks_version[ib] ); - break; + auto odin = MEP::decode_odin( odin_block, offset, size, m_banks_version[ib] ); + if ( odin.runNumber() != 0 ) { + m_odins[*slice_index] = std::move( odin ); + have_odin = true; + break; + } } } @@ -1525,8 +1645,10 @@ void MEPProvider::transpose( int thread_id ) { std::scoped_lock lock{m_transpose_mut, m_buffer_mutex}; // Update transposed status - transpose_it->slice_index = *slice_index; - transpose_it->n_transposed = n_transposed; + if ( have_odin ) { + transpose_it->slice_index = *slice_index; + transpose_it->n_transposed = n_transposed; + } if ( n_transposed != std::get<1>( interval ) - std::get<0>( interval ) ) { auto& status = m_buffer_status[i_buffer]; @@ -1539,7 +1661,7 @@ void MEPProvider::transpose( int thread_id ) { if ( m_transpose_done ) { m_transpose_cond.notify_all(); - } else { + } else if ( have_odin ) { m_transposed_cond.notify_one(); } slice_index.reset(); diff --git a/AllenOnline/src/MEPProvider.h b/AllenOnline/src/MEPProvider.h index f52212fd76ee45fb05fc77a0f1a08d33847d1a06..54820557a203d25ff52c29b300596e6c8ebead0d 100644 --- a/AllenOnline/src/MEPProvider.h +++ b/AllenOnline/src/MEPProvider.h @@ -22,6 +22,7 @@ #include <unistd.h> #include <Gaudi/Accumulators.h> +#include <Gaudi/Accumulators/Histogram.h> #include <Gaudi/Property.h> #include <Kernel/meta_enum.h> @@ -165,7 +166,7 @@ private: size_t count_writable() const; - bool allocate_storage( size_t i_read ); + 
std::tuple<bool, bool> allocate_storage( size_t i_read ); /** * @brief Open an input file; called from the prefetch thread @@ -291,6 +292,9 @@ private: std::unordered_set<BankTypes> m_bank_types; + size_t m_alloc_tries = 0u; + + Gaudi::Property<size_t> m_allocate_retries{this, "AllocateRetries", 10}; Gaudi::Property<size_t> m_nslices{this, "NSlices", 6}; Gaudi::Property<size_t> m_events_per_slice{this, "EventsPerSlice", 1000}; Gaudi::Property<std::vector<std::string>> m_connections{this, "Connections"}; @@ -332,6 +336,18 @@ private: Gaudi::Property<bool> m_thread_per_buffer{this, "ThreadPerBuffer", true}; Gaudi::Property<bool> m_mask_top5{this, "MaskSourceIDTop5", false}; + Gaudi::Accumulators::Counter<> m_mep_null{this, "MEP_is_NULL"}; + Gaudi::Accumulators::Counter<> m_mep_bad_magic{this, "MEP_magic_invalid"}; + Gaudi::Accumulators::Counter<> m_mep_empty{this, "MEP_no_MFPs"}; + Gaudi::Accumulators::Counter<> m_mep_without_odin{this, "MEP_without_ODIN_MFP"}; + Gaudi::Accumulators::Counter<> m_odin_error_bank{this, "ODIN_error_bank"}; + + std::unordered_map<unsigned int, unsigned int> m_mfp_bin_mapping; + using Histo1U = Gaudi::Accumulators::Histogram<1, Gaudi::Accumulators::atomicity::full, float>; + std::unique_ptr<Histo1U> m_bad_mfps; + constexpr static int s_nsd = static_cast<int>( SourceIdSys::SourceIdSys_TDET ); + Histo1U m_bad_mfps_sd{this, "BadMFPsSubdet", "Bad MFPs per subdetector", {s_nsd + 1, -0.5f, s_nsd + 0.5f}}; + std::unique_ptr<Gaudi::Accumulators::Counter<>> m_mepsInput; std::unique_ptr<Gaudi::Accumulators::Counter<>> m_eventsInput; std::unique_ptr<Gaudi::Accumulators::Counter<>> m_mbInput; diff --git a/AllenOnline/src/TransposeMEP.cpp b/AllenOnline/src/TransposeMEP.cpp index 5c1019553367ee1a399425fb07ff42f59cc1f4dd..24a0ab9397b17036cfdd66427459a5a2192020e8 100644 --- a/AllenOnline/src/TransposeMEP.cpp +++ b/AllenOnline/src/TransposeMEP.cpp @@ -3,6 +3,7 @@ \*****************************************************************************/ #include 
<cassert> #include <cstring> +#include <sstream> #include <Allen/sourceid.h> #include <Event/ODIN.h> @@ -25,6 +26,8 @@ namespace { std::tuple<bool, std::array<unsigned int, NBankTypes>, std::array<int, NBankTypes>> MEP::fill_counts( EB::MEP const* mep ) { + bool all_valid = true; + bool has_odin = false; std::array<unsigned, NBankTypes> count{0}; std::array<int, NBankTypes> versions; @@ -33,16 +36,20 @@ MEP::fill_counts( EB::MEP const* mep ) { for ( size_t i = 0; i < mep->header.n_MFPs; ++i ) { auto const* mfp = mep->at( i ); - const int source_id = mfp->header.src_id; - auto const allen_type = source_id_type( source_id ); + auto const allen_type = source_id_type( mep->header.src_ids()[i] ); if ( allen_type != BankTypes::Unknown ) { auto const sd_index = to_integral( allen_type ); - versions[sd_index] = mfp->header.block_version; - ++count[sd_index]; + if ( mfp->is_header_valid() ) { + if ( SourceId_sys( mfp->header.src_id ) == SourceIdSys::SourceIdSys_ODIN ) has_odin = true; + versions[sd_index] = mfp->header.block_version; + ++count[sd_index]; + } else { + all_valid = false; + } } } - return {true, count, versions}; + return {all_valid && has_odin, count, versions}; } LHCb::ODIN MEP::decode_odin( char const* odin_data, unsigned const offset, unsigned const size_bytes, @@ -56,26 +63,33 @@ LHCb::ODIN MEP::decode_odin( char const* odin_data, unsigned const offset, unsig } } -void MEP::find_blocks( EB::MEP const* mep, Blocks& blocks ) { +bool MEP::find_blocks( EB::MEP const* mep, Blocks& blocks, std::function<void( size_t )> const& bad_mfp ) { // Fill blocks in temporary container - Blocks tmp{blocks}; - for ( size_t i_block = 0; i_block < mep->header.n_MFPs; ++i_block ) { - // block offsets are in 4-byte words with respect to the start of the MEP header - tmp[i_block] = MEP::Block{mep->at( i_block )}; - } + bool found_odin = false; + size_t n_good = 0; + blocks.resize( mep->header.n_MFPs ); - auto const* src_ids = mep->header.src_ids(); - std::vector<size_t> perm( 
tmp.size() ); - std::iota( perm.begin(), perm.end(), 0U ); - std::sort( perm.begin(), perm.end(), [src_ids]( size_t a, size_t b ) { return src_ids[a] < src_ids[b]; } ); + for ( size_t i_block = 0; i_block < blocks.size(); ++i_block ) { + EB::MFP const* mfp = mep->at( i_block ); + if ( mfp->is_header_valid() ) { + blocks[n_good] = Block{mfp}; + ++n_good; + if ( SourceId_sys( mfp->header.src_id ) == SourceIdSys::SourceIdSys_ODIN ) found_odin = true; + } else { + bad_mfp( mep->header.src_ids()[i_block] ); + } + } - for ( size_t i_block = 0; i_block < mep->header.n_MFPs; ++i_block ) { blocks[i_block] = tmp[perm[i_block]]; } + blocks.resize( n_good ); + std::sort( blocks.begin(), blocks.end(), + []( Block const& a, Block const& b ) { return a.header->src_id < b.header->src_id; } ); #ifndef NDEBUG size_t total_block_size = std::accumulate( blocks.begin(), blocks.end(), 0u, []( size_t s, const MEP::Block& b ) { return s + b.header->bytes(); } ); assert( total_block_size <= mep->bytes() ); #endif + return found_odin; } void MEP::fragment_offsets( MEP::Blocks const& blocks, MEP::SourceOffsets& offsets ) { @@ -203,6 +217,10 @@ std::tuple<bool, bool, size_t> MEP::mep_offsets( Allen::Slices& slices, int cons unsigned fragment_offset = 0; for ( unsigned i_event = 0; i_event < event_end; ++i_event ) { if ( i_event >= event_start ) { + + auto& [run_id, event_id] = event_ids.emplace_back( 0, 0 ); + event_mask[i_event - event_start] = 0; + if ( bank_types[i_event] == LHCb::RawBank::ODIN ) { auto const odin = decode_odin( block.payload, fragment_offset, bank_sizes[i_event], block.header->block_version ); @@ -210,17 +228,17 @@ std::tuple<bool, bool, size_t> MEP::mep_offsets( Allen::Slices& slices, int cons auto const event_number = odin.eventNumber(); // if splitting by run, check all events have same run number - if ( !run_number ) { - run_number = odin_run_number; - } else if ( split_by_run && run_number && odin_run_number != *run_number ) { - event_end = i_event; - break; + if ( 
odin_run_number != 0 ) { + if ( !run_number ) { + run_number = odin_run_number; + } else if ( split_by_run && run_number && odin_run_number != *run_number ) { + event_end = i_event; + break; + } + run_id = odin_run_number; + event_id = event_number; + event_mask[i_event - event_start] = 1; } - event_ids.emplace_back( odin_run_number, event_number ); - event_mask[i_event - event_start] = 1; - } else { - event_ids.emplace_back( 0, 0 ); - event_mask[i_event - event_start] = 0; } } fragment_offset += bank_sizes[i_event] + EB::get_padding( bank_sizes[i_event], 1 << align ); @@ -306,7 +324,7 @@ std::tuple<bool, bool, size_t> MEP::mep_offsets( Allen::Slices& slices, int cons bool MEP::transpose_mep( Allen::Slices& slices, int const slice_index, std::unordered_set<BankTypes> const& subdetectors, std::array<unsigned int, NBankTypes> const& mfp_count, EventIDs& event_ids, - std::vector<char>& event_mask, EB::MEP const* mep, MEP::Blocks const& blocks, + std::vector<char>& event_mask, EB::MEP const*, MEP::Blocks const& blocks, MEP::SourceOffsets const& input_offsets, std::tuple<size_t, size_t> const& interval, bool mask_top5 ) { auto [start_event, end_event] = interval; @@ -315,7 +333,7 @@ bool MEP::transpose_mep( Allen::Slices& slices, int const slice_index, size_t bank_index = 1; auto prev_type = BankTypes::Unknown; - for ( size_t i_block = 0; i_block < mep->header.n_MFPs; ++i_block ) { + for ( size_t i_block = 0; i_block < blocks.size(); ++i_block ) { auto const& block = blocks[i_block]; auto const source_id = block.header->src_id; auto allen_type = source_id_type( source_id ); @@ -327,14 +345,17 @@ bool MEP::transpose_mep( Allen::Slices& slices, int const slice_index, if ( allen_type == BankTypes::ODIN ) { // decode ODIN bank to obtain run and event numbers for ( uint16_t i_event = start_event; i_event < end_event; ++i_event ) { - if ( bank_types[i_event] != LHCb::RawBank::ODIN ) { - event_ids.emplace_back( 0, 0 ); - event_mask[i_event - start_event] = 0; - } else { + 
auto& [odin_id, event_id] = event_ids.emplace_back( 0, 0 ); + event_mask[i_event - start_event] = 0; + + if ( bank_types[i_event] == LHCb::RawBank::ODIN ) { auto const odin = decode_odin( block.payload, source_offsets[i_event], bank_sizes[i_event], block.header->block_version ); - event_ids.emplace_back( odin.runNumber(), odin.eventNumber() ); - event_mask[i_event - start_event] = 1; + if ( odin.runNumber() != 0 ) { + odin_id = odin.runNumber(); + event_id = odin.eventNumber(); + event_mask[i_event - start_event] = 1; + } } } }