diff --git a/src/DaqPipeline.cc b/src/DaqPipeline.cc new file mode 100644 index 0000000000000000000000000000000000000000..d34a3e9a9f9cb2b69ad3914b94c5bb0fcf47f16b --- /dev/null +++ b/src/DaqPipeline.cc @@ -0,0 +1,2 @@ +#include "DaqPipeline.h" + diff --git a/src/DaqPipeline.h b/src/DaqPipeline.h new file mode 100644 index 0000000000000000000000000000000000000000..0ba10bdcc301f1e5ebde1302d020922ab6c5f073 --- /dev/null +++ b/src/DaqPipeline.h @@ -0,0 +1,245 @@ +#include <sysexits.h> + +#include <cctype> +#include <cstdio> +#include <cstdlib> +#include <cstring> +#include <iostream> +#include <memory> + +#include "tbb/pipeline.h" +#include "tbb/concurrent_queue.h" +#include "tbb/task_group.h" +#include "tbb/task_scheduler_init.h" +#include "tbb/tbb_allocator.h" +#include "tbb/tick_count.h" + +#include <boost/asio.hpp> +#include <boost/bind.hpp> +#include <boost/thread.hpp> + +#include "tools.h" +#include "format.h" +#include "config.h" +#include "server.h" +#include "controls.h" +#include "processor.h" +#include "elastico.h" +#include "InputFilter.h" +#include "OutputByOrbit.h" +#include "OutputBySize.h" +#include "TcpInputFilter.h" +#include "DmaInputFilter.h" +#include "MicronDmaInputFilter.h" +#include "FileDmaInputFilter.h" +#include "WZDmaInputFilter.h" + +class DAQPipeline { + friend class MultiPipeline; + using OutputFilter = tbb::filter; + + +public: + static ctrl control; + + config fConf; + + std::shared_ptr<InputFilter> fInputFilter; + StreamProcessor fStreamProcessor; + ElasticProcessor fElasticProcessor; + std::shared_ptr<OutputFilter> fOutputStream; + tbb::pipeline fPipeline; + +// void FromConfig(config conf) { +// +// } + + explicit DAQPipeline(config conf) + : fConf(conf), + fStreamProcessor(conf.getDmaPacketBufferSize(), conf.getDoZS(), conf.getProcessorType(), + conf.getNOrbitsPerPacket(), conf.getPrescaleFactor(), + conf.getCMSSWHeaders(), static_cast<uint16_t>(conf.getSourceID()), + control), + fElasticProcessor(conf.getDmaPacketBufferSize(), &control, conf.getElasticUrl(), + conf.getPtCut(), conf.getQualCut()) { + + + tools::log::set_filter_min_severity(fConf.getLogMinSeverity()); + fConf.print(); + const config::InputType &input = fConf.getInput(); + const size_t &nbPacketBuffers = fConf.getNumberOfDmaPacketBuffers(); + const size_t &packetBufferSize = fConf.getDmaPacketBufferSize(); + + if (input == config::InputType::DMA) { + fInputFilter = std::make_shared<DmaInputFilter>(fConf.getDmaDevice(), packetBufferSize, + nbPacketBuffers, control); + } else if (input == config::InputType::FILEDMA) { + // Create FILE DMA reader + fInputFilter = std::make_shared<FileDmaInputFilter>(fConf.getInputFile(), packetBufferSize, + nbPacketBuffers, control); + + } else if (input == config::InputType::WZDMA) { + // Create WZ DMA reader + fInputFilter = + std::make_shared<WZDmaInputFilter>(packetBufferSize, nbPacketBuffers, control, fConf); + + } else if (input == config::InputType::MICRONDMA) { + // create MicronDmaInputFilter reader + fInputFilter = std::make_shared<MicronDmaInputFilter>(packetBufferSize, nbPacketBuffers, + control, fConf); + + } else if (input == config::InputType::TCPIP) { + // create TcpInputFilter reader + if (fConf.getDev_TCPAutoReconnectOnFailure()) { + fInputFilter = std::make_shared<TcpInputFilter>( + fConf.getTcpDestPort(), packetBufferSize, nbPacketBuffers, fConf.getNOrbitsPerPacket(), + control, fConf); + + } else { + exit(2); + // throw std::invalid_argument( + // "Configuration error: enable developmentMode to use TCP input + // filter"); + } + } else { + exit(3); + // throw std::invalid_argument("Configuration error: Unknown input type was + // specified"); + } + + + // Add input reader to a pipeline + fPipeline.add_filter(*fInputFilter); + + // Create reformatter and add it to the pipeline + if (fConf.getEnableStreamProcessor()) { + fPipeline.add_filter(fStreamProcessor); + } + + // Create elastic populator (if requested) + if (fConf.getEnableElasticProcessor()) { + fPipeline.add_filter(fElasticProcessor); + } + + // Create file-writing stage and add it to the pipeline + if (fConf.getNOrbitsPerFile()) { + fOutputStream = std::make_shared<OutputByOrbitStream>(control, fConf); + } else { + fOutputStream = std::make_shared<OutputBySizeStream>( + fConf.getOutputFilenameBase(), fConf.getOutputFilenamePrefix(), DAQPipeline::control); + } + fPipeline.add_filter(*fOutputStream); + + // tbb::tick_count mainStartTime = tbb::tick_count::now(); + control.running = false; + control.run_number = 0; + control.max_file_size = fConf.getOutputMaxFileSize(); // in Bytes + control.packets_per_report = static_cast<int>(fConf.getPacketsPerReport()); + control.output_force_write = fConf.getOutputForceWrite(); + control.verbosity = fConf.getVerbosity(); + control.n_orbits_per_packet = static_cast<int>(fConf.getNOrbitsPerPacket()); + + // Firmware needs at least 1MB buffer for DMA + if (fConf.getDmaPacketBufferSize() < 1024 * 1024) { + LOG(ERROR) << "dma_packet_buffer_size must be at least 1048576 bytes (1MB), but " + << fConf.getDmaPacketBufferSize() + << " bytes was given. Check the configuration file."; + exit(1); + } + } +}; + +class MultiPipeline { +public: + std::deque<DAQPipeline> fPipelines; + + std::unique_ptr<tbb::task_scheduler_init> fSchedulerInit; + boost::asio::io_service io_service; + boost::thread fServerThread ; + std::unique_ptr<server> fServer; + uint fNbThreads = 0; + uint fNumStreams = 0; + + + bool fVerbose = true; + + int FromConfigs(std::vector<config> &configs) { + + for (auto &config: configs) { + try { + fPipelines.emplace_back(config); + LOG(DEBUG) << "-Configuration loaded"; + LOG(DEBUG) << "-Pipeline set up"; + } catch (std::invalid_argument &e) { + LOG(FATAL) << "-Configuration invalid! Error text is \"" << e.what() << "\" Bailing out."; + return EX_CONFIG; + } catch (std::exception &e) { + LOG(ERROR) << "-Error occurred. error text is: \"" << e.what() << "\""; + return 1; + } + } + return 0; + } + + void SetupService(uint numStreams) { + fServer = std::unique_ptr<server>(new server(io_service, fPipelines[0].fConf.getPortNumber(), DAQPipeline::control)); + fServerThread = boost::thread(boost::bind(&boost::asio::io_service::run, &io_service));; + fNbThreads = numStreams * fPipelines[0].fConf.getNumThreads(); // FIXME + fNumStreams = numStreams; + } + + int Run() { + int retval = 0; + + try { + fSchedulerInit = std::unique_ptr<tbb::task_scheduler_init>( + new tbb::task_scheduler_init(static_cast<int>(fNbThreads))); + if (!RunPipelines(fNumStreams)) { + retval = 1; // Will terminate with error code set + } + } catch (std::exception &e) { + config &conf = fPipelines.at(0).fConf; + LOG(ERROR) << "Error in pipelines. Error text is: \"" << e.what() << "\""; + retval = 1; // Will terminate with error code set + LOG(INFO) << "Resetting board at http://" << conf.getSconeHost() << ":" << + conf.getSconePort() + << "/" << conf.getSconeBoard() << " before exiting.. "; + int res = tools::reset_board(conf.getSconeHost(), conf.getSconePort(), + conf.getSconeBoard()); + if (res != 0) { + LOG(ERROR) << "Reset failed! Error code: " << res; + } else { + LOG(INFO) << "Successfully reset the board."; + } + } + + LOG(DEBUG) << "Terminating the internal TCP server."; + io_service.stop(); + if (!fServerThread.try_join_for(boost::chrono::milliseconds(2000))) { + LOG(ERROR) << "Failed to terminate the internal TCP server, the program " + "may crash on exit."; + } else { + LOG(DEBUG) << "Internal TCP server is terminated."; + } + + + return retval; + } + + + int RunPipelines(int numStreams) { + tbb::task_group group; + // Run pipelines and measure real time + tbb::tick_count t0 = tbb::tick_count::now(); + for (int i = 0; i < numStreams; ++i) { + group.run([i = i, this] { return this->fPipelines.at(i).fPipeline.run(this->fNbThreads * 4); }); + } + group.wait(); + tbb::tick_count t1 = tbb::tick_count::now(); + + if (fVerbose) { + LOG(INFO) << "time = " << (t1 - t0).seconds(); + } + return 1; + } +}; diff --git a/src/Makefile b/src/Makefile index 5f297d31cff8be01174e885319c0c40f7e02532a..fea4bceea76347f8eb4844ce5bb2bcee7cd1c94b 100644 --- a/src/Makefile +++ b/src/Makefile @@ -12,7 +12,7 @@ TARGET = scdaq # source files -SOURCES = config.cc DmaInputFilter.cc elastico.cc FileDmaInputFilter.cc InputFilter.cc MicronDmaInputFilter.cc OutputByOrbit.cc OutputBySize.cc OutputFileHandler.cc processor.cc scdaq.cc session.cc slice.cc TcpInputFilter.cc tools.cc WZDmaInputFilter.cc +SOURCES = config.cc DmaInputFilter.cc elastico.cc FileDmaInputFilter.cc InputFilter.cc MicronDmaInputFilter.cc OutputByOrbit.cc OutputBySize.cc OutputFileHandler.cc processor.cc scdaq.cc session.cc slice.cc TcpInputFilter.cc tools.cc WZDmaInputFilter.cc DaqPipeline.cc C_SOURCES = wz_dma.c # work out names of object files from sources @@ -33,13 +33,13 @@ LDFLAGS = -Llibmicron -ltbb -ltbbmalloc -lboost_thread -lboost_chrono -lcurl -lp CPPFLAGS = -I. -Iwzdma -cleanprogress: +clean_progress: rm -rf ${TEST_PROGRESS_DIR} TGT_CLEAN_PROG = cleanprogress # default target (to build all) -all: cleanprogress ${TARGET} +all: ${TARGET} # clean target clean: @@ -64,7 +64,7 @@ ${TARGET}: ${OBJECTS} #test2.o : product.h test2.h -scdaq.o: processor.h elastico.h OutputBySize.h OutputByOrbit.h format.h server.h controls.h config.h session.h log.h +scdaq.o: processor.h elastico.h OutputBySize.h OutputByOrbit.h DaqPipeline.h format.h server.h controls.h config.h session.h log.h config.o: config.h log.h DmaInputFilter.o: DmaInputFilter.h slice.h elastico.o: elastico.h format.h slice.h controls.h log.h @@ -81,3 +81,4 @@ TcpInputFilter.o: TcpInputFilter.h InputFilter.h tools.h log.h wz_dma.o: wz_dma.h MicronDmaInputFilter.o: MicronDmaInputFilter.h tools.o: tools.h log.h +DaqPipeline.o: DaqPipeline.h diff --git a/src/TcpInputFilter.cc b/src/TcpInputFilter.cc index 6c24a5ebf19dd32b2bb31ad04659a80231d0ae8f..82ab6cb199ab7be94bc0b22edaa0bd380630486a 100644 --- a/src/TcpInputFilter.cc +++ b/src/TcpInputFilter.cc @@ -39,11 +39,11 @@ TcpInputFilter::TcpInputFilter(int destPort, size_t packetBufferSize, size_t nbP bzero(&(server_addr_.sin_zero), 8); // bind socket to address - if (bind(sock_, (struct sockaddr *)&server_addr_, sizeof(struct sockaddr)) == -1) { - throw std::system_error(errno, std::system_category(), "Unable to bind."); + if (bind(sock_, (struct sockaddr *)&server_addr_, sizeof(struct sockaddr)) == -1) { + throw std::system_error(errno, std::system_category(), "Unable to bind."); } - // check if listen is possible + // check if listen is possible if (listen(sock_, 1) == -1) { throw std::system_error(errno, std::system_category(), "Unable to listen."); } diff --git a/src/scdaq.cc b/src/scdaq.cc index f6ad96c5cdbf3dea3bae84d6df7dd07d85d5b10d..dfc1e824e3b148f16af28a109bec895d407a63a5 100644 --- a/src/scdaq.cc +++ b/src/scdaq.cc @@ -1,301 +1,96 @@ -#include <sysexits.h> - -#include <boost/asio.hpp> -#include <boost/bind.hpp> -#include <boost/thread.hpp> -#include <cctype> -#include <cstdio> -#include <cstdlib> #include <cstring> #include <iostream> -#include <string> +#include <memory> -#include "DmaInputFilter.h" -#include "FileDmaInputFilter.h" -#include "InputFilter.h" -#include "MicronDmaInputFilter.h" -#include "OutputByOrbit.h" -#include "OutputBySize.h" -#include "TcpInputFilter.h" -#include "WZDmaInputFilter.h" #include "config.h" -#include "controls.h" -#include "elastico.h" -#include "format.h" #include "log.h" -#include "processor.h" -#include "server.h" -#include "tbb/concurrent_queue.h" -#include "tbb/pipeline.h" -#include "tbb/task_group.h" -#include "tbb/task_scheduler_init.h" -#include "tbb/tbb_allocator.h" -#include "tbb/tick_count.h" -#include "tools.h" - -using namespace std; +#include "DaqPipeline.h" #define NUM_PIPELINES 10 -bool silent = false; - -class DAQPipeline { - public: - config fConf; - ctrl fControl{}; - - std::shared_ptr<InputFilter> fInputFilter; - StreamProcessor fStreamProcessor; - ElasticProcessor fElasticProcessor; - std::shared_ptr<tbb::filter> fOutputStream; - tbb::pipeline fPipeline; - - explicit DAQPipeline(config conf) - : fConf(conf), - fStreamProcessor(conf.getDmaPacketBufferSize(), conf.getDoZS(), conf.getProcessorType(), - conf.getNOrbitsPerPacket(), conf.getPrescaleFactor(), - conf.getCMSSWHeaders(), static_cast<uint16_t>(conf.getSourceID()), - fControl), - fElasticProcessor(conf.getDmaPacketBufferSize(), &fControl, conf.getElasticUrl(), - conf.getPtCut(), conf.getQualCut()) { - tools::log::set_filter_min_severity(fConf.getLogMinSeverity()); - fConf.print(); - const config::InputType &input = fConf.getInput(); - const size_t &nbPacketBuffers = fConf.getNumberOfDmaPacketBuffers(); - const size_t &packetBufferSize = fConf.getDmaPacketBufferSize(); - - if (input == config::InputType::DMA) { - fInputFilter = std::make_shared<DmaInputFilter>(fConf.getDmaDevice(), packetBufferSize, - nbPacketBuffers, fControl); - } else if (input == config::InputType::FILEDMA) { - // Create FILE DMA reader - fInputFilter = std::make_shared<FileDmaInputFilter>(fConf.getInputFile(), packetBufferSize, - nbPacketBuffers, fControl); - - } else if (input == config::InputType::WZDMA) { - // Create WZ DMA reader - fInputFilter = - std::make_shared<WZDmaInputFilter>(packetBufferSize, nbPacketBuffers, fControl, fConf); - - } else if (input == config::InputType::MICRONDMA) { - // create MicronDmaInputFilter reader - fInputFilter = std::make_shared<MicronDmaInputFilter>(packetBufferSize, nbPacketBuffers, - fControl, fConf); - - } else if (input == config::InputType::TCPIP) { - // create TcpInputFilter reader - if (fConf.getDev_TCPAutoReconnectOnFailure()) { - fInputFilter = std::make_shared<TcpInputFilter>( - fConf.getTcpDestPort(), packetBufferSize, nbPacketBuffers, fConf.getNOrbitsPerPacket(), - fControl, fConf); - - } else { - exit(2); - // throw std::invalid_argument( - // "Configuration error: enable developmentMode to use TCP input - // filter"); - } - } else { - exit(3); - // throw std::invalid_argument("Configuration error: Unknown input type was - // specified"); - } - - // Add input reader to a pipeline - fPipeline.add_filter(*fInputFilter); - - // Create reformatter and add it to the pipeline - if (fConf.getEnableStreamProcessor()) { - fPipeline.add_filter(fStreamProcessor); - } - - // Create elastic populator (if requested) - if (fConf.getEnableElasticProcessor()) { - fPipeline.add_filter(fElasticProcessor); - } - - // Create file-writing stage and add it to the pipeline - if (fConf.getNOrbitsPerFile()) { - fOutputStream = std::make_shared<OutputByOrbitStream>(fControl, fConf); +void LoadConfigs(std::string &basename, uint numStreams, std::vector<config> &out_configs) { + if (!numStreams) { + // No number of streams specified, assumes unique config file with provided basename + out_configs.emplace_back(basename); } else { - fOutputStream = std::make_shared<OutputBySizeStream>( - fConf.getOutputFilenameBase(), fConf.getOutputFilenamePrefix(), fControl); + // Assumes basename followed by a natural integer between 1..N streams + std::string::size_type dot_position = basename.find_last_of('.'); + std::string base, postfix{}; + if (dot_position == std::string::npos) { + base = basename; + } else { + base = basename.substr(0, dot_position); + postfix = "." + basename.substr(dot_position + 1, std::string::npos); + } + + for (uint i = 0; i < numStreams; ++i) { + out_configs.emplace_back(base + std::to_string(i + 1) + postfix); + } } - fPipeline.add_filter(*fOutputStream); - - // tbb::tick_count mainStartTime = tbb::tick_count::now(); - fControl.running = false; - fControl.run_number = 0; - fControl.max_file_size = fConf.getOutputMaxFileSize(); // in Bytes - fControl.packets_per_report = static_cast<int>(fConf.getPacketsPerReport()); - fControl.output_force_write = fConf.getOutputForceWrite(); - fControl.verbosity = fConf.getVerbosity(); - fControl.n_orbits_per_packet = static_cast<int>(fConf.getNOrbitsPerPacket()); - - // Firmware needs at least 1MB buffer for DMA - if (fConf.getDmaPacketBufferSize() < 1024 * 1024) { - LOG(ERROR) << "dma_packet_buffer_size must be at least 1048576 bytes (1MB), but " - << fConf.getDmaPacketBufferSize() - << " bytes was given. Check the configuration file."; - exit(1); - } - } -}; - -int run_pipelines(uint nbThreads, std::deque<DAQPipeline> &pipelines, int numStreams) { - tbb::task_group group; - // Run the pipeline - tbb::tick_count t0 = tbb::tick_count::now(); - for (int i = 0; i < numStreams; ++i) { - // Need more than one token in flight per thread to keep all threads - // busy; 2-4 works - // try { - // pipelines.at(i).fPipeline.run(nbThreads * 4); - // } catch (std::out_of_range const &e) { - // std::cout << e.what() << std::endl; - // } - group.run( - [&pipelines, i = i, &nbThreads] { return pipelines.at(i).fPipeline.run(nbThreads * 4); }); - } - group.wait(); - tbb::tick_count t1 = tbb::tick_count::now(); - - if (!silent) { - LOG(INFO) << "time = " << (t1 - t0).seconds(); - } - - return 1; } -int main(int argc, char *argv[]) { - (void)(argc); - (void)(argv); - int retval = 0; - - uint num_streams{}; - std::string config_basename{}; - - if (argc < 2) { - LOG(DEBUG) << "no arguments provided to scdaq, try --help"; - return 1; - } - if ((std::string(argv[1]) == "-h") || (std::string(argv[1]) == "--help")) { - LOG(DEBUG) << "HELP: expected arguments --config [config basename] --nstreams [number of " - "parallel streams]"; - return 1; - } +ctrl DAQPipeline::control; - if ((argc < 3)) { - LOG(ERROR) << "error occurred, number of arguments is under 2, expected --config [config " - "basename] (and optionally --nstreams [number of parallel streams]), try --help"; - return 1; - } - - if (std::string(argv[1]) == "--config") { - LOG(DEBUG) << "scdaq started with config files basename: " << std::string(argv[2]); - config_basename.assign(argv[2]); - } else { - LOG(ERROR) << "invalid argument, expected --config, see --help"; - return 1; - } - - if (std::string(argv[3]) == "--nstreams") { - num_streams = static_cast<uint>(std::stoi(argv[4])); - LOG(DEBUG) << "scdaq started with number of parallel streams / config files: " - << std::string(argv[4]); - assert( - num_streams <= - NUM_PIPELINES); // Exceeds number of available pipelines; change directive and recompile. - } else { - LOG(WARNING) << "scdaq is implicitly processing a single stream configured at " - << config_basename; - return 1; - } - - std::vector<std::string> config_filenames{}; +int main(int argc, char *argv[]) { + (void) (argc); + (void) (argv); - if (!num_streams) { // No number of streams specified, assumes single config file with provided - // basename - config_filenames.emplace_back(config_basename); - } else { // Assumes basename followed by a natural integer between 1..N streams - std::string::size_type dot_position = config_basename.find_last_of('.'); - std::string base, postfix{}; - if (dot_position == std::string::npos) { - base = config_basename; - } else { - base = config_basename.substr(0, dot_position); - postfix = "." + config_basename.substr(dot_position + 1, std::string::npos); + if (argc < 2) { + LOG(DEBUG) << "no arguments provided to scdaq, try --help"; + return 1; } - for (uint i = 0; i < num_streams; ++i) { - config_filenames.emplace_back(base); - config_filenames.back().append(std::to_string(i + 1)).append(postfix); + if ((std::string(argv[1]) == "-h") || (std::string(argv[1]) == "--help")) { + LOG(DEBUG) << "HELP: expected arguments --config [config basename] --nstreams [number of " + "parallel streams]"; + return 1; } - } - - std::deque<DAQPipeline> pipes; - for (auto &filename : config_filenames) { - try { - std::cout << "Loading: " << filename << std::endl; - pipes.emplace_back(config(filename)); - LOG(DEBUG) << "-Configuration loaded"; - LOG(DEBUG) << "-Pipeline set up"; - } catch (std::invalid_argument &e) { - LOG(FATAL) << "-Configuration invalid! Error text is \"" << e.what() << "\" Bailing out."; - return EX_CONFIG; - } catch (std::exception &e) { - LOG(ERROR) << "-Error occurred. error text is: \"" << e.what() << "\""; - return 1; + if ((argc < 3)) { + LOG(ERROR) << "error occurred, number of arguments is under 2, expected --config [config " + "basename] (and optionally --nstreams [number of parallel streams]), try --help"; + return 1; } - } - ////////////// I/O - boost::asio::io_service io_services[NUM_PIPELINES]; - boost::thread ts[NUM_PIPELINES]; + uint num_streams; + std::string config_basename{}; - std::vector<server> servers; - for (uint i = 0; i < num_streams; ++i) { - servers.emplace_back(io_services[i], pipes[i].fConf.getTcpDestPort(), pipes[i].fControl); - ts[i] = boost::thread(boost::bind(&boost::asio::io_service::run, &io_services[i])); - } - uint nbThreads = num_streams * pipes[0].fConf.getNumThreads(); // FIXME - - retval = 0; - try { - tbb::task_scheduler_init init(static_cast<int>(nbThreads)); - if (!run_pipelines(nbThreads, pipes, num_streams)) { - retval = 1; - // Will terminate with error code set + if (std::string(argv[1]) == "--config") { + LOG(DEBUG) << "scdaq started with config files basename: " << std::string(argv[2]); + config_basename.assign(argv[2]); + } else { + LOG(ERROR) << "invalid argument, expected --config, see --help"; + return 1; } - } catch (std::exception &e) { - LOG(ERROR) << "Error in pipelines. Error text is: \"" << e.what() << "\""; - retval = 1; - // Will terminate with error code set - // LOG(INFO) << "Resetting board at http://" << conf.getSconeHost() << ":" << - // conf.getSconePort() - // << "/" << conf.getSconeBoard() << " before exiting.. "; - // int res = tools::reset_board(conf.getSconeHost(), conf.getSconePort(), - // conf.getSconeBoard()); if (res != 0) { - // LOG(ERROR) << "Reset failed! Error code: " << res; - // } else { - // LOG(INFO) << "Successfully reset the board."; - // } - } - LOG(DEBUG) << "Terminating the internal TCP server."; - for (int i = 0; i < num_streams; ++i) { - io_services[i].stop(); - if (!ts[i].try_join_for(boost::chrono::milliseconds(2000))) { - LOG(ERROR) << "Failed to terminate the internal TCP server, the program " - "may crash on exit."; + if (std::string(argv[3]) == "--nstreams") { + num_streams = static_cast<uint>(std::stoi(argv[4])); + LOG(DEBUG) << "scdaq started with number of parallel streams / config files: " + << std::string(argv[4]); + // Check if `nstreams` exceeds number of available pipelines --> recompile with a larger NUM_PIPELINES. + assert( + num_streams <= + NUM_PIPELINES); } else { - LOG(DEBUG) << "Internal TCP server is terminated."; + LOG(WARNING) << "scdaq is implicitly processing a single stream configured at " + << config_basename; + return 1; } - } - // utility::report_elapsed_time((tbb::tick_count::now() - - // mainStartTime).seconds()); - return retval; + // Collect loaded config instances from configuration files + std::vector<config> configs{}; + LoadConfigs(config_basename, num_streams, configs); + + // Instantiate multiple pipelines (up to the directive-defined limit) with the settings from the config instances. + MultiPipeline multi_pipeline; + multi_pipeline.FromConfigs(configs); + // Spin up servers and spawn threads + multi_pipeline.SetupService(num_streams); + // Launch blocking call to run the pipelines in parallel until all streams are severed + int ret = multi_pipeline.Run(); + + // utility::report_elapsed_time((tbb::tick_count::now() - + // mainStartTime).seconds()); + return ret; } diff --git a/src/tcp-test.conf b/src/tcp-test.conf new file mode 100644 index 0000000000000000000000000000000000000000..6b2c4b22d64b69b2694231aa60c6bae068f4ffd0 --- /dev/null +++ b/src/tcp-test.conf @@ -0,0 +1,136 @@ +################################################################################ +## +## Input settings +## +################################################################################ + +# Input settings, allowed values are: +# "wzdma" for DMA driver from Wojciech M. Zabolotny +# "dma" for XILINX DMA driver +# "filedma" for reading from file and simulating DMA +# "micronDMA" for PICO driver +# "tcpip" for TCP/IP input receving + +input:filedma + +## Settings for DMA input + +# DMA device +dma_dev:/dev/xdma0_c2h_0 + +# Max received packet size in bytes (buffer to reserve) +dma_packet_buffer_size:1261568 + +# Number of packet buffers to allocate +dma_number_of_packet_buffers:1000 + +# Print report each N packets, use 0 to disable +packets_per_report:2000 + +# number of orbits per packet, in decimal +nOrbitsPerPacket:1 + +# prescale factor, used for *calo* data only +prescale_factor:1 + +## Extra settings for "filedma" input + +#input_file:/dev/shm/testdata.bin +#input_file:/home/glazzari/repos/scdaq/test/data/passthrough_test1.dat +input_file:./test/data/calo_testfile.dat + +## Extra settings for "tcpip" input +tcpDestPort:9000 + + +################################################################################ +## +## Stream processor settings +## +################################################################################ + +enable_stream_processor:yes + +# Define processing type (unpacking), allowed values are: +# "PASS_THROUGH" +# "GMT" +# "CALO" +# Note: When changing the processing type, change also "output_filename_prefix" +# in the file output section. +# +processor_type:PASS_THROUGH + +# Enable software zero-supression +doZS:yes + +################################################################################ +## +## File output settings +## +################################################################################ + +output_filename_prefix:tcp-single-scouting_gmt_1 + +output_filename_base:test/in_progress + +max_file_size:8589934592 + +# Always write data to a file regardless of the run status, useful for debugging +output_force_write:no + + +################################################################################ +## +## Elastics processor settings (obsolete) +## +################################################################################ + +enable_elastic_processor:no + +port:8000 +elastic_url:http://something.somewhere +pt_cut:7 +quality_cut:12 + + +################################################################################ +## +## SCDAQ Generic Settings +## +################################################################################ + +# enable development functionalities (e.g. TCP input filter) +dev_TCPAutoReconnectOnFailure:true + +## Logging, supported LOG severities: +# TRACE +# DEBUG +# INFO +# WARNING +# ERROR +# FATAL +# +# Log only severities at the same level or more severe than the log_min_severity +# Use TRACE to log everything +# +log_min_severity:TRACE + +# Pipeline settings +threads:8 + +# verbosity level, currently supports 0 and 1 +verbosity:0 + +# N orbits to store to each file +# Configured to store fixed number of orbits per file when nOrbitsPerFile > 1 +# Set to 0 to use fixed file size instead +nOrbitsPerFile:4096 + +# Headers for cmssw support +cmsswHeaders:no + +## Information necessary to issue a reset request for the board +scone_host:localhost +scone_port:8080 +# Currently can be one of kcu1500_ugmt and kcu1500_demux +scone_board:vcu128_bmtf_tcp diff --git a/src/tcp-test1.conf b/src/tcp-test1.conf new file mode 100644 index 0000000000000000000000000000000000000000..0e6741e9bd2787bf29d0b78219a94dbacbf299c3 --- /dev/null +++ b/src/tcp-test1.conf @@ -0,0 +1,136 @@ +################################################################################ +## +## Input settings +## +################################################################################ + +# Input settings, allowed values are: +# "wzdma" for DMA driver from Wojciech M. Zabolotny +# "dma" for XILINX DMA driver +# "filedma" for reading from file and simulating DMA +# "micronDMA" for PICO driver +# "tcpip" for TCP/IP input receving + +input:tcpip + +## Settings for DMA input + +# DMA device +dma_dev:/dev/xdma0_c2h_0 + +# Max received packet size in bytes (buffer to reserve) +dma_packet_buffer_size:1261568 + +# Number of packet buffers to allocate +dma_number_of_packet_buffers:1000 + +# Print report each N packets, use 0 to disable +packets_per_report:2000 + +# number of orbits per packet, in decimal +nOrbitsPerPacket:1 + +# prescale factor, used for *calo* data only +prescale_factor:1 + +## Extra settings for "filedma" input + +#input_file:/dev/shm/testdata.bin +#input_file:/home/glazzari/repos/scdaq/test/data/passthrough_test1.dat +input_file:./test/data/calo_testfile.dat + +## Extra settings for "tcpip" input +tcpDestPort:10000 + + +################################################################################ +## +## Stream processor settings +## +################################################################################ + +enable_stream_processor:yes + +# Define processing type (unpacking), allowed values are: +# "PASS_THROUGH" +# "GMT" +# "CALO" +# Note: When changing the processing type, change also "output_filename_prefix" +# in the file output section. +# +processor_type:PASS_THROUGH + +# Enable software zero-supression +doZS:yes + +################################################################################ +## +## File output settings +## +################################################################################ + +output_filename_prefix:tcp-mp1-scouting_calo_1 + +output_filename_base:test/in_progress + +max_file_size:8589934592 + +# Always write data to a file regardless of the run status, useful for debugging +output_force_write:no + + +################################################################################ +## +## Elastics processor settings (obsolete) +## +################################################################################ + +enable_elastic_processor:no + +port:8000 +elastic_url:http://something.somewhere +pt_cut:7 +quality_cut:12 + + +################################################################################ +## +## SCDAQ Generic Settings +## +################################################################################ + +# enable development functionalities (e.g. TCP input filter) +dev_TCPAutoReconnectOnFailure:true + +## Logging, supported LOG severities: +# TRACE +# DEBUG +# INFO +# WARNING +# ERROR +# FATAL +# +# Log only severities at the same level or more severe than the log_min_severity +# Use TRACE to log everything +# +log_min_severity:TRACE + +# Pipeline settings +threads:8 + +# verbosity level, currently supports 0 and 1 +verbosity:0 + +# N orbits to store to each file +# Configured to store fixed number of orbits per file when nOrbitsPerFile > 1 +# Set to 0 to use fixed file size instead +nOrbitsPerFile:4096 + +# Headers for cmssw support +cmsswHeaders:no + +## Information necessary to issue a reset request for the board +scone_host:localhost +scone_port:8080 +# Currently can be one of kcu1500_ugmt and kcu1500_demux +scone_board:vcu128_bmtf_tcp diff --git a/src/tcp-test2.conf b/src/tcp-test2.conf new file mode 100644 index 0000000000000000000000000000000000000000..270c25326e6e1db63562fa83e7b8e029599a5823 --- /dev/null +++ b/src/tcp-test2.conf @@ -0,0 +1,137 @@ +################################################################################ +## +## Input settings +## +################################################################################ + +# Input settings, allowed values are: +# "wzdma" for DMA driver from Wojciech M. Zabolotny +# "dma" for XILINX DMA driver +# "filedma" for reading from file and simulating DMA +# "micronDMA" for PICO driver +# "tcpip" for TCP/IP input receving + +input:tcpip + +## Settings for DMA input + +# DMA device +dma_dev:/dev/xdma0_c2h_0 + +# Max received packet size in bytes (buffer to reserve) +dma_packet_buffer_size:1261568 + +# Number of packet buffers to allocate +dma_number_of_packet_buffers:1000 + +# Print report each N packets, use 0 to disable +packets_per_report:2000 + +# number of orbits per packet, in decimal +nOrbitsPerPacket:1 + +# prescale factor, used for *calo* data only +prescale_factor:1 + +## Extra settings for "filedma" input + +#input_file:/dev/shm/testdata.bin +#input_file:/home/glazzari/repos/scdaq/test/data/passthrough_test2.dat +input_file:./test/data/gmt_testfile2.dat + +## Extra settings for "tcpip" input +tcpDestPort:10010 + + +################################################################################ +## +## Stream processor settings +## +################################################################################ + +enable_stream_processor:yes + +# Define processing type (unpacking), allowed values are: +# "PASS_THROUGH" +# "GMT" +# "CALO" +# Note: When changing the processing type, change also "output_filename_prefix" +# in the file output section. +# +processor_type:PASS_THROUGH +#GMT + +# Enable software zero-supression +doZS:yes + +################################################################################ +## +## File output settings +## +################################################################################ + +output_filename_prefix:tcp-mp2-scouting_gmt + +output_filename_base:test/in_progress + +max_file_size:8589934592 + +# Always write data to a file regardless of the run status, useful for debugging +output_force_write:no + + +################################################################################ +## +## Elastics processor settings (obsolete) +## +################################################################################ + +enable_elastic_processor:no + +port:8000 +elastic_url:http://something.somewhere +pt_cut:7 +quality_cut:12 + + +################################################################################ +## +## SCDAQ Generic Settings +## +################################################################################ + +# enable development functionalities (e.g. TCP input filter) +dev_TCPAutoReconnectOnFailure:true + +## Logging, supported LOG severities: +# TRACE +# DEBUG +# INFO +# WARNING +# ERROR +# FATAL +# +# Log only severities at the same level or more severe than the log_min_severity +# Use TRACE to log everything +# +log_min_severity:TRACE + +# Pipeline settings +threads:8 + +# verbosity level, currently supports 0 and 1 +verbosity:0 + +# N orbits to store to each file +# Configured to store fixed number of orbits per file when nOrbitsPerFile > 1 +# Set to 0 to use fixed file size instead +nOrbitsPerFile:4096 + +# Headers for cmssw support +cmsswHeaders:no + +## Information necessary to issue a reset request for the board +scone_host:localhost +scone_port:8080 +# Currently can be one of kcu1500_ugmt and kcu1500_demux +scone_board:vcu128_bmtf_tcp