diff --git a/etc/scdaq/scdaq.conf b/etc/scdaq/scdaq.conf index e3b23a400c219a97ce03dbbc7382b15b5ee91c37..179898fb68a2e5b137a4dee79ee16b2c70eb30ab 100644 --- a/etc/scdaq/scdaq.conf +++ b/etc/scdaq/scdaq.conf @@ -29,7 +29,7 @@ packets_per_report:200000 NOrbitsPerDMAPacket:20 # prescale factor, used for *calo* data only -prescale_factor:20 +prescale_factor:1 ## Extra settings for "filedma" input @@ -52,7 +52,7 @@ enable_stream_processor:yes # Note: When changing the processing type, change also "output_filename_prefix" # in the file output section. # -#processor_type:GMT +processor_type:GMT # Enable software zero-supression doZS:yes @@ -63,9 +63,9 @@ doZS:yes ## ################################################################################ -#output_filename_prefix:scout_GMT +output_filename_prefix:scout_GMT -#output_filename_base:/fff/BU0/ramdisk/scdaq +output_filename_base:/fff/BU0/ramdisk/scdaq max_file_size:8589934592 @@ -96,5 +96,10 @@ quality_cut:12 # Pipeline settings threads:8 -#verbosity level, currently supports 0 and 1 -verbosity:0 +# verbosity level, currently supports 0 and 1 +verbosity:1 + +# N orbits to store to each file +# Configured to store fixed number of orbits per file when nOrbitsPerFile > 1 +# Set to 0 to use fixed file size instead +nOrbitsPerFile:0 diff --git a/src/config.h b/src/config.h index b529892ec38fd6964d00711704f8f81d1938ff84..ebf7a05b59f66299ea0772815db0e9fff16d4a15 100644 --- a/src/config.h +++ b/src/config.h @@ -85,12 +85,10 @@ class config { std::string v = vmap.at("NOrbitsPerDMAPacket"); return boost::lexical_cast<uint32_t>(v.c_str()); } - uint32_t getPrescaleFactor() const { std::string v = vmap.at("prescale_factor"); return boost::lexical_cast<uint32_t>(v.c_str()); } - uint32_t getNumThreads() const { std::string v = vmap.at("threads"); return boost::lexical_cast<uint32_t>(v.c_str()); @@ -110,6 +108,10 @@ class config { std::string v = vmap.at("verbosity"); return boost::lexical_cast<uint32_t>(v.c_str()); } + uint32_t getNOrbitsPerFile() const { // If > 0, the fixed N orbits per file option is performed + std::string v = vmap.at("nOrbitsPerFile"); + return boost::lexical_cast<uint32_t>(v.c_str()); + } private: std::map<std::string, std::string> vmap; diff --git a/src/output.cc b/src/output.cc index 21d23cf46677f0dde2b9e766e39e238fb7a735bb..88e91a797d49637e73ae4500e63012297759999c 100644 --- a/src/output.cc +++ b/src/output.cc @@ -44,7 +44,7 @@ static void create_output_directory(std::string &output_directory) { } OutputStream::OutputStream(const std::string output_filename_base, - const std::string output_filename_prefix, ctrl &c) + const std::string output_filename_prefix, ctrl &c, config &conf_) : tbb::filter(serial_in_order), my_output_filename_base(output_filename_base), my_output_filename_prefix(output_filename_prefix), @@ -53,8 +53,15 @@ OutputStream::OutputStream(const std::string output_filename_base, file_count(-1), control(c), current_file(0), + current_file_a(0), + current_file_b(0), current_run_number(0), - journal_name(my_output_filename_base + "/" + output_filename_prefix + '_' + journal_file) { + journal_name(my_output_filename_base + "/" + output_filename_prefix + '_' + journal_file), + conf(conf_), + fixedOrbitsPerFile(bool(conf.getNOrbitsPerFile())), + index(0), + index_a(0), + index_b(0) { LOG(TRACE) << "Created output filter at " << static_cast<void *>(this); // Create the ouput directory @@ -93,15 +100,48 @@ static bool read_journal(std::string journal_name, uint32_t &run_number, uint32_ void *OutputStream::operator()(void *item) { Slice &out = *static_cast<Slice *>(item); totcounts += out.get_counts(); - + uint32_t journal_run_number{0}; + uint32_t orbitN = out.get_firstOrbitN(); + uint32_t new_index = 0; + if (this->fixedOrbitsPerFile) { + new_index = uint32_t(orbitN / conf.getNOrbitsPerFile()); + } + size_t n = 0; + bool already_opened = false; if (control.running.load(std::memory_order_acquire) || control.output_force_write) { - if (current_file == NULL || current_file_size > control.max_file_size || - current_run_number != control.run_number) { - open_next_file(); + if (current_file == NULL || current_run_number != control.run_number) { + if (!this->fixedOrbitsPerFile) { + open_next_file(); + already_opened = true; + } else { + if (index == 0) { + open_file(new_index); + } + } } - size_t n = fwrite(out.begin(), 1, out.size(), current_file); - current_file_size += n; + if (this->fixedOrbitsPerFile) { + if (new_index > this->index) { // + if (this->index != 0) { + close_and_move_file(this->index_b); + } + open_file(new_index); + n = fwrite(out.begin(), 1, out.size(), this->current_file_a); + this->index = new_index; + } else if (new_index < this->index) { + n = fwrite(out.begin(), 1, out.size(), this->current_file_b); + } else { + n = fwrite(out.begin(), 1, out.size(), this->current_file_a); + } + } + + if (!this->fixedOrbitsPerFile) { + if ((!already_opened) && (current_file_size > control.max_file_size)) { + open_next_file(); + } + n = fwrite(out.begin(), 1, out.size(), current_file); + current_file_size += n; + } if (n != out.size()) { LOG(ERROR) << "Can't write into output file: Have to write " << out.size() << ", but write returned " << n; @@ -109,11 +149,17 @@ void *OutputStream::operator()(void *item) { } // If not running and we have a file then close it - if (!control.running && current_file != NULL && !control.output_force_write) { + if (!this->fixedOrbitsPerFile && !control.running && current_file != NULL && + !control.output_force_write) { close_and_move_current_file(); file_count = -1; } + if (this->fixedOrbitsPerFile && !control.running && current_file != NULL && + !control.output_force_write) { + close_and_move_file(this->index); + close_and_move_file(new_index); + } out.free(); return NULL; } @@ -202,3 +248,43 @@ void OutputStream::open_next_file() { // Update journal file (with the next index file) update_journal(journal_name, current_run_number, file_count + 1); } + +void OutputStream::close_and_move_file(uint32_t index_) // Used for fixedNorbits per file option +{ + std::string run_file = + format_run_file_stem(my_output_filename_prefix, current_run_number, index_); + std::string current_file_name = my_output_filename_base + "/" + working_dir + "/" + run_file; + std::string target_file_name = my_output_filename_base + "/" + run_file; + if (this->current_file_b != NULL) { + fclose(this->current_file_b); + } + LOG(INFO) << "rename: " << current_file_name << " to " << target_file_name; + if (rename(current_file_name.c_str(), target_file_name.c_str()) < 0) { + LOG(ERROR) << tools::strerror("File rename failed"); + } + this->current_file_b = this->current_file_a; + this->index_b = this->index_a; + this->current_file_a = NULL; + index_a = NULL; +} + +void OutputStream::open_file(uint32_t index_) // Used for fixedNorbits per file option +{ + // Create a new file + std::string output_directory = my_output_filename_base + "/" + working_dir; + create_output_directory(output_directory); + std::string current_filename_a = + output_directory + "/" + + format_run_file_stem(my_output_filename_prefix, current_run_number, index_); + std::string filename = + output_directory + "/" + + format_run_file_stem(my_output_filename_prefix, control.run_number, index_); + std::cout << "opening file with index " << index_ << std::endl; + this->current_file_a = fopen(filename.c_str(), "w"); + this->index_a = index_; + if (this->current_file_a == NULL) { + std::string err = tools::strerror("ERROR when creating file '" + current_filename_a + "'"); + LOG(ERROR) << err; + throw std::runtime_error(err); + } +} diff --git a/src/output.h b/src/output.h index 3e490b23d74e96198e34421c8b64a5ff3c1ae0db..d7edbf40e02b2c7218368f423d5ea62d433bae30 100644 --- a/src/output.h +++ b/src/output.h @@ -6,6 +6,7 @@ #include <cstdio> #include <string> +#include "config.h" #include "controls.h" #include "tbb/pipeline.h" @@ -13,23 +14,34 @@ class OutputStream : public tbb::filter { public: OutputStream(const std::string output_filename_base, const std::string output_filename_prefix, - ctrl &c); + ctrl &c, config &conf_); void *operator()(void *item) /*override*/; private: + // used for original fixed file size approach void open_next_file(); void close_and_move_current_file(); + FILE *current_file; + + // used for fixed N orbits per file approach + void close_and_move_file(uint32_t index_); + void open_file(uint32_t index_); + FILE *current_file_a; // two files open at once + FILE *current_file_b; + uint32_t index_a; + uint32_t index_b; - private: std::string my_output_filename_base; std::string my_output_filename_prefix; uint32_t totcounts; uint64_t current_file_size; int32_t file_count; ctrl &control; - FILE *current_file; + config &conf; uint32_t current_run_number; std::string journal_name; + uint32_t index; + const bool fixedOrbitsPerFile; }; #endif diff --git a/src/processor.cc b/src/processor.cc index a408db3d04056bb0a0497951e94b5f4e4bac311b..846a760ab8d525387ffc8155aae20012aadbe08a 100644 --- a/src/processor.cc +++ b/src/processor.cc @@ -68,10 +68,8 @@ std::vector<unsigned int> StreamProcessor::CountBX(Slice &input, char *rd_ptr, b orbit_trailer *ot = (orbit_trailer *)(rd_ptr); for (unsigned int k = 0; k < (14 * 8); k++) { // 14*8 = 14 frames, 8 links of orbit trailer // containing BX hitmap - // bit_check(&bx_vect, ot->bx_map[k], (k*32 + 1));// +1 added to account - // for BX counting starting at 1, commented out until firmware fix for - // BX number being one more in the trailer than the CMS convention - bit_check(&bx_vect, ot->bx_map[k], (k * 32)); + bit_check(&bx_vect, ot->bx_map[k], + (k * 32 + 1)); // +1 added to account for BX counting starting at 1 } return bx_vect; } @@ -96,13 +94,14 @@ inline std::pair<uint32_t, bool> StreamProcessor::ProcessOrbitHeader(char *rd_pt // Goes through orbit worth of data and fills the output memory with the calo // data corresponding to the non-empty bunchcrossings, as marked in bx_vect -uint32_t StreamProcessor::FillOrbitCalo(std::vector<unsigned int> &bx_vect, char *rd_ptr, - char *wr_ptr) { +StreamProcessor::fillOrbitMetadata StreamProcessor::FillOrbitCalo( + std::vector<unsigned int> &bx_vect, char *rd_ptr, char *wr_ptr) { std::pair<uint32_t, bool> orbit_header = std::pair<uint32_t, bool>{ ProcessOrbitHeader(rd_ptr)}; //.second is the warning test enable bit rd_ptr += 32; // +32 to account for orbit header uint32_t relbx = uint32_t{0}; uint32_t counts = uint32_t{0}; + uint32_t orbit = uint32_t{orbit_header.first}; while (relbx < bx_vect.size()) { // total number of non-empty BXs in orbit is // given by bx_vect.size() blockCalo *bl = reinterpret_cast<blockCalo *>(rd_ptr); @@ -141,13 +140,18 @@ uint32_t StreamProcessor::FillOrbitCalo(std::vector<unsigned int> &bx_vect, char rd_ptr += sizeof(blockCalo); relbx++; } - return counts; + + StreamProcessor::fillOrbitMetadata meta = { + counts, + orbit, + }; + return meta; } // Goes through orbit worth of data and fills the output memory with the muons // corresponding to the non-empty bunchcrossings, as marked in bx_vect -uint32_t StreamProcessor::FillOrbitMuon(std::vector<unsigned int> &bx_vect, char *rd_ptr, - char *wr_ptr) { +StreamProcessor::fillOrbitMetadata StreamProcessor::FillOrbitMuon( + std::vector<unsigned int> &bx_vect, char *rd_ptr, char *wr_ptr) { std::pair<uint32_t, bool> orbit_header = std::pair<uint32_t, bool>{ ProcessOrbitHeader(rd_ptr)}; //.second is the warning test enable bit rd_ptr += 32; // +32 to account for orbit header @@ -243,7 +247,11 @@ uint32_t StreamProcessor::FillOrbitMuon(std::vector<unsigned int> &bx_vect, char relbx++; } - return counts; + StreamProcessor::fillOrbitMetadata meta = { + counts, + orbit, + }; + return meta; } void StreamProcessor::process(Slice &input, Slice &out) { @@ -262,6 +270,10 @@ void StreamProcessor::process(Slice &input, Slice &out) { uint32_t counts = 0; bool endofpacket = false; uint32_t orbit_per_packet_count = 0; + StreamProcessor::fillOrbitMetadata meta{ + 0, + 0, + }; if (processorType == ProcessorType::PASS_THROUGH) { memcpy(wr_ptr, rd_ptr, input.size()); out.set_end(out.begin() + input.size()); @@ -286,12 +298,14 @@ void StreamProcessor::process(Slice &input, Slice &out) { } std::sort(bx_vect.begin(), bx_vect.end()); if (processorType == ProcessorType::GMT) { - orbitCount = FillOrbitMuon(bx_vect, rd_ptr, wr_ptr); + meta = FillOrbitMuon(bx_vect, rd_ptr, wr_ptr); + orbitCount = meta.counts; ++orbit_per_packet_count; wr_ptr += orbitCount * 12 + 12 * bx_vect.size(); // 12 bytes for each muon/count then 12 // bytes for each bx header } else if (processorType == ProcessorType::CALO) { - orbitCount = FillOrbitCalo(bx_vect, rd_ptr, wr_ptr); + meta = FillOrbitCalo(bx_vect, rd_ptr, wr_ptr); + orbitCount = meta.counts; ++orbit_per_packet_count; // size of calo packet is 4bytes*(8links*7dataWords + 3headerWords)=236 // bytes Note 7 data words per link because we have the "link number" word @@ -306,6 +320,7 @@ void StreamProcessor::process(Slice &input, Slice &out) { constants::orbit_trailer_size; // 32 for orbit header, + nBXs + // orbit trailer counts += orbitCount; + out.set_firstOrbitN(meta.orbit); bx_vect.clear(); if (rd_ptr < input.end()) { diff --git a/src/processor.h b/src/processor.h index 5120f5ba5ae44a4b7b658461f8c1dee0b1d86f62..96d57462f0af03ba851021dbabb9eef287545f01 100644 --- a/src/processor.h +++ b/src/processor.h @@ -23,12 +23,16 @@ class StreamProcessor : public tbb::filter { ~StreamProcessor(); private: + struct fillOrbitMetadata { + uint32_t counts; + uint32_t orbit; + }; void process(Slice &input, Slice &out); bool CheckFrameMultBlock(size_t inputSize); std::vector<unsigned int> CountBX(Slice &input, char *rd_ptr, bool &trailerError); inline std::pair<uint32_t, bool> ProcessOrbitHeader(char *rd_ptr); - uint32_t FillOrbitMuon(std::vector<unsigned int> &bx_vect, char *rd_ptr, char *wr_ptr); - uint32_t FillOrbitCalo(std::vector<unsigned int> &bx_vect, char *rd_ptr, char *wr_ptr); + fillOrbitMetadata FillOrbitMuon(std::vector<unsigned int> &bx_vect, char *rd_ptr, char *wr_ptr); + fillOrbitMetadata FillOrbitCalo(std::vector<unsigned int> &bx_vect, char *rd_ptr, char *wr_ptr); std::ofstream myfile; size_t max_size; uint64_t nbPackets; diff --git a/src/scdaq.cc b/src/scdaq.cc index 6149484bab8dc0480a36c39524f89f6d9f2558fe..922af7d57e9f85084d74dc5e068bba4ed8e121de 100644 --- a/src/scdaq.cc +++ b/src/scdaq.cc @@ -81,7 +81,8 @@ int run_pipeline(int nbThreads, ctrl &control, config &conf) { } // Create file-writing stage and add it to the pipeline - OutputStream output_stream(conf.getOutputFilenameBase(), conf.getOutputFilenamePrefix(), control); + OutputStream output_stream(conf.getOutputFilenameBase(), conf.getOutputFilenamePrefix(), control, + conf); pipeline.add_filter(output_stream); // Run the pipeline diff --git a/src/slice.h b/src/slice.h index d758be348ae821356d2f0596f638e36cb9447fa7..c64f18f21e79582da311794ef6ae8ec5f564abb3 100644 --- a/src/slice.h +++ b/src/slice.h @@ -15,6 +15,7 @@ class Slice { uint32_t counts; bool output; static tbb::concurrent_bounded_queue<Slice *> free_slices; + uint32_t firstOrbitN; public: //! Allocate a Slice object that can hold up to max_size bytes @@ -58,5 +59,7 @@ class Slice { void set_output(bool o) { output = o; } void set_counts(uint32_t c) { counts = c; } uint32_t get_counts() const { return counts; } + uint32_t get_firstOrbitN() const { return firstOrbitN; } + void set_firstOrbitN(uint32_t firstOrbitN_) { firstOrbitN = firstOrbitN_; } }; #endif diff --git a/test/config/scdaq-calo.conf b/test/config/scdaq-calo.conf index f00c1e63e9ae9d673fd713ffb5ab4cf281963faa..9a8b8ac7f6e5d22b7655ceaf497db1ea97487344 100644 --- a/test/config/scdaq-calo.conf +++ b/test/config/scdaq-calo.conf @@ -29,7 +29,7 @@ packets_per_report:200000 NOrbitsPerDMAPacket:20 # prescale that will be used for *calo* data only -prescale_factor:20 +prescale_factor:1 ## Extra settings for "filedma" input @@ -97,3 +97,8 @@ threads:8 # verbosity level, currently supports 0 and 1 verbosity:1 + +# N orbits to store to each file +# Configured to store fixed number of orbits per file when nOrbitsPerFile > 1 +# Set to 0 to use fixed file size instead +nOrbitsPerFile:1000000 diff --git a/test/config/scdaq-gmt.conf b/test/config/scdaq-gmt.conf index 83344d93b8a4f871ffcef69ef8e4f881623738dd..04a0e114900b9ad9380b24472bfd389e1d034aab 100644 --- a/test/config/scdaq-gmt.conf +++ b/test/config/scdaq-gmt.conf @@ -29,7 +29,7 @@ packets_per_report:200000 NOrbitsPerDMAPacket:20 # prescale factor applied to *calo* data only -prescale_factor:20 +prescale_factor:1 ## Extra settings for "filedma" input @@ -97,4 +97,9 @@ quality_cut:12 threads:8 # verbosity level, currently supports 0 and 1 -verbosity:0 +verbosity:1 + +# N orbits to store to each file +# Configured to store fixed number of orbits per file when nOrbitsPerFile > 1 +# Set to 0 to use fixed file size instead +nOrbitsPerFile:1000000