diff --git a/src/config.h b/src/config.h index 836d147c8704b1d25e5936fc240ed0e65dc1bd8b..c16316a8aa8061514661d4e362a0a209987cd822 100644 --- a/src/config.h +++ b/src/config.h @@ -63,6 +63,7 @@ public: std::string v = vmap.at("pt_cut"); return boost::lexical_cast<uint32_t>(v.c_str()); } + const std::string& getOutputFilenameBase() const { return vmap.at("output_filename_base"); @@ -71,6 +72,10 @@ public: std::string v = vmap.at("max_file_size"); return boost::lexical_cast<uint64_t>(v.c_str()); } + bool getOutputForceWrite() const { + return (true ? vmap.at("output_force_write") == "yes" : false); + } + uint32_t getNumThreads() const { std::string v = vmap.at("threads"); return boost::lexical_cast<uint32_t>(v.c_str()); diff --git a/src/controls.h b/src/controls.h index 9c3d260e25004a8bff3d21eff7ae750751da6bee..364a29885e11679f683fba70b100d2b3a009d6be 100644 --- a/src/controls.h +++ b/src/controls.h @@ -6,6 +6,8 @@ struct ctrl { uint32_t run_number; std::atomic<bool> running; + /* Always write data to a file regardless of the run status */ + bool output_force_write; uint64_t max_file_size; int packets_per_report; }; diff --git a/src/output.cc b/src/output.cc index 3190f9fe01dbd1a25e6dce180fc1698b867d32c5..4448f434a9193e103a884ea0329aea29a4f4362f 100644 --- a/src/output.cc +++ b/src/output.cc @@ -6,6 +6,36 @@ #include "log.h" #include "tools.h" +/* Defines the journal file */ +static const std::string journal_file { "index.journal" }; + +/* Defined where are the files stored before they are moved to the final destination */ +static const std::string working_dir { "in_progress" }; + +static void create_output_directory(std::string& output_directory) +{ + struct stat sb; + + /* check if path exists and is a directory */ + if (stat(output_directory.c_str(), &sb) == 0) { + if (S_ISDIR (sb.st_mode)) { + // Is already existing + LOG(TRACE) << "Output directory is already existing: " << output_directory << "'."; + return; + } + std::string err = "ERROR The output directory path '" + output_directory + "' is existing, but the path is not a directory!"; + LOG(ERROR) << err; + throw std::runtime_error(err); + } + + if (!tools::filesystem::create_directories(output_directory)) { + std::string err = tools::strerror("ERROR when creating the output directory '" + output_directory + "'"); + LOG(ERROR) << err; + throw std::runtime_error(err); + } + LOG(TRACE) << "Created output directory: " << output_directory << "'."; +} + OutputStream::OutputStream( const char* output_file_base, ctrl& c) : tbb::filter(serial_in_order), my_output_file_base(output_file_base), @@ -15,9 +45,13 @@ OutputStream::OutputStream( const char* output_file_base, ctrl& c) : control(c), current_file(0), current_run_number(0), - journal_name(my_output_file_base + "/" + "index.journal") + journal_name(my_output_file_base + "/" + journal_file) { LOG(TRACE) << "Created output filter at " << static_cast<void*>(this); + + // Create the ouput directory + std::string output_directory = my_output_file_base + "/" + working_dir; + create_output_directory(output_directory); } static void update_journal(std::string journal_name, uint32_t run_number, uint32_t index) @@ -55,7 +89,7 @@ void* OutputStream::operator()( void* item ) Slice& out = *static_cast<Slice*>(item); totcounts += out.get_counts(); - if ( control.running.load(std::memory_order_acquire) ) { + if ( control.running.load(std::memory_order_acquire) || control.output_force_write ) { if (current_file == NULL || current_file_size > control.max_file_size || current_run_number != control.run_number) { open_next_file(); } @@ -68,7 +102,7 @@ void* OutputStream::operator()( void* item ) } // If not running and we have a file then close it - if (!control.running && current_file != NULL) { + if ( !control.running && current_file != NULL && !control.output_force_write ) { close_and_move_current_file(); file_count = -1; } @@ -77,6 +111,17 @@ void* OutputStream::operator()( void* item ) return NULL; } +/* + * Create a properly formated file name + * TODO: Change to C++ + */ +static std::string format_run_file_stem(uint32_t run_number, int32_t file_count) +{ + char run_order_stem[PATH_MAX]; + snprintf(run_order_stem, sizeof(run_order_stem), "scout_%06d_%06d.dat", run_number, file_count); + return std::string(run_order_stem); +} + void OutputStream::close_and_move_current_file() { // Close and move current file @@ -84,15 +129,12 @@ void OutputStream::close_and_move_current_file() fclose(current_file); current_file = NULL; - // Move the file, so it can be processes by file move daemon - char source_file_name[256]; - char target_file_name[256]; - // TODO: Check if the destination directory exists - sprintf(source_file_name,"%s/in_progress/scout_%06d_%06d.dat", my_output_file_base.c_str(), current_run_number, file_count); - sprintf(target_file_name,"%s/scout_%06d_%06d.dat", my_output_file_base.c_str(), current_run_number, file_count); + std::string run_file = format_run_file_stem(current_run_number, file_count); + std::string current_file_name = my_output_file_base + "/" + working_dir + "/" + run_file; + std::string target_file_name = my_output_file_base + "/" + run_file; - LOG(INFO) << "rename: " << source_file_name << " to " << target_file_name; - if ( rename(source_file_name, target_file_name) < 0 ) { + LOG(INFO) << "rename: " << current_file_name << " to " << target_file_name; + if ( rename(current_file_name.c_str(), target_file_name.c_str()) < 0 ) { LOG(ERROR) << tools::strerror("File rename failed"); } @@ -134,15 +176,17 @@ void OutputStream::open_next_file() LOG(INFO) << " using index: " << file_count; } + // Create the ouput directory + std::string output_directory = my_output_file_base + "/" + working_dir; + create_output_directory(output_directory); + // Create a new file - char run_order_stem[256]; - // TODO: Check if the destination directory exists - sprintf(run_order_stem,"/in_progress/scout_%06d_%06d.dat",current_run_number,file_count); - std::string file_end(run_order_stem); - std::string filename = my_output_file_base+file_end; - current_file = fopen( filename.c_str(), "w" ); + std::string current_filename = output_directory + "/" + format_run_file_stem(current_run_number, file_count); + current_file = fopen( current_filename.c_str(), "w" ); if (current_file == NULL) { - throw std::system_error(errno, std::system_category(), "File open error"); + std::string err = tools::strerror("ERROR when creating file '" + current_filename + "'"); + LOG(ERROR) << err; + throw std::runtime_error(err); } // Update journal file (with the next index file) diff --git a/src/scdaq.cc b/src/scdaq.cc index 43e4259cf802b2286d36305caa57ca2622959b95..62018538bb7bd36847604a9a74c69e413b568376 100644 --- a/src/scdaq.cc +++ b/src/scdaq.cc @@ -142,6 +142,7 @@ int main( int argc, char* argv[] ) { control.run_number = 0; control.max_file_size = conf.getOutputMaxFileSize();//in Bytes control.packets_per_report = conf.getPacketsPerReport(); + control.output_force_write = conf.getOutputForceWrite(); boost::asio::io_service io_service; server s(io_service, conf.getPortNumber(), control); diff --git a/src/scdaq.conf b/src/scdaq.conf index c0d53ce699ca76209ac987c2641952243827b4db..0f1f07e05ae757fce1d372452880ab301191a1de 100644 --- a/src/scdaq.conf +++ b/src/scdaq.conf @@ -22,6 +22,7 @@ dma_number_of_packet_buffers:1000 packets_per_report:1000 #packets_per_report:1 + ## Settings for file input #input_file:/dev/shm/testdata.bin input_file:testdata.bin @@ -30,10 +31,14 @@ input_file:testdata.bin input_buffers:10 blocks_buffer:1000 -# Output +## Settings for file output output_filename_base:/fff/BU0/ramdisk/scdaq max_file_size:8589934592 +# Always write data to a file regardless of the run status, usefull for debugging +output_force_write:no + + # Elastics processor port:8000 elastic_url:http://something.somewhere diff --git a/src/tools.h b/src/tools.h index c934c45a3a89c02e26b9174a908ad10d0388a25e..490efda91afc5d7038086b77baba2c63dcae0141 100644 --- a/src/tools.h +++ b/src/tools.h @@ -8,6 +8,9 @@ #include <cstring> #include <iostream> +#include <sys/stat.h> +#include <limits.h> + namespace tools { @@ -42,6 +45,43 @@ inline const std::string strerror(const std::string& msg) } +/* + * Various filesystem related utilities (will be removed once moved to C++17, or rewritten with boost) + */ +namespace filesystem { + +/* + * Create the target directory and any parent directories if necessary + */ +inline bool create_directories(std::string& path) +{ + char tmp[PATH_MAX]; + + // Add terminating '/' and make a writtable copy; + int len = snprintf(tmp, sizeof(tmp), "%s/", path.c_str()); + if (len > PATH_MAX) len = PATH_MAX; + + char *last_backslash = tmp; + for (char *p = tmp; *p; p++) { + if (*p == '/') { + // Found a new directory, ignore any subsequent back slashes + int dir_len = p - last_backslash - 1; + if (dir_len > 0) { + *p = 0; + if (mkdir(tmp, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH) < 0 && (errno != EEXIST)) { + return false; + } + *p = '/'; + } + last_backslash = p; + } + } + + return true; +} + + +} // namespace filesystem } // namespace tools