// OutputFileHandler.h
#ifndef OUTPUTFILEHANDLER_H
#define OUTPUTFILEHANDLER_H

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <string>
#include <thread>
#include <utility>

#include "FRDFileHeader_v2.h"
#include "OutputFile.h"
#include "format.h"
#include "tbb/concurrent_queue.h"

// Owns the currently open output file, keeps byte/orbit counters for it, and
// runs a background thread (a copy of the `close_and_rename` functor) for the
// lifetime of the object. Finished files are handed over through the static
// `files_to_close_` queue.
//
// NOTE(review): `files_to_close_` and `file_handler_running_` are static, i.e.
// shared by every OutputFileHandler instance; destroying one instance aborts
// the queue and clears the flag for all of them. Presumably only one instance
// exists per process -- confirm before creating several.
class OutputFileHandler {
 public:
  // Builds all paths from `base_path`, initializes every counter, and starts
  // the background thread.
  //
  //   base_path        root directory for output; the working directory is
  //                    `base_path + "/" + working_dir_`
  //   filename_prefix  stored for use when file names are built
  //   nOrbitsPerFile   orbits written per file; must be non-zero because it is
  //                    used as a divisor when computing `max_index_per_ls_`
  //   cmsswHeaders     when true, `file_size_` starts at
  //                    sizeof(FRDFileHeader_v2) to account for the header
  //   sourceID         stored in `sourceID_`
  OutputFileHandler(const std::string &base_path, const std::string &filename_prefix,
                    int nOrbitsPerFile, const bool cmsswHeaders, uint32_t sourceID)
      // Initializers are listed in member declaration order, which is the
      // order in which they actually run.
      : base_path_(base_path),
        run_dir_(base_path + "/run000000"),
        filename_prefix_(filename_prefix),
        working_files_base_path_(base_path_ + "/" + working_dir_),
        current_run_number_(-1),
        current_index_(-1),
        close_and_rename_(base_path_, nOrbitsPerFile),
        nOrbitsPerFile_(nOrbitsPerFile),
        cmsswHeaders_(cmsswHeaders),
        sourceID_(static_cast<int>(sourceID)),
        t{},
        max_index_per_ls_(static_cast<int>(constants::N_orbits_per_lumisection / nOrbitsPerFile) -
                          1),
        // Always initialized; previously left indeterminate when
        // cmsswHeaders was false.
        ls_file_size_(0),
        // accounting for cmssw header size
        file_size_(cmsswHeaders ? sizeof(FRDFileHeader_v2) : 0),
        NOrbits_(0),
        ls_NOrbits_(0),
        run_NOrbits_(0) {
    close_and_rename_.SetOutputFileHandlerObj(*this);
    // Start the worker last so it never observes a partially set-up object.
    // std::thread copies the functor; the copy carries the pointer to *this
    // that was set on the line above.
    t = std::thread(close_and_rename_);
  }

  // The worker thread holds a pointer to this object, so the handler must
  // stay at a fixed address. (Copying was already impossible because of the
  // std::thread and const members; this makes it explicit.)
  OutputFileHandler(const OutputFileHandler &) = delete;
  OutputFileHandler &operator=(const OutputFileHandler &) = delete;

  // Hands the current file over for closing, then stops and joins the worker.
  ~OutputFileHandler() {
    enqueue_current_file_for_close_and_move_maybe();
    exit_file_handler_loop();
  }

  void enqueue_current_file_for_close_and_move_maybe();

  OutputFile getFile(uint32_t run, uint32_t index);

  // True when `outputFile_` reports that it exists.
  bool hasFile() { return outputFile_.exists(); }

  // Whether this handler was configured with CMSSW headers.
  bool getCMSSWHeaders() const { return cmsswHeaders_; }

  FRDFileHeader_v2 createFileHeader(uint32_t ls);

  // Adds `n` to the running size counter of the current file.
  void upFileSize(size_t n) { file_size_ += n; }

  // Adds `n` to the running orbit counter of the current file.
  void upNOrbits(uint32_t n) { NOrbits_ += n; }

  void write_EoLS_file(uint32_t ls);
  void write_EoR_file();

  // Number of orbits per file this handler was configured with.
  int NOrbitsPerFile() const { return nOrbitsPerFile_; }

  // Functor executed on the background thread. `operator()` is defined
  // elsewhere; presumably it drains `files_to_close_` -- see the
  // implementation file.
  class close_and_rename {
    // Back-pointer to the owning handler; null until
    // SetOutputFileHandlerObj() is called.
    OutputFileHandler *outputfilehandler_ = nullptr;

   public:
    // Stores copies of both arguments; `base_path` is not modified.
    close_and_rename(const std::string &base_path, int nOrbitsPerFile)
        : base_path_(base_path), nOrbitsPerFile_(nOrbitsPerFile) {}

    void operator()() const;

    // Records the handler this functor works for. Must be called before the
    // functor is copied into a thread, since the copy takes the pointer value
    // as it is at that moment.
    void SetOutputFileHandlerObj(OutputFileHandler &handler) {
      outputfilehandler_ = &handler;
    }

   private:
    std::string base_path_;
    int nOrbitsPerFile_;
  };

 private:
  void create_output_directory_maybe(std::string &output_directory);

  void open_new_file();

  std::string format_filename(uint32_t run_number, uint32_t index, uint32_t ls);

  // Signals the worker to stop: clears the running flag, aborts the queue and
  // then waits for the thread to finish.
  void exit_file_handler_loop() {
    file_handler_running_.store(false, std::memory_order_release);
    files_to_close_.abort();
    if (t.joinable()) {
      t.join();
    }
  }

  // name of subdir where the files are stored before they are moved to the
  // final destination
  static const std::string working_dir_;
  // name of the journal file. Note: Filename prefix is added making the final
  // filename
  static const std::string journal_file_;

  // Files handed over for closing; shared by all instances.
  static tbb::concurrent_bounded_queue<OutputFile> files_to_close_;

  // Cleared by exit_file_handler_loop(); shared by all instances.
  static std::atomic<bool> file_handler_running_;

  // NOTE: members are initialized in the order declared here; keep the
  // constructor's initializer list in the same order.
  std::string base_path_;
  std::string run_dir_;
  std::string filename_prefix_;
  std::string working_files_base_path_;
  OutputFile outputFile_;
  int current_run_number_;
  int current_index_;
  close_and_rename close_and_rename_;
  int nOrbitsPerFile_;
  const bool cmsswHeaders_;
  // NOTE(review): the constructor receives this as uint32_t but it is stored
  // as int; values above INT_MAX will not round-trip -- confirm the range.
  int sourceID_;
  std::thread t;
  // (orbits per lumisection / orbits per file) - 1
  const uint32_t max_index_per_ls_;
  size_t ls_file_size_;
  size_t file_size_;
  uint32_t NOrbits_;
  uint32_t ls_NOrbits_;
  uint32_t run_NOrbits_;
};

#endif