From 1fb679d4390cff6152531f3b68d4dee561625756 Mon Sep 17 00:00:00 2001
From: Rosen Matev <>
Date: Wed, 24 Aug 2022 01:42:46 +0200
Subject: [PATCH] Add filter to select events that emitted a certain message.

 Hlt/HLTScheduler/src/ConfigurableDummy.cpp    |  10 +-
 Kernel/LHCbAlgs/CMakeLists.txt                |   1 +
 Kernel/LHCbAlgs/src/MessageSvcEventFilter.cpp | 141 ++++++++++++++++++
 Kernel/LHCbAlgs/tests/options/ |  49 ++++++
 .../qmtest/lhcbalgs.qms/error_stream.qmt      |  32 ++++
 5 files changed, 229 insertions(+), 4 deletions(-)
 create mode 100644 Kernel/LHCbAlgs/src/MessageSvcEventFilter.cpp
 create mode 100644 Kernel/LHCbAlgs/tests/options/
 create mode 100644 Kernel/LHCbAlgs/tests/qmtest/lhcbalgs.qms/error_stream.qmt

diff --git a/Hlt/HLTScheduler/src/ConfigurableDummy.cpp b/Hlt/HLTScheduler/src/ConfigurableDummy.cpp
index 39128c7a548..97e16aaeb1f 100644
--- a/Hlt/HLTScheduler/src/ConfigurableDummy.cpp
+++ b/Hlt/HLTScheduler/src/ConfigurableDummy.cpp
@@ -26,8 +26,8 @@ public:
   StatusCode initialize() override;
-  /// Pick up late-attributed data outputs
-  Gaudi::Property<int> m_CFD{this, "CFD", 1, "ControlFlowDecision is true every Nth events"};
+  Gaudi::Property<int>  m_CFD{this, "CFD", 1, "ControlFlowDecision is true every Nth events"};
+  Gaudi::Property<bool> m_decisionWarning{this, "DecisionWarning", false, "Emit a warning for false decisions"};
   Gaudi::Property<std::vector<std::string>> m_inpKeys{this, "inpKeys", {}, ""};
   Gaudi::Property<std::vector<std::string>> m_outKeys{this, "outKeys", {}, ""};
@@ -93,6 +93,8 @@ StatusCode ConfigurableDummy::execute( EventContext const& context ) const // th
     outputHandle->put( std::make_unique<DataObject>() );
-  if ( m_CFD > 0 && context.evt() % m_CFD == 0 ) return Gaudi::Functional::FilterDecision::PASSED;
-  return Gaudi::Functional::FilterDecision::FAILED;
+  bool decision = m_CFD > 0 && context.evt() % m_CFD == 0;
+  if ( m_decisionWarning && !decision ) { warning() << "Event did not pass" << endmsg; }
+  return decision ? Gaudi::Functional::FilterDecision::PASSED : Gaudi::Functional::FilterDecision::FAILED;
diff --git a/Kernel/LHCbAlgs/CMakeLists.txt b/Kernel/LHCbAlgs/CMakeLists.txt
index 27746f04ecc..6a563185143 100644
--- a/Kernel/LHCbAlgs/CMakeLists.txt
+++ b/Kernel/LHCbAlgs/CMakeLists.txt
@@ -66,6 +66,7 @@ gaudi_add_module(LHCbAlgs
+        src/MessageSvcEventFilter.cpp
diff --git a/Kernel/LHCbAlgs/src/MessageSvcEventFilter.cpp b/Kernel/LHCbAlgs/src/MessageSvcEventFilter.cpp
new file mode 100644
index 00000000000..7851b4ff599
--- /dev/null
+++ b/Kernel/LHCbAlgs/src/MessageSvcEventFilter.cpp
@@ -0,0 +1,141 @@
+* (c) Copyright 2022-2024 CERN for the benefit of the LHCb Collaboration      *
+*                                                                             *
+* This software is distributed under the terms of the GNU General Public      *
+* Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   *
+*                                                                             *
+* In applying this licence, CERN does not waive the privileges and immunities *
+* granted to it by virtue of its status as an Intergovernmental Organization  *
+* or submit itself to any jurisdiction.                                       *
+#include "LHCbAlgs/FilterPredicate.h"
+/** @class MessageSvcEventFilter
+ *
+ *  Pass when the processing of the event caused a certain message.
+ *
+ *  This filter can be configured to filter on ERROR/FATAL messages, for example,
+ *  and can be inserted in the control flow before a writer.
+ *  In this way one can create an "error stream" and save events that
+ *  exhibit errors in the processing.
+ *
+ *  The algorithm hooks into the MessageSvc by modifying the default stream (at start).
+ *  The modified ostream matches the formatted messages it sees with a regex pattern.
+ *  (Take care to synchronise the MessageSvc format with the pattern.)
+ *  If the message matches, a "flag" object is put on the TES (in the slot assigned to
+ *  the algorithm's execution that emitted the message).
+ *  Then, when the MessageSvcEventFilter algorithm is executed, it simply checks the existence of
+ *  the flag object on the TES and passes only if it finds one.
+ *
+ *  Note on implementation. The interface msgSvc()->insertStream() seems more appropriate but we
+ *  cannot use it. Using it would look like this
+ *    msgSvc()->insertStream( MSG::WARNING, "default_stream", msgSvc()->defaultStream() );
+ *    msgSvc()->insertStream( MSG::WARNING, name() + "_spy_stream", m_spyStream.get() );
+ *    ...
+ *    msgSvc()->eraseStream( MSG::WARNING );
+ *  The problem is that the MessageSvc as implemented does not apply formatting in the case
+ *  of explicitly inserted streams. It is impossible to apply it here (we only see strings).
+ *  TODO report/fix this in Gaudi
+ *
+ */
+namespace {
+  using DataHandle = DataObjectWriteHandle<DataObject>;
+  std::ostream* s_origStream = nullptr;
+  class spystreambuf : public std::stringbuf {
+  public:
+    explicit spystreambuf( std::string pattern, DataHandle& dh ) : m_pattern( pattern ), m_dataHandle( dh ) {}
+  private:
+    int sync() override {
+      std::ptrdiff_t n = pptr() - pbase();
+      std::string    temp;
+      temp.assign( pbase(), n );
+      *s_origStream << temp << std::flush;
+      pbump( -n );
+      return 0;
+    }
+    std::streamsize xsputn( const char_type* s, std::streamsize count ) override {
+      std::string temp;
+      temp.assign( s, count );
+      *s_origStream << temp;
+      // FIXME actually use pattern here
+      if ( m_dataHandle.getIfExists() == nullptr ) {
+        try {
+          m_dataHandle.put( std::make_unique<DataObject>() );
+        } catch ( GaudiException& e ) { std::cerr << e.message() << std::endl; };
+      }
+      return count;
+    }
+    int_type overflow( int_type ch ) override {
+      *s_origStream << (char)ch;
+      return std::stringbuf::overflow( ch );
+    }
+    // spystreambuf( const spystreambuf& );
+    // spystreambuf& operator=( const spystreambuf& );
+    std::string m_pattern;
+    DataHandle& m_dataHandle;
+  };
+} // namespace
+class MessageSvcEventFilter : public LHCb::Algorithm::FilterPredicate<bool()> {
+  using FilterPredicate::FilterPredicate;
+  StatusCode start() override;
+  StatusCode stop() override;
+  bool       operator()() const override;
+  Gaudi::Property<std::string> m_pattern{
+      this,
+      "PatternRegex",
+      "",
+      [this]( const auto& ) { this->m_regex = this->m_pattern.value(); },
+      "std::regex pattern that matches messages",
+  };
+  DataHandle m_flag{this, "FlagLocation", ""};
+  mutable Gaudi::Accumulators::BinomialCounter<> m_counter{this, "Accepted"};
+  std::regex                    m_regex;
+  std::unique_ptr<spystreambuf> m_spyBuf;
+  std::unique_ptr<std::ostream> m_spyStream;
+DECLARE_COMPONENT( MessageSvcEventFilter )
+StatusCode MessageSvcEventFilter::start() {
+  return FilterPredicate::start().andThen( [&] {
+    if ( !s_origStream ) {
+      s_origStream = msgSvc()->defaultStream();
+      // TODO treat missing (nullptr) default stream?
+      m_spyStream.reset(); // in case we're called a second time, destroy the stream before the buffer
+      m_spyBuf    = std::make_unique<spystreambuf>( "pattern", m_flag );
+      m_spyStream = std::make_unique<std::ostream>( m_spyBuf.get() );
+      msgSvc()->setDefaultStream( m_spyStream.get() );
+    }
+  } );
+StatusCode MessageSvcEventFilter::stop() {
+  if ( s_origStream ) {
+    msgSvc()->setDefaultStream( s_origStream );
+    s_origStream = nullptr;
+  }
+  return FilterPredicate::stop();
+bool MessageSvcEventFilter::operator()() const {
+  bool pass = m_flag.getIfExists() != nullptr;
+  m_counter += pass;
+  return pass;
diff --git a/Kernel/LHCbAlgs/tests/options/ b/Kernel/LHCbAlgs/tests/options/
new file mode 100644
index 00000000000..af95b181a00
--- /dev/null
+++ b/Kernel/LHCbAlgs/tests/options/
@@ -0,0 +1,49 @@
+# (c) Copyright 2022-2024 CERN for the benefit of the LHCb Collaboration      #
+#                                                                             #
+# This software is distributed under the terms of the GNU General Public      #
+# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
+#                                                                             #
+# In applying this licence, CERN does not waive the privileges and immunities #
+# granted to it by virtue of its status as an Intergovernmental Organization  #
+# or submit itself to any jurisdiction.                                       #
+from PyConf.Algorithms import (
+    MessageSvcEventFilter,
+    ConfigurableDummy,
+from PyConf.application import ApplicationOptions, configure, configure_input
+from PyConf.control_flow import CompositeNode, NodeLogic
+message_producer = ConfigurableDummy(
+    CFD=3, DecisionWarning=True)  # 33% prescale
+algs_node = CompositeNode(
+    "algs", [message_producer],
+    combine_logic=NodeLogic.LAZY_AND,
+    force_order=True)
+event_filter = MessageSvcEventFilter()
+writer_node = CompositeNode(
+    "writer", [event_filter],
+    combine_logic=NodeLogic.LAZY_AND,
+    force_order=True)
+top = CompositeNode(
+    "top", [algs_node, writer_node],
+    combine_logic=NodeLogic.NONLAZY_AND,
+    force_order=True)
+# Define the application environment and run it
+options = ApplicationOptions(_enabled=False)
+options.n_threads = 4
+options.n_event_slots = 4
+options.evt_max = 50
+options.input_type = "NONE"
+options.dddb_tag = "dummy"
+options.conddb_tag = "dummy"
+options.simulation = False
+config2 = configure_input(options)
+config = configure(options, top)
diff --git a/Kernel/LHCbAlgs/tests/qmtest/lhcbalgs.qms/error_stream.qmt b/Kernel/LHCbAlgs/tests/qmtest/lhcbalgs.qms/error_stream.qmt
new file mode 100644
index 00000000000..cf7fd7182c3
--- /dev/null
+++ b/Kernel/LHCbAlgs/tests/qmtest/lhcbalgs.qms/error_stream.qmt
@@ -0,0 +1,32 @@
+<?xml version="1.0" ?><!DOCTYPE extension  PUBLIC '-//QM/2.3/Extension//EN'  ''>
+    (c) Copyright 2000-2024 CERN for the benefit of the LHCb Collaboration
+    This software is distributed under the terms of the GNU General Public
+    Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".
+    In applying this licence, CERN does not waive the privileges and immunities
+    granted to it by virtue of its status as an Intergovernmental Organization
+    or submit itself to any jurisdiction.
+<extension class="GaudiTest.GaudiExeTest" kind="test">
+  <argument name="program"><text></text></argument>
+  <argument name="args"><set>
+      <text>../options/</text>
+  </set></argument>
+  <argument name="use_temp_dir"><enumeral>per-test</enumeral></argument>
+  <argument name="validator">
+    <text>
+countErrorLines({"FATAL": 0, "ERROR": 0, "WARNING": 33})
+# Being determinisitic, the emulator should always accept the same events and
+# hence emulate the same accept fractions
+NONLAZY_AND: top                                        #=50      Sum=0           Eff=|( 0.000000 +- 0.00000 )%|
+ LAZY_AND: algs                                         #=50      Sum=17          Eff=|( 34.00000 +- 6.69925 )%|
+  ConfigurableDummy/ConfigurableDummy_b0635f1e          #=50      Sum=17          Eff=|( 34.00000 +- 6.69925 )%|
+ LAZY_AND: writer                                       #=50      Sum=33          Eff=|( 66.00000 +- 6.69925 )%|
+  MessageSvcEventFilter/MessageSvcEventFilter_56e8af36  #=50      Sum=33          Eff=|( 66.00000 +- 6.69925 )%|
+    </text>
+  </argument>