Code owners
Assign users and groups as approvers for specific file changes. Learn more.
scdaq.cc 6.77 KiB
#include "tbb/pipeline.h"
#include "tbb/tick_count.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/tbb_allocator.h"
#include "tbb/concurrent_queue.h"
#include <cstring>
#include <cstdlib>
#include <cstdio>
#include <cctype>
#include <string>
#include <iostream>
#include <sysexits.h>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include "InputFilter.h"
#include "FileDmaInputFilter.h"
#include "WZDmaInputFilter.h"
#include "MicronDmaInputFilter.h"
#include "DmaInputFilter.h"
#include "processor.h"
#include "elastico.h"
#include "OutputBySize.h"
#include "OutputByOrbit.h"
#include "format.h"
#include "server.h"
#include "controls.h"
#include "config.h"
#include "log.h"
using namespace std;
bool silent = false;
int run_pipeline( int nbThreads, ctrl& control, config& conf )
{
config::InputType input = conf.getInput();
size_t packetBufferSize = conf.getDmaPacketBufferSize();
size_t nbPacketBuffers = conf.getNumberOfDmaPacketBuffers();
uint32_t prescaleFactor = conf.getPrescaleFactor();
// Create empty input reader, will assign later when we know what is the data source
std::shared_ptr<InputFilter> input_filter;
std::shared_ptr<OutputByOrbitStream> output_stream_by_orbit;
std::shared_ptr<OutputBySizeStream> output_stream_by_size;
// Create the pipeline
tbb::pipeline pipeline;
if (input == config::InputType::DMA) {
// Create DMA reader
input_filter = std::make_shared<DmaInputFilter>( conf.getDmaDevice(), packetBufferSize, nbPacketBuffers, control );
} else if (input == config::InputType::FILEDMA) {
// Create FILE DMA reader
input_filter = std::make_shared<FileDmaInputFilter>( conf.getInputFile(), packetBufferSize, nbPacketBuffers, control );
} else if (input == config::InputType::WZDMA ) {
// Create WZ DMA reader
input_filter = std::make_shared<WZDmaInputFilter>( packetBufferSize, nbPacketBuffers, control );
} else if (input == config::InputType::MICRONDMA ) {
// create MicronDmaInputFilter reader
input_filter = std::make_shared<MicronDmaInputFilter>(packetBufferSize, nbPacketBuffers, control, conf) ;
} else {
throw std::invalid_argument("Configuration error: Unknown input type was specified");
}
// Add input reader to a pipeline
pipeline.add_filter( *input_filter );
// Create reformatter and add it to the pipeline
// TODO: Created here so we are not subject of scoping, fix later...
StreamProcessor stream_processor(packetBufferSize, conf.getDoZS(), conf.getProcessorType(), conf.getNOrbitsPerDMAPacket(), prescaleFactor, control);
if ( conf.getEnableStreamProcessor() ) {
pipeline.add_filter( stream_processor );
}
// Create elastic populator (if requested)
std::string url = conf.getElasticUrl();
// TODO: Created here so we are not subject of scoping, fix later...
ElasticProcessor elastic_processor(packetBufferSize,
&control,
url,
conf.getPtCut(),
conf.getQualCut());
if ( conf.getEnableElasticProcessor() ) {
pipeline.add_filter(elastic_processor);
}
// Create file-writing stage and add it to the pipeline
if ( conf.getNOrbitsPerFile() ){
output_stream_by_orbit = std::make_shared<OutputByOrbitStream>( conf.getOutputFilenameBase(), conf.getOutputFilenamePrefix(), control, conf );
pipeline.add_filter( *output_stream_by_orbit );
}else{
output_stream_by_size = std::make_shared<OutputBySizeStream>( conf.getOutputFilenameBase(), conf.getOutputFilenamePrefix(), control);
pipeline.add_filter( *output_stream_by_size );
}
// Run the pipeline
tbb::tick_count t0 = tbb::tick_count::now();
// Need more than one token in flight per thread to keep all threads
// busy; 2-4 works
pipeline.run( nbThreads * 4 );
tbb::tick_count t1 = tbb::tick_count::now();
if ( !silent ) {
LOG(INFO) << "time = " << (t1-t0).seconds();
}
return 1;
}
int main( int argc, char* argv[] ) {
(void)(argc);
(void)(argv);
int retval = 0;
if(argc < 2){
LOG(DEBUG) << "no arguments provided to scdaq, try --help";
return 1;
}
if((std::string(argv[1]) == "-h") || (std::string(argv[1]) == "--help")){
LOG(DEBUG) << "HELP: expected arguments --config [configfilename]";
return 1;
}
if((argc != 3)){
LOG(ERROR) << "error occurred, number of arguments != 2, expected --config [configfilename] , try --help";
return 1;
}
if (std::string(argv[1]) == "--config"){
LOG(DEBUG) << "scdaq started with conffile: " << std::string(argv[2]);
}else{
LOG(ERROR) << "invalid argument, expected --config, see --help";
return 1;
}
try {
config conf(argv[2]);
conf.print();
LOG(DEBUG) << "Configuration loaded";
ctrl control;
// tbb::tick_count mainStartTime = tbb::tick_count::now();
control.running = false;
control.run_number = 0;
control.orbit_trailer_error_count = 0;
control.packet_count = 0;
control.max_file_size = conf.getOutputMaxFileSize();//in Bytes
control.packets_per_report = conf.getPacketsPerReport();
control.output_force_write = conf.getOutputForceWrite();
control.n_orbits_per_dma_packet = conf.getNOrbitsPerDMAPacket();
control.verbosity = conf.getVerbosity();
control.excessOrbitsPerPacketCount = 0;
// Firmware needs at least 1MB buffer for DMA
if (conf.getDmaPacketBufferSize() < 1024*1024) {
LOG(ERROR) << "dma_packet_buffer_size must be at least 1048576 bytes (1MB), but " << conf.getDmaPacketBufferSize() << " bytes was given. Check the configuration file.";
return 1;
}
boost::asio::io_service io_service;
server s(io_service, conf.getPortNumber(), control);
boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
int nbThreads = conf.getNumThreads();
retval = 0;
try {
tbb::task_scheduler_init init( nbThreads );
if (!run_pipeline (nbThreads, control, conf)) {
retval = 1;
// Will terminate with errorocde set
}
} catch(std::exception& e) {
LOG(ERROR) << "Error in pipelines. Error text is: \"" << e.what() << "\"";
retval = 1;
// Will terminate with errorocde set
}
LOG(DEBUG) << "Terminating the internal TCP server.";
io_service.stop();
if (!t.try_join_for(boost::chrono::milliseconds(2000))) {
LOG(ERROR) << "Failed to terminate the internal TCP server, the program may crash on exit.";
} else {
LOG(DEBUG) << "Internal TCP server is terminated.";
}
// utility::report_elapsed_time((tbb::tick_count::now() - mainStartTime).seconds());
return retval;
} catch (std::exception& e) {
LOG(FATAL) << "Configuration invalid! Error text is \"" << e.what() << "\" Bailing out.";
return EX_CONFIG;
} catch(std::exception& e) {
LOG(ERROR) << "Error occurred. error text is: \"" << e.what() << "\"";
return 1;
}
}