diff --git a/src/output.cc b/src/output.cc index 878bde6d3fa0ec58cc7364615a6a542abf0ea4b8..d4bd9dc9830ebec2dc2851c2eda68c085e120113 100644 --- a/src/output.cc +++ b/src/output.cc @@ -20,8 +20,7 @@ static void create_output_directory(std::string& output_directory) /* check if path exists and is a directory */ if (stat(output_directory.c_str(), &sb) == 0) { - if (S_ISDIR (sb.st_mode)) { - LOG(TRACE) << "Output directory already exists: " << output_directory << "'."; + if (S_ISDIR (sb.st_mode)) { // output directory already exists return; } std::string err = "ERROR The output directory path '" + output_directory + "' exists, but the path is not a directory!"; @@ -46,15 +45,13 @@ OutputStream::OutputStream( const std::string output_filename_base, const std::s file_count(-1), control(c), current_file(0), - current_file_a(0), - current_file_b(0), current_run_number(0), journal_name(my_output_filename_base + "/" + output_filename_prefix + '_' + journal_file), conf(conf_), fixedOrbitsPerFile(bool(conf.getNOrbitsPerFile())), index(0), - index_a(0), - index_b(0) + f1(), + f2() { LOG(TRACE) << "Created output filter at " << static_cast<void*>(this); @@ -93,68 +90,79 @@ static bool read_journal(std::string journal_name, uint32_t& run_number, uint32_ return false; } -void* OutputStream::operator()( void* item ) -{ - Slice& out = *static_cast<Slice*>(item); - totcounts += out.get_counts(); +void* OutputStream::OutputFixedSize( Slice& out) { + uint32_t journal_run_number{0}; - uint32_t orbitN = out.get_firstOrbitN(); - uint32_t new_index = 0; - if(this->fixedOrbitsPerFile){new_index = uint32_t(orbitN/conf.getNOrbitsPerFile());} size_t n = 0; - bool already_opened=false; if ( control.running.load(std::memory_order_acquire) || control.output_force_write ) { - if (current_file == NULL || current_run_number != control.run_number) { + open_next_file(); - if(!this->fixedOrbitsPerFile){ - open_next_file(); - already_opened = true; - }else{ - if(index==0){ - open_file(new_index); - } - } - } - - if(this->fixedOrbitsPerFile){ - if(new_index > this->index) { // - if(this->index != 0){close_and_move_file(this->index_b); - } - open_file(new_index); - n = fwrite( out.begin(), 1, out.size(), this->current_file_a ); - this->index = new_index; - }else if(new_index < this->index){ - n = fwrite( out.begin(), 1, out.size(), this->current_file_b ); - }else{ - n = fwrite( out.begin(), 1, out.size(), this->current_file_a ); - } - - } - - if (!this->fixedOrbitsPerFile){ - - if((!already_opened) && (current_file_size > control.max_file_size)) { + if(current_file_size > control.max_file_size) { open_next_file(); } n = fwrite( out.begin(), 1, out.size(), current_file ); current_file_size +=n; } - if ( n != out.size() ) { - LOG(ERROR) << "Can't write into output file: Have to write " << out.size() << ", but write returned " << n; - } } - + if ( n != out.size() ) { + LOG(ERROR) << "Can't write into output file: Have to write " << out.size() << ", but write returned " << n; + } // If not running and we have a file then close it - if ( !this->fixedOrbitsPerFile && !control.running && current_file != NULL && !control.output_force_write ) { + if ( !control.running && current_file != NULL && !control.output_force_write ) { close_and_move_current_file(); file_count = -1; } +} + + + +void* OutputStream::OutputFixedOrbits( Slice& out ) { + + uint32_t orbitN = out.get_firstOrbitN(); + uint32_t new_index = uint32_t(orbitN/conf.getNOrbitsPerFile()); + size_t n = 0; + bool already_opened=false; + + if ( control.running.load(std::memory_order_acquire) || control.output_force_write ) { //i.e should be writing data + if (current_run_number != control.run_number) { + + open_file(new_index); + n = fwrite( out.begin(), 1, out.size(), f1.getFile() ); + + }else{ + + if(new_index > f1.getIndex()) { // i.e new file with higher index should be opened + close_and_move_file(); + open_file(new_index); + n = fwrite( out.begin(), 1, out.size(), f1.getFile() ); + f1.setIndex(new_index); + + }else if(new_index < f1.getIndex()){ // i.e should be written to previous index/file (f2) + n = fwrite( out.begin(), 1, out.size(), f2.getFile() ); + + }else{ // i.e new_index == f1 index + n = fwrite( out.begin(), 1, out.size(), f1.getFile() ); + } + } + current_file_size +=n; + }else{ // i.e should not currently be writing data + close_and_move_file();//once for each open file (should be 2) + close_and_move_file(); + + } +} - if ( this->fixedOrbitsPerFile && !control.running && current_file != NULL && !control.output_force_write ) { - close_and_move_file(this->index); - close_and_move_file(new_index); +void* OutputStream::operator()( void* item ) +{ + Slice& out = *static_cast<Slice*>(item); + totcounts += out.get_counts(); + if(this->fixedOrbitsPerFile){ + OutputFixedOrbits(out); + }else{ + OutputFixedSize(out); } + out.free(); return NULL; } @@ -241,22 +249,24 @@ void OutputStream::open_next_file() update_journal(journal_name, current_run_number, file_count + 1); } -void OutputStream::close_and_move_file(uint32_t index_) //Used for fixedNorbits per file option +void OutputStream::close_and_move_file() //Used for fixedNorbits per file option { - std::string run_file = format_run_file_stem(my_output_filename_prefix, current_run_number, index_); + std::string run_file = format_run_file_stem(my_output_filename_prefix, current_run_number, f2.getIndex()); std::string current_file_name = my_output_filename_base + "/" + working_dir + "/" + run_file; std::string target_file_name = my_output_filename_base + "/" + run_file; - if (this->current_file_b != NULL) { - fclose(this->current_file_b); - } - LOG(INFO) << "rename: " << current_file_name << " to " << target_file_name; - if ( rename(current_file_name.c_str(), target_file_name.c_str()) < 0 ) { - LOG(ERROR) << tools::strerror("File rename failed"); - } - this->current_file_b = this->current_file_a; - this->index_b = this->index_a; - this->current_file_a = NULL; - index_a = NULL; + if (f2.getFile() != NULL) { + fclose(f2.getFile()); + } + LOG(INFO) << "rename: " << current_file_name << " to " << target_file_name; + if ( rename(current_file_name.c_str(), target_file_name.c_str()) < 0 ) { + LOG(ERROR) << tools::strerror("File rename failed"); + } + if(f1.getFile() != NULL){ + f2.setFile(f1.getFile()); + f2.setIndex(f1.getIndex()); + f1.setFile(NULL); + f1.setIndex(NULL); + } } void OutputStream::open_file(uint32_t index_) //Used for fixedNorbits per file option @@ -266,10 +276,10 @@ void OutputStream::open_file(uint32_t index_) //Used for fixedNorbits per file o create_output_directory(output_directory); std::string current_filename_a = output_directory + "/" + format_run_file_stem(my_output_filename_prefix, current_run_number, index_); std::string filename = output_directory + "/" + format_run_file_stem(my_output_filename_prefix, control.run_number, index_); - std::cout << "opening file with index " << index_ << std::endl; - this->current_file_a = fopen( filename.c_str(), "w" ); - this->index_a = index_; - if (this->current_file_a == NULL) { + LOG(INFO) << "opening file with index " << index_; + f1.setFile(fopen( filename.c_str(), "w" )); + f1.setIndex(index_); + if (f1.getFile() == NULL) { std::string err = tools::strerror("ERROR when creating file '" + current_filename_a + "'"); LOG(ERROR) << err; throw std::runtime_error(err); diff --git a/src/output.h b/src/output.h index 862d3ab7dd88b021fe5b5c2b3131a65c49d8143f..b9899ea46883b60d77b6be7994d7e7c37c3bba29 100644 --- a/src/output.h +++ b/src/output.h @@ -8,14 +8,39 @@ #include "config.h" #include "controls.h" + +class OutFile{ + +public: + + OutFile(){ + file_ = NULL; + index_ = NULL; + } + + ~OutFile(){}; + + FILE* getFile(){return file_;} + void setFile(FILE* file){file_ = file;} + uint32_t getIndex(){return index_;} + void setIndex(uint32_t index){index_ = index;} + +private: + FILE* file_; + uint32_t index_; + +}; + + //! Filter that writes each buffer to a file. class OutputStream: public tbb::filter { - public: + OutputStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c, config& conf_); void* operator()( void* item ) /*override*/; - + void* OutputFixedOrbits(Slice& out); + void* OutputFixedSize(Slice& out); private: // used for original fixed file size approach void open_next_file(); @@ -23,12 +48,17 @@ private: FILE *current_file; // used for fixed N orbits per file approach - void close_and_move_file(uint32_t index_); + void close_and_move_file(); void open_file(uint32_t index_); - FILE *current_file_a; //two files open at once + + OutFile f1; + OutFile f2; + +/* FILE *current_file_a; //two files open at once FILE *current_file_b; uint32_t index_a; uint32_t index_b; +*/ std::string my_output_filename_base; std::string my_output_filename_prefix;