From e2bd02ccd19ed479883b7df24d4612a3c5e7970b Mon Sep 17 00:00:00 2001
From: Giovanna Lazzari Miotto <giovanna.lazzari.miotto@cern.ch>
Date: Wed, 16 Aug 2023 16:32:08 +0200
Subject: [PATCH] [main, pipeline] Generalize to accept multiple TBB pipelines

Multiple pipelines are instantiated in parallel and managed by tbb's
group task concurrent scheduling interface.

A maximum number of supported pipelines is defined as a compilation
directive, but a new optional argument can take up to that maximum
number of active streams.

Each stream has its own configuration file.
The base name is passed as config and the files are assumed to follow
 the structure {basename}{i}.{baseformat}, unless no number of streams
 is provided. In that edge case, the config file is taken as-is and
 only one stream is opened.

Add a counter to InputFilter to tag log messages of each instance
---
 src/InputFilter.cc |   8 +-
 src/InputFilter.h  |   3 +-
 src/scdaq.cc       | 360 +++++++++++++++++++++++++++------------------
 3 files changed, 225 insertions(+), 146 deletions(-)

diff --git a/src/InputFilter.cc b/src/InputFilter.cc
index e7838ed9..0b891779 100644
--- a/src/InputFilter.cc
+++ b/src/InputFilter.cc
@@ -8,6 +8,8 @@
 #include "log.h"
 #include "slice.h"
 
+
+std::atomic<int> filterCounter{0};	
 InputFilter::InputFilter(size_t packetBufferSize, size_t nbPacketBuffers, ctrl &control)
     : filter(serial_in_order),
       control_(control),
@@ -20,7 +22,9 @@ InputFilter::InputFilter(size_t packetBufferSize, size_t nbPacketBuffers, ctrl &
   maxBytesRead_ = 0;
   previousNbReads_ = 0;
 
-  LOG(TRACE) << "Configuration translated into:";
+filterId = filterCounter++;
+
+  LOG(TRACE) << "[" << std::to_string(filterId) << "] Configuration translated into:";
   LOG(TRACE) << "  MAX_BYTES_PER_INPUT_SLICE: " << packetBufferSize;
   LOG(TRACE) << "  TOTAL_SLICES: " << nbPacketBuffers;
   LOG(TRACE) << "Created input filter and allocated at " << static_cast<void *>(nextSlice_);
@@ -56,7 +60,7 @@ void InputFilter::printStats(std::ostream &out, ssize_t lastBytesRead) {
   std::ios state(nullptr);
   state.copyfmt(out);
 
-  out << "#" << nbReads_ << ": Reading " << std::fixed << std::setprecision(1) << bwd << " MB/sec, "
+  out << "[" << std::to_string(filterId) << "] " << "#" << nbReads_ << ": Reading " << std::fixed << std::setprecision(1) << bwd << " MB/sec, "
       << nbReadsDiff << " packet(s) min/avg/max " << minBytesRead_ << '/' << avgBytesRead << '/'
       << maxBytesRead_ << " last " << lastBytesRead;
 
diff --git a/src/InputFilter.h b/src/InputFilter.h
index 9b4b9a4b..863868b2 100644
--- a/src/InputFilter.h
+++ b/src/InputFilter.h
@@ -3,7 +3,7 @@
 
 #include <cstddef>
 #include <iostream>
-
+#include <atomic>
 #include "controls.h"
 #include "tbb/pipeline.h"
 #include "tbb/tick_count.h"
@@ -19,6 +19,7 @@ class InputFilter : public tbb::filter {
   InputFilter(size_t packet_buffer_size, size_t number_of_packet_buffers, ctrl &control);
   virtual ~InputFilter();
 
+  int filterId = 0;
   // Return the number of read calls
   uint64_t nbReads() { return nbReads_; }
 
diff --git a/src/scdaq.cc b/src/scdaq.cc
index 24a21b01..bc0bed1b 100644
--- a/src/scdaq.cc
+++ b/src/scdaq.cc
@@ -29,182 +29,256 @@
 #include "tbb/task_scheduler_init.h"
 #include "tbb/tbb_allocator.h"
 #include "tbb/tick_count.h"
+#include "tbb/task_group.h"
 #include "tools.h"
 
 using namespace std;
 
-int run_pipeline(int nbThreads, ctrl &control, config &conf) {
-  config::InputType input = conf.getInput();
-  size_t packetBufferSize = conf.getDmaPacketBufferSize();
-  size_t nbPacketBuffers = conf.getNumberOfDmaPacketBuffers();
-  auto tcpDestPort = static_cast<uint32_t>(conf.getTcpDestPort());
+#define NUM_PIPELINES 2
 
-  // Create empty input reader, will assign later when we know what is the data
-  // source
-  std::shared_ptr<InputFilter> input_filter;
-  std::shared_ptr<OutputByOrbitStream> output_stream_by_orbit;
-  std::shared_ptr<OutputBySizeStream> output_stream_by_size;
-  // Create the pipeline
-  tbb::pipeline pipeline;
+class DAQPipeline {
+public:
+    config fConf;
+    ctrl fControl{};
+
+    std::shared_ptr<InputFilter> fInputFilter;
+    StreamProcessor fStreamProcessor;
+    std::shared_ptr<tbb::filter> fOutputStream;
+    tbb::pipeline fPipeline;
+
+    explicit DAQPipeline(config conf)
+            : fConf(conf),
+              fStreamProcessor(conf.getDmaPacketBufferSize(), conf.getDoZS(), conf.getProcessorType(),
+                               conf.getNOrbitsPerPacket(),
+                               conf.getCMSSWHeaders(), static_cast<uint16_t>(conf.getSourceID()),
+                               fControl) {
+
+        tools::log::set_filter_min_severity(fConf.getLogMinSeverity());
+        fConf.print();
+        const config::InputType &input = fConf.getInput();
+        const size_t &nbPacketBuffers = fConf.getNumberOfDmaPacketBuffers();
+        const size_t &packetBufferSize = fConf.getDmaPacketBufferSize();
 
   if (input == config::InputType::FILEDMA) {
     // Create FILE DMA reader
-    input_filter = std::make_shared<FileDmaInputFilter>(conf.getInputFile(), packetBufferSize,
+    fInputFilter = std::make_shared<FileDmaInputFilter>(conf.getInputFile(), packetBufferSize,
                                                         nbPacketBuffers, control);
 
-  } else if (input == config::InputType::WZDMA) {
-    // Create WZ DMA reader
-    input_filter =
-        std::make_shared<WZDmaInputFilter>(packetBufferSize, nbPacketBuffers, control, conf);
-
-  } else if (input == config::InputType::MICRONDMA) {
-    // create MicronDmaInputFilter reader
-    input_filter =
-        std::make_shared<MicronDmaInputFilter>(packetBufferSize, nbPacketBuffers, control, conf);
-
-  } else if (input == config::InputType::TCPIP) {
-    // create TcpInputFilter reader
-    if (conf.getDev_TCPAutoReconnectOnFailure()) {
-      input_filter =
-          std::make_shared<TcpInputFilter>(tcpDestPort, packetBufferSize, nbPacketBuffers,
-                                           conf.getNOrbitsPerPacket(), control, conf);
-    } else {
-      throw std::invalid_argument(
-          "Configuration error: enable developmentMode to use TCP input filter");
+        } else if (input == config::InputType::WZDMA) {
+            // Create WZ DMA reader
+            fInputFilter =
+                    std::make_shared<WZDmaInputFilter>(packetBufferSize, nbPacketBuffers, fControl, fConf);
+
+        } else if (input == config::InputType::MICRONDMA) {
+            // create MicronDmaInputFilter reader
+            fInputFilter =
+                    std::make_shared<MicronDmaInputFilter>(packetBufferSize, nbPacketBuffers, fControl, fConf);
+
+        } else if (input == config::InputType::TCPIP) {
+            // create TcpInputFilter reader
+            if (fConf.getDev_TCPAutoReconnectOnFailure()) {
+                fInputFilter =
+                        std::make_shared<TcpInputFilter>(fConf.getTcpDestPort(), packetBufferSize, nbPacketBuffers,
+                                                         fConf.getNOrbitsPerPacket(), fControl, fConf);
+
+            } else {
+                exit(2);
+//            throw std::invalid_argument(
+//                    "Configuration error: enable developmentMode to use TCP input filter");
+            }
+        } else {
+            exit(3);
+//            throw std::invalid_argument("Configuration error: Unknown input type was specified");
+        }
+
+        // Add input reader to a pipeline
+        fPipeline.add_filter(*fInputFilter);
+
+
+        // Create reformatter and add it to the pipeline
+        if (fConf.getEnableStreamProcessor()) {
+            fPipeline.add_filter(fStreamProcessor);
+        }
+
+        // Create file-writing stage and add it to the pipeline
+        if (fConf.getNOrbitsPerFile()) {
+            fOutputStream = std::make_shared<OutputByOrbitStream>(fControl, fConf);
+        } else {
+            fOutputStream = std::make_shared<OutputBySizeStream>(
+                    fConf.getOutputFilenameBase(), fConf.getOutputFilenamePrefix(), fControl, fConf.getProcessorType());
+        }
+        fPipeline.add_filter(*fOutputStream);
+
+        //    tbb::tick_count mainStartTime = tbb::tick_count::now();
+        fControl.running = false;
+        fControl.run_number = 0;
+        fControl.max_file_size = fConf.getOutputMaxFileSize();  // in Bytes
+        fControl.packets_per_report = static_cast<int>(fConf.getPacketsPerReport());
+        fControl.output_force_write = fConf.getOutputForceWrite();
+        fControl.n_orbits_per_packet = static_cast<int>(fConf.getNOrbitsPerPacket());
+
+
+        // Firmware needs at least 1MB buffer for DMA
+        if (fConf.getDmaPacketBufferSize() < 1024 * 1024) {
+            LOG(ERROR) << "dma_packet_buffer_size must be at least 1048576 bytes (1MB), but "
+                       << fConf.getDmaPacketBufferSize()
+                       << " bytes was given. Check the configuration file.";
+            exit(1);
+        }
+    }
+};
+
+
+int run_pipelines(uint nbThreads, std::deque<DAQPipeline> &pipelines, int numStreams) {
+
+    tbb::task_group group;
+    // Run the pipeline
+    tbb::tick_count t0 = tbb::tick_count::now();
+    for (int i = 0; i < numStreams; ++i) {
+        // Need more than one token in flight per thread to keep all threads
+        // busy; 2-4 works
+//      try {
+//          pipelines.at(i).fPipeline.run(nbThreads * 4);
+//      } catch (std::out_of_range const &e) {
+//          std::cout << e.what() << std::endl;
+//      }
+        group.run([&pipelines, i = i, &nbThreads] { return pipelines.at(i).fPipeline.run(nbThreads * 4); });
     }
+    group.wait();
+    tbb::tick_count t1 = tbb::tick_count::now();
+
+    LOG(INFO) << "time = " << (t1 - t0).seconds();
 
-  } else {
-    throw std::invalid_argument("Configuration error: Unknown input type was specified");
-  }
-
-  // Add input reader to a pipeline
-  pipeline.add_filter(*input_filter);
-
-  // Create reformatter and add it to the pipeline
-  // TODO: Created here so we are not subject of scoping, fix later...
-  StreamProcessor stream_processor(packetBufferSize, conf.getDoZS(), conf.getProcessorType(),
-                                   conf.getNOrbitsPerPacket(), conf.getCMSSWHeaders(),
-                                   static_cast<uint16_t>(conf.getSourceID()), control);
-  if (conf.getEnableStreamProcessor()) {
-    pipeline.add_filter(stream_processor);
-  }
-
-  // Create file-writing stage and add it to the pipeline
-  if (conf.getNOrbitsPerFile()) {
-    output_stream_by_orbit = std::make_shared<OutputByOrbitStream>(control, conf);
-    pipeline.add_filter(*output_stream_by_orbit);
-  } else {
-    output_stream_by_size = std::make_shared<OutputBySizeStream>(
-        conf.getOutputFilenameBase(), conf.getOutputFilenamePrefix(), control, conf.getProcessorType());
-    pipeline.add_filter(*output_stream_by_size);
-  }
-
-  // Run the pipeline
-  tbb::tick_count t0 = tbb::tick_count::now();
-  // Need more than one token in flight per thread to keep all threads
-  // busy; 2-4 works
-  pipeline.run(nbThreads * 4);
-  tbb::tick_count t1 = tbb::tick_count::now();
-  LOG(INFO) << "time = " << (t1 - t0).seconds();
-
-  return 1;
+    return 1;
 }
 
 int main(int argc, char *argv[]) {
-  (void)(argc);
-  (void)(argv);
-  int retval = 0;
+    (void) (argc);
+    (void) (argv);
+    int retval = 0;
 
-  if (argc < 2) {
-    LOG(DEBUG) << "no arguments provided to scdaq, try --help";
-    return 1;
-  }
+    uint num_streams{};
+    std::string config_basename{};
 
-  if ((std::string(argv[1]) == "-h") || (std::string(argv[1]) == "--help")) {
-    LOG(DEBUG) << "HELP: expected arguments --config [configfilename]";
-    return 1;
-  }
+    if (argc < 2) {
+        LOG(DEBUG) << "no arguments provided to scdaq, try --help";
+        return 1;
+    }
 
-  if ((argc != 3)) {
-    LOG(ERROR) << "error occurred, number of arguments != 2, expected --config "
-                  "[configfilename] , try --help";
-    return 1;
-  }
+    if ((std::string(argv[1]) == "-h") || (std::string(argv[1]) == "--help")) {
+        LOG(DEBUG) << "HELP: expected arguments --config [config basename] --nstreams [number of parallel streams]";
+        return 1;
+    }
 
-  if (std::string(argv[1]) == "--config") {
-    LOG(DEBUG) << "scdaq started with conffile:   " << std::string(argv[2]);
-  } else {
-    LOG(ERROR) << "invalid argument, expected --config, see --help";
-    return 1;
-  }
-
-  try {
-    config conf(argv[2]);
-    conf.print();
-    tools::log::set_filter_min_severity(conf.getLogMinSeverity());
-    LOG(DEBUG) << "Configuration loaded";
-    ctrl control{};
-    //    tbb::tick_count mainStartTime = tbb::tick_count::now();
-
-    control.running = false;
-    control.run_number = 0;
-    control.max_file_size = conf.getOutputMaxFileSize();  // in Bytes
-    control.packets_per_report = static_cast<int>(conf.getPacketsPerReport());
-    control.output_force_write = conf.getOutputForceWrite();
-    control.n_orbits_per_packet = static_cast<int>(conf.getNOrbitsPerPacket());
-
-    // Firmware needs at least 1MB buffer for DMA
-    if (conf.getDmaPacketBufferSize() < 1024 * 1024) {
-      LOG(ERROR) << "dma_packet_buffer_size must be at least 1048576 bytes (1MB), but "
-                 << conf.getDmaPacketBufferSize()
-                 << " bytes was given. Check the configuration file.";
-      return 1;
+    if ((argc < 3)) {
+        LOG(ERROR)
+            << "error occurred, number of arguments is under 2, expected --config [config basename] (and optionally --nstreams [number of parallel streams]), try --help";
+        return 1;
+    }
+
+    if (std::string(argv[1]) == "--config") {
+        LOG(DEBUG) << "scdaq started with config files basename:   " << std::string(argv[2]);
+        config_basename.assign(argv[2]);
+    } else {
+        LOG(ERROR) << "invalid argument, expected --config, see --help";
+        return 1;
+    }
+
+    if (std::string(argv[3]) == "--nstreams") {
+        num_streams = static_cast<uint>(std::stoi(argv[4]));
+        LOG(DEBUG) << "scdaq started with number of parallel streams / config files:   " << std::string(argv[4]);
+        assert(num_streams <= NUM_PIPELINES); // Exceeds number of available pipelines; change directive and recompile.
+    } else {
+        LOG(WARNING) << "scdaq is implicitly processing a single stream configured at " << config_basename;
+        return 1;
+    }
+
+    std::vector<std::string> config_filenames{};
+
+
+    if (!num_streams) { // No number of streams specified, assumes single config file with provided basename
+        config_filenames.emplace_back(config_basename);
+    } else { // Assumes basename followed by a natural integer between 1..N streams
+        std::string::size_type dot_position = config_basename.find_last_of('.');
+        std::string base, postfix{};
+        if (dot_position == std::string::npos) {
+            base = config_basename;
+        } else {
+            base = config_basename.substr(0, dot_position);
+            postfix = "." + config_basename.substr(dot_position + 1, std::string::npos);
+        }
+
+        for (uint i = 0; i < num_streams; ++i) {
+            config_filenames.emplace_back(base);
+            config_filenames.back().append(std::to_string(i + 1)).append(postfix);
+        }
     }
 
-    boost::asio::io_service io_service;
-    server s(io_service, conf.getPortNumber(), control);
-    boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
 
-    uint nbThreads = conf.getNumThreads();
+    std::deque<DAQPipeline> pipes;
+
+    for (auto &filename : config_filenames) {
+        try {
+            std::cout << "Loading: " << filename << std::endl;
+            pipes.emplace_back(config(filename));
+            LOG(DEBUG) << "-Configuration loaded";
+            LOG(DEBUG) << "-Pipeline set up";
+        } catch (std::invalid_argument &e) {
+            LOG(FATAL) << "-Configuration invalid! Error text is \"" << e.what() << "\" Bailing out.";
+            return EX_CONFIG;
+        } catch (std::exception &e) {
+            LOG(ERROR) << "-Error occurred. error text is: \"" << e.what() << "\"";
+            return 1;
+        }
+    }
+
+
+    ////////////// I/O
+    boost::asio::io_service io_services[NUM_PIPELINES];
+    boost::thread ts[NUM_PIPELINES];
+
+    std::vector<server> servers;
+    for (uint i = 0; i < 1; ++i) {
+        servers.emplace_back(io_services[i], pipes[i].fConf.getPortNumber(), pipes[i].fControl);
+        ts[i] = boost::thread(boost::bind(&boost::asio::io_service::run, &io_services[i]));
+    }
+    uint nbThreads = pipes[0].fConf.getNumThreads(); // FIXME
+
 
     retval = 0;
     try {
-      tbb::task_scheduler_init init(static_cast<int>(nbThreads));
-      if (!run_pipeline(nbThreads, control, conf)) {
+        tbb::task_scheduler_init init(static_cast<int>(nbThreads));
+        if (!run_pipelines(nbThreads, pipes, num_streams)) {
+            retval = 1;
+            // Will terminate with error code set
+        }
+    } catch (std::exception &e) {
+        LOG(ERROR) << "Error in pipelines. Error text is: \"" << e.what() << "\"";
         retval = 1;
         // Will terminate with error code set
-      }
-    } catch (std::exception &e) {
-      LOG(ERROR) << "Error in pipelines. Error text is: \"" << e.what() << "\"";
-      retval = 1;
-      // Will terminate with error code set
-      LOG(INFO) << "Resetting board at http://" << conf.getSconeHost() << ":" << conf.getSconePort()
-                << "/" << conf.getSconeBoard() << " before exiting.. ";
-      int res = tools::reset_board(conf.getSconeHost(), conf.getSconePort(), conf.getSconeBoard());
-      if (res != 0) {
-        LOG(ERROR) << "Reset failed! Error code: " << res;
-      } else {
-        LOG(INFO) << "Successfully reset the board.";
-      }
+//        LOG(INFO) << "Resetting board at http://" << conf.getSconeHost() << ":" << conf.getSconePort()
+//                  << "/" << conf.getSconeBoard() << " before exiting.. ";
+//        int res = tools::reset_board(conf.getSconeHost(), conf.getSconePort(), conf.getSconeBoard());
+//        if (res != 0) {
+//            LOG(ERROR) << "Reset failed! Error code: " << res;
+//        } else {
+//            LOG(INFO) << "Successfully reset the board.";
+//        }
     }
 
     LOG(DEBUG) << "Terminating the internal TCP server.";
-    io_service.stop();
-    if (!t.try_join_for(boost::chrono::milliseconds(2000))) {
-      LOG(ERROR) << "Failed to terminate the internal TCP server, the program "
-                    "may crash on exit.";
-    } else {
-      LOG(DEBUG) << "Internal TCP server is terminated.";
+    for (int i = 0; i < num_streams; ++i) {
+        io_services[i].stop();
+        if (!ts[i].try_join_for(boost::chrono::milliseconds(2000))) {
+            LOG(ERROR) << "Failed to terminate the internal TCP server, the program "
+                          "may crash on exit.";
+        } else {
+            LOG(DEBUG) << "Internal TCP server is terminated.";
+        }
     }
 
+
     //    utility::report_elapsed_time((tbb::tick_count::now() -
     //    mainStartTime).seconds());
     return retval;
-  } catch (std::invalid_argument &e) {
-    LOG(FATAL) << "Configuration invalid! Error text is \"" << e.what() << "\" Bailing out.";
-    return EX_CONFIG;
-  } catch (std::exception &e) {
-    LOG(ERROR) << "Error occurred. error text is: \"" << e.what() << "\"";
-    return 1;
-  }
 }
-- 
GitLab