diff --git a/Dumpers/BinaryDumpers/options/allen.py b/Dumpers/BinaryDumpers/options/allen.py
index 1b32ddc4cfa8232a23b9583fb3f7e4ac34299ea9..d61feddb4a54c67e8e67f8973b93de1e43365229 100644
--- a/Dumpers/BinaryDumpers/options/allen.py
+++ b/Dumpers/BinaryDumpers/options/allen.py
@@ -152,7 +152,7 @@ if args.mep:
     mep_provider = MEPProvider()
     mep_provider.NSlices = args.slices
     mep_provider.EventsPerSlice = args.events_per_slice
-    mep_provider.OutputLevel = 3
+    mep_provider.OutputLevel = (6 - int(args.verbosity))
     # Number of MEP buffers and number of transpose/offset threads
     mep_provider.BufferConfig = (10, 8)
     mep_provider.TransposeMEPs = False
@@ -286,11 +286,11 @@ else:
         assert (msg.decode() == "RUNNING")
         sleep(5)
         control.send(b"STOP")
+        gaudi.stop()
         msg = control.recv()
         assert (msg.decode() == "READY")
         if args.profile == "CUDA":
             runtime_lib.cudaProfilerStop()
-        gaudi.stop()
         gaudi.finalize()
         control.send(b"RESET")
         msg = control.recv()
diff --git a/device/event_model/common/include/ODINBank.cuh b/device/event_model/common/include/ODINBank.cuh
index 0b517feec12e0d309578b7b60949a005522225cb..e0aacf1893088688f3aa4c79388d1f54a437fc45 100644
--- a/device/event_model/common/include/ODINBank.cuh
+++ b/device/event_model/common/include/ODINBank.cuh
@@ -48,8 +48,7 @@ odin_bank(const char* dev_odin_data, const uint* dev_odin_offsets, const uint* d
   // is only 1 bank, there are two offsets.
   char const* event_data = dev_odin_data + dev_odin_offsets[event_number];
   assert(reinterpret_cast<uint32_t const*>(event_data)[0] == 1);
-  auto const size_offset = dev_odin_sizes[event_number];
-  auto const size = reinterpret_cast<uint16_t const*>(dev_odin_sizes)[size_offset];
+  auto const size = Allen::bank_size(dev_odin_sizes, event_number, 0);
   return ODINRawBank(event_data + 3 * sizeof(uint32_t), size, 0);
 }
 
diff --git a/main/include/AllenThreads.h b/main/include/AllenThreads.h
index ca405c8916a68faef8800c23c10310553e34801e..117ec5d38e0a136ee919a0dc1f8ae84ffe49f964 100644
--- a/main/include/AllenThreads.h
+++ b/main/include/AllenThreads.h
@@ -19,6 +19,7 @@ std::string connection(const size_t id, std::string suffix = "");
 
 void run_output(
   const size_t thread_id,
+  const size_t output_id,
   IZeroMQSvc* zmqSvc,
   OutputHandler* output_handler,
   HostBuffersManager* buffer_manager);
diff --git a/main/include/FileWriter.h b/main/include/FileWriter.h
index a58e55b9bbfa9b24b5d137c78dd00f38a62ead10..0a925952898b0f41a1250e73c6a8bf2ea3ff9efd 100644
--- a/main/include/FileWriter.h
+++ b/main/include/FileWriter.h
@@ -14,7 +14,7 @@ public:
     size_t const output_batch_size,
     size_t const n_lines,
     bool checksum = true) :
-    OutputHandler {input_provider, filename, output_batch_size, n_lines, checksum},
+    OutputHandler {input_provider, filename, 1u, output_batch_size, n_lines, checksum},
     m_filename {std::move(filename)}
   {
     m_output = MDF::open(m_filename, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
@@ -31,10 +31,10 @@ public:
   }
 
 protected:
-  std::tuple<size_t, gsl::span<char>> buffer(size_t buffer_size, size_t) override
+  gsl::span<char> buffer(size_t, size_t buffer_size, size_t) override
   {
     m_buffer.resize(buffer_size);
-    return {0, gsl::span {&m_buffer[0], static_cast<size_t>(buffer_size)}};
+    return gsl::span {&m_buffer[0], static_cast<size_t>(buffer_size)};
   }
 
   virtual bool write_buffer(size_t) override { return m_output.write(m_buffer.data(), m_buffer.size()); }
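The two FileWriter hunks above track an interface change in OutputHandler: `buffer()` no longer returns a `(buffer_id, span)` pair; the caller now passes the writer's `thread_id` and receives only the span, and the same id is later handed to `write_buffer()`. Below is a minimal sketch of a handler written against the new contract; `VectorWriter` is hypothetical and not part of this patch, and it assumes a GSL version whose `gsl::span` takes `size_t` extents:

```cpp
#include <cstddef>
#include <vector>
#include <gsl/gsl>

// Hypothetical handler illustrating the new per-thread buffer contract.
class VectorWriter {
public:
  explicit VectorWriter(size_t n_threads) : m_buffers(n_threads) {}

  // One scratch buffer per writer thread; a given thread_id is only ever
  // used by one thread at a time, so no locking is required.
  gsl::span<char> buffer(size_t thread_id, size_t buffer_size)
  {
    auto& buf = m_buffers[thread_id];
    buf.resize(buffer_size);
    return gsl::span<char> {buf.data(), buffer_size};
  }

  // Pretend write: a real handler would flush m_buffers[thread_id] here.
  bool write_buffer(size_t thread_id) { return !m_buffers[thread_id].empty(); }

private:
  std::vector<std::vector<char>> m_buffers;
};
```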
#include "BankTypes.h" #include "Timer.h" +#ifndef STANDALONE +#include +#include +#endif + class OutputHandler { public: OutputHandler() {} @@ -19,11 +24,12 @@ public: OutputHandler( IInputProvider const* input_provider, std::string const connection, + size_t const n_threads, size_t const output_batch_size, size_t const n_lines, bool const checksum) { - init(input_provider, std::move(connection), output_batch_size, n_lines, checksum); + init(input_provider, std::move(connection), n_threads, output_batch_size, n_lines, checksum); } virtual ~OutputHandler() {} @@ -31,6 +37,7 @@ public: std::string const& connection() const { return m_connection; } std::tuple output_selected_events( + size_t const thread_id, size_t const slice_index, size_t const event_offset, gsl::span const selected_events, @@ -38,7 +45,7 @@ public: gsl::span const sel_reports, gsl::span const sel_report_offsets); - virtual zmq::socket_t* client_socket() { return nullptr; } + virtual zmq::socket_t* client_socket() const { return nullptr; } virtual void handle() {} @@ -48,31 +55,54 @@ public: bool do_checksum() const { return m_checksum; } + size_t n_threads() const { return m_nthreads; } + protected: void init( IInputProvider const* input_provider, std::string const connection, + size_t const n_threads, size_t const output_batch_size, size_t const n_lines, bool const checksum) { m_input_provider = input_provider; m_connection = std::move(connection); - m_sizes.resize(input_provider->events_per_slice()); + m_sizes.resize(n_threads); + for (auto& sizes : m_sizes) { + sizes.resize(input_provider->events_per_slice()); + } m_output_batch_size = output_batch_size; m_nlines = n_lines; m_checksum = checksum; + m_nthreads = n_threads; + +#ifndef STANDALONE + auto* svc = dynamic_cast(this); + if (svc != nullptr) { + m_noutput = std::make_unique>(svc, "NOutput"); + m_nbatches = std::make_unique>(svc, "NBatches"); + m_batch_size = std::make_unique>(svc, "BatchSize"); + } +#endif } - virtual std::tuple> buffer(size_t buffer_size, size_t n_events) = 0; + virtual gsl::span buffer(size_t thread_id, size_t buffer_size, size_t n_events) = 0; - virtual bool write_buffer(size_t id) = 0; + virtual bool write_buffer(size_t thread_id) = 0; IInputProvider const* m_input_provider = nullptr; std::string m_connection; - std::vector m_sizes; + std::vector> m_sizes; std::array m_trigger_mask = {~0u, ~0u, ~0u, ~0u}; size_t m_output_batch_size = 10; size_t m_nlines = 0; bool m_checksum = false; + size_t m_nthreads = 1; + +#ifndef STANDALONE + std::unique_ptr> m_noutput; + std::unique_ptr> m_batch_size; + std::unique_ptr> m_nbatches; +#endif }; diff --git a/main/include/Provider.h b/main/include/Provider.h index d8ef5817fc56db8df9ef9e8a85ab1adb4a6c3062..65d311fb6a806206f833a382aa1d884f0fe1e341 100644 --- a/main/include/Provider.h +++ b/main/include/Provider.h @@ -12,9 +12,7 @@ class IZeroMQSvc; namespace { - constexpr size_t n_write = 1; constexpr size_t n_input = 1; - constexpr size_t n_io = n_input + n_write; constexpr size_t n_mon = 1; constexpr size_t max_stream_threads = 1024; } // namespace diff --git a/main/include/ZMQOutputSender.h b/main/include/ZMQOutputSender.h index aec6f8021dd5856126651cd6f280e15ad6206652..1f22d0e9c2ad249031270f24117f57f4c90f5e35 100644 --- a/main/include/ZMQOutputSender.h +++ b/main/include/ZMQOutputSender.h @@ -21,12 +21,12 @@ public: ~ZMQOutputSender(); - zmq::socket_t* client_socket() override; + zmq::socket_t* client_socket() const override; void handle() override; protected: - std::tuple> buffer(size_t buffer_size, 
diff --git a/main/include/ZMQOutputSender.h b/main/include/ZMQOutputSender.h
index aec6f8021dd5856126651cd6f280e15ad6206652..1f22d0e9c2ad249031270f24117f57f4c90f5e35 100644
--- a/main/include/ZMQOutputSender.h
+++ b/main/include/ZMQOutputSender.h
@@ -21,12 +21,12 @@ public:
 
   ~ZMQOutputSender();
 
-  zmq::socket_t* client_socket() override;
+  zmq::socket_t* client_socket() const override;
 
   void handle() override;
 
 protected:
-  std::tuple<size_t, gsl::span<char>> buffer(size_t buffer_size, size_t) override;
+  gsl::span<char> buffer(size_t, size_t buffer_size, size_t) override;
 
   virtual bool write_buffer(size_t) override;
 
@@ -41,7 +41,7 @@ private:
   bool m_connected = false;
 
   // data socket
-  std::optional<zmq::socket_t> m_socket;
+  mutable std::optional<zmq::socket_t> m_socket;
 
   // request socket
   std::optional<zmq::socket_t> m_request;
diff --git a/main/src/Allen.cpp b/main/src/Allen.cpp
index 5c7c56f6276d7acb9ddd841f9d225042e5b3ae4a..10014fc22c9fc8f77f83b5cb57e9d6adf4d061eb 100644
--- a/main/src/Allen.cpp
+++ b/main/src/Allen.cpp
@@ -105,6 +105,9 @@ int allen(
   std::string mon_filename;
   bool disable_run_changes = 0;
 
+  size_t const n_write = output_handler != nullptr ? output_handler->n_threads() : 1;
+  size_t const n_io = n_input + n_write;
+
   std::string flag, arg;
 
   // Use flags to populate variables in the program
@@ -379,8 +382,8 @@ int allen(
   };
 
   // Lambda with the execution of the output thread
-  const auto output_thread = [&](unsigned thread_id, unsigned) {
-    return std::thread {run_output, thread_id, zmqSvc, output_handler, buffers_manager.get()};
+  const auto output_thread = [&](unsigned thread_id, unsigned output_id) {
+    return std::thread {run_output, thread_id, output_id, zmqSvc, output_handler, buffers_manager.get()};
   };
 
   // Lambda with the execution of the monitoring thread
@@ -636,7 +639,7 @@ int allen(
   };
 
   if (!allen_control && !error_count) {
-    for (size_t i = 0; i < n_io; ++i) {
+    for (size_t i = 0; i < n_input; ++i) {
       auto& socket = std::get<1>(io_workers[i]);
       zmqSvc->send(socket, "START");
     }
@@ -651,6 +654,24 @@ int allen(
   std::optional<Timer> t_stop;
   float stop_timeout = 5.f;
 
+  // Iterator to the first writer thread
+  auto writer_it = io_workers.begin() + n_input;
+
+  // Get a writer thread in round-robin fashion
+  auto get_writer = [&writer_it, &io_workers, n_input = n_input, n_write]() -> zmq::socket_t& {
+    if (n_write == 1) {
+      return std::get<1>(*writer_it);
+    }
+    else {
+      auto it = writer_it;
+      ++writer_it;
+      if (writer_it == io_workers.end()) {
+        writer_it = io_workers.begin() + n_input;
+      }
+      return std::get<1>(*it);
+    }
+  };
+
   // Main event loop
   // - Check if input slices are available from the input thread
   // - Distribute new input slices to stream_threads as soon as they arrive
@@ -694,6 +715,7 @@ int allen(
       if (items[number_of_threads + i].revents & zmq::POLLIN) {
         auto& socket = std::get<1>(io_workers[i]);
         auto msg = zmqSvc->receive<std::string>(socket);
+
         if (msg == "SLICE") {
           slice_index = zmqSvc->receive<size_t>(socket);
           auto n_filled = zmqSvc->receive<size_t>(socket);
@@ -846,7 +868,7 @@ int allen(
 
         input_slice_status[slc_index][first_event] = SliceStatus::Writing;
 
-        auto& socket = std::get<1>(io_workers[n_input]);
+        auto& socket = get_writer();
         zmqSvc->send(socket, "WRITE", send_flags::sndmore);
         zmqSvc->send(socket, slc_index, send_flags::sndmore);
         zmqSvc->send(socket, first_event, send_flags::sndmore);
@@ -897,7 +919,7 @@ int allen(
       io_done = false;
 
       // Send slice thread start to start asking for slices
-      for (size_t i = 0; i < n_io; ++i) {
+      for (size_t i = 0; i < n_input; ++i) {
         auto& socket = std::get<1>(io_workers[i]);
         zmqSvc->send(socket, "START");
       }
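With `n_write` now taken from the OutputHandler at runtime, WRITE requests are spread over the writer sockets by `get_writer()`, a plain round-robin over the workers that follow the `n_input` input threads. A standalone sketch of the same selection logic, with ints standing in for sockets:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Round-robin selection over the writer slots of a worker list, mirroring
// get_writer(): workers[0, n_input) are inputs, the rest are writers.
// Assumes at least one writer is present.
int& next_writer(std::vector<int>& workers, size_t n_input, size_t& cursor)
{
  size_t const n_write = workers.size() - n_input;
  int& writer = workers[n_input + cursor];
  cursor = (cursor + 1) % n_write; // wrap back to the first writer
  return writer;
}

int main()
{
  std::vector<int> workers {0, 10, 11, 12}; // one input, three writers
  size_t cursor = 0;
  assert(next_writer(workers, 1, cursor) == 10);
  assert(next_writer(workers, 1, cursor) == 11);
  assert(next_writer(workers, 1, cursor) == 12);
  assert(next_writer(workers, 1, cursor) == 10); // wrapped around
}
```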
diff --git a/main/src/AllenThreads.cpp b/main/src/AllenThreads.cpp
index b71953fa3a8a3f983bca5db3e3f4af93c9ee2a55..1ebf397d30c4f47e2c5978661f1e5fae0f46ee44 100644
--- a/main/src/AllenThreads.cpp
+++ b/main/src/AllenThreads.cpp
@@ -80,6 +80,7 @@ zmq::socket_t make_control(size_t thread_id, IZeroMQSvc* zmqSvc, std::string suf
 
 void run_output(
   const size_t thread_id,
+  const size_t output_id,
   IZeroMQSvc* zmqSvc,
   OutputHandler* output_handler,
   HostBuffersManager* buffer_manager)
@@ -105,7 +106,8 @@ void run_output(
     }
 
     if (items[0].revents & zmq::POLLIN) {
-      auto msg = zmqSvc->receive<std::string>(control);
+      bool more = false;
+      auto msg = zmqSvc->receive<std::string>(control, &more);
       if (msg == "DONE") {
         break;
       }
@@ -120,7 +122,7 @@
         buffer_manager->getBufferOutputData(buf_idx);
       if (output_handler != nullptr) {
         std::tie(success, n_written) = output_handler->output_selected_events(
-          slc_idx, first_evt, passing_event_list, dec_reports, sel_reports, sel_report_offsets);
+          output_id, slc_idx, first_evt, passing_event_list, dec_reports, sel_reports, sel_report_offsets);
       }
 
       zmqSvc->send(control, "WRITTEN", send_flags::sndmore);
@@ -130,6 +132,12 @@
       zmqSvc->send(control, success, send_flags::sndmore);
       zmqSvc->send(control, n_written);
     }
+    else {
+      error_cout << "Output thread got unknown message: " << msg << "\n";
+      while (more) {
+        zmqSvc->receive<zmq::message_t>(control, &more);
+      }
+    }
   }
 }
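The new `else` branch makes the output thread robust against protocol errors: on an unknown message it logs and drains the remaining parts of the multipart message, leaving the control socket at a message boundary. The same pattern in plain cppzmq, as a hypothetical helper (it assumes `IZeroMQSvc::receive` sets its `more` flag from ZMQ_RCVMORE, and that at least one part is pending when the helper is called):

```cpp
#include <zmq.hpp>

// Receive and discard all remaining parts of a multipart message.
void drain_multipart(zmq::socket_t& socket)
{
  zmq::message_t part;
  bool more = true;
  while (more) {
    auto received = socket.recv(part, zmq::recv_flags::none);
    (void) received;     // result intentionally ignored while draining
    more = part.more();  // true while further parts are pending
  }
}
```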
diff --git a/main/src/OutputHandler.cpp b/main/src/OutputHandler.cpp
index 68b7970831720ef29948d6c7fe5badb0defadbed..bf7cd096d054ccfb6e2b41ca72b8b798c2208547 100644
--- a/main/src/OutputHandler.cpp
+++ b/main/src/OutputHandler.cpp
@@ -20,6 +20,7 @@
 #include
 
 std::tuple<bool, size_t> OutputHandler::output_selected_events(
+  size_t const thread_id,
   size_t const slice_index,
   size_t const start_event,
   gsl::span<bool const> const selected_events_bool,
@@ -33,7 +34,6 @@ std::tuple<bool, size_t> OutputHandler::output_selected_events(
   // size of the DecReport RawBank
   const unsigned dec_report_size = (m_nlines + 2) * sizeof(uint32_t);
 
-  // m_sizes will contain the total size of all banks in the event
   std::vector<unsigned> selected_events;
   selected_events.reserve(selected_events_bool.size());
   for (unsigned i = 0; i < selected_events_bool.size(); ++i) {
@@ -48,20 +48,32 @@ std::tuple<bool, size_t> OutputHandler::output_selected_events(
   auto const n_events = static_cast<size_t>(selected_events.size());
   if (n_events == 0) return {true, 0};
 
-  std::fill_n(m_sizes.begin(), selected_events.size(), 0);
-  m_input_provider->event_sizes(slice_index, selected_events, m_sizes);
+  // m_sizes will contain the total size of all banks in the event
+  auto& event_sizes = m_sizes[thread_id];
+
+  std::fill_n(event_sizes.begin(), event_sizes.size(), 0);
+  m_input_provider->event_sizes(slice_index, selected_events, event_sizes);
 
   auto event_ids = m_input_provider->event_ids(slice_index);
 
   bool output_success = true;
   size_t n_output = 0;
   size_t n_batches = n_events / m_output_batch_size + (n_events % m_output_batch_size != 0);
+
+#ifndef STANDALONE
+  if (m_nbatches) (*m_nbatches) += n_batches;
+#endif
+
   for (size_t i_batch = 0; i_batch < n_batches && output_success; ++i_batch) {
 
     size_t batch_buffer_size = 0;
     size_t output_event_offset = 0;
     size_t batch_size = std::min(m_output_batch_size, n_events - n_output);
+
+#ifndef STANDALONE
+    if (m_noutput) (*m_noutput) += batch_size;
+    if (m_batch_size) (*m_batch_size) += batch_size;
+#endif
+
     for (size_t i = n_output; i < n_output + batch_size; ++i) {
       // The event number is constructed to index into a batch. The 0th
@@ -76,15 +88,15 @@
         (sel_report_offsets[event_number + 1] - sel_report_offsets[event_number]) * sizeof(uint32_t);
 
       // add DecReport and SelReport sizes to the total size (including RawBank headers)
-      // m_sizes is indexed in the same way as selected_events
-      size_t event_size = m_sizes[i] + header_size + bank_header_size + dec_report_size;
+      // event_sizes is indexed in the same way as selected_events
+      size_t event_size = event_sizes[i] + header_size + bank_header_size + dec_report_size;
       if (sel_report_size > 0) {
         event_size += bank_header_size + sel_report_size;
       }
       batch_buffer_size += event_size;
     }
 
-    auto [buffer_id, batch_span] = buffer(batch_buffer_size, batch_size);
+    auto batch_span = buffer(thread_id, batch_buffer_size, batch_size);
 
     // In case output was cancelled
     if (batch_span.empty()) return {false, 0};
@@ -103,8 +115,8 @@
         (sel_report_offsets[event_number + 1] - sel_report_offsets[event_number]) * sizeof(uint32_t);
 
       // add DecReport and SelReport sizes to the total size (including RawBank headers)
-      // m_sizes is indexed in the same way as selected_events
-      size_t event_size = m_sizes[i] + header_size + bank_header_size + dec_report_size;
+      // event_sizes is indexed in the same way as selected_events
+      size_t event_size = event_sizes[i] + header_size + bank_header_size + dec_report_size;
       if (sel_report_size > 0) {
         event_size += bank_header_size + sel_report_size;
       }
@@ -139,7 +151,7 @@
       m_input_provider->copy_banks(
         slice_index,
         event_number + start_event,
-        {event_span.data() + header_size, static_cast<size_t>(m_sizes[i])});
+        {event_span.data() + header_size, static_cast<size_t>(event_sizes[i])});
 
       // add the dec report
       Allen::add_raw_bank(
@@ -148,7 +160,7 @@
         1 << 13,
         {reinterpret_cast<char const*>(dec_reports.data()) + dec_report_size * event_number,
         static_cast<size_t>(dec_report_size)},
-        event_span.data() + header_size + m_sizes[i]);
+        event_span.data() + header_size + event_sizes[i]);
 
       // add the sel report
       if (sel_report_size > 0) {
@@ -158,7 +170,7 @@
          1 << 13,
          {reinterpret_cast<char const*>(sel_reports.data()) + sel_report_offsets[event_number] * sizeof(uint32_t),
          static_cast<size_t>(sel_report_size)},
-          event_span.data() + header_size + m_sizes[i] + bank_header_size + dec_report_size);
+          event_span.data() + header_size + event_sizes[i] + bank_header_size + dec_report_size);
       }
 
       if (m_checksum) {
@@ -173,7 +185,7 @@
       output_event_offset += event_size;
     }
-    auto output_success = write_buffer(buffer_id);
+    output_success = write_buffer(thread_id);
     n_output += output_success ? batch_size : 0;
   }
 
   assert(n_events - n_output == 0);
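The batch count computed above is an integer ceiling division: for example, 25 selected events with `m_output_batch_size = 10` give `25 / 10 + (25 % 10 != 0) == 3` batches of 10, 10 and 5 events. The identity as a compile-time check:

```cpp
#include <cstddef>

constexpr size_t n_batches(size_t n_events, size_t batch_size)
{
  // Integer ceiling division: round up whenever there is a remainder.
  return n_events / batch_size + (n_events % batch_size != 0);
}

static_assert(n_batches(25, 10) == 3, "two full batches plus a partial one");
static_assert(n_batches(20, 10) == 2, "exact multiple needs no extra batch");
static_assert(n_batches(0, 10) == 0, "no events, no batches");
```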
diff --git a/main/src/ZMQOutputSender.cpp b/main/src/ZMQOutputSender.cpp
index 7c9b4004bf4853c84835fea605b1c4b4de550bae..714d9a56324c0003f6e93aab3c84008d86741c6d 100644
--- a/main/src/ZMQOutputSender.cpp
+++ b/main/src/ZMQOutputSender.cpp
@@ -44,7 +44,7 @@ ZMQOutputSender::ZMQOutputSender(
   size_t const n_lines,
   IZeroMQSvc* zmqSvc,
   bool const checksum) :
-  OutputHandler {input_provider, receiver_connection, output_batch_size, n_lines, checksum},
+  OutputHandler {input_provider, receiver_connection, 1u, output_batch_size, n_lines, checksum},
   m_zmq {zmqSvc}
 {
   auto const pos = receiver_connection.rfind(":");
@@ -88,7 +88,7 @@ ZMQOutputSender::~ZMQOutputSender()
   }
 }
 
-zmq::socket_t* ZMQOutputSender::client_socket() { return (m_connected && m_socket) ? &(*m_socket) : nullptr; }
+zmq::socket_t* ZMQOutputSender::client_socket() const { return (m_connected && m_socket) ? &(*m_socket) : nullptr; }
 
 void ZMQOutputSender::handle()
 {
@@ -103,10 +103,10 @@ void ZMQOutputSender::handle()
   }
 }
 
-std::tuple<size_t, gsl::span<char>> ZMQOutputSender::buffer(size_t buffer_size, size_t)
+gsl::span<char> ZMQOutputSender::buffer(size_t, size_t buffer_size, size_t)
 {
   m_buffer.rebuild(buffer_size);
-  return {0, gsl::span {static_cast<char*>(m_buffer.data()), static_cast<size_t>(buffer_size)}};
+  return gsl::span {static_cast<char*>(m_buffer.data()), static_cast<size_t>(buffer_size)};
 }
 
 bool ZMQOutputSender::write_buffer(size_t)
diff --git a/mdf/test/test_mep_banks.cpp b/mdf/test/test_mep_banks.cpp
index 9078ff7bc9975ee872a70270e8ead1b495517723..7e1927b034a9b56db040e8e334d1eead7e94ffd9 100644
--- a/mdf/test/test_mep_banks.cpp
+++ b/mdf/test/test_mep_banks.cpp
@@ -128,7 +128,7 @@ IInputProvider* mep_provider(std::string json_file)
   sc &= provider_prop->setProperty("EvtMax", std::to_string(s_config.n_events));
   sc &= provider_prop->setProperty("SplitByRun", "0");
   sc &= provider_prop->setProperty("Source", "\"Files\"");
-  sc &= provider_prop->setProperty("BufferConfig", "(2, 1)");
+  sc &= provider_prop->setProperty("BufferConfig", "(2, 2)");
   sc &= provider_prop->setProperty("TransposeMEPs", std::to_string(s_config.transpose_mep));
   sc &= provider_prop->setProperty("OutputLevel", s_config.debug ? "2" : "3");
 
@@ -144,6 +144,7 @@ IInputProvider* mep_provider(std::string json_file)
   sc &= app->initialize();
   sc &= app->start();
+  sc &= app->stop();
 
   return dynamic_cast<IInputProvider*>(provider.get());
 }
@@ -259,7 +260,6 @@ int main(int argc, char* argv[])
   mdf.reset();
 
   if (app) {
-    app->stop().ignore();
     app->finalize().ignore();
   }
   return r;
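Both the moved `gaudi.stop()` in allen.py and the relocated `app->stop()` here follow the `initialize() -> start() -> stop() -> finalize()` ordering of the Gaudi state machine, which does not allow finalizing an application that is still running. A minimal stub of that ordering; this is an illustration, not the Gaudi API:

```cpp
#include <cassert>

// Hypothetical stub of a Gaudi-style state machine: stop() must bring the
// application out of Running before finalize() is legal.
enum class State { Configured, Initialized, Running };

struct App {
  State state = State::Configured;
  void initialize() { assert(state == State::Configured); state = State::Initialized; }
  void start() { assert(state == State::Initialized); state = State::Running; }
  void stop() { assert(state == State::Running); state = State::Initialized; }
  void finalize() { assert(state == State::Initialized); state = State::Configured; }
};

int main()
{
  App app;
  app.initialize();
  app.start();
  app.stop();     // required before finalize(), as in the hunks above
  app.finalize();
}
```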