Skip to content
Snippets Groups Projects
Commit 1a8c346d authored by Petr Zejdl's avatar Petr Zejdl
Browse files

Changing pointers to references

parent 55b9fa80
No related branches found
No related tags found
1 merge request!59CMSSW json file
...@@ -39,12 +39,12 @@ using namespace std; ...@@ -39,12 +39,12 @@ using namespace std;
bool silent = false; bool silent = false;
int run_pipeline( int nthreads, ctrl& control, config *conf) int run_pipeline( int nbThreads, ctrl& control, config& conf )
{ {
config::InputType input = conf->getInput(); config::InputType input = conf.getInput();
size_t MAX_BYTES_PER_INPUT_SLICE = conf->getDmaPacketBufferSize(); size_t MAX_BYTES_PER_INPUT_SLICE = conf.getDmaPacketBufferSize();
size_t TOTAL_SLICES = conf->getNumberOfDmaPacketBuffers(); size_t TOTAL_SLICES = conf.getNumberOfDmaPacketBuffers();
// Create empty input reader, will assing later when we know what is the data source // Create empty input reader, will assing later when we know what is the data source
std::shared_ptr<InputFilter> input_filter; std::shared_ptr<InputFilter> input_filter;
...@@ -54,20 +54,20 @@ int run_pipeline( int nthreads, ctrl& control, config *conf) ...@@ -54,20 +54,20 @@ int run_pipeline( int nthreads, ctrl& control, config *conf)
if (input == config::InputType::FILE) { if (input == config::InputType::FILE) {
// Create file-reading writing stage and add it to the pipeline // Create file-reading writing stage and add it to the pipeline
MAX_BYTES_PER_INPUT_SLICE = 192*conf->getBlocksPerInputBuffer(); MAX_BYTES_PER_INPUT_SLICE = 192*conf.getBlocksPerInputBuffer();
TOTAL_SLICES = conf->getNumInputBuffers(); TOTAL_SLICES = conf.getNumInputBuffers();
//input_filter = std::make_shared<FileInputFilter>( conf->getInputFile(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES ); //input_filter = std::make_shared<FileInputFilter>( conf.getInputFile(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES );
throw std::runtime_error("input type FILE is temporarily not supported"); throw std::runtime_error("input type FILE is temporarily not supported");
} else if (input == config::InputType::DMA) { } else if (input == config::InputType::DMA) {
// Create DMA reader // Create DMA reader
//input_filter = std::make_shared<DmaInputFilter>( conf->getDmaDevice(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES ); //input_filter = std::make_shared<DmaInputFilter>( conf.getDmaDevice(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES );
throw std::runtime_error("input type DMA is temporarily not supported"); throw std::runtime_error("input type DMA is temporarily not supported");
} else if (input == config::InputType::FILEDMA) { } else if (input == config::InputType::FILEDMA) {
// Create FILE DMA reader // Create FILE DMA reader
input_filter = std::make_shared<FileDmaInputFilter>( conf->getInputFile(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES, control ); input_filter = std::make_shared<FileDmaInputFilter>( conf.getInputFile(), MAX_BYTES_PER_INPUT_SLICE, TOTAL_SLICES, control );
} else if (input == config::InputType::WZDMA ) { } else if (input == config::InputType::WZDMA ) {
// Create WZ DMA reader // Create WZ DMA reader
...@@ -87,23 +87,23 @@ int run_pipeline( int nthreads, ctrl& control, config *conf) ...@@ -87,23 +87,23 @@ int run_pipeline( int nthreads, ctrl& control, config *conf)
// Create reformatter and add it to the pipeline // Create reformatter and add it to the pipeline
// TODO: Created here so we are not subject of scoping, fix later... // TODO: Created here so we are not subject of scoping, fix later...
StreamProcessor stream_processor(MAX_BYTES_PER_INPUT_SLICE); StreamProcessor stream_processor(MAX_BYTES_PER_INPUT_SLICE);
if ( conf->getEnableStreamProcessor() ) { if ( conf.getEnableStreamProcessor() ) {
pipeline.add_filter( stream_processor ); pipeline.add_filter( stream_processor );
} }
// Create elastic populator (if requested) // Create elastic populator (if requested)
std::string url = conf->getElasticUrl(); std::string url = conf.getElasticUrl();
// TODO: Created here so we are not subject of scoping, fix later... // TODO: Created here so we are not subject of scoping, fix later...
ElasticProcessor elastic_processor(MAX_BYTES_PER_INPUT_SLICE, ElasticProcessor elastic_processor(MAX_BYTES_PER_INPUT_SLICE,
&control, &control,
url, url,
conf->getPtCut(), conf.getPtCut(),
conf->getQualCut()); conf.getQualCut());
if ( conf->getEnableElasticProcessor() ) { if ( conf.getEnableElasticProcessor() ) {
pipeline.add_filter(elastic_processor); pipeline.add_filter(elastic_processor);
} }
std::string output_file_base = conf->getOutputFilenameBase(); std::string output_file_base = conf.getOutputFilenameBase();
// Create file-writing stage and add it to the pipeline // Create file-writing stage and add it to the pipeline
OutputStream output_stream( output_file_base.c_str(), control); OutputStream output_stream( output_file_base.c_str(), control);
...@@ -113,7 +113,7 @@ int run_pipeline( int nthreads, ctrl& control, config *conf) ...@@ -113,7 +113,7 @@ int run_pipeline( int nthreads, ctrl& control, config *conf)
tbb::tick_count t0 = tbb::tick_count::now(); tbb::tick_count t0 = tbb::tick_count::now();
// Need more than one token in flight per thread to keep all threads // Need more than one token in flight per thread to keep all threads
// busy; 2-4 works // busy; 2-4 works
pipeline.run( nthreads*4 ); pipeline.run( nbThreads * 4 );
tbb::tick_count t1 = tbb::tick_count::now(); tbb::tick_count t1 = tbb::tick_count::now();
if ( !silent ) { if ( !silent ) {
...@@ -147,9 +147,10 @@ int main( int argc, char* argv[] ) { ...@@ -147,9 +147,10 @@ int main( int argc, char* argv[] ) {
server s(io_service, conf.getPortNumber(), control); server s(io_service, conf.getPortNumber(), control);
boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service)); boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
int p = conf.getNumThreads(); int nbThreads = conf.getNumThreads();
tbb::task_scheduler_init init(p);
if(!run_pipeline (p, control, &conf)) tbb::task_scheduler_init init( nbThreads );
if (!run_pipeline (nbThreads, control, conf))
return 1; return 1;
// utility::report_elapsed_time((tbb::tick_count::now() - mainStartTime).seconds()); // utility::report_elapsed_time((tbb::tick_count::now() - mainStartTime).seconds());
......
...@@ -20,7 +20,7 @@ dma_number_of_packet_buffers:1000 ...@@ -20,7 +20,7 @@ dma_number_of_packet_buffers:1000
# Print report each N packets, use 0 to disable # Print report each N packets, use 0 to disable
packets_per_report:5000 packets_per_report:5000
#packets_per_report:10 #packets_per_report:1
## Settings for file input ## Settings for file input
#input_file:/dev/shm/testdata.bin #input_file:/dev/shm/testdata.bin
...@@ -34,12 +34,14 @@ blocks_buffer:1000 ...@@ -34,12 +34,14 @@ blocks_buffer:1000
output_filename_base:/fff/BU0/ramdisk/scdaq output_filename_base:/fff/BU0/ramdisk/scdaq
max_file_size:8589934592 max_file_size:8589934592
threads:8 # Elastics processor
port:8000 port:8000
elastic_url:http://something.somewhere elastic_url:http://something.somewhere
pt_cut:7 pt_cut:7
quality_cut:12 quality_cut:12
# Pipeline settings # Pipeline settings
threads:8
enable_stream_processor:yes enable_stream_processor:yes
enable_elastic_processor:no enable_elastic_processor:no
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment