diff --git a/src/InputFilter.cc b/src/InputFilter.cc index a79f501092c0fccdb8602ea0b9174196f7896afc..63e39b51283f4e5050dda90f64e375c9ba0ecd62 100644 --- a/src/InputFilter.cc +++ b/src/InputFilter.cc @@ -122,6 +122,7 @@ void *InputFilter::operator()(void *) { if (!control_.running) { return nullptr; } + // Prepare destination buffer char *buffer = nextSlice_->begin(); // Available buffer size @@ -130,7 +131,10 @@ void *InputFilter::operator()(void *) { nbReads_++; // It is optional to use the provided buffer - ssize_t bytesRead = readInput(&buffer, bufferSize); + auto bytesRead = readInput(&buffer, bufferSize); + + // Acquire run number to tag input data + auto current_run = control_.run_number; // This should really not happen assert(bytesRead != 0); @@ -168,6 +172,8 @@ void *InputFilter::operator()(void *) { // Adjust the end of this buffer thisSlice->set_end(thisSlice->end() + bytesRead); + // Tag slice with run number + thisSlice->SetRunNumber(current_run); return thisSlice; } diff --git a/src/OutputByOrbit.cc b/src/OutputByOrbit.cc index 9b0341bbaa846aac28188ca07a2f278898ae0511..672187beb6267d9587d38e6ba2bf1e6c5302160b 100644 --- a/src/OutputByOrbit.cc +++ b/src/OutputByOrbit.cc @@ -23,7 +23,8 @@ OutputByOrbitStream::OutputByOrbitStream(ctrl &control, const ConfigView &conf_v OutputByOrbitStream::~OutputByOrbitStream() {} void OutputByOrbitStream::OutputFixedOrbits(Slice &out) { - uint32_t orbitN = out.get_firstOrbitN(); + auto orbitN = out.get_firstOrbitN(); + auto run_number = out.GetRunNumber(); auto index = uint32_t(orbitN / conf_.num_orbits_per_file_); if ((out.get_counts() != 0) || conf_.support_cmssw_headers_) { @@ -32,8 +33,8 @@ void OutputByOrbitStream::OutputFixedOrbits(Slice &out) { if (!out.isInitialized()) { return; } - output_file_handler_.StageBuffer(out.begin(), out.size(), conf_.num_orbits_per_packet_, - control_.run_number, index); + output_file_handler_.StageSlice(out.begin(), out.size(), conf_.num_orbits_per_packet_, + run_number, index); } else if (output_file_handler_.hasFile()) { // the run has been stopped so drop but first check if there is a last // file to close diff --git a/src/OutputBySize.cc b/src/OutputBySize.cc index bd554ed11645e3ca7748e6ebc8eceb0eef1a09f7..98eecdc9adde34f681e9e12d8974191b14cb4fde 100644 --- a/src/OutputBySize.cc +++ b/src/OutputBySize.cc @@ -96,11 +96,12 @@ static bool read_journal(const std::string &journal_name, uint32_t &run_number, void *OutputBySizeStream::operator()(void *item) { Slice &out = *static_cast<Slice *>(item); + auto run_number = out.GetRunNumber(); totcounts += out.get_counts(); if (control.running.load(std::memory_order_acquire) || control.output_force_write) { if (current_file == nullptr || current_file_size > control.max_file_size || - current_run_number != control.run_number || bril_) { + current_run_number != run_number || bril_) { open_next_file(); } diff --git a/src/OutputFile.h b/src/OutputFile.h index bc24ed3bd12d7553d9b2dd0f6d2d477a39247bde..d4e5ba5825ebd0ff2ae1a2c6bc44f58e21917b98 100644 --- a/src/OutputFile.h +++ b/src/OutputFile.h @@ -49,12 +49,12 @@ class OutputFile { } }; - OutputFile(const std::string& working_path, std::string fileName, uint32_t run_number, - uint32_t lumisection, uint32_t index, std::string rundir) - : working_path_(working_path), - filename_(fileName), + OutputFile(std::string &&working_path, std::string &&filename, uint32_t run_number, + uint32_t lumisection, uint32_t index, std::string &&run_dir) + : working_path_(std::move(working_path)), + filename_(std::move(filename)), md_{run_number, lumisection, index}, - run_dir_(rundir) { + run_dir_(std::move(run_dir)) { if (!Open()) { auto err = tools::strerror("ERROR when creating file '" + filename_ + "'"); LOG(ERROR) << err; diff --git a/src/OutputFileHandler.cc b/src/OutputFileHandler.cc index d173970237c2c3ccb707543b631845606c7d66db..922f88501f421ef1366b27f62657a6b929648802 100644 --- a/src/OutputFileHandler.cc +++ b/src/OutputFileHandler.cc @@ -108,8 +108,8 @@ bool OutputFileHandler::MaybeCommitFile(uint32_t run, uint32_t index) { return false; } -int OutputFileHandler::StageBuffer(char* buffer, size_t size_bytes, uint32_t size_orbits, - uint32_t run_number, uint32_t file_index) { +int OutputFileHandler::StageSlice(char *buffer, size_t size_bytes, uint32_t size_orbits, + uint32_t run_number, uint32_t file_index) { if (MaybeCommitFile(run_number, file_index)) { open_new_file(); } @@ -125,8 +125,8 @@ void OutputFileHandler::open_new_file() { tools::CreateDirectory(working_dir); LOG(TRACE) << "opening file with index " << current_index_ << ", in lumisection " << ls; - outputFile_ = OutputFile(working_dir, FormatFilename(run_.number, current_index_, ls), - run_.number, ls, index, loc_.GetRunDir(run_.number)); + outputFile_ = OutputFile(std::move(working_dir), std::move(FormatFilename(run_.number, current_index_, ls)), + run_.number, ls, index, std::move(loc_.GetRunDir(run_.number))); // reserve space for CMSSW header if required if (getCMSSWHeaders()) { outputFile_.ReserveHeaderSize(); diff --git a/src/OutputFileHandler.h b/src/OutputFileHandler.h index a3549336762c895d9f566f49b383371f92dbfd92..199dd0f6a741fc9fa813671ff3a4128bd74bf8e2 100644 --- a/src/OutputFileHandler.h +++ b/src/OutputFileHandler.h @@ -85,8 +85,8 @@ class OutputFileHandler { void UpdateRunInfo(uint32_t run, uint32_t index); bool MaybeCommitFile(uint32_t run, uint32_t index); - int StageBuffer(char *buffer, size_t size_bytes, uint32_t size_orbits, uint32_t run_number, - uint32_t file_index); + int StageSlice(char *buffer, size_t size_bytes, uint32_t size_orbits, uint32_t run_number, + uint32_t file_index); void enqueue_current_file_for_close_and_move_maybe(); diff --git a/src/orbit_processor.cc b/src/orbit_processor.cc index 4cdc8f3b8a29720b401cca0a5bc28c312bd4d94d..bc4e6b2c101ba3e6efca3915a99144a7e23dd860 100644 --- a/src/orbit_processor.cc +++ b/src/orbit_processor.cc @@ -76,7 +76,7 @@ std::pair<uint32_t, bool> OrbitProcessor::ProcessOrbitHeader(char *rd_ptr) { // add to header. allocate new[] size_t OrbitProcessor::fillFRDEventHeader_V6(char *wr_ptr_FRDHead, uint32_t input_size, - uint32_t orbit) const { + uint32_t run_number, uint32_t orbit) const { const uint16_t header_version = 6; const uint16_t flags = 0; const uint32_t crc_dummy = 0; @@ -88,8 +88,8 @@ size_t OrbitProcessor::fillFRDEventHeader_V6(char *wr_ptr_FRDHead, uint32_t inpu const uint32_t lumisection = 1 + static_cast<uint32_t>(orbit / constants::N_orbits_per_lumisection); - FRDEventHeader_V6 frdEventHeader_V6(header_version, flags, control_.run_number, lumisection, - orbit, event_size, crc_dummy); + FRDEventHeader_V6 frdEventHeader_V6(header_version, flags, run_number, lumisection, orbit, + event_size, crc_dummy); memcpy(wr_ptr_FRDHead, (char *)&(frdEventHeader_V6), sizeof(frdEventHeader_V6)); wr_ptr_FRDHead += sizeof(frdEventHeader_V6); memcpy(wr_ptr_FRDHead, (char *)&(source_id), sizeof(source_id)); @@ -169,6 +169,7 @@ void OrbitProcessor::ProcessSliceImpl(Slice &input, Slice &out) { packet_header *ph_header = nullptr; // packet header frame fragment_header *fh_header = nullptr; // fragment header frame bool is_dropped_orbit = false; + auto run_number = input.GetRunNumber(); FillOrbitMetadata meta{0, 0, 0}; @@ -223,7 +224,8 @@ void OrbitProcessor::ProcessSliceImpl(Slice &input, Slice &out) { uint32_t orbit_size_bytes = GetOrbitSizeInBytes(meta); if (cmsswHeaders) { - additional_header_size = fillFRDEventHeader_V6(wr_ptr, orbit_size_bytes, meta.orbit); + additional_header_size = + fillFRDEventHeader_V6(wr_ptr, orbit_size_bytes, run_number, meta.orbit); } wr_ptr += (orbit_size_bytes + additional_header_size); diff --git a/src/orbit_processor.h b/src/orbit_processor.h index f95c0225003cd337f36ae7c4caf68c947ba3a3b1..448f109733f3da0b257537bb9217e547cf9e45b7 100644 --- a/src/orbit_processor.h +++ b/src/orbit_processor.h @@ -129,7 +129,8 @@ class OrbitProcessor : public Processor { bool HasTrailer(Slice &input, char *&rd_ptr) const; bool CheckFrameMultBlock(size_t inputSize, uint16_t nDroppedOrbitsInPacket) const; std::pair<uint32_t, bool> ProcessOrbitHeader(char *rd_ptr); - size_t fillFRDEventHeader_V6(char *wr_ptr_FRDHead, uint32_t inputSize, uint32_t orbit) const; + size_t fillFRDEventHeader_V6(char *wr_ptr_FRDHead, uint32_t inputSize, uint32_t run, + uint32_t orbit) const; FillOrbitMetadata FillOrbit(orbit_trailer *trailer, char *rd_ptr, char *wr_ptr, const char *rd_end_ptr, const char *wr_end_ptr, diff --git a/src/processor.cc b/src/processor.cc index 6880f2ff9c1e8d4d8d19838c763329f4b51bb6b4..01796614a8e933201b338b95ecb4d9c217781dcc 100644 --- a/src/processor.cc +++ b/src/processor.cc @@ -16,8 +16,10 @@ void *Processor::operator()(void *item) { Slice &input = *static_cast<Slice *>(item); Slice &out = *Slice::getAllocated(); ProcessSlice(input, out); - Slice::giveAllocated(&input); + // Propagate run number metadata from incoming into outgoing slice + out.SetRunNumber(input.GetRunNumber()); + Slice::giveAllocated(&input); return &out; } diff --git a/src/slice.h b/src/slice.h index 1ab4016cbee795e6b749c1c1a6198ffe7a3ff1b0..59c986aecfe8b4be211d330e8dce640a2f741221 100644 --- a/src/slice.h +++ b/src/slice.h @@ -16,6 +16,7 @@ class Slice { bool output; static tbb::concurrent_bounded_queue<Slice *> free_slices; uint32_t firstOrbitN_; + uint32_t run_number_; bool initialized_{false}; public: @@ -43,6 +44,7 @@ class Slice { counts = 0; output = false; firstOrbitN_ = UINT_MAX; + run_number_ = UINT_MAX; initialized_ = false; } @@ -66,6 +68,8 @@ class Slice { firstOrbitN_ = firstOrbitN; initialized_ = true; } + void SetRunNumber(uint32_t run_no) { run_number_ = run_no; } + uint32_t GetRunNumber() { return run_number_; } bool isInitialized() { return initialized_; } }; #endif