From 7ee0147a398d3b2575460b3df9c609e5a0e6c83b Mon Sep 17 00:00:00 2001 From: Giovanna Lazzari Miotto <giovanna.lazzari.miotto@cern.ch> Date: Tue, 24 Sep 2024 11:36:39 +0200 Subject: [PATCH] output: Improve header check --- src/OutputByOrbit.cc | 10 ++++++---- src/OutputFile.cc | 1 - src/OutputFile.h | 16 +++++++++++----- src/OutputFileHandler.cc | 13 +++++-------- src/OutputFileHandler.h | 3 ++- src/sink.cc | 10 ++++++---- src/sink.h | 27 +-------------------------- test/config/filedma-calo-tau.json5 | 2 +- 8 files changed, 32 insertions(+), 50 deletions(-) diff --git a/src/OutputByOrbit.cc b/src/OutputByOrbit.cc index caf3daf5..f711b044 100644 --- a/src/OutputByOrbit.cc +++ b/src/OutputByOrbit.cc @@ -27,7 +27,7 @@ void OutputByOrbitStream::OutputFixedOrbits(Slice &out) { #ifdef DAX_MODE uint32_t index = orbitN; #else - uint32_t index = uint32_t(orbitN / conf_.num_orbits_per_file_); + uint32_t file_index = uint32_t(orbitN / conf_.num_orbits_per_file_); #endif if ((out.get_counts() != 0) || conf_.support_cmssw_headers_) { @@ -36,14 +36,16 @@ void OutputByOrbitStream::OutputFixedOrbits(Slice &out) { if (!out.isInitialized()) { return; } + output_file_handler_.StageSlice(out.begin(), out.size(), conf_.num_orbits_per_packet_, - run_number, index); + run_number, file_index); + } else if (output_file_handler_.HasFile()) { // the run has been stopped so drop but first check if there is a last // file to close LOG(INFO) << "the run was stopped. queueing the last file for close and " - "rename "; - LOG(INFO) << "Committing file in FixedOrbits"; + "rename "; + LOG(TRACE) << "Committing file after run stop."; output_file_handler_.CommitFile(); } } diff --git a/src/OutputFile.cc b/src/OutputFile.cc index 2ca7d7d1..0e93e950 100644 --- a/src/OutputFile.cc +++ b/src/OutputFile.cc @@ -26,7 +26,6 @@ std::string Detail::FormatFileIndex(uint32_t index) { } void Detail::LumisectionMetadata::AddFileMetadata(FileMetadata&& fmd) { - LOG(INFO) << "File metadata: size=" << fmd.size << ", index_in_ls=" << fmd.index_in_ls; content_size += fmd.size; num_orbits += fmd.num_orbits; num_files++; diff --git a/src/OutputFile.h b/src/OutputFile.h index 68de741e..7c36ee2c 100644 --- a/src/OutputFile.h +++ b/src/OutputFile.h @@ -60,11 +60,11 @@ struct LumisectionMetadata { std::pair<uint32_t, uint32_t> UpdateIndex(uint32_t global_index) { auto new_ls = (global_index / (max_index + 1) + 1); - LOG(INFO) << "[max file idx per ls=" << max_index << "] Global-file index=" << global_index - << ", LS=" << lumisection << " ==> " << new_ls; + LOG(TRACE) << "File index (global)=" << global_index << ", lumisection from " << lumisection + << " ==to=> " << new_ls; // Lumisection should only ever change after structure has been reset if (lumisection != 0 && lumisection != new_ls) { - LOG(INFO) << "NOTE: lumisection should remain the same unless reset"; + LOG(WARNING) << "NOTE: lumisection should remain the same unless reset"; } lumisection = new_ls; auto new_file_idx = global_index % (max_index + 1); @@ -116,9 +116,12 @@ class Output { auto GetMetadata() const { return md_; } auto GetLumisection() const { return md_.lumisection; } auto GetLumisectionFooter() const { return ls_footer_; } - auto HasPayload() const { return md_.size > sizeof(HeaderType); } + auto HasPayload() const { + auto min_val = header_.has_value() ? sizeof(HeaderType) : 0; + return md_.size > min_val; + } auto HasLumisectionFooter() const { return ls_footer_.lumisection != 0; } - auto HasHeader() const { return header_.has_value(); }; + auto HasHeader() const { return reserved_header_; }; auto GetHeader() const -> std::optional<HeaderType> { return header_; } void UpdateHeader() { @@ -150,6 +153,7 @@ class Output { protected: Detail::LumisectionMetadata ls_footer_{}; + bool reserved_header_{false}; }; class FileOutput : public Output { @@ -181,6 +185,7 @@ class FileOutput : public Output { md_.size = sizeof(HeaderType); header_ = {}; fseek(std::get<HandleType>(handle_), md_.size, SEEK_SET); + reserved_header_ = true; } int Write(const char*& buffer, size_t size_bytes, uint32_t size_orbits) { @@ -293,6 +298,7 @@ class DaxOutput : public Output { } void ReserveHeader() { assert(md_.size == 0); + reserved_header_ = true; md_.size = sizeof(HeaderType); header_ = {}; diff --git a/src/OutputFileHandler.cc b/src/OutputFileHandler.cc index 2858d825..7de71f20 100644 --- a/src/OutputFileHandler.cc +++ b/src/OutputFileHandler.cc @@ -54,7 +54,8 @@ void OutputFileHandler::CommitFile() { Detail::FileMetadata file_meta = outputFile_.GetMetadata(); if (ls_->IsLastIndex(file_meta.index_in_ls) && IsMainPipeline() && HasCmsswHeaders()) { // If last in lumisection and using CMSSW header and is the main pipeline - LOG(INFO) << "Last file in lumisection " << ls_->lumisection << ", handing over metadata footer and resetting."; + LOG(TRACE) << "Last file in lumisection " << ls_->lumisection + << ", handing over metadata footer."; outputFile_.SetLumisectionFooter(*ls_); ls_ = std::make_unique<Detail::LumisectionMetadata>(GetMaxFileIndexPerLumisection()); } @@ -68,7 +69,7 @@ int OutputFileHandler::StageSlice(const char *buffer, size_t size_bytes, uint32_ bool is_new_index = (current_global_index_ != static_cast<int>(file_index)); if (is_new_run || is_new_index) { - LOG(INFO) << "Committing file because ... new_run? " << is_new_run << ", new_index? " + LOG(TRACE) << "Committing file because ... new_run? " << is_new_run << ", new_index? " << is_new_index << "(context: current_global_index_=" << current_global_index_ << ", incoming_file_index=" << file_index << ", current_run=" << run_.number << ", incoming_run=" << run_number << ")"; @@ -119,19 +120,15 @@ void OutputFileHandler::CommitLumisection() { void OutputFileHandler::CommitRun() { auto ls_index = GetLumisectionFromGlobalFileIndex(current_global_index_); - - LOG(INFO) << "Lumi " << std::to_string(ls_index) - << ". Assert that lumisection pointer exists: " << bool(ls_); if (ls_) { auto ls_index_footer = ls_->lumisection; if (ls_index_footer != ls_index) { LOG(WARNING) << "Expected lumisection number " << ls_index << ", but lumisection footer has " << ls_index_footer; + assert(ls_index_footer == ls_index); } - assert(ls_index_footer == ls_index); - if (IsMainPipeline()) { - LOG(INFO) << "Committing lumi EOLS before EoR can be written"; + LOG(TRACE) << "Committing last EOLS before EoR."; CommitLumisection(); } diff --git a/src/OutputFileHandler.h b/src/OutputFileHandler.h index ba5638a5..fdb15753 100644 --- a/src/OutputFileHandler.h +++ b/src/OutputFileHandler.h @@ -44,7 +44,8 @@ class OutputFileHandler { ~OutputFileHandler() { if (outputFile_.IsOpen()) { - LOG(INFO) << "Committing file in Handler dtor, size=" << outputFile_.GetMetadata().size << ", index_in_ls=" << outputFile_.GetMetadata().index_in_ls; + LOG(TRACE) << "Committing open file because destructing FileHandler. Size=" + << outputFile_.GetMetadata().size; CommitFile(); } } diff --git a/src/sink.cc b/src/sink.cc index 25affc88..fbff4ad4 100644 --- a/src/sink.cc +++ b/src/sink.cc @@ -18,9 +18,11 @@ template <typename TOutput> void Sink<TOutput>::CommitFileHeader(TOutput &file) { auto header = file.GetHeader(); if (header.has_value()) { - char packed[sizeof(header.value())]; - memcpy(packed, &header.value(), sizeof(header.value())); - file.WriteAt(reinterpret_cast<const char *&>(packed), sizeof(header.value()), 0); + size_t header_size = sizeof(header.value()); + char packed[header_size]; + memcpy(packed, &header.value(), header_size); + const char *packed_ptr = reinterpret_cast<const char *>(packed); + file.WriteAt(packed_ptr, sizeof(header.value()), 0); } } @@ -42,7 +44,7 @@ void Sink<TOutput>::CommitOutput(TOutput &f) { Detail::LumisectionMetadata ls_meta = file.GetLumisectionFooter(); Detail::FileMetadata file_meta = file.GetMetadata(); auto run_number = file_meta.run_number; - LOG(INFO) << "Writing scheduled lumifooter for lumisection=" << ls_meta.lumisection; + LOG(TRACE) << "Committing scheduled EoLS for lumisection=" << ls_meta.lumisection; assert(ls_meta.lumisection == file_meta.lumisection); WriteLumisectionFooter(run_number, ls_meta); file.run_metrics_->AddLumisectionMetadata(ls_meta); diff --git a/src/sink.h b/src/sink.h index 9b9ec087..d862cc2f 100644 --- a/src/sink.h +++ b/src/sink.h @@ -39,6 +39,7 @@ class Sink { file.UpdateHeader(); // Issue blocking call CommitFileHeader(file); + LOG(TRACE) << "Committed file header."; } if (file.HasPayload()) { LOG(TRACE) << "Queueing file " << file.GetIdentifier(); @@ -92,30 +93,4 @@ class Sink { void WriteMetadata(std::string &&sub_path, std::string &&content); }; -//class FileSink { -// public: -// explicit FileSink(const std::string &rp) : root_path_(rp + "/") { -// commit_thread_ = std::thread(&FileSink::ProcessQueue, this); -// } -// -// ~FileSink() { -// running_.store(false, std::memory_order_release); -// commit_queue_.abort(); -// commit_thread_.join(); -// } -// -// void ProcessQueue(); -// void Submit(FileOutput &&file); -// void BlockingWrite(std::string &&loc, std::string &&content); -// -// std::string GetRootPath() const { return root_path_; } -// -// private: -// std::thread commit_thread_; -// static tbb::concurrent_bounded_queue<FileOutput> commit_queue_; -// static std::atomic<bool> running_; -// -// const std::string root_path_; // E.g., common directory in which sink files can be stored -//}; - #endif // SCDAQ_SINK_H diff --git a/test/config/filedma-calo-tau.json5 b/test/config/filedma-calo-tau.json5 index 01e6c369..86937dab 100644 --- a/test/config/filedma-calo-tau.json5 +++ b/test/config/filedma-calo-tau.json5 @@ -50,7 +50,7 @@ dev_TCPAutoReconnectOnFailure: "false", // (Minimum) Logging severity: TRACE DEBUG INFO WARNING ERROR FATAL. // Use TRACE to log everything. - log_min_severity: "ERROR", + log_min_severity: "TRACE", threads: 8, // Stores fixed number of orbits per file when nOrbitsPerFile > 1 // If zero, uses a fixed file size (`max_file_size`) instead -- GitLab