Skip to content
Snippets Groups Projects
Commit d10947b4 authored by Dinyar Rabady's avatar Dinyar Rabady
Browse files

Some refactoring, also more logging

parent 51bfa327
No related branches found
No related tags found
2 merge requests!73Draft: cleanup of conf files,!70Draft: Changes to run with CMSSW
...@@ -12,21 +12,21 @@ ...@@ -12,21 +12,21 @@
#include "log.h" #include "log.h"
#include "tools.h" #include "tools.h"
OutputByOrbitStream::OutputByOrbitStream(ctrl &c, config &conf_) OutputByOrbitStream::OutputByOrbitStream(ctrl &ctrl, config &conf)
: tbb::filter(serial_in_order), : tbb::filter(serial_in_order),
current_file_size(0), totcounts_(0),
totcounts(0), control_(ctrl),
control(c), conf_(conf),
conf(conf_), output_file_handler_(conf_.getOutputFilenameBase(), conf_.getOutputFilenamePrefix(),
output_file_handler_(conf.getOutputFilenameBase(), conf.getOutputFilenamePrefix(), conf_.getNOrbitsPerFile(), conf_.getCMSSWHeaders(), conf_.getSourceID()),
conf.getNOrbitsPerFile(), conf.getCMSSWHeaders(), conf.getSourceID()),
min_buffer_queue_size_(1000) { min_buffer_queue_size_(1000) {
LOG(TRACE) << "Created output filter at " << static_cast<void *>(this); LOG(TRACE) << "Created output filter at " << static_cast<void *>(this);
} }
void OutputByOrbitStream::OutputFixedOrbits(Slice &out) { void OutputByOrbitStream::OutputFixedOrbits(Slice &out) {
uint32_t orbitN = out.get_firstOrbitN(); uint32_t orbitN = out.get_firstOrbitN();
uint32_t index = uint32_t(orbitN / conf.getNOrbitsPerFile()); LOG(TRACE) << "First orbit number in file received is " << orbitN << ".";
uint32_t index = uint32_t(orbitN / conf_.getNOrbitsPerFile());
size_t n = 0; size_t n = 0;
if (index == 0) { if (index == 0) {
LOG(DEBUG) << "index is 0"; LOG(DEBUG) << "index is 0";
...@@ -34,13 +34,13 @@ void OutputByOrbitStream::OutputFixedOrbits(Slice &out) { ...@@ -34,13 +34,13 @@ void OutputByOrbitStream::OutputFixedOrbits(Slice &out) {
if (out.get_counts() == 0) { if (out.get_counts() == 0) {
LOG(TRACE) << "got an empty slice "; LOG(TRACE) << "got an empty slice ";
} }
if ((out.get_counts() != 0) || conf.getCMSSWHeaders()) { if ((out.get_counts() != 0) || conf_.getCMSSWHeaders()) {
if (control.running.load(std::memory_order_acquire) || if (control_.running.load(std::memory_order_acquire) ||
control.output_force_write) { // i.e should be writing data control_.output_force_write) { // i.e should be writing data
n = fwrite(out.begin(), 1, out.size(), n = fwrite(out.begin(), 1, out.size(),
output_file_handler_.getFile(control.run_number, index).getFilePtr()); output_file_handler_.getFile(control_.run_number, index).getFilePtr());
output_file_handler_.upFileSize(n); output_file_handler_.upFileSize(n);
output_file_handler_.upNOrbits(conf.getNOrbitsPerPacket()); output_file_handler_.upNOrbits(conf_.getNOrbitsPerPacket());
} else if (output_file_handler_.hasFile()) { } else if (output_file_handler_.hasFile()) {
// the run has been stopped so drop but first check if there is a last // the run has been stopped so drop but first check if there is a last
// file to close // file to close
...@@ -52,11 +52,11 @@ void OutputByOrbitStream::OutputFixedOrbits(Slice &out) { ...@@ -52,11 +52,11 @@ void OutputByOrbitStream::OutputFixedOrbits(Slice &out) {
} }
void *OutputByOrbitStream::operator()(void *item) { void *OutputByOrbitStream::operator()(void *item) {
Slice &out = *static_cast<Slice *>(item); Slice &out = *static_cast<Slice *>(item);
totcounts += out.get_counts(); totcounts_ += out.get_counts();
OutputFixedOrbits(out); OutputFixedOrbits(out);
if (Slice::current_queue_size() < min_buffer_queue_size_) { if (Slice::current_queue_size() < min_buffer_queue_size_) {
LOG(TRACE) << "New minimum queue size is " << Slice::current_queue_size() << " total counts " LOG(TRACE) << "New minimum queue size is " << Slice::current_queue_size() << " total counts "
<< totcounts; << totcounts_;
min_buffer_queue_size_ = Slice::current_queue_size(); min_buffer_queue_size_ = Slice::current_queue_size();
} }
Slice::giveAllocated(&out); Slice::giveAllocated(&out);
......
...@@ -23,10 +23,9 @@ class OutputByOrbitStream : public tbb::filter { ...@@ -23,10 +23,9 @@ class OutputByOrbitStream : public tbb::filter {
private: private:
// used for fixed N orbits per file approach // used for fixed N orbits per file approach
uint64_t current_file_size; uint64_t totcounts_;
uint64_t totcounts; ctrl &control_;
ctrl &control; config &conf_;
config &conf;
OutputFileHandler output_file_handler_; OutputFileHandler output_file_handler_;
size_t min_buffer_queue_size_; size_t min_buffer_queue_size_;
}; };
......
...@@ -45,25 +45,29 @@ void OutputFileHandler::enqueue_current_file_for_close_and_move_maybe() { ...@@ -45,25 +45,29 @@ void OutputFileHandler::enqueue_current_file_for_close_and_move_maybe() {
} }
OutputFile OutputFileHandler::getFile(uint32_t run, uint32_t index) { OutputFile OutputFileHandler::getFile(uint32_t run, uint32_t index) {
// TODO: We should use the stop signal to trigger this. // TODO: We should maybe move this out of this function...
if (current_run_number_ != run && getCMSSWHeaders()) { // TODO: EoR file should be also written at destruction, probably.
OutputFileHandler::write_EoR_file( if (current_run_number_ > 0 && current_run_number_ != run && getCMSSWHeaders()) {
1 + static_cast<uint32_t>((current_index_ * nOrbitsPerFile) / OutputFileHandler::write_EoR_file();
constants::N_orbits_per_lumisection),
current_index_);
} }
if (current_run_number_ != run || current_index_ != index) { if (current_run_number_ != run || current_index_ != index) {
enqueue_current_file_for_close_and_move_maybe(); enqueue_current_file_for_close_and_move_maybe();
LOG(TRACE) << "new index: " << index << "'."; if (current_run_number_ != run) {
LOG(TRACE) << "currentindex: " << current_index_ << "'."; LOG(TRACE) << "Previous run: " << current_run_number_ << ".";
current_index_ = index; LOG(TRACE) << "New run: " << run << ".";
}
if (current_index_ != index) {
LOG(TRACE) << "Previous index: " << current_index_ << ".";
LOG(TRACE) << "New index: " << index << ".";
}
if (current_run_number_ != run) { if (current_run_number_ != run) {
run_NOrbits_ = 0; run_NOrbits_ = 0;
} }
current_index_ = index;
current_run_number_ = run;
std::stringstream path_ss; std::stringstream path_ss;
path_ss << base_path_ << "/run" << std::setfill('0') << std::setw(6) << current_run_number_; path_ss << base_path_ << "/run" << std::setfill('0') << std::setw(6) << current_run_number_;
current_run_number_ = run;
run_dir_ = path_ss.str(); run_dir_ = path_ss.str();
path_ss << "/" << working_dir_; path_ss << "/" << working_dir_;
working_files_base_path_ = path_ss.str(); working_files_base_path_ = path_ss.str();
...@@ -184,12 +188,15 @@ void OutputFileHandler::write_EoLS_file(uint32_t ls) { ...@@ -184,12 +188,15 @@ void OutputFileHandler::write_EoLS_file(uint32_t ls) {
LOG(TRACE) << "writing EoLS file " << EoLS_filename; LOG(TRACE) << "writing EoLS file " << EoLS_filename;
EoLS_file.open(full_filename.c_str(), std::ios_base::out); EoLS_file.open(full_filename.c_str(), std::ios_base::out);
EoLS_file_string_stream << "{\n \"data\":[\"" << ls_NOrbits_ << "\",\"" // NEvents // Originally went after "total events"
<< 1 + max_index_per_ls_ << "\",\"" // NFiles // "\",\"0\",\"" << constants::N_orbits_per_lumisection -
<< constants::N_orbits_per_lumisection << "\",\"0\",\"" // Total Events // ls_NOrbits_ // Who knows..
<< constants::N_orbits_per_lumisection - ls_NOrbits_ // TODO: To me it looks like there are more fields than intended in this file...
<< "\",\"0\",\"" // NLost Events EoLS_file_string_stream << "{\n \"data\":[\"" << ls_NOrbits_ << "\",\"" // NEvents
<< ls_file_size_ << "\"],\n \"definition\":\"" // NBytes << 1 + max_index_per_ls_ << "\",\"" // NFiles
<< constants::N_orbits_per_lumisection << "\",\"" // Total Events
<< "\",\"0\",\"" // NLost Events
<< ls_file_size_ << "\"],\n \"definition\":\"" // NBytes
<< "/fff/ramdisk/run" << std::setfill('0') << std::setw(6) << "/fff/ramdisk/run" << std::setfill('0') << std::setw(6)
<< current_run_number_ << "/jsd/EoLS.jsd\",\n \"source\":\"l1scout\"\n}"; << current_run_number_ << "/jsd/EoLS.jsd\",\n \"source\":\"l1scout\"\n}";
...@@ -201,21 +208,24 @@ void OutputFileHandler::write_EoLS_file(uint32_t ls) { ...@@ -201,21 +208,24 @@ void OutputFileHandler::write_EoLS_file(uint32_t ls) {
EoLS_file.close(); EoLS_file.close();
} }
void OutputFileHandler::write_EoR_file(uint32_t ls, uint32_t index) { void OutputFileHandler::write_EoR_file() {
// TODO: Static cast neeed?
int ls{1 + static_cast<uint32_t>((current_index_ * nOrbitsPerFile_) /
constants::N_orbits_per_lumisection)};
std::stringstream EoR_filename; std::stringstream EoR_filename;
EoR_filename << run_dir_ << "/" EoR_filename << run_dir_ << "/"
<< "run" << std::setfill('0') << std::setw(6) << current_run_number_ << "run" << std::setfill('0') << std::setw(6) << current_run_number_
<< "_ls0000_EoR.jsn"; << "_ls0000_EoR.jsn";
LOG(TRACE) << "Writing EoR file " << EoR_filename.str();
std::fstream EoR_file; std::fstream EoR_file;
LOG(TRACE) << "writing EoR file " << EoR_filename;
EoR_file.open(EoR_filename.str().c_str(), std::ios_base::out); EoR_file.open(EoR_filename.str().c_str(), std::ios_base::out);
std::stringstream EoR_file_string_stream; std::stringstream EoR_file_string_stream;
EoR_file_string_stream << "{\n \"data\":[\"" << run_NOrbits_ // NEvents EoR_file_string_stream << "{\n \"data\":[\"" << run_NOrbits_ // NEvents
<< "\",\"" << "\",\""
<< static_cast<int>((ls * (max_index_per_ls_ + 1)) + << static_cast<int>((ls * (max_index_per_ls_ + 1)) +
nOrbitsPerFile * index) // NFiles nOrbitsPerFile_ * current_index_) // NFiles
<< "\",\"" << ls << "\",\"" // NLumis << "\",\"" << ls << "\",\"" // NLumis
<< ls << "\"],\n \"definition\":\"" // LastLumi << ls << "\"],\n \"definition\":\"" // LastLumi
<< "/fff/ramdisk/run" << std::setfill('0') << std::setw(6) << "/fff/ramdisk/run" << std::setfill('0') << std::setw(6)
<< current_run_number_ << "/jsd/EoR.jsd\",\n \"source\":\"l1scout\"\n}"; << current_run_number_ << "/jsd/EoR.jsd\",\n \"source\":\"l1scout\"\n}";
std::string EoR_file_string = EoR_file_string_stream.str(); std::string EoR_file_string = EoR_file_string_stream.str();
......
...@@ -16,27 +16,27 @@ ...@@ -16,27 +16,27 @@
class OutputFileHandler { class OutputFileHandler {
public: public:
OutputFileHandler(const std::string &base_path, const std::string &filename_prefix, OutputFileHandler(const std::string &base_path, const std::string &filename_prefix,
int nOrbitsPerFile_, const bool cmsswHeaders_, uint32_t sourceID_) int nOrbitsPerFile, const bool cmsswHeaders, uint32_t sourceID)
: base_path_(base_path), : base_path_(base_path),
run_dir_(base_path + "/run000000"), run_dir_(base_path + "/run000000"),
filename_prefix_(filename_prefix), filename_prefix_(filename_prefix),
working_files_base_path_(base_path_ + "/" + working_dir_), working_files_base_path_(base_path_ + "/" + working_dir_),
current_run_number_(0), current_run_number_(-1),
current_index_(0), current_index_(-1),
close_and_rename_(base_path_, nOrbitsPerFile_), close_and_rename_(base_path_, nOrbitsPerFile),
nOrbitsPerFile(nOrbitsPerFile_), nOrbitsPerFile_(nOrbitsPerFile),
cmsswHeaders(cmsswHeaders_), cmsswHeaders_(cmsswHeaders),
sourceID(sourceID_), sourceID_(sourceID),
file_size_(0),
NOrbits_(0),
ls_NOrbits_(0),
run_NOrbits_(0),
t{}, t{},
max_index_per_ls_(static_cast<int>(constants::N_orbits_per_lumisection / nOrbitsPerFile_) - max_index_per_ls_(static_cast<int>(constants::N_orbits_per_lumisection / nOrbitsPerFile) -
1) { 1) {
close_and_rename_.SetOutputFileHandlerObj(*this); close_and_rename_.SetOutputFileHandlerObj(*this);
t = std::thread(close_and_rename_); t = std::thread(close_and_rename_);
file_size_ = 0; if (cmsswHeaders) {
NOrbits_ = 0;
ls_NOrbits_ = 0;
run_NOrbits_ = 0;
if (cmsswHeaders_) {
file_size_ = sizeof(FRDFileHeader_v2); // accounting for cmssw header size file_size_ = sizeof(FRDFileHeader_v2); // accounting for cmssw header size
ls_file_size_ = 0; ls_file_size_ = 0;
} }
...@@ -53,15 +53,15 @@ class OutputFileHandler { ...@@ -53,15 +53,15 @@ class OutputFileHandler {
bool hasFile() { return outputFile_.exists(); } bool hasFile() { return outputFile_.exists(); }
bool getCMSSWHeaders() { return cmsswHeaders; } bool getCMSSWHeaders() { return cmsswHeaders_; }
FRDFileHeader_v2 createFileHeader(uint32_t ls); FRDFileHeader_v2 createFileHeader(uint32_t ls);
void upFileSize(size_t n) { file_size_ += n; } void upFileSize(size_t n) { file_size_ += n; }
void upNOrbits(uint32_t n) { NOrbits_ += n; } void upNOrbits(uint32_t n) { NOrbits_ += n; }
void write_EoLS_file(uint32_t ls); void write_EoLS_file(uint32_t ls);
void write_EoR_file(uint32_t ls, uint32_t index); void write_EoR_file();
int NOrbitsPerFile() { return nOrbitsPerFile; } int NOrbitsPerFile() { return nOrbitsPerFile_; }
class close_and_rename { class close_and_rename {
OutputFileHandler *outputfilehandler_; OutputFileHandler *outputfilehandler_;
...@@ -110,12 +110,12 @@ class OutputFileHandler { ...@@ -110,12 +110,12 @@ class OutputFileHandler {
std::string filename_prefix_; std::string filename_prefix_;
std::string working_files_base_path_; std::string working_files_base_path_;
OutputFile outputFile_; OutputFile outputFile_;
uint32_t current_run_number_; int current_run_number_;
uint32_t current_index_; int current_index_;
close_and_rename close_and_rename_; close_and_rename close_and_rename_;
int nOrbitsPerFile; int nOrbitsPerFile_;
const bool cmsswHeaders; const bool cmsswHeaders_;
int sourceID; int sourceID_;
std::thread t; std::thread t;
const uint32_t max_index_per_ls_; const uint32_t max_index_per_ls_;
size_t ls_file_size_; size_t ls_file_size_;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment