Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
scdaq.cc 6.77 KiB
#include "tbb/pipeline.h"
#include "tbb/tick_count.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/tbb_allocator.h"
#include "tbb/concurrent_queue.h"
#include <cstring>
#include <cstdlib>
#include <cstdio>
#include <cctype>
#include <string>
#include <iostream>
#include <sysexits.h>

#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

#include "InputFilter.h"
#include "FileDmaInputFilter.h"
#include "WZDmaInputFilter.h"
#include "MicronDmaInputFilter.h"
#include "DmaInputFilter.h"
#include "processor.h"
#include "elastico.h"
#include "OutputBySize.h"
#include "OutputByOrbit.h"
#include "format.h"
#include "server.h"
#include "controls.h"
#include "config.h"
#include "log.h"

using namespace std;


// When true, run_pipeline() skips logging the elapsed pipeline run time.
bool silent = false;

int run_pipeline( int nbThreads, ctrl& control, config& conf )
{
  config::InputType input = conf.getInput();
  size_t packetBufferSize = conf.getDmaPacketBufferSize();
  size_t nbPacketBuffers = conf.getNumberOfDmaPacketBuffers();
  uint32_t prescaleFactor = conf.getPrescaleFactor();  
  // Create empty input reader, will assign later when we know what is the data source
  std::shared_ptr<InputFilter> input_filter;
  std::shared_ptr<OutputByOrbitStream> output_stream_by_orbit;
  std::shared_ptr<OutputBySizeStream> output_stream_by_size;
  // Create the pipeline
  tbb::pipeline pipeline;

  if (input == config::InputType::DMA) {
      // Create DMA reader
      input_filter = std::make_shared<DmaInputFilter>( conf.getDmaDevice(), packetBufferSize, nbPacketBuffers, control );
  
  } else if (input == config::InputType::FILEDMA) {
      // Create FILE DMA reader
      input_filter = std::make_shared<FileDmaInputFilter>( conf.getInputFile(), packetBufferSize, nbPacketBuffers, control );

  } else if (input == config::InputType::WZDMA ) {
      
	// Create WZ DMA reader
      input_filter = std::make_shared<WZDmaInputFilter>( packetBufferSize, nbPacketBuffers, control );

  
  } else if (input == config::InputType::MICRONDMA ) {
      // create MicronDmaInputFilter reader
      input_filter = std::make_shared<MicronDmaInputFilter>(packetBufferSize, nbPacketBuffers, control, conf) ;
  } else {
    throw std::invalid_argument("Configuration error: Unknown input type was specified");
  }

  // Add input reader to a pipeline
  pipeline.add_filter( *input_filter );


  // Create reformatter and add it to the pipeline
  // TODO: Created here so we are not subject of scoping, fix later...
  StreamProcessor stream_processor(packetBufferSize, conf.getDoZS(), conf.getProcessorType(), conf.getNOrbitsPerDMAPacket(), prescaleFactor, control);
  if ( conf.getEnableStreamProcessor() ) {
    pipeline.add_filter( stream_processor );
  }

  // Create elastic populator (if requested)
  std::string url = conf.getElasticUrl();
  // TODO: Created here so we are not subject of scoping, fix later...
  ElasticProcessor elastic_processor(packetBufferSize,
              &control,
              url,
              conf.getPtCut(),
              conf.getQualCut());
  if ( conf.getEnableElasticProcessor() ) {
    pipeline.add_filter(elastic_processor);
  }

  // Create file-writing stage and add it to the pipeline
  if ( conf.getNOrbitsPerFile() ){
    output_stream_by_orbit = std::make_shared<OutputByOrbitStream>( conf.getOutputFilenameBase(), conf.getOutputFilenamePrefix(), control, conf );
    pipeline.add_filter( *output_stream_by_orbit );
  }else{ 
    output_stream_by_size = std::make_shared<OutputBySizeStream>( conf.getOutputFilenameBase(), conf.getOutputFilenamePrefix(), control);
    pipeline.add_filter( *output_stream_by_size );
  }


  // Run the pipeline
  tbb::tick_count t0 = tbb::tick_count::now();
  // Need more than one token in flight per thread to keep all threads 
  // busy; 2-4 works
  pipeline.run( nbThreads * 4 );
  tbb::tick_count t1 = tbb::tick_count::now();

  if ( !silent ) {
    LOG(INFO) << "time = " << (t1-t0).seconds();
  }

  return 1;
}


// Program entry point.
//
// Expected invocation: scdaq --config <configfilename>
//
// Loads the configuration, initialises the shared control block, starts the
// internal TCP control server on its own thread, runs the processing pipeline
// and finally shuts the server down again.
//
// Exit status:
//   0          pipeline finished without error
//   1          usage error / help requested, DMA buffer too small, or the
//              pipeline reported or threw an error
//   EX_CONFIG  a std::exception escaped configuration loading or setup
int main( int argc, char* argv[] ) {
  if (argc < 2) {
    LOG(DEBUG) << "no arguments provided to scdaq, try --help";
    return 1;
  }

  const std::string option(argv[1]);

  if (option == "-h" || option == "--help") {
    LOG(DEBUG) << "HELP: expected arguments --config [configfilename]";
    return 1;
  }

  if (argc != 3) {
    LOG(ERROR) << "error occurred, number of arguments != 2, expected --config [configfilename] , try --help";
    return 1;
  }

  if (option != "--config") {
    LOG(ERROR) << "invalid argument, expected --config, see --help";
    return 1;
  }
  LOG(DEBUG) << "scdaq started with conffile:   " << std::string(argv[2]);

  try {
    config conf(argv[2]);
    conf.print();
    LOG(DEBUG) << "Configuration loaded";

    ctrl control;
    control.running = false;
    control.run_number = 0;
    control.orbit_trailer_error_count = 0;
    control.packet_count = 0;
    control.max_file_size = conf.getOutputMaxFileSize();//in Bytes
    control.packets_per_report = conf.getPacketsPerReport();
    control.output_force_write = conf.getOutputForceWrite();
    control.n_orbits_per_dma_packet = conf.getNOrbitsPerDMAPacket();
    control.verbosity = conf.getVerbosity();
    control.excessOrbitsPerPacketCount = 0;

    // Firmware needs at least 1MB buffer for DMA
    if (conf.getDmaPacketBufferSize() < 1024*1024) {
      LOG(ERROR) << "dma_packet_buffer_size must be at least 1048576 bytes (1MB), but " << conf.getDmaPacketBufferSize() << " bytes was given. Check the configuration file.";
      return 1;
    }

    // Read the thread count before the server thread is started, so that a
    // throwing getter cannot unwind the stack past a running, joinable thread.
    const int nbThreads = conf.getNumThreads();

    boost::asio::io_service io_service;
    server s(io_service, conf.getPortNumber(), control);
    boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));

    int retval = 0;
    try {
      tbb::task_scheduler_init init( nbThreads );
      if (!run_pipeline (nbThreads, control, conf)) {
        // Will terminate with error code set
        retval = 1;
      }
    } catch (const std::exception& e) {
      LOG(ERROR) << "Error in pipelines. Error text is: \"" << e.what() << "\"";
      // Will terminate with error code set
      retval = 1;
    }

    // Pipeline errors are handled above, so the server is always asked to
    // stop and given a bounded amount of time to finish.
    LOG(DEBUG) << "Terminating the internal TCP server.";
    io_service.stop();
    if (!t.try_join_for(boost::chrono::milliseconds(2000))) {
      LOG(ERROR) << "Failed to terminate the internal TCP server, the program may crash on exit.";
    } else {
      LOG(DEBUG) << "Internal TCP server is terminated.";
    }

    return retval;
  } catch (const std::exception& e) {
    // Single handler: a second catch of the same type on this try block could
    // never be reached. Everything that escapes setup is reported as a
    // configuration problem.
    LOG(FATAL) << "Configuration invalid! Error text is \"" << e.what() << "\" Bailing out.";
    return EX_CONFIG;
  }
}