From 620925b99fa524609d91452b8ccae74de8d12095 Mon Sep 17 00:00:00 2001 From: Petr Zejdl <petr.zejdl@cern.ch> Date: Thu, 13 Oct 2022 15:53:13 +0200 Subject: [PATCH] Processor monitoring moved under processor class --- src/Makefile | 4 ++-- src/monitor.h | 14 -------------- src/processor.cc | 48 +++++++++++++++++++++++++----------------------- src/processor.h | 22 ++++++++++++++++++---- src/scdaq.cc | 15 ++++----------- 5 files changed, 49 insertions(+), 54 deletions(-) delete mode 100644 src/monitor.h diff --git a/src/Makefile b/src/Makefile index b86be105..cdf0f2bb 100644 --- a/src/Makefile +++ b/src/Makefile @@ -57,10 +57,10 @@ ${TARGET}: ${OBJECTS} #test2.o : product.h test2.h -scdaq.o: processor.h elastico.h OutputBySize.h OutputByOrbit.h format.h server.h controls.h monitor.h config.h session.h log.h +scdaq.o: processor.h elastico.h OutputBySize.h OutputByOrbit.h format.h server.h controls.h config.h session.h log.h config.o: config.h log.h DmaInputFilter.o: DmaInputFilter.h slice.h -elastico.o: elastico.h format.h slice.h controls.h monitor.h log.h +elastico.o: elastico.h format.h slice.h controls.h log.h FileDmaInputFilter.o: FileDmaInputFilter.h InputFilter.h log.h InputFilter.o: InputFilter.h log.h outputBySize.o: OutputBySize.h slice.h log.h diff --git a/src/monitor.h b/src/monitor.h deleted file mode 100644 index f8d6ec8e..00000000 --- a/src/monitor.h +++ /dev/null @@ -1,14 +0,0 @@ -#ifndef MONITOR_H -#define MONITOR_H -#include <stdint.h> -#include <atomic> -#include <string> - -struct mon { - std::atomic<uint32_t> orbit_trailer_error_count; - std::atomic<uint64_t> packet_count; - std::atomic<uint64_t> excessOrbitsPerPacketCount; - std::atomic<uint64_t> packet_skipped_inconsistent_size; - std::atomic<uint64_t> n_consistent_sized_packets; -}; -#endif diff --git a/src/processor.cc b/src/processor.cc index 6bf9a622..abac24e6 100644 --- a/src/processor.cc +++ b/src/processor.cc @@ -7,7 +7,10 @@ #include <vector> #include <algorithm> -StreamProcessor::StreamProcessor(size_t max_size_, bool doZS_, ProcessorType processorType_, uint32_t nOrbitsPerDMAPacket_, uint32_t prescaleFactor_, ctrl& control_, mon& monitor_) : +// Definition of the static member stats +StreamProcessor::Statistics StreamProcessor::stats; + +StreamProcessor::StreamProcessor(size_t max_size_, bool doZS_, ProcessorType processorType_, uint32_t nOrbitsPerDMAPacket_, uint32_t prescaleFactor_, ctrl& control_) : tbb::filter(parallel), max_size(max_size_), nbPackets(0), @@ -15,9 +18,8 @@ StreamProcessor::StreamProcessor(size_t max_size_, bool doZS_, ProcessorType pro processorType(processorType_), nOrbitsPerDMAPacket(nOrbitsPerDMAPacket_), prescaleFactor(prescaleFactor_), - control(control_), - monitor(monitor_) -{ + control(control_) +{ LOG(TRACE) << "Created transform filter at " << static_cast<void*>(this); myfile.open ("example.txt"); } @@ -47,15 +49,15 @@ bool StreamProcessor::CheckFrameMultBlock(size_t inputSize){ int bsize = sizeof(blockMuon); if ((inputSize-nOrbitsPerDMAPacket*constants::orbit_trailer_size - 32*nOrbitsPerDMAPacket -32)%bsize!=0){ - monitor.n_consistent_sized_packets = 0; - monitor.packet_skipped_inconsistent_size++; + stats.n_consistent_sized_packets = 0; + stats.packet_skipped_inconsistent_size++; - if (( monitor.packet_skipped_inconsistent_size++ == 1) || - ((monitor.packet_skipped_inconsistent_size < 100) && (monitor.packet_skipped_inconsistent_size%10 == 0)) || - ((monitor.packet_skipped_inconsistent_size < 1000) && (monitor.packet_skipped_inconsistent_size%100 == 0)) || - ((monitor.packet_skipped_inconsistent_size < 10000) && (monitor.packet_skipped_inconsistent_size%1000 == 0)) || - (monitor.packet_skipped_inconsistent_size%10000 == 0)){ - LOG(WARNING) << "Frame size not a multiple of block size after headers and trailers have been subtracted. Counted " << monitor.packet_skipped_inconsistent_size << " packets skipped."; + if (( stats.packet_skipped_inconsistent_size++ == 1) || + ((stats.packet_skipped_inconsistent_size < 100) && (stats.packet_skipped_inconsistent_size%10 == 0)) || + ((stats.packet_skipped_inconsistent_size < 1000) && (stats.packet_skipped_inconsistent_size%100 == 0)) || + ((stats.packet_skipped_inconsistent_size < 10000) && (stats.packet_skipped_inconsistent_size%1000 == 0)) || + (stats.packet_skipped_inconsistent_size%10000 == 0)) { + LOG(WARNING) << "Frame size not a multiple of block size after headers and trailers have been subtracted. Counted " << stats.packet_skipped_inconsistent_size << " packets skipped."; } if (control.verbosity != 0){ @@ -65,10 +67,10 @@ bool StreamProcessor::CheckFrameMultBlock(size_t inputSize){ } return false; } else { - monitor.n_consistent_sized_packets++; - if (monitor.n_consistent_sized_packets == 6000000){ // every ~10 mins - LOG(WARNING) << "Resetting packet_skipped_inconsistent_size counter to 0 after 6000000M consistnent packets. Count was at "<< monitor.packet_skipped_inconsistent_size; - monitor.packet_skipped_inconsistent_size = 0; + stats.n_consistent_sized_packets++; + if (stats.n_consistent_sized_packets == 6000000){ // every ~10 mins + LOG(WARNING) << "Resetting packet_skipped_inconsistent_size counter to 0 after 6000000M consistnent packets. Count was at "<< stats.packet_skipped_inconsistent_size; + stats.packet_skipped_inconsistent_size = 0; } } return true; @@ -265,11 +267,11 @@ StreamProcessor::fillOrbitMetadata StreamProcessor::FillOrbitMuon(std::vector<un void StreamProcessor::process(Slice& input, Slice& out) { nbPackets++; - monitor.packet_count++; + stats.packet_count++; //Implement prescale - only for CALO if ((processorType == ProcessorType::CALO) && (prescaleFactor!=0)) { - if (monitor.packet_count%prescaleFactor != 0){return;} + if (stats.packet_count%prescaleFactor != 0){return;} } char* rd_ptr = input.begin(); @@ -300,8 +302,8 @@ void StreamProcessor::process(Slice& input, Slice& out) bool trailerError = false; bx_vect = CountBX(input, rd_ptr, trailerError); if(trailerError == true){ - monitor.orbit_trailer_error_count++; - LOG(WARNING) << "Orbit trailer error: orbit trailer not found before end of data packet. Packet will be skipped. Orbit trailer error count = " << monitor.orbit_trailer_error_count; + stats.orbit_trailer_error_count++; + LOG(WARNING) << "Orbit trailer error: orbit trailer not found before end of data packet. Packet will be skipped. Orbit trailer error count = " << stats.orbit_trailer_error_count; return; } std::sort(bx_vect.begin(), bx_vect.end()); @@ -346,9 +348,9 @@ void StreamProcessor::process(Slice& input, Slice& out) << std::hex << *dma_trailer_word << ". Orbits per packet count " << orbit_per_packet_count << ", > expected, (" << nOrbitsPerDMAPacket <<") skipping packet."; } - monitor.excessOrbitsPerPacketCount++; - if (monitor.excessOrbitsPerPacketCount%10000 == 0){ - LOG(WARNING) << "count of packets with excess # orbits " << monitor.excessOrbitsPerPacketCount; + stats.excessOrbitsPerPacketCount++; + if (stats.excessOrbitsPerPacketCount%10000 == 0){ + LOG(WARNING) << "count of packets with excess # orbits " << stats.excessOrbitsPerPacketCount; } return; } diff --git a/src/processor.h b/src/processor.h index 31386def..147f8e28 100644 --- a/src/processor.h +++ b/src/processor.h @@ -3,7 +3,6 @@ #include "tbb/pipeline.h" #include "controls.h" -#include "monitor.h" #include "bril_histo.h" #include "format.h" #include <iostream> @@ -15,13 +14,12 @@ class Slice; class StreamProcessor: public tbb::filter { - public: static BrilHistoQueue<std::array<uint32_t, constants::NBXPerOrbit + constants::NFramesInHistoHeader>> BrilQueue; enum class ProcessorType { PASS_THROUGH, GMT, CALO, BRIL }; - StreamProcessor(size_t max_size_, bool doZS_, ProcessorType processorType_, uint32_t nOrbitsPerDMAPacket_, uint32_t prescaleFactor, ctrl& control, mon& monitor); + StreamProcessor(size_t max_size_, bool doZS_, ProcessorType processorType_, uint32_t nOrbitsPerDMAPacket_, uint32_t prescaleFactor, ctrl& control); void* operator()( void* item )/*override*/; ~StreamProcessor(); @@ -45,7 +43,23 @@ private: uint32_t nOrbitsPerDMAPacket; uint32_t prescaleFactor; ctrl& control; - mon& monitor; + + public: + static class Statistics { + public: + Statistics() + : orbit_trailer_error_count(0), + packet_count(0), + excessOrbitsPerPacketCount(0), + packet_skipped_inconsistent_size(0), + n_consistent_sized_packets(0) {} + + std::atomic<uint32_t> orbit_trailer_error_count; + std::atomic<uint64_t> packet_count; + std::atomic<uint64_t> excessOrbitsPerPacketCount; + std::atomic<uint64_t> packet_skipped_inconsistent_size; + std::atomic<uint64_t> n_consistent_sized_packets; + } stats; }; #endif diff --git a/src/scdaq.cc b/src/scdaq.cc index db8bac17..6bba17da 100644 --- a/src/scdaq.cc +++ b/src/scdaq.cc @@ -36,7 +36,7 @@ using namespace std; bool silent = false; -int run_pipeline( int nbThreads, ctrl& control, mon& monitor, config& conf ) +int run_pipeline( int nbThreads, ctrl& control, config& conf ) { config::InputType input = conf.getInput(); size_t packetBufferSize = conf.getDmaPacketBufferSize(); @@ -76,7 +76,7 @@ int run_pipeline( int nbThreads, ctrl& control, mon& monitor, config& conf ) // Create reformatter and add it to the pipeline // TODO: Created here so we are not subject of scoping, fix later... - StreamProcessor stream_processor(packetBufferSize, conf.getDoZS(), conf.getProcessorType(), conf.getNOrbitsPerDMAPacket(), prescaleFactor, control, monitor); + StreamProcessor stream_processor(packetBufferSize, conf.getDoZS(), conf.getProcessorType(), conf.getNOrbitsPerDMAPacket(), prescaleFactor, control); if ( conf.getEnableStreamProcessor() ) { pipeline.add_filter( stream_processor ); } @@ -151,7 +151,6 @@ if(argc < 2){ conf.print(); LOG(DEBUG) << "Configuration loaded"; ctrl control; - mon monitor; // tbb::tick_count mainStartTime = tbb::tick_count::now(); @@ -162,13 +161,7 @@ if(argc < 2){ control.output_force_write = conf.getOutputForceWrite(); control.verbosity = conf.getVerbosity(); control.n_orbits_per_dma_packet = conf.getNOrbitsPerDMAPacket(); - - monitor.orbit_trailer_error_count = 0; - monitor.packet_count = 0; - monitor.excessOrbitsPerPacketCount = 0; - monitor.packet_skipped_inconsistent_size = 0; - monitor.n_consistent_sized_packets = 0; - + // Firmware needs at least 1MB buffer for DMA if (conf.getDmaPacketBufferSize() < 1024*1024) { LOG(ERROR) << "dma_packet_buffer_size must be at least 1048576 bytes (1MB), but " << conf.getDmaPacketBufferSize() << " bytes was given. Check the configuration file."; @@ -184,7 +177,7 @@ if(argc < 2){ retval = 0; try { tbb::task_scheduler_init init( nbThreads ); - if (!run_pipeline (nbThreads, control, monitor, conf)) { + if (!run_pipeline (nbThreads, control, conf)) { retval = 1; // Will terminate with errorocde set } -- GitLab