Skip to content
Snippets Groups Projects
Commit c67ee6fc authored by Emilio Meschi's avatar Emilio Meschi :bicyclist_tone1: Committed by Thomas Owen James
Browse files

moved most of the file handling to OutputFileHandler

parent 9c1561c7
No related branches found
No related tags found
No related merge requests found
......@@ -11,48 +11,29 @@
#include <system_error>
OutputByOrbitStream::OutputByOrbitStream(ctrl &c, config &conf_)
: tbb::filter(serial_in_order), totcounts(0), current_file_size(0),
file_count(-1), control(c), current_run_number(0), conf(conf_),
: tbb::filter(serial_in_order), current_file_size(0), totcounts(0),
control(c), conf(conf_),
output_file_handler_(conf.getOutputFilenameBase(),
conf.getOutputFilenamePrefix()) {
LOG(TRACE) << "Created output filter at " << static_cast<void *>(this);
}
void *OutputByOrbitStream::OutputFixedOrbits(Slice &out) {
void OutputByOrbitStream::OutputFixedOrbits(Slice &out) {
uint32_t orbitN = out.get_firstOrbitN();
// std::cout << orbitN << std::endl;
uint32_t new_index = uint32_t(orbitN / conf.getNOrbitsPerFile());
uint32_t index = uint32_t(orbitN / conf.getNOrbitsPerFile());
size_t n = 0;
bool already_opened = false;
if (out.get_counts() != 0) {
if (control.running.load(std::memory_order_acquire) ||
control.output_force_write) { // i.e should be writing data
if (current_run_number != control.run_number) {
close_and_move_file();
open_file(new_index);
n = fwrite(out.begin(), 1, out.size(), outFile.getFile());
} else {
if (new_index != outFile.getIndex()) { // i.e new file should be opened
if (outFile.getIndex() > new_index) {
LOG(WARNING) << "new file index: " << new_index
<< " < previous file index: " << outFile.getIndex();
}
close_and_move_file();
open_file(new_index);
outFile.setIndex(new_index);
}
n = fwrite(out.begin(), 1, out.size(), outFile.getFile());
}
current_file_size += n;
} else { // i.e should not currently be writing data
close_and_move_file(); // once for each open file (should be 2)
n = fwrite(out.begin(), 1, out.size(),
output_file_handler_.getFile(control.run_number, index));
}
current_file_size += n;
} else { // i.e should not currently be writing data - but then we should
// never get here in the first place
output_file_handler_.enqueue_current_file_for_close_and_move_maybe();
}
}
......@@ -65,23 +46,3 @@ void *OutputByOrbitStream::operator()(void *item) {
out.free();
return NULL;
}
void OutputByOrbitStream::close_and_move_file() // Used for fixedNorbits per
// file option
{
std::string run_file = format_run_file_stem(
my_output_filename_prefix, current_run_number, outFile.getIndex());
std::string current_file_name =
my_output_filename_base + "/" + working_dir + "/" + run_file;
std::string target_file_name = my_output_filename_base + "/" + run_file;
if (outFile.getFile() != NULL) {
fclose(outFile.getFile());
outFile.setFile(NULL);
outFile.setIndex(NULL);
}
LOG(INFO) << "rename: " << current_file_name << " to " << target_file_name;
if (rename(current_file_name.c_str(), target_file_name.c_str()) < 0) {
LOG(ERROR) << tools::strerror("File rename failed");
}
current_run_number = control.run_number;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment