diff --git a/etc/scdaq/scdaq.conf b/etc/scdaq/scdaq.conf index e3b23a400c219a97ce03dbbc7382b15b5ee91c37..179898fb68a2e5b137a4dee79ee16b2c70eb30ab 100644 --- a/etc/scdaq/scdaq.conf +++ b/etc/scdaq/scdaq.conf @@ -29,7 +29,7 @@ packets_per_report:200000 NOrbitsPerDMAPacket:20 # prescale factor, used for *calo* data only -prescale_factor:20 +prescale_factor:1 ## Extra settings for "filedma" input @@ -52,7 +52,7 @@ enable_stream_processor:yes # Note: When changing the processing type, change also "output_filename_prefix" # in the file output section. # -#processor_type:GMT +processor_type:GMT # Enable software zero-supression doZS:yes @@ -63,9 +63,9 @@ doZS:yes ## ################################################################################ -#output_filename_prefix:scout_GMT +output_filename_prefix:scout_GMT -#output_filename_base:/fff/BU0/ramdisk/scdaq +output_filename_base:/fff/BU0/ramdisk/scdaq max_file_size:8589934592 @@ -96,5 +96,10 @@ quality_cut:12 # Pipeline settings threads:8 -#verbosity level, currently supports 0 and 1 -verbosity:0 +# verbosity level, currently supports 0 and 1 +verbosity:1 + +# N orbits to store to each file +# Configured to store fixed number of orbits per file when nOrbitsPerFile > 1 +# Set to 0 to use fixed file size instead +nOrbitsPerFile:0 diff --git a/src/config.h b/src/config.h index 0ba19dd2230cbcd7710a9fad8641f57620c92047..28d2e05dd79ece8a245a23ac8ba0b8c56e3211b1 100644 --- a/src/config.h +++ b/src/config.h @@ -98,12 +98,10 @@ public: std::string v = vmap.at("NOrbitsPerDMAPacket"); return boost::lexical_cast<uint32_t>(v.c_str()); } - uint32_t getPrescaleFactor() const { std::string v = vmap.at("prescale_factor"); return boost::lexical_cast<uint32_t>(v.c_str()); } - uint32_t getNumThreads() const { std::string v = vmap.at("threads"); return boost::lexical_cast<uint32_t>(v.c_str()); @@ -125,6 +123,10 @@ public: std::string v = vmap.at("verbosity"); return boost::lexical_cast<uint32_t>(v.c_str()); } + uint32_t getNOrbitsPerFile() const { //If > 0, the fixed N orbits per file option is performed + std::string v = vmap.at("nOrbitsPerFile"); + return boost::lexical_cast<uint32_t>(v.c_str()); + } private: std::map<std::string,std::string> vmap; diff --git a/src/output.cc b/src/output.cc index 35e2f8ef5918e824ebd2affa4ba3e526ad0cb50d..878bde6d3fa0ec58cc7364615a6a542abf0ea4b8 100644 --- a/src/output.cc +++ b/src/output.cc @@ -20,13 +20,13 @@ static void create_output_directory(std::string& output_directory) /* check if path exists and is a directory */ if (stat(output_directory.c_str(), &sb) == 0) { - if (S_ISDIR (sb.st_mode)) { - LOG(TRACE) << "Output directory already exists: " << output_directory << "'."; - return; - } - std::string err = "ERROR The output directory path '" + output_directory + "' exists, but the path is not a directory!"; - LOG(ERROR) << err; - throw std::runtime_error(err); + if (S_ISDIR (sb.st_mode)) { + LOG(TRACE) << "Output directory already exists: " << output_directory << "'."; + return; + } + std::string err = "ERROR The output directory path '" + output_directory + "' exists, but the path is not a directory!"; + LOG(ERROR) << err; + throw std::runtime_error(err); } if (!tools::filesystem::create_directories(output_directory)) { @@ -37,17 +37,24 @@ static void create_output_directory(std::string& output_directory) LOG(TRACE) << "Created output directory: " << output_directory << "'."; } -OutputStream::OutputStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c) : - tbb::filter(serial_in_order), - my_output_filename_base(output_filename_base), - my_output_filename_prefix(output_filename_prefix), - totcounts(0), - current_file_size(0), - file_count(-1), - control(c), - current_file(0), - current_run_number(0), - journal_name(my_output_filename_base + "/" + output_filename_prefix + '_' + journal_file) +OutputStream::OutputStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c, config& conf_) : + tbb::filter(serial_in_order), + my_output_filename_base(output_filename_base), + my_output_filename_prefix(output_filename_prefix), + totcounts(0), + current_file_size(0), + file_count(-1), + control(c), + current_file(0), + current_file_a(0), + current_file_b(0), + current_run_number(0), + journal_name(my_output_filename_base + "/" + output_filename_prefix + '_' + journal_file), + conf(conf_), + fixedOrbitsPerFile(bool(conf.getNOrbitsPerFile())), + index(0), + index_a(0), + index_b(0) { LOG(TRACE) << "Created output filter at " << static_cast<void*>(this); @@ -77,40 +84,79 @@ static void update_journal(std::string journal_name, uint32_t run_number, uint32 static bool read_journal(std::string journal_name, uint32_t& run_number, uint32_t& index) { - std::ifstream journal (journal_name); - if (journal.is_open()) { - journal >> run_number >> index; - journal.close(); - return true; - } - return false; + std::ifstream journal (journal_name); + if (journal.is_open()) { + journal >> run_number >> index; + journal.close(); + return true; + } + return false; } void* OutputStream::operator()( void* item ) { - Slice& out = *static_cast<Slice*>(item); - totcounts += out.get_counts(); + Slice& out = *static_cast<Slice*>(item); + totcounts += out.get_counts(); + uint32_t journal_run_number{0}; + uint32_t orbitN = out.get_firstOrbitN(); + uint32_t new_index = 0; + if(this->fixedOrbitsPerFile){new_index = uint32_t(orbitN/conf.getNOrbitsPerFile());} + size_t n = 0; + bool already_opened=false; + if ( control.running.load(std::memory_order_acquire) || control.output_force_write ) { - if ( control.running.load(std::memory_order_acquire) || control.output_force_write ) { - if (current_file == NULL || current_file_size > control.max_file_size || current_run_number != control.run_number) { - open_next_file(); - } - - size_t n = fwrite( out.begin(), 1, out.size(), current_file ); - current_file_size += n; - if ( n != out.size() ) { - LOG(ERROR) << "Can't write into output file: Have to write " << out.size() << ", but write returned " << n; + if (current_file == NULL || current_run_number != control.run_number) { + + if(!this->fixedOrbitsPerFile){ + open_next_file(); + already_opened = true; + }else{ + if(index==0){ + open_file(new_index); + } + } + } + + if(this->fixedOrbitsPerFile){ + if(new_index > this->index) { // + if(this->index != 0){close_and_move_file(this->index_b); + } + open_file(new_index); + n = fwrite( out.begin(), 1, out.size(), this->current_file_a ); + this->index = new_index; + }else if(new_index < this->index){ + n = fwrite( out.begin(), 1, out.size(), this->current_file_b ); + }else{ + n = fwrite( out.begin(), 1, out.size(), this->current_file_a ); } + } - // If not running and we have a file then close it - if ( !control.running && current_file != NULL && !control.output_force_write ) { - close_and_move_current_file(); - file_count = -1; + if (!this->fixedOrbitsPerFile){ + + if((!already_opened) && (current_file_size > control.max_file_size)) { + open_next_file(); + } + n = fwrite( out.begin(), 1, out.size(), current_file ); + current_file_size +=n; } + if ( n != out.size() ) { + LOG(ERROR) << "Can't write into output file: Have to write " << out.size() << ", but write returned " << n; + } + } + + // If not running and we have a file then close it + if ( !this->fixedOrbitsPerFile && !control.running && current_file != NULL && !control.output_force_write ) { + close_and_move_current_file(); + file_count = -1; + } - out.free(); - return NULL; + if ( this->fixedOrbitsPerFile && !control.running && current_file != NULL && !control.output_force_write ) { + close_and_move_file(this->index); + close_and_move_file(new_index); + } + out.free(); + return NULL; } /* @@ -192,5 +238,40 @@ void OutputStream::open_next_file() } // Update journal file (with the next index file) - update_journal(journal_name, current_run_number, file_count+1); + update_journal(journal_name, current_run_number, file_count + 1); +} + +void OutputStream::close_and_move_file(uint32_t index_) //Used for fixedNorbits per file option +{ + std::string run_file = format_run_file_stem(my_output_filename_prefix, current_run_number, index_); + std::string current_file_name = my_output_filename_base + "/" + working_dir + "/" + run_file; + std::string target_file_name = my_output_filename_base + "/" + run_file; + if (this->current_file_b != NULL) { + fclose(this->current_file_b); + } + LOG(INFO) << "rename: " << current_file_name << " to " << target_file_name; + if ( rename(current_file_name.c_str(), target_file_name.c_str()) < 0 ) { + LOG(ERROR) << tools::strerror("File rename failed"); + } + this->current_file_b = this->current_file_a; + this->index_b = this->index_a; + this->current_file_a = NULL; + index_a = NULL; +} + +void OutputStream::open_file(uint32_t index_) //Used for fixedNorbits per file option +{ + // Create a new file + std::string output_directory = my_output_filename_base + "/" + working_dir; + create_output_directory(output_directory); + std::string current_filename_a = output_directory + "/" + format_run_file_stem(my_output_filename_prefix, current_run_number, index_); + std::string filename = output_directory + "/" + format_run_file_stem(my_output_filename_prefix, control.run_number, index_); + std::cout << "opening file with index " << index_ << std::endl; + this->current_file_a = fopen( filename.c_str(), "w" ); + this->index_a = index_; + if (this->current_file_a == NULL) { + std::string err = tools::strerror("ERROR when creating file '" + current_filename_a + "'"); + LOG(ERROR) << err; + throw std::runtime_error(err); + } } diff --git a/src/output.h b/src/output.h index cb6de035781aa7bfbcefaf227f2c68d93a8ecb87..862d3ab7dd88b021fe5b5c2b3131a65c49d8143f 100644 --- a/src/output.h +++ b/src/output.h @@ -5,7 +5,7 @@ #include <stdint.h> #include <string> #include "tbb/pipeline.h" - +#include "config.h" #include "controls.h" //! Filter that writes each buffer to a file. @@ -13,23 +13,34 @@ class OutputStream: public tbb::filter { public: - OutputStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c ); + OutputStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c, config& conf_); void* operator()( void* item ) /*override*/; private: + // used for original fixed file size approach void open_next_file(); void close_and_move_current_file(); + FILE *current_file; + + // used for fixed N orbits per file approach + void close_and_move_file(uint32_t index_); + void open_file(uint32_t index_); + FILE *current_file_a; //two files open at once + FILE *current_file_b; + uint32_t index_a; + uint32_t index_b; -private: std::string my_output_filename_base; std::string my_output_filename_prefix; uint32_t totcounts; uint64_t current_file_size; int32_t file_count; ctrl& control; - FILE *current_file; + config& conf; uint32_t current_run_number; std::string journal_name; + uint32_t index; + const bool fixedOrbitsPerFile; }; #endif diff --git a/src/processor.cc b/src/processor.cc index b1315db629a14dd5e2ffc5e0f95e5b06ce814b75..77b3f3abf524192fbf4bcc1ea9f561df1410bd8b 100644 --- a/src/processor.cc +++ b/src/processor.cc @@ -62,9 +62,7 @@ std::vector<unsigned int> StreamProcessor::CountBX(Slice& input, char* rd_ptr, b if(bl->orbit[0]==constants::beefdead){ // found orbit trailer orbit_trailer *ot = (orbit_trailer*)(rd_ptr); for (unsigned int k = 0; k < (14*8); k++){ // 14*8 = 14 frames, 8 links of orbit trailer containing BX hitmap - //bit_check(&bx_vect, ot->bx_map[k], (k*32 + 1));// +1 added to account for BX counting starting at 1, commented out until firmware fix for BX number being one more in the trailer than the CMS convention - bit_check(&bx_vect, ot->bx_map[k], (k*32)); - + bit_check(&bx_vect, ot->bx_map[k], (k*32 + 1));// +1 added to account for BX counting starting at 1 } return bx_vect; } @@ -87,11 +85,12 @@ inline std::pair<uint32_t, bool> StreamProcessor::ProcessOrbitHeader(char* rd_pt } // Goes through orbit worth of data and fills the output memory with the calo data corresponding to the non-empty bunchcrossings, as marked in bx_vect -uint32_t StreamProcessor::FillOrbitCalo(std::vector<unsigned int>& bx_vect, char* rd_ptr, char* wr_ptr){ +StreamProcessor::fillOrbitMetadata StreamProcessor::FillOrbitCalo(std::vector<unsigned int>& bx_vect, char* rd_ptr, char* wr_ptr){ std::pair<uint32_t, bool> orbit_header = std::pair<uint32_t, bool> {ProcessOrbitHeader(rd_ptr)};//.second is the warning test enable bit rd_ptr += 32; // +32 to account for orbit header uint32_t relbx = uint32_t{0}; uint32_t counts = uint32_t{0}; + uint32_t orbit = uint32_t{orbit_header.first}; while(relbx < bx_vect.size()){ //total number of non-empty BXs in orbit is given by bx_vect.size() blockCalo *bl = reinterpret_cast<blockCalo*>(rd_ptr); if(bl->calo0[0]==constants::beefdead){break;} // orbit trailer has been reached, end of orbit data @@ -115,14 +114,16 @@ uint32_t StreamProcessor::FillOrbitCalo(std::vector<unsigned int>& bx_vect, char rd_ptr+=sizeof(blockCalo); relbx++; } - return counts; + + StreamProcessor::fillOrbitMetadata meta = {counts, orbit,}; + return meta; } // Goes through orbit worth of data and fills the output memory with the muons corresponding to the non-empty bunchcrossings, as marked in bx_vect -uint32_t StreamProcessor::FillOrbitMuon(std::vector<unsigned int>& bx_vect, char* rd_ptr, char* wr_ptr){ +StreamProcessor::fillOrbitMetadata StreamProcessor::FillOrbitMuon(std::vector<unsigned int>& bx_vect, char* rd_ptr, char* wr_ptr){ std::pair<uint32_t, bool> orbit_header = std::pair<uint32_t, bool>{ProcessOrbitHeader(rd_ptr)};//.second is the warning test enable bit rd_ptr += 32; // +32 to account for orbit header uint32_t orbit = uint32_t{orbit_header.first}; @@ -148,7 +149,7 @@ uint32_t StreamProcessor::FillOrbitMuon(std::vector<unsigned int>& bx_vect, char AblocksOn[i]=((pt>0) || (doZS==0)); if((pt>0) || (doZS==0)){ - mAcount++; + mAcount++; } pt = (bl->mu2f[i] >> shifts::pt) & masks::pt; BblocksOn[i]=((pt>0) || (doZS==0)); @@ -194,7 +195,8 @@ uint32_t StreamProcessor::FillOrbitMuon(std::vector<unsigned int>& bx_vect, char relbx++; } - return counts; + StreamProcessor::fillOrbitMetadata meta = {counts, orbit,}; + return meta; } void StreamProcessor::process(Slice& input, Slice& out) @@ -212,6 +214,7 @@ void StreamProcessor::process(Slice& input, Slice& out) uint32_t counts = 0; bool endofpacket = false; uint32_t orbit_per_packet_count = 0; + StreamProcessor::fillOrbitMetadata meta{0,0,}; if (processorType == ProcessorType::PASS_THROUGH) { memcpy(wr_ptr,rd_ptr,input.size()); out.set_end(out.begin() + input.size()); @@ -232,11 +235,13 @@ void StreamProcessor::process(Slice& input, Slice& out) } std::sort(bx_vect.begin(), bx_vect.end()); if (processorType == ProcessorType::GMT) { - orbitCount = FillOrbitMuon(bx_vect, rd_ptr, wr_ptr); + meta = FillOrbitMuon(bx_vect, rd_ptr, wr_ptr); + orbitCount = meta.counts; ++orbit_per_packet_count; wr_ptr+= orbitCount*12 + 12*bx_vect.size(); // 12 bytes for each muon/count then 12 bytes for each bx header }else if (processorType == ProcessorType::CALO){ - orbitCount = FillOrbitCalo(bx_vect, rd_ptr, wr_ptr); + meta = FillOrbitCalo(bx_vect, rd_ptr, wr_ptr); + orbitCount = meta.counts; ++orbit_per_packet_count; // size of calo packet is 4bytes*(8links*7dataWords + 3headerWords)=236 bytes // Note 7 data words per link because we have the "link number" word + 6 words from calo L2 @@ -248,6 +253,7 @@ void StreamProcessor::process(Slice& input, Slice& out) } rd_ptr+= 32 + bx_vect.size()*sizeof(blockMuon) + constants::orbit_trailer_size; // 32 for orbit header, + nBXs + orbit trailer counts += orbitCount; + out.set_firstOrbitN(meta.orbit); bx_vect.clear(); if(rd_ptr < input.end()){ diff --git a/src/processor.h b/src/processor.h index b1a52866404a90a58f247c356352fbd773e4c6de..55264a753f6496d4b701081887c2da7d5201a868 100644 --- a/src/processor.h +++ b/src/processor.h @@ -16,18 +16,23 @@ class Slice; class StreamProcessor: public tbb::filter { public: + enum class ProcessorType { PASS_THROUGH, GMT, CALO }; StreamProcessor(size_t max_size_, bool doZS_, ProcessorType processorType_, uint32_t nOrbitsPerDMAPacket_, uint32_t prescaleFactor, ctrl& control); void* operator()( void* item )/*override*/; ~StreamProcessor(); private: + struct fillOrbitMetadata{ + uint32_t counts; + uint32_t orbit; + }; void process(Slice& input, Slice& out); bool CheckFrameMultBlock(size_t inputSize); std::vector<unsigned int> CountBX(Slice& input, char* rd_ptr, bool& trailerError); inline std::pair<uint32_t, bool> ProcessOrbitHeader(char* rd_ptr); - uint32_t FillOrbitMuon(std::vector<unsigned int>& bx_vect, char* rd_ptr, char* wr_ptr); - uint32_t FillOrbitCalo(std::vector<unsigned int>& bx_vect, char* rd_ptr, char* wr_ptr); + fillOrbitMetadata FillOrbitMuon(std::vector<unsigned int>& bx_vect, char* rd_ptr, char* wr_ptr); + fillOrbitMetadata FillOrbitCalo(std::vector<unsigned int>& bx_vect, char* rd_ptr, char* wr_ptr); std::ofstream myfile; size_t max_size; uint64_t nbPackets; diff --git a/src/scdaq.cc b/src/scdaq.cc index 8cac5f185fc22fb2b1580a2418ebc91d013a1a5d..5a86eee0e8bfd2eb028693404569bfa3c36dd530 100644 --- a/src/scdaq.cc +++ b/src/scdaq.cc @@ -39,7 +39,7 @@ int run_pipeline( int nbThreads, ctrl& control, config& conf ) size_t packetBufferSize = conf.getDmaPacketBufferSize(); size_t nbPacketBuffers = conf.getNumberOfDmaPacketBuffers(); uint32_t prescaleFactor = conf.getPrescaleFactor(); -// Create empty input reader, will assign later when we know what is the data source + // Create empty input reader, will assign later when we know what is the data source std::shared_ptr<InputFilter> input_filter; // Create the pipeline @@ -84,7 +84,7 @@ int run_pipeline( int nbThreads, ctrl& control, config& conf ) } // Create file-writing stage and add it to the pipeline - OutputStream output_stream( conf.getOutputFilenameBase(), conf.getOutputFilenamePrefix(), control); + OutputStream output_stream( conf.getOutputFilenameBase(), conf.getOutputFilenamePrefix(), control, conf); pipeline.add_filter( output_stream ); // Run the pipeline @@ -148,7 +148,7 @@ if(argc < 2){ control.n_orbits_per_dma_packet = conf.getNOrbitsPerDMAPacket(); control.verbosity = conf.getVerbosity(); control.excessOrbitsPerPacketCount = 0; - + // Firmware needs at least 1MB buffer for DMA if (conf.getDmaPacketBufferSize() < 1024*1024) { LOG(ERROR) << "dma_packet_buffer_size must be at least 1048576 bytes (1MB), but " << conf.getDmaPacketBufferSize() << " bytes was given. Check the configuration file."; diff --git a/src/slice.h b/src/slice.h index 1c762ed1dbdb3dfedd87f866d305a0b03654413f..46900686eb601eff6c0992e919c0ddbdbbc01e66 100644 --- a/src/slice.h +++ b/src/slice.h @@ -15,7 +15,7 @@ class Slice { uint32_t counts; bool output; static tbb::concurrent_bounded_queue<Slice*> free_slices; - + uint32_t firstOrbitN; public: //! Allocate a Slice object that can hold up to max_size bytes static Slice* allocate( size_t max_size ) { @@ -57,5 +57,7 @@ public: void set_output(bool o) {output=o;} void set_counts(uint32_t c){counts=c;} uint32_t get_counts() const {return counts;} + uint32_t get_firstOrbitN() const {return firstOrbitN;} + void set_firstOrbitN(uint32_t firstOrbitN_){firstOrbitN=firstOrbitN_;} }; #endif diff --git a/test/config/scdaq-calo.conf b/test/config/scdaq-calo.conf index f00c1e63e9ae9d673fd713ffb5ab4cf281963faa..9a8b8ac7f6e5d22b7655ceaf497db1ea97487344 100644 --- a/test/config/scdaq-calo.conf +++ b/test/config/scdaq-calo.conf @@ -29,7 +29,7 @@ packets_per_report:200000 NOrbitsPerDMAPacket:20 # prescale that will be used for *calo* data only -prescale_factor:20 +prescale_factor:1 ## Extra settings for "filedma" input @@ -97,3 +97,8 @@ threads:8 # verbosity level, currently supports 0 and 1 verbosity:1 + +# N orbits to store to each file +# Configured to store fixed number of orbits per file when nOrbitsPerFile > 1 +# Set to 0 to use fixed file size instead +nOrbitsPerFile:1000000 diff --git a/test/config/scdaq-gmt.conf b/test/config/scdaq-gmt.conf index 83344d93b8a4f871ffcef69ef8e4f881623738dd..04a0e114900b9ad9380b24472bfd389e1d034aab 100644 --- a/test/config/scdaq-gmt.conf +++ b/test/config/scdaq-gmt.conf @@ -29,7 +29,7 @@ packets_per_report:200000 NOrbitsPerDMAPacket:20 # prescale factor applied to *calo* data only -prescale_factor:20 +prescale_factor:1 ## Extra settings for "filedma" input @@ -97,4 +97,9 @@ quality_cut:12 threads:8 # verbosity level, currently supports 0 and 1 -verbosity:0 +verbosity:1 + +# N orbits to store to each file +# Configured to store fixed number of orbits per file when nOrbitsPerFile > 1 +# Set to 0 to use fixed file size instead +nOrbitsPerFile:1000000