From 449d5cb8dc41ca56c377cc04e7fe6ce4698de5a4 Mon Sep 17 00:00:00 2001
From: Giovanna Lazzari Miotto <giovanna.lazzari.miotto@cern.ch>
Date: Tue, 23 Jul 2024 09:50:11 +0200
Subject: [PATCH] fix: pipe: Tag slice with run number on input

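Previously, the run number was taken at the time of output from the
concurrent control structure, which can be out of sync with the
original data. Now each slice is tagged with the run number at input
time, and that tag is carried through to the output stage.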
---
 src/InputFilter.cc       |  8 +++++++-
 src/OutputByOrbit.cc     |  7 ++++---
 src/OutputBySize.cc      |  3 ++-
 src/OutputFile.h         | 10 +++++-----
 src/OutputFileHandler.cc |  8 ++++----
 src/OutputFileHandler.h  |  4 ++--
 src/orbit_processor.cc   | 10 ++++++----
 src/orbit_processor.h    |  3 ++-
 src/processor.cc         |  4 +++-
 src/slice.h              |  4 ++++
 10 files changed, 39 insertions(+), 22 deletions(-)

diff --git a/src/InputFilter.cc b/src/InputFilter.cc
index a79f5010..63e39b51 100644
--- a/src/InputFilter.cc
+++ b/src/InputFilter.cc
@@ -122,6 +122,7 @@ void *InputFilter::operator()(void *) {
   if (!control_.running) {
     return nullptr;
   }
+
   // Prepare destination buffer
   char *buffer = nextSlice_->begin();
   // Available buffer size
@@ -130,7 +131,10 @@ void *InputFilter::operator()(void *) {
   nbReads_++;
 
   // It is optional to use the provided buffer
-  ssize_t bytesRead = readInput(&buffer, bufferSize);
+  auto bytesRead = readInput(&buffer, bufferSize);
+
+  // Acquire run number to tag input data
+  auto current_run = control_.run_number;
 
   // This should really not happen
   assert(bytesRead != 0);
@@ -168,6 +172,8 @@ void *InputFilter::operator()(void *) {
 
   // Adjust the end of this buffer
   thisSlice->set_end(thisSlice->end() + bytesRead);
+  // Tag slice with run number
+  thisSlice->SetRunNumber(current_run);
 
   return thisSlice;
 }
diff --git a/src/OutputByOrbit.cc b/src/OutputByOrbit.cc
index 9b0341bb..672187be 100644
--- a/src/OutputByOrbit.cc
+++ b/src/OutputByOrbit.cc
@@ -23,7 +23,8 @@ OutputByOrbitStream::OutputByOrbitStream(ctrl &control, const ConfigView &conf_v
 OutputByOrbitStream::~OutputByOrbitStream() {}
 
 void OutputByOrbitStream::OutputFixedOrbits(Slice &out) {
-  uint32_t orbitN = out.get_firstOrbitN();
+  auto orbitN = out.get_firstOrbitN();
+  auto run_number = out.GetRunNumber();
   auto index = uint32_t(orbitN / conf_.num_orbits_per_file_);
 
   if ((out.get_counts() != 0) || conf_.support_cmssw_headers_) {
@@ -32,8 +33,8 @@ void OutputByOrbitStream::OutputFixedOrbits(Slice &out) {
       if (!out.isInitialized()) {
         return;
       }
-      output_file_handler_.StageBuffer(out.begin(), out.size(), conf_.num_orbits_per_packet_,
-                                       control_.run_number, index);
+      output_file_handler_.StageSlice(out.begin(), out.size(), conf_.num_orbits_per_packet_,
+                                      run_number, index);
     } else if (output_file_handler_.hasFile()) {
       // the run has been stopped so drop but first check if there is a last
       // file to close
diff --git a/src/OutputBySize.cc b/src/OutputBySize.cc
index bd554ed1..98eecdc9 100644
--- a/src/OutputBySize.cc
+++ b/src/OutputBySize.cc
@@ -96,11 +96,12 @@ static bool read_journal(const std::string &journal_name, uint32_t &run_number,
 
 void *OutputBySizeStream::operator()(void *item) {
   Slice &out = *static_cast<Slice *>(item);
+  auto run_number = out.GetRunNumber();
   totcounts += out.get_counts();
 
   if (control.running.load(std::memory_order_acquire) || control.output_force_write) {
     if (current_file == nullptr || current_file_size > control.max_file_size ||
-        current_run_number != control.run_number || bril_) {
+        current_run_number != run_number || bril_) {
       open_next_file();
     }
 
diff --git a/src/OutputFile.h b/src/OutputFile.h
index bc24ed3b..d4e5ba58 100644
--- a/src/OutputFile.h
+++ b/src/OutputFile.h
@@ -49,12 +49,12 @@ class OutputFile {
     }
   };
 
-  OutputFile(const std::string& working_path, std::string fileName, uint32_t run_number,
-             uint32_t lumisection, uint32_t index, std::string rundir)
-      : working_path_(working_path),
-        filename_(fileName),
+  OutputFile(std::string &&working_path, std::string &&filename, uint32_t run_number,
+             uint32_t lumisection, uint32_t index, std::string &&run_dir)
+      : working_path_(std::move(working_path)),
+        filename_(std::move(filename)),
         md_{run_number, lumisection, index},
-        run_dir_(rundir) {
+        run_dir_(std::move(run_dir)) {
     if (!Open()) {
       auto err = tools::strerror("ERROR when creating file '" + filename_ + "'");
       LOG(ERROR) << err;
diff --git a/src/OutputFileHandler.cc b/src/OutputFileHandler.cc
index d1739702..922f8850 100644
--- a/src/OutputFileHandler.cc
+++ b/src/OutputFileHandler.cc
@@ -108,8 +108,8 @@ bool OutputFileHandler::MaybeCommitFile(uint32_t run, uint32_t index) {
   return false;
 }
 
-int OutputFileHandler::StageBuffer(char* buffer, size_t size_bytes, uint32_t size_orbits,
-                                   uint32_t run_number, uint32_t file_index) {
+int OutputFileHandler::StageSlice(char *buffer, size_t size_bytes, uint32_t size_orbits,
+                                  uint32_t run_number, uint32_t file_index) {
   if (MaybeCommitFile(run_number, file_index)) {
     open_new_file();
   }
@@ -125,8 +125,8 @@ void OutputFileHandler::open_new_file() {
   tools::CreateDirectory(working_dir);
   LOG(TRACE) << "opening file with index " << current_index_ << ", in lumisection " << ls;
 
-  outputFile_ = OutputFile(working_dir, FormatFilename(run_.number, current_index_, ls),
-                           run_.number, ls, index, loc_.GetRunDir(run_.number));
+  outputFile_ = OutputFile(std::move(working_dir), std::move(FormatFilename(run_.number, current_index_, ls)),
+                 run_.number, ls, index, std::move(loc_.GetRunDir(run_.number)));
   // reserve space for CMSSW header if required
   if (getCMSSWHeaders()) {
     outputFile_.ReserveHeaderSize();
diff --git a/src/OutputFileHandler.h b/src/OutputFileHandler.h
index a3549336..199dd0f6 100644
--- a/src/OutputFileHandler.h
+++ b/src/OutputFileHandler.h
@@ -85,8 +85,8 @@ class OutputFileHandler {
 
   void UpdateRunInfo(uint32_t run, uint32_t index);
   bool MaybeCommitFile(uint32_t run, uint32_t index);
-  int StageBuffer(char *buffer, size_t size_bytes, uint32_t size_orbits, uint32_t run_number,
-                  uint32_t file_index);
+  int StageSlice(char *buffer, size_t size_bytes, uint32_t size_orbits, uint32_t run_number,
+                 uint32_t file_index);
 
   void enqueue_current_file_for_close_and_move_maybe();
 
diff --git a/src/orbit_processor.cc b/src/orbit_processor.cc
index 4cdc8f3b..bc4e6b2c 100644
--- a/src/orbit_processor.cc
+++ b/src/orbit_processor.cc
@@ -76,7 +76,7 @@ std::pair<uint32_t, bool> OrbitProcessor::ProcessOrbitHeader(char *rd_ptr) {
 
 // add to header. allocate new[]
 size_t OrbitProcessor::fillFRDEventHeader_V6(char *wr_ptr_FRDHead, uint32_t input_size,
-                                             uint32_t orbit) const {
+                                             uint32_t run_number, uint32_t orbit) const {
   const uint16_t header_version = 6;
   const uint16_t flags = 0;
   const uint32_t crc_dummy = 0;
@@ -88,8 +88,8 @@ size_t OrbitProcessor::fillFRDEventHeader_V6(char *wr_ptr_FRDHead, uint32_t inpu
 
   const uint32_t lumisection =
       1 + static_cast<uint32_t>(orbit / constants::N_orbits_per_lumisection);
-  FRDEventHeader_V6 frdEventHeader_V6(header_version, flags, control_.run_number, lumisection,
-                                      orbit, event_size, crc_dummy);
+  FRDEventHeader_V6 frdEventHeader_V6(header_version, flags, run_number, lumisection, orbit,
+                                      event_size, crc_dummy);
   memcpy(wr_ptr_FRDHead, (char *)&(frdEventHeader_V6), sizeof(frdEventHeader_V6));
   wr_ptr_FRDHead += sizeof(frdEventHeader_V6);
   memcpy(wr_ptr_FRDHead, (char *)&(source_id), sizeof(source_id));
@@ -169,6 +169,7 @@ void OrbitProcessor::ProcessSliceImpl(Slice &input, Slice &out) {
   packet_header *ph_header = nullptr;    // packet header frame
   fragment_header *fh_header = nullptr;  // fragment header frame
   bool is_dropped_orbit = false;
+  auto run_number = input.GetRunNumber();
 
   FillOrbitMetadata meta{0, 0, 0};
 
@@ -223,7 +224,8 @@ void OrbitProcessor::ProcessSliceImpl(Slice &input, Slice &out) {
     uint32_t orbit_size_bytes = GetOrbitSizeInBytes(meta);
 
     if (cmsswHeaders) {
-      additional_header_size = fillFRDEventHeader_V6(wr_ptr, orbit_size_bytes, meta.orbit);
+      additional_header_size =
+          fillFRDEventHeader_V6(wr_ptr, orbit_size_bytes, run_number, meta.orbit);
     }
     wr_ptr += (orbit_size_bytes + additional_header_size);
 
diff --git a/src/orbit_processor.h b/src/orbit_processor.h
index f95c0225..448f1097 100644
--- a/src/orbit_processor.h
+++ b/src/orbit_processor.h
@@ -129,7 +129,8 @@ class OrbitProcessor : public Processor {
   bool HasTrailer(Slice &input, char *&rd_ptr) const;
   bool CheckFrameMultBlock(size_t inputSize, uint16_t nDroppedOrbitsInPacket) const;
   std::pair<uint32_t, bool> ProcessOrbitHeader(char *rd_ptr);
-  size_t fillFRDEventHeader_V6(char *wr_ptr_FRDHead, uint32_t inputSize, uint32_t orbit) const;
+  size_t fillFRDEventHeader_V6(char *wr_ptr_FRDHead, uint32_t inputSize, uint32_t run,
+                               uint32_t orbit) const;
 
   FillOrbitMetadata FillOrbit(orbit_trailer *trailer, char *rd_ptr, char *wr_ptr,
                               const char *rd_end_ptr, const char *wr_end_ptr,
diff --git a/src/processor.cc b/src/processor.cc
index 6880f2ff..01796614 100644
--- a/src/processor.cc
+++ b/src/processor.cc
@@ -16,8 +16,10 @@ void *Processor::operator()(void *item) {
   Slice &input = *static_cast<Slice *>(item);
   Slice &out = *Slice::getAllocated();
   ProcessSlice(input, out);
-  Slice::giveAllocated(&input);
 
+  // Propagate run number metadata from incoming into outgoing slice
+  out.SetRunNumber(input.GetRunNumber());
+  Slice::giveAllocated(&input);
   return &out;
 }
 
diff --git a/src/slice.h b/src/slice.h
index 1ab4016c..59c986ae 100644
--- a/src/slice.h
+++ b/src/slice.h
@@ -16,6 +16,7 @@ class Slice {
   bool output;
   static tbb::concurrent_bounded_queue<Slice *> free_slices;
   uint32_t firstOrbitN_;
+  uint32_t run_number_;
   bool initialized_{false};
 
  public:
@@ -43,6 +44,7 @@ class Slice {
     counts = 0;
     output = false;
     firstOrbitN_ = UINT_MAX;
+    run_number_ = UINT_MAX;
     initialized_ = false;
   }
 
@@ -66,6 +68,8 @@ class Slice {
     firstOrbitN_ = firstOrbitN;
     initialized_ = true;
   }
+  void SetRunNumber(uint32_t run_no) { run_number_ = run_no; }  // Tag this slice with a run number
+  uint32_t GetRunNumber() const { return run_number_; }  // Run number this slice was tagged with
   bool isInitialized() { return initialized_; }
 };
 #endif
-- 
GitLab