From 620925b99fa524609d91452b8ccae74de8d12095 Mon Sep 17 00:00:00 2001
From: Petr Zejdl <petr.zejdl@cern.ch>
Date: Thu, 13 Oct 2022 15:53:13 +0200
Subject: [PATCH] Processor monitoring moved under processor class

---
 src/Makefile     |  4 ++--
 src/monitor.h    | 14 --------------
 src/processor.cc | 48 +++++++++++++++++++++++++-----------------------
 src/processor.h  | 22 ++++++++++++++++++----
 src/scdaq.cc     | 15 ++++-----------
 5 files changed, 49 insertions(+), 54 deletions(-)
 delete mode 100644 src/monitor.h

diff --git a/src/Makefile b/src/Makefile
index b86be105..cdf0f2bb 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -57,10 +57,10 @@ ${TARGET}: ${OBJECTS}
 
 #test2.o : product.h test2.h
 
-scdaq.o:	processor.h elastico.h OutputBySize.h OutputByOrbit.h format.h server.h controls.h monitor.h config.h session.h log.h
+scdaq.o:	processor.h elastico.h OutputBySize.h OutputByOrbit.h format.h server.h controls.h config.h session.h log.h
 config.o:	config.h log.h
 DmaInputFilter.o:	DmaInputFilter.h slice.h
-elastico.o:	elastico.h format.h slice.h controls.h monitor.h log.h
+elastico.o:	elastico.h format.h slice.h controls.h log.h
 FileDmaInputFilter.o:	FileDmaInputFilter.h InputFilter.h log.h
 InputFilter.o:	InputFilter.h log.h
 outputBySize.o:	OutputBySize.h slice.h log.h
diff --git a/src/monitor.h b/src/monitor.h
deleted file mode 100644
index f8d6ec8e..00000000
--- a/src/monitor.h
+++ /dev/null
@@ -1,14 +0,0 @@
-#ifndef MONITOR_H
-#define MONITOR_H
-#include <stdint.h>
-#include <atomic>
-#include <string>
-
-struct mon {
-  std::atomic<uint32_t> orbit_trailer_error_count;
-  std::atomic<uint64_t> packet_count;
-  std::atomic<uint64_t> excessOrbitsPerPacketCount;
-  std::atomic<uint64_t> packet_skipped_inconsistent_size;
-  std::atomic<uint64_t> n_consistent_sized_packets;
-};
-#endif 
diff --git a/src/processor.cc b/src/processor.cc
index 6bf9a622..abac24e6 100644
--- a/src/processor.cc
+++ b/src/processor.cc
@@ -7,7 +7,10 @@
 #include <vector>
 #include <algorithm>
 
-StreamProcessor::StreamProcessor(size_t max_size_, bool doZS_, ProcessorType processorType_, uint32_t nOrbitsPerDMAPacket_, uint32_t prescaleFactor_, ctrl& control_, mon& monitor_) :
+// Definition of the static member stats
+StreamProcessor::Statistics StreamProcessor::stats;
+
+StreamProcessor::StreamProcessor(size_t max_size_, bool doZS_, ProcessorType processorType_, uint32_t nOrbitsPerDMAPacket_, uint32_t prescaleFactor_, ctrl& control_) :
 	tbb::filter(parallel),
 	max_size(max_size_),
 	nbPackets(0),
@@ -15,9 +18,8 @@ StreamProcessor::StreamProcessor(size_t max_size_, bool doZS_, ProcessorType pro
 	processorType(processorType_),
 	nOrbitsPerDMAPacket(nOrbitsPerDMAPacket_),
 	prescaleFactor(prescaleFactor_),
- 	control(control_),
-	monitor(monitor_)
-{ 
+	control(control_)
+{
 	LOG(TRACE) << "Created transform filter at " << static_cast<void*>(this);
 	myfile.open ("example.txt");
 }
@@ -47,15 +49,15 @@ bool StreamProcessor::CheckFrameMultBlock(size_t inputSize){
 	int bsize = sizeof(blockMuon);
 	if ((inputSize-nOrbitsPerDMAPacket*constants::orbit_trailer_size - 32*nOrbitsPerDMAPacket -32)%bsize!=0){
 		
-		monitor.n_consistent_sized_packets = 0;
-		monitor.packet_skipped_inconsistent_size++;
+		stats.n_consistent_sized_packets = 0;
+		stats.packet_skipped_inconsistent_size++;
 		
-		if (( monitor.packet_skipped_inconsistent_size++ == 1) || 
-		((monitor.packet_skipped_inconsistent_size < 100) && (monitor.packet_skipped_inconsistent_size%10 == 0)) || 
-		((monitor.packet_skipped_inconsistent_size < 1000) && (monitor.packet_skipped_inconsistent_size%100 == 0)) || 
-		((monitor.packet_skipped_inconsistent_size < 10000) && (monitor.packet_skipped_inconsistent_size%1000 == 0)) || 
-		(monitor.packet_skipped_inconsistent_size%10000 == 0)){
-			LOG(WARNING) << "Frame size not a multiple of block size after headers and trailers have been subtracted. Counted " << monitor.packet_skipped_inconsistent_size << " packets skipped.";
+		if (( stats.packet_skipped_inconsistent_size++ == 1) ||
+		((stats.packet_skipped_inconsistent_size < 100) && (stats.packet_skipped_inconsistent_size%10 == 0)) ||
+		((stats.packet_skipped_inconsistent_size < 1000) && (stats.packet_skipped_inconsistent_size%100 == 0)) ||
+		((stats.packet_skipped_inconsistent_size < 10000) && (stats.packet_skipped_inconsistent_size%1000 == 0)) ||
+		(stats.packet_skipped_inconsistent_size%10000 == 0)) {
+			LOG(WARNING) << "Frame size not a multiple of block size after headers and trailers have been subtracted. Counted " << stats.packet_skipped_inconsistent_size << " packets skipped.";
 		}
 		
 		if (control.verbosity != 0){
@@ -65,10 +67,10 @@ bool StreamProcessor::CheckFrameMultBlock(size_t inputSize){
     		}
 		return false;
 	} else {
-		monitor.n_consistent_sized_packets++;
-		if (monitor.n_consistent_sized_packets == 6000000){ // every ~10 mins
-			LOG(WARNING) << "Resetting packet_skipped_inconsistent_size counter to 0 after 6000000M consistnent packets. Count was at "<< monitor.packet_skipped_inconsistent_size;
-			monitor.packet_skipped_inconsistent_size = 0;
+		stats.n_consistent_sized_packets++;
+		if (stats.n_consistent_sized_packets == 6000000){ // every ~10 mins
+			LOG(WARNING) << "Resetting packet_skipped_inconsistent_size counter to 0 after 6000000M consistnent packets. Count was at "<< stats.packet_skipped_inconsistent_size;
+			stats.packet_skipped_inconsistent_size = 0;
 		}
 	}
 	return true;
@@ -265,11 +267,11 @@ StreamProcessor::fillOrbitMetadata StreamProcessor::FillOrbitMuon(std::vector<un
 void StreamProcessor::process(Slice& input, Slice& out)
 {
 	nbPackets++;
-	monitor.packet_count++;
+	stats.packet_count++;
 	
 	//Implement prescale - only for CALO
 	if ((processorType == ProcessorType::CALO) && (prescaleFactor!=0)) {
-		if (monitor.packet_count%prescaleFactor != 0){return;}
+		if (stats.packet_count%prescaleFactor != 0){return;}
 	}
 	
 	char* rd_ptr = input.begin();
@@ -300,8 +302,8 @@ void StreamProcessor::process(Slice& input, Slice& out)
 		bool trailerError = false;
 		bx_vect = CountBX(input, rd_ptr, trailerError);	
 		if(trailerError == true){
-			monitor.orbit_trailer_error_count++;
-			LOG(WARNING) << "Orbit trailer error: orbit trailer not found before end of data packet. Packet will be skipped. Orbit trailer error count = " << monitor.orbit_trailer_error_count;
+			stats.orbit_trailer_error_count++;
+			LOG(WARNING) << "Orbit trailer error: orbit trailer not found before end of data packet. Packet will be skipped. Orbit trailer error count = " << stats.orbit_trailer_error_count;
 			return;
 		}
 		std::sort(bx_vect.begin(), bx_vect.end());
@@ -346,9 +348,9 @@ void StreamProcessor::process(Slice& input, Slice& out)
 					<< std::hex << *dma_trailer_word << ". Orbits per packet count " 
 					<< orbit_per_packet_count  << ", > expected, (" << nOrbitsPerDMAPacket <<") skipping packet.";
 				}
-				monitor.excessOrbitsPerPacketCount++;
-				if (monitor.excessOrbitsPerPacketCount%10000 == 0){
-					LOG(WARNING) << "count of packets with excess # orbits " << monitor.excessOrbitsPerPacketCount;
+				stats.excessOrbitsPerPacketCount++;
+				if (stats.excessOrbitsPerPacketCount%10000 == 0){
+					LOG(WARNING) << "count of packets with excess # orbits " << stats.excessOrbitsPerPacketCount;
 				}
 				return;
 			}		
diff --git a/src/processor.h b/src/processor.h
index 31386def..147f8e28 100644
--- a/src/processor.h
+++ b/src/processor.h
@@ -3,7 +3,6 @@
 
 #include "tbb/pipeline.h"
 #include "controls.h"    
-#include "monitor.h"    
 #include "bril_histo.h"    
 #include "format.h"    
 #include <iostream>
@@ -15,13 +14,12 @@
 class Slice;
 
 class StreamProcessor: public tbb::filter {
-
 public:
   static BrilHistoQueue<std::array<uint32_t, constants::NBXPerOrbit + constants::NFramesInHistoHeader>> BrilQueue;
   
   enum class ProcessorType { PASS_THROUGH, GMT, CALO, BRIL };
   
-  StreamProcessor(size_t max_size_, bool doZS_, ProcessorType processorType_, uint32_t nOrbitsPerDMAPacket_, uint32_t prescaleFactor, ctrl& control, mon& monitor);
+  StreamProcessor(size_t max_size_, bool doZS_, ProcessorType processorType_, uint32_t nOrbitsPerDMAPacket_, uint32_t prescaleFactor, ctrl& control);
   void* operator()( void* item )/*override*/;
   ~StreamProcessor();
 
@@ -45,7 +43,23 @@ private:
   uint32_t nOrbitsPerDMAPacket;
   uint32_t prescaleFactor;
   ctrl& control;
-  mon& monitor;
+
+ public:
+  static class Statistics {
+   public:
+    Statistics()
+        : orbit_trailer_error_count(0),
+          packet_count(0),
+          excessOrbitsPerPacketCount(0),
+          packet_skipped_inconsistent_size(0),
+          n_consistent_sized_packets(0) {}
+
+    std::atomic<uint32_t> orbit_trailer_error_count;
+    std::atomic<uint64_t> packet_count;
+    std::atomic<uint64_t> excessOrbitsPerPacketCount;
+    std::atomic<uint64_t> packet_skipped_inconsistent_size;
+    std::atomic<uint64_t> n_consistent_sized_packets;
+  } stats;
 };
 
 #endif
diff --git a/src/scdaq.cc b/src/scdaq.cc
index db8bac17..6bba17da 100644
--- a/src/scdaq.cc
+++ b/src/scdaq.cc
@@ -36,7 +36,7 @@ using namespace std;
 
 bool silent = false;
 
-int run_pipeline( int nbThreads, ctrl& control, mon& monitor, config& conf )
+int run_pipeline( int nbThreads, ctrl& control, config& conf )
 {
   config::InputType input = conf.getInput();
   size_t packetBufferSize = conf.getDmaPacketBufferSize();
@@ -76,7 +76,7 @@ int run_pipeline( int nbThreads, ctrl& control, mon& monitor, config& conf )
 
   // Create reformatter and add it to the pipeline
   // TODO: Created here so we are not subject of scoping, fix later...
-  StreamProcessor stream_processor(packetBufferSize, conf.getDoZS(), conf.getProcessorType(), conf.getNOrbitsPerDMAPacket(), prescaleFactor, control, monitor);
+  StreamProcessor stream_processor(packetBufferSize, conf.getDoZS(), conf.getProcessorType(), conf.getNOrbitsPerDMAPacket(), prescaleFactor, control);
   if ( conf.getEnableStreamProcessor() ) {
     pipeline.add_filter( stream_processor );
   }
@@ -151,7 +151,6 @@ if(argc < 2){
     conf.print();
     LOG(DEBUG) << "Configuration loaded";
     ctrl control;
-    mon monitor;
     //    tbb::tick_count mainStartTime = tbb::tick_count::now();
 
 
@@ -162,13 +161,7 @@ if(argc < 2){
     control.output_force_write = conf.getOutputForceWrite();
     control.verbosity = conf.getVerbosity();
     control.n_orbits_per_dma_packet = conf.getNOrbitsPerDMAPacket();
-    
-    monitor.orbit_trailer_error_count = 0;
-    monitor.packet_count = 0;
-    monitor.excessOrbitsPerPacketCount = 0;
-    monitor.packet_skipped_inconsistent_size = 0;
-    monitor.n_consistent_sized_packets = 0;
-    
+
     // Firmware needs at least 1MB buffer for DMA
     if (conf.getDmaPacketBufferSize() < 1024*1024) {
       LOG(ERROR) << "dma_packet_buffer_size must be at least 1048576 bytes (1MB), but " << conf.getDmaPacketBufferSize() << " bytes was given. Check the configuration file.";
@@ -184,7 +177,7 @@ if(argc < 2){
     retval = 0;
     try {
       tbb::task_scheduler_init init( nbThreads );
-      if (!run_pipeline (nbThreads, control, monitor, conf)) {
+      if (!run_pipeline (nbThreads, control, conf)) {
         retval = 1;
         // Will terminate with errorocde set
       }
-- 
GitLab