diff --git a/src/output.cc b/src/output.cc index 9ffdb425fa1ad2683c15f5ca2aa6a23377a74ac4..ff7e3c22e36458cc4f75756b30aef791c05216e3 100644 --- a/src/output.cc +++ b/src/output.cc @@ -1,4 +1,6 @@ #include <system_error> +#include <iostream> +#include <fstream> #include "output.h" #include "slice.h" @@ -9,13 +11,45 @@ OutputStream::OutputStream( const char* output_file_base, ctrl *c) : my_output_file_base(output_file_base), totcounts(0), current_file_size(0), - file_count(0), + file_count(-1), control(c), - current_file(0) + current_file(0), + current_run_number(0), + journal_name(my_output_file_base + "/" + "index.journal") { fprintf(stderr,"Created output filter at 0x%llx \n",(unsigned long long)this); } +static void update_journal(std::string journal_name, uint32_t run_number, uint32_t index) +{ + std::string new_journal_name = journal_name + ".new"; + + // Create a new journal file + std::ofstream journal (new_journal_name); + if (journal.is_open()) { + journal << run_number << "\n" << index << "\n"; + journal.close(); + } else { + std::cerr << "WARNING: Unable to open journal file"; + } + + // Replace the old journal + if (rename(new_journal_name.c_str(), journal_name.c_str()) < 0 ) { + perror("Journal file move failed"); + } +} + +static bool read_journal(std::string journal_name, uint32_t& run_number, uint32_t& index) +{ + std::ifstream journal (journal_name); + if (journal.is_open()) { + journal >> run_number >> index; + journal.close(); + return true; + } + return false; +} + void* OutputStream::operator()( void* item ) { Slice& out = *static_cast<Slice*>(item); totcounts += out.get_counts(); @@ -32,21 +66,24 @@ void* OutputStream::operator()( void* item ) { fclose(current_file); current_file=0; file_count = 0; + current_run_number = 0; } out.free(); return NULL; } void OutputStream::open_next_file(){ + + // Close and move open file if (current_file) { fclose(current_file); - // Move the file, so it can be processes by file movers daemon + // Move the file, so it can be processes by file move daemon char source_file_name[256]; char target_file_name[256]; // TODO: Check if the destination directory exists - sprintf(source_file_name,"%s/in_progress/scout_%06d_%06d.dat",my_output_file_base.c_str(),control->run_number,file_count); - sprintf(target_file_name,"%s/scout_%06d_%06d.dat",my_output_file_base.c_str(),control->run_number,file_count); + sprintf(source_file_name,"%s/in_progress/scout_%06d_%06d.dat",my_output_file_base.c_str(),current_run_number,file_count); + sprintf(target_file_name,"%s/scout_%06d_%06d.dat",my_output_file_base.c_str(),current_run_number,file_count); fprintf(stderr, "rename: %s to %s\n", source_file_name, target_file_name); if ( rename(source_file_name, target_file_name) < 0 ) { @@ -57,13 +94,43 @@ void OutputStream::open_next_file(){ file_count += 1; } + // We can change current_run_number only here, after the (previous) file was closed and moved + current_run_number = control->run_number; + + // If this is the first file check if we have a journal file + if (file_count < 0) { + // Set default file index + file_count = 0; + + uint32_t run_number; + uint32_t index; + + if (read_journal(journal_name, run_number, index)) { + std::cout << "We have journal:\n"; + std::cout << " run_number: " << run_number << '\n'; + std::cout << " index: " << index << "\n"; + } else { + std::cout << "No journal file.\n"; + } + + std::cout << "Current run_number: " << current_run_number << '\n'; + if (current_run_number == run_number) { + file_count = index; + } + std::cout << " using index: " << file_count << '\n'; + } + + // Create a new file char run_order_stem[256]; // TODO: Check if the destination directory exists - sprintf(run_order_stem,"/in_progress/scout_%06d_%06d.dat",control->run_number,file_count); + sprintf(run_order_stem,"/in_progress/scout_%06d_%06d.dat",current_run_number,file_count); std::string file_end(run_order_stem); std::string filename = my_output_file_base+file_end; current_file = fopen( filename.c_str(), "w" ); if (current_file == NULL) { throw std::system_error(errno, std::system_category(), "File open error"); } + + // Update journal file (with the next index file) + update_journal(journal_name, current_run_number, file_count+1); } diff --git a/src/output.h b/src/output.h index 829bcad4fda93e991977b8a80e2665807ce930b5..53b00cb0cd62080ce9890e2c00ede7499701b105 100644 --- a/src/output.h +++ b/src/output.h @@ -21,9 +21,11 @@ private: std::string my_output_file_base; uint32_t totcounts; uint64_t current_file_size; - uint32_t file_count; + int32_t file_count; ctrl *control; FILE *current_file; + uint32_t current_run_number; + std::string journal_name; }; #endif diff --git a/src/scdaq.conf b/src/scdaq.conf index c40a714318cc00b6f0c76ae168d48b23fb3de50b..08c5ba815a055fbe03bd6f39d79b0d82c4231a49 100644 --- a/src/scdaq.conf +++ b/src/scdaq.conf @@ -29,11 +29,10 @@ blocks_buffer:1000 # Output output_filename_base:/fff/BU0/ramdisk/scdaq -max_file_size:1073741824 +max_file_size:8589934592 threads:8 port:8000 -#elastic_url:http://something.somewhere elastic_url:http://something.somewhere pt_cut:7 quality_cut:12