diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml deleted file mode 100644 index dcdad8c11c649fb163d67f4bdb31214689486eed..0000000000000000000000000000000000000000 --- a/.gitlab-ci.yml +++ /dev/null @@ -1,104 +0,0 @@ -############################################################################### -# (c) Copyright 2018-2020 CERN for the benefit of the LHCb Collaboration # -############################################################################### - -variables: - GIT_SUBMODULE_STRATEGY: recursive - TARGET_BRANCH: master - - ALLEN_DATA: "/scratch/allen_data" - LCG_VERSION: "LCG_99" - - PROFILE_DEVICE: "a40" - RUN_THROUGHPUT_OPTIONS_CUDAPROF: "-n 500 -m 500 -r 1 -t 1" - RUN_THROUGHPUT_OPTIONS_CUDA: "-n 500 -m 500 -r 1000 -t 16" - RUN_THROUGHPUT_OPTIONS_HIP: "-n 5000 --events-per-slice 5000 -m 3000 -t 10 -r 1000" - RUN_THROUGHPUT_OPTIONS_CPU: "-n 100 -m 100 -r 200" - - OVERRIDE_CUDA_ARCH_FLAG: "-gencode=arch=compute_70,code=sm_70 -gencode=arch=compute_75,code=sm_75 -gencode=arch=compute_86,code=sm_86" - -stages: - - check # Ensures the CI environment is valid - - build docker - - build - - run # Build and run (throughput, efficiency, etc...) - - test # Runs various tests of the software - - publish # Publishes the results of the tests and runs in channels and grafana - - - manual trigger # Blocks full pipeline from running in merge requests - - build full - - run full # Build and run (full child pipelines) - - test full # Tests (full) - - publish full # Publishing (full) - -check-env: - stage: check - except: - - /.*/@lhcb/Allen - script: - - | - echo "The Allen CI depends on custom GitLab runners and therefore tests" - echo "running on forks will fail. Please create a branch in the main" - echo "repository at https://gitlab.cern.ch/lhcb/Allen/" - - exit 1 - - -.active_branches: &active_branches - only: - refs: - - master - - web - - schedules - - merge_requests - - -check-copyright: - <<: *active_branches - - stage: check - image: gitlab-registry.cern.ch/ci-tools/ci-worker:cc7 - script: - - curl -o lb-check-copyright "https://gitlab.cern.ch/lhcb-core/LbDevTools/-/raw/master/LbDevTools/SourceTools.py?inline=False" - - python lb-check-copyright --license=Apache-2.0 origin/${TARGET_BRANCH} - needs: [] - -check-formatting: - <<: *active_branches - stage: check - image: gitlab-registry.cern.ch/lhcb-docker/style-checker - script: - - | - if [ ! 
-e .clang-format ] ; then - curl -o .clang-format "https://gitlab.cern.ch/lhcb-parallelization/Allen/raw/master/.clang-format?inline=false" - echo '.clang-format' >> .gitignore - git add .gitignore - fi - - - curl -o lb-format "https://gitlab.cern.ch/lhcb-core/LbDevTools/raw/master/LbDevTools/SourceTools.py?inline=false" - - - python lb-format --format-patch apply-formatting.patch origin/master - artifacts: - paths: - - apply-formatting.patch - when: on_failure - expire_in: 1 week - needs: [] - - -docker_image:build: - stage: build docker - only: - refs: - - master - - web - - schedules - tags: - - docker-image-build - script: "echo 'Building Allen dev docker image" - variables: - TO: $CI_REGISTRY_IMAGE:latest - GIT_SUBMODULE_STRATEGY: recursive - allow_failure: true - needs: [] - -include: "scripts/ci/config/main.yaml" \ No newline at end of file diff --git a/Dumpers/BinaryDumpers/CMakeLists.txt b/Dumpers/BinaryDumpers/CMakeLists.txt index 382efe6fcf937b44f2858cf239a48d48607118d9..a28c5f184a1bfe655b4254a0063f1291db8522e0 100644 --- a/Dumpers/BinaryDumpers/CMakeLists.txt +++ b/Dumpers/BinaryDumpers/CMakeLists.txt @@ -32,14 +32,14 @@ gaudi_add_library(BinaryDumpersLib INCLUDE_DIRS Event/FTEvent Pr/PrPixel Muon/MuonID LINK_LIBRARIES DAQEventLib DAQKernelLib GaudiAlgLib PrKernel VPDetLib UTDetLib UTKernelLib - FTDetLib MuonDAQLib) + FTDetLib MuonDAQLib DumperUtilsLib) gaudi_add_module(BinaryDumpers src/*.cpp INCLUDE_DIRS Event/FTEvent Det/VPDet Muon/MuonID LINK_LIBRARIES DAQEventLib DAQKernelLib GaudiAlgLib PrKernel VPDetLib UTDetLib UTKernelLib - CaloDetLib FTDetLib MuonDAQLib MCEvent) + CaloDetLib FTDetLib MuonDAQLib MCEvent DumperUtilsLib) # FIXME: USE_DD4HEP doesn't seem to be inherited from LHCb libraries if (TARGET Detector::DetectorLib) diff --git a/Dumpers/BinaryDumpers/src/DumpCaloGeometry.cpp b/Dumpers/BinaryDumpers/src/DumpCaloGeometry.cpp index 27872de125fda93e644272f60b86e71cd11d159f..330b8193e4c30b96f3ff14f050560e02e4b7add3 100644 --- a/Dumpers/BinaryDumpers/src/DumpCaloGeometry.cpp +++ b/Dumpers/BinaryDumpers/src/DumpCaloGeometry.cpp @@ -17,7 +17,7 @@ #include #include #include "DumpCaloGeometry.h" -#include "Utils.h" +#include DECLARE_COMPONENT(DumpCaloGeometry) diff --git a/Dumpers/BinaryDumpers/src/DumpFTHits.cpp b/Dumpers/BinaryDumpers/src/DumpFTHits.cpp index 8a9d7c262645b3af5dd4818c3e91153178c2f967..3eb6d062f1b807967cf47ed3a83644d849da61de 100644 --- a/Dumpers/BinaryDumpers/src/DumpFTHits.cpp +++ b/Dumpers/BinaryDumpers/src/DumpFTHits.cpp @@ -9,7 +9,7 @@ #include "Event/VPLightCluster.h" #include "DumpFTHits.h" -#include "Utils.h" +#include namespace fs = boost::filesystem; diff --git a/Dumpers/BinaryDumpers/src/DumpForwardTracks.cpp b/Dumpers/BinaryDumpers/src/DumpForwardTracks.cpp index 32589e0e9c675273f4e0974535796e6a5e0e9857..7d76da43d88f63c4b8d2f74df3def99b1f021eac 100644 --- a/Dumpers/BinaryDumpers/src/DumpForwardTracks.cpp +++ b/Dumpers/BinaryDumpers/src/DumpForwardTracks.cpp @@ -6,7 +6,7 @@ #include #include "DumpForwardTracks.h" -#include "Utils.h" +#include namespace fs = boost::filesystem; diff --git a/Dumpers/BinaryDumpers/src/DumpMuonCommonHits.cpp b/Dumpers/BinaryDumpers/src/DumpMuonCommonHits.cpp index 17bcc5baca945fbad00c240f65c311d9a6575e48..5b225b34af2fb28065c94a1ffa13a3d8369dd990 100644 --- a/Dumpers/BinaryDumpers/src/DumpMuonCommonHits.cpp +++ b/Dumpers/BinaryDumpers/src/DumpMuonCommonHits.cpp @@ -6,7 +6,7 @@ #include #include "DumpMuonCommonHits.h" -#include "Utils.h" +#include namespace { using std::string; diff --git 
a/Dumpers/BinaryDumpers/src/DumpMuonCoords.cpp b/Dumpers/BinaryDumpers/src/DumpMuonCoords.cpp index b2427ee9479b3758baff10dc83d9844727d16d8a..07c5f646c32da154377de558b9b2686315864ab3 100644 --- a/Dumpers/BinaryDumpers/src/DumpMuonCoords.cpp +++ b/Dumpers/BinaryDumpers/src/DumpMuonCoords.cpp @@ -6,7 +6,7 @@ #include #include "DumpMuonCoords.h" -#include "Utils.h" +#include namespace { using std::string; diff --git a/Dumpers/BinaryDumpers/src/DumpMuonTable.cpp b/Dumpers/BinaryDumpers/src/DumpMuonTable.cpp index f382201df315cb7cee2ce6e51d040b4c702f16c0..3c78665a4b7a53fddb4bb92e0b6b6bf31920314e 100644 --- a/Dumpers/BinaryDumpers/src/DumpMuonTable.cpp +++ b/Dumpers/BinaryDumpers/src/DumpMuonTable.cpp @@ -26,7 +26,7 @@ namespace ranges::views { #include #include "DumpMuonTable.h" -#include "Utils.h" +#include namespace { using boost::numeric_cast; diff --git a/Dumpers/BinaryDumpers/src/DumpRawBanks.cpp b/Dumpers/BinaryDumpers/src/DumpRawBanks.cpp index a59e675f01069153696744d3577aab1d0f34d672..f18ec045f4cffe0c40f997059456f1c5ca055004 100644 --- a/Dumpers/BinaryDumpers/src/DumpRawBanks.cpp +++ b/Dumpers/BinaryDumpers/src/DumpRawBanks.cpp @@ -15,7 +15,7 @@ #include #include -#include "Utils.h" +#include namespace { using std::to_string; diff --git a/Dumpers/BinaryDumpers/src/DumpUTHits.cpp b/Dumpers/BinaryDumpers/src/DumpUTHits.cpp index ed2d0de7a2c408371b9314c50690b5aaabfdbc8b..5df89588c60f6ce93873f8716016be07e46bccf7 100644 --- a/Dumpers/BinaryDumpers/src/DumpUTHits.cpp +++ b/Dumpers/BinaryDumpers/src/DumpUTHits.cpp @@ -6,7 +6,7 @@ #include #include "DumpUTHits.h" -#include "Utils.h" +#include namespace fs = boost::filesystem; diff --git a/Dumpers/BinaryDumpers/src/DumpUTLookupTables.cpp b/Dumpers/BinaryDumpers/src/DumpUTLookupTables.cpp index badf0e7201015f436950dc86c04ffaff07deb97d..850a3d3c0f7b4de802691dd72c2ef9335a99fde1 100644 --- a/Dumpers/BinaryDumpers/src/DumpUTLookupTables.cpp +++ b/Dumpers/BinaryDumpers/src/DumpUTLookupTables.cpp @@ -5,7 +5,7 @@ #include #include "DumpUTLookupTables.h" -#include "Utils.h" +#include DECLARE_COMPONENT(DumpUTLookupTables) diff --git a/Dumpers/BinaryDumpers/src/PVDumper.cpp b/Dumpers/BinaryDumpers/src/PVDumper.cpp index 27a52144dd0b58638a4f948337ac9259c5c0d347..91b6ed23bfddd45b028f62787bc4f12a9983be80 100644 --- a/Dumpers/BinaryDumpers/src/PVDumper.cpp +++ b/Dumpers/BinaryDumpers/src/PVDumper.cpp @@ -6,7 +6,7 @@ // local #include "PVDumper.h" #include "Associators/Associators.h" -#include "Utils.h" +#include #include namespace { diff --git a/Dumpers/BinaryDumpers/src/TestMuonTable.cpp b/Dumpers/BinaryDumpers/src/TestMuonTable.cpp index 15276f3239372beed557b6fc054f2b726243449d..b5cc982037ce68b18765a0cb0c759dd09f610b29 100644 --- a/Dumpers/BinaryDumpers/src/TestMuonTable.cpp +++ b/Dumpers/BinaryDumpers/src/TestMuonTable.cpp @@ -8,7 +8,7 @@ #include #include "TestMuonTable.h" -#include "Utils.h" +#include namespace { using std::array; diff --git a/Dumpers/BinaryDumpers/src/TransposeRawBanks.cpp b/Dumpers/BinaryDumpers/src/TransposeRawBanks.cpp index 39f2f626de0b33870e9603779c1c917dcbebbc89..c5551841d5132ed4a8fa956c89d09e87135116fe 100644 --- a/Dumpers/BinaryDumpers/src/TransposeRawBanks.cpp +++ b/Dumpers/BinaryDumpers/src/TransposeRawBanks.cpp @@ -12,7 +12,7 @@ #include #include #include -#include "Utils.h" +#include template using VOC = Gaudi::Functional::vector_of_const_; diff --git a/Dumpers/BinaryDumpers/src/Utils.cpp b/Dumpers/BinaryDumpers/src/Utils.cpp deleted file mode 100644 index 
0b61bc2be2bf0d50ca12021f314bdef2d8a0d4b7..0000000000000000000000000000000000000000 --- a/Dumpers/BinaryDumpers/src/Utils.cpp +++ /dev/null @@ -1,38 +0,0 @@ -/*****************************************************************************\ -* (c) Copyright 2000-2018 CERN for the benefit of the LHCb Collaboration * -\*****************************************************************************/ -#include -#include - -#include "Utils.h" - -namespace { - namespace fs = boost::filesystem; -} - -bool DumpUtils::createDirectory(fs::path directory) -{ - if (!fs::exists(directory)) { - boost::system::error_code ec; - bool success = fs::create_directories(directory, ec); - success &= !ec; - return success; - } - return true; -} - -size_t MuonUtils::size_index( - std::array const& offset, - std::array const& gridX, - std::array const& gridY, - LHCb::MuonTileID const& tile) -{ - auto idx = 4 * tile.station() + tile.region(); - auto index = offset[idx] + tile.quarter() * gridY[idx] * 6; - if (tile.nY() < static_cast(gridY[idx])) { - return index + 2 * tile.nY() + 2 * (tile.nX() - gridX[idx]) / gridX[idx]; - } - else { - return index + 4 * tile.nY() - 2 * gridY[idx] + (2 * tile.nX() / gridX[idx]); - } -} diff --git a/device/SciFi/consolidate/src/ConsolidateSciFi.cu b/device/SciFi/consolidate/src/ConsolidateSciFi.cu index 8b9f4df73475151645d61d9cb99b0ff28c10d2b8..4cfc4232baa0eeb74be3848268ded36c83b06ccf 100644 --- a/device/SciFi/consolidate/src/ConsolidateSciFi.cu +++ b/device/SciFi/consolidate/src/ConsolidateSciFi.cu @@ -26,9 +26,9 @@ void scifi_consolidate_tracks::scifi_consolidate_tracks_t::operator()( global_function(scifi_consolidate_tracks)(dim3(size(arguments)), property(), context)( arguments, constants.dev_looking_forward_constants, constants.dev_magnet_polarity.data()); + assign_to_host_buffer(host_buffers.host_atomics_scifi, arguments, context); if (runtime_options.do_check) { // Transmission device to host of Scifi consolidated tracks - assign_to_host_buffer(host_buffers.host_atomics_scifi, arguments, context); assign_to_host_buffer( host_buffers.host_scifi_track_hit_number, arguments, context); assign_to_host_buffer(host_buffers.host_scifi_track_hits, arguments, context); diff --git a/device/UT/consolidate/src/ConsolidateUT.cu b/device/UT/consolidate/src/ConsolidateUT.cu index c879888afbc88e58407b3c3488a4f2c58969d255..a52a0358b2c72cb33ac2debe4d926c69f5829da3 100644 --- a/device/UT/consolidate/src/ConsolidateUT.cu +++ b/device/UT/consolidate/src/ConsolidateUT.cu @@ -28,8 +28,8 @@ void ut_consolidate_tracks::ut_consolidate_tracks_t::operator()( global_function(ut_consolidate_tracks)(dim3(size(arguments)), property(), context)( arguments, constants.dev_unique_x_sector_layer_offsets.data()); + assign_to_host_buffer(host_buffers.host_atomics_ut, arguments, context); if (runtime_options.do_check) { - assign_to_host_buffer(host_buffers.host_atomics_ut, arguments, context); assign_to_host_buffer(host_buffers.host_ut_track_hit_number, arguments, context); assign_to_host_buffer(host_buffers.host_ut_track_hits, arguments, context); assign_to_host_buffer(host_buffers.host_ut_qop, arguments, context); diff --git a/device/velo/consolidate_tracks/src/VeloConsolidateTracks.cu b/device/velo/consolidate_tracks/src/VeloConsolidateTracks.cu index f2a308110d183e9270cd4b2adc39070d947e46c2..28c66b324c458e418c19de5926bb3cac8260f188 100644 --- a/device/velo/consolidate_tracks/src/VeloConsolidateTracks.cu +++ b/device/velo/consolidate_tracks/src/VeloConsolidateTracks.cu @@ -30,11 +30,12 @@ void 
velo_consolidate_tracks::velo_consolidate_tracks_t::operator()( // Set all found tracks to accepted initialize(arguments, 1, context); + assign_to_host_buffer(host_buffers.host_atomics_velo, arguments, context); global_function(velo_consolidate_tracks)(size(arguments), property(), context)( arguments); if (runtime_options.do_check) { - assign_to_host_buffer(host_buffers.host_atomics_velo, arguments, context); + assign_to_host_buffer(host_buffers.host_atomics_velo, arguments, context); assign_to_host_buffer( host_buffers.host_velo_track_hit_number, arguments, context); assign_to_host_buffer(host_buffers.host_velo_track_hits, arguments, context); diff --git a/main/include/AllenThreads.h b/main/include/AllenThreads.h index d92e0806bc7e4ebb0e8b7406ebe7f4bdbc885f09..01d6dbf9cd6390418abe975343a0bb5e7ecb1593 100644 --- a/main/include/AllenThreads.h +++ b/main/include/AllenThreads.h @@ -24,6 +24,14 @@ void run_output( void run_slices(const size_t thread_id, IZeroMQSvc* zmqSvc, IInputProvider* input_provider); +void loop_slices( + const size_t thread_id, + IZeroMQSvc* zmqSvc, + IInputProvider* input_provider, + const size_t number_of_repetitions, + const size_t number_of_slices, + const unsigned events_per_slice); + void run_stream( size_t const thread_id, size_t const stream_id, diff --git a/main/include/BankTypes.h b/main/include/BankTypes.h index 24db28795c504fc4733f6685b7cd3f3373dfca24..cd3ade9d12798e2cfee91d538a815b5088677598 100644 --- a/main/include/BankTypes.h +++ b/main/include/BankTypes.h @@ -35,7 +35,7 @@ const std::unordered_map BankSizes = {{BankTypes::VP, 12.f}, // Average measured event size, measured // FIXME: make this configurable -constexpr float average_event_size = 65.f; +constexpr float average_event_size = 150.f; // Safety margin // FIXME: make this configurable constexpr float bank_size_fudge_factor = 1.5f; diff --git a/main/include/InputProvider.h b/main/include/InputProvider.h index e784dd5298b933716b1c10402cfea91c505b001b..57a4e9bd683c2c6740686f402c41165d8db1304b 100644 --- a/main/include/InputProvider.h +++ b/main/include/InputProvider.h @@ -43,7 +43,7 @@ struct IInputProvider { * * @param optional timeout in ms to wait for slice * - * @return tuple of (success, eof, timed_out, slice_index, n_filled) + * @return tuple of (success, eof, timed_out, slice_index, n_filled, run_number) */ virtual std::tuple get_slice( boost::optional timeout = boost::optional {}) = 0; @@ -67,6 +67,10 @@ struct IInputProvider { std::vector& sizes) const = 0; virtual void copy_banks(size_t const slice_index, unsigned int const event, gsl::span buffer) const = 0; + + virtual bool is_done() = 0; + + virtual size_t slice_size() = 0; }; // InputProvider @@ -171,6 +175,10 @@ public: int stop() override { return true; }; + bool is_done() override { return true; }; + + size_t slice_size() override { return 0; }; + protected: template void debug_output(const MSG& msg, boost::optional const thread_id = {}) const diff --git a/main/include/MDFProvider.h b/main/include/MDFProvider.h index a2ead7d0c1ad9f79737f6e5c164e379f5fe6280d..2f9025117d418eac03f1d5e84251386ef0429225 100644 --- a/main/include/MDFProvider.h +++ b/main/include/MDFProvider.h @@ -209,12 +209,11 @@ public: /// Destructor virtual ~MDFProvider() { - // Set flag to indicate the prefetch thread should exit, wake it // up and join it m_done = true; m_transpose_done = true; - m_prefetch_cond.notify_one(); + m_prefetch_cond.notify_all(); if (m_prefetch_thread) m_prefetch_thread->join(); // Set a flat to indicate all transpose threads should exit, 
wake @@ -398,6 +397,14 @@ public: std::memcpy(buffer.data(), banks_start, event_size); } + bool is_done() override + { + const bool done = m_transpose_done; + return done; + }; + + size_t slice_size() override { return this->events_per_slice(); }; + private: size_t count_writable() const { @@ -461,6 +468,7 @@ private: // woken up be the desctructor before a slice was declared // free. In that case, exit without transposing if (m_transpose_done && it == m_slice_free.end()) { + this->debug_output("Exiting without transposing, destructor was called", thread_id); break; } } @@ -587,6 +595,7 @@ private: bool eof = false, error = false, buffer_full = false, prefetch_done = false; size_t bytes_read = 0; + size_t n_filled = 0; auto to_read = this->n_events(); size_t eps = this->events_per_slice(); @@ -622,6 +631,7 @@ private: } else { it = find_writable(); + this->debug_output("Found writeable buffer"); } } // Flag the prefetch buffer as unavailable @@ -634,16 +644,29 @@ private: // needed while (true) { size_t read = std::get<0>(read_buffer); - size_t to_prefetch = to_read ? std::min(eps, *to_read) : eps; + // size_t to_prefetch = to_read ? std::min(eps, *to_read) : eps; + size_t to_prefetch = this->n_events() ? std::min(eps, *(this->n_events()) - n_filled) : eps; + this->debug_output( + "Events to prefetch = " + std::to_string(to_prefetch) + ", events to read = " + std::to_string(*to_read) + + ", events per slice = " + std::to_string(eps)); std::tie(eof, error, buffer_full, bytes_read) = - read_events(*m_input, read_buffer, m_header, m_compress_buffer, to_prefetch, m_config.check_checksum); + read_events(*m_input, read_buffer, m_header, m_compress_buffer, eps, m_config.check_checksum); size_t n_read = std::get<0>(read_buffer) - read; + n_filled += n_read; + this->debug_output( + "N_read = " + std::to_string(n_read) + ", bytes_read = " + std::to_string(bytes_read) + + ", read_previous = " + std::to_string(read) + ", read_now = " + std::to_string(std::get<0>(read_buffer)) + + ", eof = " + std::to_string(eof)); if (to_read) { *to_read -= std::min(*to_read, n_read); } - + /* this->debug_output( */ + /* "Number of events in slice = " + std::to_string(std::get<0>(read_buffer)) + ", eps = " + + * std::to_string(eps) + */ + /* ", buffer full = " + std::to_string(buffer_full)); */ if (error) { // Error encountered + this->debug_output("ERROR in read_events"); m_read_error = true; break; } @@ -688,6 +711,7 @@ private: { std::unique_lock lock {m_prefetch_mut}; m_prefetched.push_back(i_buffer); + this->debug_output("Prefetch buffer " + std::to_string(i_buffer) + " is ready for transposing"); } if (prefetch_done) { m_done = prefetch_done;
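The hunk above changes how the MDF prefetch loop sizes each read: instead of counting down `to_read`, it keeps a running `n_filled` counter and clamps every request to what remains of `this->n_events()`. A minimal standalone sketch of that accounting, with illustrative numbers rather than the provider's real state:

    // Sketch only: models the clamped prefetch accounting, not MDFProvider itself.
    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <optional>

    int main()
    {
      std::optional<std::size_t> n_events = 1200; // total events requested; empty would mean "no limit"
      std::size_t const eps = 500;                // events per slice
      std::size_t n_filled = 0;                   // events prefetched so far

      while (n_events && n_filled < *n_events) {
        // Never ask for more than what remains of the request.
        std::size_t const to_prefetch = std::min(eps, *n_events - n_filled);
        n_filled += to_prefetch; // stands in for the events actually read
        std::cout << "prefetched " << to_prefetch << ", total " << n_filled << "\n";
      }
    }

With eps = 500 and 1200 requested events this yields reads of 500, 500 and 200, matching the bookkeeping that the new debug output reports.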
diff --git a/main/include/MEPProvider.h b/main/include/MEPProvider.h index 7e5efd5aad84de7cf7ade2eb880a2fdbcdcd4de9..c7a6becb469eb14d24fcc065309a8e9ed154a20d 100644 --- a/main/include/MEPProvider.h +++ b/main/include/MEPProvider.h @@ -65,8 +65,6 @@ struct MEPProviderConfig { // Use MPI and number of receivers bool mpi = false; - bool non_stop = true; - bool transpose_mep = false; bool split_by_run = false; @@ -263,7 +261,7 @@ public: * * @param optional timeout * - * @return (good slice, timed out, slice index, number of events in slice) + * @return (good slice, done, timed out, slice index, number of events in slice) */ std::tuple get_slice( boost::optional timeout = boost::optional {}) override @@ -457,6 +455,17 @@ public: return true; }; + bool is_done() override + { + const bool done = m_transpose_done; + return done; + }; + + size_t slice_size() override + { + return (this->events_per_slice() < m_packing_factor) ? this->events_per_slice() : m_packing_factor; + }; + private: void init_mpi() { @@ -682,12 +691,7 @@ bool open_file() const while (!good) { // If looping on input is configured, do it if (m_current == m_connections.end()) { - if (m_config.non_stop) { - m_current = m_connections.begin(); - } - else { - break; - } + break; } if (m_input) m_input->close(); @@ -810,6 +814,7 @@ void mep_read() while (!success || eof) { std::tie(eof, success, mep_header, buffer_span) = MEP::read_mep(*m_input, read_buffer); + host_register(read_buffer.data(), read_buffer.size(), Allen::hostRegisterPortable); if (!eof) { debug_cout << "Read mep with packing factor " << mep_header.packing_factor << "\n"; @@ -896,7 +901,7 @@ void mpi_read() size_t number_of_meps = std::accumulate(n_meps.begin(), n_meps.end(), 0u); size_t current_mep = 0; - while (!m_done && (m_config.non_stop || current_mep < number_of_meps)) { + while (!m_done && current_mep < number_of_meps) { // info_cout << MPI::rank_str() << "round " << current_file << "\n"; // If we've been stopped, wait for start or exit
diff --git a/main/src/Allen.cpp b/main/src/Allen.cpp index f23139c29dced62a9f8ab84e84e50d4fc2ed889f..b45b5a1b024811be08efdf92cd9fafb91701e81b 100644 --- a/main/src/Allen.cpp +++ b/main/src/Allen.cpp @@ -91,7 +91,6 @@ int allen( unsigned number_of_repetitions = 1; unsigned verbosity = 3; bool print_memory_usage = false; - bool non_stop = false; bool write_config = false; // do_check will be true when a MC validator algorithm was configured bool do_check = contains_validator_algorithm(); @@ -115,6 +114,7 @@ int allen( uint mon_save_period = 0; std::string mon_filename; bool disable_run_changes = 0; + std::string mode = "benchmark"; std::string flag, arg; const auto flag_in = [&flag](const std::vector& option_flags) { @@ -218,9 +218,6 @@ int allen( else if (flag_in({"print-config"})) { print_config = atoi(arg.c_str()); } - else if (flag_in({"non-stop"})) { - non_stop = atoi(arg.c_str()); - } else if (flag_in({"print-status"})) { print_status = atoi(arg.c_str()); } @@ -236,6 +233,14 @@ int allen( else if (flag_in({"disable-run-changes"})) { disable_run_changes = atoi(arg.c_str()); } + else if (flag_in({"mode"})) { + mode = arg; + if (!(mode == "throughput" || mode == "benchmark" || mode == "datataking" || mode == "MC-check")) { + error_cout << "Selected mode has to be either throughput, benchmark, datataking or MC-check, but it is " << mode + << std::endl; + exit(1); + } + } } // Options sanity check @@ -268,14 +273,63 @@ int allen( // Show call options print_call_options(options, device_name); + // Mode setup + if (mode == "benchmark") { + if (number_of_events_requested > 0) { + if (events_per_slice != number_of_events_requested) { + info_cout << "Setting events per slice equal to number of events requested in benchmark mode: " + << number_of_events_requested << " events \n"; + events_per_slice = number_of_events_requested; + } + } + else if (events_per_slice == 0) { + info_cout << "Setting events per slice to default (=1000) \n"; + events_per_slice = 1000; + } + if (number_of_slices != 0) { + info_cout << "Number of slices cannot be set in benchmark mode, setting to default \n"; + number_of_slices = 0; + } + } + else if (mode == "throughput") { + if (number_of_events_requested != 0) { + info_cout << "Number of events (-n) cannot be set in throughput mode, it is specified by events_per_slice * " + "number_of_slices * repetitions \n"; + number_of_events_requested = 0; + } + } + else if (mode ==
"datataking") { + if (number_of_repetitions != 1) { + info_cout << "Number of repetitions must be 1 in datataking mode, setting it to 1 \n"; + number_of_repetitions = 1; + } + if (number_of_events_requested != 0) { + info_cout << "Number of events (-n) cannot be set in datataking mode, events are processed continuously as they " + "arrive \n"; + number_of_events_requested = 0; + } + } + else if (mode == "MC-check") { + if (events_per_slice != 1000) { + info_cout << "Events per slice cannot be set in MC-check mode, setting to default (=1000) \n"; + events_per_slice = 1000; + } + if (number_of_slices != 0) { + info_cout << "Number of slices cannot be set in MC-check mode, setting to default \n"; + number_of_slices = 0; + } + if (number_of_repetitions != 1) { + info_cout << "Number of repetitions must be 1 in MC-check mode, setting it to 1 \n"; + number_of_repetitions = 1; + } + } + // Determine wether to run with async I/O. bool enable_async_io = true; size_t n_io_reps = number_of_repetitions; - if ((number_of_slices == 0 || number_of_slices == 1) && number_of_repetitions > 1) { - // NOTE: Special case to be able to compare throughput with and - // without async I/O; if repetitions are requested and the number - // of slices is default (0) or 1, never free the initially filled - // slice. + // if ((number_of_slices == 0 || number_of_slices == 1) && number_of_repetitions > 1) { + if (mode == "benchmark") { + // NOTE: in benchmark mode the initially filled slice will be re-used and not freed enable_async_io = false; number_of_slices = 1; n_io_reps = 1; @@ -300,6 +354,7 @@ int allen( // Set a sane default for the number of events per input slice if (number_of_events_requested != 0 && events_per_slice > number_of_events_requested) { events_per_slice = number_of_events_requested; + info_cout << "Setting number of events per slice to " << events_per_slice << "\n"; } std::unique_ptr configuration_reader; @@ -333,12 +388,19 @@ int allen( // Create the InputProvider, either MDF or MEP // info_cout << with_mpi << ", " << mdf_input[0] << "\n"; if (!mep_input.empty() || with_mpi) { + size_t n_buffers = 10; + if (mode == "throughput") { + n_events = events_per_slice * number_of_slices; + info_cout << "Setting n_events to be processed by MEP provider to " << events_per_slice * number_of_slices + << std::endl; + n_buffers = number_of_slices; + } + MEPProviderConfig config {false, // verify MEP checksums - 10, // number of read buffers + n_buffers, // number of read buffers mep_layout ? 1u : 4u, // number of transpose threads mpi_window_size, // MPI sliding window size with_mpi, // Receive from MPI or read files - non_stop, // Run the application non-stop !mep_layout, // MEPs should be transposed to Allen layout !disable_run_changes, // Whether to split slices by run number receivers}; // Map of receiver to MPI rank to receive from @@ -353,8 +415,13 @@ int allen( } else if (!mdf_input.empty()) { mep_layout = false; + size_t n_buffers = 10; + if (mode == "throughput") { + n_buffers = number_of_slices; + n_events = events_per_slice * number_of_slices; + } MDFProviderConfig config {false, // verify MDF checksums - 10, // number of read buffers + n_buffers, // number of read buffers 4, // number of transpose threads events_per_slice * 10 + 1, // maximum number event of offsets in read buffer events_per_slice, // number of events per read buffer @@ -507,7 +574,13 @@ int allen( // Lambda with the execution of the input thread that polls the // input provider for slices. 
const auto slice_thread = [&](unsigned thread_id, unsigned) { - return std::thread {run_slices, thread_id, zmqSvc, input_provider.get()}; + if (mode == "benchmark" || mode == "datataking" || mode == "MC-check") { + return std::thread {run_slices, thread_id, zmqSvc, input_provider.get()}; + } + else { + return std::thread { + loop_slices, thread_id, zmqSvc, input_provider.get(), n_io_reps, number_of_slices, events_per_slice}; + } }; // Lambda with the execution of the output thread @@ -620,6 +693,7 @@ int allen( return s + (stat.at(0) == status); }); }; + std::vector slice_repetitions_throughput_mode(number_of_slices, 0); // counters for bookkeeping size_t prev_processor = 0; @@ -666,6 +740,7 @@ int allen( auto& socket = std::get<1>(streams[i]); auto msg = zmqSvc->receive(socket); if (msg == "SPLIT") { + debug_cout << "in SPLIT mode" << std::endl; // This slice required too much memory to process // return it to the I/O thread for splitting auto slice_index = zmqSvc->receive(socket); @@ -706,8 +781,12 @@ int allen( auto slice_index = zmqSvc->receive(socket); auto first_event = zmqSvc->receive(socket); auto buffer_index = zmqSvc->receive(socket); + auto slice_repetition_index = zmqSvc->receive(socket); n_events_processed += events_in_slice[slice_index][first_event]; n_events_measured += events_in_slice[slice_index][first_event]; + debug_cout << "Updating n_events_processed to " << n_events_processed + << ", events in slice = " << events_in_slice[slice_index][first_event] << " in stream " << i + << std::endl; ++slices_processed; stream_ready[i] = true; @@ -740,9 +819,15 @@ int allen( } // Add the slice and buffer to the queue for output - write_queue.push(std::make_tuple(slice_index, first_event, buffer_index)); + if (mode == "throughput") { + write_queue.push(std::make_tuple(slice_index, slice_repetition_index, buffer_index)); + input_slice_status[slice_index][slice_repetition_index] = SliceStatus::Processed; + } + else { + write_queue.push(std::make_tuple(slice_index, first_event, buffer_index)); + input_slice_status[slice_index][first_event] = SliceStatus::Processed; + } - input_slice_status[slice_index][first_event] = SliceStatus::Processed; buffer_manager->returnBufferFilled(buffer_index); } } @@ -835,14 +920,33 @@ int allen( } // FIXME: make the warmup time configurable - if (!t && (number_of_repetitions == 1 || (slices_processed >= 5 * number_of_threads) || !enable_async_io)) { + if ( + !t && + (number_of_repetitions == 1 || (slices_processed >= 5 * number_of_threads) || (mode == "benchmark"))) { info_cout << "Starting timer for throughput measurement\n"; throughput_start = n_events_processed * number_of_repetitions; t = Timer {}; previous_time_measurement = t->get_elapsed_time(); } - input_slice_status[*slice_index][0] = SliceStatus::Filled; - events_in_slice[*slice_index][0] = n_filled; + // In throughput mode we can receive a slice that has not been finished by a stream yet + // In this case, use the sub-slice queue to queue it for processing + // The slice repetition index is used within input_slice_status to keep track of how many times + // slice[slice_index] is processed, while other streams have not yet finished processing this slice + if (mode == "throughput" && input_slice_status[*slice_index][0] != SliceStatus::Empty) { + auto slice_repetition_index = ++slice_repetitions_throughput_mode[*slice_index]; + input_slice_status[*slice_index][slice_repetition_index] = SliceStatus::Filled; + debug_cout << "At slice index " << *slice_index << " filling sub-slice " << 
slice_repetition_index + << std::endl; + sub_slice_queue.push({*slice_index, slice_repetition_index, events_in_slice[*slice_index][0]}); + slice_index.reset(); // process this slice in the sub-slice queue, rather than in the if statement + // checking for a new slice_index + } + else { + input_slice_status[*slice_index][0] = SliceStatus::Filled; + events_in_slice[*slice_index][0] = n_filled; + debug_cout << "Slice " << *slice_index << " filled with " << n_filled << " events \n"; + } + // debug_cout << "Slice " << *slice_index << " filled with " << n_filled << " events \n"; n_events_read += n_filled; // If we have a slice we must send it for processing before polling remaining I/O threads break; @@ -850,7 +954,7 @@ int allen( else if (msg == "RUN") { run_change = true; next_run_number = zmqSvc->receive(socket); - debug_cout << "Requested run change from " << current_run_number << " to " << *next_run_number << std::endl; + debug_cout << "Requested run change from " << current_run_number << " to " << *next_run_number << "\n"; // guard against double run changes if we have multiple input threads if (disable_run_changes || *next_run_number == current_run_number) next_run_number.reset(); } @@ -860,12 +964,16 @@ int allen( auto buf_idx = zmqSvc->receive(socket); auto success = zmqSvc->receive(socket); auto n_written = zmqSvc->receive(socket); + auto slice_repetition_index = zmqSvc->receive(socket); n_events_output += n_written; n_output_measured += n_written; if (!success) { error_cout << "Failed to write output events.\n"; } - input_slice_status[slc_idx][first_evt] = SliceStatus::Written; + if (mode == "throughput") + input_slice_status[slc_idx][slice_repetition_index] = SliceStatus::Written; + else + input_slice_status[slc_idx][first_evt] = SliceStatus::Written; // check to see if any parts of this slice still need to be written bool slice_finished(true); @@ -878,9 +986,16 @@ int allen( if (enable_async_io && slice_finished) { input_slice_status[slc_idx].clear(); input_slice_status[slc_idx][0] = SliceStatus::Empty; - input_provider->slice_free(slc_idx); - events_in_slice[slc_idx].clear(); - events_in_slice[slc_idx][0] = 0; + if (mode == "datataking" || mode == "MC-check") { + input_provider->slice_free(slc_idx); + events_in_slice[slc_idx].clear(); + events_in_slice[slc_idx][0] = 0; + } + else if (mode == "throughput") { + debug_cout << "Requesting slice " << slc_idx << std::endl; + auto& socket_input = std::get<1>(io_workers[n_input - 1]); + zmqSvc->send(socket_input, "REQUEST_SLICE"); + } } buffer_manager->returnBufferWritten(buf_idx); @@ -901,11 +1016,11 @@ int allen( } } - // If there is a slice, send it to the next processor; when async - // I/O is disabled send the slice(s) to all streams + // If there is a slice, send it to the next processor + // in benchmark mode: send the slice(s) to all streams if (slice_index) { bool first = true; - while ((enable_async_io && first) || (!enable_async_io && stream_ready.count())) { + while ((enable_async_io && first) || ((mode == "benchmark") && stream_ready.count())) { first = false; size_t processor_index = prev_processor++; if (prev_processor == number_of_threads) { @@ -921,7 +1036,8 @@ int allen( zmqSvc->send(socket, *slice_index, send_flags::sndmore); zmqSvc->send(socket, size_t(0), send_flags::sndmore); zmqSvc->send(socket, events_in_slice[*slice_index][0], send_flags::sndmore); - zmqSvc->send(socket, *buffer_index); + zmqSvc->send(socket, *buffer_index, send_flags::sndmore); + zmqSvc->send(socket, size_t(0)); stream_ready[processor_index] = 
false; if (logger::verbosity() >= logger::debug) { @@ -944,14 +1060,24 @@ int allen( if (prev_processor == number_of_threads) { prev_processor = 0; } + input_slice_status[slice_idx][first_evt] = SliceStatus::Processing; + size_t slice_repetition_index = 0; + if (mode == "throughput") { + slice_repetition_index = first_evt; + first_evt = 0; + debug_cout << "At slice " << slice_idx << ": Fetching sub-slice " << slice_repetition_index + << " from the queue \n"; + } + buffer_index = std::optional {buffer_manager->assignBufferToFill()}; auto& socket = std::get<1>(streams[processor_index]); zmqSvc->send(socket, "PROCESS", send_flags::sndmore); zmqSvc->send(socket, slice_idx, send_flags::sndmore); zmqSvc->send(socket, first_evt, send_flags::sndmore); zmqSvc->send(socket, last_evt, send_flags::sndmore); - zmqSvc->send(socket, *buffer_index); + zmqSvc->send(socket, *buffer_index, send_flags::sndmore); + zmqSvc->send(socket, slice_repetition_index); stream_ready[processor_index] = false; if (logger::verbosity() >= logger::debug) { @@ -967,11 +1093,18 @@ int allen( input_slice_status[slc_index][first_event] = SliceStatus::Writing; + size_t slice_repetition_index = 0; + if (mode == "throughput") { + slice_repetition_index = first_event; + first_event = 0; + } + auto& socket = std::get<1>(io_workers[n_input]); zmqSvc->send(socket, "WRITE", send_flags::sndmore); zmqSvc->send(socket, slc_index, send_flags::sndmore); zmqSvc->send(socket, first_event, send_flags::sndmore); - zmqSvc->send(socket, buf_index); + zmqSvc->send(socket, buf_index, send_flags::sndmore); + zmqSvc->send(socket, slice_repetition_index); } // Send any available HostBuffers to montoring threads @@ -1002,6 +1135,7 @@ int allen( t_mon.restart(); } + // For monitoring (control event loop from outside) if (allen_control && items[control_index].revents & zmq::POLLIN) { auto msg = zmqSvc->receive(*allen_control); if (msg == "STOP") { @@ -1031,8 +1165,9 @@ int allen( // Separate if statement to allow stop in different ways // depending on whether async I/O or repetitions are enabled. // NOTE: This may be called several times when slices are ready - bool io_cond = - ((!enable_async_io && stream_ready.count() == number_of_threads) || (enable_async_io && io_done)) && !run_change; + bool io_cond = ((!enable_async_io && stream_ready.count() == number_of_threads) || + (enable_async_io && io_done && stream_ready.count() == number_of_threads)) && + !run_change; if (t && io_cond && number_of_repetitions > 1) { if (!throughput_processed) { throughput_processed = n_events_processed * number_of_repetitions - throughput_start; @@ -1096,8 +1231,16 @@ loop_error: // Print throughput measurement result if (t && throughput_processed) { - info_cout << (*throughput_processed / t->get()) << " events/s\n" - << "Ran test for " << t->get() << " seconds\n"; + info_cout << "Ran test for " << t->get() << " seconds on " << *throughput_processed << " events \n" + << (*throughput_processed / t->get()) << " events/s\n"; + uint n_requested_events = 0; + if (mode == "benchmark") + n_requested_events = number_of_events_requested * number_of_repetitions * number_of_threads; + else if (mode == "throughput") + n_requested_events = number_of_slices * events_per_slice * n_io_reps; + else if (mode == "MC-check") + n_requested_events = number_of_events_requested; + info_cout << "Number of requested events = " << n_requested_events << " \n"; } else if (!t) { warning_cout << "Timer wasn't started." 
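For reference, the final `n_requested_events` block above implies the following per-mode totals; this is a standalone sketch with illustrative values, not code from the patch:

    // Sketch only: summarizes how many events each --mode is expected to process.
    #include <cstddef>
    #include <iostream>
    #include <string>

    std::size_t requested_events(
      std::string const& mode,
      std::size_t n_events, // -n, number of events requested
      std::size_t reps,     // -r (n_io_reps in throughput mode)
      std::size_t threads,  // -t, threads / streams
      std::size_t slices,   // -s, number of slices
      std::size_t eps)      // --events-per-slice
    {
      if (mode == "benchmark") return n_events * reps * threads;
      if (mode == "throughput") return slices * eps * reps;
      if (mode == "MC-check") return n_events;
      return 0; // datataking: runs until the input ends or a STOP arrives
    }

    int main()
    {
      // e.g. ./Allen --mode benchmark -t 4 -n 4000 -r 20 processes 320000 events
      std::cout << requested_events("benchmark", 4000, 20, 4, 0, 0) << "\n";
    }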
diff --git a/main/src/AllenThreads.cpp b/main/src/AllenThreads.cpp index fceba810d37784e2376434105d3ce8cdff03f428..296c4281ea79fcb5bdfa7e1f489975f1cc7a4886 100644 --- a/main/src/AllenThreads.cpp +++ b/main/src/AllenThreads.cpp @@ -112,6 +112,7 @@ void run_output( auto slc_idx = zmqSvc->receive(control); auto first_evt = zmqSvc->receive(control); auto buf_idx = zmqSvc->receive(control); + auto slice_repetition_index = zmqSvc->receive(control); bool success = true; auto [passing_event_list, dec_reports, sel_reports, sel_report_offsets] = @@ -126,7 +127,8 @@ void run_output( zmqSvc->send(control, first_evt, send_flags::sndmore); zmqSvc->send(control, buf_idx, send_flags::sndmore); zmqSvc->send(control, success, send_flags::sndmore); - zmqSvc->send(control, static_cast(passing_event_list.size())); + zmqSvc->send(control, static_cast(passing_event_list.size()), send_flags::sndmore); + zmqSvc->send(control, slice_repetition_index); } } } @@ -188,6 +190,190 @@ void run_slices(const size_t thread_id, IZeroMQSvc* zmqSvc, IInputProvider* inpu if (done) { zmqSvc->send(control, "DONE"); timeout = -1; + debug_cout << "Sending DONE signal \n"; + } + } +} + +struct SliceInfo { + size_t slice_index; + size_t n_filled; + uint run_number; + int n_requested; +}; + +/** + * @brief Request slices from the input provider and report + * them to the main thread once all are ready; + * loop over the slices according to the requested + * number of repetitions; runs in a separate thread + * + * @param thread ID of this I/O thread + * @param IZeroMQSvc instance + * @param IInputProvider instance + * @param number of repetitions to loop over slices + * @param number of requested slices + * @param number of events per slice + * + * @return void + */ +void loop_slices( + const size_t thread_id, + IZeroMQSvc* zmqSvc, + IInputProvider* input_provider, + const size_t number_of_repetitions_requested, + const size_t number_of_slices_requested, + const unsigned events_per_slice) +{ + + // Create a control socket and connect it. + zmq::socket_t control = make_control(thread_id, zmqSvc); + + zmq::pollitem_t items[] = {{control, 0, zmq::POLLIN, 0}}; + + int timeout = -1; + uint current_run_number = 0; + std::vector slice_infos; + slice_infos.resize(number_of_slices_requested); + std::vector::iterator slice_it = slice_infos.begin(); + bool slices_ready = false; + uint n_events_filled = 0; + bool first_slices_sent = false; + bool slice_requested = false; + bool done = false; + bool incomplete_slice = false; + uint n_repetitions = 0; + + while (true) { + + // Check if there are messages without blocking + zmqSvc->poll(&items[0], 1, timeout); + + if (items[0].revents & zmq::POLLIN) { + auto msg = zmqSvc->receive(control); + if (msg == "DONE") { + break; + } + else if (msg == "START") { + timeout = 0; + } + else if (msg == "REQUEST_SLICE") { + slice_requested = true; + debug_cout << "Requested slice \n"; + } + } + + // Get all requested slices + if (!slices_ready && slice_it != slice_infos.end()) { + // Get a slice, the argument specifies the timeout in ms + auto [good, done, timed_out, slice_index, n_filled, run_number] = input_provider->get_slice(1000); + if (n_filled < events_per_slice) { + incomplete_slice = true; + debug_cout << "Incomplete slice: n_filled = " << n_filled << ", events_per_slice = " << events_per_slice + << std::endl; + } + // If the MEP packing factor is less than events_per_slice, the MEPProvider sends slices with the size of the + // packing factor. If events_per_slice < packing_factor, but packing_factor is not a multiple of events_per_slice, + // smaller slices are sent in between + // -> don't accept either of these scenarios for now + if (input_provider->slice_size() != events_per_slice) { + error_cout << "Slice size not as requested! Asked for " << events_per_slice << ", got " + << input_provider->slice_size() << std::endl; + zmqSvc->send(control, "ERROR"); + timeout = -1; + continue; + } + if (!timed_out && good && n_filled != 0) { + int n_requested = 0; + SliceInfo slice_info = {slice_index, n_filled, run_number, n_requested}; + *slice_it = slice_info; + slice_it++; + n_events_filled += n_filled; + // debug_cout << "Got slice with index " << slice_index << " and " << n_filled << " events \n"; + } + else if (!good) { + zmqSvc->send(control, "ERROR"); + break; + } + } + + // Check that all requested events were filled into slices + const auto n_events_requested = number_of_slices_requested * events_per_slice; + if (!slices_ready) { + if (input_provider->is_done()) { + debug_cout << "Input provider is done" << std::endl; + } + if (n_events_filled == n_events_requested) { + slices_ready = true; + } + else if (input_provider->is_done() || incomplete_slice) { + if (n_events_filled < n_events_requested) { + error_cout << "Not enough events in input or mismatch between packing factor and events_per_slice: " + << n_events_requested << " requested, but only " << n_events_filled << " present \n"; + zmqSvc->send(control, "ERROR"); + timeout = -1; + } + } + } + + // Send off slices to streams once + if (slices_ready && !first_slices_sent) { + slice_it = slice_infos.begin(); + while (slice_it != slice_infos.end()) { + auto& slice = *slice_it; + // If the run number has changed then report this first + if (slice.run_number != current_run_number) { + current_run_number = slice.run_number; + zmqSvc->send(control, "RUN", send_flags::sndmore); + zmqSvc->send(control, current_run_number); + } + zmqSvc->send(control, "SLICE", send_flags::sndmore); + zmqSvc->send(control, slice.slice_index, send_flags::sndmore); + zmqSvc->send(control, slice.n_filled); + debug_cout << "Sending off first slices, at index " << slice.slice_index << "\n"; + slice_it++; + slice.n_requested++; + } + if (slice_it == slice_infos.end()) { + slice_it = slice_infos.begin(); + n_repetitions++; + } + first_slices_sent = true; + timeout = 500; + } + + // Loop over slices and send them off as requested by event loop + if (first_slices_sent && slice_requested && !done) { + // Stop after requested number of slices (including repetitions) + // debug_cout << "n_repetitions = " << n_repetitions << ", repetitions requested = " << + // number_of_repetitions_requested << std::endl; + if (n_repetitions == number_of_repetitions_requested) { + debug_cout << "Sending DONE signal" << std::endl; + zmqSvc->send(control, "DONE"); + timeout = -1; + done = true; + for (auto slice : slice_infos) { + info_cout << "Requested slice " << slice.slice_index << " " << slice.n_requested << " times \n"; + } + continue; + } + auto& slice = *slice_it; + if (slice.run_number != current_run_number) { + current_run_number = slice.run_number; + zmqSvc->send(control, "RUN", send_flags::sndmore); + zmqSvc->send(control, current_run_number); + } + zmqSvc->send(control, "SLICE", send_flags::sndmore); + zmqSvc->send(control, slice.slice_index, send_flags::sndmore); + zmqSvc->send(control, slice.n_filled); + debug_cout << "Sending off continuous slice at index " << slice.slice_index << std::endl; + + slice_it++; + if (slice_it == slice_infos.end()) { + slice_it = slice_infos.begin(); + n_repetitions++; + } + + slice.n_requested++; + slice_requested = false; } } } @@ -235,6 +421,7 @@ void run_stream( size_t buf; size_t first; size_t last; + size_t slice_repetition_index; if (items[0].revents & zmq::POLLIN) { command = zmqSvc->receive(control); if (command == "DONE") { @@ -248,6 +435,7 @@ first = zmqSvc->receive(control); last = zmqSvc->receive(control); buf = zmqSvc->receive(control); + slice_repetition_index = zmqSvc->receive(control); } } @@ -272,14 +460,15 @@ zmqSvc->send(control, *idx, send_flags::sndmore); zmqSvc->send(control, first, send_flags::sndmore); zmqSvc->send(control, last, send_flags::sndmore); - zmqSvc->send(control, buf); + zmqSvc->send(control, buf); // TODO: add slice repetition index treatment } else if (status == Allen::error::success) { // signal that we're done zmqSvc->send(control, "PROCESSED", send_flags::sndmore); zmqSvc->send(control, *idx, send_flags::sndmore); zmqSvc->send(control, first, send_flags::sndmore); - zmqSvc->send(control, buf); + zmqSvc->send(control, buf, send_flags::sndmore); + zmqSvc->send(control, slice_repetition_index); } } }
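The `loop_slices` thread added above answers each `REQUEST_SLICE` with the next prepared slice and counts a repetition once it has cycled through all of them, then sends `DONE`. A standalone model of that round-robin bookkeeping, with illustrative values rather than the real ZeroMQ plumbing:

    // Sketch only: the slice round-robin that loop_slices implements.
    #include <cstddef>
    #include <iostream>
    #include <vector>

    int main()
    {
      std::size_t const number_of_slices = 3;
      std::size_t const repetitions_requested = 2;

      std::vector<std::size_t> n_requested(number_of_slices, 0);
      std::size_t slice = 0, n_repetitions = 0;

      while (n_repetitions < repetitions_requested) { // reaching the limit -> DONE
        ++n_requested[slice];                         // stands in for sending "SLICE" slice
        if (++slice == number_of_slices) {            // wrapped around: one repetition done
          slice = 0;
          ++n_repetitions;
        }
      }
      for (std::size_t i = 0; i < number_of_slices; ++i)
        std::cout << "slice " << i << " handed out " << n_requested[i] << " times\n";
    }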
diff --git a/main/src/ProgramOptions.cpp b/main/src/ProgramOptions.cpp index c188e7901efd77e88a8e709dd59e4623ddcb3da1..474fac2b0daca72d01b676d38aa95c0a718beee7 100644 --- a/main/src/ProgramOptions.cpp +++ b/main/src/ProgramOptions.cpp @@ -58,14 +58,14 @@ std::vector allen_program_options() {{"cpu-offload"}, "offload part of the computation to CPU", "1"}, {{"output-file"}, "Write selected event to output file", ""}, {{"device"}, "select device to use", "0"}, - {{"non-stop"}, "Runs the program indefinitely", "0"}, {{"with-mpi"}, "Read events with MPI"}, {{"mpi-window-size"}, "Size of MPI sliding window", "4"}, {{"mpi-number-of-slices"}, "Number of MPI network slices", "6"}, {{"inject-mem-fail"}, "Whether to insert random memory failures (0: off 1-15: rate of 1 in 2^N)", "0"}, {{"monitoring-filename"}, "ROOT file to write monitoring histograms to", "monitoringHists.root"}, {{"monitoring-save-period"}, "Number of seconds between writes of the monitoring histograms (0: off)", "0"}, - {{"disable-run-changes"}, "Ignore signals to update non-event data with each run change", "1"}}; + {{"disable-run-changes"}, "Ignore signals to update non-event data with each run change", "1"}, + {{"mode"}, "Mode to run Allen: benchmark, throughput, datataking or MC-check", "benchmark"}}; } void print_call_options(const std::map& options, const std::string& device_name)
diff --git a/main/src/Transpose.cpp b/main/src/Transpose.cpp index b44a17d13274ce4768bbc672c2c26bb91c5aa242..6a9a78b6a5d6b29f76cfbd8d7578d72ba1637a6e 100644 --- a/main/src/Transpose.cpp +++ b/main/src/Transpose.cpp @@ -31,17 +31,29 @@ std::tuple read_events( bool eof = false, error = false, full = false; gsl::span bank_span; + debug_cout << "in read_events: eof = " << eof << ", error = " << error << ", n_filled = " << n_filled + << ", n_events = " << n_events << ", event_offsets size = " << event_offsets.size() << std::endl; + // Loop until the requested number of events is prefetched, the // maximum number of events per prefetch buffer is hit, an error // occurs or eof is reached while (!eof && !error && n_filled < event_offsets.size() - 1 && n_filled < n_events) { // It is - + // debug_cout << "in read_events: eof = " << eof << ", n_filled = " << n_filled << ", n_events = " << n_events << + // std::endl; // Read the banks gsl::span buffer_span {buffer_start + event_offsets[n_filled], static_cast(buffer.size() - event_offsets[n_filled])}; std::tie(eof, error, bank_span) = MDF::read_banks(input, header, std::move(buffer_span), compress_buffer, check_checksum); + + if (bank_span.size() == 0) { + info_cout << "read_events: Cannot read more data (Banks). End-of-File reached. eof = " << eof << std::endl; + eof = true; + break; + } + // Fill the start offset of the next event event_offsets[++n_filled] = bank_span.data() + bank_span.size() - buffer_start; n_bytes += bank_span.size(); @@ -61,7 +73,7 @@ } } else if (n_bytes == 0) { - info_cout << "Cannot read more data (Header). End-of-File reached.\n"; + info_cout << "read_events: Cannot read more data (Header). End-of-File reached.\n"; eof = true; } else {
diff --git a/mdf/src/read_mdf.cpp b/mdf/src/read_mdf.cpp index c531d4bea86d093f7262b9ddea005b8b33fa78bb..b5f11316906ad006349a2f4c69497ba128631dca 100644 --- a/mdf/src/read_mdf.cpp +++ b/mdf/src/read_mdf.cpp @@ -235,7 +235,7 @@ std::tuple> MDF::read_event( return read_banks(input, h, buffer, decompression_buffer, checkChecksum, dbg); } else if (n_bytes == 0) { - cout << "Cannot read more data (Header). End-of-File reached.\n"; + cout << "read_event: Cannot read more data (Header). End-of-File reached.\n"; return {true, false, {}}; } else { @@ -323,7 +323,7 @@ std::tuple> MDF::read_banks( // Read compressed data ssize_t n_bytes = input.read(decompression_buffer.data() + rawSize, readSize); if (n_bytes == 0) { - cout << "Cannot read more data (Header). End-of-File reached.\n"; + cout << "read_banks compressed: Cannot read more data (Header). End-of-File reached.\n"; return {true, false, {}}; } else if (n_bytes == -1) { @@ -358,7 +358,7 @@ std::tuple> MDF::read_banks( // Read uncompressed data from file ssize_t n_bytes = input.read(bptr + rawSize, readSize); if (n_bytes == 0) { - cout << "Cannot read more data (Header). End-of-File reached.\n"; + cout << "read_banks uncompressed: Cannot read more data (Header).
End-of-File reached.\n"; return {true, false, {}}; } else if (n_bytes == -1) { @@ -370,9 +370,8 @@ std::tuple> MDF::read_banks( if (!test_checksum(bptr, chkSize)) { return {false, true, {}}; } - return {false, - false, - {buffer.data(), static_cast>(bnkSize + static_cast(readSize))}}; + return { + false, false, {buffer.data(), static_cast>(bnkSize + static_cast(readSize))}}; } } diff --git a/mdf/test/test_mdf_transpose.cpp b/mdf/test/test_mdf_transpose.cpp index a42c485c622a79aa152b0a0247bbe91509423bf4..969ef4416e3ebe5ccdec59df9d118e5ef03c4c2e 100644 --- a/mdf/test/test_mdf_transpose.cpp +++ b/mdf/test/test_mdf_transpose.cpp @@ -1,4 +1,4 @@ -/*****************************************************************************\ +/***************************************************************************** \ * (c) Copyright 2018-2020 CERN for the benefit of the LHCb Collaboration * \*****************************************************************************/ #include diff --git a/mdf/test/test_mep_banks.cpp b/mdf/test/test_mep_banks.cpp index 3dcda1a64ec22401e860417110606ebcdb3a87a8..3afeab8c3683795939b984ede387fd9cdb873a7e 100644 --- a/mdf/test/test_mep_banks.cpp +++ b/mdf/test/test_mep_banks.cpp @@ -91,7 +91,6 @@ int main(int argc, char* argv[]) 1u, // number of transpose threads 4u, // MPI sliding window size false, // Receive from MPI or read files - false, // Run the application non-stop false, // MEPs should be transposed to Allen layout false, // Whether to split slices by run number {}}; // Map of receiver to MPI rank to receive from diff --git a/readme.md b/readme.md index 8af8e50cc099b332bc35435e07030cabe6b71180..2d74db6e0892aa5447144c2507d0ec79380fa3c7 100644 --- a/readme.md +++ b/readme.md @@ -186,56 +186,69 @@ $> ./build.${CMTCONFIG}/run bindings/Allen.py How to run the standalone project ------------- -Some binary input files are included with the project for testing. 
A run of the program with the help option `-h` will let you know the basic options:

    Usage: ./Allen
-    -f, --folder {folder containing data directories}=../input/minbias/
-    -g, --geometry {folder containing detector configuration}=../input/detector_configuration/down/
-    --mdf {comma-separated list of MDF files to use as input}
-    --mep {comma-separated list of MEP files to use as input}
-    --transpose-mep {Transpose MEPs instead of decoding from MEP layout directly}=0 (don't transpose)
-    --configuration {path to json file containing values of configurable algorithm constants}=Sequence.json
-    --print-status {show status of buffer and socket}=0
-    --print-config {show current algorithm configuration}=0
-    --write-configuration {write current algorithm configuration to file}=0
-    -n, --number-of-events {number of events to process}=0 (all)
-    -s, --number-of-slices {number of input slices to allocate}=0 (one more than the number of threads)
-    --events-per-slice {number of events per slice}=1000
-    -t, --threads {number of threads / streams}=1
-    -r, --repetitions {number of repetitions per thread / stream}=1
-    -m, --memory {memory to reserve per thread / stream (megabytes)}=1024
-    -v, --verbosity {verbosity [0-5]}=3 (info)
-    -p, --print-memory {print memory usage}=0
-    -i, --import-tracks {import forward tracks dumped from Moore}
-    --cpu-offload {offload part of the computation to CPU}=1
-    --output-file {Write selected event to output file}
-    --device {select device to use}=0
-    --non-stop {Runs the program indefinitely}=0
-    --with-mpi {Read events with MPI}
-    --mpi-window-size {Size of MPI sliding window}=4
-    --mpi-number-of-slices {Number of MPI network slices}=6
-    -h {show this help}
-
+    -g, --geometry {folder containing detector configuration}=../input/detector_configuration/down/
+    --params {folder containing parameters that do not change with the geometry}=../input/parameters/
+    --mdf {comma-separated list of MDF files to use as input}
+    --mep {comma-separated list of MEP files to use as input}
+    --transpose-mep {Transpose MEPs instead of decoding from MEP layout directly}=0 (don't transpose)
+    --configuration {path to json file containing values of configurable algorithm constants}=Sequence.json
+    --print-status {show status of buffer and socket}=0
+    --print-config {show current algorithm configuration}=0
+    --write-configuration {write current algorithm configuration to file}=0
+    -n, --number-of-events {number of events to process}=0 (all)
+    -s, --number-of-slices {number of input slices to allocate}=0 (one more than the number of threads)
+    --events-per-slice {number of events per slice}=1000
+    -t, --threads {number of threads / streams}=1
+    -r, --repetitions {number of repetitions per thread / stream}=1
+    -m, --memory {memory to reserve on the device per thread / stream (megabytes)}=1000
+    --host-memory {memory to reserve on the host per thread / stream (megabytes)}=200
+    -v, --verbosity {verbosity [0-5]}=3 (info)
+    -p, --print-memory {print memory usage}=0
+    --cpu-offload {offload part of the computation to CPU}=1
+    --output-file {Write selected event to output file}
+    --device {select device to use}=0
+    --with-mpi {Read events with MPI}
+    --mpi-window-size {Size of MPI sliding window}=4
+    --mpi-number-of-slices {Number of MPI network slices}=6
+    --inject-mem-fail {Whether to insert random memory failures (0: off 1-15: rate of 1 in 2^N)}=0
+    --monitoring-filename {ROOT file to write monitoring histograms to}=monitoringHists.root
+    --monitoring-save-period {Number of seconds between writes of the monitoring histograms (0: off)}=0
+    --disable-run-changes {Ignore signals to update non-event data with each run change}=1
+    --mode {Mode to run Allen: benchmark, throughput, datataking or MC-check}=benchmark
+    -h {show this help}
+
+The different modes mean the following:
+- `benchmark`: Each thread/stream runs a total of `events-per-slice * repetitions` events. All streams run on the same events.
+- `throughput`: In total `events-per-slice * number-of-slices * repetitions` events are processed, independently of the number of thread-stream pairs. (Note that if `number-of-slices` is less than `threads`, `number-of-slices` is set to `threads + 1` by default.) This mode is intended for representative throughput measurements over a large number of events.
+- `datataking`: The program runs until all input is processed, or indefinitely until a STOP signal is received when controlled by the ECS.
+- `MC-check`: A configurable number of events is processed, without repetitions.
+
+One MDF input file containing 10 minimum bias events is included with the project for testing; it is used as input by default if no other input file is specified.

Here are some example run options:

-    # Run all input files shipped with Allen once
+    # Run Allen once with the input shipped in the repository
     ./Allen

-    # Specify input files, run once over all of them
-    ./Allen -f ../input/minbias/
+    # Specify MDF files, run once over all of them
+    ./Allen --mdf my_mdf_file_0.mdf,my_mdf_file_1.mdf
+
+    # Specify a MEP file, run once over it
+    ./Allen --mep my_mep_file.mep

-    # Run a total of 1000 events once without tracking validation. If less than 1000 events are
+    # Run a total of 1000 events once. If less than 1000 events are
     # provided, the existing ones will be reused in round-robin.
     ./Allen -n 1000

-    # Run four streams, each with 4000 events and 20 repetitions
-    ./Allen -t 4 -n 4000 -r 20
+    # Run four streams, each with 4000 events and 20 repetitions -> process 4 * 4000 * 20 events in total
+    ./Allen --mode benchmark -t 4 -n 4000 -r 20

     # Run one stream with 5000 events and print all memory allocations
     ./Allen -n 5000 -p 1

-    # Default throughput test configuration
+    # Default throughput test configuration in benchmark mode
     ./Allen -t 16 -n 500 -m 500 -r 1000
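The listing above also admits a `throughput`-mode invocation along these lines; the MDF file name is a placeholder, not a file shipped with the project:

    # Process events-per-slice * number-of-slices * repetitions events in total,
    # independently of the number of streams
    ./Allen --mode throughput --mdf my_mdf_file_0.mdf -t 16 -s 17 --events-per-slice 500 -r 10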
same 500 events in all streams - TEST_NAME: "run_throughput" SEQUENCE: ["hlt1_pp_default"] - DATA_TAG: ["upgrade_mc_minbias_scifi_v5_000"] - #DATA_TAG: ["MiniBrunel_2018_MinBias_FTv4_DIGI_1k"] - + DATA_TAG: ["upgrade_mc_minbias_scifi_v5"] + RUN_THROUGHPUT_OPTIONS_CUDA: "-n 500 -m 500 -r 1000 -t 16" + MODE: "benchmark" + + # run throughput test on sim10-up08 sample in benchmark mode + - TEST_NAME: "run_throughput" + SEQUENCE: ["hlt1_pp_scifi_v6"] + DATA_TAG: ["upgrade-magdown-sim10-up08-30000000-digi"] + GEOMETRY: ["sim10_up08_md"] + MODE: "benchmark" + # efficiency tests - TEST_NAME: "run_physics_efficiency" SEQUENCE: ["hlt1_pp_validation"] - DATA_TAG: ["Upgrade_BsPhiPhi_MD_FTv4_DIGI_1k"] - - + DATA_TAG: ["Upgrade_BsPhiPhi_MD_FTv4_DIGI"] + + # run throughput in stable throughput mode, i.e. on many events + # - TEST_NAME: "run_throughput" + # SEQUENCE: ["hlt1_pp_default"] + # DATA_TAG: ["upgrade_mc_minbias_scifi_v5"] + # MODE: "throughput" + + # # run throughput test on sim10-up08 sample in throughput mode + # - TEST_NAME: "run_throughput" + # SEQUENCE: ["hlt1_pp_scifi_v6"] + # DATA_TAG: ["upgrade-magdown-sim10-up08-30000000-digi"] + # GEOMETRY: ["sim10_up08_md"] + # MODE: "throughput" + + .run_matrix_jobs_full: parallel: matrix: - # run throughput, no_gec + # run throughput, no_gec - TEST_NAME: "run_throughput" BUILD_TYPE: ["RelWithDebInfo"] SEQUENCE: ["hlt1_pp_no_gec"] - DATA_TAG: ["upgrade_mc_minbias_scifi_v5_000"] - #DATA_TAG: ["MiniBrunel_2018_MinBias_FTv4_DIGI_1k"] + DATA_TAG: ["upgrade_mc_minbias_scifi_v5"] + MODE: "benchmark" + + # # run throughput, no_gec, many events + # - TEST_NAME: "run_throughput" + # BUILD_TYPE: ["RelWithDebInfo"] + # SEQUENCE: ["hlt1_pp_no_gec"] + # DATA_TAG: ["upgrade_mc_minbias_scifi_v5"] + # MODE: "throughput" # run throughput, default, SMOG2_pppHe - TEST_NAME: "run_throughput" BUILD_TYPE: ["RelWithDebInfo"] SEQUENCE: ["hlt1_pp_default"] - DATA_TAG: ["SMOG2_pppHe_1k"] + DATA_TAG: ["SMOG2_pppHe"] + MODE: "benchmark" + + # # run throughput, default, SMOG2_pppHe, many events + # - TEST_NAME: "run_throughput" + # BUILD_TYPE: ["RelWithDebInfo"] + # SEQUENCE: ["hlt1_pp_default"] + # DATA_TAG: ["SMOG2_pppHe"] + # MODE: "throughput" # run throughput, scifi_v6 - TEST_NAME: "run_throughput" BUILD_TYPE: ["RelWithDebInfo"] SEQUENCE: ["hlt1_pp_scifi_v6"] - DATA_TAG: ["SciFiv6_upgrade_DC19_01_MinBiasMD_1k"] + DATA_TAG: ["upgrade_DC19_01_MinBiasMD"] GEOMETRY: ["scifi_v6"] + MODE: "benchmark" + + # # run throughput, scifi_v6, many events + # - TEST_NAME: "run_throughput" + # BUILD_TYPE: ["RelWithDebInfo"] + # SEQUENCE: ["hlt1_pp_scifi_v6"] + # DATA_TAG: ["upgrade_DC19_01_MinBiasMD"] + # GEOMETRY: ["scifi_v6"] + # MODE: "throughput" # run physics efficiency, scifi_v6 - TEST_NAME: "run_physics_efficiency" BUILD_TYPE: ["RelWithDebInfo"] SEQUENCE: ["hlt1_pp_scifi_v6_validation"] - DATA_TAG: ["SciFiv6_upgrade_DC19_01_Bs2PhiPhiMD_1k"] + DATA_TAG: ["upgrade_DC19_01_Bs2PhiPhiMD"] GEOMETRY: ["scifi_v6"] # FIXME @@ -53,7 +98,7 @@ # BUILD_TYPE: ["Debug"] # OPTIONS: ["USE_ROOT"] # SEQUENCE: ["hlt1_pp_validation"] - # DATA_TAG: ["Upgrade_BsPhiPhi_MD_FTv4_DIGI_1k"] + # DATA_TAG: ["Upgrade_BsPhiPhi_MD_FTv4_DIGI"] - TEST_NAME: "run_built_tests" BUILD_TYPE: [RelWithDebInfo, Debug] @@ -64,13 +109,7 @@ - TEST_NAME: "run_physics_efficiency" BUILD_TYPE: ["RelWithDebInfo"] SEQUENCE: ["hlt1_complex_validation"] - DATA_TAG: ["Upgrade_BsPhiPhi_MD_FTv4_DIGI_1k"] - - - TEST_NAME: "run_throughput" - SEQUENCE: ["hlt1_pp_scifi_v6"] - DATA_TAG: ["upgrade-magdown-sim10-up08-30000000-digi_01"] - 
diff --git a/scripts/ci/config/devices.yaml b/scripts/ci/config/devices.yaml
index eb1d567d7f9bce0b48b483c5626d577e5d1827a8..0fab82313ff421692a48a7f214acdfccea737db8 100644
--- a/scripts/ci/config/devices.yaml
+++ b/scripts/ci/config/devices.yaml
@@ -3,26 +3,26 @@
 ###############################################################################

-.geforcertx3090:
-  variables:
-    DEVICE_ID: "geforcertx3090"
-    TARGET: "CUDA"
-  tags: [geforcertx3090]
+# .geforcertx3090:
+#   variables:
+#     DEVICE_ID: "geforcertx3090"
+#     TARGET: "CUDA"
+#   tags: [geforcertx3090]

-.geforcertx3080:
-  variables:
-    DEVICE_ID: "geforcertx3080"
-    TARGET: "CUDA"
-  tags: [geforcertx3080]
+# .geforcertx3080:
+#   variables:
+#     DEVICE_ID: "geforcertx3080"
+#     TARGET: "CUDA"
+#   tags: [geforcertx3080]

-.geforcertx2080ti:
-  variables:
-    DEVICE_ID: "geforcertx2080ti"
-    TARGET: "CUDA"
-  tags: [geforcertx2080ti]
-
+# .geforcertx2080ti:
+#   variables:
+#     DEVICE_ID: "geforcertx2080ti"
+#     TARGET: "CUDA"
+#   tags: [geforcertx2080ti]
+

 .quadrortx6000:
   variables:
@@ -31,11 +31,11 @@
   tags: [quadrortx6000]

-.teslav100:
-  variables:
-    DEVICE_ID: "teslav100"
-    TARGET: "CUDA"
-  tags: [teslav100]
+# .teslav100:
+#   variables:
+#     DEVICE_ID: "teslav100"
+#     TARGET: "CUDA"
+#   tags: [teslav100]

 .a40:
@@ -66,15 +66,15 @@
   tags: [mi100]

-.x862630v4:
-  variables:
-    DEVICE_ID: "x862630v4"
-    TARGET: "CPU"
-  tags: [x862630v4]
+# .x862630v4:
+#   variables:
+#     DEVICE_ID: "x862630v4"
+#     TARGET: "CPU"
+#   tags: [x862630v4]

-.epyc7502:
-  variables:
-    DEVICE_ID: "epyc7502"
-    TARGET: "CPU"
-  tags: [epyc7502]
+# .epyc7502:
+#   variables:
+#     DEVICE_ID: "epyc7502"
+#     TARGET: "CPU"
+#   tags: [epyc7502]
diff --git a/scripts/ci/config/main.yaml b/scripts/ci/config/main.yaml
index 1fa0cc59c4f8f36281d73eaf70d9076775af2f45..fa1245573a03ff48c463763399c97d704d5fd281 100644
--- a/scripts/ci/config/main.yaml
+++ b/scripts/ci/config/main.yaml
@@ -26,35 +26,35 @@ build:
 # included here                                                 #
 # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

-geforcertx3090:
-  extends:
-    - .geforcertx3090
-    - .run_job
-
-geforcertx3080:
-  extends:
-    - .geforcertx3080
-    - .run_job
-
-geforcertx2080ti:
-  extends:
-    - .geforcertx2080ti
-    - .run_job
-geforcertx2080ti-run-changes:
-  extends:
-    - .geforcertx2080ti
-    - .run_toggle_run_changes
-  stage: run
+# geforcertx3090:
+#   extends:
+#     - .geforcertx3090
+#     - .run_job
+
+# geforcertx3080:
+#   extends:
+#     - .geforcertx3080
+#     - .run_job
+
+# geforcertx2080ti:
+#   extends:
+#     - .geforcertx2080ti
+#     - .run_job
+# geforcertx2080ti-run-changes:
+#   extends:
+#     - .geforcertx2080ti
+#     - .run_toggle_run_changes
+#   stage: run

 quadrortx6000:
   extends:
     - .quadrortx6000
     - .run_job

-teslav100:
-  extends:
-    - .teslav100
-    - .run_job
+# teslav100:
+#   extends:
+#     - .teslav100
+#     - .run_job

 a40:
   extends:
@@ -76,34 +76,34 @@ mi100:
     - .mi100
     - .run_job

-x862630v4:
-  extends:
-    - .x862630v4
-    - .run_job
-x862630v4-run-changes:
-  extends:
-    - .x862630v4
-    - .run_toggle_run_changes
-  stage: run
-
-epyc7502:
-  extends:
-    - .epyc7502
-    - .run_job
+# x862630v4:
+#   extends:
+#     - .x862630v4
+#     - .run_job
+# x862630v4-run-changes:
+#   extends:
+#     - .x862630v4
+#     - .run_toggle_run_changes
+#   stage: run
+
+# epyc7502:
+#   extends:
+#     - .epyc7502
+#     - .run_job

 .device-jobs:
   dependencies:
-    - epyc7502
-    - x862630v4
+    #- epyc7502
+    #- x862630v4
     - mi100
     - a6000
     - a40
     - a10
-    - teslav100
+    #- teslav100
     - quadrortx6000
-    - geforcertx2080ti
-    - geforcertx3080
-    - geforcertx3090
+    #- geforcertx2080ti
+    #- geforcertx3080
+    #- geforcertx3090

 # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
 # test and publish stages for the minimal pipeline             #
@@ -122,8 +122,8 @@ run-changes:
     - .test_run_changes
     - .default_rules_always_run
   dependencies:
-    - x862630v4-run-changes
-    - geforcertx2080ti-run-changes
+    #- x862630v4-run-changes
+    #- geforcertx2080ti-run-changes
   # FIXME! Remove as soon as indeterminate behaviour can be fixed.
   allow_failure:
     exit_codes: 3
@@ -214,53 +214,53 @@ HIP:

 # CPU
-x862630v4-full:
-  extends:
-    - .x862630v4
-    - .run_jobs_full
-    - .cpu_run_job
-x862630v4-full-run-changes:
-  extends:
-    - .x862630v4
-    - .run_toggle_run_changes
-    - .default_rules_always_run
-    - .cpu_run_job
-  stage: "run full"
-
-epyc7502-full:
-  extends:
-    - .epyc7502
-    - .run_jobs_full
-    - .cpu_run_job
+# x862630v4-full:
+#   extends:
+#     - .x862630v4
+#     - .run_jobs_full
+#     - .cpu_run_job
+# x862630v4-full-run-changes:
+#   extends:
+#     - .x862630v4
+#     - .run_toggle_run_changes
+#     - .default_rules_always_run
+#     - .cpu_run_job
+#   stage: "run full"
+
+# epyc7502-full:
+#   extends:
+#     - .epyc7502
+#     - .run_jobs_full
+#     - .cpu_run_job

 # GPU / CUDA
-geforcertx3090-full:
-  extends:
-    - .geforcertx3090
-    - .run_jobs_full
-    - .cuda_run_job
-
-
-geforcertx3080-full:
-  extends:
-    - .geforcertx3080
-    - .run_jobs_full
-    - .cuda_run_job
-
-
-geforcertx2080ti-full-run-changes:
-  extends:
-    - .geforcertx2080ti
-    - .run_toggle_run_changes
-    - .cuda_run_job
-    - .default_rules_always_run
-  stage: "run full"
-geforcertx2080ti-full:
-  extends:
-    - .geforcertx2080ti
-    - .run_jobs_full
-    - .cuda_run_job
+# geforcertx3090-full:
+#   extends:
+#     - .geforcertx3090
+#     - .run_jobs_full
+#     - .cuda_run_job
+
+
+# geforcertx3080-full:
+#   extends:
+#     - .geforcertx3080
+#     - .run_jobs_full
+#     - .cuda_run_job
+
+
+# geforcertx2080ti-full-run-changes:
+#   extends:
+#     - .geforcertx2080ti
+#     - .run_toggle_run_changes
+#     - .cuda_run_job
+#     - .default_rules_always_run
+#   stage: "run full"
+# geforcertx2080ti-full:
+#   extends:
+#     - .geforcertx2080ti
+#     - .run_jobs_full
+#     - .cuda_run_job

 quadrortx6000-full:
@@ -270,11 +270,11 @@ quadrortx6000-full:
     - .cuda_run_job

-teslav100-full:
-  extends:
-    - .teslav100
-    - .run_jobs_full
-    - .cuda_run_job
+# teslav100-full:
+#   extends:
+#     - .teslav100
+#     - .run_jobs_full
+#     - .cuda_run_job

 a40-full:
@@ -317,21 +317,21 @@ mi100-full:
   # is 50. See https://docs.gitlab.com/ee/ci/yaml/#changing-the-needs-job-limit
   dependencies:
     # CPU
-    - x862630v4-full
-    - epyc7502-full
+    #- x862630v4-full
+    #- epyc7502-full

     # HIP
     - mi100-full

     # CUDA
-    - geforcertx3090-full
-    - geforcertx3080-full
-    - geforcertx2080ti-full
+    #- geforcertx3090-full
+    #- geforcertx3080-full
+    #- geforcertx2080ti-full
     - quadrortx6000-full
     - a40-full
     - a10-full
     - a6000-full
-    - teslav100-full
+    #- teslav100-full

 # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
@@ -349,18 +349,18 @@ physics-efficiency-full:
     exit_codes: 3

-run-changes-full:
-  stage: test full
-  extends:
-    - .test_run_changes
-    - .default_rules_always_run
-  dependencies:
-    # add run changes jobs here
-    - x862630v4-full-run-changes
-    - geforcertx2080ti-full-run-changes
-  # FIXME! Remove once the indeterminate behaviour is fixed.
-  allow_failure:
-    exit_codes: 3
+# run-changes-full:
+#   stage: test full
+#   extends:
+#     - .test_run_changes
+#     - .default_rules_always_run
+#   dependencies:
+#     # add run changes jobs here
+#     - x862630v4-full-run-changes
+#     - geforcertx2080ti-full-run-changes
+#   # FIXME! Remove once the indeterminate behaviour is fixed.
+#   allow_failure:
+#     exit_codes: 3

 throughput-full:
diff --git a/scripts/ci/jobs/publish_throughput.sh b/scripts/ci/jobs/publish_throughput.sh
index 13e4b1355ec406d332a0939a96deb40b0d3721bf..3e73ed0738c103673b02f88a2944893ca3f89f8d 100755
--- a/scripts/ci/jobs/publish_throughput.sh
+++ b/scripts/ci/jobs/publish_throughput.sh
@@ -17,6 +17,7 @@ ls -1 | grep output | grep run_throughput
 for SEQUENCE_DATASET in $(ls -1 | grep "run_throughput" | grep -Ei "run_throughput_output_([a-z0-9_]+?)" | sed 's/^run_throughput_output_//') ; do
     INPUT_FILES=$(cat run_throughput_output_${SEQUENCE_DATASET}/${BREAKDOWN_DEVICE_ID}/input_files.txt)
     SEQUENCE=$(cat run_throughput_output_${SEQUENCE_DATASET}/${BREAKDOWN_DEVICE_ID}/sequence.txt)
+    MODE=$(cat run_throughput_output_${SEQUENCE_DATASET}/${BREAKDOWN_DEVICE_ID}/mode.txt)

     cat run_throughput_output_${SEQUENCE_DATASET}/*/output.txt | grep --color=none "select device" | sed 's/.*:\ [0-9]*\,\ //' > devices_${SEQUENCE_DATASET}.txt
     cat run_throughput_output_${SEQUENCE_DATASET}/*/output.txt | grep --color=none "events/s" | awk '{ print $1; }' > throughputs_${SEQUENCE_DATASET}.txt
@@ -27,7 +28,7 @@ for SEQUENCE_DATASET in $(ls -1 | grep "run_throughput" | grep -Ei "run_throughp
     python checker/plotting/post_combined_message.py \
       -j "${CI_JOB_NAME}" \
-      -l "Throughput of [branch **\`${CI_COMMIT_REF_NAME} (${CI_COMMIT_SHORT_SHA})\`**, sequence **\`${SEQUENCE}\`** over dataset **\`${INPUT_FILES}\`**](https://gitlab.cern.ch/lhcb/Allen/pipelines/${CI_PIPELINE_ID})" \
+      -l "Throughput of [branch **\`${CI_COMMIT_REF_NAME} (${CI_COMMIT_SHORT_SHA})\`**, sequence **\`${SEQUENCE}\`**, mode **\`${MODE}\`** over dataset **\`${INPUT_FILES}\`**](https://gitlab.cern.ch/lhcb/Allen/pipelines/${CI_PIPELINE_ID})" \
       -t devices_throughputs_${SEQUENCE_DATASET}.csv \
       -b run_throughput_output_${SEQUENCE_DATASET}/${BREAKDOWN_DEVICE_ID}/algo_breakdown.csv
diff --git a/scripts/ci/jobs/run_physics_efficiency.sh b/scripts/ci/jobs/run_physics_efficiency.sh
index 3b8405092338da89d8dcfbeaf15eb7c528966db3..4cf87e07f9256b9c24e4978e299d06229f214eb9 100755
--- a/scripts/ci/jobs/run_physics_efficiency.sh
+++ b/scripts/ci/jobs/run_physics_efficiency.sh
@@ -10,7 +10,7 @@ fi

 check_build_exists

-RUN_OPTIONS="-n 1000 -m 1000"
+RUN_OPTIONS="-n 1000 -m 1000 --mode MC-check"

 # Configure the input files (--mdf) and geometry (-g)
diff --git a/scripts/ci/jobs/run_throughput.sh b/scripts/ci/jobs/run_throughput.sh
index df7027df5f4164801c143a4d99fc14e442a8a13f..af924cfd7d657bb5c5a9f3dc6c3e612a58272092 100755
--- a/scripts/ci/jobs/run_throughput.sh
+++ b/scripts/ci/jobs/run_throughput.sh
@@ -16,10 +16,30 @@ if [ ! -z ${GEOMETRY+x} ]; then
     RUN_OPTIONS="$RUN_OPTIONS -g ../input/detector_configuration/${GEOMETRY}"
 fi

-RUN_OPTIONS="--mdf ${ALLEN_DATA}/mdf_input/${DATA_TAG}.mdf ${RUN_OPTIONS}"
+# if the running mode is set, pass it to Allen
+if [ ! -z ${MODE+x} ]; then
+    RUN_OPTIONS="$RUN_OPTIONS --mode ${MODE}"
+
+    if [ ${MODE} == "throughput" ]; then
+        RUN_THROUGHPUT_OPTIONS_CUDA="--events-per-slice 500 -m 500 -t 16 -s 20 -r 590"
+        RUN_THROUGHPUT_OPTIONS_CPU="--events-per-slice 100 -m 100 -r 100"
+        RUN_THROUGHPUT_OPTIONS_HIP="--events-per-slice 5000 -m 3000 -t 10 -s 11 -r 590"
+    fi
+fi
+
+MDF_FILES=""
+for entry in "${ALLEN_DATA}/mdf_input/${DATA_TAG}/mdf"/*
+do
+    echo "$entry"
+    MDF_FILES="${MDF_FILES}${entry},"
+done
+
+MDF_FILES_STRIPPED=`echo $MDF_FILES | sed 's/.$//'`
+
+RUN_OPTIONS="--mdf ${MDF_FILES_STRIPPED} ${RUN_OPTIONS}"

 set -euxo pipefail

-OUTPUT_FOLDER_REL="${TEST_NAME}_output_${SEQUENCE}_${DATA_TAG}/${DEVICE_ID}"
+OUTPUT_FOLDER_REL="${TEST_NAME}_output_${SEQUENCE}_${MODE}_${DATA_TAG}/${DEVICE_ID}"
 mkdir -p ${OUTPUT_FOLDER_REL}
 OUTPUT_FOLDER=$(realpath ${OUTPUT_FOLDER_REL})
@@ -122,6 +142,7 @@ echo "${DATA_TAG}" > "${OUTPUT_FOLDER}/input_files.txt"
 echo "${SEQUENCE}" > "${OUTPUT_FOLDER}/sequence.txt"
 echo "${THROUGHPUT}" > "${OUTPUT_FOLDER}/throughput.txt"
 echo "${CI_COMMIT_SHORT_SHA}" > "${OUTPUT_FOLDER}/revision.txt"
+echo "${MODE}" > "${OUTPUT_FOLDER}/mode.txt"

 # write metric to display on MR
-echo "throughput_kHz{device=\"${DEVICE_ID}\",sequence=\"${SEQUENCE}\",dataset=\"${DATA_TAG}\"} ${THROUGHPUT_KHZ}" >> "${OUTPUT_FOLDER}/metrics.txt"
+echo "throughput_kHz{device=\"${DEVICE_ID}\",sequence=\"${SEQUENCE}\",dataset=\"${DATA_TAG}\",mode=\"${MODE}\"} ${THROUGHPUT_KHZ}" >> "${OUTPUT_FOLDER}/metrics.txt"
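The MDF list construction in `run_throughput.sh` above collects every file beneath the dataset directory into a comma-separated string and then strips the trailing comma with `sed 's/.$//'`. A minimal standalone sketch of the same idiom, with a placeholder directory and POSIX parameter expansion doing the job of the `sed` call:

    # Build a comma-separated file list suitable for Allen's --mdf option.
    MDF_FILES=""
    for entry in /scratch/allen_data/mdf_input/some_tag/mdf/*; do  # placeholder path
        MDF_FILES="${MDF_FILES}${entry},"                          # append "<file>,"
    done
    MDF_FILES_STRIPPED="${MDF_FILES%,}"                            # drop the trailing comma
    echo "--mdf ${MDF_FILES_STRIPPED}"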
diff --git a/stream/sequence/src/HostBuffers.cpp b/stream/sequence/src/HostBuffers.cpp
index 9bdc15692b6b8469f0140c8a72f0e459c5d8452a..f95b660aaaaafa85bb8625897ba9139ccd4072ab 100644
--- a/stream/sequence/src/HostBuffers.cpp
+++ b/stream/sequence/src/HostBuffers.cpp
@@ -106,11 +106,16 @@ void HostBuffers::reserve(const unsigned max_number_of_events, const size_t n_li
   Allen::malloc_host((void**) &host_sv_offsets, sv_offsets_size);
   ::memset(host_sv_offsets, 0, sv_offsets_size);

+  host_atomics_velo =
+    reinterpret_cast<int*>(malloc((2 * max_number_of_events + 1) * sizeof(int)));
+  host_atomics_ut =
+    reinterpret_cast<int*>(malloc(UT::num_atomics * (max_number_of_events + 1) * sizeof(int)));
+
   if (do_check) {
     // Datatypes to be reserved only if checking is on
     // Note: These datatypes in principle do not require to be pinned
-    host_atomics_velo =
-      reinterpret_cast<int*>(malloc((2 * max_number_of_events + 1) * sizeof(int)));
+
     host_velo_track_hit_number = reinterpret_cast<unsigned*>(
       malloc(max_number_of_events * host_buffers_max_velo_tracks * sizeof(unsigned)));
     host_velo_track_hits = reinterpret_cast<char*>(malloc(
@@ -120,8 +125,6 @@
     host_velo_kalman_endvelo_states = reinterpret_cast<char*>(
       malloc(max_number_of_events * host_buffers_max_velo_tracks * Velo::Consolidated::States::size));
-    host_atomics_ut =
-      reinterpret_cast<int*>(malloc(UT::num_atomics * (max_number_of_events + 1) * sizeof(int)));
     host_ut_tracks = reinterpret_cast<UT::TrackHits*>(
       malloc(max_number_of_events * UT::Constants::max_num_tracks * sizeof(UT::TrackHits)));
     host_ut_track_hit_number = reinterpret_cast<unsigned*>(
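The HostBuffers.cpp change above and the Stream.cpp change below appear to belong together: the Velo and UT atomics buffers are now allocated even when checking is off, presumably because the track totals printed at the end of `run_sequence` read from them unconditionally. Since those printouts share a common prefix, they can be pulled out of a run log with a simple grep (the log file name here is hypothetical):

    # Extract the track totals that Stream.cpp now prints after each sequence run.
    grep "Total number of reconstructed" allen_run.log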
diff --git a/stream/sequence/src/Stream.cpp b/stream/sequence/src/Stream.cpp
index 3ad5ef630775eda8c9e4089b856776293736dcd3..45fa6d5ec14c27a3fb3bfd4e61b4e289c455d5ca 100644
--- a/stream/sequence/src/Stream.cpp
+++ b/stream/sequence/src/Stream.cpp
@@ -147,6 +147,9 @@ Allen::error Stream::run_sequence(const unsigned buf_idx, const RuntimeOptions&
       warning_cout << "Insufficient memory to process slice - will sub-divide and retry."
                    << std::endl;
       return Allen::error::errorMemoryAllocation;
     }
+    info_cout << "Total number of reconstructed Velo tracks = " << host_buffers->host_atomics_velo[host_buffers->host_number_of_events] << std::endl;
+    info_cout << "Total number of reconstructed veloUT tracks = " << host_buffers->host_atomics_ut[host_buffers->host_number_of_events] << std::endl;
+    info_cout << "Total number of reconstructed forward tracks = " << host_buffers->host_atomics_scifi[host_buffers->host_number_of_events] << std::endl;
   }
 }