// Web-viewer header captured together with this file (not part of the code):
// Dinyar Rabady authored.
// Code owners: assign users and groups as approvers for specific file changes. Learn more.
// OutputFileHandler.h 3.53 KiB
#ifndef OUTPUTFILEHANDLER_H
#define OUTPUTFILEHANDLER_H
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <string>
#include <thread>
#include <utility>
#include "FRDFileHeader_v2.h"
#include "OutputFile.h"
#include "format.h"
#include "tbb/concurrent_queue.h"
/// Owns the currently open output file and a background thread that runs the
/// `close_and_rename` functor.
///
/// Finished files are handed over through the static `files_to_close_` queue.
/// The destructor enqueues the current file (if applicable), aborts the queue
/// and joins the background thread.
///
/// NOTE(review): the queue and the running flag are `static`, i.e. shared by
/// all instances; the class looks designed for a single instance per process —
/// confirm before creating several.
class OutputFileHandler {
 public:
  /// @param base_path        Root directory; the working directory is
  ///                         `base_path + "/" + working_dir_`.
  /// @param filename_prefix  Prefix stored for building file names.
  /// @param nOrbitsPerFile   Number of orbits per file. Must be non-zero: it is
  ///                         used as a divisor to compute `max_index_per_ls_`.
  /// @param cmsswHeaders     When true, the initial file size accounts for one
  ///                         `FRDFileHeader_v2`.
  /// @param sourceID         Source identifier stored in `sourceID_`.
  ///
  /// Every data member is initialized (in declaration order) before the
  /// background thread is started, so the thread never observes a partially
  /// constructed object.
  OutputFileHandler(const std::string &base_path, const std::string &filename_prefix,
                    int nOrbitsPerFile, const bool cmsswHeaders, uint32_t sourceID)
      : base_path_(base_path),
        run_dir_(base_path + "/run000000"),
        filename_prefix_(filename_prefix),
        working_files_base_path_(base_path_ + "/" + working_dir_),
        current_run_number_(-1),
        current_index_(-1),
        close_and_rename_(base_path_, nOrbitsPerFile),
        nOrbitsPerFile_(nOrbitsPerFile),
        cmsswHeaders_(cmsswHeaders),
        sourceID_(sourceID),
        t{},
        max_index_per_ls_(static_cast<int>(constants::N_orbits_per_lumisection / nOrbitsPerFile) -
                          1),
        // Previously only assigned when cmsswHeaders was true, leaving it
        // uninitialized otherwise.
        ls_file_size_(0),
        // Account for the CMSSW header size up front when headers are enabled.
        file_size_(cmsswHeaders ? sizeof(FRDFileHeader_v2) : 0),
        NOrbits_(0),
        ls_NOrbits_(0),
        run_NOrbits_(0) {
    // The functor must know its owner *before* it is copied into the thread:
    // std::thread stores a copy of the callable, including this pointer.
    close_and_rename_.SetOutputFileHandlerObj(*this);
    t = std::thread(close_and_rename_);
  }

  /// Enqueues the current file for closing (if applicable), then stops and
  /// joins the background thread.
  ~OutputFileHandler() {
    enqueue_current_file_for_close_and_move_maybe();
    exit_file_handler_loop();
  }

  // The background functor keeps a raw pointer to this object, so copies would
  // leave it pointing at the wrong instance. (Copying was already impossible
  // because of the std::thread member; this makes the intent explicit.)
  OutputFileHandler(const OutputFileHandler &) = delete;
  OutputFileHandler &operator=(const OutputFileHandler &) = delete;

  void enqueue_current_file_for_close_and_move_maybe();
  OutputFile getFile(uint32_t run, uint32_t index);

  /// @return Result of `outputFile_.exists()`.
  bool hasFile() { return outputFile_.exists(); }

  /// @return Whether CMSSW headers were requested at construction.
  bool getCMSSWHeaders() const { return cmsswHeaders_; }

  FRDFileHeader_v2 createFileHeader(uint32_t ls);

  /// Adds `n` to the running size of the current file.
  void upFileSize(size_t n) { file_size_ += n; }

  /// Adds `n` to the running orbit count of the current file.
  void upNOrbits(uint32_t n) { NOrbits_ += n; }

  void write_EoLS_file(uint32_t ls);
  void write_EoR_file();

  /// @return The orbits-per-file value given at construction.
  int NOrbitsPerFile() const { return nOrbitsPerFile_; }

  /// Callable executed on the background thread. Its `operator()` is defined
  /// out of line; it has access to the owning handler through the pointer set
  /// with `SetOutputFileHandlerObj`.
  class close_and_rename {
    // Non-owning back-pointer to the handler; null until
    // SetOutputFileHandlerObj() is called.
    OutputFileHandler *outputfilehandler_ = nullptr;

   public:
    /// Stores a copy of `base_path` and the orbits-per-file setting.
    close_and_rename(const std::string &base_path, int nOrbitsPerFile)
        : base_path_(base_path), nOrbitsPerFile_(nOrbitsPerFile) {}

    void operator()() const;

    /// Registers the handler this functor works for. Must be called before the
    /// functor is copied into a thread.
    void SetOutputFileHandlerObj(OutputFileHandler &handler) { outputfilehandler_ = &handler; }

   private:
    std::string base_path_;
    int nOrbitsPerFile_;
  };

 private:
  void create_output_directory_maybe(std::string &output_directory);
  void open_new_file();
  std::string format_filename(uint32_t run_number, uint32_t index, uint32_t ls);

  /// Signals the background loop to stop, aborts any blocking queue operation
  /// and waits for the thread to finish.
  void exit_file_handler_loop() {
    file_handler_running_.store(false, std::memory_order_release);
    files_to_close_.abort();
    if (t.joinable()) {
      t.join();
    }
  }

  // Name of the subdirectory where files are stored before they are moved to
  // their final destination.
  static const std::string working_dir_;

  // Name of the journal file. Note: the filename prefix is prepended to form
  // the final file name.
  static const std::string journal_file_;

  // Queue of files handed over for closing; shared by all instances.
  static tbb::concurrent_bounded_queue<OutputFile> files_to_close_;
  // Cleared by exit_file_handler_loop() to request shutdown.
  static std::atomic<bool> file_handler_running_;

  // NOTE: the constructor's initializer list follows this declaration order.
  std::string base_path_;
  std::string run_dir_;
  std::string filename_prefix_;
  std::string working_files_base_path_;
  OutputFile outputFile_;
  int current_run_number_;
  int current_index_;
  close_and_rename close_and_rename_;
  int nOrbitsPerFile_;
  const bool cmsswHeaders_;
  int sourceID_;
  std::thread t;  // background thread running a copy of close_and_rename_
  const uint32_t max_index_per_ls_;
  size_t ls_file_size_;
  size_t file_size_;
  uint32_t NOrbits_;
  uint32_t ls_NOrbits_;
  uint32_t run_NOrbits_;
};
#endif