diff --git a/src/Makefile b/src/Makefile index 1a442fd425caabaaa8233edb113da4b579e38878..849f8ad714741a62a872c532eb000b8eddfbf8e1 100644 --- a/src/Makefile +++ b/src/Makefile @@ -23,12 +23,10 @@ OBJECTS += $(C_SOURCES:.c=.o) # appropriate rules; CXXFLAGS gets passed as part of command # invocation for both compilation (where -c is needed) and linking # (where it's not.) -CXXFLAGS = -std=c++11 -Wall -Wextra -Og -g -rdynamic -Wconversion -#CXXFLAGS = -std=c++11 -Wall -Wextra -g -rdynamic +CXXFLAGS = -std=c++11 -Wall -Wextra -Og -g -rdynamic -Wconversion CFLAGS = $(CXXFLAGS) LDFLAGS = -Llibmicron -ltbb -ltbbmalloc -lboost_thread -lboost_chrono -lcurl -lpthread -lcrypto - CPPFLAGS = -I. -Iwzdma # default target (to build all) diff --git a/src/MicronDmaInputFilter.cc b/src/MicronDmaInputFilter.cc index b4d0f86ebb84428c8a2d015381f6a47128f0c643..71d2bec1e248a26c17b213a10e8587c7287ea6b8 100644 --- a/src/MicronDmaInputFilter.cc +++ b/src/MicronDmaInputFilter.cc @@ -2,7 +2,6 @@ #include <ctype.h> -#include <boost/multiprecision/cpp_int.hpp> #include <cstring> #include <system_error> diff --git a/src/OutputBySize.cc b/src/OutputBySize.cc index a3056ec45c2080ef87aefe63ddb81782a722dec3..05677b7ed262876af9d153e2925d72d758af78dc 100644 --- a/src/OutputBySize.cc +++ b/src/OutputBySize.cc @@ -44,7 +44,8 @@ static void create_output_directory(std::string &output_directory) { } OutputBySizeStream::OutputBySizeStream(const std::string output_filename_base, - const std::string output_filename_prefix, ctrl &c) + const std::string &output_filename_prefix, ctrl &c, + StreamProcessor::ProcessorType processorType_) : tbb::filter(serial_in_order), my_output_filename_base(output_filename_base), my_output_filename_prefix(output_filename_prefix), @@ -55,7 +56,8 @@ OutputBySizeStream::OutputBySizeStream(const std::string output_filename_base, current_file(0), current_run_number(0), min_buffer_queue_size_(1000), - journal_name(my_output_filename_base + "/" + output_filename_prefix + '_' + journal_file) { + journal_name(my_output_filename_base + "/" + output_filename_prefix + '_' + journal_file), + bril_(processorType_ == StreamProcessor::ProcessorType::BRIL) { LOG(TRACE) << "Created output filter at " << static_cast<void *>(this); // Create the ouput directory @@ -97,7 +99,7 @@ void *OutputBySizeStream::operator()(void *item) { if (control.running.load(std::memory_order_acquire) || control.output_force_write) { if (current_file == NULL || current_file_size > control.max_file_size || - current_run_number != control.run_number) { + current_run_number != control.run_number || bril_) { open_next_file(); } diff --git a/src/OutputBySize.h b/src/OutputBySize.h index a49e28053ef469a733323287d22fe4c83e618a4b..1476d95cd3e957d31db5e6fa63904e6eb8511572 100644 --- a/src/OutputBySize.h +++ b/src/OutputBySize.h @@ -7,13 +7,15 @@ #include <string> #include "controls.h" +#include "processor.h" #include "tbb/pipeline.h" //! Filter that writes each buffer to a file. class OutputBySizeStream : public tbb::filter { public: OutputBySizeStream(const std::string output_filename_base, - const std::string output_filename_prefix, ctrl &c); + const std::string &output_filename_prefix, ctrl &c, + StreamProcessor::ProcessorType processorType_); void *operator()(void *item) /*override*/; private: @@ -31,6 +33,7 @@ class OutputBySizeStream : public tbb::filter { uint32_t current_run_number; std::string journal_name; size_t min_buffer_queue_size_; + bool bril_; }; #endif diff --git a/src/processor.cc b/src/processor.cc index 304ea1989006eaf53a2da6d54fb65aca26ccde2b..acebda9e47eedbfabe232d5a4e01e3adb361127e 100644 --- a/src/processor.cc +++ b/src/processor.cc @@ -204,49 +204,32 @@ size_t StreamProcessor::fillFRDEventHeader_V6(char *wr_ptr_FRDHead, uint32_t inp } uint32_t StreamProcessor::FillBril(char *rd_ptr, char *wr_ptr, char *end_ptr) { - static constexpr uint32_t NHistosPerPacket = 1; // set to 1 for minimum brildaq latency + std::array<uint32_t, constants::NBXPerOrbit + constants::NFramesInHistoHeader> histo_arr; - uint32_t histo_i = 0, histo_word_i = 0; - std::array<std::array<uint32_t, constants::NBXPerOrbit + constants::NFramesInHistoHeader>, - NHistosPerPacket> - histo_arr; - // uint32_t histo_arr[NHistosPerPacket][constants::NBXPerOrbit + - // NFramesInHistoHeader]; - - // BrilHistoQueue<std::array<uint32_t, constants::NBXPerOrbit + - // constants::NFramesInHistoHeader>> BrilQueue; - - while ((rd_ptr != end_ptr) && (histo_i < NHistosPerPacket)) { + while (rd_ptr != end_ptr) { assert(rd_ptr + sizeof(brilFrame) - 1 <= end_ptr); brilFrame *fr = reinterpret_cast<brilFrame *>(rd_ptr); - if (fr->word == constants::bril_header) { - rd_ptr += 32; - histo_i++; - histo_word_i = 0; - continue; - } - if (histo_word_i < constants::NFramesInHistoHeader) { - histo_arr[histo_i][histo_word_i] = (fr->word >> 0) & 0xffffffff; + if (fr->counter < constants::NFramesInHistoHeader) { + if (fr->counter == 5) { // add run number, 6th frame in bril header is run number + histo_arr[fr->counter] = control.run_number & 0xffffffff; + } else { + histo_arr[fr->counter] = (fr->word >> 0) & 0xffffffff; + } + } else { - histo_arr[histo_i][(histo_word_i * 2) - constants::NFramesInHistoHeader] = - (fr->word >> 0) & 0xffff; - histo_arr[histo_i][(histo_word_i * 2) + 1 - constants::NFramesInHistoHeader] = + histo_arr[(fr->counter * 2) - constants::NFramesInHistoHeader] = (fr->word >> 0) & 0xffff; + + histo_arr[(fr->counter * 2) + 1 - constants::NFramesInHistoHeader] = (fr->word >> 16) & 0xffff; } rd_ptr += sizeof(brilFrame); - histo_word_i++; } - uint32_t packed_size = sizeof(uint32_t) * NHistosPerPacket * - (constants::NBXPerOrbit + constants::NFramesInHistoHeader); + uint32_t packed_size = + sizeof(uint32_t) * (constants::NBXPerOrbit + constants::NFramesInHistoHeader); memcpy(wr_ptr, (char *)&histo_arr, packed_size); - wr_ptr += packed_size; - - for (std::array<uint32_t, constants::NBXPerOrbit + constants::NFramesInHistoHeader> &hist : - histo_arr) { - BrilQueue.push(hist); - } + BrilQueue.push(histo_arr); return packed_size; } diff --git a/src/processor.h b/src/processor.h index 25b36cb88b611dda271c66718f55d53a10660280..917ceb086a19b1eccde0a66a8b09343a62ea3fdb 100644 --- a/src/processor.h +++ b/src/processor.h @@ -1,6 +1,8 @@ #ifndef PROCESSOR_H #define PROCESSOR_H +#include <unistd.h> + #include <atomic> #include <fstream> #include <iostream> diff --git a/src/scdaq.cc b/src/scdaq.cc index 6910eb29c0fca92c9793b224ab2d7653ee727d34..1bb5ee8595ab57c03a7aaae4aa9f3890ef0cfb29 100644 --- a/src/scdaq.cc +++ b/src/scdaq.cc @@ -94,8 +94,9 @@ int run_pipeline(int nbThreads, ctrl &control, config &conf) { output_stream_by_orbit = std::make_shared<OutputByOrbitStream>(control, conf); pipeline.add_filter(*output_stream_by_orbit); } else { - output_stream_by_size = std::make_shared<OutputBySizeStream>( - conf.getOutputFilenameBase(), conf.getOutputFilenamePrefix(), control); + output_stream_by_size = std::make_shared<OutputBySizeStream>(conf.getOutputFilenameBase(), + conf.getOutputFilenamePrefix(), + control, conf.getProcessorType()); pipeline.add_filter(*output_stream_by_size); } diff --git a/src/session.h b/src/session.h index cad8aadc97fb20be0f4d7a188c4ddb610b8b2240..81ce4bd9461499edacb58aa380c30273ce01f3e2 100644 --- a/src/session.h +++ b/src/session.h @@ -17,7 +17,7 @@ #include <boost/utility/string_ref.hpp> namespace std { -using string_view = boost::string_ref; +using string_view_ = boost::string_ref; } /*****************************************************************************/ @@ -40,7 +40,7 @@ class session { #define RCINFO(msg) \ msg ", run_number: %u, running: %s", control.run_number, (control.running ? "true" : "false") - int process_command(std::string_view input, char *output, size_t size) { + int process_command(std::string_view_ input, char *output, size_t size) { try { std::vector<std::string> items; boost::split(items, input, boost::is_any_of(" ,"), boost::token_compress_on); @@ -85,12 +85,12 @@ class session { void handle_read(const boost::system::error_code &error, size_t bytes_transferred) { if (!error) { - std::string_view input(data_, bytes_transferred); + std::string_view_ input(data_, bytes_transferred); LOG(DEBUG) << "Run control: Received: '" << input << '\''; bytes_transferred = process_command(input, data_, max_length); - std::string_view output(data_, bytes_transferred); + std::string_view_ output(data_, bytes_transferred); LOG(DEBUG) << "Run control: Sending: '" << output << '\''; boost::asio::async_write(