diff --git a/src/OutputFileHandler.cc b/src/OutputFileHandler.cc index 2e620690e5df6156a2bbbf88a8bc1ff981df8078..c8010748b70a93ed104b05f849ed22ef2391a8df 100644 --- a/src/OutputFileHandler.cc +++ b/src/OutputFileHandler.cc @@ -3,6 +3,29 @@ #include <iomanip> #include "log.h" +#include "tools.h" + +const std::string OutputFileHandler::working_dir_ = "in_progress"; +const std::string OutputFileHandler::journal_file_ = "index.journal"; + +tbb::concurrent_bounded_queue<std::pair<std::string, FILE *>> OutputFileHandler::files_to_close_ = + tbb::concurrent_bounded_queue<std::pair<std::string, FILE *>>(); +bool OutputFileHandler::file_handler_running_ = false; + +void OutputFileHandler::enqueue_current_file_for_close_and_move_maybe() { + if (current_file_ != 0) + files_to_close_.push(std::pair<std::string, FILE *>(current_filename_, current_file_)); +} + +FILE *OutputFileHandler::getFile(uint32_t run, uint32_t index) { + if (current_run_number_ != run || current_index_ != index) { + enqueue_current_file_for_close_and_move_maybe(); + current_run_number_ = run; + current_index_ = index; + open_new_file(); + } + return current_file_; +} void OutputFileHandler::create_output_directory_maybe(std::string &output_directory) { struct stat sb; @@ -27,27 +50,44 @@ void OutputFileHandler::create_output_directory_maybe(std::string &output_direct LOG(TRACE) << "Created output directory: " << output_directory << "'."; } -void OutputFileHandler::open_file(uint32_t index, uint32_t run) { +void OutputFileHandler::open_new_file() { // Create a new file - create_output_directory_maybe(working_files_basepath_); - std::string filename = - working_files_base_path_ + "/" + format_filename(control.run_number, index); - LOG(TRACE) << "opening file with index " << index; - current_file_ = fopen( filename.c_str(), "wbx" )); + create_output_directory_maybe(working_files_base_path_); + current_filename_ = format_filename(current_run_number_, current_index_); + std::string full_filename = working_files_base_path_ + "/" + current_filename_; + LOG(TRACE) << "opening file with index " << current_index_; + current_file_ = fopen(full_filename.c_str(), "wbx"); if (current_file_ == NULL) { - std::string err = tools::strerror("ERROR when creating file '" + current_filename_a + "'"); + std::string err = tools::strerror("ERROR when creating file '" + current_filename_ + "'"); LOG(ERROR) << err; throw std::runtime_error(err); } - current_index_ = index; } -/* - * Create a properly formatted file name - */ +// Create a properly formatted file name + std::string OutputFileHandler::format_filename(uint32_t run_number, uint32_t index) { std::ostringstream ofilename; - ofilename << filename_prefix_ << "_" << std::setfill(0) << std::setw(6) << run_number << "_" + ofilename << filename_prefix_ << "_" << std::setfill('0') << std::setw(6) << run_number << "_" << index << ".dat"; return ofilename.str(); } + +void OutputFileHandler::close_and_rename::operator()() const { + std::pair<std::string, FILE *> n; + while (file_handler_running_ || (files_to_close_.size() > 0)) { + // std::cout << std::this_thread::get_id() << " try pop" << + // files_to_close_.size() << std::endl; + files_to_close_.pop(n); + if (fclose(n.second) != 0) { + LOG(ERROR) << tools::strerror("File close failed"); + } + std::string from = base_path_ + "/" + working_dir_ + "/" + n.first; + std::string to = base_path_ + "/" + n.first; + if (rename(from.c_str(), to.c_str()) != 0) { + LOG(ERROR) << tools::strerror("File rename failed"); + } + // std::cout << std::this_thread::get_id() << " popped " << n.first << " + // size now " << files_to_close_.size() << std::endl; + } +}