diff --git a/src/output.cc b/src/output.cc index 2f7be11ecb343c44a1fee6072fda839ed40c598e..35e2f8ef5918e824ebd2affa4ba3e526ad0cb50d 100644 --- a/src/output.cc +++ b/src/output.cc @@ -20,12 +20,13 @@ static void create_output_directory(std::string& output_directory) /* check if path exists and is a directory */ if (stat(output_directory.c_str(), &sb) == 0) { - if (S_ISDIR (sb.st_mode)) { // output directory already exists - return; - } - std::string err = "ERROR The output directory path '" + output_directory + "' exists, but the path is not a directory!"; - LOG(ERROR) << err; - throw std::runtime_error(err); + if (S_ISDIR (sb.st_mode)) { + LOG(TRACE) << "Output directory already exists: " << output_directory << "'."; + return; + } + std::string err = "ERROR The output directory path '" + output_directory + "' exists, but the path is not a directory!"; + LOG(ERROR) << err; + throw std::runtime_error(err); } if (!tools::filesystem::create_directories(output_directory)) { @@ -36,22 +37,17 @@ static void create_output_directory(std::string& output_directory) LOG(TRACE) << "Created output directory: " << output_directory << "'."; } -OutputStream::OutputStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c, config& conf_) : - tbb::filter(serial_in_order), - my_output_filename_base(output_filename_base), - my_output_filename_prefix(output_filename_prefix), - totcounts(0), - current_file_size(0), - file_count(-1), - control(c), - current_file(0), - current_run_number(0), - journal_name(my_output_filename_base + "/" + output_filename_prefix + '_' + journal_file), - conf(conf_), - fixedOrbitsPerFile(bool(conf.getNOrbitsPerFile())), - index(0), - f1(), - f2() +OutputStream::OutputStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c) : + tbb::filter(serial_in_order), + my_output_filename_base(output_filename_base), + my_output_filename_prefix(output_filename_prefix), + totcounts(0), + current_file_size(0), + file_count(-1), + control(c), + current_file(0), + current_run_number(0), + journal_name(my_output_filename_base + "/" + output_filename_prefix + '_' + journal_file) { LOG(TRACE) << "Created output filter at " << static_cast<void*>(this); @@ -81,91 +77,40 @@ static void update_journal(std::string journal_name, uint32_t run_number, uint32 static bool read_journal(std::string journal_name, uint32_t& run_number, uint32_t& index) { - std::ifstream journal (journal_name); - if (journal.is_open()) { - journal >> run_number >> index; - journal.close(); - return true; - } - return false; + std::ifstream journal (journal_name); + if (journal.is_open()) { + journal >> run_number >> index; + journal.close(); + return true; + } + return false; } -void* OutputStream::OutputFixedSize( Slice& out) { - - uint32_t journal_run_number{0}; - size_t n = 0; - if ( control.running.load(std::memory_order_acquire) || control.output_force_write ) { - if (current_file == NULL || current_run_number != control.run_number) { - open_next_file(); +void* OutputStream::operator()( void* item ) +{ + Slice& out = *static_cast<Slice*>(item); + totcounts += out.get_counts(); - if(current_file_size > control.max_file_size) { - open_next_file(); + if ( control.running.load(std::memory_order_acquire) || control.output_force_write ) { + if (current_file == NULL || current_file_size > control.max_file_size || current_run_number != control.run_number) { + open_next_file(); } - n = fwrite( out.begin(), 1, out.size(), current_file ); - current_file_size +=n; - } - } - if ( n != out.size() ) { - LOG(ERROR) << "Can't write into output file: Have to write " << out.size() << ", but write returned " << n; - } - // If not running and we have a file then close it - if ( !control.running && current_file != NULL && !control.output_force_write ) { - close_and_move_current_file(); - file_count = -1; - } -} - - - -void* OutputStream::OutputFixedOrbits( Slice& out ) { - - uint32_t orbitN = out.get_firstOrbitN(); - uint32_t new_index = uint32_t(orbitN/conf.getNOrbitsPerFile()); - size_t n = 0; - bool already_opened=false; - if(out.get_counts() != 0){ - - if ( control.running.load(std::memory_order_acquire) || control.output_force_write ) { //i.e should be writing data - if (current_run_number != control.run_number) { - - open_file(new_index); - n = fwrite( out.begin(), 1, out.size(), f1.getFile() ); - - }else{ - - if(new_index > f1.getIndex()) { // i.e new file with higher index should be opened - close_and_move_file(); - open_file(new_index); - n = fwrite( out.begin(), 1, out.size(), f1.getFile() ); - f1.setIndex(new_index); - - }else if(new_index < f1.getIndex()){ // i.e should be written to previous index/file (f2) - n = fwrite( out.begin(), 1, out.size(), f2.getFile() ); - - }else{ // i.e new_index == f1 index - n = fwrite( out.begin(), 1, out.size(), f1.getFile() ); + + size_t n = fwrite( out.begin(), 1, out.size(), current_file ); + current_file_size += n; + if ( n != out.size() ) { + LOG(ERROR) << "Can't write into output file: Have to write " << out.size() << ", but write returned " << n; } } - current_file_size +=n; - }else{ // i.e should not currently be writing data - close_and_move_file();//once for each open file (should be 2) - close_and_move_file(); - } -} -} -void* OutputStream::operator()( void* item ) -{ - Slice& out = *static_cast<Slice*>(item); - totcounts += out.get_counts(); - if(this->fixedOrbitsPerFile){ - OutputFixedOrbits(out); - }else{ - OutputFixedSize(out); - } + // If not running and we have a file then close it + if ( !control.running && current_file != NULL && !control.output_force_write ) { + close_and_move_current_file(); + file_count = -1; + } - out.free(); - return NULL; + out.free(); + return NULL; } /* @@ -247,42 +192,5 @@ void OutputStream::open_next_file() } // Update journal file (with the next index file) - update_journal(journal_name, current_run_number, file_count + 1); -} - -void OutputStream::close_and_move_file() //Used for fixedNorbits per file option -{ - std::string run_file = format_run_file_stem(my_output_filename_prefix, current_run_number, f2.getIndex()); - std::string current_file_name = my_output_filename_base + "/" + working_dir + "/" + run_file; - std::string target_file_name = my_output_filename_base + "/" + run_file; - if (f2.getFile() != NULL) { - fclose(f2.getFile()); - } - LOG(INFO) << "rename: " << current_file_name << " to " << target_file_name; - if ( rename(current_file_name.c_str(), target_file_name.c_str()) < 0 ) { - LOG(ERROR) << tools::strerror("File rename failed"); - } - if(f1.getFile() != NULL){ - f2.setFile(f1.getFile()); - f2.setIndex(f1.getIndex()); - f1.setFile(NULL); - f1.setIndex(NULL); - } -} - -void OutputStream::open_file(uint32_t index_) //Used for fixedNorbits per file option -{ - // Create a new file - std::string output_directory = my_output_filename_base + "/" + working_dir; - create_output_directory(output_directory); - std::string current_filename_a = output_directory + "/" + format_run_file_stem(my_output_filename_prefix, current_run_number, index_); - std::string filename = output_directory + "/" + format_run_file_stem(my_output_filename_prefix, control.run_number, index_); - LOG(INFO) << "opening file with index " << index_; - f1.setFile(fopen( filename.c_str(), "w" )); - f1.setIndex(index_); - if (f1.getFile() == NULL) { - std::string err = tools::strerror("ERROR when creating file '" + current_filename_a + "'"); - LOG(ERROR) << err; - throw std::runtime_error(err); - } + update_journal(journal_name, current_run_number, file_count+1); } diff --git a/src/output.h b/src/output.h index b9899ea46883b60d77b6be7994d7e7c37c3bba29..cb6de035781aa7bfbcefaf227f2c68d93a8ecb87 100644 --- a/src/output.h +++ b/src/output.h @@ -5,72 +5,31 @@ #include <stdint.h> #include <string> #include "tbb/pipeline.h" -#include "config.h" -#include "controls.h" - - -class OutFile{ - -public: - - OutFile(){ - file_ = NULL; - index_ = NULL; - } - - ~OutFile(){}; - - FILE* getFile(){return file_;} - void setFile(FILE* file){file_ = file;} - uint32_t getIndex(){return index_;} - void setIndex(uint32_t index){index_ = index;} - -private: - FILE* file_; - uint32_t index_; - -}; +#include "controls.h" //! Filter that writes each buffer to a file. class OutputStream: public tbb::filter { + public: - - OutputStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c, config& conf_); + OutputStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c ); void* operator()( void* item ) /*override*/; - void* OutputFixedOrbits(Slice& out); - void* OutputFixedSize(Slice& out); + private: - // used for original fixed file size approach void open_next_file(); void close_and_move_current_file(); - FILE *current_file; - - // used for fixed N orbits per file approach - void close_and_move_file(); - void open_file(uint32_t index_); - - OutFile f1; - OutFile f2; - -/* FILE *current_file_a; //two files open at once - FILE *current_file_b; - uint32_t index_a; - uint32_t index_b; -*/ +private: std::string my_output_filename_base; std::string my_output_filename_prefix; uint32_t totcounts; uint64_t current_file_size; int32_t file_count; ctrl& control; - config& conf; + FILE *current_file; uint32_t current_run_number; std::string journal_name; - uint32_t index; - const bool fixedOrbitsPerFile; }; #endif diff --git a/src/outputByOrbit.cc b/src/outputByOrbit.cc new file mode 100644 index 0000000000000000000000000000000000000000..2f7be11ecb343c44a1fee6072fda839ed40c598e --- /dev/null +++ b/src/outputByOrbit.cc @@ -0,0 +1,288 @@ +#include <system_error> +#include <fstream> +#include <string> +#include "output.h" +#include "slice.h" +#include "log.h" +#include "tools.h" +#include <iostream> +#include <stdio.h> + +/* Defines the journal file. Note: Filename prefix is added making the final filename */ +static const std::string journal_file { "index.journal" }; + +/* Defined where are the files stored before they are moved to the final destination */ +static const std::string working_dir { "in_progress" }; + +static void create_output_directory(std::string& output_directory) +{ + struct stat sb; + + /* check if path exists and is a directory */ + if (stat(output_directory.c_str(), &sb) == 0) { + if (S_ISDIR (sb.st_mode)) { // output directory already exists + return; + } + std::string err = "ERROR The output directory path '" + output_directory + "' exists, but the path is not a directory!"; + LOG(ERROR) << err; + throw std::runtime_error(err); + } + + if (!tools::filesystem::create_directories(output_directory)) { + std::string err = tools::strerror("ERROR when creating the output directory '" + output_directory + "'"); + LOG(ERROR) << err; + throw std::runtime_error(err); + } + LOG(TRACE) << "Created output directory: " << output_directory << "'."; +} + +OutputStream::OutputStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c, config& conf_) : + tbb::filter(serial_in_order), + my_output_filename_base(output_filename_base), + my_output_filename_prefix(output_filename_prefix), + totcounts(0), + current_file_size(0), + file_count(-1), + control(c), + current_file(0), + current_run_number(0), + journal_name(my_output_filename_base + "/" + output_filename_prefix + '_' + journal_file), + conf(conf_), + fixedOrbitsPerFile(bool(conf.getNOrbitsPerFile())), + index(0), + f1(), + f2() +{ + LOG(TRACE) << "Created output filter at " << static_cast<void*>(this); + + // Create the ouput directory + std::string output_directory = my_output_filename_base + "/" + working_dir; + create_output_directory(output_directory); +} + +static void update_journal(std::string journal_name, uint32_t run_number, uint32_t index) +{ + std::string new_journal_name = journal_name + ".new"; + + // Create a new journal file + std::ofstream journal (new_journal_name); + if (journal.is_open()) { + journal << run_number << "\n" << index << "\n"; + journal.close(); + } else { + LOG(ERROR) << "WARNING: Unable to open journal file"; + } + + // Replace the old journal + if (rename(new_journal_name.c_str(), journal_name.c_str()) < 0 ) { + LOG(ERROR) << tools::strerror("Journal file move failed"); + } +} + +static bool read_journal(std::string journal_name, uint32_t& run_number, uint32_t& index) +{ + std::ifstream journal (journal_name); + if (journal.is_open()) { + journal >> run_number >> index; + journal.close(); + return true; + } + return false; +} + +void* OutputStream::OutputFixedSize( Slice& out) { + + uint32_t journal_run_number{0}; + size_t n = 0; + if ( control.running.load(std::memory_order_acquire) || control.output_force_write ) { + if (current_file == NULL || current_run_number != control.run_number) { + open_next_file(); + + if(current_file_size > control.max_file_size) { + open_next_file(); + } + n = fwrite( out.begin(), 1, out.size(), current_file ); + current_file_size +=n; + } + } + if ( n != out.size() ) { + LOG(ERROR) << "Can't write into output file: Have to write " << out.size() << ", but write returned " << n; + } + // If not running and we have a file then close it + if ( !control.running && current_file != NULL && !control.output_force_write ) { + close_and_move_current_file(); + file_count = -1; + } +} + + + +void* OutputStream::OutputFixedOrbits( Slice& out ) { + + uint32_t orbitN = out.get_firstOrbitN(); + uint32_t new_index = uint32_t(orbitN/conf.getNOrbitsPerFile()); + size_t n = 0; + bool already_opened=false; + if(out.get_counts() != 0){ + + if ( control.running.load(std::memory_order_acquire) || control.output_force_write ) { //i.e should be writing data + if (current_run_number != control.run_number) { + + open_file(new_index); + n = fwrite( out.begin(), 1, out.size(), f1.getFile() ); + + }else{ + + if(new_index > f1.getIndex()) { // i.e new file with higher index should be opened + close_and_move_file(); + open_file(new_index); + n = fwrite( out.begin(), 1, out.size(), f1.getFile() ); + f1.setIndex(new_index); + + }else if(new_index < f1.getIndex()){ // i.e should be written to previous index/file (f2) + n = fwrite( out.begin(), 1, out.size(), f2.getFile() ); + + }else{ // i.e new_index == f1 index + n = fwrite( out.begin(), 1, out.size(), f1.getFile() ); + } + } + current_file_size +=n; + }else{ // i.e should not currently be writing data + close_and_move_file();//once for each open file (should be 2) + close_and_move_file(); + + } +} +} +void* OutputStream::operator()( void* item ) +{ + Slice& out = *static_cast<Slice*>(item); + totcounts += out.get_counts(); + if(this->fixedOrbitsPerFile){ + OutputFixedOrbits(out); + }else{ + OutputFixedSize(out); + } + + out.free(); + return NULL; +} + +/* + * Create a properly formatted file name + * TODO: Change to C++ + */ +static std::string format_run_file_stem(std::string& filename_prefix, uint32_t run_number, int32_t file_count) +{ + char run_order_stem[PATH_MAX]; + snprintf(run_order_stem, sizeof(run_order_stem), "%s_%06d_%06d.dat", filename_prefix.c_str(), run_number, file_count); + return std::string(run_order_stem); +} + +void OutputStream::close_and_move_current_file() +{ + // Close and move current file + if (current_file) { + fclose(current_file); + current_file = NULL; + + std::string run_file = format_run_file_stem(my_output_filename_prefix, current_run_number, file_count); + std::string current_file_name = my_output_filename_base + "/" + working_dir + "/" + run_file; + std::string target_file_name = my_output_filename_base + "/" + run_file; + + LOG(INFO) << "rename: " << current_file_name << " to " << target_file_name; + if ( rename(current_file_name.c_str(), target_file_name.c_str()) < 0 ) { + LOG(ERROR) << tools::strerror("File rename failed"); + } + + current_file_size = 0; + file_count += 1; + } +} + +void OutputStream::open_next_file() +{ + close_and_move_current_file(); + + // We can change current_run_number only here, after the (previous) file was closed and moved + if (current_run_number != control.run_number) { + current_run_number = control.run_number; + file_count = -1; + } + + // If this is the first file then check if we have a journal file + if (file_count < 0) { + // Set default file index + file_count = 0; + + uint32_t journal_run_number; + uint32_t index; + + if (read_journal(journal_name, journal_run_number, index)) { + LOG(INFO) << "We have journal:"; + LOG(INFO) << " run_number: " << journal_run_number; + LOG(INFO) << " index: " << index; + } else { + LOG(INFO) << "No journal file.\n"; + } + + LOG(INFO) << "Current run_number: " << current_run_number; + if (current_run_number == journal_run_number) { + file_count = index; + } + LOG(INFO) << " using index: " << file_count; + } + + // Create the output directory + std::string output_directory = my_output_filename_base + "/" + working_dir; + create_output_directory(output_directory); + + // Create a new file + std::string current_filename = output_directory + "/" + format_run_file_stem(my_output_filename_prefix, current_run_number, file_count); + current_file = fopen( current_filename.c_str(), "w" ); + if (current_file == NULL) { + std::string err = tools::strerror("ERROR when creating file '" + current_filename + "'"); + LOG(ERROR) << err; + throw std::runtime_error(err); + } + + // Update journal file (with the next index file) + update_journal(journal_name, current_run_number, file_count + 1); +} + +void OutputStream::close_and_move_file() //Used for fixedNorbits per file option +{ + std::string run_file = format_run_file_stem(my_output_filename_prefix, current_run_number, f2.getIndex()); + std::string current_file_name = my_output_filename_base + "/" + working_dir + "/" + run_file; + std::string target_file_name = my_output_filename_base + "/" + run_file; + if (f2.getFile() != NULL) { + fclose(f2.getFile()); + } + LOG(INFO) << "rename: " << current_file_name << " to " << target_file_name; + if ( rename(current_file_name.c_str(), target_file_name.c_str()) < 0 ) { + LOG(ERROR) << tools::strerror("File rename failed"); + } + if(f1.getFile() != NULL){ + f2.setFile(f1.getFile()); + f2.setIndex(f1.getIndex()); + f1.setFile(NULL); + f1.setIndex(NULL); + } +} + +void OutputStream::open_file(uint32_t index_) //Used for fixedNorbits per file option +{ + // Create a new file + std::string output_directory = my_output_filename_base + "/" + working_dir; + create_output_directory(output_directory); + std::string current_filename_a = output_directory + "/" + format_run_file_stem(my_output_filename_prefix, current_run_number, index_); + std::string filename = output_directory + "/" + format_run_file_stem(my_output_filename_prefix, control.run_number, index_); + LOG(INFO) << "opening file with index " << index_; + f1.setFile(fopen( filename.c_str(), "w" )); + f1.setIndex(index_); + if (f1.getFile() == NULL) { + std::string err = tools::strerror("ERROR when creating file '" + current_filename_a + "'"); + LOG(ERROR) << err; + throw std::runtime_error(err); + } +} diff --git a/src/outputByOrbit.h b/src/outputByOrbit.h new file mode 100644 index 0000000000000000000000000000000000000000..b9899ea46883b60d77b6be7994d7e7c37c3bba29 --- /dev/null +++ b/src/outputByOrbit.h @@ -0,0 +1,76 @@ +#ifndef OUTPUT_H +#define OUTPUT_H + +#include <cstdio> +#include <stdint.h> +#include <string> +#include "tbb/pipeline.h" +#include "config.h" +#include "controls.h" + + +class OutFile{ + +public: + + OutFile(){ + file_ = NULL; + index_ = NULL; + } + + ~OutFile(){}; + + FILE* getFile(){return file_;} + void setFile(FILE* file){file_ = file;} + uint32_t getIndex(){return index_;} + void setIndex(uint32_t index){index_ = index;} + +private: + FILE* file_; + uint32_t index_; + +}; + + +//! Filter that writes each buffer to a file. +class OutputStream: public tbb::filter { + +public: + + OutputStream( const std::string output_filename_base, const std::string output_filename_prefix, ctrl& c, config& conf_); + void* operator()( void* item ) /*override*/; + void* OutputFixedOrbits(Slice& out); + void* OutputFixedSize(Slice& out); +private: + // used for original fixed file size approach + void open_next_file(); + void close_and_move_current_file(); + FILE *current_file; + + // used for fixed N orbits per file approach + void close_and_move_file(); + void open_file(uint32_t index_); + + OutFile f1; + OutFile f2; + +/* FILE *current_file_a; //two files open at once + FILE *current_file_b; + uint32_t index_a; + uint32_t index_b; +*/ + + std::string my_output_filename_base; + std::string my_output_filename_prefix; + uint32_t totcounts; + uint64_t current_file_size; + int32_t file_count; + ctrl& control; + config& conf; + uint32_t current_run_number; + std::string journal_name; + uint32_t index; + const bool fixedOrbitsPerFile; +}; + +#endif