Commit 4c496e47 authored by Daniel Charles Craik's avatar Daniel Charles Craik Committed by Dorothea Vom Bruch
Browse files

Added manager for HostBuffers; Buffers assigned and passed to fill but not yet...

Added manager for HostBuffers; Buffers assigned and passed to fill but not yet used by Streams and no monitoring
parent a7fe8833
......@@ -462,6 +462,7 @@ include_directories(stream/sequence/include)
include_directories(cuda/UT/UTDecoding/include)
include_directories(cuda/kalman/ParKalman/include)
include_directories(mdf/include)
include_directories(integration/monitoring/include)
include_directories(integration/non_event_data/include)
include_directories(${ZMQ_INCLUDE_DIRS})
......@@ -508,6 +509,7 @@ target_link_libraries(AllenLib PUBLIC
PVChecking
CheckClustering
SelChecking
AllenMonitoring
NonEventData
AllenZMQ)
......
......@@ -487,3 +487,53 @@ SharedProperty<float> m_shared{this, "example_common", "param"};
These must also be plumbed in to `Configuration::getSharedPropertySet` in `stream/gear/src/Configuration.cu`
to allow the property set to be found by algorithms.
Adding monitoring histograms
============================
Overview
--------
Monitoring in Allen is performed by dedicated monitoring threads (by default there is a single thread).
After a slice of data is processed, the `HostBuffers` corresponding to that slice are sent to the monitoring
thread concurrent with being sent to the I/O thread for output. The flow of `HostBuffers` is shown below:
```mermaid
graph LR
A((HostBuffer<br>Manager))-->B[GPU thread]
B-->C[I/O thread]
B-->|if free|D[Monitoring thread]
C-->A
D-->A
```
To avoid excessive load on the CPU, monitoring threads will not queue `HostBuffers`, i.e, if the
monitoring thread is already busy then new `HostBuffers` will be immediately marked as monitored.
Functionality exists within `MonitorManager` to reactively reduce the amount of monitoring performed
(n.b. this corresponds to an **increase** in the `monitoring_level`) in response to a large number of skipped
slices. This is not currently used but would allow monitoring to favour running *some* types of monitors
for *all* slices over running *all* types of monitors for *some* slices. Additionally, less important monitors
could be run on a random sub-sample of slices. The `MetaMonitor` provides monitoring histograms that track
the numbers of successfully monitored and skipped slices as well as the monitoring level.
Monitor classes
---------------
Additional monitors that produce histograms based on information in the `HostBuffers` should be added to
`integration/monitoring` and inherit from the `BufferMonitor` class. The `RateMonitor` class provides an
example of this. Furthermore, each histogram that is added must be given a unique key in MonitorBase::MonHistType.
Once a new monitoring class has been written, this may be added to the monitoring thread(s) by including an instance
of the class in the vectors created in `MonitorManager::init`, e.g.
```
m_monitors.back().push_back(new RateMonitor(buffers_manager, time_step, offset));
```
Saving histograms
-----------------
All histograms may be saved by calling `MonitorManager::saveHistograms`. This is currently performed once after
Allen has finished executing. In principle, this could be performed on a regular basis within the main loop but
ideally would require monitoring threads to be paused for thread safety.
Histograms are currently written to `monitoringHists.root`.
add_subdirectory(monitoring)
add_subdirectory(non_event_data)
include_directories(include)
include_directories(${CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES})
include_directories(${CMAKE_SOURCE_DIR}/main/include)
include_directories(${CMAKE_SOURCE_DIR}/checker/tracking/include)
include_directories(${CMAKE_SOURCE_DIR}/cuda/event_model/common/include)
include_directories(${CMAKE_SOURCE_DIR}/cuda/event_model/SciFi/include)
include_directories(${CMAKE_SOURCE_DIR}/cuda/event_model/UT/include)
include_directories(${CMAKE_SOURCE_DIR}/cuda/event_model/velo/include)
include_directories(${CMAKE_SOURCE_DIR}/cuda/kalman/ParKalman/include)
include_directories(${CMAKE_SOURCE_DIR}/cuda/muon/common/include)
include_directories(${CMAKE_SOURCE_DIR}/cuda/PV/beamlinePV/include)
include_directories(${CMAKE_SOURCE_DIR}/cuda/PV/common/include)
include_directories(${CMAKE_SOURCE_DIR}/cuda/SciFi/common/include)
include_directories(${CMAKE_SOURCE_DIR}/cuda/UT/common/include)
include_directories(${CMAKE_SOURCE_DIR}/cuda/velo/common/include)
include_directories(${CMAKE_SOURCE_DIR}/cuda/vertex_fit/common/include)
include_directories(${CMAKE_SOURCE_DIR}/stream/sequence/include)
include_directories(${ROOT_INCLUDE_DIRS})
file(GLOB monitoring_sources "src/*cu")
file(GLOB monitoring_sources_cpp "src/*cpp")
allen_add_library(AllenMonitoring STATIC
${monitoring_sources}
${monitoring_sources_cpp}
)
if(ROOT_FOUND)
target_compile_definitions(AllenMonitoring PRIVATE ${ALLEN_ROOT_DEFINITIONS})
target_include_directories(AllenMonitoring BEFORE PRIVATE
${ROOT_INCLUDE_DIRS}
)
endif()
#pragma once
#include "MonitorBase.h"
struct BufferMonitor : public MonitorBase {
BufferMonitor(std::string name, int timeStep, int offset) : MonitorBase(name, timeStep, offset) {}
virtual ~BufferMonitor() = default;
virtual void fill(uint i_buf, bool useWallTime = true) = 0;
};
#pragma once
#include "MonitorBase.h"
struct MetaMonitor : public MonitorBase {
MetaMonitor(int timeStep = 30, int offset = 0) : MonitorBase("monitoring", timeStep, offset) { init(); };
virtual ~MetaMonitor() = default;
void fill(bool successful, uint monitoringLevel);
private:
void init();
};
#pragma once
#include <deque>
#include <map>
#include <string>
#include "ROOTHeaders.h"
struct MonitorBase {
enum class MonHistType {
MonitoringSuccess,
MonitoringSkipped,
MonitoringLevel0,
MonitoringLevel1,
MonitoringLevel2,
MonitoringLevel3,
MonitoringLevel4,
MonitoringLevel5P,
OneTrackRate,
TwoTrackRate,
SingleMuonRate,
DispDimuonRate,
HighMassDimuonRate,
InclusiveRate
};
MonitorBase(std::string name, int timeStep, int offset) : m_name(name), m_time_step(timeStep), m_offset(offset) {};
virtual ~MonitorBase() = default;
virtual void saveHistograms(std::string file_name, bool append) const;
protected:
uint getWallTimeBin();
std::string m_name;
#ifdef WITH_ROOT
std::map<MonHistType, TH1*> m_histograms;
#endif
uint m_time_step;
uint m_offset;
};
#pragma once
#include "BufferMonitor.h"
#include "MetaMonitor.h"
#include <optional>
#include <queue>
#include <vector>
struct HostBuffersManager;
struct MonitorManager {
MonitorManager(uint n_mon_thread, HostBuffersManager* buffers_manager, int time_step = 30, int offset = 0) :
meta_mon(time_step, offset)
{
init(n_mon_thread, buffers_manager, time_step, offset);
}
void fill(uint i_mon, uint i_buf, bool useWallTime = true);
void saveHistograms(std::string file_name);
std::optional<size_t> getFreeMonitor();
void freeMonitor(size_t i_mon);
private:
void init(uint n_mon_thread, HostBuffersManager* buffers_manager, int time_step, int offset);
std::vector<std::vector<BufferMonitor*>> m_monitors;
std::queue<size_t> free_monitors;
MetaMonitor meta_mon;
uint count_processed {0}, count_skipped {0};
uint monitoring_level {0};
const uint max_monitoring_level {0};
};
#pragma once
#include "BufferMonitor.h"
struct HostBuffersManager;
struct RateMonitor : public BufferMonitor {
RateMonitor(HostBuffersManager* buffers_manager, int timeStep = 30, int offset = 0) :
BufferMonitor("hltRates", timeStep, offset), m_buffers_manager(buffers_manager)
{
init();
};
virtual ~RateMonitor() = default;
void fill(uint i_buf, bool useWallTime = true) override;
private:
void init();
HostBuffersManager* m_buffers_manager;
};
#include "MetaMonitor.h"
#ifdef WITH_ROOT
void MetaMonitor::fill(bool successful, uint monitoringLevel)
{
uint time = getWallTimeBin();
if (successful) {
m_histograms[MonHistType::MonitoringSuccess]->Fill(time);
}
else {
m_histograms[MonHistType::MonitoringSkipped]->Fill(time);
}
switch (monitoringLevel) {
case 0: m_histograms[MonHistType::MonitoringLevel0]->Fill(time); break;
case 1: m_histograms[MonHistType::MonitoringLevel1]->Fill(time); break;
case 2: m_histograms[MonHistType::MonitoringLevel2]->Fill(time); break;
case 3: m_histograms[MonHistType::MonitoringLevel3]->Fill(time); break;
case 4: m_histograms[MonHistType::MonitoringLevel4]->Fill(time); break;
default: m_histograms[MonHistType::MonitoringLevel5P]->Fill(time); break;
}
}
void MetaMonitor::init()
{
// set number of bins such that histograms cover approximately 80 minutes
uint nBins = 80 * 60 / m_time_step;
double max = nBins * m_time_step;
m_histograms.emplace(MonHistType::MonitoringSuccess, new TH1D("monitoringSuccess", "", nBins, 0., max));
m_histograms.emplace(MonHistType::MonitoringSkipped, new TH1D("monitoringSkipped", "", nBins, 0., max));
m_histograms.emplace(MonHistType::MonitoringLevel0, new TH1D("monitoringLevel0", "", nBins, 0., max));
m_histograms.emplace(MonHistType::MonitoringLevel1, new TH1D("monitoringLevel1", "", nBins, 0., max));
m_histograms.emplace(MonHistType::MonitoringLevel2, new TH1D("monitoringLevel2", "", nBins, 0., max));
m_histograms.emplace(MonHistType::MonitoringLevel3, new TH1D("monitoringLevel3", "", nBins, 0., max));
m_histograms.emplace(MonHistType::MonitoringLevel4, new TH1D("monitoringLevel4", "", nBins, 0., max));
m_histograms.emplace(MonHistType::MonitoringLevel5P, new TH1D("monitoringLevel5p", "", nBins, 0., max));
for (auto kv : m_histograms) {
kv.second->SetDirectory(nullptr);
}
}
#else
void MetaMonitor::fill(bool, uint) {}
void MetaMonitor::init() {}
#endif
#include "MonitorBase.h"
#include "ROOTHeaders.h"
#include <ctime>
#ifdef WITH_ROOT
void MonitorBase::saveHistograms(std::string file_name, bool append) const
{
std::string mode = "RECREATE";
if (append) mode = "UPDATE";
TFile* file = TFile::Open(file_name.c_str(), mode.c_str());
if (!file) return;
auto* dir = static_cast<TDirectory*>(file->Get(m_name.c_str()));
if (!dir) {
dir = file->mkdir(m_name.c_str());
dir = static_cast<TDirectory*>(file->Get(m_name.c_str()));
}
for (auto kv : m_histograms) {
TH1* h = kv.second;
dir->cd();
if (append) {
auto* hout = static_cast<TH1D*>(dir->Get(h->GetName()));
if (hout) {
hout->Add(h);
hout->Write();
}
else {
h->Write();
}
}
else {
h->Write();
}
}
file->Close();
#else
void MonitorBase::saveHistograms(std::string, bool) const {
#endif
}
uint MonitorBase::getWallTimeBin()
{
if (m_offset <= 0) m_offset = time(0);
return time(0) - m_offset;
}
#include "MonitorManager.h"
#include "RateMonitor.h"
#include "HostBuffersManager.cuh"
#include "Logger.h"
void MonitorManager::fill(uint i_mon, uint i_buf, bool useWallTime)
{
if (i_mon >= m_monitors.size()) {
warning_cout << "No monitors exist for thread " << i_mon << std::endl;
return;
}
for (auto mon : m_monitors.at(i_mon)) {
mon->fill(i_buf, useWallTime);
}
}
void MonitorManager::saveHistograms(std::string file_name)
{
meta_mon.saveHistograms(file_name, false);
for (auto mons : m_monitors) {
for (auto mon : mons) {
mon->saveHistograms(file_name, true);
}
}
}
std::optional<size_t> MonitorManager::getFreeMonitor()
{
if (free_monitors.empty()) {
++count_skipped;
if (count_skipped > 2) {
if (monitoring_level < max_monitoring_level) {
++monitoring_level;
info_cout << "Reducing monitoring rate" << std::endl;
}
count_skipped = 0;
count_processed = 0;
}
meta_mon.fill(false, monitoring_level);
return std::nullopt;
}
auto ret = std::optional<size_t>(free_monitors.front());
free_monitors.pop();
return ret;
}
void MonitorManager::freeMonitor(size_t i_mon)
{
++count_processed;
if (count_processed > 10) {
if (monitoring_level > 0) {
--monitoring_level;
info_cout << "Increasing monitoring rate" << std::endl;
}
count_skipped = 0;
count_processed = 0;
}
meta_mon.fill(true, monitoring_level);
free_monitors.push(i_mon);
}
void MonitorManager::init(uint n_mon_thread, HostBuffersManager* buffers_manager, int time_step, int offset)
{
for (uint i = 0; i < n_mon_thread; ++i) {
m_monitors.push_back(std::vector<BufferMonitor*>());
m_monitors.back().push_back(new RateMonitor(buffers_manager, time_step, offset));
free_monitors.push(i);
}
}
#include "RateMonitor.h"
#include "HostBuffers.cuh"
#include "HostBuffersManager.cuh"
#include "Logger.h"
#ifdef WITH_ROOT
void RateMonitor::fill(uint i_buf, bool useWallTime)
{
HostBuffers* buf = m_buffers_manager->getBuffers(i_buf);
uint time(0);
if (!useWallTime) {
warning_cout << "ODIN time histograms not avaiable yet" << std::endl;
return;
}
else {
time = getWallTimeBin();
}
uint nevt = buf->host_number_of_selected_events[0];
int* trk_offsets = buf->host_atomics_scifi + nevt;
uint* sv_offsets = buf->host_sv_offsets;
for (uint ievt = 0; ievt < nevt; ++ievt) {
int ntrk = buf->host_atomics_scifi[ievt];
int nsv = buf->host_sv_offsets[ievt + 1] - buf->host_sv_offsets[ievt];
uint trk_offset = trk_offsets[ievt];
uint sv_offset = sv_offsets[ievt];
bool one_track_pass(false);
bool two_track_pass(false);
bool single_muon_pass(false);
bool disp_dimuon_pass(false);
bool high_mass_dimuon_pass(false);
for (int itrk = 0; itrk < ntrk; ++itrk) {
if (buf->host_one_track_decisions[itrk + trk_offset]) one_track_pass = true;
if (buf->host_single_muon_decisions[itrk + trk_offset]) single_muon_pass = true;
}
for (int isv = 0; isv < nsv; ++isv) {
if (buf->host_two_track_decisions[isv + sv_offset]) two_track_pass = true;
if (buf->host_disp_dimuon_decisions[isv + sv_offset]) disp_dimuon_pass = true;
if (buf->host_high_mass_dimuon_decisions[isv + sv_offset]) high_mass_dimuon_pass = true;
}
if (one_track_pass) m_histograms[MonHistType::OneTrackRate]->Fill(time, 1. / m_time_step);
if (two_track_pass) m_histograms[MonHistType::TwoTrackRate]->Fill(time, 1. / m_time_step);
if (single_muon_pass) m_histograms[MonHistType::SingleMuonRate]->Fill(time, 1. / m_time_step);
if (disp_dimuon_pass) m_histograms[MonHistType::DispDimuonRate]->Fill(time, 1. / m_time_step);
if (high_mass_dimuon_pass) m_histograms[MonHistType::HighMassDimuonRate]->Fill(time, 1. / m_time_step);
if (one_track_pass || two_track_pass || single_muon_pass || disp_dimuon_pass || high_mass_dimuon_pass)
m_histograms[MonHistType::InclusiveRate]->Fill(time, 1. / m_time_step);
}
}
void RateMonitor::init()
{
// set number of bins such that histograms cover approximately 80 minutes
uint nBins = 80 * 60 / m_time_step;
double max = nBins * m_time_step;
m_histograms.emplace(MonHistType::OneTrackRate, new TH1D("oneTrackRate", "", nBins, 0., max));
m_histograms.emplace(MonHistType::TwoTrackRate, new TH1D("twoTrackRate", "", nBins, 0., max));
m_histograms.emplace(MonHistType::SingleMuonRate, new TH1D("singleMuonRate", "", nBins, 0., max));
m_histograms.emplace(MonHistType::DispDimuonRate, new TH1D("dispDimuonRate", "", nBins, 0., max));
m_histograms.emplace(MonHistType::HighMassDimuonRate, new TH1D("highMassDimuonRate", "", nBins, 0., max));
m_histograms.emplace(MonHistType::InclusiveRate, new TH1D("inclusiveRate", "", nBins, 0., max));
for (auto kv : m_histograms) {
kv.second->SetDirectory(nullptr);
kv.second->Sumw2();
}
}
#else
void RateMonitor::fill(uint, bool) {}
void RateMonitor::init() {}
#endif
......@@ -9,5 +9,6 @@ int allen(std::map<std::string, std::string> options, Allen::NonEventData::IUpda
namespace {
constexpr size_t n_io = 1;
constexpr size_t n_mon = 1;
constexpr size_t max_stream_threads = 1024;
} // namespace
......@@ -19,6 +19,7 @@
#include <thread>
#include <bitset>
#include <cstdio>
#include <ctime>
#include <unistd.h>
#include <getopt.h>
#include <memory>
......@@ -41,6 +42,8 @@
#include "MuonDefinitions.cuh"
#include "Consumers.h"
#include "CheckerInvoker.h"
#include "HostBuffersManager.cuh"
#include "MonitorManager.h"
#include "Allen.h"
#include <tuple>
......@@ -60,7 +63,7 @@ namespace {
void input_reader(const size_t io_id, IInputProvider* input_provider)
{
// Create a control oscket and connect it.
// Create a control socket and connect it.
zmq::socket_t control = zmqSvc().socket(zmq::PAIR);
zmq::setsockopt(control, zmq::LINGER, 0);
......@@ -184,6 +187,7 @@ void run_stream(
std::string command;
std::optional<size_t> idx;
std::optional<size_t> buf;
if (items[0].revents & zmq::POLLIN) {
command = zmqSvc().receive<std::string>(control);
if (command == "DONE") {
......@@ -194,6 +198,7 @@ void run_stream(
}
else {
idx = zmqSvc().receive<size_t>(control);
buf = zmqSvc().receive<size_t>(control);
}
}
......@@ -205,6 +210,7 @@ void run_stream(
uint n_events = static_cast<uint>(std::get<1>(vp_banks).size() - 1);
wrapper->run_stream(
stream_id,
*buf,
{std::move(vp_banks),
input_provider->banks(BankTypes::UT, *idx),
input_provider->banks(BankTypes::FT, *idx),
......@@ -216,7 +222,8 @@ void run_stream(
// signal that we're done
zmqSvc().send(control, "PROCESSED", zmq::SNDMORE);
zmqSvc().send(control, *idx);
zmqSvc().send(control, *idx, zmq::SNDMORE);
zmqSvc().send(control, *buf);
if (do_check && check_control) {
// Get list of events that are in the slice to load the right
// MC info
......@@ -251,6 +258,78 @@ void run_stream(
}
}
/**
* @brief Receive filled HostBuffers from GPU
* threads and produce rate histograms
*
* @param thread ID of this monitoring thread
* @param manager for the monitor objects
* @param index of the monitor objects to use for this thread
*
* @return void
*/
void run_monitoring(const size_t mon_id, MonitorManager* monitor_manager, uint i_monitor)
{
// Create a control socket and connect it.
zmq::socket_t control = zmqSvc().socket(zmq::PAIR);
zmq::setsockopt(control, zmq::LINGER, 0);
auto con = ZMQ::connection(mon_id);
try {
control.connect(con.c_str());
} catch (const zmq::error_t& e) {
error_cout << "failed to connect connection " << con << "\n";
throw e;
}
zmq::pollitem_t items[] = {{control, 0, zmq::POLLIN, 0}};
while (true) {
// Wait until we need to process
std::optional<int> n;
do {
try {
n = zmq::poll(&items[0], 1, -1);
} catch (const zmq::error_t& err) {
if (err.num() == EINTR) {
continue;
}
else {