Skip to content
Snippets Groups Projects
Commit 771f0ea6 authored by Giovanna Lazzari Miotto's avatar Giovanna Lazzari Miotto :mushroom:
Browse files

ref: WriteLSFooter, mutexed RunMetadata, constness

parent 28639d0b
No related branches found
No related tags found
1 merge request!107Draft: ref: output: Rewrite sink functionality
Pipeline #7848318 passed
...@@ -31,7 +31,11 @@ void Detail::LumisectionMetadata::AddFileMetadata(FileMetadata&& fmd) { ...@@ -31,7 +31,11 @@ void Detail::LumisectionMetadata::AddFileMetadata(FileMetadata&& fmd) {
num_files++; num_files++;
} }
std::string Detail::LumisectionMetadata::MakeFooter(uint32_t run_number) { bool Detail::LumisectionMetadata::IsLastIndex(int index) const {
return index == static_cast<int>(max_index);
}
std::string Detail::LumisectionMetadata::MakeFooter(uint32_t run_number) const {
std::stringstream footer; std::stringstream footer;
footer << "{\n \"data\":[\""; footer << "{\n \"data\":[\"";
footer << num_orbits << "\",\""; // NEvents footer << num_orbits << "\",\""; // NEvents
...@@ -47,11 +51,12 @@ std::string Detail::LumisectionMetadata::MakeFooter(uint32_t run_number) { ...@@ -47,11 +51,12 @@ std::string Detail::LumisectionMetadata::MakeFooter(uint32_t run_number) {
} }
void Detail::RunMetadata::AddLumisectionMetadata(LumisectionMetadata& ls) { void Detail::RunMetadata::AddLumisectionMetadata(LumisectionMetadata& ls) {
std::lock_guard<std::mutex> l(mutex_);
num_orbits += ls.num_orbits; num_orbits += ls.num_orbits;
num_files += ls.num_files; num_files += ls.num_files;
} }
std::string Detail::RunMetadata::MakeFooter(uint32_t ls_index) { std::string Detail::RunMetadata::MakeFooter(uint32_t ls_index) const {
std::stringstream footer; std::stringstream footer;
footer << "{\n \"data\":[\""; footer << "{\n \"data\":[\"";
footer << num_orbits << "\",\""; // NEvents footer << num_orbits << "\",\""; // NEvents
......
...@@ -40,7 +40,6 @@ struct FileMetadata { ...@@ -40,7 +40,6 @@ struct FileMetadata {
}; };
struct LumisectionMetadata { struct LumisectionMetadata {
uint32_t index{};
size_t file_size{}; size_t file_size{};
uint32_t num_orbits{}; uint32_t num_orbits{};
uint32_t num_files{}; uint32_t num_files{};
...@@ -48,9 +47,10 @@ struct LumisectionMetadata { ...@@ -48,9 +47,10 @@ struct LumisectionMetadata {
LumisectionMetadata(uint32_t max_index_per_ls) : max_index(max_index_per_ls) {} LumisectionMetadata(uint32_t max_index_per_ls) : max_index(max_index_per_ls) {}
bool IsLastIndex(int index) const;
void Reset() { file_size = num_orbits = num_files = 0; } void Reset() { file_size = num_orbits = num_files = 0; }
void AddFileMetadata(FileMetadata&& fmd); void AddFileMetadata(FileMetadata&& fmd);
std::string MakeFooter(uint32_t run_number); std::string MakeFooter(uint32_t run_number) const;
}; };
struct RunMetadata { struct RunMetadata {
...@@ -58,12 +58,15 @@ struct RunMetadata { ...@@ -58,12 +58,15 @@ struct RunMetadata {
uint32_t num_orbits; uint32_t num_orbits;
uint32_t num_files; uint32_t num_files;
std::mutex mutex_;
void Reset(int run_number) { void Reset(int run_number) {
std::lock_guard<std::mutex> l(mutex_);
num_orbits = num_files = 0; num_orbits = num_files = 0;
number = run_number; number = run_number;
} }
void AddLumisectionMetadata(LumisectionMetadata& ls); void AddLumisectionMetadata(LumisectionMetadata& ls);
std::string MakeFooter(uint32_t ls_index); std::string MakeFooter(uint32_t ls_index) const;
}; };
} // namespace Detail } // namespace Detail
...@@ -116,22 +119,22 @@ class OutputFile { ...@@ -116,22 +119,22 @@ class OutputFile {
header_ = HeaderType(md_.source_id, md_.num_orbits, md_.run_number, md_.lumisection, md_.size); header_ = HeaderType(md_.source_id, md_.num_orbits, md_.run_number, md_.lumisection, md_.size);
} }
void SetLumisectionFooter(Detail::LumisectionMetadata &md) { void SetLumisectionFooter(Detail::LumisectionMetadata &&md) {
// The last file in a lumisection carries an LS metadata footer // The last file in a lumisection carries an LS metadata footer
ls_footer_.emplace(md); ls_footer_.emplace(md);
} }
auto GetFilename() -> std::string { return filename_; } auto GetFilename() const { return filename_; }
auto GetFilePointer() -> FILE* { return file_ptr_; } auto GetFilePointer() const { return file_ptr_; }
auto GetWorkingPath() -> std::string { return working_path_; } auto GetWorkingPath() const { return working_path_; }
auto GetDestinationDir() -> std::string { return destination_path_; } auto GetDestinationDir() const { return destination_path_; }
auto GetLumisection() -> uint32_t { return md_.lumisection; } auto GetLumisection() const { return md_.lumisection; }
auto GetMetadata() -> Detail::FileMetadata { return md_; } auto GetMetadata() const { return md_; }
auto GetLumisectionFooter() -> Detail::LumisectionMetadata { return ls_footer_.value(); } auto GetLumisectionFooter() const { return ls_footer_.value(); }
auto HasLumisectionFooter() -> bool { return ls_footer_.has_value(); } auto HasLumisectionFooter() const { return ls_footer_.has_value(); }
auto HasPayload() -> bool { return md_.size > sizeof(HeaderType); } auto HasPayload() const { return md_.size > sizeof(HeaderType); }
auto HasHeader() -> bool { return header_.has_value(); }; auto HasHeader() const { return header_.has_value(); };
auto GetHeader() -> std::optional<HeaderType> { return header_; } auto GetHeader() const -> std::optional<HeaderType> { return header_; }
private: private:
FILE* file_ptr_{}; FILE* file_ptr_{};
......
...@@ -30,10 +30,10 @@ void OutputFileHandler::UpdateRunInfo(uint32_t run, uint32_t index) { ...@@ -30,10 +30,10 @@ void OutputFileHandler::UpdateRunInfo(uint32_t run, uint32_t index) {
void OutputFileHandler::CommitFile(uint32_t run, uint32_t index) { void OutputFileHandler::CommitFile(uint32_t run, uint32_t index) {
ls_.AddFileMetadata(outputFile_.GetMetadata()); ls_.AddFileMetadata(outputFile_.GetMetadata());
if (current_index_ == static_cast<int>(ls_.max_index) && IsMainPipeline() && HasCmsswHeaders()) { if (ls_.IsLastIndex(current_index_) && IsMainPipeline() && HasCmsswHeaders()) {
// If last in lumisection and using CMSSW header and is the main pipeline // If last in lumisection and using CMSSW header and is the main pipeline
LOG(TRACE) << "Last file in lumisection; handing over LS metadata footer"; LOG(TRACE) << "Last file in lumisection; handing over LS metadata footer";
outputFile_.SetLumisectionFooter(ls_); outputFile_.SetLumisectionFooter(std::move(ls_));
} }
sink_.Submit(std::move(outputFile_)); sink_.Submit(std::move(outputFile_));
...@@ -81,7 +81,7 @@ std::string OutputFileHandler::FormatFilename(uint32_t run_number, uint32_t inde ...@@ -81,7 +81,7 @@ std::string OutputFileHandler::FormatFilename(uint32_t run_number, uint32_t inde
} }
void OutputFileHandler::CommitLumisection(uint32_t ls_index) { void OutputFileHandler::CommitLumisection(uint32_t ls_index) {
sink_.WriteLumisectionFooter(run_.number, ls_index, ls_); sink_.WriteLumisectionFooter(run_.number, ls_index, ls_.MakeFooter(run_.number));
// Update run counters // Update run counters
run_.AddLumisectionMetadata(ls_); run_.AddLumisectionMetadata(ls_);
ls_.Reset(); ls_.Reset();
...@@ -95,5 +95,5 @@ void OutputFileHandler::CommitRun() { ...@@ -95,5 +95,5 @@ void OutputFileHandler::CommitRun() {
CommitLumisection(ls_index); CommitLumisection(ls_index);
} }
sink_.WriteRunFooter(ls_index, run_); sink_.WriteRunFooter(run_.number, run_.MakeFooter(ls_index));
} }
\ No newline at end of file
...@@ -41,7 +41,8 @@ void FileSink::ProcessQueue() { ...@@ -41,7 +41,8 @@ void FileSink::ProcessQueue() {
if (file.HasLumisectionFooter()) { if (file.HasLumisectionFooter()) {
auto ls = file.GetLumisectionFooter(); auto ls = file.GetLumisectionFooter();
WriteLumisectionFooter(file.GetMetadata().run_number, file.GetLumisection(), ls); auto run_number = file.GetMetadata().run_number;
WriteLumisectionFooter(run_number, file.GetLumisection(), ls.MakeFooter(run_number));
file.run_metrics_->AddLumisectionMetadata(ls); file.run_metrics_->AddLumisectionMetadata(ls);
} }
} }
...@@ -77,20 +78,19 @@ void FileSink::Submit(OutputFile &&file) { ...@@ -77,20 +78,19 @@ void FileSink::Submit(OutputFile &&file) {
} }
} }
void FileSink::WriteLumisectionFooter(uint32_t run_number, uint32_t ls_id, void FileSink::WriteLumisectionFooter(uint32_t run_number, uint32_t ls_id, std::string &&footer) {
Detail::LumisectionMetadata ls) {
auto filename = auto filename =
Detail::FormatRun(run_number) + "_" + Detail::FormatLumisection(ls_id) + "_EoLS.jsn"; Detail::FormatRun(run_number) + "_" + Detail::FormatLumisection(ls_id) + "_EoLS.jsn";
auto sub_path = Detail::FormatRun(run_number) + "/" + filename; auto sub_path = Detail::FormatRun(run_number) + "/" + filename;
LOG(TRACE) << "Writing EoLS footer file " << sub_path; LOG(TRACE) << "Writing EoLS footer file " << sub_path;
BlockingWrite(std::move(sub_path), ls.MakeFooter(run_number)); BlockingWrite(std::move(sub_path), std::move(footer));
} }
void FileSink::WriteRunFooter(uint32_t ls_id, Detail::RunMetadata run) { void FileSink::WriteRunFooter(uint32_t run_number, std::string &&footer) {
auto filename = Detail::FormatRun(run.number) + "_" + Detail::FormatLumisection(0) + "_EoR.jsn"; auto filename = Detail::FormatRun(run_number) + "_" + Detail::FormatLumisection(0) + "_EoR.jsn";
auto sub_path = Detail::FormatRun(run.number) + "/" + filename; auto sub_path = Detail::FormatRun(run_number) + "/" + filename;
LOG(TRACE) << "Writing EoR file " << sub_path; LOG(TRACE) << "Writing EoR file " << sub_path;
BlockingWrite(std::move(sub_path), run.MakeFooter(ls_id)); BlockingWrite(std::move(sub_path), std::move(footer));
} }
...@@ -31,9 +31,9 @@ class FileSink { ...@@ -31,9 +31,9 @@ class FileSink {
void BlockingWrite(std::string &&loc, std::string &&content); void BlockingWrite(std::string &&loc, std::string &&content);
void CommitFileHeader(OutputFile &file); void CommitFileHeader(OutputFile &file);
void WriteLumisectionFooter(uint32_t run_number, uint32_t ls_id, Detail::LumisectionMetadata md); void WriteLumisectionFooter(uint32_t run_number, uint32_t ls_id, std::string &&footer);
void WriteRunFooter(uint32_t ls_id, Detail::RunMetadata run); void WriteRunFooter(uint32_t run_number, std::string &&footer);
std::string GetRootPath() { return root_path_; } std::string GetRootPath() const { return root_path_; }
private: private:
std::thread commit_thread_; std::thread commit_thread_;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment