diff --git a/src/Makefile b/src/Makefile index a6d803939038750f15115332c8fbfeb9b08938a4..0dc964e7c1759f53406d29b3f0a684f58d14458a 100644 --- a/src/Makefile +++ b/src/Makefile @@ -12,7 +12,7 @@ TARGET = scdaq # source files -SOURCES = config.cc DmaInputFilter.cc elastico.cc FileDmaInputFilter.cc InputFilter.cc output.cc processor.cc scdaq.cc session.cc slice.cc WZDmaInputFilter.cc MicronDmaInputFilter.cc +SOURCES = config.cc DmaInputFilter.cc elastico.cc FileDmaInputFilter.cc InputFilter.cc outputBySize.cc outputByOrbit.cc processor.cc scdaq.cc session.cc slice.cc WZDmaInputFilter.cc MicronDmaInputFilter.cc C_SOURCES = wz_dma.c # work out names of object files from sources @@ -57,13 +57,14 @@ ${TARGET}: ${OBJECTS} #test2.o : product.h test2.h -scdaq.o: processor.h elastico.h output.h format.h server.h controls.h config.h session.h log.h +scdaq.o: processor.h elastico.h outputBySize.h outputByOrbit.h format.h server.h controls.h config.h session.h log.h config.o: config.h log.h DmaInputFilter.o: DmaInputFilter.h slice.h elastico.o: elastico.h format.h slice.h controls.h log.h FileDmaInputFilter.o: FileDmaInputFilter.h InputFilter.h log.h InputFilter.o: InputFilter.h log.h -output.o: output.h slice.h log.h +outputBySize.o: outputBySize.h slice.h log.h +outputByOrbit.o: outputByOrbit.h slice.h log.h processor.o: processor.h slice.h format.h log.h bril_histo.h session.o: session.h log.h slice.o: slice.h diff --git a/src/outputByOrbit.cc b/src/outputByOrbit.cc index 2f7be11ecb343c44a1fee6072fda839ed40c598e..e398c23db6fdb38fce5c599da74881681cb13cc1 100644 --- a/src/outputByOrbit.cc +++ b/src/outputByOrbit.cc @@ -1,7 +1,7 @@ #include <system_error> #include <fstream> #include <string> -#include "output.h" +#include "outputByOrbit.h" #include "slice.h" #include "log.h" #include "tools.h" @@ -36,7 +36,7 @@ static void create_output_directory(std::string& output_directory) LOG(TRACE) << "Created output directory: " << output_directory << "'."; } -OutputStream::OutputStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c, config& conf_) : +OutputByOrbitStream::OutputByOrbitStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c, config& conf_) : tbb::filter(serial_in_order), my_output_filename_base(output_filename_base), my_output_filename_prefix(output_filename_prefix), @@ -44,14 +44,10 @@ OutputStream::OutputStream( const std::string output_filename_base, const std::s current_file_size(0), file_count(-1), control(c), - current_file(0), current_run_number(0), - journal_name(my_output_filename_base + "/" + output_filename_prefix + '_' + journal_file), conf(conf_), fixedOrbitsPerFile(bool(conf.getNOrbitsPerFile())), - index(0), - f1(), - f2() + outFile() { LOG(TRACE) << "Created output filter at " << static_cast<void*>(this); @@ -60,109 +56,44 @@ OutputStream::OutputStream( const std::string output_filename_base, const std::s create_output_directory(output_directory); } -static void update_journal(std::string journal_name, uint32_t run_number, uint32_t index) -{ - std::string new_journal_name = journal_name + ".new"; - - // Create a new journal file - std::ofstream journal (new_journal_name); - if (journal.is_open()) { - journal << run_number << "\n" << index << "\n"; - journal.close(); - } else { - LOG(ERROR) << "WARNING: Unable to open journal file"; - } - - // Replace the old journal - if (rename(new_journal_name.c_str(), journal_name.c_str()) < 0 ) { - LOG(ERROR) << tools::strerror("Journal file move failed"); - } -} - -static bool read_journal(std::string journal_name, uint32_t& run_number, uint32_t& index) -{ - std::ifstream journal (journal_name); - if (journal.is_open()) { - journal >> run_number >> index; - journal.close(); - return true; - } - return false; -} - -void* OutputStream::OutputFixedSize( Slice& out) { - - uint32_t journal_run_number{0}; - size_t n = 0; - if ( control.running.load(std::memory_order_acquire) || control.output_force_write ) { - if (current_file == NULL || current_run_number != control.run_number) { - open_next_file(); - - if(current_file_size > control.max_file_size) { - open_next_file(); - } - n = fwrite( out.begin(), 1, out.size(), current_file ); - current_file_size +=n; - } - } - if ( n != out.size() ) { - LOG(ERROR) << "Can't write into output file: Have to write " << out.size() << ", but write returned " << n; - } - // If not running and we have a file then close it - if ( !control.running && current_file != NULL && !control.output_force_write ) { - close_and_move_current_file(); - file_count = -1; - } -} - +void* OutputByOrbitStream::OutputFixedOrbits( Slice& out ) { - -void* OutputStream::OutputFixedOrbits( Slice& out ) { - - uint32_t orbitN = out.get_firstOrbitN(); + uint32_t orbitN = out.get_firstOrbitN(); + std::cout << orbitN << std::endl; uint32_t new_index = uint32_t(orbitN/conf.getNOrbitsPerFile()); size_t n = 0; bool already_opened=false; if(out.get_counts() != 0){ - - if ( control.running.load(std::memory_order_acquire) || control.output_force_write ) { //i.e should be writing data - if (current_run_number != control.run_number) { - open_file(new_index); - n = fwrite( out.begin(), 1, out.size(), f1.getFile() ); + if ( control.running.load(std::memory_order_acquire) || control.output_force_write ) { //i.e should be writing data + if (current_run_number != control.run_number) { - }else{ - - if(new_index > f1.getIndex()) { // i.e new file with higher index should be opened - close_and_move_file(); open_file(new_index); - n = fwrite( out.begin(), 1, out.size(), f1.getFile() ); - f1.setIndex(new_index); + n = fwrite( out.begin(), 1, out.size(), outFile.getFile() ); + + }else{ - }else if(new_index < f1.getIndex()){ // i.e should be written to previous index/file (f2) - n = fwrite( out.begin(), 1, out.size(), f2.getFile() ); + if(new_index > outFile.getIndex()) { // i.e new file with higher index should be opened + close_and_move_file(); + open_file(new_index); + outFile.setIndex(new_index); + } - }else{ // i.e new_index == f1 index - n = fwrite( out.begin(), 1, out.size(), f1.getFile() ); - } + n = fwrite( out.begin(), 1, out.size(), outFile.getFile() ); + + } } current_file_size +=n; }else{ // i.e should not currently be writing data close_and_move_file();//once for each open file (should be 2) - close_and_move_file(); - } } -} -void* OutputStream::operator()( void* item ) -{ + +void* OutputByOrbitStream::operator()( void* item ){ + Slice& out = *static_cast<Slice*>(item); totcounts += out.get_counts(); - if(this->fixedOrbitsPerFile){ - OutputFixedOrbits(out); - }else{ - OutputFixedSize(out); - } + OutputFixedOrbits(out); out.free(); return NULL; @@ -179,98 +110,24 @@ static std::string format_run_file_stem(std::string& filename_prefix, uint32_t r return std::string(run_order_stem); } -void OutputStream::close_and_move_current_file() -{ - // Close and move current file - if (current_file) { - fclose(current_file); - current_file = NULL; - - std::string run_file = format_run_file_stem(my_output_filename_prefix, current_run_number, file_count); - std::string current_file_name = my_output_filename_base + "/" + working_dir + "/" + run_file; - std::string target_file_name = my_output_filename_base + "/" + run_file; - - LOG(INFO) << "rename: " << current_file_name << " to " << target_file_name; - if ( rename(current_file_name.c_str(), target_file_name.c_str()) < 0 ) { - LOG(ERROR) << tools::strerror("File rename failed"); - } - current_file_size = 0; - file_count += 1; - } -} - -void OutputStream::open_next_file() +void OutputByOrbitStream::close_and_move_file() //Used for fixedNorbits per file option { - close_and_move_current_file(); - - // We can change current_run_number only here, after the (previous) file was closed and moved - if (current_run_number != control.run_number) { - current_run_number = control.run_number; - file_count = -1; - } - - // If this is the first file then check if we have a journal file - if (file_count < 0) { - // Set default file index - file_count = 0; - - uint32_t journal_run_number; - uint32_t index; - - if (read_journal(journal_name, journal_run_number, index)) { - LOG(INFO) << "We have journal:"; - LOG(INFO) << " run_number: " << journal_run_number; - LOG(INFO) << " index: " << index; - } else { - LOG(INFO) << "No journal file.\n"; - } - - LOG(INFO) << "Current run_number: " << current_run_number; - if (current_run_number == journal_run_number) { - file_count = index; - } - LOG(INFO) << " using index: " << file_count; - } - - // Create the output directory - std::string output_directory = my_output_filename_base + "/" + working_dir; - create_output_directory(output_directory); - - // Create a new file - std::string current_filename = output_directory + "/" + format_run_file_stem(my_output_filename_prefix, current_run_number, file_count); - current_file = fopen( current_filename.c_str(), "w" ); - if (current_file == NULL) { - std::string err = tools::strerror("ERROR when creating file '" + current_filename + "'"); - LOG(ERROR) << err; - throw std::runtime_error(err); - } - - // Update journal file (with the next index file) - update_journal(journal_name, current_run_number, file_count + 1); -} - -void OutputStream::close_and_move_file() //Used for fixedNorbits per file option -{ - std::string run_file = format_run_file_stem(my_output_filename_prefix, current_run_number, f2.getIndex()); + std::string run_file = format_run_file_stem(my_output_filename_prefix, current_run_number, outFile.getIndex()); std::string current_file_name = my_output_filename_base + "/" + working_dir + "/" + run_file; std::string target_file_name = my_output_filename_base + "/" + run_file; - if (f2.getFile() != NULL) { - fclose(f2.getFile()); + if (outFile.getFile() != NULL) { + fclose(outFile.getFile()); + outFile.setFile(NULL); + outFile.setIndex(NULL); } LOG(INFO) << "rename: " << current_file_name << " to " << target_file_name; if ( rename(current_file_name.c_str(), target_file_name.c_str()) < 0 ) { LOG(ERROR) << tools::strerror("File rename failed"); } - if(f1.getFile() != NULL){ - f2.setFile(f1.getFile()); - f2.setIndex(f1.getIndex()); - f1.setFile(NULL); - f1.setIndex(NULL); - } } -void OutputStream::open_file(uint32_t index_) //Used for fixedNorbits per file option +void OutputByOrbitStream::open_file(uint32_t index_) //Used for fixedNorbits per file option { // Create a new file std::string output_directory = my_output_filename_base + "/" + working_dir; @@ -278,9 +135,9 @@ void OutputStream::open_file(uint32_t index_) //Used for fixedNorbits per file o std::string current_filename_a = output_directory + "/" + format_run_file_stem(my_output_filename_prefix, current_run_number, index_); std::string filename = output_directory + "/" + format_run_file_stem(my_output_filename_prefix, control.run_number, index_); LOG(INFO) << "opening file with index " << index_; - f1.setFile(fopen( filename.c_str(), "w" )); - f1.setIndex(index_); - if (f1.getFile() == NULL) { + outFile.setFile(fopen( filename.c_str(), "w" )); + outFile.setIndex(index_); + if (outFile.getFile() == NULL) { std::string err = tools::strerror("ERROR when creating file '" + current_filename_a + "'"); LOG(ERROR) << err; throw std::runtime_error(err); diff --git a/src/outputByOrbit.h b/src/outputByOrbit.h index b9899ea46883b60d77b6be7994d7e7c37c3bba29..71892214023e2d509f55bb2e17380fd15a4058c9 100644 --- a/src/outputByOrbit.h +++ b/src/outputByOrbit.h @@ -1,5 +1,5 @@ -#ifndef OUTPUT_H -#define OUTPUT_H +#ifndef OUTPUTBYORBIT_H +#define OUTPUTBYORBIT_H #include <cstdio> #include <stdint.h> @@ -33,43 +33,29 @@ private: //! Filter that writes each buffer to a file. -class OutputStream: public tbb::filter { +class OutputByOrbitStream: public tbb::filter { public: - OutputStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c, config& conf_); + OutputByOrbitStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c, config& conf_); void* operator()( void* item ) /*override*/; void* OutputFixedOrbits(Slice& out); - void* OutputFixedSize(Slice& out); private: - // used for original fixed file size approach - void open_next_file(); - void close_and_move_current_file(); - FILE *current_file; // used for fixed N orbits per file approach void close_and_move_file(); void open_file(uint32_t index_); - OutFile f1; - OutFile f2; - -/* FILE *current_file_a; //two files open at once - FILE *current_file_b; - uint32_t index_a; - uint32_t index_b; -*/ + OutFile outFile; std::string my_output_filename_base; std::string my_output_filename_prefix; uint32_t totcounts; uint64_t current_file_size; int32_t file_count; + uint32_t current_run_number; ctrl& control; config& conf; - uint32_t current_run_number; - std::string journal_name; - uint32_t index; const bool fixedOrbitsPerFile; }; diff --git a/src/output.cc b/src/outputBySize.cc similarity index 95% rename from src/output.cc rename to src/outputBySize.cc index 35e2f8ef5918e824ebd2affa4ba3e526ad0cb50d..b7c6d3c0a7af199225340de67ebc69b2c8bdcb4b 100644 --- a/src/output.cc +++ b/src/outputBySize.cc @@ -1,7 +1,7 @@ #include <system_error> #include <fstream> #include <string> -#include "output.h" +#include "outputBySize.h" #include "slice.h" #include "log.h" #include "tools.h" @@ -37,7 +37,7 @@ static void create_output_directory(std::string& output_directory) LOG(TRACE) << "Created output directory: " << output_directory << "'."; } -OutputStream::OutputStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c) : +OutputBySizeStream::OutputBySizeStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c) : tbb::filter(serial_in_order), my_output_filename_base(output_filename_base), my_output_filename_prefix(output_filename_prefix), @@ -86,7 +86,7 @@ static bool read_journal(std::string journal_name, uint32_t& run_number, uint32_ return false; } -void* OutputStream::operator()( void* item ) +void* OutputBySizeStream::operator()( void* item ) { Slice& out = *static_cast<Slice*>(item); totcounts += out.get_counts(); @@ -124,7 +124,7 @@ static std::string format_run_file_stem(std::string& filename_prefix, uint32_t r return std::string(run_order_stem); } -void OutputStream::close_and_move_current_file() +void OutputBySizeStream::close_and_move_current_file() { // Close and move current file if (current_file) { @@ -145,7 +145,7 @@ void OutputStream::close_and_move_current_file() } } -void OutputStream::open_next_file() +void OutputBySizeStream::open_next_file() { close_and_move_current_file(); diff --git a/src/output.h b/src/outputBySize.h similarity index 72% rename from src/output.h rename to src/outputBySize.h index cb6de035781aa7bfbcefaf227f2c68d93a8ecb87..294593f030360e61a5930031401076ebed337e93 100644 --- a/src/output.h +++ b/src/outputBySize.h @@ -1,5 +1,5 @@ -#ifndef OUTPUT_H -#define OUTPUT_H +#ifndef OUTPUTBYSIZE_H +#define OUTPUTBYSIZE_H #include <cstdio> #include <stdint.h> @@ -9,11 +9,11 @@ #include "controls.h" //! Filter that writes each buffer to a file. -class OutputStream: public tbb::filter { +class OutputBySizeStream: public tbb::filter { public: - OutputStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c ); + OutputBySizeStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c ); void* operator()( void* item ) /*override*/; private: diff --git a/src/scdaq.cc b/src/scdaq.cc index a7d3706996a3e23ef258218cd94d5d4ed16c1bab..f53386367fb20503fa7a0b63e982b34519c1dd24 100644 --- a/src/scdaq.cc +++ b/src/scdaq.cc @@ -21,7 +21,8 @@ #include "DmaInputFilter.h" #include "processor.h" #include "elastico.h" -#include "output.h" +#include "outputBySize.h" +#include "outputByOrbit.h" #include "format.h" #include "server.h" #include "controls.h" @@ -90,8 +91,14 @@ int run_pipeline( int nbThreads, ctrl& control, config& conf ) } // Create file-writing stage and add it to the pipeline - OutputStream output_stream( conf.getOutputFilenameBase(), conf.getOutputFilenamePrefix(), control, conf); + if ( conf.getNOrbitsPerFile() ){ + OutputByOrbitStream output_stream( conf.getOutputFilenameBase(), conf.getOutputFilenamePrefix(), control, conf ); pipeline.add_filter( output_stream ); + }else{ + OutputBySizeStream output_stream( conf.getOutputFilenameBase(), conf.getOutputFilenamePrefix(), control); + pipeline.add_filter( output_stream ); + } + // Run the pipeline tbb::tick_count t0 = tbb::tick_count::now();